Kolt is a library that gets the maximum offset for the given Kafka topics, and compares them with the offsets of any consumers on that Topic. This allows you to see how far 'behind' the producers your kafka consumers are.
Kolt uses the wonderful beam-telemetry project to output its metrics, allowing you to track or store them in whatever manner your please.
To install, simply add kolt to your mix.exs
file:
def deps do
[
# ...
{:kolt, "~> 0.1.0"},
# ...
]
end
Kolt uses Brod to communicate with kafka, and gets Kafka client information from Brod's config in your mix.exs
file. It should look something like this:
config :brod,
clients: [
cool_client: [
endpoints: {'localhost', 9092},
reconnect_cool_down_seconds: 10,
auto_start_producers: true,
default_producer_config: []
]
]
Note that Kolt won't be able to track offsets on Topics outside of the clients given.
To start Kolt, simply add the Kolt.Monitor
GenServer to your application's supervision tree, passing it the consumer groups whose offset lags you want to track, as well as the kafka client that those consumer groups are registered on:
defmodule MyApplication do
@moduledoc false
use Application
def start(_, _) do
import Supervisor.Spec
tree =
[
# ...
{Kolt.Monitor, [%Kolt.Monitor{consumer_group_ids: ["consumer_group_one", "consumer_group_two"], kafka_client: :cool_client, poll_delay: 2_000}]},
# ...
]
opts = [name: MyApplication.Supervisor, strategy: :one_for_one]
Supervisor.start_link(tree, opts)
end
end
Once this GenServer is in place, Kolt will check offset lags every 10 seconds (by default, you can set the poll_delay
field on the %Kolt.Monitor
struct to any number of milliseconds), and output them using BEAM Telemetry. This means that to consume the metrics Kolt produces, you'll need to attach a function to Kolt's telemetry stream.
Kolt outputs telemetry in the namespace [:kafka, :topic, :offset_lag]
, so you'll to call :telemetry.attach/4
on this namespace. I recommend doing this in your application's start method:
def start(_, _) do
import Supervisor.Spec
tree =
[
# ...
{OffsetLagMonitor, [["consumer_group_one", "consumer_group_two"], :cool_client]},
# ...
]
opts = [name: MyApplication.Supervisor, strategy: :one_for_one]
:telemetry.attach("offset-lag-metrics", [:kafka, :topic, :offset_lag], &TelemetryHandler.handle_event/4, nil)
Supervisor.start_link(tree, opts)
end
Initial development was sponsored by Sailthru. Thanks Sailthru!