Skip to content

kafka.zig is a simple-to-use Kafka Zig client built on top of C/C++ librdkafka.

Notifications You must be signed in to change notification settings

vspaz/kafka.zig

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

88 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

kafka.zig

kafka.zig is the Apache Kafka Zig client library built on top of C/C++ librdkafka. This lib makes it plain simple to write your own Apache Kafka producers and consumers in Zig. kafka.zig is also very lean and efficient.

Installation

Important

kafka.zig relies on C/C++ librdkafka, so we need to install it first before we start using kafka.zig. The good news is that's the only dependency that's required by kafka.zig.

  1. install librdkafka

🐧 Linux - .deb-based, e.g., Debian, Ubuntu etc.

apt-get install librdkafka-dev

🐧 Linux - .rpm-based, e.g., RedHat, Fedora and other RHEL derivatives.

yum install librdkafka-devel

Mac OSX

brew install librdkafka
  1. Run the following command inside your project:
 zig fetch --save git+https://github.com/vspaz/kafka.zig.git#main

it should add the following dependency to your project build.zig.zon file, e.g.

.dependencies = .{
    .@"kafka.zig" = .{
        .url = "git+https://github.com/vspaz/kafka.zig.git?ref=main#c3d2d726ebce1daea58c12e077a90e7afdc60f88",
        .hash = "1220367e8cb4867b60e2bfa3e2f3663bc0669a4b6899650abcc82fc9b8763fd64050",
    },
},
  1. Navigate to build.zig file located in the root directory and add the following 3 lines as shown below:
 const exe = b.addExecutable(.{
        .name = "yourproject",
        .root_source_file = b.path("src/main.zig"),
        .target = target,
        .optimize = optimize,
    });
    
    // add these 3 lines! 
    const kafkazig = b.dependency("kafka.zig", .{});
    exe.root_module.addImport("kafka.zig", kafkazig.module("kafka.zig"));
    exe.linkSystemLibrary("rdkafka");
  1. Test the project build with zig build There should be no error!
  2. Import kafka.zig in your code as:
const kafka = @import("kafka.zig");

and you're good to go! 🚀

Configuration

Producer/Consumer

Configuration parameters can be set via kafka.ConfigBuilder An example of using kafka.ConfigBuilder

const kafka = @import("kafka.zig");

pub fn main() !void {
    var producer_config_builder = kafka.ConfigBuilder.get();
    const producer_conf = producer_config_builder
        .with("bootstrap.servers", "localhost:9092")
        .with("enable.idempotence", "true")
        .with("batch.num.messages", "10")
        .with("reconnect.backoff.ms", "1000")
        .with("reconnect.backoff.max.ms", "5000")
        .with("transaction.timeout.ms", "10000")
        .with("linger.ms", "100")
        .with("delivery.timeout.ms", "1800000")
        .with("compression.codec", "snappy")
        .with("batch.size", "16384")    
        .build();
}

Topic

A topic is configured similarly to Producer/Consumer, but using kafka.TopicBuilder class.

An example of configuring a topic via kafka.TopicBuilder.

pub fn main() !void {
    var topic_config_builder = kafka.TopicBuilder.get();
    const topic_conf = topic_config_builder
        .with("acks", "all")
        .build();
}

Producers

A simple Zig Apache Kafka producer sending plain text data.

const kafka = @import("kafka.zig");

fn plainTextProducer() void {
    var producer_config_builder = kafka.ConfigBuilder.get();
    const producer_conf = producer_config_builder
        .with("bootstrap.servers", "localhost:9092")
        .with("enable.idempotence", "true")
        .with("batch.num.messages", "10")
        .with("reconnect.backoff.ms", "1000")
        .with("reconnect.backoff.max.ms", "5000")
        .with("transaction.timeout.ms", "10000")
        .with("linger.ms", "100")
        .with("delivery.timeout.ms", "1800000")
        .with("compression.codec", "snappy")
        .with("batch.size", "16384")    
        .build();

    var topic_config_builder = kafka.TopicBuilder.get();
    const topic_conf = topic_config_builder
        .with("acks", "all")
        .build();

    const kafka_producer = kafka.Producer.init(producer_conf, topic_conf, "topic-name1");
    defer kafka_producer.deinit();

    kafka_producer.send("some payload", "key");
    kafka_producer.wait(100);
}

