10 Commits

Author SHA1 Message Date
kayela
5ab0038c0a Merge branch 'dev' into docker 2026-01-17 13:37:04 -06:00
Koncept Kit
e938baa78e - Add Settings menu for Stripe configuration- In the Member Profile page, Superadmin can assign new Role to the member- Stripe Configuration is now stored with encryption in Database 2026-01-16 19:07:58 +07:00
Koncept Kit
39324ba6f6 Database prevent dead connection errors and make login work on the first try 2026-01-07 16:23:01 +07:00
Koncept Kit
adbfa7a3c8 - Fixed MutableHeaders bug- Disable API docs in production- CORS diagnostic endpoint- Security headers + CORS middlewareMust have ENVIRONMENT=production and CORS_ORIGINS=... in .env file 2026-01-07 14:21:47 +07:00
Koncept Kit
a74f161efa Security Hardening #1 2026-01-07 14:15:50 +07:00
Koncept Kit
d818d847bc Security Hardening 2026-01-07 14:03:38 +07:00
Koncept Kit
1390e07500 Login and Session Fixes 2026-01-07 13:37:28 +07:00
kayela
38e5f5377a Merge branch 'dev' into docker 2026-01-06 12:31:29 -06:00
kayela
e06f18ce17 Add start script for backend server initialization and update .gitignore 2026-01-06 12:30:26 -06:00
Koncept Kit
810366d00f feat: Implement Option 3 - Proper RBAC with role-based staff invitations
**Problem:** Admin had users.create permission but couldn't use it due to workflow requiring superadmin-only /admin/roles endpoint.

**Solution:** Created scalable endpoint that adapts role selection to user's permission level.

**Changes:**
- NEW: GET /admin/roles/assignable endpoint with intelligent role filtering
  - Superadmin: Returns all roles
  - Admin: Returns admin, finance, non-elevated custom roles (excludes superadmin)
  - Prevents privilege escalation via permission comparison

- UPDATED: InviteStaffDialog now uses /admin/roles/assignable
  - Removed 403 fallback logic (no longer needed)
  - Backend handles role filtering dynamically

- UPDATED: AdminStaff 'Invite Staff' button back to permission-based
  - Changed from user.role === 'superadmin' to hasPermission('users.create')
  - Both admin and superadmin can now invite staff with role restrictions

