Files
nuzlocke-tracker/backend/src/app/api/genlockes.py
Julian Tabel a81a17c485 Add genlocke admin panel with CRUD endpoints and UI
Backend: PATCH/DELETE genlocke, POST/DELETE legs with order
re-numbering. Frontend: admin list page with status filter,
detail page with inline editing, legs table, and stats display.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 10:51:47 +01:00

545 lines
16 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import delete as sa_delete, func, select, update as sa_update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.database import get_session
from app.models.encounter import Encounter
from app.models.evolution import Evolution
from app.models.game import Game
from app.models.genlocke import Genlocke, GenlockeLeg
from app.models.nuzlocke_run import NuzlockeRun
from app.models.pokemon import Pokemon
from app.schemas.genlocke import (
AddLegRequest,
GenlockeCreate,
GenlockeDetailResponse,
GenlockeLegDetailResponse,
GenlockeListItem,
GenlockeResponse,
GenlockeStatsResponse,
GenlockeUpdate,
RetiredPokemonResponse,
)
from app.services.families import build_families
# Router collecting the genlocke endpoints defined in this module.
router = APIRouter()
@router.get("", response_model=list[GenlockeListItem])
async def list_genlockes(session: AsyncSession = Depends(get_session)):
    """Return a summary of every genlocke, newest first.

    Each item reports the total number of legs, how many legs have a
    completed run, and the order of the leg whose run is active
    (``None`` when no leg has an active run).
    """
    query = (
        select(Genlocke)
        .options(selectinload(Genlocke.legs).selectinload(GenlockeLeg.run))
        .order_by(Genlocke.created_at.desc())
    )
    rows = (await session.execute(query)).scalars().all()

    summaries = []
    for genlocke in rows:
        # Only legs that already have a run contribute to the progress figures.
        run_states = [
            (leg.leg_order, leg.run.status) for leg in genlocke.legs if leg.run
        ]
        finished = sum(1 for _, state in run_states if state == "completed")

        # If several legs are active, the last one in iteration order wins.
        active_order = None
        for order, state in run_states:
            if state == "active":
                active_order = order

        summaries.append(
            GenlockeListItem(
                id=genlocke.id,
                name=genlocke.name,
                status=genlocke.status,
                created_at=genlocke.created_at,
                total_legs=len(genlocke.legs),
                completed_legs=finished,
                current_leg_order=active_order,
            )
        )
    return summaries
