Reviewed-on: TheFurya/nuzlocke-tracker#22 Co-authored-by: Julian Tabel <juliantabel.jt@gmail.com> Co-committed-by: Julian Tabel <juliantabel.jt@gmail.com>
559 lines
19 KiB
Python
559 lines
19 KiB
Python
"""Seed runner — reads JSON files and upserts into the database."""
|
|
|
|
import json
|
|
import re
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from app.core.database import async_session
|
|
from app.models.boss_battle import BossBattle
|
|
from app.models.boss_pokemon import BossPokemon
|
|
from app.models.evolution import Evolution
|
|
from app.models.game import Game
|
|
from app.models.pokemon import Pokemon
|
|
from app.models.route import Route
|
|
from app.models.route_encounter import RouteEncounter
|
|
from app.models.version_group import VersionGroup
|
|
from app.seeds.loader import (
|
|
upsert_bosses,
|
|
upsert_evolutions,
|
|
upsert_games,
|
|
upsert_pokemon,
|
|
upsert_route_encounters,
|
|
upsert_routes,
|
|
upsert_version_groups,
|
|
)
|
|
|
|
# Directory, next to this module, that holds the seed JSON files
# (games.json, pokemon.json, evolutions.json and the per-game files).
DATA_DIR = Path(__file__).parent / "data"
# Version-group definitions; stored beside this module, not inside DATA_DIR.
VG_JSON = Path(__file__).parent / "version_groups.json"
|
|
|
|
|
|
def load_json(filename: str):
    """Load and parse a JSON file from the seed data directory.

    Args:
        filename: Name of the file, relative to ``DATA_DIR``.

    Returns:
        The decoded JSON document.

    Raises:
        FileNotFoundError: If the file does not exist.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's locale
    # encoding, so non-ASCII seed data loads identically on every host.
    with open(DATA_DIR / filename, encoding="utf-8") as f:
        return json.load(f)
|
|
|
|
|
|
async def _seed_game_encounters(
    session: AsyncSession,
    game_id: int,
    game_routes_data: list,
    route_map: dict,
    dex_to_id: dict,
) -> int:
    """Upsert the encounters of one game's route file and return their total count.

    Args:
        session: Session the upserts are issued on.
        game_id: Database id of the game the encounters belong to.
        game_routes_data: Parsed content of the game's route JSON file.
        route_map: Route name -> route id mapping of the game's version group.
        dex_to_id: Pokemon lookup produced by ``upsert_pokemon``.

    Returns:
        Sum of the counts returned by ``upsert_route_encounters``.
    """
    total = 0
    for route in game_routes_data:
        route_id = route_map.get(route["name"])
        if route_id is None:
            print(f" Warning: route '{route['name']}' not found")
            continue

        # Parent routes may have empty encounters
        if route["encounters"]:
            total += await upsert_route_encounters(
                session,
                route_id,
                route["encounters"],
                dex_to_id,
                game_id,
            )

        # Handle child routes
        for child in route.get("children", []):
            child_id = route_map.get(child["name"])
            if child_id is None:
                print(f" Warning: child route '{child['name']}' not found")
                continue

            total += await upsert_route_encounters(
                session,
                child_id,
                child["encounters"],
                dex_to_id,
                game_id,
            )
    return total


async def seed():
    """Run the full seed process.

    Inside a single transaction this upserts, in order: version groups,
    games, Pokemon, routes with their per-game encounters, bosses and (when
    ``evolutions.json`` exists) evolutions. Progress is printed as it goes.
    """
    print("Starting seed...")

    async with async_session() as session, session.begin():
        # 1. Upsert version groups
        # Explicit UTF-8 so the file decodes the same way on every platform.
        with open(VG_JSON, encoding="utf-8") as f:
            vg_data = json.load(f)
        vg_slug_to_id = await upsert_version_groups(session, vg_data)
        print(f"Version Groups: {len(vg_slug_to_id)} upserted")

        # Build game_slug -> vg_id mapping
        game_slug_to_vg_id: dict[str, int] = {}
        for vg_slug, vg_info in vg_data.items():
            vg_id = vg_slug_to_id[vg_slug]
            for game_slug in vg_info["games"]:
                game_slug_to_vg_id[game_slug] = vg_id

        # 2. Upsert games (with version_group_id)
        games_data = load_json("games.json")
        slug_to_id = await upsert_games(session, games_data, game_slug_to_vg_id)
        print(f"Games: {len(slug_to_id)} upserted")

        # 3. Upsert Pokemon
        pokemon_data = load_json("pokemon.json")
        dex_to_id = await upsert_pokemon(session, pokemon_data)
        print(f"Pokemon: {len(dex_to_id)} upserted")

        # 4. Per version group: upsert routes once, then encounters per game
        total_routes = 0
        total_encounters = 0
        route_maps_by_vg: dict[int, dict[str, int]] = {}

        for vg_slug, vg_info in vg_data.items():
            vg_id = vg_slug_to_id[vg_slug]
            game_slugs = list(vg_info["games"])
            if not game_slugs:
                # Without this guard the [0] lookup below raises IndexError.
                print(f" {vg_slug}: no games defined, skipping")
                continue

            # Use the first game's route JSON for the shared route structure
            first_game_slug = game_slugs[0]
            routes_file = DATA_DIR / f"{first_game_slug}.json"
            if not routes_file.exists():
                print(f" {vg_slug}: no route data ({first_game_slug}.json), skipping")
                continue

            routes_data = load_json(f"{first_game_slug}.json")
            if not routes_data:
                print(f" {vg_slug}: empty route data, skipping")
                continue

            # Upsert routes once per version group
            route_map = await upsert_routes(session, vg_id, routes_data)
            route_maps_by_vg[vg_id] = route_map
            total_routes += len(route_map)
            print(f" {vg_slug}: {len(route_map)} routes")

            # Upsert encounters per game (each game may have different encounters)
            for game_slug in game_slugs:
                game_id = slug_to_id.get(game_slug)
                if game_id is None:
                    print(f" Warning: game '{game_slug}' not found, skipping")
                    continue

                game_routes_file = DATA_DIR / f"{game_slug}.json"
                if not game_routes_file.exists():
                    continue

                game_routes_data = load_json(f"{game_slug}.json")
                total_encounters += await _seed_game_encounters(
                    session, game_id, game_routes_data, route_map, dex_to_id
                )
                print(f" {game_slug}: encounters loaded")

        print(f"\nTotal routes: {total_routes}")
        print(f"Total encounters: {total_encounters}")

        # 5. Per version group: upsert bosses
        total_bosses = 0
        for vg_slug, vg_info in vg_data.items():
            vg_id = vg_slug_to_id[vg_slug]
            # Boss files are named after the version group's first game.
            first_game_slug = next(iter(vg_info["games"]), None)
            if first_game_slug is None:
                continue

            bosses_file = DATA_DIR / f"{first_game_slug}-bosses.json"
            if not bosses_file.exists():
                continue

            bosses_data = load_json(f"{first_game_slug}-bosses.json")
            if not bosses_data:
                continue

            # Version groups skipped in step 4 have no route map; use an empty one.
            route_name_to_id = route_maps_by_vg.get(vg_id, {})
            boss_count = await upsert_bosses(
                session, vg_id, bosses_data, dex_to_id, route_name_to_id, slug_to_id
            )
            total_bosses += boss_count
            print(f" {vg_slug}: {boss_count} bosses")

        print(f"Total bosses: {total_bosses}")

        # 6. Upsert evolutions
        evolutions_path = DATA_DIR / "evolutions.json"
        if evolutions_path.exists():
            evolutions_data = load_json("evolutions.json")
            evo_count = await upsert_evolutions(session, evolutions_data, dex_to_id)
            print(f"Evolutions: {evo_count} upserted")
        else:
            print("No evolutions.json found, skipping evolutions")

    print("Seed complete!")
|
|
|
|
|
|
async def verify():
    """Print table totals and per-group breakdowns as a post-seed sanity check."""
    print("\n--- Verification ---")

    async with async_session() as db:

        async def count_rows(id_column):
            # One COUNT query over the given primary-key column.
            outcome = await db.execute(select(func.count(id_column)))
            return outcome.scalar()

        # Overall counts: run every query first, then report them together.
        n_version_groups = await count_rows(VersionGroup.id)
        n_games = await count_rows(Game.id)
        n_pokemon = await count_rows(Pokemon.id)
        n_routes = await count_rows(Route.id)
        n_encounters = await count_rows(RouteEncounter.id)
        n_bosses = await count_rows(BossBattle.id)

        print(f"Version Groups: {n_version_groups}")
        print(f"Games: {n_games}")
        print(f"Pokemon: {n_pokemon}")
        print(f"Routes: {n_routes}")
        print(f"Route Encounters: {n_encounters}")
        print(f"Boss Battles: {n_bosses}")

        # Per-version-group route counts
        routes_per_vg = await db.execute(
            select(VersionGroup.slug, func.count(Route.id))
            .join(Route, Route.version_group_id == VersionGroup.id)
            .group_by(VersionGroup.slug)
            .order_by(VersionGroup.slug)
        )
        print("\nRoutes per version group:")
        for slug, total in routes_per_vg:
            print(f" {slug}: {total}")

        # Per-game encounter counts
        encounters_per_game = await db.execute(
            select(Game.name, func.count(RouteEncounter.id))
            .join(RouteEncounter, RouteEncounter.game_id == Game.id)
            .group_by(Game.name)
            .order_by(Game.name)
        )
        print("\nEncounters per game:")
        for game_name, total in encounters_per_game:
            print(f" {game_name}: {total}")

        # Per-version-group boss counts
        bosses_per_vg = await db.execute(
            select(VersionGroup.slug, func.count(BossBattle.id))
            .join(BossBattle, BossBattle.version_group_id == VersionGroup.id)
            .group_by(VersionGroup.slug)
            .order_by(VersionGroup.slug)
        )
        print("\nBosses per version group:")
        for slug, total in bosses_per_vg:
            print(f" {slug}: {total}")

    print("\nVerification complete!")
|
|
|
|
|
|
def _write_json(filename: str, data) -> Path:
    """Write data as JSON to DATA_DIR, return the path.

    Args:
        filename: Target file name, relative to ``DATA_DIR``.
        data: JSON-serializable object to store.

    Returns:
        Path of the written file.

    Raises:
        TypeError: If ``data`` contains a value that is not JSON serializable.
            The target file is left untouched in that case.
    """
    out_path = DATA_DIR / filename
    # Serialize before opening the file: opening with "w" truncates it, so a
    # serialization error raised afterwards would leave a truncated seed file.
    payload = json.dumps(data, indent=2) + "\n"
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(payload)
    return out_path
|
|
|
|
|
|
async def export_all():
    """Export all seed data from the database to JSON files.

    Reads the version-group definitions from ``VG_JSON`` and then exports
    games, Pokemon, evolutions, routes and bosses, in that order, using one
    shared session.
    """
    async with async_session() as session:
        # Explicit UTF-8 so the file decodes the same way on every platform.
        with open(VG_JSON, encoding="utf-8") as f:
            vg_data = json.load(f)

        await _export_games(session)
        await _export_pokemon(session)
        await _export_evolutions(session)
        await _export_routes(session, vg_data)
        await _export_bosses(session, vg_data)

    print("Export complete!")
|
|
|
|
|
|
async def _export_games(session: AsyncSession):
    """Dump every game, ordered by name, into games.json."""

    def as_record(game: Game) -> dict:
        # Key order here is the key order in the written JSON.
        return {
            "name": game.name,
            "slug": game.slug,
            "generation": game.generation,
            "region": game.region,
            "release_year": game.release_year,
            "color": game.color,
        }

    query = select(Game).order_by(Game.name)
    rows = (await session.execute(query)).scalars().all()
    records = [as_record(row) for row in rows]

    _write_json("games.json", records)
    print(f"Games: {len(records)} exported")
|
|
|
|
|
|
async def _export_pokemon(session: AsyncSession):
    """Dump every Pokemon, ordered by pokeapi_id, into pokemon.json."""

    def as_record(mon: Pokemon) -> dict:
        # Key order here is the key order in the written JSON.
        return {
            "pokeapi_id": mon.pokeapi_id,
            "national_dex": mon.national_dex,
            "name": mon.name,
            "types": mon.types,
            "sprite_url": mon.sprite_url,
        }

    query = select(Pokemon).order_by(Pokemon.pokeapi_id)
    rows = (await session.execute(query)).scalars().all()
    records = [as_record(row) for row in rows]

    _write_json("pokemon.json", records)
    print(f"Pokemon: {len(records)} exported")
|
|
|
|
|
|
async def _export_evolutions(session: AsyncSession):
    """Dump every evolution, ordered by id, into evolutions.json."""

    def as_record(evo: Evolution) -> dict:
        # Both ends of the evolution are identified by their pokeapi_id.
        return {
            "from_pokeapi_id": evo.from_pokemon.pokeapi_id,
            "to_pokeapi_id": evo.to_pokemon.pokeapi_id,
            "trigger": evo.trigger,
            "min_level": evo.min_level,
            "item": evo.item,
            "held_item": evo.held_item,
            "condition": evo.condition,
            "region": evo.region,
        }

    # Eager-load both related Pokemon so serializing needs no lazy loads.
    query = (
        select(Evolution)
        .options(
            selectinload(Evolution.from_pokemon),
            selectinload(Evolution.to_pokemon),
        )
        .order_by(Evolution.id)
    )
    rows = (await session.execute(query)).scalars().all()
    records = [as_record(row) for row in rows]

    _write_json("evolutions.json", records)
    print(f"Evolutions: {len(records)} exported")
|
|
|
|
|
|
def _format_route_encounters(route: Route, game_id: int) -> list[dict]:
    """Serialize a route's encounters for one game, highest encounter rate first."""
    game_encounters = [enc for enc in route.route_encounters if enc.game_id == game_id]
    return [
        {
            "pokeapi_id": enc.pokemon.pokeapi_id,
            "pokemon_name": enc.pokemon.name,
            "method": enc.encounter_method,
            "encounter_rate": enc.encounter_rate,
            "min_level": enc.min_level,
            "max_level": enc.max_level,
        }
        for enc in sorted(game_encounters, key=lambda e: -e.encounter_rate)
    ]


