14 Commits

23 changed files with 765 additions and 12 deletions

View File

@@ -6,6 +6,10 @@ JWT_SECRET=your-secret-key-change-this-in-production
JWT_ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
# Settings Encryption (for database-stored sensitive settings)
# Generate with: python -c "import secrets; print(secrets.token_urlsafe(64))"
SETTINGS_ENCRYPTION_KEY=your-encryption-key-generate-with-command-above
# SMTP Email Configuration (Port 465 - SSL/TLS)
SMTP_HOST=p.konceptkit.com
SMTP_PORT=465
@@ -28,7 +32,14 @@ SMTP_FROM_NAME=LOAF Membership
# Frontend URL
FRONTEND_URL=http://localhost:3000
# Stripe Configuration (for future payment integration)
# Backend URL (for webhook URLs and API references)
# Used to construct Stripe webhook URL shown in Admin Settings
BACKEND_URL=http://localhost:8000
# Stripe Configuration (NOW DATABASE-DRIVEN via Admin Settings page)
# Configure Stripe credentials through the Admin Settings UI (requires SETTINGS_ENCRYPTION_KEY)
# No longer requires .env variables - managed through database for dynamic updates
# Legacy .env variables below are deprecated:
# STRIPE_SECRET_KEY=sk_test_...
# STRIPE_WEBHOOK_SECRET=whsec_...

3
.gitignore vendored
View File

@@ -1,3 +1,5 @@
.env
.venv
# ============================================================================
# Python Backend .gitignore
# For FastAPI + PostgreSQL + Cloudflare R2 + Stripe
@@ -8,6 +10,7 @@
.env.*
!.env.example
.envrc
.sh
# ===== Python =====
# Byte-compiled / optimized / DLL files

20
Dockerfile Normal file
View File

