"""HTTP endpoints for Nuzlocke runs: CRUD, name suggestions and genlocke context."""

from datetime import UTC, datetime
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, Request, Response
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload, selectinload

from app.core.auth import AuthUser, get_current_user, require_auth, require_run_owner
from app.core.database import get_session
from app.models.boss_result import BossResult
from app.models.encounter import Encounter
from app.models.evolution import Evolution
from app.models.game import Game
from app.models.genlocke import GenlockeLeg
from app.models.genlocke_transfer import GenlockeTransfer
from app.models.nuzlocke_run import NuzlockeRun, RunVisibility
from app.models.user import User
from app.schemas.run import (
    OwnerResponse,
    RunCreate,
    RunDetailResponse,
    RunGenlockeContext,
    RunResponse,
    RunUpdate,
)
from app.services.families import build_families
from app.services.naming import (
    get_naming_categories,
    strip_roman_suffix,
    suggest_names,
    to_roman,
)

router = APIRouter()

# Largest Hall-of-Fame team accepted by update_run.
_MAX_HOF_TEAM_SIZE = 6

# Run statuses that end a run (as opposed to "active").
_ENDED_STATUSES = ("completed", "failed")


def _run_with_owner_query(run_id: int):
    """Build the SELECT that loads one run together with its owner relationship."""
    return (
        select(NuzlockeRun)
        .where(NuzlockeRun.id == run_id)
        .options(joinedload(NuzlockeRun.owner))
    )


@router.get("/naming-categories", response_model=list[str])
async def list_naming_categories():
    """Return the naming categories reported by the naming service."""
    return get_naming_categories()


@router.get("/{run_id}/name-suggestions", response_model=list[str])
async def get_name_suggestions(
    run_id: int,
    count: int = 10,
    pokemon_id: int | None = None,
    session: AsyncSession = Depends(get_session),
):
    """Suggest nicknames for a run based on its naming scheme.

    Nicknames already used by encounters of the run are passed to
    ``suggest_names`` as the set of used names. When ``pokemon_id`` is given
    and a lineage suggestion can be computed from earlier genlocke legs, that
    suggestion is placed first (so the list may hold ``count + 1`` entries).

    Returns an empty list when the run has no naming scheme.

    Raises:
        HTTPException: 404 if the run does not exist.
    """
    # NOTE(review): unlike get_run, this endpoint performs no visibility
    # check on the run - confirm that is intended for private runs.
    run = await session.get(NuzlockeRun, run_id)
    if run is None:
        raise HTTPException(status_code=404, detail="Run not found")

    if not run.naming_scheme:
        return []

    # Collect nicknames already used in this run
    result = await session.execute(
        select(Encounter.nickname).where(
            Encounter.run_id == run_id,
            Encounter.nickname.isnot(None),
        )
    )
    used_names = {row[0] for row in result}

    # Lineage-aware suggestion: only attempted when a species is specified
    lineage_suggestion: str | None = None
    if pokemon_id is not None:
        lineage_suggestion = await _compute_lineage_suggestion(
            session, run_id, pokemon_id
        )

    suggestions = suggest_names(run.naming_scheme, used_names, count)
    if lineage_suggestion and lineage_suggestion not in suggestions:
        suggestions.insert(0, lineage_suggestion)
    return suggestions


