Skip to content

API Reference

Complete API documentation for all chapkit modules, classes, and functions.

Core Layer

Framework-agnostic infrastructure components.

Database

database

Async SQLAlchemy database connection manager.

Database

Generic async SQLAlchemy database connection manager.

Source code in src/chapkit/core/database.py
class Database:
    """Generic async SQLAlchemy database connection manager.

    Owns an ``AsyncEngine`` and an ``async_sessionmaker``; ``init()`` brings
    the schema up to date (Alembic when ``auto_migrate`` is True, otherwise
    ``Base.metadata.create_all``) and ``dispose()`` tears the pool down.
    """

    def __init__(
        self,
        url: str,
        *,
        echo: bool = False,
        alembic_dir: Path | None = None,
        auto_migrate: bool = True,
        pool_size: int = 5,
        max_overflow: int = 10,
        pool_recycle: int = 3600,
        pool_pre_ping: bool = True,
    ) -> None:
        """Initialize database with connection URL and pool configuration.

        Args:
            url: Async SQLAlchemy connection URL (e.g. ``sqlite+aiosqlite:///...``).
            echo: Log every SQL statement when True.
            alembic_dir: Custom Alembic script directory; when None, ``init()``
                falls back to the migrations bundled with the package.
            auto_migrate: Use Alembic in ``init()`` instead of direct ``create_all``.
            pool_size: Base size of the connection pool.
            max_overflow: Extra connections allowed beyond ``pool_size``.
            pool_recycle: Seconds after which pooled connections are recycled.
            pool_pre_ping: Verify connections are alive before handing them out.
        """
        self.url = url
        self.alembic_dir = alembic_dir
        self.auto_migrate = auto_migrate

        # Build engine kwargs - skip pool params for in-memory SQLite databases
        # (an in-memory URL gets SQLAlchemy's default pooling instead).
        engine_kwargs: dict = {"echo": echo, "future": True}
        if ":memory:" not in url:
            # Only add pool params for non-in-memory databases
            engine_kwargs.update(
                {
                    "pool_size": pool_size,
                    "max_overflow": max_overflow,
                    "pool_recycle": pool_recycle,
                    "pool_pre_ping": pool_pre_ping,
                }
            )

        self.engine: AsyncEngine = create_async_engine(url, **engine_kwargs)
        # expire_on_commit=False keeps ORM objects usable after commit without
        # triggering implicit refresh queries.
        self._session_factory: async_sessionmaker[AsyncSession] = async_sessionmaker(
            bind=self.engine, class_=AsyncSession, expire_on_commit=False
        )

    async def init(self) -> None:
        """Initialize database tables using Alembic migrations or direct creation."""
        import asyncio

        # Import Base here to avoid circular import at module level
        from chapkit.core.models import Base

        # For databases without migrations, use direct table creation
        if not self.auto_migrate:
            async with self.engine.begin() as conn:
                await conn.run_sync(Base.metadata.create_all)
        else:
            # Use Alembic migrations
            alembic_cfg = Config()

            # Use custom alembic directory if provided, otherwise use bundled migrations
            if self.alembic_dir is not None:
                alembic_cfg.set_main_option("script_location", str(self.alembic_dir))
            else:
                # NOTE(review): four .parent hops assumes this module lives at
                # src/chapkit/core/database.py with alembic/ at the repo root.
                alembic_cfg.set_main_option(
                    "script_location", str(Path(__file__).parent.parent.parent.parent / "alembic")
                )

            alembic_cfg.set_main_option("sqlalchemy.url", self.url)

            # Run upgrade in executor to avoid event loop conflicts
            # (command.upgrade is synchronous; running it in a worker thread
            # keeps it off the current event loop).
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, command.upgrade, alembic_cfg, "head")

    @asynccontextmanager
    async def session(self) -> AsyncIterator[AsyncSession]:
        """Create a database session context manager.

        The session is closed automatically on exit; committing is the
        caller's responsibility.
        """
        async with self._session_factory() as s:
            yield s

    async def dispose(self) -> None:
        """Dispose of database engine and connection pool."""
        await self.engine.dispose()

__init__(url, *, echo=False, alembic_dir=None, auto_migrate=True, pool_size=5, max_overflow=10, pool_recycle=3600, pool_pre_ping=True)

Initialize database with connection URL and pool configuration.

Source code in src/chapkit/core/database.py
def __init__(
    self,
    url: str,
    *,
    echo: bool = False,
    alembic_dir: Path | None = None,
    auto_migrate: bool = True,
    pool_size: int = 5,
    max_overflow: int = 10,
    pool_recycle: int = 3600,
    pool_pre_ping: bool = True,
) -> None:
    """Initialize database with connection URL and pool configuration."""
    self.url = url
    self.alembic_dir = alembic_dir
    self.auto_migrate = auto_migrate

    # Build engine kwargs - skip pool params for in-memory SQLite databases
    engine_kwargs: dict = {"echo": echo, "future": True}
    if ":memory:" not in url:
        # Only add pool params for non-in-memory databases
        engine_kwargs.update(
            {
                "pool_size": pool_size,
                "max_overflow": max_overflow,
                "pool_recycle": pool_recycle,
                "pool_pre_ping": pool_pre_ping,
            }
        )

    self.engine: AsyncEngine = create_async_engine(url, **engine_kwargs)
    self._session_factory: async_sessionmaker[AsyncSession] = async_sessionmaker(
        bind=self.engine, class_=AsyncSession, expire_on_commit=False
    )

init() async

Initialize database tables using Alembic migrations or direct creation.

Source code in src/chapkit/core/database.py
async def init(self) -> None:
    """Initialize database tables using Alembic migrations or direct creation."""
    import asyncio

    # Import Base here to avoid circular import at module level
    from chapkit.core.models import Base

    # For databases without migrations, use direct table creation
    if not self.auto_migrate:
        async with self.engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
    else:
        # Use Alembic migrations
        alembic_cfg = Config()

        # Use custom alembic directory if provided, otherwise use bundled migrations
        if self.alembic_dir is not None:
            alembic_cfg.set_main_option("script_location", str(self.alembic_dir))
        else:
            alembic_cfg.set_main_option(
                "script_location", str(Path(__file__).parent.parent.parent.parent / "alembic")
            )

        alembic_cfg.set_main_option("sqlalchemy.url", self.url)

        # Run upgrade in executor to avoid event loop conflicts
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, command.upgrade, alembic_cfg, "head")

session() async

Create a database session context manager.

Source code in src/chapkit/core/database.py
@asynccontextmanager
async def session(self) -> AsyncIterator[AsyncSession]:
    """Create a database session context manager."""
    async with self._session_factory() as s:
        yield s

dispose() async

Dispose of database engine and connection pool.

Source code in src/chapkit/core/database.py
async def dispose(self) -> None:
    """Dispose of database engine and connection pool."""
    await self.engine.dispose()

SqliteDatabase

Bases: Database

SQLite-specific database implementation with optimizations.

Source code in src/chapkit/core/database.py
class SqliteDatabase(Database):
    """SQLite-specific database implementation with optimizations.

    Same construction surface as ``Database`` but installs connect-time
    PRAGMAs on every new connection and enables WAL journaling for
    file-backed databases during ``init()``.
    """

    def __init__(
        self,
        url: str,
        *,
        echo: bool = False,
        alembic_dir: Path | None = None,
        auto_migrate: bool = True,
        pool_size: int = 5,
        max_overflow: int = 10,
        pool_recycle: int = 3600,
        pool_pre_ping: bool = True,
    ) -> None:
        """Initialize SQLite database with connection URL and pool configuration.

        Mirrors ``Database.__init__`` (deliberately not calling ``super()``
        so the in-memory check uses ``_is_in_memory_url``), then hooks the
        SQLite PRAGMA setup into the engine.
        """
        self.url = url
        self.alembic_dir = alembic_dir
        self.auto_migrate = auto_migrate

        # Build engine kwargs - pool params only for non-in-memory databases
        engine_kwargs: dict = {"echo": echo, "future": True}
        if not self._is_in_memory_url(url):
            # File-based databases can use pool configuration
            engine_kwargs.update(
                {
                    "pool_size": pool_size,
                    "max_overflow": max_overflow,
                    "pool_recycle": pool_recycle,
                    "pool_pre_ping": pool_pre_ping,
                }
            )

        self.engine: AsyncEngine = create_async_engine(url, **engine_kwargs)
        # Register connect-time PRAGMA configuration for every new connection.
        _install_sqlite_connect_pragmas(self.engine)
        self._session_factory: async_sessionmaker[AsyncSession] = async_sessionmaker(
            bind=self.engine, class_=AsyncSession, expire_on_commit=False
        )

    @staticmethod
    def _is_in_memory_url(url: str) -> bool:
        """Check if URL represents an in-memory database."""
        return ":memory:" in url

    def is_in_memory(self) -> bool:
        """Check if this is an in-memory database."""
        return self._is_in_memory_url(self.url)

    async def init(self) -> None:
        """Initialize database tables and configure SQLite using Alembic migrations."""
        # Import Base here to avoid circular import at module level
        from chapkit.core.models import Base

        # Set WAL mode first (if not in-memory). WAL is a persistent,
        # file-level setting that improves concurrent read/write behavior.
        if not self.is_in_memory():
            async with self.engine.begin() as conn:
                await conn.exec_driver_sql("PRAGMA journal_mode=WAL;")

        # For in-memory databases or when migrations are disabled, use direct
        # table creation (Alembic cannot usefully migrate a throwaway DB)
        if self.is_in_memory() or not self.auto_migrate:
            async with self.engine.begin() as conn:
                await conn.run_sync(Base.metadata.create_all)
        else:
            # For file-based databases, use Alembic migrations
            await super().init()

__init__(url, *, echo=False, alembic_dir=None, auto_migrate=True, pool_size=5, max_overflow=10, pool_recycle=3600, pool_pre_ping=True)

Initialize SQLite database with connection URL and pool configuration.

Source code in src/chapkit/core/database.py
def __init__(
    self,
    url: str,
    *,
    echo: bool = False,
    alembic_dir: Path | None = None,
    auto_migrate: bool = True,
    pool_size: int = 5,
    max_overflow: int = 10,
    pool_recycle: int = 3600,
    pool_pre_ping: bool = True,
) -> None:
    """Initialize SQLite database with connection URL and pool configuration."""
    self.url = url
    self.alembic_dir = alembic_dir
    self.auto_migrate = auto_migrate

    # Build engine kwargs - pool params only for non-in-memory databases
    engine_kwargs: dict = {"echo": echo, "future": True}
    if not self._is_in_memory_url(url):
        # File-based databases can use pool configuration
        engine_kwargs.update(
            {
                "pool_size": pool_size,
                "max_overflow": max_overflow,
                "pool_recycle": pool_recycle,
                "pool_pre_ping": pool_pre_ping,
            }
        )

    self.engine: AsyncEngine = create_async_engine(url, **engine_kwargs)
    _install_sqlite_connect_pragmas(self.engine)
    self._session_factory: async_sessionmaker[AsyncSession] = async_sessionmaker(
        bind=self.engine, class_=AsyncSession, expire_on_commit=False
    )

is_in_memory()

Check if this is an in-memory database.

Source code in src/chapkit/core/database.py
def is_in_memory(self) -> bool:
    """Check if this is an in-memory database."""
    return self._is_in_memory_url(self.url)

init() async

Initialize database tables and configure SQLite using Alembic migrations.

Source code in src/chapkit/core/database.py
async def init(self) -> None:
    """Initialize database tables and configure SQLite using Alembic migrations."""
    # Import Base here to avoid circular import at module level
    from chapkit.core.models import Base

    # Set WAL mode first (if not in-memory)
    if not self.is_in_memory():
        async with self.engine.begin() as conn:
            await conn.exec_driver_sql("PRAGMA journal_mode=WAL;")

    # For in-memory databases or when migrations are disabled, use direct table creation
    if self.is_in_memory() or not self.auto_migrate:
        async with self.engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
    else:
        # For file-based databases, use Alembic migrations
        await super().init()

SqliteDatabaseBuilder

Builder for SQLite database configuration with fluent API.

Source code in src/chapkit/core/database.py
class SqliteDatabaseBuilder:
    """Builder for SQLite database configuration with fluent API."""

    def __init__(self) -> None:
        """Initialize builder with default values."""
        self._url: str = ""
        self._echo: bool = False
        self._alembic_dir: Path | None = None
        self._auto_migrate: bool = True
        self._pool_size: int = 5
        self._max_overflow: int = 10
        self._pool_recycle: int = 3600
        self._pool_pre_ping: bool = True

    @classmethod
    def in_memory(cls) -> Self:
        """Create an in-memory SQLite database configuration."""
        builder = cls()
        builder._url = "sqlite+aiosqlite:///:memory:"
        return builder

    @classmethod
    def from_file(cls, path: str | Path) -> Self:
        """Create a file-based SQLite database configuration."""
        builder = cls()
        if isinstance(path, Path):
            path = str(path)
        builder._url = f"sqlite+aiosqlite:///{path}"
        return builder

    def with_echo(self, enabled: bool = True) -> Self:
        """Enable SQL query logging."""
        self._echo = enabled
        return self

    def with_migrations(self, enabled: bool = True, alembic_dir: Path | None = None) -> Self:
        """Configure migration behavior."""
        self._auto_migrate = enabled
        self._alembic_dir = alembic_dir
        return self

    def with_pool(
        self,
        size: int = 5,
        max_overflow: int = 10,
        recycle: int = 3600,
        pre_ping: bool = True,
    ) -> Self:
        """Configure connection pool settings."""
        self._pool_size = size
        self._max_overflow = max_overflow
        self._pool_recycle = recycle
        self._pool_pre_ping = pre_ping
        return self

    def build(self) -> SqliteDatabase:
        """Build and return configured SqliteDatabase instance."""
        if not self._url:
            raise ValueError("Database URL not configured. Use .in_memory() or .from_file()")

        return SqliteDatabase(
            url=self._url,
            echo=self._echo,
            alembic_dir=self._alembic_dir,
            auto_migrate=self._auto_migrate,
            pool_size=self._pool_size,
            max_overflow=self._max_overflow,
            pool_recycle=self._pool_recycle,
            pool_pre_ping=self._pool_pre_ping,
        )

__init__()

Initialize builder with default values.

Source code in src/chapkit/core/database.py
def __init__(self) -> None:
    """Initialize builder with default values."""
    self._url: str = ""
    self._echo: bool = False
    self._alembic_dir: Path | None = None
    self._auto_migrate: bool = True
    self._pool_size: int = 5
    self._max_overflow: int = 10
    self._pool_recycle: int = 3600
    self._pool_pre_ping: bool = True

in_memory() classmethod

Create an in-memory SQLite database configuration.

Source code in src/chapkit/core/database.py
@classmethod
def in_memory(cls) -> Self:
    """Create an in-memory SQLite database configuration."""
    builder = cls()
    builder._url = "sqlite+aiosqlite:///:memory:"
    return builder

from_file(path) classmethod

Create a file-based SQLite database configuration.

Source code in src/chapkit/core/database.py
@classmethod
def from_file(cls, path: str | Path) -> Self:
    """Create a file-based SQLite database configuration."""
    builder = cls()
    if isinstance(path, Path):
        path = str(path)
    builder._url = f"sqlite+aiosqlite:///{path}"
    return builder

with_echo(enabled=True)

Enable SQL query logging.

Source code in src/chapkit/core/database.py
def with_echo(self, enabled: bool = True) -> Self:
    """Enable SQL query logging."""
    self._echo = enabled
    return self

with_migrations(enabled=True, alembic_dir=None)

Configure migration behavior.

Source code in src/chapkit/core/database.py
def with_migrations(self, enabled: bool = True, alembic_dir: Path | None = None) -> Self:
    """Configure migration behavior."""
    self._auto_migrate = enabled
    self._alembic_dir = alembic_dir
    return self

with_pool(size=5, max_overflow=10, recycle=3600, pre_ping=True)

Configure connection pool settings.

Source code in src/chapkit/core/database.py
def with_pool(
    self,
    size: int = 5,
    max_overflow: int = 10,
    recycle: int = 3600,
    pre_ping: bool = True,
) -> Self:
    """Configure connection pool settings."""
    self._pool_size = size
    self._max_overflow = max_overflow
    self._pool_recycle = recycle
    self._pool_pre_ping = pre_ping
    return self

build()

Build and return configured SqliteDatabase instance.

Source code in src/chapkit/core/database.py
def build(self) -> SqliteDatabase:
    """Build and return configured SqliteDatabase instance."""
    if not self._url:
        raise ValueError("Database URL not configured. Use .in_memory() or .from_file()")

    return SqliteDatabase(
        url=self._url,
        echo=self._echo,
        alembic_dir=self._alembic_dir,
        auto_migrate=self._auto_migrate,
        pool_size=self._pool_size,
        max_overflow=self._max_overflow,
        pool_recycle=self._pool_recycle,
        pool_pre_ping=self._pool_pre_ping,
    )

Models

models

Base ORM classes for SQLAlchemy models.

Base

Bases: AsyncAttrs, DeclarativeBase

Root declarative base with async support.

Source code in src/chapkit/core/models.py
class Base(AsyncAttrs, DeclarativeBase):
    """Root declarative base with async support.

    ``AsyncAttrs`` provides the ``awaitable_attrs`` accessor for safely
    awaiting lazy-loaded attributes; ``Base.metadata`` is what
    ``Database.init()`` passes to ``create_all`` for direct table creation.
    """

Entity

Bases: Base

Optional base with common columns for your models.

Source code in src/chapkit/core/models.py
class Entity(Base):
    """Optional base with common columns for your models.

    Subclass this instead of ``Base`` to inherit a ULID primary key and
    created/updated timestamps.
    """

    # Abstract: no table of its own; columns are inherited by subclasses.
    __abstract__ = True

    # ULID primary key, generated client-side by default (default=ULID).
    id: Mapped[ULID] = mapped_column(ULIDType, primary_key=True, default=ULID)
    # Set by the database on INSERT (server_default=now()).
    created_at: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
    # Set on INSERT and refreshed on every UPDATE (onupdate=now()).
    updated_at: Mapped[datetime.datetime] = mapped_column(server_default=func.now(), onupdate=func.now())

Repository

repository

Base repository classes for data access layer.

Repository

Bases: ABC

Abstract repository interface for data access operations.

Source code in src/chapkit/core/repository.py
class Repository[T, IdT = ULID](ABC):
    """Abstract repository interface for data access operations.

    Generic over the entity type ``T`` and its identifier type ``IdT``
    (defaulting to ULID).  All operations are async; transaction boundaries
    are explicit via ``commit()``.  ``BaseRepository`` provides the
    SQLAlchemy-backed implementation.
    """

    @abstractmethod
    async def save(self, entity: T) -> T:
        """Save an entity to the database."""
        ...

    @abstractmethod
    async def save_all(self, entities: Iterable[T]) -> Sequence[T]:
        """Save multiple entities to the database."""
        ...

    @abstractmethod
    async def commit(self) -> None:
        """Commit the current database transaction."""
        ...

    @abstractmethod
    async def refresh_many(self, entities: Iterable[T]) -> None:
        """Refresh multiple entities from the database."""
        ...

    @abstractmethod
    async def delete(self, entity: T) -> None:
        """Delete an entity from the database."""
        ...

    @abstractmethod
    async def delete_by_id(self, id: IdT) -> None:
        """Delete an entity by its ID."""
        ...

    @abstractmethod
    async def delete_all(self) -> None:
        """Delete all entities from the database."""
        ...

    @abstractmethod
    async def delete_all_by_id(self, ids: Sequence[IdT]) -> None:
        """Delete multiple entities by their IDs."""
        ...

    @abstractmethod
    async def count(self) -> int:
        """Count the number of entities."""
        ...

    @abstractmethod
    async def exists_by_id(self, id: IdT) -> bool:
        """Check if an entity exists by its ID."""
        ...

    @abstractmethod
    async def find_all(self) -> Sequence[T]:
        """Find all entities."""
        ...

    @abstractmethod
    async def find_all_paginated(self, offset: int, limit: int) -> Sequence[T]:
        """Find entities with pagination (skip ``offset`` rows, return at most ``limit``)."""
        ...

    @abstractmethod
    async def find_all_by_id(self, ids: Sequence[IdT]) -> Sequence[T]:
        """Find entities by their IDs."""
        ...

    @abstractmethod
    async def find_by_id(self, id: IdT) -> T | None:
        """Find an entity by its ID, returning None when no entity matches."""
        ...

save(entity) abstractmethod async

Save an entity to the database.

Source code in src/chapkit/core/repository.py
@abstractmethod
async def save(self, entity: T) -> T:
    """Save an entity to the database."""
    ...

save_all(entities) abstractmethod async

Save multiple entities to the database.

Source code in src/chapkit/core/repository.py
@abstractmethod
async def save_all(self, entities: Iterable[T]) -> Sequence[T]:
    """Save multiple entities to the database."""
    ...

commit() abstractmethod async

Commit the current database transaction.

Source code in src/chapkit/core/repository.py
@abstractmethod
async def commit(self) -> None:
    """Commit the current database transaction."""
    ...

refresh_many(entities) abstractmethod async

Refresh multiple entities from the database.

Source code in src/chapkit/core/repository.py
@abstractmethod
async def refresh_many(self, entities: Iterable[T]) -> None:
    """Refresh multiple entities from the database."""
    ...

delete(entity) abstractmethod async

Delete an entity from the database.

Source code in src/chapkit/core/repository.py
@abstractmethod
async def delete(self, entity: T) -> None:
    """Delete an entity from the database."""
    ...

delete_by_id(id) abstractmethod async

Delete an entity by its ID.

Source code in src/chapkit/core/repository.py
@abstractmethod
async def delete_by_id(self, id: IdT) -> None:
    """Delete an entity by its ID."""
    ...

delete_all() abstractmethod async

Delete all entities from the database.

Source code in src/chapkit/core/repository.py
@abstractmethod
async def delete_all(self) -> None:
    """Delete all entities from the database."""
    ...

delete_all_by_id(ids) abstractmethod async

Delete multiple entities by their IDs.

Source code in src/chapkit/core/repository.py
@abstractmethod
async def delete_all_by_id(self, ids: Sequence[IdT]) -> None:
    """Delete multiple entities by their IDs."""
    ...

count() abstractmethod async

Count the number of entities.

Source code in src/chapkit/core/repository.py
@abstractmethod
async def count(self) -> int:
    """Count the number of entities."""
    ...

exists_by_id(id) abstractmethod async

Check if an entity exists by its ID.

Source code in src/chapkit/core/repository.py
@abstractmethod
async def exists_by_id(self, id: IdT) -> bool:
    """Check if an entity exists by its ID."""
    ...

find_all() abstractmethod async

Find all entities.

Source code in src/chapkit/core/repository.py
@abstractmethod
async def find_all(self) -> Sequence[T]:
    """Find all entities."""
    ...

find_all_paginated(offset, limit) abstractmethod async

Find entities with pagination.

Source code in src/chapkit/core/repository.py
@abstractmethod
async def find_all_paginated(self, offset: int, limit: int) -> Sequence[T]:
    """Find entities with pagination."""
    ...

find_all_by_id(ids) abstractmethod async

Find entities by their IDs.

Source code in src/chapkit/core/repository.py
@abstractmethod
async def find_all_by_id(self, ids: Sequence[IdT]) -> Sequence[T]:
    """Find entities by their IDs."""
    ...

find_by_id(id) abstractmethod async

Find an entity by its ID.

Source code in src/chapkit/core/repository.py
@abstractmethod
async def find_by_id(self, id: IdT) -> T | None:
    """Find an entity by its ID."""
    ...

BaseRepository

Bases: Repository[T, IdT]

Base repository implementation with common CRUD operations.

Source code in src/chapkit/core/repository.py
class BaseRepository[T, IdT = ULID](Repository[T, IdT]):
    """Base repository implementation with common CRUD operations."""

    def __init__(self, session: AsyncSession, model: type[T]) -> None:
        """Initialize repository with database session and model type."""
        self.s = session
        self.model = model

    # ---------- Create ----------
    async def save(self, entity: T) -> T:
        """Save an entity to the database."""
        self.s.add(entity)
        return entity

    async def save_all(self, entities: Iterable[T]) -> Sequence[T]:
        """Save multiple entities to the database."""
        entity_list = list(entities)
        self.s.add_all(entity_list)
        return entity_list

    async def commit(self) -> None:
        """Commit the current database transaction."""
        await self.s.commit()

    async def refresh_many(self, entities: Iterable[T]) -> None:
        """Refresh multiple entities from the database."""
        for e in entities:
            await self.s.refresh(e)

    # ---------- Delete ----------
    async def delete(self, entity: T) -> None:
        """Delete an entity from the database."""
        await self.s.delete(entity)

    async def delete_by_id(self, id: IdT) -> None:
        """Delete an entity by its ID."""
        id_col = getattr(self.model, "id")
        await self.s.execute(delete(self.model).where(id_col == id))

    async def delete_all(self) -> None:
        """Delete all entities from the database."""
        await self.s.execute(delete(self.model))

    async def delete_all_by_id(self, ids: Sequence[IdT]) -> None:
        """Delete multiple entities by their IDs."""
        if not ids:
            return
        # Access the "id" column generically
        id_col = getattr(self.model, "id")
        await self.s.execute(delete(self.model).where(id_col.in_(ids)))

    # ---------- Read / Count ----------
    async def count(self) -> int:
        """Count the number of entities."""
        return await self.s.scalar(select(func.count()).select_from(self.model)) or 0

    async def exists_by_id(self, id: IdT) -> bool:
        """Check if an entity exists by its ID."""
        # Access the "id" column generically
        id_col = getattr(self.model, "id")
        q = select(select(id_col).where(id_col == id).exists())
        return await self.s.scalar(q) or False

    async def find_all(self) -> Sequence[T]:
        """Find all entities."""
        result = await self.s.scalars(select(self.model))
        return result.all()

    async def find_all_paginated(self, offset: int, limit: int) -> Sequence[T]:
        """Find entities with pagination."""
        result = await self.s.scalars(select(self.model).offset(offset).limit(limit))
        return result.all()

    async def find_all_by_id(self, ids: Sequence[IdT]) -> Sequence[T]:
        """Find entities by their IDs."""
        if not ids:
            return []
        id_col = getattr(self.model, "id")
        result = await self.s.scalars(select(self.model).where(id_col.in_(ids)))
        return result.all()

    async def find_by_id(self, id: IdT) -> T | None:
        """Find an entity by its ID."""
        return await self.s.get(self.model, id)

__init__(session, model)

Initialize repository with database session and model type.

Source code in src/chapkit/core/repository.py
def __init__(self, session: AsyncSession, model: type[T]) -> None:
    """Initialize repository with database session and model type."""
    self.s = session
    self.model = model

save(entity) async

Save an entity to the database.

Source code in src/chapkit/core/repository.py
async def save(self, entity: T) -> T:
    """Save an entity to the database."""
    self.s.add(entity)
    return entity

save_all(entities) async

Save multiple entities to the database.

Source code in src/chapkit/core/repository.py
async def save_all(self, entities: Iterable[T]) -> Sequence[T]:
    """Save multiple entities to the database."""
    entity_list = list(entities)
    self.s.add_all(entity_list)
    return entity_list

commit() async

Commit the current database transaction.

Source code in src/chapkit/core/repository.py
async def commit(self) -> None:
    """Commit the current database transaction."""
    await self.s.commit()

refresh_many(entities) async

Refresh multiple entities from the database.

Source code in src/chapkit/core/repository.py
async def refresh_many(self, entities: Iterable[T]) -> None:
    """Refresh multiple entities from the database."""
    for e in entities:
        await self.s.refresh(e)

delete(entity) async

Delete an entity from the database.

Source code in src/chapkit/core/repository.py
async def delete(self, entity: T) -> None:
    """Delete an entity from the database."""
    await self.s.delete(entity)

delete_by_id(id) async

Delete an entity by its ID.

Source code in src/chapkit/core/repository.py
async def delete_by_id(self, id: IdT) -> None:
    """Delete an entity by its ID."""
    id_col = getattr(self.model, "id")
    await self.s.execute(delete(self.model).where(id_col == id))

delete_all() async

Delete all entities from the database.

Source code in src/chapkit/core/repository.py
async def delete_all(self) -> None:
    """Delete all entities from the database."""
    await self.s.execute(delete(self.model))

delete_all_by_id(ids) async

Delete multiple entities by their IDs.

Source code in src/chapkit/core/repository.py
async def delete_all_by_id(self, ids: Sequence[IdT]) -> None:
    """Delete multiple entities by their IDs."""
    if not ids:
        return
    # Access the "id" column generically
    id_col = getattr(self.model, "id")
    await self.s.execute(delete(self.model).where(id_col.in_(ids)))

count() async

Count the number of entities.

Source code in src/chapkit/core/repository.py
async def count(self) -> int:
    """Count the number of entities."""
    return await self.s.scalar(select(func.count()).select_from(self.model)) or 0

exists_by_id(id) async

Check if an entity exists by its ID.

Source code in src/chapkit/core/repository.py
async def exists_by_id(self, id: IdT) -> bool:
    """Check if an entity exists by its ID."""
    # Access the "id" column generically
    id_col = getattr(self.model, "id")
    q = select(select(id_col).where(id_col == id).exists())
    return await self.s.scalar(q) or False

find_all() async

Find all entities.

Source code in src/chapkit/core/repository.py
async def find_all(self) -> Sequence[T]:
    """Find all entities."""
    result = await self.s.scalars(select(self.model))
    return result.all()

find_all_paginated(offset, limit) async

Find entities with pagination.

Source code in src/chapkit/core/repository.py
async def find_all_paginated(self, offset: int, limit: int) -> Sequence[T]:
    """Find entities with pagination."""
    result = await self.s.scalars(select(self.model).offset(offset).limit(limit))
    return result.all()

find_all_by_id(ids) async

Find entities by their IDs.

Source code in src/chapkit/core/repository.py
async def find_all_by_id(self, ids: Sequence[IdT]) -> Sequence[T]:
    """Find entities by their IDs."""
    if not ids:
        return []
    id_col = getattr(self.model, "id")
    result = await self.s.scalars(select(self.model).where(id_col.in_(ids)))
    return result.all()

find_by_id(id) async

Find an entity by its ID.

Source code in src/chapkit/core/repository.py
async def find_by_id(self, id: IdT) -> T | None:
    """Find an entity by its ID."""
    return await self.s.get(self.model, id)

Manager

manager

Base classes for service layer managers with lifecycle hooks.

LifecycleHooks

Lifecycle hooks for entity operations.

Source code in src/chapkit/core/manager.py
class LifecycleHooks[ModelT, InSchemaT: BaseModel]:
    """Lifecycle hooks for entity operations."""

    def _should_assign_field(self, field: str, value: object) -> bool:
        """Determine if a field should be assigned during update."""
        return True

    async def pre_save(self, entity: ModelT, data: InSchemaT) -> None:
        """Hook called before saving a new entity."""
        pass

    async def post_save(self, entity: ModelT) -> None:
        """Hook called after saving a new entity."""
        pass

    async def pre_update(self, entity: ModelT, data: InSchemaT, old_values: dict[str, object]) -> None:
        """Hook called before updating an existing entity."""
        pass

    async def post_update(self, entity: ModelT, changes: dict[str, tuple[object, object]]) -> None:
        """Hook called after updating an existing entity."""
        pass

    async def pre_delete(self, entity: ModelT) -> None:
        """Hook called before deleting an entity."""
        pass

    async def post_delete(self, entity: ModelT) -> None:
        """Hook called after deleting an entity."""
        pass

pre_save(entity, data) async

Hook called before saving a new entity.

Source code in src/chapkit/core/manager.py
async def pre_save(self, entity: ModelT, data: InSchemaT) -> None:
    """Called just before a brand-new entity is persisted."""
    return None

post_save(entity) async

Hook called after saving a new entity.

Source code in src/chapkit/core/manager.py
async def post_save(self, entity: ModelT) -> None:
    """Called after a brand-new entity has been persisted."""
    return None

pre_update(entity, data, old_values) async

Hook called before updating an existing entity.

Source code in src/chapkit/core/manager.py
async def pre_update(self, entity: ModelT, data: InSchemaT, old_values: dict[str, object]) -> None:
    """Called just before an existing entity is updated; *old_values* holds the prior field values."""
    return None

post_update(entity, changes) async

Hook called after updating an existing entity.

Source code in src/chapkit/core/manager.py
async def post_update(self, entity: ModelT, changes: dict[str, tuple[object, object]]) -> None:
    """Called after an update; *changes* maps field name to (old, new)."""
    return None

pre_delete(entity) async

Hook called before deleting an entity.

Source code in src/chapkit/core/manager.py
async def pre_delete(self, entity: ModelT) -> None:
    """Called just before an entity is deleted."""
    return None

post_delete(entity) async

Hook called after deleting an entity.

Source code in src/chapkit/core/manager.py
async def post_delete(self, entity: ModelT) -> None:
    """Called after an entity has been deleted."""
    return None

Manager

Bases: ABC

Abstract manager interface for business logic operations.

Source code in src/chapkit/core/manager.py
class Manager[InSchemaT: BaseModel, OutSchemaT: BaseModel, IdT](ABC):
    """Abstract manager interface for business logic operations."""

    @abstractmethod
    async def save(self, data: InSchemaT) -> OutSchemaT:
        """Save an entity."""
        ...

    @abstractmethod
    async def save_all(self, items: Iterable[InSchemaT]) -> list[OutSchemaT]:
        """Save multiple entities."""
        ...

    @abstractmethod
    async def delete_by_id(self, id: IdT) -> None:
        """Delete an entity by its ID."""
        ...

    @abstractmethod
    async def delete_all(self) -> None:
        """Delete all entities."""
        ...

    @abstractmethod
    async def delete_all_by_id(self, ids: Sequence[IdT]) -> None:
        """Delete multiple entities by their IDs."""
        ...

    @abstractmethod
    async def count(self) -> int:
        """Count the number of entities."""
        ...

    @abstractmethod
    async def exists_by_id(self, id: IdT) -> bool:
        """Check if an entity exists by its ID."""
        ...

    @abstractmethod
    async def find_by_id(self, id: IdT) -> OutSchemaT | None:
        """Find an entity by its ID."""
        ...

    @abstractmethod
    async def find_all(self) -> list[OutSchemaT]:
        """Find all entities."""
        ...

    @abstractmethod
    async def find_paginated(self, page: int, size: int) -> tuple[list[OutSchemaT], int]:
        """Find entities with pagination."""
        ...

    @abstractmethod
    async def find_all_by_id(self, ids: Sequence[IdT]) -> list[OutSchemaT]:
        """Find entities by their IDs."""
        ...

save(data) abstractmethod async

Save an entity.

Source code in src/chapkit/core/manager.py
@abstractmethod
async def save(self, data: InSchemaT) -> OutSchemaT:
    """Persist *data* (create or update) and return the stored form."""
    ...

save_all(items) abstractmethod async

Save multiple entities.

Source code in src/chapkit/core/manager.py
@abstractmethod
async def save_all(self, items: Iterable[InSchemaT]) -> list[OutSchemaT]:
    """Persist every schema in *items* and return the stored forms."""
    ...

delete_by_id(id) abstractmethod async

Delete an entity by its ID.

Source code in src/chapkit/core/manager.py
@abstractmethod
async def delete_by_id(self, id: IdT) -> None:
    """Remove the entity with the given id, if present."""
    ...

delete_all() abstractmethod async

Delete all entities.

Source code in src/chapkit/core/manager.py
@abstractmethod
async def delete_all(self) -> None:
    """Remove every entity."""
    ...

delete_all_by_id(ids) abstractmethod async

Delete multiple entities by their IDs.

Source code in src/chapkit/core/manager.py
@abstractmethod
async def delete_all_by_id(self, ids: Sequence[IdT]) -> None:
    """Remove each entity whose id appears in *ids*."""
    ...

count() abstractmethod async

Count the number of entities.

Source code in src/chapkit/core/manager.py
@abstractmethod
async def count(self) -> int:
    """Return how many entities exist."""
    ...

exists_by_id(id) abstractmethod async

Check if an entity exists by its ID.

Source code in src/chapkit/core/manager.py
@abstractmethod
async def exists_by_id(self, id: IdT) -> bool:
    """Return True when an entity with the given id exists."""
    ...

find_by_id(id) abstractmethod async

Find an entity by its ID.

Source code in src/chapkit/core/manager.py
@abstractmethod
async def find_by_id(self, id: IdT) -> OutSchemaT | None:
    """Return the entity with the given id, or None when absent."""
    ...

find_all() abstractmethod async

Find all entities.

Source code in src/chapkit/core/manager.py
@abstractmethod
async def find_all(self) -> list[OutSchemaT]:
    """Return every entity."""
    ...

find_paginated(page, size) abstractmethod async

Find entities with pagination.

Source code in src/chapkit/core/manager.py
@abstractmethod
async def find_paginated(self, page: int, size: int) -> tuple[list[OutSchemaT], int]:
    """Return one page of entities plus the total count."""
    ...

find_all_by_id(ids) abstractmethod async

Find entities by their IDs.

Source code in src/chapkit/core/manager.py
@abstractmethod
async def find_all_by_id(self, ids: Sequence[IdT]) -> list[OutSchemaT]:
    """Return each entity whose id appears in *ids*."""
    ...

BaseManager

Bases: LifecycleHooks[ModelT, InSchemaT], Manager[InSchemaT, OutSchemaT, IdT]

Base manager implementation with CRUD operations and lifecycle hooks.

