par is a Go library to process calls with the following characteristics:
- calls will be processed in parallel
- parallelism may be optionally bounded
- leverages Google's Context library to allow calls to be timed our or actively canceled
Let's start off with a Coin Toss example. Suppose we would like to flip a coin N times in parallel. Using par, we could write the following:
import (
"code.google.com/p/go.net/context"
"github.com/savaki/par"
"math/rand"
)
func flipCoin(results chan CoinFlip) par.RequestFunc {
return func(context.Context) error {
if rand.Intn(2) == 0 {
results <- Heads
} else {
results <- Tails
}
return nil
}
}
func simple() {
flips := 10
// 1. create a channel to hold your results
results := make(chan CoinFlip, flips)
// 2. create a channel of requests
requests := make(chan par.RequestFunc, flips)
for flip := 0; flip < flips; flip = flip + 1 {
requests <- flipCoin(results)
}
close(requests)
// 3. execute the flips in parallel
_ = par.Requests(requests).Do()
// 4. results channel now has your results and can be ranged over
}
Seems like a lot of work to execute 10 parallel flips. But let's take a more realistic scenario. Suppose we want to check the weather in three cities. Using openweathermap, we can query the weather in any one city. For our three cities, querying them one at a time is slow. par allows us to query these cities in parallel. As follows:
package main
import (
"code.google.com/p/go.net/context"
"github.com/savaki/par"
"github.com/savaki/openweathermap"
"log"
)
func ok(err error) {
if err != nil {
log.Fatalln(err)
}
}
func find(city string, responses chan *openweathermap.Forecast) par.RequestFunc {
return func(ctx context.Context) error {
forecast, err := openweathermap.WithContext(ctx).ByCityName(city)
ok(err)
responses <- forecast
return nil
}
}
func main() {
// create a channel to capture our results
forecasts := make(chan *openweathermap.Forecast, 3)
// create our channel of requests
requests := make(chan par.RequestFunc, 3)
requests <- find("San Francisco", forecasts)
requests <- find("Oakland", forecasts)
requests <- find("Berkeley", forecasts)
close(requests) // important to remember to close the channel
// execute the requests with a concurrency of 1
resolver := par.Requests(requests).WithConcurrency(1)
err := resolver.Do()
ok(err)
// the forecasts channel now contains all our forecasts
close(forecasts)
cities := map[string]*openweathermap.Forecast{}
for forecast := range forecasts {
cities[forecast.Name] = forecast
}
}
Here we're using a go client for openweathermap to get the weather by city. Since we want to be a well behaved netizen, we don't want to flood their service, so we'll execute 1 query at a time.
You can see from the results, that we wait until we receive a query before sending the next one.
22:00:07.800 349us 349us par - request: 0
22:00:08.393 593ms 593ms openweathermap - http - ok
22:00:08.393 593ms 593ms par - received - 0
22:00:08.393 3us 3us par - request: 1
22:00:08.684 290ms 290ms openweathermap - http - ok
22:00:08.684 290ms 290ms par - received - 1
22:00:08.684 3us 3us par - request: 2
22:00:08.869 184ms 184ms openweathermap - http - ok
22:00:08.869 184ms 184ms par - received - 2
22:00:08.869 5us 5us par - finished
As we've stated before, this can be slow for the end user. So now, we've talked to openweathermap and set up an account and now our concurrent limit has been upped to 3. Wonderful. Let's first look at what code change would be required. Here we add .WithConcurrency(3)
to indicate make at most three calls at a time.
resolver := par.Requests(requests).WithConcurrency(3)
err := resolver.Do()
ok(err)
Looking at our results we can see what calls 0, 1, 2 go through immediately since our limit is 3. Great.
22:00:58.584 155us 155us par - request: 0
22:00:58.584 40us 40us par - request: 1
22:00:58.584 8us 8us par - request: 2
22:00:58.781 196ms 196ms openweathermap - http - ok
22:00:58.781 196ms 196ms par - received - 0
22:00:58.786 5ms 5ms openweathermap - http - ok
22:00:58.786 5ms 5ms par - received - 1
22:00:58.787 1ms 1ms openweathermap - http - ok
22:00:58.787 1ms 1ms par - received - 2
22:00:58.787 4us 4us par - finished
Now let's suppose we can't afford 3 concurrent calls with this service provide and we need to drop down to two. We still want to provide a responsive service to our users, so let's set an upper bound on how long the call can take via .DoWithContext(ctx)
the excellent context library is courtesy of Google.
resolver := par.Requests(requests).WithConcurrency(2)
ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
defer cancel()
err := resolver.DoWithContext(ctx)
So now we see that we're only making two calls at a time and that we didn't finish our third call before the timeout happened.
22:06:14.061 381us 381us par - request: 0
22:06:14.061 422us 422us par - request: 1
22:06:14.237 176ms 176ms openweathermap - http - ok
22:06:14.237 175ms 175ms par - received - 0
22:06:14.237 2us 2us par - request: 2
22:06:14.248 11ms 11ms openweathermap - http - ok
22:06:14.249 11ms 11ms par - received - 1
22:06:14.312 63ms 63ms par - timeout
Sometimes, it's not about trying to fit within the bounds of an api agreement. Sometimes, we want to provide the best experience possible for our users. Every now and then, when you invoke a service, you get a slow response. Maybe it's one in ten times you get 500ms of latency. 90% of the time, it's fine. But 10% of the time, ugh. Now instead of a single call, if you made two redundant calls and took the fast responses, your 1 in 10 chance now drops to 1 in 100. Much better!
The more concurrent calls to other services you make, the more likely you'll want to make redundant calls to minimize the latency issues that will arise. Let's take a look at our example again. Here we'd like to use unbounded parallelism (make as many concurrent calls as you like) with a redundancy of 2 (make each city's weather request twice). We can do that via the .WithRedundancy(2)
call.
Here's the core element of code.
resolver := par.Requests(requests).WithRedundancy(2)
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
err := resolver.DoWithContext(ctx)
In the results, you can see we're making multiple calls for each of our cities, but in spite of that, we're still not able to finish in the 200ms response timeout window.
22:12:41.270 649us 649us par - request: 0
22:12:41.270 45us 45us par - request: 1
22:12:41.270 10us 10us par - request: 2
22:12:41.270 13us 13us par - request: 0
22:12:41.270 11us 11us par - request: 1
22:12:41.270 10us 9us par - request: 2
22:12:41.453 183ms 183ms openweathermap - http - ok
22:12:41.453 183ms 183ms par - received - 1
22:12:41.455 2ms 2ms openweathermap - http - ok
22:12:41.455 2ms 2ms par - received - 2
22:12:41.457 1ms 1ms openweathermap - http - ok
22:12:41.457 1ms 1ms par - received - 1
22:12:41.458 1ms 1ms openweathermap - http - ok
22:12:41.463 5ms 5ms openweathermap - http - ok
22:12:41.466 2ms 2ms openweathermap - http - ok
22:12:41.470 13ms 13ms par - timeout
To see the debug output for yourself, execute the following in your shell:
export DEBUG=\*