Custom and internal dataset management for NSAI with versioning, lineage tracking, and streaming support
Custom/internal dataset management library for North Shore AI.
DatasetsEx is the home for NSAI's own proprietary and custom datasets. It provides comprehensive versioning, lineage tracking, and data curation tools for datasets created and maintained by the NSAI team.
Note on Dataset Libraries: NSAI has two dataset libraries with distinct purposes:
- datasets_ex (this library): For NSAI's own custom/internal/proprietary datasets with full versioning and lineage
- crucible_datasets: For integrating external benchmark datasets via
`hf_datasets_ex` (HuggingFace) plus evaluation workflows

Use `datasets_ex` when creating and managing your own datasets. Use `crucible_datasets` when working with standard ML benchmarks (MMLU, GSM8K, HumanEval, etc.) and evaluation metrics.
- Custom Dataset Creation: Build and manage NSAI's proprietary datasets with flexible schemas
- Versioning & Lineage: Track dataset versions with full lineage history, diffs, and SHA-256 checksums
- Reference Dataset Loading: Built-in loaders for reference datasets (SciFact, FEVER) used in CNS development
- LineageIR Integration: Emit artifact references and provenance edges for dataset pipeline tracking
- Smart Splitting: Train/test splits with support for stratification and k-fold cross-validation
- Multiple Formats: Import/export JSONL, JSON, and CSV formats
- Reproducibility: Deterministic splits with seed support
- Data Transformation: Text normalization, deduplication, sampling, balancing, and augmentation
- Quality Checks: Schema validation, duplicate detection, label distribution analysis, and outlier detection
- Streaming Support: Memory-efficient processing of large datasets with lazy evaluation and parallel processing
Add datasets_ex to your list of dependencies in mix.exs:
def deps do
[
{:datasets_ex, "~> 0.2.0"}
]
end

# Load entire dataset
{:ok, scifact} = DatasetsEx.load(:scifact)
# Load specific split with limit
{:ok, fever_train} = DatasetsEx.load(:fever, split: :train, limit: 1000)
# List available datasets
DatasetsEx.list()
# => [:scifact, :fever, :gsm8k, :human_eval, :mmlu, :truthful_qa, :hellaswag]
# Get dataset info
DatasetsEx.info(:scifact)
# => %{name: :scifact, size: 5183, splits: [:train, :test], ...}

# Load JSONL file
{:ok, dataset} = DatasetsEx.load_file("data.jsonl")
# Load CSV with options
{:ok, dataset} = DatasetsEx.load_file("data.csv",
format: :csv,
schema: :text_classification
)
# Load JSON
{:ok, dataset} = DatasetsEx.load_file("data.json")

{:ok, dataset} = DatasetsEx.create("my_dataset", %{
data: [
%{claim: "The sky is blue", evidence: "...", label: "SUPPORTS"},
%{claim: "The earth is flat", evidence: "...", label: "REFUTES"}
],
schema: :claim_evidence,
metadata: %{
source: "manual_annotation",
created_by: "researcher@example.com"
}
})

# Simple train/test split
{train, test} = DatasetsEx.split(dataset, ratio: 0.8, seed: 42)
# Train/validation/test split
{train, val, test} = DatasetsEx.split_three(dataset,
ratios: [0.7, 0.15, 0.15],
seed: 42
)
# K-fold cross-validation
folds = DatasetsEx.k_fold(dataset, k: 5, seed: 42)
for {train_fold, test_fold} <- folds do
# Train and evaluate on each fold
end
# Stratified split (maintains class distribution)
{train, test} = DatasetsEx.stratified_split(dataset,
label_key: :label,
ratio: 0.8,
seed: 42
)

# Create a version
{:ok, v1} = DatasetsEx.version(dataset, "v1.0.0")
# Load specific version
{:ok, dataset} = DatasetsEx.load_version("my_dataset", "v1.0.0")
# List all versions
DatasetsEx.list_versions("my_dataset")
# => ["v1.0.0", "v1.1.0", "v2.0.0"]
# Get version history
DatasetsEx.lineage("my_dataset")
# => [
# %{version: "v2.0.0", hash: "abc...", created_at: ~U[...], size: 1500},
# %{version: "v1.1.0", hash: "def...", created_at: ~U[...], size: 1200},
# %{version: "v1.0.0", hash: "ghi...", created_at: ~U[...], size: 1000}
# ]

