Skip to content

API Reference

Auto-generated from docstrings via mkdocstrings. The public surface is small — most users only touch the CLI — but if you're embedding Maneki in another tool, these are the entry points.

maneki.audio.metadata

Read source audio tags (FLAC / MP3 / generic) and write MP4 ALAC / AAC / MP3 tags.

metadata

Read source audio tags (FLAC / MP3 / generic) and write MP4 ALAC tags.

Public API is split across submodules; this module re-exports the names the rest of the project (and tests) import from maneki.audio.metadata.

Attributes

SUPPORTED_AUDIO_EXTS = frozenset({'.flac', '.mp3', '.m4a', '.m4b', '.aac', '.ogg', '.opus', '.wav', '.aiff', '.aif'}) module-attribute

Classes

SourceTrack

Bases: BaseModel

Tag bundle read from a single source audio file.

Source code in src/maneki/audio/metadata/models.py
class SourceTrack(BaseModel):
    """Tag bundle read from a single source audio file."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    path: Path
    title: str | None = None
    artist: str | None = None
    album_artist: str | None = None
    album: str | None = None
    date: str | None = None
    genre: str | None = None
    # Multi-genre support — FLAC repeats GENRE, ID3 repeats TCON, MP4 atoms
    # only carry one. `genre` stays as the primary (backwards-compat) and
    # equals `genres[0]` when both are populated.
    genres: list[str] = Field(default_factory=list)
    track_no: int | None = None
    track_total: int | None = None
    disc_no: int | None = None
    disc_total: int | None = None
    bpm: int | None = None
    label: str | None = None
    catalog: str | None = None
    lyrics: str | None = None
    replaygain: dict[str, str] = Field(default_factory=dict)
    embedded_picture: bytes | None = None
    embedded_picture_mime: str | None = None
    embedded_picture_pixels: int = 0
    duration_s: float | None = None  # audio duration; used by dedup to discriminate same-tag distinct content
    # MusicBrainz recording MBID (per-track, distinct from the album-level
    # release MBID). Populated by the MB enrichment follow-up call when
    # `--enrich` is on. Picard convention: stored as `MusicBrainz Track Id`
    # on MP4 freeform / `MusicBrainz Recording Id` on ID3 TXXX.
    mb_recording_id: str | None = None
    # Encoder / encoding-tool string. Read from MP4 `\xa9too` and ID3
    # `TSSE`. Files we produce embed `maneki X.Y.Z` here so a later
    # `maneki inspect` shows which release wrote them.
    encoder: str | None = None

AlbumSummary

Bases: BaseModel

Album-level rollup derived by majority-vote across the album's tracks.

Source code in src/maneki/audio/metadata/models.py
class AlbumSummary(BaseModel):
    """Album-level rollup derived by majority-vote across the album's tracks."""

    album: str | None = None
    album_artist: str | None = None
    artist_fallback: str | None = None
    year: str | None = None
    genre: str | None = None
    track_total: int | None = None
    disc_total: int | None = None
    is_compilation: bool = False
    label: str | None = None
    catalog: str | None = None

MusicBrainzIds

Bases: BaseModel

Album-level MusicBrainz IDs supplied by an --enrich provider.

Per-track recording MBIDs live on SourceTrack.mb_recording_id — they vary per track and don't belong on an album-scope object.

Source code in src/maneki/audio/metadata/models.py
class MusicBrainzIds(BaseModel):
    """Album-level MusicBrainz IDs supplied by an --enrich provider.

    Per-track recording MBIDs live on `SourceTrack.mb_recording_id` —
    they vary per track and don't belong on an album-scope object.
    """

    album_id: str | None = None
    artist_id: str | None = None
    release_group_id: str | None = None

TagOverrides

Bases: BaseModel

Optional tag overrides applied in-place by apply_tag_overrides.

Each field is None to mean "leave the existing tag alone". Pass an empty string to clear a tag explicitly (rare; typically you just leave it).

Source code in src/maneki/audio/metadata/models.py
class TagOverrides(BaseModel):
    """Optional tag overrides applied in-place by `apply_tag_overrides`.

    Each field is `None` to mean "leave the existing tag alone". Pass an empty
    string to *clear* a tag explicitly (rare; typically you just leave it).
    """

    title: str | None = None
    artist: str | None = None
    album_artist: str | None = None
    album: str | None = None
    year: str | None = None
    genre: str | None = None
    track_no: int | None = None
    track_total: int | None = None
    disc_no: int | None = None
    disc_total: int | None = None

    def is_empty(self) -> bool:
        return all(v is None for v in self.model_dump().values())

Functions

read_source(path, *, light=False, measure_pictures=False)

Read tags + embedded cover from a single audio file.

Source values that arrive entirely lowercase are smart-title-cased here so downstream filenames + tags display consistently. Anything with real casing (AC/DC, ABBA, iPhone, R.E.M.) is left alone.

light=True skips the two expensive operations the convert pipeline needs but the library scanner doesn't: - Pillow decode of the embedded picture (for cover_pixels) - A second mutagen open to read info.length (for duration_s) has_cover still works in light mode (presence is checked without touching the bytes); only the pixel measurement is skipped.

measure_pictures=True re-enables the Pillow decode even under light=True, so audit modes that need low-res-cover detection can pay just that cost without also paying the duration probe.

Source code in src/maneki/audio/metadata/read.py
def read_source(
    path: Path,
    *,
    light: bool = False,
    measure_pictures: bool = False,
) -> SourceTrack:
    """Read tags + embedded cover from a single audio file.

    Source values that arrive entirely lowercase are smart-title-cased here
    so downstream filenames + tags display consistently. Anything with real
    casing (`AC/DC`, `ABBA`, `iPhone`, `R.E.M.`) is left alone.

    `light=True` skips the two expensive operations the convert pipeline
    needs but the library scanner doesn't:
      - Pillow decode of the embedded picture (for `cover_pixels`)
      - A second mutagen open to read `info.length` (for `duration_s`)
    `has_cover` still works in light mode (presence is checked without
    touching the bytes); only the pixel measurement is skipped.

    `measure_pictures=True` re-enables the Pillow decode even under
    `light=True`, so audit modes that need low-res-cover detection can
    pay just that cost without also paying the duration probe.
    """
    suffix = path.suffix.lower()
    if suffix == ".flac":
        track = _read_flac(path, light=light, measure_pictures=measure_pictures)
    elif suffix == ".mp3":
        track = _read_mp3(path, light=light, measure_pictures=measure_pictures)
    elif suffix in (".m4a", ".mp4", ".m4b"):
        track = _read_mp4(path, light=light, measure_pictures=measure_pictures)
    else:
        track = _read_generic(path, light=light, measure_pictures=measure_pictures)
    track.title = smart_title_case(track.title)
    track.artist = smart_title_case(track.artist)
    track.album = smart_title_case(track.album)
    track.album_artist = smart_title_case(track.album_artist)
    return track

summarize_album(tracks)

Build an album-level summary by majority-vote across tracks.

For multi-disc albums the album-name vote is biased toward disc 1 — bonus discs often carry tags like Album (CD2) Live In ... that would otherwise win on count and produce a misleading combined name.

Source code in src/maneki/audio/metadata/album.py
def summarize_album(tracks: list[SourceTrack]) -> AlbumSummary:
    """Build an album-level summary by majority-vote across `tracks`.

    For multi-disc albums the album-name vote is biased toward disc 1 — bonus
    discs often carry tags like `Album (CD2) Live In ...` that would otherwise
    win on count and produce a misleading combined name.
    """
    disc_one_tracks = [t for t in tracks if t.disc_no == 1]
    album_source = disc_one_tracks if disc_one_tracks else tracks
    # Album name should be unanimous within a real album. Require quorum so a
    # single stray tagged track (foreign album mixed into the rip) can't
    # impersonate the whole-album value when most tracks have no album tag.
    album = clean_album_title(_majority((t.album for t in album_source), quorum=True))
    raw_album_artist = _majority(t.album_artist for t in tracks)
    year = _majority(t.date for t in tracks)
    genre = _majority(t.genre for t in tracks)
    label = _majority(t.label for t in tracks)
    catalog = _majority(t.catalog for t in tracks)

    artist_counts = Counter(t.artist for t in tracks if t.artist)
    distinct_artists = len(artist_counts)
    artist_fallback = artist_counts.most_common(1)[0][0] if artist_counts else None

    # Now apply the album_artist cleanup against the resolved per-track
    # artist majority. Done after artist_fallback so we have something
    # to fall back to.
    album_artist = clean_album_artist(raw_album_artist, artist_fallback)

    track_total = max((t.track_total or 0 for t in tracks), default=0) or len(tracks) or None
    disc_total = max((t.disc_total or 0 for t in tracks), default=0) or None

    # Compilation if: album_artist is a VA alias, the per-track artist majority
    # is itself a VA alias (rips that leave album_artist empty but stamp every
    # track artist as `VA`), or there's no album_artist + tracks span multiple
    # different artists.
    is_compilation = (
        is_various_artists(album_artist)
        or is_various_artists(artist_fallback)
        or (album_artist is None and distinct_artists > 1)
    )

    return AlbumSummary(
        album=album,
        album_artist=album_artist,
        artist_fallback=artist_fallback,
        year=year,
        genre=genre,
        track_total=track_total,
        disc_total=disc_total,
        is_compilation=is_compilation,
        label=label,
        catalog=catalog,
    )

