From 900ec3f476f7b3ff0ecd5c20b9ecd69e36503027 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 13 Jun 2024 17:43:31 -0700 Subject: [PATCH] avoid duplicate egress (#701) --- pkg/errors/errors.go | 1 + pkg/server/server_rpc.go | 5 ++++- pkg/service/process.go | 8 ++++++++ pkg/stats/monitor.go | 3 +++ 4 files changed, 16 insertions(+), 1 deletion(-) diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index e31846f2..1d92aa41 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -99,6 +99,7 @@ var ( ErrNoCompatibleCodec = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported codec is compatible with all outputs") ErrNoCompatibleFileOutputType = psrpc.NewErrorf(psrpc.InvalidArgument, "no supported file output type is compatible with the selected codecs") ErrEgressNotFound = psrpc.NewErrorf(psrpc.NotFound, "egress not found") + ErrEgressAlreadyExists = psrpc.NewErrorf(psrpc.AlreadyExists, "egress already exists") ErrSubscriptionFailed = psrpc.NewErrorf(psrpc.Unavailable, "failed to subscribe to track") ErrNotEnoughCPU = psrpc.NewErrorf(psrpc.Unavailable, "not enough CPU") ErrShuttingDown = psrpc.NewErrorf(psrpc.Unavailable, "server is shutting down") diff --git a/pkg/server/server_rpc.go b/pkg/server/server_rpc.go index 8a7b6cc6..0c15a7da 100644 --- a/pkg/server/server_rpc.go +++ b/pkg/server/server_rpc.go @@ -46,7 +46,10 @@ func (s *Server) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) ( s.activeRequests.Dec() return nil, errors.ErrShuttingDown } - + if s.AlreadyExists(req.EgressId) { + s.activeRequests.Dec() + return nil, errors.ErrEgressAlreadyExists + } if err := s.monitor.AcceptRequest(req); err != nil { s.activeRequests.Dec() return nil, err diff --git a/pkg/service/process.go b/pkg/service/process.go index 293c0c31..883db5ab 100644 --- a/pkg/service/process.go +++ b/pkg/service/process.go @@ -101,6 +101,14 @@ func (pm *ProcessManager) GetContext(egressID string) context.Context { return context.Background() } +func (pm *ProcessManager) AlreadyExists(egressID string) bool { + pm.mu.RLock() + defer pm.mu.RUnlock() + + _, ok := pm.activeHandlers[egressID] + return ok +} + func (pm *ProcessManager) HandlerStarted(egressID string) error { pm.mu.RLock() defer pm.mu.RUnlock() diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go index 30cf342f..65eb7c5d 100644 --- a/pkg/stats/monitor.go +++ b/pkg/stats/monitor.go @@ -189,6 +189,9 @@ func (m *Monitor) AcceptRequest(req *rpc.StartEgressRequest) error { m.mu.Lock() defer m.mu.Unlock() + if m.pending[req.EgressId] != nil { + return errors.ErrEgressAlreadyExists + } if !m.canAcceptRequestLocked(req) { return errors.ErrNotEnoughCPU }