Source code in src/chapkit/core/manager.py
class BaseManager[ModelT, InSchemaT: BaseModel, OutSchemaT: BaseModel, IdT](
    LifecycleHooks[ModelT, InSchemaT],
    Manager[InSchemaT, OutSchemaT, IdT],
):
    """Base manager implementation with CRUD operations and lifecycle hooks."""

    def __init__(
        self,
        repo: BaseRepository[ModelT, IdT],
        model_cls: type[ModelT],
        out_schema_cls: type[OutSchemaT],
    ) -> None:
        """Initialize manager with repository, model class, and output schema class."""
        self.repo = repo
        self.model_cls = model_cls
        self.out_schema_cls = out_schema_cls

    def _to_output_schema(self, entity: ModelT) -> OutSchemaT:
        """Convert ORM entity to output schema."""
        return self.out_schema_cls.model_validate(entity, from_attributes=True)

    async def save(self, data: InSchemaT) -> OutSchemaT:
        """Save an entity (create or update).

        Creates a new entity when ``data.id`` is unset or matches no row;
        otherwise updates the existing entity in place. Because the payload
        is dumped with ``exclude_none=True``, None-valued fields are never
        assigned, so an update cannot clear an existing value.
        """
        data_dict = data.model_dump(exclude_none=True)
        entity_id = data_dict.get("id")
        existing: ModelT | None = None

        if entity_id is not None:
            existing = await self.repo.find_by_id(entity_id)

        if existing is None:
            # Create path: drop a null id so the model can generate its own.
            if data_dict.get("id") is None:
                data_dict.pop("id", None)
            entity = self.model_cls(**data_dict)
            await self.pre_save(entity, data)
            await self.repo.save(entity)
            await self.repo.commit()
            await self.repo.refresh_many([entity])
            await self.post_save(entity)
            return self._to_output_schema(entity)

        # Update path: snapshot old values so post_update receives a diff.
        tracked_fields = set(data_dict.keys())
        # NOTE(review): "level" is force-tracked even when absent from the
        # payload — presumably for hierarchy-aware entities; confirm.
        if hasattr(existing, "level"):  # pragma: no branch
            tracked_fields.add("level")
        old_values = {field: getattr(existing, field) for field in tracked_fields if hasattr(existing, field)}

        for key, value in data_dict.items():
            if key == "id":  # pragma: no branch
                continue
            if not self._should_assign_field(key, value):
                continue
            if hasattr(existing, key):
                setattr(existing, key, value)

        await self.pre_update(existing, data, old_values)

        # Diff is computed after pre_update so hook-made edits are included.
        changes: dict[str, tuple[object, object]] = {}
        for field in tracked_fields:
            if hasattr(existing, field):
                new_value = getattr(existing, field)
                old_value = old_values.get(field)
                if old_value != new_value:
                    changes[field] = (old_value, new_value)

        await self.repo.save(existing)
        await self.repo.commit()
        await self.repo.refresh_many([existing])
        await self.post_update(existing, changes)
        return self._to_output_schema(existing)

    async def save_all(self, items: Iterable[InSchemaT]) -> list[OutSchemaT]:
        """Save multiple entities (create or update) under a single commit.

        Mirrors ``save`` per item, but defers the commit until every item
        has been staged; post_save/post_update hooks run after the commit.
        """
        entities_to_insert: list[ModelT] = []
        updates: list[tuple[ModelT, dict[str, tuple[object, object]]]] = []
        outputs: list[ModelT] = []

        for data in items:
            data_dict = data.model_dump(exclude_none=True)
            entity_id = data_dict.get("id")
            existing: ModelT | None = None
            if entity_id is not None:
                existing = await self.repo.find_by_id(entity_id)

            if existing is None:
                if data_dict.get("id") is None:
                    data_dict.pop("id", None)
                entity = self.model_cls(**data_dict)
                await self.pre_save(entity, data)
                entities_to_insert.append(entity)
                outputs.append(entity)
                continue

            tracked_fields = set(data_dict.keys())
            if hasattr(existing, "level"):  # pragma: no branch
                tracked_fields.add("level")
            old_values = {field: getattr(existing, field) for field in tracked_fields if hasattr(existing, field)}

            for key, value in data_dict.items():
                if key == "id":  # pragma: no branch
                    continue
                if not self._should_assign_field(key, value):
                    continue
                if hasattr(existing, key):
                    setattr(existing, key, value)

            await self.pre_update(existing, data, old_values)

            changes: dict[str, tuple[object, object]] = {}
            for field in tracked_fields:
                if hasattr(existing, field):
                    new_value = getattr(existing, field)
                    old_value = old_values.get(field)
                    if old_value != new_value:
                        changes[field] = (old_value, new_value)

            updates.append((existing, changes))
            outputs.append(existing)

        if entities_to_insert:  # pragma: no branch
            await self.repo.save_all(entities_to_insert)
        await self.repo.commit()
        if outputs:  # pragma: no branch
            await self.repo.refresh_many(outputs)

        for entity in entities_to_insert:
            await self.post_save(entity)
        for entity, changes in updates:
            await self.post_update(entity, changes)

        return [self._to_output_schema(entity) for entity in outputs]

    async def delete_by_id(self, id: IdT) -> None:
        """Delete an entity by its ID; a missing id is silently ignored."""
        entity = await self.repo.find_by_id(id)
        if entity is None:
            return
        await self.pre_delete(entity)
        await self.repo.delete(entity)
        await self.repo.commit()
        await self.post_delete(entity)

    async def delete_all(self) -> None:
        """Delete all entities, firing pre/post delete hooks for each."""
        entities = await self.repo.find_all()
        for entity in entities:
            await self.pre_delete(entity)
        await self.repo.delete_all()
        await self.repo.commit()
        for entity in entities:
            await self.post_delete(entity)

    async def delete_all_by_id(self, ids: Sequence[IdT]) -> None:
        """Delete multiple entities by their IDs; no-op for an empty sequence."""
        if not ids:
            return
        entities = await self.repo.find_all_by_id(ids)
        for entity in entities:
            await self.pre_delete(entity)
        await self.repo.delete_all_by_id(ids)
        await self.repo.commit()
        for entity in entities:
            await self.post_delete(entity)

    async def count(self) -> int:
        """Count the number of entities."""
        return await self.repo.count()

    async def exists_by_id(self, id: IdT) -> bool:
        """Check if an entity exists by its ID."""
        return await self.repo.exists_by_id(id)

    async def find_by_id(self, id: IdT) -> OutSchemaT | None:
        """Find an entity by its ID."""
        entity = await self.repo.find_by_id(id)
        if entity is None:
            return None
        return self._to_output_schema(entity)

    async def find_all(self) -> list[OutSchemaT]:
        """Find all entities."""
        entities = await self.repo.find_all()
        return [self._to_output_schema(e) for e in entities]

    async def find_paginated(self, page: int, size: int) -> tuple[list[OutSchemaT], int]:
        """Find entities with pagination (*page* is 1-indexed)."""
        offset = (page - 1) * size
        entities = await self.repo.find_all_paginated(offset, size)
        total = await self.repo.count()
        return [self._to_output_schema(e) for e in entities], total

    async def find_all_by_id(self, ids: Sequence[IdT]) -> list[OutSchemaT]:
        """Find entities by their IDs."""
        entities = await self.repo.find_all_by_id(ids)
        return [self._to_output_schema(e) for e in entities]

__init__(repo, model_cls, out_schema_cls)

Initialize manager with repository, model class, and output schema class.

Source code in src/chapkit/core/manager.py
def __init__(
    self,
    repo: BaseRepository[ModelT, IdT],
    model_cls: type[ModelT],
    out_schema_cls: type[OutSchemaT],
) -> None:
    """Store the repository plus the ORM model and output-schema classes."""
    self.repo = repo
    self.out_schema_cls = out_schema_cls
    self.model_cls = model_cls

save(data) async

Save an entity (create or update).

Source code in src/chapkit/core/manager.py
async def save(self, data: InSchemaT) -> OutSchemaT:
    """Save an entity (create or update).

    Creates a new entity when ``data.id`` is unset or matches no row;
    otherwise updates the existing entity in place. Because the payload
    is dumped with ``exclude_none=True``, None-valued fields are never
    assigned, so an update cannot clear an existing value.
    """
    data_dict = data.model_dump(exclude_none=True)
    entity_id = data_dict.get("id")
    existing: ModelT | None = None

    if entity_id is not None:
        existing = await self.repo.find_by_id(entity_id)

    if existing is None:
        # Create path: drop a null id so the model can generate its own.
        if data_dict.get("id") is None:
            data_dict.pop("id", None)
        entity = self.model_cls(**data_dict)
        await self.pre_save(entity, data)
        await self.repo.save(entity)
        await self.repo.commit()
        await self.repo.refresh_many([entity])
        await self.post_save(entity)
        return self._to_output_schema(entity)

    # Update path: snapshot old values so post_update receives a diff.
    tracked_fields = set(data_dict.keys())
    # NOTE(review): "level" is force-tracked even when absent from the
    # payload — presumably for hierarchy-aware entities; confirm.
    if hasattr(existing, "level"):  # pragma: no branch
        tracked_fields.add("level")
    old_values = {field: getattr(existing, field) for field in tracked_fields if hasattr(existing, field)}

    for key, value in data_dict.items():
        if key == "id":  # pragma: no branch
            continue
        if not self._should_assign_field(key, value):
            continue
        if hasattr(existing, key):
            setattr(existing, key, value)

    await self.pre_update(existing, data, old_values)

    # Diff is computed after pre_update so hook-made edits are included.
    changes: dict[str, tuple[object, object]] = {}
    for field in tracked_fields:
        if hasattr(existing, field):
            new_value = getattr(existing, field)
            old_value = old_values.get(field)
            if old_value != new_value:
                changes[field] = (old_value, new_value)

    await self.repo.save(existing)
    await self.repo.commit()
    await self.repo.refresh_many([existing])
    await self.post_update(existing, changes)
    return self._to_output_schema(existing)

delete_by_id(id) async

Delete an entity by its ID.

Source code in src/chapkit/core/manager.py
async def delete_by_id(self, id: IdT) -> None:
    """Remove the entity identified by *id*, firing delete hooks.

    Silently returns when no entity with that id exists.
    """
    target = await self.repo.find_by_id(id)
    if target is None:
        return
    await self.pre_delete(target)
    await self.repo.delete(target)
    await self.repo.commit()
    await self.post_delete(target)

delete_all() async

Delete all entities.

Source code in src/chapkit/core/manager.py
async def delete_all(self) -> None:
    """Remove every entity, invoking pre/post delete hooks for each."""
    victims = await self.repo.find_all()
    for victim in victims:
        await self.pre_delete(victim)
    await self.repo.delete_all()
    await self.repo.commit()
    for victim in victims:
        await self.post_delete(victim)

delete_all_by_id(ids) async

Delete multiple entities by their IDs.

Source code in src/chapkit/core/manager.py
async def delete_all_by_id(self, ids: Sequence[IdT]) -> None:
    """Remove each entity whose id appears in *ids*; no-op for empty input."""
    if not ids:
        return
    targets = await self.repo.find_all_by_id(ids)
    for target in targets:
        await self.pre_delete(target)
    await self.repo.delete_all_by_id(ids)
    await self.repo.commit()
    for target in targets:
        await self.post_delete(target)

count() async

Count the number of entities.

Source code in src/chapkit/core/manager.py
async def count(self) -> int:
    """Return the total number of persisted entities."""
    total = await self.repo.count()
    return total

exists_by_id(id) async

Check if an entity exists by its ID.

Source code in src/chapkit/core/manager.py
async def exists_by_id(self, id: IdT) -> bool:
    """Report whether an entity with the given id is persisted."""
    found = await self.repo.exists_by_id(id)
    return found

find_by_id(id) async

Find an entity by its ID.

Source code in src/chapkit/core/manager.py
async def find_by_id(self, id: IdT) -> OutSchemaT | None:
    """Look up one entity by id and convert it to the output schema."""
    found = await self.repo.find_by_id(id)
    return None if found is None else self._to_output_schema(found)

find_all() async

Find all entities.

Source code in src/chapkit/core/manager.py
async def find_all(self) -> list[OutSchemaT]:
    """Return every entity converted to the output schema."""
    return [self._to_output_schema(entity) for entity in await self.repo.find_all()]

find_paginated(page, size) async

Find entities with pagination.

Source code in src/chapkit/core/manager.py
async def find_paginated(self, page: int, size: int) -> tuple[list[OutSchemaT], int]:
    """Return one page of schema-converted entities plus the total count.

    *page* is 1-indexed; the repository offset is ``(page - 1) * size``.
    """
    skip = (page - 1) * size
    rows = await self.repo.find_all_paginated(skip, size)
    total = await self.repo.count()
    return [self._to_output_schema(row) for row in rows], total

find_all_by_id(ids) async

Find entities by their IDs.

Source code in src/chapkit/core/manager.py
async def find_all_by_id(self, ids: Sequence[IdT]) -> list[OutSchemaT]:
    """Return each entity whose id appears in *ids*, schema-converted."""
    return [self._to_output_schema(entity) for entity in await self.repo.find_all_by_id(ids)]

Schemas

schemas

Core Pydantic schemas for entities, responses, and jobs.

EntityIn

Bases: BaseModel

Base input schema for entities with optional ID.

Source code in src/chapkit/core/schemas.py
class EntityIn(BaseModel):
    """Base input schema for entities with optional ID.

    When ``id`` is None the persistence layer is expected to assign one.
    """

    # ULID is not a native pydantic type, hence arbitrary_types_allowed.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Optional client-supplied identifier; None means "create new".
    id: ULID | None = None

EntityOut

Bases: BaseModel

Base output schema for entities with ID and timestamps.

Source code in src/chapkit/core/schemas.py
class EntityOut(BaseModel):
    """Base output schema for entities with ID and timestamps."""

    # from_attributes lets these be built straight from ORM entities;
    # arbitrary_types_allowed admits the non-pydantic ULID type.
    model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)

    id: ULID
    created_at: datetime
    updated_at: datetime

PaginatedResponse

Bases: BaseModel, Generic[T]

Paginated response with items, total count, page number, and computed page count.

Source code in src/chapkit/core/schemas.py
class PaginatedResponse(BaseModel, Generic[T]):
    """Paginated response with items, total count, page number, and computed page count."""

    items: list[T] = Field(description="List of items for the current page")
    total: int = Field(description="Total number of items across all pages", ge=0)
    page: int = Field(description="Current page number (1-indexed)", ge=1)
    size: int = Field(description="Number of items per page", ge=1)

    @computed_field  # type: ignore[prop-decorator]
    @property
    def pages(self) -> int:
        """Total number of pages."""
        if self.total == 0:
            return 0
        # Ceiling division without floats: ceil(total / size).
        return (self.total + self.size - 1) // self.size

pages property

Total number of pages.

BulkOperationError

Bases: BaseModel

Error information for a single item in a bulk operation.

Source code in src/chapkit/core/schemas.py
class BulkOperationError(BaseModel):
    """Error information for a single item in a bulk operation.

    Collected in ``BulkOperationResult.errors``.
    """

    id: str = Field(description="Identifier of the item that failed")
    reason: str = Field(description="Human-readable error message")

BulkOperationResult

Bases: BaseModel

Result of bulk operation with counts of succeeded/failed items and error details.

Source code in src/chapkit/core/schemas.py
class BulkOperationResult(BaseModel):
    """Result of bulk operation with counts of succeeded/failed items and error details."""

    # NOTE(review): succeeded + failed is expected to equal total — not enforced here.
    total: int = Field(description="Total number of items processed", ge=0)
    succeeded: int = Field(description="Number of items successfully processed", ge=0)
    failed: int = Field(description="Number of items that failed", ge=0)
    errors: list[BulkOperationError] = Field(default_factory=list, description="Details of failed items (if any)")

ProblemDetail

Bases: BaseModel

RFC 9457 Problem Details with URN error type, status, and human-readable messages.

Source code in src/chapkit/core/schemas.py
class ProblemDetail(BaseModel):
    """RFC 9457 Problem Details with URN error type, status, and human-readable messages."""

    type: str = Field(
        default="about:blank",
        description="URI reference identifying the problem type (URN format for chapkit errors)",
    )
    title: str = Field(description="Short, human-readable summary of the problem type")
    status: int = Field(description="HTTP status code", ge=100, le=599)
    detail: str | None = Field(default=None, description="Human-readable explanation specific to this occurrence")
    instance: str | None = Field(default=None, description="URI reference identifying the specific occurrence")
    trace_id: str | None = Field(default=None, description="Optional trace ID for debugging")

    # Example surfaced in the generated OpenAPI schema documentation.
    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "type": "urn:chapkit:error:not-found",
                    "title": "Resource Not Found",
                    "status": 404,
                    "detail": "Config with id 01ABC... not found",
                    "instance": "/api/config/01ABC...",
                }
            ]
        }
    }

JobStatus

Bases: StrEnum

Status of a scheduled job.

Source code in src/chapkit/core/schemas.py
class JobStatus(StrEnum):
    """Status of a scheduled job.

    As a StrEnum, members compare equal to (and serialize as) their
    string values.
    """

    pending = "pending"  # default status for a new JobRecord
    running = "running"
    completed = "completed"
    failed = "failed"
    canceled = "canceled"

JobRecord

Bases: BaseModel

Complete record of a scheduled job's state and metadata.

Source code in src/chapkit/core/schemas.py
class JobRecord(BaseModel):
    """Complete record of a scheduled job's state and metadata."""

    # ULID fields require arbitrary_types_allowed.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    id: ULID = Field(description="Unique job identifier")
    status: JobStatus = Field(default=JobStatus.pending, description="Current job status")
    submitted_at: datetime | None = Field(default=None, description="When the job was submitted")
    started_at: datetime | None = Field(default=None, description="When the job started running")
    finished_at: datetime | None = Field(default=None, description="When the job finished")
    error: str | None = Field(default=None, description="User-friendly error message if job failed")
    error_traceback: str | None = Field(default=None, description="Full error traceback for debugging")
    artifact_id: ULID | None = Field(default=None, description="ID of artifact created by job (if job returns a ULID)")

Exceptions

exceptions

Custom exceptions with RFC 9457 Problem Details support.

ErrorType

URN-based error type identifiers for RFC 9457 Problem Details.

Source code in src/chapkit/core/exceptions.py
class ErrorType:
    """URN-based error type identifiers for RFC 9457 Problem Details.

    Each URN serves as the ``type`` member of a problem-details response;
    the ChapkitException subclasses each default to one of these values.
    """

    NOT_FOUND = "urn:chapkit:error:not-found"  # 404
    VALIDATION_FAILED = "urn:chapkit:error:validation-failed"  # 400
    CONFLICT = "urn:chapkit:error:conflict"  # 409
    INVALID_ULID = "urn:chapkit:error:invalid-ulid"  # 400
    INTERNAL_ERROR = "urn:chapkit:error:internal"  # 500 (ChapkitException default)
    UNAUTHORIZED = "urn:chapkit:error:unauthorized"  # 401
    FORBIDDEN = "urn:chapkit:error:forbidden"  # 403
    BAD_REQUEST = "urn:chapkit:error:bad-request"  # 400

ChapkitException

Bases: Exception

Base exception for chapkit with RFC 9457 Problem Details support.

Source code in src/chapkit/core/exceptions.py
class ChapkitException(Exception):
    """Root of the chapkit exception hierarchy.

    Carries every field needed to render an RFC 9457 problem-details
    response: type URI, title, HTTP status, detail, optional instance,
    and arbitrary extension members.
    """

    def __init__(
        self,
        detail: str,
        *,
        type_uri: str = ErrorType.INTERNAL_ERROR,
        title: str = "Internal Server Error",
        status: int = 500,
        instance: str | None = None,
        **extensions: Any,
    ) -> None:
        """Record problem-details fields; *detail* doubles as the exception message."""
        super().__init__(detail)
        self.detail = detail
        self.type_uri = type_uri
        self.title = title
        self.status = status
        self.instance = instance
        self.extensions = extensions

NotFoundError

Bases: ChapkitException

Resource not found exception (404).

Source code in src/chapkit/core/exceptions.py
class NotFoundError(ChapkitException):
    """Problem-details exception for a missing resource (HTTP 404)."""

    def __init__(self, detail: str, *, instance: str | None = None, **extensions: Any) -> None:
        """Create a 404 error whose message is *detail*."""
        super().__init__(
            detail, type_uri=ErrorType.NOT_FOUND, title="Resource Not Found", status=404, instance=instance, **extensions
        )

ValidationError

Bases: ChapkitException

Validation failed exception (400).

Source code in src/chapkit/core/exceptions.py
class ValidationError(ChapkitException):
    """Problem-details exception for failed validation (HTTP 400)."""

    def __init__(self, detail: str, *, instance: str | None = None, **extensions: Any) -> None:
        """Create a 400 validation error whose message is *detail*."""
        super().__init__(
            detail, type_uri=ErrorType.VALIDATION_FAILED, title="Validation Failed", status=400, instance=instance, **extensions
        )

ConflictError

Bases: ChapkitException

Resource conflict exception (409).

Source code in src/chapkit/core/exceptions.py
class ConflictError(ChapkitException):
    """Problem-details exception for a resource conflict (HTTP 409)."""

    def __init__(self, detail: str, *, instance: str | None = None, **extensions: Any) -> None:
        """Create a 409 conflict error whose message is *detail*."""
        super().__init__(
            detail, type_uri=ErrorType.CONFLICT, title="Resource Conflict", status=409, instance=instance, **extensions
        )

InvalidULIDError

Bases: ChapkitException

Invalid ULID format exception (400).

Source code in src/chapkit/core/exceptions.py
class InvalidULIDError(ChapkitException):
    """Problem-details exception for a malformed ULID (HTTP 400)."""

    def __init__(self, detail: str, *, instance: str | None = None, **extensions: Any) -> None:
        """Create a 400 invalid-ULID error whose message is *detail*."""
        super().__init__(
            detail, type_uri=ErrorType.INVALID_ULID, title="Invalid ULID Format", status=400, instance=instance, **extensions
        )

BadRequestError

Bases: ChapkitException

Bad request exception (400).

Source code in src/chapkit/core/exceptions.py
class BadRequestError(ChapkitException):
    """Problem-details exception for a malformed request (HTTP 400)."""

    def __init__(self, detail: str, *, instance: str | None = None, **extensions: Any) -> None:
        """Create a 400 bad-request error whose message is *detail*."""
        super().__init__(
            detail, type_uri=ErrorType.BAD_REQUEST, title="Bad Request", status=400, instance=instance, **extensions
        )

UnauthorizedError

Bases: ChapkitException

Unauthorized exception (401).

Source code in src/chapkit/core/exceptions.py
class UnauthorizedError(ChapkitException):
    """Problem-details exception for missing/failed authentication (HTTP 401)."""

    def __init__(self, detail: str, *, instance: str | None = None, **extensions: Any) -> None:
        """Create a 401 unauthorized error whose message is *detail*."""
        super().__init__(
            detail, type_uri=ErrorType.UNAUTHORIZED, title="Unauthorized", status=401, instance=instance, **extensions
        )

ForbiddenError

Bases: ChapkitException

Forbidden exception (403).

Source code in src/chapkit/core/exceptions.py
class ForbiddenError(ChapkitException):
    """Problem-details exception for an authorization failure (HTTP 403)."""

    def __init__(self, detail: str, *, instance: str | None = None, **extensions: Any) -> None:
        """Create a 403 forbidden error whose message is *detail*."""
        super().__init__(
            detail, type_uri=ErrorType.FORBIDDEN, title="Forbidden", status=403, instance=instance, **extensions
        )

Scheduler

scheduler

Job scheduler for async task management with in-memory asyncio implementation.

JobScheduler

Bases: BaseModel, ABC

Abstract job scheduler interface for async task management.

Source code in src/chapkit/core/scheduler.py
class JobScheduler(BaseModel, ABC):
    """Abstract job scheduler interface for async task management.

    Concrete schedulers (e.g. AIOJobScheduler) implement submission,
    inspection, cancellation, waiting, and result retrieval.
    """

    # Allows non-pydantic types such as ULID in validated fields.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    @abstractmethod
    async def add_job(
        self,
        target: JobTarget,
        /,
        *args: Any,
        **kwargs: Any,
    ) -> ULID:
        """Add a job to the scheduler and return its ID."""
        ...

    @abstractmethod
    async def get_status(self, job_id: ULID) -> JobStatus:
        """Get the status of a job."""
        ...

    @abstractmethod
    async def get_record(self, job_id: ULID) -> JobRecord:
        """Get the full record of a job."""
        ...

    @abstractmethod
    async def get_all_records(self) -> list[JobRecord]:
        """Get all job records."""
        ...

    @abstractmethod
    async def cancel(self, job_id: ULID) -> bool:
        """Cancel a running job."""
        ...

    @abstractmethod
    async def delete(self, job_id: ULID) -> None:
        """Delete a job record."""
        ...

    @abstractmethod
    async def wait(self, job_id: ULID, timeout: float | None = None) -> None:
        """Wait for a job to complete."""
        ...

    @abstractmethod
    async def get_result(self, job_id: ULID) -> Any:
        """Get the result of a completed job."""
        ...

add_job(target, /, *args, **kwargs) abstractmethod async

Add a job to the scheduler and return its ID.

Source code in src/chapkit/core/scheduler.py
@abstractmethod
async def add_job(
    self,
    target: JobTarget,
    /,
    *args: Any,
    **kwargs: Any,
) -> ULID:
    """Schedule *target* for execution and return the new job's ULID."""
    ...

get_status(job_id) abstractmethod async

Get the status of a job.

Source code in src/chapkit/core/scheduler.py
@abstractmethod
async def get_status(self, job_id: ULID) -> JobStatus:
    """Return the current JobStatus for *job_id*."""
    ...

get_record(job_id) abstractmethod async

Get the full record of a job.

Source code in src/chapkit/core/scheduler.py
@abstractmethod
async def get_record(self, job_id: ULID) -> JobRecord:
    """Return the full JobRecord for *job_id*."""
    ...

get_all_records() abstractmethod async

Get all job records.

Source code in src/chapkit/core/scheduler.py
@abstractmethod
async def get_all_records(self) -> list[JobRecord]:
    """Return records for every known job."""
    ...

cancel(job_id) abstractmethod async

Cancel a running job.

Source code in src/chapkit/core/scheduler.py
@abstractmethod
async def cancel(self, job_id: ULID) -> bool:
    """Attempt to cancel the job identified by *job_id*."""
    ...

delete(job_id) abstractmethod async

Delete a job record.

Source code in src/chapkit/core/scheduler.py
@abstractmethod
async def delete(self, job_id: ULID) -> None:
    """Delete a job record."""
    ...

wait(job_id, timeout=None) abstractmethod async

Wait for a job to complete.

Source code in src/chapkit/core/scheduler.py
@abstractmethod
async def wait(self, job_id: ULID, timeout: float | None = None) -> None:
    """Wait for a job to complete."""
    ...

get_result(job_id) abstractmethod async

Get the result of a completed job.

Source code in src/chapkit/core/scheduler.py
@abstractmethod
async def get_result(self, job_id: ULID) -> Any:
    """Get the result of a completed job."""
    ...

AIOJobScheduler

Bases: JobScheduler

In-memory asyncio scheduler. Sync callables run in thread pool, concurrency controlled via semaphore.

Source code in src/chapkit/core/scheduler.py
class AIOJobScheduler(JobScheduler):
    """In-memory asyncio scheduler. Sync callables run in thread pool, concurrency controlled via semaphore.

    All job state (records, results, asyncio.Task handles) lives in private
    dicts keyed by the job's ULID and guarded by a single asyncio.Lock.
    Nothing is persisted: state is lost when the process exits.
    """

    # Prefix used when naming the asyncio tasks spawned for jobs.
    name: str = Field(default="chap")
    # None (or a non-positive value) means unlimited concurrency.
    max_concurrency: int | None = Field(default=None)

    _records: dict[ULID, JobRecord] = PrivateAttr(default_factory=dict)
    _results: dict[ULID, Any] = PrivateAttr(default_factory=dict)
    _tasks: dict[ULID, asyncio.Task[Any]] = PrivateAttr(default_factory=dict)
    _lock: asyncio.Lock = PrivateAttr(default_factory=asyncio.Lock)
    _sema: asyncio.Semaphore | None = PrivateAttr(default=None)

    def __init__(self, **data: Any):
        """Initialize scheduler with optional concurrency limit.

        A semaphore is created only when max_concurrency is a positive int.
        """
        super().__init__(**data)
        if self.max_concurrency and self.max_concurrency > 0:
            self._sema = asyncio.Semaphore(self.max_concurrency)

    async def set_max_concurrency(self, n: int | None) -> None:
        """Set maximum number of concurrent jobs.

        Replaces the semaphore outright; jobs already holding the old
        semaphore are unaffected. None or a non-positive value removes
        the limit.
        """
        async with self._lock:
            self.max_concurrency = n
            if n and n > 0:
                self._sema = asyncio.Semaphore(n)
            else:
                self._sema = None

    async def add_job(
        self,
        target: JobTarget,
        /,
        *args: Any,
        **kwargs: Any,
    ) -> ULID:
        """Add a job to the scheduler and return its ID.

        ``target`` may be an awaitable object (args/kwargs then forbidden),
        a coroutine function, or a plain callable (run via asyncio.to_thread).
        The job is started immediately as an asyncio task, subject to the
        concurrency semaphore if one is configured.
        """
        now = datetime.now(timezone.utc)
        jid = ULID()

        record = JobRecord(
            id=jid,
            status=JobStatus.pending,
            submitted_at=now,
        )

        async with self._lock:
            # Fresh ULIDs are effectively unique; this guards against reuse bugs.
            if jid in self._tasks:
                raise RuntimeError(f"Job {jid!r} already scheduled")
            self._records[jid] = record

        async def _execute_target() -> Any:
            if inspect.isawaitable(target):
                if args or kwargs:
                    # Close the coroutine to avoid "coroutine was never awaited" warning
                    if inspect.iscoroutine(target):
                        target.close()
                    raise TypeError("Args/kwargs not supported when target is an awaitable object.")
                return await target
            if inspect.iscoroutinefunction(target):
                return await target(*args, **kwargs)
            # Sync callables run in the default thread-pool executor.
            return await asyncio.to_thread(target, *args, **kwargs)

        async def _runner() -> Any:
            # Acquire the concurrency semaphore (if configured) before running.
            if self._sema:
                async with self._sema:
                    return await self._run_with_state(jid, _execute_target)
            else:
                return await self._run_with_state(jid, _execute_target)

        task = asyncio.create_task(_runner(), name=f"{self.name}-job-{jid}")

        def _drain(t: asyncio.Task[Any]) -> None:
            # Retrieve the result so failures are not reported as
            # "exception was never retrieved"; errors are already recorded
            # on the JobRecord by _run_with_state.
            # NOTE(review): t.result() re-raises CancelledError, which is a
            # BaseException and escapes this handler — confirm intended.
            try:
                t.result()
            except Exception:
                pass

        task.add_done_callback(_drain)

        async with self._lock:
            self._tasks[jid] = task

        return jid

    async def _run_with_state(
        self,
        jid: ULID,
        exec_fn: JobExecutor,
    ) -> Any:
        """Execute job function and manage its state transitions.

        pending -> running -> completed/failed/canceled. Each transition is
        made under the lock; the job body itself runs outside the lock.
        """
        async with self._lock:
            rec = self._records[jid]
            rec.status = JobStatus.running
            rec.started_at = datetime.now(timezone.utc)

        try:
            result = await exec_fn()

            # By convention a ULID result is treated as an artifact reference.
            artifact: ULID | None = result if isinstance(result, ULID) else None

            async with self._lock:
                rec = self._records[jid]
                rec.status = JobStatus.completed
                rec.finished_at = datetime.now(timezone.utc)
                rec.artifact_id = artifact
                self._results[jid] = result

            return result

        except asyncio.CancelledError:
            async with self._lock:
                rec = self._records[jid]
                rec.status = JobStatus.canceled
                rec.finished_at = datetime.now(timezone.utc)

            # Re-raise so the surrounding task is actually marked cancelled.
            raise

        except Exception as e:
            tb = traceback.format_exc()
            # Extract clean error message (exception type and message only)
            error_lines = tb.strip().split("\n")
            clean_error = error_lines[-1] if error_lines else str(e)

            async with self._lock:
                rec = self._records[jid]
                rec.status = JobStatus.failed
                rec.finished_at = datetime.now(timezone.utc)
                rec.error = clean_error
                rec.error_traceback = tb

            raise

    async def get_all_records(self) -> list[JobRecord]:
        """Get all job records sorted by submission time (newest first).

        Returns deep copies so callers cannot mutate scheduler state.
        """
        async with self._lock:
            records = [r.model_copy(deep=True) for r in self._records.values()]

        records.sort(
            key=lambda r: getattr(r, "submitted_at", datetime.min.replace(tzinfo=timezone.utc)),
            reverse=True,
        )

        return records

    async def get_record(self, job_id: ULID) -> JobRecord:
        """Get the full record of a job.

        Returns a deep copy. Raises KeyError if the job is unknown.
        """
        async with self._lock:
            rec = self._records.get(job_id)

            if rec is None:
                raise KeyError("Job not found")

            return rec.model_copy(deep=True)

    async def get_status(self, job_id: ULID) -> JobStatus:
        """Get the status of a job.

        Raises KeyError if the job is unknown.
        """
        async with self._lock:
            rec = self._records.get(job_id)

            if rec is None:
                raise KeyError("Job not found")

            return rec.status

    async def get_result(self, job_id: ULID) -> Any:
        """Get the result of a completed job.

        Raises KeyError for unknown jobs; RuntimeError if the job failed
        (message taken from the recorded error) or has not finished yet.
        """
        async with self._lock:
            rec = self._records.get(job_id)

            if rec is None:
                raise KeyError("Job not found")

            if rec.status == JobStatus.completed:
                return self._results.get(job_id)

            if rec.status == JobStatus.failed:
                msg = getattr(rec, "error", "Job failed")
                raise RuntimeError(msg)

            raise RuntimeError(f"Job not finished (status={rec.status})")

    async def wait(self, job_id: ULID, timeout: float | None = None) -> None:
        """Wait for a job to complete.

        Raises KeyError for unknown jobs. If the job itself raised, that
        exception propagates to the caller here.
        """
        async with self._lock:
            task = self._tasks.get(job_id)

            if task is None:
                raise KeyError("Job not found")

        # shield() ensures a timeout cancels only this wait, not the job.
        await asyncio.wait_for(asyncio.shield(task), timeout=timeout)

    async def cancel(self, job_id: ULID) -> bool:
        """Cancel a running job.

        Returns True if the task was cancelled, False if it had already
        finished. Raises KeyError for unknown jobs.
        """
        async with self._lock:
            task = self._tasks.get(job_id)
            exists = job_id in self._records

        if not exists:
            raise KeyError("Job not found")

        if not task or task.done():
            return False

        task.cancel()

        # Wait for the cancellation to take effect before returning.
        try:
            await task
        except asyncio.CancelledError:
            pass

        return True

    async def delete(self, job_id: ULID) -> None:
        """Delete a job record.

        Cancels the task first if it is still running, then removes the
        record, the task handle, and any stored result. Raises KeyError
        if the job is unknown.
        """
        async with self._lock:
            rec = self._records.get(job_id)
            task = self._tasks.get(job_id)

        if rec is None:
            raise KeyError("Job not found")

        if task and not task.done():
            task.cancel()

            try:
                await task
            except asyncio.CancelledError:
                pass

        async with self._lock:
            self._records.pop(job_id, None)
            self._tasks.pop(job_id, None)
            self._results.pop(job_id, None)

__init__(**data)

Initialize scheduler with optional concurrency limit.

Source code in src/chapkit/core/scheduler.py
def __init__(self, **data: Any):
    """Initialize scheduler with optional concurrency limit."""
    super().__init__(**data)
    if self.max_concurrency and self.max_concurrency > 0:
        self._sema = asyncio.Semaphore(self.max_concurrency)

set_max_concurrency(n) async

Set maximum number of concurrent jobs.

Source code in src/chapkit/core/scheduler.py
async def set_max_concurrency(self, n: int | None) -> None:
    """Set maximum number of concurrent jobs."""
    async with self._lock:
        self.max_concurrency = n
        if n and n > 0:
            self._sema = asyncio.Semaphore(n)
        else:
            self._sema = None

add_job(target, /, *args, **kwargs) async

Add a job to the scheduler and return its ID.

Source code in src/chapkit/core/scheduler.py
async def add_job(
    self,
    target: JobTarget,
    /,
    *args: Any,
    **kwargs: Any,
) -> ULID:
    """Add a job to the scheduler and return its ID."""
    now = datetime.now(timezone.utc)
    jid = ULID()

    record = JobRecord(
        id=jid,
        status=JobStatus.pending,
        submitted_at=now,
    )

    async with self._lock:
        if jid in self._tasks:
            raise RuntimeError(f"Job {jid!r} already scheduled")
        self._records[jid] = record

    async def _execute_target() -> Any:
        if inspect.isawaitable(target):
            if args or kwargs:
                # Close the coroutine to avoid "coroutine was never awaited" warning
                if inspect.iscoroutine(target):
                    target.close()
                raise TypeError("Args/kwargs not supported when target is an awaitable object.")
            return await target
        if inspect.iscoroutinefunction(target):
            return await target(*args, **kwargs)
        return await asyncio.to_thread(target, *args, **kwargs)

    async def _runner() -> Any:
        if self._sema:
            async with self._sema:
                return await self._run_with_state(jid, _execute_target)
        else:
            return await self._run_with_state(jid, _execute_target)

    task = asyncio.create_task(_runner(), name=f"{self.name}-job-{jid}")

    def _drain(t: asyncio.Task[Any]) -> None:
        try:
            t.result()
        except Exception:
            pass

    task.add_done_callback(_drain)

    async with self._lock:
        self._tasks[jid] = task

    return jid

