Documentation (#7)
destel authored Mar 20, 2024
1 parent f14a30b commit f38fe7e
Showing 18 changed files with 721 additions and 3 deletions.
5 changes: 5 additions & 0 deletions chans/batch.go
@@ -5,6 +5,10 @@ import (
"time"
)

// Batch groups items from an input channel into batches based on a maximum size and a timeout.
// A batch is emitted when it reaches the maximum size, the timeout expires, or the input channel closes.
// To emit batches only when full, set the timeout to -1. This function never emits empty batches.
// The timeout countdown starts when the first item is added to a new batch.
func Batch[A any](in <-chan A, n int, timeout time.Duration) <-chan []A {
if in == nil {
return nil
@@ -91,6 +95,7 @@ func Batch[A any](in <-chan A, n int, timeout time.Duration) <-chan []A {
return out
}

// Unbatch is the inverse of Batch. It takes a channel of batches and emits individual items.
func Unbatch[A any](in <-chan []A) <-chan A {
if in == nil {
return nil
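
For illustration, a minimal usage sketch of `Batch` and `Unbatch`, based on the signatures above (`FromSlice` is defined in chans/util.go further down):

```go
package main

import (
    "fmt"
    "time"

    "github.com/destel/rill/chans"
)

func main() {
    in := chans.FromSlice([]int{1, 2, 3, 4, 5, 6, 7})

    // Batches of up to 3 items; a partial batch is emitted one second
    // after its first item arrives, or when the input channel closes.
    batches := chans.Batch(in, 3, 1*time.Second)

    // Flatten the batches back into individual items.
    items := chans.Unbatch(batches)

    for x := range items {
        fmt.Println(x)
    }
}
```
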
22 changes: 19 additions & 3 deletions chans/core.go
@@ -2,51 +2,67 @@ package chans

import "github.com/destel/rill/internal/common"

// Map applies a transformation function to each item in an input channel, using n goroutines for concurrency.
// The output order is not guaranteed: results are written to the output as soon as they're ready.
// Use OrderedMap to preserve the input order.
func Map[A, B any](in <-chan A, n int, f func(A) B) <-chan B {
return common.MapOrFilter(in, n, func(a A) (B, bool) {
return f(a), true
})
}

// OrderedMap is similar to Map, but it guarantees that the output order is the same as the input order.
func OrderedMap[A, B any](in <-chan A, n int, f func(A) B) <-chan B {
return common.OrderedMapOrFilter(in, n, func(a A) (B, bool) {
return f(a), true
})
}

// Filter removes items that do not meet a specified condition, using n goroutines for concurrent processing.
// The output order is not guaranteed: results are written to the output as soon as they're ready.
// Use OrderedFilter to preserve the input order.
func Filter[A any](in <-chan A, n int, f func(A) bool) <-chan A {
return common.MapOrFilter(in, n, func(a A) (A, bool) {
return a, f(a)
})
}

// OrderedFilter is similar to Filter, but it guarantees that the output order is the same as the input order.
func OrderedFilter[A any](in <-chan A, n int, f func(A) bool) <-chan A {
return common.OrderedMapOrFilter(in, n, func(a A) (A, bool) {
return a, f(a)
})
}

// FlatMap applies a function to each item in an input channel, where the function returns a channel of items.
// These items are then flattened into a single output channel. Uses n goroutines for concurrency.
// The output order is not guaranteed: results are written to the output as soon as they're ready.
// Use OrderedFlatMap to preserve the input order.
func FlatMap[A, B any](in <-chan A, n int, f func(A) <-chan B) <-chan B {
var zero B
return common.MapOrFlatMap(in, n, func(a A) (b B, bb <-chan B, flat bool) {
return zero, f(a), true
})
}

// OrderedFlatMap is similar to FlatMap, but it guarantees that the output order is the same as the input order.
func OrderedFlatMap[A, B any](in <-chan A, n int, f func(A) <-chan B) <-chan B {
var zero B
return common.OrderedMapOrFlatMap(in, n, func(a A) (b B, bb <-chan B, flat bool) {
return zero, f(a), true
})
}

// blocking
// todo: explain that if false has been returned for item[i], it's guaranteed that the function has been called for all previous items
// ForEach applies a function f to each item in an input channel using n goroutines. It blocks until
// all items are processed or f returns false. In case of early termination, ForEach ensures
// the input channel is drained to avoid goroutine leaks, making it safe for use in environments where cleanup is crucial.
// While this function does not guarantee the order of item processing due to its concurrent nature,
// using n = 1 results in sequential processing, as in a simple for-range loop.
func ForEach[A any](in <-chan A, n int, f func(A) bool) {
if n == 1 {
for a := range in {
if !f(a) {
defer DrainNB(in)
DrainNB(in)
break
}
}
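
A brief sketch of how these primitives compose, assuming the signatures shown in this file (`FromSlice` comes from chans/util.go):

```go
package main

import (
    "fmt"

    "github.com/destel/rill/chans"
)

func main() {
    in := chans.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8})

    // Keep even numbers, using 3 goroutines, preserving the input order.
    evens := chans.OrderedFilter(in, 3, func(x int) bool { return x%2 == 0 })

    // Square each remaining number, again in order.
    squares := chans.OrderedMap(evens, 3, func(x int) int { return x * x })

    // Print sequentially (n = 1). Returning false would stop early and
    // drain the remaining input in the background.
    chans.ForEach(squares, 1, func(x int) bool {
        fmt.Println(x)
        return true
    })
}
```
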
2 changes: 2 additions & 0 deletions chans/delay.go
@@ -71,6 +71,8 @@ type delayedValue[A any] struct {
SendAt time.Time
}

// Delay postpones the delivery of items from an input channel by a specified duration, maintaining the order.
// Useful for adding delays in processing or simulating latency.
func Delay[A any](in <-chan A, delay time.Duration) <-chan A {
wrapped := make(chan delayedValue[A])
go func() {
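
A small illustrative sketch of `Delay`, based on the signature above:

```go
package main

import (
    "fmt"
    "time"

    "github.com/destel/rill/chans"
)

func main() {
    in := chans.FromSlice([]string{"a", "b", "c"})

    // Each item becomes available on the output 100ms after it was read
    // from the input; the original order is preserved.
    delayed := chans.Delay(in, 100*time.Millisecond)

    for s := range delayed {
        fmt.Println(time.Now().Format("15:04:05.000"), s)
    }
}
```
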
8 changes: 8 additions & 0 deletions chans/merge.go
@@ -77,6 +77,8 @@ func slowMerge[A any](ins []<-chan A) <-chan A {
return out
}

// Merge combines multiple input channels into a single output channel. Items are emitted as soon as they're available,
// so the output order is not defined.
func Merge[A any](ins ...<-chan A) <-chan A {
switch len(ins) {
case 0:
@@ -90,13 +92,19 @@
}
}

// Split2 divides the input channel into two output channels based on the discriminator function f, using n goroutines for concurrency.
// The function f takes an item from the input and decides which output channel (out0 or out1) it should go to by returning 0 or 1, respectively.
// Return values other than 0 or 1 lead to the item being discarded.
// The output order is not guaranteed: results are written to the outputs as soon as they're ready.
// Use OrderedSplit2 to preserve the input order.
func Split2[A any](in <-chan A, n int, f func(A) int) (out0 <-chan A, out1 <-chan A) {
outs := common.MapAndSplit(in, 2, n, func(a A) (A, int) {
return a, f(a)
})
return outs[0], outs[1]
}

// OrderedSplit2 is similar to Split2, but it guarantees that the order of the outputs matches the order of the input.
func OrderedSplit2[A any](in <-chan A, n int, f func(A) int) (out0 <-chan A, out1 <-chan A) {
outs := common.OrderedMapAndSplit(in, 2, n, func(a A) (A, int) {
return a, f(a)
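
A hedged sketch combining `Merge` and `Split2` as documented above (`DrainNB` is defined in chans/util.go):

```go
package main

import (
    "fmt"

    "github.com/destel/rill/chans"
)

func main() {
    odds := chans.FromSlice([]int{1, 3, 5})
    evens := chans.FromSlice([]int{2, 4, 6})

    // Combine both inputs into a single stream; arrival order is not defined.
    all := chans.Merge(odds, evens)

    // Route items to out0 (even) or out1 (odd), using 2 goroutines.
    out0, out1 := chans.Split2(all, 2, func(x int) int {
        if x%2 == 0 {
            return 0
        }
        return 1
    })

    // Discard the odd stream in the background, so the splitter is not blocked.
    chans.DrainNB(out1)

    for x := range out0 {
        fmt.Println(x) // even numbers, in no particular order
    }
}
```
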
6 changes: 6 additions & 0 deletions chans/util.go
@@ -1,14 +1,18 @@
package chans

// Drain consumes and discards all items from an input channel, blocking until the channel is closed.
func Drain[A any](in <-chan A) {
for range in {
}
}

// DrainNB is a non-blocking version of Drain.
func DrainNB[A any](in <-chan A) {
go Drain(in)
}

// Buffer takes a channel of items and returns a buffered channel with the exact same items in the same order.
// This is useful when you want to write to the input channel without blocking the writer.
func Buffer[A any](in <-chan A, n int) <-chan A {
// make a channel with capacity n-1: together with the one item held in the x variable on the stack, the total buffer size is n
out := make(chan A, n-1)
@@ -23,6 +27,7 @@ func Buffer[A any](in <-chan A, n int) <-chan A {
return out
}

// FromSlice converts a slice into a channel.
func FromSlice[A any](slice []A) <-chan A {
out := make(chan A, len(slice))
for _, a := range slice {
@@ -32,6 +37,7 @@ func FromSlice[A any](slice []A) <-chan A {
return out
}

// ToSlice converts a channel into a slice.
func ToSlice[A any](in <-chan A) []A {
var res []A
for x := range in {
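
An illustrative sketch tying these helpers together, based on the signatures above:

```go
package main

import (
    "fmt"

    "github.com/destel/rill/chans"
)

func main() {
    // FromSlice returns a buffered, already closed channel containing all items.
    in := chans.FromSlice([]int{1, 2, 3, 4, 5})

    // Add a buffer of 10 items between stages, so a slow consumer
    // does not immediately block the producer.
    buffered := chans.Buffer(in, 10)

    // Collect everything back into a slice.
    fmt.Println(chans.ToSlice(buffered))
}
```
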
189 changes: 189 additions & 0 deletions echans/README.md
@@ -0,0 +1,189 @@
# Rill
Rill (noun: a small stream) is a comprehensive Go toolkit for streaming, parallel processing, and pipeline construction.
Designed to reduce boilerplate and simplify usage, it empowers developers to focus on core logic
without getting bogged down by the complexity of concurrency.


## Key features
- **Lightweight**: fast and modular, can be easily integrated into existing projects
- **Easy to use**: the complexity of managing goroutines, wait groups, and error handling is abstracted away
- **Concurrent**: control the level of concurrency for all operations
- **Batching**: provides a simple way to organize and process data in batches
- **Error Handling**: provides a structured way to handle errors in concurrent apps
- **Streaming**: handles real-time data streams or large datasets with a minimal memory footprint
- **Order Preservation**: offers functions that preserve the original order of data, while still allowing for concurrent processing
- **Functional Programming**: based on functional programming concepts, making operations like map, filter, flatMap and others available for channel-based workflows
- **Generic**: all operations are type-safe and can be used with any data type


## Installation
```bash
go get github.com/destel/rill
```

## Example
A function that fetches keys from multiple URLs, retrieves their values from a Redis database, and prints them.
This example demonstrates the library's strengths in handling concurrent tasks, error propagation, batching and data streaming,
all while maintaining simplicity and efficiency.
See the full runnable example at examples/redis-read/main.go.

```go
type KV struct {
Key string
Value string
}

func printValues(ctx context.Context, urls []string) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel() // In case of error or early exit, this ensures all http and redis operations are canceled

// Convert URLs into a channel
urlsChan := echans.FromSlice(urls)

// Fetch and stream keys from each URL concurrently
keys := echans.FlatMap(urlsChan, 10, func(url string) <-chan echans.Try[string] {
return streamKeys(ctx, url)
})

// Exclude any empty keys from the stream
keys = echans.Filter(keys, 5, func(key string) (bool, error) {
return key != "", nil
})

// Organize keys into manageable batches of 10 for bulk operations
keyBatches := echans.Batch(keys, 10, 1*time.Second)

// Fetch values from Redis for each batch of keys
resultBatches := echans.Map(keyBatches, 5, func(keys []string) ([]KV, error) {
values, err := redisMGet(ctx, keys...)
if err != nil {
return nil, err
}

results := make([]KV, len(keys))
for i, key := range keys {
results[i] = KV{Key: key, Value: values[i]}
}

return results, nil
})

// Convert batches back into individual items for final processing
results := echans.Unbatch(resultBatches)

// Exclude any empty values from the stream
results = echans.Filter(results, 5, func(kv KV) (bool, error) {
return kv.Value != "<nil>", nil
})

// Iterate over each key-value pair and print
cnt := 0
err := echans.ForEach(results, 1, func(kv KV) error {
fmt.Println(kv.Key, "=>", kv.Value)
cnt++
return nil
})
fmt.Println("Total keys:", cnt)

return err
}
```


## Design philosophy
At the heart of rill lies a simple yet powerful concept: operating on channels of wrapped values, encapsulated by the Try structure.
Such channels can be created manually or through utilities like **FromSlice**, **Wrap**, and **WrapAsync**, and then transformed via operations
such as **Map**, **Filter**, **FlatMap** and others. Finally, when all processing stages are complete, the data can be consumed by
**ForEach**, **ToSlice**, or manually by iterating over the resulting channel.
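
For example, a minimal end-to-end pipeline might look like this (a sketch that assumes the same echans API used in the example above):

```go
func printSquares() error {
    // Create a stream of wrapped values from a slice
    numbers := echans.FromSlice([]int{1, 2, 3, 4, 5})

    // Square each number using 3 goroutines
    squares := echans.Map(numbers, 3, func(x int) (int, error) {
        return x * x, nil
    })

    // Consume the stream; ForEach returns the first error encountered, if any
    return echans.ForEach(squares, 1, func(x int) error {
        fmt.Println(x)
        return nil
    })
}
```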



## Batching
Batching is a common pattern in concurrent processing, especially when dealing with external services or databases.
Rill provides a Batch function that organizes a stream of items into batches of a specified size. It's also possible
to specify a timeout, after which the batch is emitted even if it's not full. This is useful for keeping an app reactive
when the input stream is slow or sparse.
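
A short sketch of the pattern, where `saveUsersBulk` is a hypothetical bulk-save helper:

```go
func saveUsers(ctx context.Context, userIDs <-chan echans.Try[string]) error {
    // Group incoming IDs into batches of up to 100. A partial batch is emitted
    // at most 500ms after its first item arrives, keeping the pipeline reactive.
    batches := echans.Batch(userIDs, 100, 500*time.Millisecond)

    // Save each batch with a single bulk operation, using 5 goroutines.
    return echans.ForEach(batches, 5, func(ids []string) error {
        return saveUsersBulk(ctx, ids) // hypothetical bulk-save helper
    })
}
```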





## Error handling
In the examples above, errors are handled using **ForEach**, which is good for most use cases.
**ForEach** stops processing on the first error and returns it. If you need to handle errors in the middle of a pipeline
and continue processing, there is a **Catch** function for that.

```go
results := echans.Map(input, 10, func(item int) (int, error) {
    // do some processing
    return item, nil
})

results = echans.Catch(results, 5, func(err error) error {
    if errors.Is(err, sql.ErrNoRows) {
        return nil // ignore this error
    }
    return fmt.Errorf("error processing item: %w", err) // wrap the error and continue processing
})

err := echans.ForEach(results, 1, func(item int) error {
    // process results as usual
    return nil
})
```


## Order preservation
There are use cases where it's necessary to preserve the original order of data, while still allowing for concurrent processing.
Below is an example function that fetches temperature measurements for each day in a specified range
and prints the temperature movement for each day. The OrderedMap function fetches measurements in parallel, but returns them in chronological order.
This allows the next stage of processing to calculate temperature differences between consecutive days.
See the full runnable example at examples/weather/main.go.

```go
type Measurement struct {
Date time.Time
Temp float64
Movement float64
}

func printTemperatureMovements(ctx context.Context, city string, startDate, endDate time.Time) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel() // In case of error or early exit, this ensures all HTTP requests are canceled

// Make a channel that emits all the days between startDate and endDate
days := make(chan echans.Try[time.Time])
go func() {
defer close(days)
for date := startDate; date.Before(endDate); date = date.AddDate(0, 0, 1) {
days <- echans.Try[time.Time]{V: date}
}
}()

// Download the temperature for each day in parallel and in order
measurements := echans.OrderedMap(days, 10, func(date time.Time) (Measurement, error) {
temp, err := getTemperature(ctx, city, date)
return Measurement{Date: date, Temp: temp}, err
})

// Calculate the temperature movements. Use a single goroutine, since the prev variable is shared state
prev := Measurement{Temp: math.NaN()}
measurements = echans.OrderedMap(measurements, 1, func(m Measurement) (Measurement, error) {
m.Movement = m.Temp - prev.Temp
prev = m
return m, nil
})

// Iterate over the measurements and print the movements
err := echans.ForEach(measurements, 1, func(m Measurement) error {
fmt.Printf("%s: %.1f°C (movement %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, m.Movement)
return nil
})

return err
}
```
5 changes: 5 additions & 0 deletions echans/batch.go
@@ -6,12 +6,17 @@ import (
"github.com/destel/rill/chans"
)

// Batch groups items from an input channel into batches based on a maximum size and a timeout.
// A batch is emitted when it reaches the maximum size, the timeout expires, or the input channel closes.
// To emit batches only when full, set the timeout to -1. This function never emits empty batches.
// The timeout countdown starts when the first item is added to a new batch.
func Batch[A any](in <-chan Try[A], n int, timeout time.Duration) <-chan Try[[]A] {
values, errs := Unwrap(in)
batches := chans.Batch(values, n, timeout)
return WrapAsync(batches, errs)
}

// Unbatch is the inverse of Batch. It takes a channel of batches and emits individual items.
func Unbatch[A any](in <-chan Try[[]A]) <-chan Try[A] {
batches, errs := Unwrap(in)
values := chans.Unbatch(batches)