Unified job scheduler for the North-Shore-AI platform
NsaiWork provides a protocol-first, multi-tenant job scheduling system with priority queues, resource-aware scheduling, and pluggable backend execution.
- Unified Job IR: Single representation for all work types (tool calls, experiments, training, inference)
- Priority Queues: Four levels (realtime/interactive/batch/offline) with FIFO ordering
- Multi-Tenancy: Tenant isolation with quotas and fairness
- Resource-Aware: CPU, GPU, memory, cost-aware scheduling
- Pluggable Backends: Local, ALTAR, Modal, Ray, and custom backends
- Retry Policies: Configurable exponential/linear/constant backoff
- Telemetry Integration: Full observability via Erlang telemetry
- ETS-Based Registry: Fast in-memory job storage and indexing
Add nsai_work to your list of dependencies in mix.exs:
def deps do
[
{:nsai_work, "~> 0.1.0"}
]
end

# Start the application
{:ok, _} = Application.ensure_all_started(:nsai_work)
# Create a job
job = NsaiWork.Job.new(
kind: :tool_call,
tenant_id: "acme",
namespace: "default",
priority: :interactive,
payload: %{tool: "calculator", args: [2, 2]},
resources: NsaiWork.Resources.new(cpu: 1.0, memory_mb: 512)
)
# Submit for execution
{:ok, submitted} = NsaiWork.submit(job)
# Check status
{:ok, job} = NsaiWork.get(submitted.id)
IO.inspect(job.status) # => :running or :succeeded
# Get statistics
stats = NsaiWork.stats()
IO.inspect(stats)

Runnable examples are in the examples/ directory:
- basic_job.exs: Submit and track a job
- priority_queues.exs: Priority queue routing
- custom_backend.exs: Implement a custom backend
- telemetry_events.exs: Observe telemetry events
- retry_policies.exs: Configure retry behavior
Run any example with:
mix run examples/basic_job.exs

Or run all examples:
./examples/run_all.sh

NsaiWork consists of several cooperating components:
┌─────────────────────────────────────────────┐
│              Application Layer              │
│    (Crucible, Synapse, CNS, ALTAR, etc.)    │
└──────────────────────┬──────────────────────┘
                       │ submit jobs
                       ▼
┌─────────────────────────────────────────────┐
│             NsaiWork.Scheduler              │
│  - Admission control                        │
│  - Priority queue routing                   │
│  - Resource-aware dispatch                  │
└──────────────────────┬──────────────────────┘
                       │
     ┌─────────────┬───┴────────┬────────────┐
     ▼             ▼            ▼            ▼
 [realtime]  [interactive]   [batch]     [offline]
     │             │            │            │
     └─────────────┴───┬────────┴────────────┘
                       │
                       ▼
┌─────────────────────────────────────────────┐
│              NsaiWork.Executor              │
│  - Backend selection                        │
│  - Job execution                            │
│  - Retry handling                           │
└──────────────────────┬──────────────────────┘
                       │
     ┌─────────────┬───┴────────┬────────────┐
     ▼             ▼            ▼            ▼
  [Local]       [ALTAR]      [Modal]     [Custom]
- NsaiWork.Job: Universal job IR with metadata, resources, constraints
- NsaiWork.Scheduler: Priority queue management and admission control
- NsaiWork.Queue: FIFO queues per priority level
- NsaiWork.Executor: Job execution with backend delegation
- NsaiWork.Registry: ETS-based job storage and indexing
- NsaiWork.Backend: Pluggable execution backends
- NsaiWork.Telemetry: Event instrumentation
Jobs move through the following states:
pending -> queued -> running -> {succeeded | failed | canceled | timeout}
failed -> (retry) -> queued
- pending: Job created, waiting for admission
- queued: Admitted and waiting in priority queue
- running: Executing on a backend
- succeeded: Completed successfully
- failed: Execution failed (may retry)
- canceled: Explicitly canceled
- timeout: Exceeded time limit
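Callers that need to block until a job reaches a terminal state can poll NsaiWork.get/1. The helper below is an illustrative sketch built only on the public API shown above; the module name, polling interval, and the {:error, reason} shape for unknown jobs are assumptions, not part of the library:

defmodule MyApp.JobWait do
  @terminal [:succeeded, :failed, :canceled, :timeout]

  # Poll NsaiWork.get/1 until the job leaves pending/queued/running.
  def await(job_id, poll_ms \\ 250) do
    case NsaiWork.get(job_id) do
      {:ok, %{status: status} = job} when status in @terminal ->
        {:ok, job}

      {:ok, _in_progress} ->
        Process.sleep(poll_ms)
        await(job_id, poll_ms)

      {:error, reason} ->
        # Assumed shape when the job id is unknown
        {:error, reason}
    end
  end
end

With this, {:ok, job} = MyApp.JobWait.await(submitted.id) blocks the caller until the scheduler finishes the job.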
Jobs are scheduled based on four priority levels:
- realtime: Immediate execution, user-blocking operations
- interactive: User-facing operations with quick turnaround
- batch: Background processing, experiments, training
- offline: Bulk operations, data processing, low priority
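For example, the same payload can be submitted at different priorities depending on who is waiting on it, using only the Job API shown earlier:

# User-blocking call: routed to the realtime queue
urgent = NsaiWork.Job.new(
  kind: :tool_call,
  tenant_id: "acme",
  namespace: "default",
  priority: :realtime,
  payload: %{tool: "calculator", args: [2, 2]}
)

# Same work, but queued behind interactive traffic
background = NsaiWork.Job.new(
  kind: :tool_call,
  tenant_id: "acme",
  namespace: "default",
  priority: :offline,
  payload: %{tool: "calculator", args: [2, 2]}
)

{:ok, _} = NsaiWork.submit(urgent)
{:ok, _} = NsaiWork.submit(background)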
Specify resource requirements for intelligent scheduling:
resources = NsaiWork.Resources.new(
cpu: 4.0, # CPU cores
memory_mb: 8192, # Memory in MB
gpu: "A100", # GPU type
gpu_count: 1, # Number of GPUs
timeout_ms: 300_000, # 5 minute timeout
max_cost_usd: 1.0 # Cost cap
)
job = NsaiWork.Job.new(
kind: :training_step,
tenant_id: "acme",
namespace: "ml",
payload: %{model: "llama-3.1-8b"},
resources: resources
)

Configure retry behavior per job:
constraints = NsaiWork.Constraints.new(
retry_policy: %{
max_attempts: 5,
backoff: :exponential,
base_delay_ms: 1000,
max_delay_ms: 60_000,
jitter: true
}
)
job = NsaiWork.Job.new(
kind: :tool_call,
tenant_id: "acme",
namespace: "default",
payload: %{},
constraints: constraints
)
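With the policy above, the delay before retry n grows roughly as base_delay_ms * 2^(n - 1), capped at max_delay_ms, with optional jitter. The sketch below illustrates that arithmetic only; the module and function names are hypothetical and NsaiWork's actual backoff code may differ:

# Hypothetical sketch of exponential backoff with full jitter (not NsaiWork internals).
defmodule MyApp.BackoffSketch do
  def delay_ms(attempt, base_delay_ms \\ 1_000, max_delay_ms \\ 60_000, jitter? \\ true) do
    # attempt is 1-based: ~1s, 2s, 4s, 8s, 16s ... capped at max_delay_ms
    delay = min(base_delay_ms * Integer.pow(2, attempt - 1), max_delay_ms)
    if jitter?, do: :rand.uniform(delay), else: delay
  end
end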
NsaiWork ships with three built-in backends:

Local backend: direct BEAM process execution (development, lightweight tasks):
# Automatically selected for simple jobs
job = NsaiWork.Job.new(
kind: :tool_call,
tenant_id: "test",
namespace: "default",
payload: %{tool: "echo"}
)

ALTAR backend: executes tool calls via ALTAR's LATER runtime with ADM function declarations:
# Enable ALTAR in config
config :nsai_work,
enable_altar: true,
altar_registry: NsaiWork.AltarRegistry
# Register a tool
NsaiWork.AltarTools.register(
"proposer_extract",
"Extract claims from document",
%{
type: :OBJECT,
properties: %{
"doc_id" => %{type: :STRING, description: "Document ID"},
"max_claims" => %{type: :NUMBER, description: "Max claims"}
},
required: ["doc_id"]
},
&MyApp.CNS.Proposer.extract/1
)
# Submit a tool call job
job = NsaiWork.Job.new(
kind: :tool_call,
tenant_id: "acme",
namespace: "cns",
payload: %{
tool_name: "proposer_extract",
args: %{doc_id: "doc_123", max_claims: 10}
}
)
{:ok, job} = NsaiWork.submit(job)

See the ALTAR Integration section for more details.
Mock backend: a testing backend with configurable behavior:
# Configure mock to fail
NsaiWork.Backends.Mock.configure(behavior: {:fail, "Simulated failure"})
# Or delay execution
NsaiWork.Backends.Mock.configure(delay_ms: 500)
# Check execution history
history = NsaiWork.Backends.Mock.history()
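A minimal test sketch combining the Mock calls above. It assumes the executor routes jobs to the Mock backend in the test environment and that execution finishes quickly enough to be observed after a short wait; adjust for your setup:

defmodule MyApp.MockBackendTest do
  use ExUnit.Case, async: false

  test "failed executions are recorded in the mock history" do
    # Make every mock execution fail
    NsaiWork.Backends.Mock.configure(behavior: {:fail, "Simulated failure"})

    job =
      NsaiWork.Job.new(
        kind: :tool_call,
        tenant_id: "test",
        namespace: "default",
        payload: %{tool: "echo"}
      )

    {:ok, _submitted} = NsaiWork.submit(job)

    # Give the (possibly asynchronous) executor a moment to run the job
    Process.sleep(100)

    assert NsaiWork.Backends.Mock.history() != []
  end
end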
Custom backends implement the NsaiWork.Backend behaviour:

defmodule MyApp.Backend.Custom do
@behaviour NsaiWork.Backend
@impl true
def execute(job) do
# Your execution logic goes here; return {:ok, result} or {:error, reason}
{:ok, %{handled: job.payload}}
end
@impl true
def supports?(job) do
job.kind == :custom_work
end
end
# Configure executor with custom backend
NsaiWork.Executor.start_link(
backends: [
local: NsaiWork.Backends.Local,
custom: MyApp.Backend.Custom
]
)

NsaiWork emits telemetry events for observability:
# Attach a handler
:telemetry.attach(
"nsai-work-metrics",
[:nsai_work, :job, :completed],
&MyApp.Metrics.handle_event/4,
nil
)
# Or use the built-in console logger
NsaiWork.Telemetry.attach_console_logger()

- [:nsai_work, :job, :submitted]: Job submitted
- [:nsai_work, :job, :queued]: Job enqueued
- [:nsai_work, :job, :started]: Job execution started
- [:nsai_work, :job, :completed]: Job completed
- [:nsai_work, :job, :canceled]: Job canceled
- [:nsai_work, :job, :retry]: Job retry scheduled
- [:nsai_work, :scheduler, :admit]: Admission decision
- [:nsai_work, :scheduler, :select_backend]: Backend selection
- [:nsai_work, :queue, :enqueue]: Queue enqueue
- [:nsai_work, :queue, :dequeue]: Queue dequeue
- [:nsai_work, :queue, :overflow]: Queue capacity exceeded
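The &MyApp.Metrics.handle_event/4 handler referenced above is not part of NsaiWork; a minimal sketch is shown below. The measurement and metadata keys (:duration, :job) are assumptions and should be checked against the events your version actually emits:

defmodule MyApp.Metrics do
  require Logger

  # Telemetry handler for [:nsai_work, :job, :completed] events.
  # The shapes of `measurements` and `metadata` are assumptions for this sketch.
  def handle_event([:nsai_work, :job, :completed], measurements, metadata, _config) do
    duration_ms =
      case measurements do
        %{duration: native} -> System.convert_time_unit(native, :native, :millisecond)
        _ -> nil
      end

    job_id = metadata[:job] && metadata[:job].id
    Logger.info("job completed id=#{inspect(job_id)} duration_ms=#{inspect(duration_ms)}")
  end
end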
All jobs belong to a tenant and namespace:
job = NsaiWork.Job.new(
tenant_id: "customer-123",
namespace: "production",
# ...
)
# List jobs for a tenant
jobs = NsaiWork.list("customer-123")
# Filter by namespace
jobs = NsaiWork.list("customer-123", namespace: "production")
# Filter by status
running = NsaiWork.list("customer-123", status: :running)
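Building on NsaiWork.list/1, a small sketch that tallies one tenant's jobs by status (it assumes list/1 returns job structs with a :status field, as in the examples above):

# Count a tenant's jobs per status, e.g. %{running: 3, succeeded: 12, failed: 1}
"customer-123"
|> NsaiWork.list()
|> Enum.group_by(& &1.status)
|> Map.new(fn {status, jobs} -> {status, length(jobs)} end)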
ALTAR (Adaptive Language Tool and Action Runtime) provides a local execution runtime for tool calls with ADM (function declaration) support. The ALTAR backend enables NsaiWork to execute tool calls through ALTAR's LATER runtime.

- Add ALTAR to your dependencies (already included with NsaiWork):
{:altar, "~> 0.2.0"}

- Enable ALTAR in your application config:
# config/config.exs
config :nsai_work,
enable_altar: true,
altar_registry: NsaiWork.AltarRegistry

Use NsaiWork.AltarTools to register tools with ALTAR:
# Simple tool
NsaiWork.AltarTools.register(
"calculator_add",
"Add two numbers",
%{
type: :OBJECT,
properties: %{
"a" => %{type: :NUMBER, description: "First number"},
"b" => %{type: :NUMBER, description: "Second number"}
},
required: ["a", "b"]
},
fn %{"a" => a, "b" => b} -> {:ok, a + b} end
)
# Complex tool with nested objects
NsaiWork.AltarTools.register(
"search_documents",
"Search document corpus with filters",
%{
type: :OBJECT,
properties: %{
"query" => %{type: :STRING, description: "Search query"},
"limit" => %{type: :NUMBER, description: "Max results"},
"filters" => %{
type: :OBJECT,
properties: %{
"category" => %{type: :STRING},
"date_range" => %{
type: :OBJECT,
properties: %{
"start" => %{type: :STRING},
"end" => %{type: :STRING}
}
}
}
}
},
required: ["query"]
},
&MyApp.Search.search/1
)

Tool functions must:
- Accept a single argument (a map of parameters)
- Return {:ok, result} on success
- Return {:error, reason} on failure
Example:
defmodule MyApp.CNS.Proposer do
def extract(%{"doc_id" => doc_id} = args) do
max_claims = Map.get(args, "max_claims", 10)
case fetch_document(doc_id) do
{:ok, document} ->
claims = extract_claims(document, max_claims)
{:ok, %{claims: claims, doc_id: doc_id}}
{:error, reason} ->
{:error, "Failed to extract claims: #{reason}"}
end
end
end

Create and submit jobs with the :tool_call kind:
job = NsaiWork.Job.new(
kind: :tool_call,
tenant_id: "acme",
namespace: "default",
priority: :interactive,
payload: %{
tool_name: "proposer_extract",
args: %{
doc_id: "doc_123",
max_claims: 5
}
}
)
{:ok, submitted_job} = NsaiWork.submit(job)
# Check the result once the job has finished (poll or use telemetry events as needed)
case NsaiWork.get(submitted_job.id) do
{:ok, %{status: :succeeded, result: result}} ->
IO.inspect(result)
{:ok, %{status: :failed, error: error}} ->
IO.puts("Job failed: #{error.message}")
{:ok, %{status: status}} ->
IO.puts("Job still #{status}")
end

# Check if tool exists
if NsaiWork.AltarTools.registered?("calculator_add") do
IO.puts("Tool is available")
end

For CNS (Critic-Network Synthesis) agents:
# Register Proposer agent tool
NsaiWork.AltarTools.register(
"cns_proposer",
"Extract structured claims from documents",
%{
type: :OBJECT,
properties: %{
"document" => %{type: :STRING, description: "Source document"},
"schema" => %{type: :STRING, description: "Target schema"}
},
required: ["document"]
},
&CNS.Agents.Proposer.extract/1
)
# Register Antagonist agent tool
NsaiWork.AltarTools.register(
"cns_antagonist",
"Find contradictions in claim networks",
%{
type: :OBJECT,
properties: %{
"sno_graph" => %{type: :OBJECT, description: "SNO graph"},
"beta1_threshold" => %{type: :NUMBER, description: "β₁ threshold"}
},
required: ["sno_graph"]
},
&CNS.Agents.Antagonist.critique/1
)
# Execute dialectical workflow
proposer_job = NsaiWork.Job.new(
kind: :tool_call,
tenant_id: "research",
namespace: "cns",
priority: :batch,
payload: %{
tool_name: "cns_proposer",
args: %{document: document_text, schema: "scifact"}
}
)
{:ok, proposer_result} = NsaiWork.submit(proposer_job)
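To complete the dialectical workflow, the Proposer's output can be fed to the Antagonist once its job succeeds. The sketch below polls with NsaiWork.get/1 and assumes the Proposer's result can be passed directly as the sno_graph argument; the polling loop and field usage are illustrative, not prescribed by NsaiWork:

# Poll until the proposer job reaches a terminal state (illustrative polling loop)
wait_for = fn wait_for, id ->
  case NsaiWork.get(id) do
    {:ok, %{status: :succeeded} = job} ->
      {:ok, job}

    {:ok, %{status: status} = job} when status in [:failed, :canceled, :timeout] ->
      {:error, job}

    {:ok, _in_progress} ->
      Process.sleep(200)
      wait_for.(wait_for, id)
  end
end

{:ok, proposer_done} = wait_for.(wait_for, proposer_result.id)

# Feed the proposer output to the Antagonist (assumes the result carries the SNO graph)
antagonist_job = NsaiWork.Job.new(
  kind: :tool_call,
  tenant_id: "research",
  namespace: "cns",
  priority: :batch,
  payload: %{
    tool_name: "cns_antagonist",
    args: %{sno_graph: proposer_done.result}
  }
)

{:ok, _} = NsaiWork.submit(antagonist_job)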
Crucible pipelines can wrap experiment stages as NsaiWork jobs, for example:

defmodule Crucible.Pipeline.Runner do
def run_stage(experiment, stage, context) do
job = NsaiWork.Job.new(
kind: :experiment_step,
tenant_id: context[:tenant_id],
namespace: experiment.id,
priority: :batch,
payload: %{
experiment_id: experiment.id,
stage_name: stage.name,
input_context: context
},
constraints: NsaiWork.Constraints.new(
concurrency_group: "experiment:#{experiment.id}"
)
)
NsaiWork.submit(job)
end
end

ALTAR's GRID dispatcher can route tool calls through NsaiWork in the same way:

defmodule ALTAR.GRID do
def dispatch(tool, input, opts \\ []) do
job = NsaiWork.Job.new(
kind: :tool_call,
tenant_id: opts[:tenant_id] || "default",
namespace: opts[:namespace] || "default",
priority: opts[:priority] || :interactive,
payload: %{
tool_id: tool.id,
function_name: tool.name,
arguments: input
},
resources: translate_resources(tool.resources)
)
NsaiWork.submit(job)
end
end
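The translate_resources/1 helper used above is not defined in this example. A hypothetical sketch follows; in practice these clauses could be private functions inside ALTAR.GRID, and the field names read from the tool's resource hints are assumptions:

# Hypothetical helper module mapping a tool's resource hints to NsaiWork resources.
defmodule ALTAR.GRID.ResourceMapper do
  def translate_resources(nil), do: NsaiWork.Resources.new(cpu: 1.0, memory_mb: 512)

  def translate_resources(tool_resources) do
    [
      cpu: Map.get(tool_resources, :cpu, 1.0),
      memory_mb: Map.get(tool_resources, :memory_mb, 512),
      gpu: Map.get(tool_resources, :gpu),
      gpu_count: Map.get(tool_resources, :gpu_count)
    ]
    # Drop unspecified fields before building the resources struct
    |> Enum.reject(fn {_key, value} -> is_nil(value) end)
    |> NsaiWork.Resources.new()
  end
end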
Roadmap:

- Persistent storage backend (Postgres/Ecto)
- Tenant quotas and rate limiting
- ALTAR backend integration
- Modal.com backend adapter
- Ray backend adapter
- Job dependencies and DAGs
- Concurrency groups
- Cost tracking and budgets
- Web UI for monitoring
This is part of the North-Shore-AI platform. Contributions welcome!
MIT License - see LICENSE file for details