diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml index 246f618..f75369b 100644 --- a/.github/workflows/check.yaml +++ b/.github/workflows/check.yaml @@ -34,7 +34,7 @@ jobs: go-version: '1.23.x' - name: Run coverage - run: go test -coverprofile=coverage.out -covermode=atomic $(go list ./... | grep -v internal/th) + run: go test -coverprofile=coverage.out -covermode=atomic $(go list ./... | grep -v internal/th | grep -v mockapi) - name: Upload coverage reports to Codecov uses: codecov/codecov-action@v4.0.1 diff --git a/README.md b/README.md index 074d36d..f6ae4f7 100644 --- a/README.md +++ b/README.md @@ -1,404 +1,444 @@ # Rill [![GoDoc](https://pkg.go.dev/badge/github.com/destel/rill)](https://pkg.go.dev/github.com/destel/rill) [![Go Report Card](https://goreportcard.com/badge/github.com/destel/rill)](https://goreportcard.com/report/github.com/destel/rill) [![codecov](https://codecov.io/gh/destel/rill/graph/badge.svg?token=252K8OQ7E1)](https://codecov.io/gh/destel/rill) [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go) -Rill (noun: a small stream) is a Go toolkit that offers a collection of easy-to-use functions for concurrency, streaming, -batching and pipeline construction. It abstracts away the complexities of concurrency, removes boilerplate, -provides a structured way to handle errors and allows developers to focus on core logic. -Whether you need to perform a basic concurrent ForEach or construct a complex multi-stage processing pipeline, -Rill has got you covered. - +Rill is a toolkit that brings composable concurrency to Go, making it easier to build concurrent programs from simple, reusable parts. +It reduces boilerplate while preserving Go's natural channel-based model. 
- -## Key Features -- **Easy to Use**: the complexity of managing goroutines, channels, wait groups, and atomics is abstracted away -- **Easy to Integrate**: seamlessly integrates into existing projects without any setup or configuration -- **Concurrent**: provides control over the level of concurrency for all operations -- **Error Handling**: provides a structured way to handle errors in concurrent applications -- **Streaming**: handles real-time data streams or large datasets with a minimal memory footprint -- **Modular**: allows composing functions to create custom pipelines and higher-order operations -- **Batching**: simplifies organizing and processing data in batches -- **Order Preservation**: provides functions that maintain the original order of data during concurrent processing -- **Efficient Resource Use**: ensures goroutine pool sizes and memory allocations are independent of input size -- **Generic**: all operations are type-safe and can be used with any data type +```bash +go get -u github.com/destel/rill +``` +## Goals -## Motivation -Rill might look like an iterator or functional programming library, but -at its core it's a concurrency library made specifically for Go channels. +- **Make common tasks easier.** +Rill provides a cleaner way of solving common concurrency problems, such as +processing slices and channels, calling APIs, or making DB queries in parallel. +It removes boilerplate and abstracts away the complexities of goroutine orchestration and error handling. +At the same time, developers retain full control over the concurrency level of all operations. -There is a consensus in the Go community that functional programming style operations like Map, Filter, ForEach and others are -not idiomatic in Go, and that basic for-loops are better, faster, and more concise. 
This is true for slices, -but for channels, the complexity can quickly escalate beyond a basic for-loop as requirements are added: +- **Make concurrent code composable and clean.** +Most functions in the library take Go channels as inputs and return new, transformed channels as outputs. +This allows them to be chained in various ways to build reusable pipelines from simpler parts, +similar to Unix pipes. +As a result, concurrent tasks become clear sequences of reusable operations. -- A basic for-range loop is sufficient to iterate over a channel -- Adding concurrency requires goroutines and a WaitGroup -- Adding error handling means replacing WaitGroup with ErrGroup -- Controlling concurrency level is often done with a semaphore -- If the above approach becomes a bottleneck, worker pools must be introduced and managed manually -- For a multi-stage pipeline, everything must be manually managed at each stage, causing complexity to grow non-linearly -- Features like batching or ordered fan-in require even more complex orchestration and synchronization +- **Centralize error handling.** +Errors are automatically propagated through the pipeline and can be handled in a single place at the end. +For more complex scenarios, Rill also provides tools to intercept and handle errors at any point in the pipeline. -These increasing levels of complexity introduce several challenges. While tools like channels, ErrGroups or semaphores are powerful on their own, -combining them into a more complex logic, can lead to code with lots of boilerplate that's difficult to write, read, and maintain. +- **Simplify stream processing.** +Thanks to Go channels, built-in functions can handle potentially infinite streams, processing items as they arrive. +This makes Rill a convenient tool for real-time data processing, handling large datasets that don't fit in memory, +or building responsive data pipelines. 
-Rill was born out of the desire to remove code duplication and to encapsulate all this complexity in a library with a -simple, composable, and expressive API. The introduction of generics in Go 1.18 opened the door to creating -functional-style operations on channels, providing a natural way to achieve this goal. +- **Provide solutions for advanced tasks.** +Beyond basic operations, the library includes ready-to-use functions for batching, ordered fan-in, map-reduce, +stream splitting, merging, and more. Pipelines, while usually linear, +can have any topology forming a directed acyclic graph (DAG). +- **Support custom extensions.** +Since Rill operates on standard Go channels, it's easy to write custom functions compatible with the library. +- **Keep it lightweight.** +Rill has a small, type-safe, channel-based API, and zero dependencies, making it straightforward to integrate into existing projects. +It's also lightweight in terms of resource usage, ensuring that the number of memory allocations and goroutines +does not grow with the input size. -## Example Usage -Consider an example application that loads users from an API concurrently, -updates their status to active and saves them back, -while controlling the level of concurrency for each operation. -[Full runnable example](https://pkg.go.dev/github.com/destel/rill#example-package) +## Quick Start +Rill makes it easy to process data concurrently. +Here's a simple example using **ForEach** to process items in parallel while handling errors: +[Try it](https://pkg.go.dev/github.com/destel/rill#example-ForEach) ```go func main() { - // In case of early exit this will cancel the user fetching, - // which in turn will terminate the entire pipeline. 
- ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Start with a stream of user ids - ids := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + // Convert a slice of numbers into a channel + numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) - // Read users from the API. + // Do something with each number and print the result // Concurrency = 3 - users := rill.Map(ids, 3, func(id int) (*User, error) { - return getUser(ctx, id) - }) - - // Activate users. - // Concurrency = 2 - err := rill.ForEach(users, 2, func(u *User) error { - if u.IsActive { - fmt.Printf("User %d is already active\n", u.ID) - return nil - } - - u.IsActive = true - return saveUser(ctx, u) + err := rill.ForEach(numbers, 3, func(x int) error { + y := doSomethingWithNumber(x) + fmt.Println(y) + return nil }) + // Handle errors fmt.Println("Error:", err) } ``` -## Installation -```bash -go get -u github.com/destel/rill -``` - - -## Testing Strategy -Rill has a test coverage of over 95%, with testing focused on: -- **Correctness**: ensuring that functions produce accurate results at different levels of concurrency -- **Concurrency**: confirming that correct number of goroutines is spawned and utilized -- **Ordering**: ensuring that ordered versions of functions preserve the order, while basic versions do not +## Multi-Stage Pipelines +The result as above can also be achieved with WaitGroup or ErrGroup, +but Rill shines when building complex multi-stage concurrent pipelines. +The next example demonstrates a multi-stage pipeline that fetches users from an external API in batches, +updates their status to active, and saves them back, while controlling the level of concurrency at each step. - - -## Design Philosophy -At the heart of rill lies a simple yet powerful concept: operating on channels of wrapped values, encapsulated by the **Try** container. 
-This allows to propagate both values and errors through the pipeline, ensuring that errors are handled correctly at each stage. -Such wrapped channels can be created manually or through utilities like **FromSlice** or **FromChan**, and then transformed via non-blocking -functions like **Map** or **Filter**. Finally, the transformed stream can be consumed by a blocking function such as -**ForEach**, **Reduce** or **MapReduce** - -One of the key features of Rill is the ability to control the level of concurrency for almost all operations through the **n** parameter. -This is possible due to the channel and goroutine orchestration that library does under the hood. Rill's built-in functions manage -worker pools internally, making the number of goroutines and allocations independent of the input size. - -Finally, rill is designed to be modular and extensible. Most functions take streams as input and return transformed streams as output. -It's easy to create custom reusable higher-order operations and pipelines by combining existing ones. - - - - -## Batching -Batching is a common pattern in concurrent processing, especially when dealing with external services or databases. -Rill provides a **Batch** function that transforms a stream of items into a stream of batches of a specified size. It's also possible -to specify a timeout, after which the batch is emitted even if it's not full. This is useful for keeping an application reactive -when input stream is slow or sparse. - -Consider a modification of the previous example, where list of ids is streamed from a remote file, -and users are fetched from the API in batches. - -[Full runnable example](https://pkg.go.dev/github.com/destel/rill#example-package-Batching) - +[Try it](https://pkg.go.dev/github.com/destel/rill#example-package-Batching) ```go func main() { - // In case of early exit this will cancel the file streaming, - // which in turn will terminate the entire pipeline. 
- ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Stream a file with user ids as an io.Reader - reader, err := downloadFile(ctx, "http://example.com/user_ids1.txt") - if err != nil { - fmt.Println("Error:", err) - return - } - - // Transform the reader into a stream of lines - lines := streamLines(reader) + ctx := context.Background() - // Parse lines as integers - // Concurrency = 3 - ids := rill.Map(lines, 3, func(line string) (int, error) { - return strconv.Atoi(line) - }) + // Convert a slice of user IDs into a channel + ids := rill.FromSlice([]int{ + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, + 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, + }, nil) - // Group IDs into batches of 5 for bulk processing + // Group IDs into batches of 5 idBatches := rill.Batch(ids, 5, 1*time.Second) - // Fetch users for each batch of IDs + // Bulk fetch users from the API // Concurrency = 3 - userBatches := rill.Map(idBatches, 3, func(ids []int) ([]*User, error) { - return getUsers(ctx, ids...) + userBatches := rill.Map(idBatches, 3, func(ids []int) ([]*mockapi.User, error) { + return mockapi.GetUsers(ctx, ids) }) - // Transform batches back into a stream of users + // Transform the stream of batches back into a flat stream of users users := rill.Unbatch(userBatches) // Activate users. // Concurrency = 2 - err = rill.ForEach(users, 2, func(u *User) error { + err := rill.ForEach(users, 2, func(u *mockapi.User) error { if u.IsActive { fmt.Printf("User %d is already active\n", u.ID) return nil } u.IsActive = true - return saveUser(ctx, u) + err := mockapi.SaveUser(ctx, u) + if err != nil { + return err + } + + fmt.Printf("User saved: %+v\n", u) + return nil }) - fmt.Println("Error:", err) + // Handle errors + fmt.Println("Error:", err) } ``` +## Batching +When working with external services or databases, batching is a common pattern to reduce the number of requests and improve performance. 
+Rill provides a **Batch** function that transforms a stream of items into a stream of batches of a specified size. It's also possible +to specify a timeout, after which a batch is emitted even if it's not full. This is useful for keeping an application reactive +when the input stream is slow or sparse. -## Fan-in and Fan-out -Go channels support both Fan-in and Fan-out patterns, meaning that multiple goroutines can write to a single channel (fan-in) -or read from a single channel (fan-out). On top of that Rill adds a Merge function that can be used to combine multiple streams into a single one. +Previous examples have already shown how to use **Batch** to group user IDs for bulk fetching. +Let's examine a case where batching with timeout is particularly useful. -Consider a basic example application that concurrently sends messages through multiple servers, then collects the results -into a single stream and handles errors. +In the example below, the `UpdateUserTimestamp` function updates the _last_active_at_ column in the _users_ table with the current timestamp. +This function is called concurrently from multiple places in the application, such as HTTP handlers. +A large number of such calls would cause a large number of concurrent SQL queries, potentially overwhelming the database. -[Full runnable example](https://pkg.go.dev/github.com/destel/rill#example-package-FanIn_FanOut) +To mitigate this, it's possible to group the updates and send them to the database in bulk using the **Batch** function. +And when updates are sparse, the _timeout_ setting makes sure they're delayed by at most 100ms, +balancing between reducing database load and data freshness. 
+[Try it](https://pkg.go.dev/github.com/destel/rill#example-package-BatchingWithTimeout) ```go func main() { - messages := rill.FromSlice([]string{ - "message1", "message2", "message3", "message4", "message5", - "message6", "message7", "message8", "message9", "message10", - }, nil) + // Start the background worker that processes the updates + go updateUserTimestampWorker() + + // Do some updates. They'll be automatically grouped into + // batches: [1,2,3,4,5], [6,7], [8] + UpdateUserTimestamp(1) + UpdateUserTimestamp(2) + UpdateUserTimestamp(3) + UpdateUserTimestamp(4) + UpdateUserTimestamp(5) + UpdateUserTimestamp(6) + UpdateUserTimestamp(7) + time.Sleep(500 * time.Millisecond) // simulate sparse updates + UpdateUserTimestamp(8) +} - // Fan-out the messages to three servers - results1 := rill.Map(messages, 2, func(message string) (string, error) { - return message, sendMessage(message, "server1") - }) +// This is the queue of user IDs to update. +var userIDsToUpdate = make(chan int) - results2 := rill.Map(messages, 2, func(message string) (string, error) { - return message, sendMessage(message, "server2") - }) +// UpdateUserTimestamp is the public API for updating the last_active_at column in the users table +func UpdateUserTimestamp(userID int) { + userIDsToUpdate <- userID +} - results3 := rill.Map(messages, 2, func(message string) (string, error) { - return message, sendMessage(message, "server3") - }) +// This is a background worker that sends queued updates to the database in batches. 
+// For simplicity, there are no retries, error handling and synchronization +func updateUserTimestampWorker() { - // Fan-in the results from all servers into a single stream - results := rill.Merge(results1, results2, results3) + ids := rill.FromChan(userIDsToUpdate, nil) - // Handle errors - err := rill.Err(results) - fmt.Println("Error:", err) + idBatches := rill.Batch(ids, 5, 100*time.Millisecond) + + _ = rill.ForEach(idBatches, 1, func(batch []int) error { + fmt.Printf("Executed: UPDATE users SET last_active_at = NOW() WHERE id IN (%v)\n", batch) + return nil + }) } ``` + + ## Errors, Termination and Contexts -Error handling can be non-trivial in concurrent applications. Rill simplifies this by providing a structured error handling approach. -Usually rill pipelines consist of zero or more non-blocking stages that transform the input stream, -and one blocking stage that returns the results. General rule is: any error happening anywhere in the pipeline is -propagated down to the final stage, where it's caught by some blocking function and returned to the caller. +Error handling can be non-trivial in concurrent applications. Rill simplifies this by providing a structured approach to the problem. +Pipelines typically consist of a sequence of non-blocking channel transformations, followed by a blocking stage that returns a final result and an error. +The general rule is: any error occurring anywhere in a pipeline is propagated down to the final stage, +where it's caught by some blocking function and returned to the caller. -Rill provides several blocking functions out of the box: +Rill provides a wide selection of blocking functions. Some of them are: -- **ForEach:** Concurrently applies a user function to each item in the stream. +- **ForEach:** Concurrently applies a user function to each item in the stream. [Example](https://pkg.go.dev/github.com/destel/rill#example-ForEach) - **ToSlice:** Collects all stream items into a slice. 
[Example](https://pkg.go.dev/github.com/destel/rill#example-ToSlice) -- **ToSeq2:** Converts a stream into an iterator of value-error pairs. - [Example](https://pkg.go.dev/github.com/destel/rill#example-ToSeq2) +- **First:** Returns the first item or error encountered in the stream. + [Example](https://pkg.go.dev/github.com/destel/rill#example-First) - **Reduce:** Concurrently reduces the stream to a single value, using a user provided reducer function. [Example](https://pkg.go.dev/github.com/destel/rill#example-Reduce) -- **MapReduce:** Performs a concurrent MapReduce operation one the stream, reducing it to Go map, - using user provided mapper and reducer functions. - [Example](https://pkg.go.dev/github.com/destel/rill#example-MapReduce) -- **All:** Concurrently checks if all items in the stream satisfy a user provided condition. - [Example](https://pkg.go.dev/github.com/destel/rill#example-All) - **Any:** Concurrently checks if at least one item in the stream satisfies a user provided condition. [Example](https://pkg.go.dev/github.com/destel/rill#example-Any) -- **First:** Returns the first item or error encountered in the stream. - [Example](https://pkg.go.dev/github.com/destel/rill#example-First) -- **Err:** Checks if there's an error somewhere in the stream an returns it. - [Example](https://pkg.go.dev/github.com/destel/rill#example-Err) - -All blocking functions share a common behavior. In case of an early termination (before reaching the end of the input stream), -such functions initiate background draining of the remaining items. This is done to prevent goroutine leaks by ensuring that -all goroutines feeding the stream are allowed to complete. -The input stream should not be used anymore after calling a blocking function. -It also possible to use a for-range loop instead of a blocking function to consume the stream. -In this case, the caller would be responsible for draining the stream in case of an early termination. 
-See more details in the package documentation. +All blocking functions share a common behavior. In case of an early termination (before reaching the end of the input stream or in case of an error), +such functions initiate background draining of the remaining items. This is done to prevent goroutine leaks by ensuring that +all goroutines feeding the stream are allowed to complete. -Rill is context-agnostic, meaning that it does not enforce any specific context usage. +Rill is context-agnostic, meaning that it does not enforce any specific context usage. However, it's recommended to make user-defined pipeline stages context-aware. -This is especially important for the initial stage, as it allows to finish background draining -process, described above, faster. +This is especially important for the initial stage, as it allows to stop feeding the pipeline with new items when the context is canceled. + +In the example below the `FindFirstPrime` function uses several concurrent workers to find the first prime number after +a given number. Internally it creates an infinite stream of numbers. When the first prime number is found +in that stream, the context gets canceled, and the pipeline terminates gracefully. + +[Try it](https://pkg.go.dev/github.com/destel/rill#example-package-Context) +```go +func main() { + p := FindFirstPrime(10000, 3) // Use 3 concurrent workers + fmt.Println("The first prime after 10000 is", p) +} -In the example below the printOddSquares function initiates a pipeline that depends on a context. -When an error occurs in one of the pipeline stages, it propagates down the pipeline, causing an early return, -context cancellation (via defer) and resource cleanup. +// FindFirstPrime finds the first prime number after the given number, using several concurrent workers. 
+func FindFirstPrime(after int, concurrency int) int { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() -[Full runnable example](https://pkg.go.dev/github.com/destel/rill#example-package-Context) + numbers := make(chan rill.Try[int]) + go func() { + defer close(numbers) + for i := after + 1; ; i++ { + select { + case <-ctx.Done(): + return + case numbers <- rill.Wrap(i, nil): + } + } + }() + + primes := rill.OrderedFilter(numbers, concurrency, func(x int) (bool, error) { + fmt.Println("Checking", x) + return isPrime(x), nil + }) + + result, _, _ := rill.First(primes) + return result +} +``` + +## Boosting Sequential Operations +There is a technique that significantly accelerates some seemingly sequential operations by +branching them into multiple parallel streams and then merging the results. +Common examples include listing S3 objects, querying APIs, or reading from databases. + +Example below fetches all users from an external paginated API. Doing it sequentially, page-by-page, +would take a long time since the API is slow and the number of pages is large. +One way to speed this up is to fetch users from multiple departments at the same time. +The code below uses **FlatMap** to stream users from 3 departments concurrently and merge the results as they arrive, +achieving up to 3x speedup compared to sequential processing. + +Additionally, it demonstrates how to write a custom reusable streaming wrapper around an existing API function. +The `StreamUsers` function is useful on its own, but can also be a part of a larger pipeline. + +[Try it](https://pkg.go.dev/github.com/destel/rill#example-package-ParallelStreams) ```go func main() { ctx := context.Background() - err := printOddSquares(ctx) + // Convert a list of all departments into a stream + departments := rill.FromSlice(mockapi.GetDepartments()) + + // Use FlatMap to stream users from 3 departments concurrently. 
+ users := rill.FlatMap(departments, 3, func(department string) <-chan rill.Try[*mockapi.User] { + return StreamUsers(ctx, &mockapi.UserQuery{Department: department}) + }) + + // Print the users from the combined stream + err := rill.ForEach(users, 1, func(user *mockapi.User) error { + fmt.Printf("%+v\n", user) + return nil + }) fmt.Println("Error:", err) } -func printOddSquares(ctx context.Context) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() +// StreamUsers is a reusable streaming wrapper around the mockapi.ListUsers function. +// It iterates through all listing pages and returns a stream of users. +// This function is useful on its own or as a building block for more complex pipelines. +func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[*mockapi.User] { + res := make(chan rill.Try[*mockapi.User]) + + if query == nil { + query = &mockapi.UserQuery{} + } + + go func() { + defer close(res) + + for page := 0; ; page++ { + query.Page = page + + users, err := mockapi.ListUsers(ctx, query) + if err != nil { + res <- rill.Wrap[*mockapi.User](nil, err) + return + } - numbers := infiniteNumberStream(ctx) + if len(users) == 0 { + break + } - odds := rill.Filter(numbers, 3, func(x int) (bool, error) { - if x == 20 { - return false, fmt.Errorf("early exit") + for _, user := range users { + res <- rill.Wrap(user, nil) + } } - return x%2 == 1, nil - }) + }() - return rill.ForEach(odds, 3, func(x int) error { - fmt.Println(x * x) - return nil - }) + return res } ``` + -## Order Preservation -In concurrent applications, maintaining the original sequence of processed items is challenging due to the nature of parallel execution. -When values are read from an input stream, concurrently processed through a function **f**, and written to an output stream, their order might not -match the order of the input. To address this, rill provides ordered versions of its core functions, such as **OrderedMap** or **OrderedFilter**. 
-These ensure that if value **x** precedes value **y** in the input channel, then **f(x)** will precede **f(y)** in the output, -preserving the original order. It's important to note that these ordered functions have a small overhead compared to their unordered counterparts, -due to more advanced orchestration and synchronization happening under the hood. +## Order Preservation (Ordered Fan-In) +Concurrent processing can boost performance, but since tasks take different amounts of time to complete, +the results' order usually differs from the input order. This seemingly simple problem is deceptively challenging to solve correctly, +especially at scale. +While out-of-order results are acceptable in many scenarios, some cases require preserving the original order. -Order preservation is vital in scenarios where the sequence of data impacts the outcome, such as time-series data processing. -Take, for instance, an application that retrieves daily temperature measurements over a specific period and calculates the change -in temperature from one day to the next. Such application can benefit from concurrent data fetching, but need fetched data -to be processed in the correct order. +To address this, rill provides ordered versions of its core functions, such as **OrderedMap** or **OrderedFilter**. +These functions perform additional synchronization under the hood to ensure that if value **x** precedes value **y** in the input channel, +then **f(x)** will precede **f(y)** in the output. -[Full runnable example](https://pkg.go.dev/github.com/destel/rill#example-package-Ordering) +Here's a practical example: finding the first occurrence of a specific string among 1000 large files hosted online. +Downloading all files at once would consume too much memory, processing them sequentially would be too slow, +and traditional concurrency patterns do not preserve the order of files, making it challenging to find the first match. 
-```go -type Measurement struct { - Date time.Time - Temp float64 -} +The combination of **OrderedFilter** and **First** functions solves this elegantly, +while downloading and keeping in memory at most 5 files at a time. +[Try it](https://pkg.go.dev/github.com/destel/rill#example-package-Ordering) + +```go func main() { - city := "New York" - endDate := time.Now() - startDate := endDate.AddDate(0, 0, -30) + ctx := context.Background() + + // The string to search for in the downloaded files + needle := []byte("26") - // Create a stream of all days between startDate and endDate - days := make(chan rill.Try[time.Time]) + // Manually generate a stream of URLs from http://example.com/file-0.txt to http://example.com/file-999.txt + urls := make(chan rill.Try[string]) go func() { - defer close(days) - for date := startDate; date.Before(endDate); date = date.AddDate(0, 0, 1) { - days <- rill.Wrap(date, nil) + defer close(urls) + for i := 0; i < 1000; i++ { + // Stop generating URLs after the context is canceled (when the file is found) + // This can be rewritten as a select statement, but it's not necessary + if err := ctx.Err(); err != nil { + return + } + + urls <- rill.Wrap(fmt.Sprintf("https://example.com/file-%d.txt", i), nil) } }() - // Fetch the temperature for each day from the API - // Concurrency = 10; Ordered - measurements := rill.OrderedMap(days, 10, func(date time.Time) (Measurement, error) { - temp, err := getTemperature(city, date) - return Measurement{Date: date, Temp: temp}, err - }) + // Download and process the files + // At most 5 files are downloaded and held in memory at the same time + matchedUrls := rill.OrderedFilter(urls, 5, func(url string) (bool, error) { + fmt.Println("Downloading:", url) - // Iterate over the measurements, calculate and print changes. 
- // Concurrency = 1; Ordered - prev := Measurement{Temp: math.NaN()} - err := rill.ForEach(measurements, 1, func(m Measurement) error { - change := m.Temp - prev.Temp - prev = m + content, err := mockapi.DownloadFile(ctx, url) + if err != nil { + return false, err + } - fmt.Printf("%s: %.1f°C (change %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, change) - return nil + // keep only URLs of files that contain the needle + return bytes.Contains(content, needle), nil }) - fmt.Println("Error:", err) + // Find the first matched URL + firstMatchedUrl, found, err := rill.First(matchedUrls) + if err != nil { + fmt.Println("Error:", err) + return + } + + // Print the result + if found { + fmt.Println("Found in:", firstMatchedUrl) + } else { + fmt.Println("Not found") + } } ``` ## Go 1.23 Iterators -Starting from Go 1.23, the language supports *range over function*, allowing users to define custom iterators +Starting from Go 1.23, the language adds *range-over-function* feature, allowing users to define custom iterators for use in for-range loops. This feature enables Rill to integrate seamlessly with existing iterator-based functions in the standard library and third-party packages. -Rill provides **FromSeq** and **FromSeq2** functions to convert an iterator into -a stream. Additionally, there's a **ToSeq2** function to convert a stream back into an iterator. +Rill provides **FromSeq** and **FromSeq2** functions to convert an iterator into a stream, +and **ToSeq2** function to convert a stream back into an iterator. **ToSeq2** can be a good alternative to **ForEach** when concurrency is not needed. It gives more control and performs all necessary cleanup and draining, even if the loop is terminated early using *break* or *return*. 
-[Full runnable example](https://pkg.go.dev/github.com/destel/rill#example-ToSeq2) +[Try it](https://pkg.go.dev/github.com/destel/rill#example-ToSeq2) ```go func main() { - nums := rill.FromSeq2(genPositive(40)) - squares := rill.Map(nums, 4, func(x int) (int, error) { - return x * x, nil + // Convert a slice of numbers into a stream + numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + // Transform each number + // Concurrency = 3 + results := rill.Map(numbers, 3, func(x int) (int, error) { + return doSomethingWithNumber(x), nil }) - for val, err := range rill.ToSeq2(squares) { - fmt.Println(val, err) - } -} -func genPositive(to int) iter.Seq2[int, error] { - return func(yield func(i int, err error) bool) { - for i := 1; i <= to; i++ { - if !yield(i, nil) { - return - } + // Convert the stream into an iterator and use for-range to print the results + for val, err := range rill.ToSeq2(results) { + if err != nil { + fmt.Println("Error:", err) + break // cleanup is done regardless of early exit } + fmt.Printf("%+v\n", val) } } ``` +## Testing Strategy +Rill has a test coverage of over 95%, with testing focused on: +- **Correctness**: ensuring that functions produce accurate results at different levels of concurrency +- **Concurrency**: confirming that correct number of goroutines is spawned and utilized +- **Ordering**: ensuring that ordered versions of functions preserve the order, while basic versions do not -## Performance -Rill's performance is primarily bounded by the performance of channel operations. As a result, applications that -already use channels and channel-based concurrency patterns can expect minimal overhead when adopting Rill. -Additionally, Rill does fewer allocations compared to some traditional concurrency patterns that spawn a goroutine -for each channel item and use a semaphore to control the level of concurrency. For more details, refer to the [benchmarks](https://github.com/destel/rill/wiki/Benchmarks). 
- -This makes Rill well-suited for a wide range of tasks, especially I/O-bound workloads where the overhead of channel operations is typically negligible. +## Contributing +Contributions are welcome! Whether it's reporting a bug, suggesting a feature, or submitting a pull request, your support helps improve Rill. +Please ensure that your code adheres to the existing style and includes relevant tests._ diff --git a/doc.go b/doc.go index d99dd33..3a8d288 100644 --- a/doc.go +++ b/doc.go @@ -1,7 +1,7 @@ -// Package rill is a collection of easy-to-use functions for concurrency, streaming, batching and pipeline construction. -// It abstracts away the complexities of concurrency, removes boilerplate, and provides a structured way to handle errors. -// Rill is modular and can be easily integrated into existing projects: it requires no setup and allows using only the necessary functions. -// At the same time, rill's functions can be composed into complex, concurrent, and reusable pipelines when needed. +// Package rill provides composable channel-based concurrency primitives for Go that simplify parallel processing, +// batching, and stream handling. It offers building blocks for constructing concurrent pipelines from +// reusable parts while maintaining precise control over concurrency levels. The package reduces boilerplate, +// abstracts away goroutine orchestration, features centralized error handling, and has zero external dependencies. // // # Streams and Try Containers // @@ -17,7 +17,7 @@ // They do not block and return the output stream immediately. All the processing is done in the background by the goroutine pools they spawn. // These functions forward all errors from the input stream to the output stream. // Any errors returned by the user-provided functions are also sent to the output stream. -// When such function reaches the end of the input stream, it closes the output stream, stops processing and cleans up resources. 
+// When such a function reaches the end of the input stream, it closes the output stream, stops processing and cleans up resources. // // Such functions are designed to be composed together to build complex processing pipelines: // @@ -55,7 +55,7 @@ // // # Unordered functions // -// Functions such as [Map], [Filter] and [FlatMap] write items to the output stream as soon as they become available. +// Functions such as [Map], [Filter], and [FlatMap] write items to the output stream as soon as they become available. // Due to the concurrent nature of these functions, the order of items in the output stream may not match the order of items in the input stream. // These functions prioritize performance and concurrency over maintaining the original order. // @@ -75,5 +75,5 @@ // This allows the pipeline to terminate after the first error is encountered and return it to the caller. // // In cases where more complex error handling logic is required, the [Catch] function can be used. -// It allows to catch and handle errors at any point in the pipeline, providing the flexibility to handle not only the first error, but any of them. +// It can catch and handle errors at any point in the pipeline, providing the flexibility to handle not only the first error, but any of them. 
package rill diff --git a/example123_test.go b/example123_test.go index 4bffb89..814b3aa 100644 --- a/example123_test.go +++ b/example123_test.go @@ -4,27 +4,65 @@ package rill_test import ( "fmt" - "iter" + "slices" "github.com/destel/rill" ) -func ExampleToSeq2() { - nums := rill.FromSeq2(genPositive(40)) - squares := rill.Map(nums, 4, func(x int) (int, error) { - return x * x, nil +func ExampleFromSeq() { + // Start with an iterator that yields numbers from 1 to 10 + numbersSeq := slices.Values([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) + + // Convert the iterator into a stream + numbers := rill.FromSeq(numbersSeq, nil) + + // Transform each number + // Concurrency = 3 + results := rill.Map(numbers, 3, func(x int) (int, error) { + return doSomethingWithNumber(x), nil }) - for val, err := range rill.ToSeq2(squares) { - fmt.Println(val, err) - } + + printStream(results) } -func genPositive(to int) iter.Seq2[int, error] { - return func(yield func(i int, err error) bool) { - for i := 1; i <= to; i++ { +func ExampleFromSeq2() { + // Create an iter.Seq2 iterator that yields numbers from 1 to 10 + numberSeq := func(yield func(int, error) bool) { + for i := 1; i <= 10; i++ { if !yield(i, nil) { return } } } + + // Convert the iterator into a stream + numbers := rill.FromSeq2(numberSeq) + + // Transform each number + // Concurrency = 3 + results := rill.Map(numbers, 3, func(x int) (int, error) { + return doSomethingWithNumber(x), nil + }) + + printStream(results) +} + +func ExampleToSeq2() { + // Convert a slice of numbers into a stream + numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + // Transform each number + // Concurrency = 3 + results := rill.Map(numbers, 3, func(x int) (int, error) { + return doSomethingWithNumber(x), nil + }) + + // Convert the stream into an iterator and use for-range to print the results + for val, err := range rill.ToSeq2(results) { + if err != nil { + fmt.Println("Error:", err) + break // cleanup is done regardless of 
early exit + } + fmt.Printf("%+v\n", val) + } } diff --git a/example_test.go b/example_test.go index 015f97d..f2f4666 100644 --- a/example_test.go +++ b/example_test.go @@ -1,13 +1,10 @@ package rill_test import ( - "bufio" + "bytes" "context" "errors" "fmt" - "hash/fnv" - "io" - "math" "math/rand" "regexp" "strconv" @@ -15,108 +12,212 @@ import ( "time" "github.com/destel/rill" + "github.com/destel/rill/mockapi" ) -type Measurement struct { - Date time.Time - Temp float64 -} - -type User struct { - ID int - Username string - IsActive bool -} - // --- Package examples --- // This example demonstrates a Rill pipeline that fetches users from an API, -// and updates their status to active and saves them back. Both operations are done concurrently. +// updates their status to active and saves them back. +// Both operations are performed concurrently func Example() { - // In case of early exit this will cancel the user fetching, - // which in turn will terminate the entire pipeline. - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := context.Background() - // Start with a stream of user ids + // Convert a slice of user IDs into a stream ids := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) // Read users from the API. // Concurrency = 3 - users := rill.Map(ids, 3, func(id int) (*User, error) { - return getUser(ctx, id) + users := rill.Map(ids, 3, func(id int) (*mockapi.User, error) { + return mockapi.GetUser(ctx, id) }) // Activate users. 
// Concurrency = 2 - err := rill.ForEach(users, 2, func(u *User) error { + err := rill.ForEach(users, 2, func(u *mockapi.User) error { if u.IsActive { fmt.Printf("User %d is already active\n", u.ID) return nil } u.IsActive = true - return saveUser(ctx, u) + err := mockapi.SaveUser(ctx, u) + if err != nil { + return err + } + + fmt.Printf("User saved: %+v\n", u) + return nil }) + // Handle errors fmt.Println("Error:", err) } -// This example showcases the use of Rill for building a multi-stage data processing pipeline, -// with a focus on batch processing. It streams user ids from a remote file, then fetches users from an API in batches, -// updates their status to active, and saves them back. All operations are done concurrently. +// This example demonstrates a Rill pipeline that fetches users from an API, +// and updates their status to active and saves them back. +// Users are fetched concurrently and in batches to reduce the number of API calls. func Example_batching() { - // In case of early exit this will cancel the file streaming, - // which in turn will terminate the entire pipeline. 
- ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Stream a file with user ids as an io.Reader - reader, err := downloadFile(ctx, "http://example.com/user_ids1.txt") - if err != nil { - fmt.Println("Error:", err) - return - } - - // Transform the reader into a stream of words - lines := streamLines(reader) + ctx := context.Background() - // Parse lines as integers - // Concurrency = 3 - ids := rill.Map(lines, 3, func(line string) (int, error) { - return strconv.Atoi(line) - }) + // Convert a slice of user IDs into a stream + ids := rill.FromSlice([]int{ + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, + 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, + }, nil) - // Group IDs into batches of 5 for bulk processing + // Group IDs into batches of 5 idBatches := rill.Batch(ids, 5, 1*time.Second) - // Fetch users for each batch of IDs + // Bulk fetch users from the API // Concurrency = 3 - userBatches := rill.Map(idBatches, 3, func(ids []int) ([]*User, error) { - return getUsers(ctx, ids...) + userBatches := rill.Map(idBatches, 3, func(ids []int) ([]*mockapi.User, error) { + return mockapi.GetUsers(ctx, ids) }) - // Transform batches back into a stream of users + // Transform the stream of batches back into a flat stream of users users := rill.Unbatch(userBatches) // Activate users. // Concurrency = 2 - err = rill.ForEach(users, 2, func(u *User) error { + err := rill.ForEach(users, 2, func(u *mockapi.User) error { if u.IsActive { fmt.Printf("User %d is already active\n", u.ID) return nil } u.IsActive = true - return saveUser(ctx, u) + err := mockapi.SaveUser(ctx, u) + if err != nil { + return err + } + + fmt.Printf("User saved: %+v\n", u) + return nil }) + // Handle errors fmt.Println("Error:", err) } +// This example demonstrates how batching can be used to group similar concurrent database updates into a single query. 
+// The UpdateUserTimestamp function is used to update the last_active_at column in the users table. Updates are not +// executed immediately, but are rather queued and then sent to the database in batches of up to 5. +// +// When updates are sparse, it can take some time to collect a full batch. In this case the [Batch] function +// emits partial batches, ensuring that updates are delayed by at most 100ms. +// +// For simplicity, this example does not have retries, error handling and synchronization +func Example_batchingWithTimeout() { + // Start the background worker that processes the updates + go updateUserTimestampWorker() + + // Do some updates. They'll be automatically grouped into + // batches: [1,2,3,4,5], [6,7], [8] + UpdateUserTimestamp(1) + UpdateUserTimestamp(2) + UpdateUserTimestamp(3) + UpdateUserTimestamp(4) + UpdateUserTimestamp(5) + UpdateUserTimestamp(6) + UpdateUserTimestamp(7) + time.Sleep(500 * time.Millisecond) // simulate sparse updates + UpdateUserTimestamp(8) + + // Wait for the updates to be processed + // In real-world application, different synchronization mechanisms would be used. + time.Sleep(1 * time.Second) +} + +// This is the queue of user IDs to update. +var userIDsToUpdate = make(chan int) + +// UpdateUserTimestamp is the public API for updating the last_active_at column in the users table +func UpdateUserTimestamp(userID int) { + userIDsToUpdate <- userID +} + +// This is a background worker that sends queued updates to the database in batches. +// For simplicity, there are no retries, error handling and synchronization +func updateUserTimestampWorker() { + // convert channel of userIDsStream into a stream + ids := rill.FromChan(userIDsToUpdate, nil) + + // Group IDs into batches of 5 for bulk processing + // In case of sparse updates, we want to send them to the database no later than 100ms after they were queued. 
+ idBatches := rill.Batch(ids, 5, 100*time.Millisecond) + + // Send updates to the database + // Concurrency = 1 (this controls max number of concurrent updates) + _ = rill.ForEach(idBatches, 1, func(batch []int) error { + fmt.Printf("Executed: UPDATE users SET last_active_at = NOW() WHERE id IN (%v)\n", batch) + return nil + }) +} + +// This example demonstrates how to find the first file containing a specific string among 1000 large files +// hosted online. +// +// Downloading all files at once would consume too much memory, while processing +// them one-by-one would take too long. And traditional concurrency patterns do not preserve the order of files, +// and would make it challenging to find the first match. +// +// The combination of [OrderedFilter] and [First] functions solves the problem, +// while downloading and holding in memory at most 5 files at the same time. +func Example_ordering() { + ctx := context.Background() + + // The string to search for in the downloaded files + needle := []byte("26") + + // Manually generate a stream of URLs from http://example.com/file-0.txt to http://example.com/file-999.txt + urls := make(chan rill.Try[string]) + go func() { + defer close(urls) + for i := 0; i < 1000; i++ { + // Stop generating URLs after the context is canceled (when the file is found) + // This can be rewritten as a select statement, but it's not necessary + if err := ctx.Err(); err != nil { + return + } + + urls <- rill.Wrap(fmt.Sprintf("https://example.com/file-%d.txt", i), nil) + } + }() + + // Download and process the files + // At most 5 files are downloaded and held in memory at the same time + matchedUrls := rill.OrderedFilter(urls, 5, func(url string) (bool, error) { + fmt.Println("Downloading:", url) + + content, err := mockapi.DownloadFile(ctx, url) + if err != nil { + return false, err + } + + // keep only URLs of files that contain the needle + return bytes.Contains(content, needle), nil + }) + + // Find the first matched URL + 
firstMatchedUrl, found, err := rill.First(matchedUrls) + if err != nil { + fmt.Println("Error:", err) + return + } + + // Print the result + if found { + fmt.Println("Found in:", firstMatchedUrl) + } else { + fmt.Println("Not found") + } +} + // This example demonstrates how to use the Fan-in and Fan-out patterns // to send messages through multiple servers concurrently. func Example_fanIn_FanOut() { + // Convert a slice of messages into a stream messages := rill.FromSlice([]string{ "message1", "message2", "message3", "message4", "message5", "message6", "message7", "message8", "message9", "message10", @@ -143,135 +244,120 @@ func Example_fanIn_FanOut() { fmt.Println("Error:", err) } -// This example demonstrates how [OrderedMap] can be used to enforce ordering of processing results. -// Pipeline below fetches temperature measurements for a city and calculates daily temperature changes. -// Measurements are fetched concurrently, but ordered processing is used to calculate the changes. -func Example_ordering() { - city := "New York" - endDate := time.Now() - startDate := endDate.AddDate(0, 0, -30) +// Helper function that simulates sending a message through a server +func sendMessage(message string, server string) error { + randomSleep(500 * time.Millisecond) // simulate some additional work + fmt.Printf("Sent through %s: %s\n", server, message) + return nil +} - // Create a stream of all days between startDate and endDate - days := make(chan rill.Try[time.Time]) - go func() { - defer close(days) - for date := startDate; date.Before(endDate); date = date.AddDate(0, 0, 1) { - days <- rill.Wrap(date, nil) - } - }() +// This example demonstrates using [FlatMap] to accelerate paginated API calls. Instead of fetching all users sequentially, +// page-by-page (which would take a long time since the API is slow and the number of pages is large), it fetches users from +// multiple departments in parallel. 
The example also shows how to write a reusable streaming wrapper around an existing +// API function that can be used on its own or as part of a larger pipeline. +func Example_parallelStreams() { + ctx := context.Background() - // Fetch the temperature for each day from the API - // Concurrency = 10; Ordered - measurements := rill.OrderedMap(days, 10, func(date time.Time) (Measurement, error) { - temp, err := getTemperature(city, date) - return Measurement{Date: date, Temp: temp}, err - }) + // Convert a list of all departments into a stream + departments := rill.FromSlice(mockapi.GetDepartments()) - // Iterate over the measurements, calculate and print changes. - // Concurrency = 1; Ordered - prev := Measurement{Temp: math.NaN()} - err := rill.ForEach(measurements, 1, func(m Measurement) error { - change := m.Temp - prev.Temp - prev = m + // Use FlatMap to stream users from 3 departments concurrently. + users := rill.FlatMap(departments, 3, func(department string) <-chan rill.Try[*mockapi.User] { + return StreamUsers(ctx, &mockapi.UserQuery{Department: department}) + }) - fmt.Printf("%s: %.1f°C (change %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, change) + // Print the users from the combined stream + err := rill.ForEach(users, 1, func(user *mockapi.User) error { + fmt.Printf("%+v\n", user) return nil }) - fmt.Println("Error:", err) } -// This example demonstrates a concurrent [MapReduce] performed on a set of remote files. -// It downloads them and calculates how many times each word appears in all the files. -func Example_mapReduce() { - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) +// StreamUsers is a reusable streaming wrapper around the mockapi.ListUsers function. +// It iterates through all listing pages and returns a stream of users. +// This function is useful on its own or as a building block for more complex pipelines. 
+func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[*mockapi.User] { + res := make(chan rill.Try[*mockapi.User]) - defer cancel() + if query == nil { + query = &mockapi.UserQuery{} + } - // Start with a stream of file URLs - urls := rill.FromSlice([]string{ - "http://example.com/text1.txt", - "http://example.com/text2.txt", - "http://example.com/text3.txt", - }, nil) + go func() { + defer close(res) - // Download files concurrently, and get a stream of all words from all files - // Concurrency = 2 - words := rill.FlatMap(urls, 2, func(url string) <-chan rill.Try[string] { - reader, err := downloadFile(ctx, url) - if err != nil { - return rill.FromSlice[string](nil, err) // Wrap the error in a stream - } + for page := 0; ; page++ { + query.Page = page - return streamWords(reader) - }) + users, err := mockapi.ListUsers(ctx, query) + if err != nil { + res <- rill.Wrap[*mockapi.User](nil, err) + return + } - // Count the number of occurrences of each word - counts, err := rill.MapReduce(words, - // Map phase: Use the word as key and "1" as value - // Concurrency = 3 - 3, func(word string) (string, int, error) { - return strings.ToLower(word), 1, nil - }, - // Reduce phase: Sum all "1" values for the same key - // Concurrency = 2 - 2, func(x, y int) (int, error) { - return x + y, nil - }, - ) + if len(users) == 0 { + break + } - fmt.Println("Result:", counts) - fmt.Println("Error:", err) + for _, user := range users { + res <- rill.Wrap(user, nil) + } + } + }() + + return res } -// This example demonstrates how to use context cancellation to terminate a Rill pipeline in case of an early exit. -// The printOddSquares function initiates a pipeline that prints squares of odd numbers. -// The infiniteNumberStream function is the initial stage of the pipeline. It generates numbers indefinitely until the context is canceled. 
-// When an error occurs in one of the pipeline stages: -// - The error is propagated down the pipeline and reaches the ForEach stage. -// - The ForEach function returns the error. -// - The printOddSquares function returns, and the context is canceled using defer. -// - The infiniteNumberStream function terminates due to context cancellation. -// - The entire pipeline is cleaned up gracefully. +// This example demonstrates how to use a context for pipeline termination. +// The FindFirstPrime function uses several concurrent workers to find the first prime number after a given number. +// Internally it creates a pipeline that starts from an infinite stream of numbers. When the first prime number is found +// in that stream, the context gets canceled, and the pipeline terminates gracefully. func Example_context() { - ctx := context.Background() - - err := printOddSquares(ctx) - fmt.Println("Error:", err) - - // Wait one more second to see "infiniteNumberStream terminated" printed - time.Sleep(1 * time.Second) + p := FindFirstPrime(10000, 3) // Use 3 concurrent workers + fmt.Println("The first prime after 10000 is", p) } -func printOddSquares(ctx context.Context) error { - ctx, cancel := context.WithCancel(ctx) +// FindFirstPrime finds the first prime number after the given number, using several concurrent workers. 
+func FindFirstPrime(after int, concurrency int) int { + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - numbers := infiniteNumberStream(ctx) - - odds := rill.Filter(numbers, 3, func(x int) (bool, error) { - if x == 20 { - return false, fmt.Errorf("early exit") + // Generate an infinite stream of numbers starting from the given number + numbers := make(chan rill.Try[int]) + go func() { + defer close(numbers) + for i := after + 1; ; i++ { + select { + case <-ctx.Done(): + return // Stop generating numbers when the context is canceled + case numbers <- rill.Wrap(i, nil): + } } - return x%2 == 1, nil - }) + }() - return rill.ForEach(odds, 3, func(x int) error { - fmt.Println(x * x) - return nil + // Filter out non-prime numbers, preserve the order + primes := rill.OrderedFilter(numbers, concurrency, func(x int) (bool, error) { + fmt.Println("Checking", x) + return isPrime(x), nil }) + + // Get the first prime and cancel the context + // This stops number generation and allows goroutines to exit + result, _, _ := rill.First(primes) + return result } // --- Function examples --- func ExampleAll() { + // Convert a slice of numbers into a stream numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) - // Are all numbers even? + // Are all numbers prime? // Concurrency = 3 ok, err := rill.All(numbers, 3, func(x int) (bool, error) { - return x%2 == 0, nil + return isPrime(x), nil }) fmt.Println("Result:", ok) @@ -279,12 +365,13 @@ func ExampleAll() { } func ExampleAny() { + // Convert a slice of numbers into a stream numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) - // Is there at least one even number? + // Is there at least one prime number? 
// Concurrency = 3 ok, err := rill.Any(numbers, 3, func(x int) (bool, error) { - return x%2 == 0, nil + return isPrime(x), nil }) fmt.Println("Result: ", ok) @@ -293,7 +380,7 @@ func ExampleAny() { // Also check out the package level examples to see Batch in action func ExampleBatch() { - // New number is emitted every 50ms + // Generate a stream of numbers 0 to 49, where a new number is emitted every 50ms numbers := make(chan rill.Try[int]) go func() { defer close(numbers) @@ -310,17 +397,18 @@ func ExampleBatch() { } func ExampleCatch() { + // Convert a slice of strings into a stream strs := rill.FromSlice([]string{"1", "2", "3", "4", "5", "not a number 6", "7", "8", "9", "10"}, nil) // Convert strings to ints - // Concurrency = 3; Unordered + // Concurrency = 3 ids := rill.Map(strs, 3, func(s string) (int, error) { - randomSleep(1000 * time.Millisecond) // simulate some additional work + randomSleep(500 * time.Millisecond) // simulate some additional work return strconv.Atoi(s) }) // Catch and ignore number parsing errors - // Concurrency = 2; Unordered + // Concurrency = 2 ids = rill.Catch(ids, 2, func(err error) error { if errors.Is(err, strconv.ErrSyntax) { return nil // Ignore this error @@ -334,17 +422,18 @@ func ExampleCatch() { // The same example as for the [Catch], but using ordered versions of functions. 
func ExampleOrderedCatch() { + // Convert a slice of strings into a stream strs := rill.FromSlice([]string{"1", "2", "3", "4", "5", "not a number 6", "7", "8", "9", "10"}, nil) // Convert strings to ints - // Concurrency = 3; Unordered + // Concurrency = 3; Ordered ids := rill.OrderedMap(strs, 3, func(s string) (int, error) { - randomSleep(1000 * time.Millisecond) // simulate some additional work + randomSleep(500 * time.Millisecond) // simulate some additional work return strconv.Atoi(s) }) // Catch and ignore number parsing errors - // Concurrency = 2; Unordered + // Concurrency = 2; Ordered ids = rill.OrderedCatch(ids, 2, func(err error) error { if errors.Is(err, strconv.ErrSyntax) { return nil // Ignore this error @@ -359,65 +448,65 @@ func ExampleOrderedCatch() { func ExampleErr() { ctx := context.Background() - users := rill.FromSlice([]*User{ - {ID: 1, Username: "foo"}, - {ID: 2, Username: "bar"}, - {ID: 3}, - {ID: 4, Username: "baz"}, - {ID: 5, Username: "qux"}, - {ID: 6, Username: "quux"}, + // Convert a slice of users into a stream + users := rill.FromSlice([]*mockapi.User{ + {ID: 1, Name: "foo", Age: 25}, + {ID: 2, Name: "bar", Age: 30}, + {ID: 3}, // empty username is invalid + {ID: 4, Name: "baz", Age: 35}, + {ID: 5, Name: "qux", Age: 26}, + {ID: 6, Name: "quux", Age: 27}, }, nil) // Save users. 
 Use struct{} as a result type - // Concurrency = 2; Unordered - results := rill.Map(users, 2, func(user *User) (struct{}, error) { - return struct{}{}, saveUser(ctx, user) + // Concurrency = 2 + results := rill.Map(users, 2, func(user *mockapi.User) (struct{}, error) { + return struct{}{}, mockapi.SaveUser(ctx, user) }) - // We're interested only in side effects and errors from - // the pipeline above + // We only need to know if all users were saved successfully err := rill.Err(results) fmt.Println("Error:", err) } func ExampleFilter() { + // Convert a slice of numbers into a stream numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) - // Keep only even numbers - // Concurrency = 3; Unordered - evens := rill.Filter(numbers, 3, func(x int) (bool, error) { - randomSleep(1000 * time.Millisecond) // simulate some additional work - return x%2 == 0, nil + // Keep only prime numbers + // Concurrency = 3 + primes := rill.Filter(numbers, 3, func(x int) (bool, error) { + return isPrime(x), nil }) - printStream(evens) + printStream(primes) } // The same example as for the [Filter], but using ordered versions of functions. 
func ExampleOrderedFilter() { + // Convert a slice of numbers into a stream numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) - // Keep only even numbers + // Keep only prime numbers // Concurrency = 3; Ordered - evens := rill.OrderedFilter(numbers, 3, func(x int) (bool, error) { - randomSleep(1000 * time.Millisecond) // simulate some additional work - return x%2 == 0, nil + primes := rill.OrderedFilter(numbers, 3, func(x int) (bool, error) { + return isPrime(x), nil }) - printStream(evens) + printStream(primes) } func ExampleFilterMap() { + // Convert a slice of numbers into a stream numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) - // Keep only odd numbers and square them - // Concurrency = 3; Unordered + // Keep only prime numbers and square them + // Concurrency = 3 squares := rill.FilterMap(numbers, 3, func(x int) (int, bool, error) { - if x%2 == 0 { + if !isPrime(x) { return 0, false, nil } - randomSleep(1000 * time.Millisecond) // simulate some additional work return x * x, true, nil }) @@ -426,16 +515,16 @@ func ExampleFilterMap() { // The same example as for the [FilterMap], but using ordered versions of functions. 
func ExampleOrderedFilterMap() { + // Convert a slice of numbers into a stream numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) - // Keep only odd numbers and square them - // Concurrency = 3; Ordered + // Keep only prime numbers and square them + // Concurrency = 3 squares := rill.OrderedFilterMap(numbers, 3, func(x int) (int, bool, error) { - if x%2 == 0 { + if !isPrime(x) { return 0, false, nil } - randomSleep(1000 * time.Millisecond) // simulate some additional work return x * x, true, nil }) @@ -443,6 +532,7 @@ func ExampleOrderedFilterMap() { } func ExampleFirst() { + // Convert a slice of numbers into a stream numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) // Keep only the numbers divisible by 4 @@ -459,12 +549,13 @@ func ExampleFirst() { } func ExampleFlatMap() { + // Convert a slice of numbers into a stream numbers := rill.FromSlice([]int{1, 2, 3, 4, 5}, nil) - // Replace each number with three strings - // Concurrency = 3; Unordered - result := rill.FlatMap(numbers, 3, func(x int) <-chan rill.Try[string] { - randomSleep(1000 * time.Millisecond) // simulate some additional work + // Replace each number in the input stream with three strings + // Concurrency = 2 + result := rill.FlatMap(numbers, 2, func(x int) <-chan rill.Try[string] { + randomSleep(500 * time.Millisecond) // simulate some additional work return rill.FromSlice([]string{ fmt.Sprintf("foo%d", x), @@ -478,12 +569,13 @@ func ExampleFlatMap() { // The same example as for the [FlatMap], but using ordered versions of functions. 
func ExampleOrderedFlatMap() { + // Convert a slice of numbers into a stream numbers := rill.FromSlice([]int{1, 2, 3, 4, 5}, nil) - // Replace each number with three strings - // Concurrency = 3; Ordered - result := rill.OrderedFlatMap(numbers, 3, func(x int) <-chan rill.Try[string] { - randomSleep(1000 * time.Millisecond) // simulate some additional work + // Replace each number in the input stream with three strings + // Concurrency = 2; Ordered + result := rill.OrderedFlatMap(numbers, 2, func(x int) <-chan rill.Try[string] { + randomSleep(500 * time.Millisecond) // simulate some additional work return rill.FromSlice([]string{ fmt.Sprintf("foo%d", x), @@ -496,18 +588,18 @@ func ExampleOrderedFlatMap() { } func ExampleForEach() { + // Convert a slice of numbers into a stream numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) - // Square and print each number - // Concurrency = 3; Unordered + // Do something with each number and print the result + // Concurrency = 3 err := rill.ForEach(numbers, 3, func(x int) error { - randomSleep(1000 * time.Millisecond) // simulate some additional work - - y := x * x + y := doSomethingWithNumber(x) fmt.Println(y) return nil }) + // Handle errors fmt.Println("Error:", err) } @@ -515,56 +607,58 @@ func ExampleForEach() { // If you need a concurrent and ordered ForEach, then do all processing with the [OrderedMap], // and then use ForEach with concurrency set to 1 at the final stage. func ExampleForEach_ordered() { + // Convert a slice of numbers into a stream numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) - // Square each number. + // Do something with each number // Concurrency = 3; Ordered - squares := rill.OrderedMap(numbers, 3, func(x int) (int, error) { - randomSleep(1000 * time.Millisecond) // simulate some additional work - return x * x, nil + results := rill.OrderedMap(numbers, 3, func(x int) (int, error) { + return doSomethingWithNumber(x), nil }) // Print results. 
// Concurrency = 1; Ordered - err := rill.ForEach(squares, 1, func(y int) error { + err := rill.ForEach(results, 1, func(y int) error { fmt.Println(y) return nil }) + + // Handle errors fmt.Println("Error:", err) } func ExampleMap() { + // Convert a slice of numbers into a stream numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) - // Square each number. - // Concurrency = 3; Unordered - squares := rill.Map(numbers, 3, func(x int) (int, error) { - randomSleep(1000 * time.Millisecond) // simulate some additional work - return x * x, nil + // Transform each number + // Concurrency = 3 + results := rill.Map(numbers, 3, func(x int) (int, error) { + return doSomethingWithNumber(x), nil }) - printStream(squares) + printStream(results) } // The same example as for the [Map], but using ordered versions of functions. func ExampleOrderedMap() { + // Convert a slice of numbers into a stream numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) - // Square each number. + // Transform each number // Concurrency = 3; Ordered - squares := rill.OrderedMap(numbers, 3, func(x int) (int, error) { - randomSleep(1000 * time.Millisecond) // simulate some additional work - return x * x, nil + results := rill.OrderedMap(numbers, 3, func(x int) (int, error) { + return doSomethingWithNumber(x), nil }) - printStream(squares) + printStream(results) } func ExampleMapReduce() { var re = regexp.MustCompile(`\w+`) text := "Early morning brings early birds to the early market. Birds sing, the market buzzes, and the morning shines." 
- // Start with a stream of words + // Convert a text into a stream of words words := rill.FromSlice(re.FindAllString(text, -1), nil) // Count the number of occurrences of each word @@ -586,6 +680,7 @@ func ExampleMapReduce() { } func ExampleMerge() { + // Convert slices of numbers into streams numbers1 := rill.FromSlice([]int{1, 2, 3, 4, 5}, nil) numbers2 := rill.FromSlice([]int{6, 7, 8, 9, 10}, nil) numbers3 := rill.FromSlice([]int{11, 12}, nil) @@ -596,6 +691,7 @@ func ExampleMerge() { } func ExampleReduce() { + // Convert a slice of numbers into a stream numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) // Sum all numbers @@ -608,21 +704,23 @@ func ExampleReduce() { } func ExampleToSlice() { + // Convert a slice of numbers into a stream numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) - // Square each number + // Transform each number // Concurrency = 3; Ordered - squares := rill.OrderedMap(numbers, 3, func(x int) (int, error) { - return x * x, nil + results := rill.OrderedMap(numbers, 3, func(x int) (int, error) { + return doSomethingWithNumber(x), nil }) - squaresSlice, err := rill.ToSlice(squares) + resultsSlice, err := rill.ToSlice(results) - fmt.Println("Result:", squaresSlice) + fmt.Println("Result:", resultsSlice) fmt.Println("Error:", err) } func ExampleUnbatch() { + // Create a stream of batches batches := rill.FromSlice([][]int{ {1, 2, 3}, {4, 5}, @@ -637,165 +735,27 @@ func ExampleUnbatch() { // --- Helpers --- -// streamLines converts an io.Reader into a stream of lines -func streamLines(r io.ReadCloser) <-chan rill.Try[string] { - out := make(chan rill.Try[string]) - go func() { - defer r.Close() - defer close(out) - - scanner := bufio.NewScanner(r) - for scanner.Scan() { - out <- rill.Wrap(scanner.Text(), nil) - } - if err := scanner.Err(); err != nil { - out <- rill.Wrap("", err) - } - }() - return out +// helper function that squares the number +// and simulates some additional work using sleep +func 
doSomethingWithNumber(x int) int { + randomSleep(500 * time.Millisecond) // simulate some additional work + return x * x } -// streamWords is helper function that converts an io.Reader into a stream of words. -func streamWords(r io.ReadCloser) <-chan rill.Try[string] { - words := make(chan rill.Try[string], 1) +// naive prime number check. +// also simulates some additional work using sleep +func isPrime(n int) bool { + randomSleep(500 * time.Millisecond) // simulate some additional work - go func() { - defer r.Close() - defer close(words) - - scanner := bufio.NewScanner(r) - scanner.Split(bufio.ScanWords) - - for scanner.Scan() { - word := scanner.Text() - word = strings.Trim(word, ".,;:!?&()") // strip all punctuation. it's basic and just for demonstration - if len(word) > 0 { - words <- rill.Wrap(word, nil) - } - } - if err := scanner.Err(); err != nil { - words <- rill.Wrap("", err) - } - }() - - return words -} - -var ErrFileNotFound = errors.New("file not found") - -var files = map[string]string{ - "http://example.com/user_ids1.txt": strings.ReplaceAll("1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20", " ", "\n"), - "http://example.com/user_ids2.txt": strings.ReplaceAll("21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40", " ", "\n"), - "http://example.com/user_ids3.txt": strings.ReplaceAll("41 42 43 44 45", " ", "\n"), - "http://example.com/text1.txt": "Early morning brings early birds to the early market. Birds sing, the market buzzes, and the morning shines.", - "http://example.com/text2.txt": "The birds often sing at the market", - "http://example.com/text3.txt": "The market closes, the birds rest, and the night brings peace to the town.", -} - -// downloadFile simulates downloading a file from a URL. -// Returns a reader for the file content. 
-func downloadFile(ctx context.Context, url string) (io.ReadCloser, error) { - content, ok := files[url] - if !ok { - return nil, ErrFileNotFound + if n < 2 { + return false } - - // In a real-world scenario, this would be an HTTP request depending on the ctx. - return io.NopCloser(strings.NewReader(content)), nil -} - -// Helper function that simulates sending a message through a server -func sendMessage(message string, server string) error { - randomSleep(1000 * time.Millisecond) // simulate some additional work - fmt.Printf("Sent through %s: %s\n", server, message) - return nil -} - -// getTemperature simulates fetching a temperature reading for a city and date, -func getTemperature(city string, date time.Time) (float64, error) { - randomSleep(1000 * time.Millisecond) // Simulate a network delay - - // Basic city hash, to make measurements unique for each city - cityHash := float64(hash(city)) - - // Simulate a temperature reading, by retuning a pseudo-random, but deterministic value - temp := 15 - 10*math.Sin(cityHash+float64(date.Unix())) - temp = math.Round(temp*10) / 10 // Round to one decimal place - - return temp, nil -} - -func infiniteNumberStream(ctx context.Context) <-chan rill.Try[int] { - out := make(chan rill.Try[int]) - go func() { - defer fmt.Println("infiniteNumberStream terminated") - defer close(out) - for i := 1; ; i++ { - if err := ctx.Err(); err != nil { - return // This can be rewritten as select, but it's not necessary - } - out <- rill.Wrap(i, nil) - time.Sleep(100 * time.Millisecond) + for i := 2; i*i <= n; i++ { + if n%i == 0 { + return false } - }() - return out -} - -var adjs = []string{"big", "small", "fast", "slow", "smart", "happy", "sad", "funny", "serious", "angry"} -var nouns = []string{"dog", "cat", "bird", "fish", "mouse", "elephant", "lion", "tiger", "bear", "wolf"} - -// getUsers simulates fetching multiple users from an API. -// User fields are pseudo-random, but deterministic based on the user ID. 
-func getUsers(ctx context.Context, ids ...int) ([]*User, error) { - randomSleep(1000 * time.Millisecond) // Simulate a network delay - - users := make([]*User, 0, len(ids)) - for _, id := range ids { - if err := ctx.Err(); err != nil { - return nil, err - } - - user := User{ - ID: id, - Username: adjs[hash(id, "adj")%len(adjs)] + "_" + nouns[hash(id, "noun")%len(nouns)], // adj + noun - IsActive: hash(id, "active")%100 < 60, // 60% - } - - users = append(users, &user) } - return users, nil -} - -var ErrUserNotFound = errors.New("user not found") - -// getUser simulates fetching a user from an API. -func getUser(ctx context.Context, id int) (*User, error) { - users, err := getUsers(ctx, id) - if err != nil { - return nil, err - } - - if len(users) == 0 { - return nil, ErrUserNotFound - } - - return users[0], nil -} - -// saveUser simulates saving a user through an API. -func saveUser(ctx context.Context, user *User) error { - randomSleep(1000 * time.Millisecond) // Simulate a network delay - - if err := ctx.Err(); err != nil { - return err - } - - if user.Username == "" { - return fmt.Errorf("empty username") - } - - fmt.Printf("User saved: %+v\n", user) - return nil + return true } // printStream prints all items from a stream (one per line) and an error if any. @@ -811,10 +771,3 @@ func printStream[A any](stream <-chan rill.Try[A]) { func randomSleep(max time.Duration) { time.Sleep(time.Duration(rand.Intn(int(max)))) } - -// hash is a simple hash function that returns an integer hash for a given input. -func hash(input ...any) int { - hasher := fnv.New32() - fmt.Fprintln(hasher, input...) - return int(hasher.Sum32()) -} diff --git a/mockapi/files.go b/mockapi/files.go new file mode 100644 index 0000000..1fab424 --- /dev/null +++ b/mockapi/files.go @@ -0,0 +1,17 @@ +package mockapi + +import ( + "context" + "fmt" + "time" +) + +// DownloadFile simulates a file download. It returns the whole content as []byte. 
+func DownloadFile(ctx context.Context, url string) ([]byte, error) { + randomSleep(ctx, 1000*time.Millisecond) + if err := ctx.Err(); err != nil { + return nil, err + } + + return []byte(fmt.Sprintf("This is the content of %s", url)), nil +} diff --git a/mockapi/users.go b/mockapi/users.go new file mode 100644 index 0000000..a723ab7 --- /dev/null +++ b/mockapi/users.go @@ -0,0 +1,179 @@ +// Package mockapi provides a very basic mock API client for examples and demos. +// Unfortunately it must live outside the internal folder to be accessible from runnable examples and go playground. +package mockapi + +import ( + "context" + "fmt" + "hash/fnv" + "math/rand" + "sync" + "time" +) + +type User struct { + ID int + Name string + Age int + Department string + IsActive bool +} + +// don't use pointers here, to make sure that raw data is not accessible from outside +var departments = []string{"HR", "IT", "Finance", "Marketing", "Sales", "Support", "Engineering", "Management"} +var users = make(map[int]User) + +var mu sync.RWMutex + +func init() { + var adjs = []string{"Big", "Small", "Fast", "Slow", "Smart", "Happy", "Sad", "Funny", "Serious", "Angry"} + var nouns = []string{"Dog", "Cat", "Bird", "Fish", "Mouse", "Elephant", "Lion", "Tiger", "Bear", "Wolf"} + + mu.Lock() + defer mu.Unlock() + + // Generate users + for i := 1; i <= 100; i++ { + user := User{ + ID: i, + Name: adjs[hash(i, "name1")%len(adjs)] + " " + nouns[hash(i, "name2")%len(nouns)], // adj + noun + Age: hash(i, "age")%20 + 30, // 20-50 + Department: departments[hash(i, "dep")%len(departments)], // one of + IsActive: hash(i, "active")%100 < 60, // 60% + } + + users[i] = user + } +} + +func GetDepartments() ([]string, error) { + res := make([]string, len(departments)) + copy(res, departments) + return res, nil +} + +// GetUser returns a user by ID. 
+func GetUser(ctx context.Context, id int) (*User, error) { + randomSleep(ctx, 500*time.Millisecond) + if err := ctx.Err(); err != nil { + return nil, err + } + + mu.RLock() + defer mu.RUnlock() + + user, ok := users[id] + if !ok { + return nil, fmt.Errorf("user not found") + } + + return &user, nil +} + +// GetUsers returns a list of users by IDs. +// If a user is not found, nil is returned in the corresponding position. +func GetUsers(ctx context.Context, ids []int) ([]*User, error) { + randomSleep(ctx, 1000*time.Millisecond) + if err := ctx.Err(); err != nil { + return nil, err + } + + mu.RLock() + defer mu.RUnlock() + + res := make([]*User, 0, len(ids)) + for _, id := range ids { + user, ok := users[id] + if !ok { + res = append(res, nil) + } else { + res = append(res, &user) + } + } + + return res, nil +} + +type UserQuery struct { + Department string + Page int +} + +// ListUsers returns a paginated list of users optionally filtered by department. +func ListUsers(ctx context.Context, query *UserQuery) ([]*User, error) { + randomSleep(ctx, 1000*time.Millisecond) + if err := ctx.Err(); err != nil { + return nil, err + } + + const pageSize = 10 + if query == nil { + query = &UserQuery{} + } + offset := query.Page * pageSize + + mu.RLock() + defer mu.RUnlock() + + res := make([]*User, 0, 10) + for _, user := range users { + if query.Department != "" && user.Department != query.Department { + continue + } + + if offset > 0 { + offset-- + continue + } + + if len(res) >= pageSize { + break + } + + res = append(res, &user) + } + + return res, nil +} + +// SaveUser saves a user. 
+func SaveUser(ctx context.Context, user *User) error { + randomSleep(ctx, 1000*time.Millisecond) + if err := ctx.Err(); err != nil { + return err + } + + if user == nil { + return fmt.Errorf("user is nil") + } + + if user.Name == "" { + return fmt.Errorf("username is empty") + } + if user.Age <= 0 { + return fmt.Errorf("age is invalid") + } + + mu.Lock() + defer mu.Unlock() + + users[user.ID] = *user + return nil +} + +func hash(input ...any) int { + hasher := fnv.New32() + fmt.Fprintln(hasher, input...) + return int(hasher.Sum32()) +} + +func randomSleep(ctx context.Context, max time.Duration) { + dur := time.Duration(rand.Intn(int(max))) + t := time.NewTimer(dur) + defer t.Stop() + + select { + case <-t.C: + case <-ctx.Done(): + } +}