## Summary

- Add `is_admin` column to users table with Alembic migration and a `require_admin` FastAPI dependency that protects all admin-facing write endpoints (games, pokemon, evolutions, bosses, routes CRUD)
- Expose admin status to frontend via user API and update AuthContext to fetch/store `isAdmin` after login
- Make navigation menu auth-aware (different links for logged-out, logged-in, and admin users) and protect frontend routes with `ProtectedRoute` and `AdminRoute` components, preserving deep-linking through redirects
- Fix test reliability: `drop_all` before `create_all` to clear stale PostgreSQL enums from interrupted test runs
- Fix test auth: add `admin_client` fixture and use valid UUID for mock user so tests pass with new admin-protected endpoints

## Test plan

- [x] All 252 backend tests pass
- [ ] Verify non-admin users cannot access admin write endpoints (games, pokemon, evolutions, bosses CRUD)
- [ ] Verify admin users can access admin endpoints normally
- [ ] Verify navigation shows correct links for logged-out, logged-in, and admin states
- [ ] Verify `/admin/*` routes redirect non-admin users with a toast
- [ ] Verify `/runs/new` and `/genlockes/new` redirect unauthenticated users to login, then back after auth

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Reviewed-on: #67
Co-authored-by: Julian Tabel <juliantabel.jt@gmail.com>
Co-committed-by: Julian Tabel <juliantabel.jt@gmail.com>
232 lines
7.8 KiB
Python
232 lines
7.8 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy import func, or_, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import joinedload
|
|
|
|
from app.core.auth import AuthUser, require_admin
|
|
from app.core.database import get_session
|
|
from app.models.evolution import Evolution
|
|
from app.models.pokemon import Pokemon
|
|
from app.schemas.pokemon import (
|
|
BulkEvolutionItem,
|
|
BulkImportResult,
|
|
EvolutionAdminResponse,
|
|
EvolutionCreate,
|
|
EvolutionUpdate,
|
|
PaginatedEvolutionResponse,
|
|
)
|
|
|
|
# Router on which the evolution endpoints in this module are registered.
router = APIRouter()
|
|
|
|
|
|
def _evolution_filters(search: str | None, trigger: str | None) -> list:
    """Build the WHERE conditions shared by the page query and the count query.

    Args:
        search: Optional free-text term. Matched case-insensitively against the
            names of the source/target pokemon and against the evolution's
            ``trigger`` and ``item`` columns.
        trigger: Optional exact value the evolution's ``trigger`` must equal.

    Returns:
        A list of SQLAlchemy boolean expressions (empty when no filter is set).
    """
    conditions = []

    if search:
        needle = search.lower()
        # autoescape=True makes "%" and "_" in the user's term match literally
        # instead of acting as LIKE wildcards.
        matching_pokemon = (
            select(Pokemon.id)
            .where(func.lower(Pokemon.name).contains(needle, autoescape=True))
            .scalar_subquery()
        )
        conditions.append(
            or_(
                Evolution.from_pokemon_id.in_(matching_pokemon),
                Evolution.to_pokemon_id.in_(matching_pokemon),
                func.lower(Evolution.trigger).contains(needle, autoescape=True),
                func.lower(Evolution.item).contains(needle, autoescape=True),
            )
        )

    if trigger:
        conditions.append(Evolution.trigger == trigger)

    return conditions


