
Metadata accessor

MetadataAccessor, bound to Dhis2Client.metadata, holds bulk operations over /api/metadata that don't fit the per-resource generated CRUD shape. Per-UID CRUD lives on client.resources.<resource>; this accessor covers the multi-resource / multi-UID paths.

What's here

| Method | Role |
| --- | --- |
| search(query, ...) | Cross-resource UID / code / name search. Fans out three concurrent filter calls (one per match axis) and merges by UID. |
| usage(uid) | Reverse lookup: "what references this UID?" Resolves the owning resource, then finds every object that points at it. |
| delete_bulk / delete_bulk_multi | Fast-delete via POST /api/metadata?importStrategy=DELETE. |
| patch_bulk / patch_bulk_multi | Apply RFC 6902 patches to many UIDs in parallel. Client-side fan-out over PATCH /api/<resource>/<uid>; per-UID failures land in BulkPatchResult.failures instead of raising. |
| apply_sharing_bulk / apply_sharing_bulk_multi | Apply one sharing block to many UIDs via per-object POST /api/sharing; failures land in BulkSharingResult.failures. |
| dry_run(by_resource) | Validate a cross-resource bundle without committing (importMode=VALIDATE). |
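The delete path needs only UID stubs. A minimal sketch of the bundle shape delete_bulk_multi builds before posting (the helper name here is illustrative, not part of the package):

```python
# Sketch: delete_bulk wraps UIDs into {resource: [{"id": uid}, ...]} and
# posts it with importStrategy=DELETE. Entries with empty UID lists are
# skipped, matching the accessor's documented behaviour.
def build_delete_bundle(by_resource):
    """Build the minimal /api/metadata delete bundle."""
    return {
        resource: [{"id": uid} for uid in uids]
        for resource, uids in by_resource.items()
        if uids
    }

bundle = build_delete_bundle({"dataElements": ["deAbc123xyz"], "indicators": []})
params = {"importStrategy": "DELETE", "atomicMode": "NONE"}
```

atomicMode=NONE lets partial failures through; "ALL" rolls the whole bundle back on any conflict.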

patch_bulk + BulkPatchResult

DHIS2 has no bulk-PATCH endpoint, so patch_bulk is client-side fan-out with a concurrency cap (default 8). Each (uid, ops) pair hits PATCH /api/<resource>/<uid> with the RFC 6902 JSON body. Per-UID failures are captured into BulkPatchResult.failures instead of raising — callers see a row-level report:

from dhis2w_client import ReplaceOp

result = await client.metadata.patch_bulk(
    "dataElements",
    [
        (de_a_uid, [ReplaceOp(op="replace", path="/shortName", value="A2")]),
        (de_b_uid, [ReplaceOp(op="replace", path="/shortName", value="B2")]),
    ],
)
if not result.ok:
    for failure in result.failures:
        print(failure.uid, failure.status_code, failure.message)

Cross-resource variant for mixed types:

result = await client.metadata.patch_bulk_multi(
    {
        "dataElements": [(de_uid, ops_a)],
        "indicators": [(ind_uid, ops_b)],
    },
    concurrency=16,
)
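The fan-out underneath both variants reduces to a semaphore-capped gather that collects failures instead of raising. A stdlib-only sketch (names illustrative — the real implementation catches Dhis2ApiError specifically, not bare Exception):

```python
import asyncio

async def fan_out(items, worker, concurrency=8):
    """Run worker(item) for each item, at most `concurrency` at a time.

    Failures are captured per item rather than propagated, mirroring
    BulkPatchResult's successful/failures split.
    """
    semaphore = asyncio.Semaphore(max(1, concurrency))

    async def one(item):
        async with semaphore:
            try:
                await worker(item)
            except Exception as exc:  # real code narrows to Dhis2ApiError
                return item, exc
        return item, None

    results = await asyncio.gather(*(one(i) for i in items))
    ok = [i for i, err in results if err is None]
    failed = [(i, err) for i, err in results if err is not None]
    return ok, failed

async def fake_patch(uid):
    """Stand-in for PATCH /api/<resource>/<uid>."""
    if uid == "bad":
        raise RuntimeError("409 conflict")

ok, failed = asyncio.run(fan_out(["a", "b", "bad"], fake_patch))
```

asyncio.gather preserves input order, so successful UIDs come back in submission order.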

Typed ReplaceOp / AddOp / RemoveOp / MoveOp / CopyOp / TestOp models are importable from the top-level package; raw dicts matching the RFC 6902 shape are also accepted.
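Both spellings reach the wire as the same JSON body. A sketch of the equivalence (the dataclass stands in for the package's pydantic ReplaceOp; the real normaliser dumps models via by_alias + exclude_none):

```python
from dataclasses import dataclass, asdict

@dataclass
class ReplaceOpSketch:
    """Illustrative stand-in for the package's typed ReplaceOp model."""
    op: str
    path: str
    value: object

def normalise(ops):
    """Dump typed ops to plain RFC 6902 dicts; pass raw dicts through."""
    return [op if isinstance(op, dict) else asdict(op) for op in ops]

typed = normalise([ReplaceOpSketch(op="replace", path="/shortName", value="A2")])
raw = normalise([{"op": "replace", "path": "/shortName", "value": "A2"}])
```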

metadata

Dhis2Client.metadata — bulk operations over /api/metadata.

One accessor for bulk-write paths that don't have a typed generated CRUD entry (generated resources cover the per-UID GET / POST / PUT / PATCH / DELETE surface per resource type). Covers:

  • delete_bulk / delete_bulk_multi — fast-delete via importStrategy=DELETE.
  • dry_run — validate a cross-resource bundle without committing (importMode=VALIDATE).
  • search — cross-resource metadata search. Fans out three concurrent /api/metadata?filter=<field>:ilike:<q> calls (one per match axis: id, code, name) and merges the results with UID dedup. Pass resource="dataElements" (etc.) to narrow to one resource kind, fields=... for extra columns in the typed response, and exact=True to switch from ilike substring to eq exact match. DHIS2's /api/metadata silently ignores rootJunction and ANDs multiple filters (see BUGS.md #29), so OR-across-fields needs N requests.
  • usage — reverse lookup: "what metadata references this UID?" Given a UID, resolves the owning resource via /api/identifiableObjects/{uid}, then fans out concurrent per-resource queries against /api/<target>? filter=<ref-path>:eq:<uid> to find every object that references it. Useful as a deletion-safety check — any dashboard / viz / dataset referencing the UID you're about to delete surfaces in the result.
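The OR-merge behind search can be sketched as plain dict-folding with (resource, uid) dedup — an object matching on both id and name appears once (illustrative helper, not the package's internal function):

```python
def merge_bundles(bundles):
    """Fold per-axis /api/metadata bundles into one, deduping by (resource, uid)."""
    merged = {}
    seen = set()
    for bundle in bundles:
        for resource, rows in bundle.items():
            for row in rows:
                key = (resource, row["id"])
                if key in seen:
                    continue
                seen.add(key)
                merged.setdefault(resource, []).append(row)
    return merged

# The same data element matched both the id axis and the name axis:
by_id = {"dataElements": [{"id": "deA", "name": "ANC 1st visit"}]}
by_name = {"dataElements": [{"id": "deA", "name": "ANC 1st visit"}],
           "indicators": [{"id": "inB", "name": "ANC coverage"}]}
merged = merge_bundles([by_id, by_name])
```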

For typed bulk writes scoped to a single resource, reach for the generated per-resource accessor's save_bulk method (client.resources.data_elements.save_bulk([DataElement(...), ...])) — IDE autocomplete gives you model-typed input on that path.

Attributes

Classes

SearchHit

Bases: BaseModel

One matching metadata object returned by MetadataAccessor.search or .usage.

extras holds any DHIS2 fields beyond the core four (id, name, code, href) — populated when callers pass a wider fields selector to search.
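How a raw DHIS2 row splits between the core four fields and extras can be sketched as (illustrative helper, not the library's parser):

```python
CORE = {"id", "name", "code", "href"}

def split_row(row):
    """Lift the core four fields out of a raw row; everything else is extras."""
    core = {key: row.get(key) for key in ("id", "name", "code", "href")}
    extras = {key: value for key, value in row.items() if key not in CORE}
    return core, extras

# A row returned when the caller passed fields="id,name,valueType,domainType":
core, extras = split_row({"id": "deA", "name": "ANC 1st visit",
                          "valueType": "NUMBER", "domainType": "AGGREGATE"})
```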

Source code in packages/dhis2w-client/src/dhis2w_client/metadata.py
class SearchHit(BaseModel):
    """One matching metadata object returned by `MetadataAccessor.search` or `.usage`.

    `extras` holds any DHIS2 fields beyond the core four (`id`, `name`, `code`,
    `href`) — populated when callers pass a wider `fields` selector to `search`.
    """

    model_config = ConfigDict(frozen=True)

    uid: str
    name: str
    code: str | None = None
    resource: str = Field(..., description="DHIS2 resource plural (e.g. 'dataElements', 'dashboards')")
    href: str | None = None
    extras: dict[str, Any] = Field(default_factory=dict)

SearchResults

Bases: BaseModel

Grouped results from MetadataAccessor.search — one list per resource type.

hits maps each DHIS2 resource plural (dataElements, indicators, dashboards, …) to the matching objects within that resource. Empty resources are omitted, so SearchResults.total plus hits.keys() answer "which resource types matched" in one look.

Source code in packages/dhis2w-client/src/dhis2w_client/metadata.py
class SearchResults(BaseModel):
    """Grouped results from `MetadataAccessor.search` — one list per resource type.

    `hits` maps each DHIS2 resource plural (`dataElements`, `indicators`,
    `dashboards`, …) to the matching objects within that resource. Empty
    resources are omitted, so `SearchResults.total` plus
    `hits.keys()` answer "which resource types matched" in one look.
    """

    model_config = ConfigDict(frozen=True)

    query: str
    hits: dict[str, list[SearchHit]] = Field(default_factory=dict)

    @property
    def total(self) -> int:
        """Total hits across every resource type."""
        return sum(len(rows) for rows in self.hits.values())

    def flat(self) -> list[SearchHit]:
        """Return every hit as a flat list — convenient for sorted/ranked display."""
        return [hit for rows in self.hits.values() for hit in rows]
Attributes
total property

Total hits across every resource type.

Functions
flat()

Return every hit as a flat list — convenient for sorted/ranked display.

Source code in packages/dhis2w-client/src/dhis2w_client/metadata.py
def flat(self) -> list[SearchHit]:
    """Return every hit as a flat list — convenient for sorted/ranked display."""
    return [hit for rows in self.hits.values() for hit in rows]

BulkPatchError

Bases: BaseModel

One per-UID failure from MetadataAccessor.patch_bulk(_multi).

DHIS2 reports PATCH errors one-at-a-time (the bulk endpoint is client-side fan-out over per-UID PATCH /api/<resource>/<uid>). This model captures what each rejection carried so callers can surface row-level detail without catching exceptions themselves.

Source code in packages/dhis2w-client/src/dhis2w_client/metadata.py
class BulkPatchError(BaseModel):
    """One per-UID failure from `MetadataAccessor.patch_bulk(_multi)`.

    DHIS2 reports PATCH errors one-at-a-time (the bulk endpoint is
    client-side fan-out over per-UID `PATCH /api/<resource>/<uid>`).
    This model captures what each rejection carried so callers can
    surface row-level detail without catching exceptions themselves.
    """

    model_config = ConfigDict(frozen=True)

    uid: str
    resource: str
    status_code: int
    message: str

BulkPatchResult

Bases: BaseModel

Aggregated result from MetadataAccessor.patch_bulk(_multi).

Tracks per-UID success/failure across a fan-out of RFC 6902 PATCH requests. The overall call always succeeds at the HTTP-layer level — individual rejections land in failures instead of raising.

Source code in packages/dhis2w-client/src/dhis2w_client/metadata.py
class BulkPatchResult(BaseModel):
    """Aggregated result from `MetadataAccessor.patch_bulk(_multi)`.

    Tracks per-UID success/failure across a fan-out of RFC 6902 PATCH
    requests. The overall call always succeeds at the HTTP-layer level
    — individual rejections land in `failures` instead of raising.
    """

    model_config = ConfigDict(frozen=True)

    successful_uids: list[str] = Field(default_factory=list)
    failures: list[BulkPatchError] = Field(default_factory=list)

    @property
    def total(self) -> int:
        """Total UIDs attempted — `successful + failed`."""
        return len(self.successful_uids) + len(self.failures)

    @property
    def ok(self) -> bool:
        """True when every UID succeeded; False when at least one failed."""
        return not self.failures
Attributes
total property

Total UIDs attempted — successful + failed.

ok property

True when every UID succeeded; False when at least one failed.

BulkSharingError

Bases: BaseModel

One per-UID failure from MetadataAccessor.apply_sharing_bulk(_multi).

DHIS2's /api/sharing is per-object, so the bulk surface is client-side fan-out. Per-object rejections land here so callers surface row-level detail without catching exceptions themselves.
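A sketch of the per-object request shape the fan-out issues for each UID (illustrative helper — the accessor builds the same {"object": ...} body and type/id query params shown in the source below):

```python
def sharing_request(resource, uid, sharing_block):
    """Describe one POST /api/sharing call: same body, per-UID params."""
    return {
        "path": "/api/sharing",
        "params": {"type": resource, "id": uid},
        "body": {"object": sharing_block},
    }

req = sharing_request("dataSet", "dsAbc123", {"publicAccess": "r-------"})
```

Note that /api/sharing takes the singular type name ("dataSet", "program"), not the resource plural.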

Source code in packages/dhis2w-client/src/dhis2w_client/metadata.py
class BulkSharingError(BaseModel):
    """One per-UID failure from `MetadataAccessor.apply_sharing_bulk(_multi)`.

    DHIS2's `/api/sharing` is per-object, so the bulk surface is
    client-side fan-out. Per-object rejections land here so callers
    surface row-level detail without catching exceptions themselves.
    """

    model_config = ConfigDict(frozen=True)

    uid: str
    resource: str
    status_code: int
    message: str

BulkSharingResult

Bases: BaseModel

Aggregated result from MetadataAccessor.apply_sharing_bulk(_multi).

Tracks per-UID success/failure across a fan-out of POST /api/sharing requests. The overall call always succeeds at the HTTP-layer level — individual rejections land in failures instead of raising.

Source code in packages/dhis2w-client/src/dhis2w_client/metadata.py
class BulkSharingResult(BaseModel):
    """Aggregated result from `MetadataAccessor.apply_sharing_bulk(_multi)`.

    Tracks per-UID success/failure across a fan-out of `POST /api/sharing`
    requests. The overall call always succeeds at the HTTP-layer level
    — individual rejections land in `failures` instead of raising.
    """

    model_config = ConfigDict(frozen=True)

    successful_uids: list[str] = Field(default_factory=list)
    failures: list[BulkSharingError] = Field(default_factory=list)

    @property
    def total(self) -> int:
        """Total UIDs attempted — `successful + failed`."""
        return len(self.successful_uids) + len(self.failures)

    @property
    def ok(self) -> bool:
        """True when every UID succeeded; False when at least one failed."""
        return not self.failures
Attributes
total property

Total UIDs attempted — successful + failed.

ok property

True when every UID succeeded; False when at least one failed.

MetadataAccessor

Bulk metadata operations on /api/metadata.

Per-resource CRUD lives on the generated client.resources.<Resource> accessors (one class per DHIS2 resource type, auto-generated from /api/schemas). This accessor is specifically for the multi-resource / multi-UID paths that need the /api/metadata bundle endpoint — they don't fit the single-resource accessor shape.

Source code in packages/dhis2w-client/src/dhis2w_client/metadata.py
class MetadataAccessor:
    """Bulk metadata operations on `/api/metadata`.

    Per-resource CRUD lives on the generated `client.resources.<Resource>`
    accessors (one class per DHIS2 resource type, auto-generated from
    `/api/schemas`). This accessor is specifically for the
    multi-resource / multi-UID paths that need the `/api/metadata`
    bundle endpoint — they don't fit the single-resource accessor shape.
    """

    def __init__(self, client: Dhis2Client) -> None:
        """Bind to the sharing client."""
        self._client = client

    async def search(
        self,
        query: str,
        *,
        page_size: int = 50,
        resource: str | None = None,
        fields: str | None = None,
        exact: bool = False,
    ) -> SearchResults:
        """Cross-resource metadata search — UID / code / name, OR-merged.

        Fans out `len(_SEARCH_FIELDS)` concurrent `/api/metadata` calls, one
        per match axis (`id`, `code`, `name`), each filtered as
        `<field>:ilike:<query>` (or `<field>:eq:<query>` when `exact=True`).
        DHIS2 returns a bundle `{dataElements: [...], indicators: [...], ...}`
        per call grouped by resource type; this method merges them,
        deduplicating by `(resource, uid)` so an object matching on both
        id and name doesn't appear twice.

        Works uniformly for:

        - **Full UID** — id:ilike hits the exact record, the other axes
          usually miss; result is one hit.
        - **Partial UID** — id:ilike:<prefix> matches every UID starting
          with or containing the substring across every resource.
        - **Code lookup** — code:ilike:<fragment> matches on the business
          identifier; indicators / DEs often carry meaningful codes.
        - **Name substring** — name:ilike:<fragment> is the broadest
          match and usually dominates the result set.

        Pass `resource="dataElements"` (etc.) to narrow the result to one
        resource kind. Pass `fields="id,name,code,valueType,domainType"`
        to ask DHIS2 for extra attributes per hit — anything beyond the
        core four (`id` / `name` / `code` / `href`) lands on `SearchHit.extras`.
        Pass `exact=True` to switch from `ilike` substring to `eq` exact
        match (useful when a partial-UID search would otherwise match too
        many siblings).

        A single `rootJunction=OR` call would be cleaner, but DHIS2's
        `/api/metadata` endpoint silently ignores `rootJunction` and
        ANDs multiple filters (BUGS.md #29), so N requests are the only
        way to get cross-field OR.
        """
        operator = "eq" if exact else "ilike"
        effective_fields = fields or _SEARCH_DEFAULT_FIELDS
        field_results = await asyncio.gather(
            *(
                self._search_one_field(
                    field,
                    query,
                    operator=operator,
                    fields=effective_fields,
                    resource=resource,
                    page_size=page_size,
                )
                for field in _SEARCH_FIELDS
            ),
        )
        return _merge_search_results(query, field_results)

    async def _search_one_field(
        self,
        field: str,
        query: str,
        *,
        operator: str,
        fields: str,
        resource: str | None,
        page_size: int,
    ) -> SearchResults:
        """Issue one `/api/metadata?filter=<field>:<op>:<query>` call, optionally narrowed."""
        params: dict[str, Any] = {
            "filter": f"{field}:{operator}:{query}",
            "fields": fields,
            "pageSize": page_size,
        }
        if resource is not None:
            # Per-resource narrowing: DHIS2 understands `filter=<attr>:...` on
            # `/api/<resource>` exactly the same way, so we can hit the typed
            # endpoint and save the broadcast over every resource section.
            raw = await self._client.get_raw(f"/api/{resource}", params=params)
            # Response shape is `{<resource>: [...]}` — repackage as a bundle
            # so `_search_results_from_bundle` sees a uniform input.
            bundle = {resource: raw.get(resource) or []}
            return _search_results_from_bundle(query, bundle)
        raw = await self._client.get_raw("/api/metadata", params=params)
        return _search_results_from_bundle(query, raw)

    async def usage(
        self,
        uid: str,
        *,
        page_size: int = 100,
    ) -> SearchResults:
        """Reverse lookup — find every object that references `uid`.

        Two-step workflow: (1) resolve the UID's owning resource via
        `/api/identifiableObjects/{uid}` so we know which reference-shapes
        to look up; (2) fan out concurrent `/api/<target>?filter=<path>:eq:<uid>`
        calls against the known reference paths for that owning type.

        Coverage is best-effort — the reference map (`_USAGE_PATTERNS`)
        encodes the reference shapes most likely to block a delete in
        practice (dataSets + visualizations + maps + programStages
        referencing a DE, dashboards referencing a viz/map, categoryCombo
        references on DEs / dataSets / programs, OU references on users /
        groups, etc.). Extend `_USAGE_PATTERNS` when a new shape surfaces.

        Returns a `SearchResults` keyed by target resource — the same
        shape as `search`, so CLI rendering reuses cleanly. Empty result
        means no reference was found on any covered path — caveat: it
        does not prove the UID is safe to delete if the reference shape
        isn't in the map.

        Raises `Dhis2ApiError` with `status_code=404` when the UID
        doesn't resolve to any known resource.
        """
        owning = await self._resolve_resource(uid)
        patterns = _USAGE_PATTERNS.get(owning, ())
        if not patterns:
            return SearchResults(query=uid, hits={})
        query_results = await asyncio.gather(
            *(
                self._usage_query(target, template.format(uid=uid), page_size=page_size)
                for target, template in patterns
            ),
        )
        return _merge_search_results(uid, query_results)

    async def _resolve_resource(self, uid: str) -> str:
        """Resolve the UID's owning resource via `/api/identifiableObjects/{uid}`."""
        raw = await self._client.get_raw(f"/api/identifiableObjects/{uid}")
        href = str(raw.get("href") or "")
        parts = [p for p in href.split("/") if p]
        if len(parts) < 2 or parts[-1] != uid:
            return "unknown"
        return parts[-2]

    async def _usage_query(self, target: str, filter_expr: str, *, page_size: int) -> SearchResults:
        """Issue one `/api/<target>?filter=<expr>` call for the reverse-reference scan."""
        params: dict[str, Any] = {"filter": filter_expr, "fields": _SEARCH_DEFAULT_FIELDS, "pageSize": page_size}
        raw = await self._client.get_raw(f"/api/{target}", params=params)
        bundle = {target: raw.get(target) or []}
        return _search_results_from_bundle(target, bundle)

    async def delete_bulk(self, resource_type: str, uids: Sequence[str]) -> WebMessageResponse:
        """Delete every UID in `uids` from one DHIS2 resource type in a single request.

        Wraps `POST /api/metadata?importStrategy=DELETE&atomicMode=NONE` with a
        minimal `{resource_type: [{"id": uid}, ...]}` bundle. Returns the
        `WebMessageResponse` envelope — `.import_count().deleted` reports the
        total rows deleted; `.conflicts()` lists anything DHIS2 refused
        (foreign-key constraints, soft-delete protection, etc.).

        `atomicMode=NONE` lets partial failures through: some UIDs deleted,
        some held back with a conflict. Switch to `delete_bulk_multi` with
        atomic semantics when every row must delete or none should.
        Empty `uids` short-circuits with a no-op envelope (no HTTP call).
        """
        return await self.delete_bulk_multi({resource_type: list(uids)})

    async def delete_bulk_multi(
        self,
        by_resource: Mapping[str, Sequence[str]],
        *,
        atomic_mode: str = "NONE",
    ) -> WebMessageResponse:
        """Delete across multiple resource types in one `/api/metadata` call.

        `by_resource` maps each resource type (e.g. `"dataElements"`,
        `"indicators"`) to the UIDs to delete for that type. Entries with
        empty UID lists are skipped. `atomic_mode` controls DHIS2's
        partial-failure behaviour: `"NONE"` (default) lets individual
        conflicts through, `"ALL"` rolls the entire bundle back on any
        conflict.
        """
        bundle: dict[str, list[dict[str, str]]] = {
            resource: [{"id": uid} for uid in uids] for resource, uids in by_resource.items() if uids
        }
        if not bundle:
            return WebMessageResponse(status=Status.OK, httpStatus="OK", httpStatusCode=200, message="no uids supplied")
        raw = await self._client.post_raw(
            "/api/metadata",
            body=bundle,
            params={"importStrategy": "DELETE", "atomicMode": atomic_mode},
        )
        return WebMessageResponse.model_validate(raw)

    async def patch_bulk(
        self,
        resource_type: str,
        patches: Sequence[tuple[str, Sequence[JsonPatchOp | dict[str, Any]]]],
        *,
        concurrency: int = 8,
    ) -> BulkPatchResult:
        """Apply RFC 6902 patches to many UIDs on one resource in parallel.

        `patches` is a list of `(uid, ops)` pairs. `ops` can carry typed
        `JsonPatchOp` models (auto-dumped via `by_alias + exclude_none`)
        or raw dicts already matching the RFC 6902 shape. DHIS2 does not
        expose a single bulk-PATCH endpoint, so this is client-side
        fan-out over `PATCH /api/<resource>/<uid>` — `concurrency` caps
        simultaneous in-flight requests (default 8, a sensible sweet
        spot against a single DHIS2 node).

        Per-UID failures do not raise — they land in the returned
        `BulkPatchResult.failures`. Call `.ok` for a bool "every patch
        applied" summary, or inspect `.failures` for row-level detail.
        """
        return await self.patch_bulk_multi({resource_type: patches}, concurrency=concurrency)

    async def patch_bulk_multi(
        self,
        by_resource: Mapping[str, Sequence[tuple[str, Sequence[JsonPatchOp | dict[str, Any]]]]],
        *,
        concurrency: int = 8,
    ) -> BulkPatchResult:
        """Apply RFC 6902 patches across multiple resource types in parallel.

        `by_resource` maps each resource type to its `(uid, ops)` pairs;
        every pair across every type runs through the same concurrency
        budget. Resources with empty pair lists are skipped.
        Merges into one `BulkPatchResult`.
        """
        flat: list[tuple[str, str, list[dict[str, Any]]]] = []
        for resource, pairs in by_resource.items():
            for uid, ops in pairs:
                flat.append((resource, uid, _normalise_patch_ops(ops)))
        if not flat:
            return BulkPatchResult()

        semaphore = asyncio.Semaphore(max(1, concurrency))

        async def _one(resource: str, uid: str, ops: list[dict[str, Any]]) -> tuple[str, str, BulkPatchError | None]:
            async with semaphore:
                try:
                    await self._client.patch_raw(f"/api/{resource}/{uid}", body=ops)
                except Dhis2ApiError as exc:
                    return (
                        resource,
                        uid,
                        BulkPatchError(
                            uid=uid,
                            resource=resource,
                            status_code=exc.status_code,
                            message=exc.message,
                        ),
                    )
            return resource, uid, None

        results = await asyncio.gather(*(_one(r, u, o) for r, u, o in flat))
        successful: list[str] = []
        failures: list[BulkPatchError] = []
        for _resource, uid, error in results:
            if error is None:
                successful.append(uid)
            else:
                failures.append(error)
        return BulkPatchResult(successful_uids=successful, failures=failures)

    async def apply_sharing_bulk(
        self,
        resource_type: str,
        uids: Sequence[str],
        sharing: SharingObject | SharingBuilder,
        *,
        concurrency: int = 8,
    ) -> BulkSharingResult:
        """Apply one sharing block to many UIDs of one resource in parallel.

        DHIS2's `/api/sharing` is per-object (one POST per UID). This method
        fans the same `SharingObject` / `SharingBuilder` payload across every
        UID in `uids` under a `concurrency` semaphore (default 8). Useful
        when rolling a single user-group-access pattern across a cohort
        without writing the loop in caller code.

        Per-UID failures do not raise — they land in the returned
        `BulkSharingResult.failures`. Call `.ok` for a bool "every grant
        applied" summary, or inspect `.failures` for row-level detail.
        """
        return await self.apply_sharing_bulk_multi({resource_type: uids}, sharing, concurrency=concurrency)

    async def apply_sharing_bulk_multi(
        self,
        by_resource: Mapping[str, Sequence[str]],
        sharing: SharingObject | SharingBuilder,
        *,
        concurrency: int = 8,
    ) -> BulkSharingResult:
        """Apply one sharing block across multiple resource types in parallel.

        `by_resource` maps each resource type (`"dataSet"`, `"program"`, ...)
        to the UIDs receiving the same sharing payload; every UID across
        every type runs through one `concurrency` budget. Resources with
        empty UID lists are skipped. Merges into one `BulkSharingResult`.
        """
        payload_obj = sharing.to_sharing_object() if isinstance(sharing, SharingBuilder) else sharing
        payload = {"object": payload_obj.model_dump(by_alias=True, exclude_none=True, mode="json")}

        flat: list[tuple[str, str]] = []
        for resource, uids in by_resource.items():
            for uid in uids:
                flat.append((resource, uid))
        if not flat:
            return BulkSharingResult()

        semaphore = asyncio.Semaphore(max(1, concurrency))

        async def _one(resource: str, uid: str) -> tuple[str, str, BulkSharingError | None]:
            async with semaphore:
                try:
                    await self._client.post_raw(
                        "/api/sharing",
                        payload,
                        params={"type": resource, "id": uid},
                    )
                except Dhis2ApiError as exc:
                    return (
                        resource,
                        uid,
                        BulkSharingError(
                            uid=uid,
                            resource=resource,
                            status_code=exc.status_code,
                            message=exc.message,
                        ),
                    )
            return resource, uid, None

        results = await asyncio.gather(*(_one(r, u) for r, u in flat))
        successful: list[str] = []
        failures: list[BulkSharingError] = []
        for _resource, uid, error in results:
            if error is None:
                successful.append(uid)
            else:
                failures.append(error)
        return BulkSharingResult(successful_uids=successful, failures=failures)

    async def dry_run(
        self,
        by_resource: Mapping[str, Sequence[BaseModel | dict[str, Any]]],
        *,
        import_strategy: str = "CREATE_AND_UPDATE",
    ) -> WebMessageResponse:
        """Validate a cross-resource bundle without committing (`importMode=VALIDATE`).

        `by_resource` maps each resource type (e.g. `"dataElements"`,
        `"indicators"`) to the objects that would be imported. Objects can be
        typed pydantic models (auto-dumped via `by_alias + exclude_none`) or
        raw dicts (pass-through). Empty resource entries are skipped.

        Returns the `WebMessageResponse` DHIS2 would have returned on a real
        import — `.import_report().stats` carries the per-type
        created/updated counts; `.conflicts()` lists everything DHIS2 would
        have rejected. Useful as a safety gate in a CI pipeline before a
        real bulk write, or before `delete_bulk` on resources with
        foreign-key dependencies.
        """
        bundle = _bundle_from_by_resource(by_resource)
        if not bundle:
            return WebMessageResponse(
                status=Status.OK, httpStatus="OK", httpStatusCode=200, message="no items supplied"
            )
        raw = await self._client.post_raw(
            "/api/metadata",
            body=bundle,
            params={"importStrategy": import_strategy, "importMode": "VALIDATE"},
        )
        return WebMessageResponse.model_validate(raw)
Functions
__init__(client)

Bind to the owning Dhis2Client.

Source code in packages/dhis2w-client/src/dhis2w_client/metadata.py
def __init__(self, client: Dhis2Client) -> None:
    """Bind to the sharing client."""
    self._client = client
search(query, *, page_size=50, resource=None, fields=None, exact=False) async

Cross-resource metadata search — UID / code / name, OR-merged.

Fans out len(_SEARCH_FIELDS) concurrent /api/metadata calls, one per match axis (id, code, name), each filtered as <field>:ilike:<query> (or <field>:eq:<query> when exact=True). DHIS2 returns a bundle {dataElements: [...], indicators: [...], ...} per call grouped by resource type; this method merges them, deduplicating by (resource, uid) so an object matching on both id and name doesn't appear twice.

Works uniformly for:

  • Full UID — id:ilike hits the exact record, the other axes usually miss; result is one hit.
  • Partial UID — id:ilike: matches every UID starting with or containing the substring across every resource.
  • Code lookup — code:ilike: matches on the business identifier; indicators / DEs often carry meaningful codes.
  • Name substring — name:ilike: is the broadest match and usually dominates the result set.

Pass resource="dataElements" (etc.) to narrow the result to one resource kind. Pass fields="id,name,code,valueType,domainType" to ask DHIS2 for extra attributes per hit — anything beyond the core four (id / name / code / href) lands on SearchHit.extras. Pass exact=True to switch from ilike substring to eq exact match (useful when a partial-UID search would otherwise match too many siblings).

A single rootJunction=OR call would be cleaner, but DHIS2's /api/metadata endpoint silently ignores rootJunction and ANDs multiple filters (BUGS.md #29), so N requests are the only way to get cross-field OR.

Source code in packages/dhis2w-client/src/dhis2w_client/metadata.py
async def search(
    self,
    query: str,
    *,
    page_size: int = 50,
    resource: str | None = None,
    fields: str | None = None,
    exact: bool = False,
) -> SearchResults:
    """Cross-resource metadata search — UID / code / name, OR-merged.

    Fans out `len(_SEARCH_FIELDS)` concurrent `/api/metadata` calls, one
    per match axis (`id`, `code`, `name`), each filtered as
    `<field>:ilike:<query>` (or `<field>:eq:<query>` when `exact=True`).
    DHIS2 returns a bundle `{dataElements: [...], indicators: [...], ...}`
    per call grouped by resource type; this method merges them,
    deduplicating by `(resource, uid)` so an object matching on both
    id and name doesn't appear twice.

    Works uniformly for:

    - **Full UID** — id:ilike hits the exact record, the other axes
      usually miss; result is one hit.
    - **Partial UID** — id:ilike:<fragment> matches every UID
      containing the fragment, across every resource.
    - **Code lookup** — code:ilike:<fragment> matches on the business
      identifier; indicators / DEs often carry meaningful codes.
    - **Name substring** — name:ilike:<fragment> is the broadest
      match and usually dominates the result set.

    Pass `resource="dataElements"` (etc.) to narrow the result to one
    resource kind. Pass `fields="id,name,code,valueType,domainType"`
    to ask DHIS2 for extra attributes per hit — anything beyond the
    core four (`id` / `name` / `code` / `href`) lands on `SearchHit.extras`.
    Pass `exact=True` to switch from `ilike` substring to `eq` exact
    match (useful when a partial-UID search would otherwise match too
    many siblings).

    A single `rootJunction=OR` call would be cleaner, but DHIS2's
    `/api/metadata` endpoint silently ignores `rootJunction` and
    ANDs multiple filters (BUGS.md #29), so N requests are the only
    way to get cross-field OR.
    """
    operator = "eq" if exact else "ilike"
    effective_fields = fields or _SEARCH_DEFAULT_FIELDS
    field_results = await asyncio.gather(
        *(
            self._search_one_field(
                field,
                query,
                operator=operator,
                fields=effective_fields,
                resource=resource,
                page_size=page_size,
            )
            for field in _SEARCH_FIELDS
        ),
    )
    return _merge_search_results(query, field_results)
usage(uid, *, page_size=100) async

Reverse lookup — find every object that references uid.

Two-step workflow: (1) resolve the UID's owning resource via /api/identifiableObjects/{uid} so we know which reference-shapes to look up; (2) fan out concurrent /api/<target>?filter=<path>:eq:<uid> calls against the known reference paths for that owning type.

Coverage is best-effort — the reference map (_USAGE_PATTERNS) encodes the reference shapes most likely to block a delete in practice (dataSets + visualizations + maps + programStages referencing a DE, dashboards referencing a viz/map, categoryCombo references on DEs / dataSets / programs, OU references on users / groups, etc.). Extend _USAGE_PATTERNS when a new shape surfaces.

Returns a SearchResults keyed by target resource — the same shape as search, so CLI rendering can be reused cleanly. An empty result means no reference was found on any covered path — caveat: it does not prove the UID is safe to delete if the reference shape isn't in the map.

Raises Dhis2ApiError with status_code=404 when the UID doesn't resolve to any known resource.
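The reference map expands into per-target filter queries. A hypothetical slice (the entries and paths below are illustrative, not the library's actual table):

```python
# owning resource -> (target resource, filter-path template) pairs.
USAGE_PATTERNS = {
    "dataElements": (
        ("dataSets", "dataSetElements.dataElement.id:eq:{uid}"),
        ("programStages", "programStageDataElements.dataElement.id:eq:{uid}"),
    ),
}

def usage_queries(owning: str, uid: str) -> list:
    """Expand the templates into (target, filter) pairs for the fan-out."""
    return [
        (target, template.format(uid=uid))
        for target, template in USAGE_PATTERNS.get(owning, ())
    ]
```

Each expanded pair becomes one `GET /api/<target>?filter=<path>` call; an owning type with no entry short-circuits to an empty result.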

Source code in packages/dhis2w-client/src/dhis2w_client/metadata.py
async def usage(
    self,
    uid: str,
    *,
    page_size: int = 100,
) -> SearchResults:
    """Reverse lookup — find every object that references `uid`.

    Two-step workflow: (1) resolve the UID's owning resource via
    `/api/identifiableObjects/{uid}` so we know which reference-shapes
    to look up; (2) fan out concurrent `/api/<target>?filter=<path>:eq:<uid>`
    calls against the known reference paths for that owning type.

    Coverage is best-effort — the reference map (`_USAGE_PATTERNS`)
    encodes the reference shapes most likely to block a delete in
    practice (dataSets + visualizations + maps + programStages
    referencing a DE, dashboards referencing a viz/map, categoryCombo
    references on DEs / dataSets / programs, OU references on users /
    groups, etc.). Extend `_USAGE_PATTERNS` when a new shape surfaces.

    Returns a `SearchResults` keyed by target resource — the same
    shape as `search`, so CLI rendering can be reused cleanly. An
    empty result means no reference was found on any covered path —
    caveat: it does not prove the UID is safe to delete if the
    reference shape isn't in the map.

    Raises `Dhis2ApiError` with `status_code=404` when the UID
    doesn't resolve to any known resource.
    """
    owning = await self._resolve_resource(uid)
    patterns = _USAGE_PATTERNS.get(owning, ())
    if not patterns:
        return SearchResults(query=uid, hits={})
    query_results = await asyncio.gather(
        *(
            self._usage_query(target, template.format(uid=uid), page_size=page_size)
            for target, template in patterns
        ),
    )
    return _merge_search_results(uid, query_results)
delete_bulk(resource_type, uids) async

Delete every UID in uids from one DHIS2 resource type in a single request.

Wraps POST /api/metadata?importStrategy=DELETE&atomicMode=NONE with a minimal {resource_type: [{"id": uid}, ...]} bundle. Returns the WebMessageResponse envelope — .import_count().deleted reports the total rows deleted; .conflicts() lists anything DHIS2 refused (foreign-key constraints, soft-delete protection, etc.).

atomicMode=NONE lets partial failures through: some UIDs deleted, some held back with a conflict. Switch to delete_bulk_multi(atomic_mode="ALL") when every row must delete or none should. Empty uids short-circuits with a no-op envelope (no HTTP call).

Source code in packages/dhis2w-client/src/dhis2w_client/metadata.py
async def delete_bulk(self, resource_type: str, uids: Sequence[str]) -> WebMessageResponse:
    """Delete every UID in `uids` from one DHIS2 resource type in a single request.

    Wraps `POST /api/metadata?importStrategy=DELETE&atomicMode=NONE` with a
    minimal `{resource_type: [{"id": uid}, ...]}` bundle. Returns the
    `WebMessageResponse` envelope — `.import_count().deleted` reports the
    total rows deleted; `.conflicts()` lists anything DHIS2 refused
    (foreign-key constraints, soft-delete protection, etc.).

    `atomicMode=NONE` lets partial failures through: some UIDs deleted,
    some held back with a conflict. Switch to
    `delete_bulk_multi(atomic_mode="ALL")` when every row must delete
    or none should. Empty `uids` short-circuits with a no-op envelope
    (no HTTP call).
    """
    return await self.delete_bulk_multi({resource_type: list(uids)})
delete_bulk_multi(by_resource, *, atomic_mode='NONE') async

Delete across multiple resource types in one /api/metadata call.

by_resource maps each resource type (e.g. "dataElements", "indicators") to the UIDs to delete for that type. Entries with empty UID lists are skipped. atomic_mode controls DHIS2's partial-failure behaviour: "NONE" (default) lets individual conflicts through, "ALL" rolls the entire bundle back on any conflict.
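The request assembly described above can be sketched as a pure function — dicts only, no HTTP:

```python
# Sketch of the bundle + query params delete_bulk_multi assembles before
# POSTing to /api/metadata (illustrative helper, not the library's code).
def delete_request(by_resource: dict, atomic_mode: str = "NONE"):
    bundle = {
        resource: [{"id": uid} for uid in uids]
        for resource, uids in by_resource.items()
        if uids  # entries with empty UID lists are skipped
    }
    params = {"importStrategy": "DELETE", "atomicMode": atomic_mode}
    return bundle, params
```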

Source code in packages/dhis2w-client/src/dhis2w_client/metadata.py
async def delete_bulk_multi(
    self,
    by_resource: Mapping[str, Sequence[str]],
    *,
    atomic_mode: str = "NONE",
) -> WebMessageResponse:
    """Delete across multiple resource types in one `/api/metadata` call.

    `by_resource` maps each resource type (e.g. `"dataElements"`,
    `"indicators"`) to the UIDs to delete for that type. Entries with
    empty UID lists are skipped. `atomic_mode` controls DHIS2's
    partial-failure behaviour: `"NONE"` (default) lets individual
    conflicts through, `"ALL"` rolls the entire bundle back on any
    conflict.
    """
    bundle: dict[str, list[dict[str, str]]] = {
        resource: [{"id": uid} for uid in uids] for resource, uids in by_resource.items() if uids
    }
    if not bundle:
        return WebMessageResponse(status=Status.OK, httpStatus="OK", httpStatusCode=200, message="no uids supplied")
    raw = await self._client.post_raw(
        "/api/metadata",
        body=bundle,
        params={"importStrategy": "DELETE", "atomicMode": atomic_mode},
    )
    return WebMessageResponse.model_validate(raw)
patch_bulk(resource_type, patches, *, concurrency=8) async

Apply RFC 6902 patches to many UIDs on one resource in parallel.

patches is a list of (uid, ops) pairs. ops can carry typed JsonPatchOp models (auto-dumped via by_alias + exclude_none) or raw dicts already matching the RFC 6902 shape. DHIS2 does not expose a single bulk-PATCH endpoint, so this is client-side fan-out over PATCH /api/<resource>/<uid>; concurrency caps simultaneous in-flight requests (default 8, a sensible sweet spot against a single DHIS2 node).

Per-UID failures do not raise — they land in the returned BulkPatchResult.failures. Call .ok for a bool "every patch applied" summary, or inspect .failures for row-level detail.
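Raw-dict ops are accepted alongside the typed models. A sketch of the (uid, ops) pairs as patch_bulk would receive them — UIDs and paths are placeholders:

```python
# Each pair is (uid, list-of-RFC-6902-ops); ops mix freely within a pair.
patches = [
    ("UID_A______", [
        {"op": "replace", "path": "/shortName", "value": "A2"},
    ]),
    ("UID_B______", [
        {"op": "add", "path": "/code", "value": "DE_B"},
        {"op": "remove", "path": "/description"},
    ]),
]
```

Each pair becomes one PATCH request; the ops list is sent as the JSON body unchanged.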

Source code in packages/dhis2w-client/src/dhis2w_client/metadata.py
async def patch_bulk(
    self,
    resource_type: str,
    patches: Sequence[tuple[str, Sequence[JsonPatchOp | dict[str, Any]]]],
    *,
    concurrency: int = 8,
) -> BulkPatchResult:
    """Apply RFC 6902 patches to many UIDs on one resource in parallel.

    `patches` is a list of `(uid, ops)` pairs. `ops` can carry typed
    `JsonPatchOp` models (auto-dumped via `by_alias + exclude_none`)
    or raw dicts already matching the RFC 6902 shape. DHIS2 does not
    expose a single bulk-PATCH endpoint, so this is client-side
    fan-out over `PATCH /api/<resource>/<uid>` — `concurrency` caps
    simultaneous in-flight requests (default 8, a sensible sweet
    spot against a single DHIS2 node).

    Per-UID failures do not raise — they land in the returned
    `BulkPatchResult.failures`. Call `.ok` for a bool "every patch
    applied" summary, or inspect `.failures` for row-level detail.
    """
    return await self.patch_bulk_multi({resource_type: patches}, concurrency=concurrency)
patch_bulk_multi(by_resource, *, concurrency=8) async

Apply RFC 6902 patches across multiple resource types in parallel.

by_resource maps each resource type to its (uid, ops) pairs; every pair across every type runs through the same concurrency budget. Resources with empty pair lists are skipped. Merges into one BulkPatchResult.
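The shared-budget fan-out is a generic semaphore pattern; a self-contained sketch (worker and job names are mine, not the library's), with per-job failures captured instead of raised:

```python
import asyncio

async def fan_out(jobs, worker, *, concurrency: int = 8):
    """Run worker(job) for every job, at most `concurrency` in flight.

    Returns (job, result, error) triples in job order; an exception from
    one job is captured into its triple rather than cancelling the rest.
    """
    sem = asyncio.Semaphore(max(1, concurrency))

    async def one(job):
        async with sem:
            try:
                return job, await worker(job), None
            except Exception as exc:  # per-job failure, not fatal
                return job, None, exc

    return await asyncio.gather(*(one(j) for j in jobs))
```

patch_bulk_multi applies this shape with the PATCH call as the worker, folding the error triples into BulkPatchResult.failures.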

Source code in packages/dhis2w-client/src/dhis2w_client/metadata.py
async def patch_bulk_multi(
    self,
    by_resource: Mapping[str, Sequence[tuple[str, Sequence[JsonPatchOp | dict[str, Any]]]]],
    *,
    concurrency: int = 8,
) -> BulkPatchResult:
    """Apply RFC 6902 patches across multiple resource types in parallel.

    `by_resource` maps each resource type to its `(uid, ops)` pairs;
    every pair across every type runs through the same concurrency
    budget. Resources with empty pair lists are skipped.
    Merges into one `BulkPatchResult`.
    """
    flat: list[tuple[str, str, list[dict[str, Any]]]] = []
    for resource, pairs in by_resource.items():
        for uid, ops in pairs:
            flat.append((resource, uid, _normalise_patch_ops(ops)))
    if not flat:
        return BulkPatchResult()

    semaphore = asyncio.Semaphore(max(1, concurrency))

    async def _one(resource: str, uid: str, ops: list[dict[str, Any]]) -> tuple[str, str, BulkPatchError | None]:
        async with semaphore:
            try:
                await self._client.patch_raw(f"/api/{resource}/{uid}", body=ops)
            except Dhis2ApiError as exc:
                return (
                    resource,
                    uid,
                    BulkPatchError(
                        uid=uid,
                        resource=resource,
                        status_code=exc.status_code,
                        message=exc.message,
                    ),
                )
        return resource, uid, None

    results = await asyncio.gather(*(_one(r, u, o) for r, u, o in flat))
    successful: list[str] = []
    failures: list[BulkPatchError] = []
    for _resource, uid, error in results:
        if error is None:
            successful.append(uid)
        else:
            failures.append(error)
    return BulkPatchResult(successful_uids=successful, failures=failures)
apply_sharing_bulk(resource_type, uids, sharing, *, concurrency=8) async

Apply one sharing block to many UIDs of one resource in parallel.

DHIS2's /api/sharing is per-object (one POST per UID). This method fans the same SharingObject / SharingBuilder payload across every UID in uids under a concurrency semaphore (default 8). Useful when rolling a single user-group-access pattern across a cohort without writing the loop in caller code.

Per-UID failures do not raise — they land in the returned BulkSharingResult.failures. Call .ok for a bool "every grant applied" summary, or inspect .failures for row-level detail.
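The per-UID call shape can be sketched as a pure function — illustrative dicts, no HTTP; the sharing-object fields in the usage note are placeholders:

```python
# One POST /api/sharing per UID, with the owning type and UID as query params
# and the sharing block wrapped under "object" in the body.
def sharing_request(resource: str, uid: str, sharing_object: dict):
    path = "/api/sharing"
    body = {"object": sharing_object}
    params = {"type": resource, "id": uid}
    return path, body, params
```

The body is identical for every UID in the fan-out; only the `id` (and, in the multi variant, `type`) query params vary per request.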

Source code in packages/dhis2w-client/src/dhis2w_client/metadata.py
async def apply_sharing_bulk(
    self,
    resource_type: str,
    uids: Sequence[str],
    sharing: SharingObject | SharingBuilder,
    *,
    concurrency: int = 8,
) -> BulkSharingResult:
    """Apply one sharing block to many UIDs of one resource in parallel.

    DHIS2's `/api/sharing` is per-object (one POST per UID). This method
    fans the same `SharingObject` / `SharingBuilder` payload across every
    UID in `uids` under a `concurrency` semaphore (default 8). Useful
    when rolling a single user-group-access pattern across a cohort
    without writing the loop in caller code.

    Per-UID failures do not raise — they land in the returned
    `BulkSharingResult.failures`. Call `.ok` for a bool "every grant
    applied" summary, or inspect `.failures` for row-level detail.
    """
    return await self.apply_sharing_bulk_multi({resource_type: uids}, sharing, concurrency=concurrency)
apply_sharing_bulk_multi(by_resource, sharing, *, concurrency=8) async

Apply one sharing block across multiple resource types in parallel.

by_resource maps each resource type ("dataSet", "program", ...) to the UIDs receiving the same sharing payload; every UID across every type runs through one concurrency budget. Resources with empty UID lists are skipped. Merges into one BulkSharingResult.

Source code in packages/dhis2w-client/src/dhis2w_client/metadata.py
async def apply_sharing_bulk_multi(
    self,
    by_resource: Mapping[str, Sequence[str]],
    sharing: SharingObject | SharingBuilder,
    *,
    concurrency: int = 8,
) -> BulkSharingResult:
    """Apply one sharing block across multiple resource types in parallel.

    `by_resource` maps each resource type (`"dataSet"`, `"program"`, ...)
    to the UIDs receiving the same sharing payload; every UID across
    every type runs through one `concurrency` budget. Resources with
    empty UID lists are skipped. Merges into one `BulkSharingResult`.
    """
    payload_obj = sharing.to_sharing_object() if isinstance(sharing, SharingBuilder) else sharing
    payload = {"object": payload_obj.model_dump(by_alias=True, exclude_none=True, mode="json")}

    flat: list[tuple[str, str]] = []
    for resource, uids in by_resource.items():
        for uid in uids:
            flat.append((resource, uid))
    if not flat:
        return BulkSharingResult()

    semaphore = asyncio.Semaphore(max(1, concurrency))

    async def _one(resource: str, uid: str) -> tuple[str, str, BulkSharingError | None]:
        async with semaphore:
            try:
                await self._client.post_raw(
                    "/api/sharing",
                    payload,
                    params={"type": resource, "id": uid},
                )
            except Dhis2ApiError as exc:
                return (
                    resource,
                    uid,
                    BulkSharingError(
                        uid=uid,
                        resource=resource,
                        status_code=exc.status_code,
                        message=exc.message,
                    ),
                )
        return resource, uid, None

    results = await asyncio.gather(*(_one(r, u) for r, u in flat))
    successful: list[str] = []
    failures: list[BulkSharingError] = []
    for _resource, uid, error in results:
        if error is None:
            successful.append(uid)
        else:
            failures.append(error)
    return BulkSharingResult(successful_uids=successful, failures=failures)
dry_run(by_resource, *, import_strategy='CREATE_AND_UPDATE') async

Validate a cross-resource bundle without committing (importMode=VALIDATE).

by_resource maps each resource type (e.g. "dataElements", "indicators") to the objects that would be imported. Objects can be typed pydantic models (auto-dumped via by_alias + exclude_none) or raw dicts (pass-through). Empty resource entries are skipped.

Returns the WebMessageResponse DHIS2 would have returned on a real import — .import_report().stats carries the per-type created/updated counts; .conflicts() lists everything DHIS2 would have rejected. Useful as a safety gate in a CI pipeline before a real bulk write, or before delete_bulk on resources with foreign-key dependencies.
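The validate-only request can be sketched as a pure function — dict objects only (typed models would be dumped to dicts first); no HTTP here:

```python
# Sketch of the bundle + params dry_run assembles: same bundle shape as a
# real import, but importMode=VALIDATE so nothing is committed.
def dry_run_request(by_resource: dict, import_strategy: str = "CREATE_AND_UPDATE"):
    bundle = {
        resource: list(objects)
        for resource, objects in by_resource.items()
        if objects  # empty resource entries are skipped
    }
    params = {"importStrategy": import_strategy, "importMode": "VALIDATE"}
    return bundle, params
```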

Source code in packages/dhis2w-client/src/dhis2w_client/metadata.py
async def dry_run(
    self,
    by_resource: Mapping[str, Sequence[BaseModel | dict[str, Any]]],
    *,
    import_strategy: str = "CREATE_AND_UPDATE",
) -> WebMessageResponse:
    """Validate a cross-resource bundle without committing (`importMode=VALIDATE`).

    `by_resource` maps each resource type (e.g. `"dataElements"`,
    `"indicators"`) to the objects that would be imported. Objects can be
    typed pydantic models (auto-dumped via `by_alias + exclude_none`) or
    raw dicts (pass-through). Empty resource entries are skipped.

    Returns the `WebMessageResponse` DHIS2 would have returned on a real
    import — `.import_report().stats` carries the per-type
    created/updated counts; `.conflicts()` lists everything DHIS2 would
    have rejected. Useful as a safety gate in a CI pipeline before a
    real bulk write, or before `delete_bulk` on resources with
    foreign-key dependencies.
    """
    bundle = _bundle_from_by_resource(by_resource)
    if not bundle:
        return WebMessageResponse(
            status=Status.OK, httpStatus="OK", httpStatusCode=200, message="no items supplied"
        )
    raw = await self._client.post_raw(
        "/api/metadata",
        body=bundle,
        params={"importStrategy": import_strategy, "importMode": "VALIDATE"},
    )
    return WebMessageResponse.model_validate(raw)