Data values (streaming import)

DataValuesAccessor on Dhis2Client.data_values — streams uploads to POST /api/dataValueSets without buffering the whole payload in memory. Accepts JSON / XML / CSV / ADX content types. For the typed-list-of-DataValue case, see Aggregate data values; the streaming accessor is the right tool when the payload is too large to materialise (CSV with hundreds of thousands of rows, etc.).

When to reach for it

  • Importing a CSV / JSON file that's larger than the host's free RAM.
  • Pipe-style imports where the source is an AsyncIterable[bytes] (e.g. a transform step that emits a row at a time).
  • Mixed-DataSet writes on DHIS2 v43: the grouped path (import_grouped_by_dataset) is the workaround for BUGS.md #35.
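For the pipe-style case, the source can be an async generator that encodes one CSV row per chunk. The following is a minimal sketch, not part of the library: `csv_chunks`, `collect`, `COLUMNS`, and `SAMPLE_ROWS` are hypothetical names, and the column headers are an assumption about the CSV layout your DHIS2 instance expects.

```python
import asyncio
import csv
import io
from collections.abc import AsyncIterator, Iterable

# Assumed column layout for illustration; check it against your DHIS2 instance.
COLUMNS = ["dataelement", "period", "orgunit", "value"]
SAMPLE_ROWS = [
    {"dataelement": "fbfJHSPpUQD", "period": "202604", "orgunit": "ImspTQPwCqd", "value": "42"},
]


async def csv_chunks(rows: Iterable[dict[str, str]], columns: list[str]) -> AsyncIterator[bytes]:
    """Yield a CSV body as bytes: the header line first, then one line per row."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")

    def encode(record: list[str]) -> bytes:
        # Reuse one buffer so only a single line is held in memory at a time.
        buffer.seek(0)
        buffer.truncate(0)
        writer.writerow(record)
        return buffer.getvalue().encode("utf-8")

    yield encode(columns)
    for row in rows:
        yield encode([row[column] for column in columns])
        await asyncio.sleep(0)  # hand control back to the event loop between rows


async def collect(chunks: AsyncIterator[bytes]) -> list[bytes]:
    """Drain an async byte iterator into a list (handy for inspecting the output)."""
    return [chunk async for chunk in chunks]
```

Inside an `async with open_client(...)` block, the generator would be passed directly as the source: `await client.data_values.stream(csv_chunks(rows, COLUMNS), content_type="application/csv")`.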

Worked example — stream a CSV file to DHIS2

from pathlib import Path

from dhis2w_core.client_context import open_client
from dhis2w_core.profile import profile_from_env


async with open_client(profile_from_env()) as client:
    # `stream` takes a Path (or any AsyncIterable[bytes]) + a content type.
    # The body is sent chunked; httpx never materialises the whole file.
    envelope = await client.data_values.stream(
        Path("./monthly-coverage-2026.csv"),
        content_type="application/csv",
    )
    count = envelope.import_count()
    if envelope.status == "OK" and count:
        print(f"imported {count.imported}  updated {count.updated}  ignored {count.ignored}")
    else:
        print(f"status={envelope.status!r}  message={envelope.message!r}")

Worked example — typed DataValue write (small batch)

from dhis2w_client import DataValue


values = [
    DataValue(
        dataElement="fbfJHSPpUQD",
        period="202604",
        orgUnit="ImspTQPwCqd",
        categoryOptionCombo="HllvX50cXC0",
        attributeOptionCombo="HllvX50cXC0",
        value="42",
    ),
]

async with open_client(profile_from_env()) as client:
    # `import_grouped_by_dataset` is the cross-version write path
    # (required on v43 for DEs in multiple DataSets, see BUGS.md #35).
    # Returns `list[WebMessageResponse]`: one envelope per POSTed chunk
    # (each DataSet group is split into chunks of `chunk_size` rows).
    envelopes = await client.data_values.import_grouped_by_dataset(values)
    for env in envelopes:
        count = env.import_count()
        print(f"  status={env.status}  imported={count.imported if count else '?'}")

data_values

Streaming data-value-set import — client.data_values.stream.

DHIS2's POST /api/dataValueSets accepts JSON, XML, CSV, and ADX payloads. For a 100k-row push (a typical month-end aggregate upload), buffering the whole body in Python memory before the POST is the thing to avoid:

  • A 100k-row JSON payload sits at ~30-60 MB on the wire, and the parsed Python representation is 3-5x that, so roughly 90-300 MB resident just to stage the request.
  • The same payload on CSV is ~8 MB; XML is in between.
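The JSON-versus-CSV gap comes mostly from JSON repeating every field name on every row. The sketch below measures that on synthetic rows; the helper names are hypothetical, and absolute sizes will differ from the figures above depending on which optional fields each row carries.

```python
import csv
import io
import json

FIELDS = ["dataElement", "period", "orgUnit", "categoryOptionCombo", "attributeOptionCombo", "value"]


def synthetic_rows(count: int) -> list[dict[str, str]]:
    """Build `count` look-alike data values (the same UIDs repeated; only the value varies)."""
    return [
        {
            "dataElement": "fbfJHSPpUQD",
            "period": "202604",
            "orgUnit": "ImspTQPwCqd",
            "categoryOptionCombo": "HllvX50cXC0",
            "attributeOptionCombo": "HllvX50cXC0",
            "value": str(index % 100),
        }
        for index in range(count)
    ]


def json_body_size(rows: list[dict[str, str]]) -> int:
    """Size in bytes of the rows wrapped in a `{"dataValues": [...]}` JSON body."""
    return len(json.dumps({"dataValues": rows}).encode("utf-8"))


def csv_body_size(rows: list[dict[str, str]]) -> int:
    """Size in bytes of the same rows as CSV with a single header line."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=FIELDS, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return len(buffer.getvalue().encode("utf-8"))
```

Calling both functions on `synthetic_rows(100_000)` gives a rough per-format estimate before choosing a content type for a large upload.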

client.data_values.stream(source, content_type) feeds httpx's chunked transfer encoding directly, so the payload never sits fully in memory on the client side. The server consumes it as it arrives.

source accepts any of:

  • pathlib.Path — opens the file and chunks it through.
  • bytes / bytearray — single-shot for callers who already have the body assembled but want the typed WebMessageResponse envelope.
  • Iterable[bytes] / AsyncIterable[bytes] — pass-through for generators that build the body on the fly (e.g. DB-row → CSV line).
  • File-like with .read(size) -> bytes (sync or async) — adapted to a chunk iterator.

Supported content_type values map to the DHIS2-accepted MIME types:

  • application/json (default)
  • application/xml
  • application/csv (also accepted: text/csv)
  • application/adx+xml
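When the source is a file on disk, the content type can usually be derived from the file suffix. The helper below is a hypothetical convenience, not something the library provides. ADX files normally share the `.xml` suffix, so the sketch assumes the caller opts in to ADX explicitly.

```python
from pathlib import Path

# Suffix-to-MIME mapping restricted to the content types listed above.
CONTENT_TYPES_BY_SUFFIX = {
    ".json": "application/json",
    ".xml": "application/xml",
    ".csv": "application/csv",
}


def content_type_for(path: Path, *, adx: bool = False) -> str:
    """Pick a content type for `path`; pass `adx=True` for ADX payloads."""
    if adx:
        return "application/adx+xml"
    try:
        return CONTENT_TYPES_BY_SUFFIX[path.suffix.lower()]
    except KeyError:
        raise ValueError(f"no DHIS2 content type known for suffix {path.suffix!r}") from None
```

The result can be passed straight through as `content_type=content_type_for(path)` when calling `client.data_values.stream`.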

Classes

DataValuesAccessor

Dhis2Client.data_values — streaming uploads to /api/dataValueSets.

Stateless wrapper over the streaming POST path. Stay here for the large import cases; use dhis2w_core.plugins.aggregate.service.push_data_values when the payload is already a small in-memory list of typed data values.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/data_values.py
class DataValuesAccessor:
    """`Dhis2Client.data_values` — streaming uploads to `/api/dataValueSets`.

    Stateless wrapper over the streaming POST path. Stay here for the large
    import cases; use `dhis2w_core.plugins.aggregate.service.push_data_values`
    when the payload is already a small in-memory list of typed data values.
    """

    def __init__(self, client: Dhis2Client) -> None:
        """Bind to the sharing client."""
        self._client = client

    async def stream(
        self,
        source: StreamSource,
        *,
        content_type: str = "application/json",
        dry_run: bool = False,
        preheat_cache: bool = True,
        import_strategy: str | None = None,
        id_scheme: str | None = None,
        data_element_id_scheme: str | None = None,
        org_unit_id_scheme: str | None = None,
        skip_audit: bool = False,
        async_job: bool = False,
        chunk_size: int = _DEFAULT_CHUNK_SIZE,
    ) -> WebMessageResponse:
        """Stream `source` to `POST /api/dataValueSets` and return the typed envelope.

        `content_type` picks which DHIS2 parser handles the body (JSON / XML /
        CSV / ADX). Every param from the standard `/api/dataValueSets` surface
        is forwarded via query string:

        - `dry_run` → `dryRun=true`: validate without committing.
        - `preheat_cache=False` → `preheatCache=false`.
        - `import_strategy`: `CREATE` / `UPDATE` / `CREATE_AND_UPDATE` / `DELETE`.
        - `id_scheme` / `data_element_id_scheme` / `org_unit_id_scheme`: pick
          the identifier scheme for the payload (`UID` / `CODE` / `NAME` / ...).
        - `skip_audit=True` → `skipAudit=true`.
        - `async_job=True` → `async=true`: DHIS2 queues the import as a job
          and the returned envelope carries `response.jobType` / `response.id`.
          Poll with `client.tasks.await_completion(envelope.task_ref())`.

        Returns a `WebMessageResponse`. For synchronous imports,
        `envelope.import_count()` gives `ImportCount.imported / updated /
        ignored / deleted`; `envelope.conflicts()` lists per-row rejections.
        Async imports return the task-ref envelope — poll it to completion
        to get the final report from DHIS2.
        """
        params: dict[str, Any] = {}
        if dry_run:
            params["dryRun"] = "true"
        if not preheat_cache:
            params["preheatCache"] = "false"
        if import_strategy is not None:
            params["importStrategy"] = import_strategy
        if id_scheme is not None:
            params["idScheme"] = id_scheme
        if data_element_id_scheme is not None:
            params["dataElementIdScheme"] = data_element_id_scheme
        if org_unit_id_scheme is not None:
            params["orgUnitIdScheme"] = org_unit_id_scheme
        if skip_audit:
            params["skipAudit"] = "true"
        if async_job:
            params["async"] = "true"

        content = _coerce_stream_source(source, chunk_size=chunk_size)
        response = await self._client._request(  # noqa: SLF001 — accessor is intentionally tight with the client
            "POST",
            "/api/dataValueSets",
            params=params,
            content=content,
            extra_headers={"Content-Type": content_type},
        )
        raw = response.json() if response.content else {}
        return WebMessageResponse.model_validate(raw)

    async def import_grouped_by_dataset(
        self,
        values: Sequence[DataValue],
        *,
        chunk_size: int = 1000,
        force: bool = False,
        skip_audit: bool = False,
    ) -> list[WebMessageResponse]:
        """Import typed `DataValue`s grouped by dataset — explicit-envelope POST.

        v43 added auto-target dataset detection that aborts mixed-DE chunks
        (BUGS.md #35). v42 accepts the same explicit `{"dataSet": "<id>",
        "dataValues": [...]}` envelope shape that v43 needs. Using this
        method on v42 is forward-compatible: code that works on v42 keeps
        working on v43 without changes.

        Pre-fetches the DataElement → DataSet membership map, groups the
        input values by their DataSet (lexicographically-first DataSet id
        when a DE belongs to multiple — deterministic across runs), and
        POSTs each group as a separate envelope. Splits each per-dataset
        group into `chunk_size` rows per POST.

        Returns one `WebMessageResponse` per chunk. Skips values whose
        DataElement isn't in any DataSet.
        """
        dataelement_to_dataset = await self._build_dataelement_to_dataset()
        grouped: dict[str, list[dict[str, Any]]] = {}
        for value in values:
            if value.dataElement is None:
                continue
            dataset_id = dataelement_to_dataset.get(value.dataElement)
            if dataset_id is None:
                continue
            grouped.setdefault(dataset_id, []).append(value.model_dump(by_alias=True, exclude_none=True, mode="json"))
        params: dict[str, Any] = {}
        if force:
            params["force"] = "true"
        if skip_audit:
            params["skipAudit"] = "true"
        responses: list[WebMessageResponse] = []
        for dataset_id, dumped in grouped.items():
            for start in range(0, len(dumped), chunk_size):
                chunk = dumped[start : start + chunk_size]
                raw = await self._client._request(  # noqa: SLF001
                    "POST",
                    "/api/dataValueSets",
                    params=params,
                    json={"dataSet": dataset_id, "dataValues": chunk},
                )
                body = raw.json() if raw.content else {}
                responses.append(WebMessageResponse.model_validate(body))
        return responses

    async def _build_dataelement_to_dataset(self) -> dict[str, str]:
        """Map every DE id to one of its DataSets (lexicographically-first when multiple)."""
        raw = await self._client.get_raw(
            "/api/dataSets",
            params={"fields": "id,dataSetElements[dataElement[id]]", "paging": "false"},
        )
        members: dict[str, list[str]] = {}
        for dataset in raw.get("dataSets") or []:
            dataset_id = dataset.get("id")
            if not isinstance(dataset_id, str):
                continue
            for entry in dataset.get("dataSetElements") or []:
                element = (entry.get("dataElement") or {}).get("id")
                if isinstance(element, str):
                    members.setdefault(element, []).append(dataset_id)
        return {element_id: sorted(dataset_ids)[0] for element_id, dataset_ids in members.items()}
Functions
__init__(client)

Bind to the sharing client.

stream(source, *, content_type='application/json', dry_run=False, preheat_cache=True, import_strategy=None, id_scheme=None, data_element_id_scheme=None, org_unit_id_scheme=None, skip_audit=False, async_job=False, chunk_size=_DEFAULT_CHUNK_SIZE) async

Stream source to POST /api/dataValueSets and return the typed envelope.

content_type picks which DHIS2 parser handles the body (JSON / XML / CSV / ADX). Every param from the standard /api/dataValueSets surface is forwarded via query string:

  • dry_run → dryRun=true: validate without committing.
  • preheat_cache=False → preheatCache=false.
  • import_strategy: CREATE / UPDATE / CREATE_AND_UPDATE / DELETE.
  • id_scheme / data_element_id_scheme / org_unit_id_scheme: pick the identifier scheme for the payload (UID / CODE / NAME / ...).
  • skip_audit=True → skipAudit=true.
  • async_job=True → async=true: DHIS2 queues the import as a job and the returned envelope carries response.jobType / response.id. Poll with client.tasks.await_completion(envelope.task_ref()).

Returns a WebMessageResponse. For synchronous imports, envelope.import_count() gives ImportCount.imported / updated / ignored / deleted; envelope.conflicts() lists per-row rejections. Async imports return the task-ref envelope — poll it to completion to get the final report from DHIS2.
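A common pattern with dry_run is to validate first and commit only when the preview comes back clean. The sketch below assumes only what is documented above: `stream(..., dry_run=True)` and the envelope's `status` field. `validate_then_import` is a hypothetical helper, and `RecordingAccessor` is a stand-in for `client.data_values` so the flow can be exercised without a server.

```python
import asyncio
from pathlib import Path
from types import SimpleNamespace
from typing import Any


async def validate_then_import(data_values: Any, path: Path, content_type: str = "application/csv") -> Any:
    """Dry-run the upload; send it for real only if the dry run reports status OK."""
    preview = await data_values.stream(path, content_type=content_type, dry_run=True)
    if preview.status != "OK":
        # Nothing was committed; the caller can inspect the preview envelope.
        return preview
    return await data_values.stream(path, content_type=content_type)


class RecordingAccessor:
    """Offline stand-in for `client.data_values` (test double, not the real accessor)."""

    def __init__(self, dry_run_status: str) -> None:
        self.dry_run_status = dry_run_status
        self.calls: list[bool] = []

    async def stream(self, source: Any, *, content_type: str, dry_run: bool = False) -> SimpleNamespace:
        # Record whether each call was a dry run, then fake an envelope.
        self.calls.append(dry_run)
        return SimpleNamespace(status=self.dry_run_status if dry_run else "OK")
```

With a real client, `data_values` would be `client.data_values`. A stricter version might also treat a non-empty `preview.conflicts()` as a reason not to commit.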

import_grouped_by_dataset(values, *, chunk_size=1000, force=False, skip_audit=False) async

Import typed DataValues grouped by dataset — explicit-envelope POST.

v43 added auto-target dataset detection that aborts mixed-DE chunks (BUGS.md #35). v42 accepts the same explicit {"dataSet": "<id>", "dataValues": [...]} envelope shape that v43 needs. Using this method on v42 is forward-compatible: code that works on v42 keeps working on v43 without changes.

Pre-fetches the DataElement → DataSet membership map, groups the input values by their DataSet (lexicographically-first DataSet id when a DE belongs to multiple — deterministic across runs), and POSTs each group as a separate envelope. Splits each per-dataset group into chunk_size rows per POST.

Returns one WebMessageResponse per chunk. Skips values whose DataElement isn't in any DataSet.
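The grouping and chunking rules described above can be restated as two pure functions, which makes it easy to see how many POSTs a batch will produce. This sketch mirrors the documented behaviour but is not the library code: `pick_dataset` and `plan_posts` are hypothetical names, and plain dicts stand in for typed DataValue objects.

```python
from typing import Any


def pick_dataset(memberships: dict[str, list[str]]) -> dict[str, str]:
    """Map each DataElement id to its lexicographically-first DataSet id."""
    return {element: sorted(datasets)[0] for element, datasets in memberships.items()}


def plan_posts(
    values: list[dict[str, Any]],
    element_to_dataset: dict[str, str],
    chunk_size: int = 1000,
) -> list[tuple[str, list[dict[str, Any]]]]:
    """Return (dataset_id, rows) pairs, one pair per POST that would be sent."""
    grouped: dict[str, list[dict[str, Any]]] = {}
    for value in values:
        element = value.get("dataElement")
        dataset_id = element_to_dataset.get(element) if element is not None else None
        if dataset_id is None:
            # Values whose DataElement is missing or in no DataSet are skipped silently.
            continue
        grouped.setdefault(dataset_id, []).append(value)
    return [
        (dataset_id, rows[start : start + chunk_size])
        for dataset_id, rows in grouped.items()
        for start in range(0, len(rows), chunk_size)
    ]
```

Because skipped values produce no envelope at all, comparing `len(values)` with the total number of planned rows is a cheap way to detect input that would be dropped.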
