Files
nuzlocke-tracker/backend/src/app/api/genlockes.py
Julian Tabel d3b65e3c79 Add genlocke lineage tracking with aligned timeline view
Implement read-only lineage view that traces Pokemon across genlocke legs
via existing transfer records. Backend walks transfer chains to build
lineage entries; frontend renders them as cards with a column-aligned
timeline grid so leg dots line up vertically across all lineages.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 11:58:38 +01:00

966 lines
30 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import delete as sa_delete, func, select, update as sa_update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.database import get_session
from app.models.encounter import Encounter
from app.models.evolution import Evolution
from app.models.game import Game
from app.models.genlocke import Genlocke, GenlockeLeg
from app.models.nuzlocke_run import NuzlockeRun
from app.models.pokemon import Pokemon
from app.models.genlocke_transfer import GenlockeTransfer
from app.models.route import Route
from app.schemas.genlocke import (
AddLegRequest,
AdvanceLegRequest,
GenlockeCreate,
GenlockeDetailResponse,
GenlockeGraveyardResponse,
GenlockeLegDetailResponse,
GenlockeLineageResponse,
GenlockeListItem,
GenlockeResponse,
GenlockeStatsResponse,
GenlockeUpdate,
GraveyardEntryResponse,
GraveyardLegSummary,
LineageEntry,
LineageLegEntry,
RetiredPokemonResponse,
SurvivorResponse,
)
from app.schemas.pokemon import PokemonResponse
from app.services.families import build_families, resolve_base_form
# Router on which every genlocke endpoint in this module registers itself.
router = APIRouter()
@router.get("", response_model=list[GenlockeListItem])
async def list_genlockes(session: AsyncSession = Depends(get_session)):
    """Return every genlocke, newest first, with a summary of leg progress.

    For each genlocke the summary carries the total number of legs, how many
    legs have a linked run whose status is "completed", and the leg order of
    the leg whose linked run is "active" (``None`` when no run is active).
    """
    query = (
        select(Genlocke)
        .options(selectinload(Genlocke.legs).selectinload(GenlockeLeg.run))
        .order_by(Genlocke.created_at.desc())
    )
    rows = (await session.execute(query)).scalars().all()

    summaries = []
    for genlocke in rows:
        # Legs without a linked run contribute to neither counter.
        linked_legs = [leg for leg in genlocke.legs if leg.run]
        finished = sum(
            1 for leg in linked_legs if leg.run.status == "completed"
        )
        # If several runs are active, the last one in leg iteration order wins.
        active_order = None
        for leg in linked_legs:
            if leg.run.status == "active":
                active_order = leg.leg_order
        summaries.append(
            GenlockeListItem(
                id=genlocke.id,
                name=genlocke.name,
                status=genlocke.status,
                created_at=genlocke.created_at,
                total_legs=len(genlocke.legs),
                completed_legs=finished,
                current_leg_order=active_order,
            )
        )
    return summaries
