feat: migrate JWT verification from HS256 shared secret to JWKS
All checks were successful
CI / backend-tests (pull_request) Successful in 28s
CI / frontend-tests (pull_request) Successful in 28s

Replace symmetric HS256 JWT verification with asymmetric RS256 using JWKS.
Backend now fetches and caches public keys from Supabase's JWKS endpoint
instead of using a shared secret.

- Add cryptography dependency for RS256 support
- Use PyJWKClient to fetch/cache JWKS from {SUPABASE_URL}/.well-known/jwks.json
- Remove SUPABASE_JWT_SECRET from config, docker-compose, deploy workflow, .env
- Update tests to use RS256 tokens with mocked JWKS client

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-21 14:01:31 +01:00
parent a12958ae32
commit 177c02006a
11 changed files with 233 additions and 123 deletions

View File

@@ -1,25 +1,29 @@
import time
from unittest.mock import MagicMock, patch
from uuid import UUID
import jwt
import pytest
from cryptography.hazmat.primitives.asymmetric import rsa
from httpx import ASGITransport, AsyncClient
from app.core.auth import AuthUser, get_current_user, require_admin, require_auth
from app.core.config import settings
from app.main import app
from app.models.user import User
@pytest.fixture
def jwt_secret():
"""Provide a test JWT secret."""
return "test-jwt-secret-for-testing-only"
@pytest.fixture(scope="module")
def rsa_key_pair():
"""Generate RSA key pair for testing."""
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()
return private_key, public_key
@pytest.fixture
def valid_token(jwt_secret):
"""Generate a valid JWT token."""
def valid_token(rsa_key_pair):
"""Generate a valid RS256 JWT token."""
private_key, _ = rsa_key_pair
payload = {
"sub": "user-123",
"email": "test@example.com",
@@ -27,12 +31,13 @@ def valid_token(jwt_secret):
"aud": "authenticated",
"exp": int(time.time()) + 3600,
}
return jwt.encode(payload, jwt_secret, algorithm="HS256")
return jwt.encode(payload, private_key, algorithm="RS256")
@pytest.fixture
def expired_token(jwt_secret):
"""Generate an expired JWT token."""
def expired_token(rsa_key_pair):
"""Generate an expired RS256 JWT token."""
private_key, _ = rsa_key_pair
payload = {
"sub": "user-123",
"email": "test@example.com",
@@ -40,12 +45,13 @@ def expired_token(jwt_secret):
"aud": "authenticated",
"exp": int(time.time()) - 3600, # Expired 1 hour ago
}
return jwt.encode(payload, jwt_secret, algorithm="HS256")
return jwt.encode(payload, private_key, algorithm="RS256")
@pytest.fixture
def invalid_token():
"""Generate a token signed with wrong secret."""
"""Generate a token signed with wrong key."""
wrong_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
payload = {
"sub": "user-123",
"email": "test@example.com",
@@ -53,81 +59,76 @@ def invalid_token():
"aud": "authenticated",
"exp": int(time.time()) + 3600,
}
return jwt.encode(payload, "wrong-secret", algorithm="HS256")
return jwt.encode(payload, wrong_key, algorithm="RS256")
@pytest.fixture
def auth_client(db_session, jwt_secret, valid_token, monkeypatch):
"""Client with valid auth token and configured JWT secret."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
async def _get_client():
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
headers={"Authorization": f"Bearer {valid_token}"},
) as ac:
yield ac
return _get_client
def mock_jwks_client(rsa_key_pair):
"""Create a mock JWKS client that returns our test public key."""
_, public_key = rsa_key_pair
mock_client = MagicMock()
mock_signing_key = MagicMock()
mock_signing_key.key = public_key
mock_client.get_signing_key_from_jwt.return_value = mock_signing_key
return mock_client
async def test_get_current_user_valid_token(jwt_secret, valid_token, monkeypatch):
async def test_get_current_user_valid_token(valid_token, mock_jwks_client):
"""Test get_current_user returns user for valid token."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
with patch("app.core.auth._get_jwks_client", return_value=mock_jwks_client):
class MockRequest:
headers = {"Authorization": f"Bearer {valid_token}"}
class MockRequest:
headers = {"Authorization": f"Bearer {valid_token}"}
user = get_current_user(MockRequest())
assert user is not None
assert user.id == "user-123"
assert user.email == "test@example.com"
assert user.role == "authenticated"
user = get_current_user(MockRequest())
assert user is not None
assert user.id == "user-123"
assert user.email == "test@example.com"
assert user.role == "authenticated"
async def test_get_current_user_no_token(jwt_secret, monkeypatch):
async def test_get_current_user_no_token(mock_jwks_client):
"""Test get_current_user returns None when no token."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
with patch("app.core.auth._get_jwks_client", return_value=mock_jwks_client):
class MockRequest:
headers = {}
class MockRequest:
headers = {}
user = get_current_user(MockRequest())
assert user is None
user = get_current_user(MockRequest())
assert user is None
async def test_get_current_user_expired_token(jwt_secret, expired_token, monkeypatch):
async def test_get_current_user_expired_token(expired_token, mock_jwks_client):
"""Test get_current_user returns None for expired token."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
with patch("app.core.auth._get_jwks_client", return_value=mock_jwks_client):
class MockRequest:
headers = {"Authorization": f"Bearer {expired_token}"}
class MockRequest:
headers = {"Authorization": f"Bearer {expired_token}"}
user = get_current_user(MockRequest())
assert user is None
user = get_current_user(MockRequest())
assert user is None
async def test_get_current_user_invalid_token(jwt_secret, invalid_token, monkeypatch):
async def test_get_current_user_invalid_token(invalid_token, mock_jwks_client):
"""Test get_current_user returns None for invalid token."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
with patch("app.core.auth._get_jwks_client", return_value=mock_jwks_client):
class MockRequest:
headers = {"Authorization": f"Bearer {invalid_token}"}
class MockRequest:
headers = {"Authorization": f"Bearer {invalid_token}"}
user = get_current_user(MockRequest())
assert user is None
user = get_current_user(MockRequest())
assert user is None
async def test_get_current_user_malformed_header(jwt_secret, monkeypatch):
async def test_get_current_user_malformed_header(mock_jwks_client):
"""Test get_current_user returns None for malformed auth header."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
with patch("app.core.auth._get_jwks_client", return_value=mock_jwks_client):
class MockRequest:
headers = {"Authorization": "NotBearer token"}
class MockRequest:
headers = {"Authorization": "NotBearer token"}
user = get_current_user(MockRequest())
assert user is None
user = get_current_user(MockRequest())
assert user is None
async def test_require_auth_valid_user():
@@ -158,17 +159,16 @@ async def test_protected_endpoint_without_token(db_session):
async def test_protected_endpoint_with_expired_token(
db_session, jwt_secret, expired_token, monkeypatch
db_session, expired_token, mock_jwks_client
):
"""Test that write endpoint returns 401 with expired token."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
headers={"Authorization": f"Bearer {expired_token}"},
) as ac:
response = await ac.post("/runs", json={"game_id": 1, "name": "Test Run"})
with patch("app.core.auth._get_jwks_client", return_value=mock_jwks_client):
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
headers={"Authorization": f"Bearer {expired_token}"},
) as ac:
response = await ac.post("/runs", json={"game_id": 1, "name": "Test Run"})
assert response.status_code == 401
@@ -231,7 +231,7 @@ async def test_require_admin_user_not_in_db(db_session):
async def test_admin_endpoint_returns_403_for_non_admin(
db_session, jwt_secret, monkeypatch
db_session, rsa_key_pair, mock_jwks_client
):
"""Test that admin endpoint returns 403 for authenticated non-admin user."""
user_id = "44444444-4444-4444-4444-444444444444"
@@ -243,7 +243,7 @@ async def test_admin_endpoint_returns_403_for_non_admin(
db_session.add(regular_user)
await db_session.commit()
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
private_key, _ = rsa_key_pair
token = jwt.encode(
{
"sub": user_id,
@@ -252,30 +252,33 @@ async def test_admin_endpoint_returns_403_for_non_admin(
"aud": "authenticated",
"exp": int(time.time()) + 3600,
},
jwt_secret,
algorithm="HS256",
private_key,
algorithm="RS256",
)
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
headers={"Authorization": f"Bearer {token}"},
) as ac:
response = await ac.post(
"/games",
json={
"name": "Test Game",
"slug": "test-game",
"generation": 1,
"region": "Kanto",
"category": "core",
},
)
with patch("app.core.auth._get_jwks_client", return_value=mock_jwks_client):
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
headers={"Authorization": f"Bearer {token}"},
) as ac:
response = await ac.post(
"/games",
json={
"name": "Test Game",
"slug": "test-game",
"generation": 1,
"region": "Kanto",
"category": "core",
},
)
assert response.status_code == 403
assert response.json()["detail"] == "Admin access required"
async def test_admin_endpoint_succeeds_for_admin(db_session, jwt_secret, monkeypatch):
async def test_admin_endpoint_succeeds_for_admin(
db_session, rsa_key_pair, mock_jwks_client
):
"""Test that admin endpoint succeeds for authenticated admin user."""
user_id = "55555555-5555-5555-5555-555555555555"
admin_user = User(
@@ -286,7 +289,7 @@ async def test_admin_endpoint_succeeds_for_admin(db_session, jwt_secret, monkeyp
db_session.add(admin_user)
await db_session.commit()
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
private_key, _ = rsa_key_pair
token = jwt.encode(
{
"sub": user_id,
@@ -295,24 +298,25 @@ async def test_admin_endpoint_succeeds_for_admin(db_session, jwt_secret, monkeyp
"aud": "authenticated",
"exp": int(time.time()) + 3600,
},
jwt_secret,
algorithm="HS256",
private_key,
algorithm="RS256",
)
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
headers={"Authorization": f"Bearer {token}"},
) as ac:
response = await ac.post(
"/games",
json={
"name": "Test Game",
"slug": "test-game",
"generation": 1,
"region": "Kanto",
"category": "core",
},
)
with patch("app.core.auth._get_jwks_client", return_value=mock_jwks_client):
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
headers={"Authorization": f"Bearer {token}"},
) as ac:
response = await ac.post(
"/games",
json={
"name": "Test Game",
"slug": "test-game",
"generation": 1,
"region": "Kanto",
"category": "core",
},
)
assert response.status_code == 201
assert response.json()["name"] == "Test Game"