#!/usr/bin/env python3
"""
Database Integrity Checker

Compares schema and data integrity between development and production
databases.

Connection URLs are read from the environment so that credentials never
live in source control:

    DEV_DATABASE_URL   SQLAlchemy URL of the development database
    PROD_DATABASE_URL  SQLAlchemy URL of the production database

Exit codes:
    0  no integrity issues found
    1  integrity issues found, or an unexpected error occurred
    2  a required connection URL is not configured
"""

import json
import os
import sys
import traceback
from collections import defaultdict

from sqlalchemy import create_engine, inspect, text
from sqlalchemy.engine import reflection

# Database URLs (taken from the environment; empty when not configured).
DEV_DB = os.environ.get("DEV_DATABASE_URL", "")
PROD_DB = os.environ.get("PROD_DATABASE_URL", "")

_BANNER = "=" * 80


def _print_section(title):
    """Print a section title framed by two banner lines."""
    print("\n" + _BANNER)
    print(title)
    print(_BANNER)


def _hide_credentials(url):
    """Return the part of *url* after the last '@' (host, port, database).

    When the URL contains no '@' there is no user-info part to strip and
    the URL is returned unchanged instead of raising IndexError.
    """
    return url.rpartition('@')[2]


def get_db_info(engine, label):
    """Get comprehensive database information.

    Args:
        engine: SQLAlchemy engine connected to the database to inspect.
        label: Human-readable name stored under the 'label' key.

    Returns:
        A dict with the keys 'label', 'tables', 'indexes', 'foreign_keys',
        'sequences' (list of sequence names in the 'public' schema) and
        'enums' (dict mapping enum type name to its labels in sort order).
    """
    inspector = inspect(engine)
    info = {
        'label': label,
        'tables': {},
        'indexes': {},
        'foreign_keys': {},
        'sequences': [],
        'enums': {},
    }

    for table_name in inspector.get_table_names():
        columns = inspector.get_columns(table_name)
        pk = inspector.get_pk_constraint(table_name)
        info['tables'][table_name] = {
            'columns': {
                col['name']: {
                    'type': str(col['type']),
                    'nullable': col['nullable'],
                    'default': str(col.get('default', None)),
                    'autoincrement': col.get('autoincrement', False),
                }
                for col in columns
            },
            'column_count': len(columns),
            'primary_key': pk.get('constrained_columns', []),
        }

        info['indexes'][table_name] = [
            {
                'name': idx['name'],
                'columns': idx['column_names'],
                'unique': idx['unique'],
            }
            for idx in inspector.get_indexes(table_name)
        ]

        info['foreign_keys'][table_name] = [
            {
                'name': fk.get('name'),
                'columns': fk['constrained_columns'],
                'referred_table': fk['referred_table'],
                'referred_columns': fk['referred_columns'],
            }
            for fk in inspector.get_foreign_keys(table_name)
        ]

    with engine.connect() as conn:
        # Get sequences
        result = conn.execute(text("""
            SELECT sequence_name
            FROM information_schema.sequences
            WHERE sequence_schema = 'public'
        """))
        info['sequences'] = [row[0] for row in result]

        # Get enum types. enumlabel has type "name"; it is cast to text so
        # the aggregate is a text[] that the driver returns as a list
        # rather than as the raw '{a,b}' array literal.
        result = conn.execute(text("""
            SELECT t.typname AS enum_name,
                   array_agg(CAST(e.enumlabel AS text)
                             ORDER BY e.enumsortorder) AS enum_values
            FROM pg_type t
            JOIN pg_enum e ON t.oid = e.enumtypid
            WHERE t.typnamespace = (
                SELECT oid FROM pg_namespace WHERE nspname = 'public'
            )
            GROUP BY t.typname
        """))
        info['enums'] = {row[0]: list(row[1]) for row in result}

    return info


def compare_tables(dev_info, prod_info):
    """Compare tables between databases.

    Prints the tables that exist on only one side.

    Returns:
        The set of table names present in both databases.
    """
    dev_tables = set(dev_info['tables'])
    prod_tables = set(prod_info['tables'])

    _print_section("TABLE COMPARISON")

    dev_only = dev_tables - prod_tables
    if dev_only:
        print(f"\n❌ Tables only in DEV ({len(dev_only)}):")
        for table in sorted(dev_only):
            print(f" - {table}")

    prod_only = prod_tables - dev_tables
    if prod_only:
        print(f"\n❌ Tables only in PROD ({len(prod_only)}):")
        for table in sorted(prod_only):
            print(f" - {table}")

    common = dev_tables & prod_tables
    print(f"\n✅ Common tables: {len(common)}")
    return common


