Skip to content

Commit

Permalink
[PSL-1196] control concurrent calls to RQService
Browse files Browse the repository at this point in the history
  • Loading branch information
mateeullahmalik committed Jun 21, 2024
1 parent a25cb63 commit d4a38cd
Showing 1 changed file with 19 additions and 6 deletions.
25 changes: 19 additions & 6 deletions raptorq/node/grpc/raptorq.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ const (
inputEncodeFileName = "input.data"
symbolIDFileSubDir = "meta"
symbolFileSubdir = "symbols"
concurrency = 4
)

type raptorQ struct {
conn *clientConn
client pb.RaptorQClient
config *node.Config
conn *clientConn
client pb.RaptorQClient
config *node.Config
semaphore chan struct{} // Semaphore to control concurrency
}

func randID() string {
Expand Down Expand Up @@ -136,6 +138,11 @@ func scanSymbolIDFiles(dirPath string) (map[string]node.RawSymbolIDFile, error)

// Encode data, and return a list of identifier of symbols
func (service *raptorQ) RQEncode(ctx context.Context, data []byte, id string, store rqstore.Store) (*node.Encode, error) {
service.semaphore <- struct{}{} // Acquire slot
defer func() {
<-service.semaphore // Release the semaphore slot
}()

if data == nil {
return nil, errors.Errorf("invalid data")
}
Expand Down Expand Up @@ -183,6 +190,11 @@ func (service *raptorQ) RQEncode(ctx context.Context, data []byte, id string, st
}

func (service *raptorQ) EncodeInfo(ctx context.Context, data []byte, copies uint32, blockHash string, pastelID string) (*node.EncodeInfo, error) {
service.semaphore <- struct{}{} // Acquire a semaphore slot
defer func() {
<-service.semaphore // Release the semaphore slot
}()

if data == nil {
return nil, errors.Errorf("invalid data")
}
Expand Down Expand Up @@ -270,8 +282,9 @@ func (service *raptorQ) contextWithLogPrefix(ctx context.Context) context.Contex

func newRaptorQ(conn *clientConn, config *node.Config) node.RaptorQ {
return &raptorQ{
conn: conn,
client: pb.NewRaptorQClient(conn),
config: config,
conn: conn,
client: pb.NewRaptorQClient(conn),
config: config,
semaphore: make(chan struct{}, concurrency),
}
}

0 comments on commit d4a38cd

Please sign in to comment.