import time from uuid import UUID import jwt import pytest from httpx import ASGITransport, AsyncClient from app.core.auth import AuthUser, get_current_user, require_admin, require_auth from app.core.config import settings from app.main import app from app.models.user import User @pytest.fixture def jwt_secret(): """Provide a test JWT secret.""" return "test-jwt-secret-for-testing-only" @pytest.fixture def valid_token(jwt_secret): """Generate a valid JWT token.""" payload = { "sub": "user-123", "email": "test@example.com", "role": "authenticated", "aud": "authenticated", "exp": int(time.time()) + 3600, } return jwt.encode(payload, jwt_secret, algorithm="HS256") @pytest.fixture def expired_token(jwt_secret): """Generate an expired JWT token.""" payload = { "sub": "user-123", "email": "test@example.com", "role": "authenticated", "aud": "authenticated", "exp": int(time.time()) - 3600, # Expired 1 hour ago } return jwt.encode(payload, jwt_secret, algorithm="HS256") @pytest.fixture def invalid_token(): """Generate a token signed with wrong secret.""" payload = { "sub": "user-123", "email": "test@example.com", "role": "authenticated", "aud": "authenticated", "exp": int(time.time()) + 3600, } return jwt.encode(payload, "wrong-secret", algorithm="HS256") @pytest.fixture def auth_client(db_session, jwt_secret, valid_token, monkeypatch): """Client with valid auth token and configured JWT secret.""" monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret) async def _get_client(): async with AsyncClient( transport=ASGITransport(app=app), base_url="http://test", headers={"Authorization": f"Bearer {valid_token}"}, ) as ac: yield ac return _get_client async def test_get_current_user_valid_token(jwt_secret, valid_token, monkeypatch): """Test get_current_user returns user for valid token.""" monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret) class MockRequest: headers = {"Authorization": f"Bearer {valid_token}"} user = get_current_user(MockRequest()) assert user is not None assert user.id == "user-123" assert user.email == "test@example.com" assert user.role == "authenticated" async def test_get_current_user_no_token(jwt_secret, monkeypatch): """Test get_current_user returns None when no token.""" monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret) class MockRequest: headers = {} user = get_current_user(MockRequest()) assert user is None async def test_get_current_user_expired_token(jwt_secret, expired_token, monkeypatch): """Test get_current_user returns None for expired token.""" monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret) class MockRequest: headers = {"Authorization": f"Bearer {expired_token}"} user = get_current_user(MockRequest()) assert user is None async def test_get_current_user_invalid_token(jwt_secret, invalid_token, monkeypatch): """Test get_current_user returns None for invalid token.""" monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret) class MockRequest: headers = {"Authorization": f"Bearer {invalid_token}"} user = get_current_user(MockRequest()) assert user is None async def test_get_current_user_malformed_header(jwt_secret, monkeypatch): """Test get_current_user returns None for malformed auth header.""" monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret) class MockRequest: headers = {"Authorization": "NotBearer token"} user = get_current_user(MockRequest()) assert user is None async def test_require_auth_valid_user(): """Test require_auth passes through valid user.""" user = AuthUser(id="user-123", email="test@example.com") result = require_auth(user) assert result is user async def test_require_auth_no_user(): """Test require_auth raises 401 for no user.""" from fastapi import HTTPException with pytest.raises(HTTPException) as exc_info: require_auth(None) assert exc_info.value.status_code == 401 assert exc_info.value.detail == "Authentication required" async def test_protected_endpoint_without_token(db_session): """Test that write endpoint returns 401 without token.""" async with AsyncClient( transport=ASGITransport(app=app), base_url="http://test" ) as ac: response = await ac.post("/runs", json={"game_id": 1, "name": "Test Run"}) assert response.status_code == 401 assert response.json()["detail"] == "Authentication required" async def test_protected_endpoint_with_expired_token( db_session, jwt_secret, expired_token, monkeypatch ): """Test that write endpoint returns 401 with expired token.""" monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret) async with AsyncClient( transport=ASGITransport(app=app), base_url="http://test", headers={"Authorization": f"Bearer {expired_token}"}, ) as ac: response = await ac.post("/runs", json={"game_id": 1, "name": "Test Run"}) assert response.status_code == 401 async def test_read_endpoint_without_token(db_session): """Test that read endpoints work without authentication.""" async with AsyncClient( transport=ASGITransport(app=app), base_url="http://test" ) as ac: response = await ac.get("/runs") assert response.status_code == 200 async def test_require_admin_valid_admin_user(db_session): """Test require_admin passes through for admin user.""" user_id = "11111111-1111-1111-1111-111111111111" admin_user = User( id=UUID(user_id), email="admin@example.com", is_admin=True, ) db_session.add(admin_user) await db_session.commit() auth_user = AuthUser(id=user_id, email="admin@example.com") result = await require_admin(user=auth_user, session=db_session) assert result is auth_user async def test_require_admin_non_admin_user(db_session): """Test require_admin raises 403 for non-admin user.""" from fastapi import HTTPException user_id = "22222222-2222-2222-2222-222222222222" regular_user = User( id=UUID(user_id), email="user@example.com", is_admin=False, ) db_session.add(regular_user) await db_session.commit() auth_user = AuthUser(id=user_id, email="user@example.com") with pytest.raises(HTTPException) as exc_info: await require_admin(user=auth_user, session=db_session) assert exc_info.value.status_code == 403 assert exc_info.value.detail == "Admin access required" async def test_require_admin_user_not_in_db(db_session): """Test require_admin raises 403 for user not in database.""" from fastapi import HTTPException auth_user = AuthUser( id="33333333-3333-3333-3333-333333333333", email="ghost@example.com" ) with pytest.raises(HTTPException) as exc_info: await require_admin(user=auth_user, session=db_session) assert exc_info.value.status_code == 403 assert exc_info.value.detail == "Admin access required" async def test_admin_endpoint_returns_403_for_non_admin( db_session, jwt_secret, monkeypatch ): """Test that admin endpoint returns 403 for authenticated non-admin user.""" user_id = "44444444-4444-4444-4444-444444444444" regular_user = User( id=UUID(user_id), email="nonadmin@example.com", is_admin=False, ) db_session.add(regular_user) await db_session.commit() monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret) token = jwt.encode( { "sub": user_id, "email": "nonadmin@example.com", "role": "authenticated", "aud": "authenticated", "exp": int(time.time()) + 3600, }, jwt_secret, algorithm="HS256", ) async with AsyncClient( transport=ASGITransport(app=app), base_url="http://test", headers={"Authorization": f"Bearer {token}"}, ) as ac: response = await ac.post( "/games", json={ "name": "Test Game", "slug": "test-game", "generation": 1, "region": "Kanto", "category": "core", }, ) assert response.status_code == 403 assert response.json()["detail"] == "Admin access required" async def test_admin_endpoint_succeeds_for_admin(db_session, jwt_secret, monkeypatch): """Test that admin endpoint succeeds for authenticated admin user.""" user_id = "55555555-5555-5555-5555-555555555555" admin_user = User( id=UUID(user_id), email="admin@example.com", is_admin=True, ) db_session.add(admin_user) await db_session.commit() monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret) token = jwt.encode( { "sub": user_id, "email": "admin@example.com", "role": "authenticated", "aud": "authenticated", "exp": int(time.time()) + 3600, }, jwt_secret, algorithm="HS256", ) async with AsyncClient( transport=ASGITransport(app=app), base_url="http://test", headers={"Authorization": f"Bearer {token}"}, ) as ac: response = await ac.post( "/games", json={ "name": "Test Game", "slug": "test-game", "generation": 1, "region": "Kanto", "category": "core", }, ) assert response.status_code == 201 assert response.json()["name"] == "Test Game"