Skip to content

Commit

Permalink
Improve documentation clarity and reorganize examples (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
destel authored Nov 23, 2024
1 parent 72244bc commit 3acd02c
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 243 deletions.
338 changes: 183 additions & 155 deletions README.md

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions example123_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ func ExampleFromSeq() {

// Transform each number
// Concurrency = 3
results := rill.Map(numbers, 3, func(x int) (int, error) {
return doSomethingWithNumber(x), nil
squares := rill.Map(numbers, 3, func(x int) (int, error) {
return square(x), nil
})

printStream(results)
printStream(squares)
}

func ExampleFromSeq2() {
Expand All @@ -40,11 +40,11 @@ func ExampleFromSeq2() {

// Transform each number
// Concurrency = 3
results := rill.Map(numbers, 3, func(x int) (int, error) {
return doSomethingWithNumber(x), nil
squares := rill.Map(numbers, 3, func(x int) (int, error) {
return square(x), nil
})

printStream(results)
printStream(squares)
}

func ExampleToSeq2() {
Expand All @@ -53,12 +53,12 @@ func ExampleToSeq2() {

// Transform each number
// Concurrency = 3
results := rill.Map(numbers, 3, func(x int) (int, error) {
return doSomethingWithNumber(x), nil
squares := rill.Map(numbers, 3, func(x int) (int, error) {
return square(x), nil
})

// Convert the stream into an iterator and use for-range to print the results
for val, err := range rill.ToSeq2(results) {
for val, err := range rill.ToSeq2(squares) {
if err != nil {
fmt.Println("Error:", err)
break // cleanup is done regardless of early exit
Expand Down
153 changes: 78 additions & 75 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func Example_batching() {
}, nil)

// Group IDs into batches of 5
idBatches := rill.Batch(ids, 5, 1*time.Second)
idBatches := rill.Batch(ids, 5, -1)

// Bulk fetch users from the API
// Concurrency = 3
Expand Down Expand Up @@ -108,7 +108,7 @@ func Example_batching() {
// emits partial batches, ensuring that updates are delayed by at most 100ms.
//
// For simplicity, this example does not have retries, error handling and synchronization
func Example_batchingWithTimeout() {
func Example_batchingRealTime() {
// Start the background worker that processes the updates
go updateUserTimestampWorker()

Expand Down Expand Up @@ -170,20 +170,13 @@ func Example_ordering() {
// The string to search for in the downloaded files
needle := []byte("26")

// Manually generate a stream of URLs from http://example.com/file-0.txt to http://example.com/file-999.txt
urls := make(chan rill.Try[string])
go func() {
defer close(urls)
for i := 0; i < 1000; i++ {
// Stop generating URLs after the context is canceled (when the file is found)
// This can be rewritten as a select statement, but it's not necessary
if err := ctx.Err(); err != nil {
return
}
// Start with a stream of numbers from 0 to 999
fileIDs := streamNumbers(ctx, 0, 1000)

urls <- rill.Wrap(fmt.Sprintf("https://example.com/file-%d.txt", i), nil)
}
}()
// Generate a stream of URLs from http://example.com/file-0.txt to http://example.com/file-999.txt
urls := rill.OrderedMap(fileIDs, 1, func(id int) (string, error) {
return fmt.Sprintf("https://example.com/file-%d.txt", id), nil
})

// Download and process the files
// At most 5 files are downloaded and held in memory at the same time
Expand Down Expand Up @@ -251,17 +244,16 @@ func sendMessage(message string, server string) error {
return nil
}

// This example demonstrates using [FlatMap] to accelerate paginated API calls. Instead of fetching all users sequentially,
// page-by-page (which would take a long time since the API is slow and the number of pages is large), it fetches users from
// multiple departments in parallel. The example also shows how to write a reusable streaming wrapper around an existing
// API function that can be used on its own or as part of a larger pipeline.
func Example_parallelStreams() {
// This example demonstrates using [FlatMap] to fetch users from multiple departments concurrently.
// Additionally, it demonstrates how to write a reusable streaming wrapper over paginated API calls - the StreamUsers function
func Example_flatMap() {
ctx := context.Background()

// Convert a list of all departments into a stream
departments := rill.FromSlice(mockapi.GetDepartments())
// Start with a stream of department names
departments := rill.FromSlice([]string{"IT", "Finance", "Marketing", "Support", "Engineering"}, nil)

// Use FlatMap to stream users from 3 departments concurrently.
// Stream users from all departments concurrently.
// At most 3 departments at the same time.
users := rill.FlatMap(departments, 3, func(department string) <-chan rill.Try[*mockapi.User] {
return StreamUsers(ctx, &mockapi.UserQuery{Department: department})
})
Expand All @@ -276,7 +268,7 @@ func Example_parallelStreams() {

// StreamUsers is a reusable streaming wrapper around the mockapi.ListUsers function.
// It iterates through all listing pages and returns a stream of users.
// This function is useful on its own or as a building block for more complex pipelines.
// This function is useful both on its own and as part of larger pipelines.
func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[*mockapi.User] {
res := make(chan rill.Try[*mockapi.User])

Expand Down Expand Up @@ -309,43 +301,38 @@ func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[
return res
}

// This example demonstrates how to use a context for pipeline termination.
// The FindFirstPrime function uses several concurrent workers to find the first prime number after a given number.
// Internally it creates a pipeline that starts from an infinite stream of numbers. When the first prime number is found
// in that stream, the context gets canceled, and the pipeline terminates gracefully.
// This example demonstrates how to gracefully stop a pipeline on the first error.
// The CheckAllUsersExist uses several concurrent workers and returns an error as soon as it encounters a non-existent user.
// Such early return triggers the context cancellation, which in turn stops all remaining users fetches.
func Example_context() {
p := FindFirstPrime(10000, 3) // Use 3 concurrent workers
fmt.Println("The first prime after 10000 is", p)
ctx := context.Background()

// ID 999 doesn't exist, so fetching will stop after hitting it.
err := CheckAllUsersExist(ctx, 3, []int{1, 2, 3, 4, 5, 999, 7, 8, 9, 10})
fmt.Printf("Check result: %v\n", err)
}

// FindFirstPrime finds the first prime number after the given number, using several concurrent workers.
func FindFirstPrime(after int, concurrency int) int {
ctx, cancel := context.WithCancel(context.Background())
// CheckAllUsersExist uses several concurrent workers to checks if all users with given IDs exist.
func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error {
// Create new context that will be canceled when this function returns
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Generate an infinite stream of numbers starting from the given number
numbers := make(chan rill.Try[int])
go func() {
defer close(numbers)
for i := after + 1; ; i++ {
select {
case <-ctx.Done():
return // Stop generating numbers when the context is canceled
case numbers <- rill.Wrap(i, nil):
}
idsStream := rill.FromSlice(ids, nil)

// Fetch users concurrently.
users := rill.Map(idsStream, concurrency, func(id int) (*mockapi.User, error) {
u, err := mockapi.GetUser(ctx, id)
if err != nil {
return nil, fmt.Errorf("failed to fetch user %d: %w", id, err)
}
}()

// Filter out non-prime numbers, preserve the order
primes := rill.OrderedFilter(numbers, concurrency, func(x int) (bool, error) {
fmt.Println("Checking", x)
return isPrime(x), nil
fmt.Printf("Fetched user %d\n", id)
return u, nil
})

// Get the first prime and cancel the context
// This stops number generation and allows goroutines to exit
result, _, _ := rill.First(primes)
return result
// Return the first error (if any) and cancel remaining fetches via context
return rill.Err(users)
}

// --- Function examples ---
Expand Down Expand Up @@ -591,10 +578,10 @@ func ExampleForEach() {
// Convert a slice of numbers into a stream
numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

// Do something with each number and print the result
// Square each number and print the result
// Concurrency = 3
err := rill.ForEach(numbers, 3, func(x int) error {
y := doSomethingWithNumber(x)
y := square(x)
fmt.Println(y)
return nil
})
Expand All @@ -610,15 +597,15 @@ func ExampleForEach_ordered() {
// Convert a slice of numbers into a stream
numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)

// Do something with each number
// Square each number
// Concurrency = 3; Ordered
results := rill.OrderedMap(numbers, 3, func(x int) (int, error) {
return doSomethingWithNumber(x), nil
squares := rill.OrderedMap(numbers, 3, func(x int) (int, error) {
return square(x), nil
})

// Print results.
// Concurrency = 1; Ordered
err := rill.ForEach(results, 1, func(y int) error {
err := rill.ForEach(squares, 1, func(y int) error {
fmt.Println(y)
return nil
})
Expand All @@ -633,11 +620,11 @@ func ExampleMap() {

// Transform each number
// Concurrency = 3
results := rill.Map(numbers, 3, func(x int) (int, error) {
return doSomethingWithNumber(x), nil
squares := rill.Map(numbers, 3, func(x int) (int, error) {
return square(x), nil
})

printStream(results)
printStream(squares)
}

// The same example as for the [Map], but using ordered versions of functions.
Expand All @@ -647,11 +634,11 @@ func ExampleOrderedMap() {

// Transform each number
// Concurrency = 3; Ordered
results := rill.OrderedMap(numbers, 3, func(x int) (int, error) {
return doSomethingWithNumber(x), nil
squares := rill.OrderedMap(numbers, 3, func(x int) (int, error) {
return square(x), nil
})

printStream(results)
printStream(squares)
}

func ExampleMapReduce() {
Expand Down Expand Up @@ -709,11 +696,11 @@ func ExampleToSlice() {

// Transform each number
// Concurrency = 3; Ordered
results := rill.OrderedMap(numbers, 3, func(x int) (int, error) {
return doSomethingWithNumber(x), nil
squares := rill.OrderedMap(numbers, 3, func(x int) (int, error) {
return square(x), nil
})

resultsSlice, err := rill.ToSlice(results)
resultsSlice, err := rill.ToSlice(squares)

fmt.Println("Result:", resultsSlice)
fmt.Println("Error:", err)
Expand All @@ -735,15 +722,8 @@ func ExampleUnbatch() {

// --- Helpers ---

// helper function that squares the number
// helper function that checks if a number is prime
// and simulates some additional work using sleep
func doSomethingWithNumber(x int) int {
randomSleep(500 * time.Millisecond) // simulate some additional work
return x * x
}

// naive prime number check.
// also simulates some additional work using sleep
func isPrime(n int) bool {
randomSleep(500 * time.Millisecond) // simulate some additional work

Expand All @@ -758,6 +738,29 @@ func isPrime(n int) bool {
return true
}

// helper function that squares the number
// and simulates some additional work using sleep
func square(x int) int {
randomSleep(500 * time.Millisecond) // simulate some additional work
return x * x
}

// helper function that creates a stream of numbers [start, end) and respects the context
func streamNumbers(ctx context.Context, start, end int) <-chan rill.Try[int] {
out := make(chan rill.Try[int])
go func() {
defer close(out)
for i := start; i < end; i++ {
select {
case <-ctx.Done():
return
case out <- rill.Try[int]{Value: i}:
}
}
}()
return out
}

// printStream prints all items from a stream (one per line) and an error if any.
func printStream[A any](stream <-chan rill.Try[A]) {
fmt.Println("Result:")
Expand Down
11 changes: 11 additions & 0 deletions mockapi/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Mock API

A minimal mock API implementation used in runnable examples on [pkg.go.dev](https://pkg.go.dev/github.com/destel/rill).

This package simulates common API patterns like:
- Single and bulk item fetching
- Pagination
- Network delay simulation
- Realistic error scenarios

The package is intentionally kept public to enable running and experimenting with examples in the Go Playground.
7 changes: 4 additions & 3 deletions mockapi/users.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Package mockapi provides a very basic mock API client for examples and demos.
// Unfortunately it must live outside the internal folder to be accessible from runnable examples and go playground.
// Package mockapi provides a very basic mock API for examples and demos.
// It's intentionally kept public to enable running and experimenting with examples in the Go Playground.
package mockapi

import (
Expand Down Expand Up @@ -54,11 +54,12 @@ func GetDepartments() ([]string, error) {

// GetUser returns a user by ID.
func GetUser(ctx context.Context, id int) (*User, error) {
randomSleep(ctx, 500*time.Millisecond)
if err := ctx.Err(); err != nil {
return nil, err
}

randomSleep(ctx, 500*time.Millisecond)

mu.RLock()
defer mu.RUnlock()

Expand Down
2 changes: 1 addition & 1 deletion util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ func Drain[A any](in <-chan A) {
core.Drain(in)
}

// DrainNB is a non-blocking version of [Drain]. Is does draining in a separate goroutine.
// DrainNB is a non-blocking version of [Drain]. It does draining in a separate goroutine.
func DrainNB[A any](in <-chan A) {
core.DrainNB(in)
}
Expand Down

0 comments on commit 3acd02c

Please sign in to comment.