DatasetsEx can emit LineageIR artifact references for dataset nodes and pipeline edges:
{:ok, dataset} =
DatasetsEx.create("my_dataset", %{
data: [%{text: "hello", label: :greeting}],
schema: :text_classification
})
dataset_ref = DatasetsEx.artifact_ref(dataset)
normalized = DatasetsEx.Transform.normalize_text(dataset)
normalized_ref = DatasetsEx.artifact_ref(normalized)
edge =
DatasetsEx.lineage_edge(dataset_ref, normalized_ref,
relationship: "derived_from",
metadata: %{operation: "normalize_text"}
)

# Export to JSONL
DatasetsEx.export(dataset, format: :jsonl, path: "output.jsonl")
# Export to JSON (pretty-printed)
DatasetsEx.export(dataset, format: :json, path: "output.json", pretty: true)
# Export to CSV
DatasetsEx.export(dataset, format: :csv, path: "output.csv")
# Export specific split
DatasetsEx.export(dataset, format: :jsonl, path: "train.jsonl", split: :train)
# Export with limit
DatasetsEx.export(dataset, format: :jsonl, path: "sample.jsonl", limit: 100)

# Get specific split
train_data = DatasetsEx.get_split(dataset, :train)
# List available splits
DatasetsEx.list_splits(dataset)
# => [:train, :test, :validation]
# Get dataset size
DatasetsEx.size(dataset)
# => 5183

datasets_ex/
├── lib/
│ └── datasets_ex/
│ ├── dataset.ex # Core Dataset struct
│ ├── registry.ex # Dataset catalog (GenServer)
│ ├── loader.ex # Multi-format loader
│ ├── loaders/
│ │ ├── scifact.ex # SciFact dataset loader
│ │ ├── fever.ex # FEVER dataset loader
│ │ └── jsonl.ex # Generic JSONL loader
│ ├── lineage.ex # LineageIR artifact refs & provenance edges
│ ├── splitter.ex # Train/test splitting
│ ├── versioning.ex # Version management
│ └── export.ex # Multi-format export
├── priv/
│ └── datasets/ # Cached datasets and versions
└── test/
└── datasets_ex/ # Comprehensive test suite
- Size: 5,183 claims
- Splits: train, test
- Schema: claim_evidence
- Task: Scientific claim verification
- Size: 185,445 claims
- Splits: train, dev, test
- Schema: claim_evidence
- Task: Fact extraction and verification
- Size: 8,500 problems
- Splits: train, test
- Schema: math_word_problems
- Task: Grade school math reasoning
- Size: 164 problems
- Splits: (single dataset)
- Schema: code_generation
- Task: Programming problem solving
- Size: 15,908 questions
- Splits: test, dev, val
- Schema: multiple_choice
- Task: Multitask language understanding (57 subjects)
- Size: 817 questions
- Splits: validation
- Schema: truthfulness
- Task: Truthful question answering
- Size: 70,000 examples
- Splits: train, val, test
- Schema: commonsense_nli
- Task: Commonsense natural language inference
All datasets use a consistent structure:
%DatasetsEx.Dataset{
name: "my_dataset",
data: [%{...}, %{...}], # Optional: flat data
splits: %{train: [...], test: [...]}, # Optional: pre-split data
schema: :claim_evidence,
metadata: %{source: "...", ...},
artifact_id: "550e8400-...", # Auto-generated UUID for lineage
version: "v1.0.0",
hash: "abc123..." # SHA-256 of content
}

All splitting operations support seeded randomization:
# Same seed = same splits
{train1, test1} = DatasetsEx.split(dataset, seed: 42)
{train2, test2} = DatasetsEx.split(dataset, seed: 42)
train1.data == train2.data # => true

