Tasks module¶
client.tasks — block on DHIS2 background jobs (analytics refresh, metadata import, predictor runs, etc.). DHIS2's task endpoints (/api/system/tasks/{jobType} and /api/system/taskSummaries/{jobType}) report job progress as a stream of Notification entries; this module polls them, de-dupes, and resolves a typed TaskCompletion when completed=True lands on the feed.
When to reach for it¶
- After kicking off any async DHIS2 endpoint (POST /api/resourceTables/analytics, POST /api/predictors/run, POST /api/dataAnalysis/validationRules, an /api/metadata import with async=true, …).
- When you want a Rich progress display in the terminal: call iter_notifications instead of await_completion and render each entry as it arrives.
- In a test that needs to wait for a real DHIS2 side-effect to land before asserting.
Worked example — kick off + block + branch on completion¶
import asyncio

from dhis2w_client import WebMessageResponse
from dhis2w_client.v42.tasks import TaskTimeoutError
from dhis2w_core.client_context import open_client
from dhis2w_core.profile import profile_from_env


async def main() -> None:
    async with open_client(profile_from_env()) as client:
        # 1. Kick off an analytics resource-table refresh via the raw HTTP path.
        #    DHIS2 returns a WebMessage envelope; `.task_ref()` extracts the
        #    `(job_type, uid)` tuple await_completion takes.
        raw = await client.post_raw(
            "/api/resourceTables/analytics", params={"lastYears": 1}
        )
        envelope = WebMessageResponse.model_validate(raw)
        ref = envelope.task_ref()
        if ref is None:
            print("no task-ref in response; nothing to watch")
            return
        job_type, task_uid = ref
        print(f"kicked off {job_type}/{task_uid}")

        # 2. Block until DHIS2 marks the job complete.
        try:
            completion = await client.tasks.await_completion(
                ref,
                timeout=300.0,
                poll_interval=2.0,
            )
        except TaskTimeoutError as exc:
            # The exception only carries the timeout message. To see the
            # partial notification list, use the iterator path instead.
            print(f"timed out: {exc}")
            return

        # 3. Inspect the terminal notification.
        last = completion.final
        print(f"done in {len(completion.notifications)} notifications")
        print(f"  last: level={last.level} message={last.message!r}")


asyncio.run(main())
Two failure shapes to handle¶
- TaskTimeoutError: the polling loop hit timeout before DHIS2 marked the job complete. Pass timeout=None for jobs that can legitimately take hours (full analytics rebuilds on large datasets).
- completion.final.level == "ERROR": the job finished but DHIS2 marked it failed. The notifications list shows the trail; the last entry typically carries the actionable message.
if completion.final.level == "ERROR":
    raise RuntimeError(f"task {job_type}/{task_uid} failed: {completion.final.message}")
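The ERROR check can be wrapped in a small helper so every call site handles a failed job the same way. The following is a minimal sketch, assuming only that the completion object exposes final.level and final.message as used above. TaskFailedError and raise_if_failed are hypothetical names, not part of dhis2w_client, and the SimpleNamespace objects stand in for a real TaskCompletion so the snippet runs without a server.

```python
from types import SimpleNamespace


class TaskFailedError(RuntimeError):
    """Hypothetical exception: the job finished, but with level ERROR."""


def raise_if_failed(completion, label: str = "task") -> None:
    """Raise TaskFailedError when the terminal notification is an ERROR."""
    final = completion.final
    if final.level == "ERROR":
        raise TaskFailedError(f"{label} failed: {final.message}")


# Stand-ins for TaskCompletion; only the attributes read above are modelled.
ok = SimpleNamespace(final=SimpleNamespace(level="INFO", message="done"))
bad = SimpleNamespace(final=SimpleNamespace(level="ERROR", message="boom"))

raise_if_failed(ok)  # returns silently for a successful job
try:
    raise_if_failed(bad, label="ANALYTICS_TABLE/abc")
except TaskFailedError as exc:
    failure_text = str(exc)
    print(failure_text)
```

A dedicated exception type lets callers catch job failures separately from TaskTimeoutError, which signals that the outcome is still unknown.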
Streaming notifications (Rich progress / SSE bridges)¶
If you want to render each notification as it arrives instead of waiting for the final one, use iter_notifications:
async for notification in client.tasks.iter_notifications(ref, timeout=600):
    print(f"  [{notification.level}] {notification.message}")
    if notification.completed:
        break
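For the SSE-bridge case, each notification can be serialised into a server-sent-event frame as it is yielded. This is a rough sketch under stated assumptions: FakeNotification and fake_feed are hypothetical stand-ins for the real notification model and for client.tasks.iter_notifications(ref), and the event names "progress" and "done" are illustrative choices, not something DHIS2 or the library defines.

```python
import asyncio
import json
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class FakeNotification:
    """Stand-in carrying only the fields the loop above reads."""
    level: str
    message: str
    completed: bool


async def fake_feed() -> AsyncIterator[FakeNotification]:
    """Hypothetical replacement for client.tasks.iter_notifications(ref)."""
    yield FakeNotification("INFO", "Analytics tables update started", False)
    yield FakeNotification("INFO", "Analytics tables updated", True)


def to_sse_frame(notification: FakeNotification) -> str:
    """Serialise one notification as a server-sent-event frame."""
    payload = json.dumps(
        {"level": notification.level, "message": notification.message}
    )
    event = "done" if notification.completed else "progress"
    # An SSE frame is a block of "field: value" lines ended by a blank line.
    return f"event: {event}\ndata: {payload}\n\n"


async def collect_frames(feed: AsyncIterator[FakeNotification]) -> list[str]:
    """Consume the feed, stopping after the terminal notification."""
    frames = []
    async for notification in feed:
        frames.append(to_sse_frame(notification))
        if notification.completed:
            break
    return frames


frames = asyncio.run(collect_frames(fake_feed()))
```

In a real bridge the frames would be written to the HTTP response as they are produced rather than collected into a list.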
parse_task_ref(...) converts either a (job_type, uid) tuple or a "JOB_TYPE/uid" string into the canonical tuple form — handy when wiring the awaiter to call sites that get the ref from different shapes.
Related example¶
examples/v42/client/task_await.py: end-to-end analytics-refresh kick-off + block.
tasks¶
Client-level task awaiter for DHIS2 background jobs.
Every async DHIS2 operation (analytics refresh, metadata import, data-integrity
run, tracker async push) returns a WebMessageResponse carrying
jobType + task UID. Callers historically had two options: (a) roll
their own polling loop against /api/system/tasks/{type}/{uid}, or (b)
go through the plugin-layer CLI --watch flag. Neither works when you
want to block a library script on job completion.
client.tasks.await_completion(task_ref) fills that gap. It polls the
task-status feed on the already-open HTTP connection (no new client per
poll, unlike the profile-based watch_task in dhis2w-core), de-dupes
notifications by their identifier, and returns a typed TaskCompletion
once completed=True lands.
Caveats:
- Poll interval defaults to 2.0s. DHIS2's notification feed updates at best every ~1s, so faster polling just burns request quota.
- Timeout defaults to 600.0s (10 min). Analytics refreshes on large instances can run longer: pass timeout=None to block forever, or a larger value such as timeout=3600.
Classes¶
TaskTimeoutError¶
TaskCompletion¶
Bases: BaseModel
Result of awaiting a DHIS2 background task — every notification + the terminal row.
Source code in packages/dhis2w-client/src/dhis2w_client/v42/tasks.py
TaskModule¶
Accessor bound to a Dhis2Client exposing background-task polling.
Source code in packages/dhis2w-client/src/dhis2w_client/v42/tasks.py
Functions¶
__init__(client)¶
await_completion(task_ref, *, timeout=600.0, poll_interval=2.0) async¶
Block until the task completes; return a typed TaskCompletion.
Polls /api/system/tasks/{job_type}/{uid} every poll_interval
seconds until a notification with completed=True arrives. Raises
TaskTimeoutError if timeout elapses first (pass None for no
timeout). De-dupes notifications by uid/id/time so repeated
polls don't surface the same entries twice.
Source code in packages/dhis2w-client/src/dhis2w_client/v42/tasks.py
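The poll, de-dupe, and timeout behaviour described above can be illustrated with a self-contained loop. This is a sketch of the general technique, not the library's implementation: poll_until_complete and make_fake_fetch are hypothetical names, notifications are modelled as plain dicts, the uid/id/time key lookup is an assumption about the payload shape, and the built-in TimeoutError stands in for TaskTimeoutError.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional


async def poll_until_complete(
    fetch: Callable[[], Awaitable[list[dict]]],
    *,
    timeout: Optional[float] = 600.0,
    poll_interval: float = 2.0,
) -> list[dict]:
    """Poll `fetch` until an entry has completed=True; return each distinct entry."""
    deadline = None if timeout is None else time.monotonic() + timeout
    seen: set[str] = set()
    collected: list[dict] = []
    while True:
        for entry in await fetch():
            # Assumed identity: first available of uid, id, time.
            key = str(entry.get("uid") or entry.get("id") or entry.get("time"))
            if key not in seen:  # skip entries surfaced by an earlier poll
                seen.add(key)
                collected.append(entry)
        if any(entry.get("completed") for entry in collected):
            return collected
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"task not complete after {timeout}s")
        await asyncio.sleep(poll_interval)


def make_fake_fetch() -> Callable[[], Awaitable[list[dict]]]:
    """Scripted feed: the second poll repeats n1 and adds the terminal n2."""
    polls = [
        [{"uid": "n1", "message": "started", "completed": False}],
        [
            {"uid": "n1", "message": "started", "completed": False},
            {"uid": "n2", "message": "finished", "completed": True},
        ],
    ]
    state = {"calls": 0}

    async def fetch() -> list[dict]:
        index = min(state["calls"], len(polls) - 1)
        state["calls"] += 1
        return polls[index]

    return fetch


result = asyncio.run(
    poll_until_complete(make_fake_fetch(), timeout=5.0, poll_interval=0.01)
)
print([entry["uid"] for entry in result])
```

The whole batch is processed before checking for completion, so the result does not depend on whether the feed lists the terminal entry first or last.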
iter_notifications(task_ref, *, timeout=600.0, poll_interval=2.0) async¶
Yield each notification as it arrives; stop when completed=True or timeout.
Separate from await_completion so CLI renderers (Rich progress
bars, server-sent-event bridges, etc.) can render each entry
incrementally instead of waiting for the final result.
Source code in packages/dhis2w-client/src/dhis2w_client/v42/tasks.py
Functions¶
parse_task_ref(task_ref)¶
Normalise (job_type, uid) or "job_type/uid" into a (job_type, uid) tuple.
Convenience for callers that store task refs as a single string — matches
what the CLI task-watch flag prints. Strings are split on the last /.
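The normalisation rule can be shown with a stand-alone function. This is an illustrative sketch rather than the library's code: normalise_task_ref is a hypothetical name, and raising ValueError on a string without a usable "/" is an assumption, since the error behaviour of parse_task_ref is not described here.

```python
from typing import Tuple, Union

TaskRef = Union[Tuple[str, str], str]


def normalise_task_ref(task_ref: TaskRef) -> Tuple[str, str]:
    """Return (job_type, uid) from a tuple or a "job_type/uid" string."""
    if isinstance(task_ref, tuple):
        job_type, uid = task_ref
        return job_type, uid
    # rpartition splits on the LAST "/", matching the rule stated above.
    job_type, separator, uid = task_ref.rpartition("/")
    if not separator or not job_type or not uid:
        # Assumed behaviour: reject strings that cannot yield both parts.
        raise ValueError(f"expected 'job_type/uid', got {task_ref!r}")
    return job_type, uid


from_tuple = normalise_task_ref(("ANALYTICS_TABLE", "abc123"))
from_string = normalise_task_ref("ANALYTICS_TABLE/abc123")
print(from_tuple == from_string)
```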