A Clojure Apache Kafka client with a core.async API.

`[com.appsflyer/ketu "0.6.0"]`
- Channels API: Take Kafka data from a channel and send data to Kafka through a channel.
- Consumer Source: Polls records from Kafka and puts them on a channel.
- Producer Sink: Takes records from a channel and sends them to Kafka.
- Shapes: Transform the Java client's original objects to Clojure data and back.
- Simple Configuration: Friendly, validated configuration.
Consume a name string from Kafka and produce a greeting string for that name back into Kafka, all through channels:
```clojure
(ns example
  (:require [clojure.core.async :refer [chan close! <!! >!!]]
            [ketu.async.source :as source]
            [ketu.async.sink :as sink]))

(let [<names (chan 10)
      source-opts {:name "greeter-consumer"
                   :brokers "broker1:9092"
                   :topic "names"
                   :group-id "greeter"
                   :value-type :string
                   :shape :value}
      source (source/source <names source-opts)

      >greets (chan 10)
      sink-opts {:name "greeter-producer"
                 :brokers "broker2:9091"
                 :topic "greetings"
                 :value-type :string
                 :shape :value}
      sink (sink/sink >greets sink-opts)]

  ;; Consume a name and produce a greeting. You could also do this with e.g. clojure.core.async/pipeline.
  (->> (<!! <names)
       (str "Hi, ")
       (>!! >greets))

  ;; Close the source. It automatically closes the source channel `<names`.
  (source/stop! source)

  ;; Close the sink channel `>greets`. It causes the sink to close itself as a consequence.
  (close! >greets))
```
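The comment in the example mentions `clojure.core.async/pipeline` as an alternative to manual takes and puts. A minimal sketch of that variant, using plain channels as stand-ins for the Ketu source and sink channels so it runs without a broker:

```clojure
(require '[clojure.core.async :as async :refer [chan >!! <!!]])

;; Stand-ins for the Ketu source/sink channels, so this sketch runs without Kafka.
(def <names (chan 10))
(def >greets (chan 10))

;; `pipeline` applies the transducer on 1 worker thread, moving items from
;; <names to >greets, and closes >greets when <names closes.
(async/pipeline 1 >greets (map #(str "Hi, " %)) <names)

(>!! <names "Jane")
(<!! >greets)
;=> "Hi, Jane"
```

Because `pipeline` closes the `to` channel when the `from` channel closes, stopping the source (which closes `<names`) shuts the whole flow down in order.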
Anything that is not documented is not supported and might change.

Note: `int` is used for brevity but can also mean `long`. Don't worry about it.
Common options:

Key | Type | Req? | Notes |
---|---|---|---|
`:brokers` | string | required | Comma-separated `host:port` values, e.g. `"broker1:9092,broker2:9092"` |
`:topic` | string | required | |
`:name` | string | required | Simple human-readable identifier, used in logs and thread names |
`:key-type` | `:string`, `:byte-array` | optional | Default `:byte-array`; used in configuring the key serializer/deserializer |
`:value-type` | `:string`, `:byte-array` | optional | Default `:byte-array`; used in configuring the value serializer/deserializer |
`:internal-config` | map | optional | A map of the underlying Java client properties, for any extra lower-level config |
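As a sketch of `:internal-config`, the keys are standard Kafka client property names passed straight through to the underlying Java client; the particular properties and values below are illustrative, not recommendations:

```clojure
;; A consumer config sketch using :internal-config for lower-level tuning.
{:name "greeter-consumer"
 :brokers "broker1:9092"
 :topic "names"
 :group-id "greeter"
 :value-type :string
 :shape :value
 ;; Plain Kafka consumer properties, forwarded to the Java client as-is.
 :internal-config {"max.poll.records" "100"
                   "auto.offset.reset" "earliest"}}
```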
Consumer-only options:

Key | Type | Req? | Notes |
---|---|---|---|
`:group-id` | string | required | |
`:shape` | `:value`, `[:vector <fields>]`, `[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, the channel will contain `ConsumerRecord` objects. See the shape examples below |
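As a sketch of the function shape (assuming the kafka-clients `ConsumerRecord` class is on the classpath), an arity-1 function receives each record and whatever it returns is what lands on the channel. The function and the fields it picks here are illustrative:

```clojure
(ns example.consumer-shape
  (:import (org.apache.kafka.clients.consumer ConsumerRecord)))

;; An arity-1 function of ConsumerRecord; its return value is put on the
;; channel. Here we keep the value plus its partition/offset coordinates.
(defn record->data [^ConsumerRecord record]
  {:value     (.value record)
   :partition (.partition record)
   :offset    (.offset record)})

;; Used in the consumer opts:
;; {:shape record->data, ...}
```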
Producer-only options:

Key | Type | Req? | Notes |
---|---|---|---|
`:shape` | `:value`, `[:vector <fields>]`, `[:map <fields>]`, or an arity-1 function of the input returning `ProducerRecord` | optional | If unspecified, you must put `ProducerRecord` objects on the channel. See the shape examples below |
`:compression-type` | `"none"`, `"gzip"`, `"snappy"`, `"lz4"`, `"zstd"` | optional | Default `"none"`; values are the same as `"compression.type"` of the Java producer |
`:workers` | int | optional | Default `1`; number of threads that take from the channel and invoke the internal producer |
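Putting the producer-only options together, a sink config sketch might look like this (the particular values are illustrative):

```clojure
;; A producer (sink) config sketch using the producer-only options.
{:name "greeter-producer"
 :brokers "broker1:9092"
 :topic "greetings"
 :value-type :string
 :shape :value
 :compression-type "gzip" ; same values as the Java producer's "compression.type"
 :workers 2}              ; two threads taking from the channel
```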
You don't have to deal with `ConsumerRecord` or `ProducerRecord` objects.
To get a Clojure data structure with any of the `ConsumerRecord` fields, configure the consumer shape:
```clojure
;; Value only:
{:topic "names"
 :key-type :string
 :value-type :string
 :shape :value}
(<!! consumer-chan)
;=> "v"

;; Vector:
{:shape [:vector :key :value :topic]}
(<!! consumer-chan)
;=> ["k" "v" "names"]

;; Map:
{:shape [:map :key :value :topic]}
(<!! consumer-chan)
;=> {:key "k", :value "v", :topic "names"}
```
Similarly, to put a Clojure data structure on the producer channel:
```clojure
;; Value only:
{:key-type :string
 :value-type :string
 :shape :value}
(>!! producer-chan "v")

;; Vector:
{:shape [:vector :key :value]}
(>!! producer-chan ["k" "v"])

;; Vector with topic in each message:
{:shape [:vector :key :value :topic]}
(>!! producer-chan ["k1" "v1" "names"])
(>!! producer-chan ["k2" "v2" "events"])
```
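The producer side also accepts an arity-1 function shape that must return a `ProducerRecord`. A sketch, assuming the kafka-clients `ProducerRecord` class is on the classpath and that maps are put on the channel (the input format is this example's choice, not something Ketu prescribes):

```clojure
(ns example.producer-shape
  (:import (org.apache.kafka.clients.producer ProducerRecord)))

;; An arity-1 function shape: receives whatever is put on the channel and
;; must return a ProducerRecord. Here the input is assumed to be a map.
(defn data->record [{:keys [topic k v]}]
  (ProducerRecord. ^String topic k v))

;; Used in the sink opts:
;; {:shape data->record, ...}
;; (>!! producer-chan {:topic "names" :k "k" :v "v"})
```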
We welcome feedback and would love to hear about use cases other than ours. You can open issues, send pull requests, or contact us on the Clojurians Slack.