@router.get("/evolutions", response_model=PaginatedEvolutionResponse)
async def list_evolutions(
    search: str | None = Query(None),
    trigger: str | None = Query(None),
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
    session: AsyncSession = Depends(get_session),
):
    """Return one page of evolutions plus the total number of matches.

    Args:
        search: Optional case-insensitive term matched against pokemon names,
            the evolution trigger and the evolution item.
        trigger: Optional exact trigger value to filter on.
        limit: Page size (1-500, default 50).
        offset: Number of matching rows to skip (>= 0).
        session: Database session provided by ``get_session``.

    Returns:
        A ``PaginatedEvolutionResponse`` with ``items``, ``total``, ``limit``
        and ``offset``.
    """
    # Build the filters once so the count and the page can never disagree.
    filters = _evolution_filters(search, trigger)

    # Count total (without eager loads)
    count_query = select(func.count()).select_from(Evolution).where(*filters)
    total = (await session.execute(count_query)).scalar() or 0

    items_query = (
        select(Evolution)
        .options(
            joinedload(Evolution.from_pokemon), joinedload(Evolution.to_pokemon)
        )
        .where(*filters)
        # Evolution.id is a tie-breaker: without a total order, OFFSET/LIMIT
        # pages may repeat or skip rows that share the same from/to pair.
        .order_by(Evolution.from_pokemon_id, Evolution.to_pokemon_id, Evolution.id)
        .offset(offset)
        .limit(limit)
    )
    result = await session.execute(items_query)
    items = result.scalars().unique().all()

    return PaginatedEvolutionResponse(
        items=items,
        total=total,
        limit=limit,
        offset=offset,
    )
|
|
|
|
|
|
@router.post("/evolutions", response_model=EvolutionAdminResponse, status_code=201)
async def create_evolution(
    data: EvolutionCreate,
    session: AsyncSession = Depends(get_session),
    _user: AuthUser = Depends(require_admin),
):
    """Create an evolution between two existing pokemon.

    The endpoint is guarded by the ``require_admin`` dependency.

    Raises:
        HTTPException: 404 when the source or the target pokemon id does not
            resolve to a ``Pokemon`` row (source is checked first).

    Returns:
        The stored evolution, reloaded with both pokemon relationships.
    """
    # Each entry pairs a referenced pokemon id with the 404 detail to use.
    endpoint_checks = (
        (data.from_pokemon_id, "From pokemon not found"),
        (data.to_pokemon_id, "To pokemon not found"),
    )
    for pokemon_id, not_found_detail in endpoint_checks:
        if await session.get(Pokemon, pokemon_id) is None:
            raise HTTPException(status_code=404, detail=not_found_detail)

    new_evolution = Evolution(**data.model_dump())
    session.add(new_evolution)
    await session.commit()

    # Fetch the row again so both relationships are eagerly populated.
    reload_stmt = (
        select(Evolution)
        .where(Evolution.id == new_evolution.id)
        .options(joinedload(Evolution.from_pokemon), joinedload(Evolution.to_pokemon))
    )
    reloaded = await session.execute(reload_stmt)
    return reloaded.scalar_one()
|
|
|
|
|
|
@router.put("/evolutions/{evolution_id}", response_model=EvolutionAdminResponse)
async def update_evolution(
    evolution_id: int,
    data: EvolutionUpdate,
    session: AsyncSession = Depends(get_session),
    _user: AuthUser = Depends(require_admin),
):
    """Apply a partial update to an existing evolution.

    Only fields explicitly present in the request body are written. The
    endpoint is guarded by the ``require_admin`` dependency.

    Raises:
        HTTPException: 404 when the evolution does not exist, or when a
            supplied ``from_pokemon_id`` / ``to_pokemon_id`` does not resolve
            to a ``Pokemon`` row.

    Returns:
        The updated evolution, reloaded with both pokemon relationships.
    """
    evolution = await session.get(Evolution, evolution_id)
    if evolution is None:
        raise HTTPException(status_code=404, detail="Evolution not found")

    changes = data.model_dump(exclude_unset=True)

    # Validate any pokemon reference that is being changed, source first.
    reference_checks = (
        ("from_pokemon_id", "From pokemon not found"),
        ("to_pokemon_id", "To pokemon not found"),
    )
    for key, not_found_detail in reference_checks:
        if key not in changes:
            continue
        if await session.get(Pokemon, changes[key]) is None:
            raise HTTPException(status_code=404, detail=not_found_detail)

    for attr_name, new_value in changes.items():
        setattr(evolution, attr_name, new_value)

    await session.commit()

    # Fetch the row again so both relationships are eagerly populated.
    reload_stmt = (
        select(Evolution)
        .where(Evolution.id == evolution.id)
        .options(joinedload(Evolution.from_pokemon), joinedload(Evolution.to_pokemon))
    )
    reloaded = await session.execute(reload_stmt)
    return reloaded.scalar_one()
