Skip to content

Commit

Permalink
fix: improve performance of BES handling for Aspect CLI & Workflows p…
Browse files Browse the repository at this point in the history
…lugin (#7585)

Resolves performance issues with BES event processing in the Aspect CLI
and in the Workflows plugin.

The current implementation was the receiving a single bes event,
forwarding it to Workflows plugin, waiting for response from plugin and
finally sending ack back to the Bazel server for that event before
starting the recv for the next bes event. This was created a bottleneck
since events could not be received and processed and ack'd in parallel.

The fix here is to leverage multiple go routines in the BES backend:

- 1 to recv the grpc BES stream events from the Bazel server
- 10 to forward to Workflows plugin with BEPEventCallback API calls
- 1 to ack the grpc BES events so the Bazel server keeps sending as we
are receiving and processing

Plugins need to be opted in the new behavior via
`.aspect/cli/config.yaml` setting. Plugins that opt-in will need to
handle multi-threaded calls on that method and potentially out of order
events. For this reason, this performance bump is being landed as an
opt-in so customers with custom plugins can update their custom plugin
code accordingly before turning it on.

Airtable will be the first to opt-in after this lands since they are
suffering the most from the bug (60+ seconds of BES processing after the
bazel build completes on some of their builds).

---

### Changes are visible to end-users: yes

- Searched for relevant documentation and updated as needed: yes
- Breaking change (forces users to change their own code or config): no
- Suggested release notes appear below: yes

Workflows now supports faster build event transmission to Aspect CLI
plugins by sending up to 10 build events in parallel via 10 concurrent
BEPEventCallback calls. Plugins can opt-in to faster build event
transmission by setting `multi_threaded_build_events: true` in their
configuration in the `.aspect/cli/config.yaml` configuration file.
Plugins that opt-in must handle multi-threaded BEPEventCallback. If
plugins require build events to be processed in order they should
buffering out-of-order build events and processing the build events in
order of sequence number. The sequence number of a build event can be
obtained via a new 2nd parameter of the BEPEventCallback function:
`BEPEventCallback(event *buildeventstream.BuildEvent, sequenceNumber
int64)`

### Test plan

- Covered by existing test cases

GitOrigin-RevId: e4fa5c4d4d1866b282b56fdbc23d8ad5d171c253
  • Loading branch information
gregmagolan authored and jbedard committed Dec 13, 2024
1 parent dbdc5bb commit 45d04ed
Show file tree
Hide file tree
Showing 13 changed files with 249 additions and 146 deletions.
2 changes: 1 addition & 1 deletion pkg/aspect/lint/bep.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func parseLinterMnemonicFromFilename(filename string) string {
return s[len(s)-2]
}

func (runner *LintBEPHandler) bepEventCallback(event *buildeventstream.BuildEvent) error {
func (runner *LintBEPHandler) bepEventCallback(event *buildeventstream.BuildEvent, sn int64) error {
switch event.Payload.(type) {

case *buildeventstream.BuildEvent_WorkspaceInfo:
Expand Down
2 changes: 1 addition & 1 deletion pkg/aspect/lint/lint.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ lint:
}

lintBEPHandler = newLintBEPHandler(workspaceRoot, besCompleted)
besBackend.RegisterSubscriber(lintBEPHandler.bepEventCallback)
besBackend.RegisterSubscriber(lintBEPHandler.bepEventCallback, false)
}

if postTerminateArgs != nil {
Expand Down
12 changes: 7 additions & 5 deletions pkg/aspect/root/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,14 +429,16 @@ func UnmarshalPluginConfig(pluginsConfig interface{}) ([]types.PluginConfig, err

version, _ := pluginsMap["version"].(string)
logLevel, _ := pluginsMap["log_level"].(string)
multi_threaded_build_events, _ := pluginsMap["multi_threaded_build_events"].(bool)
properties, _ := pluginsMap["properties"].(map[string]interface{})

plugins = append(plugins, types.PluginConfig{
Name: name,
From: from,
Version: version,
LogLevel: logLevel,
Properties: properties,
Name: name,
From: from,
Version: version,
LogLevel: logLevel,
MultiThreadedBuildEvents: multi_threaded_build_events,
Properties: properties,
})
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/plugin/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,9 @@ func (c *clientFactory) New(aspectplugin types.PluginConfig, streams ioutils.Str
}

res := &PluginInstance{
Plugin: rawplugin.(plugin.Plugin),
Provider: goclient,
Plugin: rawplugin.(plugin.Plugin),
Provider: goclient,
MultiThreaded: aspectplugin.MultiThreadedBuildEvents,
}

if customCommandExecutor, ok := rawplugin.(CustomCommandExecutor); ok {
Expand All @@ -180,6 +181,7 @@ type Provider interface {
// as any associated objects or metadata.
type PluginInstance struct {
plugin.Plugin
MultiThreaded bool
Provider
CustomCommandExecutor
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/plugin/sdk/v1alpha4/plugin/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (m *GRPCServer) BEPEventCallback(
ctx context.Context,
req *proto.BEPEventCallbackReq,
) (*proto.BEPEventCallbackRes, error) {
return &proto.BEPEventCallbackRes{}, m.Impl.BEPEventCallback(req.Event)
return &proto.BEPEventCallbackRes{}, m.Impl.BEPEventCallback(req.Event, req.SequenceNumber)
}

// Setup translates the gRPC call to the Plugin Setup implementation.
Expand Down Expand Up @@ -189,8 +189,8 @@ var _ Plugin = (*GRPCClient)(nil)

// BEPEventCallback is called from the Core to execute the Plugin
// BEPEventCallback.
func (m *GRPCClient) BEPEventCallback(event *buildeventstream.BuildEvent) error {
_, err := m.client.BEPEventCallback(context.Background(), &proto.BEPEventCallbackReq{Event: event})
func (m *GRPCClient) BEPEventCallback(event *buildeventstream.BuildEvent, sn int64) error {
_, err := m.client.BEPEventCallback(context.Background(), &proto.BEPEventCallbackReq{Event: event, SequenceNumber: sn})
return err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/plugin/sdk/v1alpha4/plugin/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

// Plugin determines how an aspect Plugin should be implemented.
type Plugin interface {
BEPEventCallback(event *buildeventstream.BuildEvent) error
BEPEventCallback(event *buildeventstream.BuildEvent, sn int64) error
CustomCommands() ([]*Command, error)
PostBuildHook(
isInteractiveMode bool,
Expand Down Expand Up @@ -88,7 +88,7 @@ func (*Base) Setup(*SetupConfig) error {
}

// BEPEventCallback satisfies Plugin.BEPEventCallback.
func (*Base) BEPEventCallback(*buildeventstream.BuildEvent) error {
func (*Base) BEPEventCallback(*buildeventstream.BuildEvent, int64) error {
return nil
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/plugin/sdk/v1alpha4/proto/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
load("@rules_proto//proto:defs.bzl", "proto_library")
load("//bazel/go:write_go_generated_source_files.bzl", "write_go_generated_source_files")

# gazelle:exclude dummy.go

Expand All @@ -20,6 +21,14 @@ go_proto_library(
deps = ["//bazel/buildeventstream"],
)

write_go_generated_source_files(
name = "write_pb_go",
src = ":proto_go_proto",
output_files = [
"plugin.pb.go",
],
)

go_library(
name = "proto",
embed = [":proto_go_proto"],
Expand Down
1 change: 1 addition & 0 deletions pkg/plugin/sdk/v1alpha4/proto/plugin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ service Plugin {

message BEPEventCallbackReq {
build_event_stream.BuildEvent event = 1;
int64 sequence_number = 2;
}

message BEPEventCallbackRes {}
Expand Down
Loading

0 comments on commit 45d04ed

Please sign in to comment.