from fastapi import APIRouter, Depends from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from app.core.database import get_session from app.models.encounter import Encounter from app.models.game import Game from app.models.nuzlocke_run import NuzlockeRun from app.models.pokemon import Pokemon from app.schemas.stats import ( DeathCause, GameRunCount, PokemonRanking, StatsResponse, TypeCount, ) router = APIRouter() @router.get("", response_model=StatsResponse) async def get_stats(session: AsyncSession = Depends(get_session)): # --- Run overview --- run_counts = await session.execute( select( func.count().label("total"), func.count().filter(NuzlockeRun.status == "active").label("active"), func.count().filter(NuzlockeRun.status == "completed").label("completed"), func.count().filter(NuzlockeRun.status == "failed").label("failed"), ).select_from(NuzlockeRun) ) row = run_counts.one() total_runs = row.total active_runs = row.active completed_runs = row.completed failed_runs = row.failed finished = completed_runs + failed_runs win_rate = round(completed_runs / finished, 4) if finished > 0 else None # Average duration for finished runs avg_dur = await session.execute( select( func.avg( func.extract("epoch", NuzlockeRun.completed_at) - func.extract("epoch", NuzlockeRun.started_at) ) ) .select_from(NuzlockeRun) .where(NuzlockeRun.completed_at.is_not(None)) ) avg_seconds = avg_dur.scalar() avg_duration_days = round(avg_seconds / 86400, 1) if avg_seconds else None # --- Runs by game --- runs_by_game_q = await session.execute( select( Game.id, Game.name, Game.color, func.count().label("count"), ) .join(NuzlockeRun, NuzlockeRun.game_id == Game.id) .group_by(Game.id, Game.name, Game.color) .order_by(func.count().desc()) ) runs_by_game = [ GameRunCount(game_id=r.id, game_name=r.name, game_color=r.color, count=r.count) for r in runs_by_game_q.all() ] # --- Encounter stats --- enc_counts = await session.execute( select( func.count().label("total"), func.count().filter(Encounter.status == "caught").label("caught"), func.count().filter(Encounter.status == "fainted").label("fainted"), func.count().filter(Encounter.status == "missed").label("missed"), ).select_from(Encounter) ) enc = enc_counts.one() total_encounters = enc.total caught_count = enc.caught fainted_count = enc.fainted missed_count = enc.missed catch_rate = round(caught_count / total_encounters, 4) if total_encounters > 0 else None avg_encounters_per_run = round(total_encounters / total_runs, 1) if total_runs > 0 else None # --- Top caught pokemon (top 10) --- top_caught_q = await session.execute( select( Pokemon.id, Pokemon.name, Pokemon.sprite_url, func.count().label("count"), ) .join(Encounter, Encounter.pokemon_id == Pokemon.id) .where(Encounter.status == "caught") .group_by(Pokemon.id, Pokemon.name, Pokemon.sprite_url) .order_by(func.count().desc()) .limit(10) ) top_caught_pokemon = [ PokemonRanking(pokemon_id=r.id, name=r.name, sprite_url=r.sprite_url, count=r.count) for r in top_caught_q.all() ] # --- Top encountered pokemon (top 10) --- top_enc_q = await session.execute( select( Pokemon.id, Pokemon.name, Pokemon.sprite_url, func.count().label("count"), ) .join(Encounter, Encounter.pokemon_id == Pokemon.id) .group_by(Pokemon.id, Pokemon.name, Pokemon.sprite_url) .order_by(func.count().desc()) .limit(10) ) top_encountered_pokemon = [ PokemonRanking(pokemon_id=r.id, name=r.name, sprite_url=r.sprite_url, count=r.count) for r in top_enc_q.all() ] # --- Deaths --- total_deaths_q = await session.execute( select(func.count()) .select_from(Encounter) .where(Encounter.status == "caught", Encounter.faint_level.is_not(None)) ) total_deaths = total_deaths_q.scalar() or 0 mortality_rate = round(total_deaths / caught_count, 4) if caught_count > 0 else None # Top death causes death_causes_q = await session.execute( select( Encounter.death_cause, func.count().label("count"), ) .where( Encounter.death_cause.is_not(None), Encounter.death_cause != "", ) .group_by(Encounter.death_cause) .order_by(func.count().desc()) .limit(5) ) top_death_causes = [ DeathCause(cause=r.death_cause, count=r.count) for r in death_causes_q.all() ] # Average levels avg_levels = await session.execute( select( func.avg(Encounter.catch_level).label("avg_catch"), func.avg(Encounter.faint_level).label("avg_faint"), ) .select_from(Encounter) .where(Encounter.status == "caught") ) levels = avg_levels.one() avg_catch_level = round(float(levels.avg_catch), 1) if levels.avg_catch else None avg_faint_level = round(float(levels.avg_faint), 1) if levels.avg_faint else None # --- Type distribution --- # Pokemon types is a PostgreSQL ARRAY, unnest it type_q = await session.execute( select( func.unnest(Pokemon.types).label("type_name"), func.count().label("count"), ) .join(Encounter, Encounter.pokemon_id == Pokemon.id) .where(Encounter.status == "caught") .group_by("type_name") .order_by(func.count().desc()) ) type_distribution = [ TypeCount(type=r.type_name, count=r.count) for r in type_q.all() ] return StatsResponse( total_runs=total_runs, active_runs=active_runs, completed_runs=completed_runs, failed_runs=failed_runs, win_rate=win_rate, avg_duration_days=avg_duration_days, runs_by_game=runs_by_game, total_encounters=total_encounters, caught_count=caught_count, fainted_count=fainted_count, missed_count=missed_count, catch_rate=catch_rate, avg_encounters_per_run=avg_encounters_per_run, top_caught_pokemon=top_caught_pokemon, top_encountered_pokemon=top_encountered_pokemon, total_deaths=total_deaths, mortality_rate=mortality_rate, top_death_causes=top_death_causes, avg_catch_level=avg_catch_level, avg_faint_level=avg_faint_level, type_distribution=type_distribution, )