Add configurable pubsub package so we can increase the number of publishers on topics #261

Closed
wants to merge 46 commits into from
Commits
883ff61
Use gcppubsub client when gcp pubsub is used
Hamishpk Oct 18, 2023
908f968
fix build files
Hamishpk Oct 18, 2023
6e4f1ca
cleanup
Hamishpk Oct 18, 2023
4892097
Use url opener rather than openTopic
Hamishpk Oct 19, 2023
f19013b
Add metric
Hamishpk Oct 19, 2023
67f84fe
fix lint
Hamishpk Oct 19, 2023
73ec819
Add failure to publish on message metrics
Hamishpk Oct 19, 2023
8a3ad33
use new pubsub opts in api server
Hamishpk Oct 20, 2023
0817754
Add and implement api pubsub opts
Hamishpk Oct 20, 2023
9a3536e
Add batch size and pollers to test struct
Hamishpk Oct 20, 2023
d8b020d
Fix tests
Hamishpk Oct 20, 2023
6bd56c1
Move pubsub opts to api package and actually make things configurable
Hamishpk Oct 23, 2023
ac22373
Fix subscription name when instance name isn't passed
Hamishpk Oct 23, 2023
4397456
make configurable pubsub package
Hamishpk Oct 23, 2023
61c0f51
add topic opts
Hamishpk Oct 23, 2023
f547bc8
lint
Hamishpk Oct 23, 2023
5eb1286
fix import
Hamishpk Oct 23, 2023
5bcf832
Remove topic config options in favour of passing them in the url
Hamishpk Oct 23, 2023
5701d14
Group pubsub opts for api server and add metrics related to failing t…
Hamishpk Oct 26, 2023
defae24
Generate release (#264)
Hamishpk Oct 26, 2023
c2e82d5
Correctly construct subscript name in api server (#265)
Hamishpk Oct 26, 2023
1760ac2
Fix bug in api server (#266)
Hamishpk Oct 26, 2023
b9ca40a
Update gocloud to v0.34.0 (#267)
Hamishpk Oct 26, 2023
6554703
Feat: Add rate limit to Redis client (#268)
Garbett1 Oct 29, 2023
e04ab39
make configurable pubsub package so we can configure the number of
Hamishpk Oct 18, 2023
af61dde
Group pubsub opts for api server and add metrics related to failing t…
Hamishpk Oct 26, 2023
81d9fe3
Randomly offset retention and expiration time. (#270)
fische Nov 10, 2023
f4bee82
Don't use Allow, because that consumes tokens (#272)
Garbett1 Nov 10, 2023
e2bd815
Fix runlocal (#271)
Tatskaari Nov 10, 2023
4634d00
Start receiving from queue only once we got all inflight executions. …
fische Nov 14, 2023
a75c4b8
Add GCP Cloud profiler support and update Go to 1.21 (#274)
Garbett1 Jan 6, 2024
f18e595
Bump to 11.7.0 (#275)
Garbett1 Jan 8, 2024
90fd7e8
Update remote apis to correct (#276)
Garbett1 Jan 15, 2024
51d669d
Set lowercase names for the profiler to work. (#277)
Garbett1 Feb 12, 2024
226af38
Increase timeout for UploadIfMissing (#278)
Hamishpk Feb 14, 2024
8365834
Set limiter before context in upload one (#279)
Hamishpk Feb 16, 2024
2c8ba3b
Bump GHA versions (#280)
peterebden Feb 19, 2024
9cc4d69
Periodically delete jobs in Mettle api server (#281)
Hamishpk Feb 21, 2024
4004669
Don't stop delete jobs ticker (#282)
Hamishpk Feb 22, 2024
7921cc0
Refactor gauge strategy (#284)
Garbett1 Mar 4, 2024
b5ca967
Reduce verbosity of api server logs (#286)
Hamishpk Mar 5, 2024
c38775b
Make startup logs more descriptive in elan (#288)
Hamishpk Mar 5, 2024
6955f2f
Reduce verbosity of worker logs (#287)
isobelormiston Mar 5, 2024
cad62f0
Make platform details optional (#285)
Garbett1 Mar 6, 2024
9c1ea8c
Update job deletion logic + propagate update times (#283)
Garbett1 Mar 6, 2024
2efe507
make configurable pubsub package
Hamishpk Oct 18, 2023
13 changes: 6 additions & 7 deletions .github/workflows/codeql-analysis.yml
@@ -17,8 +17,7 @@ on:
jobs:
analyze:
name: Analyze
# TODO(jpoole): set this back to ubuntu-latest once https://github.com/actions/virtual-environments/issues/1816 lands
runs-on: ubuntu-20.04
runs-on: ubuntu-latest

strategy:
fail-fast: false
@@ -31,26 +30,26 @@ jobs:

steps:
- name: Checkout repository
uses: actions/checkout@v2
uses: actions/checkout@v4
with:
# We must fetch at least the immediate parents so that if this is
# a pull request then we can checkout the head.
fetch-depth: 2

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main

# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v1
uses: github/codeql-action/autobuild@v2

# ℹ️ Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl
@@ -64,4 +63,4 @@ jobs:
# make release

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1
uses: github/codeql-action/analyze@v2
10 changes: 5 additions & 5 deletions .github/workflows/golangci-lint.yml
@@ -11,12 +11,12 @@ jobs:
name: lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: '^1.20'
go-version: '^1.21'
- name: golangci-lint
uses: golangci/golangci-lint-action@v2
uses: golangci/golangci-lint-action@v4
with:
version: v1.52.0
version: v1.55.2
args: cli/... discern/... elan/... flair/... grpcutil/... lucidity/... mettle/... purity/... rexclient/... zeal/...
12 changes: 6 additions & 6 deletions .github/workflows/release.yml
@@ -7,9 +7,9 @@ jobs:
image: thoughtmachine/please-servers:20230319
steps:
- name: Checkout code
uses: actions/checkout@v2
uses: actions/checkout@v4
- name: Cache
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: plz-out/gen/third_party/go
key: ${{ runner.os }}-go-${{ hashFiles('third_party/go/BUILD', 'scripts/Dockerfile') }}
@@ -19,7 +19,7 @@ jobs:
run: ./pleasew test --profile ci -p -v 2 --exclude //tests/...
- name: Archive logs
if: always()
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: logs
path: |
@@ -28,13 +28,13 @@ jobs:
needs: [test]
runs-on: ubuntu-latest
container:
image: thoughtmachine/please-servers:20220816
image: thoughtmachine/please-servers:20230319
if: github.ref == 'refs/heads/master'
steps:
- name: Checkout code
uses: actions/checkout@v2
uses: actions/checkout@v4
- name: Cache
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: plz-out/gen/third_party/go
key: ${{ runner.os }}-go-${{ hashFiles('third_party/go/BUILD', 'scripts/Dockerfile') }}
1 change: 1 addition & 0 deletions .golangci.yml
@@ -11,6 +11,7 @@ issues:
- SA5008 # Warns for duplicate struct tags which are meaningful to go-flags
- unslice # This may be useful in some places.
- ifElseChain # Generally don't agree.
- indent-error-flow # Similar to above, seems to be making questionable choices
- appendAssign
- halp # Make misspell be quiet about this.
- exitAfterDefer # Potentially useful but not in any cases it fires right now.
3 changes: 2 additions & 1 deletion .plzconfig.ci
@@ -4,8 +4,9 @@ dir = .plz-cache
dircompress = true

[plugin "go"]
target = //plugins:go
defaultstatic = true
gotool = //third_party/go:system_toolchain|go
gotool = //third_party/go:toolchain|go
coverageredesign = true

[Plugin "proto"]
2 changes: 1 addition & 1 deletion BUILD
@@ -8,4 +8,4 @@ filegroup(
name = "go.mod",
srcs = ["go.mod"],
visibility = ["PUBLIC"],
)
)
78 changes: 78 additions & 0 deletions ChangeLog
@@ -1,3 +1,81 @@
Version 11.9.0
--------------
* Revert job deletion change to match previous behaviour

Version 11.8.5
--------------
* Fix bug for periodically deleting jobs

Version 11.8.4
--------------
* delete in memory jobs periodically in one routine

Version 11.7.4
--------------
* Set limiter before context in elan client

Version 11.7.3
--------------
* Bump timeout for uploadIfMissing to handle actions with large output
files

Version 11.7.2
--------------
* Lower case the service names for the GCP Cloud Profiler to work

Version 11.7.1
--------------
* Use the correct commit of the forked SDK

Version 11.7.0
--------------
* Update to Go 1.21
* Add support for Google Cloud Profiler

Version 11.6.3
--------------
* Start receiving from queue only once we got all inflight executions.
This should fix a potential data race on init.

Version 11.6.2
--------------
* Error token bucket was mistakenly consuming with `Allow()`. Refactored to make token bucket behaviour correct.

Version 11.6.1
--------------
* Randomly offset retentionTime and expiryTime for each job so the
goroutines don't all wake up at the same time. This is a bit of a hack,
a proper fix will be raised soon.

Version 11.6.0
--------------
* Add rate-limiter to Redis client so we don't fail slowly

Version 11.5.0
--------------
* Update gocloud to v0.34.0 to support setting max batchsize on topics

Version 11.4.2
--------------
* Fix bug in api server

Version 11.4.2
--------------
* Fix bug in api server

Version 11.4.0
--------------
* Add api server pubsub opts
* Add metrics for failed publish requests
* Add no execution in progress metric
* Add preresponse publish duration metric

Version 11.3.0
--------------
* Add api server pubsub opts
* Add metrics for failed publish requests
* Add no execution in progress metric

Version 11.3.0
--------------
* Add a way to specify the batch size for the response subscription in
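The 11.6.1 entry above mentions randomly offsetting each job's retention and expiry time so that the goroutines do not all wake at the same moment. That code is not part of the visible diff, so the following is only a minimal sketch of the general jitter idea. `withJitter` and its parameters are hypothetical names, not taken from the mettle source.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// withJitter returns base plus a random offset in [0, maxOffset).
// Hypothetical helper for illustration; not the function used in this PR.
func withJitter(base, maxOffset time.Duration) time.Duration {
	if maxOffset <= 0 {
		// No jitter requested, keep the base duration unchanged.
		return base
	}
	return base + time.Duration(rand.Int63n(int64(maxOffset)))
}

func main() {
	retention := 5 * time.Minute
	// Each job would get a slightly different wake-up time.
	for i := 0; i < 3; i++ {
		fmt.Println(withJitter(retention, 30*time.Second))
	}
}
```

Spreading the wake-ups over a window trades a small amount of timing precision for a smoother load, which is consistent with the entry calling it "a bit of a hack".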
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
11.3.0
11.9.0
3 changes: 2 additions & 1 deletion cli/BUILD
@@ -8,6 +8,7 @@ go_library(
"///third_party/go/github.com_peterebden_go-cli-init_v4//logging",
"///third_party/go/github.com_thought-machine_http-admin//:http-admin",
"///third_party/go/go.uber.org_automaxprocs//maxprocs",
"///third_party/go/cloud.google.com_go_profiler//:profiler",
],
)

@@ -16,6 +17,6 @@ go_test(
srcs = ["cli_test.go"],
deps = [
":cli",
"///third_party/go/github.com_stretchr_testify//assert",
"///third_party/go/github.com_stretchr_testify//assert",
],
)
33 changes: 26 additions & 7 deletions cli/cli.go
@@ -10,8 +10,10 @@ import (
pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/peterebden/go-cli-init/v4/flags"
"github.com/peterebden/go-cli-init/v4/logging"
"github.com/thought-machine/http-admin"
admin "github.com/thought-machine/http-admin"
"go.uber.org/automaxprocs/maxprocs"

"cloud.google.com/go/profiler"
)

var log = logging.MustGetLogger()
@@ -29,24 +31,41 @@ type LoggingOpts struct {
}

// AdminOpts is a re-export of the admin type so servers don't need to import it directly.
type AdminOpts = admin.Opts
type AdminOpts struct {
Admin admin.Opts
EnableGcpProfiling bool `long:"gcp_profiling" description:"Enable pushing profiles to GCP Cloud profiling." env:"ADMIN_GCP_PROFILING"`
}

// ParseFlagsOrDie parses incoming flags and sets up logging etc.
func ParseFlagsOrDie(name string, opts interface{}, loggingOpts *LoggingOpts) (string, logging.LogLevelInfo) {
cmd := flags.ParseFlagsOrDie(name, opts)
info := logging.MustInitStructuredLogging(loggingOpts.Verbosity, loggingOpts.FileVerbosity, loggingOpts.LogFile, loggingOpts.Structured)
if _, err := maxprocs.Set(maxprocs.Logger(log.Notice), maxprocs.Min(1)); err != nil {
log.Error("Failed to set GOMAXPROCS: %s", err)
log.Errorf("Failed to set GOMAXPROCS: %s", err)
}
return cmd, info
}

// ServeAdmin starts the admin HTTP server.
// It will block forever so the caller may well want to use a goroutine.
func ServeAdmin(opts AdminOpts, info logging.LogLevelInfo) {
opts.Logger = logging.MustGetLoggerNamed("github.com.thought-machine.http-admin")
opts.LogInfo = info
go admin.Serve(opts)
func ServeAdmin(serviceName string, opts AdminOpts, info logging.LogLevelInfo) {
opts.Admin.Logger = logging.MustGetLoggerNamed("github.com.thought-machine.http-admin")
opts.Admin.LogInfo = info
if opts.EnableGcpProfiling {
setupProfiling(strings.ToLower(serviceName))
}
go admin.Serve(opts.Admin)
}

func setupProfiling(serviceName string) {
cfg := profiler.Config{
Service: serviceName,
}

// Profiler initialization, best done as early as possible.
if err := profiler.Start(cfg); err != nil {
log.Warningf("Failed to set up profiling, continuing anyway: %s", err)
}
}

// An Action represents a combined hash / size pair written like
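The cli/cli.go change above turns `AdminOpts` from an alias into a struct that wraps the admin options and adds a profiling switch, and `ServeAdmin` now takes a service name that is lowercased before the profiler starts. A rough sketch of that control flow follows, using stand-in types so it runs without the http-admin or Cloud Profiler packages. `adminOpts`, `serveAdmin`, and the injected `startProfiler` and `serve` functions are assumptions made for illustration, and the sketch calls `serve` synchronously where the real code uses a goroutine.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// adminOpts is a stand-in for admin.Opts from github.com/thought-machine/http-admin.
type adminOpts struct {
	Port int
}

// AdminOpts mirrors the shape of the new struct in cli/cli.go.
type AdminOpts struct {
	Admin              adminOpts
	EnableGcpProfiling bool
}

// serveAdmin follows the flow of the new ServeAdmin: profiling is only set up
// when enabled, the service name is lowercased first, and a profiler failure
// is reported without stopping the admin server.
func serveAdmin(serviceName string, opts AdminOpts, startProfiler func(string) error, serve func(adminOpts)) {
	if opts.EnableGcpProfiling {
		if err := startProfiler(strings.ToLower(serviceName)); err != nil {
			fmt.Printf("Failed to set up profiling, continuing anyway: %s\n", err)
		}
	}
	serve(opts.Admin)
}

func main() {
	opts := AdminOpts{Admin: adminOpts{Port: 9990}, EnableGcpProfiling: true}
	failing := func(service string) error {
		return errors.New("no credentials for " + service)
	}
	serve := func(o adminOpts) { fmt.Println("serving admin on port", o.Port) }
	serveAdmin("Elan", opts, failing, serve)
}
```

Callers that previously passed the alias directly now reach the admin fields through `opts.Admin`, which is why each server's `main.go` needs a matching update.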
2 changes: 1 addition & 1 deletion elan/main.go
@@ -33,6 +33,6 @@ modes are intended for testing only.

func main() {
_, info := cli.ParseFlagsOrDie("Elan", &opts, &opts.Logging)
go cli.ServeAdmin(opts.Admin, info)
go cli.ServeAdmin("elan", opts.Admin, info)
rpc.ServeForever(opts.GRPC, opts.Storage, opts.Parallelism, opts.DirCacheSize, int64(opts.KnownBlobCacheSize))
}
8 changes: 3 additions & 5 deletions elan/rpc/BUILD
@@ -21,7 +21,6 @@ go_library(
"///third_party/go/github.com_klauspost_compress//zstd",
"///third_party/go/github.com_peterebden_go-cli-init_v4//logging",
"///third_party/go/github.com_prometheus_client_golang//prometheus",
"//proto/purity",
"///third_party/go/gocloud.dev//blob",
"///third_party/go/gocloud.dev//blob/fileblob",
"///third_party/go/gocloud.dev//blob/gcsblob",
@@ -32,10 +31,9 @@ go_library(
"///third_party/go/google.golang.org_api//googleapi",
"///third_party/go/google.golang.org_genproto_googleapis_bytestream//:bytestream",
"///third_party/go/google.golang.org_genproto_googleapis_rpc//status",
"///third_party/go/google.golang.org_grpc//codes",
"///third_party/go/google.golang.org_grpc//health/grpc_health_v1",
"///third_party/go/google.golang.org_grpc//status",
"//grpcutil",
"//proto/purity",
"//rexclient",
"//third_party/go:grpc",
],
@@ -48,10 +46,10 @@ go_test(
deps = [
":rpc",
"///third_party/go/github.com_klauspost_compress//zstd",
"///third_party/go/github.com_stretchr_testify//assert",
"///third_party/go/github.com_stretchr_testify//require",
"///third_party/go/google.golang.org_genproto_googleapis_bytestream//:bytestream",
"//grpcutil",
"///third_party/go/github.com_stretchr_testify//assert",
"///third_party/go/github.com_stretchr_testify//require",
],
)

4 changes: 2 additions & 2 deletions elan/rpc/eclient.go
@@ -97,10 +97,10 @@ func (e *elanClient) uploadOne(entry *uploadinfo.Entry, compressor pb.Compressor
}
compressed := compressor != pb.Compressor_IDENTITY
key := e.s.compressedKey("cas", entry.Digest.ToProto(), compressed)
ctx, cancel := context.WithTimeout(context.Background(), e.timeout)
defer cancel()
e.s.limiter <- struct{}{}
defer func() { <-e.s.limiter }()
ctx, cancel := context.WithTimeout(context.Background(), e.timeout)
defer cancel()
if len(entry.Contents) > 0 {
if compressed {
entry.Contents = e.s.compressor.EncodeAll(entry.Contents, make([]byte, 0, entry.Digest.Size))
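The eclient.go hunk above moves the limiter acquisition ahead of `context.WithTimeout`. The PR does not state the reason, but presumably the point is that time spent queueing for a slot should not count against the upload's timeout. A minimal sketch of the two orderings, assuming a channel-based semaphore like `e.s.limiter`; `runWithLimiter`, `runWithEarlyTimeout`, and `demo` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type workFn func(ctx context.Context) error

// runWithLimiter is the new ordering: wait for a slot first, then start the
// timeout, so queueing time is not charged to the work itself.
func runWithLimiter(limiter chan struct{}, timeout time.Duration, work workFn) error {
	limiter <- struct{}{}        // block until a slot is free
	defer func() { <-limiter }() // release the slot on exit
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return work(ctx)
}

// runWithEarlyTimeout is the previous ordering: the timeout clock is already
// running while the caller waits for a slot.
func runWithEarlyTimeout(limiter chan struct{}, timeout time.Duration, work workFn) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	limiter <- struct{}{}
	defer func() { <-limiter }()
	return work(ctx)
}

// demo occupies the only slot for longer than the timeout, then reports what
// each ordering sees once it finally gets the slot.
func demo() (earlyErr, lateErr error) {
	check := func(ctx context.Context) error { return ctx.Err() }
	try := func(run func(chan struct{}, time.Duration, workFn) error) error {
		limiter := make(chan struct{}, 1)
		limiter <- struct{}{} // another caller holds the only slot
		go func() {
			time.Sleep(100 * time.Millisecond)
			<-limiter // the slot frees after the timeout would have expired
		}()
		return run(limiter, 20*time.Millisecond, check)
	}
	return try(runWithEarlyTimeout), try(runWithLimiter)
}

func main() {
	earlyErr, lateErr := demo()
	fmt.Println("timeout before limiter:", earlyErr)
	fmt.Println("limiter before timeout:", lateErr)
}
```

With the old ordering the context has already expired by the time the slot is acquired, so the work fails before it starts. With the new ordering the work gets its full timeout.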
5 changes: 3 additions & 2 deletions elan/rpc/rclient.go
@@ -3,9 +3,10 @@ package rpc
import (
"context"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/uploadinfo"
@@ -67,7 +68,7 @@ func (r *remoteClient) UpdateActionResult(req *pb.UpdateActionResultRequest) (*p

func (r *remoteClient) UploadIfMissing(entries []*uploadinfo.Entry, compressors []pb.Compressor_Value) error {
defer observeTime(time.Now(), "UploadIfMissing")
ctx, cnx := context.WithTimeout(context.Background(), time.Minute*5)
ctx, cnx := context.WithTimeout(context.Background(), time.Minute*10)
defer cnx()
_, _, err := r.c.UploadIfMissing(ctx, entries...)
return err
6 changes: 4 additions & 2 deletions elan/rpc/rpc.go
@@ -185,7 +185,7 @@ func mustCache(size int64) *ristretto.Cache {
BufferItems: 64, // recommended by upstream
})
if err != nil {
log.Fatalf("Failed to construct cache: %s", err)
log.Fatalf("Failed to construct in memory cache: %s", err)
}
return cache
}
@@ -652,7 +652,9 @@ func (s *server) writeBlob(ctx context.Context, prefix string, digest *pb.Digest
s.limiter <- struct{}{}
defer func() { <-s.limiter }()
start := time.Now()
defer writeLatencies.Observe(time.Since(start).Seconds())
defer func() {
writeLatencies.Observe(time.Since(start).Seconds())
}()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
w, err := s.bucket.NewWriter(ctx, key, &blob.WriterOptions{BufferSize: s.bufferSize(digest)})
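The `writeBlob` hunk above wraps the latency observation in a closure. In Go, the arguments of a deferred call are evaluated when the `defer` statement executes, so the original `defer writeLatencies.Observe(time.Since(start).Seconds())` would have recorded a duration close to zero. The sketch below reproduces both forms with a plain callback in place of the Prometheus histogram; `observeEager`, `observeLazy`, and `demo` are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// observeEager mimics the original pattern: time.Since(start) is evaluated
// when the defer statement runs, before any work has happened.
func observeEager(work time.Duration, record func(float64)) {
	start := time.Now()
	defer record(time.Since(start).Seconds())
	time.Sleep(work)
}

// observeLazy mimics the fixed pattern: the closure body runs at function
// exit, so time.Since(start) covers the whole call.
func observeLazy(work time.Duration, record func(float64)) {
	start := time.Now()
	defer func() {
		record(time.Since(start).Seconds())
	}()
	time.Sleep(work)
}

// demo runs both variants over the same amount of simulated work and returns
// the durations each one recorded, in seconds.
func demo() (eager, lazy float64) {
	const work = 30 * time.Millisecond
	observeEager(work, func(s float64) { eager = s })
	observeLazy(work, func(s float64) { lazy = s })
	return eager, lazy
}

func main() {
	eager, lazy := demo()
	fmt.Printf("eager=%.4fs lazy=%.4fs\n", eager, lazy)
}
```

Only the closure form reports a duration that includes the sleep, which matches the intent of measuring write latency.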
2 changes: 1 addition & 1 deletion elan/rpc/storage.go
@@ -26,7 +26,7 @@ type bucket interface {
func mustOpenStorage(url string) bucket {
bucket, err := blob.OpenBucket(context.Background(), url)
if err != nil {
log.Fatalf("Failed to open storage %s: %v", url, err)
log.Fatalf("Failed to open storage bucket %s: %v", url, err)
}

var gcsClient *storage.Client