Skip to content

Auth providers

Every auth method implements the same AuthProvider Protocol (headers() + refresh_if_needed()), so the rest of the client is identical regardless of what you pick.

The Protocol

base

AuthProvider protocol for pluggable DHIS2 authentication.

Classes

AuthProvider

Bases: Protocol

Injects authentication headers into outgoing DHIS2 requests.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/auth/base.py
@runtime_checkable
class AuthProvider(Protocol):
    """Structural interface for objects that authenticate outgoing DHIS2 requests.

    Any object exposing both coroutine methods below satisfies the protocol;
    no inheritance is required.
    """

    async def refresh_if_needed(self) -> None:
        """Renew credentials that are close to expiry; static auth does nothing."""

    async def headers(self) -> dict[str, str]:
        """Produce the headers to attach to the next request."""
Functions
headers() async

Return headers to apply to the next request.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/auth/base.py
async def headers(self) -> dict[str, str]:
    """Produce the headers to attach to the next request."""
refresh_if_needed() async

Refresh credentials when close to expiry; no-op for static auth.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/auth/base.py
async def refresh_if_needed(self) -> None:
    """Renew credentials that are close to expiry; static auth does nothing."""

Basic

basic

HTTP Basic authentication provider.

Classes

BasicAuth

Bases: BaseModel

HTTP Basic auth — username/password encoded into Authorization header.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/auth/basic.py
class BasicAuth(BaseModel):
    """Username/password credentials sent through the HTTP Basic scheme."""

    model_config = ConfigDict(frozen=True)

    username: str
    password: str

    async def refresh_if_needed(self) -> None:
        """Do nothing — a username/password pair has no expiry to renew."""

    async def headers(self) -> dict[str, str]:
        """Build the `Authorization: Basic ...` header from the stored credentials."""
        # "user:password" is encoded to bytes first, then base64'd; the base64
        # alphabet is pure ASCII, so decoding as ASCII is always safe.
        credentials = f"{self.username}:{self.password}"
        token = base64.b64encode(credentials.encode()).decode("ascii")
        return {"Authorization": f"Basic {token}"}
Functions
headers() async

Return the Authorization: Basic header for this credential pair.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/auth/basic.py
async def headers(self) -> dict[str, str]:
    """Build the `Authorization: Basic ...` header from the stored credentials."""
    # "user:password" is encoded to bytes first, then base64'd; the base64
    # alphabet is pure ASCII, so decoding as ASCII is always safe.
    credentials = f"{self.username}:{self.password}"
    token = base64.b64encode(credentials.encode()).decode("ascii")
    return {"Authorization": f"Basic {token}"}
refresh_if_needed() async

Basic auth has no expiry; nothing to refresh.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/auth/basic.py
async def refresh_if_needed(self) -> None:
    """Do nothing — a username/password pair has no expiry to renew."""

Personal Access Token

pat

DHIS2 Personal Access Token authentication.

Classes

PatAuth

Bases: BaseModel

DHIS2 Personal Access Token — sent as Authorization: ApiToken <pat>.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/auth/pat.py
class PatAuth(BaseModel):
    """Personal Access Token credential, transmitted as `Authorization: ApiToken <pat>`."""

    model_config = ConfigDict(frozen=True)

    token: str

    async def refresh_if_needed(self) -> None:
        """Do nothing — a PAT is long-lived and is never refreshed here."""

    async def headers(self) -> dict[str, str]:
        """Build the `Authorization: ApiToken ...` header carrying the stored token."""
        header_value = f"ApiToken {self.token}"
        return {"Authorization": header_value}
Functions
headers() async

Return the Authorization: ApiToken header.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/auth/pat.py
async def headers(self) -> dict[str, str]:
    """Build the `Authorization: ApiToken ...` header carrying the stored token."""
    header_value = f"ApiToken {self.token}"
    return {"Authorization": header_value}
refresh_if_needed() async

