feat: add auth system, boss pokemon details, moves/abilities API, and run ownership
Some checks failed
CI / backend-tests (push) Failing after 1m16s
CI / frontend-tests (push) Successful in 57s

Add user authentication with login/signup/protected routes, boss pokemon
detail fields and result team tracking, moves and abilities selector
components and API, run ownership and visibility controls, and various
UI improvements across encounters, run list, and journal pages.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-20 21:41:38 +01:00
parent a6cb309b8b
commit 0a519e356e
69 changed files with 3574 additions and 693 deletions

View File

@@ -7,3 +7,8 @@ API_V1_PREFIX="/api/v1"
# Database settings
DATABASE_URL="sqlite:///./nuzlocke.db"
# Supabase Auth
SUPABASE_URL=https://your-project.supabase.co
SUPABASE_ANON_KEY=your-anon-key
SUPABASE_JWT_SECRET=your-jwt-secret

View File

@@ -13,6 +13,7 @@ dependencies = [
"sqlalchemy[asyncio]==2.0.48",
"asyncpg==0.31.0",
"alembic==1.18.4",
"PyJWT==2.10.1",
]
[project.optional-dependencies]

View File

@@ -0,0 +1,81 @@
#!/usr/bin/env python3
"""Assign existing unowned runs to a user.
Usage:
cd backend && uv run python scripts/assign_unowned_runs.py <user_uuid>
This script assigns all runs without an owner to the specified user.
Useful for migrating existing data after implementing user ownership.
"""
import asyncio
import sys
from uuid import UUID
from sqlalchemy import select, update
sys.path.insert(0, "src")
from app.core.database import async_session # noqa: E402
from app.models.nuzlocke_run import NuzlockeRun # noqa: E402
from app.models.user import User # noqa: E402
async def main(user_uuid: str) -> None:
try:
user_id = UUID(user_uuid)
except ValueError:
print(f"Error: Invalid UUID format: {user_uuid}")
sys.exit(1)
async with async_session() as session:
# Verify user exists
user_result = await session.execute(select(User).where(User.id == user_id))
user = user_result.scalar_one_or_none()
if user is None:
print(f"Error: User {user_id} not found")
sys.exit(1)
print(f"Found user: {user.email} (display_name: {user.display_name})")
# Count unowned runs
count_result = await session.execute(
select(NuzlockeRun.id, NuzlockeRun.name).where(
NuzlockeRun.owner_id.is_(None)
)
)
unowned_runs = count_result.all()
if not unowned_runs:
print("No unowned runs found.")
return
print(f"\nFound {len(unowned_runs)} unowned run(s):")
for run_id, run_name in unowned_runs:
print(f" - [{run_id}] {run_name}")
# Confirm action
confirm = input(f"\nAssign all {len(unowned_runs)} runs to this user? [y/N] ")
if confirm.lower() != "y":
print("Aborted.")
return
# Perform the update
await session.execute(
update(NuzlockeRun)
.where(NuzlockeRun.owner_id.is_(None))
.values(owner_id=user_id)
)
await session.commit()
print(f"\nAssigned {len(unowned_runs)} run(s) to user {user.email}")
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: python scripts/assign_unowned_runs.py <user_uuid>")
print("\nExample:")
print(" uv run python scripts/assign_unowned_runs.py 550e8400-e29b-41d4-a716-446655440000")
sys.exit(1)
asyncio.run(main(sys.argv[1]))

View File

@@ -0,0 +1,62 @@
"""add boss pokemon details
Revision ID: l3a4b5c6d7e8
Revises: k2f3a4b5c6d7
Create Date: 2026-03-20 19:30:00.000000
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "l3a4b5c6d7e8"
down_revision: str | Sequence[str] | None = "k2f3a4b5c6d7"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
# Add ability reference
op.add_column(
"boss_pokemon",
sa.Column(
"ability_id", sa.Integer(), sa.ForeignKey("abilities.id"), nullable=True
),
)
op.create_index("ix_boss_pokemon_ability_id", "boss_pokemon", ["ability_id"])
# Add held item (plain string)
op.add_column(
"boss_pokemon",
sa.Column("held_item", sa.String(50), nullable=True),
)
# Add nature (plain string)
op.add_column(
"boss_pokemon",
sa.Column("nature", sa.String(20), nullable=True),
)
# Add move references (up to 4 moves)
for i in range(1, 5):
op.add_column(
"boss_pokemon",
sa.Column(
f"move{i}_id", sa.Integer(), sa.ForeignKey("moves.id"), nullable=True
),
)
op.create_index(f"ix_boss_pokemon_move{i}_id", "boss_pokemon", [f"move{i}_id"])
def downgrade() -> None:
for i in range(1, 5):
op.drop_index(f"ix_boss_pokemon_move{i}_id", "boss_pokemon")
op.drop_column("boss_pokemon", f"move{i}_id")
op.drop_column("boss_pokemon", "nature")
op.drop_column("boss_pokemon", "held_item")
op.drop_index("ix_boss_pokemon_ability_id", "boss_pokemon")
op.drop_column("boss_pokemon", "ability_id")

View File

@@ -0,0 +1,44 @@
"""add boss result team
Revision ID: m4b5c6d7e8f9
Revises: l3a4b5c6d7e8
Create Date: 2026-03-20 20:00:00.000000
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "m4b5c6d7e8f9"
down_revision: str | Sequence[str] | None = "l3a4b5c6d7e8"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.create_table(
"boss_result_team",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column(
"boss_result_id",
sa.Integer(),
sa.ForeignKey("boss_results.id", ondelete="CASCADE"),
nullable=False,
index=True,
),
sa.Column(
"encounter_id",
sa.Integer(),
sa.ForeignKey("encounters.id", ondelete="CASCADE"),
nullable=False,
index=True,
),
sa.Column("level", sa.SmallInteger(), nullable=False),
)
def downgrade() -> None:
op.drop_table("boss_result_team")

View File

@@ -0,0 +1,37 @@
"""create users table
Revision ID: n5c6d7e8f9a0
Revises: m4b5c6d7e8f9
Create Date: 2026-03-20 22:00:00.000000
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "n5c6d7e8f9a0"
down_revision: str | Sequence[str] | None = "m4b5c6d7e8f9"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.create_table(
"users",
sa.Column("id", sa.UUID(), primary_key=True),
sa.Column("email", sa.String(255), nullable=False, unique=True, index=True),
sa.Column("display_name", sa.String(100), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
)
def downgrade() -> None:
op.drop_table("users")

View File

@@ -0,0 +1,60 @@
"""add owner_id and visibility to runs
Revision ID: o6d7e8f9a0b1
Revises: n5c6d7e8f9a0
Create Date: 2026-03-20 22:01:00.000000
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "o6d7e8f9a0b1"
down_revision: str | Sequence[str] | None = "n5c6d7e8f9a0"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
# Create visibility enum
visibility_enum = sa.Enum("public", "private", name="run_visibility")
visibility_enum.create(op.get_bind(), checkfirst=True)
# Add owner_id (nullable FK to users)
op.add_column(
"nuzlocke_runs",
sa.Column("owner_id", sa.UUID(), nullable=True),
)
op.create_foreign_key(
"fk_nuzlocke_runs_owner_id",
"nuzlocke_runs",
"users",
["owner_id"],
["id"],
ondelete="SET NULL",
)
op.create_index("ix_nuzlocke_runs_owner_id", "nuzlocke_runs", ["owner_id"])
# Add visibility column with default 'public'
op.add_column(
"nuzlocke_runs",
sa.Column(
"visibility",
visibility_enum,
nullable=False,
server_default="public",
),
)
def downgrade() -> None:
op.drop_column("nuzlocke_runs", "visibility")
op.drop_index("ix_nuzlocke_runs_owner_id", table_name="nuzlocke_runs")
op.drop_constraint("fk_nuzlocke_runs_owner_id", "nuzlocke_runs", type_="foreignkey")
op.drop_column("nuzlocke_runs", "owner_id")
# Drop the enum type
sa.Enum(name="run_visibility").drop(op.get_bind(), checkfirst=True)

View File