get_all_records() async

Get all job records sorted by submission time.

Source code in src/chapkit/core/scheduler.py
async def get_all_records(self) -> list[JobRecord]:
    """Get all job records sorted by submission time."""
    async with self._lock:
        records = [r.model_copy(deep=True) for r in self._records.values()]

    records.sort(
        key=lambda r: getattr(r, "submitted_at", datetime.min.replace(tzinfo=timezone.utc)),
        reverse=True,
    )

    return records

get_record(job_id) async

Get the full record of a job.

Source code in src/chapkit/core/scheduler.py
async def get_record(self, job_id: ULID) -> JobRecord:
    """Get the full record of a job."""
    async with self._lock:
        rec = self._records.get(job_id)

        if rec is None:
            raise KeyError("Job not found")

        return rec.model_copy(deep=True)

get_status(job_id) async

Get the status of a job.

Source code in src/chapkit/core/scheduler.py
async def get_status(self, job_id: ULID) -> JobStatus:
    """Get the status of a job."""
    async with self._lock:
        rec = self._records.get(job_id)

        if rec is None:
            raise KeyError("Job not found")

        return rec.status

get_result(job_id) async

Get the result of a completed job.

Source code in src/chapkit/core/scheduler.py
async def get_result(self, job_id: ULID) -> Any:
    """Get the result of a completed job."""
    async with self._lock:
        rec = self._records.get(job_id)

        if rec is None:
            raise KeyError("Job not found")

        if rec.status == JobStatus.completed:
            return self._results.get(job_id)

        if rec.status == JobStatus.failed:
            msg = getattr(rec, "error", "Job failed")
            raise RuntimeError(msg)

        raise RuntimeError(f"Job not finished (status={rec.status})")

wait(job_id, timeout=None) async

Wait for a job to complete.

Source code in src/chapkit/core/scheduler.py
async def wait(self, job_id: ULID, timeout: float | None = None) -> None:
    """Wait for a job to complete."""
    async with self._lock:
        task = self._tasks.get(job_id)

        if task is None:
            raise KeyError("Job not found")

    await asyncio.wait_for(asyncio.shield(task), timeout=timeout)

cancel(job_id) async

Cancel a running job.

Source code in src/chapkit/core/scheduler.py
async def cancel(self, job_id: ULID) -> bool:
    """Cancel a running job."""
    async with self._lock:
        task = self._tasks.get(job_id)
        exists = job_id in self._records

    if not exists:
        raise KeyError("Job not found")

    if not task or task.done():
        return False

    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        pass

    return True

delete(job_id) async

Delete a job record.

Source code in src/chapkit/core/scheduler.py
async def delete(self, job_id: ULID) -> None:
    """Delete a job record."""
    async with self._lock:
        rec = self._records.get(job_id)
        task = self._tasks.get(job_id)

    if rec is None:
        raise KeyError("Job not found")

    if task and not task.done():
        task.cancel()

        try:
            await task
        except asyncio.CancelledError:
            pass

    async with self._lock:
        self._records.pop(job_id, None)
        self._tasks.pop(job_id, None)
        self._results.pop(job_id, None)

Types

types

Custom types for chapkit - SQLAlchemy and Pydantic types.

JsonSafe = Annotated[Any, PlainSerializer(_serialize_with_metadata, return_type=Any)] module-attribute

Pydantic type for JSON-safe serialization with graceful handling of non-serializable values.

ULIDType

Bases: TypeDecorator[ULID]

SQLAlchemy custom type for ULID stored as 26-character strings.

Source code in src/chapkit/core/types.py
class ULIDType(TypeDecorator[ULID]):
    """SQLAlchemy custom type persisting ULIDs as fixed-width 26-character strings."""

    impl = String(26)
    cache_ok = True

    def process_bind_param(self, value: ULID | str | None, dialect: Any) -> str | None:
        """Serialize a ULID (or ULID string) to its canonical 26-char form."""
        if value is None:
            return None
        # Round-trip strings through ULID so invalid input fails loudly
        # and valid input is normalized to canonical form.
        normalized = ULID.from_str(value) if isinstance(value, str) else value
        return str(normalized)

    def process_result_value(self, value: str | None, dialect: Any) -> ULID | None:
        """Deserialize a stored string back into a ULID instance."""
        return None if value is None else ULID.from_str(value)

process_bind_param(value, dialect)

Convert ULID to string for database storage.

Source code in src/chapkit/core/types.py
def process_bind_param(self, value: ULID | str | None, dialect: Any) -> str | None:
    """Convert ULID to string for database storage."""
    if value is None:
        return None
    if isinstance(value, str):
        return str(ULID.from_str(value))  # Validate and normalize
    return str(value)

process_result_value(value, dialect)

Convert string from database to ULID object.

Source code in src/chapkit/core/types.py
def process_result_value(self, value: str | None, dialect: Any) -> ULID | None:
    """Convert string from database to ULID object."""
    if value is None:
        return None
    return ULID.from_str(value)

Logging

logging

Structured logging configuration with request tracing support.

configure_logging()

Configure structlog and intercept standard library logging.

Source code in src/chapkit/core/logging.py
def configure_logging() -> None:
    """Configure structlog and intercept standard library logging.

    Reads LOG_FORMAT ("json" or console) and LOG_LEVEL from the environment,
    routes the root logger plus uvicorn/gunicorn loggers through a single
    structlog-formatted stdout handler.
    """
    fmt = os.getenv("LOG_FORMAT", "console").lower()
    level_name = os.getenv("LOG_LEVEL", "INFO").upper()
    min_level = getattr(logging, level_name, logging.INFO)

    # Processors common to both output formats.
    common: list[Processor] = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso", utc=True),
        structlog.processors.StackInfoRenderer(),
    ]

    # The tail of the formatter chain depends on the requested format.
    if fmt == "json":
        tail: list[Processor] = [
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ]
    else:
        tail = [
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            structlog.processors.ExceptionRenderer(),
            structlog.dev.ConsoleRenderer(colors=True),
        ]

    # Route structlog events through the standard library logging machinery.
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.stdlib.add_log_level,
            structlog.stdlib.add_logger_name,
            structlog.processors.TimeStamper(fmt="iso", utc=True),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.CallsiteParameterAdder(
                [
                    structlog.processors.CallsiteParameter.FILENAME,
                    structlog.processors.CallsiteParameter.LINENO,
                    structlog.processors.CallsiteParameter.FUNC_NAME,
                ]
            ),
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        wrapper_class=structlog.make_filtering_bound_logger(min_level),
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

    # Single stdout handler shared by every intercepted logger.
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(structlog.stdlib.ProcessorFormatter(processors=common + tail))

    root = logging.getLogger()
    root.handlers.clear()
    root.addHandler(stream_handler)
    root.setLevel(min_level)

    # Route uvicorn/gunicorn output through the same structlog formatter.
    for name in ("uvicorn", "uvicorn.access", "uvicorn.error", "gunicorn.access", "gunicorn.error"):
        server_logger = logging.getLogger(name)
        server_logger.handlers.clear()
        server_logger.addHandler(stream_handler)
        server_logger.setLevel(min_level)
        server_logger.propagate = False

get_logger(name=None)

Get a configured structlog logger instance.

Source code in src/chapkit/core/logging.py
def get_logger(name: str | None = None) -> Any:
    """Return a structlog logger, optionally bound to *name*."""
    return structlog.get_logger(name)

add_request_context(**context)

Add context variables that will be included in all log messages.

Source code in src/chapkit/core/logging.py
def add_request_context(**context: Any) -> None:
    """Bind context variables that will be included in all subsequent log messages in this context."""
    structlog.contextvars.bind_contextvars(**context)

clear_request_context(*keys)

Clear specific context variables.

Source code in src/chapkit/core/logging.py
def clear_request_context(*keys: str) -> None:
    """Unbind the named structlog context variables."""
    structlog.contextvars.unbind_contextvars(*keys)

reset_request_context()

Clear all context variables.

Source code in src/chapkit/core/logging.py
def reset_request_context() -> None:
    """Unbind every structlog context variable for the current context."""
    structlog.contextvars.clear_contextvars()

FastAPI Layer

FastAPI-specific components for building web services.

Service Builders

Service builder classes for composing FastAPI applications.

BaseServiceBuilder

BaseServiceBuilder

Base service builder providing core FastAPI functionality without module dependencies.

Source code in src/chapkit/core/api/service_builder.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
class BaseServiceBuilder:
    """Base service builder providing core FastAPI functionality without module dependencies."""

    def __init__(
        self,
        *,
        info: ServiceInfo,
        database_url: str = "sqlite+aiosqlite:///:memory:",
        include_error_handlers: bool = True,
        include_logging: bool = False,
    ) -> None:
        """Initialize base service builder with core options."""
        if info.description is None and info.summary is not None:
            # Preserve summary as description for FastAPI metadata if description missing
            self.info = info.model_copy(update={"description": info.summary})
        else:
            self.info = info
        self._title = self.info.display_name
        self._app_description = self.info.summary or self.info.description or ""
        self._version = self.info.version
        self._database_url = database_url
        self._database_instance: Database | None = None
        self._pool_size: int = 5
        self._max_overflow: int = 10
        self._pool_recycle: int = 3600
        self._pool_pre_ping: bool = True
        self._include_error_handlers = include_error_handlers
        self._include_logging = include_logging
        self._health_options: _HealthOptions | None = None
        self._system_options: _SystemOptions | None = None
        self._job_options: _JobOptions | None = None
        self._auth_options: _AuthOptions | None = None
        self._monitoring_options: _MonitoringOptions | None = None
        self._app_configs: List[App] = []
        self._custom_routers: List[APIRouter] = []
        self._dependency_overrides: Dict[DependencyOverride, DependencyOverride] = {}
        self._startup_hooks: List[LifecycleHook] = []
        self._shutdown_hooks: List[LifecycleHook] = []

    # --------------------------------------------------------------------- Fluent configuration

    def with_database(
        self,
        url_or_instance: str | Database | None = None,
        *,
        pool_size: int = 5,
        max_overflow: int = 10,
        pool_recycle: int = 3600,
        pool_pre_ping: bool = True,
    ) -> Self:
        """Configure database with URL string, Database instance, or default in-memory SQLite."""
        if isinstance(url_or_instance, Database):
            # Pre-configured instance provided
            self._database_instance = url_or_instance
            return self  # Skip pool configuration for instances
        elif isinstance(url_or_instance, str):
            # String URL provided
            self._database_url = url_or_instance
        elif url_or_instance is None:
            # Default: in-memory SQLite
            self._database_url = "sqlite+aiosqlite:///:memory:"
        else:
            raise TypeError(
                f"Expected str, Database, or None, got {type(url_or_instance).__name__}. "
                "Use .with_database() for default, .with_database('url') for custom URL, "
                "or .with_database(db_instance) for pre-configured database."
            )

        # Configure pool settings (only applies to URL-based databases)
        self._pool_size = pool_size
        self._max_overflow = max_overflow
        self._pool_recycle = pool_recycle
        self._pool_pre_ping = pool_pre_ping
        return self

    def with_landing_page(self) -> Self:
        """Enable landing page at root path."""
        return self.with_app(("chapkit.core.api", "apps/landing"))

    def with_logging(self, enabled: bool = True) -> Self:
        """Enable structured logging with request tracing."""
        self._include_logging = enabled
        return self

    def with_health(
        self,
        *,
        prefix: str = "/health",
        tags: List[str] | None = None,
        checks: dict[str, HealthCheck] | None = None,
        include_database_check: bool = True,
    ) -> Self:
        """Add health check endpoint with optional custom checks."""
        health_checks = checks or {}

        if include_database_check:
            health_checks["database"] = self._create_database_health_check()

        self._health_options = _HealthOptions(
            prefix=prefix,
            tags=list(tags) if tags is not None else ["Observability"],
            checks=health_checks,
        )
        return self

    def with_system(
        self,
        *,
        prefix: str = "/api/v1/system",
        tags: List[str] | None = None,
    ) -> Self:
        """Add system info endpoint."""
        self._system_options = _SystemOptions(
            prefix=prefix,
            tags=list(tags) if tags is not None else ["Service"],
        )
        return self

    def with_jobs(
        self,
        *,
        prefix: str = "/api/v1/jobs",
        tags: List[str] | None = None,
        max_concurrency: int | None = None,
    ) -> Self:
        """Add job scheduler endpoints."""
        self._job_options = _JobOptions(
            prefix=prefix,
            tags=list(tags) if tags is not None else ["Jobs"],
            max_concurrency=max_concurrency,
        )
        return self

    def with_auth(
        self,
        *,
        api_keys: List[str] | None = None,
        api_key_file: str | None = None,
        env_var: str = "CHAPKIT_API_KEYS",
        header_name: str = "X-API-Key",
        unauthenticated_paths: List[str] | None = None,
    ) -> Self:
        """Enable API key authentication."""
        keys: set[str] = set()
        auth_source: str = ""  # Track source for later logging

        # Priority 1: Direct list (examples/dev)
        if api_keys is not None:
            keys = set(api_keys)
            auth_source = "direct_keys"

        # Priority 2: File (Docker secrets)
        elif api_key_file is not None:
            keys = load_api_keys_from_file(api_key_file)
            auth_source = f"file:{api_key_file}"

        # Priority 3: Environment variable (default)
        else:
            keys = load_api_keys_from_env(env_var)
            if keys:
                auth_source = f"env:{env_var}"
            else:
                auth_source = f"env:{env_var}:empty"

        if not keys:
            raise ValueError("No API keys configured. Provide api_keys, api_key_file, or set environment variable.")

        # Default unauthenticated paths
        default_unauth = {"/docs", "/redoc", "/openapi.json", "/health", "/"}
        unauth_set = set(unauthenticated_paths) if unauthenticated_paths else default_unauth

        self._auth_options = _AuthOptions(
            api_keys=keys,
            header_name=header_name,
            unauthenticated_paths=unauth_set,
            source=auth_source,
        )
        return self

    def with_monitoring(
        self,
        *,
        prefix: str = "/metrics",
        tags: List[str] | None = None,
        service_name: str | None = None,
        enable_traces: bool = False,
    ) -> Self:
        """Enable OpenTelemetry monitoring with Prometheus endpoint and auto-instrumentation."""
        self._monitoring_options = _MonitoringOptions(
            prefix=prefix,
            tags=list(tags) if tags is not None else ["Observability"],
            service_name=service_name,
            enable_traces=enable_traces,
        )
        return self

    def with_app(self, path: str | Path | tuple[str, str], prefix: str | None = None) -> Self:
        """Register static app from filesystem path or package resource tuple."""
        app = AppLoader.load(path, prefix=prefix)
        self._app_configs.append(app)
        return self

    def with_apps(self, path: str | Path | tuple[str, str]) -> Self:
        """Auto-discover and register all apps in directory."""
        apps = AppLoader.discover(path)
        self._app_configs.extend(apps)
        return self

    def include_router(self, router: APIRouter) -> Self:
        """Include a custom router."""
        self._custom_routers.append(router)
        return self

    def override_dependency(self, dependency: DependencyOverride, override: DependencyOverride) -> Self:
        """Override a dependency for testing or customization."""
        self._dependency_overrides[dependency] = override
        return self

    def on_startup(self, hook: LifecycleHook) -> Self:
        """Register a startup hook."""
        self._startup_hooks.append(hook)
        return self

    def on_shutdown(self, hook: LifecycleHook) -> Self:
        """Register a shutdown hook."""
        self._shutdown_hooks.append(hook)
        return self

    # --------------------------------------------------------------------- Build mechanics

    def build(self) -> FastAPI:
        """Build and configure the FastAPI application.

        Assembly order is deliberate: validation first, then the FastAPI
        instance with its lifespan, then middleware, then feature routers
        (health/system/jobs/monitoring), then module and custom routers,
        then concrete route endpoints, and finally static-app mounts —
        mounts act as catch-alls, so they must come last.

        Returns:
            The fully configured FastAPI application.
        """
        self._validate_configuration()
        self._validate_module_configuration()  # Extension point for subclasses

        # The lifespan closure snapshots builder state at build time.
        lifespan = self._build_lifespan()
        app = FastAPI(
            title=self._title,
            description=self._app_description,
            version=self._version,
            lifespan=lifespan,
        )
        app.state.database_url = self._database_url

        # Override schema generation to clean up generic type names
        app.openapi = self._create_openapi_customizer(app)  # type: ignore[method-assign]

        if self._include_error_handlers:
            add_error_handlers(app)

        if self._include_logging:
            add_logging_middleware(app)

        if self._auth_options:
            app.add_middleware(
                APIKeyMiddleware,
                api_keys=self._auth_options.api_keys,
                header_name=self._auth_options.header_name,
                unauthenticated_paths=self._auth_options.unauthenticated_paths,
            )
            # Store auth_source for logging during startup
            # (structured logging is only configured inside the lifespan).
            app.state.auth_source = self._auth_options.source
            app.state.auth_key_count = len(self._auth_options.api_keys)

        if self._health_options:
            health_router = HealthRouter.create(
                prefix=self._health_options.prefix,
                tags=self._health_options.tags,
                checks=self._health_options.checks,
            )
            app.include_router(health_router)

        if self._system_options:
            system_router = SystemRouter.create(
                prefix=self._system_options.prefix,
                tags=self._system_options.tags,
            )
            app.include_router(system_router)

        if self._job_options:
            job_router = JobRouter.create(
                prefix=self._job_options.prefix,
                tags=self._job_options.tags,
                scheduler_factory=get_scheduler,
            )
            app.include_router(job_router)

        if self._monitoring_options:
            # Imported lazily so monitoring deps load only when enabled.
            from .monitoring import setup_monitoring

            metric_reader = setup_monitoring(
                app,
                service_name=self._monitoring_options.service_name,
                enable_traces=self._monitoring_options.enable_traces,
            )
            metrics_router = MetricsRouter.create(
                prefix=self._monitoring_options.prefix,
                tags=self._monitoring_options.tags,
                metric_reader=metric_reader,
            )
            app.include_router(metrics_router)

        # Extension point for module-specific routers
        self._register_module_routers(app)

        for router in self._custom_routers:
            app.include_router(router)

        # Install route endpoints BEFORE mounting apps (routes take precedence over mounts)
        self._install_info_endpoint(app, info=self.info)

        # Mount apps AFTER all routes (apps act as catch-all for unmatched paths)
        if self._app_configs:
            from fastapi.staticfiles import StaticFiles

            for app_config in self._app_configs:
                # html=True serves index.html for directory requests.
                static_files = StaticFiles(directory=str(app_config.directory), html=True)
                app.mount(app_config.prefix, static_files, name=f"app_{app_config.manifest.name}")
                logger.info(
                    "app.mounted",
                    name=app_config.manifest.name,
                    prefix=app_config.prefix,
                    directory=str(app_config.directory),
                    is_package=app_config.is_package,
                )

        # Initialize app manager for metadata queries (always, even if no apps)
        from .app import AppManager
        from .dependencies import set_app_manager

        app_manager = AppManager(self._app_configs)
        set_app_manager(app_manager)

        # Apply dependency overrides last so they win over defaults.
        for dependency, override in self._dependency_overrides.items():
            app.dependency_overrides[dependency] = override

        return app

    # --------------------------------------------------------------------- Extension points

    def _validate_module_configuration(self) -> None:
        """Extension point for module-specific validation (override in subclasses)."""
        pass

    def _register_module_routers(self, app: FastAPI) -> None:
        """Hook for subclasses to attach module-specific routers; no-op in the base builder."""

    # --------------------------------------------------------------------- Core helpers

    def _validate_configuration(self) -> None:
        """Validate core configuration."""
        # Validate health check names don't contain invalid characters
        if self._health_options:
            for name in self._health_options.checks.keys():
                if not name.replace("_", "").replace("-", "").isalnum():
                    raise ValueError(
                        f"Health check name '{name}' contains invalid characters. "
                        "Only alphanumeric characters, underscores, and hyphens are allowed."
                    )

        # Validate app configurations
        if self._app_configs:
            # Deduplicate apps with same prefix (last one wins)
            # This allows overriding apps, especially useful for root prefix "/"
            seen_prefixes: dict[str, int] = {}  # prefix -> last index
            for i, app in enumerate(self._app_configs):
                if app.prefix in seen_prefixes:
                    # Log warning about override
                    prev_idx = seen_prefixes[app.prefix]
                    prev_app = self._app_configs[prev_idx]
                    logger.warning(
                        "app.prefix.override",
                        prefix=app.prefix,
                        replaced_app=prev_app.manifest.name,
                        new_app=app.manifest.name,
                    )
                seen_prefixes[app.prefix] = i

            # Keep only the last app for each prefix
            self._app_configs = [self._app_configs[i] for i in sorted(set(seen_prefixes.values()))]

            # Validate that non-root prefixes don't have duplicates (shouldn't happen after dedup, but safety check)
            prefixes = [app.prefix for app in self._app_configs]
            if len(prefixes) != len(set(prefixes)):
                raise ValueError("Internal error: duplicate prefixes after deduplication")

    def _build_lifespan(self) -> LifespanFactory:
        """Build lifespan context manager for app startup/shutdown.

        Builder state is copied into locals so the returned closure is a
        snapshot: mutating the builder after build() does not affect a
        running application.
        """
        database_url = self._database_url
        database_instance = self._database_instance
        pool_size = self._pool_size
        max_overflow = self._max_overflow
        pool_recycle = self._pool_recycle
        pool_pre_ping = self._pool_pre_ping
        job_options = self._job_options
        include_logging = self._include_logging
        startup_hooks = list(self._startup_hooks)
        shutdown_hooks = list(self._shutdown_hooks)

        @asynccontextmanager
        async def lifespan(app: FastAPI) -> AsyncIterator[None]:
            # Configure logging if enabled
            if include_logging:
                configure_logging()

            # Use injected database or create new one from URL
            if database_instance is not None:
                database = database_instance
                # The caller owns an injected instance, so we must not dispose it.
                should_manage_lifecycle = False
            else:
                # Create appropriate database type based on URL
                if "sqlite" in database_url.lower():
                    database = SqliteDatabase(
                        database_url,
                        pool_size=pool_size,
                        max_overflow=max_overflow,
                        pool_recycle=pool_recycle,
                        pool_pre_ping=pool_pre_ping,
                    )
                else:
                    database = Database(
                        database_url,
                        pool_size=pool_size,
                        max_overflow=max_overflow,
                        pool_recycle=pool_recycle,
                        pool_pre_ping=pool_pre_ping,
                    )
                should_manage_lifecycle = True

            # Always initialize database (safe to call multiple times)
            await database.init()

            set_database(database)
            app.state.database = database

            # Initialize scheduler if jobs are enabled
            if job_options is not None:
                # Imported lazily so the scheduler loads only when jobs are configured.
                from chapkit.core.scheduler import AIOJobScheduler

                scheduler = AIOJobScheduler(max_concurrency=job_options.max_concurrency)
                set_scheduler(scheduler)
                app.state.scheduler = scheduler

            # Log auth configuration after logging is configured
            # (build() stashed the source string on app.state).
            if hasattr(app.state, "auth_source"):
                auth_source = app.state.auth_source
                key_count = app.state.auth_key_count

                if auth_source == "direct_keys":
                    logger.warning(
                        "auth.direct_keys",
                        message="Using direct API keys - not recommended for production",
                        count=key_count,
                    )
                elif auth_source.startswith("file:"):
                    # Source string format: "file:<path>"
                    file_path = auth_source.split(":", 1)[1]
                    logger.info("auth.loaded_from_file", file=file_path, count=key_count)
                elif auth_source.startswith("env:"):
                    # Source string format: "env:<var>" or "env:<var>:empty"
                    parts = auth_source.split(":", 2)
                    env_var = parts[1]
                    if len(parts) > 2 and parts[2] == "empty":
                        logger.warning(
                            "auth.no_keys",
                            message=f"No API keys found in {env_var}. Service will reject all requests.",
                        )
                    else:
                        logger.info("auth.loaded_from_env", env_var=env_var, count=key_count)

            for hook in startup_hooks:
                await hook(app)
            try:
                yield
            finally:
                # Shutdown hooks always run, even if startup-after-yield errored.
                for hook in shutdown_hooks:
                    await hook(app)
                app.state.database = None

                # Dispose database only if we created it
                if should_manage_lifecycle:
                    await database.dispose()

        return lifespan

    @staticmethod
    def _create_database_health_check() -> HealthCheck:
        """Create database connectivity health check."""

        async def check_database() -> tuple[HealthState, str | None]:
            # Probe connectivity with a trivial round-trip query; any failure
            # (no database set, connection refused, etc.) maps to UNHEALTHY.
            try:
                async with get_database().session() as session:
                    await session.execute(text("SELECT 1"))
            except Exception as exc:
                return (HealthState.UNHEALTHY, f"Database connection failed: {str(exc)}")
            return (HealthState.HEALTHY, None)

        return check_database

    @staticmethod
    def _create_openapi_customizer(app: FastAPI) -> Callable[[], dict[str, Any]]:
        """Create OpenAPI schema customizer that cleans up generic type names."""
        # Matches the "[...]" generic-parameter suffix in schema names/titles/refs.
        generic_params = re.compile(r"\[.*?\]")

        def strip_generics(name: str) -> str:
            return generic_params.sub("", name)

        def custom_openapi() -> dict[str, Any]:
            # Serve the cached schema if it was already generated.
            if app.openapi_schema:
                return app.openapi_schema

            from fastapi.openapi.utils import get_openapi

            schema = get_openapi(
                title=app.title,
                version=app.version,
                description=app.description,
                routes=app.routes,
            )

            components = schema.get("components", {})
            if "schemas" in components:
                # Rebuild the schemas mapping under cleaned names, cleaning
                # each schema's "title" in place where present.
                renamed: dict[str, Any] = {}
                for raw_name, definition in components["schemas"].items():
                    if isinstance(definition, dict) and "title" in definition:
                        definition["title"] = strip_generics(definition["title"])
                    renamed[strip_generics(raw_name)] = definition
                components["schemas"] = renamed

                # Walk the whole document and rewrite $ref pointers in place.
                stack: list[Any] = [schema]
                while stack:
                    node = stack.pop()
                    if isinstance(node, dict):
                        if "$ref" in node:
                            node["$ref"] = strip_generics(node["$ref"])
                        stack.extend(node.values())
                    elif isinstance(node, list):
                        stack.extend(node)

            app.openapi_schema = schema
            return app.openapi_schema

        return custom_openapi

    @staticmethod
    def _install_info_endpoint(app: FastAPI, *, info: ServiceInfo) -> None:
        """Install service info endpoint."""
        # Use the runtime type as response_model so subclass fields of
        # ServiceInfo survive serialization.
        concrete_info_type = type(info)

        @app.get("/api/v1/info", tags=["Service"], include_in_schema=True, response_model=concrete_info_type)
        async def get_info() -> ServiceInfo:
            return info

    # --------------------------------------------------------------------- Convenience

    @classmethod
    def create(cls, *, info: ServiceInfo, **kwargs: Any) -> FastAPI:
        """Create and build a FastAPI application in one call."""
        builder = cls(info=info, **kwargs)
        return builder.build()

__init__(*, info, database_url='sqlite+aiosqlite:///:memory:', include_error_handlers=True, include_logging=False)

Initialize base service builder with core options.

Source code in src/chapkit/core/api/service_builder.py
def __init__(
    self,
    *,
    info: ServiceInfo,
    database_url: str = "sqlite+aiosqlite:///:memory:",
    include_error_handlers: bool = True,
    include_logging: bool = False,
) -> None:
    """Initialize base service builder with core options."""
    if info.description is None and info.summary is not None:
        # Preserve summary as description for FastAPI metadata if description missing
        self.info = info.model_copy(update={"description": info.summary})
    else:
        self.info = info
    self._title = self.info.display_name
    self._app_description = self.info.summary or self.info.description or ""
    self._version = self.info.version
    self._database_url = database_url
    self._database_instance: Database | None = None
    self._pool_size: int = 5
    self._max_overflow: int = 10
    self._pool_recycle: int = 3600
    self._pool_pre_ping: bool = True
    self._include_error_handlers = include_error_handlers
    self._include_logging = include_logging
    self._health_options: _HealthOptions | None = None
    self._system_options: _SystemOptions | None = None
    self._job_options: _JobOptions | None = None
    self._auth_options: _AuthOptions | None = None
    self._monitoring_options: _MonitoringOptions | None = None
    self._app_configs: List[App] = []
    self._custom_routers: List[APIRouter] = []
    self._dependency_overrides: Dict[DependencyOverride, DependencyOverride] = {}
    self._startup_hooks: List[LifecycleHook] = []
    self._shutdown_hooks: List[LifecycleHook] = []

with_database(url_or_instance=None, *, pool_size=5, max_overflow=10, pool_recycle=3600, pool_pre_ping=True)

Configure database with URL string, Database instance, or default in-memory SQLite.

Source code in src/chapkit/core/api/service_builder.py
def with_database(
    self,
    url_or_instance: str | Database | None = None,
    *,
    pool_size: int = 5,
    max_overflow: int = 10,
    pool_recycle: int = 3600,
    pool_pre_ping: bool = True,
) -> Self:
    """Configure database with URL string, Database instance, or default in-memory SQLite."""
    if isinstance(url_or_instance, Database):
        # Pre-configured instance provided
        self._database_instance = url_or_instance
        return self  # Skip pool configuration for instances
    elif isinstance(url_or_instance, str):
        # String URL provided
        self._database_url = url_or_instance
    elif url_or_instance is None:
        # Default: in-memory SQLite
        self._database_url = "sqlite+aiosqlite:///:memory:"
    else:
        raise TypeError(
            f"Expected str, Database, or None, got {type(url_or_instance).__name__}. "
            "Use .with_database() for default, .with_database('url') for custom URL, "
            "or .with_database(db_instance) for pre-configured database."
        )

    # Configure pool settings (only applies to URL-based databases)
    self._pool_size = pool_size
    self._max_overflow = max_overflow
    self._pool_recycle = pool_recycle
    self._pool_pre_ping = pool_pre_ping
    return self

with_landing_page()

Enable landing page at root path.

Source code in src/chapkit/core/api/service_builder.py
def with_landing_page(self) -> Self:
    """Enable landing page at root path."""
    return self.with_app(("chapkit.core.api", "apps/landing"))

with_logging(enabled=True)

Enable structured logging with request tracing.

Source code in src/chapkit/core/api/service_builder.py
def with_logging(self, enabled: bool = True) -> Self:
    """Enable structured logging with request tracing."""
    self._include_logging = enabled
    return self

with_health(*, prefix='/health', tags=None, checks=None, include_database_check=True)

Add health check endpoint with optional custom checks.

Source code in src/chapkit/core/api/service_builder.py
def with_health(
    self,
    *,
    prefix: str = "/health",
    tags: List[str] | None = None,
    checks: dict[str, HealthCheck] | None = None,
    include_database_check: bool = True,
) -> Self:
    """Add health check endpoint with optional custom checks."""
    health_checks = checks or {}

    if include_database_check:
        health_checks["database"] = self._create_database_health_check()

    self._health_options = _HealthOptions(
        prefix=prefix,
        tags=list(tags) if tags is not None else ["Observability"],
        checks=health_checks,
    )
    return self

with_system(*, prefix='/api/v1/system', tags=None)

Add system info endpoint.

Source code in src/chapkit/core/api/service_builder.py
def with_system(
    self,
    *,
    prefix: str = "/api/v1/system",
    tags: List[str] | None = None,
) -> Self:
    """Add system info endpoint."""
    self._system_options = _SystemOptions(
        prefix=prefix,
        tags=list(tags) if tags is not None else ["Service"],
    )
    return self

with_jobs(*, prefix='/api/v1/jobs', tags=None, max_concurrency=None)

Add job scheduler endpoints.

Source code in src/chapkit/core/api/service_builder.py
def with_jobs(
    self,
    *,
    prefix: str = "/api/v1/jobs",
    tags: List[str] | None = None,
    max_concurrency: int | None = None,
) -> Self:
    """Add job scheduler endpoints."""
    self._job_options = _JobOptions(
        prefix=prefix,
        tags=list(tags) if tags is not None else ["Jobs"],
        max_concurrency=max_concurrency,
    )
    return self

with_auth(*, api_keys=None, api_key_file=None, env_var='CHAPKIT_API_KEYS', header_name='X-API-Key', unauthenticated_paths=None)

Enable API key authentication.

Source code in src/chapkit/core/api/service_builder.py
def with_auth(
    self,
    *,
    api_keys: List[str] | None = None,
    api_key_file: str | None = None,
    env_var: str = "CHAPKIT_API_KEYS",
    header_name: str = "X-API-Key",
    unauthenticated_paths: List[str] | None = None,
) -> Self:
    """Enable API key authentication."""
    keys: set[str] = set()
    auth_source: str = ""  # Track source for later logging

    # Priority 1: Direct list (examples/dev)
    if api_keys is not None:
        keys = set(api_keys)
        auth_source = "direct_keys"

    # Priority 2: File (Docker secrets)
    elif api_key_file is not None:
        keys = load_api_keys_from_file(api_key_file)
        auth_source = f"file:{api_key_file}"

    # Priority 3: Environment variable (default)
    else:
        keys = load_api_keys_from_env(env_var)
        if keys:
            auth_source = f"env:{env_var}"
        else:
            auth_source = f"env:{env_var}:empty"

    if not keys:
        raise ValueError("No API keys configured. Provide api_keys, api_key_file, or set environment variable.")

    # Default unauthenticated paths
    default_unauth = {"/docs", "/redoc", "/openapi.json", "/health", "/"}
    unauth_set = set(unauthenticated_paths) if unauthenticated_paths else default_unauth

    self._auth_options = _AuthOptions(
        api_keys=keys,
        header_name=header_name,
        unauthenticated_paths=unauth_set,
        source=auth_source,
    )
    return self

with_monitoring(*, prefix='/metrics', tags=None, service_name=None, enable_traces=False)

Enable OpenTelemetry monitoring with Prometheus endpoint and auto-instrumentation.

Source code in src/chapkit/core/api/service_builder.py
def with_monitoring(
    self,
    *,
    prefix: str = "/metrics",
    tags: List[str] | None = None,
    service_name: str | None = None,
    enable_traces: bool = False,
) -> Self:
    """Enable OpenTelemetry monitoring with Prometheus endpoint and auto-instrumentation."""
    self._monitoring_options = _MonitoringOptions(
        prefix=prefix,
        tags=list(tags) if tags is not None else ["Observability"],
        service_name=service_name,
        enable_traces=enable_traces,
    )
    return self

with_app(path, prefix=None)

Register static app from filesystem path or package resource tuple.

Source code in src/chapkit/core/api/service_builder.py
def with_app(self, path: str | Path | tuple[str, str], prefix: str | None = None) -> Self:
    """Register static app from filesystem path or package resource tuple."""
    app = AppLoader.load(path, prefix=prefix)
    self._app_configs.append(app)
    return self

with_apps(path)

Auto-discover and register all apps in directory.

Source code in src/chapkit/core/api/service_builder.py
def with_apps(self, path: str | Path | tuple[str, str]) -> Self:
    """Auto-discover and register all apps in directory."""
    apps = AppLoader.discover(path)
    self._app_configs.extend(apps)
    return self

include_router(router)

Include a custom router.

Source code in src/chapkit/core/api/service_builder.py
def include_router(self, router: APIRouter) -> Self:
    """Include a custom router."""
    self._custom_routers.append(router)
    return self

override_dependency(dependency, override)

Override a dependency for testing or customization.

Source code in src/chapkit/core/api/service_builder.py
def override_dependency(self, dependency: DependencyOverride, override: DependencyOverride) -> Self:
    """Override a dependency for testing or customization."""
    self._dependency_overrides[dependency] = override
    return self

on_startup(hook)

Register a startup hook.

Source code in src/chapkit/core/api/service_builder.py
def on_startup(self, hook: LifecycleHook) -> Self:
    """Register a startup hook."""
    self._startup_hooks.append(hook)
    return self

on_shutdown(hook)

Register a shutdown hook.

Source code in src/chapkit/core/api/service_builder.py
def on_shutdown(self, hook: LifecycleHook) -> Self:
    """Register a shutdown hook."""
    self._shutdown_hooks.append(hook)
    return self

build()

Build and configure the FastAPI application.

