Backend: auto-fix and format all ruff issues, manually fix B904/B023/SIM117/B007/E741/F841 errors, and suppress B008 (FastAPI Depends) and F821 (SQLAlchemy forward refs) in config. Frontend: allow constant exports and disable React compiler-specific rules (set-state-in-effect, preserve-manual-memoization). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
481 lines
15 KiB
Python
481 lines
15 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy import func, or_, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import joinedload, selectinload
|
|
|
|
from app.core.database import get_session
|
|
from app.models.evolution import Evolution
|
|
from app.models.pokemon import Pokemon
|
|
from app.models.route import Route
|
|
from app.models.route_encounter import RouteEncounter
|
|
from app.schemas.pokemon import (
|
|
BulkImportItem,
|
|
BulkImportResult,
|
|
EvolutionAdminResponse,
|
|
EvolutionResponse,
|
|
FamiliesResponse,
|
|
PaginatedPokemonResponse,
|
|
PokemonCreate,
|
|
PokemonEncounterLocationItem,
|
|
PokemonEncounterLocationResponse,
|
|
PokemonResponse,
|
|
PokemonUpdate,
|
|
RouteEncounterCreate,
|
|
RouteEncounterDetailResponse,
|
|
RouteEncounterUpdate,
|
|
)
|
|
|
|
# Router on which the Pokemon and route-encounter endpoints below are registered.
router = APIRouter()
|
|
|
|
|
|
@router.get("/pokemon", response_model=PaginatedPokemonResponse)
async def list_pokemon(
    search: str | None = Query(None),
    type: str | None = Query(None),
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
    session: AsyncSession = Depends(get_session),
):
    """Return one page of Pokemon, optionally filtered by name and type.

    Args:
        search: Case-insensitive substring matched against the Pokemon name.
            The text is matched literally ("%" and "_" are not wildcards).
        type: Value matched against ``Pokemon.types`` via ``.any(...)``.
            The name shadows the builtin but is kept because it is the
            public query-parameter name.
        limit: Page size (1-500).
        offset: Number of rows to skip before the page starts.
        session: Database session injected by FastAPI.

    Returns:
        The page of items together with the total number of matching rows
        and the ``limit``/``offset`` that were applied.
    """
    base_query = select(Pokemon)
    if search:
        # autoescape=True escapes LIKE metacharacters in user input, so a
        # search for "%" or "_" no longer matches every row.
        base_query = base_query.where(
            func.lower(Pokemon.name).contains(search.lower(), autoescape=True)
        )
    if type:
        base_query = base_query.where(Pokemon.types.any(type))

    # Count over the filtered (but unpaginated) query.
    count_query = select(func.count()).select_from(base_query.subquery())
    total = (await session.execute(count_query)).scalar() or 0

    # Fetch the requested page in a stable order.
    items_query = (
        base_query.order_by(Pokemon.national_dex, Pokemon.name)
        .offset(offset)
        .limit(limit)
    )
    result = await session.execute(items_query)
    items = result.scalars().all()

    return PaginatedPokemonResponse(
        items=items,
        total=total,
        limit=limit,
        offset=offset,
    )
|
|
|
|
|
|
@router.post("/pokemon/bulk-import", response_model=BulkImportResult)
async def bulk_import_pokemon(
    items: list[BulkImportItem],
    session: AsyncSession = Depends(get_session),
):
    """Create or update Pokemon in bulk, keyed by ``pokeapi_id``.

    Each item is processed inside its own SAVEPOINT so that a failure for one
    item (including errors raised when its INSERT/UPDATE is flushed) is
    recorded in ``errors`` and rolled back on its own, without poisoning the
    surrounding transaction or the items that succeeded.

    Args:
        items: Pokemon payloads to import.
        session: Database session injected by FastAPI.

    Returns:
        Counts of created and updated rows plus one message per failed item.
    """
    created = 0
    updated = 0
    errors: list[str] = []

    for item in items:
        try:
            # Leaving the block releases the SAVEPOINT (flushing pending
            # changes); an exception rolls back just this item.
            async with session.begin_nested():
                existing = await session.execute(
                    select(Pokemon).where(Pokemon.pokeapi_id == item.pokeapi_id)
                )
                pokemon = existing.scalar_one_or_none()
                is_new = pokemon is None

                if is_new:
                    session.add(Pokemon(**item.model_dump()))
                else:
                    pokemon.national_dex = item.national_dex
                    pokemon.name = item.name
                    pokemon.types = item.types
                    # A missing sprite URL must not erase the stored one.
                    if item.sprite_url is not None:
                        pokemon.sprite_url = item.sprite_url
        except Exception as e:
            # Best-effort import: report the item and carry on with the rest.
            errors.append(f"PokeAPI #{item.pokeapi_id} ({item.name}): {e}")
        else:
            # Count only once the item's changes reached the database.
            if is_new:
                created += 1
            else:
                updated += 1

    await session.commit()
    return BulkImportResult(created=created, updated=updated, errors=errors)
