# Kafka Connect Sink Connector for IBM Cloud Object Storage
Using this connector, you can:
- Copy data from a Kafka topic into IBM Cloud Object Storage.
- Flexibly batch together multiple Kafka records into a single object for efficient retrieval from object storage.
- Preserve ordering of Kafka data. Kafka ordering of records within a partition is maintained as the data is written into object storage.
- Transfer data exactly once (subject to some restrictions, see [Exactly once delivery](#exactly-once-delivery)).

**Contents:**

- [Building the connector](#building-the-connector)
- [Configuration](#configuration)
- [Object Names](#object-names)
- [Combining multiple Kafka records into an object](#combining-multiple-kafka-records-into-an-object)
- [Exactly once delivery](#exactly-once-delivery)
- [Provisioning an IBM Cloud Object Storage Service instance](#provisioning-an-ibm-cloud-object-storage-service-instance)
- [Running the connector](#running-the-connector)

## Building the connector

To build the connector, you must have the following installed:
- git
- Gradle 6.8 or later
- Java 8 or later

Clone the repository with the following command:

```shell
git clone https://github.com/ibm-messaging/kafka-connect-ibmcos-sink
```

Change into the `kafka-connect-ibmcos-sink` directory:

```shell
cd kafka-connect-ibmcos-sink
```

Build the connector using Gradle:

```shell
gradle shadowJar
```
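
The `shadowJar` task builds a single self-contained JAR containing the connector and its dependencies. Assuming the Gradle Shadow plugin's default output location (an assumption; check your build output), the JAR is written under `build/libs/`:

```shell
# Assumes Shadow plugin defaults; the exact file name depends on the project version.
ls build/libs/
```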

## Configuration

The connector supports the following configuration options:

- `cos.api.key` (required) - API key used to connect to the Cloud Object Storage service instance.
- `cos.bucket.location` (required) - Location of the Cloud Object Storage service bucket, for example: `eu-gb`.
- `cos.bucket.name` (required) - Name of the Cloud Object Storage service bucket to write data into.
- `cos.bucket.resiliency` (required) - Resiliency of the Cloud Object Storage bucket. Must be one of: `cross-region`, `regional`, or `single-site`.
- `cos.endpoint.visibility` (optional) - Specify `public` to connect to the Cloud Object Storage service over the public internet, or `private` to connect from a connector running inside the IBM Cloud network, for example from an IBM Cloud Kubernetes Service cluster. The default is `public`.
- `cos.object.deadline.seconds` (optional) - The number of seconds (as measured by wall clock time for the Connect Task instance) between reading the first record from Kafka and writing all of the records read so far into a Cloud Object Storage object. This can be useful in situations where there are long pauses between Kafka records being produced to a topic, as it ensures that any records received by this connector will always be written into object storage within the specified period of time.
- `cos.object.interval.seconds` (optional) - The number of seconds (as measured by the timestamps in Kafka records) between reading the first record from Kafka and writing all of the records read so far into a Cloud Object Storage object.
- `cos.object.records` (optional) - The maximum number of Kafka records to combine into an object.
- `cos.object.record.delimiter.nl` (optional) - If set to `true` (default `false`), records within a single object are separated by a newline.
- `cos.service.crn` (required) - CRN for the Cloud Object Storage service instance.
- `cos.endpoints.url` (optional) - Endpoints URL for the Cloud Object Storage instance. Only set this in environments where a non-default set of endpoints is required.
- `cos.writer.format` (optional) - Determines the output format of objects written to COS. Can be `json` (default) or `parquet`. Setting `cos.object.record.delimiter.nl` to `true` is recommended when the JSON format is chosen. With Parquet, the delimiter setting is silently ignored.
- `cos.writer.schema.uri` (required if the Parquet output format is selected) - Points to the Avro schema of the records to be written to the Parquet file. Currently only the `file` URI scheme is supported.
- `cos.writer.parquet.buffer.size` (optional) - Specifies the size, in bytes, of the Parquet output stream buffer. The value must be large enough to accommodate the entire batch of records as determined by `cos.object.deadline.seconds`, `cos.object.interval.seconds`, and `cos.object.records`. Default: 26214400 (25 MiB).
- `cos.writer.parquet.write.mode` (optional) - Can be `create` (default) or `overwrite`. If `create` is specified and the file already exists, an exception is raised.
- `cos.writer.parquet.compression.codec` (optional) - Can be one of: `none`, `uncompressed`, `snappy` (default), `gzip`, `lzo`, `brotli`, `lz4`, `zstd`.
- `cos.writer.parquet.row.group.size` (optional) - Determines the Parquet file row group size in bytes. Default: 536870912 (512 MiB).
- `cos.writer.parquet.page.size` (optional) - Determines the Parquet file page size in bytes. Default: 65536 (64 KiB).
- `cos.writer.parquet.enable.dictionary` (optional) - Set to `false` to disable the column dictionaries. Default: `true`.
- `value.converter.cos.writer.schema.uri` (required if the Parquet output format is selected) - Must be set to the same value as `cos.writer.schema.uri`.
Note that while the configuration properties `cos.object.deadline.seconds`, `cos.object.interval.seconds`, and `cos.object.records` are all listed as optional, at least one of them must be set to a non-default value.
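
For reference, a minimal sink configuration might look like the following sketch. All values are placeholders, and the connector class name is an assumption based on this repository's package layout; `config/cos-sink.properties` in this repository is the authoritative template:

```properties
name=cos-sink
connector.class=com.ibm.eventstreams.connect.cossink.COSSinkConnector
tasks.max=1
topics=mytopic

# Placeholder credentials and bucket details - replace with your own.
cos.api.key=<api-key>
cos.service.crn=<service-crn>
cos.bucket.name=<bucket-name>
cos.bucket.location=eu-gb
cos.bucket.resiliency=regional

# Write an object after at most 100 records or 60 seconds, whichever comes first.
cos.object.records=100
cos.object.deadline.seconds=60
```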

## Object Names

The connector uses the following scheme to name Cloud Object Storage objects:

```
topic/partition/firstoffset-lastoffset
```

Both `firstoffset` and `lastoffset` refer to the Kafka offsets of the records included in the object, zero-padded to 16 digits. For example:

```
mytopic/1/0000000000017805-0000000000017809
```

This object contains 5 Kafka records, offsets 17805 to 17809, from partition 1 of `mytopic`.

When the Parquet output format is selected, the suffix `.parquet` is added to the object name.
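
As a quick illustration of the zero-padding (a shell sketch of the naming scheme, not the connector's actual code):

```shell
# Format: topic/partition/firstoffset-lastoffset, offsets zero-padded to 16 digits.
printf '%s/%d/%016d-%016d\n' mytopic 1 17805 17809
# -> mytopic/1/0000000000017805-0000000000017809
```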

## Combining multiple Kafka records into an object

Typically, Kafka records are much smaller than the maximum size of an object storage object, and while it is possible to create an object for each Kafka record, this is usually not an efficient way to use Cloud Object Storage. This connector offers three different controls for deciding how much Kafka data gets combined into an object:

- `cos.object.records`
- `cos.object.interval.seconds`
- `cos.object.deadline.seconds`

At least one of these configuration properties must be specified. If more than one property is specified, an object is written at the point the first of these limits is reached.

For example, given the following configuration:

```properties
cos.object.records=100
cos.object.deadline.seconds=60
```

and assuming that at least one Kafka record is available to be written into an object, objects will be created either every minute or after 100 Kafka records have been received, whichever condition occurs first.

## Exactly once delivery

This connector can be used to provide exactly once delivery of Kafka records into object storage: data is copied without loss or duplication.

Exactly once delivery requires that Kafka records are combined into objects in a completely deterministic way. That is, given the same stream of Kafka records to process, the connector will always group the same records into each Cloud Object Storage object.

Using either the `cos.object.records` property or the `cos.object.interval.seconds` property (or both together) results in deterministic processing of Kafka records into objects. However, the `cos.object.deadline.seconds` option cannot be used for exactly once delivery, because the grouping of Kafka records into objects then depends on the speed at which the system hosting the connector can process records.
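
For example, a configuration along these lines (values are illustrative) is deterministic and therefore compatible with exactly once delivery, because both limits are derived from the records themselves rather than from wall clock time:

```properties
# Deterministic grouping: record count and record-timestamp interval only.
cos.object.records=500
cos.object.interval.seconds=300
# Leave cos.object.deadline.seconds unset; it depends on wall clock time.
```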

## Provisioning an IBM Cloud Object Storage Service instance

To use this connector, you must provision an instance of the IBM Cloud Object Storage service. Once provisioned, navigate to the **Service Credentials** tab of your instance to retrieve the configuration values required by the connector. You also need to create a bucket.
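
If you prefer to provision from the command line, the IBM Cloud CLI offers one possible route. The instance name, key name, and plan below are placeholders, and this is only a sketch; the console steps above achieve the same result:

```shell
# Create a Cloud Object Storage service instance (name and plan are placeholders).
ibmcloud resource service-instance-create my-cos cloud-object-storage standard global

# Create a service credential whose API key the connector can use.
ibmcloud resource service-key-create my-cos-key Writer --instance-name my-cos
```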

## Running the connector

To run the connector, you must have:
- The JAR from building the connector
- A properties file containing the configuration for the connector
- Apache Kafka 1.1.0 or later, either standalone or included as part of an offering such as IBM Event Streams
The connector can be run in a Kafka Connect worker in either standalone (single process) or distributed mode.

### Standalone mode

You need two configuration files: one for the configuration that applies to all of the connectors, such as the Kafka bootstrap servers, and another for the configuration specific to the IBM Cloud Object Storage sink connector, such as the connection information for your Cloud Object Storage service. For the former, the Kafka distribution includes a file called `connect-standalone.properties` that you can use as a starting point. For the latter, you can use `config/cos-sink.properties` in this repository after replacing all placeholders.

To run the connector in standalone mode from the directory into which you installed Apache Kafka, use a command like this:

```shell
bin/connect-standalone.sh connect-standalone.properties cos-sink.properties
```

### Distributed mode

You need an instance of Kafka Connect running in distributed mode. The Kafka distribution includes a file called `connect-distributed.properties` that you can use as a starting point.

To start the COS connector, you can use `config/cos-sink.json` in this repository after replacing all placeholders, and run a command like this:

```shell
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  --data "@./config/cos-sink.json"
```
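
After submitting the connector, you can verify it through the Kafka Connect REST API. The connector name `cos-sink` below is an assumption; use the `name` field from your `cos-sink.json`:

```shell
# List registered connectors, then check the status of the COS sink
# ("cos-sink" is a placeholder for the name set in cos-sink.json).
curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/cos-sink/status
```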