Skip to content

Commit

Permalink
Logs
Browse files Browse the repository at this point in the history
  • Loading branch information
nobl9-adam-szymanski committed Aug 5, 2023
1 parent 6ed18c7 commit 489b414
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 41 deletions.
Binary file modified .DS_Store
Binary file not shown.
4 changes: 2 additions & 2 deletions partition_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,8 +641,8 @@ func (pp *PartitionProcessor) processMessage(ctx context.Context, wg *sync.WaitG
err error
)

pp.log.Debugf("processing message topic=%s offset=%v partition=%v key=%s",
msg.topic, msg.offset, msg.partition, msg.key)
//pp.log.Debugf("processing message topic=%s offset=%v partition=%v key=%s",
// msg.topic, msg.offset, msg.partition, msg.key)

// decide whether to decode or ignore message
switch {
Expand Down
59 changes: 30 additions & 29 deletions partition_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,41 +463,42 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition
}
case msg, ok := <-messages:
if !ok {
p.log.Debugf(
"load messages exit with msg !ok (1) = topic=%s, partition=%v",
p.topic,
p.partition,
)
p.log.Debugf(
"load messages exit with msg !ok (2) = topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v",
p.topic,
p.partition,
string(msg.Key),
msg.Offset,
msg.Timestamp,
)
//p.log.Debugf(
// "load messages exit with msg !ok (1) = topic=%s, partition=%v",
// p.topic,
// p.partition,
//)
//p.log.Debugf(
// "load messages exit with msg !ok (2) = topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v",
// p.topic,
// p.partition,
// string(msg.Key),
// msg.Offset,
// msg.Timestamp,
//)
return nil
}

// This case is for the Tester to achieve synchronity.
// Nil messages are never generated by the Sarama Consumer
if msg == nil {
p.log.Debugf(
"load messages continue msg == nil = topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v",
p.topic,
p.partition,
)
//p.log.Debugf(
// "load messages continue msg == nil = topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v",
// p.topic,
// p.partition,
//)
continue
}

if msg.Topic == "anomalydetector-nodata-rules" {
if p.topic == "anomalydetector-nodata-rules" {
p.log.Debugf(
"load messages = topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v",
"load messages = topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v, partitionHwm=%v",
msg.Topic,
msg.Partition,
string(msg.Key),
msg.Offset,
msg.Timestamp,
partitionHwm,
)
}

Expand Down Expand Up @@ -627,15 +628,15 @@ func (p *PartitionTable) updateHwmStats() {
}

func (p *PartitionTable) storeEvent(key string, value []byte, offset int64, headers []*sarama.RecordHeader) error {
if p.topic == "anomalydetector-nodata-rules" {
p.log.Debugf(
"store event = topic=%s, partition=%v, key=%s, offset=%v",
p.topic,
p.partition,
key,
offset,
)
}
//if p.topic == "anomalydetector-nodata-rules" {
// p.log.Debugf(
// "store event = topic=%s, partition=%v, key=%s, offset=%v",
// p.topic,
// p.partition,
// key,
// offset,
// )
//}

err := p.st.Update(&DefaultUpdateContext{
topic: p.st.topic,
Expand Down
20 changes: 10 additions & 10 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,16 +823,16 @@ func (g *Processor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sara
headers: msg.Headers,
value: msg.Value,
}
if msg.Topic == "anomalydetector-nodata-rules" {
g.log.Debugf(
"consume claim - topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v, value=%v",
msg.Topic,
msg.Partition,
string(msg.Key),
msg.Offset,
msg.Timestamp,
)
}
//if msg.Topic == "anomalydetector-nodata-rules" {
// g.log.Debugf(
// "consume claim - topic=%s, partition=%v, key=%s, offset=%v, timestamp=%v, value=%v",
// msg.Topic,
// msg.Partition,
// string(msg.Key),
// msg.Offset,
// msg.Timestamp,
// )
//}
return m
}():
case <-stopping:
Expand Down

0 comments on commit 489b414

Please sign in to comment.