Query language (dhis2w_ql)

dhis2w_ql is the engine behind d2ql and d2path. d2ql is a pipeline query/transform language; d2path is its embedded expression language, which navigates and computes over data with collection semantics. The package is pure and dependency-light (pydantic only); the DHIS2 binding (data sources, pushdown compiler, CLI, MCP) lives in the query plugin in dhis2w-core.

When to reach for it

  • Parse a d2ql program or d2path expression to a typed AST (parse, parse_pipeline, parse_expression).
  • Evaluate d2path over your own data (Evaluator) — DHIS2 models, FHIR JSON, fixtures.
  • Run a parsed program against any source via the engine (QueryEngine + a ResourceBinder); split pushdown from local work with plan_pipeline.
  • Browse or generate examples (SAMPLES, generate).

Worked example — evaluate d2path over JSON

from dhis2w_ql import Evaluator, parse_expression

patient = {"name": [{"use": "official", "given": ["Ada"], "family": "King"}]}
result = Evaluator().evaluate(parse_expression('name.where(use = "official").family'), [patient])
assert result == ["King"]

Worked example — run a program over an in-memory source

import asyncio

from dhis2w_ql import InMemoryBinder, QueryEngine, parse

rows = [{"id": "a1", "name": "ANC", "domainType": "AGGREGATE"}]
engine = QueryEngine(parse('dataElements | where domainType = "AGGREGATE" | transform { code: id }'),
                     InMemoryBinder({"dataElements": rows}))
print(asyncio.run(engine.run_terminal()).rows)  # [{'code': 'a1'}]

Reference

dhis2w_ql

  • d2ql is the pipeline language (source | where | select | transform | order | limit >> sink), with define / define function library definitions.
  • d2path (in dhis2w_ql.d2path) is the embedded expression language: path navigation, operators, and functions with collection semantics.
  • dhis2w_ql.engine runs a parsed library against a ResourceBinder, pushing supported work down to the source and evaluating the rest locally.
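
The split between pushed-down and local work can be pictured with a small stand-alone sketch. It assumes a simple prefix rule (push the leading run of supported stages, keep everything after the first unsupported stage local); the real plan_pipeline may decide differently, and StageSketch and split_stages are hypothetical names, not part of dhis2w_ql.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StageSketch:
    """Hypothetical stand-in for a pipeline stage; not a dhis2w_ql class."""

    kind: str  # e.g. "where", "transform", "limit"
    text: str


def split_stages(
    stages: list[StageSketch], pushable: set[str]
) -> tuple[list[StageSketch], list[StageSketch]]:
    """Return (native, residual) under the assumed prefix rule."""
    native: list[StageSketch] = []
    for position, stage in enumerate(stages):
        if stage.kind not in pushable:
            # Everything from the first unsupported stage onward runs locally.
            return native, list(stages[position:])
        native.append(stage)
    return native, []


stages = [
    StageSketch("where", 'domainType = "AGGREGATE"'),
    StageSketch("transform", "{ code: id }"),
    StageSketch("limit", "10"),
]
native, residual = split_stages(stages, pushable={"where", "limit"})
print([s.kind for s in native], [s.kind for s in residual])
```

With an empty pushable set the whole pipeline ends up in the residual, which mirrors how an in-memory source that declares no capabilities leaves every stage to the engine.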

Classes

AggregateStage

Bases: _Node

Group rows by a key expression and reduce each group with aggregate expressions.

group by <group> { total: sum(value), n: count() } — each aggregation expression is evaluated against the group's rows, so value gathers that field across the group.
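
As a rough picture of what such a stage computes, the sketch below groups plain dicts and reduces each group in ordinary Python. It is not the engine's implementation: group_and_reduce is a hypothetical helper, and including the group key in each output row is an assumption of this sketch.

```python
from typing import Any, Callable

Row = dict[str, Any]


def group_and_reduce(
    rows: list[Row], key: str, aggregations: dict[str, Callable[[list[Row]], Any]]
) -> list[Row]:
    """Group rows by row[key], then evaluate every aggregation against each group's rows."""
    groups: dict[Any, list[Row]] = {}
    for row in rows:
        groups.setdefault(row[key], []).append(row)
    return [
        {key: group_key, **{name: reduce(members) for name, reduce in aggregations.items()}}
        for group_key, members in groups.items()
    ]


rows = [
    {"orgUnit": "A", "value": 3},
    {"orgUnit": "B", "value": 5},
    {"orgUnit": "A", "value": 4},
]
result = group_and_reduce(
    rows,
    "orgUnit",
    {
        "total": lambda members: sum(r["value"] for r in members),  # like sum(value)
        "n": len,  # like count()
    },
)
print(result)
# [{'orgUnit': 'A', 'total': 7, 'n': 2}, {'orgUnit': 'B', 'total': 5, 'n': 1}]
```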

Source code in packages/dhis2w-ql/src/dhis2w_ql/ast.py
class AggregateStage(_Node):
    """Group rows by a key expression and reduce each group with aggregate expressions.

    `group by <group> { total: sum(value), n: count() }` — each aggregation expression is
    evaluated against the group's rows, so `value` gathers that field across the group.
    """

    kind: Literal["aggregate"] = "aggregate"
    group: Expr
    aggregations: ObjectExpr

CallSource

Bases: _Node

A source written as a named call with keyword arguments (e.g. analytics(dx: "...", pe: "...")).

Source code in packages/dhis2w-ql/src/dhis2w_ql/ast.py
class CallSource(_Node):
    """A source written as a named call with keyword arguments (e.g. `analytics(dx: "...", pe: "...")`)."""

    kind: Literal["call"] = "call"
    name: str
    args: list[ObjectField] = Field(default_factory=list)

Define

Bases: _Node

A named query definition: define NAME: <pipeline>.

Source code in packages/dhis2w-ql/src/dhis2w_ql/ast.py
class Define(_Node):
    """A named query definition: `define NAME: <pipeline>`."""

    kind: Literal["define"] = "define"
    name: str
    body: Pipeline

DefineFunction

Bases: _Node

A named function definition: define function NAME(params): <expr>.

Source code in packages/dhis2w-ql/src/dhis2w_ql/ast.py
class DefineFunction(_Node):
    """A named function definition: `define function NAME(params): <expr>`."""

    kind: Literal["define_function"] = "define_function"
    name: str
    params: list[str] = Field(default_factory=list)
    body: Expr

FoldStage

Bases: _Node

Collapse the whole stream into a single object (e.g. a FHIR Bundle or GeoJSON FeatureCollection).

fold { resourceType: "Bundle", entry: $rows } — the template is built once with the entire stream in focus; $rows is the stream as a list, and aggregate/select functions see all rows.
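
The key difference from a per-row transform is that the template runs once over the whole stream. A minimal plain-Python sketch of that idea follows; fold here is a hypothetical helper, and the "total" field is added only to show an aggregate seeing all rows.

```python
from typing import Any, Callable

Row = dict[str, Any]


def fold(rows: list[Row], template: Callable[[list[Row]], dict[str, Any]]) -> dict[str, Any]:
    """Call the template exactly once, with the entire stream as its input."""
    return template(rows)


rows = [{"id": "a1"}, {"id": "b2"}]
bundle = fold(
    rows,
    lambda all_rows: {
        "resourceType": "Bundle",
        "entry": all_rows,  # plays the role of $rows
        "total": len(all_rows),  # an aggregate sees every row
    },
)
print(bundle)
# {'resourceType': 'Bundle', 'entry': [{'id': 'a1'}, {'id': 'b2'}], 'total': 2}
```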

Source code in packages/dhis2w-ql/src/dhis2w_ql/ast.py
class FoldStage(_Node):
    """Collapse the whole stream into a single object (e.g. a FHIR Bundle or GeoJSON FeatureCollection).

    `fold { resourceType: "Bundle", entry: $rows }` — the template is built once with the entire
    stream in focus; `$rows` is the stream as a list, and aggregate/`select` functions see all rows.
    """

    kind: Literal["fold"] = "fold"
    template: ObjectExpr

Library

Bases: _Node

A whole d2ql program: zero or more definitions plus an optional terminal pipeline.

Source code in packages/dhis2w-ql/src/dhis2w_ql/ast.py
class Library(_Node):
    """A whole d2ql program: zero or more definitions plus an optional terminal pipeline."""

    definitions: list[Definition] = Field(default_factory=list)
    terminal: Pipeline | None = None

Pipeline

Bases: _Node

A source feeding a chain of stages, optionally ending in a sink.

Source code in packages/dhis2w-ql/src/dhis2w_ql/ast.py
class Pipeline(_Node):
    """A source feeding a chain of stages, optionally ending in a sink."""

    source: Source
    stages: list[Stage] = Field(default_factory=list)
    sink: Sink | None = None

EvalContext

Bases: BaseModel

Lexical scope for one evaluation: $this, $index, and any bound function parameters.

Source code in packages/dhis2w-ql/src/dhis2w_ql/d2path/evaluator.py
class EvalContext(BaseModel):
    """Lexical scope for one evaluation: `$this`, `$index`, and any bound function parameters."""

    model_config = ConfigDict(frozen=True, arbitrary_types_allowed=True)

    variables: dict[str, list[Any]] = {}

    def child(self, **updates: list[Any]) -> EvalContext:
        """Return a new context with the given variables overlaid on this one."""
        return EvalContext(variables={**self.variables, **updates})

Functions

child(**updates)

Return a new context with the given variables overlaid on this one.
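
The overlay behaviour can be reproduced with a frozen dataclass standing in for the pydantic model. ScopeSketch is a hypothetical name used only for this illustration; the point is that child never mutates the parent and that later bindings win on a name clash.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class ScopeSketch:
    """Stand-in for EvalContext: an immutable mapping of variable name to collection."""

    variables: dict[str, list[Any]] = field(default_factory=dict)

    def child(self, **updates: list[Any]) -> "ScopeSketch":
        # Build a fresh dict so the parent scope is left untouched.
        return ScopeSketch(variables={**self.variables, **updates})


outer = ScopeSketch().child(this=[{"id": "a1"}], index=[0])
inner = outer.child(index=[1], threshold=[10])

print(outer.variables["index"])  # [0]
print(inner.variables)
# {'this': [{'id': 'a1'}], 'index': [1], 'threshold': [10]}
```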

Source code in packages/dhis2w-ql/src/dhis2w_ql/d2path/evaluator.py
def child(self, **updates: list[Any]) -> EvalContext:
    """Return a new context with the given variables overlaid on this one."""
    return EvalContext(variables={**self.variables, **updates})

Evaluator

Evaluates d2ql expressions against a focus collection.
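
The source comments in the listing below describe comparison operators as existential over collections: a repeated field matches if any of its values satisfies the operator, and an empty side yields an empty result. The following sketch illustrates that rule in isolation. compare_collections is a hypothetical helper that uses strict Python comparisons, whereas the library routes equality through its own loose_equal.

```python
import operator
from typing import Any, Callable

# Only a few operators are modelled here; this is an illustration, not the full table.
COMPARATORS: dict[str, Callable[[Any, Any], bool]] = {
    "=": operator.eq,
    "<": operator.lt,
    ">": operator.gt,
}


def compare_collections(op: str, left: list[Any], right: list[Any]) -> list[bool]:
    """Empty operand gives an empty result; otherwise true if any pair satisfies op."""
    if not left or not right:
        return []
    check = COMPARATORS[op]
    return [any(check(a, b) for a in left for b in right)]


print(compare_collections("=", ["Ada", "Augusta"], ["Augusta"]))  # [True]
print(compare_collections(">", [1, 2], [5]))  # [False]
print(compare_collections("=", [], ["x"]))  # []
```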

Source code in packages/dhis2w-ql/src/dhis2w_ql/d2path/evaluator.py
class Evaluator:
    """Evaluates d2ql expressions against a focus collection."""

    def __init__(self, resolver: Resolver | None = None) -> None:
        """Bind an optional resolver for library-level variable/function references."""
        self._resolver = resolver
        self._active_functions: set[str] = set()

    def evaluate(self, expr: Expr, focus: list[Any], context: EvalContext | None = None) -> list[Any]:
        """Evaluate `expr` against `focus`, returning a collection (list of values)."""
        return self._eval(expr, focus, context or EvalContext())

    def call_function(
        self, definition: DefineFunction, args: list[list[Any]], focus: list[Any], context: EvalContext
    ) -> list[Any]:
        """Invoke a user-defined function, overlaying its params on the caller's scope.

        The caller context carries through so `$this`/`$index` stay visible inside an extracted
        helper; the function name is tracked to reject recursive calls instead of crashing.
        """
        if len(args) != len(definition.params):
            raise EvaluationError(
                f"function {definition.name!r} expects {len(definition.params)} argument(s), got {len(args)}"
            )
        if definition.name in self._active_functions:
            raise SemanticError(f"recursive function {definition.name!r}")
        bound = dict(zip(definition.params, args, strict=True))
        self._active_functions.add(definition.name)
        try:
            return self._eval(definition.body, focus, context.child(**bound))
        finally:
            self._active_functions.discard(definition.name)

    def per_item(self, expr: Expr, item: Any, index: int, context: EvalContext) -> list[Any]:
        """Evaluate `expr` against a single item, binding `$this`/`$index` for that item."""
        scope = context.child(this=[item], index=[index])
        return self._eval(expr, [item], scope)

    # ------------------------------------------------------------------ dispatch

    def _eval(self, expr: Expr, focus: list[Any], context: EvalContext) -> list[Any]:
        match expr:
            case LiteralExpr():
                return [] if expr.value is None and expr.literal_type == "null" else [expr.value]
            case NameExpr():
                return flatten_member(focus, expr.name)
            case VariableExpr():
                return self._eval_variable(expr, context)
            case MemberExpr():
                return flatten_member(self._eval(expr.target, focus, context), expr.name)
            case IndexExpr():
                return self._eval_index(expr, focus, context)
            case CallExpr():
                return self._eval_call(expr, focus, context)
            case UnaryExpr():
                return self._eval_unary(expr, focus, context)
            case BinaryExpr():
                return self._eval_binary(expr, focus, context)
            case ObjectExpr():
                return [self.build_object(expr, focus, context)]
            case ArrayExpr():
                return [[collapse(self._eval(item, focus, context)) for item in expr.items]]

    def build_object(self, expr: ObjectExpr, focus: list[Any], context: EvalContext) -> dict[str, Any]:
        """Build a single dict from an object constructor, collapsing each field value."""
        return {field.name: collapse(self._eval(field.value, focus, context)) for field in expr.fields}

    def _eval_variable(self, expr: VariableExpr, context: EvalContext) -> list[Any]:
        if expr.name in context.variables:
            return context.variables[expr.name]
        if self._resolver is not None:
            resolved = self._resolver.resolve_variable(expr.name)
            if resolved is not None:
                return resolved
        raise EvaluationError(f"unknown variable ${expr.name}")

    def _eval_index(self, expr: IndexExpr, focus: list[Any], context: EvalContext) -> list[Any]:
        target = self._eval(expr.target, focus, context)
        index_values = self._eval(expr.index, focus, context)
        if not index_values:
            return []
        index = index_values[0]
        if isinstance(index, bool):
            raise EvaluationError("index must be an integer or a string key")
        if isinstance(index, int):
            return [target[index]] if -len(target) <= index < len(target) else []
        if isinstance(index, str):
            # A string subscript is member access by key, mirroring `.name` — this reaches fields
            # whose names aren't identifiers (hyphens, spaces, leading digits).
            return flatten_member(target, index)
        raise EvaluationError("index must be an integer or a string key")

    def _eval_call(self, expr: CallExpr, focus: list[Any], context: EvalContext) -> list[Any]:
        target_focus = self._eval(expr.target, focus, context) if expr.target is not None else focus
        builtin = fn.lookup(expr.name)
        if builtin is not None:
            return builtin(self, target_focus, expr.args, context)
        if self._resolver is not None:
            definition = self._resolver.resolve_function(expr.name)
            if definition is not None:
                args = [self._eval(arg, focus, context) for arg in expr.args]
                return self.call_function(definition, args, target_focus, context)
        raise EvaluationError(f"unknown function {expr.name!r}")

    def _eval_unary(self, expr: UnaryExpr, focus: list[Any], context: EvalContext) -> list[Any]:
        operand = self._eval(expr.operand, focus, context)
        if not operand:
            return []
        number = to_number(operand[0])
        if number is None:
            raise EvaluationError(f"unary {expr.op!r} requires a number")
        return [-number if expr.op == "-" else number]

    def _eval_binary(self, expr: BinaryExpr, focus: list[Any], context: EvalContext) -> list[Any]:
        if expr.op in ("and", "or", "xor", "implies"):
            return self._eval_logical(expr, focus, context)
        if expr.op == "is":
            return self._eval_is(expr, focus, context)
        left = self._eval(expr.left, focus, context)
        right = self._eval(expr.right, focus, context)
        if expr.op == "in":
            return [] if not left else [left[0] in _as_membership(right)]
        if expr.op == "contains":
            return self._eval_contains(left, right)
        if not left or not right:
            return []
        # Comparison/equality/match operators are existential over collections: a repeated field
        # like `name.given` matches if ANY of its values satisfies the operator (use `contains`/`in`
        # for explicit membership). Arithmetic stays scalar (operates on the first value).
        if expr.op in _EXISTENTIAL and (len(left) > 1 or len(right) > 1):
            return [_existential(expr.op, left, right)]
        return self._eval_scalar_binary(expr.op, left[0], right[0])

    def _eval_is(self, expr: BinaryExpr, focus: list[Any], context: EvalContext) -> list[Any]:
        # `is` takes a type name on the right — a bare identifier (`x is Integer`) or a string.
        left = self._eval(expr.left, focus, context)
        if not left:
            return []
        if isinstance(expr.right, NameExpr):
            type_name: Any = expr.right.name
        else:
            resolved = self._eval(expr.right, focus, context)
            type_name = resolved[0] if resolved else None
        return [_type_name(left[0]) == type_name]

    def _eval_logical(self, expr: BinaryExpr, focus: list[Any], context: EvalContext) -> list[Any]:
        # `and`/`or`/`implies` short-circuit so guarded predicates (`value != 0 and 1 / value > 0`)
        # exclude rows rather than raising; `xor` needs both sides.
        left = truthy(self._eval(expr.left, focus, context))
        match expr.op:
            case "and":
                return [False] if not left else [truthy(self._eval(expr.right, focus, context))]
            case "or":
                return [True] if left else [truthy(self._eval(expr.right, focus, context))]
            case "implies":
                return [True] if not left else [truthy(self._eval(expr.right, focus, context))]
            case _:  # xor
                return [left != truthy(self._eval(expr.right, focus, context))]

    def _eval_contains(self, left: list[Any], right: list[Any]) -> list[Any]:
        if len(left) == 1 and isinstance(left[0], str) and right and isinstance(right[0], str):
            return [right[0] in left[0]]
        return [] if not right else [right[0] in _as_membership(left)]

    def _eval_scalar_binary(self, op: str, left: Any, right: Any) -> list[Any]:
        match op:
            case "=":
                return [loose_equal(left, right)]
            case "!=":
                return [not loose_equal(left, right)]
            case "~":
                return [like(left, right)]
            case "!~":
                return [not like(left, right)]
            case "is":
                return [_type_name(left) == right]
            case "+":
                return [self._arithmetic_or_concat(left, right)]
            case "-" | "*" | "/" | "div" | "mod":
                return [self._arithmetic(op, left, right)]
            case "<" | "<=" | ">" | ">=":
                return self._relational(op, left, right)
            case _:
                raise EvaluationError(f"unsupported operator {op!r}")

    def _arithmetic_or_concat(self, left: Any, right: Any) -> Any:
        if isinstance(left, str) and isinstance(right, str):
            return left + right
        return self._arithmetic("+", left, right)

    def _arithmetic(self, op: str, left: Any, right: Any) -> float:
        left_number, right_number = to_number(left), to_number(right)
        if left_number is None or right_number is None:
            raise EvaluationError(f"operator {op!r} requires numbers")
        match op:
            case "+":
                return left_number + right_number
            case "-":
                return left_number - right_number
            case "*":
                return left_number * right_number
            case "/" | "div":
                if right_number == 0:
                    raise EvaluationError("division by zero")
                result = left_number / right_number
                return float(int(result)) if op == "div" else result
            case _:  # mod
                if right_number == 0:
                    raise EvaluationError("division by zero")
                return left_number % right_number

    def _relational(self, op: str, left: Any, right: Any) -> list[Any]:
        order = compare(left, right)
        if order is None:
            return []
        match op:
            case "<":
                return [order < 0]
            case "<=":
                return [order <= 0]
            case ">":
                return [order > 0]
            case _:  # >=
                return [order >= 0]

Functions

__init__(resolver=None)

Bind an optional resolver for library-level variable/function references.

Source code in packages/dhis2w-ql/src/dhis2w_ql/d2path/evaluator.py
def __init__(self, resolver: Resolver | None = None) -> None:
    """Bind an optional resolver for library-level variable/function references."""
    self._resolver = resolver
    self._active_functions: set[str] = set()

evaluate(expr, focus, context=None)

Evaluate expr against focus, returning a collection (list of values).
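
Because every step takes a collection and returns a collection, path navigation amounts to repeated flattening. The sketch below mimics that for plain dicts only. member is a hypothetical stand-in for the library's flatten_member helper, whose actual behaviour (for example on model objects) is not shown on this page.

```python
from typing import Any


def member(focus: list[Any], name: str) -> list[Any]:
    """Collect `name` from every dict in focus, flattening list values one level."""
    found: list[Any] = []
    for item in focus:
        if isinstance(item, dict) and name in item:
            value = item[name]
            if isinstance(value, list):
                found.extend(value)
            elif value is not None:
                found.append(value)
    return found


patient = {
    "name": [
        {"use": "official", "given": ["Ada"], "family": "King"},
        {"use": "nickname", "given": ["Addie"]},
    ]
}
names = member([patient], "name")
print(member(names, "family"))  # ['King']
print(member(names, "given"))  # ['Ada', 'Addie']
print(member([patient], "birthDate"))  # []
```

A missing field produces an empty collection instead of an error, which is why downstream operators need a rule for empty operands.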

Source code in packages/dhis2w-ql/src/dhis2w_ql/d2path/evaluator.py
def evaluate(self, expr: Expr, focus: list[Any], context: EvalContext | None = None) -> list[Any]:
    """Evaluate `expr` against `focus`, returning a collection (list of values)."""
    return self._eval(expr, focus, context or EvalContext())

call_function(definition, args, focus, context)

Invoke a user-defined function, overlaying its params on the caller's scope.

The caller context carries through so $this/$index stay visible inside an extracted helper; the function name is tracked to reject recursive calls instead of crashing.
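
The recursion guard is a general pattern: record the name while its body runs, refuse re-entry, and always clear the record in a finally block. A self-contained sketch under those assumptions, where CallGuard and the local SemanticError class are illustrative stand-ins and not the library's types:

```python
from typing import Any, Callable


class SemanticError(Exception):
    """Local stand-in for the library's error type."""


class CallGuard:
    """Tracks names currently being evaluated and rejects re-entrant calls."""

    def __init__(self) -> None:
        self._active: set[str] = set()

    def call(self, name: str, body: Callable[[], Any]) -> Any:
        if name in self._active:
            raise SemanticError(f"recursive function {name!r}")
        self._active.add(name)
        try:
            return body()
        finally:
            # Runs on success and on error, so the guard never stays stuck.
            self._active.discard(name)


guard = CallGuard()


def double() -> int:
    return 2 * 21


def loop() -> Any:
    return guard.call("loop", loop)  # calls itself through the guard


answer = guard.call("double", double)
try:
    guard.call("loop", loop)
    message = ""
except SemanticError as error:
    message = str(error)

print(answer, message)  # 42 recursive function 'loop'
```

Raising a clear error on the second entry avoids exhausting the interpreter stack, and the finally block means the same name can be called again afterwards.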

Source code in packages/dhis2w-ql/src/dhis2w_ql/d2path/evaluator.py
def call_function(
    self, definition: DefineFunction, args: list[list[Any]], focus: list[Any], context: EvalContext
) -> list[Any]:
    """Invoke a user-defined function, overlaying its params on the caller's scope.

    The caller context carries through so `$this`/`$index` stay visible inside an extracted
    helper; the function name is tracked to reject recursive calls instead of crashing.
    """
    if len(args) != len(definition.params):
        raise EvaluationError(
            f"function {definition.name!r} expects {len(definition.params)} argument(s), got {len(args)}"
        )
    if definition.name in self._active_functions:
        raise SemanticError(f"recursive function {definition.name!r}")
    bound = dict(zip(definition.params, args, strict=True))
    self._active_functions.add(definition.name)
    try:
        return self._eval(definition.body, focus, context.child(**bound))
    finally:
        self._active_functions.discard(definition.name)

per_item(expr, item, index, context)

Evaluate expr against a single item, binding $this/$index for that item.
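
A filtering function is a typical caller of this kind of per-item evaluation: each element becomes a one-item focus with its own $this and $index. The sketch below shows the shape of that loop with plain callables. where_sketch is hypothetical and the predicate signature is an assumption of this example.

```python
from typing import Any, Callable

Scope = dict[str, list[Any]]


def where_sketch(items: list[Any], predicate: Callable[[list[Any], Scope], bool]) -> list[Any]:
    """Keep the items whose predicate holds, binding this/index per item."""
    kept: list[Any] = []
    for index, item in enumerate(items):
        scope: Scope = {"this": [item], "index": [index]}
        if predicate([item], scope):
            kept.append(item)
    return kept


rows = [{"id": "a1", "value": 0}, {"id": "b2", "value": 7}, {"id": "c3", "value": 9}]
# Roughly "value > 0 and $index < 2" written as a Python callable.
selected = where_sketch(
    rows, lambda focus, scope: focus[0]["value"] > 0 and scope["index"][0] < 2
)
print(selected)  # [{'id': 'b2', 'value': 7}]
```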

Source code in packages/dhis2w-ql/src/dhis2w_ql/d2path/evaluator.py
def per_item(self, expr: Expr, item: Any, index: int, context: EvalContext) -> list[Any]:
    """Evaluate `expr` against a single item, binding `$this`/`$index` for that item."""
    scope = context.child(this=[item], index=[index])
    return self._eval(expr, [item], scope)

build_object(expr, focus, context)

Build a single dict from an object constructor, collapsing each field value.

Source code in packages/dhis2w-ql/src/dhis2w_ql/d2path/evaluator.py
def build_object(self, expr: ObjectExpr, focus: list[Any], context: EvalContext) -> dict[str, Any]:
    """Build a single dict from an object constructor, collapsing each field value."""
    return {field.name: collapse(self._eval(field.value, focus, context)) for field in expr.fields}

Resolver

Bases: Protocol

Resolves library-level names referenced from inside expressions.
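
Since the resolver is a runtime-checkable protocol, any object with the two methods qualifies and no inheritance is needed. The sketch below shows that with a local copy of the protocol shape. ResolverLike and DictResolver are hypothetical names, and the function return type is loosened to Any to keep the example self-contained.

```python
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class ResolverLike(Protocol):
    """Local mirror of the two-method resolver shape."""

    def resolve_variable(self, name: str) -> Optional[list[Any]]: ...

    def resolve_function(self, name: str) -> Optional[Any]: ...


class DictResolver:
    """Resolves variables from a fixed mapping and knows no functions."""

    def __init__(self, variables: dict[str, list[Any]]) -> None:
        self._variables = variables

    def resolve_variable(self, name: str) -> Optional[list[Any]]:
        return self._variables.get(name)  # None signals "unknown"

    def resolve_function(self, name: str) -> Optional[Any]:
        return None


resolver = DictResolver({"threshold": [10]})
print(isinstance(resolver, ResolverLike))  # True
print(resolver.resolve_variable("threshold"))  # [10]
print(resolver.resolve_variable("missing"))  # None
```

Note that a runtime isinstance check against a protocol only verifies that the methods exist, not that their signatures match.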

Source code in packages/dhis2w-ql/src/dhis2w_ql/d2path/evaluator.py
@runtime_checkable
class Resolver(Protocol):
    """Resolves library-level names referenced from inside expressions."""

    def resolve_variable(self, name: str) -> list[Any] | None:
        """Return a named scalar `define`'s value as a collection, or None when unknown."""
        ...

    def resolve_function(self, name: str) -> DefineFunction | None:
        """Return a user `define function` by name, or None when unknown."""
        ...

Functions

resolve_variable(name)

Return a named scalar define's value as a collection, or None when unknown.

Source code in packages/dhis2w-ql/src/dhis2w_ql/d2path/evaluator.py
def resolve_variable(self, name: str) -> list[Any] | None:
    """Return a named scalar `define`'s value as a collection, or None when unknown."""
    ...

resolve_function(name)

Return a user define function by name, or None when unknown.

Source code in packages/dhis2w-ql/src/dhis2w_ql/d2path/evaluator.py
def resolve_function(self, name: str) -> DefineFunction | None:
    """Return a user `define function` by name, or None when unknown."""
    ...

DataSource

Bases: Protocol

A fetchable collection of rows that may satisfy part of a query natively.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/datasource.py
@runtime_checkable
class DataSource(Protocol):
    """A fetchable collection of rows that may satisfy part of a query natively."""

    def capabilities(self) -> SourceCapabilities:
        """Report which pushdown operations this source can satisfy."""
        ...

    async def fetch(self, native: NativeQuery) -> list[Any]:
        """Fetch rows, honouring whatever portion of `native` the capabilities allow."""
        ...

Functions

capabilities()

Report which pushdown operations this source can satisfy.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/datasource.py
def capabilities(self) -> SourceCapabilities:
    """Report which pushdown operations this source can satisfy."""
    ...

fetch(native) async

Fetch rows, honouring whatever portion of native the capabilities allow.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/datasource.py
async def fetch(self, native: NativeQuery) -> list[Any]:
    """Fetch rows, honouring whatever portion of `native` the capabilities allow."""
    ...

InMemoryBinder

Binds resource names to in-memory row lists (test/fixture binding).

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/datasource.py
class InMemoryBinder:
    """Binds resource names to in-memory row lists (test/fixture binding)."""

    def __init__(self, resources: dict[str, list[Any]]) -> None:
        """Hold the name-to-rows mapping this binder resolves against."""
        self._resources = resources

    def bind(self, name: str) -> DataSource | None:
        """Return an in-memory source for `name`, or None when the name is unknown."""
        if name not in self._resources:
            return None
        return InMemoryDataSource(self._resources[name])

    def bind_call(self, name: str, args: dict[str, Any]) -> DataSource | None:
        """In-memory binding has no call sources; bind a name with no args, else None."""
        return self.bind(name) if not args else None

Functions

__init__(resources)

Hold the name-to-rows mapping this binder resolves against.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/datasource.py
def __init__(self, resources: dict[str, list[Any]]) -> None:
    """Hold the name-to-rows mapping this binder resolves against."""
    self._resources = resources

bind(name)

Return an in-memory source for name, or None when the name is unknown.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/datasource.py
def bind(self, name: str) -> DataSource | None:
    """Return an in-memory source for `name`, or None when the name is unknown."""
    if name not in self._resources:
        return None
    return InMemoryDataSource(self._resources[name])

bind_call(name, args)

In-memory binding has no call sources; bind a name with no args, else None.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/datasource.py
def bind_call(self, name: str, args: dict[str, Any]) -> DataSource | None:
    """In-memory binding has no call sources; bind a name with no args, else None."""
    return self.bind(name) if not args else None

InMemoryDataSource

A data source backed by an in-memory list; declares no native capabilities (all-local).

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/datasource.py
class InMemoryDataSource:
    """A data source backed by an in-memory list; declares no native capabilities (all-local)."""

    def __init__(self, rows: list[Any]) -> None:
        """Hold the rows this source will return verbatim."""
        self._rows = rows

    def capabilities(self) -> SourceCapabilities:
        """In-memory sources push nothing down; the engine applies every stage locally."""
        return SourceCapabilities()

    async def fetch(self, native: NativeQuery) -> list[Any]:
        """Return all rows; pushdown is a no-op for an in-memory source."""
        return list(self._rows)

Functions

__init__(rows)

Hold the rows this source will return verbatim.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/datasource.py
def __init__(self, rows: list[Any]) -> None:
    """Hold the rows this source will return verbatim."""
    self._rows = rows

capabilities()

In-memory sources push nothing down; the engine applies every stage locally.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/datasource.py
def capabilities(self) -> SourceCapabilities:
    """In-memory sources push nothing down; the engine applies every stage locally."""
    return SourceCapabilities()

fetch(native) async

Return all rows; pushdown is a no-op for an in-memory source.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/datasource.py
async def fetch(self, native: NativeQuery) -> list[Any]:
    """Return all rows; pushdown is a no-op for an in-memory source."""
    return list(self._rows)

NativeFilter

Bases: BaseModel

A single field filter pushed to the source (property OPERATOR value).

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/plan.py
class NativeFilter(BaseModel):
    """A single field filter pushed to the source (`property OPERATOR value`)."""

    model_config = ConfigDict(frozen=True)

    property: str
    operator: Literal["eq", "ne", "ilike", "like", "gt", "ge", "lt", "le", "in"]
    value: Any

NativeOrder

Bases: BaseModel

A single sort key pushed to the source.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/plan.py
class NativeOrder(BaseModel):
    """A single sort key pushed to the source."""

    model_config = ConfigDict(frozen=True)

    property: str
    descending: bool = False

NativeQuery

Bases: BaseModel

The pushed-down portion of a pipeline against a named resource.
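
To make the fields concrete, the sketch below applies a query of the same shape to in-memory rows, roughly what a source that honours every part of it would have to do. FilterSketch, QuerySketch and apply_query are hypothetical. The like/ilike operators are left out, and applying skip before limit is an assumption of this sketch.

```python
import operator as op
from dataclasses import dataclass, field
from typing import Any, Callable, Optional

Row = dict[str, Any]

OPERATORS: dict[str, Callable[[Any, Any], bool]] = {
    "eq": op.eq, "ne": op.ne, "gt": op.gt, "ge": op.ge, "lt": op.lt, "le": op.le,
    "in": lambda value, options: value in options,
}


@dataclass(frozen=True)
class FilterSketch:
    property: str
    operator: str
    value: Any


@dataclass(frozen=True)
class QuerySketch:
    filters: list[FilterSketch] = field(default_factory=list)
    root_junction: str = "AND"
    order: list[tuple[str, bool]] = field(default_factory=list)  # (property, descending)
    limit: Optional[int] = None
    skip: Optional[int] = None


def apply_query(rows: list[Row], query: QuerySketch) -> list[Row]:
    combine = all if query.root_junction == "AND" else any
    kept = [
        row for row in rows
        if not query.filters
        or combine(OPERATORS[f.operator](row[f.property], f.value) for f in query.filters)
    ]
    # Sort by the last key first; Python's stable sort then yields multi-key order.
    for prop, descending in reversed(query.order):
        kept.sort(key=lambda row: row[prop], reverse=descending)
    start = query.skip or 0
    end = None if query.limit is None else start + query.limit
    return kept[start:end]


rows = [
    {"name": "ANC", "value": 5}, {"name": "BCG", "value": 9},
    {"name": "OPV", "value": 7}, {"name": "PENTA", "value": 2},
]
page = apply_query(rows, QuerySketch(
    filters=[FilterSketch("value", "gt", 4)], order=[("value", True)], skip=1, limit=1))
either = apply_query(rows, QuerySketch(
    filters=[FilterSketch("name", "eq", "ANC"), FilterSketch("value", "lt", 3)],
    root_junction="OR"))
print(page)  # [{'name': 'OPV', 'value': 7}]
print([row["name"] for row in either])  # ['ANC', 'PENTA']
```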

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/plan.py
class NativeQuery(BaseModel):
    """The pushed-down portion of a pipeline against a named resource."""

    model_config = ConfigDict(frozen=True)

    resource: str
    filters: list[NativeFilter] = Field(default_factory=list)
    root_junction: Literal["AND", "OR"] = "AND"
    order: list[NativeOrder] = Field(default_factory=list)
    limit: int | None = None
    skip: int | None = None

QueryEngine

Executes d2ql pipelines and definitions against a resource binder.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/executor.py
class QueryEngine:
    """Executes d2ql pipelines and definitions against a resource binder."""

    def __init__(self, library: Library, binder: ResourceBinder) -> None:
        """Index the library's definitions and bind the source resolver."""
        self._library = library
        self._binder = binder
        self._evaluator = Evaluator(resolver=self)
        self._defines: dict[str, Define] = {}
        self._functions: dict[str, DefineFunction] = {}
        self._scalar_cache: dict[str, list[Any]] = {}
        self._resolving: set[str] = set()
        seen: set[str] = set()
        for definition in library.definitions:
            if definition.name in seen:
                raise SemanticError(f"duplicate definition {definition.name!r}")
            seen.add(definition.name)
            if isinstance(definition, Define):
                self._defines[definition.name] = definition
            else:
                if builtin_function(definition.name) is not None:
                    raise SemanticError(f"cannot define function {definition.name!r}: it shadows a built-in function")
                self._functions[definition.name] = definition

    # ------------------------------------------------------------------ Resolver protocol

    def resolve_variable(self, name: str) -> list[Any] | None:
        """Resolve a `$name` reference to a scalar `define`'s value, or None when not scalar."""
        if name in self._scalar_cache:
            return self._scalar_cache[name]
        define = self._defines.get(name)
        if define is None or not _is_scalar_define(define):
            return None
        if name in self._resolving:
            raise SemanticError(f"recursive definition {name!r}")
        source = define.body.source
        assert isinstance(source, ExprSource)  # guaranteed by _is_scalar_define
        self._resolving.add(name)
        try:
            value = self._evaluator.evaluate(source.expr, [])
        finally:
            self._resolving.discard(name)
        self._scalar_cache[name] = value
        return value

    def resolve_function(self, name: str) -> DefineFunction | None:
        """Resolve a `define function` by name, or None when unknown."""
        return self._functions.get(name)

    # ------------------------------------------------------------------ public run surface

    async def run_terminal(self) -> QueryResult:
        """Run the library's terminal pipeline, raising when there is none."""
        if self._library.terminal is None:
            raise SemanticError("this program has no terminal pipeline to run")
        return await self.run(self._library.terminal)

    async def run_define(self, name: str) -> QueryResult:
        """Run a named `define`'s pipeline."""
        define = self._defines.get(name)
        if define is None:
            raise SemanticError(f"unknown definition {name!r}")
        return await self.run(define.body)

    async def run(self, pipeline: Pipeline) -> QueryResult:
        """Execute a pipeline and apply its sink, returning the rows and sink metadata."""
        outcome = await self._execute(pipeline)
        if isinstance(pipeline.sink, FileSink):
            written = _write_file(pipeline.sink, outcome.rows, scalar=outcome.scalar)
            return QueryResult(
                rows=outcome.rows,
                count=len(outcome.rows),
                scalar=outcome.scalar,
                written_to=written.path,
                format=written.format,
            )
        return QueryResult(rows=outcome.rows, count=len(outcome.rows), scalar=outcome.scalar)

    # ------------------------------------------------------------------ execution

    async def _execute(self, pipeline: Pipeline) -> _StageOutcome:
        stages = self._effective_stages(pipeline)
        resolved = await self._resolve_source(pipeline, stages)
        return self._run_local(resolved.residual, resolved.rows)

    def _effective_stages(self, pipeline: Pipeline) -> list[Stage]:
        source = pipeline.source
        if isinstance(source, NameSource) and source.inline_filter is not None:
            return [WhereStage(predicate=source.inline_filter), *pipeline.stages]
        return list(pipeline.stages)

    async def _resolve_source(self, pipeline: Pipeline, stages: list[Stage]) -> _SourceResolution:
        source = pipeline.source
        if isinstance(source, NameSource):
            data_source = self._binder.bind(source.name)
            if data_source is not None:
                plan = plan_pipeline(source.name, data_source.capabilities(), stages)
                rows = await data_source.fetch(plan.native)
                return _SourceResolution(rows=rows, residual=list(plan.residual))
            define = self._defines.get(source.name)
            if define is None:
                raise SemanticError(f"unknown source {source.name!r}: not a bound resource or definition")
            if source.name in self._resolving:
                raise SemanticError(f"recursive definition {source.name!r}")
            self._resolving.add(source.name)
            try:
                inner = await self._execute(define.body)
            finally:
                self._resolving.discard(source.name)
            return _SourceResolution(rows=inner.rows, residual=list(stages))
        if isinstance(source, CallSource):
            args = {field.name: collapse(self._evaluator.evaluate(field.value, [])) for field in source.args}
            data_source = self._binder.bind_call(source.name, args)
            if data_source is None:
                raise SemanticError(f"unknown source {source.name!r}(...): no call binding for it")
            plan = plan_pipeline(source.name, data_source.capabilities(), stages)
            rows = await data_source.fetch(plan.native)
            return _SourceResolution(rows=rows, residual=list(plan.residual))
        if isinstance(source, ReadSource):
            return _SourceResolution(rows=_read_rows(source.path), residual=list(stages))
        return _SourceResolution(rows=self._evaluator.evaluate(source.expr, []), residual=list(stages))

    def _run_local(self, stages: list[Any], rows: list[Any]) -> _StageOutcome:
        outcome = _StageOutcome(rows=rows, scalar=False)
        for stage in stages:
            outcome = self._apply_stage(stage, outcome.rows)
        return outcome

    def _apply_stage(self, stage: Stage, rows: list[Any]) -> _StageOutcome:
        match stage:
            case WhereStage():
                kept = [row for index, row in enumerate(rows) if truthy(self._per_item(stage.predicate, row, index))]
                return _StageOutcome(rows=kept, scalar=False)
            case SelectStage():
                _ensure_unique([_select_key(item, i) for i, item in enumerate(stage.items)], "select")
                projected = [self._project(stage, row, index) for index, row in enumerate(rows)]
                return _StageOutcome(rows=projected, scalar=False)
            case TransformStage():
                built = [collapse(self._per_item(stage.template, row, index)) for index, row in enumerate(rows)]
                return _StageOutcome(rows=built, scalar=False)
            case OrderStage():
                return _StageOutcome(rows=self._order(stage, rows), scalar=False)
            case LimitStage():
                return _StageOutcome(rows=rows[: stage.count], scalar=False)
            case SkipStage():
                return _StageOutcome(rows=rows[stage.count :], scalar=False)
            case CountStage():
                return _StageOutcome(rows=[len(rows)], scalar=True)
            case AggregateStage():
                return _StageOutcome(rows=self._aggregate(stage, rows), scalar=False)
            case FoldStage():
                scope = EvalContext().child(rows=rows)
                return _StageOutcome(rows=[self._evaluator.build_object(stage.template, rows, scope)], scalar=True)
            # No catch-all: `Stage` is a closed union and every variant is handled above, so a
            # type checker can prove exhaustiveness. Adding a variant without a case here is
            # reported statically (missing return), which is stronger than a runtime guard.

    def _per_item(self, expr: Expr, item: Any, index: int) -> list[Any]:
        return self._evaluator.per_item(expr, item, index, EvalContext())

    def _project(self, stage: SelectStage, item: Any, index: int) -> dict[str, Any]:
        return {
            _select_key(select_item, position): collapse(self._per_item(select_item.expr, item, index))
            for position, select_item in enumerate(stage.items)
        }

    def _aggregate(self, stage: AggregateStage, rows: list[Any]) -> list[Any]:
        key_name = _derive_name(stage.group, 0)
        _ensure_unique([key_name, *(field.name for field in stage.aggregations.fields)], "group by")
        buckets: list[tuple[Any, list[Any]]] = []
        for index, row in enumerate(rows):
            key = collapse(self._per_item(stage.group, row, index))
            for existing_key, bucket in buckets:
                if existing_key == key:
                    bucket.append(row)
                    break
            else:
                buckets.append((key, [row]))
        result: list[dict[str, Any]] = []
        for key, bucket in buckets:
            aggregated = self._evaluator.build_object(stage.aggregations, bucket, EvalContext())
            result.append({key_name: key, **aggregated})
        return result

    def _order(self, stage: OrderStage, rows: list[Any]) -> list[Any]:
        # Sort keys are per-row field/value expressions; `$index` has no meaning in a comparator and
        # is intentionally fixed to 0 (it would compare equal for every row anyway).
        def comparator(left: Any, right: Any) -> int:
            for key in stage.keys:
                left_value = collapse(self._per_item(key.expr, left, 0))
                right_value = collapse(self._per_item(key.expr, right, 0))
                order = _compare_nullable(left_value, right_value)
                if order != 0:
                    return -order if key.descending else order
            return 0

        return sorted(rows, key=cmp_to_key(comparator))
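
The `_aggregate` method above groups rows by scanning a list of buckets and comparing keys with `==`, rather than using a dict. That keeps groups in first-seen order and tolerates unhashable keys such as lists. The sketch below isolates that technique in stdlib-only code; `group_rows` and the sample rows are hypothetical and are not part of the package.

```python
from typing import Any, Callable


def group_rows(rows: list[Any], key_of: Callable[[Any], Any]) -> list[tuple[Any, list[Any]]]:
    """Group rows by key equality, preserving the order in which keys first appear."""
    buckets: list[tuple[Any, list[Any]]] = []
    for row in rows:
        key = key_of(row)
        for existing_key, bucket in buckets:
            if existing_key == key:
                bucket.append(row)
                break
        else:
            # No existing bucket matched, so this key starts a new group.
            buckets.append((key, [row]))
    return buckets


rows = [
    {"tags": ["a"], "value": 1},
    {"tags": ["b"], "value": 2},
    {"tags": ["a"], "value": 3},
]
# List-valued keys are unhashable, so a dict keyed on them would raise TypeError.
grouped = group_rows(rows, lambda row: row["tags"])
totals = [{"tags": key, "total": sum(r["value"] for r in bucket)} for key, bucket in grouped]
print(totals)  # [{'tags': ['a'], 'total': 4}, {'tags': ['b'], 'total': 2}]
```

The linear scan costs one comparison per existing group for every row, which is a reasonable trade when the number of groups is small.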
Functions
__init__(library, binder)

Index the library's definitions and bind the source resolver.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/executor.py
def __init__(self, library: Library, binder: ResourceBinder) -> None:
    """Index the library's definitions and bind the source resolver."""
    self._library = library
    self._binder = binder
    self._evaluator = Evaluator(resolver=self)
    self._defines: dict[str, Define] = {}
    self._functions: dict[str, DefineFunction] = {}
    self._scalar_cache: dict[str, list[Any]] = {}
    self._resolving: set[str] = set()
    seen: set[str] = set()
    for definition in library.definitions:
        if definition.name in seen:
            raise SemanticError(f"duplicate definition {definition.name!r}")
        seen.add(definition.name)
        if isinstance(definition, Define):
            self._defines[definition.name] = definition
        else:
            if builtin_function(definition.name) is not None:
                raise SemanticError(f"cannot define function {definition.name!r}: it shadows a built-in function")
            self._functions[definition.name] = definition
resolve_variable(name)

Resolve a $name reference to a scalar define's value, or None when not scalar.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/executor.py
def resolve_variable(self, name: str) -> list[Any] | None:
    """Resolve a `$name` reference to a scalar `define`'s value, or None when not scalar."""
    if name in self._scalar_cache:
        return self._scalar_cache[name]
    define = self._defines.get(name)
    if define is None or not _is_scalar_define(define):
        return None
    if name in self._resolving:
        raise SemanticError(f"recursive definition {name!r}")
    source = define.body.source
    assert isinstance(source, ExprSource)  # guaranteed by _is_scalar_define
    self._resolving.add(name)
    try:
        value = self._evaluator.evaluate(source.expr, [])
    finally:
        self._resolving.discard(name)
    self._scalar_cache[name] = value
    return value
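
`resolve_variable` combines two guards: a cache so each scalar is evaluated once, and an in-progress set so a definition that refers to itself raises instead of recursing forever. The stand-alone sketch below shows the same pattern with a hypothetical `ScalarResolver` whose definitions are plain callables; it uses `RuntimeError` where the engine raises `SemanticError`.

```python
from typing import Callable


class ScalarResolver:
    """Hypothetical stand-in: resolves named scalars lazily, caches them, rejects cycles."""

    def __init__(self, definitions: dict[str, Callable[["ScalarResolver"], int]]) -> None:
        self._definitions = definitions
        self._cache: dict[str, int] = {}
        self._resolving: set[str] = set()

    def resolve(self, name: str) -> int:
        if name in self._cache:
            return self._cache[name]
        if name in self._resolving:
            raise RuntimeError(f"recursive definition {name!r}")
        self._resolving.add(name)
        try:
            value = self._definitions[name](self)
        finally:
            # Always clear the in-progress marker, even when evaluation raises.
            self._resolving.discard(name)
        self._cache[name] = value
        return value


resolver = ScalarResolver(
    {
        "base": lambda r: 10,
        "double": lambda r: r.resolve("base") * 2,
        "loop": lambda r: r.resolve("loop") + 1,  # refers to itself
    }
)
print(resolver.resolve("double"))  # 20
try:
    resolver.resolve("loop")
except RuntimeError as error:
    print(error)  # recursive definition 'loop'
```

Because the marker is removed in `finally`, a failed resolution leaves the resolver usable for later lookups.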
resolve_function(name)

Resolve a define function by name, or None when unknown.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/executor.py
def resolve_function(self, name: str) -> DefineFunction | None:
    """Resolve a `define function` by name, or None when unknown."""
    return self._functions.get(name)
run_terminal() async

Run the library's terminal pipeline, raising when there is none.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/executor.py
async def run_terminal(self) -> QueryResult:
    """Run the library's terminal pipeline, raising when there is none."""
    if self._library.terminal is None:
        raise SemanticError("this program has no terminal pipeline to run")
    return await self.run(self._library.terminal)
run_define(name) async

Run a named define's pipeline.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/executor.py
async def run_define(self, name: str) -> QueryResult:
    """Run a named `define`'s pipeline."""
    define = self._defines.get(name)
    if define is None:
        raise SemanticError(f"unknown definition {name!r}")
    return await self.run(define.body)
run(pipeline) async

Execute a pipeline and apply its sink, returning the rows and sink metadata.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/executor.py
async def run(self, pipeline: Pipeline) -> QueryResult:
    """Execute a pipeline and apply its sink, returning the rows and sink metadata."""
    outcome = await self._execute(pipeline)
    if isinstance(pipeline.sink, FileSink):
        written = _write_file(pipeline.sink, outcome.rows, scalar=outcome.scalar)
        return QueryResult(
            rows=outcome.rows,
            count=len(outcome.rows),
            scalar=outcome.scalar,
            written_to=written.path,
            format=written.format,
        )
    return QueryResult(rows=outcome.rows, count=len(outcome.rows), scalar=outcome.scalar)

QueryPlan

Bases: BaseModel

A pushed-down NativeQuery plus the stages the engine still runs locally.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/plan.py
class QueryPlan(BaseModel):
    """A pushed-down `NativeQuery` plus the stages the engine still runs locally."""

    model_config = ConfigDict(frozen=True)

    native: NativeQuery
    residual: list[Annotated[Stage, Field(discriminator="kind")]] = Field(default_factory=list)

QueryResult

Bases: BaseModel

The outcome of running a pipeline: the produced rows plus sink/shape metadata.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/executor.py
class QueryResult(BaseModel):
    """The outcome of running a pipeline: the produced rows plus sink/shape metadata."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    rows: list[Any] = Field(default_factory=list)
    count: int = 0
    scalar: bool = False
    written_to: str | None = None
    format: str | None = None

ResourceBinder

Bases: Protocol

Resolves a pipeline source name to a DataSource, or None when it is not a known resource.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/datasource.py
@runtime_checkable
class ResourceBinder(Protocol):
    """Resolves a pipeline source name to a `DataSource`, or None when it is not a known resource."""

    def bind(self, name: str) -> DataSource | None:
        """Return a data source for `name`, or None when the name is not a bindable resource."""
        ...

    def bind_call(self, name: str, args: dict[str, Any]) -> DataSource | None:
        """Return a data source for a call source `name(args)` (e.g. analytics), or None when unknown."""
        ...
Functions
bind(name)

Return a data source for name, or None when the name is not a bindable resource.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/datasource.py
def bind(self, name: str) -> DataSource | None:
    """Return a data source for `name`, or None when the name is not a bindable resource."""
    ...
bind_call(name, args)

Return a data source for a call source name(args) (e.g. analytics), or None when unknown.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/datasource.py
def bind_call(self, name: str, args: dict[str, Any]) -> DataSource | None:
    """Return a data source for a call source `name(args)` (e.g. analytics), or None when unknown."""
    ...

SourceCapabilities

Bases: BaseModel

Declares which pushdown operations a source can satisfy natively.

non_pushable_paths lists field roots the source cannot filter or order on natively (e.g. a DHIS2 embedded GeoJSON geometry object): predicates touching them stay local instead of being pushed down to an endpoint that would reject them.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/plan.py
class SourceCapabilities(BaseModel):
    """Declares which pushdown operations a source can satisfy natively.

    `non_pushable_paths` lists field roots the source cannot filter or order on natively (e.g. a
    DHIS2 embedded GeoJSON `geometry` object): predicates touching them stay local instead of being
    pushed down to an endpoint that would reject them.
    """

    model_config = ConfigDict(frozen=True)

    filter: bool = False
    order: bool = False
    paging: bool = False
    non_pushable_paths: tuple[str, ...] = ()
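
To make the role of `non_pushable_paths` concrete, the sketch below checks whether a filter on a given field may be sent to the source. How the planner actually matches paths is not shown in this section, so taking the root as the text before the first dot is an assumption. `Capabilities` and `can_push_filter` are hypothetical stand-ins built on a dataclass instead of pydantic.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Capabilities:
    """Hypothetical stand-in mirroring the SourceCapabilities fields and defaults."""

    filter: bool = False
    order: bool = False
    paging: bool = False
    non_pushable_paths: tuple[str, ...] = ()


def can_push_filter(field_path: str, capabilities: Capabilities) -> bool:
    """Return True when a filter on `field_path` may be sent to the source."""
    root = field_path.split(".", 1)[0]  # assumption: the root is the first dotted segment
    return capabilities.filter and root not in capabilities.non_pushable_paths


caps = Capabilities(filter=True, order=True, paging=True, non_pushable_paths=("geometry",))
print(can_push_filter("name", caps))            # True
print(can_push_filter("geometry.type", caps))   # False: stays local
print(can_push_filter("name", Capabilities()))  # False: the defaults push nothing down
```

Since every flag defaults to `False`, a source that declares nothing gets all of its stages evaluated locally.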

D2qlError

Bases: Exception

Base class for every d2ql language error.

Source code in packages/dhis2w-ql/src/dhis2w_ql/errors.py
class D2qlError(Exception):
    """Base class for every d2ql language error."""

EvaluationError

Bases: D2qlError

Raised when evaluating an expression or executing a pipeline fails at runtime.

Source code in packages/dhis2w-ql/src/dhis2w_ql/errors.py
class EvaluationError(D2qlError):
    """Raised when evaluating an expression or executing a pipeline fails at runtime."""

LexError

Bases: D2qlError

Raised when the tokenizer hits a character it cannot start a token with.

Source code in packages/dhis2w-ql/src/dhis2w_ql/errors.py
class LexError(D2qlError):
    """Raised when the tokenizer hits a character it cannot start a token with."""

    def __init__(self, message: str, *, position: int, line: int, column: int) -> None:
        """Record the offending message and its source position."""
        super().__init__(f"{message} (line {line}, column {column})")
        self.message = message
        self.position = position
        self.line = line
        self.column = column
Functions
__init__(message, *, position, line, column)

Record the offending message and its source position.

Source code in packages/dhis2w-ql/src/dhis2w_ql/errors.py
def __init__(self, message: str, *, position: int, line: int, column: int) -> None:
    """Record the offending message and its source position."""
    super().__init__(f"{message} (line {line}, column {column})")
    self.message = message
    self.position = position
    self.line = line
    self.column = column

ParseError

Bases: D2qlError

Raised when the parser cannot build an AST from the token stream.

Source code in packages/dhis2w-ql/src/dhis2w_ql/errors.py
class ParseError(D2qlError):
    """Raised when the parser cannot build an AST from the token stream."""

    def __init__(self, message: str, *, position: int, line: int, column: int) -> None:
        """Record the offending message and its source position."""
        super().__init__(f"{message} (line {line}, column {column})")
        self.message = message
        self.position = position
        self.line = line
        self.column = column
Functions
__init__(message, *, position, line, column)

Record the offending message and its source position.

Source code in packages/dhis2w-ql/src/dhis2w_ql/errors.py
def __init__(self, message: str, *, position: int, line: int, column: int) -> None:
    """Record the offending message and its source position."""
    super().__init__(f"{message} (line {line}, column {column})")
    self.message = message
    self.position = position
    self.line = line
    self.column = column
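
Because every error derives from `D2qlError`, a caller can catch the base class once and still use the structured position that `LexError` and `ParseError` record. The sketch below re-declares two of the classes locally so it runs without the package installed; in real code they would be imported from `dhis2w_ql.errors`. The `describe` helper and both messages are hypothetical.

```python
class D2qlError(Exception):
    """Local copy of the base class so the sketch runs on its own."""


class LexError(D2qlError):
    def __init__(self, message: str, *, position: int, line: int, column: int) -> None:
        super().__init__(f"{message} (line {line}, column {column})")
        self.message = message
        self.position = position
        self.line = line
        self.column = column


class EvaluationError(D2qlError):
    pass


def describe(error: D2qlError) -> str:
    """Build a one-line report, using the structured position when the error has one."""
    if isinstance(error, LexError):
        return f"{error.line}:{error.column}: {error.message}"
    return f"error: {error}"


reports = []
for failure in (
    LexError("unexpected character '#'", position=17, line=2, column=5),  # hypothetical
    EvaluationError("cannot compare a string with a number"),  # hypothetical
):
    try:
        raise failure
    except D2qlError as error:  # one handler covers every subclass
        reports.append(describe(error))
print(reports)
```

Keeping `message` separate from the formatted `str(error)` lets tools such as editors render the position in their own style.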

SemanticError

Bases: D2qlError

Raised when a parsed program is well-formed but cannot be resolved or planned.

Source code in packages/dhis2w-ql/src/dhis2w_ql/errors.py
class SemanticError(D2qlError):
    """Raised when a parsed program is well-formed but cannot be resolved or planned."""

GeneratedExample

Bases: BaseModel

One generated d2ql pipeline or d2path expression.

Source code in packages/dhis2w-ql/src/dhis2w_ql/generate.py
class GeneratedExample(BaseModel):
    """One generated d2ql pipeline or d2path expression."""

    model_config = ConfigDict(frozen=True)

    id: str
    category: str
    language: Literal["d2ql", "d2path"]
    source: str

SchemaSpec

Bases: BaseModel

A set of resources the generator expands into example programs.

Source code in packages/dhis2w-ql/src/dhis2w_ql/generate.py
class SchemaSpec(BaseModel):
    """A set of resources the generator expands into example programs."""

    model_config = ConfigDict(frozen=True)

    resources: list[ResourceSpec]

Sample

Bases: BaseModel

One illustrative d2ql pipeline or d2path expression with metadata for display and testing.

Source code in packages/dhis2w-ql/src/dhis2w_ql/samples.py
class Sample(BaseModel):
    """One illustrative d2ql pipeline or d2path expression with metadata for display and testing."""

    model_config = ConfigDict(frozen=True)

    id: str
    title: str
    category: str
    language: Literal["d2ql", "d2path"]
    source: str
    description: str
    needs_profile: bool = True

Token

Bases: BaseModel

A single lexed token with its source position.

Source code in packages/dhis2w-ql/src/dhis2w_ql/tokenizer.py
class Token(BaseModel):
    """A single lexed token with its source position."""

    model_config = ConfigDict(frozen=True)

    kind: TokenKind
    value: str
    position: int
    line: int
    column: int

TokenKind

Bases: StrEnum

The lexical category of a token.

Source code in packages/dhis2w-ql/src/dhis2w_ql/tokenizer.py
class TokenKind(StrEnum):
    """The lexical category of a token."""

    IDENT = "ident"
    KEYWORD = "keyword"
    STRING = "string"
    INTEGER = "integer"
    DECIMAL = "decimal"
    DATETIME = "datetime"
    VARIABLE = "variable"
    OP = "op"
    PIPE = "pipe"
    SINK = "sink"
    LPAREN = "lparen"
    RPAREN = "rparen"
    LBRACKET = "lbracket"
    RBRACKET = "rbracket"
    LBRACE = "lbrace"
    RBRACE = "rbrace"
    COMMA = "comma"
    COLON = "colon"
    DOT = "dot"
    EOF = "eof"

Functions

plan_pipeline(resource, capabilities, stages)

Split stages into a NativeQuery (pushed down) and a residual list (run locally).

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/planner.py
def plan_pipeline(resource: str, capabilities: SourceCapabilities, stages: list[Stage]) -> QueryPlan:
    """Split `stages` into a `NativeQuery` (pushed down) and a residual list (run locally)."""
    filters: list[NativeFilter] = []
    root_junction: Literal["AND", "OR"] = "AND"
    index = 0

    blocked = frozenset(capabilities.non_pushable_paths)
    while index < len(stages) and capabilities.filter:
        stage = stages[index]
        if not isinstance(stage, WhereStage):
            break
        compiled = _compile_predicate(stage.predicate, blocked)
        if compiled is None:
            break
        if compiled.junction == "OR" and len(compiled.filters) > 1:
            if filters:
                break  # cannot AND an OR-group onto already-collected filters
            filters, root_junction = compiled.filters, "OR"
            index += 1
            break  # a pushed OR-group cannot be combined with further AND'd where stages
        filters.extend(compiled.filters)
        index += 1

    paging = _consume_paging(stages, index, capabilities, blocked)
    native = NativeQuery(
        resource=resource,
        filters=filters,
        root_junction=root_junction,
        order=paging.order,
        skip=paging.skip,
        limit=paging.limit,
    )
    return QueryPlan(native=native, residual=list(stages[paging.next_index :]))

to_jsonable(value)

Convert a result value (pydantic models, dicts, lists, scalars) to JSON-serialisable data.

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/executor.py
def to_jsonable(value: Any) -> Any:
    """Convert a result value (pydantic models, dicts, lists, scalars) to JSON-serialisable data."""
    return _jsonable(value)

write_rows(path, rows, fmt=None, *, scalar=False)

Write rows to path as json/ndjson/csv (inferring the format from the extension when None).

Source code in packages/dhis2w-ql/src/dhis2w_ql/engine/executor.py
def write_rows(
    path: str, rows: list[Any], fmt: Literal["json", "ndjson", "csv"] | None = None, *, scalar: bool = False
) -> str:
    """Write rows to `path` as json/ndjson/csv (inferring the format from the extension when None)."""
    if fmt is None:
        extension = path.rsplit(".", 1)[-1].lower() if "." in path else ""
        fmt = _EXT_FORMAT.get(extension, "json")
    return _write_file(FileSink(path=path, format=fmt), rows, scalar=scalar).path
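
The extension-based inference in `write_rows` can be tried on its own. The contents of `_EXT_FORMAT` are not shown in this section, so the mapping below is an assumption that simply covers the three formats named in the signature; `infer_format` is a hypothetical helper that reuses the same expression.

```python
# Assumed mapping: the real _EXT_FORMAT is not shown in this reference.
EXT_FORMAT = {"json": "json", "ndjson": "ndjson", "csv": "csv"}


def infer_format(path: str) -> str:
    """Pick an output format from the file extension, falling back to JSON."""
    extension = path.rsplit(".", 1)[-1].lower() if "." in path else ""
    return EXT_FORMAT.get(extension, "json")


print(infer_format("out.CSV"))         # csv: the extension is lower-cased first
print(infer_format("rows.ndjson"))     # ndjson
print(infer_format("results"))         # json: no extension
print(infer_format("archive.tar.gz"))  # json: unknown extension
print(infer_format("./out/rows"))      # json: the only dot is in the directory part
```

Passing `fmt` explicitly skips the inference, which is the safer choice for paths with unusual or missing extensions.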

parse(source)

Parse d2ql source text into a Library AST.

Source code in packages/dhis2w-ql/src/dhis2w_ql/parser.py
def parse(source: str) -> Library:
    """Parse d2ql source text into a `Library` AST."""
    return _Parser(tokenize(source)).parse_library()

parse_expression(source)

Parse a single bare expression (the d2path expression surface).

Source code in packages/dhis2w-ql/src/dhis2w_ql/parser.py
def parse_expression(source: str) -> Expr:
    """Parse a single bare expression (the d2path expression surface)."""
    parser = _Parser(tokenize(source))
    expr = parser.parse_expr()
    parser.expect_eof()
    return expr

parse_pipeline(source)

Parse a single pipeline (no definitions) into a Pipeline AST.

Source code in packages/dhis2w-ql/src/dhis2w_ql/parser.py
def parse_pipeline(source: str) -> Pipeline:
    """Parse a single pipeline (no definitions) into a `Pipeline` AST."""
    library = parse(source)
    if library.definitions or library.terminal is None:
        raise ParseError("expected a single pipeline", position=0, line=1, column=1)
    return library.terminal

samples_by_category()

Group the catalog by category, preserving definition order within each group.

Source code in packages/dhis2w-ql/src/dhis2w_ql/samples.py
def samples_by_category() -> dict[str, list[Sample]]:
    """Group the catalog by category, preserving definition order within each group."""
    grouped: dict[str, list[Sample]] = {}
    for sample in SAMPLES:
        grouped.setdefault(sample.category, []).append(sample)
    return grouped
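
The grouping relies on `dict.setdefault` together with the insertion order of Python dicts, so categories appear in the order they are first seen and samples keep their catalog order within each category. A minimal illustration with hypothetical stand-in samples (plain dicts rather than `Sample` models):

```python
samples = [
    {"id": "s1", "category": "filter"},
    {"id": "s2", "category": "paging"},
    {"id": "s3", "category": "filter"},
]

grouped: dict[str, list[str]] = {}
for sample in samples:
    # setdefault creates the list on first sight of a category, then reuses it.
    grouped.setdefault(sample["category"], []).append(sample["id"])

print(grouped)  # {'filter': ['s1', 's3'], 'paging': ['s2']}
```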

tokenize(source)

Lex d2ql source text into a list of tokens terminated by an EOF token.

Source code in packages/dhis2w-ql/src/dhis2w_ql/tokenizer.py
def tokenize(source: str) -> list[Token]:
    """Lex d2ql source text into a list of tokens terminated by an EOF token."""
    return _Lexer(source).run()