Skip to content

API Reference

Complete API documentation for all servicekit modules, classes, and functions.

Core Infrastructure

Framework-agnostic infrastructure components.

Database

database

Async SQLAlchemy database connection manager.

Classes

Database

Generic async SQLAlchemy database connection manager.

Source code in src/servicekit/database.py
class Database:
    """Generic async SQLAlchemy database connection manager.

    Wraps an ``AsyncEngine`` plus an async session factory, and can
    bootstrap the schema either through Alembic migrations or direct
    metadata creation (see :meth:`init`).
    """

    def __init__(
        self,
        url: str,
        *,
        echo: bool = False,
        alembic_dir: Path | None = None,
        auto_migrate: bool = True,
        pool_size: int = 5,
        max_overflow: int = 10,
        pool_recycle: int = 3600,
        pool_pre_ping: bool = True,
    ) -> None:
        """Initialize database with connection URL and pool configuration.

        Args:
            url: Async SQLAlchemy connection URL (e.g. ``sqlite+aiosqlite:///...``).
            echo: Log emitted SQL statements when True.
            alembic_dir: Custom Alembic script directory; ``None`` selects the
                bundled migrations directory.
            auto_migrate: When True, :meth:`init` runs Alembic migrations;
                otherwise tables are created directly from the ORM metadata.
            pool_size: Base size of the connection pool.
            max_overflow: Extra connections allowed beyond ``pool_size``.
            pool_recycle: Seconds before a pooled connection is recycled.
            pool_pre_ping: Liveness-check pooled connections before checkout.
        """
        self.url = url
        self.alembic_dir = alembic_dir
        self.auto_migrate = auto_migrate

        # Build engine kwargs - skip pool params for in-memory SQLite databases
        # NOTE(review): in-memory SQLite engines use a different pool class,
        # so queue-pool sizing options would be rejected — hence the skip.
        engine_kwargs: dict = {"echo": echo, "future": True}
        if ":memory:" not in url:
            # Only add pool params for non-in-memory databases
            engine_kwargs.update(
                {
                    "pool_size": pool_size,
                    "max_overflow": max_overflow,
                    "pool_recycle": pool_recycle,
                    "pool_pre_ping": pool_pre_ping,
                }
            )

        self.engine: AsyncEngine = create_async_engine(url, **engine_kwargs)
        # expire_on_commit=False keeps loaded attributes usable after commit,
        # avoiding implicit refresh queries in async contexts.
        self._session_factory: async_sessionmaker[AsyncSession] = async_sessionmaker(
            bind=self.engine, class_=AsyncSession, expire_on_commit=False
        )

    async def init(self) -> None:
        """Initialize database tables using Alembic migrations or direct creation.

        With ``auto_migrate=False`` the schema is created straight from the ORM
        metadata; otherwise Alembic is upgraded to ``head``.
        """
        import asyncio

        # Import Base here to avoid circular import at module level
        from servicekit.models import Base

        # For databases without migrations, use direct table creation
        if not self.auto_migrate:
            async with self.engine.begin() as conn:
                await conn.run_sync(Base.metadata.create_all)
        else:
            # Use Alembic migrations
            alembic_cfg = Config()

            # Use custom alembic directory if provided, otherwise use bundled migrations
            if self.alembic_dir is not None:
                alembic_cfg.set_main_option("script_location", str(self.alembic_dir))
            else:
                alembic_cfg.set_main_option("script_location", str(Path(__file__).parent.parent.parent / "alembic"))

            # NOTE(review): set_main_option goes through configparser
            # interpolation; a raw "%" in the URL would need escaping as "%%"
            # — confirm whether such URLs (e.g. encoded passwords) can occur.
            alembic_cfg.set_main_option("sqlalchemy.url", self.url)

            # Run upgrade in executor to avoid event loop conflicts
            # (command.upgrade is synchronous and may spin up its own loop).
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, command.upgrade, alembic_cfg, "head")

    @asynccontextmanager
    async def session(self) -> AsyncIterator[AsyncSession]:
        """Create a database session context manager.

        Yields a fresh ``AsyncSession``; closing is handled by the factory's
        own context manager.
        """
        async with self._session_factory() as s:
            yield s

    async def dispose(self) -> None:
        """Dispose of database engine and connection pool."""
        await self.engine.dispose()
Functions
__init__(url, *, echo=False, alembic_dir=None, auto_migrate=True, pool_size=5, max_overflow=10, pool_recycle=3600, pool_pre_ping=True)

Initialize database with connection URL and pool configuration.

Source code in src/servicekit/database.py
def __init__(
    self,
    url: str,
    *,
    echo: bool = False,
    alembic_dir: Path | None = None,
    auto_migrate: bool = True,
    pool_size: int = 5,
    max_overflow: int = 10,
    pool_recycle: int = 3600,
    pool_pre_ping: bool = True,
) -> None:
    """Initialize database with connection URL and pool configuration."""
    self.url = url
    self.alembic_dir = alembic_dir
    self.auto_migrate = auto_migrate

    # Build engine kwargs - skip pool params for in-memory SQLite databases
    engine_kwargs: dict = {"echo": echo, "future": True}
    if ":memory:" not in url:
        # Only add pool params for non-in-memory databases
        engine_kwargs.update(
            {
                "pool_size": pool_size,
                "max_overflow": max_overflow,
                "pool_recycle": pool_recycle,
                "pool_pre_ping": pool_pre_ping,
            }
        )

    self.engine: AsyncEngine = create_async_engine(url, **engine_kwargs)
    self._session_factory: async_sessionmaker[AsyncSession] = async_sessionmaker(
        bind=self.engine, class_=AsyncSession, expire_on_commit=False
    )
init() async

Initialize database tables using Alembic migrations or direct creation.

Source code in src/servicekit/database.py
async def init(self) -> None:
    """Initialize database tables using Alembic migrations or direct creation."""
    import asyncio

    # Import Base here to avoid circular import at module level
    from servicekit.models import Base

    # For databases without migrations, use direct table creation
    if not self.auto_migrate:
        async with self.engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
    else:
        # Use Alembic migrations
        alembic_cfg = Config()

        # Use custom alembic directory if provided, otherwise use bundled migrations
        if self.alembic_dir is not None:
            alembic_cfg.set_main_option("script_location", str(self.alembic_dir))
        else:
            alembic_cfg.set_main_option("script_location", str(Path(__file__).parent.parent.parent / "alembic"))

        alembic_cfg.set_main_option("sqlalchemy.url", self.url)

        # Run upgrade in executor to avoid event loop conflicts
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, command.upgrade, alembic_cfg, "head")
session() async

Create a database session context manager.

Source code in src/servicekit/database.py
@asynccontextmanager
async def session(self) -> AsyncIterator[AsyncSession]:
    """Create a database session context manager."""
    async with self._session_factory() as s:
        yield s
dispose() async

Dispose of database engine and connection pool.

Source code in src/servicekit/database.py
async def dispose(self) -> None:
    """Dispose of database engine and connection pool."""
    await self.engine.dispose()

SqliteDatabase

Bases: Database

SQLite-specific database implementation with optimizations.

Source code in src/servicekit/database.py
class SqliteDatabase(Database):
    """SQLite-specific database implementation with optimizations.

    Extends :class:`Database` by installing connect-time PRAGMA handlers on
    the engine and enabling WAL journal mode for file-based databases.
    """

    def __init__(
        self,
        url: str,
        *,
        echo: bool = False,
        alembic_dir: Path | None = None,
        auto_migrate: bool = True,
        pool_size: int = 5,
        max_overflow: int = 10,
        pool_recycle: int = 3600,
        pool_pre_ping: bool = True,
    ) -> None:
        """Initialize SQLite database with connection URL and pool configuration.

        Delegates engine and session-factory construction to the base class
        and layers SQLite-specific connection PRAGMAs on top.
        """
        # The base initializer performs the identical engine/session-factory
        # setup, including skipping pool parameters for in-memory URLs — its
        # ':memory:' substring check matches _is_in_memory_url exactly.
        # Delegating removes the previous copy-paste of Database.__init__.
        super().__init__(
            url,
            echo=echo,
            alembic_dir=alembic_dir,
            auto_migrate=auto_migrate,
            pool_size=pool_size,
            max_overflow=max_overflow,
            pool_recycle=pool_recycle,
            pool_pre_ping=pool_pre_ping,
        )
        # SQLite-specific: register PRAGMA setup to run on each new DBAPI
        # connection (helper defined elsewhere in this module).
        _install_sqlite_connect_pragmas(self.engine)

    @staticmethod
    def _is_in_memory_url(url: str) -> bool:
        """Check if URL represents an in-memory database."""
        return ":memory:" in url

    def is_in_memory(self) -> bool:
        """Check if this is an in-memory database."""
        return self._is_in_memory_url(self.url)

    async def init(self) -> None:
        """Initialize database tables and configure SQLite using Alembic migrations.

        File-based databases are switched to WAL journal mode first; in-memory
        databases (or ``auto_migrate=False``) use direct table creation since
        migrations make no sense for a throwaway schema.
        """
        # Import Base here to avoid circular import at module level
        from servicekit.models import Base

        # Set WAL mode first (if not in-memory); WAL only applies to file DBs.
        if not self.is_in_memory():
            async with self.engine.begin() as conn:
                await conn.exec_driver_sql("PRAGMA journal_mode=WAL;")

        # For in-memory databases or when migrations are disabled, use direct table creation
        if self.is_in_memory() or not self.auto_migrate:
            async with self.engine.begin() as conn:
                await conn.run_sync(Base.metadata.create_all)
        else:
            # For file-based databases, use Alembic migrations
            await super().init()
Functions
__init__(url, *, echo=False, alembic_dir=None, auto_migrate=True, pool_size=5, max_overflow=10, pool_recycle=3600, pool_pre_ping=True)

Initialize SQLite database with connection URL and pool configuration.

Source code in src/servicekit/database.py
def __init__(
    self,
    url: str,
    *,
    echo: bool = False,
    alembic_dir: Path | None = None,
    auto_migrate: bool = True,
    pool_size: int = 5,
    max_overflow: int = 10,
    pool_recycle: int = 3600,
    pool_pre_ping: bool = True,
) -> None:
    """Initialize SQLite database with connection URL and pool configuration."""
    self.url = url
    self.alembic_dir = alembic_dir
    self.auto_migrate = auto_migrate

    # Build engine kwargs - pool params only for non-in-memory databases
    engine_kwargs: dict = {"echo": echo, "future": True}
    if not self._is_in_memory_url(url):
        # File-based databases can use pool configuration
        engine_kwargs.update(
            {
                "pool_size": pool_size,
                "max_overflow": max_overflow,
                "pool_recycle": pool_recycle,
                "pool_pre_ping": pool_pre_ping,
            }
        )

    self.engine: AsyncEngine = create_async_engine(url, **engine_kwargs)
    _install_sqlite_connect_pragmas(self.engine)
    self._session_factory: async_sessionmaker[AsyncSession] = async_sessionmaker(
        bind=self.engine, class_=AsyncSession, expire_on_commit=False
    )
is_in_memory()

Check if this is an in-memory database.

Source code in src/servicekit/database.py
def is_in_memory(self) -> bool:
    """Check if this is an in-memory database."""
    return self._is_in_memory_url(self.url)
init() async

Initialize database tables and configure SQLite using Alembic migrations.

Source code in src/servicekit/database.py
async def init(self) -> None:
    """Initialize database tables and configure SQLite using Alembic migrations."""
    # Import Base here to avoid circular import at module level
    from servicekit.models import Base

    # Set WAL mode first (if not in-memory)
    if not self.is_in_memory():
        async with self.engine.begin() as conn:
            await conn.exec_driver_sql("PRAGMA journal_mode=WAL;")

    # For in-memory databases or when migrations are disabled, use direct table creation
    if self.is_in_memory() or not self.auto_migrate:
        async with self.engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
    else:
        # For file-based databases, use Alembic migrations
        await super().init()

SqliteDatabaseBuilder

Builder for SQLite database configuration with fluent API.

Source code in src/servicekit/database.py
class SqliteDatabaseBuilder:
    """Builder for SQLite database configuration with fluent API."""

    def __init__(self) -> None:
        """Initialize builder with default values."""
        self._url: str = ""
        self._echo: bool = False
        self._alembic_dir: Path | None = None
        self._auto_migrate: bool = True
        self._pool_size: int = 5
        self._max_overflow: int = 10
        self._pool_recycle: int = 3600
        self._pool_pre_ping: bool = True

    @classmethod
    def in_memory(cls) -> Self:
        """Create an in-memory SQLite database configuration."""
        builder = cls()
        builder._url = "sqlite+aiosqlite:///:memory:"
        return builder

    @classmethod
    def from_file(cls, path: str | Path) -> Self:
        """Create a file-based SQLite database configuration."""
        builder = cls()
        if isinstance(path, Path):
            path = str(path)
        builder._url = f"sqlite+aiosqlite:///{path}"
        return builder

    def with_echo(self, enabled: bool = True) -> Self:
        """Enable SQL query logging."""
        self._echo = enabled
        return self

    def with_migrations(self, enabled: bool = True, alembic_dir: Path | None = None) -> Self:
        """Configure migration behavior."""
        self._auto_migrate = enabled
        self._alembic_dir = alembic_dir
        return self

    def with_pool(
        self,
        size: int = 5,
        max_overflow: int = 10,
        recycle: int = 3600,
        pre_ping: bool = True,
    ) -> Self:
        """Configure connection pool settings."""
        self._pool_size = size
        self._max_overflow = max_overflow
        self._pool_recycle = recycle
        self._pool_pre_ping = pre_ping
        return self

    def build(self) -> SqliteDatabase:
        """Build and return configured SqliteDatabase instance."""
        if not self._url:
            raise ValueError("Database URL not configured. Use .in_memory() or .from_file()")

        return SqliteDatabase(
            url=self._url,
            echo=self._echo,
            alembic_dir=self._alembic_dir,
            auto_migrate=self._auto_migrate,
            pool_size=self._pool_size,
            max_overflow=self._max_overflow,
            pool_recycle=self._pool_recycle,
            pool_pre_ping=self._pool_pre_ping,
        )
Functions
__init__()

Initialize builder with default values.

Source code in src/servicekit/database.py
def __init__(self) -> None:
    """Initialize builder with default values."""
    self._url: str = ""
    self._echo: bool = False
    self._alembic_dir: Path | None = None
    self._auto_migrate: bool = True
    self._pool_size: int = 5
    self._max_overflow: int = 10
    self._pool_recycle: int = 3600
    self._pool_pre_ping: bool = True
in_memory() classmethod

Create an in-memory SQLite database configuration.

Source code in src/servicekit/database.py
@classmethod
def in_memory(cls) -> Self:
    """Create an in-memory SQLite database configuration."""
    builder = cls()
    builder._url = "sqlite+aiosqlite:///:memory:"
    return builder
from_file(path) classmethod

Create a file-based SQLite database configuration.

Source code in src/servicekit/database.py
@classmethod
def from_file(cls, path: str | Path) -> Self:
    """Create a file-based SQLite database configuration."""
    builder = cls()
    if isinstance(path, Path):
        path = str(path)
    builder._url = f"sqlite+aiosqlite:///{path}"
    return builder
with_echo(enabled=True)

Enable SQL query logging.

Source code in src/servicekit/database.py
def with_echo(self, enabled: bool = True) -> Self:
    """Enable SQL query logging."""
    self._echo = enabled
    return self
with_migrations(enabled=True, alembic_dir=None)

Configure migration behavior.

Source code in src/servicekit/database.py
def with_migrations(self, enabled: bool = True, alembic_dir: Path | None = None) -> Self:
    """Configure migration behavior."""
    self._auto_migrate = enabled
    self._alembic_dir = alembic_dir
    return self
with_pool(size=5, max_overflow=10, recycle=3600, pre_ping=True)

Configure connection pool settings.

Source code in src/servicekit/database.py
def with_pool(
    self,
    size: int = 5,
    max_overflow: int = 10,
    recycle: int = 3600,
    pre_ping: bool = True,
) -> Self:
    """Configure connection pool settings."""
    self._pool_size = size
    self._max_overflow = max_overflow
    self._pool_recycle = recycle
    self._pool_pre_ping = pre_ping
    return self
build()

Build and return configured SqliteDatabase instance.

Source code in src/servicekit/database.py
def build(self) -> SqliteDatabase:
    """Build and return configured SqliteDatabase instance."""
    if not self._url:
        raise ValueError("Database URL not configured. Use .in_memory() or .from_file()")

    return SqliteDatabase(
        url=self._url,
        echo=self._echo,
        alembic_dir=self._alembic_dir,
        auto_migrate=self._auto_migrate,
        pool_size=self._pool_size,
        max_overflow=self._max_overflow,
        pool_recycle=self._pool_recycle,
        pool_pre_ping=self._pool_pre_ping,
    )

Models

models

Base ORM classes for SQLAlchemy models.

Classes

Base

Bases: AsyncAttrs, DeclarativeBase

Root declarative base with async support.

Source code in src/servicekit/models.py
class Base(AsyncAttrs, DeclarativeBase):
    """Root declarative base with async support.

    AsyncAttrs contributes the ``awaitable_attrs`` accessor so lazy-loaded
    attributes can be awaited under asyncio; all ORM models in the package
    should subclass this.
    """

Entity

Bases: Base

Optional base with common columns for your models.

Source code in src/servicekit/models.py
class Entity(Base):
    """Optional base with common columns for your models.

    Provides a ULID primary key plus created/updated timestamps; inherit from
    it when those columns fit your table.
    """

    # Abstract: no table is emitted for Entity itself; subclasses inherit the columns.
    __abstract__ = True

    # ULID primary key, generated client-side per row (default=ULID).
    id: Mapped[ULID] = mapped_column(ULIDType, primary_key=True, default=ULID)
    # Set by the database at INSERT time (server_default=func.now()).
    created_at: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
    # Initialized at INSERT; onupdate=func.now() refreshes it on every UPDATE.
    updated_at: Mapped[datetime.datetime] = mapped_column(server_default=func.now(), onupdate=func.now())

Repository

repository

Base repository classes for data access layer.

Classes

Repository

Bases: ABC

Abstract repository interface for data access operations.

Source code in src/servicekit/repository.py
class Repository[T, IdT = ULID](ABC):
    """Abstract repository interface for data access operations.

    Type parameters:
        T: entity/model type the repository manages.
        IdT: primary-key type (defaults to ULID).

    Implementations must provide the full CRUD surface below; bulk variants
    take/return sequences and must tolerate empty input.
    """

    @abstractmethod
    async def save(self, entity: T) -> T:
        """Save an entity to the database."""
        ...

    @abstractmethod
    async def save_all(self, entities: Iterable[T]) -> Sequence[T]:
        """Save multiple entities to the database."""
        ...

    @abstractmethod
    async def commit(self) -> None:
        """Commit the current database transaction."""
        ...

    @abstractmethod
    async def refresh_many(self, entities: Iterable[T]) -> None:
        """Refresh multiple entities from the database."""
        ...

    @abstractmethod
    async def delete(self, entity: T) -> None:
        """Delete an entity from the database."""
        ...

    @abstractmethod
    async def delete_by_id(self, id: IdT) -> None:
        """Delete an entity by its ID."""
        ...

    @abstractmethod
    async def delete_all(self) -> None:
        """Delete all entities from the database."""
        ...

    @abstractmethod
    async def delete_all_by_id(self, ids: Sequence[IdT]) -> None:
        """Delete multiple entities by their IDs."""
        ...

    @abstractmethod
    async def count(self) -> int:
        """Count the number of entities."""
        ...

    @abstractmethod
    async def exists_by_id(self, id: IdT) -> bool:
        """Check if an entity exists by its ID."""
        ...

    @abstractmethod
    async def find_all(self) -> Sequence[T]:
        """Find all entities."""
        ...

    @abstractmethod
    async def find_all_paginated(self, offset: int, limit: int) -> Sequence[T]:
        """Find entities with pagination."""
        ...

    @abstractmethod
    async def find_all_by_id(self, ids: Sequence[IdT]) -> Sequence[T]:
        """Find entities by their IDs."""
        ...

    @abstractmethod
    async def find_by_id(self, id: IdT) -> T | None:
        """Find an entity by its ID."""
        ...
Functions
save(entity) abstractmethod async

Save an entity to the database.

Source code in src/servicekit/repository.py
@abstractmethod
async def save(self, entity: T) -> T:
    """Save an entity to the database."""
    ...
save_all(entities) abstractmethod async

Save multiple entities to the database.

Source code in src/servicekit/repository.py
@abstractmethod
async def save_all(self, entities: Iterable[T]) -> Sequence[T]:
    """Save multiple entities to the database."""
    ...
commit() abstractmethod async

Commit the current database transaction.

Source code in src/servicekit/repository.py
@abstractmethod
async def commit(self) -> None:
    """Commit the current database transaction."""
    ...
refresh_many(entities) abstractmethod async

Refresh multiple entities from the database.

Source code in src/servicekit/repository.py
@abstractmethod
async def refresh_many(self, entities: Iterable[T]) -> None:
    """Refresh multiple entities from the database."""
    ...
delete(entity) abstractmethod async

Delete an entity from the database.

Source code in src/servicekit/repository.py
@abstractmethod
async def delete(self, entity: T) -> None:
    """Delete an entity from the database."""
    ...
delete_by_id(id) abstractmethod async

Delete an entity by its ID.

Source code in src/servicekit/repository.py
@abstractmethod
async def delete_by_id(self, id: IdT) -> None:
    """Delete an entity by its ID."""
    ...
delete_all() abstractmethod async

Delete all entities from the database.

Source code in src/servicekit/repository.py
@abstractmethod
async def delete_all(self) -> None:
    """Delete all entities from the database."""
    ...
delete_all_by_id(ids) abstractmethod async

Delete multiple entities by their IDs.

Source code in src/servicekit/repository.py
@abstractmethod
async def delete_all_by_id(self, ids: Sequence[IdT]) -> None:
    """Delete multiple entities by their IDs."""
    ...
count() abstractmethod async

Count the number of entities.

Source code in src/servicekit/repository.py
@abstractmethod
async def count(self) -> int:
    """Count the number of entities."""
    ...
exists_by_id(id) abstractmethod async

Check if an entity exists by its ID.

Source code in src/servicekit/repository.py
@abstractmethod
async def exists_by_id(self, id: IdT) -> bool:
    """Check if an entity exists by its ID."""
    ...
find_all() abstractmethod async

Find all entities.

Source code in src/servicekit/repository.py
@abstractmethod
async def find_all(self) -> Sequence[T]:
    """Find all entities."""
    ...
find_all_paginated(offset, limit) abstractmethod async

Find entities with pagination.

Source code in src/servicekit/repository.py
@abstractmethod
async def find_all_paginated(self, offset: int, limit: int) -> Sequence[T]:
    """Find entities with pagination."""
    ...
find_all_by_id(ids) abstractmethod async

Find entities by their IDs.

Source code in src/servicekit/repository.py
@abstractmethod
async def find_all_by_id(self, ids: Sequence[IdT]) -> Sequence[T]:
    """Find entities by their IDs."""
    ...
find_by_id(id) abstractmethod async

Find an entity by its ID.

Source code in src/servicekit/repository.py
@abstractmethod
async def find_by_id(self, id: IdT) -> T | None:
    """Find an entity by its ID."""
    ...

BaseRepository

Bases: Repository[T, IdT]

Base repository implementation with common CRUD operations.

