Amazon Kinesis Data Firehose configurable queue supporting arbitrary adapters.
When you want to integrate with Amazon Kinesis Data Firehose, you will most likely want to batch the requests you do in order to not hit Amazon limits. Hence, you'd ideally have an abstraction that allows you to push data, automatically buffering it and pumping data to any given stream from time to time. This is what firefighter
does.
You can configure different options (e.g., :batch_size
, :interval
, :delimiter
, :flush_grace_period
) which should be tuned to your specific usage. Defaults are as follows:
:batch_size
:40
:interval
:2_000
(milliseconds, i.e., 2 seconds):delimiter
:""
(i.e., the empty string):flush_grace_period
:30_000
(milliseconds, i.e., 30 seconds)
The package can be installed by adding firefighter
to your list of dependencies in mix.exs
:
def deps do
[
{:firefighter, "~> 0.1.2"} # check most recent version in this project's mix.exs
]
end
You should configure firefighter
in, e.g., config/config.exs
and select your specific adapter.
# config/config.exs
config :firefighter, :adapter, Firefighter.Adapters.ExAws
Adapters provide implementations for the underlying libraries you may use to pump data to Firehose. By default, we provide a logger adapter that just logs data. We also provide an adapter for ex_aws
out of the box. It should be easy enough to expand on this to provide more adapters (e.g., a new adapter for aws-elixir
).
# config/config.exs
config :firefighter, :adapter, Firefighter.Adapters.Logger
# config/prod.exs
config :firefighter, :adapter, Firefighter.Adapters.ExAws
# lib/example/application.ex
defmodule Example.Application do
use Application
@impl true
def start(_type, _args) do
children = [
{Firefighter, [name: :my_firefighter, delivery_stream_name: "s3-firehose", batch_size: 10]}
]
opts = [strategy: :one_for_one, name: Example.Supervisor]
Supervisor.start_link(children, opts)
end
end
# lib/example.ex
defmodule Example do
def run do
pid = Process.whereis(:my_firefighter)
for i <- 0..30, do: Firefighter.push(pid, "sample-data-#{i}")
pid
end
end
As an alternative usage method, you might also decide to go with the Firefighter.Execution
abstraction, provided out of the box:
Execution.start(%{user_id: "user-1", post_id: "post-123"})
|> Execution.record(%{age: 29})
|> Execution.push(:my_firefighter)
You can also just hose it directly, without ceremony, if you're pumping a simple record that needs no composition:
Execution.hose(:my_firefighter, %{user_id: "user-1", post_id: "post-123", age: 29})
You can also use pids instead of the process name (:my_firefighter
in the example above).
For a detailed example project using firefighter
, check the example/
directory.
Copyright © 2020-present Daniel Serrano <danieljdserrano at protonmail>
This work is free. You can redistribute it and/or modify it under the
terms of the MIT License. See the LICENSE file for more details.
Made in Portugal 🇵🇹 by dnlserrano