forked from andika/membership-be
346 lines
11 KiB
Python
346 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Database Integrity Checker
|
|
Compares schema and data integrity between development and production databases
|
|
"""
|
|
|
|
import sys
|
|
from sqlalchemy import create_engine, inspect, text
|
|
from sqlalchemy.engine import reflection
|
|
import json
|
|
from collections import defaultdict
|
|
|
|
# Database URLs
|
|
DEV_DB = "postgresql://postgres:RchhcpaUKZuZuMOvB5kwCP1weLBnAG6tNMXE5FHdk8AwCvolBMALYFVYRM7WCl9x@10.9.23.11:5001/membership_demo"
|
|
PROD_DB = "postgresql://postgres:fDv3fRvMgfPueDWDUxj27NJVaynsewIdh6b2Hb28tcvG3Ew6mhscASg2kulx4tr7@10.9.23.11:54321/loaf_new"
|
|
|
|
def get_db_info(engine, label):
|
|
"""Get comprehensive database information"""
|
|
inspector = inspect(engine)
|
|
|
|
info = {
|
|
'label': label,
|
|
'tables': {},
|
|
'indexes': {},
|
|
'foreign_keys': {},
|
|
'sequences': [],
|
|
'enums': []
|
|
}
|
|
|
|
# Get all table names
|
|
table_names = inspector.get_table_names()
|
|
|
|
for table_name in table_names:
|
|
# Get columns
|
|
columns = inspector.get_columns(table_name)
|
|
info['tables'][table_name] = {
|
|
'columns': {
|
|
col['name']: {
|
|
'type': str(col['type']),
|
|
'nullable': col['nullable'],
|
|
'default': str(col.get('default', None)),
|
|
'autoincrement': col.get('autoincrement', False)
|
|
}
|
|
for col in columns
|
|
},
|
|
'column_count': len(columns)
|
|
}
|
|
|
|
# Get primary keys
|
|
pk = inspector.get_pk_constraint(table_name)
|
|
info['tables'][table_name]['primary_key'] = pk.get('constrained_columns', [])
|
|
|
|
# Get indexes
|
|
indexes = inspector.get_indexes(table_name)
|
|
info['indexes'][table_name] = [
|
|
{
|
|
'name': idx['name'],
|
|
'columns': idx['column_names'],
|
|
'unique': idx['unique']
|
|
}
|
|
for idx in indexes
|
|
]
|
|
|
|
# Get foreign keys
|
|
fks = inspector.get_foreign_keys(table_name)
|
|
info['foreign_keys'][table_name] = [
|
|
{
|
|
'name': fk.get('name'),
|
|
'columns': fk['constrained_columns'],
|
|
'referred_table': fk['referred_table'],
|
|
'referred_columns': fk['referred_columns']
|
|
}
|
|
for fk in fks
|
|
]
|
|
|
|
# Get sequences
|
|
with engine.connect() as conn:
|
|
result = conn.execute(text("""
|
|
SELECT sequence_name
|
|
FROM information_schema.sequences
|
|
WHERE sequence_schema = 'public'
|
|
"""))
|
|
info['sequences'] = [row[0] for row in result]
|
|
|
|
# Get enum types
|
|
result = conn.execute(text("""
|
|
SELECT t.typname as enum_name,
|
|
array_agg(e.enumlabel ORDER BY e.enumsortorder) as enum_values
|
|
FROM pg_type t
|
|
JOIN pg_enum e ON t.oid = e.enumtypid
|
|
WHERE t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')
|
|
GROUP BY t.typname
|
|
"""))
|
|
info['enums'] = {row[0]: row[1] for row in result}
|
|
|
|
return info
|
|
|
|
def compare_tables(dev_info, prod_info):
|
|
"""Compare tables between databases"""
|
|
dev_tables = set(dev_info['tables'].keys())
|
|
prod_tables = set(prod_info['tables'].keys())
|
|
|
|
print("\n" + "="*80)
|
|
print("TABLE COMPARISON")
|
|
print("="*80)
|
|
|
|
# Tables only in dev
|
|
dev_only = dev_tables - prod_tables
|
|
if dev_only:
|
|
print(f"\n❌ Tables only in DEV ({len(dev_only)}):")
|
|
for table in sorted(dev_only):
|
|
print(f" - {table}")
|
|
|
|
# Tables only in prod
|
|
prod_only = prod_tables - dev_tables
|
|
if prod_only:
|
|
print(f"\n❌ Tables only in PROD ({len(prod_only)}):")
|
|
for table in sorted(prod_only):
|
|
print(f" - {table}")
|
|
|
|
# Common tables
|
|
common = dev_tables & prod_tables
|
|
print(f"\n✅ Common tables: {len(common)}")
|
|
|
|
return common
|
|
|
|
def compare_columns(dev_info, prod_info, common_tables):
|
|
"""Compare columns for common tables"""
|
|
print("\n" + "="*80)
|
|
print("COLUMN COMPARISON")
|
|
print("="*80)
|
|
|
|
issues = []
|
|
|
|
for table in sorted(common_tables):
|
|
dev_cols = set(dev_info['tables'][table]['columns'].keys())
|
|
prod_cols = set(prod_info['tables'][table]['columns'].keys())
|
|
|
|
dev_only = dev_cols - prod_cols
|
|
prod_only = prod_cols - dev_cols
|
|
|
|
if dev_only or prod_only:
|
|
print(f"\n⚠️ Table '{table}' has column differences:")
|
|
|
|
if dev_only:
|
|
print(f" Columns only in DEV: {', '.join(sorted(dev_only))}")
|
|
issues.append(f"{table}: DEV-only columns: {', '.join(dev_only)}")
|
|
|
|
if prod_only:
|
|
print(f" Columns only in PROD: {', '.join(sorted(prod_only))}")
|
|
issues.append(f"{table}: PROD-only columns: {', '.join(prod_only)}")
|
|
|
|
# Compare column types for common columns
|
|
common_cols = dev_cols & prod_cols
|
|
for col in common_cols:
|
|
dev_col = dev_info['tables'][table]['columns'][col]
|
|
prod_col = prod_info['tables'][table]['columns'][col]
|
|
|
|
if dev_col['type'] != prod_col['type']:
|
|
print(f" ⚠️ Column '{col}' type mismatch:")
|
|
print(f" DEV: {dev_col['type']}")
|
|
print(f" PROD: {prod_col['type']}")
|
|
issues.append(f"{table}.{col}: Type mismatch")
|
|
|
|
if dev_col['nullable'] != prod_col['nullable']:
|
|
print(f" ⚠️ Column '{col}' nullable mismatch:")
|
|
print(f" DEV: {dev_col['nullable']}")
|
|
print(f" PROD: {prod_col['nullable']}")
|
|
issues.append(f"{table}.{col}: Nullable mismatch")
|
|
|
|
if not issues:
|
|
print("\n✅ All columns match between DEV and PROD")
|
|
|
|
return issues
|
|
|
|
def compare_enums(dev_info, prod_info):
|
|
"""Compare enum types"""
|
|
print("\n" + "="*80)
|
|
print("ENUM TYPE COMPARISON")
|
|
print("="*80)
|
|
|
|
dev_enums = set(dev_info['enums'].keys())
|
|
prod_enums = set(prod_info['enums'].keys())
|
|
|
|
dev_only = dev_enums - prod_enums
|
|
prod_only = prod_enums - dev_enums
|
|
|
|
issues = []
|
|
|
|
if dev_only:
|
|
print(f"\n❌ Enums only in DEV: {', '.join(sorted(dev_only))}")
|
|
issues.extend([f"Enum '{e}' only in DEV" for e in dev_only])
|
|
|
|
if prod_only:
|
|
print(f"\n❌ Enums only in PROD: {', '.join(sorted(prod_only))}")
|
|
issues.extend([f"Enum '{e}' only in PROD" for e in prod_only])
|
|
|
|
# Compare enum values for common enums
|
|
common = dev_enums & prod_enums
|
|
for enum_name in sorted(common):
|
|
dev_values = set(dev_info['enums'][enum_name])
|
|
prod_values = set(prod_info['enums'][enum_name])
|
|
|
|
if dev_values != prod_values:
|
|
print(f"\n⚠️ Enum '{enum_name}' values differ:")
|
|
print(f" DEV: {', '.join(sorted(dev_values))}")
|
|
print(f" PROD: {', '.join(sorted(prod_values))}")
|
|
issues.append(f"Enum '{enum_name}' values differ")
|
|
|
|
if not issues:
|
|
print("\n✅ All enum types match")
|
|
|
|
return issues
|
|
|
|
def check_migration_history(dev_engine, prod_engine):
|
|
"""Check Alembic migration history"""
|
|
print("\n" + "="*80)
|
|
print("MIGRATION HISTORY")
|
|
print("="*80)
|
|
|
|
try:
|
|
with dev_engine.connect() as dev_conn:
|
|
dev_result = dev_conn.execute(text("SELECT version_num FROM alembic_version"))
|
|
dev_version = dev_result.fetchone()
|
|
dev_version = dev_version[0] if dev_version else None
|
|
|
|
with prod_engine.connect() as prod_conn:
|
|
prod_result = prod_conn.execute(text("SELECT version_num FROM alembic_version"))
|
|
prod_version = prod_result.fetchone()
|
|
prod_version = prod_version[0] if prod_version else None
|
|
|
|
print(f"\nDEV migration version: {dev_version}")
|
|
print(f"PROD migration version: {prod_version}")
|
|
|
|
if dev_version == prod_version:
|
|
print("✅ Migration versions match")
|
|
return []
|
|
else:
|
|
print("❌ Migration versions DO NOT match")
|
|
return ["Migration versions differ"]
|
|
|
|
except Exception as e:
|
|
print(f"⚠️ Could not check migration history: {str(e)}")
|
|
return [f"Migration check failed: {str(e)}"]
|
|
|
|
def get_row_counts(engine, tables):
|
|
"""Get row counts for all tables"""
|
|
counts = {}
|
|
with engine.connect() as conn:
|
|
for table in tables:
|
|
result = conn.execute(text(f"SELECT COUNT(*) FROM {table}"))
|
|
counts[table] = result.fetchone()[0]
|
|
return counts
|
|
|
|
def compare_data_counts(dev_engine, prod_engine, common_tables):
|
|
"""Compare row counts between databases"""
|
|
print("\n" + "="*80)
|
|
print("DATA ROW COUNTS")
|
|
print("="*80)
|
|
|
|
print("\nGetting DEV row counts...")
|
|
dev_counts = get_row_counts(dev_engine, common_tables)
|
|
|
|
print("Getting PROD row counts...")
|
|
prod_counts = get_row_counts(prod_engine, common_tables)
|
|
|
|
print(f"\n{'Table':<30} {'DEV':<15} {'PROD':<15} {'Diff':<15}")
|
|
print("-" * 75)
|
|
|
|
for table in sorted(common_tables):
|
|
dev_count = dev_counts[table]
|
|
prod_count = prod_counts[table]
|
|
diff = dev_count - prod_count
|
|
diff_str = f"+{diff}" if diff > 0 else str(diff)
|
|
|
|
status = "⚠️ " if abs(diff) > 0 else "✅"
|
|
print(f"{status} {table:<28} {dev_count:<15} {prod_count:<15} {diff_str:<15}")
|
|
|
|
def main():
|
|
print("\n" + "="*80)
|
|
print("DATABASE INTEGRITY CHECKER")
|
|
print("="*80)
|
|
print(f"\nDEV: {DEV_DB.split('@')[1]}") # Hide password
|
|
print(f"PROD: {PROD_DB.split('@')[1]}")
|
|
|
|
try:
|
|
# Connect to databases
|
|
print("\n🔌 Connecting to databases...")
|
|
dev_engine = create_engine(DEV_DB)
|
|
prod_engine = create_engine(PROD_DB)
|
|
|
|
# Test connections
|
|
with dev_engine.connect() as conn:
|
|
conn.execute(text("SELECT 1"))
|
|
print("✅ Connected to DEV database")
|
|
|
|
with prod_engine.connect() as conn:
|
|
conn.execute(text("SELECT 1"))
|
|
print("✅ Connected to PROD database")
|
|
|
|
# Get database info
|
|
print("\n📊 Gathering database information...")
|
|
dev_info = get_db_info(dev_engine, "DEV")
|
|
prod_info = get_db_info(prod_engine, "PROD")
|
|
|
|
# Run comparisons
|
|
all_issues = []
|
|
|
|
common_tables = compare_tables(dev_info, prod_info)
|
|
|
|
column_issues = compare_columns(dev_info, prod_info, common_tables)
|
|
all_issues.extend(column_issues)
|
|
|
|
enum_issues = compare_enums(dev_info, prod_info)
|
|
all_issues.extend(enum_issues)
|
|
|
|
migration_issues = check_migration_history(dev_engine, prod_engine)
|
|
all_issues.extend(migration_issues)
|
|
|
|
compare_data_counts(dev_engine, prod_engine, common_tables)
|
|
|
|
# Summary
|
|
print("\n" + "="*80)
|
|
print("SUMMARY")
|
|
print("="*80)
|
|
|
|
if all_issues:
|
|
print(f"\n❌ Found {len(all_issues)} integrity issues:")
|
|
for i, issue in enumerate(all_issues, 1):
|
|
print(f" {i}. {issue}")
|
|
print("\n⚠️ Databases are NOT in sync!")
|
|
sys.exit(1)
|
|
else:
|
|
print("\n✅ Databases are in sync!")
|
|
print("✅ No integrity issues found")
|
|
sys.exit(0)
|
|
|
|
except Exception as e:
|
|
print(f"\n❌ Error: {str(e)}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|