forked from KxSystems/kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kfk.q
102 lines (86 loc) · 3.14 KB
/
kfk.q
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
\d .kfk
LIBPATH:`:libkfk 2:
funcs:(
// .kfk.Client[client_type:c;conf:S!S]:i
(`kfkClient;2);
// .kfk.ClientDel[client_id:i]:_
(`kfkClientDel;1);
// .kfk.ClientName[client_id:i]:s
(`kfkClientName;1);
// .kfk.ClientMemberId[client_id:i]:s
(`kfkClientMemberId;1);
// .kfk.Topic[client_id:i;topicname:s;conf:S!S]:i
(`kfkTopic;3);
// .kfk.TopicDel[topic_id:i]:_
(`kfkTopicDel;1);
// .kfk.TopicName[topic_id:i]:s
(`kfkTopicName;1);
// .kfk.Metadata[client_id:i]:S!()
(`kfkMetadata;1);
// PRODUCER API
// .kfk.Pub[topic_id:i;partid:i;data;key]:_
(`kfkPub;4);
// .kfk.OutQLen[client_id:i]:i
(`kfkOutQLen;1);
// CONSUMER API
// .kfk.Sub[client_id:i;topicname:s;partition_list|partition_offsets:I!J]:()
(`kfkSub;3);
// .kfk.Unsub[client_id:i]:()
(`kfkUnsub;1);
// .kfk.Subscription[client_id:i]
(`kfkSubscription;1);
// .kfk.Poll[client_id:i;timeout;max_messages]
(`kfkPoll;3);
// .kfk.Version[]:i
(`kfkVersion;1);
// .kfk.ExportErr[]:T
(`kfkExportErr;1);
// .kfk.CommitOffsets[client_id;topic:s;partition_offsets:I!J;async:b]:()
(`kfkCommitOffsets;4);
// .kfk.PositionOffsets[client_id:i;topic:s;partition_offsets:I!J]:partition_offsets
(`kfkPositionOffsets;3);
// .kfk.CommittedOffsets[client_id:i;topic:s;partition_offsets:I!J]:partition_offsets
(`kfkCommittedOffsets;3);
// .kfk.AssignOffsets[client_id:i;topic:s;partition_offsets:I!J]:()
(`kfkAssignOffsets;3)
);
// binding functions from dictionary funcs using rule
// kfk<Name> -> .kfk.<Name>
.kfk,:(`$3_'string funcs[;0])!LIBPATH@/:funcs
// Current version of librdkafka
Version:Version[];
// Table with all errors return by kafka with codes and description
Errors:ExportErr[];
// Unassigned partition.
// The unassigned partition is used by the producer API for messages
// that should be partitioned using the configured or default partitioner.
PARTITION_UA:-1i
// taken from librdkafka.h
OFFSET.BEGINNING: -2 /**< Start consuming from beginning of kafka partition queue: oldest msg */
OFFSET.END: -1 /**< Start consuming from end of kafka partition queue: next msg */
OFFSET.STORED: -1000 /**< Start consuming from offset retrieved from offset store */
OFFSET.INVALID: -1001 /**< Invalid offset */
// Producer client code
PRODUCER:"p"
Producer:Client[PRODUCER;]
// Consumer client code
CONSUMER:"c"
Consumer:Client[CONSUMER;]
// table with kafka statistics
stats:()
// CALLBACKS - should not be deleted or renamed and be present in .kfk namespace
// https://docs.confluent.io/current/clients/librdkafka/rdkafka_8h.html
// statistics provided by kafka about current state (rd_kafka_conf_set_stats_cb)
statcb:{[j]
s:.j.k j;if[all `ts`time in key s;s[`ts]:-10957D+`timestamp$s[`ts]*1000;s[`time]:-10957D+`timestamp$1000000000*s[`time]];
.kfk.stats,::enlist s;
delete from `.kfk.stats where i<count[.kfk.stats]-100;}
// logger callback(rd_kafka_conf_set_log_cb)
logcb:{[level;fac;buf] show -3!(level;fac;buf);}
// PRODUCER: delivery callback (rd_kafka_conf_set_dr_msg_cb )
drcb:{[cid;msg]}
// CONSUMER: offset commit callback(rd_kafka_conf_set_offset_commit_cb)
offsetcb:{[cid;err;offsets]}
// Main callback for consuming messages(including errors)
consumecb:{[msg]}
\d .