Compare commits

...

2 Commits

Author       SHA1        Message                                                 Date
kayela       09712e52bb  Merge remote-tracking branch 'origin/dev' into docker   2025-12-26 16:47:24 -06:00
Koncept Kit  487481b322  Test Preparation                                        2025-12-26 20:03:53 +07:00
9 changed files with 1357 additions and 9 deletions

Binary file not shown.

Binary file not shown.


@@ -77,7 +77,10 @@ CREATE TYPE importjobstatus AS ENUM (
'processing',
'completed',
'failed',
'partial',
'validating',
'preview_ready',
'rolled_back'
);
COMMIT;
@@ -152,6 +155,12 @@ CREATE TABLE IF NOT EXISTS users (
reminder_60_days_sent BOOLEAN DEFAULT FALSE,
reminder_85_days_sent BOOLEAN DEFAULT FALSE,
-- WordPress Import Tracking
import_source VARCHAR(50),
import_job_id UUID REFERENCES import_jobs(id),
wordpress_user_id BIGINT,
wordpress_registered_date TIMESTAMP WITH TIME ZONE,
-- Timestamps
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
@@ -464,11 +473,30 @@ CREATE TABLE IF NOT EXISTS import_jobs (
error_count INTEGER DEFAULT 0,
error_log JSONB DEFAULT '[]'::jsonb,
-- WordPress import enhancements
field_mapping JSONB DEFAULT '{}'::jsonb,
wordpress_metadata JSONB DEFAULT '{}'::jsonb,
imported_user_ids JSONB DEFAULT '[]'::jsonb,
rollback_at TIMESTAMP WITH TIME ZONE,
rollback_by UUID REFERENCES users(id),
started_by UUID REFERENCES users(id),
started_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP WITH TIME ZONE
);
-- Import Rollback Audit table (for tracking rollback operations)
CREATE TABLE IF NOT EXISTS import_rollback_audit (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
import_job_id UUID NOT NULL REFERENCES import_jobs(id),
rolled_back_by UUID NOT NULL REFERENCES users(id),
rolled_back_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
deleted_user_count INTEGER NOT NULL,
deleted_user_ids JSONB NOT NULL,
reason TEXT,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
COMMIT;
-- Display progress
@@ -488,6 +516,8 @@ CREATE INDEX IF NOT EXISTS idx_users_role_id ON users(role_id);
CREATE INDEX IF NOT EXISTS idx_users_email_verified ON users(email_verified);
CREATE INDEX IF NOT EXISTS idx_users_rejected_at ON users(rejected_at) WHERE rejected_at IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_users_created_at ON users(created_at);
CREATE INDEX IF NOT EXISTS idx_users_import_job ON users(import_job_id) WHERE import_job_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_users_import_source ON users(import_source) WHERE import_source IS NOT NULL;
-- Events table indexes
CREATE INDEX IF NOT EXISTS idx_events_created_by ON events(created_by);
@@ -514,6 +544,14 @@ CREATE INDEX IF NOT EXISTS idx_donation_type ON donations(donation_type);
CREATE INDEX IF NOT EXISTS idx_donation_status ON donations(status);
CREATE INDEX IF NOT EXISTS idx_donation_created ON donations(created_at);
-- Import Jobs indexes
CREATE INDEX IF NOT EXISTS idx_import_jobs_status ON import_jobs(status);
CREATE INDEX IF NOT EXISTS idx_import_jobs_started_by ON import_jobs(started_by);
-- Import Rollback Audit indexes
CREATE INDEX IF NOT EXISTS idx_rollback_audit_import_job ON import_rollback_audit(import_job_id);
CREATE INDEX IF NOT EXISTS idx_rollback_audit_rolled_back_at ON import_rollback_audit(rolled_back_at DESC);
-- Permissions indexes
CREATE INDEX IF NOT EXISTS idx_permissions_code ON permissions(code);
CREATE INDEX IF NOT EXISTS idx_permissions_module ON permissions(module);


@@ -0,0 +1,153 @@
-- Migration: 011_wordpress_import_enhancements
-- Purpose: Enhance ImportJob and User tables for WordPress CSV import feature
-- Date: 2025-12-24
-- Author: Claude Code
-- ============================================================================
-- PART 1: Enhance ImportJob Table
-- ============================================================================
-- Add new columns to import_jobs table for WordPress import tracking
ALTER TABLE import_jobs
ADD COLUMN IF NOT EXISTS field_mapping JSONB DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS wordpress_metadata JSONB DEFAULT '{}'::jsonb,
ADD COLUMN IF NOT EXISTS imported_user_ids JSONB DEFAULT '[]'::jsonb,
ADD COLUMN IF NOT EXISTS rollback_at TIMESTAMP WITH TIME ZONE,
ADD COLUMN IF NOT EXISTS rollback_by UUID REFERENCES users(id);
-- Add comments for documentation
COMMENT ON COLUMN import_jobs.field_mapping IS 'Maps CSV columns to database fields: {csv_column: db_field}';
COMMENT ON COLUMN import_jobs.wordpress_metadata IS 'Stores preview data, validation results, and WordPress-specific metadata';
COMMENT ON COLUMN import_jobs.imported_user_ids IS 'Array of user IDs created from this import job (for rollback)';
COMMENT ON COLUMN import_jobs.rollback_at IS 'Timestamp when this import was rolled back';
COMMENT ON COLUMN import_jobs.rollback_by IS 'Admin user who performed the rollback';
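-- Illustration (not executed by this migration): with the WordPress export
-- columns consumed by wordpress_parser.py, a populated field_mapping value
-- might look like:
-- '{"user_email": "email", "first_name": "first_name", "cell_phone": "phone"}'::jsonb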
-- ============================================================================
-- PART 2: Add New ImportJob Status Values
-- ============================================================================
-- Add new status values for import workflow
-- Note: older PostgreSQL releases lack ALTER TYPE ... ADD VALUE IF NOT EXISTS,
-- so each addition is guarded with a DO block
DO $$
BEGIN
-- Add 'validating' status if it doesn't exist
IF NOT EXISTS (SELECT 1 FROM pg_enum WHERE enumlabel = 'validating' AND enumtypid = (SELECT oid FROM pg_type WHERE typname = 'importjobstatus')) THEN
ALTER TYPE importjobstatus ADD VALUE 'validating';
END IF;
-- Add 'preview_ready' status if it doesn't exist
IF NOT EXISTS (SELECT 1 FROM pg_enum WHERE enumlabel = 'preview_ready' AND enumtypid = (SELECT oid FROM pg_type WHERE typname = 'importjobstatus')) THEN
ALTER TYPE importjobstatus ADD VALUE 'preview_ready';
END IF;
-- Add 'rolled_back' status if it doesn't exist
IF NOT EXISTS (SELECT 1 FROM pg_enum WHERE enumlabel = 'rolled_back' AND enumtypid = (SELECT oid FROM pg_type WHERE typname = 'importjobstatus')) THEN
ALTER TYPE importjobstatus ADD VALUE 'rolled_back';
END IF;
END$$;
-- ============================================================================
-- PART 3: Enhance User Table for Import Tracking
-- ============================================================================
-- Add columns to track import source and WordPress metadata
ALTER TABLE users
ADD COLUMN IF NOT EXISTS import_source VARCHAR(50),
ADD COLUMN IF NOT EXISTS import_job_id UUID REFERENCES import_jobs(id),
ADD COLUMN IF NOT EXISTS wordpress_user_id BIGINT,
ADD COLUMN IF NOT EXISTS wordpress_registered_date TIMESTAMP WITH TIME ZONE;
-- Add comments for documentation
COMMENT ON COLUMN users.import_source IS 'Source of user creation: wordpress, manual, registration, etc.';
COMMENT ON COLUMN users.import_job_id IS 'Reference to import job that created this user (if imported)';
COMMENT ON COLUMN users.wordpress_user_id IS 'Original WordPress user ID for reference';
COMMENT ON COLUMN users.wordpress_registered_date IS 'Original WordPress registration date';
-- ============================================================================
-- PART 4: Create Indexes for Performance
-- ============================================================================
-- Index for querying users by import job (used in rollback)
CREATE INDEX IF NOT EXISTS idx_users_import_job
ON users(import_job_id)
WHERE import_job_id IS NOT NULL;
-- Index for querying users by import source
CREATE INDEX IF NOT EXISTS idx_users_import_source
ON users(import_source)
WHERE import_source IS NOT NULL;
-- Index for querying import jobs by status
CREATE INDEX IF NOT EXISTS idx_import_jobs_status
ON import_jobs(status);
-- ============================================================================
-- PART 5: Create Rollback Audit Table (Optional but Recommended)
-- ============================================================================
-- Create table to track import rollback history for audit purposes
CREATE TABLE IF NOT EXISTS import_rollback_audit (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
import_job_id UUID NOT NULL REFERENCES import_jobs(id),
rolled_back_by UUID NOT NULL REFERENCES users(id),
rolled_back_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
deleted_user_count INTEGER NOT NULL,
deleted_user_ids JSONB NOT NULL,
reason TEXT,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
-- Index for querying rollback history
CREATE INDEX IF NOT EXISTS idx_rollback_audit_import_job
ON import_rollback_audit(import_job_id);
CREATE INDEX IF NOT EXISTS idx_rollback_audit_rolled_back_at
ON import_rollback_audit(rolled_back_at DESC);
COMMENT ON TABLE import_rollback_audit IS 'Audit trail for import rollback operations';
-- ============================================================================
-- VERIFICATION QUERIES (Run after migration to verify)
-- ============================================================================
-- Verify ImportJob columns exist
-- SELECT column_name, data_type
-- FROM information_schema.columns
-- WHERE table_name = 'import_jobs'
-- AND column_name IN ('field_mapping', 'wordpress_metadata', 'imported_user_ids', 'rollback_at', 'rollback_by');
-- Verify User columns exist
-- SELECT column_name, data_type
-- FROM information_schema.columns
-- WHERE table_name = 'users'
-- AND column_name IN ('import_source', 'import_job_id', 'wordpress_user_id', 'wordpress_registered_date');
-- Verify new enum values exist
-- SELECT enumlabel FROM pg_enum WHERE enumtypid = (SELECT oid FROM pg_type WHERE typname = 'importjobstatus') ORDER BY enumlabel;
-- Verify indexes exist
-- SELECT indexname, indexdef FROM pg_indexes WHERE tablename IN ('users', 'import_jobs', 'import_rollback_audit') ORDER BY indexname;
-- ============================================================================
-- ROLLBACK SCRIPT (if needed)
-- ============================================================================
-- WARNING: This will drop all columns and data related to WordPress imports
-- USE WITH EXTREME CAUTION
-- DROP TABLE IF EXISTS import_rollback_audit CASCADE;
-- DROP INDEX IF EXISTS idx_users_import_job;
-- DROP INDEX IF EXISTS idx_users_import_source;
-- DROP INDEX IF EXISTS idx_import_jobs_status;
-- ALTER TABLE users DROP COLUMN IF EXISTS import_source;
-- ALTER TABLE users DROP COLUMN IF EXISTS import_job_id;
-- ALTER TABLE users DROP COLUMN IF EXISTS wordpress_user_id;
-- ALTER TABLE users DROP COLUMN IF EXISTS wordpress_registered_date;
-- ALTER TABLE import_jobs DROP COLUMN IF EXISTS field_mapping;
-- ALTER TABLE import_jobs DROP COLUMN IF EXISTS wordpress_metadata;
-- ALTER TABLE import_jobs DROP COLUMN IF EXISTS imported_user_ids;
-- ALTER TABLE import_jobs DROP COLUMN IF EXISTS rollback_at;
-- ALTER TABLE import_jobs DROP COLUMN IF EXISTS rollback_by;
-- Note: Cannot easily remove enum values from importjobstatus type without recreating it
-- Manual intervention required if rollback of enum values is needed
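-- For reference, a recreate-the-type sequence is sketched below. It assumes the
-- pre-migration enum contained only the values visible in schema.sql (plus any
-- earlier values not shown in this diff), that no rows still use the new status
-- values (update them first), and that no other table depends on the type.
-- Verify against pg_enum before running.
-- BEGIN;
-- ALTER TABLE import_jobs ALTER COLUMN status TYPE VARCHAR(30) USING status::text;
-- DROP TYPE importjobstatus;
-- CREATE TYPE importjobstatus AS ENUM ('processing', 'completed', 'failed', 'partial');
-- ALTER TABLE import_jobs ALTER COLUMN status TYPE importjobstatus USING status::importjobstatus;
-- COMMIT;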


@@ -130,6 +130,12 @@ class User(Base):
rejected_at = Column(DateTime(timezone=True), nullable=True, comment="Timestamp when application was rejected")
rejected_by = Column(UUID(as_uuid=True), ForeignKey('users.id'), nullable=True, comment="Admin who rejected the application")
# WordPress Import Tracking
import_source = Column(String(50), nullable=True, comment="Source of user creation: wordpress, manual, registration")
import_job_id = Column(UUID(as_uuid=True), ForeignKey('import_jobs.id'), nullable=True, comment="Import job that created this user")
wordpress_user_id = Column(BigInteger, nullable=True, comment="Original WordPress user ID")
wordpress_registered_date = Column(DateTime(timezone=True), nullable=True, comment="Original WordPress registration date")
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
@@ -451,6 +457,9 @@ class ImportJobStatus(enum.Enum):
completed = "completed"
failed = "failed"
partial = "partial"
validating = "validating"
preview_ready = "preview_ready"
rolled_back = "rolled_back"
class ImportJob(Base):
"""Track CSV import jobs with error handling"""
@@ -466,6 +475,13 @@ class ImportJob(Base):
status = Column(SQLEnum(ImportJobStatus), default=ImportJobStatus.processing, nullable=False)
errors = Column(JSON, default=list, nullable=False) # [{row: 5, field: "email", error: "Invalid format"}]
# WordPress import enhancements
field_mapping = Column(JSON, default=dict, nullable=False) # Maps CSV columns to DB fields
wordpress_metadata = Column(JSON, default=dict, nullable=False) # Preview data, validation results
imported_user_ids = Column(JSON, default=list, nullable=False) # User IDs for rollback
rollback_at = Column(DateTime, nullable=True)
rollback_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=True)
# Tracking
imported_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)
started_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), nullable=False)
@@ -473,3 +489,22 @@ class ImportJob(Base):
# Relationships
importer = relationship("User", foreign_keys=[imported_by])
rollback_user = relationship("User", foreign_keys=[rollback_by])
class ImportRollbackAudit(Base):
"""Audit trail for import rollback operations"""
__tablename__ = "import_rollback_audit"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
import_job_id = Column(UUID(as_uuid=True), ForeignKey("import_jobs.id"), nullable=False)
rolled_back_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)
rolled_back_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), nullable=False)
deleted_user_count = Column(Integer, nullable=False)
deleted_user_ids = Column(JSON, nullable=False) # List of deleted user UUIDs
reason = Column(Text, nullable=True)
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), nullable=False)
# Relationships
import_job = relationship("ImportJob")
admin_user = relationship("User", foreign_keys=[rolled_back_by])
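# Illustration (not part of this commit): reading the audit trail back with
# these models; `db` is an open Session and `job_id` a UUID.
#
# audits = (
#     db.query(ImportRollbackAudit)
#     .filter(ImportRollbackAudit.import_job_id == job_id)
#     .order_by(ImportRollbackAudit.rolled_back_at.desc())
#     .all()
# )
# for a in audits:
#     print(a.rolled_back_at, a.admin_user.email, a.deleted_user_count)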


@@ -37,6 +37,7 @@ pandas==2.3.3
passlib==1.7.4
pathspec==0.12.1
pillow==10.2.0
phpserialize==1.3
platformdirs==4.5.0
pluggy==1.6.0
psycopg2-binary==2.9.11

server.py — 598 changed lines

@@ -17,7 +17,7 @@ import csv
import io
from database import engine, get_db, Base
from models import User, Event, EventRSVP, UserStatus, UserRole, RSVPStatus, SubscriptionPlan, Subscription, SubscriptionStatus, StorageUsage, EventGallery, NewsletterArchive, FinancialReport, BylawsDocument, Permission, RolePermission, Role, UserInvitation, InvitationStatus, ImportJob, ImportJobStatus, ImportRollbackAudit, Donation, DonationType, DonationStatus
from auth import (
get_password_hash,
verify_password,
@@ -42,6 +42,7 @@ from email_service import (
from payment_service import create_checkout_session, verify_webhook_signature, get_subscription_end_date
from r2_storage import get_r2_storage
from calendar_service import CalendarService
from wordpress_parser import analyze_csv, format_preview_for_display
# Load environment variables
ROOT_DIR = Path(__file__).parent
@@ -655,9 +656,15 @@ async def login(request: LoginRequest, db: Session = Depends(get_db)):
access_token = create_access_token(data={"sub": str(user.id)})
# Clear verification token on first successful login after verification
# Don't let this fail the login if database commit fails
if user.email_verified and user.email_verification_token:
try:
user.email_verification_token = None
db.commit()
except Exception as e:
logger.warning(f"Failed to clear verification token for user {user.id}: {str(e)}")
db.rollback()
# Continue with login - this is not critical
return {
"access_token": access_token,
@@ -887,7 +894,8 @@ async def get_member_directory(
"social_media_facebook": member.social_media_facebook,
"social_media_instagram": member.social_media_instagram,
"social_media_twitter": member.social_media_twitter,
"social_media_linkedin": member.social_media_linkedin
"social_media_linkedin": member.social_media_linkedin,
"created_at": member.created_at.isoformat() if member.created_at else None
} for member in directory_members]
@api_router.get("/members/directory/{user_id}")
@@ -922,7 +930,8 @@ async def get_directory_member_profile(
"social_media_facebook": member.social_media_facebook,
"social_media_instagram": member.social_media_instagram,
"social_media_twitter": member.social_media_twitter,
"social_media_linkedin": member.social_media_linkedin
"social_media_linkedin": member.social_media_linkedin,
"created_at": member.created_at.isoformat() if member.created_at else None
}
# Enhanced Profile Routes (Active Members Only)
@@ -1573,6 +1582,54 @@ async def rsvp_to_event(
return {"message": "RSVP updated successfully"}
@api_router.get("/members/event-activity")
async def get_my_event_activity(
current_user: User = Depends(get_active_member),
db: Session = Depends(get_db)
):
"""
Get current user's event activity including upcoming RSVPs and attendance history
"""
# Get all user's RSVPs
rsvps = db.query(EventRSVP).filter(
EventRSVP.user_id == current_user.id
).order_by(EventRSVP.created_at.desc()).all()
# Categorize events
upcoming_events = []
past_events = []
now = datetime.now(timezone.utc)
for rsvp in rsvps:
event = db.query(Event).filter(Event.id == rsvp.event_id).first()
if not event:
continue
event_data = {
"id": str(event.id),
"title": event.title,
"description": event.description,
"location": event.location,
"start_at": event.start_at.isoformat(),
"end_at": event.end_at.isoformat(),
"rsvp_status": rsvp.rsvp_status.value,
"attended": rsvp.attended,
"attended_at": rsvp.attended_at.isoformat() if rsvp.attended_at else None
}
# Separate upcoming vs past events
if event.end_at > now:
upcoming_events.append(event_data)
else:
past_events.append(event_data)
return {
"upcoming_events": sorted(upcoming_events, key=lambda x: x["start_at"]),
"past_events": sorted(past_events, key=lambda x: x["start_at"], reverse=True),
"total_attended": sum(1 for rsvp in rsvps if rsvp.attended),
"total_rsvps": len(rsvps)
}
# ============================================================================
# Calendar Export Endpoints (Universal iCalendar .ics format)
# ============================================================================
@@ -3144,6 +3201,529 @@ async def get_import_job_details(
}
# ============================================================================
# WordPress CSV Import Endpoints
# ============================================================================
@api_router.post("/admin/import/upload-csv")
async def upload_wordpress_csv(
file: UploadFile = File(...),
current_user: User = Depends(require_permission("users.import")),
db: Session = Depends(get_db)
):
"""
Upload WordPress CSV, parse, and generate status suggestions.
This endpoint:
1. Validates the CSV file
2. Uploads to R2 storage
3. Parses WordPress data (PHP serialized roles, etc.)
4. Generates smart status suggestions
5. Creates ImportJob record with status='preview_ready'
6. Stores preview data in wordpress_metadata field
Returns:
Import job summary with data quality metrics
Requires permission: users.import
"""
# Validate file type
if not file.filename or not file.filename.lower().endswith('.csv'):
raise HTTPException(status_code=400, detail="Only CSV files are supported")
# Validate file size (10MB max)
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
contents = await file.read()
if len(contents) > MAX_FILE_SIZE:
raise HTTPException(status_code=400, detail="File size exceeds 10MB limit")
# Save to temporary file for parsing
import tempfile
with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.csv') as tmp:
tmp.write(contents)
tmp_path = tmp.name
try:
# Fetch existing emails from database to check for duplicates
existing_emails = set(
email.lower() for (email,) in db.query(User.email).all()
)
logger.info(f"Checking against {len(existing_emails)} existing emails in database")
# Parse CSV with WordPress parser
analysis_result = analyze_csv(tmp_path, existing_emails=existing_emails)
# Note: File contents stored in wordpress_metadata, R2 upload optional
# Could implement R2 upload later if needed for archival purposes
# Create ImportJob record
import_job = ImportJob(
filename=file.filename,
file_key=None, # Optional: could add R2 upload later
total_rows=analysis_result['total_rows'],
processed_rows=0,
successful_rows=0,
failed_rows=0,
status=ImportJobStatus.preview_ready,
wordpress_metadata={
'preview_data': analysis_result['preview_data'],
'data_quality': analysis_result['data_quality'],
'valid_rows': analysis_result['valid_rows'],
'warnings': analysis_result['warnings'],
'errors': analysis_result['errors']
},
imported_by=current_user.id
)
db.add(import_job)
db.commit()
db.refresh(import_job)
logger.info(f"WordPress CSV uploaded: {import_job.id} by {current_user.email}")
return {
'import_job_id': str(import_job.id),
'total_rows': analysis_result['total_rows'],
'valid_rows': analysis_result['valid_rows'],
'warnings': analysis_result['warnings'],
'errors': analysis_result['errors'],
'data_quality': analysis_result['data_quality']
}
except Exception as e:
logger.error(f"Failed to upload WordPress CSV: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to process CSV: {str(e)}")
finally:
# Clean up temp file
if os.path.exists(tmp_path):
os.unlink(tmp_path)
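# Client-side sketch (not part of server.py): exercising the upload endpoint
# with the requests library; base URL and token are placeholders.
#
# import requests
# resp = requests.post(
#     "http://localhost:8000/api/admin/import/upload-csv",
#     headers={"Authorization": "Bearer <admin-token>"},
#     files={"file": ("wordpress_users.csv", open("wordpress_users.csv", "rb"), "text/csv")},
# )
# job_id = resp.json()["import_job_id"]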
@api_router.get("/admin/import/{job_id}/preview")
async def get_import_preview(
job_id: str,
page: int = 1,
page_size: int = 50,
current_user: User = Depends(require_permission("users.view")),
db: Session = Depends(get_db)
):
"""
Get paginated preview data for WordPress import status review.
Returns preview data with suggested status mappings that admins
can review and override before executing the import.
Args:
job_id: Import job UUID
page: Page number (1-indexed)
page_size: Number of rows per page (default 50)
Returns:
Paginated preview data with status suggestions and warnings
Requires permission: users.view
"""
# Get import job
job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
if not job:
raise HTTPException(status_code=404, detail="Import job not found")
# Verify job is in preview_ready status
if job.status != ImportJobStatus.preview_ready:
raise HTTPException(
status_code=400,
detail=f"Import job is not in preview_ready status (current: {job.status.value})"
)
# Get preview data from wordpress_metadata
preview_data = job.wordpress_metadata.get('preview_data', [])
# Format for paginated display
paginated = format_preview_for_display(preview_data, page, page_size)
return paginated
@api_router.post("/admin/import/{job_id}/execute")
async def execute_wordpress_import(
job_id: str,
overrides: dict = {},
options: dict = {},
current_user: User = Depends(require_permission("users.import")),
db: Session = Depends(get_db)
):
"""
Execute WordPress import with admin status overrides.
Process:
1. Merge status overrides with suggested mappings
2. Create users in batches (commit every 20 rows)
3. Track imported_user_ids for rollback capability
4. Queue password reset emails (async)
5. Update import job status
Args:
job_id: Import job UUID
overrides: Dict mapping row_number to status override
e.g., {'1': {'status': 'active'}, '5': {'status': 'inactive'}}
options: Import options
- send_password_emails: bool (default True)
- skip_errors: bool (default True)
Returns:
Import results with success/failure counts
Requires permission: users.import
"""
# Get import job
job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
if not job:
raise HTTPException(status_code=404, detail="Import job not found")
# Verify job is in preview_ready status
if job.status != ImportJobStatus.preview_ready:
raise HTTPException(
status_code=400,
detail=f"Import job is not in preview_ready status (current: {job.status.value})"
)
# Update job status to processing
job.status = ImportJobStatus.processing
db.commit()
# Get preview data
preview_data = job.wordpress_metadata.get('preview_data', [])
# Import configuration
send_password_emails = options.get('send_password_emails', True)
skip_errors = options.get('skip_errors', True)
# Track results
imported_user_ids = []
successful_rows = 0
failed_rows = 0
errors = []
# Generate default password for all imported users
default_password_hash = get_password_hash(secrets.token_urlsafe(32))
try:
# Process each row
for idx, row_data in enumerate(preview_data):
row_num = row_data['row_number']
try:
# Skip rows with critical errors
if row_data.get('errors') and skip_errors:
failed_rows += 1
errors.append({
'row': row_num,
'email': row_data.get('email'),
'error': ', '.join(row_data['errors'])
})
continue
# Apply status override if provided
final_status = row_data['suggested_status']
if str(row_num) in overrides:
final_status = overrides[str(row_num)].get('status', final_status)
# Check if user already exists
existing_user = db.query(User).filter(User.email == row_data['email']).first()
if existing_user:
failed_rows += 1
errors.append({
'row': row_num,
'email': row_data['email'],
'error': 'User with this email already exists'
})
continue
# Create user
new_user = User(
email=row_data['email'],
password_hash=default_password_hash,
first_name=row_data.get('first_name', ''),
last_name=row_data.get('last_name', ''),
phone=row_data.get('phone'),
address='', # WordPress CSV doesn't have address data
city='',
state='',
zipcode='',
date_of_birth=row_data.get('date_of_birth'),
status=UserStatus[final_status],
role=UserRole[row_data['suggested_role']],
newsletter_subscribed=row_data.get('newsletter_consent', False),
email_verified=True, # WordPress users are pre-verified
import_source='wordpress',
import_job_id=job.id,
wordpress_user_id=row_data.get('wordpress_user_id'),
wordpress_registered_date=row_data.get('wordpress_registered')
)
db.add(new_user)
db.flush() # Flush to get the ID without committing
imported_user_ids.append(str(new_user.id))
successful_rows += 1
# Commit in batches of 20
if (idx + 1) % 20 == 0:
db.commit()
job.processed_rows = idx + 1
db.commit()
except Exception as e:
logger.error(f"Failed to import row {row_num}: {str(e)}")
failed_rows += 1
errors.append({
'row': row_num,
'email': row_data.get('email', ''),
'error': str(e)
})
if not skip_errors:
db.rollback()
raise HTTPException(status_code=500, detail=f"Import failed at row {row_num}: {str(e)}")
# Final commit
db.commit()
# Update import job
job.processed_rows = len(preview_data)
job.successful_rows = successful_rows
job.failed_rows = failed_rows
job.status = ImportJobStatus.completed if failed_rows == 0 else ImportJobStatus.partial
job.imported_user_ids = imported_user_ids
job.error_log = errors
job.completed_at = datetime.now(timezone.utc)
db.commit()
# Queue password reset emails (async, non-blocking)
password_emails_queued = 0
if send_password_emails and imported_user_ids:
try:
for user_id_str in imported_user_ids:
try:
# Convert to UUID and fetch user
user_uuid = uuid.UUID(user_id_str)
user = db.query(User).filter(User.id == user_uuid).first()
if user:
# Generate password reset token
reset_token = create_password_reset_token(user.email)
reset_url = f"{os.getenv('FRONTEND_URL')}/reset-password?token={reset_token}"
# Send email (async)
await send_password_reset_email(user.email, user.first_name, reset_url)
password_emails_queued += 1
except (ValueError, AttributeError) as e:
logger.warning(f"Skipping invalid user ID: {user_id_str}")
continue
except Exception as e:
logger.error(f"Failed to send password reset emails: {str(e)}")
# Don't fail import if emails fail
logger.info(f"Import executed: {job.id} - {successful_rows}/{len(preview_data)} by {current_user.email}")
return {
'successful_rows': successful_rows,
'failed_rows': failed_rows,
'imported_user_ids': imported_user_ids,
'password_emails_queued': password_emails_queued,
'errors': errors
}
except Exception as e:
db.rollback()
job.status = ImportJobStatus.failed
job.error_log = [{'error': str(e)}]
db.commit()
logger.error(f"Import execution failed: {str(e)}")
raise HTTPException(status_code=500, detail=f"Import execution failed: {str(e)}")
@api_router.post("/admin/import/{job_id}/rollback")
async def rollback_import_job(
job_id: str,
confirm: bool = False,
current_user: User = Depends(require_permission("users.import")),
db: Session = Depends(get_db)
):
"""
Delete all users from a specific import job (full rollback).
Safety checks:
- Requires confirm=True parameter
- Verifies job status is completed or partial
- Cannot rollback twice (checks rollback_at is None)
- Logs action to import_rollback_audit table
Args:
job_id: Import job UUID
confirm: Must be True to execute rollback
Returns:
Number of deleted users and confirmation message
Requires permission: users.import
"""
# Safety check: require explicit confirmation
if not confirm:
raise HTTPException(
status_code=400,
detail="Rollback requires confirm=true parameter"
)
# Get import job
job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
if not job:
raise HTTPException(status_code=404, detail="Import job not found")
# Verify job can be rolled back
if job.status not in [ImportJobStatus.completed, ImportJobStatus.partial]:
raise HTTPException(
status_code=400,
detail=f"Cannot rollback import with status: {job.status.value}"
)
if job.rollback_at:
raise HTTPException(
status_code=400,
detail="Import has already been rolled back"
)
# Get imported user IDs
imported_user_ids = job.imported_user_ids or []
if not imported_user_ids:
raise HTTPException(
status_code=400,
detail="No users to rollback (imported_user_ids is empty)"
)
try:
# Delete all imported users
deleted_count = db.query(User).filter(
User.id.in_([uuid.UUID(uid) for uid in imported_user_ids])
).delete(synchronize_session=False)
# Update import job
job.status = ImportJobStatus.rolled_back
job.rollback_at = datetime.now(timezone.utc)
job.rollback_by = current_user.id
# Create audit record (ImportRollbackAudit is imported at the top of the module)
audit = ImportRollbackAudit(
import_job_id=job.id,
rolled_back_by=current_user.id,
deleted_user_count=deleted_count,
deleted_user_ids=imported_user_ids,
reason="Manual rollback by admin"
)
db.add(audit)
db.commit()
logger.warning(f"Import rolled back: {job.id} - {deleted_count} users deleted by {current_user.email}")
return {
'deleted_users': deleted_count,
'message': f'Import rolled back successfully. {deleted_count} users deleted.'
}
except Exception as e:
db.rollback()
logger.error(f"Rollback failed for job {job.id}: {str(e)}")
raise HTTPException(status_code=500, detail=f"Rollback failed: {str(e)}")
@api_router.get("/admin/import/{job_id}/status")
async def get_import_status(
job_id: str,
current_user: User = Depends(require_permission("users.view")),
db: Session = Depends(get_db)
):
"""
Get real-time import progress status for polling.
Use this endpoint to poll for import progress updates
while the import is executing.
Args:
job_id: Import job UUID
Returns:
Current import status with progress percentage
Requires permission: users.view
"""
job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
if not job:
raise HTTPException(status_code=404, detail="Import job not found")
progress_percent = 0.0
if job.total_rows > 0:
progress_percent = (job.processed_rows / job.total_rows) * 100
return {
'status': job.status.value,
'processed_rows': job.processed_rows,
'total_rows': job.total_rows,
'progress_percent': round(progress_percent, 1),
'successful_rows': job.successful_rows,
'failed_rows': job.failed_rows
}
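# Client-side sketch (not part of server.py): polling the status endpoint
# until a terminal state; the 2-second interval is an arbitrary choice.
#
# import time
# while True:
#     s = requests.get(
#         f"http://localhost:8000/api/admin/import/{job_id}/status",
#         headers={"Authorization": "Bearer <admin-token>"},
#     ).json()
#     print(f"{s['progress_percent']}% ({s['processed_rows']}/{s['total_rows']})")
#     if s["status"] in ("completed", "partial", "failed", "rolled_back"):
#         break
#     time.sleep(2)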
@api_router.get("/admin/import/{job_id}/errors/download")
async def download_error_report(
job_id: str,
current_user: User = Depends(require_permission("users.view")),
db: Session = Depends(get_db)
):
"""
Download CSV report with all import errors.
CSV columns: Row Number, Email, Error Type, Error Message
Args:
job_id: Import job UUID
Returns:
StreamingResponse with CSV file
Requires permission: users.view
"""
job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
if not job:
raise HTTPException(status_code=404, detail="Import job not found")
errors = job.error_log or []
if not errors:
raise HTTPException(status_code=404, detail="No errors found for this import job")
# Generate CSV
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=['Row Number', 'Email', 'Error Type', 'Error Message'])
writer.writeheader()
for error in errors:
writer.writerow({
'Row Number': error.get('row', ''),
'Email': error.get('email', ''),
'Error Type': 'Import Error',
'Error Message': error.get('error', '')
})
# Return as streaming response
output.seek(0)
return StreamingResponse(
iter([output.getvalue()]),
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename=import_errors_{job_id}.csv"}
)
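# Client-side sketch (not part of server.py): saving the error report to disk.
#
# resp = requests.get(
#     f"http://localhost:8000/api/admin/import/{job_id}/errors/download",
#     headers={"Authorization": "Bearer <admin-token>"},
# )
# with open(f"import_errors_{job_id}.csv", "wb") as f:
#     f.write(resp.content)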
@api_router.post("/admin/events", response_model=EventResponse)
async def create_event(
request: EventCreate,
@@ -3257,8 +3837,18 @@ async def mark_attendance(
EventRSVP.user_id == request.user_id
).first()
# Auto-create RSVP if it doesn't exist (for retroactive attendance marking)
if not rsvp:
rsvp = EventRSVP(
event_id=event_id,
user_id=request.user_id,
rsvp_status=RSVPStatus.yes, # Default to 'yes' for attended events
attended=False,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc)
)
db.add(rsvp)
db.flush() # Get the ID without committing
rsvp.attended = request.attended
rsvp.attended_at = datetime.now(timezone.utc) if request.attended else None

wordpress_parser.py — new file, 531 lines

@@ -0,0 +1,531 @@
"""
WordPress CSV Parser Module
This module provides utilities for parsing WordPress user export CSV files
and transforming them into LOAF platform-compatible data structures.
Key Features:
- Parse PHP serialized data (WordPress capabilities)
- Map WordPress roles to LOAF roles and statuses
- Validate and standardize user data (DOB, phone numbers)
- Generate smart status suggestions based on approval and subscription data
- Comprehensive data quality analysis and error reporting
Author: Claude Code
Date: 2025-12-24
"""
import csv
import re
import logging
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import pandas as pd
import phpserialize
logger = logging.getLogger(__name__)
# ============================================================================
# WordPress Role Mapping Configuration
# ============================================================================
ROLE_MAPPING = {
# WordPress admin roles → LOAF admin roles (auto-active)
'administrator': ('superadmin', 'active'),
'loaf_admin': ('admin', 'active'),
'loaf_treasure': ('finance', 'active'),
'loaf_communication': ('admin', 'active'),
# WordPress member roles → LOAF member role (status from approval)
'pms_subscription_plan_63': ('member', None), # Status determined by approval
'registered': ('guest', None), # Default WordPress role
# Fallback for unknown roles
'__default__': ('guest', None)
}
# Role priority order (higher index = higher priority)
ROLE_PRIORITY = [
'registered',
'pms_subscription_plan_63',
'loaf_communication',
'loaf_treasure',
'loaf_admin',
'administrator'
]
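# Example (illustrative): a user exported with ['registered', 'loaf_admin']
# resolves to 'loaf_admin', which sits later in ROLE_PRIORITY and therefore
# wins in map_wordpress_role() below, yielding ('admin', 'active').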
# ============================================================================
# PHP Serialization Parsing
# ============================================================================
def parse_php_serialized(data: str) -> List[str]:
"""
Parse WordPress PHP serialized capabilities string.
WordPress stores user capabilities as serialized PHP arrays like:
a:1:{s:10:"registered";b:1;}
a:2:{s:10:"registered";b:1;s:24:"pms_subscription_plan_63";b:1;}
Args:
data: PHP serialized string
Returns:
List of role names (e.g., ['registered', 'pms_subscription_plan_63'])
Examples:
>>> parse_php_serialized('a:1:{s:10:"registered";b:1;}')
['registered']
>>> parse_php_serialized('a:2:{s:10:"registered";b:1;s:24:"pms_subscription_plan_63";b:1;}')
['registered', 'pms_subscription_plan_63']
"""
if not data or pd.isna(data):
return []
try:
# Use phpserialize library to parse
parsed = phpserialize.loads(data.encode('utf-8'))
# Extract role names (keys where value is True)
if isinstance(parsed, dict):
roles = [key.decode('utf-8') if isinstance(key, bytes) else key
for key, value in parsed.items() if value]
return roles
return []
except Exception as e:
logger.warning(f"Failed to parse PHP serialized data: {data[:50]}... Error: {str(e)}")
return []
# ============================================================================
# Role and Status Mapping
# ============================================================================
def map_wordpress_role(wp_roles: List[str]) -> Tuple[str, Optional[str]]:
"""
Map WordPress roles to LOAF role and suggested status.
Priority logic:
1. If user has any admin role → corresponding LOAF admin role with 'active' status
2. If user has subscription → 'member' role (status from approval)
3. Otherwise → 'guest' role (status from approval)
Args:
wp_roles: List of WordPress role names
Returns:
Tuple of (loaf_role, suggested_status)
- loaf_role: One of: superadmin, admin, finance, member, guest
- suggested_status: One of: active, pre_validated, payment_pending, None (determined by approval)
Examples:
>>> map_wordpress_role(['loaf_admin'])
('admin', 'active')
>>> map_wordpress_role(['loaf_treasure'])
('finance', 'active')
>>> map_wordpress_role(['pms_subscription_plan_63', 'registered'])
('member', None)
>>> map_wordpress_role(['registered'])
('guest', None)
"""
if not wp_roles:
return ROLE_MAPPING['__default__']
# Sort roles by priority (highest priority last)
prioritized_roles = sorted(
wp_roles,
key=lambda r: ROLE_PRIORITY.index(r) if r in ROLE_PRIORITY else -1
)
# Map highest priority role
highest_role = prioritized_roles[-1] if prioritized_roles else 'registered'
return ROLE_MAPPING.get(highest_role, ROLE_MAPPING['__default__'])
def suggest_status(approval_status: str, has_subscription: bool, wordpress_role: str = 'guest') -> str:
"""
Suggest LOAF user status based on WordPress approval and subscription data.
Logic:
1. Admin roles (loaf_admin, loaf_treasure, administrator) → always 'active'
2. approved + subscription → 'active'
3. approved without subscription → 'pre_validated'
4. pending → 'payment_pending'
5. Other/empty → 'pre_validated'
Args:
approval_status: WordPress approval status (approved, pending, unapproved, etc.)
has_subscription: Whether user has pms_subscription_plan_63 role
wordpress_role: LOAF role mapped from WordPress (for admin check)
Returns:
Suggested LOAF status: active, pre_validated, payment_pending, or inactive
Examples:
>>> suggest_status('approved', True, 'member')
'active'
>>> suggest_status('approved', False, 'member')
'pre_validated'
>>> suggest_status('pending', True, 'member')
'payment_pending'
>>> suggest_status('', False, 'admin')
'active'
"""
# Admin roles are always active
if wordpress_role in ('superadmin', 'admin', 'finance'):
return 'active'
# Normalize approval status
approval = (approval_status or '').lower().strip()
if approval == 'approved':
return 'active' if has_subscription else 'pre_validated'
elif approval == 'pending':
return 'payment_pending'
elif approval == 'unapproved':
return 'inactive'
else:
# Empty or unknown approval status
return 'pre_validated'
# ============================================================================
# Data Validation and Standardization
# ============================================================================
def standardize_phone(phone: str) -> str:
"""
Standardize phone number by extracting digits only.
Removes all non-digit characters:
- (713) 560-7850 → 7135607850
- 713-725-8902 → 7137258902
- Empty/None → 0000000000 (fallback)
Args:
phone: Phone number in any format
Returns:
10-digit phone number string (or 0000000000 if invalid)
Examples:
>>> standardize_phone('(713) 560-7850')
'7135607850'
>>> standardize_phone('713-725-8902')
'7137258902'
>>> standardize_phone('')
'0000000000'
"""
if not phone or pd.isna(phone):
return '0000000000'
# Extract all digits
digits = re.sub(r'\D', '', str(phone))
# Return 10 digits or fallback
if len(digits) == 10:
return digits
elif len(digits) == 11 and digits[0] == '1':
# Remove leading 1 (US country code)
return digits[1:]
else:
logger.warning(f"Invalid phone format: {phone} (extracted: {digits})")
return '0000000000'
def validate_dob(dob_str: str) -> Tuple[Optional[datetime], Optional[str]]:
"""
Validate and parse date of birth.
Validation rules:
- Must be in MM/DD/YYYY format
- Year must be between 1900 and current year
- Cannot be in the future
- Reject year 0000 and future years (known data quality issues in the WordPress export)
Args:
dob_str: Date of birth string in MM/DD/YYYY format
Returns:
Tuple of (parsed_datetime, warning_message)
- parsed_datetime: datetime object if valid, None if invalid
- warning_message: Descriptive error message if invalid, None if valid
Examples:
>>> validate_dob('08/02/1962')
(datetime(1962, 8, 2), None)
>>> validate_dob('08/02/0000')
(None, 'Invalid date format: 08/02/0000 (expected MM/DD/YYYY)')
>>> validate_dob('08/02/2025')
(None, 'Date is in the future')
"""
if not dob_str or pd.isna(dob_str):
return None, 'Missing date of birth'
try:
# Parse MM/DD/YYYY format
parsed = datetime.strptime(str(dob_str).strip(), '%m/%d/%Y')
# Validate year range
if parsed.year == 0:
return None, 'Invalid year: 0000 (data quality issue)'
elif parsed.year < 1900:
return None, f'Year too old: {parsed.year} (likely invalid)'
elif parsed.year > datetime.now().year:
return None, f'Date is in the future: {parsed.year}'
elif parsed > datetime.now():
return None, 'Date is in the future'
return parsed, None
except ValueError as e:
return None, f'Invalid date format: {dob_str} (expected MM/DD/YYYY)'
# ============================================================================
# CSV Analysis and Preview Generation
# ============================================================================
def analyze_csv(file_path: str, existing_emails: Optional[set] = None) -> Dict:
"""
Analyze WordPress CSV file and generate preview data with status suggestions.
This is the main entry point for CSV processing. It:
1. Reads and parses the CSV file
2. Validates each row and generates warnings
3. Maps WordPress roles to LOAF roles
4. Suggests status for each user
5. Tracks data quality metrics
6. Checks for duplicate emails (both within CSV and against existing database)
7. Returns comprehensive analysis and preview data
Args:
file_path: Path to WordPress CSV export file
existing_emails: Set of emails already in the database (optional)
Returns:
Dictionary containing:
- total_rows: Total number of user rows
- valid_rows: Number of rows without critical errors
- warnings: Total warning count
- errors: Total critical error count
- preview_data: List of row dictionaries with suggestions
- data_quality: Dictionary of data quality metrics
Example output:
{
'total_rows': 183,
'valid_rows': 176,
'warnings': 66,
'errors': 7,
'preview_data': [
{
'row_number': 1,
'email': 'user@example.com',
'first_name': 'John',
'last_name': 'Doe',
'phone': '7135607850',
'date_of_birth': '1962-08-02',
'wordpress_roles': ['registered', 'pms_subscription_plan_63'],
'suggested_role': 'member',
'suggested_status': 'active',
'warnings': [],
'errors': []
},
...
],
'data_quality': {
'invalid_dob': 66,
'missing_phone': 7,
'duplicate_email_csv': 0,
'duplicate_email_db': 3,
'unparseable_roles': 2
}
}
"""
# Read CSV with pandas (imported at module top)
df = pd.read_csv(file_path)
total_rows = len(df)
preview_data = []
data_quality = {
'invalid_dob': 0,
'missing_phone': 0,
'duplicate_email_csv': 0,
'duplicate_email_db': 0,
'unparseable_roles': 0,
'missing_email': 0
}
# Track seen emails for CSV duplicate detection
seen_emails = {}
# Convert existing_emails to set if provided
if existing_emails is None:
existing_emails = set()
for idx, row in df.iterrows():
row_num = idx + 1
warnings = []
errors = []
# Extract and validate email
email = str(row.get('user_email', '')).strip().lower()
if not email or email == 'nan':
errors.append('Missing email address')
data_quality['missing_email'] += 1
else:
# Check for duplicates within CSV
if email in seen_emails:
errors.append(f'Duplicate email in CSV (also in row {seen_emails[email]})')
data_quality['duplicate_email_csv'] += 1
# Check for duplicates in existing database
elif email in existing_emails:
errors.append('Email already exists in database')
data_quality['duplicate_email_db'] += 1
else:
seen_emails[email] = row_num
# Extract basic fields
first_name = str(row.get('first_name', '')).strip()
last_name = str(row.get('last_name', '')).strip()
# Parse and validate DOB
dob_parsed, dob_warning = validate_dob(row.get('date_of_birth'))
if dob_warning:
warnings.append(dob_warning)
data_quality['invalid_dob'] += 1
# Standardize phone
phone = standardize_phone(row.get('cell_phone'))
if phone == '0000000000':
warnings.append('Missing or invalid phone number')
data_quality['missing_phone'] += 1
# Parse WordPress roles
wp_capabilities = row.get('wp_capabilities', '')
wp_roles = parse_php_serialized(wp_capabilities)
if not wp_roles and wp_capabilities:
warnings.append('Could not parse WordPress roles')
data_quality['unparseable_roles'] += 1
# Map to LOAF role and status
loaf_role, role_suggested_status = map_wordpress_role(wp_roles)
# Determine if user has subscription
has_subscription = 'pms_subscription_plan_63' in wp_roles
# Get approval status
approval_status = str(row.get('wppb_approval_status', '')).strip()
# Suggest final status
if role_suggested_status:
# Admin roles have fixed status from role mapping
suggested_status = role_suggested_status
else:
# Regular users get status from approval logic
suggested_status = suggest_status(approval_status, has_subscription, loaf_role)
# Build preview row
preview_row = {
'row_number': row_num,
'email': email,
'first_name': first_name,
'last_name': last_name,
'phone': phone,
'date_of_birth': dob_parsed.isoformat() if dob_parsed else None,
'wordpress_user_id': int(row.get('ID', 0)) if pd.notna(row.get('ID')) else None,
'wordpress_registered': str(row.get('user_registered', '')),
'wordpress_roles': wp_roles,
'wordpress_approval_status': approval_status,
'has_subscription': has_subscription,
'suggested_role': loaf_role,
'suggested_status': suggested_status,
'warnings': warnings,
'errors': errors,
'newsletter_consent': str(row.get('newsletter_consent', '')).lower() == 'yes',
'newsletter_checklist': str(row.get('newsletter_checklist', '')).lower() == 'yes'
}
preview_data.append(preview_row)
# Calculate summary statistics
valid_rows = sum(1 for row in preview_data if not row['errors'])
total_warnings = sum(len(row['warnings']) for row in preview_data)
total_errors = sum(len(row['errors']) for row in preview_data)
return {
'total_rows': total_rows,
'valid_rows': valid_rows,
'warnings': total_warnings,
'errors': total_errors,
'preview_data': preview_data,
'data_quality': data_quality
}
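# Standalone usage sketch (file path and email are placeholders):
#
# result = analyze_csv("wordpress_export.csv", existing_emails={"taken@example.com"})
# print(f"{result['valid_rows']}/{result['total_rows']} rows importable, "
#       f"{result['warnings']} warnings, {result['errors']} errors")
# for row in result["preview_data"][:3]:
#     print(row["email"], row["suggested_role"], row["suggested_status"], row["warnings"])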
# ============================================================================
# Utility Functions
# ============================================================================
def get_status_badge_color(status: str) -> str:
"""
Get appropriate badge color for status display in UI.
Args:
status: User status string
Returns:
Tailwind CSS color class
"""
colors = {
'active': 'bg-green-100 text-green-800',
'pre_validated': 'bg-blue-100 text-blue-800',
'payment_pending': 'bg-yellow-100 text-yellow-800',
'inactive': 'bg-gray-100 text-gray-800',
'pending_email': 'bg-purple-100 text-purple-800',
'awaiting_event': 'bg-indigo-100 text-indigo-800'
}
return colors.get(status, 'bg-gray-100 text-gray-800')
def format_preview_for_display(preview_data: List[Dict], page: int = 1, page_size: int = 50) -> Dict:
"""
Format preview data for paginated display in frontend.
Args:
preview_data: Full preview data list
page: Page number (1-indexed)
page_size: Number of rows per page
Returns:
Dictionary with paginated data and metadata
"""
total_pages = (len(preview_data) + page_size - 1) // page_size
start_idx = (page - 1) * page_size
end_idx = start_idx + page_size
return {
'page': page,
'page_size': page_size,
'total_pages': total_pages,
'total_rows': len(preview_data),
'rows': preview_data[start_idx:end_idx]
}
# ============================================================================
# Module Initialization
# ============================================================================
logger.info("WordPress parser module loaded successfully")