fix: enforce run ownership on all mutation endpoints
Add require_run_owner helper in auth.py that enforces ownership on mutation endpoints. Unowned (legacy) runs are now read-only. Applied ownership checks to: - All 4 encounter mutation endpoints - Both boss result mutation endpoints - Run update/delete endpoints - All 5 genlocke mutation endpoints (via first leg's run owner) Also sets owner_id on run creation in genlockes.py (create_genlocke, advance_leg) and adds 22 comprehensive ownership enforcement tests. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
447
backend/tests/test_ownership.py
Normal file
447
backend/tests/test_ownership.py
Normal file
@@ -0,0 +1,447 @@
|
||||
"""Tests for run ownership enforcement on mutation endpoints."""
|
||||
|
||||
from uuid import UUID
|
||||
|
||||
import pytest
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.core.auth import AuthUser, get_current_user, require_run_owner
|
||||
from app.main import app
|
||||
from app.models.game import Game
|
||||
from app.models.nuzlocke_run import NuzlockeRun
|
||||
from app.models.user import User
|
||||
|
||||
RUNS_BASE = "/api/v1/runs"
|
||||
ENC_BASE = "/api/v1/encounters"
|
||||
|
||||
OWNER_ID = "00000000-0000-4000-a000-000000000001"
|
||||
OTHER_USER_ID = "00000000-0000-4000-a000-000000000002"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def game(db_session: AsyncSession) -> Game:
|
||||
"""Create a test game."""
|
||||
game = Game(name="Test Game", slug="test-game", generation=1, region="kanto")
|
||||
db_session.add(game)
|
||||
await db_session.commit()
|
||||
await db_session.refresh(game)
|
||||
return game
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def owner_user(db_session: AsyncSession) -> User:
|
||||
"""Create the owner user."""
|
||||
user = User(id=UUID(OWNER_ID), email="owner@example.com")
|
||||
db_session.add(user)
|
||||
await db_session.commit()
|
||||
await db_session.refresh(user)
|
||||
return user
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def other_user(db_session: AsyncSession) -> User:
|
||||
"""Create another user who is not the owner."""
|
||||
user = User(id=UUID(OTHER_USER_ID), email="other@example.com")
|
||||
db_session.add(user)
|
||||
await db_session.commit()
|
||||
await db_session.refresh(user)
|
||||
return user
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def owned_run(
|
||||
db_session: AsyncSession, game: Game, owner_user: User
|
||||
) -> NuzlockeRun:
|
||||
"""Create a run owned by the test owner."""
|
||||
run = NuzlockeRun(
|
||||
game_id=game.id,
|
||||
owner_id=owner_user.id,
|
||||
name="Owned Run",
|
||||
status="active",
|
||||
)
|
||||
db_session.add(run)
|
||||
await db_session.commit()
|
||||
await db_session.refresh(run)
|
||||
return run
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def unowned_run(db_session: AsyncSession, game: Game) -> NuzlockeRun:
|
||||
"""Create a legacy run with no owner."""
|
||||
run = NuzlockeRun(
|
||||
game_id=game.id,
|
||||
owner_id=None,
|
||||
name="Unowned Run",
|
||||
status="active",
|
||||
)
|
||||
db_session.add(run)
|
||||
await db_session.commit()
|
||||
await db_session.refresh(run)
|
||||
return run
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def owner_auth_override(owner_user):
|
||||
"""Override auth to return the owner user."""
|
||||
|
||||
def _override():
|
||||
return AuthUser(id=OWNER_ID, email="owner@example.com")
|
||||
|
||||
app.dependency_overrides[get_current_user] = _override
|
||||
yield
|
||||
app.dependency_overrides.pop(get_current_user, None)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def other_auth_override(other_user):
|
||||
"""Override auth to return a different user (not the owner)."""
|
||||
|
||||
def _override():
|
||||
return AuthUser(id=OTHER_USER_ID, email="other@example.com")
|
||||
|
||||
app.dependency_overrides[get_current_user] = _override
|
||||
yield
|
||||
app.dependency_overrides.pop(get_current_user, None)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def owner_client(db_session, owner_auth_override):
|
||||
"""Client authenticated as the owner."""
|
||||
async with AsyncClient(
|
||||
transport=ASGITransport(app=app), base_url="http://test"
|
||||
) as ac:
|
||||
yield ac
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def other_client(db_session, other_auth_override):
|
||||
"""Client authenticated as a different user."""
|
||||
async with AsyncClient(
|
||||
transport=ASGITransport(app=app), base_url="http://test"
|
||||
) as ac:
|
||||
yield ac
|
||||
|
||||
|
||||
class TestRequireRunOwnerHelper:
|
||||
"""Tests for the require_run_owner helper function."""
|
||||
|
||||
async def test_owner_passes(self, owner_user, owned_run):
|
||||
"""Owner can access their own run."""
|
||||
auth_user = AuthUser(id=str(owner_user.id), email="owner@example.com")
|
||||
require_run_owner(owned_run, auth_user)
|
||||
|
||||
async def test_non_owner_raises_403(self, other_user, owned_run):
|
||||
"""Non-owner is rejected with 403."""
|
||||
from fastapi import HTTPException
|
||||
|
||||
auth_user = AuthUser(id=str(other_user.id), email="other@example.com")
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
require_run_owner(owned_run, auth_user)
|
||||
assert exc_info.value.status_code == 403
|
||||
assert "Only the run owner" in exc_info.value.detail
|
||||
|
||||
async def test_unowned_run_raises_403(self, owner_user, unowned_run):
|
||||
"""Unowned runs reject all mutations with 403."""
|
||||
from fastapi import HTTPException
|
||||
|
||||
auth_user = AuthUser(id=str(owner_user.id), email="owner@example.com")
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
require_run_owner(unowned_run, auth_user)
|
||||
assert exc_info.value.status_code == 403
|
||||
assert "no owner" in exc_info.value.detail
|
||||
|
||||
|
||||
class TestRunUpdateOwnership:
|
||||
"""Tests for run PATCH ownership enforcement."""
|
||||
|
||||
async def test_owner_can_update(self, owner_client: AsyncClient, owned_run):
|
||||
"""Owner can update their own run."""
|
||||
response = await owner_client.patch(
|
||||
f"{RUNS_BASE}/{owned_run.id}", json={"name": "Updated Name"}
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json()["name"] == "Updated Name"
|
||||
|
||||
async def test_non_owner_cannot_update(self, other_client: AsyncClient, owned_run):
|
||||
"""Non-owner gets 403 when trying to update."""
|
||||
response = await other_client.patch(
|
||||
f"{RUNS_BASE}/{owned_run.id}", json={"name": "Stolen"}
|
||||
)
|
||||
assert response.status_code == 403
|
||||
assert "Only the run owner" in response.json()["detail"]
|
||||
|
||||
async def test_unauthenticated_cannot_update(self, client: AsyncClient, owned_run):
|
||||
"""Unauthenticated user gets 401."""
|
||||
response = await client.patch(
|
||||
f"{RUNS_BASE}/{owned_run.id}", json={"name": "Stolen"}
|
||||
)
|
||||
assert response.status_code == 401
|
||||
|
||||
async def test_unowned_run_rejects_all_updates(
|
||||
self, owner_client: AsyncClient, unowned_run
|
||||
):
|
||||
"""Unowned (legacy) runs cannot be updated by anyone."""
|
||||
response = await owner_client.patch(
|
||||
f"{RUNS_BASE}/{unowned_run.id}", json={"name": "Stolen"}
|
||||
)
|
||||
assert response.status_code == 403
|
||||
assert "no owner" in response.json()["detail"]
|
||||
|
||||
|
||||
class TestRunDeleteOwnership:
|
||||
"""Tests for run DELETE ownership enforcement."""
|
||||
|
||||
async def test_owner_can_delete(self, owner_client: AsyncClient, owned_run):
|
||||
"""Owner can delete their own run."""
|
||||
response = await owner_client.delete(f"{RUNS_BASE}/{owned_run.id}")
|
||||
assert response.status_code == 204
|
||||
|
||||
async def test_non_owner_cannot_delete(self, other_client: AsyncClient, owned_run):
|
||||
"""Non-owner gets 403 when trying to delete."""
|
||||
response = await other_client.delete(f"{RUNS_BASE}/{owned_run.id}")
|
||||
assert response.status_code == 403
|
||||
|
||||
async def test_unauthenticated_cannot_delete(self, client: AsyncClient, owned_run):
|
||||
"""Unauthenticated user gets 401."""
|
||||
response = await client.delete(f"{RUNS_BASE}/{owned_run.id}")
|
||||
assert response.status_code == 401
|
||||
|
||||
async def test_unowned_run_rejects_all_deletes(
|
||||
self, owner_client: AsyncClient, unowned_run
|
||||
):
|
||||
"""Unowned (legacy) runs cannot be deleted by anyone."""
|
||||
response = await owner_client.delete(f"{RUNS_BASE}/{unowned_run.id}")
|
||||
assert response.status_code == 403
|
||||
|
||||
|
||||
class TestEncounterCreateOwnership:
|
||||
"""Tests for encounter POST ownership enforcement."""
|
||||
|
||||
@pytest.fixture
|
||||
async def pokemon(self, db_session: AsyncSession):
|
||||
"""Create a test Pokemon."""
|
||||
from app.models.pokemon import Pokemon
|
||||
|
||||
pokemon = Pokemon(
|
||||
pokeapi_id=25, national_dex=25, name="pikachu", types=["electric"]
|
||||
)
|
||||
db_session.add(pokemon)
|
||||
await db_session.commit()
|
||||
await db_session.refresh(pokemon)
|
||||
return pokemon
|
||||
|
||||
@pytest.fixture
|
||||
async def route(self, db_session: AsyncSession, game: Game):
|
||||
"""Create a test route."""
|
||||
from app.models.route import Route
|
||||
from app.models.version_group import VersionGroup
|
||||
|
||||
vg = VersionGroup(name="Test VG", slug="test-vg")
|
||||
db_session.add(vg)
|
||||
await db_session.flush()
|
||||
|
||||
game.version_group_id = vg.id
|
||||
await db_session.flush()
|
||||
|
||||
route = Route(name="Test Route", version_group_id=vg.id, order=1)
|
||||
db_session.add(route)
|
||||
await db_session.commit()
|
||||
await db_session.refresh(route)
|
||||
return route
|
||||
|
||||
async def test_owner_can_create_encounter(
|
||||
self, owner_client: AsyncClient, owned_run, pokemon, route
|
||||
):
|
||||
"""Owner can create encounters on their own run."""
|
||||
response = await owner_client.post(
|
||||
f"{RUNS_BASE}/{owned_run.id}/encounters",
|
||||
json={
|
||||
"routeId": route.id,
|
||||
"pokemonId": pokemon.id,
|
||||
"status": "caught",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 201
|
||||
|
||||
async def test_non_owner_cannot_create_encounter(
|
||||
self, other_client: AsyncClient, owned_run, pokemon, route
|
||||
):
|
||||
"""Non-owner gets 403 when trying to create encounters."""
|
||||
response = await other_client.post(
|
||||
f"{RUNS_BASE}/{owned_run.id}/encounters",
|
||||
json={
|
||||
"routeId": route.id,
|
||||
"pokemonId": pokemon.id,
|
||||
"status": "caught",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 403
|
||||
|
||||
async def test_unauthenticated_cannot_create_encounter(
|
||||
self, client: AsyncClient, owned_run, pokemon, route
|
||||
):
|
||||
"""Unauthenticated user gets 401."""
|
||||
response = await client.post(
|
||||
f"{RUNS_BASE}/{owned_run.id}/encounters",
|
||||
json={
|
||||
"routeId": route.id,
|
||||
"pokemonId": pokemon.id,
|
||||
"status": "caught",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
class TestEncounterUpdateOwnership:
|
||||
"""Tests for encounter PATCH ownership enforcement."""
|
||||
|
||||
@pytest.fixture
|
||||
async def encounter(self, db_session: AsyncSession, owned_run):
|
||||
"""Create a test encounter."""
|
||||
from app.models.encounter import Encounter
|
||||
from app.models.pokemon import Pokemon
|
||||
from app.models.route import Route
|
||||
from app.models.version_group import VersionGroup
|
||||
|
||||
pokemon = Pokemon(
|
||||
pokeapi_id=25, national_dex=25, name="pikachu", types=["electric"]
|
||||
)
|
||||
db_session.add(pokemon)
|
||||
await db_session.flush()
|
||||
|
||||
game = await db_session.get(Game, owned_run.game_id)
|
||||
if game.version_group_id is None:
|
||||
vg = VersionGroup(name="Test VG", slug="test-vg")
|
||||
db_session.add(vg)
|
||||
await db_session.flush()
|
||||
game.version_group_id = vg.id
|
||||
await db_session.flush()
|
||||
else:
|
||||
vg = await db_session.get(VersionGroup, game.version_group_id)
|
||||
|
||||
route = Route(name="Test Route", version_group_id=vg.id, order=1)
|
||||
db_session.add(route)
|
||||
await db_session.flush()
|
||||
|
||||
encounter = Encounter(
|
||||
run_id=owned_run.id,
|
||||
route_id=route.id,
|
||||
pokemon_id=pokemon.id,
|
||||
status="caught",
|
||||
)
|
||||
db_session.add(encounter)
|
||||
await db_session.commit()
|
||||
await db_session.refresh(encounter)
|
||||
return encounter
|
||||
|
||||
async def test_owner_can_update_encounter(
|
||||
self, owner_client: AsyncClient, encounter
|
||||
):
|
||||
"""Owner can update encounters on their own run."""
|
||||
response = await owner_client.patch(
|
||||
f"{ENC_BASE}/{encounter.id}", json={"nickname": "Sparky"}
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json()["nickname"] == "Sparky"
|
||||
|
||||
async def test_non_owner_cannot_update_encounter(
|
||||
self, other_client: AsyncClient, encounter
|
||||
):
|
||||
"""Non-owner gets 403 when trying to update encounters."""
|
||||
response = await other_client.patch(
|
||||
f"{ENC_BASE}/{encounter.id}", json={"nickname": "Stolen"}
|
||||
)
|
||||
assert response.status_code == 403
|
||||
|
||||
async def test_unauthenticated_cannot_update_encounter(
|
||||
self, client: AsyncClient, encounter
|
||||
):
|
||||
"""Unauthenticated user gets 401."""
|
||||
response = await client.patch(
|
||||
f"{ENC_BASE}/{encounter.id}", json={"nickname": "Stolen"}
|
||||
)
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
class TestEncounterDeleteOwnership:
|
||||
"""Tests for encounter DELETE ownership enforcement."""
|
||||
|
||||
@pytest.fixture
|
||||
async def encounter(self, db_session: AsyncSession, owned_run):
|
||||
"""Create a test encounter."""
|
||||
from app.models.encounter import Encounter
|
||||
from app.models.pokemon import Pokemon
|
||||
from app.models.route import Route
|
||||
from app.models.version_group import VersionGroup
|
||||
|
||||
pokemon = Pokemon(
|
||||
pokeapi_id=25, national_dex=25, name="pikachu", types=["electric"]
|
||||
)
|
||||
db_session.add(pokemon)
|
||||
await db_session.flush()
|
||||
|
||||
game = await db_session.get(Game, owned_run.game_id)
|
||||
if game.version_group_id is None:
|
||||
vg = VersionGroup(name="Test VG", slug="test-vg")
|
||||
db_session.add(vg)
|
||||
await db_session.flush()
|
||||
game.version_group_id = vg.id
|
||||
await db_session.flush()
|
||||
else:
|
||||
vg = await db_session.get(VersionGroup, game.version_group_id)
|
||||
|
||||
route = Route(name="Test Route", version_group_id=vg.id, order=1)
|
||||
db_session.add(route)
|
||||
await db_session.flush()
|
||||
|
||||
encounter = Encounter(
|
||||
run_id=owned_run.id,
|
||||
route_id=route.id,
|
||||
pokemon_id=pokemon.id,
|
||||
status="caught",
|
||||
)
|
||||
db_session.add(encounter)
|
||||
await db_session.commit()
|
||||
await db_session.refresh(encounter)
|
||||
return encounter
|
||||
|
||||
async def test_owner_can_delete_encounter(
|
||||
self, owner_client: AsyncClient, encounter
|
||||
):
|
||||
"""Owner can delete encounters on their own run."""
|
||||
response = await owner_client.delete(f"{ENC_BASE}/{encounter.id}")
|
||||
assert response.status_code == 204
|
||||
|
||||
async def test_non_owner_cannot_delete_encounter(
|
||||
self, other_client: AsyncClient, encounter
|
||||
):
|
||||
"""Non-owner gets 403 when trying to delete encounters."""
|
||||
response = await other_client.delete(f"{ENC_BASE}/{encounter.id}")
|
||||
assert response.status_code == 403
|
||||
|
||||
async def test_unauthenticated_cannot_delete_encounter(
|
||||
self, client: AsyncClient, encounter
|
||||
):
|
||||
"""Unauthenticated user gets 401."""
|
||||
response = await client.delete(f"{ENC_BASE}/{encounter.id}")
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
class TestRunVisibilityPreserved:
|
||||
"""Verify read access still works for public runs."""
|
||||
|
||||
async def test_non_owner_can_read_public_run(
|
||||
self, other_client: AsyncClient, owned_run
|
||||
):
|
||||
"""Non-owner can read (but not modify) a public run."""
|
||||
response = await other_client.get(f"{RUNS_BASE}/{owned_run.id}")
|
||||
assert response.status_code == 200
|
||||
assert response.json()["id"] == owned_run.id
|
||||
|
||||
async def test_unauthenticated_can_read_public_run(
|
||||
self, client: AsyncClient, owned_run
|
||||
):
|
||||
"""Unauthenticated user can read a public run."""
|
||||
response = await client.get(f"{RUNS_BASE}/{owned_run.id}")
|
||||
assert response.status_code == 200
|
||||
Reference in New Issue
Block a user