forked from andika/membership-be
244 lines
7.2 KiB
Python
244 lines
7.2 KiB
Python
"""
Cloudflare R2 Storage Service

Handles file uploads, downloads, and deletions using S3-compatible API
"""
|
|
|
|
import boto3
|
|
from botocore.client import Config
|
|
from botocore.exceptions import ClientError
|
|
import os
|
|
import uuid
|
|
import magic
|
|
from typing import Optional, BinaryIO
|
|
from fastapi import UploadFile, HTTPException
|
|
from pathlib import Path
|
|
|
|
|
|
class R2Storage:
    """
    Cloudflare R2 Storage Service using S3-compatible API.

    Wraps a boto3 S3 client pointed at the account's R2 endpoint and offers
    upload (with optional size / MIME validation), delete, size lookup and
    existence checks. Failures are reported as FastAPI ``HTTPException``s.

    NOTE: the boto3 client calls are synchronous; the ``async`` methods
    invoke them inline without awaiting.
    """

    # Allowed MIME types for uploads, each mapped to its accepted file
    # extensions. The first extension of each list is the canonical one.
    ALLOWED_IMAGE_TYPES = {
        'image/jpeg': ['.jpg', '.jpeg'],
        'image/png': ['.png'],
        'image/webp': ['.webp'],
        'image/gif': ['.gif'],
    }

    ALLOWED_DOCUMENT_TYPES = {
        'application/pdf': ['.pdf'],
        'application/msword': ['.doc'],
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document': ['.docx'],
        'application/vnd.ms-excel': ['.xls'],
        'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': ['.xlsx'],
    }

    # Error codes from head_object that mean "the object does not exist".
    _NOT_FOUND_CODES = ('404', 'NoSuchKey', 'NotFound')

    def __init__(self):
        """
        Initialize R2 client with credentials from environment.

        Reads R2_ACCOUNT_ID, R2_ACCESS_KEY_ID, R2_SECRET_ACCESS_KEY,
        R2_BUCKET_NAME and the optional R2_PUBLIC_URL.

        Raises:
            ValueError: If any of the four required variables is missing
                or empty.
        """
        self.account_id = os.getenv('R2_ACCOUNT_ID')
        self.access_key = os.getenv('R2_ACCESS_KEY_ID')
        self.secret_key = os.getenv('R2_SECRET_ACCESS_KEY')
        self.bucket_name = os.getenv('R2_BUCKET_NAME')
        self.public_url = os.getenv('R2_PUBLIC_URL')

        if not all([self.account_id, self.access_key, self.secret_key, self.bucket_name]):
            raise ValueError("R2 credentials not properly configured in environment variables")

        # Initialize S3 client for R2
        self.client = boto3.client(
            's3',
            endpoint_url=f'https://{self.account_id}.r2.cloudflarestorage.com',
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            config=Config(signature_version='s3v4'),
        )

    @staticmethod
    def _resolve_extension(
        filename: Optional[str],
        mime: str,
        allowed_types: Optional[dict] = None
    ) -> str:
        """
        Pick the extension to store an upload under.

        Args:
            filename: Client-supplied file name (may be None or empty)
            mime: MIME type detected from the file content
            allowed_types: Dict of allowed MIME types and extensions

        Returns:
            str: The lower-cased suffix of ``filename``. When
            ``allowed_types`` lists extensions for ``mime`` and the suffix
            is missing or not among them, the first listed extension is
            returned instead, so the stored name never disagrees with the
            detected content type.
        """
        suffix = Path(filename).suffix.lower() if filename else ''
        accepted = allowed_types.get(mime) if allowed_types else None
        if accepted and suffix not in accepted:
            return accepted[0]
        return suffix

    async def upload_file(
        self,
        file: UploadFile,
        folder: str,
        allowed_types: Optional[dict] = None,
        max_size_bytes: Optional[int] = None
    ) -> tuple[str, str, int]:
        """
        Upload a file to R2 storage.

        The whole file is read into memory, checked against the size limit
        and the MIME allow-list (MIME is detected from the content, not
        taken from the client), then stored under a random UUID name.

        Args:
            file: FastAPI UploadFile object
            folder: Folder path in R2 (e.g., 'profiles', 'gallery/event-id')
            allowed_types: Dict of allowed MIME types and extensions
            max_size_bytes: Maximum file size in bytes

        Returns:
            tuple: (public_url, object_key, file_size_bytes)

        Raises:
            HTTPException: 413 if the file exceeds ``max_size_bytes``,
                400 if its MIME type is not allowed, 500 if the upload
                itself fails.
        """
        try:
            # Read file content
            content = await file.read()
            file_size = len(content)

            # Check file size
            if max_size_bytes and file_size > max_size_bytes:
                max_mb = max_size_bytes / (1024 * 1024)
                actual_mb = file_size / (1024 * 1024)
                raise HTTPException(
                    status_code=413,
                    detail=f"File too large: {actual_mb:.2f}MB exceeds limit of {max_mb:.2f}MB"
                )

            # Detect MIME type
            mime = magic.from_buffer(content, mime=True)

            # Validate MIME type
            if allowed_types and mime not in allowed_types:
                allowed_list = ', '.join(allowed_types.keys())
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid file type: {mime}. Allowed types: {allowed_list}"
                )

            # Generate unique filename
            file_extension = self._resolve_extension(file.filename, mime, allowed_types)
            unique_filename = f"{uuid.uuid4()}{file_extension}"
            object_key = f"{folder}/{unique_filename}"

            # Upload to R2
            self.client.put_object(
                Bucket=self.bucket_name,
                Key=object_key,
                Body=content,
                ContentType=mime,
                ContentLength=file_size
            )

            # Generate public URL
            public_url = self.get_public_url(object_key)

            return public_url, object_key, file_size

        except HTTPException:
            # Validation errors raised above must keep their own status
            # code; without this clause the generic handler below would
            # turn the 413/400 responses into a 500.
            raise
        except ClientError as e:
            raise HTTPException(
                status_code=500,
                detail=f"Failed to upload file to R2: {str(e)}"
            ) from e
        except Exception as e:
            raise HTTPException(
                status_code=500,
                detail=f"Upload error: {str(e)}"
            ) from e

    async def delete_file(self, object_key: str) -> bool:
        """
        Delete a file from R2 storage.

        Args:
            object_key: The S3 object key (path) of the file

        Returns:
            bool: True if successful

        Raises:
            HTTPException: 500 if the delete call raises a ClientError
        """
        try:
            self.client.delete_object(
                Bucket=self.bucket_name,
                Key=object_key
            )
            return True

        except ClientError as e:
            raise HTTPException(
                status_code=500,
                detail=f"Failed to delete file from R2: {str(e)}"
            ) from e

    def get_public_url(self, object_key: str) -> str:
        """
        Generate public URL for an R2 object.

        Args:
            object_key: The S3 object key (path) of the file

        Returns:
            str: ``<R2_PUBLIC_URL>/<object_key>`` when a public URL is
            configured (a trailing slash on the configured URL is ignored),
            otherwise a URL on the bucket's ``r2.cloudflarestorage.com``
            host.
        """
        if self.public_url:
            # Use custom domain if configured; strip a trailing slash so
            # the result never contains a double slash before the key.
            return f"{self.public_url.rstrip('/')}/{object_key}"

        # Use default R2 URL.
        # NOTE(review): this host is the S3 API endpoint - confirm that it
        # is readable without credentials for this bucket.
        return f"https://{self.bucket_name}.{self.account_id}.r2.cloudflarestorage.com/{object_key}"

    async def get_file_size(self, object_key: str) -> int:
        """
        Get the size of a file in R2.

        Args:
            object_key: The S3 object key (path) of the file

        Returns:
            int: File size in bytes

        Raises:
            HTTPException: 404 if the object does not exist, 500 for any
                other ClientError
        """
        try:
            response = self.client.head_object(
                Bucket=self.bucket_name,
                Key=object_key
            )
            return response['ContentLength']

        except ClientError as e:
            # Look the code up defensively: a malformed error response
            # should still produce a 500, not a KeyError.
            error_code = e.response.get('Error', {}).get('Code')
            if error_code in self._NOT_FOUND_CODES:
                raise HTTPException(status_code=404, detail="File not found") from e
            raise HTTPException(
                status_code=500,
                detail=f"Failed to get file info: {str(e)}"
            ) from e

    async def file_exists(self, object_key: str) -> bool:
        """
        Check if a file exists in R2.

        Args:
            object_key: The S3 object key (path) of the file

        Returns:
            bool: True if the head request succeeds, False if it raises a
            ClientError of any kind (not only "not found").
        """
        try:
            self.client.head_object(
                Bucket=self.bucket_name,
                Key=object_key
            )
            return True
        except ClientError:
            # Deliberately best-effort: every client error counts as absent.
            return False
|
|
|
|
|
|
# Module-level cache holding the shared R2Storage instance (None until the
# first call to get_r2_storage)
_r2_storage = None


def get_r2_storage() -> R2Storage:
    """
    Return the shared R2Storage instance, building it on first use.

    Returns:
        R2Storage: The cached storage service; the first call constructs
        it and later calls return the same object.
    """
    global _r2_storage
    if _r2_storage is not None:
        return _r2_storage

    _r2_storage = R2Storage()
    return _r2_storage
|