diff --git a/chans/batch.go b/chans/batch.go index 417995f..5e1e003 100644 --- a/chans/batch.go +++ b/chans/batch.go @@ -5,6 +5,10 @@ import ( "time" ) +// Batch groups items from an input channel into batches based on a maximum size and a timeout. +// A batch is emitted when it reaches the maximum size, the timeout expires, or the input channel closes. +// To emit batches only when full, set the timeout to -1. This function never emits empty batches. +// The timeout countdown starts when the first item is added to a new batch. func Batch[A any](in <-chan A, n int, timeout time.Duration) <-chan []A { if in == nil { return nil @@ -91,6 +95,7 @@ func Batch[A any](in <-chan A, n int, timeout time.Duration) <-chan []A { return out } +// Unbatch is the inverse of Batch. It takes a channel of batches and emits individual items. func Unbatch[A any](in <-chan []A) <-chan A { if in == nil { return nil diff --git a/chans/core.go b/chans/core.go index ee9cb48..9c66cac 100644 --- a/chans/core.go +++ b/chans/core.go @@ -2,30 +2,42 @@ package chans import "github.com/destel/rill/internal/common" +// Map applies a transformation function to each item in an input channel, using n goroutines for concurrency. +// The output order is not guaranteed: results are written to the output as soon as they're ready. +// Use OrderedMap to preserve the input order. func Map[A, B any](in <-chan A, n int, f func(A) B) <-chan B { return common.MapOrFilter(in, n, func(a A) (B, bool) { return f(a), true }) } +// OrderedMap is similar to Map, but it guarantees that the output order is the same as the input order. func OrderedMap[A, B any](in <-chan A, n int, f func(A) B) <-chan B { return common.OrderedMapOrFilter(in, n, func(a A) (B, bool) { return f(a), true }) } +// Filter removes items that do not meet a specified condition, using n goroutines for concurrent processing. +// The output order is not guaranteed: results are written to the output as soon as they're ready. +// Use OrderedFilter to preserve the input order. func Filter[A any](in <-chan A, n int, f func(A) bool) <-chan A { return common.MapOrFilter(in, n, func(a A) (A, bool) { return a, f(a) }) } +// OrderedFilter is similar to Filter, but it guarantees that the output order is the same as the input order. func OrderedFilter[A any](in <-chan A, n int, f func(A) bool) <-chan A { return common.OrderedMapOrFilter(in, n, func(a A) (A, bool) { return a, f(a) }) } +// FlatMap applies a function to each item in an input channel, where the function returns a channel of items. +// These items are then flattened into a single output channel. Uses n goroutines for concurrency. +// The output order is not guaranteed: results are written to the output as soon as they're ready. +// Use OrderedFlatMap to preserve the input order. func FlatMap[A, B any](in <-chan A, n int, f func(A) <-chan B) <-chan B { var zero B return common.MapOrFlatMap(in, n, func(a A) (b B, bb <-chan B, flat bool) { @@ -33,6 +45,7 @@ func FlatMap[A, B any](in <-chan A, n int, f func(A) <-chan B) <-chan B { }) } +// OrderedFlatMap is similar to FlatMap, but it guarantees that the output order is the same as the input order. func OrderedFlatMap[A, B any](in <-chan A, n int, f func(A) <-chan B) <-chan B { var zero B return common.OrderedMapOrFlatMap(in, n, func(a A) (b B, bb <-chan B, flat bool) { @@ -40,13 +53,16 @@ func OrderedFlatMap[A, B any](in <-chan A, n int, f func(A) <-chan B) <-chan B { }) } -// blocking -// todo: explain that if false has been returned for item[i] that it's guranteed that function would have been called for all previous items +// ForEach applies a function to each item in an input channel using n goroutines. The function blocks until +// all items are processed or the function returns false. In case of early termination, ForEach ensures +// the input channel is drained to avoid goroutine leaks, making it safe for use in environments where cleanup is crucial. +// While this function does not guarantee the order of item processing due to its concurrent nature, +// using n = 1 results in sequential processing, as in a simple for-range loop. func ForEach[A any](in <-chan A, n int, f func(A) bool) { if n == 1 { for a := range in { if !f(a) { - defer DrainNB(in) + DrainNB(in) break } } diff --git a/chans/delay.go b/chans/delay.go index f6b1f2d..32a2cb3 100644 --- a/chans/delay.go +++ b/chans/delay.go @@ -71,6 +71,8 @@ type delayedValue[A any] struct { SendAt time.Time } +// Delay postpones the delivery of items from an input channel by a specified duration, maintaining the order. +// Useful for adding delays in processing or simulating latency. func Delay[A any](in <-chan A, delay time.Duration) <-chan A { wrapped := make(chan delayedValue[A]) go func() { diff --git a/chans/merge.go b/chans/merge.go index 6b23e24..873f106 100644 --- a/chans/merge.go +++ b/chans/merge.go @@ -77,6 +77,8 @@ func slowMerge[A any](ins []<-chan A) <-chan A { return out } +// Merge combines multiple input channels into a single output channel. Items are emitted as soon as they're available, +// so the output order is not defined. func Merge[A any](ins ...<-chan A) <-chan A { switch len(ins) { case 0: @@ -90,6 +92,11 @@ func Merge[A any](ins ...<-chan A) <-chan A { } } +// Split2 divides the input channel into two output channels based on the discriminator function f, using n goroutines for concurrency. +// The function f takes an item from the input and decides which output channel (out0 or out1) it should go to by returning 0 or 1, respectively. +// Return values other than 0 or 1 lead to the item being discarded. +// The output order is not guaranteed: results are written to the outputs as soon as they're ready. +// Use OrderedSplit2 to preserve the input order. func Split2[A any](in <-chan A, n int, f func(A) int) (out0 <-chan A, out1 <-chan A) { outs := common.MapAndSplit(in, 2, n, func(a A) (A, int) { return a, f(a) @@ -97,6 +104,7 @@ func Split2[A any](in <-chan A, n int, f func(A) int) (out0 <-chan A, out1 <-cha return outs[0], outs[1] } +// OrderedSplit2 is similar to Split2, but it guarantees that the order of the outputs matches the order of the input. func OrderedSplit2[A any](in <-chan A, n int, f func(A) int) (out0 <-chan A, out1 <-chan A) { outs := common.OrderedMapAndSplit(in, 2, n, func(a A) (A, int) { return a, f(a) diff --git a/chans/util.go b/chans/util.go index 02b8a1b..31b6261 100644 --- a/chans/util.go +++ b/chans/util.go @@ -1,14 +1,18 @@ package chans +// Drain consumes and discards all items from an input channel, blocking until the channel is closed func Drain[A any](in <-chan A) { for range in { } } +// DrainNB is a non-blocking version of Drain. func DrainNB[A any](in <-chan A) { go Drain(in) } +// Buffer takes a channel of items and returns a buffered channel of exact same items in the same order. +// This is useful when you want to write to the input channel without blocking the writer. func Buffer[A any](in <-chan A, n int) <-chan A { // we use n+1 since 1 additional is held on the stack (x variable) out := make(chan A, n-1) @@ -23,6 +27,7 @@ func Buffer[A any](in <-chan A, n int) <-chan A { return out } +// FromSlice converts a slice into a channel. func FromSlice[A any](slice []A) <-chan A { out := make(chan A, len(slice)) for _, a := range slice { @@ -32,6 +37,7 @@ func FromSlice[A any](slice []A) <-chan A { return out } +// ToSlice converts a channel into a slice. func ToSlice[A any](in <-chan A) []A { var res []A for x := range in { diff --git a/echans/README.md b/echans/README.md new file mode 100644 index 0000000..9c3fcfc --- /dev/null +++ b/echans/README.md @@ -0,0 +1,189 @@ +# Rill +Rill (noun: a small stream) is a comprehensive Go toolkit for streaming, parallel processing, and pipeline construction. +Designed to reduce boilerplate and simplify usage, it empowers developers to focus on core logic +without getting bogged down by the complexity of concurrency. + + +## Key features +- **Lightweight**: fast and modular, can be easily integrated into existing projects +- **Easy to use**: the complexity of managing goroutines, wait groups, and error handling is abstracted away +- **Concurrent**: control the level of concurrency for all operations +- **Batching**: provides a simple way to organize and process data in batches +- **Error Handling**: provides a structured way to handle errors in concurrent apps +- **Streaming**: handles real-time data streams or large datasets with a minimal memory footprint +- **Order Preservation**: offers functions that preserve the original order of data, while still allowing for concurrent processing +- **Functional Programming**: based on functional programming concepts, making operations like map, filter, flatMap and others available for channel-based workflows +- **Generic**: all operations are type-safe and can be used with any data type + + +## Installation +```bash +go get github.com/destel/rill +``` + +## Example +A function that fetches keys from multiple URLs, retrieves their values from a Redis database, and prints them. +This example demonstrates the library's strengths in handling concurrent tasks, error propagation, batching and data streaming, +all while maintaining simplicity and efficiency. +See full runnable example at examples/redis-read/main.go + +```go +type KV struct { + Key string + Value string +} + +func printValues(ctx context.Context, urls []string) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() // In case of error or early exit, this ensures all http and redis operations are canceled + + // Convert URLs into a channel + urlsChan := echans.FromSlice(urls) + + // Fetch and stream keys from each URL concurrently + keys := echans.FlatMap(urlsChan, 10, func(url string) <-chan echans.Try[string] { + return streamKeys(ctx, url) + }) + + // Exclude any empty keys from the stream + keys = echans.Filter(keys, 5, func(key string) (bool, error) { + return key != "", nil + }) + + // Organize keys into manageable batches of 10 for bulk operations + keyBatches := echans.Batch(keys, 10, 1*time.Second) + + // Fetch values from Redis for each batch of keys + resultBatches := echans.Map(keyBatches, 5, func(keys []string) ([]KV, error) { + values, err := redisMGet(ctx, keys...) + if err != nil { + return nil, err + } + + results := make([]KV, len(keys)) + for i, key := range keys { + results[i] = KV{Key: key, Value: values[i]} + } + + return results, nil + }) + + // Convert batches back to a single items for final processing + results := echans.Unbatch(resultBatches) + + // Exclude any empty values from the stream + results = echans.Filter(results, 5, func(kv KV) (bool, error) { + return kv.Value != "", nil + }) + + // Iterate over each key-value pair and print + cnt := 0 + err := echans.ForEach(results, 1, func(kv KV) error { + fmt.Println(kv.Key, "=>", kv.Value) + cnt++ + return nil + }) + fmt.Println("Total keys:", cnt) + + return err +} + + + + +``` + + +## Design philosophy +At the heart of rill lies a simple yet powerful concept: operating on channels of wrapped values, encapsulated by the Try structure. +Such channels can be created manually or through utilities like **FromSlice**, **Wrap**, and **WrapAsync**, and then transformed via operations +such as **Map**, **Filter**, **FlatMap** and others. Finally when all processing stages are completed, the data can be consumed by +**ForEach**, **ToSlice** or manually by iterating over the resulting channel. + + + +## Batching +Batching is a common pattern in concurrent processing, especially when dealing with external services or databases. +Rill provides a Batch function that organizes a stream of items into batches of a specified size. It's also possible +to specify a timeout, after which the batch is emitted even if it's not full. This is useful for keeping an app reactive +when input stream is slow or sparse. + + + + + +## Error handling +In the examples above errors are handled using **ForEach**, which is good for most use cases. +**ForEach** stops processing on the first error and returns it. If you need to handle error in the middle of pipeline, +and continue processing, there is a **Catch** function that can be used for that. + +```go +results := echans.Map(input, 10, func(item int) (int, error) { + // do some processing +}) + +results = echans.Catch(results, 5, func(err error) { + if errors.Is(err, sql.ErrNoRows) { + return nil // ignore this error + } else { + return fmt.Errorf("error processing item: %w", err) // wrap error and continue processing + } +}) + +err := echans.ForEach(results, 1, func(item int) error { + // process results as usual +}) +``` + + +## Order preservation +There are use cases where it's necessary to preserve the original order of data, while still allowing for concurrent processing. +Below is an example function that fetches temperature measurements for each day in a specified range +and prints temperature movements for each day. OrderedMap function fetches measurements in parallel, but returns them in chronological order. +This allows the next stage of processing to calculate temperature differences between consecutive days. +See full runnable example at examples/weather/main.go + +```go +type Measurement struct { + Date time.Time + Temp float64 + Movement float64 +} + +func printTemperatureMovements(ctx context.Context, city string, startDate, endDate time.Time) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() // In case of error or early exit, this ensures all http are canceled + + // Make a channel that emits all the days between startDate and endDate + days := make(chan echans.Try[time.Time]) + go func() { + defer close(days) + for date := startDate; date.Before(endDate); date = date.AddDate(0, 0, 1) { + days <- echans.Try[time.Time]{V: date} + } + }() + + // Download the temperature for each day in parallel and in order + measurements := echans.OrderedMap(days, 10, func(date time.Time) (Measurement, error) { + temp, err := getTemperature(ctx, city, date) + return Measurement{Date: date, Temp: temp}, err + }) + + // Calculate the temperature movements. Use a single goroutine + prev := Measurement{Temp: math.NaN()} + measurements = echans.OrderedMap(measurements, 1, func(m Measurement) (Measurement, error) { + m.Movement = m.Temp - prev.Temp + prev = m + return m, nil + }) + + // Iterate over the measurements and print the movements + err := echans.ForEach(measurements, 1, func(m Measurement) error { + fmt.Printf("%s: %.1f°C (movement %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, m.Movement) + prev = m + return nil + }) + + return err +} +``` \ No newline at end of file diff --git a/echans/batch.go b/echans/batch.go index b88cd9f..deec960 100644 --- a/echans/batch.go +++ b/echans/batch.go @@ -6,12 +6,17 @@ import ( "github.com/destel/rill/chans" ) +// Batch groups items from an input channel into batches based on a maximum size and a timeout. +// A batch is emitted when it reaches the maximum size, the timeout expires, or the input channel closes. +// To emit batches only when full, set the timeout to -1. This function never emits empty batches. +// The timeout countdown starts when the first item is added to a new batch. func Batch[A any](in <-chan Try[A], n int, timeout time.Duration) <-chan Try[[]A] { values, errs := Unwrap(in) batches := chans.Batch(values, n, timeout) return WrapAsync(batches, errs) } +// Unbatch is the inverse of Batch. It takes a channel of batches and emits individual items. func Unbatch[A any](in <-chan Try[[]A]) <-chan Try[A] { batches, errs := Unwrap(in) values := chans.Unbatch(batches) diff --git a/echans/core.go b/echans/core.go index 0339e75..690964f 100644 --- a/echans/core.go +++ b/echans/core.go @@ -7,6 +7,10 @@ import ( "github.com/destel/rill/internal/common" ) +// Map applies a transformation function to each item in an input channel, using n goroutines for concurrency. +// If an error is encountered, either from the function f itself or from upstream it is forwarded to the output for further handling. +// The output order is not guaranteed: results are written to the output as soon as they're ready. +// Use [OrderedMap] to preserve the input order. func Map[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] { return common.MapOrFilter(in, n, func(a Try[A]) (Try[B], bool) { if a.Error != nil { @@ -22,6 +26,7 @@ func Map[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] }) } +// OrderedMap is similar to [Map], but it guarantees that the output order is the same as the input order. func OrderedMap[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan Try[B] { return common.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[B], bool) { if a.Error != nil { @@ -37,6 +42,10 @@ func OrderedMap[A, B any](in <-chan Try[A], n int, f func(A) (B, error)) <-chan }) } +// Filter removes items that do not meet a specified condition, using n goroutines for concurrency. +// If an error is encountered, either from the function f itself or from upstream it is forwarded to the output for further handling. +// The output order is not guaranteed: results are written to the output as soon as they're ready. +// Use [OrderedFilter] to preserve the input order. func Filter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[A] { return common.MapOrFilter(in, n, func(a Try[A]) (Try[A], bool) { if a.Error != nil { @@ -52,6 +61,7 @@ func Filter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[ }) } +// OrderedFilter is similar to [Filter], but it guarantees that the output order is the same as the input order. func OrderedFilter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-chan Try[A] { return common.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[A], bool) { if a.Error != nil { @@ -67,6 +77,10 @@ func OrderedFilter[A any](in <-chan Try[A], n int, f func(A) (bool, error)) <-ch }) } +// FlatMap applies a function to each item in an input channel, where the function returns a channel of items. +// These items are then flattened into a single output channel. Uses n goroutines for concurrency. +// The output order is not guaranteed: results are written to the output as soon as they're ready. +// Use [OrderedFlatMap] to preserve the input order. func FlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan Try[B] { return common.MapOrFlatMap(in, n, func(a Try[A]) (b Try[B], bb <-chan Try[B], flat bool) { if a.Error != nil { @@ -76,6 +90,7 @@ func FlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan }) } +// OrderedFlatMap is similar to [FlatMap], but it guarantees that the output order is the same as the input order. func OrderedFlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) <-chan Try[B] { return common.OrderedMapOrFlatMap(in, n, func(a Try[A]) (b Try[B], bb <-chan Try[B], flat bool) { if a.Error != nil { @@ -85,6 +100,10 @@ func OrderedFlatMap[A, B any](in <-chan Try[A], n int, f func(A) <-chan Try[B]) }) } +// Catch allows handling errors from the input channel using n goroutines for concurrency. +// When f returns nil, error is considered handled and filtered out; otherwise it is replaced by the result of f. +// The output order is not guaranteed: results are written to the output as soon as they're ready. +// Use [OrderedCatch] to preserve the input order. func Catch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] { return common.MapOrFilter(in, n, func(a Try[A]) (Try[A], bool) { if a.Error == nil { @@ -100,6 +119,7 @@ func Catch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] { }) } +// OrderedCatch is similar to [Catch], but it guarantees that the output order is the same as the input order. func OrderedCatch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Try[A] { return common.OrderedMapOrFilter(in, n, func(a Try[A]) (Try[A], bool) { if a.Error == nil { @@ -115,6 +135,13 @@ func OrderedCatch[A any](in <-chan Try[A], n int, f func(error) error) <-chan Tr }) } +// ForEach applies a function f to each item in an input channel using n goroutines for parallel processing. The function +// blocks until all items are processed or an error is encountered, either from the function f itself or from upstream. +// In case of an error leading to early termination, ForEach ensures the input channel is drained to avoid goroutine leaks, +// making it safe for use in environments where cleanup is crucial. The function returns the first encountered error, or nil +// if all items were processed successfully. +// While this function does not guarantee the order of item processing due to its concurrent nature, +// using n = 1 results in sequential processing, as in a simple for-range loop. func ForEach[A any](in <-chan Try[A], n int, f func(A) error) error { var retErr error var once sync.Once diff --git a/echans/delay.go b/echans/delay.go index 800d597..72b776a 100644 --- a/echans/delay.go +++ b/echans/delay.go @@ -6,6 +6,8 @@ import ( "github.com/destel/rill/chans" ) +// Delay postpones the delivery of items from an input channel by a specified duration, maintaining the order. +// Useful for adding delays in processing or simulating latency. func Delay[A any](in <-chan A, delay time.Duration) <-chan A { return chans.Delay(in, delay) } diff --git a/echans/doc.go b/echans/doc.go new file mode 100644 index 0000000..c62d064 --- /dev/null +++ b/echans/doc.go @@ -0,0 +1,5 @@ +// Package rill is a Go toolkit designed for efficient and straightforward streaming, parallel processing, and pipeline construction. +// It abstracts away the complexities of concurrency management, enabling developers to focus on core logic. +// With features like lightweight integration, batch processing, error handling, and support for functional programming paradigms, +// rill enhances productivity in building concurrent applications. It offers type-safe operations, and minimizes memory usage even for large data sets. +package echans diff --git a/echans/merge.go b/echans/merge.go index 4f3650f..e55c60a 100644 --- a/echans/merge.go +++ b/echans/merge.go @@ -7,10 +7,19 @@ import ( "github.com/destel/rill/internal/common" ) +// Merge combines multiple input channels into a single output channel. Items are emitted as soon as they're available, +// so the output order is not defined. func Merge[A any](ins ...<-chan A) <-chan A { return chans.Merge(ins...) } +// Split2 divides the input channel into two output channels based on the discriminator function f, using n goroutines for concurrency. +// The function f takes an item from the input and decides which output channel (out0 or out1) it should go to by returning 0 or 1, respectively. +// Return values other than 0 or 1 lead to the item being discarded. +// If an error is encountered, either from the function f itself or from upstream it is intentionally sent +// to one of the output channels in a non-deterministic manner. +// The output order is not guaranteed: results are written to the outputs as soon as they're ready. +// Use [OrderedSplit2] to preserve the input order. func Split2[A any](in <-chan Try[A], n int, f func(A) (int, error)) (out0 <-chan Try[A], out1 <-chan Try[A]) { outs := common.MapAndSplit(in, 2, n, func(a Try[A]) (Try[A], int) { if a.Error != nil { @@ -28,6 +37,7 @@ func Split2[A any](in <-chan Try[A], n int, f func(A) (int, error)) (out0 <-chan return outs[0], outs[1] } +// OrderedSplit2 is similar to [Split2], but it guarantees that the order of the outputs matches the order of the input. func OrderedSplit2[A any](in <-chan Try[A], n int, f func(A) (int, error)) (out0 <-chan Try[A], out1 <-chan Try[A]) { outs := common.OrderedMapAndSplit(in, 2, n, func(a Try[A]) (Try[A], int) { if a.Error != nil { diff --git a/echans/util.go b/echans/util.go index e421aab..74af150 100644 --- a/echans/util.go +++ b/echans/util.go @@ -4,18 +4,28 @@ import ( "github.com/destel/rill/chans" ) +// Drain consumes and discards all items from an input channel, blocking until the channel is closed func Drain[A any](in <-chan A) { chans.Drain(in) } +// DrainNB is a non-blocking version of [Drain]. func DrainNB[A any](in <-chan A) { chans.DrainNB(in) } +// Buffer takes a channel of items and returns a buffered channel of exact same items in the same order. +// This is useful when you want to write to the input channel without blocking the writer. +// +// Typical use case would look like +// +// ids = Buffer(ids, 100) +// // Now up to 100 ids can be buffered if subsequent stages of the pipeline are slow func Buffer[A any](in <-chan A, n int) <-chan A { return chans.Buffer(in, n) } +// FromSlice converts a slice into a channel. func FromSlice[A any](slice []A) <-chan Try[A] { out := make(chan Try[A], len(slice)) for _, a := range slice { @@ -25,6 +35,9 @@ func FromSlice[A any](slice []A) <-chan Try[A] { return out } +// ToSlice converts a channel into a slice. +// Conversion stops at the first error encountered. +// In case of an error, ToSlice ensures the input channel is drained to avoid goroutine leaks, func ToSlice[A any](in <-chan Try[A]) ([]A, error) { var res []A diff --git a/echans/wrap.go b/echans/wrap.go index f0ab4ec..d36fa83 100644 --- a/echans/wrap.go +++ b/echans/wrap.go @@ -5,11 +5,15 @@ import ( "github.com/destel/rill/internal/common" ) +// Try is a container for a value or an error type Try[A any] struct { V A Error error } +// Wrap converts a regular channel of items into a channel of items wrapped in a [Try] container. +// Additionally, this function can also take an error, which will be added to the output channel. +// Either the input channel or the error can be nil, but not both simultaneously. func Wrap[A any](values <-chan A, err error) <-chan Try[A] { if values == nil && err == nil { return nil @@ -31,6 +35,9 @@ func Wrap[A any](values <-chan A, err error) <-chan Try[A] { return out } +// WrapAsync converts a regular channel of items into a channel of items wrapped in a [Try] container. +// Additionally, this function can also take a channel of errors, that will be added to the output channel. +// Either the input channel or the error channel can be nil, but not both simultaneously. func WrapAsync[A any](values <-chan A, errs <-chan error) <-chan Try[A] { wrappedValues := chans.Map(values, 1, func(a A) Try[A] { return Try[A]{V: a} @@ -52,6 +59,7 @@ func WrapAsync[A any](values <-chan A, errs <-chan error) <-chan Try[A] { return chans.Merge(wrappedErrs, wrappedValues) } +// Unwrap converts a channel of [Try] containers into a channel of values and a channel of errors. func Unwrap[A any](in <-chan Try[A]) (<-chan A, <-chan error) { if in == nil { return nil, nil diff --git a/examples/redis-read/ids1.txt b/examples/redis-read/ids1.txt new file mode 100644 index 0000000..6c99644 --- /dev/null +++ b/examples/redis-read/ids1.txt @@ -0,0 +1,100 @@ +id1000 +id1001 +id1002 +id1003 +id1004 +id1005 +id1006 +id1007 +id1008 +id1009 +id1010 +id1011 +id1012 +id1013 +id1014 +id1015 +id1016 +id1017 +id1018 +id1019 +id1020 +id1021 +id1022 +id1023 +id1024 +id1025 +id1026 +id1027 +id1028 +id1029 +id1030 +id1031 +id1032 +id1033 +id1034 +id1035 +id1036 +id1037 +id1038 +id1039 +id1040 +id1041 +id1042 +id1043 +id1044 +id1045 +id1046 +id1047 +id1048 +id1049 +id1050 +id1051 +id1052 +id1053 +id1054 +id1055 +id1056 +id1057 +id1058 +id1059 +id1060 +id1061 +id1062 +id1063 +id1064 +id1065 +id1066 +id1067 +id1068 +id1069 +id1070 +id1071 +id1072 +id1073 +id1074 +id1075 +id1076 +id1077 +id1078 +id1079 +id1080 +id1081 +id1082 +id1083 +id1084 +id1085 +id1086 +id1087 +id1088 +id1089 +id1090 +id1091 +id1092 +id1093 +id1094 +id1095 +id1096 +id1097 +id1098 +id1099 diff --git a/examples/redis-read/ids2.txt b/examples/redis-read/ids2.txt new file mode 100644 index 0000000..8806c99 --- /dev/null +++ b/examples/redis-read/ids2.txt @@ -0,0 +1,50 @@ +id2000 +id2001 +id2002 +id2003 +id2004 +id2005 +id2006 +id2007 +id2008 +id2009 +id2010 +id2011 +id2012 +id2013 +id2014 +id2015 +id2016 +id2017 +id2018 +id2019 +id2020 +id2021 +id2022 +id2023 +id2024 +id2025 +id2026 +id2027 +id2028 +id2029 +id2030 +id2031 +id2032 +id2033 +id2034 +id2035 +id2036 +id2037 +id2038 +id2039 +id2040 +id2041 +id2042 +id2043 +id2044 +id2045 +id2046 +id2047 +id2048 +id2049 diff --git a/examples/redis-read/ids3.txt b/examples/redis-read/ids3.txt new file mode 100644 index 0000000..9714c2b --- /dev/null +++ b/examples/redis-read/ids3.txt @@ -0,0 +1,23 @@ +id3000 +id3001 +id3002 +id3003 + +id3004 +id3005 +id3006 +id3007 +id3008 +id3009 + +id3010 +id3011 +id3012 +id3013 +id3014 + +id3015 +id3016 +id3017 +id3018 +id3019 diff --git a/examples/redis-read/main.go b/examples/redis-read/main.go new file mode 100644 index 0000000..8ed741e --- /dev/null +++ b/examples/redis-read/main.go @@ -0,0 +1,158 @@ +package main + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "math/rand" + "net/http" + "strings" + "time" + + "github.com/destel/rill/echans" +) + +type KV struct { + Key string + Value string +} + +func main() { + err := printValues(context.Background(), []string{ + "https://raw.githubusercontent.com/destel/rill/f/docs/examples/redis-read/ids1.txt", + "https://raw.githubusercontent.com/destel/rill/f/docs/examples/redis-read/ids2.txt", + "https://raw.githubusercontent.com/destel/rill/f/docs/examples/redis-read/ids3.txt", + }) + + if err != nil { + fmt.Println("Error:", err) + } +} + +// printValues orchestrates a pipeline that fetches keys from URLs, retrieves their values from Redis, and prints them. +// The pipeline leverages concurrency for fetching and processing and uses batching to reduce the number of Redis calls. +func printValues(ctx context.Context, urls []string) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() // In case of error, this ensures all http and redis operations are canceled + + // Convert URLs into a channel + urlsChan := echans.FromSlice(urls) + + // Fetch and stream keys from each URL concurrently + keys := echans.FlatMap(urlsChan, 10, func(url string) <-chan echans.Try[string] { + return streamKeys(ctx, url) + }) + + // Exclude any empty keys from the stream + keys = echans.Filter(keys, 5, func(key string) (bool, error) { + return key != "", nil + }) + + // Organize keys into manageable batches of 10 for bulk operations + keyBatches := echans.Batch(keys, 10, 1*time.Second) + + // Fetch values from Redis for each batch of keys + resultBatches := echans.Map(keyBatches, 5, func(keys []string) ([]KV, error) { + values, err := redisMGet(ctx, keys...) + if err != nil { + return nil, err + } + + results := make([]KV, len(keys)) + for i, key := range keys { + results[i] = KV{Key: key, Value: values[i]} + } + + return results, nil + }) + + // Convert batches back to a single items for final processing + results := echans.Unbatch(resultBatches) + + // Exclude any empty values from the stream + results = echans.Filter(results, 5, func(kv KV) (bool, error) { + return kv.Value != "", nil + }) + + // Iterate over each key-value pair and print + cnt := 0 + err := echans.ForEach(results, 1, func(kv KV) error { + fmt.Println(kv.Key, "=>", kv.Value) + cnt++ + return nil + }) + fmt.Println("Total keys:", cnt) + + return err +} + +// streamKeys reads a file from the given URL line by line and returns a channel of lines/keys +func streamKeys(ctx context.Context, url string) <-chan echans.Try[string] { + out := make(chan echans.Try[string], 1) + + go func() { + defer close(out) + + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + out <- echans.Try[string]{Error: err} + return + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + out <- echans.Try[string]{Error: err} + return + } + defer res.Body.Close() + + r := bufio.NewReader(res.Body) + + for { + line, err := r.ReadString('\n') + line = strings.TrimSuffix(line, "\n") + + if errors.Is(err, io.EOF) { + out <- echans.Try[string]{V: line} + return + } + if err != nil { + out <- echans.Try[string]{Error: err} + return + } + + out <- echans.Try[string]{V: line} + } + }() + + return out +} + +// redisMGet emulates a batch Redis read operation. It returns the values for the given keys. +func redisMGet(ctx context.Context, keys ...string) ([]string, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + + // Emulate a network delay + randomSleep(1000 * time.Millisecond) + + values := make([]string, len(keys)) + for i, key := range keys { + // Emulate that some keys are missing + if strings.HasSuffix(key, "0") || strings.HasSuffix(key, "5") { + values[i] = "" + continue + } + + values[i] = "val" + strings.TrimPrefix(key, "id") + } + + return values, nil +} + +func randomSleep(max time.Duration) { + time.Sleep(time.Duration(rand.Intn(int(max)))) +} diff --git a/examples/weather/main.go b/examples/weather/main.go new file mode 100644 index 0000000..ab5f999 --- /dev/null +++ b/examples/weather/main.go @@ -0,0 +1,91 @@ +package main + +import ( + "context" + "fmt" + "math" + "math/rand" + "time" + + "github.com/destel/rill/echans" +) + +type Measurement struct { + Date time.Time + Temp float64 + Movement float64 +} + +func main() { + endDate := time.Now() + startDate := endDate.AddDate(0, 0, -30) + + err := printTemperatureMovements(context.Background(), "New York", startDate, endDate) + if err != nil { + fmt.Println("Error:", err) + } +} + +// printTemperatureMovements orchestrates a pipeline that fetches temperature measurements for a given city and +// prints the daily temperature movements. Measurements are fetched concurrently, but the movements are calculated +// in order, using a single goroutine. +func printTemperatureMovements(ctx context.Context, city string, startDate, endDate time.Time) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() // In case of error, this ensures all pending operations are canceled + + // Make a channel that emits all the days between startDate and endDate + days := make(chan echans.Try[time.Time]) + go func() { + defer close(days) + for date := startDate; date.Before(endDate); date = date.AddDate(0, 0, 1) { + days <- echans.Try[time.Time]{V: date} + } + }() + + // Download the temperature for each day in parallel and in order + measurements := echans.OrderedMap(days, 10, func(date time.Time) (Measurement, error) { + temp, err := getTemperature(ctx, city, date) + return Measurement{Date: date, Temp: temp}, err + }) + + // Calculate the temperature movements. Use a single goroutine + prev := Measurement{Temp: math.NaN()} + measurements = echans.OrderedMap(measurements, 1, func(m Measurement) (Measurement, error) { + m.Movement = m.Temp - prev.Temp + prev = m + return m, nil + }) + + // Iterate over the measurements and print the movements + err := echans.ForEach(measurements, 1, func(m Measurement) error { + fmt.Printf("%s: %.1f°C (movement %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, m.Movement) + prev = m + return nil + }) + + return err +} + +func getTemperature(ctx context.Context, city string, date time.Time) (float64, error) { + if err := ctx.Err(); err != nil { + return 0, err + } + + // Simulate a network request + randomSleep(1000 * time.Millisecond) + + // Basic city hash, to make measurements unique for each city + var h float64 + for _, c := range city { + h += float64(c) + } + + // Simulate a temperature reading, by retuning a pseudo-random, but deterministic value + temp := 15 - 10*math.Sin(h+float64(date.Unix())) + + return temp, nil +} + +func randomSleep(max time.Duration) { + time.Sleep(time.Duration(rand.Intn(int(max)))) +}