async def _compute_lineage_suggestion(
    session: AsyncSession,
    run_id: int,
    pokemon_id: int,
) -> str | None:
    """Suggest "<base name> <roman numeral>" from earlier genlocke legs.

    Looks at the legs of the same genlocke that precede the leg of ``run_id``
    and searches them for caught, nicknamed, non-transfer encounters belonging
    to the same evolution family as ``pokemon_id``. The base name is the
    nickname (roman suffix stripped) from the earliest such leg; the numeral
    is one more than the number of distinct legs that had such an encounter.

    Returns ``None`` when the run is not a genlocke leg, is the first leg,
    has no previous legs with runs, or no matching encounter exists.
    """
    # Find the genlocke leg for this run
    leg_result = await session.execute(
        select(GenlockeLeg).where(GenlockeLeg.run_id == run_id)
    )
    current_leg = leg_result.scalar_one_or_none()
    if current_leg is None or current_leg.leg_order <= 1:
        return None

    # One query yields both the previous run IDs and their leg order, which
    # is needed later to find the earliest appearance.
    prev_legs_result = await session.execute(
        select(GenlockeLeg.run_id, GenlockeLeg.leg_order).where(
            GenlockeLeg.genlocke_id == current_leg.genlocke_id,
            GenlockeLeg.leg_order < current_leg.leg_order,
            GenlockeLeg.run_id.isnot(None),
        )
    )
    run_to_leg_order = {row[0]: row[1] for row in prev_legs_result}
    if not run_to_leg_order:
        return None
    prev_run_ids = list(run_to_leg_order)

    # Build the evolution family map (done after the cheap early exits, since
    # it reads the whole evolutions table).
    evo_result = await session.execute(select(Evolution))
    pokemon_to_family = build_families(evo_result.scalars().all())
    family_ids = set(pokemon_to_family.get(pokemon_id, [pokemon_id]))
    family_ids.add(pokemon_id)

    # Transfer target encounter IDs (these are not "original" catches)
    transfer_targets_result = await session.execute(
        select(GenlockeTransfer.target_encounter_id).where(
            GenlockeTransfer.genlocke_id == current_leg.genlocke_id,
        )
    )
    transfer_target_ids = {row[0] for row in transfer_targets_result}

    # Original (non-transfer) encounters from previous legs matching the family
    enc_result = await session.execute(
        select(Encounter.id, Encounter.nickname, Encounter.run_id).where(
            Encounter.run_id.in_(prev_run_ids),
            Encounter.pokemon_id.in_(family_ids),
            Encounter.status == "caught",
            Encounter.nickname.isnot(None),
        )
    )
    matches = [
        (row[0], row[1], row[2])
        for row in enc_result
        if row[0] not in transfer_target_ids
    ]
    if not matches:
        return None

    # Nickname from the earliest leg; min() keeps the first of equal keys,
    # exactly like a stable sort followed by taking element 0.
    earliest = min(matches, key=lambda m: run_to_leg_order.get(m[2], 0))
    base_name = strip_roman_suffix(earliest[1])

    # Count distinct legs with original encounters for this family; the new
    # catch gets the next numeral.
    legs_with_family = len({run_to_leg_order.get(m[2]) for m in matches})
    numeral = to_roman(legs_with_family + 1)
    return f"{base_name} {numeral}"


def _build_run_response(run: NuzlockeRun) -> RunResponse:
    """Build a RunResponse, including owner info when the run has an owner."""
    owner = (
        OwnerResponse(id=run.owner.id, display_name=run.owner.display_name)
        if run.owner
        else None
    )
    return RunResponse(
        id=run.id,
        game_id=run.game_id,
        name=run.name,
        status=run.status,
        rules=run.rules,
        hof_encounter_ids=run.hof_encounter_ids,
        naming_scheme=run.naming_scheme,
        visibility=run.visibility,
        owner=owner,
        started_at=run.started_at,
        completed_at=run.completed_at,
    )


def _check_run_read_access(run: NuzlockeRun, user: AuthUser | None) -> None:
    """Check whether ``user`` may read ``run``.

    Unowned runs are readable by everyone (legacy). Owned runs are readable
    unless they are private and the caller is not the owner.

    Raises:
        HTTPException: 403 for a private run when the caller is anonymous or
            is not the owner.
    """
    if run.owner_id is None:
        return

    user_id = UUID(user.id) if user else None
    if run.visibility == RunVisibility.PRIVATE and user_id != run.owner_id:
        raise HTTPException(status_code=403, detail="This run is private")