def _format_child_route(route: Route, game_id: int) -> dict:
    """Serialize a child route; ``pinwheel_zone`` is included only when set."""
    data: dict = {
        "name": route.name,
        "order": route.order,
        "encounters": _format_route_encounters(route, game_id),
    }
    if route.pinwheel_zone is not None:
        data["pinwheel_zone"] = route.pinwheel_zone
    return data


def _format_parent_route(
    route: Route,
    children_by_parent: dict[int, list[Route]],
    game_id: int,
) -> dict:
    """Serialize a top-level route together with its children, ordered by ``order``."""
    data: dict = {
        "name": route.name,
        "order": route.order,
        "encounters": _format_route_encounters(route, game_id),
    }
    children = children_by_parent.get(route.id, [])
    if children:
        data["children"] = [
            _format_child_route(child, game_id)
            for child in sorted(children, key=lambda r: r.order)
        ]
    return data


async def _export_routes(session: AsyncSession, vg_data: dict):
    """Export routes and encounters per game.

    Writes one ``<game_slug>.json`` file for every game listed in ``vg_data``
    that exists in the database, has a version group and whose version group
    has at least one route.

    Args:
        session: Session used for the read queries.
        vg_data: Parsed version-group definitions; each value's ``"games"``
            entry is iterated for game slugs.
    """
    # Get all games keyed by slug
    game_result = await session.execute(select(Game))
    games_by_slug = {g.slug: g for g in game_result.scalars().all()}

    # Routes depend only on the version group, so load them once per version
    # group instead of re-running the same query for each of its games.
    routes_by_vg_id: dict[int, list[Route]] = {}

    exported = 0
    for vg_info in vg_data.values():
        for game_slug in vg_info["games"]:
            game = games_by_slug.get(game_slug)
            if game is None or game.version_group_id is None:
                continue

            vg_id = game.version_group_id
            if vg_id not in routes_by_vg_id:
                # Load routes for this version group with encounters + pokemon
                result = await session.execute(
                    select(Route)
                    .where(Route.version_group_id == vg_id)
                    .options(
                        selectinload(Route.route_encounters).selectinload(
                            RouteEncounter.pokemon
                        ),
                    )
                    .order_by(Route.order)
                )
                routes_by_vg_id[vg_id] = result.scalars().all()

            routes = routes_by_vg_id[vg_id]
            if not routes:
                continue

            parent_routes = [r for r in routes if r.parent_route_id is None]
            children_by_parent: dict[int, list[Route]] = {}
            for r in routes:
                if r.parent_route_id is not None:
                    children_by_parent.setdefault(r.parent_route_id, []).append(r)

            route_data = [
                _format_parent_route(r, children_by_parent, game.id)
                for r in parent_routes
            ]
            _write_json(f"{game_slug}.json", route_data)
            exported += 1

    print(f"Routes: {exported} game files exported")