clean_album_title(album)

Clean disc markers, scene-rip dot-separators, and VA - prefixes from an album tag.

Strips: - trailing [CDx] / (Disc x) / - CD 1 / [CD.1] markers - embedded (CDx) markers (Cranberries Roses (CD2) Live In Madrid shape) - trailing (1) / (2) (bare-paren disc index, no keyword) - dots / underscores used as word-separator instead of spaces (Absolute.Music.60, Absolute_Music_45Absolute Music 60/45); preserves single-letter acronyms like R.E.M. - leading VA - / VA.-. / Various - prefixes once the dots are space

Source code in src/maneki/audio/metadata/album.py
def clean_album_title(album: str | None) -> str | None:
    """Clean disc markers, scene-rip dot-separators, and `VA -` prefixes from an album tag.

    Strips:
    - trailing `[CDx]` / `(Disc x)` / ` - CD 1` / `[CD.1]` markers
    - embedded `(CDx)` markers (Cranberries `Roses (CD2) Live In Madrid` shape)
    - trailing `(1)` / `(2)` (bare-paren disc index, no keyword)
    - dots / underscores used as word-separator instead of spaces
      (`Absolute.Music.60`, `Absolute_Music_45` → `Absolute Music 60/45`);
      preserves single-letter acronyms like `R.E.M.`
    - leading `VA - ` / `VA.-.` / `Various -` prefixes once the dots are space
    """
    if not album:
        return album
    cleaned = album
    # Repeatedly strip trailing disc markers (handles `Album [CD1] (Deluxe)`).
    while True:
        # Both numeric (`Disc 2`) and word-form (`Disc Two`) suffixes — some
        # box sets ship the latter, e.g. Queen `Greatest Hits I / Disc One`.
        stripped = _DISC_SUFFIX_RE.sub("", cleaned).strip(" -")
        stripped = _DISC_WORD_SUFFIX_RE.sub("", stripped).strip(" -")
        # Any leftover trailing separator (slash / dash / etc.) — common
        # after stripping `Album / Disc One` → `Album /`.
        stripped = _TRAILING_SEPARATOR_RE.sub("", stripped).strip()
        if stripped == cleaned:
            break
        cleaned = stripped
    # Strip embedded `(CDx)` markers (Cranberries Roses-style: `Roses (CD2) Live In Madrid`).
    cleaned_mid = _DISC_KEYWORD_RE.sub(" ", cleaned)
    cleaned_mid = re.sub(r"\s+", " ", cleaned_mid).strip(" -")
    if cleaned_mid:
        cleaned = cleaned_mid
    # Last pass: strip trailing `(1)` / `(2)` etc. (disc number without a keyword).
    bare = _BARE_DISC_PAREN_RE.sub("", cleaned).strip(" -")
    if bare:
        cleaned = bare
    # Dots/underscores as separator: replace between multi-letter chunks with space.
    cleaned = _SCENE_DOT_SEP_RE.sub(" ", cleaned)
    cleaned = _SCENE_USCORE_SEP_RE.sub(" ", cleaned)
    # Strip leading VA prefix (now that dots are spaces, `VA.-.Foo` reads
    # as `VA - Foo` / `VA.-.Foo`; either way the prefix should go).
    cleaned = _VA_PREFIX_IN_ALBUM_RE.sub("", cleaned)
    cleaned = re.sub(r"\s+", " ", cleaned).strip(" -")
    return cleaned or album

write_tags(path, track, album, *, cover_bytes, cover_mime, musicbrainz=None)

Write the target tag set to path, dispatching by file extension.

Source code in src/maneki/audio/metadata/write.py
def write_tags(
    path: Path,
    track: SourceTrack,
    album: AlbumSummary,
    *,
    cover_bytes: bytes | None,
    cover_mime: str | None,
    musicbrainz: MusicBrainzIds | None = None,
) -> None:
    """Write the target tag set to `path`, dispatching by file extension."""
    suffix = path.suffix.lower()
    if suffix == ".mp3":
        write_id3_tags(path, track, album, cover_bytes=cover_bytes, cover_mime=cover_mime, musicbrainz=musicbrainz)
    elif suffix in (".m4a", ".mp4", ".m4b"):
        write_mp4_tags(path, track, album, cover_bytes=cover_bytes, cover_mime=cover_mime, musicbrainz=musicbrainz)
    else:
        raise ValueError(f"unsupported output extension for tag writing: {suffix}")

write_mp4_tags(path, track, album, *, cover_bytes, cover_mime, musicbrainz=None)

Write the full target tag set to an existing ALAC/AAC .m4a file.

Source code in src/maneki/audio/metadata/write.py
def write_mp4_tags(
    path: Path,
    track: SourceTrack,
    album: AlbumSummary,
    *,
    cover_bytes: bytes | None,
    cover_mime: str | None,
    musicbrainz: MusicBrainzIds | None = None,
) -> None:
    """Write the full target tag set to an existing ALAC/AAC `.m4a` file."""
    mp4 = MP4(path)
    tags = mp4.tags
    if tags is None:
        mp4.add_tags()
        tags = mp4.tags
    assert tags is not None  # appease type-checker; add_tags always populates

    tags.clear()
    _set(tags, "\xa9nam", track.title)
    _set(tags, "\xa9ART", track.artist or album.artist_fallback)
    _set(tags, "\xa9alb", album.album)
    _set(tags, "aART", "Various Artists" if album.is_compilation else (album.album_artist or album.artist_fallback))
    _set(tags, "\xa9day", _year_only(album.year))
    _set(tags, "\xa9gen", track.genre or album.genre)
    _set(tags, "\xa9lyr", track.lyrics)

    track_no = track.track_no or 0
    track_total = track.track_total or album.track_total or 0
    if track_no or track_total:
        tags["trkn"] = [(track_no, track_total)]

    disc_no = track.disc_no or 0
    disc_total = track.disc_total or album.disc_total or 0
    if disc_no or disc_total:
        tags["disk"] = [(disc_no, disc_total)]

    if track.bpm is not None and track.bpm > 0:
        tags["tmpo"] = [int(track.bpm)]

    if album.is_compilation:
        tags["cpil"] = True

    label = track.label or album.label
    catalog = track.catalog or album.catalog
    _set_freeform(tags, "LABEL", label)
    _set_freeform(tags, "CATALOGNUMBER", catalog)
    for key, value in track.replaygain.items():
        _set_freeform(tags, key, value)

    if musicbrainz:
        _set_freeform(tags, "MusicBrainz Album Id", musicbrainz.album_id)
        _set_freeform(tags, "MusicBrainz Artist Id", musicbrainz.artist_id)
        _set_freeform(tags, "MusicBrainz Release Group Id", musicbrainz.release_group_id)
    # Per-track recording MBID — Picard convention is to store this as
    # the iTunes "MusicBrainz Track Id" freeform (despite the name, it's
    # the recording MBID, not the release-track MBID). Written even when
    # no album-level musicbrainz block is present, since per-track lookups
    # via AcoustID can produce recording IDs without an album hit.
    if track.mb_recording_id:
        _set_freeform(tags, "MusicBrainz Track Id", track.mb_recording_id)

    if cover_bytes:
        cover_format = MP4Cover.FORMAT_PNG if (cover_mime or "").lower().endswith("png") else MP4Cover.FORMAT_JPEG
        tags["covr"] = [MP4Cover(cover_bytes, imageformat=cover_format)]

    # iTunes-style "encoder" atom — read by `maneki inspect`, ffprobe,
    # and most tag editors as the encoder/encoding-tool field. Always
    # last so it overwrites any inherited value.
    _set(tags, "\xa9too", _encoder_tag())

    mp4.save()

write_id3_tags(path, track, album, *, cover_bytes, cover_mime, musicbrainz=None)

Write the full target tag set to an MP3 file as ID3v2.4.