@router.get("/{genlocke_id}", response_model=GenlockeDetailResponse)
async def get_genlocke(
    genlocke_id: int, session: AsyncSession = Depends(get_session)
):
    """Return one genlocke with per-leg details, aggregate stats and retired Pokemon.

    Raises:
        HTTPException: 404 when no genlocke has the given ID.
    """
    query = (
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(
            selectinload(Genlocke.legs).selectinload(GenlockeLeg.game),
            selectinload(Genlocke.legs).selectinload(GenlockeLeg.run),
        )
    )
    genlocke = (await session.execute(query)).scalar_one_or_none()
    if genlocke is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")

    # One grouped query gives (encounters, deaths) for every linked run.
    linked_run_ids = [
        leg.run_id for leg in genlocke.legs if leg.run_id is not None
    ]
    per_run_stats: dict[int, tuple[int, int]] = {}
    if linked_run_ids:
        stats_query = (
            select(
                Encounter.run_id,
                func.count().label("encounter_count"),
                # COUNT(column) skips NULLs, so this counts fainted encounters.
                func.count(Encounter.faint_level).label("death_count"),
            )
            .where(Encounter.run_id.in_(linked_run_ids))
            .group_by(Encounter.run_id)
        )
        per_run_stats = {
            row.run_id: (row.encounter_count, row.death_count)
            for row in await session.execute(stats_query)
        }

    leg_details = []
    encounters_total = 0
    deaths_total = 0
    completed_total = 0
    for leg in genlocke.legs:
        status_of_run = leg.run.status if leg.run else None
        if leg.run_id:
            encounters, deaths = per_run_stats.get(leg.run_id, (0, 0))
        else:
            encounters, deaths = 0, 0
        encounters_total += encounters
        deaths_total += deaths
        if status_of_run == "completed":
            completed_total += 1
        leg_details.append(
            GenlockeLegDetailResponse(
                id=leg.id,
                leg_order=leg.leg_order,
                game=leg.game,
                run_id=leg.run_id,
                run_status=status_of_run,
                encounter_count=encounters,
                death_count=deaths,
                retired_pokemon_ids=leg.retired_pokemon_ids,
            )
        )

    # Resolve every retired Pokemon ID across all legs in a single query.
    retired_ids: set[int] = {
        pokemon_id
        for leg in genlocke.legs
        if leg.retired_pokemon_ids
        for pokemon_id in leg.retired_pokemon_ids
    }
    retired_lookup: dict[int, RetiredPokemonResponse] = {}
    if retired_ids:
        species_rows = await session.execute(
            select(Pokemon).where(Pokemon.id.in_(retired_ids))
        )
        retired_lookup = {
            species.id: RetiredPokemonResponse(
                id=species.id,
                name=species.name,
                sprite_url=species.sprite_url,
            )
            for species in species_rows.scalars().all()
        }

    return GenlockeDetailResponse(
        id=genlocke.id,
        name=genlocke.name,
        status=genlocke.status,
        genlocke_rules=genlocke.genlocke_rules,
        nuzlocke_rules=genlocke.nuzlocke_rules,
        created_at=genlocke.created_at,
        legs=leg_details,
        stats=GenlockeStatsResponse(
            total_encounters=encounters_total,
            total_deaths=deaths_total,
            legs_completed=completed_total,
            total_legs=len(genlocke.legs),
        ),
        retired_pokemon=retired_lookup,
    )
@router.get(
    "/{genlocke_id}/graveyard",
    response_model=GenlockeGraveyardResponse,
)
async def get_genlocke_graveyard(
    genlocke_id: int, session: AsyncSession = Depends(get_session)
):
    """Return every fainted, caught encounter across all legs of a genlocke.

    The response also carries a death count per leg (only legs with at least
    one death) and the leg with the most deaths.

    Raises:
        HTTPException: 404 when no genlocke has the given ID.
    """
    genlocke_query = (
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(selectinload(Genlocke.legs).selectinload(GenlockeLeg.game))
    )
    genlocke = (await session.execute(genlocke_query)).scalar_one_or_none()
    if genlocke is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")

    # Only legs that already have a run can contain encounters.
    started_legs = [leg for leg in genlocke.legs if leg.run_id is not None]
    # run_id -> (leg_order, game_name)
    leg_info_by_run: dict[int, tuple[int, str]] = {
        leg.run_id: (leg.leg_order, leg.game.name) for leg in started_legs
    }
    if not started_legs:
        return GenlockeGraveyardResponse(
            entries=[], total_deaths=0, deaths_per_leg=[], deadliest_leg=None
        )

    fainted_query = (
        select(Encounter)
        .where(
            Encounter.run_id.in_([leg.run_id for leg in started_legs]),
            Encounter.faint_level.isnot(None),
            Encounter.status == "caught",
        )
        .options(
            selectinload(Encounter.pokemon),
            selectinload(Encounter.current_pokemon),
            selectinload(Encounter.route),
        )
    )
    fainted = (await session.execute(fainted_query)).scalars().all()

    graveyard: list[GraveyardEntryResponse] = []
    deaths_by_run: dict[int, int] = {}  # run_id -> number of deaths
    for dead in fainted:
        order, game = leg_info_by_run[dead.run_id]
        deaths_by_run[dead.run_id] = deaths_by_run.get(dead.run_id, 0) + 1
        evolved = None
        if dead.current_pokemon:
            evolved = PokemonResponse.model_validate(dead.current_pokemon)
        graveyard.append(
            GraveyardEntryResponse(
                id=dead.id,
                pokemon=PokemonResponse.model_validate(dead.pokemon),
                current_pokemon=evolved,
                nickname=dead.nickname,
                catch_level=dead.catch_level,
                faint_level=dead.faint_level,
                death_cause=dead.death_cause,
                is_shiny=dead.is_shiny,
                route_name=dead.route.name,
                leg_order=order,
                game_name=game,
            )
        )

    # Legs without any death are left out of the per-leg summary.
    leg_summaries: list[GraveyardLegSummary] = [
        GraveyardLegSummary(
            leg_order=leg.leg_order,
            game_name=leg.game.name,
            death_count=deaths_by_run[leg.run_id],
        )
        for leg in started_legs
        if deaths_by_run.get(leg.run_id, 0) > 0
    ]

    worst_leg = None
    if leg_summaries:
        # max() keeps the first leg on ties.
        worst_leg = max(leg_summaries, key=lambda summary: summary.death_count)

    return GenlockeGraveyardResponse(
        entries=graveyard,
        total_deaths=len(graveyard),
        deaths_per_leg=leg_summaries,
        deadliest_leg=worst_leg,
    )
