Files
nuzlocke-tracker/backend/tests/test_auth.py
Julian Tabel 2e66186fac feat: add require_admin dependency and protect admin endpoints
Add require_admin FastAPI dependency that checks is_admin column on users
table. Apply it to all admin-facing write endpoints (games, pokemon,
evolutions, bosses, routes CRUD). Run-scoped endpoints remain protected
by require_auth only since they manage user's own data.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-21 11:14:55 +01:00

319 lines
9.7 KiB
Python

import time
from uuid import UUID
import jwt
import pytest
from httpx import ASGITransport, AsyncClient
from app.core.auth import AuthUser, get_current_user, require_admin, require_auth
from app.core.config import settings
from app.main import app
from app.models.user import User
@pytest.fixture
def jwt_secret():
"""Provide a test JWT secret."""
return "test-jwt-secret-for-testing-only"
@pytest.fixture
def valid_token(jwt_secret):
"""Generate a valid JWT token."""
payload = {
"sub": "user-123",
"email": "test@example.com",
"role": "authenticated",
"aud": "authenticated",
"exp": int(time.time()) + 3600,
}
return jwt.encode(payload, jwt_secret, algorithm="HS256")
@pytest.fixture
def expired_token(jwt_secret):
"""Generate an expired JWT token."""
payload = {
"sub": "user-123",
"email": "test@example.com",
"role": "authenticated",
"aud": "authenticated",
"exp": int(time.time()) - 3600, # Expired 1 hour ago
}
return jwt.encode(payload, jwt_secret, algorithm="HS256")
@pytest.fixture
def invalid_token():
"""Generate a token signed with wrong secret."""
payload = {
"sub": "user-123",
"email": "test@example.com",
"role": "authenticated",
"aud": "authenticated",
"exp": int(time.time()) + 3600,
}
return jwt.encode(payload, "wrong-secret", algorithm="HS256")
@pytest.fixture
def auth_client(db_session, jwt_secret, valid_token, monkeypatch):
"""Client with valid auth token and configured JWT secret."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
async def _get_client():
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
headers={"Authorization": f"Bearer {valid_token}"},
) as ac:
yield ac
return _get_client
async def test_get_current_user_valid_token(jwt_secret, valid_token, monkeypatch):
"""Test get_current_user returns user for valid token."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
class MockRequest:
headers = {"Authorization": f"Bearer {valid_token}"}
user = get_current_user(MockRequest())
assert user is not None
assert user.id == "user-123"
assert user.email == "test@example.com"
assert user.role == "authenticated"
async def test_get_current_user_no_token(jwt_secret, monkeypatch):
"""Test get_current_user returns None when no token."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
class MockRequest:
headers = {}
user = get_current_user(MockRequest())
assert user is None
async def test_get_current_user_expired_token(jwt_secret, expired_token, monkeypatch):
"""Test get_current_user returns None for expired token."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
class MockRequest:
headers = {"Authorization": f"Bearer {expired_token}"}
user = get_current_user(MockRequest())
assert user is None
async def test_get_current_user_invalid_token(jwt_secret, invalid_token, monkeypatch):
"""Test get_current_user returns None for invalid token."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
class MockRequest:
headers = {"Authorization": f"Bearer {invalid_token}"}
user = get_current_user(MockRequest())
assert user is None
async def test_get_current_user_malformed_header(jwt_secret, monkeypatch):
"""Test get_current_user returns None for malformed auth header."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
class MockRequest:
headers = {"Authorization": "NotBearer token"}
user = get_current_user(MockRequest())
assert user is None
async def test_require_auth_valid_user():
"""Test require_auth passes through valid user."""
user = AuthUser(id="user-123", email="test@example.com")
result = require_auth(user)
assert result is user
async def test_require_auth_no_user():
"""Test require_auth raises 401 for no user."""
from fastapi import HTTPException
with pytest.raises(HTTPException) as exc_info:
require_auth(None)
assert exc_info.value.status_code == 401
assert exc_info.value.detail == "Authentication required"
async def test_protected_endpoint_without_token(db_session):
"""Test that write endpoint returns 401 without token."""
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as ac:
response = await ac.post("/runs", json={"game_id": 1, "name": "Test Run"})
assert response.status_code == 401
assert response.json()["detail"] == "Authentication required"
async def test_protected_endpoint_with_expired_token(
db_session, jwt_secret, expired_token, monkeypatch
):
"""Test that write endpoint returns 401 with expired token."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
headers={"Authorization": f"Bearer {expired_token}"},
) as ac:
response = await ac.post("/runs", json={"game_id": 1, "name": "Test Run"})
assert response.status_code == 401
async def test_read_endpoint_without_token(db_session):
"""Test that read endpoints work without authentication."""
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as ac:
response = await ac.get("/runs")
assert response.status_code == 200
async def test_require_admin_valid_admin_user(db_session):
"""Test require_admin passes through for admin user."""
user_id = "11111111-1111-1111-1111-111111111111"
admin_user = User(
id=UUID(user_id),
email="admin@example.com",
is_admin=True,
)
db_session.add(admin_user)
await db_session.commit()
auth_user = AuthUser(id=user_id, email="admin@example.com")
result = await require_admin(user=auth_user, session=db_session)
assert result is auth_user
async def test_require_admin_non_admin_user(db_session):
"""Test require_admin raises 403 for non-admin user."""
from fastapi import HTTPException
user_id = "22222222-2222-2222-2222-222222222222"
regular_user = User(
id=UUID(user_id),
email="user@example.com",
is_admin=False,
)
db_session.add(regular_user)
await db_session.commit()
auth_user = AuthUser(id=user_id, email="user@example.com")
with pytest.raises(HTTPException) as exc_info:
await require_admin(user=auth_user, session=db_session)
assert exc_info.value.status_code == 403
assert exc_info.value.detail == "Admin access required"
async def test_require_admin_user_not_in_db(db_session):
"""Test require_admin raises 403 for user not in database."""
from fastapi import HTTPException
auth_user = AuthUser(
id="33333333-3333-3333-3333-333333333333", email="ghost@example.com"
)
with pytest.raises(HTTPException) as exc_info:
await require_admin(user=auth_user, session=db_session)
assert exc_info.value.status_code == 403
assert exc_info.value.detail == "Admin access required"
async def test_admin_endpoint_returns_403_for_non_admin(
db_session, jwt_secret, monkeypatch
):
"""Test that admin endpoint returns 403 for authenticated non-admin user."""
user_id = "44444444-4444-4444-4444-444444444444"
regular_user = User(
id=UUID(user_id),
email="nonadmin@example.com",
is_admin=False,
)
db_session.add(regular_user)
await db_session.commit()
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
token = jwt.encode(
{
"sub": user_id,
"email": "nonadmin@example.com",
"role": "authenticated",
"aud": "authenticated",
"exp": int(time.time()) + 3600,
},
jwt_secret,
algorithm="HS256",
)
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
headers={"Authorization": f"Bearer {token}"},
) as ac:
response = await ac.post(
"/games",
json={
"name": "Test Game",
"slug": "test-game",
"generation": 1,
"region": "Kanto",
"category": "core",
},
)
assert response.status_code == 403
assert response.json()["detail"] == "Admin access required"
async def test_admin_endpoint_succeeds_for_admin(db_session, jwt_secret, monkeypatch):
"""Test that admin endpoint succeeds for authenticated admin user."""
user_id = "55555555-5555-5555-5555-555555555555"
admin_user = User(
id=UUID(user_id),
email="admin@example.com",
is_admin=True,
)
db_session.add(admin_user)
await db_session.commit()
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
token = jwt.encode(
{
"sub": user_id,
"email": "admin@example.com",
"role": "authenticated",
"aud": "authenticated",
"exp": int(time.time()) + 3600,
},
jwt_secret,
algorithm="HS256",
)
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
headers={"Authorization": f"Bearer {token}"},
) as ac:
response = await ac.post(
"/games",
json={
"name": "Test Game",
"slug": "test-game",
"generation": 1,
"region": "Kanto",
"category": "core",
},
)
assert response.status_code == 201
assert response.json()["name"] == "Test Game"