Source code in src/servicekit/repository.py
class BaseRepository[T, IdT = ULID](Repository[T, IdT]):
    """Base repository implementation with common CRUD operations."""

    def __init__(self, session: AsyncSession, model: type[T]) -> None:
        """Initialize repository with database session and model type."""
        self.s = session
        self.model = model

    # ---------- Create ----------
    async def save(self, entity: T) -> T:
        """Save an entity to the database."""
        self.s.add(entity)
        return entity

    async def save_all(self, entities: Iterable[T]) -> Sequence[T]:
        """Save multiple entities to the database."""
        entity_list = list(entities)
        self.s.add_all(entity_list)
        return entity_list

    async def commit(self) -> None:
        """Commit the current database transaction."""
        await self.s.commit()

    async def refresh_many(self, entities: Iterable[T]) -> None:
        """Refresh multiple entities from the database."""
        for e in entities:
            await self.s.refresh(e)

    # ---------- Delete ----------
    async def delete(self, entity: T) -> None:
        """Delete an entity from the database."""
        await self.s.delete(entity)

    async def delete_by_id(self, id: IdT) -> None:
        """Delete an entity by its ID."""
        id_col = getattr(self.model, "id")
        await self.s.execute(delete(self.model).where(id_col == id))

    async def delete_all(self) -> None:
        """Delete all entities from the database."""
        await self.s.execute(delete(self.model))

    async def delete_all_by_id(self, ids: Sequence[IdT]) -> None:
        """Delete multiple entities by their IDs."""
        if not ids:
            return
        # Access the "id" column generically
        id_col = getattr(self.model, "id")
        await self.s.execute(delete(self.model).where(id_col.in_(ids)))

    # ---------- Read / Count ----------
    async def count(self) -> int:
        """Count the number of entities."""
        return await self.s.scalar(select(func.count()).select_from(self.model)) or 0

    async def exists_by_id(self, id: IdT) -> bool:
        """Check if an entity exists by its ID."""
        # Access the "id" column generically
        id_col = getattr(self.model, "id")
        q = select(select(id_col).where(id_col == id).exists())
        return await self.s.scalar(q) or False

    async def find_all(self) -> Sequence[T]:
        """Find all entities."""
        result = await self.s.scalars(select(self.model))
        return result.all()

    async def find_all_paginated(self, offset: int, limit: int) -> Sequence[T]:
        """Find entities with pagination."""
        result = await self.s.scalars(select(self.model).offset(offset).limit(limit))
        return result.all()

    async def find_all_by_id(self, ids: Sequence[IdT]) -> Sequence[T]:
        """Find entities by their IDs."""
        if not ids:
            return []
        id_col = getattr(self.model, "id")
        result = await self.s.scalars(select(self.model).where(id_col.in_(ids)))
        return result.all()

    async def find_by_id(self, id: IdT) -> T | None:
        """Find an entity by its ID."""
        return await self.s.get(self.model, id)
Functions
__init__(session, model)

Initialize repository with database session and model type.

Source code in src/servicekit/repository.py
def __init__(self, session: AsyncSession, model: type[T]) -> None:
    """Initialize repository with database session and model type."""
    self.s = session
    self.model = model
save(entity) async

Save an entity to the database.

Source code in src/servicekit/repository.py
async def save(self, entity: T) -> T:
    """Save an entity to the database."""
    self.s.add(entity)
    return entity
save_all(entities) async

Save multiple entities to the database.

Source code in src/servicekit/repository.py
async def save_all(self, entities: Iterable[T]) -> Sequence[T]:
    """Save multiple entities to the database."""
    entity_list = list(entities)
    self.s.add_all(entity_list)
    return entity_list
commit() async

Commit the current database transaction.

Source code in src/servicekit/repository.py
async def commit(self) -> None:
    """Commit the current database transaction."""
    await self.s.commit()
refresh_many(entities) async

Refresh multiple entities from the database.

Source code in src/servicekit/repository.py
async def refresh_many(self, entities: Iterable[T]) -> None:
    """Refresh multiple entities from the database."""
    for e in entities:
        await self.s.refresh(e)
delete(entity) async

Delete an entity from the database.

Source code in src/servicekit/repository.py
async def delete(self, entity: T) -> None:
    """Delete an entity from the database."""
    await self.s.delete(entity)
delete_by_id(id) async

Delete an entity by its ID.

Source code in src/servicekit/repository.py
async def delete_by_id(self, id: IdT) -> None:
    """Delete an entity by its ID."""
    id_col = getattr(self.model, "id")
    await self.s.execute(delete(self.model).where(id_col == id))
delete_all() async

Delete all entities from the database.

Source code in src/servicekit/repository.py
async def delete_all(self) -> None:
    """Delete all entities from the database."""
    await self.s.execute(delete(self.model))
delete_all_by_id(ids) async

Delete multiple entities by their IDs.

Source code in src/servicekit/repository.py
async def delete_all_by_id(self, ids: Sequence[IdT]) -> None:
    """Delete multiple entities by their IDs."""
    if not ids:
        return
    # Access the "id" column generically
    id_col = getattr(self.model, "id")
    await self.s.execute(delete(self.model).where(id_col.in_(ids)))
count() async

Count the number of entities.

Source code in src/servicekit/repository.py
async def count(self) -> int:
    """Count the number of entities."""
    return await self.s.scalar(select(func.count()).select_from(self.model)) or 0
exists_by_id(id) async

Check if an entity exists by its ID.

Source code in src/servicekit/repository.py
async def exists_by_id(self, id: IdT) -> bool:
    """Check if an entity exists by its ID."""
    # Access the "id" column generically
    id_col = getattr(self.model, "id")
    q = select(select(id_col).where(id_col == id).exists())
    return await self.s.scalar(q) or False
find_all() async

Find all entities.

Source code in src/servicekit/repository.py
async def find_all(self) -> Sequence[T]:
    """Find all entities."""
    result = await self.s.scalars(select(self.model))
    return result.all()
find_all_paginated(offset, limit) async

Find entities with pagination.

Source code in src/servicekit/repository.py
async def find_all_paginated(self, offset: int, limit: int) -> Sequence[T]:
    """Find entities with pagination."""
    result = await self.s.scalars(select(self.model).offset(offset).limit(limit))
    return result.all()
find_all_by_id(ids) async

Find entities by their IDs.

Source code in src/servicekit/repository.py
async def find_all_by_id(self, ids: Sequence[IdT]) -> Sequence[T]:
    """Find entities by their IDs."""
    if not ids:
        return []
    id_col = getattr(self.model, "id")
    result = await self.s.scalars(select(self.model).where(id_col.in_(ids)))
    return result.all()
find_by_id(id) async

Find an entity by its ID.

Source code in src/servicekit/repository.py
async def find_by_id(self, id: IdT) -> T | None:
    """Find an entity by its ID."""
    return await self.s.get(self.model, id)

Manager

manager

Base classes for service layer managers with lifecycle hooks.

Classes

LifecycleHooks

Lifecycle hooks for entity operations.

Source code in src/servicekit/manager.py
class LifecycleHooks[ModelT, InSchemaT: BaseModel]:
    """Lifecycle hooks for entity operations."""

    def _should_assign_field(self, field: str, value: object) -> bool:
        """Determine if a field should be assigned during update."""
        return True

    async def pre_save(self, entity: ModelT, data: InSchemaT) -> None:
        """Hook called before saving a new entity."""
        pass

    async def post_save(self, entity: ModelT) -> None:
        """Hook called after saving a new entity."""
        pass

    async def pre_update(self, entity: ModelT, data: InSchemaT, old_values: dict[str, object]) -> None:
        """Hook called before updating an existing entity."""
        pass

    async def post_update(self, entity: ModelT, changes: dict[str, tuple[object, object]]) -> None:
        """Hook called after updating an existing entity."""
        pass

    async def pre_delete(self, entity: ModelT) -> None:
        """Hook called before deleting an entity."""
        pass

    async def post_delete(self, entity: ModelT) -> None:
        """Hook called after deleting an entity."""
        pass
Functions
pre_save(entity, data) async

Hook called before saving a new entity.

Source code in src/servicekit/manager.py
async def pre_save(self, entity: ModelT, data: InSchemaT) -> None:
    """Hook called before saving a new entity.

    ``entity`` is the freshly constructed ORM object built from ``data``;
    override to mutate it before persistence. No-op by default.
    """
    pass
post_save(entity) async

Hook called after saving a new entity.

Source code in src/servicekit/manager.py
async def post_save(self, entity: ModelT) -> None:
    """Hook called after saving a new entity.

    Runs after the commit and refresh have completed. No-op by default.
    """
    pass
pre_update(entity, data, old_values) async

Hook called before updating an existing entity.

Source code in src/servicekit/manager.py
async def pre_update(self, entity: ModelT, data: InSchemaT, old_values: dict[str, object]) -> None:
    """Hook called before updating an existing entity.

    Called after new values have been assigned to ``entity``; ``old_values``
    is a snapshot of the field values taken beforehand. No-op by default.
    """
    pass
post_update(entity, changes) async

Hook called after updating an existing entity.

Source code in src/servicekit/manager.py
async def post_update(self, entity: ModelT, changes: dict[str, tuple[object, object]]) -> None:
    """Hook called after updating an existing entity.

    ``changes`` maps each field that actually changed to its
    ``(old_value, new_value)`` pair. No-op by default.
    """
    pass
pre_delete(entity) async

Hook called before deleting an entity.

Source code in src/servicekit/manager.py
async def pre_delete(self, entity: ModelT) -> None:
    """Hook called before deleting an entity.

    Runs before the delete is issued. No-op by default.
    """
    pass
post_delete(entity) async

Hook called after deleting an entity.

Source code in src/servicekit/manager.py
async def post_delete(self, entity: ModelT) -> None:
    """Hook called after deleting an entity.

    Runs after the delete has been committed. No-op by default.
    """
    pass

Manager

Bases: ABC

Abstract manager interface for business logic operations.

Source code in src/servicekit/manager.py
class Manager[InSchemaT: BaseModel, OutSchemaT: BaseModel, IdT](ABC):
    """Abstract manager interface for business logic operations.

    Accepts input schemas and returns output schemas; persistence details
    are left to implementations such as ``BaseManager``.
    """

    @abstractmethod
    async def save(self, data: InSchemaT) -> OutSchemaT:
        """Save an entity."""
        ...

    @abstractmethod
    async def save_all(self, items: Iterable[InSchemaT]) -> list[OutSchemaT]:
        """Save multiple entities."""
        ...

    @abstractmethod
    async def delete_by_id(self, id: IdT) -> None:
        """Delete an entity by its ID."""
        ...

    @abstractmethod
    async def delete_all(self) -> None:
        """Delete all entities."""
        ...

    @abstractmethod
    async def delete_all_by_id(self, ids: Sequence[IdT]) -> None:
        """Delete multiple entities by their IDs."""
        ...

    @abstractmethod
    async def count(self) -> int:
        """Count the number of entities."""
        ...

    @abstractmethod
    async def exists_by_id(self, id: IdT) -> bool:
        """Check if an entity exists by its ID."""
        ...

    @abstractmethod
    async def find_by_id(self, id: IdT) -> OutSchemaT | None:
        """Find an entity by its ID."""
        ...

    @abstractmethod
    async def find_all(self) -> list[OutSchemaT]:
        """Find all entities."""
        ...

    @abstractmethod
    async def find_paginated(self, page: int, size: int) -> tuple[list[OutSchemaT], int]:
        """Find entities with pagination."""
        ...

    @abstractmethod
    async def find_all_by_id(self, ids: Sequence[IdT]) -> list[OutSchemaT]:
        """Find entities by their IDs."""
        ...
Functions
save(data) abstractmethod async

Save an entity.

Source code in src/servicekit/manager.py
@abstractmethod
async def save(self, data: InSchemaT) -> OutSchemaT:
    """Save an entity.

    Returns the persisted entity as an output schema.
    """
    ...
save_all(items) abstractmethod async

Save multiple entities.

Source code in src/servicekit/manager.py
@abstractmethod
async def save_all(self, items: Iterable[InSchemaT]) -> list[OutSchemaT]:
    """Save multiple entities.

    Returns one output schema per input item.
    """
    ...
delete_by_id(id) abstractmethod async

Delete an entity by its ID.

Source code in src/servicekit/manager.py
@abstractmethod
async def delete_by_id(self, id: IdT) -> None:
    """Delete an entity by its ID.

    Args:
        id: primary key of the entity to remove.
    """
    ...
delete_all() abstractmethod async

Delete all entities.

Source code in src/servicekit/manager.py
@abstractmethod
async def delete_all(self) -> None:
    """Delete all entities.

    Removes every entity managed by this manager.
    """
    ...
delete_all_by_id(ids) abstractmethod async

Delete multiple entities by their IDs.

Source code in src/servicekit/manager.py
@abstractmethod
async def delete_all_by_id(self, ids: Sequence[IdT]) -> None:
    """Delete multiple entities by their IDs.

    Args:
        ids: primary keys of the entities to remove.
    """
    ...
count() abstractmethod async

Count the number of entities.

Source code in src/servicekit/manager.py
@abstractmethod
async def count(self) -> int:
    """Count the number of entities.

    Returns the total entity count.
    """
    ...
exists_by_id(id) abstractmethod async

Check if an entity exists by its ID.

Source code in src/servicekit/manager.py
@abstractmethod
async def exists_by_id(self, id: IdT) -> bool:
    """Check if an entity exists by its ID.

    Returns True when an entity with ``id`` exists.
    """
    ...
find_by_id(id) abstractmethod async

Find an entity by its ID.

Source code in src/servicekit/manager.py
@abstractmethod
async def find_by_id(self, id: IdT) -> OutSchemaT | None:
    """Find an entity by its ID.

    Returns the matching output schema, or None when not found.
    """
    ...
find_all() abstractmethod async

Find all entities.

Source code in src/servicekit/manager.py
@abstractmethod
async def find_all(self) -> list[OutSchemaT]:
    """Find all entities.

    Returns every entity as an output schema.
    """
    ...
find_paginated(page, size) abstractmethod async

Find entities with pagination.

Source code in src/servicekit/manager.py
@abstractmethod
async def find_paginated(self, page: int, size: int) -> tuple[list[OutSchemaT], int]:
    """Find entities with pagination.

    Returns ``(items, total)`` — one page of entities plus the overall count.
    """
    ...
find_all_by_id(ids) abstractmethod async

Find entities by their IDs.

Source code in src/servicekit/manager.py
@abstractmethod
async def find_all_by_id(self, ids: Sequence[IdT]) -> list[OutSchemaT]:
    """Find entities by their IDs.

    Args:
        ids: primary keys to look up.
    """
    ...

BaseManager

Bases: LifecycleHooks[ModelT, InSchemaT], Manager[InSchemaT, OutSchemaT, IdT]

Base manager implementation with CRUD operations and lifecycle hooks.

Source code in src/servicekit/manager.py
class BaseManager[ModelT, InSchemaT: BaseModel, OutSchemaT: BaseModel, IdT](
    LifecycleHooks[ModelT, InSchemaT],
    Manager[InSchemaT, OutSchemaT, IdT],
):
    """Base manager implementation with CRUD operations and lifecycle hooks."""

    def __init__(
        self,
        repo: BaseRepository[ModelT, IdT],
        model_cls: type[ModelT],
        out_schema_cls: type[OutSchemaT],
    ) -> None:
        """Initialize manager with repository, model class, and output schema class."""
        self.repo = repo
        self.model_cls = model_cls
        self.out_schema_cls = out_schema_cls

    def _to_output_schema(self, entity: ModelT) -> OutSchemaT:
        """Convert ORM entity to output schema."""
        return self.out_schema_cls.model_validate(entity, from_attributes=True)

    async def save(self, data: InSchemaT) -> OutSchemaT:
        """Save an entity (create or update)."""
        # exclude_none=True drops unset/None fields so they never clobber stored values.
        data_dict = data.model_dump(exclude_none=True)
        entity_id = data_dict.get("id")
        existing: ModelT | None = None

        if entity_id is not None:
            existing = await self.repo.find_by_id(entity_id)

        if existing is None:
            # Create path: build a new entity and run the save hooks around the commit.
            if data_dict.get("id") is None:
                data_dict.pop("id", None)
            entity = self.model_cls(**data_dict)
            await self.pre_save(entity, data)
            await self.repo.save(entity)
            await self.repo.commit()
            await self.repo.refresh_many([entity])
            await self.post_save(entity)
            return self._to_output_schema(entity)

        # Update path: snapshot current values so post_update can report (old, new) pairs.
        tracked_fields = set(data_dict.keys())
        if hasattr(existing, "level"):  # pragma: no branch
            tracked_fields.add("level")
        old_values = {field: getattr(existing, field) for field in tracked_fields if hasattr(existing, field)}

        for key, value in data_dict.items():
            if key == "id":  # pragma: no branch
                continue
            if not self._should_assign_field(key, value):
                continue
            if hasattr(existing, key):
                setattr(existing, key, value)

        await self.pre_update(existing, data, old_values)

        # Diff tracked fields after assignment (and after pre_update, which may mutate further).
        changes: dict[str, tuple[object, object]] = {}
        for field in tracked_fields:
            if hasattr(existing, field):
                new_value = getattr(existing, field)
                old_value = old_values.get(field)
                if old_value != new_value:
                    changes[field] = (old_value, new_value)

        await self.repo.save(existing)
        await self.repo.commit()
        await self.repo.refresh_many([existing])
        await self.post_update(existing, changes)
        return self._to_output_schema(existing)

    async def save_all(self, items: Iterable[InSchemaT]) -> list[OutSchemaT]:
        """Save multiple entities in a single commit, creating or updating each item."""
        # Buffer inserts and updates so the post_* hooks fire only after the one commit below.
        entities_to_insert: list[ModelT] = []
        updates: list[tuple[ModelT, dict[str, tuple[object, object]]]] = []
        outputs: list[ModelT] = []

        for data in items:
            data_dict = data.model_dump(exclude_none=True)
            entity_id = data_dict.get("id")
            existing: ModelT | None = None
            if entity_id is not None:
                existing = await self.repo.find_by_id(entity_id)

            if existing is None:
                if data_dict.get("id") is None:
                    data_dict.pop("id", None)
                entity = self.model_cls(**data_dict)
                await self.pre_save(entity, data)
                entities_to_insert.append(entity)
                outputs.append(entity)
                continue

            tracked_fields = set(data_dict.keys())
            if hasattr(existing, "level"):  # pragma: no branch
                tracked_fields.add("level")
            old_values = {field: getattr(existing, field) for field in tracked_fields if hasattr(existing, field)}

            for key, value in data_dict.items():
                if key == "id":  # pragma: no branch
                    continue
                if not self._should_assign_field(key, value):
                    continue
                if hasattr(existing, key):
                    setattr(existing, key, value)

            await self.pre_update(existing, data, old_values)

            changes: dict[str, tuple[object, object]] = {}
            for field in tracked_fields:
                if hasattr(existing, field):
                    new_value = getattr(existing, field)
                    old_value = old_values.get(field)
                    if old_value != new_value:
                        changes[field] = (old_value, new_value)

            updates.append((existing, changes))
            outputs.append(existing)

        if entities_to_insert:  # pragma: no branch
            await self.repo.save_all(entities_to_insert)
        await self.repo.commit()
        if outputs:  # pragma: no branch
            await self.repo.refresh_many(outputs)

        # Post hooks run after commit+refresh, mirroring the single-item save().
        for entity in entities_to_insert:
            await self.post_save(entity)
        for entity, changes in updates:
            await self.post_update(entity, changes)

        # Outputs preserve the input item order.
        return [self._to_output_schema(entity) for entity in outputs]

    async def delete_by_id(self, id: IdT) -> None:
        """Delete an entity by its ID."""
        entity = await self.repo.find_by_id(id)
        # Missing IDs are a silent no-op; hooks only fire for existing entities.
        if entity is None:
            return
        await self.pre_delete(entity)
        await self.repo.delete(entity)
        await self.repo.commit()
        await self.post_delete(entity)

    async def delete_all(self) -> None:
        """Delete all entities."""
        entities = await self.repo.find_all()
        for entity in entities:
            await self.pre_delete(entity)
        await self.repo.delete_all()
        await self.repo.commit()
        for entity in entities:
            await self.post_delete(entity)

    async def delete_all_by_id(self, ids: Sequence[IdT]) -> None:
        """Delete multiple entities by their IDs."""
        if not ids:
            return
        entities = await self.repo.find_all_by_id(ids)
        for entity in entities:
            await self.pre_delete(entity)
        await self.repo.delete_all_by_id(ids)
        await self.repo.commit()
        for entity in entities:
            await self.post_delete(entity)

    async def count(self) -> int:
        """Count the number of entities."""
        return await self.repo.count()

    async def exists_by_id(self, id: IdT) -> bool:
        """Check if an entity exists by its ID."""
        return await self.repo.exists_by_id(id)

    async def find_by_id(self, id: IdT) -> OutSchemaT | None:
        """Find an entity by its ID."""
        entity = await self.repo.find_by_id(id)
        if entity is None:
            return None
        return self._to_output_schema(entity)

    async def find_all(self) -> list[OutSchemaT]:
        """Find all entities."""
        entities = await self.repo.find_all()
        return [self._to_output_schema(e) for e in entities]

    async def find_paginated(self, page: int, size: int) -> tuple[list[OutSchemaT], int]:
        """Find entities with pagination."""
        # page is 1-indexed (see PaginatedResponse), hence the (page - 1) offset.
        offset = (page - 1) * size
        entities = await self.repo.find_all_paginated(offset, size)
        total = await self.repo.count()
        return [self._to_output_schema(e) for e in entities], total

    async def find_all_by_id(self, ids: Sequence[IdT]) -> list[OutSchemaT]:
        """Find entities by their IDs."""
        entities = await self.repo.find_all_by_id(ids)
        return [self._to_output_schema(e) for e in entities]
Functions
__init__(repo, model_cls, out_schema_cls)

Initialize manager with repository, model class, and output schema class.

Source code in src/servicekit/manager.py
def __init__(
    self,
    repo: BaseRepository[ModelT, IdT],
    model_cls: type[ModelT],
    out_schema_cls: type[OutSchemaT],
) -> None:
    """Initialize manager with repository, model class, and output schema class.

    Args:
        repo: data-access layer used for all persistence calls.
        model_cls: ORM class instantiated when creating new entities.
        out_schema_cls: schema class entities are validated into for callers.
    """
    self.repo = repo
    self.model_cls = model_cls
    self.out_schema_cls = out_schema_cls
save(data) async

Save an entity (create or update).

Source code in src/servicekit/manager.py
async def save(self, data: InSchemaT) -> OutSchemaT:
    """Save an entity (create or update)."""
    # exclude_none=True drops unset/None fields so they never clobber stored values.
    data_dict = data.model_dump(exclude_none=True)
    entity_id = data_dict.get("id")
    existing: ModelT | None = None

    if entity_id is not None:
        existing = await self.repo.find_by_id(entity_id)

    if existing is None:
        # Create path: build a new entity and run the save hooks around the commit.
        if data_dict.get("id") is None:
            data_dict.pop("id", None)
        entity = self.model_cls(**data_dict)
        await self.pre_save(entity, data)
        await self.repo.save(entity)
        await self.repo.commit()
        await self.repo.refresh_many([entity])
        await self.post_save(entity)
        return self._to_output_schema(entity)

    # Update path: snapshot current values so post_update can report (old, new) pairs.
    tracked_fields = set(data_dict.keys())
    if hasattr(existing, "level"):  # pragma: no branch
        tracked_fields.add("level")
    old_values = {field: getattr(existing, field) for field in tracked_fields if hasattr(existing, field)}

    for key, value in data_dict.items():
        if key == "id":  # pragma: no branch
            continue
        if not self._should_assign_field(key, value):
            continue
        if hasattr(existing, key):
            setattr(existing, key, value)

    await self.pre_update(existing, data, old_values)

    # Diff tracked fields after assignment (and after pre_update, which may mutate further).
    changes: dict[str, tuple[object, object]] = {}
    for field in tracked_fields:
        if hasattr(existing, field):
            new_value = getattr(existing, field)
            old_value = old_values.get(field)
            if old_value != new_value:
                changes[field] = (old_value, new_value)

    await self.repo.save(existing)
    await self.repo.commit()
    await self.repo.refresh_many([existing])
    await self.post_update(existing, changes)
    return self._to_output_schema(existing)
