diff --git a/.beans/nuzlocke-tracker-73ba--enforce-run-ownership-on-all-mutation-endpoints.md b/.beans/nuzlocke-tracker-73ba--enforce-run-ownership-on-all-mutation-endpoints.md new file mode 100644 index 0000000..c5d59ee --- /dev/null +++ b/.beans/nuzlocke-tracker-73ba--enforce-run-ownership-on-all-mutation-endpoints.md @@ -0,0 +1,84 @@ +--- +# nuzlocke-tracker-73ba +title: Enforce run ownership on all mutation endpoints +status: completed +type: bug +priority: critical +created_at: 2026-03-21T12:18:27Z +updated_at: 2026-03-21T12:28:35Z +parent: nuzlocke-tracker-wwnu +--- + +## Problem + +Backend mutation endpoints for encounters, bosses, and run updates use `require_auth` but do NOT verify the authenticated user is the run's owner. Any authenticated user can modify any run's encounters, mark bosses as defeated, or change run settings. + +Additionally, `_check_run_access` in `runs.py:184` allows anyone to edit unowned (legacy) runs when `require_owner=False`. + +### Affected endpoints + +**encounters.py** — all mutations use `require_auth` with no ownership check: +- `POST /runs/{run_id}/encounters` (line 35) +- `PATCH /runs/{run_id}/encounters/{encounter_id}` (line 142) +- `DELETE /runs/{run_id}/encounters/{encounter_id}` (line 171) +- `POST /runs/{run_id}/encounters/bulk-randomize` (line 203) + +**bosses.py** — boss result mutations: +- `POST /runs/{run_id}/boss-results` (line 347) +- `DELETE /runs/{run_id}/boss-results/{result_id}` (line 428) + +**runs.py** — run updates/deletion: +- `PATCH /runs/{run_id}` (line 379) — uses `_check_run_access(run, user, require_owner=run.owner_id is not None)` which skips check for unowned runs +- `DELETE /runs/{run_id}` (line 488) — same conditional check + +**genlockes.py** — genlocke mutations: +- `POST /genlockes` (line 439) — no owner assigned to created genlocke or its first run +- `PATCH /genlockes/{id}` (line 824) — no ownership check +- `DELETE /genlockes/{id}` (line 862) — no ownership check +- `POST /genlockes/{id}/legs/{leg_order}/advance` (line 569) — no ownership check +- `POST /genlockes/{id}/legs` (line 894) — no ownership check +- `DELETE /genlockes/{id}/legs/{leg_id}` (line 936) — no ownership check + +## Approach + +1. Add a reusable `_check_run_owner(run, user)` helper in `auth.py` or `runs.py` that raises 403 if `user.id != str(run.owner_id)` (no fallback for unowned runs — they should be read-only) +2. Apply ownership check to ALL encounter/boss/run mutation endpoints +3. For genlocke mutations, load the first leg's run and verify ownership against that +4. Update `_check_run_access` to always require ownership for mutations (remove the `require_owner` conditional) +5. When creating runs (standalone or via genlocke), set `owner_id` from the authenticated user + +## Checklist + +- [x] Add `_check_run_owner` helper that rejects non-owners (including unowned/legacy runs) +- [x] Apply ownership check to all 4 encounter mutation endpoints +- [x] Apply ownership check to both boss result mutation endpoints +- [x] Fix `_check_run_access` to always require ownership on mutations +- [x] Set `owner_id` on run creation in `runs.py` and `genlockes.py` (create_genlocke, advance_leg) +- [x] Apply ownership check to all genlocke mutation endpoints (via first leg's run owner) +- [x] Add tests for ownership enforcement (403 for non-owner, 401 for unauthenticated) + +## Summary of Changes + +Added `require_run_owner` helper in `auth.py` that enforces ownership on mutation endpoints: +- Returns 403 for unowned (legacy) runs - they are now read-only +- Returns 403 if authenticated user is not the run's owner + +Applied ownership checks to: +- All 4 encounter mutation endpoints (create, update, delete, bulk-randomize) +- Both boss result mutation endpoints (create, delete) +- Run update and delete endpoints (via `require_run_owner`) +- All 5 genlocke mutation endpoints (update, delete, advance_leg, add_leg, remove_leg via `_check_genlocke_owner`) + +Added `owner_id` on run creation: +- `runs.py`: create_run already sets owner_id (verified) +- `genlockes.py`: create_genlocke now sets owner_id on the first run +- `genlockes.py`: advance_leg preserves owner_id from current run to new run + +Renamed `_check_run_access` to `_check_run_read_access` (read-only visibility check) for clarity. + +Added 22 comprehensive tests in `test_ownership.py` covering: +- Owner can perform mutations +- Non-owner gets 403 on mutations +- Unauthenticated user gets 401 +- Unowned (legacy) runs reject all mutations +- Read access preserved for public runs diff --git a/backend/src/app/api/bosses.py b/backend/src/app/api/bosses.py index b03fa6f..4e05268 100644 --- a/backend/src/app/api/bosses.py +++ b/backend/src/app/api/bosses.py @@ -5,7 +5,7 @@ from sqlalchemy import or_, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload -from app.core.auth import AuthUser, require_admin, require_auth +from app.core.auth import AuthUser, require_admin, require_auth, require_run_owner from app.core.database import get_session from app.models.boss_battle import BossBattle from app.models.boss_pokemon import BossPokemon @@ -344,12 +344,14 @@ async def create_boss_result( run_id: int, data: BossResultCreate, session: AsyncSession = Depends(get_session), - _user: AuthUser = Depends(require_auth), + user: AuthUser = Depends(require_auth), ): run = await session.get(NuzlockeRun, run_id) if run is None: raise HTTPException(status_code=404, detail="Run not found") + require_run_owner(run, user) + boss = await session.get(BossBattle, data.boss_battle_id) if boss is None: raise HTTPException(status_code=404, detail="Boss battle not found") @@ -425,8 +427,14 @@ async def delete_boss_result( run_id: int, result_id: int, session: AsyncSession = Depends(get_session), - _user: AuthUser = Depends(require_auth), + user: AuthUser = Depends(require_auth), ): + run = await session.get(NuzlockeRun, run_id) + if run is None: + raise HTTPException(status_code=404, detail="Run not found") + + require_run_owner(run, user) + result = await session.execute( select(BossResult).where( BossResult.id == result_id, BossResult.run_id == run_id diff --git a/backend/src/app/api/encounters.py b/backend/src/app/api/encounters.py index fc92d37..fb71a99 100644 --- a/backend/src/app/api/encounters.py +++ b/backend/src/app/api/encounters.py @@ -5,7 +5,7 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload, selectinload -from app.core.auth import AuthUser, require_auth +from app.core.auth import AuthUser, require_auth, require_run_owner from app.core.database import get_session from app.models.encounter import Encounter from app.models.evolution import Evolution @@ -36,13 +36,15 @@ async def create_encounter( run_id: int, data: EncounterCreate, session: AsyncSession = Depends(get_session), - _user: AuthUser = Depends(require_auth), + user: AuthUser = Depends(require_auth), ): # Validate run exists run = await session.get(NuzlockeRun, run_id) if run is None: raise HTTPException(status_code=404, detail="Run not found") + require_run_owner(run, user) + # Validate route exists and load its children result = await session.execute( select(Route) @@ -139,12 +141,17 @@ async def update_encounter( encounter_id: int, data: EncounterUpdate, session: AsyncSession = Depends(get_session), - _user: AuthUser = Depends(require_auth), + user: AuthUser = Depends(require_auth), ): encounter = await session.get(Encounter, encounter_id) if encounter is None: raise HTTPException(status_code=404, detail="Encounter not found") + run = await session.get(NuzlockeRun, encounter.run_id) + if run is None: + raise HTTPException(status_code=404, detail="Run not found") + require_run_owner(run, user) + update_data = data.model_dump(exclude_unset=True) for field, value in update_data.items(): setattr(encounter, field, value) @@ -168,12 +175,17 @@ async def update_encounter( async def delete_encounter( encounter_id: int, session: AsyncSession = Depends(get_session), - _user: AuthUser = Depends(require_auth), + user: AuthUser = Depends(require_auth), ): encounter = await session.get(Encounter, encounter_id) if encounter is None: raise HTTPException(status_code=404, detail="Encounter not found") + run = await session.get(NuzlockeRun, encounter.run_id) + if run is None: + raise HTTPException(status_code=404, detail="Run not found") + require_run_owner(run, user) + # Block deletion if encounter is referenced by a genlocke transfer transfer_result = await session.execute( select(GenlockeTransfer.id).where( @@ -200,12 +212,15 @@ async def delete_encounter( async def bulk_randomize_encounters( run_id: int, session: AsyncSession = Depends(get_session), - _user: AuthUser = Depends(require_auth), + user: AuthUser = Depends(require_auth), ): # 1. Validate run run = await session.get(NuzlockeRun, run_id) if run is None: raise HTTPException(status_code=404, detail="Run not found") + + require_run_owner(run, user) + if run.status != "active": raise HTTPException(status_code=400, detail="Run is not active") diff --git a/backend/src/app/api/genlockes.py b/backend/src/app/api/genlockes.py index 5ccabf2..3222b98 100644 --- a/backend/src/app/api/genlockes.py +++ b/backend/src/app/api/genlockes.py @@ -1,3 +1,5 @@ +from uuid import UUID + from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy import delete as sa_delete @@ -6,7 +8,7 @@ from sqlalchemy import update as sa_update from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload -from app.core.auth import AuthUser, require_auth +from app.core.auth import AuthUser, require_auth, require_run_owner from app.core.database import get_session from app.models.encounter import Encounter from app.models.evolution import Evolution @@ -16,6 +18,7 @@ from app.models.genlocke_transfer import GenlockeTransfer from app.models.nuzlocke_run import NuzlockeRun from app.models.pokemon import Pokemon from app.models.route import Route +from app.models.user import User from app.schemas.genlocke import ( AddLegRequest, AdvanceLegRequest, @@ -41,6 +44,38 @@ from app.services.families import build_families, resolve_base_form router = APIRouter() +async def _check_genlocke_owner( + genlocke_id: int, + user: AuthUser, + session: AsyncSession, +) -> None: + """ + Verify user owns the genlocke via the first leg's run. + Raises 404 if the genlocke doesn't exist. + Raises 403 if the first leg has a run with a different owner. + Raises 403 if the first leg has an unowned run (read-only legacy data). + """ + # First check if genlocke exists + genlocke = await session.get(Genlocke, genlocke_id) + if genlocke is None: + raise HTTPException(status_code=404, detail="Genlocke not found") + + leg_result = await session.execute( + select(GenlockeLeg) + .where(GenlockeLeg.genlocke_id == genlocke_id, GenlockeLeg.leg_order == 1) + .options(selectinload(GenlockeLeg.run)) + ) + first_leg = leg_result.scalar_one_or_none() + + if first_leg is None or first_leg.run is None: + raise HTTPException( + status_code=403, + detail="Cannot modify genlocke: no run found for first leg", + ) + + require_run_owner(first_leg.run, user) + + @router.get("", response_model=list[GenlockeListItem]) async def list_genlockes(session: AsyncSession = Depends(get_session)): result = await session.execute( @@ -440,7 +475,7 @@ async def get_genlocke_lineages( async def create_genlocke( data: GenlockeCreate, session: AsyncSession = Depends(get_session), - _user: AuthUser = Depends(require_auth), + user: AuthUser = Depends(require_auth), ): if not data.game_ids: raise HTTPException(status_code=400, detail="At least one game is required") @@ -455,6 +490,13 @@ async def create_genlocke( if missing: raise HTTPException(status_code=404, detail=f"Games not found: {missing}") + # Ensure user exists in local DB + user_id = UUID(user.id) + db_user = await session.get(User, user_id) + if db_user is None: + db_user = User(id=user_id, email=user.email or "") + session.add(db_user) + # Create genlocke genlocke = Genlocke( name=data.name.strip(), @@ -481,6 +523,7 @@ async def create_genlocke( first_game = found_games[data.game_ids[0]] first_run = NuzlockeRun( game_id=first_game.id, + owner_id=user_id, name=f"{data.name.strip()} \u2014 Leg 1", status="active", rules=data.nuzlocke_rules, @@ -571,8 +614,10 @@ async def advance_leg( leg_order: int, data: AdvanceLegRequest | None = None, session: AsyncSession = Depends(get_session), - _user: AuthUser = Depends(require_auth), + user: AuthUser = Depends(require_auth), ): + await _check_genlocke_owner(genlocke_id, user, session) + # Load genlocke with legs result = await session.execute( select(Genlocke) @@ -653,9 +698,10 @@ async def advance_leg( else: current_leg.retired_pokemon_ids = [] - # Create a new run for the next leg + # Create a new run for the next leg, preserving owner from current run new_run = NuzlockeRun( game_id=next_leg.game_id, + owner_id=current_run.owner_id, name=f"{genlocke.name} \u2014 Leg {next_leg.leg_order}", status="active", rules=genlocke.nuzlocke_rules, @@ -826,8 +872,10 @@ async def update_genlocke( genlocke_id: int, data: GenlockeUpdate, session: AsyncSession = Depends(get_session), - _user: AuthUser = Depends(require_auth), + user: AuthUser = Depends(require_auth), ): + await _check_genlocke_owner(genlocke_id, user, session) + result = await session.execute( select(Genlocke) .where(Genlocke.id == genlocke_id) @@ -863,8 +911,10 @@ async def update_genlocke( async def delete_genlocke( genlocke_id: int, session: AsyncSession = Depends(get_session), - _user: AuthUser = Depends(require_auth), + user: AuthUser = Depends(require_auth), ): + await _check_genlocke_owner(genlocke_id, user, session) + genlocke = await session.get(Genlocke, genlocke_id) if genlocke is None: raise HTTPException(status_code=404, detail="Genlocke not found") @@ -895,8 +945,10 @@ async def add_leg( genlocke_id: int, data: AddLegRequest, session: AsyncSession = Depends(get_session), - _user: AuthUser = Depends(require_auth), + user: AuthUser = Depends(require_auth), ): + await _check_genlocke_owner(genlocke_id, user, session) + genlocke = await session.get(Genlocke, genlocke_id) if genlocke is None: raise HTTPException(status_code=404, detail="Genlocke not found") @@ -938,8 +990,10 @@ async def remove_leg( genlocke_id: int, leg_id: int, session: AsyncSession = Depends(get_session), - _user: AuthUser = Depends(require_auth), + user: AuthUser = Depends(require_auth), ): + await _check_genlocke_owner(genlocke_id, user, session) + result = await session.execute( select(GenlockeLeg).where( GenlockeLeg.id == leg_id, diff --git a/backend/src/app/api/runs.py b/backend/src/app/api/runs.py index 52d9d1e..53282ea 100644 --- a/backend/src/app/api/runs.py +++ b/backend/src/app/api/runs.py @@ -6,7 +6,7 @@ from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload, selectinload -from app.core.auth import AuthUser, get_current_user, require_auth +from app.core.auth import AuthUser, get_current_user, require_auth, require_run_owner from app.core.database import get_session from app.models.boss_result import BossResult from app.models.encounter import Encounter @@ -181,31 +181,17 @@ def _build_run_response(run: NuzlockeRun) -> RunResponse: ) -def _check_run_access( - run: NuzlockeRun, user: AuthUser | None, require_owner: bool = False -) -> None: +def _check_run_read_access(run: NuzlockeRun, user: AuthUser | None) -> None: """ - Check if user can access the run. + Check if user can read the run. Raises 403 for private runs if user is not owner. - If require_owner=True, always requires ownership (for mutations). + Unowned runs are readable by everyone (legacy). """ if run.owner_id is None: - # Unowned runs are accessible by everyone (legacy) - if require_owner: - raise HTTPException( - status_code=403, detail="Only the run owner can perform this action" - ) return user_id = UUID(user.id) if user else None - if require_owner: - if user_id != run.owner_id: - raise HTTPException( - status_code=403, detail="Only the run owner can perform this action" - ) - return - if run.visibility == RunVisibility.PRIVATE and user_id != run.owner_id: raise HTTPException(status_code=403, detail="This run is private") @@ -301,7 +287,7 @@ async def get_run( raise HTTPException(status_code=404, detail="Run not found") # Check visibility access - _check_run_access(run, user) + _check_run_read_access(run, user) # Check if this run belongs to a genlocke genlocke_context = None @@ -375,8 +361,7 @@ async def update_run( if run is None: raise HTTPException(status_code=404, detail="Run not found") - # Check ownership for mutations (unowned runs allow anyone for backwards compat) - _check_run_access(run, user, require_owner=run.owner_id is not None) + require_run_owner(run, user) update_data = data.model_dump(exclude_unset=True) @@ -484,8 +469,7 @@ async def delete_run( if run is None: raise HTTPException(status_code=404, detail="Run not found") - # Check ownership for deletion (unowned runs allow anyone for backwards compat) - _check_run_access(run, user, require_owner=run.owner_id is not None) + require_run_owner(run, user) # Block deletion if run is linked to a genlocke leg leg_result = await session.execute( diff --git a/backend/src/app/core/auth.py b/backend/src/app/core/auth.py index 6a5b392..d2bb37a 100644 --- a/backend/src/app/core/auth.py +++ b/backend/src/app/core/auth.py @@ -8,6 +8,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import settings from app.core.database import get_session +from app.models.nuzlocke_run import NuzlockeRun from app.models.user import User @@ -105,3 +106,20 @@ async def require_admin( detail="Admin access required", ) return user + + +def require_run_owner(run: NuzlockeRun, user: AuthUser) -> None: + """ + Verify user owns the run. Raises 403 if not owner. + Unowned (legacy) runs are read-only and reject all mutations. + """ + if run.owner_id is None: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="This run has no owner and cannot be modified", + ) + if UUID(user.id) != run.owner_id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only the run owner can perform this action", + ) diff --git a/backend/tests/test_ownership.py b/backend/tests/test_ownership.py new file mode 100644 index 0000000..2135502 --- /dev/null +++ b/backend/tests/test_ownership.py @@ -0,0 +1,447 @@ +"""Tests for run ownership enforcement on mutation endpoints.""" + +from uuid import UUID + +import pytest +from httpx import ASGITransport, AsyncClient +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.auth import AuthUser, get_current_user, require_run_owner +from app.main import app +from app.models.game import Game +from app.models.nuzlocke_run import NuzlockeRun +from app.models.user import User + +RUNS_BASE = "/api/v1/runs" +ENC_BASE = "/api/v1/encounters" + +OWNER_ID = "00000000-0000-4000-a000-000000000001" +OTHER_USER_ID = "00000000-0000-4000-a000-000000000002" + + +@pytest.fixture +async def game(db_session: AsyncSession) -> Game: + """Create a test game.""" + game = Game(name="Test Game", slug="test-game", generation=1, region="kanto") + db_session.add(game) + await db_session.commit() + await db_session.refresh(game) + return game + + +@pytest.fixture +async def owner_user(db_session: AsyncSession) -> User: + """Create the owner user.""" + user = User(id=UUID(OWNER_ID), email="owner@example.com") + db_session.add(user) + await db_session.commit() + await db_session.refresh(user) + return user + + +@pytest.fixture +async def other_user(db_session: AsyncSession) -> User: + """Create another user who is not the owner.""" + user = User(id=UUID(OTHER_USER_ID), email="other@example.com") + db_session.add(user) + await db_session.commit() + await db_session.refresh(user) + return user + + +@pytest.fixture +async def owned_run( + db_session: AsyncSession, game: Game, owner_user: User +) -> NuzlockeRun: + """Create a run owned by the test owner.""" + run = NuzlockeRun( + game_id=game.id, + owner_id=owner_user.id, + name="Owned Run", + status="active", + ) + db_session.add(run) + await db_session.commit() + await db_session.refresh(run) + return run + + +@pytest.fixture +async def unowned_run(db_session: AsyncSession, game: Game) -> NuzlockeRun: + """Create a legacy run with no owner.""" + run = NuzlockeRun( + game_id=game.id, + owner_id=None, + name="Unowned Run", + status="active", + ) + db_session.add(run) + await db_session.commit() + await db_session.refresh(run) + return run + + +@pytest.fixture +def owner_auth_override(owner_user): + """Override auth to return the owner user.""" + + def _override(): + return AuthUser(id=OWNER_ID, email="owner@example.com") + + app.dependency_overrides[get_current_user] = _override + yield + app.dependency_overrides.pop(get_current_user, None) + + +@pytest.fixture +def other_auth_override(other_user): + """Override auth to return a different user (not the owner).""" + + def _override(): + return AuthUser(id=OTHER_USER_ID, email="other@example.com") + + app.dependency_overrides[get_current_user] = _override + yield + app.dependency_overrides.pop(get_current_user, None) + + +@pytest.fixture +async def owner_client(db_session, owner_auth_override): + """Client authenticated as the owner.""" + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as ac: + yield ac + + +@pytest.fixture +async def other_client(db_session, other_auth_override): + """Client authenticated as a different user.""" + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as ac: + yield ac + + +class TestRequireRunOwnerHelper: + """Tests for the require_run_owner helper function.""" + + async def test_owner_passes(self, owner_user, owned_run): + """Owner can access their own run.""" + auth_user = AuthUser(id=str(owner_user.id), email="owner@example.com") + require_run_owner(owned_run, auth_user) + + async def test_non_owner_raises_403(self, other_user, owned_run): + """Non-owner is rejected with 403.""" + from fastapi import HTTPException + + auth_user = AuthUser(id=str(other_user.id), email="other@example.com") + with pytest.raises(HTTPException) as exc_info: + require_run_owner(owned_run, auth_user) + assert exc_info.value.status_code == 403 + assert "Only the run owner" in exc_info.value.detail + + async def test_unowned_run_raises_403(self, owner_user, unowned_run): + """Unowned runs reject all mutations with 403.""" + from fastapi import HTTPException + + auth_user = AuthUser(id=str(owner_user.id), email="owner@example.com") + with pytest.raises(HTTPException) as exc_info: + require_run_owner(unowned_run, auth_user) + assert exc_info.value.status_code == 403 + assert "no owner" in exc_info.value.detail + + +class TestRunUpdateOwnership: + """Tests for run PATCH ownership enforcement.""" + + async def test_owner_can_update(self, owner_client: AsyncClient, owned_run): + """Owner can update their own run.""" + response = await owner_client.patch( + f"{RUNS_BASE}/{owned_run.id}", json={"name": "Updated Name"} + ) + assert response.status_code == 200 + assert response.json()["name"] == "Updated Name" + + async def test_non_owner_cannot_update(self, other_client: AsyncClient, owned_run): + """Non-owner gets 403 when trying to update.""" + response = await other_client.patch( + f"{RUNS_BASE}/{owned_run.id}", json={"name": "Stolen"} + ) + assert response.status_code == 403 + assert "Only the run owner" in response.json()["detail"] + + async def test_unauthenticated_cannot_update(self, client: AsyncClient, owned_run): + """Unauthenticated user gets 401.""" + response = await client.patch( + f"{RUNS_BASE}/{owned_run.id}", json={"name": "Stolen"} + ) + assert response.status_code == 401 + + async def test_unowned_run_rejects_all_updates( + self, owner_client: AsyncClient, unowned_run + ): + """Unowned (legacy) runs cannot be updated by anyone.""" + response = await owner_client.patch( + f"{RUNS_BASE}/{unowned_run.id}", json={"name": "Stolen"} + ) + assert response.status_code == 403 + assert "no owner" in response.json()["detail"] + + +class TestRunDeleteOwnership: + """Tests for run DELETE ownership enforcement.""" + + async def test_owner_can_delete(self, owner_client: AsyncClient, owned_run): + """Owner can delete their own run.""" + response = await owner_client.delete(f"{RUNS_BASE}/{owned_run.id}") + assert response.status_code == 204 + + async def test_non_owner_cannot_delete(self, other_client: AsyncClient, owned_run): + """Non-owner gets 403 when trying to delete.""" + response = await other_client.delete(f"{RUNS_BASE}/{owned_run.id}") + assert response.status_code == 403 + + async def test_unauthenticated_cannot_delete(self, client: AsyncClient, owned_run): + """Unauthenticated user gets 401.""" + response = await client.delete(f"{RUNS_BASE}/{owned_run.id}") + assert response.status_code == 401 + + async def test_unowned_run_rejects_all_deletes( + self, owner_client: AsyncClient, unowned_run + ): + """Unowned (legacy) runs cannot be deleted by anyone.""" + response = await owner_client.delete(f"{RUNS_BASE}/{unowned_run.id}") + assert response.status_code == 403 + + +class TestEncounterCreateOwnership: + """Tests for encounter POST ownership enforcement.""" + + @pytest.fixture + async def pokemon(self, db_session: AsyncSession): + """Create a test Pokemon.""" + from app.models.pokemon import Pokemon + + pokemon = Pokemon( + pokeapi_id=25, national_dex=25, name="pikachu", types=["electric"] + ) + db_session.add(pokemon) + await db_session.commit() + await db_session.refresh(pokemon) + return pokemon + + @pytest.fixture + async def route(self, db_session: AsyncSession, game: Game): + """Create a test route.""" + from app.models.route import Route + from app.models.version_group import VersionGroup + + vg = VersionGroup(name="Test VG", slug="test-vg") + db_session.add(vg) + await db_session.flush() + + game.version_group_id = vg.id + await db_session.flush() + + route = Route(name="Test Route", version_group_id=vg.id, order=1) + db_session.add(route) + await db_session.commit() + await db_session.refresh(route) + return route + + async def test_owner_can_create_encounter( + self, owner_client: AsyncClient, owned_run, pokemon, route + ): + """Owner can create encounters on their own run.""" + response = await owner_client.post( + f"{RUNS_BASE}/{owned_run.id}/encounters", + json={ + "routeId": route.id, + "pokemonId": pokemon.id, + "status": "caught", + }, + ) + assert response.status_code == 201 + + async def test_non_owner_cannot_create_encounter( + self, other_client: AsyncClient, owned_run, pokemon, route + ): + """Non-owner gets 403 when trying to create encounters.""" + response = await other_client.post( + f"{RUNS_BASE}/{owned_run.id}/encounters", + json={ + "routeId": route.id, + "pokemonId": pokemon.id, + "status": "caught", + }, + ) + assert response.status_code == 403 + + async def test_unauthenticated_cannot_create_encounter( + self, client: AsyncClient, owned_run, pokemon, route + ): + """Unauthenticated user gets 401.""" + response = await client.post( + f"{RUNS_BASE}/{owned_run.id}/encounters", + json={ + "routeId": route.id, + "pokemonId": pokemon.id, + "status": "caught", + }, + ) + assert response.status_code == 401 + + +class TestEncounterUpdateOwnership: + """Tests for encounter PATCH ownership enforcement.""" + + @pytest.fixture + async def encounter(self, db_session: AsyncSession, owned_run): + """Create a test encounter.""" + from app.models.encounter import Encounter + from app.models.pokemon import Pokemon + from app.models.route import Route + from app.models.version_group import VersionGroup + + pokemon = Pokemon( + pokeapi_id=25, national_dex=25, name="pikachu", types=["electric"] + ) + db_session.add(pokemon) + await db_session.flush() + + game = await db_session.get(Game, owned_run.game_id) + if game.version_group_id is None: + vg = VersionGroup(name="Test VG", slug="test-vg") + db_session.add(vg) + await db_session.flush() + game.version_group_id = vg.id + await db_session.flush() + else: + vg = await db_session.get(VersionGroup, game.version_group_id) + + route = Route(name="Test Route", version_group_id=vg.id, order=1) + db_session.add(route) + await db_session.flush() + + encounter = Encounter( + run_id=owned_run.id, + route_id=route.id, + pokemon_id=pokemon.id, + status="caught", + ) + db_session.add(encounter) + await db_session.commit() + await db_session.refresh(encounter) + return encounter + + async def test_owner_can_update_encounter( + self, owner_client: AsyncClient, encounter + ): + """Owner can update encounters on their own run.""" + response = await owner_client.patch( + f"{ENC_BASE}/{encounter.id}", json={"nickname": "Sparky"} + ) + assert response.status_code == 200 + assert response.json()["nickname"] == "Sparky" + + async def test_non_owner_cannot_update_encounter( + self, other_client: AsyncClient, encounter + ): + """Non-owner gets 403 when trying to update encounters.""" + response = await other_client.patch( + f"{ENC_BASE}/{encounter.id}", json={"nickname": "Stolen"} + ) + assert response.status_code == 403 + + async def test_unauthenticated_cannot_update_encounter( + self, client: AsyncClient, encounter + ): + """Unauthenticated user gets 401.""" + response = await client.patch( + f"{ENC_BASE}/{encounter.id}", json={"nickname": "Stolen"} + ) + assert response.status_code == 401 + + +class TestEncounterDeleteOwnership: + """Tests for encounter DELETE ownership enforcement.""" + + @pytest.fixture + async def encounter(self, db_session: AsyncSession, owned_run): + """Create a test encounter.""" + from app.models.encounter import Encounter + from app.models.pokemon import Pokemon + from app.models.route import Route + from app.models.version_group import VersionGroup + + pokemon = Pokemon( + pokeapi_id=25, national_dex=25, name="pikachu", types=["electric"] + ) + db_session.add(pokemon) + await db_session.flush() + + game = await db_session.get(Game, owned_run.game_id) + if game.version_group_id is None: + vg = VersionGroup(name="Test VG", slug="test-vg") + db_session.add(vg) + await db_session.flush() + game.version_group_id = vg.id + await db_session.flush() + else: + vg = await db_session.get(VersionGroup, game.version_group_id) + + route = Route(name="Test Route", version_group_id=vg.id, order=1) + db_session.add(route) + await db_session.flush() + + encounter = Encounter( + run_id=owned_run.id, + route_id=route.id, + pokemon_id=pokemon.id, + status="caught", + ) + db_session.add(encounter) + await db_session.commit() + await db_session.refresh(encounter) + return encounter + + async def test_owner_can_delete_encounter( + self, owner_client: AsyncClient, encounter + ): + """Owner can delete encounters on their own run.""" + response = await owner_client.delete(f"{ENC_BASE}/{encounter.id}") + assert response.status_code == 204 + + async def test_non_owner_cannot_delete_encounter( + self, other_client: AsyncClient, encounter + ): + """Non-owner gets 403 when trying to delete encounters.""" + response = await other_client.delete(f"{ENC_BASE}/{encounter.id}") + assert response.status_code == 403 + + async def test_unauthenticated_cannot_delete_encounter( + self, client: AsyncClient, encounter + ): + """Unauthenticated user gets 401.""" + response = await client.delete(f"{ENC_BASE}/{encounter.id}") + assert response.status_code == 401 + + +class TestRunVisibilityPreserved: + """Verify read access still works for public runs.""" + + async def test_non_owner_can_read_public_run( + self, other_client: AsyncClient, owned_run + ): + """Non-owner can read (but not modify) a public run.""" + response = await other_client.get(f"{RUNS_BASE}/{owned_run.id}") + assert response.status_code == 200 + assert response.json()["id"] == owned_run.id + + async def test_unauthenticated_can_read_public_run( + self, client: AsyncClient, owned_run + ): + """Unauthenticated user can read a public run.""" + response = await client.get(f"{RUNS_BASE}/{owned_run.id}") + assert response.status_code == 200 diff --git a/backend/tests/test_runs.py b/backend/tests/test_runs.py index ed48f01..651234b 100644 --- a/backend/tests/test_runs.py +++ b/backend/tests/test_runs.py @@ -1,5 +1,7 @@ """Integration tests for the Runs & Encounters API.""" +from uuid import UUID + import pytest from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession @@ -8,8 +10,11 @@ from app.models.game import Game from app.models.nuzlocke_run import NuzlockeRun from app.models.pokemon import Pokemon from app.models.route import Route +from app.models.user import User from app.models.version_group import VersionGroup +MOCK_AUTH_USER_ID = UUID("00000000-0000-4000-a000-000000000001") + RUNS_BASE = "/api/v1/runs" ENC_BASE = "/api/v1/encounters" @@ -42,6 +47,11 @@ async def run(auth_client: AsyncClient, game_id: int) -> dict: @pytest.fixture async def enc_ctx(db_session: AsyncSession) -> dict: """Full context for encounter tests: game, run, pokemon, standalone and grouped routes.""" + # Create the mock auth user to own the run + user = User(id=MOCK_AUTH_USER_ID, email="test@example.com") + db_session.add(user) + await db_session.flush() + vg = VersionGroup(name="Enc Test VG", slug="enc-test-vg") db_session.add(vg) await db_session.flush() @@ -83,6 +93,7 @@ async def enc_ctx(db_session: AsyncSession) -> dict: run = NuzlockeRun( game_id=game.id, + owner_id=user.id, name="Enc Run", status="active", rules={"shinyClause": True, "giftClause": False},