""" Membership Status Transition Logic This module handles all user status transitions, validation, and automated rules. Ensures state machine integrity and prevents invalid status changes. """ from models import UserStatus, UserRole from typing import Optional, Dict, List from datetime import datetime, timezone import logging # Configure logging logger = logging.getLogger(__name__) # Define valid status transitions (state machine) ALLOWED_TRANSITIONS: Dict[UserStatus, List[UserStatus]] = { UserStatus.pending_email: [ UserStatus.pending_validation, # Email verified (normal flow) UserStatus.pre_validated, # Email verified + referred by member UserStatus.abandoned, # Timeout without verification (optional) ], UserStatus.pending_validation: [ UserStatus.pre_validated, # Attended event UserStatus.abandoned, # 90-day timeout without event ], UserStatus.pre_validated: [ UserStatus.payment_pending, # Admin validates application UserStatus.inactive, # Admin rejects (rare) ], UserStatus.payment_pending: [ UserStatus.active, # Payment successful UserStatus.abandoned, # Timeout without payment (optional) ], UserStatus.active: [ UserStatus.expired, # Subscription ended UserStatus.canceled, # User/admin cancels UserStatus.inactive, # Admin deactivates ], UserStatus.inactive: [ UserStatus.active, # Admin reactivates UserStatus.payment_pending, # Admin prompts for payment ], UserStatus.canceled: [ UserStatus.payment_pending, # User requests to rejoin UserStatus.active, # Admin reactivates with subscription ], UserStatus.expired: [ UserStatus.payment_pending, # User chooses to renew UserStatus.active, # Admin manually renews ], UserStatus.abandoned: [ UserStatus.pending_email, # Admin resets - resend verification UserStatus.pending_validation, # Admin resets - manual email verify UserStatus.payment_pending, # Admin resets - bypass requirements ], } # Define role mappings for each status STATUS_ROLE_MAP: Dict[UserStatus, UserRole] = { UserStatus.pending_email: UserRole.guest, UserStatus.pending_validation: UserRole.guest, UserStatus.pre_validated: UserRole.guest, UserStatus.payment_pending: UserRole.guest, UserStatus.active: UserRole.member, UserStatus.inactive: UserRole.guest, UserStatus.canceled: UserRole.guest, UserStatus.expired: UserRole.guest, UserStatus.abandoned: UserRole.guest, } # Define newsletter subscription rules for each status NEWSLETTER_SUBSCRIBED_STATUSES = { UserStatus.pending_validation, UserStatus.pre_validated, UserStatus.payment_pending, UserStatus.active, } class StatusTransitionError(Exception): """Raised when an invalid status transition is attempted""" pass def is_transition_allowed(from_status: UserStatus, to_status: UserStatus) -> bool: """ Check if a status transition is allowed by the state machine. Args: from_status: Current user status to_status: Target user status Returns: True if transition is allowed, False otherwise """ if from_status not in ALLOWED_TRANSITIONS: logger.warning(f"Unknown source status: {from_status}") return False return to_status in ALLOWED_TRANSITIONS[from_status] def get_allowed_transitions(current_status: UserStatus) -> List[UserStatus]: """ Get list of allowed next statuses for the current status. Args: current_status: Current user status Returns: List of allowed target statuses """ return ALLOWED_TRANSITIONS.get(current_status, []) def get_role_for_status(status: UserStatus) -> UserRole: """ Get the appropriate role for a given status. Args: status: User status Returns: Corresponding UserRole """ return STATUS_ROLE_MAP.get(status, UserRole.guest) def should_subscribe_newsletter(status: UserStatus) -> bool: """ Determine if user should be subscribed to newsletter for given status. Args: status: User status Returns: True if user should receive newsletter """ return status in NEWSLETTER_SUBSCRIBED_STATUSES def transition_user_status( user, new_status: UserStatus, reason: Optional[str] = None, admin_id: Optional[str] = None, db_session = None, send_notification: bool = True ) -> Dict[str, any]: """ Transition a user to a new status with validation and side effects. Args: user: User object (SQLAlchemy model instance) new_status: Target status to transition to reason: Optional reason for the transition admin_id: Optional admin user ID if transition is manual db_session: SQLAlchemy database session send_notification: Whether to send email notification (default True) Returns: Dictionary with transition details: { 'success': bool, 'old_status': str, 'new_status': str, 'role_changed': bool, 'newsletter_changed': bool, 'message': str } Raises: StatusTransitionError: If transition is not allowed """ old_status = user.status old_role = user.role old_newsletter = user.newsletter_subscribed # Validate transition if not is_transition_allowed(old_status, new_status): allowed = get_allowed_transitions(old_status) allowed_names = [s.value for s in allowed] error_msg = ( f"Invalid status transition: {old_status.value} → {new_status.value}. " f"Allowed transitions from {old_status.value}: {allowed_names}" ) logger.error(error_msg) raise StatusTransitionError(error_msg) # Update status user.status = new_status # Update role based on new status new_role = get_role_for_status(new_status) role_changed = new_role != old_role if role_changed: user.role = new_role # Update newsletter subscription should_subscribe = should_subscribe_newsletter(new_status) newsletter_changed = should_subscribe != old_newsletter if newsletter_changed: user.newsletter_subscribed = should_subscribe # Update timestamp user.updated_at = datetime.now(timezone.utc) # Log the transition logger.info( f"Status transition: user_id={user.id}, " f"{old_status.value} → {new_status.value}, " f"reason={reason}, admin_id={admin_id}" ) # Commit to database if session provided if db_session: db_session.commit() # Prepare notification email (actual sending should be done by caller) # This is just a flag - the API endpoint should handle the actual email notification_needed = send_notification # Build result result = { 'success': True, 'old_status': old_status.value, 'new_status': new_status.value, 'old_role': old_role.value, 'new_role': new_role.value, 'role_changed': role_changed, 'old_newsletter': old_newsletter, 'new_newsletter': should_subscribe, 'newsletter_changed': newsletter_changed, 'message': f'Successfully transitioned from {old_status.value} to {new_status.value}', 'notification_needed': notification_needed, 'reason': reason, 'admin_id': admin_id } logger.info(f"Transition result: {result}") return result def get_status_metadata(status: UserStatus) -> Dict[str, any]: """ Get metadata about a status including permissions and properties. Args: status: User status Returns: Dictionary with status metadata """ return { 'status': status.value, 'role': get_role_for_status(status).value, 'newsletter_subscribed': should_subscribe_newsletter(status), 'allowed_transitions': [s.value for s in get_allowed_transitions(status)], 'can_login': status != UserStatus.pending_email and status != UserStatus.abandoned, 'has_member_access': status == UserStatus.active, 'is_pending': status in { UserStatus.pending_email, UserStatus.pending_validation, UserStatus.pre_validated, UserStatus.payment_pending }, 'is_terminated': status in { UserStatus.canceled, UserStatus.expired, UserStatus.abandoned, UserStatus.inactive } } # Helper functions for common transitions def verify_email(user, db_session=None, is_referred: bool = False): """ Transition user after email verification. Args: user: User object db_session: Database session is_referred: Whether user was referred by a member Returns: Transition result dict """ target_status = UserStatus.pre_validated if is_referred else UserStatus.pending_validation return transition_user_status( user=user, new_status=target_status, reason="Email verified" + (" (referred by member)" if is_referred else ""), db_session=db_session, send_notification=True ) def mark_event_attendance(user, admin_id: str, db_session=None): """ Transition user after attending an event. Args: user: User object admin_id: ID of admin marking attendance db_session: Database session Returns: Transition result dict """ return transition_user_status( user=user, new_status=UserStatus.pre_validated, reason="Attended event", admin_id=admin_id, db_session=db_session, send_notification=False # Event attendance doesn't need immediate email ) def validate_application(user, admin_id: str, db_session=None): """ Admin validates user application (formerly "approve"). Args: user: User object admin_id: ID of admin validating application db_session: Database session Returns: Transition result dict """ return transition_user_status( user=user, new_status=UserStatus.payment_pending, reason="Application validated by admin", admin_id=admin_id, db_session=db_session, send_notification=True # Send payment instructions email ) def activate_membership(user, admin_id: Optional[str] = None, db_session=None): """ Activate membership after payment or manual activation. Args: user: User object admin_id: Optional ID of admin (for manual activation) db_session: Database session Returns: Transition result dict """ reason = "Payment successful" if not admin_id else "Manually activated by admin" return transition_user_status( user=user, new_status=UserStatus.active, reason=reason, admin_id=admin_id, db_session=db_session, send_notification=True # Send welcome email ) def cancel_membership(user, admin_id: Optional[str] = None, reason: str = None, db_session=None): """ Cancel membership. Args: user: User object admin_id: Optional ID of admin (if admin canceled) reason: Optional cancellation reason db_session: Database session Returns: Transition result dict """ cancel_reason = reason or ("Canceled by admin" if admin_id else "Canceled by user") return transition_user_status( user=user, new_status=UserStatus.canceled, reason=cancel_reason, admin_id=admin_id, db_session=db_session, send_notification=True # Send cancellation confirmation ) def expire_membership(user, db_session=None): """ Expire membership when subscription ends. Args: user: User object db_session: Database session Returns: Transition result dict """ return transition_user_status( user=user, new_status=UserStatus.expired, reason="Subscription ended", db_session=db_session, send_notification=True # Send renewal prompt email ) def abandon_application(user, reason: str, db_session=None): """ Mark application as abandoned due to timeout. Args: user: User object reason: Reason for abandonment (e.g., "Email verification timeout") db_session: Database session Returns: Transition result dict """ return transition_user_status( user=user, new_status=UserStatus.abandoned, reason=reason, db_session=db_session, send_notification=True # Send "incomplete application" notice ) def reactivate_user(user, target_status: UserStatus, admin_id: str, reason: str = None, db_session=None): """ Reactivate user from terminated status (admin action). Args: user: User object target_status: Status to transition to admin_id: ID of admin performing reactivation reason: Optional reason for reactivation db_session: Database session Returns: Transition result dict """ reactivation_reason = reason or f"Reactivated by admin to {target_status.value}" return transition_user_status( user=user, new_status=target_status, reason=reactivation_reason, admin_id=admin_id, db_session=db_session, send_notification=True ) # Background job functions (to be called by scheduler) def check_pending_email_timeouts(db_session, timeout_days: int = 30): """ Check for users in pending_email status past timeout and transition to abandoned. This should be run as a daily background job. Args: db_session: Database session timeout_days: Number of days before abandonment (0 = disabled) Returns: Number of users transitioned """ if timeout_days <= 0: return 0 from datetime import timedelta from models import User cutoff_date = datetime.now(timezone.utc) - timedelta(days=timeout_days) # Find users in pending_email status created before cutoff timeout_users = db_session.query(User).filter( User.status == UserStatus.pending_email, User.created_at < cutoff_date, User.email_verified == False ).all() count = 0 for user in timeout_users: try: abandon_application( user=user, reason=f"Email verification timeout ({timeout_days} days)", db_session=db_session ) count += 1 logger.info(f"Abandoned user {user.id} due to email verification timeout") except Exception as e: logger.error(f"Error abandoning user {user.id}: {str(e)}") return count def check_event_attendance_timeouts(db_session, timeout_days: int = 90): """ Check for users in pending_validation status past 90-day timeout. This should be run as a daily background job. Args: db_session: Database session timeout_days: Number of days before abandonment (default 90 per policy) Returns: Number of users transitioned """ from datetime import timedelta from models import User cutoff_date = datetime.now(timezone.utc) - timedelta(days=timeout_days) # Find users in pending_validation status past deadline # Note: We check updated_at (when they entered this status) not created_at timeout_users = db_session.query(User).filter( User.status == UserStatus.pending_validation, User.updated_at < cutoff_date ).all() count = 0 for user in timeout_users: try: abandon_application( user=user, reason=f"Event attendance timeout ({timeout_days} days)", db_session=db_session ) count += 1 logger.info(f"Abandoned user {user.id} due to event attendance timeout") except Exception as e: logger.error(f"Error abandoning user {user.id}: {str(e)}") return count def check_payment_timeouts(db_session, timeout_days: int = 0): """ Check for users in payment_pending status past timeout. This should be run as a daily background job. Default timeout_days=0 means never auto-abandon (recommended). Args: db_session: Database session timeout_days: Number of days before abandonment (0 = disabled) Returns: Number of users transitioned """ if timeout_days <= 0: return 0 # Disabled by default from datetime import timedelta from models import User cutoff_date = datetime.now(timezone.utc) - timedelta(days=timeout_days) timeout_users = db_session.query(User).filter( User.status == UserStatus.payment_pending, User.updated_at < cutoff_date ).all() count = 0 for user in timeout_users: try: abandon_application( user=user, reason=f"Payment timeout ({timeout_days} days)", db_session=db_session ) count += 1 logger.info(f"Abandoned user {user.id} due to payment timeout") except Exception as e: logger.error(f"Error abandoning user {user.id}: {str(e)}") return count def check_subscription_expirations(db_session): """ Check for active subscriptions past end_date and transition to expired. This should be run as a daily background job. Args: db_session: Database session Returns: Number of users transitioned """ from models import User, Subscription from sqlalchemy import and_ today = datetime.now(timezone.utc).date() # Find active users with expired subscriptions expired_subs = db_session.query(User, Subscription).join( Subscription, User.id == Subscription.user_id ).filter( and_( User.status == UserStatus.active, Subscription.end_date < today ) ).all() count = 0 for user, subscription in expired_subs: try: expire_membership(user=user, db_session=db_session) count += 1 logger.info(f"Expired user {user.id} - subscription ended {subscription.end_date}") except Exception as e: logger.error(f"Error expiring user {user.id}: {str(e)}") return count