delete_by_id(id) async

Delete an entity by its ID.

Source code in src/servicekit/manager.py
async def delete_by_id(self, id: IdT) -> None:
    """Delete an entity by its ID.

    Missing IDs are a silent no-op; the pre/post delete hooks run only
    when the entity exists.
    """
    entity = await self.repo.find_by_id(id)
    if entity is None:
        return
    await self.pre_delete(entity)
    await self.repo.delete(entity)
    await self.repo.commit()
    await self.post_delete(entity)
delete_all() async

Delete all entities.

Source code in src/servicekit/manager.py
async def delete_all(self) -> None:
    """Delete all entities.

    All ``pre_delete`` hooks run first, then one bulk delete and commit,
    then all ``post_delete`` hooks.
    """
    entities = await self.repo.find_all()
    for entity in entities:
        await self.pre_delete(entity)
    await self.repo.delete_all()
    await self.repo.commit()
    for entity in entities:
        await self.post_delete(entity)
delete_all_by_id(ids) async

Delete multiple entities by their IDs.

Source code in src/servicekit/manager.py
async def delete_all_by_id(self, ids: Sequence[IdT]) -> None:
    """Delete multiple entities by their IDs.

    An empty ``ids`` sequence is a no-op. Hooks run only for entities
    that actually exist; all deletes share a single commit.
    """
    if not ids:
        return
    entities = await self.repo.find_all_by_id(ids)
    for entity in entities:
        await self.pre_delete(entity)
    await self.repo.delete_all_by_id(ids)
    await self.repo.commit()
    for entity in entities:
        await self.post_delete(entity)
count() async

Count the number of entities.

Source code in src/servicekit/manager.py
async def count(self) -> int:
    """Count the number of entities.

    Delegates directly to the repository.
    """
    return await self.repo.count()
exists_by_id(id) async

Check if an entity exists by its ID.

Source code in src/servicekit/manager.py
async def exists_by_id(self, id: IdT) -> bool:
    """Check if an entity exists by its ID.

    Delegates directly to the repository.
    """
    return await self.repo.exists_by_id(id)
find_by_id(id) async

Find an entity by its ID.

Source code in src/servicekit/manager.py
async def find_by_id(self, id: IdT) -> OutSchemaT | None:
    """Return the entity with ``id`` as an output schema, or None if missing."""
    found = await self.repo.find_by_id(id)
    return None if found is None else self._to_output_schema(found)
find_all() async

Find all entities.

Source code in src/servicekit/manager.py
async def find_all(self) -> list[OutSchemaT]:
    """Find all entities.

    Returns every entity converted to the output schema.
    """
    entities = await self.repo.find_all()
    return [self._to_output_schema(e) for e in entities]
find_paginated(page, size) async

Find entities with pagination.

Source code in src/servicekit/manager.py
async def find_paginated(self, page: int, size: int) -> tuple[list[OutSchemaT], int]:
    """Return one page of entities plus the total entity count.

    ``page`` is 1-indexed; the page contents are fetched first, then the count.
    """
    start = (page - 1) * size
    page_entities = await self.repo.find_all_paginated(start, size)
    total_count = await self.repo.count()
    schemas = [self._to_output_schema(entity) for entity in page_entities]
    return schemas, total_count
find_all_by_id(ids) async

Find entities by their IDs.

Source code in src/servicekit/manager.py
async def find_all_by_id(self, ids: Sequence[IdT]) -> list[OutSchemaT]:
    """Find entities by their IDs.

    IDs with no matching entity are silently omitted from the result.
    """
    entities = await self.repo.find_all_by_id(ids)
    return [self._to_output_schema(e) for e in entities]

Schemas

schemas

Core Pydantic schemas for entities, responses, and jobs.

Classes

EntityIn

Bases: BaseModel

Base input schema for entities with optional ID.

Source code in src/servicekit/schemas.py
class EntityIn(BaseModel):
    """Base input schema for entities with optional ID."""

    # ULID is not a pydantic-native type, hence arbitrary_types_allowed.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Optional client-supplied identifier; when absent/None the item is
    # treated as a new entity (see BaseManager.save).
    id: ULID | None = None

EntityOut

Bases: BaseModel

Base output schema for entities with ID and timestamps.

Source code in src/servicekit/schemas.py
class EntityOut(BaseModel):
    """Base output schema for entities with ID and timestamps."""

    # from_attributes allows validation directly from ORM objects.
    model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)

    # Primary key.
    id: ULID
    # Creation timestamp.
    created_at: datetime
    # Last-modification timestamp.
    updated_at: datetime

PaginatedResponse

Bases: BaseModel, Generic[T]

Paginated response with items, total count, page number, and computed page count.

Source code in src/servicekit/schemas.py
class PaginatedResponse(BaseModel, Generic[T]):
    """Paginated response with items, total count, page number, and computed page count."""

    items: list[T] = Field(description="List of items for the current page")
    total: int = Field(description="Total number of items across all pages", ge=0)
    page: int = Field(description="Current page number (1-indexed)", ge=1)
    size: int = Field(description="Number of items per page", ge=1)

    @computed_field  # type: ignore[prop-decorator]
    @property
    def pages(self) -> int:
        """Total number of pages."""
        # Ceiling division; an empty result set has zero pages.
        return (self.total + self.size - 1) // self.size if self.total else 0
Attributes
pages property

Total number of pages.

BulkOperationError

Bases: BaseModel

Error information for a single item in a bulk operation.

Source code in src/servicekit/schemas.py
class BulkOperationError(BaseModel):
    """Error information for a single item in a bulk operation.

    Collected into ``BulkOperationResult.errors``.
    """

    id: str = Field(description="Identifier of the item that failed")
    reason: str = Field(description="Human-readable error message")

BulkOperationResult

Bases: BaseModel

Result of bulk operation with counts of succeeded/failed items and error details.

Source code in src/servicekit/schemas.py
class BulkOperationResult(BaseModel):
    """Result of bulk operation with counts of succeeded/failed items and error details.

    ``succeeded + failed`` is expected to equal ``total``; ``errors`` carries
    one entry per failed item.
    """

    total: int = Field(description="Total number of items processed", ge=0)
    succeeded: int = Field(description="Number of items successfully processed", ge=0)
    failed: int = Field(description="Number of items that failed", ge=0)
    errors: list[BulkOperationError] = Field(default_factory=list, description="Details of failed items (if any)")

ProblemDetail

Bases: BaseModel

RFC 9457 Problem Details with URN error type, status, and human-readable messages.

Source code in src/servicekit/schemas.py
class ProblemDetail(BaseModel):
    """RFC 9457 Problem Details with URN error type, status, and human-readable messages."""

    type: str = Field(
        default="about:blank",
        description="URI reference identifying the problem type (URN format for servicekit errors)",
    )
    title: str = Field(description="Short, human-readable summary of the problem type")
    status: int = Field(description="HTTP status code", ge=100, le=599)
    detail: str | None = Field(default=None, description="Human-readable explanation specific to this occurrence")
    instance: str | None = Field(default=None, description="URI reference identifying the specific occurrence")
    trace_id: str | None = Field(default=None, description="Optional trace ID for debugging")

    # Example payload surfaced in the generated OpenAPI/JSON schema.
    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "type": "urn:servicekit:error:not-found",
                    "title": "Resource Not Found",
                    "status": 404,
                    "detail": "Config with id 01ABC... not found",
                    "instance": "/api/config/01ABC...",
                }
            ]
        }
    }

JobStatus

Bases: StrEnum

Status of a scheduled job.

Source code in src/servicekit/schemas.py
class JobStatus(StrEnum):
    """Status of a scheduled job.

    StrEnum makes each member compare and serialize as its plain string value.
    """

    pending = "pending"  # submitted, not yet started
    running = "running"
    completed = "completed"
    failed = "failed"  # finished with an error (see JobRecord.error)
    canceled = "canceled"

JobRecord

Bases: BaseModel

Complete record of a scheduled job's state and metadata.

Source code in src/servicekit/schemas.py
class JobRecord(BaseModel):
    """Complete record of a scheduled job's state and metadata."""

    # ULID is not a pydantic-native type, hence arbitrary_types_allowed.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    id: ULID = Field(description="Unique job identifier")
    status: JobStatus = Field(default=JobStatus.pending, description="Current job status")
    submitted_at: datetime | None = Field(default=None, description="When the job was submitted")
    started_at: datetime | None = Field(default=None, description="When the job started running")
    finished_at: datetime | None = Field(default=None, description="When the job finished")
    error: str | None = Field(default=None, description="User-friendly error message if job failed")
    error_traceback: str | None = Field(default=None, description="Full error traceback for debugging")

Exceptions

exceptions

Custom exceptions with RFC 9457 Problem Details support.

Classes

ErrorType

URN-based error type identifiers for RFC 9457 Problem Details.

Source code in src/servicekit/exceptions.py
class ErrorType:
    """URN-based error type identifiers for RFC 9457 Problem Details.

    Each URN follows the pattern ``urn:servicekit:error:<kebab-case-name>``
    and is used as the ``type`` member of a ProblemDetail response.
    """

    NOT_FOUND = "urn:servicekit:error:not-found"
    VALIDATION_FAILED = "urn:servicekit:error:validation-failed"
    CONFLICT = "urn:servicekit:error:conflict"
    INVALID_ULID = "urn:servicekit:error:invalid-ulid"
    INTERNAL_ERROR = "urn:servicekit:error:internal"
    UNAUTHORIZED = "urn:servicekit:error:unauthorized"
    FORBIDDEN = "urn:servicekit:error:forbidden"
    BAD_REQUEST = "urn:servicekit:error:bad-request"

ServicekitException

Bases: Exception

Base exception for servicekit with RFC 9457 Problem Details support.

Source code in src/servicekit/exceptions.py
class ServicekitException(Exception):
    """Base exception for servicekit with RFC 9457 Problem Details support.

    Defaults to an internal server error (500); subclasses override the
    type URN, title, and status.
    """

    def __init__(
        self,
        detail: str,
        *,
        type_uri: str = ErrorType.INTERNAL_ERROR,
        title: str = "Internal Server Error",
        status: int = 500,
        instance: str | None = None,
        **extensions: Any,
    ) -> None:
        """Store the RFC 9457 problem fields on the exception.

        Args:
            detail: human-readable explanation of this occurrence.
            type_uri: URN identifying the problem type.
            title: short summary of the problem type.
            status: HTTP status code.
            instance: URI identifying this specific occurrence.
            **extensions: additional problem-detail extension members.
        """
        super().__init__(detail)
        self.type_uri = type_uri
        self.title = title
        self.status = status
        self.detail = detail
        self.instance = instance
        self.extensions = extensions

NotFoundError

Bases: ServicekitException

Resource not found exception (404).

Source code in src/servicekit/exceptions.py
class NotFoundError(ServicekitException):
    """Resource not found exception (404).

    Uses problem type ``urn:servicekit:error:not-found``.
    """

    def __init__(self, detail: str, *, instance: str | None = None, **extensions: Any) -> None:
        super().__init__(
            detail,
            type_uri=ErrorType.NOT_FOUND,
            title="Resource Not Found",
            status=404,
            instance=instance,
            **extensions,
        )

ValidationError

Bases: ServicekitException

Validation failed exception (400).

Source code in src/servicekit/exceptions.py
class ValidationError(ServicekitException):
    """Validation failed exception (400).

    Uses problem type ``urn:servicekit:error:validation-failed``.
    """

    def __init__(self, detail: str, *, instance: str | None = None, **extensions: Any) -> None:
        super().__init__(
            detail,
            type_uri=ErrorType.VALIDATION_FAILED,
            title="Validation Failed",
            status=400,
            instance=instance,
            **extensions,
        )

ConflictError

Bases: ServicekitException

Resource conflict exception (409).

Source code in src/servicekit/exceptions.py
class ConflictError(ServicekitException):
    """Resource conflict exception (409).

    Uses problem type ``urn:servicekit:error:conflict``.
    """

    def __init__(self, detail: str, *, instance: str | None = None, **extensions: Any) -> None:
        super().__init__(
            detail,
            type_uri=ErrorType.CONFLICT,
            title="Resource Conflict",
            status=409,
            instance=instance,
            **extensions,
        )

InvalidULIDError

Bases: ServicekitException

Invalid ULID format exception (400).

Source code in src/servicekit/exceptions.py
class InvalidULIDError(ServicekitException):
    """Invalid ULID format exception (400).

    Uses problem type ``urn:servicekit:error:invalid-ulid``.
    """

    def __init__(self, detail: str, *, instance: str | None = None, **extensions: Any) -> None:
        super().__init__(
            detail,
            type_uri=ErrorType.INVALID_ULID,
            title="Invalid ULID Format",
            status=400,
            instance=instance,
            **extensions,
        )

BadRequestError

Bases: ServicekitException

Bad request exception (400).

Source code in src/servicekit/exceptions.py
class BadRequestError(ServicekitException):
    """Bad request exception (400).

    Uses problem type ``urn:servicekit:error:bad-request``.
    """

    def __init__(self, detail: str, *, instance: str | None = None, **extensions: Any) -> None:
        super().__init__(
            detail,
            type_uri=ErrorType.BAD_REQUEST,
            title="Bad Request",
            status=400,
            instance=instance,
            **extensions,
        )

UnauthorizedError

Bases: ServicekitException

Unauthorized exception (401).

Source code in src/servicekit/exceptions.py
class UnauthorizedError(ServicekitException):
    """Unauthorized exception (401).

    Uses problem type ``urn:servicekit:error:unauthorized``.
    """

    def __init__(self, detail: str, *, instance: str | None = None, **extensions: Any) -> None:
        super().__init__(
            detail,
            type_uri=ErrorType.UNAUTHORIZED,
            title="Unauthorized",
            status=401,
            instance=instance,
            **extensions,
        )

ForbiddenError

Bases: ServicekitException

Forbidden exception (403).

Source code in src/servicekit/exceptions.py
class ForbiddenError(ServicekitException):
    """Forbidden exception (403).

    Uses problem type ``urn:servicekit:error:forbidden``.
    """

    def __init__(self, detail: str, *, instance: str | None = None, **extensions: Any) -> None:
        super().__init__(
            detail,
            type_uri=ErrorType.FORBIDDEN,
            title="Forbidden",
            status=403,
            instance=instance,
            **extensions,
        )

Scheduler

scheduler

Job scheduler for async task management with in-memory asyncio implementation.

Classes

JobScheduler

Bases: BaseModel, ABC

Abstract job scheduler interface for async task management.

