diff --git a/pipeline_kafka--0.9.3.sql b/pipeline_kafka--0.9.3.sql index 9ca51bde..01b7d518 100644 --- a/pipeline_kafka--0.9.3.sql +++ b/pipeline_kafka--0.9.3.sql @@ -11,6 +11,7 @@ CREATE TABLE pipeline_kafka.consumers ( quote text, escape text, batchsize integer NOT NULL, + maxbytes integer NOT NULL, parallelism integer NOT NULL, timeout integer NOT NULL, UNIQUE (relation, topic) @@ -38,6 +39,7 @@ CREATE FUNCTION pipeline_kafka.consume_begin ( quote text DEFAULT NULL, escape text DEFAULT NULL, batchsize integer DEFAULT 1000, + maxbytes integer DEFAULT 32000000, -- 32mb parallelism integer DEFAULT 1, timeout integer DEFAULT 250, start_offset bigint DEFAULT NULL diff --git a/pipeline_kafka.c b/pipeline_kafka.c index fa82e877..bb668fde 100644 --- a/pipeline_kafka.c +++ b/pipeline_kafka.c @@ -60,7 +60,7 @@ PG_MODULE_MAGIC; #define PIPELINE_KAFKA_SCHEMA "pipeline_kafka" #define CONSUMER_RELATION "consumers" -#define CONSUMER_RELATION_NATTS 10 +#define CONSUMER_RELATION_NATTS 11 #define CONSUMER_ATTR_ID 1 #define CONSUMER_ATTR_RELATION 2 #define CONSUMER_ATTR_TOPIC 3 @@ -69,8 +69,9 @@ PG_MODULE_MAGIC; #define CONSUMER_ATTR_QUOTE 6 #define CONSUMER_ATTR_ESCAPE 7 #define CONSUMER_ATTR_BATCH_SIZE 8 -#define CONSUMER_ATTR_PARALLELISM 9 -#define CONSUMER_ATTR_TIMEOUT 10 +#define CONSUMER_ATTR_MAX_BYTES 9 +#define CONSUMER_ATTR_PARALLELISM 10 +#define CONSUMER_ATTR_TIMEOUT 11 #define OFFSETS_RELATION "offsets" #define OFFSETS_RELATION_NATTS 3 @@ -102,8 +103,8 @@ PG_MODULE_MAGIC; #define RD_KAFKA_OFFSET_NULL INT64_MIN -#define CONSUMER_LOG_PREFIX "[pipeline_kafka consumer] %s <- %s (PID %d): " -#define CONSUMER_LOG_PREFIX_PARAMS(consumer) consumer.rel->relname, consumer.topic, MyProcPid +#define CONSUMER_LOG_PREFIX "[pipeline_kafka] %s <- %s (PID %d): " +#define CONSUMER_LOG_PREFIX_PARAMS(consumer) (consumer)->rel->relname, (consumer)->topic, MyProcPid #define CONSUMER_WORKER_RESTART_TIME 1 static volatile sig_atomic_t got_SIGTERM = false; @@ -222,6 +223,7 @@ typedef struct KafkaConsumer int32_t partition; int64_t offset; int batch_size; + int max_bytes; int parallelism; int timeout; char *format; @@ -595,6 +597,11 @@ load_consumer_state(int32 consumer_id, KafkaConsumer *consumer) Assert(!isnull); consumer->batch_size = DatumGetInt32(d); + /* max bytes */ + d = slot_getattr(slot, CONSUMER_ATTR_MAX_BYTES, &isnull); + Assert(!isnull); + consumer->max_bytes = DatumGetInt32(d); + /* timeout */ d = slot_getattr(slot, CONSUMER_ATTR_TIMEOUT, &isnull); Assert(!isnull); @@ -791,17 +798,40 @@ get_copy_statement(KafkaConsumer *consumer) * * Write messages to stream */ -static uint64 -execute_copy(CopyStmt *stmt, StringInfo buf) +static void +execute_copy(KafkaConsumer *consumer, KafkaConsumerProc *proc, CopyStmt *stmt, StringInfo buf, int num_messages) { - uint64 processed; + MemoryContext old = CurrentMemoryContext; - copy_iter_hook = copy_next; - copy_iter_arg = buf; + StartTransactionCommand(); + + /* we don't want to die in the event of any errors */ + PG_TRY(); + { + uint64 processed; + copy_iter_arg = buf; + DoCopy(stmt, "COPY", &processed); + } + PG_CATCH(); + { + elog(LOG, CONSUMER_LOG_PREFIX "failed to process batch, dropped %d message%s", + CONSUMER_LOG_PREFIX_PARAMS(consumer), num_messages, (num_messages == 1 ? "" : "s")); + + EmitErrorReport(); + FlushErrorState(); - DoCopy(stmt, "COPY", &processed); + AbortCurrentTransaction(); + } + PG_END_TRY(); + + if (!IsTransactionState()) + StartTransactionCommand(); + + save_consumer_offsets(consumer, proc->partition_group); + + CommitTransactionCommand(); - return processed; + MemoryContextSwitchTo(old); } /* @@ -830,6 +860,8 @@ kafka_consume_main(Datum arg) int i; int my_partitions = 0; MemoryContext work_ctx; + char errstr[512]; + char val[64]; if (!found) { @@ -858,6 +890,9 @@ kafka_consume_main(Datum arg) copy = get_copy_statement(&consumer); topic_conf = rd_kafka_topic_conf_new(); + sprintf(val, "%d", consumer.max_bytes); + rd_kafka_topic_conf_set(topic_conf, "fetch.message.max.bytes", val, errstr, sizeof(errstr)); + kafka = rd_kafka_new(RD_KAFKA_CONSUMER, NULL, err_msg, sizeof(err_msg)); rd_kafka_set_logger(kafka, consumer_logger); @@ -867,7 +902,7 @@ kafka_consume_main(Datum arg) if (consumer.brokers == NIL) { elog(WARNING, CONSUMER_LOG_PREFIX "no brokers found in pipeline_kafka.brokers", - CONSUMER_LOG_PREFIX_PARAMS(consumer)); + CONSUMER_LOG_PREFIX_PARAMS(&consumer)); goto done; } @@ -876,7 +911,7 @@ kafka_consume_main(Datum arg) if (!valid_brokers) elog(ERROR, CONSUMER_LOG_PREFIX "no valid brokers were found", - CONSUMER_LOG_PREFIX_PARAMS(consumer)); + CONSUMER_LOG_PREFIX_PARAMS(&consumer)); /* * Set up our topic to read from @@ -886,7 +921,7 @@ kafka_consume_main(Datum arg) if (err != RD_KAFKA_RESP_ERR_NO_ERROR) elog(ERROR, CONSUMER_LOG_PREFIX "failed to acquire metadata: %s", - CONSUMER_LOG_PREFIX_PARAMS(consumer), rd_kafka_err2str(err)); + CONSUMER_LOG_PREFIX_PARAMS(&consumer), rd_kafka_err2str(err)); Assert(meta->topic_cnt == 1); topic_meta = meta->topics[0]; @@ -905,11 +940,11 @@ kafka_consume_main(Datum arg) continue; elog(LOG, CONSUMER_LOG_PREFIX "consuming partition %d from offset %ld", - CONSUMER_LOG_PREFIX_PARAMS(consumer), partition, consumer.offsets[partition]); + CONSUMER_LOG_PREFIX_PARAMS(&consumer), partition, consumer.offsets[partition]); if (rd_kafka_consume_start(topic, partition, consumer.offsets[partition]) == -1) elog(ERROR, CONSUMER_LOG_PREFIX "failed to start consuming: %s", - CONSUMER_LOG_PREFIX_PARAMS(consumer), rd_kafka_err2str(rd_kafka_errno2err(errno))); + CONSUMER_LOG_PREFIX_PARAMS(&consumer), rd_kafka_err2str(rd_kafka_errno2err(errno))); my_partitions++; } @@ -930,6 +965,9 @@ kafka_consume_main(Datum arg) ALLOCSET_DEFAULT_MAXSIZE); messages = palloc0(sizeof(rd_kafka_message_t *) * consumer.batch_size); + /* set copy hook */ + copy_iter_hook = copy_next; + /* * Consume messages until we are terminated */ @@ -967,9 +1005,9 @@ kafka_consume_main(Datum arg) /* Ignore partition EOF internal error */ if (messages[i]->err != RD_KAFKA_RESP_ERR__PARTITION_EOF) elog(LOG, CONSUMER_LOG_PREFIX "librdkafka error: %s", - CONSUMER_LOG_PREFIX_PARAMS(consumer), rd_kafka_err2str(messages[i]->err)); + CONSUMER_LOG_PREFIX_PARAMS(&consumer), rd_kafka_err2str(messages[i]->err)); } - else if (messages[i]->len > 0) + else if (messages[i]->len) { appendBinaryStringInfo(buf, messages[i]->payload, messages[i]->len); @@ -986,12 +1024,20 @@ kafka_consume_main(Datum arg) rd_kafka_message_destroy(messages[i]); messages[i] = NULL; } + + /* Flush if we've buffered enough messages or space used by messages has exceeded buffer size threshold */ + if (messages_buffered >= consumer.batch_size || buf->len >= consumer.max_bytes) + { + execute_copy(&consumer, proc, copy, buf, messages_buffered); + resetStringInfo(buf); + messages_buffered = 0; + } } librdkerrs = error_buf_pop(&my_error_buf); if (librdkerrs) elog(LOG, CONSUMER_LOG_PREFIX "librdkafka error: %s", - CONSUMER_LOG_PREFIX_PARAMS(consumer), librdkerrs); + CONSUMER_LOG_PREFIX_PARAMS(&consumer), librdkerrs); if (!messages_buffered) { @@ -999,31 +1045,7 @@ kafka_consume_main(Datum arg) continue; } - StartTransactionCommand(); - - /* we don't want to die in the event of any errors */ - PG_TRY(); - { - if (messages_buffered) - execute_copy(copy, buf); - } - PG_CATCH(); - { - elog(LOG, CONSUMER_LOG_PREFIX "failed to process batch, dropped %d message%s", - CONSUMER_LOG_PREFIX_PARAMS(consumer), messages_buffered, (messages_buffered == 1 ? "" : "s")); - EmitErrorReport(); - FlushErrorState(); - - AbortCurrentTransaction(); - } - PG_END_TRY(); - - if (!IsTransactionState()) - StartTransactionCommand(); - - save_consumer_offsets(&consumer, proc->partition_group); - - CommitTransactionCommand(); + execute_copy(&consumer, proc, copy, buf, messages_buffered); } done: @@ -1045,7 +1067,8 @@ kafka_consume_main(Datum arg) */ static int32 create_or_update_consumer(ResultRelInfo *consumers, text *relation, text *topic, - text *format, text *delimiter, text *quote, text *escape, int batchsize, int parallelism, int timeout) + text *format, text *delimiter, text *quote, text *escape, int batchsize, int maxbytes, + int parallelism, int timeout) { HeapTuple tup; Datum values[CONSUMER_RELATION_NATTS]; @@ -1066,6 +1089,7 @@ create_or_update_consumer(ResultRelInfo *consumers, text *relation, text *topic, tup = index_getnext(scan, ForwardScanDirection); values[CONSUMER_ATTR_BATCH_SIZE - 1] = Int32GetDatum(batchsize); + values[CONSUMER_ATTR_MAX_BYTES - 1] = Int32GetDatum(maxbytes); values[CONSUMER_ATTR_PARALLELISM - 1] = Int32GetDatum(parallelism); values[CONSUMER_ATTR_TIMEOUT - 1] = Int32GetDatum(timeout); values[CONSUMER_ATTR_FORMAT - 1] = PointerGetDatum(format); @@ -1270,6 +1294,7 @@ kafka_consume_begin_tr(PG_FUNCTION_ARGS) text *quote; text *escape; int batchsize; + int maxbytes; int parallelism; int timeout; int64 offset; @@ -1318,19 +1343,24 @@ kafka_consume_begin_tr(PG_FUNCTION_ARGS) batchsize = PG_GETARG_INT32(6); if (PG_ARGISNULL(7)) - parallelism = 1; + maxbytes = 32000000; else - parallelism = PG_GETARG_INT32(7); + maxbytes = PG_GETARG_INT32(7); if (PG_ARGISNULL(8)) - timeout = 250; + parallelism = 1; else - timeout = PG_GETARG_INT32(8); + parallelism = PG_GETARG_INT32(8); if (PG_ARGISNULL(9)) + timeout = 250; + else + timeout = PG_GETARG_INT32(9); + + if (PG_ARGISNULL(10)) offset = RD_KAFKA_OFFSET_NULL; else - offset = PG_GETARG_INT64(9); + offset = PG_GETARG_INT64(10); /* there's no point in progressing if there aren't any brokers */ if (!get_all_brokers()) @@ -1349,7 +1379,7 @@ kafka_consume_begin_tr(PG_FUNCTION_ARGS) consumers = relinfo_open(get_rangevar(CONSUMER_RELATION), ExclusiveLock); id = create_or_update_consumer(consumers, qualified_name, topic, format, - delimiter, quote, escape, batchsize, parallelism, timeout); + delimiter, quote, escape, batchsize, maxbytes, parallelism, timeout); load_consumer_state(id, &consumer); success = launch_consumer_group(&consumer, offset); relinfo_close(consumers, NoLock);