Add user authentication with login/signup/protected routes, boss pokemon detail fields and result team tracking, moves and abilities selector components and API, run ownership and visibility controls, and various UI improvements across encounters, run list, and journal pages. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
539 lines
18 KiB
Python
539 lines
18 KiB
Python
from datetime import UTC, datetime
|
|
from uuid import UUID
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Request, Response
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import joinedload, selectinload
|
|
|
|
from app.core.auth import AuthUser, get_current_user, require_auth
|
|
from app.core.database import get_session
|
|
from app.models.boss_result import BossResult
|
|
from app.models.encounter import Encounter
|
|
from app.models.evolution import Evolution
|
|
from app.models.game import Game
|
|
from app.models.genlocke import GenlockeLeg
|
|
from app.models.genlocke_transfer import GenlockeTransfer
|
|
from app.models.nuzlocke_run import NuzlockeRun, RunVisibility
|
|
from app.models.user import User
|
|
from app.schemas.run import (
|
|
OwnerResponse,
|
|
RunCreate,
|
|
RunDetailResponse,
|
|
RunGenlockeContext,
|
|
RunResponse,
|
|
RunUpdate,
|
|
)
|
|
from app.services.families import build_families
|
|
from app.services.naming import (
|
|
get_naming_categories,
|
|
strip_roman_suffix,
|
|
suggest_names,
|
|
to_roman,
|
|
)
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.get("/naming-categories", response_model=list[str])
|
|
async def list_naming_categories():
|
|
return get_naming_categories()
|
|
|
|
|
|
@router.get("/{run_id}/name-suggestions", response_model=list[str])
|
|
async def get_name_suggestions(
|
|
run_id: int,
|
|
count: int = 10,
|
|
pokemon_id: int | None = None,
|
|
session: AsyncSession = Depends(get_session),
|
|
):
|
|
run = await session.get(NuzlockeRun, run_id)
|
|
if run is None:
|
|
raise HTTPException(status_code=404, detail="Run not found")
|
|
|
|
if not run.naming_scheme:
|
|
return []
|
|
|
|
# Collect nicknames already used in this run
|
|
result = await session.execute(
|
|
select(Encounter.nickname).where(
|
|
Encounter.run_id == run_id,
|
|
Encounter.nickname.isnot(None),
|
|
)
|
|
)
|
|
used_names = {row[0] for row in result}
|
|
|
|
lineage_suggestion: str | None = None
|
|
|
|
# Lineage-aware suggestion: check if this run belongs to a genlocke
|
|
if pokemon_id is not None:
|
|
lineage_suggestion = await _compute_lineage_suggestion(
|
|
session, run_id, pokemon_id
|
|
)
|
|
|
|
suggestions = suggest_names(run.naming_scheme, used_names, count)
|
|
|
|
if lineage_suggestion and lineage_suggestion not in suggestions:
|
|
suggestions.insert(0, lineage_suggestion)
|
|
|
|
return suggestions
|
|
|
|
|
|
async def _compute_lineage_suggestion(
|
|
session: AsyncSession,
|
|
run_id: int,
|
|
pokemon_id: int,
|
|
) -> str | None:
|
|
"""Check previous genlocke legs for the same evolution family and suggest a name with roman numeral."""
|
|
# Find the genlocke leg for this run
|
|
leg_result = await session.execute(
|
|
select(GenlockeLeg).where(GenlockeLeg.run_id == run_id)
|
|
)
|
|
current_leg = leg_result.scalar_one_or_none()
|
|
if current_leg is None or current_leg.leg_order <= 1:
|
|
return None
|
|
|
|
# Build evolution family map
|
|
evo_result = await session.execute(select(Evolution))
|
|
evolutions = evo_result.scalars().all()
|
|
pokemon_to_family = build_families(evolutions)
|
|
|
|
family_ids = set(pokemon_to_family.get(pokemon_id, [pokemon_id]))
|
|
family_ids.add(pokemon_id)
|
|
|
|
# Get run IDs for all previous legs
|
|
prev_legs_result = await session.execute(
|
|
select(GenlockeLeg.run_id).where(
|
|
GenlockeLeg.genlocke_id == current_leg.genlocke_id,
|
|
GenlockeLeg.leg_order < current_leg.leg_order,
|
|
GenlockeLeg.run_id.isnot(None),
|
|
)
|
|
)
|
|
prev_run_ids = [row[0] for row in prev_legs_result]
|
|
if not prev_run_ids:
|
|
return None
|
|
|
|
# Get transfer target encounter IDs (these are not "original" catches)
|
|
transfer_targets_result = await session.execute(
|
|
select(GenlockeTransfer.target_encounter_id).where(
|
|
GenlockeTransfer.genlocke_id == current_leg.genlocke_id,
|
|
)
|
|
)
|
|
transfer_target_ids = {row[0] for row in transfer_targets_result}
|
|
|
|
# Find original (non-transfer) encounters from previous legs matching this family
|
|
enc_result = await session.execute(
|
|
select(Encounter.id, Encounter.nickname, Encounter.run_id).where(
|
|
Encounter.run_id.in_(prev_run_ids),
|
|
Encounter.pokemon_id.in_(family_ids),
|
|
Encounter.status == "caught",
|
|
Encounter.nickname.isnot(None),
|
|
)
|
|
)
|
|
matches = [
|
|
(row[0], row[1], row[2])
|
|
for row in enc_result
|
|
if row[0] not in transfer_target_ids
|
|
]
|
|
|
|
if not matches:
|
|
return None
|
|
|
|
# Use the nickname from the first encounter (earliest leg)
|
|
# Build run_id -> leg_order mapping for sorting
|
|
leg_order_result = await session.execute(
|
|
select(GenlockeLeg.run_id, GenlockeLeg.leg_order).where(
|
|
GenlockeLeg.genlocke_id == current_leg.genlocke_id,
|
|
GenlockeLeg.run_id.in_(prev_run_ids),
|
|
)
|
|
)
|
|
run_to_leg_order = {row[0]: row[1] for row in leg_order_result}
|
|
|
|
# Sort by leg order to find the first appearance
|
|
matches.sort(key=lambda m: run_to_leg_order.get(m[2], 0))
|
|
base_name = strip_roman_suffix(matches[0][1])
|
|
|
|
# Count distinct legs with original encounters for this family
|
|
legs_with_family = len({run_to_leg_order.get(m[2]) for m in matches})
|
|
|
|
# The new one would be the next numeral (legs_with_family + 1)
|
|
numeral = to_roman(legs_with_family + 1)
|
|
return f"{base_name} {numeral}"
|
|
|
|
|
|
def _build_run_response(run: NuzlockeRun) -> RunResponse:
|
|
"""Build RunResponse with owner info if present."""
|
|
owner = None
|
|
if run.owner:
|
|
owner = OwnerResponse(id=run.owner.id, display_name=run.owner.display_name)
|
|
return RunResponse(
|
|
id=run.id,
|
|
game_id=run.game_id,
|
|
name=run.name,
|
|
status=run.status,
|
|
rules=run.rules,
|
|
hof_encounter_ids=run.hof_encounter_ids,
|
|
naming_scheme=run.naming_scheme,
|
|
visibility=run.visibility,
|
|
owner=owner,
|
|
started_at=run.started_at,
|
|
completed_at=run.completed_at,
|
|
)
|
|
|
|
|
|
def _check_run_access(
|
|
run: NuzlockeRun, user: AuthUser | None, require_owner: bool = False
|
|
) -> None:
|
|
"""
|
|
Check if user can access the run.
|
|
Raises 403 for private runs if user is not owner.
|
|
If require_owner=True, always requires ownership (for mutations).
|
|
"""
|
|
if run.owner_id is None:
|
|
# Unowned runs are accessible by everyone (legacy)
|
|
if require_owner:
|
|
raise HTTPException(
|
|
status_code=403, detail="Only the run owner can perform this action"
|
|
)
|
|
return
|
|
|
|
user_id = UUID(user.id) if user else None
|
|
|
|
if require_owner:
|
|
if user_id != run.owner_id:
|
|
raise HTTPException(
|
|
status_code=403, detail="Only the run owner can perform this action"
|
|
)
|
|
return
|
|
|
|
if run.visibility == RunVisibility.PRIVATE and user_id != run.owner_id:
|
|
raise HTTPException(status_code=403, detail="This run is private")
|
|
|
|
|
|
@router.post("", response_model=RunResponse, status_code=201)
|
|
async def create_run(
|
|
data: RunCreate,
|
|
session: AsyncSession = Depends(get_session),
|
|
user: AuthUser = Depends(require_auth),
|
|
):
|
|
# Validate game exists
|
|
game = await session.get(Game, data.game_id)
|
|
if game is None:
|
|
raise HTTPException(status_code=404, detail="Game not found")
|
|
|
|
# Ensure user exists in local DB
|
|
user_id = UUID(user.id)
|
|
db_user = await session.get(User, user_id)
|
|
if db_user is None:
|
|
db_user = User(id=user_id, email=user.email or "")
|
|
session.add(db_user)
|
|
|
|
run = NuzlockeRun(
|
|
game_id=data.game_id,
|
|
owner_id=user_id,
|
|
name=data.name,
|
|
status="active",
|
|
visibility=data.visibility,
|
|
rules=data.rules,
|
|
naming_scheme=data.naming_scheme,
|
|
)
|
|
session.add(run)
|
|
await session.commit()
|
|
|
|
# Reload with owner relationship
|
|
result = await session.execute(
|
|
select(NuzlockeRun)
|
|
.where(NuzlockeRun.id == run.id)
|
|
.options(joinedload(NuzlockeRun.owner))
|
|
)
|
|
run = result.scalar_one()
|
|
return _build_run_response(run)
|
|
|
|
|
|
@router.get("", response_model=list[RunResponse])
|
|
async def list_runs(
|
|
request: Request,
|
|
session: AsyncSession = Depends(get_session),
|
|
user: AuthUser | None = Depends(get_current_user),
|
|
):
|
|
"""
|
|
List runs. Shows public runs and user's own private runs.
|
|
"""
|
|
query = select(NuzlockeRun).options(joinedload(NuzlockeRun.owner))
|
|
|
|
if user:
|
|
user_id = UUID(user.id)
|
|
# Show public runs OR runs owned by current user
|
|
query = query.where(
|
|
(NuzlockeRun.visibility == RunVisibility.PUBLIC)
|
|
| (NuzlockeRun.owner_id == user_id)
|
|
)
|
|
else:
|
|
# Anonymous: only public runs
|
|
query = query.where(NuzlockeRun.visibility == RunVisibility.PUBLIC)
|
|
|
|
query = query.order_by(NuzlockeRun.started_at.desc())
|
|
result = await session.execute(query)
|
|
runs = result.scalars().all()
|
|
return [_build_run_response(run) for run in runs]
|
|
|
|
|
|
@router.get("/{run_id}", response_model=RunDetailResponse)
|
|
async def get_run(
|
|
run_id: int,
|
|
request: Request,
|
|
session: AsyncSession = Depends(get_session),
|
|
user: AuthUser | None = Depends(get_current_user),
|
|
):
|
|
result = await session.execute(
|
|
select(NuzlockeRun)
|
|
.where(NuzlockeRun.id == run_id)
|
|
.options(
|
|
joinedload(NuzlockeRun.game),
|
|
joinedload(NuzlockeRun.owner),
|
|
selectinload(NuzlockeRun.encounters).joinedload(Encounter.pokemon),
|
|
selectinload(NuzlockeRun.encounters).joinedload(Encounter.current_pokemon),
|
|
selectinload(NuzlockeRun.encounters).joinedload(Encounter.route),
|
|
)
|
|
)
|
|
run = result.scalar_one_or_none()
|
|
if run is None:
|
|
raise HTTPException(status_code=404, detail="Run not found")
|
|
|
|
# Check visibility access
|
|
_check_run_access(run, user)
|
|
|
|
# Check if this run belongs to a genlocke
|
|
genlocke_context = None
|
|
leg_result = await session.execute(
|
|
select(GenlockeLeg)
|
|
.where(GenlockeLeg.run_id == run_id)
|
|
.options(joinedload(GenlockeLeg.genlocke))
|
|
)
|
|
leg = leg_result.scalar_one_or_none()
|
|
if leg:
|
|
total_legs_result = await session.execute(
|
|
select(func.count())
|
|
.select_from(GenlockeLeg)
|
|
.where(GenlockeLeg.genlocke_id == leg.genlocke_id)
|
|
)
|
|
total_legs = total_legs_result.scalar_one()
|
|
|
|
# Aggregate retired Pokemon IDs from prior legs (retireHoF rule)
|
|
retired_pokemon_ids: list[int] = []
|
|
if leg.genlocke.genlocke_rules.get("retireHoF", False) and leg.leg_order > 1:
|
|
prior_result = await session.execute(
|
|
select(GenlockeLeg.retired_pokemon_ids).where(
|
|
GenlockeLeg.genlocke_id == leg.genlocke_id,
|
|
GenlockeLeg.leg_order < leg.leg_order,
|
|
GenlockeLeg.retired_pokemon_ids.isnot(None),
|
|
)
|
|
)
|
|
cumulative: set[int] = set()
|
|
for (ids,) in prior_result:
|
|
cumulative.update(ids)
|
|
retired_pokemon_ids = sorted(cumulative)
|
|
|
|
genlocke_context = RunGenlockeContext(
|
|
genlocke_id=leg.genlocke_id,
|
|
genlocke_name=leg.genlocke.name,
|
|
leg_order=leg.leg_order,
|
|
total_legs=total_legs,
|
|
is_final_leg=leg.leg_order == total_legs,
|
|
retired_pokemon_ids=retired_pokemon_ids,
|
|
)
|
|
|
|
# Load transfer-target encounter IDs for this run
|
|
transfer_ids_result = await session.execute(
|
|
select(GenlockeTransfer.target_encounter_id).where(
|
|
GenlockeTransfer.target_encounter_id.in_(
|
|
select(Encounter.id).where(Encounter.run_id == run_id)
|
|
)
|
|
)
|
|
)
|
|
transfer_encounter_ids = [row[0] for row in transfer_ids_result]
|
|
|
|
response = RunDetailResponse.model_validate(run)
|
|
response.genlocke = genlocke_context
|
|
response.transfer_encounter_ids = transfer_encounter_ids
|
|
return response
|
|
|
|
|
|
@router.patch("/{run_id}", response_model=RunResponse)
|
|
async def update_run(
|
|
run_id: int,
|
|
data: RunUpdate,
|
|
session: AsyncSession = Depends(get_session),
|
|
user: AuthUser = Depends(require_auth),
|
|
):
|
|
result = await session.execute(
|
|
select(NuzlockeRun)
|
|
.where(NuzlockeRun.id == run_id)
|
|
.options(joinedload(NuzlockeRun.owner))
|
|
)
|
|
run = result.scalar_one_or_none()
|
|
if run is None:
|
|
raise HTTPException(status_code=404, detail="Run not found")
|
|
|
|
# Check ownership for mutations (unowned runs allow anyone for backwards compat)
|
|
_check_run_access(run, user, require_owner=run.owner_id is not None)
|
|
|
|
update_data = data.model_dump(exclude_unset=True)
|
|
|
|
# Validate hof_encounter_ids if provided
|
|
if (
|
|
"hof_encounter_ids" in update_data
|
|
and update_data["hof_encounter_ids"] is not None
|
|
):
|
|
hof_ids = update_data["hof_encounter_ids"]
|
|
if len(hof_ids) > 6:
|
|
raise HTTPException(
|
|
status_code=400, detail="HoF team cannot have more than 6 Pokemon"
|
|
)
|
|
if hof_ids:
|
|
# Validate all encounter IDs belong to this run and are alive
|
|
enc_result = await session.execute(
|
|
select(Encounter).where(
|
|
Encounter.id.in_(hof_ids),
|
|
Encounter.run_id == run_id,
|
|
)
|
|
)
|
|
found = {e.id: e for e in enc_result.scalars().all()}
|
|
missing = [eid for eid in hof_ids if eid not in found]
|
|
if missing:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Encounters not found in this run: {missing}",
|
|
)
|
|
not_alive = [
|
|
eid
|
|
for eid, e in found.items()
|
|
if e.status != "caught" or e.faint_level is not None
|
|
]
|
|
if not_alive:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Encounters are not alive: {not_alive}",
|
|
)
|
|
|
|
# Auto-set completed_at when ending a run
|
|
if "status" in update_data and update_data["status"] in ("completed", "failed"):
|
|
if run.status != "active":
|
|
raise HTTPException(status_code=400, detail="Only active runs can be ended")
|
|
update_data["completed_at"] = datetime.now(UTC)
|
|
|
|
# Block reactivating a completed/failed run that belongs to a genlocke
|
|
if (
|
|
"status" in update_data
|
|
and update_data["status"] == "active"
|
|
and run.status != "active"
|
|
):
|
|
leg_result = await session.execute(
|
|
select(GenlockeLeg).where(GenlockeLeg.run_id == run_id)
|
|
)
|
|
if leg_result.scalar_one_or_none() is not None:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Cannot reactivate a genlocke-linked run. The genlocke controls leg progression.",
|
|
)
|
|
|
|
for field, value in update_data.items():
|
|
setattr(run, field, value)
|
|
|
|
# Genlocke side effects when run status changes
|
|
if "status" in update_data and update_data["status"] in ("completed", "failed"):
|
|
leg_result = await session.execute(
|
|
select(GenlockeLeg)
|
|
.where(GenlockeLeg.run_id == run_id)
|
|
.options(joinedload(GenlockeLeg.genlocke))
|
|
)
|
|
leg = leg_result.scalar_one_or_none()
|
|
if leg:
|
|
genlocke = leg.genlocke
|
|
if update_data["status"] == "failed":
|
|
genlocke.status = "failed"
|
|
elif update_data["status"] == "completed":
|
|
total_legs_result = await session.execute(
|
|
select(func.count())
|
|
.select_from(GenlockeLeg)
|
|
.where(GenlockeLeg.genlocke_id == genlocke.id)
|
|
)
|
|
total_legs = total_legs_result.scalar_one()
|
|
if leg.leg_order == total_legs:
|
|
genlocke.status = "completed"
|
|
|
|
await session.commit()
|
|
|
|
# Reload with owner relationship
|
|
result = await session.execute(
|
|
select(NuzlockeRun)
|
|
.where(NuzlockeRun.id == run.id)
|
|
.options(joinedload(NuzlockeRun.owner))
|
|
)
|
|
run = result.scalar_one()
|
|
return _build_run_response(run)
|
|
|
|
|
|
@router.delete("/{run_id}", status_code=204)
|
|
async def delete_run(
|
|
run_id: int,
|
|
session: AsyncSession = Depends(get_session),
|
|
user: AuthUser = Depends(require_auth),
|
|
):
|
|
run = await session.get(NuzlockeRun, run_id)
|
|
if run is None:
|
|
raise HTTPException(status_code=404, detail="Run not found")
|
|
|
|
# Check ownership for deletion (unowned runs allow anyone for backwards compat)
|
|
_check_run_access(run, user, require_owner=run.owner_id is not None)
|
|
|
|
# Block deletion if run is linked to a genlocke leg
|
|
leg_result = await session.execute(
|
|
select(GenlockeLeg).where(GenlockeLeg.run_id == run_id)
|
|
)
|
|
if leg_result.scalar_one_or_none() is not None:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Cannot delete a run that belongs to a genlocke. Remove the leg or delete the genlocke first.",
|
|
)
|
|
|
|
# Delete associated boss results first
|
|
boss_results = await session.execute(
|
|
select(BossResult).where(BossResult.run_id == run_id)
|
|
)
|
|
for br in boss_results.scalars():
|
|
await session.delete(br)
|
|
|
|
# Delete genlocke transfers referencing this run's encounters
|
|
encounter_ids_result = await session.execute(
|
|
select(Encounter.id).where(Encounter.run_id == run_id)
|
|
)
|
|
encounter_ids = [row[0] for row in encounter_ids_result]
|
|
if encounter_ids:
|
|
transfers = await session.execute(
|
|
select(GenlockeTransfer).where(
|
|
GenlockeTransfer.source_encounter_id.in_(encounter_ids)
|
|
| GenlockeTransfer.target_encounter_id.in_(encounter_ids)
|
|
)
|
|
)
|
|
for t in transfers.scalars():
|
|
await session.delete(t)
|
|
|
|
# Delete associated encounters
|
|
encounters = await session.execute(
|
|
select(Encounter).where(Encounter.run_id == run_id)
|
|
)
|
|
for enc in encounters.scalars():
|
|
await session.delete(enc)
|
|
|
|
# Unlink from any genlocke leg
|
|
leg_result = await session.execute(
|
|
select(GenlockeLeg).where(GenlockeLeg.run_id == run_id)
|
|
)
|
|
for leg in leg_result.scalars():
|
|
leg.run_id = None
|
|
|
|
await session.delete(run)
|
|
await session.commit()
|
|
return Response(status_code=204)
|