@router.get("/{genlocke_id}", response_model=GenlockeDetailResponse)
async def get_genlocke(
    genlocke_id: int, session: AsyncSession = Depends(get_session)
):
    """Return one genlocke with per-leg statistics and retired Pokemon.

    Encounter and death counts are aggregated per run in a single grouped
    query; a death is an encounter with a non-null ``faint_level``.

    Raises:
        HTTPException: 404 when no genlocke has the given id.
    """
    genlocke_query = (
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(
            selectinload(Genlocke.legs).selectinload(GenlockeLeg.game),
            selectinload(Genlocke.legs).selectinload(GenlockeLeg.run),
        )
    )
    genlocke = (await session.execute(genlocke_query)).scalar_one_or_none()
    if genlocke is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")

    # One aggregate query covering every run linked to a leg.
    linked_run_ids = [
        leg.run_id for leg in genlocke.legs if leg.run_id is not None
    ]
    counts_by_run: dict[int, tuple[int, int]] = {}
    if linked_run_ids:
        counts_query = (
            select(
                Encounter.run_id,
                func.count().label("encounter_count"),
                # COUNT(column) skips NULLs, so this counts fainted encounters.
                func.count(Encounter.faint_level).label("death_count"),
            )
            .where(Encounter.run_id.in_(linked_run_ids))
            .group_by(Encounter.run_id)
        )
        counts_by_run = {
            row.run_id: (row.encounter_count, row.death_count)
            for row in await session.execute(counts_query)
        }

    leg_details = []
    encounters_sum = 0
    deaths_sum = 0
    completed_count = 0
    for leg in genlocke.legs:
        run_status = None if not leg.run else leg.run.status
        if leg.run_id:
            leg_encounters, leg_deaths = counts_by_run.get(leg.run_id, (0, 0))
        else:
            leg_encounters, leg_deaths = 0, 0

        encounters_sum += leg_encounters
        deaths_sum += leg_deaths
        if run_status == "completed":
            completed_count += 1

        leg_details.append(
            GenlockeLegDetailResponse(
                id=leg.id,
                leg_order=leg.leg_order,
                game=leg.game,
                run_id=leg.run_id,
                run_status=run_status,
                encounter_count=leg_encounters,
                death_count=leg_deaths,
                retired_pokemon_ids=leg.retired_pokemon_ids,
            )
        )

    # Look up display data for every Pokemon retired on any leg.
    retired_ids: set[int] = {
        pokemon_id
        for leg in genlocke.legs
        if leg.retired_pokemon_ids
        for pokemon_id in leg.retired_pokemon_ids
    }
    retired_lookup: dict[int, RetiredPokemonResponse] = {}
    if retired_ids:
        pokemon_rows = await session.execute(
            select(Pokemon).where(Pokemon.id.in_(retired_ids))
        )
        retired_lookup = {
            pokemon.id: RetiredPokemonResponse(
                id=pokemon.id, name=pokemon.name, sprite_url=pokemon.sprite_url
            )
            for pokemon in pokemon_rows.scalars().all()
        }

    summary = GenlockeStatsResponse(
        total_encounters=encounters_sum,
        total_deaths=deaths_sum,
        legs_completed=completed_count,
        total_legs=len(genlocke.legs),
    )
    return GenlockeDetailResponse(
        id=genlocke.id,
        name=genlocke.name,
        status=genlocke.status,
        genlocke_rules=genlocke.genlocke_rules,
        nuzlocke_rules=genlocke.nuzlocke_rules,
        created_at=genlocke.created_at,
        legs=leg_details,
        stats=summary,
        retired_pokemon=retired_lookup,
    )
@router.post("", response_model=GenlockeResponse, status_code=201)
async def create_genlocke(
    data: GenlockeCreate, session: AsyncSession = Depends(get_session)
):
    """Create a genlocke, one leg per requested game, and the first run.

    Legs are numbered from 1 in the order of ``data.game_ids``. Only the
    first leg is linked to a run, which is created here as active.

    Raises:
        HTTPException: 400 when no games are given or the name is blank;
            404 when any requested game id does not exist.
    """
    if not data.game_ids:
        raise HTTPException(status_code=400, detail="At least one game is required")
    clean_name = data.name.strip()
    if not clean_name:
        raise HTTPException(status_code=400, detail="Name is required")

    # Every requested game must exist before anything is written.
    games_result = await session.execute(
        select(Game).where(Game.id.in_(data.game_ids))
    )
    found_games = {game.id: game for game in games_result.scalars().all()}
    missing = [game_id for game_id in data.game_ids if game_id not in found_games]
    if missing:
        raise HTTPException(
            status_code=404, detail=f"Games not found: {missing}"
        )

    genlocke = Genlocke(
        name=clean_name,
        status="active",
        genlocke_rules=data.genlocke_rules,
        nuzlocke_rules=data.nuzlocke_rules,
    )
    session.add(genlocke)
    await session.flush()  # assigns genlocke.id

    legs = [
        GenlockeLeg(genlocke_id=genlocke.id, game_id=game_id, leg_order=position)
        for position, game_id in enumerate(data.game_ids, start=1)
    ]
    session.add_all(legs)

    # The first leg starts immediately with its own run.
    opening_game = found_games[data.game_ids[0]]
    opening_run = NuzlockeRun(
        game_id=opening_game.id,
        name=f"{clean_name} \u2014 Leg 1",
        status="active",
        rules=data.nuzlocke_rules,
    )
    session.add(opening_run)
    await session.flush()  # assigns opening_run.id

    legs[0].run_id = opening_run.id
    await session.commit()

    # Re-query so the response carries the legs and their games.
    reloaded = await session.execute(
        select(Genlocke)
        .where(Genlocke.id == genlocke.id)
        .options(selectinload(Genlocke.legs).selectinload(GenlockeLeg.game))
    )
    return reloaded.scalar_one()