Source code in src/chapkit/core/api/service_builder.py
def build(self) -> FastAPI:
    """Build and configure the FastAPI application."""
    self._validate_configuration()
    self._validate_module_configuration()  # Extension point for subclasses

    lifespan = self._build_lifespan()
    app = FastAPI(
        title=self._title,
        description=self._app_description,
        version=self._version,
        lifespan=lifespan,
    )
    app.state.database_url = self._database_url

    # Override schema generation to clean up generic type names
    app.openapi = self._create_openapi_customizer(app)  # type: ignore[method-assign]

    if self._include_error_handlers:
        add_error_handlers(app)

    if self._include_logging:
        add_logging_middleware(app)

    if self._auth_options:
        app.add_middleware(
            APIKeyMiddleware,
            api_keys=self._auth_options.api_keys,
            header_name=self._auth_options.header_name,
            unauthenticated_paths=self._auth_options.unauthenticated_paths,
        )
        # Store auth_source for logging during startup
        app.state.auth_source = self._auth_options.source
        app.state.auth_key_count = len(self._auth_options.api_keys)

    if self._health_options:
        health_router = HealthRouter.create(
            prefix=self._health_options.prefix,
            tags=self._health_options.tags,
            checks=self._health_options.checks,
        )
        app.include_router(health_router)

    if self._system_options:
        system_router = SystemRouter.create(
            prefix=self._system_options.prefix,
            tags=self._system_options.tags,
        )
        app.include_router(system_router)

    if self._job_options:
        job_router = JobRouter.create(
            prefix=self._job_options.prefix,
            tags=self._job_options.tags,
            scheduler_factory=get_scheduler,
        )
        app.include_router(job_router)

    if self._monitoring_options:
        from .monitoring import setup_monitoring

        metric_reader = setup_monitoring(
            app,
            service_name=self._monitoring_options.service_name,
            enable_traces=self._monitoring_options.enable_traces,
        )
        metrics_router = MetricsRouter.create(
            prefix=self._monitoring_options.prefix,
            tags=self._monitoring_options.tags,
            metric_reader=metric_reader,
        )
        app.include_router(metrics_router)

    # Extension point for module-specific routers
    self._register_module_routers(app)

    for router in self._custom_routers:
        app.include_router(router)

    # Install route endpoints BEFORE mounting apps (routes take precedence over mounts)
    self._install_info_endpoint(app, info=self.info)

    # Mount apps AFTER all routes (apps act as catch-all for unmatched paths)
    if self._app_configs:
        from fastapi.staticfiles import StaticFiles

        for app_config in self._app_configs:
            static_files = StaticFiles(directory=str(app_config.directory), html=True)
            app.mount(app_config.prefix, static_files, name=f"app_{app_config.manifest.name}")
            logger.info(
                "app.mounted",
                name=app_config.manifest.name,
                prefix=app_config.prefix,
                directory=str(app_config.directory),
                is_package=app_config.is_package,
            )

    # Initialize app manager for metadata queries (always, even if no apps)
    from .app import AppManager
    from .dependencies import set_app_manager

    app_manager = AppManager(self._app_configs)
    set_app_manager(app_manager)

    for dependency, override in self._dependency_overrides.items():
        app.dependency_overrides[dependency] = override

    return app

create(*, info, **kwargs) classmethod

Create and build a FastAPI application in one call.

Source code in src/chapkit/core/api/service_builder.py
@classmethod
def create(cls, *, info: ServiceInfo, **kwargs: Any) -> FastAPI:
    """Create and build a FastAPI application in one call."""
    return cls(info=info, **kwargs).build()

ServiceInfo

ServiceInfo

Bases: BaseModel

Service metadata for FastAPI application.

Source code in src/chapkit/core/api/service_builder.py
class ServiceInfo(BaseModel):
    """Service metadata for FastAPI application."""

    display_name: str
    version: str = "1.0.0"
    summary: str | None = None
    description: str | None = None
    contact: dict[str, str] | None = None
    license_info: dict[str, str] | None = None

    model_config = ConfigDict(extra="forbid")

Routers

Base router classes and generic routers.

Router

Router

Bases: ABC

Base class for FastAPI routers.

Source code in src/chapkit/core/api/router.py
class Router(ABC):
    """Base class for FastAPI routers."""

    default_response_model_exclude_none: bool = False

    def __init__(self, prefix: str, tags: Sequence[str], **kwargs: Any) -> None:
        """Initialize router with prefix and tags."""
        self.router = APIRouter(prefix=prefix, tags=list(tags), **kwargs)
        self._register_routes()

    @classmethod
    def create(cls, prefix: str, tags: Sequence[str], **kwargs: Any) -> APIRouter:
        """Create a router instance and return the FastAPI router."""
        return cls(prefix=prefix, tags=tags, **kwargs).router

    @abstractmethod
    def _register_routes(self) -> None:
        """Register routes for this router."""
        ...

__init__(prefix, tags, **kwargs)

Initialize router with prefix and tags.

Source code in src/chapkit/core/api/router.py
def __init__(self, prefix: str, tags: Sequence[str], **kwargs: Any) -> None:
    """Initialize router with prefix and tags."""
    self.router = APIRouter(prefix=prefix, tags=list(tags), **kwargs)
    self._register_routes()

create(prefix, tags, **kwargs) classmethod

Create a router instance and return the FastAPI router.

Source code in src/chapkit/core/api/router.py
@classmethod
def create(cls, prefix: str, tags: Sequence[str], **kwargs: Any) -> APIRouter:
    """Create a router instance and return the FastAPI router."""
    return cls(prefix=prefix, tags=tags, **kwargs).router

CrudRouter

CrudRouter

Bases: Router

Router base class for standard REST CRUD operations.

Source code in src/chapkit/core/api/crud.py
class CrudRouter[InSchemaT: BaseModel, OutSchemaT: BaseModel](Router):
    """Router base class for standard REST CRUD operations."""

    def __init__(
        self,
        prefix: str,
        tags: list[str],
        entity_in_type: type[InSchemaT],
        entity_out_type: type[OutSchemaT],
        manager_factory: ManagerFactory[InSchemaT, OutSchemaT],
        *,
        permissions: CrudPermissions | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize CRUD router with entity types and manager factory."""
        self.manager_factory = manager_factory
        self.entity_in_type = entity_in_type
        self.entity_out_type = entity_out_type
        self._permissions = permissions or CrudPermissions()
        super().__init__(prefix=prefix, tags=tags, **kwargs)

    def _register_routes(self) -> None:
        """Register CRUD routes based on permissions."""
        manager_dependency, manager_annotation = self._manager_dependency()
        perms = self._permissions
        if perms.create:
            self._register_create_route(manager_dependency, manager_annotation)
        if perms.read:
            self._register_find_all_route(manager_dependency, manager_annotation)
            self._register_find_by_id_route(manager_dependency, manager_annotation)
            self._register_schema_route()
        if perms.update:
            self._register_update_route(manager_dependency, manager_annotation)
        if perms.delete:
            self._register_delete_route(manager_dependency, manager_annotation)

    def register_entity_operation(
        self,
        name: str,
        handler: Callable[..., Any],
        *,
        http_method: str = "GET",
        response_model: type[Any] | None = None,
        status_code: int | None = None,
        summary: str | None = None,
        description: str | None = None,
    ) -> None:
        """Register a custom entity operation with $ prefix.

        Entity operations are automatically inserted before generic {entity_id} routes
        to ensure proper route matching (e.g., /{entity_id}/$validate should match
        before /{entity_id}).
        """
        route = f"/{{entity_id}}/${name}"
        route_kwargs: dict[str, Any] = {}

        if response_model is not None:
            route_kwargs["response_model"] = response_model
        if status_code is not None:
            route_kwargs["status_code"] = status_code
        if summary is not None:
            route_kwargs["summary"] = summary
        if description is not None:
            route_kwargs["description"] = description

        # Register the route with the appropriate HTTP method
        http_method_lower = http_method.lower()
        if http_method_lower == "get":
            self.router.get(route, **route_kwargs)(handler)
        elif http_method_lower == "post":
            self.router.post(route, **route_kwargs)(handler)
        elif http_method_lower == "put":
            self.router.put(route, **route_kwargs)(handler)
        elif http_method_lower == "patch":
            self.router.patch(route, **route_kwargs)(handler)
        elif http_method_lower == "delete":
            self.router.delete(route, **route_kwargs)(handler)
        else:
            raise ValueError(f"Unsupported HTTP method: {http_method}")

        # Move the just-added route to before generic parametric routes
        # Entity operations like /{entity_id}/$validate should match before /{entity_id}
        if len(self.router.routes) > 1:
            new_route = self.router.routes.pop()
            insert_index = self._find_generic_parametric_route_index()
            self.router.routes.insert(insert_index, new_route)

    def register_collection_operation(
        self,
        name: str,
        handler: Callable[..., Any],
        *,
        http_method: str = "GET",
        response_model: type[Any] | None = None,
        status_code: int | None = None,
        summary: str | None = None,
        description: str | None = None,
    ) -> None:
        """Register a custom collection operation with $ prefix.

        Collection operations are automatically inserted before parametric {entity_id} routes
        to ensure proper route matching (e.g., /$stats should match before /{entity_id}).
        """
        route = f"/${name}"
        route_kwargs: dict[str, Any] = {}

        if response_model is not None:
            route_kwargs["response_model"] = response_model
        if status_code is not None:
            route_kwargs["status_code"] = status_code
        if summary is not None:
            route_kwargs["summary"] = summary
        if description is not None:
            route_kwargs["description"] = description

        # Register the route with the appropriate HTTP method
        http_method_lower = http_method.lower()
        if http_method_lower == "get":
            self.router.get(route, **route_kwargs)(handler)
        elif http_method_lower == "post":
            self.router.post(route, **route_kwargs)(handler)
        elif http_method_lower == "put":
            self.router.put(route, **route_kwargs)(handler)
        elif http_method_lower == "patch":
            self.router.patch(route, **route_kwargs)(handler)
        elif http_method_lower == "delete":
            self.router.delete(route, **route_kwargs)(handler)
        else:
            raise ValueError(f"Unsupported HTTP method: {http_method}")

        # Move the just-added route to before parametric routes
        # FastAPI appends to routes list, so the last route is the one we just added
        if len(self.router.routes) > 1:
            new_route = self.router.routes.pop()  # Remove the route we just added
            # Find the first parametric route and insert before it
            insert_index = self._find_parametric_route_index()
            self.router.routes.insert(insert_index, new_route)

    # Route registration helpers --------------------------------------

    def _register_create_route(self, manager_dependency: Any, manager_annotation: Any) -> None:
        entity_in_annotation: Any = self.entity_in_type
        entity_out_annotation: Any = self.entity_out_type
        router_prefix = self.router.prefix

        @self.router.post("", status_code=status.HTTP_201_CREATED, response_model=entity_out_annotation)
        async def create(
            entity_in: InSchemaT,
            request: Request,
            response: Response,
            manager: Manager[InSchemaT, OutSchemaT, ULID] = manager_dependency,
        ) -> OutSchemaT:
            from .utilities import build_location_url

            created_entity = await manager.save(entity_in)
            entity_id = getattr(created_entity, "id")
            response.headers["Location"] = build_location_url(request, f"{router_prefix}/{entity_id}")
            return created_entity

        self._annotate_manager(create, manager_annotation)
        create.__annotations__["entity_in"] = entity_in_annotation
        create.__annotations__["return"] = entity_out_annotation

    def _register_find_all_route(self, manager_dependency: Any, manager_annotation: Any) -> None:
        entity_out_annotation: Any = self.entity_out_type
        collection_response_model: Any = list[entity_out_annotation] | PaginatedResponse[entity_out_annotation]

        @self.router.get("", response_model=collection_response_model)
        async def find_all(
            page: int | None = None,
            size: int | None = None,
            manager: Manager[InSchemaT, OutSchemaT, ULID] = manager_dependency,
        ) -> list[OutSchemaT] | PaginatedResponse[OutSchemaT]:
            from .pagination import create_paginated_response

            # Pagination is opt-in: both page and size must be provided
            if page is not None and size is not None:
                items, total = await manager.find_paginated(page, size)
                return create_paginated_response(items, total, page, size)
            return await manager.find_all()

        self._annotate_manager(find_all, manager_annotation)
        find_all.__annotations__["return"] = list[entity_out_annotation] | PaginatedResponse[entity_out_annotation]

    def _register_find_by_id_route(self, manager_dependency: Any, manager_annotation: Any) -> None:
        entity_out_annotation: Any = self.entity_out_type
        router_prefix = self.router.prefix

        @self.router.get("/{entity_id}", response_model=entity_out_annotation)
        async def find_by_id(
            entity_id: str,
            manager: Manager[InSchemaT, OutSchemaT, ULID] = manager_dependency,
        ) -> OutSchemaT:
            from chapkit.core.exceptions import NotFoundError

            ulid_id = self._parse_ulid(entity_id)
            entity = await manager.find_by_id(ulid_id)
            if entity is None:
                raise NotFoundError(
                    f"Entity with id {entity_id} not found",
                    instance=f"{router_prefix}/{entity_id}",
                )
            return entity

        self._annotate_manager(find_by_id, manager_annotation)
        find_by_id.__annotations__["return"] = entity_out_annotation

    def _register_update_route(self, manager_dependency: Any, manager_annotation: Any) -> None:
        entity_in_type = self.entity_in_type
        entity_in_annotation: Any = entity_in_type
        entity_out_annotation: Any = self.entity_out_type
        router_prefix = self.router.prefix

        @self.router.put("/{entity_id}", response_model=entity_out_annotation)
        async def update(
            entity_id: str,
            entity_in: InSchemaT,
            manager: Manager[InSchemaT, OutSchemaT, ULID] = manager_dependency,
        ) -> OutSchemaT:
            from chapkit.core.exceptions import NotFoundError

            ulid_id = self._parse_ulid(entity_id)
            if not await manager.exists_by_id(ulid_id):
                raise NotFoundError(
                    f"Entity with id {entity_id} not found",
                    instance=f"{router_prefix}/{entity_id}",
                )
            entity_dict = entity_in.model_dump(exclude_unset=True)
            entity_dict["id"] = ulid_id
            entity_with_id = entity_in_type.model_validate(entity_dict)
            return await manager.save(entity_with_id)

        self._annotate_manager(update, manager_annotation)
        update.__annotations__["entity_in"] = entity_in_annotation
        update.__annotations__["return"] = entity_out_annotation

    def _register_delete_route(self, manager_dependency: Any, manager_annotation: Any) -> None:
        router_prefix = self.router.prefix

        @self.router.delete("/{entity_id}", status_code=status.HTTP_204_NO_CONTENT)
        async def delete_by_id(
            entity_id: str,
            manager: Manager[InSchemaT, OutSchemaT, ULID] = manager_dependency,
        ) -> None:
            from chapkit.core.exceptions import NotFoundError

            ulid_id = self._parse_ulid(entity_id)
            if not await manager.exists_by_id(ulid_id):
                raise NotFoundError(
                    f"Entity with id {entity_id} not found",
                    instance=f"{router_prefix}/{entity_id}",
                )
            await manager.delete_by_id(ulid_id)

        self._annotate_manager(delete_by_id, manager_annotation)

    def _register_schema_route(self) -> None:
        """Register JSON schema endpoint for the entity output type."""
        entity_out_type = self.entity_out_type

        async def get_schema() -> dict[str, Any]:
            return entity_out_type.model_json_schema()

        self.register_collection_operation(
            name="schema",
            handler=get_schema,
            http_method="GET",
            response_model=dict[str, Any],
        )

    # Helper utilities -------------------------------------------------

    def _manager_dependency(self) -> tuple[Any, Any]:
        manager_dependency = Depends(self.manager_factory)
        manager_annotation: Any = Manager[Any, Any, ULID]
        return manager_dependency, manager_annotation

    def _annotate_manager(self, endpoint: Any, manager_annotation: Any) -> None:
        endpoint.__annotations__["manager"] = manager_annotation

    def _parse_ulid(self, entity_id: str) -> ULID:
        from chapkit.core.exceptions import InvalidULIDError

        try:
            return ULID.from_str(entity_id)
        except ValueError as e:
            raise InvalidULIDError(
                f"Invalid ULID format: {entity_id}",
                instance=f"{self.router.prefix}/{entity_id}",
            ) from e

    def _find_parametric_route_index(self) -> int:
        """Find the index of the first parametric route containing {entity_id}.

        Returns the index where collection operations should be inserted to ensure
        they're matched before parametric routes.
        """
        for i, route in enumerate(self.router.routes):
            route_path = getattr(route, "path", "")
            if "{entity_id}" in route_path:
                return i
        # If no parametric route found, append at the end
        return len(self.router.routes)

    def _find_generic_parametric_route_index(self) -> int:
        """Find the index of the first generic parametric route (/{entity_id} without $).

        Returns the index where entity operations should be inserted to ensure
        they're matched before generic routes like GET/PUT/DELETE /{entity_id}.
        """
        for i, route in enumerate(self.router.routes):
            route_path = getattr(route, "path", "")
            # Match routes like /{entity_id} but not /{entity_id}/$operation
            if "{entity_id}" in route_path and "/$" not in route_path:
                return i
        # If no generic parametric route found, append at the end
        return len(self.router.routes)

__init__(prefix, tags, entity_in_type, entity_out_type, manager_factory, *, permissions=None, **kwargs)

Initialize CRUD router with entity types and manager factory.

Source code in src/chapkit/core/api/crud.py
def __init__(
    self,
    prefix: str,
    tags: list[str],
    entity_in_type: type[InSchemaT],
    entity_out_type: type[OutSchemaT],
    manager_factory: ManagerFactory[InSchemaT, OutSchemaT],
    *,
    permissions: CrudPermissions | None = None,
    **kwargs: Any,
) -> None:
    """Initialize CRUD router with entity types and manager factory."""
    # Attributes must be set before super().__init__, which calls
    # _register_routes() and reads them.
    self.manager_factory = manager_factory
    self.entity_in_type = entity_in_type
    self.entity_out_type = entity_out_type
    # Default permissions enable all CRUD operations.
    self._permissions = permissions or CrudPermissions()
    super().__init__(prefix=prefix, tags=tags, **kwargs)

register_entity_operation(name, handler, *, http_method='GET', response_model=None, status_code=None, summary=None, description=None)

Register a custom entity operation with $ prefix.

Entity operations are automatically inserted before generic {entity_id} routes to ensure proper route matching (e.g., /{entity_id}/$validate should match before /{entity_id}).

Source code in src/chapkit/core/api/crud.py
def register_entity_operation(
    self,
    name: str,
    handler: Callable[..., Any],
    *,
    http_method: str = "GET",
    response_model: type[Any] | None = None,
    status_code: int | None = None,
    summary: str | None = None,
    description: str | None = None,
) -> None:
    """Register a custom entity operation with $ prefix.

    The new route is re-inserted ahead of generic ``/{entity_id}`` routes so
    that e.g. ``/{entity_id}/$validate`` wins route matching over
    ``/{entity_id}``.
    """
    path = f"/{{entity_id}}/${name}"

    # Collect only the decorator options the caller actually supplied.
    extras: dict[str, Any] = {}
    for key, value in (
        ("response_model", response_model),
        ("status_code", status_code),
        ("summary", summary),
        ("description", description),
    ):
        if value is not None:
            extras[key] = value

    # Dispatch to the matching APIRouter decorator for the HTTP verb.
    decorators: dict[str, Callable[..., Any]] = {
        "get": self.router.get,
        "post": self.router.post,
        "put": self.router.put,
        "patch": self.router.patch,
        "delete": self.router.delete,
    }
    decorator = decorators.get(http_method.lower())
    if decorator is None:
        raise ValueError(f"Unsupported HTTP method: {http_method}")
    decorator(path, **extras)(handler)

    # FastAPI appends new routes; hoist ours above the generic parametric ones.
    if len(self.router.routes) > 1:
        added = self.router.routes.pop()
        self.router.routes.insert(self._find_generic_parametric_route_index(), added)

register_collection_operation(name, handler, *, http_method='GET', response_model=None, status_code=None, summary=None, description=None)

Register a custom collection operation with $ prefix.

Collection operations are automatically inserted before parametric {entity_id} routes to ensure proper route matching (e.g., /$stats should match before /{entity_id}).

Source code in src/chapkit/core/api/crud.py
def register_collection_operation(
    self,
    name: str,
    handler: Callable[..., Any],
    *,
    http_method: str = "GET",
    response_model: type[Any] | None = None,
    status_code: int | None = None,
    summary: str | None = None,
    description: str | None = None,
) -> None:
    """Register a custom collection operation with $ prefix.

    The new route is re-inserted ahead of parametric ``/{entity_id}`` routes
    so that e.g. ``/$stats`` wins route matching over ``/{entity_id}``.
    """
    path = f"/${name}"

    # Collect only the decorator options the caller actually supplied.
    extras: dict[str, Any] = {}
    for key, value in (
        ("response_model", response_model),
        ("status_code", status_code),
        ("summary", summary),
        ("description", description),
    ):
        if value is not None:
            extras[key] = value

    # Dispatch to the matching APIRouter decorator for the HTTP verb.
    decorators: dict[str, Callable[..., Any]] = {
        "get": self.router.get,
        "post": self.router.post,
        "put": self.router.put,
        "patch": self.router.patch,
        "delete": self.router.delete,
    }
    decorator = decorators.get(http_method.lower())
    if decorator is None:
        raise ValueError(f"Unsupported HTTP method: {http_method}")
    decorator(path, **extras)(handler)

    # FastAPI appends new routes; hoist ours above the parametric ones.
    if len(self.router.routes) > 1:
        added = self.router.routes.pop()
        self.router.routes.insert(self._find_parametric_route_index(), added)

CrudPermissions

CrudPermissions dataclass

Permissions configuration for CRUD operations.

Source code in src/chapkit/core/api/crud.py
@dataclass(slots=True)
class CrudPermissions:
    """Permissions configuration for CRUD operations."""

    create: bool = True
    read: bool = True
    update: bool = True
    delete: bool = True

HealthRouter

HealthRouter

Bases: Router

Health check router for service health monitoring.

Source code in src/chapkit/core/api/routers/health.py
class HealthRouter(Router):
    """Health check router for service health monitoring."""

    # Omit None-valued fields (e.g. an empty checks map) from responses.
    default_response_model_exclude_none = True

    def __init__(
        self,
        prefix: str,
        tags: list[str],
        checks: dict[str, HealthCheck] | None = None,
        **kwargs: object,
    ) -> None:
        """Initialize health router with optional named health checks."""
        self.checks = checks or {}
        super().__init__(prefix=prefix, tags=tags, **kwargs)

    def _register_routes(self) -> None:
        """Register the health endpoint and its SSE streaming variant."""
        checks = self.checks
        # Higher rank means worse; used to keep the worst state seen so far.
        rank = {HealthState.HEALTHY: 0, HealthState.DEGRADED: 1, HealthState.UNHEALTHY: 2}

        async def run_health_checks() -> HealthStatus:
            """Run every configured check and aggregate the worst state."""
            if not checks:
                return HealthStatus(status=HealthState.HEALTHY)

            results: dict[str, CheckResult] = {}
            worst = HealthState.HEALTHY

            for name, check_fn in checks.items():
                try:
                    state, message = await check_fn()
                except Exception as e:
                    # A crashing check is reported as unhealthy, not a 500.
                    state, message = HealthState.UNHEALTHY, f"Check failed: {str(e)}"
                results[name] = CheckResult(state=state, message=message)
                if rank.get(state, 0) > rank.get(worst, 0):
                    worst = state

            return HealthStatus(status=worst, checks=results)

        @self.router.get(
            "",
            summary="Health check",
            response_model=HealthStatus,
            response_model_exclude_none=self.default_response_model_exclude_none,
        )
        async def health_check() -> HealthStatus:
            return await run_health_checks()

        @self.router.get(
            "/$stream",
            summary="Stream health status updates via SSE",
            description="Real-time Server-Sent Events stream of health status at regular intervals",
        )
        async def stream_health_status(poll_interval: float = 1.0) -> StreamingResponse:
            """Stream real-time health status updates using Server-Sent Events."""

            async def emit_events() -> AsyncGenerator[bytes, None]:
                # Poll-and-emit loop; ends when the client disconnects.
                while True:
                    current = await run_health_checks()
                    yield format_sse_model_event(current, exclude_none=self.default_response_model_exclude_none)
                    await asyncio.sleep(poll_interval)

            return StreamingResponse(
                emit_events(),
                media_type="text/event-stream",
                headers=SSE_HEADERS,
            )

__init__(prefix, tags, checks=None, **kwargs)

Initialize health router with optional health checks.

Source code in src/chapkit/core/api/routers/health.py
def __init__(
    self,
    prefix: str,
    tags: list[str],
    checks: dict[str, HealthCheck] | None = None,
    **kwargs: object,
) -> None:
    """Initialize health router with optional health checks."""
    # Empty mapping when no checks are given; the endpoint then reports HEALTHY.
    self.checks = checks or {}
    super().__init__(prefix=prefix, tags=tags, **kwargs)

JobRouter

JobRouter

Bases: Router

REST API router for job scheduler operations.

Source code in src/chapkit/core/api/routers/job.py
class JobRouter(Router):
    """REST API router for job scheduler operations.

    Exposes list/get/delete endpoints plus an SSE stream of a single job's
    status until it reaches a terminal state.
    """

    def __init__(
        self,
        prefix: str,
        tags: list[str],
        scheduler_factory: Callable[[], JobScheduler],
        **kwargs: object,
    ) -> None:
        """Initialize job router with scheduler factory.

        ``scheduler_factory`` is a zero-argument callable used as a FastAPI
        dependency to obtain the JobScheduler per request.
        """
        self.scheduler_factory = scheduler_factory
        super().__init__(prefix=prefix, tags=tags, **kwargs)

    def _register_routes(self) -> None:
        """Register job management endpoints."""
        scheduler_dependency = Depends(self.scheduler_factory)

        @self.router.get("", summary="List all jobs", response_model=list[JobRecord])
        async def get_jobs(
            scheduler: JobScheduler = scheduler_dependency,
            status_filter: JobStatus | None = None,
        ) -> list[JobRecord]:
            """Return all job records, optionally filtered by status."""
            jobs = await scheduler.get_all_records()
            if status_filter:
                return [job for job in jobs if job.status == status_filter]
            return jobs

        @self.router.get("/$schema", summary="Get jobs list schema", response_model=dict[str, Any])
        async def get_jobs_schema() -> dict[str, Any]:
            """Get JSON schema for jobs list response."""
            return TypeAdapter(list[JobRecord]).json_schema()

        @self.router.get("/{job_id}", summary="Get job by ID", response_model=JobRecord)
        async def get_job(
            job_id: str,
            scheduler: JobScheduler = scheduler_dependency,
        ) -> JobRecord:
            """Fetch a single job record; 404 for malformed or unknown IDs."""
            try:
                ulid_id = ULID.from_str(job_id)
                return await scheduler.get_record(ulid_id)
            except (ValueError, KeyError) as e:
                # Chain the cause so logs distinguish a malformed ULID from a
                # missing job (ruff B904).
                raise HTTPException(status_code=404, detail="Job not found") from e

        @self.router.delete("/{job_id}", summary="Cancel and delete job", status_code=status.HTTP_204_NO_CONTENT)
        async def delete_job(
            job_id: str,
            scheduler: JobScheduler = scheduler_dependency,
        ) -> Response:
            """Cancel and remove a job; 404 for malformed or unknown IDs."""
            try:
                ulid_id = ULID.from_str(job_id)
                await scheduler.delete(ulid_id)
                return Response(status_code=status.HTTP_204_NO_CONTENT)
            except (ValueError, KeyError) as e:
                raise HTTPException(status_code=404, detail="Job not found") from e

        @self.router.get(
            "/{job_id}/$stream",
            summary="Stream job status updates via SSE",
            description="Real-time Server-Sent Events stream of job status changes until terminal state",
        )
        async def stream_job_status(
            job_id: str,
            scheduler: JobScheduler = scheduler_dependency,
            poll_interval: float = 0.5,
        ) -> StreamingResponse:
            """Stream real-time job status updates using Server-Sent Events."""
            # Validate job_id format
            try:
                ulid_id = ULID.from_str(job_id)
            except ValueError as e:
                raise HTTPException(status_code=400, detail="Invalid job ID format") from e

            # Check job exists before starting stream
            try:
                await scheduler.get_record(ulid_id)
            except KeyError as e:
                raise HTTPException(status_code=404, detail="Job not found") from e

            # SSE event generator
            async def event_stream() -> AsyncGenerator[bytes, None]:
                # NOTE(review): assumes JobStatus values compare equal to these
                # strings (str-valued enum) — confirm against JobStatus definition.
                terminal_states = {"completed", "failed", "canceled"}

                while True:
                    try:
                        record = await scheduler.get_record(ulid_id)
                        # Format as SSE event
                        yield format_sse_model_event(record)

                        # Stop streaming if job reached terminal state
                        if record.status in terminal_states:
                            break

                    except KeyError:
                        # Job was deleted - send final event and close
                        yield b'data: {"status": "deleted"}\n\n'
                        break

                    await asyncio.sleep(poll_interval)

            return StreamingResponse(
                event_stream(),
                media_type="text/event-stream",
                headers=SSE_HEADERS,
            )

__init__(prefix, tags, scheduler_factory, **kwargs)

Initialize job router with scheduler factory.

Source code in src/chapkit/core/api/routers/job.py
def __init__(
    self,
    prefix: str,
    tags: list[str],
    scheduler_factory: Callable[[], JobScheduler],
    **kwargs: object,
) -> None:
    """Initialize job router with scheduler factory."""
    # Stored so _register_routes (called by super().__init__) can wrap it
    # in a FastAPI Depends for per-request scheduler resolution.
    self.scheduler_factory = scheduler_factory
    super().__init__(prefix=prefix, tags=tags, **kwargs)

SystemRouter

SystemRouter

Bases: Router

System information router.

Source code in src/chapkit/core/api/routers/system.py
class SystemRouter(Router):
    """System information router."""

    def _register_routes(self) -> None:
        """Register system info, app listing, and app schema endpoints."""

        @self.router.get(
            "",
            summary="System information",
            response_model=SystemInfo,
        )
        async def get_system_info() -> SystemInfo:
            """Return a snapshot of host, platform, and interpreter details."""
            version = sys.version_info
            local_tz = datetime.now().astimezone().tzinfo
            return SystemInfo(
                current_time=datetime.now(timezone.utc),
                timezone=str(local_tz),
                python_version=f"{version.major}.{version.minor}.{version.micro}",
                platform=platform.platform(),
                hostname=platform.node(),
            )

        def _to_app_info(app: Any) -> AppInfo:
            """Map a loaded app to its public AppInfo representation."""
            return AppInfo(
                name=app.manifest.name,
                version=app.manifest.version,
                prefix=app.prefix,
                description=app.manifest.description,
                author=app.manifest.author,
                entry=app.manifest.entry,
                is_package=app.is_package,
            )

        @self.router.get(
            "/apps",
            summary="List installed apps",
            response_model=list[AppInfo],
        )
        async def list_apps(
            app_manager: Annotated[AppManager, Depends(get_app_manager)],
        ) -> list[AppInfo]:
            """List all installed apps with their metadata."""
            return [_to_app_info(app) for app in app_manager.list()]

        @self.router.get(
            "/apps/$schema",
            summary="Get apps list schema",
            response_model=dict[str, Any],
        )
        async def get_apps_schema() -> dict[str, Any]:
            """Get JSON schema for apps list response."""
            return TypeAdapter(list[AppInfo]).json_schema()

App System

Static web application hosting system.

AppLoader

AppLoader

Loads and validates apps from filesystem or package resources.

Source code in src/chapkit/core/api/app.py
class AppLoader:
    """Loads and validates apps from filesystem or package resources.

    Apps come from one of two source kinds:
      - a filesystem path (``str`` or ``Path``) to the app directory
      - a ``(package_name, subpath)`` tuple naming a directory shipped
        inside an importable Python package
    """

    @staticmethod
    def load(path: str | Path | tuple[str, str], prefix: str | None = None) -> App:
        """Load and validate app from filesystem path or package resource tuple.

        Raises FileNotFoundError/NotADirectoryError for missing or non-directory
        paths, and ValueError for malformed manifests or unresolvable packages.
        """
        # Detect source type and resolve to directory
        if isinstance(path, tuple):
            # Package resource
            dir_path, is_package = AppLoader._resolve_package_path(path)
        else:
            # Filesystem path
            dir_path = Path(path).resolve()
            is_package = False

            if not dir_path.exists():
                raise FileNotFoundError(f"App directory not found: {dir_path}")
            if not dir_path.is_dir():
                raise NotADirectoryError(f"App path is not a directory: {dir_path}")

        # Load and validate manifest
        manifest_path = dir_path / "manifest.json"
        if not manifest_path.exists():
            raise FileNotFoundError(f"manifest.json not found in: {dir_path}")

        try:
            with manifest_path.open() as f:
                manifest_data = json.load(f)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON in manifest.json: {e}") from e

        # Pydantic validation enforces required fields plus prefix/entry rules.
        manifest = AppManifest(**manifest_data)

        # Validate entry file exists
        entry_path = dir_path / manifest.entry
        if not entry_path.exists():
            raise FileNotFoundError(f"Entry file '{manifest.entry}' not found in: {dir_path}")

        # Use override or manifest prefix
        final_prefix = prefix if prefix is not None else manifest.prefix

        # Re-validate prefix if overridden
        if prefix is not None:
            # Round-trip through AppManifest so an override is subject to the
            # same prefix validation rules as a manifest-declared prefix.
            validated = AppManifest(
                name=manifest.name,
                version=manifest.version,
                prefix=final_prefix,
            )
            final_prefix = validated.prefix

        # NOTE: App.prefix may differ from manifest.prefix when overridden.
        return App(
            manifest=manifest,
            directory=dir_path,
            prefix=final_prefix,
            is_package=is_package,
        )

    @staticmethod
    def discover(path: str | Path | tuple[str, str]) -> list[App]:
        """Discover all apps with manifest.json in directory.

        Invalid apps are logged and skipped rather than aborting discovery.
        """
        # Resolve directory
        if isinstance(path, tuple):
            dir_path, _ = AppLoader._resolve_package_path(path)
        else:
            dir_path = Path(path).resolve()

            if not dir_path.exists():
                raise FileNotFoundError(f"Apps directory not found: {dir_path}")
            if not dir_path.is_dir():
                raise NotADirectoryError(f"Apps path is not a directory: {dir_path}")

        # Scan for subdirectories with manifest.json
        apps: list[App] = []
        for subdir in dir_path.iterdir():
            if subdir.is_dir() and (subdir / "manifest.json").exists():
                try:
                    # Determine if we're in a package context
                    if isinstance(path, tuple):
                        # Build tuple path for subdirectory
                        package_name: str = path[0]
                        base_path: str = path[1]
                        subdir_name = subdir.name
                        subpath = f"{base_path}/{subdir_name}" if base_path else subdir_name
                        app = AppLoader.load((package_name, subpath))
                    else:
                        app = AppLoader.load(subdir)
                    apps.append(app)
                except Exception as e:
                    # Log but don't fail discovery for invalid apps
                    logger.warning(
                        "app.discovery.failed",
                        directory=str(subdir),
                        error=str(e),
                    )

        return apps

    @staticmethod
    def _resolve_package_path(package_tuple: tuple[str, str]) -> tuple[Path, bool]:
        """Resolve package resource tuple to filesystem path.

        Returns (directory, True); the True flag marks a package-resource app.
        """
        package_name, subpath = package_tuple

        # Validate subpath for security
        if ".." in subpath:
            raise ValueError(f"subpath cannot contain '..' (got: {subpath})")
        if subpath.startswith("/"):
            raise ValueError(f"subpath must be relative (got: {subpath})")

        try:
            spec = importlib.util.find_spec(package_name)
        except (ModuleNotFoundError, ValueError) as e:
            raise ValueError(f"Package '{package_name}' could not be found") from e

        # spec.origin is None for namespace packages, which cannot be resolved
        # to a single directory here.
        if spec is None or spec.origin is None:
            raise ValueError(f"Package '{package_name}' could not be found")

        # Resolve to package directory
        package_dir = Path(spec.origin).parent
        app_dir = package_dir / subpath

        # Verify resolved path is still within package directory
        # (defense-in-depth against join/symlink escapes).
        try:
            app_dir.resolve().relative_to(package_dir.resolve())
        except ValueError as e:
            raise ValueError(f"App path '{subpath}' escapes package directory") from e

        if not app_dir.exists():
            raise FileNotFoundError(f"App path '{subpath}' not found in package '{package_name}'")
        if not app_dir.is_dir():
            raise NotADirectoryError(f"App path '{subpath}' in package '{package_name}' is not a directory")

        return app_dir, True

load(path, prefix=None) staticmethod

Load and validate app from filesystem path or package resource tuple.

Source code in src/chapkit/core/api/app.py
@staticmethod
def load(path: str | Path | tuple[str, str], prefix: str | None = None) -> App:
    """Load and validate app from filesystem path or package resource tuple."""
    # Detect source type and resolve to directory
    if isinstance(path, tuple):
        # Package resource
        dir_path, is_package = AppLoader._resolve_package_path(path)
    else:
        # Filesystem path
        dir_path = Path(path).resolve()
        is_package = False

        if not dir_path.exists():
            raise FileNotFoundError(f"App directory not found: {dir_path}")
        if not dir_path.is_dir():
            raise NotADirectoryError(f"App path is not a directory: {dir_path}")

    # Load and validate manifest
    manifest_path = dir_path / "manifest.json"
    if not manifest_path.exists():
        raise FileNotFoundError(f"manifest.json not found in: {dir_path}")

    try:
        with manifest_path.open() as f:
            manifest_data = json.load(f)
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in manifest.json: {e}") from e

    manifest = AppManifest(**manifest_data)

    # Validate entry file exists
    entry_path = dir_path / manifest.entry
    if not entry_path.exists():
        raise FileNotFoundError(f"Entry file '{manifest.entry}' not found in: {dir_path}")

    # Use override or manifest prefix
    final_prefix = prefix if prefix is not None else manifest.prefix

    # Re-validate prefix if overridden
    if prefix is not None:
        validated = AppManifest(
            name=manifest.name,
            version=manifest.version,
            prefix=final_prefix,
        )
        final_prefix = validated.prefix

    return App(
        manifest=manifest,
        directory=dir_path,
        prefix=final_prefix,
        is_package=is_package,
    )

discover(path) staticmethod

Discover all apps with manifest.json in directory.

Source code in src/chapkit/core/api/app.py
@staticmethod
def discover(path: str | Path | tuple[str, str]) -> list[App]:
    """Discover all apps with manifest.json in directory."""
    # Resolve directory
    if isinstance(path, tuple):
        dir_path, _ = AppLoader._resolve_package_path(path)
    else:
        dir_path = Path(path).resolve()

        if not dir_path.exists():
            raise FileNotFoundError(f"Apps directory not found: {dir_path}")
        if not dir_path.is_dir():
            raise NotADirectoryError(f"Apps path is not a directory: {dir_path}")

    # Scan for subdirectories with manifest.json
    apps: list[App] = []
    for subdir in dir_path.iterdir():
        if subdir.is_dir() and (subdir / "manifest.json").exists():
            try:
                # Determine if we're in a package context
                if isinstance(path, tuple):
                    # Build tuple path for subdirectory
                    package_name: str = path[0]
                    base_path: str = path[1]
                    subdir_name = subdir.name
                    subpath = f"{base_path}/{subdir_name}" if base_path else subdir_name
                    app = AppLoader.load((package_name, subpath))
                else:
                    app = AppLoader.load(subdir)
                apps.append(app)
            except Exception as e:
                # Log but don't fail discovery for invalid apps
                logger.warning(
                    "app.discovery.failed",
                    directory=str(subdir),
                    error=str(e),
                )

    return apps

AppManifest

AppManifest

Bases: BaseModel

App manifest configuration.

Source code in src/chapkit/core/api/app.py
class AppManifest(BaseModel):
    """App manifest configuration (parsed from an app's manifest.json)."""

    # Reject unknown manifest keys so typos fail loudly at load time.
    model_config = ConfigDict(extra="forbid")

    name: str = Field(description="Human-readable app name")
    version: str = Field(description="Semantic version")
    prefix: str = Field(description="URL prefix for mounting")
    description: str | None = Field(default=None, description="App description")
    author: str | None = Field(default=None, description="Author name")
    entry: str = Field(default="index.html", description="Entry point filename")

    @field_validator("prefix")
    @classmethod
    def validate_prefix(cls, v: str) -> str:
        """Validate mount prefix format."""
        if not v.startswith("/"):
            raise ValueError("prefix must start with '/'")
        if ".." in v:
            raise ValueError("prefix cannot contain '..'")
        # '/api' is reserved so static apps can never shadow API routes.
        if v.startswith("/api/") or v == "/api":
            raise ValueError("prefix cannot be '/api' or start with '/api/'")
        return v

    @field_validator("entry")
    @classmethod
    def validate_entry(cls, v: str) -> str:
        """Validate entry file path for security."""
        if ".." in v:
            raise ValueError("entry cannot contain '..'")
        if v.startswith("/"):
            raise ValueError("entry must be a relative path")
        # Normalize and check for path traversal
        # (belt-and-suspenders: the '..' check above already rejects these).
        normalized = Path(v).as_posix()
        if normalized.startswith("../") or "/../" in normalized:
            raise ValueError("entry cannot contain path traversal")
        return v

validate_prefix(v) classmethod

Validate mount prefix format.

Source code in src/chapkit/core/api/app.py
@field_validator("prefix")
@classmethod
def validate_prefix(cls, v: str) -> str:
    """Validate mount prefix format."""
    if not v.startswith("/"):
        raise ValueError("prefix must start with '/'")
    if ".." in v:
        raise ValueError("prefix cannot contain '..'")
    if v.startswith("/api/") or v == "/api":
        raise ValueError("prefix cannot be '/api' or start with '/api/'")
    return v

validate_entry(v) classmethod

Validate entry file path for security.

Source code in src/chapkit/core/api/app.py
@field_validator("entry")
@classmethod
def validate_entry(cls, v: str) -> str:
    """Validate entry file path for security."""
    if ".." in v:
        raise ValueError("entry cannot contain '..'")
    if v.startswith("/"):
        raise ValueError("entry must be a relative path")
    # Normalize and check for path traversal
    normalized = Path(v).as_posix()
    if normalized.startswith("../") or "/../" in normalized:
        raise ValueError("entry cannot contain path traversal")
    return v

App

App dataclass

Represents a loaded app with manifest and directory.

Source code in src/chapkit/core/api/app.py
@dataclass
class App:
    """Represents a loaded app with manifest and directory.

    Produced by AppLoader.load/discover and queried through AppManager.
    """

    manifest: AppManifest  # Validated manifest.json contents
    directory: Path  # Resolved directory containing the app files
    prefix: str  # May differ from manifest if overridden
    is_package: bool  # True if loaded from package resources

AppManager

AppManager

Lightweight manager for app metadata queries.

Source code in src/chapkit/core/api/app.py
class AppManager:
    """Read-only registry of loaded apps, queryable by mount prefix."""

    def __init__(self, apps: list[App]):
        """Store the already-loaded apps this manager serves."""
        self._apps = apps

    def list(self) -> list[App]:
        """Return every registered app."""
        return self._apps

    def get(self, prefix: str) -> App | None:
        """Return the app mounted at *prefix*, or None when no app matches."""
        for candidate in self._apps:
            if candidate.prefix == prefix:
                return candidate
        return None

__init__(apps)

Initialize with loaded apps.

Source code in src/chapkit/core/api/app.py
def __init__(self, apps: list[App]):
    """Initialize with loaded apps."""
    self._apps = apps

list()

Return all installed apps.

Source code in src/chapkit/core/api/app.py
def list(self) -> list[App]:
    """Return all installed apps."""
    return self._apps

get(prefix)

Get app by mount prefix.

Source code in src/chapkit/core/api/app.py
def get(self, prefix: str) -> App | None:
    """Get app by mount prefix."""
    return next((app for app in self._apps if app.prefix == prefix), None)

AppInfo

AppInfo

Bases: BaseModel

App metadata for API responses.

Source code in src/chapkit/core/api/app.py
class AppInfo(BaseModel):
    """App metadata for API responses.

    Serializable projection of a loaded App and its manifest, as returned by
    the system ``/apps`` endpoint.
    """

    name: str = Field(description="Human-readable app name")
    version: str = Field(description="Semantic version")
    prefix: str = Field(description="URL prefix for mounting")
    description: str | None = Field(default=None, description="App description")
    author: str | None = Field(default=None, description="Author name")
    entry: str = Field(description="Entry point filename")
    is_package: bool = Field(description="Whether app is loaded from package resources")

Authentication

API key authentication middleware and utilities.

APIKeyMiddleware

APIKeyMiddleware

Bases: BaseHTTPMiddleware

Middleware for API key authentication via X-API-Key header.

Source code in src/chapkit/core/api/auth.py
class APIKeyMiddleware(BaseHTTPMiddleware):
    """Middleware for API key authentication via X-API-Key header.

    Every request must carry a valid key in ``header_name`` unless its exact
    path appears in ``unauthenticated_paths``.  Failures are answered with an
    RFC 9457 ``application/problem+json`` 401 response.
    """

    def __init__(
        self,
        app: Any,
        *,
        api_keys: Set[str],
        header_name: str = "X-API-Key",
        unauthenticated_paths: Set[str],
    ) -> None:
        """Initialize API key middleware.

        Args:
            app: ASGI application
            api_keys: Set of valid API keys
            header_name: HTTP header name for API key
            unauthenticated_paths: Paths that don't require authentication
        """
        super().__init__(app)
        self.api_keys = api_keys
        self.header_name = header_name
        self.unauthenticated_paths = unauthenticated_paths

    @staticmethod
    def _key_prefix(api_key: str) -> str:
        """Return a short key prefix that is safe to log (never the full key)."""
        return api_key[:7] if len(api_key) >= 7 else "***"

    @staticmethod
    def _unauthorized(request: Request, detail: str) -> JSONResponse:
        """Build an RFC 9457 problem+json 401 response with the given detail."""
        problem = ProblemDetail(
            type="urn:chapkit:error:unauthorized",
            title="Unauthorized",
            status=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            instance=str(request.url.path),
        )
        return JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED,
            content=problem.model_dump(exclude_none=True),
            media_type="application/problem+json",
        )

    async def dispatch(self, request: Request, call_next: MiddlewareCallNext) -> Response:
        """Process request with API key authentication."""
        # Allow unauthenticated access to specific paths
        if request.url.path in self.unauthenticated_paths:
            return await call_next(request)

        # Extract API key from header
        api_key = request.headers.get(self.header_name)

        if not api_key:
            logger.warning(
                "auth.missing_key",
                path=request.url.path,
                method=request.method,
            )
            return self._unauthorized(request, f"Missing authentication header: {self.header_name}")

        # Validate API key.
        # NOTE(review): set membership is not constant-time; acceptable for
        # typical API-key threat models, but consider hmac.compare_digest per
        # key if timing side channels are a concern.
        if api_key not in self.api_keys:
            logger.warning(
                "auth.invalid_key",
                key_prefix=self._key_prefix(api_key),
                path=request.url.path,
                method=request.method,
            )
            return self._unauthorized(request, "Invalid API key")

        # Attach key prefix to request state for downstream logging.
        request.state.api_key_prefix = self._key_prefix(api_key)

        logger.info(
            "auth.success",
            key_prefix=request.state.api_key_prefix,
            path=request.url.path,
        )

        return await call_next(request)

__init__(app, *, api_keys, header_name='X-API-Key', unauthenticated_paths)

Initialize API key middleware.

Parameters:

Name Type Description Default
app Any

ASGI application

required
api_keys Set[str]

Set of valid API keys

required
header_name str

HTTP header name for API key

'X-API-Key'
unauthenticated_paths Set[str]

Paths that don't require authentication

required
Source code in src/chapkit/core/api/auth.py
def __init__(
    self,
    app: Any,
    *,
    api_keys: Set[str],
    header_name: str = "X-API-Key",
    unauthenticated_paths: Set[str],
) -> None:
    """Initialize API key middleware.

    Args:
        app: ASGI application
        api_keys: Set of valid API keys
        header_name: HTTP header name for API key
        unauthenticated_paths: Paths that don't require authentication
    """
    super().__init__(app)
    self.api_keys = api_keys
    self.header_name = header_name
    self.unauthenticated_paths = unauthenticated_paths

dispatch(request, call_next) async

Process request with API key authentication.

Source code in src/chapkit/core/api/auth.py
async def dispatch(self, request: Request, call_next: MiddlewareCallNext) -> Response:
    """Process request with API key authentication."""
    # Allow unauthenticated access to specific paths
    if request.url.path in self.unauthenticated_paths:
        return await call_next(request)

    # Extract API key from header
    api_key = request.headers.get(self.header_name)

    if not api_key:
        logger.warning(
            "auth.missing_key",
            path=request.url.path,
            method=request.method,
        )
        problem = ProblemDetail(
            type="urn:chapkit:error:unauthorized",
            title="Unauthorized",
            status=status.HTTP_401_UNAUTHORIZED,
            detail=f"Missing authentication header: {self.header_name}",
            instance=str(request.url.path),
        )
        return JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED,
            content=problem.model_dump(exclude_none=True),
            media_type="application/problem+json",
        )

    # Validate API key
    if api_key not in self.api_keys:
        # Log only prefix for security
        key_prefix = api_key[:7] if len(api_key) >= 7 else "***"
        logger.warning(
            "auth.invalid_key",
            key_prefix=key_prefix,
            path=request.url.path,
            method=request.method,
        )
        problem = ProblemDetail(
            type="urn:chapkit:error:unauthorized",
            title="Unauthorized",
            status=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key",
            instance=str(request.url.path),
        )
        return JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED,
            content=problem.model_dump(exclude_none=True),
            media_type="application/problem+json",
        )

    # Attach key prefix to request state for logging
    request.state.api_key_prefix = api_key[:7] if len(api_key) >= 7 else "***"

    logger.info(
        "auth.success",
        key_prefix=request.state.api_key_prefix,
        path=request.url.path,
    )

    return await call_next(request)

