Skip to content

Commit

Permalink
[Fix] check if callback is nil before calling it
Browse files Browse the repository at this point in the history
  • Loading branch information
gunli committed Jun 25, 2023
1 parent 7f91b2b commit 041fd4a
Showing 1 changed file with 30 additions and 28 deletions.
58 changes: 30 additions & 28 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,14 @@ func (p *partitionProducer) Name() string {
return p.producerName
}

func runCallback(cb func(MessageID, *ProducerMessage, error), id MessageID, msg *ProducerMessage, err error) {
if cb == nil {
return
}

cb(id, msg, err)
}

func (p *partitionProducer) internalSend(request *sendRequest) {
p.log.Debug("Received send request: ", *request.msg)

Expand All @@ -480,7 +488,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
var err error
if msg.Value != nil && msg.Payload != nil {
p.log.Error("Can not set Value and Payload both")
request.callback(nil, request.msg, errors.New("can not set Value and Payload both"))
runCallback(request.callback, nil, request.msg, errors.New("can not set Value and Payload both"))
return
}

Expand All @@ -494,7 +502,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
if msg.Schema != nil && p.options.Schema != nil &&
msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
request.callback(nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
runCallback(request.callback, nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
return
}
Expand All @@ -513,7 +521,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
schemaPayload, err = schema.Encode(msg.Value)
if err != nil {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
request.callback(nil, request.msg, newError(SchemaFailure, err.Error()))
runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error()))
p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
return
}
Expand All @@ -530,7 +538,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
if err != nil {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
p.log.WithError(err).Error("get schema version fail")
request.callback(nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err))
runCallback(request.callback, nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err))
return
}
p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
Expand Down Expand Up @@ -589,7 +597,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
// if msg is too large and chunking is disabled
if checkSize > maxMessageSize && !p.options.EnableChunking {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
request.callback(nil, request.msg, errMessageTooLarge)
runCallback(request.callback, nil, request.msg, errMessageTooLarge)
p.log.WithError(errMessageTooLarge).
WithField("size", checkSize).
WithField("properties", msg.Properties).
Expand All @@ -608,7 +616,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm)
if payloadChunkSize <= 0 {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
request.callback(nil, msg, errMetaTooLarge)
runCallback(request.callback, nil, msg, errMetaTooLarge)
p.log.WithError(errMetaTooLarge).
WithField("metadata size", proto.Size(mm)).
WithField("properties", msg.Properties).
Expand Down Expand Up @@ -683,7 +691,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
multiSchemaEnabled); !ok {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
request.callback(nil, request.msg, errFailAddToBatch)
runCallback(request.callback, nil, request.msg, errFailAddToBatch)
p.log.WithField("size", uncompressedSize).
WithField("properties", msg.Properties).
Error("unable to add message to batch")
Expand Down Expand Up @@ -831,7 +839,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
)
}
if err != nil {
request.callback(nil, request.msg, err)
runCallback(request.callback, nil, request.msg, err)
p.releaseSemaphoreAndMem(int64(len(msg.Payload)))
p.log.WithError(err).Errorf("Single message serialize failed %s", msg.Value)
return
Expand Down Expand Up @@ -871,7 +879,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
if err != nil {
for _, cb := range callbacks {
if sr, ok := cb.(*sendRequest); ok {
sr.callback(nil, sr.msg, err)
runCallback(sr.callback, nil, sr.msg, err)
}
}
if errors.Is(err, internal.ErrExceedMaxMessageSize) {
Expand Down Expand Up @@ -981,7 +989,7 @@ func (p *partitionProducer) failTimeoutMessages() {

if sr.callback != nil {
sr.callbackOnce.Do(func() {
sr.callback(nil, sr.msg, errSendTimeout)
runCallback(sr.callback, nil, sr.msg, errSendTimeout)
})
}
if sr.transaction != nil {
Expand Down Expand Up @@ -1014,7 +1022,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
if errs[i] != nil {
for _, cb := range callbacks[i] {
if sr, ok := cb.(*sendRequest); ok {
sr.callback(nil, sr.msg, errs[i])
runCallback(sr.callback, nil, sr.msg, errs[i])
}
}
if errors.Is(errs[i], internal.ErrExceedMaxMessageSize) {
Expand Down Expand Up @@ -1102,34 +1110,34 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,

func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
//Register transaction operation to transaction and the transaction coordinator.
// Register transaction operation to transaction and the transaction coordinator.
var newCallback func(MessageID, *ProducerMessage, error)
if msg.Transaction != nil {
transactionImpl := (msg.Transaction).(*transaction)
if transactionImpl.state != TxnOpen {
p.log.WithField("state", transactionImpl.state).Error("Failed to send message" +
" by a non-open transaction.")
callback(nil, msg, newError(InvalidStatus, "Failed to send message by a non-open transaction."))
runCallback(callback, nil, msg, newError(InvalidStatus, "Failed to send message by a non-open transaction."))
return
}

if err := transactionImpl.registerProducerTopic(p.topic); err != nil {
callback(nil, msg, err)
runCallback(callback, nil, msg, err)
return
}
if err := transactionImpl.registerSendOrAckOp(); err != nil {
callback(nil, msg, err)
runCallback(callback, nil, msg, err)
}
newCallback = func(id MessageID, producerMessage *ProducerMessage, err error) {
callback(id, producerMessage, err)
runCallback(callback, id, producerMessage, err)
transactionImpl.endSendOrAckOp(err)
}
} else {
newCallback = callback
}
if p.getProducerState() != producerReady {
// Producer is closing
newCallback(nil, msg, errProducerClosed)
runCallback(newCallback, nil, msg, errProducerClosed)
return
}

Expand Down Expand Up @@ -1249,9 +1257,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
}

if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
if sr.callback != nil {
sr.callback(msgID, sr.msg, nil)
}
runCallback(sr.callback, msgID, sr.msg, nil)
p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
}
}
Expand Down Expand Up @@ -1402,27 +1408,23 @@ func (p *partitionProducer) releaseSemaphoreAndMem(size int64) {
func (p *partitionProducer) canAddToQueue(sr *sendRequest, uncompressedPayloadSize int64) bool {
if p.options.DisableBlockIfQueueFull {
if !p.publishSemaphore.TryAcquire() {
if sr.callback != nil {
sr.callback(nil, sr.msg, errSendQueueIsFull)
}
runCallback(sr.callback, nil, sr.msg, errSendQueueIsFull)
return false
}
if !p.client.memLimit.TryReserveMemory(uncompressedPayloadSize) {
p.publishSemaphore.Release()
if sr.callback != nil {
sr.callback(nil, sr.msg, errMemoryBufferIsFull)
}
runCallback(sr.callback, nil, sr.msg, errMemoryBufferIsFull)
return false
}

} else {
if !p.publishSemaphore.Acquire(sr.ctx) {
sr.callback(nil, sr.msg, errContextExpired)
runCallback(sr.callback, nil, sr.msg, errContextExpired)
return false
}
if !p.client.memLimit.ReserveMemory(sr.ctx, uncompressedPayloadSize) {
p.publishSemaphore.Release()
sr.callback(nil, sr.msg, errContextExpired)
runCallback(sr.callback, nil, sr.msg, errContextExpired)
return false
}
}
Expand Down

0 comments on commit 041fd4a

Please sign in to comment.