feat: add require_admin dependency and protect admin endpoints
Add require_admin FastAPI dependency that checks is_admin column on users table. Apply it to all admin-facing write endpoints (games, pokemon, evolutions, bosses, routes CRUD). Run-scoped endpoints remain protected by require_auth only since they manage user's own data. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,12 +1,14 @@
|
||||
import time
|
||||
from uuid import UUID
|
||||
|
||||
import jwt
|
||||
import pytest
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
|
||||
from app.core.auth import AuthUser, get_current_user, require_auth
|
||||
from app.core.auth import AuthUser, get_current_user, require_admin, require_auth
|
||||
from app.core.config import settings
|
||||
from app.main import app
|
||||
from app.models.user import User
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -177,3 +179,140 @@ async def test_read_endpoint_without_token(db_session):
|
||||
) as ac:
|
||||
response = await ac.get("/runs")
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
async def test_require_admin_valid_admin_user(db_session):
|
||||
"""Test require_admin passes through for admin user."""
|
||||
user_id = "11111111-1111-1111-1111-111111111111"
|
||||
admin_user = User(
|
||||
id=UUID(user_id),
|
||||
email="admin@example.com",
|
||||
is_admin=True,
|
||||
)
|
||||
db_session.add(admin_user)
|
||||
await db_session.commit()
|
||||
|
||||
auth_user = AuthUser(id=user_id, email="admin@example.com")
|
||||
result = await require_admin(user=auth_user, session=db_session)
|
||||
assert result is auth_user
|
||||
|
||||
|
||||
async def test_require_admin_non_admin_user(db_session):
|
||||
"""Test require_admin raises 403 for non-admin user."""
|
||||
from fastapi import HTTPException
|
||||
|
||||
user_id = "22222222-2222-2222-2222-222222222222"
|
||||
regular_user = User(
|
||||
id=UUID(user_id),
|
||||
email="user@example.com",
|
||||
is_admin=False,
|
||||
)
|
||||
db_session.add(regular_user)
|
||||
await db_session.commit()
|
||||
|
||||
auth_user = AuthUser(id=user_id, email="user@example.com")
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await require_admin(user=auth_user, session=db_session)
|
||||
assert exc_info.value.status_code == 403
|
||||
assert exc_info.value.detail == "Admin access required"
|
||||
|
||||
|
||||
async def test_require_admin_user_not_in_db(db_session):
|
||||
"""Test require_admin raises 403 for user not in database."""
|
||||
from fastapi import HTTPException
|
||||
|
||||
auth_user = AuthUser(
|
||||
id="33333333-3333-3333-3333-333333333333", email="ghost@example.com"
|
||||
)
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
await require_admin(user=auth_user, session=db_session)
|
||||
assert exc_info.value.status_code == 403
|
||||
assert exc_info.value.detail == "Admin access required"
|
||||
|
||||
|
||||
async def test_admin_endpoint_returns_403_for_non_admin(
|
||||
db_session, jwt_secret, monkeypatch
|
||||
):
|
||||
"""Test that admin endpoint returns 403 for authenticated non-admin user."""
|
||||
user_id = "44444444-4444-4444-4444-444444444444"
|
||||
regular_user = User(
|
||||
id=UUID(user_id),
|
||||
email="nonadmin@example.com",
|
||||
is_admin=False,
|
||||
)
|
||||
db_session.add(regular_user)
|
||||
await db_session.commit()
|
||||
|
||||
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
|
||||
token = jwt.encode(
|
||||
{
|
||||
"sub": user_id,
|
||||
"email": "nonadmin@example.com",
|
||||
"role": "authenticated",
|
||||
"aud": "authenticated",
|
||||
"exp": int(time.time()) + 3600,
|
||||
},
|
||||
jwt_secret,
|
||||
algorithm="HS256",
|
||||
)
|
||||
|
||||
async with AsyncClient(
|
||||
transport=ASGITransport(app=app),
|
||||
base_url="http://test",
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
) as ac:
|
||||
response = await ac.post(
|
||||
"/games",
|
||||
json={
|
||||
"name": "Test Game",
|
||||
"slug": "test-game",
|
||||
"generation": 1,
|
||||
"region": "Kanto",
|
||||
"category": "core",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 403
|
||||
assert response.json()["detail"] == "Admin access required"
|
||||
|
||||
|
||||
async def test_admin_endpoint_succeeds_for_admin(db_session, jwt_secret, monkeypatch):
|
||||
"""Test that admin endpoint succeeds for authenticated admin user."""
|
||||
user_id = "55555555-5555-5555-5555-555555555555"
|
||||
admin_user = User(
|
||||
id=UUID(user_id),
|
||||
email="admin@example.com",
|
||||
is_admin=True,
|
||||
)
|
||||
db_session.add(admin_user)
|
||||
await db_session.commit()
|
||||
|
||||
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
|
||||
token = jwt.encode(
|
||||
{
|
||||
"sub": user_id,
|
||||
"email": "admin@example.com",
|
||||
"role": "authenticated",
|
||||
"aud": "authenticated",
|
||||
"exp": int(time.time()) + 3600,
|
||||
},
|
||||
jwt_secret,
|
||||
algorithm="HS256",
|
||||
)
|
||||
|
||||
async with AsyncClient(
|
||||
transport=ASGITransport(app=app),
|
||||
base_url="http://test",
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
) as ac:
|
||||
response = await ac.post(
|
||||
"/games",
|
||||
json={
|
||||
"name": "Test Game",
|
||||
"slug": "test-game",
|
||||
"generation": 1,
|
||||
"region": "Kanto",
|
||||
"category": "core",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 201
|
||||
assert response.json()["name"] == "Test Game"
|
||||
|
||||
Reference in New Issue
Block a user