@router.post(
    "/{genlocke_id}/legs/{leg_order}/advance",
    response_model=GenlockeResponse,
)
async def advance_leg(
    genlocke_id: int,
    leg_order: int,
    session: AsyncSession = Depends(get_session),
):
    """Start the run for the leg that follows ``leg_order``.

    The leg at ``leg_order`` must have a completed run and the following
    leg must exist without a run. When the genlocke's ``retireHoF`` rule is
    enabled, the evolution families of the surviving Pokemon are stored on
    the current leg as retired before the new run is created.

    Raises:
        HTTPException: 404 when the genlocke or the leg is not found; 400
            when the genlocke is not active, the current leg has no
            completed run, there is no next leg, or the next leg already
            has a run.
    """
    genlocke_query = (
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(selectinload(Genlocke.legs).selectinload(GenlockeLeg.game))
    )
    genlocke = (await session.execute(genlocke_query)).scalar_one_or_none()
    if genlocke is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")
    if genlocke.status != "active":
        raise HTTPException(
            status_code=400, detail="Genlocke is not active"
        )

    # Locate the requested leg and its successor by order.
    legs_by_order = {leg.leg_order: leg for leg in genlocke.legs}
    current_leg = legs_by_order.get(leg_order)
    next_leg = legs_by_order.get(leg_order + 1)

    if current_leg is None:
        raise HTTPException(status_code=404, detail="Leg not found")
    if current_leg.run_id is None:
        raise HTTPException(
            status_code=400, detail="Current leg has no run"
        )

    current_run = await session.get(NuzlockeRun, current_leg.run_id)
    if current_run is None or current_run.status != "completed":
        raise HTTPException(
            status_code=400, detail="Current leg's run is not completed"
        )
    if next_leg is None:
        raise HTTPException(
            status_code=400, detail="No next leg to advance to"
        )
    if next_leg.run_id is not None:
        raise HTTPException(
            status_code=400, detail="Next leg already has a run"
        )

    if genlocke.genlocke_rules.get("retireHoF", False):
        if current_run.hof_encounter_ids:
            # The player's Hall of Fame selection takes precedence.
            survivors_query = select(Encounter.pokemon_id).where(
                Encounter.id.in_(current_run.hof_encounter_ids),
            )
        else:
            # Otherwise: every caught, non-shiny Pokemon that never fainted.
            survivors_query = select(Encounter.pokemon_id).where(
                Encounter.run_id == current_leg.run_id,
                Encounter.status == "caught",
                Encounter.faint_level.is_(None),
                Encounter.is_shiny.is_(False),
            )
        survivor_ids = (await session.execute(survivors_query)).scalars().all()

        retired: set[int] = set(survivor_ids)
        if survivor_ids:
            # Expand each survivor to its whole evolution family.
            evolutions = (await session.execute(select(Evolution))).scalars().all()
            family_of = build_families(evolutions)
            for pokemon_id in survivor_ids:
                retired.update(family_of.get(pokemon_id, []))
        current_leg.retired_pokemon_ids = sorted(retired)

    # Create the next leg's run and link it.
    upcoming_run = NuzlockeRun(
        game_id=next_leg.game_id,
        name=f"{genlocke.name} \u2014 Leg {next_leg.leg_order}",
        status="active",
        rules=genlocke.nuzlocke_rules,
    )
    session.add(upcoming_run)
    await session.flush()  # assigns upcoming_run.id
    next_leg.run_id = upcoming_run.id
    await session.commit()

    # Re-query so the response carries the legs and their games.
    reloaded = await session.execute(genlocke_query)
    return reloaded.scalar_one()
