diff --git a/fetch_response.go b/fetch_response.go index d9b7870..a3081d5 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -110,6 +110,7 @@ func uncompress(compress int8, reader io.Reader) (uncompressedBytes []byte, err func (streamDecoder *fetchResponseStreamDecoder) decodeMessageSetMagic0or1(topicName string, partitionID int32, magic int, header17 []byte) (offset int, err error) { firstMessageSet := true + // value is the bytes of whole record consists of (offset(int64) message_size(int32) message) var value []byte for { if firstMessageSet { @@ -123,20 +124,20 @@ func (streamDecoder *fetchResponseStreamDecoder) decodeMessageSetMagic0or1(topic // 12 is the size of offset(int64) & message_size(int32) in header17 value = make([]byte, 12+messageSize) - copy(value, header17) n, e := streamDecoder.Read(value[17:]) + offset += n if e != nil { return offset, e } - // remaining bytes is not complete - if n < messageSize-17 { - return + // remaining bytes is incomplete + if n < len(value)-17 { + return offset, err } - - offset += messageSize + 12 + copy(value, header17) } else { buf, n, err := streamDecoder.read(12) + offset += n if err != nil { return offset, err } @@ -146,15 +147,16 @@ func (streamDecoder *fetchResponseStreamDecoder) decodeMessageSetMagic0or1(topic messageSize := int(binary.BigEndian.Uint32(buf[8:])) value = make([]byte, 12+messageSize) n, err = streamDecoder.Read(value[12:]) + offset += n if err != nil { return offset, err } - if n < messageSize { + + // remaining bytes is incomplete + if n < len(value)-17 { return offset, err } copy(value, buf) - - offset += messageSize + 12 } messageSet, err := DecodeToMessageSet(value) @@ -275,7 +277,8 @@ func (streamDecoder *fetchResponseStreamDecoder) decodeRecordsMagic2(topicName s return offset, nil } -// messageSetSizeBytes may include more the one `Record Batch`, that is, `Record Batch`,`Record Batch`,`Record Batch`... +// messageSetSizeBytes may include more the one `Record Batch`, +// that is, `Record Batch`,`Record Batch`,`Record Batch`... func (streamDecoder *fetchResponseStreamDecoder) decodeMessageSet(topicName string, partitionID int32, messageSetSizeBytes int32, version uint16) (err error) { defer func() { if err == io.EOF || err == &maxBytesTooSmall {