diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go old mode 100644 new mode 100755 index 837d1d78e..f6c872e8c --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -702,17 +702,19 @@ func addRequestToBatch(smm *pb.SingleMessageMetadata, p *partitionProducer, uncompressedPayload []byte, request *sendRequest, msg *ProducerMessage, deliverAt time.Time, schemaVersion []byte, multiSchemaEnabled bool) bool { - var ok bool + var useTxn bool + var mostSigBits uint64 + var leastSigBits uint64 if request.transaction != nil { txnID := request.transaction.GetTxnID() - ok = p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request, - msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, true, txnID.MostSigBits, - txnID.LeastSigBits) - } else { - ok = p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request, - msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, false, 0, 0) + useTxn = true + mostSigBits = txnID.MostSigBits + leastSigBits = txnID.LeastSigBits } - return ok + + return p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request, + msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, useTxn, mostSigBits, + leastSigBits) } func (p *partitionProducer) genMetadata(msg *ProducerMessage, @@ -756,6 +758,14 @@ func (p *partitionProducer) updateMetadataSeqID(mm *pb.MessageMetadata, msg *Pro } } +func (p *partitionProducer) updateSingleMessageMetadataSeqID(smm *pb.SingleMessageMetadata, msg *ProducerMessage) { + if msg.SequenceID != nil { + smm.SequenceId = proto.Uint64(uint64(*msg.SequenceID)) + } else { + smm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1)) + } +} + func (p *partitionProducer) genSingleMessageMetadataInBatch(msg *ProducerMessage, uncompressedSize int) (smm *pb.SingleMessageMetadata) { smm = &pb.SingleMessageMetadata{ @@ -778,14 +788,7 @@ func (p *partitionProducer) genSingleMessageMetadataInBatch(msg *ProducerMessage smm.Properties = internal.ConvertFromStringMap(msg.Properties) } - var sequenceID uint64 - if msg.SequenceID != nil { - sequenceID = uint64(*msg.SequenceID) - } else { - sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1) - } - - smm.SequenceId = proto.Uint64(sequenceID) + p.updateSingleMessageMetadataSeqID(smm, msg) return } @@ -805,35 +808,30 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata, } sid := *mm.SequenceId - var err error + var useTxn bool + var mostSigBits uint64 + var leastSigBits uint64 + if request.transaction != nil { txnID := request.transaction.GetTxnID() - err = internal.SingleSend( - buffer, - p.producerID, - sid, - mm, - payloadBuf, - p.encryptor, - maxMessageSize, - true, - txnID.MostSigBits, - txnID.LeastSigBits, - ) - } else { - err = internal.SingleSend( - buffer, - p.producerID, - sid, - mm, - payloadBuf, - p.encryptor, - maxMessageSize, - false, - 0, - 0, - ) - } + useTxn = true + mostSigBits = txnID.MostSigBits + leastSigBits = txnID.LeastSigBits + } + + err := internal.SingleSend( + buffer, + p.producerID, + sid, + mm, + payloadBuf, + p.encryptor, + maxMessageSize, + useTxn, + mostSigBits, + leastSigBits, + ) + if err != nil { request.callback(nil, request.msg, err) p.releaseSemaphoreAndMem(int64(len(msg.Payload))) @@ -993,7 +991,7 @@ func (p *partitionProducer) failTimeoutMessages() { } } - // flag the send has completed with error, flush make no effect + // flag the sending has completed with error, flush make no effect pi.Complete() pi.Unlock() @@ -1106,10 +1104,12 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool) { - //Register transaction operation to transaction and the transaction coordinator. + // Register transaction operation to transaction and the transaction coordinator. var newCallback func(MessageID, *ProducerMessage, error) + var txn *transaction if msg.Transaction != nil { transactionImpl := (msg.Transaction).(*transaction) + txn = transactionImpl if transactionImpl.state != TxnOpen { p.log.WithField("state", transactionImpl.state).Error("Failed to send message" + " by a non-open transaction.") @@ -1142,10 +1142,6 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer // callbackOnce make sure the callback is only invoked once in chunking callbackOnce := &sync.Once{} - var txn *transaction - if msg.Transaction != nil { - txn = (msg.Transaction).(*transaction) - } sr := &sendRequest{ ctx: ctx, msg: msg, @@ -1392,7 +1388,7 @@ func (p *partitionProducer) _setConn(conn internal.Connection) { // _getConn returns internal connection field of this partition producer atomically. // Note: should only be called by this partition producer before attempting to use the connection func (p *partitionProducer) _getConn() internal.Connection { - // Invariant: The conn must be non-nill for the lifetime of the partitionProducer. + // Invariant: The conn must be non-nil for the lifetime of the partitionProducer. // For this reason we leave this cast unchecked and panic() if the // invariant is broken return p.conn.Load().(internal.Connection)