"""
WordPress CSV Parser Module

This module provides utilities for parsing WordPress user export CSV files
and transforming them into LOAF platform-compatible data structures.

Key Features:
- Parse PHP serialized data (WordPress capabilities)
- Map WordPress roles to LOAF roles and statuses
- Validate and standardize user data (DOB, phone numbers)
- Generate smart status suggestions based on approval and subscription data
- Comprehensive data quality analysis and error reporting

Author: Claude Code
Date: 2025-12-24
"""
|
|
|
|
import csv
|
|
import re
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Tuple
|
|
import phpserialize
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ============================================================================
|
|
# WordPress Role Mapping Configuration
|
|
# ============================================================================
|
|
|
|
# Maps a WordPress role name to a (loaf_role, suggested_status) tuple.
# A status of None means the role alone does not fix the status; analyze_csv
# then derives it via suggest_status().
ROLE_MAPPING = {
    # WordPress admin roles → LOAF admin roles (auto-active)
    'administrator': ('superadmin', 'active'),
    'loaf_admin': ('admin', 'active'),
    'loaf_treasure': ('finance', 'active'),
    'loaf_communication': ('admin', 'active'),

    # WordPress member roles → LOAF member role (status from approval)
    'pms_subscription_plan_63': ('member', None),  # Status determined by approval
    'registered': ('guest', None),  # Default WordPress role

    # Fallback for unknown roles
    '__default__': ('guest', None)
}

# Role priority order (higher index = higher priority)
# map_wordpress_role() picks the role with the highest index when a user
# holds several; roles missing from this list rank below all listed ones.
ROLE_PRIORITY = [
    'registered',
    'pms_subscription_plan_63',
    'loaf_communication',
    'loaf_treasure',
    'loaf_admin',
    'administrator'
]
|
|
|
|
|
|
# ============================================================================
|
|
# PHP Serialization Parsing
|
|
# ============================================================================
|
|
|
|
def parse_php_serialized(data: str) -> List[str]:
    """
    Parse a WordPress PHP-serialized capabilities string into role names.

    WordPress stores user capabilities as serialized PHP arrays like:
        a:1:{s:10:"registered";b:1;}
        a:2:{s:10:"registered";b:1;s:24:"pms_subscription_plan_63";b:1;}

    Args:
        data: PHP serialized string. Anything that is not a non-blank string
            (None, NaN, numbers, '' or whitespace) is treated as "no roles".

    Returns:
        List of role names, i.e. the array keys whose value is truthy.
        Returns [] when the input is missing, is not a string, does not
        deserialize to an array, or cannot be parsed at all.

    Examples:
        >>> parse_php_serialized('a:1:{s:10:"registered";b:1;}')
        ['registered']
        >>> parse_php_serialized('a:2:{s:10:"registered";b:1;s:24:"pms_subscription_plan_63";b:1;}')
        ['registered', 'pms_subscription_plan_63']
        >>> parse_php_serialized(None)
        []
    """
    # Cells coming out of a DataFrame may be None, NaN or numeric. They carry
    # no roles, and both .encode() and the [:50] slice in the warning below
    # would raise on them, so reject them before touching the parser.
    if not isinstance(data, str) or not data.strip():
        return []

    try:
        parsed = phpserialize.loads(data.encode('utf-8'))

        # Anything other than a PHP array cannot be a capabilities map.
        if not isinstance(parsed, dict):
            return []

        # Keys are role names (bytes from phpserialize); keep the ones whose
        # value is truthy, i.e. the capability is granted.
        return [
            key.decode('utf-8') if isinstance(key, bytes) else key
            for key, value in parsed.items()
            if value
        ]
    except Exception as e:
        # Deliberately broad: this is best-effort parsing of exported data and
        # one malformed cell must not abort the whole import analysis.
        logger.warning(f"Failed to parse PHP serialized data: {data[:50]}... Error: {str(e)}")
        return []