def _walk_transfer_chain(
    root: int, forward: dict[int, GenlockeTransfer]
) -> list[int]:
    """Follow transfers forward from ``root`` and return the encounter IDs in order.

    The walk stops as soon as it would revisit an encounter, so inconsistent
    (cyclic) transfer rows cannot make the request loop forever.
    """
    chain = [root]
    visited = {root}
    current = root
    while current in forward:
        target = forward[current].target_encounter_id
        if target in visited:
            break
        visited.add(target)
        chain.append(target)
        current = target
    return chain


@router.get(
    "/{genlocke_id}/lineages",
    response_model=GenlockeLineageResponse,
)
async def get_genlocke_lineages(
    genlocke_id: int, session: AsyncSession = Depends(get_session)
):
    """Return the transfer lineages of a genlocke.

    A lineage is the chain of encounters linked by transfer records, starting
    at an encounter that was transferred but is not itself the target of a
    transfer. Lineages are sorted by the leg order and encounter ID of their
    first entry; a lineage's status is "alive" or "dead" according to its
    last entry.

    Raises:
        HTTPException: 404 when no genlocke has the given ID.
    """
    # Load genlocke with legs + game
    result = await session.execute(
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(
            selectinload(Genlocke.legs).selectinload(GenlockeLeg.game),
        )
    )
    genlocke = result.scalar_one_or_none()
    if genlocke is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")

    # Query all transfers for this genlocke
    transfer_result = await session.execute(
        select(GenlockeTransfer).where(
            GenlockeTransfer.genlocke_id == genlocke_id
        )
    )
    transfers = transfer_result.scalars().all()
    if not transfers:
        return GenlockeLineageResponse(lineages=[], total_lineages=0)

    # forward: source_encounter_id -> transfer; targets: all target encounter IDs
    forward: dict[int, GenlockeTransfer] = {}
    targets: set[int] = set()
    for transfer in transfers:
        forward[transfer.source_encounter_id] = transfer
        targets.add(transfer.target_encounter_id)

    # Roots are sources that are never a target. Dict keys are unique and keep
    # first-seen order, so no separate dedupe pass is needed.
    roots = [source_id for source_id in forward if source_id not in targets]
    chains = [_walk_transfer_chain(root, forward) for root in roots]
    if not chains:
        # Every source is also a target: nothing to anchor a lineage on.
        return GenlockeLineageResponse(lineages=[], total_lineages=0)

    # Batch-load all encounters in the chains
    all_encounter_ids: set[int] = set()
    for chain in chains:
        all_encounter_ids.update(chain)
    enc_result = await session.execute(
        select(Encounter)
        .where(Encounter.id.in_(all_encounter_ids))
        .options(
            selectinload(Encounter.pokemon),
            selectinload(Encounter.current_pokemon),
            selectinload(Encounter.route),
        )
    )
    encounter_map: dict[int, Encounter] = {
        enc.id: enc for enc in enc_result.scalars().all()
    }

    # Build run_id -> (leg_order, game_name) lookup
    run_lookup: dict[int, tuple[int, str]] = {
        leg.run_id: (leg.leg_order, leg.game.name)
        for leg in genlocke.legs
        if leg.run_id is not None
    }

    # Load HoF encounter IDs for all runs
    hof_encounter_ids: set[int] = set()
    if run_lookup:
        run_result = await session.execute(
            select(NuzlockeRun.id, NuzlockeRun.hof_encounter_ids).where(
                NuzlockeRun.id.in_(list(run_lookup))
            )
        )
        for row in run_result:
            if row.hof_encounter_ids:
                hof_encounter_ids.update(row.hof_encounter_ids)

    # Build lineage entries
    lineages: list[LineageEntry] = []
    for chain in chains:
        legs: list[LineageLegEntry] = []
        first_pokemon = None
        for enc_id in chain:
            enc = encounter_map.get(enc_id)
            if enc is None:
                # The transfer references an encounter that no longer exists.
                continue
            leg_order, game_name = run_lookup.get(enc.run_id, (0, "Unknown"))
            pokemon_resp = PokemonResponse.model_validate(enc.pokemon)
            if first_pokemon is None:
                first_pokemon = pokemon_resp
            current_pokemon_resp = (
                PokemonResponse.model_validate(enc.current_pokemon)
                if enc.current_pokemon
                else None
            )
            legs.append(
                LineageLegEntry(
                    leg_order=leg_order,
                    game_name=game_name,
                    encounter_id=enc.id,
                    pokemon=pokemon_resp,
                    current_pokemon=current_pokemon_resp,
                    nickname=enc.nickname,
                    catch_level=enc.catch_level,
                    faint_level=enc.faint_level,
                    death_cause=enc.death_cause,
                    is_shiny=enc.is_shiny,
                    route_name=enc.route.name if enc.route else "Unknown",
                    is_alive=(
                        enc.faint_level is None and enc.status == "caught"
                    ),
                    entered_hof=enc.id in hof_encounter_ids,
                    was_transferred=enc.id in forward,
                )
            )
        if not legs or first_pokemon is None:
            continue
        # Status based on last encounter in the chain
        status = "alive" if legs[-1].is_alive else "dead"
        lineages.append(
            LineageEntry(
                nickname=legs[0].nickname,
                pokemon=first_pokemon,
                legs=legs,
                status=status,
            )
        )

    # Sort by first leg order, then by encounter ID
    lineages.sort(
        key=lambda entry: (entry.legs[0].leg_order, entry.legs[0].encounter_id)
    )
    return GenlockeLineageResponse(
        lineages=lineages,
        total_lineages=len(lineages),
    )
