Skip to content

Commit

Permalink
Optimize decoding logic in fetch_response.go
Browse files Browse the repository at this point in the history
- fix returned offset value in decodeRecordsMagic2
- return if remaining bytes is incomplete
  • Loading branch information
childe committed Oct 15, 2024
1 parent c9310ad commit e0a9022
Showing 1 changed file with 13 additions and 10 deletions.
23 changes: 13 additions & 10 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func uncompress(compress int8, reader io.Reader) (uncompressedBytes []byte, err

func (streamDecoder *fetchResponseStreamDecoder) decodeMessageSetMagic0or1(topicName string, partitionID int32, magic int, header17 []byte) (offset int, err error) {
firstMessageSet := true
// value is the bytes of whole record consists of (offset(int64) message_size(int32) message)
var value []byte
for {
if firstMessageSet {
Expand All @@ -123,20 +124,20 @@ func (streamDecoder *fetchResponseStreamDecoder) decodeMessageSetMagic0or1(topic

// 12 is the size of offset(int64) & message_size(int32) in header17
value = make([]byte, 12+messageSize)
copy(value, header17)
n, e := streamDecoder.Read(value[17:])
offset += n
if e != nil {
return offset, e
}

// remaining bytes is not complete
if n < messageSize-17 {
return
// remaining bytes is incomplete
if n < len(value)-17 {
return offset, err
}

offset += messageSize + 12
copy(value, header17)
} else {
buf, n, err := streamDecoder.read(12)
offset += n
if err != nil {
return offset, err
}
Expand All @@ -146,15 +147,16 @@ func (streamDecoder *fetchResponseStreamDecoder) decodeMessageSetMagic0or1(topic
messageSize := int(binary.BigEndian.Uint32(buf[8:]))
value = make([]byte, 12+messageSize)
n, err = streamDecoder.Read(value[12:])
offset += n
if err != nil {
return offset, err
}
if n < messageSize {

// remaining bytes is incomplete
if n < len(value)-17 {
return offset, err
}
copy(value, buf)

offset += messageSize + 12
}

messageSet, err := DecodeToMessageSet(value)
Expand Down Expand Up @@ -275,7 +277,8 @@ func (streamDecoder *fetchResponseStreamDecoder) decodeRecordsMagic2(topicName s
return offset, nil
}

// messageSetSizeBytes may include more the one `Record Batch`, that is, `Record Batch`,`Record Batch`,`Record Batch`...
// messageSetSizeBytes may include more the one `Record Batch`,
// that is, `Record Batch`,`Record Batch`,`Record Batch`...
func (streamDecoder *fetchResponseStreamDecoder) decodeMessageSet(topicName string, partitionID int32, messageSetSizeBytes int32, version uint16) (err error) {
defer func() {
if err == io.EOF || err == &maxBytesTooSmall {
Expand Down

0 comments on commit e0a9022

Please sign in to comment.