@@ -0,0 +1,20 @@
# Use an official Python image (Linux)
FROM python:3.12-slim
# Set a working directory
WORKDIR /app
# Copy dependency list
COPY requirements.txt .
# Install dependencies
RUN pip3 install --no-cache-dir -r requirements.txt
# Copy the rest of the project
COPY . .
# Expose port (whatever your backend runs on)
EXPOSE 8000
# Run exactly your command
CMD ["python", "-m", "uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,48 @@
"""add_role_audit_fields
Revision ID: 4fa11836f7fd
Revises: 013_sync_permissions
Create Date: 2026-01-16 17:21:40.514605
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
# revision identifiers, used by Alembic.
revision: str = '4fa11836f7fd'
down_revision: Union[str, None] = '013_sync_permissions'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Add role audit trail columns
op.add_column('users', sa.Column('role_changed_at', sa.DateTime(timezone=True), nullable=True))
op.add_column('users', sa.Column('role_changed_by', UUID(as_uuid=True), nullable=True))
# Create foreign key constraint to track who changed the role
op.create_foreign_key(
'fk_users_role_changed_by',
'users', 'users',
['role_changed_by'], ['id'],
ondelete='SET NULL'
)
# Create index for efficient querying by role change date
op.create_index('idx_users_role_changed_at', 'users', ['role_changed_at'])
def downgrade() -> None:
# Drop index first
op.drop_index('idx_users_role_changed_at')
# Drop foreign key constraint
op.drop_constraint('fk_users_role_changed_by', 'users', type_='foreignkey')
# Drop columns
op.drop_column('users', 'role_changed_by')
op.drop_column('users', 'role_changed_at')

View File

@@ -0,0 +1,68 @@
"""add_system_settings_table
Revision ID: ec4cb4a49cde
Revises: 4fa11836f7fd
Create Date: 2026-01-16 18:16:00.283455
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
# revision identifiers, used by Alembic.
revision: str = 'ec4cb4a49cde'
down_revision: Union[str, None] = '4fa11836f7fd'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Create enum for setting types (only if not exists)
op.execute("""
DO $$ BEGIN
CREATE TYPE settingtype AS ENUM ('plaintext', 'encrypted', 'json');
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
""")
# Create system_settings table
op.execute("""
CREATE TABLE system_settings (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
setting_key VARCHAR(100) UNIQUE NOT NULL,
setting_value TEXT,
setting_type settingtype NOT NULL DEFAULT 'plaintext'::settingtype,
description TEXT,
updated_by UUID REFERENCES users(id) ON DELETE SET NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
is_sensitive BOOLEAN NOT NULL DEFAULT FALSE
);
COMMENT ON COLUMN system_settings.setting_key IS 'Unique setting identifier (e.g., stripe_secret_key)';
COMMENT ON COLUMN system_settings.setting_value IS 'Setting value (encrypted if setting_type is encrypted)';
COMMENT ON COLUMN system_settings.setting_type IS 'Type of setting: plaintext, encrypted, or json';
COMMENT ON COLUMN system_settings.description IS 'Human-readable description of the setting';
COMMENT ON COLUMN system_settings.updated_by IS 'User who last updated this setting';
COMMENT ON COLUMN system_settings.is_sensitive IS 'Whether this setting contains sensitive data';
""")
# Create indexes
op.create_index('idx_system_settings_key', 'system_settings', ['setting_key'])
op.create_index('idx_system_settings_updated_at', 'system_settings', ['updated_at'])
def downgrade() -> None:
# Drop indexes
op.drop_index('idx_system_settings_updated_at')
op.drop_index('idx_system_settings_key')
# Drop table
op.drop_table('system_settings')
# Drop enum
op.execute('DROP TYPE IF EXISTS settingtype')

14
docker-compose.yml Normal file
View File

@@ -0,0 +1,14 @@
services:
backend:
build:
context: .
dockerfile: Dockerfile # Use Dockerfile.prod for production
ports:
- "8000:8000"
env_file:
- .env
environment:
DATABASE_URL: ${DATABASE_URL}
volumes:
- .:/app # sync code for hot reload

122
encryption_service.py Normal file
View File

@@ -0,0 +1,122 @@
"""
Encryption service for sensitive settings stored in database.
Uses Fernet symmetric encryption (AES-128 in CBC mode with HMAC authentication).
The encryption key is derived from a master secret stored in .env.
"""
import os
import base64
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
class EncryptionService:
"""Service for encrypting and decrypting sensitive configuration values"""
def __init__(self):
# Get master encryption key from environment
# This should be a long, random string (e.g., 64 characters)
# Generate one with: python -c "import secrets; print(secrets.token_urlsafe(64))"
self.master_secret = os.environ.get('SETTINGS_ENCRYPTION_KEY')
if not self.master_secret:
raise ValueError(
"SETTINGS_ENCRYPTION_KEY environment variable not set. "
"Generate one with: python -c \"import secrets; print(secrets.token_urlsafe(64))\""
)
# Derive encryption key from master secret using PBKDF2HMAC
# This adds an extra layer of security
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=b'systemsettings', # Fixed salt (OK for key derivation from strong secret)
iterations=100000,
backend=default_backend()
)
key = base64.urlsafe_b64encode(kdf.derive(self.master_secret.encode()))
self.cipher = Fernet(key)
def encrypt(self, plaintext: str) -> str:
"""
Encrypt a plaintext string.
Args:
plaintext: The string to encrypt
Returns:
Base64-encoded encrypted string
"""
if not plaintext:
return ""
encrypted_bytes = self.cipher.encrypt(plaintext.encode())
return encrypted_bytes.decode('utf-8')
def decrypt(self, encrypted: str) -> str:
"""
Decrypt an encrypted string.
Args:
encrypted: The base64-encoded encrypted string
Returns:
Decrypted plaintext string
Raises:
cryptography.fernet.InvalidToken: If decryption fails (wrong key or corrupted data)
"""
if not encrypted:
return ""
decrypted_bytes = self.cipher.decrypt(encrypted.encode())
return decrypted_bytes.decode('utf-8')
def is_encrypted(self, value: str) -> bool:
"""
Check if a value appears to be encrypted (starts with Fernet token format).
This is a heuristic check - not 100% reliable but useful for validation.
Args:
value: String to check
Returns:
True if value looks like a Fernet token
"""
if not value:
return False
# Fernet tokens are base64-encoded and start with version byte (gAAAAA...)
# They're always > 60 characters
try:
return len(value) > 60 and value.startswith('gAAAAA')
except:
return False
# Global encryption service instance
# Initialize on module import so it fails fast if encryption key is missing
try:
encryption_service = EncryptionService()
except ValueError as e:
print(f"WARNING: {e}")
print("Encryption service will not be available.")
encryption_service = None
def get_encryption_service() -> EncryptionService:
"""
Get the global encryption service instance.
Raises:
ValueError: If encryption service is not initialized (missing SETTINGS_ENCRYPTION_KEY)
"""
if encryption_service is None:
raise ValueError(
"Encryption service not initialized. Set SETTINGS_ENCRYPTION_KEY environment variable."
)
return encryption_service