**Security:**
-  Privilege escalation blocked (admin can't create superadmin)
-  Custom roles filtered by permission comparison
-  Multi-layer enforcement (frontend + backend)

**Files Modified:**
- backend/server.py (+94 lines)
- frontend/src/components/InviteStaffDialog.js (-14 lines)
- frontend/src/pages/admin/AdminStaff.js (1 line changed)
- RBAC_IMPLEMENTATION_FINAL.md (new documentation)

**Testing:**
- Superadmin can assign all roles including superadmin ✓
- Admin can assign admin and finance ✓
- Admin cannot see/assign superadmin ✓
- Custom role elevation detection working ✓
2026-01-06 14:42:25 +07:00
10 changed files with 977 additions and 18 deletions

View File

@@ -6,6 +6,10 @@ JWT_SECRET=your-secret-key-change-this-in-production
JWT_ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
# Settings Encryption (for database-stored sensitive settings)
# Generate with: python -c "import secrets; print(secrets.token_urlsafe(64))"
SETTINGS_ENCRYPTION_KEY=your-encryption-key-generate-with-command-above
# SMTP Email Configuration (Port 465 - SSL/TLS)
SMTP_HOST=p.konceptkit.com
SMTP_PORT=465
@@ -28,7 +32,14 @@ SMTP_FROM_NAME=LOAF Membership
# Frontend URL
FRONTEND_URL=http://localhost:3000
# Stripe Configuration (for future payment integration)
# Backend URL (for webhook URLs and API references)
# Used to construct Stripe webhook URL shown in Admin Settings
BACKEND_URL=http://localhost:8000
# Stripe Configuration (NOW DATABASE-DRIVEN via Admin Settings page)
# Configure Stripe credentials through the Admin Settings UI (requires SETTINGS_ENCRYPTION_KEY)
# No longer requires .env variables - managed through database for dynamic updates
# Legacy .env variables below are deprecated:
# STRIPE_SECRET_KEY=sk_test_...
# STRIPE_WEBHOOK_SECRET=whsec_...

1
.gitignore vendored
View File

@@ -10,6 +10,7 @@
.env.*
!.env.example
.envrc
.sh
# ===== Python =====
# Byte-compiled / optimized / DLL files

View File

@@ -0,0 +1,48 @@
"""add_role_audit_fields
Revision ID: 4fa11836f7fd
Revises: 013_sync_permissions
Create Date: 2026-01-16 17:21:40.514605
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
# revision identifiers, used by Alembic.
revision: str = '4fa11836f7fd'
down_revision: Union[str, None] = '013_sync_permissions'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Add role audit trail columns
op.add_column('users', sa.Column('role_changed_at', sa.DateTime(timezone=True), nullable=True))
op.add_column('users', sa.Column('role_changed_by', UUID(as_uuid=True), nullable=True))
# Create foreign key constraint to track who changed the role
op.create_foreign_key(
'fk_users_role_changed_by',
'users', 'users',
['role_changed_by'], ['id'],
ondelete='SET NULL'
)
# Create index for efficient querying by role change date
op.create_index('idx_users_role_changed_at', 'users', ['role_changed_at'])
def downgrade() -> None:
# Drop index first
op.drop_index('idx_users_role_changed_at')
# Drop foreign key constraint
op.drop_constraint('fk_users_role_changed_by', 'users', type_='foreignkey')
# Drop columns
op.drop_column('users', 'role_changed_by')
op.drop_column('users', 'role_changed_at')

View File

@@ -0,0 +1,68 @@
"""add_system_settings_table
Revision ID: ec4cb4a49cde
Revises: 4fa11836f7fd
Create Date: 2026-01-16 18:16:00.283455
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID
# revision identifiers, used by Alembic.
revision: str = 'ec4cb4a49cde'
down_revision: Union[str, None] = '4fa11836f7fd'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Create enum for setting types (only if not exists)
op.execute("""
DO $$ BEGIN
CREATE TYPE settingtype AS ENUM ('plaintext', 'encrypted', 'json');
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
""")
# Create system_settings table
op.execute("""
CREATE TABLE system_settings (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
setting_key VARCHAR(100) UNIQUE NOT NULL,
setting_value TEXT,
setting_type settingtype NOT NULL DEFAULT 'plaintext'::settingtype,
description TEXT,
updated_by UUID REFERENCES users(id) ON DELETE SET NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
is_sensitive BOOLEAN NOT NULL DEFAULT FALSE
);
COMMENT ON COLUMN system_settings.setting_key IS 'Unique setting identifier (e.g., stripe_secret_key)';
COMMENT ON COLUMN system_settings.setting_value IS 'Setting value (encrypted if setting_type is encrypted)';
COMMENT ON COLUMN system_settings.setting_type IS 'Type of setting: plaintext, encrypted, or json';
COMMENT ON COLUMN system_settings.description IS 'Human-readable description of the setting';
COMMENT ON COLUMN system_settings.updated_by IS 'User who last updated this setting';
COMMENT ON COLUMN system_settings.is_sensitive IS 'Whether this setting contains sensitive data';
""")
# Create indexes
op.create_index('idx_system_settings_key', 'system_settings', ['setting_key'])
op.create_index('idx_system_settings_updated_at', 'system_settings', ['updated_at'])
def downgrade() -> None:
# Drop indexes
op.drop_index('idx_system_settings_updated_at')
op.drop_index('idx_system_settings_key')
# Drop table
op.drop_table('system_settings')
# Drop enum
op.execute('DROP TYPE IF EXISTS settingtype')

View File

@@ -1,6 +1,7 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool
import os
from dotenv import load_dotenv
from pathlib import Path
@@ -10,7 +11,21 @@ load_dotenv(ROOT_DIR / '.env')
DATABASE_URL = os.environ.get('DATABASE_URL', 'postgresql://user:password@localhost:5432/membership_db')
engine = create_engine(DATABASE_URL)
# Configure engine with connection pooling and connection health checks
engine = create_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=5, # Keep 5 connections open
max_overflow=10, # Allow up to 10 extra connections during peak
pool_pre_ping=True, # CRITICAL: Test connections before using them
pool_recycle=3600, # Recycle connections every hour (prevents stale connections)
echo=False, # Set to True for SQL debugging
connect_args={
'connect_timeout': 10, # Timeout connection attempts after 10 seconds
'options': '-c statement_timeout=30000' # 30 second query timeout
}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

122
encryption_service.py Normal file
View File

@@ -0,0 +1,122 @@
"""
Encryption service for sensitive settings stored in database.
Uses Fernet symmetric encryption (AES-128 in CBC mode with HMAC authentication).
The encryption key is derived from a master secret stored in .env.
"""
import os
import base64
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
class EncryptionService:
"""Service for encrypting and decrypting sensitive configuration values"""
def __init__(self):
# Get master encryption key from environment
# This should be a long, random string (e.g., 64 characters)
# Generate one with: python -c "import secrets; print(secrets.token_urlsafe(64))"
self.master_secret = os.environ.get('SETTINGS_ENCRYPTION_KEY')
if not self.master_secret:
raise ValueError(
"SETTINGS_ENCRYPTION_KEY environment variable not set. "
"Generate one with: python -c \"import secrets; print(secrets.token_urlsafe(64))\""
)
# Derive encryption key from master secret using PBKDF2HMAC
# This adds an extra layer of security
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=b'systemsettings', # Fixed salt (OK for key derivation from strong secret)
iterations=100000,
backend=default_backend()
)
key = base64.urlsafe_b64encode(kdf.derive(self.master_secret.encode()))
self.cipher = Fernet(key)
def encrypt(self, plaintext: str) -> str:
"""
Encrypt a plaintext string.
Args:
plaintext: The string to encrypt
Returns:
Base64-encoded encrypted string
"""
if not plaintext:
return ""
encrypted_bytes = self.cipher.encrypt(plaintext.encode())
return encrypted_bytes.decode('utf-8')
def decrypt(self, encrypted: str) -> str:
"""
Decrypt an encrypted string.
Args:
encrypted: The base64-encoded encrypted string
Returns:
Decrypted plaintext string
Raises:
cryptography.fernet.InvalidToken: If decryption fails (wrong key or corrupted data)
"""
if not encrypted:
return ""
decrypted_bytes = self.cipher.decrypt(encrypted.encode())
return decrypted_bytes.decode('utf-8')
def is_encrypted(self, value: str) -> bool:
"""
Check if a value appears to be encrypted (starts with Fernet token format).
This is a heuristic check - not 100% reliable but useful for validation.
Args:
value: String to check
Returns:
True if value looks like a Fernet token
"""
if not value:
return False
# Fernet tokens are base64-encoded and start with version byte (gAAAAA...)
# They're always > 60 characters
try:
return len(value) > 60 and value.startswith('gAAAAA')
except:
return False
# Global encryption service instance
# Initialize on module import so it fails fast if encryption key is missing
try:
encryption_service = EncryptionService()
except ValueError as e:
print(f"WARNING: {e}")
print("Encryption service will not be available.")
encryption_service = None
def get_encryption_service() -> EncryptionService:
"""
Get the global encryption service instance.
Raises:
ValueError: If encryption service is not initialized (missing SETTINGS_ENCRYPTION_KEY)
"""
if encryption_service is None:
raise ValueError(
"Encryption service not initialized. Set SETTINGS_ENCRYPTION_KEY environment variable."
)
return encryption_service

