Data Sources Guide¶
This guide covers FHIR data sources for CQL evaluation. Data sources provide the bridge between your FHIR data and the CQL engine, enabling retrieve operations, patient-scoped queries, and code filtering.
Introduction¶
What are Data Sources?¶
Data sources are abstractions that provide FHIR resources to the CQL evaluator during execution. When a CQL expression includes a retrieve statement like [Condition] or [Observation: "Blood Pressure"], the data source is responsible for:
- Finding all resources of the requested type
- Filtering by patient context
- Applying code/valueset filters
- Applying date range filters
Available Data Sources¶
| Data Source | Use Case |
|---|---|
InMemoryDataSource |
General-purpose storage for testing and simple scenarios |
BundleDataSource |
Load data from a FHIR Bundle |
PatientBundleDataSource |
Patient-centric bundle with automatic patient context |
| Custom implementations | Connect to FHIR servers, databases, etc. |
Quick Start¶
Basic Usage¶
from fhirkit.engine.cql import CQLEvaluator, InMemoryDataSource
# Create and populate data source
data_source = InMemoryDataSource()
# Add a patient
data_source.add_resource({
"resourceType": "Patient",
"id": "patient-1",
"birthDate": "1980-05-15"
})
# Add a condition
data_source.add_resource({
"resourceType": "Condition",
"id": "condition-1",
"subject": {"reference": "Patient/patient-1"},
"code": {"coding": [{"system": "http://snomed.info/sct", "code": "44054006"}]}
})
# Create evaluator with data source
evaluator = CQLEvaluator(data_source=data_source)
# Compile and run CQL
evaluator.compile("""
library Test version '1.0'
using FHIR version '4.0.1'
context Patient
define HasCondition: exists([Condition])
""")
patient = {"resourceType": "Patient", "id": "patient-1"}
result = evaluator.evaluate_definition("HasCondition", resource=patient)
print(result) # True
Using Bundle Data Source¶
from fhirkit.engine.cql import CQLEvaluator, BundleDataSource
# Load from a FHIR Bundle
bundle = {
"resourceType": "Bundle",
"entry": [
{"resource": {"resourceType": "Patient", "id": "p1", "birthDate": "1990-01-01"}},
{"resource": {"resourceType": "Observation", "id": "o1", "subject": {"reference": "Patient/p1"}}}
]
}
data_source = BundleDataSource(bundle)
evaluator = CQLEvaluator(data_source=data_source)
InMemoryDataSource¶
The most flexible data source, storing resources in memory with full filtering support.
Creating and Populating¶
from fhirkit.engine.cql import InMemoryDataSource
ds = InMemoryDataSource()
# Add single resource
ds.add_resource({
"resourceType": "Patient",
"id": "p1",
"birthDate": "1985-03-15"
})
# Add multiple resources
ds.add_resources([
{"resourceType": "Patient", "id": "p2", "birthDate": "1990-06-20"},
{"resourceType": "Patient", "id": "p3", "birthDate": "1975-12-01"},
])
Retrieving Resources¶
# Retrieve all resources of a type
patients = ds.retrieve("Patient")
print(f"Found {len(patients)} patients")
# Retrieve with patient context
from fhirkit.engine.cql import PatientContext
context = PatientContext(resource={"resourceType": "Patient", "id": "p1"})
conditions = ds.retrieve("Condition", context=context)
# Returns only conditions for patient p1
Resolving References¶
# Resolve a FHIR reference
patient = ds.resolve_reference("Patient/p1")
if patient:
print(f"Found: {patient['id']}")
else:
print("Not found")
Working with ValueSets¶
from fhirkit.engine.cql import CQLCode
# Add expanded valueset
ds.add_valueset(
"http://example.org/valueset/diabetes",
[
CQLCode(code="44054006", system="http://snomed.info/sct"), # Type 2 DM
CQLCode(code="46635009", system="http://snomed.info/sct"), # Type 1 DM
]
)
# Retrieve with valueset filter
diabetes_conditions = ds.retrieve(
"Condition",
code_path="code",
valueset="http://example.org/valueset/diabetes"
)
Clearing Data¶
BundleDataSource¶
Load FHIR resources from a Bundle for evaluation.
Creating from Bundle¶
from fhirkit.engine.cql import BundleDataSource
bundle = {
"resourceType": "Bundle",
"type": "collection",
"entry": [
{"resource": {"resourceType": "Patient", "id": "p1"}},
{"resource": {"resourceType": "Condition", "id": "c1", "subject": {"reference": "Patient/p1"}}},
{"resource": {"resourceType": "Observation", "id": "o1", "subject": {"reference": "Patient/p1"}}}
]
}
ds = BundleDataSource(bundle)
Loading Single Resource¶
You can also pass a single resource:
patient = {"resourceType": "Patient", "id": "p1", "birthDate": "1990-01-01"}
ds = BundleDataSource(patient)
Accessing Resources¶
# Get all resources by type
print(ds.resources) # {"Patient": [...], "Condition": [...], ...}
# Count resources
for resource_type, resources in ds.resources.items():
print(f"{resource_type}: {len(resources)}")
Adding ValueSets¶
from fhirkit.engine.cql import CQLCode
ds.add_valueset(
"http://example.org/valueset/labs",
[CQLCode(code="2339-0", system="http://loinc.org")]
)
PatientBundleDataSource¶
Specialized for patient-centric bundles with automatic patient context.
Usage¶
from fhirkit.engine.cql import PatientBundleDataSource
# Patient bundle (like Synthea output)
bundle = {
"resourceType": "Bundle",
"entry": [
{"resource": {"resourceType": "Patient", "id": "p1", "name": [{"family": "Smith"}]}},
{"resource": {"resourceType": "Condition", "id": "c1", "subject": {"reference": "Patient/p1"}}},
{"resource": {"resourceType": "Condition", "id": "c2", "subject": {"reference": "Patient/p2"}}}, # Different patient
]
}
ds = PatientBundleDataSource(bundle)
# Access the extracted patient
print(ds.patient) # {"resourceType": "Patient", "id": "p1", ...}
# Retrieves are automatically filtered to the patient
conditions = ds.retrieve("Condition")
print(len(conditions)) # 1 (only c1, which belongs to p1)
Key Differences from BundleDataSource¶
| Feature | BundleDataSource | PatientBundleDataSource |
|---|---|---|
| Patient extraction | Manual | Automatic |
| Default context | None | Patient from bundle |
| Resource filtering | Requires explicit context | Automatic patient scope |
Filtering Capabilities¶
Code Filtering¶
Filter resources by code values:
from fhirkit.engine.cql import CQLCode
# Using CQLCode
diabetes_code = CQLCode(code="44054006", system="http://snomed.info/sct")
conditions = ds.retrieve(
"Condition",
code_path="code",
codes=[diabetes_code]
)
# Using simple string (matches any system)
conditions = ds.retrieve(
"Condition",
code_path="code",
codes=["44054006"]
)
# Using dict
conditions = ds.retrieve(
"Condition",
code_path="code",
codes=[{"code": "44054006", "system": "http://snomed.info/sct"}]
)
ValueSet Filtering¶
Filter by valueset membership:
# First, add the valueset
ds.add_valueset(
"http://example.org/valueset/diabetes",
[
CQLCode(code="44054006", system="http://snomed.info/sct"),
CQLCode(code="46635009", system="http://snomed.info/sct"),
]
)
# Then filter by valueset
conditions = ds.retrieve(
"Condition",
code_path="code",
valueset="http://example.org/valueset/diabetes"
)
Date Range Filtering¶
Filter resources by date:
from fhirkit.engine.cql import CQLInterval
from fhirkit.engine.types import FHIRDateTime
date_range = CQLInterval(
low=FHIRDateTime.parse("2024-01-01T00:00:00"),
high=FHIRDateTime.parse("2024-12-31T23:59:59"),
low_closed=True,
high_closed=True
)
observations = ds.retrieve(
"Observation",
date_path="effectiveDateTime",
date_range=date_range
)
Combined Filtering¶
Apply multiple filters:
# Get blood pressure observations from 2024 for a specific patient
observations = ds.retrieve(
"Observation",
context=patient_context,
code_path="code",
valueset="http://example.org/valueset/blood-pressure",
date_path="effectiveDateTime",
date_range=date_range
)
Nested Value Access¶
Data sources support dot notation for accessing nested values:
# Simple path
# {"patient": {"name": "John"}} -> "John"
value = ds._get_nested_value(resource, "patient.name")
# Array index
# {"name": [{"family": "Smith"}, {"family": "Jones"}]} -> "Smith"
value = ds._get_nested_value(resource, "name.0.family")
# Array property (returns all values)
# {"name": [{"family": "Smith"}, {"family": "Jones"}]} -> ["Smith", "Jones"]
value = ds._get_nested_value(resource, "name.family")
Supported Code Paths¶
Common code paths for FHIR resources:
| Resource Type | Code Path |
|---|---|
| Condition | code |
| Observation | code |
| Procedure | code |
| MedicationRequest | medicationCodeableConcept |
| AllergyIntolerance | code |
| Immunization | vaccineCode |
Patient Reference Resolution¶
Data sources automatically extract patient references from resources:
# The data source knows how to find patient references for each resource type
# Condition: subject.reference or patient.reference
# Observation: subject.reference or patient.reference
# AllergyIntolerance: patient.reference
# etc.
Supported Resource Types¶
| Resource Type | Patient Reference Path(s) |
|---|---|
| Condition | subject.reference, patient.reference |
| Observation | subject.reference, patient.reference |
| MedicationRequest | subject.reference, patient.reference |
| Procedure | subject.reference, patient.reference |
| Encounter | subject.reference, patient.reference |
| DiagnosticReport | subject.reference, patient.reference |
| AllergyIntolerance | patient.reference |
| Immunization | patient.reference |
| Coverage | beneficiary.reference |
| Claim | patient.reference |
Creating Custom Data Sources¶
Implement your own data source for specific needs.
Interface¶
from fhirkit.engine.cql.datasource import FHIRDataSource
from fhirkit.engine.cql import CQLInterval
from typing import Any
class MyDataSource(FHIRDataSource):
"""Custom data source implementation."""
def retrieve(
self,
resource_type: str,
context: "CQLContext | None" = None,
code_path: str | None = None,
codes: list[Any] | None = None,
valueset: str | None = None,
date_path: str | None = None,
date_range: CQLInterval | None = None,
**kwargs: Any,
) -> list[dict[str, Any]]:
"""Retrieve FHIR resources.
Args:
resource_type: FHIR resource type (e.g., "Patient", "Condition")
context: CQL context (for patient-scoped queries)
code_path: Path to code element for filtering
codes: List of codes to filter by
valueset: ValueSet URL to filter by
date_path: Path to date element for filtering
date_range: Date interval for filtering
Returns:
List of matching FHIR resources
"""
# Implement your retrieval logic
pass
def resolve_reference(self, reference: str) -> dict[str, Any] | None:
"""Resolve a FHIR reference.
Args:
reference: Reference string (e.g., "Patient/123")
Returns:
The referenced resource or None
"""
# Implement reference resolution
pass
Example: HTTP FHIR Server Data Source¶
import requests
from fhirkit.engine.cql.datasource import FHIRDataSource
class FHIRServerDataSource(FHIRDataSource):
"""Data source backed by a FHIR server."""
def __init__(self, base_url: str, auth: tuple | None = None):
self.base_url = base_url.rstrip("/")
self.auth = auth
self._session = requests.Session()
if auth:
self._session.auth = auth
def retrieve(
self,
resource_type: str,
context=None,
code_path=None,
codes=None,
valueset=None,
date_path=None,
date_range=None,
**kwargs,
) -> list[dict]:
# Build search parameters
params = {}
# Add patient filter if context provided
if context and context.resource:
patient_id = context.resource.get("id")
if patient_id and resource_type != "Patient":
params["patient"] = f"Patient/{patient_id}"
# Add code filter
if codes:
code_values = [c.code if hasattr(c, 'code') else c for c in codes]
params["code"] = ",".join(code_values)
# Make request
url = f"{self.base_url}/{resource_type}"
response = self._session.get(url, params=params)
response.raise_for_status()
bundle = response.json()
return [entry["resource"] for entry in bundle.get("entry", [])]
def resolve_reference(self, reference: str) -> dict | None:
url = f"{self.base_url}/{reference}"
response = self._session.get(url)
if response.status_code == 200:
return response.json()
return None
Example: Database Data Source¶
from fhirkit.engine.cql.datasource import FHIRDataSource
import json
class SQLiteDataSource(FHIRDataSource):
"""Data source backed by SQLite database."""
def __init__(self, db_path: str):
import sqlite3
self.conn = sqlite3.connect(db_path)
def retrieve(
self,
resource_type: str,
context=None,
code_path=None,
codes=None,
**kwargs,
) -> list[dict]:
cursor = self.conn.cursor()
query = "SELECT data FROM resources WHERE type = ?"
params = [resource_type]
# Add patient filter
if context and context.resource:
patient_id = context.resource.get("id")
if patient_id:
query += " AND patient_id = ?"
params.append(patient_id)
cursor.execute(query, params)
rows = cursor.fetchall()
resources = [json.loads(row[0]) for row in rows]
# Apply code filter in memory
if code_path and codes:
resources = [
r for r in resources
if self._matches_code(r, code_path, codes)
]
return resources
def resolve_reference(self, reference: str) -> dict | None:
resource_type, resource_id = reference.split("/")
cursor = self.conn.cursor()
cursor.execute(
"SELECT data FROM resources WHERE type = ? AND id = ?",
(resource_type, resource_id)
)
row = cursor.fetchone()
return json.loads(row[0]) if row else None
Integration with CQL Evaluator¶
Setting Data Source¶
from fhirkit.engine.cql import CQLEvaluator, InMemoryDataSource
# Method 1: Pass to constructor
data_source = InMemoryDataSource()
evaluator = CQLEvaluator(data_source=data_source)
# Method 2: Set after creation
evaluator = CQLEvaluator()
evaluator._data_source = data_source
Using with Retrieve Expressions¶
data_source = InMemoryDataSource()
data_source.add_resources([
{"resourceType": "Patient", "id": "p1", "birthDate": "1980-01-01"},
{
"resourceType": "Condition",
"id": "c1",
"subject": {"reference": "Patient/p1"},
"code": {"coding": [{"system": "http://snomed.info/sct", "code": "44054006"}]}
}
])
evaluator = CQLEvaluator(data_source=data_source)
# CQL with retrieve
evaluator.compile("""
library Test version '1.0'
using FHIR version '4.0.1'
codesystem "SNOMED": 'http://snomed.info/sct'
code "Diabetes": '44054006' from "SNOMED"
context Patient
define AllConditions: [Condition]
define DiabetesConditions: [Condition: "Diabetes"]
define HasDiabetes: exists(DiabetesConditions)
""")
patient = {"resourceType": "Patient", "id": "p1"}
# Evaluate
all_conditions = evaluator.evaluate_definition("AllConditions", resource=patient)
has_diabetes = evaluator.evaluate_definition("HasDiabetes", resource=patient)
print(f"Conditions: {len(all_conditions)}") # 1
print(f"Has diabetes: {has_diabetes}") # True
Using with Measure Evaluation¶
from fhirkit.engine.cql import MeasureEvaluator, InMemoryDataSource
# Set up data
data_source = InMemoryDataSource()
data_source.add_resources([...])
# Create measure evaluator with data source
evaluator = MeasureEvaluator(data_source=data_source)
evaluator.load_measure(measure_cql)
# Evaluate population
patients = data_source.retrieve("Patient")
report = evaluator.evaluate_population(patients, data_source=data_source)
Performance Tips¶
Batch Loading¶
# Good: Load all resources at once
ds.add_resources(all_resources)
# Less efficient: Add one at a time in a loop
for resource in all_resources:
ds.add_resource(resource)
Pre-expand ValueSets¶
# Expand valuesets once, upfront
ds.add_valueset("http://example.org/vs/diabetes", expanded_codes)
# Instead of expanding during each retrieve
Use Specific Resource Types¶
# Good: Specific type
ds.retrieve("Condition")
# Avoid: Retrieving all and filtering in code
all_resources = ds._resources
conditions = [r for r in all_resources.get("Condition", [])]
Patient Context Early¶
# Good: Filter by patient early
conditions = ds.retrieve("Condition", context=patient_context)
# Less efficient: Retrieve all, filter later
all_conditions = ds.retrieve("Condition")
patient_conditions = [c for c in all_conditions if matches_patient(c)]
Examples¶
Loading Synthea Data¶
import json
from pathlib import Path
from fhirkit.engine.cql import PatientBundleDataSource, CQLEvaluator
def load_synthea_patient(patient_file: Path):
"""Load a Synthea patient bundle."""
with open(patient_file) as f:
bundle = json.load(f)
ds = PatientBundleDataSource(bundle)
return ds
# Load patient
ds = load_synthea_patient(Path("synthea/John_Smith.json"))
# Patient is automatically extracted
print(f"Patient: {ds.patient['id']}")
# Use with evaluator
evaluator = CQLEvaluator(data_source=ds)
evaluator.compile(my_cql)
result = evaluator.evaluate_definition("HasCondition", resource=ds.patient)
Multi-Patient Evaluation¶
from fhirkit.engine.cql import InMemoryDataSource, CQLEvaluator, PatientContext
# Load all patients and resources into one data source
ds = InMemoryDataSource()
ds.add_resources(all_patients)
ds.add_resources(all_conditions)
ds.add_resources(all_observations)
evaluator = CQLEvaluator(data_source=ds)
evaluator.compile(cql_source)
# Evaluate for each patient
results = {}
for patient in ds.retrieve("Patient"):
context = PatientContext(resource=patient)
evaluator._context = context
results[patient["id"]] = evaluator.evaluate_definition("Result", resource=patient)
Testing with Mock Data¶
import pytest
from fhirkit.engine.cql import InMemoryDataSource, CQLEvaluator
@pytest.fixture
def test_data_source():
"""Create a data source with test data."""
ds = InMemoryDataSource()
ds.add_resources([
{"resourceType": "Patient", "id": "test-patient", "birthDate": "1980-01-01"},
{
"resourceType": "Condition",
"id": "test-condition",
"subject": {"reference": "Patient/test-patient"},
"code": {"coding": [{"system": "http://snomed.info/sct", "code": "44054006"}]}
}
])
return ds
def test_diabetes_detection(test_data_source):
"""Test that diabetes is correctly detected."""
evaluator = CQLEvaluator(data_source=test_data_source)
evaluator.compile("""
library Test version '1.0'
using FHIR version '4.0.1'
context Patient
define HasDiabetes: exists([Condition])
""")
patient = {"resourceType": "Patient", "id": "test-patient"}
result = evaluator.evaluate_definition("HasDiabetes", resource=patient)
assert result is True
Troubleshooting¶
No Resources Returned¶
- Check resource type spelling (case-sensitive)
- Verify patient context is set correctly
- Check code paths match FHIR structure
- Verify valueset is loaded
# Debug: List all resources
print(ds._resources)
# Debug: Check specific type
print(ds.retrieve("Condition"))
Code Filtering Not Working¶
# Check the resource structure
resource = ds.retrieve("Condition")[0]
print(resource.get("code"))
# Verify code path
print(ds._get_nested_value(resource, "code"))
print(ds._get_nested_value(resource, "code.coding"))
Patient Context Issues¶
# Check patient reference in resources
for condition in ds.retrieve("Condition"):
print(ds._get_patient_reference(condition))
# Verify context patient ID matches
print(context.resource.get("id"))
See Also¶
- CQL Tutorial - CQL language basics
- CQL Python API - CQL evaluator reference
- Measure Evaluation Guide - Quality measures
- FHIR Server Guide - Built-in FHIR server