- Add Settings menu for Stripe configuration- In the Member Profile page, Superadmin can assign new Role to the member- Stripe Configuration is now stored with encryption in Database
This commit is contained in:
375
server.py
375
server.py
@@ -514,6 +514,10 @@ class AcceptInvitationRequest(BaseModel):
|
||||
zipcode: Optional[str] = None
|
||||
date_of_birth: Optional[datetime] = None
|
||||
|
||||
class ChangeRoleRequest(BaseModel):
|
||||
role: str
|
||||
role_id: Optional[str] = None # For custom roles
|
||||
|
||||
# Auth Routes
|
||||
@api_router.post("/auth/register")
|
||||
async def register(request: RegisterRequest, db: Session = Depends(get_db)):
|
||||
@@ -2527,6 +2531,102 @@ async def admin_reset_user_password(
|
||||
|
||||
return {"message": f"Password reset for {user.email}. Temporary password emailed."}
|
||||
|
||||
@api_router.put("/admin/users/{user_id}/role")
|
||||
async def change_user_role(
|
||||
user_id: str,
|
||||
request: ChangeRoleRequest,
|
||||
current_user: User = Depends(require_permission("users.edit")),
|
||||
db: Session = Depends(get_db)
|
||||
):
|
||||
"""
|
||||
Change an existing user's role with privilege escalation prevention.
|
||||
|
||||
Requires: users.edit permission
|
||||
|
||||
Rules:
|
||||
- Superadmin: Can assign any role (including superadmin)
|
||||
- Admin: Can assign admin, finance, member, guest, and non-elevated custom roles
|
||||
- Admin CANNOT assign: superadmin or custom roles with elevated permissions
|
||||
- Users CANNOT change their own role
|
||||
"""
|
||||
|
||||
# 1. Fetch target user
|
||||
target_user = db.query(User).filter(User.id == user_id).first()
|
||||
if not target_user:
|
||||
raise HTTPException(status_code=404, detail="User not found")
|
||||
|
||||
# 2. Prevent self-role-change
|
||||
if str(target_user.id) == str(current_user.id):
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail="You cannot change your own role"
|
||||
)
|
||||
|
||||
# 3. Validate new role
|
||||
if request.role not in ['guest', 'member', 'admin', 'finance', 'superadmin']:
|
||||
raise HTTPException(status_code=400, detail="Invalid role")
|
||||
|
||||
# 4. Privilege escalation check
|
||||
if current_user.role != 'superadmin':
|
||||
# Non-superadmin cannot assign superadmin role
|
||||
if request.role == 'superadmin':
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail="Only superadmin can assign superadmin role"
|
||||
)
|
||||
|
||||
# Check custom role elevation
|
||||
if request.role_id:
|
||||
custom_role = db.query(Role).filter(Role.id == request.role_id).first()
|
||||
if not custom_role:
|
||||
raise HTTPException(status_code=404, detail="Custom role not found")
|
||||
|
||||
# Check if custom role has elevated permissions
|
||||
elevated_permissions = ['users.delete', 'roles.create', 'roles.edit',
|
||||
'roles.delete', 'permissions.edit']
|
||||
role_perms = db.query(Permission.name).join(RolePermission).filter(
|
||||
RolePermission.role_id == custom_role.id,
|
||||
Permission.name.in_(elevated_permissions)
|
||||
).all()
|
||||
|
||||
if role_perms:
|
||||
raise HTTPException(
|
||||
status_code=403,
|
||||
detail=f"Cannot assign role with elevated permissions: {custom_role.name}"
|
||||
)
|
||||
|
||||
# 5. Update role with audit trail
|
||||
old_role = target_user.role
|
||||
old_role_id = target_user.role_id
|
||||
|
||||
target_user.role = request.role
|
||||
target_user.role_id = request.role_id if request.role_id else None
|
||||
target_user.role_changed_at = datetime.now(timezone.utc)
|
||||
target_user.role_changed_by = current_user.id
|
||||
target_user.updated_at = datetime.now(timezone.utc)
|
||||
|
||||
db.commit()
|
||||
db.refresh(target_user)
|
||||
|
||||
# Log admin action
|
||||
logger.info(
|
||||
f"Admin {current_user.email} changed role for user {target_user.email} "
|
||||
f"from {old_role} to {request.role}"
|
||||
)
|
||||
|
||||
return {
|
||||
"message": f"Role changed from {old_role} to {request.role}",
|
||||
"user": {
|
||||
"id": str(target_user.id),
|
||||
"email": target_user.email,
|
||||
"name": f"{target_user.first_name} {target_user.last_name}",
|
||||
"old_role": old_role,
|
||||
"new_role": target_user.role,
|
||||
"changed_by": f"{current_user.first_name} {current_user.last_name}",
|
||||
"changed_at": target_user.role_changed_at.isoformat()
|
||||
}
|
||||
}
|
||||
|
||||
@api_router.post("/admin/users/{user_id}/resend-verification")
|
||||
async def admin_resend_verification(
|
||||
user_id: str,
|
||||
@@ -6197,8 +6297,8 @@ async def stripe_webhook(request: Request, db: Session = Depends(get_db)):
|
||||
raise HTTPException(status_code=400, detail="Missing stripe-signature header")
|
||||
|
||||
try:
|
||||
# Verify webhook signature
|
||||
event = verify_webhook_signature(payload, sig_header)
|
||||
# Verify webhook signature (pass db for reading webhook secret from database)
|
||||
event = verify_webhook_signature(payload, sig_header, db)
|
||||
except ValueError as e:
|
||||
logger.error(f"Webhook signature verification failed: {str(e)}")
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
@@ -6298,6 +6398,277 @@ async def stripe_webhook(request: Request, db: Session = Depends(get_db)):
|
||||
|
||||
return {"status": "success"}
|
||||
|
||||
# ============================================================================
|
||||
# ADMIN SETTINGS ENDPOINTS
|
||||
# ============================================================================
|
||||
|
||||
# Helper functions for system settings
|
||||
def get_setting(db: Session, key: str, decrypt: bool = False) -> str | None:
|
||||
"""
|
||||
Get a system setting value from database.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
key: Setting key to retrieve
|
||||
decrypt: If True and setting_type is 'encrypted', decrypt the value
|
||||
|
||||
Returns:
|
||||
Setting value or None if not found
|
||||
"""
|
||||
from models import SystemSettings, SettingType
|
||||
from encryption_service import get_encryption_service
|
||||
|
||||
setting = db.query(SystemSettings).filter(SystemSettings.setting_key == key).first()
|
||||
if not setting:
|
||||
return None
|
||||
|
||||
value = setting.setting_value
|
||||
if decrypt and setting.setting_type == SettingType.encrypted and value:
|
||||
try:
|
||||
encryption_service = get_encryption_service()
|
||||
value = encryption_service.decrypt(value)
|
||||
except Exception as e:
|
||||
print(f"Failed to decrypt setting {key}: {e}")
|
||||
return None
|
||||
|
||||
return value
|
||||
|
||||
|
||||
def set_setting(
|
||||
db: Session,
|
||||
key: str,
|
||||
value: str,
|
||||
user_id: str,
|
||||
setting_type: str = "plaintext",
|
||||
description: str = None,
|
||||
is_sensitive: bool = False,
|
||||
encrypt: bool = False
|
||||
) -> None:
|
||||
"""
|
||||
Set a system setting value in database.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
key: Setting key
|
||||
value: Setting value
|
||||
user_id: ID of user making the change
|
||||
setting_type: Type of setting (plaintext, encrypted, json)
|
||||
description: Human-readable description
|
||||
is_sensitive: Whether this is sensitive data
|
||||
encrypt: If True, encrypt the value before storing
|
||||
"""
|
||||
from models import SystemSettings, SettingType
|
||||
from encryption_service import get_encryption_service
|
||||
|
||||
# Encrypt value if requested
|
||||
if encrypt and value:
|
||||
encryption_service = get_encryption_service()
|
||||
value = encryption_service.encrypt(value)
|
||||
setting_type = "encrypted"
|
||||
|
||||
# Find or create setting
|
||||
setting = db.query(SystemSettings).filter(SystemSettings.setting_key == key).first()
|
||||
|
||||
if setting:
|
||||
# Update existing
|
||||
setting.setting_value = value
|
||||
setting.setting_type = SettingType[setting_type]
|
||||
setting.updated_by = user_id
|
||||
setting.updated_at = datetime.now(timezone.utc)
|
||||
if description:
|
||||
setting.description = description
|
||||
setting.is_sensitive = is_sensitive
|
||||
else:
|
||||
# Create new
|
||||
setting = SystemSettings(
|
||||
setting_key=key,
|
||||
setting_value=value,
|
||||
setting_type=SettingType[setting_type],
|
||||
description=description,
|
||||
updated_by=user_id,
|
||||
is_sensitive=is_sensitive
|
||||
)
|
||||
db.add(setting)
|
||||
|
||||
db.commit()
|
||||
|
||||
@api_router.get("/admin/settings/stripe/status")
|
||||
async def get_stripe_status(
|
||||
current_user: User = Depends(get_current_superadmin),
|
||||
db: Session = Depends(get_db)
|
||||
):
|
||||
"""
|
||||
Get Stripe integration status (superadmin only).
|
||||
|
||||
Returns:
|
||||
- configured: Whether credentials exist in database
|
||||
- secret_key_prefix: First 10 chars of secret key (for verification)
|
||||
- webhook_configured: Whether webhook secret exists
|
||||
- environment: test or live (based on key prefix)
|
||||
- webhook_url: Full webhook URL for Stripe configuration
|
||||
"""
|
||||
import os
|
||||
|
||||
# Read from database
|
||||
secret_key = get_setting(db, 'stripe_secret_key', decrypt=True)
|
||||
webhook_secret = get_setting(db, 'stripe_webhook_secret', decrypt=True)
|
||||
|
||||
configured = bool(secret_key)
|
||||
environment = 'unknown'
|
||||
|
||||
if secret_key:
|
||||
if secret_key.startswith('sk_test_'):
|
||||
environment = 'test'
|
||||
elif secret_key.startswith('sk_live_'):
|
||||
environment = 'live'
|
||||
|
||||
# Get backend URL from environment for webhook URL
|
||||
# Try multiple environment variable patterns for flexibility
|
||||
backend_url = (
|
||||
os.environ.get('BACKEND_URL') or
|
||||
os.environ.get('API_URL') or
|
||||
f"http://{os.environ.get('HOST', 'localhost')}:{os.environ.get('PORT', '8000')}"
|
||||
)
|
||||
webhook_url = f"{backend_url}/api/webhooks/stripe"
|
||||
|
||||
return {
|
||||
"configured": configured,
|
||||
"secret_key_prefix": secret_key[:10] if secret_key else None,
|
||||
"secret_key_set": bool(secret_key),
|
||||
"webhook_secret_set": bool(webhook_secret),
|
||||
"environment": environment,
|
||||
"webhook_url": webhook_url,
|
||||
"instructions": {
|
||||
"location": "Database (system_settings table)",
|
||||
"required_settings": [
|
||||
"stripe_secret_key (sk_test_... or sk_live_...)",
|
||||
"stripe_webhook_secret (whsec_...)"
|
||||
],
|
||||
"restart_required": "No - changes take effect immediately"
|
||||
}
|
||||
}
|
||||
|
||||
@api_router.post("/admin/settings/stripe/test-connection")
|
||||
async def test_stripe_connection(
|
||||
current_user: User = Depends(get_current_superadmin),
|
||||
db: Session = Depends(get_db)
|
||||
):
|
||||
"""
|
||||
Test Stripe API connection (superadmin only).
|
||||
|
||||
Performs a simple API call to verify credentials work.
|
||||
"""
|
||||
import stripe
|
||||
|
||||
# Read from database
|
||||
secret_key = get_setting(db, 'stripe_secret_key', decrypt=True)
|
||||
|
||||
if not secret_key:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="STRIPE_SECRET_KEY not configured in database. Please configure Stripe settings first."
|
||||
)
|
||||
|
||||
try:
|
||||
stripe.api_key = secret_key
|
||||
|
||||
# Make a simple API call to test connection
|
||||
balance = stripe.Balance.retrieve()
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": "Stripe connection successful",
|
||||
"environment": "test" if secret_key.startswith('sk_test_') else "live",
|
||||
"balance": {
|
||||
"available": balance.available,
|
||||
"pending": balance.pending
|
||||
}
|
||||
}
|
||||
except stripe.error.AuthenticationError as e:
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail=f"Stripe authentication failed: {str(e)}"
|
||||
)
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Stripe connection test failed: {str(e)}"
|
||||
)
|
||||
|
||||
|
||||
class UpdateStripeSettingsRequest(BaseModel):
|
||||
"""Request model for updating Stripe settings"""
|
||||
secret_key: str = Field(..., min_length=1, description="Stripe secret key (sk_test_... or sk_live_...)")
|
||||
webhook_secret: str = Field(..., min_length=1, description="Stripe webhook secret (whsec_...)")
|
||||
|
||||
|
||||
@api_router.put("/admin/settings/stripe")
|
||||
async def update_stripe_settings(
|
||||
request: UpdateStripeSettingsRequest,
|
||||
current_user: User = Depends(get_current_superadmin),
|
||||
db: Session = Depends(get_db)
|
||||
):
|
||||
"""
|
||||
Update Stripe integration settings (superadmin only).
|
||||
|
||||
Stores Stripe credentials encrypted in the database.
|
||||
Changes take effect immediately without server restart.
|
||||
"""
|
||||
# Validate secret key format
|
||||
if not (request.secret_key.startswith('sk_test_') or request.secret_key.startswith('sk_live_')):
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Invalid Stripe secret key format. Must start with 'sk_test_' or 'sk_live_'"
|
||||
)
|
||||
|
||||
# Validate webhook secret format
|
||||
if not request.webhook_secret.startswith('whsec_'):
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Invalid Stripe webhook secret format. Must start with 'whsec_'"
|
||||
)
|
||||
|
||||
try:
|
||||
# Store secret key (encrypted)
|
||||
set_setting(
|
||||
db=db,
|
||||
key='stripe_secret_key',
|
||||
value=request.secret_key,
|
||||
user_id=str(current_user.id),
|
||||
description='Stripe API secret key for payment processing',
|
||||
is_sensitive=True,
|
||||
encrypt=True
|
||||
)
|
||||
|
||||
# Store webhook secret (encrypted)
|
||||
set_setting(
|
||||
db=db,
|
||||
key='stripe_webhook_secret',
|
||||
value=request.webhook_secret,
|
||||
user_id=str(current_user.id),
|
||||
description='Stripe webhook secret for verifying webhook signatures',
|
||||
is_sensitive=True,
|
||||
encrypt=True
|
||||
)
|
||||
|
||||
# Determine environment
|
||||
environment = 'test' if request.secret_key.startswith('sk_test_') else 'live'
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": "Stripe settings updated successfully",
|
||||
"environment": environment,
|
||||
"updated_at": datetime.now(timezone.utc).isoformat(),
|
||||
"updated_by": f"{current_user.first_name} {current_user.last_name}"
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Failed to update Stripe settings: {str(e)}"
|
||||
)
|
||||
|
||||
|
||||
# Include the router in the main app
|
||||
app.include_router(api_router)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user