Middleware

Error handling and logging middleware.

middleware

FastAPI middleware for error handling, CORS, and other cross-cutting concerns.

RequestLoggingMiddleware

Bases: BaseHTTPMiddleware

Middleware for logging HTTP requests with unique request IDs and context binding.

Source code in src/chapkit/core/api/middleware.py
class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """Middleware for logging HTTP requests with unique request IDs and context binding.

    Each request gets a ULID request id that is bound to the logging context,
    stored on ``request.state.request_id``, and echoed back to the client in
    the ``X-Request-ID`` response header.
    """

    async def dispatch(self, request: Request, call_next: MiddlewareCallNext) -> Response:
        """Process request with logging and context binding."""
        request_id = str(ULID())
        start_time = time.perf_counter()

        # Bind request context
        add_request_context(
            request_id=request_id,
            method=request.method,
            path=request.url.path,
            client_host=request.client.host if request.client else None,
        )

        # Add request_id to request state for access in endpoints
        request.state.request_id = request_id

        logger.info(
            "http.request.start",
            query_params=str(request.url.query) if request.url.query else None,
        )

        try:
            response = await call_next(request)
            duration_ms = (time.perf_counter() - start_time) * 1000

            logger.info(
                "http.request.complete",
                status_code=response.status_code,
                duration_ms=round(duration_ms, 2),
            )

            # Add request_id to response headers for tracing
            response.headers["X-Request-ID"] = request_id

            return response

        except Exception as exc:
            duration_ms = (time.perf_counter() - start_time) * 1000

            # Log with timing, then re-raise so exception handlers still run.
            logger.error(
                "http.request.error",
                duration_ms=round(duration_ms, 2),
                error=str(exc),
                exc_info=True,
            )
            raise

        finally:
            # Clear request context after response (runs on success and error).
            reset_request_context()

dispatch(request, call_next) async

Process request with logging and context binding.

Source code in src/chapkit/core/api/middleware.py
async def dispatch(self, request: Request, call_next: MiddlewareCallNext) -> Response:
    """Process request with logging and context binding."""
    request_id = str(ULID())
    start_time = time.perf_counter()

    # Bind request context
    add_request_context(
        request_id=request_id,
        method=request.method,
        path=request.url.path,
        client_host=request.client.host if request.client else None,
    )

    # Add request_id to request state for access in endpoints
    request.state.request_id = request_id

    logger.info(
        "http.request.start",
        query_params=str(request.url.query) if request.url.query else None,
    )

    try:
        response = await call_next(request)
        duration_ms = (time.perf_counter() - start_time) * 1000

        logger.info(
            "http.request.complete",
            status_code=response.status_code,
            duration_ms=round(duration_ms, 2),
        )

        # Add request_id to response headers for tracing
        response.headers["X-Request-ID"] = request_id

        return response

    except Exception as exc:
        duration_ms = (time.perf_counter() - start_time) * 1000

        logger.error(
            "http.request.error",
            duration_ms=round(duration_ms, 2),
            error=str(exc),
            exc_info=True,
        )
        raise

    finally:
        # Clear request context after response
        reset_request_context()

database_error_handler(request, exc) async

Handle database errors and return error response.

Source code in src/chapkit/core/api/middleware.py
async def database_error_handler(request: Request, exc: Exception) -> JSONResponse:
    """Handle database errors and return a generic 500 error response."""
    logger.error(
        "database.error",
        error=str(exc),
        path=request.url.path,
        exc_info=True,
    )
    # NOTE(review): str(exc) is echoed back to the client, which may leak
    # internal details (SQL fragments, table names); consider omitting the
    # "error" field in production responses.
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={"detail": "Database error occurred", "error": str(exc)},
    )

validation_error_handler(request, exc) async

Handle validation errors and return error response.

Source code in src/chapkit/core/api/middleware.py
async def validation_error_handler(request: Request, exc: Exception) -> JSONResponse:
    """Handle validation errors and return a 422 error response."""
    # Validation failures are expected client errors, so log at warning level.
    logger.warning(
        "validation.error",
        error=str(exc),
        path=request.url.path,
    )
    return JSONResponse(
        status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
        content={"detail": "Validation error", "errors": str(exc)},
    )

chapkit_exception_handler(request, exc) async

Handle ChapkitException and return RFC 9457 Problem Details.

Source code in src/chapkit/core/api/middleware.py
async def chapkit_exception_handler(request: Request, exc: ChapkitException) -> JSONResponse:
    """Handle ChapkitException and return RFC 9457 Problem Details."""
    logger.warning(
        "chapkit.error",
        error_type=exc.type_uri,
        status=exc.status,
        detail=exc.detail,
        path=request.url.path,
    )

    # Map the exception onto an RFC 9457 problem document; extension members
    # carried by the exception are merged in alongside the standard fields.
    problem = ProblemDetail(
        type=exc.type_uri,
        title=exc.title,
        status=exc.status,
        detail=exc.detail,
        instance=exc.instance or str(request.url),
        **exc.extensions,
    )

    return JSONResponse(
        status_code=exc.status,
        content=problem.model_dump(exclude_none=True),
        media_type="application/problem+json",
    )

add_error_handlers(app)

Add error handlers to FastAPI application.

Source code in src/chapkit/core/api/middleware.py
def add_error_handlers(app: Any) -> None:
    """Add error handlers to FastAPI application.

    Registers handlers for chapkit domain errors, SQLAlchemy errors, and
    pydantic validation errors.
    """
    # Imported locally rather than at module top — presumably to defer these
    # third-party imports until handlers are actually installed; verify intent.
    from pydantic import ValidationError
    from sqlalchemy.exc import SQLAlchemyError

    app.add_exception_handler(ChapkitException, chapkit_exception_handler)
    app.add_exception_handler(SQLAlchemyError, database_error_handler)
    app.add_exception_handler(ValidationError, validation_error_handler)

add_logging_middleware(app)

Add request logging middleware to FastAPI application.

Source code in src/chapkit/core/api/middleware.py
def add_logging_middleware(app: Any) -> None:
    """Add request logging middleware to FastAPI application.

    Installs RequestLoggingMiddleware (request ids + timing + context binding).
    """
    app.add_middleware(RequestLoggingMiddleware)

Dependencies

FastAPI dependency injection functions.

dependencies

Generic FastAPI dependency injection for database and scheduler.

set_database(database)

Set the global database instance.

Source code in src/chapkit/core/api/dependencies.py
def set_database(database: Database) -> None:
    """Set the global database instance (call once during app startup)."""
    global _database
    _database = database

get_database()

Get the global database instance.

Source code in src/chapkit/core/api/dependencies.py
def get_database() -> Database:
    """Return the process-wide database instance, failing fast when unset."""
    db = _database
    if db is None:
        raise RuntimeError("Database not initialized. Call set_database() during app startup.")
    return db

get_session(db) async

Get a database session for dependency injection.

Source code in src/chapkit/core/api/dependencies.py
async def get_session(db: Annotated[Database, Depends(get_database)]) -> AsyncIterator[AsyncSession]:
    """Get a database session for dependency injection.

    Yields one session per request; the async context manager releases it
    when the request completes.
    """
    async with db.session() as session:
        yield session

set_scheduler(scheduler)

Set the global scheduler instance.

Source code in src/chapkit/core/api/dependencies.py
def set_scheduler(scheduler: JobScheduler) -> None:
    """Set the global scheduler instance (call once during app startup)."""
    global _scheduler
    _scheduler = scheduler

get_scheduler()

Get the global scheduler instance.

Source code in src/chapkit/core/api/dependencies.py
def get_scheduler() -> JobScheduler:
    """Return the process-wide scheduler instance, failing fast when unset."""
    scheduler = _scheduler
    if scheduler is None:
        raise RuntimeError("Scheduler not initialized. Call set_scheduler() during app startup.")
    return scheduler

set_app_manager(manager)

Set the global app manager instance.

Source code in src/chapkit/core/api/dependencies.py
def set_app_manager(manager: AppManager) -> None:
    """Set the global app manager instance (call once during app startup)."""
    global _app_manager
    _app_manager = manager

get_app_manager()

Get the global app manager instance.

Source code in src/chapkit/core/api/dependencies.py
def get_app_manager() -> AppManager:
    """Return the process-wide app manager instance, failing fast when unset."""
    manager = _app_manager
    if manager is None:
        raise RuntimeError("AppManager not initialized. Call set_app_manager() during app startup.")
    return manager

Pagination

Pagination helpers for collection endpoints.

pagination

Pagination utilities for API endpoints.

PaginationParams

Bases: BaseModel

Query parameters for opt-in pagination (both page and size required).

Source code in src/chapkit/core/api/pagination.py
class PaginationParams(BaseModel):
    """Query parameters for opt-in pagination (both page and size required)."""

    # Pagination is opt-in: endpoints return the full collection unless the
    # client supplies BOTH page and size.
    page: int | None = Field(default=None, ge=1, description="Page number (1-indexed)")
    size: int | None = Field(default=None, ge=1, le=100, description="Number of items per page (max 100)")

    def is_paginated(self) -> bool:
        """Check if both page and size parameters are provided."""
        return self.page is not None and self.size is not None

is_paginated()

Check if both page and size parameters are provided.

Source code in src/chapkit/core/api/pagination.py
def is_paginated(self) -> bool:
    """Check if both page and size parameters are provided."""
    return self.page is not None and self.size is not None

create_paginated_response(items, total, page, size)

Create paginated response with items and metadata.

Source code in src/chapkit/core/api/pagination.py
def create_paginated_response(items: list[T], total: int, page: int, size: int) -> PaginatedResponse[T]:
    """Create paginated response with items and metadata.

    Args:
        items: The current page of items.
        total: Total number of items across all pages.
        page: Current page number (1-indexed).
        size: Page size used for the query.
    """
    return PaginatedResponse(items=items, total=total, page=page, size=size)

Utilities

Utility functions for FastAPI applications.

utilities

Utility functions for FastAPI routers and endpoints.

build_location_url(request, path)

Build a full URL for the Location header.

Source code in src/chapkit/core/api/utilities.py
def build_location_url(request: Request, path: str) -> str:
    """Return an absolute URL for *path*, suitable for a Location header."""
    url = request.url
    return url.scheme + "://" + url.netloc + path

run_app(app, *, host=None, port=None, workers=None, reload=None, log_level=None, **uvicorn_kwargs)

Run FastAPI app with Uvicorn development server.

For reload to work, pass a string in "module:app" format. App instance disables reload automatically.

Examples:


# Direct execution (reload disabled)
if __name__ == "__main__":
    run_app(app)

# With module path (reload enabled)
run_app("examples.config_api:app")

# Production: multiple workers
run_app(app, workers=4)

Parameters:

Name Type Description Default
app Any | str

FastAPI app instance OR string "module:app" path

required
host str | None

Server host (default: "127.0.0.1", env: HOST)

None
port int | None

Server port (default: 8000, env: PORT)

None
workers int | None

Number of worker processes (default: 1, env: WORKERS)

None
reload bool | None

Enable auto-reload (default: True for string, False for instance)

None
log_level str | None

Logging level (default: from LOG_LEVEL env var or "info")

None
**uvicorn_kwargs Any

Additional uvicorn.run() arguments

{}
Source code in src/chapkit/core/api/utilities.py
def run_app(
    app: Any | str,
    *,
    host: str | None = None,
    port: int | None = None,
    workers: int | None = None,
    reload: bool | None = None,
    log_level: str | None = None,
    **uvicorn_kwargs: Any,
) -> None:
    """Run FastAPI app with Uvicorn development server.

    Auto-reload only works when *app* is passed as a "module:app" string;
    handing over an app instance disables reload automatically.

    Examples:
    --------
        # Direct execution (reload disabled)
        if __name__ == "__main__":
            run_app(app)

        # With module path (reload enabled)
        run_app("examples.config_api:app")

        # Production: multiple workers
        run_app(app, workers=4)

    Args:
        app: FastAPI app instance OR string "module:app" path
        host: Server host (default: "127.0.0.1", env: HOST)
        port: Server port (default: 8000, env: PORT)
        workers: Number of worker processes (default: 1, env: WORKERS)
        reload: Enable auto-reload (default: True for string, False for instance)
        log_level: Logging level (default: from LOG_LEVEL env var or "info")
        **uvicorn_kwargs: Additional uvicorn.run() arguments
    """
    import uvicorn

    from chapkit.core.logging import configure_logging

    # Structured logging must be installed before uvicorn starts.
    configure_logging()

    # Explicit arguments win; otherwise fall back to env vars, then defaults.
    resolved_host: str = os.getenv("HOST", "127.0.0.1") if host is None else host
    resolved_port: int = int(os.getenv("PORT", "8000")) if port is None else port
    resolved_workers: int = int(os.getenv("WORKERS", "1")) if workers is None else workers
    resolved_log_level: str = os.getenv("LOG_LEVEL", "info").lower() if log_level is None else log_level

    # String targets support reload; live app instances do not.
    if reload is None:
        reload = isinstance(app, str)

    # Auto-reload cannot be combined with multiple workers.
    if reload and resolved_workers > 1:
        reload = False

    uvicorn.run(
        app,
        host=resolved_host,
        port=resolved_port,
        workers=resolved_workers,
        reload=reload,
        log_level=resolved_log_level,
        log_config=None,  # Disable uvicorn's default logging config
        **uvicorn_kwargs,
    )

Application Layer

High-level application orchestration.

ServiceBuilder

Domain-aware service builder with module support.

ServiceBuilder

Bases: BaseServiceBuilder

Service builder with integrated module support (config, artifact, task).

Source code in src/chapkit/api/service_builder.py
class ServiceBuilder(BaseServiceBuilder):
    """Service builder with integrated module support (config, artifact, task)."""

    def __init__(self, **kwargs: Any) -> None:
        """Initialize service builder with module-specific state."""
        super().__init__(**kwargs)
        # Each slot is populated by the corresponding with_*() fluent call.
        self._config_options: _ConfigOptions | None = None
        self._artifact_options: _ArtifactOptions | None = None
        self._task_options: _TaskOptions | None = None
        self._ml_options: _MLOptions | None = None

    # --------------------------------------------------------------------- Permission helpers

    @staticmethod
    def _resolve_permissions(
        permissions: CrudPermissions | None,
        *,
        allow_create: bool | None,
        allow_read: bool | None,
        allow_update: bool | None,
        allow_delete: bool | None,
    ) -> CrudPermissions:
        """Merge per-operation allow_* overrides onto a base permission set.

        An explicit allow_* flag wins over the corresponding field of
        *permissions* (or of a default CrudPermissions when none is given).
        Extracted to remove the triplicated merge logic previously inlined
        in with_config/with_artifacts/with_tasks.
        """
        base = permissions or CrudPermissions()
        return CrudPermissions(
            create=allow_create if allow_create is not None else base.create,
            read=allow_read if allow_read is not None else base.read,
            update=allow_update if allow_update is not None else base.update,
            delete=allow_delete if allow_delete is not None else base.delete,
        )

    # --------------------------------------------------------------------- Module-specific fluent methods

    def with_config(
        self,
        schema: type[BaseConfig],
        *,
        prefix: str = "/api/v1/configs",
        tags: List[str] | None = None,
        permissions: CrudPermissions | None = None,
        allow_create: bool | None = None,
        allow_read: bool | None = None,
        allow_update: bool | None = None,
        allow_delete: bool | None = None,
    ) -> Self:
        """Enable config CRUD endpoints validated against *schema*."""
        self._config_options = _ConfigOptions(
            schema=schema,
            prefix=prefix,
            tags=list(tags) if tags else ["Config"],
            permissions=self._resolve_permissions(
                permissions,
                allow_create=allow_create,
                allow_read=allow_read,
                allow_update=allow_update,
                allow_delete=allow_delete,
            ),
        )
        return self

    def with_artifacts(
        self,
        *,
        hierarchy: ArtifactHierarchy,
        prefix: str = "/api/v1/artifacts",
        tags: List[str] | None = None,
        enable_config_linking: bool = False,
        permissions: CrudPermissions | None = None,
        allow_create: bool | None = None,
        allow_read: bool | None = None,
        allow_update: bool | None = None,
        allow_delete: bool | None = None,
    ) -> Self:
        """Enable artifact CRUD endpoints for the given hierarchy."""
        self._artifact_options = _ArtifactOptions(
            hierarchy=hierarchy,
            prefix=prefix,
            tags=list(tags) if tags else ["Artifacts"],
            enable_config_linking=enable_config_linking,
            permissions=self._resolve_permissions(
                permissions,
                allow_create=allow_create,
                allow_read=allow_read,
                allow_update=allow_update,
                allow_delete=allow_delete,
            ),
        )
        return self

    def with_tasks(
        self,
        *,
        prefix: str = "/api/v1/tasks",
        tags: List[str] | None = None,
        permissions: CrudPermissions | None = None,
        validate_on_startup: bool = True,
        allow_create: bool | None = None,
        allow_read: bool | None = None,
        allow_update: bool | None = None,
        allow_delete: bool | None = None,
    ) -> Self:
        """Enable task execution endpoints with script runner."""
        self._task_options = _TaskOptions(
            prefix=prefix,
            tags=list(tags) if tags else ["Tasks"],
            permissions=self._resolve_permissions(
                permissions,
                allow_create=allow_create,
                allow_read=allow_read,
                allow_update=allow_update,
                allow_delete=allow_delete,
            ),
            validate_on_startup=validate_on_startup,
        )
        return self

    def with_ml(
        self,
        runner: ModelRunnerProtocol,
        *,
        prefix: str = "/api/v1/ml",
        tags: List[str] | None = None,
    ) -> Self:
        """Enable ML train/predict endpoints with model runner."""
        self._ml_options = _MLOptions(
            runner=runner,
            prefix=prefix,
            tags=list(tags) if tags else ["ML"],
        )
        return self

    # --------------------------------------------------------------------- Extension point implementations

    def _validate_module_configuration(self) -> None:
        """Validate module-specific configuration.

        Raises:
            ValueError: when a module is enabled without the modules it
                depends on (config-linking needs config; tasks need
                artifacts; ML needs config, artifacts, and jobs).
        """
        if self._artifact_options and self._artifact_options.enable_config_linking and not self._config_options:
            raise ValueError(
                "Artifact config-linking requires a config schema. "
                "Call `with_config(...)` before enabling config linking in artifacts."
            )

        if self._task_options and not self._artifact_options:
            raise ValueError(
                "Task execution requires artifacts to store results. Call `with_artifacts(...)` before `with_tasks()`."
            )

        if self._ml_options:
            if not self._config_options:
                raise ValueError(
                    "ML operations require config for model configuration. "
                    "Call `with_config(...)` before `with_ml(...)`."
                )
            if not self._artifact_options:
                raise ValueError(
                    "ML operations require artifacts for model storage. "
                    "Call `with_artifacts(...)` before `with_ml(...)`."
                )
            if not self._job_options:
                raise ValueError(
                    "ML operations require job scheduler for async execution. "
                    "Call `with_jobs(...)` before `with_ml(...)`."
                )

    def _register_module_routers(self, app: FastAPI) -> None:
        """Register module-specific routers (config, artifact, task)."""
        if self._config_options:
            config_options = self._config_options
            config_schema = config_options.schema
            config_dep = self._build_config_dependency(config_schema)
            entity_in_type: type[ConfigIn[BaseConfig]] = ConfigIn[config_schema]  # type: ignore[valid-type]
            entity_out_type: type[ConfigOut[BaseConfig]] = ConfigOut[config_schema]  # type: ignore[valid-type]
            config_router = ConfigRouter.create(
                prefix=config_options.prefix,
                tags=config_options.tags,
                manager_factory=config_dep,
                entity_in_type=entity_in_type,
                entity_out_type=entity_out_type,
                permissions=config_options.permissions,
                enable_artifact_operations=(
                    self._artifact_options is not None and self._artifact_options.enable_config_linking
                ),
            )
            app.include_router(config_router)
            app.dependency_overrides[default_get_config_manager] = config_dep

        if self._artifact_options:
            artifact_options = self._artifact_options
            artifact_dep = self._build_artifact_dependency(
                hierarchy=artifact_options.hierarchy,
                include_config=artifact_options.enable_config_linking,
            )
            artifact_router = ArtifactRouter.create(
                prefix=artifact_options.prefix,
                tags=artifact_options.tags,
                manager_factory=artifact_dep,
                entity_in_type=ArtifactIn,
                entity_out_type=ArtifactOut,
                permissions=artifact_options.permissions,
                enable_config_access=self._config_options is not None and artifact_options.enable_config_linking,
            )
            app.include_router(artifact_router)
            app.dependency_overrides[default_get_artifact_manager] = artifact_dep

        if self._task_options:
            task_options = self._task_options
            task_dep = self._build_task_dependency()
            task_router = TaskRouter.create(
                prefix=task_options.prefix,
                tags=task_options.tags,
                manager_factory=task_dep,
                entity_in_type=TaskIn,
                entity_out_type=TaskOut,
                permissions=task_options.permissions,
            )
            app.include_router(task_router)
            app.dependency_overrides[default_get_task_manager] = task_dep

            # Register validation startup hook if enabled
            if task_options.validate_on_startup:

                async def _validate_tasks_on_startup(app_instance: FastAPI) -> None:
                    """Validate and disable orphaned Python tasks on startup."""
                    await validate_and_disable_orphaned_tasks(app_instance)

                self._startup_hooks.append(_validate_tasks_on_startup)

        if self._ml_options:
            ml_options = self._ml_options
            ml_dep = self._build_ml_dependency()
            ml_router = MLRouter.create(
                prefix=ml_options.prefix,
                tags=ml_options.tags,
                manager_factory=ml_dep,
            )
            app.include_router(ml_router)
            app.dependency_overrides[default_get_ml_manager] = ml_dep

    # --------------------------------------------------------------------- Module dependency builders

    @staticmethod
    def _build_config_dependency(
        schema: type[BaseConfig],
    ) -> DependencyFactory:
        """Build a request-scoped factory yielding a ConfigManager for *schema*."""

        async def _dependency(session: AsyncSession = Depends(get_session)) -> ConfigManager[BaseConfig]:
            repo = ConfigRepository(session)
            return ConfigManager[BaseConfig](repo, schema)

        return _dependency

    @staticmethod
    def _build_artifact_dependency(
        *,
        hierarchy: ArtifactHierarchy,
        include_config: bool,
    ) -> DependencyFactory:
        """Build a request-scoped factory yielding an ArtifactManager."""

        async def _dependency(session: AsyncSession = Depends(get_session)) -> ArtifactManager:
            artifact_repo = ArtifactRepository(session)
            # Config repo is only wired in when config-linking is enabled.
            config_repo = ConfigRepository(session) if include_config else None
            return ArtifactManager(artifact_repo, hierarchy=hierarchy, config_repo=config_repo)

        return _dependency

    @staticmethod
    def _build_task_dependency() -> DependencyFactory:
        """Build a request-scoped factory yielding a TaskManager.

        Scheduler and database singletons are optional: the accessors raise
        RuntimeError when they were never installed, which is tolerated here.
        """

        async def _dependency(
            session: AsyncSession = Depends(get_session),
            artifact_manager: ArtifactManager = Depends(default_get_artifact_manager),
        ) -> TaskManager:
            repo = TaskRepository(session)
            try:
                scheduler = get_scheduler()
            except RuntimeError:
                scheduler = None
            try:
                database = get_database()
            except RuntimeError:
                database = None
            return TaskManager(repo, scheduler, database, artifact_manager)

        return _dependency

    def _build_ml_dependency(self) -> DependencyFactory:
        """Build a factory yielding an MLManager with the runner captured in closure."""
        ml_runner = self._ml_options.runner if self._ml_options else None
        config_schema = self._config_options.schema if self._config_options else None

        async def _dependency() -> MLManager:
            if ml_runner is None:
                raise RuntimeError("ML runner not configured")
            if config_schema is None:
                raise RuntimeError("Config schema not configured")

            runner: ModelRunnerProtocol = ml_runner
            scheduler = get_scheduler()
            database = get_database()
            return MLManager(runner, scheduler, database, config_schema)

        return _dependency

__init__(**kwargs)

Initialize service builder with module-specific state.

Source code in src/chapkit/api/service_builder.py
def __init__(self, **kwargs: Any) -> None:
    """Set up the builder; every module option slot starts out unset."""
    super().__init__(**kwargs)
    # Slots below are filled in by the corresponding with_*() calls.
    self._ml_options: _MLOptions | None = None
    self._task_options: _TaskOptions | None = None
    self._artifact_options: _ArtifactOptions | None = None
    self._config_options: _ConfigOptions | None = None

with_tasks(*, prefix='/api/v1/tasks', tags=None, permissions=None, validate_on_startup=True, allow_create=None, allow_read=None, allow_update=None, allow_delete=None)

Enable task execution endpoints with script runner.

Source code in src/chapkit/api/service_builder.py
def with_tasks(
    self,
    *,
    prefix: str = "/api/v1/tasks",
    tags: List[str] | None = None,
    permissions: CrudPermissions | None = None,
    validate_on_startup: bool = True,
    allow_create: bool | None = None,
    allow_read: bool | None = None,
    allow_update: bool | None = None,
    allow_delete: bool | None = None,
) -> Self:
    """Enable task execution endpoints with script runner."""
    # Explicit allow_* flags override the base permission set field-by-field.
    defaults = permissions or CrudPermissions()
    overrides = {
        "create": allow_create,
        "read": allow_read,
        "update": allow_update,
        "delete": allow_delete,
    }
    merged = CrudPermissions(
        **{field: (flag if flag is not None else getattr(defaults, field)) for field, flag in overrides.items()}
    )
    self._task_options = _TaskOptions(
        prefix=prefix,
        tags=list(tags) if tags else ["Tasks"],
        permissions=merged,
        validate_on_startup=validate_on_startup,
    )
    return self

with_ml(runner, *, prefix='/api/v1/ml', tags=None)

Enable ML train/predict endpoints with model runner.

