Implement Retire HoF (Gauntlet) rule enforcement for genlockes

When retireHoF is enabled, surviving HoF Pokemon and their evolutionary
families are retired at leg advancement and treated as duplicates in all
subsequent legs — both in the encounter modal and bulk randomize.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Julian Tabel
2026-02-09 10:05:03 +01:00
parent 3ff132f284
commit 48b56f9360
12 changed files with 218 additions and 40 deletions

View File

@@ -0,0 +1,30 @@
"""add retired_pokemon_ids to genlocke_legs
Revision ID: c3d4e5f6a7b9
Revises: b2c3d4e5f6a8
Create Date: 2026-02-09 18:00:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB
# revision identifiers, used by Alembic.
revision: str = 'c3d4e5f6a7b9'
down_revision: Union[str, Sequence[str], None] = 'b2c3d4e5f6a8'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.add_column(
'genlocke_legs',
sa.Column('retired_pokemon_ids', JSONB(), nullable=True),
)
def downgrade() -> None:
op.drop_column('genlocke_legs', 'retired_pokemon_ids')

View File

@@ -1,5 +1,4 @@
import random
from collections import deque
from fastapi import APIRouter, Depends, HTTPException, Response
from sqlalchemy import select
@@ -9,6 +8,7 @@ from sqlalchemy.orm import joinedload, selectinload
from app.core.database import get_session
from app.models.encounter import Encounter
from app.models.evolution import Evolution
from app.models.genlocke import GenlockeLeg
from app.models.nuzlocke_run import NuzlockeRun
from app.models.pokemon import Pokemon
from app.models.route import Route
@@ -20,6 +20,7 @@ from app.schemas.encounter import (
EncounterResponse,
EncounterUpdate,
)
from app.services.families import build_families
router = APIRouter()
@@ -159,34 +160,6 @@ async def delete_encounter(
return Response(status_code=204)
def _build_families(evolutions: list[Evolution]) -> dict[int, list[int]]:
"""Build pokemon_id → family members mapping using BFS on evolution graph."""
adj: dict[int, set[int]] = {}
for evo in evolutions:
adj.setdefault(evo.from_pokemon_id, set()).add(evo.to_pokemon_id)
adj.setdefault(evo.to_pokemon_id, set()).add(evo.from_pokemon_id)
visited: set[int] = set()
pokemon_to_family: dict[int, list[int]] = {}
for node in adj:
if node in visited:
continue
component: list[int] = []
queue = deque([node])
while queue:
current = queue.popleft()
if current in visited:
continue
visited.add(current)
component.append(current)
for neighbor in adj.get(current, set()):
if neighbor not in visited:
queue.append(neighbor)
for member in component:
pokemon_to_family[member] = component
return pokemon_to_family
@router.post(
"/runs/{run_id}/encounters/bulk-randomize",
response_model=BulkRandomizeResponse,
@@ -247,7 +220,7 @@ async def bulk_randomize_encounters(
if dupes_clause_on:
evo_result = await session.execute(select(Evolution))
evolutions = evo_result.scalars().all()
pokemon_to_family = _build_families(evolutions)
pokemon_to_family = build_families(evolutions)
# 7. Build initial duped set from existing caught encounters
duped: set[int] = set()
@@ -260,6 +233,23 @@ async def bulk_randomize_encounters(
for member in family:
duped.add(member)
# Seed duped set with retired Pokemon IDs from prior genlocke legs
leg_result = await session.execute(
select(GenlockeLeg).where(GenlockeLeg.run_id == run_id)
)
leg = leg_result.scalar_one_or_none()
if leg:
genlocke_result = await session.execute(
select(GenlockeLeg.retired_pokemon_ids)
.where(
GenlockeLeg.genlocke_id == leg.genlocke_id,
GenlockeLeg.leg_order < leg.leg_order,
GenlockeLeg.retired_pokemon_ids.isnot(None),
)
)
for (retired_ids,) in genlocke_result:
duped.update(retired_ids)
# 8. Organize routes: identify top-level and children
routes_by_id = {r.id: r for r in all_routes}
top_level = [r for r in all_routes if r.parent_route_id is None]

View File

@@ -1,13 +1,17 @@
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.core.database import get_session
from app.models.encounter import Encounter
from app.models.evolution import Evolution
from app.models.game import Game
from app.models.genlocke import Genlocke, GenlockeLeg
from app.models.nuzlocke_run import NuzlockeRun
from app.schemas.genlocke import GenlockeCreate, GenlockeResponse
from app.services.families import build_families
router = APIRouter()
@@ -140,6 +144,37 @@ async def advance_leg(
status_code=400, detail="Next leg already has a run"
)
# Compute retired Pokemon families if retireHoF is enabled
if genlocke.genlocke_rules.get("retireHoF", False):
# Query surviving caught Pokemon from the completed run
# "Surviving HoF" = caught, not fainted, not shiny
survivors_result = await session.execute(
select(Encounter.pokemon_id).where(
Encounter.run_id == current_leg.run_id,
Encounter.status == "caught",
Encounter.faint_level.is_(None),
Encounter.is_shiny.is_(False),
)
)
survivor_ids = [row[0] for row in survivors_result]
if survivor_ids:
# Build family map from evolution data
evo_result = await session.execute(select(Evolution))
evolutions = evo_result.scalars().all()
pokemon_to_family = build_families(evolutions)
# Collect all family members of surviving Pokemon
retired: set[int] = set()
for pid in survivor_ids:
retired.add(pid)
for member in pokemon_to_family.get(pid, []):
retired.add(member)
current_leg.retired_pokemon_ids = sorted(retired)
else:
current_leg.retired_pokemon_ids = []
# Create a new run for the next leg
new_run = NuzlockeRun(
game_id=next_leg.game_id,
@@ -162,3 +197,56 @@ async def advance_leg(
)
)
return result.scalar_one()
class RetiredLegResponse(BaseModel):
leg_order: int
retired_pokemon_ids: list[int]
class Config:
from_attributes = True
class RetiredFamiliesResponse(BaseModel):
retired_pokemon_ids: list[int]
by_leg: list[RetiredLegResponse]
@router.get(
"/{genlocke_id}/retired-families",
response_model=RetiredFamiliesResponse,
)
async def get_retired_families(
genlocke_id: int,
session: AsyncSession = Depends(get_session),
):
# Verify genlocke exists
genlocke = await session.get(Genlocke, genlocke_id)
if genlocke is None:
raise HTTPException(status_code=404, detail="Genlocke not found")
# Query all legs with retired_pokemon_ids
result = await session.execute(
select(GenlockeLeg)
.where(
GenlockeLeg.genlocke_id == genlocke_id,
GenlockeLeg.retired_pokemon_ids.isnot(None),
)
.order_by(GenlockeLeg.leg_order)
)
legs = result.scalars().all()
cumulative: set[int] = set()
by_leg: list[RetiredLegResponse] = []
for leg in legs:
ids = leg.retired_pokemon_ids or []
cumulative.update(ids)
by_leg.append(RetiredLegResponse(
leg_order=leg.leg_order,
retired_pokemon_ids=ids,
))
return RetiredFamiliesResponse(
retired_pokemon_ids=sorted(cumulative),
by_leg=by_leg,
)

View File

@@ -78,12 +78,29 @@ async def get_run(run_id: int, session: AsyncSession = Depends(get_session)):
.where(GenlockeLeg.genlocke_id == leg.genlocke_id)
)
total_legs = total_legs_result.scalar_one()
# Aggregate retired Pokemon IDs from prior legs (retireHoF rule)
retired_pokemon_ids: list[int] = []
if leg.genlocke.genlocke_rules.get("retireHoF", False) and leg.leg_order > 1:
prior_result = await session.execute(
select(GenlockeLeg.retired_pokemon_ids).where(
GenlockeLeg.genlocke_id == leg.genlocke_id,
GenlockeLeg.leg_order < leg.leg_order,
GenlockeLeg.retired_pokemon_ids.isnot(None),
)
)
cumulative: set[int] = set()
for (ids,) in prior_result:
cumulative.update(ids)
retired_pokemon_ids = sorted(cumulative)
genlocke_context = RunGenlockeContext(
genlocke_id=leg.genlocke_id,
genlocke_name=leg.genlocke.name,
leg_order=leg.leg_order,
total_legs=total_legs,
is_final_leg=leg.leg_order == total_legs,
retired_pokemon_ids=retired_pokemon_ids,
)
response = RunDetailResponse.model_validate(run)

View File

@@ -40,6 +40,7 @@ class GenlockeLeg(Base):
ForeignKey("nuzlocke_runs.id"), index=True
)
leg_order: Mapped[int] = mapped_column(SmallInteger)
retired_pokemon_ids: Mapped[list[int] | None] = mapped_column(JSONB, default=None)
genlocke: Mapped["Genlocke"] = relationship(back_populates="legs")
game: Mapped["Game"] = relationship()

View File

@@ -33,6 +33,7 @@ class RunGenlockeContext(CamelModel):
leg_order: int
total_legs: int
is_final_leg: bool
retired_pokemon_ids: list[int] = []
class RunDetailResponse(RunResponse):

View File

@@ -0,0 +1,31 @@
from collections import deque
from app.models.evolution import Evolution
def build_families(evolutions: list[Evolution]) -> dict[int, list[int]]:
"""Build pokemon_id -> family members mapping using BFS on evolution graph."""
adj: dict[int, set[int]] = {}
for evo in evolutions:
adj.setdefault(evo.from_pokemon_id, set()).add(evo.to_pokemon_id)
adj.setdefault(evo.to_pokemon_id, set()).add(evo.from_pokemon_id)
visited: set[int] = set()
pokemon_to_family: dict[int, list[int]] = {}
for node in adj:
if node in visited:
continue
component: list[int] = []
queue = deque([node])
while queue:
current = queue.popleft()
if current in visited:
continue
visited.add(current)
component.append(current)
for neighbor in adj.get(current, set()):
if neighbor not in visited:
queue.append(neighbor)
for member in component:
pokemon_to_family[member] = component
return pokemon_to_family