ML Workflows

Chapkit provides a complete ML workflow system for training models and making predictions, with artifact-based model storage, job scheduling, and hierarchical model lineage tracking.

Quick Start

from chapkit import BaseConfig
from chapkit.api import MLServiceBuilder, MLServiceInfo
from chapkit.modules.artifact import ArtifactHierarchy
from chapkit.modules.ml import FunctionalModelRunner
import pandas as pd
from sklearn.linear_model import LinearRegression

class ModelConfig(BaseConfig):
    pass

async def on_train(config: ModelConfig, data: pd.DataFrame, geo=None):
    X = data[["feature1", "feature2"]]
    y = data["target"]
    model = LinearRegression()
    model.fit(X, y)
    return model

async def on_predict(config: ModelConfig, model, historic, future, geo=None):
    X = future[["feature1", "feature2"]]
    future["sample_0"] = model.predict(X)
    return future

runner = FunctionalModelRunner(on_train=on_train, on_predict=on_predict)

app = (
    MLServiceBuilder(
        info=MLServiceInfo(display_name="My ML Service"),
        config_schema=ModelConfig,
        hierarchy=ArtifactHierarchy(name="ml", level_labels={0: "model", 1: "predictions"}),
        runner=runner,
    )
    .build()
)

Run: fastapi dev your_file.py

Class-Based Approach

from chapkit.modules.ml import BaseModelRunner
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler

class CustomModelRunner(BaseModelRunner):
    def __init__(self):
        self.scaler = StandardScaler()

    async def on_train(self, config, data, geo=None):
        X = data[["feature1", "feature2"]]
        y = data["target"]

        X_scaled = self.scaler.fit_transform(X)
        model = LinearRegression()
        model.fit(X_scaled, y)

        return {"model": model, "scaler": self.scaler}

    async def on_predict(self, config, model, historic, future, geo=None):
        X = future[["feature1", "feature2"]]
        X_scaled = model["scaler"].transform(X)
        future["sample_0"] = model["model"].predict(X_scaled)
        return future

runner = CustomModelRunner()
# Use same MLServiceBuilder setup as above

Shell-Based Approach (Language-Agnostic)

from chapkit.modules.ml import ShellModelRunner
import sys

train_command = (
    f"{sys.executable} scripts/train.py "
    "--config {config_file} --data {data_file} --model {model_file}"
)

predict_command = (
    f"{sys.executable} scripts/predict.py "
    "--config {config_file} --model {model_file} "
    "--future {future_file} --output {output_file}"
)

runner = ShellModelRunner(
    train_command=train_command,
    predict_command=predict_command,
    model_format="pickle"
)
# Use same MLServiceBuilder setup as above

Architecture

Train/Predict Flow

1. TRAIN                           2. PREDICT
   POST /api/v1/ml/$train             POST /api/v1/ml/$predict
   ├─> Submit job                     ├─> Load trained model artifact
   ├─> Load config                    ├─> Load config
   ├─> Execute runner.on_train()      ├─> Execute runner.on_predict()
   └─> Store model in artifact        └─> Store predictions in artifact
       (level 0, parent_id=None)           (level 1, parent_id=model_id)

Artifact Hierarchy

Config
  └─> Trained Model (level 0)
       ├─> Predictions 1 (level 1)
       ├─> Predictions 2 (level 1)
       └─> Predictions 3 (level 1)

Benefits:

- Complete model lineage tracking
- Multiple predictions from same model
- Config linked to all model artifacts
- Immutable model versioning

Job Scheduling

All train/predict operations are asynchronous:

- Submit returns immediately with job_id and artifact_id
- Monitor progress via Job API or SSE streaming
- Results stored in artifacts when complete


Model Runners

BaseModelRunner

Abstract base class for custom model runners with lifecycle hooks.

from chapkit.modules.ml import BaseModelRunner

class MyRunner(BaseModelRunner):
    async def on_init(self):
        """Called before train or predict (optional)."""
        pass

    async def on_cleanup(self):
        """Called after train or predict (optional)."""
        pass

    async def on_train(self, config, data, geo=None):
        """Train and return model (must be pickleable)."""
        # Your training logic
        return trained_model

    async def on_predict(self, config, model, historic, future, geo=None):
        """Make predictions and return DataFrame."""
        # Your prediction logic
        return predictions_df

Key Points:

- Model must be pickleable (stored in artifact)
- Return value from on_train is passed to on_predict as the model parameter
- historic parameter is required (must be provided, can be an empty DataFrame)
- GeoJSON support via the geo parameter

FunctionalModelRunner

Wraps train/predict functions for functional-style ML workflows.

from chapkit.modules.ml import FunctionalModelRunner

async def train_fn(config, data, geo=None):
    # Training logic
    return model

async def predict_fn(config, model, historic, future, geo=None):
    # Prediction logic
    return predictions

runner = FunctionalModelRunner(on_train=train_fn, on_predict=predict_fn)

Use Cases:

- Simple models without state
- Quick prototypes
- Pure function workflows

ShellModelRunner

Executes external scripts for language-agnostic ML workflows.

from chapkit.modules.ml import ShellModelRunner

runner = ShellModelRunner(
    train_command="python train.py --config {config_file} --data {data_file} --model {model_file}",
    predict_command="python predict.py --config {config_file} --model {model_file} --future {future_file} --output {output_file}",
    model_format="pickle"  # or "joblib", "json", etc.
)

Variable Substitution:

- {config_file} - JSON config file
- {data_file} - Training data CSV
- {model_file} - Model file (format specified)
- {future_file} - Future data CSV
- {historic_file} - Historic data CSV (required)
- {output_file} - Predictions output CSV
- {geo_file} - GeoJSON file (if provided)

Script Requirements:

- Training script: read data/config, train model, save model to {model_file}
- Prediction script: read model/data/config, make predictions, save to {output_file}
- Exit code 0 on success, non-zero on failure
- Use stderr for logging

Example Training Script (Python):

#!/usr/bin/env python3
import argparse, json, pickle
import pandas as pd
from sklearn.linear_model import LinearRegression

parser = argparse.ArgumentParser()
parser.add_argument("--config", required=True)
parser.add_argument("--data", required=True)
parser.add_argument("--model", required=True)
args = parser.parse_args()

# Load config
with open(args.config) as f:
    config = json.load(f)

# Load data
data = pd.read_csv(args.data)

# Train
X = data[["feature1", "feature2"]]
y = data["target"]
model = LinearRegression()
model.fit(X, y)

# Save
with open(args.model, "wb") as f:
    pickle.dump(model, f)
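
Example Prediction Script (Python), a minimal sketch mirroring the training script above (the feature column names are assumptions carried over from earlier examples):

#!/usr/bin/env python3
import argparse, pickle
import pandas as pd

parser = argparse.ArgumentParser()
parser.add_argument("--config", required=True)
parser.add_argument("--model", required=True)
parser.add_argument("--future", required=True)
parser.add_argument("--output", required=True)
args = parser.parse_args()

# Load the pickled model written by the training script
# (config is available at args.config if needed)
with open(args.model, "rb") as f:
    model = pickle.load(f)

# Load future data, predict, and add the sample_0 column
future = pd.read_csv(args.future)
future["sample_0"] = model.predict(future[["feature1", "feature2"]])

# Save predictions where the runner expects them
future.to_csv(args.output, index=False)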

Use Cases:

- Integration with R, Julia, or other languages
- Legacy scripts without modification
- Containerized ML pipelines
- Team collaboration across languages


ServiceBuilder Setup

Bundles health, config, artifacts, jobs, and ML in one builder.

from chapkit.api import MLServiceBuilder, MLServiceInfo, AssessedStatus
from chapkit.modules.artifact import ArtifactHierarchy

info = MLServiceInfo(
    display_name="Disease Prediction Service",
    version="1.0.0",
    summary="ML service for disease prediction",
    description="Train and predict disease cases using weather data",
    author="ML Team",
    author_assessed_status=AssessedStatus.green,
    contact_email="ml-team@example.com",
)

hierarchy = ArtifactHierarchy(
    name="ml_pipeline",
    level_labels={0: "trained_model", 1: "predictions"},
)

app = (
    MLServiceBuilder(
        info=info,
        config_schema=ModelConfig,
        hierarchy=hierarchy,
        runner=runner,
    )
    .with_monitoring()  # Optional: Prometheus metrics
    .build()
)

MLServiceBuilder automatically includes:

- Health check (/health)
- Config CRUD (/api/v1/configs)
- Artifact CRUD (/api/v1/artifacts)
- Job scheduler (/api/v1/jobs) with concurrency control
- ML endpoints (/api/v1/ml/$train, /api/v1/ml/$predict)

ServiceBuilder (Manual Configuration)

For fine-grained control:

from chapkit.api import ServiceBuilder, ServiceInfo

app = (
    ServiceBuilder(info=ServiceInfo(display_name="Custom ML Service"))
    .with_health()
    .with_config(ModelConfig)
    .with_artifacts(hierarchy=hierarchy)
    .with_jobs(max_concurrency=3)
    .with_ml(runner=runner)
    .build()
)

Requirements:

- .with_config() must be called before .with_ml()
- .with_artifacts() must be called before .with_ml()
- .with_jobs() must be called before .with_ml()

Configuration Options

MLServiceBuilder(
    info=info,
    config_schema=YourConfig,
    hierarchy=hierarchy,
    runner=runner,
    max_concurrency=5,       # Limit concurrent jobs (default: unlimited)
    database_url="ml.db",    # Persistent storage (default: in-memory)
)

API Reference

POST /api/v1/ml/$train

Train a model asynchronously.

Request:

{
  "config_id": "01JCONFIG...",
  "data": {
    "columns": ["feature1", "feature2", "target"],
    "data": [
      [1.0, 2.0, 10.0],
      [2.0, 3.0, 15.0],
      [3.0, 4.0, 20.0]
    ]
  },
  "geo": null
}

Response (202 Accepted):

{
  "job_id": "01JOB123...",
  "model_artifact_id": "01MODEL456...",
  "message": "Training job submitted. Job ID: 01JOB123..."
}

cURL Example:

# Create config first
CONFIG_ID=$(curl -s -X POST http://localhost:8000/api/v1/configs \
  -H "Content-Type: application/json" \
  -d '{"name": "my_config", "data": {}}' | jq -r '.id')

# Submit training job
curl -X POST http://localhost:8000/api/v1/ml/\$train \
  -H "Content-Type: application/json" \
  -d '{
    "config_id": "'$CONFIG_ID'",
    "data": {
      "columns": ["rainfall", "temperature", "cases"],
      "data": [[10.0, 25.0, 5.0], [15.0, 28.0, 8.0]]
    }
  }' | jq

POST /api/v1/ml/$predict

Make predictions using a trained model.

Request:

{
  "model_artifact_id": "01MODEL456...",
  "historic": {
    "columns": ["feature1", "feature2"],
    "data": []
  },
  "future": {
    "columns": ["feature1", "feature2"],
    "data": [
      [1.5, 2.5],
      [2.5, 3.5]
    ]
  },
  "geo": null
}

Response (202 Accepted):

{
  "job_id": "01JOB789...",
  "prediction_artifact_id": "01PRED012...",
  "message": "Prediction job submitted. Job ID: 01JOB789..."
}

cURL Example:

# Use model from training
curl -X POST http://localhost:8000/api/v1/ml/\$predict \
  -H "Content-Type: application/json" \
  -d '{
    "model_artifact_id": "'$MODEL_ARTIFACT_ID'",
    "historic": {
      "columns": ["rainfall", "temperature"],
      "data": []
    },
    "future": {
      "columns": ["rainfall", "temperature"],
      "data": [[12.0, 26.0], [18.0, 29.0]]
    }
  }' | jq

Monitor Job Status

# Poll job status
curl http://localhost:8000/api/v1/jobs/$JOB_ID | jq

# Stream status updates (SSE)
curl -N http://localhost:8000/api/v1/jobs/$JOB_ID/\$stream

# Get results from artifact
ARTIFACT_ID=$(curl -s http://localhost:8000/api/v1/jobs/$JOB_ID | jq -r '.artifact_id')
curl http://localhost:8000/api/v1/artifacts/$ARTIFACT_ID | jq
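
The same monitoring works from Python; a minimal sketch using httpx (the exact SSE payload shape is an assumption):

import httpx

job_id = "01JOB123..."  # from the train/predict response

# Stream server-sent events until the job finishes and the stream closes
with httpx.stream(
    "GET",
    f"http://localhost:8000/api/v1/jobs/{job_id}/$stream",
    timeout=None,
) as response:
    for line in response.iter_lines():
        if line:
            print(line)  # e.g. data: {"status": "running"}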

Data Formats

PandasDataFrame Schema

All tabular data uses the PandasDataFrame schema:

{
  "columns": ["col1", "col2", "col3"],
  "data": [
    [1.0, 2.0, 3.0],
    [4.0, 5.0, 6.0]
  ],
  "index": null,
  "column_types": null
}

Python Usage:

import pandas as pd

from chapkit.modules.artifact.schemas import PandasDataFrame

# Create from DataFrame
df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
pandas_df = PandasDataFrame.from_dataframe(df)

# Convert to DataFrame
df = pandas_df.to_dataframe()

GeoJSON Support

Optional geospatial data via geojson-pydantic:

{
  "type": "FeatureCollection",
  "features": [
    {
      "type": "Feature",
      "geometry": {
        "type": "Point",
        "coordinates": [-122.4194, 37.7749]
      },
      "properties": {
        "name": "San Francisco",
        "population": 883305
      }
    }
  ]
}
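
Inside a runner, this data arrives via the geo argument; a sketch of reading feature properties, assuming geo is delivered as a geojson-pydantic FeatureCollection:

async def on_train(config, data, geo=None):
    if geo is not None:
        # FeatureCollection exposes a .features list of Feature models
        for feature in geo.features:
            props = feature.properties or {}
            print(props.get("name"), props.get("population"))
    # ... training logic as usual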

Artifact Structure

TrainedModelArtifactData

Stored at hierarchy level 0:

{
  "ml_type": "trained_model",
  "config_id": "01CONFIG...",
  "model": "<pickled model object>",
  "model_type": "sklearn.linear_model.LinearRegression",
  "model_size_bytes": 1234,
  "started_at": "2025-10-14T10:00:00Z",
  "completed_at": "2025-10-14T10:00:15Z",
  "duration_seconds": 15.23
}

Fields:

- ml_type: Always "trained_model"
- config_id: Config used for training
- model: Pickled model object (any Python object)
- model_type: Fully qualified class name (e.g., sklearn.linear_model.LinearRegression)
- model_size_bytes: Serialized pickle size
- started_at, completed_at: ISO timestamps
- duration_seconds: Training duration (rounded to 2 decimals)

PredictionArtifactData

Stored at hierarchy level 1 (linked to model):

{
  "ml_type": "prediction",
  "config_id": "01CONFIG...",
  "model_artifact_id": "01MODEL...",
  "predictions": {
    "columns": ["feature1", "feature2", "sample_0"],
    "data": [[1.5, 2.5, 12.3], [2.5, 3.5, 17.8]]
  },
  "started_at": "2025-10-14T10:05:00Z",
  "completed_at": "2025-10-14T10:05:02Z",
  "duration_seconds": 2.15
}

Fields:

- ml_type: Always "prediction"
- config_id: Config used for prediction
- model_artifact_id: Parent trained model artifact
- predictions: Result DataFrame (PandasDataFrame schema)
- started_at, completed_at: ISO timestamps
- duration_seconds: Prediction duration (rounded to 2 decimals)
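
To pull predictions back into pandas, fetch the artifact and rebuild the DataFrame from the columns/data pairs (a sketch using httpx; the artifact ID is a placeholder):

import httpx
import pandas as pd

artifact_id = "01PRED012..."  # prediction_artifact_id from the predict response

resp = httpx.get(f"http://localhost:8000/api/v1/artifacts/{artifact_id}")
preds = resp.json()["data"]["predictions"]

# Rebuild the predictions DataFrame from the PandasDataFrame schema
df = pd.DataFrame(preds["data"], columns=preds["columns"])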


Complete Workflow Examples

Basic Functional Workflow

# 1. Start service
fastapi dev examples/ml_basic.py

# 2. Create config
CONFIG_ID=$(curl -s -X POST http://localhost:8000/api/v1/configs \
  -H "Content-Type: application/json" \
  -d '{"name": "weather_model", "data": {}}' | jq -r '.id')

# 3. Train model
TRAIN_RESPONSE=$(curl -s -X POST http://localhost:8000/api/v1/ml/\$train \
  -H "Content-Type: application/json" \
  -d '{
    "config_id": "'$CONFIG_ID'",
    "data": {
      "columns": ["rainfall", "mean_temperature", "disease_cases"],
      "data": [
        [10.0, 25.0, 5.0],
        [15.0, 28.0, 8.0],
        [8.0, 22.0, 3.0],
        [20.0, 30.0, 12.0],
        [12.0, 26.0, 6.0]
      ]
    }
  }')

JOB_ID=$(echo $TRAIN_RESPONSE | jq -r '.job_id')
MODEL_ARTIFACT_ID=$(echo $TRAIN_RESPONSE | jq -r '.model_artifact_id')

echo "Training Job ID: $JOB_ID"
echo "Model Artifact ID: $MODEL_ARTIFACT_ID"

# 4. Wait for training completion
curl -N http://localhost:8000/api/v1/jobs/$JOB_ID/\$stream

# 5. View trained model
curl http://localhost:8000/api/v1/artifacts/$MODEL_ARTIFACT_ID | jq

# 6. Make predictions
PREDICT_RESPONSE=$(curl -s -X POST http://localhost:8000/api/v1/ml/\$predict \
  -H "Content-Type: application/json" \
  -d '{
    "model_artifact_id": "'$MODEL_ARTIFACT_ID'",
    "historic": {
      "columns": ["rainfall", "mean_temperature"],
      "data": []
    },
    "future": {
      "columns": ["rainfall", "mean_temperature"],
      "data": [
        [11.0, 26.0],
        [14.0, 27.0],
        [9.0, 24.0]
      ]
    }
  }')

PRED_JOB_ID=$(echo $PREDICT_RESPONSE | jq -r '.job_id')
PRED_ARTIFACT_ID=$(echo $PREDICT_RESPONSE | jq -r '.prediction_artifact_id')

# 7. Wait for predictions
curl -N http://localhost:8000/api/v1/jobs/$PRED_JOB_ID/\$stream

# 8. View predictions
curl http://localhost:8000/api/v1/artifacts/$PRED_ARTIFACT_ID | jq '.data.predictions'

Class-Based with Preprocessing

# examples/ml_class.py demonstrates:
# - StandardScaler for feature normalization
# - State management (scaler shared between train/predict)
# - Lifecycle hooks (on_init, on_cleanup)
# - Model artifact containing multiple objects

from chapkit.modules.ml import BaseModelRunner
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression

class WeatherModelRunner(BaseModelRunner):
    def __init__(self):
        self.feature_names = ["rainfall", "mean_temperature", "humidity"]
        self.scaler = None

    async def on_train(self, config, data, geo=None):
        X = data[self.feature_names].fillna(0)
        y = data["disease_cases"].fillna(0)

        # Normalize features
        self.scaler = StandardScaler()
        X_scaled = self.scaler.fit_transform(X)

        # Train model
        model = LinearRegression()
        model.fit(X_scaled, y)

        # Return dict with model and preprocessing artifacts
        return {
            "model": model,
            "scaler": self.scaler,
            "feature_names": self.feature_names,
        }

    async def on_predict(self, config, model, historic, future, geo=None):
        # Extract artifacts
        trained_model = model["model"]
        scaler = model["scaler"]
        feature_names = model["feature_names"]

        # Apply same preprocessing
        X = future[feature_names].fillna(0)
        X_scaled = scaler.transform(X)

        # Predict
        future["sample_0"] = trained_model.predict(X_scaled)
        return future

Benefits:

- Consistent preprocessing between train/predict
- Model artifacts include all necessary objects
- Type safety and validation
- Easy testing and debugging

Shell-Based Language-Agnostic

# examples/ml_shell.py demonstrates:
# - External R/Julia/Python scripts
# - File-based data interchange
# - No code modification required
# - Container-friendly workflows

from chapkit.modules.ml import ShellModelRunner
import sys

runner = ShellModelRunner(
    train_command=f"{sys.executable} scripts/train_model.py --config {{config_file}} --data {{data_file}} --model {{model_file}}",
    predict_command=f"{sys.executable} scripts/predict_model.py --config {{config_file}} --model {{model_file}} --future {{future_file}} --output {{output_file}}",
    model_format="pickle"
)

External Script Example (R):

#!/usr/bin/env Rscript
library(jsonlite)

args <- commandArgs(trailingOnly = TRUE)
config_file <- args[which(args == "--config") + 1]
data_file <- args[which(args == "--data") + 1]
model_file <- args[which(args == "--model") + 1]

# Load data
config <- fromJSON(config_file)
data <- read.csv(data_file)

# Train model
model <- lm(disease_cases ~ rainfall + mean_temperature, data = data)

# Save model
saveRDS(model, model_file)
cat("SUCCESS: Model trained\n")


Testing

Manual Testing

Terminal 1:

fastapi dev examples/ml_basic.py

Terminal 2:

# Complete workflow test
CONFIG_ID=$(curl -s -X POST http://localhost:8000/api/v1/configs \
  -H "Content-Type: application/json" \
  -d '{"name":"test","data":{}}' | jq -r '.id')

TRAIN=$(curl -s -X POST http://localhost:8000/api/v1/ml/\$train \
  -H "Content-Type: application/json" -d '{
  "config_id":"'$CONFIG_ID'",
  "data":{"columns":["a","b","y"],"data":[[1,2,10],[2,3,15],[3,4,20]]}
}')

MODEL_ID=$(echo $TRAIN | jq -r '.model_artifact_id')
JOB_ID=$(echo $TRAIN | jq -r '.job_id')

# Wait for completion
sleep 2
curl http://localhost:8000/api/v1/jobs/$JOB_ID | jq '.status'

# Predict
PRED=$(curl -s -X POST http://localhost:8000/api/v1/ml/\$predict \
  -H "Content-Type: application/json" -d '{
  "model_artifact_id":"'$MODEL_ID'",
  "historic":{"columns":["a","b"],"data":[]},
  "future":{"columns":["a","b"],"data":[[1.5,2.5],[2.5,3.5]]}
}')

PRED_ID=$(echo $PRED | jq -r '.prediction_artifact_id')
sleep 2

# View results
curl http://localhost:8000/api/v1/artifacts/$PRED_ID | jq '.data.predictions'

Automated Testing

import time
from fastapi.testclient import TestClient

def wait_for_job(client: TestClient, job_id: str, timeout: float = 5.0):
    """Poll until job completes."""
    start = time.time()
    while time.time() - start < timeout:
        job = client.get(f"/api/v1/jobs/{job_id}").json()
        if job["status"] in ["completed", "failed", "canceled"]:
            return job
        time.sleep(0.1)
    raise TimeoutError(f"Job {job_id} timeout")


def test_train_predict_workflow(client: TestClient):
    """Test complete ML workflow."""
    # Create config
    config_resp = client.post("/api/v1/configs", json={
        "name": "test_config",
        "data": {}
    })
    config_id = config_resp.json()["id"]

    # Train
    train_resp = client.post("/api/v1/ml/$train", json={
        "config_id": config_id,
        "data": {
            "columns": ["x1", "x2", "y"],
            "data": [[1, 2, 10], [2, 3, 15], [3, 4, 20]]
        }
    })
    assert train_resp.status_code == 202

    train_data = train_resp.json()
    job_id = train_data["job_id"]
    model_id = train_data["model_artifact_id"]

    # Wait for training
    job = wait_for_job(client, job_id)
    assert job["status"] == "completed"

    # Verify model artifact
    model_artifact = client.get(f"/api/v1/artifacts/{model_id}").json()
    assert model_artifact["data"]["ml_type"] == "trained_model"
    assert model_artifact["level"] == 0

    # Predict
    pred_resp = client.post("/api/v1/ml/$predict", json={
        "model_artifact_id": model_id,
        "historic": {
            "columns": ["x1", "x2"],
            "data": []
        },
        "future": {
            "columns": ["x1", "x2"],
            "data": [[1.5, 2.5], [2.5, 3.5]]
        }
    })
    assert pred_resp.status_code == 202

    pred_data = pred_resp.json()
    pred_job_id = pred_data["job_id"]
    pred_id = pred_data["prediction_artifact_id"]

    # Wait for prediction
    pred_job = wait_for_job(client, pred_job_id)
    assert pred_job["status"] == "completed"

    # Verify predictions
    pred_artifact = client.get(f"/api/v1/artifacts/{pred_id}").json()
    assert pred_artifact["data"]["ml_type"] == "prediction"
    assert pred_artifact["parent_id"] == model_id
    assert pred_artifact["level"] == 1
    assert "sample_0" in pred_artifact["data"]["predictions"]["columns"]

Browser Testing (Swagger UI)

  1. Open http://localhost:8000/docs
  2. Create config via POST /api/v1/configs
  3. Train via POST /api/v1/ml/$train
  4. Monitor job via GET /api/v1/jobs/{job_id}
  5. Predict via POST /api/v1/ml/$predict
  6. View artifacts via GET /api/v1/artifacts/{artifact_id}

Production Deployment

Concurrency Control

MLServiceBuilder(
    info=info,
    config_schema=config_schema,
    hierarchy=hierarchy,
    runner=runner,
    max_concurrency=3,  # Limit concurrent training jobs
)

Recommendations:

- CPU-intensive models: set to CPU core count (4-8)
- GPU models: set to GPU count (1-4)
- Memory-intensive models: lower limits (2-3)
- I/O-bound workloads: higher limits are fine (10-20)

Database Configuration

MLServiceBuilder(
    info=info,
    config_schema=config_schema,
    hierarchy=hierarchy,
    runner=runner,
    database_url="/data/ml.db",  # Persistent storage
)

Best Practices:

- Mount a persistent volume for /data
- Take regular backups (models + artifacts)
- Monitor database size growth
- Implement artifact retention policies

Model Versioning

# Use config name for version tracking
config = {
    "name": "weather_model_v1.2.3",
    "data": {
        "version": "1.2.3",
        "features": ["rainfall", "temperature"],
        "hyperparameters": {"alpha": 0.01}
    }
}
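
Registering such a config is a plain POST to the config API (a sketch using httpx):

import httpx

config = {
    "name": "weather_model_v1.2.3",
    "data": {
        "version": "1.2.3",
        "features": ["rainfall", "temperature"],
        "hyperparameters": {"alpha": 0.01},
    },
}

# The returned id is what $train expects as config_id
resp = httpx.post("http://localhost:8000/api/v1/configs", json=config)
config_id = resp.json()["id"]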

Artifact Hierarchy for Versions:

weather_model_v1.0.0 (config)
  └─> trained_model_1 (artifact level 0)
       └─> predictions_* (artifact level 1)

weather_model_v1.1.0 (config)
  └─> trained_model_2 (artifact level 0)
       └─> predictions_* (artifact level 1)

Monitoring

app = (
    MLServiceBuilder(info=info, config_schema=ModelConfig, hierarchy=hierarchy, runner=runner)
    .with_monitoring()  # Prometheus metrics at /metrics
    .build()
)

Available Metrics:

- ml_train_jobs_total - Total training jobs submitted
- ml_predict_jobs_total - Total prediction jobs submitted
- Job scheduler metrics (see the Job Scheduler guide)

Custom Metrics:

from prometheus_client import Histogram

model_training_duration = Histogram(
    'model_training_duration_seconds',
    'Model training duration'
)

# Training durations already tracked in artifact metadata
# Query via artifact API
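
One way to wire the histogram into a runner (a sketch reusing the model_training_duration histogram defined above; Histogram.time() is a standard prometheus_client context manager, and the feature columns are assumptions):

from chapkit.modules.ml import BaseModelRunner
from sklearn.linear_model import LinearRegression

class TimedRunner(BaseModelRunner):
    async def on_train(self, config, data, geo=None):
        # Record wall-clock training time in the custom histogram
        with model_training_duration.time():
            model = LinearRegression().fit(data[["feature1", "feature2"]], data["target"])
        return model

    async def on_predict(self, config, model, historic, future, geo=None):
        future["sample_0"] = model.predict(future[["feature1", "feature2"]])
        return future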

Docker Deployment

Dockerfile:

FROM python:3.13-slim

WORKDIR /app
COPY . /app

RUN pip install --no-cache-dir -e .

# Create non-root user
RUN useradd -m -u 1000 mluser && chown -R mluser:mluser /app
USER mluser

EXPOSE 8000

CMD ["fastapi", "run", "ml_service.py", "--host", "0.0.0.0"]

docker-compose.yml:

version: '3.8'

services:
  ml-service:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ml-data:/data
    environment:
      - DATABASE_URL=/data/ml.db
    deploy:
      resources:
        limits:
          cpus: '4.0'
          memory: 8G

volumes:
  ml-data:

GPU Support

FROM nvidia/cuda:12.1.0-runtime-ubuntu22.04

# The CUDA base image does not ship Python; install it
RUN apt-get update && apt-get install -y --no-install-recommends python3 python3-pip \
    && rm -rf /var/lib/apt/lists/*

# Install ML libraries with GPU support (CUDA 12.1 wheel index)
RUN pip3 install torch torchvision --index-url https://download.pytorch.org/whl/cu121

# Your ML code
COPY . /app

docker-compose.yml:

services:
  ml-service:
    build: .
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]


Troubleshooting

"Config not found" Error

Problem: Training fails with "Config {id} not found"

Cause: Invalid or deleted config ID

Solution:

# List configs
curl http://localhost:8000/api/v1/configs | jq

# Verify config exists
curl http://localhost:8000/api/v1/configs/$CONFIG_ID

"Model artifact not found" Error

Problem: Prediction fails with "Model artifact {id} not found"

Cause: Invalid model artifact ID or training failed

Solution:

# Check training job status
curl http://localhost:8000/api/v1/jobs/$TRAIN_JOB_ID | jq

# If training failed, check error
curl http://localhost:8000/api/v1/jobs/$TRAIN_JOB_ID | jq '.error'

# List artifacts
curl http://localhost:8000/api/v1/artifacts | jq

Training Job Fails Immediately

Problem: Job status shows "failed" right after submission

Causes:

1. Model not pickleable
2. Missing required columns in data
3. Insufficient training data
4. Config validation errors

Solution:

# Check job error message
curl http://localhost:8000/api/v1/jobs/$JOB_ID | jq '.error, .error_traceback'

# Common fixes:
# - Ensure model is pickleable (no lambda functions, local classes)
# - Verify DataFrame columns match feature expectations
# - Check config schema validation
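
A quick way to catch pickling problems before submitting a job is to round-trip the trained object locally (a sketch; trained_model stands in for whatever your on_train returns):

import pickle

# Raises an error (e.g. PicklingError) if the model cannot be stored in an artifact
pickle.loads(pickle.dumps(trained_model))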

Prediction Returns Wrong Shape

Problem: Predictions DataFrame has incorrect columns

Cause: on_predict did not add the prediction column(s) to the input DataFrame before returning it

Solution:

async def on_predict(self, config, model, historic, future, geo=None):
    X = future[["feature1", "feature2"]]
    predictions = model.predict(X)

    # IMPORTANT: Add predictions to future DataFrame
    future["sample_0"] = predictions  # Required column name

    return future  # Return modified DataFrame

Shell Runner Script Fails

Problem: ShellModelRunner returns "script failed with exit code 1"

Causes:

1. Script not executable
2. Wrong interpreter
3. Missing dependencies
4. File path issues

Solution:

# Make script executable
chmod +x scripts/train_model.py

# Test script manually
python scripts/train_model.py \
  --config /tmp/test_config.json \
  --data /tmp/test_data.csv \
  --model /tmp/test_model.pkl

# Check script stderr output
curl http://localhost:8000/api/v1/jobs/$JOB_ID | jq '.error'

High Memory Usage

Problem: Service consuming excessive memory

Causes:

1. Large models in memory
2. Too many concurrent jobs
3. Artifact accumulation

Solution:

# Limit concurrent jobs
MLServiceBuilder(..., max_concurrency=2)

# Implement artifact cleanup (sketch)
from datetime import datetime, timedelta

async def cleanup_old_artifacts() -> None:
    # Delete artifacts older than 30 days
    cutoff = datetime.now() - timedelta(days=30)
    # Implementation depends on your needs

app.add_event_handler("startup", cleanup_old_artifacts)

Model Size Too Large

Problem: "Model size exceeds limit" or slow artifact storage

Cause: Large models (>100MB) stored in SQLite

Solution:

# Option 1: External model storage
async def on_train(self, config, data, geo=None):
    model = train_large_model(data)

    # Save to external storage (S3, etc.)
    model_url = save_to_s3(model)

    # Return metadata instead of model
    return {
        "model_url": model_url,
        "model_metadata": {...}
    }

# Option 2: Use PostgreSQL instead of SQLite
MLServiceBuilder(..., database_url="postgresql://...")
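
The prediction side then loads the model back from external storage (a sketch; load_from_s3, like save_to_s3 above, is a hypothetical helper, and the feature columns are placeholders):

async def on_predict(self, config, model, historic, future, geo=None):
    # 'model' here is the metadata dict returned by on_train above
    trained_model = load_from_s3(model["model_url"])  # hypothetical helper
    future["sample_0"] = trained_model.predict(future[["feature1", "feature2"]])
    return future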

DataFrame Validation Errors

Problem: "Invalid PandasDataFrame schema" during train/predict

Cause: Incorrect data format in request

Solution:

// Correct format
{
  "columns": ["col1", "col2"],
  "data": [
    [1.0, 2.0],
    [3.0, 4.0]
  ]
}

// Wrong formats:
// {"col1": [1, 3], "col2": [2, 4]}  (dict format - not supported)
// [{"col1": 1, "col2": 2}]  (records format - not supported)


Next Steps

  • Job Monitoring: See Job Scheduler guide for SSE streaming
  • Task Execution: Combine with Tasks for preprocessing pipelines
  • Authentication: Secure ML endpoints with API keys
  • Monitoring: Track model performance with Prometheus metrics

For more examples:

- examples/ml_basic.py - Functional runner with LinearRegression
- examples/ml_class.py - Class-based runner with preprocessing
- examples/ml_shell.py - Shell-based runner with external scripts
- tests/test_example_ml_basic.py - Complete test suite