Database fix

This commit is contained in:
Koncept Kit
2026-01-05 16:56:38 +07:00
parent 669d78beb5
commit 2547758864
2 changed files with 725 additions and 0 deletions

345
check_db_integrity.py Normal file
View File

@@ -0,0 +1,345 @@
#!/usr/bin/env python3
"""
Database Integrity Checker
Compares schema and data integrity between development and production databases
"""
import sys
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.engine import reflection
import json
from collections import defaultdict
# Database URLs
DEV_DB = "postgresql://postgres:RchhcpaUKZuZuMOvB5kwCP1weLBnAG6tNMXE5FHdk8AwCvolBMALYFVYRM7WCl9x@10.9.23.11:5001/membership_demo"
PROD_DB = "postgresql://postgres:fDv3fRvMgfPueDWDUxj27NJVaynsewIdh6b2Hb28tcvG3Ew6mhscASg2kulx4tr7@10.9.23.11:54321/loaf_new"
def get_db_info(engine, label):
"""Get comprehensive database information"""
inspector = inspect(engine)
info = {
'label': label,
'tables': {},
'indexes': {},
'foreign_keys': {},
'sequences': [],
'enums': []
}
# Get all table names
table_names = inspector.get_table_names()
for table_name in table_names:
# Get columns
columns = inspector.get_columns(table_name)
info['tables'][table_name] = {
'columns': {
col['name']: {
'type': str(col['type']),
'nullable': col['nullable'],
'default': str(col.get('default', None)),
'autoincrement': col.get('autoincrement', False)
}
for col in columns
},
'column_count': len(columns)
}
# Get primary keys
pk = inspector.get_pk_constraint(table_name)
info['tables'][table_name]['primary_key'] = pk.get('constrained_columns', [])
# Get indexes
indexes = inspector.get_indexes(table_name)
info['indexes'][table_name] = [
{
'name': idx['name'],
'columns': idx['column_names'],
'unique': idx['unique']
}
for idx in indexes
]
# Get foreign keys
fks = inspector.get_foreign_keys(table_name)
info['foreign_keys'][table_name] = [
{
'name': fk.get('name'),
'columns': fk['constrained_columns'],
'referred_table': fk['referred_table'],
'referred_columns': fk['referred_columns']
}
for fk in fks
]
# Get sequences
with engine.connect() as conn:
result = conn.execute(text("""
SELECT sequence_name
FROM information_schema.sequences
WHERE sequence_schema = 'public'
"""))
info['sequences'] = [row[0] for row in result]
# Get enum types
result = conn.execute(text("""
SELECT t.typname as enum_name,
array_agg(e.enumlabel ORDER BY e.enumsortorder) as enum_values
FROM pg_type t
JOIN pg_enum e ON t.oid = e.enumtypid
WHERE t.typnamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'public')
GROUP BY t.typname
"""))
info['enums'] = {row[0]: row[1] for row in result}
return info
def compare_tables(dev_info, prod_info):
"""Compare tables between databases"""
dev_tables = set(dev_info['tables'].keys())
prod_tables = set(prod_info['tables'].keys())
print("\n" + "="*80)
print("TABLE COMPARISON")
print("="*80)
# Tables only in dev
dev_only = dev_tables - prod_tables
if dev_only:
print(f"\n❌ Tables only in DEV ({len(dev_only)}):")
for table in sorted(dev_only):
print(f" - {table}")
# Tables only in prod
prod_only = prod_tables - dev_tables
if prod_only:
print(f"\n❌ Tables only in PROD ({len(prod_only)}):")
for table in sorted(prod_only):
print(f" - {table}")
# Common tables
common = dev_tables & prod_tables
print(f"\n✅ Common tables: {len(common)}")
return common
def compare_columns(dev_info, prod_info, common_tables):
"""Compare columns for common tables"""
print("\n" + "="*80)
print("COLUMN COMPARISON")
print("="*80)
issues = []
for table in sorted(common_tables):
dev_cols = set(dev_info['tables'][table]['columns'].keys())
prod_cols = set(prod_info['tables'][table]['columns'].keys())
dev_only = dev_cols - prod_cols
prod_only = prod_cols - dev_cols
if dev_only or prod_only:
print(f"\n⚠️ Table '{table}' has column differences:")
if dev_only:
print(f" Columns only in DEV: {', '.join(sorted(dev_only))}")
issues.append(f"{table}: DEV-only columns: {', '.join(dev_only)}")
if prod_only:
print(f" Columns only in PROD: {', '.join(sorted(prod_only))}")
issues.append(f"{table}: PROD-only columns: {', '.join(prod_only)}")
# Compare column types for common columns
common_cols = dev_cols & prod_cols
for col in common_cols:
dev_col = dev_info['tables'][table]['columns'][col]
prod_col = prod_info['tables'][table]['columns'][col]
if dev_col['type'] != prod_col['type']:
print(f" ⚠️ Column '{col}' type mismatch:")
print(f" DEV: {dev_col['type']}")
print(f" PROD: {prod_col['type']}")
issues.append(f"{table}.{col}: Type mismatch")
if dev_col['nullable'] != prod_col['nullable']:
print(f" ⚠️ Column '{col}' nullable mismatch:")
print(f" DEV: {dev_col['nullable']}")
print(f" PROD: {prod_col['nullable']}")
issues.append(f"{table}.{col}: Nullable mismatch")
if not issues:
print("\n✅ All columns match between DEV and PROD")
return issues
def compare_enums(dev_info, prod_info):
"""Compare enum types"""
print("\n" + "="*80)
print("ENUM TYPE COMPARISON")
print("="*80)
dev_enums = set(dev_info['enums'].keys())
prod_enums = set(prod_info['enums'].keys())
dev_only = dev_enums - prod_enums
prod_only = prod_enums - dev_enums
issues = []
if dev_only:
print(f"\n❌ Enums only in DEV: {', '.join(sorted(dev_only))}")
issues.extend([f"Enum '{e}' only in DEV" for e in dev_only])
if prod_only:
print(f"\n❌ Enums only in PROD: {', '.join(sorted(prod_only))}")
issues.extend([f"Enum '{e}' only in PROD" for e in prod_only])
# Compare enum values for common enums
common = dev_enums & prod_enums
for enum_name in sorted(common):
dev_values = set(dev_info['enums'][enum_name])
prod_values = set(prod_info['enums'][enum_name])
if dev_values != prod_values:
print(f"\n⚠️ Enum '{enum_name}' values differ:")
print(f" DEV: {', '.join(sorted(dev_values))}")
print(f" PROD: {', '.join(sorted(prod_values))}")
issues.append(f"Enum '{enum_name}' values differ")
if not issues:
print("\n✅ All enum types match")
return issues
def check_migration_history(dev_engine, prod_engine):
"""Check Alembic migration history"""
print("\n" + "="*80)
print("MIGRATION HISTORY")
print("="*80)
try:
with dev_engine.connect() as dev_conn:
dev_result = dev_conn.execute(text("SELECT version_num FROM alembic_version"))
dev_version = dev_result.fetchone()
dev_version = dev_version[0] if dev_version else None
with prod_engine.connect() as prod_conn:
prod_result = prod_conn.execute(text("SELECT version_num FROM alembic_version"))
prod_version = prod_result.fetchone()
prod_version = prod_version[0] if prod_version else None
print(f"\nDEV migration version: {dev_version}")
print(f"PROD migration version: {prod_version}")
if dev_version == prod_version:
print("✅ Migration versions match")
return []
else:
print("❌ Migration versions DO NOT match")
return ["Migration versions differ"]
except Exception as e:
print(f"⚠️ Could not check migration history: {str(e)}")
return [f"Migration check failed: {str(e)}"]
def get_row_counts(engine, tables):
"""Get row counts for all tables"""
counts = {}
with engine.connect() as conn:
for table in tables:
result = conn.execute(text(f"SELECT COUNT(*) FROM {table}"))
counts[table] = result.fetchone()[0]
return counts
def compare_data_counts(dev_engine, prod_engine, common_tables):
"""Compare row counts between databases"""
print("\n" + "="*80)
print("DATA ROW COUNTS")
print("="*80)
print("\nGetting DEV row counts...")
dev_counts = get_row_counts(dev_engine, common_tables)
print("Getting PROD row counts...")
prod_counts = get_row_counts(prod_engine, common_tables)
print(f"\n{'Table':<30} {'DEV':<15} {'PROD':<15} {'Diff':<15}")
print("-" * 75)
for table in sorted(common_tables):
dev_count = dev_counts[table]
prod_count = prod_counts[table]
diff = dev_count - prod_count
diff_str = f"+{diff}" if diff > 0 else str(diff)
status = "⚠️ " if abs(diff) > 0 else ""
print(f"{status} {table:<28} {dev_count:<15} {prod_count:<15} {diff_str:<15}")
def main():
print("\n" + "="*80)
print("DATABASE INTEGRITY CHECKER")
print("="*80)
print(f"\nDEV: {DEV_DB.split('@')[1]}") # Hide password
print(f"PROD: {PROD_DB.split('@')[1]}")
try:
# Connect to databases
print("\n🔌 Connecting to databases...")
dev_engine = create_engine(DEV_DB)
prod_engine = create_engine(PROD_DB)
# Test connections
with dev_engine.connect() as conn:
conn.execute(text("SELECT 1"))
print("✅ Connected to DEV database")
with prod_engine.connect() as conn:
conn.execute(text("SELECT 1"))
print("✅ Connected to PROD database")
# Get database info
print("\n📊 Gathering database information...")
dev_info = get_db_info(dev_engine, "DEV")
prod_info = get_db_info(prod_engine, "PROD")
# Run comparisons
all_issues = []
common_tables = compare_tables(dev_info, prod_info)
column_issues = compare_columns(dev_info, prod_info, common_tables)
all_issues.extend(column_issues)
enum_issues = compare_enums(dev_info, prod_info)
all_issues.extend(enum_issues)
migration_issues = check_migration_history(dev_engine, prod_engine)
all_issues.extend(migration_issues)
compare_data_counts(dev_engine, prod_engine, common_tables)
# Summary
print("\n" + "="*80)
print("SUMMARY")
print("="*80)
if all_issues:
print(f"\n❌ Found {len(all_issues)} integrity issues:")
for i, issue in enumerate(all_issues, 1):
print(f" {i}. {issue}")
print("\n⚠️ Databases are NOT in sync!")
sys.exit(1)
else:
print("\n✅ Databases are in sync!")
print("✅ No integrity issues found")
sys.exit(0)
except Exception as e:
print(f"\n❌ Error: {str(e)}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()