pub fn main() !void {
    plainTextProducer();
}

An example of a Kafka Zig producer, producing JSON or binary data.

const std = @import("std");
const kafka = @import("kafka.zig");

fn jsonProducer() !void {
    var producer_config_builder = kafka.ConfigBuilder.get();
    const producer_conf = producer_config_builder
        .with("bootstrap.servers", "localhost:9092")
        .with("enable.idempotence", "true")
        .with("batch.num.messages", "10")
        .with("reconnect.backoff.ms", "1000")
        .with("reconnect.backoff.max.ms", "5000")
        .with("transaction.timeout.ms", "10000")
        .with("linger.ms", "100")
        .with("delivery.timeout.ms", "1800000")
        .with("compression.codec", "snappy")
        .with("batch.size", "16384")
        .build();

    var topic_config_builder = kafka.TopicBuilder.get();
    const topic_conf = topic_config_builder
        .with("acks", "all")
        .build();

    const kafka_producer = kafka.Producer.init(producer_conf, topic_conf, "topic-name2");
    defer kafka_producer.deinit();

    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();
    defer _ = gpa.deinit();

    for (0..100) |_| {
        const message = .{ .key1 = 100, .key2 = "kafka" };
        const encoded_message = try std.json.stringifyAlloc(allocator, message, .{});
        defer allocator.free(encoded_message);

        kafka_producer.send(encoded_message, "key");
        kafka_producer.wait(100);
    }
}

pub fn main() !void {
    try jsonProducer();
}

Callbacks

If you wish to set a producer callback, you can do it with kafka.setCb or kafka.setErrCb as follows:

const kafka = @import("kafka.zig");

fn onMessageSent(message: kafka.Message) void {
    std.log.info("Message sent: {s}", .{message.getPayload()});
}

fn onError(err: i32, reason: [*c]const u8) void {
    std.log.err("error code: {d}; error message: {s}.", .{err, reason});
}

fn producer() void {
    var producer_config_builder = kafka.ConfigBuilder.get();
    const producer_conf = producer_config_builder
        .with("bootstrap.servers", "localhost:9092")
        .build();
    
    kafka.setCb(producer_conf, onMessageSent);
    kafka.setErrCb(producer_conf, onError);
}

Note

The callback is a part of producer configuration and should be set before producer is initialized!

Consumers

An example of a Zig Kafka consumer, consuming JSON or binary data.

const std = @import("std");
const kafka = @import("kafka.zig");

const Data = struct { key1: u32, key2: []u8 };

fn jsonConsumer() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();
    defer _ = gpa.deinit();

    var consumer_config_builder = kafka.ConfigBuilder.get();
    const consumer_conf = consumer_config_builder
        .with("debug", "all")
        .with("bootstrap.servers", "localhost:9092")
        .with("group.id", "consumer1")
        .with("auto.offset.reset", "latest")
        .with("enable.auto.commit", "false")
        .with("isolation.level", "read_committed")
        .with("rreconnect.backoff.ms", "100")
        .with("reconnect.backoff.max.ms", "1000")
        .build();
    var kafka_consumer = kafka.Consumer.init(consumer_conf);
    defer kafka_consumer.deinit();

    const topics = [_][]const u8{"topic-name2"};
    kafka_consumer.subscribe(&topics);

    while (true) {
        const message_or_null: ?kafka.Message = kafka_consumer.poll(1000);
        if (message_or_null) |message| {
            std.log.info("offset: {d}", .{message.getOffset()});
            std.log.info("partition: {d}", .{message.getPartition()});
            std.log.info("message length: {d}", .{message.getPayloadLen()});
            std.log.info("key: {s}", .{message.getKey()});
            std.log.info("key length: {d}", .{message.getKeyLen()});
            std.log.info("error code: {d}", .{message.getErrCode()});
            std.log.info("timestamp: {d}", .{message.getTimestamp()});
            const payload: []const u8 = message.getPayload();
            std.log.info("Received message: {s}", .{payload});
            const parsed_payload = try std.json.parseFromSlice(Data, allocator, payload, .{});
            defer parsed_payload.deinit();
            std.log.info("parsed value: {s}", .{parsed_payload.value.key2});
            kafka_consumer.commitOffsetOnEvery(10, message); // or kafka_consumer.commitOffset(message) to commit on every message.
        }
    }
    // when the work is done call 'unsubscribe' & 'close'
    kafka_consumer.unsubscribe();
    kafka_consumer.close();
}