@router.post("", response_model=RunResponse, status_code=201)
async def create_run(
    data: RunCreate,
    session: AsyncSession = Depends(get_session),
    user: AuthUser = Depends(require_auth),
):
    """Create an active run owned by the authenticated user.

    A local ``User`` row is created for the caller if none exists yet.

    Raises:
        HTTPException: 404 if ``data.game_id`` does not reference a game.
    """
    # Validate game exists
    game = await session.get(Game, data.game_id)
    if game is None:
        raise HTTPException(status_code=404, detail="Game not found")

    # Ensure user exists in local DB
    user_id = UUID(user.id)
    db_user = await session.get(User, user_id)
    if db_user is None:
        db_user = User(id=user_id, email=user.email or "")
        session.add(db_user)

    run = NuzlockeRun(
        game_id=data.game_id,
        owner_id=user_id,
        name=data.name,
        status="active",
        visibility=data.visibility,
        rules=data.rules,
        naming_scheme=data.naming_scheme,
    )
    session.add(run)
    await session.commit()

    # Reload with owner relationship
    result = await session.execute(_run_with_owner_query(run.id))
    run = result.scalar_one()
    return _build_run_response(run)


@router.get("", response_model=list[RunResponse])
async def list_runs(
    request: Request,
    session: AsyncSession = Depends(get_session),
    user: AuthUser | None = Depends(get_current_user),
):
    """List runs, newest first.

    Authenticated callers see public runs plus their own runs; anonymous
    callers see public runs only.
    """
    is_public = NuzlockeRun.visibility == RunVisibility.PUBLIC
    if user:
        # Show public runs OR runs owned by current user
        visible = is_public | (NuzlockeRun.owner_id == UUID(user.id))
    else:
        # Anonymous: only public runs
        visible = is_public

    query = (
        select(NuzlockeRun)
        .options(joinedload(NuzlockeRun.owner))
        .where(visible)
        .order_by(NuzlockeRun.started_at.desc())
    )
    result = await session.execute(query)
    return [_build_run_response(run) for run in result.scalars().all()]


@router.get("/{run_id}", response_model=RunDetailResponse)
async def get_run(
    run_id: int,
    request: Request,
    session: AsyncSession = Depends(get_session),
    user: AuthUser | None = Depends(get_current_user),
):
    """Return one run with its game, owner, encounters and genlocke context.

    When the run is a genlocke leg, the response carries the leg position,
    total leg count and - if the genlocke's ``retireHoF`` rule is on and this
    is not the first leg - the sorted union of retired Pokemon IDs from all
    earlier legs. ``transfer_encounter_ids`` lists this run's encounters that
    are targets of a genlocke transfer.

    Raises:
        HTTPException: 404 if the run does not exist; 403 if it is private
            and the caller is not its owner.
    """
    result = await session.execute(
        select(NuzlockeRun)
        .where(NuzlockeRun.id == run_id)
        .options(
            joinedload(NuzlockeRun.game),
            joinedload(NuzlockeRun.owner),
            selectinload(NuzlockeRun.encounters).joinedload(Encounter.pokemon),
            selectinload(NuzlockeRun.encounters).joinedload(Encounter.current_pokemon),
            selectinload(NuzlockeRun.encounters).joinedload(Encounter.route),
        )
    )
    run = result.scalar_one_or_none()
    if run is None:
        raise HTTPException(status_code=404, detail="Run not found")

    # Check visibility access
    _check_run_read_access(run, user)

    # Check if this run belongs to a genlocke
    genlocke_context = None
    leg_result = await session.execute(
        select(GenlockeLeg)
        .where(GenlockeLeg.run_id == run_id)
        .options(joinedload(GenlockeLeg.genlocke))
    )
    leg = leg_result.scalar_one_or_none()
    if leg:
        total_legs_result = await session.execute(
            select(func.count())
            .select_from(GenlockeLeg)
            .where(GenlockeLeg.genlocke_id == leg.genlocke_id)
        )
        total_legs = total_legs_result.scalar_one()

        # Aggregate retired Pokemon IDs from prior legs (retireHoF rule)
        retired_pokemon_ids: list[int] = []
        if leg.genlocke.genlocke_rules.get("retireHoF", False) and leg.leg_order > 1:
            prior_result = await session.execute(
                select(GenlockeLeg.retired_pokemon_ids).where(
                    GenlockeLeg.genlocke_id == leg.genlocke_id,
                    GenlockeLeg.leg_order < leg.leg_order,
                    GenlockeLeg.retired_pokemon_ids.isnot(None),
                )
            )
            cumulative: set[int] = set()
            for (ids,) in prior_result:
                cumulative.update(ids)
            retired_pokemon_ids = sorted(cumulative)

        genlocke_context = RunGenlockeContext(
            genlocke_id=leg.genlocke_id,
            genlocke_name=leg.genlocke.name,
            leg_order=leg.leg_order,
            total_legs=total_legs,
            is_final_leg=leg.leg_order == total_legs,
            retired_pokemon_ids=retired_pokemon_ids,
        )

    # Load transfer-target encounter IDs for this run
    transfer_ids_result = await session.execute(
        select(GenlockeTransfer.target_encounter_id).where(
            GenlockeTransfer.target_encounter_id.in_(
                select(Encounter.id).where(Encounter.run_id == run_id)
            )
        )
    )
    transfer_encounter_ids = [row[0] for row in transfer_ids_result]

    response = RunDetailResponse.model_validate(run)
    response.genlocke = genlocke_context
    response.transfer_encounter_ids = transfer_encounter_ids
    return response