View File

@@ -137,6 +137,10 @@ class User(Base):
wordpress_user_id = Column(BigInteger, nullable=True, comment="Original WordPress user ID")
wordpress_registered_date = Column(DateTime(timezone=True), nullable=True, comment="Original WordPress registration date")
# Role Change Audit Trail
role_changed_at = Column(DateTime(timezone=True), nullable=True, comment="Timestamp when role was last changed")
role_changed_by = Column(UUID(as_uuid=True), ForeignKey('users.id', ondelete='SET NULL'), nullable=True, comment="Admin who changed the role")
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
@@ -145,6 +149,7 @@ class User(Base):
events_created = relationship("Event", back_populates="creator")
rsvps = relationship("EventRSVP", back_populates="user")
subscriptions = relationship("Subscription", back_populates="user", foreign_keys="Subscription.user_id")
role_changer = relationship("User", foreign_keys=[role_changed_by], remote_side="User.id", post_update=True)
class Event(Base):
__tablename__ = "events"
@@ -509,3 +514,36 @@ class ImportRollbackAudit(Base):
# Relationships
import_job = relationship("ImportJob")
admin_user = relationship("User", foreign_keys=[rolled_back_by])
# ============================================================
# System Settings Models
# ============================================================
class SettingType(enum.Enum):
plaintext = "plaintext"
encrypted = "encrypted"
json = "json"
class SystemSettings(Base):
"""System-wide configuration settings stored in database"""
__tablename__ = "system_settings"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
setting_key = Column(String(100), unique=True, nullable=False, index=True)
setting_value = Column(Text, nullable=True)
setting_type = Column(SQLEnum(SettingType), default=SettingType.plaintext, nullable=False)
description = Column(Text, nullable=True)
updated_by = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="SET NULL"), nullable=True)
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), nullable=False)
updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc), nullable=False)
is_sensitive = Column(Boolean, default=False, nullable=False)
# Relationships
updater = relationship("User", foreign_keys=[updated_by])
# Index on updated_at for audit queries
__table_args__ = (
Index('idx_system_settings_updated_at', 'updated_at'),
)