@router.post("", response_model=GenlockeResponse, status_code=201)
async def create_genlocke(
    data: GenlockeCreate, session: AsyncSession = Depends(get_session)
):
    """Create a genlocke with one leg per requested game and start its first run.

    Legs are numbered from 1 in the order of ``data.game_ids``. Only the first
    leg gets a run; it is created as "active" and uses the genlocke's
    nuzlocke rules.

    Raises:
        HTTPException: 400 when no game IDs are given or the name is blank;
            404 when any game ID does not exist.
    """
    if not data.game_ids:
        raise HTTPException(status_code=400, detail="At least one game is required")
    clean_name = data.name.strip()
    if not clean_name:
        raise HTTPException(status_code=400, detail="Name is required")

    # Every requested game must exist before anything is written.
    games_query = select(Game).where(Game.id.in_(data.game_ids))
    games_by_id = {
        game.id: game
        for game in (await session.execute(games_query)).scalars().all()
    }
    missing = [gid for gid in data.game_ids if gid not in games_by_id]
    if missing:
        raise HTTPException(
            status_code=404, detail=f"Games not found: {missing}"
        )

    genlocke = Genlocke(
        name=clean_name,
        status="active",
        genlocke_rules=data.genlocke_rules,
        nuzlocke_rules=data.nuzlocke_rules,
    )
    session.add(genlocke)
    await session.flush()  # assigns genlocke.id, needed by the legs

    legs = [
        GenlockeLeg(genlocke_id=genlocke.id, game_id=game_id, leg_order=order)
        for order, game_id in enumerate(data.game_ids, start=1)
    ]
    session.add_all(legs)

    opening_game = games_by_id[data.game_ids[0]]
    opening_run = NuzlockeRun(
        game_id=opening_game.id,
        name=f"{clean_name} \u2014 Leg 1",
        status="active",
        rules=data.nuzlocke_rules,
    )
    session.add(opening_run)
    await session.flush()  # assigns opening_run.id

    # Only the first leg starts out linked to a run.
    legs[0].run_id = opening_run.id
    await session.commit()

    # Re-select so the response has legs and their games loaded.
    reloaded = await session.execute(
        select(Genlocke)
        .where(Genlocke.id == genlocke.id)
        .options(selectinload(Genlocke.legs).selectinload(GenlockeLeg.game))
    )
    return reloaded.scalar_one()
