first commit

This commit is contained in:
Koncept Kit
2025-12-05 16:43:37 +07:00
commit 6ef7685ade
26 changed files with 2191 additions and 0 deletions

14
.env Normal file
View File

@@ -0,0 +1,14 @@
DATABASE_URL=postgresql://postgres:RchhcpaUKZuZuMOvB5kwCP1weLBnAG6tNMXE5FHdk8AwCvolBMALYFVYRM7WCl9x@10.9.23.11:5001/loaf
CORS_ORIGINS=*
JWT_SECRET=your-secret-key-change-this-in-production
JWT_ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
SMTP_USERNAME=your-email@gmail.com
SMTP_PASSWORD=your-app-password
SMTP_FROM_EMAIL=noreply@membership.com
SMTP_FROM_NAME=Membership Platform
FRONTEND_URL=http://localhost:3000
STRIPE_SECRET_KEY=sk_test_51RshYMLFfdCcyjTAYXXcFpH650EzUvUeN98nEa4Kj4EUqBpgdrIFsAHClbdn9lBnyWS54dHLPkUdQhlyxqEQKRds00TJYjpKnA
STRIPE_WEBHOOK_SECRET=whsec_78319c49ffe749cf62144160dc114e7523519d31432b062ef71423023ae7bbfa

0
README.md Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

80
auth.py Normal file
View File

@@ -0,0 +1,80 @@
from datetime import datetime, timedelta, timezone
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
import os
from database import get_db
from models import User, UserRole
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer()
JWT_SECRET = os.environ.get('JWT_SECRET', 'your-secret-key')
JWT_ALGORITHM = os.environ.get('JWT_ALGORITHM', 'HS256')
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.environ.get('ACCESS_TOKEN_EXPIRE_MINUTES', 30))
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, JWT_SECRET, algorithm=JWT_ALGORITHM)
return encoded_jwt
def decode_token(token: str):
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
return payload
except JWTError:
return None
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
) -> User:
token = credentials.credentials
payload = decode_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
user_id: str = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
user = db.query(User).filter(User.id == user_id).first()
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_admin_user(current_user: User = Depends(get_current_user)) -> User:
if current_user.role != UserRole.admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user

73
create_admin.py Normal file
View File

@@ -0,0 +1,73 @@
"""
Create an admin user for testing.
Run this script to add an admin account to your database.
"""
from database import SessionLocal
from models import User, UserStatus, UserRole
from auth import get_password_hash
from datetime import datetime, timezone
def create_admin():
"""Create an admin user"""
db = SessionLocal()
try:
# Check if admin already exists
existing_admin = db.query(User).filter(
User.email == "admin@loaf.org"
).first()
if existing_admin:
print(f"⚠️ Admin user already exists: {existing_admin.email}")
print(f" Role: {existing_admin.role.value}")
print(f" Status: {existing_admin.status.value}")
# Update to admin role if not already
if existing_admin.role != UserRole.admin:
existing_admin.role = UserRole.admin
existing_admin.status = UserStatus.active
existing_admin.email_verified = True
db.commit()
print("✅ Updated existing user to admin role")
return
print("Creating admin user...")
# Create admin user
admin_user = User(
email="admin@loaf.org",
password_hash=get_password_hash("admin123"), # Change this password!
first_name="Admin",
last_name="User",
phone="555-0001",
address="123 Admin Street",
city="Admin City",
state="CA",
zipcode="90001",
date_of_birth=datetime(1990, 1, 1),
status=UserStatus.active,
role=UserRole.admin,
email_verified=True,
newsletter_subscribed=False
)
db.add(admin_user)
db.commit()
db.refresh(admin_user)
print("✅ Admin user created successfully!")
print(f" Email: admin@loaf.org")
print(f" Password: admin123")
print(f" Role: {admin_user.role.value}")
print(f" User ID: {admin_user.id}")
print("\n⚠️ IMPORTANT: Change the password after first login!")
except Exception as e:
print(f"❌ Error creating admin user: {e}")
db.rollback()
finally:
db.close()
if __name__ == "__main__":
create_admin()

23
database.py Normal file
View File

@@ -0,0 +1,23 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import os
from dotenv import load_dotenv
from pathlib import Path
ROOT_DIR = Path(__file__).parent
load_dotenv(ROOT_DIR / '.env')
DATABASE_URL = os.environ.get('DATABASE_URL', 'postgresql://user:password@localhost:5432/membership_db')
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