def compare_columns(dev_info, prod_info, common_tables):
    """Compare columns for common tables.

    Reports columns present on only one side and, for shared columns,
    type and nullability mismatches.

    Returns:
        A list of issue strings, in sorted table/column order.
    """
    _print_section("COLUMN COMPARISON")

    issues = []
    for table in sorted(common_tables):
        dev_columns = dev_info['tables'][table]['columns']
        prod_columns = prod_info['tables'][table]['columns']
        # Lines are buffered so the table header is printed whenever the
        # table has any difference, including type/nullable mismatches.
        report = []

        dev_only = sorted(dev_columns.keys() - prod_columns.keys())
        if dev_only:
            names = ', '.join(dev_only)
            report.append(f" Columns only in DEV: {names}")
            issues.append(f"{table}: DEV-only columns: {names}")

        prod_only = sorted(prod_columns.keys() - dev_columns.keys())
        if prod_only:
            names = ', '.join(prod_only)
            report.append(f" Columns only in PROD: {names}")
            issues.append(f"{table}: PROD-only columns: {names}")

        # Compare column types and nullability for common columns
        for col in sorted(dev_columns.keys() & prod_columns.keys()):
            dev_col = dev_columns[col]
            prod_col = prod_columns[col]

            if dev_col['type'] != prod_col['type']:
                report.append(f" ⚠️ Column '{col}' type mismatch:")
                report.append(f" DEV: {dev_col['type']}")
                report.append(f" PROD: {prod_col['type']}")
                issues.append(f"{table}.{col}: Type mismatch")

            if dev_col['nullable'] != prod_col['nullable']:
                report.append(f" ⚠️ Column '{col}' nullable mismatch:")
                report.append(f" DEV: {dev_col['nullable']}")
                report.append(f" PROD: {prod_col['nullable']}")
                issues.append(f"{table}.{col}: Nullable mismatch")

        if report:
            print(f"\n⚠️ Table '{table}' has column differences:")
            for line in report:
                print(line)

    if not issues:
        print("\n✅ All columns match between DEV and PROD")

    return issues


def compare_enums(dev_info, prod_info):
    """Compare enum types.

    Reports enum types present on only one side and, for shared enum
    types, differing label sets or differing label order.

    Returns:
        A list of issue strings, in sorted enum-name order.
    """
    _print_section("ENUM TYPE COMPARISON")

    dev_enums = dev_info['enums']
    prod_enums = prod_info['enums']
    issues = []

    dev_only = sorted(set(dev_enums) - set(prod_enums))
    if dev_only:
        print(f"\n❌ Enums only in DEV: {', '.join(dev_only)}")
        issues.extend(f"Enum '{e}' only in DEV" for e in dev_only)

    prod_only = sorted(set(prod_enums) - set(dev_enums))
    if prod_only:
        print(f"\n❌ Enums only in PROD: {', '.join(prod_only)}")
        issues.extend(f"Enum '{e}' only in PROD" for e in prod_only)

    # Compare enum values for common enums
    for enum_name in sorted(set(dev_enums) & set(prod_enums)):
        dev_labels = list(dev_enums[enum_name])
        prod_labels = list(prod_enums[enum_name])

        if set(dev_labels) != set(prod_labels):
            print(f"\n⚠️ Enum '{enum_name}' values differ:")
            print(f" DEV: {', '.join(sorted(set(dev_labels)))}")
            print(f" PROD: {', '.join(sorted(set(prod_labels)))}")
            issues.append(f"Enum '{enum_name}' values differ")
        elif dev_labels != prod_labels:
            # Same labels in a different sort order: comparisons and
            # ORDER BY on the enum behave differently on the two sides.
            print(f"\n⚠️ Enum '{enum_name}' value order differs:")
            print(f" DEV: {', '.join(dev_labels)}")
            print(f" PROD: {', '.join(prod_labels)}")
            issues.append(f"Enum '{enum_name}' value order differs")

    if not issues:
        print("\n✅ All enum types match")

    return issues


def _get_alembic_version(engine):
    """Return the first version_num in alembic_version, or None if empty."""
    with engine.connect() as conn:
        row = conn.execute(
            text("SELECT version_num FROM alembic_version")
        ).fetchone()
    return row[0] if row else None


