diff --git a/partition_table.go b/partition_table.go index 7985d225..f81cce85 100644 --- a/partition_table.go +++ b/partition_table.go @@ -461,7 +461,7 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition continue } - if msg.Topic == "anomalydetector-nodata-rules-table" { + if msg.Topic == "anomalydetector-nodata-rules" { p.log.Debugf( "load messages = topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v", msg.Topic, @@ -598,7 +598,7 @@ func (p *PartitionTable) updateHwmStats() { } func (p *PartitionTable) storeEvent(key string, value []byte, offset int64, headers []*sarama.RecordHeader) error { - if p.topic == "anomalydetector-nodata-rules-table" { + if p.topic == "anomalydetector-nodata-rules" { p.log.Debugf( "store event = topic=%s, partition=%v, key=%s, offset=%v", p.topic, diff --git a/processor.go b/processor.go index 5e146674..f3e58031 100644 --- a/processor.go +++ b/processor.go @@ -823,16 +823,16 @@ func (g *Processor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sara headers: msg.Headers, value: msg.Value, } - //if msg.Topic == "anomalydetector-nodata-rules-table" { - // g.log.Debugf( - // "topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v, value=%v", - // msg.Topic, - // msg.Partition, - // string(msg.Key), - // msg.Offset, - // msg.Timestamp, - // ) - //} + if msg.Topic == "anomalydetector-nodata-rules" { + g.log.Debugf( + "consume claim - topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v, value=%v", + msg.Topic, + msg.Partition, + string(msg.Key), + msg.Offset, + msg.Timestamp, + ) + } return m }(): case <-stopping: