## Summary - Add `is_admin` column to users table with Alembic migration and a `require_admin` FastAPI dependency that protects all admin-facing write endpoints (games, pokemon, evolutions, bosses, routes CRUD) - Expose admin status to frontend via user API and update AuthContext to fetch/store `isAdmin` after login - Make navigation menu auth-aware (different links for logged-out, logged-in, and admin users) and protect frontend routes with `ProtectedRoute` and `AdminRoute` components, preserving deep-linking through redirects - Fix test reliability: `drop_all` before `create_all` to clear stale PostgreSQL enums from interrupted test runs - Fix test auth: add `admin_client` fixture and use valid UUID for mock user so tests pass with new admin-protected endpoints ## Test plan - [x] All 252 backend tests pass - [ ] Verify non-admin users cannot access admin write endpoints (games, pokemon, evolutions, bosses CRUD) - [ ] Verify admin users can access admin endpoints normally - [ ] Verify navigation shows correct links for logged-out, logged-in, and admin states - [ ] Verify `/admin/*` routes redirect non-admin users with a toast - [ ] Verify `/runs/new` and `/genlockes/new` redirect unauthenticated users to login, then back after auth 🤖 Generated with [Claude Code](https://claude.com/claude-code) Reviewed-on: #67 Co-authored-by: Julian Tabel <juliantabel.jt@gmail.com> Co-committed-by: Julian Tabel <juliantabel.jt@gmail.com>
319 lines
9.7 KiB
Python
319 lines
9.7 KiB
Python
import time
|
|
from uuid import UUID
|
|
|
|
import jwt
|
|
import pytest
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
from app.core.auth import AuthUser, get_current_user, require_admin, require_auth
|
|
from app.core.config import settings
|
|
from app.main import app
|
|
from app.models.user import User
|
|
|
|
|
|
@pytest.fixture
|
|
def jwt_secret():
|
|
"""Provide a test JWT secret."""
|
|
return "test-jwt-secret-for-testing-only"
|
|
|
|
|
|
@pytest.fixture
|
|
def valid_token(jwt_secret):
|
|
"""Generate a valid JWT token."""
|
|
payload = {
|
|
"sub": "user-123",
|
|
"email": "test@example.com",
|
|
"role": "authenticated",
|
|
"aud": "authenticated",
|
|
"exp": int(time.time()) + 3600,
|
|
}
|
|
return jwt.encode(payload, jwt_secret, algorithm="HS256")
|
|
|
|
|
|
@pytest.fixture
|
|
def expired_token(jwt_secret):
|
|
"""Generate an expired JWT token."""
|
|
payload = {
|
|
"sub": "user-123",
|
|
"email": "test@example.com",
|
|
"role": "authenticated",
|
|
"aud": "authenticated",
|
|
"exp": int(time.time()) - 3600, # Expired 1 hour ago
|
|
}
|
|
return jwt.encode(payload, jwt_secret, algorithm="HS256")
|
|
|
|
|
|
@pytest.fixture
|
|
def invalid_token():
|
|
"""Generate a token signed with wrong secret."""
|
|
payload = {
|
|
"sub": "user-123",
|
|
"email": "test@example.com",
|
|
"role": "authenticated",
|
|
"aud": "authenticated",
|
|
"exp": int(time.time()) + 3600,
|
|
}
|
|
return jwt.encode(payload, "wrong-secret", algorithm="HS256")
|
|
|
|
|
|
@pytest.fixture
|
|
def auth_client(db_session, jwt_secret, valid_token, monkeypatch):
|
|
"""Client with valid auth token and configured JWT secret."""
|
|
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
|
|
|
|
async def _get_client():
|
|
async with AsyncClient(
|
|
transport=ASGITransport(app=app),
|
|
base_url="http://test",
|
|
headers={"Authorization": f"Bearer {valid_token}"},
|
|
) as ac:
|
|
yield ac
|
|
|
|
return _get_client
|
|
|
|
|
|
async def test_get_current_user_valid_token(jwt_secret, valid_token, monkeypatch):
|
|
"""Test get_current_user returns user for valid token."""
|
|
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
|
|
|
|
class MockRequest:
|
|
headers = {"Authorization": f"Bearer {valid_token}"}
|
|
|
|
user = get_current_user(MockRequest())
|
|
assert user is not None
|
|
assert user.id == "user-123"
|
|
assert user.email == "test@example.com"
|
|
assert user.role == "authenticated"
|
|
|
|
|
|
async def test_get_current_user_no_token(jwt_secret, monkeypatch):
|
|
"""Test get_current_user returns None when no token."""
|
|
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
|
|
|
|
class MockRequest:
|
|
headers = {}
|
|
|
|
user = get_current_user(MockRequest())
|
|
assert user is None
|
|
|
|
|
|
async def test_get_current_user_expired_token(jwt_secret, expired_token, monkeypatch):
|
|
"""Test get_current_user returns None for expired token."""
|
|
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
|
|
|
|
class MockRequest:
|
|
headers = {"Authorization": f"Bearer {expired_token}"}
|
|
|
|
user = get_current_user(MockRequest())
|
|
assert user is None
|
|
|
|
|
|
async def test_get_current_user_invalid_token(jwt_secret, invalid_token, monkeypatch):
|
|
"""Test get_current_user returns None for invalid token."""
|
|
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
|
|
|
|
class MockRequest:
|
|
headers = {"Authorization": f"Bearer {invalid_token}"}
|
|
|
|
user = get_current_user(MockRequest())
|
|
assert user is None
|
|
|
|
|
|
async def test_get_current_user_malformed_header(jwt_secret, monkeypatch):
|
|
"""Test get_current_user returns None for malformed auth header."""
|
|
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
|
|
|
|
class MockRequest:
|
|
headers = {"Authorization": "NotBearer token"}
|
|
|
|
user = get_current_user(MockRequest())
|
|
assert user is None
|
|
|
|
|
|
async def test_require_auth_valid_user():
|
|
"""Test require_auth passes through valid user."""
|
|
user = AuthUser(id="user-123", email="test@example.com")
|
|
result = require_auth(user)
|
|
assert result is user
|
|
|
|
|
|
async def test_require_auth_no_user():
|
|
"""Test require_auth raises 401 for no user."""
|
|
from fastapi import HTTPException
|
|
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
require_auth(None)
|
|
assert exc_info.value.status_code == 401
|
|
assert exc_info.value.detail == "Authentication required"
|
|
|
|
|
|
async def test_protected_endpoint_without_token(db_session):
|
|
"""Test that write endpoint returns 401 without token."""
|
|
async with AsyncClient(
|
|
transport=ASGITransport(app=app), base_url="http://test"
|
|
) as ac:
|
|
response = await ac.post("/runs", json={"game_id": 1, "name": "Test Run"})
|
|
assert response.status_code == 401
|
|
assert response.json()["detail"] == "Authentication required"
|
|
|
|
|
|
async def test_protected_endpoint_with_expired_token(
|
|
db_session, jwt_secret, expired_token, monkeypatch
|
|
):
|
|
"""Test that write endpoint returns 401 with expired token."""
|
|
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
|
|
|
|
async with AsyncClient(
|
|
transport=ASGITransport(app=app),
|
|
base_url="http://test",
|
|
headers={"Authorization": f"Bearer {expired_token}"},
|
|
) as ac:
|
|
response = await ac.post("/runs", json={"game_id": 1, "name": "Test Run"})
|
|
assert response.status_code == 401
|
|
|
|
|
|
async def test_read_endpoint_without_token(db_session):
|
|
"""Test that read endpoints work without authentication."""
|
|
async with AsyncClient(
|
|
transport=ASGITransport(app=app), base_url="http://test"
|
|
) as ac:
|
|
response = await ac.get("/runs")
|
|
assert response.status_code == 200
|
|
|
|
|
|
async def test_require_admin_valid_admin_user(db_session):
|
|
"""Test require_admin passes through for admin user."""
|
|
user_id = "11111111-1111-1111-1111-111111111111"
|
|
admin_user = User(
|
|
id=UUID(user_id),
|
|
email="admin@example.com",
|
|
is_admin=True,
|
|
)
|
|
db_session.add(admin_user)
|
|
await db_session.commit()
|
|
|
|
auth_user = AuthUser(id=user_id, email="admin@example.com")
|
|
result = await require_admin(user=auth_user, session=db_session)
|
|
assert result is auth_user
|
|
|
|
|
|
async def test_require_admin_non_admin_user(db_session):
|
|
"""Test require_admin raises 403 for non-admin user."""
|
|
from fastapi import HTTPException
|
|
|
|
user_id = "22222222-2222-2222-2222-222222222222"
|
|
regular_user = User(
|
|
id=UUID(user_id),
|
|
email="user@example.com",
|
|
is_admin=False,
|
|
)
|
|
db_session.add(regular_user)
|
|
await db_session.commit()
|
|
|
|
auth_user = AuthUser(id=user_id, email="user@example.com")
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
await require_admin(user=auth_user, session=db_session)
|
|
assert exc_info.value.status_code == 403
|
|
assert exc_info.value.detail == "Admin access required"
|
|
|
|
|
|
async def test_require_admin_user_not_in_db(db_session):
|
|
"""Test require_admin raises 403 for user not in database."""
|
|
from fastapi import HTTPException
|
|
|
|
auth_user = AuthUser(
|
|
id="33333333-3333-3333-3333-333333333333", email="ghost@example.com"
|
|
)
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
await require_admin(user=auth_user, session=db_session)
|
|
assert exc_info.value.status_code == 403
|
|
assert exc_info.value.detail == "Admin access required"
|
|
|
|
|
|
async def test_admin_endpoint_returns_403_for_non_admin(
|
|
db_session, jwt_secret, monkeypatch
|
|
):
|
|
"""Test that admin endpoint returns 403 for authenticated non-admin user."""
|
|
user_id = "44444444-4444-4444-4444-444444444444"
|
|
regular_user = User(
|
|
id=UUID(user_id),
|
|
email="nonadmin@example.com",
|
|
is_admin=False,
|
|
)
|
|
db_session.add(regular_user)
|
|
await db_session.commit()
|
|
|
|
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
|
|
token = jwt.encode(
|
|
{
|
|
"sub": user_id,
|
|
"email": "nonadmin@example.com",
|
|
"role": "authenticated",
|
|
"aud": "authenticated",
|
|
"exp": int(time.time()) + 3600,
|
|
},
|
|
jwt_secret,
|
|
algorithm="HS256",
|
|
)
|
|
|
|
async with AsyncClient(
|
|
transport=ASGITransport(app=app),
|
|
base_url="http://test",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
) as ac:
|
|
response = await ac.post(
|
|
"/games",
|
|
json={
|
|
"name": "Test Game",
|
|
"slug": "test-game",
|
|
"generation": 1,
|
|
"region": "Kanto",
|
|
"category": "core",
|
|
},
|
|
)
|
|
assert response.status_code == 403
|
|
assert response.json()["detail"] == "Admin access required"
|
|
|
|
|
|
async def test_admin_endpoint_succeeds_for_admin(db_session, jwt_secret, monkeypatch):
|
|
"""Test that admin endpoint succeeds for authenticated admin user."""
|
|
user_id = "55555555-5555-5555-5555-555555555555"
|
|
admin_user = User(
|
|
id=UUID(user_id),
|
|
email="admin@example.com",
|
|
is_admin=True,
|
|
)
|
|
db_session.add(admin_user)
|
|
await db_session.commit()
|
|
|
|
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
|
|
token = jwt.encode(
|
|
{
|
|
"sub": user_id,
|
|
"email": "admin@example.com",
|
|
"role": "authenticated",
|
|
"aud": "authenticated",
|
|
"exp": int(time.time()) + 3600,
|
|
},
|
|
jwt_secret,
|
|
algorithm="HS256",
|
|
)
|
|
|
|
async with AsyncClient(
|
|
transport=ASGITransport(app=app),
|
|
base_url="http://test",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
) as ac:
|
|
response = await ac.post(
|
|
"/games",
|
|
json={
|
|
"name": "Test Game",
|
|
"slug": "test-game",
|
|
"generation": 1,
|
|
"region": "Kanto",
|
|
"category": "core",
|
|
},
|
|
)
|
|
assert response.status_code == 201
|
|
assert response.json()["name"] == "Test Game"
|