PATs are long-lived; nothing to refresh.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/auth/pat.py
async def refresh_if_needed(self) -> None:
    """Do nothing — a PAT is long-lived and is never refreshed here."""

OAuth2 / OIDC

oauth2

OAuth 2.1 authorization-code flow with PKCE for DHIS2 OpenID Connect.

Attributes

RedirectCapturer = Callable[[str, str], Awaitable[str]] module-attribute

Callable signature for the redirect-receiver hook.

Takes (auth_url, expected_state) and returns the authorization code. The default implementation calls capture_code() with sensible defaults; tests and specialised callers (e.g. d2w profile verify's "don't open a browser, just fail" probe) can inject their own implementation here.

DEFAULT_REDIRECT_PORT = 8765 module-attribute

Loopback port the OAuth2 redirect receiver listens on by default.

DEFAULT_REDIRECT_URI = f'http://localhost:{DEFAULT_REDIRECT_PORT}' module-attribute

Canonical loopback redirect URI used by every CLI / service / example default.

Single source of truth so the port number doesn't drift across the six profile / dev / sample / oauth2-registration call sites that previously inlined "http://localhost:8765". Override via the matching --redirect-uri flag (CLI) or redirect_uri keyword argument (service / library).

Classes

OAuth2Token

Bases: BaseModel

Access + refresh token pair with expiry info (unix epoch seconds).

Source code in packages/dhis2w-client/src/dhis2w_client/v42/auth/oauth2.py
class OAuth2Token(BaseModel):
    """Access + refresh token pair with expiry info (unix epoch seconds).

    This is the value read from and written to a `TokenStore`, and the value
    `OAuth2Auth` keeps in memory between requests.
    """

    # Bearer credential; OAuth2Auth sends it as `Authorization: Bearer <access_token>`.
    access_token: str
    # Used for the refresh_token grant; None when no refresh token is available.
    refresh_token: str | None = None
    # Absolute expiry as unix epoch seconds (compared against `time.time()`).
    expires_at: float

TokenStore

Bases: Protocol

Persists OAuth2 tokens across runs — filesystem, keyring, SQLite, etc.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/auth/oauth2.py
@runtime_checkable
class TokenStore(Protocol):
    """Storage backend keeping OAuth2 tokens between runs — filesystem, keyring, SQLite, etc."""

    async def set(self, key: str, token: OAuth2Token) -> None:
        """Save `token` under `key`."""

    async def get(self, key: str) -> OAuth2Token | None:
        """Fetch the tokens saved under `key`; None when nothing is stored."""
Functions
get(key) async

Load tokens for key or return None if none stored.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/auth/oauth2.py
async def get(self, key: str) -> OAuth2Token | None:
    """Fetch the tokens saved under `key`; None when nothing is stored."""
set(key, token) async

Persist tokens for key.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/auth/oauth2.py
async def set(self, key: str, token: OAuth2Token) -> None:
    """Save `token` under `key`."""

OAuth2Auth

Authorization-code flow with PKCE against DHIS2 /oauth2/* endpoints.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/auth/oauth2.py
class OAuth2Auth:
    """Authorization-code flow with PKCE against DHIS2 /oauth2/* endpoints.

    Exposes `headers()` and `refresh_if_needed()`. The current token is kept
    in memory and persisted through the injected `TokenStore`; the store is
    only written when a token was newly obtained or refreshed.
    """

    def __init__(
        self,
        base_url: str,
        client_id: str,
        client_secret: str,
        scope: str,
        redirect_uri: str,
        token_store: TokenStore,
        store_key: str | None = None,
        redirect_capturer: RedirectCapturer | None = None,
        open_browser: bool = True,
    ) -> None:
        """Construct the provider.

        `store_key` distinguishes tokens across profiles. `redirect_capturer`
        is an optional callable `(auth_url, expected_state) -> code` that
        replaces the default `asyncio.start_server` loopback implementation
        — `dhis2w-core` injects a FastAPI-backed one here for a nicer UX.

        `open_browser=False` skips the `webbrowser.open()` call in the
        default capturer and prints the authorization URL to stderr for
        copy-paste instead. Ignored when a custom `redirect_capturer` is
        supplied — in that case, the caller owns the "how to get the URL
        in front of the user" decision.
        """
        self._base_url = base_url.rstrip("/")
        self._client_id = client_id
        self._client_secret = client_secret
        self._scope = scope
        self._redirect_uri = redirect_uri
        self._token_store = token_store
        # NOTE: the default key is built from `base_url` exactly as passed
        # (trailing slash included), not from the normalised `_base_url`.
        self._store_key = store_key or f"{base_url}:{client_id}"
        self._token: OAuth2Token | None = None
        self._redirect_capturer = redirect_capturer
        self._open_browser = open_browser

    async def headers(self) -> dict[str, str]:
        """Return Authorization: Bearer <access_token>, running interactive flow if needed.

        Raises:
            OAuth2FlowError: if no token is available after `refresh_if_needed()`,
                or if obtaining / refreshing the token failed.
        """
        await self.refresh_if_needed()
        if self._token is None:
            raise OAuth2FlowError("token missing after refresh — refresh_if_needed should have set it")
        return {"Authorization": f"Bearer {self._token.access_token}"}

    async def refresh_if_needed(self) -> None:
        """Load cached token, refresh if close to expiry, run interactive flow if missing.

        The token store is written only when the token actually changed (a
        fresh authorization or a refresh). `headers()` calls this before
        every request, so an unchanged token must not cost a store write.
        """
        if self._token is None:
            self._token = await self._token_store.get(self._store_key)
        if self._token is None:
            self._token = await self._run_authorization_flow()
        elif self._token.expires_at < time.time() + 60:
            # Refresh 60 seconds ahead of expiry so the token does not lapse
            # between this check and the request that uses it.
            self._token = await self._refresh(self._token)
        else:
            # Still valid and unchanged — nothing to persist.
            return
        await self._token_store.set(self._store_key, self._token)

    async def _run_authorization_flow(self) -> OAuth2Token:
        """Run the browser-based PKCE authorization-code flow."""
        # 96 random bytes encode to 128 base64url characters.
        code_verifier = secrets.token_urlsafe(96)
        # S256 challenge: base64url(sha256(verifier)) with the padding stripped.
        digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
        code_challenge = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")
        # Random value the redirect must echo back; checked by the capturer.
        state = secrets.token_urlsafe(16)

        auth_params = {
            "client_id": self._client_id,
            "response_type": "code",
            "redirect_uri": self._redirect_uri,
            "scope": self._scope,
            "state": state,
            "code_challenge": code_challenge,
            "code_challenge_method": "S256",
        }
        auth_url = f"{self._base_url}/oauth2/authorize?{urllib.parse.urlencode(auth_params)}"

        capturer = self._redirect_capturer or self._capture_code
        code = await capturer(auth_url, state)
        return await self._exchange_code(code, code_verifier)

    async def _capture_code(self, auth_url: str, expected_state: str) -> str:
        """Default capturer — delegate to the module-level `capture_code`."""
        return await capture_code(
            self._redirect_uri,
            expected_state,
            auth_url=auth_url,
            open_browser=self._open_browser,
        )

    async def _exchange_code(self, code: str, code_verifier: str) -> OAuth2Token:
        """Exchange an authorization code for access+refresh tokens.

        Wraps HTTP failures in `OAuth2FlowError` so callers see a clean
        actionable message instead of a raw `httpx.HTTPStatusError`
        traceback. Common failure modes: rejected client secret (DHIS2
        returns 401), redirect-URI mismatch with the OAuth2 client
        registration (400), or DHIS2-side OAuth2 misconfig (5xx).
        """
        data = {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": self._redirect_uri,
            "client_id": self._client_id,
            "client_secret": self._client_secret,
            "code_verifier": code_verifier,
        }
        async with httpx.AsyncClient(follow_redirects=True) as http_client:
            response = await http_client.post(f"{self._base_url}/oauth2/token", data=data)
        if response.status_code >= 400:
            raise OAuth2FlowError(_format_token_endpoint_failure("authorization-code exchange", response))
        return self._token_from_response(response.json())

    async def _refresh(self, expired: OAuth2Token) -> OAuth2Token:
        """Refresh tokens using the refresh_token grant.

        Wraps HTTP failures in `OAuth2FlowError` so callers see a clean
        actionable message ("run `d2w profile login <name>`") instead of a
        raw `httpx.HTTPStatusError` traceback. The most common case is DHIS2
        rotating its OAuth2 client (volume wiped, client UID reissued) —
        the stored refresh_token no longer matches and DHIS2 returns 400.
        """
        if expired.refresh_token is None:
            raise OAuth2FlowError(
                "access token expired and no refresh_token available — run `d2w profile login <name>` to re-authorise"
            )
        data = {
            "grant_type": "refresh_token",
            "refresh_token": expired.refresh_token,
            "client_id": self._client_id,
            "client_secret": self._client_secret,
        }
        async with httpx.AsyncClient(follow_redirects=True) as http_client:
            response = await http_client.post(f"{self._base_url}/oauth2/token", data=data)
        if response.status_code >= 400:
            raise OAuth2FlowError(
                f"token refresh failed ({response.status_code}) — "
                "stored refresh_token rejected by DHIS2. "
                "Run `d2w profile login <name>` to re-authorise."
            )
        # The response may omit refresh_token; keep using the previous one then.
        return self._token_from_response(response.json(), fallback_refresh=expired.refresh_token)

    @staticmethod
    def _token_from_response(data: dict[str, Any], fallback_refresh: str | None = None) -> OAuth2Token:
        """Parse a token-endpoint JSON response into an OAuth2Token.

        `fallback_refresh` is used when the response carries no
        `refresh_token`. A missing or null `expires_in` falls back to 3600
        seconds.

        Raises:
            OAuth2FlowError: if the response has no `access_token`.
        """
        access_token = data.get("access_token")
        if not access_token:
            # Surface a flow error rather than a bare KeyError from the lookup.
            raise OAuth2FlowError("token endpoint response did not include an access_token")
        raw_expires_in = data.get("expires_in")
        # An explicit JSON null would make float() raise; treat it like a missing key.
        expires_in = 3600.0 if raw_expires_in is None else float(raw_expires_in)
        refresh = data.get("refresh_token") or fallback_refresh
        return OAuth2Token(
            access_token=str(access_token),
            refresh_token=str(refresh) if refresh else None,
            expires_at=time.time() + expires_in,
        )
Functions
__init__(base_url, client_id, client_secret, scope, redirect_uri, token_store, store_key=None, redirect_capturer=None, open_browser=True)

Construct the provider.

store_key distinguishes tokens across profiles. redirect_capturer is an optional callable (auth_url, expected_state) -> code that replaces the default asyncio.start_server loopback implementation — dhis2w-core injects a FastAPI-backed one here for a nicer UX.

open_browser=False skips the webbrowser.open() call in the default capturer and prints the authorization URL to stderr for copy-paste instead. Ignored when a custom redirect_capturer is supplied — in that case, the caller owns the "how to get the URL in front of the user" decision.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/auth/oauth2.py
def __init__(
    self,
    base_url: str,
    client_id: str,
    client_secret: str,
    scope: str,
    redirect_uri: str,
    token_store: TokenStore,
    store_key: str | None = None,
    redirect_capturer: RedirectCapturer | None = None,
    open_browser: bool = True,
) -> None:
    """Set up the provider; no network or store access happens here.

    `store_key` is the key tokens are saved under, so separate profiles
    keep separate tokens; when omitted it is derived from `base_url` and
    `client_id`. `redirect_capturer` is an optional callable
    `(auth_url, expected_state) -> code` used in place of the default
    `asyncio.start_server` loopback receiver — `dhis2w-core` injects a
    FastAPI-backed one here for a nicer UX.

    With `open_browser=False` the default capturer does not call
    `webbrowser.open()` and prints the authorization URL to stderr for
    copy-paste instead. The flag has no effect when a custom
    `redirect_capturer` is given — the caller then decides how the URL
    reaches the user.
    """
    # No token is held in memory until the first refresh_if_needed() call.
    self._token: OAuth2Token | None = None

    # Collaborators and interaction settings.
    self._token_store = token_store
    self._redirect_capturer = redirect_capturer
    self._open_browser = open_browser

    # OAuth2 client registration details.
    self._client_id = client_id
    self._client_secret = client_secret
    self._scope = scope
    self._redirect_uri = redirect_uri

    # Endpoint URLs are built by appending paths, so drop trailing slashes.
    self._base_url = base_url.rstrip("/")
    # The default key uses `base_url` exactly as passed, not the stripped form.
    self._store_key = store_key if store_key else f"{base_url}:{client_id}"
headers() async

Return Authorization: Bearer <access_token>, running interactive flow if needed.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/auth/oauth2.py
async def headers(self) -> dict[str, str]:
    """Build the `Authorization: Bearer <access_token>` header.

    `refresh_if_needed()` runs first, so this may load, refresh or
    interactively obtain the token before returning.
    """
    await self.refresh_if_needed()
    token = self._token
    if token is not None:
        return {"Authorization": f"Bearer {token.access_token}"}
    # Reaching this point means refresh_if_needed() left no token behind.
    raise OAuth2FlowError("token missing after refresh — refresh_if_needed should have set it")
refresh_if_needed() async

Load cached token, refresh if close to expiry, run interactive flow if missing.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/auth/oauth2.py
async def refresh_if_needed(self) -> None:
    """Load cached token, refresh if close to expiry, run interactive flow if missing.

    The token store is written only when the token actually changed (a
    fresh authorization or a refresh). `headers()` calls this before every
    request, so an unchanged token must not cost a store write.
    """
    if self._token is None:
        self._token = await self._token_store.get(self._store_key)
    if self._token is None:
        self._token = await self._run_authorization_flow()
    elif self._token.expires_at < time.time() + 60:
        # Refresh 60 seconds ahead of expiry so the token does not lapse
        # between this check and the request that uses it.
        self._token = await self._refresh(self._token)
    else:
        # Still valid and unchanged — nothing to persist.
        return
    await self._token_store.set(self._store_key, self._token)

Functions

capture_code(redirect_uri, expected_state, *, auth_url, open_browser=True, timeout=300.0) async

Listen on redirect_uri's host:port for the OAuth2 redirect; return code.

Bare asyncio.start_server — no FastAPI / uvicorn dependency. Validates state and surfaces error / error_description query params raised by the IdP. The browser sees a styled HTML confirmation page either way.

auth_url is opened with webbrowser.open() once the server is listening (skip with open_browser=False; URL is then printed to stderr so the user can paste it into any browser). timeout bounds the wait — raises OAuth2FlowError on timeout, state mismatch, IdP error, or missing code.

Source code in packages/dhis2w-client/src/dhis2w_client/v42/auth/oauth2.py
async def capture_code(
    redirect_uri: str,
    expected_state: str,
    *,
    auth_url: str,
    open_browser: bool = True,
    timeout: float = 300.0,
) -> str:
    """Listen on `redirect_uri`'s host:port for the OAuth2 redirect; return `code`.

    Bare `asyncio.start_server` — no FastAPI / uvicorn dependency. Validates
    `state` and surfaces `error` / `error_description` query params raised
    by the IdP. The browser sees a styled HTML confirmation page either
    way.

    Connections that do not carry a parsable HTTP request line (for example
    a socket opened and closed without sending anything) are answered with
    400 and otherwise ignored; they do not abort the flow.

    `auth_url` is opened with `webbrowser.open()` once the server is
    listening (skip with `open_browser=False`; URL is then printed to
    stderr so the user can paste it into any browser). `timeout` bounds
    the wait — raises `OAuth2FlowError` on timeout, state mismatch, IdP
    error, or missing code.
    """
    parsed = urllib.parse.urlparse(redirect_uri)
    host = parsed.hostname or "localhost"
    port = parsed.port or DEFAULT_REDIRECT_PORT

    loop = asyncio.get_running_loop()
    # Resolved (or failed) by the first request that decides the outcome.
    captured: asyncio.Future[str] = loop.create_future()

    def fail(message: str) -> None:
        """Fail the pending flow unless an earlier request already settled it."""
        if not captured.done():
            captured.set_exception(OAuth2FlowError(message))

    async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        """Serve one connection: inspect the redirect query and answer with HTML."""
        request_line = (await reader.readline()).decode("latin-1")
        # Drain the remaining request headers up to the blank line (or EOF).
        while (await reader.readline()).strip():
            pass

        bad_request = b"HTTP/1.1 400 Bad Request\r\n"
        status_line, body = b"HTTP/1.1 200 OK\r\n", b""
        try:
            try:
                path = request_line.split(" ", 2)[1]
                query = urllib.parse.urlparse(path).query
                params = {k: v[0] for k, v in urllib.parse.parse_qs(query).items() if v}
            except (IndexError, ValueError):
                # Not a usable HTTP request (empty or garbled request line).
                # Reject it without touching `captured`, so a stray
                # connection cannot masquerade as a state mismatch and
                # cancel the login that is still in progress.
                status_line = bad_request
                return

            error = params.get("error")
            if error:
                description = params.get("error_description") or error
                status_line = bad_request
                # NOTE(review): `description` comes straight from the query
                # string — assumes `_render_html` escapes `body`; confirm.
                body = _render_html(heading="Authentication failed", body=description, success=False)
                fail(f"authorization failed: {description}")
                return
            if params.get("state") != expected_state:
                status_line = bad_request
                body = _render_html(heading="Authentication failed", body="State mismatch.", success=False)
                fail("state mismatch — possible CSRF")
                return
            code = params.get("code")
            if not code:
                status_line = bad_request
                body = _render_html(
                    heading="Authentication failed", body="No authorization code in redirect.", success=False
                )
                fail("no authorization code returned in redirect")
                return
            body = _render_html(
                heading="Authentication successful",
                body="You can close this tab and return to the terminal.",
                success=True,
            )
            if not captured.done():
                captured.set_result(code)
        finally:
            # Always answer and close, whichever branch above was taken.
            writer.write(status_line)
            writer.write(b"Content-Type: text/html; charset=utf-8\r\n")
            writer.write(f"Content-Length: {len(body)}\r\n".encode("ascii"))
            writer.write(b"Connection: close\r\n\r\n")
            writer.write(body)
            with contextlib.suppress(Exception):  # best-effort teardown
                await writer.drain()
                writer.close()
                await writer.wait_closed()

    server = await asyncio.start_server(handle, host, port)
    try:
        # The listener is already up, so the redirect cannot arrive too early.
        if open_browser:
            webbrowser.open(auth_url)
        else:
            print(  # noqa: T201 — user-facing copy-paste prompt
                f"\nOpen this URL in a browser to authenticate:\n\n  {auth_url}\n\n"
                f"Waiting for redirect to {redirect_uri} ...",
                file=sys.stderr,
                flush=True,
            )
        try:
            return await asyncio.wait_for(captured, timeout=timeout)
        except TimeoutError as exc:
            raise OAuth2FlowError(f"no OAuth2 redirect received within {timeout}s") from exc
    finally:
        server.close()
        with contextlib.suppress(Exception):
            await server.wait_closed()