pub fn main() !void {
    const producer_worker = try std.Thread.spawn(.{}, jsonProducer, .{});
    const consumer_worker = try std.Thread.spawn(.{}, jsonConsumer, .{});
    producer_worker.join();
    consumer_worker.join();
}

AdminApiClient

Topics

List topics

const std = @import("std");
const kafka = @import("kafka.zig");

pub fn listTopics() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();
    defer _ = gpa.deinit();

    var config_builder = kafka.ConfigBuilder.get();
    const conf = config_builder
        .with("bootstrap.servers", "localhost:9092")
        .build();

    const api_client = kafka.AdminApiClient.init(conf);
    defer api_client.deinit();

    var meta = try api_client.getMetadata(allocator);
    defer meta.deinit();
    const topics = meta.listTopics();
    std.log.info("topics count {d}", .{topics.len});
    std.log.info("topic name {s}", .{topics[0].name});
    std.log.info("topic partition count {d}", .{topics[0].partitions.len});
    std.log.info("topic partition id {d}", .{topics[0].partitions[0].id});
}

Describe a topic

const std = @import("std");
const kafka = @import("kafka.zig");

pub fn describeTopic() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();
    defer _ = gpa.deinit();

    var config_builder = kafka.ConfigBuilder.get();
    const conf = config_builder
        .with("bootstrap.servers", "localhost:9092")
        .build();

    const api_client = kafka.AdminApiClient.init(conf);
    defer api_client.deinit();

    var meta = try api_client.getMetadata(allocator);
    defer meta.deinit();
    const topic_or_null = meta.describeTopic("topic-name2");
    if (topic_or_null) |topic| {
        std.log.info("topic name {s}", .{topic.name});
        std.log.info("topic partition count {d}", .{topic.partitions.len});
        std.log.info("topic partition id {d}", .{topic.partitions[0].id});
    }
}

Brokers

List brokers

const std = @import("std");
const kafka = @import("kafka.zig");

pub fn listBrokers() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();
    defer _ = gpa.deinit();

    var config_builder = kafka.ConfigBuilder.get();
    const conf = config_builder
        .with("bootstrap.servers", "localhost:9092")
        .build();

    const api_client = kafka.AdminApiClient.init(conf);
    defer api_client.deinit();

    var meta = try api_client.getMetadata(allocator);
    defer meta.deinit();

    const brokers = meta.listBrokers();
    std.log.info("brokers count: {d}", .{brokers.len});
    std.log.info("host: {s}", .{brokers[0].host});
    std.log.info("port: {d}", .{brokers[0].port});
    std.log.info("brokers id: {d}", .{brokers[0].id});
}

Describe a broker

const std = @import("std");
const kafka = @import("kafka.zig");

pub fn describeBroker() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();
    defer _ = gpa.deinit();

    var config_builder = kafka.ConfigBuilder.get();
    const conf = config_builder
        .with("bootstrap.servers", "localhost:9092")
        .build();

    const api_client = kafka.AdminApiClient.init(conf);
    defer api_client.deinit();

    var meta = try api_client.getMetadata(allocator);
    defer meta.deinit();

    const host = "localhost";
    const broker_or_null = meta.describeBroker(host);
    if (broker_or_null) |broker| {
        std.log.info("host: {s}", .{broker.host});
        std.log.info("port: {d}", .{broker.port});
        std.log.info("brokers id: {d}", .{broker.id});
    } else {
        std.log.err("broker {s} not found", .{host});
    }
}

Examples

Please, refer to the examples section.

Useful references