Skip to content

Commit

Permalink
avoid duplicate egress (#701)
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 authored Jun 14, 2024
1 parent 23d4004 commit 900ec3f
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ var (
ErrNoCompatibleCodec = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported codec is compatible with all outputs")
ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs")
ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress not found")
ErrEgressAlreadyExists = psrpc.NewErrorf(psrpc.AlreadyExists, "egress already exists")
ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Unavailable, "failed to subscribe to track")
ErrNotEnoughCPU = psrpc.NewErrorf(psrpc.Unavailable, "not enough CPU")
ErrShuttingDown = psrpc.NewErrorf(psrpc.Unavailable, "server is shutting down")
Expand Down
5 changes: 4 additions & 1 deletion pkg/server/server_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ func (s *Server) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (
s.activeRequests.Dec()
return nil, errors.ErrShuttingDown
}

if s.AlreadyExists(req.EgressId) {
s.activeRequests.Dec()
return nil, errors.ErrEgressAlreadyExists
}
if err := s.monitor.AcceptRequest(req); err != nil {
s.activeRequests.Dec()
return nil, err
Expand Down
8 changes: 8 additions & 0 deletions pkg/service/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ func (pm *ProcessManager) GetContext(egressID string) context.Context {
return context.Background()
}

func (pm *ProcessManager) AlreadyExists(egressID string) bool {
pm.mu.RLock()
defer pm.mu.RUnlock()

_, ok := pm.activeHandlers[egressID]
return ok
}

func (pm *ProcessManager) HandlerStarted(egressID string) error {
pm.mu.RLock()
defer pm.mu.RUnlock()
Expand Down
3 changes: 3 additions & 0 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error {
m.mu.Lock()
defer m.mu.Unlock()

if m.pending[req.EgressId] != nil {
return errors.ErrEgressAlreadyExists
}
if !m.canAcceptRequestLocked(req) {
return errors.ErrNotEnoughCPU
}
Expand Down

0 comments on commit 900ec3f

Please sign in to comment.