Labeling queue and governance toolkit for human-in-the-loop workflows. Anvil provides GenServer-based queues for fast, in-memory work plus a Postgres/Oban pipeline for production-grade exports, telemetry, and retention.
Add Anvil to your dependencies:
{:anvil_ex, "~> 0.1.1"}- Schema-driven validation with typed fields (
:text,:select,:multiselect,:range,:number,:boolean,:date,:datetime) - Pluggable assignment policies: round-robin, random, weighted expertise, redundancy (k labels per sample), or custom policy modules
- Storage adapters for ETS (default) and Postgres (
Anvil.Storage.Postgres) with Ecto schemas for queues, assignments, labels, schema versions, and audit logs - Agreement metrics (Cohen, Fleiss, Krippendorff) with telemetry and background recomputation
- PII-aware exports with redaction, pseudonyms, manifests, and reproducibility verification
- Background jobs via Oban for timeouts, agreement recompute, and retention sweeps
- Optional Forge sample bridge and simple ACL helpers for queue membership
# 1) Define a schema
schema =
Anvil.Schema.new(
name: "sentiment",
fields: [
%Anvil.Schema.Field{
name: "sentiment",
type: :select,
required: true,
options: ["positive", "negative", "neutral"]
}
]
)
# 2) Start a queue (ETS storage by default)
{:ok, queue} =
Anvil.create_queue(
queue_id: "sentiment_queue",
schema: schema,
labels_per_sample: 2,
policy: :round_robin
)
# 3) Load work and labelers
Anvil.add_samples(queue, [%{id: "s1", text: "Great product!"}])
Anvil.add_labelers(queue, ["alice", "bob"])
# 4) Pull and start an assignment
{:ok, assignment} = Anvil.get_next_assignment(queue, "alice")
{:ok, assignment} = Anvil.Queue.start_assignment(queue, assignment.id)
# 5) Submit a label (validated against the schema)
{:ok, label} =
Anvil.submit_label(queue, assignment.id, %{"sentiment" => "positive"})
# 6) Fetch labels and compute agreement
labels = Anvil.Queue.get_labels(queue)
{:ok, score} = Anvil.Agreement.compute(labels):round_robin– walks samples in order:random– random sample from available set:expertise– weighted expertise policy (:expertise_scores,:min_expertise, optional:difficulty_field):redundancy– prioritize under-labeled samples; default whenlabels_per_sample > 1- Custom module – pass a module or
{module, config}that implementsAnvil.Queue.Policy
- ETS (default): zero-dependency, in-memory storage for tests and ephemeral queues.
- Postgres: use
Anvil.Storage.Postgres(defaults toAnvil.Repo). Requires the host app to provide the database schema matching the Ecto modules underAnvil.Schema.*; migrations are not bundled in this repo.
{:ok, queue} =
Anvil.create_queue(
queue_id: "prod_queue",
schema: schema,
storage: Anvil.Storage.Postgres
)Anvil.Agreement.compute/2auto-selects Cohen/Fleiss based on rater count or acceptsmetric: :cohen | :fleiss | :krippendorff.- Helpers:
compute_for_field/3,compute_all_dimensions/3, andsummary/3for per-field rollups. - Telemetry emits low-agreement events when scores drop below 0.6.
-
Manifested export (ADR-005): deterministic ordering, SHA256 manifest, optional PII redaction and pseudonyms. Requires Postgres data with schema versions.
{:ok, %{manifest: manifest, output_path: path}} = Anvil.Export.to_format(:csv, queue_id, %{ schema_version_id: schema_version_id, output_path: "/tmp/labels.csv", redaction_mode: :automatic }) {:ok, :reproducible} = Anvil.Export.verify_reproducibility(manifest)
-
Legacy export: works with ETS queues; serializes current in-memory labels.
:ok = Anvil.Export.export(queue, format: :csv, path: "labels.csv")
- PII metadata on fields (
pii,retention_days,redaction_policy) drives redaction and retention. Anvil.PII.Redactorsupports strip/truncate/hash/regex policies and payload redaction modes (:none,:automatic,:aggressive).
-
Anvil ships a Plug/Cowboy server for
/v1IR endpoints. Enable it with:# config/config.exs or runtime.exs config :anvil, :api_server, enabled: true, port: 4101
-
In test, the server is disabled by default. Start your Anvil app (or release) to expose the API for HTTP clients; Ingot’s default adapter expects this.
-
Anvil.PII.Retentionand theAnvil.Workers.RetentionSweepOban job enforce retention windows and optional soft/hard deletion. -
Labeler pseudonyms available via
Anvil.PII.Pseudonym.
- Oban cron (see
config/config.exs): timeout sweeps, agreement recompute, retention sweeps. - Telemetry events cover queue creation, assignment dispatch/completion, validation errors, exports, agreement, and storage queries.
Anvil.ForgeBridgefetches samples via pluggable backends (Direct,HTTP,Cached,Mock) with Cachex-based caching.
Anvil.Auth.ACLprovides queue membership checks (:labeler,:reviewer,:owner) plus helpers for granting/revoking access.- Signed URL and OIDC helpers are available under
Anvil.Auth.
mix test
mix docsMIT License - see LICENSE for details.
Built by the North Shore AI team for the machine learning community.