from dataclasses import dataclass
from uuid import UUID

import jwt
from fastapi import Depends, HTTPException, Request, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import settings
from app.core.database import get_session
from app.models.user import User


@dataclass
class AuthUser:
    """Authenticated user info extracted from JWT."""

    id: str  # Supabase user UUID
    email: str | None = None
    role: str | None = None


def _extract_token(request: Request) -> str | None:
    """Extract the Bearer token from the Authorization header.

    Returns the token string, or None when the header is missing or is not
    exactly two whitespace-separated parts whose first part is ``bearer``
    (compared case-insensitively).
    """
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        return None

    parts = auth_header.split()
    if len(parts) != 2 or parts[0].lower() != "bearer":
        return None
    return parts[1]


def _verify_jwt(token: str) -> dict | None:
    """Verify a JWT against the configured Supabase JWT secret.

    The token is decoded as HS256 with the audience ``authenticated``.

    Returns the decoded payload, or None when no secret is configured or
    PyJWT rejects the token with an ``InvalidTokenError``.
    """
    if not settings.supabase_jwt_secret:
        return None

    try:
        return jwt.decode(
            token,
            settings.supabase_jwt_secret,
            algorithms=["HS256"],
            audience="authenticated",
        )
    except jwt.InvalidTokenError:
        # ExpiredSignatureError is a subclass of InvalidTokenError, so this
        # single clause also covers expired tokens.
        return None


def get_current_user(request: Request) -> AuthUser | None:
    """Extract and verify the current user from the request.

    Returns an AuthUser when the request carries a valid token with a
    non-empty string ``sub`` claim, None otherwise.
    """
    token = _extract_token(request)
    if not token:
        return None

    payload = _verify_jwt(token)
    if not payload:
        return None

    # Supabase JWT has 'sub' as user ID
    user_id = payload.get("sub")
    # AuthUser.id is declared as str; a missing, empty, or non-string subject
    # is treated the same as an unauthenticated request.
    if not user_id or not isinstance(user_id, str):
        return None

    return AuthUser(
        id=user_id,
        email=payload.get("email"),
        role=payload.get("role"),
    )


def require_auth(user: AuthUser | None = Depends(get_current_user)) -> AuthUser:
    """Dependency that requires authentication.

    Raises:
        HTTPException: 401 if no valid token is present.
    """
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication required",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


async def require_admin(
    user: AuthUser = Depends(require_auth),
    session: AsyncSession = Depends(get_session),
) -> AuthUser:
    """Dependency that requires admin privileges.

    Looks up the User row whose id equals the authenticated user's id and
    checks its ``is_admin`` flag.

    Raises:
        HTTPException: 401 (via ``require_auth``) if not authenticated;
            403 if the user id is not a valid UUID, no matching row exists,
            or the row is not flagged as admin.
    """
    forbidden = HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin access required",
    )

    # A validly signed token may still carry a subject that is not a UUID.
    # Such a user cannot match any row, so answer 403 instead of letting the
    # conversion error escape as a 500.
    try:
        user_uuid = UUID(user.id)
    except (AttributeError, TypeError, ValueError):
        raise forbidden from None

    result = await session.execute(select(User).where(User.id == user_uuid))
    db_user = result.scalar_one_or_none()
    if db_user is None or not db_user.is_admin:
        raise forbidden
    return user