Add seeder support for boss battles so new database instances come pre-populated. Adds --export-bosses CLI flag to dump boss data from the database to JSON seed files, and loads those files during normal seeding. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
293 lines
11 KiB
Python
293 lines
11 KiB
Python
"""Seed runner — reads JSON files and upserts into the database."""
|
|
|
|
import json
|
|
from pathlib import Path
|
|
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from app.core.database import async_session
|
|
from app.models.boss_battle import BossBattle
|
|
from app.models.boss_pokemon import BossPokemon
|
|
from app.models.game import Game
|
|
from app.models.pokemon import Pokemon
|
|
from app.models.route import Route
|
|
from app.models.route_encounter import RouteEncounter
|
|
from app.models.version_group import VersionGroup
|
|
from app.seeds.loader import (
|
|
upsert_bosses,
|
|
upsert_evolutions,
|
|
upsert_games,
|
|
upsert_pokemon,
|
|
upsert_route_encounters,
|
|
upsert_routes,
|
|
upsert_version_groups,
|
|
)
|
|
|
|
# Directory that load_json() resolves seed file names against
# (games.json, pokemon.json, per-game route files, ...).
DATA_DIR = Path(__file__).parent.joinpath("data")
# Version-group definitions sit next to this module rather than inside DATA_DIR.
VG_JSON = Path(__file__).with_name("version_groups.json")
|
|
|
|
|
|
def load_json(filename: str):
    """Parse and return the JSON document stored at ``DATA_DIR / filename``.

    Args:
        filename: File name relative to ``DATA_DIR``.

    Returns:
        Whatever ``json.load`` produces for the file (list, dict, ...).

    Raises:
        FileNotFoundError: If the file does not exist.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    path = DATA_DIR / filename
    # JSON text is UTF-8 by specification; relying on the platform default
    # encoding would mis-decode non-ASCII names on non-UTF-8 locales.
    with open(path, encoding="utf-8") as f:
        return json.load(f)
|
|
|
|
|
|
async def _seed_game_encounters(
    session, game_id: int, game_routes_data: list, route_map: dict, dex_to_id: dict
) -> int:
    """Upsert the encounters of one game's routes and child routes; return the count."""
    loaded = 0
    for route in game_routes_data:
        route_id = route_map.get(route["name"])
        if route_id is None:
            print(f" Warning: route '{route['name']}' not found")
            continue

        # Parent routes may have empty encounters
        if route["encounters"]:
            loaded += await upsert_route_encounters(
                session, route_id, route["encounters"],
                dex_to_id, game_id,
            )

        # Handle child routes
        for child in route.get("children", []):
            child_id = route_map.get(child["name"])
            if child_id is None:
                print(f" Warning: child route '{child['name']}' not found")
                continue

            loaded += await upsert_route_encounters(
                session, child_id, child["encounters"],
                dex_to_id, game_id,
            )
    return loaded