async def _validate_hof_encounter_ids(
    session: AsyncSession,
    run_id: int,
    hof_ids: list[int],
) -> None:
    """Reject a Hall-of-Fame team that is too large, foreign to the run, or not alive.

    Raises:
        HTTPException: 400 if more than six IDs are given, if any ID is not an
            encounter of ``run_id``, or if any encounter is not "caught" or
            has a faint level recorded.
    """
    if len(hof_ids) > _MAX_HOF_TEAM_SIZE:
        raise HTTPException(
            status_code=400,
            detail=f"HoF team cannot have more than {_MAX_HOF_TEAM_SIZE} Pokemon",
        )
    if not hof_ids:
        return

    # Validate all encounter IDs belong to this run and are alive
    enc_result = await session.execute(
        select(Encounter).where(
            Encounter.id.in_(hof_ids),
            Encounter.run_id == run_id,
        )
    )
    found = {e.id: e for e in enc_result.scalars().all()}

    missing = [eid for eid in hof_ids if eid not in found]
    if missing:
        raise HTTPException(
            status_code=400,
            detail=f"Encounters not found in this run: {missing}",
        )

    not_alive = [
        eid
        for eid, e in found.items()
        if e.status != "caught" or e.faint_level is not None
    ]
    if not_alive:
        raise HTTPException(
            status_code=400,
            detail=f"Encounters are not alive: {not_alive}",
        )


async def _apply_genlocke_status_side_effects(
    session: AsyncSession,
    run_id: int,
    new_status: str,
) -> None:
    """Propagate an ended run's status to the genlocke it belongs to, if any.

    A failed leg fails the genlocke. A completed leg completes the genlocke
    only when it is the final leg. Runs without a genlocke leg are untouched.
    """
    leg_result = await session.execute(
        select(GenlockeLeg)
        .where(GenlockeLeg.run_id == run_id)
        .options(joinedload(GenlockeLeg.genlocke))
    )
    leg = leg_result.scalar_one_or_none()
    if leg is None:
        return

    genlocke = leg.genlocke
    if new_status == "failed":
        genlocke.status = "failed"
    elif new_status == "completed":
        total_legs_result = await session.execute(
            select(func.count())
            .select_from(GenlockeLeg)
            .where(GenlockeLeg.genlocke_id == genlocke.id)
        )
        total_legs = total_legs_result.scalar_one()
        if leg.leg_order == total_legs:
            genlocke.status = "completed"


