diff --git a/migrations/000_initial_schema.sql b/migrations/000_initial_schema.sql
index a795b24..72591b0 100644
--- a/migrations/000_initial_schema.sql
+++ b/migrations/000_initial_schema.sql
@@ -77,7 +77,10 @@ CREATE TYPE importjobstatus AS ENUM (
     'processing',
     'completed',
     'failed',
-    'partial'
+    'partial',
+    'validating',
+    'preview_ready',
+    'rolled_back'
 );
 
 COMMIT;
@@ -152,6 +155,12 @@ CREATE TABLE IF NOT EXISTS users (
     reminder_60_days_sent BOOLEAN DEFAULT FALSE,
     reminder_85_days_sent BOOLEAN DEFAULT FALSE,
 
+    -- WordPress Import Tracking
+    -- Note: the FK to import_jobs is added after that table is created (below),
+    -- since import_jobs does not exist yet at this point in the schema
+    import_source VARCHAR(50),
+    import_job_id UUID,
+    wordpress_user_id BIGINT,
+    wordpress_registered_date TIMESTAMP WITH TIME ZONE,
+
     -- Timestamps
     created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
     updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
@@ -464,11 +473,30 @@ CREATE TABLE IF NOT EXISTS import_jobs (
     error_count INTEGER DEFAULT 0,
     error_log JSONB DEFAULT '[]'::jsonb,
 
+    -- WordPress import enhancements
+    field_mapping JSONB DEFAULT '{}'::jsonb,
+    wordpress_metadata JSONB DEFAULT '{}'::jsonb,
+    imported_user_ids JSONB DEFAULT '[]'::jsonb,
+    rollback_at TIMESTAMP WITH TIME ZONE,
+    rollback_by UUID REFERENCES users(id),
+
     started_by UUID REFERENCES users(id),
     started_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
     completed_at TIMESTAMP WITH TIME ZONE
 );
 
+-- Now that import_jobs exists, add the deferred FK from users
+ALTER TABLE users
+    ADD CONSTRAINT fk_users_import_job FOREIGN KEY (import_job_id) REFERENCES import_jobs(id);
+
+-- Import Rollback Audit table (for tracking rollback operations)
+CREATE TABLE IF NOT EXISTS import_rollback_audit (
+    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+    import_job_id UUID NOT NULL REFERENCES import_jobs(id),
+    rolled_back_by UUID NOT NULL REFERENCES users(id),
+    rolled_back_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
+    deleted_user_count INTEGER NOT NULL,
+    deleted_user_ids JSONB NOT NULL,
+    reason TEXT,
+    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
+);
+
 COMMIT;
 
 -- Display progress
@@ -488,6 +516,8 @@ CREATE INDEX IF NOT EXISTS idx_users_role_id ON users(role_id);
 CREATE INDEX IF NOT EXISTS idx_users_email_verified ON users(email_verified);
 CREATE INDEX IF NOT EXISTS idx_users_rejected_at ON users(rejected_at) WHERE rejected_at IS NOT NULL;
 CREATE INDEX IF NOT EXISTS idx_users_created_at ON users(created_at);
+CREATE INDEX IF NOT EXISTS idx_users_import_job ON users(import_job_id) WHERE import_job_id IS NOT NULL;
+CREATE INDEX IF NOT EXISTS idx_users_import_source ON users(import_source) WHERE import_source IS NOT NULL;
 
 -- Events table indexes
 CREATE INDEX IF NOT EXISTS idx_events_created_by ON events(created_by);
@@ -514,6 +544,14 @@ CREATE INDEX IF NOT EXISTS idx_donation_type ON donations(donation_type);
 CREATE INDEX IF NOT EXISTS idx_donation_status ON donations(status);
 CREATE INDEX IF NOT EXISTS idx_donation_created ON donations(created_at);
 
+-- Import Jobs indexes
+CREATE INDEX IF NOT EXISTS idx_import_jobs_status ON import_jobs(status);
+CREATE INDEX IF NOT EXISTS idx_import_jobs_started_by ON import_jobs(started_by);
+
+-- Import Rollback Audit indexes
+CREATE INDEX IF NOT EXISTS idx_rollback_audit_import_job ON import_rollback_audit(import_job_id);
+CREATE INDEX IF NOT EXISTS idx_rollback_audit_rolled_back_at ON import_rollback_audit(rolled_back_at DESC);
+
 -- Permissions indexes
 CREATE INDEX IF NOT EXISTS idx_permissions_code ON permissions(code);
 CREATE INDEX IF NOT EXISTS idx_permissions_module ON permissions(module);
diff --git a/migrations/011_wordpress_import_enhancements.sql b/migrations/011_wordpress_import_enhancements.sql
new file mode 100644
index 0000000..2b1754c
--- /dev/null
+++ b/migrations/011_wordpress_import_enhancements.sql
@@ -0,0 +1,153 @@
+-- Migration: 011_wordpress_import_enhancements
+-- Purpose: Enhance ImportJob and User tables for WordPress CSV import feature
+-- Date: 2025-12-24
+-- Author: Claude Code
+
+-- ============================================================================
+-- PART 1: Enhance ImportJob Table
+-- ============================================================================
+
+-- Add new columns to import_jobs table for WordPress import tracking
+ALTER TABLE import_jobs
+ADD COLUMN IF NOT EXISTS field_mapping JSONB DEFAULT '{}'::jsonb,
+ADD COLUMN IF NOT EXISTS wordpress_metadata JSONB DEFAULT '{}'::jsonb,
+ADD COLUMN IF NOT EXISTS imported_user_ids JSONB DEFAULT '[]'::jsonb,
+ADD COLUMN IF NOT EXISTS rollback_at TIMESTAMP WITH TIME ZONE,
+ADD COLUMN IF NOT EXISTS rollback_by UUID REFERENCES users(id);
+
+-- Add comments for documentation
+COMMENT ON COLUMN import_jobs.field_mapping IS 'Maps CSV columns to database fields: {csv_column: db_field}';
+COMMENT ON COLUMN import_jobs.wordpress_metadata IS 'Stores preview data, validation results, and WordPress-specific metadata';
+COMMENT ON COLUMN import_jobs.imported_user_ids IS 'Array of user IDs created from this import job (for rollback)';
+COMMENT ON COLUMN import_jobs.rollback_at IS 'Timestamp when this import was rolled back';
+COMMENT ON COLUMN import_jobs.rollback_by IS 'Admin user who performed the rollback';
+
+-- ============================================================================
+-- PART 2: Add New ImportJob Status Values
+-- ============================================================================
+
+-- Add new status values for the import workflow.
+-- Note: ADD VALUE IF NOT EXISTS (PostgreSQL 9.6+) makes these statements
+-- idempotent. On PostgreSQL < 12, ALTER TYPE ... ADD VALUE must run outside
+-- an explicit transaction block.
+ALTER TYPE importjobstatus ADD VALUE IF NOT EXISTS 'validating';
+ALTER TYPE importjobstatus ADD VALUE IF NOT EXISTS 'preview_ready';
+ALTER TYPE importjobstatus ADD VALUE IF NOT EXISTS 'rolled_back';
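The SQL enum and the Python-side `ImportJobStatus` (in models.py, below) have to stay in sync by hand. A minimal test-suite guard can catch drift early; this is a sketch, assuming `models` is importable in the test environment:

```python
# Sketch: assert the Python enum covers every label this migration guarantees.
from models import ImportJobStatus

EXPECTED = {"processing", "completed", "failed", "partial",
            "validating", "preview_ready", "rolled_back"}

def test_importjobstatus_in_sync():
    # superset check: the Python enum may legitimately carry extra members
    assert {m.value for m in ImportJobStatus} >= EXPECTED
```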
+
+-- ============================================================================
+-- PART 3: Enhance User Table for Import Tracking
+-- ============================================================================
+
+-- Add columns to track import source and WordPress metadata
+ALTER TABLE users
+ADD COLUMN IF NOT EXISTS import_source VARCHAR(50),
+ADD COLUMN IF NOT EXISTS import_job_id UUID REFERENCES import_jobs(id),
+ADD COLUMN IF NOT EXISTS wordpress_user_id BIGINT,
+ADD COLUMN IF NOT EXISTS wordpress_registered_date TIMESTAMP WITH TIME ZONE;
+
+-- Add comments for documentation
+COMMENT ON COLUMN users.import_source IS 'Source of user creation: wordpress, manual, registration, etc.';
+COMMENT ON COLUMN users.import_job_id IS 'Reference to import job that created this user (if imported)';
+COMMENT ON COLUMN users.wordpress_user_id IS 'Original WordPress user ID for reference';
+COMMENT ON COLUMN users.wordpress_registered_date IS 'Original WordPress registration date';
+
+-- ============================================================================
+-- PART 4: Create Indexes for Performance
+-- ============================================================================
+
+-- Index for querying users by import job (used in rollback)
+CREATE INDEX IF NOT EXISTS idx_users_import_job
+ON users(import_job_id)
+WHERE import_job_id IS NOT NULL;
+
+-- Index for querying users by import source
+CREATE INDEX IF NOT EXISTS idx_users_import_source
+ON users(import_source)
+WHERE import_source IS NOT NULL;
+
+-- Index for querying import jobs by status
+CREATE INDEX IF NOT EXISTS idx_import_jobs_status
+ON import_jobs(status);
+
+-- ============================================================================
+-- PART 5: Create Rollback Audit Table (Optional but Recommended)
+-- ============================================================================
+
+-- Create table to track import rollback history for audit purposes
+CREATE TABLE IF NOT EXISTS import_rollback_audit (
+    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+    import_job_id UUID NOT NULL REFERENCES import_jobs(id),
+    rolled_back_by UUID NOT NULL REFERENCES users(id),
+    rolled_back_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
+    deleted_user_count INTEGER NOT NULL,
+    deleted_user_ids JSONB NOT NULL,
+    reason TEXT,
+    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
+);
+
+-- Indexes for querying rollback history
+CREATE INDEX IF NOT EXISTS idx_rollback_audit_import_job
+ON import_rollback_audit(import_job_id);
+
+CREATE INDEX IF NOT EXISTS idx_rollback_audit_rolled_back_at
+ON import_rollback_audit(rolled_back_at DESC);
+
+COMMENT ON TABLE import_rollback_audit IS 'Audit trail for import rollback operations';
+
+-- ============================================================================
+-- VERIFICATION QUERIES (Run after migration to verify)
+-- ============================================================================
+
+-- Verify ImportJob columns exist
+-- SELECT column_name, data_type
+-- FROM information_schema.columns
+-- WHERE table_name = 'import_jobs'
+-- AND column_name IN ('field_mapping', 'wordpress_metadata', 'imported_user_ids', 'rollback_at', 'rollback_by');
+
+-- Verify User columns exist
+-- SELECT column_name, data_type
+-- FROM information_schema.columns
+-- WHERE table_name = 'users'
+-- AND column_name IN ('import_source', 'import_job_id', 'wordpress_user_id', 'wordpress_registered_date');
+
+-- Verify new enum values exist
+-- SELECT enumlabel FROM pg_enum WHERE enumtypid = (SELECT oid FROM pg_type WHERE typname = 'importjobstatus') ORDER BY enumlabel;
+
+-- Verify indexes exist
+-- SELECT indexname, indexdef FROM pg_indexes WHERE tablename IN ('users', 'import_jobs', 'import_rollback_audit') ORDER BY indexname;
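The commented verification queries above can also be run programmatically after deploying. A minimal SQLAlchemy sketch (`DATABASE_URL` is a placeholder connection string):

```python
# Sketch: confirm the new enum labels and columns landed after migrating.
from sqlalchemy import create_engine, text

engine = create_engine(DATABASE_URL)  # placeholder
with engine.connect() as conn:
    labels = {row[0] for row in conn.execute(text(
        "SELECT enumlabel FROM pg_enum "
        "WHERE enumtypid = 'importjobstatus'::regtype"))}
    assert {'validating', 'preview_ready', 'rolled_back'} <= labels

    cols = {row[0] for row in conn.execute(text(
        "SELECT column_name FROM information_schema.columns "
        "WHERE table_name = 'users'"))}
    assert {'import_source', 'import_job_id', 'wordpress_user_id'} <= cols
```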
+
+-- ============================================================================
+-- ROLLBACK SCRIPT (if needed)
+-- ============================================================================
+
+-- WARNING: This will drop all columns and data related to WordPress imports
+-- USE WITH EXTREME CAUTION
+
+-- DROP TABLE IF EXISTS import_rollback_audit CASCADE;
+-- DROP INDEX IF EXISTS idx_users_import_job;
+-- DROP INDEX IF EXISTS idx_users_import_source;
+-- DROP INDEX IF EXISTS idx_import_jobs_status;
+-- ALTER TABLE users DROP COLUMN IF EXISTS import_source;
+-- ALTER TABLE users DROP COLUMN IF EXISTS import_job_id;
+-- ALTER TABLE users DROP COLUMN IF EXISTS wordpress_user_id;
+-- ALTER TABLE users DROP COLUMN IF EXISTS wordpress_registered_date;
+-- ALTER TABLE import_jobs DROP COLUMN IF EXISTS field_mapping;
+-- ALTER TABLE import_jobs DROP COLUMN IF EXISTS wordpress_metadata;
+-- ALTER TABLE import_jobs DROP COLUMN IF EXISTS imported_user_ids;
+-- ALTER TABLE import_jobs DROP COLUMN IF EXISTS rollback_at;
+-- ALTER TABLE import_jobs DROP COLUMN IF EXISTS rollback_by;
+
+-- Note: enum values cannot easily be removed from importjobstatus without
+-- recreating the type; manual intervention is required to roll those back
diff --git a/models.py b/models.py
index ad4f06e..a48761a 100644
--- a/models.py
+++ b/models.py
@@ -130,6 +130,12 @@ class User(Base):
     rejected_at = Column(DateTime(timezone=True), nullable=True, comment="Timestamp when application was rejected")
     rejected_by = Column(UUID(as_uuid=True), ForeignKey('users.id'), nullable=True, comment="Admin who rejected the application")
 
+    # WordPress Import Tracking
+    import_source = Column(String(50), nullable=True, comment="Source of user creation: wordpress, manual, registration")
+    import_job_id = Column(UUID(as_uuid=True), ForeignKey('import_jobs.id'), nullable=True, comment="Import job that created this user")
+    wordpress_user_id = Column(BigInteger, nullable=True, comment="Original WordPress user ID")
+    wordpress_registered_date = Column(DateTime(timezone=True), nullable=True, comment="Original WordPress registration date")
+
     created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
     updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
@@ -451,6 +457,9 @@ class ImportJobStatus(enum.Enum):
     completed = "completed"
     failed = "failed"
     partial = "partial"
+    validating = "validating"
+    preview_ready = "preview_ready"
+    rolled_back = "rolled_back"
 
 class ImportJob(Base):
     """Track CSV import jobs with error handling"""
@@ -466,6 +475,13 @@ class ImportJob(Base):
     status = Column(SQLEnum(ImportJobStatus), default=ImportJobStatus.processing, nullable=False)
     errors = Column(JSON, default=list, nullable=False)  # [{row: 5, field: "email", error: "Invalid format"}]
 
+    # WordPress import enhancements
+    field_mapping = Column(JSON, default=dict, nullable=False)  # Maps CSV columns to DB fields
+    wordpress_metadata = Column(JSON, default=dict, nullable=False)  # Preview data, validation results
+    imported_user_ids = Column(JSON, default=list, nullable=False)  # User IDs for rollback
+    rollback_at = Column(DateTime(timezone=True), nullable=True)  # TIMESTAMPTZ in the migration
+    rollback_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=True)
+
     # Tracking
     imported_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)
     started_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), nullable=False)
@@ -473,3 +489,22 @@ class ImportJob(Base):
     # Relationships
     importer = relationship("User", foreign_keys=[imported_by])
+    rollback_user = relationship("User", foreign_keys=[rollback_by])
relationship("User", foreign_keys=[imported_by]) + rollback_user = relationship("User", foreign_keys=[rollback_by]) + + +class ImportRollbackAudit(Base): + """Audit trail for import rollback operations""" + __tablename__ = "import_rollback_audit" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + import_job_id = Column(UUID(as_uuid=True), ForeignKey("import_jobs.id"), nullable=False) + rolled_back_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False) + rolled_back_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), nullable=False) + deleted_user_count = Column(Integer, nullable=False) + deleted_user_ids = Column(JSON, nullable=False) # List of deleted user UUIDs + reason = Column(Text, nullable=True) + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), nullable=False) + + # Relationships + import_job = relationship("ImportJob") + admin_user = relationship("User", foreign_keys=[rolled_back_by]) diff --git a/requirements.txt b/requirements.txt index 49de466..4fa6855 100644 --- a/requirements.txt +++ b/requirements.txt @@ -37,6 +37,7 @@ pandas==2.3.3 passlib==1.7.4 pathspec==0.12.1 pillow==10.2.0 +phpserialize==1.3 platformdirs==4.5.0 pluggy==1.6.0 psycopg2-binary==2.9.11 diff --git a/server.py b/server.py index fe4616b..a924204 100644 --- a/server.py +++ b/server.py @@ -17,7 +17,7 @@ import csv import io from database import engine, get_db, Base -from models import User, Event, EventRSVP, UserStatus, UserRole, RSVPStatus, SubscriptionPlan, Subscription, SubscriptionStatus, StorageUsage, EventGallery, NewsletterArchive, FinancialReport, BylawsDocument, Permission, RolePermission, Role, UserInvitation, InvitationStatus, ImportJob, ImportJobStatus, Donation, DonationType, DonationStatus +from models import User, Event, EventRSVP, UserStatus, UserRole, RSVPStatus, SubscriptionPlan, Subscription, SubscriptionStatus, StorageUsage, EventGallery, NewsletterArchive, FinancialReport, BylawsDocument, Permission, RolePermission, Role, UserInvitation, InvitationStatus, ImportJob, ImportJobStatus, ImportRollbackAudit, Donation, DonationType, DonationStatus from auth import ( get_password_hash, verify_password, @@ -42,6 +42,7 @@ from email_service import ( from payment_service import create_checkout_session, verify_webhook_signature, get_subscription_end_date from r2_storage import get_r2_storage from calendar_service import CalendarService +from wordpress_parser import analyze_csv, format_preview_for_display # Load environment variables ROOT_DIR = Path(__file__).parent @@ -655,9 +656,15 @@ async def login(request: LoginRequest, db: Session = Depends(get_db)): access_token = create_access_token(data={"sub": str(user.id)}) # Clear verification token on first successful login after verification + # Don't let this fail the login if database commit fails if user.email_verified and user.email_verification_token: - user.email_verification_token = None - db.commit() + try: + user.email_verification_token = None + db.commit() + except Exception as e: + logger.warning(f"Failed to clear verification token for user {user.id}: {str(e)}") + db.rollback() + # Continue with login - this is not critical return { "access_token": access_token, @@ -887,7 +894,8 @@ async def get_member_directory( "social_media_facebook": member.social_media_facebook, "social_media_instagram": member.social_media_instagram, "social_media_twitter": member.social_media_twitter, - "social_media_linkedin": member.social_media_linkedin + "social_media_linkedin": 
diff --git a/requirements.txt b/requirements.txt
index 49de466..4fa6855 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -37,6 +37,7 @@ pandas==2.3.3
 passlib==1.7.4
 pathspec==0.12.1
 pillow==10.2.0
+phpserialize==1.3
 platformdirs==4.5.0
 pluggy==1.6.0
 psycopg2-binary==2.9.11
diff --git a/server.py b/server.py
index fe4616b..a924204 100644
--- a/server.py
+++ b/server.py
@@ -17,7 +17,7 @@ import csv
 import io
 
 from database import engine, get_db, Base
-from models import User, Event, EventRSVP, UserStatus, UserRole, RSVPStatus, SubscriptionPlan, Subscription, SubscriptionStatus, StorageUsage, EventGallery, NewsletterArchive, FinancialReport, BylawsDocument, Permission, RolePermission, Role, UserInvitation, InvitationStatus, ImportJob, ImportJobStatus, Donation, DonationType, DonationStatus
+from models import User, Event, EventRSVP, UserStatus, UserRole, RSVPStatus, SubscriptionPlan, Subscription, SubscriptionStatus, StorageUsage, EventGallery, NewsletterArchive, FinancialReport, BylawsDocument, Permission, RolePermission, Role, UserInvitation, InvitationStatus, ImportJob, ImportJobStatus, ImportRollbackAudit, Donation, DonationType, DonationStatus
 from auth import (
     get_password_hash,
     verify_password,
@@ -42,6 +42,7 @@ from email_service import (
 from payment_service import create_checkout_session, verify_webhook_signature, get_subscription_end_date
 from r2_storage import get_r2_storage
 from calendar_service import CalendarService
+from wordpress_parser import analyze_csv, format_preview_for_display
 
 # Load environment variables
 ROOT_DIR = Path(__file__).parent
@@ -655,9 +656,15 @@ async def login(request: LoginRequest, db: Session = Depends(get_db)):
     access_token = create_access_token(data={"sub": str(user.id)})
 
     # Clear verification token on first successful login after verification
+    # Don't let this fail the login if the database commit fails
     if user.email_verified and user.email_verification_token:
-        user.email_verification_token = None
-        db.commit()
+        try:
+            user.email_verification_token = None
+            db.commit()
+        except Exception as e:
+            logger.warning(f"Failed to clear verification token for user {user.id}: {str(e)}")
+            db.rollback()
+            # Continue with login - this is not critical
 
     return {
         "access_token": access_token,
@@ -887,7 +894,8 @@ async def get_member_directory(
         "social_media_facebook": member.social_media_facebook,
         "social_media_instagram": member.social_media_instagram,
         "social_media_twitter": member.social_media_twitter,
-        "social_media_linkedin": member.social_media_linkedin
+        "social_media_linkedin": member.social_media_linkedin,
+        "created_at": member.created_at.isoformat() if member.created_at else None
     } for member in directory_members]
 
 @api_router.get("/members/directory/{user_id}")
@@ -922,7 +930,8 @@ async def get_directory_member_profile(
         "social_media_facebook": member.social_media_facebook,
         "social_media_instagram": member.social_media_instagram,
         "social_media_twitter": member.social_media_twitter,
-        "social_media_linkedin": member.social_media_linkedin
+        "social_media_linkedin": member.social_media_linkedin,
+        "created_at": member.created_at.isoformat() if member.created_at else None
     }
 
 # Enhanced Profile Routes (Active Members Only)
@@ -1573,6 +1582,54 @@ async def rsvp_to_event(
     return {"message": "RSVP updated successfully"}
 
+@api_router.get("/members/event-activity")
+async def get_my_event_activity(
+    current_user: User = Depends(get_active_member),
+    db: Session = Depends(get_db)
+):
+    """
+    Get current user's event activity including upcoming RSVPs and attendance history
+    """
+    # Get all of the user's RSVPs
+    rsvps = db.query(EventRSVP).filter(
+        EventRSVP.user_id == current_user.id
+    ).order_by(EventRSVP.created_at.desc()).all()
+
+    # Categorize events
+    upcoming_events = []
+    past_events = []
+    now = datetime.now(timezone.utc)
+
+    for rsvp in rsvps:
+        # One query per RSVP; a joined query would avoid N+1 at larger scale
+        event = db.query(Event).filter(Event.id == rsvp.event_id).first()
+        if not event:
+            continue
+
+        event_data = {
+            "id": str(event.id),
+            "title": event.title,
+            "description": event.description,
+            "location": event.location,
+            "start_at": event.start_at.isoformat(),
+            "end_at": event.end_at.isoformat(),
+            "rsvp_status": rsvp.rsvp_status.value,
+            "attended": rsvp.attended,
+            "attended_at": rsvp.attended_at.isoformat() if rsvp.attended_at else None
+        }
+
+        # Separate upcoming vs past events
+        if event.end_at > now:
+            upcoming_events.append(event_data)
+        else:
+            past_events.append(event_data)
+
+    return {
+        "upcoming_events": sorted(upcoming_events, key=lambda x: x["start_at"]),
+        "past_events": sorted(past_events, key=lambda x: x["start_at"], reverse=True),
+        "total_attended": sum(1 for rsvp in rsvps if rsvp.attended),
+        "total_rsvps": len(rsvps)
+    }
+
 # ============================================================================
 # Calendar Export Endpoints (Universal iCalendar .ics format)
 # ============================================================================
@@ -3144,6 +3201,529 @@ async def get_import_job_details(
     }
 
+
+# ============================================================================
+# WordPress CSV Import Endpoints
+# ============================================================================
+
+@api_router.post("/admin/import/upload-csv")
+async def upload_wordpress_csv(
+    file: UploadFile = File(...),
+    current_user: User = Depends(require_permission("users.import")),
+    db: Session = Depends(get_db)
+):
+    """
+    Upload a WordPress CSV, parse it, and generate status suggestions.
+
+    This endpoint:
+    1. Validates the CSV file
+    2. Parses WordPress data (PHP serialized roles, etc.)
+    3. Generates smart status suggestions
+    4. Creates an ImportJob record with status='preview_ready'
+    5. Stores preview data in the wordpress_metadata field
+
+    (R2 upload is not performed here; see the note below.)
+
+    Returns:
+        Import job summary with data quality metrics
+
+    Requires permission: users.import
+    """
+    # Validate file type
+    if not file.filename or not file.filename.lower().endswith('.csv'):
+        raise HTTPException(status_code=400, detail="Only CSV files are supported")
+
+    # Validate file size (10MB max)
+    MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB
+    contents = await file.read()
+    if len(contents) > MAX_FILE_SIZE:
+        raise HTTPException(status_code=400, detail="File size exceeds 10MB limit")
+
+    # Save to a temporary file for parsing
+    import tempfile
+    with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.csv') as tmp:
+        tmp.write(contents)
+        tmp_path = tmp.name
+
+    try:
+        # Fetch existing emails from the database to check for duplicates
+        existing_emails = set(
+            email.lower() for (email,) in db.query(User.email).all() if email
+        )
+        logger.info(f"Checking against {len(existing_emails)} existing emails in database")
+
+        # Parse CSV with the WordPress parser
+        analysis_result = analyze_csv(tmp_path, existing_emails=existing_emails)
+
+        # Note: preview data is stored in wordpress_metadata; R2 upload could be
+        # added later for archival purposes (file_key stays None until then)
+
+        # Create ImportJob record
+        import_job = ImportJob(
+            filename=file.filename,
+            file_key=None,  # Optional: could add R2 upload later
+            total_rows=analysis_result['total_rows'],
+            processed_rows=0,
+            successful_rows=0,
+            failed_rows=0,
+            status=ImportJobStatus.preview_ready,
+            wordpress_metadata={
+                'preview_data': analysis_result['preview_data'],
+                'data_quality': analysis_result['data_quality'],
+                'valid_rows': analysis_result['valid_rows'],
+                'warnings': analysis_result['warnings'],
+                'errors': analysis_result['errors']
+            },
+            imported_by=current_user.id
+        )
+
+        db.add(import_job)
+        db.commit()
+        db.refresh(import_job)
+
+        logger.info(f"WordPress CSV uploaded: {import_job.id} by {current_user.email}")
+
+        return {
+            'import_job_id': str(import_job.id),
+            'total_rows': analysis_result['total_rows'],
+            'valid_rows': analysis_result['valid_rows'],
+            'warnings': analysis_result['warnings'],
+            'errors': analysis_result['errors'],
+            'data_quality': analysis_result['data_quality']
+        }
+
+    except Exception as e:
+        logger.error(f"Failed to upload WordPress CSV: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"Failed to process CSV: {str(e)}")
+
+    finally:
+        # Clean up temp file
+        if os.path.exists(tmp_path):
+            os.unlink(tmp_path)
+
+
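A quick way to exercise the upload step from a script. This sketch assumes the router is mounted under `/api` on a local server and that you hold an admin bearer token (all placeholders; adjust to your deployment):

```python
# Sketch: upload a WordPress export and inspect the data-quality summary.
import httpx

BASE = "http://localhost:8000/api"               # placeholder
headers = {"Authorization": "Bearer <token>"}    # placeholder admin JWT

with open("wordpress_users.csv", "rb") as f:
    resp = httpx.post(
        f"{BASE}/admin/import/upload-csv",
        headers=headers,
        files={"file": ("wordpress_users.csv", f, "text/csv")},
    )
resp.raise_for_status()
summary = resp.json()
print(summary["import_job_id"], summary["data_quality"])
```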
+@api_router.get("/admin/import/{job_id}/preview")
+async def get_import_preview(
+    job_id: str,
+    page: int = 1,
+    page_size: int = 50,
+    current_user: User = Depends(require_permission("users.view")),
+    db: Session = Depends(get_db)
+):
+    """
+    Get paginated preview data for WordPress import status review.
+
+    Returns preview data with suggested status mappings that admins
+    can review and override before executing the import.
+
+    Args:
+        job_id: Import job UUID
+        page: Page number (1-indexed)
+        page_size: Number of rows per page (default 50)
+
+    Returns:
+        Paginated preview data with status suggestions and warnings
+
+    Requires permission: users.view
+    """
+    # Get import job
+    job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
+    if not job:
+        raise HTTPException(status_code=404, detail="Import job not found")
+
+    # Verify job is in preview_ready status
+    if job.status != ImportJobStatus.preview_ready:
+        raise HTTPException(
+            status_code=400,
+            detail=f"Import job is not in preview_ready status (current: {job.status.value})"
+        )
+
+    # Get preview data from wordpress_metadata
+    preview_data = job.wordpress_metadata.get('preview_data', [])
+
+    # Format for paginated display
+    paginated = format_preview_for_display(preview_data, page, page_size)
+
+    return paginated
+
+
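Walking every preview page from a client is a simple loop over `total_pages`. A sketch, reusing the placeholder `BASE`, `headers`, and `job_id` from the upload example:

```python
# Sketch: collect all preview rows across pages.
import httpx

page, rows = 1, []
while True:
    resp = httpx.get(
        f"{BASE}/admin/import/{job_id}/preview",
        params={"page": page, "page_size": 50},
        headers=headers,
    )
    resp.raise_for_status()
    data = resp.json()
    rows.extend(data["rows"])
    if page >= data["total_pages"]:
        break
    page += 1
```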
+@api_router.post("/admin/import/{job_id}/execute")
+async def execute_wordpress_import(
+    job_id: str,
+    overrides: dict = {},
+    options: dict = {},
+    current_user: User = Depends(require_permission("users.import")),
+    db: Session = Depends(get_db)
+):
+    """
+    Execute WordPress import with admin status overrides.
+
+    Process:
+    1. Merge status overrides with suggested mappings
+    2. Create users in batches (commit every 20 rows)
+    3. Track imported_user_ids for rollback capability
+    4. Queue password reset emails (async)
+    5. Update import job status
+
+    Args:
+        job_id: Import job UUID
+        overrides: Dict mapping row_number to status override,
+            e.g. {'1': {'status': 'active'}, '5': {'status': 'inactive'}}
+        options: Import options
+            - send_password_emails: bool (default True)
+            - skip_errors: bool (default True)
+
+    Returns:
+        Import results with success/failure counts
+
+    Requires permission: users.import
+    """
+    # Get import job
+    job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
+    if not job:
+        raise HTTPException(status_code=404, detail="Import job not found")
+
+    # Verify job is in preview_ready status
+    if job.status != ImportJobStatus.preview_ready:
+        raise HTTPException(
+            status_code=400,
+            detail=f"Import job is not in preview_ready status (current: {job.status.value})"
+        )
+
+    # Update job status to processing
+    job.status = ImportJobStatus.processing
+    db.commit()
+
+    # Get preview data
+    preview_data = job.wordpress_metadata.get('preview_data', [])
+
+    # Import configuration
+    send_password_emails = options.get('send_password_emails', True)
+    skip_errors = options.get('skip_errors', True)
+
+    # Track results
+    imported_user_ids = []
+    successful_rows = 0
+    failed_rows = 0
+    errors = []
+
+    # One random placeholder password shared by all imported users; it is never
+    # disclosed, so accounts stay unusable until the password reset email is used
+    default_password_hash = get_password_hash(secrets.token_urlsafe(32))
+
+    try:
+        # Process each row
+        for idx, row_data in enumerate(preview_data):
+            row_num = row_data['row_number']
+
+            try:
+                # Skip rows with critical errors
+                if row_data.get('errors') and skip_errors:
+                    failed_rows += 1
+                    errors.append({
+                        'row': row_num,
+                        'email': row_data.get('email'),
+                        'error': ', '.join(row_data['errors'])
+                    })
+                    continue
+
+                # Apply status override if provided
+                final_status = row_data['suggested_status']
+                if str(row_num) in overrides:
+                    final_status = overrides[str(row_num)].get('status', final_status)
+
+                # Check if user already exists
+                existing_user = db.query(User).filter(User.email == row_data['email']).first()
+                if existing_user:
+                    failed_rows += 1
+                    errors.append({
+                        'row': row_num,
+                        'email': row_data['email'],
+                        'error': 'User with this email already exists'
+                    })
+                    continue
+
+                # Create user
+                new_user = User(
+                    email=row_data['email'],
+                    password_hash=default_password_hash,
+                    first_name=row_data.get('first_name', ''),
+                    last_name=row_data.get('last_name', ''),
+                    phone=row_data.get('phone'),
+                    address='',  # WordPress CSV doesn't have address data
+                    city='',
+                    state='',
+                    zipcode='',
+                    date_of_birth=row_data.get('date_of_birth'),
+                    status=UserStatus[final_status],
+                    role=UserRole[row_data['suggested_role']],
+                    newsletter_subscribed=row_data.get('newsletter_consent', False),
+                    email_verified=True,  # WordPress users are pre-verified
+                    import_source='wordpress',
+                    import_job_id=job.id,
+                    wordpress_user_id=row_data.get('wordpress_user_id'),
+                    wordpress_registered_date=row_data.get('wordpress_registered')
+                )
+
+                db.add(new_user)
+                db.flush()  # Flush to get the ID without committing
+                imported_user_ids.append(str(new_user.id))
+                successful_rows += 1
+
+                # Commit in batches of 20, updating progress as we go
+                if (idx + 1) % 20 == 0:
+                    job.processed_rows = idx + 1
+                    db.commit()
+
+            except Exception as e:
+                logger.error(f"Failed to import row {row_num}: {str(e)}")
+                failed_rows += 1
+                errors.append({
+                    'row': row_num,
+                    'email': row_data.get('email', ''),
+                    'error': str(e)
+                })
+                if not skip_errors:
+                    # Let the outer handler roll back and mark the job as failed
+                    raise RuntimeError(f"Import aborted at row {row_num}: {str(e)}")
+
+        # Final commit
+        db.commit()
+
+        # Update import job
+        job.processed_rows = len(preview_data)
+        job.successful_rows = successful_rows
+        job.failed_rows = failed_rows
+        job.status = ImportJobStatus.completed if failed_rows == 0 else ImportJobStatus.partial
+        job.imported_user_ids = imported_user_ids
+        job.error_log = errors
+        job.completed_at = datetime.now(timezone.utc)
+        db.commit()
+
+        # Queue password reset emails (async, non-blocking)
+        password_emails_queued = 0
+        if send_password_emails and imported_user_ids:
+            try:
+                for user_id_str in imported_user_ids:
+                    try:
+                        # Convert to UUID and fetch user
+                        user_uuid = uuid.UUID(user_id_str)
+                        user = db.query(User).filter(User.id == user_uuid).first()
+
+                        if user:
+                            # Generate password reset token
+                            reset_token = create_password_reset_token(user.email)
+                            reset_url = f"{os.getenv('FRONTEND_URL')}/reset-password?token={reset_token}"
+
+                            # Send email (async)
+                            await send_password_reset_email(user.email, user.first_name, reset_url)
+                            password_emails_queued += 1
+                    except (ValueError, AttributeError) as e:
+                        logger.warning(f"Skipping invalid user ID {user_id_str}: {str(e)}")
+                        continue
+            except Exception as e:
+                logger.error(f"Failed to send password reset emails: {str(e)}")
+                # Don't fail the import if emails fail
+
+        logger.info(f"Import executed: {job.id} - {successful_rows}/{len(preview_data)} by {current_user.email}")
+
+        return {
+            'successful_rows': successful_rows,
+            'failed_rows': failed_rows,
+            'imported_user_ids': imported_user_ids,
+            'password_emails_queued': password_emails_queued,
+            'errors': errors
+        }
+
+    except Exception as e:
+        db.rollback()
+        job.status = ImportJobStatus.failed
+        job.error_log = [{'error': str(e)}]
+        db.commit()
+        logger.error(f"Import execution failed: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"Import execution failed: {str(e)}")
+
+
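Because `overrides` and `options` are both body parameters, FastAPI expects them embedded under their own keys in the JSON body. A request sketch (same placeholder `BASE`/`headers`/`job_id` as above):

```python
# Sketch: execute the import with two per-row status overrides.
import httpx

payload = {
    "overrides": {"1": {"status": "active"}, "5": {"status": "inactive"}},
    "options": {"send_password_emails": True, "skip_errors": True},
}
resp = httpx.post(
    f"{BASE}/admin/import/{job_id}/execute",
    json=payload,
    headers=headers,
    timeout=None,  # the endpoint runs the whole import before responding
)
resp.raise_for_status()
print(resp.json()["successful_rows"], "users created")
```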
+@api_router.post("/admin/import/{job_id}/rollback")
+async def rollback_import_job(
+    job_id: str,
+    confirm: bool = False,
+    current_user: User = Depends(require_permission("users.import")),
+    db: Session = Depends(get_db)
+):
+    """
+    Delete all users from a specific import job (full rollback).
+
+    Safety checks:
+    - Requires confirm=True parameter
+    - Verifies job status is completed or partial
+    - Cannot rollback twice (checks rollback_at is None)
+    - Logs action to import_rollback_audit table
+
+    Args:
+        job_id: Import job UUID
+        confirm: Must be True to execute rollback
+
+    Returns:
+        Number of deleted users and confirmation message
+
+    Requires permission: users.import
+    """
+    # Safety check: require explicit confirmation
+    if not confirm:
+        raise HTTPException(
+            status_code=400,
+            detail="Rollback requires confirm=true parameter"
+        )
+
+    # Get import job
+    job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
+    if not job:
+        raise HTTPException(status_code=404, detail="Import job not found")
+
+    # Verify job can be rolled back
+    if job.status not in [ImportJobStatus.completed, ImportJobStatus.partial]:
+        raise HTTPException(
+            status_code=400,
+            detail=f"Cannot rollback import with status: {job.status.value}"
+        )
+
+    if job.rollback_at:
+        raise HTTPException(
+            status_code=400,
+            detail="Import has already been rolled back"
+        )
+
+    # Get imported user IDs
+    imported_user_ids = job.imported_user_ids or []
+    if not imported_user_ids:
+        raise HTTPException(
+            status_code=400,
+            detail="No users to rollback (imported_user_ids is empty)"
+        )
+
+    try:
+        # Delete all imported users
+        deleted_count = db.query(User).filter(
+            User.id.in_([uuid.UUID(uid) for uid in imported_user_ids])
+        ).delete(synchronize_session=False)
+
+        # Update import job
+        job.status = ImportJobStatus.rolled_back
+        job.rollback_at = datetime.now(timezone.utc)
+        job.rollback_by = current_user.id
+
+        # Create audit record (ImportRollbackAudit is imported at the top of this module)
+        audit = ImportRollbackAudit(
+            import_job_id=job.id,
+            rolled_back_by=current_user.id,
+            deleted_user_count=deleted_count,
+            deleted_user_ids=imported_user_ids,
+            reason="Manual rollback by admin"
+        )
+        db.add(audit)
+
+        db.commit()
+
+        logger.warning(f"Import rolled back: {job.id} - {deleted_count} users deleted by {current_user.email}")
+
+        return {
+            'deleted_users': deleted_count,
+            'message': f'Import rolled back successfully. {deleted_count} users deleted.'
+        }
+
+    except Exception as e:
+        db.rollback()
+        logger.error(f"Rollback failed for job {job.id}: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"Rollback failed: {str(e)}")
+
+
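One caveat worth noting: the hard delete above fails with an IntegrityError if any imported user has since acquired foreign-key references (RSVPs, donations, and so on), unless those FKs are declared ON DELETE CASCADE. A pre-check sketch, under that assumption and with `db` and `job` as in the endpoint:

```python
# Sketch: refuse rollback when imported users already have dependent rows.
from models import EventRSVP

user_uuids = [uuid.UUID(uid) for uid in (job.imported_user_ids or [])]
blocked = {
    r.user_id
    for r in db.query(EventRSVP.user_id).filter(EventRSVP.user_id.in_(user_uuids)).all()
}
if blocked:
    raise HTTPException(
        status_code=409,
        detail=f"{len(blocked)} imported users already have RSVPs; rollback would violate FKs"
    )
```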
+@api_router.get("/admin/import/{job_id}/status")
+async def get_import_status(
+    job_id: str,
+    current_user: User = Depends(require_permission("users.view")),
+    db: Session = Depends(get_db)
+):
+    """
+    Get real-time import progress status for polling.
+
+    Use this endpoint to poll for import progress updates
+    while the import is executing.
+
+    Args:
+        job_id: Import job UUID
+
+    Returns:
+        Current import status with progress percentage
+
+    Requires permission: users.view
+    """
+    job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
+    if not job:
+        raise HTTPException(status_code=404, detail="Import job not found")
+
+    progress_percent = 0.0
+    if job.total_rows > 0:
+        progress_percent = (job.processed_rows / job.total_rows) * 100
+
+    return {
+        'status': job.status.value,
+        'processed_rows': job.processed_rows,
+        'total_rows': job.total_rows,
+        'progress_percent': round(progress_percent, 1),
+        'successful_rows': job.successful_rows,
+        'failed_rows': job.failed_rows
+    }
+
+
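Since the execute endpoint does not return until the import finishes, polling only makes sense from a second client or browser tab. A minimal loop (placeholder `BASE`/`headers`/`job_id` again):

```python
# Sketch: poll the status endpoint until the job leaves its in-flight states.
import time
import httpx

while True:
    data = httpx.get(f"{BASE}/admin/import/{job_id}/status", headers=headers).json()
    print(f"{data['status']}: {data['progress_percent']}%")
    if data["status"] not in ("processing", "validating"):
        break
    time.sleep(2)
```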
+@api_router.get("/admin/import/{job_id}/errors/download")
+async def download_error_report(
+    job_id: str,
+    current_user: User = Depends(require_permission("users.view")),
+    db: Session = Depends(get_db)
+):
+    """
+    Download a CSV report with all import errors.
+
+    CSV columns: Row Number, Email, Error Type, Error Message
+
+    Args:
+        job_id: Import job UUID
+
+    Returns:
+        StreamingResponse with CSV file
+
+    Requires permission: users.view
+    """
+    job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
+    if not job:
+        raise HTTPException(status_code=404, detail="Import job not found")
+
+    errors = job.error_log or []
+    if not errors:
+        raise HTTPException(status_code=404, detail="No errors found for this import job")
+
+    # Generate CSV
+    output = io.StringIO()
+    writer = csv.DictWriter(output, fieldnames=['Row Number', 'Email', 'Error Type', 'Error Message'])
+    writer.writeheader()
+
+    for error in errors:
+        writer.writerow({
+            'Row Number': error.get('row', ''),
+            'Email': error.get('email', ''),
+            'Error Type': 'Import Error',
+            'Error Message': error.get('error', '')
+        })
+
+    # Return as streaming response
+    output.seek(0)
+    return StreamingResponse(
+        iter([output.getvalue()]),
+        media_type="text/csv",
+        headers={"Content-Disposition": f"attachment; filename=import_errors_{job_id}.csv"}
+    )
+
+
 @api_router.post("/admin/events", response_model=EventResponse)
 async def create_event(
     request: EventCreate,
@@ -3256,10 +3836,20 @@ async def mark_attendance(
         EventRSVP.event_id == event_id,
         EventRSVP.user_id == request.user_id
     ).first()
-    
+
+    # Auto-create RSVP if it doesn't exist (for retroactive attendance marking)
     if not rsvp:
-        raise HTTPException(status_code=404, detail="RSVP not found")
-    
+        rsvp = EventRSVP(
+            event_id=event_id,
+            user_id=request.user_id,
+            rsvp_status=RSVPStatus.yes,  # Default to 'yes' for attended events
+            attended=False,
+            created_at=datetime.now(timezone.utc),
+            updated_at=datetime.now(timezone.utc)
+        )
+        db.add(rsvp)
+        db.flush()  # Get the ID without committing
+
     rsvp.attended = request.attended
     rsvp.attended_at = datetime.now(timezone.utc) if request.attended else None
     rsvp.updated_at = datetime.now(timezone.utc)
diff --git a/wordpress_parser.py b/wordpress_parser.py
new file mode 100644
index 0000000..4a1e329
--- /dev/null
+++ b/wordpress_parser.py
@@ -0,0 +1,531 @@
+"""
+WordPress CSV Parser Module
+
+This module provides utilities for parsing WordPress user export CSV files
+and transforming them into LOAF platform-compatible data structures.
+
+Key Features:
+- Parse PHP serialized data (WordPress capabilities)
+- Map WordPress roles to LOAF roles and statuses
+- Validate and standardize user data (DOB, phone numbers)
+- Generate smart status suggestions based on approval and subscription data
+- Comprehensive data quality analysis and error reporting
+
+Author: Claude Code
+Date: 2025-12-24
+"""
+
+import logging
+import re
+from datetime import datetime
+from typing import Dict, List, Optional, Tuple
+
+import pandas as pd
+import phpserialize
+
+logger = logging.getLogger(__name__)
+
+
+# ============================================================================
+# WordPress Role Mapping Configuration
+# ============================================================================
+
+ROLE_MAPPING = {
+    # WordPress admin roles → LOAF admin roles (auto-active)
+    'administrator': ('superadmin', 'active'),
+    'loaf_admin': ('admin', 'active'),
+    'loaf_treasure': ('finance', 'active'),
+    'loaf_communication': ('admin', 'active'),
+
+    # WordPress member roles → LOAF member role (status from approval)
+    'pms_subscription_plan_63': ('member', None),  # Status determined by approval
+    'registered': ('guest', None),  # Default WordPress role
+
+    # Fallback for unknown roles
+    '__default__': ('guest', None)
+}
+
+# Role priority order (higher index = higher priority)
+ROLE_PRIORITY = [
+    'registered',
+    'pms_subscription_plan_63',
+    'loaf_communication',
+    'loaf_treasure',
+    'loaf_admin',
+    'administrator'
+]
+
+
+# ============================================================================
+# PHP Serialization Parsing
+# ============================================================================
+
+def parse_php_serialized(data: str) -> List[str]:
+    """
+    Parse a WordPress PHP serialized capabilities string.
+
+    WordPress stores user capabilities as serialized PHP arrays like:
+        a:1:{s:10:"registered";b:1;}
+        a:2:{s:10:"registered";b:1;s:24:"pms_subscription_plan_63";b:1;}
+
+    Args:
+        data: PHP serialized string
+
+    Returns:
+        List of role names (e.g., ['registered', 'pms_subscription_plan_63'])
+
+    Examples:
+        >>> parse_php_serialized('a:1:{s:10:"registered";b:1;}')
+        ['registered']
+        >>> parse_php_serialized('a:2:{s:10:"registered";b:1;s:24:"pms_subscription_plan_63";b:1;}')
+        ['registered', 'pms_subscription_plan_63']
+    """
+    if not data or pd.isna(data):
+        return []
+
+    try:
+        # Use the phpserialize library to parse
+        parsed = phpserialize.loads(data.encode('utf-8'))
+
+        # Extract role names (keys where the value is truthy)
+        if isinstance(parsed, dict):
+            roles = [key.decode('utf-8') if isinstance(key, bytes) else key
+                     for key, value in parsed.items() if value]
+            return roles
+
+        return []
+    except Exception as e:
+        logger.warning(f"Failed to parse PHP serialized data: {data[:50]}... Error: {str(e)}")
+        return []
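A round-trip check is a convenient way to test this parser, since phpserialize can build the WordPress-style blob instead of it being hand-written. A minimal sketch:

```python
# Sketch: serialize a capabilities dict with phpserialize, then parse it back.
import phpserialize
from wordpress_parser import parse_php_serialized

blob = phpserialize.dumps({"registered": True, "pms_subscription_plan_63": True})
assert parse_php_serialized(blob.decode("utf-8")) == [
    "registered", "pms_subscription_plan_63"
]
```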
+
+
+# ============================================================================
+# Role and Status Mapping
+# ============================================================================
+
+def map_wordpress_role(wp_roles: List[str]) -> Tuple[str, Optional[str]]:
+    """
+    Map WordPress roles to a LOAF role and suggested status.
+
+    Priority logic:
+    1. If the user has any admin role → corresponding LOAF admin role with 'active' status
+    2. If the user has a subscription → 'member' role (status from approval)
+    3. Otherwise → 'guest' role (status from approval)
+
+    Args:
+        wp_roles: List of WordPress role names
+
+    Returns:
+        Tuple of (loaf_role, suggested_status)
+        - loaf_role: one of superadmin, admin, finance, member, guest
+        - suggested_status: active, pre_validated, payment_pending, or
+          None (status determined by approval data)
+
+    Examples:
+        >>> map_wordpress_role(['loaf_admin'])
+        ('admin', 'active')
+        >>> map_wordpress_role(['loaf_treasure'])
+        ('finance', 'active')
+        >>> map_wordpress_role(['pms_subscription_plan_63', 'registered'])
+        ('member', None)
+        >>> map_wordpress_role(['registered'])
+        ('guest', None)
+    """
+    if not wp_roles:
+        return ROLE_MAPPING['__default__']
+
+    # Sort roles by priority (highest priority last; unknown roles sort first)
+    prioritized_roles = sorted(
+        wp_roles,
+        key=lambda r: ROLE_PRIORITY.index(r) if r in ROLE_PRIORITY else -1
+    )
+
+    # Map the highest-priority role
+    highest_role = prioritized_roles[-1]
+    return ROLE_MAPPING.get(highest_role, ROLE_MAPPING['__default__'])
+
+
+def suggest_status(approval_status: str, has_subscription: bool, wordpress_role: str = 'guest') -> str:
+    """
+    Suggest a LOAF user status based on WordPress approval and subscription data.
+
+    Logic:
+    1. Admin roles (loaf_admin, loaf_treasure, administrator) → always 'active'
+    2. approved + subscription → 'active'
+    3. approved without subscription → 'pre_validated'
+    4. pending → 'payment_pending'
+    5. Other/empty → 'pre_validated'
+
+    Args:
+        approval_status: WordPress approval status (approved, pending, unapproved, etc.)
+        has_subscription: Whether the user has the pms_subscription_plan_63 role
+        wordpress_role: LOAF role mapped from WordPress (for the admin check)
+
+    Returns:
+        Suggested LOAF status: active, pre_validated, payment_pending, or inactive
+
+    Examples:
+        >>> suggest_status('approved', True, 'member')
+        'active'
+        >>> suggest_status('approved', False, 'member')
+        'pre_validated'
+        >>> suggest_status('pending', True, 'member')
+        'payment_pending'
+        >>> suggest_status('', False, 'admin')
+        'active'
+    """
+    # Admin roles are always active
+    if wordpress_role in ('superadmin', 'admin', 'finance'):
+        return 'active'
+
+    # Normalize approval status
+    approval = (approval_status or '').lower().strip()
+
+    if approval == 'approved':
+        return 'active' if has_subscription else 'pre_validated'
+    elif approval == 'pending':
+        return 'payment_pending'
+    elif approval == 'unapproved':
+        return 'inactive'
+    else:
+        # Empty or unknown approval status
+        return 'pre_validated'
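The two functions compose: a fixed status from `map_wordpress_role` wins, otherwise `suggest_status` decides from the approval data. A table-driven spot check of the combined rules (pytest-style sketch):

```python
# Sketch: verify the combined role/status mapping on representative inputs.
from wordpress_parser import map_wordpress_role, suggest_status

cases = [
    (["administrator"], "", ("superadmin", "active")),
    (["registered", "loaf_treasure"], "", ("finance", "active")),
    (["registered", "pms_subscription_plan_63"], "approved", ("member", "active")),
    (["registered"], "pending", ("guest", "payment_pending")),
]
for roles, approval, (want_role, want_status) in cases:
    role, fixed = map_wordpress_role(roles)
    status = fixed or suggest_status(approval, "pms_subscription_plan_63" in roles, role)
    assert (role, status) == (want_role, want_status), (roles, approval)
```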
+
+
+# ============================================================================
+# Data Validation and Standardization
+# ============================================================================
+
+def standardize_phone(phone: str) -> str:
+    """
+    Standardize a phone number by extracting digits only.
+
+    Removes all non-digit characters:
+    - (713) 560-7850 → 7135607850
+    - 713-725-8902 → 7137258902
+    - Empty/None → 0000000000 (fallback sentinel)
+
+    Args:
+        phone: Phone number in any format
+
+    Returns:
+        10-digit phone number string (or 0000000000 if invalid)
+
+    Examples:
+        >>> standardize_phone('(713) 560-7850')
+        '7135607850'
+        >>> standardize_phone('713-725-8902')
+        '7137258902'
+        >>> standardize_phone('')
+        '0000000000'
+    """
+    if not phone or pd.isna(phone):
+        return '0000000000'
+
+    # Extract all digits
+    digits = re.sub(r'\D', '', str(phone))
+
+    # Return 10 digits or the fallback
+    if len(digits) == 10:
+        return digits
+    elif len(digits) == 11 and digits[0] == '1':
+        # Remove leading 1 (US country code)
+        return digits[1:]
+    else:
+        logger.warning(f"Invalid phone format: {phone} (extracted: {digits})")
+        return '0000000000'
+
+
+def validate_dob(dob_str: str) -> Tuple[Optional[datetime], Optional[str]]:
+    """
+    Validate and parse a date of birth.
+
+    Validation rules:
+    - Must be in MM/DD/YYYY format
+    - Year must be between 1900 and the current year
+    - Cannot be in the future
+    - Placeholder years like 0000 (a known WordPress export quirk) are rejected
+
+    Args:
+        dob_str: Date of birth string in MM/DD/YYYY format
+
+    Returns:
+        Tuple of (parsed_datetime, warning_message)
+        - parsed_datetime: datetime object if valid, None if invalid
+        - warning_message: Descriptive error message if invalid, None if valid
+
+    Examples:
+        >>> validate_dob('08/02/1962')
+        (datetime(1962, 8, 2), None)
+        >>> validate_dob('08/02/0000')
+        (None, 'Invalid year: 0000 (data quality issue)')
+        >>> validate_dob('08/02/2099')
+        (None, 'Date is in the future')
+    """
+    if not dob_str or pd.isna(dob_str):
+        return None, 'Missing date of birth'
+
+    raw = str(dob_str).strip()
+
+    try:
+        # Parse MM/DD/YYYY format
+        parsed = datetime.strptime(raw, '%m/%d/%Y')
+    except ValueError:
+        # strptime rejects year 0 outright, so '08/02/0000' lands here
+        if raw.endswith('/0000'):
+            return None, 'Invalid year: 0000 (data quality issue)'
+        return None, f'Invalid date format: {dob_str} (expected MM/DD/YYYY)'
+
+    # Validate year range
+    if parsed.year < 1900:
+        return None, f'Year too old: {parsed.year} (likely invalid)'
+    if parsed > datetime.now():
+        return None, 'Date is in the future'
+
+    return parsed, None
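Quick doctest-style checks for the two normalizers, mirroring the docstring examples plus the country-code path:

```python
# Sketch: spot-check phone and DOB normalization.
from wordpress_parser import standardize_phone, validate_dob

assert standardize_phone('(713) 560-7850') == '7135607850'
assert standardize_phone('1-713-725-8902') == '7137258902'  # leading US code stripped
assert standardize_phone(None) == '0000000000'              # fallback sentinel

dob, warn = validate_dob('08/02/1962')
assert dob.year == 1962 and warn is None
_, warn = validate_dob('08/02/0000')
assert warn == 'Invalid year: 0000 (data quality issue)'
```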
+
+
+# ============================================================================
+# CSV Analysis and Preview Generation
+# ============================================================================
+
+def analyze_csv(file_path: str, existing_emails: Optional[set] = None) -> Dict:
+    """
+    Analyze a WordPress CSV file and generate preview data with status suggestions.
+
+    This is the main entry point for CSV processing. It:
+    1. Reads and parses the CSV file
+    2. Validates each row and generates warnings
+    3. Maps WordPress roles to LOAF roles
+    4. Suggests a status for each user
+    5. Tracks data quality metrics
+    6. Checks for duplicate emails (both within the CSV and against the database)
+    7. Returns comprehensive analysis and preview data
+
+    Args:
+        file_path: Path to WordPress CSV export file
+        existing_emails: Set of emails already in the database (optional)
+
+    Returns:
+        Dictionary containing:
+        - total_rows: Total number of user rows
+        - valid_rows: Number of rows without critical errors
+        - warnings: Total warning count
+        - errors: Total critical error count
+        - preview_data: List of row dictionaries with suggestions
+        - data_quality: Dictionary of data quality metrics
+
+    Example output:
+        {
+            'total_rows': 183,
+            'valid_rows': 176,
+            'warnings': 66,
+            'errors': 7,
+            'preview_data': [
+                {
+                    'row_number': 1,
+                    'email': 'user@example.com',
+                    'first_name': 'John',
+                    'last_name': 'Doe',
+                    'phone': '7135607850',
+                    'date_of_birth': '1962-08-02',
+                    'wordpress_roles': ['registered', 'pms_subscription_plan_63'],
+                    'suggested_role': 'member',
+                    'suggested_status': 'active',
+                    'warnings': [],
+                    'errors': []
+                },
+                ...
+            ],
+            'data_quality': {
+                'invalid_dob': 66,
+                'missing_phone': 7,
+                'duplicate_email_csv': 0,
+                'duplicate_email_db': 3,
+                'unparseable_roles': 2
+            }
+        }
+    """
+    # Read CSV with pandas (imported at module top)
+    df = pd.read_csv(file_path)
+
+    total_rows = len(df)
+    preview_data = []
+    data_quality = {
+        'invalid_dob': 0,
+        'missing_phone': 0,
+        'duplicate_email_csv': 0,
+        'duplicate_email_db': 0,
+        'unparseable_roles': 0,
+        'missing_email': 0
+    }
+
+    # Track seen emails for CSV duplicate detection
+    seen_emails = {}
+
+    # Normalize existing_emails to a set if not provided
+    if existing_emails is None:
+        existing_emails = set()
+
+    for idx, row in df.iterrows():
+        row_num = idx + 1
+        warnings = []
+        errors = []
+
+        # Extract and validate email
+        email = str(row.get('user_email', '')).strip().lower()
+        if not email or email == 'nan':
+            errors.append('Missing email address')
+            data_quality['missing_email'] += 1
+        else:
+            # Check for duplicates within the CSV
+            if email in seen_emails:
+                errors.append(f'Duplicate email in CSV (also in row {seen_emails[email]})')
+                data_quality['duplicate_email_csv'] += 1
+            # Check for duplicates in the existing database
+            elif email in existing_emails:
+                errors.append('Email already exists in database')
+                data_quality['duplicate_email_db'] += 1
+            else:
+                seen_emails[email] = row_num
+
+        # Extract basic fields (guard against pandas NaN becoming the string 'nan')
+        first_name = '' if pd.isna(row.get('first_name')) else str(row.get('first_name')).strip()
+        last_name = '' if pd.isna(row.get('last_name')) else str(row.get('last_name')).strip()
+
+        # Parse and validate DOB
+        dob_parsed, dob_warning = validate_dob(row.get('date_of_birth'))
+        if dob_warning:
+            warnings.append(dob_warning)
+            data_quality['invalid_dob'] += 1
+
+        # Standardize phone
+        phone = standardize_phone(row.get('cell_phone'))
+        if phone == '0000000000':
+            warnings.append('Missing or invalid phone number')
+            data_quality['missing_phone'] += 1
+
+        # Parse WordPress roles
+        wp_capabilities = row.get('wp_capabilities', '')
+        wp_roles = parse_php_serialized(wp_capabilities)
+        if not wp_roles and wp_capabilities:
+            warnings.append('Could not parse WordPress roles')
+            data_quality['unparseable_roles'] += 1
+
+        # Map to LOAF role and status
+        loaf_role, role_suggested_status = map_wordpress_role(wp_roles)
+
+        # Determine if the user has a subscription
+        has_subscription = 'pms_subscription_plan_63' in wp_roles
+
+        # Get approval status
+        approval_status = str(row.get('wppb_approval_status', '')).strip()
+
+        # Suggest final status
+        if role_suggested_status:
+            # Admin roles have a fixed status from the role mapping
+            suggested_status = role_suggested_status
+        else:
+            # Regular users get their status from the approval logic
+            suggested_status = suggest_status(approval_status, has_subscription, loaf_role)
+
+        # Build preview row
+        preview_row = {
+            'row_number': row_num,
+            'email': email,
+            'first_name': first_name,
+            'last_name': last_name,
+            'phone': phone,
+            'date_of_birth': dob_parsed.isoformat() if dob_parsed else None,
+            'wordpress_user_id': int(row.get('ID', 0)) if pd.notna(row.get('ID')) else None,
+            'wordpress_registered': str(row.get('user_registered')) if pd.notna(row.get('user_registered')) else None,
+            'wordpress_roles': wp_roles,
+            'wordpress_approval_status': approval_status,
+            'has_subscription': has_subscription,
+            'suggested_role': loaf_role,
+            'suggested_status': suggested_status,
+            'warnings': warnings,
+            'errors': errors,
+            'newsletter_consent': str(row.get('newsletter_consent', '')).lower() == 'yes',
+            'newsletter_checklist': str(row.get('newsletter_checklist', '')).lower() == 'yes'
+        }
+
+        preview_data.append(preview_row)
+
+    # Calculate summary statistics
+    valid_rows = sum(1 for row in preview_data if not row['errors'])
+    total_warnings = sum(len(row['warnings']) for row in preview_data)
+    total_errors = sum(len(row['errors']) for row in preview_data)
+
+    return {
+        'total_rows': total_rows,
+        'valid_rows': valid_rows,
+        'warnings': total_warnings,
+        'errors': total_errors,
+        'preview_data': preview_data,
+        'data_quality': data_quality
+    }
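An end-to-end sketch on a tiny synthetic export (column names as read by this module; the serialized capabilities string uses doubled quotes per CSV quoting rules):

```python
# Sketch: run analyze_csv against a one-row synthetic WordPress export.
import tempfile
import textwrap
from wordpress_parser import analyze_csv

csv_text = textwrap.dedent("""\
    ID,user_email,first_name,last_name,cell_phone,date_of_birth,user_registered,wp_capabilities,wppb_approval_status,newsletter_consent,newsletter_checklist
    7,jane@example.com,Jane,Doe,(713) 560-7850,08/02/1962,2019-03-04 10:00:00,"a:2:{s:10:""registered"";b:1;s:24:""pms_subscription_plan_63"";b:1;}",approved,yes,no
""")
with tempfile.NamedTemporaryFile('w', suffix='.csv', delete=False) as f:
    f.write(csv_text)

result = analyze_csv(f.name, existing_emails=set())
row = result['preview_data'][0]
assert (row['suggested_role'], row['suggested_status']) == ('member', 'active')
assert result['valid_rows'] == 1 and row['phone'] == '7135607850'
```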
+
+
+# ============================================================================
+# Utility Functions
+# ============================================================================
+
+def get_status_badge_color(status: str) -> str:
+    """
+    Get the appropriate badge color for status display in the UI.
+
+    Args:
+        status: User status string
+
+    Returns:
+        Tailwind CSS color classes
+    """
+    colors = {
+        'active': 'bg-green-100 text-green-800',
+        'pre_validated': 'bg-blue-100 text-blue-800',
+        'payment_pending': 'bg-yellow-100 text-yellow-800',
+        'inactive': 'bg-gray-100 text-gray-800',
+        'pending_email': 'bg-purple-100 text-purple-800',
+        'awaiting_event': 'bg-indigo-100 text-indigo-800'
+    }
+    return colors.get(status, 'bg-gray-100 text-gray-800')
+
+
+def format_preview_for_display(preview_data: List[Dict], page: int = 1, page_size: int = 50) -> Dict:
+    """
+    Format preview data for paginated display in the frontend.
+
+    Args:
+        preview_data: Full preview data list
+        page: Page number (1-indexed)
+        page_size: Number of rows per page
+
+    Returns:
+        Dictionary with paginated data and metadata
+    """
+    total_pages = (len(preview_data) + page_size - 1) // page_size
+    start_idx = (page - 1) * page_size
+    end_idx = start_idx + page_size
+
+    return {
+        'page': page,
+        'page_size': page_size,
+        'total_pages': total_pages,
+        'total_rows': len(preview_data),
+        'rows': preview_data[start_idx:end_idx]
+    }
+
+
+logger.info("WordPress parser module loaded successfully")