class RetiredLegResponse(BaseModel):
    """Retired Pokemon IDs recorded for a single genlocke leg."""

    # Pydantic v2 style configuration; the nested ``class Config`` form is
    # deprecated in v2. A plain dict is accepted for ``model_config``.
    model_config = {"from_attributes": True}

    # Order of the leg within its genlocke.
    leg_order: int
    # IDs of the Pokemon retired on this leg.
    retired_pokemon_ids: list[int]
class RetiredFamiliesResponse(BaseModel):
    """Response body for the retired-families endpoint of a genlocke."""

    # Retired Pokemon IDs across all legs.
    retired_pokemon_ids: list[int]
    # Retired Pokemon IDs broken down per leg.
    by_leg: list[RetiredLegResponse]
@router.get(
    "/{genlocke_id}/retired-families",
    response_model=RetiredFamiliesResponse,
)
async def get_retired_families(
    genlocke_id: int,
    session: AsyncSession = Depends(get_session),
):
    """Return the retired Pokemon IDs of a genlocke, overall and per leg.

    Only legs whose ``retired_pokemon_ids`` is not NULL are included,
    ordered by leg order. The overall list is the sorted union of the
    per-leg lists.

    Raises:
        HTTPException: 404 when no genlocke has the given id.
    """
    if await session.get(Genlocke, genlocke_id) is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")

    legs_query = (
        select(GenlockeLeg)
        .where(
            GenlockeLeg.genlocke_id == genlocke_id,
            GenlockeLeg.retired_pokemon_ids.isnot(None),
        )
        .order_by(GenlockeLeg.leg_order)
    )
    retired_legs = (await session.execute(legs_query)).scalars().all()

    per_leg: list[RetiredLegResponse] = [
        RetiredLegResponse(
            leg_order=leg.leg_order,
            retired_pokemon_ids=leg.retired_pokemon_ids or [],
        )
        for leg in retired_legs
    ]
    all_ids: set[int] = {
        pokemon_id
        for leg in retired_legs
        for pokemon_id in (leg.retired_pokemon_ids or [])
    }
    return RetiredFamiliesResponse(
        retired_pokemon_ids=sorted(all_ids),
        by_leg=per_leg,
    )
@router.patch("/{genlocke_id}", response_model=GenlockeResponse)
async def update_genlocke(
    genlocke_id: int,
    data: GenlockeUpdate,
    session: AsyncSession = Depends(get_session),
):
    """Apply a partial update to a genlocke and return the updated record.

    Only fields explicitly present in the request body are applied. The
    status must be one of the known values. A supplied name is held to the
    same rule as on creation: it must be non-blank, and it is stored with
    surrounding whitespace stripped.

    Raises:
        HTTPException: 404 when no genlocke has the given id; 400 when the
            status is not allowed or the name is missing/blank.
    """
    genlocke_query = (
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(selectinload(Genlocke.legs).selectinload(GenlockeLeg.game))
    )
    genlocke = (await session.execute(genlocke_query)).scalar_one_or_none()
    if genlocke is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")

    update_data = data.model_dump(exclude_unset=True)

    if "status" in update_data:
        if update_data["status"] not in ("active", "completed", "failed"):
            raise HTTPException(
                status_code=400,
                detail="Status must be one of: active, completed, failed",
            )

    if "name" in update_data:
        # Mirror create_genlocke: reject null/blank names and store the
        # stripped value instead of letting an invalid name reach the database.
        new_name = update_data["name"]
        if not isinstance(new_name, str) or not new_name.strip():
            raise HTTPException(status_code=400, detail="Name is required")
        update_data["name"] = new_name.strip()

    for field, value in update_data.items():
        setattr(genlocke, field, value)
    await session.commit()

    # Re-query with legs and games eagerly loaded, as the other write
    # endpoints do, so building the response does not rely on lazy loading
    # after the commit (which can fail under AsyncSession).
    reloaded = await session.execute(genlocke_query)
    return reloaded.scalar_one()
