API Reference

Auto-generated from docstrings via mkdocstrings. The public surface is small — most users only touch the CLI — but if you're embedding musickit in another tool, these are the entry points.

musickit.metadata

Read source audio tags (FLAC / MP3 / generic) and write MP4 ALAC / AAC / MP3 tags.

metadata

Read source audio tags (FLAC / MP3 / generic) and write MP4 ALAC / AAC / MP3 tags.

Public API is split across submodules; this module re-exports the names the rest of the project (and tests) import from musickit.metadata.

Attributes

SUPPORTED_AUDIO_EXTS = frozenset({'.flac', '.mp3', '.m4a', '.m4b', '.mp4', '.aac', '.ogg', '.opus', '.wav', '.aiff', '.aif'}) module-attribute

Classes

SourceTrack

Bases: BaseModel

Tag bundle read from a single source audio file.

Source code in src/musickit/metadata/models.py
class SourceTrack(BaseModel):
    """Tag bundle read from a single source audio file."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    path: Path
    title: str | None = None
    artist: str | None = None
    album_artist: str | None = None
    album: str | None = None
    date: str | None = None
    genre: str | None = None
    # Multi-genre support — FLAC repeats GENRE, ID3 repeats TCON, MP4 atoms
    # only carry one. `genre` stays as the primary (backwards-compat) and
    # equals `genres[0]` when both are populated.
    genres: list[str] = Field(default_factory=list)
    track_no: int | None = None
    track_total: int | None = None
    disc_no: int | None = None
    disc_total: int | None = None
    bpm: int | None = None
    label: str | None = None
    catalog: str | None = None
    lyrics: str | None = None
    replaygain: dict[str, str] = Field(default_factory=dict)
    embedded_picture: bytes | None = None
    embedded_picture_mime: str | None = None
    embedded_picture_pixels: int = 0
    duration_s: float | None = None  # audio duration; used by dedup to discriminate same-tag distinct content
    # MusicBrainz recording MBID (per-track, distinct from the album-level
    # release MBID). Populated by the MB enrichment follow-up call when
    # `--enrich` is on. Picard convention: stored as `MusicBrainz Track Id`
    # on MP4 freeform / `MusicBrainz Recording Id` on ID3 TXXX.
    mb_recording_id: str | None = None

AlbumSummary

Bases: BaseModel

Album-level rollup derived by majority-vote across the album's tracks.

Source code in src/musickit/metadata/models.py
class AlbumSummary(BaseModel):
    """Album-level rollup derived by majority-vote across the album's tracks."""

    album: str | None = None
    album_artist: str | None = None
    artist_fallback: str | None = None
    year: str | None = None
    genre: str | None = None
    track_total: int | None = None
    disc_total: int | None = None
    is_compilation: bool = False
    label: str | None = None
    catalog: str | None = None

MusicBrainzIds

Bases: BaseModel

Album-level MusicBrainz IDs supplied by an --enrich provider.

Per-track recording MBIDs live on SourceTrack.mb_recording_id — they vary per track and don't belong on an album-scope object.

Source code in src/musickit/metadata/models.py
class MusicBrainzIds(BaseModel):
    """Album-level MusicBrainz IDs supplied by an --enrich provider.

    Per-track recording MBIDs live on `SourceTrack.mb_recording_id` —
    they vary per track and don't belong on an album-scope object.
    """

    album_id: str | None = None
    artist_id: str | None = None
    release_group_id: str | None = None

TagOverrides

Bases: BaseModel

Optional tag overrides applied in-place by apply_tag_overrides.

Each field is None to mean "leave the existing tag alone". Pass an empty string to clear a tag explicitly (rare; typically you just leave it).

Source code in src/musickit/metadata/models.py
class TagOverrides(BaseModel):
    """Optional tag overrides applied in-place by `apply_tag_overrides`.

    Each field is `None` to mean "leave the existing tag alone". Pass an empty
    string to *clear* a tag explicitly (rare; typically you just leave it).
    """

    title: str | None = None
    artist: str | None = None
    album_artist: str | None = None
    album: str | None = None
    year: str | None = None
    genre: str | None = None
    track_no: int | None = None
    track_total: int | None = None
    disc_no: int | None = None
    disc_total: int | None = None

    def is_empty(self) -> bool:
        return all(v is None for v in self.model_dump().values())

Functions

read_source(path, *, light=False, measure_pictures=False)

Read tags + embedded cover from a single audio file.

Source values that arrive entirely lowercase are smart-title-cased here so downstream filenames + tags display consistently. Anything with real casing (AC/DC, ABBA, iPhone, R.E.M.) is left alone.

light=True skips the two expensive operations the convert pipeline needs but the library scanner / TUI doesn't:

- Pillow decode of the embedded picture (for cover_pixels)
- A second mutagen open to read info.length (for duration_s)

has_cover still works in light mode (presence is checked without touching the bytes); only the pixel measurement is skipped.

measure_pictures=True re-enables the Pillow decode even under light=True, so audit modes that need low-res-cover detection can pay just that cost without also paying the duration probe.

Source code in src/musickit/metadata/read.py
def read_source(
    path: Path,
    *,
    light: bool = False,
    measure_pictures: bool = False,
) -> SourceTrack:
    """Read tags + embedded cover from a single audio file.

    Source values that arrive entirely lowercase are smart-title-cased here
    so downstream filenames + tags display consistently. Anything with real
    casing (`AC/DC`, `ABBA`, `iPhone`, `R.E.M.`) is left alone.

    `light=True` skips the two expensive operations the convert pipeline
    needs but the library scanner / TUI doesn't:
      - Pillow decode of the embedded picture (for `cover_pixels`)
      - A second mutagen open to read `info.length` (for `duration_s`)
    `has_cover` still works in light mode (presence is checked without
    touching the bytes); only the pixel measurement is skipped.

    `measure_pictures=True` re-enables the Pillow decode even under
    `light=True`, so audit modes that need low-res-cover detection can
    pay just that cost without also paying the duration probe.
    """
    suffix = path.suffix.lower()
    if suffix == ".flac":
        track = _read_flac(path, light=light, measure_pictures=measure_pictures)
    elif suffix == ".mp3":
        track = _read_mp3(path, light=light, measure_pictures=measure_pictures)
    elif suffix in (".m4a", ".mp4", ".m4b"):
        track = _read_mp4(path, light=light, measure_pictures=measure_pictures)
    else:
        track = _read_generic(path, light=light, measure_pictures=measure_pictures)
    track.title = smart_title_case(track.title)
    track.artist = smart_title_case(track.artist)
    track.album = smart_title_case(track.album)
    track.album_artist = smart_title_case(track.album_artist)
    return track

summarize_album(tracks)

Build an album-level summary by majority-vote across tracks.

For multi-disc albums the album-name vote is biased toward disc 1 — bonus discs often carry tags like Album (CD2) Live In ... that would otherwise win on count and produce a misleading combined name.

Source code in src/musickit/metadata/album.py
def summarize_album(tracks: list[SourceTrack]) -> AlbumSummary:
    """Build an album-level summary by majority-vote across `tracks`.

    For multi-disc albums the album-name vote is biased toward disc 1 — bonus
    discs often carry tags like `Album (CD2) Live In ...` that would otherwise
    win on count and produce a misleading combined name.
    """
    disc_one_tracks = [t for t in tracks if t.disc_no == 1]
    album_source = disc_one_tracks if disc_one_tracks else tracks
    # Album name should be unanimous within a real album. Require quorum so a
    # single stray tagged track (foreign album mixed into the rip) can't
    # impersonate the whole-album value when most tracks have no album tag.
    album = clean_album_title(_majority((t.album for t in album_source), quorum=True))
    album_artist = _majority(t.album_artist for t in tracks)
    year = _majority(t.date for t in tracks)
    genre = _majority(t.genre for t in tracks)
    label = _majority(t.label for t in tracks)
    catalog = _majority(t.catalog for t in tracks)

    artist_counts = Counter(t.artist for t in tracks if t.artist)
    distinct_artists = len(artist_counts)
    artist_fallback = artist_counts.most_common(1)[0][0] if artist_counts else None

    track_total = max((t.track_total or 0 for t in tracks), default=0) or len(tracks) or None
    disc_total = max((t.disc_total or 0 for t in tracks), default=0) or None

    # Compilation if: album_artist is a VA alias, the per-track artist majority
    # is itself a VA alias (rips that leave album_artist empty but stamp every
    # track artist as `VA`), or there's no album_artist + tracks span multiple
    # different artists.
    is_compilation = (
        is_various_artists(album_artist)
        or is_various_artists(artist_fallback)
        or (album_artist is None and distinct_artists > 1)
    )

    return AlbumSummary(
        album=album,
        album_artist=album_artist,
        artist_fallback=artist_fallback,
        year=year,
        genre=genre,
        track_total=track_total,
        disc_total=disc_total,
        is_compilation=is_compilation,
        label=label,
        catalog=catalog,
    )

clean_album_title(album)

Clean disc markers, scene-rip dot-separators, and VA - prefixes from an album tag.

Strips:

- trailing [CDx] / (Disc x) / - CD 1 / [CD.1] markers
- embedded (CDx) markers (Cranberries Roses (CD2) Live In Madrid shape)
- trailing (1) / (2) (bare-paren disc index, no keyword)
- dots / underscores used as word-separator instead of spaces (Absolute.Music.60, Absolute_Music_45 → Absolute Music 60/45); preserves single-letter acronyms like R.E.M.
- leading VA - / VA.-. / Various - prefixes once the dots are spaces

Source code in src/musickit/metadata/album.py
def clean_album_title(album: str | None) -> str | None:
    """Clean disc markers, scene-rip dot-separators, and `VA -` prefixes from an album tag.

    Strips:
    - trailing `[CDx]` / `(Disc x)` / ` - CD 1` / `[CD.1]` markers
    - embedded `(CDx)` markers (Cranberries `Roses (CD2) Live In Madrid` shape)
    - trailing `(1)` / `(2)` (bare-paren disc index, no keyword)
    - dots / underscores used as word-separator instead of spaces
      (`Absolute.Music.60`, `Absolute_Music_45` → `Absolute Music 60/45`);
      preserves single-letter acronyms like `R.E.M.`
    - leading `VA - ` / `VA.-.` / `Various -` prefixes once the dots are spaces
    """
    if not album:
        return album
    cleaned = album
    # Repeatedly strip trailing disc markers (handles `Album [CD1] (Deluxe)`).
    while True:
        stripped = _DISC_SUFFIX_RE.sub("", cleaned).strip(" -")
        if stripped == cleaned:
            break
        cleaned = stripped
    # Strip embedded `(CDx)` markers (Cranberries Roses-style: `Roses (CD2) Live In Madrid`).
    cleaned_mid = _DISC_KEYWORD_RE.sub(" ", cleaned)
    cleaned_mid = re.sub(r"\s+", " ", cleaned_mid).strip(" -")
    if cleaned_mid:
        cleaned = cleaned_mid
    # Last pass: strip trailing `(1)` / `(2)` etc. (disc number without a keyword).
    bare = _BARE_DISC_PAREN_RE.sub("", cleaned).strip(" -")
    if bare:
        cleaned = bare
    # Dots/underscores as separator: replace between multi-letter chunks with space.
    cleaned = _SCENE_DOT_SEP_RE.sub(" ", cleaned)
    cleaned = _SCENE_USCORE_SEP_RE.sub(" ", cleaned)
    # Strip leading VA prefix (now that dots are spaces, `VA.-.Foo` reads
    # as `VA - Foo` / `VA.-.Foo`; either way the prefix should go).
    cleaned = _VA_PREFIX_IN_ALBUM_RE.sub("", cleaned)
    cleaned = re.sub(r"\s+", " ", cleaned).strip(" -")
    return cleaned or album
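
The acronym-preserving dot cleanup hinges on requiring multi-letter chunks on both sides of the dot. A standalone regex sketch of that idea (the real _SCENE_DOT_SEP_RE may differ in detail):

```python
import re

# A dot counts as a word separator only when flanked by two word characters
# on each side, so single-letter acronyms (R.E.M.) keep their dots.
_DOT_SEP = re.compile(r"(?<=\w\w)\.(?=\w\w)")

def undot(album: str) -> str:
    return _DOT_SEP.sub(" ", album)
```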

write_tags(path, track, album, *, cover_bytes, cover_mime, musicbrainz=None)

Write the target tag set to path, dispatching by file extension.

Source code in src/musickit/metadata/write.py
def write_tags(
    path: Path,
    track: SourceTrack,
    album: AlbumSummary,
    *,
    cover_bytes: bytes | None,
    cover_mime: str | None,
    musicbrainz: MusicBrainzIds | None = None,
) -> None:
    """Write the target tag set to `path`, dispatching by file extension."""
    suffix = path.suffix.lower()
    if suffix == ".mp3":
        write_id3_tags(path, track, album, cover_bytes=cover_bytes, cover_mime=cover_mime, musicbrainz=musicbrainz)
    elif suffix in (".m4a", ".mp4", ".m4b"):
        write_mp4_tags(path, track, album, cover_bytes=cover_bytes, cover_mime=cover_mime, musicbrainz=musicbrainz)
    else:
        raise ValueError(f"unsupported output extension for tag writing: {suffix}")

write_mp4_tags(path, track, album, *, cover_bytes, cover_mime, musicbrainz=None)

Write the full target tag set to an existing ALAC/AAC .m4a file.

Source code in src/musickit/metadata/write.py
def write_mp4_tags(
    path: Path,
    track: SourceTrack,
    album: AlbumSummary,
    *,
    cover_bytes: bytes | None,
    cover_mime: str | None,
    musicbrainz: MusicBrainzIds | None = None,
) -> None:
    """Write the full target tag set to an existing ALAC/AAC `.m4a` file."""
    mp4 = MP4(path)
    tags = mp4.tags
    if tags is None:
        mp4.add_tags()
        tags = mp4.tags
    assert tags is not None  # appease type-checker; add_tags always populates

    tags.clear()
    _set(tags, "\xa9nam", track.title)
    _set(tags, "\xa9ART", track.artist or album.artist_fallback)
    _set(tags, "\xa9alb", album.album)
    _set(tags, "aART", "Various Artists" if album.is_compilation else (album.album_artist or album.artist_fallback))
    _set(tags, "\xa9day", _year_only(album.year))
    _set(tags, "\xa9gen", track.genre or album.genre)
    _set(tags, "\xa9lyr", track.lyrics)

    track_no = track.track_no or 0
    track_total = track.track_total or album.track_total or 0
    if track_no or track_total:
        tags["trkn"] = [(track_no, track_total)]

    disc_no = track.disc_no or 0
    disc_total = track.disc_total or album.disc_total or 0
    if disc_no or disc_total:
        tags["disk"] = [(disc_no, disc_total)]

    if track.bpm is not None and track.bpm > 0:
        tags["tmpo"] = [int(track.bpm)]

    if album.is_compilation:
        tags["cpil"] = True

    label = track.label or album.label
    catalog = track.catalog or album.catalog
    _set_freeform(tags, "LABEL", label)
    _set_freeform(tags, "CATALOGNUMBER", catalog)
    for key, value in track.replaygain.items():
        _set_freeform(tags, key, value)

    if musicbrainz:
        _set_freeform(tags, "MusicBrainz Album Id", musicbrainz.album_id)
        _set_freeform(tags, "MusicBrainz Artist Id", musicbrainz.artist_id)
        _set_freeform(tags, "MusicBrainz Release Group Id", musicbrainz.release_group_id)
    # Per-track recording MBID — Picard convention is to store this as
    # the iTunes "MusicBrainz Track Id" freeform (despite the name, it's
    # the recording MBID, not the release-track MBID). Written even when
    # no album-level musicbrainz block is present, since per-track lookups
    # via AcoustID can produce recording IDs without an album hit.
    if track.mb_recording_id:
        _set_freeform(tags, "MusicBrainz Track Id", track.mb_recording_id)

    if cover_bytes:
        cover_format = MP4Cover.FORMAT_PNG if (cover_mime or "").lower().endswith("png") else MP4Cover.FORMAT_JPEG
        tags["covr"] = [MP4Cover(cover_bytes, imageformat=cover_format)]

    mp4.save()

write_id3_tags(path, track, album, *, cover_bytes, cover_mime, musicbrainz=None)

Write the full target tag set to an MP3 file as ID3v2.4.

Source code in src/musickit/metadata/write.py
def write_id3_tags(
    path: Path,
    track: SourceTrack,
    album: AlbumSummary,
    *,
    cover_bytes: bytes | None,
    cover_mime: str | None,
    musicbrainz: MusicBrainzIds | None = None,
) -> None:
    """Write the full target tag set to an MP3 file as ID3v2.4."""
    try:
        id3 = ID3(path)
    except ID3NoHeaderError:
        id3 = ID3()

    id3.delete()

    title = track.title
    artist = track.artist or album.artist_fallback
    album_artist = "Various Artists" if album.is_compilation else (album.album_artist or album.artist_fallback)
    year = _year_only(album.year)
    genre = track.genre or album.genre

    if title:
        id3.add(TIT2(encoding=3, text=title))
    if artist:
        id3.add(TPE1(encoding=3, text=artist))
    if album.album:
        id3.add(TALB(encoding=3, text=album.album))
    if album_artist:
        id3.add(TPE2(encoding=3, text=album_artist))
    if year:
        id3.add(TDRC(encoding=3, text=year))
    if genre:
        id3.add(TCON(encoding=3, text=genre))

    track_no = track.track_no or 0
    track_total = track.track_total or album.track_total or 0
    if track_no or track_total:
        id3.add(TRCK(encoding=3, text=f"{track_no}/{track_total}" if track_total else str(track_no)))

    disc_no = track.disc_no or 0
    disc_total = track.disc_total or album.disc_total or 0
    if disc_no or disc_total:
        id3.add(TPOS(encoding=3, text=f"{disc_no}/{disc_total}" if disc_total else str(disc_no)))

    if track.bpm is not None and track.bpm > 0:
        id3.add(TBPM(encoding=3, text=str(int(track.bpm))))

    if album.is_compilation:
        id3.add(TCMP(encoding=3, text="1"))

    label = track.label or album.label
    if label:
        id3.add(TPUB(encoding=3, text=label))

    if track.lyrics:
        id3.add(USLT(encoding=3, lang="eng", desc="", text=track.lyrics))

    catalog = track.catalog or album.catalog
    if catalog:
        id3.add(TXXX(encoding=3, desc="CATALOGNUMBER", text=catalog))

    for key, value in track.replaygain.items():
        id3.add(TXXX(encoding=3, desc=key, text=value))

    if musicbrainz:
        mb_pairs: list[tuple[str, str | None]] = [
            ("MusicBrainz Album Id", musicbrainz.album_id),
            ("MusicBrainz Artist Id", musicbrainz.artist_id),
            ("MusicBrainz Release Group Id", musicbrainz.release_group_id),
        ]
        for desc, mb_value in mb_pairs:
            if mb_value:
                id3.add(TXXX(encoding=3, desc=desc, text=mb_value))
    # Per-track recording MBID — Picard's `MusicBrainz Recording Id` TXXX
    # frame. Independent of the album-level block above so that AcoustID-
    # only lookups can still emit a recording MBID.
    if track.mb_recording_id:
        id3.add(TXXX(encoding=3, desc="MusicBrainz Recording Id", text=track.mb_recording_id))

    if cover_bytes:
        mime = "image/png" if (cover_mime or "").lower().endswith("png") else "image/jpeg"
        id3.add(APIC(encoding=3, mime=mime, type=3, desc="Front cover", data=cover_bytes))

    id3.save(path, v2_version=4)
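
The TRCK/TPOS bodies above encode position as n/total, falling back to a bare number when the total is unknown. Isolated as a helper for clarity:

```python
def pos_string(no: int, total: int) -> str:
    # ID3 convention: "3/12" when the total is known, "3" otherwise.
    return f"{no}/{total}" if total else str(no)
```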

embed_cover_only(path, *, cover_bytes, cover_mime)

Replace the cover of an existing audio file without touching other tags.

Supports .m4a/.mp4/.m4b, .mp3, and .flac. Used by musickit cover to retrofit album art onto already-converted files. All previous pictures are dropped first so we don't end up with multiple covers.

Source code in src/musickit/metadata/write.py
def embed_cover_only(path: Path, *, cover_bytes: bytes, cover_mime: str) -> None:
    """Replace the cover of an existing audio file without touching other tags.

    Supports `.m4a/.mp4/.m4b`, `.mp3`, and `.flac`. Used by `musickit cover`
    to retrofit album art onto already-converted files. All previous pictures
    are dropped first so we don't end up with multiple covers.
    """
    suffix = path.suffix.lower()
    if suffix in (".m4a", ".mp4", ".m4b"):
        mp4 = MP4(path)
        if mp4.tags is None:
            mp4.add_tags()
        tags = mp4.tags
        assert tags is not None
        cover_format = MP4Cover.FORMAT_PNG if cover_mime.lower().endswith("png") else MP4Cover.FORMAT_JPEG
        tags["covr"] = [MP4Cover(cover_bytes, imageformat=cover_format)]
        mp4.save()
        return
    if suffix == ".mp3":
        try:
            id3 = ID3(path)
        except ID3NoHeaderError:
            id3 = ID3()
        for apic_key in list(id3.keys()):
            if apic_key.startswith("APIC"):
                del id3[apic_key]
        mime = "image/png" if cover_mime.lower().endswith("png") else "image/jpeg"
        id3.add(APIC(encoding=3, mime=mime, type=3, desc="Front cover", data=cover_bytes))
        id3.save(path, v2_version=4)
        return
    if suffix == ".flac":
        from mutagen.flac import Picture  # local import — only needed for FLAC

        flac = FLAC(path)
        flac.clear_pictures()
        picture = Picture()
        picture.type = 3  # front cover
        picture.mime = "image/png" if cover_mime.lower().endswith("png") else "image/jpeg"
        picture.data = cover_bytes
        flac.add_picture(picture)
        flac.save()
        return
    raise ValueError(f"unsupported audio file for cover injection: {path}")

apply_tag_overrides(path, overrides)

Apply overrides to path in-place; leave unspecified tags untouched.

Supports .m4a/.mp4/.m4b, .mp3, .flac. Track totals get merged into the existing (track, total) tuple so we don't lose the per-track number.

Source code in src/musickit/metadata/overrides.py
def apply_tag_overrides(path: Path, overrides: TagOverrides) -> None:
    """Apply `overrides` to `path` in-place; leave unspecified tags untouched.

    Supports `.m4a/.mp4/.m4b`, `.mp3`, `.flac`. Track totals get merged into
    the existing `(track, total)` tuple so we don't lose the per-track number.
    """
    if overrides.is_empty():
        return
    suffix = path.suffix.lower()
    if suffix in (".m4a", ".mp4", ".m4b"):
        _apply_overrides_mp4(path, overrides)
        return
    if suffix == ".mp3":
        _apply_overrides_id3(path, overrides)
        return
    if suffix == ".flac":
        _apply_overrides_flac(path, overrides)
        return
    raise ValueError(f"unsupported audio file for tag override: {path}")

musickit.library

Walk a converted-output directory, build an Artist→Album→Track index, audit it, fix the deterministic warnings, and persist it as a SQLite cache at <root>/.musickit/index.db.

library

Walk a converted-output directory, build an Artist→Album→Track index, audit it.

Public surface re-exported here so callers keep using from musickit import library / from musickit.library import …. The leading-underscore helpers _audit_cover and _split_dir_year are also re-exported because tests/CLI import them directly.

Attributes

SCHEMA_VERSION = 1 module-attribute

Bumped when _SCHEMA changes shape; mismatched DBs are unlinked + rebuilt.

INDEX_DIR_NAME = '.musickit' module-attribute

INDEX_DB_NAME = 'index.db' module-attribute

ScanProgressCallback = Callable[[Path, int, int], None] module-attribute

Classes

LibraryTrack

Bases: BaseModel

Track-level summary used by LibraryIndex.

Source code in src/musickit/library/models.py
class LibraryTrack(BaseModel):
    """Track-level summary used by `LibraryIndex`."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    path: Path
    title: str | None = None
    artist: str | None = None
    album_artist: str | None = None
    album: str | None = None
    year: str | None = None
    track_no: int | None = None
    disc_no: int | None = None
    genre: str | None = None
    genres: list[str] = []
    lyrics: str | None = None
    # ReplayGain values from source tags (`replaygain_track_gain`,
    # `replaygain_album_gain`, `..._peak`). Empty dict when the source had
    # no RG tags. AudioPlayer uses these to normalise level differences
    # between tracks during local playback.
    replaygain: dict[str, str] = {}
    duration_s: float = 0.0
    has_cover: bool = False
    cover_pixels: int = 0
    # When set, the TUI plays this URL instead of `path` — populated by the
    # Subsonic client mode so the same widgets/format helpers work for both
    # local files and remote streams.
    stream_url: str | None = None

LibraryAlbum

Bases: BaseModel

Album-level rollup with audit warnings populated by audit().

Source code in src/musickit/library/models.py
class LibraryAlbum(BaseModel):
    """Album-level rollup with audit warnings populated by `audit()`."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    path: Path
    artist_dir: str
    album_dir: str
    tag_album: str | None = None
    tag_year: str | None = None
    tag_album_artist: str | None = None
    tag_genre: str | None = None
    track_count: int = 0
    disc_count: int = 1
    is_compilation: bool = False
    has_cover: bool = False
    cover_pixels: int = 0
    tracks: list[LibraryTrack] = []
    warnings: list[str] = []
    # When set, this album was sourced from a Subsonic server with this ID.
    # The TUI uses it to lazy-load tracks via `getAlbum?id=...` when the
    # user opens the album, instead of pre-fetching every track at launch.
    subsonic_id: str | None = None

LibraryIndex

Bases: BaseModel

Full library index, sorted by (artist_dir, album_dir).

Source code in src/musickit/library/models.py
class LibraryIndex(BaseModel):
    """Full library index, sorted by `(artist_dir, album_dir)`."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    root: Path
    albums: list[LibraryAlbum] = []

ValidationResult

Counts returned by validate() for one-line logging.

Source code in src/musickit/library/scan.py
class ValidationResult:
    """Counts returned by `validate()` for one-line logging."""

    __slots__ = ("added", "removed", "updated")

    def __init__(self, *, added: int, removed: int, updated: int) -> None:
        self.added = added
        self.removed = removed
        self.updated = updated

    def __bool__(self) -> bool:
        return bool(self.added or self.removed or self.updated)

    def __repr__(self) -> str:  # pragma: no cover — debug aid only
        return f"ValidationResult(added={self.added}, removed={self.removed}, updated={self.updated})"

Functions

audit_album(album)

Replace album.warnings with a fresh audit pass for one album.

Warnings are sorted alphabetically at the end so the in-memory LibraryIndex produced by scan_full matches the one produced by load (SQLite returns album_warnings rows ORDER BY warning).

Source code in src/musickit/library/audit.py
def audit_album(album: LibraryAlbum) -> None:
    """Replace `album.warnings` with a fresh audit pass for one album.

    Warnings are sorted alphabetically at the end so the in-memory
    `LibraryIndex` produced by `scan_full` matches the one produced by
    `load` (SQLite returns `album_warnings` rows ORDER BY warning).
    """
    album.warnings = []
    _audit_cover(album)
    _audit_year(album)
    _audit_album_artist(album)
    _audit_album_name(album)
    _audit_artist_name(album)
    _audit_tag_path_mismatch(album)
    _audit_track_gaps(album)
    _audit_track_count(album)
    album.warnings.sort()

fix_index(index, *, dry_run=False, console=None, year_lookup=None, prefer_dirname=False, on_album=None)

Apply deterministic fixes to every flagged album in index.

Returns a list of human-readable action lines. year_lookup is the MusicBrainz year-lookup callable (defaults to enrich.musicbrainz.lookup_release_year — injectable for tests).

prefer_dirname=True inverts the tag/path-mismatch resolution: tags get rewritten from the dir name instead of the dir being renamed from the tag. Use this when you've hand-curated the directory layout and want the tags to follow.

on_album(album, idx, total) fires once per FLAGGED album right before its fixes run; clean albums (no warnings) are skipped silently and don't count against the total. Used by the CLI to drive a progress bar through the slow MB lookups.

Source code in src/musickit/library/fix.py
def fix_index(
    index: LibraryIndex,
    *,
    dry_run: bool = False,
    console: Console | None = None,
    year_lookup: object | None = None,
    prefer_dirname: bool = False,
    on_album: FixProgressCallback | None = None,
) -> list[str]:
    """Apply deterministic fixes to every flagged album in `index`.

    Returns a list of human-readable action lines. `year_lookup` is the
    MusicBrainz year-lookup callable (defaults to
    `enrich.musicbrainz.lookup_release_year` — injectable for tests).

    `prefer_dirname=True` inverts the tag/path-mismatch resolution: tags
    get rewritten from the dir name instead of the dir being renamed from
    the tag. Use this when you've hand-curated the directory layout and
    want the tags to follow.

    `on_album(album, idx, total)` fires once per FLAGGED album right
    before its fixes run; clean albums (no warnings) are skipped silently
    and don't count against the total. Used by the CLI to drive a
    progress bar through the slow MB lookups.
    """
    if year_lookup is None:
        from musickit.enrich.musicbrainz import lookup_release_year

        year_lookup = lookup_release_year

    flagged = [a for a in index.albums if a.warnings]
    total = len(flagged)
    actions: list[str] = []
    for idx, album in enumerate(flagged, start=1):
        if on_album is not None:
            on_album(album, idx, total)
        actions.extend(
            fix_album(
                album,
                dry_run=dry_run,
                console=console,
                year_lookup=year_lookup,
                prefer_dirname=prefer_dirname,
            )
        )
    return actions
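
For embedders, the on_album hook just needs a three-argument callable. A sketch wiring it to a plain list (SimpleNamespace stands in for a LibraryAlbum here):

```python
from types import SimpleNamespace

progress: list[str] = []

def on_album(album, idx: int, total: int) -> None:
    # Fires once per flagged album, right before its fixes run.
    progress.append(f"[{idx}/{total}] {album.artist_dir} / {album.album_dir}")

on_album(SimpleNamespace(artist_dir="ABBA", album_dir="Arrival"), 1, 3)
```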

fix_album(album, *, dry_run=False, console=None, year_lookup, prefer_dirname=False)

Apply fixes to one album. Returns the action lines performed (or planned).

Source code in src/musickit/library/fix.py
def fix_album(
    album: LibraryAlbum,
    *,
    dry_run: bool = False,
    console: Console | None = None,
    year_lookup: object,
    prefer_dirname: bool = False,
) -> list[str]:
    """Apply fixes to one album. Returns the action lines performed (or planned)."""
    actions: list[str] = []
    label = f"{album.artist_dir} / {album.album_dir}"

    # Missing-year fixes go first so the rename below sees the new year.
    if any("missing year" in w for w in album.warnings):
        new_year = _fix_missing_year(album, dry_run=dry_run, year_lookup=year_lookup)
        if new_year:
            actions.append(f"{label}: year ← {new_year} (musicbrainz)")
            if console is not None:
                console.print(f"[green]✓[/green] {label}: year ← {new_year} (musicbrainz)")

    has_mismatch = any(w.startswith("tag/path mismatch") for w in album.warnings)
    if prefer_dirname:
        # Push dir-name → tags. Year set by MB above (if any) is preserved
        # only if the dir has no leading year prefix.
        if has_mismatch:
            updated = _fix_retag_to_match_dir(album, dry_run=dry_run)
            if updated:
                tag_album, tag_year = updated
                msg = f"{label}: tag ← album={tag_album!r}"
                if tag_year:
                    msg += f", year={tag_year}"
                actions.append(msg)
                if console is not None:
                    console.print(f"[green]✓[/green] {msg}")
    else:
        # Default: tag wins, rename the dir to match.
        if has_mismatch or actions:
            renamed = _fix_rename_to_match_tag(album, dry_run=dry_run)
            if renamed:
                actions.append(f"{label}: renamed dir → {renamed}")
                if console is not None:
                    console.print(f"[green]✓[/green] {label}: renamed dir → {renamed}")

    return actions

db_path(root)

Return <root>/.musickit/index.db (the absolute index location).

Source code in src/musickit/library/db.py
def db_path(root: Path) -> Path:
    """Return `<root>/.musickit/index.db` (the absolute index location)."""
    return root / INDEX_DIR_NAME / INDEX_DB_NAME

open_db(root)

Open or create the index DB for root.

If the existing DB has a stale schema_version or was written for a different library_root_abs, the file (and any WAL sidecars) is unlinked and a fresh schema is created.

Source code in src/musickit/library/db.py
def open_db(root: Path) -> sqlite3.Connection:
    """Open or create the index DB for `root`.

    If the existing DB has a stale `schema_version` or was written for a
    different `library_root_abs`, the file (and any WAL sidecars) is
    unlinked and a fresh schema is created.
    """
    path = db_path(root)
    path.parent.mkdir(parents=True, exist_ok=True)

    if path.exists() and not _can_use_existing(path, root):
        log.info("library index: schema/root mismatch at %s; rebuilding", path)
        _unlink_db(path)

    fresh = not path.exists()
    conn = sqlite3.connect(path, check_same_thread=False, isolation_level=None)
    conn.row_factory = sqlite3.Row
    _apply_pragmas(conn)
    if fresh:
        _create_schema(conn, root)
    return conn
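The stale-cache handling can be sketched with SQLite's built-in `user_version` pragma. This is an illustrative pattern only — the real `open_db` stores its schema version and `library_root_abs` its own way (via `_can_use_existing`, not shown here), and `SCHEMA_VERSION` below is a made-up constant:

```python
import sqlite3
from pathlib import Path

SCHEMA_VERSION = 2  # illustrative constant, not musickit's real value

def open_or_rebuild(path: Path) -> sqlite3.Connection:
    """Open `path`, wiping and recreating it when the schema is stale."""
    if path.exists():
        conn = sqlite3.connect(path)
        (version,) = conn.execute("PRAGMA user_version").fetchone()
        if version == SCHEMA_VERSION:
            return conn  # warm cache, schema matches
        conn.close()
        path.unlink()  # stale schema -> rebuild from scratch
    conn = sqlite3.connect(path)
    conn.execute(f"PRAGMA user_version = {SCHEMA_VERSION}")
    conn.execute("CREATE TABLE albums (id INTEGER PRIMARY KEY)")
    conn.commit()
    return conn
```

The key point mirrored from `open_db`: a mismatched version never triggers a migration — the file is unlinked and the schema recreated, because the index is fully derived from the filesystem anyway.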

is_empty(conn)

True when the DB has no album rows yet (fresh schema, never scanned).

Source code in src/musickit/library/db.py
def is_empty(conn: sqlite3.Connection) -> bool:
    """True when the DB has no album rows yet (fresh schema, never scanned)."""
    row = conn.execute("SELECT 1 FROM albums LIMIT 1").fetchone()
    return row is None

load_or_scan(root, *, use_cache=True, force=False, on_album=None, measure_pictures=False)

Return a LibraryIndex for root, using the on-disk cache when available.

use_cache=False skips the DB entirely (in-memory scan + audit). Used when .musickit/ cannot be created (read-only mount) or when the caller passes --no-cache.

force=True ignores any existing cache and runs a full rescan, rewriting every row. Maps to the --full-rescan CLI flag and the startScan Subsonic endpoint.

Without force, a warm cache is loaded and a validate() pass reconciles the DB against any filesystem-level adds/removes/tag-edits that happened while no watcher was running.

Source code in src/musickit/library/load.py
def load_or_scan(
    root: Path,
    *,
    use_cache: bool = True,
    force: bool = False,
    on_album: ScanProgressCallback | None = None,
    measure_pictures: bool = False,
) -> LibraryIndex:
    """Return a `LibraryIndex` for `root`, using the on-disk cache when available.

    `use_cache=False` skips the DB entirely (in-memory scan + audit). Used
    when `.musickit/` cannot be created (read-only mount) or when the
    caller passes `--no-cache`.

    `force=True` ignores any existing cache and runs a full rescan,
    rewriting every row. Maps to the `--full-rescan` CLI flag and the
    `startScan` Subsonic endpoint.

    Without `force`, a warm cache is loaded and a `validate()` pass
    reconciles the DB against any filesystem-level adds/removes/tag-edits
    that happened while no watcher was running.
    """
    from musickit.library.audit import audit
    from musickit.library.db import open_db
    from musickit.library.scan import scan, scan_full, validate

    if not use_cache:
        index = scan(root, on_album=on_album, measure_pictures=measure_pictures)
        audit(index)
        return index

    try:
        conn = open_db(root)
    except OSError as exc:
        log.warning(
            "library cache disabled: cannot create %s/.musickit (%s); falling back to in-memory scan",
            root,
            exc,
        )
        index = scan(root, on_album=on_album, measure_pictures=measure_pictures)
        audit(index)
        return index

    try:
        from musickit.library.db import is_empty

        if force or is_empty(conn):
            return scan_full(root, conn, on_album=on_album, measure_pictures=measure_pictures)
        result = validate(root, conn, measure_pictures=measure_pictures, on_album=on_album)
        if result:
            log.info(
                "library cache: validated (added=%d removed=%d updated=%d)",
                result.added,
                result.removed,
                result.updated,
            )
        return load(root, conn)
    finally:
        conn.close()
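The branching above reduces to a three-way decision ladder. The sketch below restates it with plain booleans (stand-ins, not the real `scan`/`validate`/`load` calls):

```python
def choose_path(use_cache: bool, db_opened: bool, force: bool, db_empty: bool) -> str:
    """Which of load_or_scan's three paths a given situation takes."""
    if not use_cache or not db_opened:
        return "in-memory scan + audit"    # --no-cache, or .musickit not writable
    if force or db_empty:
        return "scan_full -> write index"  # cold start / --full-rescan / startScan
    return "validate -> load"              # warm cache, reconcile deltas
```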

scan_full(root, conn, *, on_album=None, measure_pictures=False)

Walk root, audit, and write the full result to the index DB.

Used on cold start when the DB is empty and after a startScan. Wipes every album/track/genre/warning row first so the DB matches the filesystem exactly. Returns the same LibraryIndex that callers used to get from scan() + audit().

Source code in src/musickit/library/scan.py
def scan_full(
    root: Path,
    conn: sqlite3.Connection,
    *,
    on_album: ScanProgressCallback | None = None,
    measure_pictures: bool = False,
) -> LibraryIndex:
    """Walk `root`, audit, and write the full result to the index DB.

    Used on cold start when the DB is empty and after a `startScan`. Wipes
    every album/track/genre/warning row first so the DB matches the
    filesystem exactly. Returns the same `LibraryIndex` that callers used
    to get from `scan()` + `audit()`.
    """
    # Imported here to avoid a circular import: audit.py depends on scan.py
    # for `_split_dir_year`, so we can't import it at module scope.
    from musickit.library.audit import audit

    index = scan(root, on_album=on_album, measure_pictures=measure_pictures)
    audit(index)
    write_index(conn, root, index)
    return index

validate(root, conn, *, measure_pictures=False, on_album=None)

Diff the filesystem against DB rows and apply add/remove/update deltas.

Catches changes that happened while no serve/watcher was running: new albums dropped in, removed albums, tag edits applied with another tool. Each affected album is re-scanned in full and re-audited; rows for vanished albums are dropped.

Returns a ValidationResult so callers can log a one-line summary.

Source code in src/musickit/library/scan.py
def validate(
    root: Path,
    conn: sqlite3.Connection,
    *,
    measure_pictures: bool = False,
    on_album: ScanProgressCallback | None = None,
) -> "ValidationResult":
    """Diff the filesystem against DB rows and apply add/remove/update deltas.

    Catches changes that happened while no `serve`/watcher was running:
    new albums dropped in, removed albums, tag edits applied with another
    tool. Each affected album is re-scanned in full and re-audited; rows
    for vanished albums are dropped.

    Returns a `ValidationResult` so callers can log a one-line summary.
    """
    fs_album_dirs = {p.resolve() for p in _iter_album_dirs(root)}

    db_track_rows = list(conn.execute("SELECT id, album_id, rel_path, file_mtime, file_size FROM tracks"))
    db_album_rows = list(conn.execute("SELECT id, rel_path FROM albums"))

    db_track_keys = {row["rel_path"]: row for row in db_track_rows}
    db_album_dirs = {(root / row["rel_path"]).resolve(): row for row in db_album_rows}

    affected: set[Path] = set()

    # Albums that vanished entirely → row deletion only, no rescan.
    for db_dir, _row in db_album_dirs.items():
        if db_dir not in fs_album_dirs:
            affected.add(db_dir)

    # New albums on disk that the DB doesn't know about.
    for fs_dir in fs_album_dirs:
        if fs_dir not in db_album_dirs:
            affected.add(fs_dir)

    # For album dirs the DB and FS both know, find tag-edit / file-add /
    # file-remove deltas at the track level.
    fs_audio_by_dir: dict[Path, set[Path]] = {}
    for fs_dir in fs_album_dirs & set(db_album_dirs):
        try:
            fs_audio_by_dir[fs_dir] = {
                p.resolve() for p in fs_dir.iterdir() if p.is_file() and p.suffix.lower() in SUPPORTED_AUDIO_EXTS
            }
        except OSError:
            affected.add(fs_dir)
            continue

    # Build a per-album view of DB tracks for the dirs we still care about.
    db_audio_by_dir: dict[Path, dict[Path, "_TrackRow"]] = {}
    for rel, row in db_track_keys.items():
        abs_path = (root / rel).resolve()
        parent = abs_path.parent
        if parent not in fs_audio_by_dir:
            continue
        db_audio_by_dir.setdefault(parent, {})[abs_path] = row

    for fs_dir, fs_files in fs_audio_by_dir.items():
        db_files = db_audio_by_dir.get(fs_dir, {})
        if set(fs_files) != set(db_files):
            affected.add(fs_dir)
            continue
        for fs_file in fs_files:
            row = db_files[fs_file]
            mtime, size = _safe_stat(fs_file)
            if mtime != row["file_mtime"] or size != row["file_size"]:
                affected.add(fs_dir)
                break

    if not affected:
        return ValidationResult(added=0, removed=0, updated=0)

    return rescan_albums(
        root,
        conn,
        affected,
        measure_pictures=measure_pictures,
        on_album=on_album,
        _db_album_dirs=db_album_dirs,
    )
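The per-track delta test at the heart of `validate()` is just a stat comparison — no tags are re-read unless mtime or size moved. A self-contained restatement of that check:

```python
import os

def file_changed(path: str, cached_mtime: float, cached_size: int) -> bool:
    """True when `path` differs from its cached (mtime, size) fingerprint."""
    try:
        st = os.stat(path)
    except OSError:
        return True  # vanished or unreadable -> the album must be rescanned
    return (st.st_mtime, st.st_size) != (cached_mtime, cached_size)
```

This is why tag edits made with another tool are caught: rewriting tags bumps the file's mtime (and usually its size), flipping the fingerprint even though the filename set is unchanged.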

rescan_albums(root, conn, album_dirs, *, measure_pictures=False, on_album=None, _db_album_dirs=None)

Re-scan + re-audit each album dir; drop rows for any that vanished.

Reusable by the cold-start validate() pass and (in PR 2) the filesystem watcher. The DB is updated under one transaction so a crash mid-rescan can't half-apply changes.

Source code in src/musickit/library/scan.py
def rescan_albums(
    root: Path,
    conn: sqlite3.Connection,
    album_dirs: Iterable[Path],
    *,
    measure_pictures: bool = False,
    on_album: ScanProgressCallback | None = None,
    _db_album_dirs: dict[Path, "_AlbumRow"] | None = None,
) -> "ValidationResult":
    """Re-scan + re-audit each album dir; drop rows for any that vanished.

    Reusable by the cold-start `validate()` pass and (in PR 2) the
    filesystem watcher. The DB is updated under one transaction so a
    crash mid-rescan can't half-apply changes.
    """
    from musickit.library.audit import audit_album

    dirs = sorted({p.resolve() for p in album_dirs})
    if _db_album_dirs is None:
        _db_album_dirs = {
            (root / row["rel_path"]).resolve(): row for row in conn.execute("SELECT id, rel_path FROM albums")
        }

    now = time.time()
    root_abs = root.resolve()
    added = removed = updated = 0
    total = len(dirs)

    conn.execute("BEGIN IMMEDIATE")
    try:
        for idx, album_dir in enumerate(dirs, start=1):
            if on_album is not None:
                on_album(album_dir, idx, total)

            existing_row = _db_album_dirs.get(album_dir)
            if existing_row is not None:
                conn.execute("DELETE FROM albums WHERE id = ?", (existing_row["id"],))

            if not album_dir.is_dir():
                if existing_row is not None:
                    removed += 1
                continue

            album = _scan_album(album_dir, measure_pictures=measure_pictures)
            if album.track_count == 0 and existing_row is None:
                # Empty dir that never had a row — nothing to do.
                continue
            audit_album(album)
            new_album_id = _insert_album(conn, root_abs, album, now)
            for track in album.tracks:
                track_id = _insert_track(conn, new_album_id, root_abs, track, now)
                for genre in track.genres:
                    conn.execute(
                        "INSERT OR IGNORE INTO track_genres(track_id, genre) VALUES (?, ?)",
                        (track_id, genre),
                    )
            for warning in album.warnings:
                conn.execute(
                    "INSERT OR IGNORE INTO album_warnings(album_id, warning) VALUES (?, ?)",
                    (new_album_id, warning),
                )

            if existing_row is None:
                added += 1
            else:
                updated += 1
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise

    # Reclaim pages freed by the per-album DELETE+INSERT cycle. Cheap
    # when there's nothing to free; matters across many rescans.
    if removed or updated:
        from musickit.library.db import reclaim_freelist

        reclaim_freelist(conn)

    return ValidationResult(added=added, removed=removed, updated=updated)
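The one-transaction guarantee — a crash mid-rescan can't half-apply changes — rests on the `BEGIN IMMEDIATE` / `COMMIT` / `ROLLBACK` bracket. A minimal standalone demonstration of the same shape:

```python
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)  # autocommit; explicit BEGIN
conn.execute("CREATE TABLE albums (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO albums (name) VALUES ('kept')")

def batch_update(conn: sqlite3.Connection, names: list) -> None:
    conn.execute("BEGIN IMMEDIATE")  # take the write lock up front
    try:
        for name in names:
            if name is None:
                raise ValueError("simulated crash mid-batch")
            conn.execute("INSERT INTO albums (name) VALUES (?)", (name,))
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")  # none of the batch survives
        raise
```

After a failed batch the table still holds only the pre-batch row. `isolation_level=None` matters: it puts the connection in autocommit mode so the explicit `BEGIN IMMEDIATE` owns the transaction, matching how `open_db` configures its connection.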

musickit.serve

FastAPI factory + auth + config for the Subsonic-compatible HTTP server.

serve

Subsonic-compatible HTTP server for the converted musickit library.

musickit serve [DIR] launches a FastAPI app that exposes the library via the Subsonic API (v1.16.1). Any Subsonic client (Symfonium, play:Sub, Feishin, DSub, etc.) can browse, search, and stream from it.

Classes

ServeConfig

Bases: BaseModel

Resolved server credentials. Plain text — this is local-self-hosted.

Source code in src/musickit/serve/config.py
class ServeConfig(BaseModel):
    """Resolved server credentials. Plain text — this is local-self-hosted."""

    username: str
    password: str

Functions

create_app(*, root, cfg, use_cache=True)

Build the FastAPI app for root with the given credentials.

use_cache=False disables the persistent <root>/.musickit/index.db and falls back to in-memory scan on every rebuild.

Source code in src/musickit/serve/app.py
def create_app(*, root: Path, cfg: ServeConfig, use_cache: bool = True) -> FastAPI:
    """Build the FastAPI app for `root` with the given credentials.

    `use_cache=False` disables the persistent `<root>/.musickit/index.db`
    and falls back to in-memory scan on every rebuild.
    """
    app = FastAPI(
        title="musickit",
        description="Subsonic-compatible API server for a converted musickit library.",
        version=SERVER_VERSION,
        docs_url=None,  # the OpenAPI docs collide with `?u=&p=` — keep them off for now
        redoc_url=None,
    )
    app.state.root = root
    app.state.cfg = cfg
    app.state.cache = IndexCache(root, use_cache=use_cache)
    # Stars / favourites — separate file from the index DB (which is
    # fully derived and gets wiped on schema bumps). User data lives at
    # `<root>/.musickit/stars.toml`; survives `library index drop`.
    from musickit.serve.stars import StarStore

    app.state.stars = StarStore.for_root(root)

    # Spec default is XML; clients opt into JSON via `?f=json`. Convert here
    # so endpoints stay simple (return dicts; the middleware emits the right
    # serialization). Binary responses (stream / cover) skip conversion via
    # the content-type check below.
    app.add_middleware(SubsonicFormatMiddleware)
    # Outermost: merge POST form-body params into query string so the
    # auth dependency + endpoint Query() defaults pick them up uniformly.
    # play:Sub (iOS) sends credentials this way.
    app.add_middleware(PostFormToQueryMiddleware)

    async def require_auth(
        request: Request,
        u: str | None = Query(default=None),
        p: str | None = Query(default=None),
        t: str | None = Query(default=None),
        s: str | None = Query(default=None),
    ) -> None:
        """FastAPI dependency that enforces Subsonic auth on every endpoint."""
        del request
        try:
            verify(cfg, user=u, password=p, token=t, salt=s)
        except AuthError as exc:
            raise _SubsonicAuthError(str(exc)) from exc

    app.state.require_auth = require_auth

    @app.exception_handler(_SubsonicAuthError)
    async def auth_exception_handler(_request: Request, exc: _SubsonicAuthError) -> JSONResponse:
        return JSONResponse(error_envelope(40, str(exc)))

    # Mount endpoint groups. Imports happen lazily to keep the module graph
    # shallow and to avoid pulling FastAPI into pure-data modules.
    from musickit.serve.endpoints.browsing import router as browsing_router
    from musickit.serve.endpoints.extras import router as extras_router
    from musickit.serve.endpoints.lyrics import router as lyrics_router
    from musickit.serve.endpoints.media import router as media_router
    from musickit.serve.endpoints.scan import router as scan_router
    from musickit.serve.endpoints.search import router as search_router
    from musickit.serve.endpoints.system import router as system_router

    auth_dep = [Depends(require_auth)]
    app.include_router(system_router, prefix="/rest", dependencies=auth_dep)
    app.include_router(browsing_router, prefix="/rest", dependencies=auth_dep)
    app.include_router(scan_router, prefix="/rest", dependencies=auth_dep)
    app.include_router(media_router, prefix="/rest", dependencies=auth_dep)
    app.include_router(search_router, prefix="/rest", dependencies=auth_dep)
    app.include_router(extras_router, prefix="/rest", dependencies=auth_dep)
    app.include_router(lyrics_router, prefix="/rest", dependencies=auth_dep)

    # Root probe: Amperfy and some other clients hit `GET /` before `/rest/ping`
    # to confirm the host is reachable. Without this they get a 404 and refuse
    # to log in. The response body is informational + harmless to expose pre-auth.
    @app.get("/")
    async def server_info() -> dict[str, Any]:
        return {
            "name": SERVER_NAME,
            "version": SERVER_VERSION,
            "type": "subsonic-compatible",
            "api": "/rest/",
            "spec": "https://opensubsonic.netlify.app/docs/api-reference/",
        }

    return app
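The `t`/`s` query parameters checked by `require_auth` follow the standard Subsonic token scheme: the client picks a random salt and sends `md5(password + salt)`. The sketch below shows just that arithmetic; the real checks (username match, legacy plaintext `p=` handling) live in `verify()`, which isn't shown here:

```python
import hashlib

def subsonic_token(password: str, salt: str) -> str:
    # Subsonic API auth: t = md5(password + salt), lowercase hex.
    return hashlib.md5((password + salt).encode("utf-8")).hexdigest()

def token_matches(password: str, token: str, salt: str) -> bool:
    return subsonic_token(password, salt) == token.lower()
```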

resolve_credentials(*, cli_user, cli_password)

CLI flags win over the TOML. Falls back to admin/admin when nothing is set.

Returns (cfg, used_defaults) so the caller can warn the user when the insecure defaults are in play.

Source code in src/musickit/serve/config.py
def resolve_credentials(*, cli_user: str | None, cli_password: str | None) -> tuple[ServeConfig, bool]:
    """CLI flags win over the TOML. Falls back to admin/admin when nothing is set.

    Returns `(cfg, used_defaults)` so the caller can warn the user when the
    insecure defaults are in play.
    """
    file_creds = load_config()
    username = cli_user or file_creds.get("username") or DEFAULT_USERNAME
    password = cli_password or file_creds.get("password") or DEFAULT_PASSWORD
    used_defaults = username == DEFAULT_USERNAME and password == DEFAULT_PASSWORD
    return ServeConfig(username=username, password=password), used_defaults

musickit.naming

Filesystem-safe folder + filename builders.

naming

Filesystem-safe name building for artist / album / track output paths.

Attributes

VARIOUS_ARTISTS = 'Various Artists' module-attribute

Functions

artist_folder(album_artist, fallback_artist, *, is_compilation=False)

Folder name for the artist level. Maps VA / compilation albums to Various Artists.

Three triggers route to the canonical Various Artists folder:

- album_artist tag is a VA alias (VA, V.A., Various, …)
- fallback_artist (per-track majority) is itself a VA alias — some rips stamp VA as the per-track artist and leave album_artist empty
- is_compilation is True (album-level signal: distinct per-track artists with no shared album_artist tag, e.g. an MP3 mix labelled only by filename)

Source code in src/musickit/naming.py
def artist_folder(album_artist: str | None, fallback_artist: str | None, *, is_compilation: bool = False) -> str:
    """Folder name for the artist level. Maps VA / compilation albums to `Various Artists`.

    Three triggers route to the canonical `Various Artists` folder:
    - `album_artist` tag is a VA alias (`VA`, `V.A.`, `Various`, …)
    - `fallback_artist` (per-track majority) is itself a VA alias — some rips
      stamp `VA` as the per-track artist and leave `album_artist` empty
    - `is_compilation` is True (album-level signal: distinct per-track artists
      with no shared `album_artist` tag, e.g. an MP3 mix labelled only by
      filename)
    """
    if is_compilation or is_various_artists(album_artist) or is_various_artists(fallback_artist):
        return VARIOUS_ARTISTS
    name = (album_artist or "").strip() or (fallback_artist or "").strip() or "Unknown Artist"
    return sanitize_component(name)

album_folder(album, year)

Folder name for the album level.

Format: YYYY - Album so directory listings inside an artist folder sort chronologically. Year is omitted if unknown, falling back to just Album. A year that's part of the album title (e.g. Vocal Trance Hits 2024, Taylor Swift's 1989) is intentionally left in place — it's the actual title.

Source code in src/musickit/naming.py
def album_folder(album: str | None, year: str | int | None) -> str:
    """Folder name for the album level.

    Format: `YYYY - Album` so directory listings inside an artist folder sort
    chronologically. Year is omitted if unknown, falling back to just `Album`.
    A year that's part of the album title (e.g. `Vocal Trance Hits 2024`,
    Taylor Swift's `1989`) is intentionally left in place — it's the actual
    title.
    """
    base = (album or "").strip() or "Unknown Album"
    year_str = _coerce_year(year)
    full = f"{year_str} - {base}" if year_str else base
    return sanitize_component(full)

track_filename(track_no, title, *, artist=None, disc_no=None, disc_total=None, track_total=None, extension='.m4a')

Output filename for a single track.

Default format: 01 - Title<ext>. When the album spans multiple discs (disc_total > 1), the disc number is prefixed: 01-01 - Title<ext>. When artist is provided (typically only for compilations / VA albums) it is inserted between track number and title: 01-05 - Artist - Title<ext>.

Track-number width grows with track_total so albums with ≥100 tracks sort alphabetically correctly: a 100-track album yields 001, 002, 010, 099, 100 instead of breaking at the 2/3-digit boundary. Disc-number width is fixed at 2 (no realistic disc count needs more).

Source code in src/musickit/naming.py
def track_filename(
    track_no: int | None,
    title: str | None,
    *,
    artist: str | None = None,
    disc_no: int | None = None,
    disc_total: int | None = None,
    track_total: int | None = None,
    extension: str = ".m4a",
) -> str:
    """Output filename for a single track.

    Default format: `01 - Title<ext>`. When the album spans multiple discs
    (`disc_total > 1`), the disc number is prefixed: `01-01 - Title<ext>`.
    When `artist` is provided (typically only for compilations / VA albums)
    it is inserted between track number and title: `01-05 - Artist - Title<ext>`.

    Track-number width grows with `track_total` so albums with ≥100 tracks
    sort alphabetically correctly: a 100-track album yields `001`, `002`,
    `010`, `099`, `100` instead of breaking at the 2/3-digit boundary.
    Disc-number width is fixed at 2 (no realistic disc count needs more).
    """
    width = 3 if (track_total or 0) >= 100 else 2
    track_str = f"{track_no:0{width}d}" if track_no else "0" * width
    title_str = (title or "").strip() or "Untitled"
    title_str = sanitize_component(title_str)
    if disc_total and disc_total > 1 and disc_no:
        prefix = f"{disc_no:02d}-{track_str}"
    else:
        prefix = track_str
    ext = extension if extension.startswith(".") else f".{extension}"
    if artist:
        artist_str = sanitize_component(artist.strip())
        return f"{prefix} - {artist_str} - {title_str}{ext.lower()}"
    return f"{prefix} - {title_str}{ext.lower()}"
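The numbering rule is easiest to see in isolation — a re-derivation of just the prefix logic described above, without the title/artist sanitization:

```python
def track_prefix(track_no, track_total=None, disc_no=None, disc_total=None):
    # Width follows the album's track count; disc prefix only for multi-disc.
    width = 3 if (track_total or 0) >= 100 else 2
    num = f"{track_no:0{width}d}" if track_no else "0" * width
    if disc_total and disc_total > 1 and disc_no:
        return f"{disc_no:02d}-{num}"
    return num

track_prefix(5, track_total=12)                           # "05"
track_prefix(5, track_total=120)                          # "005"
track_prefix(5, track_total=12, disc_no=1, disc_total=2)  # "01-05"
```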

clean_folder_album_name(name)

Strip codec/quality tags + edition annotations + extract year.

Returns (cleaned_album_name, year_or_None). Used as a fallback when an album has no ALBUM tag and we have to lean on the folder name.

Strip order:
  1. Edition annotations ((Deluxe Edition), [Remastered], (2018 Reissue), (40th Anniversary Edition)) — these would otherwise leak into _FOLDER_YEAR_RE and pollute the year pick.
  2. Year extraction from any remaining (YYYY) / bare digits.
  3. Codec/quality tags ([FLAC], [16Bit-44.1kHz]).
  4. VA - / Various - prefix.

Live annotations ((Live), (Live in Madrid 2019)) are kept — a live album is a distinct work from its studio counterpart and the audience expects to see it labeled.

Source code in src/musickit/naming.py
def clean_folder_album_name(name: str) -> tuple[str, str | None]:
    """Strip codec/quality tags + edition annotations + extract year.

    Returns `(cleaned_album_name, year_or_None)`. Used as a fallback when an
    album has no `ALBUM` tag and we have to lean on the folder name.

    Strip order:
      1. Edition annotations (`(Deluxe Edition)`, `[Remastered]`, `(2018
         Reissue)`, `(40th Anniversary Edition)`) — these would otherwise
         leak into `_FOLDER_YEAR_RE` and pollute the year pick.
      2. Year extraction from any remaining `(YYYY)` / bare digits.
      3. Codec/quality tags (`[FLAC]`, `[16Bit-44.1kHz]`).
      4. `VA -` / `Various -` prefix.

    Live annotations (`(Live)`, `(Live in Madrid 2019)`) are kept — a live
    album is a distinct work from its studio counterpart and the audience
    expects to see it labeled.
    """
    cleaned = _FOLDER_EDITION_RE.sub(" ", name)
    year_match = _FOLDER_YEAR_RE.search(cleaned)
    year = year_match.group(1) if year_match else None
    if year_match:
        cleaned = cleaned.replace(year_match.group(0), " ")
    cleaned = _FOLDER_TAG_RE.sub(" ", cleaned)
    cleaned = _VA_PREFIX_RE.sub("", cleaned)
    cleaned = re.sub(r"\s+", " ", cleaned).strip(" -_")
    return cleaned or name, year
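The year step (step 2) can be sketched with a stand-in pattern. `_FOLDER_YEAR_RE` itself isn't shown above, so the regex below is an assumption, not the project's actual pattern, and the sketch skips the edition/codec/VA strip steps around it:

```python
import re

# Hypothetical stand-in for `_FOLDER_YEAR_RE`; the real pattern may differ.
YEAR_RE = re.compile(r"[(\[]?((?:19|20)\d{2})[)\]]?")

def extract_year(name: str):
    """Pull the first plausible year out of a folder name."""
    m = YEAR_RE.search(name)
    if not m:
        return name.strip(), None
    cleaned = name.replace(m.group(0), " ", 1)
    return re.sub(r"\s+", " ", cleaned).strip(" -_"), m.group(1)
```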

leading_year_from_folder(name)

Return the 4-digit year iff name starts with one followed by a separator.

Used by the convert pipeline to override reissue years that survive in track tags when the input dir is hand-named with the original year (e.g. 1983. Album! [2018 Reissue] should yield 1983, not 2018).

Source code in src/musickit/naming.py
def leading_year_from_folder(name: str | None) -> str | None:
    """Return the 4-digit year iff `name` starts with one followed by a separator.

    Used by the convert pipeline to override reissue years that survive in
    track tags when the input dir is hand-named with the original year (e.g.
    `1983. Album! [2018 Reissue]` should yield 1983, not 2018).
    """
    if not name:
        return None
    match = _LEADING_FOLDER_YEAR_RE.match(name)
    return match.group(1) if match else None
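A sketch of the leading-year match, under the assumption that the separator set is dot/dash/underscore/space — `_LEADING_FOLDER_YEAR_RE` isn't shown above, so the pattern here is illustrative:

```python
import re

# Hypothetical stand-in for `_LEADING_FOLDER_YEAR_RE`.
LEADING_YEAR_RE = re.compile(r"^((?:19|20)\d{2})[.\- _]")

def leading_year(name):
    if not name:
        return None
    m = LEADING_YEAR_RE.match(name)
    return m.group(1) if m else None
```

The required separator is what keeps a title that *is* a year (e.g. `1989`) from being misread as a year prefix.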

is_various_artists(album_artist)

Return True if album_artist indicates a Various-Artists compilation.

Source code in src/musickit/naming.py
def is_various_artists(album_artist: str | None) -> bool:
    """Return True if `album_artist` indicates a Various-Artists compilation."""
    if not album_artist:
        return False
    return album_artist.strip().casefold() in _VA_ALIASES
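The check normalizes with `casefold()` so `VA`, `va`, and `Various` all hit the same set. The alias set below is an illustrative subset — the real `_VA_ALIASES` isn't shown and may contain more spellings:

```python
# Illustrative subset; the real `_VA_ALIASES` may contain more entries.
VA_ALIASES = {"va", "v.a.", "various", "various artists"}

def looks_like_va(album_artist) -> bool:
    if not album_artist:
        return False
    return album_artist.strip().casefold() in VA_ALIASES
```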

sanitize_component(value)

Make value safe to use as a single path component on any OS.

Replaces forbidden characters, collapses whitespace, NFC-normalizes unicode, strips trailing dots/spaces, and caps the encoded length at 180 bytes.

Source code in src/musickit/naming.py
def sanitize_component(value: str) -> str:
    """Make `value` safe to use as a single path component on any OS.

    Replaces forbidden characters, collapses whitespace, NFC-normalizes unicode,
    strips trailing dots/spaces, and caps the encoded length at 180 bytes.
    """
    cleaned = unicodedata.normalize("NFC", value).translate(_BAD_CHARS)
    cleaned = _WHITESPACE_RE.sub(" ", cleaned).strip()
    cleaned = _TRAILING_BAD_RE.sub("", cleaned)
    if not cleaned:
        cleaned = "Unknown"
    encoded = cleaned.encode("utf-8")
    if len(encoded) > _MAX_COMPONENT_BYTES:
        # Truncate on a unicode-safe boundary by progressively decoding.
        cleaned = encoded[:_MAX_COMPONENT_BYTES].decode("utf-8", errors="ignore").rstrip()
    return cleaned
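The 180-byte cap truncates on the *encoded* form, then decodes with `errors="ignore"` so a multi-byte character split at the boundary is dropped whole rather than mangled. The truncation step in isolation:

```python
def truncate_utf8(value: str, max_bytes: int) -> str:
    """Cap `value` at `max_bytes` UTF-8 bytes without splitting a character."""
    encoded = value.encode("utf-8")
    if len(encoded) <= max_bytes:
        return value
    # A partial trailing sequence decodes to nothing under errors="ignore".
    return encoded[:max_bytes].decode("utf-8", errors="ignore").rstrip()
```

Slicing the *string* to a character count wouldn't work here: filesystems limit name length in bytes, and a 180-character string of 3-byte CJK codepoints is 540 bytes.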

musickit.cover

Cover-art candidates, picker, normaliser.

cover

Locate, normalize, and embed album cover art.

Attributes

DEFAULT_MAX_EDGE = 1000 module-attribute

Classes

CoverCandidate

Bases: BaseModel

A candidate cover image, before normalization.

Source code in src/musickit/cover.py
class CoverCandidate(BaseModel):
    """A candidate cover image, before normalization."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    source: CoverSource
    data: bytes
    mime: str | None = None
    width: int = 0
    height: int = 0
    label: str = ""

    @property
    def pixels(self) -> int:
        return self.width * self.height

    @property
    def size_bytes(self) -> int:
        return len(self.data)

CoverSource

Bases: str, Enum

Where the cover came from. Used for reporting + tie-breaking under --enrich.

Source code in src/musickit/cover.py
class CoverSource(str, Enum):
    """Where the cover came from. Used for reporting + tie-breaking under --enrich."""

    EMBEDDED = "embedded"
    FOLDER = "folder"
    ONLINE = "online"

Cover

Bases: BaseModel

A normalized album cover ready to embed into every track of an album.

Source code in src/musickit/cover.py
class Cover(BaseModel):
    """A normalized album cover ready to embed into every track of an album."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    data: bytes
    mime: str  # "image/jpeg" or "image/png"
    width: int
    height: int
    source: CoverSource
    label: str

Functions

collect_candidates(album_dir, tracks)

Gather every plausible cover candidate for an album (offline only).

Source code in src/musickit/cover.py
def collect_candidates(album_dir: Path, tracks: list[SourceTrack]) -> list[CoverCandidate]:
    """Gather every plausible cover candidate for an album (offline only)."""
    candidates: list[CoverCandidate] = []

    # Collect every distinct embedded picture across the album. Most albums
    # carry the same cover on every track, but mixed rips occasionally have
    # one track with corrected/larger artwork — let the picker compare them
    # all and pick the highest-area image. Drop unparseable bytes here so
    # they never reach `normalize()`.
    seen_embedded: set[bytes] = set()
    for track in tracks:
        if not track.embedded_picture:
            continue
        if track.embedded_picture in seen_embedded:
            continue
        seen_embedded.add(track.embedded_picture)
        width, height = _measure(track.embedded_picture)
        if width == 0 or height == 0:
            continue
        candidates.append(
            CoverCandidate(
                source=CoverSource.EMBEDDED,
                data=track.embedded_picture,
                mime=track.embedded_picture_mime,
                width=width,
                height=height,
                label=f"embedded in {track.path.name}",
            )
        )

    # Search the album anchor AND every disc subfolder for folder images.
    # For bare-leading multi-disc layouts the anchor IS the wrapper; for
    # shared-prefix layouts (`Album (CD1)`) the anchor is the first disc
    # folder, so the other disc subfolders need scanning too — and the
    # parent wrapper might carry a top-level `folder.jpg` shared across
    # both discs (anchor.parent).
    seen_dirs: set[Path] = {album_dir}
    search_dirs: list[Path] = [album_dir]
    if _looks_like_disc_anchor(album_dir):
        # Anchor is `<wrapper>/<Album (CD1)>` — also scan `<wrapper>` for the
        # shared cover that lives at the wrapper level.
        parent = album_dir.parent
        if parent not in seen_dirs and parent != album_dir:
            seen_dirs.add(parent)
            search_dirs.append(parent)
    for track in tracks:
        if track.path.parent not in seen_dirs:
            seen_dirs.add(track.path.parent)
            search_dirs.append(track.path.parent)

    seen_image_paths: set[Path] = set()
    for folder_dir in search_dirs:
        for path in _find_folder_images(folder_dir):
            if path in seen_image_paths:
                continue
            seen_image_paths.add(path)
            try:
                data = path.read_bytes()
            except OSError:
                continue
            width, height = _measure(data)
            if width == 0 or height == 0:
                # Pillow couldn't decode — image is corrupt or not actually
                # an image despite the extension. Skip it so it can't be
                # picked, normalised, and crash the album later.
                continue
            candidates.append(
                CoverCandidate(
                    source=CoverSource.FOLDER,
                    data=data,
                    mime=_guess_mime(path.suffix),
                    width=width,
                    height=height,
                    label=path.name,
                )
            )

    return candidates
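The search-directory walk above (anchor, then wrapper for shared-prefix disc layouts, then each disc subfolder, deduplicated in first-seen order) can be sketched in isolation. This is a minimal stand-alone sketch, not the library's API: `is_disc_anchor` stands in for the real `_looks_like_disc_anchor(album_dir)` check, and `track_dirs` stands in for the `track.path.parent` values.

```python
from pathlib import Path


def collect_search_dirs(
    album_dir: Path, track_dirs: list[Path], is_disc_anchor: bool
) -> list[Path]:
    """Mirror the folder-image search order: anchor first, then the wrapper
    (only when the anchor is itself a disc folder), then each track's parent
    directory, dropping duplicates while preserving first-seen order."""
    seen = {album_dir}
    dirs = [album_dir]
    if is_disc_anchor and album_dir.parent != album_dir:
        # Shared-prefix layout: the wrapper may hold a cover shared by all discs.
        seen.add(album_dir.parent)
        dirs.append(album_dir.parent)
    for d in track_dirs:
        if d not in seen:
            seen.add(d)
            dirs.append(d)
    return dirs


# Shared-prefix layout: anchor is the first disc folder, so the wrapper and
# the second disc folder both get scanned too.
anchor = Path("Album/Album (CD1)")
tracks = [Path("Album/Album (CD1)"), Path("Album/Album (CD2)")]
print(collect_search_dirs(anchor, tracks, is_disc_anchor=True))
# → [Album/Album (CD1), Album, Album/Album (CD2)]
```

For a bare-leading layout the anchor is the wrapper itself, `is_disc_anchor` is false, and only the disc subfolders are appended after it.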

pick_best(candidates)

Pick the highest-quality candidate.

"Quality" = pixel area first, then file size, then source order (online > folder > embedded). The source-order tiebreaker matters under --enrich: when an online provider returns a cover that ties a 600×600 scanned folder.jpg on both dimensions and byte size, the online copy still wins.

Source code in src/musickit/cover.py
def pick_best(candidates: Iterable[CoverCandidate]) -> CoverCandidate | None:
    """Pick the highest-quality candidate.

    "Quality" = pixel area first, then file size, then source order
    (online > folder > embedded). The source-order tiebreaker matters under
    `--enrich` so that an online provider returning the same dimensions as
    a 600×600 scanned folder.jpg still wins.
    """
    source_order = {CoverSource.ONLINE: 2, CoverSource.FOLDER: 1, CoverSource.EMBEDDED: 0}
    best: CoverCandidate | None = None
    for candidate in candidates:
        if best is None:
            best = candidate
            continue
        if candidate.pixels > best.pixels:
            best = candidate
        elif candidate.pixels == best.pixels and candidate.size_bytes > best.size_bytes:
            best = candidate
        elif (
            candidate.pixels == best.pixels
            and candidate.size_bytes == best.size_bytes
            and source_order[candidate.source] > source_order[best.source]
        ):
            best = candidate
    return best

normalize(candidate, *, max_edge=DEFAULT_MAX_EDGE)

Decode + recompress the chosen candidate.

Output is JPEG ≤ max_edge px on the long side, RGB, quality 92 — except for PNGs that already fit, which are passed through unchanged.

Source code in src/musickit/cover.py
def normalize(candidate: CoverCandidate, *, max_edge: int = DEFAULT_MAX_EDGE) -> Cover:
    """Decode + recompress the chosen candidate.

    Output is JPEG ≤ `max_edge` px on the long side, RGB, quality 92 — except
    for PNGs that already fit, which are passed through unchanged.
    """
    opened = Image.open(io.BytesIO(candidate.data))
    opened.load()
    width, height = opened.size
    long_edge = max(width, height)
    needs_resize = long_edge > max_edge
    is_png = (candidate.mime or "").lower().endswith("png") or opened.format == "PNG"

    if not needs_resize and is_png:
        return Cover(
            data=candidate.data,
            mime="image/png",
            width=width,
            height=height,
            source=candidate.source,
            label=candidate.label,
        )

    image: Image.Image = opened
    if needs_resize:
        scale = max_edge / long_edge
        image = image.resize((int(width * scale), int(height * scale)), Image.Resampling.LANCZOS)
        width, height = image.size

    if image.mode not in ("RGB", "L"):
        image = image.convert("RGB")

    buffer = io.BytesIO()
    image.save(buffer, format="JPEG", quality=_JPEG_QUALITY, optimize=True, progressive=True)
    return Cover(
        data=buffer.getvalue(),
        mime="image/jpeg",
        width=width,
        height=height,
        source=candidate.source,
        label=candidate.label,
    )
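The sizing rule reduces to a small pure function: a PNG already within `max_edge` passes through; everything else becomes JPEG, downscaled so the long edge hits `max_edge` when it overflows. A sketch of just that decision (the `1500` default is an assumption here — the real value is `DEFAULT_MAX_EDGE` in `musickit.cover`):

```python
def normalized_shape(
    width: int, height: int, is_png: bool, max_edge: int = 1500
) -> tuple[int, int, str]:
    """Return the (width, height, mime) that normalize() would produce,
    without touching pixel data."""
    long_edge = max(width, height)
    if long_edge <= max_edge and is_png:
        return width, height, "image/png"  # PNG passthrough, bytes unchanged
    if long_edge > max_edge:
        scale = max_edge / long_edge
        # int() truncates, matching image.resize((int(w*scale), int(h*scale))).
        width, height = int(width * scale), int(height * scale)
    return width, height, "image/jpeg"


print(normalized_shape(1200, 1200, is_png=True))   # → (1200, 1200, 'image/png')
print(normalized_shape(3000, 2000, is_png=False))  # → (1500, 1000, 'image/jpeg')
```

Note the truncation means the short edge can land a pixel under the exact ratio; the long edge always comes out as exactly `max_edge` since `int(long_edge * scale) == max_edge`.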