Source code in src/maneki/audio/metadata/write.py
def write_id3_tags(
    path: Path,
    track: SourceTrack,
    album: AlbumSummary,
    *,
    cover_bytes: bytes | None,
    cover_mime: str | None,
    musicbrainz: MusicBrainzIds | None = None,
) -> None:
    """Write the full target tag set to an MP3 file as ID3v2.4."""
    try:
        id3 = ID3(path)
    except ID3NoHeaderError:
        id3 = ID3()

    id3.delete()

    title = track.title
    artist = track.artist or album.artist_fallback
    album_artist = "Various Artists" if album.is_compilation else (album.album_artist or album.artist_fallback)
    year = _year_only(album.year)
    genre = track.genre or album.genre

    if title:
        id3.add(TIT2(encoding=3, text=title))
    if artist:
        id3.add(TPE1(encoding=3, text=artist))
    if album.album:
        id3.add(TALB(encoding=3, text=album.album))
    if album_artist:
        id3.add(TPE2(encoding=3, text=album_artist))
    if year:
        id3.add(TDRC(encoding=3, text=year))
    if genre:
        id3.add(TCON(encoding=3, text=genre))

    track_no = track.track_no or 0
    track_total = track.track_total or album.track_total or 0
    if track_no or track_total:
        id3.add(TRCK(encoding=3, text=f"{track_no}/{track_total}" if track_total else str(track_no)))

    disc_no = track.disc_no or 0
    disc_total = track.disc_total or album.disc_total or 0
    if disc_no or disc_total:
        id3.add(TPOS(encoding=3, text=f"{disc_no}/{disc_total}" if disc_total else str(disc_no)))

    if track.bpm is not None and track.bpm > 0:
        id3.add(TBPM(encoding=3, text=str(int(track.bpm))))

    if album.is_compilation:
        id3.add(TCMP(encoding=3, text="1"))

    label = track.label or album.label
    if label:
        id3.add(TPUB(encoding=3, text=label))

    if track.lyrics:
        id3.add(USLT(encoding=3, lang="eng", desc="", text=track.lyrics))

    catalog = track.catalog or album.catalog
    if catalog:
        id3.add(TXXX(encoding=3, desc="CATALOGNUMBER", text=catalog))

    for key, value in track.replaygain.items():
        id3.add(TXXX(encoding=3, desc=key, text=value))

    if musicbrainz:
        mb_pairs: list[tuple[str, str | None]] = [
            ("MusicBrainz Album Id", musicbrainz.album_id),
            ("MusicBrainz Artist Id", musicbrainz.artist_id),
            ("MusicBrainz Release Group Id", musicbrainz.release_group_id),
        ]
        for desc, mb_value in mb_pairs:
            if mb_value:
                id3.add(TXXX(encoding=3, desc=desc, text=mb_value))
    # Per-track recording MBID — Picard's `MusicBrainz Recording Id` TXXX
    # frame. Independent of the album-level block above so that AcoustID-
    # only lookups can still emit a recording MBID.
    if track.mb_recording_id:
        id3.add(TXXX(encoding=3, desc="MusicBrainz Recording Id", text=track.mb_recording_id))

    if cover_bytes:
        mime = "image/png" if (cover_mime or "").lower().endswith("png") else "image/jpeg"
        id3.add(APIC(encoding=3, mime=mime, type=3, desc="Front cover", data=cover_bytes))

    # ID3v2.4 "Software/hardware and settings used for encoding" — the
    # ID3 equivalent of MP4's `\xa9too`. Mirrors what we write in
    # `write_mp4_tags`; same release-traceability use case.
    id3.add(TSSE(encoding=3, text=_encoder_tag()))

    id3.save(path, v2_version=4)

embed_cover_only(path, *, cover_bytes, cover_mime)

Replace the cover of an existing audio file without touching other tags.

Supports .m4a/.mp4/.m4b, .mp3, and .flac. Used by maneki cover to retrofit album art onto already-converted files. All previous pictures are dropped first so we don't end up with multiple covers.

Source code in src/maneki/audio/metadata/write.py
def embed_cover_only(path: Path, *, cover_bytes: bytes, cover_mime: str) -> None:
    """Replace the cover of an existing audio file without touching other tags.

    Supports `.m4a/.mp4/.m4b`, `.mp3`, and `.flac`. Used by `maneki cover`
    to retrofit album art onto already-converted files. All previous pictures
    are dropped first so we don't end up with multiple covers.
    """
    suffix = path.suffix.lower()
    if suffix in (".m4a", ".mp4", ".m4b"):
        mp4 = MP4(path)
        if mp4.tags is None:
            mp4.add_tags()
        tags = mp4.tags
        assert tags is not None
        cover_format = MP4Cover.FORMAT_PNG if cover_mime.lower().endswith("png") else MP4Cover.FORMAT_JPEG
        tags["covr"] = [MP4Cover(cover_bytes, imageformat=cover_format)]
        mp4.save()
        return
    if suffix == ".mp3":
        try:
            id3 = ID3(path)
        except ID3NoHeaderError:
            id3 = ID3()
        for apic_key in list(id3.keys()):
            if apic_key.startswith("APIC"):
                del id3[apic_key]
        mime = "image/png" if cover_mime.lower().endswith("png") else "image/jpeg"
        id3.add(APIC(encoding=3, mime=mime, type=3, desc="Front cover", data=cover_bytes))
        id3.save(path, v2_version=4)
        return
    if suffix == ".flac":
        from mutagen.flac import Picture  # local import — only needed for FLAC

        flac = FLAC(path)
        flac.clear_pictures()
        picture = Picture()
        picture.type = 3  # front cover
        picture.mime = "image/png" if cover_mime.lower().endswith("png") else "image/jpeg"
        picture.data = cover_bytes
        flac.add_picture(picture)
        flac.save()
        return
    raise ValueError(f"unsupported audio file for cover injection: {path}")

apply_tag_overrides(path, overrides)

Apply overrides to path in-place; leave unspecified tags untouched.

Supports .m4a/.mp4/.m4b, .mp3, .flac. Track totals get merged into the existing (track, total) tuple so we don't lose the per-track number.

Source code in src/maneki/audio/metadata/overrides.py
def apply_tag_overrides(path: Path, overrides: TagOverrides) -> None:
    """Apply `overrides` to `path` in-place; leave unspecified tags untouched.

    Supports `.m4a/.mp4/.m4b`, `.mp3`, `.flac`. Track totals get merged into
    the existing `(track, total)` tuple so we don't lose the per-track number.
    """
    if overrides.is_empty():
        return
    suffix = path.suffix.lower()
    if suffix in (".m4a", ".mp4", ".m4b"):
        _apply_overrides_mp4(path, overrides)
        return
    if suffix == ".mp3":
        _apply_overrides_id3(path, overrides)
        return
    if suffix == ".flac":
        _apply_overrides_flac(path, overrides)
        return
    raise ValueError(f"unsupported audio file for tag override: {path}")

maneki.audio.library

Walk a converted-output directory, build an Artist→Album→Track index, audit it, fix the deterministic warnings, and persist it as a SQLite cache at <root>/.maneki/index.db.

library

Walk a converted-output directory, build an Artist→Album→Track index, audit it.

Public surface re-exported here so callers keep using from maneki import library / from maneki.audio.library import …. The leading-underscore helpers _audit_cover and _split_dir_year are also re-exported because tests/CLI import them directly.

Attributes

SCHEMA_VERSION = 1 module-attribute

Bumped when _SCHEMA changes shape; mismatched DBs are unlinked + rebuilt.

INDEX_DIR_NAME = '.maneki' module-attribute

INDEX_DB_NAME = 'index.db' module-attribute

ScanProgressCallback = Callable[[Path, int, int], None] module-attribute

Classes

LibraryTrack

Bases: BaseModel

Track-level summary used by LibraryIndex.

Source code in src/maneki/audio/library/models.py
class LibraryTrack(BaseModel):
    """Track-level summary used by `LibraryIndex`."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    path: Path
    title: str | None = None
    artist: str | None = None
    album_artist: str | None = None
    album: str | None = None
    year: str | None = None
    track_no: int | None = None
    disc_no: int | None = None
    genre: str | None = None
    genres: list[str] = []
    lyrics: str | None = None
    # ReplayGain values from source tags (`replaygain_track_gain`,
    # `replaygain_album_gain`, `..._peak`). Empty dict when the source had
    # no RG tags. AudioPlayer uses these to normalise level differences
    # between tracks during local playback.
    replaygain: dict[str, str] = {}
    duration_s: float = 0.0
    has_cover: bool = False
    cover_pixels: int = 0
    # When set, clients play this URL instead of `path` - populated by the
    # Subsonic client mode so the same widgets/format helpers work for both
    # local files and remote streams.
    stream_url: str | None = None
    # ISO 8601 timestamp the track was starred at, or None when not starred.
    # Populated by Subsonic-client mode from `getAlbum` / `getStarred2`
    # responses (the server enriches every payload via `StarStore.enrich`).
    # Clients surface it as a heart glyph in the track row; toggling fires
    # `/rest/star` / `/rest/unstar` and rewrites this field optimistically.
    starred: str | None = None

LibraryAlbum

Bases: BaseModel

Album-level rollup with audit warnings populated by audit().

Source code in src/maneki/audio/library/models.py
class LibraryAlbum(BaseModel):
    """Album-level rollup with audit warnings populated by `audit()`."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    path: Path
    artist_dir: str
    album_dir: str
    tag_album: str | None = None
    tag_year: str | None = None
    tag_album_artist: str | None = None
    tag_genre: str | None = None
    track_count: int = 0
    disc_count: int = 1
    is_compilation: bool = False
    has_cover: bool = False
    cover_pixels: int = 0
    tracks: list[LibraryTrack] = []
    warnings: list[str] = []
    # When set, this album was sourced from a Subsonic server with this ID.
    # Clients use it to lazy-load tracks via `getAlbum?id=...` when the
    # user opens the album, instead of pre-fetching every track at launch.
    subsonic_id: str | None = None

LibraryIndex

Bases: BaseModel

Full library index, sorted by (artist_dir, album_dir).