View File

@@ -137,6 +137,10 @@ class User(Base):
wordpress_user_id = Column(BigInteger, nullable=True, comment="Original WordPress user ID")
wordpress_registered_date = Column(DateTime(timezone=True), nullable=True, comment="Original WordPress registration date")
# Role Change Audit Trail
role_changed_at = Column(DateTime(timezone=True), nullable=True, comment="Timestamp when role was last changed")
role_changed_by = Column(UUID(as_uuid=True), ForeignKey('users.id', ondelete='SET NULL'), nullable=True, comment="Admin who changed the role")
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
@@ -145,6 +149,7 @@ class User(Base):
events_created = relationship("Event", back_populates="creator")
rsvps = relationship("EventRSVP", back_populates="user")
subscriptions = relationship("Subscription", back_populates="user", foreign_keys="Subscription.user_id")
role_changer = relationship("User", foreign_keys=[role_changed_by], remote_side="User.id", post_update=True)
class Event(Base):
__tablename__ = "events"
@@ -509,3 +514,36 @@ class ImportRollbackAudit(Base):
# Relationships
import_job = relationship("ImportJob")
admin_user = relationship("User", foreign_keys=[rolled_back_by])
# ============================================================
# System Settings Models
# ============================================================
class SettingType(enum.Enum):
plaintext = "plaintext"
encrypted = "encrypted"
json = "json"
class SystemSettings(Base):
"""System-wide configuration settings stored in database"""
__tablename__ = "system_settings"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
setting_key = Column(String(100), unique=True, nullable=False, index=True)
setting_value = Column(Text, nullable=True)
setting_type = Column(SQLEnum(SettingType), default=SettingType.plaintext, nullable=False)
description = Column(Text, nullable=True)
updated_by = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True)
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), nullable=False)
updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc), nullable=False)
is_sensitive = Column(Boolean, default=False, nullable=False)
# Relationships
updater = relationship("User", foreign_keys=[updated_by])
# Index on updated_at for audit queries
__table_args__ = (
Index('idx_system_settings_updated_at', 'updated_at'),
)

View File

