Add database schema with SQLAlchemy async + Alembic migrations

Set up PostgreSQL database layer with async SQLAlchemy 2.0 and asyncpg driver.
Implements 6 core tables (games, routes, pokemon, route_encounters, nuzlocke_runs,
encounters) with foreign keys, indexes, and an initial Alembic migration.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Julian Tabel
2026-02-05 13:29:34 +01:00
parent a87d2ef523
commit d94364d6ce
19 changed files with 649 additions and 19 deletions

150
backend/alembic.ini Normal file
View File

@@ -0,0 +1,150 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts.
# this is typically a path given in POSIX (e.g. forward slashes)
# format, relative to the token %(here)s which refers to the location of this
# ini file
script_location = %(here)s/src/app/alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# Or organize into date-based subdirectories (requires recursive_version_locations = true)
# file_template = %%(year)d/%%(month).2d/%%(day).2d_%%(hour).2d%%(minute).2d_%%(second).2d_%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory. for multiple paths, the path separator
# is defined by "path_separator" below.
prepend_sys_path = src
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the tzdata library which can be installed by adding
# `alembic[tz]` to the pip requirements.
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to <script_location>/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "path_separator"
# below.
# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions
# path_separator; This indicates what character is used to split lists of file
# paths, including version_locations and prepend_sys_path within configparser
# files such as alembic.ini.
# The default rendered in new alembic.ini files is "os", which uses os.pathsep
# to provide os-dependent path splitting.
#
# Note that in order to support legacy alembic.ini files, this default does NOT
# take place if path_separator is not present in alembic.ini. If this
# option is omitted entirely, fallback logic is as follows:
#
# 1. Parsing of the version_locations option falls back to using the legacy
# "version_path_separator" key, which if absent then falls back to the legacy
# behavior of splitting on spaces and/or commas.
# 2. Parsing of the prepend_sys_path option falls back to the legacy
# behavior of splitting on spaces, commas, or colons.
#
# Valid values for path_separator are:
#
# path_separator = :
# path_separator = ;
# path_separator = space
# path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# database URL. This is consumed by the user-maintained env.py script only.
# other means of configuring database URLs may be customized within the env.py
# file.
# URL is set programmatically in env.py from app settings
# sqlalchemy.url =
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module
# hooks = ruff
# ruff.type = module
# ruff.module = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Alternatively, use the exec runner to execute a binary found on your PATH
# hooks = ruff
# ruff.type = exec
# ruff.executable = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Logging configuration. This is also consumed by the user-maintained
# env.py script only.
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@@ -10,6 +10,9 @@ dependencies = [
"pydantic>=2.10.0",
"pydantic-settings>=2.7.0",
"python-dotenv>=1.0.0",
"sqlalchemy[asyncio]>=2.0.0",
"asyncpg>=0.30.0",
"alembic>=1.14.0",
]
[project.optional-dependencies]

View File

@@ -0,0 +1 @@
Generic single-database configuration.

View File

@@ -0,0 +1,61 @@
import asyncio
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
from app.core.config import settings
from app.core.database import Base, _get_async_url
# Import all models so Base.metadata is populated
import app.models # noqa: F401
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
config.set_main_option("sqlalchemy.url", _get_async_url(settings.database_url))
target_metadata = Base.metadata
def run_migrations_offline() -> None:
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection):
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_migrations_online() -> None:
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
if context.is_offline_mode():
run_migrations_offline()
else:
asyncio.run(run_migrations_online())

View File

