Files
membership-be/wordpress_parser.py
Andika 1988787a1f Template-Based CSV Import System with R2 Storage
Solution: Updated backend/r2_storage.py:
  - Added ALLOWED_CSV_TYPES for CSV file validation
  - Added upload_bytes() method for uploading raw bytes to R2
  - Added download_file() method for retrieving files from R2
  - Added delete_multiple() method for bulk file deletion

  Comprehensive upload endpoint now stores CSVs in R2:
  r2_storage = get_r2_storage()
  for file_type, (content, filename) in file_contents.items():
      _, r2_key, _ = await r2_storage.upload_bytes(
          content=content,
          folder=f"imports/{job_id}",
          filename=f"{file_type}_{filename}",
          content_type='text/csv'
      )
      r2_keys[file_type] = r2_key

  ---
  2. Stripe Transaction ID Tracking

  Solution: Updated subscription and donation imports to capture Stripe metadata:

  Subscription fields:
  - stripe_subscription_id
  - stripe_customer_id
  - stripe_payment_intent_id
  - stripe_invoice_id
  - stripe_charge_id
  - stripe_receipt_url
  - card_last4, card_brand, payment_method

  Donation fields:
  - stripe_payment_intent_id
  - stripe_charge_id
  - stripe_receipt_url
  - card_last4, card_brand

  ---
  3. Fixed JSON Serialization Error

  Problem: Object of type datetime is not JSON serializable when saving import metadata.

  Solution: Added serialize_for_json() helper in backend/server.py:
  def serialize_for_json(obj):
      """Recursively convert datetime objects to ISO strings for JSON serialization."""
      if isinstance(obj, (datetime, date)):
          return obj.isoformat()
      elif isinstance(obj, dict):
          return {k: serialize_for_json(v) for k, v in obj.items()}
      elif isinstance(obj, list):
          return [serialize_for_json(item) for item in obj]
      # ... handles other types

  ---
  4. Fixed Route Ordering (401 Unauthorized)

  Problem: /admin/import/comprehensive/upload returned 401 because FastAPI matched "comprehensive" as a {job_id} parameter.

  Solution: Moved comprehensive import routes BEFORE generic {job_id} routes in backend/server.py:
  # Correct order:
  @app.post("/api/admin/import/comprehensive/upload")  # Specific route FIRST
  # ... other comprehensive routes ...

  @app.get("/api/admin/import/{job_id}/preview")  # Generic route AFTER

  ---
  5. Improved Date Parsing

  Solution: Added additional date formats to backend/wordpress_parser.py:
  formats = [
      '%m/%d/%Y', '%Y-%m-%d', '%d/%m/%Y', '%B %d, %Y', '%b %d, %Y',
      '%Y-%m-%d %H:%M:%S',
      '%m/%Y',      # Month/Year: 01/2020
      '%m-%Y',      # Month-Year: 01-2020
      '%b-%Y',      # Short month-Year: Jan-2020
      '%B-%Y',      # Full month-Year: January-2020
  ]
2026-02-04 22:50:36 +07:00

1245 lines
44 KiB
Python

