From b6ebf4d0807cbc7de5d69b060199a5fa7919ead0 Mon Sep 17 00:00:00 2001
From: irenarindos
Date: Wed, 13 Nov 2024 14:25:49 -0500
Subject: [PATCH] refactor(server): return worker id from UpsertWorkerStatus

---
 .../daemon/cluster/handlers/worker_service.go | 12 +++---
 .../handlers/workers/worker_service_test.go   |  9 ++--
 internal/server/repository_worker.go          | 34 ++++++---------
 internal/server/repository_worker_test.go     | 43 +++++++++++--------
 internal/server/testing.go                    | 10 ++++-
 internal/session/job_session_cleanup_test.go  |  2 +-
 6 files changed, 60 insertions(+), 50 deletions(-)

diff --git a/internal/daemon/cluster/handlers/worker_service.go b/internal/daemon/cluster/handlers/worker_service.go
index 2e3209f2a7..824fec2f58 100644
--- a/internal/daemon/cluster/handlers/worker_service.go
+++ b/internal/daemon/cluster/handlers/worker_service.go
@@ -159,15 +159,15 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques
 	if wStat.GetKeyId() != "" {
 		opts = append(opts, server.WithKeyId(wStat.GetKeyId()))
 	}
-	wrk, err := serverRepo.UpsertWorkerStatus(ctx, wConf, opts...)
+	workerId, err := serverRepo.UpsertWorkerStatus(ctx, wConf, opts...)
 	if err != nil {
 		event.WriteError(ctx, op, err, event.WithInfoMsg("error storing worker status"))
 		return &pbs.StatusResponse{}, status.Errorf(codes.Internal, "Error storing worker status: %v", err)
 	}
 
 	// update storage states
-	if sbcStates := wStat.GetStorageBucketCredentialStates(); sbcStates != nil && wrk.GetPublicId() != "" {
-		updateWorkerStorageBucketCredentialStatesFn(ctx, serverRepo, wrk.GetPublicId(), sbcStates)
+	if sbcStates := wStat.GetStorageBucketCredentialStates(); sbcStates != nil && workerId != "" {
+		updateWorkerStorageBucketCredentialStatesFn(ctx, serverRepo, workerId, sbcStates)
 	}
 
 	controllers, err := serverRepo.ListControllers(ctx, server.WithLiveness(time.Duration(ws.livenessTimeToStale.Load())))
@@ -222,7 +222,7 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques
 
 	ret := &pbs.StatusResponse{
 		CalculatedUpstreams:         responseControllers,
-		WorkerId:                    wrk.GetPublicId(),
+		WorkerId:                    workerId,
 		AuthorizedWorkers:           authorizedWorkerList,
 		AuthorizedDownstreamWorkers: authorizedDownstreams,
 	}
@@ -288,11 +288,11 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques
 		return &pbs.StatusResponse{}, status.Errorf(codes.Internal, "Error acquiring repo to query session status: %v", err)
 	}
 
-	notActive, err := session.WorkerStatusReport(ctx, sessRepo, connectionRepo, wrk.GetPublicId(), stateReport)
+	notActive, err := session.WorkerStatusReport(ctx, sessRepo, connectionRepo, workerId, stateReport)
 	if err != nil {
 		return nil, status.Errorf(codes.Internal,
 			"Error comparing state of sessions for worker with public id %q: %v",
-			wrk.GetPublicId(), err)
+			workerId, err)
 	}
 	for _, na := range notActive {
 		var connChanges []*pbs.Connection
diff --git a/internal/daemon/controller/handlers/workers/worker_service_test.go b/internal/daemon/controller/handlers/workers/worker_service_test.go
index d5157e8214..e2271ee545 100644
--- a/internal/daemon/controller/handlers/workers/worker_service_test.go
+++ b/internal/daemon/controller/handlers/workers/worker_service_test.go
@@ -148,7 +148,7 @@ func TestGet(t *testing.T) {
 		server.WithDescription("test pki worker description"),
 		server.WithTestPkiWorkerAuthorizedKeyId(&pkiWorkerKeyId))
 	// Add config tags to the created worker
-	pkiWorker, err = repo.UpsertWorkerStatus(context.Background(),
+	pkiWorker, err = server.UpsertAndReturnWorker(context.Background(), t,
 		server.NewWorker(pkiWorker.GetScopeId(),
 			server.WithAddress("test pki worker address"),
 			server.WithLocalStorageState(server.AvailableLocalStorageState.String()),
@@ -156,9 +156,11 @@ func TestGet(t *testing.T) {
 				Key:   "config",
 				Value: "test",
 			})),
+		repo,
 		server.WithUpdateTags(true),
 		server.WithPublicId(pkiWorker.GetPublicId()),
-		server.WithKeyId(pkiWorkerKeyId))
+		server.WithKeyId(pkiWorkerKeyId),
+	)
 	require.NoError(t, err)
 
 	wantPkiWorker := &pb.Worker{
@@ -201,7 +203,7 @@ func TestGet(t *testing.T) {
 	)
 
 	// Add config tags to the created worker
-	managedPkiWorker, err = repo.UpsertWorkerStatus(context.Background(),
+	managedPkiWorker, err = server.UpsertAndReturnWorker(context.Background(), t,
 		server.NewWorker(managedPkiWorker.GetScopeId(),
 			server.WithAddress("test managed pki worker address"),
 			server.WithLocalStorageState(server.AvailableLocalStorageState.String()),
@@ -209,6 +211,7 @@ func TestGet(t *testing.T) {
 				Key:   server.ManagedWorkerTag,
 				Value: "true",
 			})),
+		repo,
 		server.WithUpdateTags(true),
 		server.WithPublicId(managedPkiWorker.GetPublicId()),
 		server.WithKeyId(managedPkiWorkerKeyId))
diff --git a/internal/server/repository_worker.go b/internal/server/repository_worker.go
index c97f95f862..9047709c56 100644
--- a/internal/server/repository_worker.go
+++ b/internal/server/repository_worker.go
@@ -256,25 +256,25 @@ func ListWorkers(ctx context.Context, reader db.Reader, scopeIds []string, opt .
 // The WithPublicId, WithKeyId, and WithUpdateTags options are
 // the only ones used. All others are ignored.
 // Workers are intentionally not oplogged.
-func (r *Repository) UpsertWorkerStatus(ctx context.Context, worker *Worker, opt ...Option) (*Worker, error) {
+func (r *Repository) UpsertWorkerStatus(ctx context.Context, worker *Worker, opt ...Option) (string, error) {
 	const op = "server.(Repository).UpsertWorkerStatus"
 	opts := GetOpts(opt...)
 
 	switch {
 	case worker == nil:
-		return nil, errors.New(ctx, errors.InvalidParameter, op, "worker is nil")
+		return "", errors.New(ctx, errors.InvalidParameter, op, "worker is nil")
 	case worker.GetAddress() == "":
-		return nil, errors.New(ctx, errors.InvalidParameter, op, "worker reported address is empty")
+		return "", errors.New(ctx, errors.InvalidParameter, op, "worker reported address is empty")
 	case worker.ScopeId == "":
-		return nil, errors.New(ctx, errors.InvalidParameter, op, "scope id is empty")
+		return "", errors.New(ctx, errors.InvalidParameter, op, "scope id is empty")
 	case worker.PublicId != "":
-		return nil, errors.New(ctx, errors.InvalidParameter, op, "worker id is not empty")
+		return "", errors.New(ctx, errors.InvalidParameter, op, "worker id is not empty")
 	case worker.GetName() == "" && opts.withKeyId == "":
-		return nil, errors.New(ctx, errors.InvalidParameter, op, "worker keyId and reported name are both empty; one is required")
+		return "", errors.New(ctx, errors.InvalidParameter, op, "worker keyId and reported name are both empty; one is required")
 	case worker.OperationalState == "":
-		return nil, errors.New(ctx, errors.InvalidParameter, op, "worker operational state is empty")
+		return "", errors.New(ctx, errors.InvalidParameter, op, "worker operational state is empty")
 	case worker.LocalStorageState == "":
-		return nil, errors.New(ctx, errors.InvalidParameter, op, "worker local storage state is empty")
+		return "", errors.New(ctx, errors.InvalidParameter, op, "worker local storage state is empty")
 	}
 
 	var workerId string
@@ -285,7 +285,7 @@ func (r *Repository) UpsertWorkerStatus(ctx context.Context, worker *Worker, opt
 	case opts.withKeyId != "":
 		workerId, err = r.LookupWorkerIdByKeyId(ctx, opts.withKeyId)
 		if err != nil || workerId == "" {
-			return nil, errors.Wrap(ctx, err, op, errors.WithMsg("error finding worker by keyId"))
+			return "", errors.Wrap(ctx, err, op, errors.WithMsg("error finding worker by keyId"))
 		}
 	default:
 		// generating the worker id based off of the scope and name ensures
@@ -295,11 +295,10 @@ func (r *Repository) UpsertWorkerStatus(ctx context.Context, worker *Worker, opt
 		// that if the same worker upserts its status multiple times, it will
 		// always have the same worker id.
 		// workers and kms workers.
 		workerId, err = NewWorkerIdFromScopeAndName(ctx, worker.GetScopeId(), worker.GetName())
 		if err != nil || workerId == "" {
-			return nil, errors.Wrap(ctx, err, op, errors.WithMsg("error creating a worker id"))
+			return "", errors.Wrap(ctx, err, op, errors.WithMsg("error creating a worker id"))
 		}
 	}
 
-	var ret *Worker
 	_, err = r.writer.DoTx(
 		ctx,
 		db.StdRetryCnt,
@@ -366,23 +365,14 @@ func (r *Repository) UpsertWorkerStatus(ctx context.Context, worker *Worker, opt
 				}
 			}
 
-			wAgg := &workerAggregate{PublicId: workerClone.GetPublicId()}
-			if err := reader.LookupById(ctx, wAgg); err != nil {
-				return errors.Wrap(ctx, err, op, errors.WithMsg("error looking up worker aggregate"))
-			}
-			ret, err = wAgg.toWorker(ctx)
-			if err != nil {
-				return errors.Wrap(ctx, err, op, errors.WithMsg("error converting worker aggregate to worker"))
-			}
-
 			return nil
 		},
 	)
 	if err != nil {
-		return nil, err
+		return "", err
 	}
 
-	return ret, nil
+	return workerId, nil
 }
 
 // setWorkerConfigTags removes all existing config tags from the same source and worker id
diff --git a/internal/server/repository_worker_test.go b/internal/server/repository_worker_test.go
index c86ce5ced8..88cf3889ce 100644
--- a/internal/server/repository_worker_test.go
+++ b/internal/server/repository_worker_test.go
@@ -235,7 +235,7 @@ func TestUpsertWorkerStatus(t *testing.T) {
 		server.WithAddress("address"), server.WithName("config_name1"),
 		server.WithDescription("kms_description1"),
 	)
-	worker, err := repo.UpsertWorkerStatus(ctx, wStatus1)
+	worker, err := server.UpsertAndReturnWorker(ctx, t, wStatus1, repo)
 	require.NoError(t, err)
 
 	assert.True(t, strings.HasPrefix(worker.GetPublicId(), "w_"))
@@ -250,7 +250,7 @@ func TestUpsertWorkerStatus(t *testing.T) {
 	// update again and see updated last status time
 	wStatus2 := server.NewWorker(scope.Global.String(), server.WithAddress("new_address"),
 		server.WithName("config_name1"), server.WithReleaseVersion("test-version"))
-	worker, err = repo.UpsertWorkerStatus(ctx, wStatus2)
+	worker, err = server.UpsertAndReturnWorker(ctx, t, wStatus2, repo)
 	require.NoError(t, err)
 	assert.Greater(t, worker.GetLastStatusTime().AsTime(), worker.GetCreateTime().AsTime())
 	assert.Equal(t, "config_name1", worker.Name)
@@ -269,7 +269,7 @@ func TestUpsertWorkerStatus(t *testing.T) {
 	wStatus3 := server.NewWorker(scope.Global.String(),
 		server.WithAddress("new_address"), server.WithName("config_name1"),
 		server.WithOperationalState("shutdown"), server.WithReleaseVersion("Boundary v0.11.0"))
-	worker, err = repo.UpsertWorkerStatus(ctx, wStatus3)
+	worker, err = server.UpsertAndReturnWorker(ctx, t, wStatus3, repo)
 	require.NoError(t, err)
 	assert.Greater(t, worker.GetLastStatusTime().AsTime(), worker.GetCreateTime().AsTime())
 	// Version does not change for status updates
@@ -282,7 +282,7 @@ func TestUpsertWorkerStatus(t *testing.T) {
 		server.WithAddress("new_address"), server.WithName("config_name1"),
 		server.WithOperationalState("shutdown"), server.WithReleaseVersion("Boundary v0.11.0"),
 		server.WithLocalStorageState("available"))
-	worker, err = repo.UpsertWorkerStatus(ctx, wStatus4)
+	worker, err = server.UpsertAndReturnWorker(ctx, t, wStatus4, repo)
 	require.NoError(t, err)
 	assert.Greater(t, worker.GetLastStatusTime().AsTime(), worker.GetCreateTime().AsTime())
 	// Version does not change for status updates
@@ -303,7 +303,7 @@ func TestUpsertWorkerStatus(t *testing.T) {
 	wStatus1 := server.NewWorker(scope.Global.String(),
 		server.WithAddress("pki_address"), server.WithDescription("pki_description2"),
 		server.WithReleaseVersion("test-version"))
-	worker, err := repo.UpsertWorkerStatus(ctx, wStatus1, server.WithKeyId(pkiWorkerKeyId), server.WithReleaseVersion("test-version"))
+	worker, err := server.UpsertAndReturnWorker(ctx, t, wStatus1, repo, server.WithKeyId(pkiWorkerKeyId), server.WithReleaseVersion("test-version"))
 	require.NoError(t, err)
 
 	assert.True(t, strings.HasPrefix(worker.GetPublicId(), "w_"))
@@ -558,8 +558,7 @@ func TestTagUpdatingListing(t *testing.T) {
 			Value: "value2",
 		}))
 
-	worker1, err = repo.UpsertWorkerStatus(ctx, wStatus,
-		server.WithUpdateTags(true))
+	worker1, err = server.UpsertAndReturnWorker(ctx, t, wStatus, repo, server.WithUpdateTags(true))
 	require.NoError(err)
 	assert.Len(t, worker1.CanonicalTags(), 1)
 	assert.ElementsMatch(t, []string{"value1", "value2"}, worker1.CanonicalTags()["tag1"])
@@ -577,13 +576,13 @@ func TestTagUpdatingListing(t *testing.T) {
 			Key:   "tag22",
 			Value: "value22",
 		}))
-	worker1, err = repo.UpsertWorkerStatus(ctx, wStatus)
+	worker1, err = server.UpsertAndReturnWorker(ctx, t, wStatus, repo)
 	require.NoError(err)
 	assert.Len(t, worker1.CanonicalTags(), 1)
 	assert.ElementsMatch(t, []string{"value1", "value2"}, worker1.CanonicalTags()["tag1"])
 
 	// Update tags and test again
-	worker1, err = repo.UpsertWorkerStatus(ctx, wStatus, server.WithUpdateTags(true))
+	worker1, err = server.UpsertAndReturnWorker(ctx, t, wStatus, repo, server.WithUpdateTags(true))
 	require.NoError(err)
 	assert.Len(t, worker1.CanonicalTags(), 1)
 	assert.ElementsMatch(t, []string{"value21", "value22"}, worker1.CanonicalTags()["tag22"])
@@ -782,13 +781,14 @@ func TestListWorkers_WithActiveWorkers(t *testing.T) {
 		{
 			name: "upsert-worker1-to-shutdown",
 			upsertFn: func() (*server.Worker, error) {
-				return serversRepo.UpsertWorkerStatus(ctx,
+				return server.UpsertAndReturnWorker(ctx, t,
 					server.NewWorker(scope.Global.String(),
 						server.WithName(worker1.GetName()),
 						server.WithAddress(worker1.GetAddress()),
 						server.WithOperationalState(server.ShutdownOperationalState.String()),
 						server.WithReleaseVersion("Boundary v.0.11"),
-						server.WithPublicId(worker1.GetPublicId())))
+						server.WithPublicId(worker1.GetPublicId())),
+					serversRepo)
 			},
 			wantCnt:   2,
 			wantState: server.ShutdownOperationalState.String(),
@@ -796,13 +796,15 @@ func TestListWorkers_WithActiveWorkers(t *testing.T) {
 		{
 			name: "upsert-worker2-to-shutdown",
 			upsertFn: func() (*server.Worker, error) {
-				return serversRepo.UpsertWorkerStatus(ctx,
+				workerId, err := serversRepo.UpsertWorkerStatus(ctx,
 					server.NewWorker(scope.Global.String(),
 						server.WithName(worker2.GetName()),
 						server.WithAddress(worker2.GetAddress()),
 						server.WithOperationalState(server.ShutdownOperationalState.String()),
 						server.WithReleaseVersion("Boundary v.0.11"),
 						server.WithPublicId(worker2.GetPublicId())))
+				require.NoError(err)
+				return serversRepo.LookupWorker(ctx, workerId)
 			},
 			wantCnt:   1,
 			wantState: server.ShutdownOperationalState.String(),
@@ -810,13 +812,15 @@ func TestListWorkers_WithActiveWorkers(t *testing.T) {
 		{
 			name: "upsert-worker3-to-shutdown",
 			upsertFn: func() (*server.Worker, error) {
-				return serversRepo.UpsertWorkerStatus(ctx,
+				workerId, err := serversRepo.UpsertWorkerStatus(ctx,
 					server.NewWorker(scope.Global.String(),
 						server.WithName(worker3.GetName()),
 						server.WithAddress(worker3.GetAddress()),
 						server.WithOperationalState(server.ShutdownOperationalState.String()),
 						server.WithReleaseVersion("Boundary v.0.11"),
 						server.WithPublicId(worker3.GetPublicId())))
+				require.NoError(err)
+				return serversRepo.LookupWorker(ctx, workerId)
 			},
 			wantCnt:   0,
 			wantState: server.ShutdownOperationalState.String(),
@@ -825,11 +829,13 @@ func TestListWorkers_WithActiveWorkers(t *testing.T) {
 			// Pre 0.11 workers will default to Active
 			name: "upsert-no-release-version-no-state",
 			upsertFn: func() (*server.Worker, error) {
-				return serversRepo.UpsertWorkerStatus(ctx,
+				workerId, err := serversRepo.UpsertWorkerStatus(ctx,
 					server.NewWorker(scope.Global.String(),
 						server.WithName(worker3.GetName()),
 						server.WithAddress(worker3.GetAddress())),
 					server.WithPublicId(worker3.GetPublicId()))
+				require.NoError(err)
+				return serversRepo.LookupWorker(ctx, workerId)
 			},
 			wantCnt:   1,
 			wantState: server.ActiveOperationalState.String(),
@@ -837,12 +843,14 @@ func TestListWorkers_WithActiveWorkers(t *testing.T) {
 		{ // Upsert with active status and no version and expect to get a hit- test backwards compatibility
 			name: "upsert-no-release-version-active-state",
 			upsertFn: func() (*server.Worker, error) {
-				return serversRepo.UpsertWorkerStatus(ctx,
+				return server.UpsertAndReturnWorker(ctx, t,
 					server.NewWorker(scope.Global.String(),
 						server.WithName(worker3.GetName()),
 						server.WithAddress(worker3.GetAddress()),
 						server.WithOperationalState(server.ActiveOperationalState.String())),
-					server.WithPublicId(worker3.GetPublicId()))
+					serversRepo,
+					server.WithPublicId(worker3.GetPublicId()),
+				)
 			},
 			wantCnt:   1,
 			wantState: server.ActiveOperationalState.String(),
@@ -850,11 +858,12 @@ func TestListWorkers_WithActiveWorkers(t *testing.T) {
 		{ // Upsert with unknown status and do not expect to get a hit- test worker create before status
 			name: "upsert-unknown-status",
 			upsertFn: func() (*server.Worker, error) {
-				return serversRepo.UpsertWorkerStatus(ctx,
+				return server.UpsertAndReturnWorker(ctx, t,
 					server.NewWorker(scope.Global.String(),
 						server.WithName(worker3.GetName()),
 						server.WithAddress(worker3.GetAddress()),
 						server.WithOperationalState(server.UnknownOperationalState.String())),
+					serversRepo,
 					server.WithPublicId(worker3.GetPublicId()))
 			},
 			wantCnt: 0,
diff --git a/internal/server/testing.go b/internal/server/testing.go
index 91a910c0be..39e2c6131d 100644
--- a/internal/server/testing.go
+++ b/internal/server/testing.go
@@ -125,7 +125,7 @@ func TestKmsWorker(t *testing.T, conn *db.DB, wrapper wrapping.Wrapper, opt ...O
 	}
 
 	wrk := NewWorker(scope.Global.String(), opt...)
-	wrk, err = serversRepo.UpsertWorkerStatus(ctx, wrk)
+	wrk, err = UpsertAndReturnWorker(ctx, t, wrk, serversRepo)
 	require.NoError(t, err)
 	require.NotNil(t, wrk)
 	require.Equal(t, "kms", wrk.Type)
@@ -222,3 +222,11 @@ func TestLookupWorkerByName(ctx context.Context, t *testing.T, name string, serv
 	}
 	return nil, nil
 }
+
+// UpsertAndReturnWorker upserts the provided worker's status and returns the worker looked up by the resulting id.
+func UpsertAndReturnWorker(ctx context.Context, t *testing.T, w *Worker, serversRepo *Repository, opt ...Option) (*Worker, error) {
+	workerId, err := serversRepo.UpsertWorkerStatus(ctx, w, opt...)
+	require.NoError(t, err)
+	require.NotEmpty(t, workerId)
+	return serversRepo.LookupWorker(ctx, workerId)
+}
diff --git a/internal/session/job_session_cleanup_test.go b/internal/session/job_session_cleanup_test.go
index 3e7e8a5284..fe7828cb90 100644
--- a/internal/session/job_session_cleanup_test.go
+++ b/internal/session/job_session_cleanup_test.go
@@ -278,7 +278,7 @@ func TestCloseConnectionsForDeadWorkers(t *testing.T) {
 		t.Helper()
 		pubId := w.GetPublicId()
 		w.PublicId = ""
-		wkr, err := serversRepo.UpsertWorkerStatus(ctx, w, server.WithPublicId(pubId))
+		wkr, err := server.UpsertAndReturnWorker(ctx, t, w, serversRepo, server.WithPublicId(pubId))
 		require.NoError(err)
 		return wkr
 	}
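
Usage note (illustrative, not part of the commit above): UpsertWorkerStatus now returns only the upserted worker's id, so call sites that previously consumed the returned *Worker either use the id directly (as the status handler does after this change) or follow up with LookupWorker. The sketch below shows that two-step pattern using only identifiers that appear in the diff; the package name, function name, and import path are assumptions for illustration.

	package example

	import (
		"context"

		"github.com/hashicorp/boundary/internal/server"
	)

	// upsertThenLookup upserts a worker's status and, when the caller still
	// needs the full record, fetches it by the returned worker id.
	func upsertThenLookup(ctx context.Context, repo *server.Repository, w *server.Worker, keyId string) (*server.Worker, error) {
		workerId, err := repo.UpsertWorkerStatus(ctx, w, server.WithKeyId(keyId))
		if err != nil {
			return nil, err
		}
		// Many callers can stop here and work with workerId alone.
		return repo.LookupWorker(ctx, workerId)
	}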
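
Test call sites take the same route through the new server.UpsertAndReturnWorker helper, which wraps the upsert and the follow-up lookup so existing assertions on the returned *Worker keep working. A minimal sketch of a call site, assuming a *server.Repository named repo and a *testing.T named t already provided by the test's fixtures (the address and name values below are hypothetical):

	wrk := server.NewWorker(scope.Global.String(),
		server.WithAddress("127.0.0.1:9202"), // hypothetical address
		server.WithName("example-worker"),    // hypothetical name
	)
	got, err := server.UpsertAndReturnWorker(context.Background(), t, wrk, repo)
	require.NoError(t, err)
	require.NotEmpty(t, got.GetPublicId())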