@router.delete("/{genlocke_id}", status_code=204)
async def delete_genlocke(
    genlocke_id: int,
    session: AsyncSession = Depends(get_session),
):
    """Delete a genlocke and its legs.

    Legs are first unlinked from their runs and then removed with explicit
    statements before the genlocke itself is deleted.

    Raises:
        HTTPException: 404 when no genlocke has the given id.
    """
    genlocke = await session.get(Genlocke, genlocke_id)
    if genlocke is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")

    owned_by_genlocke = GenlockeLeg.genlocke_id == genlocke_id

    # Detach runs from the legs so the runs are preserved.
    detach_runs = sa_update(GenlockeLeg).where(owned_by_genlocke).values(run_id=None)
    await session.execute(detach_runs)

    # Remove legs explicitly to avoid ORM cascade issues
    # (genlocke_id is non-nullable, so SQLAlchemy can't nullify it).
    drop_legs = sa_delete(GenlockeLeg).where(owned_by_genlocke)
    await session.execute(drop_legs)

    await session.delete(genlocke)
    await session.commit()
@router.post(
    "/{genlocke_id}/legs",
    response_model=GenlockeResponse,
    status_code=201,
)
async def add_leg(
    genlocke_id: int,
    data: AddLegRequest,
    session: AsyncSession = Depends(get_session),
):
    """Append a new leg for ``data.game_id`` to the end of a genlocke.

    The new leg's order is one more than the current highest order, or 1
    when the genlocke has no legs.

    Raises:
        HTTPException: 404 when the genlocke or the game does not exist.
    """
    if await session.get(Genlocke, genlocke_id) is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")
    if await session.get(Game, data.game_id) is None:
        raise HTTPException(status_code=404, detail="Game not found")

    # Highest existing order; NULL (no legs yet) is treated as 0.
    highest = (
        await session.execute(
            select(func.max(GenlockeLeg.leg_order)).where(
                GenlockeLeg.genlocke_id == genlocke_id
            )
        )
    ).scalar()
    next_order = (highest or 0) + 1

    session.add(
        GenlockeLeg(
            genlocke_id=genlocke_id,
            game_id=data.game_id,
            leg_order=next_order,
        )
    )
    await session.commit()

    # Re-query so the response carries the legs and their games.
    reloaded = await session.execute(
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(selectinload(Genlocke.legs).selectinload(GenlockeLeg.game))
    )
    return reloaded.scalar_one()
@router.delete("/{genlocke_id}/legs/{leg_id}", status_code=204)
async def remove_leg(
    genlocke_id: int,
    leg_id: int,
    session: AsyncSession = Depends(get_session),
):
    """Remove a leg that has no run and close the gap in leg ordering.

    Every leg of the same genlocke with a higher order is shifted down by
    one so the orders stay contiguous.

    Raises:
        HTTPException: 404 when the leg does not exist in this genlocke;
            400 when the leg is linked to a run.
    """
    leg_query = select(GenlockeLeg).where(
        GenlockeLeg.id == leg_id,
        GenlockeLeg.genlocke_id == genlocke_id,
    )
    leg = (await session.execute(leg_query)).scalar_one_or_none()
    if leg is None:
        raise HTTPException(status_code=404, detail="Leg not found")
    if leg.run_id is not None:
        raise HTTPException(
            status_code=400,
            detail="Cannot remove a leg that has a linked run. Delete or unlink the run first.",
        )

    vacated_order = leg.leg_order
    await session.delete(leg)

    # Shift every later leg down by one position.
    later_legs_query = (
        select(GenlockeLeg)
        .where(
            GenlockeLeg.genlocke_id == genlocke_id,
            GenlockeLeg.leg_order > vacated_order,
        )
        .order_by(GenlockeLeg.leg_order)
    )
    later_legs = (await session.execute(later_legs_query)).scalars().all()
    for later_leg in later_legs:
        later_leg.leg_order = later_leg.leg_order - 1

    await session.commit()