|
|
|
|
|
|
@router.post("/pokemon", response_model=PokemonResponse, status_code=201)
async def create_pokemon(
    data: PokemonCreate, session: AsyncSession = Depends(get_session)
):
    """Create a Pokemon; respond with HTTP 409 if its PokeAPI ID is taken."""
    duplicate_query = select(Pokemon).where(Pokemon.pokeapi_id == data.pokeapi_id)
    duplicate = (await session.execute(duplicate_query)).scalar_one_or_none()
    if duplicate is not None:
        raise HTTPException(
            status_code=409,
            detail=f"Pokemon with PokeAPI ID #{data.pokeapi_id} already exists",
        )

    new_pokemon = Pokemon(**data.model_dump())
    session.add(new_pokemon)
    await session.commit()
    # Reload the row after the commit before returning it.
    await session.refresh(new_pokemon)
    return new_pokemon
|
|
|
|
|
|
@router.get("/pokemon/families", response_model=FamiliesResponse)
async def get_pokemon_families(
    session: AsyncSession = Depends(get_session),
):
    """Group Pokemon IDs into evolution families.

    Every evolution row is treated as an undirected edge between two Pokemon
    IDs; each connected component of that graph is one family, returned as a
    sorted list of IDs. Pokemon that appear in no evolution row are absent.
    """
    from collections import deque

    evolution_rows = (await session.execute(select(Evolution))).scalars().all()

    # Undirected adjacency: each evolution links both endpoints to each other.
    neighbours: dict[int, set[int]] = {}
    for row in evolution_rows:
        source, target = row.from_pokemon_id, row.to_pokemon_id
        neighbours.setdefault(source, set()).add(target)
        neighbours.setdefault(target, set()).add(source)

    # Breadth-first walk from every not-yet-seen node yields one component.
    seen: set[int] = set()
    families: list[list[int]] = []
    for start in neighbours:
        if start in seen:
            continue
        members: list[int] = []
        pending = deque([start])
        while pending:
            pokemon_id = pending.popleft()
            if pokemon_id in seen:
                continue
            seen.add(pokemon_id)
            members.append(pokemon_id)
            pending.extend(
                other
                for other in neighbours.get(pokemon_id, set())
                if other not in seen
            )
        families.append(sorted(members))

    return FamiliesResponse(families=families)
