Data values (streaming import)¶
DataValuesAccessor on Dhis2Client.data_values — streams uploads to POST /api/dataValueSets without buffering the whole payload in memory. Accepts JSON / XML / CSV / ADX content types. For the typed-list-of-DataValue case, see Aggregate data values; the streaming accessor is the right tool when the payload is too large to materialise (CSV with hundreds of thousands of rows, etc.).
When to reach for it¶
- Importing a CSV / JSON file that's larger than the host's free RAM.
- Pipe-style imports where the source is an
AsyncIterable[bytes](e.g. a transform step that emits a row at a time). - Mixed-DataSet writes on DHIS2 v43 — the grouped path is the workaround for BUGS #35.
Worked example — stream a CSV file to DHIS2¶
from pathlib import Path
from dhis2w_core.client_context import open_client
from dhis2w_core.profile import profile_from_env
async with open_client(profile_from_env()) as client:
# `stream` takes a Path (or any AsyncIterable[bytes]) + a content type.
# The body is sent chunked; httpx never materialises the whole file.
envelope = await client.data_values.stream(
Path("./monthly-coverage-2026.csv"),
content_type="application/csv",
)
count = envelope.import_count()
if envelope.status == "OK" and count:
print(f"imported {count.imported} updated {count.updated} ignored {count.ignored}")
else:
print(f"status={envelope.status!r} message={envelope.message!r}")
Worked example — typed DataValue write (small batch)¶
from dhis2w_client import DataValue
values = [
DataValue(
dataElement="fbfJHSPpUQD",
period="202604",
orgUnit="ImspTQPwCqd",
categoryOptionCombo="HllvX50cXC0",
attributeOptionCombo="HllvX50cXC0",
value="42",
),
]
async with open_client(profile_from_env()) as client:
# `import_grouped_by_dataset` is the cross-version write path
# (required on v43 for DEs in multiple DataSets — BUGS #35).
# Returns `list[WebMessageResponse]` — one envelope per DataSet group.
envelopes = await client.data_values.import_grouped_by_dataset(values)
for env in envelopes:
count = env.import_count()
print(f" status={env.status} imported={count.imported if count else '?'}")
Related examples¶
examples/v42/client/stream_data_values.py— four streaming shapes (bytes, sync generator, Path/CSV, 1000-row file with timing).examples/v43/client/aggregate_bulk_grouped.py— the grouped path against a v43 stack.
data_values
¶
Streaming data-value-set import — client.data_values.stream.
DHIS2's POST /api/dataValueSets accepts JSON, XML, CSV, and ADX payloads.
For a 100k-row push (a typical month-end aggregate upload), buffering the
whole body in Python memory before the POST is the thing to avoid:
- A 100k-row JSON payload sits at ~30-60 MB on the wire, and the Python parsed shape is 3-5x that — so ~150 MB resident just to stage the request.
- The same payload on CSV is ~8 MB; XML is in between.
client.data_values.stream(source, content_type) feeds httpx's chunked
transfer encoding directly, so the payload never sits fully in memory
on the client side. The server consumes it as it arrives.
source accepts any of:
pathlib.Path— opens the file and chunks it through.bytes/bytearray— single-shot for callers who already have the body assembled but want the typedWebMessageResponseenvelope.Iterable[bytes]/AsyncIterable[bytes]— pass-through for generators that build the body on the fly (e.g. DB-row → CSV line).- File-like with
.read(size) -> bytes(sync or async) — adapted to a chunk iterator.
Supported content_type values map to the DHIS2-accepted MIME types:
application/json(default)application/xmlapplication/csv(also accepted:text/csv)application/adx+xml
Classes¶
DataValuesAccessor
¶
Dhis2Client.data_values — streaming uploads to /api/dataValueSets.
Stateless wrapper over the streaming POST path. Stay here for the large
import cases; use dhis2w_core.plugins.aggregate.service.push_data_values
when the payload is already a small in-memory list of typed data values.
Source code in packages/dhis2w-client/src/dhis2w_client/v42/data_values.py
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 | |
Functions¶
__init__(client)
¶
stream(source, *, content_type='application/json', dry_run=False, preheat_cache=True, import_strategy=None, id_scheme=None, data_element_id_scheme=None, org_unit_id_scheme=None, skip_audit=False, async_job=False, chunk_size=_DEFAULT_CHUNK_SIZE)
async
¶
Stream source to POST /api/dataValueSets and return the typed envelope.
content_type picks which DHIS2 parser handles the body (JSON / XML /
CSV / ADX). Every param from the standard /api/dataValueSets surface
is forwarded via query string:
dry_run→dryRun=true: validate without committing.preheat_cache=False→preheatCache=false.import_strategy:CREATE/UPDATE/CREATE_AND_UPDATE/DELETE.id_scheme/data_element_id_scheme/org_unit_id_scheme: pick the identifier scheme for the payload (UID/CODE/NAME/ ...).skip_audit=True→skipAudit=true.async_job=True→async=true: DHIS2 queues the import as a job and the returned envelope carriesresponse.jobType/response.id. Poll withclient.tasks.await_completion(envelope.task_ref()).
Returns a WebMessageResponse. For synchronous imports,
envelope.import_count() gives ImportCount.imported / updated /
ignored / deleted; envelope.conflicts() lists per-row rejections.
Async imports return the task-ref envelope — poll it to completion
to get the final report from DHIS2.
Source code in packages/dhis2w-client/src/dhis2w_client/v42/data_values.py
import_grouped_by_dataset(values, *, chunk_size=1000, force=False, skip_audit=False)
async
¶
Import typed DataValues grouped by dataset — explicit-envelope POST.
v43 added auto-target dataset detection that aborts mixed-DE chunks
(BUGS.md #35). v42 accepts the same explicit {"dataSet": "<id>",
"dataValues": [...]} envelope shape that v43 needs. Using this
method on v42 is forward-compatible: code that works on v42 keeps
working on v43 without changes.
Pre-fetches the DataElement → DataSet membership map, groups the
input values by their DataSet (lexicographically-first DataSet id
when a DE belongs to multiple — deterministic across runs), and
POSTs each group as a separate envelope. Splits each per-dataset
group into chunk_size rows per POST.
Returns one WebMessageResponse per chunk. Skips values whose
DataElement isn't in any DataSet.