async def seed():
    """Run the full seed process.

    Everything happens inside a single ``session.begin()`` block, in this
    order: version groups, games, Pokemon, routes and encounters per version
    group, bosses per version group, and finally evolutions (only when
    ``evolutions.json`` exists). Progress is reported via ``print``.

    Version groups that list no games are skipped instead of raising
    ``IndexError`` when the "first game" seed file name is derived.
    """
    print("Starting seed...")

    async with async_session() as session:
        async with session.begin():
            # 1. Upsert version groups
            # JSON is UTF-8 by specification; don't depend on the locale default.
            with open(VG_JSON, encoding="utf-8") as f:
                vg_data = json.load(f)
            vg_slug_to_id = await upsert_version_groups(session, vg_data)
            print(f"Version Groups: {len(vg_slug_to_id)} upserted")

            # Build game_slug -> vg_id mapping
            game_slug_to_vg_id: dict[str, int] = {}
            for vg_slug, vg_info in vg_data.items():
                vg_id = vg_slug_to_id[vg_slug]
                for game_slug in vg_info["games"]:
                    game_slug_to_vg_id[game_slug] = vg_id

            # 2. Upsert games (with version_group_id)
            games_data = load_json("games.json")
            slug_to_id = await upsert_games(session, games_data, game_slug_to_vg_id)
            print(f"Games: {len(slug_to_id)} upserted")

            # 3. Upsert Pokemon
            pokemon_data = load_json("pokemon.json")
            dex_to_id = await upsert_pokemon(session, pokemon_data)
            print(f"Pokemon: {len(dex_to_id)} upserted")

            # 4. Per version group: upsert routes once, then encounters per game
            total_routes = 0
            total_encounters = 0

            for vg_slug, vg_info in vg_data.items():
                vg_id = vg_slug_to_id[vg_slug]
                game_slugs = list(vg_info["games"])
                if not game_slugs:
                    # Without a game there is no route file name to derive.
                    print(f" {vg_slug}: no games defined, skipping")
                    continue

                # Use the first game's route JSON for the shared route structure
                first_game_slug = game_slugs[0]
                routes_file = DATA_DIR / f"{first_game_slug}.json"
                if not routes_file.exists():
                    print(f" {vg_slug}: no route data ({first_game_slug}.json), skipping")
                    continue

                routes_data = load_json(f"{first_game_slug}.json")
                if not routes_data:
                    print(f" {vg_slug}: empty route data, skipping")
                    continue

                # Upsert routes once per version group
                route_map = await upsert_routes(session, vg_id, routes_data)
                total_routes += len(route_map)
                print(f" {vg_slug}: {len(route_map)} routes")

                # Upsert encounters per game (each game may have different encounters)
                for game_slug in game_slugs:
                    game_id = slug_to_id.get(game_slug)
                    if game_id is None:
                        print(f" Warning: game '{game_slug}' not found, skipping")
                        continue

                    game_routes_file = DATA_DIR / f"{game_slug}.json"
                    if not game_routes_file.exists():
                        continue

                    # Re-read from disk even for the first game: upsert_routes
                    # was handed routes_data and may have modified it.
                    game_routes_data = load_json(f"{game_slug}.json")
                    total_encounters += await _seed_game_encounters(
                        session, game_id, game_routes_data, route_map, dex_to_id
                    )

                    print(f" {game_slug}: encounters loaded")

            print(f"\nTotal routes: {total_routes}")
            print(f"Total encounters: {total_encounters}")

            # 5. Per version group: upsert bosses
            total_bosses = 0
            for vg_slug, vg_info in vg_data.items():
                vg_id = vg_slug_to_id[vg_slug]
                if not vg_info["games"]:
                    # No first game means no "<slug>-bosses.json" to look for.
                    continue
                first_game_slug = next(iter(vg_info["games"]))
                bosses_file = DATA_DIR / f"{first_game_slug}-bosses.json"
                if not bosses_file.exists():
                    continue

                bosses_data = load_json(f"{first_game_slug}-bosses.json")
                if not bosses_data:
                    continue

                boss_count = await upsert_bosses(session, vg_id, bosses_data, dex_to_id)
                total_bosses += boss_count
                print(f" {vg_slug}: {boss_count} bosses")

            print(f"Total bosses: {total_bosses}")

            # 6. Upsert evolutions
            evolutions_path = DATA_DIR / "evolutions.json"
            if evolutions_path.exists():
                evolutions_data = load_json("evolutions.json")
                evo_count = await upsert_evolutions(session, evolutions_data, dex_to_id)
                print(f"Evolutions: {evo_count} upserted")
            else:
                print("No evolutions.json found, skipping evolutions")

    print("Seed complete!")
