nagare (ๆตใ) - "flow" in Japanese
Like a nagare carving its path through mountains, Nagare guides your data streams with grace and power
Nagare is a next-generation stream processing library that delivers 5-10x performance improvements over traditional JavaScript stream libraries. Built with Rust/WASM for compute-heavy workloads, SIMD acceleration, and designed specifically for edge computing environments like Cloudflare Workers.
We include a checksum-verified benchmark suite to compare Nagare, RxJS and native loops on common streaming patterns.
- Map + Filter (array sources)
- Reduce (sum) over arrays
- ConcatMap (outer ร inner)
Run:
npm run build:ts && node benches/standard-suite.mjs
Notes:
- The suite validates outputs via checksums to prevent โtoo good to be trueโ results.
- You can toggle array fusion (JIT-compiled kernels) programmatically:
Nagare.setFusionEnabled(true|false)- Default runs report both fusion on/off.
- For WebAssembly tests, use
npm run test:wasmto run the Vitest suite with real WASM.
Nagare.setFusionEnabled(false); // disable array kernels (semantic, slower)
Nagare.setFusionEnabled(true); // enable array kernels (fast)
Nagare.setJitMode('fast'); // or 'off' to disable codegen๐ Benchmark Results (vs RxJS, after WASM warmup)
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Map + Filter : 8.78x faster โก
Scan/Reduce : 7.32x faster โก
Complex Pipeline : 1.52x faster โก
Average Speedup : 5.87x ๐
Note: Performance gains are measured after WASM initialization. Initial cold-start includes ~50ms WASM loading overhead, making Nagare optimal for sustained processing workloads rather than one-off operations.
- First-class Cloudflare Durable Objects support
- WebSocket hibernation for efficient real-time streams
- Optimized for V8 isolates and edge runtimes
- Rust core compiled to WebAssembly
- SIMD acceleration for numeric operations
- Zero-copy BYOB streaming
- JIT compilation and operator fusion
- Full TypeScript support with strict typing
- Comprehensive error handling and recovery
- Credit-based backpressure control
- Battle-tested in production environments
- Familiar RxJS-like API
- Tree-shakeable and optimized bundles
- Extensive documentation and examples
- Works in Node.js, browsers, and edge
npm install @aid-on/nagareyarn add @aid-on/nagarepnpm add @aid-on/nagareimport { Nagare } from '@aid-on/nagare';
// Create and transform a stream
const stream = Nagare.from([1, 2, 3, 4, 5])
.map(x => x * 2)
.filter(x => x > 5)
.scan((acc, x) => acc + x, 0);
// Subscribe with automatic cleanup
using subscription = stream.observe(
value => console.log('Value:', value),
{
onComplete: () => console.log('Complete!'),
onError: error => console.error('Error:', error)
}
);
### Common Aggregates (JIT-optimized)
```typescript
// Numbers
const values = Nagare.from([3, 1, 4, 1, 5]);
const s = await values.sum(); // 14
const mn = await values.min(); // 1
const mx = await values.max(); // 5
const avg = await values.mean();// 2.8
// Find with predicate
const firstEven = await values.find(x => x % 2 === 0); // 4These aggregates use the same array fast paths / fused operator chains as reduce, avoiding intermediate arrays.
### SIMD-Accelerated Processing
```typescript
// Process large Float32Arrays with SIMD
const audioData = new Float32Array(1_000_000);
const processed = await Nagare
.from([audioData])
.mapWasm('fft_transform') // Fast Fourier Transform
.mapWasm('noise_reduction') // SIMD noise reduction
.toArray();
// Float64 support
import { processFloat64Batch, simdMapMulAdd64 } from '@aid-on/nagare';
const doubles = new Float64Array(1_000_000);
const squared = await processFloat64Batch(doubles, 'square');
import { createFromWebSocket } from '@aid-on/nagare';
const wsStream = createFromWebSocket(socket, {
binary: true,
reconnect: true
});
wsStream
.map(msg => JSON.parse(msg.data))
.filter(event => event.type === 'trade')
.scan((volume, trade) => volume + trade.amount, 0)
.observe(volume => {
console.log('Total volume:', volume);
});Nagare<T, E> is the core abstraction - a lazy, composable stream that can be transformed, merged, and observed.
// Multiple ways to create nagares
const fromArray = Nagare.from([1, 2, 3]);
const fromPromise = Nagare.from(Promise.resolve(42));
const fromInterval = nagare.interval(1000);
const fromWebStream = Nagare.fromReadableStream(stream);
// Chainable operators
const pipeline = source
.map(transform)
.filter(predicate)
.debounce(300)
.distinctUntilChanged()
.scan(reducer, initial);import { CreditController, AdaptiveBackpressure } from '@aid-on/nagare';
// Credit-based flow control
const credits = new CreditController({
initialCredits: 100,
lowWaterMark: 20,
highWaterMark: 80
});
// Adaptive backpressure based on latency
const adaptive = new AdaptiveBackpressure({
targetLatency: 50, // ms
initialRate: 100,
minRate: 10,
maxRate: 1000
});const resilient = nagare
.map(riskyOperation)
.rescue(error => {
logger.error('Operation failed:', error);
return fallbackValue; // Recover with default
})
.retry(3, 1000) // Retry up to 3 times
.terminateOnErrorMode(); // Stop on critical errorsexport class StreamProcessor extends DurableObject {
private nagare?: Nagare<any>;
async fetch(request: Request) {
// Create a stateful stream processor
this.nagare = Nagare
.fromWebSocket(request)
.map(this.processMessage)
.buffer(100)
.scan(this.aggregate, {});
return new Response('Stream initialized');
}
async alarm() {
// Process buffered data periodically
const batch = await this.nagare?.take(100).toArray();
await this.processBatch(batch);
}
}// Efficient WebSocket handling with hibernation
export class WebSocketDO extends DurableObject {
async webSocketMessage(ws: WebSocket, message: string) {
// Only wake up when messages arrive
const result = await this.processMessage(message);
ws.send(JSON.stringify(result));
}
async webSocketClose(ws: WebSocket) {
// Clean up on disconnect
this.state.deleteWebSocket(ws);
}
}// Real-time trade aggregation
const trades = Nagare.fromEventSource('/trades')
.map(e => JSON.parse(e.data))
.filter(t => t.symbol === 'BTC/USD')
.windowedAggregate(100, 'mean') // 100-trade moving average
.scan((stats, price) => ({
...stats,
vwap: (stats.vwap * stats.count + price) / (stats.count + 1),
count: stats.count + 1
}), { vwap: 0, count: 0 })
.observe(stats => {
dashboard.update(stats);
});// Process sensor telemetry with SIMD
const telemetry = Nagare
.fromMQTT(mqttClient, 'sensors/+/data')
.map(msg => new Float32Array(msg.payload))
.mapWasm('kalman_filter') // SIMD Kalman filtering
.mapWasm('anomaly_detection') // SIMD anomaly detection
.filter(reading => reading.anomalyScore > 0.8)
.observe(anomaly => {
alerts.send(anomaly);
});// Merge and combine multiple streams
const temperature = Nagare.fromSensor('temp');
const humidity = Nagare.fromSensor('humidity');
const pressure = Nagare.fromSensor('pressure');
// Merge all readings (interleaved)
const allReadings = nagare.merge(temperature, humidity, pressure);
// Combine latest values from each
const combined = nagare.combine(temperature, humidity, pressure)
.map(([t, h, p]) => ({
temperature: t,
humidity: h,
pressure: p,
timestamp: Date.now()
}));โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ TypeScript API Layer โ
โ (Reactive operators, type safety) โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ WASM Bridge (wasm-bindgen) โ
โ (Automatic memory management) โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ Rust Core Engine โ
โ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โ
โ โ SIMD โ โ Buffer โ โ Flow โ โ
โ โ Kernels โ โ Pool โ โ Control โ โ
โ โโโโโโโโโโโโ โโโโโโโโโโโโ โโโโโโโโโโโโ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ Runtime Adapters โ
โ (Node.js / Browser / Edge Workers) โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Nagare.from(source) // From iterable/async iterable
Nagare.of(...values) // From values
Nagare.empty() // Empty stream
nagare.range(0, 100) // Numeric range
nagare.interval(1000) // Periodic emissions
createFromWebSocket(ws) // WebSocket stream
createFromEventSource(url) // Server-sent events
createFromFetch(url, opts) // HTTP pollingmap(fn) // Transform values
filter(predicate) // Filter values
scan(reducer, seed) // Accumulate values
take(n) // Take first n
skip(n) // Skip first n
distinctUntilChanged() // Emit on change
debounce(ms) // Debounce emissions
throttle(ms) // Throttle emissions
buffer(size) // Buffer values
bufferTime(ms) // Time-based buffer
pairwise() // Emit consecutive pairs
startWith(...values) // Prepend valuesmerge(...nagares) // Merge streams
fork(predicate) // Split stream
concatMap(fn) // Sequential flatten
switchMap(fn) // Cancel previous
combineLatest(...nagares) // Combine latest values
withLatestFrom(nagare) // Sample other streamrescue(handler) // Recover from errors
retry(count, delay) // Retry on failure
terminateOnErrorMode() // Stop on errormapWasm(kernel, params) // Apply WASM kernel
windowedAggregate(n, op) // SIMD aggregation# Run all tests
npm test
# Run benchmarks
npm run bench
# Run specific benchmark
npm run bench:rxjs
npm run bench:wasm
# Test in browser
npm run test:browser# wrangler.toml
name = "nagare-app"
main = "dist/worker.js"
compatibility_date = "2024-01-01"
[[durable_objects.bindings]]
name = "STREAM_DO"
class_name = "StreamProcessor"npm run build:worker
wrangler deploy// Automatic WASM loading
import { nagare } from '@aid-on/nagare';
const result = await nagare
.from(data)
.mapWasm('simd_kernel')
.toArray();<script type="module">
import { Nagare } from 'https://cdn.skypack.dev/@aid-on/nagare';
const stream = Nagare.from([1, 2, 3])
.map(x => x * 2)
.observe(console.log);
</script>MIT ยฉ Aid-On
Making streams flow faster at the edge
nagare.combineLatest(a, b, ...) behaves like RxJS combineLatest: it emits only after all sources have produced at least one value, then re-emits when any source updates.
const ab = nagare.combineLatest(streamA, streamB);
const out = await ab.toArray();
nagare.combine(...) remains a zip-like pairing of next values.
When each outer element expands to a small, fixed-size array, concatMapArray avoids generator overhead and can preallocate output:
import { concatMapArray, Nagare } from '@aid-on/nagare';
const out = await concatMapArray((x: number) => [x, x + 1])(Nagare.from([1,2,3])).toArray();
// => [1,2, 2,3, 3,4]