
[Improve][Producer]Simplify the MaxPendingMessages controlling #1043

Closed
gunli opened this issue Jun 28, 2023 · 1 comment · Fixed by #1121

Comments

@gunli
Contributor

gunli commented Jun 28, 2023

Expected behavior

  1. make pendingQueue dynamically sized (expandable at runtime), or give it a larger capacity than MaxPendingMessages;
  2. partitionProducer.canAddToQueue() no longer needs to check pendingQueue if the queue can be expanded at runtime;
  3. call partitionProducer.canAddToQueue() in partitionProducer.internalSendAsync before putting the sendRequest into partitionProducer.dataChan;
  4. delete sendRequest.blockCh, so there is no blocking any more;
  5. treat every message that has been put into partitionProducer.dataChan as a pending message, and send and flush them when the producer is closed.

With these changes, we would no longer block on every call, only when MaxPendingMessages is actually reached, and the semantic confusion would be gone.
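A rough sketch of what the simplified send path could look like (hypothetical; the names follow the current code, blockCh is removed, and admission is checked before enqueueing):

```go
// Hypothetical sketch of the proposed flow: admission control happens once,
// before the request enters dataChan, and there is no blockCh.
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
	sr := &sendRequest{
		ctx:              ctx,
		msg:              msg,
		callback:         callback,
		flushImmediately: flushImmediately,
		publishTime:      time.Now(),
	}

	// Block (or fail fast) here only when MaxPendingMessages is reached.
	if !p.canAddToQueue(sr, int64(len(msg.Payload))) {
		return
	}

	// Everything in dataChan is now a pending message; it is sent and
	// flushed when the producer is closed.
	p.dataChan <- sr
}
```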

Actual behavior

Currently, we use several mechanisms to control MaxPendingMessages in the producer: partitionProducer.dataChan with MaxPendingMessages capacity, partitionProducer.publishSemaphore, sendRequest.blockCh, partitionProducer.canAddToQueue(), and the complicated code in partitionProducer.internalSend(). Together these make the code difficult to understand and maintain. What is worse, when DisableBlockIfQueueFull is set to false (the default), partitionProducer.internalSendAsync() blocks on every call, even if the queue is NOT full, which stalls application-level logic.

And when chunking is enabled, a Message is split into many chunks and, correspondingly, many pendingItems in the pendingQueue. So when the pendingQueue is full, it does not mean MaxPendingMessages has been reached from the user's point of view, which is semantically confusing. The unconditional block mentioned above is:

```go
if !p.options.DisableBlockIfQueueFull {
	// block if queue full
	<-bc
}
```

Steps to reproduce

Review the code

```go
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
	//Register transaction operation to transaction and the transaction coordinator.
	var newCallback func(MessageID, *ProducerMessage, error)
	if msg.Transaction != nil {
		transactionImpl := (msg.Transaction).(*transaction)
		if transactionImpl.state != TxnOpen {
			p.log.WithField("state", transactionImpl.state).Error("Failed to send message" +
				" by a non-open transaction.")
			callback(nil, msg, newError(InvalidStatus, "Failed to send message by a non-open transaction."))
			return
		}

		if err := transactionImpl.registerProducerTopic(p.topic); err != nil {
			callback(nil, msg, err)
			return
		}
		if err := transactionImpl.registerSendOrAckOp(); err != nil {
			callback(nil, msg, err)
			return
		}
		newCallback = func(id MessageID, producerMessage *ProducerMessage, err error) {
			callback(id, producerMessage, err)
			transactionImpl.endSendOrAckOp(err)
		}
	} else {
		newCallback = callback
	}
	if p.getProducerState() != producerReady {
		// Producer is closing
		newCallback(nil, msg, errProducerClosed)
		return
	}

	// bc only works when DisableBlockIfQueueFull is false
	bc := make(chan struct{})

	// callbackOnce make sure the callback is only invoked once in chunking
	callbackOnce := &sync.Once{}
	var txn *transaction
	if msg.Transaction != nil {
		txn = (msg.Transaction).(*transaction)
	}
	sr := &sendRequest{
		ctx:              ctx,
		msg:              msg,
		callback:         newCallback,
		callbackOnce:     callbackOnce,
		flushImmediately: flushImmediately,
		publishTime:      time.Now(),
		blockCh:          bc,
		closeBlockChOnce: &sync.Once{},
		transaction:      txn,
	}
	p.options.Interceptors.BeforeSend(p, msg)

	p.dataChan <- sr

	if !p.options.DisableBlockIfQueueFull {
		// block if queue full
		<-bc
	}
}
```
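For illustration, a minimal application-side sketch (hypothetical broker URL and topic; standard client API) of why this hurts: with the default DisableBlockIfQueueFull=false, every SendAsync call below waits on bc before returning, even when the queue is nearly empty:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: "my-topic", // DisableBlockIfQueueFull defaults to false
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	for i := 0; i < 100; i++ {
		// Intended to be fire-and-forget, but each call blocks on <-bc
		// until internalSend closes the channel.
		producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
			Payload: []byte(fmt.Sprintf("msg-%d", i)),
		}, func(id pulsar.MessageID, m *pulsar.ProducerMessage, err error) {
			if err != nil {
				log.Println("send failed:", err)
			}
		})
	}
}
```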

```go
func (p *partitionProducer) canAddToQueue(sr *sendRequest, uncompressedPayloadSize int64) bool {
	if p.options.DisableBlockIfQueueFull {
		if !p.publishSemaphore.TryAcquire() {
			if sr.callback != nil {
				sr.callback(nil, sr.msg, errSendQueueIsFull)
			}
			return false
		}
		if !p.client.memLimit.TryReserveMemory(uncompressedPayloadSize) {
			p.publishSemaphore.Release()
			if sr.callback != nil {
				sr.callback(nil, sr.msg, errMemoryBufferIsFull)
			}
			return false
		}

	} else {
		if !p.publishSemaphore.Acquire(sr.ctx) {
			sr.callback(nil, sr.msg, errContextExpired)
			return false
		}
		if !p.client.memLimit.ReserveMemory(sr.ctx, uncompressedPayloadSize) {
			p.publishSemaphore.Release()
			sr.callback(nil, sr.msg, errContextExpired)
			return false
		}
	}
	p.metrics.MessagesPending.Inc()
	p.metrics.BytesPending.Add(float64(len(sr.msg.Payload)))
	return true
}
```
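The two branches differ only in whether admission may wait: TryAcquire fails fast, while Acquire parks until a permit frees up or the context ends. A minimal sketch of that contract (a hypothetical channel-based semaphore, assumed to match the interface of the client's internal one):

```go
// Hypothetical semaphore illustrating the two admission modes used above.
type semaphore chan struct{}

func newSemaphore(n int) semaphore { return make(chan struct{}, n) }

// TryAcquire is non-blocking; used when DisableBlockIfQueueFull is true.
func (s semaphore) TryAcquire() bool {
	select {
	case s <- struct{}{}:
		return true
	default:
		return false
	}
}

// Acquire blocks until a permit is available or ctx is done;
// used when DisableBlockIfQueueFull is false.
func (s semaphore) Acquire(ctx context.Context) bool {
	select {
	case s <- struct{}{}:
		return true
	case <-ctx.Done():
		return false
	}
}

func (s semaphore) Release() { <-s }
```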

```go
func (p *partitionProducer) internalSend(request *sendRequest) {
	p.log.Debug("Received send request: ", *request.msg)

	msg := request.msg

	// read payload from message
	uncompressedPayload := msg.Payload
	uncompressedPayloadSize := int64(len(uncompressedPayload))

	var schemaPayload []byte
	var err error
	if msg.Value != nil && msg.Payload != nil {
		p.log.Error("Can not set Value and Payload both")
		request.callback(nil, request.msg, errors.New("can not set Value and Payload both"))
		return
	}

	// The block chan must be closed when returned with exception
	defer request.stopBlock()
	if !p.canAddToQueue(request, uncompressedPayloadSize) {
		return
	}

	if p.options.DisableMultiSchema {
		if msg.Schema != nil && p.options.Schema != nil &&
			msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
			p.releaseSemaphoreAndMem(uncompressedPayloadSize)
			request.callback(nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
			p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
			return
		}
	}
	var schema Schema
	var schemaVersion []byte
	if msg.Schema != nil {
		schema = msg.Schema
	} else if p.options.Schema != nil {
		schema = p.options.Schema
	}
	if msg.Value != nil {
		// payload and schema are mutually exclusive
		// try to get payload from schema value only if payload is not set
		if uncompressedPayload == nil && schema != nil {
			schemaPayload, err = schema.Encode(msg.Value)
			if err != nil {
				p.releaseSemaphoreAndMem(uncompressedPayloadSize)
				request.callback(nil, request.msg, newError(SchemaFailure, err.Error()))
				p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
				return
			}
		}
	}
	if uncompressedPayload == nil {
		uncompressedPayload = schemaPayload
	}

	if schema != nil {
		schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
		if schemaVersion == nil {
			schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
			if err != nil {
				p.releaseSemaphoreAndMem(uncompressedPayloadSize)
				p.log.WithError(err).Error("get schema version fail")
				request.callback(nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err))
				return
			}
			p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
		}
	}

	uncompressedSize := len(uncompressedPayload)

	deliverAt := msg.DeliverAt
	if msg.DeliverAfter.Nanoseconds() > 0 {
		deliverAt = time.Now().Add(msg.DeliverAfter)
	}

	mm := p.genMetadata(msg, uncompressedSize, deliverAt)

	// set default ReplicationClusters when DisableReplication
	if msg.DisableReplication {
		msg.ReplicationClusters = []string{"__local__"}
	}

	sendAsBatch := !p.options.DisableBatching &&
		msg.ReplicationClusters == nil &&
		deliverAt.UnixNano() < 0

	// Once the batching is enabled, it can close blockCh early to make block finish
	if sendAsBatch {
		request.stopBlock()
	} else {
		// update sequence id for metadata, make the size of msgMetadata more accurate
		// batch sending will update sequence ID in the BatchBuilder
		p.updateMetadataSeqID(mm, msg)
	}

	maxMessageSize := int(p._getConn().GetMaxMessageSize())

	// compress payload if not batching
	var compressedPayload []byte
	var compressedSize int
	var checkSize int
	if !sendAsBatch {
		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
		compressedSize = len(compressedPayload)
		checkSize = compressedSize

		// set the compress type in msgMetaData
		compressionType := pb.CompressionType(p.options.CompressionType)
		if compressionType != pb.CompressionType_NONE {
			mm.Compression = &compressionType
		}
	} else {
		// final check for batching message is in serializeMessage
		// this is a double check
		checkSize = uncompressedSize
	}

	// if msg is too large and chunking is disabled
	if checkSize > maxMessageSize && !p.options.EnableChunking {
		p.releaseSemaphoreAndMem(uncompressedPayloadSize)
		request.callback(nil, request.msg, errMessageTooLarge)
		p.log.WithError(errMessageTooLarge).
			WithField("size", checkSize).
			WithField("properties", msg.Properties).
			Errorf("MaxMessageSize %d", maxMessageSize)
		p.metrics.PublishErrorsMsgTooLarge.Inc()
		return
	}

	var totalChunks int
	// max chunk payload size
	var payloadChunkSize int
	if sendAsBatch || !p.options.EnableChunking {
		totalChunks = 1
		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
	} else {
		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm)
		if payloadChunkSize <= 0 {
			p.releaseSemaphoreAndMem(uncompressedPayloadSize)
			request.callback(nil, msg, errMetaTooLarge)
			p.log.WithError(errMetaTooLarge).
				WithField("metadata size", proto.Size(mm)).
				WithField("properties", msg.Properties).
				Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
			p.metrics.PublishErrorsMsgTooLarge.Inc()
			return
		}
		// set ChunkMaxMessageSize
		if p.options.ChunkMaxMessageSize != 0 {
			payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
		}
		totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
	}

	// set total chunks to send request
	request.totalChunks = totalChunks

	if !sendAsBatch {
		if totalChunks > 1 {
			var lhs, rhs int
			uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
			mm.Uuid = proto.String(uuid)
			mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))
			mm.TotalChunkMsgSize = proto.Int32(int32(compressedSize))
			cr := newChunkRecorder()
			for chunkID := 0; chunkID < totalChunks; chunkID++ {
				lhs = chunkID * payloadChunkSize
				if rhs = lhs + payloadChunkSize; rhs > compressedSize {
					rhs = compressedSize
				}
				// update chunk id
				mm.ChunkId = proto.Int32(int32(chunkID))
				nsr := &sendRequest{
					ctx:              request.ctx,
					msg:              request.msg,
					callback:         request.callback,
					callbackOnce:     request.callbackOnce,
					publishTime:      request.publishTime,
					blockCh:          request.blockCh,
					closeBlockChOnce: request.closeBlockChOnce,
					totalChunks:      totalChunks,
					chunkID:          chunkID,
					uuid:             uuid,
					chunkRecorder:    cr,
					transaction:      request.transaction,
				}
				// the permit of first chunk has acquired
				if chunkID != 0 && !p.canAddToQueue(nsr, 0) {
					p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(rhs))
					return
				}
				p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
			}
			// close the blockCh when all the chunks acquired permits
			request.stopBlock()
		} else {
			// close the blockCh when totalChunks is 1 (it has acquired permits)
			request.stopBlock()
			p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
		}
	} else {
		smm := p.genSingleMessageMetadataInBatch(msg, uncompressedSize)
		multiSchemaEnabled := !p.options.DisableMultiSchema
		added := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
			multiSchemaEnabled)
		if !added {
			// The current batch is full. flush it and retry

			p.internalFlushCurrentBatch()

			// after flushing try again to add the current payload
			if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
				multiSchemaEnabled); !ok {
				p.releaseSemaphoreAndMem(uncompressedPayloadSize)
				request.callback(nil, request.msg, errFailAddToBatch)
				p.log.WithField("size", uncompressedSize).
					WithField("properties", msg.Properties).
					Error("unable to add message to batch")
				return
			}
		}
		if request.flushImmediately {

			p.internalFlushCurrentBatch()

		}
	}
}
```
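To make the chunk/permit mismatch concrete, a quick back-of-the-envelope check using the same formula as above (the sizes are hypothetical):

```go
// One 4.5 MB message with a 1 MB chunk size becomes 5 pendingItems,
// i.e. 5 permits consumed by a single user-visible message.
compressedSize := 4_500_000
payloadChunkSize := 1_000_000
totalChunks := int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
fmt.Println(totalChunks) // 5, so MaxPendingMessages=5 is exhausted by one message
```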

System configuration

Pulsar version: x.y
@merlimat @RobertIndie @wolfstudy @Gleiphir2769

@Gleiphir2769
Contributor

Gleiphir2769 commented Jul 8, 2023

Hi @gunli. Why not submit a PR with your improvement? It's easier to review and discuss in the PR thread.