|
|
|
|
|
|
# ============================================================================
|
|
# Role and Status Mapping
|
|
# ============================================================================
|
|
|
|
def map_wordpress_role(wp_roles: List[str]) -> Tuple[str, Optional[str]]:
    """
    Translate a user's WordPress roles into a LOAF role and suggested status.

    The role that appears latest in ROLE_PRIORITY wins; roles that are not
    listed there rank below every listed role:
    1. Any admin role → matching LOAF admin role with 'active' status
    2. Subscription role → 'member' (status decided later from approval)
    3. Anything else → 'guest' (status decided later from approval)

    Args:
        wp_roles: List of WordPress role names

    Returns:
        Tuple of (loaf_role, suggested_status) taken from ROLE_MAPPING
        - loaf_role: One of: superadmin, admin, finance, member, guest
        - suggested_status: 'active' for admin roles, otherwise None
          (to be determined from the approval data)

    Examples:
        >>> map_wordpress_role(['loaf_admin'])
        ('admin', 'active')
        >>> map_wordpress_role(['loaf_treasure'])
        ('finance', 'active')
        >>> map_wordpress_role(['pms_subscription_plan_63', 'registered'])
        ('member', None)
        >>> map_wordpress_role(['registered'])
        ('guest', None)
    """
    fallback = ROLE_MAPPING['__default__']
    if not wp_roles:
        return fallback

    def rank(role_name):
        # Position in the priority list, or -1 for roles it does not know.
        try:
            return ROLE_PRIORITY.index(role_name)
        except ValueError:
            return -1

    # 'registered' is only used if the iterable turns out to yield nothing.
    top_role = max(wp_roles, key=rank, default='registered')
    return ROLE_MAPPING.get(top_role, fallback)
|
|
|
|
|
|
def suggest_status(approval_status: str, has_subscription: bool, wordpress_role: str = 'guest') -> str:
    """
    Derive a LOAF user status from WordPress approval and subscription data.

    Rules, applied in order:
    1. LOAF admin roles (superadmin, admin, finance) → 'active'
    2. approved with a subscription → 'active'
    3. approved without a subscription → 'pre_validated'
    4. pending → 'payment_pending'
    5. unapproved → 'inactive'
    6. empty or any other value → 'pre_validated'

    The approval value is compared case-insensitively with surrounding
    whitespace ignored; None is treated like an empty string.

    Args:
        approval_status: WordPress approval status (approved, pending, unapproved, etc.)
        has_subscription: Whether user has pms_subscription_plan_63 role
        wordpress_role: LOAF role mapped from WordPress (for admin check)

    Returns:
        Suggested LOAF status: active, pre_validated, payment_pending, or inactive

    Examples:
        >>> suggest_status('approved', True, 'member')
        'active'
        >>> suggest_status('approved', False, 'member')
        'pre_validated'
        >>> suggest_status('pending', True, 'member')
        'payment_pending'
        >>> suggest_status('', False, 'admin')
        'active'
    """
    # Administrative accounts bypass the approval workflow entirely.
    if wordpress_role in ('superadmin', 'admin', 'finance'):
        return 'active'

    normalized = (approval_status or '').lower().strip()

    # Approval is the only state in which the subscription matters.
    if normalized == 'approved':
        if has_subscription:
            return 'active'
        return 'pre_validated'

    status_by_approval = {
        'pending': 'payment_pending',
        'unapproved': 'inactive',
    }
    # Empty or unrecognised approval values fall back to 'pre_validated'.
    return status_by_approval.get(normalized, 'pre_validated')
|
|
|
|
|
|
# ============================================================================
|
|
# Data Validation and Standardization
|
|
# ============================================================================
|
|
|
|
def standardize_phone(phone: str) -> str:
    """
    Standardize a phone number to a bare 10-digit string.

    Removes all non-digit characters:
    - (713) 560-7850 → 7135607850
    - 713-725-8902 → 7137258902
    - 1-713-725-8902 → 7137258902 (leading US country code dropped)
    - 7135607850.0 (numeric cell read as float) → 7135607850
    - Empty/None/NaN → 0000000000 (fallback)

    Args:
        phone: Phone number in any format. Usually a string, but numeric
            values as produced by CSV type inference are accepted too.

    Returns:
        10-digit phone number string (or 0000000000 if invalid)

    Examples:
        >>> standardize_phone('(713) 560-7850')
        '7135607850'
        >>> standardize_phone('713-725-8902')
        '7137258902'
        >>> standardize_phone('')
        '0000000000'
    """
    if not phone or pd.isna(phone):
        return '0000000000'

    # A numeric column containing blanks is read by pandas as float64, so a
    # number can arrive as 7135607850.0. str() of that contributes a spurious
    # trailing "0" digit, which made every such number look invalid.
    value = int(phone) if isinstance(phone, float) and phone.is_integer() else phone

    # Extract all digits
    digits = re.sub(r'\D', '', str(value))

    if len(digits) == 10:
        return digits
    if len(digits) == 11 and digits[0] == '1':
        # Remove leading 1 (US country code)
        return digits[1:]

    logger.warning(f"Invalid phone format: {phone} (extracted: {digits})")
    return '0000000000'
