Add per-condition encounter rates to seed data (#26)
Co-authored-by: Julian Tabel <juliantabel.jt@gmail.com> Co-committed-by: Julian Tabel <juliantabel.jt@gmail.com>
This commit was merged in pull request #26.
This commit is contained in:
@@ -84,8 +84,8 @@ ENCOUNTER_METHOD_MAP: dict[str, str] = {
|
||||
"cave-spot": "walk",
|
||||
"bubble-spot": "surf",
|
||||
"sand-spot": "walk",
|
||||
"horde": "walk",
|
||||
"sos-encounter": "walk",
|
||||
"horde": "horde",
|
||||
"sos-encounter": "sos",
|
||||
"ambush": "walk",
|
||||
# Seaweed / diving
|
||||
"diving": "surf",
|
||||
@@ -105,7 +105,7 @@ ENCOUNTER_METHOD_MAP: dict[str, str] = {
|
||||
"dust-cloud": "walk",
|
||||
"hidden-grotto": "static",
|
||||
"hidden-encounter": "walk",
|
||||
"horde-encounter": "walk",
|
||||
"horde-encounter": "horde",
|
||||
"shaking-trees": "walk",
|
||||
"shaking-ore-deposits": "walk",
|
||||
"island-scan": "static",
|
||||
|
||||
@@ -13,16 +13,22 @@ class Encounter:
|
||||
encounter_rate: int
|
||||
min_level: int
|
||||
max_level: int
|
||||
conditions: dict[str, int] | None = None
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
d: dict = {
|
||||
"pokeapi_id": self.pokeapi_id,
|
||||
"pokemon_name": self.pokemon_name,
|
||||
"method": self.method,
|
||||
"encounter_rate": self.encounter_rate,
|
||||
"min_level": self.min_level,
|
||||
"max_level": self.max_level,
|
||||
}
|
||||
if self.conditions:
|
||||
d["encounter_rate"] = None
|
||||
d["conditions"] = self.conditions
|
||||
else:
|
||||
d["encounter_rate"] = self.encounter_rate
|
||||
return d
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -65,61 +65,75 @@ def parse_rate(value: str | None) -> int | None:
|
||||
return None
|
||||
|
||||
|
||||
def extract_encounter_rate(record: dict[str, Any], generation: int) -> int:
|
||||
"""Extract a single encounter_rate from a PokeDB encounter record.
|
||||
def extract_encounter_data(
|
||||
record: dict[str, Any],
|
||||
generation: int,
|
||||
) -> tuple[int, dict[str, int] | None]:
|
||||
"""Extract encounter rate and per-condition rates from a PokeDB record.
|
||||
|
||||
Flattens generation-specific rate variants into a single value.
|
||||
Returns (rate, conditions) where:
|
||||
- rate is the max/overall rate (used for sorting and backward compat)
|
||||
- conditions is a dict of {condition_name: rate} or None for flat rates
|
||||
"""
|
||||
# Gen 1/3/6: rate_overall
|
||||
# Gen 1/3/6: rate_overall — flat rate, no conditions
|
||||
rate_overall = parse_rate(record.get("rate_overall"))
|
||||
if rate_overall is not None:
|
||||
return rate_overall
|
||||
return rate_overall, None
|
||||
|
||||
# Gen 2/4: time-of-day rates — take the max
|
||||
time_rates = [
|
||||
parse_rate(record.get("rate_morning")),
|
||||
parse_rate(record.get("rate_day")),
|
||||
parse_rate(record.get("rate_night")),
|
||||
]
|
||||
time_rates = [r for r in time_rates if r is not None]
|
||||
if time_rates:
|
||||
return max(time_rates)
|
||||
# Gen 2/4/7: time-of-day rates
|
||||
time_fields = {
|
||||
"morning": parse_rate(record.get("rate_morning")),
|
||||
"day": parse_rate(record.get("rate_day")),
|
||||
"night": parse_rate(record.get("rate_night")),
|
||||
}
|
||||
time_conditions = {k: v for k, v in time_fields.items() if v is not None}
|
||||
if time_conditions:
|
||||
rate = max(time_conditions.values())
|
||||
return rate, time_conditions
|
||||
|
||||
# Gen 5: seasonal rates — take the max
|
||||
season_rates = [
|
||||
parse_rate(record.get("rate_spring")),
|
||||
parse_rate(record.get("rate_summer")),
|
||||
parse_rate(record.get("rate_autumn")),
|
||||
parse_rate(record.get("rate_winter")),
|
||||
]
|
||||
season_rates = [r for r in season_rates if r is not None]
|
||||
if season_rates:
|
||||
return max(season_rates)
|
||||
# Gen 5: seasonal rates
|
||||
season_fields = {
|
||||
"spring": parse_rate(record.get("rate_spring")),
|
||||
"summer": parse_rate(record.get("rate_summer")),
|
||||
"autumn": parse_rate(record.get("rate_autumn")),
|
||||
"winter": parse_rate(record.get("rate_winter")),
|
||||
}
|
||||
season_conditions = {
|
||||
k: v for k, v in season_fields.items() if v is not None
|
||||
}
|
||||
if season_conditions:
|
||||
rate = max(season_conditions.values())
|
||||
return rate, season_conditions
|
||||
|
||||
# Gen 8 Sw/Sh: weather rates — take the max
|
||||
weather_rates = []
|
||||
# Gen 8 Sw/Sh: weather rates
|
||||
weather_conditions: dict[str, int] = {}
|
||||
for key, val in record.items():
|
||||
if key.startswith("weather_") and key.endswith("_rate") and val:
|
||||
parsed = parse_rate(val)
|
||||
if parsed is not None:
|
||||
weather_rates.append(parsed)
|
||||
if weather_rates:
|
||||
return max(weather_rates)
|
||||
# "weather_clear_rate" -> "clear"
|
||||
condition_name = key[len("weather_"):-len("_rate")]
|
||||
weather_conditions[condition_name] = parsed
|
||||
if weather_conditions:
|
||||
rate = max(weather_conditions.values())
|
||||
return rate, weather_conditions
|
||||
|
||||
# Gen 8 Legends Arceus: boolean conditions → presence-based
|
||||
if record.get("during_any_time") or record.get("during_morning") or \
|
||||
record.get("during_day") or record.get("during_evening") or record.get("during_night"):
|
||||
return 100 # Present under conditions
|
||||
# Gen 8 Legends Arceus: boolean conditions — presence-based
|
||||
if (
|
||||
record.get("during_any_time")
|
||||
or record.get("during_morning")
|
||||
or record.get("during_day")
|
||||
or record.get("during_evening")
|
||||
or record.get("during_night")
|
||||
):
|
||||
return 100, None
|
||||
|
||||
# Gen 9 Sc/Vi: probability weights → normalize
|
||||
# Gen 9 Sc/Vi: probability weights — normalize
|
||||
prob_overall = record.get("probability_overall")
|
||||
if prob_overall:
|
||||
parsed = parse_rate(prob_overall)
|
||||
if parsed is not None:
|
||||
# These are spawn weights (e.g. "20", "300"), not percentages.
|
||||
# We'll normalize them later during aggregation when we have
|
||||
# all encounters for a location. For now, store the raw weight.
|
||||
return parsed
|
||||
return parsed, None
|
||||
|
||||
# Check time-based probability variants
|
||||
prob_rates = [
|
||||
@@ -130,10 +144,10 @@ def extract_encounter_rate(record: dict[str, Any], generation: int) -> int:
|
||||
]
|
||||
prob_rates = [r for r in prob_rates if r is not None]
|
||||
if prob_rates:
|
||||
return max(prob_rates)
|
||||
return max(prob_rates), None
|
||||
|
||||
# Fallback: gift/trade/static encounters with no rate
|
||||
return 100
|
||||
return 100, None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -212,8 +226,8 @@ def process_encounters(
|
||||
# Parse levels
|
||||
min_level, max_level = parse_levels(record.get("levels"))
|
||||
|
||||
# Extract rate
|
||||
encounter_rate = extract_encounter_rate(record, generation)
|
||||
# Extract rate and conditions
|
||||
encounter_rate, conditions = extract_encounter_data(record, generation)
|
||||
|
||||
# Location area
|
||||
area_id = record.get("location_area_identifier", "")
|
||||
@@ -227,6 +241,7 @@ def process_encounters(
|
||||
encounter_rate=encounter_rate,
|
||||
min_level=min_level,
|
||||
max_level=max_level,
|
||||
conditions=conditions,
|
||||
)
|
||||
|
||||
by_area.setdefault(area_id, []).append(enc)
|
||||
@@ -234,10 +249,28 @@ def process_encounters(
|
||||
return by_area
|
||||
|
||||
|
||||
def _merge_conditions(
|
||||
a: dict[str, int] | None,
|
||||
b: dict[str, int] | None,
|
||||
) -> dict[str, int] | None:
|
||||
"""Merge two condition dicts by summing rates per key."""
|
||||
if a is None and b is None:
|
||||
return None
|
||||
merged = dict(a or {})
|
||||
for k, v in (b or {}).items():
|
||||
merged[k] = merged.get(k, 0) + v
|
||||
return merged
|
||||
|
||||
|
||||
def _cap_conditions(conditions: dict[str, int]) -> dict[str, int]:
|
||||
"""Cap each condition rate at 100."""
|
||||
return {k: min(v, 100) for k, v in conditions.items()}
|
||||
|
||||
|
||||
def aggregate_encounters(encounters: list[Encounter]) -> list[Encounter]:
|
||||
"""Aggregate encounters by (pokeapi_id, method), merging level ranges and summing rates.
|
||||
|
||||
Replicates the Go tool's aggregation logic.
|
||||
Preserves per-condition rates through aggregation.
|
||||
"""
|
||||
key_type = tuple[int, str]
|
||||
agg: dict[key_type, Encounter] = {}
|
||||
@@ -250,8 +283,10 @@ def aggregate_encounters(encounters: list[Encounter]) -> list[Encounter]:
|
||||
existing.encounter_rate += enc.encounter_rate
|
||||
existing.min_level = min(existing.min_level, enc.min_level)
|
||||
existing.max_level = max(existing.max_level, enc.max_level)
|
||||
existing.conditions = _merge_conditions(
|
||||
existing.conditions, enc.conditions
|
||||
)
|
||||
else:
|
||||
# Copy so we don't mutate the original
|
||||
agg[k] = Encounter(
|
||||
pokeapi_id=enc.pokeapi_id,
|
||||
pokemon_name=enc.pokemon_name,
|
||||
@@ -259,6 +294,7 @@ def aggregate_encounters(encounters: list[Encounter]) -> list[Encounter]:
|
||||
encounter_rate=enc.encounter_rate,
|
||||
min_level=enc.min_level,
|
||||
max_level=enc.max_level,
|
||||
conditions=dict(enc.conditions) if enc.conditions else None,
|
||||
)
|
||||
order.append(k)
|
||||
|
||||
@@ -266,6 +302,9 @@ def aggregate_encounters(encounters: list[Encounter]) -> list[Encounter]:
|
||||
for k in order:
|
||||
e = agg[k]
|
||||
e.encounter_rate = min(e.encounter_rate, 100)
|
||||
if e.conditions:
|
||||
e.conditions = _cap_conditions(e.conditions)
|
||||
e.encounter_rate = max(e.conditions.values())
|
||||
result.append(e)
|
||||
|
||||
# Sort by rate descending, then name ascending
|
||||
|
||||
322
tools/merge-conditions.py
Normal file
322
tools/merge-conditions.py
Normal file
@@ -0,0 +1,322 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Merge per-condition encounter rates from a fresh import into curated seed files.
|
||||
|
||||
Usage:
|
||||
# From repo root (requires PokeDB cache):
|
||||
python tools/merge-conditions.py --game heartgold
|
||||
|
||||
# Process all games that have conditions:
|
||||
python tools/merge-conditions.py --all
|
||||
|
||||
# Dry run (print what would change, don't write):
|
||||
python tools/merge-conditions.py --game heartgold --dry-run
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Add tools/import-pokedb to sys.path so we can import the library
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent
|
||||
sys.path.insert(0, str(REPO_ROOT / "tools" / "import-pokedb"))
|
||||
|
||||
from import_pokedb.loader import load_pokedb_data, load_seed_config # noqa: E402
|
||||
from import_pokedb.mappings import ( # noqa: E402
|
||||
LocationMapper,
|
||||
PokemonMapper,
|
||||
build_version_map,
|
||||
)
|
||||
from import_pokedb.processing import ( # noqa: E402
|
||||
build_routes,
|
||||
filter_den_routes,
|
||||
filter_encounters_for_game,
|
||||
process_encounters,
|
||||
)
|
||||
from import_pokedb.output import merge_special_encounters, sort_routes # noqa: E402
|
||||
|
||||
SEEDS_DIR = REPO_ROOT / "backend" / "src" / "app" / "seeds"
|
||||
DATA_DIR = SEEDS_DIR / "data"
|
||||
|
||||
# Games that have per-condition encounter rates
|
||||
CONDITION_GAMES: dict[str, str] = {
|
||||
# Gen 2: morning/day/night
|
||||
"gold": "gold-silver",
|
||||
"silver": "gold-silver",
|
||||
"crystal": "crystal",
|
||||
# Gen 4: morning/day/night
|
||||
"heartgold": "heartgold-soulsilver",
|
||||
"soulsilver": "heartgold-soulsilver",
|
||||
"diamond": "diamond-pearl",
|
||||
"pearl": "diamond-pearl",
|
||||
"platinum": "platinum",
|
||||
"brilliant-diamond": "brilliant-diamond-shining-pearl",
|
||||
"shining-pearl": "brilliant-diamond-shining-pearl",
|
||||
# Gen 5: spring/summer/autumn/winter
|
||||
"black": "black-white",
|
||||
"white": "black-white",
|
||||
"black-2": "black-2-white-2",
|
||||
"white-2": "black-2-white-2",
|
||||
# Gen 6: horde encounters
|
||||
"x": "x-y",
|
||||
"y": "x-y",
|
||||
# Gen 7: day/night + SOS
|
||||
"sun": "sun-moon",
|
||||
"moon": "sun-moon",
|
||||
"ultra-sun": "ultra-sun-ultra-moon",
|
||||
"ultra-moon": "ultra-sun-ultra-moon",
|
||||
# Gen 8: weather
|
||||
"sword": "sword-shield",
|
||||
"shield": "sword-shield",
|
||||
}
|
||||
|
||||
|
||||
def normalize_route_name(name: str) -> str:
|
||||
"""Normalize a route name for fuzzy matching."""
|
||||
return name.lower().strip()
|
||||
|
||||
|
||||
def build_fresh_lookup(
|
||||
game_slug: str,
|
||||
vg_key: str,
|
||||
generation: int,
|
||||
pokedb: object,
|
||||
config: object,
|
||||
pokemon_mapper: PokemonMapper,
|
||||
location_mapper: LocationMapper,
|
||||
) -> dict[str, dict[tuple[int, str], dict[str, int]]]:
|
||||
"""Run the import pipeline and build a conditions lookup.
|
||||
|
||||
Returns: {normalized_route_name: {(pokeapi_id, method): conditions_dict}}
|
||||
"""
|
||||
game_encounters = filter_encounters_for_game(
|
||||
pokedb.encounters, game_slug
|
||||
)
|
||||
if not game_encounters:
|
||||
return {}
|
||||
|
||||
encounters_by_area = process_encounters(
|
||||
game_encounters, generation, pokemon_mapper, location_mapper
|
||||
)
|
||||
|
||||
routes = build_routes(encounters_by_area, location_mapper)
|
||||
|
||||
if vg_key == "sword-shield":
|
||||
routes = filter_den_routes(routes)
|
||||
|
||||
routes = merge_special_encounters(
|
||||
routes, config, vg_key, pokemon_mapper
|
||||
)
|
||||
routes = sort_routes(routes, config, vg_key)
|
||||
|
||||
lookup: dict[str, dict[tuple[int, str], dict[str, int]]] = {}
|
||||
|
||||
def index_route(route):
|
||||
key = normalize_route_name(route.name)
|
||||
enc_map: dict[tuple[int, str], dict[str, int]] = {}
|
||||
for enc in route.encounters:
|
||||
if enc.conditions:
|
||||
enc_map[(enc.pokeapi_id, enc.method)] = enc.conditions
|
||||
if enc_map:
|
||||
lookup[key] = enc_map
|
||||
|
||||
for route in routes:
|
||||
index_route(route)
|
||||
for child in route.children:
|
||||
index_route(child)
|
||||
|
||||
return lookup
|
||||
|
||||
|
||||
def merge_conditions_into_seed(
|
||||
seed_data: list[dict],
|
||||
lookup: dict[str, dict[tuple[int, str], dict[str, int]]],
|
||||
game_slug: str,
|
||||
dry_run: bool = False,
|
||||
) -> tuple[list[dict], int]:
|
||||
"""Merge conditions from lookup into seed data, return (updated_data, count)."""
|
||||
merged_count = 0
|
||||
|
||||
def process_route(route: dict) -> None:
|
||||
nonlocal merged_count
|
||||
route_key = normalize_route_name(route["name"])
|
||||
route_lookup = lookup.get(route_key)
|
||||
|
||||
if route_lookup is None:
|
||||
return
|
||||
|
||||
for enc in route.get("encounters", []):
|
||||
key = (enc["pokeapi_id"], enc["method"])
|
||||
conditions = route_lookup.get(key)
|
||||
if conditions:
|
||||
if dry_run:
|
||||
print(
|
||||
f" {route['name']}: "
|
||||
f"{enc.get('pokemon_name', '?')} ({enc['method']}) "
|
||||
f"-> {conditions}"
|
||||
)
|
||||
enc["conditions"] = conditions
|
||||
enc["encounter_rate"] = None
|
||||
merged_count += 1
|
||||
|
||||
for child in route.get("children", []):
|
||||
process_route(child)
|
||||
|
||||
for route in seed_data:
|
||||
process_route(route)
|
||||
|
||||
return seed_data, merged_count
|
||||
|
||||
|
||||
def process_game(
|
||||
game_slug: str,
|
||||
pokedb,
|
||||
config,
|
||||
pokemon_mapper: PokemonMapper,
|
||||
location_mapper: LocationMapper,
|
||||
version_map: dict[str, str],
|
||||
dry_run: bool = False,
|
||||
) -> int:
|
||||
"""Process a single game. Returns number of encounters merged."""
|
||||
vg_key = CONDITION_GAMES.get(game_slug)
|
||||
if vg_key is None:
|
||||
print(f" Skipping {game_slug}: not a condition game")
|
||||
return 0
|
||||
|
||||
# Find generation
|
||||
vg_info = config.version_groups.get(vg_key)
|
||||
if vg_info is None:
|
||||
print(f" Warning: version group '{vg_key}' not found")
|
||||
return 0
|
||||
|
||||
generation = vg_info.get("generation", 0)
|
||||
|
||||
# Build fresh import lookup
|
||||
lookup = build_fresh_lookup(
|
||||
game_slug,
|
||||
vg_key,
|
||||
generation,
|
||||
pokedb,
|
||||
config,
|
||||
pokemon_mapper,
|
||||
location_mapper,
|
||||
)
|
||||
|
||||
if not lookup:
|
||||
print(" No conditions found in fresh import")
|
||||
return 0
|
||||
|
||||
total_conditions = sum(len(v) for v in lookup.values())
|
||||
print(
|
||||
f" Fresh import: {len(lookup)} routes with conditions, "
|
||||
f"{total_conditions} encounter+condition pairs"
|
||||
)
|
||||
|
||||
# Load existing seed file
|
||||
seed_path = DATA_DIR / f"{game_slug}.json"
|
||||
if not seed_path.exists():
|
||||
print(f" Warning: seed file not found: {seed_path}")
|
||||
return 0
|
||||
|
||||
with open(seed_path) as f:
|
||||
seed_data = json.load(f)
|
||||
|
||||
# Merge
|
||||
updated_data, merged_count = merge_conditions_into_seed(
|
||||
seed_data, lookup, game_slug, dry_run=dry_run
|
||||
)
|
||||
|
||||
if merged_count == 0:
|
||||
print(" No encounters matched for merging")
|
||||
return 0
|
||||
|
||||
print(f" Merged conditions into {merged_count} encounters")
|
||||
|
||||
if not dry_run:
|
||||
with open(seed_path, "w") as f:
|
||||
json.dump(updated_data, f, indent=2, ensure_ascii=False)
|
||||
f.write("\n")
|
||||
print(f" Wrote {seed_path}")
|
||||
|
||||
return merged_count
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Merge per-condition encounter rates into seed files."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--game", type=str, help="Process a specific game slug"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--all",
|
||||
action="store_true",
|
||||
help="Process all games with conditions",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dry-run",
|
||||
action="store_true",
|
||||
help="Print what would change without writing files",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--pokedb-dir",
|
||||
type=Path,
|
||||
default=None,
|
||||
help="Path to PokeDB data directory",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.game and not args.all:
|
||||
parser.error("Specify --game SLUG or --all")
|
||||
|
||||
pokedb_dir = args.pokedb_dir or (SEEDS_DIR / ".pokedb_cache")
|
||||
|
||||
print(f"PokeDB data: {pokedb_dir}")
|
||||
print(f"Seed data: {DATA_DIR}")
|
||||
print()
|
||||
|
||||
# Load PokeDB data
|
||||
pokedb = load_pokedb_data(pokedb_dir)
|
||||
print(pokedb.summary())
|
||||
print()
|
||||
|
||||
# Load seed config
|
||||
config = load_seed_config(SEEDS_DIR)
|
||||
print(f"Loaded {len(config.version_groups)} version groups")
|
||||
print()
|
||||
|
||||
# Build mappings
|
||||
pokemon_json = DATA_DIR / "pokemon.json"
|
||||
pokemon_mapper = PokemonMapper(pokemon_json, pokedb)
|
||||
location_mapper = LocationMapper(pokedb)
|
||||
version_map = build_version_map(pokedb, config.version_groups)
|
||||
|
||||
# Determine games to process
|
||||
if args.game:
|
||||
games = [args.game]
|
||||
else:
|
||||
games = list(CONDITION_GAMES.keys())
|
||||
|
||||
total_merged = 0
|
||||
for game_slug in games:
|
||||
print(f"\n--- {game_slug} ---")
|
||||
count = process_game(
|
||||
game_slug,
|
||||
pokedb,
|
||||
config,
|
||||
pokemon_mapper,
|
||||
location_mapper,
|
||||
version_map,
|
||||
dry_run=args.dry_run,
|
||||
)
|
||||
total_merged += count
|
||||
|
||||
print(f"\nTotal: {total_merged} encounters updated across {len(games)} games")
|
||||
if args.dry_run:
|
||||
print("(dry run — no files written)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user