A fully asynchronous, futures-based Kafka client library for Rust based on librdkafka.
rust-rdkafka provides a safe Rust interface to librdkafka. The master branch is currently based on librdkafka 1.3.0.
The main features provided at the moment are:
- Support for all Kafka versions since 0.8.x. For more information about broker compatibility options, check the librdkafka documentation.
- Consume from single or multiple topics.
- Automatic consumer rebalancing.
- Customizable rebalance, with pre- and post-rebalance callbacks.
- Synchronous or asynchronous message production.
- Customizable offset commit.
- Access to cluster metadata (list of topic-partitions, replicas, active brokers, etc.).
- Access to group metadata (list groups, list members of groups, hostnames, etc.).
- Access to producer and consumer metrics, errors and callbacks.
rust-rdkafka is designed to be easy and safe to use thanks to the abstraction layer written in Rust, while at the same time being extremely fast thanks to the librdkafka C library.
Here are some benchmark results using the rust-rdkafka BaseProducer, sending data to a single Kafka 0.11 process running on localhost (default configuration, 3 partitions). Hardware: Dell laptop, with Intel Core i7-4712HQ @ 2.30GHz.
- Scenario: produce 5 million messages, 10 bytes each, wait for all of them to be acked
  - 1045413 messages/s, 9.970 MB/s (average over 5 runs)
- Scenario: produce 100000 messages, 10 KB each, wait for all of them to be acked
  - 24623 messages/s, 234.826 MB/s (average over 5 runs)
For more numbers, check out the kafka-benchmark project.
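The MB/s figures above can be cross-checked from the message rate and the message size. The sketch below is only a sanity check, not part of the benchmark. It assumes that 10 KB means 10000 bytes and that 1 MB means 1024 * 1024 bytes; neither convention is stated here, but both are consistent with the reported numbers. The function name `throughput_mb_per_s` is made up for this illustration.

```rust
/// Converts a message rate and a fixed message size into MB/s.
/// Assumption: 1 MB = 1024 * 1024 bytes.
fn throughput_mb_per_s(messages_per_s: f64, message_bytes: f64) -> f64 {
    messages_per_s * message_bytes / (1024.0 * 1024.0)
}

fn main() {
    // Scenario 1: 10-byte messages at 1045413 messages/s.
    let small = throughput_mb_per_s(1_045_413.0, 10.0);
    // Scenario 2: 10 KB messages (assumed to be 10000 bytes) at 24623 messages/s.
    let large = throughput_mb_per_s(24_623.0, 10_000.0);

    println!("10-byte messages: {:.3} MB/s", small);
    println!("10 KB messages:   {:.3} MB/s", large);
}
```

Under these assumptions the first scenario reproduces the reported 9.970 MB/s, and the second lands within a few thousandths of the reported 234.826 MB/s. The small gap is what one would expect if the published rate is itself a rounded average over 5 runs.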
rust-rdkafka provides low level and high level consumers and producers. Low level:
- BaseConsumer: simple wrapper around the librdkafka consumer. It must be periodically poll()ed in order to execute callbacks, rebalances and to receive messages.
- BaseProducer: simple wrapper around the librdkafka producer. As in the consumer case, the user must call poll() periodically to execute delivery callbacks.
- ThreadedProducer: a BaseProducer with a separate thread dedicated to polling the producer.
High level:
- StreamConsumer: it returns a stream of messages and takes care of polling the consumer internally.
- FutureProducer: it returns a future that will be completed once the message is delivered to Kafka (or failed).
For more information about consumers and producers, refer to their module-level documentation.
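The polling requirement of the low level clients can be pictured with a toy model. The sketch below does not use the rust-rdkafka API: `ToyProducer` and all of its methods are hypothetical stand-ins, written only to show that delivery callbacks stay queued until `poll` is called.

```rust
use std::collections::VecDeque;

/// Toy stand-in for a low level producer (NOT the rust-rdkafka API).
/// Delivery reports are queued internally and are only handed to the
/// "callback" when `poll` is called.
struct ToyProducer {
    pending_reports: VecDeque<String>,
    delivered: Vec<String>,
}

impl ToyProducer {
    fn new() -> Self {
        ToyProducer {
            pending_reports: VecDeque::new(),
            delivered: Vec::new(),
        }
    }

    /// Enqueues a message; its delivery report becomes available later.
    fn send(&mut self, payload: &str) {
        self.pending_reports.push_back(payload.to_string());
    }

    /// Runs the delivery callback for every queued report and returns
    /// how many reports were served.
    fn poll(&mut self) -> usize {
        let mut served = 0;
        while let Some(report) = self.pending_reports.pop_front() {
            // Stand-in for the user's delivery callback.
            self.delivered.push(report);
            served += 1;
        }
        served
    }
}

fn main() {
    let mut producer = ToyProducer::new();
    producer.send("a");
    producer.send("b");

    // No callback has run yet, because nobody polled.
    println!("delivered before poll: {}", producer.delivered.len());

    // Polling drains the queue and runs the callbacks.
    let served = producer.poll();
    println!("served {} delivery reports", served);
}
```

In this picture, a ThreadedProducer corresponds to moving the `poll` call into a dedicated background thread, so that the application code never has to call it explicitly.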
Warning: the library is under active development and the APIs are likely to change.
tokio-rs is a platform for fast processing of asynchronous events in Rust. The interfaces exposed by the StreamConsumer and the FutureProducer allow rust-rdkafka users to easily integrate Kafka consumers and producers within the tokio-rs platform, and write asynchronous message processing code. Note that rust-rdkafka can be used without tokio-rs.
To see rust-rdkafka in action with tokio-rs, check out the asynchronous processing example in the examples folder.
At-least-once delivery semantics are common in many streaming applications: every message is guaranteed to be processed at least once; in case of temporary failure, the message can be re-processed and/or re-delivered, but no message will be lost.
In order to implement at-least-once delivery the stream processing application has to carefully commit the offset only once the message has been processed. Committing the offset too early, instead, might cause message loss, since upon recovery the consumer will start from the next message, skipping the one where the failure occurred.
To see how to implement at-least-once delivery with rdkafka, check out the at-least-once delivery example in the examples folder. To know more about delivery semantics, check the message delivery semantics chapter in the Kafka documentation.
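The effect of the commit ordering described above can be reproduced with a small simulation. This is a simplified model, not code from the examples folder and not the rdkafka API: the `CommitOrder` enum and the `run` function are hypothetical names, the "log" is a plain slice, and a single simulated crash happens while one message is being processed.

```rust
/// When the offset is committed relative to processing.
#[derive(Clone, Copy, PartialEq)]
enum CommitOrder {
    BeforeProcessing,
    AfterProcessing,
}

/// Consumes `log` from the committed offset, crashing once while processing
/// the message at `crash_at`, then restarting from the committed offset.
/// Returns every message that was fully processed, in order.
fn run(log: &[&str], crash_at: usize, order: CommitOrder) -> Vec<String> {
    let mut committed = 0; // next offset to read after a restart
    let mut processed = Vec::new();
    let mut crashed = false;

    loop {
        let mut offset = committed;
        let mut restart = false;
        while offset < log.len() {
            if order == CommitOrder::BeforeProcessing {
                committed = offset + 1;
            }
            if offset == crash_at && !crashed {
                // Simulated failure: this message is not processed.
                crashed = true;
                restart = true;
                break;
            }
            processed.push(log[offset].to_string());
            if order == CommitOrder::AfterProcessing {
                committed = offset + 1;
            }
            offset += 1;
        }
        if !restart {
            return processed;
        }
    }
}

fn main() {
    let log = ["m0", "m1", "m2"];
    // Committing after processing: the failed message is retried on restart.
    println!("commit after:  {:?}", run(&log, 1, CommitOrder::AfterProcessing));
    // Committing before processing: the failed message is skipped on restart.
    println!("commit before: {:?}", run(&log, 1, CommitOrder::BeforeProcessing));
}
```

With the commit placed after processing, the restart resumes at the message that failed, so nothing is lost. With the commit placed before processing, the restart resumes at the next message and the failed one is never processed. The model does not show duplicates, which at-least-once delivery also allows when a crash happens after processing but before the commit.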
Here are some of the projects using rust-rdkafka:
- timely-dataflow: a modular implementation of timely dataflow in Rust (you can also check the blog post).
- kafka-view: a web interface for Kafka clusters.
- kafka-benchmark: a high performance benchmarking tool for Kafka.
If you are using rust-rdkafka, please let me know!
Add this to your Cargo.toml:
[dependencies]
rdkafka = { version = "0.23", features = ["cmake-build"] }
This crate will compile librdkafka from sources and link it statically to your executable. To compile librdkafka you'll need:
- the GNU toolchain
- GNU make
- pthreads
- zlib: optional, but included by default (feature: libz)
- cmake: optional, not included by default (feature: cmake-build)
- libssl-dev: optional, not included by default (feature: ssl)
- libsasl2-dev: optional, not included by default (feature: gssapi)
- libzstd-dev: optional, not included by default (feature: zstd-pkg-config)
Note that using the CMake build system, via the cmake-build feature, is
strongly encouraged. The default build system has a known
issue that can cause corrupted builds.
By default a submodule with the librdkafka sources pinned to a specific commit
will be used to compile and statically link the library. The dynamic-linking
feature can be used to instead dynamically link rdkafka to the system's version
of librdkafka. Example:
[dependencies]
rdkafka = { version = "0.23", features = ["dynamic-linking"] }
For a full listing of features, consult the rdkafka-sys crate's documentation. All of rdkafka-sys features are re-exported as rdkafka features.
To compile from sources, you'll have to update the submodule containing librdkafka:
git submodule update --init
and then compile using cargo, selecting the features that you want. Example:
cargo build --features "ssl gssapi"
You can find examples in the examples folder. To run them:
cargo run --example <example_name> -- <example_args>
The unit tests can run without a Kafka broker present:
cargo test --lib
rust-rdkafka contains a suite of tests which is automatically executed by Travis in docker-compose. Given the interaction with C code that rust-rdkafka has to do, tests are executed in valgrind to check for possible memory errors and leaks.
To run the full suite using docker-compose:
./test_suite.sh
To run locally, instead:
KAFKA_HOST="kafka_server:9092" cargo test
In this case a broker is expected to be running on KAFKA_HOST.
The broker must be configured with a default partition number of 3 and with topic autocreation
enabled in order for the tests to succeed.
rust-rdkafka uses the log and env_logger crates to handle logging. Logging can be enabled
using the RUST_LOG environment variable, for example:
RUST_LOG="librdkafka=trace,rdkafka::client=debug" cargo test
This will configure the logging level of librdkafka to trace, and the level of the client
module of the Rust client to debug. To actually receive logs from librdkafka, you also have to
set the debug option in the producer or consumer configuration (see librdkafka
configuration).
To enable debugging in your project, make sure you initialize the logger with
env_logger::init() or equivalent.
See rdkafka-sys.
Thanks to:
- Thijs Cadier - thijsc
- kafka-rust: a pure Rust implementation of the Kafka client.