From e66d70c0f2b35c6978ff1f3d5f3bc85b146244a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Samin?= Date: Fri, 12 Oct 2018 17:37:30 +0200 Subject: [PATCH] fix(api): write on sse (#3457) --- engine/api/events.go | 145 ++++++++++++++++++++++++++----------------- 1 file changed, 89 insertions(+), 56 deletions(-) diff --git a/engine/api/events.go b/engine/api/events.go index ec5ec99f4e..f8e93ddbf7 100644 --- a/engine/api/events.go +++ b/engine/api/events.go @@ -27,9 +27,9 @@ import ( type eventsBrokerSubscribe struct { UUID string User *sdk.User - Queue chan sdk.Event - Mutex *sync.Mutex - IsAlive *abool.AtomicBool + isAlive *abool.AtomicBool + w http.ResponseWriter + mutex sync.Mutex } // lastUpdateBroker keeps connected client of the current route, @@ -93,7 +93,7 @@ func (b *eventsBroker) Start(ctx context.Context) { b.chanAddClient = make(chan (*eventsBrokerSubscribe)) b.chanRemoveClient = make(chan (string)) - tickerMetrics := time.NewTicker(30 * time.Second) + tickerMetrics := time.NewTicker(10 * time.Second) defer tickerMetrics.Stop() for { @@ -103,40 +103,50 @@ func (b *eventsBroker) Start(ctx context.Context) { case <-ctx.Done(): if b.clients != nil { - for c, v := range b.clients { - close(v.Queue) - delete(b.clients, c) + for uuid := range b.clients { + delete(b.clients, uuid) } + observability.Record(b.router.Background, b.router.Stats.SSEClients, 0) } if ctx.Err() != nil { log.Error("eventsBroker.Start> Exiting: %v", ctx.Err()) return } + case receivedEvent := <-b.messages: for i := range b.clients { - go func(c *eventsBrokerSubscribe) { - c.Mutex.Lock() - defer c.Mutex.Unlock() - if c.IsAlive.IsSet() { - log.Debug("send data to %s", c.UUID) - c.Queue <- receivedEvent - } - }(b.clients[i]) + c := b.clients[i] + if c == nil { + delete(b.clients, i) + continue + } + + // Send the event to the client sse within a goroutine + s := "sse-" + b.clients[i].UUID + sdk.GoRoutine(ctx, s, + func(ctx context.Context) { + if c.isAlive.IsSet() { + log.Debug("send data to %s", c.UUID) + if err := c.Send(receivedEvent); err != nil { + log.Error("eventsBroker> unable to send event to %s: %v", c.UUID, err) + b.chanRemoveClient <- c.UUID + } + } + }, + ) } + case client := <-b.chanAddClient: b.clients[client.UUID] = client + case uuid := <-b.chanRemoveClient: client, has := b.clients[uuid] if !has { - return + continue } - go func(c *eventsBrokerSubscribe) { - c.Mutex.Lock() - close(c.Queue) - c.IsAlive.UnSet() - c.Mutex.Unlock() - }(client) + + client.isAlive.UnSet() delete(b.clients, uuid) } } @@ -155,9 +165,8 @@ func (b *eventsBroker) ServeHTTP() service.Handler { client := &eventsBrokerSubscribe{ UUID: uuid, User: getUser(ctx), - Queue: make(chan sdk.Event, 10), // chan buffered, to avoid goroutine Start() wait on push in queue - Mutex: new(sync.Mutex), - IsAlive: abool.NewBool(true), + isAlive: abool.NewBool(true), + w: w, } // Add this client to the map of those that should receive updates @@ -188,29 +197,6 @@ func (b *eventsBroker) ServeHTTP() service.Handler { log.Info("events.Http: client disconnected") b.chanRemoveClient <- client.UUID break leave - case event := <-client.Queue: - if ok := client.manageEvent(event); !ok { - continue - } - - msg, errJ := json.Marshal(event) - if errJ != nil { - log.Warning("sendevent> Unavble to marshall event: %v", errJ) - continue - } - - var buffer bytes.Buffer - buffer.WriteString("data: ") - buffer.Write(msg) - buffer.WriteString("\n\n") - - if !client.IsAlive.IsSet() { - break leave - } - if _, err := w.Write(buffer.Bytes()); err != nil { - return sdk.WrapError(err, "events.write> Unable to write to client") - } - f.Flush() case <-tick.C: if _, err := w.Write([]byte("")); err != nil { return sdk.WrapError(err, "events.write> Unable to ping client") @@ -223,9 +209,9 @@ func (b *eventsBroker) ServeHTTP() service.Handler { } } -func (s *eventsBrokerSubscribe) manageEvent(event sdk.Event) bool { +func (client *eventsBrokerSubscribe) manageEvent(event sdk.Event) bool { var isSharedInfra bool - for _, g := range s.User.Groups { + for _, g := range client.User.Groups { if g.ID == group.SharedInfraGroup.ID { isSharedInfra = true break @@ -233,40 +219,87 @@ func (s *eventsBrokerSubscribe) manageEvent(event sdk.Event) bool { } if strings.HasPrefix(event.EventType, "sdk.EventProject") { - if s.User.Admin || isSharedInfra || permission.ProjectPermission(event.ProjectKey, s.User) >= permission.PermissionRead { + if client.User.Admin || isSharedInfra || permission.ProjectPermission(event.ProjectKey, client.User) >= permission.PermissionRead { return true } return false } if strings.HasPrefix(event.EventType, "sdk.EventWorkflow") || strings.HasPrefix(event.EventType, "sdk.EventRunWorkflow") { - if s.User.Admin || isSharedInfra || permission.WorkflowPermission(event.ProjectKey, event.WorkflowName, s.User) >= permission.PermissionRead { + if client.User.Admin || isSharedInfra || permission.WorkflowPermission(event.ProjectKey, event.WorkflowName, client.User) >= permission.PermissionRead { return true } return false } if strings.HasPrefix(event.EventType, "sdk.EventApplication") { - if s.User.Admin || isSharedInfra || permission.ApplicationPermission(event.ProjectKey, event.ApplicationName, s.User) >= permission.PermissionRead { + if client.User.Admin || isSharedInfra || permission.ApplicationPermission(event.ProjectKey, event.ApplicationName, client.User) >= permission.PermissionRead { return true } return false } if strings.HasPrefix(event.EventType, "sdk.EventPipeline") { - if s.User.Admin || isSharedInfra || permission.PipelinePermission(event.ProjectKey, event.PipelineName, s.User) >= permission.PermissionRead { + if client.User.Admin || isSharedInfra || permission.PipelinePermission(event.ProjectKey, event.PipelineName, client.User) >= permission.PermissionRead { return true } return false } if strings.HasPrefix(event.EventType, "sdk.EventEnvironment") { - if s.User.Admin || isSharedInfra || permission.EnvironmentPermission(event.ProjectKey, event.EnvironmentName, s.User) >= permission.PermissionRead { + if client.User.Admin || isSharedInfra || permission.EnvironmentPermission(event.ProjectKey, event.EnvironmentName, client.User) >= permission.PermissionRead { return true } return false } if strings.HasPrefix(event.EventType, "sdk.EventBroadcast") { - if s.User.Admin || isSharedInfra || event.ProjectKey == "" || permission.AccessToProject(event.ProjectKey, s.User, permission.PermissionRead) { + if client.User.Admin || isSharedInfra || event.ProjectKey == "" || permission.AccessToProject(event.ProjectKey, client.User, permission.PermissionRead) { return true } return false } return false } + +// Send an event to a client +func (client *eventsBrokerSubscribe) Send(event sdk.Event) (err error) { + client.mutex.Lock() + defer client.mutex.Unlock() + + if client == nil || client.w == nil { + return nil + } + + // Make sure that the writer supports flushing. + f, ok := client.w.(http.Flusher) + if !ok { + return sdk.WrapError(fmt.Errorf("streaming unsupported"), "") + } + + if ok := client.manageEvent(event); !ok { + return nil + } + + msg, err := json.Marshal(event) + if err != nil { + return sdk.WrapError(err, "Unable to marshall event") + } + + var buffer bytes.Buffer + buffer.WriteString("data: ") + buffer.Write(msg) + buffer.WriteString("\n\n") + + if !client.isAlive.IsSet() { + return nil + } + + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("%v", r) + } + }() + + if _, err := client.w.Write(buffer.Bytes()); err != nil { + return sdk.WrapError(err, "unable to write to client") + } + f.Flush() + + return nil +}