Tasks module

client.tasks: block on DHIS2 background jobs (analytics refresh, metadata import, predictor runs, etc.). DHIS2's task endpoints (/api/system/tasks/{jobType} and /api/system/taskSummaries/{jobType}) report job progress as a stream of Notification entries; this module polls the /api/system/tasks/{jobType}/{uid} feed, de-dupes the entries, and resolves a typed TaskCompletion once a notification with completed=True lands on the feed.

When to reach for it

  • After kicking off any async DHIS2 endpoint (POST /api/resourceTables/analytics, POST /api/predictors/run, POST /api/dataAnalysis/validationRules, an /api/metadata import with async=true, …).
  • When you want a Rich progress display in the terminal — call iter_notifications instead of await_completion and render each entry as it arrives.
  • In a test that needs to wait for a real DHIS2 side-effect to land before asserting.

Worked example — kick off + block + branch on completion

import asyncio

from dhis2w_client import WebMessageResponse
from dhis2w_client.v42.tasks import TaskTimeoutError
from dhis2w_core.client_context import open_client
from dhis2w_core.profile import profile_from_env


async def main() -> None:
    async with open_client(profile_from_env()) as client:
        # 1. Kick off an analytics resource-table refresh via the raw HTTP path.
        #    DHIS2 returns a WebMessage envelope; `.task_ref()` extracts the
        #    `(job_type, uid)` tuple await_completion takes.
        raw = await client.post_raw(
            "/api/resourceTables/analytics", params={"lastYears": 1}
        )
        envelope = WebMessageResponse.model_validate(raw)
        ref = envelope.task_ref()
        if ref is None:
            print("no task-ref in response; nothing to watch")
            return
        job_type, task_uid = ref
        print(f"kicked off {job_type}/{task_uid}")

        # 2. Block until DHIS2 marks the job complete.
        try:
            completion = await client.tasks.await_completion(
                ref,
                timeout=300.0,
                poll_interval=2.0,
            )
        except TaskTimeoutError as exc:
            # The exception only carries the timeout message. To keep the
            # partial notification list, use the iterator path instead.
            print(f"timed out: {exc}")
            return

        last = completion.final
        print(f"done in {len(completion.notifications)} notifications")
        print(f"  last: level={last.level} message={last.message!r}")


asyncio.run(main())

Two failure shapes to handle

  • TaskTimeoutError — the polling loop hit timeout before DHIS2 marked the job complete. Pass timeout=None for jobs that can legitimately take hours (full analytics rebuilds on large datasets).
  • completion.final.level == "ERROR" — the job finished but DHIS2 marked it failed. The notifications list shows the trail; the last entry typically carries the actionable message.
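# completion.level is the upper-cased shortcut for completion.final.level
# (it falls back to "INFO" when DHIS2 omits the level).
if completion.level == "ERROR":
    raise RuntimeError(f"task {job_type}/{task_uid} failed: {completion.message}")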
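
The two failure shapes above can be folded into one status value. The following is a minimal sketch, not part of the library: `task_outcome`, `StubTasks`, and `StubCompletion` are hypothetical names, and the local `TaskTimeoutError` is a stand-in so the snippet runs without the package installed. In real code you would import `TaskTimeoutError` from `dhis2w_client.v42.tasks` and pass `client.tasks`.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass


class TaskTimeoutError(TimeoutError):
    """Stand-in for dhis2w_client.v42.tasks.TaskTimeoutError."""


@dataclass(frozen=True)
class StubCompletion:
    """Hypothetical stand-in exposing TaskCompletion's `level` and `message`."""

    level: str
    message: str


class StubTasks:
    """Hypothetical stand-in for `client.tasks`; returns or raises a canned result."""

    def __init__(self, result: StubCompletion | Exception) -> None:
        self._result = result

    async def await_completion(self, task_ref, *, timeout=600.0, poll_interval=2.0):
        if isinstance(self._result, Exception):
            raise self._result
        return self._result


async def task_outcome(tasks, task_ref, *, timeout: float | None = 300.0) -> str:
    """Return "ok", "failed", or "timeout" for one background task."""
    try:
        completion = await tasks.await_completion(task_ref, timeout=timeout)
    except TaskTimeoutError:
        return "timeout"  # DHIS2 never reported completed=True in time
    if completion.level == "ERROR":
        return "failed"  # the job finished, but DHIS2 flagged it as an error
    return "ok"


ref = ("ANALYTICS_TABLE", "exampleUid01")  # placeholder task reference
ok = asyncio.run(task_outcome(StubTasks(StubCompletion("INFO", "done")), ref))
failed = asyncio.run(task_outcome(StubTasks(StubCompletion("ERROR", "boom")), ref))
timed_out = asyncio.run(task_outcome(StubTasks(TaskTimeoutError("late")), ref))
print(ok, failed, timed_out)
```

Keeping the timeout and the error level as separate branches matters because only the second one means DHIS2 actually gave up on the job; after a timeout the job may still be running server-side.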

Streaming notifications (Rich progress / SSE bridges)

If you want to render each notification as it arrives instead of waiting for the final one, use iter_notifications:

async for notification in client.tasks.iter_notifications(ref, timeout=600):
    print(f"  [{notification.level}] {notification.message}")
    if notification.completed:
        break
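
Because `iter_notifications` raises `TaskTimeoutError` when the deadline passes, the streaming path is also the way to keep whatever arrived before the timeout. A minimal sketch of that pattern follows; `collect_trail`, `FakeNotification`, and `stalled_feed` are hypothetical names, and the fake feed stands in for `client.tasks.iter_notifications(ref, ...)` so the snippet runs on its own.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass


class TaskTimeoutError(TimeoutError):
    """Stand-in for dhis2w_client.v42.tasks.TaskTimeoutError."""


@dataclass(frozen=True)
class FakeNotification:
    """Hypothetical stand-in carrying only the fields the loop reads."""

    level: str
    message: str
    completed: bool = False


async def collect_trail(
    feed: AsyncIterator[FakeNotification],
) -> tuple[list[FakeNotification], bool]:
    """Drain a notification feed and return (trail, finished)."""
    trail: list[FakeNotification] = []
    try:
        async for notification in feed:
            trail.append(notification)
            if notification.completed:
                return trail, True
    except TaskTimeoutError:
        # Everything yielded before the deadline is still in `trail`.
        return trail, False
    return trail, False


async def stalled_feed() -> AsyncIterator[FakeNotification]:
    """Fake feed: one progress entry, then a simulated timeout."""
    yield FakeNotification("INFO", "job started")
    raise TaskTimeoutError("simulated timeout")


trail, finished = asyncio.run(collect_trail(stalled_feed()))
print(finished, [n.message for n in trail])
```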

parse_task_ref(...) converts either a (job_type, uid) tuple or a "JOB_TYPE/uid" string into the canonical tuple form — handy when wiring the awaiter to call sites that get the ref from different shapes.

tasks

Client-level task awaiter for DHIS2 background jobs.

Every async DHIS2 operation (analytics refresh, metadata import, data-integrity run, tracker async push) returns a WebMessageResponse carrying jobType + task UID. Callers historically had two options: (a) roll their own polling loop against /api/system/tasks/{type}/{uid}, or (b) go through the plugin-layer CLI --watch flag. Neither works when you want to block a library script on job completion.

client.tasks.await_completion(task_ref) fills that gap. It polls the task-status feed on the already-open HTTP connection (no new client per poll, unlike the profile-based watch_task in dhis2w-core), de-dupes notifications by their identifier, and returns a typed TaskCompletion once completed=True lands.

Caveats:

  • Poll interval defaults to 2.0s. DHIS2's notification feed updates at best every ~1s, so faster polling just burns request quota.
  • Timeout defaults to 600.0s (10 min). Analytics refreshes on large instances can run longer — pass timeout=None to block forever, or timeout=3600 etc.
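
To make the request-budget trade-off concrete, here is a rough estimate of how many GET requests one wait can issue. This is a back-of-the-envelope sketch, not library code: `max_polls` is a hypothetical helper, and it assumes each request returns instantly (real latency only lowers the count). It follows the loop shape in the source listing: poll, check the deadline, then sleep.

```python
import math


def max_polls(timeout: float, poll_interval: float = 2.0) -> int:
    """Upper bound on polls before the deadline check raises.

    Polls happen at t = 0, poll_interval, 2 * poll_interval, ... and the
    deadline is checked after each poll, so one extra poll is counted.
    """
    return math.ceil(timeout / poll_interval) + 1


print(max_polls(600.0))        # defaults: 600 s timeout, 2 s interval
print(max_polls(600.0, 0.5))   # four times the requests for the same wait
print(max_polls(3600.0))       # one-hour timeout at the default interval
```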

Classes

TaskTimeoutError

Bases: TimeoutError

Raised when await_completion / iter_notifications exceed the timeout.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/tasks.py
class TaskTimeoutError(TimeoutError):
    """Raised when `await_completion` / `iter_notifications` exceed the timeout."""

TaskCompletion

Bases: BaseModel

Result of awaiting a DHIS2 background task — every notification + the terminal row.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/tasks.py
class TaskCompletion(BaseModel):
    """Result of awaiting a DHIS2 background task — every notification + the terminal row."""

    model_config = ConfigDict(frozen=True)

    job_type: str
    task_uid: str
    notifications: list[Notification]
    final: Notification

    @property
    def level(self) -> str:
        """Level of the completing notification (typically INFO for success, ERROR for failure)."""
        return (self.final.level or "INFO").upper()

    @property
    def message(self) -> str:
        """Message from the completing notification — typically a short summary."""
        return self.final.message or ""
Attributes
level property

Level of the completing notification (typically INFO for success, ERROR for failure).

message property

Message from the completing notification — typically a short summary.
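
The `level` property normalises before you compare, which is why checks against `"ERROR"` are safer on `completion.level` than on `completion.final.level`. A standalone sketch of the same expression; `normalised_level` is a hypothetical name used only for illustration.

```python
from __future__ import annotations


def normalised_level(raw: str | None) -> str:
    """Mirror TaskCompletion.level: default to INFO, then upper-case."""
    return (raw or "INFO").upper()


for raw in (None, "", "info", "Error", "ERROR"):
    print(repr(raw), "->", normalised_level(raw))
```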

TaskModule

Accessor bound to a Dhis2Client exposing background-task polling.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/tasks.py
class TaskModule:
    """Accessor bound to a `Dhis2Client` exposing background-task polling."""

    def __init__(self, client: Dhis2Client) -> None:
        """Bind to the sharing client — reuses its auth + HTTP pool for every poll."""
        self._client = client

    async def await_completion(
        self,
        task_ref: tuple[str, str] | str,
        *,
        timeout: float | None = 600.0,
        poll_interval: float = 2.0,
    ) -> TaskCompletion:
        """Block until the task completes; return a typed `TaskCompletion`.

        Polls `/api/system/tasks/{job_type}/{uid}` every `poll_interval`
        seconds until a notification with `completed=True` arrives. Raises
        `TaskTimeoutError` if `timeout` elapses first (pass `None` for no
        timeout). De-dupes notifications by `uid`/`id`/`time` so repeated
        polls don't surface the same entries twice.
        """
        notifications: list[Notification] = []
        async for notification in self.iter_notifications(task_ref, timeout=timeout, poll_interval=poll_interval):
            notifications.append(notification)
            if notification.completed:
                job_type, task_uid = parse_task_ref(task_ref)
                return TaskCompletion(
                    job_type=job_type,
                    task_uid=task_uid,
                    notifications=notifications,
                    final=notification,
                )
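        # Defensive fallback: `iter_notifications` raises `TaskTimeoutError` itself at the
        # deadline, so this only runs if the iterator ends without a `completed=True` row.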
        raise TaskTimeoutError(f"task {task_ref!r} did not complete within the iterator's timeout")

    async def iter_notifications(
        self,
        task_ref: tuple[str, str] | str,
        *,
        timeout: float | None = 600.0,
        poll_interval: float = 2.0,
    ) -> AsyncIterator[Notification]:
        """Yield each notification as it arrives; stop when `completed=True` or timeout.

        Separate from `await_completion` so CLI renderers (Rich progress
        bars, server-sent-event bridges, etc.) can render each entry
        incrementally instead of waiting for the final result.
        """
        job_type, task_uid = parse_task_ref(task_ref)
        path = f"/api/system/tasks/{job_type}/{task_uid}"
        deadline = None if timeout is None else asyncio.get_running_loop().time() + timeout
        seen: set[str] = set()
        while True:
            raw = await self._client.get_raw(path)
            data = raw.get("data")
            items: list[object] = data if isinstance(data, list) else []
            # DHIS2 returns newest-first; yield oldest-first so callers see chronological order.
            for entry in reversed(items):
                notification = Notification.model_validate(entry)
                identifier = (
                    notification.uid
                    or notification.id
                    or (notification.time.isoformat() if notification.time is not None else "")
                )
                if identifier and identifier in seen:
                    continue
                if identifier:
                    seen.add(identifier)
                yield notification
                if notification.completed:
                    return
            if deadline is not None and asyncio.get_running_loop().time() >= deadline:
                raise TaskTimeoutError(f"task {job_type}/{task_uid} did not complete within {timeout}s")
            await asyncio.sleep(poll_interval)
Functions
__init__(client)

Bind to the sharing client — reuses its auth + HTTP pool for every poll.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/tasks.py
def __init__(self, client: Dhis2Client) -> None:
    """Bind to the sharing client — reuses its auth + HTTP pool for every poll."""
    self._client = client
await_completion(task_ref, *, timeout=600.0, poll_interval=2.0) async

Block until the task completes; return a typed TaskCompletion.

Polls /api/system/tasks/{job_type}/{uid} every poll_interval seconds until a notification with completed=True arrives. Raises TaskTimeoutError if timeout elapses first (pass None for no timeout). De-dupes notifications by uid/id/time so repeated polls don't surface the same entries twice.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/tasks.py
async def await_completion(
    self,
    task_ref: tuple[str, str] | str,
    *,
    timeout: float | None = 600.0,
    poll_interval: float = 2.0,
) -> TaskCompletion:
    """Block until the task completes; return a typed `TaskCompletion`.

    Polls `/api/system/tasks/{job_type}/{uid}` every `poll_interval`
    seconds until a notification with `completed=True` arrives. Raises
    `TaskTimeoutError` if `timeout` elapses first (pass `None` for no
    timeout). De-dupes notifications by `uid`/`id`/`time` so repeated
    polls don't surface the same entries twice.
    """
    notifications: list[Notification] = []
    async for notification in self.iter_notifications(task_ref, timeout=timeout, poll_interval=poll_interval):
        notifications.append(notification)
        if notification.completed:
            job_type, task_uid = parse_task_ref(task_ref)
            return TaskCompletion(
                job_type=job_type,
                task_uid=task_uid,
                notifications=notifications,
                final=notification,
            )
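    # Defensive fallback: `iter_notifications` raises `TaskTimeoutError` itself at the
    # deadline, so this only runs if the iterator ends without a `completed=True` row.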
    raise TaskTimeoutError(f"task {task_ref!r} did not complete within the iterator's timeout")
iter_notifications(task_ref, *, timeout=600.0, poll_interval=2.0) async

Yield each notification as it arrives; stop when completed=True or timeout.

Separate from await_completion so CLI renderers (Rich progress bars, server-sent-event bridges, etc.) can render each entry incrementally instead of waiting for the final result.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/tasks.py
async def iter_notifications(
    self,
    task_ref: tuple[str, str] | str,
    *,
    timeout: float | None = 600.0,
    poll_interval: float = 2.0,
) -> AsyncIterator[Notification]:
    """Yield each notification as it arrives; stop when `completed=True` or timeout.

    Separate from `await_completion` so CLI renderers (Rich progress
    bars, server-sent-event bridges, etc.) can render each entry
    incrementally instead of waiting for the final result.
    """
    job_type, task_uid = parse_task_ref(task_ref)
    path = f"/api/system/tasks/{job_type}/{task_uid}"
    deadline = None if timeout is None else asyncio.get_running_loop().time() + timeout
    seen: set[str] = set()
    while True:
        raw = await self._client.get_raw(path)
        data = raw.get("data")
        items: list[object] = data if isinstance(data, list) else []
        # DHIS2 returns newest-first; yield oldest-first so callers see chronological order.
        for entry in reversed(items):
            notification = Notification.model_validate(entry)
            identifier = (
                notification.uid
                or notification.id
                or (notification.time.isoformat() if notification.time is not None else "")
            )
            if identifier and identifier in seen:
                continue
            if identifier:
                seen.add(identifier)
            yield notification
            if notification.completed:
                return
        if deadline is not None and asyncio.get_running_loop().time() >= deadline:
            raise TaskTimeoutError(f"task {job_type}/{task_uid} did not complete within {timeout}s")
        await asyncio.sleep(poll_interval)
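
The ordering and de-dupe step inside the loop can be tried in isolation. The sketch below is a simplification, not the library code: `new_entries` is a hypothetical helper, and it works on plain dicts with made-up `uid` values instead of validated `Notification` models. It shows how two overlapping newest-first polls become one chronological stream.

```python
from __future__ import annotations


def new_entries(poll: list[dict], seen: set[str]) -> list[dict]:
    """Return the unseen entries of one newest-first poll, oldest first."""
    fresh: list[dict] = []
    for entry in reversed(poll):
        # Same fallback chain as the source: uid, then id, then time.
        identifier = entry.get("uid") or entry.get("id") or entry.get("time") or ""
        if identifier and identifier in seen:
            continue
        if identifier:
            seen.add(identifier)
        fresh.append(entry)
    return fresh


# Two consecutive polls; the second repeats what the first already returned.
poll_1 = [
    {"uid": "n2", "message": "step 2"},
    {"uid": "n1", "message": "step 1"},
]
poll_2 = [
    {"uid": "n3", "message": "done", "completed": True},
    {"uid": "n2", "message": "step 2"},
    {"uid": "n1", "message": "step 1"},
]

seen: set[str] = set()
for poll in (poll_1, poll_2):
    for entry in new_entries(poll, seen):
        print(entry["message"])
```

One consequence of the fallback chain: an entry with no `uid`, `id`, or `time` has no identifier, so it is never recorded in `seen` and is yielded again on every poll that returns it.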

Functions

parse_task_ref(task_ref)

Normalise (job_type, uid) or "job_type/uid" into a (job_type, uid) tuple.

Convenience for callers that store task refs as a single string — matches what the CLI task-watch flag prints. Strings are split on the last /.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/tasks.py
def parse_task_ref(task_ref: tuple[str, str] | str) -> tuple[str, str]:
    """Normalise `(job_type, uid)` or `"job_type/uid"` into a `(job_type, uid)` tuple.

    Convenience for callers that store task refs as a single string — matches
    what the CLI task-watch flag prints. Strings are split on the last `/`.
    """
    if isinstance(task_ref, tuple):
        return task_ref
    if "/" not in task_ref:
        raise ValueError(f"task_ref string must be 'JOB_TYPE/uid', got {task_ref!r}")
    job_type, _, uid = task_ref.rpartition("/")
    if not job_type or not uid:
        raise ValueError(f"task_ref string must be 'JOB_TYPE/uid' with both halves, got {task_ref!r}")
    return job_type, uid
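
A short walk through the accepted and rejected shapes. The function body is copied from the listing above so the snippet runs without the package installed; the job type and UID values are placeholders.

```python
from __future__ import annotations


def parse_task_ref(task_ref: tuple[str, str] | str) -> tuple[str, str]:
    """Copy of the documented function, repeated here to keep the snippet standalone."""
    if isinstance(task_ref, tuple):
        return task_ref
    if "/" not in task_ref:
        raise ValueError(f"task_ref string must be 'JOB_TYPE/uid', got {task_ref!r}")
    job_type, _, uid = task_ref.rpartition("/")
    if not job_type or not uid:
        raise ValueError(f"task_ref string must be 'JOB_TYPE/uid' with both halves, got {task_ref!r}")
    return job_type, uid


# Tuples pass through unchanged (and unvalidated).
print(parse_task_ref(("ANALYTICS_TABLE", "exampleUid01")))
# Strings are split on the last "/".
print(parse_task_ref("ANALYTICS_TABLE/exampleUid01"))
print(parse_task_ref("a/b/c"))

# Missing separator or an empty half is rejected.
for bad in ("exampleUid01", "/exampleUid01", "ANALYTICS_TABLE/"):
    try:
        parse_task_ref(bad)
    except ValueError as exc:
        print(f"rejected {bad!r}: {exc}")
```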