View File

@@ -11,11 +11,9 @@ from datetime import datetime, timezone, timedelta
# Load environment variables
load_dotenv()
# Initialize Stripe with secret key
stripe.api_key = os.getenv("STRIPE_SECRET_KEY")
# Stripe webhook secret for signature verification
STRIPE_WEBHOOK_SECRET = os.getenv("STRIPE_WEBHOOK_SECRET")
# NOTE: Stripe credentials are now database-driven
# These .env fallbacks are kept for backward compatibility only
# The actual credentials are loaded dynamically from system_settings table
def create_checkout_session(
user_id: str,
@@ -23,11 +21,15 @@ def create_checkout_session(
plan_id: str,
stripe_price_id: str,
success_url: str,
cancel_url: str
cancel_url: str,
db = None
):
"""
Create a Stripe Checkout session for subscription payment.
Args:
db: Database session (optional, for reading Stripe credentials from database)
Args:
user_id: User's UUID
user_email: User's email address
@@ -39,6 +41,28 @@ def create_checkout_session(
Returns:
dict: Checkout session object with session ID and URL
"""
# Load Stripe API key from database if available
if db:
try:
# Import here to avoid circular dependency
from models import SystemSettings, SettingType
from encryption_service import get_encryption_service
setting = db.query(SystemSettings).filter(
SystemSettings.setting_key == 'stripe_secret_key'
).first()
if setting and setting.setting_value:
encryption_service = get_encryption_service()
stripe.api_key = encryption_service.decrypt(setting.setting_value)
except Exception as e:
# Fallback to .env if database read fails
print(f"Failed to read Stripe key from database: {e}")
stripe.api_key = os.getenv("STRIPE_SECRET_KEY")
else:
# Fallback to .env if no db session
stripe.api_key = os.getenv("STRIPE_SECRET_KEY")
try:
# Create Checkout Session
checkout_session = stripe.checkout.Session.create(
@@ -74,13 +98,14 @@ def create_checkout_session(
raise Exception(f"Stripe error: {str(e)}")
def verify_webhook_signature(payload: bytes, sig_header: str) -> dict:
def verify_webhook_signature(payload: bytes, sig_header: str, db=None) -> dict:
"""
Verify Stripe webhook signature and construct event.
Args:
payload: Raw webhook payload bytes
sig_header: Stripe signature header
db: Database session (optional, for reading webhook secret from database)
Returns:
dict: Verified webhook event
@@ -88,9 +113,32 @@ def verify_webhook_signature(payload: bytes, sig_header: str) -> dict:
Raises:
ValueError: If signature verification fails
"""
# Load webhook secret from database if available
webhook_secret = None
if db:
try:
from models import SystemSettings
from encryption_service import get_encryption_service
setting = db.query(SystemSettings).filter(
SystemSettings.setting_key == 'stripe_webhook_secret'
).first()
if setting and setting.setting_value:
encryption_service = get_encryption_service()
webhook_secret = encryption_service.decrypt(setting.setting_value)
except Exception as e:
print(f"Failed to read webhook secret from database: {e}")
webhook_secret = os.getenv("STRIPE_WEBHOOK_SECRET")
else:
webhook_secret = os.getenv("STRIPE_WEBHOOK_SECRET")
if not webhook_secret:
raise ValueError("STRIPE_WEBHOOK_SECRET not configured")
try:
event = stripe.Webhook.construct_event(
payload, sig_header, STRIPE_WEBHOOK_SECRET
payload, sig_header, webhook_secret
)
return event
except ValueError as e:

614
server.py
View File

@@ -60,11 +60,29 @@ async def lifespan(app: FastAPI):
# Shutdown
logger.info("Application shutdown")
# Environment detection
ENVIRONMENT = os.environ.get('ENVIRONMENT', 'development')
IS_PRODUCTION = ENVIRONMENT == 'production'
# Security: Disable API documentation in production
if IS_PRODUCTION:
print("🔒 Production mode: API documentation disabled")
app_config = {
"lifespan": lifespan,
"root_path": "/membership",
"docs_url": None, # Disable /docs
"redoc_url": None, # Disable /redoc
"openapi_url": None # Disable /openapi.json
}
else:
print("🔓 Development mode: API documentation enabled at /docs and /redoc")
app_config = {
"lifespan": lifespan,
"root_path": "/membership"
}
# Create the main app
app = FastAPI(
lifespan=lifespan,
root_path="/membership" # Configure for serving under /membership path
)
app = FastAPI(**app_config)
# Create a router with the /api prefix
api_router = APIRouter(prefix="/api")
@@ -496,6 +514,10 @@ class AcceptInvitationRequest(BaseModel):
zipcode: Optional[str] = None
date_of_birth: Optional[datetime] = None
class ChangeRoleRequest(BaseModel):
role: str
role_id: Optional[str] = None # For custom roles
# Auth Routes
@api_router.post("/auth/register")
async def register(request: RegisterRequest, db: Session = Depends(get_db)):
@@ -790,6 +812,53 @@ async def get_config():
"max_file_size_mb": int(max_file_size_mb)
}
@api_router.get("/diagnostics/cors")
async def cors_diagnostics(request: Request):
"""
CORS Diagnostics Endpoint
Shows current CORS configuration and request details for debugging
Use this to verify:
1. What origins are allowed
2. What origin is making the request
3. Whether CORS is properly configured
"""
cors_origins_env = os.environ.get('CORS_ORIGINS', '')
if cors_origins_env:
configured_origins = [origin.strip() for origin in cors_origins_env.split(',')]
cors_status = "✅ CONFIGURED"
else:
configured_origins = [
"http://localhost:3000",
"http://localhost:8000",
"http://127.0.0.1:3000",
"http://127.0.0.1:8000"
]
cors_status = "⚠️ NOT CONFIGURED (using defaults)"
request_origin = request.headers.get('origin', 'None')
origin_allowed = request_origin in configured_origins
return {
"cors_status": cors_status,
"environment": ENVIRONMENT,
"cors_origins_env_variable": cors_origins_env or "(not set)",
"allowed_origins": configured_origins,
"request_origin": request_origin,
"origin_allowed": origin_allowed,
"diagnosis": {
"cors_configured": bool(cors_origins_env),
"origin_matches": origin_allowed,
"issue": None if origin_allowed else f"Origin '{request_origin}' is not in allowed origins list"
},
"fix_instructions": None if origin_allowed else (
f"Add to backend .env file:\n"
f"CORS_ORIGINS={request_origin}"
f"{(',' + ','.join(configured_origins)) if cors_origins_env else ''}"
)
}
# User Profile Routes
@api_router.get("/users/profile", response_model=UserResponse)
async def get_profile(current_user: User = Depends(get_current_user)):
@@ -2462,6 +2531,102 @@ async def admin_reset_user_password(
return {"message": f"Password reset for {user.email}. Temporary password emailed."}
@api_router.put("/admin/users/{user_id}/role")
async def change_user_role(
user_id: str,
request: ChangeRoleRequest,
current_user: User = Depends(require_permission("users.edit")),
db: Session = Depends(get_db)
):
"""
Change an existing user's role with privilege escalation prevention.
Requires: users.edit permission
Rules:
- Superadmin: Can assign any role (including superadmin)
- Admin: Can assign admin, finance, member, guest, and non-elevated custom roles
- Admin CANNOT assign: superadmin or custom roles with elevated permissions
- Users CANNOT change their own role
"""
# 1. Fetch target user
target_user = db.query(User).filter(User.id == user_id).first()
if not target_user:
raise HTTPException(status_code=404, detail="User not found")
# 2. Prevent self-role-change
if str(target_user.id) == str(current_user.id):
raise HTTPException(
status_code=403,
detail="You cannot change your own role"
)
# 3. Validate new role
if request.role not in ['guest', 'member', 'admin', 'finance', 'superadmin']:
raise HTTPException(status_code=400, detail="Invalid role")
# 4. Privilege escalation check
if current_user.role != 'superadmin':
# Non-superadmin cannot assign superadmin role
if request.role == 'superadmin':
raise HTTPException(
status_code=403,
detail="Only superadmin can assign superadmin role"
)
# Check custom role elevation
if request.role_id:
custom_role = db.query(Role).filter(Role.id == request.role_id).first()
if not custom_role:
raise HTTPException(status_code=404, detail="Custom role not found")
# Check if custom role has elevated permissions
elevated_permissions = ['users.delete', 'roles.create', 'roles.edit',
'roles.delete', 'permissions.edit']
role_perms = db.query(Permission.name).join(RolePermission).filter(
RolePermission.role_id == custom_role.id,
Permission.name.in_(elevated_permissions)
).all()
if role_perms:
raise HTTPException(
status_code=403,
detail=f"Cannot assign role with elevated permissions: {custom_role.name}"
)
# 5. Update role with audit trail
old_role = target_user.role
old_role_id = target_user.role_id
target_user.role = request.role
target_user.role_id = request.role_id if request.role_id else None
target_user.role_changed_at = datetime.now(timezone.utc)
target_user.role_changed_by = current_user.id
target_user.updated_at = datetime.now(timezone.utc)
db.commit()
db.refresh(target_user)
# Log admin action
logger.info(
f"Admin {current_user.email} changed role for user {target_user.email} "
f"from {old_role} to {request.role}"
)
return {
"message": f"Role changed from {old_role} to {request.role}",
"user": {
"id": str(target_user.id),
"email": target_user.email,
"name": f"{target_user.first_name} {target_user.last_name}",
"old_role": old_role,
"new_role": target_user.role,
"changed_by": f"{current_user.first_name} {current_user.last_name}",
"changed_at": target_user.role_changed_at.isoformat()
}
}
@api_router.post("/admin/users/{user_id}/resend-verification")
async def admin_resend_verification(
user_id: str,
@@ -5242,6 +5407,101 @@ async def get_all_roles(
for role, count in roles_with_counts
]
@api_router.get("/admin/roles/assignable", response_model=List[RoleResponse])
async def get_assignable_roles(
current_user: User = Depends(require_permission("users.create")),
db: Session = Depends(get_db)
):
"""
Get roles that the current user can assign when inviting staff
- Superadmin: Can assign all roles
- Admin: Can assign admin, finance, and non-elevated custom roles
- Returns roles filtered by user's permission level
"""
from sqlalchemy import func
# Query all roles with permission counts
roles_query = db.query(
Role,
func.count(RolePermission.id).label('permission_count')
).outerjoin(RolePermission, Role.id == RolePermission.role_id)\
.group_by(Role.id)\
.order_by(Role.is_system_role.desc(), Role.name)
all_roles = roles_query.all()
# Superadmin can assign any role
if current_user.role == UserRole.superadmin:
return [
{
"id": str(role.id),
"code": role.code,
"name": role.name,
"description": role.description,
"is_system_role": role.is_system_role,
"created_at": role.created_at,
"updated_at": role.updated_at,
"permission_count": count
}
for role, count in all_roles
]
# Admin users can assign: admin, finance, and non-elevated custom roles
# Get admin role's permissions to check for elevation
admin_role = db.query(Role).filter(Role.code == "admin").first()
admin_permission_codes = set()
if admin_role:
admin_permissions = db.query(RolePermission).filter(
RolePermission.role_id == admin_role.id
).all()
admin_permission_codes = {rp.permission_id for rp in admin_permissions}
assignable_roles = []
for role, count in all_roles:
# Always exclude superadmin role
if role.code == "superadmin":
continue
# Include system roles: admin and finance
if role.is_system_role and role.code in ["admin", "finance"]:
assignable_roles.append({
"id": str(role.id),
"code": role.code,
"name": role.name,
"description": role.description,
"is_system_role": role.is_system_role,
"created_at": role.created_at,
"updated_at": role.updated_at,
"permission_count": count
})
continue
# For custom roles, check if they're elevated
if not role.is_system_role:
role_permissions = db.query(RolePermission).filter(
RolePermission.role_id == role.id
).all()
role_permission_ids = {rp.permission_id for rp in role_permissions}
# Check if custom role has permissions admin doesn't have (elevated)
has_elevated_permissions = bool(role_permission_ids - admin_permission_codes)
# Only include non-elevated custom roles
if not has_elevated_permissions:
assignable_roles.append({
"id": str(role.id),
"code": role.code,
"name": role.name,
"description": role.description,
"is_system_role": role.is_system_role,
"created_at": role.created_at,
"updated_at": role.updated_at,
"permission_count": count
})
return assignable_roles
@api_router.post("/admin/roles", response_model=RoleResponse)
async def create_role(
request: CreateRoleRequest,
@@ -6037,8 +6297,8 @@ async def stripe_webhook(request: Request, db: Session = Depends(get_db)):
raise HTTPException(status_code=400, detail="Missing stripe-signature header")
try:
# Verify webhook signature
event = verify_webhook_signature(payload, sig_header)
# Verify webhook signature (pass db for reading webhook secret from database)
event = verify_webhook_signature(payload, sig_header, db)
except ValueError as e:
logger.error(f"Webhook signature verification failed: {str(e)}")
raise HTTPException(status_code=400, detail=str(e))
@@ -6138,13 +6398,351 @@ async def stripe_webhook(request: Request, db: Session = Depends(get_db)):
return {"status": "success"}
# ============================================================================
# ADMIN SETTINGS ENDPOINTS
# ============================================================================
# Helper functions for system settings
def get_setting(db: Session, key: str, decrypt: bool = False) -> str | None:
"""
Get a system setting value from database.
Args:
db: Database session
key: Setting key to retrieve
decrypt: If True and setting_type is 'encrypted', decrypt the value
Returns:
Setting value or None if not found
"""
from models import SystemSettings, SettingType
from encryption_service import get_encryption_service
setting = db.query(SystemSettings).filter(SystemSettings.setting_key == key).first()
if not setting:
return None
value = setting.setting_value
if decrypt and setting.setting_type == SettingType.encrypted and value:
try:
encryption_service = get_encryption_service()
value = encryption_service.decrypt(value)
except Exception as e:
print(f"Failed to decrypt setting {key}: {e}")
return None
return value
def set_setting(
db: Session,
key: str,
value: str,
user_id: str,
setting_type: str = "plaintext",
description: str = None,
is_sensitive: bool = False,
encrypt: bool = False
) -> None:
"""
Set a system setting value in database.
Args:
db: Database session
key: Setting key
value: Setting value
user_id: ID of user making the change
setting_type: Type of setting (plaintext, encrypted, json)
description: Human-readable description
is_sensitive: Whether this is sensitive data
encrypt: If True, encrypt the value before storing
"""
from models import SystemSettings, SettingType
from encryption_service import get_encryption_service
# Encrypt value if requested
if encrypt and value:
encryption_service = get_encryption_service()
value = encryption_service.encrypt(value)
setting_type = "encrypted"
# Find or create setting
setting = db.query(SystemSettings).filter(SystemSettings.setting_key == key).first()
if setting:
# Update existing
setting.setting_value = value
setting.setting_type = SettingType[setting_type]
setting.updated_by = user_id
setting.updated_at = datetime.now(timezone.utc)
if description:
setting.description = description
setting.is_sensitive = is_sensitive
else:
# Create new
setting = SystemSettings(
setting_key=key,
setting_value=value,
setting_type=SettingType[setting_type],
description=description,
updated_by=user_id,
is_sensitive=is_sensitive
)
db.add(setting)
db.commit()
@api_router.get("/admin/settings/stripe/status")
async def get_stripe_status(
current_user: User = Depends(get_current_superadmin),
db: Session = Depends(get_db)
):
"""
Get Stripe integration status (superadmin only).
Returns:
- configured: Whether credentials exist in database
- secret_key_prefix: First 10 chars of secret key (for verification)
- webhook_configured: Whether webhook secret exists
- environment: test or live (based on key prefix)
- webhook_url: Full webhook URL for Stripe configuration
"""
import os
# Read from database
secret_key = get_setting(db, 'stripe_secret_key', decrypt=True)
webhook_secret = get_setting(db, 'stripe_webhook_secret', decrypt=True)
configured = bool(secret_key)
environment = 'unknown'
if secret_key:
if secret_key.startswith('sk_test_'):
environment = 'test'
elif secret_key.startswith('sk_live_'):
environment = 'live'
# Get backend URL from environment for webhook URL
# Try multiple environment variable patterns for flexibility
backend_url = (
os.environ.get('BACKEND_URL') or
os.environ.get('API_URL') or
f"http://{os.environ.get('HOST', 'localhost')}:{os.environ.get('PORT', '8000')}"
)
webhook_url = f"{backend_url}/api/webhooks/stripe"
return {
"configured": configured,
"secret_key_prefix": secret_key[:10] if secret_key else None,
"secret_key_set": bool(secret_key),
"webhook_secret_set": bool(webhook_secret),
"environment": environment,
"webhook_url": webhook_url,
"instructions": {
"location": "Database (system_settings table)",
"required_settings": [
"stripe_secret_key (sk_test_... or sk_live_...)",
"stripe_webhook_secret (whsec_...)"
],
"restart_required": "No - changes take effect immediately"
}
}
@api_router.post("/admin/settings/stripe/test-connection")
async def test_stripe_connection(
current_user: User = Depends(get_current_superadmin),
db: Session = Depends(get_db)
):
"""
Test Stripe API connection (superadmin only).
Performs a simple API call to verify credentials work.
"""
import stripe
# Read from database
secret_key = get_setting(db, 'stripe_secret_key', decrypt=True)
if not secret_key:
raise HTTPException(
status_code=400,
detail="STRIPE_SECRET_KEY not configured in database. Please configure Stripe settings first."
)
try:
stripe.api_key = secret_key
# Make a simple API call to test connection
balance = stripe.Balance.retrieve()
return {
"success": True,
"message": "Stripe connection successful",
"environment": "test" if secret_key.startswith('sk_test_') else "live",
"balance": {
"available": balance.available,
"pending": balance.pending
}
}
except stripe.error.AuthenticationError as e:
raise HTTPException(
status_code=401,
detail=f"Stripe authentication failed: {str(e)}"
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Stripe connection test failed: {str(e)}"
)
class UpdateStripeSettingsRequest(BaseModel):
"""Request model for updating Stripe settings"""
secret_key: str = Field(..., min_length=1, description="Stripe secret key (sk_test_... or sk_live_...)")
webhook_secret: str = Field(..., min_length=1, description="Stripe webhook secret (whsec_...)")
@api_router.put("/admin/settings/stripe")
async def update_stripe_settings(
request: UpdateStripeSettingsRequest,
current_user: User = Depends(get_current_superadmin),
db: Session = Depends(get_db)
):
"""
Update Stripe integration settings (superadmin only).
Stores Stripe credentials encrypted in the database.
Changes take effect immediately without server restart.
"""
# Validate secret key format
if not (request.secret_key.startswith('sk_test_') or request.secret_key.startswith('sk_live_')):
raise HTTPException(
status_code=400,
detail="Invalid Stripe secret key format. Must start with 'sk_test_' or 'sk_live_'"
)
# Validate webhook secret format
if not request.webhook_secret.startswith('whsec_'):
raise HTTPException(
status_code=400,
detail="Invalid Stripe webhook secret format. Must start with 'whsec_'"
)
try:
# Store secret key (encrypted)
set_setting(
db=db,
key='stripe_secret_key',
value=request.secret_key,
user_id=str(current_user.id),
description='Stripe API secret key for payment processing',
is_sensitive=True,
encrypt=True
)
# Store webhook secret (encrypted)
set_setting(
db=db,
key='stripe_webhook_secret',
value=request.webhook_secret,
user_id=str(current_user.id),
description='Stripe webhook secret for verifying webhook signatures',
is_sensitive=True,
encrypt=True
)
# Determine environment
environment = 'test' if request.secret_key.startswith('sk_test_') else 'live'
return {
"success": True,
"message": "Stripe settings updated successfully",
"environment": environment,
"updated_at": datetime.now(timezone.utc).isoformat(),
"updated_by": f"{current_user.first_name} {current_user.last_name}"
}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to update Stripe settings: {str(e)}"
)
# Include the router in the main app
app.include_router(api_router)
# ============================================================================
# MIDDLEWARE CONFIGURATION
# ============================================================================
# IMPORTANT: In FastAPI, middleware is executed in REVERSE order of addition
# Last added = First executed
# So we add them in this order: Security Headers -> CORS
# Execution order will be: CORS -> Security Headers
# Security Headers Middleware (Added first, executes second)
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
response = await call_next(request)
# Security headers to protect against common vulnerabilities
security_headers = {
# Prevent clickjacking attacks
"X-Frame-Options": "DENY",
# Prevent MIME type sniffing
"X-Content-Type-Options": "nosniff",
# Enable XSS protection in older browsers
"X-XSS-Protection": "1; mode=block",
# Control referrer information
"Referrer-Policy": "strict-origin-when-cross-origin",
# Permissions policy (formerly Feature-Policy)
"Permissions-Policy": "geolocation=(), microphone=(), camera=()",
}
# Add HSTS header in production (force HTTPS)
if IS_PRODUCTION:
security_headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
# Apply all security headers
for header, value in security_headers.items():
response.headers[header] = value
# Remove server identification headers (use del, not pop for MutableHeaders)
if "Server" in response.headers:
del response.headers["Server"]
return response
print(f"✓ Security headers configured (Production: {IS_PRODUCTION})")
# CORS Configuration (Added second, executes first)
cors_origins = os.environ.get('CORS_ORIGINS', '')
if cors_origins:
# Use explicitly configured origins
allowed_origins = [origin.strip() for origin in cors_origins.split(',')]
else:
# Default to common development origins if not configured
allowed_origins = [
"http://localhost:3000",
"http://localhost:8000",
"http://127.0.0.1:3000",
"http://127.0.0.1:8000"
]
print(f"⚠️ WARNING: CORS_ORIGINS not set. Using defaults: {allowed_origins}")
print("⚠️ For production, set CORS_ORIGINS in .env file!")
print(f"✓ CORS allowed origins: {allowed_origins}")
app.add_middleware(
CORSMiddleware,
allow_credentials=True,
allow_origins=os.environ.get('CORS_ORIGINS', '*').split(','),
allow_methods=["*"],
allow_origins=allowed_origins,
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"],
allow_headers=["*"],
expose_headers=["*"],
max_age=600, # Cache preflight requests for 10 minutes
)

10
start.sh Normal file
View File

@@ -0,0 +1,10 @@
#!/usr/bin/env bash
# Exit immediately if a command fails
set -e
# Activate virtual environment
source .venv/bin/activate
# Start the backend
python -m uvicorn server:app --reload --port 8000