maxbytes and proper handling of large messages
usmanm committed Jul 6, 2016
1 parent 9a811d5 commit 6eb3a69
Showing 2 changed files with 84 additions and 52 deletions.
2 changes: 2 additions & 0 deletions pipeline_kafka--0.9.3.sql
@@ -11,6 +11,7 @@ CREATE TABLE pipeline_kafka.consumers (
quote text,
escape text,
batchsize integer NOT NULL,
+ maxbytes integer NOT NULL,
parallelism integer NOT NULL,
timeout integer NOT NULL,
UNIQUE (relation, topic)
@@ -38,6 +39,7 @@ CREATE FUNCTION pipeline_kafka.consume_begin (
quote text DEFAULT NULL,
escape text DEFAULT NULL,
batchsize integer DEFAULT 1000,
+ maxbytes integer DEFAULT 32000000, -- 32mb
parallelism integer DEFAULT 1,
timeout integer DEFAULT 250,
start_offset bigint DEFAULT NULL
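The new parameter can be overridden per consumer. A usage sketch, assuming the function's leading arguments (outside the hunk shown) take the topic and stream names, which are placeholders here:

-- raise the per-batch buffer cap from the 32 MB default to ~64 MB
SELECT pipeline_kafka.consume_begin('my_topic', 'my_stream',
    batchsize := 1000,
    maxbytes  := 64000000);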
134 changes: 82 additions & 52 deletions pipeline_kafka.c
@@ -60,7 +60,7 @@ PG_MODULE_MAGIC;
#define PIPELINE_KAFKA_SCHEMA "pipeline_kafka"

#define CONSUMER_RELATION "consumers"
- #define CONSUMER_RELATION_NATTS 10
+ #define CONSUMER_RELATION_NATTS 11
#define CONSUMER_ATTR_ID 1
#define CONSUMER_ATTR_RELATION 2
#define CONSUMER_ATTR_TOPIC 3
@@ -69,8 +69,9 @@ PG_MODULE_MAGIC;
#define CONSUMER_ATTR_QUOTE 6
#define CONSUMER_ATTR_ESCAPE 7
#define CONSUMER_ATTR_BATCH_SIZE 8
- #define CONSUMER_ATTR_PARALLELISM 9
- #define CONSUMER_ATTR_TIMEOUT 10
+ #define CONSUMER_ATTR_MAX_BYTES 9
+ #define CONSUMER_ATTR_PARALLELISM 10
+ #define CONSUMER_ATTR_TIMEOUT 11

#define OFFSETS_RELATION "offsets"
#define OFFSETS_RELATION_NATTS 3
@@ -102,8 +103,8 @@ PG_MODULE_MAGIC;

#define RD_KAFKA_OFFSET_NULL INT64_MIN

#define CONSUMER_LOG_PREFIX "[pipeline_kafka consumer] %s <- %s (PID %d): "
#define CONSUMER_LOG_PREFIX_PARAMS(consumer) consumer.rel->relname, consumer.topic, MyProcPid
#define CONSUMER_LOG_PREFIX "[pipeline_kafka] %s <- %s (PID %d): "
#define CONSUMER_LOG_PREFIX_PARAMS(consumer) (consumer)->rel->relname, (consumer)->topic, MyProcPid
#define CONSUMER_WORKER_RESTART_TIME 1

static volatile sig_atomic_t got_SIGTERM = false;
@@ -222,6 +223,7 @@ typedef struct KafkaConsumer
int32_t partition;
int64_t offset;
int batch_size;
+ int max_bytes;
int parallelism;
int timeout;
char *format;
@@ -595,6 +597,11 @@ load_consumer_state(int32 consumer_id, KafkaConsumer *consumer)
Assert(!isnull);
consumer->batch_size = DatumGetInt32(d);

+ /* max bytes */
+ d = slot_getattr(slot, CONSUMER_ATTR_MAX_BYTES, &isnull);
+ Assert(!isnull);
+ consumer->max_bytes = DatumGetInt32(d);

/* timeout */
d = slot_getattr(slot, CONSUMER_ATTR_TIMEOUT, &isnull);
Assert(!isnull);
@@ -791,17 +798,40 @@ get_copy_statement(KafkaConsumer *consumer)
*
* Write messages to stream
*/
- static uint64
- execute_copy(CopyStmt *stmt, StringInfo buf)
+ static void
+ execute_copy(KafkaConsumer *consumer, KafkaConsumerProc *proc, CopyStmt *stmt, StringInfo buf, int num_messages)
{
- uint64 processed;
+ MemoryContext old = CurrentMemoryContext;

- copy_iter_hook = copy_next;
- copy_iter_arg = buf;
+ StartTransactionCommand();

+ /* we don't want to die in the event of any errors */
+ PG_TRY();
+ {
+ uint64 processed;
+ copy_iter_arg = buf;
+ DoCopy(stmt, "COPY", &processed);
+ }
+ PG_CATCH();
+ {
+ elog(LOG, CONSUMER_LOG_PREFIX "failed to process batch, dropped %d message%s",
+ CONSUMER_LOG_PREFIX_PARAMS(consumer), num_messages, (num_messages == 1 ? "" : "s"));
+
+ EmitErrorReport();
+ FlushErrorState();
+
- DoCopy(stmt, "COPY", &processed);
+ AbortCurrentTransaction();
+ }
+ PG_END_TRY();
+
+ if (!IsTransactionState())
+ StartTransactionCommand();
+
+ save_consumer_offsets(consumer, proc->partition_group);
+
+ CommitTransactionCommand();

- return processed;
+ MemoryContextSwitchTo(old);
}
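The refactor moves transaction control and error handling into execute_copy itself: a failed COPY is logged and rolled back, but offsets are still saved, so a poison batch is dropped once rather than retried forever. A minimal sketch of this PostgreSQL error-isolation idiom (the helper and callback names are illustrative, not part of the commit):

#include "postgres.h"
#include "access/xact.h"

/* Run one batch inside its own transaction; swallow errors so the
 * background worker survives a bad batch. */
static void
run_batch_guarded(void (*do_batch)(void))
{
    StartTransactionCommand();

    PG_TRY();
    {
        do_batch();               /* e.g. DoCopy() on the buffered rows */
    }
    PG_CATCH();
    {
        EmitErrorReport();        /* log the error */
        FlushErrorState();        /* clear it so execution can continue */
        AbortCurrentTransaction();
    }
    PG_END_TRY();

    /* the catch path aborted; reopen a transaction for bookkeeping */
    if (!IsTransactionState())
        StartTransactionCommand();

    /* ...save offsets here, as execute_copy does... */
    CommitTransactionCommand();
}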

/*
@@ -830,6 +860,8 @@ kafka_consume_main(Datum arg)
int i;
int my_partitions = 0;
MemoryContext work_ctx;
+ char errstr[512];
+ char val[64];

if (!found)
{
@@ -858,6 +890,9 @@ kafka_consume_main(Datum arg)
copy = get_copy_statement(&consumer);

topic_conf = rd_kafka_topic_conf_new();
+ sprintf(val, "%d", consumer.max_bytes);
+ rd_kafka_topic_conf_set(topic_conf, "fetch.message.max.bytes", val, errstr, sizeof(errstr));

kafka = rd_kafka_new(RD_KAFKA_CONSUMER, NULL, err_msg, sizeof(err_msg));
rd_kafka_set_logger(kafka, consumer_logger);
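Note that the return value of rd_kafka_topic_conf_set is ignored above, so a rejected setting would pass silently. A defensive variant might look like this (a sketch, not part of the commit):

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Hypothetical helper: apply the consumer's maxbytes setting and
 * surface any configuration error instead of dropping it. */
static int
set_fetch_max_bytes(rd_kafka_topic_conf_t *conf, int max_bytes)
{
    char val[64];
    char errstr[512];

    snprintf(val, sizeof(val), "%d", max_bytes);
    if (rd_kafka_topic_conf_set(conf, "fetch.message.max.bytes",
            val, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
    {
        fprintf(stderr, "fetch.message.max.bytes: %s\n", errstr);
        return -1;
    }
    return 0;
}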

@@ -867,7 +902,7 @@ kafka_consume_main(Datum arg)
if (consumer.brokers == NIL)
{
elog(WARNING, CONSUMER_LOG_PREFIX "no brokers found in pipeline_kafka.brokers",
- CONSUMER_LOG_PREFIX_PARAMS(consumer));
+ CONSUMER_LOG_PREFIX_PARAMS(&consumer));
goto done;
}

@@ -876,7 +911,7 @@

if (!valid_brokers)
elog(ERROR, CONSUMER_LOG_PREFIX "no valid brokers were found",
- CONSUMER_LOG_PREFIX_PARAMS(consumer));
+ CONSUMER_LOG_PREFIX_PARAMS(&consumer));

/*
* Set up our topic to read from
@@ -886,7 +921,7 @@

if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
elog(ERROR, CONSUMER_LOG_PREFIX "failed to acquire metadata: %s",
- CONSUMER_LOG_PREFIX_PARAMS(consumer), rd_kafka_err2str(err));
+ CONSUMER_LOG_PREFIX_PARAMS(&consumer), rd_kafka_err2str(err));

Assert(meta->topic_cnt == 1);
topic_meta = meta->topics[0];
@@ -905,11 +940,11 @@
continue;

elog(LOG, CONSUMER_LOG_PREFIX "consuming partition %d from offset %ld",
- CONSUMER_LOG_PREFIX_PARAMS(consumer), partition, consumer.offsets[partition]);
+ CONSUMER_LOG_PREFIX_PARAMS(&consumer), partition, consumer.offsets[partition]);

if (rd_kafka_consume_start(topic, partition, consumer.offsets[partition]) == -1)
elog(ERROR, CONSUMER_LOG_PREFIX "failed to start consuming: %s",
- CONSUMER_LOG_PREFIX_PARAMS(consumer), rd_kafka_err2str(rd_kafka_errno2err(errno)));
+ CONSUMER_LOG_PREFIX_PARAMS(&consumer), rd_kafka_err2str(rd_kafka_errno2err(errno)));

my_partitions++;
}
@@ -930,6 +965,9 @@
ALLOCSET_DEFAULT_MAXSIZE);
messages = palloc0(sizeof(rd_kafka_message_t *) * consumer.batch_size);

+ /* set copy hook */
+ copy_iter_hook = copy_next;

/*
* Consume messages until we are terminated
*/
@@ -967,9 +1005,9 @@
/* Ignore partition EOF internal error */
if (messages[i]->err != RD_KAFKA_RESP_ERR__PARTITION_EOF)
elog(LOG, CONSUMER_LOG_PREFIX "librdkafka error: %s",
- CONSUMER_LOG_PREFIX_PARAMS(consumer), rd_kafka_err2str(messages[i]->err));
+ CONSUMER_LOG_PREFIX_PARAMS(&consumer), rd_kafka_err2str(messages[i]->err));
}
- else if (messages[i]->len > 0)
+ else if (messages[i]->len)
{
appendBinaryStringInfo(buf, messages[i]->payload, messages[i]->len);

@@ -986,44 +1024,28 @@
rd_kafka_message_destroy(messages[i]);
messages[i] = NULL;
}

+ /* Flush if we've buffered enough messages or space used by messages has exceeded buffer size threshold */
+ if (messages_buffered >= consumer.batch_size || buf->len >= consumer.max_bytes)
+ {
+ execute_copy(&consumer, proc, copy, buf, messages_buffered);
+ resetStringInfo(buf);
+ messages_buffered = 0;
+ }
}

librdkerrs = error_buf_pop(&my_error_buf);
if (librdkerrs)
elog(LOG, CONSUMER_LOG_PREFIX "librdkafka error: %s",
- CONSUMER_LOG_PREFIX_PARAMS(consumer), librdkerrs);
+ CONSUMER_LOG_PREFIX_PARAMS(&consumer), librdkerrs);

if (!messages_buffered)
{
pg_usleep(1000);
continue;
}

- StartTransactionCommand();
-
- /* we don't want to die in the event of any errors */
- PG_TRY();
- {
- if (messages_buffered)
- execute_copy(copy, buf);
- }
- PG_CATCH();
- {
- elog(LOG, CONSUMER_LOG_PREFIX "failed to process batch, dropped %d message%s",
- CONSUMER_LOG_PREFIX_PARAMS(consumer), messages_buffered, (messages_buffered == 1 ? "" : "s"));
- EmitErrorReport();
- FlushErrorState();
-
- AbortCurrentTransaction();
- }
- PG_END_TRY();
-
- if (!IsTransactionState())
- StartTransactionCommand();
-
- save_consumer_offsets(&consumer, proc->partition_group);
-
- CommitTransactionCommand();
+ execute_copy(&consumer, proc, copy, buf, messages_buffered);
}

done:
@@ -1045,7 +1067,8 @@
*/
static int32
create_or_update_consumer(ResultRelInfo *consumers, text *relation, text *topic,
- text *format, text *delimiter, text *quote, text *escape, int batchsize, int parallelism, int timeout)
+ text *format, text *delimiter, text *quote, text *escape, int batchsize, int maxbytes,
+ int parallelism, int timeout)
{
HeapTuple tup;
Datum values[CONSUMER_RELATION_NATTS];
Expand All @@ -1066,6 +1089,7 @@ create_or_update_consumer(ResultRelInfo *consumers, text *relation, text *topic,
tup = index_getnext(scan, ForwardScanDirection);

values[CONSUMER_ATTR_BATCH_SIZE - 1] = Int32GetDatum(batchsize);
+ values[CONSUMER_ATTR_MAX_BYTES - 1] = Int32GetDatum(maxbytes);
values[CONSUMER_ATTR_PARALLELISM - 1] = Int32GetDatum(parallelism);
values[CONSUMER_ATTR_TIMEOUT - 1] = Int32GetDatum(timeout);
values[CONSUMER_ATTR_FORMAT - 1] = PointerGetDatum(format);
@@ -1270,6 +1294,7 @@ kafka_consume_begin_tr(PG_FUNCTION_ARGS)
text *quote;
text *escape;
int batchsize;
+ int maxbytes;
int parallelism;
int timeout;
int64 offset;
@@ -1318,19 +1343,24 @@ kafka_consume_begin_tr(PG_FUNCTION_ARGS)
batchsize = PG_GETARG_INT32(6);

if (PG_ARGISNULL(7))
- parallelism = 1;
+ maxbytes = 32000000;
else
- parallelism = PG_GETARG_INT32(7);
+ maxbytes = PG_GETARG_INT32(7);

if (PG_ARGISNULL(8))
- timeout = 250;
+ parallelism = 1;
else
- timeout = PG_GETARG_INT32(8);
+ parallelism = PG_GETARG_INT32(8);

if (PG_ARGISNULL(9))
+ timeout = 250;
+ else
+ timeout = PG_GETARG_INT32(9);
+
+ if (PG_ARGISNULL(10))
offset = RD_KAFKA_OFFSET_NULL;
else
- offset = PG_GETARG_INT64(9);
+ offset = PG_GETARG_INT64(10);

/* there's no point in progressing if there aren't any brokers */
if (!get_all_brokers())
@@ -1349,7 +1379,7 @@

consumers = relinfo_open(get_rangevar(CONSUMER_RELATION), ExclusiveLock);
id = create_or_update_consumer(consumers, qualified_name, topic, format,
- delimiter, quote, escape, batchsize, parallelism, timeout);
+ delimiter, quote, escape, batchsize, maxbytes, parallelism, timeout);
load_consumer_state(id, &consumer);
success = launch_consumer_group(&consumer, offset);
relinfo_close(consumers, NoLock);
