diff --git a/raptorq/node/grpc/raptorq.go b/raptorq/node/grpc/raptorq.go index 1169fb59b..86f17dec9 100644 --- a/raptorq/node/grpc/raptorq.go +++ b/raptorq/node/grpc/raptorq.go @@ -23,12 +23,14 @@ const ( inputEncodeFileName = "input.data" symbolIDFileSubDir = "meta" symbolFileSubdir = "symbols" + concurrency = 4 ) type raptorQ struct { - conn *clientConn - client pb.RaptorQClient - config *node.Config + conn *clientConn + client pb.RaptorQClient + config *node.Config + semaphore chan struct{} // Semaphore to control concurrency } func randID() string { @@ -136,6 +138,11 @@ func scanSymbolIDFiles(dirPath string) (map[string]node.RawSymbolIDFile, error) // Encode data, and return a list of identifier of symbols func (service *raptorQ) RQEncode(ctx context.Context, data []byte, id string, store rqstore.Store) (*node.Encode, error) { + service.semaphore <- struct{}{} // Acquire slot + defer func() { + <-service.semaphore // Release the semaphore slot + }() + if data == nil { return nil, errors.Errorf("invalid data") } @@ -183,6 +190,11 @@ func (service *raptorQ) RQEncode(ctx context.Context, data []byte, id string, st } func (service *raptorQ) EncodeInfo(ctx context.Context, data []byte, copies uint32, blockHash string, pastelID string) (*node.EncodeInfo, error) { + service.semaphore <- struct{}{} // Acquire a semaphore slot + defer func() { + <-service.semaphore // Release the semaphore slot + }() + if data == nil { return nil, errors.Errorf("invalid data") } @@ -270,8 +282,9 @@ func (service *raptorQ) contextWithLogPrefix(ctx context.Context) context.Contex func newRaptorQ(conn *clientConn, config *node.Config) node.RaptorQ { return &raptorQ{ - conn: conn, - client: pb.NewRaptorQClient(conn), - config: config, + conn: conn, + client: pb.NewRaptorQClient(conn), + config: config, + semaphore: make(chan struct{}, concurrency), } }