From 76dd0ef2a07e69b0e802a1a41d360de4fe6c6301 Mon Sep 17 00:00:00 2001 From: gunli Date: Mon, 26 Jun 2023 11:34:39 +0800 Subject: [PATCH 1/7] [Refactor] refactor duplicated code lines and fix typo errors --- pulsar/producer_partition.go | 102 +++++++++++++++++------------------ 1 file changed, 50 insertions(+), 52 deletions(-) mode change 100644 => 100755 pulsar/producer_partition.go diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go old mode 100644 new mode 100755 index 837d1d78e6..f94a262455 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -29,7 +29,7 @@ import ( "time" "github.com/apache/pulsar-client-go/pulsar/internal/compression" - internalcrypto "github.com/apache/pulsar-client-go/pulsar/internal/crypto" + internalCrypto "github.com/apache/pulsar-client-go/pulsar/internal/crypto" "google.golang.org/protobuf/proto" @@ -80,7 +80,7 @@ type partitionProducer struct { batchBuilder internal.BatchBuilder sequenceIDGenerator *uint64 batchFlushTicker *time.Ticker - encryptor internalcrypto.Encryptor + encryptor internalCrypto.Encryptor compressionProvider compression.Provider // Channel where app is posting messages to be published @@ -264,12 +264,12 @@ func (p *partitionProducer) grabCnx() error { p.topicEpoch = &nextTopicEpoch if p.options.Encryption != nil { - p.encryptor = internalcrypto.NewProducerEncryptor(p.options.Encryption.Keys, + p.encryptor = internalCrypto.NewProducerEncryptor(p.options.Encryption.Keys, p.options.Encryption.KeyReader, p.options.Encryption.MessageCrypto, p.options.Encryption.ProducerCryptoFailureAction, p.log) } else { - p.encryptor = internalcrypto.NewNoopEncryptor() + p.encryptor = internalCrypto.NewNoopEncryptor() } if p.sequenceIDGenerator == nil { @@ -499,6 +499,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { return } } + var schema Schema var schemaVersion []byte if msg.Schema != nil { @@ -506,6 +507,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { } else if p.options.Schema != nil { schema = p.options.Schema } + if msg.Value != nil { // payload and schema are mutually exclusive // try to get payload from schema value only if payload is not set @@ -519,6 +521,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { } } } + if uncompressedPayload == nil { uncompressedPayload = schemaPayload } @@ -690,10 +693,9 @@ func (p *partitionProducer) internalSend(request *sendRequest) { return } } - if request.flushImmediately { + if request.flushImmediately { p.internalFlushCurrentBatch() - } } } @@ -702,17 +704,18 @@ func addRequestToBatch(smm *pb.SingleMessageMetadata, p *partitionProducer, uncompressedPayload []byte, request *sendRequest, msg *ProducerMessage, deliverAt time.Time, schemaVersion []byte, multiSchemaEnabled bool) bool { - var ok bool + var useTxn bool + var mostSigBits uint64 + var leastSigBits uint64 if request.transaction != nil { txnID := request.transaction.GetTxnID() - ok = p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request, - msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, true, txnID.MostSigBits, - txnID.LeastSigBits) - } else { - ok = p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request, - msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, false, 0, 0) + mostSigBits = txnID.MostSigBits + leastSigBits = txnID.LeastSigBits } - return ok + + return p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request, + msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, useTxn, mostSigBits, + leastSigBits) } func (p *partitionProducer) genMetadata(msg *ProducerMessage, @@ -756,6 +759,14 @@ func (p *partitionProducer) updateMetadataSeqID(mm *pb.MessageMetadata, msg *Pro } } +func (p *partitionProducer) updateSingleMessageMetadataSeqID(smm *pb.SingleMessageMetadata, msg *ProducerMessage) { + if msg.SequenceID != nil { + smm.SequenceId = proto.Uint64(uint64(*msg.SequenceID)) + } else { + smm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1)) + } +} + func (p *partitionProducer) genSingleMessageMetadataInBatch(msg *ProducerMessage, uncompressedSize int) (smm *pb.SingleMessageMetadata) { smm = &pb.SingleMessageMetadata{ @@ -778,14 +789,7 @@ func (p *partitionProducer) genSingleMessageMetadataInBatch(msg *ProducerMessage smm.Properties = internal.ConvertFromStringMap(msg.Properties) } - var sequenceID uint64 - if msg.SequenceID != nil { - sequenceID = uint64(*msg.SequenceID) - } else { - sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1) - } - - smm.SequenceId = proto.Uint64(sequenceID) + p.updateSingleMessageMetadataSeqID(smm, msg) return } @@ -805,35 +809,29 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata, } sid := *mm.SequenceId - var err error + var useTxn bool + var mostSigBits uint64 + var leastSigBits uint64 + if request.transaction != nil { txnID := request.transaction.GetTxnID() - err = internal.SingleSend( - buffer, - p.producerID, - sid, - mm, - payloadBuf, - p.encryptor, - maxMessageSize, - true, - txnID.MostSigBits, - txnID.LeastSigBits, - ) - } else { - err = internal.SingleSend( - buffer, - p.producerID, - sid, - mm, - payloadBuf, - p.encryptor, - maxMessageSize, - false, - 0, - 0, - ) - } + mostSigBits = txnID.MostSigBits + leastSigBits = txnID.LeastSigBits + } + + err := internal.SingleSend( + buffer, + p.producerID, + sid, + mm, + payloadBuf, + p.encryptor, + maxMessageSize, + useTxn, + mostSigBits, + leastSigBits, + ) + if err != nil { request.callback(nil, request.msg, err) p.releaseSemaphoreAndMem(int64(len(msg.Payload))) @@ -993,7 +991,7 @@ func (p *partitionProducer) failTimeoutMessages() { } } - // flag the send has completed with error, flush make no effect + // mark the sending has completed with error, flush make no effect pi.Complete() pi.Unlock() @@ -1106,7 +1104,7 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool) { - //Register transaction operation to transaction and the transaction coordinator. + // Register transaction operation to transaction and the transaction coordinator. var newCallback func(MessageID, *ProducerMessage, error) if msg.Transaction != nil { transactionImpl := (msg.Transaction).(*transaction) @@ -1392,7 +1390,7 @@ func (p *partitionProducer) _setConn(conn internal.Connection) { // _getConn returns internal connection field of this partition producer atomically. // Note: should only be called by this partition producer before attempting to use the connection func (p *partitionProducer) _getConn() internal.Connection { - // Invariant: The conn must be non-nill for the lifetime of the partitionProducer. + // Invariant: The connection must be non-nil for the lifetime of the partitionProducer. // For this reason we leave this cast unchecked and panic() if the // invariant is broken return p.conn.Load().(internal.Connection) From a3a59acb56632001f7fc065022fd30092b3b9c9d Mon Sep 17 00:00:00 2001 From: gunli Date: Thu, 29 Jun 2023 18:59:47 +0800 Subject: [PATCH 2/7] [typo] revert --- pulsar/producer_partition.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index f94a262455..45e1e5b59e 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -29,7 +29,7 @@ import ( "time" "github.com/apache/pulsar-client-go/pulsar/internal/compression" - internalCrypto "github.com/apache/pulsar-client-go/pulsar/internal/crypto" + internalcrypto "github.com/apache/pulsar-client-go/pulsar/internal/crypto" "google.golang.org/protobuf/proto" @@ -80,7 +80,7 @@ type partitionProducer struct { batchBuilder internal.BatchBuilder sequenceIDGenerator *uint64 batchFlushTicker *time.Ticker - encryptor internalCrypto.Encryptor + encryptor internalcrypto.Encryptor compressionProvider compression.Provider // Channel where app is posting messages to be published @@ -264,12 +264,12 @@ func (p *partitionProducer) grabCnx() error { p.topicEpoch = &nextTopicEpoch if p.options.Encryption != nil { - p.encryptor = internalCrypto.NewProducerEncryptor(p.options.Encryption.Keys, + p.encryptor = internalcrypto.NewProducerEncryptor(p.options.Encryption.Keys, p.options.Encryption.KeyReader, p.options.Encryption.MessageCrypto, p.options.Encryption.ProducerCryptoFailureAction, p.log) } else { - p.encryptor = internalCrypto.NewNoopEncryptor() + p.encryptor = internalcrypto.NewNoopEncryptor() } if p.sequenceIDGenerator == nil { @@ -499,7 +499,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) { return } } - var schema Schema var schemaVersion []byte if msg.Schema != nil { @@ -507,7 +506,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) { } else if p.options.Schema != nil { schema = p.options.Schema } - if msg.Value != nil { // payload and schema are mutually exclusive // try to get payload from schema value only if payload is not set @@ -525,7 +523,6 @@ func (p *partitionProducer) internalSend(request *sendRequest) { if uncompressedPayload == nil { uncompressedPayload = schemaPayload } - if schema != nil { schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo()) if schemaVersion == nil { @@ -693,9 +690,10 @@ func (p *partitionProducer) internalSend(request *sendRequest) { return } } - if request.flushImmediately { + p.internalFlushCurrentBatch() + } } } From 46c7e576102fcbe3a022f382f7b8650f87998741 Mon Sep 17 00:00:00 2001 From: gunli Date: Thu, 29 Jun 2023 19:01:22 +0800 Subject: [PATCH 3/7] [typo] revert --- pulsar/producer_partition.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 45e1e5b59e..8a0ee90698 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -519,10 +519,10 @@ func (p *partitionProducer) internalSend(request *sendRequest) { } } } - if uncompressedPayload == nil { uncompressedPayload = schemaPayload } + if schema != nil { schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo()) if schemaVersion == nil { @@ -691,7 +691,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { } } if request.flushImmediately { - + p.internalFlushCurrentBatch() } From 9052f9c5c7278c28c52d32c258fdfe3b4cc1282e Mon Sep 17 00:00:00 2001 From: gunli Date: Thu, 29 Jun 2023 19:02:24 +0800 Subject: [PATCH 4/7] [typo] revert --- pulsar/producer_partition.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 8a0ee90698..cf9b6dd621 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -522,7 +522,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { if uncompressedPayload == nil { uncompressedPayload = schemaPayload } - + if schema != nil { schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo()) if schemaVersion == nil { From 9138dd92b89c1bc42d018019f4e8891b656e4d24 Mon Sep 17 00:00:00 2001 From: gunli Date: Thu, 29 Jun 2023 19:07:47 +0800 Subject: [PATCH 5/7] [refactor] delete redunpdant code lines --- pulsar/producer_partition.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index cf9b6dd621..221c19ffca 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1104,8 +1104,10 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer callback func(MessageID, *ProducerMessage, error), flushImmediately bool) { // Register transaction operation to transaction and the transaction coordinator. var newCallback func(MessageID, *ProducerMessage, error) + var txn *transaction if msg.Transaction != nil { transactionImpl := (msg.Transaction).(*transaction) + txn = transactionImpl if transactionImpl.state != TxnOpen { p.log.WithField("state", transactionImpl.state).Error("Failed to send message" + " by a non-open transaction.") @@ -1138,10 +1140,6 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer // callbackOnce make sure the callback is only invoked once in chunking callbackOnce := &sync.Once{} - var txn *transaction - if msg.Transaction != nil { - txn = (msg.Transaction).(*transaction) - } sr := &sendRequest{ ctx: ctx, msg: msg, From 8d34019ec07b81796924fb227ec78638331f4230 Mon Sep 17 00:00:00 2001 From: gunli Date: Thu, 29 Jun 2023 19:13:11 +0800 Subject: [PATCH 6/7] [typo] revert some words --- pulsar/producer_partition.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 221c19ffca..5d2fabb6ab 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -989,7 +989,7 @@ func (p *partitionProducer) failTimeoutMessages() { } } - // mark the sending has completed with error, flush make no effect + // flag the sending has completed with error, flush make no effect pi.Complete() pi.Unlock() @@ -1386,7 +1386,7 @@ func (p *partitionProducer) _setConn(conn internal.Connection) { // _getConn returns internal connection field of this partition producer atomically. // Note: should only be called by this partition producer before attempting to use the connection func (p *partitionProducer) _getConn() internal.Connection { - // Invariant: The connection must be non-nil for the lifetime of the partitionProducer. + // Invariant: The conn must be non-nil for the lifetime of the partitionProducer. // For this reason we leave this cast unchecked and panic() if the // invariant is broken return p.conn.Load().(internal.Connection) From c40581d10ba87d5bf4170abc61609aa24fe2a11d Mon Sep 17 00:00:00 2001 From: gunli Date: Fri, 30 Jun 2023 22:17:12 +0800 Subject: [PATCH 7/7] [fix] set useTxn when use transaction --- pulsar/producer_partition.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 5d2fabb6ab..f6c872e8c1 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -707,6 +707,7 @@ func addRequestToBatch(smm *pb.SingleMessageMetadata, p *partitionProducer, var leastSigBits uint64 if request.transaction != nil { txnID := request.transaction.GetTxnID() + useTxn = true mostSigBits = txnID.MostSigBits leastSigBits = txnID.LeastSigBits } @@ -813,6 +814,7 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata, if request.transaction != nil { txnID := request.transaction.GetTxnID() + useTxn = true mostSigBits = txnID.MostSigBits leastSigBits = txnID.LeastSigBits }