Skip to content

Commit

Permalink
[fix] Only decompress the payload if it's not empty
Browse files Browse the repository at this point in the history
The message payload is optional and in some cases only message
properties are sent. In this case, the message decompression would fail
so we only want to do the decompression if the payload is not empty.
  • Loading branch information
stepanbujnak committed Sep 4, 2024
1 parent 953d9ea commit 8a4e24b
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1099,15 +1099,18 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
}
}

// decryption is success, decompress the payload
uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, processedPayloadBuffer)
if err != nil {
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError)
return err
}
var uncompressedHeadersAndPayload internal.Buffer
// decryption is success, decompress the payload, but only if payload is not empty
if n := msgMeta.UncompressedSize; n != nil && *n > 0 {
uncompressedHeadersAndPayload, err = pc.Decompress(msgMeta, processedPayloadBuffer)
if err != nil {
pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError)
return err
}

// Reset the reader on the uncompressed buffer
reader.ResetBuffer(uncompressedHeadersAndPayload)
// Reset the reader on the uncompressed buffer
reader.ResetBuffer(uncompressedHeadersAndPayload)
}

numMsgs := 1
if msgMeta.NumMessagesInBatch != nil {
Expand Down

0 comments on commit 8a4e24b

Please sign in to comment.