Source code in src/maneki/audio/library/models.py
class LibraryIndex(BaseModel):
    """Full library index, sorted by `(artist_dir, album_dir)`."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    root: Path
    albums: list[LibraryAlbum] = []

ValidationResult

Counts returned by validate() for one-line logging.

Source code in src/maneki/audio/library/scan.py
class ValidationResult:
    """Counts returned by `validate()` for one-line logging."""

    __slots__ = ("added", "removed", "updated")

    def __init__(self, *, added: int, removed: int, updated: int) -> None:
        self.added = added
        self.removed = removed
        self.updated = updated

    def __bool__(self) -> bool:
        return bool(self.added or self.removed or self.updated)

    def __repr__(self) -> str:  # pragma: no cover — debug aid only
        return f"ValidationResult(added={self.added}, removed={self.removed}, updated={self.updated})"

Functions

audit_album(album)

Replace album.warnings with a fresh audit pass for one album.

Warnings are sorted alphabetically at the end so the in-memory LibraryIndex produced by scan_full matches the one produced by load (SQLite returns album_warnings rows ORDER BY warning).

Source code in src/maneki/audio/library/audit.py
def audit_album(album: LibraryAlbum) -> None:
    """Replace `album.warnings` with a fresh audit pass for one album.

    Warnings are sorted alphabetically at the end so the in-memory
    `LibraryIndex` produced by `scan_full` matches the one produced by
    `load` (SQLite returns `album_warnings` rows ORDER BY warning).
    """
    album.warnings = []
    _audit_cover(album)
    _audit_year(album)
    _audit_album_artist(album)
    _audit_album_name(album)
    _audit_artist_name(album)
    _audit_tag_path_mismatch(album)
    _audit_track_gaps(album)
    _audit_track_count(album)
    album.warnings.sort()

fix_index(index, *, dry_run=False, console=None, year_lookup=None, prefer_dirname=False, on_album=None)

Apply deterministic fixes to every flagged album in index.

Returns a list of human-readable action lines. year_lookup is the MusicBrainz year-lookup callable (defaults to enrich.musicbrainz.lookup_release_year — injectable for tests).

prefer_dirname=True inverts the tag/path-mismatch resolution: tags get rewritten from the dir name instead of the dir being renamed from the tag. Use this when you've hand-curated the directory layout and want the tags to follow.

on_album(album, idx, total) fires once per FLAGGED album right before its fixes run; clean albums (no warnings) are skipped silently and don't count against the total. Used by the CLI to drive a progress bar through the slow MB lookups.

Source code in src/maneki/audio/library/fix.py
def fix_index(
    index: LibraryIndex,
    *,
    dry_run: bool = False,
    console: Console | None = None,
    year_lookup: object | None = None,
    prefer_dirname: bool = False,
    on_album: FixProgressCallback | None = None,
) -> list[str]:
    """Apply deterministic fixes to every flagged album in `index`.

    Returns a list of human-readable action lines. `year_lookup` is the
    MusicBrainz year-lookup callable (defaults to
    `enrich.musicbrainz.lookup_release_year` — injectable for tests).

    `prefer_dirname=True` inverts the tag/path-mismatch resolution: tags
    get rewritten from the dir name instead of the dir being renamed from
    the tag. Use this when you've hand-curated the directory layout and
    want the tags to follow.

    `on_album(album, idx, total)` fires once per FLAGGED album right
    before its fixes run; clean albums (no warnings) are skipped silently
    and don't count against the total. Used by the CLI to drive a
    progress bar through the slow MB lookups.
    """
    if year_lookup is None:
        from maneki.audio.enrich.musicbrainz import lookup_release_year

        year_lookup = lookup_release_year

    flagged = [a for a in index.albums if a.warnings]
    total = len(flagged)
    actions: list[str] = []
    for idx, album in enumerate(flagged, start=1):
        if on_album is not None:
            on_album(album, idx, total)
        actions.extend(
            fix_album(
                album,
                dry_run=dry_run,
                console=console,
                year_lookup=year_lookup,
                prefer_dirname=prefer_dirname,
            )
        )
    return actions

fix_album(album, *, dry_run=False, console=None, year_lookup, prefer_dirname=False)

Apply fixes to one album. Returns the action lines performed (or planned).

Source code in src/maneki/audio/library/fix.py
def fix_album(
    album: LibraryAlbum,
    *,
    dry_run: bool = False,
    console: Console | None = None,
    year_lookup: object,
    prefer_dirname: bool = False,
) -> list[str]:
    """Apply fixes to one album. Returns the action lines performed (or planned)."""
    actions: list[str] = []
    label = f"{album.artist_dir} / {album.album_dir}"

    # Missing-year fixes go first so the rename below sees the new year.
    if any("missing year" in w for w in album.warnings):
        new_year = _fix_missing_year(album, dry_run=dry_run, year_lookup=year_lookup)
        if new_year:
            actions.append(f"{label}: year ← {new_year} (musicbrainz)")
            if console is not None:
                console.print(f"[green]✓[/green] {label}: year ← {new_year} (musicbrainz)")

    has_mismatch = any(w.startswith("tag/path mismatch") for w in album.warnings)
    if prefer_dirname:
        # Push dir-name → tags. Year set by MB above (if any) is preserved
        # only if the dir has no leading year prefix.
        if has_mismatch:
            updated = _fix_retag_to_match_dir(album, dry_run=dry_run)
            if updated:
                tag_album, tag_year = updated
                msg = f"{label}: tag ← album={tag_album!r}"
                if tag_year:
                    msg += f", year={tag_year}"
                actions.append(msg)
                if console is not None:
                    console.print(f"[green]✓[/green] {msg}")
    else:
        # Default: tag wins, rename the dir to match.
        if has_mismatch or actions:
            renamed = _fix_rename_to_match_tag(album, dry_run=dry_run)
            if renamed:
                actions.append(f"{label}: renamed dir → {renamed}")
                if console is not None:
                    console.print(f"[green]✓[/green] {label}: renamed dir → {renamed}")

    return actions

db_path(root)

Return <root>/.maneki/index.db (the absolute index location).

Source code in src/maneki/audio/library/db.py
def db_path(root: Path) -> Path:
    """Return `<root>/.maneki/index.db` (the absolute index location)."""
    return root / INDEX_DIR_NAME / INDEX_DB_NAME

open_db(root)

Open or create the index DB for root.

If the existing DB has a stale schema_version or was written for a different library_root_abs, the file (and any WAL sidecars) is unlinked and a fresh schema is created.

Source code in src/maneki/audio/library/db.py
def open_db(root: Path) -> sqlite3.Connection:
    """Open or create the index DB for `root`.

    If the existing DB has a stale `schema_version` or was written for a
    different `library_root_abs`, the file (and any WAL sidecars) is
    unlinked and a fresh schema is created.
    """
    path = db_path(root)
    path.parent.mkdir(parents=True, exist_ok=True)

    if path.exists() and not _can_use_existing(path, root):
        log.info("library index: schema/root/version mismatch at %s; rebuilding", path)
        _unlink_db(path)

    fresh = not path.exists()
    conn = sqlite3.connect(path, check_same_thread=False, isolation_level=None)
    conn.row_factory = sqlite3.Row
    _apply_pragmas(conn)
    if fresh:
        _create_schema(conn, root)
    else:
        # Reusing the existing index (schema + root matched). Refresh the
        # maneki_version stamp to the running version so it records "last run
        # by" rather than reading like a stale older release forever — this is
        # a one-row update, NOT a re-index (the version bump never triggers
        # one; only a SCHEMA_VERSION / root change does).
        conn.execute("UPDATE meta SET value=? WHERE key='maneki_version'", (MANEKI_VERSION,))
    return conn

is_empty(conn)

True when the DB has no album rows yet (fresh schema, never scanned).

Source code in src/maneki/audio/library/db.py
def is_empty(conn: sqlite3.Connection) -> bool:
    """True when the DB has no album rows yet (fresh schema, never scanned)."""
    row = conn.execute("SELECT 1 FROM albums LIMIT 1").fetchone()
    return row is None

load_or_scan(root, *, use_cache=True, force=False, on_album=None, measure_pictures=False)

Return a LibraryIndex for root, using the on-disk cache when available.

use_cache=False skips the DB entirely (in-memory scan + audit). Used when .maneki/ cannot be created (read-only mount) or when the caller passes --no-cache.

force=True ignores any existing cache and runs a full rescan, rewriting every row. Maps to the --full-rescan CLI flag and the startScan Subsonic endpoint.

Without force, a warm cache is loaded and a validate() pass reconciles the DB against any filesystem-level adds/removes/tag-edits that happened while no watcher was running.

Source code in src/maneki/audio/library/load.py
def load_or_scan(
    root: Path,
    *,
    use_cache: bool = True,
    force: bool = False,
    on_album: ScanProgressCallback | None = None,
    measure_pictures: bool = False,
) -> LibraryIndex:
    """Return a `LibraryIndex` for `root`, using the on-disk cache when available.

    `use_cache=False` skips the DB entirely (in-memory scan + audit). Used
    when `.maneki/` cannot be created (read-only mount) or when the
    caller passes `--no-cache`.

    `force=True` ignores any existing cache and runs a full rescan,
    rewriting every row. Maps to the `--full-rescan` CLI flag and the
    `startScan` Subsonic endpoint.

    Without `force`, a warm cache is loaded and a `validate()` pass
    reconciles the DB against any filesystem-level adds/removes/tag-edits
    that happened while no watcher was running.
    """
    from maneki.audio.library.audit import audit
    from maneki.audio.library.db import open_db
    from maneki.audio.library.scan import scan, scan_full, validate

    if not use_cache:
        index = scan(root, on_album=on_album, measure_pictures=measure_pictures)
        audit(index)
        return index

    try:
        conn = open_db(root)
    except OSError as exc:
        log.warning(
            "library cache disabled: cannot create %s/.maneki (%s); falling back to in-memory scan",
            root,
            exc,
        )
        index = scan(root, on_album=on_album, measure_pictures=measure_pictures)
        audit(index)
        return index

    try:
        from maneki.audio.library.db import is_empty

        if force or is_empty(conn):
            return scan_full(root, conn, on_album=on_album, measure_pictures=measure_pictures)
        result = validate(root, conn, measure_pictures=measure_pictures, on_album=on_album)
        if result:
            log.info(
                "library cache: validated (added=%d removed=%d updated=%d)",
                result.added,
                result.removed,
                result.updated,
            )
        return load(root, conn)
    finally:
        conn.close()

scan_full(root, conn, *, on_album=None, measure_pictures=False)

Walk root, audit, and write the full result to the index DB.

Used on cold start when the DB is empty and after a startScan. Wipes every album/track/genre/warning row first so the DB matches the filesystem exactly. Returns the same LibraryIndex that callers used to get from scan() + audit().

Source code in src/maneki/audio/library/scan.py
def scan_full(
    root: Path,
    conn: sqlite3.Connection,
    *,
    on_album: ScanProgressCallback | None = None,
    measure_pictures: bool = False,
) -> LibraryIndex:
    """Walk `root`, audit, and write the full result to the index DB.

    Used on cold start when the DB is empty and after a `startScan`. Wipes
    every album/track/genre/warning row first so the DB matches the
    filesystem exactly. Returns the same `LibraryIndex` that callers used
    to get from `scan()` + `audit()`.
    """
    # Imported here to avoid a circular import: audit.py depends on scan.py
    # for `_split_dir_year`, so we can't import it at module scope.
    from maneki.audio.library.audit import audit

    index = scan(root, on_album=on_album, measure_pictures=measure_pictures)
    audit(index)
    write_index(conn, root, index)
    return index

validate(root, conn, *, measure_pictures=False, on_album=None)

Diff the filesystem against DB rows and apply add/remove/update deltas.

Catches changes that happened while no serve/watcher was running: new albums dropped in, removed albums, tag edits applied with another tool. Each affected album is re-scanned in full and re-audited; rows for vanished albums are dropped.

Returns a ValidationResult so callers can log a one-line summary.

Source code in src/maneki/audio/library/scan.py
def validate(
    root: Path,
    conn: sqlite3.Connection,
    *,
    measure_pictures: bool = False,
    on_album: ScanProgressCallback | None = None,
) -> "ValidationResult":
    """Diff the filesystem against DB rows and apply add/remove/update deltas.

    Catches changes that happened while no `serve`/watcher was running:
    new albums dropped in, removed albums, tag edits applied with another
    tool. Each affected album is re-scanned in full and re-audited; rows
    for vanished albums are dropped.

    Returns a `ValidationResult` so callers can log a one-line summary.
    """
    fs_album_dirs = {p.resolve() for p in _iter_album_dirs(root)}

    db_track_rows = list(conn.execute("SELECT id, album_id, rel_path, file_mtime, file_size FROM tracks"))
    db_album_rows = list(conn.execute("SELECT id, rel_path FROM albums"))

    db_track_keys = {row["rel_path"]: row for row in db_track_rows}
    db_album_dirs = {(root / row["rel_path"]).resolve(): row for row in db_album_rows}

    affected: set[Path] = set()

    # Albums that vanished entirely → row deletion only, no rescan.
    for db_dir, _row in db_album_dirs.items():
        if db_dir not in fs_album_dirs:
            affected.add(db_dir)

    # New albums on disk that the DB doesn't know about.
    for fs_dir in fs_album_dirs:
        if fs_dir not in db_album_dirs:
            affected.add(fs_dir)

    # For album dirs the DB and FS both know, find tag-edit / file-add /
    # file-remove deltas at the track level.
    fs_audio_by_dir: dict[Path, set[Path]] = {}
    for fs_dir in fs_album_dirs & set(db_album_dirs):
        try:
            fs_audio_by_dir[fs_dir] = {
                p.resolve() for p in fs_dir.iterdir() if p.is_file() and p.suffix.lower() in SUPPORTED_AUDIO_EXTS
            }
        except OSError:
            affected.add(fs_dir)
            continue

    # Build a per-album view of DB tracks for the dirs we still care about.
    db_audio_by_dir: dict[Path, dict[Path, "_TrackRow"]] = {}
    for rel, row in db_track_keys.items():
        abs_path = (root / rel).resolve()
        parent = abs_path.parent
        if parent not in fs_audio_by_dir:
            continue
        db_audio_by_dir.setdefault(parent, {})[abs_path] = row

    for fs_dir, fs_files in fs_audio_by_dir.items():
        db_files = db_audio_by_dir.get(fs_dir, {})
        if set(fs_files) != set(db_files):
            affected.add(fs_dir)
            continue
        for fs_file in fs_files:
            row = db_files[fs_file]
            mtime, size = _safe_stat(fs_file)
            if mtime != row["file_mtime"] or size != row["file_size"]:
                affected.add(fs_dir)
                break

    if not affected:
        return ValidationResult(added=0, removed=0, updated=0)

    return rescan_albums(
        root,
        conn,
        affected,
        measure_pictures=measure_pictures,
        on_album=on_album,
        _db_album_dirs=db_album_dirs,
    )

rescan_albums(root, conn, album_dirs, *, measure_pictures=False, on_album=None, _db_album_dirs=None)

Re-scan + re-audit each album dir; drop rows for any that vanished.

Reusable by the cold-start validate() pass and (in PR 2) the filesystem watcher. The DB is updated under one transaction so a crash mid-rescan can't half-apply changes.

Source code in src/maneki/audio/library/scan.py
def rescan_albums(
    root: Path,
    conn: sqlite3.Connection,
    album_dirs: Iterable[Path],
    *,
    measure_pictures: bool = False,
    on_album: ScanProgressCallback | None = None,
    _db_album_dirs: dict[Path, "_AlbumRow"] | None = None,
) -> "ValidationResult":
    """Re-scan + re-audit each album dir; drop rows for any that vanished.

    Reusable by the cold-start `validate()` pass and (in PR 2) the
    filesystem watcher. The DB is updated under one transaction so a
    crash mid-rescan can't half-apply changes.
    """
    from maneki.audio.library.audit import audit_album

    dirs = sorted({p.resolve() for p in album_dirs})
    if _db_album_dirs is None:
        _db_album_dirs = {
            (root / row["rel_path"]).resolve(): row for row in conn.execute("SELECT id, rel_path FROM albums")
        }

    now = time.time()
    root_abs = root.resolve()
    added = removed = updated = 0
    total = len(dirs)

    conn.execute("BEGIN IMMEDIATE")
    try:
        for idx, album_dir in enumerate(dirs, start=1):
            if on_album is not None:
                on_album(album_dir, idx, total)

            existing_row = _db_album_dirs.get(album_dir)
            if existing_row is not None:
                conn.execute("DELETE FROM albums WHERE id = ?", (existing_row["id"],))

            if not album_dir.is_dir():
                if existing_row is not None:
                    removed += 1
                continue

            album = _scan_album(album_dir, measure_pictures=measure_pictures)
            if album.track_count == 0 and existing_row is None:
                # Empty dir that never had a row — nothing to do.
                continue
            audit_album(album)
            new_album_id = _insert_album(conn, root_abs, album, now)
            for track in album.tracks:
                track_id = _insert_track(conn, new_album_id, root_abs, track, now)
                for genre in track.genres:
                    conn.execute(
                        "INSERT OR IGNORE INTO track_genres(track_id, genre) VALUES (?, ?)",
                        (track_id, genre),
                    )
            for warning in album.warnings:
                conn.execute(
                    "INSERT OR IGNORE INTO album_warnings(album_id, warning) VALUES (?, ?)",
                    (new_album_id, warning),
                )

            if existing_row is None:
                added += 1
            else:
                updated += 1
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise

    # Reclaim pages freed by the per-album DELETE+INSERT cycle. Cheap
    # when there's nothing to free; matters across many rescans.
    if removed or updated:
        from maneki.audio.library.db import reclaim_freelist

        reclaim_freelist(conn)

    return ValidationResult(added=added, removed=removed, updated=updated)

maneki.audio.serve

FastAPI factory + auth + config for the Subsonic-compatible HTTP server.

serve

Subsonic-compatible HTTP server for the converted maneki library.

maneki serve [DIR] launches a FastAPI app that exposes the library via the Subsonic API (v1.16.1). Any Subsonic client (Symfonium, play:Sub, Feishin, DSub, etc.) can browse, search, and stream from it.

Classes

ServeConfig

Bases: BaseModel

Resolved server credentials + scrobble settings. Plain text — local-self-hosted.

Source code in src/maneki/audio/serve/config.py
class ServeConfig(BaseModel):
    """Resolved server credentials + scrobble settings. Plain text — local-self-hosted."""

    username: str
    password: str
    scrobble: ScrobbleConfig = ScrobbleConfig()

Functions

create_app(*, root, cfg, use_cache=True)

Build the FastAPI app for root with the given credentials.

use_cache=False disables the persistent <root>/.maneki/index.db and falls back to in-memory scan on every rebuild.

The server exposes only the Subsonic /rest/* surface; the embedded /web browser UI was removed in 0.20.4 in favour of the standalone maneki ui command, which serves the same SPA against any Subsonic server without needing one running in-process.

Source code in src/maneki/audio/serve/app.py
def create_app(*, root: Path, cfg: ServeConfig, use_cache: bool = True) -> FastAPI:
    """Build the FastAPI app for `root` with the given credentials.

    `use_cache=False` disables the persistent `<root>/.maneki/index.db`
    and falls back to in-memory scan on every rebuild.

    The server exposes only the Subsonic `/rest/*` surface; the embedded
    `/web` browser UI was removed in 0.20.4 in favour of the
    standalone `maneki ui` command, which serves the same SPA against
    any Subsonic server without needing one running in-process.
    """
    app = FastAPI(
        # The Swagger / OpenAPI title is user-facing — match the branded
        # name used in the desktop wrappers and the HTML landing page.
        # (The Subsonic-spec `type` field stays lowercase "maneki" in
        # API payloads; that's a machine identifier.)
        title="Maneki",
        description=(
            "Subsonic-compatible HTTP API for a Maneki library. "
            "Every endpoint accepts the Subsonic salted-token query params "
            "(`?u=&t=&s=&v=&c=`); Swagger's *Try it out* form is read-only "
            "without those, but the schema itself is fully populated."
        ),
        version=SERVER_VERSION,
        docs_url="/docs",
        redoc_url=None,
        lifespan=_lifespan,
    )
    app.state.root = root
    app.state.cfg = cfg
    app.state.cache = IndexCache(root, use_cache=use_cache)
    # Stars / favourites — separate file from the index DB (which is
    # fully derived and gets wiped on schema bumps). User data lives at
    # `<root>/.maneki/stars.toml`; survives `library index drop`.
    from maneki.audio.serve.stars import StarStore

    app.state.stars = StarStore.for_root(root)

    # Scrobble forwarder — only spun up when `[scrobble.webhook]` or
    # `[scrobble.mqtt]` is in serve.toml. The dispatcher's `dispatch()`
    # is a no-op when both are unset, so keeping it on `app.state` even
    # in the disabled case keeps the endpoint code branchless.
    from maneki.audio.serve.scrobble import ScrobbleDispatcher

    app.state.scrobble = ScrobbleDispatcher(cfg.scrobble)

    # Middleware order is REVERSE of registration — last add_middleware
    # is the outermost wrap. We need:
    #   request -> CORS -> PostFormToQuery -> SubsonicFormat -> route
    # so that:
    #   - CORS handles cross-origin preflights + appends headers to
    #     responses. Required for `maneki ui` and the Maneki
    #     desktop wrappers (Tauri / Electron) — they load from a
    #     `tauri://` / `file://` / `http://localhost:1888` origin and
    #     fetch `http://server/rest/*` cross-origin; without CORS
    #     headers the browser blocks the response from JS access even
    #     though the server returned 200.
    #   - PostForm merges form-body credentials so play:Sub works.
    #   - SubsonicFormat converts JSON responses to XML when needed.
    #
    # The session + session-to-query middleware pair used to live here
    # for the embedded `/web` UI (cookies → `<audio src=/rest/stream>`
    # auth). Both went away with `/web` itself in 0.20.4 — Subsonic
    # `/rest/*` only ever needs salted-token auth from query params.
    app.add_middleware(SubsonicFormatMiddleware)
    app.add_middleware(PostFormToQueryMiddleware)
    # Access log — one structured line per HTTP request, with the
    # canonical Apache combined fields (client, user, request line,
    # status, bytes, referer, user_agent) plus a duration_ms for
    # easy slow-endpoint spotting. Outputs through the same
    # structlog handler `configure_logging()` installed, so JSON
    # mode produces one record per request that log shippers can
    # pivot on without regex parsing.
    app.add_middleware(make_access_log_middleware("maneki.audio.serve.access"))
    # CORS — outermost. We allow any origin because the Subsonic auth
    # token (`?u=&t=&s=`) is the security boundary, not the request
    # origin. This lets `maneki ui` (and the desktop wrappers) talk
    # to a remote maneki serve from their own origin without the
    # webview blocking the response. Other Subsonic clients (Symfonium
    # / Amperfy / play:Sub) aren't browsers and don't care about CORS
    # either way; this is purely additive.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=False,
        allow_methods=["*"],
        allow_headers=["*"],
        expose_headers=["*"],
    )

    async def require_auth(
        request: Request,
        u: str | None = Query(default=None, include_in_schema=False),
        p: str | None = Query(default=None, include_in_schema=False),
        t: str | None = Query(default=None, include_in_schema=False),
        s: str | None = Query(default=None, include_in_schema=False),
    ) -> None:
        """FastAPI dependency that enforces Subsonic auth on every endpoint.

        Two auth modes are accepted, both per the Subsonic spec:

          - **Plain**: `?u=<user>&p=<pass>`
          - **Token**: `?u=<user>&t=<md5(pass+salt)>&s=<salt>`

        The `v=` (client API version) and `c=` (client name) params are also
        part of the spec but not enforced here — clients send them for
        compatibility but we don't gate on them.

        These four params are hidden from the OpenAPI schema
        (`include_in_schema=False`). They apply identically to every
        endpoint and would otherwise drown each per-endpoint param list
        in boilerplate. Subsonic's multi-field auth (with optional
        `t`+`s` token pair) doesn't model cleanly as an OpenAPI security
        scheme, so the contract lives in this docstring + the app
        `description` instead.
        """
        del request
        try:
            verify(cfg, user=u, password=p, token=t, salt=s)
        except AuthError as exc:
            raise _SubsonicAuthError(str(exc)) from exc

    app.state.require_auth = require_auth

    @app.exception_handler(_SubsonicAuthError)
    async def auth_exception_handler(_request: Request, exc: _SubsonicAuthError) -> JSONResponse:
        return JSONResponse(error_envelope(40, str(exc)))

    # Mount endpoint groups. Imports happen lazily to keep the module graph
    # shallow and to avoid pulling FastAPI into pure-data modules.
    from maneki.audio.serve.endpoints.browsing import router as browsing_router
    from maneki.audio.serve.endpoints.extras import router as extras_router
    from maneki.audio.serve.endpoints.lyrics import router as lyrics_router
    from maneki.audio.serve.endpoints.media import router as media_router
    from maneki.audio.serve.endpoints.radio import router as radio_router
    from maneki.audio.serve.endpoints.scan import router as scan_router
    from maneki.audio.serve.endpoints.search import router as search_router
    from maneki.audio.serve.endpoints.stubs import router as stubs_router
    from maneki.audio.serve.endpoints.system import router as system_router

    auth_dep = [Depends(require_auth)]
    # Tags group endpoints in `/docs` (Swagger UI) — without them every
    # route lands under "default" and the page is one long flat list.
    # The grouping mirrors the module layout (`endpoints/<category>.py`)
    # so a request from a client maps directly to its source file.
    app.include_router(system_router, prefix="/rest", dependencies=auth_dep, tags=["system"])
    app.include_router(browsing_router, prefix="/rest", dependencies=auth_dep, tags=["browsing"])
    app.include_router(scan_router, prefix="/rest", dependencies=auth_dep, tags=["scan"])
    app.include_router(media_router, prefix="/rest", dependencies=auth_dep, tags=["media"])
    app.include_router(search_router, prefix="/rest", dependencies=auth_dep, tags=["search"])
    app.include_router(extras_router, prefix="/rest", dependencies=auth_dep, tags=["extras"])
    app.include_router(lyrics_router, prefix="/rest", dependencies=auth_dep, tags=["lyrics"])
    app.include_router(radio_router, prefix="/rest", dependencies=auth_dep, tags=["radio"])
    app.include_router(stubs_router, prefix="/rest", dependencies=auth_dep, tags=["stubs"])

    # Root probe — JSON only. Browsers visiting `/` get the same
    # introspection payload Subsonic clients hit on pre-login: server
    # name, version, and a pointer to the Subsonic API surface. The
    # browser UI is served by `maneki ui` (a separate static-file
    # server); this server is pure Subsonic now.
    @app.get("/", response_class=HTMLResponse)
    async def server_info(request: Request) -> HTMLResponse:
        # Tiny HTML landing page — clickable links to `/docs` and the
        # `/rest/` Subsonic surface from any browser that hits the root.
        # Subsonic clients (Symfonium, Amperfy, play:Sub) hit `/rest/ping`
        # directly so they never see this page; this is purely for the
        # human who paste the host into Safari to confirm the server is
        # reachable. ReDoc is disabled — Swagger's enough.
        base = str(request.base_url).rstrip("/")
        # `SERVER_NAME` ("maneki") stays the lowercase Subsonic API
        # identifier — it appears in `{"type": "maneki"}` payloads and
        # is what other clients match on. The user-facing brand in the
        # UI is "Maneki" (matches the desktop wrappers' window title
        # and the project README). Hardcode rather than `.title()` so
        # future renames stay explicit.
        display_name = "Maneki"
        body = (
            "<!doctype html><html><head><meta charset=utf-8>"
            f"<title>{display_name} {SERVER_VERSION}</title>"
            "<style>"
            "body{font-family:-apple-system,BlinkMacSystemFont,sans-serif;"
            "background:#1a1b26;color:#a9b1d6;max-width:42rem;margin:3rem auto;"
            "padding:0 1.5rem;line-height:1.6}"
            "h1{color:#7aa2f7;font-weight:600;margin-bottom:0.2rem}"
            ".sub{color:#565f89;font-size:0.95rem;margin-bottom:1.5rem}"
            "a{color:#7dcfff}"
            "code{background:#24283b;padding:0.1rem 0.4rem;border-radius:3px;"
            "font-size:0.9em;color:#bb9af7}"
            "ul{padding-left:1.2rem}"
            "</style></head><body>"
            f"<h1>{display_name}</h1>"
            f'<p class="sub">{SERVER_VERSION} · Subsonic-compatible</p>'
            f'<ul><li><a href="{base}/docs">/docs</a> — OpenAPI schema (Swagger UI)</li>'
            f"<li><code>{base}/rest/</code> — Subsonic API root "
            "(connect any Subsonic client here)</li></ul>"
            "</body></html>"
        )
        return HTMLResponse(body)

    return app

resolve_credentials(*, cli_user, cli_password)

CLI flags win over env vars / TOML. Falls back to admin/admin when nothing is set.

Returns (cfg, used_defaults) so the caller can warn the user when the insecure defaults are in play. Reads the consolidated maneki.settings (which falls back to the legacy serve.toml for one release cycle). The primary admin account backs the single-user ServeConfig the Subsonic + native auth paths consume; full multi-user enforcement layers on top.

Import is function-local: maneki.settings imports back into maneki.audio.config, so a module-level import here would cycle.

Source code in src/maneki/audio/serve/config.py
def resolve_credentials(*, cli_user: str | None, cli_password: str | None) -> tuple[ServeConfig, bool]:
    """CLI flags win over env vars / TOML. Falls back to admin/admin when nothing is set.

    Returns `(cfg, used_defaults)` so the caller can warn the user when the
    insecure defaults are in play. Reads the consolidated `maneki.settings`
    (which falls back to the legacy `serve.toml` for one release cycle). The
    primary admin account backs the single-user `ServeConfig` the Subsonic +
    native auth paths consume; full multi-user enforcement layers on top.

    Import is function-local: `maneki.settings` imports back into
    `maneki.audio.config`, so a module-level import here would cycle.
    """
    from maneki.settings import get_settings

    settings = get_settings()
    admin = settings.primary_admin()
    username = cli_user or admin.name
    password = cli_password or admin.password
    used_defaults = username == DEFAULT_USERNAME and password == DEFAULT_PASSWORD
    return ServeConfig(username=username, password=password, scrobble=settings.server.scrobble), used_defaults

maneki.audio.naming

Filesystem-safe folder + filename builders.

naming

Filesystem-safe name building for artist / album / track output paths.

Attributes

VARIOUS_ARTISTS = 'Various Artists' module-attribute

Functions

artist_folder(album_artist, fallback_artist, *, is_compilation=False)

Folder name for the artist level. Maps VA / compilation albums to Various Artists.

Three triggers route to the canonical Various Artists folder: - album_artist tag is a VA alias (VA, V.A., Various, …) - fallback_artist (per-track majority) is itself a VA alias — some rips stamp VA as the per-track artist and leave album_artist empty - is_compilation is True (album-level signal: distinct per-track artists with no shared album_artist tag, e.g. an MP3 mix labelled only by filename)

Source code in src/maneki/audio/naming.py
def artist_folder(album_artist: str | None, fallback_artist: str | None, *, is_compilation: bool = False) -> str:
    """Folder name for the artist level. Maps VA / compilation albums to `Various Artists`.

    Three triggers route to the canonical `Various Artists` folder:
    - `album_artist` tag is a VA alias (`VA`, `V.A.`, `Various`, …)
    - `fallback_artist` (per-track majority) is itself a VA alias — some rips
      stamp `VA` as the per-track artist and leave `album_artist` empty
    - `is_compilation` is True (album-level signal: distinct per-track artists
      with no shared `album_artist` tag, e.g. an MP3 mix labelled only by
      filename)
    """
    if is_compilation or is_various_artists(album_artist) or is_various_artists(fallback_artist):
        return VARIOUS_ARTISTS
    name = (album_artist or "").strip() or (fallback_artist or "").strip() or "Unknown Artist"
    return sanitize_component(name)

album_folder(album, year)

Folder name for the album level.

Format: YYYY - Album so directory listings inside an artist folder sort chronologically. Year is omitted if unknown, falling back to just Album. A year that's part of the album title (e.g. Vocal Trance Hits 2024, Taylor Swift's 1989) is intentionally left in place — it's the actual title.

Source code in src/maneki/audio/naming.py
def album_folder(album: str | None, year: str | int | None) -> str:
    """Folder name for the album level.

    Format: `YYYY - Album` so directory listings inside an artist folder sort
    chronologically. Year is omitted if unknown, falling back to just `Album`.
    A year that's part of the album title (e.g. `Vocal Trance Hits 2024`,
    Taylor Swift's `1989`) is intentionally left in place — it's the actual
    title.
    """
    base = (album or "").strip() or "Unknown Album"
    year_str = _coerce_year(year)
    full = f"{year_str} - {base}" if year_str else base
    return sanitize_component(full)

track_filename(track_no, title, *, artist=None, disc_no=None, disc_total=None, track_total=None, extension='.m4a')

Output filename for a single track.

Default format: 01 - Title<ext>. When the album spans multiple discs (disc_total > 1), the disc number is prefixed: 01-01 - Title<ext>. When artist is provided (typically only for compilations / VA albums) it is inserted between track number and title: 01-05 - Artist - Title<ext>.

Track-number width grows with track_total so albums with ≥100 tracks sort alphabetically correctly: a 100-track album yields 001, 002, 010, 099, 100 instead of breaking at the ⅔-digit boundary. Disc-number width is fixed at 2 (no realistic disc count needs more).

Source code in src/maneki/audio/naming.py
def track_filename(
    track_no: int | None,
    title: str | None,
    *,
    artist: str | None = None,
    disc_no: int | None = None,
    disc_total: int | None = None,
    track_total: int | None = None,
    extension: str = ".m4a",
) -> str:
    """Output filename for a single track.

    Default format: `01 - Title<ext>`. When the album spans multiple discs
    (`disc_total > 1`), the disc number is prefixed: `01-01 - Title<ext>`.
    When `artist` is provided (typically only for compilations / VA albums)
    it is inserted between track number and title: `01-05 - Artist - Title<ext>`.

    Track-number width grows with `track_total` so albums with ≥100 tracks
    sort alphabetically correctly: a 100-track album yields `001`, `002`,
    `010`, `099`, `100` instead of breaking at the 2/3-digit boundary.
    Disc-number width is fixed at 2 (no realistic disc count needs more).
    """
    width = 3 if (track_total or 0) >= 100 else 2
    track_str = f"{track_no:0{width}d}" if track_no else "0" * width
    title_str = (title or "").strip() or "Untitled"
    title_str = sanitize_component(title_str)
    if disc_total and disc_total > 1 and disc_no:
        prefix = f"{disc_no:02d}-{track_str}"
    else:
        prefix = track_str
    ext = extension if extension.startswith(".") else f".{extension}"
    if artist:
        artist_str = sanitize_component(artist.strip())
        return f"{prefix} - {artist_str} - {title_str}{ext.lower()}"
    return f"{prefix} - {title_str}{ext.lower()}"

clean_folder_album_name(name)

Strip codec/quality tags + edition annotations + extract year.

Returns (cleaned_album_name, year_or_None). Used as a fallback when an album has no ALBUM tag and we have to lean on the folder name.

Strip order
  1. Edition annotations ((Deluxe Edition), [Remastered], (2018 Reissue), (40th Anniversary Edition)) — these would otherwise leak into _FOLDER_YEAR_RE and pollute the year pick.
  2. Year extraction from any remaining (YYYY) / bare digits.
  3. Codec/quality tags ([FLAC], [16Bit-44.1kHz]).
  4. VA - / Various - prefix.

Live annotations ((Live), (Live in Madrid 2019)) are kept — a live album is a distinct work from its studio counterpart and the audience expects to see it labeled.

Source code in src/maneki/audio/naming.py
def clean_folder_album_name(name: str) -> tuple[str, str | None]:
    """Strip codec/quality tags + edition annotations + extract year.

    Returns `(cleaned_album_name, year_or_None)`. Used as a fallback when an
    album has no `ALBUM` tag and we have to lean on the folder name.

    Strip order:
      1. Edition annotations (`(Deluxe Edition)`, `[Remastered]`, `(2018
         Reissue)`, `(40th Anniversary Edition)`) — these would otherwise
         leak into `_FOLDER_YEAR_RE` and pollute the year pick.
      2. Year extraction from any remaining `(YYYY)` / bare digits.
      3. Codec/quality tags (`[FLAC]`, `[16Bit-44.1kHz]`).
      4. `VA -` / `Various -` prefix.

    Live annotations (`(Live)`, `(Live in Madrid 2019)`) are kept — a live
    album is a distinct work from its studio counterpart and the audience
    expects to see it labeled.
    """
    cleaned = _FOLDER_EDITION_RE.sub(" ", name)
    year_match = _FOLDER_YEAR_RE.search(cleaned)
    year = year_match.group(1) if year_match else None
    if year_match:
        cleaned = cleaned.replace(year_match.group(0), " ")
    cleaned = _FOLDER_TAG_RE.sub(" ", cleaned)
    cleaned = _VA_PREFIX_RE.sub("", cleaned)
    cleaned = re.sub(r"\s+", " ", cleaned).strip(" -_")
    return cleaned or name, year

leading_year_from_folder(name)

Return the 4-digit year iff name starts with one followed by a separator.

Used by the convert pipeline to override reissue years that survive in track tags when the input dir is hand-named with the original year (e.g. 1983. Album! [2018 Reissue] should yield 1983, not 2018).

Source code in src/maneki/audio/naming.py
def leading_year_from_folder(name: str | None) -> str | None:
    """Return the 4-digit year iff `name` starts with one followed by a separator.

    Used by the convert pipeline to override reissue years that survive in
    track tags when the input dir is hand-named with the original year (e.g.
    `1983. Album! [2018 Reissue]` should yield 1983, not 2018).
    """
    if not name:
        return None
    match = _LEADING_FOLDER_YEAR_RE.match(name)
    return match.group(1) if match else None

is_various_artists(album_artist)

Return True if album_artist indicates a Various-Artists compilation.

Source code in src/maneki/audio/naming.py
def is_various_artists(album_artist: str | None) -> bool:
    """Return True if `album_artist` indicates a Various-Artists compilation."""
    if not album_artist:
        return False
    return album_artist.strip().casefold() in _VA_ALIASES

sanitize_component(value)

Make value safe to use as a single path component on any OS.

Replaces forbidden characters, collapses whitespace, NFC-normalizes unicode, strips trailing dots/spaces, and caps the encoded length at 180 bytes.

Source code in src/maneki/audio/naming.py
def sanitize_component(value: str) -> str:
    """Make `value` safe to use as a single path component on any OS.

    Replaces forbidden characters, collapses whitespace, NFC-normalizes unicode,
    strips trailing dots/spaces, and caps the encoded length at 180 bytes.
    """
    cleaned = unicodedata.normalize("NFC", value).translate(_BAD_CHARS)
    cleaned = _WHITESPACE_RE.sub(" ", cleaned).strip()
    cleaned = _TRAILING_BAD_RE.sub("", cleaned)
    if not cleaned:
        cleaned = "Unknown"
    encoded = cleaned.encode("utf-8")
    if len(encoded) > _MAX_COMPONENT_BYTES:
        # Truncate on a unicode-safe boundary by progressively decoding.
        cleaned = encoded[:_MAX_COMPONENT_BYTES].decode("utf-8", errors="ignore").rstrip()
    return cleaned

maneki.audio.cover

Cover-art candidates, picker, normaliser.

cover

Locate, normalize, and embed album cover art.

Attributes

DEFAULT_MAX_EDGE = 1000 module-attribute

Classes

CoverCandidate

Bases: BaseModel

A candidate cover image, before normalization.

Source code in src/maneki/audio/cover.py
class CoverCandidate(BaseModel):
    """A candidate cover image, before normalization."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    source: CoverSource
    data: bytes
    mime: str | None = None
    width: int = 0
    height: int = 0
    label: str = ""

    @property
    def pixels(self) -> int:
        return self.width * self.height

    @property
    def size_bytes(self) -> int:
        return len(self.data)

