Skip to content

Commit

Permalink
refact(server): return worker id from UpsertWorkerStatus
Browse files Browse the repository at this point in the history
  • Loading branch information
irenarindos committed Nov 13, 2024
1 parent 8f7ad8f commit b6ebf4d
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 50 deletions.
12 changes: 6 additions & 6 deletions internal/daemon/cluster/handlers/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,15 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques
if wStat.GetKeyId() != "" {
opts = append(opts, server.WithKeyId(wStat.GetKeyId()))
}
wrk, err := serverRepo.UpsertWorkerStatus(ctx, wConf, opts...)
workerId, err := serverRepo.UpsertWorkerStatus(ctx, wConf, opts...)
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error storing worker status"))
return &pbs.StatusResponse{}, status.Errorf(codes.Internal, "Error storing worker status: %v", err)
}

// update storage states
if sbcStates := wStat.GetStorageBucketCredentialStates(); sbcStates != nil && wrk.GetPublicId() != "" {
updateWorkerStorageBucketCredentialStatesFn(ctx, serverRepo, wrk.GetPublicId(), sbcStates)
if sbcStates := wStat.GetStorageBucketCredentialStates(); sbcStates != nil && workerId != "" {
updateWorkerStorageBucketCredentialStatesFn(ctx, serverRepo, workerId, sbcStates)
}

controllers, err := serverRepo.ListControllers(ctx, server.WithLiveness(time.Duration(ws.livenessTimeToStale.Load())))
Expand Down Expand Up @@ -222,7 +222,7 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques

ret := &pbs.StatusResponse{
CalculatedUpstreams: responseControllers,
WorkerId: wrk.GetPublicId(),
WorkerId: workerId,
AuthorizedWorkers: authorizedWorkerList,
AuthorizedDownstreamWorkers: authorizedDownstreams,
}
Expand Down Expand Up @@ -288,11 +288,11 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques
return &pbs.StatusResponse{}, status.Errorf(codes.Internal, "Error acquiring repo to query session status: %v", err)
}

