Files
nuzlocke-tracker/backend/src/app/api/evolutions.py
Julian Tabel c6521dd206 Add filter controls to admin tables
Pokemon (type), Evolutions (trigger), Games (region/generation),
and Runs (status/game) now have dropdown filters alongside search.
Server-side filtering for paginated tables, client-side for small datasets.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 20:29:55 +01:00

216 lines
7.4 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from app.core.database import get_session
from app.models.evolution import Evolution
from app.models.pokemon import Pokemon
from app.schemas.pokemon import (
BulkEvolutionItem,
BulkImportResult,
EvolutionAdminResponse,
EvolutionCreate,
EvolutionUpdate,
PaginatedEvolutionResponse,
)
# Router on which the evolution endpoints defined below are registered.
router = APIRouter()
def _apply_evolution_filters(query, search: str | None, trigger: str | None):
    """Return *query* narrowed by the optional search text and trigger filter.

    Args:
        query: A ``select(Evolution)`` statement to add WHERE clauses to.
        search: Free text matched case-insensitively as a substring of the
            from/to pokemon name, the evolution trigger, or the evolution
            item. Falsy values add no condition.
        trigger: Exact trigger value to match. Falsy values add no condition.

    Returns:
        A new statement with the applicable conditions attached.
    """
    if search:
        needle = search.lower()
        # autoescape=True makes "%" and "_" typed by the user match literally
        # instead of acting as LIKE wildcards that match every row.
        matching_pokemon_ids = (
            select(Pokemon.id)
            .where(func.lower(Pokemon.name).contains(needle, autoescape=True))
            .scalar_subquery()
        )
        query = query.where(
            or_(
                Evolution.from_pokemon_id.in_(matching_pokemon_ids),
                Evolution.to_pokemon_id.in_(matching_pokemon_ids),
                func.lower(Evolution.trigger).contains(needle, autoescape=True),
                func.lower(Evolution.item).contains(needle, autoescape=True),
            )
        )
    if trigger:
        query = query.where(Evolution.trigger == trigger)
    return query


@router.get("/evolutions", response_model=PaginatedEvolutionResponse)
async def list_evolutions(
    search: str | None = Query(None),
    trigger: str | None = Query(None),
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
    session: AsyncSession = Depends(get_session),
):
    """Return one page of evolutions, optionally filtered.

    Args:
        search: Optional case-insensitive substring matched against the
            from/to pokemon names, the trigger, and the item.
        trigger: Optional exact trigger value.
        limit: Page size (1-500, default 50).
        offset: Number of rows to skip (>= 0).
        session: Database session provided by ``get_session``.

    Returns:
        A ``PaginatedEvolutionResponse`` holding the page of evolutions (with
        both pokemon relationships eagerly loaded), the total number of rows
        matching the filters, and the ``limit``/``offset`` that were applied.
    """
    # One filtered statement feeds both the count and the page query, so the
    # reported total can never disagree with the rows that are returned.
    filtered = _apply_evolution_filters(select(Evolution), search, trigger)

    # Count total (without eager loads)
    count_query = select(func.count()).select_from(filtered.subquery())
    total = (await session.execute(count_query)).scalar() or 0

    items_query = (
        filtered.options(
            joinedload(Evolution.from_pokemon),
            joinedload(Evolution.to_pokemon),
        )
        # Evolution.id is a tie-breaker: (from, to) alone is not unique, and
        # OFFSET/LIMIT over a non-unique ordering can repeat or skip rows
        # between pages.
        .order_by(
            Evolution.from_pokemon_id,
            Evolution.to_pokemon_id,
            Evolution.id,
        )
        .offset(offset)
        .limit(limit)
    )
    result = await session.execute(items_query)
    items = result.scalars().unique().all()

    return PaginatedEvolutionResponse(
        items=items,
        total=total,
        limit=limit,
        offset=offset,
    )
@router.post("/evolutions", response_model=EvolutionAdminResponse, status_code=201)
async def create_evolution(
    data: EvolutionCreate, session: AsyncSession = Depends(get_session)
):
    """Create an evolution after checking that both referenced pokemon exist.

    Args:
        data: Payload whose dumped fields are passed to ``Evolution``.
        session: Database session provided by ``get_session``.

    Returns:
        The stored evolution, re-selected with ``from_pokemon`` and
        ``to_pokemon`` eagerly loaded.

    Raises:
        HTTPException: 404 when the from or the to pokemon id is unknown.
    """
    if (await session.get(Pokemon, data.from_pokemon_id)) is None:
        raise HTTPException(status_code=404, detail="From pokemon not found")
    if (await session.get(Pokemon, data.to_pokemon_id)) is None:
        raise HTTPException(status_code=404, detail="To pokemon not found")

    new_evolution = Evolution(**data.model_dump())
    session.add(new_evolution)
    await session.commit()

    # Select the row again so both pokemon relationships come back loaded.
    reload_stmt = (
        select(Evolution)
        .where(Evolution.id == new_evolution.id)
        .options(
            joinedload(Evolution.from_pokemon),
            joinedload(Evolution.to_pokemon),
        )
    )
    reloaded = await session.execute(reload_stmt)
    return reloaded.scalar_one()
