diff --git a/README.md b/README.md index 24b6b93..94e2df4 100644 --- a/README.md +++ b/README.md @@ -24,63 +24,60 @@ without getting bogged down by the complexity of concurrency. go get github.com/destel/rill ``` + + ## Example usage -Consider function that fetches keys from multiple URLs, retrieves their values from a key-value database, and prints them. +Consider an application that fetches keys from multiple URLs, retrieves their values from a key-value database in batches, and prints them. This example demonstrates the library's strengths in handling concurrent tasks, error propagation, batching and data streaming, all while maintaining simplicity and efficiency. -See a full runnable example at examples/kv-read +[Full runnable example](https://pkg.go.dev/github.com/destel/rill#example-package-Batching) ```go -type KV struct { - Key string - Value string -} - - -func printValuesFromDB(ctx context.Context, urls []string) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() // In case of error, this ensures all http and DB operations are canceled - - // Convert urls into a channel - urlsChan := rill.FromSlice(urls, nil) - - // Fetch and stream keys from each URL concurrently - keys := rill.FlatMap(urlsChan, 10, func(url string) <-chan rill.Try[string] { - return streamLines(ctx, url) +func main() { + urls := rill.FromSlice([]string{ + "https://example.com/file1.txt", + "https://example.com/file2.txt", + "https://example.com/file3.txt", + "https://example.com/file4.txt", + }, nil) + + // Fetch keys from each URL and flatten them into a single stream + keys := rill.FlatMap(urls, 3, func(url string) <-chan rill.Try[string] { + return streamFileLines(url) }) - + // Exclude any empty keys from the stream - keys = rill.Filter(keys, 5, func(key string) (bool, error) { + keys = rill.Filter(keys, 3, func(key string) (bool, error) { return key != "", nil }) - + // Organize keys into manageable batches of 10 for bulk operations keyBatches := rill.Batch(keys, 10, 1*time.Second) - + // Fetch values from DB for each batch of keys - resultBatches := rill.Map(keyBatches, 5, func(keys []string) ([]KV, error) { - values, err := dbMultiGet(ctx, keys...) + resultBatches := rill.Map(keyBatches, 3, func(keys []string) ([]KV, error) { + values, err := kvMultiGet(keys...) if err != nil { return nil, err } - + results := make([]KV, len(keys)) for i, key := range keys { results[i] = KV{Key: key, Value: values[i]} } - + return results, nil }) - + // Convert batches back to a single items for final processing results := rill.Unbatch(resultBatches) - + // Exclude any empty values from the stream - results = rill.Filter(results, 5, func(kv KV) (bool, error) { + results = rill.Filter(results, 3, func(kv KV) (bool, error) { return kv.Value != "", nil }) - + // Iterate over each key-value pair and print cnt := 0 err := rill.ForEach(results, 1, func(kv KV) error { @@ -89,26 +86,21 @@ func printValuesFromDB(ctx context.Context, urls []string) error { return nil }) if err != nil { - return err + fmt.Println("Error:", err) } - + fmt.Println("Total keys:", cnt) - return nil } -// streamLines reads a file from the given URL line by line and returns a channel of lines -func streamLines(ctx context.Context, url string) <-chan rill.Try[string] { +// streamFileLines does line-by-line streaming of a file from a URL, +func streamFileLines(url string) <-chan rill.Try[string] { // ... } -// dbMultiGet does a batch read from a key-value database. It returns the values for the given keys. -func dbMultiGet(ctx context.Context, keys ...string) ([]string, error) { +// kvMultiGet does a batch read from a key-value database, +func kvMultiGet(keys ...string) ([]string, error) { // ... } - - - - ``` @@ -125,7 +117,7 @@ Rill has a test coverage of over 95%, with testing focused on: ## Design philosophy -At the heart of rill lies a simple yet powerful concept: operating on channels of wrapped values, encapsulated by the Try structure. +At the heart of rill lies a simple yet powerful concept: operating on channels of wrapped values, encapsulated by the **Try** structure. Such channels can be created manually or through utilities like **FromSlice** or **FromChan**, and then transformed via operations such as **Map**, **Filter**, **FlatMap** and others. Finally, when all processing stages are completed, the data can be consumed by **ForEach**, **ToSlice** or manually by iterating over the resulting channel. @@ -190,7 +182,7 @@ func doWork(ctx context.Context) error { ids := streamIDs(ctx) // Define other pipeline stages... - + // Final stage processing for value := range results { // Process value... @@ -211,10 +203,10 @@ func doWork(ctx context.Context) error { ids := streamIDs(ctx) // Define other pipeline stages... - + // Ensure pipeline is drained in case of failure defer rill.DrainNB(results) - + // Final stage processing for value := range results { // Process value... @@ -247,7 +239,7 @@ func doWork(ctx context.Context) error { } ``` -While these measures are effective in preventing leaks, the pipeline may continue to operate in the background as long +While these measures are effective in preventing leaks, the pipeline may continue draining values in the background as long as the initial stage produces values. A best practice is to manage the first stage (and potentially others) with a context, allowing for a controlled shutdown: @@ -284,24 +276,24 @@ and others. These ensure that if value **x** precedes value **y** in the input c preserving the original order. It's important to note that these ordered functions incur a small overhead compared to their unordered counterparts, due to the additional logic required to maintain order. -Order preservation is vital in scenarios where the sequence of data impacts the outcome. Take, for instance, a function that retrieves +Order preservation is vital in scenarios where the sequence of data impacts the outcome. Take, for instance, an application that retrieves daily temperature measurements over a specific period and calculates the change in temperature from one day to the next. Although fetching the data in parallel boosts efficiency, processing it in the original order is crucial for accurate computation of temperature variations. -See a full runnable example at examples/weather +[Full runnable example](https://pkg.go.dev/github.com/destel/rill#example-package-Ordering) ```go type Measurement struct { - Date time.Time - Temp float64 - Change float64 + Date time.Time + Temp float64 } -func printTemperatureChanges(ctx context.Context, city string, startDate, endDate time.Time) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() // In case of error, this ensures all pending operations are canceled - +func main() { + city := "New York" + endDate := time.Now() + startDate := endDate.AddDate(0, 0, -30) + // Make a channel that emits all the days between startDate and endDate days := make(chan rill.Try[time.Time]) go func() { @@ -310,33 +302,30 @@ func printTemperatureChanges(ctx context.Context, city string, startDate, endDat days <- rill.Wrap(date, nil) } }() - - // Download the temperature for each day in parallel and in order + + // Download the temperature for each day concurrently measurements := rill.OrderedMap(days, 10, func(date time.Time) (Measurement, error) { - temp, err := getTemperature(ctx, city, date) + temp, err := getTemperature(city, date) return Measurement{Date: date, Temp: temp}, err }) - - // Calculate the temperature changes. Use a single goroutine + + // Iterate over the measurements, calculate and print changes. Use a single goroutine prev := Measurement{Temp: math.NaN()} - measurements = rill.OrderedMap(measurements, 1, func(m Measurement) (Measurement, error) { - m.Change = m.Temp - prev.Temp - prev = m - return m, nil - }) - - // Iterate over the measurements and print the results err := rill.ForEach(measurements, 1, func(m Measurement) error { - fmt.Printf("%s: %.1f°C (change %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, m.Change) + change := m.Temp - prev.Temp prev = m + + fmt.Printf("%s: %.1f°C (change %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, change) return nil }) - - return err + if err != nil { + fmt.Println("Error:", err) + } } -// getTemperature does a network request to fetch the temperature for a given city and date. -func getTemperature(ctx context.Context, city string, date time.Time) (float64, error) { +// getTemperature fetches a temperature reading for a city and date, +func getTemperature(city string, date time.Time) (float64, error) { // ... } + ``` \ No newline at end of file diff --git a/example_test.go b/example_test.go new file mode 100644 index 0000000..521fd3d --- /dev/null +++ b/example_test.go @@ -0,0 +1,213 @@ +package rill_test + +import ( + "fmt" + "math" + "math/rand" + "path/filepath" + "strings" + "time" + + "github.com/destel/rill" +) + +type KV struct { + Key string + Value string +} + +type Measurement struct { + Date time.Time + Temp float64 +} + +// A basic demonstrating how [ForEach] can be used to process a list of items concurrently. +func Example_basic() { + items := rill.FromSlice([]string{"item1", "item2", "item3", "item4", "item5", "item6", "item7", "item8", "item9", "item10"}, nil) + + err := rill.ForEach(items, 3, func(item string) error { + randomSleep(1000 * time.Millisecond) // simulate some additional work + res := strings.ToUpper(item) + fmt.Println(res) + return nil + }) + if err != nil { + fmt.Println("Error:", err) + } +} + +// This example fetches keys from a list of URLs, retrieves their values from a key-value database, and prints them. +// The pipeline leverages concurrency for fetching and processing and uses batching to reduce the number of database calls. +func Example_batching() { + startedAt := time.Now() + defer func() { fmt.Println("Elapsed:", time.Since(startedAt)) }() + + urls := rill.FromSlice([]string{ + "https://example.com/file1.txt", + "https://example.com/file2.txt", + "https://example.com/file3.txt", + "https://example.com/file4.txt", + }, nil) + + // Fetch keys from each URL and flatten them into a single stream + keys := rill.FlatMap(urls, 3, func(url string) <-chan rill.Try[string] { + return streamFileLines(url) + }) + + // Exclude any empty keys from the stream + keys = rill.Filter(keys, 3, func(key string) (bool, error) { + return key != "", nil + }) + + // Organize keys into manageable batches of 10 for bulk operations + keyBatches := rill.Batch(keys, 10, 1*time.Second) + + // Fetch values from DB for each batch of keys + resultBatches := rill.Map(keyBatches, 3, func(keys []string) ([]KV, error) { + values, err := kvMultiGet(keys...) + if err != nil { + return nil, err + } + + results := make([]KV, len(keys)) + for i, key := range keys { + results[i] = KV{Key: key, Value: values[i]} + } + + return results, nil + }) + + // Convert batches back to a single items for final processing + results := rill.Unbatch(resultBatches) + + // Exclude any empty values from the stream + results = rill.Filter(results, 3, func(kv KV) (bool, error) { + return kv.Value != "", nil + }) + + // Iterate over each key-value pair and print + cnt := 0 + err := rill.ForEach(results, 1, func(kv KV) error { + fmt.Println(kv.Key, "=>", kv.Value) + cnt++ + return nil + }) + if err != nil { + fmt.Println("Error:", err) + } + + fmt.Println("Total keys:", cnt) +} + +// This example demonstrates how [OrderedMap] can be used to enforce ordering of processing results. +// Pipeline below fetches temperature measurements for a city and calculates daily temperature changes. +// Measurements are fetched concurrently, but ordered processing is used to calculate the changes. +func Example_ordering() { + startedAt := time.Now() + defer func() { fmt.Println("Elapsed:", time.Since(startedAt)) }() + + city := "New York" + endDate := time.Now() + startDate := endDate.AddDate(0, 0, -30) + + // Make a channel that emits all the days between startDate and endDate + days := make(chan rill.Try[time.Time]) + go func() { + defer close(days) + for date := startDate; date.Before(endDate); date = date.AddDate(0, 0, 1) { + days <- rill.Wrap(date, nil) + } + }() + + // Download the temperature for each day concurrently + measurements := rill.OrderedMap(days, 10, func(date time.Time) (Measurement, error) { + temp, err := getTemperature(city, date) + return Measurement{Date: date, Temp: temp}, err + }) + + // Iterate over the measurements, calculate and print changes. Use a single goroutine + prev := Measurement{Temp: math.NaN()} + err := rill.ForEach(measurements, 1, func(m Measurement) error { + change := m.Temp - prev.Temp + prev = m + + fmt.Printf("%s: %.1f°C (change %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, change) + return nil + }) + if err != nil { + fmt.Println("Error:", err) + } +} + +// streamFileLines simulates line-by-line streaming of a file from a URL, +// introducing a randomized delay to simulate network latency. +// It's a simplified placeholder for actual network-based file streaming. +func streamFileLines(url string) <-chan rill.Try[string] { + out := make(chan rill.Try[string]) + go func() { + defer close(out) + + base := filepath.Base(url) + base = strings.TrimSuffix(base, filepath.Ext(base)) + + for i := 0; i < 10; i++ { + randomSleep(20 * time.Millisecond) // Simulate a network delay + out <- rill.Wrap(fmt.Sprintf("%s:key:%d", base, i), nil) + } + }() + return out +} + +// kvGet simulates fetching a value form a key-value database, +// introducing a randomized delay to simulate network latency. +// It's a simplified placeholder for actual database operation. +func kvGet(key string) (string, error) { + randomSleep(1000 * time.Millisecond) // Simulate a network delay + + // Simulates that some keys are missing + if strings.HasSuffix(key, "2") || strings.HasSuffix(key, "3") { + return "", nil + } + + return strings.Replace(key, "key:", "val:", 1), nil +} + +// kvMultiGet simulates a batch read from a key-value database, +// introducing a randomized delay to simulate network latency. +// It's a simplified placeholder for actual database operation. +func kvMultiGet(keys ...string) ([]string, error) { + randomSleep(1000 * time.Millisecond) // Simulate a network delay + + values := make([]string, len(keys)) + for i, key := range keys { + // Simulates that some keys are missing + if strings.HasSuffix(key, "2") || strings.HasSuffix(key, "3") { + values[i] = "" + continue + } + + values[i] = strings.Replace(key, "key:", "val:", 1) + } + + return values, nil +} + +// getTemperature simulates fetching a temperature reading for a city and date, +func getTemperature(city string, date time.Time) (float64, error) { + randomSleep(1000 * time.Millisecond) // Simulate a network delay + + // Basic city hash, to make measurements unique for each city + var h float64 + for _, c := range city { + h += float64(c) + } + + // Simulate a temperature reading, by retuning a pseudo-random, but deterministic value + temp := 15 - 10*math.Sin(h+float64(date.Unix())) + + return temp, nil +} + +func randomSleep(max time.Duration) { + time.Sleep(time.Duration(rand.Intn(int(max)))) +} diff --git a/examples/kv-read/ids1.txt b/examples/kv-read/ids1.txt deleted file mode 100644 index 6c99644..0000000 --- a/examples/kv-read/ids1.txt +++ /dev/null @@ -1,100 +0,0 @@ -id1000 -id1001 -id1002 -id1003 -id1004 -id1005 -id1006 -id1007 -id1008 -id1009 -id1010 -id1011 -id1012 -id1013 -id1014 -id1015 -id1016 -id1017 -id1018 -id1019 -id1020 -id1021 -id1022 -id1023 -id1024 -id1025 -id1026 -id1027 -id1028 -id1029 -id1030 -id1031 -id1032 -id1033 -id1034 -id1035 -id1036 -id1037 -id1038 -id1039 -id1040 -id1041 -id1042 -id1043 -id1044 -id1045 -id1046 -id1047 -id1048 -id1049 -id1050 -id1051 -id1052 -id1053 -id1054 -id1055 -id1056 -id1057 -id1058 -id1059 -id1060 -id1061 -id1062 -id1063 -id1064 -id1065 -id1066 -id1067 -id1068 -id1069 -id1070 -id1071 -id1072 -id1073 -id1074 -id1075 -id1076 -id1077 -id1078 -id1079 -id1080 -id1081 -id1082 -id1083 -id1084 -id1085 -id1086 -id1087 -id1088 -id1089 -id1090 -id1091 -id1092 -id1093 -id1094 -id1095 -id1096 -id1097 -id1098 -id1099 diff --git a/examples/kv-read/ids2.txt b/examples/kv-read/ids2.txt deleted file mode 100644 index 8806c99..0000000 --- a/examples/kv-read/ids2.txt +++ /dev/null @@ -1,50 +0,0 @@ -id2000 -id2001 -id2002 -id2003 -id2004 -id2005 -id2006 -id2007 -id2008 -id2009 -id2010 -id2011 -id2012 -id2013 -id2014 -id2015 -id2016 -id2017 -id2018 -id2019 -id2020 -id2021 -id2022 -id2023 -id2024 -id2025 -id2026 -id2027 -id2028 -id2029 -id2030 -id2031 -id2032 -id2033 -id2034 -id2035 -id2036 -id2037 -id2038 -id2039 -id2040 -id2041 -id2042 -id2043 -id2044 -id2045 -id2046 -id2047 -id2048 -id2049 diff --git a/examples/kv-read/ids3.txt b/examples/kv-read/ids3.txt deleted file mode 100644 index 9714c2b..0000000 --- a/examples/kv-read/ids3.txt +++ /dev/null @@ -1,23 +0,0 @@ -id3000 -id3001 -id3002 -id3003 - -id3004 -id3005 -id3006 -id3007 -id3008 -id3009 - -id3010 -id3011 -id3012 -id3013 -id3014 - -id3015 -id3016 -id3017 -id3018 -id3019 diff --git a/examples/kv-read/main.go b/examples/kv-read/main.go deleted file mode 100644 index f485dd4..0000000 --- a/examples/kv-read/main.go +++ /dev/null @@ -1,167 +0,0 @@ -package main - -import ( - "bufio" - "context" - "errors" - "fmt" - "io" - "math/rand" - "net/http" - "strings" - "time" - - "github.com/destel/rill" -) - -type KV struct { - Key string - Value string -} - -func main() { - err := printValuesFromDB(context.Background(), []string{ - "https://raw.githubusercontent.com/destel/rill/main/examples/kv-read/ids1.txt", - "https://raw.githubusercontent.com/destel/rill/main/examples/kv-read/ids2.txt", - "https://raw.githubusercontent.com/destel/rill/main/examples/kv-read/ids3.txt", - }) - - if err != nil { - fmt.Println("Error:", err) - } -} - -// printValuesFromDB orchestrates a pipeline that fetches keys from URLs, retrieves their values from -// key-value database, and prints them. -// The pipeline leverages concurrency for fetching and processing and uses batching to reduce the number of db calls. -func printValuesFromDB(ctx context.Context, urls []string) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() // In case of error, this ensures all http and DB operations are canceled - - // Convert urls into a channel - urlsChan := rill.FromSlice(urls, nil) - - // Fetch and stream keys from each URL concurrently - keys := rill.FlatMap(urlsChan, 10, func(url string) <-chan rill.Try[string] { - return streamLines(ctx, url) - }) - - // Exclude any empty keys from the stream - keys = rill.Filter(keys, 5, func(key string) (bool, error) { - return key != "", nil - }) - - // Organize keys into manageable batches of 10 for bulk operations - keyBatches := rill.Batch(keys, 10, 1*time.Second) - - // Fetch values from DB for each batch of keys - resultBatches := rill.Map(keyBatches, 5, func(keys []string) ([]KV, error) { - values, err := dbMultiGet(ctx, keys...) - if err != nil { - return nil, err - } - - results := make([]KV, len(keys)) - for i, key := range keys { - results[i] = KV{Key: key, Value: values[i]} - } - - return results, nil - }) - - // Convert batches back to a single items for final processing - results := rill.Unbatch(resultBatches) - - // Exclude any empty values from the stream - results = rill.Filter(results, 5, func(kv KV) (bool, error) { - return kv.Value != "", nil - }) - - // Iterate over each key-value pair and print - cnt := 0 - err := rill.ForEach(results, 1, func(kv KV) error { - fmt.Println(kv.Key, "=>", kv.Value) - cnt++ - return nil - }) - if err != nil { - return err - } - - fmt.Println("Total keys:", cnt) - return nil -} - -// streamLines reads a file from the given URL line by line and returns a channel of lines -func streamLines(ctx context.Context, url string) <-chan rill.Try[string] { - out := make(chan rill.Try[string], 1) - - go func() { - defer close(out) - - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - out <- rill.Try[string]{Error: err} - return - } - - res, err := http.DefaultClient.Do(req) - if err != nil { - out <- rill.Try[string]{Error: err} - return - } - defer res.Body.Close() - - if res.StatusCode != http.StatusOK { - out <- rill.Try[string]{Error: fmt.Errorf("got %d code for %s", res.StatusCode, url)} - return - } - - r := bufio.NewReader(res.Body) - - for { - line, err := r.ReadString('\n') - line = strings.TrimSuffix(line, "\n") - - if errors.Is(err, io.EOF) { - out <- rill.Try[string]{Value: line} - return - } - if err != nil { - out <- rill.Try[string]{Error: err} - return - } - - out <- rill.Try[string]{Value: line} - } - }() - - return out -} - -// dbMultiGet emulates a batch read from a key-value database. It returns the values for the given keys. -func dbMultiGet(ctx context.Context, keys ...string) ([]string, error) { - if err := ctx.Err(); err != nil { - return nil, err - } - - // Emulate a network delay - randomSleep(1000 * time.Millisecond) - - values := make([]string, len(keys)) - for i, key := range keys { - // Emulate that some keys are missing - if strings.HasSuffix(key, "0") || strings.HasSuffix(key, "5") { - values[i] = "" - continue - } - - values[i] = "val" + strings.TrimPrefix(key, "id") - } - - return values, nil -} - -func randomSleep(max time.Duration) { - time.Sleep(time.Duration(rand.Intn(int(max)))) -} diff --git a/examples/weather/main.go b/examples/weather/main.go deleted file mode 100644 index fa3039e..0000000 --- a/examples/weather/main.go +++ /dev/null @@ -1,92 +0,0 @@ -package main - -import ( - "context" - "fmt" - "math" - "math/rand" - "time" - - "github.com/destel/rill" -) - -type Measurement struct { - Date time.Time - Temp float64 - Change float64 -} - -func main() { - endDate := time.Now() - startDate := endDate.AddDate(0, 0, -30) - - err := printTemperatureChanges(context.Background(), "New York", startDate, endDate) - if err != nil { - fmt.Println("Error:", err) - } -} - -// printTemperatureChanges orchestrates a pipeline that fetches temperature measurements for a given city and -// prints the daily temperature changes. Measurements are fetched concurrently, but the changes are calculated -// in order, using a single goroutine. -func printTemperatureChanges(ctx context.Context, city string, startDate, endDate time.Time) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() // In case of error, this ensures all pending operations are canceled - - // Make a channel that emits all the days between startDate and endDate - days := make(chan rill.Try[time.Time]) - go func() { - defer close(days) - for date := startDate; date.Before(endDate); date = date.AddDate(0, 0, 1) { - days <- rill.Wrap(date, nil) - } - }() - - // Download the temperature for each day in parallel and in order - measurements := rill.OrderedMap(days, 10, func(date time.Time) (Measurement, error) { - temp, err := getTemperature(ctx, city, date) - return Measurement{Date: date, Temp: temp}, err - }) - - // Calculate the temperature changes. Use a single goroutine - prev := Measurement{Temp: math.NaN()} - measurements = rill.OrderedMap(measurements, 1, func(m Measurement) (Measurement, error) { - m.Change = m.Temp - prev.Temp - prev = m - return m, nil - }) - - // Iterate over the measurements and print the results - err := rill.ForEach(measurements, 1, func(m Measurement) error { - fmt.Printf("%s: %.1f°C (change %+.1f°C)\n", m.Date.Format("2006-01-02"), m.Temp, m.Change) - prev = m - return nil - }) - - return err -} - -// getTemperature emulates a network request to fetch the temperature for a given city and date. -func getTemperature(ctx context.Context, city string, date time.Time) (float64, error) { - if err := ctx.Err(); err != nil { - return 0, err - } - - // Emulates a network delay - randomSleep(1000 * time.Millisecond) - - // Basic city hash, to make measurements unique for each city - var h float64 - for _, c := range city { - h += float64(c) - } - - // Emulates a temperature reading, by retuning a pseudo-random, but deterministic value - temp := 15 - 10*math.Sin(h+float64(date.Unix())) - - return temp, nil -} - -func randomSleep(max time.Duration) { - time.Sleep(time.Duration(rand.Intn(int(max)))) -}