@router.patch("/{run_id}", response_model=RunResponse)
async def update_run(
    run_id: int,
    data: RunUpdate,
    session: AsyncSession = Depends(get_session),
    user: AuthUser = Depends(require_auth),
):
    """Apply a partial update to a run owned by the caller.

    Only fields explicitly set in the request are applied. Ending a run
    ("completed" / "failed") stamps ``completed_at`` and updates the status
    of the genlocke the run belongs to, if any.

    Raises:
        HTTPException: 404 if the run does not exist; 400 if the HoF team is
            invalid, if a non-active run is ended, or if a genlocke-linked
            run is reactivated. ``require_run_owner`` enforces ownership.
    """
    result = await session.execute(_run_with_owner_query(run_id))
    run = result.scalar_one_or_none()
    if run is None:
        raise HTTPException(status_code=404, detail="Run not found")

    require_run_owner(run, user)

    update_data = data.model_dump(exclude_unset=True)
    new_status = update_data.get("status") if "status" in update_data else None

    # Validate hof_encounter_ids if provided
    hof_ids = update_data.get("hof_encounter_ids")
    if hof_ids is not None:
        await _validate_hof_encounter_ids(session, run_id, hof_ids)

    # Auto-set completed_at when ending a run
    ending_run = new_status in _ENDED_STATUSES
    if ending_run:
        if run.status != "active":
            raise HTTPException(status_code=400, detail="Only active runs can be ended")
        update_data["completed_at"] = datetime.now(UTC)

    # Block reactivating a completed/failed run that belongs to a genlocke
    # NOTE(review): completed_at is not cleared when a standalone run is
    # reactivated - confirm whether that is intended.
    if new_status == "active" and run.status != "active":
        leg_result = await session.execute(
            select(GenlockeLeg).where(GenlockeLeg.run_id == run_id)
        )
        if leg_result.scalar_one_or_none() is not None:
            raise HTTPException(
                status_code=400,
                detail=(
                    "Cannot reactivate a genlocke-linked run. "
                    "The genlocke controls leg progression."
                ),
            )

    for field, value in update_data.items():
        setattr(run, field, value)

    # Genlocke side effects when run status changes
    if ending_run:
        await _apply_genlocke_status_side_effects(session, run_id, new_status)

    await session.commit()

    # Reload with owner relationship. The path parameter is used instead of
    # run.id so no attribute of the just-committed instance has to be read.
    result = await session.execute(_run_with_owner_query(run_id))
    run = result.scalar_one()
    return _build_run_response(run)


@router.delete("/{run_id}", status_code=204)
async def delete_run(
    run_id: int,
    session: AsyncSession = Depends(get_session),
    user: AuthUser = Depends(require_auth),
):
    """Delete a run owned by the caller together with its dependent rows.

    Boss results, genlocke transfers that reference the run's encounters
    (as source or target) and the encounters themselves are deleted before
    the run.

    Raises:
        HTTPException: 404 if the run does not exist; 400 if the run is
            linked to a genlocke leg. ``require_run_owner`` enforces ownership.
    """
    run = await session.get(NuzlockeRun, run_id)
    if run is None:
        raise HTTPException(status_code=404, detail="Run not found")

    require_run_owner(run, user)

    # Block deletion if run is linked to a genlocke leg. Because of this
    # guard no leg can reference the run below, so no unlinking is needed.
    leg_result = await session.execute(
        select(GenlockeLeg).where(GenlockeLeg.run_id == run_id)
    )
    if leg_result.scalar_one_or_none() is not None:
        raise HTTPException(
            status_code=400,
            detail=(
                "Cannot delete a run that belongs to a genlocke. "
                "Remove the leg or delete the genlocke first."
            ),
        )

    # Delete associated boss results first
    boss_results = await session.execute(
        select(BossResult).where(BossResult.run_id == run_id)
    )
    for br in boss_results.scalars():
        await session.delete(br)

    # Load the run's encounters once; their IDs drive the transfer cleanup.
    encounters_result = await session.execute(
        select(Encounter).where(Encounter.run_id == run_id)
    )
    encounters = encounters_result.scalars().all()
    encounter_ids = [enc.id for enc in encounters]

    # Delete genlocke transfers referencing this run's encounters
    if encounter_ids:
        transfers = await session.execute(
            select(GenlockeTransfer).where(
                GenlockeTransfer.source_encounter_id.in_(encounter_ids)
                | GenlockeTransfer.target_encounter_id.in_(encounter_ids)
            )
        )
        for t in transfers.scalars():
            await session.delete(t)

    # Delete associated encounters
    for enc in encounters:
        await session.delete(enc)

    await session.delete(run)
    await session.commit()
    return Response(status_code=204)