[Bug] Should we return when transactionImpl.registerSendOrAckOp() returns an error? #1040

Closed
gunli opened this issue Jun 26, 2023 · 2 comments · Fixed by #1045

Comments

@gunli
Contributor

gunli commented Jun 26, 2023

Expected behavior

internalSendAsync() should return as soon as transactionImpl.registerSendOrAckOp() returns an error.

Actual behavior

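internalSendAsync() keeps going after transactionImpl.registerSendOrAckOp() returns an error: it invokes the callback with the error and then continues to build and enqueue the send request. Is this a bug?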

func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
	//Register transaction operation to transaction and the transaction coordinator.
	var newCallback func(MessageID, *ProducerMessage, error)
	if msg.Transaction != nil {
		transactionImpl := (msg.Transaction).(*transaction)
		if transactionImpl.state != TxnOpen {
			p.log.WithField("state", transactionImpl.state).Error("Failed to send message" +
				" by a non-open transaction.")
			callback(nil, msg, newError(InvalidStatus, "Failed to send message by a non-open transaction."))
			return
		}

		if err := transactionImpl.registerProducerTopic(p.topic); err != nil {
			callback(nil, msg, err)
			return
		}
		if err := transactionImpl.registerSendOrAckOp(); err != nil {
			callback(nil, msg, err)
			// Should we return here?
		}
		newCallback = func(id MessageID, producerMessage *ProducerMessage, err error) {
			callback(id, producerMessage, err)
			transactionImpl.endSendOrAckOp(err)
		}
	} else {
		newCallback = callback
	}
	if p.getProducerState() != producerReady {
		// Producer is closing
		newCallback(nil, msg, errProducerClosed)
		return
	}

	// bc only works when DisableBlockIfQueueFull is false
	bc := make(chan struct{})

	// callbackOnce make sure the callback is only invoked once in chunking
	callbackOnce := &sync.Once{}
	var txn *transaction
	if msg.Transaction != nil {
		txn = (msg.Transaction).(*transaction)
	}
	sr := &sendRequest{
		ctx:              ctx,
		msg:              msg,
		callback:         newCallback,
		callbackOnce:     callbackOnce,
		flushImmediately: flushImmediately,
		publishTime:      time.Now(),
		blockCh:          bc,
		closeBlockChOnce: &sync.Once{},
		transaction:      txn,
	}
	p.options.Interceptors.BeforeSend(p, msg)

	p.dataChan <- sr

	if !p.options.DisableBlockIfQueueFull {
		// block if queue full
		<-bc
	}
}
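
To make the concern concrete, here is a minimal sketch of just the transaction branch, not the client code itself. The names `txn`, `sendAsync`, and `countCallbacks` are hypothetical stand-ins, the `returnOnError` flag exists only to compare the two behaviors, and the final `newCallback(nil)` is an assumed placeholder for enqueueing the request and receiving the broker's acknowledgement.

```go
package main

import "errors"

// txn is a hypothetical stand-in for the client's *transaction type.
type txn struct {
	registerErr error // error that registerSendOrAckOp will return
	endCalls    int   // how many times endSendOrAckOp ran
}

func (t *txn) registerSendOrAckOp() error { return t.registerErr }
func (t *txn) endSendOrAckOp(err error)   { t.endCalls++ }

// sendAsync mirrors only the transaction branch of internalSendAsync.
// returnOnError selects the proposed behavior (true) or the current one (false).
// The return value reports whether the message would have been enqueued.
func sendAsync(t *txn, callback func(error), returnOnError bool) bool {
	newCallback := callback
	if t != nil {
		if err := t.registerSendOrAckOp(); err != nil {
			callback(err)
			if returnOnError {
				return false
			}
		}
		newCallback = func(err error) {
			callback(err)
			t.endSendOrAckOp(err)
		}
	}
	// Assumed placeholder for p.dataChan <- sr plus the later acknowledgement.
	newCallback(nil)
	return true
}

// countCallbacks runs sendAsync with a failing registration and reports how
// many times the user callback fired and whether the send went ahead.
func countCallbacks(returnOnError bool) (calls int, enqueued bool) {
	t := &txn{registerErr: errors.New("register failed")}
	enqueued = sendAsync(t, func(error) { calls++ }, returnOnError)
	return calls, enqueued
}
```

In this simplified model, `countCallbacks(false)` reports that the callback fired twice and the message was still enqueued, while `countCallbacks(true)` reports a single callback and no enqueue.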

Steps to reproduce

How can we reproduce the issue

System configuration

Pulsar version: x.y

@shibd
Member

shibd commented Jun 29, 2023

This is a bug. Please open a separate PR to fix it; there is no need to mix the fix into #1036.

@gunli
Contributor Author

gunli commented Jun 29, 2023

OK

RobertIndie pushed a commit that referenced this issue Jul 5, 2023
Master Issue: #1040

### Motivation


Fix #1040: return when registerSendOrAckOp() returns an error.

### Modifications

- modified: pulsar/producer_partition.go

---------

Co-authored-by: gunli <gunli@tencent.com>
RobertIndie pushed a commit that referenced this issue Sep 7, 2023
Master Issue: #1040

### Motivation

Fix #1040: return when registerSendOrAckOp() returns an error.

### Modifications

- modified: pulsar/producer_partition.go

---------

Co-authored-by: gunli <gunli@tencent.com>
(cherry picked from commit aa66471)