@router.get(
    "/{genlocke_id}/legs/{leg_order}/survivors",
    response_model=list[SurvivorResponse],
)
async def get_leg_survivors(
    genlocke_id: int,
    leg_order: int,
    session: AsyncSession = Depends(get_session),
):
    """Return the caught encounters of a leg's run that have not fainted.

    Raises:
        HTTPException: 404 when the leg does not exist; 400 when the leg has
            no run linked to it.
    """
    leg_query = select(GenlockeLeg).where(
        GenlockeLeg.genlocke_id == genlocke_id,
        GenlockeLeg.leg_order == leg_order,
    )
    leg = (await session.execute(leg_query)).scalar_one_or_none()
    if leg is None:
        raise HTTPException(status_code=404, detail="Leg not found")
    if leg.run_id is None:
        raise HTTPException(status_code=400, detail="Leg has no run")

    # A survivor is caught and has no faint level recorded.
    alive_query = (
        select(Encounter)
        .where(
            Encounter.run_id == leg.run_id,
            Encounter.status == "caught",
            Encounter.faint_level.is_(None),
        )
        .options(
            selectinload(Encounter.pokemon),
            selectinload(Encounter.current_pokemon),
            selectinload(Encounter.route),
        )
    )
    alive = (await session.execute(alive_query)).scalars().all()

    survivors = []
    for enc in alive:
        evolved = None
        if enc.current_pokemon:
            evolved = PokemonResponse.model_validate(enc.current_pokemon)
        survivors.append(
            SurvivorResponse(
                id=enc.id,
                pokemon=PokemonResponse.model_validate(enc.pokemon),
                current_pokemon=evolved,
                nickname=enc.nickname,
                catch_level=enc.catch_level,
                is_shiny=enc.is_shiny,
                route_name=enc.route.name,
            )
        )
    return survivors
