diff --git a/.DS_Store b/.DS_Store index 517e95fc..960f17b6 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/partition_processor.go b/partition_processor.go index b8f4d6c0..4e639f21 100644 --- a/partition_processor.go +++ b/partition_processor.go @@ -641,8 +641,8 @@ func (pp *PartitionProcessor) processMessage(ctx context.Context, wg *sync.WaitG err error ) - pp.log.Debugf("processing message topic=%s offset=%v partition=%v key=%s", - msg.topic, msg.offset, msg.partition, msg.key) + //pp.log.Debugf("processing message topic=%s offset=%v partition=%v key=%s", + // msg.topic, msg.offset, msg.partition, msg.key) // decide whether to decode or ignore message switch { diff --git a/partition_table.go b/partition_table.go index 8d676145..c38c3955 100644 --- a/partition_table.go +++ b/partition_table.go @@ -463,41 +463,42 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition } case msg, ok := <-messages: if !ok { - p.log.Debugf( - "load messages exit with msg !ok (1) = topic=%s, partition=%v", - p.topic, - p.partition, - ) - p.log.Debugf( - "load messages exit with msg !ok (2) = topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v", - p.topic, - p.partition, - string(msg.Key), - msg.Offset, - msg.Timestamp, - ) + //p.log.Debugf( + // "load messages exit with msg !ok (1) = topic=%s, partition=%v", + // p.topic, + // p.partition, + //) + //p.log.Debugf( + // "load messages exit with msg !ok (2) = topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v", + // p.topic, + // p.partition, + // string(msg.Key), + // msg.Offset, + // msg.Timestamp, + //) return nil } // This case is for the Tester to achieve synchronity. // Nil messages are never generated by the Sarama Consumer if msg == nil { - p.log.Debugf( - "load messages continue msg == nil = topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v", - p.topic, - p.partition, - ) + //p.log.Debugf( + // "load messages continue msg == nil = topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v", + // p.topic, + // p.partition, + //) continue } - if msg.Topic == "anomalydetector-nodata-rules" { + if p.topic == "anomalydetector-nodata-rules" { p.log.Debugf( - "load messages = topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v", + "load messages = topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v, partitionHwm=%v", msg.Topic, msg.Partition, string(msg.Key), msg.Offset, msg.Timestamp, + partitionHwm, ) } @@ -627,15 +628,15 @@ func (p *PartitionTable) updateHwmStats() { } func (p *PartitionTable) storeEvent(key string, value []byte, offset int64, headers []*sarama.RecordHeader) error { - if p.topic == "anomalydetector-nodata-rules" { - p.log.Debugf( - "store event = topic=%s, partition=%v, key=%s, offset=%v", - p.topic, - p.partition, - key, - offset, - ) - } + //if p.topic == "anomalydetector-nodata-rules" { + // p.log.Debugf( + // "store event = topic=%s, partition=%v, key=%s, offset=%v", + // p.topic, + // p.partition, + // key, + // offset, + // ) + //} err := p.st.Update(&DefaultUpdateContext{ topic: p.st.topic, diff --git a/processor.go b/processor.go index f3e58031..cb639742 100644 --- a/processor.go +++ b/processor.go @@ -823,16 +823,16 @@ func (g *Processor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sara headers: msg.Headers, value: msg.Value, } - if msg.Topic == "anomalydetector-nodata-rules" { - g.log.Debugf( - "consume claim - topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v, value=%v", - msg.Topic, - msg.Partition, - string(msg.Key), - msg.Offset, - msg.Timestamp, - ) - } + //if msg.Topic == "anomalydetector-nodata-rules" { + // g.log.Debugf( + // "consume claim - topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v, value=%v", + // msg.Topic, + // msg.Partition, + // string(msg.Key), + // msg.Offset, + // msg.Timestamp, + // ) + //} return m }(): case <-stopping: