forked from andika/membership-be
123 lines
3.8 KiB
Python
123 lines
3.8 KiB
Python
"""
|
|
Encryption service for sensitive settings stored in database.
|
|
|
|
Uses Fernet symmetric encryption (AES-128 in CBC mode with HMAC authentication).
|
|
The encryption key is derived from a master secret stored in .env.
|
|
"""
|
|
|
|
import os
|
|
import base64
|
|
from cryptography.fernet import Fernet
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
|
|
class EncryptionService:
    """Encrypt and decrypt sensitive configuration values with Fernet.

    The Fernet key is derived with PBKDF2-HMAC-SHA256 from the master secret
    read from the ``SETTINGS_ENCRYPTION_KEY`` environment variable.
    """

    # Key-derivation parameters. Changing either one yields a different
    # derived key, so every previously stored token would stop decrypting.
    _KDF_SALT = b'systemsettings'  # Fixed salt (OK for key derivation from strong secret)
    _KDF_ITERATIONS = 100000

    # Heuristic markers used by is_encrypted().
    _TOKEN_PREFIX = 'gAAAAA'
    _TOKEN_MIN_LENGTH = 60

    def __init__(self):
        """Read the master secret from the environment and build the cipher.

        Raises:
            ValueError: If ``SETTINGS_ENCRYPTION_KEY`` is unset or empty.
        """
        # The master secret should be a long, random string (e.g., 64 characters).
        # Generate one with: python -c "import secrets; print(secrets.token_urlsafe(64))"
        self.master_secret = os.environ.get('SETTINGS_ENCRYPTION_KEY')

        if not self.master_secret:
            raise ValueError(
                "SETTINGS_ENCRYPTION_KEY environment variable not set. "
                "Generate one with: python -c \"import secrets; print(secrets.token_urlsafe(64))\""
            )

        # Stretch the arbitrary-length secret into exactly 32 bytes, then
        # url-safe base64-encode it: that is the key format Fernet accepts.
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=self._KDF_SALT,
            iterations=self._KDF_ITERATIONS,
            backend=default_backend()
        )
        key = base64.urlsafe_b64encode(kdf.derive(self.master_secret.encode()))
        self.cipher = Fernet(key)

    def encrypt(self, plaintext: str) -> str:
        """
        Encrypt a plaintext string.

        Args:
            plaintext: The string to encrypt

        Returns:
            Base64-encoded encrypted string, or "" when ``plaintext`` is
            empty/falsy (nothing is encrypted in that case)
        """
        if not plaintext:
            return ""

        encrypted_bytes = self.cipher.encrypt(plaintext.encode())
        return encrypted_bytes.decode('utf-8')

    def decrypt(self, encrypted: str) -> str:
        """
        Decrypt an encrypted string.

        Args:
            encrypted: The base64-encoded encrypted string

        Returns:
            Decrypted plaintext string, or "" when ``encrypted`` is
            empty/falsy (mirrors what encrypt() returns for empty input)

        Raises:
            cryptography.fernet.InvalidToken: If decryption fails (wrong key or corrupted data)
        """
        if not encrypted:
            return ""

        decrypted_bytes = self.cipher.decrypt(encrypted.encode())
        return decrypted_bytes.decode('utf-8')

    def is_encrypted(self, value: str) -> bool:
        """
        Check if a value appears to be encrypted (starts with Fernet token format).

        This is a heuristic check - not 100% reliable but useful for validation.
        It only inspects the length and prefix; it never attempts decryption.

        Args:
            value: String to check

        Returns:
            True if value looks like a Fernet token; False for empty values
            and for anything that is not a string
        """
        # Explicit type guard instead of a bare ``except:`` so that unrelated
        # exceptions (e.g. KeyboardInterrupt) are never silently swallowed.
        if not value or not isinstance(value, str):
            return False

        # Fernet tokens are base64-encoded and start with version byte (gAAAAA...)
        # They're always > 60 characters
        return len(value) > self._TOKEN_MIN_LENGTH and value.startswith(self._TOKEN_PREFIX)
|
|
|
|
|
|
# Shared, module-wide encryption service instance.
# Construction is attempted once at import time. A missing key does not abort
# the import: the problem is printed and the instance is left as None, so the
# error surfaces later when get_encryption_service() is called.
encryption_service = None
try:
    encryption_service = EncryptionService()
except ValueError as init_error:
    print(f"WARNING: {init_error}")
    print("Encryption service will not be available.")
|
|
|
|
|
|
def get_encryption_service() -> EncryptionService:
    """
    Return the module-wide encryption service instance.

    Returns:
        The shared EncryptionService created at import time

    Raises:
        ValueError: If encryption service is not initialized (missing SETTINGS_ENCRYPTION_KEY)
    """
    service = encryption_service
    if service is not None:
        return service

    # Import-time construction failed, so there is nothing to hand out.
    raise ValueError(
        "Encryption service not initialized. Set SETTINGS_ENCRYPTION_KEY environment variable."
    )
|