Job Scheduler¶
Chapkit provides an async job scheduler for managing long-running tasks with real-time status monitoring via Server-Sent Events (SSE).
Quick Start¶
Submit a Job¶
# Start the example service
fastapi dev examples/job_scheduler_sse_api.py
# Submit a 30-second computation job and capture job ID
JOB_ID=$(curl -s -X POST http://localhost:8000/api/v1/slow-compute \
-H "Content-Type: application/json" \
-d '{"steps": 30}' | jq -r '.job_id')
echo "Job ID: $JOB_ID"
Response:
{
"job_id": "01JQRS7X...",
"message": "Job submitted with 30 steps. Stream real-time status...",
"stream_url": "/api/v1/jobs/01JQRS7X.../$stream"
}
Monitor Job Status (Real-Time SSE)¶
Output (streaming):
data: {"id":"01JQRS7X...","status":"pending","submitted_at":"2025-10-12T..."}
data: {"id":"01JQRS7X...","status":"running","started_at":"2025-10-12T..."}
data: {"id":"01JQRS7X...","status":"completed","finished_at":"2025-10-12T...","artifact_id":null}
Note: Use -N
flag to disable cURL buffering for real-time streaming.
Job Lifecycle¶
Jobs progress through these states:
- pending - Job submitted, waiting to start
- running - Job is currently executing
- completed - Job finished successfully
- failed - Job encountered an error
- canceled - Job was canceled by user
Terminal States¶
These states indicate the job is finished:
- completed
- Success
- failed
- Error occurred
- canceled
- User canceled
SSE streams automatically close when a terminal state is reached.
Polling vs Streaming¶
Traditional Polling¶
# Client must repeatedly poll every second
while true; do
curl http://localhost:8000/api/v1/jobs/01JQRS7X...
sleep 1
done
Problems: - Wastes bandwidth (repeated full HTTP requests) - Polling interval trade-off (fast = expensive, slow = delayed updates) - Client-side polling logic needed
SSE Streaming¶
# Server pushes updates automatically
curl -N http://localhost:8000/api/v1/jobs/01JQRS7X.../\$stream
Benefits: - Efficient (single HTTP connection, server pushes updates) - Real-time (updates sent immediately when status changes) - Standard (W3C EventSource API built into browsers) - Simple (no client-side polling logic)
Real-Time Streaming with SSE¶
Server-Sent Events (SSE) provide efficient real-time updates over a single HTTP connection.
Browser JavaScript (EventSource API)¶
const jobId = "01JQRS7X...";
const eventSource = new EventSource(`/api/v1/jobs/${jobId}/$stream`);
eventSource.onmessage = (event) => {
const job = JSON.parse(event.data);
console.log(`Status: ${job.status}`);
// Update UI
document.getElementById('status').textContent = job.status;
// Close connection when done
if (['completed', 'failed', 'canceled'].includes(job.status)) {
console.log('Job finished');
eventSource.close();
}
};
eventSource.onerror = (error) => {
console.error('SSE connection error:', error);
eventSource.close();
};
cURL (Command Line)¶
# Stream job status updates
curl -N http://localhost:8000/api/v1/jobs/01JQRS7X.../\$stream
# Custom poll interval (default: 0.5 seconds)
curl -N "http://localhost:8000/api/v1/jobs/01JQRS7X.../\$stream?poll_interval=1.0"
Important: Use -N
/ --no-buffer
flag to disable buffering and see real-time updates.
Python (httpx)¶
import httpx
import json
job_id = "01JQRS7X..."
url = f"http://localhost:8000/api/v1/jobs/{job_id}/$stream"
with httpx.stream("GET", url) as response:
for line in response.iter_lines():
if line.startswith("data: "):
data = line[6:] # Remove "data: " prefix
job = json.loads(data)
print(f"Status: {job['status']}")
# Stop when terminal state reached
if job['status'] in ['completed', 'failed', 'canceled']:
break
Python (requests - Not Recommended)¶
The requests
library buffers responses by default, making it unsuitable for SSE. Use httpx
instead.
Configuration¶
ServiceBuilder Setup¶
from chapkit.api import ServiceBuilder, ServiceInfo
app = (
ServiceBuilder(info=ServiceInfo(display_name="My Service"))
.with_jobs(max_concurrency=5) # Limit concurrent jobs
.build()
)
Parameters:
- max_concurrency
(int | None
): Maximum concurrent jobs. None
= unlimited.
SSE Poll Interval¶
The SSE endpoint polls the scheduler internally at configurable intervals:
# Default: 0.5 seconds
curl -N http://localhost:8000/api/v1/jobs/01JQRS.../\$stream
# Custom: 1.0 second
curl -N "http://localhost:8000/api/v1/jobs/01JQRS.../\$stream?poll_interval=1.0"
Recommendations: - Development: 0.5s (default) - good balance - Production: 1.0s - reduces server load - High-frequency: 0.1s - near real-time (use sparingly)
API Reference¶
POST /api/v1/jobs¶
Not exposed directly. Submit jobs via custom endpoints (e.g., /api/v1/slow-compute
).
GET /api/v1/jobs¶
List all jobs with optional status filtering.
# List all jobs
curl http://localhost:8000/api/v1/jobs
# Filter by status
curl http://localhost:8000/api/v1/jobs?status_filter=completed
GET /api/v1/jobs/{job_id}¶
Get job status and details (single request).
Response:
{
"id": "01JQRS7X...",
"status": "running",
"submitted_at": "2025-10-12T15:30:00Z",
"started_at": "2025-10-12T15:30:01Z",
"finished_at": null,
"error": null,
"artifact_id": null
}
GET /api/v1/jobs/{job_id}/$stream¶
Stream real-time job status updates via Server-Sent Events.
Query Parameters:
- poll_interval
(float, default: 0.5): Seconds between status checks
Response Format:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
data: {"id":"...","status":"pending",...}
data: {"id":"...","status":"running",...}
data: {"id":"...","status":"completed",...}
Connection closes automatically when job reaches terminal state.
DELETE /api/v1/jobs/{job_id}¶
Cancel and delete a job.
Returns 204 No Content
on success.
Error Handling¶
Invalid Job ID (400)¶
Job Not Found (404)¶
Failed Jobs¶
When a job fails, the error
field contains the error message:
{
"id": "01JQRS7X...",
"status": "failed",
"error": "ValueError: Invalid input",
"error_traceback": "Traceback (most recent call last):\n..."
}
Job Deletion During Streaming¶
If a job is deleted while being streamed, the SSE connection sends a final event and closes:
Testing¶
Manual Testing¶
Terminal 1: Start service
Terminal 2: Submit job and stream status
# Submit job and capture job ID
JOB_ID=$(curl -s -X POST http://localhost:8000/api/v1/slow-compute \
-H "Content-Type: application/json" \
-d '{"steps": 30}' | jq -r '.job_id')
echo "Job ID: $JOB_ID"
# Stream status updates in real-time
curl -N http://localhost:8000/api/v1/jobs/$JOB_ID/\$stream
Browser Testing¶
- Submit job via Swagger UI: http://localhost:8000/docs
- Open browser console (F12)
- Run JavaScript:
const jobId = "01JQRS7X..."; // From Swagger response
const es = new EventSource(`/api/v1/jobs/${jobId}/$stream`);
es.onmessage = (e) => console.log(JSON.parse(e.data));
Production Deployment¶
Concurrency Limits¶
Set max_concurrency
to prevent resource exhaustion:
Recommendations: - CPU-bound jobs: Set to number of CPU cores - I/O-bound jobs: Higher limits OK (10-50) - Memory-intensive: Lower limits to prevent OOM
Load Balancers and Proxies¶
SSE requires special configuration for long-lived connections.
nginx:
location /api/v1/jobs {
proxy_pass http://backend;
proxy_buffering off; # Required for SSE
proxy_read_timeout 600s; # Allow long connections
proxy_http_version 1.1;
}
Apache:
<Location /api/v1/jobs>
ProxyPass http://backend
ProxyPassReverse http://backend
ProxyPreserveHost On
SetEnv proxy-nokeepalive 1
</Location>
AWS ALB: - Enable HTTP/2 (supports SSE) - Set idle timeout ≥ 60 seconds
Monitoring¶
Track job metrics:
# Example: Prometheus metrics
from prometheus_client import Counter, Histogram
job_submissions = Counter('jobs_submitted_total', 'Total jobs submitted')
job_duration = Histogram('job_duration_seconds', 'Job execution time')
Troubleshooting¶
Stream Closes Immediately¶
Problem: SSE connection closes right after opening.
Causes: 1. Job already in terminal state 2. Invalid job ID
Solution:
# Check job status first
curl http://localhost:8000/api/v1/jobs/01JQRS7X...
# If completed/failed/canceled, stream will close immediately
No Updates Appearing¶
Problem: Connected but no events streaming.
Causes: 1. cURL buffering enabled 2. Proxy buffering responses
Solution:
# Use -N flag with cURL
curl -N http://localhost:8000/api/v1/jobs/01JQRS7X.../\$stream
# Check proxy configuration (see Production Deployment)
EventSource Not Working in Browser¶
Problem: JavaScript EventSource API fails or doesn't connect.
Causes: 1. CORS issues 2. HTTPS mixed content (HTTPS page, HTTP EventSource) 3. Ad blockers
Solution:
// Check for errors
const es = new EventSource('/api/v1/jobs/01JQRS.../$stream');
es.onerror = (e) => {
console.error('EventSource error:', e);
console.log('ReadyState:', es.readyState); // 0=connecting, 1=open, 2=closed
};
// CORS: Ensure same origin or proper CORS headers
// HTTPS: Use HTTPS for both page and EventSource
// Ad blockers: Disable and test
High CPU Usage¶
Problem: Scheduler consuming excessive CPU.
Causes: 1. Too many concurrent jobs 2. Short poll_interval with many streams
Solution:
# Increase poll interval
curl -N "http://localhost:8000/api/v1/jobs/01JQRS.../\$stream?poll_interval=1.0"
Examples¶
Complete Workflow¶
# 1. Submit job and extract job_id
JOB_ID=$(curl -s -X POST http://localhost:8000/api/v1/slow-compute \
-H "Content-Type: application/json" \
-d '{"steps": 30}' | jq -r '.job_id')
echo "Job ID: $JOB_ID"
# 2. Stream status updates in real-time
curl -N http://localhost:8000/api/v1/jobs/$JOB_ID/\$stream
React Component¶
import { useEffect, useState } from 'react';
function JobStatus({ jobId }) {
const [job, setJob] = useState(null);
useEffect(() => {
const eventSource = new EventSource(`/api/v1/jobs/${jobId}/$stream`);
eventSource.onmessage = (event) => {
const jobData = JSON.parse(event.data);
setJob(jobData);
// Close when finished
if (['completed', 'failed', 'canceled'].includes(jobData.status)) {
eventSource.close();
}
};
eventSource.onerror = () => {
eventSource.close();
};
return () => eventSource.close();
}, [jobId]);
return (
<div>
<h3>Job {jobId}</h3>
<p>Status: {job?.status || 'connecting...'}</p>
{job?.error && <p className="error">{job.error}</p>}
</div>
);
}
Vue Component¶
<template>
<div>
<h3>Job {{ jobId }}</h3>
<p>Status: {{ job?.status || 'connecting...' }}</p>
<p v-if="job?.error" class="error">{{ job.error }}</p>
</div>
</template>
<script setup>
import { ref, onMounted, onUnmounted } from 'vue';
const props = defineProps(['jobId']);
const job = ref(null);
let eventSource;
onMounted(() => {
eventSource = new EventSource(`/api/v1/jobs/${props.jobId}/$stream`);
eventSource.onmessage = (event) => {
const jobData = JSON.parse(event.data);
job.value = jobData;
if (['completed', 'failed', 'canceled'].includes(jobData.status)) {
eventSource.close();
}
};
});
onUnmounted(() => {
eventSource?.close();
});
</script>
Next Steps¶
- ML Workflows: Combine with
.with_ml()
for training jobs - Task Execution: Use with
.with_tasks()
for script execution - Artifact Storage: Jobs can return ULIDs to link results
For more examples:
- examples/job_scheduler_api.py
- Basic job scheduler
- examples/job_scheduler_sse_api.py
- SSE streaming (30s job)
- examples/task_execution_api.py
- Task execution with jobs