def check_migration_history(dev_engine, prod_engine):
    """Check Alembic migration history.

    Returns:
        An empty list when both databases report the same version, else a
        one-element list describing the mismatch or the lookup failure.
    """
    _print_section("MIGRATION HISTORY")

    try:
        dev_version = _get_alembic_version(dev_engine)
        prod_version = _get_alembic_version(prod_engine)
    except Exception as e:
        # Best effort: a failed lookup (e.g. missing alembic_version
        # table) is reported as an issue instead of aborting the run.
        print(f"⚠️ Could not check migration history: {str(e)}")
        return [f"Migration check failed: {str(e)}"]

    print(f"\nDEV migration version: {dev_version}")
    print(f"PROD migration version: {prod_version}")

    if dev_version == prod_version:
        print("✅ Migration versions match")
        return []

    print("❌ Migration versions DO NOT match")
    return ["Migration versions differ"]


def get_row_counts(engine, tables):
    """Get row counts for all tables.

    Table names are quoted through the dialect's identifier preparer so
    that mixed-case and reserved-word names produce valid SQL.

    Returns:
        A dict mapping each table name to its row count.
    """
    quote = engine.dialect.identifier_preparer.quote
    counts = {}
    with engine.connect() as conn:
        for table in tables:
            result = conn.execute(
                text(f"SELECT COUNT(*) FROM {quote(table)}")
            )
            counts[table] = result.fetchone()[0]
    return counts


def compare_data_counts(dev_engine, prod_engine, common_tables):
    """Compare row counts between databases.

    Prints one line per common table. Differences are informational only
    and are not returned as integrity issues.
    """
    _print_section("DATA ROW COUNTS")

    print("\nGetting DEV row counts...")
    dev_counts = get_row_counts(dev_engine, common_tables)
    print("Getting PROD row counts...")
    prod_counts = get_row_counts(prod_engine, common_tables)

    print(f"\n{'Table':<30} {'DEV':<15} {'PROD':<15} {'Diff':<15}")
    print("-" * 75)

    for table in sorted(common_tables):
        dev_count = dev_counts[table]
        prod_count = prod_counts[table]
        diff = dev_count - prod_count
        diff_str = f"+{diff}" if diff > 0 else str(diff)
        status = "⚠️ " if diff != 0 else "✅"
        print(
            f"{status} {table:<28} {dev_count:<15} "
            f"{prod_count:<15} {diff_str:<15}"
        )


def main():
    """Run every comparison and exit with a status code (see module doc)."""
    _print_section("DATABASE INTEGRITY CHECKER")

    if not DEV_DB or not PROD_DB:
        print(
            "\n❌ Error: set the DEV_DATABASE_URL and PROD_DATABASE_URL "
            "environment variables",
            file=sys.stderr,
        )
        sys.exit(2)

    print(f"\nDEV: {_hide_credentials(DEV_DB)}")  # Hide password
    print(f"PROD: {_hide_credentials(PROD_DB)}")

    dev_engine = None
    prod_engine = None
    try:
        # Connect to databases
        print("\n🔌 Connecting to databases...")
        dev_engine = create_engine(DEV_DB)
        prod_engine = create_engine(PROD_DB)

        # Test connections
        with dev_engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        print("✅ Connected to DEV database")

        with prod_engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        print("✅ Connected to PROD database")

        # Get database info
        print("\n📊 Gathering database information...")
        dev_info = get_db_info(dev_engine, "DEV")
        prod_info = get_db_info(prod_engine, "PROD")

        # Run comparisons
        all_issues = []
        common_tables = compare_tables(dev_info, prod_info)
        all_issues.extend(compare_columns(dev_info, prod_info, common_tables))
        all_issues.extend(compare_enums(dev_info, prod_info))
        all_issues.extend(check_migration_history(dev_engine, prod_engine))
        compare_data_counts(dev_engine, prod_engine, common_tables)

        # Summary
        _print_section("SUMMARY")

        if all_issues:
            print(f"\n❌ Found {len(all_issues)} integrity issues:")
            for i, issue in enumerate(all_issues, 1):
                print(f" {i}. {issue}")
            print("\n⚠️ Databases are NOT in sync!")
            sys.exit(1)

        print("\n✅ Databases are in sync!")
        print("✅ No integrity issues found")
        sys.exit(0)

    except Exception as e:
        # SystemExit is not an Exception subclass, so the sys.exit calls
        # above pass straight through this handler.
        print(f"\n❌ Error: {str(e)}")
        traceback.print_exc()
        sys.exit(1)
    finally:
        # Release pooled connections on every exit path.
        for engine in (dev_engine, prod_engine):
            if engine is not None:
                engine.dispose()


if __name__ == "__main__":
    main()