|
|
|
|
|
|
async def verify():
    """Print table totals and per-group breakdowns as a post-seed sanity check.

    Reads only: it issues count queries through a fresh session and reports
    the results via ``print``.
    """
    print("\n--- Verification ---")

    async with async_session() as session:
        # Overall counts: run every query first, then print, so the report
        # appears as one contiguous block.
        counted_models = (
            ("Version Groups", VersionGroup),
            ("Games", Game),
            ("Pokemon", Pokemon),
            ("Routes", Route),
            ("Route Encounters", RouteEncounter),
            ("Boss Battles", BossBattle),
        )
        totals = []
        for label, model in counted_models:
            count_result = await session.execute(select(func.count(model.id)))
            totals.append((label, count_result.scalar()))

        for label, total in totals:
            print(f"{label}: {total}")

        # Grouped breakdowns: (heading, statement) pairs executed in order.
        routes_per_vg = (
            select(VersionGroup.slug, func.count(Route.id))
            .join(Route, Route.version_group_id == VersionGroup.id)
            .group_by(VersionGroup.slug)
            .order_by(VersionGroup.slug)
        )
        encounters_per_game = (
            select(Game.name, func.count(RouteEncounter.id))
            .join(RouteEncounter, RouteEncounter.game_id == Game.id)
            .group_by(Game.name)
            .order_by(Game.name)
        )
        bosses_per_vg = (
            select(VersionGroup.slug, func.count(BossBattle.id))
            .join(BossBattle, BossBattle.version_group_id == VersionGroup.id)
            .group_by(VersionGroup.slug)
            .order_by(VersionGroup.slug)
        )
        breakdowns = (
            ("\nRoutes per version group:", routes_per_vg),
            ("\nEncounters per game:", encounters_per_game),
            ("\nBosses per version group:", bosses_per_vg),
        )
        for heading, statement in breakdowns:
            rows = await session.execute(statement)
            print(heading)
            for key, count in rows:
                print(f" {key}: {count}")

    print("\nVerification complete!")
|
|
|
|
|
|
def _boss_to_seed_dict(boss) -> dict:
    """Convert one BossBattle row (team already loaded) into its seed-file dict."""
    return {
        "name": boss.name,
        "boss_type": boss.boss_type,
        "badge_name": boss.badge_name,
        "badge_image_url": boss.badge_image_url,
        "level_cap": boss.level_cap,
        "order": boss.order,
        "location": boss.location,
        "sprite_url": boss.sprite_url,
        "pokemon": [
            {
                "pokeapi_id": bp.pokemon.pokeapi_id,
                "pokemon_name": bp.pokemon.name,
                "level": bp.level,
                "order": bp.order,
            }
            for bp in sorted(boss.pokemon, key=lambda p: p.order)
        ],
    }


async def export_bosses():
    """Export boss battles from the database to seed JSON files.

    For every version group listed in ``VG_JSON`` that also exists in the
    database and has at least one boss battle, writes
    ``DATA_DIR / "<first game slug>-bosses.json"`` containing the bosses in
    ``BossBattle.order`` order, each with its team sorted by ``order``.
    Version groups with no games listed are reported and skipped, since no
    file name can be derived for them.
    """
    print("Exporting boss battles...")

    async with async_session() as session:
        # Load version group data to get the first game slug for filenames.
        # JSON is UTF-8 by specification; don't depend on the locale default.
        with open(VG_JSON, encoding="utf-8") as f:
            vg_data = json.load(f)

        # Query all version groups with their games
        # NOTE(review): the eager-loaded games relationship is not read below;
        # kept as-is pending confirmation that it is unnecessary.
        vg_result = await session.execute(
            select(VersionGroup).options(selectinload(VersionGroup.games))
        )
        slug_to_vg = {vg.slug: vg for vg in vg_result.scalars().all()}

        exported = 0
        for vg_slug, vg_info in vg_data.items():
            vg = slug_to_vg.get(vg_slug)
            if vg is None:
                continue

            # Query boss battles for this version group, eager-loading each
            # boss's team and the Pokemon behind every team slot.
            result = await session.execute(
                select(BossBattle)
                .where(BossBattle.version_group_id == vg.id)
                .options(
                    selectinload(BossBattle.pokemon).selectinload(BossPokemon.pokemon),
                )
                .order_by(BossBattle.order)
            )
            bosses = result.scalars().all()

            if not bosses:
                continue

            if not vg_info["games"]:
                # Previously an IndexError; there is no slug to name the file after.
                print(f" {vg_slug}: no games defined, cannot name seed file, skipping")
                continue
            first_game_slug = next(iter(vg_info["games"]))

            data = [_boss_to_seed_dict(b) for b in bosses]

            out_path = DATA_DIR / f"{first_game_slug}-bosses.json"
            with open(out_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
                f.write("\n")

            print(f" {vg_slug}: {len(bosses)} bosses -> {out_path.name}")
            exported += 1

    print(f"Exported bosses for {exported} version groups.")
|