Typed models and a client accessor for the DHIS2 maintenance, data-integrity, and task-notification APIs.
DataIntegrityCheck and DataIntegrityIssue come from
dhis2w_client.generated.v42.oas. DataIntegrityResult and
DataIntegrityReport stay hand-written — OpenAPI splits the result into
separate DataIntegrityDetails / DataIntegritySummary shapes, but this
module's callers want the merged view + the client-side {check_name: result}
map. Notification re-exports the OAS type so callers get typed enums
(category: JobType, dataType: NotificationDataType, level: NotificationLevel)
plus time: datetime.
MaintenanceAccessor (bound to Dhis2Client.maintenance) exposes the
data-integrity read paths. iter_integrity_issues is the ergonomic
entry point for large runs: it yields one issue at a time, tagged with its
owning check's metadata, so callers don't have to walk the
{check_name: {issues: [...]}} two-level shape themselves.
Classes
DataIntegrityCheck
Bases: BaseModel
OpenAPI schema DataIntegrityCheck.
Source code in packages/dhis2w-client/src/dhis2w_client/generated/v42/oas/data_integrity_check.py
| class DataIntegrityCheck(_BaseModel):
      """OpenAPI schema `DataIntegrityCheck`."""
      model_config = _ConfigDict(extra="allow", populate_by_name=True, defer_build=True)
      averageExecutionTime: int | None = None
      code: str | None = None
      description: str | None = None
      displayName: str | None = None
      introduction: str | None = None
      isProgrammatic: bool | None = None
      isSlow: bool | None = None
      issuesIdType: str | None = None
      name: str | None = None
      recommendation: str | None = None
      section: str | None = None
      sectionOrder: int | None = None
      severity: DataIntegritySeverity | None = None
|
DataIntegrityIssue
Bases: BaseModel
OpenAPI schema DataIntegrityIssue.
Source code in packages/dhis2w-client/src/dhis2w_client/generated/v42/oas/data_integrity_issue.py
| class DataIntegrityIssue(_BaseModel):
      """OpenAPI schema `DataIntegrityIssue`."""
      model_config = _ConfigDict(extra="allow", populate_by_name=True, defer_build=True)
      comment: str | None = None
      id: str | None = None
      name: str | None = None
      refs: list[str] | None = None
|
Notification
Bases: BaseModel
OpenAPI schema Notification.
Source code in packages/dhis2w-client/src/dhis2w_client/generated/v42/oas/notification.py
| class Notification(_BaseModel):
      """OpenAPI schema `Notification`."""
      model_config = _ConfigDict(extra="allow", populate_by_name=True, defer_build=True)
      category: JobType | None = None
      completed: bool | None = None
      data: Any | None = None
      dataType: NotificationDataType | None = None
      id: str | None = None
      level: NotificationLevel | None = None
      message: str | None = None
      time: datetime | None = None
      uid: str | None = None
|
JobType
Bases: StrEnum
JobType.
Source code in packages/dhis2w-client/src/dhis2w_client/generated/v42/oas/_enums.py
| class JobType(StrEnum):
      """JobType."""
      DATA_INTEGRITY = "DATA_INTEGRITY"
      DATA_INTEGRITY_DETAILS = "DATA_INTEGRITY_DETAILS"
      RESOURCE_TABLE = "RESOURCE_TABLE"
      ANALYTICS_TABLE = "ANALYTICS_TABLE"
      CONTINUOUS_ANALYTICS_TABLE = "CONTINUOUS_ANALYTICS_TABLE"
      SINGLE_EVENT_DATA_SYNC = "SINGLE_EVENT_DATA_SYNC"
      TRACKED_ENTITY_DATA_SYNC = "TRACKED_ENTITY_DATA_SYNC"
      DATA_SYNC = "DATA_SYNC"
      META_DATA_SYNC = "META_DATA_SYNC"
      AGGREGATE_DATA_EXCHANGE = "AGGREGATE_DATA_EXCHANGE"
      SEND_SCHEDULED_MESSAGE = "SEND_SCHEDULED_MESSAGE"
      PROGRAM_NOTIFICATIONS = "PROGRAM_NOTIFICATIONS"
      MONITORING = "MONITORING"
      PUSH_ANALYSIS = "PUSH_ANALYSIS"
      HTML_PUSH_ANALYTICS = "HTML_PUSH_ANALYTICS"
      TRACKER_SEARCH_OPTIMIZATION = "TRACKER_SEARCH_OPTIMIZATION"
      PREDICTOR = "PREDICTOR"
      MATERIALIZED_SQL_VIEW_UPDATE = "MATERIALIZED_SQL_VIEW_UPDATE"
      DISABLE_INACTIVE_USERS = "DISABLE_INACTIVE_USERS"
      TEST = "TEST"
      LOCK_EXCEPTION_CLEANUP = "LOCK_EXCEPTION_CLEANUP"
      MOCK = "MOCK"
      SMS_SEND = "SMS_SEND"
      SMS_INBOUND_PROCESSING = "SMS_INBOUND_PROCESSING"
      TRACKER_IMPORT_JOB = "TRACKER_IMPORT_JOB"
      TRACKER_IMPORT_NOTIFICATION_JOB = "TRACKER_IMPORT_NOTIFICATION_JOB"
      TRACKER_IMPORT_RULE_ENGINE_JOB = "TRACKER_IMPORT_RULE_ENGINE_JOB"
      IMAGE_PROCESSING = "IMAGE_PROCESSING"
      COMPLETE_DATA_SET_REGISTRATION_IMPORT = "COMPLETE_DATA_SET_REGISTRATION_IMPORT"
      DATAVALUE_IMPORT_INTERNAL = "DATAVALUE_IMPORT_INTERNAL"
      METADATA_IMPORT = "METADATA_IMPORT"
      DATAVALUE_IMPORT = "DATAVALUE_IMPORT"
      GEOJSON_IMPORT = "GEOJSON_IMPORT"
      GML_IMPORT = "GML_IMPORT"
      HOUSEKEEPING = "HOUSEKEEPING"
      DATA_VALUE_TRIM = "DATA_VALUE_TRIM"
      DATA_SET_NOTIFICATION = "DATA_SET_NOTIFICATION"
      CREDENTIALS_EXPIRY_ALERT = "CREDENTIALS_EXPIRY_ALERT"
      DATA_STATISTICS = "DATA_STATISTICS"
      FILE_RESOURCE_CLEANUP = "FILE_RESOURCE_CLEANUP"
      ACCOUNT_EXPIRY_ALERT = "ACCOUNT_EXPIRY_ALERT"
      VALIDATION_RESULTS_NOTIFICATION = "VALIDATION_RESULTS_NOTIFICATION"
      REMOVE_USED_OR_EXPIRED_RESERVED_VALUES = "REMOVE_USED_OR_EXPIRED_RESERVED_VALUES"
      SYSTEM_VERSION_UPDATE_CHECK = "SYSTEM_VERSION_UPDATE_CHECK"
|
NotificationDataType
Bases: StrEnum
NotificationDataType.
Source code in packages/dhis2w-client/src/dhis2w_client/generated/v42/oas/_enums.py
| class NotificationDataType(StrEnum):
      """NotificationDataType."""
      PARAMETERS = "PARAMETERS"
|
NotificationLevel
Bases: StrEnum
NotificationLevel.
Source code in packages/dhis2w-client/src/dhis2w_client/generated/v42/oas/_enums.py
| class NotificationLevel(StrEnum):
      """NotificationLevel."""
      OFF = "OFF"
      DEBUG = "DEBUG"
      LOOP = "LOOP"
      INFO = "INFO"
      WARN = "WARN"
      ERROR = "ERROR"
|
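Because these enums are string-valued, a raw value from a JSON payload can be turned into a member and still compared against plain strings. The sketch below uses a stand-in class (LevelSketch, built on str + Enum so it does not depend on the generated package) and a hypothetical helper parse_level; neither name exists in dhis2w_client.

```python
from enum import Enum


class LevelSketch(str, Enum):
    """Stand-in mirroring the NotificationLevel members listed above."""

    OFF = "OFF"
    DEBUG = "DEBUG"
    LOOP = "LOOP"
    INFO = "INFO"
    WARN = "WARN"
    ERROR = "ERROR"


def parse_level(raw: str) -> "LevelSketch | None":
    """Hypothetical helper: map a raw string to a member, or None if unknown."""
    try:
        return LevelSketch(raw)  # lookup by value
    except ValueError:
        return None  # value not defined on the enum


parsed = parse_level("WARN")
unknown = parse_level("TRACE")
```

How the real Notification model reacts to an unknown level value depends on its pydantic configuration and is not covered by this sketch.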
DataIntegrityResult
Bases: BaseModel
Result of one check — populated after the async job completes.
Merges OpenAPI's DataIntegrityDetails (has issues[]) and
DataIntegritySummary (has count) into one caller-friendly shape.
Both modes populate startTime / finishedTime once the job has run;
an unrun check returns the definition block alone.
Source code in packages/dhis2w-client/src/dhis2w_client/maintenance.py
| class DataIntegrityResult(BaseModel):
      """Result of one check — populated after the async job completes.
      Merges OpenAPI's `DataIntegrityDetails` (has `issues[]`) and
      `DataIntegritySummary` (has `count`) into one caller-friendly shape.
      Both modes populate `startTime` / `finishedTime` once the job has run;
      an unrun check returns the definition block alone.
      """
      model_config = ConfigDict(extra="allow")
      name: str
      displayName: str | None = None
      section: str | None = None
      severity: str | None = None
      code: str | None = None
      count: int | None = None
      issues: list[DataIntegrityIssue] = Field(default_factory=list)
      startTime: str | None = None
      finishedTime: str | None = None
      averageExecutionTime: int | None = None
|
DataIntegrityReport
Bases: BaseModel
/api/dataIntegrity/summary or /details response — keyed by check name.
Hand-written: DHIS2 returns {check_name: result} — a client-side convenience
shape not in OpenAPI. The from_api classmethod hides the raw-dict detail.
Source code in packages/dhis2w-client/src/dhis2w_client/maintenance.py
| class DataIntegrityReport(BaseModel):
      """`/api/dataIntegrity/summary` or `/details` response — keyed by check name.
      Hand-written: DHIS2 returns `{check_name: result}` — a client-side convenience
      shape not in OpenAPI. The `from_api` classmethod hides the raw-dict detail.
      """
      model_config = ConfigDict(extra="allow")
      results: dict[str, DataIntegrityResult] = Field(default_factory=dict)
      @classmethod
      def from_api(cls, raw: dict[str, Any]) -> DataIntegrityReport:
          """Validate the raw `{check_name: {...}}` dict DHIS2 returns into a typed report."""
          results = {name: DataIntegrityResult.model_validate(body) for name, body in raw.items()}
          return cls(results=results)
|
Functions
from_api(raw)
classmethod
Validate the raw {check_name: {...}} dict DHIS2 returns into a typed report.
Source code in packages/dhis2w-client/src/dhis2w_client/maintenance.py
| @classmethod
  def from_api(cls, raw: dict[str, Any]) -> DataIntegrityReport:
      """Validate the raw `{check_name: {...}}` dict DHIS2 returns into a typed report."""
      results = {name: DataIntegrityResult.model_validate(body) for name, body in raw.items()}
      return cls(results=results)
|
IntegrityIssueRow
Bases: BaseModel
One issue from a data-integrity run, tagged with its owning check's metadata.
iter_integrity_issues yields these as a flat stream so callers can
filter / transform without walking the two-level
{check_name: {issues: [...]}} shape themselves.
Source code in packages/dhis2w-client/src/dhis2w_client/maintenance.py
| class IntegrityIssueRow(BaseModel):
      """One issue from a data-integrity run, tagged with its owning check's metadata.
      `iter_integrity_issues` yields these as a flat stream so callers can
      filter / transform without walking the two-level
      `{check_name: {issues: [...]}}` shape themselves.
      """
      model_config = ConfigDict(frozen=True)
      check_name: str
      check_display_name: str | None = None
      severity: str | None = None
      issue: DataIntegrityIssue
|
MaintenanceAccessor
Dhis2Client.maintenance — read paths for the data-integrity surface.
Writes (kicking off a run, clearing cache) stay on the plugin-layer
service in dhis2w_core for now — those need a Profile for OAuth2
token-store keying, which the raw client doesn't know about.
Source code in packages/dhis2w-client/src/dhis2w_client/maintenance.py
| class MaintenanceAccessor:
      """`Dhis2Client.maintenance` — read paths for the data-integrity surface.
      Writes (kicking off a run, clearing cache) stay on the plugin-layer
      service in `dhis2w_core` for now — those need a `Profile` for OAuth2
      token-store keying, which the raw client doesn't know about.
      """
      def __init__(self, client: Dhis2Client) -> None:
          """Bind to the sharing client — reuses its auth + HTTP pool for every request."""
          self._client = client
      async def get_integrity_report(
          self,
          *,
          checks: Sequence[str] | None = None,
          details: bool = True,
      ) -> DataIntegrityReport:
          """Fetch the full `/api/dataIntegrity/{details|summary}` report as a typed model.
          `details=True` (the default) populates `issues[]` on each result;
          `details=False` hits the cheaper `/summary` endpoint which returns
          just counts + timing. Pass `checks` to narrow to specific check
          names (from `list_dataintegrity_checks`); omit for every check
          the last run produced.
          """
          path = "/api/dataIntegrity/details" if details else "/api/dataIntegrity/summary"
          params: dict[str, list[str]] = {"checks": list(checks)} if checks else {}
          raw = await self._client.get_raw(path, params=params or None)
          return DataIntegrityReport.from_api(raw)
      async def iter_integrity_issues(
          self,
          *,
          checks: Sequence[str] | None = None,
      ) -> AsyncIterator[IntegrityIssueRow]:
          """Stream every issue from `/api/dataIntegrity/details` one at a time.
          DHIS2's endpoint returns the whole `{check_name: {issues: [...]}}`
          structure in one response (no server-side pagination). This helper
          still buys you:
          - A flat stream — `async for row in ...` instead of nested loops.
          - Tagged rows — each yielded `IntegrityIssueRow` carries the
            owning check's name + display name + severity, so the caller
            knows the provenance without a second lookup.
          - Early break — stop iteration mid-stream without building the
            full list in memory on the Python side.
          Issues yield in the order DHIS2 returns checks, then the order
          of that check's `issues[]` list — stable across runs.
          """
          report = await self.get_integrity_report(checks=checks, details=True)
          for check_name, result in report.results.items():
              for issue in result.issues:
                  yield IntegrityIssueRow(
                      check_name=check_name,
                      check_display_name=result.displayName,
                      severity=result.severity,
                      issue=issue,
                  )
|
Functions
__init__(client)
Bind to the shared client and reuse its auth + HTTP pool for every request.
Source code in packages/dhis2w-client/src/dhis2w_client/maintenance.py
| def __init__(self, client: Dhis2Client) -> None:
      """Bind to the sharing client — reuses its auth + HTTP pool for every request."""
      self._client = client
|
get_integrity_report(*, checks=None, details=True)
async
Fetch the full /api/dataIntegrity/{details|summary} report as a typed model.
details=True (the default) populates issues[] on each result;
details=False hits the cheaper /summary endpoint which returns
just counts + timing. Pass checks to narrow to specific check
names (from list_dataintegrity_checks); omit for every check
the last run produced.
Source code in packages/dhis2w-client/src/dhis2w_client/maintenance.py
| async def get_integrity_report(
      self,
      *,
      checks: Sequence[str] | None = None,
      details: bool = True,
  ) -> DataIntegrityReport:
      """Fetch the full `/api/dataIntegrity/{details|summary}` report as a typed model.
      `details=True` (the default) populates `issues[]` on each result;
      `details=False` hits the cheaper `/summary` endpoint which returns
      just counts + timing. Pass `checks` to narrow to specific check
      names (from `list_dataintegrity_checks`); omit for every check
      the last run produced.
      """
      path = "/api/dataIntegrity/details" if details else "/api/dataIntegrity/summary"
      params: dict[str, list[str]] = {"checks": list(checks)} if checks else {}
      raw = await self._client.get_raw(path, params=params or None)
      return DataIntegrityReport.from_api(raw)
|
iter_integrity_issues(*, checks=None)
async
Stream every issue from /api/dataIntegrity/details one at a time.
DHIS2's endpoint returns the whole {check_name: {issues: [...]}}
structure in one response (no server-side pagination). This helper
still buys you:
- A flat stream: async for row in ... instead of nested loops.
- Tagged rows: each yielded IntegrityIssueRow carries the
owning check's name + display name + severity, so the caller
knows the provenance without a second lookup.
- Early break: stop iteration mid-stream without building a full
list of IntegrityIssueRow objects on the Python side (the report
itself is still fetched and validated in one response).
Issues yield in the order DHIS2 returns checks, then the order
of that check's issues[] list — stable across runs.
Source code in packages/dhis2w-client/src/dhis2w_client/maintenance.py
| async def iter_integrity_issues(
      self,
      *,
      checks: Sequence[str] | None = None,
  ) -> AsyncIterator[IntegrityIssueRow]:
      """Stream every issue from `/api/dataIntegrity/details` one at a time.
      DHIS2's endpoint returns the whole `{check_name: {issues: [...]}}`
      structure in one response (no server-side pagination). This helper
      still buys you:
      - A flat stream — `async for row in ...` instead of nested loops.
      - Tagged rows — each yielded `IntegrityIssueRow` carries the
        owning check's name + display name + severity, so the caller
        knows the provenance without a second lookup.
      - Early break — stop iteration mid-stream without building the
        full list in memory on the Python side.
      Issues yield in the order DHIS2 returns checks, then the order
      of that check's `issues[]` list — stable across runs.
      """
      report = await self.get_integrity_report(checks=checks, details=True)
      for check_name, result in report.results.items():
          for issue in result.issues:
              yield IntegrityIssueRow(
                  check_name=check_name,
                  check_display_name=result.displayName,
                  severity=result.severity,
                  issue=issue,
              )
|