Source code in src/chapkit/api/service_builder.py
def with_ml(
    self,
    runner: ModelRunnerProtocol,
    *,
    prefix: str = "/api/v1/ml",
    tags: List[str] | None = None,
) -> Self:
    """Enable ML train/predict endpoints with model runner."""
    resolved_tags = list(tags) if tags else ["ML"]
    self._ml_options = _MLOptions(runner=runner, prefix=prefix, tags=resolved_tags)
    return self

MLServiceBuilder

Specialized builder for machine learning services.

MLServiceBuilder

Bases: ServiceBuilder

Specialized service builder for ML services with all required components pre-configured.

Source code in src/chapkit/api/service_builder.py
class MLServiceBuilder(ServiceBuilder):
    """Specialized service builder for ML services with all required components pre-configured."""

    def __init__(
        self,
        *,
        info: ServiceInfo | MLServiceInfo,
        config_schema: type[BaseConfig],
        hierarchy: ArtifactHierarchy,
        runner: ModelRunnerProtocol,
        database_url: str = "sqlite+aiosqlite:///:memory:",
        include_error_handlers: bool = True,
        include_logging: bool = True,
    ) -> None:
        """Initialize ML service builder with required ML components.

        Args:
            info: Service info forwarded to the base builder.
            config_schema: Schema handed to ``with_config()``.
            hierarchy: Artifact hierarchy handed to ``with_artifacts()``.
            runner: Model runner handed to ``with_ml()``.
            database_url: Forwarded to the base builder (defaults to in-memory SQLite).
            include_error_handlers: Forwarded to the base builder.
            include_logging: Forwarded to the base builder.
        """
        super().__init__(
            info=info,
            database_url=database_url,
            include_error_handlers=include_error_handlers,
            include_logging=include_logging,
        )

        # Automatically configure required ML components.
        # with_ml() requires config, artifacts, and jobs to be enabled, so
        # those are wired first (see _validate_module_configuration).
        self.with_health()
        self.with_system()
        self.with_config(config_schema)
        self.with_artifacts(hierarchy=hierarchy, enable_config_linking=True)
        self.with_jobs()
        self.with_landing_page()
        self.with_ml(runner=runner)

__init__(*, info, config_schema, hierarchy, runner, database_url='sqlite+aiosqlite:///:memory:', include_error_handlers=True, include_logging=True)

Initialize ML service builder with required ML components.

Source code in src/chapkit/api/service_builder.py
def __init__(
    self,
    *,
    info: ServiceInfo | MLServiceInfo,
    config_schema: type[BaseConfig],
    hierarchy: ArtifactHierarchy,
    runner: ModelRunnerProtocol,
    database_url: str = "sqlite+aiosqlite:///:memory:",
    include_error_handlers: bool = True,
    include_logging: bool = True,
) -> None:
    """Initialize ML service builder with required ML components.

    Args:
        info: Service info forwarded to the base builder.
        config_schema: Schema handed to ``with_config()``.
        hierarchy: Artifact hierarchy handed to ``with_artifacts()``.
        runner: Model runner handed to ``with_ml()``.
        database_url: Forwarded to the base builder (defaults to in-memory SQLite).
        include_error_handlers: Forwarded to the base builder.
        include_logging: Forwarded to the base builder.
    """
    super().__init__(
        info=info,
        database_url=database_url,
        include_error_handlers=include_error_handlers,
        include_logging=include_logging,
    )

    # Automatically configure required ML components; config, artifacts,
    # and jobs are prerequisites that with_ml() validates against.
    self.with_health()
    self.with_system()
    self.with_config(config_schema)
    self.with_artifacts(hierarchy=hierarchy, enable_config_linking=True)
    self.with_jobs()
    self.with_landing_page()
    self.with_ml(runner=runner)

Dependencies

Application-level dependency injection functions.

dependencies

Feature-specific FastAPI dependency injection for managers.

get_config_manager(session) async

Get a config manager instance for dependency injection.

Source code in src/chapkit/api/dependencies.py
async def get_config_manager(session: Annotated[AsyncSession, Depends(get_session)]) -> ConfigManager[BaseConfig]:
    """Build a ConfigManager bound to the request-scoped session."""
    return ConfigManager[BaseConfig](ConfigRepository(session), BaseConfig)

get_artifact_manager(session) async

Get an artifact manager instance for dependency injection.

Source code in src/chapkit/api/dependencies.py
async def get_artifact_manager(session: Annotated[AsyncSession, Depends(get_session)]) -> ArtifactManager:
    """Build an ArtifactManager (with config repo attached) for this request."""
    artifact_repo = ArtifactRepository(session)
    return ArtifactManager(artifact_repo, config_repo=ConfigRepository(session))

get_task_manager(session, artifact_manager) async

Get a task manager instance for dependency injection.

Source code in src/chapkit/api/dependencies.py
async def get_task_manager(
    session: Annotated[AsyncSession, Depends(get_session)],
    artifact_manager: Annotated[ArtifactManager, Depends(get_artifact_manager)],
) -> TaskManager:
    """Build a TaskManager, tolerating absent scheduler/database singletons."""
    from chapkit.core import Database
    from chapkit.core.scheduler import JobScheduler

    repo = TaskRepository(session)

    # Scheduler and database are optional app-lifecycle singletons; the
    # accessors raise RuntimeError when they were never installed, which
    # simply means the manager runs without them.
    scheduler: JobScheduler | None = None
    try:
        scheduler = get_scheduler()
    except RuntimeError:
        pass

    database: Database | None = None
    try:
        database = get_database()
    except RuntimeError:
        pass

    return TaskManager(repo, scheduler, database, artifact_manager)

get_ml_manager() async

Get an ML manager instance for dependency injection.

Note: This is a placeholder. The actual dependency is built by ServiceBuilder with the runner in closure, then overridden via app.dependency_overrides.

Source code in src/chapkit/api/dependencies.py
async def get_ml_manager() -> MLManager:
    """Placeholder ML manager dependency.

    ServiceBuilder.with_ml() installs the real factory (runner captured in
    a closure) via app.dependency_overrides; reaching this body means ML
    operations were never enabled.
    """
    raise RuntimeError("ML manager dependency not configured. Use ServiceBuilder.with_ml() to enable ML operations.")

Domain Modules

Vertical slice modules with complete functionality.

Config Module

Key-value configuration with JSON data support.

config

Config feature - key-value configuration with JSON data storage.

ConfigManager

Bases: BaseManager[Config, ConfigIn[DataT], ConfigOut[DataT], ULID]

Manager for Config entities with artifact linking operations.

Source code in src/chapkit/modules/config/manager.py
class ConfigManager[DataT: BaseConfig](BaseManager[Config, ConfigIn[DataT], ConfigOut[DataT], ULID]):
    """Manager for Config entities with artifact linking operations."""

    def __init__(self, repo: ConfigRepository, data_cls: type[DataT]) -> None:
        """Initialize config manager with repository and data class."""
        super().__init__(repo, Config, ConfigOut)
        self.repo: ConfigRepository = repo
        self.data_cls = data_cls

    async def find_by_name(self, name: str) -> ConfigOut[DataT] | None:
        """Find a config by its unique name."""
        config = await self.repo.find_by_name(name)
        if config:
            return self._to_output_schema(config)
        return None

    async def link_artifact(self, config_id: ULID, artifact_id: ULID) -> None:
        """Link a config to a root artifact."""
        await self.repo.link_artifact(config_id, artifact_id)
        await self.repo.commit()

    async def unlink_artifact(self, artifact_id: ULID) -> None:
        """Unlink an artifact from its config."""
        await self.repo.unlink_artifact(artifact_id)
        await self.repo.commit()

    async def get_config_for_artifact(
        self, artifact_id: ULID, artifact_repo: ArtifactRepository
    ) -> ConfigOut[DataT] | None:
        """Get the config for an artifact by traversing to its root."""
        root = await artifact_repo.get_root_artifact(artifact_id)
        if root is None:
            return None

        config = await self.repo.find_by_root_artifact_id(root.id)
        if config is None:
            return None

        return self._to_output_schema(config)

    async def get_linked_artifacts(self, config_id: ULID) -> list[ArtifactOut]:
        """Get all root artifacts linked to a config."""
        artifacts = await self.repo.find_artifacts_for_config(config_id)
        return [ArtifactOut.model_validate(artifact, from_attributes=True) for artifact in artifacts]

    def _to_output_schema(self, entity: Config) -> ConfigOut[DataT]:
        """Convert ORM entity to output schema with proper data class validation."""
        return ConfigOut[DataT].model_validate(entity, from_attributes=True, context={"data_cls": self.data_cls})

__init__(repo, data_cls)

Initialize config manager with repository and data class.

Source code in src/chapkit/modules/config/manager.py
def __init__(self, repo: ConfigRepository, data_cls: type[DataT]) -> None:
    """Wire the manager to its repository and payload validation class."""
    super().__init__(repo, Config, ConfigOut)
    self.repo: ConfigRepository = repo
    # Pydantic model used to validate the stored JSON payload.
    self.data_cls = data_cls

find_by_name(name) async

Find a config by its unique name.

Source code in src/chapkit/modules/config/manager.py
async def find_by_name(self, name: str) -> ConfigOut[DataT] | None:
    """Return the config named *name* as an output schema, or None."""
    entity = await self.repo.find_by_name(name)
    return self._to_output_schema(entity) if entity else None

link_artifact(config_id, artifact_id) async

Link a config to a root artifact.

Source code in src/chapkit/modules/config/manager.py
async def link_artifact(self, config_id: ULID, artifact_id: ULID) -> None:
    """Attach a root artifact to a config and commit immediately."""
    repo = self.repo
    await repo.link_artifact(config_id, artifact_id)
    await repo.commit()

unlink_artifact(artifact_id) async

Unlink an artifact from its config.

Source code in src/chapkit/modules/config/manager.py
async def unlink_artifact(self, artifact_id: ULID) -> None:
    """Drop the config link for *artifact_id* and commit immediately."""
    repo = self.repo
    await repo.unlink_artifact(artifact_id)
    await repo.commit()

get_config_for_artifact(artifact_id, artifact_repo) async

Get the config for an artifact by traversing to its root.

Source code in src/chapkit/modules/config/manager.py
async def get_config_for_artifact(
    self, artifact_id: ULID, artifact_repo: ArtifactRepository
) -> ConfigOut[DataT] | None:
    """Walk up to the artifact's root and return the config linked there, if any."""
    root = await artifact_repo.get_root_artifact(artifact_id)
    if root is None:
        return None
    entity = await self.repo.find_by_root_artifact_id(root.id)
    return None if entity is None else self._to_output_schema(entity)

get_linked_artifacts(config_id) async

Get all root artifacts linked to a config.

Source code in src/chapkit/modules/config/manager.py
async def get_linked_artifacts(self, config_id: ULID) -> list[ArtifactOut]:
    """Return every root artifact linked to *config_id* as ArtifactOut schemas."""
    rows = await self.repo.find_artifacts_for_config(config_id)
    return [ArtifactOut.model_validate(row, from_attributes=True) for row in rows]

Config

Bases: Entity

ORM model for configuration with JSON data storage.

Source code in src/chapkit/modules/config/models.py
class Config(Entity):
    """ORM model for configuration with JSON data storage."""

    __tablename__ = "configs"

    # NOTE(review): other parts of the codebase treat `name` as unique
    # (find_by_name uses one_or_none), but only an index is declared here —
    # confirm whether a DB-level unique constraint is intended.
    name: Mapped[str] = mapped_column(index=True)
    # Raw JSON payload column (named "data" in the table); exposed through
    # the `data` property below so assignment accepts Pydantic models too.
    _data_json: Mapped[dict[str, Any]] = mapped_column("data", JSON, nullable=False)

    @property
    def data(self) -> dict[str, Any]:
        """Return JSON data as dict."""
        return self._data_json

    @data.setter
    def data(self, value: BaseConfig | dict[str, Any]) -> None:
        """Serialize Pydantic model to JSON or store dict directly."""
        if isinstance(value, dict):
            self._data_json = value
        elif hasattr(value, "model_dump") and callable(value.model_dump):
            # BaseConfig or other Pydantic model
            # (JSON-mode dump so nested values become JSON-safe primitives)
            self._data_json = value.model_dump(mode="json")
        else:
            raise TypeError(f"data must be a BaseConfig subclass or dict, got {type(value)}")

data property writable

Return JSON data as dict.

ConfigArtifact

Bases: Base

Junction table linking Configs to root Artifacts.

Source code in src/chapkit/modules/config/models.py
class ConfigArtifact(Base):
    """Junction table linking Configs to root Artifacts.

    Composite primary key (config_id, artifact_id); the named unique
    constraint on artifact_id ensures an artifact is linked to at most
    one config.
    """

    __tablename__ = "config_artifacts"

    # Deleting a config removes its link rows via ON DELETE CASCADE.
    config_id: Mapped[ULID] = mapped_column(
        ULIDType,
        ForeignKey("configs.id", ondelete="CASCADE"),
        primary_key=True,
    )

    # Uniqueness is enforced by uq_artifact_id below; the previous inline
    # `unique=True` on this column duplicated that constraint and emitted a
    # second unique index in the generated DDL.
    artifact_id: Mapped[ULID] = mapped_column(
        ULIDType,
        ForeignKey("artifacts.id", ondelete="CASCADE"),
        primary_key=True,
    )

    __table_args__ = (UniqueConstraint("artifact_id", name="uq_artifact_id"),)

ConfigRepository

Bases: BaseRepository[Config, ULID]

Repository for Config entities with artifact linking operations.

Source code in src/chapkit/modules/config/repository.py
class ConfigRepository(BaseRepository[Config, ULID]):
    """Repository for Config entities with artifact linking operations."""

    def __init__(self, session: AsyncSession) -> None:
        """Bind this repository to *session*, scoped to the Config model."""
        super().__init__(session, Config)

    async def find_by_name(self, name: str) -> Config | None:
        """Fetch the config whose name equals *name*, or None."""
        query = select(self.model).where(self.model.name == name)
        return (await self.s.scalars(query)).one_or_none()

    async def link_artifact(self, config_id: ULID, artifact_id: ULID) -> None:
        """Create a link row after validating the artifact exists and is a root."""
        artifact = await self.s.get(Artifact, artifact_id)
        if artifact is None:
            raise ValueError(f"Artifact {artifact_id} not found")
        if artifact.parent_id is not None:
            raise ValueError(f"Artifact {artifact_id} is not a root artifact (parent_id={artifact.parent_id})")

        self.s.add(ConfigArtifact(config_id=config_id, artifact_id=artifact_id))

    async def unlink_artifact(self, artifact_id: ULID) -> None:
        """Remove the config link row for *artifact_id*, if present."""
        await self.s.execute(sql_delete(ConfigArtifact).where(ConfigArtifact.artifact_id == artifact_id))

    async def delete_by_id(self, id: ULID) -> None:
        """Delete a config and cascade delete all linked artifact trees."""
        from chapkit.modules.artifact.repository import ArtifactRepository

        artifact_repo = ArtifactRepository(self.s)
        # Every node in each linked root's subtree must go before the
        # config row itself is removed.
        for root in await self.find_artifacts_for_config(id):
            for node in await artifact_repo.find_subtree(root.id):
                await self.s.delete(node)

        await super().delete_by_id(id)

    async def find_by_root_artifact_id(self, artifact_id: ULID) -> Config | None:
        """Find the config linked to a root artifact."""
        query = (
            select(Config)
            .join(ConfigArtifact, Config.id == ConfigArtifact.config_id)
            .where(ConfigArtifact.artifact_id == artifact_id)
        )
        return (await self.s.scalars(query)).one_or_none()

    async def find_artifacts_for_config(self, config_id: ULID) -> list[Artifact]:
        """Find all root artifacts linked to a config."""
        query = (
            select(Artifact)
            .join(ConfigArtifact, Artifact.id == ConfigArtifact.artifact_id)
            .where(ConfigArtifact.config_id == config_id)
        )
        return list((await self.s.scalars(query)).all())

__init__(session)

Initialize config repository with database session.

Source code in src/chapkit/modules/config/repository.py
def __init__(self, session: AsyncSession) -> None:
    """Bind this repository to *session*, scoped to the Config model."""
    super().__init__(session, Config)

find_by_name(name) async

Find a config by its unique name.

Source code in src/chapkit/modules/config/repository.py
async def find_by_name(self, name: str) -> Config | None:
    """Fetch the config whose name equals *name*, or None."""
    query = select(self.model).where(self.model.name == name)
    return (await self.s.scalars(query)).one_or_none()

link_artifact(config_id, artifact_id) async

Link a config to a root artifact.

Source code in src/chapkit/modules/config/repository.py
async def link_artifact(self, config_id: ULID, artifact_id: ULID) -> None:
    """Link a config to a root artifact."""
    # Validate the artifact exists and is a tree root before creating the link.
    target = await self.s.get(Artifact, artifact_id)
    if target is None:
        raise ValueError(f"Artifact {artifact_id} not found")
    if target.parent_id is not None:
        raise ValueError(f"Artifact {artifact_id} is not a root artifact (parent_id={target.parent_id})")

    self.s.add(ConfigArtifact(config_id=config_id, artifact_id=artifact_id))

unlink_artifact(artifact_id) async

Unlink an artifact from its config.

Source code in src/chapkit/modules/config/repository.py
async def unlink_artifact(self, artifact_id: ULID) -> None:
    """Unlink an artifact from its config."""
    # Bulk-delete any association rows referencing this artifact.
    await self.s.execute(
        sql_delete(ConfigArtifact).where(ConfigArtifact.artifact_id == artifact_id)
    )

delete_by_id(id) async

Delete a config and cascade delete all linked artifact trees.

Source code in src/chapkit/modules/config/repository.py
async def delete_by_id(self, id: ULID) -> None:
    """Delete a config and cascade delete all linked artifact trees."""
    # Imported locally to avoid a circular import between config and artifact modules.
    from chapkit.modules.artifact.repository import ArtifactRepository

    roots = await self.find_artifacts_for_config(id)
    artifact_repo = ArtifactRepository(self.s)

    # Remove every node of every linked artifact tree before the config itself.
    for root in roots:
        for node in await artifact_repo.find_subtree(root.id):
            await self.s.delete(node)

    await super().delete_by_id(id)

find_by_root_artifact_id(artifact_id) async

Find the config linked to a root artifact.

Source code in src/chapkit/modules/config/repository.py
async def find_by_root_artifact_id(self, artifact_id: ULID) -> Config | None:
    """Find the config linked to a root artifact."""
    # Join through the ConfigArtifact association table; one_or_none assumes at
    # most one config is linked per root artifact.
    stmt = (
        select(Config)
        .join(ConfigArtifact, Config.id == ConfigArtifact.config_id)
        .where(ConfigArtifact.artifact_id == artifact_id)
    )
    result = await self.s.scalars(stmt)
    return result.one_or_none()

find_artifacts_for_config(config_id) async

Find all root artifacts linked to a config.

Source code in src/chapkit/modules/config/repository.py
async def find_artifacts_for_config(self, config_id: ULID) -> list[Artifact]:
    """Find all root artifacts linked to a config."""
    # Only root artifacts appear here: link_artifact rejects non-root artifacts,
    # so association rows always point at roots.
    stmt = (
        select(Artifact)
        .join(ConfigArtifact, Artifact.id == ConfigArtifact.artifact_id)
        .where(ConfigArtifact.config_id == config_id)
    )
    result = await self.s.scalars(stmt)
    return list(result.all())

ConfigRouter

Bases: CrudRouter[ConfigIn[BaseConfig], ConfigOut[BaseConfig]]

CRUD router for Config entities with artifact linking operations.

Source code in src/chapkit/modules/config/router.py
class ConfigRouter(CrudRouter[ConfigIn[BaseConfig], ConfigOut[BaseConfig]]):
    """CRUD router for Config entities with artifact linking operations.

    In addition to the base CRUD routes, this router can expose three
    per-entity operations (link-artifact, unlink-artifact, artifacts) when
    ``enable_artifact_operations`` is True.
    """

    def __init__(
        self,
        prefix: str,
        tags: Sequence[str],
        manager_factory: Any,
        entity_in_type: type[ConfigIn[BaseConfig]],
        entity_out_type: type[ConfigOut[BaseConfig]],
        permissions: CrudPermissions | None = None,
        enable_artifact_operations: bool = False,
        **kwargs: Any,
    ) -> None:
        """Initialize config router with entity types and manager factory."""
        # Set before super().__init__ so the flag is available when routes are registered.
        self.enable_artifact_operations = enable_artifact_operations
        super().__init__(
            prefix=prefix,
            tags=list(tags),
            entity_in_type=entity_in_type,
            entity_out_type=entity_out_type,
            manager_factory=manager_factory,
            permissions=permissions,
            **kwargs,
        )

    def _register_routes(self) -> None:
        """Register config CRUD routes and artifact linking operations."""
        super()._register_routes()

        if not self.enable_artifact_operations:
            return

        manager_factory = self.manager_factory

        async def link_artifact(
            entity_id: str,
            request: LinkArtifactRequest,
            manager: ConfigManager[BaseConfig] = Depends(manager_factory),
        ) -> None:
            config_id = self._parse_ulid(entity_id)

            try:
                await manager.link_artifact(config_id, request.artifact_id)
            except ValueError as e:
                # Chain the cause (PEP 3134) so logs show the original validation error.
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=str(e),
                ) from e

        async def unlink_artifact(
            entity_id: str,
            request: UnlinkArtifactRequest,
            manager: ConfigManager[BaseConfig] = Depends(manager_factory),
        ) -> None:
            # entity_id is part of the route path; unlinking itself only needs the artifact id.
            try:
                await manager.unlink_artifact(request.artifact_id)
            except Exception as e:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=str(e),
                ) from e

        async def get_linked_artifacts(
            entity_id: str,
            manager: ConfigManager[BaseConfig] = Depends(manager_factory),
        ) -> list[ArtifactOut]:
            config_id = self._parse_ulid(entity_id)
            return await manager.get_linked_artifacts(config_id)

        self.register_entity_operation(
            "link-artifact",
            link_artifact,
            http_method="POST",
            status_code=status.HTTP_204_NO_CONTENT,
            summary="Link artifact to config",
            description="Link a config to a root artifact (parent_id IS NULL)",
        )

        self.register_entity_operation(
            "unlink-artifact",
            unlink_artifact,
            http_method="POST",
            status_code=status.HTTP_204_NO_CONTENT,
            summary="Unlink artifact from config",
            description="Remove the link between a config and an artifact",
        )

        self.register_entity_operation(
            "artifacts",
            get_linked_artifacts,
            http_method="GET",
            response_model=list[ArtifactOut],
            summary="Get linked artifacts",
            description="Get all root artifacts linked to this config",
        )

__init__(prefix, tags, manager_factory, entity_in_type, entity_out_type, permissions=None, enable_artifact_operations=False, **kwargs)

Initialize config router with entity types and manager factory.

Source code in src/chapkit/modules/config/router.py
def __init__(
    self,
    prefix: str,
    tags: Sequence[str],
    manager_factory: Any,
    entity_in_type: type[ConfigIn[BaseConfig]],
    entity_out_type: type[ConfigOut[BaseConfig]],
    permissions: CrudPermissions | None = None,
    enable_artifact_operations: bool = False,
    **kwargs: Any,
) -> None:
    """Initialize config router with entity types and manager factory."""
    # Set before super().__init__ so the flag is available when routes are registered.
    self.enable_artifact_operations = enable_artifact_operations
    super().__init__(
        prefix=prefix,
        tags=list(tags),
        entity_in_type=entity_in_type,
        entity_out_type=entity_out_type,
        manager_factory=manager_factory,
        permissions=permissions,
        **kwargs,
    )

BaseConfig

Bases: BaseModel

Base class for configuration schemas with arbitrary extra fields allowed.

Source code in src/chapkit/modules/config/schemas.py
class BaseConfig(BaseModel):
    """Base class for configuration schemas with arbitrary extra fields allowed."""

    # "extra": "allow" keeps unknown keys so user-defined config fields round-trip.
    model_config = {"extra": "allow"}

ConfigIn

Bases: EntityIn

Input schema for creating or updating configurations.

Source code in src/chapkit/modules/config/schemas.py
class ConfigIn[DataT: BaseConfig](EntityIn):
    """Input schema for creating or updating configurations.

    DataT is the concrete BaseConfig subclass carried in ``data``.
    """

    # Unique config name (ConfigRepository.find_by_name looks configs up by it).
    name: str
    data: DataT

ConfigOut

Bases: EntityOut

Output schema for configuration entities.

Source code in src/chapkit/modules/config/schemas.py
class ConfigOut[DataT: BaseConfig](EntityOut):
    """Output schema for configuration entities."""

    name: str
    data: DataT

    # Serialize timedeltas as floats and bytes as base64 so JSON output stays portable.
    model_config = {"ser_json_timedelta": "float", "ser_json_bytes": "base64"}

    @field_validator("data", mode="before")
    @classmethod
    def convert_dict_to_model(cls, v: Any, info: ValidationInfo) -> Any:
        """Convert dict to BaseConfig model if data_cls is provided in validation context."""
        if isinstance(v, BaseConfig):
            return v
        if isinstance(v, dict):
            # Callers may pass the concrete schema via validation context,
            # e.g. model_validate(..., context={"data_cls": MyConfig}).
            if info.context and "data_cls" in info.context:
                data_cls = info.context["data_cls"]
                return data_cls.model_validate(v)
        return v

    @field_serializer("data", when_used="json")
    def serialize_data(self, value: DataT) -> dict[str, Any]:
        """Serialize BaseConfig data to JSON dict."""
        if isinstance(value, BaseConfig):  # pyright: ignore[reportUnnecessaryIsInstance]
            return value.model_dump(mode="json")
        return value

convert_dict_to_model(v, info) classmethod

Convert dict to BaseConfig model if data_cls is provided in validation context.

Source code in src/chapkit/modules/config/schemas.py
@field_validator("data", mode="before")
@classmethod
def convert_dict_to_model(cls, v: Any, info: ValidationInfo) -> Any:
    """Convert dict to BaseConfig model if data_cls is provided in validation context."""
    if isinstance(v, BaseConfig):
        return v
    if isinstance(v, dict):
        # Callers may pass the concrete schema via validation context,
        # e.g. model_validate(..., context={"data_cls": MyConfig}).
        if info.context and "data_cls" in info.context:
            data_cls = info.context["data_cls"]
            return data_cls.model_validate(v)
    return v

serialize_data(value)

Serialize BaseConfig data to JSON dict.

Source code in src/chapkit/modules/config/schemas.py
@field_serializer("data", when_used="json")
def serialize_data(self, value: DataT) -> dict[str, Any]:
    """Serialize BaseConfig data to JSON dict."""
    # Non-BaseConfig values are passed through unchanged (guarded for safety at runtime).
    if isinstance(value, BaseConfig):  # pyright: ignore[reportUnnecessaryIsInstance]
        return value.model_dump(mode="json")
    return value

LinkArtifactRequest

Bases: BaseModel

Request schema for linking an artifact to a config.

Source code in src/chapkit/modules/config/schemas.py
class LinkArtifactRequest(BaseModel):
    """Request schema for linking an artifact to a config."""

    # ID of the root artifact to link; non-root artifacts are rejected by the repository.
    artifact_id: ULID

UnlinkArtifactRequest

Bases: BaseModel

Request schema for unlinking an artifact from a config.

Source code in src/chapkit/modules/config/schemas.py
class UnlinkArtifactRequest(BaseModel):
    """Request schema for unlinking an artifact from a config."""

    # ID of the artifact whose config link should be removed.
    artifact_id: ULID

Artifact Module

Hierarchical artifact tree storage.

artifact

Artifact feature - hierarchical data storage with parent-child relationships.

ArtifactManager

Bases: BaseManager[Artifact, ArtifactIn, ArtifactOut, ULID]

Manager for Artifact entities with hierarchical tree operations.

Source code in src/chapkit/modules/artifact/manager.py
class ArtifactManager(BaseManager[Artifact, ArtifactIn, ArtifactOut, ULID]):
    """Manager for Artifact entities with hierarchical tree operations."""

    def __init__(
        self,
        repo: ArtifactRepository,
        hierarchy: ArtifactHierarchy | None = None,
        config_repo: ConfigRepository | None = None,
    ) -> None:
        """Initialize artifact manager with repository, hierarchy, and optional config repo."""
        super().__init__(repo, Artifact, ArtifactOut)
        self.repo: ArtifactRepository = repo
        self.hierarchy = hierarchy
        self.config_repo = config_repo

    # Public API ------------------------------------------------------

    async def find_subtree(self, start_id: ULID) -> list[ArtifactTreeNode]:
        """Find all artifacts in the subtree rooted at the given ID."""
        artifacts = await self.repo.find_subtree(start_id)
        return [self._to_tree_node(artifact) for artifact in artifacts]

    async def expand_artifact(self, artifact_id: ULID) -> ArtifactTreeNode | None:
        """Expand a single artifact with hierarchy metadata but without children."""
        artifact = await self.repo.find_by_id(artifact_id)
        if artifact is None:
            return None

        node = self._to_tree_node(artifact)
        # children=None means "not loaded" (build_tree uses [] for "loaded but empty").
        node.children = None
        node.config = await self._config_out_for_root(artifact_id)
        return node

    async def build_tree(self, start_id: ULID) -> ArtifactTreeNode | None:
        """Build a hierarchical tree structure rooted at the given artifact ID."""
        artifacts = await self.find_subtree(start_id)
        if not artifacts:
            return None

        node_map: dict[ULID, ArtifactTreeNode] = {}
        for node in artifacts:
            node.children = []
            node_map[node.id] = node

        for node in artifacts:
            if node.parent_id is None:
                continue
            parent = node_map.get(node.parent_id)
            if parent is None:
                continue
            if parent.children is None:
                parent.children = []
            parent.children.append(node)

        # Keep children as [] for leaf nodes (semantic: "loaded but empty")
        # Only expand_artifact sets children=None (semantic: "not loaded")

        root = node_map.get(start_id)

        # Populate config for root node only
        if root is not None:
            root.config = await self._config_out_for_root(start_id)

        return root

    # Lifecycle overrides --------------------------------------------

    def _should_assign_field(self, field: str, value: object) -> bool:
        """Prevent assigning None to level field during updates."""
        if field == "level" and value is None:
            return False
        return super()._should_assign_field(field, value)

    async def pre_save(self, entity: Artifact, data: ArtifactIn) -> None:
        """Compute and set artifact level before saving."""
        entity.level = await self._compute_level(entity.parent_id)

    async def pre_update(self, entity: Artifact, data: ArtifactIn, old_values: dict[str, object]) -> None:
        """Recalculate artifact level and cascade updates to descendants if parent changed."""
        previous_level = old_values.get("level", entity.level)
        entity.level = await self._compute_level(entity.parent_id)
        parent_changed = old_values.get("parent_id") != entity.parent_id
        if parent_changed or previous_level != entity.level:
            await self._recalculate_descendants(entity)

    # Helper utilities ------------------------------------------------

    async def _config_out_for_root(self, artifact_id: ULID) -> ConfigOut[BaseConfig] | None:
        """Return the config linked to a root artifact as a ConfigOut, or None.

        Returns None when no config repository is configured or when no config
        is linked to the given artifact.
        """
        if self.config_repo is None:
            return None
        config = await self.config_repo.find_by_root_artifact_id(artifact_id)
        if config is None:
            return None
        # Use model_construct to bypass validation since we don't know the concrete data type
        return ConfigOut[BaseConfig].model_construct(
            id=config.id,
            created_at=config.created_at,
            updated_at=config.updated_at,
            name=config.name,
            data=config.data,
        )

    async def _compute_level(self, parent_id: ULID | None) -> int:
        """Compute the level of an artifact based on its parent."""
        if parent_id is None:
            return 0
        parent = await self.repo.find_by_id(parent_id)
        if parent is None:
            return 0  # pragma: no cover
        return parent.level + 1

    async def _recalculate_descendants(self, entity: Artifact) -> None:
        """Recalculate levels for all descendants of an artifact via BFS from the entity."""
        subtree = await self.repo.find_subtree(entity.id)
        by_parent: dict[ULID | None, list[Artifact]] = {}
        for node in subtree:
            by_parent.setdefault(node.parent_id, []).append(node)

        queue: deque[Artifact] = deque([entity])
        while queue:
            current = queue.popleft()
            for child in by_parent.get(current.id, []):
                child.level = current.level + 1
                queue.append(child)

    def _to_tree_node(self, entity: Artifact) -> ArtifactTreeNode:
        """Convert artifact entity to tree node with hierarchy metadata."""
        base = super()._to_output_schema(entity)
        node = ArtifactTreeNode.from_artifact(base)
        if self.hierarchy is not None:
            meta = self.hierarchy.describe(node.level)
            hierarchy_value = meta.get(self.hierarchy.hierarchy_key)
            if hierarchy_value is not None:
                node.hierarchy = str(hierarchy_value)
            label_value = meta.get(self.hierarchy.label_key)
            if label_value is not None:
                node.level_label = str(label_value)

        return node

__init__(repo, hierarchy=None, config_repo=None)

Initialize artifact manager with repository, hierarchy, and optional config repo.

Source code in src/chapkit/modules/artifact/manager.py
def __init__(
    self,
    repo: ArtifactRepository,
    hierarchy: ArtifactHierarchy | None = None,
    config_repo: ConfigRepository | None = None,
) -> None:
    """Initialize artifact manager with repository, hierarchy, and optional config repo."""
    super().__init__(repo, Artifact, ArtifactOut)
    # Re-annotate repo with the concrete repository type for tree operations.
    self.repo: ArtifactRepository = repo
    self.hierarchy = hierarchy
    self.config_repo = config_repo

find_subtree(start_id) async

Find all artifacts in the subtree rooted at the given ID.

Source code in src/chapkit/modules/artifact/manager.py
async def find_subtree(self, start_id: ULID) -> list[ArtifactTreeNode]:
    """Find all artifacts in the subtree rooted at the given ID."""
    # Each ORM row is converted to a tree node enriched with hierarchy metadata.
    artifacts = await self.repo.find_subtree(start_id)
    return [self._to_tree_node(artifact) for artifact in artifacts]

expand_artifact(artifact_id) async

Expand a single artifact with hierarchy metadata but without children.

Source code in src/chapkit/modules/artifact/manager.py
async def expand_artifact(self, artifact_id: ULID) -> ArtifactTreeNode | None:
    """Expand a single artifact with hierarchy metadata but without children."""
    artifact = await self.repo.find_by_id(artifact_id)
    if artifact is None:
        return None

    node = self._to_tree_node(artifact)
    # children=None means "not loaded" (build_tree uses [] for "loaded but empty").
    node.children = None

    # Populate config if available and artifact is a root
    if self.config_repo is not None:
        config = await self.config_repo.find_by_root_artifact_id(artifact_id)
        if config is not None:
            # Use model_construct to bypass validation since we don't know the concrete data type
            node.config = ConfigOut[BaseConfig].model_construct(
                id=config.id,
                created_at=config.created_at,
                updated_at=config.updated_at,
                name=config.name,
                data=config.data,
            )

    return node

build_tree(start_id) async

Build a hierarchical tree structure rooted at the given artifact ID.

Source code in src/chapkit/modules/artifact/manager.py
async def build_tree(self, start_id: ULID) -> ArtifactTreeNode | None:
    """Build a hierarchical tree structure rooted at the given artifact ID."""
    artifacts = await self.find_subtree(start_id)
    if not artifacts:
        return None

    # First pass: index every node by id and mark children as loaded-but-empty.
    node_map: dict[ULID, ArtifactTreeNode] = {}
    for node in artifacts:
        node.children = []
        node_map[node.id] = node

    # Second pass: attach each node to its parent when the parent is in the subtree.
    for node in artifacts:
        if node.parent_id is None:
            continue
        parent = node_map.get(node.parent_id)
        if parent is None:
            continue
        if parent.children is None:
            parent.children = []
        parent.children.append(node)

    # Keep children as [] for leaf nodes (semantic: "loaded but empty")
    # Only expand_artifact sets children=None (semantic: "not loaded")

    root = node_map.get(start_id)

    # Populate config for root node only
    if root is not None and self.config_repo is not None:
        config = await self.config_repo.find_by_root_artifact_id(start_id)
        if config is not None:
            # Use model_construct to bypass validation since we don't know the concrete data type
            root.config = ConfigOut[BaseConfig].model_construct(
                id=config.id,
                created_at=config.created_at,
                updated_at=config.updated_at,
                name=config.name,
                data=config.data,
            )

    return root

pre_save(entity, data) async

Compute and set artifact level before saving.

Source code in src/chapkit/modules/artifact/manager.py
async def pre_save(self, entity: Artifact, data: ArtifactIn) -> None:
    """Compute and set artifact level before saving."""
    # Level is derived from the parent chain, never taken from client input.
    entity.level = await self._compute_level(entity.parent_id)

pre_update(entity, data, old_values) async

Recalculate artifact level and cascade updates to descendants if parent changed.

Source code in src/chapkit/modules/artifact/manager.py
async def pre_update(self, entity: Artifact, data: ArtifactIn, old_values: dict[str, object]) -> None:
    """Recalculate artifact level and cascade updates to descendants if parent changed."""
    previous_level = old_values.get("level", entity.level)
    entity.level = await self._compute_level(entity.parent_id)
    parent_changed = old_values.get("parent_id") != entity.parent_id
    # Descendant levels only shift when this node was re-parented or its depth changed.
    if parent_changed or previous_level != entity.level:
        await self._recalculate_descendants(entity)

Artifact

Bases: Entity

ORM model for hierarchical artifacts with parent-child relationships.