|
|
|
|
|
|
def validate_dob(dob_str: str) -> Tuple[Optional[datetime], Optional[str]]:
    """
    Validate and parse a date of birth.

    Validation rules:
    - Must be in MM/DD/YYYY format
    - Year 0000 is rejected with its own message (a known data quality
      issue in the WordPress export)
    - Year must be between 1900 and the current year
    - Cannot be in the future

    Args:
        dob_str: Date of birth string in MM/DD/YYYY format

    Returns:
        Tuple of (parsed_datetime, warning_message)
        - parsed_datetime: datetime object if valid, None if invalid
        - warning_message: Descriptive error message if invalid, None if valid

    Examples:
        >>> validate_dob('08/02/1962')
        (datetime.datetime(1962, 8, 2, 0, 0), None)
        >>> validate_dob('08/02/0000')
        (None, 'Invalid year: 0000 (data quality issue)')
        >>> validate_dob('08/02/9999')
        (None, 'Date is in the future: 9999')
        >>> validate_dob('')
        (None, 'Missing date of birth')
    """
    if not dob_str or pd.isna(dob_str):
        return None, 'Missing date of birth'

    text = str(dob_str).strip()

    try:
        # Parse MM/DD/YYYY format
        parsed = datetime.strptime(text, '%m/%d/%Y')
    except ValueError:
        # strptime cannot build a datetime for year 0, so that case never
        # reaches the range checks below; recognise it here to report the
        # specific problem instead of a generic format error.
        if re.fullmatch(r'\d{1,2}/\d{1,2}/0000', text):
            return None, 'Invalid year: 0000 (data quality issue)'
        return None, f'Invalid date format: {dob_str} (expected MM/DD/YYYY)'

    # Read the clock once so both future checks use the same instant.
    now = datetime.now()

    if parsed.year < 1900:
        return None, f'Year too old: {parsed.year} (likely invalid)'
    if parsed.year > now.year:
        return None, f'Date is in the future: {parsed.year}'
    if parsed > now:
        # Later this year, but still ahead of today.
        return None, 'Date is in the future'

    return parsed, None
