Research-grade instrumentation and metrics collection for AI/ML experiments in Elixir.
CrucibleTelemetry provides specialized observability for rigorous scientific experimentation, going beyond standard production telemetry with features designed for AI/ML research workflows.
- Experiment Isolation — Run multiple experiments concurrently without cross-contamination
- Centralized Event Registry — Programmatic access to all telemetry event definitions
- Rich Metadata Enrichment — Automatic context, timestamps, and custom tags
- ML Training Support — Track epochs, batches, checkpoints, and training metrics
- MetricsStore Port — Pluggable adapter system for persisting training metrics
- Inference Monitoring — Model deployment and inference telemetry
- Pipeline Tracking — Framework stage execution observability
- Streaming Metrics — Real-time latency/cost/reliability stats with O(1) memory
- Time-Window Queries — Fetch last N events or ranges without full rescans
- Multiple Export Formats — CSV, JSON Lines for Python, R, Julia, Excel
- Pause/Resume Lifecycle — Temporarily halt collection without losing state
Add `:crucible_telemetry` to your `mix.exs` dependencies:

```elixir
def deps do
  [
    {:crucible_telemetry, "~> 0.5.0"}
  ]
end
```

```elixir
# Start an experiment
{:ok, experiment} = CrucibleTelemetry.start_experiment(
  name: "bert_finetuning",
  hypothesis: "Fine-tuned BERT achieves >95% accuracy",
  tags: ["training", "bert", "nlp"]
)
# Events are automatically collected via telemetry
# Your existing :telemetry.execute() calls work unchanged
# Stop and analyze
{:ok, _} = CrucibleTelemetry.stop_experiment(experiment.id)
metrics = CrucibleTelemetry.calculate_metrics(experiment.id)
# => %{latency: %{mean: 150.5, p95: 250.0}, cost: %{total: 0.025}, ...}
# Export for analysis
{:ok, path} = CrucibleTelemetry.export(experiment.id, :csv)
```

CrucibleTelemetry provides a centralized registry of all supported telemetry events:

```elixir
# Get all standard events
CrucibleTelemetry.Events.standard_events()
# Get events by category
CrucibleTelemetry.Events.training_events()
CrucibleTelemetry.Events.deployment_events()
CrucibleTelemetry.Events.framework_events()
CrucibleTelemetry.Events.llm_events()
# Get events organized by category
CrucibleTelemetry.Events.events_by_category()
# => %{llm: [...], training: [...], deployment: [...], ...}
# Get info about a specific event
CrucibleTelemetry.Events.event_info([:crucible_train, :epoch, :stop])
# => %{category: :training, description: "Epoch completed with metrics"}
```

| Event | Description |
|---|---|
| `[:req_llm, :request, :start]` | LLM request started |
| `[:req_llm, :request, :stop]` | LLM request completed |
| `[:req_llm, :request, :exception]` | LLM request failed |
| Event | Description | Enriched Fields |
|---|---|---|
| `[:crucible_train, :training, :start]` | Training job started | — |
| `[:crucible_train, :training, :stop]` | Training job completed | — |
| `[:crucible_train, :epoch, :start]` | Epoch started | `epoch` |
| `[:crucible_train, :epoch, :stop]` | Epoch completed | `epoch`, `loss`, `accuracy`, `learning_rate` |
| `[:crucible_train, :batch, :stop]` | Batch completed | `epoch`, `batch`, `loss`, `gradient_norm` |
| `[:crucible_train, :checkpoint, :saved]` | Checkpoint saved | `epoch`, `checkpoint_path` |
| Event | Description | Enriched Fields |
|---|---|---|
| `[:crucible_deployment, :inference, :start]` | Inference started | `model_name`, `model_version` |
| `[:crucible_deployment, :inference, :stop]` | Inference completed | `input_size`, `output_size`, `batch_size` |
| `[:crucible_deployment, :inference, :exception]` | Inference failed | — |
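As a sketch of how serving code might emit these events, `:telemetry.span/3` produces the matching `:start`/`:stop`/`:exception` triple. `MyInference`, `MyModel.predict/1`, and the metadata values below are illustrative, not part of CrucibleTelemetry:

```elixir
defmodule MyInference do
  # Illustrative sketch: MyModel.predict/1 stands in for your own model call,
  # and binary input/output is assumed for the size fields.
  def predict(model_name, model_version, input) do
    :telemetry.span(
      [:crucible_deployment, :inference],
      %{model_name: model_name, model_version: model_version},
      fn ->
        output = MyModel.predict(input)

        # :telemetry.span/3 emits [:crucible_deployment, :inference, :stop]
        # with this map as the stop metadata (and :exception on a raise).
        {output,
         %{
           model_name: model_name,
           model_version: model_version,
           input_size: byte_size(input),
           output_size: byte_size(output)
         }}
      end
    )
  end
end
```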
| Event | Description | Enriched Fields |
|---|---|---|
| `[:crucible_framework, :pipeline, :start]` | Pipeline started | `pipeline_id` |
| `[:crucible_framework, :pipeline, :stop]` | Pipeline completed | `pipeline_id` |
| `[:crucible_framework, :stage, :start]` | Stage started | `stage_name`, `stage_index` |
| `[:crucible_framework, :stage, :stop]` | Stage completed | `stage_name`, `stage_index` |
Other standard events covered by the registry:

- `[:ensemble, :prediction, :start|stop]` — Ensemble predictions
- `[:ensemble, :vote, :completed]` — Voting results
- `[:hedging, :request, :start|duplicated|stop]` — Request hedging
- `[:causal_trace, :event, :created]` — Reasoning traces
- `[:altar, :tool, :start|stop]` — Tool invocations
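Because these are plain `:telemetry` events, you can also observe them directly. A minimal sketch, where the handler id and the logging behavior are illustrative choices rather than part of CrucibleTelemetry:

```elixir
# Minimal sketch: log a couple of the events listed above.
:telemetry.attach_many(
  "my-research-logger",
  [
    [:crucible_train, :epoch, :stop],
    [:crucible_deployment, :inference, :stop]
  ],
  fn event, measurements, metadata, _config ->
    IO.inspect({event, measurements, metadata}, label: "telemetry")
  end,
  nil
)
```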
Track ML training jobs by emitting standard training events:
```elixir
defmodule MyTrainer do
  def train(model, data, epochs) do
    train_start = System.monotonic_time()

    :telemetry.execute(
      [:crucible_train, :training, :start],
      %{system_time: System.system_time()},
      %{model_name: "bert-base", config: %{epochs: epochs}}
    )

    # The comprehension collects each epoch's loss so the final one
    # can be reported when training stops.
    losses =
      for epoch <- 1..epochs do
        epoch_start = System.monotonic_time()

        :telemetry.execute(
          [:crucible_train, :epoch, :start],
          %{system_time: System.system_time()},
          %{epoch: epoch}
        )

        {loss, accuracy} = train_epoch(model, data)

        :telemetry.execute(
          [:crucible_train, :epoch, :stop],
          %{duration: System.monotonic_time() - epoch_start, loss: loss, accuracy: accuracy},
          %{epoch: epoch, learning_rate: get_lr()}
        )

        loss
      end

    :telemetry.execute(
      [:crucible_train, :training, :stop],
      %{duration: System.monotonic_time() - train_start},
      %{final_loss: List.last(losses)}
    )
  end

  # Placeholders for your actual training step and optimizer state
  defp train_epoch(_model, _data), do: {0.0, 0.0}
  defp get_lr, do: 0.001
end
```

The MetricsStore port provides a pluggable adapter system for persisting training metrics to various backends.

```elixir
alias CrucibleTelemetry.Ports.MetricsStore
alias CrucibleTelemetry.Adapters.JSONLMetrics
# Create an adapter reference
adapter = {JSONLMetrics, [path: "/tmp/training/metrics.jsonl"]}
# Record metrics during training
MetricsStore.record(adapter, "run_123", :loss, 2.5, step: 0)
MetricsStore.record(adapter, "run_123", :loss, 1.8, step: 100)
MetricsStore.record(adapter, "run_123", :lr, 0.001, step: 100, metadata: %{epoch: 1})
# Flush any buffered data
MetricsStore.flush(adapter, "run_123")
# Read metrics back
{:ok, entries} = MetricsStore.read(adapter, "run_123")
```

The built-in JSONL adapter writes metrics as newline-delimited JSON:

```json
{"run_id":"run_123","metric":"loss","value":2.5,"step":0,"timestamp":"2025-12-28T10:30:00Z","metadata":{}}
{"run_id":"run_123","metric":"loss","value":1.8,"step":100,"timestamp":"2025-12-28T10:31:00Z","metadata":{}}Implement the CrucibleTelemetry.Ports.MetricsStore behaviour:
defmodule MyApp.Adapters.PostgresMetrics do
  @behaviour CrucibleTelemetry.Ports.MetricsStore

  @impl true
  def record(_opts, _run_id, _metric_name, _value, _record_opts) do
    # Insert into database
    :ok
  end

  @impl true
  def flush(_opts, _run_id), do: :ok

  @impl true
  def read(_opts, _run_id) do
    # Query database and return the stored entries
    {:ok, []}
  end
end
```
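One way to combine the two mechanisms is to persist selected telemetry measurements through a MetricsStore adapter from a handler. A sketch, where the handler id, run id, and file path are illustrative:

```elixir
alias CrucibleTelemetry.Ports.MetricsStore
alias CrucibleTelemetry.Adapters.JSONLMetrics

# Illustrative adapter, run id, and handler id.
adapter = {JSONLMetrics, [path: "/tmp/training/metrics.jsonl"]}

:telemetry.attach(
  "persist-epoch-loss",
  [:crucible_train, :epoch, :stop],
  fn _event, measurements, metadata, _config ->
    # Record the epoch loss, using the epoch number as the step.
    MetricsStore.record(adapter, "run_123", :loss, measurements.loss, step: metadata.epoch)
  end,
  nil
)
```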
`calculate_metrics/1` returns a nested map of computed statistics:

```elixir
metrics = CrucibleTelemetry.calculate_metrics(experiment.id)

# Latency
metrics.latency.mean # Average latency
metrics.latency.p95 # 95th percentile
metrics.latency.p99 # 99th percentile
# Cost
metrics.cost.total # Total cost in USD
metrics.cost.cost_per_1m_requests # Projected cost for 1M requests
# Reliability
metrics.reliability.success_rate # Success rate (0.0-1.0)
metrics.reliability.sla_99 # Meets 99% SLA?
# Tokens
metrics.tokens.total_prompt # Total prompt tokens
metrics.tokens.mean_total # Average tokens per request
```

Real-time metrics update on every collected event:

```elixir
# Get live metrics
metrics = CrucibleTelemetry.StreamingMetrics.get_metrics(experiment.id)
# Reset accumulators
CrucibleTelemetry.StreamingMetrics.reset(experiment.id)
# Stop streaming
CrucibleTelemetry.StreamingMetrics.stop(experiment.id)
```

Query the event store by time window or event count:

```elixir
alias CrucibleTelemetry.Store
# Last 5 minutes
Store.query_window(experiment.id, {:last, 5, :minutes})
# Last 200 events
Store.query_window(experiment.id, {:last_n, 200})
# Specific time range with filter
Store.query_window(experiment.id, {:range, t_start, t_end}, &(&1.success))
# Sliding window metrics (5-min windows, 1-min step)
Store.windowed_metrics(experiment.id, 5 * 60_000_000, 60_000_000)
```

Pause and resume collection without losing state:

```elixir
{:ok, paused} = CrucibleTelemetry.pause_experiment(experiment.id)
# ... maintenance ...
{:ok, resumed} = CrucibleTelemetry.resume_experiment(experiment.id)
CrucibleTelemetry.paused?(experiment.id)  # => true/false
```

Export collected data to files for external analysis:

```elixir
# Export to CSV
{:ok, path} = CrucibleTelemetry.export(experiment.id, :csv,
  path: "results/experiment.csv"
)

# Export to JSON Lines
{:ok, path} = CrucibleTelemetry.export(experiment.id, :jsonl,
  path: "results/experiment.jsonl"
)
```

Run matched experiments and compare them, for example in an A/B test:

```elixir
# Control group
{:ok, control} = CrucibleTelemetry.start_experiment(
  name: "control_single_model",
  condition: "control",
  tags: ["ab_test"]
)

# Treatment group
{:ok, treatment} = CrucibleTelemetry.start_experiment(
  name: "treatment_ensemble",
  condition: "treatment",
  tags: ["ab_test"]
)

# ... run workloads ...

# Compare results
comparison = CrucibleTelemetry.Analysis.compare_experiments([
  control.id,
  treatment.id
])
```

Core `CrucibleTelemetry` functions:

| Function | Description |
|---|---|
| `start_experiment/1` | Start a new experiment |
| `stop_experiment/1` | Stop an experiment |
| `pause_experiment/1` | Pause data collection |
| `resume_experiment/1` | Resume data collection |
| `paused?/1` | Check if experiment is paused |
| `get_experiment/1` | Get experiment details |
| `list_experiments/0` | List all experiments |
| `export/3` | Export data to file |
| `calculate_metrics/1` | Calculate comprehensive metrics |
`CrucibleTelemetry.Events` functions:

| Function | Description |
|---|---|
| `standard_events/0` | All standard telemetry events |
| `training_events/0` | Training-related events |
| `deployment_events/0` | Deployment-related events |
| `framework_events/0` | Framework-related events |
| `llm_events/0` | LLM-related events |
| `events_by_category/0` | Events organized by category |
| `event_info/1` | Get info about a specific event |
`CrucibleTelemetry.Store` functions:

| Function | Description |
|---|---|
| `get_all/1` | Get all events |
| `query/2` | Query with filters |
| `query_window/3` | Time-window queries |
| `windowed_metrics/3` | Sliding-window metrics |
`CrucibleTelemetry.Ports.MetricsStore` callbacks:

| Function | Description |
|---|---|
| `record/5` | Record a metric value at a step |
| `flush/2` | Flush buffered metrics to storage |
| `read/2` | Read all metrics for a run |
- Event handling: <1μs per event (in-memory ETS insert)
- Storage: Up to 1M events in memory (~100-500MB)
- Query: Fast filtering with ETS ordered_set
- Export: Streaming to avoid memory spikes
- Streaming metrics: O(1) space using online algorithms
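For intuition on the O(1) streaming claim: running statistics such as the mean and variance can be maintained in constant space with an online (Welford-style) update. This is a generic sketch, not CrucibleTelemetry's internal code:

```elixir
defmodule OnlineStats do
  # Generic Welford-style accumulator: O(1) space per tracked statistic.
  # Illustrative only; not CrucibleTelemetry's implementation.
  defstruct count: 0, mean: 0.0, m2: 0.0

  def update(%__MODULE__{count: n, mean: mean, m2: m2}, x) do
    n = n + 1
    delta = x - mean
    mean = mean + delta / n
    %__MODULE__{count: n, mean: mean, m2: m2 + delta * (x - mean)}
  end

  def mean(%__MODULE__{mean: mean}), do: mean
  def variance(%__MODULE__{count: n}) when n < 2, do: 0.0
  def variance(%__MODULE__{count: n, m2: m2}), do: m2 / (n - 1)
end

# Example: fold a stream of latencies into constant-size state.
stats = Enum.reduce([120.0, 95.5, 210.0], %OnlineStats{}, &OnlineStats.update(&2, &1))
OnlineStats.mean(stats)
```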
Run the test suite:

```bash
mix test
mix test --cover
```

Planned features:

- PostgreSQL backend for persistent storage
- TimescaleDB support for time-series optimization
- Parquet export format
- LiveView dashboard for real-time monitoring
- Statistical hypothesis testing (t-test, chi-square)
- Continuous aggregates
- S3 archival support
- Multi-node distributed experiments
MIT License — see LICENSE for details.