
Commit

Merge branch 'refs/heads/main' into f/filter-map
destel committed Jul 21, 2024
2 parents 6cf9731 + a7c2c3f commit 1ff30dd
Showing 9 changed files with 473 additions and 333 deletions.
41 changes: 10 additions & 31 deletions README.md
@@ -35,7 +35,7 @@ but for channels, the complexity can quickly escalate beyond a basic for-loop as
- Controlling concurrency level is often done with a semaphore
- If the above approach becomes a bottleneck, worker pools must be introduced and managed manually
- For a multi-stage pipeline, everything must be manually managed at each stage, causing complexity to grow non-linearly
-- Features like ordered fan-in require even more complex orchestration and synchronization
+- Features like batching or ordered fan-in require even more complex orchestration and synchronization

The list goes on. And while tools like channels, ErrGroups, or semaphores are powerful on their own,
combining them into more complex logic can lead to code with lots of boilerplate that's difficult to write, read, and maintain.
@@ -356,33 +356,12 @@ func main() {
```


-## Limitations
-While rill provides a powerful and expressive way to build concurrent pipelines, there are certain limitations and
-scenarios where alternative approaches might be more suitable.
-
-Go channels are a fundamental and convenient feature for handling concurrency and communication between goroutines.
-However, it's important to note that channels come with a certain overhead. The impact of this overhead varies depending on
-the specific use case:
-
-- **I/O-bound tasks:** Channels are great for handling I/O-bound tasks, such as reading from or writing to files,
-network communication, or database operations. The overhead of channels is typically negligible compared to
-the time spent waiting for I/O operations to complete.
-- **Light CPU-bound tasks:** When parallelizing a large number of small CPU-bound tasks, such as simple arithmetic operations,
-the overhead of channels can become significant. In such cases, using channels and goroutines may not provide
-the desired performance benefits.
-- **Heavy CPU-bound tasks:** For more computationally intensive tasks, such as complex string manipulation, parsing, encryption,
-or hash calculation, the overhead of channels becomes less significant compared to the overall processing time.
-In these scenarios, using channels and rill can still provide an efficient way to parallelize the workload.
-See [benchmarks](https://github.com/destel/rill/wiki/Benchmarks) for more details.
-
-If your use case requires high-performance calculations and you want to minimize the overhead of channels,
-you can consider alternative approaches or libraries. For example, it's possible to transform a slice without channels and
-with almost zero orchestration, just by dividing the slice into n chunks and assigning each chunk to a separate goroutine.
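The chunk-per-goroutine technique mentioned above can be sketched with only the standard library. This is an illustrative sketch, not rill's API; the function name `mapChunked` is invented for the example:

```go
package main

import (
	"fmt"
	"sync"
)

// mapChunked applies f to every element of s in place, splitting the
// slice into up to n contiguous chunks, one goroutine per chunk.
// No channels are involved; the only orchestration is a WaitGroup.
func mapChunked(s []int, n int, f func(int) int) {
	var wg sync.WaitGroup
	chunk := (len(s) + n - 1) / n // ceiling division: chunk size

	for start := 0; start < len(s); start += chunk {
		end := start + chunk
		if end > len(s) {
			end = len(s)
		}
		wg.Add(1)
		go func(part []int) {
			defer wg.Done()
			for i, v := range part {
				part[i] = f(v)
			}
		}(s[start:end])
	}
	wg.Wait()
}

func main() {
	s := []int{1, 2, 3, 4, 5, 6, 7}
	mapChunked(s, 3, func(x int) int { return x * x })
	fmt.Println(s) // [1 4 9 16 25 36 49]
}
```

Because each goroutine owns a disjoint sub-slice, no locking is needed; this is why the approach has almost zero orchestration overhead for CPU-bound work.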

-For the reasons mentioned above, and to avoid misleading users, rill does not provide functions that operate directly on slices.
-Its main focus is channels and streaming. However, slices can still be used with rill by converting them to and from channels,
-and leveraging ordered transformations when necessary.
-
-Another limitation of rill is that it does not provide a way to create a global worker pool for the entire pipeline.
-Each stage of the pipeline must have at least one live goroutine to keep the whole pipeline running.
-That's why each stage has its own goroutine pool, which is created and managed internally.

+## Performance
+Rill's performance is primarily bounded by the performance of channel operations. As a result, applications that
+already use channels and channel-based concurrency patterns can expect minimal overhead when adopting Rill.
+Moreover, Rill outperforms some traditional concurrency patterns that spawn a goroutine for each channel item and use a semaphore
+to control the level of concurrency. For example, Rill's ForEach function, while more concise, outperforms the errgroup.Go + errgroup.SetLimit pattern
+in both speed and number of allocations. For more details, refer to the [benchmarks](https://github.com/destel/rill/wiki/Benchmarks).
+
+This makes Rill well-suited for a wide range of tasks, especially I/O-bound workloads where the overhead of channel operations is typically negligible.
124 changes: 55 additions & 69 deletions consume.go
@@ -1,71 +1,46 @@
 package rill
 
 import (
-	"errors"
-	"sync"
-
 	"github.com/destel/rill/internal/core"
 )
 
 // ForEach applies a function f to each item in an input stream.
 //
 // This is a blocking unordered function that processes items concurrently using n goroutines.
-// The case when n = 1 is optimized: it does not spawn additional goroutines and processes items sequentially,
-// making the function ordered and similar to a regular for-range loop.
+// When n = 1, processing becomes sequential, making the function ordered and similar to a regular for-range loop.
 //
 // See the package documentation for more information on blocking unordered functions and error handling.
 func ForEach[A any](in <-chan Try[A], n int, f func(A) error) error {
-	if n == 1 {
-		for a := range in {
-			err := a.Error
-			if err == nil {
-				err = f(a.Value)
-			}
-
-			if err != nil {
-				DrainNB(in)
-				return err
-			}
-		}
-
-		return nil
-	}
-
-	var retErr error
-	var once sync.Once
-
-	in, earlyExit := core.Breakable(in)
-	done := make(chan struct{})
-
-	core.Loop(in, done, n, func(a Try[A]) {
-		err := a.Error
-		if err == nil {
-			err = f(a.Value)
-		}
-
-		if err != nil {
-			earlyExit()
-			once.Do(func() {
-				retErr = err
-			})
-		}
-	})
-
-	<-done
+	var retErr error
+	var once core.OnceWithWait
+	setReturns := func(err error) {
+		once.Do(func() {
+			retErr = err
+		})
+	}
+
+	go func() {
+		core.ForEach(in, n, func(a Try[A]) {
+			if once.WasCalled() {
+				return // drain
+			}
+
+			err := a.Error
+			if err == nil {
+				err = f(a.Value)
+			}
+
+			if err != nil {
+				setReturns(err)
+			}
+		})
+
+		setReturns(nil)
+	}()
+
+	once.Wait()
 	return retErr
 }
-
-// onceFunc1 returns a single argument function that invokes f only once. The returned function may be called concurrently.
-func onceFunc1[T any](f func(T)) func(T) {
-	var once sync.Once
-	return func(value T) {
-		once.Do(func() {
-			f(value)
-			f = nil
-		})
-	}
-}

// Err returns the first error encountered in the input stream or nil if there were no errors.
//
// This is a blocking ordered function that processes items sequentially.
@@ -102,44 +77,55 @@ func First[A any](in <-chan Try[A]) (value A, found bool, err error) {
 // This function returns true as soon as it finds such an item. Otherwise, it returns false.
 //
 // Any is a blocking unordered function that processes items concurrently using n goroutines.
-// The case when n = 1 is optimized: it does not spawn additional goroutines and processes items sequentially,
-// making the function ordered.
+// When n = 1, processing becomes sequential, making the function ordered.
 //
 // See the package documentation for more information on blocking unordered functions and error handling.
 func Any[A any](in <-chan Try[A], n int, f func(A) (bool, error)) (bool, error) {
-	errBreak := errors.New("break")
-	res := false
-	setRes := onceFunc1(func(a bool) {
-		res = a
-	})
-
-	err := ForEach(in, n, func(a A) error {
-		ok, err := f(a)
-		if err != nil {
-			return err
-		}
-
-		if ok {
-			setRes(true)
-			return errBreak
-		}
-		return nil
-	})
-
-	if err != nil && errors.Is(err, errBreak) {
-		err = nil
-	}
-	return res, err
+	var retFound bool
+	var retErr error
+	var once core.OnceWithWait
+	setReturns := func(found bool, err error) {
+		once.Do(func() {
+			retFound = found
+			retErr = err
+		})
+	}
+
+	go func() {
+		core.ForEach(in, n, func(a Try[A]) {
+			if once.WasCalled() {
+				return // drain
+			}
+
+			if err := a.Error; err != nil {
+				setReturns(false, err)
+				return
+			}
+
+			ok, err := f(a.Value)
+			if err != nil {
+				setReturns(false, err)
+				return
+			}
+			if ok {
+				setReturns(true, nil)
+				return
+			}
+		})
+
+		setReturns(false, nil)
+	}()
+
+	once.Wait()
+	return retFound, retErr
 }

 // All checks if all items in the input stream satisfy the condition f.
 // This function returns false as soon as it finds an item that does not satisfy the condition. Otherwise, it returns true,
 // including the case when the stream was empty.
 //
 // This is a blocking unordered function that processes items concurrently using n goroutines.
-// The case when n = 1 is optimized: it does not spawn additional goroutines and processes items sequentially,
-// making the function ordered.
+// When n = 1, processing becomes sequential, making the function ordered.
 //
 // See the package documentation for more information on blocking unordered functions and error handling.
 func All[A any](in <-chan Try[A], n int, f func(A) (bool, error)) (bool, error) {
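The rewritten functions in this commit lean on `core.OnceWithWait`, an internal rill primitive: conceptually, a `sync.Once` whose completion can be both awaited and polled, so the first goroutine to report a result "wins" while the rest keep draining the stream. The following is a rough stdlib-only sketch of that idea; the type name and details are assumptions for illustration, not rill's actual internals:

```go
package main

import (
	"fmt"
	"sync"
)

// onceWithWait is an illustrative Once that can be awaited (Wait)
// and polled (WasCalled). Not rill's real implementation.
type onceWithWait struct {
	once sync.Once // guards the user callback
	init sync.Once // lazily creates the done channel
	done chan struct{}
}

func (o *onceWithWait) doneCh() chan struct{} {
	o.init.Do(func() { o.done = make(chan struct{}) })
	return o.done
}

// Do runs f at most once, then unblocks all Wait callers.
func (o *onceWithWait) Do(f func()) {
	o.once.Do(func() {
		f()
		close(o.doneCh())
	})
}

// WasCalled reports whether Do has already completed.
func (o *onceWithWait) WasCalled() bool {
	select {
	case <-o.doneCh():
		return true
	default:
		return false
	}
}

// Wait blocks until Do has completed.
func (o *onceWithWait) Wait() { <-o.doneCh() }

func main() {
	var o onceWithWait
	go o.Do(func() { fmt.Println("first writer wins") })
	o.Wait()
	fmt.Println(o.WasCalled()) // true
}
```

This "first result wins, everyone else drains" shape is what lets the new ForEach and Any return early on error without leaking goroutines or leaving the input channel blocked.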
