Tasks are processes that handle a bunch of the nitty-gritty details for you. They're the natural evolution from processes for many use cases.
Let's start by exploring the `async` + `await` use case.
`Task.async/1` spawns a new task, similarly to `spawn/1`. Instead of sending messages back and forth, you can use `Task.await/1` to collect the value returned by the spawned task.
```elixir
task =
  Task.async(fn ->
    Process.sleep(1000)
    IO.puts("Expensive computation is done!")
    Enum.random(1..100)
  end)

IO.puts("Running task...")
Task.await(task)
```
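The same pattern scales to several computations at once. Here is a sketch that assumes Elixir 1.11 or later, where `Task.await_many/2` is available. It starts a few tasks up front and collects all their results in one call, so the total wait is roughly that of the slowest task rather than the sum of all of them. The sleep durations are arbitrary.

```elixir
# Start three independent computations; each runs in its own process.
tasks =
  for ms <- [300, 100, 200] do
    Task.async(fn ->
      Process.sleep(ms)
      ms * 10
    end)
  end

# Wait for all of them (the default 5000 ms timeout covers the whole batch).
# Results come back in the same order as the list of tasks.
results = Task.await_many(tasks)
IO.inspect(results)
```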
`Task.yield/2` is similar to `Task.await/1`, but it returns `nil` if the task doesn't return a value within the specified timeout (`Task.await/1` exits instead).
```elixir
task =
  Task.async(fn ->
    Process.sleep(1000)
    IO.puts("Expensive computation is done!")
    Enum.random(1..100)
  end)

Task.yield(task, _timeout = 5000)
```
As you can see, if the task returns in time, `Task.yield/2` returns `{:ok, result}`. Now let's see what happens if the task doesn't return in time:
```elixir
task =
  Task.async(fn ->
    Process.sleep(1000)
    IO.puts("Expensive computation is done!")
    Enum.random(1..100)
  end)

Task.yield(task, _timeout = 500)
```
`Task.yield/2` returns `nil`, but a moment later the task still prints its message. That's because `Task.yield/2` only "peeks" at whether the task has finished. It doesn't shut the task down if it hasn't. To stop the task, we can use `Task.shutdown/1`.
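Here is a small sketch of that. `Task.yield/2` gives up after 100 ms (an arbitrary value), and `Task.shutdown/1` then stops the task, so the "done" message is never printed. Checking `Process.alive?/1` on the task's `pid` field confirms that the process is gone.

```elixir
task =
  Task.async(fn ->
    Process.sleep(1000)
    IO.puts("Expensive computation is done!")
  end)

# The task needs ~1000 ms, so peeking after 100 ms returns nil...
nil = Task.yield(task, 100)

# ...and the task is still running in the background.
true = Process.alive?(task.pid)

# Task.shutdown/1 stops it. It returns nil here because no reply
# arrived before the task was shut down.
nil = Task.shutdown(task)

IO.inspect(Process.alive?(task.pid), label: "alive after shutdown")
```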
`Task.yield/2` and `Task.shutdown/1` are often combined when you need a computation to be bounded in time. It goes something like this:
- Start the computation.
- Do some other work on the side.
- When you're ready, check the result of the task with `Task.yield/2`.
- If the task does not complete within the timeout, shut down the task.
`Task.shutdown/1` also takes care of a race condition: the task might complete right as we're telling it to shut down. In that case, `Task.shutdown/1` returns `{:ok, result}` instead of `nil`, so the result isn't lost.
```elixir
task =
  Task.async(fn ->
    Process.sleep(1000)
    IO.puts("Expensive computation is done!")
    Enum.random(1..100)
  end)

IO.puts("Running task...")
# Wait up to 500 ms; if there's no reply by then, shut the task down.
Task.yield(task, 500) || Task.shutdown(task)
```
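The same yield-then-shutdown idea extends to a batch of tasks. The sketch below follows the pattern described in the `Task.yield_many/2` documentation: wait up to a shared timeout for all tasks, then shut down whichever ones haven't replied. The sleep durations are arbitrary, chosen so that one task misses the 500 ms deadline.

```elixir
tasks =
  for ms <- [100, 200, 1000] do
    Task.async(fn ->
      Process.sleep(ms)
      ms
    end)
  end

# Task.yield_many/2 waits up to 500 ms in total and returns a {task, result}
# pair per task (in input order); result is {:ok, value}, {:exit, reason},
# or nil if the task is still running.
results =
  tasks
  |> Task.yield_many(500)
  |> Enum.map(fn {task, result} ->
    # Shut down the tasks that did not reply in time.
    case result || Task.shutdown(task, :brutal_kill) do
      {:ok, value} -> value
      {:exit, reason} -> {:error, reason}
      nil -> :timeout
    end
  end)

IO.inspect(results)
```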
`Task` provides the most underrated function (IMO) in all of Elixir's standard library: `Task.async_stream/3`. It takes an enumerable and a function, and returns a stream that maps the function over the enumerable in parallel, wrapping each result in an `{:ok, result}` tuple.
```elixir
stream =
  Task.async_stream([200, 100, 400], fn timeout ->
    Process.sleep(timeout)
    IO.puts("Slept for #{timeout} ms")
    timeout * 2
  end)

Enum.to_list(stream)
```
Seems like nothing special, right? Well, it is!
`async_stream`'s coolest feature is that it uses a bounded number of processes. You can control this number through the `:max_concurrency` option, which defaults to `System.schedulers_online/0` (usually the number of cores on your machine). This feature is huge: our previous naive parallel-map implementation would spawn one process per element in the enumerable, regardless of how many elements there were. Billions of processes? Not good. `async_stream` will happily churn through infinite streams, running at most `:max_concurrency` processes at a time.
`async_stream` is also flexible. It accepts any enumerable as its input (including infinite streams) and itself returns an enumerable, so you can keep piping its results into other `Enum` or `Stream` functions.
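Here is a sketch of both properties together. An infinite stream (built with `Stream.iterate/2`) is mapped with at most two tasks in flight; the `max_concurrency: 2` value is arbitrary. Because the result is itself a lazy stream, `Enum.take/2` stops once it has the results it needs. Each element comes back as an `{:ok, value}` tuple, so it is unwrapped with `Stream.map/2`.

```elixir
squares =
  Stream.iterate(1, &(&1 + 1))
  # At most 2 tasks run at any given time, even though the input never ends.
  |> Task.async_stream(fn n -> n * n end, max_concurrency: 2)
  # Each result is {:ok, value} on success; unwrap it lazily.
  |> Stream.map(fn {:ok, square} -> square end)
  # The pipeline is lazy until here; take/2 stops after five results.
  |> Enum.take(5)

IO.inspect(squares)
```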
In short, tasks are a good fit:

- If you want to perform a few requests to different services and then collect the results
- If you need a simple parallel mapping approach
- If you need to perform a computation in a limited timeframe and want to stop it if it times out
- If you want to spawn a computation in the background (`Task.start/1`), for example for side effects
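For the background case, `Task.start/1` starts a task that nobody awaits. Unlike `Task.async/1`, the started task is not linked to the caller, so a crash in it won't bring the caller down. In this minimal sketch the task sends a message back only so we can observe that it ran. In real code the function would typically just perform its side effect, such as writing a log entry.

```elixir
parent = self()

# Task.start/1 returns {:ok, pid}; there is no task struct to await.
{:ok, pid} =
  Task.start(fn ->
    # Stand-in for a real side effect; here we just notify the parent.
    send(parent, {:side_effect_done, self()})
  end)

outcome =
  receive do
    {:side_effect_done, ^pid} -> :done
  after
    1000 -> :still_running
  end

IO.inspect(outcome)
```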
If you're using `Task.async_stream/3` and don't care about the ordering of results, use the `ordered: false` option.
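This is great when you're using `Task.async_stream/3` to parallelize side effects over a collection, for example. It's also useful when you're going to do something with the mapped collection that doesn't require ordering, like aggregating it into a map. When results are ordered, `async_stream` may have to buffer results that finish early until all the earlier elements are done. `ordered: false` removes that buffering and emits each result as soon as it's ready.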
```elixir
print_after_timeout = fn timeout ->
  Process.sleep(timeout)
  IO.puts("Slept for #{timeout} ms")
  timeout
end

[200, 100, 400]
|> Task.async_stream(print_after_timeout, ordered: false)
|> Enum.to_list()
```
As you can see, the results are returned in the order in which they finish computing, and not in the order of the original list.
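For the aggregation case mentioned above, here is a sketch that computes word lengths in parallel and collects them into a map. A map has no meaningful order, so `ordered: false` costs nothing here. The word list is arbitrary.

```elixir
lengths =
  ["apple", "kiwi", "banana"]
  |> Task.async_stream(fn word -> {word, String.length(word)} end, ordered: false)
  # Results arrive as {:ok, {word, length}} in completion order;
  # Map.new/2 doesn't care about that order.
  |> Map.new(fn {:ok, {word, length}} -> {word, length} end)

IO.inspect(lengths)
```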
The documentation for `Task.yield/2` has a great code snippet for when you need to perform a time-capped computation:
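```elixir
task =
  Task.async(fn ->
    # Sleeps for 499, 500, or 501 ms, so the task may or may not
    # finish within the 500 ms timeout below.
    Process.sleep(Enum.random(499..501))
    IO.puts("Done!")
  end)

case Task.yield(task, 500) || Task.shutdown(task) do
  {:ok, result} -> result
  nil -> :timeout
end
```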
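`Task.async_stream/3` offers a per-element version of the same idea through its `:timeout` option, which defaults to 5000 ms. By default, an element that times out makes the caller exit. Passing `on_timeout: :kill_task` instead kills just that task and emits `{:exit, :timeout}` in its place. In this sketch the durations and the 200 ms limit are arbitrary.

```elixir
results =
  [10, 1000, 50]
  |> Task.async_stream(
    fn ms ->
      Process.sleep(ms)
      ms
    end,
    # Each task gets at most 200 ms...
    timeout: 200,
    # ...and a task that overruns is killed instead of crashing the caller.
    on_timeout: :kill_task
  )
  |> Enum.to_list()

IO.inspect(results)
```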
Before talking about GenStage, Broadway, and Flow, I want to stress the importance of `async_stream`. I've seen many solutions built on GenStage or Flow that were essentially overengineered `async_stream`s. `async_stream` has some limitations, but the combination of a bounded number of processes, optional ordering, and lazy stream processing makes it a great choice in many situations.