"""
WordPress CSV Parser Module
This module provides utilities for parsing WordPress user export CSV files
and transforming them into LOAF platform-compatible data structures.
Key Features:
- Parse PHP serialized data (WordPress capabilities)
- Map WordPress roles to LOAF roles and statuses
- Validate and standardize user data (DOB, phone numbers)
- Generate smart status suggestions based on approval and subscription data
- Comprehensive data quality analysis and error reporting
- Multi-file import support (Users, Members, Payments CSVs)
- Field mapping based on Meta Name Reference document
Author: Claude Code
Date: 2025-12-24
Updated: 2026-02-03 - Added comprehensive multi-file import support
"""
import csv
import re
import logging
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
import phpserialize
import pandas as pd
logger = logging.getLogger(__name__)
# ============================================================================
# Meta Name Reference Field Mapping (from client's WordPress export)
# ============================================================================
# Maps WordPress meta names to our database fields
# Format: 'wordpress_meta_name': ('db_field', 'field_type', 'parser_function')
META_FIELD_MAPPING = {
# Basic user info
'first_name': ('first_name', 'string', None),
'last_name': ('last_name', 'string', None),
'user_email': ('email', 'string', 'lowercase'),
'user_login': ('username', 'string', None), # For reference only
'address': ('address', 'string', None),
'city': ('city', 'string', None),
'state': ('state', 'string', None),
'zipcode': ('zipcode', 'string', None),
'cell_phone': ('phone', 'string', 'phone'),
'date_of_birth': ('date_of_birth', 'date', 'date_mmddyyyy'),
# Partner info
'partner_first_name': ('partner_first_name', 'string', None),
'partner_last_name': ('partner_last_name', 'string', None),
'partner_membership_status': ('partner_is_member', 'boolean', 'yes_no'),
'partner_membership_consideration': ('partner_plan_to_become_member', 'boolean', 'yes_no'),
# Newsletter preferences
'newsletter_consent': ('newsletter_subscribed', 'boolean', 'yes_no'),
'newsletter_checklist': ('newsletter_preferences', 'multi_value', 'newsletter_checklist'),
# Referral and lead sources
'member_referral': ('referred_by_member_name', 'string', None),
'referral_source': ('lead_sources', 'multi_value', 'lead_sources'),
# Volunteer interests
'volunteer_checklist': ('volunteer_interests', 'multi_value', 'volunteer_checklist'),
# Scholarship
'scholarship_request': ('scholarship_requested', 'boolean', 'yes_no'),
'scholarship_reason': ('scholarship_reason', 'string', None),
# Directory settings
'members_directory_filter': ('show_in_directory', 'boolean', 'yes_no'),
'md_display_name': ('custom_registration_data.directory_display_name', 'custom', None),
'md_email': ('directory_email', 'string', None),
'description': ('directory_bio', 'string', None),
'md_adress': ('directory_address', 'string', None), # Note: typo in WordPress
'md_phone': ('directory_phone', 'string', None),
'md_dob': ('directory_dob', 'date', 'date_mmddyyyy'),
'md_partner_name': ('directory_partner_name', 'string', None),
'md_avatar': ('profile_photo_url', 'string', None),
# Metadata
'member_since': ('member_since', 'date', 'date_various'),
'user_registered': ('wordpress_registered_date', 'datetime', 'datetime_mysql'),
'ID': ('wordpress_user_id', 'integer', None),
# Stripe info (from WordPress)
'pms_stripe_customer_id': ('stripe_customer_id', 'string', None),
}
# Newsletter checklist option mapping
NEWSLETTER_CHECKLIST_OPTIONS = {
'name': 'newsletter_publish_name',
'photo': 'newsletter_publish_photo',
'birthday': 'newsletter_publish_birthday',
'none': 'newsletter_publish_none',
# Handle various WordPress stored formats
'my name': 'newsletter_publish_name',
'my photo': 'newsletter_publish_photo',
'my birthday': 'newsletter_publish_birthday',
}
# Volunteer interests mapping (WordPress values to our format)
VOLUNTEER_INTERESTS_MAP = {
'events': 'Events',
'fundraising': 'Fundraising',
'communications': 'Communications',
'membership': 'Membership',
'board': 'Board of Directors',
'other': 'Other',
# Handle various WordPress formats
'help with events': 'Events',
'help with fundraising': 'Fundraising',
'help with communications': 'Communications',
'help with membership': 'Membership',
'serve on the board': 'Board of Directors',
}
# Lead sources mapping
LEAD_SOURCES_MAP = {
'current member': 'Current member',
'friend': 'Friend',
'outsmart magazine': 'OutSmart Magazine',
'outsmart': 'OutSmart Magazine',
'search engine': 'Search engine (Google etc.)',
'google': 'Search engine (Google etc.)',
'known about loaf': "I've known about LOAF for a long time",
'long time': "I've known about LOAF for a long time",
'other': 'Other',
}
# ============================================================================
# WordPress Role Mapping Configuration
# ============================================================================
ROLE_MAPPING = {
# WordPress admin roles → LOAF admin roles (auto-active)
'administrator': ('superadmin', 'active'),
'loaf_admin': ('admin', 'active'),
'loaf_treasure': ('finance', 'active'),
'loaf_communication': ('admin', 'active'),
# WordPress member roles → LOAF member role (status from approval)
'pms_subscription_plan_63': ('member', None), # Status determined by approval
'registered': ('guest', None), # Default WordPress role
# Fallback for unknown roles
'__default__': ('guest', None)
}
# Role priority order (higher index = higher priority)
ROLE_PRIORITY = [
'registered',
'pms_subscription_plan_63',
'loaf_communication',
'loaf_treasure',
'loaf_admin',
'administrator'
]
# ============================================================================
# PHP Serialization Parsing
# ============================================================================
def parse_php_serialized(data: str) -> List[str]:
"""
Parse WordPress PHP serialized capabilities string.
WordPress stores user capabilities as serialized PHP arrays like:
a:1:{s:10:"registered";b:1;}
a:2:{s:10:"registered";b:1;s:24:"pms_subscription_plan_63";b:1;}
Args:
data: PHP serialized string
Returns:
List of role names (e.g., ['registered', 'pms_subscription_plan_63'])
Examples:
>>> parse_php_serialized('a:1:{s:10:"registered";b:1;}')
['registered']
>>> parse_php_serialized('a:2:{s:10:"registered";b:1;s:24:"pms_subscription_plan_63";b:1;}')
['registered', 'pms_subscription_plan_63']
"""
if not data or pd.isna(data):
return []
try:
# Use phpserialize library to parse
parsed = phpserialize.loads(data.encode('utf-8'))
# Extract role names (keys where value is True)
if isinstance(parsed, dict):
roles = [key.decode('utf-8') if isinstance(key, bytes) else key
for key, value in parsed.items() if value]
return roles
return []
except Exception as e:
logger.warning(f"Failed to parse PHP serialized data: {data[:50]}... Error: {str(e)}")
return []
# ============================================================================
# Role and Status Mapping
# ============================================================================
def map_wordpress_role(wp_roles: List[str]) -> Tuple[str, Optional[str]]:
"""
Map WordPress roles to LOAF role and suggested status.
Priority logic:
1. If user has any admin role → corresponding LOAF admin role with 'active' status
2. If user has subscription → 'member' role (status from approval)
3. Otherwise → 'guest' role (status from approval)
Args:
wp_roles: List of WordPress role names
Returns:
Tuple of (loaf_role, suggested_status)
- loaf_role: One of: superadmin, admin, finance, member, guest
- suggested_status: One of: active, pre_validated, payment_pending, None (determined by approval)
Examples:
>>> map_wordpress_role(['loaf_admin'])
('admin', 'active')
>>> map_wordpress_role(['loaf_treasure'])
('finance', 'active')
>>> map_wordpress_role(['pms_subscription_plan_63', 'registered'])
('member', None)
>>> map_wordpress_role(['registered'])
('guest', None)
"""
if not wp_roles:
return ROLE_MAPPING['__default__']
# Sort roles by priority (highest priority last)
prioritized_roles = sorted(
wp_roles,
key=lambda r: ROLE_PRIORITY.index(r) if r in ROLE_PRIORITY else -1
)
# Map highest priority role
highest_role = prioritized_roles[-1] if prioritized_roles else 'registered'
return ROLE_MAPPING.get(highest_role, ROLE_MAPPING['__default__'])
def suggest_status(approval_status: str, has_subscription: bool, wordpress_role: str = 'guest') -> str:
"""
Suggest LOAF user status based on WordPress approval and subscription data.
Logic:
1. Admin roles (loaf_admin, loaf_treasure, administrator) → always 'active'
2. approved + subscription → 'active'
3. approved without subscription → 'pre_validated'
4. pending → 'payment_pending'
5. Other/empty → 'pre_validated'
Args:
approval_status: WordPress approval status (approved, pending, unapproved, etc.)
has_subscription: Whether user has pms_subscription_plan_63 role
wordpress_role: LOAF role mapped from WordPress (for admin check)
Returns:
Suggested LOAF status: active, pre_validated, payment_pending, or inactive
Examples:
>>> suggest_status('approved', True, 'member')
'active'
>>> suggest_status('approved', False, 'member')
'pre_validated'
>>> suggest_status('pending', True, 'member')
'payment_pending'
>>> suggest_status('', False, 'admin')
'active'
"""
# Admin roles are always active
if wordpress_role in ('superadmin', 'admin', 'finance'):
return 'active'
# Normalize approval status
approval = (approval_status or '').lower().strip()
if approval == 'approved':
return 'active' if has_subscription else 'pre_validated'
elif approval == 'pending':
return 'payment_pending'
elif approval == 'unapproved':
return 'inactive'
else:
# Empty or unknown approval status
return 'pre_validated'
# ============================================================================
# Data Validation and Standardization
# ============================================================================
def standardize_phone(phone: str) -> str:
"""
Standardize phone number by extracting digits only.
Removes all non-digit characters:
- (713) 560-7850 → 7135607850
- 713-725-8902 → 7137258902
- Empty/None → 0000000000 (fallback)
Args:
phone: Phone number in any format
Returns:
10-digit phone number string (or 0000000000 if invalid)
Examples:
>>> standardize_phone('(713) 560-7850')
'7135607850'
>>> standardize_phone('713-725-8902')
'7137258902'
>>> standardize_phone('')
'0000000000'
"""
if not phone or pd.isna(phone):
return '0000000000'
# Extract all digits
digits = re.sub(r'\D', '', str(phone))
# Return 10 digits or fallback
if len(digits) == 10:
return digits
elif len(digits) == 11 and digits[0] == '1':
# Remove leading 1 (US country code)
return digits[1:]
else:
logger.warning(f"Invalid phone format: {phone} (extracted: {digits})")
return '0000000000'
def validate_dob(dob_str: str) -> Tuple[Optional[datetime], Optional[str]]:
"""
Validate and parse date of birth.
Validation rules:
- Must be in MM/DD/YYYY format
- Year must be between 1900 and current year
- Cannot be in the future
- Reject year 0000 or 2025+ (data quality issues in WordPress export)
Args:
dob_str: Date of birth string in MM/DD/YYYY format
Returns:
Tuple of (parsed_datetime, warning_message)
- parsed_datetime: datetime object if valid, None if invalid
- warning_message: Descriptive error message if invalid, None if valid
Examples:
>>> validate_dob('08/02/1962')
(datetime(1962, 8, 2), None)
>>> validate_dob('08/02/0000')
(None, 'Invalid year: 0000')
>>> validate_dob('08/02/2025')
(None, 'Date is in the future')
"""
if not dob_str or pd.isna(dob_str):
return None, 'Missing date of birth'
try:
# Parse MM/DD/YYYY format
parsed = datetime.strptime(str(dob_str).strip(), '%m/%d/%Y')
# Validate year range
if parsed.year == 0:
return None, 'Invalid year: 0000 (data quality issue)'
elif parsed.year < 1900:
return None, f'Year too old: {parsed.year} (likely invalid)'
elif parsed.year > datetime.now().year:
return None, f'Date is in the future: {parsed.year}'
elif parsed > datetime.now():
return None, 'Date is in the future'
return parsed, None
except ValueError as e:
return None, f'Invalid date format: {dob_str} (expected MM/DD/YYYY)'
# ============================================================================
# Enhanced Field Parsers for Meta Name Reference
# ============================================================================
def parse_boolean_yes_no(value: Any) -> bool:
"""
Parse yes/no style boolean values from WordPress.
Handles: yes, no, true, false, 1, 0, checked, unchecked
"""
if value is None or (isinstance(value, float) and pd.isna(value)):
return False
str_val = str(value).lower().strip()
return str_val in ('yes', 'true', '1', 'checked', 'on', 'y')
def parse_date_various(date_str: Any) -> Optional[datetime]:
"""
Parse dates in various formats commonly found in WordPress exports.
Handles:
- MM/DD/YYYY (US format)
- YYYY-MM-DD (ISO format)
- DD/MM/YYYY (EU format - attempted if US fails)
- Month DD, YYYY (e.g., "January 15, 2020")
"""
if date_str is None or (isinstance(date_str, float) and pd.isna(date_str)):
return None
date_str = str(date_str).strip()
if not date_str or date_str.lower() == 'nan':
return None
# Try various formats
formats = [
'%m/%d/%Y', # US: 01/15/2020
'%Y-%m-%d', # ISO: 2020-01-15
'%d/%m/%Y', # EU: 15/01/2020
'%B %d, %Y', # Full: January 15, 2020
'%b %d, %Y', # Short: Jan 15, 2020
'%Y-%m-%d %H:%M:%S', # MySQL datetime
'%m/%Y', # Month/Year: 01/2020
'%m-%Y', # Month-Year: 01-2020
'%b-%Y', # Short month-Year: Jan-2020
'%B-%Y', # Full month-Year: January-2020
]
for fmt in formats:
try:
parsed = datetime.strptime(date_str, fmt)
# Validate year range
if 1900 <= parsed.year <= datetime.now().year + 1:
return parsed
except ValueError:
continue
# Only log warning for strings that look like dates
if date_str and len(date_str) > 3:
logger.debug(f"Could not parse date: {date_str}")
return None
def parse_datetime_mysql(dt_str: Any) -> Optional[datetime]:
"""Parse MySQL datetime format: YYYY-MM-DD HH:MM:SS"""
if dt_str is None or (isinstance(dt_str, float) and pd.isna(dt_str)):
return None
try:
return datetime.strptime(str(dt_str).strip(), '%Y-%m-%d %H:%M:%S')
except ValueError:
return parse_date_various(dt_str)
def parse_newsletter_checklist(value: Any) -> Dict[str, bool]:
"""
Parse newsletter checklist multi-value field.
WordPress stores this as comma-separated or PHP serialized values.
Returns dict mapping to our newsletter_publish_* fields.
"""
result = {
'newsletter_publish_name': False,
'newsletter_publish_photo': False,
'newsletter_publish_birthday': False,
'newsletter_publish_none': False,
}
if value is None or (isinstance(value, float) and pd.isna(value)):
return result
str_val = str(value).lower().strip()
if not str_val or str_val == 'nan':
return result
# Try PHP serialized first
if str_val.startswith('a:'):
try:
parsed = phpserialize.loads(str_val.encode('utf-8'))
if isinstance(parsed, dict):
for key in parsed.keys():
key_str = key.decode('utf-8') if isinstance(key, bytes) else str(key)
key_lower = key_str.lower()
for match_key, field in NEWSLETTER_CHECKLIST_OPTIONS.items():
if match_key in key_lower:
result[field] = True
return result
except Exception:
pass
# Try comma-separated values
items = [item.strip().lower() for item in str_val.split(',')]
for item in items:
for match_key, field in NEWSLETTER_CHECKLIST_OPTIONS.items():
if match_key in item:
result[field] = True
return result
def parse_volunteer_checklist(value: Any) -> List[str]:
"""
Parse volunteer interests checklist.
Returns list of standardized volunteer interest labels.
"""
if value is None or (isinstance(value, float) and pd.isna(value)):
return []
str_val = str(value).lower().strip()
if not str_val or str_val == 'nan':
return []
interests = []
# Try PHP serialized first
if str_val.startswith('a:'):
try:
parsed = phpserialize.loads(str_val.encode('utf-8'))
if isinstance(parsed, dict):
for key in parsed.keys():
key_str = key.decode('utf-8') if isinstance(key, bytes) else str(key)
key_lower = key_str.lower()
for match_key, label in VOLUNTEER_INTERESTS_MAP.items():
if match_key in key_lower and label not in interests:
interests.append(label)
return interests
except Exception:
pass
# Try comma-separated values
items = [item.strip().lower() for item in str_val.split(',')]
for item in items:
for match_key, label in VOLUNTEER_INTERESTS_MAP.items():
if match_key in item and label not in interests:
interests.append(label)
return interests
def parse_lead_sources(value: Any) -> List[str]:
"""
Parse referral/lead sources field.
Returns list of standardized lead source labels.
"""
if value is None or (isinstance(value, float) and pd.isna(value)):
return []
str_val = str(value).lower().strip()
if not str_val or str_val == 'nan':
return []
sources = []
# Try PHP serialized first
if str_val.startswith('a:'):
try:
parsed = phpserialize.loads(str_val.encode('utf-8'))
if isinstance(parsed, dict):
for key in parsed.keys():
key_str = key.decode('utf-8') if isinstance(key, bytes) else str(key)
key_lower = key_str.lower()
for match_key, label in LEAD_SOURCES_MAP.items():
if match_key in key_lower and label not in sources:
sources.append(label)
return sources
except Exception:
pass
# Try comma-separated values
items = [item.strip().lower() for item in str_val.split(',')]
for item in items:
matched = False
for match_key, label in LEAD_SOURCES_MAP.items():
if match_key in item and label not in sources:
sources.append(label)
matched = True
break
# If no match, add as "Other" with original value
if not matched and item:
sources.append('Other')
return sources
def transform_csv_row_to_user_data(row: Dict[str, Any], existing_emails: set = None) -> Dict[str, Any]:
"""
Transform a CSV row to user data dictionary using Meta Name Reference mapping.
Args:
row: Dictionary of CSV column values
existing_emails: Set of emails already in database (for duplicate check)
Returns:
Dictionary with:
- user_data: Fields that map to User model
- custom_data: Fields for custom_registration_data JSON
- newsletter_prefs: Newsletter preference booleans
- warnings: List of warning messages
- errors: List of error messages
"""
user_data = {}
custom_data = {}
newsletter_prefs = {}
warnings = []
errors = []
# Process each mapped field
for csv_field, (db_field, field_type, parser) in META_FIELD_MAPPING.items():
value = row.get(csv_field)
# Skip if no value
if value is None or (isinstance(value, float) and pd.isna(value)):
continue
try:
# Parse based on field type
if field_type == 'string':
if parser == 'lowercase':
parsed_value = str(value).strip().lower()
elif parser == 'phone':
parsed_value = standardize_phone(value)
if parsed_value == '0000000000':
warnings.append(f'Invalid phone: {value}')
else:
parsed_value = str(value).strip() if value else None
elif field_type == 'integer':
parsed_value = int(value) if value else None
elif field_type == 'boolean':
parsed_value = parse_boolean_yes_no(value)
elif field_type == 'date':
if parser == 'date_mmddyyyy':
parsed_value, warning = validate_dob(value)
if warning:
warnings.append(warning)
else:
parsed_value = parse_date_various(value)
elif field_type == 'datetime':
parsed_value = parse_datetime_mysql(value)
elif field_type == 'multi_value':
if parser == 'newsletter_checklist':
newsletter_prefs = parse_newsletter_checklist(value)
continue # Handled separately
elif parser == 'volunteer_checklist':
parsed_value = parse_volunteer_checklist(value)
elif parser == 'lead_sources':
parsed_value = parse_lead_sources(value)
else:
parsed_value = [str(value)]
elif field_type == 'custom':
# Store in custom_registration_data
custom_field = db_field.replace('custom_registration_data.', '')
custom_data[custom_field] = str(value).strip() if value else None
continue
else:
parsed_value = value
# Store in appropriate location
if parsed_value is not None:
user_data[db_field] = parsed_value
except Exception as e:
warnings.append(f'Error parsing {csv_field}: {str(e)}')
# Check for required fields
if not user_data.get('email'):
errors.append('Missing email address')
elif existing_emails and user_data['email'] in existing_emails:
errors.append('Email already exists in database')
if not user_data.get('first_name'):
warnings.append('Missing first name')
if not user_data.get('last_name'):
warnings.append('Missing last name')
return {
'user_data': user_data,
'custom_data': custom_data,
'newsletter_prefs': newsletter_prefs,
'warnings': warnings,
'errors': errors
}
# ============================================================================
# Members CSV Parser (Subscription Data)
# ============================================================================
def parse_members_csv(file_path: str) -> Dict[str, Any]:
"""
Parse WordPress PMS Members export CSV for subscription data.
Args:
file_path: Path to pms-export-members CSV file
Returns:
Dictionary mapping user_email to subscription data
"""
members_data = {}
try:
df = pd.read_csv(file_path)
for _, row in df.iterrows():
email = str(row.get('user_email', '')).strip().lower()
if not email or email == 'nan':
continue
# Parse subscription dates
start_date = parse_date_various(row.get('start_date'))
expiration_date = parse_date_various(row.get('expiration_date'))
# Map subscription status
wp_status = str(row.get('status', '')).lower().strip()
if wp_status == 'active':
sub_status = 'active'
elif wp_status in ('expired', 'abandoned'):
sub_status = 'expired'
elif wp_status in ('canceled', 'cancelled'):
sub_status = 'cancelled'
else:
sub_status = 'active' # Default
# Parse payment gateway
payment_gateway = str(row.get('payment_gateway', '')).lower().strip()
if 'stripe' in payment_gateway:
payment_method = 'stripe'
elif 'paypal' in payment_gateway:
payment_method = 'paypal'
elif payment_gateway in ('manual', 'admin', ''):
payment_method = 'manual'
else:
payment_method = payment_gateway or 'manual'
members_data[email] = {
'subscription_plan_id': row.get('subscription_plan_id'),
'subscription_plan_name': row.get('subscription_plan_name'),
'start_date': start_date,
'end_date': expiration_date,
'status': sub_status,
'payment_method': payment_method,
'wordpress_user_id': row.get('user_id'),
'billing_first_name': row.get('billing_first_name'),
'billing_last_name': row.get('billing_last_name'),
'billing_address': row.get('billing_address'),
'billing_city': row.get('billing_city'),
'billing_state': row.get('billing_state'),
'billing_zip': row.get('billing_zip'),
'card_last4': row.get('billing_card_last4'),
}
except Exception as e:
logger.error(f"Error parsing members CSV: {str(e)}")
raise
return members_data
# ============================================================================
# Payments CSV Parser (Payment History)
# ============================================================================
def parse_payments_csv(file_path: str) -> Dict[str, List[Dict]]:
"""
Parse WordPress PMS Payments export CSV for payment history.
Args:
file_path: Path to pms-export-payments CSV file
Returns:
Dictionary mapping user_email to list of payment records
"""
payments_data = {}
try:
df = pd.read_csv(file_path)
for _, row in df.iterrows():
email = str(row.get('user_email', '')).strip().lower()
if not email or email == 'nan':
continue
# Parse payment date
payment_date = parse_date_various(row.get('date'))
# Parse amount (convert to cents)
amount_str = str(row.get('amount', '0')).replace('$', '').replace(',', '').strip()
try:
amount_cents = int(float(amount_str) * 100)
except (ValueError, TypeError):
amount_cents = 0
# Map payment status
wp_status = str(row.get('status', '')).lower().strip()
if wp_status == 'completed':
payment_status = 'completed'
elif wp_status in ('pending', 'processing'):
payment_status = 'pending'
elif wp_status in ('failed', 'refunded'):
payment_status = 'failed'
else:
payment_status = 'completed' # Default for historical data
payment_record = {
'payment_id': row.get('payment_id'),
'amount_cents': amount_cents,
'status': payment_status,
'date': payment_date,
'payment_gateway': row.get('payment_gateway'),
'transaction_id': row.get('transaction_id'),
'profile_id': row.get('profile_id'),
'subscription_plan_id': row.get('subscription_plan_id'),
'wordpress_user_id': row.get('user_id'),
}
if email not in payments_data:
payments_data[email] = []
payments_data[email].append(payment_record)
except Exception as e:
logger.error(f"Error parsing payments CSV: {str(e)}")
raise
return payments_data
# ============================================================================
# Comprehensive Import Analysis
# ============================================================================
def analyze_comprehensive_import(
users_csv_path: str,
members_csv_path: Optional[str] = None,
payments_csv_path: Optional[str] = None,
existing_emails: Optional[set] = None
) -> Dict[str, Any]:
"""
Analyze all CSV files for comprehensive import with cross-referencing.
Args:
users_csv_path: Path to WordPress users export CSV (required)
members_csv_path: Path to PMS members CSV (optional)
payments_csv_path: Path to PMS payments CSV (optional)
existing_emails: Set of emails already in database
Returns:
Comprehensive analysis with preview data for all files
"""
if existing_emails is None:
existing_emails = set()
result = {
'users': {'total': 0, 'valid': 0, 'warnings': 0, 'errors': 0, 'preview': []},
'members': {'total': 0, 'matched': 0, 'unmatched': 0, 'data': {}},
'payments': {'total': 0, 'matched': 0, 'total_amount_cents': 0, 'data': {}},
'summary': {
'total_users': 0,
'importable_users': 0,
'duplicate_emails': 0,
'users_with_subscriptions': 0,
'users_with_payments': 0,
'total_payment_amount': 0,
}
}
# Parse members CSV if provided
members_data = {}
if members_csv_path:
try:
members_data = parse_members_csv(members_csv_path)
result['members']['total'] = len(members_data)
result['members']['data'] = members_data
except Exception as e:
result['members']['error'] = str(e)
# Parse payments CSV if provided
payments_data = {}
if payments_csv_path:
try:
payments_data = parse_payments_csv(payments_csv_path)
result['payments']['total'] = sum(len(p) for p in payments_data.values())
result['payments']['data'] = payments_data
result['payments']['total_amount_cents'] = sum(
sum(p['amount_cents'] for p in payments)
for payments in payments_data.values()
)
except Exception as e:
result['payments']['error'] = str(e)
# Parse users CSV
try:
df = pd.read_csv(users_csv_path)
result['users']['total'] = len(df)
seen_emails = set()
total_warnings = 0
total_errors = 0
for idx, row in df.iterrows():
row_dict = row.to_dict()
transformed = transform_csv_row_to_user_data(row_dict, existing_emails)
email = transformed['user_data'].get('email', '').lower()
# Check for CSV duplicates
if email in seen_emails:
transformed['errors'].append(f'Duplicate email in CSV')
elif email:
seen_emails.add(email)
# Cross-reference with members data
subscription_data = members_data.get(email)
if subscription_data:
result['members']['matched'] += 1
# Cross-reference with payments data
payment_records = payments_data.get(email, [])
if payment_records:
result['payments']['matched'] += 1
# Parse WordPress roles for role/status suggestion
wp_capabilities = row.get('wp_capabilities', '')
wp_roles = parse_php_serialized(wp_capabilities)
loaf_role, role_status = map_wordpress_role(wp_roles)
# Determine status
approval_status = str(row.get('wppb_approval_status', '')).strip()
has_subscription = 'pms_subscription_plan_63' in wp_roles or subscription_data is not None
if role_status:
suggested_status = role_status
else:
suggested_status = suggest_status(approval_status, has_subscription, loaf_role)
# Build preview row
preview_row = {
'row_number': idx + 1,
'email': email,
'first_name': transformed['user_data'].get('first_name', ''),
'last_name': transformed['user_data'].get('last_name', ''),
'phone': transformed['user_data'].get('phone', ''),
'date_of_birth': transformed['user_data'].get('date_of_birth').isoformat() if transformed['user_data'].get('date_of_birth') else None,
'wordpress_user_id': transformed['user_data'].get('wordpress_user_id'),
'wordpress_roles': wp_roles,
'suggested_role': loaf_role,
'suggested_status': suggested_status,
'has_subscription': has_subscription,
'subscription_data': subscription_data,
'payment_count': len(payment_records),
'total_paid_cents': sum(p['amount_cents'] for p in payment_records),
'user_data': transformed['user_data'],
'custom_data': transformed['custom_data'],
'newsletter_prefs': transformed['newsletter_prefs'],
'warnings': transformed['warnings'],
'errors': transformed['errors'],
}
result['users']['preview'].append(preview_row)
total_warnings += len(transformed['warnings'])
total_errors += len(transformed['errors'])
if not transformed['errors']:
result['users']['valid'] += 1
result['users']['warnings'] = total_warnings
result['users']['errors'] = total_errors
# Calculate unmatched members
user_emails = {p['email'] for p in result['users']['preview'] if p['email']}
result['members']['unmatched'] = len(set(members_data.keys()) - user_emails)
# Summary stats
result['summary']['total_users'] = result['users']['total']
result['summary']['importable_users'] = result['users']['valid']
result['summary']['duplicate_emails'] = len(seen_emails & existing_emails)
result['summary']['users_with_subscriptions'] = result['members']['matched']
result['summary']['users_with_payments'] = result['payments']['matched']
result['summary']['total_payment_amount'] = result['payments']['total_amount_cents']
except Exception as e:
logger.error(f"Error analyzing users CSV: {str(e)}")
result['users']['error'] = str(e)
raise
return result
# ============================================================================
# CSV Analysis and Preview Generation
# ============================================================================
def analyze_csv(file_path: str, existing_emails: Optional[set] = None) -> Dict:
"""
Analyze WordPress CSV file and generate preview data with status suggestions.
This is the main entry point for CSV processing. It:
1. Reads and parses the CSV file
2. Validates each row and generates warnings
3. Maps WordPress roles to LOAF roles
4. Suggests status for each user
5. Tracks data quality metrics
6. Checks for duplicate emails (both within CSV and against existing database)
7. Returns comprehensive analysis and preview data
Args:
file_path: Path to WordPress CSV export file
existing_emails: Set of emails already in the database (optional)
Returns:
Dictionary containing:
- total_rows: Total number of user rows
- valid_rows: Number of rows without critical errors
- warnings: Total warning count
- errors: Total critical error count
- preview_data: List of row dictionaries with suggestions
- data_quality: Dictionary of data quality metrics
Example output:
{
'total_rows': 183,
'valid_rows': 176,
'warnings': 66,
'errors': 7,
'preview_data': [
{
'row_number': 1,
'email': 'user@example.com',
'first_name': 'John',
'last_name': 'Doe',
'phone': '7135607850',
'date_of_birth': '1962-08-02',
'wordpress_roles': ['registered', 'pms_subscription_plan_63'],
'suggested_role': 'member',
'suggested_status': 'active',
'warnings': [],
'errors': []
},
...
],
'data_quality': {
'invalid_dob': 66,
'missing_phone': 7,
'duplicate_email_csv': 0,
'duplicate_email_db': 3,
'unparseable_roles': 2
}
}
"""
# Read CSV with pandas
df = pd.read_csv(file_path)
total_rows = len(df)
preview_data = []
data_quality = {
'invalid_dob': 0,
'missing_phone': 0,
'duplicate_email_csv': 0,
'duplicate_email_db': 0,
'unparseable_roles': 0,
'missing_email': 0
}
# Track seen emails for CSV duplicate detection
seen_emails = {}
# Convert existing_emails to set if provided
if existing_emails is None:
existing_emails = set()
for idx, row in df.iterrows():
row_num = idx + 1
warnings = []
errors = []
# Extract and validate email
email = str(row.get('user_email', '')).strip().lower()
if not email or email == 'nan':
errors.append('Missing email address')
data_quality['missing_email'] += 1
else:
# Check for duplicates within CSV
if email in seen_emails:
errors.append(f'Duplicate email in CSV (also in row {seen_emails[email]})')
data_quality['duplicate_email_csv'] += 1
# Check for duplicates in existing database
elif email in existing_emails:
errors.append(f'Email already exists in database')
data_quality['duplicate_email_db'] += 1
else:
seen_emails[email] = row_num
# Extract basic fields
first_name = str(row.get('first_name', '')).strip()
last_name = str(row.get('last_name', '')).strip()
# Parse and validate DOB
dob_parsed, dob_warning = validate_dob(row.get('date_of_birth'))
if dob_warning:
warnings.append(dob_warning)
data_quality['invalid_dob'] += 1
# Standardize phone
phone = standardize_phone(row.get('cell_phone'))
if phone == '0000000000':
warnings.append('Missing or invalid phone number')
data_quality['missing_phone'] += 1
# Parse WordPress roles
wp_capabilities = row.get('wp_capabilities', '')
wp_roles = parse_php_serialized(wp_capabilities)
if not wp_roles and wp_capabilities:
warnings.append('Could not parse WordPress roles')
data_quality['unparseable_roles'] += 1
# Map to LOAF role and status
loaf_role, role_suggested_status = map_wordpress_role(wp_roles)
# Determine if user has subscription
has_subscription = 'pms_subscription_plan_63' in wp_roles
# Get approval status
approval_status = str(row.get('wppb_approval_status', '')).strip()
# Suggest final status
if role_suggested_status:
# Admin roles have fixed status from role mapping
suggested_status = role_suggested_status
else:
# Regular users get status from approval logic
suggested_status = suggest_status(approval_status, has_subscription, loaf_role)
# Build preview row
preview_row = {
'row_number': row_num,
'email': email,
'first_name': first_name,
'last_name': last_name,
'phone': phone,
'date_of_birth': dob_parsed.isoformat() if dob_parsed else None,
'wordpress_user_id': int(row.get('ID', 0)) if pd.notna(row.get('ID')) else None,
'wordpress_registered': str(row.get('user_registered', '')),
'wordpress_roles': wp_roles,
'wordpress_approval_status': approval_status,
'has_subscription': has_subscription,
'suggested_role': loaf_role,
'suggested_status': suggested_status,
'warnings': warnings,
'errors': errors,
'newsletter_consent': str(row.get('newsletter_consent', '')).lower() == 'yes',
'newsletter_checklist': str(row.get('newsletter_checklist', '')).lower() == 'yes'
}
preview_data.append(preview_row)
# Calculate summary statistics
valid_rows = sum(1 for row in preview_data if not row['errors'])
total_warnings = sum(len(row['warnings']) for row in preview_data)
total_errors = sum(len(row['errors']) for row in preview_data)
return {
'total_rows': total_rows,
'valid_rows': valid_rows,
'warnings': total_warnings,
'errors': total_errors,
'preview_data': preview_data,
'data_quality': data_quality
}
# ============================================================================
# Utility Functions
# ============================================================================
def get_status_badge_color(status: str) -> str:
"""
Get appropriate badge color for status display in UI.
Args:
status: User status string
Returns:
Tailwind CSS color class
"""
colors = {
'active': 'bg-green-100 text-green-800',
'pre_validated': 'bg-blue-100 text-blue-800',
'payment_pending': 'bg-yellow-100 text-yellow-800',
'inactive': 'bg-gray-100 text-gray-800',
'pending_email': 'bg-purple-100 text-purple-800',
'awaiting_event': 'bg-indigo-100 text-indigo-800'
}
return colors.get(status, 'bg-gray-100 text-gray-800')
def format_preview_for_display(preview_data: List[Dict], page: int = 1, page_size: int = 50) -> Dict:
"""
Format preview data for paginated display in frontend.
Args:
preview_data: Full preview data list
page: Page number (1-indexed)
page_size: Number of rows per page
Returns:
Dictionary with paginated data and metadata
"""
total_pages = (len(preview_data) + page_size - 1) // page_size
start_idx = (page - 1) * page_size
end_idx = start_idx + page_size
return {
'page': page,
'page_size': page_size,
'total_pages': total_pages,
'total_rows': len(preview_data),
'rows': preview_data[start_idx:end_idx]
}
# ============================================================================
# Module Initialization
# ============================================================================
logger.info("WordPress parser module loaded successfully")