Skip to content

Commit

Permalink
[Refactor] refactor duplicated code lines and fix typo errors (#1039)
Browse files Browse the repository at this point in the history
* [Refactor] refactor duplicated code lines and fix typo errors

* [typo] revert

* [typo] revert

* [typo] revert

* [refactor] delete redunpdant code lines

* [typo] revert some words

* [fix] set useTxn when use transaction

---------

Co-authored-by: gunli <gunli@tencent.com>
  • Loading branch information
gunli and gunli committed Jul 3, 2023
1 parent 6f01a7c commit 5f8df27
Showing 1 changed file with 45 additions and 49 deletions.
94 changes: 45 additions & 49 deletions pulsar/producer_partition.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -710,17 +710,19 @@ func addRequestToBatch(smm *pb.SingleMessageMetadata, p *partitionProducer,
uncompressedPayload []byte,
request *sendRequest, msg *ProducerMessage, deliverAt time.Time,
schemaVersion []byte, multiSchemaEnabled bool) bool {
var ok bool
var useTxn bool
var mostSigBits uint64
var leastSigBits uint64
if request.transaction != nil {
txnID := request.transaction.GetTxnID()
ok = p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, true, txnID.MostSigBits,
txnID.LeastSigBits)
} else {
ok = p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, false, 0, 0)
useTxn = true
mostSigBits = txnID.MostSigBits
leastSigBits = txnID.LeastSigBits
}
return ok

return p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request,
msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, useTxn, mostSigBits,
leastSigBits)
}

func (p *partitionProducer) genMetadata(msg *ProducerMessage,
Expand Down Expand Up @@ -764,6 +766,14 @@ func (p *partitionProducer) updateMetadataSeqID(mm *pb.MessageMetadata, msg *Pro
}
}

func (p *partitionProducer) updateSingleMessageMetadataSeqID(smm *pb.SingleMessageMetadata, msg *ProducerMessage) {
if msg.SequenceID != nil {
smm.SequenceId = proto.Uint64(uint64(*msg.SequenceID))
} else {
smm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1))
}
}

func (p *partitionProducer) genSingleMessageMetadataInBatch(msg *ProducerMessage,
uncompressedSize int) (smm *pb.SingleMessageMetadata) {
smm = &pb.SingleMessageMetadata{
Expand All @@ -786,14 +796,7 @@ func (p *partitionProducer) genSingleMessageMetadataInBatch(msg *ProducerMessage
smm.Properties = internal.ConvertFromStringMap(msg.Properties)
}

var sequenceID uint64
if msg.SequenceID != nil {
sequenceID = uint64(*msg.SequenceID)
} else {
sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1)
}

smm.SequenceId = proto.Uint64(sequenceID)
p.updateSingleMessageMetadataSeqID(smm, msg)

return
}
Expand All @@ -813,35 +816,30 @@ func (p *partitionProducer) internalSingleSend(mm *pb.MessageMetadata,
}

sid := *mm.SequenceId
var err error
var useTxn bool
var mostSigBits uint64
var leastSigBits uint64

if request.transaction != nil {
txnID := request.transaction.GetTxnID()
err = internal.SingleSend(
buffer,
p.producerID,
sid,
mm,
payloadBuf,
p.encryptor,
maxMessageSize,
true,
txnID.MostSigBits,
txnID.LeastSigBits,
)
} else {
err = internal.SingleSend(
buffer,
p.producerID,
sid,
mm,
payloadBuf,
p.encryptor,
maxMessageSize,
false,
0,
0,
)
}
useTxn = true
mostSigBits = txnID.MostSigBits
leastSigBits = txnID.LeastSigBits
}

err := internal.SingleSend(
buffer,
p.producerID,
sid,
mm,
payloadBuf,
p.encryptor,
maxMessageSize,
useTxn,
mostSigBits,
leastSigBits,
)

if err != nil {
runCallback(request.callback, nil, request.msg, err)
p.releaseSemaphoreAndMem(int64(len(msg.Payload)))
Expand Down Expand Up @@ -1001,7 +999,7 @@ func (p *partitionProducer) failTimeoutMessages() {
}
}

// flag the send has completed with error, flush make no effect
// flag the sending has completed with error, flush make no effect
pi.Complete()
pi.Unlock()

Expand Down Expand Up @@ -1116,8 +1114,10 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
// Register transaction operation to transaction and the transaction coordinator.
var newCallback func(MessageID, *ProducerMessage, error)
var txn *transaction
if msg.Transaction != nil {
transactionImpl := (msg.Transaction).(*transaction)
txn = transactionImpl
if transactionImpl.state != TxnOpen {
p.log.WithField("state", transactionImpl.state).Error("Failed to send message" +
" by a non-open transaction.")
Expand Down Expand Up @@ -1150,10 +1150,6 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer

// callbackOnce make sure the callback is only invoked once in chunking
callbackOnce := &sync.Once{}
var txn *transaction
if msg.Transaction != nil {
txn = (msg.Transaction).(*transaction)
}
sr := &sendRequest{
ctx: ctx,
msg: msg,
Expand Down Expand Up @@ -1398,7 +1394,7 @@ func (p *partitionProducer) _setConn(conn internal.Connection) {
// _getConn returns internal connection field of this partition producer atomically.
// Note: should only be called by this partition producer before attempting to use the connection
func (p *partitionProducer) _getConn() internal.Connection {
// Invariant: The conn must be non-nill for the lifetime of the partitionProducer.
// Invariant: The conn must be non-nil for the lifetime of the partitionProducer.
// For this reason we leave this cast unchecked and panic() if the
// invariant is broken
return p.conn.Load().(internal.Connection)
Expand Down

0 comments on commit 5f8df27

Please sign in to comment.