|
|
|
|
|
|
# ============================================================================
|
|
# CSV Analysis and Preview Generation
|
|
# ============================================================================
|
|
|
|
def analyze_csv(file_path: str, existing_emails: Optional[set] = None) -> Dict:
    """
    Analyze WordPress CSV file and generate preview data with status suggestions.

    This is the main entry point for CSV processing. It:
    1. Reads and parses the CSV file (every column is read as text)
    2. Validates each row and generates warnings
    3. Maps WordPress roles to LOAF roles
    4. Suggests status for each user
    5. Tracks data quality metrics
    6. Checks for duplicate emails (both within CSV and against existing database)
    7. Returns comprehensive analysis and preview data

    Columns read from the CSV: user_email, first_name, last_name,
    date_of_birth, cell_phone, wp_capabilities, wppb_approval_status, ID,
    user_registered, newsletter_consent, newsletter_checklist. A missing
    column or empty cell is treated as an empty value.

    Args:
        file_path: Path to WordPress CSV export file
        existing_emails: Emails already in the database (optional). They are
            compared case-insensitively with surrounding whitespace ignored.

    Returns:
        Dictionary containing:
        - total_rows: Total number of user rows
        - valid_rows: Number of rows without critical errors
        - warnings: Total warning count
        - errors: Total critical error count
        - preview_data: List of row dictionaries with suggestions
        - data_quality: Dictionary of data quality metrics

    Example output:
        {
            'total_rows': 183,
            'valid_rows': 176,
            'warnings': 66,
            'errors': 7,
            'preview_data': [
                {
                    'row_number': 1,
                    'email': 'user@example.com',
                    'first_name': 'John',
                    'last_name': 'Doe',
                    'phone': '7135607850',
                    'date_of_birth': '1962-08-02T00:00:00',
                    'wordpress_user_id': 42,
                    'wordpress_registered': '2020-01-15 10:30:00',
                    'wordpress_roles': ['registered', 'pms_subscription_plan_63'],
                    'wordpress_approval_status': 'approved',
                    'has_subscription': True,
                    'suggested_role': 'member',
                    'suggested_status': 'active',
                    'warnings': [],
                    'errors': [],
                    'newsletter_consent': True,
                    'newsletter_checklist': False
                },
                ...
            ],
            'data_quality': {
                'invalid_dob': 66,
                'missing_phone': 7,
                'duplicate_email_csv': 0,
                'duplicate_email_db': 3,
                'unparseable_roles': 2,
                'missing_email': 0
            }
        }
    """
    import pandas as pd

    def _text(value) -> str:
        """Return a cell as a stripped string; None/NaN become ''."""
        if value is None:
            return ''
        if not isinstance(value, str):
            if pd.isna(value):
                return ''
            value = str(value)
        return value.strip()

    def _user_id(value) -> Optional[int]:
        """Parse the WordPress ID cell; None when absent or not numeric."""
        text = _text(value)
        if not text:
            return None
        try:
            return int(text)
        except ValueError:
            pass
        try:
            # Tolerates exports that wrote the ID as e.g. "42.0".
            return int(float(text))
        except (ValueError, OverflowError):
            return None

    # Read everything as text. Letting pandas infer types turns phone numbers
    # and IDs into floats whenever a column has blanks, and turns empty cells
    # into NaN, which str() then renders as the literal text "nan".
    df = pd.read_csv(file_path, dtype=str, keep_default_na=False)

    total_rows = len(df)
    preview_data = []
    data_quality = {
        'invalid_dob': 0,
        'missing_phone': 0,
        'duplicate_email_csv': 0,
        'duplicate_email_db': 0,
        'unparseable_roles': 0,
        'missing_email': 0
    }

    # email -> row number of its first occurrence, for CSV duplicate detection
    seen_emails = {}

    # CSV emails are lower-cased before comparison, so normalise the database
    # emails the same way; this also guarantees O(1) lookups for any iterable.
    known_emails = {
        entry.strip().lower()
        for entry in (existing_emails or ())
        if isinstance(entry, str)
    }

    # Number rows by position (1-based) rather than by DataFrame index label.
    for row_num, row in enumerate(df.to_dict('records'), start=1):
        row_warnings = []
        row_errors = []

        # Extract and validate email
        email = _text(row.get('user_email')).lower()
        if not email:
            row_errors.append('Missing email address')
            data_quality['missing_email'] += 1
        elif email in seen_emails:
            row_errors.append(f'Duplicate email in CSV (also in row {seen_emails[email]})')
            data_quality['duplicate_email_csv'] += 1
        elif email in known_emails:
            row_errors.append('Email already exists in database')
            data_quality['duplicate_email_db'] += 1
        else:
            seen_emails[email] = row_num

        # Extract basic fields
        first_name = _text(row.get('first_name'))
        last_name = _text(row.get('last_name'))

        # Parse and validate DOB
        dob_parsed, dob_warning = validate_dob(_text(row.get('date_of_birth')))
        if dob_warning:
            row_warnings.append(dob_warning)
            data_quality['invalid_dob'] += 1

        # Standardize phone
        phone = standardize_phone(_text(row.get('cell_phone')))
        if phone == '0000000000':
            row_warnings.append('Missing or invalid phone number')
            data_quality['missing_phone'] += 1

        # Parse WordPress roles; only a non-empty cell that yields no roles
        # counts as unparseable (an empty cell simply has no roles).
        wp_capabilities = _text(row.get('wp_capabilities'))
        wp_roles = parse_php_serialized(wp_capabilities)
        if wp_capabilities and not wp_roles:
            row_warnings.append('Could not parse WordPress roles')
            data_quality['unparseable_roles'] += 1

        # Map to LOAF role and status
        loaf_role, role_suggested_status = map_wordpress_role(wp_roles)

        # Determine if user has subscription
        has_subscription = 'pms_subscription_plan_63' in wp_roles

        # Get approval status
        approval_status = _text(row.get('wppb_approval_status'))

        # Suggest final status
        if role_suggested_status:
            # Admin roles have fixed status from role mapping
            suggested_status = role_suggested_status
        else:
            # Regular users get status from approval logic
            suggested_status = suggest_status(approval_status, has_subscription, loaf_role)

        # Build preview row
        preview_data.append({
            'row_number': row_num,
            'email': email,
            'first_name': first_name,
            'last_name': last_name,
            'phone': phone,
            'date_of_birth': dob_parsed.isoformat() if dob_parsed else None,
            'wordpress_user_id': _user_id(row.get('ID')),
            'wordpress_registered': _text(row.get('user_registered')),
            'wordpress_roles': wp_roles,
            'wordpress_approval_status': approval_status,
            'has_subscription': has_subscription,
            'suggested_role': loaf_role,
            'suggested_status': suggested_status,
            'warnings': row_warnings,
            'errors': row_errors,
            'newsletter_consent': _text(row.get('newsletter_consent')).lower() == 'yes',
            'newsletter_checklist': _text(row.get('newsletter_checklist')).lower() == 'yes'
        })

    # Calculate summary statistics
    valid_rows = sum(1 for entry in preview_data if not entry['errors'])
    total_warnings = sum(len(entry['warnings']) for entry in preview_data)
    total_errors = sum(len(entry['errors']) for entry in preview_data)

    return {
        'total_rows': total_rows,
        'valid_rows': valid_rows,
        'warnings': total_warnings,
        'errors': total_errors,
        'preview_data': preview_data,
        'data_quality': data_quality
    }