|
|
|
|
|
|
@router.delete("/evolutions/{evolution_id}", status_code=204)
async def delete_evolution(
    evolution_id: int,
    session: AsyncSession = Depends(get_session),
    _user: AuthUser = Depends(require_admin),
):
    """Delete the evolution with the given id.

    The endpoint is guarded by the ``require_admin`` dependency and responds
    with 204 on success.

    Raises:
        HTTPException: 404 when no evolution with ``evolution_id`` exists.
    """
    target = await session.get(Evolution, evolution_id)
    if target is not None:
        await session.delete(target)
        await session.commit()
        return

    raise HTTPException(status_code=404, detail="Evolution not found")
|
|
|
|
|
|
@router.post("/evolutions/bulk-import", response_model=BulkImportResult)
async def bulk_import_evolutions(
    items: list[BulkEvolutionItem],
    session: AsyncSession = Depends(get_session),
    _user: AuthUser = Depends(require_admin),
):
    """Create or update evolutions in bulk, addressed by PokeAPI ids.

    For every item the source/target ``pokeapi_id`` is translated to the local
    pokemon id. An existing evolution with the same from/to pair is updated,
    otherwise a new one is created. The endpoint is guarded by the
    ``require_admin`` dependency.

    Each item is written inside its own SAVEPOINT and flushed immediately, so
    a database error is attributed to the item that caused it and rolled back
    on its own instead of surfacing at a later item or at the final commit.

    Args:
        items: Evolutions to import.
        session: Database session provided by ``get_session``.

    Returns:
        A ``BulkImportResult`` with the number of created and updated rows and
        a list of human-readable error messages for skipped items.
    """
    # Build pokeapi_id -> id mapping
    result = await session.execute(select(Pokemon.pokeapi_id, Pokemon.id))
    dex_to_id = {row.pokeapi_id: row.id for row in result}

    created = 0
    updated = 0
    errors: list[str] = []

    for item in items:
        from_id = dex_to_id.get(item.from_pokeapi_id)
        to_id = dex_to_id.get(item.to_pokeapi_id)

        if from_id is None:
            errors.append(f"Pokemon with pokeapi_id {item.from_pokeapi_id} not found")
            continue
        if to_id is None:
            errors.append(f"Pokemon with pokeapi_id {item.to_pokeapi_id} not found")
            continue

        # Deliberately broad: the import is best-effort and any per-item
        # failure is reported back to the caller rather than aborting the run.
        try:
            values = {
                "trigger": item.trigger,
                "min_level": item.min_level,
                "item": item.item,
                "held_item": item.held_item,
                "condition": item.condition,
                "region": item.region,
            }

            # SAVEPOINT: a failure here rolls back only this item's changes
            # and leaves the outer transaction usable for the next item.
            async with session.begin_nested():
                # Check if evolution already exists
                existing = await session.execute(
                    select(Evolution).where(
                        Evolution.from_pokemon_id == from_id,
                        Evolution.to_pokemon_id == to_id,
                    )
                )
                evolution = existing.scalar_one_or_none()
                is_new = evolution is None

                if is_new:
                    session.add(
                        Evolution(
                            from_pokemon_id=from_id,
                            to_pokemon_id=to_id,
                            **values,
                        )
                    )
                else:
                    for field, value in values.items():
                        setattr(evolution, field, value)

                # Flush now so constraint violations are raised for this item.
                await session.flush()
        except Exception as e:
            errors.append(
                f"Evolution {item.from_pokeapi_id} -> {item.to_pokeapi_id}: {e}"
            )
            continue

        # Count only items whose changes actually reached the database.
        if is_new:
            created += 1
        else:
            updated += 1

    await session.commit()
    return BulkImportResult(created=created, updated=updated, errors=errors)
|