Skip to content

Commit

Permalink
feat(workers): add VerifyKnownWorkers repo function (#5189)
Browse files Browse the repository at this point in the history
  • Loading branch information
irenarindos authored Oct 23, 2024
1 parent dfde64e commit 3615805
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 6 deletions.
4 changes: 2 additions & 2 deletions internal/daemon/cluster/handlers/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,12 @@ func (ws *workerServiceServer) Status(ctx context.Context, req *pbs.StatusReques

authorizedDownstreams := &pbs.AuthorizedDownstreamWorkerList{}
if len(req.GetConnectedWorkerPublicIds()) > 0 {
knownConnectedWorkers, err := serverRepo.ListWorkers(ctx, []string{scope.Global.String()}, server.WithWorkerPool(req.GetConnectedWorkerPublicIds()), server.WithLiveness(-1))
knownConnectedWorkers, err := serverRepo.VerifyKnownWorkers(ctx, req.GetConnectedWorkerPublicIds())
if err != nil {
event.WriteError(ctx, op, err, event.WithInfoMsg("error getting known connected worker ids"))
return &pbs.StatusResponse{}, status.Errorf(codes.Internal, "Error getting known connected worker ids: %v", err)
}
authorizedDownstreams.WorkerPublicIds = server.WorkerList(knownConnectedWorkers).PublicIds()
authorizedDownstreams.WorkerPublicIds = knownConnectedWorkers
}

if len(req.GetConnectedUnmappedWorkerKeyIdentifiers()) > 0 {
Expand Down
6 changes: 2 additions & 4 deletions internal/daemon/controller/tickers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ import (
"github.com/hashicorp/boundary/internal/daemon/cluster"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/event"
"github.com/hashicorp/boundary/internal/server"
"github.com/hashicorp/boundary/internal/server/store"
"github.com/hashicorp/boundary/internal/types/scope"
)

// In the future we could make this configurable
Expand Down Expand Up @@ -198,12 +196,12 @@ func (c *Controller) startWorkerConnectionMaintenanceTicking(cancelCtx context.C
event.WriteError(cancelCtx, op, err, event.WithInfoMsg("error fetching server repository for cluster connection maintenance"))
break
}
knownWorker, err := serverRepo.ListWorkers(cancelCtx, []string{scope.Global.String()}, server.WithWorkerPool(connectionState.WorkerIds()), server.WithLiveness(-1))
knownWorkers, err := serverRepo.VerifyKnownWorkers(cancelCtx, connectionState.WorkerIds())
if err != nil {
event.WriteError(cancelCtx, op, err, event.WithInfoMsg("couldn't get known workers from repo"))
break
}
connectionState.DisconnectMissingWorkers(server.WorkerList(knownWorker).PublicIds())
connectionState.DisconnectMissingWorkers(knownWorkers)
}

if len(connectionState.UnmappedKeyIds()) > 0 {
Expand Down
6 changes: 6 additions & 0 deletions internal/server/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ const (
where worker_key_identifier = @worker_key_identifier
`

verifyKnownWorkersQuery = `
select public_id
from server_worker
where public_id in (?);
`

getWorkerAuthsByWorkerIdQuery = `
select *
from worker_auth_authorized
Expand Down
34 changes: 34 additions & 0 deletions internal/server/repository_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,40 @@ func (r *Repository) UpsertWorkerStatus(ctx context.Context, worker *Worker, opt
return ret, nil
}

// VerifyKnownWorkers checks that the passed worker IDs are found in the repository and returns
// the public IDs of the workers that are found.
func (r *Repository) VerifyKnownWorkers(ctx context.Context, ids []string) ([]string, error) {
const op = "server.(Repository).VerifyKnownWorkers"

if len(ids) == 0 {
return nil, nil
}

rows, err := r.reader.Query(ctx, verifyKnownWorkersQuery, []any{ids})
if err != nil {
return nil, errors.Wrap(ctx, err, op)
}
defer rows.Close()

type rowsResult struct {
PublicId string
}
var ret []string
for rows.Next() {
var result rowsResult
err = r.reader.ScanRows(ctx, rows, &result)
if err != nil {
return nil, errors.Wrap(ctx, err, op)
}
ret = append(ret, result.PublicId)
}
if err := rows.Err(); err != nil {
return nil, errors.Wrap(ctx, err, op)
}

return ret, nil
}

// setWorkerTags removes all existing tags from the same source and worker id
// and creates new ones based on the ones provided. This function should be
// called from inside a db transaction.
Expand Down
57 changes: 57 additions & 0 deletions internal/server/repository_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,63 @@ func TestUpsertWorkerStatus(t *testing.T) {
})
}

func TestVerifyKnownWorkers(t *testing.T) {
ctx := context.Background()
conn, _ := db.TestSetup(t, "postgres")
rw := db.New(conn)
wrapper := db.TestWrapper(t)
kmsCache := kms.TestKms(t, conn, wrapper)
require.NoError(t, kmsCache.CreateKeys(context.Background(), scope.Global.String(), kms.WithRandomReader(rand.Reader)))
repo, err := server.NewRepository(ctx, rw, rw, kmsCache)
require.NoError(t, err)

workerIds := make([]string, 0, 10)
// Seed the repo with workers
for i := 0; i < 10; i++ {
w := server.TestPkiWorker(t, conn, wrapper)
workerIds = append(workerIds, w.GetPublicId())
}

tests := []struct {
name string
testIds []string
wantCnt int
}{
{
name: "empty-list",
testIds: []string{},
wantCnt: 0,
},
{
name: "full-list",
testIds: workerIds,
wantCnt: 10,
},
{
name: "bogus-list",
testIds: []string{"w_bogus1", "w_bogus2"},
wantCnt: 0,
},
{
name: "partial-bogus-list",
testIds: []string{"w_bogus1", "w_bogus2", workerIds[0], workerIds[1]},
wantCnt: 2,
},
{
name: "partial-list",
testIds: workerIds[:5],
wantCnt: 5,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ids, err := repo.VerifyKnownWorkers(ctx, tt.testIds)
require.NoError(t, err)
require.Equal(t, tt.wantCnt, len(ids))
})
}
}

func TestTagUpdatingListing(t *testing.T) {
ctx := context.Background()
require := require.New(t)
Expand Down

0 comments on commit 3615805

Please sign in to comment.