@router.put("/evolutions/{evolution_id}", response_model=EvolutionAdminResponse)
async def update_evolution(
    evolution_id: int,
    data: EvolutionUpdate,
    session: AsyncSession = Depends(get_session),
):
    """Apply a partial update to an existing evolution.

    Only fields explicitly set in the payload are written. If either pokemon
    id is among them, it must refer to an existing pokemon.

    Args:
        evolution_id: Primary key of the evolution to change.
        data: Payload; unset fields are left untouched.
        session: Database session provided by ``get_session``.

    Returns:
        The updated evolution, re-selected with ``from_pokemon`` and
        ``to_pokemon`` eagerly loaded.

    Raises:
        HTTPException: 404 when the evolution, or a newly referenced from/to
            pokemon, does not exist.
    """
    target = await session.get(Evolution, evolution_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Evolution not found")

    changes = data.model_dump(exclude_unset=True)

    # Foreign keys are validated in this order: from first, then to.
    fk_checks = (
        ("from_pokemon_id", "From pokemon not found"),
        ("to_pokemon_id", "To pokemon not found"),
    )
    for fk_field, missing_detail in fk_checks:
        if fk_field not in changes:
            continue
        if (await session.get(Pokemon, changes[fk_field])) is None:
            raise HTTPException(status_code=404, detail=missing_detail)

    for attr_name, new_value in changes.items():
        setattr(target, attr_name, new_value)
    await session.commit()

    # Select the row again so both pokemon relationships come back loaded.
    reload_stmt = (
        select(Evolution)
        .where(Evolution.id == target.id)
        .options(
            joinedload(Evolution.from_pokemon),
            joinedload(Evolution.to_pokemon),
        )
    )
    reloaded = await session.execute(reload_stmt)
    return reloaded.scalar_one()
@router.delete("/evolutions/{evolution_id}", status_code=204)
async def delete_evolution(
    evolution_id: int, session: AsyncSession = Depends(get_session)
):
    """Delete the evolution with the given id.

    Args:
        evolution_id: Primary key of the evolution to remove.
        session: Database session provided by ``get_session``.

    Raises:
        HTTPException: 404 when no evolution has that id.
    """
    doomed = await session.get(Evolution, evolution_id)
    if doomed is not None:
        await session.delete(doomed)
        await session.commit()
        return
    raise HTTPException(status_code=404, detail="Evolution not found")
@router.post("/evolutions/bulk-import", response_model=BulkImportResult)
async def bulk_import_evolutions(
    items: list[BulkEvolutionItem],
    session: AsyncSession = Depends(get_session),
):
    """Create or update evolutions from a list keyed by PokeAPI ids.

    Each item is resolved to internal pokemon ids. If an evolution with the
    same (from, to) pair already exists its fields are overwritten, otherwise
    a new evolution is inserted.

    Every item is written inside its own SAVEPOINT. A database failure for
    one item is therefore rolled back, reported against that item, and does
    not poison the session for the remaining items or for the final commit.

    Args:
        items: Evolutions to import.
        session: Database session provided by ``get_session``.

    Returns:
        A ``BulkImportResult`` with the number of evolutions created and
        updated, plus one error message per item that could not be imported.
    """
    # Fields copied verbatim from the import item onto the evolution.
    copied_fields = (
        "trigger",
        "min_level",
        "item",
        "held_item",
        "condition",
        "region",
    )

    # Build pokeapi_id -> id mapping
    result = await session.execute(select(Pokemon.pokeapi_id, Pokemon.id))
    dex_to_id = {row.pokeapi_id: row.id for row in result}

    created = 0
    updated = 0
    errors: list[str] = []

    for item in items:
        from_id = dex_to_id.get(item.from_pokeapi_id)
        to_id = dex_to_id.get(item.to_pokeapi_id)
        if from_id is None:
            errors.append(f"Pokemon with pokeapi_id {item.from_pokeapi_id} not found")
            continue
        if to_id is None:
            errors.append(f"Pokemon with pokeapi_id {item.to_pokeapi_id} not found")
            continue

        try:
            # Releasing the savepoint flushes this item's changes, so any
            # database error is raised here, inside the try, for this item.
            async with session.begin_nested():
                # Check if evolution already exists
                existing = await session.execute(
                    select(Evolution).where(
                        Evolution.from_pokemon_id == from_id,
                        Evolution.to_pokemon_id == to_id,
                    )
                )
                evolution = existing.scalar_one_or_none()
                is_new = evolution is None
                values = {name: getattr(item, name) for name in copied_fields}
                if is_new:
                    session.add(
                        Evolution(
                            from_pokemon_id=from_id,
                            to_pokemon_id=to_id,
                            **values,
                        )
                    )
                else:
                    for name, value in values.items():
                        setattr(evolution, name, value)
        except Exception as e:
            # Deliberately broad: the import is best-effort per item, and the
            # savepoint has already been rolled back at this point.
            errors.append(f"Evolution {item.from_pokeapi_id} -> {item.to_pokeapi_id}: {e}")
            continue

        # Count only after the item has actually been written.
        if is_new:
            created += 1
        else:
            updated += 1

    await session.commit()
    return BulkImportResult(created=created, updated=updated, errors=errors)