|
|
|
|
|
|
# The frontend's "public" directory, located via the fifth ancestor directory
# of this file; badge and boss-sprite images are downloaded beneath it.
FRONTEND_PUBLIC = Path(__file__).resolve().parents[4] / "frontend" / "public"
|
|
|
|
|
|
def _slugify(name: str) -> str:
|
|
"""Convert a name to a filename-safe slug: lowercase, hyphens, no special chars."""
|
|
slug = name.lower().replace(" ", "-")
|
|
slug = re.sub(r"[^a-z0-9-]", "", slug)
|
|
return slug
|
|
|
|
|
|
def _download_image(
|
|
url: str,
|
|
output_dir: Path,
|
|
slug: str,
|
|
downloaded: set[str],
|
|
) -> str:
|
|
"""Download an image to output_dir/slug.ext if not already downloaded.
|
|
|
|
Returns the local path (relative to frontend/public).
|
|
"""
|
|
# Already a local path — check the file exists, otherwise re-derive the path
|
|
if url.startswith("/"):
|
|
local = FRONTEND_PUBLIC / url.lstrip("/")
|
|
if local.exists():
|
|
return url
|
|
# File missing — can't re-download without the original URL, keep as-is
|
|
return url
|
|
|
|
url_ext = url.rsplit(".", 1)[-1].split("?")[0].lower()
|
|
if url_ext in ("png", "jpg", "jpeg", "gif", "webp", "svg"):
|
|
ext = f".{url_ext}"
|
|
else:
|
|
ext = ".png"
|
|
|
|
filename = f"{slug}{ext}"
|
|
dest = output_dir / filename
|
|
|
|
if filename not in downloaded:
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
req = urllib.request.Request(
|
|
url, headers={"User-Agent": "nuzlocke-tracker/1.0"}
|
|
)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30) as resp:
|
|
dest.write_bytes(resp.read())
|
|
except (urllib.error.URLError, OSError) as exc:
|
|
print(f" Warning: failed to download {url}: {exc}")
|
|
return url
|
|
downloaded.add(filename)
|
|
print(f" Downloaded: {dest.relative_to(FRONTEND_PUBLIC)}")
|
|
|
|
return f"/{dest.relative_to(FRONTEND_PUBLIC)}"
|
|
|
|
|
|
def _format_boss(boss: BossBattle, badge_image_url, sprite_url) -> dict:
    """Serialize one boss battle, using the given (possibly localized) image URLs."""
    boss_dict: dict = {
        "name": boss.name,
        "boss_type": boss.boss_type,
        "specialty_type": boss.specialty_type,
        "badge_name": boss.badge_name,
        "badge_image_url": badge_image_url,
        "level_cap": boss.level_cap,
        "order": boss.order,
        "after_route_name": boss.after_route.name if boss.after_route else None,
        "location": boss.location,
        "section": boss.section,
        "sprite_url": sprite_url,
        "pokemon": [
            {
                "pokeapi_id": bp.pokemon.pokeapi_id,
                "pokemon_name": bp.pokemon.name,
                "level": bp.level,
                "order": bp.order,
            }
            for bp in sorted(boss.pokemon, key=lambda p: p.order)
        ],
    }
    # "game_slug" is written only for bosses tied to a specific game.
    if boss.game_id:
        boss_dict["game_slug"] = boss.game.slug
    return boss_dict


