Skip to content

Commit

Permalink
Set sync job still syncs when plugin reports no hosts in sets (#1726)
Browse files Browse the repository at this point in the history
* Allow the set sync job to sync when no hosts are returned by the sync job.
  • Loading branch information
talanknight authored Nov 17, 2021
1 parent 7174a91 commit 18de365
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 18 deletions.
15 changes: 5 additions & 10 deletions internal/host/plugin/job_set_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,8 @@ func (r *SetSyncJob) syncSets(ctx context.Context, setIds []string) error {
return errors.Wrap(ctx, err, op)
}

if len(resp.GetHosts()) > 0 {
if _, err := r.upsertHosts(ctx, ci.storeCat, catSetIds, resp.GetHosts()); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("upserting hosts"))
}
if _, err := r.upsertAndCleanHosts(ctx, ci.storeCat, catSetIds, resp.GetHosts()); err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("upserting hosts"))
}

updateSyncDataQuery := `
Expand All @@ -326,7 +324,7 @@ where public_id in (?)
return nil
}

// upsertHosts inserts phs into the repository or updates its current
// upsertAndCleanHosts inserts phs into the repository or updates its current
// attributes/set memberships and returns Hosts. h is not changed. hc must
// contain a valid public ID and scope ID. Each ph in phs must not contain a
// PublicId but must contain an external ID. The PublicId is generated and
Expand All @@ -335,16 +333,13 @@ where public_id in (?)
// NOTE: If phs is empty, this assumes that there are simply no hosts that
// matched the given sets! Which means it will remove all hosts from the given
// sets.
func (r *SetSyncJob) upsertHosts(
func (r *SetSyncJob) upsertAndCleanHosts(
ctx context.Context,
hc *HostCatalog,
setIds []string,
phs []*plgpb.ListHostsResponseHost,
_ ...Option) ([]*Host, error) {
const op = "plugin.(SetSyncJob).upsertHosts"
if phs == nil {
return nil, errors.New(ctx, errors.InvalidParameter, op, "nil plugin hosts")
}
const op = "plugin.(SetSyncJob).upsertAndCleanHosts"
for _, ph := range phs {
if ph.GetExternalId() == "" {
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing host external id")
Expand Down
27 changes: 24 additions & 3 deletions internal/host/plugin/job_set_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,28 @@ func TestSetSyncJob_Run(t *testing.T) {
_, prj := iam.TestScopes(t, iam.TestRepo(t, conn, wrapper))

cat := TestCatalog(t, conn, prj.GetPublicId(), plg.GetPublicId())

plgServer.ListHostsFn = func(_ context.Context, _ *plgpb.ListHostsRequest) (*plgpb.ListHostsResponse, error) {
return &plgpb.ListHostsResponse{}, nil
}
// Start with a set with a member that should be removed
setToRemoveHosts := TestSet(t, conn, kmsCache, sched, cat, plgm)
hostToRemove := TestHost(t, conn, cat.GetPublicId(), "remove this host")
TestSetMembers(t, conn, setToRemoveHosts.GetPublicId(), []*Host{hostToRemove})

// Run sync again with the newly created set
err = r.Run(context.Background())
require.NoError(err)

hsa := &hostSetAgg{PublicId: setToRemoveHosts.GetPublicId()}
require.NoError(rw.LookupByPublicId(ctx, hsa))
assert.Greater(hsa.LastSyncTime.AsTime().UnixNano(), hsa.CreateTime.AsTime().UnixNano())
hs, err := hsa.toHostSet(ctx)
require.NoError(err)
assert.Len(hs.HostIds, 0)
_, err = rw.Delete(ctx, hostToRemove)
require.NoError(err)

set1 := TestSet(t, conn, kmsCache, sched, cat, plgm)
counter := new(uint32)
plgServer.ListHostsFn = func(ctx context.Context, req *plgpb.ListHostsRequest) (*plgpb.ListHostsResponse, error) {
Expand All @@ -183,7 +205,7 @@ func TestSetSyncJob_Run(t *testing.T) {
hostRepo, err := NewRepository(rw, rw, kmsCache, sche, plgm)
require.NoError(err)

hsa := &hostSetAgg{PublicId: set1.GetPublicId()}
hsa = &hostSetAgg{PublicId: set1.GetPublicId()}
require.NoError(rw.LookupByPublicId(ctx, hsa))
assert.Less(hsa.LastSyncTime.AsTime().UnixNano(), hsa.CreateTime.AsTime().UnixNano())

Expand All @@ -209,12 +231,11 @@ func TestSetSyncJob_Run(t *testing.T) {
// Run sync again with the freshly synced set
err = r.Run(context.Background())
require.NoError(err)
// The single existing set should have been processed
assert.Equal(0, r.numSets)
assert.Equal(0, r.numProcessed)

// Set needs update
hs, err := hsa.toHostSet(ctx)
hs, err = hsa.toHostSet(ctx)
require.NoError(err)
hs.NeedSync = true
count, err := rw.Update(ctx, hs, []string{"NeedSync"}, nil)
Expand Down
10 changes: 5 additions & 5 deletions internal/host/plugin/repository_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func TestJob_UpsertHosts(t *testing.T) {
sets: setIds,
}
},
wantIsErr: errors.InvalidParameter,
},
{
name: "no-external-id-hosts",
Expand Down Expand Up @@ -194,14 +193,13 @@ func TestJob_UpsertHosts(t *testing.T) {
require.NoError(err)
require.NotNil(job)
in := tt.in()
got, err := job.upsertHosts(ctx, in.catalog, in.sets, in.phs, tt.opts...)
got, err := job.upsertAndCleanHosts(ctx, in.catalog, in.sets, in.phs, tt.opts...)
if tt.wantIsErr != 0 {
assert.Truef(errors.Match(errors.T(tt.wantIsErr), err), "want err: %q got: %q", tt.wantIsErr, err)
assert.Nil(got)
return
}
require.NoError(err, fmt.Sprintf("%v", in.catalog))
require.NotNil(got)

// Basic tests
assert.Len(got, len(in.phs))
Expand Down Expand Up @@ -230,7 +228,6 @@ func TestJob_UpsertHosts(t *testing.T) {
var gotPlg *hostplg.Plugin
got, gotPlg, err = repo.ListHostsByCatalogId(ctx, in.catalog.GetPublicId())
require.NoError(err)
require.NotNil(got)
assert.Len(got, len(in.phs))
assert.Empty(
cmp.Diff(
Expand All @@ -243,6 +240,10 @@ func TestJob_UpsertHosts(t *testing.T) {
}),
),
)
plg := plg
if len(in.phs) == 0 {
plg = nil
}
assert.Empty(
cmp.Diff(
plg,
Expand Down Expand Up @@ -283,7 +284,6 @@ func TestJob_UpsertHosts(t *testing.T) {
for setId, expHostIds := range setIdMap {
got, err = repo.ListHostsBySetIds(ctx, []string{setId})
require.NoError(err)
require.NotNil(got)
var gotHostIds []string
for _, h := range got {
gotHostIds = append(gotHostIds, h.GetPublicId())
Expand Down

0 comments on commit 18de365

Please sign in to comment.