@router.post(
    "/{genlocke_id}/legs/{leg_order}/advance",
    response_model=GenlockeResponse,
)
async def advance_leg(
    genlocke_id: int,
    leg_order: int,
    data: AdvanceLegRequest | None = None,
    session: AsyncSession = Depends(get_session),
):
    """Finish leg ``leg_order`` and start a run for the following leg.

    Steps, in order:
      1. Check that the genlocke is active, the current leg's run is
         completed, and the next leg exists without a run.
      2. If the ``retireHoF`` genlocke rule is on, store the retired Pokemon
         IDs on the current leg: the Hall-of-Fame selection (or, when there
         is none, all alive caught non-shiny encounters) plus their families.
      3. Create an active run for the next leg.
      4. For each requested transfer, create a level-1 caught encounter of
         the base form in the new run and a transfer record linking the two.

    Duplicate IDs in ``data.transfer_encounter_ids`` are collapsed, so a
    repeated ID is transferred once instead of failing validation.

    Raises:
        HTTPException: 404 when the genlocke or the leg is not found; 400 for
            every other failed precondition or invalid transfer request.
    """
    # Load genlocke with legs
    result = await session.execute(
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(
            selectinload(Genlocke.legs).selectinload(GenlockeLeg.game),
        )
    )
    genlocke = result.scalar_one_or_none()
    if genlocke is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")
    if genlocke.status != "active":
        raise HTTPException(
            status_code=400, detail="Genlocke is not active"
        )

    # Find the current leg and the one right after it
    current_leg = None
    next_leg = None
    for leg in genlocke.legs:
        if leg.leg_order == leg_order:
            current_leg = leg
        elif leg.leg_order == leg_order + 1:
            next_leg = leg
    if current_leg is None:
        raise HTTPException(status_code=404, detail="Leg not found")

    # Verify current leg's run is completed
    if current_leg.run_id is None:
        raise HTTPException(
            status_code=400, detail="Current leg has no run"
        )
    current_run = await session.get(NuzlockeRun, current_leg.run_id)
    if current_run is None or current_run.status != "completed":
        raise HTTPException(
            status_code=400, detail="Current leg's run is not completed"
        )
    if next_leg is None:
        raise HTTPException(
            status_code=400, detail="No next leg to advance to"
        )
    if next_leg.run_id is not None:
        raise HTTPException(
            status_code=400, detail="Next leg already has a run"
        )

    # The evolution table is needed by both the retirement and the transfer
    # step; it is loaded at most once and shared between them.
    evolutions = None

    # Compute retired Pokemon families if retireHoF is enabled
    if (genlocke.genlocke_rules or {}).get("retireHoF", False):
        # Prefer the player's HoF team selection; fall back to all alive
        if current_run.hof_encounter_ids:
            survivors_result = await session.execute(
                select(Encounter.pokemon_id).where(
                    Encounter.id.in_(current_run.hof_encounter_ids),
                )
            )
        else:
            # Fallback: all surviving caught, non-shiny Pokemon
            survivors_result = await session.execute(
                select(Encounter.pokemon_id).where(
                    Encounter.run_id == current_leg.run_id,
                    Encounter.status == "caught",
                    Encounter.faint_level.is_(None),
                    Encounter.is_shiny.is_(False),
                )
            )
        survivor_ids = [row[0] for row in survivors_result]
        if survivor_ids:
            # Build family map from evolution data
            evolutions = (
                await session.execute(select(Evolution))
            ).scalars().all()
            pokemon_to_family = build_families(evolutions)
            # Collect all family members of surviving Pokemon
            retired: set[int] = set()
            for pid in survivor_ids:
                retired.add(pid)
                retired.update(pokemon_to_family.get(pid, []))
            current_leg.retired_pokemon_ids = sorted(retired)
        else:
            current_leg.retired_pokemon_ids = []

    # Create a new run for the next leg
    new_run = NuzlockeRun(
        game_id=next_leg.game_id,
        name=f"{genlocke.name} \u2014 Leg {next_leg.leg_order}",
        status="active",
        rules=genlocke.nuzlocke_rules,
    )
    session.add(new_run)
    await session.flush()
    next_leg.run_id = new_run.id

    # Handle transfers if requested. Dedupe first (keeping request order):
    # the query below returns each encounter once, so a repeated ID would
    # otherwise fail the length comparison with an empty "missing" list.
    requested_ids = data.transfer_encounter_ids if data else []
    transfer_ids = list(dict.fromkeys(requested_ids or []))
    if transfer_ids:
        # Validate all encounter IDs belong to the current leg's run, are caught, and alive
        enc_result = await session.execute(
            select(Encounter).where(
                Encounter.id.in_(transfer_ids),
                Encounter.run_id == current_leg.run_id,
                Encounter.status == "caught",
                Encounter.faint_level.is_(None),
            )
        )
        source_encounters = enc_result.scalars().all()
        if len(source_encounters) != len(transfer_ids):
            found_ids = {e.id for e in source_encounters}
            missing = [eid for eid in transfer_ids if eid not in found_ids]
            raise HTTPException(
                status_code=400,
                detail=f"Invalid transfer encounter IDs: {missing}. Must be alive, caught encounters from the current leg.",
            )

        # Evolutions are needed for base form resolution
        if evolutions is None:
            evolutions = (
                await session.execute(select(Evolution))
            ).scalars().all()

        # Hatch location: the first top-level route (by order) of the next
        # leg's version group, or that route's first child when it has any.
        next_game = await session.get(Game, next_leg.game_id)
        if next_game is None or next_game.version_group_id is None:
            raise HTTPException(
                status_code=400,
                detail="Next leg's game has no version group configured",
            )
        route_result = await session.execute(
            select(Route)
            .where(
                Route.version_group_id == next_game.version_group_id,
                Route.parent_route_id.is_(None),
            )
            .options(selectinload(Route.children))
            .order_by(Route.order)
            .limit(1)
        )
        first_route = route_result.scalars().first()
        if first_route is None:
            raise HTTPException(
                status_code=400,
                detail="No routes found for the next leg's game. Cannot place transferred Pokemon.",
            )
        if first_route.children:
            hatch_route = min(first_route.children, key=lambda c: c.order)
        else:
            hatch_route = first_route

        # Create egg encounters first, flush once to get their IDs, then
        # create the transfer records that reference those IDs.
        hatched: list[tuple[Encounter, Encounter]] = []
        for source_enc in source_encounters:
            # Resolve base form (breed down)
            pokemon_id = source_enc.current_pokemon_id or source_enc.pokemon_id
            egg_encounter = Encounter(
                run_id=new_run.id,
                route_id=hatch_route.id,
                pokemon_id=resolve_base_form(pokemon_id, evolutions),
                nickname=source_enc.nickname,
                status="caught",
                catch_level=1,
                is_shiny=source_enc.is_shiny,
            )
            session.add(egg_encounter)
            hatched.append((source_enc, egg_encounter))
        await session.flush()
        for source_enc, egg_encounter in hatched:
            session.add(
                GenlockeTransfer(
                    genlocke_id=genlocke_id,
                    source_encounter_id=source_enc.id,
                    target_encounter_id=egg_encounter.id,
                    source_leg_order=leg_order,
                    target_leg_order=next_leg.leg_order,
                )
            )

    await session.commit()

    # Reload with relationships
    result = await session.execute(
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(
            selectinload(Genlocke.legs).selectinload(GenlockeLeg.game),
        )
    )
    return result.scalar_one()
