## Summary - Add `is_admin` column to users table with Alembic migration and a `require_admin` FastAPI dependency that protects all admin-facing write endpoints (games, pokemon, evolutions, bosses, routes CRUD) - Expose admin status to frontend via user API and update AuthContext to fetch/store `isAdmin` after login - Make navigation menu auth-aware (different links for logged-out, logged-in, and admin users) and protect frontend routes with `ProtectedRoute` and `AdminRoute` components, preserving deep-linking through redirects - Fix test reliability: `drop_all` before `create_all` to clear stale PostgreSQL enums from interrupted test runs - Fix test auth: add `admin_client` fixture and use valid UUID for mock user so tests pass with new admin-protected endpoints ## Test plan - [x] All 252 backend tests pass - [ ] Verify non-admin users cannot access admin write endpoints (games, pokemon, evolutions, bosses CRUD) - [ ] Verify admin users can access admin endpoints normally - [ ] Verify navigation shows correct links for logged-out, logged-in, and admin states - [ ] Verify `/admin/*` routes redirect non-admin users with a toast - [ ] Verify `/runs/new` and `/genlockes/new` redirect unauthenticated users to login, then back after auth 🤖 Generated with [Claude Code](https://claude.com/claude-code) Reviewed-on: #67 Co-authored-by: Julian Tabel <juliantabel.jt@gmail.com> Co-committed-by: Julian Tabel <juliantabel.jt@gmail.com>
108 lines
2.8 KiB
Python
108 lines
2.8 KiB
Python
from dataclasses import dataclass
|
|
from uuid import UUID
|
|
|
|
import jwt
|
|
from fastapi import Depends, HTTPException, Request, status
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.config import settings
|
|
from app.core.database import get_session
|
|
from app.models.user import User
|
|
|
|
|
|
@dataclass
|
|
class AuthUser:
|
|
"""Authenticated user info extracted from JWT."""
|
|
|
|
id: str # Supabase user UUID
|
|
email: str | None = None
|
|
role: str | None = None
|
|
|
|
|
|
def _extract_token(request: Request) -> str | None:
|
|
"""Extract Bearer token from Authorization header."""
|
|
auth_header = request.headers.get("Authorization")
|
|
if not auth_header:
|
|
return None
|
|
parts = auth_header.split()
|
|
if len(parts) != 2 or parts[0].lower() != "bearer":
|
|
return None
|
|
return parts[1]
|
|
|
|
|
|
def _verify_jwt(token: str) -> dict | None:
|
|
"""Verify JWT against Supabase JWT secret. Returns payload or None."""
|
|
if not settings.supabase_jwt_secret:
|
|
return None
|
|
try:
|
|
payload = jwt.decode(
|
|
token,
|
|
settings.supabase_jwt_secret,
|
|
algorithms=["HS256"],
|
|
audience="authenticated",
|
|
)
|
|
return payload
|
|
except jwt.ExpiredSignatureError:
|
|
return None
|
|
except jwt.InvalidTokenError:
|
|
return None
|
|
|
|
|
|
def get_current_user(request: Request) -> AuthUser | None:
|
|
"""
|
|
Extract and verify the current user from the request.
|
|
Returns AuthUser if valid token, None otherwise.
|
|
"""
|
|
token = _extract_token(request)
|
|
if not token:
|
|
return None
|
|
|
|
payload = _verify_jwt(token)
|
|
if not payload:
|
|
return None
|
|
|
|
# Supabase JWT has 'sub' as user ID
|
|
user_id = payload.get("sub")
|
|
if not user_id:
|
|
return None
|
|
|
|
return AuthUser(
|
|
id=user_id,
|
|
email=payload.get("email"),
|
|
role=payload.get("role"),
|
|
)
|
|
|
|
|
|
def require_auth(user: AuthUser | None = Depends(get_current_user)) -> AuthUser:
|
|
"""
|
|
Dependency that requires authentication.
|
|
Raises 401 if no valid token is present.
|
|
"""
|
|
if user is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Authentication required",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
return user
|
|
|
|
|
|
async def require_admin(
|
|
user: AuthUser = Depends(require_auth),
|
|
session: AsyncSession = Depends(get_session),
|
|
) -> AuthUser:
|
|
"""
|
|
Dependency that requires admin privileges.
|
|
Raises 401 if not authenticated, 403 if not an admin.
|
|
"""
|
|
result = await session.execute(select(User).where(User.id == UUID(user.id)))
|
|
db_user = result.scalar_one_or_none()
|
|
|
|
if db_user is None or not db_user.is_admin:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Admin access required",
|
|
)
|
|
return user
|