Files
membership-be/status_transitions.py
2025-12-16 20:03:50 +07:00

625 lines
18 KiB
Python

"""
Membership Status Transition Logic
This module handles all user status transitions, validation, and automated rules.
Ensures state machine integrity and prevents invalid status changes.
"""
from models import UserStatus, UserRole
from typing import Optional, Dict, List
from datetime import datetime, timezone
import logging
# Configure logging
logger = logging.getLogger(__name__)
# Define valid status transitions (state machine)
ALLOWED_TRANSITIONS: Dict[UserStatus, List[UserStatus]] = {
UserStatus.pending_email: [
UserStatus.pending_validation, # Email verified (normal flow)
UserStatus.pre_validated, # Email verified + referred by member
UserStatus.abandoned, # Timeout without verification (optional)
],
UserStatus.pending_validation: [
UserStatus.pre_validated, # Attended event
UserStatus.abandoned, # 90-day timeout without event
],
UserStatus.pre_validated: [
UserStatus.payment_pending, # Admin validates application
UserStatus.inactive, # Admin rejects (rare)
],
UserStatus.payment_pending: [
UserStatus.active, # Payment successful
UserStatus.abandoned, # Timeout without payment (optional)
],
UserStatus.active: [
UserStatus.expired, # Subscription ended
UserStatus.canceled, # User/admin cancels
UserStatus.inactive, # Admin deactivates
],
UserStatus.inactive: [
UserStatus.active, # Admin reactivates
UserStatus.payment_pending, # Admin prompts for payment
],
UserStatus.canceled: [
UserStatus.payment_pending, # User requests to rejoin
UserStatus.active, # Admin reactivates with subscription
],
UserStatus.expired: [
UserStatus.payment_pending, # User chooses to renew
UserStatus.active, # Admin manually renews
],
UserStatus.abandoned: [
UserStatus.pending_email, # Admin resets - resend verification
UserStatus.pending_validation, # Admin resets - manual email verify
UserStatus.payment_pending, # Admin resets - bypass requirements
],
}
# Define role mappings for each status
STATUS_ROLE_MAP: Dict[UserStatus, UserRole] = {
UserStatus.pending_email: UserRole.guest,
UserStatus.pending_validation: UserRole.guest,
UserStatus.pre_validated: UserRole.guest,
UserStatus.payment_pending: UserRole.guest,
UserStatus.active: UserRole.member,
UserStatus.inactive: UserRole.guest,
UserStatus.canceled: UserRole.guest,
UserStatus.expired: UserRole.guest,
UserStatus.abandoned: UserRole.guest,
}
# Define newsletter subscription rules for each status
NEWSLETTER_SUBSCRIBED_STATUSES = {
UserStatus.pending_validation,
UserStatus.pre_validated,
UserStatus.payment_pending,
UserStatus.active,
}
class StatusTransitionError(Exception):
"""Raised when an invalid status transition is attempted"""
pass
def is_transition_allowed(from_status: UserStatus, to_status: UserStatus) -> bool:
"""
Check if a status transition is allowed by the state machine.
Args:
from_status: Current user status
to_status: Target user status
Returns:
True if transition is allowed, False otherwise
"""
if from_status not in ALLOWED_TRANSITIONS:
logger.warning(f"Unknown source status: {from_status}")
return False
return to_status in ALLOWED_TRANSITIONS[from_status]
def get_allowed_transitions(current_status: UserStatus) -> List[UserStatus]:
"""
Get list of allowed next statuses for the current status.
Args:
current_status: Current user status
Returns:
List of allowed target statuses
"""
return ALLOWED_TRANSITIONS.get(current_status, [])
def get_role_for_status(status: UserStatus) -> UserRole:
"""
Get the appropriate role for a given status.
Args:
status: User status
Returns:
Corresponding UserRole
"""
return STATUS_ROLE_MAP.get(status, UserRole.guest)
def should_subscribe_newsletter(status: UserStatus) -> bool:
"""
Determine if user should be subscribed to newsletter for given status.
Args:
status: User status
Returns:
True if user should receive newsletter
"""
return status in NEWSLETTER_SUBSCRIBED_STATUSES
def transition_user_status(
user,
new_status: UserStatus,
reason: Optional[str] = None,
admin_id: Optional[str] = None,
db_session = None,
send_notification: bool = True
) -> Dict[str, any]:
"""
Transition a user to a new status with validation and side effects.
Args:
user: User object (SQLAlchemy model instance)
new_status: Target status to transition to
reason: Optional reason for the transition
admin_id: Optional admin user ID if transition is manual
db_session: SQLAlchemy database session
send_notification: Whether to send email notification (default True)
Returns:
Dictionary with transition details:
{
'success': bool,
'old_status': str,
'new_status': str,
'role_changed': bool,
'newsletter_changed': bool,
'message': str
}
Raises:
StatusTransitionError: If transition is not allowed
"""
old_status = user.status
old_role = user.role
old_newsletter = user.newsletter_subscribed
# Validate transition
if not is_transition_allowed(old_status, new_status):
allowed = get_allowed_transitions(old_status)
allowed_names = [s.value for s in allowed]
error_msg = (
f"Invalid status transition: {old_status.value}{new_status.value}. "
f"Allowed transitions from {old_status.value}: {allowed_names}"
)
logger.error(error_msg)
raise StatusTransitionError(error_msg)
# Update status
user.status = new_status
# Update role based on new status
new_role = get_role_for_status(new_status)
role_changed = new_role != old_role
if role_changed:
user.role = new_role
# Update newsletter subscription
should_subscribe = should_subscribe_newsletter(new_status)
newsletter_changed = should_subscribe != old_newsletter
if newsletter_changed:
user.newsletter_subscribed = should_subscribe
# Update timestamp
user.updated_at = datetime.now(timezone.utc)
# Log the transition
logger.info(
f"Status transition: user_id={user.id}, "
f"{old_status.value}{new_status.value}, "
f"reason={reason}, admin_id={admin_id}"
)
# Commit to database if session provided
if db_session:
db_session.commit()
# Prepare notification email (actual sending should be done by caller)
# This is just a flag - the API endpoint should handle the actual email
notification_needed = send_notification
# Build result
result = {
'success': True,
'old_status': old_status.value,
'new_status': new_status.value,
'old_role': old_role.value,
'new_role': new_role.value,
'role_changed': role_changed,
'old_newsletter': old_newsletter,
'new_newsletter': should_subscribe,
'newsletter_changed': newsletter_changed,
'message': f'Successfully transitioned from {old_status.value} to {new_status.value}',
'notification_needed': notification_needed,
'reason': reason,
'admin_id': admin_id
}
logger.info(f"Transition result: {result}")
return result
def get_status_metadata(status: UserStatus) -> Dict[str, any]:
"""
Get metadata about a status including permissions and properties.
Args:
status: User status
Returns:
Dictionary with status metadata
"""
return {
'status': status.value,
'role': get_role_for_status(status).value,
'newsletter_subscribed': should_subscribe_newsletter(status),
'allowed_transitions': [s.value for s in get_allowed_transitions(status)],
'can_login': status != UserStatus.pending_email and status != UserStatus.abandoned,
'has_member_access': status == UserStatus.active,
'is_pending': status in {
UserStatus.pending_email,
UserStatus.pending_validation,
UserStatus.pre_validated,
UserStatus.payment_pending
},
'is_terminated': status in {
UserStatus.canceled,
UserStatus.expired,
UserStatus.abandoned,
UserStatus.inactive
}
}
# Helper functions for common transitions
def verify_email(user, db_session=None, is_referred: bool = False):
"""
Transition user after email verification.
Args:
user: User object
db_session: Database session
is_referred: Whether user was referred by a member
Returns:
Transition result dict
"""
target_status = UserStatus.pre_validated if is_referred else UserStatus.pending_validation
return transition_user_status(
user=user,
new_status=target_status,
reason="Email verified" + (" (referred by member)" if is_referred else ""),
db_session=db_session,
send_notification=True
)
def mark_event_attendance(user, admin_id: str, db_session=None):
"""
Transition user after attending an event.
Args:
user: User object
admin_id: ID of admin marking attendance
db_session: Database session
Returns:
Transition result dict
"""
return transition_user_status(
user=user,
new_status=UserStatus.pre_validated,
reason="Attended event",
admin_id=admin_id,
db_session=db_session,
send_notification=False # Event attendance doesn't need immediate email
)
def validate_application(user, admin_id: str, db_session=None):
"""
Admin validates user application (formerly "approve").
Args:
user: User object
admin_id: ID of admin validating application
db_session: Database session
Returns:
Transition result dict
"""
return transition_user_status(
user=user,
new_status=UserStatus.payment_pending,
reason="Application validated by admin",
admin_id=admin_id,
db_session=db_session,
send_notification=True # Send payment instructions email
)
def activate_membership(user, admin_id: Optional[str] = None, db_session=None):
"""
Activate membership after payment or manual activation.
Args:
user: User object
admin_id: Optional ID of admin (for manual activation)
db_session: Database session
Returns:
Transition result dict
"""
reason = "Payment successful" if not admin_id else "Manually activated by admin"
return transition_user_status(
user=user,
new_status=UserStatus.active,
reason=reason,
admin_id=admin_id,
db_session=db_session,
send_notification=True # Send welcome email
)
def cancel_membership(user, admin_id: Optional[str] = None, reason: str = None, db_session=None):
"""
Cancel membership.
Args:
user: User object
admin_id: Optional ID of admin (if admin canceled)
reason: Optional cancellation reason
db_session: Database session
Returns:
Transition result dict
"""
cancel_reason = reason or ("Canceled by admin" if admin_id else "Canceled by user")
return transition_user_status(
user=user,
new_status=UserStatus.canceled,
reason=cancel_reason,
admin_id=admin_id,
db_session=db_session,
send_notification=True # Send cancellation confirmation
)
def expire_membership(user, db_session=None):
"""
Expire membership when subscription ends.
Args:
user: User object
db_session: Database session
Returns:
Transition result dict
"""
return transition_user_status(
user=user,
new_status=UserStatus.expired,
reason="Subscription ended",
db_session=db_session,
send_notification=True # Send renewal prompt email
)
def abandon_application(user, reason: str, db_session=None):
"""
Mark application as abandoned due to timeout.
Args:
user: User object
reason: Reason for abandonment (e.g., "Email verification timeout")
db_session: Database session
Returns:
Transition result dict
"""
return transition_user_status(
user=user,
new_status=UserStatus.abandoned,
reason=reason,
db_session=db_session,
send_notification=True # Send "incomplete application" notice
)
def reactivate_user(user, target_status: UserStatus, admin_id: str, reason: str = None, db_session=None):
"""
Reactivate user from terminated status (admin action).
Args:
user: User object
target_status: Status to transition to
admin_id: ID of admin performing reactivation
reason: Optional reason for reactivation
db_session: Database session
Returns:
Transition result dict
"""
reactivation_reason = reason or f"Reactivated by admin to {target_status.value}"
return transition_user_status(
user=user,
new_status=target_status,
reason=reactivation_reason,
admin_id=admin_id,
db_session=db_session,
send_notification=True
)
# Background job functions (to be called by scheduler)
def check_pending_email_timeouts(db_session, timeout_days: int = 30):
"""
Check for users in pending_email status past timeout and transition to abandoned.
This should be run as a daily background job.
Args:
db_session: Database session
timeout_days: Number of days before abandonment (0 = disabled)
Returns:
Number of users transitioned
"""
if timeout_days <= 0:
return 0
from datetime import timedelta
from models import User
cutoff_date = datetime.now(timezone.utc) - timedelta(days=timeout_days)
# Find users in pending_email status created before cutoff
timeout_users = db_session.query(User).filter(
User.status == UserStatus.pending_email,
User.created_at < cutoff_date,
User.email_verified == False
).all()
count = 0
for user in timeout_users:
try:
abandon_application(
user=user,
reason=f"Email verification timeout ({timeout_days} days)",
db_session=db_session
)
count += 1
logger.info(f"Abandoned user {user.id} due to email verification timeout")
except Exception as e:
logger.error(f"Error abandoning user {user.id}: {str(e)}")
return count
def check_event_attendance_timeouts(db_session, timeout_days: int = 90):
"""
Check for users in pending_validation status past 90-day timeout.
This should be run as a daily background job.
Args:
db_session: Database session
timeout_days: Number of days before abandonment (default 90 per policy)
Returns:
Number of users transitioned
"""
from datetime import timedelta
from models import User
cutoff_date = datetime.now(timezone.utc) - timedelta(days=timeout_days)
# Find users in pending_validation status past deadline
# Note: We check updated_at (when they entered this status) not created_at
timeout_users = db_session.query(User).filter(
User.status == UserStatus.pending_validation,
User.updated_at < cutoff_date
).all()
count = 0
for user in timeout_users:
try:
abandon_application(
user=user,
reason=f"Event attendance timeout ({timeout_days} days)",
db_session=db_session
)
count += 1
logger.info(f"Abandoned user {user.id} due to event attendance timeout")
except Exception as e:
logger.error(f"Error abandoning user {user.id}: {str(e)}")
return count
def check_payment_timeouts(db_session, timeout_days: int = 0):
"""
Check for users in payment_pending status past timeout.
This should be run as a daily background job.
Default timeout_days=0 means never auto-abandon (recommended).
Args:
db_session: Database session
timeout_days: Number of days before abandonment (0 = disabled)
Returns:
Number of users transitioned
"""
if timeout_days <= 0:
return 0 # Disabled by default
from datetime import timedelta
from models import User
cutoff_date = datetime.now(timezone.utc) - timedelta(days=timeout_days)
timeout_users = db_session.query(User).filter(
User.status == UserStatus.payment_pending,
User.updated_at < cutoff_date
).all()
count = 0
for user in timeout_users:
try:
abandon_application(
user=user,
reason=f"Payment timeout ({timeout_days} days)",
db_session=db_session
)
count += 1
logger.info(f"Abandoned user {user.id} due to payment timeout")
except Exception as e:
logger.error(f"Error abandoning user {user.id}: {str(e)}")
return count
def check_subscription_expirations(db_session):
"""
Check for active subscriptions past end_date and transition to expired.
This should be run as a daily background job.
Args:
db_session: Database session
Returns:
Number of users transitioned
"""
from models import User, Subscription
from sqlalchemy import and_
today = datetime.now(timezone.utc).date()
# Find active users with expired subscriptions
expired_subs = db_session.query(User, Subscription).join(
Subscription, User.id == Subscription.user_id
).filter(
and_(
User.status == UserStatus.active,
Subscription.end_date < today
)
).all()
count = 0
for user, subscription in expired_subs:
try:
expire_membership(user=user, db_session=db_session)
count += 1
logger.info(f"Expired user {user.id} - subscription ended {subscription.end_date}")
except Exception as e:
logger.error(f"Error expiring user {user.id}: {str(e)}")
return count