@@ -5,10 +5,13 @@ from sqlalchemy import or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.auth import AuthUser, require_auth
from app.core.database import get_session
from app.models.boss_battle import BossBattle
from app.models.boss_pokemon import BossPokemon
from app.models.boss_result import BossResult
from app.models.boss_result_team import BossResultTeam
from app.models.encounter import Encounter
from app.models.game import Game
from app.models.nuzlocke_run import NuzlockeRun
from app.models.pokemon import Pokemon
@@ -28,6 +31,18 @@ from app.seeds.loader import upsert_bosses
router = APIRouter()
def _boss_pokemon_load_options():
"""Standard eager-loading options for BossPokemon relationships."""
return (
selectinload(BossBattle.pokemon).selectinload(BossPokemon.pokemon),
selectinload(BossBattle.pokemon).selectinload(BossPokemon.ability),
selectinload(BossBattle.pokemon).selectinload(BossPokemon.move1),
selectinload(BossBattle.pokemon).selectinload(BossPokemon.move2),
selectinload(BossBattle.pokemon).selectinload(BossPokemon.move3),
selectinload(BossBattle.pokemon).selectinload(BossPokemon.move4),
)
async def _get_version_group_id(session: AsyncSession, game_id: int) -> int:
game = await session.get(Game, game_id)
if game is None:
@@ -53,7 +68,7 @@ async def list_bosses(
query = (
select(BossBattle)
.where(BossBattle.version_group_id == vg_id)
.options(selectinload(BossBattle.pokemon).selectinload(BossPokemon.pokemon))
.options(*_boss_pokemon_load_options())
.order_by(BossBattle.order)
)
@@ -71,6 +86,7 @@ async def reorder_bosses(
game_id: int,
data: BossReorderRequest,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
vg_id = await _get_version_group_id(session, game_id)
@@ -101,7 +117,7 @@ async def reorder_bosses(
result = await session.execute(
select(BossBattle)
.where(BossBattle.version_group_id == vg_id)
.options(selectinload(BossBattle.pokemon).selectinload(BossPokemon.pokemon))
.options(*_boss_pokemon_load_options())
.order_by(BossBattle.order)
)
return result.scalars().all()
@@ -114,6 +130,7 @@ async def create_boss(
game_id: int,
data: BossBattleCreate,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
vg_id = await _get_version_group_id(session, game_id)
@@ -133,7 +150,7 @@ async def create_boss(
result = await session.execute(
select(BossBattle)
.where(BossBattle.id == boss.id)
.options(selectinload(BossBattle.pokemon).selectinload(BossPokemon.pokemon))
.options(*_boss_pokemon_load_options())
)
return result.scalar_one()
@@ -144,6 +161,7 @@ async def update_boss(
boss_id: int,
data: BossBattleUpdate,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
vg_id = await _get_version_group_id(session, game_id)
@@ -158,7 +176,7 @@ async def update_boss(
result = await session.execute(
select(BossBattle)
.where(BossBattle.id == boss_id, BossBattle.version_group_id == vg_id)
.options(selectinload(BossBattle.pokemon).selectinload(BossPokemon.pokemon))
.options(*_boss_pokemon_load_options())
)
boss = result.scalar_one_or_none()
if boss is None:
@@ -174,7 +192,7 @@ async def update_boss(
result = await session.execute(
select(BossBattle)
.where(BossBattle.id == boss.id)
.options(selectinload(BossBattle.pokemon).selectinload(BossPokemon.pokemon))
.options(*_boss_pokemon_load_options())
)
return result.scalar_one()
@@ -184,6 +202,7 @@ async def delete_boss(
game_id: int,
boss_id: int,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
vg_id = await _get_version_group_id(session, game_id)
@@ -206,6 +225,7 @@ async def bulk_import_bosses(
game_id: int,
items: list[BulkBossItem],
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
vg_id = await _get_version_group_id(session, game_id)
@@ -248,6 +268,7 @@ async def set_boss_team(
boss_id: int,
team: list[BossPokemonInput],
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
vg_id = await _get_version_group_id(session, game_id)
@@ -272,6 +293,13 @@ async def set_boss_team(
level=item.level,
order=item.order,
condition_label=item.condition_label,
ability_id=item.ability_id,
held_item=item.held_item,
nature=item.nature,
move1_id=item.move1_id,
move2_id=item.move2_id,
move3_id=item.move3_id,
move4_id=item.move4_id,
)
session.add(bp)
@@ -286,7 +314,7 @@ async def set_boss_team(
result = await session.execute(
select(BossBattle)
.where(BossBattle.id == boss.id)
.options(selectinload(BossBattle.pokemon).selectinload(BossPokemon.pokemon))
.options(*_boss_pokemon_load_options())
)
return result.scalar_one()
@@ -301,7 +329,10 @@ async def list_boss_results(run_id: int, session: AsyncSession = Depends(get_ses
raise HTTPException(status_code=404, detail="Run not found")
result = await session.execute(
select(BossResult).where(BossResult.run_id == run_id).order_by(BossResult.id)
select(BossResult)
.where(BossResult.run_id == run_id)
.options(selectinload(BossResult.team))
.order_by(BossResult.id)
)
return result.scalars().all()
@@ -313,6 +344,7 @@ async def create_boss_result(
run_id: int,
data: BossResultCreate,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
run = await session.get(NuzlockeRun, run_id)
if run is None:
@@ -322,12 +354,30 @@ async def create_boss_result(
if boss is None:
raise HTTPException(status_code=404, detail="Boss battle not found")
# Validate team encounter IDs belong to this run
if data.team:
encounter_ids = [t.encounter_id for t in data.team]
enc_result = await session.execute(
select(Encounter).where(
Encounter.id.in_(encounter_ids), Encounter.run_id == run_id
)
)
found_encounters = {e.id for e in enc_result.scalars().all()}
missing = [eid for eid in encounter_ids if eid not in found_encounters]
if missing:
raise HTTPException(
status_code=400,
detail=f"Encounters not found in this run: {missing}",
)
# Check for existing result (upsert)
existing = await session.execute(
select(BossResult).where(
select(BossResult)
.where(
BossResult.run_id == run_id,
BossResult.boss_battle_id == data.boss_battle_id,
)
.options(selectinload(BossResult.team))
)
result = existing.scalar_one_or_none()
@@ -335,6 +385,10 @@ async def create_boss_result(
result.result = data.result
result.attempts = data.attempts
result.completed_at = datetime.now(UTC) if data.result == "won" else None
# Clear existing team and add new
for tm in result.team:
await session.delete(tm)
await session.flush()
else:
result = BossResult(
run_id=run_id,
@@ -344,10 +398,26 @@ async def create_boss_result(
completed_at=datetime.now(UTC) if data.result == "won" else None,
)
session.add(result)
await session.flush()
# Add team members
for tm in data.team:
team_member = BossResultTeam(
boss_result_id=result.id,
encounter_id=tm.encounter_id,
level=tm.level,
)
session.add(team_member)
await session.commit()
await session.refresh(result)
return result
# Re-fetch with team loaded
fresh = await session.execute(
select(BossResult)
.where(BossResult.id == result.id)
.options(selectinload(BossResult.team))
)
return fresh.scalar_one()
@router.delete("/runs/{run_id}/boss-results/{result_id}", status_code=204)
@@ -355,6 +425,7 @@ async def delete_boss_result(
run_id: int,
result_id: int,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
result = await session.execute(
select(BossResult).where(

View File

@@ -5,6 +5,7 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload, selectinload
from app.core.auth import AuthUser, require_auth
from app.core.database import get_session
from app.models.encounter import Encounter
from app.models.evolution import Evolution
@@ -35,6 +36,7 @@ async def create_encounter(
run_id: int,
data: EncounterCreate,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
# Validate run exists
run = await session.get(NuzlockeRun, run_id)
@@ -137,6 +139,7 @@ async def update_encounter(
encounter_id: int,
data: EncounterUpdate,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
encounter = await session.get(Encounter, encounter_id)
if encounter is None:
@@ -163,7 +166,9 @@ async def update_encounter(
@router.delete("/encounters/{encounter_id}", status_code=204)
async def delete_encounter(
encounter_id: int, session: AsyncSession = Depends(get_session)
encounter_id: int,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
encounter = await session.get(Encounter, encounter_id)
if encounter is None:
@@ -195,6 +200,7 @@ async def delete_encounter(
async def bulk_randomize_encounters(
run_id: int,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
# 1. Validate run
run = await session.get(NuzlockeRun, run_id)

View File

@@ -6,6 +6,7 @@ from sqlalchemy import delete, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.auth import AuthUser, require_auth
from app.core.database import get_session
from app.models.boss_battle import BossBattle
from app.models.game import Game
@@ -228,7 +229,11 @@ async def list_game_routes(
@router.post("", response_model=GameResponse, status_code=201)
async def create_game(data: GameCreate, session: AsyncSession = Depends(get_session)):
async def create_game(
data: GameCreate,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
existing = await session.execute(select(Game).where(Game.slug == data.slug))
if existing.scalar_one_or_none() is not None:
raise HTTPException(
@@ -244,7 +249,10 @@ async def create_game(data: GameCreate, session: AsyncSession = Depends(get_sess
@router.put("/{game_id}", response_model=GameResponse)
async def update_game(
game_id: int, data: GameUpdate, session: AsyncSession = Depends(get_session)
game_id: int,
data: GameUpdate,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
game = await session.get(Game, game_id)
if game is None:
@@ -269,7 +277,11 @@ async def update_game(
@router.delete("/{game_id}", status_code=204)
async def delete_game(game_id: int, session: AsyncSession = Depends(get_session)):
async def delete_game(
game_id: int,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
result = await session.execute(
select(Game).where(Game.id == game_id).options(selectinload(Game.runs))
)
@@ -323,7 +335,10 @@ async def delete_game(game_id: int, session: AsyncSession = Depends(get_session)
@router.post("/{game_id}/routes", response_model=RouteResponse, status_code=201)
async def create_route(
game_id: int, data: RouteCreate, session: AsyncSession = Depends(get_session)
game_id: int,
data: RouteCreate,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
vg_id = await _get_version_group_id(session, game_id)
@@ -339,6 +354,7 @@ async def reorder_routes(
game_id: int,
data: RouteReorderRequest,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
vg_id = await _get_version_group_id(session, game_id)
@@ -365,6 +381,7 @@ async def update_route(
route_id: int,
data: RouteUpdate,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
vg_id = await _get_version_group_id(session, game_id)
@@ -385,6 +402,7 @@ async def delete_route(
game_id: int,
route_id: int,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
vg_id = await _get_version_group_id(session, game_id)
@@ -419,6 +437,7 @@ async def bulk_import_routes(
game_id: int,
items: list[BulkRouteItem],
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
vg_id = await _get_version_group_id(session, game_id)

View File

@@ -6,6 +6,7 @@ from sqlalchemy import update as sa_update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.auth import AuthUser, require_auth
from app.core.database import get_session
from app.models.encounter import Encounter
from app.models.evolution import Evolution
@@ -437,7 +438,9 @@ async def get_genlocke_lineages(
@router.post("", response_model=GenlockeResponse, status_code=201)
async def create_genlocke(
data: GenlockeCreate, session: AsyncSession = Depends(get_session)
data: GenlockeCreate,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
if not data.game_ids:
raise HTTPException(status_code=400, detail="At least one game is required")
@@ -568,6 +571,7 @@ async def advance_leg(
leg_order: int,
data: AdvanceLegRequest | None = None,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
# Load genlocke with legs
result = await session.execute(
@@ -822,6 +826,7 @@ async def update_genlocke(
genlocke_id: int,
data: GenlockeUpdate,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
result = await session.execute(
select(Genlocke)
@@ -858,6 +863,7 @@ async def update_genlocke(
async def delete_genlocke(
genlocke_id: int,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
genlocke = await session.get(Genlocke, genlocke_id)
if genlocke is None:
@@ -889,6 +895,7 @@ async def add_leg(
genlocke_id: int,
data: AddLegRequest,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
genlocke = await session.get(Genlocke, genlocke_id)
if genlocke is None:
@@ -931,6 +938,7 @@ async def remove_leg(
genlocke_id: int,
leg_id: int,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
result = await session.execute(
select(GenlockeLeg).where(

View File

@@ -5,6 +5,7 @@ from fastapi import APIRouter, Depends, HTTPException, Response
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.auth import AuthUser, require_auth
from app.core.database import get_session
from app.models.boss_result import BossResult
from app.models.journal_entry import JournalEntry
@@ -45,6 +46,7 @@ async def create_journal_entry(
run_id: int,
data: JournalEntryCreate,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
# Validate run exists
run = await session.get(NuzlockeRun, run_id)
@@ -97,6 +99,7 @@ async def update_journal_entry(
entry_id: UUID,
data: JournalEntryUpdate,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
result = await session.execute(
select(JournalEntry).where(
@@ -135,6 +138,7 @@ async def delete_journal_entry(
run_id: int,
entry_id: UUID,
session: AsyncSession = Depends(get_session),
_user: AuthUser = Depends(require_auth),
):
result = await session.execute(
select(JournalEntry).where(

View File

@@ -0,0 +1,95 @@
from fastapi import APIRouter, Depends, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_session
from app.models.ability import Ability
from app.models.move import Move
from app.schemas.move import (
AbilityResponse,
MoveResponse,
PaginatedAbilityResponse,
PaginatedMoveResponse,
)
router = APIRouter()
@router.get("/moves", response_model=PaginatedMoveResponse)
async def list_moves(
search: str | None = None,
limit: int = Query(default=20, ge=1, le=100),
offset: int = Query(default=0, ge=0),
session: AsyncSession = Depends(get_session),
):
query = select(Move)
if search:
query = query.where(Move.name.ilike(f"%{search}%"))
query = query.order_by(Move.name).offset(offset).limit(limit)
result = await session.execute(query)
items = result.scalars().all()
# Count total
count_query = select(func.count()).select_from(Move)
if search:
count_query = count_query.where(Move.name.ilike(f"%{search}%"))
total_result = await session.execute(count_query)
total = total_result.scalar() or 0
return PaginatedMoveResponse(items=items, total=total, limit=limit, offset=offset)
@router.get("/moves/{move_id}", response_model=MoveResponse)
async def get_move(
move_id: int,
session: AsyncSession = Depends(get_session),
):
move = await session.get(Move, move_id)
if move is None:
from fastapi import HTTPException
raise HTTPException(status_code=404, detail="Move not found")
return move
@router.get("/abilities", response_model=PaginatedAbilityResponse)
async def list_abilities(
search: str | None = None,
limit: int = Query(default=20, ge=1, le=100),
offset: int = Query(default=0, ge=0),
session: AsyncSession = Depends(get_session),
):
query = select(Ability)
if search:
query = query.where(Ability.name.ilike(f"%{search}%"))
query = query.order_by(Ability.name).offset(offset).limit(limit)
result = await session.execute(query)
items = result.scalars().all()
# Count total
count_query = select(func.count()).select_from(Ability)
if search:
count_query = count_query.where(Ability.name.ilike(f"%{search}%"))
total_result = await session.execute(count_query)
total = total_result.scalar() or 0
return PaginatedAbilityResponse(
items=items, total=total, limit=limit, offset=offset
)
@router.get("/abilities/{ability_id}", response_model=AbilityResponse)
async def get_ability(
ability_id: int,
session: AsyncSession = Depends(get_session),
):
ability = await session.get(Ability, ability_id)
if ability is None:
from fastapi import HTTPException
raise HTTPException(status_code=404, detail="Ability not found")
return ability

View File

@@ -9,13 +9,16 @@ from app.api import (
genlockes,
health,
journal_entries,
moves_abilities,
pokemon,
runs,
stats,
users,
)
api_router = APIRouter()
api_router.include_router(health.router)
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(games.router, prefix="/games", tags=["games"])
api_router.include_router(pokemon.router, tags=["pokemon"])
api_router.include_router(evolutions.router, tags=["evolutions"])
@@ -25,4 +28,5 @@ api_router.include_router(genlockes.router, prefix="/genlockes", tags=["genlocke
api_router.include_router(encounters.router, tags=["encounters"])
api_router.include_router(stats.router, prefix="/stats", tags=["stats"])
api_router.include_router(bosses.router, tags=["bosses"])
api_router.include_router(moves_abilities.router, tags=["moves", "abilities"])
api_router.include_router(export.router, prefix="/export", tags=["export"])

View File

@@ -1,10 +1,12 @@
from datetime import UTC, datetime
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Response
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload, selectinload
from app.core.auth import AuthUser, get_current_user, require_auth
from app.core.database import get_session
from app.models.boss_result import BossResult
from app.models.encounter import Encounter
@@ -12,8 +14,10 @@ from app.models.evolution import Evolution
from app.models.game import Game
from app.models.genlocke import GenlockeLeg
from app.models.genlocke_transfer import GenlockeTransfer
from app.models.nuzlocke_run import NuzlockeRun
from app.models.nuzlocke_run import NuzlockeRun, RunVisibility
from app.models.user import User
from app.schemas.run import (
OwnerResponse,
RunCreate,
RunDetailResponse,
RunGenlockeContext,
@@ -157,41 +161,136 @@ async def _compute_lineage_suggestion(
return f"{base_name} {numeral}"
def _build_run_response(run: NuzlockeRun) -> RunResponse:
"""Build RunResponse with owner info if present."""
owner = None
if run.owner:
owner = OwnerResponse(id=run.owner.id, display_name=run.owner.display_name)
return RunResponse(
id=run.id,
game_id=run.game_id,
name=run.name,
status=run.status,
rules=run.rules,
hof_encounter_ids=run.hof_encounter_ids,
naming_scheme=run.naming_scheme,
visibility=run.visibility,
owner=owner,
started_at=run.started_at,
completed_at=run.completed_at,
)
def _check_run_access(
run: NuzlockeRun, user: AuthUser | None, require_owner: bool = False
) -> None:
"""
Check if user can access the run.
Raises 403 for private runs if user is not owner.
If require_owner=True, always requires ownership (for mutations).
"""
if run.owner_id is None:
# Unowned runs are accessible by everyone (legacy)
if require_owner:
raise HTTPException(
status_code=403, detail="Only the run owner can perform this action"
)
return
user_id = UUID(user.id) if user else None
if require_owner:
if user_id != run.owner_id:
raise HTTPException(
status_code=403, detail="Only the run owner can perform this action"
)
return
if run.visibility == RunVisibility.PRIVATE and user_id != run.owner_id:
raise HTTPException(status_code=403, detail="This run is private")
@router.post("", response_model=RunResponse, status_code=201)
async def create_run(data: RunCreate, session: AsyncSession = Depends(get_session)):
async def create_run(
data: RunCreate,
session: AsyncSession = Depends(get_session),
user: AuthUser = Depends(require_auth),
):
# Validate game exists
game = await session.get(Game, data.game_id)
if game is None:
raise HTTPException(status_code=404, detail="Game not found")
# Ensure user exists in local DB
user_id = UUID(user.id)
db_user = await session.get(User, user_id)
if db_user is None:
db_user = User(id=user_id, email=user.email or "")
session.add(db_user)
run = NuzlockeRun(
game_id=data.game_id,
owner_id=user_id,
name=data.name,
status="active",
visibility=data.visibility,
rules=data.rules,
naming_scheme=data.naming_scheme,
)
session.add(run)
await session.commit()
await session.refresh(run)
return run
# Reload with owner relationship
result = await session.execute(
select(NuzlockeRun)
.where(NuzlockeRun.id == run.id)
.options(joinedload(NuzlockeRun.owner))
)
run = result.scalar_one()
return _build_run_response(run)
@router.get("", response_model=list[RunResponse])
async def list_runs(session: AsyncSession = Depends(get_session)):
result = await session.execute(
select(NuzlockeRun).order_by(NuzlockeRun.started_at.desc())
)
return result.scalars().all()
async def list_runs(
request: Request,
session: AsyncSession = Depends(get_session),
user: AuthUser | None = Depends(get_current_user),
):
"""
List runs. Shows public runs and user's own private runs.
"""
query = select(NuzlockeRun).options(joinedload(NuzlockeRun.owner))
if user:
user_id = UUID(user.id)
# Show public runs OR runs owned by current user
query = query.where(
(NuzlockeRun.visibility == RunVisibility.PUBLIC)
| (NuzlockeRun.owner_id == user_id)
)
else:
# Anonymous: only public runs
query = query.where(NuzlockeRun.visibility == RunVisibility.PUBLIC)
query = query.order_by(NuzlockeRun.started_at.desc())
result = await session.execute(query)
runs = result.scalars().all()
return [_build_run_response(run) for run in runs]
@router.get("/{run_id}", response_model=RunDetailResponse)
async def get_run(run_id: int, session: AsyncSession = Depends(get_session)):
async def get_run(
run_id: int,
request: Request,
session: AsyncSession = Depends(get_session),
user: AuthUser | None = Depends(get_current_user),
):
result = await session.execute(
select(NuzlockeRun)
.where(NuzlockeRun.id == run_id)
.options(
joinedload(NuzlockeRun.game),
joinedload(NuzlockeRun.owner),
selectinload(NuzlockeRun.encounters).joinedload(Encounter.pokemon),
selectinload(NuzlockeRun.encounters).joinedload(Encounter.current_pokemon),
selectinload(NuzlockeRun.encounters).joinedload(Encounter.route),
@@ -201,6 +300,9 @@ async def get_run(run_id: int, session: AsyncSession = Depends(get_session)):
if run is None:
raise HTTPException(status_code=404, detail="Run not found")
# Check visibility access
_check_run_access(run, user)
# Check if this run belongs to a genlocke
genlocke_context = None
leg_result = await session.execute(
@@ -262,11 +364,20 @@ async def update_run(
run_id: int,
data: RunUpdate,
session: AsyncSession = Depends(get_session),
user: AuthUser = Depends(require_auth),
):
run = await session.get(NuzlockeRun, run_id)
result = await session.execute(
select(NuzlockeRun)
.where(NuzlockeRun.id == run_id)
.options(joinedload(NuzlockeRun.owner))
)
run = result.scalar_one_or_none()
if run is None:
raise HTTPException(status_code=404, detail="Run not found")
# Check ownership for mutations (unowned runs allow anyone for backwards compat)
_check_run_access(run, user, require_owner=run.owner_id is not None)
update_data = data.model_dump(exclude_unset=True)
# Validate hof_encounter_ids if provided
@@ -352,16 +463,30 @@ async def update_run(
genlocke.status = "completed"
await session.commit()
await session.refresh(run)
return run
# Reload with owner relationship
result = await session.execute(
select(NuzlockeRun)
.where(NuzlockeRun.id == run.id)
.options(joinedload(NuzlockeRun.owner))
)
run = result.scalar_one()
return _build_run_response(run)
@router.delete("/{run_id}", status_code=204)
async def delete_run(run_id: int, session: AsyncSession = Depends(get_session)):
async def delete_run(
run_id: int,
session: AsyncSession = Depends(get_session),
user: AuthUser = Depends(require_auth),
):
run = await session.get(NuzlockeRun, run_id)
if run is None:
raise HTTPException(status_code=404, detail="Run not found")
# Check ownership for deletion (unowned runs allow anyone for backwards compat)
_check_run_access(run, user, require_owner=run.owner_id is not None)
# Block deletion if run is linked to a genlocke leg
leg_result = await session.execute(
select(GenlockeLeg).where(GenlockeLeg.run_id == run_id)

View File

@@ -0,0 +1,106 @@
from uuid import UUID
from fastapi import APIRouter, Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.auth import AuthUser, require_auth
from app.core.database import get_session
from app.models.user import User
from app.schemas.base import CamelModel
router = APIRouter()
class UserResponse(CamelModel):
id: UUID
email: str
display_name: str | None = None
@router.post("/me", response_model=UserResponse)
async def sync_current_user(
session: AsyncSession = Depends(get_session),
auth_user: AuthUser = Depends(require_auth),
):
"""
Sync the current authenticated user from Supabase to local DB.
Creates user on first login, updates email if changed.
"""
user_id = UUID(auth_user.id)
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user is None:
# First login - create user record
user = User(
id=user_id,
email=auth_user.email or "",
display_name=None,
)
session.add(user)
elif auth_user.email and user.email != auth_user.email:
# Email changed in Supabase - update local record
user.email = auth_user.email
await session.commit()
await session.refresh(user)
return user
@router.get("/me", response_model=UserResponse)
async def get_current_user(
session: AsyncSession = Depends(get_session),
auth_user: AuthUser = Depends(require_auth),
):
"""Get the current authenticated user's profile."""
user_id = UUID(auth_user.id)
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user is None:
# Auto-create if not exists (shouldn't happen if /me POST is called on login)
user = User(
id=user_id,
email=auth_user.email or "",
display_name=None,
)
session.add(user)
await session.commit()
await session.refresh(user)
return user
class UserUpdateRequest(CamelModel):
display_name: str | None = None
@router.patch("/me", response_model=UserResponse)
async def update_current_user(
data: UserUpdateRequest,
session: AsyncSession = Depends(get_session),
auth_user: AuthUser = Depends(require_auth),
):
"""Update the current user's profile (display name)."""
user_id = UUID(auth_user.id)
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if user is None:
user = User(
id=user_id,
email=auth_user.email or "",
display_name=data.display_name,
)
session.add(user)
else:
if data.display_name is not None:
user.display_name = data.display_name
await session.commit()
await session.refresh(user)
return user

View File

@@ -0,0 +1,83 @@
from dataclasses import dataclass
import jwt
from fastapi import Depends, HTTPException, Request, status
from app.core.config import settings
@dataclass
class AuthUser:
"""Authenticated user info extracted from JWT."""
id: str # Supabase user UUID
email: str | None = None
role: str | None = None
def _extract_token(request: Request) -> str | None:
"""Extract Bearer token from Authorization header."""
auth_header = request.headers.get("Authorization")
if not auth_header:
return None
parts = auth_header.split()
if len(parts) != 2 or parts[0].lower() != "bearer":
return None
return parts[1]
def _verify_jwt(token: str) -> dict | None:
"""Verify JWT against Supabase JWT secret. Returns payload or None."""
if not settings.supabase_jwt_secret:
return None
try:
payload = jwt.decode(
token,
settings.supabase_jwt_secret,
algorithms=["HS256"],
audience="authenticated",
)
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
def get_current_user(request: Request) -> AuthUser | None:
"""
Extract and verify the current user from the request.
Returns AuthUser if valid token, None otherwise.
"""
token = _extract_token(request)
if not token:
return None
payload = _verify_jwt(token)
if not payload:
return None
# Supabase JWT has 'sub' as user ID
user_id = payload.get("sub")
if not user_id:
return None
return AuthUser(
id=user_id,
email=payload.get("email"),
role=payload.get("role"),
)
def require_auth(user: AuthUser | None = Depends(get_current_user)) -> AuthUser:
"""
Dependency that requires authentication.
Raises 401 if no valid token is present.
"""
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required",
headers={"WWW-Authenticate": "Bearer"},
)
return user

View File

@@ -17,5 +17,10 @@ class Settings(BaseSettings):
# Database settings
database_url: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/nuzlocke"
# Supabase Auth
supabase_url: str | None = None
supabase_anon_key: str | None = None
supabase_jwt_secret: str | None = None
settings = Settings()

View File

@@ -2,6 +2,7 @@ from app.models.ability import Ability
from app.models.boss_battle import BossBattle
from app.models.boss_pokemon import BossPokemon
from app.models.boss_result import BossResult
from app.models.boss_result_team import BossResultTeam
from app.models.encounter import Encounter
from app.models.evolution import Evolution
from app.models.game import Game
@@ -13,6 +14,7 @@ from app.models.nuzlocke_run import NuzlockeRun
from app.models.pokemon import Pokemon
from app.models.route import Route
from app.models.route_encounter import RouteEncounter
from app.models.user import User
from app.models.version_group import VersionGroup
__all__ = [
@@ -20,6 +22,7 @@ __all__ = [
"BossBattle",
"BossPokemon",
"BossResult",
"BossResultTeam",
"Encounter",
"Evolution",
"Game",
@@ -32,5 +35,6 @@ __all__ = [
"Pokemon",
"Route",
"RouteEncounter",
"User",
"VersionGroup",
]

View File

@@ -1,8 +1,18 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import ForeignKey, SmallInteger, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.core.database import Base
if TYPE_CHECKING:
from app.models.ability import Ability
from app.models.boss_battle import BossBattle
from app.models.move import Move
from app.models.pokemon import Pokemon
class BossPokemon(Base):
__tablename__ = "boss_pokemon"
@@ -16,8 +26,24 @@ class BossPokemon(Base):
order: Mapped[int] = mapped_column(SmallInteger)
condition_label: Mapped[str | None] = mapped_column(String(100))
# Detail fields
ability_id: Mapped[int | None] = mapped_column(
ForeignKey("abilities.id"), index=True
)
held_item: Mapped[str | None] = mapped_column(String(50))
nature: Mapped[str | None] = mapped_column(String(20))
move1_id: Mapped[int | None] = mapped_column(ForeignKey("moves.id"), index=True)
move2_id: Mapped[int | None] = mapped_column(ForeignKey("moves.id"), index=True)
move3_id: Mapped[int | None] = mapped_column(ForeignKey("moves.id"), index=True)
move4_id: Mapped[int | None] = mapped_column(ForeignKey("moves.id"), index=True)
boss_battle: Mapped[BossBattle] = relationship(back_populates="pokemon")
pokemon: Mapped[Pokemon] = relationship()
ability: Mapped[Ability | None] = relationship()
move1: Mapped[Move | None] = relationship(foreign_keys=[move1_id])
move2: Mapped[Move | None] = relationship(foreign_keys=[move2_id])
move3: Mapped[Move | None] = relationship(foreign_keys=[move3_id])
move4: Mapped[Move | None] = relationship(foreign_keys=[move4_id])
def __repr__(self) -> str:
return f"<BossPokemon(id={self.id}, boss_battle_id={self.boss_battle_id}, pokemon_id={self.pokemon_id})>"

View File

@@ -25,6 +25,12 @@ class BossResult(Base):
run: Mapped[NuzlockeRun] = relationship(back_populates="boss_results")
boss_battle: Mapped[BossBattle] = relationship()
team: Mapped[list[BossResultTeam]] = relationship(
back_populates="boss_result", cascade="all, delete-orphan"
)
def __repr__(self) -> str:
return f"<BossResult(id={self.id}, run_id={self.run_id}, boss_battle_id={self.boss_battle_id}, result='{self.result}')>"
return (
f"<BossResult(id={self.id}, run_id={self.run_id}, "
f"boss_battle_id={self.boss_battle_id}, result='{self.result}')>"
)

View File

@@ -0,0 +1,26 @@
from sqlalchemy import ForeignKey, SmallInteger
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.core.database import Base
class BossResultTeam(Base):
__tablename__ = "boss_result_team"
id: Mapped[int] = mapped_column(primary_key=True)
boss_result_id: Mapped[int] = mapped_column(
ForeignKey("boss_results.id", ondelete="CASCADE"), index=True
)
encounter_id: Mapped[int] = mapped_column(
ForeignKey("encounters.id", ondelete="CASCADE"), index=True
)
level: Mapped[int] = mapped_column(SmallInteger)
boss_result: Mapped[BossResult] = relationship(back_populates="team")
encounter: Mapped[Encounter] = relationship()
def __repr__(self) -> str:
return (
f"<BossResultTeam(id={self.id}, boss_result_id={self.boss_result_id}, "
f"encounter_id={self.encounter_id}, level={self.level})>"
)

View File

@@ -1,21 +1,46 @@
from datetime import datetime
from __future__ import annotations
from sqlalchemy import DateTime, ForeignKey, String, func
from datetime import datetime
from enum import StrEnum
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import DateTime, Enum, ForeignKey, String, func
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.core.database import Base
if TYPE_CHECKING:
from app.models.boss_result import BossResult
from app.models.encounter import Encounter
from app.models.game import Game
from app.models.journal_entry import JournalEntry
from app.models.user import User
class RunVisibility(StrEnum):
PUBLIC = "public"
PRIVATE = "private"
class NuzlockeRun(Base):
__tablename__ = "nuzlocke_runs"
id: Mapped[int] = mapped_column(primary_key=True)
game_id: Mapped[int] = mapped_column(ForeignKey("games.id"), index=True)
owner_id: Mapped[UUID | None] = mapped_column(
ForeignKey("users.id", ondelete="SET NULL"), index=True
)
name: Mapped[str] = mapped_column(String(100))
status: Mapped[str] = mapped_column(
String(20), index=True
) # active, completed, failed
visibility: Mapped[RunVisibility] = mapped_column(
Enum(RunVisibility, name="run_visibility", create_constraint=False),
default=RunVisibility.PUBLIC,
server_default="public",
)
rules: Mapped[dict] = mapped_column(JSONB, default=dict)
started_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
@@ -25,6 +50,7 @@ class NuzlockeRun(Base):
naming_scheme: Mapped[str | None] = mapped_column(String(50), nullable=True)
game: Mapped[Game] = relationship(back_populates="runs")
owner: Mapped[User | None] = relationship(back_populates="runs")
encounters: Mapped[list[Encounter]] = relationship(back_populates="run")
boss_results: Mapped[list[BossResult]] = relationship(back_populates="run")
journal_entries: Mapped[list[JournalEntry]] = relationship(back_populates="run")

View File

@@ -0,0 +1,29 @@
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import DateTime, String, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.core.database import Base
if TYPE_CHECKING:
from app.models.nuzlocke_run import NuzlockeRun
class User(Base):
__tablename__ = "users"
id: Mapped[UUID] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
display_name: Mapped[str | None] = mapped_column(String(100))
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
runs: Mapped[list[NuzlockeRun]] = relationship(back_populates="owner")
def __repr__(self) -> str:
return f"<User(id={self.id}, email='{self.email}')>"

View File

@@ -4,6 +4,16 @@ from app.schemas.base import CamelModel
from app.schemas.pokemon import PokemonResponse
class MoveRef(CamelModel):
id: int
name: str
class AbilityRef(CamelModel):
id: int
name: str
class BossPokemonResponse(CamelModel):
id: int
pokemon_id: int
@@ -11,6 +21,19 @@ class BossPokemonResponse(CamelModel):
order: int
condition_label: str | None
pokemon: PokemonResponse
# Detail fields
ability_id: int | None = None
ability: AbilityRef | None = None
held_item: str | None = None
nature: str | None = None
move1_id: int | None = None
move2_id: int | None = None
move3_id: int | None = None
move4_id: int | None = None
move1: MoveRef | None = None
move2: MoveRef | None = None
move3: MoveRef | None = None
move4: MoveRef | None = None
class BossBattleResponse(CamelModel):
@@ -31,6 +54,12 @@ class BossBattleResponse(CamelModel):
pokemon: list[BossPokemonResponse] = []
class BossResultTeamMemberResponse(CamelModel):
id: int
encounter_id: int
level: int
class BossResultResponse(CamelModel):
id: int
run_id: int
@@ -38,6 +67,7 @@ class BossResultResponse(CamelModel):
result: str
attempts: int
completed_at: datetime | None
team: list[BossResultTeamMemberResponse] = []
# --- Input schemas ---
@@ -78,12 +108,26 @@ class BossPokemonInput(CamelModel):
level: int
order: int
condition_label: str | None = None
# Detail fields
ability_id: int | None = None
held_item: str | None = None
nature: str | None = None
move1_id: int | None = None
move2_id: int | None = None
move3_id: int | None = None
move4_id: int | None = None
class BossResultTeamMemberInput(CamelModel):
encounter_id: int
level: int
class BossResultCreate(CamelModel):
boss_battle_id: int
result: str
attempts: int = 1
team: list[BossResultTeamMemberInput] = []
class BossReorderItem(CamelModel):

View File

@@ -1,15 +1,23 @@
from datetime import datetime
from uuid import UUID
from app.models.nuzlocke_run import RunVisibility
from app.schemas.base import CamelModel
from app.schemas.encounter import EncounterDetailResponse
from app.schemas.game import GameResponse
class OwnerResponse(CamelModel):
id: UUID
display_name: str | None = None
class RunCreate(CamelModel):
game_id: int
name: str
rules: dict = {}
naming_scheme: str | None = None
visibility: RunVisibility = RunVisibility.PUBLIC
class RunUpdate(CamelModel):
@@ -18,6 +26,7 @@ class RunUpdate(CamelModel):
rules: dict | None = None
hof_encounter_ids: list[int] | None = None
naming_scheme: str | None = None
visibility: RunVisibility | None = None
class RunResponse(CamelModel):
@@ -28,6 +37,8 @@ class RunResponse(CamelModel):
rules: dict
hof_encounter_ids: list[int] | None = None
naming_scheme: str | None = None
visibility: RunVisibility
owner: OwnerResponse | None = None
started_at: datetime
completed_at: datetime | None

View File

@@ -87,7 +87,9 @@ RUN_DEFS = [
"name": "Kanto Heartbreak",
"status": "failed",
"progress": 0.45,
"rules": {"customRules": "- Hardcore mode: no items in battle\n- Set mode only"},
"rules": {
"customRules": "- Hardcore mode: no items in battle\n- Set mode only"
},
"started_days_ago": 30,
"ended_days_ago": 20,
},

View File

@@ -1,13 +1,18 @@
import os
import time
import jwt
import pytest
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
import app.models # noqa: F401 — ensures all models register with Base.metadata
from app.core.auth import AuthUser, get_current_user
from app.core.database import Base, get_session
from app.main import app
TEST_JWT_SECRET = "test-jwt-secret-for-testing-only"
TEST_DATABASE_URL = os.getenv(
"TEST_DATABASE_URL",
"postgresql+asyncpg://postgres:postgres@localhost:5433/nuzlocke_test",
@@ -59,3 +64,43 @@ async def client(db_session):
transport=ASGITransport(app=app), base_url="http://test"
) as ac:
yield ac
@pytest.fixture
def mock_auth_user():
"""Return a mock authenticated user for tests."""
return AuthUser(id="test-user-123", email="test@example.com", role="authenticated")
@pytest.fixture
def auth_override(mock_auth_user):
"""Override get_current_user to return a mock user."""
def _override():
return mock_auth_user
app.dependency_overrides[get_current_user] = _override
yield
app.dependency_overrides.pop(get_current_user, None)
@pytest.fixture
async def auth_client(db_session, auth_override):
"""Async HTTP client with mocked authentication."""
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as ac:
yield ac
@pytest.fixture
def valid_token():
"""Generate a valid JWT token for testing."""
payload = {
"sub": "test-user-123",
"email": "test@example.com",
"role": "authenticated",
"aud": "authenticated",
"exp": int(time.time()) + 3600,
}
return jwt.encode(payload, TEST_JWT_SECRET, algorithm="HS256")

179
backend/tests/test_auth.py Normal file
View File

@@ -0,0 +1,179 @@
import time
import jwt
import pytest
from httpx import ASGITransport, AsyncClient
from app.core.auth import AuthUser, get_current_user, require_auth
from app.core.config import settings
from app.main import app
@pytest.fixture
def jwt_secret():
"""Provide a test JWT secret."""
return "test-jwt-secret-for-testing-only"
@pytest.fixture
def valid_token(jwt_secret):
"""Generate a valid JWT token."""
payload = {
"sub": "user-123",
"email": "test@example.com",
"role": "authenticated",
"aud": "authenticated",
"exp": int(time.time()) + 3600,
}
return jwt.encode(payload, jwt_secret, algorithm="HS256")
@pytest.fixture
def expired_token(jwt_secret):
"""Generate an expired JWT token."""
payload = {
"sub": "user-123",
"email": "test@example.com",
"role": "authenticated",
"aud": "authenticated",
"exp": int(time.time()) - 3600, # Expired 1 hour ago
}
return jwt.encode(payload, jwt_secret, algorithm="HS256")
@pytest.fixture
def invalid_token():
"""Generate a token signed with wrong secret."""
payload = {
"sub": "user-123",
"email": "test@example.com",
"role": "authenticated",
"aud": "authenticated",
"exp": int(time.time()) + 3600,
}
return jwt.encode(payload, "wrong-secret", algorithm="HS256")
@pytest.fixture
def auth_client(db_session, jwt_secret, valid_token, monkeypatch):
"""Client with valid auth token and configured JWT secret."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
async def _get_client():
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
headers={"Authorization": f"Bearer {valid_token}"},
) as ac:
yield ac
return _get_client
async def test_get_current_user_valid_token(jwt_secret, valid_token, monkeypatch):
"""Test get_current_user returns user for valid token."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
class MockRequest:
headers = {"Authorization": f"Bearer {valid_token}"}
user = get_current_user(MockRequest())
assert user is not None
assert user.id == "user-123"
assert user.email == "test@example.com"
assert user.role == "authenticated"
async def test_get_current_user_no_token(jwt_secret, monkeypatch):
"""Test get_current_user returns None when no token."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
class MockRequest:
headers = {}
user = get_current_user(MockRequest())
assert user is None
async def test_get_current_user_expired_token(jwt_secret, expired_token, monkeypatch):
"""Test get_current_user returns None for expired token."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
class MockRequest:
headers = {"Authorization": f"Bearer {expired_token}"}
user = get_current_user(MockRequest())
assert user is None
async def test_get_current_user_invalid_token(jwt_secret, invalid_token, monkeypatch):
"""Test get_current_user returns None for invalid token."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
class MockRequest:
headers = {"Authorization": f"Bearer {invalid_token}"}
user = get_current_user(MockRequest())
assert user is None
async def test_get_current_user_malformed_header(jwt_secret, monkeypatch):
"""Test get_current_user returns None for malformed auth header."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
class MockRequest:
headers = {"Authorization": "NotBearer token"}
user = get_current_user(MockRequest())
assert user is None
async def test_require_auth_valid_user():
"""Test require_auth passes through valid user."""
user = AuthUser(id="user-123", email="test@example.com")
result = require_auth(user)
assert result is user
async def test_require_auth_no_user():
"""Test require_auth raises 401 for no user."""
from fastapi import HTTPException
with pytest.raises(HTTPException) as exc_info:
require_auth(None)
assert exc_info.value.status_code == 401
assert exc_info.value.detail == "Authentication required"
async def test_protected_endpoint_without_token(db_session):
"""Test that write endpoint returns 401 without token."""
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as ac:
response = await ac.post("/runs", json={"game_id": 1, "name": "Test Run"})
assert response.status_code == 401
assert response.json()["detail"] == "Authentication required"
async def test_protected_endpoint_with_expired_token(
db_session, jwt_secret, expired_token, monkeypatch
):
"""Test that write endpoint returns 401 with expired token."""
monkeypatch.setattr(settings, "supabase_jwt_secret", jwt_secret)
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
headers={"Authorization": f"Bearer {expired_token}"},
) as ac:
response = await ac.post("/runs", json={"game_id": 1, "name": "Test Run"})
assert response.status_code == 401
async def test_read_endpoint_without_token(db_session):
"""Test that read endpoints work without authentication."""
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as ac:
response = await ac.get("/runs")
assert response.status_code == 200

View File

@@ -17,9 +17,9 @@ GAME_PAYLOAD = {
@pytest.fixture
async def game(client: AsyncClient) -> dict:
async def game(auth_client: AsyncClient) -> dict:
"""A game created via the API (no version_group_id)."""
response = await client.post(BASE, json=GAME_PAYLOAD)
response = await auth_client.post(BASE, json=GAME_PAYLOAD)
assert response.status_code == 201
return response.json()
@@ -68,22 +68,24 @@ class TestListGames:
class TestCreateGame:
async def test_creates_and_returns_game(self, client: AsyncClient):
response = await client.post(BASE, json=GAME_PAYLOAD)
async def test_creates_and_returns_game(self, auth_client: AsyncClient):
response = await auth_client.post(BASE, json=GAME_PAYLOAD)
assert response.status_code == 201
data = response.json()
assert data["name"] == "Pokemon Red"
assert data["slug"] == "red"
assert isinstance(data["id"], int)
async def test_duplicate_slug_returns_409(self, client: AsyncClient, game: dict):
response = await client.post(
async def test_duplicate_slug_returns_409(
self, auth_client: AsyncClient, game: dict
):
response = await auth_client.post(
BASE, json={**GAME_PAYLOAD, "name": "Pokemon Red v2"}
)
assert response.status_code == 409
async def test_missing_required_field_returns_422(self, client: AsyncClient):
response = await client.post(BASE, json={"name": "Pokemon Red"})
async def test_missing_required_field_returns_422(self, auth_client: AsyncClient):
response = await auth_client.post(BASE, json={"name": "Pokemon Red"})
assert response.status_code == 422
@@ -113,29 +115,35 @@ class TestGetGame:
class TestUpdateGame:
async def test_updates_name(self, client: AsyncClient, game: dict):
response = await client.put(
async def test_updates_name(self, auth_client: AsyncClient, game: dict):
response = await auth_client.put(
f"{BASE}/{game['id']}", json={"name": "Pokemon Blue"}
)
assert response.status_code == 200
assert response.json()["name"] == "Pokemon Blue"
async def test_slug_unchanged_on_partial_update(
self, client: AsyncClient, game: dict
self, auth_client: AsyncClient, game: dict
):
response = await client.put(f"{BASE}/{game['id']}", json={"name": "New Name"})
response = await auth_client.put(
f"{BASE}/{game['id']}", json={"name": "New Name"}
)
assert response.json()["slug"] == "red"
async def test_not_found_returns_404(self, client: AsyncClient):
assert (await client.put(f"{BASE}/9999", json={"name": "x"})).status_code == 404
async def test_not_found_returns_404(self, auth_client: AsyncClient):
assert (
await auth_client.put(f"{BASE}/9999", json={"name": "x"})
).status_code == 404
async def test_duplicate_slug_returns_409(self, client: AsyncClient):
await client.post(BASE, json={**GAME_PAYLOAD, "slug": "blue", "name": "Blue"})
r1 = await client.post(
async def test_duplicate_slug_returns_409(self, auth_client: AsyncClient):
await auth_client.post(
BASE, json={**GAME_PAYLOAD, "slug": "blue", "name": "Blue"}
)
r1 = await auth_client.post(
BASE, json={**GAME_PAYLOAD, "slug": "red", "name": "Red"}
)
game_id = r1.json()["id"]
response = await client.put(f"{BASE}/{game_id}", json={"slug": "blue"})
response = await auth_client.put(f"{BASE}/{game_id}", json={"slug": "blue"})
assert response.status_code == 409
@@ -145,13 +153,13 @@ class TestUpdateGame:
class TestDeleteGame:
async def test_deletes_game(self, client: AsyncClient, game: dict):
response = await client.delete(f"{BASE}/{game['id']}")
async def test_deletes_game(self, auth_client: AsyncClient, game: dict):
response = await auth_client.delete(f"{BASE}/{game['id']}")
assert response.status_code == 204
assert (await client.get(f"{BASE}/{game['id']}")).status_code == 404
assert (await auth_client.get(f"{BASE}/{game['id']}")).status_code == 404
async def test_not_found_returns_404(self, client: AsyncClient):
assert (await client.delete(f"{BASE}/9999")).status_code == 404
async def test_not_found_returns_404(self, auth_client: AsyncClient):
assert (await auth_client.delete(f"{BASE}/9999")).status_code == 404
# ---------------------------------------------------------------------------
@@ -187,9 +195,9 @@ class TestListByRegion:
class TestCreateRoute:
async def test_creates_route(self, client: AsyncClient, game_with_vg: tuple):
async def test_creates_route(self, auth_client: AsyncClient, game_with_vg: tuple):
game_id, _ = game_with_vg
response = await client.post(
response = await auth_client.post(
f"{BASE}/{game_id}/routes",
json={"name": "Pallet Town", "order": 1},
)
@@ -200,35 +208,35 @@ class TestCreateRoute:
assert isinstance(data["id"], int)
async def test_game_detail_includes_route(
self, client: AsyncClient, game_with_vg: tuple
self, auth_client: AsyncClient, game_with_vg: tuple
):
game_id, _ = game_with_vg
await client.post(
await auth_client.post(
f"{BASE}/{game_id}/routes", json={"name": "Route 1", "order": 1}
)
response = await client.get(f"{BASE}/{game_id}")
response = await auth_client.get(f"{BASE}/{game_id}")
routes = response.json()["routes"]
assert len(routes) == 1
assert routes[0]["name"] == "Route 1"
async def test_game_without_version_group_returns_400(
self, client: AsyncClient, game: dict
self, auth_client: AsyncClient, game: dict
):
response = await client.post(
response = await auth_client.post(
f"{BASE}/{game['id']}/routes",
json={"name": "Route 1", "order": 1},
)
assert response.status_code == 400
async def test_list_routes_excludes_routes_without_encounters(
self, client: AsyncClient, game_with_vg: tuple
self, auth_client: AsyncClient, game_with_vg: tuple
):
"""list_game_routes only returns routes that have Pokemon encounters."""
game_id, _ = game_with_vg
await client.post(
await auth_client.post(
f"{BASE}/{game_id}/routes", json={"name": "Route 1", "order": 1}
)
response = await client.get(f"{BASE}/{game_id}/routes?flat=true")
response = await auth_client.get(f"{BASE}/{game_id}/routes?flat=true")
assert response.status_code == 200
assert response.json() == []
@@ -239,14 +247,16 @@ class TestCreateRoute:
class TestUpdateRoute:
async def test_updates_route_name(self, client: AsyncClient, game_with_vg: tuple):
async def test_updates_route_name(
self, auth_client: AsyncClient, game_with_vg: tuple
):
game_id, _ = game_with_vg
r = (
await client.post(
await auth_client.post(
f"{BASE}/{game_id}/routes", json={"name": "Old Name", "order": 1}
)
).json()
response = await client.put(
response = await auth_client.put(
f"{BASE}/{game_id}/routes/{r['id']}",
json={"name": "New Name"},
)
@@ -254,11 +264,11 @@ class TestUpdateRoute:
assert response.json()["name"] == "New Name"
async def test_route_not_found_returns_404(
self, client: AsyncClient, game_with_vg: tuple
self, auth_client: AsyncClient, game_with_vg: tuple
):
game_id, _ = game_with_vg
assert (
await client.put(f"{BASE}/{game_id}/routes/9999", json={"name": "x"})
await auth_client.put(f"{BASE}/{game_id}/routes/9999", json={"name": "x"})
).status_code == 404
@@ -268,25 +278,27 @@ class TestUpdateRoute:
class TestDeleteRoute:
async def test_deletes_route(self, client: AsyncClient, game_with_vg: tuple):
async def test_deletes_route(self, auth_client: AsyncClient, game_with_vg: tuple):
game_id, _ = game_with_vg
r = (
await client.post(
await auth_client.post(
f"{BASE}/{game_id}/routes", json={"name": "Route 1", "order": 1}
)
).json()
assert (
await client.delete(f"{BASE}/{game_id}/routes/{r['id']}")
await auth_client.delete(f"{BASE}/{game_id}/routes/{r['id']}")
).status_code == 204
# No longer in game detail
detail = (await client.get(f"{BASE}/{game_id}")).json()
detail = (await auth_client.get(f"{BASE}/{game_id}")).json()
assert all(route["id"] != r["id"] for route in detail["routes"])
async def test_route_not_found_returns_404(
self, client: AsyncClient, game_with_vg: tuple
self, auth_client: AsyncClient, game_with_vg: tuple
):
game_id, _ = game_with_vg
assert (await client.delete(f"{BASE}/{game_id}/routes/9999")).status_code == 404
assert (
await auth_client.delete(f"{BASE}/{game_id}/routes/9999")
).status_code == 404
# ---------------------------------------------------------------------------
@@ -295,20 +307,20 @@ class TestDeleteRoute:
class TestReorderRoutes:
async def test_reorders_routes(self, client: AsyncClient, game_with_vg: tuple):
async def test_reorders_routes(self, auth_client: AsyncClient, game_with_vg: tuple):
game_id, _ = game_with_vg
r1 = (
await client.post(
await auth_client.post(
f"{BASE}/{game_id}/routes", json={"name": "A", "order": 1}
)
).json()
r2 = (
await client.post(
await auth_client.post(
f"{BASE}/{game_id}/routes", json={"name": "B", "order": 2}
)
).json()
response = await client.put(
response = await auth_client.put(
f"{BASE}/{game_id}/routes/reorder",
json={
"routes": [{"id": r1["id"], "order": 2}, {"id": r2["id"], "order": 1}]

View File

@@ -30,9 +30,11 @@ async def game_id(db_session: AsyncSession) -> int:
@pytest.fixture
async def run(client: AsyncClient, game_id: int) -> dict:
async def run(auth_client: AsyncClient, game_id: int) -> dict:
"""An active run created via the API."""
response = await client.post(RUNS_BASE, json={"gameId": game_id, "name": "My Run"})
response = await auth_client.post(
RUNS_BASE, json={"gameId": game_id, "name": "My Run"}
)
assert response.status_code == 201
return response.json()
@@ -127,8 +129,8 @@ class TestListRuns:
class TestCreateRun:
async def test_creates_active_run(self, client: AsyncClient, game_id: int):
response = await client.post(
async def test_creates_active_run(self, auth_client: AsyncClient, game_id: int):
response = await auth_client.post(
RUNS_BASE, json={"gameId": game_id, "name": "New Run"}
)
assert response.status_code == 201
@@ -138,20 +140,22 @@ class TestCreateRun:
assert data["gameId"] == game_id
assert isinstance(data["id"], int)
async def test_rules_stored(self, client: AsyncClient, game_id: int):
async def test_rules_stored(self, auth_client: AsyncClient, game_id: int):
rules = {"duplicatesClause": True, "shinyClause": False}
response = await client.post(
response = await auth_client.post(
RUNS_BASE, json={"gameId": game_id, "name": "Run", "rules": rules}
)
assert response.status_code == 201
assert response.json()["rules"]["duplicatesClause"] is True
async def test_invalid_game_returns_404(self, client: AsyncClient):
response = await client.post(RUNS_BASE, json={"gameId": 9999, "name": "Run"})
async def test_invalid_game_returns_404(self, auth_client: AsyncClient):
response = await auth_client.post(
RUNS_BASE, json={"gameId": 9999, "name": "Run"}
)
assert response.status_code == 404
async def test_missing_required_returns_422(self, client: AsyncClient):
response = await client.post(RUNS_BASE, json={"name": "Run"})
async def test_missing_required_returns_422(self, auth_client: AsyncClient):
response = await auth_client.post(RUNS_BASE, json={"name": "Run"})
assert response.status_code == 422
@@ -181,15 +185,17 @@ class TestGetRun:
class TestUpdateRun:
async def test_updates_name(self, client: AsyncClient, run: dict):
response = await client.patch(
async def test_updates_name(self, auth_client: AsyncClient, run: dict):
response = await auth_client.patch(
f"{RUNS_BASE}/{run['id']}", json={"name": "Renamed"}
)
assert response.status_code == 200
assert response.json()["name"] == "Renamed"
async def test_complete_run_sets_completed_at(self, client: AsyncClient, run: dict):
response = await client.patch(
async def test_complete_run_sets_completed_at(
self, auth_client: AsyncClient, run: dict
):
response = await auth_client.patch(
f"{RUNS_BASE}/{run['id']}", json={"status": "completed"}
)
assert response.status_code == 200
@@ -197,25 +203,27 @@ class TestUpdateRun:
assert data["status"] == "completed"
assert data["completedAt"] is not None
async def test_fail_run(self, client: AsyncClient, run: dict):
response = await client.patch(
async def test_fail_run(self, auth_client: AsyncClient, run: dict):
response = await auth_client.patch(
f"{RUNS_BASE}/{run['id']}", json={"status": "failed"}
)
assert response.status_code == 200
assert response.json()["status"] == "failed"
async def test_ending_already_ended_run_returns_400(
self, client: AsyncClient, run: dict
self, auth_client: AsyncClient, run: dict
):
await client.patch(f"{RUNS_BASE}/{run['id']}", json={"status": "completed"})
response = await client.patch(
await auth_client.patch(
f"{RUNS_BASE}/{run['id']}", json={"status": "completed"}
)
response = await auth_client.patch(
f"{RUNS_BASE}/{run['id']}", json={"status": "failed"}
)
assert response.status_code == 400
async def test_not_found_returns_404(self, client: AsyncClient):
async def test_not_found_returns_404(self, auth_client: AsyncClient):
assert (
await client.patch(f"{RUNS_BASE}/9999", json={"name": "x"})
await auth_client.patch(f"{RUNS_BASE}/9999", json={"name": "x"})
).status_code == 404
@@ -225,12 +233,12 @@ class TestUpdateRun:
class TestDeleteRun:
async def test_deletes_run(self, client: AsyncClient, run: dict):
assert (await client.delete(f"{RUNS_BASE}/{run['id']}")).status_code == 204
assert (await client.get(f"{RUNS_BASE}/{run['id']}")).status_code == 404
async def test_deletes_run(self, auth_client: AsyncClient, run: dict):
assert (await auth_client.delete(f"{RUNS_BASE}/{run['id']}")).status_code == 204
assert (await auth_client.get(f"{RUNS_BASE}/{run['id']}")).status_code == 404
async def test_not_found_returns_404(self, client: AsyncClient):
assert (await client.delete(f"{RUNS_BASE}/9999")).status_code == 404
async def test_not_found_returns_404(self, auth_client: AsyncClient):
assert (await auth_client.delete(f"{RUNS_BASE}/9999")).status_code == 404
# ---------------------------------------------------------------------------
@@ -239,8 +247,8 @@ class TestDeleteRun:
class TestCreateEncounter:
async def test_creates_encounter(self, client: AsyncClient, enc_ctx: dict):
response = await client.post(
async def test_creates_encounter(self, auth_client: AsyncClient, enc_ctx: dict):
response = await auth_client.post(
f"{RUNS_BASE}/{enc_ctx['run_id']}/encounters",
json={
"routeId": enc_ctx["standalone_id"],
@@ -255,8 +263,10 @@ class TestCreateEncounter:
assert data["status"] == "caught"
assert data["isShiny"] is False
async def test_invalid_run_returns_404(self, client: AsyncClient, enc_ctx: dict):
response = await client.post(
async def test_invalid_run_returns_404(
self, auth_client: AsyncClient, enc_ctx: dict
):
response = await auth_client.post(
f"{RUNS_BASE}/9999/encounters",
json={
"routeId": enc_ctx["standalone_id"],
@@ -266,8 +276,10 @@ class TestCreateEncounter:
)
assert response.status_code == 404
async def test_invalid_route_returns_404(self, client: AsyncClient, enc_ctx: dict):
response = await client.post(
async def test_invalid_route_returns_404(
self, auth_client: AsyncClient, enc_ctx: dict
):
response = await auth_client.post(
f"{RUNS_BASE}/{enc_ctx['run_id']}/encounters",
json={
"routeId": 9999,
@@ -278,9 +290,9 @@ class TestCreateEncounter:
assert response.status_code == 404
async def test_invalid_pokemon_returns_404(
self, client: AsyncClient, enc_ctx: dict
self, auth_client: AsyncClient, enc_ctx: dict
):
response = await client.post(
response = await auth_client.post(
f"{RUNS_BASE}/{enc_ctx['run_id']}/encounters",
json={
"routeId": enc_ctx["standalone_id"],
@@ -290,9 +302,11 @@ class TestCreateEncounter:
)
assert response.status_code == 404
async def test_parent_route_rejected_400(self, client: AsyncClient, enc_ctx: dict):
async def test_parent_route_rejected_400(
self, auth_client: AsyncClient, enc_ctx: dict
):
"""Cannot create an encounter directly on a parent route (use child routes)."""
response = await client.post(
response = await auth_client.post(
f"{RUNS_BASE}/{enc_ctx['run_id']}/encounters",
json={
"routeId": enc_ctx["parent_id"],
@@ -303,10 +317,10 @@ class TestCreateEncounter:
assert response.status_code == 400
async def test_route_lock_prevents_second_sibling_encounter(
self, client: AsyncClient, enc_ctx: dict
self, auth_client: AsyncClient, enc_ctx: dict
):
"""Once a sibling child has an encounter, other siblings in the group return 409."""
await client.post(
await auth_client.post(
f"{RUNS_BASE}/{enc_ctx['run_id']}/encounters",
json={
"routeId": enc_ctx["child1_id"],
@@ -314,7 +328,7 @@ class TestCreateEncounter:
"status": "caught",
},
)
response = await client.post(
response = await auth_client.post(
f"{RUNS_BASE}/{enc_ctx['run_id']}/encounters",
json={
"routeId": enc_ctx["child2_id"],
@@ -325,11 +339,11 @@ class TestCreateEncounter:
assert response.status_code == 409
async def test_shiny_bypasses_route_lock(
self, client: AsyncClient, enc_ctx: dict, db_session: AsyncSession
self, auth_client: AsyncClient, enc_ctx: dict, db_session: AsyncSession
):
"""A shiny encounter bypasses the route-lock when shinyClause is enabled."""
# First encounter occupies the group
await client.post(
await auth_client.post(
f"{RUNS_BASE}/{enc_ctx['run_id']}/encounters",
json={
"routeId": enc_ctx["child1_id"],
@@ -338,7 +352,7 @@ class TestCreateEncounter:
},
)
# Shiny encounter on sibling should succeed
response = await client.post(
response = await auth_client.post(
f"{RUNS_BASE}/{enc_ctx['run_id']}/encounters",
json={
"routeId": enc_ctx["child2_id"],
@@ -351,7 +365,7 @@ class TestCreateEncounter:
assert response.json()["isShiny"] is True
async def test_gift_bypasses_route_lock_when_clause_on(
self, client: AsyncClient, enc_ctx: dict, db_session: AsyncSession
self, auth_client: AsyncClient, enc_ctx: dict, db_session: AsyncSession
):
"""A gift encounter bypasses route-lock when giftClause is enabled."""
# Enable giftClause on the run
@@ -359,7 +373,7 @@ class TestCreateEncounter:
run.rules = {"shinyClause": True, "giftClause": True}
await db_session.commit()
await client.post(
await auth_client.post(
f"{RUNS_BASE}/{enc_ctx['run_id']}/encounters",
json={
"routeId": enc_ctx["child1_id"],
@@ -367,7 +381,7 @@ class TestCreateEncounter:
"status": "caught",
},
)
response = await client.post(
response = await auth_client.post(
f"{RUNS_BASE}/{enc_ctx['run_id']}/encounters",
json={
"routeId": enc_ctx["child2_id"],
@@ -387,8 +401,8 @@ class TestCreateEncounter:
class TestUpdateEncounter:
@pytest.fixture
async def encounter(self, client: AsyncClient, enc_ctx: dict) -> dict:
response = await client.post(
async def encounter(self, auth_client: AsyncClient, enc_ctx: dict) -> dict:
response = await auth_client.post(
f"{RUNS_BASE}/{enc_ctx['run_id']}/encounters",
json={
"routeId": enc_ctx["standalone_id"],
@@ -398,17 +412,17 @@ class TestUpdateEncounter:
)
return response.json()
async def test_updates_nickname(self, client: AsyncClient, encounter: dict):
response = await client.patch(
async def test_updates_nickname(self, auth_client: AsyncClient, encounter: dict):
response = await auth_client.patch(
f"{ENC_BASE}/{encounter['id']}", json={"nickname": "Sparky"}
)
assert response.status_code == 200
assert response.json()["nickname"] == "Sparky"
async def test_updates_status_to_fainted(
self, client: AsyncClient, encounter: dict
self, auth_client: AsyncClient, encounter: dict
):
response = await client.patch(
response = await auth_client.patch(
f"{ENC_BASE}/{encounter['id']}",
json={"status": "fainted", "faintLevel": 12, "deathCause": "wild battle"},
)
@@ -418,9 +432,9 @@ class TestUpdateEncounter:
assert data["faintLevel"] == 12
assert data["deathCause"] == "wild battle"
async def test_not_found_returns_404(self, client: AsyncClient):
async def test_not_found_returns_404(self, auth_client: AsyncClient):
assert (
await client.patch(f"{ENC_BASE}/9999", json={"nickname": "x"})
await auth_client.patch(f"{ENC_BASE}/9999", json={"nickname": "x"})
).status_code == 404
@@ -431,8 +445,8 @@ class TestUpdateEncounter:
class TestDeleteEncounter:
@pytest.fixture
async def encounter(self, client: AsyncClient, enc_ctx: dict) -> dict:
response = await client.post(
async def encounter(self, auth_client: AsyncClient, enc_ctx: dict) -> dict:
response = await auth_client.post(
f"{RUNS_BASE}/{enc_ctx['run_id']}/encounters",
json={
"routeId": enc_ctx["standalone_id"],
@@ -443,12 +457,14 @@ class TestDeleteEncounter:
return response.json()
async def test_deletes_encounter(
self, client: AsyncClient, encounter: dict, enc_ctx: dict
self, auth_client: AsyncClient, encounter: dict, enc_ctx: dict
):
assert (await client.delete(f"{ENC_BASE}/{encounter['id']}")).status_code == 204
assert (
await auth_client.delete(f"{ENC_BASE}/{encounter['id']}")
).status_code == 204
# Run detail should no longer include it
detail = (await client.get(f"{RUNS_BASE}/{enc_ctx['run_id']}")).json()
detail = (await auth_client.get(f"{RUNS_BASE}/{enc_ctx['run_id']}")).json()
assert all(e["id"] != encounter["id"] for e in detail["encounters"])
async def test_not_found_returns_404(self, client: AsyncClient):
assert (await client.delete(f"{ENC_BASE}/9999")).status_code == 404
async def test_not_found_returns_404(self, auth_client: AsyncClient):
assert (await auth_client.delete(f"{ENC_BASE}/9999")).status_code == 404