CoverSource

Bases: str, Enum

Where the cover came from. Used for reporting + tie-breaking under --enrich.

Source code in src/maneki/audio/cover.py
class CoverSource(str, Enum):
    """Where the cover came from. Used for reporting + tie-breaking under --enrich."""

    EMBEDDED = "embedded"
    FOLDER = "folder"
    ONLINE = "online"

Cover

Bases: BaseModel

A normalized album cover ready to embed into every track of an album.

Source code in src/maneki/audio/cover.py
class Cover(BaseModel):
    """A normalized album cover ready to embed into every track of an album."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    data: bytes
    mime: str  # "image/jpeg" or "image/png"
    width: int
    height: int
    source: CoverSource
    label: str

Functions

collect_candidates(album_dir, tracks)

Gather every plausible cover candidate for an album (offline only).

Source code in src/maneki/audio/cover.py
def collect_candidates(album_dir: Path, tracks: list[SourceTrack]) -> list[CoverCandidate]:
    """Gather every plausible cover candidate for an album (offline only)."""
    candidates: list[CoverCandidate] = []

    # Collect every distinct embedded picture across the album. Most albums
    # carry the same cover on every track, but mixed rips occasionally have
    # one track with corrected/larger artwork — let the picker compare them
    # all and pick the highest-area image. Drop unparseable bytes here so
    # they never reach `normalize()`.
    seen_embedded: set[bytes] = set()
    for track in tracks:
        if not track.embedded_picture:
            continue
        if track.embedded_picture in seen_embedded:
            continue
        seen_embedded.add(track.embedded_picture)
        width, height = _measure(track.embedded_picture)
        if width == 0 or height == 0:
            continue
        candidates.append(
            CoverCandidate(
                source=CoverSource.EMBEDDED,
                data=track.embedded_picture,
                mime=track.embedded_picture_mime,
                width=width,
                height=height,
                label=f"embedded in {track.path.name}",
            )
        )

    # Search the album anchor AND every disc subfolder for folder images.
    # For bare-leading multi-disc layouts the anchor IS the wrapper; for
    # shared-prefix layouts (`Album (CD1)`) the anchor is the first disc
    # folder, so the other disc subfolders need scanning too — and the
    # parent wrapper might carry a top-level `folder.jpg` shared across
    # both discs (anchor.parent).
    seen_dirs: set[Path] = {album_dir}
    search_dirs: list[Path] = [album_dir]
    if _looks_like_disc_anchor(album_dir):
        # Anchor is `<wrapper>/<Album (CD1)>` — also scan `<wrapper>` for the
        # shared cover that lives at the wrapper level.
        parent = album_dir.parent
        if parent not in seen_dirs and parent != album_dir:
            seen_dirs.add(parent)
            search_dirs.append(parent)
    for track in tracks:
        if track.path.parent not in seen_dirs:
            seen_dirs.add(track.path.parent)
            search_dirs.append(track.path.parent)

    seen_image_paths: set[Path] = set()
    for folder_dir in search_dirs:
        for path in _find_folder_images(folder_dir):
            if path in seen_image_paths:
                continue
            seen_image_paths.add(path)
            try:
                data = path.read_bytes()
            except OSError:
                continue
            width, height = _measure(data)
            if width == 0 or height == 0:
                # Pillow couldn't decode — image is corrupt or not actually
                # an image despite the extension. Skip it so it can't be
                # picked, normalised, and crash the album later.
                continue
            candidates.append(
                CoverCandidate(
                    source=CoverSource.FOLDER,
                    data=data,
                    mime=_guess_mime(path.suffix),
                    width=width,
                    height=height,
                    label=path.name,
                )
            )

    return candidates

pick_best(candidates)

Pick the highest-quality candidate.

"Quality" = pixel area first, then file size, then source order (online > folder > embedded). The source-order tiebreaker matters under --enrich so that an online provider returning the same dimensions as a 600×600 scanned folder.jpg still wins.

Source code in src/maneki/audio/cover.py
def pick_best(candidates: Iterable[CoverCandidate]) -> CoverCandidate | None:
    """Pick the highest-quality candidate.

    "Quality" = pixel area first, then file size, then source order
    (online > folder > embedded). The source-order tiebreaker matters under
    `--enrich` so that an online provider returning the same dimensions as
    a 600×600 scanned folder.jpg still wins.
    """
    source_order = {CoverSource.ONLINE: 2, CoverSource.FOLDER: 1, CoverSource.EMBEDDED: 0}
    best: CoverCandidate | None = None
    for candidate in candidates:
        if best is None:
            best = candidate
            continue
        if candidate.pixels > best.pixels:
            best = candidate
        elif candidate.pixels == best.pixels and candidate.size_bytes > best.size_bytes:
            best = candidate
        elif (
            candidate.pixels == best.pixels
            and candidate.size_bytes == best.size_bytes
            and source_order[candidate.source] > source_order[best.source]
        ):
            best = candidate
    return best

normalize(candidate, *, max_edge=DEFAULT_MAX_EDGE)

Decode + recompress the chosen candidate.

Output is JPEG ≤ max_edge px on the long side, RGB, quality 92 — except for PNGs that already fit, which are passed through unchanged.

Source code in src/maneki/audio/cover.py
def normalize(candidate: CoverCandidate, *, max_edge: int = DEFAULT_MAX_EDGE) -> Cover:
    """Decode + recompress the chosen candidate.

    Output is JPEG ≤ `max_edge` px on the long side, RGB, quality 92 — except
    for PNGs that already fit, which are passed through unchanged.
    """
    opened = Image.open(io.BytesIO(candidate.data))
    opened.load()
    width, height = opened.size
    long_edge = max(width, height)
    needs_resize = long_edge > max_edge
    is_png = (candidate.mime or "").lower().endswith("png") or opened.format == "PNG"

    if not needs_resize and is_png:
        return Cover(
            data=candidate.data,
            mime="image/png",
            width=width,
            height=height,
            source=candidate.source,
            label=candidate.label,
        )

    image: Image.Image = opened
    if needs_resize:
        scale = max_edge / long_edge
        image = image.resize((int(width * scale), int(height * scale)), Image.Resampling.LANCZOS)
        width, height = image.size

    if image.mode not in ("RGB", "L"):
        image = image.convert("RGB")

    buffer = io.BytesIO()
    image.save(buffer, format="JPEG", quality=_JPEG_QUALITY, optimize=True, progressive=True)
    return Cover(
        data=buffer.getvalue(),
        mime="image/jpeg",
        width=width,
        height=height,
        source=candidate.source,
        label=candidate.label,
    )