Skip to content

Commit

Permalink
fix: use offset instead of OldStyleOffsets
Browse files Browse the repository at this point in the history
  • Loading branch information
childe committed Oct 8, 2024
1 parent a2e9b39 commit 2f8753a
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 37 deletions.
12 changes: 2 additions & 10 deletions command/healer/cmd/apicontrollers/group.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package apicontrollers

import (
"fmt"
"net/http"

"github.com/childe/healer"
Expand Down Expand Up @@ -85,16 +84,9 @@ func getOffset(brokers *healer.Brokers, topic, client string) (map[int32]int64,

rst := make(map[int32]int64)
for _, offsetsResponse := range offsetsResponses {
for topic, partitionOffsets := range offsetsResponse.TopicPartitionOffsets {
for _, partitionOffsets := range offsetsResponse.TopicPartitionOffsets {
for _, partitionOffset := range partitionOffsets {
if len(partitionOffset.OldStyleOffsets) == 0 {
rst[partitionOffset.Partition] = -1
continue
}
if len(partitionOffset.OldStyleOffsets) != 1 {
return nil, fmt.Errorf("%s[%d] offsets length mismatch: %v", topic, partitionOffset.Partition, partitionOffset.OldStyleOffsets)
}
rst[partitionOffset.Partition] = partitionOffset.OldStyleOffsets[0]
rst[partitionOffset.Partition] = partitionOffset.GetOffset()
}
}
}
Expand Down
11 changes: 1 addition & 10 deletions command/healer/cmd/get-offsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,7 @@ var getOffsetsCmd = &cobra.Command{
sort.Slice(allPartitions, func(i, j int) bool { return allPartitions[i].Partition < allPartitions[j].Partition })
for _, p := range allPartitions {
fmt.Printf("%s:%d:", topic, p.Partition)
if len(p.OldStyleOffsets) > 0 {
for i, offset := range p.OldStyleOffsets {
if i != 0 {
fmt.Print(",")
}
fmt.Printf("%d", offset)
}
} else {
fmt.Printf("%d %d", p.Timestamp, p.Offset)
}
fmt.Printf("%d %d", p.Timestamp, p.GetOffset())
fmt.Println()
}

Expand Down
11 changes: 2 additions & 9 deletions command/healer/cmd/get-pending.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,9 @@ func getOffset(topic, client string) (map[int32]int64, error) {

rst := make(map[int32]int64)
for _, offsetsResponse := range offsetsResponses {
for topic, partitionOffsets := range offsetsResponse.TopicPartitionOffsets {
for _, partitionOffsets := range offsetsResponse.TopicPartitionOffsets {
for _, partitionOffset := range partitionOffsets {
if len(partitionOffset.OldStyleOffsets) == 0 {
rst[partitionOffset.Partition] = -1
continue
}
if len(partitionOffset.OldStyleOffsets) != 1 {
return nil, fmt.Errorf("%s[%d] offsets length mismatch: %v", topic, partitionOffset.Partition, partitionOffset.OldStyleOffsets)
}
rst[partitionOffset.Partition] = partitionOffset.OldStyleOffsets[0]
rst[partitionOffset.Partition] = partitionOffset.GetOffset()
}
}
}
Expand Down
9 changes: 2 additions & 7 deletions command/healer/cmd/reset-offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,10 @@ var resetOffsetCmd = &cobra.Command{
if err := offsetsResponse.Error(); err != nil {
return fmt.Errorf("request offsets error: %w. topic: %s, timestmap: %d", err, topic, timestamp)
}
for topic, partitionOffsets := range offsetsResponse.TopicPartitionOffsets {
for _, partitionOffsets := range offsetsResponse.TopicPartitionOffsets {
for _, partitionOffset := range partitionOffsets {
partition := partitionOffset.Partition
if len(partitionOffset.OldStyleOffsets) == 0 {
return fmt.Errorf("offsets of %s[%d] is blank", topic, partition)
}
klog.Infof("%s:%d:%v", topic, partition, partitionOffset.OldStyleOffsets)
offset := int64(partitionOffset.OldStyleOffsets[0])
offsets[partition] = offset
offsets[partition] = partitionOffset.GetOffset()
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions offset_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@ type PartitionOffset struct {
Timestamp int64
Offset int64
}

// get the offset of the given partition from OldStyleOffsets or Offset
func (p *PartitionOffset) GetOffset() int64 {
if len(p.OldStyleOffsets) == 0 {
return p.Offset
}
if len(p.OldStyleOffsets) == 1 {
return p.OldStyleOffsets[0]
}
panic("more than one old style offset found in offset response")
}

type OffsetsResponse struct {
CorrelationID uint32
ThrottleTimeMs int32
Expand Down
2 changes: 1 addition & 1 deletion simple_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (c *SimpleConsumer) getOffset(fromBeginning bool) (int64, error) {
if err := offsetsResponse.Error(); err != nil {
return -1, err
}
return int64(offsetsResponse.TopicPartitionOffsets[c.topic][0].OldStyleOffsets[0]), nil
return int64(offsetsResponse.TopicPartitionOffsets[c.topic][0].GetOffset()), nil
}

func (c *SimpleConsumer) getCommitedOffet() error {
Expand Down

0 comments on commit 2f8753a

Please sign in to comment.