class RetiredLegResponse(BaseModel):
    """Retired Pokemon IDs recorded for a single leg of a genlocke."""

    # Pydantic v2 configuration; replaces the deprecated nested ``class Config``.
    model_config = {"from_attributes": True}

    # Position of the leg within its genlocke.
    leg_order: int
    # IDs of the Pokemon retired on this leg.
    retired_pokemon_ids: list[int]
class RetiredFamiliesResponse(BaseModel):
    """Retired Pokemon of a genlocke: the combined ID list plus a per-leg breakdown."""

    # Every retired Pokemon ID across all legs.
    retired_pokemon_ids: list[int]
    # One entry per leg that has retired Pokemon recorded.
    by_leg: list[RetiredLegResponse]
@router.get(
    "/{genlocke_id}/retired-families",
    response_model=RetiredFamiliesResponse,
)
async def get_retired_families(
    genlocke_id: int,
    session: AsyncSession = Depends(get_session),
):
    """Return the retired Pokemon IDs of a genlocke, overall and per leg.

    Only legs whose ``retired_pokemon_ids`` is not NULL are included, in leg
    order. The overall list is the sorted union of the per-leg lists.

    Raises:
        HTTPException: 404 when no genlocke has the given ID.
    """
    if await session.get(Genlocke, genlocke_id) is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")

    legs_query = (
        select(GenlockeLeg)
        .where(
            GenlockeLeg.genlocke_id == genlocke_id,
            GenlockeLeg.retired_pokemon_ids.isnot(None),
        )
        .order_by(GenlockeLeg.leg_order)
    )
    retired_legs = (await session.execute(legs_query)).scalars().all()

    union_of_ids: set[int] = set()
    per_leg: list[RetiredLegResponse] = []
    for leg in retired_legs:
        leg_ids = leg.retired_pokemon_ids or []
        union_of_ids |= set(leg_ids)
        per_leg.append(
            RetiredLegResponse(
                leg_order=leg.leg_order,
                retired_pokemon_ids=leg_ids,
            )
        )

    return RetiredFamiliesResponse(
        retired_pokemon_ids=sorted(union_of_ids),
        by_leg=per_leg,
    )
@router.patch("/{genlocke_id}", response_model=GenlockeResponse)
async def update_genlocke(
    genlocke_id: int,
    data: GenlockeUpdate,
    session: AsyncSession = Depends(get_session),
):
    """Apply a partial update to a genlocke.

    Only fields explicitly present in the request are written. A ``status``
    value must be one of "active", "completed" or "failed".

    Raises:
        HTTPException: 404 when the genlocke does not exist; 400 when the
            given status is not an allowed value.
    """
    query = (
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(selectinload(Genlocke.legs).selectinload(GenlockeLeg.game))
    )
    genlocke = (await session.execute(query)).scalar_one_or_none()
    if genlocke is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")

    # exclude_unset keeps fields the client did not send out of the update.
    changes = data.model_dump(exclude_unset=True)
    allowed_statuses = ("active", "completed", "failed")
    if "status" in changes and changes["status"] not in allowed_statuses:
        raise HTTPException(
            status_code=400,
            detail="Status must be one of: active, completed, failed",
        )

    for attr_name, new_value in changes.items():
        setattr(genlocke, attr_name, new_value)

    await session.commit()
    await session.refresh(genlocke)
    return genlocke