@@ -11,11 +11,9 @@ from datetime import datetime, timezone, timedelta
# Load environment variables
load_dotenv()
# Initialize Stripe with secret key
stripe.api_key = os.getenv("STRIPE_SECRET_KEY")
# Stripe webhook secret for signature verification
STRIPE_WEBHOOK_SECRET = os.getenv("STRIPE_WEBHOOK_SECRET")
# NOTE: Stripe credentials are now database-driven
# These .env fallbacks are kept for backward compatibility only
# The actual credentials are loaded dynamically from system_settings table
def create_checkout_session(
user_id: str,
@@ -23,11 +21,15 @@ def create_checkout_session(
plan_id: str,
stripe_price_id: str,
success_url: str,
cancel_url: str
cancel_url: str,
db = None
):
"""
Create a Stripe Checkout session for subscription payment.
Args:
db: Database session (optional, for reading Stripe credentials from database)
Args:
user_id: User's UUID
user_email: User's email address
@@ -39,6 +41,28 @@ def create_checkout_session(
Returns:
dict: Checkout session object with session ID and URL
"""
# Load Stripe API key from database if available
if db:
try:
# Import here to avoid circular dependency
from models import SystemSettings, SettingType
from encryption_service import get_encryption_service
setting = db.query(SystemSettings).filter(
SystemSettings.setting_key == 'stripe_secret_key'
).first()
if setting and setting.setting_value:
encryption_service = get_encryption_service()
stripe.api_key = encryption_service.decrypt(setting.setting_value)
except Exception as e:
# Fallback to .env if database read fails
print(f"Failed to read Stripe key from database: {e}")
stripe.api_key = os.getenv("STRIPE_SECRET_KEY")
else:
# Fallback to .env if no db session
stripe.api_key = os.getenv("STRIPE_SECRET_KEY")
try:
# Create Checkout Session
checkout_session = stripe.checkout.Session.create(
@@ -74,13 +98,14 @@ def create_checkout_session(
raise Exception(f"Stripe error: {str(e)}")
def verify_webhook_signature(payload: bytes, sig_header: str) -> dict:
def verify_webhook_signature(payload: bytes, sig_header: str, db=None) -> dict:
"""
Verify Stripe webhook signature and construct event.
Args:
payload: Raw webhook payload bytes
sig_header: Stripe signature header
db: Database session (optional, for reading webhook secret from database)
Returns:
dict: Verified webhook event
@@ -88,9 +113,32 @@ def verify_webhook_signature(payload: bytes, sig_header: str) -> dict:
Raises:
ValueError: If signature verification fails
"""
# Load webhook secret from database if available
webhook_secret = None
if db:
try:
from models import SystemSettings
from encryption_service import get_encryption_service
setting = db.query(SystemSettings).filter(
SystemSettings.setting_key == 'stripe_webhook_secret'
).first()
if setting and setting.setting_value:
encryption_service = get_encryption_service()
webhook_secret = encryption_service.decrypt(setting.setting_value)
except Exception as e:
print(f"Failed to read webhook secret from database: {e}")
webhook_secret = os.getenv("STRIPE_WEBHOOK_SECRET")
else:
webhook_secret = os.getenv("STRIPE_WEBHOOK_SECRET")
if not webhook_secret:
raise ValueError("STRIPE_WEBHOOK_SECRET not configured")
try:
event = stripe.Webhook.construct_event(
payload, sig_header, STRIPE_WEBHOOK_SECRET
payload, sig_header, webhook_secret
)
return event
except ValueError as e:

View File

@@ -31,7 +31,7 @@ motor==3.3.1
msal==1.27.0
mypy==1.18.2
mypy_extensions==1.1.0
numpy==2.3.5
numpy==2.2.6
oauthlib==3.3.1
packaging==25.0
pandas==2.3.3

375
server.py
View File

@@ -514,6 +514,10 @@ class AcceptInvitationRequest(BaseModel):
zipcode: Optional[str] = None
date_of_birth: Optional[datetime] = None
class ChangeRoleRequest(BaseModel):
role: str
role_id: Optional[str] = None # For custom roles
# Auth Routes
@api_router.post("/auth/register")
async def register(request: RegisterRequest, db: Session = Depends(get_db)):
@@ -2527,6 +2531,102 @@ async def admin_reset_user_password(
return {"message": f"Password reset for {user.email}. Temporary password emailed."}
@api_router.put("/admin/users/{user_id}/role")
async def change_user_role(
user_id: str,
request: ChangeRoleRequest,
current_user: User = Depends(require_permission("users.edit")),
db: Session = Depends(get_db)
):
"""
Change an existing user's role with privilege escalation prevention.
Requires: users.edit permission
Rules:
- Superadmin: Can assign any role (including superadmin)
- Admin: Can assign admin, finance, member, guest, and non-elevated custom roles
- Admin CANNOT assign: superadmin or custom roles with elevated permissions
- Users CANNOT change their own role
"""
# 1. Fetch target user
target_user = db.query(User).filter(User.id == user_id).first()
if not target_user:
raise HTTPException(status_code=404, detail="User not found")
# 2. Prevent self-role-change
if str(target_user.id) == str(current_user.id):
raise HTTPException(
status_code=403,
detail="You cannot change your own role"
)
# 3. Validate new role
if request.role not in ['guest', 'member', 'admin', 'finance', 'superadmin']:
raise HTTPException(status_code=400, detail="Invalid role")
# 4. Privilege escalation check
if current_user.role != 'superadmin':
# Non-superadmin cannot assign superadmin role
if request.role == 'superadmin':
raise HTTPException(
status_code=403,
detail="Only superadmin can assign superadmin role"
)
# Check custom role elevation
if request.role_id:
custom_role = db.query(Role).filter(Role.id == request.role_id).first()
if not custom_role:
raise HTTPException(status_code=404, detail="Custom role not found")
# Check if custom role has elevated permissions
elevated_permissions = ['users.delete', 'roles.create', 'roles.edit',
'roles.delete', 'permissions.edit']
role_perms = db.query(Permission.name).join(RolePermission).filter(
RolePermission.role_id == custom_role.id,
Permission.name.in_(elevated_permissions)
).all()
if role_perms:
raise HTTPException(
status_code=403,
detail=f"Cannot assign role with elevated permissions: {custom_role.name}"
)
# 5. Update role with audit trail
old_role = target_user.role
old_role_id = target_user.role_id
target_user.role = request.role
target_user.role_id = request.role_id if request.role_id else None
target_user.role_changed_at = datetime.now(timezone.utc)
target_user.role_changed_by = current_user.id
target_user.updated_at = datetime.now(timezone.utc)
db.commit()
db.refresh(target_user)
# Log admin action
logger.info(
f"Admin {current_user.email} changed role for user {target_user.email} "
f"from {old_role} to {request.role}"
)
return {
"message": f"Role changed from {old_role} to {request.role}",
"user": {
"id": str(target_user.id),
"email": target_user.email,
"name": f"{target_user.first_name} {target_user.last_name}",
"old_role": old_role,
"new_role": target_user.role,
"changed_by": f"{current_user.first_name} {current_user.last_name}",
"changed_at": target_user.role_changed_at.isoformat()
}
}
@api_router.post("/admin/users/{user_id}/resend-verification")
async def admin_resend_verification(
user_id: str,
@@ -6197,8 +6297,8 @@ async def stripe_webhook(request: Request, db: Session = Depends(get_db)):
raise HTTPException(status_code=400, detail="Missing stripe-signature header")
try:
# Verify webhook signature
event = verify_webhook_signature(payload, sig_header)
# Verify webhook signature (pass db for reading webhook secret from database)
event = verify_webhook_signature(payload, sig_header, db)
except ValueError as e:
logger.error(f"Webhook signature verification failed: {str(e)}")
raise HTTPException(status_code=400, detail=str(e))
@@ -6298,6 +6398,277 @@ async def stripe_webhook(request: Request, db: Session = Depends(get_db)):
return {"status": "success"}
# ============================================================================
# ADMIN SETTINGS ENDPOINTS
# ============================================================================
# Helper functions for system settings
def get_setting(db: Session, key: str, decrypt: bool = False) -> str | None:
"""
Get a system setting value from database.
Args:
db: Database session
key: Setting key to retrieve
decrypt: If True and setting_type is 'encrypted', decrypt the value
Returns:
Setting value or None if not found
"""
from models import SystemSettings, SettingType
from encryption_service import get_encryption_service
setting = db.query(SystemSettings).filter(SystemSettings.setting_key == key).first()
if not setting:
return None
value = setting.setting_value
if decrypt and setting.setting_type == SettingType.encrypted and value:
try:
encryption_service = get_encryption_service()
value = encryption_service.decrypt(value)
except Exception as e:
print(f"Failed to decrypt setting {key}: {e}")
return None
return value
def set_setting(
db: Session,
key: str,
value: str,
user_id: str,
setting_type: str = "plaintext",
description: str = None,
is_sensitive: bool = False,
encrypt: bool = False
) -> None:
"""
Set a system setting value in database.
Args:
db: Database session
key: Setting key
value: Setting value
user_id: ID of user making the change
setting_type: Type of setting (plaintext, encrypted, json)
description: Human-readable description
is_sensitive: Whether this is sensitive data
encrypt: If True, encrypt the value before storing
"""
from models import SystemSettings, SettingType
from encryption_service import get_encryption_service
# Encrypt value if requested
if encrypt and value:
encryption_service = get_encryption_service()
value = encryption_service.encrypt(value)
setting_type = "encrypted"
# Find or create setting
setting = db.query(SystemSettings).filter(SystemSettings.setting_key == key).first()
if setting:
# Update existing
setting.setting_value = value
setting.setting_type = SettingType[setting_type]
setting.updated_by = user_id
setting.updated_at = datetime.now(timezone.utc)
if description:
setting.description = description
setting.is_sensitive = is_sensitive
else:
# Create new
setting = SystemSettings(
setting_key=key,
setting_value=value,
setting_type=SettingType[setting_type],
description=description,
updated_by=user_id,
is_sensitive=is_sensitive
)
db.add(setting)
db.commit()
@api_router.get("/admin/settings/stripe/status")
async def get_stripe_status(
current_user: User = Depends(get_current_superadmin),
db: Session = Depends(get_db)
):
"""
Get Stripe integration status (superadmin only).
Returns:
- configured: Whether credentials exist in database
- secret_key_prefix: First 10 chars of secret key (for verification)
- webhook_configured: Whether webhook secret exists
- environment: test or live (based on key prefix)
- webhook_url: Full webhook URL for Stripe configuration
"""
import os
# Read from database
secret_key = get_setting(db, 'stripe_secret_key', decrypt=True)
webhook_secret = get_setting(db, 'stripe_webhook_secret', decrypt=True)
configured = bool(secret_key)
environment = 'unknown'
if secret_key:
if secret_key.startswith('sk_test_'):
environment = 'test'
elif secret_key.startswith('sk_live_'):
environment = 'live'
# Get backend URL from environment for webhook URL
# Try multiple environment variable patterns for flexibility
backend_url = (
os.environ.get('BACKEND_URL') or
os.environ.get('API_URL') or
f"http://{os.environ.get('HOST', 'localhost')}:{os.environ.get('PORT', '8000')}"
)
webhook_url = f"{backend_url}/api/webhooks/stripe"
return {
"configured": configured,
"secret_key_prefix": secret_key[:10] if secret_key else None,
"secret_key_set": bool(secret_key),
"webhook_secret_set": bool(webhook_secret),
"environment": environment,
"webhook_url": webhook_url,
"instructions": {
"location": "Database (system_settings table)",
"required_settings": [
"stripe_secret_key (sk_test_... or sk_live_...)",
"stripe_webhook_secret (whsec_...)"
],
"restart_required": "No - changes take effect immediately"
}
}
@api_router.post("/admin/settings/stripe/test-connection")
async def test_stripe_connection(
current_user: User = Depends(get_current_superadmin),
db: Session = Depends(get_db)
):
"""
Test Stripe API connection (superadmin only).
Performs a simple API call to verify credentials work.
"""
import stripe
# Read from database
secret_key = get_setting(db, 'stripe_secret_key', decrypt=True)
if not secret_key:
raise HTTPException(
status_code=400,
detail="STRIPE_SECRET_KEY not configured in database. Please configure Stripe settings first."
)
try:
stripe.api_key = secret_key
# Make a simple API call to test connection
balance = stripe.Balance.retrieve()
return {
"success": True,
"message": "Stripe connection successful",
"environment": "test" if secret_key.startswith('sk_test_') else "live",
"balance": {
"available": balance.available,
"pending": balance.pending
}
}
except stripe.error.AuthenticationError as e:
raise HTTPException(
status_code=401,
detail=f"Stripe authentication failed: {str(e)}"
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Stripe connection test failed: {str(e)}"
)
class UpdateStripeSettingsRequest(BaseModel):
"""Request model for updating Stripe settings"""
secret_key: str = Field(..., min_length=1, description="Stripe secret key (sk_test_... or sk_live_...)")
webhook_secret: str = Field(..., min_length=1, description="Stripe webhook secret (whsec_...)")
@api_router.put("/admin/settings/stripe")
async def update_stripe_settings(
request: UpdateStripeSettingsRequest,
current_user: User = Depends(get_current_superadmin),
db: Session = Depends(get_db)
):
"""
Update Stripe integration settings (superadmin only).
Stores Stripe credentials encrypted in the database.
Changes take effect immediately without server restart.
"""
# Validate secret key format
if not (request.secret_key.startswith('sk_test_') or request.secret_key.startswith('sk_live_')):
raise HTTPException(
status_code=400,
detail="Invalid Stripe secret key format. Must start with 'sk_test_' or 'sk_live_'"
)
# Validate webhook secret format
if not request.webhook_secret.startswith('whsec_'):
raise HTTPException(
status_code=400,
detail="Invalid Stripe webhook secret format. Must start with 'whsec_'"
)
try:
# Store secret key (encrypted)
set_setting(
db=db,
key='stripe_secret_key',
value=request.secret_key,
user_id=str(current_user.id),
description='Stripe API secret key for payment processing',
is_sensitive=True,
encrypt=True
)
# Store webhook secret (encrypted)
set_setting(
db=db,
key='stripe_webhook_secret',
value=request.webhook_secret,
user_id=str(current_user.id),
description='Stripe webhook secret for verifying webhook signatures',
is_sensitive=True,
encrypt=True
)
# Determine environment
environment = 'test' if request.secret_key.startswith('sk_test_') else 'live'
return {
"success": True,
"message": "Stripe settings updated successfully",
"environment": environment,
"updated_at": datetime.now(timezone.utc).isoformat(),
"updated_by": f"{current_user.first_name} {current_user.last_name}"
}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to update Stripe settings: {str(e)}"
)
# Include the router in the main app
app.include_router(api_router)

10
start.sh Normal file
View File

@@ -0,0 +1,10 @@
#!/usr/bin/env bash
# Exit immediately if a command fails
set -e
# Activate virtual environment
source .venv/bin/activate
# Start the backend
python -m uvicorn server:app --reload --port 8000