@@ -0,0 +1,28 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
"""Upgrade schema."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade schema."""
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,155 @@
"""initial schema
Revision ID: 03e5f186a9d5
Revises:
Create Date: 2026-02-05 13:27:47.649534
"""
from typing import Sequence, Union
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "03e5f186a9d5"
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"games",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column("name", sa.String(100), nullable=False),
sa.Column("slug", sa.String(100), nullable=False, unique=True),
sa.Column("generation", sa.SmallInteger(), nullable=False),
sa.Column("region", sa.String(50), nullable=False),
sa.Column("box_art_url", sa.String(500), nullable=True),
sa.Column("release_year", sa.SmallInteger(), nullable=True),
)
op.create_table(
"routes",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column("name", sa.String(100), nullable=False),
sa.Column(
"game_id", sa.Integer(), sa.ForeignKey("games.id"), nullable=False
),
sa.Column("order", sa.SmallInteger(), nullable=False),
)
op.create_index("ix_routes_game_id", "routes", ["game_id"])
op.create_table(
"pokemon",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column(
"national_dex", sa.SmallInteger(), nullable=False, unique=True
),
sa.Column("name", sa.String(50), nullable=False),
sa.Column(
"types", postgresql.ARRAY(sa.String(20)), nullable=False
),
sa.Column("sprite_url", sa.String(500), nullable=True),
)
op.create_table(
"route_encounters",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column(
"route_id", sa.Integer(), sa.ForeignKey("routes.id"), nullable=False
),
sa.Column(
"pokemon_id",
sa.Integer(),
sa.ForeignKey("pokemon.id"),
nullable=False,
),
sa.Column("encounter_method", sa.String(30), nullable=False),
sa.Column("encounter_rate", sa.SmallInteger(), nullable=False),
sa.UniqueConstraint(
"route_id",
"pokemon_id",
"encounter_method",
name="uq_route_pokemon_method",
),
)
op.create_index(
"ix_route_encounters_route_id", "route_encounters", ["route_id"]
)
op.create_index(
"ix_route_encounters_pokemon_id", "route_encounters", ["pokemon_id"]
)
op.create_table(
"nuzlocke_runs",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column(
"game_id", sa.Integer(), sa.ForeignKey("games.id"), nullable=False
),
sa.Column("name", sa.String(100), nullable=False),
sa.Column("status", sa.String(20), nullable=False),
sa.Column(
"rules", postgresql.JSONB(), nullable=False, server_default="{}"
),
sa.Column(
"started_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
sa.Column(
"completed_at", sa.DateTime(timezone=True), nullable=True
),
)
op.create_index(
"ix_nuzlocke_runs_game_id", "nuzlocke_runs", ["game_id"]
)
op.create_index(
"ix_nuzlocke_runs_status", "nuzlocke_runs", ["status"]
)
op.create_table(
"encounters",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column(
"run_id",
sa.Integer(),
sa.ForeignKey("nuzlocke_runs.id"),
nullable=False,
),
sa.Column(
"route_id", sa.Integer(), sa.ForeignKey("routes.id"), nullable=False
),
sa.Column(
"pokemon_id",
sa.Integer(),
sa.ForeignKey("pokemon.id"),
nullable=False,
),
sa.Column("nickname", sa.String(50), nullable=True),
sa.Column("status", sa.String(20), nullable=False),
sa.Column("catch_level", sa.SmallInteger(), nullable=True),
sa.Column("faint_level", sa.SmallInteger(), nullable=True),
sa.Column(
"caught_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
)
op.create_index("ix_encounters_run_id", "encounters", ["run_id"])
op.create_index("ix_encounters_route_id", "encounters", ["route_id"])
op.create_index("ix_encounters_pokemon_id", "encounters", ["pokemon_id"])
def downgrade() -> None:
op.drop_table("encounters")
op.drop_table("nuzlocke_runs")
op.drop_table("route_encounters")
op.drop_table("pokemon")
op.drop_table("routes")
op.drop_table("games")

View File

@@ -1,12 +1,22 @@
from fastapi import APIRouter
from sqlalchemy import text
from app.core.database import async_session
router = APIRouter(tags=["health"])
@router.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy"}
"""Health check endpoint with database connectivity verification."""
try:
async with async_session() as session:
await session.execute(text("SELECT 1"))
db_status = "connected"
except Exception:
db_status = "disconnected"
return {"status": "healthy", "database": db_status}
@router.get("/")

View File

@@ -14,8 +14,8 @@ class Settings(BaseSettings):
# API settings
api_v1_prefix: str = "/api/v1"
# Database settings (for future use)
database_url: str = "sqlite:///./nuzlocke.db"
# Database settings
database_url: str = "postgresql+asyncpg://postgres:postgres@localhost:5432/nuzlocke"
settings = Settings()

View File

@@ -0,0 +1,31 @@
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase
from app.core.config import settings
def _get_async_url(url: str) -> str:
"""Ensure the database URL uses the asyncpg driver."""
if url.startswith("postgresql://"):
return url.replace("postgresql://", "postgresql+asyncpg://", 1)
return url
engine = create_async_engine(
_get_async_url(settings.database_url),
echo=settings.debug,
pool_pre_ping=True,
)
async_session = async_sessionmaker(engine, expire_on_commit=False)
class Base(DeclarativeBase):
pass
async def get_session() -> AsyncGenerator[AsyncSession]:
async with async_session() as session:
yield session

View File

@@ -1,14 +1,25 @@
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.routes import api_router
from app.core.config import settings
from app.core.database import engine
@asynccontextmanager
async def lifespan(app: FastAPI):
yield
await engine.dispose()
app = FastAPI(
title=settings.app_name,
openapi_url=f"{settings.api_v1_prefix}/openapi.json",
docs_url="/docs",
redoc_url="/redoc",
lifespan=lifespan,
)
# Configure CORS

View File

@@ -0,0 +1,15 @@
from app.models.encounter import Encounter
from app.models.game import Game
from app.models.nuzlocke_run import NuzlockeRun
from app.models.pokemon import Pokemon
from app.models.route import Route
from app.models.route_encounter import RouteEncounter
__all__ = [
"Encounter",
"Game",
"NuzlockeRun",
"Pokemon",
"Route",
"RouteEncounter",
]

View File

@@ -0,0 +1,29 @@
from datetime import datetime
from sqlalchemy import DateTime, ForeignKey, SmallInteger, String, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.core.database import Base
class Encounter(Base):
__tablename__ = "encounters"
id: Mapped[int] = mapped_column(primary_key=True)
run_id: Mapped[int] = mapped_column(ForeignKey("nuzlocke_runs.id"), index=True)
route_id: Mapped[int] = mapped_column(ForeignKey("routes.id"), index=True)
pokemon_id: Mapped[int] = mapped_column(ForeignKey("pokemon.id"), index=True)
nickname: Mapped[str | None] = mapped_column(String(50))
status: Mapped[str] = mapped_column(String(20)) # caught, fainted, missed
catch_level: Mapped[int | None] = mapped_column(SmallInteger)
faint_level: Mapped[int | None] = mapped_column(SmallInteger)
caught_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
run: Mapped["NuzlockeRun"] = relationship(back_populates="encounters")
route: Mapped["Route"] = relationship(back_populates="encounters")
pokemon: Mapped["Pokemon"] = relationship(back_populates="encounters")
def __repr__(self) -> str:
return f"<Encounter(id={self.id}, pokemon_id={self.pokemon_id}, status='{self.status}')>"

View File

@@ -0,0 +1,22 @@
from sqlalchemy import SmallInteger, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.core.database import Base
class Game(Base):
__tablename__ = "games"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
slug: Mapped[str] = mapped_column(String(100), unique=True)
generation: Mapped[int] = mapped_column(SmallInteger)
region: Mapped[str] = mapped_column(String(50))
box_art_url: Mapped[str | None] = mapped_column(String(500))
release_year: Mapped[int | None] = mapped_column(SmallInteger)
routes: Mapped[list["Route"]] = relationship(back_populates="game")
runs: Mapped[list["NuzlockeRun"]] = relationship(back_populates="game")
def __repr__(self) -> str:
return f"<Game(id={self.id}, name='{self.name}')>"

View File

@@ -0,0 +1,27 @@
from datetime import datetime
from sqlalchemy import DateTime, ForeignKey, String, func
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.core.database import Base
class NuzlockeRun(Base):
__tablename__ = "nuzlocke_runs"
id: Mapped[int] = mapped_column(primary_key=True)
game_id: Mapped[int] = mapped_column(ForeignKey("games.id"), index=True)
name: Mapped[str] = mapped_column(String(100))
status: Mapped[str] = mapped_column(String(20), index=True) # active, completed, failed
rules: Mapped[dict] = mapped_column(JSONB, default=dict)
started_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
game: Mapped["Game"] = relationship(back_populates="runs")
encounters: Mapped[list["Encounter"]] = relationship(back_populates="run")
def __repr__(self) -> str:
return f"<NuzlockeRun(id={self.id}, name='{self.name}', status='{self.status}')>"

View File

@@ -0,0 +1,23 @@
from sqlalchemy import SmallInteger, String
from sqlalchemy.dialects.postgresql import ARRAY
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.core.database import Base
class Pokemon(Base):
__tablename__ = "pokemon"
id: Mapped[int] = mapped_column(primary_key=True)
national_dex: Mapped[int] = mapped_column(SmallInteger, unique=True)
name: Mapped[str] = mapped_column(String(50))
types: Mapped[list[str]] = mapped_column(ARRAY(String(20)))
sprite_url: Mapped[str | None] = mapped_column(String(500))
route_encounters: Mapped[list["RouteEncounter"]] = relationship(
back_populates="pokemon"
)
encounters: Mapped[list["Encounter"]] = relationship(back_populates="pokemon")
def __repr__(self) -> str:
return f"<Pokemon(id={self.id}, name='{self.name}', dex={self.national_dex})>"

View File

@@ -0,0 +1,22 @@
from sqlalchemy import ForeignKey, SmallInteger, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.core.database import Base
class Route(Base):
__tablename__ = "routes"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
game_id: Mapped[int] = mapped_column(ForeignKey("games.id"), index=True)
order: Mapped[int] = mapped_column(SmallInteger)
game: Mapped["Game"] = relationship(back_populates="routes")
route_encounters: Mapped[list["RouteEncounter"]] = relationship(
back_populates="route"
)
encounters: Mapped[list["Encounter"]] = relationship(back_populates="route")
def __repr__(self) -> str:
return f"<Route(id={self.id}, name='{self.name}')>"

View File

@@ -0,0 +1,25 @@
from sqlalchemy import ForeignKey, SmallInteger, String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.core.database import Base
class RouteEncounter(Base):
__tablename__ = "route_encounters"
__table_args__ = (
UniqueConstraint(
"route_id", "pokemon_id", "encounter_method", name="uq_route_pokemon_method"
),
)
id: Mapped[int] = mapped_column(primary_key=True)
route_id: Mapped[int] = mapped_column(ForeignKey("routes.id"), index=True)
pokemon_id: Mapped[int] = mapped_column(ForeignKey("pokemon.id"), index=True)
encounter_method: Mapped[str] = mapped_column(String(30))
encounter_rate: Mapped[int] = mapped_column(SmallInteger)
route: Mapped["Route"] = relationship(back_populates="route_encounters")
pokemon: Mapped["Pokemon"] = relationship(back_populates="route_encounters")
def __repr__(self) -> str:
return f"<RouteEncounter(route_id={self.route_id}, pokemon_id={self.pokemon_id}, method='{self.encounter_method}')>"