|
|
|
|
|
|
@router.get("/pokemon/{pokemon_id}", response_model=PokemonResponse)
async def get_pokemon(pokemon_id: int, session: AsyncSession = Depends(get_session)):
    """Fetch a single Pokemon by primary key, or respond with HTTP 404."""
    found = await session.get(Pokemon, pokemon_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Pokemon not found")
|
|
|
|
|
|
@router.get("/pokemon/{pokemon_id}/forms", response_model=list[PokemonResponse])
async def get_pokemon_forms(
    pokemon_id: int, session: AsyncSession = Depends(get_session)
):
    """List the other Pokemon rows sharing this Pokemon's national dex number.

    Responds with HTTP 404 when ``pokemon_id`` does not exist. The requested
    Pokemon itself is excluded; results are ordered by PokeAPI ID.
    """
    base = await session.get(Pokemon, pokemon_id)
    if base is None:
        raise HTTPException(status_code=404, detail="Pokemon not found")

    same_dex_number = Pokemon.national_dex == base.national_dex
    not_the_requested_one = Pokemon.id != pokemon_id
    forms_query = (
        select(Pokemon)
        .where(same_dex_number, not_the_requested_one)
        .order_by(Pokemon.pokeapi_id)
    )
    rows = await session.execute(forms_query)
    return rows.scalars().all()
|
|
|
|
|
|
@router.get(
    "/pokemon/{pokemon_id}/encounter-locations",
    response_model=list[PokemonEncounterLocationResponse],
)
async def get_pokemon_encounter_locations(
    pokemon_id: int, session: AsyncSession = Depends(get_session)
):
    """Return where a Pokemon can be encountered, grouped per game.

    Responds with HTTP 404 when the Pokemon does not exist. Encounters are
    read ordered by game and route, and each game contributes one response
    entry holding its encounters in that order.
    """
    if await session.get(Pokemon, pokemon_id) is None:
        raise HTTPException(status_code=404, detail="Pokemon not found")

    stmt = (
        select(RouteEncounter)
        .where(RouteEncounter.pokemon_id == pokemon_id)
        .options(joinedload(RouteEncounter.route), joinedload(RouteEncounter.game))
        .order_by(RouteEncounter.game_id, RouteEncounter.route_id)
    )
    rows = (await session.execute(stmt)).scalars().unique().all()

    # Dict insertion order keeps games in the order they are first seen.
    by_game: dict[int, PokemonEncounterLocationResponse] = {}
    for row in rows:
        bucket = by_game.get(row.game_id)
        if bucket is None:
            bucket = PokemonEncounterLocationResponse(
                game_id=row.game_id,
                game_name=row.game.name,
                encounters=[],
            )
            by_game[row.game_id] = bucket
        location = PokemonEncounterLocationItem(
            route_id=row.route_id,
            route_name=row.route.name,
            encounter_method=row.encounter_method,
            encounter_rate=row.encounter_rate,
            min_level=row.min_level,
            max_level=row.max_level,
        )
        bucket.encounters.append(location)

    return list(by_game.values())
|
|
|
|
|
|
@router.get(
    "/pokemon/{pokemon_id}/evolution-chain",
    response_model=list[EvolutionAdminResponse],
)
async def get_pokemon_evolution_chain(
    pokemon_id: int, session: AsyncSession = Depends(get_session)
):
    """Return every evolution inside the family that contains a Pokemon.

    Responds with HTTP 404 when the Pokemon does not exist and with an empty
    list when no evolution connects members of its family. The family is the
    set of Pokemon reachable from ``pokemon_id`` when evolutions are followed
    in either direction.
    """
    from collections import deque

    if await session.get(Pokemon, pokemon_id) is None:
        raise HTTPException(status_code=404, detail="Pokemon not found")

    # All evolutions are needed to discover the whole family.
    all_evolutions = (await session.execute(select(Evolution))).scalars().all()

    neighbours: dict[int, set[int]] = {}
    for row in all_evolutions:
        source, target = row.from_pokemon_id, row.to_pokemon_id
        neighbours.setdefault(source, set()).add(target)
        neighbours.setdefault(target, set()).add(source)

    # Breadth-first walk outward from the requested Pokemon.
    members: set[int] = set()
    pending = deque([pokemon_id])
    while pending:
        node = pending.popleft()
        if node in members:
            continue
        members.add(node)
        pending.extend(
            other for other in neighbours.get(node, set()) if other not in members
        )

    # Keep only evolutions whose two endpoints both belong to the family.
    wanted_ids = [
        row.id
        for row in all_evolutions
        if row.from_pokemon_id in members and row.to_pokemon_id in members
    ]
    if not wanted_ids:
        return []

    # Second query so both Pokemon relationships are eagerly loaded.
    detailed_query = (
        select(Evolution)
        .where(Evolution.id.in_(wanted_ids))
        .options(
            joinedload(Evolution.from_pokemon),
            joinedload(Evolution.to_pokemon),
        )
        .order_by(Evolution.from_pokemon_id, Evolution.to_pokemon_id)
    )
    detailed = await session.execute(detailed_query)
    return detailed.scalars().unique().all()
|
|
|
|
|
|
@router.get("/pokemon/{pokemon_id}/evolutions", response_model=list[EvolutionResponse])
async def get_pokemon_evolutions(
    pokemon_id: int,
    region: str | None = Query(None),
    session: AsyncSession = Depends(get_session),
):
    """List the evolutions that start from a Pokemon.

    Responds with HTTP 404 when the Pokemon does not exist. Without
    ``region`` every evolution is returned. With ``region`` only region-less
    evolutions and those of that region are considered, and a regional
    evolution replaces the region-less one that shares its trigger and item.
    """
    if await session.get(Pokemon, pokemon_id) is None:
        raise HTTPException(status_code=404, detail="Pokemon not found")

    stmt = (
        select(Evolution)
        .where(Evolution.from_pokemon_id == pokemon_id)
        .options(joinedload(Evolution.to_pokemon))
    )
    if region is not None:
        stmt = stmt.where(or_(Evolution.region.is_(None), Evolution.region == region))

    candidates = (await session.execute(stmt)).scalars().unique().all()
    if region is None:
        return candidates

    # Regional evolutions replace the non-regional one that shares the
    # same trigger + item (e.g. Pikachu + thunder-stone → Alolan Raichu
    # replaces Pikachu + thunder-stone → Raichu in Alola).
    overridden = {
        (evo.trigger, evo.item) for evo in candidates if evo.region is not None
    }
    if not overridden:
        return candidates

    return [
        evo
        for evo in candidates
        if not (evo.region is None and (evo.trigger, evo.item) in overridden)
    ]
|
|
|
|
|
|
@router.put("/pokemon/{pokemon_id}", response_model=PokemonResponse)
async def update_pokemon(
    pokemon_id: int,
    data: PokemonUpdate,
    session: AsyncSession = Depends(get_session),
):
    """Apply a partial update to a Pokemon.

    Only fields explicitly set in the request body are written. Responds
    with HTTP 404 when the Pokemon does not exist and HTTP 409 when the new
    PokeAPI ID already belongs to a different Pokemon.
    """
    target = await session.get(Pokemon, pokemon_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Pokemon not found")

    changes = data.model_dump(exclude_unset=True)

    if "pokeapi_id" in changes:
        requested_id = changes["pokeapi_id"]
        clash_query = select(Pokemon).where(
            Pokemon.pokeapi_id == requested_id,
            Pokemon.id != pokemon_id,
        )
        clash = (await session.execute(clash_query)).scalar_one_or_none()
        if clash is not None:
            raise HTTPException(
                status_code=409,
                detail=f"Pokemon with PokeAPI ID #{requested_id} already exists",
            )

    for attr_name, new_value in changes.items():
        setattr(target, attr_name, new_value)

    await session.commit()
    await session.refresh(target)
    return target
|
|
|
|
|
|
@router.delete("/pokemon/{pokemon_id}", status_code=204)
async def delete_pokemon(pokemon_id: int, session: AsyncSession = Depends(get_session)):
    """Delete a Pokemon that has no encounters.

    Responds with HTTP 404 when the Pokemon does not exist and HTTP 409 when
    it still has encounters attached.
    """
    # Load the encounters with the row so the check below needs no lazy load.
    lookup = (
        select(Pokemon)
        .where(Pokemon.id == pokemon_id)
        .options(selectinload(Pokemon.encounters))
    )
    target = (await session.execute(lookup)).scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail="Pokemon not found")
    if target.encounters:
        raise HTTPException(
            status_code=409,
            detail="Cannot delete pokemon with existing encounters.",
        )

    await session.delete(target)
    await session.commit()
|
|
|
|
|
|
@router.get(
    "/routes/{route_id}/pokemon",
    response_model=list[RouteEncounterDetailResponse],
)
async def list_route_encounters(
    route_id: int,
    game_id: int | None = Query(None),
    session: AsyncSession = Depends(get_session),
):
    """List a route's encounters, highest encounter rate first.

    Responds with HTTP 404 when the route does not exist. When ``game_id``
    is given, only encounters for that game are returned.
    """
    if await session.get(Route, route_id) is None:
        raise HTTPException(status_code=404, detail="Route not found")

    # Collect the filters first, then build the statement in one go.
    conditions = [RouteEncounter.route_id == route_id]
    if game_id is not None:
        conditions.append(RouteEncounter.game_id == game_id)

    stmt = (
        select(RouteEncounter)
        .where(*conditions)
        .options(joinedload(RouteEncounter.pokemon))
        .order_by(RouteEncounter.encounter_rate.desc())
    )
    rows = await session.execute(stmt)
    return rows.scalars().unique().all()
|
|
|
|
|
|
@router.post(
    "/routes/{route_id}/pokemon",
    response_model=RouteEncounterDetailResponse,
    status_code=201,
)
async def add_route_encounter(
    route_id: int,
    data: RouteEncounterCreate,
    session: AsyncSession = Depends(get_session),
):
    """Attach a Pokemon encounter to a route.

    Args:
        route_id: Route that receives the encounter.
        data: Encounter payload; ``data.pokemon_id`` must reference an
            existing Pokemon.
        session: Database session injected by FastAPI.

    Returns:
        The stored encounter with its ``pokemon`` relationship loaded.

    Raises:
        HTTPException: 404 when the route or the Pokemon does not exist.
    """
    route = await session.get(Route, route_id)
    if route is None:
        raise HTTPException(status_code=404, detail="Route not found")

    pokemon = await session.get(Pokemon, data.pokemon_id)
    if pokemon is None:
        raise HTTPException(status_code=404, detail="Pokemon not found")

    encounter = RouteEncounter(route_id=route_id, **data.model_dump())
    session.add(encounter)
    await session.commit()
    # commit() may expire the instance; refresh explicitly (as the other
    # handlers in this module do) so reading `encounter.id` below never
    # triggers an implicit lazy load, which is not allowed under asyncio.
    await session.refresh(encounter)

    # Reload with pokemon relationship
    result = await session.execute(
        select(RouteEncounter)
        .where(RouteEncounter.id == encounter.id)
        .options(joinedload(RouteEncounter.pokemon))
    )
    return result.scalar_one()
|
|
|
|
|
|
@router.put(
    "/routes/{route_id}/pokemon/{encounter_id}",
    response_model=RouteEncounterDetailResponse,
)
async def update_route_encounter(
    route_id: int,
    encounter_id: int,
    data: RouteEncounterUpdate,
    session: AsyncSession = Depends(get_session),
):
    """Apply a partial update to one encounter of a route.

    Only fields explicitly set in the request body are written. Responds
    with HTTP 404 when no encounter with that ID exists on the given route.
    The updated encounter is returned with its ``pokemon`` relationship
    loaded.
    """
    lookup = (
        select(RouteEncounter)
        .where(RouteEncounter.id == encounter_id, RouteEncounter.route_id == route_id)
        .options(joinedload(RouteEncounter.pokemon))
    )
    target = (await session.execute(lookup)).scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail="Route encounter not found")

    changes = data.model_dump(exclude_unset=True)
    for attr_name, new_value in changes.items():
        setattr(target, attr_name, new_value)

    await session.commit()
    await session.refresh(target)

    # Fetch the row again so the pokemon relationship is loaded for the response.
    reload_query = (
        select(RouteEncounter)
        .where(RouteEncounter.id == target.id)
        .options(joinedload(RouteEncounter.pokemon))
    )
    reloaded = await session.execute(reload_query)
    return reloaded.scalar_one()
|
|
|
|
|
|
@router.delete("/routes/{route_id}/pokemon/{encounter_id}", status_code=204)
async def remove_route_encounter(
    route_id: int,
    encounter_id: int,
    session: AsyncSession = Depends(get_session),
):
    """Delete one encounter from a route.

    Responds with HTTP 404 when no encounter with that ID exists on the
    given route.
    """
    lookup = select(RouteEncounter).where(
        RouteEncounter.id == encounter_id,
        RouteEncounter.route_id == route_id,
    )
    lookup_result = await session.execute(lookup)
    target = lookup_result.scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail="Route encounter not found")

    await session.delete(target)
    await session.commit()
|