notActive, err := session.WorkerStatusReport(ctx, sessRepo, connectionRepo, wrk.GetPublicId(), stateReport)
notActive, err := session.WorkerStatusReport(ctx, sessRepo, connectionRepo, workerId, stateReport)
if err != nil {
return nil, status.Errorf(codes.Internal,
"Error comparing state of sessions for worker with public id %q: %v",
wrk.GetPublicId(), err)
workerId, err)
}
for _, na := range notActive {
var connChanges []*pbs.Connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,17 +148,19 @@ func TestGet(t *testing.T) {
server.WithDescription("test pki worker description"),
server.WithTestPkiWorkerAuthorizedKeyId(&pkiWorkerKeyId))
// Add config tags to the created worker
pkiWorker, err = repo.UpsertWorkerStatus(context.Background(),
pkiWorker, err = server.UpsertAndReturnWorker(context.Background(), t,
server.NewWorker(pkiWorker.GetScopeId(),
server.WithAddress("test pki worker address"),
server.WithLocalStorageState(server.AvailableLocalStorageState.String()),
server.WithWorkerTags(&server.Tag{
Key: "config",
Value: "test",
})),
repo,
server.WithUpdateTags(true),
server.WithPublicId(pkiWorker.GetPublicId()),
server.WithKeyId(pkiWorkerKeyId))
server.WithKeyId(pkiWorkerKeyId),
)
require.NoError(t, err)

wantPkiWorker := &pb.Worker{
Expand Down Expand Up @@ -201,14 +203,15 @@ func TestGet(t *testing.T) {
)

// Add config tags to the created worker
managedPkiWorker, err = repo.UpsertWorkerStatus(context.Background(),
managedPkiWorker, err = server.UpsertAndReturnWorker(context.Background(), t,
server.NewWorker(managedPkiWorker.GetScopeId(),
server.WithAddress("test managed pki worker address"),
server.WithLocalStorageState(server.AvailableLocalStorageState.String()),
server.WithWorkerTags(&server.Tag{
Key: server.ManagedWorkerTag,
Value: "true",
})),
repo,
server.WithUpdateTags(true),
server.WithPublicId(managedPkiWorker.GetPublicId()),
server.WithKeyId(managedPkiWorkerKeyId))
Expand Down
34 changes: 12 additions & 22 deletions internal/server/repository_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,25 +256,25 @@ func ListWorkers(ctx context.Context, reader db.Reader, scopeIds []string, opt .
// The WithPublicId, WithKeyId, and WithUpdateTags options are
// the only ones used. All others are ignored.
// Workers are intentionally not oplogged.
func (r *Repository) UpsertWorkerStatus(ctx context.Context, worker *Worker, opt ...Option) (*Worker, error) {
func (r *Repository) UpsertWorkerStatus(ctx context.Context, worker *Worker, opt ...Option) (string, error) {
const op = "server.(Repository).UpsertWorkerStatus"

opts := GetOpts(opt...)
switch {
case worker == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "worker is nil")
return "", errors.New(ctx, errors.InvalidParameter, op, "worker is nil")
case worker.GetAddress() == "":
return nil, errors.New(ctx, errors.InvalidParameter, op, "worker reported address is empty")
return "", errors.New(ctx, errors.InvalidParameter, op, "worker reported address is empty")
case worker.ScopeId == "":
return nil, errors.New(ctx, errors.InvalidParameter, op, "scope id is empty")
return "", errors.New(ctx, errors.InvalidParameter, op, "scope id is empty")
case worker.PublicId != "":
return nil, errors.New(ctx, errors.InvalidParameter, op, "worker id is not empty")
return "", errors.New(ctx, errors.InvalidParameter, op, "worker id is not empty")
case worker.GetName() == "" && opts.withKeyId == "":
return nil, errors.New(ctx, errors.InvalidParameter, op, "worker keyId and reported name are both empty; one is required")
return "", errors.New(ctx, errors.InvalidParameter, op, "worker keyId and reported name are both empty; one is required")
case worker.OperationalState == "":
return nil, errors.New(ctx, errors.InvalidParameter, op, "worker operational state is empty")
return "", errors.New(ctx, errors.InvalidParameter, op, "worker operational state is empty")
case worker.LocalStorageState == "":
return nil, errors.New(ctx, errors.InvalidParameter, op, "worker local storage state is empty")
return "", errors.New(ctx, errors.InvalidParameter, op, "worker local storage state is empty")
}

var workerId string
Expand All @@ -285,7 +285,7 @@ func (r *Repository) UpsertWorkerStatus(ctx context.Context, worker *Worker, opt
case opts.withKeyId != "":
workerId, err = r.LookupWorkerIdByKeyId(ctx, opts.withKeyId)
if err != nil || workerId == "" {
return nil, errors.Wrap(ctx, err, op, errors.WithMsg("error finding worker by keyId"))
return "", errors.Wrap(ctx, err, op, errors.WithMsg("error finding worker by keyId"))
}
default:
// generating the worker id based off of the scope and name ensures
Expand All @@ -295,11 +295,10 @@ func (r *Repository) UpsertWorkerStatus(ctx context.Context, worker *Worker, opt
// workers and kms workers.
workerId, err = NewWorkerIdFromScopeAndName(ctx, worker.GetScopeId(), worker.GetName())
if err != nil || workerId == "" {
return nil, errors.Wrap(ctx, err, op, errors.WithMsg("error creating a worker id"))
return "", errors.Wrap(ctx, err, op, errors.WithMsg("error creating a worker id"))
}
}

var ret *Worker
_, err = r.writer.DoTx(
ctx,
db.StdRetryCnt,
Expand Down Expand Up @@ -366,23 +365,14 @@ func (r *Repository) UpsertWorkerStatus(ctx context.Context, worker *Worker, opt
}
}

wAgg := &workerAggregate{PublicId: workerClone.GetPublicId()}
if err := reader.LookupById(ctx, wAgg); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("error looking up worker aggregate"))
}
ret, err = wAgg.toWorker(ctx)
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("error converting worker aggregate to worker"))
}

return nil
},
)
if err != nil {
return nil, err
return "", err
}

return ret, nil
return workerId, nil
}

// setWorkerConfigTags removes all existing config tags from the same source and worker id
Expand Down
43 changes: 26 additions & 17 deletions internal/server/repository_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestUpsertWorkerStatus(t *testing.T) {
server.WithAddress("address"), server.WithName("config_name1"),
server.WithDescription("kms_description1"),
)
worker, err := repo.UpsertWorkerStatus(ctx, wStatus1)
worker, err := server.UpsertAndReturnWorker(ctx, t, wStatus1, repo)
require.NoError(t, err)