Source code in src/chapkit/modules/artifact/models.py
class Artifact(Entity):
    """ORM model for hierarchical artifacts with parent-child relationships."""

    __tablename__ = "artifacts"

    # Self-referential FK; ON DELETE SET NULL turns children of a deleted node into roots.
    parent_id: Mapped[ULID | None] = mapped_column(
        ULIDType,
        ForeignKey("artifacts.id", ondelete="SET NULL"),
        nullable=True,
        index=True,
    )

    parent: Mapped[Artifact | None] = relationship(
        remote_side="Artifact.id",
        back_populates="children",
    )

    children: Mapped[list[Artifact]] = relationship(
        back_populates="parent",
    )

    # NOTE(review): PickleType unpickles arbitrary payloads — only safe for trusted data.
    data: Mapped[Any] = mapped_column(PickleType(protocol=4), nullable=False)
    # Depth in the tree (0 for roots); maintained by ArtifactManager.pre_save/pre_update.
    level: Mapped[int] = mapped_column(default=0, nullable=False, index=True)

ArtifactRepository

Bases: BaseRepository[Artifact, ULID]

Repository for Artifact entities with tree traversal operations.

Source code in src/chapkit/modules/artifact/repository.py
class ArtifactRepository(BaseRepository[Artifact, ULID]):
    """Repository for Artifact entities with tree traversal operations."""

    def __init__(self, session: AsyncSession) -> None:
        """Initialize artifact repository with database session."""
        super().__init__(session, Artifact)

    async def find_by_id(self, id: ULID) -> Artifact | None:
        """Find an artifact by ID with children eagerly loaded."""
        # selectinload avoids lazy-loading children later on a possibly-closed session.
        return await self.s.get(self.model, id, options=[selectinload(self.model.children)])

    async def find_subtree(self, start_id: ULID) -> Iterable[Artifact]:
        """Find all artifacts in the subtree rooted at the given ID using recursive CTE."""
        # Recursive CTE: seed with start_id, then repeatedly add direct children.
        cte = select(self.model.id).where(self.model.id == start_id).cte(name="descendants", recursive=True)
        cte = cte.union_all(select(self.model.id).where(self.model.parent_id == cte.c.id))

        # Two round-trips: first collect the subtree ids, then load the full rows.
        subtree_ids = (await self.s.scalars(select(cte.c.id))).all()
        rows = (await self.s.scalars(select(self.model).where(self.model.id.in_(subtree_ids)))).all()
        return rows

    async def get_root_artifact(self, artifact_id: ULID) -> Artifact | None:
        """Find the root artifact by traversing up the parent chain."""
        artifact = await self.s.get(self.model, artifact_id)
        if artifact is None:
            return None

        # Walk upward until a node with no parent; break defensively on a dangling parent_id.
        while artifact.parent_id is not None:
            parent = await self.s.get(self.model, artifact.parent_id)
            if parent is None:
                break
            artifact = parent

        return artifact

__init__(session)

Initialize artifact repository with database session.

Source code in src/chapkit/modules/artifact/repository.py
def __init__(self, session: AsyncSession) -> None:
    """Initialize artifact repository with database session."""
    # Bind the generic base repository to the Artifact ORM model.
    super().__init__(session, Artifact)

find_by_id(id) async

Find an artifact by ID with children eagerly loaded.

Source code in src/chapkit/modules/artifact/repository.py
async def find_by_id(self, id: ULID) -> Artifact | None:
    """Find an artifact by ID with children eagerly loaded."""
    # selectinload avoids lazy-loading children later on a possibly-closed session.
    return await self.s.get(self.model, id, options=[selectinload(self.model.children)])

find_subtree(start_id) async

Find all artifacts in the subtree rooted at the given ID using recursive CTE.

Source code in src/chapkit/modules/artifact/repository.py
async def find_subtree(self, start_id: ULID) -> Iterable[Artifact]:
    """Find all artifacts in the subtree rooted at the given ID using recursive CTE."""
    # Recursive CTE: seed with start_id, then repeatedly add direct children.
    cte = select(self.model.id).where(self.model.id == start_id).cte(name="descendants", recursive=True)
    cte = cte.union_all(select(self.model.id).where(self.model.parent_id == cte.c.id))

    # Two round-trips: first collect the subtree ids, then load the full rows.
    subtree_ids = (await self.s.scalars(select(cte.c.id))).all()
    rows = (await self.s.scalars(select(self.model).where(self.model.id.in_(subtree_ids)))).all()
    return rows

get_root_artifact(artifact_id) async

Find the root artifact by traversing up the parent chain.

Source code in src/chapkit/modules/artifact/repository.py
async def get_root_artifact(self, artifact_id: ULID) -> Artifact | None:
    """Find the root artifact by traversing up the parent chain."""
    artifact = await self.s.get(self.model, artifact_id)
    if artifact is None:
        return None

    # Walk upward until a node with no parent; break defensively on a dangling parent_id.
    while artifact.parent_id is not None:
        parent = await self.s.get(self.model, artifact.parent_id)
        if parent is None:
            break
        artifact = parent

    return artifact

ArtifactRouter

Bases: CrudRouter[ArtifactIn, ArtifactOut]

CRUD router for Artifact entities with tree operations and config access.

Source code in src/chapkit/modules/artifact/router.py
class ArtifactRouter(CrudRouter[ArtifactIn, ArtifactOut]):
    """CRUD router for Artifact entities with tree operations and config access.

    Always registers 'expand' and 'tree' entity operations; additionally
    registers a 'config' operation when ``enable_config_access`` is True.
    """

    def __init__(
        self,
        prefix: str,
        tags: Sequence[str],
        manager_factory: Any,
        entity_in_type: type[ArtifactIn],
        entity_out_type: type[ArtifactOut],
        permissions: CrudPermissions | None = None,
        enable_config_access: bool = False,
        **kwargs: Any,
    ) -> None:
        """Initialize artifact router with entity types and manager factory."""
        # Set before super().__init__ so the flag is available when routes are registered.
        self.enable_config_access = enable_config_access
        super().__init__(
            prefix=prefix,
            tags=list(tags),
            entity_in_type=entity_in_type,
            entity_out_type=entity_out_type,
            manager_factory=manager_factory,
            permissions=permissions,
            **kwargs,
        )

    def _register_routes(self) -> None:
        """Register artifact CRUD routes and tree operations."""
        super()._register_routes()

        manager_factory = self.manager_factory

        async def expand_artifact(
            entity_id: str,
            manager: ArtifactManager = Depends(manager_factory),
        ) -> ArtifactTreeNode:
            ulid_id = self._parse_ulid(entity_id)

            expanded = await manager.expand_artifact(ulid_id)
            if expanded is None:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"Artifact with id {entity_id} not found",
                )
            return expanded

        async def build_tree(
            entity_id: str,
            manager: ArtifactManager = Depends(manager_factory),
        ) -> ArtifactTreeNode:
            ulid_id = self._parse_ulid(entity_id)

            tree = await manager.build_tree(ulid_id)
            if tree is None:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"Artifact with id {entity_id} not found",
                )
            return tree

        self.register_entity_operation(
            "expand",
            expand_artifact,
            response_model=ArtifactTreeNode,
            summary="Expand artifact",
            description="Get artifact with hierarchy metadata but without children",
        )

        self.register_entity_operation(
            "tree",
            build_tree,
            response_model=ArtifactTreeNode,
            summary="Build artifact tree",
            description="Build hierarchical tree structure rooted at the given artifact",
        )

        if self.enable_config_access:
            # Import locally to avoid circular dependency
            from chapkit.api.dependencies import get_config_manager

            async def get_config(
                entity_id: str,
                artifact_manager: ArtifactManager = Depends(manager_factory),
                config_manager: ConfigManager[BaseConfig] = Depends(get_config_manager),
            ) -> ConfigOut[BaseConfig] | None:
                artifact_id = self._parse_ulid(entity_id)
                config = await config_manager.get_config_for_artifact(artifact_id, artifact_manager.repo)
                return config

            self.register_entity_operation(
                "config",
                get_config,
                http_method="GET",
                response_model=ConfigOut[BaseConfig],
                summary="Get artifact config",
                description="Get the config for an artifact by walking up the tree to find the root's config",
            )

__init__(prefix, tags, manager_factory, entity_in_type, entity_out_type, permissions=None, enable_config_access=False, **kwargs)

Initialize artifact router with entity types and manager factory.

Source code in src/chapkit/modules/artifact/router.py
def __init__(
    self,
    prefix: str,
    tags: Sequence[str],
    manager_factory: Any,
    entity_in_type: type[ArtifactIn],
    entity_out_type: type[ArtifactOut],
    permissions: CrudPermissions | None = None,
    enable_config_access: bool = False,
    **kwargs: Any,
) -> None:
    """Initialize artifact router with entity types and manager factory."""
    # Set before super().__init__ so the flag is available when routes are registered.
    self.enable_config_access = enable_config_access
    super().__init__(
        prefix=prefix,
        tags=list(tags),
        entity_in_type=entity_in_type,
        entity_out_type=entity_out_type,
        manager_factory=manager_factory,
        permissions=permissions,
        **kwargs,
    )

ArtifactHierarchy

Bases: BaseModel

Configuration for artifact hierarchy with level labels.

Source code in src/chapkit/modules/artifact/schemas.py
class ArtifactHierarchy(BaseModel):
    """Immutable configuration describing an artifact hierarchy and its level labels."""

    name: str = Field(..., description="Human readable name of this hierarchy")
    level_labels: Mapping[int, str] = Field(
        default_factory=dict,
        description="Mapping of numeric levels to labels (0 -> 'train', etc.)",
    )

    model_config = {"frozen": True}

    # Keys used by describe() for the metadata dict it returns.
    hierarchy_key: ClassVar[str] = "hierarchy"
    depth_key: ClassVar[str] = "level_depth"
    label_key: ClassVar[str] = "level_label"

    def label_for(self, level: int) -> str:
        """Return the configured label for *level*, or a generated default."""
        if level in self.level_labels:
            return self.level_labels[level]
        return f"level_{level}"

    def describe(self, level: int) -> dict[str, Any]:
        """Return hierarchy metadata (name, depth, label) for *level*."""
        meta: dict[str, Any] = {self.hierarchy_key: self.name}
        meta[self.depth_key] = level
        meta[self.label_key] = self.label_for(level)
        return meta

label_for(level)

Get the label for a given level or return default.

Source code in src/chapkit/modules/artifact/schemas.py
def label_for(self, level: int) -> str:
    """Return the configured label for *level*, or a generated default."""
    try:
        return self.level_labels[level]
    except KeyError:
        return f"level_{level}"

describe(level)

Get hierarchy metadata dict for a given level.

Source code in src/chapkit/modules/artifact/schemas.py
def describe(self, level: int) -> dict[str, Any]:
    """Return the hierarchy metadata mapping for *level*."""
    meta: dict[str, Any] = {}
    meta[self.hierarchy_key] = self.name
    meta[self.depth_key] = level
    meta[self.label_key] = self.label_for(level)
    return meta

ArtifactIn

Bases: EntityIn

Input schema for creating or updating artifacts.

Source code in src/chapkit/modules/artifact/schemas.py
class ArtifactIn(EntityIn):
    """Input schema for creating or updating artifacts."""

    # Arbitrary payload to store with the artifact.
    data: Any
    # Parent artifact for hierarchical trees; None for root artifacts.
    parent_id: ULID | None = None
    # Optional explicit hierarchy depth; presumably derived when omitted — TODO confirm.
    level: int | None = None

ArtifactOut

Bases: EntityOut

Output schema for artifact entities.

Source code in src/chapkit/modules/artifact/schemas.py
class ArtifactOut(EntityOut):
    """Output schema for artifact entities."""

    # Stored payload, expressed via the JsonSafe type.
    data: JsonSafe
    # Parent artifact ID, or None for root artifacts.
    parent_id: ULID | None = None
    # Depth of this artifact within its hierarchy (always populated on output).
    level: int

ArtifactTreeNode

Bases: ArtifactOut

Artifact node with tree structure metadata and optional config.

Source code in src/chapkit/modules/artifact/schemas.py
class ArtifactTreeNode(ArtifactOut):
    """Artifact node enriched with tree-structure metadata and an optional config."""

    level_label: str | None = None
    hierarchy: str | None = None
    children: list["ArtifactTreeNode"] | None = None
    config: "ConfigOut[BaseConfig] | None" = None

    @classmethod
    def from_artifact(cls, artifact: ArtifactOut) -> Self:
        """Build a tree node by re-validating the artifact's dumped fields."""
        payload = artifact.model_dump()
        return cls.model_validate(payload)

from_artifact(artifact) classmethod

Create a tree node from an artifact output schema.

Source code in src/chapkit/modules/artifact/schemas.py
@classmethod
def from_artifact(cls, artifact: ArtifactOut) -> Self:
    """Create a tree node from an artifact output schema.

    Re-validates the artifact's dumped fields; tree-specific fields
    (labels, children, config) are left at their defaults.
    """
    return cls.model_validate(artifact.model_dump())

PandasDataFrame

Bases: BaseModel

Pydantic schema for serializing pandas DataFrames.

Source code in src/chapkit/modules/artifact/schemas.py
class PandasDataFrame(BaseModel):
    """Round-trippable columns/rows representation of a pandas DataFrame."""

    columns: list[str]
    data: list[list[Any]]

    @classmethod
    def from_dataframe(cls, df: pd.DataFrame) -> Self:
        """Capture *df*'s column labels and cell values as plain lists."""
        if not isinstance(df, pd.DataFrame):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise TypeError(f"Expected a pandas DataFrame, but got {type(df)}")
        return cls(columns=list(df.columns), data=df.values.tolist())

    def to_dataframe(self) -> pd.DataFrame:
        """Rebuild a pandas DataFrame from the stored columns and rows."""
        return pd.DataFrame(data=self.data, columns=self.columns)

from_dataframe(df) classmethod

Create schema from pandas DataFrame.

Source code in src/chapkit/modules/artifact/schemas.py
@classmethod
def from_dataframe(cls, df: pd.DataFrame) -> Self:
    """Create schema from pandas DataFrame.

    Raises:
        TypeError: If *df* is not a pandas DataFrame.
    """
    if not isinstance(df, pd.DataFrame):  # pyright: ignore[reportUnnecessaryIsInstance]
        raise TypeError(f"Expected a pandas DataFrame, but got {type(df)}")
    # Column labels and cell values are converted to plain Python lists
    # so the schema is JSON-serializable.
    return cls(columns=df.columns.tolist(), data=df.values.tolist())

to_dataframe()

Convert schema back to pandas DataFrame.

Source code in src/chapkit/modules/artifact/schemas.py
def to_dataframe(self) -> pd.DataFrame:
    """Rebuild a pandas DataFrame from the stored columns and rows."""
    frame = pd.DataFrame(data=self.data, columns=self.columns)
    return frame

Task Module

Script execution template system.

task

Task feature - reusable command templates for task execution.

TaskManager

Bases: BaseManager[Task, TaskIn, TaskOut, ULID]

Manager for Task template entities with artifact-based execution.

Source code in src/chapkit/modules/task/manager.py
class TaskManager(BaseManager[Task, TaskIn, TaskOut, ULID]):
    """Manager for Task template entities with artifact-based execution.

    Shell tasks run through an asyncio subprocess shell; Python tasks run
    functions looked up in TaskRegistry with type-based dependency injection.
    Both store their results as an artifact and resolve to its ID.
    """

    def __init__(
        self,
        repo: TaskRepository,
        scheduler: JobScheduler | None = None,
        database: Database | None = None,
        artifact_manager: ArtifactManager | None = None,
    ) -> None:
        """Initialize task manager with repository, scheduler, database, and artifact manager."""
        super().__init__(repo, Task, TaskOut)
        # Re-declare with the concrete repository type for precise typing.
        self.repo: TaskRepository = repo
        self.scheduler = scheduler
        self.database = database
        self.artifact_manager = artifact_manager

    async def find_all(self, *, enabled: bool | None = None) -> list[TaskOut]:
        """Find all tasks, optionally filtered by enabled status."""
        tasks = await self.repo.find_all(enabled=enabled)
        return [self._to_output_schema(task) for task in tasks]

    def _is_injectable_type(self, param_type: type | None) -> bool:
        """Check if a parameter type should be injected by the framework."""
        if param_type is None:
            return False

        # Handle Optional[Type] -> extract the non-None type
        origin = get_origin(param_type)
        if origin is types.UnionType or origin is Union:  # Union type (both syntaxes)
            # For Optional types, we still want to inject if the non-None type is injectable
            # This allows Optional[AsyncSession] to work
            args = getattr(param_type, "__args__", ())
            non_none_types = [arg for arg in args if arg is not type(None)]
            if len(non_none_types) == 1:
                param_type = non_none_types[0]

        # Check if type is in injectable set
        return param_type in INJECTABLE_TYPES

    def _build_injection_map(self, task_id: ULID, session: AsyncSession | None) -> dict[type, Any]:
        """Build map of injectable types to their instances."""
        # NOTE(review): task_id is accepted but not used in the map itself —
        # presumably kept for future injectables; confirm before removing.
        return {
            AsyncSession: session,
            Database: self.database,
            ArtifactManager: self.artifact_manager,
            JobScheduler: self.scheduler,
        }

    def _inject_parameters(
        self, func: Any, user_params: dict[str, Any], task_id: ULID, session: AsyncSession | None
    ) -> dict[str, Any]:
        """Merge user parameters with framework injections based on function signature.

        Raises:
            ValueError: If a required, non-injectable parameter is missing
                from *user_params*.
        """
        sig = inspect.signature(func)
        type_hints = get_type_hints(func)

        # Build injection map
        injection_map = self._build_injection_map(task_id, session)

        # Start with user parameters
        final_params = dict(user_params)

        # Inspect each parameter in function signature
        for param_name, param in sig.parameters.items():
            # Skip self, *args, **kwargs
            if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
                continue

            # Get type hint for this parameter
            param_type = type_hints.get(param_name)

            # Check if this type should be injected
            if self._is_injectable_type(param_type):
                # Get the actual type (handle Optional)
                actual_type = param_type
                origin = get_origin(param_type)
                if origin is types.UnionType or origin is Union:
                    args = getattr(param_type, "__args__", ())
                    non_none_types = [arg for arg in args if arg is not type(None)]
                    if non_none_types:
                        actual_type = non_none_types[0]

                # Inject if we have an instance of this type
                if actual_type in injection_map:
                    injectable_value = injection_map[actual_type]
                    # For required parameters, inject even if None
                    # For optional parameters, only inject if not None
                    if param.default is param.empty:
                        # Required parameter - inject whatever we have (even None)
                        final_params[param_name] = injectable_value
                    elif injectable_value is not None:
                        # Optional parameter - only inject if we have a value
                        final_params[param_name] = injectable_value
                continue

            # Not injectable - must come from user parameters
            if param_name not in final_params:
                # Check if parameter has a default value
                if param.default is not param.empty:
                    continue  # Will use default

                # Required parameter missing
                raise ValueError(
                    f"Missing required parameter '{param_name}' for task function. "
                    f"Parameter is not injectable and not provided in task.parameters."
                )

        return final_params

    async def execute_task(self, task_id: ULID) -> ULID:
        """Execute a task by submitting it to the scheduler and return the job ID.

        Raises:
            ValueError: If the scheduler or artifact manager is missing,
                the task does not exist, or the task is disabled.
        """
        if self.scheduler is None:
            raise ValueError("Task execution requires a scheduler. Use ServiceBuilder.with_jobs() to enable.")

        if self.artifact_manager is None:
            raise ValueError(
                "Task execution requires artifacts. Use ServiceBuilder.with_artifacts() before with_tasks()."
            )

        task = await self.repo.find_by_id(task_id)
        if task is None:
            raise ValueError(f"Task {task_id} not found")

        # Check if task is enabled
        if not task.enabled:
            raise ValueError(f"Cannot execute disabled task {task_id}")

        # Route based on task type
        if task.task_type == "python":
            job_id = await self.scheduler.add_job(self._execute_python, task_id)
        else:  # shell
            job_id = await self.scheduler.add_job(self._execute_command, task_id)

        return job_id

    async def _execute_command(self, task_id: ULID) -> ULID:
        """Execute command and return artifact_id containing results."""
        if self.database is None:
            raise RuntimeError("Database instance required for task execution")

        if self.artifact_manager is None:
            raise RuntimeError("ArtifactManager instance required for task execution")

        # Fetch task and serialize snapshot before execution
        async with self.database.session() as session:
            task_repo = TaskRepository(session)
            task = await task_repo.find_by_id(task_id)
            if task is None:
                raise ValueError(f"Task {task_id} not found")

            # Capture task snapshot
            task_snapshot = {
                "id": str(task.id),
                "command": task.command,
                "created_at": task.created_at.isoformat(),
                "updated_at": task.updated_at.isoformat(),
            }

        # NOTE(review): task.command is read after the session context exits;
        # this relies on the session factory's expire_on_commit=False — confirm.
        # Security note: the command intentionally runs through the shell
        # (shell task type); task templates must come from trusted sources.
        # Execute command using asyncio subprocess
        process = await asyncio.create_subprocess_shell(
            task.command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        # Wait for completion and capture output
        stdout_bytes, stderr_bytes = await process.communicate()

        # Decode outputs
        stdout_text = stdout_bytes.decode("utf-8") if stdout_bytes else ""
        stderr_text = stderr_bytes.decode("utf-8") if stderr_bytes else ""

        # Create artifact with execution results
        result_data: dict[str, Any] = {
            "task": task_snapshot,
            "stdout": stdout_text,
            "stderr": stderr_text,
            "exit_code": process.returncode,
        }

        async with self.database.session() as session:
            artifact_repo = ArtifactRepository(session)
            artifact_mgr = ArtifactManager(artifact_repo)

            artifact_out = await artifact_mgr.save(
                ArtifactIn(
                    data=result_data,
                    parent_id=None,
                )
            )

        return artifact_out.id

    async def _execute_python(self, task_id: ULID) -> ULID:
        """Execute Python function and return artifact_id containing results."""
        if self.database is None:
            raise RuntimeError("Database instance required for task execution")

        if self.artifact_manager is None:
            raise RuntimeError("ArtifactManager instance required for task execution")

        # Create a database session for potential injection.
        # The context is entered manually (rather than `async with`) so the
        # same session spans the whole try/finally block below.
        session_context = self.database.session()
        session = await session_context.__aenter__()

        try:
            # Fetch task and serialize snapshot
            task_repo = TaskRepository(session)
            task = await task_repo.find_by_id(task_id)
            if task is None:
                raise ValueError(f"Task {task_id} not found")

            # Capture task snapshot
            task_snapshot = {
                "id": str(task.id),
                "command": task.command,
                "task_type": task.task_type,
                "parameters": task.parameters,
                "created_at": task.created_at.isoformat(),
                "updated_at": task.updated_at.isoformat(),
            }

            # Get function from registry
            try:
                func = TaskRegistry.get(task.command)
            except KeyError:
                # NOTE(review): no `from` chaining; the KeyError context stays implicit.
                raise ValueError(f"Python function '{task.command}' not found in registry")

            # Execute function with type-based injection
            result_data: dict[str, Any]
            try:
                user_params = task.parameters or {}

                # Inject framework dependencies based on function signature
                final_params = self._inject_parameters(func, user_params, task_id, session)

                # Handle sync/async functions
                if inspect.iscoroutinefunction(func):
                    result = await func(**final_params)
                else:
                    result = await asyncio.to_thread(func, **final_params)

                result_data = {
                    "task": task_snapshot,
                    "result": result,
                    "error": None,
                }
            except Exception as e:
                # Execution failures are recorded in the artifact, not re-raised.
                result_data = {
                    "task": task_snapshot,
                    "result": None,
                    "error": {
                        "type": type(e).__name__,
                        "message": str(e),
                        "traceback": traceback.format_exc(),
                    },
                }
        finally:
            # Always close the session
            await session_context.__aexit__(None, None, None)

        # Create artifact (with a new session)
        async with self.database.session() as artifact_session:
            artifact_repo = ArtifactRepository(artifact_session)
            artifact_mgr = ArtifactManager(artifact_repo)
            artifact_out = await artifact_mgr.save(ArtifactIn(data=result_data, parent_id=None))

        return artifact_out.id

__init__(repo, scheduler=None, database=None, artifact_manager=None)

Initialize task manager with repository, scheduler, database, and artifact manager.

Source code in src/chapkit/modules/task/manager.py
def __init__(
    self,
    repo: TaskRepository,
    scheduler: JobScheduler | None = None,
    database: Database | None = None,
    artifact_manager: ArtifactManager | None = None,
) -> None:
    """Initialize task manager with repository, scheduler, database, and artifact manager.

    Args:
        repo: Repository used for Task persistence.
        scheduler: Optional job scheduler; required for execute_task().
        database: Optional database handle; required by the execution paths.
        artifact_manager: Optional artifact manager; required for execution.
    """
    super().__init__(repo, Task, TaskOut)
    # Re-declare with the concrete repository type for precise typing.
    self.repo: TaskRepository = repo
    self.scheduler = scheduler
    self.database = database
    self.artifact_manager = artifact_manager

find_all(*, enabled=None) async

Find all tasks, optionally filtered by enabled status.

Source code in src/chapkit/modules/task/manager.py
async def find_all(self, *, enabled: bool | None = None) -> list[TaskOut]:
    """Return all tasks as output schemas, optionally filtered by enabled status."""
    entities = await self.repo.find_all(enabled=enabled)
    return list(map(self._to_output_schema, entities))

execute_task(task_id) async

Execute a task by submitting it to the scheduler and return the job ID.

Source code in src/chapkit/modules/task/manager.py
async def execute_task(self, task_id: ULID) -> ULID:
    """Submit the task identified by *task_id* to the scheduler.

    Returns the scheduler's job ID. Raises ValueError if prerequisites are
    missing, the task does not exist, or the task is disabled.
    """
    if self.scheduler is None:
        raise ValueError("Task execution requires a scheduler. Use ServiceBuilder.with_jobs() to enable.")
    if self.artifact_manager is None:
        raise ValueError(
            "Task execution requires artifacts. Use ServiceBuilder.with_artifacts() before with_tasks()."
        )

    task = await self.repo.find_by_id(task_id)
    if task is None:
        raise ValueError(f"Task {task_id} not found")
    if not task.enabled:
        raise ValueError(f"Cannot execute disabled task {task_id}")

    # Python tasks run registry functions; anything else is a shell command.
    runner = self._execute_python if task.task_type == "python" else self._execute_command
    return await self.scheduler.add_job(runner, task_id)

Task

Bases: Entity

ORM model for reusable task templates containing commands to execute.

Source code in src/chapkit/modules/task/models.py
class Task(Entity):
    """ORM model for reusable task templates containing commands to execute."""

    __tablename__ = "tasks"

    # Shell command line, or registered Python function name (see task_type).
    command: Mapped[str] = mapped_column(Text, nullable=False)
    # Discriminator: "shell" (default) or "python".
    task_type: Mapped[str] = mapped_column(Text, nullable=False, default="shell", server_default="shell")
    # Keyword arguments passed to Python task functions; unused for shell tasks.
    parameters: Mapped[dict | None] = mapped_column(JSON, nullable=True)
    # Disabled tasks are rejected at execution time.
    enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True, server_default="1")

TaskRegistry

Global registry for Python task functions.

Source code in src/chapkit/modules/task/registry.py
class TaskRegistry:
    """Global registry for Python task functions.

    Maps names to callables so Task entities with ``task_type='python'``
    can reference functions by name in their ``command`` field.
    """

    # Class-level store shared by all callers; reset via clear().
    _registry: dict[str, Callable[..., Any]] = {}

    @classmethod
    def register(cls, name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        """Decorator to register a task function with support for type-based dependency injection.

        Raises:
            ValueError: If *name* is already registered.
        """

        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            # Delegate to register_function so duplicate-name handling
            # lives in exactly one place.
            cls.register_function(name, func)
            return func

        return decorator

    @classmethod
    def register_function(cls, name: str, func: Callable[..., Any]) -> None:
        """Imperatively register a task function.

        Raises:
            ValueError: If *name* is already registered.
        """
        if name in cls._registry:
            raise ValueError(f"Task '{name}' already registered")
        cls._registry[name] = func

    @classmethod
    def get(cls, name: str) -> Callable[..., Any]:
        """Retrieve a registered task function.

        Raises:
            KeyError: If *name* is not registered.
        """
        if name not in cls._registry:
            raise KeyError(f"Task '{name}' not found in registry")
        return cls._registry[name]

    @classmethod
    def list_all(cls) -> list[str]:
        """List all registered task names, sorted alphabetically."""
        return sorted(cls._registry.keys())

    @classmethod
    def clear(cls) -> None:
        """Clear all registered tasks (useful for testing)."""
        cls._registry.clear()

register(name) classmethod

Decorator to register a task function with support for type-based dependency injection.

Source code in src/chapkit/modules/task/registry.py
@classmethod
def register(cls, name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator to register a task function with support for type-based dependency injection.

    Raises:
        ValueError: If *name* is already registered.
    """

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        # Delegate to register_function so duplicate-name handling
        # lives in exactly one place.
        cls.register_function(name, func)
        return func

    return decorator

register_function(name, func) classmethod

Imperatively register a task function.

Source code in src/chapkit/modules/task/registry.py
@classmethod
def register_function(cls, name: str, func: Callable[..., Any]) -> None:
    """Imperatively register a task function.

    Raises:
        ValueError: If *name* is already registered.
    """
    if name in cls._registry:
        raise ValueError(f"Task '{name}' already registered")
    cls._registry[name] = func

get(name) classmethod

Retrieve a registered task function.

Source code in src/chapkit/modules/task/registry.py
@classmethod
def get(cls, name: str) -> Callable[..., Any]:
    """Retrieve a registered task function.

    Raises:
        KeyError: If *name* is not registered.
    """
    if name not in cls._registry:
        raise KeyError(f"Task '{name}' not found in registry")
    return cls._registry[name]

list_all() classmethod

List all registered task names.

Source code in src/chapkit/modules/task/registry.py
@classmethod
def list_all(cls) -> list[str]:
    """List all registered task names, sorted alphabetically."""
    return sorted(cls._registry.keys())

clear() classmethod

Clear all registered tasks (useful for testing).

Source code in src/chapkit/modules/task/registry.py
@classmethod
def clear(cls) -> None:
    """Clear all registered tasks (useful for testing)."""
    # Mutates the shared class-level registry in place.
    cls._registry.clear()

TaskRepository

Bases: BaseRepository[Task, ULID]

Repository for Task template entities.

Source code in src/chapkit/modules/task/repository.py
class TaskRepository(BaseRepository[Task, ULID]):
    """Repository for Task template entities."""

    def __init__(self, session: AsyncSession) -> None:
        """Bind the repository to *session* for the Task entity type."""
        super().__init__(session, Task)

    async def find_by_enabled(self, enabled: bool) -> list[Task]:
        """Return tasks whose enabled flag equals *enabled*, newest first."""
        query = (
            select(Task)
            .where(Task.enabled == enabled)
            .order_by(Task.created_at.desc())
        )
        rows = await self.s.execute(query)
        return list(rows.scalars().all())

    async def find_all(self, *, enabled: bool | None = None) -> list[Task]:
        """Return all tasks, optionally filtered by enabled status."""
        if enabled is not None:
            return await self.find_by_enabled(enabled)
        return list(await super().find_all())

__init__(session)

Initialize task repository with database session.

Source code in src/chapkit/modules/task/repository.py
def __init__(self, session: AsyncSession) -> None:
    """Initialize task repository with database session.

    Binds the base repository to the Task entity type.
    """
    super().__init__(session, Task)

find_by_enabled(enabled) async

Find all tasks by enabled status.

Source code in src/chapkit/modules/task/repository.py
async def find_by_enabled(self, enabled: bool) -> list[Task]:
    """Find all tasks by enabled status, ordered newest first."""
    # Newest first by creation timestamp.
    stmt = select(Task).where(Task.enabled == enabled).order_by(Task.created_at.desc())
    result = await self.s.execute(stmt)
    return list(result.scalars().all())

find_all(*, enabled=None) async

Find all tasks, optionally filtered by enabled status.

Source code in src/chapkit/modules/task/repository.py
async def find_all(self, *, enabled: bool | None = None) -> list[Task]:
    """Find all tasks, optionally filtered by enabled status.

    When *enabled* is None this delegates to the base repository's
    unfiltered query; otherwise it filters via find_by_enabled().
    """
    if enabled is None:
        result = await super().find_all()
        return list(result)
    return await self.find_by_enabled(enabled)

TaskRouter

Bases: CrudRouter[TaskIn, TaskOut]

CRUD router for Task entities with execution operation.

Source code in src/chapkit/modules/task/router.py
class TaskRouter(CrudRouter[TaskIn, TaskOut]):
    """CRUD router for Task entities with execution operation."""

    def __init__(
        self,
        prefix: str,
        tags: Sequence[str],
        manager_factory: Any,
        entity_in_type: type[TaskIn],
        entity_out_type: type[TaskOut],
        permissions: CrudPermissions | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize task router with entity types and manager factory.

        Args:
            prefix: URL prefix for all routes registered by this router.
            tags: OpenAPI tags applied to the routes.
            manager_factory: Callable producing the TaskManager dependency.
            entity_in_type: Input schema type for create/update payloads.
            entity_out_type: Output schema type for responses.
            permissions: Optional CRUD permission configuration.
            **kwargs: Extra keyword arguments forwarded to the base router.
        """
        super().__init__(
            prefix=prefix,
            tags=list(tags),
            entity_in_type=entity_in_type,
            entity_out_type=entity_out_type,
            manager_factory=manager_factory,
            permissions=permissions,
            **kwargs,
        )

    def _register_find_all_route(self, manager_dependency: Any, manager_annotation: Any) -> None:
        """Register find all route with enabled filtering support.

        Overrides the base collection route to add an `enabled` query
        parameter while keeping pagination opt-in.
        """
        entity_out_annotation: Any = self.entity_out_type
        collection_response_model: Any = list[entity_out_annotation] | PaginatedResponse[entity_out_annotation]

        @self.router.get("", response_model=collection_response_model)
        async def find_all(
            page: int | None = None,
            size: int | None = None,
            enabled: bool | None = Query(None, description="Filter by enabled status"),
            manager: Manager[TaskIn, TaskOut, ULID] = manager_dependency,
        ) -> list[TaskOut] | PaginatedResponse[TaskOut]:
            # Local import — presumably avoids a circular import at module load; confirm.
            from chapkit.core.api.pagination import create_paginated_response

            # Pagination is opt-in: both page and size must be provided
            if page is not None and size is not None:
                items, total = await manager.find_paginated(page, size)
                return create_paginated_response(items, total, page, size)

            # Use TaskRepository's find_all with enabled filtering
            # Cast manager to access repository with enabled parameter
            task_manager = manager  # TaskManager with TaskRepository
            return await task_manager.find_all(enabled=enabled)  # type: ignore[call-arg]

        self._annotate_manager(find_all, manager_annotation)
        find_all.__annotations__["return"] = list[entity_out_annotation] | PaginatedResponse[entity_out_annotation]

    def _register_routes(self) -> None:
        """Register task CRUD routes and execution operation."""
        super()._register_routes()

        manager_factory = self.manager_factory

        async def execute_task(
            entity_id: str,
            manager: TaskManager = Depends(manager_factory),
        ) -> TaskExecuteResponse:
            """Execute a task asynchronously via the job scheduler."""
            task_id = self._parse_ulid(entity_id)

            try:
                job_id = await manager.execute_task(task_id)
                return TaskExecuteResponse(
                    job_id=str(job_id),
                    message=f"Task submitted for execution. Job ID: {job_id}",
                )
            except ValueError as e:
                # Precondition failures (missing scheduler/artifacts, unknown
                # or disabled task) map to 400 Bad Request.
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=str(e),
                )
            except RuntimeError as e:
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=str(e),
                )

        # Registered as POST /{prefix}/{id}/execute, responding 202 Accepted.
        self.register_entity_operation(
            "execute",
            execute_task,
            http_method="POST",
            response_model=TaskExecuteResponse,
            status_code=status.HTTP_202_ACCEPTED,
            summary="Execute task",
            description="Submit the task to the scheduler for execution",
        )

__init__(prefix, tags, manager_factory, entity_in_type, entity_out_type, permissions=None, **kwargs)

Initialize task router with entity types and manager factory.

Source code in src/chapkit/modules/task/router.py
def __init__(
    self,
    prefix: str,
    tags: Sequence[str],
    manager_factory: Any,
    entity_in_type: type[TaskIn],
    entity_out_type: type[TaskOut],
    permissions: CrudPermissions | None = None,
    **kwargs: Any,
) -> None:
    """Initialize task router with entity types and manager factory.

    Args:
        prefix: URL prefix for all routes registered by this router.
        tags: OpenAPI tags applied to the routes.
        manager_factory: Callable producing the TaskManager dependency.
        entity_in_type: Input schema type for create/update payloads.
        entity_out_type: Output schema type for responses.
        permissions: Optional CRUD permission configuration.
        **kwargs: Extra keyword arguments forwarded to the base router.
    """
    super().__init__(
        prefix=prefix,
        tags=list(tags),
        entity_in_type=entity_in_type,
        entity_out_type=entity_out_type,
        manager_factory=manager_factory,
        permissions=permissions,
        **kwargs,
    )

TaskIn

Bases: EntityIn

Input schema for creating or updating task templates.

Source code in src/chapkit/modules/task/schemas.py
class TaskIn(EntityIn):
    """Input schema for creating or updating task templates.

    For task_type="python", `command` must name a function registered in
    TaskRegistry; for "shell" it is run as a shell command line.
    """

    command: str = Field(description="Shell command or Python function name to execute")
    task_type: Literal["shell", "python"] = Field(default="shell", description="Type of task: 'shell' or 'python'")
    parameters: dict[str, Any] | None = Field(
        default=None, description="Parameters to pass to Python function (ignored for shell tasks)"
    )
    enabled: bool = Field(default=True, description="Whether task is enabled for execution")

