feat: migrate JWT verification from HS256 shared secret to JWKS
Replace symmetric HS256 JWT verification with asymmetric RS256 using JWKS.
Backend now fetches and caches public keys from Supabase's JWKS endpoint
instead of using a shared secret.
- Add cryptography dependency for RS256 support
- Use PyJWKClient to fetch/cache JWKS from {SUPABASE_URL}/.well-known/jwks.json
- Remove SUPABASE_JWT_SECRET from config, docker-compose, deploy workflow, .env
- Update tests to use RS256 tokens with mocked JWKS client
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -3,6 +3,7 @@ from uuid import UUID
|
||||
|
||||
import jwt
|
||||
from fastapi import Depends, HTTPException, Request, status
|
||||
from jwt import PyJWKClient, PyJWKClientError
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
@@ -11,6 +12,8 @@ from app.core.database import get_session
|
||||
from app.models.nuzlocke_run import NuzlockeRun
|
||||
from app.models.user import User
|
||||
|
||||
_jwks_client: PyJWKClient | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class AuthUser:
|
||||
@@ -21,6 +24,15 @@ class AuthUser:
|
||||
role: str | None = None
|
||||
|
||||
|
||||
def _get_jwks_client() -> PyJWKClient | None:
|
||||
"""Get or create a cached JWKS client."""
|
||||
global _jwks_client
|
||||
if _jwks_client is None and settings.supabase_url:
|
||||
jwks_url = f"{settings.supabase_url.rstrip('/')}/.well-known/jwks.json"
|
||||
_jwks_client = PyJWKClient(jwks_url, cache_jwk_set=True, lifespan=300)
|
||||
return _jwks_client
|
||||
|
||||
|
||||
def _extract_token(request: Request) -> str | None:
|
||||
"""Extract Bearer token from Authorization header."""
|
||||
auth_header = request.headers.get("Authorization")
|
||||
@@ -33,14 +45,16 @@ def _extract_token(request: Request) -> str | None:
|
||||
|
||||
|
||||
def _verify_jwt(token: str) -> dict | None:
|
||||
"""Verify JWT against Supabase JWT secret. Returns payload or None."""
|
||||
if not settings.supabase_jwt_secret:
|
||||
"""Verify JWT using JWKS public key. Returns payload or None."""
|
||||
client = _get_jwks_client()
|
||||
if not client:
|
||||
return None
|
||||
try:
|
||||
signing_key = client.get_signing_key_from_jwt(token)
|
||||
payload = jwt.decode(
|
||||
token,
|
||||
settings.supabase_jwt_secret,
|
||||
algorithms=["HS256"],
|
||||
signing_key.key,
|
||||
algorithms=["RS256"],
|
||||
audience="authenticated",
|
||||
)
|
||||
return payload
|
||||
@@ -48,6 +62,8 @@ def _verify_jwt(token: str) -> dict | None:
|
||||
return None
|
||||
except jwt.InvalidTokenError:
|
||||
return None
|
||||
except PyJWKClientError:
|
||||
return None
|
||||
|
||||
|
||||
def get_current_user(request: Request) -> AuthUser | None:
|
||||
|
||||
@@ -20,7 +20,6 @@ class Settings(BaseSettings):
|
||||
# Supabase Auth
|
||||
supabase_url: str | None = None
|
||||
supabase_anon_key: str | None = None
|
||||
supabase_jwt_secret: str | None = None
|
||||
|
||||
|
||||
settings = Settings()
|
||||
|
||||
Reference in New Issue
Block a user