@router.delete("/{genlocke_id}", status_code=204)
async def delete_genlocke(
    genlocke_id: int,
    session: AsyncSession = Depends(get_session),
):
    """Delete a genlocke and its legs while keeping the runs themselves.

    Raises:
        HTTPException: 404 when no genlocke has the given ID.
    """
    genlocke = await session.get(Genlocke, genlocke_id)
    if genlocke is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")

    belongs_to_genlocke = GenlockeLeg.genlocke_id == genlocke_id

    # Detach the runs from the legs first so the runs are preserved.
    unlink_runs = (
        sa_update(GenlockeLeg).where(belongs_to_genlocke).values(run_id=None)
    )
    await session.execute(unlink_runs)

    # Remove the legs with an explicit DELETE rather than the ORM cascade:
    # genlocke_id is non-nullable, so SQLAlchemy cannot null it out.
    await session.execute(sa_delete(GenlockeLeg).where(belongs_to_genlocke))

    await session.delete(genlocke)
    await session.commit()
@router.post(
    "/{genlocke_id}/legs",
    response_model=GenlockeResponse,
    status_code=201,
)
async def add_leg(
    genlocke_id: int,
    data: AddLegRequest,
    session: AsyncSession = Depends(get_session),
):
    """Append a leg for the given game at the end of a genlocke.

    The new leg's order is one more than the current highest leg order, or 1
    when the genlocke has no legs.

    Raises:
        HTTPException: 404 when the genlocke or the game does not exist.
    """
    if await session.get(Genlocke, genlocke_id) is None:
        raise HTTPException(status_code=404, detail="Genlocke not found")
    if await session.get(Game, data.game_id) is None:
        raise HTTPException(status_code=404, detail="Game not found")

    # MAX() yields NULL when there are no legs yet; treat that as 0.
    highest_order = (
        await session.execute(
            select(func.max(GenlockeLeg.leg_order)).where(
                GenlockeLeg.genlocke_id == genlocke_id
            )
        )
    ).scalar()
    next_order = (highest_order or 0) + 1

    session.add(
        GenlockeLeg(
            genlocke_id=genlocke_id,
            game_id=data.game_id,
            leg_order=next_order,
        )
    )
    await session.commit()

    # Re-select so the response has legs and their games loaded.
    reloaded = await session.execute(
        select(Genlocke)
        .where(Genlocke.id == genlocke_id)
        .options(selectinload(Genlocke.legs).selectinload(GenlockeLeg.game))
    )
    return reloaded.scalar_one()
@router.delete("/{genlocke_id}/legs/{leg_id}", status_code=204)
async def remove_leg(
    genlocke_id: int,
    leg_id: int,
    session: AsyncSession = Depends(get_session),
):
    """Remove a leg that has no run and close the gap in leg ordering.

    Raises:
        HTTPException: 404 when the leg does not exist in this genlocke;
            400 when the leg still has a run linked to it.
    """
    lookup = select(GenlockeLeg).where(
        GenlockeLeg.id == leg_id,
        GenlockeLeg.genlocke_id == genlocke_id,
    )
    leg = (await session.execute(lookup)).scalar_one_or_none()
    if leg is None:
        raise HTTPException(status_code=404, detail="Leg not found")
    if leg.run_id is not None:
        raise HTTPException(
            status_code=400,
            detail="Cannot remove a leg that has a linked run. Delete or unlink the run first.",
        )

    vacated_order = leg.leg_order
    await session.delete(leg)

    # Shift every later leg down by one so leg_order stays contiguous.
    later_legs_query = (
        select(GenlockeLeg)
        .where(
            GenlockeLeg.genlocke_id == genlocke_id,
            GenlockeLeg.leg_order > vacated_order,
        )
        .order_by(GenlockeLeg.leg_order)
    )
    later_legs = (await session.execute(later_legs_query)).scalars().all()
    for later_leg in later_legs:
        later_leg.leg_order = later_leg.leg_order - 1

    await session.commit()