async def _export_bosses(session: AsyncSession, vg_data: dict):
    """Export boss battles per version group.

    For every version group in ``vg_data`` that exists in the database and
    has bosses, badge and sprite images are downloaded via
    ``_download_image`` and the bosses are written to
    ``<first_game_slug>-bosses.json``.

    Args:
        session: Session used for the read queries.
        vg_data: Parsed version-group definitions; the first key of each
            value's ``"games"`` entry names the output file.
    """
    vg_result = await session.execute(select(VersionGroup))
    slug_to_vg = {vg.slug: vg for vg in vg_result.scalars().all()}

    badge_dir = FRONTEND_PUBLIC / "badges"
    # One set for the whole export: badges share a single directory.
    downloaded_badges: set[str] = set()

    exported = 0
    for vg_slug, vg_info in vg_data.items():
        vg = slug_to_vg.get(vg_slug)
        if vg is None:
            continue

        result = await session.execute(
            select(BossBattle)
            .where(BossBattle.version_group_id == vg.id)
            .options(
                selectinload(BossBattle.pokemon).selectinload(BossPokemon.pokemon),
                selectinload(BossBattle.after_route),
                selectinload(BossBattle.game),
            )
            .order_by(BossBattle.order)
        )
        bosses = result.scalars().all()

        if not bosses:
            continue

        first_game_slug = next(iter(vg_info["games"]), None)
        if first_game_slug is None:
            # No game to name the file after; skip instead of raising IndexError.
            print(f" Warning: version group '{vg_slug}' has no games, skipping bosses")
            continue

        sprite_dir = FRONTEND_PUBLIC / "boss-sprites" / first_game_slug
        # Sprites live in a per-version-group directory, so track them per group.
        downloaded_sprites: set[str] = set()
        data = []
        for b in bosses:
            badge_image_url = b.badge_image_url
            sprite_url = b.sprite_url

            if badge_image_url and b.badge_name:
                badge_image_url = _download_image(
                    badge_image_url,
                    badge_dir,
                    _slugify(b.badge_name),
                    downloaded_badges,
                )

            if sprite_url:
                sprite_url = _download_image(
                    sprite_url,
                    sprite_dir,
                    _slugify(b.name),
                    downloaded_sprites,
                )

            data.append(_format_boss(b, badge_image_url, sprite_url))

        _write_json(f"{first_game_slug}-bosses.json", data)
        exported += 1

    print(f"Bosses: {exported} version group files exported")
|