Source code in src/servicekit/scheduler.py
class JobScheduler(BaseModel, ABC):
    """Abstract job scheduler interface for async task management.

    Implementations provide job submission, inspection, cancellation,
    waiting, and result retrieval keyed by a ULID job identifier.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @abstractmethod
    async def add_job(
        self,
        target: JobTarget,
        /,
        *args: Any,
        **kwargs: Any,
    ) -> ULID:
        """Add a job to the scheduler and return its ID."""
        ...

    @abstractmethod
    async def get_status(self, job_id: ULID) -> JobStatus:
        """Get the status of a job."""
        ...

    @abstractmethod
    async def get_record(self, job_id: ULID) -> JobRecord:
        """Get the full record of a job."""
        ...

    @abstractmethod
    async def get_all_records(self) -> list[JobRecord]:
        """Get all job records."""
        ...

    @abstractmethod
    async def cancel(self, job_id: ULID) -> bool:
        """Cancel a running job."""
        ...

    @abstractmethod
    async def delete(self, job_id: ULID) -> None:
        """Delete a job record."""
        ...

    @abstractmethod
    async def wait(self, job_id: ULID, timeout: float | None = None) -> None:
        """Wait for a job to complete."""
        ...

    @abstractmethod
    async def get_result(self, job_id: ULID) -> Any:
        """Get the result of a completed job."""
        ...
Functions
add_job(target, /, *args, **kwargs) abstractmethod async

Add a job to the scheduler and return its ID.

Source code in src/servicekit/scheduler.py
@abstractmethod
async def add_job(
    self,
    target: JobTarget,
    /,
    *args: Any,
    **kwargs: Any,
) -> ULID:
    """Add a job to the scheduler and return its ID.

    Args:
        target: callable to execute; positional-only.
        *args: positional arguments passed to ``target``.
        **kwargs: keyword arguments passed to ``target``.
    """
    ...
get_status(job_id) abstractmethod async

Get the status of a job.

Source code in src/servicekit/scheduler.py
@abstractmethod
async def get_status(self, job_id: ULID) -> JobStatus:
    """Get the status of a job.

    Args:
        job_id: identifier returned by ``add_job``.
    """
    ...
get_record(job_id) abstractmethod async

Get the full record of a job.

Source code in src/servicekit/scheduler.py
@abstractmethod
async def get_record(self, job_id: ULID) -> JobRecord:
    """Get the full record of a job.

    Returns the ``JobRecord`` with status, timestamps, and error details.
    """
    ...
get_all_records() abstractmethod async

Get all job records.

Source code in src/servicekit/scheduler.py
@abstractmethod
async def get_all_records(self) -> list[JobRecord]:
    """Get all job records.

    Returns one record per known job.
    """
    ...
cancel(job_id) abstractmethod async

Cancel a running job.

Source code in src/servicekit/scheduler.py
@abstractmethod
async def cancel(self, job_id: ULID) -> bool:
    """Cancel a running job.

    Returns a bool flag; presumably True when cancellation took effect —
    confirm exact semantics in the concrete implementation.
    """
    ...
delete(job_id) abstractmethod async

Delete a job record.

Source code in src/servicekit/scheduler.py
@abstractmethod
async def delete(self, job_id: ULID) -> None:
    """Delete a job record."""
    ...
wait(job_id, timeout=None) abstractmethod async

Wait for a job to complete.

Source code in src/servicekit/scheduler.py
@abstractmethod
async def wait(self, job_id: ULID, timeout: float | None = None) -> None:
    """Wait for a job to complete."""
    ...
get_result(job_id) abstractmethod async

Get the result of a completed job.

Source code in src/servicekit/scheduler.py
@abstractmethod
async def get_result(self, job_id: ULID) -> Any:
    """Get the result of a completed job."""
    ...

AIOJobScheduler

Bases: JobScheduler

In-memory asyncio scheduler. Sync callables run in thread pool, concurrency controlled via semaphore.

Source code in src/servicekit/scheduler.py
class AIOJobScheduler(JobScheduler):
    """In-memory asyncio scheduler. Sync callables run in thread pool, concurrency controlled via semaphore."""

    name: str = Field(default="chap")
    max_concurrency: int | None = Field(default=None)

    _records: dict[ULID, JobRecord] = PrivateAttr(default_factory=dict)
    _results: dict[ULID, Any] = PrivateAttr(default_factory=dict)
    _tasks: dict[ULID, asyncio.Task[Any]] = PrivateAttr(default_factory=dict)
    _lock: asyncio.Lock = PrivateAttr(default_factory=asyncio.Lock)
    _sema: asyncio.Semaphore | None = PrivateAttr(default=None)

    def __init__(self, **data: Any):
        """Initialize scheduler with optional concurrency limit."""
        super().__init__(**data)
        if self.max_concurrency and self.max_concurrency > 0:
            self._sema = asyncio.Semaphore(self.max_concurrency)

    async def set_max_concurrency(self, n: int | None) -> None:
        """Set maximum number of concurrent jobs."""
        async with self._lock:
            self.max_concurrency = n
            if n and n > 0:
                self._sema = asyncio.Semaphore(n)
            else:
                self._sema = None

    async def add_job(
        self,
        target: JobTarget,
        /,
        *args: Any,
        **kwargs: Any,
    ) -> ULID:
        """Add a job to the scheduler and return its ID."""
        now = datetime.now(timezone.utc)
        jid = ULID()

        record = JobRecord(
            id=jid,
            status=JobStatus.pending,
            submitted_at=now,
        )

        async with self._lock:
            if jid in self._tasks:
                raise RuntimeError(f"Job {jid!r} already scheduled")
            self._records[jid] = record

        async def _execute_target() -> Any:
            if inspect.isawaitable(target):
                if args or kwargs:
                    # Close the coroutine to avoid "coroutine was never awaited" warning
                    if inspect.iscoroutine(target):
                        target.close()
                    raise TypeError("Args/kwargs not supported when target is an awaitable object.")
                return await target
            if inspect.iscoroutinefunction(target):
                return await target(*args, **kwargs)
            return await asyncio.to_thread(target, *args, **kwargs)

        async def _runner() -> Any:
            if self._sema:
                async with self._sema:
                    return await self._run_with_state(jid, _execute_target)
            else:
                return await self._run_with_state(jid, _execute_target)

        task = asyncio.create_task(_runner(), name=f"{self.name}-job-{jid}")

        def _drain(t: asyncio.Task[Any]) -> None:
            try:
                t.result()
            except Exception:
                pass

        task.add_done_callback(_drain)

        async with self._lock:
            self._tasks[jid] = task

        return jid

    async def _run_with_state(
        self,
        jid: ULID,
        exec_fn: JobExecutor,
    ) -> Any:
        """Execute job function and manage its state transitions."""
        async with self._lock:
            rec = self._records[jid]
            rec.status = JobStatus.running
            rec.started_at = datetime.now(timezone.utc)

        try:
            result = await exec_fn()

            async with self._lock:
                rec = self._records[jid]
                rec.status = JobStatus.completed
                rec.finished_at = datetime.now(timezone.utc)
                self._results[jid] = result

            return result

        except asyncio.CancelledError:
            async with self._lock:
                rec = self._records[jid]
                rec.status = JobStatus.canceled
                rec.finished_at = datetime.now(timezone.utc)

            raise

        except Exception as e:
            tb = traceback.format_exc()
            # Extract clean error message (exception type and message only)
            error_lines = tb.strip().split("\n")
            clean_error = error_lines[-1] if error_lines else str(e)

            async with self._lock:
                rec = self._records[jid]
                rec.status = JobStatus.failed
                rec.finished_at = datetime.now(timezone.utc)
                rec.error = clean_error
                rec.error_traceback = tb

            raise

    async def get_all_records(self) -> list[JobRecord]:
        """Get all job records sorted by submission time."""
        async with self._lock:
            records = [r.model_copy(deep=True) for r in self._records.values()]

        records.sort(
            key=lambda r: getattr(r, "submitted_at", datetime.min.replace(tzinfo=timezone.utc)),
            reverse=True,
        )

        return records

    async def get_record(self, job_id: ULID) -> JobRecord:
        """Get the full record of a job."""
        async with self._lock:
            rec = self._records.get(job_id)

            if rec is None:
                raise KeyError("Job not found")

            return rec.model_copy(deep=True)

    async def get_status(self, job_id: ULID) -> JobStatus:
        """Get the status of a job."""
        async with self._lock:
            rec = self._records.get(job_id)

            if rec is None:
                raise KeyError("Job not found")

            return rec.status

    async def get_result(self, job_id: ULID) -> Any:
        """Get the result of a completed job."""
        async with self._lock:
            rec = self._records.get(job_id)

            if rec is None:
                raise KeyError("Job not found")

            if rec.status == JobStatus.completed:
                return self._results.get(job_id)

            if rec.status == JobStatus.failed:
                msg = getattr(rec, "error", "Job failed")
                raise RuntimeError(msg)

            raise RuntimeError(f"Job not finished (status={rec.status})")

    async def wait(self, job_id: ULID, timeout: float | None = None) -> None:
        """Wait for a job to complete."""
        async with self._lock:
            task = self._tasks.get(job_id)

            if task is None:
                raise KeyError("Job not found")

        await asyncio.wait_for(asyncio.shield(task), timeout=timeout)

    async def cancel(self, job_id: ULID) -> bool:
        """Cancel a running job."""
        async with self._lock:
            task = self._tasks.get(job_id)
            exists = job_id in self._records

        if not exists:
            raise KeyError("Job not found")

        if not task or task.done():
            return False

        task.cancel()

        try:
            await task
        except asyncio.CancelledError:
            pass

        return True

    async def delete(self, job_id: ULID) -> None:
        """Delete a job record."""
        async with self._lock:
            rec = self._records.get(job_id)
            task = self._tasks.get(job_id)

        if rec is None:
            raise KeyError("Job not found")

        if task and not task.done():
            task.cancel()

            try:
                await task
            except asyncio.CancelledError:
                pass

        async with self._lock:
            self._records.pop(job_id, None)
            self._tasks.pop(job_id, None)
            self._results.pop(job_id, None)
Functions
__init__(**data)

Initialize scheduler with optional concurrency limit.

Source code in src/servicekit/scheduler.py
def __init__(self, **data: Any):
    """Initialize scheduler with optional concurrency limit."""
    super().__init__(**data)
    if self.max_concurrency and self.max_concurrency > 0:
        self._sema = asyncio.Semaphore(self.max_concurrency)
set_max_concurrency(n) async

Set maximum number of concurrent jobs.

Source code in src/servicekit/scheduler.py
async def set_max_concurrency(self, n: int | None) -> None:
    """Set maximum number of concurrent jobs."""
    async with self._lock:
        self.max_concurrency = n
        if n and n > 0:
            self._sema = asyncio.Semaphore(n)
        else:
            self._sema = None
add_job(target, /, *args, **kwargs) async

Add a job to the scheduler and return its ID.

Source code in src/servicekit/scheduler.py
async def add_job(
    self,
    target: JobTarget,
    /,
    *args: Any,
    **kwargs: Any,
) -> ULID:
    """Add a job to the scheduler and return its ID."""
    now = datetime.now(timezone.utc)
    jid = ULID()

    record = JobRecord(
        id=jid,
        status=JobStatus.pending,
        submitted_at=now,
    )

    async with self._lock:
        if jid in self._tasks:
            raise RuntimeError(f"Job {jid!r} already scheduled")
        self._records[jid] = record

    async def _execute_target() -> Any:
        if inspect.isawaitable(target):
            if args or kwargs:
                # Close the coroutine to avoid "coroutine was never awaited" warning
                if inspect.iscoroutine(target):
                    target.close()
                raise TypeError("Args/kwargs not supported when target is an awaitable object.")
            return await target
        if inspect.iscoroutinefunction(target):
            return await target(*args, **kwargs)
        return await asyncio.to_thread(target, *args, **kwargs)

    async def _runner() -> Any:
        if self._sema:
            async with self._sema:
                return await self._run_with_state(jid, _execute_target)
        else:
            return await self._run_with_state(jid, _execute_target)

    task = asyncio.create_task(_runner(), name=f"{self.name}-job-{jid}")

    def _drain(t: asyncio.Task[Any]) -> None:
        try:
            t.result()
        except Exception:
            pass

    task.add_done_callback(_drain)

    async with self._lock:
        self._tasks[jid] = task

    return jid
get_all_records() async

Get all job records sorted by submission time.

Source code in src/servicekit/scheduler.py
async def get_all_records(self) -> list[JobRecord]:
    """Get all job records sorted by submission time."""
    async with self._lock:
        records = [r.model_copy(deep=True) for r in self._records.values()]

    records.sort(
        key=lambda r: getattr(r, "submitted_at", datetime.min.replace(tzinfo=timezone.utc)),
        reverse=True,
    )

    return records
get_record(job_id) async

Get the full record of a job.

Source code in src/servicekit/scheduler.py
async def get_record(self, job_id: ULID) -> JobRecord:
    """Get the full record of a job."""
    async with self._lock:
        rec = self._records.get(job_id)

        if rec is None:
            raise KeyError("Job not found")

        return rec.model_copy(deep=True)
get_status(job_id) async

Get the status of a job.

Source code in src/servicekit/scheduler.py
async def get_status(self, job_id: ULID) -> JobStatus:
    """Get the status of a job."""
    async with self._lock:
        rec = self._records.get(job_id)

        if rec is None:
            raise KeyError("Job not found")

        return rec.status
get_result(job_id) async

Get the result of a completed job.

Source code in src/servicekit/scheduler.py
async def get_result(self, job_id: ULID) -> Any:
    """Get the result of a completed job."""
    async with self._lock:
        rec = self._records.get(job_id)

        if rec is None:
            raise KeyError("Job not found")

        if rec.status == JobStatus.completed:
            return self._results.get(job_id)

        if rec.status == JobStatus.failed:
            msg = getattr(rec, "error", "Job failed")
            raise RuntimeError(msg)

        raise RuntimeError(f"Job not finished (status={rec.status})")
wait(job_id, timeout=None) async

Wait for a job to complete.

Source code in src/servicekit/scheduler.py
async def wait(self, job_id: ULID, timeout: float | None = None) -> None:
    """Wait for a job to complete."""
    async with self._lock:
        task = self._tasks.get(job_id)

        if task is None:
            raise KeyError("Job not found")

    await asyncio.wait_for(asyncio.shield(task), timeout=timeout)
cancel(job_id) async

Cancel a running job.

Source code in src/servicekit/scheduler.py
async def cancel(self, job_id: ULID) -> bool:
    """Cancel a running job."""
    async with self._lock:
        task = self._tasks.get(job_id)
        exists = job_id in self._records

    if not exists:
        raise KeyError("Job not found")

    if not task or task.done():
        return False

    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        pass

    return True
delete(job_id) async

Delete a job record.

Source code in src/servicekit/scheduler.py
async def delete(self, job_id: ULID) -> None:
    """Delete a job record."""
    async with self._lock:
        rec = self._records.get(job_id)
        task = self._tasks.get(job_id)

    if rec is None:
        raise KeyError("Job not found")

    if task and not task.done():
        task.cancel()

        try:
            await task
        except asyncio.CancelledError:
            pass

    async with self._lock:
        self._records.pop(job_id, None)
        self._tasks.pop(job_id, None)
        self._results.pop(job_id, None)

Types

types

Custom types for servicekit - SQLAlchemy and Pydantic types.

Attributes

JsonSafe = Annotated[Any, PlainSerializer(_serialize_with_metadata, return_type=Any)] module-attribute

Pydantic type for JSON-safe serialization with graceful handling of non-serializable values.

Classes

ULIDType

Bases: TypeDecorator[ULID]

SQLAlchemy custom type for ULID stored as 26-character strings.

Source code in src/servicekit/types.py
class ULIDType(TypeDecorator[ULID]):
    """SQLAlchemy custom type for ULID stored as 26-character strings."""

    impl = String(26)
    cache_ok = True

    def process_bind_param(self, value: ULID | str | None, dialect: Any) -> str | None:
        """Convert ULID to string for database storage."""
        if value is None:
            return None
        if isinstance(value, str):
            return str(ULID.from_str(value))  # Validate and normalize
        return str(value)

    def process_result_value(self, value: str | None, dialect: Any) -> ULID | None:
        """Convert string from database to ULID object."""
        if value is None:
            return None
        return ULID.from_str(value)
Functions
process_bind_param(value, dialect)

Convert ULID to string for database storage.

Source code in src/servicekit/types.py
def process_bind_param(self, value: ULID | str | None, dialect: Any) -> str | None:
    """Convert ULID to string for database storage."""
    if value is None:
        return None
    if isinstance(value, str):
        return str(ULID.from_str(value))  # Validate and normalize
    return str(value)
process_result_value(value, dialect)

Convert string from database to ULID object.

Source code in src/servicekit/types.py
def process_result_value(self, value: str | None, dialect: Any) -> ULID | None:
    """Convert string from database to ULID object."""
    if value is None:
        return None
    return ULID.from_str(value)

Logging

logging

Structured logging configuration with request tracing support.

Functions

configure_logging()

Configure structlog and intercept standard library logging.

Source code in src/servicekit/logging.py
def configure_logging() -> None:
    """Configure structlog and intercept standard library logging."""
    log_format = os.getenv("LOG_FORMAT", "console").lower()
    log_level = os.getenv("LOG_LEVEL", "INFO").upper()
    level = getattr(logging, log_level, logging.INFO)

    # Shared processors for structlog
    shared_processors: list[Processor] = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso", utc=True),
        structlog.processors.StackInfoRenderer(),
    ]

    # Choose renderer based on format
    if log_format == "json":
        formatter_processors = shared_processors + [
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ]
    else:
        formatter_processors = shared_processors + [
            structlog.stdlib.ProcessorFormatter.remove_processors_meta,
            structlog.dev.ConsoleRenderer(colors=True),
        ]

    # Configure structlog to use standard library logging
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.stdlib.add_log_level,
            structlog.stdlib.add_logger_name,
            structlog.processors.TimeStamper(fmt="iso", utc=True),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.CallsiteParameterAdder(
                [
                    structlog.processors.CallsiteParameter.FILENAME,
                    structlog.processors.CallsiteParameter.LINENO,
                    structlog.processors.CallsiteParameter.FUNC_NAME,
                ]
            ),
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        wrapper_class=structlog.make_filtering_bound_logger(level),
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

    # Configure standard library logging to use structlog formatter
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(structlog.stdlib.ProcessorFormatter(processors=formatter_processors))

    # Configure root logger
    root_logger = logging.getLogger()
    root_logger.handlers.clear()
    root_logger.addHandler(handler)
    root_logger.setLevel(level)

    # Configure uvicorn and gunicorn loggers to use the same handler
    for logger_name in ["uvicorn", "uvicorn.access", "uvicorn.error", "gunicorn.access", "gunicorn.error"]:
        logger = logging.getLogger(logger_name)
        logger.handlers.clear()
        logger.addHandler(handler)
        logger.setLevel(level)
        logger.propagate = False

get_logger(name=None)

Get a configured structlog logger instance.

Source code in src/servicekit/logging.py
def get_logger(name: str | None = None) -> Any:
    """Get a configured structlog logger instance."""
    return structlog.get_logger(name)

add_request_context(**context)

Add context variables that will be included in all log messages.

Source code in src/servicekit/logging.py
def add_request_context(**context: Any) -> None:
    """Add context variables that will be included in all log messages."""
    structlog.contextvars.bind_contextvars(**context)

clear_request_context(*keys)

Clear specific context variables.

Source code in src/servicekit/logging.py
def clear_request_context(*keys: str) -> None:
    """Clear specific context variables."""
    structlog.contextvars.unbind_contextvars(*keys)

reset_request_context()

Clear all context variables.

Source code in src/servicekit/logging.py
def reset_request_context() -> None:
    """Clear all context variables."""
    structlog.contextvars.clear_contextvars()

FastAPI Layer

FastAPI-specific components for building web services.

Service Builder

Service builder class for composing FastAPI applications.

BaseServiceBuilder

BaseServiceBuilder

Base service builder providing core FastAPI functionality without module dependencies.

Source code in src/servicekit/api/service_builder.py
class BaseServiceBuilder:
    """Base service builder providing core FastAPI functionality without module dependencies."""

    def __init__(
        self,
        *,
        info: ServiceInfo,
        database_url: str = "sqlite+aiosqlite:///:memory:",
        include_error_handlers: bool = True,
        include_logging: bool = False,
    ) -> None:
        """Initialize base service builder with core options."""
        if info.description is None and info.summary is not None:
            # Preserve summary as description for FastAPI metadata if description missing
            self.info = info.model_copy(update={"description": info.summary})
        else:
            self.info = info
        self._title = self.info.display_name
        self._app_description = self.info.summary or self.info.description or ""
        self._version = self.info.version
        self._database_url = database_url
        self._database_instance: Database | None = None
        self._pool_size: int = 5
        self._max_overflow: int = 10
        self._pool_recycle: int = 3600
        self._pool_pre_ping: bool = True
        self._include_error_handlers = include_error_handlers
        self._include_logging = include_logging
        self._health_options: _HealthOptions | None = None
        self._system_options: _SystemOptions | None = None
        self._job_options: _JobOptions | None = None
        self._auth_options: _AuthOptions | None = None
        self._monitoring_options: _MonitoringOptions | None = None
        self._registration_options: _RegistrationOptions | None = None
        self._app_configs: List[App] = []
        self._custom_routers: List[APIRouter] = []
        self._dependency_overrides: Dict[DependencyOverride, DependencyOverride] = {}
        self._startup_hooks: List[LifecycleHook] = []
        self._shutdown_hooks: List[LifecycleHook] = []

    # --------------------------------------------------------------------- Fluent configuration

    def with_database(
        self,
        url_or_instance: str | Database | None = None,
        *,
        pool_size: int = 5,
        max_overflow: int = 10,
        pool_recycle: int = 3600,
        pool_pre_ping: bool = True,
    ) -> Self:
        """Configure database with URL string, Database instance, or default in-memory SQLite."""
        if isinstance(url_or_instance, Database):
            # Pre-configured instance provided
            self._database_instance = url_or_instance
            return self  # Skip pool configuration for instances
        elif isinstance(url_or_instance, str):
            # String URL provided
            self._database_url = url_or_instance
        elif url_or_instance is None:
            # Default: in-memory SQLite
            self._database_url = "sqlite+aiosqlite:///:memory:"
        else:
            raise TypeError(
                f"Expected str, Database, or None, got {type(url_or_instance).__name__}. "
                "Use .with_database() for default, .with_database('url') for custom URL, "
                "or .with_database(db_instance) for pre-configured database."
            )

        # Configure pool settings (only applies to URL-based databases)
        self._pool_size = pool_size
        self._max_overflow = max_overflow
        self._pool_recycle = pool_recycle
        self._pool_pre_ping = pool_pre_ping
        return self

    def with_landing_page(self) -> Self:
        """Enable landing page at root path."""
        return self.with_app(("servicekit.api", "apps/landing"))

    def with_logging(self, enabled: bool = True) -> Self:
        """Enable structured logging with request tracing."""
        self._include_logging = enabled
        return self

    def with_health(
        self,
        *,
        prefix: str = "/health",
        tags: List[str] | None = None,
        checks: dict[str, HealthCheck] | None = None,
        include_database_check: bool = True,
    ) -> Self:
        """Add health check endpoint with optional custom checks."""
        health_checks = checks or {}

        if include_database_check:
            health_checks["database"] = self._create_database_health_check()

        self._health_options = _HealthOptions(
            prefix=prefix,
            tags=list(tags) if tags is not None else ["Observability"],
            checks=health_checks,
        )
        return self

    def with_system(
        self,
        *,
        prefix: str = "/api/v1/system",
        tags: List[str] | None = None,
    ) -> Self:
        """Add system info endpoint."""
        self._system_options = _SystemOptions(
            prefix=prefix,
            tags=list(tags) if tags is not None else ["Service"],
        )
        return self

    def with_jobs(
        self,
        *,
        prefix: str = "/api/v1/jobs",
        tags: List[str] | None = None,
        max_concurrency: int | None = None,
    ) -> Self:
        """Add job scheduler endpoints."""
        self._job_options = _JobOptions(
            prefix=prefix,
            tags=list(tags) if tags is not None else ["Jobs"],
            max_concurrency=max_concurrency,
        )
        return self

    def with_auth(
        self,
        *,
        api_keys: List[str] | None = None,
        api_key_file: str | None = None,
        env_var: str = "SERVICEKIT_API_KEYS",
        header_name: str = "X-API-Key",
        unauthenticated_paths: List[str] | None = None,
    ) -> Self:
        """Enable API key authentication."""
        keys: set[str] = set()
        auth_source: str = ""  # Track source for later logging

        # Priority 1: Direct list (examples/dev)
        if api_keys is not None:
            keys = set(api_keys)
            auth_source = "direct_keys"

        # Priority 2: File (Docker secrets)
        elif api_key_file is not None:
            keys = load_api_keys_from_file(api_key_file)
            auth_source = f"file:{api_key_file}"

        # Priority 3: Environment variable (default)
        else:
            keys = load_api_keys_from_env(env_var)
            if keys:
                auth_source = f"env:{env_var}"
            else:
                auth_source = f"env:{env_var}:empty"

        if not keys:
            raise ValueError("No API keys configured. Provide api_keys, api_key_file, or set environment variable.")

        # Default unauthenticated paths
        default_unauth = {"/docs", "/redoc", "/openapi.json", "/health", "/"}
        unauth_set = set(unauthenticated_paths) if unauthenticated_paths else default_unauth

        self._auth_options = _AuthOptions(
            api_keys=keys,
            header_name=header_name,
            unauthenticated_paths=unauth_set,
            source=auth_source,
        )
        return self

    def with_monitoring(
        self,
        *,
        prefix: str = "/metrics",
        tags: List[str] | None = None,
        service_name: str | None = None,
        enable_traces: bool = False,
    ) -> Self:
        """Enable OpenTelemetry monitoring with Prometheus endpoint and auto-instrumentation."""
        self._monitoring_options = _MonitoringOptions(
            prefix=prefix,
            tags=list(tags) if tags is not None else ["Observability"],
            service_name=service_name,
            enable_traces=enable_traces,
        )
        return self

    def with_registration(
        self,
        *,
        orchestrator_url: str | None = None,
        host: str | None = None,
        port: int | None = None,
        orchestrator_url_env: str = "SERVICEKIT_ORCHESTRATOR_URL",
        host_env: str = "SERVICEKIT_HOST",
        port_env: str = "SERVICEKIT_PORT",
        max_retries: int = 5,
        retry_delay: float = 2.0,
        fail_on_error: bool = False,
        timeout: float = 10.0,
    ) -> Self:
        """Enable service registration with orchestrator for service discovery."""
        self._registration_options = _RegistrationOptions(
            orchestrator_url=orchestrator_url,
            host=host,
            port=port,
            orchestrator_url_env=orchestrator_url_env,
            host_env=host_env,
            port_env=port_env,
            max_retries=max_retries,
            retry_delay=retry_delay,
            fail_on_error=fail_on_error,
            timeout=timeout,
        )
        return self

    def with_app(self, path: str | Path | tuple[str, str], prefix: str | None = None) -> Self:
        """Register static app from filesystem path or package resource tuple."""
        app = AppLoader.load(path, prefix=prefix)
        self._app_configs.append(app)
        return self

    def with_apps(self, path: str | Path | tuple[str, str]) -> Self:
        """Auto-discover and register all apps in directory."""
        apps = AppLoader.discover(path)
        self._app_configs.extend(apps)
        return self

    def include_router(self, router: APIRouter) -> Self:
        """Include a custom router."""
        self._custom_routers.append(router)
        return self

    def override_dependency(self, dependency: DependencyOverride, override: DependencyOverride) -> Self:
        """Record a dependency override (applied to ``app.dependency_overrides`` at build time)."""
        self._dependency_overrides.update({dependency: override})
        return self

    def on_startup(self, hook: LifecycleHook) -> Self:
        """Queue *hook* to run during application startup, in registration order."""
        self._startup_hooks += [hook]
        return self

    def on_shutdown(self, hook: LifecycleHook) -> Self:
        """Queue *hook* to run during application shutdown, in registration order."""
        self._shutdown_hooks += [hook]
        return self

    # --------------------------------------------------------------------- Build mechanics

    def build(self) -> FastAPI:
        """Build and configure the FastAPI application.

        Assembly order is deliberate and load-bearing:
        validation -> middleware -> built-in routers -> module routers ->
        custom routers -> info endpoint -> static app mounts -> dependency
        overrides. Routes are registered before apps are mounted so API
        routes always take precedence over static catch-all mounts.
        Runtime resources (database, scheduler, hooks) are not created here;
        they are deferred to the lifespan returned by ``_build_lifespan``.
        """
        self._validate_configuration()
        self._validate_module_configuration()  # Extension point for subclasses

        lifespan = self._build_lifespan()
        app = FastAPI(
            title=self._title,
            description=self._app_description,
            version=self._version,
            lifespan=lifespan,
        )
        # Expose the configured URL on app.state for introspection/tests.
        app.state.database_url = self._database_url

        # Override schema generation to clean up generic type names
        app.openapi = self._create_openapi_customizer(app)  # type: ignore[method-assign]

        if self._include_error_handlers:
            add_error_handlers(app)

        if self._include_logging:
            add_logging_middleware(app)

        if self._auth_options:
            app.add_middleware(
                APIKeyMiddleware,
                api_keys=self._auth_options.api_keys,
                header_name=self._auth_options.header_name,
                unauthenticated_paths=self._auth_options.unauthenticated_paths,
            )
            # Store auth_source for logging during startup
            # (logging may not be configured yet at build time).
            app.state.auth_source = self._auth_options.source
            app.state.auth_key_count = len(self._auth_options.api_keys)

        if self._health_options:
            health_router = HealthRouter.create(
                prefix=self._health_options.prefix,
                tags=self._health_options.tags,
                checks=self._health_options.checks,
            )
            app.include_router(health_router)

        if self._system_options:
            system_router = SystemRouter.create(
                prefix=self._system_options.prefix,
                tags=self._system_options.tags,
            )
            app.include_router(system_router)

        if self._job_options:
            # scheduler_factory resolves the scheduler lazily; the instance
            # is only created during lifespan startup.
            job_router = JobRouter.create(
                prefix=self._job_options.prefix,
                tags=self._job_options.tags,
                scheduler_factory=get_scheduler,
            )
            app.include_router(job_router)

        if self._monitoring_options:
            # Imported lazily so monitoring deps are only needed when enabled.
            from .monitoring import setup_monitoring

            metric_reader = setup_monitoring(
                app,
                service_name=self._monitoring_options.service_name,
                enable_traces=self._monitoring_options.enable_traces,
            )
            metrics_router = MetricsRouter.create(
                prefix=self._monitoring_options.prefix,
                tags=self._monitoring_options.tags,
                metric_reader=metric_reader,
            )
            app.include_router(metrics_router)

        # Extension point for module-specific routers
        self._register_module_routers(app)

        for router in self._custom_routers:
            app.include_router(router)

        # Install route endpoints BEFORE mounting apps (routes take precedence over mounts)
        self._install_info_endpoint(app, info=self.info)

        # Mount apps AFTER all routes (apps act as catch-all for unmatched paths)
        if self._app_configs:
            from fastapi.staticfiles import StaticFiles

            # Collect all router prefixes to exclude from redirect middleware
            # This ensures routes take precedence over app mounts
            router_prefixes = set()
            if self._health_options:
                router_prefixes.add(self._health_options.prefix)
            if self._system_options:
                router_prefixes.add(self._system_options.prefix)
            if self._job_options:
                router_prefixes.add(self._job_options.prefix)
            if self._monitoring_options:
                router_prefixes.add(self._monitoring_options.prefix)
            for router in self._custom_routers:
                if hasattr(router, "prefix") and router.prefix:
                    router_prefixes.add(router.prefix)

            # Add middleware to handle trailing slash redirects for app prefixes
            # Skip prefixes that are already claimed by routes (routes take precedence)
            from .middleware import AppPrefixRedirectMiddleware

            # Root-mounted apps never need a trailing-slash redirect.
            app_prefixes = [
                cfg.prefix for cfg in self._app_configs if cfg.prefix != "/" and cfg.prefix not in router_prefixes
            ]
            if app_prefixes:
                app.add_middleware(AppPrefixRedirectMiddleware, app_prefixes=app_prefixes)

            # Mount all apps
            for app_config in self._app_configs:
                static_files = StaticFiles(directory=str(app_config.directory), html=True)
                app.mount(app_config.prefix, static_files, name=f"app_{app_config.manifest.name}")
                logger.info(
                    "app.mounted",
                    name=app_config.manifest.name,
                    prefix=app_config.prefix,
                    directory=str(app_config.directory),
                    is_package=app_config.is_package,
                )

        # Initialize app manager for metadata queries (always, even if no apps)
        from .app import AppManager
        from .dependencies import set_app_manager

        app_manager = AppManager(self._app_configs)
        set_app_manager(app_manager)

        for dependency, override in self._dependency_overrides.items():
            app.dependency_overrides[dependency] = override

        return app

    # --------------------------------------------------------------------- Extension points

    def _validate_module_configuration(self) -> None:
        """Extension point for module-specific validation (override in subclasses).

        Called by ``build`` right after core validation; the base
        implementation is intentionally a no-op.
        """
        pass

    def _register_module_routers(self, app: FastAPI) -> None:
        """Extension point for registering module-specific routers (override in subclasses).

        Called by ``build`` after the built-in routers are installed and
        before custom routers; the base implementation is intentionally a no-op.
        """
        pass

    # --------------------------------------------------------------------- Core helpers

    def _validate_configuration(self) -> None:
        """Validate core configuration.

        Raises ValueError for invalid health-check names, deduplicates app
        prefixes (last registration wins, with a logged warning), and orders
        app mounts so longer prefixes come first and the root mount last.
        Mutates ``self._app_configs`` in place.
        """
        # Validate health check names don't contain invalid characters
        if self._health_options:
            for name in self._health_options.checks.keys():
                # Strip the two allowed separators, then require the rest alphanumeric.
                if not name.replace("_", "").replace("-", "").isalnum():
                    raise ValueError(
                        f"Health check name '{name}' contains invalid characters. "
                        "Only alphanumeric characters, underscores, and hyphens are allowed."
                    )

        # Validate app configurations
        if self._app_configs:
            # Deduplicate apps with same prefix (last one wins)
            # This allows overriding apps, especially useful for root prefix "/"
            seen_prefixes: dict[str, int] = {}  # prefix -> last index
            for i, app in enumerate(self._app_configs):
                if app.prefix in seen_prefixes:
                    # Log warning about override
                    prev_idx = seen_prefixes[app.prefix]
                    prev_app = self._app_configs[prev_idx]
                    logger.warning(
                        "app.prefix.override",
                        prefix=app.prefix,
                        replaced_app=prev_app.manifest.name,
                        new_app=app.manifest.name,
                    )
                seen_prefixes[app.prefix] = i
            # seen_prefixes now maps each prefix to the index of its last occurrence.

            # Keep only the last app for each prefix
            self._app_configs = [self._app_configs[i] for i in sorted(set(seen_prefixes.values()))]

            # Sort so root mounts are last (most specific paths mounted first)
            # This ensures FastAPI matches more specific routes before catch-all root
            # Sorting: (is_root, -path_length, path) ensures longer paths before shorter, root last
            self._app_configs.sort(key=lambda app: (app.prefix == "/", -len(app.prefix), app.prefix))

            # Validate that non-root prefixes don't have duplicates (shouldn't happen after dedup, but safety check)
            prefixes = [app.prefix for app in self._app_configs]
            if len(prefixes) != len(set(prefixes)):
                raise ValueError("Internal error: duplicate prefixes after deduplication")

    def _build_lifespan(self) -> LifespanFactory:
        """Build lifespan context manager for app startup/shutdown.

        Builder state is snapshotted into locals here so the returned
        ``lifespan`` closure does not read ``self`` after build time (later
        mutation of the builder cannot affect a built app).

        Startup order: logging -> database -> scheduler -> auth log ->
        startup hooks -> orchestrator registration. Shutdown runs the
        shutdown hooks, clears ``app.state.database``, and disposes the
        database only if this builder created it (an injected instance is
        left for its owner to manage).
        """
        database_url = self._database_url
        database_instance = self._database_instance
        pool_size = self._pool_size
        max_overflow = self._max_overflow
        pool_recycle = self._pool_recycle
        pool_pre_ping = self._pool_pre_ping
        job_options = self._job_options
        include_logging = self._include_logging
        registration_options = self._registration_options
        info = self.info
        # Copy the hook lists so hooks added after build() are not picked up.
        startup_hooks = list(self._startup_hooks)
        shutdown_hooks = list(self._shutdown_hooks)

        @asynccontextmanager
        async def lifespan(app: FastAPI) -> AsyncIterator[None]:
            # Configure logging if enabled
            if include_logging:
                configure_logging()

            # Use injected database or create new one from URL
            if database_instance is not None:
                database = database_instance
                should_manage_lifecycle = False
            else:
                # Create appropriate database type based on URL
                if "sqlite" in database_url.lower():
                    database = SqliteDatabase(
                        database_url,
                        pool_size=pool_size,
                        max_overflow=max_overflow,
                        pool_recycle=pool_recycle,
                        pool_pre_ping=pool_pre_ping,
                    )
                else:
                    database = Database(
                        database_url,
                        pool_size=pool_size,
                        max_overflow=max_overflow,
                        pool_recycle=pool_recycle,
                        pool_pre_ping=pool_pre_ping,
                    )
                should_manage_lifecycle = True

            # Always initialize database (safe to call multiple times)
            await database.init()

            set_database(database)
            app.state.database = database

            # Initialize scheduler if jobs are enabled
            if job_options is not None:
                from servicekit.scheduler import AIOJobScheduler

                scheduler = AIOJobScheduler(max_concurrency=job_options.max_concurrency)
                set_scheduler(scheduler)
                app.state.scheduler = scheduler

            # Log auth configuration after logging is configured
            # (build() stashed the source string on app.state).
            if hasattr(app.state, "auth_source"):
                auth_source = app.state.auth_source
                key_count = app.state.auth_key_count

                if auth_source == "direct_keys":
                    logger.warning(
                        "auth.direct_keys",
                        message="Using direct API keys - not recommended for production",
                        count=key_count,
                    )
                elif auth_source.startswith("file:"):
                    file_path = auth_source.split(":", 1)[1]
                    logger.info("auth.loaded_from_file", file=file_path, count=key_count)
                elif auth_source.startswith("env:"):
                    # Format is "env:<var>" or "env:<var>:empty" (no keys found).
                    parts = auth_source.split(":", 2)
                    env_var = parts[1]
                    if len(parts) > 2 and parts[2] == "empty":
                        logger.warning(
                            "auth.no_keys",
                            message=f"No API keys found in {env_var}. Service will reject all requests.",
                        )
                    else:
                        logger.info("auth.loaded_from_env", env_var=env_var, count=key_count)

            for hook in startup_hooks:
                await hook(app)

            # Register with orchestrator if enabled
            if registration_options is not None:
                from .registration import register_service

                await register_service(
                    orchestrator_url=registration_options.orchestrator_url,
                    host=registration_options.host,
                    port=registration_options.port,
                    info=info,
                    orchestrator_url_env=registration_options.orchestrator_url_env,
                    host_env=registration_options.host_env,
                    port_env=registration_options.port_env,
                    max_retries=registration_options.max_retries,
                    retry_delay=registration_options.retry_delay,
                    fail_on_error=registration_options.fail_on_error,
                    timeout=registration_options.timeout,
                )

            try:
                yield
            finally:
                # Shutdown hooks run even if startup-dependent work failed mid-request.
                for hook in shutdown_hooks:
                    await hook(app)
                app.state.database = None

                # Dispose database only if we created it
                if should_manage_lifecycle:
                    await database.dispose()

        return lifespan

    @staticmethod
    def _create_database_health_check() -> HealthCheck:
        """Build a health check verifying database connectivity.

        The returned coroutine opens a session and executes ``SELECT 1``;
        any exception is mapped to an unhealthy state carrying the error text.
        """

        async def check_database() -> tuple[HealthState, str | None]:
            try:
                database = get_database()
                async with database.session() as session:
                    # Trivial round-trip query proves the connection works.
                    await session.execute(text("SELECT 1"))
            except Exception as exc:
                return (HealthState.UNHEALTHY, f"Database connection failed: {str(exc)}")
            return (HealthState.HEALTHY, None)

        return check_database

    @staticmethod
    def _create_openapi_customizer(app: FastAPI) -> Callable[[], dict[str, Any]]:
        """Create an OpenAPI schema generator that strips generic type parameters.

        Generic models render with bracketed parameters in their schema names;
        this removes the bracketed part from schema names, titles, and every
        ``$ref`` pointer, then caches the result on ``app.openapi_schema``.
        """

        def custom_openapi() -> dict[str, Any]:
            # Serve the cached schema if one was already generated.
            if app.openapi_schema:
                return app.openapi_schema

            from fastapi.openapi.utils import get_openapi

            schema = get_openapi(
                title=app.title,
                version=app.version,
                description=app.description,
                routes=app.routes,
            )

            if "components" in schema and "schemas" in schema["components"]:
                strip = re.compile(r"\[.*?\]").sub

                # Rebuild the schemas mapping under bracket-free names,
                # cleaning embedded titles along the way.
                renamed: dict[str, Any] = {}
                for schema_name, definition in schema["components"]["schemas"].items():
                    if isinstance(definition, dict) and "title" in definition:
                        definition["title"] = strip("", definition["title"])
                    renamed[strip("", schema_name)] = definition
                schema["components"]["schemas"] = renamed

                # Recursively rewrite every $ref so pointers match the renamed schemas.
                def clean_refs(node: Any) -> Any:
                    if isinstance(node, dict):
                        if "$ref" in node:
                            node["$ref"] = strip("", node["$ref"])
                        for child in node.values():
                            clean_refs(child)
                    elif isinstance(node, list):
                        for child in node:
                            clean_refs(child)

                clean_refs(schema)

            app.openapi_schema = schema
            return app.openapi_schema

        return custom_openapi

    @staticmethod
    def _install_info_endpoint(app: FastAPI, *, info: ServiceInfo) -> None:
        """Expose the service metadata at ``GET /api/v1/info``.

        ``response_model`` is the concrete runtime type of *info*, which may
        be a ``ServiceInfo`` subclass.
        """

        async def get_info() -> ServiceInfo:
            return info

        app.get(
            "/api/v1/info",
            tags=["Service"],
            include_in_schema=True,
            response_model=type(info),
        )(get_info)

    # --------------------------------------------------------------------- Convenience

    @classmethod
    def create(cls, *, info: ServiceInfo, **kwargs: Any) -> FastAPI:
        """Shorthand: construct the builder and immediately build the FastAPI app."""
        builder = cls(info=info, **kwargs)
        return builder.build()

Functions

__init__(*, info, database_url='sqlite+aiosqlite:///:memory:', include_error_handlers=True, include_logging=False)

Initialize base service builder with core options.

Source code in src/servicekit/api/service_builder.py
def __init__(
    self,
    *,
    info: ServiceInfo,
    database_url: str = "sqlite+aiosqlite:///:memory:",
    include_error_handlers: bool = True,
    include_logging: bool = False,
) -> None:
    """Initialize base service builder with core options."""
    if info.description is None and info.summary is not None:
        # Preserve summary as description for FastAPI metadata if description missing
        self.info = info.model_copy(update={"description": info.summary})
    else:
        self.info = info
    self._title = self.info.display_name
    self._app_description = self.info.summary or self.info.description or ""
    self._version = self.info.version
    self._database_url = database_url
    self._database_instance: Database | None = None
    self._pool_size: int = 5
    self._max_overflow: int = 10
    self._pool_recycle: int = 3600
    self._pool_pre_ping: bool = True
    self._include_error_handlers = include_error_handlers
    self._include_logging = include_logging
    self._health_options: _HealthOptions | None = None
    self._system_options: _SystemOptions | None = None
    self._job_options: _JobOptions | None = None
    self._auth_options: _AuthOptions | None = None
    self._monitoring_options: _MonitoringOptions | None = None
    self._registration_options: _RegistrationOptions | None = None
    self._app_configs: List[App] = []
    self._custom_routers: List[APIRouter] = []
    self._dependency_overrides: Dict[DependencyOverride, DependencyOverride] = {}
    self._startup_hooks: List[LifecycleHook] = []
    self._shutdown_hooks: List[LifecycleHook] = []

with_database(url_or_instance=None, *, pool_size=5, max_overflow=10, pool_recycle=3600, pool_pre_ping=True)

Configure database with URL string, Database instance, or default in-memory SQLite.

Source code in src/servicekit/api/service_builder.py
def with_database(
    self,
    url_or_instance: str | Database | None = None,
    *,
    pool_size: int = 5,
    max_overflow: int = 10,
    pool_recycle: int = 3600,
    pool_pre_ping: bool = True,
) -> Self:
    """Configure database with URL string, Database instance, or default in-memory SQLite."""
    if isinstance(url_or_instance, Database):
        # Pre-configured instance provided
        self._database_instance = url_or_instance
        return self  # Skip pool configuration for instances
    elif isinstance(url_or_instance, str):
        # String URL provided
        self._database_url = url_or_instance
    elif url_or_instance is None:
        # Default: in-memory SQLite
        self._database_url = "sqlite+aiosqlite:///:memory:"
    else:
        raise TypeError(
            f"Expected str, Database, or None, got {type(url_or_instance).__name__}. "
            "Use .with_database() for default, .with_database('url') for custom URL, "
            "or .with_database(db_instance) for pre-configured database."
        )

    # Configure pool settings (only applies to URL-based databases)
    self._pool_size = pool_size
    self._max_overflow = max_overflow
    self._pool_recycle = pool_recycle
    self._pool_pre_ping = pool_pre_ping
    return self

with_landing_page()

Enable landing page at root path.

Source code in src/servicekit/api/service_builder.py
def with_landing_page(self) -> Self:
    """Enable landing page at root path."""
    return self.with_app(("servicekit.api", "apps/landing"))

with_logging(enabled=True)

Enable structured logging with request tracing.

Source code in src/servicekit/api/service_builder.py
def with_logging(self, enabled: bool = True) -> Self:
    """Enable structured logging with request tracing."""
    self._include_logging = enabled
    return self

with_health(*, prefix='/health', tags=None, checks=None, include_database_check=True)

Add health check endpoint with optional custom checks.

Source code in src/servicekit/api/service_builder.py
def with_health(
    self,
    *,
    prefix: str = "/health",
    tags: List[str] | None = None,
    checks: dict[str, HealthCheck] | None = None,
    include_database_check: bool = True,
) -> Self:
    """Add health check endpoint with optional custom checks."""
    health_checks = checks or {}

    if include_database_check:
        health_checks["database"] = self._create_database_health_check()

    self._health_options = _HealthOptions(
        prefix=prefix,
        tags=list(tags) if tags is not None else ["Observability"],
        checks=health_checks,
    )
    return self

with_system(*, prefix='/api/v1/system', tags=None)

Add system info endpoint.

Source code in src/servicekit/api/service_builder.py
def with_system(
    self,
    *,
    prefix: str = "/api/v1/system",
    tags: List[str] | None = None,
) -> Self:
    """Add system info endpoint."""
    self._system_options = _SystemOptions(
        prefix=prefix,
        tags=list(tags) if tags is not None else ["Service"],
    )
    return self

with_jobs(*, prefix='/api/v1/jobs', tags=None, max_concurrency=None)

Add job scheduler endpoints.

Source code in src/servicekit/api/service_builder.py
def with_jobs(
    self,
    *,
    prefix: str = "/api/v1/jobs",
    tags: List[str] | None = None,
    max_concurrency: int | None = None,
) -> Self:
    """Add job scheduler endpoints."""
    self._job_options = _JobOptions(
        prefix=prefix,
        tags=list(tags) if tags is not None else ["Jobs"],
        max_concurrency=max_concurrency,
    )
    return self

with_auth(*, api_keys=None, api_key_file=None, env_var='SERVICEKIT_API_KEYS', header_name='X-API-Key', unauthenticated_paths=None)

Enable API key authentication.

Source code in src/servicekit/api/service_builder.py
def with_auth(
    self,
    *,
    api_keys: List[str] | None = None,
    api_key_file: str | None = None,
    env_var: str = "SERVICEKIT_API_KEYS",
    header_name: str = "X-API-Key",
    unauthenticated_paths: List[str] | None = None,
) -> Self:
    """Enable API key authentication."""
    keys: set[str] = set()
    auth_source: str = ""  # Track source for later logging

    # Priority 1: Direct list (examples/dev)
    if api_keys is not None:
        keys = set(api_keys)
        auth_source = "direct_keys"

    # Priority 2: File (Docker secrets)
    elif api_key_file is not None:
        keys = load_api_keys_from_file(api_key_file)
        auth_source = f"file:{api_key_file}"

    # Priority 3: Environment variable (default)
    else:
        keys = load_api_keys_from_env(env_var)
        if keys:
            auth_source = f"env:{env_var}"
        else:
            auth_source = f"env:{env_var}:empty"

    if not keys:
        raise ValueError("No API keys configured. Provide api_keys, api_key_file, or set environment variable.")

    # Default unauthenticated paths
    default_unauth = {"/docs", "/redoc", "/openapi.json", "/health", "/"}
    unauth_set = set(unauthenticated_paths) if unauthenticated_paths else default_unauth

    self._auth_options = _AuthOptions(
        api_keys=keys,
        header_name=header_name,
        unauthenticated_paths=unauth_set,
        source=auth_source,
    )
    return self

with_monitoring(*, prefix='/metrics', tags=None, service_name=None, enable_traces=False)

Enable OpenTelemetry monitoring with Prometheus endpoint and auto-instrumentation.

Source code in src/servicekit/api/service_builder.py
def with_monitoring(
    self,
    *,
    prefix: str = "/metrics",
    tags: List[str] | None = None,
    service_name: str | None = None,
    enable_traces: bool = False,
) -> Self:
    """Enable OpenTelemetry monitoring with Prometheus endpoint and auto-instrumentation."""
    self._monitoring_options = _MonitoringOptions(
        prefix=prefix,
        tags=list(tags) if tags is not None else ["Observability"],
        service_name=service_name,
        enable_traces=enable_traces,
    )
    return self

with_registration(*, orchestrator_url=None, host=None, port=None, orchestrator_url_env='SERVICEKIT_ORCHESTRATOR_URL', host_env='SERVICEKIT_HOST', port_env='SERVICEKIT_PORT', max_retries=5, retry_delay=2.0, fail_on_error=False, timeout=10.0)

Enable service registration with orchestrator for service discovery.

Source code in src/servicekit/api/service_builder.py
def with_registration(
    self,
    *,
    orchestrator_url: str | None = None,
    host: str | None = None,
    port: int | None = None,
    orchestrator_url_env: str = "SERVICEKIT_ORCHESTRATOR_URL",
    host_env: str = "SERVICEKIT_HOST",
    port_env: str = "SERVICEKIT_PORT",
    max_retries: int = 5,
    retry_delay: float = 2.0,
    fail_on_error: bool = False,
    timeout: float = 10.0,
) -> Self:
    """Enable service registration with orchestrator for service discovery."""
    self._registration_options = _RegistrationOptions(
        orchestrator_url=orchestrator_url,
        host=host,
        port=port,
        orchestrator_url_env=orchestrator_url_env,
        host_env=host_env,
        port_env=port_env,
        max_retries=max_retries,
        retry_delay=retry_delay,
        fail_on_error=fail_on_error,
        timeout=timeout,
    )
    return self

with_app(path, prefix=None)

Register static app from filesystem path or package resource tuple.

Source code in src/servicekit/api/service_builder.py
def with_app(self, path: str | Path | tuple[str, str], prefix: str | None = None) -> Self:
    """Register static app from filesystem path or package resource tuple."""
    app = AppLoader.load(path, prefix=prefix)
    self._app_configs.append(app)
    return self

with_apps(path)

Auto-discover and register all apps in directory.

Source code in src/servicekit/api/service_builder.py
def with_apps(self, path: str | Path | tuple[str, str]) -> Self:
    """Auto-discover and register all apps in directory."""
    apps = AppLoader.discover(path)
    self._app_configs.extend(apps)
    return self

include_router(router)

Include a custom router.

Source code in src/servicekit/api/service_builder.py
def include_router(self, router: APIRouter) -> Self:
    """Include a custom router."""
    self._custom_routers.append(router)
    return self

override_dependency(dependency, override)

Override a dependency for testing or customization.

Source code in src/servicekit/api/service_builder.py
def override_dependency(self, dependency: DependencyOverride, override: DependencyOverride) -> Self:
    """Override a dependency for testing or customization."""
    self._dependency_overrides[dependency] = override
    return self

on_startup(hook)

Register a startup hook.

Source code in src/servicekit/api/service_builder.py
def on_startup(self, hook: LifecycleHook) -> Self:
    """Register a startup hook."""
    self._startup_hooks.append(hook)
    return self

on_shutdown(hook)

Register a shutdown hook.

Source code in src/servicekit/api/service_builder.py
def on_shutdown(self, hook: LifecycleHook) -> Self:
    """Register a shutdown hook."""
    self._shutdown_hooks.append(hook)
    return self

build()

Build and configure the FastAPI application.

Source code in src/servicekit/api/service_builder.py
def build(self) -> FastAPI:
    """Build and configure the FastAPI application."""
    self._validate_configuration()
    self._validate_module_configuration()  # Extension point for subclasses

    lifespan = self._build_lifespan()
    app = FastAPI(
        title=self._title,
        description=self._app_description,
        version=self._version,
        lifespan=lifespan,
    )
    app.state.database_url = self._database_url

    # Override schema generation to clean up generic type names
    app.openapi = self._create_openapi_customizer(app)  # type: ignore[method-assign]

    if self._include_error_handlers:
        add_error_handlers(app)

    if self._include_logging:
        add_logging_middleware(app)

    if self._auth_options:
        app.add_middleware(
            APIKeyMiddleware,
            api_keys=self._auth_options.api_keys,
            header_name=self._auth_options.header_name,
            unauthenticated_paths=self._auth_options.unauthenticated_paths,
        )
        # Store auth_source for logging during startup
        app.state.auth_source = self._auth_options.source
        app.state.auth_key_count = len(self._auth_options.api_keys)

    if self._health_options:
        health_router = HealthRouter.create(
            prefix=self._health_options.prefix,
            tags=self._health_options.tags,
            checks=self._health_options.checks,
        )
        app.include_router(health_router)

    if self._system_options:
        system_router = SystemRouter.create(
            prefix=self._system_options.prefix,
            tags=self._system_options.tags,
        )
        app.include_router(system_router)

    if self._job_options:
        job_router = JobRouter.create(
            prefix=self._job_options.prefix,
            tags=self._job_options.tags,
            scheduler_factory=get_scheduler,
        )
        app.include_router(job_router)

    if self._monitoring_options:
        from .monitoring import setup_monitoring

        metric_reader = setup_monitoring(
            app,
            service_name=self._monitoring_options.service_name,
            enable_traces=self._monitoring_options.enable_traces,
        )
        metrics_router = MetricsRouter.create(
            prefix=self._monitoring_options.prefix,
            tags=self._monitoring_options.tags,
            metric_reader=metric_reader,
        )
        app.include_router(metrics_router)

    # Extension point for module-specific routers
    self._register_module_routers(app)

    for router in self._custom_routers:
        app.include_router(router)

    # Install route endpoints BEFORE mounting apps (routes take precedence over mounts)
    self._install_info_endpoint(app, info=self.info)

    # Mount apps AFTER all routes (apps act as catch-all for unmatched paths)
    if self._app_configs:
        from fastapi.staticfiles import StaticFiles

        # Collect all router prefixes to exclude from redirect middleware
        # This ensures routes take precedence over app mounts
        router_prefixes = set()
        if self._health_options:
            router_prefixes.add(self._health_options.prefix)
        if self._system_options:
            router_prefixes.add(self._system_options.prefix)
        if self._job_options:
            router_prefixes.add(self._job_options.prefix)
        if self._monitoring_options:
            router_prefixes.add(self._monitoring_options.prefix)
        for router in self._custom_routers:
            if hasattr(router, "prefix") and router.prefix:
                router_prefixes.add(router.prefix)

        # Add middleware to handle trailing slash redirects for app prefixes
        # Skip prefixes that are already claimed by routes (routes take precedence)
        from .middleware import AppPrefixRedirectMiddleware

        app_prefixes = [
            cfg.prefix for cfg in self._app_configs if cfg.prefix != "/" and cfg.prefix not in router_prefixes
        ]
        if app_prefixes:
            app.add_middleware(AppPrefixRedirectMiddleware, app_prefixes=app_prefixes)

        # Mount all apps
        for app_config in self._app_configs:
            static_files = StaticFiles(directory=str(app_config.directory), html=True)
            app.mount(app_config.prefix, static_files, name=f"app_{app_config.manifest.name}")
            logger.info(
                "app.mounted",
                name=app_config.manifest.name,
                prefix=app_config.prefix,
                directory=str(app_config.directory),
                is_package=app_config.is_package,
            )

    # Initialize app manager for metadata queries (always, even if no apps)
    from .app import AppManager
    from .dependencies import set_app_manager

    app_manager = AppManager(self._app_configs)
    set_app_manager(app_manager)

    for dependency, override in self._dependency_overrides.items():
        app.dependency_overrides[dependency] = override

    return app

create(*, info, **kwargs) classmethod

Create and build a FastAPI application in one call.

Source code in src/servicekit/api/service_builder.py
@classmethod
def create(cls, *, info: ServiceInfo, **kwargs: Any) -> FastAPI:
    """Create and build a FastAPI application in one call."""
    # Convenience wrapper: construct the builder, then immediately build.
    builder = cls(info=info, **kwargs)
    return builder.build()

ServiceInfo

ServiceInfo

Bases: BaseModel

Service metadata for FastAPI application.

Source code in src/servicekit/api/service_builder.py
class ServiceInfo(BaseModel):
    """Service metadata for FastAPI application."""

    # Human-readable service name. NOTE(review): presumably used as the
    # OpenAPI title by the service builder -- confirm against ServiceBuilder.
    display_name: str
    # Service version string; defaults to an initial release marker.
    version: str = "1.0.0"
    # Optional short summary shown in API docs.
    summary: str | None = None
    # Optional long-form description shown in API docs.
    description: str | None = None
    # Optional contact details (e.g. name/url/email keys).
    contact: dict[str, str] | None = None
    # Optional license details (e.g. name/url keys).
    license_info: dict[str, str] | None = None

    # Reject unknown fields so typos in service metadata fail loudly.
    model_config = ConfigDict(extra="forbid")

Routers

Base router classes and generic routers.

Router

Router

Bases: ABC

Base class for FastAPI routers.

Source code in src/servicekit/api/router.py
class Router(ABC):
    """Base class for FastAPI routers.

    Subclasses implement ``_register_routes`` to attach their endpoints to
    the wrapped ``APIRouter``, which is created during ``__init__``.
    """

    # Subclasses may set this to True to drop None fields from responses.
    default_response_model_exclude_none: bool = False

    def __init__(self, prefix: str, tags: Sequence[str], **kwargs: Any) -> None:
        """Initialize router with prefix and tags."""
        self.router = APIRouter(prefix=prefix, tags=[*tags], **kwargs)
        self._register_routes()

    @abstractmethod
    def _register_routes(self) -> None:
        """Register routes for this router."""
        ...

    @classmethod
    def create(cls, prefix: str, tags: Sequence[str], **kwargs: Any) -> APIRouter:
        """Create a router instance and return the FastAPI router."""
        instance = cls(prefix=prefix, tags=tags, **kwargs)
        return instance.router

Functions

__init__(prefix, tags, **kwargs)

Initialize router with prefix and tags.

Source code in src/servicekit/api/router.py
def __init__(self, prefix: str, tags: Sequence[str], **kwargs: Any) -> None:
    """Initialize router with prefix and tags."""
    # Materialize the tags sequence into a list for APIRouter, then let the
    # subclass hook attach its endpoints.
    self.router = APIRouter(prefix=prefix, tags=[*tags], **kwargs)
    self._register_routes()

create(prefix, tags, **kwargs) classmethod

Create a router instance and return the FastAPI router.

Source code in src/servicekit/api/router.py
@classmethod
def create(cls, prefix: str, tags: Sequence[str], **kwargs: Any) -> APIRouter:
    """Create a router instance and return the FastAPI router."""
    # Build the full Router subclass, but hand back only the APIRouter
    # so it can be included directly into an application.
    instance = cls(prefix=prefix, tags=tags, **kwargs)
    return instance.router

CrudRouter

CrudRouter

Bases: Router

Router base class for standard REST CRUD operations.

Source code in src/servicekit/api/crud.py
class CrudRouter[InSchemaT: BaseModel, OutSchemaT: BaseModel](Router):
    """Router base class for standard REST CRUD operations."""

    def __init__(
        self,
        prefix: str,
        tags: list[str],
        entity_in_type: type[InSchemaT],
        entity_out_type: type[OutSchemaT],
        manager_factory: ManagerFactory[InSchemaT, OutSchemaT],
        *,
        permissions: CrudPermissions | None = None,
        **kwargs: Any,
    ) -> None:
        """Initialize CRUD router with entity types and manager factory."""
        self.manager_factory = manager_factory
        self.entity_in_type = entity_in_type
        self.entity_out_type = entity_out_type
        self._permissions = permissions or CrudPermissions()
        super().__init__(prefix=prefix, tags=tags, **kwargs)

    def _register_routes(self) -> None:
        """Register CRUD routes based on permissions."""
        manager_dependency, manager_annotation = self._manager_dependency()
        perms = self._permissions
        if perms.create:
            self._register_create_route(manager_dependency, manager_annotation)
        if perms.read:
            self._register_find_all_route(manager_dependency, manager_annotation)
            self._register_find_by_id_route(manager_dependency, manager_annotation)
            self._register_schema_route()
        if perms.update:
            self._register_update_route(manager_dependency, manager_annotation)
        if perms.delete:
            self._register_delete_route(manager_dependency, manager_annotation)

    def register_entity_operation(
        self,
        name: str,
        handler: Callable[..., Any],
        *,
        http_method: str = "GET",
        response_model: type[Any] | None = None,
        status_code: int | None = None,
        summary: str | None = None,
        description: str | None = None,
    ) -> None:
        """Register a custom entity operation with $ prefix.

        Entity operations are automatically inserted before generic {entity_id} routes
        to ensure proper route matching (e.g., /{entity_id}/$validate should match
        before /{entity_id}).
        """
        route = f"/{{entity_id}}/${name}"
        route_kwargs: dict[str, Any] = {}

        if response_model is not None:
            route_kwargs["response_model"] = response_model
        if status_code is not None:
            route_kwargs["status_code"] = status_code
        if summary is not None:
            route_kwargs["summary"] = summary
        if description is not None:
            route_kwargs["description"] = description

        # Register the route with the appropriate HTTP method
        http_method_lower = http_method.lower()
        if http_method_lower == "get":
            self.router.get(route, **route_kwargs)(handler)
        elif http_method_lower == "post":
            self.router.post(route, **route_kwargs)(handler)
        elif http_method_lower == "put":
            self.router.put(route, **route_kwargs)(handler)
        elif http_method_lower == "patch":
            self.router.patch(route, **route_kwargs)(handler)
        elif http_method_lower == "delete":
            self.router.delete(route, **route_kwargs)(handler)
        else:
            raise ValueError(f"Unsupported HTTP method: {http_method}")

        # Move the just-added route to before generic parametric routes
        # Entity operations like /{entity_id}/$validate should match before /{entity_id}
        if len(self.router.routes) > 1:
            new_route = self.router.routes.pop()
            insert_index = self._find_generic_parametric_route_index()
            self.router.routes.insert(insert_index, new_route)

    def register_collection_operation(
        self,
        name: str,
        handler: Callable[..., Any],
        *,
        http_method: str = "GET",
        response_model: type[Any] | None = None,
        status_code: int | None = None,
        summary: str | None = None,
        description: str | None = None,
    ) -> None:
        """Register a custom collection operation with $ prefix.

        Collection operations are automatically inserted before parametric {entity_id} routes
        to ensure proper route matching (e.g., /$stats should match before /{entity_id}).
        """
        route = f"/${name}"
        route_kwargs: dict[str, Any] = {}

        if response_model is not None:
            route_kwargs["response_model"] = response_model
        if status_code is not None:
            route_kwargs["status_code"] = status_code
        if summary is not None:
            route_kwargs["summary"] = summary
        if description is not None:
            route_kwargs["description"] = description

        # Register the route with the appropriate HTTP method
        http_method_lower = http_method.lower()
        if http_method_lower == "get":
            self.router.get(route, **route_kwargs)(handler)
        elif http_method_lower == "post":
            self.router.post(route, **route_kwargs)(handler)
        elif http_method_lower == "put":
            self.router.put(route, **route_kwargs)(handler)
        elif http_method_lower == "patch":
            self.router.patch(route, **route_kwargs)(handler)
        elif http_method_lower == "delete":
            self.router.delete(route, **route_kwargs)(handler)
        else:
            raise ValueError(f"Unsupported HTTP method: {http_method}")

        # Move the just-added route to before parametric routes
        # FastAPI appends to routes list, so the last route is the one we just added
        if len(self.router.routes) > 1:
            new_route = self.router.routes.pop()  # Remove the route we just added
            # Find the first parametric route and insert before it
            insert_index = self._find_parametric_route_index()
            self.router.routes.insert(insert_index, new_route)

    # Route registration helpers --------------------------------------

    def _register_create_route(self, manager_dependency: Any, manager_annotation: Any) -> None:
        entity_in_annotation: Any = self.entity_in_type
        entity_out_annotation: Any = self.entity_out_type
        router_prefix = self.router.prefix

        @self.router.post("", status_code=status.HTTP_201_CREATED, response_model=entity_out_annotation)
        async def create(
            entity_in: InSchemaT,
            request: Request,
            response: Response,
            manager: Manager[InSchemaT, OutSchemaT, ULID] = manager_dependency,
        ) -> OutSchemaT:
            from .utilities import build_location_url

            created_entity = await manager.save(entity_in)
            entity_id = getattr(created_entity, "id")
            response.headers["Location"] = build_location_url(request, f"{router_prefix}/{entity_id}")
            return created_entity

        self._annotate_manager(create, manager_annotation)
        create.__annotations__["entity_in"] = entity_in_annotation
        create.__annotations__["return"] = entity_out_annotation

    def _register_find_all_route(self, manager_dependency: Any, manager_annotation: Any) -> None:
        entity_out_annotation: Any = self.entity_out_type
        collection_response_model: Any = list[entity_out_annotation] | PaginatedResponse[entity_out_annotation]

        @self.router.get("", response_model=collection_response_model)
        async def find_all(
            page: int | None = None,
            size: int | None = None,
            manager: Manager[InSchemaT, OutSchemaT, ULID] = manager_dependency,
        ) -> list[OutSchemaT] | PaginatedResponse[OutSchemaT]:
            from .pagination import create_paginated_response

            # Pagination is opt-in: both page and size must be provided
            if page is not None and size is not None:
                items, total = await manager.find_paginated(page, size)
                return create_paginated_response(items, total, page, size)
            return await manager.find_all()

        self._annotate_manager(find_all, manager_annotation)
        find_all.__annotations__["return"] = list[entity_out_annotation] | PaginatedResponse[entity_out_annotation]

    def _register_find_by_id_route(self, manager_dependency: Any, manager_annotation: Any) -> None:
        entity_out_annotation: Any = self.entity_out_type
        router_prefix = self.router.prefix

        @self.router.get("/{entity_id}", response_model=entity_out_annotation)
        async def find_by_id(
            entity_id: str,
            manager: Manager[InSchemaT, OutSchemaT, ULID] = manager_dependency,
        ) -> OutSchemaT:
            from servicekit.exceptions import NotFoundError

            ulid_id = self._parse_ulid(entity_id)
            entity = await manager.find_by_id(ulid_id)
            if entity is None:
                raise NotFoundError(
                    f"Entity with id {entity_id} not found",
                    instance=f"{router_prefix}/{entity_id}",
                )
            return entity

        self._annotate_manager(find_by_id, manager_annotation)
        find_by_id.__annotations__["return"] = entity_out_annotation

    def _register_update_route(self, manager_dependency: Any, manager_annotation: Any) -> None:
        entity_in_type = self.entity_in_type
        entity_in_annotation: Any = entity_in_type
        entity_out_annotation: Any = self.entity_out_type
        router_prefix = self.router.prefix

        @self.router.put("/{entity_id}", response_model=entity_out_annotation)
        async def update(
            entity_id: str,
            entity_in: InSchemaT,
            manager: Manager[InSchemaT, OutSchemaT, ULID] = manager_dependency,
        ) -> OutSchemaT:
            from servicekit.exceptions import NotFoundError

            ulid_id = self._parse_ulid(entity_id)
            if not await manager.exists_by_id(ulid_id):
                raise NotFoundError(
                    f"Entity with id {entity_id} not found",
                    instance=f"{router_prefix}/{entity_id}",
                )
            entity_dict = entity_in.model_dump(exclude_unset=True)
            entity_dict["id"] = ulid_id
            entity_with_id = entity_in_type.model_validate(entity_dict)
            return await manager.save(entity_with_id)

        self._annotate_manager(update, manager_annotation)
        update.__annotations__["entity_in"] = entity_in_annotation
        update.__annotations__["return"] = entity_out_annotation

    def _register_delete_route(self, manager_dependency: Any, manager_annotation: Any) -> None:
        router_prefix = self.router.prefix

        @self.router.delete("/{entity_id}", status_code=status.HTTP_204_NO_CONTENT)
        async def delete_by_id(
            entity_id: str,
            manager: Manager[InSchemaT, OutSchemaT, ULID] = manager_dependency,
        ) -> None:
            from servicekit.exceptions import NotFoundError

            ulid_id = self._parse_ulid(entity_id)
            if not await manager.exists_by_id(ulid_id):
                raise NotFoundError(
                    f"Entity with id {entity_id} not found",
                    instance=f"{router_prefix}/{entity_id}",
                )
            await manager.delete_by_id(ulid_id)

        self._annotate_manager(delete_by_id, manager_annotation)

    def _register_schema_route(self) -> None:
        """Register JSON schema endpoint for the entity output type."""
        entity_out_type = self.entity_out_type

        async def get_schema() -> dict[str, Any]:
            return entity_out_type.model_json_schema()

        self.register_collection_operation(
            name="schema",
            handler=get_schema,
            http_method="GET",
            response_model=dict[str, Any],
        )

    # Helper utilities -------------------------------------------------

    def _manager_dependency(self) -> tuple[Any, Any]:
        manager_dependency = Depends(self.manager_factory)
        manager_annotation: Any = Manager[Any, Any, ULID]
        return manager_dependency, manager_annotation

    def _annotate_manager(self, endpoint: Any, manager_annotation: Any) -> None:
        endpoint.__annotations__["manager"] = manager_annotation

    def _parse_ulid(self, entity_id: str) -> ULID:
        from servicekit.exceptions import InvalidULIDError

        try:
            return ULID.from_str(entity_id)
        except ValueError as e:
            raise InvalidULIDError(
                f"Invalid ULID format: {entity_id}",
                instance=f"{self.router.prefix}/{entity_id}",
            ) from e

    def _find_parametric_route_index(self) -> int:
        """Find the index of the first parametric route containing {entity_id}.

        Returns the index where collection operations should be inserted to ensure
        they're matched before parametric routes.
        """
        for i, route in enumerate(self.router.routes):
            route_path = getattr(route, "path", "")
            if "{entity_id}" in route_path:
                return i
        # If no parametric route found, append at the end
        return len(self.router.routes)

    def _find_generic_parametric_route_index(self) -> int:
        """Find the index of the first generic parametric route (/{entity_id} without $).

        Returns the index where entity operations should be inserted to ensure
        they're matched before generic routes like GET/PUT/DELETE /{entity_id}.
        """
        for i, route in enumerate(self.router.routes):
            route_path = getattr(route, "path", "")
            # Match routes like /{entity_id} but not /{entity_id}/$operation
            if "{entity_id}" in route_path and "/$" not in route_path:
                return i
        # If no generic parametric route found, append at the end
        return len(self.router.routes)

Functions

__init__(prefix, tags, entity_in_type, entity_out_type, manager_factory, *, permissions=None, **kwargs)

Initialize CRUD router with entity types and manager factory.

Source code in src/servicekit/api/crud.py
def __init__(
    self,
    prefix: str,
    tags: list[str],
    entity_in_type: type[InSchemaT],
    entity_out_type: type[OutSchemaT],
    manager_factory: ManagerFactory[InSchemaT, OutSchemaT],
    *,
    permissions: CrudPermissions | None = None,
    **kwargs: Any,
) -> None:
    """Initialize CRUD router with entity types and manager factory."""
    self.manager_factory = manager_factory
    self.entity_in_type = entity_in_type
    self.entity_out_type = entity_out_type
    # Default to every CRUD operation enabled when no permissions are given.
    self._permissions = permissions or CrudPermissions()
    # Must run last: the base Router.__init__ calls _register_routes(),
    # which reads the attributes assigned above.
    super().__init__(prefix=prefix, tags=tags, **kwargs)

register_entity_operation(name, handler, *, http_method='GET', response_model=None, status_code=None, summary=None, description=None)

Register a custom entity operation with $ prefix.

Entity operations are automatically inserted before generic {entity_id} routes to ensure proper route matching (e.g., /{entity_id}/$validate should match before /{entity_id}).

Source code in src/servicekit/api/crud.py
def register_entity_operation(
    self,
    name: str,
    handler: Callable[..., Any],
    *,
    http_method: str = "GET",
    response_model: type[Any] | None = None,
    status_code: int | None = None,
    summary: str | None = None,
    description: str | None = None,
) -> None:
    """Register a custom entity operation with $ prefix.

    Entity operations are automatically inserted before generic {entity_id} routes
    to ensure proper route matching (e.g., /{entity_id}/$validate should match
    before /{entity_id}).
    """
    path = f"/{{entity_id}}/${name}"
    # Collect only the optional route kwargs that were actually provided.
    extra: dict[str, Any] = {
        key: value
        for key, value in (
            ("response_model", response_model),
            ("status_code", status_code),
            ("summary", summary),
            ("description", description),
        )
        if value is not None
    }

    # Dispatch to the matching APIRouter decorator for the HTTP verb.
    verb = http_method.lower()
    if verb not in ("get", "post", "put", "patch", "delete"):
        raise ValueError(f"Unsupported HTTP method: {http_method}")
    getattr(self.router, verb)(path, **extra)(handler)

    # FastAPI appends the new route last; reposition it ahead of generic
    # /{entity_id} routes so the $ operation is matched first.
    if len(self.router.routes) > 1:
        added = self.router.routes.pop()
        self.router.routes.insert(self._find_generic_parametric_route_index(), added)

register_collection_operation(name, handler, *, http_method='GET', response_model=None, status_code=None, summary=None, description=None)

Register a custom collection operation with $ prefix.

Collection operations are automatically inserted before parametric {entity_id} routes to ensure proper route matching (e.g., /$stats should match before /{entity_id}).

Source code in src/servicekit/api/crud.py
def register_collection_operation(
    self,
    name: str,
    handler: Callable[..., Any],
    *,
    http_method: str = "GET",
    response_model: type[Any] | None = None,
    status_code: int | None = None,
    summary: str | None = None,
    description: str | None = None,
) -> None:
    """Register a custom collection operation with $ prefix.

    Collection operations are automatically inserted before parametric {entity_id} routes
    to ensure proper route matching (e.g., /$stats should match before /{entity_id}).
    """
    path = f"/${name}"
    # Collect only the optional route kwargs that were actually provided.
    extra: dict[str, Any] = {
        key: value
        for key, value in (
            ("response_model", response_model),
            ("status_code", status_code),
            ("summary", summary),
            ("description", description),
        )
        if value is not None
    }

    # Dispatch to the matching APIRouter decorator for the HTTP verb.
    verb = http_method.lower()
    if verb not in ("get", "post", "put", "patch", "delete"):
        raise ValueError(f"Unsupported HTTP method: {http_method}")
    getattr(self.router, verb)(path, **extra)(handler)

    # FastAPI appends the new route last; reposition it ahead of the first
    # parametric /{entity_id} route so the collection operation wins.
    if len(self.router.routes) > 1:
        added = self.router.routes.pop()
        self.router.routes.insert(self._find_parametric_route_index(), added)

CrudPermissions

CrudPermissions dataclass

Permissions configuration for CRUD operations.

Source code in src/servicekit/api/crud.py
@dataclass(slots=True)
class CrudPermissions:
    """Permissions configuration for CRUD operations.

    Each flag enables the corresponding routes in CrudRouter._register_routes;
    all operations are enabled by default. Field order is part of the
    positional-construction interface, so do not reorder.
    """

    create: bool = True  # enables the POST create route
    read: bool = True  # enables find-all, find-by-id, and schema routes
    update: bool = True  # enables the PUT update route
    delete: bool = True  # enables the DELETE route

HealthRouter

HealthRouter

Bases: Router

Health check router for service health monitoring.

Source code in src/servicekit/api/routers/health.py
class HealthRouter(Router):
    """Health check router for service health monitoring.

    Aggregates the configured named async health checks into one overall
    status, exposed both as a one-shot endpoint and as an SSE stream.
    """

    default_response_model_exclude_none = True

    def __init__(
        self,
        prefix: str,
        tags: list[str],
        checks: dict[str, HealthCheck] | None = None,
        **kwargs: object,
    ) -> None:
        """Initialize health router with optional health checks."""
        self.checks = checks or {}
        super().__init__(prefix=prefix, tags=tags, **kwargs)

    def _register_routes(self) -> None:
        """Register health check endpoint."""
        checks = self.checks

        async def run_health_checks() -> HealthStatus:
            """Run all health checks and aggregate results."""
            if not checks:
                return HealthStatus(status=HealthState.HEALTHY)

            results: dict[str, CheckResult] = {}
            aggregate = HealthState.HEALTHY

            for name, check_fn in checks.items():
                try:
                    state, message = await check_fn()
                except Exception as e:
                    # A crashing check is reported as unhealthy rather than
                    # failing the whole endpoint.
                    results[name] = CheckResult(state=HealthState.UNHEALTHY, message=f"Check failed: {str(e)}")
                    aggregate = HealthState.UNHEALTHY
                    continue

                results[name] = CheckResult(state=state, message=message)
                if state == HealthState.UNHEALTHY:
                    aggregate = HealthState.UNHEALTHY
                elif state == HealthState.DEGRADED and aggregate == HealthState.HEALTHY:
                    # DEGRADED never downgrades an already-UNHEALTHY aggregate.
                    aggregate = HealthState.DEGRADED

            return HealthStatus(status=aggregate, checks=results)

        @self.router.get(
            "",
            summary="Health check",
            response_model=HealthStatus,
            response_model_exclude_none=self.default_response_model_exclude_none,
        )
        async def health_check() -> HealthStatus:
            return await run_health_checks()

        @self.router.get(
            "/$stream",
            summary="Stream health status updates via SSE",
            description="Real-time Server-Sent Events stream of health status at regular intervals",
        )
        async def stream_health_status(poll_interval: float = 1.0) -> StreamingResponse:
            """Stream real-time health status updates using Server-Sent Events."""

            async def event_stream() -> AsyncGenerator[bytes, None]:
                # Re-run the checks on every tick until the client disconnects.
                while True:
                    current = await run_health_checks()
                    yield format_sse_model_event(current, exclude_none=self.default_response_model_exclude_none)
                    await asyncio.sleep(poll_interval)

            return StreamingResponse(
                event_stream(),
                media_type="text/event-stream",
                headers=SSE_HEADERS,
            )

Functions

__init__(prefix, tags, checks=None, **kwargs)

Initialize health router with optional health checks.

Source code in src/servicekit/api/routers/health.py
def __init__(
    self,
    prefix: str,
    tags: list[str],
    checks: dict[str, HealthCheck] | None = None,
    **kwargs: object,
) -> None:
    """Initialize health router with optional health checks.

    Args:
        prefix: URL prefix for the router.
        tags: OpenAPI tags applied to the endpoints.
        checks: Mapping of check name to async health-check callable.
    """
    # ``checks or {}`` also replaces an explicitly passed *empty* dict with a
    # fresh one, so later caller-side mutation of that dict has no effect.
    self.checks = checks or {}
    # Must run last: Router.__init__ triggers _register_routes(), which
    # reads self.checks.
    super().__init__(prefix=prefix, tags=tags, **kwargs)

JobRouter

JobRouter

Bases: Router

REST API router for job scheduler operations.

Source code in src/servicekit/api/routers/job.py
class JobRouter(Router):
    """REST API router for job scheduler operations.

    Exposes listing, schema, lookup, deletion, and SSE streaming of job
    records backed by a JobScheduler obtained from ``scheduler_factory``.
    """

    def __init__(
        self,
        prefix: str,
        tags: list[str],
        scheduler_factory: Callable[[], JobScheduler],
        **kwargs: object,
    ) -> None:
        """Initialize job router with scheduler factory."""
        self.scheduler_factory = scheduler_factory
        # Must run last: Router.__init__ calls _register_routes(), which
        # reads self.scheduler_factory.
        super().__init__(prefix=prefix, tags=tags, **kwargs)

    def _register_routes(self) -> None:
        """Register job management endpoints."""
        # The factory is resolved per request via FastAPI dependency injection.
        scheduler_dependency = Depends(self.scheduler_factory)

        @self.router.get("", summary="List all jobs", response_model=list[JobRecord])
        async def get_jobs(
            scheduler: JobScheduler = scheduler_dependency,
            status_filter: JobStatus | None = None,
        ) -> list[JobRecord]:
            # Status filtering is done in-process after fetching all records.
            jobs = await scheduler.get_all_records()
            if status_filter:
                return [job for job in jobs if job.status == status_filter]
            return jobs

        @self.router.get("/$schema", summary="Get jobs list schema", response_model=dict[str, Any])
        async def get_jobs_schema() -> dict[str, Any]:
            """Get JSON schema for jobs list response."""
            return TypeAdapter(list[JobRecord]).json_schema()

        @self.router.get("/{job_id}", summary="Get job by ID", response_model=JobRecord)
        async def get_job(
            job_id: str,
            scheduler: JobScheduler = scheduler_dependency,
        ) -> JobRecord:
            # ValueError covers a malformed ULID; KeyError covers an unknown
            # job id. Both surface to clients as 404.
            try:
                ulid_id = ULID.from_str(job_id)
                return await scheduler.get_record(ulid_id)
            except (ValueError, KeyError):
                raise HTTPException(status_code=404, detail="Job not found")

        @self.router.delete("/{job_id}", summary="Cancel and delete job", status_code=status.HTTP_204_NO_CONTENT)
        async def delete_job(
            job_id: str,
            scheduler: JobScheduler = scheduler_dependency,
        ) -> Response:
            try:
                ulid_id = ULID.from_str(job_id)
                await scheduler.delete(ulid_id)
                return Response(status_code=status.HTTP_204_NO_CONTENT)
            except (ValueError, KeyError):
                raise HTTPException(status_code=404, detail="Job not found")

        @self.router.get(
            "/{job_id}/$stream",
            summary="Stream job status updates via SSE",
            description="Real-time Server-Sent Events stream of job status changes until terminal state",
        )
        async def stream_job_status(
            job_id: str,
            scheduler: JobScheduler = scheduler_dependency,
            poll_interval: float = 0.5,
        ) -> StreamingResponse:
            """Stream real-time job status updates using Server-Sent Events."""
            # Validate job_id format
            try:
                ulid_id = ULID.from_str(job_id)
            except ValueError:
                raise HTTPException(status_code=400, detail="Invalid job ID format")

            # Check job exists before starting stream
            try:
                await scheduler.get_record(ulid_id)
            except KeyError:
                raise HTTPException(status_code=404, detail="Job not found")

            # SSE event generator
            async def event_stream() -> AsyncGenerator[bytes, None]:
                # NOTE(review): plain strings compared against record.status
                # below -- assumes JobStatus is a str-valued enum; confirm
                # against the JobStatus definition.
                terminal_states = {"completed", "failed", "canceled"}

                while True:
                    try:
                        record = await scheduler.get_record(ulid_id)
                        # Format as SSE event
                        yield format_sse_model_event(record)

                        # Stop streaming if job reached terminal state
                        if record.status in terminal_states:
                            break

                    except KeyError:
                        # Job was deleted - send final event and close
                        yield b'data: {"status": "deleted"}\n\n'
                        break

                    await asyncio.sleep(poll_interval)

            return StreamingResponse(
                event_stream(),
                media_type="text/event-stream",
                headers=SSE_HEADERS,
            )

Functions

__init__(prefix, tags, scheduler_factory, **kwargs)

Initialize job router with scheduler factory.

Source code in src/servicekit/api/routers/job.py
def __init__(
    self,
    prefix: str,
    tags: list[str],
    scheduler_factory: Callable[[], JobScheduler],
    **kwargs: object,
) -> None:
    """Initialize job router with scheduler factory.

    Args:
        prefix: URL prefix for the router.
        tags: OpenAPI tags applied to the endpoints.
        scheduler_factory: Zero-argument callable returning a JobScheduler;
            used as a FastAPI dependency when registering routes.
    """
    self.scheduler_factory = scheduler_factory
    # Must run last: Router.__init__ calls _register_routes(), which reads
    # self.scheduler_factory.
    super().__init__(prefix=prefix, tags=tags, **kwargs)

SystemRouter

SystemRouter

Bases: Router

System information router.

Source code in src/servicekit/api/routers/system.py
class SystemRouter(Router):
    """System information router.

    Publishes host/runtime details and metadata about installed apps.
    """

    def _register_routes(self) -> None:
        """Register system info endpoint."""

        @self.router.get(
            "",
            summary="System information",
            response_model=SystemInfo,
        )
        async def get_system_info() -> SystemInfo:
            # Gather each datum into a named intermediate for readability.
            now_utc = datetime.now(timezone.utc)
            local_tz = str(datetime.now().astimezone().tzinfo)
            vi = sys.version_info
            return SystemInfo(
                current_time=now_utc,
                timezone=local_tz,
                python_version=f"{vi.major}.{vi.minor}.{vi.micro}",
                platform=platform.platform(),
                hostname=platform.node(),
            )

        @self.router.get(
            "/apps",
            summary="List installed apps",
            response_model=list[AppInfo],
        )
        async def list_apps(
            app_manager: Annotated[AppManager, Depends(get_app_manager)],
        ) -> list[AppInfo]:
            """List all installed apps with their metadata."""
            infos: list[AppInfo] = []
            for installed in app_manager.list():
                manifest = installed.manifest
                infos.append(
                    AppInfo(
                        name=manifest.name,
                        version=manifest.version,
                        prefix=installed.prefix,
                        description=manifest.description,
                        author=manifest.author,
                        entry=manifest.entry,
                        is_package=installed.is_package,
                    )
                )
            return infos

        @self.router.get(
            "/apps/$schema",
            summary="Get apps list schema",
            response_model=dict[str, Any],
        )
        async def get_apps_schema() -> dict[str, Any]:
            """Get JSON schema for apps list response."""
            return TypeAdapter(list[AppInfo]).json_schema()

MetricsRouter

MetricsRouter

Bases: Router

Metrics router for Prometheus metrics exposition.

Source code in src/servicekit/api/routers/metrics.py
class MetricsRouter(Router):
    """Metrics router for Prometheus metrics exposition.

    Serves the process-wide default Prometheus registry in the Prometheus
    text exposition format.
    """

    def __init__(
        self,
        prefix: str,
        tags: list[str],
        metric_reader: PrometheusMetricReader,
        **kwargs: object,
    ) -> None:
        """Initialize metrics router with Prometheus metric reader.

        Args:
            prefix: URL prefix for this router.
            tags: OpenAPI tags applied to the endpoint.
            metric_reader: Reader held for lifecycle ownership. NOTE(review):
                exposition below reads the global REGISTRY, not this reader —
                presumably the reader populates that registry; confirm.
            **kwargs: Extra keyword arguments forwarded to the base Router.
        """
        self.metric_reader = metric_reader
        super().__init__(prefix=prefix, tags=tags, **kwargs)

    def _register_routes(self) -> None:
        """Register Prometheus metrics endpoint."""
        # Hoisted out of the request handler: resolving the import once at
        # registration time avoids per-request import-machinery overhead.
        from prometheus_client import REGISTRY, generate_latest

        @self.router.get(
            "",
            summary="Prometheus metrics",
            response_class=Response,
        )
        async def get_metrics() -> Response:
            """Expose metrics in Prometheus text format."""
            return Response(
                content=generate_latest(REGISTRY),
                media_type="text/plain; version=0.0.4; charset=utf-8",
            )

Functions

__init__(prefix, tags, metric_reader, **kwargs)

Initialize metrics router with Prometheus metric reader.

Source code in src/servicekit/api/routers/metrics.py
def __init__(
    self,
    prefix: str,
    tags: list[str],
    metric_reader: PrometheusMetricReader,
    **kwargs: object,
) -> None:
    """Initialize metrics router with Prometheus metric reader.

    Args:
        prefix: URL prefix for this router.
        tags: OpenAPI tags applied to the endpoint.
        metric_reader: Reader kept on the instance for lifecycle ownership.
        **kwargs: Extra keyword arguments forwarded to the base Router.
    """
    # Store the reader before calling super(), since Router.__init__ triggers
    # route registration.
    self.metric_reader = metric_reader
    super().__init__(prefix=prefix, tags=tags, **kwargs)

App System

Static web application hosting system.

AppLoader

AppLoader

Loads and validates apps from filesystem or package resources.

Source code in src/servicekit/api/app.py
class AppLoader:
    """Loads and validates apps from filesystem or package resources.

    A source is either a filesystem path or a ``(package_name, subpath)``
    tuple resolved relative to an importable package's directory.
    """

    @staticmethod
    def load(path: str | Path | tuple[str, str], prefix: str | None = None) -> App:
        """Load and validate app from filesystem path or package resource tuple.

        Args:
            path: App directory, or a ``(package, subpath)`` resource tuple.
            prefix: Optional mount-prefix override; re-validated when given.

        Raises:
            FileNotFoundError: Missing directory, manifest.json, or entry file.
            NotADirectoryError: Path exists but is not a directory.
            ValueError: Invalid manifest JSON or invalid prefix.
        """
        # Detect source type and resolve to directory
        if isinstance(path, tuple):
            # Package resource
            dir_path, is_package = AppLoader._resolve_package_path(path)
        else:
            # Filesystem path
            dir_path = Path(path).resolve()
            is_package = False

            if not dir_path.exists():
                raise FileNotFoundError(f"App directory not found: {dir_path}")
            if not dir_path.is_dir():
                raise NotADirectoryError(f"App path is not a directory: {dir_path}")

        # Load and validate manifest
        manifest_path = dir_path / "manifest.json"
        if not manifest_path.exists():
            raise FileNotFoundError(f"manifest.json not found in: {dir_path}")

        try:
            with manifest_path.open() as f:
                manifest_data = json.load(f)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON in manifest.json: {e}") from e

        # Pydantic validation enforces manifest schema (prefix/entry rules).
        manifest = AppManifest(**manifest_data)

        # Validate entry file exists
        entry_path = dir_path / manifest.entry
        if not entry_path.exists():
            raise FileNotFoundError(f"Entry file '{manifest.entry}' not found in: {dir_path}")

        # Use override or manifest prefix
        final_prefix = prefix if prefix is not None else manifest.prefix

        # Re-validate prefix if overridden
        if prefix is not None:
            # Round-trip through AppManifest so the override passes the same
            # validate_prefix rules as a manifest-declared prefix.
            validated = AppManifest(
                name=manifest.name,
                version=manifest.version,
                prefix=final_prefix,
            )
            final_prefix = validated.prefix

        return App(
            manifest=manifest,
            directory=dir_path,
            prefix=final_prefix,
            is_package=is_package,
        )

    @staticmethod
    def discover(path: str | Path | tuple[str, str]) -> list[App]:
        """Discover all apps with manifest.json in directory.

        Invalid apps are skipped with a warning rather than failing the
        whole discovery pass.
        """
        # Resolve directory
        if isinstance(path, tuple):
            dir_path, _ = AppLoader._resolve_package_path(path)
        else:
            dir_path = Path(path).resolve()

            if not dir_path.exists():
                raise FileNotFoundError(f"Apps directory not found: {dir_path}")
            if not dir_path.is_dir():
                raise NotADirectoryError(f"Apps path is not a directory: {dir_path}")

        # Scan for subdirectories with manifest.json
        apps: list[App] = []
        for subdir in dir_path.iterdir():
            if subdir.is_dir() and (subdir / "manifest.json").exists():
                try:
                    # Determine if we're in a package context
                    if isinstance(path, tuple):
                        # Build tuple path for subdirectory
                        package_name: str = path[0]
                        base_path: str = path[1]
                        subdir_name = subdir.name
                        subpath = f"{base_path}/{subdir_name}" if base_path else subdir_name
                        app = AppLoader.load((package_name, subpath))
                    else:
                        app = AppLoader.load(subdir)
                    apps.append(app)
                except Exception as e:
                    # Log but don't fail discovery for invalid apps
                    logger.warning(
                        "app.discovery.failed",
                        directory=str(subdir),
                        error=str(e),
                    )

        return apps

    @staticmethod
    def _resolve_package_path(package_tuple: tuple[str, str]) -> tuple[Path, bool]:
        """Resolve package resource tuple to filesystem path.

        Returns the resolved directory and True (is_package flag).

        Raises:
            ValueError: Unknown package or subpath escaping the package dir.
            FileNotFoundError / NotADirectoryError: Subpath missing or not a dir.
        """
        package_name, subpath = package_tuple

        # Validate subpath for security
        # NOTE: intentionally conservative — rejects any '..' substring, even
        # in names like 'a..b', to block traversal before path resolution.
        if ".." in subpath:
            raise ValueError(f"subpath cannot contain '..' (got: {subpath})")
        if subpath.startswith("/"):
            raise ValueError(f"subpath must be relative (got: {subpath})")

        try:
            spec = importlib.util.find_spec(package_name)
        except (ModuleNotFoundError, ValueError) as e:
            raise ValueError(f"Package '{package_name}' could not be found") from e

        if spec is None or spec.origin is None:
            raise ValueError(f"Package '{package_name}' could not be found")

        # Resolve to package directory
        # NOTE(review): spec.origin is the package's __init__ file; assumes a
        # regular (non-namespace) package — confirm for namespace packages.
        package_dir = Path(spec.origin).parent
        app_dir = package_dir / subpath

        # Verify resolved path is still within package directory
        # (defense-in-depth: catches symlink or normalization escapes).
        try:
            app_dir.resolve().relative_to(package_dir.resolve())
        except ValueError as e:
            raise ValueError(f"App path '{subpath}' escapes package directory") from e

        if not app_dir.exists():
            raise FileNotFoundError(f"App path '{subpath}' not found in package '{package_name}'")
        if not app_dir.is_dir():
            raise NotADirectoryError(f"App path '{subpath}' in package '{package_name}' is not a directory")

        return app_dir, True

Functions

load(path, prefix=None) staticmethod

Load and validate app from filesystem path or package resource tuple.

Source code in src/servicekit/api/app.py
@staticmethod
def load(path: str | Path | tuple[str, str], prefix: str | None = None) -> App:
    """Load and validate app from filesystem path or package resource tuple.

    Args:
        path: App directory, or a ``(package, subpath)`` resource tuple.
        prefix: Optional mount-prefix override; re-validated when given.

    Raises:
        FileNotFoundError: Missing directory, manifest.json, or entry file.
        NotADirectoryError: Path exists but is not a directory.
        ValueError: Invalid manifest JSON or invalid prefix.
    """
    # Detect source type and resolve to directory
    if isinstance(path, tuple):
        # Package resource
        dir_path, is_package = AppLoader._resolve_package_path(path)
    else:
        # Filesystem path
        dir_path = Path(path).resolve()
        is_package = False

        if not dir_path.exists():
            raise FileNotFoundError(f"App directory not found: {dir_path}")
        if not dir_path.is_dir():
            raise NotADirectoryError(f"App path is not a directory: {dir_path}")

    # Load and validate manifest
    manifest_path = dir_path / "manifest.json"
    if not manifest_path.exists():
        raise FileNotFoundError(f"manifest.json not found in: {dir_path}")

    try:
        with manifest_path.open() as f:
            manifest_data = json.load(f)
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON in manifest.json: {e}") from e

    # Pydantic validation enforces the manifest schema (prefix/entry rules).
    manifest = AppManifest(**manifest_data)

    # Validate entry file exists
    entry_path = dir_path / manifest.entry
    if not entry_path.exists():
        raise FileNotFoundError(f"Entry file '{manifest.entry}' not found in: {dir_path}")

    # Use override or manifest prefix
    final_prefix = prefix if prefix is not None else manifest.prefix

    # Re-validate prefix if overridden
    if prefix is not None:
        # Round-trip through AppManifest so the override passes the same
        # validate_prefix rules as a manifest-declared prefix.
        validated = AppManifest(
            name=manifest.name,
            version=manifest.version,
            prefix=final_prefix,
        )
        final_prefix = validated.prefix

    return App(
        manifest=manifest,
        directory=dir_path,
        prefix=final_prefix,
        is_package=is_package,
    )

discover(path) staticmethod

Discover all apps with manifest.json in directory.

Source code in src/servicekit/api/app.py
@staticmethod
def discover(path: str | Path | tuple[str, str]) -> list[App]:
    """Discover all apps with manifest.json in directory.

    Scans the immediate subdirectories only (not recursive). Apps that
    fail to load are skipped with a warning instead of aborting discovery.
    """
    # Resolve directory
    if isinstance(path, tuple):
        dir_path, _ = AppLoader._resolve_package_path(path)
    else:
        dir_path = Path(path).resolve()

        if not dir_path.exists():
            raise FileNotFoundError(f"Apps directory not found: {dir_path}")
        if not dir_path.is_dir():
            raise NotADirectoryError(f"Apps path is not a directory: {dir_path}")

    # Scan for subdirectories with manifest.json
    apps: list[App] = []
    for subdir in dir_path.iterdir():
        if subdir.is_dir() and (subdir / "manifest.json").exists():
            try:
                # Determine if we're in a package context
                if isinstance(path, tuple):
                    # Build tuple path for subdirectory so load() re-applies
                    # package-path security checks to the child.
                    package_name: str = path[0]
                    base_path: str = path[1]
                    subdir_name = subdir.name
                    subpath = f"{base_path}/{subdir_name}" if base_path else subdir_name
                    app = AppLoader.load((package_name, subpath))
                else:
                    app = AppLoader.load(subdir)
                apps.append(app)
            except Exception as e:
                # Log but don't fail discovery for invalid apps
                logger.warning(
                    "app.discovery.failed",
                    directory=str(subdir),
                    error=str(e),
                )

    return apps

AppManifest

AppManifest

Bases: BaseModel

App manifest configuration.

Source code in src/servicekit/api/app.py
class AppManifest(BaseModel):
    """App manifest configuration.

    Parsed from an app directory's ``manifest.json``. Unknown keys are
    rejected (extra="forbid") so typos in manifests fail loudly.
    """

    model_config = ConfigDict(extra="forbid")

    name: str = Field(description="Human-readable app name")
    version: str = Field(description="Semantic version")
    prefix: str = Field(description="URL prefix for mounting")
    description: str | None = Field(default=None, description="App description")
    author: str | None = Field(default=None, description="Author name")
    entry: str = Field(default="index.html", description="Entry point filename")

    @field_validator("prefix")
    @classmethod
    def validate_prefix(cls, v: str) -> str:
        """Validate mount prefix format.

        Must be absolute, free of '..', and outside the reserved /api space.
        """
        if not v.startswith("/"):
            raise ValueError("prefix must start with '/'")
        if ".." in v:
            raise ValueError("prefix cannot contain '..'")
        # '/api' is reserved for the service's own routes.
        if v.startswith("/api/") or v == "/api":
            raise ValueError("prefix cannot be '/api' or start with '/api/'")
        return v

    @field_validator("entry")
    @classmethod
    def validate_entry(cls, v: str) -> str:
        """Validate entry file path for security.

        Intentionally conservative: any '..' substring is rejected, even in
        otherwise-legal names like 'a..b.html'.
        """
        if ".." in v:
            raise ValueError("entry cannot contain '..'")
        if v.startswith("/"):
            raise ValueError("entry must be a relative path")
        # Normalize and check for path traversal
        normalized = Path(v).as_posix()
        if normalized.startswith("../") or "/../" in normalized:
            raise ValueError("entry cannot contain path traversal")
        return v

Functions

validate_prefix(v) classmethod

Validate mount prefix format.

Source code in src/servicekit/api/app.py
@field_validator("prefix")
@classmethod
def validate_prefix(cls, v: str) -> str:
    """Validate mount prefix format.

    Must be absolute, free of '..', and outside the reserved /api space.
    """
    if not v.startswith("/"):
        raise ValueError("prefix must start with '/'")
    if ".." in v:
        raise ValueError("prefix cannot contain '..'")
    # '/api' is reserved for the service's own routes.
    if v.startswith("/api/") or v == "/api":
        raise ValueError("prefix cannot be '/api' or start with '/api/'")
    return v

validate_entry(v) classmethod

Validate entry file path for security.

Source code in src/servicekit/api/app.py
@field_validator("entry")
@classmethod
def validate_entry(cls, v: str) -> str:
    """Validate entry file path for security.

    Intentionally conservative: any '..' substring is rejected, even in
    otherwise-legal names like 'a..b.html'.
    """
    if ".." in v:
        raise ValueError("entry cannot contain '..'")
    if v.startswith("/"):
        raise ValueError("entry must be a relative path")
    # Normalize and check for path traversal
    normalized = Path(v).as_posix()
    if normalized.startswith("../") or "/../" in normalized:
        raise ValueError("entry cannot contain path traversal")
    return v

App

App dataclass

Represents a loaded app with manifest and directory.

Source code in src/servicekit/api/app.py
@dataclass
class App:
    """Represents a loaded app with manifest and directory."""

    # Validated manifest parsed from the app's manifest.json.
    manifest: AppManifest
    # Resolved directory containing the app's static files.
    directory: Path
    prefix: str  # May differ from manifest if overridden
    is_package: bool  # True if loaded from package resources

AppManager

AppManager

Lightweight manager for app metadata queries.

Source code in src/servicekit/api/app.py
class AppManager:
    """Lightweight manager for app metadata queries."""

    def __init__(self, apps: list[App]):
        """Initialize with loaded apps (stored by reference)."""
        self._apps = apps

    def list(self) -> list[App]:
        """Return all installed apps."""
        # NOTE: intentionally shadows the builtin name 'list' on the class;
        # kept for API compatibility.
        return self._apps

    def get(self, prefix: str) -> App | None:
        """Get app by mount prefix; None when nothing is mounted there."""
        for app in self._apps:
            if app.prefix == prefix:
                return app
        return None

Functions

__init__(apps)

Initialize with loaded apps.

Source code in src/servicekit/api/app.py
def __init__(self, apps: list[App]):
    """Initialize with loaded apps (stored by reference, not copied)."""
    self._apps = apps

list()

Return all installed apps.

Source code in src/servicekit/api/app.py
def list(self) -> list[App]:
    """Return all installed apps.

    Note: shadows the builtin name 'list'; kept for API compatibility.
    """
    return self._apps

get(prefix)

Get app by mount prefix.

Source code in src/servicekit/api/app.py
def get(self, prefix: str) -> App | None:
    """Get app by mount prefix; None when no app is mounted there."""
    for app in self._apps:
        if app.prefix == prefix:
            return app
    return None

AppInfo

AppInfo

Bases: BaseModel

App metadata for API responses.

Source code in src/servicekit/api/app.py
class AppInfo(BaseModel):
    """App metadata for API responses.

    Read-only projection of App/AppManifest fields returned by the
    /apps endpoint; unlike AppManifest, it performs no prefix/entry
    validation.
    """

    name: str = Field(description="Human-readable app name")
    version: str = Field(description="Semantic version")
    prefix: str = Field(description="URL prefix for mounting")
    description: str | None = Field(default=None, description="App description")
    author: str | None = Field(default=None, description="Author name")
    entry: str = Field(description="Entry point filename")
    is_package: bool = Field(description="Whether app is loaded from package resources")

Authentication

API key authentication middleware and utilities.

APIKeyMiddleware

APIKeyMiddleware

Bases: BaseHTTPMiddleware

Middleware for API key authentication via X-API-Key header.

Source code in src/servicekit/api/auth.py
class APIKeyMiddleware(BaseHTTPMiddleware):
    """Middleware for API key authentication via X-API-Key header.

    Requests whose path is in ``unauthenticated_paths`` pass through;
    all others must carry a valid key in the configured header, or they
    receive an RFC 9457 problem+json 401 response.
    """

    def __init__(
        self,
        app: Any,
        *,
        api_keys: Set[str],
        header_name: str = "X-API-Key",
        unauthenticated_paths: Set[str],
    ) -> None:
        """Initialize API key middleware.

        Args:
            app: ASGI application
            api_keys: Set of valid API keys
            header_name: HTTP header name for API key
            unauthenticated_paths: Paths that don't require authentication
        """
        super().__init__(app)
        self.api_keys = api_keys
        self.header_name = header_name
        self.unauthenticated_paths = unauthenticated_paths

    @staticmethod
    def _key_prefix(api_key: str) -> str:
        """Return a short log-safe prefix of the key (never the full key)."""
        return api_key[:7] if len(api_key) >= 7 else "***"

    def _unauthorized(self, request: Request, detail: str) -> JSONResponse:
        """Build an RFC 9457 problem+json 401 response."""
        problem = ProblemDetail(
            type="urn:servicekit:error:unauthorized",
            title="Unauthorized",
            status=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            instance=str(request.url.path),
        )
        return JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED,
            content=problem.model_dump(exclude_none=True),
            media_type="application/problem+json",
        )

    async def dispatch(self, request: Request, call_next: MiddlewareCallNext) -> Response:
        """Process request with API key authentication."""
        # Allow unauthenticated access to specific paths
        if request.url.path in self.unauthenticated_paths:
            return await call_next(request)

        # Extract API key from header
        api_key = request.headers.get(self.header_name)

        if not api_key:
            logger.warning(
                "auth.missing_key",
                path=request.url.path,
                method=request.method,
            )
            return self._unauthorized(request, f"Missing authentication header: {self.header_name}")

        # Validate API key.
        # NOTE(review): set membership is not a constant-time comparison;
        # acceptable for typical API-key setups, but consider hashed lookup
        # if timing side channels are a concern.
        if api_key not in self.api_keys:
            logger.warning(
                "auth.invalid_key",
                key_prefix=self._key_prefix(api_key),
                path=request.url.path,
                method=request.method,
            )
            return self._unauthorized(request, "Invalid API key")

        # Attach key prefix to request state for logging
        request.state.api_key_prefix = self._key_prefix(api_key)

        logger.info(
            "auth.success",
            key_prefix=request.state.api_key_prefix,
            path=request.url.path,
        )

        return await call_next(request)

Functions

__init__(app, *, api_keys, header_name='X-API-Key', unauthenticated_paths)

Initialize API key middleware.

Parameters:

Name Type Description Default
app Any

ASGI application

required
api_keys Set[str]

Set of valid API keys

required
header_name str

HTTP header name for API key

'X-API-Key'
unauthenticated_paths Set[str]

Paths that don't require authentication

required
Source code in src/servicekit/api/auth.py
def __init__(
    self,
    app: Any,
    *,
    api_keys: Set[str],
    header_name: str = "X-API-Key",
    unauthenticated_paths: Set[str],
) -> None:
    """Initialize API key middleware.

    Args:
        app: ASGI application
        api_keys: Set of valid API keys
        header_name: HTTP header name for API key
        unauthenticated_paths: Paths that don't require authentication
            (keyword-only and required — no default).
    """
    super().__init__(app)
    self.api_keys = api_keys
    self.header_name = header_name
    self.unauthenticated_paths = unauthenticated_paths

dispatch(request, call_next) async

Process request with API key authentication.

Source code in src/servicekit/api/auth.py
async def dispatch(self, request: Request, call_next: MiddlewareCallNext) -> Response:
    """Process request with API key authentication.

    Exempt paths pass straight through; otherwise a missing or invalid
    key yields an RFC 9457 problem+json 401 response.
    """
    # Allow unauthenticated access to specific paths
    if request.url.path in self.unauthenticated_paths:
        return await call_next(request)

    # Extract API key from header
    api_key = request.headers.get(self.header_name)

    if not api_key:
        logger.warning(
            "auth.missing_key",
            path=request.url.path,
            method=request.method,
        )
        problem = ProblemDetail(
            type="urn:servicekit:error:unauthorized",
            title="Unauthorized",
            status=status.HTTP_401_UNAUTHORIZED,
            detail=f"Missing authentication header: {self.header_name}",
            instance=str(request.url.path),
        )
        return JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED,
            content=problem.model_dump(exclude_none=True),
            media_type="application/problem+json",
        )

    # Validate API key
    # NOTE(review): set membership is not constant-time; consider hashed
    # lookup if timing side channels are a concern.
    if api_key not in self.api_keys:
        # Log only prefix for security
        key_prefix = api_key[:7] if len(api_key) >= 7 else "***"
        logger.warning(
            "auth.invalid_key",
            key_prefix=key_prefix,
            path=request.url.path,
            method=request.method,
        )
        problem = ProblemDetail(
            type="urn:servicekit:error:unauthorized",
            title="Unauthorized",
            status=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key",
            instance=str(request.url.path),
        )
        return JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED,
            content=problem.model_dump(exclude_none=True),
            media_type="application/problem+json",
        )

    # Attach key prefix to request state for logging
    request.state.api_key_prefix = api_key[:7] if len(api_key) >= 7 else "***"

    logger.info(
        "auth.success",
        key_prefix=request.state.api_key_prefix,
        path=request.url.path,
    )

    return await call_next(request)

Middleware

Error handling and logging middleware.

middleware

FastAPI middleware for error handling, CORS, and other cross-cutting concerns.

Classes

AppPrefixRedirectMiddleware

Bases: BaseHTTPMiddleware

Middleware to redirect app prefix requests without trailing slash to version with trailing slash.

Source code in src/servicekit/api/middleware.py
class AppPrefixRedirectMiddleware(BaseHTTPMiddleware):
    """Middleware to redirect app prefix requests without trailing slash to version with trailing slash."""

    def __init__(self, app: Any, app_prefixes: list[str]) -> None:
        """Initialize middleware with list of app prefixes to handle."""
        super().__init__(app)
        # Stored as a set for O(1) exact-path membership tests.
        self.app_prefixes = set(app_prefixes)

    async def dispatch(self, request: Request, call_next: MiddlewareCallNext) -> Response:
        """Redirect requests to app prefixes without trailing slash."""
        path = request.url.path
        safe_method = request.method in ("GET", "HEAD")

        # Anything that is not an exact prefix hit with a safe method
        # just flows through.
        if not (safe_method and path in self.app_prefixes):
            return await call_next(request)

        from fastapi.responses import RedirectResponse

        # 307 preserves the request method on redirect.
        target = request.url.replace(path=f"{path}/")
        return RedirectResponse(url=str(target), status_code=307)
Functions
__init__(app, app_prefixes)

Initialize middleware with list of app prefixes to handle.

Source code in src/servicekit/api/middleware.py
def __init__(self, app: Any, app_prefixes: list[str]) -> None:
    """Initialize middleware with list of app prefixes to handle."""
    super().__init__(app)
    # Set for O(1) exact-path membership tests in dispatch().
    self.app_prefixes = set(app_prefixes)
dispatch(request, call_next) async

Redirect requests to app prefixes without trailing slash.

Source code in src/servicekit/api/middleware.py
async def dispatch(self, request: Request, call_next: MiddlewareCallNext) -> Response:
    """Redirect requests to app prefixes without trailing slash."""
    path = request.url.path
    safe_method = request.method in ("GET", "HEAD")

    # Anything that is not an exact prefix hit with a safe method
    # just flows through.
    if not (safe_method and path in self.app_prefixes):
        return await call_next(request)

    from fastapi.responses import RedirectResponse

    # 307 preserves the request method on redirect.
    target = request.url.replace(path=f"{path}/")
    return RedirectResponse(url=str(target), status_code=307)

RequestLoggingMiddleware

Bases: BaseHTTPMiddleware

Middleware for logging HTTP requests with unique request IDs and context binding.

Source code in src/servicekit/api/middleware.py
class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """Middleware for logging HTTP requests with unique request IDs and context binding."""

    async def dispatch(self, request: Request, call_next: MiddlewareCallNext) -> Response:
        """Process request with logging and context binding."""
        req_id = str(ULID())
        started = time.perf_counter()

        # Bind request-scoped fields into the logging context.
        client = request.client
        add_request_context(
            request_id=req_id,
            method=request.method,
            path=request.url.path,
            client_host=client.host if client else None,
        )

        # Expose the id on request.state so endpoints can reference it.
        request.state.request_id = req_id

        query = request.url.query
        logger.info(
            "http.request.start",
            query_params=str(query) if query else None,
        )

        try:
            response = await call_next(request)
        except Exception as exc:
            elapsed_ms = (time.perf_counter() - started) * 1000
            logger.error(
                "http.request.error",
                duration_ms=round(elapsed_ms, 2),
                error=str(exc),
                exc_info=True,
            )
            raise
        else:
            elapsed_ms = (time.perf_counter() - started) * 1000
            logger.info(
                "http.request.complete",
                status_code=response.status_code,
                duration_ms=round(elapsed_ms, 2),
            )
            # Echo the id back for client-side tracing.
            response.headers["X-Request-ID"] = req_id
            return response
        finally:
            # Always unbind the request context, success or failure.
            reset_request_context()
Functions
dispatch(request, call_next) async

Process request with logging and context binding.

Source code in src/servicekit/api/middleware.py
async def dispatch(self, request: Request, call_next: MiddlewareCallNext) -> Response:
    """Process request with logging and context binding.

    Generates a ULID request id, binds it (plus method/path/client) into
    the logging context for the request's duration, and echoes it back in
    the X-Request-ID response header.
    """
    request_id = str(ULID())
    start_time = time.perf_counter()

    # Bind request context
    add_request_context(
        request_id=request_id,
        method=request.method,
        path=request.url.path,
        client_host=request.client.host if request.client else None,
    )

    # Add request_id to request state for access in endpoints
    request.state.request_id = request_id

    logger.info(
        "http.request.start",
        query_params=str(request.url.query) if request.url.query else None,
    )

    try:
        response = await call_next(request)
        duration_ms = (time.perf_counter() - start_time) * 1000

        logger.info(
            "http.request.complete",
            status_code=response.status_code,
            duration_ms=round(duration_ms, 2),
        )

        # Add request_id to response headers for tracing
        response.headers["X-Request-ID"] = request_id

        return response

    except Exception as exc:
        duration_ms = (time.perf_counter() - start_time) * 1000

        logger.error(
            "http.request.error",
            duration_ms=round(duration_ms, 2),
            error=str(exc),
            exc_info=True,
        )
        # Re-raise so outer handlers still see the failure.
        raise

    finally:
        # Clear request context after response
        reset_request_context()

Functions

database_error_handler(request, exc) async

Handle database errors and return error response.

Source code in src/servicekit/api/middleware.py
async def database_error_handler(request: Request, exc: Exception) -> JSONResponse:
    """Handle database errors and return error response."""
    error_text = str(exc)
    logger.error(
        "database.error",
        error=error_text,
        path=request.url.path,
        exc_info=True,
    )
    # NOTE(review): the raw exception text is echoed to the client; confirm
    # this is acceptable for this deployment.
    body = {"detail": "Database error occurred", "error": error_text}
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content=body,
    )

validation_error_handler(request, exc) async

Handle validation errors and return error response.

Source code in src/servicekit/api/middleware.py
async def validation_error_handler(request: Request, exc: Exception) -> JSONResponse:
    """Handle validation errors and return error response."""
    error_text = str(exc)
    logger.warning(
        "validation.error",
        error=error_text,
        path=request.url.path,
    )
    body = {"detail": "Validation error", "errors": error_text}
    return JSONResponse(
        status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
        content=body,
    )

servicekit_exception_handler(request, exc) async

Handle ServicekitException and return RFC 9457 Problem Details.

Source code in src/servicekit/api/middleware.py
async def servicekit_exception_handler(request: Request, exc: ServicekitException) -> JSONResponse:
    """Handle ServicekitException and return RFC 9457 Problem Details.

    The exception's type URI, title, status, detail, instance, and any
    extension members are projected into the problem+json body.
    """
    logger.warning(
        "servicekit.error",
        error_type=exc.type_uri,
        status=exc.status,
        detail=exc.detail,
        path=request.url.path,
    )

    problem = ProblemDetail(
        type=exc.type_uri,
        title=exc.title,
        status=exc.status,
        # Fall back to the full request URL when no instance was set.
        detail=exc.detail,
        instance=exc.instance or str(request.url),
        **exc.extensions,
    )

    return JSONResponse(
        status_code=exc.status,
        content=problem.model_dump(exclude_none=True),
        media_type="application/problem+json",
    )

add_error_handlers(app)

Add error handlers to FastAPI application.

Source code in src/servicekit/api/middleware.py
def add_error_handlers(app: Any) -> None:
    """Add error handlers to FastAPI application.

    Registers handlers for servicekit, SQLAlchemy, and pydantic errors.
    """
    # Local imports keep pydantic/SQLAlchemy out of this module's import
    # graph until handlers are actually installed.
    from pydantic import ValidationError
    from sqlalchemy.exc import SQLAlchemyError

    app.add_exception_handler(ServicekitException, servicekit_exception_handler)
    app.add_exception_handler(SQLAlchemyError, database_error_handler)
    app.add_exception_handler(ValidationError, validation_error_handler)

add_logging_middleware(app)

Add request logging middleware to FastAPI application.

Source code in src/servicekit/api/middleware.py
def add_logging_middleware(app: Any) -> None:
    """Add request logging middleware to FastAPI application."""
    app.add_middleware(RequestLoggingMiddleware)

Dependencies

FastAPI dependency injection functions.

dependencies

Generic FastAPI dependency injection for database and scheduler.

Classes

Functions

set_database(database)

Set the global database instance.

Source code in src/servicekit/api/dependencies.py
def set_database(database: Database) -> None:
    """Set the global database instance.

    Call once during app startup, before any get_database() dependency runs.
    """
    global _database
    _database = database

get_database()

Get the global database instance.

Source code in src/servicekit/api/dependencies.py
def get_database() -> Database:
    """Get the global database instance.

    Raises:
        RuntimeError: If set_database() was never called.
    """
    if _database is not None:
        return _database
    raise RuntimeError("Database not initialized. Call set_database() during app startup.")

get_session(db) async

Get a database session for dependency injection.

Source code in src/servicekit/api/dependencies.py
async def get_session(db: Annotated[Database, Depends(get_database)]) -> AsyncIterator[AsyncSession]:
    """Yield a database session scoped to the current request (FastAPI dependency)."""
    async with db.session() as request_session:
        yield request_session

set_scheduler(scheduler)

Set the global scheduler instance.

Source code in src/servicekit/api/dependencies.py
def set_scheduler(scheduler: JobScheduler) -> None:
    """Store *scheduler* as the process-wide JobScheduler instance."""
    global _scheduler
    _scheduler = scheduler

get_scheduler()

Get the global scheduler instance.

Source code in src/servicekit/api/dependencies.py
def get_scheduler() -> JobScheduler:
    """Return the process-wide JobScheduler instance.

    Raises:
        RuntimeError: If set_scheduler() was never called during startup.
    """
    sched = _scheduler
    if sched is None:
        raise RuntimeError("Scheduler not initialized. Call set_scheduler() during app startup.")
    return sched

set_app_manager(manager)

Set the global app manager instance.

Source code in src/servicekit/api/dependencies.py
def set_app_manager(manager: AppManager) -> None:
    """Store *manager* as the process-wide AppManager instance."""
    global _app_manager
    _app_manager = manager

get_app_manager()

Get the global app manager instance.

Source code in src/servicekit/api/dependencies.py
def get_app_manager() -> AppManager:
    """Return the process-wide AppManager instance.

    Raises:
        RuntimeError: If set_app_manager() was never called during startup.
    """
    manager = _app_manager
    if manager is None:
        raise RuntimeError("AppManager not initialized. Call set_app_manager() during app startup.")
    return manager

Pagination

Pagination helpers for collection endpoints.

pagination

Pagination utilities for API endpoints.

Classes

PaginationParams

Bases: BaseModel

Query parameters for opt-in pagination (both page and size required).

Source code in src/servicekit/api/pagination.py
class PaginationParams(BaseModel):
    """Optional pagination query parameters.

    Pagination is opt-in: it takes effect only when *both* ``page``
    and ``size`` are supplied; otherwise the endpoint returns the
    full, unpaginated collection.
    """

    page: int | None = Field(default=None, ge=1, description="Page number (1-indexed)")
    size: int | None = Field(default=None, ge=1, le=100, description="Number of items per page (max 100)")

    def is_paginated(self) -> bool:
        """Return True when both ``page`` and ``size`` were supplied."""
        return not (self.page is None or self.size is None)
Functions
is_paginated()

Check if both page and size parameters are provided.

Source code in src/servicekit/api/pagination.py
def is_paginated(self) -> bool:
    """Return True when both ``page`` and ``size`` were supplied."""
    return not (self.page is None or self.size is None)

Functions

create_paginated_response(items, total, page, size)

Create paginated response with items and metadata.

Source code in src/servicekit/api/pagination.py
def create_paginated_response(items: list[T], total: int, page: int, size: int) -> PaginatedResponse[T]:
    """Bundle *items* and pagination metadata into a PaginatedResponse envelope."""
    payload = {"items": items, "total": total, "page": page, "size": size}
    return PaginatedResponse(**payload)

Utilities

Utility functions for FastAPI applications.

utilities

Utility functions for FastAPI routers and endpoints.

Functions

build_location_url(request, path)

Build a full URL for the Location header.

Source code in src/servicekit/api/utilities.py
def build_location_url(request: Request, path: str) -> str:
    """Return an absolute URL (scheme + authority + *path*) for a Location header."""
    url = request.url
    return "".join((url.scheme, "://", url.netloc, path))

run_app(app, *, host=None, port=None, workers=None, reload=None, log_level=None, **uvicorn_kwargs)

Run FastAPI app with Uvicorn development server.

For reload to work, pass a string in "module:app" format. App instance disables reload automatically.

Examples:

# Direct execution (reload disabled)
if __name__ == "__main__":
    run_app(app)

# With module path (reload enabled)
run_app("examples.config_api:app")

# Production: multiple workers
run_app(app, workers=4)

Parameters:

Name Type Description Default
app Any | str

FastAPI app instance OR string "module:app" path

required
host str | None

Server host (default: "127.0.0.1", env: HOST)

None
port int | None

Server port (default: 8000, env: PORT)

None
workers int | None

Number of worker processes (default: 1, env: WORKERS)

None
reload bool | None

Enable auto-reload (default: True for string, False for instance)

None
log_level str | None

Logging level (default: from LOG_LEVEL env var or "info")

None
**uvicorn_kwargs Any

Additional uvicorn.run() arguments

{}
Source code in src/servicekit/api/utilities.py
def run_app(
    app: Any | str,
    *,
    host: str | None = None,
    port: int | None = None,
    workers: int | None = None,
    reload: bool | None = None,
    log_level: str | None = None,
    **uvicorn_kwargs: Any,
) -> None:
    """Run FastAPI app with Uvicorn development server.

    For reload to work, pass a string in "module:app" format.
    App instance disables reload automatically.

    Examples:
    --------
        # Direct execution (reload disabled)
        if __name__ == "__main__":
            run_app(app)

        # With module path (reload enabled)
        run_app("examples.config_api:app")

        # Production: multiple workers
        run_app(app, workers=4)

    Args:
        app: FastAPI app instance OR string "module:app" path
        host: Server host (default: "127.0.0.1", env: HOST)
        port: Server port (default: 8000, env: PORT)
        workers: Number of worker processes (default: 1, env: WORKERS)
        reload: Enable auto-reload (default: True for string, False for instance)
        log_level: Logging level (default: from LOG_LEVEL env var or "info")
        **uvicorn_kwargs: Additional uvicorn.run() arguments (may include
            log_config to override servicekit's default of None)
    """
    import uvicorn

    # Configure structured logging before uvicorn starts
    from servicekit.logging import configure_logging

    configure_logging()

    # Explicit arguments win; otherwise fall back to env vars, then defaults
    resolved_host: str = host if host is not None else os.getenv("HOST", "127.0.0.1")
    resolved_port: int = port if port is not None else int(os.getenv("PORT", "8000"))
    resolved_workers: int = workers if workers is not None else int(os.getenv("WORKERS", "1"))
    resolved_log_level: str = log_level if log_level is not None else os.getenv("LOG_LEVEL", "info").lower()

    # Auto-detect reload behavior if not specified
    if reload is None:
        reload = isinstance(app, str)  # Enable reload for string paths, disable for instances

    # Auto-reload is incompatible with multiple workers
    if resolved_workers > 1 and reload:
        reload = False

    # BUGFIX: log_config was previously passed explicitly alongside
    # **uvicorn_kwargs, so any caller supplying log_config in uvicorn_kwargs
    # hit "TypeError: got multiple values for keyword argument 'log_config'".
    # setdefault keeps the old default (disable uvicorn's logging config)
    # while letting callers override it.
    uvicorn_kwargs.setdefault("log_config", None)

    uvicorn.run(
        app,
        host=resolved_host,
        port=resolved_port,
        workers=resolved_workers,
        reload=reload,
        log_level=resolved_log_level,
        **uvicorn_kwargs,
    )