diff --git a/internal/host/plugin/job_set_sync.go b/internal/host/plugin/job_set_sync.go index f1d45f293d..33cef5b2a5 100644 --- a/internal/host/plugin/job_set_sync.go +++ b/internal/host/plugin/job_set_sync.go @@ -299,10 +299,8 @@ func (r *SetSyncJob) syncSets(ctx context.Context, setIds []string) error { return errors.Wrap(ctx, err, op) } - if len(resp.GetHosts()) > 0 { - if _, err := r.upsertHosts(ctx, ci.storeCat, catSetIds, resp.GetHosts()); err != nil { - return errors.Wrap(ctx, err, op, errors.WithMsg("upserting hosts")) - } + if _, err := r.upsertAndCleanHosts(ctx, ci.storeCat, catSetIds, resp.GetHosts()); err != nil { + return errors.Wrap(ctx, err, op, errors.WithMsg("upserting hosts")) } updateSyncDataQuery := ` @@ -326,7 +324,7 @@ where public_id in (?) return nil } -// upsertHosts inserts phs into the repository or updates its current +// upsertAndCleanHosts inserts phs into the repository or updates its current // attributes/set memberships and returns Hosts. h is not changed. hc must // contain a valid public ID and scope ID. Each ph in phs must not contain a // PublicId but must contain an external ID. The PublicId is generated and @@ -335,16 +333,13 @@ where public_id in (?) // NOTE: If phs is empty, this assumes that there are simply no hosts that // matched the given sets! Which means it will remove all hosts from the given // sets. -func (r *SetSyncJob) upsertHosts( +func (r *SetSyncJob) upsertAndCleanHosts( ctx context.Context, hc *HostCatalog, setIds []string, phs []*plgpb.ListHostsResponseHost, _ ...Option) ([]*Host, error) { - const op = "plugin.(SetSyncJob).upsertHosts" - if phs == nil { - return nil, errors.New(ctx, errors.InvalidParameter, op, "nil plugin hosts") - } + const op = "plugin.(SetSyncJob).upsertAndCleanHosts" for _, ph := range phs { if ph.GetExternalId() == "" { return nil, errors.New(ctx, errors.InvalidParameter, op, "missing host external id") diff --git a/internal/host/plugin/job_set_sync_test.go b/internal/host/plugin/job_set_sync_test.go index e7432b3ef2..1dea7986cf 100644 --- a/internal/host/plugin/job_set_sync_test.go +++ b/internal/host/plugin/job_set_sync_test.go @@ -159,6 +159,28 @@ func TestSetSyncJob_Run(t *testing.T) { _, prj := iam.TestScopes(t, iam.TestRepo(t, conn, wrapper)) cat := TestCatalog(t, conn, prj.GetPublicId(), plg.GetPublicId()) + + plgServer.ListHostsFn = func(_ context.Context, _ *plgpb.ListHostsRequest) (*plgpb.ListHostsResponse, error) { + return &plgpb.ListHostsResponse{}, nil + } + // Start with a set with a member that should be removed + setToRemoveHosts := TestSet(t, conn, kmsCache, sched, cat, plgm) + hostToRemove := TestHost(t, conn, cat.GetPublicId(), "remove this host") + TestSetMembers(t, conn, setToRemoveHosts.GetPublicId(), []*Host{hostToRemove}) + + // Run sync again with the newly created set + err = r.Run(context.Background()) + require.NoError(err) + + hsa := &hostSetAgg{PublicId: setToRemoveHosts.GetPublicId()} + require.NoError(rw.LookupByPublicId(ctx, hsa)) + assert.Greater(hsa.LastSyncTime.AsTime().UnixNano(), hsa.CreateTime.AsTime().UnixNano()) + hs, err := hsa.toHostSet(ctx) + require.NoError(err) + assert.Len(hs.HostIds, 0) + _, err = rw.Delete(ctx, hostToRemove) + require.NoError(err) + set1 := TestSet(t, conn, kmsCache, sched, cat, plgm) counter := new(uint32) plgServer.ListHostsFn = func(ctx context.Context, req *plgpb.ListHostsRequest) (*plgpb.ListHostsResponse, error) { @@ -183,7 +205,7 @@ func TestSetSyncJob_Run(t *testing.T) { hostRepo, err := NewRepository(rw, rw, kmsCache, sche, plgm) require.NoError(err) - hsa := &hostSetAgg{PublicId: set1.GetPublicId()} + hsa = &hostSetAgg{PublicId: set1.GetPublicId()} require.NoError(rw.LookupByPublicId(ctx, hsa)) assert.Less(hsa.LastSyncTime.AsTime().UnixNano(), hsa.CreateTime.AsTime().UnixNano()) @@ -209,12 +231,11 @@ func TestSetSyncJob_Run(t *testing.T) { // Run sync again with the freshly synced set err = r.Run(context.Background()) require.NoError(err) - // The single existing set should have been processed assert.Equal(0, r.numSets) assert.Equal(0, r.numProcessed) // Set needs update - hs, err := hsa.toHostSet(ctx) + hs, err = hsa.toHostSet(ctx) require.NoError(err) hs.NeedSync = true count, err := rw.Update(ctx, hs, []string{"NeedSync"}, nil) diff --git a/internal/host/plugin/repository_host_test.go b/internal/host/plugin/repository_host_test.go index 0bf601f6e2..bf7dc5f32e 100644 --- a/internal/host/plugin/repository_host_test.go +++ b/internal/host/plugin/repository_host_test.go @@ -72,7 +72,6 @@ func TestJob_UpsertHosts(t *testing.T) { sets: setIds, } }, - wantIsErr: errors.InvalidParameter, }, { name: "no-external-id-hosts", @@ -194,14 +193,13 @@ func TestJob_UpsertHosts(t *testing.T) { require.NoError(err) require.NotNil(job) in := tt.in() - got, err := job.upsertHosts(ctx, in.catalog, in.sets, in.phs, tt.opts...) + got, err := job.upsertAndCleanHosts(ctx, in.catalog, in.sets, in.phs, tt.opts...) if tt.wantIsErr != 0 { assert.Truef(errors.Match(errors.T(tt.wantIsErr), err), "want err: %q got: %q", tt.wantIsErr, err) assert.Nil(got) return } require.NoError(err, fmt.Sprintf("%v", in.catalog)) - require.NotNil(got) // Basic tests assert.Len(got, len(in.phs)) @@ -230,7 +228,6 @@ func TestJob_UpsertHosts(t *testing.T) { var gotPlg *hostplg.Plugin got, gotPlg, err = repo.ListHostsByCatalogId(ctx, in.catalog.GetPublicId()) require.NoError(err) - require.NotNil(got) assert.Len(got, len(in.phs)) assert.Empty( cmp.Diff( @@ -243,6 +240,10 @@ func TestJob_UpsertHosts(t *testing.T) { }), ), ) + plg := plg + if len(in.phs) == 0 { + plg = nil + } assert.Empty( cmp.Diff( plg, @@ -283,7 +284,6 @@ func TestJob_UpsertHosts(t *testing.T) { for setId, expHostIds := range setIdMap { got, err = repo.ListHostsBySetIds(ctx, []string{setId}) require.NoError(err) - require.NotNil(got) var gotHostIds []string for _, h := range got { gotHostIds = append(gotHostIds, h.GetPublicId())