
Validation + predictors

ValidationAccessor on Dhis2Client.validation + PredictorsAccessor on Dhis2Client.predictors. Covers the DHIS2 validation-rule + predictor workflow endpoints. CRUD on the rules / predictors themselves stays on the generated client.resources.validation_rules / client.resources.predictors accessors.

validation

DHIS2 validation-rule workflow — Dhis2Client.validation.

Companion to client.maintenance (data-integrity). Validation rules are boolean formulas over data elements (e.g. ANC 1st visit >= ANC 4th visit, which should always hold). Running the rules against captured data produces violations — cells where the rule evaluates to false.

Three endpoint families covered:

  • Ad-hoc analysis — POST /api/dataAnalysis/validationRules. Runs the analysis synchronously + returns violations. persist=True writes the violations into DHIS2's /api/validationResults table; notification=True sends out the configured notification templates.
  • Stored results — GET|DELETE /api/validationResults. Browse / purge results previously persisted.
  • Expression validation — /api/expressions/description, plus per-context variants (/api/validationRules/expression/description, /api/indicators/expression/description, /api/predictors/expression/description, /api/programIndicators/expression/description). Ad-hoc check that an expression parses + references existing UIDs.

CRUD for validationRules / predictors themselves stays on the generated client.resources accessors (client.resources.validation_rules.save_bulk(...) etc.). This module is only for the workflow endpoints.

Classes

ValidationAnalysisResult

Bases: BaseModel

One hit from POST /api/dataAnalysis/validationRules.

Distinct from the persisted ValidationResult (stored at /api/validationResults): the ad-hoc analysis endpoint returns a flat shape — IDs + display names inlined — while the persisted shape nests full BaseIdentifiableObject refs. Both are useful, so we keep both models; this one is what run_analysis() returns.

Source code in packages/dhis2w-client/src/dhis2w_client/validation.py
class ValidationAnalysisResult(BaseModel):
    """One hit from `POST /api/dataAnalysis/validationRules`.

    Distinct from the persisted `ValidationResult` (stored at
    `/api/validationResults`): the ad-hoc analysis endpoint returns a flat
    shape — IDs + display names inlined — while the persisted shape nests
    full `BaseIdentifiableObject` refs. Both are useful, so we keep both
    models; this one is what `run_analysis()` returns.
    """

    model_config = ConfigDict(extra="allow", populate_by_name=True)

    validationRuleId: str | None = None
    validationRuleDescription: str | None = None
    organisationUnitId: str | None = None
    organisationUnitDisplayName: str | None = None
    organisationUnitPath: str | None = None
    organisationUnitAncestorNames: str | None = None
    periodId: str | None = None
    periodDisplayName: str | None = None
    attributeOptionComboId: str | None = None
    attributeOptionComboDisplayName: str | None = None
    importance: Importance | None = None
    leftSideValue: float | None = None
    operator: str | None = None
    rightSideValue: float | None = None

ExpressionDescription

Bases: BaseModel

Result of /api/expressions/description — parse status + rendered description.

DHIS2 returns a WebMessage-ish envelope with message="Valid" on parse success and description="…" rendering the expression with UIDs replaced by names. Errors come back with status="ERROR" and a message naming the problem (e.g. "Data element not found: abcUid").

Source code in packages/dhis2w-client/src/dhis2w_client/validation.py
class ExpressionDescription(BaseModel):
    """Result of `/api/expressions/description` — parse status + rendered description.

    DHIS2 returns a `WebMessage`-ish envelope with `message="Valid"` on parse
    success and `description="…"` rendering the expression with UIDs replaced
    by names. Errors come back with `status="ERROR"` and a `message` naming
    the problem (e.g. `"Data element not found: abcUid"`).
    """

    model_config = ConfigDict(extra="allow")

    status: str | None = None
    message: str | None = None
    description: str | None = None

    @property
    def valid(self) -> bool:
        """`True` when DHIS2 accepted the expression; `False` on parse errors."""
        return (self.status or "").upper() == "OK"
Attributes
valid property

True when DHIS2 accepted the expression; False on parse errors.

ValidationAccessor

Dhis2Client.validation — run validation rules + inspect stored results.

Read the docstrings on individual methods for the DHIS2-endpoint-level semantics (persist / notification flags, filter parameters, etc.).

Source code in packages/dhis2w-client/src/dhis2w_client/validation.py
class ValidationAccessor:
    """`Dhis2Client.validation` — run validation rules + inspect stored results.

    Read the docstrings on individual methods for the DHIS2-endpoint-level
    semantics (persist / notification flags, filter parameters, etc.).
    """

    def __init__(self, client: Dhis2Client) -> None:
        """Bind to the sharing client."""
        self._client = client

    async def run_analysis(
        self,
        *,
        org_unit: str,
        start_date: str,
        end_date: str,
        validation_rule_group: str | None = None,
        max_results: int | None = None,
        notification: bool = False,
        persist: bool = False,
    ) -> list[ValidationAnalysisResult]:
        """Run `POST /api/dataAnalysis/validationRules` synchronously; return violations.

        Synchronous — DHIS2 returns the violations in the response body (no
        task polling). For a whole-instance sweep pass the root org unit UID
        (DHIS2 walks the sub-tree). Narrow with `validation_rule_group` when
        you only want to evaluate one rule bundle.

        `persist=True` writes each violation into DHIS2's
        `/api/validationResults` table so later `list_results()` calls can
        walk them; `notification=True` fires the configured notification
        templates for each triggered rule. Both default off for
        ad-hoc / exploratory runs.
        """
        body: dict[str, Any] = {
            "ou": org_unit,
            "startDate": start_date,
            "endDate": end_date,
            "notification": notification,
            "persist": persist,
        }
        if validation_rule_group is not None:
            body["vrg"] = validation_rule_group
        if max_results is not None:
            body["maxResults"] = max_results
        raw = await self._client.post_raw("/api/dataAnalysis/validationRules", body=body)
        # DHIS2 wraps the list as either `{"data": [...]}` (older v42 builds)
        # or returns the array directly. `_parse_json` wraps top-level arrays
        # as `{"data": [...]}` before this method sees them.
        candidates: list[Any] = []
        data = raw.get("data")
        if isinstance(data, list):
            candidates = data
        else:
            results = raw.get("validationResults")
            if isinstance(results, list):
                candidates = results
        return [ValidationAnalysisResult.model_validate(row) for row in candidates if isinstance(row, dict)]

    async def list_results(
        self,
        *,
        org_unit: str | None = None,
        period: str | None = None,
        validation_rule: str | None = None,
        created_date: str | None = None,
        page: int | None = None,
        page_size: int | None = None,
        fields: str | None = None,
    ) -> list[ValidationResult]:
        """List persisted validation results (`GET /api/validationResults`).

        DHIS2 accepts `ou`, `pe`, `vr`, `createdDate` as filters (repeatable
        for `ou`, `pe`, `vr`). Defaults to returning every result — narrow
        with at least one filter on real instances where the table can run
        to millions of rows.

        `fields` defaults to a selector that pulls `displayName` + the
        owning rule's `importance` + `operator` inline, so the resulting
        `ValidationResult`s carry readable data without a second lookup.
        Override with a narrower selector for large-scale runs where only
        counts / UIDs matter.
        """
        params: dict[str, Any] = {"fields": fields if fields is not None else _DEFAULT_RESULT_FIELDS}
        if org_unit is not None:
            params["ou"] = org_unit
        if period is not None:
            params["pe"] = period
        if validation_rule is not None:
            params["vr"] = validation_rule
        if created_date is not None:
            params["createdDate"] = created_date
        if page is not None:
            params["page"] = page
        if page_size is not None:
            params["pageSize"] = page_size
        raw = await self._client.get_raw("/api/validationResults", params=params)
        rows = raw.get("validationResults") or []
        return [ValidationResult.model_validate(row) for row in rows if isinstance(row, dict)]

    async def get_result(self, result_id: int | str, *, fields: str | None = None) -> ValidationResult:
        """Fetch a single persisted validation result by its numeric id.

        `fields` defaults to the same display-friendly selector
        `list_results` uses — override with a narrower selector if you
        only need a subset.
        """
        params = {"fields": fields if fields is not None else _DEFAULT_RESULT_FIELDS}
        raw = await self._client.get_raw(f"/api/validationResults/{result_id}", params=params)
        return ValidationResult.model_validate(raw)

    async def delete_results(
        self,
        *,
        org_units: Sequence[str] | None = None,
        periods: Sequence[str] | None = None,
        validation_rules: Sequence[str] | None = None,
    ) -> None:
        """Bulk-delete persisted validation results matching the filters.

        `DELETE /api/validationResults` accepts the same filter keys as the
        list endpoint. At least one of `org_units` / `periods` /
        `validation_rules` must be non-empty — a filter-less delete would
        wipe every row.
        """
        if not (org_units or periods or validation_rules):
            raise ValueError(
                "delete_results requires at least one of org_units / periods / validation_rules — "
                "refusing to wipe the whole validation-results table.",
            )
        params: dict[str, list[str]] = {}
        if org_units:
            params["ou"] = list(org_units)
        if periods:
            params["pe"] = list(periods)
        if validation_rules:
            params["vr"] = list(validation_rules)
        await self._client.delete_raw("/api/validationResults", params=params)

    async def send_notifications(self) -> WebMessageResponse:
        """Fire the configured notification templates for every current violation.

        Posts to `/api/validation/sendNotifications` — DHIS2 walks the
        `validationResults` table + queues messages per template.
        """
        raw = await self._client.post_raw("/api/validation/sendNotifications")
        return WebMessageResponse.model_validate(raw)

    async def describe_expression(
        self,
        expression: str,
        *,
        context: ExpressionContext = "generic",
    ) -> ExpressionDescription:
        """Parse-check an expression + render a human description.

        `context` picks which DHIS2 parser runs — `generic` / `validation-rule`
        / `indicator` / `predictor` / `program-indicator` use different
        allowed-reference sets. Returns `ExpressionDescription.valid` (bool)
        + `message` (the parse error, on failure).
        """
        method, path = _EXPRESSION_DESCRIBE_PATHS[context]
        if method == "GET":
            raw = await self._client.get_raw(path, params={"expression": expression})
        else:
            response = await self._client._request(  # noqa: SLF001 — accessor is tight with the client
                "POST",
                path,
                content=expression.encode("utf-8"),
                extra_headers={"Content-Type": "text/plain"},
            )
            raw = response.json() if response.content else {}
        return ExpressionDescription.model_validate(raw)
Functions
__init__(client)

Bind to the sharing client.

Source code in packages/dhis2w-client/src/dhis2w_client/validation.py
def __init__(self, client: Dhis2Client) -> None:
    """Bind to the sharing client."""
    self._client = client
run_analysis(*, org_unit, start_date, end_date, validation_rule_group=None, max_results=None, notification=False, persist=False) async

Run POST /api/dataAnalysis/validationRules synchronously; return violations.

Synchronous — DHIS2 returns the violations in the response body (no task polling). For a whole-instance sweep pass the root org unit UID (DHIS2 walks the sub-tree). Narrow with validation_rule_group when you only want to evaluate one rule bundle.

persist=True writes each violation into DHIS2's /api/validationResults table so later list_results() calls can walk them; notification=True fires the configured notification templates for each triggered rule. Both default off for ad-hoc / exploratory runs.

Source code in packages/dhis2w-client/src/dhis2w_client/validation.py
async def run_analysis(
    self,
    *,
    org_unit: str,
    start_date: str,
    end_date: str,
    validation_rule_group: str | None = None,
    max_results: int | None = None,
    notification: bool = False,
    persist: bool = False,
) -> list[ValidationAnalysisResult]:
    """Run `POST /api/dataAnalysis/validationRules` synchronously; return violations.

    Synchronous — DHIS2 returns the violations in the response body (no
    task polling). For a whole-instance sweep pass the root org unit UID
    (DHIS2 walks the sub-tree). Narrow with `validation_rule_group` when
    you only want to evaluate one rule bundle.

    `persist=True` writes each violation into DHIS2's
    `/api/validationResults` table so later `list_results()` calls can
    walk them; `notification=True` fires the configured notification
    templates for each triggered rule. Both default off for
    ad-hoc / exploratory runs.
    """
    body: dict[str, Any] = {
        "ou": org_unit,
        "startDate": start_date,
        "endDate": end_date,
        "notification": notification,
        "persist": persist,
    }
    if validation_rule_group is not None:
        body["vrg"] = validation_rule_group
    if max_results is not None:
        body["maxResults"] = max_results
    raw = await self._client.post_raw("/api/dataAnalysis/validationRules", body=body)
    # DHIS2 wraps the list as either `{"data": [...]}` (older v42 builds)
    # or returns the array directly. `_parse_json` wraps top-level arrays
    # as `{"data": [...]}` before this method sees them.
    candidates: list[Any] = []
    data = raw.get("data")
    if isinstance(data, list):
        candidates = data
    else:
        results = raw.get("validationResults")
        if isinstance(results, list):
            candidates = results
    return [ValidationAnalysisResult.model_validate(row) for row in candidates if isinstance(row, dict)]
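The response-unwrapping step above can be exercised standalone; a minimal mirror of the same candidate selection (helper name is illustrative, not part of the accessor):

```python
from typing import Any

def extract_violations(raw: dict[str, Any]) -> list[dict[str, Any]]:
    """Mirror run_analysis's unwrapping: prefer a `data` list, fall back to
    `validationResults`, and keep only dict rows."""
    data = raw.get("data")
    if isinstance(data, list):
        candidates: list[Any] = data
    else:
        results = raw.get("validationResults")
        candidates = results if isinstance(results, list) else []
    # Non-dict entries (nulls, stray strings) are dropped rather than failing validation.
    return [row for row in candidates if isinstance(row, dict)]
```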
list_results(*, org_unit=None, period=None, validation_rule=None, created_date=None, page=None, page_size=None, fields=None) async

List persisted validation results (GET /api/validationResults).

DHIS2 accepts ou, pe, vr, createdDate as filters (repeatable for ou, pe, vr). Defaults to returning every result — narrow with at least one filter on real instances where the table can run to millions of rows.

fields defaults to a selector that pulls displayName + the owning rule's importance + operator inline, so the resulting ValidationResults carry readable data without a second lookup. Override with a narrower selector for large-scale runs where only counts / UIDs matter.

Source code in packages/dhis2w-client/src/dhis2w_client/validation.py
async def list_results(
    self,
    *,
    org_unit: str | None = None,
    period: str | None = None,
    validation_rule: str | None = None,
    created_date: str | None = None,
    page: int | None = None,
    page_size: int | None = None,
    fields: str | None = None,
) -> list[ValidationResult]:
    """List persisted validation results (`GET /api/validationResults`).

    DHIS2 accepts `ou`, `pe`, `vr`, `createdDate` as filters (repeatable
    for `ou`, `pe`, `vr`). Defaults to returning every result — narrow
    with at least one filter on real instances where the table can run
    to millions of rows.

    `fields` defaults to a selector that pulls `displayName` + the
    owning rule's `importance` + `operator` inline, so the resulting
    `ValidationResult`s carry readable data without a second lookup.
    Override with a narrower selector for large-scale runs where only
    counts / UIDs matter.
    """
    params: dict[str, Any] = {"fields": fields if fields is not None else _DEFAULT_RESULT_FIELDS}
    if org_unit is not None:
        params["ou"] = org_unit
    if period is not None:
        params["pe"] = period
    if validation_rule is not None:
        params["vr"] = validation_rule
    if created_date is not None:
        params["createdDate"] = created_date
    if page is not None:
        params["page"] = page
    if page_size is not None:
        params["pageSize"] = page_size
    raw = await self._client.get_raw("/api/validationResults", params=params)
    rows = raw.get("validationResults") or []
    return [ValidationResult.model_validate(row) for row in rows if isinstance(row, dict)]
get_result(result_id, *, fields=None) async

Fetch a single persisted validation result by its numeric id.

fields defaults to the same display-friendly selector list_results uses — override with a narrower selector if you only need a subset.

Source code in packages/dhis2w-client/src/dhis2w_client/validation.py
async def get_result(self, result_id: int | str, *, fields: str | None = None) -> ValidationResult:
    """Fetch a single persisted validation result by its numeric id.

    `fields` defaults to the same display-friendly selector
    `list_results` uses — override with a narrower selector if you
    only need a subset.
    """
    params = {"fields": fields if fields is not None else _DEFAULT_RESULT_FIELDS}
    raw = await self._client.get_raw(f"/api/validationResults/{result_id}", params=params)
    return ValidationResult.model_validate(raw)
delete_results(*, org_units=None, periods=None, validation_rules=None) async

Bulk-delete persisted validation results matching the filters.

DELETE /api/validationResults accepts the same filter keys as the list endpoint. At least one of org_units / periods / validation_rules must be non-empty — a filter-less delete would wipe every row.

Source code in packages/dhis2w-client/src/dhis2w_client/validation.py
async def delete_results(
    self,
    *,
    org_units: Sequence[str] | None = None,
    periods: Sequence[str] | None = None,
    validation_rules: Sequence[str] | None = None,
) -> None:
    """Bulk-delete persisted validation results matching the filters.

    `DELETE /api/validationResults` accepts the same filter keys as the
    list endpoint. At least one of `org_units` / `periods` /
    `validation_rules` must be non-empty — a filter-less delete would
    wipe every row.
    """
    if not (org_units or periods or validation_rules):
        raise ValueError(
            "delete_results requires at least one of org_units / periods / validation_rules — "
            "refusing to wipe the whole validation-results table.",
        )
    params: dict[str, list[str]] = {}
    if org_units:
        params["ou"] = list(org_units)
    if periods:
        params["pe"] = list(periods)
    if validation_rules:
        params["vr"] = list(validation_rules)
    await self._client.delete_raw("/api/validationResults", params=params)
send_notifications() async

Fire the configured notification templates for every current violation.

Posts to /api/validation/sendNotifications — DHIS2 walks the validationResults table + queues messages per template.

Source code in packages/dhis2w-client/src/dhis2w_client/validation.py
async def send_notifications(self) -> WebMessageResponse:
    """Fire the configured notification templates for every current violation.

    Posts to `/api/validation/sendNotifications` — DHIS2 walks the
    `validationResults` table + queues messages per template.
    """
    raw = await self._client.post_raw("/api/validation/sendNotifications")
    return WebMessageResponse.model_validate(raw)
describe_expression(expression, *, context='generic') async

Parse-check an expression + render a human description.

context picks which DHIS2 parser runs — generic / validation-rule / indicator / predictor / program-indicator use different allowed-reference sets. Returns ExpressionDescription.valid (bool) + message (the parse error, on failure).

Source code in packages/dhis2w-client/src/dhis2w_client/validation.py
async def describe_expression(
    self,
    expression: str,
    *,
    context: ExpressionContext = "generic",
) -> ExpressionDescription:
    """Parse-check an expression + render a human description.

    `context` picks which DHIS2 parser runs — `generic` / `validation-rule`
    / `indicator` / `predictor` / `program-indicator` use different
    allowed-reference sets. Returns `ExpressionDescription.valid` (bool)
    + `message` (the parse error, on failure).
    """
    method, path = _EXPRESSION_DESCRIBE_PATHS[context]
    if method == "GET":
        raw = await self._client.get_raw(path, params={"expression": expression})
    else:
        response = await self._client._request(  # noqa: SLF001 — accessor is tight with the client
            "POST",
            path,
            content=expression.encode("utf-8"),
            extra_headers={"Content-Type": "text/plain"},
        )
        raw = response.json() if response.content else {}
    return ExpressionDescription.model_validate(raw)

predictors

Predictor authoring + run — Dhis2Client.predictors.

Predictors generate data values from expressions over historical data (e.g. "3-month rolling average of X" → emit a synthetic DataElement row). The accessor covers both authoring (create / update / delete) and the run endpoints DHIS2 exposes:

  • POST /api/predictors/run?startDate=…&endDate=… — run every predictor on the instance.
  • POST /api/predictors/{uid}/run?startDate=…&endDate=… — run one.
  • POST /api/predictorGroups/{uid}/run?startDate=…&endDate=… — run a named group of predictors in one pass (exposed from PredictorsAccessor.run_group for backward compatibility + also from PredictorGroupsAccessor.run).

All three run shapes return a WebMessageResponse with a summary of predictions written / ignored / failed; none of them kicks off a background job, so there's no task to watch.

Authoring surface: Predictor.generator is an Expression sub-object typed as Any on the generated schema. create(...) assembles the minimal wrapper here so callers pass the expression string + description, not the nested payload.

No *Spec builder — continues the spec-audit data point.

Classes

Predictor

Bases: BaseModel

Generated model for DHIS2 Predictor.

DHIS2 Predictor - persisted metadata (generated from /api/schemas at DHIS2 v42).

API endpoint: /api/predictors.

Field Field(description=...) entries flag DHIS2 semantics the bare type can't capture: which side of a relationship owns the link (writable) vs the inverse side (ignored by the API), uniqueness constraints, and length bounds.

Source code in packages/dhis2w-client/src/dhis2w_client/generated/v42/schemas/predictor.py
class Predictor(BaseModel):
    """Generated model for DHIS2 `Predictor`.

    DHIS2 Predictor - persisted metadata (generated from /api/schemas at DHIS2 v42).

    API endpoint: /api/predictors.

    Field `Field(description=...)` entries flag DHIS2 semantics the bare
    type can't capture: which side of a relationship owns the link
    (writable) vs the inverse side (ignored by the API), uniqueness
    constraints, and length bounds.
    """

    model_config = ConfigDict(extra="allow", populate_by_name=True)

    access: Any | None = Field(default=None, description="Reference to Access. Read-only (inverse side).")
    annualSampleCount: int | None = Field(default=None, description="Length/value max=10.")
    attributeValues: Any | None = Field(
        default=None, description="Reference to AttributeValues. Read-only (inverse side)."
    )
    code: str | None = Field(default=None, description="Unique. Length/value max=50.")
    created: datetime | None = None
    createdBy: Reference | None = Field(default=None, description="Reference to User. Read-only (inverse side).")
    description: str | None = Field(default=None, description="Length/value min=1, max=2147483647.")
    displayDescription: str | None = Field(default=None, description="Read-only.")
    displayFormName: str | None = Field(default=None, description="Read-only.")
    displayName: str | None = Field(default=None, description="Read-only.")
    displayShortName: str | None = Field(default=None, description="Read-only.")
    favorite: bool | None = Field(default=None, description="Read-only.")
    favorites: list[Any] | None = Field(default=None, description="Collection of String. Read-only (inverse side).")
    formName: str | None = Field(default=None, description="Length/value max=2147483647.")
    generator: Any | None = Field(default=None, description="Reference to Expression. Unique. Length/value max=255.")
    href: str | None = None
    id: str | None = Field(default=None, description="Unique. Length/value min=11, max=11.")
    lastUpdated: datetime | None = None
    lastUpdatedBy: Reference | None = Field(default=None, description="Reference to User.")
    name: str | None = Field(default=None, description="Unique. Length/value min=1, max=230.")
    organisationUnitDescendants: OrganisationUnitDescendants | None = None
    organisationUnitLevels: list[Any] | None = Field(default=None, description="Collection of OrganisationUnitLevel.")
    output: Reference | None = Field(default=None, description="Reference to DataElement.")
    outputCombo: Reference | None = Field(default=None, description="Reference to CategoryOptionCombo.")
    periodType: PeriodType | None = Field(default=None, description="Reference to PeriodType. Length/value max=255.")
    predictorGroups: list[Any] | None = Field(
        default=None, description="Collection of PredictorGroup. Read-only (inverse side)."
    )
    sampleSkipTest: Any | None = Field(
        default=None, description="Reference to Expression. Unique. Length/value max=255."
    )
    sequentialSampleCount: int | None = Field(default=None, description="Length/value max=2147483647.")
    sequentialSkipCount: int | None = Field(default=None, description="Length/value max=2147483647.")
    sharing: Any | None = Field(default=None, description="Reference to Sharing. Read-only (inverse side).")
    shortName: str | None = Field(default=None, description="Unique. Length/value min=1, max=50.")
    translations: list[Any] | None = Field(default=None, description="Collection of Translation. Length/value max=255.")
    user: Reference | None = Field(default=None, description="Reference to User. Read-only (inverse side).")

PredictorsAccessor

Dhis2Client.predictors — CRUD + run helpers over /api/predictors.

Source code in packages/dhis2w-client/src/dhis2w_client/predictors.py
class PredictorsAccessor:
    """`Dhis2Client.predictors` — CRUD + run helpers over `/api/predictors`."""

    def __init__(self, client: Dhis2Client) -> None:
        """Bind to the sharing client."""
        self._client = client

    # ---- CRUD -----------------------------------------------------------

    async def list_all(
        self,
        *,
        period_type: PeriodType | str | None = None,
        page: int = 1,
        page_size: int = 50,
    ) -> list[Predictor]:
        """Page through Predictors, optionally filtered by periodType."""
        filters: list[str] | None = None
        if period_type is not None:
            value = period_type.value if isinstance(period_type, PeriodType) else period_type
            filters = [f"periodType:eq:{value}"]
        return cast(
            list[Predictor],
            await self._client.resources.predictors.list(
                fields=_PREDICTOR_FIELDS,
                filters=filters,
                page=page,
                page_size=page_size,
            ),
        )

    async def get(self, uid: str) -> Predictor:
        """Fetch one Predictor with generator, output, OU scope resolved inline."""
        return await self._client.get(f"/api/predictors/{uid}", model=Predictor, params={"fields": _PREDICTOR_FIELDS})

    async def create(
        self,
        *,
        name: str,
        short_name: str,
        expression: str,
        output_data_element_uid: str,
        period_type: PeriodType | str = PeriodType.MONTHLY,
        sequential_sample_count: int = 3,
        annual_sample_count: int = 0,
        sequential_skip_count: int = 0,
        organisation_unit_descendants: OrganisationUnitDescendants | str = OrganisationUnitDescendants.SELECTED,
        organisation_unit_level_uids: list[str] | None = None,
        output_combo_uid: str | None = None,
        missing_value_strategy: MissingValueStrategy | str = MissingValueStrategy.SKIP_IF_ALL_VALUES_MISSING,
        generator_description: str | None = None,
        description: str | None = None,
        code: str | None = None,
        uid: str | None = None,
    ) -> Predictor:
        """Create a Predictor.

        `expression` uses DHIS2's aggregate expression syntax; the
        accessor wraps it in the `generator` Expression sub-object.
        `output_data_element_uid` is the target DE the prediction writes
        to — needs a `TRACKER` or `AGGREGATE` domain and a numeric
        valueType.

        `sequential_sample_count` + `annual_sample_count` control the
        look-back window: `3` monthly samples with the default period
        type averages the three prior months.

        `organisation_unit_level_uids` scopes the run — pass the UIDs of
        the `OrganisationUnitLevel` rows the predictor should cover
        (typically the facility level for data-entry predictors).
        """
        payload: dict[str, Any] = {
            "name": name,
            "shortName": short_name,
            "periodType": period_type.value if isinstance(period_type, PeriodType) else period_type,
            "sequentialSampleCount": sequential_sample_count,
            "annualSampleCount": annual_sample_count,
            "sequentialSkipCount": sequential_skip_count,
            "organisationUnitDescendants": (
                organisation_unit_descendants.value
                if isinstance(organisation_unit_descendants, OrganisationUnitDescendants)
                else organisation_unit_descendants
            ),
            "output": {"id": output_data_element_uid},
            "generator": {
                "expression": expression,
                "missingValueStrategy": (
                    missing_value_strategy.value
                    if isinstance(missing_value_strategy, MissingValueStrategy)
                    else missing_value_strategy
                ),
                "slidingWindow": False,
            },
        }
        if generator_description:
            payload["generator"]["description"] = generator_description
        if output_combo_uid:
            payload["outputCombo"] = {"id": output_combo_uid}
        if organisation_unit_level_uids:
            payload["organisationUnitLevels"] = [{"id": level_uid} for level_uid in organisation_unit_level_uids]
        if uid:
            payload["id"] = uid
        if code:
            payload["code"] = code
        if description:
            payload["description"] = description
        envelope = await self._client.post("/api/predictors", payload, model=WebMessageResponse)
        created_uid = envelope.created_uid or uid
        if not created_uid:
            raise RuntimeError("predictor create did not return a uid")
        return await self.get(created_uid)

    async def update(self, predictor: Predictor) -> Predictor:
        """PUT an edited Predictor back. `predictor.id` must be set."""
        if not predictor.id:
            raise ValueError("update requires predictor.id to be set")
        body = predictor.model_dump(by_alias=True, exclude_none=True, mode="json")
        await self._client.put_raw(f"/api/predictors/{predictor.id}", body=body)
        return await self.get(predictor.id)

    async def rename(
        self,
        uid: str,
        *,
        name: str | None = None,
        short_name: str | None = None,
        description: str | None = None,
    ) -> Predictor:
        """Partial-update shortcut — read, mutate the label fields, PUT."""
        if name is None and short_name is None and description is None:
            raise ValueError("rename requires at least one of name / short_name / description")
        current = await self.get(uid)
        if name is not None:
            current.name = name
        if short_name is not None:
            current.shortName = short_name
        if description is not None:
            current.description = description
        return await self.update(current)

    async def delete(self, uid: str) -> None:
        """Delete a Predictor. DHIS2 keeps any data values it already wrote."""
        if not uid:
            raise ValueError("delete requires a non-empty uid")
        await self._client.resources.predictors.delete(uid)

    # ---- Run ------------------------------------------------------------

    async def run_all(self, *, start_date: str, end_date: str) -> WebMessageResponse:
        """Run every predictor on the instance for the given date range.

        Returns the summary envelope — `.import_count()` gives
        `imported / updated / ignored / deleted` counts for the emitted
        data values.
        """
        return await self._run("/api/predictors/run", start_date=start_date, end_date=end_date)

    async def run_one(self, predictor_uid: str, *, start_date: str, end_date: str) -> WebMessageResponse:
        """Run a single predictor by UID over the given date range."""
        return await self._run(
            f"/api/predictors/{predictor_uid}/run",
            start_date=start_date,
            end_date=end_date,
        )

    async def run_group(self, group_uid: str, *, start_date: str, end_date: str) -> WebMessageResponse:
        """Run every predictor in a `PredictorGroup` over the given date range."""
        return await self._run(
            f"/api/predictorGroups/{group_uid}/run",
            start_date=start_date,
            end_date=end_date,
        )

    async def _run(self, path: str, *, start_date: str, end_date: str) -> WebMessageResponse:
        """Dispatch a predictor-run POST + return the typed envelope."""
        params: dict[str, Any] = {"startDate": start_date, "endDate": end_date}
        return await self._client.post(path, body=None, params=params, model=WebMessageResponse)
Functions
__init__(client)

Bind to the shared client.

Source code in packages/dhis2w-client/src/dhis2w_client/predictors.py
def __init__(self, client: Dhis2Client) -> None:
    """Bind to the sharing client."""
    self._client = client
list_all(*, period_type=None, page=1, page_size=50) async

Page through Predictors, optionally filtered by periodType.

Source code in packages/dhis2w-client/src/dhis2w_client/predictors.py
async def list_all(
    self,
    *,
    period_type: PeriodType | str | None = None,
    page: int = 1,
    page_size: int = 50,
) -> list[Predictor]:
    """Page through Predictors, optionally filtered by periodType."""
    filters: list[str] | None = None
    if period_type is not None:
        value = period_type.value if isinstance(period_type, PeriodType) else period_type
        filters = [f"periodType:eq:{value}"]
    return cast(
        list[Predictor],
        await self._client.resources.predictors.list(
            fields=_PREDICTOR_FIELDS,
            filters=filters,
            page=page,
            page_size=page_size,
        ),
    )
get(uid) async

Fetch one Predictor with generator, output, OU scope resolved inline.

Source code in packages/dhis2w-client/src/dhis2w_client/predictors.py
async def get(self, uid: str) -> Predictor:
    """Fetch one Predictor with generator, output, OU scope resolved inline."""
    return await self._client.get(f"/api/predictors/{uid}", model=Predictor, params={"fields": _PREDICTOR_FIELDS})
create(*, name, short_name, expression, output_data_element_uid, period_type=PeriodType.MONTHLY, sequential_sample_count=3, annual_sample_count=0, sequential_skip_count=0, organisation_unit_descendants=OrganisationUnitDescendants.SELECTED, organisation_unit_level_uids=None, output_combo_uid=None, missing_value_strategy=MissingValueStrategy.SKIP_IF_ALL_VALUES_MISSING, generator_description=None, description=None, code=None, uid=None) async

Create a Predictor.

expression uses DHIS2's aggregate expression syntax; the accessor wraps it in the generator Expression sub-object. output_data_element_uid is the target DE the prediction writes to — needs a TRACKER or AGGREGATE domain and a numeric valueType.

sequential_sample_count + annual_sample_count control the look-back window: a sequential_sample_count of 3 with the default Monthly period type averages the three prior months.

organisation_unit_level_uids scopes the run — pass the UIDs of the OrganisationUnitLevel rows the predictor should cover (typically the facility level for data-entry predictors).

Source code in packages/dhis2w-client/src/dhis2w_client/predictors.py
async def create(
    self,
    *,
    name: str,
    short_name: str,
    expression: str,
    output_data_element_uid: str,
    period_type: PeriodType | str = PeriodType.MONTHLY,
    sequential_sample_count: int = 3,
    annual_sample_count: int = 0,
    sequential_skip_count: int = 0,
    organisation_unit_descendants: OrganisationUnitDescendants | str = OrganisationUnitDescendants.SELECTED,
    organisation_unit_level_uids: list[str] | None = None,
    output_combo_uid: str | None = None,
    missing_value_strategy: MissingValueStrategy | str = MissingValueStrategy.SKIP_IF_ALL_VALUES_MISSING,
    generator_description: str | None = None,
    description: str | None = None,
    code: str | None = None,
    uid: str | None = None,
) -> Predictor:
    """Create a Predictor.

    `expression` uses DHIS2's aggregate expression syntax; the
    accessor wraps it in the `generator` Expression sub-object.
    `output_data_element_uid` is the target DE the prediction writes
    to — needs a `TRACKER` or `AGGREGATE` domain and a numeric
    valueType.

    `sequential_sample_count` + `annual_sample_count` control the
    look-back window: a `sequential_sample_count` of `3` with the
    default Monthly period type averages the three prior months.

    `organisation_unit_level_uids` scopes the run — pass the UIDs of
    the `OrganisationUnitLevel` rows the predictor should cover
    (typically the facility level for data-entry predictors).
    """
    payload: dict[str, Any] = {
        "name": name,
        "shortName": short_name,
        "periodType": period_type.value if isinstance(period_type, PeriodType) else period_type,
        "sequentialSampleCount": sequential_sample_count,
        "annualSampleCount": annual_sample_count,
        "sequentialSkipCount": sequential_skip_count,
        "organisationUnitDescendants": (
            organisation_unit_descendants.value
            if isinstance(organisation_unit_descendants, OrganisationUnitDescendants)
            else organisation_unit_descendants
        ),
        "output": {"id": output_data_element_uid},
        "generator": {
            "expression": expression,
            "missingValueStrategy": (
                missing_value_strategy.value
                if isinstance(missing_value_strategy, MissingValueStrategy)
                else missing_value_strategy
            ),
            "slidingWindow": False,
        },
    }
    if generator_description:
        payload["generator"]["description"] = generator_description
    if output_combo_uid:
        payload["outputCombo"] = {"id": output_combo_uid}
    if organisation_unit_level_uids:
        payload["organisationUnitLevels"] = [{"id": level_uid} for level_uid in organisation_unit_level_uids]
    if uid:
        payload["id"] = uid
    if code:
        payload["code"] = code
    if description:
        payload["description"] = description
    envelope = await self._client.post("/api/predictors", payload, model=WebMessageResponse)
    created_uid = envelope.created_uid or uid
    if not created_uid:
        raise RuntimeError("predictor create did not return a uid")
    return await self.get(created_uid)
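For orientation, this is roughly the JSON body a minimal `create(...)` call posts to `/api/predictors`, assembled by hand. The UIDs and expression here are hypothetical placeholders, and only the always-present keys are shown; optional fields like `outputCombo` and `organisationUnitLevels` are added conditionally, as in the source above:

```python
import json

# All UIDs and the expression below are made-up placeholders.
payload = {
    "name": "ANC 1 forecast",
    "shortName": "ANC1 forecast",
    "periodType": "Monthly",
    "sequentialSampleCount": 3,
    "annualSampleCount": 0,
    "sequentialSkipCount": 0,
    "organisationUnitDescendants": "SELECTED",
    "output": {"id": "deOutUid0001"},
    "generator": {
        "expression": "AVG(#{deInUid00001})",
        "missingValueStrategy": "SKIP_IF_ALL_VALUES_MISSING",
        "slidingWindow": False,
    },
}

body = json.dumps(payload)
# Python's False serializes to JSON's lowercase false.
assert '"slidingWindow": false' in body
```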
update(predictor) async

PUT an edited Predictor back. predictor.id must be set.

Source code in packages/dhis2w-client/src/dhis2w_client/predictors.py
async def update(self, predictor: Predictor) -> Predictor:
    """PUT an edited Predictor back. `predictor.id` must be set."""
    if not predictor.id:
        raise ValueError("update requires predictor.id to be set")
    body = predictor.model_dump(by_alias=True, exclude_none=True, mode="json")
    await self._client.put_raw(f"/api/predictors/{predictor.id}", body=body)
    return await self.get(predictor.id)
rename(uid, *, name=None, short_name=None, description=None) async

Partial-update shortcut — read, mutate the label fields, PUT.

Source code in packages/dhis2w-client/src/dhis2w_client/predictors.py
async def rename(
    self,
    uid: str,
    *,
    name: str | None = None,
    short_name: str | None = None,
    description: str | None = None,
) -> Predictor:
    """Partial-update shortcut — read, mutate the label fields, PUT."""
    if name is None and short_name is None and description is None:
        raise ValueError("rename requires at least one of name / short_name / description")
    current = await self.get(uid)
    if name is not None:
        current.name = name
    if short_name is not None:
        current.shortName = short_name
    if description is not None:
        current.description = description
    return await self.update(current)
delete(uid) async

Delete a Predictor. DHIS2 keeps any data values it already wrote.

Source code in packages/dhis2w-client/src/dhis2w_client/predictors.py
async def delete(self, uid: str) -> None:
    """Delete a Predictor. DHIS2 keeps any data values it already wrote."""
    if not uid:
        raise ValueError("delete requires a non-empty uid")
    await self._client.resources.predictors.delete(uid)
run_all(*, start_date, end_date) async

Run every predictor on the instance for the given date range.

Returns the summary envelope — .import_count() gives imported / updated / ignored / deleted counts for the emitted data values.

Source code in packages/dhis2w-client/src/dhis2w_client/predictors.py
async def run_all(self, *, start_date: str, end_date: str) -> WebMessageResponse:
    """Run every predictor on the instance for the given date range.

    Returns the summary envelope — `.import_count()` gives
    `imported / updated / ignored / deleted` counts for the emitted
    data values.
    """
    return await self._run("/api/predictors/run", start_date=start_date, end_date=end_date)
run_one(predictor_uid, *, start_date, end_date) async

Run a single predictor by UID over the given date range.

Source code in packages/dhis2w-client/src/dhis2w_client/predictors.py
async def run_one(self, predictor_uid: str, *, start_date: str, end_date: str) -> WebMessageResponse:
    """Run a single predictor by UID over the given date range."""
    return await self._run(
        f"/api/predictors/{predictor_uid}/run",
        start_date=start_date,
        end_date=end_date,
    )
run_group(group_uid, *, start_date, end_date) async

Run every predictor in a PredictorGroup over the given date range.

Source code in packages/dhis2w-client/src/dhis2w_client/predictors.py
async def run_group(self, group_uid: str, *, start_date: str, end_date: str) -> WebMessageResponse:
    """Run every predictor in a `PredictorGroup` over the given date range."""
    return await self._run(
        f"/api/predictorGroups/{group_uid}/run",
        start_date=start_date,
        end_date=end_date,
    )
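All three run methods share the same shape: POST with `startDate`/`endDate` query params, then read the returned envelope. A sketch of consuming that envelope with stubs in place of the real client; every name here (`StubEnvelope`, `StubPredictors`, the count values) is a stand-in, and `import_count()` is assumed to return a dict per the docstrings above:

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StubEnvelope:
    """Stand-in for WebMessageResponse with an assumed import_count() shape."""

    imported: int
    updated: int
    ignored: int
    deleted: int

    def import_count(self) -> dict[str, int]:
        return {
            "imported": self.imported,
            "updated": self.updated,
            "ignored": self.ignored,
            "deleted": self.deleted,
        }


class StubPredictors:
    """Mimics the accessor surface used below; no HTTP involved."""

    async def run_all(self, *, start_date: str, end_date: str) -> StubEnvelope:
        # A real run would POST /api/predictors/run?startDate=...&endDate=...
        return StubEnvelope(imported=12, updated=0, ignored=3, deleted=0)


async def main() -> dict[str, int]:
    envelope = await StubPredictors().run_all(start_date="2024-01-01", end_date="2024-06-30")
    return envelope.import_count()


counts = asyncio.run(main())
assert counts["imported"] == 12 and counts["ignored"] == 3
```

Against a live instance the same call site would read `client.predictors.run_all(...)` instead of the stub.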