Skip to content

Commit

Permalink
Merge pull request #28 from childe/fetch-version-7
Browse files Browse the repository at this point in the history
Fetch version 7
  • Loading branch information
childe authored Oct 17, 2024
2 parents e0a9022 + 586bd4b commit ee0b1d0
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
12 changes: 6 additions & 6 deletions fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (fetchRequest *FetchRequest) addPartition(topic string, partitionID int32,
}

if value, ok := fetchRequest.Topics[topic]; ok {
value = append(value, partitionBlock)
fetchRequest.Topics[topic] = append(value, partitionBlock)
} else {
fetchRequest.Topics[topic] = []*PartitionBlock{partitionBlock}
}
Expand Down Expand Up @@ -94,17 +94,17 @@ func (fetchRequest *FetchRequest) Encode(version uint16) []byte {
binary.BigEndian.PutUint32(payload[offset:], uint32(fetchRequest.MinBytes))
offset += 4

if version >= 10 {
if version >= 7 {
binary.BigEndian.PutUint32(payload[offset:], uint32(fetchRequest.MaxBytes))
offset += 4
payload[offset] = byte(fetchRequest.ISOLationLevel)
offset++
}
if version >= 10 {
if version >= 7 {
binary.BigEndian.PutUint32(payload[offset:], uint32(fetchRequest.SessionID))
offset += 4
}
if version >= 10 {
if version >= 7 {
binary.BigEndian.PutUint32(payload[offset:], uint32(fetchRequest.SessionEpoch))
offset += 4
}
Expand All @@ -127,7 +127,7 @@ func (fetchRequest *FetchRequest) Encode(version uint16) []byte {
}
binary.BigEndian.PutUint64(payload[offset:], uint64(partitionBlock.FetchOffset))
offset += 8
if version >= 10 {
if version >= 7 {
binary.BigEndian.PutUint64(payload[offset:], uint64(partitionBlock.LogStartOffset))
offset += 8
}
Expand All @@ -136,7 +136,7 @@ func (fetchRequest *FetchRequest) Encode(version uint16) []byte {
}
}

if version >= 10 {
if version >= 7 {
binary.BigEndian.PutUint32(payload[offset:], uint32(len(fetchRequest.ForgottenTopicsDatas)))
offset += 4
for topicName, partitions := range fetchRequest.ForgottenTopicsDatas {
Expand Down
4 changes: 2 additions & 2 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (streamDecoder *fetchResponseStreamDecoder) decodePartitionResponse(topicNa
switch version {
case 0:
bytesBeforeRecordsLength = 18
case 10:
case 7, 10:
bytesBeforeRecordsLength = 38
}
buffer, n, err = streamDecoder.read(bytesBeforeRecordsLength)
Expand Down Expand Up @@ -424,7 +424,7 @@ func (streamDecoder *fetchResponseStreamDecoder) decodeHeader(version uint16) er
case 0:
headerLength = 8
countOffset = 4
case 10:
case 7, 10:
headerLength = 18
countOffset = 14
}
Expand Down
2 changes: 1 addition & 1 deletion request.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
// It must be sorted from high to low
var availableVersions map[uint16][]uint16 = map[uint16][]uint16{
API_MetadataRequest: {7, 1},
API_FetchRequest: {10, 0},
API_FetchRequest: {10, 7, 0},
API_OffsetRequest: {1, 0},
API_CreatePartitions: {2, 0},
API_SaslHandshake: {1, 0},
Expand Down

0 comments on commit ee0b1d0

Please sign in to comment.