assert.True(t, strings.HasPrefix(worker.GetPublicId(), "w_"))
Expand All @@ -250,7 +250,7 @@ func TestUpsertWorkerStatus(t *testing.T) {
// update again and see updated last status time
wStatus2 := server.NewWorker(scope.Global.String(),
server.WithAddress("new_address"), server.WithName("config_name1"), server.WithReleaseVersion("test-version"))
worker, err = repo.UpsertWorkerStatus(ctx, wStatus2)
worker, err = server.UpsertAndReturnWorker(ctx, t, wStatus2, repo)
require.NoError(t, err)
assert.Greater(t, worker.GetLastStatusTime().AsTime(), worker.GetCreateTime().AsTime())
assert.Equal(t, "config_name1", worker.Name)
Expand All @@ -269,7 +269,7 @@ func TestUpsertWorkerStatus(t *testing.T) {
wStatus3 := server.NewWorker(scope.Global.String(),
server.WithAddress("new_address"), server.WithName("config_name1"),
server.WithOperationalState("shutdown"), server.WithReleaseVersion("Boundary v0.11.0"))
worker, err = repo.UpsertWorkerStatus(ctx, wStatus3)
worker, err = server.UpsertAndReturnWorker(ctx, t, wStatus3, repo)
require.NoError(t, err)
assert.Greater(t, worker.GetLastStatusTime().AsTime(), worker.GetCreateTime().AsTime())
// Version does not change for status updates
Expand All @@ -282,7 +282,7 @@ func TestUpsertWorkerStatus(t *testing.T) {
server.WithAddress("new_address"), server.WithName("config_name1"),
server.WithOperationalState("shutdown"), server.WithReleaseVersion("Boundary v0.11.0"),
server.WithLocalStorageState("available"))
worker, err = repo.UpsertWorkerStatus(ctx, wStatus4)
worker, err = server.UpsertAndReturnWorker(ctx, t, wStatus4, repo)
require.NoError(t, err)
assert.Greater(t, worker.GetLastStatusTime().AsTime(), worker.GetCreateTime().AsTime())
// Version does not change for status updates
Expand All @@ -303,7 +303,7 @@ func TestUpsertWorkerStatus(t *testing.T) {
wStatus1 := server.NewWorker(scope.Global.String(),
server.WithAddress("pki_address"), server.WithDescription("pki_description2"),
server.WithReleaseVersion("test-version"))
worker, err := repo.UpsertWorkerStatus(ctx, wStatus1, server.WithKeyId(pkiWorkerKeyId), server.WithReleaseVersion("test-version"))
worker, err := server.UpsertAndReturnWorker(ctx, t, wStatus1, repo, server.WithKeyId(pkiWorkerKeyId), server.WithReleaseVersion("test-version"))
require.NoError(t, err)

assert.True(t, strings.HasPrefix(worker.GetPublicId(), "w_"))
Expand Down Expand Up @@ -558,8 +558,7 @@ func TestTagUpdatingListing(t *testing.T) {
Value: "value2",
}))

worker1, err = repo.UpsertWorkerStatus(ctx, wStatus,
server.WithUpdateTags(true))
worker1, err = server.UpsertAndReturnWorker(ctx, t, wStatus, repo, server.WithUpdateTags(true))
require.NoError(err)
assert.Len(t, worker1.CanonicalTags(), 1)
assert.ElementsMatch(t, []string{"value1", "value2"}, worker1.CanonicalTags()["tag1"])
Expand All @@ -577,13 +576,13 @@ func TestTagUpdatingListing(t *testing.T) {
Key: "tag22",
Value: "value22",
}))
worker1, err = repo.UpsertWorkerStatus(ctx, wStatus)
worker1, err = server.UpsertAndReturnWorker(ctx, t, wStatus, repo)
require.NoError(err)
assert.Len(t, worker1.CanonicalTags(), 1)
assert.ElementsMatch(t, []string{"value1", "value2"}, worker1.CanonicalTags()["tag1"])

