1
1
(ns robertluo.waterfall
2
2
" API namespace for the library"
3
3
(:require
4
- [robertluo.waterfall
5
- [core :as core]]
6
- [manifold.stream :as ms]))
4
+ [robertluo.waterfall
5
+ [core :as core]
6
+ [util :as util]]
7
+ [manifold.stream :as ms]
8
+ [robertluo.waterfall.shape :as shape]
9
+ [robertluo.waterfall.util :as util]))
7
10
8
11
(def schema
9
12
" Schema for waterfall"
80
83
(-> (ms/transform xform strm) (ms/connect sink))
81
84
strm))
82
85
83
- (comment
86
+ (defn kafka-cluster
87
+ " returns a convient life-cycle-map of a kafka cluster. It requires:
88
+ - `::nodes` kafka bootstrap servers
89
+ - `::shapes` shape of data, example: [(shape/topic (constantly \" sentence\" ))(shape/edn)(shape/value-only)]
90
+
91
+ If you just use it to publish message,
92
+ - optional `::producer-config` can specify additional kafka producer configuration.
93
+
94
+ If you want to consume from topics:
95
+ - `::topics` the topic you want to subscribe to. example: [\" sentence\" ]
96
+ - `::group-id` the group id for the message consumer
97
+ - optional `::source-xform` is a transducer to process message before consuming
98
+ - optional `::consumer-config` can specify additional kafka consumer configuration. With additions:
99
+ - `:position` either `:beginning` `:end`, none for commited position (default)
100
+ - `:poll-duration` for how long the consumer poll returns, is a Duration value, default 10 seconds
101
+
102
+ The returned map has different level of key-values let you use:
103
+ - Highest level, no additional knowledge:
104
+ - For consumer: `::consume` a function, a one-arity (each message) function as its arg, returns nil.
105
+ - For producer:
106
+ -`::put` a function with a message as its arg. e.g. ((::put return-map) {:a 3})
107
+ -`::put-all` a function with message sequence as its arg
108
+ - Mid level, if you need access of underlying manifold sink/source.
109
+ - `::source` a manifold source.
110
+ - `::sink` a manifold sink.
111
+ - Lowest level, if you want to access kafka directly:
112
+ - `::consumer` a Kafka consumer.
113
+ - `::producer` a Kafka message producer.
114
+ "
115
+ {:malli/schema
116
+ [:=> schema [:cat [:map {:closed true }
117
+ [::nodes ::nodes ]
118
+ [::shapes [:vector [:fn shape/shape?]]]
119
+ [::producer-config {:optional true } ::producer-config ]
120
+ [::group-id {:optional true } :string ]
121
+ [::topics {:optional true } [:vector :string ]]
122
+ [::source-xform {:optional true } fn?]]]
123
+ :map ]}
124
+ [kafka-conf-map]
125
+ (util/optional-require
126
+ [robertluo.fun-map :as fm :refer [fw fnk]]
127
+ (merge
128
+ (fm/life-cycle-map
129
+ {::producer (fnk [::nodes ::producer-config ]
130
+ (let [prod (producer nodes (or producer-config {}))]
131
+ (fm/closeable prod #(ms/close! prod))))
132
+ ::consumer (fnk [::nodes ::group-id ::topics ::consumer-config ]
133
+ (assert (and group-id topics) " Has to provide group-id and topics" )
134
+ (let [cmer (consumer nodes group-id topics (or consumer-config {}))]
135
+ (fm/closeable cmer #(ms/close! cmer))))
136
+ ::sink (fnk [::producer ::shapes ]
137
+ (-> producer ignore (xform-sink (comp (map (shape/serializer shapes))))))
138
+ ::source (fnk [::consumer ::shapes ::source-xform ]
139
+ (->> (comp (map (shape/deserializer shapes)) (or source-xform (map identity)))
140
+ (xform-source consumer)))
141
+ ::put! (fnk [::sink ] (partial ms/put! sink))
142
+ ::put-all (fnk [::sink ] (partial ms/put-all! sink))
143
+ ::consume (fnk [::source ] #(ms/consume % source))})
144
+ kafka-conf-map)
145
+ (throw (ClassNotFoundException. " Need io.github.robertluo/fun-map library in the classpath" ))))
146
+
147
+ (comment
84
148
(require '[malli.dev])
85
149
(malli.dev/start! )
150
+ (malli.dev/stop! )
151
+ (def clu
152
+ (kafka-cluster
153
+ {::nodes " localhost:9092"
154
+ ::shapes [(shape/topic (constantly " sentence" )) (shape/edn ) (shape/value-only )]
155
+ ::consumer-config {:position :beginning }
156
+ ::group-id " tester1"
157
+ ::topics [" sentence" ]
158
+ ::source-xform (map identity)}))
159
+ (def put! (::put! clu))
160
+ (put! " Hello, world" )
161
+ (def consume (::consume clu))
162
+ (consume println)
163
+ (.close clu)
86
164
)
0 commit comments