182
email_service.py Normal file
View File

@@ -0,0 +1,182 @@
import os
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import aiosmtplib
import logging
logger = logging.getLogger(__name__)
SMTP_HOST = os.environ.get('SMTP_HOST', 'smtp.gmail.com')
SMTP_PORT = int(os.environ.get('SMTP_PORT', 587))
SMTP_USERNAME = os.environ.get('SMTP_USERNAME', '')
SMTP_PASSWORD = os.environ.get('SMTP_PASSWORD', '')
SMTP_FROM_EMAIL = os.environ.get('SMTP_FROM_EMAIL', 'noreply@membership.com')
SMTP_FROM_NAME = os.environ.get('SMTP_FROM_NAME', 'Membership Platform')
FRONTEND_URL = os.environ.get('FRONTEND_URL', 'http://localhost:3000')
async def send_email(to_email: str, subject: str, html_content: str):
"""Send an email using SMTP"""
try:
message = MIMEMultipart('alternative')
message['From'] = f"{SMTP_FROM_NAME} <{SMTP_FROM_EMAIL}>"
message['To'] = to_email
message['Subject'] = subject
html_part = MIMEText(html_content, 'html')
message.attach(html_part)
# For development/testing, just log the email
if not SMTP_USERNAME or not SMTP_PASSWORD:
logger.info(f"[EMAIL] To: {to_email}")
logger.info(f"[EMAIL] Subject: {subject}")
logger.info(f"[EMAIL] Content: {html_content}")
return True
# Send actual email
await aiosmtplib.send(
message,
hostname=SMTP_HOST,
port=SMTP_PORT,
username=SMTP_USERNAME,
password=SMTP_PASSWORD,
start_tls=True
)
logger.info(f"Email sent successfully to {to_email}")
return True
except Exception as e:
logger.error(f"Failed to send email to {to_email}: {str(e)}")
return False
async def send_verification_email(to_email: str, token: str):
"""Send email verification link"""
verification_url = f"{FRONTEND_URL}/verify-email?token={token}"
subject = "Verify Your Email Address"
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<style>
body {{ font-family: 'DM Sans', Arial, sans-serif; line-height: 1.6; color: #3D405B; }}
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
.header {{ background: linear-gradient(135deg, #F2CC8F 0%, #E07A5F 100%); padding: 30px; text-align: center; border-radius: 10px 10px 0 0; }}
.header h1 {{ color: white; margin: 0; font-family: 'Fraunces', serif; }}
.content {{ background: #FDFCF8; padding: 30px; border-radius: 0 0 10px 10px; }}
.button {{ display: inline-block; background: #E07A5F; color: white; padding: 15px 40px; text-decoration: none; border-radius: 50px; font-weight: 600; margin: 20px 0; }}
.button:hover {{ background: #D0694E; }}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>Welcome to Our Community!</h1>
</div>
<div class="content">
<p>Thank you for registering with us. We're excited to have you join our community.</p>
<p>Please click the button below to verify your email address:</p>
<p style="text-align: center;">
<a href="{verification_url}" class="button">Verify Email</a>
</p>
<p>Or copy and paste this link into your browser:</p>
<p style="word-break: break-all; color: #6B708D;">{verification_url}</p>
<p>This link will expire in 24 hours.</p>
<p>If you didn't create an account, please ignore this email.</p>
</div>
</div>
</body>
</html>
"""
return await send_email(to_email, subject, html_content)
async def send_approval_notification(to_email: str, first_name: str):
"""Send notification when user is approved"""
login_url = f"{FRONTEND_URL}/login"
subject = "Your Membership Application Has Been Approved!"
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<style>
body {{ font-family: 'DM Sans', Arial, sans-serif; line-height: 1.6; color: #3D405B; }}
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
.header {{ background: linear-gradient(135deg, #F2CC8F 0%, #E07A5F 100%); padding: 30px; text-align: center; border-radius: 10px 10px 0 0; }}
.header h1 {{ color: white; margin: 0; font-family: 'Fraunces', serif; }}
.content {{ background: #FDFCF8; padding: 30px; border-radius: 0 0 10px 10px; }}
.button {{ display: inline-block; background: #E07A5F; color: white; padding: 15px 40px; text-decoration: none; border-radius: 50px; font-weight: 600; margin: 20px 0; }}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>Congratulations, {first_name}!</h1>
</div>
<div class="content">
<p>Great news! Your membership application has been approved.</p>
<p>You now have full access to all member features and events.</p>
<p style="text-align: center;">
<a href="{login_url}" class="button">Login to Your Account</a>
</p>
<p>We look forward to seeing you at our events!</p>
</div>
</div>
</body>
</html>
"""
return await send_email(to_email, subject, html_content)
async def send_payment_prompt_email(to_email: str, first_name: str):
"""Send payment prompt email after admin approval"""
payment_url = f"{FRONTEND_URL}/plans"
subject = "Complete Your LOAF Membership - Payment Required"
html_content = f"""
<!DOCTYPE html>
<html>
<head>
<style>
body {{ font-family: 'DM Sans', Arial, sans-serif; line-height: 1.6; color: #3D405B; }}
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
.header {{ background: linear-gradient(135deg, #F2CC8F 0%, #E07A5F 100%); padding: 30px; text-align: center; border-radius: 10px 10px 0 0; }}
.header h1 {{ color: white; margin: 0; font-family: 'Fraunces', serif; }}
.content {{ background: #FDFCF8; padding: 30px; border-radius: 0 0 10px 10px; }}
.button {{ display: inline-block; background: #E07A5F; color: white; padding: 15px 40px; text-decoration: none; border-radius: 50px; font-weight: 600; margin: 20px 0; }}
.button:hover {{ background: #D0694E; }}
.benefits {{ background: white; padding: 20px; border-radius: 8px; margin: 20px 0; }}
.benefits ul {{ list-style: none; padding: 0; }}
.benefits li {{ padding: 8px 0; padding-left: 25px; position: relative; }}
.benefits li:before {{ content: ""; position: absolute; left: 0; color: #E07A5F; font-weight: bold; }}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🎉 You're Approved, {first_name}!</h1>
</div>
<div class="content">
<p><strong>Great news!</strong> Your LOAF membership application has been approved by our admin team.</p>
<p>To activate your membership and gain full access, please complete your annual membership payment:</p>
<p style="text-align: center;">
<a href="{payment_url}" class="button">Complete Payment →</a>
</p>
<div class="benefits">
<p><strong>Once payment is processed, you'll have immediate access to:</strong></p>
<ul>
<li>Members-only events and gatherings</li>
<li>Community directory and networking</li>
<li>Exclusive member benefits and discounts</li>
<li>LOAF newsletter and updates</li>
</ul>
</div>
<p>We're excited to have you join the LOAF community!</p>
<p style="margin-top: 30px; padding-top: 20px; border-top: 1px solid #E8E4DB; color: #6B708D; font-size: 14px;">
Questions? Contact us at support@loaf.org
</p>
</div>
</div>
</body>
</html>
"""
return await send_email(to_email, subject, html_content)

16
init_db.py Normal file
View File

@@ -0,0 +1,16 @@
"""
Database initialization script.
Creates all tables defined in models.py
"""
from database import Base, engine
from models import User, Event, EventRSVP, SubscriptionPlan, Subscription
def init_database():
"""Create all database tables"""
print("Creating database tables...")
Base.metadata.create_all(bind=engine)
print("✅ Database tables created successfully!")
if __name__ == "__main__":
init_database()

View File

@@ -0,0 +1,40 @@
"""
Migration script to add manual payment columns to subscriptions table.
Run this once to update the database schema.
"""
from database import engine
from sqlalchemy import text
def add_manual_payment_columns():
"""Add manual payment columns to subscriptions table"""
migrations = [
"ALTER TABLE subscriptions ADD COLUMN IF NOT EXISTS manual_payment BOOLEAN NOT NULL DEFAULT FALSE",
"ALTER TABLE subscriptions ADD COLUMN IF NOT EXISTS manual_payment_notes TEXT",
"ALTER TABLE subscriptions ADD COLUMN IF NOT EXISTS manual_payment_admin_id UUID REFERENCES users(id)",
"ALTER TABLE subscriptions ADD COLUMN IF NOT EXISTS manual_payment_date TIMESTAMP",
"ALTER TABLE subscriptions ADD COLUMN IF NOT EXISTS payment_method VARCHAR"
]
try:
print("Adding manual payment columns to subscriptions table...")
with engine.connect() as conn:
for sql in migrations:
print(f" Executing: {sql[:60]}...")
conn.execute(text(sql))
conn.commit()
print("✅ Migration completed successfully!")
print("\nAdded columns:")
print(" - manual_payment (BOOLEAN)")
print(" - manual_payment_notes (TEXT)")
print(" - manual_payment_admin_id (UUID)")
print(" - manual_payment_date (TIMESTAMP)")
print(" - payment_method (VARCHAR)")
except Exception as e:
print(f"❌ Migration failed: {e}")
raise
if __name__ == "__main__":
add_manual_payment_columns()

31
migrate_status.py Normal file
View File

@@ -0,0 +1,31 @@
"""
Migrate user status from awaiting_event to pending_approval
Run this once to update existing database records
"""
from database import SessionLocal
from models import User
def migrate_status():
db = SessionLocal()
try:
# Find all users with old status
users = db.query(User).filter(User.status == "awaiting_event").all()
count = 0
for user in users:
user.status = "pending_approval"
count += 1
db.commit()
print(f"✅ Successfully migrated {count} users from 'awaiting_event' to 'pending_approval'")
except Exception as e:
print(f"❌ Error during migration: {e}")
db.rollback()
finally:
db.close()
if __name__ == "__main__":
print("Starting status migration...")
migrate_status()

141
models.py Normal file
View File

@@ -0,0 +1,141 @@
from sqlalchemy import Column, String, Boolean, DateTime, Enum as SQLEnum, Text, Integer, ForeignKey, JSON
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from datetime import datetime, timezone
import uuid
import enum
from database import Base
class UserStatus(enum.Enum):
pending_email = "pending_email"
pending_approval = "pending_approval"
pre_approved = "pre_approved"
payment_pending = "payment_pending"
active = "active"
inactive = "inactive"
class UserRole(enum.Enum):
guest = "guest"
member = "member"
admin = "admin"
class RSVPStatus(enum.Enum):
yes = "yes"
no = "no"
maybe = "maybe"
class SubscriptionStatus(enum.Enum):
active = "active"
expired = "expired"
cancelled = "cancelled"
class User(Base):
__tablename__ = "users"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
email = Column(String, unique=True, nullable=False, index=True)
password_hash = Column(String, nullable=False)
first_name = Column(String, nullable=False)
last_name = Column(String, nullable=False)
phone = Column(String, nullable=False)
address = Column(String, nullable=False)
city = Column(String, nullable=False)
state = Column(String, nullable=False)
zipcode = Column(String, nullable=False)
date_of_birth = Column(DateTime, nullable=False)
lead_sources = Column(JSON, default=list)
partner_first_name = Column(String, nullable=True)
partner_last_name = Column(String, nullable=True)
partner_is_member = Column(Boolean, default=False)
partner_plan_to_become_member = Column(Boolean, default=False)
referred_by_member_name = Column(String, nullable=True)
status = Column(SQLEnum(UserStatus), default=UserStatus.pending_email, nullable=False)
role = Column(SQLEnum(UserRole), default=UserRole.guest, nullable=False)
email_verified = Column(Boolean, default=False)
email_verification_token = Column(String, nullable=True)
newsletter_subscribed = Column(Boolean, default=False)
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
# Relationships
events_created = relationship("Event", back_populates="creator")
rsvps = relationship("EventRSVP", back_populates="user")
subscriptions = relationship("Subscription", back_populates="user", foreign_keys="Subscription.user_id")
class Event(Base):
__tablename__ = "events"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
title = Column(String, nullable=False)
description = Column(Text, nullable=True)
start_at = Column(DateTime, nullable=False)
end_at = Column(DateTime, nullable=False)
location = Column(String, nullable=False)
capacity = Column(Integer, nullable=True)
created_by = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)
published = Column(Boolean, default=False)
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
# Relationships
creator = relationship("User", back_populates="events_created")
rsvps = relationship("EventRSVP", back_populates="event", cascade="all, delete-orphan")
class EventRSVP(Base):
__tablename__ = "event_rsvps"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
event_id = Column(UUID(as_uuid=True), ForeignKey("events.id"), nullable=False)
user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)
rsvp_status = Column(SQLEnum(RSVPStatus), default=RSVPStatus.maybe, nullable=False)
attended = Column(Boolean, default=False)
attended_at = Column(DateTime, nullable=True)
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
# Relationships
event = relationship("Event", back_populates="rsvps")
user = relationship("User", back_populates="rsvps")
class SubscriptionPlan(Base):
__tablename__ = "subscription_plans"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name = Column(String, nullable=False)
description = Column(Text, nullable=True)
price_cents = Column(Integer, nullable=False) # Price in cents
billing_cycle = Column(String, default="yearly", nullable=False) # yearly, monthly, etc.
stripe_price_id = Column(String, nullable=True) # Stripe Price ID
active = Column(Boolean, default=True)
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
# Relationships
subscriptions = relationship("Subscription", back_populates="plan")
class Subscription(Base):
__tablename__ = "subscriptions"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
user_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False)
plan_id = Column(UUID(as_uuid=True), ForeignKey("subscription_plans.id"), nullable=False)
stripe_subscription_id = Column(String, nullable=True) # Stripe Subscription ID
stripe_customer_id = Column(String, nullable=True) # Stripe Customer ID
status = Column(SQLEnum(SubscriptionStatus), default=SubscriptionStatus.active, nullable=False)
start_date = Column(DateTime, nullable=False)
end_date = Column(DateTime, nullable=True)
amount_paid_cents = Column(Integer, nullable=True) # Amount paid in cents
# Manual payment fields
manual_payment = Column(Boolean, default=False, nullable=False) # Whether this was a manual offline payment
manual_payment_notes = Column(Text, nullable=True) # Admin notes about the payment
manual_payment_admin_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=True) # Admin who processed the payment
manual_payment_date = Column(DateTime, nullable=True) # Date payment was received
payment_method = Column(String, nullable=True) # Payment method: stripe, cash, bank_transfer, check, other
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
# Relationships
user = relationship("User", back_populates="subscriptions", foreign_keys=[user_id])
plan = relationship("SubscriptionPlan", back_populates="subscriptions")

180
payment_service.py Normal file
View File

@@ -0,0 +1,180 @@
"""
Payment service for Stripe integration.
Handles subscription creation, checkout sessions, and webhook processing.
"""
import stripe
import os
from dotenv import load_dotenv
from datetime import datetime, timezone, timedelta
# Load environment variables
load_dotenv()
# Initialize Stripe with secret key
stripe.api_key = os.getenv("STRIPE_SECRET_KEY")
# Stripe webhook secret for signature verification
STRIPE_WEBHOOK_SECRET = os.getenv("STRIPE_WEBHOOK_SECRET")
def create_checkout_session(
user_id: str,
user_email: str,
plan_id: str,
stripe_price_id: str,
success_url: str,
cancel_url: str
):
"""
Create a Stripe Checkout session for subscription payment.
Args:
user_id: User's UUID
user_email: User's email address
plan_id: SubscriptionPlan UUID
stripe_price_id: Stripe Price ID for the plan
success_url: URL to redirect after successful payment
cancel_url: URL to redirect if user cancels
Returns:
dict: Checkout session object with session ID and URL
"""
try:
# Create Checkout Session
checkout_session = stripe.checkout.Session.create(
customer_email=user_email,
payment_method_types=["card"],
line_items=[
{
"price": stripe_price_id,
"quantity": 1,
}
],
mode="subscription",
success_url=success_url,
cancel_url=cancel_url,
metadata={
"user_id": str(user_id),
"plan_id": str(plan_id),
},
subscription_data={
"metadata": {
"user_id": str(user_id),
"plan_id": str(plan_id),
}
}
)
return {
"session_id": checkout_session.id,
"url": checkout_session.url
}
except stripe.error.StripeError as e:
raise Exception(f"Stripe error: {str(e)}")
def verify_webhook_signature(payload: bytes, sig_header: str) -> dict:
"""
Verify Stripe webhook signature and construct event.
Args:
payload: Raw webhook payload bytes
sig_header: Stripe signature header
Returns:
dict: Verified webhook event
Raises:
ValueError: If signature verification fails
"""
try:
event = stripe.Webhook.construct_event(
payload, sig_header, STRIPE_WEBHOOK_SECRET
)
return event
except ValueError as e:
raise ValueError(f"Invalid payload: {str(e)}")
except stripe.error.SignatureVerificationError as e:
raise ValueError(f"Invalid signature: {str(e)}")
def get_subscription_end_date(billing_cycle: str = "yearly") -> datetime:
"""
Calculate subscription end date based on billing cycle.
Args:
billing_cycle: "yearly" or "monthly"
Returns:
datetime: End date for the subscription
"""
now = datetime.now(timezone.utc)
if billing_cycle == "yearly":
# Add 1 year
return now + timedelta(days=365)
elif billing_cycle == "monthly":
# Add 1 month (approximation)
return now + timedelta(days=30)
else:
# Default to yearly
return now + timedelta(days=365)
def create_stripe_price(
product_name: str,
price_cents: int,
billing_cycle: str = "yearly"
) -> str:
"""
Create a Stripe Price object for a subscription plan.
Args:
product_name: Name of the product/plan
price_cents: Price in cents
billing_cycle: "yearly" or "monthly"
Returns:
str: Stripe Price ID
"""
try:
# Create a product first
product = stripe.Product.create(name=product_name)
# Determine recurring interval
interval = "year" if billing_cycle == "yearly" else "month"
# Create price
price = stripe.Price.create(
product=product.id,
unit_amount=price_cents,
currency="usd",
recurring={"interval": interval},
)
return price.id
except stripe.error.StripeError as e:
raise Exception(f"Stripe error creating price: {str(e)}")
def get_customer_portal_url(stripe_customer_id: str, return_url: str) -> str:
"""
Create a Stripe Customer Portal session for subscription management.
Args:
stripe_customer_id: Stripe Customer ID
return_url: URL to return to after portal session
Returns:
str: Customer portal URL
"""
try:
session = stripe.billing_portal.Session.create(
customer=stripe_customer_id,
return_url=return_url,
)
return session.url
except stripe.error.StripeError as e:
raise Exception(f"Stripe error creating portal session: {str(e)}")

74
requirements.txt Normal file
View File

@@ -0,0 +1,74 @@
aiosmtplib==5.0.0
annotated-types==0.7.0
anyio==4.11.0
bcrypt==4.1.3
black==25.11.0
boto3==1.41.3
botocore==1.41.3
certifi==2025.11.12
cffi==2.0.0
charset-normalizer==3.4.4
click==8.3.1
cryptography==46.0.3
dnspython==2.8.0
ecdsa==0.19.1
email-validator==2.3.0
fastapi==0.110.1
flake8==7.3.0
greenlet==3.2.4
h11==0.16.0
idna==3.11
iniconfig==2.3.0
isort==7.0.0
jmespath==1.0.1
jq==1.10.0
markdown-it-py==4.0.0
mccabe==0.7.0
mdurl==0.1.2
motor==3.3.1
mypy==1.18.2
mypy_extensions==1.1.0
numpy==2.3.5
oauthlib==3.3.1
packaging==25.0
pandas==2.3.3
passlib==1.7.4
pathspec==0.12.1
platformdirs==4.5.0
pluggy==1.6.0
psycopg2-binary==2.9.11
pyasn1==0.6.1
pycodestyle==2.14.0
pycparser==2.23
pydantic==2.12.4
pydantic_core==2.41.5
pyflakes==3.4.0
Pygments==2.19.2
PyJWT==2.10.1
pymongo==4.5.0
pytest==9.0.1
python-dateutil==2.9.0.post0
python-dotenv==1.2.1
python-jose==3.5.0
python-multipart==0.0.20
pytokens==0.3.0
pytz==2025.2
requests==2.32.5
requests-oauthlib==2.0.0
rich==14.2.0
rsa==4.9.1
s3transfer==0.15.0
s5cmd==0.2.0
shellingham==1.5.4
six==1.17.0
sniffio==1.3.1
stripe==11.2.0
SQLAlchemy==2.0.44
starlette==0.37.2
typer==0.20.0
typing-inspection==0.4.2
typing_extensions==4.15.0
tzdata==2025.2
urllib3==2.5.0
uvicorn==0.25.0
watchfiles==1.1.1

81
seed_plans.py Normal file
View File

@@ -0,0 +1,81 @@
"""
Seed subscription plans into the database.
Creates a default annual membership plan for testing.
"""
import os
from database import SessionLocal
from models import SubscriptionPlan
from payment_service import create_stripe_price
def seed_plans():
"""Create default subscription plans"""
db = SessionLocal()
try:
# Check if plans already exist
existing_plans = db.query(SubscriptionPlan).count()
if existing_plans > 0:
print(f"⚠️ Found {existing_plans} existing plan(s). Skipping seed.")
return
print("Creating subscription plans...")
# Option 1: Create plan WITHOUT Stripe Price ID (for testing without Stripe)
# Uncomment this if you want to test UI without actual Stripe integration
annual_plan = SubscriptionPlan(
name="Annual Membership",
description="Full access to all LOAF community benefits for one year",
price_cents=10000, # $100.00
billing_cycle="yearly",
stripe_price_id=None, # Set to None for local testing
active=True
)
# Option 2: Create plan WITH Stripe Price ID (requires valid Stripe API key)
# Uncomment this block and comment out Option 1 if you have Stripe configured
"""
try:
stripe_price_id = create_stripe_price(
product_name="LOAF Annual Membership",
price_cents=10000, # $100.00
billing_cycle="yearly"
)
print(f"✅ Created Stripe Price: {stripe_price_id}")
annual_plan = SubscriptionPlan(
name="Annual Membership",
description="Full access to all LOAF community benefits for one year",
price_cents=10000,
billing_cycle="yearly",
stripe_price_id=stripe_price_id,
active=True
)
except Exception as e:
print(f"❌ Failed to create Stripe Price: {e}")
print("Creating plan without Stripe Price ID for local testing...")
annual_plan = SubscriptionPlan(
name="Annual Membership",
description="Full access to all LOAF community benefits for one year",
price_cents=10000,
billing_cycle="yearly",
stripe_price_id=None,
active=True
)
"""
db.add(annual_plan)
db.commit()
db.refresh(annual_plan)
print(f"✅ Created plan: {annual_plan.name} (${annual_plan.price_cents/100:.2f}/{annual_plan.billing_cycle})")
print(f" Plan ID: {annual_plan.id}")
except Exception as e:
print(f"❌ Error seeding plans: {e}")
db.rollback()
finally:
db.close()
if __name__ == "__main__":
seed_plans()

1191
server.py Normal file

File diff suppressed because it is too large Load Diff

65
test_plans_endpoint.py Normal file
View File

@@ -0,0 +1,65 @@
"""
Test script to diagnose the subscription plans endpoint issue.
"""
from database import SessionLocal
from models import SubscriptionPlan, Subscription, SubscriptionStatus
def test_plans_query():
"""Test the same query that the endpoint uses"""
db = SessionLocal()
try:
print("Testing subscription plans query...\n")
# Step 1: Get all plans
plans = db.query(SubscriptionPlan).order_by(SubscriptionPlan.created_at.desc()).all()
print(f"✅ Found {len(plans)} plan(s)")
# Step 2: For each plan, get subscriber count (same as endpoint)
result = []
for plan in plans:
print(f"\nProcessing plan: {plan.name}")
try:
subscriber_count = db.query(Subscription).filter(
Subscription.plan_id == plan.id,
Subscription.status == SubscriptionStatus.active
).count()
print(f" ✓ Subscriber count: {subscriber_count}")
except Exception as e:
print(f" ❌ Error counting subscribers: {e}")
raise
result.append({
"id": str(plan.id),
"name": plan.name,
"description": plan.description,
"price_cents": plan.price_cents,
"billing_cycle": plan.billing_cycle,
"stripe_price_id": plan.stripe_price_id,
"active": plan.active,
"subscriber_count": subscriber_count,
"created_at": plan.created_at,
"updated_at": plan.updated_at
})
print("\n" + "="*50)
print("✅ Query completed successfully!")
print("="*50)
for plan in result:
print(f"\n{plan['name']}")
print(f" Price: ${plan['price_cents']/100:.2f}")
print(f" Cycle: {plan['billing_cycle']}")
print(f" Active: {plan['active']}")
print(f" Subscribers: {plan['subscriber_count']}")
except Exception as e:
print(f"\n❌ ERROR: {e}")
import traceback
traceback.print_exc()
finally:
db.close()
if __name__ == "__main__":
test_plans_query()