Skip to content

Commit

Permalink
Add runnable examples to the docs (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
destel authored Apr 18, 2024
1 parent 05225c2 commit 86f62df
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 504 deletions.
133 changes: 61 additions & 72 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,63 +24,60 @@ without getting bogged down by the complexity of concurrency.
go get github.com/destel/rill
```



## Example usage
Consider function that fetches keys from multiple URLs, retrieves their values from a key-value database, and prints them.
Consider an application that fetches keys from multiple URLs, retrieves their values from a key-value database in batches, and prints them.
This example demonstrates the library's strengths in handling concurrent tasks, error propagation, batching and data streaming,
all while maintaining simplicity and efficiency.

See a full runnable example at examples/kv-read
[Full runnable example](https://pkg.go.dev/github.com/destel/rill#example-package-Batching)

```go
type KV struct {
Key string
Value string
}


func printValuesFromDB(ctx context.Context, urls []string) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel() // In case of error, this ensures all http and DB operations are canceled

// Convert urls into a channel
urlsChan := rill.FromSlice(urls, nil)

// Fetch and stream keys from each URL concurrently
keys := rill.FlatMap(urlsChan, 10, func(url string) <-chan rill.Try[string] {
return streamLines(ctx, url)
func main() {
urls := rill.FromSlice([]string{
"https://example.com/file1.txt",
"https://example.com/file2.txt",
"https://example.com/file3.txt",
"https://example.com/file4.txt",
}, nil)

// Fetch keys from each URL and flatten them into a single stream
keys := rill.FlatMap(urls, 3, func(url string) <-chan rill.Try[string] {
return streamFileLines(url)
})

// Exclude any empty keys from the stream
keys = rill.Filter(keys, 5, func(key string) (bool, error) {
keys = rill.Filter(keys, 3, func(key string) (bool, error) {
return key != "", nil
})

// Organize keys into manageable batches of 10 for bulk operations
keyBatches := rill.Batch(keys, 10, 1*time.Second)

// Fetch values from DB for each batch of keys
resultBatches := rill.Map(keyBatches, 5, func(keys []string) ([]KV, error) {
values, err := dbMultiGet(ctx, keys...)
resultBatches := rill.Map(keyBatches, 3, func(keys []string) ([]KV, error) {
values, err := kvMultiGet(keys...)
if err != nil {
return nil, err
}

results := make([]KV, len(keys))
for i, key := range keys {
results[i] = KV{Key: key, Value: values[i]}
}

return results, nil
})

// Convert batches back to a single items for final processing
results := rill.Unbatch(resultBatches)

// Exclude any empty values from the stream
results = rill.Filter(results, 5, func(kv KV) (bool, error) {
results = rill.Filter(results, 3, func(kv KV) (bool, error) {
return kv.Value != "<nil>", nil
})

// Iterate over each key-value pair and print
cnt := 0
err := rill.ForEach(results, 1, func(kv KV) error {
Expand All @@ -89,26 +86,21 @@ func printValuesFromDB(ctx context.Context, urls []string) error {
return nil
})
if err != nil {
return err
fmt.Println("Error:", err)
}

fmt.Println("Total keys:", cnt)
return nil
}

// streamLines reads a file from the given URL line by line and returns a channel of lines
func streamLines(ctx context.Context, url string) <-chan rill.Try[string] {
// streamFileLines does line-by-line streaming of a file from a URL,
func streamFileLines(url string) <-chan rill.Try[string] {
// ...
}

// dbMultiGet does a batch read from a key-value database. It returns the values for the given keys.
func dbMultiGet(ctx context.Context, keys ...string) ([]string, error) {
// kvMultiGet does a batch read from a key-value database,
func kvMultiGet(keys ...string) ([]string, error) {
// ...
}




```


Expand All @@ -125,7 +117,7 @@ Rill has a test coverage of over 95%, with testing focused on:


## Design philosophy
At the heart of rill lies a simple yet powerful concept: operating on channels of wrapped values, encapsulated by the Try structure.
At the heart of rill lies a simple yet powerful concept: operating on channels of wrapped values, encapsulated by the **Try** structure.
Such channels can be created manually or through utilities like **FromSlice** or **FromChan**, and then transformed via operations
such as **Map**, **Filter**, **FlatMap** and others. Finally, when all processing stages are completed, the data can be consumed by
**ForEach**, **ToSlice** or manually by iterating over the resulting channel.
Expand Down Expand Up @@ -190,7 +182,7 @@ func doWork(ctx context.Context) error {
ids := streamIDs(ctx)

// Define other pipeline stages...
// Final stage processing
for value := range results {
// Process value...
Expand All @@ -211,10 +203,10 @@ func doWork(ctx context.Context) error {
ids := streamIDs(ctx)

// Define other pipeline stages...
// Ensure pipeline is drained in case of failure
defer rill.DrainNB(results)
// Final stage processing
for value := range results {
// Process value...
Expand Down Expand Up @@ -247,7 +239,7 @@ func doWork(ctx context.Context) error {
}
```

While these measures are effective in preventing leaks, the pipeline may continue to operate in the background as long
While these measures are effective in preventing leaks, the pipeline may continue draining values in the background as long
as the initial stage produces values. A best practice is to manage the first stage (and potentially others) with a context,
allowing for a controlled shutdown:

Expand Down Expand Up @@ -284,24 +276,24 @@ and others. These ensure that if value **x** precedes value **y** in the input c
preserving the original order. It's important to note that these ordered functions incur a small overhead compared to their unordered counterparts,
due to the additional logic required to maintain order.

Order preservation is vital in scenarios where the sequence of data impacts the outcome. Take, for instance, a function that retrieves
Order preservation is vital in scenarios where the sequence of data impacts the outcome. Take, for instance, an application that retrieves
daily temperature measurements over a specific period and calculates the change in temperature from one day to the next.
Although fetching the data in parallel boosts efficiency, processing it in the original order is crucial for
accurate computation of temperature variations.

See a full runnable example at examples/weather
[Full runnable example](https://pkg.go.dev/github.com/destel/rill#example-package-Ordering)

```go
type Measurement struct {
Date time.Time
Temp float64
Change float64
Date time.Time
Temp float64
}

func printTemperatureChanges(ctx context.Context, city string, startDate, endDate time.Time) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel() // In case of error, this ensures all pending operations are canceled

func main() {
city := "New York"
endDate := time.Now()
startDate := endDate.AddDate(0, 0, -30)

// Make a channel that emits all the days between startDate and endDate
days := make(chan rill.Try[time.Time])
go func() {
Expand All @@ -310,33 +302,30 @@ func printTemperatureChanges(ctx context.Context, city string, startDate, endDat
days <- rill.Wrap(date, nil)
}
}()
// Download the temperature for each day in parallel and in order

// Download the temperature for each day concurrently
measurements := rill.OrderedMap(days, 10, func(date time.Time) (Measurement, error) {
temp, err := getTemperature(ctx, city, date)
temp, err := getTemperature(city, date)
return Measurement{Date: date, Temp: temp}, err
})
// Calculate the temperature changes. Use a single goroutine

// Iterate over the measurements, calculate and print changes. Use a single goroutine
prev := Measurement{Temp: math.NaN()}
measurements = rill.OrderedMap(measurements, 1, func(m Measurement) (Measurement, error) {
m.Change = m.Temp - prev.Temp
prev = m
return m, nil
})

// Iterate over the measurements and print the results
err := rill.ForEach(measurements, 1, func(m Measurement) error {
fmt.Printf("%s: %.1f°C (change %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, m.Change)
change := m.Temp - prev.Temp
prev = m

fmt.Printf("%s: %.1f°C (change %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, change)
return nil
})

return err
if err != nil {
fmt.Println("Error:", err)
}
}

// getTemperature does a network request to fetch the temperature for a given city and date.
func getTemperature(ctx context.Context, city string, date time.Time) (float64, error) {
// getTemperature fetches a temperature reading for a city and date,
func getTemperature(city string, date time.Time) (float64, error) {
// ...
}

```
Loading

0 comments on commit 86f62df

Please sign in to comment.