Test Preparation

This commit is contained in:
Koncept Kit
2025-12-26 20:03:53 +07:00
parent fad23c6e57
commit 487481b322
10 changed files with 1357 additions and 9 deletions

606
server.py
View File

@@ -17,7 +17,7 @@ import csv
import io
from database import engine, get_db, Base
from models import User, Event, EventRSVP, UserStatus, UserRole, RSVPStatus, SubscriptionPlan, Subscription, SubscriptionStatus, StorageUsage, EventGallery, NewsletterArchive, FinancialReport, BylawsDocument, Permission, RolePermission, Role, UserInvitation, InvitationStatus, ImportJob, ImportJobStatus, Donation, DonationType, DonationStatus
from models import User, Event, EventRSVP, UserStatus, UserRole, RSVPStatus, SubscriptionPlan, Subscription, SubscriptionStatus, StorageUsage, EventGallery, NewsletterArchive, FinancialReport, BylawsDocument, Permission, RolePermission, Role, UserInvitation, InvitationStatus, ImportJob, ImportJobStatus, ImportRollbackAudit, Donation, DonationType, DonationStatus
from auth import (
get_password_hash,
verify_password,
@@ -42,6 +42,7 @@ from email_service import (
from payment_service import create_checkout_session, verify_webhook_signature, get_subscription_end_date
from r2_storage import get_r2_storage
from calendar_service import CalendarService
from wordpress_parser import analyze_csv, format_preview_for_display
# Load environment variables
ROOT_DIR = Path(__file__).parent
@@ -655,9 +656,15 @@ async def login(request: LoginRequest, db: Session = Depends(get_db)):
access_token = create_access_token(data={"sub": str(user.id)})
# Clear verification token on first successful login after verification
# Don't let this fail the login if database commit fails
if user.email_verified and user.email_verification_token:
user.email_verification_token = None
db.commit()
try:
user.email_verification_token = None
db.commit()
except Exception as e:
logger.warning(f"Failed to clear verification token for user {user.id}: {str(e)}")
db.rollback()
# Continue with login - this is not critical
return {
"access_token": access_token,
@@ -887,7 +894,8 @@ async def get_member_directory(
"social_media_facebook": member.social_media_facebook,
"social_media_instagram": member.social_media_instagram,
"social_media_twitter": member.social_media_twitter,
"social_media_linkedin": member.social_media_linkedin
"social_media_linkedin": member.social_media_linkedin,
"created_at": member.created_at.isoformat() if member.created_at else None
} for member in directory_members]
@api_router.get("/members/directory/{user_id}")
@@ -922,7 +930,8 @@ async def get_directory_member_profile(
"social_media_facebook": member.social_media_facebook,
"social_media_instagram": member.social_media_instagram,
"social_media_twitter": member.social_media_twitter,
"social_media_linkedin": member.social_media_linkedin
"social_media_linkedin": member.social_media_linkedin,
"created_at": member.created_at.isoformat() if member.created_at else None
}
# Enhanced Profile Routes (Active Members Only)
@@ -1573,6 +1582,54 @@ async def rsvp_to_event(
return {"message": "RSVP updated successfully"}
@api_router.get("/members/event-activity")
async def get_my_event_activity(
current_user: User = Depends(get_active_member),
db: Session = Depends(get_db)
):
"""
Get current user's event activity including upcoming RSVPs and attendance history
"""
# Get all user's RSVPs
rsvps = db.query(EventRSVP).filter(
EventRSVP.user_id == current_user.id
).order_by(EventRSVP.created_at.desc()).all()
# Categorize events
upcoming_events = []
past_events = []
now = datetime.now(timezone.utc)
for rsvp in rsvps:
event = db.query(Event).filter(Event.id == rsvp.event_id).first()
if not event:
continue
event_data = {
"id": str(event.id),
"title": event.title,
"description": event.description,
"location": event.location,
"start_at": event.start_at.isoformat(),
"end_at": event.end_at.isoformat(),
"rsvp_status": rsvp.rsvp_status.value,
"attended": rsvp.attended,
"attended_at": rsvp.attended_at.isoformat() if rsvp.attended_at else None
}
# Separate upcoming vs past events
if event.end_at > now:
upcoming_events.append(event_data)
else:
past_events.append(event_data)
return {
"upcoming_events": sorted(upcoming_events, key=lambda x: x["start_at"]),
"past_events": sorted(past_events, key=lambda x: x["start_at"], reverse=True),
"total_attended": sum(1 for rsvp in rsvps if rsvp.attended),
"total_rsvps": len(rsvps)
}
# ============================================================================
# Calendar Export Endpoints (Universal iCalendar .ics format)
# ============================================================================
@@ -3144,6 +3201,529 @@ async def get_import_job_details(
}
# ============================================================================
# WordPress CSV Import Endpoints
# ============================================================================
@api_router.post("/admin/import/upload-csv")
async def upload_wordpress_csv(
file: UploadFile = File(...),
current_user: User = Depends(require_permission("users.import")),
db: Session = Depends(get_db)
):
"""
Upload WordPress CSV, parse, and generate status suggestions.
This endpoint:
1. Validates the CSV file
2. Uploads to R2 storage
3. Parses WordPress data (PHP serialized roles, etc.)
4. Generates smart status suggestions
5. Creates ImportJob record with status='preview_ready'
6. Stores preview data in wordpress_metadata field
Returns:
Import job summary with data quality metrics
Requires permission: users.import
"""
# Validate file type
if not file.filename.endswith('.csv'):
raise HTTPException(status_code=400, detail="Only CSV files are supported")
# Validate file size (10MB max)
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
contents = await file.read()
if len(contents) > MAX_FILE_SIZE:
raise HTTPException(status_code=400, detail="File size exceeds 10MB limit")
# Save to temporary file for parsing
import tempfile
with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.csv') as tmp:
tmp.write(contents)
tmp_path = tmp.name
try:
# Fetch existing emails from database to check for duplicates
existing_emails = set(
email.lower() for (email,) in db.query(User.email).all()
)
logger.info(f"Checking against {len(existing_emails)} existing emails in database")
# Parse CSV with WordPress parser
analysis_result = analyze_csv(tmp_path, existing_emails=existing_emails)
# Note: File contents stored in wordpress_metadata, R2 upload optional
# Could implement R2 upload later if needed for archival purposes
# Create ImportJob record
import_job = ImportJob(
filename=file.filename,
file_key=None, # Optional: could add R2 upload later
total_rows=analysis_result['total_rows'],
processed_rows=0,
successful_rows=0,
failed_rows=0,
status=ImportJobStatus.preview_ready,
wordpress_metadata={
'preview_data': analysis_result['preview_data'],
'data_quality': analysis_result['data_quality'],
'valid_rows': analysis_result['valid_rows'],
'warnings': analysis_result['warnings'],
'errors': analysis_result['errors']
},
imported_by=current_user.id
)
db.add(import_job)
db.commit()
db.refresh(import_job)
logger.info(f"WordPress CSV uploaded: {import_job.id} by {current_user.email}")
return {
'import_job_id': str(import_job.id),
'total_rows': analysis_result['total_rows'],
'valid_rows': analysis_result['valid_rows'],
'warnings': analysis_result['warnings'],
'errors': analysis_result['errors'],
'data_quality': analysis_result['data_quality']
}
except Exception as e:
logger.error(f"Failed to upload WordPress CSV: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to process CSV: {str(e)}")
finally:
# Clean up temp file
if os.path.exists(tmp_path):
os.unlink(tmp_path)
@api_router.get("/admin/import/{job_id}/preview")
async def get_import_preview(
job_id: str,
page: int = 1,
page_size: int = 50,
current_user: User = Depends(require_permission("users.view")),
db: Session = Depends(get_db)
):
"""
Get paginated preview data for WordPress import status review.
Returns preview data with suggested status mappings that admins
can review and override before executing the import.
Args:
job_id: Import job UUID
page: Page number (1-indexed)
page_size: Number of rows per page (default 50)
Returns:
Paginated preview data with status suggestions and warnings
Requires permission: users.view
"""
# Get import job
job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
if not job:
raise HTTPException(status_code=404, detail="Import job not found")
# Verify job is in preview_ready status
if job.status != ImportJobStatus.preview_ready:
raise HTTPException(
status_code=400,
detail=f"Import job is not in preview_ready status (current: {job.status.value})"
)
# Get preview data from wordpress_metadata
preview_data = job.wordpress_metadata.get('preview_data', [])
# Format for paginated display
paginated = format_preview_for_display(preview_data, page, page_size)
return paginated
@api_router.post("/admin/import/{job_id}/execute")
async def execute_wordpress_import(
job_id: str,
overrides: dict = {},
options: dict = {},
current_user: User = Depends(require_permission("users.import")),
db: Session = Depends(get_db)
):
"""
Execute WordPress import with admin status overrides.
Process:
1. Merge status overrides with suggested mappings
2. Create users in batches (commit every 20 rows)
3. Track imported_user_ids for rollback capability
4. Queue password reset emails (async)
5. Update import job status
Args:
job_id: Import job UUID
overrides: Dict mapping row_number to status override
e.g., {'1': {'status': 'active'}, '5': {'status': 'inactive'}}
options: Import options
- send_password_emails: bool (default True)
- skip_errors: bool (default True)
Returns:
Import results with success/failure counts
Requires permission: users.import
"""
# Get import job
job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
if not job:
raise HTTPException(status_code=404, detail="Import job not found")
# Verify job is in preview_ready status
if job.status != ImportJobStatus.preview_ready:
raise HTTPException(
status_code=400,
detail=f"Import job is not in preview_ready status (current: {job.status.value})"
)
# Update job status to processing
job.status = ImportJobStatus.processing
db.commit()
# Get preview data
preview_data = job.wordpress_metadata.get('preview_data', [])
# Import configuration
send_password_emails = options.get('send_password_emails', True)
skip_errors = options.get('skip_errors', True)
# Track results
imported_user_ids = []
successful_rows = 0
failed_rows = 0
errors = []
# Generate default password for all imported users
default_password_hash = get_password_hash(secrets.token_urlsafe(32))
try:
# Process each row
for idx, row_data in enumerate(preview_data):
row_num = row_data['row_number']
try:
# Skip rows with critical errors
if row_data.get('errors') and skip_errors:
failed_rows += 1
errors.append({
'row': row_num,
'email': row_data.get('email'),
'error': ', '.join(row_data['errors'])
})
continue
# Apply status override if provided
final_status = row_data['suggested_status']
if str(row_num) in overrides:
final_status = overrides[str(row_num)].get('status', final_status)
# Check if user already exists
existing_user = db.query(User).filter(User.email == row_data['email']).first()
if existing_user:
failed_rows += 1
errors.append({
'row': row_num,
'email': row_data['email'],
'error': 'User with this email already exists'
})
continue
# Create user
new_user = User(
email=row_data['email'],
password_hash=default_password_hash,
first_name=row_data.get('first_name', ''),
last_name=row_data.get('last_name', ''),
phone=row_data.get('phone'),
address='', # WordPress CSV doesn't have address data
city='',
state='',
zipcode='',
date_of_birth=row_data.get('date_of_birth'),
status=UserStatus[final_status],
role=UserRole[row_data['suggested_role']],
newsletter_subscribed=row_data.get('newsletter_consent', False),
email_verified=True, # WordPress users are pre-verified
import_source='wordpress',
import_job_id=job.id,
wordpress_user_id=row_data.get('wordpress_user_id'),
wordpress_registered_date=row_data.get('wordpress_registered')
)
db.add(new_user)
db.flush() # Flush to get the ID without committing
imported_user_ids.append(str(new_user.id))
successful_rows += 1
# Commit in batches of 20
if (idx + 1) % 20 == 0:
db.commit()
job.processed_rows = idx + 1
db.commit()
except Exception as e:
logger.error(f"Failed to import row {row_num}: {str(e)}")
failed_rows += 1
errors.append({
'row': row_num,
'email': row_data.get('email', ''),
'error': str(e)
})
if not skip_errors:
db.rollback()
raise HTTPException(status_code=500, detail=f"Import failed at row {row_num}: {str(e)}")
# Final commit
db.commit()
# Update import job
job.processed_rows = len(preview_data)
job.successful_rows = successful_rows
job.failed_rows = failed_rows
job.status = ImportJobStatus.completed if failed_rows == 0 else ImportJobStatus.partial
job.imported_user_ids = imported_user_ids
job.error_log = errors
job.completed_at = datetime.now(timezone.utc)
db.commit()
# Queue password reset emails (async, non-blocking)
password_emails_queued = 0
if send_password_emails and imported_user_ids:
try:
for user_id_str in imported_user_ids:
try:
# Convert to UUID and fetch user
user_uuid = uuid.UUID(user_id_str)
user = db.query(User).filter(User.id == user_uuid).first()
if user:
# Generate password reset token
reset_token = create_password_reset_token(user.email)
reset_url = f"{os.getenv('FRONTEND_URL')}/reset-password?token={reset_token}"
# Send email (async)
await send_password_reset_email(user.email, user.first_name, reset_url)
password_emails_queued += 1
except (ValueError, AttributeError) as e:
logger.warning(f"Skipping invalid user ID: {user_id_str}")
continue
except Exception as e:
logger.error(f"Failed to send password reset emails: {str(e)}")
# Don't fail import if emails fail
logger.info(f"Import executed: {job.id} - {successful_rows}/{len(preview_data)} by {current_user.email}")
return {
'successful_rows': successful_rows,
'failed_rows': failed_rows,
'imported_user_ids': imported_user_ids,
'password_emails_queued': password_emails_queued,
'errors': errors
}
except Exception as e:
db.rollback()
job.status = ImportJobStatus.failed
job.error_log = [{'error': str(e)}]
db.commit()
logger.error(f"Import execution failed: {str(e)}")
raise HTTPException(status_code=500, detail=f"Import execution failed: {str(e)}")
@api_router.post("/admin/import/{job_id}/rollback")
async def rollback_import_job(
job_id: str,
confirm: bool = False,
current_user: User = Depends(require_permission("users.import")),
db: Session = Depends(get_db)
):
"""
Delete all users from a specific import job (full rollback).
Safety checks:
- Requires confirm=True parameter
- Verifies job status is completed or partial
- Cannot rollback twice (checks rollback_at is None)
- Logs action to import_rollback_audit table
Args:
job_id: Import job UUID
confirm: Must be True to execute rollback
Returns:
Number of deleted users and confirmation message
Requires permission: users.import
"""
# Safety check: require explicit confirmation
if not confirm:
raise HTTPException(
status_code=400,
detail="Rollback requires confirm=true parameter"
)
# Get import job
job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
if not job:
raise HTTPException(status_code=404, detail="Import job not found")
# Verify job can be rolled back
if job.status not in [ImportJobStatus.completed, ImportJobStatus.partial]:
raise HTTPException(
status_code=400,
detail=f"Cannot rollback import with status: {job.status.value}"
)
if job.rollback_at:
raise HTTPException(
status_code=400,
detail="Import has already been rolled back"
)
# Get imported user IDs
imported_user_ids = job.imported_user_ids or []
if not imported_user_ids:
raise HTTPException(
status_code=400,
detail="No users to rollback (imported_user_ids is empty)"
)
try:
# Delete all imported users
deleted_count = db.query(User).filter(
User.id.in_([uuid.UUID(uid) for uid in imported_user_ids])
).delete(synchronize_session=False)
# Update import job
job.status = ImportJobStatus.rolled_back
job.rollback_at = datetime.now(timezone.utc)
job.rollback_by = current_user.id
# Create audit record
from models import ImportRollbackAudit
audit = ImportRollbackAudit(
import_job_id=job.id,
rolled_back_by=current_user.id,
deleted_user_count=deleted_count,
deleted_user_ids=imported_user_ids,
reason="Manual rollback by admin"
)
db.add(audit)
db.commit()
logger.warning(f"Import rolled back: {job.id} - {deleted_count} users deleted by {current_user.email}")
return {
'deleted_users': deleted_count,
'message': f'Import rolled back successfully. {deleted_count} users deleted.'
}
except Exception as e:
db.rollback()
logger.error(f"Rollback failed for job {job.id}: {str(e)}")
raise HTTPException(status_code=500, detail=f"Rollback failed: {str(e)}")
@api_router.get("/admin/import/{job_id}/status")
async def get_import_status(
job_id: str,
current_user: User = Depends(require_permission("users.view")),
db: Session = Depends(get_db)
):
"""
Get real-time import progress status for polling.
Use this endpoint to poll for import progress updates
while the import is executing.
Args:
job_id: Import job UUID
Returns:
Current import status with progress percentage
Requires permission: users.view
"""
job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
if not job:
raise HTTPException(status_code=404, detail="Import job not found")
progress_percent = 0.0
if job.total_rows > 0:
progress_percent = (job.processed_rows / job.total_rows) * 100
return {
'status': job.status.value,
'processed_rows': job.processed_rows,
'total_rows': job.total_rows,
'progress_percent': round(progress_percent, 1),
'successful_rows': job.successful_rows,
'failed_rows': job.failed_rows
}
@api_router.get("/admin/import/{job_id}/errors/download")
async def download_error_report(
job_id: str,
current_user: User = Depends(require_permission("users.view")),
db: Session = Depends(get_db)
):
"""
Download CSV report with all import errors.
CSV columns: Row Number, Email, Error Type, Error Message, Original Data
Args:
job_id: Import job UUID
Returns:
StreamingResponse with CSV file
Requires permission: users.view
"""
job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
if not job:
raise HTTPException(status_code=404, detail="Import job not found")
errors = job.error_log or []
if not errors:
raise HTTPException(status_code=404, detail="No errors found for this import job")
# Generate CSV
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=['Row Number', 'Email', 'Error Type', 'Error Message'])
writer.writeheader()
for error in errors:
writer.writerow({
'Row Number': error.get('row', ''),
'Email': error.get('email', ''),
'Error Type': 'Import Error',
'Error Message': error.get('error', '')
})
# Return as streaming response
output.seek(0)
return StreamingResponse(
iter([output.getvalue()]),
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename=import_errors_{job_id}.csv"}
)
@api_router.post("/admin/events", response_model=EventResponse)
async def create_event(
request: EventCreate,
@@ -3256,10 +3836,20 @@ async def mark_attendance(
EventRSVP.event_id == event_id,
EventRSVP.user_id == request.user_id
).first()
# Auto-create RSVP if it doesn't exist (for retroactive attendance marking)
if not rsvp:
raise HTTPException(status_code=404, detail="RSVP not found")
rsvp = EventRSVP(
event_id=event_id,
user_id=request.user_id,
rsvp_status=RSVPStatus.yes, # Default to 'yes' for attended events
attended=False,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc)
)
db.add(rsvp)
db.flush() # Get the ID without committing
rsvp.attended = request.attended
rsvp.attended_at = datetime.now(timezone.utc) if request.attended else None
rsvp.updated_at = datetime.now(timezone.utc)