Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve documentation clarity and reorganize examples #41

Merged
merged 18 commits into from
Nov 23, 2024
338 changes: 183 additions & 155 deletions README.md

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions example123_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ func ExampleFromSeq() {

// Transform each number
// Concurrency = 3
results := rill.Map(numbers, 3, func(x int) (int, error) {
return doSomethingWithNumber(x), nil
squares := rill.Map(numbers, 3, func(x int) (int, error) {
return square(x), nil
})

printStream(results)
printStream(squares)
}

func ExampleFromSeq2() {
Expand All @@ -40,11 +40,11 @@ func ExampleFromSeq2() {

// Transform each number
// Concurrency = 3
results := rill.Map(numbers, 3, func(x int) (int, error) {
return doSomethingWithNumber(x), nil
squares := rill.Map(numbers, 3, func(x int) (int, error) {
return square(x), nil
})

printStream(results)
printStream(squares)
}

func ExampleToSeq2() {
Expand All @@ -53,12 +53,12 @@ func ExampleToSeq2() {

// Transform each number
// Concurrency = 3
results := rill.Map(numbers, 3, func(x int) (int, error) {
return doSomethingWithNumber(x), nil
squares := rill.Map(numbers, 3, func(x int) (int, error) {
return square(x), nil
})

// Convert the stream into an iterator and use for-range to print the results
for val, err := range rill.ToSeq2(results) {
for val, err := range rill.ToSeq2(squares) {
if err != nil {
fmt.Println("Error:", err)
break // cleanup is done regardless of early exit
Expand Down
153 changes: 78 additions & 75 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func Example_batching() {
}, nil)

// Group IDs into batches of 5
idBatches := rill.Batch(ids, 5, 1*time.Second)
idBatches := rill.Batch(ids, 5, -1)

// Bulk fetch users from the API
// Concurrency = 3
Expand Down Expand Up @@ -108,7 +108,7 @@ func Example_batching() {
// emits partial batches, ensuring that updates are delayed by at most 100ms.
//
// For simplicity, this example does not have retries, error handling and synchronization
func Example_batchingWithTimeout() {
func Example_batchingRealTime() {
// Start the background worker that processes the updates
go updateUserTimestampWorker()

Expand Down Expand Up @@ -170,20 +170,13 @@ func Example_ordering() {
// The string to search for in the downloaded files
needle := []byte("26")

// Manually generate a stream of URLs from http://example.com/file-0.txt to http://example.com/file-999.txt
urls := make(chan rill.Try[string])
go func() {
defer close(urls)
for i := 0; i < 1000; i++ {
// Stop generating URLs after the context is canceled (when the file is found)
// This can be rewritten as a select statement, but it's not necessary
if err := ctx.Err(); err != nil {
return
}
// Start with a stream of numbers from 0 to 999
fileIDs := streamNumbers(ctx, 0, 1000)

urls <- rill.Wrap(fmt.Sprintf("https://example.com/file-%d.txt", i), nil)
}
}()
// Generate a stream of URLs from http://example.com/file-0.txt to http://example.com/file-999.txt
urls := rill.OrderedMap(fileIDs, 1, func(id int) (string, error) {
return fmt.Sprintf("https://example.com/file-%d.txt", id), nil
})

// Download and process the files
// At most 5 files are downloaded and held in memory at the same time
Expand Down Expand Up @@ -251,17 +244,16 @@ func sendMessage(message string, server string) error {
return nil
}

// This example demonstrates using [FlatMap] to accelerate paginated API calls. Instead of fetching all users sequentially,
// page-by-page (which would take a long time since the API is slow and the number of pages is large), it fetches users from
// multiple departments in parallel. The example also shows how to write a reusable streaming wrapper around an existing
// API function that can be used on its own or as part of a larger pipeline.
func Example_parallelStreams() {
// This example demonstrates using [FlatMap] to fetch users from multiple departments concurrently.
// Additionally, it demonstrates how to write a reusable streaming wrapper over paginated API calls - the StreamUsers function
func Example_flatMap() {
ctx := context.Background()

// Convert a list of all departments into a stream
departments := rill.FromSlice(mockapi.GetDepartments())
// Start with a stream of department names
departments := rill.FromSlice([]string{"IT", "Finance", "Marketing", "Support", "Engineering"}, nil)

// Use FlatMap to stream users from 3 departments concurrently.
// Stream users from all departments concurrently.
// At most 3 departments at the same time.
users := rill.FlatMap(departments, 3, func(department string) <-chan rill.Try[*mockapi.User] {
return StreamUsers(ctx, &mockapi.UserQuery{Department: department})
})
Expand All @@ -276,7 +268,7 @@ func Example_parallelStreams() {

// StreamUsers is a reusable streaming wrapper around the mockapi.ListUsers function.
// It iterates through all listing pages and returns a stream of users.
// This function is useful on its own or as a building block for more complex pipelines.
// This function is useful both on its own and as part of larger pipelines.
func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[*mockapi.User] {
res := make(chan rill.Try[*mockapi.User])

Expand Down Expand Up @@ -309,43 +301,38 @@ func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[
return res
}

// This example demonstrates how to use a context for pipeline termination.
// The FindFirstPrime function uses several concurrent workers to find the first prime number after a given number.
// Internally it creates a pipeline that starts from an infinite stream of numbers. When the first prime number is found
// in that stream, the context gets canceled, and the pipeline terminates gracefully.
// This example demonstrates how to gracefully stop a pipeline on the first error.
// The CheckAllUsersExist uses several concurrent workers and returns an error as soon as it encounters a non-existent user.
// Such early return triggers the context cancellation, which in turn stops all remaining users fetches.
func Example_context() {
p := FindFirstPrime(10000, 3) // Use 3 concurrent workers
fmt.Println("The first prime after 10000 is", p)
ctx := context.Background()

// ID 999 doesn't exist, so fetching will stop after hitting it.
err := CheckAllUsersExist(ctx, 3, []int{1, 2, 3, 4, 5, 999, 7, 8, 9, 10})
fmt.Printf("Check result: %v\n", err)
}

// FindFirstPrime finds the first prime number after the given number, using several concurrent workers.
func FindFirstPrime(after int, concurrency int) int {
ctx, cancel := context.WithCancel(context.Background())
// CheckAllUsersExist uses several concurrent workers to checks if all users with given IDs exist.
func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error {
// Create new context that will be canceled when this function returns
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Generate an infinite stream of numbers starting from the given number
numbers := make(chan rill.Try[int])
go func() {
defer close(numbers)
for i := after + 1; ; i++ {
select {
case <-ctx.Done():
return // Stop generating numbers when the context is canceled
case numbers <- rill.Wrap(i, nil):
}
idsStream := rill.FromSlice(ids, nil)

// Fetch users concurrently.
users := rill.Map(idsStream, concurrency, func(id int) (*mockapi.User, error) {
u, err := mockapi.GetUser(ctx, id)
if err != nil {
return nil, fmt.Errorf("failed to fetch user %d: %w", id, err)
}
}()

// Filter out non-prime numbers, preserve the order
primes := rill.OrderedFilter(numbers, concurrency, func(x int) (bool, error) {
fmt.Println("Checking", x)
return isPrime(x), nil
fmt.Printf("Fetched user %d\n", id)
return u, nil
})

// Get the first prime and cancel the context
// This stops number generation and allows goroutines to exit
result, _, _ := rill.First(primes)
return result
// Return the first error (if any) and cancel remaining fetches via context
return rill.Err(users)
}

// --- Function examples ---
Expand Down Expand Up @@ -591,10 +578,10 @@ func ExampleForEach() {
// Convert a slice of numbers into a stream
numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

// Do something with each number and print the result
// Square each number and print the result
// Concurrency = 3
err := rill.ForEach(numbers, 3, func(x int) error {
y := doSomethingWithNumber(x)
y := square(x)
fmt.Println(y)
return nil
})
Expand All @@ -610,15 +597,15 @@ func ExampleForEach_ordered() {
// Convert a slice of numbers into a stream
numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

// Do something with each number
// Square each number
// Concurrency = 3; Ordered
results := rill.OrderedMap(numbers, 3, func(x int) (int, error) {
return doSomethingWithNumber(x), nil
squares := rill.OrderedMap(numbers, 3, func(x int) (int, error) {
return square(x), nil
})

// Print results.
// Concurrency = 1; Ordered
err := rill.ForEach(results, 1, func(y int) error {
err := rill.ForEach(squares, 1, func(y int) error {
fmt.Println(y)
return nil
})
Expand All @@ -633,11 +620,11 @@ func ExampleMap() {

// Transform each number
// Concurrency = 3
results := rill.Map(numbers, 3, func(x int) (int, error) {
return doSomethingWithNumber(x), nil
squares := rill.Map(numbers, 3, func(x int) (int, error) {
return square(x), nil
})

printStream(results)
printStream(squares)
}

// The same example as for the [Map], but using ordered versions of functions.
Expand All @@ -647,11 +634,11 @@ func ExampleOrderedMap() {

// Transform each number
// Concurrency = 3; Ordered
results := rill.OrderedMap(numbers, 3, func(x int) (int, error) {
return doSomethingWithNumber(x), nil
squares := rill.OrderedMap(numbers, 3, func(x int) (int, error) {
return square(x), nil
})

printStream(results)
printStream(squares)
}

func ExampleMapReduce() {
Expand Down Expand Up @@ -709,11 +696,11 @@ func ExampleToSlice() {

// Transform each number
// Concurrency = 3; Ordered
results := rill.OrderedMap(numbers, 3, func(x int) (int, error) {
return doSomethingWithNumber(x), nil
squares := rill.OrderedMap(numbers, 3, func(x int) (int, error) {
return square(x), nil
})

resultsSlice, err := rill.ToSlice(results)
resultsSlice, err := rill.ToSlice(squares)

fmt.Println("Result:", resultsSlice)
fmt.Println("Error:", err)
Expand All @@ -735,15 +722,8 @@ func ExampleUnbatch() {

// --- Helpers ---

// helper function that squares the number
// helper function that checks if a number is prime
// and simulates some additional work using sleep
func doSomethingWithNumber(x int) int {
randomSleep(500 * time.Millisecond) // simulate some additional work
return x * x
}

// naive prime number check.
// also simulates some additional work using sleep
func isPrime(n int) bool {
randomSleep(500 * time.Millisecond) // simulate some additional work

Expand All @@ -758,6 +738,29 @@ func isPrime(n int) bool {
return true
}

// helper function that squares the number
// and simulates some additional work using sleep
func square(x int) int {
randomSleep(500 * time.Millisecond) // simulate some additional work
return x * x
}

// helper function that creates a stream of numbers [start, end) and respects the context
func streamNumbers(ctx context.Context, start, end int) <-chan rill.Try[int] {
out := make(chan rill.Try[int])
go func() {
defer close(out)
for i := start; i < end; i++ {
select {
case <-ctx.Done():
return
case out <- rill.Try[int]{Value: i}:
}
}
}()
return out
}

// printStream prints all items from a stream (one per line) and an error if any.
func printStream[A any](stream <-chan rill.Try[A]) {
fmt.Println("Result:")
Expand Down
11 changes: 11 additions & 0 deletions mockapi/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Mock API

A minimal mock API implementation used in runnable examples on [pkg.go.dev](https://pkg.go.dev/github.com/destel/rill).

This package simulates common API patterns like:
- Single and bulk item fetching
- Pagination
- Network delay simulation
- Realistic error scenarios

The package is intentionally kept public to enable running and experimenting with examples in the Go Playground.
7 changes: 4 additions & 3 deletions mockapi/users.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Package mockapi provides a very basic mock API client for examples and demos.
// Unfortunately it must live outside the internal folder to be accessible from runnable examples and go playground.
// Package mockapi provides a very basic mock API for examples and demos.
// It's intentionally kept public to enable running and experimenting with examples in the Go Playground.
package mockapi

import (
Expand Down Expand Up @@ -54,11 +54,12 @@ func GetDepartments() ([]string, error) {

// GetUser returns a user by ID.
func GetUser(ctx context.Context, id int) (*User, error) {
randomSleep(ctx, 500*time.Millisecond)
if err := ctx.Err(); err != nil {
return nil, err
}

randomSleep(ctx, 500*time.Millisecond)

mu.RLock()
defer mu.RUnlock()

Expand Down
2 changes: 1 addition & 1 deletion util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ func Drain[A any](in <-chan A) {
core.Drain(in)
}

// DrainNB is a non-blocking version of [Drain]. Is does draining in a separate goroutine.
// DrainNB is a non-blocking version of [Drain]. It does draining in a separate goroutine.
func DrainNB[A any](in <-chan A) {
core.DrainNB(in)
}
Expand Down
Loading