|
|
|
|
|
|
# ============================================================================
|
|
# Utility Functions
|
|
# ============================================================================
|
|
|
|
def get_status_badge_color(status: str) -> str:
    """
    Look up the badge styling used to display a user status in the UI.

    Args:
        status: User status string

    Returns:
        Tailwind CSS background/text class pair; statuses that are not
        listed get the same gray styling as 'inactive'.
    """
    badge_classes = {
        'active': 'bg-green-100 text-green-800',
        'pre_validated': 'bg-blue-100 text-blue-800',
        'payment_pending': 'bg-yellow-100 text-yellow-800',
        'inactive': 'bg-gray-100 text-gray-800',
        'pending_email': 'bg-purple-100 text-purple-800',
        'awaiting_event': 'bg-indigo-100 text-indigo-800'
    }

    if status in badge_classes:
        return badge_classes[status]

    # Neutral styling for anything unrecognised.
    return 'bg-gray-100 text-gray-800'
|
|
|
|
|
|
def format_preview_for_display(preview_data: List[Dict], page: int = 1, page_size: int = 50) -> Dict:
    """
    Format preview data for paginated display in frontend.

    Out-of-range arguments are clamped rather than rejected: a page below 1
    is served as page 1 and a page_size below 1 is treated as 1. A page past
    the last one yields an empty 'rows' list.

    Args:
        preview_data: Full preview data list
        page: Page number (1-indexed)
        page_size: Number of rows per page

    Returns:
        Dictionary with paginated data and metadata:
        - page: Page actually served (after clamping)
        - page_size: Page size actually used (after clamping)
        - total_pages: Number of pages needed for all rows (0 if no rows)
        - total_rows: Length of preview_data
        - rows: The slice of preview_data for this page
    """
    # A zero page size would divide by zero below, and a non-positive page
    # gives negative slice bounds that silently select rows from the end.
    page_size = max(page_size, 1)
    page = max(page, 1)

    total_rows = len(preview_data)
    total_pages = (total_rows + page_size - 1) // page_size  # ceiling division
    start_idx = (page - 1) * page_size

    return {
        'page': page,
        'page_size': page_size,
        'total_pages': total_pages,
        'total_rows': total_rows,
        'rows': preview_data[start_idx:start_idx + page_size]
    }
|
|
|
|
|
|
# ============================================================================
|
|
# Module Initialization
|
|
# ============================================================================
|
|
|
|
# Import pandas for CSV processing
|
|
# The functions above reference the module-level name `pd`
# (parse_php_serialized, standardize_phone, validate_dob). It is bound here,
# at the end of the module, so it exists once the import has completed.
try:
    import pandas as pd
except ImportError:
    # Log an installation hint, then re-raise so importing this module fails.
    logger.error("pandas library not found. Please install: pip install pandas")
    raise

# Emitted once, each time the module is imported.
logger.info("WordPress parser module loaded successfully")
|