TaskOut

Bases: EntityOut

Output schema for task template entities.

Source code in src/chapkit/modules/task/schemas.py
class TaskOut(EntityOut):
    """Output schema for task template entities.

    Mirrors TaskIn, but task_type is a plain str on output since stored
    values are not re-validated against the Literal.
    """

    command: str = Field(description="Shell command or Python function name to execute")
    task_type: str = Field(description="Type of task: 'shell' or 'python'")
    parameters: dict[str, Any] | None = Field(default=None, description="Parameters to pass to Python function")
    enabled: bool = Field(description="Whether task is enabled for execution")

validate_and_disable_orphaned_tasks(app) async

Validate Python tasks and disable orphaned ones that reference missing functions.

Source code in src/chapkit/modules/task/validation.py
async def validate_and_disable_orphaned_tasks(app: FastAPI) -> int:
    """Validate Python tasks and disable orphaned ones that reference missing functions.

    An "orphaned" task is a python-type task whose `command` does not name
    a function currently registered in TaskRegistry.

    Returns:
        The number of tasks that were disabled (0 when no database is
        configured or no orphans exist).
    """
    database: Database | None = getattr(app.state, "database", None)
    if database is None:
        logger.debug("No database configured, skipping task validation")
        return 0

    disabled_count = 0

    async with database.session() as session:
        task_repo = TaskRepository(session)
        # Execution dependencies are unnecessary for validation, hence None.
        task_manager = TaskManager(task_repo, scheduler=None, database=None, artifact_manager=None)

        # Get all tasks
        all_tasks = await task_manager.find_all()

        # Get registered function names
        registered_functions = set(TaskRegistry.list_all())

        # Find orphaned Python tasks
        orphaned_tasks = [
            task for task in all_tasks if task.task_type == "python" and task.command not in registered_functions
        ]

        if orphaned_tasks:
            logger.warning(
                "Found orphaned Python tasks - disabling them",
                extra={
                    "count": len(orphaned_tasks),
                    "task_ids": [str(task.id) for task in orphaned_tasks],
                    "commands": [task.command for task in orphaned_tasks],
                },
            )

            # Disable each orphaned task
            for task in orphaned_tasks:
                logger.info(
                    f"Disabling orphaned task {task.id}: function '{task.command}' not found in registry",
                    extra={"task_id": str(task.id), "command": task.command, "task_type": task.task_type},
                )

                # Create TaskIn with enabled=False
                # NOTE(review): unknown task_type values are coerced to "shell" to satisfy
                # the Literal; presumably harmless since the task is disabled anyway — confirm.
                task_type_value = task.task_type if task.task_type in ("shell", "python") else "shell"
                task_in = TaskIn(
                    id=task.id,
                    command=task.command,
                    task_type=task_type_value,  # type: ignore[arg-type]
                    parameters=task.parameters,
                    enabled=False,
                )
                await task_manager.save(task_in)
                disabled_count += 1

    if disabled_count > 0:
        logger.warning(f"Disabled {disabled_count} orphaned Python task(s)")
    else:
        logger.debug("No orphaned Python tasks found")

    return disabled_count

ML Module

Machine learning train and predict operations.

ml

ML module for train/predict operations with artifact-based model storage.

MLManager

Manager for ML train/predict operations with job scheduling and artifact storage.

Source code in src/chapkit/modules/ml/manager.py
class MLManager:
    """Manager for ML train/predict operations with job scheduling and artifact storage."""

    def __init__(
        self,
        runner: ModelRunnerProtocol,
        scheduler: JobScheduler,
        database: Database,
        config_schema: type[BaseConfig],
    ) -> None:
        """Initialize ML manager with runner, scheduler, database, and config schema."""
        self.runner = runner
        self.scheduler = scheduler
        self.database = database
        self.config_schema = config_schema

    async def execute_train(self, request: TrainRequest) -> TrainResponse:
        """Submit a training job to the scheduler and return job/artifact IDs."""
        # Pre-allocate artifact ID for the trained model
        model_artifact_id = ULID()

        # Submit job to scheduler
        job_id = await self.scheduler.add_job(
            self._train_task,
            request,
            model_artifact_id,
        )

        return TrainResponse(
            job_id=str(job_id),
            model_artifact_id=str(model_artifact_id),
            message=f"Training job submitted. Job ID: {job_id}",
        )

    async def execute_predict(self, request: PredictRequest) -> PredictResponse:
        """Submit a prediction job to the scheduler and return job/artifact IDs."""
        # Pre-allocate artifact ID for predictions
        prediction_artifact_id = ULID()

        # Submit job to scheduler
        job_id = await self.scheduler.add_job(
            self._predict_task,
            request,
            prediction_artifact_id,
        )

        return PredictResponse(
            job_id=str(job_id),
            prediction_artifact_id=str(prediction_artifact_id),
            message=f"Prediction job submitted. Job ID: {job_id}",
        )

    async def _train_task(self, request: TrainRequest, model_artifact_id: ULID) -> ULID:
        """Execute training task and store trained model in artifact."""
        # Load config
        async with self.database.session() as session:
            config_repo = ConfigRepository(session)
            config_manager: ConfigManager[BaseConfig] = ConfigManager(config_repo, self.config_schema)
            config = await config_manager.find_by_id(request.config_id)

            if config is None:
                raise ValueError(f"Config {request.config_id} not found")

        # Convert PandasDataFrame to pandas
        data_df = request.data.to_dataframe()

        # Train model with timing
        training_started_at = datetime.datetime.now(datetime.UTC)
        trained_model = await self.runner.on_train(
            config=config.data,
            data=data_df,
            geo=request.geo,
        )
        training_completed_at = datetime.datetime.now(datetime.UTC)
        training_duration = (training_completed_at - training_started_at).total_seconds()

        # Calculate model metrics
        model_type = _extract_model_type(trained_model)
        model_size_bytes = _calculate_model_size(trained_model)

        # Store trained model in artifact with metadata
        async with self.database.session() as session:
            artifact_repo = ArtifactRepository(session)
            artifact_manager = ArtifactManager(artifact_repo)
            config_repo = ConfigRepository(session)

            # Create and validate artifact data with Pydantic
            artifact_data_model = TrainedModelArtifactData(
                ml_type="trained_model",
                config_id=str(request.config_id),
                model=trained_model,
                started_at=training_started_at.isoformat(),
                completed_at=training_completed_at.isoformat(),
                duration_seconds=round(training_duration, 2),
                model_type=model_type,
                model_size_bytes=model_size_bytes,
            )

            await artifact_manager.save(
                ArtifactIn(
                    id=model_artifact_id,
                    data=artifact_data_model.model_dump(),
                    parent_id=None,
                    level=0,
                )
            )

            # Link config to root artifact for tree traversal
            await config_repo.link_artifact(request.config_id, model_artifact_id)
            await config_repo.commit()

        return model_artifact_id

    async def _predict_task(self, request: PredictRequest, prediction_artifact_id: ULID) -> ULID:
        """Execute prediction task and store predictions in artifact."""
        # Load model artifact
        async with self.database.session() as session:
            artifact_repo = ArtifactRepository(session)
            artifact_manager = ArtifactManager(artifact_repo)
            model_artifact = await artifact_manager.find_by_id(request.model_artifact_id)

            if model_artifact is None:
                raise ValueError(f"Model artifact {request.model_artifact_id} not found")

        # Extract model and config_id from artifact
        model_data = model_artifact.data
        if not isinstance(model_data, dict) or model_data.get("ml_type") != "trained_model":
            raise ValueError(f"Artifact {request.model_artifact_id} is not a trained model")

        trained_model = model_data["model"]
        config_id = ULID.from_str(model_data["config_id"])

        # Load config
        async with self.database.session() as session:
            config_repo = ConfigRepository(session)
            config_manager: ConfigManager[BaseConfig] = ConfigManager(config_repo, self.config_schema)
            config = await config_manager.find_by_id(config_id)

            if config is None:
                raise ValueError(f"Config {config_id} not found")

        # Convert PandasDataFrames to pandas
        historic_df = request.historic.to_dataframe()
        future_df = request.future.to_dataframe()

        # Make predictions with timing
        prediction_started_at = datetime.datetime.now(datetime.UTC)
        predictions_df = await self.runner.on_predict(
            config=config.data,
            model=trained_model,
            historic=historic_df,
            future=future_df,
            geo=request.geo,
        )
        prediction_completed_at = datetime.datetime.now(datetime.UTC)
        prediction_duration = (prediction_completed_at - prediction_started_at).total_seconds()

        # Store predictions in artifact with parent linkage
        async with self.database.session() as session:
            artifact_repo = ArtifactRepository(session)
            artifact_manager = ArtifactManager(artifact_repo)

            from chapkit.modules.artifact.schemas import PandasDataFrame

            # Create and validate artifact data with Pydantic
            artifact_data_model = PredictionArtifactData(
                ml_type="prediction",
                model_artifact_id=str(request.model_artifact_id),
                config_id=str(config_id),
                predictions=PandasDataFrame.from_dataframe(predictions_df),
                started_at=prediction_started_at.isoformat(),
                completed_at=prediction_completed_at.isoformat(),
                duration_seconds=round(prediction_duration, 2),
            )

            await artifact_manager.save(
                ArtifactIn(
                    id=prediction_artifact_id,
                    data=artifact_data_model.model_dump(),
                    parent_id=request.model_artifact_id,
                    level=1,
                )
            )

        return prediction_artifact_id

__init__(runner, scheduler, database, config_schema)

Initialize ML manager with runner, scheduler, database, and config schema.

Source code in src/chapkit/modules/ml/manager.py
def __init__(
    self,
    runner: ModelRunnerProtocol,
    scheduler: JobScheduler,
    database: Database,
    config_schema: type[BaseConfig],
) -> None:
    """Initialize ML manager with runner, scheduler, database, and config schema."""
    self.runner = runner  # executes the actual train/predict logic
    self.scheduler = scheduler  # background job queue
    self.database = database  # async session factory for config/artifact access
    self.config_schema = config_schema  # Pydantic schema used to validate configs

execute_train(request) async

Submit a training job to the scheduler and return job/artifact IDs.

Source code in src/chapkit/modules/ml/manager.py
async def execute_train(self, request: TrainRequest) -> TrainResponse:
    """Submit a training job to the scheduler and return job/artifact IDs."""
    # Pre-allocate artifact ID for the trained model
    # (returned immediately so callers can reference the artifact the job will populate)
    model_artifact_id = ULID()

    # Submit job to scheduler; _train_task runs later with these arguments.
    job_id = await self.scheduler.add_job(
        self._train_task,
        request,
        model_artifact_id,
    )

    return TrainResponse(
        job_id=str(job_id),
        model_artifact_id=str(model_artifact_id),
        message=f"Training job submitted. Job ID: {job_id}",
    )

execute_predict(request) async

Submit a prediction job to the scheduler and return job/artifact IDs.

Source code in src/chapkit/modules/ml/manager.py
async def execute_predict(self, request: PredictRequest) -> PredictResponse:
    """Submit a prediction job to the scheduler and return job/artifact IDs."""
    # Pre-allocate artifact ID for predictions
    # (returned immediately so callers can reference the artifact the job will populate)
    prediction_artifact_id = ULID()

    # Submit job to scheduler; _predict_task runs later with these arguments.
    job_id = await self.scheduler.add_job(
        self._predict_task,
        request,
        prediction_artifact_id,
    )

    return PredictResponse(
        job_id=str(job_id),
        prediction_artifact_id=str(prediction_artifact_id),
        message=f"Prediction job submitted. Job ID: {job_id}",
    )

MLRouter

Bases: Router

Router with $train and $predict collection operations.

Source code in src/chapkit/modules/ml/router.py
class MLRouter(Router):
    """Router with $train and $predict collection operations.

    Both endpoints return HTTP 202 (Accepted): the work is queued on the
    scheduler and the response carries job/artifact IDs, not results.
    """

    def __init__(
        self,
        prefix: str,
        tags: list[str],
        manager_factory: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize ML router with manager factory.

        Args:
            prefix: URL prefix for the router.
            tags: OpenAPI tags applied to the registered routes.
            manager_factory: Dependency callable that yields an MLManager.
            **kwargs: Extra keyword arguments forwarded to the base Router.
        """
        # Set before super().__init__, which is presumed to trigger
        # _register_routes() — confirm against the Router base class.
        self.manager_factory = manager_factory
        super().__init__(prefix=prefix, tags=tags, **kwargs)

    def _register_routes(self) -> None:
        """Register ML train and predict routes."""
        from fastapi import HTTPException

        # Bind to a local so the endpoint closures don't capture `self`.
        manager_factory = self.manager_factory

        @self.router.post(
            "/$train",
            response_model=TrainResponse,
            status_code=status.HTTP_202_ACCEPTED,
            summary="Train model",
            description="Submit a training job to the scheduler",
        )
        async def train(
            request: TrainRequest,
            manager: MLManager = Depends(manager_factory),
        ) -> TrainResponse:
            """Train a model asynchronously and return job/artifact IDs."""
            try:
                response = await manager.execute_train(request)
                train_counter, _ = _get_counters()
                train_counter.add(1)
                return response
            except ValueError as e:
                # Chain with `from e` so the original traceback is preserved
                # in logs (PEP 3134); ValueError maps to a client error.
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=str(e),
                ) from e
            except RuntimeError as e:
                # RuntimeError maps to a conflict (e.g. scheduler state).
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=str(e),
                ) from e

        @self.router.post(
            "/$predict",
            response_model=PredictResponse,
            status_code=status.HTTP_202_ACCEPTED,
            summary="Make predictions",
            description="Submit a prediction job to the scheduler",
        )
        async def predict(
            request: PredictRequest,
            manager: MLManager = Depends(manager_factory),
        ) -> PredictResponse:
            """Make predictions asynchronously and return job/artifact IDs."""
            try:
                response = await manager.execute_predict(request)
                _, predict_counter = _get_counters()
                predict_counter.add(1)
                return response
            except ValueError as e:
                # Chain with `from e` so the original traceback is preserved.
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=str(e),
                ) from e
            except RuntimeError as e:
                raise HTTPException(
                    status_code=status.HTTP_409_CONFLICT,
                    detail=str(e),
                ) from e

__init__(prefix, tags, manager_factory, **kwargs)

Initialize ML router with manager factory.

Source code in src/chapkit/modules/ml/router.py
def __init__(
    self,
    prefix: str,
    tags: list[str],
    manager_factory: Any,
    **kwargs: Any,
) -> None:
    """Initialize ML router with manager factory."""
    # Set before super().__init__, which presumably triggers route
    # registration — confirm against the Router base class.
    self.manager_factory = manager_factory
    super().__init__(prefix=prefix, tags=tags, **kwargs)

BaseModelRunner

Bases: ABC

Abstract base class for model runners with lifecycle hooks.

Source code in src/chapkit/modules/ml/runner.py
class BaseModelRunner(ABC):
    """Abstract base class for model runners with lifecycle hooks.

    Subclasses must implement on_train/on_predict; on_init/on_cleanup are
    optional no-op hooks they may override.
    """

    async def on_init(self) -> None:
        """Optional initialization hook called before training or prediction."""
        pass  # intentional no-op; subclasses may override

    async def on_cleanup(self) -> None:
        """Optional cleanup hook called after training or prediction."""
        pass  # intentional no-op; subclasses may override

    @abstractmethod
    async def on_train(
        self,
        config: BaseConfig,
        data: pd.DataFrame,
        geo: FeatureCollection | None = None,
    ) -> Any:
        """Train a model and return the trained model object (must be pickleable)."""
        ...

    @abstractmethod
    async def on_predict(
        self,
        config: BaseConfig,
        model: Any,
        historic: pd.DataFrame,
        future: pd.DataFrame,
        geo: FeatureCollection | None = None,
    ) -> pd.DataFrame:
        """Make predictions using a trained model and return predictions as DataFrame."""
        ...

on_init() async

Optional initialization hook called before training or prediction.

Source code in src/chapkit/modules/ml/runner.py
async def on_init(self) -> None:
    """Optional initialization hook called before training or prediction."""
    pass  # intentional no-op; subclasses may override

on_cleanup() async

Optional cleanup hook called after training or prediction.

Source code in src/chapkit/modules/ml/runner.py
async def on_cleanup(self) -> None:
    """Optional cleanup hook called after training or prediction."""
    pass  # intentional no-op; subclasses may override

on_train(config, data, geo=None) abstractmethod async

Train a model and return the trained model object (must be pickleable).

Source code in src/chapkit/modules/ml/runner.py
# Contract only — concrete runners supply the implementation.
@abstractmethod
async def on_train(
    self,
    config: BaseConfig,
    data: pd.DataFrame,
    geo: FeatureCollection | None = None,
) -> Any:
    """Train a model and return the trained model object (must be pickleable)."""
    ...

on_predict(config, model, historic, future, geo=None) abstractmethod async

Make predictions using a trained model and return predictions as DataFrame.

Source code in src/chapkit/modules/ml/runner.py
# Contract only — concrete runners supply the implementation.
@abstractmethod
async def on_predict(
    self,
    config: BaseConfig,
    model: Any,
    historic: pd.DataFrame,
    future: pd.DataFrame,
    geo: FeatureCollection | None = None,
) -> pd.DataFrame:
    """Make predictions using a trained model and return predictions as DataFrame."""
    ...

FunctionalModelRunner

Bases: BaseModelRunner, Generic[ConfigT]

Functional model runner wrapping train and predict functions.

Source code in src/chapkit/modules/ml/runner.py
class FunctionalModelRunner(BaseModelRunner, Generic[ConfigT]):
    """Runner that delegates train/predict to user-supplied async callables.

    Allows defining a model with two plain functions instead of subclassing
    BaseModelRunner.
    """

    def __init__(
        self,
        on_train: TrainFunction[ConfigT],
        on_predict: PredictFunction[ConfigT],
    ) -> None:
        """Store the train and predict callables for later delegation."""
        self._on_train = on_train
        self._on_predict = on_predict

    async def on_train(
        self,
        config: BaseConfig,
        data: pd.DataFrame,
        geo: FeatureCollection | None = None,
    ) -> Any:
        """Delegate training to the wrapped callable and return its model."""
        train_fn = self._on_train
        return await train_fn(config, data, geo)  # type: ignore[arg-type]

    async def on_predict(
        self,
        config: BaseConfig,
        model: Any,
        historic: pd.DataFrame,
        future: pd.DataFrame,
        geo: FeatureCollection | None = None,
    ) -> pd.DataFrame:
        """Delegate prediction to the wrapped callable and return its DataFrame."""
        predict_fn = self._on_predict
        return await predict_fn(config, model, historic, future, geo)  # type: ignore[arg-type]

__init__(on_train, on_predict)

Initialize functional runner with train and predict functions.

Source code in src/chapkit/modules/ml/runner.py
def __init__(
    self,
    on_train: TrainFunction[ConfigT],
    on_predict: PredictFunction[ConfigT],
) -> None:
    """Initialize functional runner with train and predict functions."""
    # Stored privately; the public on_train/on_predict methods delegate here.
    self._on_train = on_train
    self._on_predict = on_predict

on_train(config, data, geo=None) async

Train a model and return the trained model object.

Source code in src/chapkit/modules/ml/runner.py
async def on_train(
    self,
    config: BaseConfig,
    data: pd.DataFrame,
    geo: FeatureCollection | None = None,
) -> Any:
    """Train a model and return the trained model object."""
    # Pure delegation to the wrapped callable.
    return await self._on_train(config, data, geo)  # type: ignore[arg-type]

on_predict(config, model, historic, future, geo=None) async

Make predictions using a trained model.

Source code in src/chapkit/modules/ml/runner.py
async def on_predict(
    self,
    config: BaseConfig,
    model: Any,
    historic: pd.DataFrame,
    future: pd.DataFrame,
    geo: FeatureCollection | None = None,
) -> pd.DataFrame:
    """Make predictions using a trained model."""
    # Pure delegation to the wrapped callable.
    return await self._on_predict(config, model, historic, future, geo)  # type: ignore[arg-type]

ShellModelRunner

Bases: BaseModelRunner

Shell-based model runner that executes external scripts for train/predict operations.

Source code in src/chapkit/modules/ml/runner.py
class ShellModelRunner(BaseModelRunner):
    """Shell-based model runner that executes external scripts for train/predict operations.

    Each operation stages inputs into a fresh temp directory, substitutes the
    file paths into a command template, runs it via the shell, and reads the
    result file back. The temp directory is always removed afterwards.
    """

    def __init__(
        self,
        train_command: str,
        predict_command: str,
        model_format: str = "pickle",
    ) -> None:
        """Initialize shell runner with command templates for train/predict operations."""
        # Templates use str.format placeholders like {config_file}, {model_file}.
        self.train_command = train_command
        self.predict_command = predict_command
        # File extension for the serialized model (serialization itself is pickle).
        self.model_format = model_format

    async def on_train(
        self,
        config: BaseConfig,
        data: pd.DataFrame,
        geo: FeatureCollection | None = None,
    ) -> Any:
        """Train a model by executing external training script.

        Raises:
            RuntimeError: If the script exits non-zero or produces no model file.
        """
        temp_dir = Path(tempfile.mkdtemp(prefix="chapkit_ml_train_"))

        try:
            # Write config to JSON file
            config_file = temp_dir / "config.json"
            config_file.write_text(json.dumps(config.model_dump(), indent=2))

            # Write training data to CSV
            data_file = temp_dir / "data.csv"
            data.to_csv(data_file, index=False)

            # Write geo data if provided
            geo_file = temp_dir / "geo.json" if geo else None
            if geo:
                assert geo_file is not None  # For type checker
                geo_file.write_text(geo.model_dump_json(indent=2))

            # Model file path (script is expected to create this)
            model_file = temp_dir / f"model.{self.model_format}"

            # Substitute variables in command; paths come from tempfile, so
            # placeholder values are trusted, not user-controlled strings.
            command = self.train_command.format(
                config_file=str(config_file),
                data_file=str(data_file),
                model_file=str(model_file),
                geo_file=str(geo_file) if geo_file else "",
            )

            logger.info("executing_train_script", command=command, temp_dir=str(temp_dir))

            # Execute subprocess through the shell (template may use shell features)
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=str(temp_dir),
            )

            stdout_bytes, stderr_bytes = await process.communicate()
            stdout = stdout_bytes.decode("utf-8") if stdout_bytes else ""
            stderr = stderr_bytes.decode("utf-8") if stderr_bytes else ""

            if process.returncode != 0:
                logger.error("train_script_failed", exit_code=process.returncode, stderr=stderr)
                raise RuntimeError(f"Training script failed with exit code {process.returncode}: {stderr}")

            # Output is truncated to keep log entries bounded.
            logger.info("train_script_completed", stdout=stdout[:500], stderr=stderr[:500])

            # Load trained model from file
            if not model_file.exists():
                raise RuntimeError(f"Training script did not create model file at {model_file}")

            # SECURITY NOTE: pickle.load deserializes whatever the external
            # script wrote — only safe if the script is trusted.
            with open(model_file, "rb") as f:
                model = pickle.load(f)

            return model

        finally:
            # Cleanup temp files even on failure
            import shutil

            shutil.rmtree(temp_dir, ignore_errors=True)

    async def on_predict(
        self,
        config: BaseConfig,
        model: Any,
        historic: pd.DataFrame,
        future: pd.DataFrame,
        geo: FeatureCollection | None = None,
    ) -> pd.DataFrame:
        """Make predictions by executing external prediction script.

        Raises:
            RuntimeError: If the script exits non-zero or produces no output file.
        """
        temp_dir = Path(tempfile.mkdtemp(prefix="chapkit_ml_predict_"))

        try:
            # Write config to JSON file
            config_file = temp_dir / "config.json"
            config_file.write_text(json.dumps(config.model_dump(), indent=2))

            # Write model to file (pickled for the external script to load)
            model_file = temp_dir / f"model.{self.model_format}"
            with open(model_file, "wb") as f:
                pickle.dump(model, f)

            # Write historic data
            historic_file = temp_dir / "historic.csv"
            historic.to_csv(historic_file, index=False)

            # Write future data to CSV
            future_file = temp_dir / "future.csv"
            future.to_csv(future_file, index=False)

            # Write geo data if provided
            geo_file = temp_dir / "geo.json" if geo else None
            if geo:
                assert geo_file is not None  # For type checker
                geo_file.write_text(geo.model_dump_json(indent=2))

            # Output file path (script is expected to create this)
            output_file = temp_dir / "predictions.csv"

            # Substitute variables in command (paths from tempfile, trusted)
            command = self.predict_command.format(
                config_file=str(config_file),
                model_file=str(model_file),
                historic_file=str(historic_file),
                future_file=str(future_file),
                output_file=str(output_file),
                geo_file=str(geo_file) if geo_file else "",
            )

            logger.info("executing_predict_script", command=command, temp_dir=str(temp_dir))

            # Execute subprocess through the shell
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=str(temp_dir),
            )

            stdout_bytes, stderr_bytes = await process.communicate()
            stdout = stdout_bytes.decode("utf-8") if stdout_bytes else ""
            stderr = stderr_bytes.decode("utf-8") if stderr_bytes else ""

            if process.returncode != 0:
                logger.error("predict_script_failed", exit_code=process.returncode, stderr=stderr)
                raise RuntimeError(f"Prediction script failed with exit code {process.returncode}: {stderr}")

            logger.info("predict_script_completed", stdout=stdout[:500], stderr=stderr[:500])

            # Load predictions from file
            if not output_file.exists():
                raise RuntimeError(f"Prediction script did not create output file at {output_file}")

            predictions = pd.read_csv(output_file)
            return predictions

        finally:
            # Cleanup temp files even on failure
            import shutil

            shutil.rmtree(temp_dir, ignore_errors=True)

__init__(train_command, predict_command, model_format='pickle')

Initialize shell runner with command templates for train/predict operations.

Source code in src/chapkit/modules/ml/runner.py
def __init__(
    self,
    train_command: str,
    predict_command: str,
    model_format: str = "pickle",
) -> None:
    """Initialize shell runner with command templates for train/predict operations."""
    # Templates use str.format placeholders like {config_file}, {model_file}.
    self.train_command = train_command
    self.predict_command = predict_command
    # File extension for the serialized model (serialization itself is pickle).
    self.model_format = model_format

on_train(config, data, geo=None) async

Train a model by executing external training script.

Source code in src/chapkit/modules/ml/runner.py
async def on_train(
    self,
    config: BaseConfig,
    data: pd.DataFrame,
    geo: FeatureCollection | None = None,
) -> Any:
    """Train a model by executing external training script.

    Raises:
        RuntimeError: If the script exits non-zero or produces no model file.
    """
    temp_dir = Path(tempfile.mkdtemp(prefix="chapkit_ml_train_"))

    try:
        # Write config to JSON file
        config_file = temp_dir / "config.json"
        config_file.write_text(json.dumps(config.model_dump(), indent=2))

        # Write training data to CSV
        data_file = temp_dir / "data.csv"
        data.to_csv(data_file, index=False)

        # Write geo data if provided
        geo_file = temp_dir / "geo.json" if geo else None
        if geo:
            assert geo_file is not None  # For type checker
            geo_file.write_text(geo.model_dump_json(indent=2))

        # Model file path (script is expected to create this)
        model_file = temp_dir / f"model.{self.model_format}"

        # Substitute variables in command; placeholder values are temp-file
        # paths, not user-controlled strings.
        command = self.train_command.format(
            config_file=str(config_file),
            data_file=str(data_file),
            model_file=str(model_file),
            geo_file=str(geo_file) if geo_file else "",
        )

        logger.info("executing_train_script", command=command, temp_dir=str(temp_dir))

        # Execute subprocess through the shell
        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=str(temp_dir),
        )

        stdout_bytes, stderr_bytes = await process.communicate()
        stdout = stdout_bytes.decode("utf-8") if stdout_bytes else ""
        stderr = stderr_bytes.decode("utf-8") if stderr_bytes else ""

        if process.returncode != 0:
            logger.error("train_script_failed", exit_code=process.returncode, stderr=stderr)
            raise RuntimeError(f"Training script failed with exit code {process.returncode}: {stderr}")

        # Output is truncated to keep log entries bounded.
        logger.info("train_script_completed", stdout=stdout[:500], stderr=stderr[:500])

        # Load trained model from file
        if not model_file.exists():
            raise RuntimeError(f"Training script did not create model file at {model_file}")

        # SECURITY NOTE: pickle.load is only safe if the script is trusted.
        with open(model_file, "rb") as f:
            model = pickle.load(f)

        return model

    finally:
        # Cleanup temp files even on failure
        import shutil

        shutil.rmtree(temp_dir, ignore_errors=True)

on_predict(config, model, historic, future, geo=None) async

Make predictions by executing external prediction script.

Source code in src/chapkit/modules/ml/runner.py
async def on_predict(
    self,
    config: BaseConfig,
    model: Any,
    historic: pd.DataFrame,
    future: pd.DataFrame,
    geo: FeatureCollection | None = None,
) -> pd.DataFrame:
    """Make predictions by executing external prediction script.

    Raises:
        RuntimeError: If the script exits non-zero or produces no output file.
    """
    temp_dir = Path(tempfile.mkdtemp(prefix="chapkit_ml_predict_"))

    try:
        # Write config to JSON file
        config_file = temp_dir / "config.json"
        config_file.write_text(json.dumps(config.model_dump(), indent=2))

        # Write model to file (pickled for the external script to load)
        model_file = temp_dir / f"model.{self.model_format}"
        with open(model_file, "wb") as f:
            pickle.dump(model, f)

        # Write historic data
        historic_file = temp_dir / "historic.csv"
        historic.to_csv(historic_file, index=False)

        # Write future data to CSV
        future_file = temp_dir / "future.csv"
        future.to_csv(future_file, index=False)

        # Write geo data if provided
        geo_file = temp_dir / "geo.json" if geo else None
        if geo:
            assert geo_file is not None  # For type checker
            geo_file.write_text(geo.model_dump_json(indent=2))

        # Output file path (script is expected to create this)
        output_file = temp_dir / "predictions.csv"

        # Substitute variables in command (temp-file paths, trusted)
        command = self.predict_command.format(
            config_file=str(config_file),
            model_file=str(model_file),
            historic_file=str(historic_file),
            future_file=str(future_file),
            output_file=str(output_file),
            geo_file=str(geo_file) if geo_file else "",
        )

        logger.info("executing_predict_script", command=command, temp_dir=str(temp_dir))

        # Execute subprocess through the shell
        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=str(temp_dir),
        )

        stdout_bytes, stderr_bytes = await process.communicate()
        stdout = stdout_bytes.decode("utf-8") if stdout_bytes else ""
        stderr = stderr_bytes.decode("utf-8") if stderr_bytes else ""

        if process.returncode != 0:
            logger.error("predict_script_failed", exit_code=process.returncode, stderr=stderr)
            raise RuntimeError(f"Prediction script failed with exit code {process.returncode}: {stderr}")

        # Output is truncated to keep log entries bounded.
        logger.info("predict_script_completed", stdout=stdout[:500], stderr=stderr[:500])

        # Load predictions from file
        if not output_file.exists():
            raise RuntimeError(f"Prediction script did not create output file at {output_file}")

        predictions = pd.read_csv(output_file)
        return predictions

    finally:
        # Cleanup temp files even on failure
        import shutil

        shutil.rmtree(temp_dir, ignore_errors=True)

ModelRunnerProtocol

Bases: Protocol

Protocol defining the interface for model runners.

Source code in src/chapkit/modules/ml/schemas.py
class ModelRunnerProtocol(Protocol):
    """Protocol defining the interface for model runners.

    Structural typing: any object with matching on_train/on_predict
    signatures satisfies this protocol without inheriting from it.
    """

    async def on_train(
        self,
        config: BaseConfig,
        data: pd.DataFrame,
        geo: FeatureCollection | None = None,
    ) -> Any:
        """Train a model and return the trained model object (must be pickleable)."""
        ...

    async def on_predict(
        self,
        config: BaseConfig,
        model: Any,
        historic: pd.DataFrame,
        future: pd.DataFrame,
        geo: FeatureCollection | None = None,
    ) -> pd.DataFrame:
        """Make predictions using a trained model and return predictions as DataFrame."""
        ...

on_train(config, data, geo=None) async

Train a model and return the trained model object (must be pickleable).

Source code in src/chapkit/modules/ml/schemas.py
# Protocol stub — defines the signature only; implementations live elsewhere.
async def on_train(
    self,
    config: BaseConfig,
    data: pd.DataFrame,
    geo: FeatureCollection | None = None,
) -> Any:
    """Train a model and return the trained model object (must be pickleable)."""
    ...

on_predict(config, model, historic, future, geo=None) async

Make predictions using a trained model and return predictions as DataFrame.

Source code in src/chapkit/modules/ml/schemas.py
# Protocol stub — defines the signature only; implementations live elsewhere.
async def on_predict(
    self,
    config: BaseConfig,
    model: Any,
    historic: pd.DataFrame,
    future: pd.DataFrame,
    geo: FeatureCollection | None = None,
) -> pd.DataFrame:
    """Make predictions using a trained model and return predictions as DataFrame."""
    ...

PredictionArtifactData

Bases: BaseModel

Schema for prediction artifact data stored in the artifact system.

Source code in src/chapkit/modules/ml/schemas.py
class PredictionArtifactData(BaseModel):
    """Schema for prediction artifact data stored in the artifact system."""

    # Fixed literal acts as the artifact-type discriminator.
    ml_type: Literal["prediction"] = Field(description="Artifact type identifier")
    config_id: str = Field(description="ID of the config used for prediction")
    model_artifact_id: str = Field(description="ID of the trained model artifact used for prediction")
    # Timestamps are stored as ISO-format strings, not datetime objects.
    started_at: str = Field(description="ISO format timestamp when operation started")
    completed_at: str = Field(description="ISO format timestamp when operation completed")
    duration_seconds: float = Field(description="Operation duration in seconds (rounded to 2 decimals)")
    predictions: PandasDataFrame = Field(description="Prediction results as structured DataFrame")

PredictRequest

Bases: BaseModel

Request schema for making predictions.

Source code in src/chapkit/modules/ml/schemas.py
class PredictRequest(BaseModel):
    """Request schema for making predictions."""

    # References a previously trained model artifact by ULID (unlike the
    # string IDs used in the artifact-data schemas).
    model_artifact_id: ULID = Field(description="ID of the artifact containing the trained model")
    historic: PandasDataFrame = Field(description="Historic data as pandas DataFrame")
    future: PandasDataFrame = Field(description="Future/prediction data as pandas DataFrame")
    geo: FeatureCollection | None = Field(default=None, description="Optional geospatial data")

PredictResponse

Bases: BaseModel

Response schema for predict operation submission.

Source code in src/chapkit/modules/ml/schemas.py
class PredictResponse(BaseModel):
    """Response schema for predict operation submission.

    Returned when a prediction job is submitted; the artifact itself is
    produced asynchronously under ``prediction_artifact_id``.
    """

    job_id: str = Field(description="ID of the prediction job in the scheduler")
    prediction_artifact_id: str = Field(description="ID that will contain the prediction artifact")
    message: str = Field(description="Human-readable message")

TrainedModelArtifactData

Bases: BaseModel

Schema for trained model artifact data stored in the artifact system.

Source code in src/chapkit/modules/ml/schemas.py
class TrainedModelArtifactData(BaseModel):
    """Schema for trained model artifact data stored in the artifact system."""

    # Fixed literal acts as the artifact-type discriminator.
    ml_type: Literal["trained_model"] = Field(description="Artifact type identifier")
    config_id: str = Field(description="ID of the config used for training")
    # Timestamps are stored as ISO-format strings, not datetime objects.
    started_at: str = Field(description="ISO format timestamp when operation started")
    completed_at: str = Field(description="ISO format timestamp when operation completed")
    duration_seconds: float = Field(description="Operation duration in seconds (rounded to 2 decimals)")
    # NOTE(review): field names with a `model_` prefix may collide with
    # pydantic v2's protected `model_` namespace and emit warnings —
    # confirm `protected_namespaces` is handled where this model is defined.
    model: Any = Field(description="The trained model object (must be pickleable)")
    model_type: str | None = Field(default=None, description="Fully qualified class name of the model")
    model_size_bytes: int | None = Field(default=None, description="Serialized pickle size of the model in bytes")

    # Required so pydantic accepts the arbitrary (non-pydantic) object in `model`.
    model_config = {"arbitrary_types_allowed": True}

TrainRequest

Bases: BaseModel

Request schema for training a model.

Source code in src/chapkit/modules/ml/schemas.py
class TrainRequest(BaseModel):
    """Request schema for training a model."""

    # References the training configuration by ULID.
    config_id: ULID = Field(description="ID of the config to use for training")
    data: PandasDataFrame = Field(description="Training data as pandas DataFrame")
    geo: FeatureCollection | None = Field(default=None, description="Optional geospatial data")

TrainResponse

Bases: BaseModel

Response schema for train operation submission.

Source code in src/chapkit/modules/ml/schemas.py
class TrainResponse(BaseModel):
    """Response schema for train operation submission.

    Returned when a training job is submitted; the trained model artifact is
    produced asynchronously under ``model_artifact_id``.
    """

    job_id: str = Field(description="ID of the training job in the scheduler")
    model_artifact_id: str = Field(description="ID that will contain the trained model artifact")
    message: str = Field(description="Human-readable message")