diff --git a/pkg/aspect/lint/bep.go b/pkg/aspect/lint/bep.go index 140c6d4da..dcc0f5567 100644 --- a/pkg/aspect/lint/bep.go +++ b/pkg/aspect/lint/bep.go @@ -127,7 +127,7 @@ func parseLinterMnemonicFromFilename(filename string) string { return s[len(s)-2] } -func (runner *LintBEPHandler) bepEventCallback(event *buildeventstream.BuildEvent) error { +func (runner *LintBEPHandler) bepEventCallback(event *buildeventstream.BuildEvent, sn int64) error { switch event.Payload.(type) { case *buildeventstream.BuildEvent_WorkspaceInfo: diff --git a/pkg/aspect/lint/lint.go b/pkg/aspect/lint/lint.go index f268cd83e..f5ed85606 100644 --- a/pkg/aspect/lint/lint.go +++ b/pkg/aspect/lint/lint.go @@ -243,7 +243,7 @@ lint: } lintBEPHandler = newLintBEPHandler(workspaceRoot, besCompleted) - besBackend.RegisterSubscriber(lintBEPHandler.bepEventCallback) + besBackend.RegisterSubscriber(lintBEPHandler.bepEventCallback, false) } if postTerminateArgs != nil { diff --git a/pkg/aspect/root/config/config.go b/pkg/aspect/root/config/config.go index 3e43479b4..52df0f608 100644 --- a/pkg/aspect/root/config/config.go +++ b/pkg/aspect/root/config/config.go @@ -429,14 +429,16 @@ func UnmarshalPluginConfig(pluginsConfig interface{}) ([]types.PluginConfig, err version, _ := pluginsMap["version"].(string) logLevel, _ := pluginsMap["log_level"].(string) + multi_threaded_build_events, _ := pluginsMap["multi_threaded_build_events"].(bool) properties, _ := pluginsMap["properties"].(map[string]interface{}) plugins = append(plugins, types.PluginConfig{ - Name: name, - From: from, - Version: version, - LogLevel: logLevel, - Properties: properties, + Name: name, + From: from, + Version: version, + LogLevel: logLevel, + MultiThreadedBuildEvents: multi_threaded_build_events, + Properties: properties, }) } diff --git a/pkg/plugin/client/client.go b/pkg/plugin/client/client.go index 2f7a6cd39..b2a44cfed 100644 --- a/pkg/plugin/client/client.go +++ b/pkg/plugin/client/client.go @@ -158,8 +158,9 @@ func (c *clientFactory) New(aspectplugin types.PluginConfig, streams ioutils.Str } res := &PluginInstance{ - Plugin: rawplugin.(plugin.Plugin), - Provider: goclient, + Plugin: rawplugin.(plugin.Plugin), + Provider: goclient, + MultiThreaded: aspectplugin.MultiThreadedBuildEvents, } if customCommandExecutor, ok := rawplugin.(CustomCommandExecutor); ok { @@ -180,6 +181,7 @@ type Provider interface { // as any associated objects or metadata. type PluginInstance struct { plugin.Plugin + MultiThreaded bool Provider CustomCommandExecutor } diff --git a/pkg/plugin/sdk/v1alpha4/plugin/grpc.go b/pkg/plugin/sdk/v1alpha4/plugin/grpc.go index ed2b9a997..4b6288fe0 100644 --- a/pkg/plugin/sdk/v1alpha4/plugin/grpc.go +++ b/pkg/plugin/sdk/v1alpha4/plugin/grpc.go @@ -71,7 +71,7 @@ func (m *GRPCServer) BEPEventCallback( ctx context.Context, req *proto.BEPEventCallbackReq, ) (*proto.BEPEventCallbackRes, error) { - return &proto.BEPEventCallbackRes{}, m.Impl.BEPEventCallback(req.Event) + return &proto.BEPEventCallbackRes{}, m.Impl.BEPEventCallback(req.Event, req.SequenceNumber) } // Setup translates the gRPC call to the Plugin Setup implementation. @@ -189,8 +189,8 @@ var _ Plugin = (*GRPCClient)(nil) // BEPEventCallback is called from the Core to execute the Plugin // BEPEventCallback. -func (m *GRPCClient) BEPEventCallback(event *buildeventstream.BuildEvent) error { - _, err := m.client.BEPEventCallback(context.Background(), &proto.BEPEventCallbackReq{Event: event}) +func (m *GRPCClient) BEPEventCallback(event *buildeventstream.BuildEvent, sn int64) error { + _, err := m.client.BEPEventCallback(context.Background(), &proto.BEPEventCallbackReq{Event: event, SequenceNumber: sn}) return err } diff --git a/pkg/plugin/sdk/v1alpha4/plugin/interface.go b/pkg/plugin/sdk/v1alpha4/plugin/interface.go index 4741a969a..250e98ad0 100644 --- a/pkg/plugin/sdk/v1alpha4/plugin/interface.go +++ b/pkg/plugin/sdk/v1alpha4/plugin/interface.go @@ -28,7 +28,7 @@ import ( // Plugin determines how an aspect Plugin should be implemented. type Plugin interface { - BEPEventCallback(event *buildeventstream.BuildEvent) error + BEPEventCallback(event *buildeventstream.BuildEvent, sn int64) error CustomCommands() ([]*Command, error) PostBuildHook( isInteractiveMode bool, @@ -88,7 +88,7 @@ func (*Base) Setup(*SetupConfig) error { } // BEPEventCallback satisfies Plugin.BEPEventCallback. -func (*Base) BEPEventCallback(*buildeventstream.BuildEvent) error { +func (*Base) BEPEventCallback(*buildeventstream.BuildEvent, int64) error { return nil } diff --git a/pkg/plugin/sdk/v1alpha4/proto/BUILD.bazel b/pkg/plugin/sdk/v1alpha4/proto/BUILD.bazel index 4a74c692b..c6ad49441 100644 --- a/pkg/plugin/sdk/v1alpha4/proto/BUILD.bazel +++ b/pkg/plugin/sdk/v1alpha4/proto/BUILD.bazel @@ -1,6 +1,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") load("@rules_proto//proto:defs.bzl", "proto_library") +load("//bazel/go:write_go_generated_source_files.bzl", "write_go_generated_source_files") # gazelle:exclude dummy.go @@ -20,6 +21,14 @@ go_proto_library( deps = ["//bazel/buildeventstream"], ) +write_go_generated_source_files( + name = "write_pb_go", + src = ":proto_go_proto", + output_files = [ + "plugin.pb.go", + ], +) + go_library( name = "proto", embed = [":proto_go_proto"], diff --git a/pkg/plugin/sdk/v1alpha4/proto/plugin.proto b/pkg/plugin/sdk/v1alpha4/proto/plugin.proto index 8d3a74681..5d4c8bfa3 100644 --- a/pkg/plugin/sdk/v1alpha4/proto/plugin.proto +++ b/pkg/plugin/sdk/v1alpha4/proto/plugin.proto @@ -19,6 +19,7 @@ service Plugin { message BEPEventCallbackReq { build_event_stream.BuildEvent event = 1; + int64 sequence_number = 2; } message BEPEventCallbackRes {} diff --git a/pkg/plugin/system/bep/bes_backend.go b/pkg/plugin/system/bep/bes_backend.go index 4f4656f1a..beb087fd4 100644 --- a/pkg/plugin/system/bep/bes_backend.go +++ b/pkg/plugin/system/bep/bes_backend.go @@ -56,7 +56,7 @@ type BESBackend interface { GracefulStop() Addr() string RegisterBesProxy(p besproxy.BESProxy) - RegisterSubscriber(callback CallbackFn) + RegisterSubscriber(callback CallbackFn, multiThreaded bool) Errors() []error } @@ -83,28 +83,30 @@ func InjectBESBackend(ctx context.Context, besBackend BESBackend) context.Contex } type besBackend struct { - besProxies []besproxy.BESProxy - closeOnce sync.Once - ctx context.Context - errors *aspecterrors.ErrorList - grpcDialer aspectgrpc.Dialer - grpcServer aspectgrpc.Server - listener net.Listener - netListen func(network, address string) (net.Listener, error) - startServe chan struct{} - subscribers *subscriberList + besProxies []besproxy.BESProxy + ctx context.Context + errors *aspecterrors.ErrorList + errorsMutex sync.RWMutex + grpcDialer aspectgrpc.Dialer + grpcServer aspectgrpc.Server + listener net.Listener + netListen func(network, address string) (net.Listener, error) + startServe chan struct{} + subscribers *subscriberList + mtSubscribers *subscriberList } // NewBESBackend creates a new Build Event Protocol backend. func NewBESBackend(ctx context.Context) BESBackend { return &besBackend{ - besProxies: []besproxy.BESProxy{}, - ctx: ctx, - errors: &aspecterrors.ErrorList{}, - grpcDialer: aspectgrpc.NewDialer(), - netListen: net.Listen, - startServe: make(chan struct{}, 1), - subscribers: &subscriberList{}, + besProxies: []besproxy.BESProxy{}, + ctx: ctx, + errors: &aspecterrors.ErrorList{}, + grpcDialer: aspectgrpc.NewDialer(), + netListen: net.Listen, + startServe: make(chan struct{}, 1), + subscribers: &subscriberList{}, + mtSubscribers: &subscriberList{}, } } @@ -176,6 +178,8 @@ func (bb *besBackend) Addr() string { // Errors return the errors produced by the subscriber callback functions. func (bb *besBackend) Errors() []error { + bb.errorsMutex.RLock() + defer bb.errorsMutex.RUnlock() return bb.errors.Errors() } @@ -187,12 +191,16 @@ func (bb *besBackend) RegisterBesProxy(p besproxy.BESProxy) { // CallbackFn is the signature for the callback function used by the subscribers // of the Build Event Protocol events. -type CallbackFn func(*buildeventstream.BuildEvent) error +type CallbackFn func(*buildeventstream.BuildEvent, int64) error // RegisterSubscriber registers a new subscriber callback function to the // Build Event Protocol events. -func (bb *besBackend) RegisterSubscriber(callback CallbackFn) { - bb.subscribers.Insert(callback) +func (bb *besBackend) RegisterSubscriber(callback CallbackFn, multiThreaded bool) { + if multiThreaded { + bb.mtSubscribers.Insert(callback) + } else { + bb.subscribers.Insert(callback) + } } // PublishLifecycleEvent implements the gRPC PublishLifecycleEvent service. @@ -212,6 +220,36 @@ func (bb *besBackend) PublishLifecycleEvent( return &emptypb.Empty{}, eg.Wait() } +func (bb *besBackend) SendEventsToSubscribers(c chan *buildv1.PublishBuildToolEventStreamRequest, subscribers *subscriberList) { + for req := range c { + // Forward the build event to subscribers + if subscribers.head == nil { + continue + } + event := req.GetOrderedBuildEvent().GetEvent() + if event != nil { + bazelEvent := event.GetBazelEvent() + if bazelEvent != nil { + var buildEvent *buildeventstream.BuildEvent = &buildeventstream.BuildEvent{} + err := bazelEvent.UnmarshalTo(buildEvent) + if err != nil { + fmt.Fprintf(os.Stderr, "Error unmarshaling build event %v: %s\n", req.GetOrderedBuildEvent().GetSequenceNumber(), err.Error()) + continue + } + s := subscribers.head + for s != nil { + if err := s.callback(buildEvent, req.GetOrderedBuildEvent().GetSequenceNumber()); err != nil { + bb.errorsMutex.Lock() + bb.errors.Insert(err) + bb.errorsMutex.Unlock() + } + s = s.next + } + } + } + } +} + // PublishBuildToolEventStream implements the gRPC PublishBuildToolEventStream // service. func (bb *besBackend) PublishBuildToolEventStream( @@ -219,88 +257,121 @@ func (bb *besBackend) PublishBuildToolEventStream( ) error { ctx := stream.Context() + const numMultiSends = 10 + + subChan := make(chan *buildv1.PublishBuildToolEventStreamRequest, 1000) + subMultiChan := make(chan *buildv1.PublishBuildToolEventStreamRequest, 1000) + fwdChan := make(chan *buildv1.PublishBuildToolEventStreamRequest, 1000) + ackChan := make(chan *buildv1.PublishBuildToolEventStreamRequest, 1000) + + eg, egCtx := errgroup.WithContext(ctx) + // Setup forwarding proxy streams - eg, ctx := errgroup.WithContext(ctx) for _, bp := range bb.besProxies { - // Make a copy of the BESProxy before passing into the go-routine below. - proxy := bp - err := bp.PublishBuildToolEventStream(ctx, grpc.WaitForReady(false)) + err := bp.PublishBuildToolEventStream(egCtx, grpc.WaitForReady(false)) if err != nil { - fmt.Fprintf(os.Stderr, "Error creating build event stream to %v: %s\n", proxy.Host(), err.Error()) - continue + // If we fail to create the build event stream to a proxy then print out an error but don't fail the GRPC call + fmt.Fprintf(os.Stderr, "Error creating build event stream to %v: %s\n", bp.Host(), err.Error()) } - eg.Go(func() error { - for { - _, err := proxy.Recv() + } + + // Goroutine to receive messages from the Bazel server and send them to processing channels + eg.Go(func() error { + for { + req, err := stream.Recv() + if err != nil { + close(subChan) + close(subMultiChan) + close(fwdChan) + close(ackChan) if err == io.EOF { - break - } - if err != nil { - return fmt.Errorf("error receiving build event stream ack %v: %s\n", proxy.Host(), err.Error()) + return nil } + // If we fail to receive a BES event from bazel server then fail the GRPC call early + // and surface this error back to Bazel; this is over localhost so should generally not + // happen unless something has gone terribly wrong. + return fmt.Errorf("error receiving on build event stream from bazel server: %v", err.Error()) } - return nil - }) - } - defer bb.closeBesProxies() - - for { - // Wait for a build event - req, err := stream.Recv() - if err == io.EOF { - // Close BES proxy streams and wait for acks - bb.closeBesProxies() - return eg.Wait() - } - if err != nil { - return err + subChan <- req + subMultiChan <- req + fwdChan <- req + ackChan <- req } + }) - // Forward the build event to grpc outStreams - for _, bp := range bb.besProxies { - err := bp.Send(req) + // Goroutine to send acknowledgments back to the Bazel server + eg.Go(func() error { + for req := range ackChan { + res := &buildv1.PublishBuildToolEventStreamResponse{ + StreamId: req.OrderedBuildEvent.StreamId, + SequenceNumber: req.OrderedBuildEvent.SequenceNumber, + } + err := stream.Send(res) if err != nil { - fmt.Fprintf(os.Stderr, "Error sending build event to %v: %s\n", bp.Host(), err.Error()) + // If we fail to send an ack back to the bazel server then fail the GRPC call early + // since Bazel will hang waiting for all acks; this is over localhost so should generally not + // happen unless something has gone terribly wrong. + return fmt.Errorf("error sending ack %v to bazel server: %v", res.SequenceNumber, err.Error()) } } + return nil + }) - // Forward the build event to subscribers - event := req.OrderedBuildEvent.Event - if event != nil { - bazelEvent := event.GetBazelEvent() - if bazelEvent != nil { - var buildEvent buildeventstream.BuildEvent - if err := bazelEvent.UnmarshalTo(&buildEvent); err != nil { - return err - } + // Goroutines to process messages and send to subscribers + eg.Go(func() error { bb.SendEventsToSubscribers(subChan, bb.subscribers); return nil }) + for i := 0; i < numMultiSends; i++ { + eg.Go(func() error { bb.SendEventsToSubscribers(subMultiChan, bb.mtSubscribers); return nil }) + } - s := bb.subscribers.head - for s != nil { - if err := s.callback(&buildEvent); err != nil { - bb.errors.Insert(err) + // Goroutines to receive acks from BES proxies + for _, bp := range bb.besProxies { + if !bp.StreamCreated() { + continue + } + proxy := bp // make a copy of the BESProxy before passing into the go-routine below. + eg.Go(func() error { + for { + _, err := proxy.Recv() + if err != nil { + if err != io.EOF { + // If we fail to recv an ack from a proxy then print out an error but don't fail the GRPC call + fmt.Fprintf(os.Stderr, "Error receiving build event stream ack %v: %s\n", proxy.Host(), err.Error()) } - s = s.next + break } } - } - - // Ack the message - res := &buildv1.PublishBuildToolEventStreamResponse{ - StreamId: req.OrderedBuildEvent.StreamId, - SequenceNumber: req.OrderedBuildEvent.SequenceNumber, - } - if err := stream.Send(res); err != nil { - return err - } + return nil + }) } -} -func (bb *besBackend) closeBesProxies() { - bb.closeOnce.Do(func() { + // Goroutine to forward to build event to BES proxies + eg.Go(func() error { + for fwd := range fwdChan { + for _, bp := range bb.besProxies { + if !bp.StreamCreated() { + continue + } + err := bp.Send(fwd) + if err != nil { + // If we fail to send to a proxy then print out an error but don't fail the GRPC call + fmt.Fprintf(os.Stderr, "Error sending build event to %v: %s\n", bp.Host(), err.Error()) + } + } + } for _, bp := range bb.besProxies { - bp.CloseSend() + if !bp.StreamCreated() { + continue + } + if err := bp.CloseSend(); err != nil { + fmt.Fprintf(os.Stderr, "Error closing build event stream to %v: %s\n", bp.Host(), err.Error()) + } } + return nil }) + + err := eg.Wait() + + return err } // SubscriberList is a linked list for the Build Event Protocol event diff --git a/pkg/plugin/system/bep/bes_backend_test.go b/pkg/plugin/system/bep/bes_backend_test.go index eab0d3124..12cefc374 100644 --- a/pkg/plugin/system/bep/bes_backend_test.go +++ b/pkg/plugin/system/bep/bes_backend_test.go @@ -247,7 +247,7 @@ func TestPublishBuildToolEventStream(t *testing.T) { besBackend := &besBackend{} err := besBackend.PublishBuildToolEventStream(eventStream) - g.Expect(err).To(MatchError(expectedErr)) + g.Expect(err).To(MatchError(fmt.Errorf("error receiving on build event stream from bazel server: %v", expectedErr))) }) t.Run("fails when stream.Send fails", func(t *testing.T) { @@ -269,6 +269,12 @@ func TestPublishBuildToolEventStream(t *testing.T) { Recv(). Return(req, nil). Times(1) + eventStream. + EXPECT(). + Recv(). + Return(nil, io.EOF). + Times(1). + After(recv) res := &buildv1.PublishBuildToolEventStreamResponse{ StreamId: req.OrderedBuildEvent.StreamId, SequenceNumber: req.OrderedBuildEvent.SequenceNumber, @@ -278,18 +284,17 @@ func TestPublishBuildToolEventStream(t *testing.T) { EXPECT(). Send(res). Return(expectedErr). - Times(1). - After(recv) + Times(1) eventStream. EXPECT(). Context(). Return(context.Background()). Times(1) - besBackend := &besBackend{subscribers: &subscriberList{}} + besBackend := &besBackend{subscribers: &subscriberList{}, mtSubscribers: &subscriberList{}} err := besBackend.PublishBuildToolEventStream(eventStream) - g.Expect(err).To(MatchError(expectedErr)) + g.Expect(err).To(MatchError(fmt.Errorf("error sending ack %v to bazel server: %v", 1, expectedErr))) }) t.Run("succeeds without subscribers", func(t *testing.T) { @@ -311,29 +316,28 @@ func TestPublishBuildToolEventStream(t *testing.T) { Recv(). Return(req, nil). Times(1) + eventStream. + EXPECT(). + Recv(). + Return(nil, io.EOF). + Times(1). + After(recv) res := &buildv1.PublishBuildToolEventStreamResponse{ StreamId: req.OrderedBuildEvent.StreamId, SequenceNumber: req.OrderedBuildEvent.SequenceNumber, } - send := eventStream. + eventStream. EXPECT(). Send(res). Return(nil). - Times(1). - After(recv) - eventStream. - EXPECT(). - Recv(). - Return(nil, io.EOF). - Times(1). - After(send) + Times(1) eventStream. EXPECT(). Context(). Return(context.Background()). Times(1) - besBackend := &besBackend{subscribers: &subscriberList{}} + besBackend := &besBackend{subscribers: &subscriberList{}, mtSubscribers: &subscriberList{}} err := besBackend.PublishBuildToolEventStream(eventStream) g.Expect(err).To(Not(HaveOccurred())) @@ -361,45 +365,48 @@ func TestPublishBuildToolEventStream(t *testing.T) { Recv(). Return(req, nil). Times(1) + eventStream. + EXPECT(). + Recv(). + Return(nil, io.EOF). + Times(1). + After(recv) res := &buildv1.PublishBuildToolEventStreamResponse{ StreamId: req.OrderedBuildEvent.StreamId, SequenceNumber: req.OrderedBuildEvent.SequenceNumber, } - send := eventStream. + eventStream. EXPECT(). Send(res). Return(nil). - Times(1). - After(recv) - eventStream. - EXPECT(). - Recv(). - Return(nil, io.EOF). - Times(1). - After(send) + Times(1) besBackend := &besBackend{ - subscribers: &subscriberList{}, - errors: &aspecterrors.ErrorList{}, + subscribers: &subscriberList{}, + mtSubscribers: &subscriberList{}, + errors: &aspecterrors.ErrorList{}, } var calledSubscriber1, calledSubscriber2, calledSubscriber3 bool - besBackend.RegisterSubscriber(func(evt *buildeventstream.BuildEvent) error { + besBackend.RegisterSubscriber(func(evt *buildeventstream.BuildEvent, sn int64) error { g.Expect(evt).To(Equal(buildEvent)) + g.Expect(sn).To(Equal(orderedBuildEvent.SequenceNumber)) calledSubscriber1 = true return nil - }) + }, false) expectedSubscriber2Err := fmt.Errorf("error from subscriber 2") - besBackend.RegisterSubscriber(func(evt *buildeventstream.BuildEvent) error { + besBackend.RegisterSubscriber(func(evt *buildeventstream.BuildEvent, sn int64) error { g.Expect(evt).To(Equal(buildEvent)) + g.Expect(sn).To(Equal(orderedBuildEvent.SequenceNumber)) calledSubscriber2 = true return expectedSubscriber2Err - }) + }, false) expectedSubscriber3Err := fmt.Errorf("error from subscriber 3") - besBackend.RegisterSubscriber(func(evt *buildeventstream.BuildEvent) error { + besBackend.RegisterSubscriber(func(evt *buildeventstream.BuildEvent, sn int64) error { g.Expect(evt).To(Equal(buildEvent)) + g.Expect(sn).To(Equal(orderedBuildEvent.SequenceNumber)) calledSubscriber3 = true return expectedSubscriber3Err - }) + }, false) eventStream. EXPECT(). @@ -441,27 +448,27 @@ func TestPublishBuildToolEventStream(t *testing.T) { Recv(). Return(req, nil). Times(1) + eventStream. + EXPECT(). + Recv(). + Return(nil, io.EOF). + Times(1). + After(recv) res := &buildv1.PublishBuildToolEventStreamResponse{ StreamId: req.OrderedBuildEvent.StreamId, SequenceNumber: req.OrderedBuildEvent.SequenceNumber, } - send := eventStream. + eventStream. EXPECT(). Send(res). Return(nil). - Times(1). - After(recv) - eventStream. - EXPECT(). - Recv(). - Return(nil, io.EOF). - Times(1). - After(send) + Times(1) besBackend := &besBackend{ - besProxies: []besproxy.BESProxy{}, - subscribers: &subscriberList{}, - errors: &aspecterrors.ErrorList{}, + besProxies: []besproxy.BESProxy{}, + subscribers: &subscriberList{}, + mtSubscribers: &subscriberList{}, + errors: &aspecterrors.ErrorList{}, } _, egCtx := errgroup.WithContext(ctx) @@ -473,6 +480,11 @@ func TestPublishBuildToolEventStream(t *testing.T) { PublishBuildToolEventStream(egCtx, grpc.WaitForReady(false)). Return(nil). Times(1) + besProxy. + EXPECT(). + StreamCreated(). + Return(true). + Times(3) besProxy. EXPECT(). Send(req). diff --git a/pkg/plugin/system/besproxy/bes_proxy.go b/pkg/plugin/system/besproxy/bes_proxy.go index 6bbcba267..2976a91e8 100644 --- a/pkg/plugin/system/besproxy/bes_proxy.go +++ b/pkg/plugin/system/besproxy/bes_proxy.go @@ -37,6 +37,7 @@ type BESProxy interface { Host() string PublishBuildToolEventStream(ctx context.Context, opts ...grpc.CallOption) error PublishLifecycleEvent(ctx context.Context, req *buildv1.PublishLifecycleEventRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + StreamCreated() bool Recv() (*buildv1.PublishBuildToolEventStreamResponse, error) Send(req *buildv1.PublishBuildToolEventStreamRequest) error } @@ -87,9 +88,13 @@ func (bp *besProxy) PublishBuildToolEventStream(ctx context.Context, opts ...grp return nil } +func (bp *besProxy) StreamCreated() bool { + return bp.stream != nil +} + func (bp *besProxy) Send(req *buildv1.PublishBuildToolEventStreamRequest) error { if bp.stream == nil { - return nil + return fmt.Errorf("stream to %v not configured", bp.host) } // If we want to mutate the BES events in the future before they are sent out to external consumers, this is the place diff --git a/pkg/plugin/system/system.go b/pkg/plugin/system/system.go index 95c5158cb..c59a745d5 100644 --- a/pkg/plugin/system/system.go +++ b/pkg/plugin/system/system.go @@ -205,7 +205,7 @@ func (ps *pluginSystem) createBesBackend(ctx context.Context, cmd *cobra.Command // Create the BES backend besBackend := bep.NewBESBackend(ctx) for node := ps.plugins.head; node != nil; node = node.next { - besBackend.RegisterSubscriber(node.payload.BEPEventCallback) + besBackend.RegisterSubscriber(node.payload.BEPEventCallback, node.payload.MultiThreaded) } opts := []grpc.ServerOption{ // Bazel doesn't seem to set a maximum send message size, therefore diff --git a/pkg/plugin/types/plugin.go b/pkg/plugin/types/plugin.go index 7ed25baf8..56b212513 100644 --- a/pkg/plugin/types/plugin.go +++ b/pkg/plugin/types/plugin.go @@ -18,9 +18,10 @@ package types // PluginConfig represents a plugin entry in the config file. type PluginConfig struct { - Name string - From string - Version string - LogLevel string - Properties map[string]interface{} + Name string + From string + Version string + LogLevel string + MultiThreadedBuildEvents bool + Properties map[string]interface{} }