""" Cloudflare R2 Storage Service Handles file uploads, downloads, and deletions using S3-compatible API """ import boto3 from botocore.client import Config from botocore.exceptions import ClientError import os import uuid import magic from typing import Optional, BinaryIO from fastapi import UploadFile, HTTPException from pathlib import Path class R2Storage: """ Cloudflare R2 Storage Service using S3-compatible API """ # Allowed MIME types for uploads ALLOWED_IMAGE_TYPES = { 'image/jpeg': ['.jpg', '.jpeg'], 'image/png': ['.png'], 'image/webp': ['.webp'], 'image/gif': ['.gif'] } ALLOWED_DOCUMENT_TYPES = { 'application/pdf': ['.pdf'], 'application/msword': ['.doc'], 'application/vnd.openxmlformats-officedocument.wordprocessingml.document': ['.docx'], 'application/vnd.ms-excel': ['.xls'], 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': ['.xlsx'] } def __init__(self): """Initialize R2 client with credentials from environment""" self.account_id = os.getenv('R2_ACCOUNT_ID') self.access_key = os.getenv('R2_ACCESS_KEY_ID') self.secret_key = os.getenv('R2_SECRET_ACCESS_KEY') self.bucket_name = os.getenv('R2_BUCKET_NAME') self.public_url = os.getenv('R2_PUBLIC_URL') if not all([self.account_id, self.access_key, self.secret_key, self.bucket_name]): raise ValueError("R2 credentials not properly configured in environment variables") # Initialize S3 client for R2 self.client = boto3.client( 's3', endpoint_url=f'https://{self.account_id}.r2.cloudflarestorage.com', aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key, config=Config(signature_version='s3v4'), ) async def upload_file( self, file: UploadFile, folder: str, allowed_types: Optional[dict] = None, max_size_bytes: Optional[int] = None ) -> tuple[str, str, int]: """ Upload a file to R2 storage Args: file: FastAPI UploadFile object folder: Folder path in R2 (e.g., 'profiles', 'gallery/event-id') allowed_types: Dict of allowed MIME types and extensions max_size_bytes: Maximum file size in bytes Returns: tuple: (public_url, object_key, file_size_bytes) Raises: HTTPException: If upload fails or file is invalid """ try: # Read file content content = await file.read() file_size = len(content) # Check file size if max_size_bytes and file_size > max_size_bytes: max_mb = max_size_bytes / (1024 * 1024) actual_mb = file_size / (1024 * 1024) raise HTTPException( status_code=413, detail=f"File too large: {actual_mb:.2f}MB exceeds limit of {max_mb:.2f}MB" ) # Detect MIME type mime = magic.from_buffer(content, mime=True) # Validate MIME type if allowed_types and mime not in allowed_types: allowed_list = ', '.join(allowed_types.keys()) raise HTTPException( status_code=400, detail=f"Invalid file type: {mime}. Allowed types: {allowed_list}" ) # Generate unique filename file_extension = Path(file.filename).suffix.lower() if not file_extension and allowed_types and mime in allowed_types: file_extension = allowed_types[mime][0] unique_filename = f"{uuid.uuid4()}{file_extension}" object_key = f"{folder}/{unique_filename}" # Upload to R2 self.client.put_object( Bucket=self.bucket_name, Key=object_key, Body=content, ContentType=mime, ContentLength=file_size ) # Generate public URL public_url = self.get_public_url(object_key) return public_url, object_key, file_size except ClientError as e: raise HTTPException( status_code=500, detail=f"Failed to upload file to R2: {str(e)}" ) except Exception as e: raise HTTPException( status_code=500, detail=f"Upload error: {str(e)}" ) async def delete_file(self, object_key: str) -> bool: """ Delete a file from R2 storage Args: object_key: The S3 object key (path) of the file Returns: bool: True if successful Raises: HTTPException: If deletion fails """ try: self.client.delete_object( Bucket=self.bucket_name, Key=object_key ) return True except ClientError as e: raise HTTPException( status_code=500, detail=f"Failed to delete file from R2: {str(e)}" ) def get_public_url(self, object_key: str) -> str: """ Generate public URL for an R2 object Args: object_key: The S3 object key (path) of the file Returns: str: Public URL """ if self.public_url: # Use custom domain if configured return f"{self.public_url}/{object_key}" else: # Use default R2 public URL return f"https://{self.bucket_name}.{self.account_id}.r2.cloudflarestorage.com/{object_key}" async def get_file_size(self, object_key: str) -> int: """ Get the size of a file in R2 Args: object_key: The S3 object key (path) of the file Returns: int: File size in bytes Raises: HTTPException: If file not found """ try: response = self.client.head_object( Bucket=self.bucket_name, Key=object_key ) return response['ContentLength'] except ClientError as e: if e.response['Error']['Code'] == '404': raise HTTPException(status_code=404, detail="File not found") raise HTTPException( status_code=500, detail=f"Failed to get file info: {str(e)}" ) async def file_exists(self, object_key: str) -> bool: """ Check if a file exists in R2 Args: object_key: The S3 object key (path) of the file Returns: bool: True if file exists, False otherwise """ try: self.client.head_object( Bucket=self.bucket_name, Key=object_key ) return True except ClientError: return False # Singleton instance _r2_storage = None def get_r2_storage() -> R2Storage: """ Get singleton instance of R2Storage Returns: R2Storage: Initialized R2 storage service """ global _r2_storage if _r2_storage is None: _r2_storage = R2Storage() return _r2_storage