Skip to content

Commit

Permalink
[Refactor] refactor duplicated code lines and fix typo errors
Browse files Browse the repository at this point in the history
  • Loading branch information
gunli committed Jun 26, 2023
1 parent 7f91b2b commit 4620dac
Showing 1 changed file with 50 additions and 52 deletions.
102 changes: 50 additions & 52 deletions pulsar/producer_partition.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"time"

"github.com/apache/pulsar-client-go/pulsar/internal/compression"
internalcrypto "github.com/apache/pulsar-client-go/pulsar/internal/crypto"
internalCrypto "github.com/apache/pulsar-client-go/pulsar/internal/crypto"

"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -80,7 +80,7 @@ type partitionProducer struct {
batchBuilder internal.BatchBuilder
sequenceIDGenerator *uint64
batchFlushTicker *time.Ticker
encryptor internalcrypto.Encryptor
encryptor internalCrypto.Encryptor
compressionProvider compression.Provider

// Channel where app is posting messages to be published
Expand Down Expand Up @@ -264,12 +264,12 @@ func (p *partitionProducer) grabCnx() error {
p.topicEpoch = &nextTopicEpoch

if p.options.Encryption != nil {
p.encryptor = internalcrypto.NewProducerEncryptor(p.options.Encryption.Keys,
p.encryptor = internalCrypto.NewProducerEncryptor(p.options.Encryption.Keys,
p.options.Encryption.KeyReader,
p.options.Encryption.MessageCrypto,
p.options.Encryption.ProducerCryptoFailureAction, p.log)
} else {
p.encryptor = internalcrypto.NewNoopEncryptor()
p.encryptor = internalCrypto.NewNoopEncryptor()
}

if p.sequenceIDGenerator == nil {
Expand Down Expand Up @@ -499,13 +499,15 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
return
}
}

var schema Schema
var schemaVersion []byte
if msg.Schema != nil {
schema = msg.Schema
} else if p.options.Schema != nil {
schema = p.options.Schema
}

if msg.Value != nil {
// payload and schema are mutually exclusive
// try to get payload from schema value only if payload is not set
Expand All @@ -519,6 +521,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
}
}
}

if uncompressedPayload == nil {
uncompressedPayload = schemaPayload
}
Expand Down Expand Up @@ -690,10 +693,9 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
return
}
}
if request.flushImmediately {

if request.flushImmediately {
p.internalFlushCurrentBatch()

}
}
}
Expand All @@ -702,17 +704,18 @@ func addRequestToBatch(smm *pb.SingleMessageMetadata, p *partitionProducer,
uncompressedPayload []byte,
request *sendRequest, msg *ProducerMessage, deliverAt time.Time,
schemaVersion []byte, multiSchemaEnabled bool) bool {
var ok bool
var useTxn bool
var mostSigBits uint64
var leastSigBits uint64
if request.transaction != nil {
txnID := request.transaction.GetTxnID()
ok = p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, true, txnID.MostSigBits,
txnID.LeastSigBits)
} else {
ok = p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, false, 0, 0)
mostSigBits = txnID.MostSigBits
leastSigBits = txnID.LeastSigBits
}
return ok

return p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, useTxn, mostSigBits,
leastSigBits)
}

func (p *partitionProducer) genMetadata(msg *ProducerMessage,
Expand Down Expand Up @@ -752,6 +755,14 @@ func (p *partitionProducer) updateMetadataSeqID(mm *pb.MessageMetadata, msg *Pro
}
}

func (p *partitionProducer) updateSingleMessageMetadataSeqID(smm *pb.SingleMessageMetadata, msg *ProducerMessage) {
if msg.SequenceID != nil {
smm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
} else {
smm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
}
}

func (p *partitionProducer) genSingleMessageMetadataInBatch(msg *ProducerMessage,
uncompressedSize int) (smm *pb.SingleMessageMetadata) {
smm = &pb.SingleMessageMetadata{
Expand All @@ -774,14 +785,7 @@ func (p *partitionProducer) genSingleMessageMetadataInBatch(msg *ProducerMessage
smm.Properties = internal.ConvertFromStringMap(msg.Properties)
}

var sequenceID uint64
if msg.SequenceID != nil {
sequenceID = uint64(*msg.SequenceID)
} else {
sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1)
}

smm.SequenceId = proto.Uint64(sequenceID)
p.updateSingleMessageMetadataSeqID(smm, msg)

return
}
Expand All @@ -801,35 +805,29 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
}

sid := *mm.SequenceId
var err error
var useTxn bool
var mostSigBits uint64
var leastSigBits uint64

if request.transaction != nil {
txnID := request.transaction.GetTxnID()
err = internal.SingleSend(
buffer,
p.producerID,
sid,
mm,
payloadBuf,
p.encryptor,
maxMessageSize,
true,
txnID.MostSigBits,
txnID.LeastSigBits,
)
} else {
err = internal.SingleSend(
buffer,
p.producerID,
sid,
mm,
payloadBuf,
p.encryptor,
maxMessageSize,
false,
0,
0,
)
}
mostSigBits = txnID.MostSigBits
leastSigBits = txnID.LeastSigBits
}

err := internal.SingleSend(
buffer,
p.producerID,
sid,
mm,
payloadBuf,
p.encryptor,
maxMessageSize,
useTxn,
mostSigBits,
leastSigBits,
)

if err != nil {
request.callback(nil, request.msg, err)
p.releaseSemaphoreAndMem(int64(len(msg.Payload)))
Expand Down Expand Up @@ -989,7 +987,7 @@ func (p *partitionProducer) failTimeoutMessages() {
}
}

// flag the send has completed with error, flush make no effect
// mark the sending has completed with error, flush make no effect
pi.Complete()
pi.Unlock()

Expand Down Expand Up @@ -1102,7 +1100,7 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,

func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
//Register transaction operation to transaction and the transaction coordinator.
// Register transaction operation to transaction and the transaction coordinator.
var newCallback func(MessageID, *ProducerMessage, error)
if msg.Transaction != nil {
transactionImpl := (msg.Transaction).(*transaction)
Expand Down Expand Up @@ -1388,7 +1386,7 @@ func (p *partitionProducer) _setConn(conn internal.Connection) {
// _getConn returns internal connection field of this partition producer atomically.
// Note: should only be called by this partition producer before attempting to use the connection
func (p *partitionProducer) _getConn() internal.Connection {
// Invariant: The conn must be non-nill for the lifetime of the partitionProducer.
// Invariant: The connection must be non-nil for the lifetime of the partitionProducer.
// For this reason we leave this cast unchecked and panic() if the
// invariant is broken
return p.conn.Load().(internal.Connection)
Expand Down

0 comments on commit 4620dac

Please sign in to comment.