Large datasets are processed using Elixir streams:
# Only loads what's needed
{:ok, dataset} = DatasetsEx.load(:fever, limit: 1000, offset: 5000)

# Load SciFact for claim extraction training
{:ok, scifact} = DatasetsEx.load(:scifact, split: :train)
# Create versioned training set
{:ok, v1} = DatasetsEx.version(scifact, "training-v1.0.0")
# Export for external training
DatasetsEx.export(v1, format: :jsonl, path: "training_data.jsonl")

dataset = load_dataset()
folds = DatasetsEx.k_fold(dataset, k: 5, seed: 42)
results = for {train, test} <- folds do
model = train_model(train)
evaluate(model, test)
end
avg_score = Enum.sum(results) / length(results)

# Load raw data
{:ok, raw} = DatasetsEx.load_file("raw_data.jsonl")
# Clean and transform
cleaned_data = Enum.map(raw.data, &clean_example/1)
{:ok, cleaned} = DatasetsEx.create("cleaned_dataset", %{data: cleaned_data})
# Version it
{:ok, v1} = DatasetsEx.version(cleaned, "v1.0.0")
# Split for experiments
{train, val, test} = DatasetsEx.split_three(v1, seed: 42)
# Export splits
DatasetsEx.export(train, format: :jsonl, path: "train.jsonl")
DatasetsEx.export(val, format: :jsonl, path: "val.jsonl")
DatasetsEx.export(test, format: :jsonl, path: "test.jsonl")

Transform datasets with text preprocessing, filtering, and augmentation:
alias DatasetsEx.Transform
# Normalize text
dataset
|> Transform.normalize_text(lowercase: true, trim: true)
# Remove duplicates
|> Transform.deduplicate(:text)
# Filter by condition
|> Transform.filter(fn item -> String.length(item.text) > 10 end)
# Sample random subset
|> Transform.sample(1000, seed: 42)
# Balance classes
|> Transform.balance_classes(strategy: :oversample, label_key: :label)
# Add text noise for augmentation
|> Transform.add_text_noise(char_noise_prob: 0.05, word_drop_prob: 0.1)

Validate and analyze dataset quality:
alias DatasetsEx.Quality
# Validate schema
{:ok, result} = Quality.validate_schema(dataset,
required_keys: [:text, :label],
type_checks: %{text: &is_binary/1, label: &is_atom/1}
)
# Detect duplicates
result = Quality.detect_duplicates(dataset, key: :text, ignore_case: true)
# => %{duplicate_groups: 5, duplicate_rate: 0.12, ...}
# Analyze label distribution
result = Quality.label_distribution(dataset, label_key: :label)
# => %{num_classes: 3, is_balanced: false, distribution: %{...}}
# Profile dataset
profile = Quality.profile(dataset)
# => %{text_stats: %{min_length: 5, max_length: 500, ...}, vocabulary: %{...}}
# Detect outliers
result = Quality.detect_outliers(dataset, field: :score, method: :iqr)
# => %{outlier_count: 12, outlier_indices: [45, 67, ...]}

Process large datasets efficiently without loading everything into memory:
alias DatasetsEx.Stream
# Stream from dataset
dataset
|> Stream.lazy()
|> Stream.map_stream(fn item -> transform(item) end)
|> Stream.filter_stream(fn item -> valid?(item) end)
|> Enum.take(1000)
# Stream from file
Stream.from_file("large_dataset.jsonl", format: :jsonl)
|> Stream.take(10000)
|> Enum.to_list()
# Batch processing
dataset
|> Stream.batch(batch_size: 32)
|> Enum.each(fn batch -> process_batch(batch) end)
# Parallel processing
dataset
|> Stream.lazy()
|> Stream.parallel_map(
fn item -> expensive_operation(item) end,
max_concurrency: 4
)
|> Enum.to_list()

Run the test suite:
mix test
mix test --cover

Current test count: 76 tests
Generate documentation:
mix docs

View at doc/index.html
DatasetsEx is part of the North Shore AI monorepo. Contributions welcome!
MIT