From 620b042ec5f3786cd9212834f5bf2bdf4945de12 Mon Sep 17 00:00:00 2001 From: destel Date: Thu, 21 Nov 2024 09:42:30 +0200 Subject: [PATCH 01/18] Typo --- util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util.go b/util.go index a8d8482..d31bd23 100644 --- a/util.go +++ b/util.go @@ -7,7 +7,7 @@ func Drain[A any](in <-chan A) { core.Drain(in) } -// DrainNB is a non-blocking version of [Drain]. Is does draining in a separate goroutine. +// DrainNB is a non-blocking version of [Drain]. It does draining in a separate goroutine. func DrainNB[A any](in <-chan A) { core.DrainNB(in) } From c8a5579268ef6fda41148f7440960f57c7881964 Mon Sep 17 00:00:00 2001 From: destel Date: Thu, 21 Nov 2024 11:38:53 +0200 Subject: [PATCH 02/18] Removed doSomethingWithNumber --- example123_test.go | 18 +++++++++--------- example_test.go | 46 +++++++++++++++++++++++----------------------- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/example123_test.go b/example123_test.go index 814b3aa..741d08b 100644 --- a/example123_test.go +++ b/example123_test.go @@ -18,11 +18,11 @@ func ExampleFromSeq() { // Transform each number // Concurrency = 3 - results := rill.Map(numbers, 3, func(x int) (int, error) { - return doSomethingWithNumber(x), nil + squares := rill.Map(numbers, 3, func(x int) (int, error) { + return square(x), nil }) - printStream(results) + printStream(squares) } func ExampleFromSeq2() { @@ -40,11 +40,11 @@ func ExampleFromSeq2() { // Transform each number // Concurrency = 3 - results := rill.Map(numbers, 3, func(x int) (int, error) { - return doSomethingWithNumber(x), nil + squares := rill.Map(numbers, 3, func(x int) (int, error) { + return square(x), nil }) - printStream(results) + printStream(squares) } func ExampleToSeq2() { @@ -53,12 +53,12 @@ func ExampleToSeq2() { // Transform each number // Concurrency = 3 - results := rill.Map(numbers, 3, func(x int) (int, error) { - return doSomethingWithNumber(x), nil + squares := rill.Map(numbers, 3, 
func(x int) (int, error) { + return square(x), nil }) // Convert the stream into an iterator and use for-range to print the results - for val, err := range rill.ToSeq2(results) { + for val, err := range rill.ToSeq2(squares) { if err != nil { fmt.Println("Error:", err) break // cleanup is done regardless of early exit diff --git a/example_test.go b/example_test.go index f2f4666..75b459f 100644 --- a/example_test.go +++ b/example_test.go @@ -591,10 +591,10 @@ func ExampleForEach() { // Convert a slice of numbers into a stream numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) - // Do something with each number and print the result + // Square each number and print the result // Concurrency = 3 err := rill.ForEach(numbers, 3, func(x int) error { - y := doSomethingWithNumber(x) + y := square(x) fmt.Println(y) return nil }) @@ -610,15 +610,15 @@ func ExampleForEach_ordered() { // Convert a slice of numbers into a stream numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) - // Do something with each number + // Square each number // Concurrency = 3; Ordered - results := rill.OrderedMap(numbers, 3, func(x int) (int, error) { - return doSomethingWithNumber(x), nil + squares := rill.OrderedMap(numbers, 3, func(x int) (int, error) { + return square(x), nil }) // Print results. // Concurrency = 1; Ordered - err := rill.ForEach(results, 1, func(y int) error { + err := rill.ForEach(squares, 1, func(y int) error { fmt.Println(y) return nil }) @@ -633,11 +633,11 @@ func ExampleMap() { // Transform each number // Concurrency = 3 - results := rill.Map(numbers, 3, func(x int) (int, error) { - return doSomethingWithNumber(x), nil + squares := rill.Map(numbers, 3, func(x int) (int, error) { + return square(x), nil }) - printStream(results) + printStream(squares) } // The same example as for the [Map], but using ordered versions of functions. 
@@ -647,11 +647,11 @@ func ExampleOrderedMap() { // Transform each number // Concurrency = 3; Ordered - results := rill.OrderedMap(numbers, 3, func(x int) (int, error) { - return doSomethingWithNumber(x), nil + squares := rill.OrderedMap(numbers, 3, func(x int) (int, error) { + return square(x), nil }) - printStream(results) + printStream(squares) } func ExampleMapReduce() { @@ -709,11 +709,11 @@ func ExampleToSlice() { // Transform each number // Concurrency = 3; Ordered - results := rill.OrderedMap(numbers, 3, func(x int) (int, error) { - return doSomethingWithNumber(x), nil + squares := rill.OrderedMap(numbers, 3, func(x int) (int, error) { + return square(x), nil }) - resultsSlice, err := rill.ToSlice(results) + resultsSlice, err := rill.ToSlice(squares) fmt.Println("Result:", resultsSlice) fmt.Println("Error:", err) @@ -735,15 +735,8 @@ func ExampleUnbatch() { // --- Helpers --- -// helper function that squares the number +// helper function that checks if a number is prime // and simulates some additional work using sleep -func doSomethingWithNumber(x int) int { - randomSleep(500 * time.Millisecond) // simulate some additional work - return x * x -} - -// naive prime number check. -// also simulates some additional work using sleep func isPrime(n int) bool { randomSleep(500 * time.Millisecond) // simulate some additional work @@ -758,6 +751,13 @@ func isPrime(n int) bool { return true } +// helper function that squares the number +// and simulates some additional work using sleep +func square(x int) int { + randomSleep(500 * time.Millisecond) // simulate some additional work + return x * x +} + // printStream prints all items from a stream (one per line) and an error if any. 
func printStream[A any](stream <-chan rill.Try[A]) { fmt.Println("Result:") From 9c805767404936ca1128f964b9804f6091715b23 Mon Sep 17 00:00:00 2001 From: destel Date: Thu, 21 Nov 2024 20:00:17 +0200 Subject: [PATCH 03/18] Changed examples and texts in the first 3 sections of the readme --- README.md | 77 +++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 49 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index f6ae4f7..33a9c27 100644 --- a/README.md +++ b/README.md @@ -46,20 +46,39 @@ does not grow with the input size. ## Quick Start -Rill makes it easy to process data concurrently. -Here's a simple example using **ForEach** to process items in parallel while handling errors: +Rill transforms concurrent operations into clean, readable pipelines. +Here's a practical example that fetches users from an API, activates them, and saves the changes back - +all with explicit control over concurrency at each step. -[Try it](https://pkg.go.dev/github.com/destel/rill#example-ForEach) +[Try it](https://pkg.go.dev/github.com/destel/rill#example-package) ```go func main() { - // Convert a slice of numbers into a channel - numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + ctx := context.Background() + + // Convert a slice of user IDs into a channel + ids := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) - // Do something with each number and print the result + // Read users from the API. // Concurrency = 3 - err := rill.ForEach(numbers, 3, func(x int) error { - y := doSomethingWithNumber(x) - fmt.Println(y) + users := rill.Map(ids, 3, func(id int) (*mockapi.User, error) { + return mockapi.GetUser(ctx, id) + }) + + // Activate users. 
+ // Concurrency = 2 + err := rill.ForEach(users, 2, func(u *mockapi.User) error { + if u.IsActive { + fmt.Printf("User %d is already active\n", u.ID) + return nil + } + + u.IsActive = true + err := mockapi.SaveUser(ctx, u) + if err != nil { + return err + } + + fmt.Printf("User saved: %+v\n", u) return nil }) @@ -69,12 +88,14 @@ func main() { ``` -## Multi-Stage Pipelines -The result as above can also be achieved with WaitGroup or ErrGroup, -but Rill shines when building complex multi-stage concurrent pipelines. +## Batching +While processing items individually works well in many cases, it's often more efficient to handle items in batches, +especially when working with external services or databases. This reduces the number of queries and API calls, +improves throughput, and often reduces costs -The next example demonstrates a multi-stage pipeline that fetches users from an external API in batches, -updates their status to active, and saves them back, while controlling the level of concurrency at each step. +The previous example can be improved by using the API's bulk fetching capability. Instead of making individual +`GetUser` calls, the IDs can be grouped into batches to fetch multiple users at once using a single `GetUsers` API call. +The **Batch** function transforms a stream of individual items into a stream of batches, and **Unbatch** does the reverse. [Try it](https://pkg.go.dev/github.com/destel/rill#example-package-Batching) ```go @@ -88,7 +109,7 @@ func main() { }, nil) // Group IDs into batches of 5 - idBatches := rill.Batch(ids, 5, 1*time.Second) + idBatches := rill.Batch(ids, 5, -1) // Bulk fetch users from the API // Concurrency = 3 @@ -123,22 +144,22 @@ func main() { ``` -## Batching -When working with external services or databases, batching is a common pattern to reduce the number of requests and improve performance. -Rill provides a **Batch** function that transforms a stream of items into a stream of batches of a specified size. 
It's also possible -to specify a timeout, after which a batch is emitted even if it's not full. This is useful for keeping an application reactive -when the input stream is slow or sparse. +## Real-Time Batching +Real-world applications often need to handle data that arrives at unpredictable rates. While batching is still +desirable for efficiency, waiting to collect a full batch might introduce unacceptable delays when +the input stream becomes slow or sparse. -Previous examples have already shown how to use **Batch** to group user IDs for bulk fetching. -Let's examine a case where batching with timeout is particularly useful. +Rill solves this with timeout-based batching: batches are emitted either when they're full or after a specified timeout, +whichever comes first. This ensures optimal batch sizes during high load while maintaining responsiveness during quiet periods. -In the example below, the `UpdateUserTimestamp` function updates the _last_active_at_ column in the _users_ table with the current timestamp. -This function is called concurrently from multiple places in the application, such as HTTP handlers. -A large number of such calls would cause a large number of concurrent SQL queries, potentially overwhelming the database. +Consider an application that needs to update users' _last_active_at_ timestamps in a database. The function responsible +for this - `UpdateUserTimestamp` can be called concurrently, at unpredictable rates, and from different parts of the application. +Sending all this updates individually may create too many concurrent queries, potentially overwhelming the database. -To mitigate this, it's possible to group the updates and send them to the database in bulk using the **Batch** function. -And when updates are sparse, the _timeout_ setting makes sure they're delayed by at most 100ms, -balancing between reducing database load and data freshness. 
+In the example below, updates are collected into batches of up to 5 items, but a batch is also emitted if 100ms passes +without reaching the full size. +This provides an excellent balance between efficiency and latency: full batches and zero latency during high load, +smaller batches and up to 100ms latency during quiet periods. [Try it](https://pkg.go.dev/github.com/destel/rill#example-package-BatchingWithTimeout) ```go From 3302efb647d3a9c85715384e7c21179cbe8fe3b4 Mon Sep 17 00:00:00 2001 From: destel Date: Fri, 22 Nov 2024 02:02:23 +0200 Subject: [PATCH 04/18] Few more Goals tweaks --- README.md | 52 +++++++++++++++++++++++++--------------------------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index 33a9c27..65ae91b 100644 --- a/README.md +++ b/README.md @@ -11,9 +11,8 @@ go get -u github.com/destel/rill ## Goals - **Make common tasks easier.** -Rill provides a cleaner way of solving common concurrency problems, such as -processing slices and channels, calling APIs, or making DB queries in parallel. -It removes boilerplate and abstracts away the complexities of goroutine orchestration and error handling. +Rill provides a clean and safe way of solving common concurrency problems, such as parallel job execution. +It removes boilerplate and abstracts away the complexities of goroutine, channel, and error management. At the same time, developers retain full control over the concurrency level of all operations. - **Make concurrent code composable and clean.** @@ -46,6 +45,8 @@ does not grow with the input size. ## Quick Start + + Rill transforms concurrent operations into clean, readable pipelines. Here's a practical example that fetches users from an API, activates them, and saves the changes back - all with explicit control over concurrency at each step. 
@@ -156,7 +157,7 @@ Consider an application that needs to update users' _last_active_at_ timestamps for this - `UpdateUserTimestamp` can be called concurrently, at unpredictable rates, and from different parts of the application. Sending all this updates individually may create too many concurrent queries, potentially overwhelming the database. -In the example below, updates are collected into batches of up to 5 items, but a batch is also emitted if 100ms passes + In the example below, updates are collected into batches of up to 5 items, but a batch is also emitted if 100ms passes without reaching the full size. This provides an excellent balance between efficiency and latency: full batches and zero latency during high load, smaller batches and up to 100ms latency during quiet periods. @@ -233,41 +234,38 @@ Rill is context-agnostic, meaning that it does not enforce any specific context However, it's recommended to make user-defined pipeline stages context-aware. This is especially important for the initial stage, as it allows to stop feeding the pipeline with new items when the context is canceled. -In the example below the `FindFirstPrime` function uses several concurrent workers to find the first prime number after -a given number. Internally it creates an infinite stream of numbers. When the first prime number is found -in that stream, the context gets canceled, and the pipeline terminates gracefully. +In the example below the `CheckAllUsersExist` function uses several concurrent workers to check if all users +from the given list exist. The function returns as soon as it encounters a non-existent user. +Such early return triggers the context cancellation, which in turn stops all remaining user fetches. 
[Try it](https://pkg.go.dev/github.com/destel/rill#example-package-Context) ```go func main() { - p := FindFirstPrime(10000, 3) // Use 3 concurrent workers - fmt.Println("The first prime after 10000 is", p) + ctx := context.Background() + + // ID 999 doesn't exist, so fetching will stop after hitting it. + err := CheckAllUsersExist(ctx, 3, []int{1, 2, 3, 4, 5, 999, 7, 8, 9, 10}) + fmt.Printf("Check result: %v\n", err) } -// FindFirstPrime finds the first prime number after the given number, using several concurrent workers. -func FindFirstPrime(after int, concurrency int) int { - ctx, cancel := context.WithCancel(context.Background()) +// CheckAllUsersExist uses several concurrent workers to check if all users with given IDs exist. +func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error { + ctx, cancel := context.WithCancel(ctx) defer cancel() - numbers := make(chan rill.Try[int]) - go func() { - defer close(numbers) - for i := after + 1; ; i++ { - select { - case <-ctx.Done(): - return - case numbers <- rill.Wrap(i, nil): - } + idsStream := rill.FromSlice(ids, nil) + + users := rill.Map(idsStream, concurrency, func(id int) (*mockapi.User, error) { + u, err := mockapi.GetUser(ctx, id) + if err != nil { + return nil, fmt.Errorf("failed to fetch user %d: %w", id, err) } - }() - primes := rill.OrderedFilter(numbers, concurrency, func(x int) (bool, error) { - fmt.Println("Checking", x) - return isPrime(x), nil + fmt.Printf("Fetched user %d\n", id) + return u, nil }) - result, _, _ := rill.First(primes) - return result + return rill.Err(users) } ``` From 11d24c3497a5160b2ab62138dd67f281046c412d Mon Sep 17 00:00:00 2001 From: destel Date: Fri, 22 Nov 2024 02:19:50 +0200 Subject: [PATCH 05/18] Example changes --- example_test.go | 55 +++++++++++++++++++++++------------------------- mockapi/users.go | 3 ++- 2 files changed, 28 insertions(+), 30 deletions(-) diff --git a/example_test.go b/example_test.go index 75b459f..78fa560 100644 --- 
a/example_test.go +++ b/example_test.go @@ -67,7 +67,7 @@ func Example_batching() { }, nil) // Group IDs into batches of 5 - idBatches := rill.Batch(ids, 5, 1*time.Second) + idBatches := rill.Batch(ids, 5, -1) // Bulk fetch users from the API // Concurrency = 3 @@ -309,43 +309,40 @@ func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[ return res } -// This example demonstrates how to use a context for pipeline termination. -// The FindFirstPrime function uses several concurrent workers to find the first prime number after a given number. -// Internally it creates a pipeline that starts from an infinite stream of numbers. When the first prime number is found -// in that stream, the context gets canceled, and the pipeline terminates gracefully. +// This example demonstrates how to gracefully stop a pipeline on the first error. +// The CheckAllUsersExist function uses several concurrent workers and returns an error as soon as it encounters a non-existent user. +// Such early return triggers the context cancellation, which in turn stops all remaining user fetches. func Example_context() { - p := FindFirstPrime(10000, 3) // Use 3 concurrent workers - fmt.Println("The first prime after 10000 is", p) + ctx := context.Background() + + // ID 999 doesn't exist, so fetching will stop after hitting it. + err := CheckAllUsersExist(ctx, 3, []int{1, 2, 3, 4, 5, 999, 7, 8, 9, 10}) + fmt.Printf("Check result: %v\n", err) } -// FindFirstPrime finds the first prime number after the given number, using several concurrent workers. -func FindFirstPrime(after int, concurrency int) int { - ctx, cancel := context.WithCancel(context.Background()) +// CheckAllUsersExist uses several concurrent workers to check if all users with given IDs exist. 
+func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error { + // Create new context that will be canceled when this function returns + ctx, cancel := context.WithCancel(ctx) defer cancel() - // Generate an infinite stream of numbers starting from the given number - numbers := make(chan rill.Try[int]) - go func() { - defer close(numbers) - for i := after + 1; ; i++ { - select { - case <-ctx.Done(): - return // Stop generating numbers when the context is canceled - case numbers <- rill.Wrap(i, nil): - } + idsStream := rill.FromSlice(ids, nil) + + // Fetch users concurrently. + // Prints messages for successfully fetched users to demonstrate + // how pipeline stops after first error. + users := rill.Map(idsStream, concurrency, func(id int) (*mockapi.User, error) { + u, err := mockapi.GetUser(ctx, id) + if err != nil { + return nil, fmt.Errorf("failed to fetch user %d: %w", id, err) } - }() - // Filter out non-prime numbers, preserve the order - primes := rill.OrderedFilter(numbers, concurrency, func(x int) (bool, error) { - fmt.Println("Checking", x) - return isPrime(x), nil + fmt.Printf("Fetched user %d\n", id) + return u, nil }) - // Get the first prime and cancel the context - // This stops number generation and allows goroutines to exit - result, _, _ := rill.First(primes) - return result + // Stop at first error and cancel remaining fetches via context + return rill.Err(users) } // --- Function examples --- diff --git a/mockapi/users.go b/mockapi/users.go index a723ab7..2fc2d1c 100644 --- a/mockapi/users.go +++ b/mockapi/users.go @@ -54,11 +54,12 @@ func GetDepartments() ([]string, error) { // GetUser returns a user by ID. 
func GetUser(ctx context.Context, id int) (*User, error) { - randomSleep(ctx, 500*time.Millisecond) if err := ctx.Err(); err != nil { return nil, err } + randomSleep(ctx, 500*time.Millisecond) + mu.RLock() defer mu.RUnlock() From 384a30e035caea446861cb904048f781d251d968 Mon Sep 17 00:00:00 2001 From: destel Date: Fri, 22 Nov 2024 15:16:12 +0200 Subject: [PATCH 06/18] Renamed rt batching example --- README.md | 2 +- example_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 65ae91b..49147e7 100644 --- a/README.md +++ b/README.md @@ -162,7 +162,7 @@ without reaching the full size. This provides an excellent balance between efficiency and latency: full batches and zero latency during high load, smaller batches and up to 100ms latency during quiet periods. -[Try it](https://pkg.go.dev/github.com/destel/rill#example-package-BatchingWithTimeout) +[Try it](https://pkg.go.dev/github.com/destel/rill#example-package-BatchingRealTime) ```go func main() { // Start the background worker that processes the updates diff --git a/example_test.go b/example_test.go index 78fa560..f148009 100644 --- a/example_test.go +++ b/example_test.go @@ -108,7 +108,7 @@ func Example_batching() { // emits partial batches, ensuring that updates are delayed by at most 100ms. 
// // For simplicity, this example does not have retries, error handling and synchronization -func Example_batchingWithTimeout() { +func Example_batchingRealTime() { // Start the background worker that processes the updates go updateUserTimestampWorker() From 91406ccadc2fbde84f2245e8b39713bd246cebde Mon Sep 17 00:00:00 2001 From: destel Date: Fri, 22 Nov 2024 15:58:38 +0200 Subject: [PATCH 07/18] Readme for mockapi --- mockapi/README.md | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 mockapi/README.md diff --git a/mockapi/README.md b/mockapi/README.md new file mode 100644 index 0000000..91e373b --- /dev/null +++ b/mockapi/README.md @@ -0,0 +1,4 @@ +# Mock API + +This package provides a very simple mock API used in runnable examples on https://pkg.go.dev/github.com/destel/rill +The package should remain public to be accessible from the go playground. \ No newline at end of file From 467c9a8d92e652b1029060fddda762ef8fa1c377 Mon Sep 17 00:00:00 2001 From: destel Date: Sat, 23 Nov 2024 01:33:36 +0200 Subject: [PATCH 08/18] Better flatmap section in the readme --- README.md | 184 ++++++++++++++++++++++++---------------------- example_test.go | 52 +++++++------ mockapi/README.md | 2 +- 3 files changed, 127 insertions(+), 111 deletions(-) diff --git a/README.md b/README.md index 49147e7..6155dea 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,8 @@ go get -u github.com/destel/rill ## Goals - **Make common tasks easier.** -Rill provides a clean and safe way of solving common concurrency problems, such as parallel job execution. +Rill provides a cleaner and safer way of solving common concurrency problems, such as parallel job execution or +real-time event processing. It removes boilerplate and abstracts away the complexities of goroutine, channel, and error management. At the same time, developers retain full control over the concurrency level of all operations. 
@@ -270,80 +271,6 @@ func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error { ``` -## Boosting Sequential Operations -There is a technique that significantly accelerates some seemingly sequential operations by -branching them into multiple parallel streams and then merging the results. -Common examples include listing S3 objects, querying APIs, or reading from databases. - -Example below fetches all users from an external paginated API. Doing it sequentially, page-by-page, -would take a long time since the API is slow and the number of pages is large. -One way to speed this up is to fetch users from multiple departments at the same time. -The code below uses **FlatMap** to stream users from 3 departments concurrently and merge the results as they arrive, -achieving up to 3x speedup compared to sequential processing. - -Additionally, it demonstrates how to write a custom reusable streaming wrapper around an existing API function. -The `StreamUsers` function is useful on its own, but can also be a part of a larger pipeline. - -[Try it](https://pkg.go.dev/github.com/destel/rill#example-package-ParallelStreams) -```go -func main() { - ctx := context.Background() - - // Convert a list of all departments into a stream - departments := rill.FromSlice(mockapi.GetDepartments()) - - // Use FlatMap to stream users from 3 departments concurrently. - users := rill.FlatMap(departments, 3, func(department string) <-chan rill.Try[*mockapi.User] { - return StreamUsers(ctx, &mockapi.UserQuery{Department: department}) - }) - - // Print the users from the combined stream - err := rill.ForEach(users, 1, func(user *mockapi.User) error { - fmt.Printf("%+v\n", user) - return nil - }) - fmt.Println("Error:", err) -} - -// StreamUsers is a reusable streaming wrapper around the mockapi.ListUsers function. -// It iterates through all listing pages and returns a stream of users. -// This function is useful on its own or as a building block for more complex pipelines. 
-func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[*mockapi.User] { - res := make(chan rill.Try[*mockapi.User]) - - if query == nil { - query = &mockapi.UserQuery{} - } - - go func() { - defer close(res) - - for page := 0; ; page++ { - query.Page = page - - users, err := mockapi.ListUsers(ctx, query) - if err != nil { - res <- rill.Wrap[*mockapi.User](nil, err) - return - } - - if len(users) == 0 { - break - } - - for _, user := range users { - res <- rill.Wrap(user, nil) - } - } - }() - - return res -} -``` - - - - ## Order Preservation (Ordered Fan-In) Concurrent processing can boost performance, but since tasks take different amounts of time to complete, the results' order usually differs from the input order. This seemingly simple problem is deceptively challenging to solve correctly, @@ -370,20 +297,13 @@ func main() { // The string to search for in the downloaded files needle := []byte("26") - // Manually generate a stream of URLs from http://example.com/file-0.txt to http://example.com/file-999.txt - urls := make(chan rill.Try[string]) - go func() { - defer close(urls) - for i := 0; i < 1000; i++ { - // Stop generating URLs after the context is canceled (when the file is found) - // This can be rewritten as a select statement, but it's not necessary - if err := ctx.Err(); err != nil { - return - } + // Start with a stream of numbers from 0 to 999 + fileIDs := streamNumbers(ctx, 0, 1000) - urls <- rill.Wrap(fmt.Sprintf("https://example.com/file-%d.txt", i), nil) - } - }() + // Generate a stream of URLs from http://example.com/file-0.txt to http://example.com/file-999.txt + urls := rill.OrderedMap(fileIDs, 1, func(id int) (string, error) { + return fmt.Sprintf("https://example.com/file-%d.txt", id), nil + }) // Download and process the files // At most 5 files are downloaded and held in memory at the same time @@ -413,8 +333,96 @@ func main() { fmt.Println("Not found") } } + +// helper function that creates a stream of numbers 
[start, end) and respects the context +func streamNumbers(ctx context.Context, start, end int) <-chan rill.Try[int] { + out := make(chan rill.Try[int]) + go func() { + defer close(out) + for i := start; i < end; i++ { + select { + case <-ctx.Done(): + return + case out <- rill.Try[int]{Value: i}: + } + } + }() + return out +} ``` + +## Stream Merging and FlatMap +Rill comes with the **Merge** function that combines multiple streams into a single one. Another, often overlooked, +function that can combine streams is **FlatMap**. It's a powerful tool that transforms each input item into its own stream, +and then merges all those streams together. + +In the example below **FlatMap** transforms each department into its own stream of users, and then merges them into a final unified stream of users. +Like other Rill functions, **FlatMap** gives full control over concurrency. +In this particular case the concurrency level is 3, meaning that users are fetched from up to 3 departments at the same time. + +Additionally, this example demonstrates how to write a reusable streaming wrapper over paginated API calls - the `StreamUsers` function. +This wrapper can be useful both on its own and as part of larger pipelines. + +[Try it](https://pkg.go.dev/github.com/destel/rill#example-package-FlatMap) +```go +func main() { + ctx := context.Background() + + // Start with a stream of department names + departments := rill.FromSlice([]string{"IT", "Finance", "Marketing", "Support", "Engineering"}, nil) + + // Stream users from all departments concurrently. + // At most 3 departments at the same time. 
+ users := rill.FlatMap(departments, 3, func(department string) <-chan rill.Try[*mockapi.User] { + return StreamUsers(ctx, &mockapi.UserQuery{Department: department}) + }) + + // Print the users from the combined stream + err := rill.ForEach(users, 1, func(user *mockapi.User) error { + fmt.Printf("%+v\n", user) + return nil + }) + fmt.Println("Error:", err) +} + +// StreamUsers is a reusable streaming wrapper around the mockapi.ListUsers function. +// It iterates through all listing pages and returns a stream of users. +// This function is useful both on its own and as part of larger pipelines. +func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[*mockapi.User] { + res := make(chan rill.Try[*mockapi.User]) + + if query == nil { + query = &mockapi.UserQuery{} + } + + go func() { + defer close(res) + + for page := 0; ; page++ { + query.Page = page + + users, err := mockapi.ListUsers(ctx, query) + if err != nil { + res <- rill.Wrap[*mockapi.User](nil, err) + return + } + + if len(users) == 0 { + break + } + + for _, user := range users { + res <- rill.Wrap(user, nil) + } + } + }() + + return res +} +``` + + ## Go 1.23 Iterators Starting from Go 1.23, the language adds *range-over-function* feature, allowing users to define custom iterators for use in for-range loops. 
This feature enables Rill to integrate seamlessly with existing iterator-based functions diff --git a/example_test.go b/example_test.go index f148009..bc34af6 100644 --- a/example_test.go +++ b/example_test.go @@ -170,20 +170,13 @@ func Example_ordering() { // The string to search for in the downloaded files needle := []byte("26") - // Manually generate a stream of URLs from http://example.com/file-0.txt to http://example.com/file-999.txt - urls := make(chan rill.Try[string]) - go func() { - defer close(urls) - for i := 0; i < 1000; i++ { - // Stop generating URLs after the context is canceled (when the file is found) - // This can be rewritten as a select statement, but it's not necessary - if err := ctx.Err(); err != nil { - return - } + // Start with a stream of numbers from 0 to 999 + fileIDs := streamNumbers(ctx, 0, 1000) - urls <- rill.Wrap(fmt.Sprintf("https://example.com/file-%d.txt", i), nil) - } - }() + // Generate a stream of URLs from http://example.com/file-0.txt to http://example.com/file-999.txt + urls := rill.OrderedMap(fileIDs, 1, func(id int) (string, error) { + return fmt.Sprintf("https://example.com/file-%d.txt", id), nil + }) // Download and process the files // At most 5 files are downloaded and held in memory at the same time @@ -251,17 +244,16 @@ func sendMessage(message string, server string) error { return nil } -// This example demonstrates using [FlatMap] to accelerate paginated API calls. Instead of fetching all users sequentially, -// page-by-page (which would take a long time since the API is slow and the number of pages is large), it fetches users from -// multiple departments in parallel. The example also shows how to write a reusable streaming wrapper around an existing -// API function that can be used on its own or as part of a larger pipeline. -func Example_parallelStreams() { +// This example demonstrates using [FlatMap] to fetch users from multiple departments concurrently. 
+// Additionally, it demonstrates how to write a reusable streaming wrapper over paginated API calls - the StreamUsers function +func Example_flatMap() { ctx := context.Background() - // Convert a list of all departments into a stream - departments := rill.FromSlice(mockapi.GetDepartments()) + // Start with a stream of department names + departments := rill.FromSlice([]string{"IT", "Finance", "Marketing", "Support", "Engineering"}, nil) - // Use FlatMap to stream users from 3 departments concurrently. + // Stream users from all departments concurrently. + // At most 3 departments at the same time. users := rill.FlatMap(departments, 3, func(department string) <-chan rill.Try[*mockapi.User] { return StreamUsers(ctx, &mockapi.UserQuery{Department: department}) }) @@ -276,7 +268,7 @@ func Example_parallelStreams() { // StreamUsers is a reusable streaming wrapper around the mockapi.ListUsers function. // It iterates through all listing pages and returns a stream of users. -// This function is useful on its own or as a building block for more complex pipelines. +// This function is useful both on its own and as part of larger pipelines. func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[*mockapi.User] { res := make(chan rill.Try[*mockapi.User]) @@ -755,6 +747,22 @@ func square(x int) int { return x * x } +// helper function that creates a stream of numbers [start, end) and respects the context +func streamNumbers(ctx context.Context, start, end int) <-chan rill.Try[int] { + out := make(chan rill.Try[int]) + go func() { + defer close(out) + for i := start; i < end; i++ { + select { + case <-ctx.Done(): + return + case out <- rill.Try[int]{Value: i}: + } + } + }() + return out +} + // printStream prints all items from a stream (one per line) and an error if any. 
func printStream[A any](stream <-chan rill.Try[A]) { fmt.Println("Result:") diff --git a/mockapi/README.md b/mockapi/README.md index 91e373b..ec8f558 100644 --- a/mockapi/README.md +++ b/mockapi/README.md @@ -1,4 +1,4 @@ # Mock API This package provides a very simple mock API used in runnable examples on https://pkg.go.dev/github.com/destel/rill -The package should remain public to be accessible from the go playground. \ No newline at end of file +The package is kept public to be accessible from the go playground. \ No newline at end of file From d007e46d8f67ba9e60932e4a949d432f31516c64 Mon Sep 17 00:00:00 2001 From: destel Date: Sat, 23 Nov 2024 02:03:42 +0200 Subject: [PATCH 09/18] Better intro text for the first example --- README.md | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 6155dea..a502a55 100644 --- a/README.md +++ b/README.md @@ -46,11 +46,8 @@ does not grow with the input size. ## Quick Start - - -Rill transforms concurrent operations into clean, readable pipelines. -Here's a practical example that fetches users from an API, activates them, and saves the changes back - -all with explicit control over concurrency at each step. +Let's look at a common task: fetch users from an API, activate them, and save the changes back. +The example shows how to control concurrency at each step while keeping the code clean and manageable. 
[Try it](https://pkg.go.dev/github.com/destel/rill#example-package) ```go From 73a5a4074be4ff67df5700cac95610cf688ff0b2 Mon Sep 17 00:00:00 2001 From: destel Date: Sat, 23 Nov 2024 02:04:23 +0200 Subject: [PATCH 10/18] Better mockapi readme --- mockapi/README.md | 11 +++++++++-- mockapi/users.go | 4 ++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/mockapi/README.md b/mockapi/README.md index ec8f558..5ae2fb2 100644 --- a/mockapi/README.md +++ b/mockapi/README.md @@ -1,4 +1,11 @@ # Mock API -This package provides a very simple mock API used in runnable examples on https://pkg.go.dev/github.com/destel/rill -The package is kept public to be accessible from the go playground. \ No newline at end of file +A minimal mock API implementation used in runnable examples on [pkg.go.dev](https://pkg.go.dev/github.com/destel/rill). + +This package simulates common API patterns like: +- Single and bulk item fetching +- Pagination +- Network delay simulation +- Realistic error scenarios + +The package is intentionally kept public to enable running and experimenting with examples in the Go Playground. \ No newline at end of file diff --git a/mockapi/users.go b/mockapi/users.go index 2fc2d1c..8160618 100644 --- a/mockapi/users.go +++ b/mockapi/users.go @@ -1,5 +1,5 @@ -// Package mockapi provides a very basic mock API client for examples and demos. -// Unfortunately it must live outside the internal folder to be accessible from runnable examples and go playground. +// Package mockapi provides a very basic mock API for examples and demos. +// It's intentionally kept public to enable running and experimenting with examples in the Go Playground. 
package mockapi import ( From 3a4d98ced6e7f9a94b016c8b344d796e276f4150 Mon Sep 17 00:00:00 2001 From: destel Date: Sat, 23 Nov 2024 02:59:28 +0200 Subject: [PATCH 11/18] Minor tweaks --- README.md | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index a502a55..2687b7c 100644 --- a/README.md +++ b/README.md @@ -92,9 +92,11 @@ While processing items individually works well in many cases, it's often more ef especially when working with external services or databases. This reduces the number of queries and API calls, improves throughput, and often reduces costs -The previous example can be improved by using the API's bulk fetching capability. Instead of making individual -`GetUser` calls, the IDs can be grouped into batches to fetch multiple users at once using a single `GetUsers` API call. -The **Batch** function transforms a stream of individual items into a stream of batches, and **Unbatch** does the reverse. +To demonstrate batching, let's improve the previous example by using the API's bulk fetching capability. +The **Batch** function transforms a stream of individual IDs into a stream of batches, enabling the use of `GetUsers` API +to fetch multiple users in a single call instead of making individual `GetUser` requests. + + [Try it](https://pkg.go.dev/github.com/destel/rill#example-package-Batching) ```go @@ -149,16 +151,16 @@ desirable for efficiency, waiting to collect a full batch might introduce unacce the input stream becomes slow or sparse. Rill solves this with timeout-based batching: batches are emitted either when they're full or after a specified timeout, -whichever comes first. This ensures optimal batch sizes during high load while maintaining responsiveness during quiet periods. +whichever comes first. This approach ensures optimal batch sizes during high load while maintaining responsiveness during quiet periods. 
Consider an application that needs to update users' _last_active_at_ timestamps in a database. The function responsible for this - `UpdateUserTimestamp` can be called concurrently, at unpredictable rates, and from different parts of the application. -Sending all this updates individually may create too many concurrent queries, potentially overwhelming the database. +Performing all these updates individually may create too many concurrent queries, potentially overwhelming the database. - In the example below, updates are collected into batches of up to 5 items, but a batch is also emitted if 100ms passes -without reaching the full size. -This provides an excellent balance between efficiency and latency: full batches and zero latency during high load, -smaller batches and up to 100ms latency during quiet periods. +In the example below, the updates are queued into `userIDsToUpdate` channel and then grouped into batches of up to 5 items, +with each batch sent to the database as a single query. +The *Batch* functions is used with a timeout of 100ms, meaning zero latency during high load, +and up to 100ms latency with smaller batches during quiet periods. [Try it](https://pkg.go.dev/github.com/destel/rill#example-package-BatchingRealTime) ```go From ced854e7f83bf7b32e34b28939cc6d843d604d06 Mon Sep 17 00:00:00 2001 From: destel Date: Sat, 23 Nov 2024 13:10:40 +0200 Subject: [PATCH 12/18] Minor tweaks --- README.md | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 2687b7c..afcf755 100644 --- a/README.md +++ b/README.md @@ -33,8 +33,7 @@ or building responsive data pipelines. - **Provide solutions for advanced tasks.** Beyond basic operations, the library includes ready-to-use functions for batching, ordered fan-in, map-reduce, -stream splitting, merging, and more. Pipelines, while usually linear, -can have any topology forming a directed acyclic graph (DAG). +stream splitting, merging, and more. 
Pipelines, while usually linear, can have any cycle-free topology (DAG). - **Support custom extensions.** Since Rill operates on standard Go channels, it's easy to write custom functions compatible with the library. @@ -46,7 +45,7 @@ does not grow with the input size. ## Quick Start -Let's look at a common task: fetch users from an API, activate them, and save the changes back. +Let's look at a practical example: fetch users from an API, activate them, and save the changes back. The example shows how to control concurrency at each step while keeping the code clean and manageable. [Try it](https://pkg.go.dev/github.com/destel/rill#example-package) @@ -94,7 +93,7 @@ improves throughput, and often reduces costs To demonstrate batching, let's improve the previous example by using the API's bulk fetching capability. The **Batch** function transforms a stream of individual IDs into a stream of batches, enabling the use of `GetUsers` API -to fetch multiple users in a single call instead of making individual `GetUser` requests. +to fetch multiple users in a single call, instead of making individual `GetUser` calls. From 97a835a0afe385341a670f21487f33d8e3c3d6bd Mon Sep 17 00:00:00 2001 From: destel Date: Sat, 23 Nov 2024 13:41:33 +0200 Subject: [PATCH 13/18] Minor tweaks --- README.md | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index afcf755..da3d207 100644 --- a/README.md +++ b/README.md @@ -87,9 +87,9 @@ func main() { ## Batching -While processing items individually works well in many cases, it's often more efficient to handle items in batches, -especially when working with external services or databases. This reduces the number of queries and API calls, -improves throughput, and often reduces costs +Processing items in batches rather than individually can significantly improve performance across many scenarios, +particularly when working with external services or databases. 
Batching reduces the number of queries and API calls, +increases throughput, and typically lowers costs. To demonstrate batching, let's improve the previous example by using the API's bulk fetching capability. The **Batch** function transforms a stream of individual IDs into a stream of batches, enabling the use of `GetUsers` API @@ -147,7 +147,7 @@ func main() { ## Real-Time Batching Real-world applications often need to handle data that arrives at unpredictable rates. While batching is still desirable for efficiency, waiting to collect a full batch might introduce unacceptable delays when -the input stream becomes slow or sparse. +the input stream slows down or becomes sparse. Rill solves this with timeout-based batching: batches are emitted either when they're full or after a specified timeout, whichever comes first. This approach ensures optimal batch sizes during high load while maintaining responsiveness during quiet periods. @@ -158,7 +158,7 @@ Performing all these updates individually may create too many concurrent queries In the example below, the updates are queued into `userIDsToUpdate` channel and then grouped into batches of up to 5 items, with each batch sent to the database as a single query. -The *Batch* functions is used with a timeout of 100ms, meaning zero latency during high load, +The *Batch* functions is used with a timeout of 100ms, ensuring zero latency during high load, and up to 100ms latency with smaller batches during quiet periods. [Try it](https://pkg.go.dev/github.com/destel/rill#example-package-BatchingRealTime) @@ -271,8 +271,7 @@ func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error { ## Order Preservation (Ordered Fan-In) Concurrent processing can boost performance, but since tasks take different amounts of time to complete, -the results' order usually differs from the input order. This seemingly simple problem is deceptively challenging to solve correctly, -especially at scale. 
+the results' order usually differs from the input order. This seemingly simple problem is deceptively challenging to solve correctly. While out-of-order results are acceptable in many scenarios, some cases require preserving the original order. To address this, rill provides ordered versions of its core functions, such as **OrderedMap** or **OrderedFilter**. @@ -355,7 +354,7 @@ Rill comes with the **Merge** function that combines multiple streams into a sin function that can combine streams is **FlatMap**. It's a powerful tool that transforms each input item into its own stream, and then merges all those streams together. -In the example below **FlatMap** transforms each department into its own stream of users, and then merges them into a final unified stream of users. +In the example below **FlatMap** transforms each department into its own stream of users, and then merges them all into a final unified stream. Like other Rill functions, **FlatMap** gives full control over concurrency. In this particular case the concurrency level is 3, meaning that users are fetched from up to 3 departments at the same time. @@ -422,7 +421,7 @@ func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[ ## Go 1.23 Iterators -Starting from Go 1.23, the language adds *range-over-function* feature, allowing users to define custom iterators +Starting from Go 1.23, the language added the *range-over-function* feature, allowing users to define custom iterators for use in for-range loops. This feature enables Rill to integrate seamlessly with existing iterator-based functions in the standard library and third-party packages. 
From 63797586d597fa6209f77cd7f21c8ff79e12e783 Mon Sep 17 00:00:00 2001 From: destel Date: Sat, 23 Nov 2024 13:50:08 +0200 Subject: [PATCH 14/18] Minor tweaks --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index da3d207..7bb4343 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ does not grow with the input size. ## Quick Start Let's look at a practical example: fetch users from an API, activate them, and save the changes back. -The example shows how to control concurrency at each step while keeping the code clean and manageable. +It shows how to control concurrency at each step while keeping the code clean and manageable. [Try it](https://pkg.go.dev/github.com/destel/rill#example-package) ```go @@ -274,7 +274,7 @@ Concurrent processing can boost performance, but since tasks take different amou the results' order usually differs from the input order. This seemingly simple problem is deceptively challenging to solve correctly. While out-of-order results are acceptable in many scenarios, some cases require preserving the original order. -To address this, rill provides ordered versions of its core functions, such as **OrderedMap** or **OrderedFilter**. +To address this, Rill provides ordered versions of its core functions, such as **OrderedMap** or **OrderedFilter**. These functions perform additional synchronization under the hood to ensure that if value **x** precedes value **y** in the input channel, then **f(x)** will precede **f(y)** in the output. 
From 655065c13d88025ef68187db8851ca25ea31352b Mon Sep 17 00:00:00 2001 From: destel Date: Sat, 23 Nov 2024 16:20:21 +0200 Subject: [PATCH 15/18] Minor tweaks --- README.md | 23 +++++++++++++---------- example_test.go | 2 -- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 7bb4343..44370c3 100644 --- a/README.md +++ b/README.md @@ -158,7 +158,7 @@ Performing all these updates individually may create too many concurrent queries In the example below, the updates are queued into `userIDsToUpdate` channel and then grouped into batches of up to 5 items, with each batch sent to the database as a single query. -The *Batch* functions is used with a timeout of 100ms, ensuring zero latency during high load, +The **Batch** function is used with a timeout of 100ms, ensuring zero latency during high load, and up to 100ms latency with smaller batches during quiet periods. [Try it](https://pkg.go.dev/github.com/destel/rill#example-package-BatchingRealTime) @@ -217,12 +217,12 @@ Rill provides a wide selection of blocking functions. Some of them are: [Example](https://pkg.go.dev/github.com/destel/rill#example-ForEach) - **ToSlice:** Collects all stream items into a slice. [Example](https://pkg.go.dev/github.com/destel/rill#example-ToSlice) -- **First:** Returns the first item or error encountered in the stream. +- **First:** Returns the first item or error encountered in the stream and discards the rest. [Example](https://pkg.go.dev/github.com/destel/rill#example-First) - **Reduce:** Concurrently reduces the stream to a single value, using a user provided reducer function. [Example](https://pkg.go.dev/github.com/destel/rill#example-Reduce) -- **Any:** Concurrently checks if at least one item in the stream satisfies a user provided condition. - [Example](https://pkg.go.dev/github.com/destel/rill#example-Any) +- **All:** Concurrently checks if all items in the stream satisfy a user provided condition. 
+ [Example](https://pkg.go.dev/github.com/destel/rill#example-All) All blocking functions share a common behavior. In case of an early termination (before reaching the end of the input stream or in case of an error), @@ -247,13 +247,15 @@ func main() { fmt.Printf("Check result: %v\n", err) } -// CheckAllUsersExist uses several concurrent workers to checks if all users with given IDs exist. +// CheckAllUsersExist uses several concurrent workers to check if all users with given IDs exist. func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error { + // Create new context that will be canceled when this function returns ctx, cancel := context.WithCancel(ctx) defer cancel() idsStream := rill.FromSlice(ids, nil) + // Fetch users concurrently. users := rill.Map(idsStream, concurrency, func(id int) (*mockapi.User, error) { u, err := mockapi.GetUser(ctx, id) if err != nil { @@ -264,6 +266,7 @@ func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error { return u, nil }) + // Stop at first error and cancel remaining fetches via context return rill.Err(users) } ``` @@ -275,7 +278,7 @@ the results' order usually differs from the input order. This seemingly simple p While out-of-order results are acceptable in many scenarios, some cases require preserving the original order. To address this, Rill provides ordered versions of its core functions, such as **OrderedMap** or **OrderedFilter**. -These functions perform additional synchronization under the hood to ensure that if value **x** precedes value **y** in the input channel, +These functions perform additional synchronization under the hood to ensure that if value **x** precedes value **y** in the input stream, then **f(x)** will precede **f(y)** in the output. Here's a practical example: finding the first occurrence of a specific string among 1000 large files hosted online. 
@@ -354,7 +357,7 @@ Rill comes with the **Merge** function that combines multiple streams into a sin function that can combine streams is **FlatMap**. It's a powerful tool that transforms each input item into its own stream, and then merges all those streams together. -In the example below **FlatMap** transforms each department into its own stream of users, and then merges them all into a final unified stream. +In the example below **FlatMap** transforms each department into its own stream of users, and then merges them into a single user stream. Like other Rill functions, **FlatMap** gives full control over concurrency. In this particular case the concurrency level is 3, meaning that users are fetched from up to 3 departments at the same time. @@ -440,12 +443,12 @@ func main() { // Transform each number // Concurrency = 3 - results := rill.Map(numbers, 3, func(x int) (int, error) { - return doSomethingWithNumber(x), nil + squares := rill.Map(numbers, 3, func(x int) (int, error) { + return square(x), nil }) // Convert the stream into an iterator and use for-range to print the results - for val, err := range rill.ToSeq2(results) { + for val, err := range rill.ToSeq2(squares) { if err != nil { fmt.Println("Error:", err) break // cleanup is done regardless of early exit diff --git a/example_test.go b/example_test.go index bc34af6..8ae71cd 100644 --- a/example_test.go +++ b/example_test.go @@ -321,8 +321,6 @@ func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error { idsStream := rill.FromSlice(ids, nil) // Fetch users concurrently. - // Prints messages for successfully fetched users to demonstrate - // how pipeline stops after first error. 
users := rill.Map(idsStream, concurrency, func(id int) (*mockapi.User, error) { u, err := mockapi.GetUser(ctx, id) if err != nil { From 6abc3b9e7cbf6626c5f4a988fb73e92ba2ee6268 Mon Sep 17 00:00:00 2001 From: destel Date: Sat, 23 Nov 2024 17:20:59 +0200 Subject: [PATCH 16/18] Clarified the Err function --- README.md | 4 +++- example_test.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 44370c3..00c7267 100644 --- a/README.md +++ b/README.md @@ -223,6 +223,8 @@ Rill provides a wide selection of blocking functions. Some of them are: [Example](https://pkg.go.dev/github.com/destel/rill#example-Reduce) - **All:** Concurrently checks if all items in the stream satisfy a user provided condition. [Example](https://pkg.go.dev/github.com/destel/rill#example-All) +- **Err:** Returns the first error encountered in the stream or nil, and discards the rest. + [Example](https://pkg.go.dev/github.com/destel/rill#example-Err) All blocking functions share a common behavior. 
In case of an early termination (before reaching the end of the input stream or in case of an error), @@ -266,7 +268,7 @@ func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error { return u, nil }) - // Stop at first error and cancel remaining fetches via context + // Return the first error (if any) and cancel remaining fetches via context return rill.Err(users) } ``` diff --git a/example_test.go b/example_test.go index 8ae71cd..dba9c4c 100644 --- a/example_test.go +++ b/example_test.go @@ -331,7 +331,7 @@ func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error { return u, nil }) - // Stop at first error and cancel remaining fetches via context + // Return the first error (if any) and cancel remaining fetches via context return rill.Err(users) } From 7d029f415d59e16de51c11e8260e43e11422db6e Mon Sep 17 00:00:00 2001 From: destel Date: Sat, 23 Nov 2024 20:54:07 +0200 Subject: [PATCH 17/18] Minor text tweaks --- README.md | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 00c7267..c5415e7 100644 --- a/README.md +++ b/README.md @@ -20,16 +20,15 @@ At the same time, developers retain full control over the concurrency level of a Most functions in the library take Go channels as inputs and return new, transformed channels as outputs. This allows them to be chained in various ways to build reusable pipelines from simpler parts, similar to Unix pipes. -As a result, concurrent tasks become clear sequences of reusable operations. +As a result, concurrent programs become clear sequences of reusable operations. - **Centralize error handling.** -Errors are automatically propagated through the pipeline and can be handled in a single place at the end. -For more complex scenarios, Rill also provides tools to intercept and handle errors at any point in the pipeline. 
+Errors are automatically propagated through a pipeline and can be handled in a single place at the end. +For more complex scenarios, Rill also provides tools to intercept and handle errors at any point in a pipeline. - **Simplify stream processing.** Thanks to Go channels, built-in functions can handle potentially infinite streams, processing items as they arrive. -This makes Rill a convenient tool for real-time data processing, handling large datasets that don't fit in memory, -or building responsive data pipelines. +This makes Rill a convenient tool for real-time processing or handling large datasets that don't fit in memory. - **Provide solutions for advanced tasks.** Beyond basic operations, the library includes ready-to-use functions for batching, ordered fan-in, map-reduce, @@ -87,12 +86,12 @@ func main() { ## Batching -Processing items in batches rather than individually can significantly improve performance across many scenarios, +Processing items in batches rather than individually can significantly improve performance in many scenarios, particularly when working with external services or databases. Batching reduces the number of queries and API calls, increases throughput, and typically lowers costs. To demonstrate batching, let's improve the previous example by using the API's bulk fetching capability. -The **Batch** function transforms a stream of individual IDs into a stream of batches, enabling the use of `GetUsers` API +The **Batch** function transforms a stream of individual IDs into a stream of slices. This enables the use of `GetUsers` API to fetch multiple users in a single call, instead of making individual `GetUser` calls. @@ -145,9 +144,9 @@ func main() { ## Real-Time Batching -Real-world applications often need to handle data that arrives at unpredictable rates. While batching is still +Real-world applications often need to handle events or data that arrives at unpredictable rates. 
While batching is still desirable for efficiency, waiting to collect a full batch might introduce unacceptable delays when -the input stream slows down or becomes sparse. +the input stream becomes slow or sparse. Rill solves this with timeout-based batching: batches are emitted either when they're full or after a specified timeout, whichever comes first. This approach ensures optimal batch sizes during high load while maintaining responsiveness during quiet periods. @@ -217,13 +216,13 @@ Rill provides a wide selection of blocking functions. Some of them are: [Example](https://pkg.go.dev/github.com/destel/rill#example-ForEach) - **ToSlice:** Collects all stream items into a slice. [Example](https://pkg.go.dev/github.com/destel/rill#example-ToSlice) -- **First:** Returns the first item or error encountered in the stream and discards the rest. +- **First:** Returns the first item or error encountered in the stream and discards the rest [Example](https://pkg.go.dev/github.com/destel/rill#example-First) - **Reduce:** Concurrently reduces the stream to a single value, using a user provided reducer function. [Example](https://pkg.go.dev/github.com/destel/rill#example-Reduce) - **All:** Concurrently checks if all items in the stream satisfy a user provided condition. [Example](https://pkg.go.dev/github.com/destel/rill#example-All) -- **Err:** Returns the first error encountered in the stream or nil, and discards the rest. +- **Err:** Returns the first error encountered in the stream or nil, and discards the rest of the stream. [Example](https://pkg.go.dev/github.com/destel/rill#example-Err) @@ -276,8 +275,8 @@ func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error { ## Order Preservation (Ordered Fan-In) Concurrent processing can boost performance, but since tasks take different amounts of time to complete, -the results' order usually differs from the input order. This seemingly simple problem is deceptively challenging to solve correctly. 
-While out-of-order results are acceptable in many scenarios, some cases require preserving the original order. +the results' order usually differs from the input order. While out-of-order results are acceptable in many scenarios, +some cases require preserving the original order. This seemingly simple problem is deceptively challenging to solve correctly. To address this, Rill provides ordered versions of its core functions, such as **OrderedMap** or **OrderedFilter**. These functions perform additional synchronization under the hood to ensure that if value **x** precedes value **y** in the input stream, @@ -357,9 +356,9 @@ func streamNumbers(ctx context.Context, start, end int) <-chan rill.Try[int] { ## Stream Merging and FlatMap Rill comes with the **Merge** function that combines multiple streams into a single one. Another, often overlooked, function that can combine streams is **FlatMap**. It's a powerful tool that transforms each input item into its own stream, -and then merges all those streams together. +and then merges all these streams together. -In the example below **FlatMap** transforms each department into its own stream of users, and then merges them into a single user stream. +In the example below, **FlatMap** transforms each department into a stream of users, then merges these streams into one. Like other Rill functions, **FlatMap** gives full control over concurrency. In this particular case the concurrency level is 3, meaning that users are fetched from up to 3 departments at the same time. From 8078601d8cd108ad7a0c10c52f7e87423a656fcf Mon Sep 17 00:00:00 2001 From: destel Date: Sat, 23 Nov 2024 21:00:03 +0200 Subject: [PATCH 18/18] Minor text tweaks --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c5415e7..862d395 100644 --- a/README.md +++ b/README.md @@ -360,7 +360,7 @@ and then merges all these streams together. 
In the example below, **FlatMap** transforms each department into a stream of users, then merges these streams into one. Like other Rill functions, **FlatMap** gives full control over concurrency. -In this particular case the concurrency level is 3, meaning that users are fetched from up to 3 departments at the same time. +In this particular case the concurrency level is 3, meaning that users are fetched from at most 3 departments at the same time. Additionally, this example demonstrates how to write a reusable streaming wrapper over paginated API calls - the `StreamUsers` function. This wrapper can be useful both on its own and as part of larger pipelines.