// Update tags and test again
worker1, err = repo.UpsertWorkerStatus(ctx, wStatus, server.WithUpdateTags(true))
worker1, err = server.UpsertAndReturnWorker(ctx, t, wStatus, repo, server.WithUpdateTags(true))
require.NoError(err)
assert.Len(t, worker1.CanonicalTags(), 1)
assert.ElementsMatch(t, []string{"value21", "value22"}, worker1.CanonicalTags()["tag22"])
Expand Down Expand Up @@ -782,41 +781,46 @@ func TestListWorkers_WithActiveWorkers(t *testing.T) {
{
name: "upsert-worker1-to-shutdown",
upsertFn: func() (*server.Worker, error) {
return serversRepo.UpsertWorkerStatus(ctx,
return server.UpsertAndReturnWorker(ctx, t,
server.NewWorker(scope.Global.String(),
server.WithName(worker1.GetName()),
server.WithAddress(worker1.GetAddress()),
server.WithOperationalState(server.ShutdownOperationalState.String()),
server.WithReleaseVersion("Boundary v.0.11"),
server.WithPublicId(worker1.GetPublicId())))
server.WithPublicId(worker1.GetPublicId())),
serversRepo)
},
wantCnt: 2,
wantState: server.ShutdownOperationalState.String(),
},
{
name: "upsert-worker2-to-shutdown",
upsertFn: func() (*server.Worker, error) {
return serversRepo.UpsertWorkerStatus(ctx,
workerId, err := serversRepo.UpsertWorkerStatus(ctx,
server.NewWorker(scope.Global.String(),
server.WithName(worker2.GetName()),
server.WithAddress(worker2.GetAddress()),
server.WithOperationalState(server.ShutdownOperationalState.String()),
server.WithReleaseVersion("Boundary v.0.11"),
server.WithPublicId(worker2.GetPublicId())))
require.NoError(err)
return serversRepo.LookupWorker(ctx, workerId)
},
wantCnt: 1,
wantState: server.ShutdownOperationalState.String(),
},
{
name: "upsert-worker3-to-shutdown",
upsertFn: func() (*server.Worker, error) {
return serversRepo.UpsertWorkerStatus(ctx,
workerId, err := serversRepo.UpsertWorkerStatus(ctx,
server.NewWorker(scope.Global.String(),
server.WithName(worker3.GetName()),
server.WithAddress(worker3.GetAddress()),
server.WithOperationalState(server.ShutdownOperationalState.String()),
server.WithReleaseVersion("Boundary v.0.11"),
server.WithPublicId(worker3.GetPublicId())))
require.NoError(err)
return serversRepo.LookupWorker(ctx, workerId)
},
wantCnt: 0,
wantState: server.ShutdownOperationalState.String(),
Expand All @@ -825,36 +829,41 @@ func TestListWorkers_WithActiveWorkers(t *testing.T) {
// Pre 0.11 workers will default to Active
name: "upsert-no-release-version-no-state",
upsertFn: func() (*server.Worker, error) {
return serversRepo.UpsertWorkerStatus(ctx,
workerId, err := serversRepo.UpsertWorkerStatus(ctx,
server.NewWorker(scope.Global.String(),
server.WithName(worker3.GetName()),
server.WithAddress(worker3.GetAddress())),
server.WithPublicId(worker3.GetPublicId()))
require.NoError(err)
return serversRepo.LookupWorker(ctx, workerId)
},
wantCnt: 1,
wantState: server.ActiveOperationalState.String(),
},
{ // Upsert with active status and no version and expect to get a hit- test backwards compatibility
name: "upsert-no-release-version-active-state",
upsertFn: func() (*server.Worker, error) {
return serversRepo.UpsertWorkerStatus(ctx,
return server.UpsertAndReturnWorker(ctx, t,
server.NewWorker(scope.Global.String(),
server.WithName(worker3.GetName()),
server.WithAddress(worker3.GetAddress()),
server.WithOperationalState(server.ActiveOperationalState.String())),
server.WithPublicId(worker3.GetPublicId()))
serversRepo,
server.WithPublicId(worker3.GetPublicId()),
)
},
wantCnt: 1,
wantState: server.ActiveOperationalState.String(),
},
{ // Upsert with unknown status and do not expect to get a hit- test worker create before status
name: "upsert-unknown-status",
upsertFn: func() (*server.Worker, error) {
return serversRepo.UpsertWorkerStatus(ctx,
return server.UpsertAndReturnWorker(ctx, t,
server.NewWorker(scope.Global.String(),
server.WithName(worker3.GetName()),
server.WithAddress(worker3.GetAddress()),
server.WithOperationalState(server.UnknownOperationalState.String())),
serversRepo,
server.WithPublicId(worker3.GetPublicId()))
},
wantCnt: 0,
Expand Down
10 changes: 9 additions & 1 deletion internal/server/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ func TestKmsWorker(t *testing.T, conn *db.DB, wrapper wrapping.Wrapper, opt ...O
}

wrk := NewWorker(scope.Global.String(), opt...)
wrk, err = serversRepo.UpsertWorkerStatus(ctx, wrk)
wrk, err = UpsertAndReturnWorker(ctx, t, wrk, serversRepo)
require.NoError(t, err)
require.NoError(t, err)
require.NotNil(t, wrk)
require.Equal(t, "kms", wrk.Type)
Expand Down Expand Up @@ -222,3 +223,10 @@ func TestLookupWorkerByName(ctx context.Context, t *testing.T, name string, serv
}
return nil, nil
}

func UpsertAndReturnWorker(ctx context.Context, t *testing.T, w *Worker, serversRepo *Repository, opt ...Option) (*Worker, error) {
workerId, err := serversRepo.UpsertWorkerStatus(ctx, w, opt...)
require.NoError(t, err)
require.NotEmpty(t, workerId)
return serversRepo.LookupWorker(ctx, workerId)
}
2 changes: 1 addition & 1 deletion internal/session/job_session_cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func TestCloseConnectionsForDeadWorkers(t *testing.T) {
t.Helper()
pubId := w.GetPublicId()
w.PublicId = ""
wkr, err := serversRepo.UpsertWorkerStatus(ctx, w, server.WithPublicId(pubId))
wkr, err := server.UpsertAndReturnWorker(ctx, t, w, serversRepo, server.WithPublicId(pubId))
require.NoError(err)
return wkr
}
Expand Down

0 comments on commit b6ebf4d

Please sign in to comment.