add context control for pub cmd and add async method #1147

Open
someview opened this issue Dec 25, 2023 · 3 comments

Comments

@someview

Is your feature request related to a problem? Please describe.
Add context control for pub commands and add an async method.

  1. In Go, there are many reasons to control a request with a context (for example, cancellation and deadlines).
  2. Go has no keyword like `await`, but sometimes we want a call to be non-blocking, e.g. publishing a message without needing to receive its result.
```go
func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
	message proto.Message) (*RPCResult, error) {
	var err error
	var host *url.URL
	var rpcResult *RPCResult
	startTime := time.Now()
	backoff := DefaultBackoff{100 * time.Millisecond}
	// we can retry these requests because this kind of request is
	// not specific to any particular broker
	for time.Since(startTime) < c.requestTimeout {
		host, err = c.serviceNameResolver.ResolveHost()
		if err != nil {
			c.log.WithError(err).Errorf("rpc client failed to resolve host")
			return nil, err
		}
		rpcResult, err = c.Request(host, host, requestID, cmdType, message)
		// success we got a response
		if err == nil {
			break
		}

		retryTime := backoff.Next()
		c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
		time.Sleep(retryTime)
	}

	return rpcResult, err
}

func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
	cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
	c.metrics.RPCRequestCount.Inc()
	cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
	if err != nil {
		return nil, err
	}

	ch := make(chan result, 1)

	cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
		ch <- result{&RPCResult{
			Cnx:      cnx,
			Response: response,
		}, err}
	})

	timeoutCh := time.After(c.requestTimeout)
	for {
		select {
		case res := <-ch:
			// Ignoring producer not ready response.
			// Continue to wait for the producer to create successfully
			if res.error == nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS {
				if !res.RPCResult.Response.ProducerSuccess.GetProducerReady() {
					timeoutCh = nil
					break
				}
			}
			return res.RPCResult, res.error
		case <-timeoutCh:
			return nil, ErrRequestTimeOut
		}
	}
}
```

Describe the solution you'd like
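The code above is confusing. Retry logic should be separated from a single request, e.g.:

```go
func (c *rpcClient) WithRetry(fn func() error) error {
	// owns backoff and retry decisions; fn performs exactly one attempt
	return fn() // stub: a single attempt, no retry yet
}
```

This would avoid the structure of the code above, where Request is effectively inlined into RequestToAnyBroker and produces nested loops:

```go
for { // retry loop (RequestToAnyBroker)
	for { // wait loop (Request)
		select {
		case <-ch1:
		case <-ch2:
		}
	}
}
```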
@someview
Author

I have found another mistake in the client code:

```go
func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
	callback func(command *pb.BaseCommand, err error)) {
	c.incomingRequestsWG.Add(1)
	defer c.incomingRequestsWG.Done()

	state := c.getState()
	if state == connectionClosed || state == connectionClosing {
		callback(req, ErrConnectionClosed)
	} else {
		select {
		case <-c.closeCh:
			callback(req, ErrConnectionClosed)

		case c.incomingRequestsCh <- &request{
			id:       &requestID,
			cmd:      req,
			callback: callback,
		}:
		}
	}
}

func (c *connection) closed() bool {
	return connectionClosed == c.getState()
}
```

The connection state should be treated as an atomic value (for example, loaded and stored via `sync/atomic`).
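
A minimal sketch of atomic state handling, assuming Go 1.19+ for `atomic.Int32`; the `conn` type and the state constants are hypothetical. Atomic loads and stores remove the data race on the field itself, and a transition that depends on the current value (such as closing only a ready connection) can use `CompareAndSwap` so that exactly one goroutine wins.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// connectionState is a hypothetical enum mirroring the idea of the client's states.
type connectionState int32

const (
	connectionInit connectionState = iota
	connectionReady
	connectionClosing
	connectionClosed
)

// conn keeps its state in an atomic.Int32, so concurrent getState/setState
// calls need no mutex and are race-free.
type conn struct {
	state atomic.Int32
}

func (c *conn) getState() connectionState  { return connectionState(c.state.Load()) }
func (c *conn) setState(s connectionState) { c.state.Store(int32(s)) }

// closeOnce moves the connection from ready to closing; CompareAndSwap
// guarantees only one caller observes the transition.
func (c *conn) closeOnce() bool {
	return c.state.CompareAndSwap(int32(connectionReady), int32(connectionClosing))
}

func main() {
	c := &conn{}
	c.setState(connectionReady)

	var wg sync.WaitGroup
	var winners atomic.Int32
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if c.closeOnce() {
				winners.Add(1)
			}
		}()
	}
	wg.Wait()
	fmt.Println(winners.Load(), c.getState() == connectionClosing) // 1 true
}
```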

@someview
Author

someview commented Dec 25, 2023

The code could use a single `chan any` instead of multiple select cases. A select with many cases may cause a performance issue, and a single `chan any` (dispatching on each event's type) would perform better here.

```go
func (p *partitionProducer) runEventsLoop() {
	for {
		select {
		case data, ok := <-p.dataChan:
			// when doClose() is call, p.dataChan will be closed, data will be nil
			if !ok {
				return
			}
			p.internalSend(data)
		case cmd, ok := <-p.cmdChan:
			// when doClose() is call, p.dataChan will be closed, cmd will be nil
			if !ok {
				return
			}
			switch v := cmd.(type) {
			case *flushRequest:
				p.internalFlush(v)
			case *closeProducer:
				p.internalClose(v)
				return
			}
		case <-p.connectClosedCh:
			p.log.Info("runEventsLoop will reconnect in producer")
			p.reconnectToBroker()
		case <-p.batchFlushTicker.C:
			p.internalFlushCurrentBatch()
		}
	}
}
```

I have tested some cases for this (CPU: 3.3 GHz): even with low concurrency when writing to and reading from a channel, a select spends 40-50 ns, whereas an interface type assertion to a struct type takes just 5-10 ns. The grpc-go framework also uses `chan any` to handle channel state updates.
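
A minimal sketch of the single-channel alternative, assuming hypothetical event types (`sendEvent`, `flushEvent`, `closeEvent`) in place of the producer's data, flush and close commands. The loop ranges over one `chan any` and dispatches with a type switch. The timing figures above come from the author's own tests and are not reproduced by this sketch.

```go
package main

import "fmt"

// Hypothetical event types standing in for the producer's data/flush/close events.
type sendEvent struct{ payload string }
type flushEvent struct{ done chan struct{} }
type closeEvent struct{}

// eventLoop drains a single channel and dispatches on each event's dynamic
// type instead of selecting over several channels. It returns the payloads
// it "sent" so the behaviour is easy to check.
func eventLoop(events <-chan any) []string {
	var sent []string
	for ev := range events {
		switch e := ev.(type) {
		case sendEvent:
			sent = append(sent, e.payload)
		case flushEvent:
			close(e.done) // signal the flush caller
		case closeEvent:
			return sent
		default:
			// unknown event types are ignored in this sketch
		}
	}
	return sent // channel was closed
}

func main() {
	events := make(chan any, 8)
	flushed := make(chan struct{})
	events <- sendEvent{"a"}
	events <- sendEvent{"b"}
	events <- flushEvent{flushed}
	events <- closeEvent{}
	fmt.Println(eventLoop(events)) // [a b]
	<-flushed
}
```

One trade-off: with a single channel, events are handled strictly in arrival order, so a close cannot overtake data already queued, and a ticker such as `batchFlushTicker` would need a goroutine forwarding its ticks into the same channel.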

@someview
Author

The `Receive` method is pretty confusing to me. Why is the `for` loop needed? Every case of the `select` returns, so the loop body can never run more than once.

```go
func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
	for {
		select {
		case <-c.closeCh:
			return nil, newError(ConsumerClosed, "consumer closed")
		case cm, ok := <-c.messageCh:
			if !ok {
				return nil, newError(ConsumerClosed, "consumer closed")
			}
			return cm.Message, nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}
```
