From 833c8dc3ec2059ed67206801e1c05486df9a3451 Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith <109068393+gregns1@users.noreply.github.com> Date: Fri, 20 Sep 2024 11:11:03 +0100 Subject: [PATCH 1/3] CBG-4207: Attachment metadata migration on import (#7117) * CBG-4207: have import feed migrate attachments from sync data to global sync data even if document doesn't need importing * new comment * updates for attachment compaction * remove print line * move code into else clause --- db/attachment_compaction.go | 37 ++++-- db/crud.go | 26 ++++ db/import.go | 2 +- db/import_listener.go | 9 +- db/util_testing.go | 25 ++++ rest/importtest/import_test.go | 232 +++++++++++++++++++++++++++++++++ 6 files changed, 317 insertions(+), 14 deletions(-) diff --git a/db/attachment_compaction.go b/db/attachment_compaction.go index 0d54835ee1..57b9ccb642 100644 --- a/db/attachment_compaction.go +++ b/db/attachment_compaction.go @@ -193,9 +193,9 @@ type AttachmentsMetaMap struct { Attachments map[string]AttachmentsMeta `json:"_attachments"` } -// AttachmentCompactionData struct to unmarshal a document sync data into in order to process attachments during mark +// AttachmentCompactionSyncData struct to unmarshal a document sync data into in order to process attachments during mark // phase. Contains only what is necessary -type AttachmentCompactionData struct { +type AttachmentCompactionSyncData struct { Attachments map[string]AttachmentsMeta `json:"attachments"` Flags uint8 `json:"flags"` History struct { @@ -204,29 +204,42 @@ type AttachmentCompactionData struct { } `json:"history"` } -// getAttachmentSyncData takes the data type and data from the DCP feed and will return a AttachmentCompactionData +// AttachmentCompactionGlobalSyncData is to unmarshal a documents global xattr in order to process attachments during mark phase. +type AttachmentCompactionGlobalSyncData struct { + Attachments map[string]AttachmentsMeta `json:"attachments_meta"` +} + +// getAttachmentSyncData takes the data type and data from the DCP feed and will return a AttachmentCompactionSyncData // struct containing data needed to process attachments on a document. -func getAttachmentSyncData(dataType uint8, data []byte) (*AttachmentCompactionData, error) { - var attachmentData *AttachmentCompactionData +func getAttachmentSyncData(dataType uint8, data []byte) (*AttachmentCompactionSyncData, error) { + var attachmentSyncData *AttachmentCompactionSyncData + var attachmentGlobalSyncData AttachmentCompactionGlobalSyncData var documentBody []byte if dataType&base.MemcachedDataTypeXattr != 0 { - body, xattrs, err := sgbucket.DecodeValueWithXattrs([]string{base.SyncXattrName}, data) + body, xattrs, err := sgbucket.DecodeValueWithXattrs([]string{base.SyncXattrName, base.GlobalXattrName}, data) if err != nil { if errors.Is(err, sgbucket.ErrXattrInvalidLen) { return nil, nil } return nil, fmt.Errorf("Could not parse DCP attachment sync data: %w", err) } - err = base.JSONUnmarshal(xattrs[base.SyncXattrName], &attachmentData) + err = base.JSONUnmarshal(xattrs[base.SyncXattrName], &attachmentSyncData) if err != nil { return nil, err } + if xattrs[base.GlobalXattrName] != nil && attachmentSyncData.Attachments == nil { + err = base.JSONUnmarshal(xattrs[base.GlobalXattrName], &attachmentGlobalSyncData) + if err != nil { + return nil, err + } + attachmentSyncData.Attachments = attachmentGlobalSyncData.Attachments + } documentBody = body } else { type AttachmentDataSync struct { - AttachmentData AttachmentCompactionData `json:"_sync"` + AttachmentData AttachmentCompactionSyncData `json:"_sync"` } var attachmentDataSync AttachmentDataSync err := base.JSONUnmarshal(data, &attachmentDataSync) @@ -235,21 +248,21 @@ func getAttachmentSyncData(dataType uint8, data []byte) (*AttachmentCompactionDa } documentBody = data - attachmentData = &attachmentDataSync.AttachmentData + attachmentSyncData = &attachmentDataSync.AttachmentData } // If we've not yet found any attachments have a last effort attempt to grab it from the body for pre-2.5 documents - if len(attachmentData.Attachments) == 0 { + if len(attachmentSyncData.Attachments) == 0 { attachmentMetaMap, err := checkForInlineAttachments(documentBody) if err != nil { return nil, err } if attachmentMetaMap != nil { - attachmentData.Attachments = attachmentMetaMap.Attachments + attachmentSyncData.Attachments = attachmentMetaMap.Attachments } } - return attachmentData, nil + return attachmentSyncData, nil } // checkForInlineAttachments will scan a body for "_attachments" for pre-2.5 attachments and will return any attachments diff --git a/db/crud.go b/db/crud.go index 3dc283111d..01986c3379 100644 --- a/db/crud.go +++ b/db/crud.go @@ -941,6 +941,32 @@ func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent DocU return d, nil } +// MigrateAttachmentMetadata will move any attachment metadata defined in sync data to global sync xattr +func (c *DatabaseCollectionWithUser) MigrateAttachmentMetadata(ctx context.Context, docID string, cas uint64, syncData *SyncData) error { + globalData := GlobalSyncData{ + GlobalAttachments: syncData.Attachments, + } + globalXattr, err := base.JSONMarshal(globalData) + if err != nil { + return base.RedactErrorf("Failed to Marshal global sync data when attempting to migrate sync data attachments to global xattr with id: %s. Error: %v", base.UD(docID), err) + } + syncData.Attachments = nil + rawSyncXattr, err := base.JSONMarshal(*syncData) + if err != nil { + return base.RedactErrorf("Failed to Marshal sync data when attempting to migrate sync data attachments to global xattr with id: %s. Error: %v", base.UD(docID), err) + } + + // build macro expansion for sync data. This will avoid the update to xattrs causing an extra import event (i.e. sync cas will be == to doc cas) + opts := &sgbucket.MutateInOptions{} + spec := macroExpandSpec(base.SyncXattrName) + opts.MacroExpansion = spec + opts.PreserveExpiry = true // if doc has expiry, we should preserve this + + updatedXattr := map[string][]byte{base.SyncXattrName: rawSyncXattr, base.GlobalXattrName: globalXattr} + _, err = c.dataStore.UpdateXattrs(ctx, docID, 0, cas, updatedXattr, opts) + return err +} + // Updates or creates a document. // The new body's BodyRev property must match the current revision's, if any. func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, body Body) (newRevID string, doc *Document, err error) { diff --git a/db/import.go b/db/import.go index c0c7bb2569..b5d324be92 100644 --- a/db/import.go +++ b/db/import.go @@ -97,7 +97,7 @@ func (db *DatabaseCollectionWithUser) ImportDoc(ctx context.Context, docid strin existingBucketDoc.Xattrs[base.MouXattrName], err = base.JSONMarshal(existingDoc.metadataOnlyUpdate) } } else { - existingBucketDoc.Body, existingBucketDoc.Xattrs[base.SyncXattrName], existingBucketDoc.Xattrs[base.VvXattrName], existingBucketDoc.Xattrs[base.MouXattrName], _, err = existingDoc.MarshalWithXattrs() + existingBucketDoc.Body, existingBucketDoc.Xattrs[base.SyncXattrName], existingBucketDoc.Xattrs[base.VvXattrName], existingBucketDoc.Xattrs[base.MouXattrName], existingBucketDoc.Xattrs[base.GlobalXattrName], err = existingDoc.MarshalWithXattrs() } } diff --git a/db/import_listener.go b/db/import_listener.go index 162932d6d1..a92c7b72ec 100644 --- a/db/import_listener.go +++ b/db/import_listener.go @@ -188,13 +188,13 @@ func (il *importListener) ImportFeedEvent(ctx context.Context, collection *Datab } } + docID := string(event.Key) // If syncData is nil, or if this was not an SG write, attempt to import if syncData == nil || !isSGWrite { isDelete := event.Opcode == sgbucket.FeedOpDeletion if isDelete { rawBody = nil } - docID := string(event.Key) // last attempt to exit processing if the importListener has been closed before attempting to write to the bucket select { @@ -220,6 +220,13 @@ func (il *importListener) ImportFeedEvent(ctx context.Context, collection *Datab base.DebugfCtx(ctx, base.KeyImport, "Did not import doc %q - external update will not be accessible via Sync Gateway. Reason: %v", base.UD(docID), err) } } + } else if syncData != nil && syncData.Attachments != nil { + base.DebugfCtx(ctx, base.KeyImport, "Attachment metadata found in sync data for doc with id %s, migrating attachment metadata", base.UD(docID)) + // we have attachments to migrate + err := collection.MigrateAttachmentMetadata(ctx, docID, event.Cas, syncData) + if err != nil { + base.WarnfCtx(ctx, "error migrating attachment metadata from sync data to global sync for doc %s. Error: %v", base.UD(docID), err) + } } } diff --git a/db/util_testing.go b/db/util_testing.go index a4f201cff2..ce9ab46530 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -893,3 +893,28 @@ func RetrieveDocRevSeqNo(t *testing.T, docxattr []byte) uint64 { require.NoError(t, err) return revNo } + +// MoveAttachmentXattrFromGlobalToSync is a test only function that will move any defined attachment metadata in global xattr to sync data xattr +func MoveAttachmentXattrFromGlobalToSync(t *testing.T, ctx context.Context, docID string, cas uint64, value, syncXattr []byte, attachments AttachmentsMeta, macroExpand bool, dataStore base.DataStore) { + var docSync SyncData + err := base.JSONUnmarshal(syncXattr, &docSync) + require.NoError(t, err) + docSync.Attachments = attachments + + opts := &sgbucket.MutateInOptions{} + // this should be true for cases we want to move the attachment metadata without causing a new import feed event + if macroExpand { + spec := macroExpandSpec(base.SyncXattrName) + opts.MacroExpansion = spec + } else { + opts = nil + docSync.Cas = "" + } + + newSync, err := base.JSONMarshal(docSync) + require.NoError(t, err) + + // change this to update xattr + _, err = dataStore.WriteWithXattrs(ctx, docID, 0, cas, value, map[string][]byte{base.SyncXattrName: newSync}, []string{base.GlobalXattrName}, opts) + require.NoError(t, err) +} diff --git a/rest/importtest/import_test.go b/rest/importtest/import_test.go index f5ae7bfa61..a78311acc7 100644 --- a/rest/importtest/import_test.go +++ b/rest/importtest/import_test.go @@ -2447,3 +2447,235 @@ func TestPrevRevNoPopulationImportFeed(t *testing.T) { assert.Equal(t, revNo-1, mou.PreviousRevSeqNo) } + +// TestMigrationOfAttachmentsOnImport: +// - Create a doc and move the attachment metadata from global xattr to sync data xattr in a way that when the doc +// arrives over import feed it will be determined that it doesn't require import +// - Wait for the doc to arrive over import feed and assert even though the doc is not imported it will still get +// attachment metadata migrated from sync data to global xattr +// - Create a doc and move the attachment metadata from global xattr to sync data xattr in a way that when the doc +// arrives over import feed it will be determined that it does require import +// - Wait for the doc to arrive over the import feed and assert that once doc was imported the attachment metadata +// was migrated from sync data xattr to global xattr +func TestMigrationOfAttachmentsOnImport(t *testing.T) { + base.SkipImportTestsIfNotEnabled(t) + + rtConfig := rest.RestTesterConfig{ + DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{ + AutoImport: true, + }}, + } + rt := rest.NewRestTester(t, &rtConfig) + defer rt.Close() + dataStore := rt.GetSingleDataStore() + ctx := base.TestCtx(t) + + // add new doc to test a doc arriving import feed that doesn't need importing still has attachment migration take place + key := "doc1" + body := `{"test": true, "_attachments": {"hello.txt": {"data":"aGVsbG8gd29ybGQ="}}}` + rt.PutDoc(key, body) + + // grab defined attachment metadata to move to sync data + value, xattrs, cas, err := dataStore.GetWithXattrs(ctx, key, []string{base.SyncXattrName, base.GlobalXattrName}) + require.NoError(t, err) + syncXattr, ok := xattrs[base.SyncXattrName] + require.True(t, ok) + globalXattr, ok := xattrs[base.GlobalXattrName] + require.True(t, ok) + + var attachs db.GlobalSyncData + err = base.JSONUnmarshal(globalXattr, &attachs) + require.NoError(t, err) + + db.MoveAttachmentXattrFromGlobalToSync(t, ctx, key, cas, value, syncXattr, attachs.GlobalAttachments, true, dataStore) + + // retry loop to wait for import event to arrive over dcp, as doc won't be 'imported' we can't wait for import stat + var retryXattrs map[string][]byte + err = rt.WaitForCondition(func() bool { + retryXattrs, _, err = dataStore.GetXattrs(ctx, key, []string{base.SyncXattrName, base.GlobalXattrName}) + require.NoError(t, err) + _, ok := retryXattrs[base.GlobalXattrName] + return ok + }) + require.NoError(t, err) + + syncXattr, ok = retryXattrs[base.SyncXattrName] + require.True(t, ok) + globalXattr, ok = retryXattrs[base.GlobalXattrName] + require.True(t, ok) + + // empty global sync, + attachs = db.GlobalSyncData{} + err = base.JSONUnmarshal(globalXattr, &attachs) + require.NoError(t, err) + var syncData db.SyncData + err = base.JSONUnmarshal(syncXattr, &syncData) + require.NoError(t, err) + + // assert that the attachment metadata has been moved + assert.NotNil(t, attachs.GlobalAttachments) + assert.Nil(t, syncData.Attachments) + att := attachs.GlobalAttachments["hello.txt"].(map[string]interface{}) + assert.Equal(t, float64(11), att["length"]) + + // assert that no import took place + base.RequireWaitForStat(t, func() int64 { + return rt.GetDatabase().DbStats.SharedBucketImportStats.ImportCount.Value() + }, 0) + + // add new doc to test import of doc over feed moves attachments + key = "doc2" + body = `{"test": true, "_attachments": {"hello.txt": {"data":"aGVsbG8gd29ybGQ="}}}` + rt.PutDoc(key, body) + + _, xattrs, cas, err = dataStore.GetWithXattrs(ctx, key, []string{base.SyncXattrName, base.GlobalXattrName}) + require.NoError(t, err) + + syncXattr, ok = xattrs[base.SyncXattrName] + require.True(t, ok) + globalXattr, ok = xattrs[base.GlobalXattrName] + require.True(t, ok) + // grab defined attachment metadata to move to sync data + attachs = db.GlobalSyncData{} + err = base.JSONUnmarshal(globalXattr, &attachs) + require.NoError(t, err) + + // change doc body to trigger import on feed + value = []byte(`{"test": "doc"}`) + db.MoveAttachmentXattrFromGlobalToSync(t, ctx, key, cas, value, syncXattr, attachs.GlobalAttachments, false, dataStore) + + // Wait for import + base.RequireWaitForStat(t, func() int64 { + return rt.GetDatabase().DbStats.SharedBucketImportStats.ImportCount.Value() + }, 1) + + // grab the sync and global xattr from doc2 + xattrs, _, err = dataStore.GetXattrs(ctx, key, []string{base.SyncXattrName, base.GlobalXattrName}) + require.NoError(t, err) + syncXattr, ok = xattrs[base.SyncXattrName] + require.True(t, ok) + globalXattr, ok = xattrs[base.GlobalXattrName] + require.True(t, ok) + + err = base.JSONUnmarshal(globalXattr, &attachs) + require.NoError(t, err) + syncData = db.SyncData{} + err = base.JSONUnmarshal(syncXattr, &syncData) + require.NoError(t, err) + + // assert that the attachment metadata has been moved + assert.NotNil(t, attachs.GlobalAttachments) + assert.Nil(t, syncData.Attachments) + att = attachs.GlobalAttachments["hello.txt"].(map[string]interface{}) + assert.Equal(t, float64(11), att["length"]) +} + +// TestMigrationOfAttachmentsOnDemandImport: +// - Create a doc and move the attachment metadata from global xattr to sync data xattr +// - Trigger on demand import for get +// - Assert that the attachment metadata is migrated from sync data xattr to global sync xattr +// - Create a new doc and move the attachment metadata from global xattr to sync data xattr +// - Trigger an on demand import for write +// - Assert that the attachment metadata is migrated from sync data xattr to global sync xattr +func TestMigrationOfAttachmentsOnDemandImport(t *testing.T) { + base.SkipImportTestsIfNotEnabled(t) + + rtConfig := rest.RestTesterConfig{ + DatabaseConfig: &rest.DatabaseConfig{DbConfig: rest.DbConfig{ + AutoImport: false, // avoid anything arriving over import feed for this test + }}, + } + rt := rest.NewRestTester(t, &rtConfig) + defer rt.Close() + dataStore := rt.GetSingleDataStore() + ctx := base.TestCtx(t) + + key := "doc1" + body := `{"test": true, "_attachments": {"hello.txt": {"data":"aGVsbG8gd29ybGQ="}}}` + rt.PutDoc(key, body) + + _, xattrs, cas, err := dataStore.GetWithXattrs(ctx, key, []string{base.SyncXattrName, base.GlobalXattrName}) + require.NoError(t, err) + syncXattr, ok := xattrs[base.SyncXattrName] + require.True(t, ok) + globalXattr, ok := xattrs[base.GlobalXattrName] + require.True(t, ok) + + // grab defined attachment metadata to move to sync data + var attachs db.GlobalSyncData + err = base.JSONUnmarshal(globalXattr, &attachs) + require.NoError(t, err) + + value := []byte(`{"update": "doc"}`) + db.MoveAttachmentXattrFromGlobalToSync(t, ctx, key, cas, value, syncXattr, attachs.GlobalAttachments, false, dataStore) + + // on demand import for get + _, _ = rt.GetDoc(key) + + xattrs, _, err = dataStore.GetXattrs(ctx, key, []string{base.SyncXattrName, base.GlobalXattrName}) + require.NoError(t, err) + + syncXattr, ok = xattrs[base.SyncXattrName] + require.True(t, ok) + globalXattr, ok = xattrs[base.GlobalXattrName] + require.True(t, ok) + + // empty global sync, + attachs = db.GlobalSyncData{} + + err = base.JSONUnmarshal(globalXattr, &attachs) + require.NoError(t, err) + var syncData db.SyncData + err = base.JSONUnmarshal(syncXattr, &syncData) + require.NoError(t, err) + + // assert that the attachment metadata has been moved + assert.NotNil(t, attachs.GlobalAttachments) + assert.Nil(t, syncData.Attachments) + att := attachs.GlobalAttachments["hello.txt"].(map[string]interface{}) + assert.Equal(t, float64(11), att["length"]) + + key = "doc2" + body = `{"test": true, "_attachments": {"hello.txt": {"data":"aGVsbG8gd29ybGQ="}}}` + rt.PutDoc(key, body) + + _, xattrs, cas, err = dataStore.GetWithXattrs(ctx, key, []string{base.SyncXattrName, base.GlobalXattrName}) + require.NoError(t, err) + syncXattr, ok = xattrs[base.SyncXattrName] + require.True(t, ok) + globalXattr, ok = xattrs[base.GlobalXattrName] + require.True(t, ok) + + // grab defined attachment metadata to move to sync data + attachs = db.GlobalSyncData{} + err = base.JSONUnmarshal(globalXattr, &attachs) + require.NoError(t, err) + value = []byte(`{"update": "doc"}`) + db.MoveAttachmentXattrFromGlobalToSync(t, ctx, key, cas, value, syncXattr, attachs.GlobalAttachments, false, dataStore) + + // trigger on demand import for write + resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc2", `{}`) + rest.RequireStatus(t, resp, http.StatusConflict) + + // assert that the attachments metadata is migrated + xattrs, _, err = dataStore.GetXattrs(ctx, key, []string{base.SyncXattrName, base.GlobalXattrName}) + require.NoError(t, err) + syncXattr, ok = xattrs[base.SyncXattrName] + require.True(t, ok) + globalXattr, ok = xattrs[base.GlobalXattrName] + require.True(t, ok) + + // empty global sync, + attachs = db.GlobalSyncData{} + err = base.JSONUnmarshal(globalXattr, &attachs) + require.NoError(t, err) + syncData = db.SyncData{} + err = base.JSONUnmarshal(syncXattr, &syncData) + require.NoError(t, err) + + // assert that the attachment metadata has been moved + assert.NotNil(t, attachs.GlobalAttachments) + assert.Nil(t, syncData.Attachments) + att = attachs.GlobalAttachments["hello.txt"].(map[string]interface{}) + assert.Equal(t, float64(11), att["length"]) +} From f2aca0351a79824d7e3a7c25c7f0d90f8e5440dc Mon Sep 17 00:00:00 2001 From: Gregory Newman-Smith <109068393+gregns1@users.noreply.github.com> Date: Fri, 20 Sep 2024 15:38:36 +0100 Subject: [PATCH 2/3] CBG-4209: Add test for blip doc update attachment metadata migration (#7119) * CBG-4209: Add test for blip doc update attachment metadata migration * fix spelling error --- rest/attachment_test.go | 72 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/rest/attachment_test.go b/rest/attachment_test.go index dbd8c67a3a..cf4aed0c7a 100644 --- a/rest/attachment_test.go +++ b/rest/attachment_test.go @@ -2252,6 +2252,78 @@ func TestAttachmentDeleteOnExpiry(t *testing.T) { } } + +// TestUpdateViaBlipMigrateAttachment: +// - Tests document update through blip to a doc with attachment metadata deined in sync data +// - Assert that the c doc update this way will migrate the attachment metadata from sync data to global sync data +func TestUpdateViaBlipMigrateAttachment(t *testing.T) { + rtConfig := &RestTesterConfig{ + GuestEnabled: true, + } + + btcRunner := NewBlipTesterClientRunner(t) + btcRunner.SkipSubtest[VersionVectorSubtestName] = true // attachments not yet replicated in V4 protocol + const ( + doc1ID = "doc1" + ) + btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) { + rt := NewRestTester(t, rtConfig) + defer rt.Close() + + opts := &BlipTesterClientOpts{SupportedBLIPProtocols: SupportedBLIPProtocols} + btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, opts) + defer btc.Close() + ds := rt.GetSingleDataStore() + ctx := base.TestCtx(t) + + initialVersion := btc.rt.PutDoc(doc1ID, `{"_attachments": {"hello.txt": {"data": "aGVsbG8gd29ybGQ="}}}`) + btc.rt.WaitForPendingChanges() + btcRunner.StartOneshotPull(btc.id) + btcRunner.WaitForVersion(btc.id, doc1ID, initialVersion) + + value, xattrs, cas, err := ds.GetWithXattrs(ctx, doc1ID, []string{base.SyncXattrName, base.GlobalXattrName}) + require.NoError(t, err) + syncXattr, ok := xattrs[base.SyncXattrName] + require.True(t, ok) + globalXattr, ok := xattrs[base.GlobalXattrName] + require.True(t, ok) + + var attachs db.GlobalSyncData + err = base.JSONUnmarshal(globalXattr, &attachs) + require.NoError(t, err) + + // move attachment metadata from global xattr to sync xattr + db.MoveAttachmentXattrFromGlobalToSync(t, ctx, doc1ID, cas, value, syncXattr, attachs.GlobalAttachments, true, ds) + + // push revision from client + doc1Version, err := btcRunner.PushRev(btc.id, doc1ID, initialVersion, []byte(`{"new": "val", "_attachments": {"hello.txt": {"data": "aGVsbG8gd29ybGQ="}}}`)) + require.NoError(t, err) + assert.NoError(t, rt.WaitForVersion(doc1ID, doc1Version)) + + // assert the pushed rev updates the doc in bucket and migrates attachment metadata in process + xattrs, _, err = ds.GetXattrs(ctx, doc1ID, []string{base.SyncXattrName, base.GlobalXattrName}) + require.NoError(t, err) + syncXattr, ok = xattrs[base.SyncXattrName] + require.True(t, ok) + globalXattr, ok = xattrs[base.GlobalXattrName] + require.True(t, ok) + + // empty global sync, + attachs = db.GlobalSyncData{} + err = base.JSONUnmarshal(globalXattr, &attachs) + require.NoError(t, err) + var syncData db.SyncData + err = base.JSONUnmarshal(syncXattr, &syncData) + require.NoError(t, err) + + // assert that the attachment metadata has been moved + assert.NotNil(t, attachs.GlobalAttachments) + assert.Nil(t, syncData.Attachments) + att := attachs.GlobalAttachments["hello.txt"].(map[string]interface{}) + assert.Equal(t, float64(11), att["length"]) + }) +} + func TestUpdateExistingAttachment(t *testing.T) { rtConfig := &RestTesterConfig{ GuestEnabled: true, From edc64074ac9b68934d244ba908b307d83a9e0eba Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Thu, 26 Sep 2024 11:50:59 -0400 Subject: [PATCH 3/3] CBG-4253 create interfaces for integration testing (#7112) * CBG-4253 create interfaces for integration testing * fixups: - move code to _test.go - force removal of BucketID for couchbase lite peers - remove uneeded SyncGatewayPeerID, now defined at replication time - use a better name to define a database (since peer ids are unique, but test names are too long) - define CouchbaseLitePeerType but the only implementation is a mock peer --- rest/config_test.go | 3 +- rest/utilities_testing.go | 9 + ...st.go => utilities_testing_blip_client.go} | 15 +- topologytest/couchbase_lite_mock_peer_test.go | 155 +++++++++ topologytest/couchbase_server_peer_test.go | 168 ++++++++++ topologytest/hlv_test.go | 61 ++++ topologytest/main_test.go | 26 ++ topologytest/peer_test.go | 206 ++++++++++++ topologytest/sync_gateway_peer_test.go | 94 ++++++ topologytest/topologies_test.go | 314 ++++++++++++++++++ xdcr/cbs_xdcr.go | 32 +- xdcr/rosmar_xdcr.go | 1 - 12 files changed, 1052 insertions(+), 32 deletions(-) rename rest/{blip_client_test.go => utilities_testing_blip_client.go} (99%) create mode 100644 topologytest/couchbase_lite_mock_peer_test.go create mode 100644 topologytest/couchbase_server_peer_test.go create mode 100644 topologytest/hlv_test.go create mode 100644 topologytest/main_test.go create mode 100644 topologytest/peer_test.go create mode 100644 topologytest/sync_gateway_peer_test.go create mode 100644 topologytest/topologies_test.go diff --git a/rest/config_test.go b/rest/config_test.go index 9f1aa44345..b8e6f7410a 100644 --- a/rest/config_test.go +++ b/rest/config_test.go @@ -2362,9 +2362,8 @@ func TestInvalidJavascriptFunctions(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.Name, func(t *testing.T) { - safeDbName := strings.ToLower(strings.ReplaceAll(testCase.Name, " ", "-")) dbConfig := DbConfig{ - Name: safeDbName, + Name: SafeDatabaseName(t, testCase.Name), } if testCase.SyncFunction != nil { diff --git a/rest/utilities_testing.go b/rest/utilities_testing.go index fabe8f5a2c..108ee0d2fa 100644 --- a/rest/utilities_testing.go +++ b/rest/utilities_testing.go @@ -2850,3 +2850,12 @@ func RequireGocbDCPResync(t *testing.T) { t.Skip("This test only works against Couchbase Server since rosmar has no support for DCP resync") } } + +// SafeDatabaseName returns a database name free of any special characters for use in tests. +func SafeDatabaseName(t *testing.T, name string) string { + dbName := strings.ToLower(name) + for _, c := range []string{" ", "<", ">", "/", "="} { + dbName = strings.ReplaceAll(dbName, c, "_") + } + return dbName +} diff --git a/rest/blip_client_test.go b/rest/utilities_testing_blip_client.go similarity index 99% rename from rest/blip_client_test.go rename to rest/utilities_testing_blip_client.go index a58b0457f3..474eec5629 100644 --- a/rest/blip_client_test.go +++ b/rest/utilities_testing_blip_client.go @@ -44,6 +44,7 @@ type BlipTesterClientOpts struct { SupportedBLIPProtocols []string SkipCollectionsInitialization bool + AllowCreationWithoutBlipTesterClientRunner bool // Allow the client to be created outside of a BlipTesterClientRunner.Run() subtest // a deltaSrc rev ID for which to reject a delta rejectDeltasForSrcRev string @@ -646,12 +647,12 @@ func getCollectionsForBLIP(_ testing.TB, rt *RestTester) []string { } func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTester, opts *BlipTesterClientOpts) (client *BlipTesterClient) { - if !btcRunner.initialisedInsideRunnerCode { - require.FailNow(btcRunner.TB(), "must initialise BlipTesterClient inside Run() method") - } if opts == nil { opts = &BlipTesterClientOpts{} } + if !opts.AllowCreationWithoutBlipTesterClientRunner && !btcRunner.initialisedInsideRunnerCode { + require.FailNow(btcRunner.TB(), "must initialise BlipTesterClient inside Run() method") + } id, err := uuid.NewRandom() require.NoError(btcRunner.TB(), err) @@ -667,6 +668,11 @@ func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTes return client } +// ID returns the unique ID of the client. +func (btc *BlipTesterClient) ID() uint32 { + return btc.id +} + // TB returns testing.TB for the current test func (btc *BlipTesterClient) TB() testing.TB { return btc.rt.TB() @@ -1238,11 +1244,12 @@ func (btc *BlipTesterCollectionClient) WaitForDoc(docID string) (data []byte) { if data, found := btc.GetDoc(docID); found { return data } + timeout := 10 * time.Second require.EventuallyWithT(btc.TB(), func(c *assert.CollectT) { var found bool data, found = btc.GetDoc(docID) assert.True(c, found, "Could not find docID:%+v", docID) - }, 10*time.Second, 50*time.Millisecond, "BlipTesterClient timed out waiting for doc %+v", docID) + }, timeout, 50*time.Millisecond, "BlipTesterClient timed out waiting for doc %+v after %s", docID, timeout) return data } diff --git a/topologytest/couchbase_lite_mock_peer_test.go b/topologytest/couchbase_lite_mock_peer_test.go new file mode 100644 index 0000000000..857eefc88c --- /dev/null +++ b/topologytest/couchbase_lite_mock_peer_test.go @@ -0,0 +1,155 @@ +// Copyright 2024-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +package topologytest + +import ( + "fmt" + "testing" + + sgbucket "github.com/couchbase/sg-bucket" + "github.com/couchbase/sync_gateway/base" + "github.com/couchbase/sync_gateway/db" + "github.com/couchbase/sync_gateway/rest" + "github.com/stretchr/testify/require" +) + +// PeerBlipTesterClient is a wrapper around a BlipTesterClientRunner and BlipTesterClient, which need to match for a given Couchbase Lite interface. +type PeerBlipTesterClient struct { + btcRunner *rest.BlipTestClientRunner + btc *rest.BlipTesterClient +} + +// ID returns the unique ID of the blip client. +func (p *PeerBlipTesterClient) ID() uint32 { + return p.btc.ID() +} + +// CouchbaseLiteMockPeer represents an in-memory Couchbase Lite peer. This utilizes BlipTesterClient from the rest package to send and receive blip messages. +type CouchbaseLiteMockPeer struct { + t *testing.T + blipClients map[string]*PeerBlipTesterClient + name string +} + +func (p *CouchbaseLiteMockPeer) String() string { + return p.name +} + +// GetDocument returns the latest version of a document. The test will fail the document does not exist. +func (p *CouchbaseLiteMockPeer) GetDocument(dsName sgbucket.DataStoreName, docID string) (rest.DocVersion, db.Body) { + // this isn't yet collection aware, using single default collection + return rest.EmptyDocVersion(), nil +} + +// getSingleBlipClient returns the single blip client for the peer. If there are multiple clients, or not clients it will fail the test. This is temporary to stub support for multiple Sync Gateway peers. +func (p *CouchbaseLiteMockPeer) getSingleBlipClient() *PeerBlipTesterClient { + // this isn't yet collection aware, using single default collection + if len(p.blipClients) != 1 { + require.Fail(p.t, "blipClients haven't been created for %s, a temporary limitation of CouchbaseLiteMockPeer", p) + } + for _, c := range p.blipClients { + return c + } + require.Fail(p.t, "no blipClients found for %s", p) + return nil +} + +// WriteDocument writes a document to the peer. The test will fail if the write does not succeed. +func (p *CouchbaseLiteMockPeer) WriteDocument(dsName sgbucket.DataStoreName, docID string, body []byte) rest.DocVersion { + // this isn't yet collection aware, using single default collection + client := p.getSingleBlipClient() + // set an HLV here. + docVersion, err := client.btcRunner.PushRev(client.ID(), docID, rest.EmptyDocVersion(), body) + require.NoError(client.btcRunner.TB(), err) + return docVersion +} + +// WaitForDocVersion waits for a document to reach a specific version. The test will fail if the document does not reach the expected version in 20s. +func (p *CouchbaseLiteMockPeer) WaitForDocVersion(dsName sgbucket.DataStoreName, docID string, expected rest.DocVersion) db.Body { + // this isn't yet collection aware, using single default collection + client := p.getSingleBlipClient() + // FIXME: waiting for a specific version isn't working yet. + bodyBytes := client.btcRunner.WaitForDoc(client.ID(), docID) + var body db.Body + require.NoError(p.t, base.JSONUnmarshal(bodyBytes, &body)) + return body +} + +// RequireDocNotFound asserts that a document does not exist on the peer. +func (p *CouchbaseLiteMockPeer) RequireDocNotFound(dsName sgbucket.DataStoreName, docID string) { + // not implemented yet in blip client tester + // _, err := p.btcRunner.GetDoc(p.btc.id, docID) + // base.RequireDocNotFoundError(p.btcRunner.TB(), err) +} + +// Close will shut down the peer and close any active replications on the peer. +func (p *CouchbaseLiteMockPeer) Close() { + for _, c := range p.blipClients { + c.btc.Close() + } +} + +// CreateReplication creates a replication instance +func (p *CouchbaseLiteMockPeer) CreateReplication(peer Peer, config PeerReplicationConfig) PeerReplication { + sg, ok := peer.(*SyncGatewayPeer) + if !ok { + require.Fail(p.t, fmt.Sprintf("unsupported peer type %T for pull replication", peer)) + } + replication := &CouchbaseLiteMockReplication{ + activePeer: p, + passivePeer: peer, + btcRunner: rest.NewBlipTesterClientRunner(sg.rt.TB().(*testing.T)), + } + replication.btc = replication.btcRunner.NewBlipTesterClientOptsWithRT(sg.rt, &rest.BlipTesterClientOpts{ + Username: "user", + Channels: []string{"*"}, + SupportedBLIPProtocols: []string{db.CBMobileReplicationV4.SubprotocolString()}, + AllowCreationWithoutBlipTesterClientRunner: true, + }, + ) + p.blipClients[sg.String()] = &PeerBlipTesterClient{ + btcRunner: replication.btcRunner, + btc: replication.btc, + } + return replication +} + +// GetBackingBucket returns the backing bucket for the peer. This is always nil. +func (p *CouchbaseLiteMockPeer) GetBackingBucket() base.Bucket { + return nil +} + +// CouchbaseLiteMockReplication represents a replication between Couchbase Lite and Sync Gateway. This can be a push or pull replication. +type CouchbaseLiteMockReplication struct { + activePeer Peer + passivePeer Peer + btc *rest.BlipTesterClient + btcRunner *rest.BlipTestClientRunner +} + +// ActivePeer returns the peer sending documents +func (r *CouchbaseLiteMockReplication) ActivePeer() Peer { + return r.activePeer +} + +// PassivePeer returns the peer receiving documents +func (r *CouchbaseLiteMockReplication) PassivePeer() Peer { + return r.passivePeer +} + +// Start starts the replication +func (r *CouchbaseLiteMockReplication) Start() { + r.btcRunner.StartPull(r.btc.ID()) +} + +// Stop halts the replication. The replication can be restarted after it is stopped. +func (r *CouchbaseLiteMockReplication) Stop() { + _, err := r.btcRunner.UnsubPullChanges(r.btc.ID()) + require.NoError(r.btcRunner.TB(), err) +} diff --git a/topologytest/couchbase_server_peer_test.go b/topologytest/couchbase_server_peer_test.go new file mode 100644 index 0000000000..96418e7118 --- /dev/null +++ b/topologytest/couchbase_server_peer_test.go @@ -0,0 +1,168 @@ +// Copyright 2024-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +package topologytest + +import ( + "context" + "fmt" + "testing" + "time" + + sgbucket "github.com/couchbase/sg-bucket" + "github.com/couchbase/sync_gateway/base" + "github.com/couchbase/sync_gateway/db" + "github.com/couchbase/sync_gateway/rest" + "github.com/couchbase/sync_gateway/xdcr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// CouchbaseServerPeer represents an instance of a backing server (bucket). This is rosmar unless SG_TEST_BACKING_STORE=couchbase is set. +type CouchbaseServerPeer struct { + tb testing.TB + bucket base.Bucket + pullReplications map[Peer]xdcr.Manager + pushReplications map[Peer]xdcr.Manager + name string +} + +// CouchbaseServerReplication represents a unidirectional replication between two CouchbaseServerPeers. These are two buckets, using bucket to bucket XDCR. A rosmar implementation is used if SG_TEST_BACKING_STORE is unset. +type CouchbaseServerReplication struct { + t testing.TB + ctx context.Context + activePeer Peer + passivePeer Peer + manager xdcr.Manager +} + +// ActivePeer returns the peer sending documents +func (r *CouchbaseServerReplication) ActivePeer() Peer { + return r.activePeer +} + +// PassivePeer returns the peer receiving documents +func (r *CouchbaseServerReplication) PassivePeer() Peer { + return r.passivePeer +} + +// Start starts the replication +func (r *CouchbaseServerReplication) Start() { + require.NoError(r.t, r.manager.Start(r.ctx)) +} + +// Stop halts the replication. The replication can be restarted after it is stopped. +func (r *CouchbaseServerReplication) Stop() { + require.NoError(r.t, r.manager.Stop(r.ctx)) +} + +func (p *CouchbaseServerPeer) String() string { + return p.name +} + +func (p *CouchbaseServerPeer) ctx() context.Context { + return base.TestCtx(p.tb) +} + +func (p *CouchbaseServerPeer) getCollection(dsName sgbucket.DataStoreName) sgbucket.DataStore { + collection, err := p.bucket.NamedDataStore(dsName) + require.NoError(p.tb, err) + return collection +} + +// GetDocument returns the latest version of a document. The test will fail the document does not exist. +func (p *CouchbaseServerPeer) GetDocument(dsName sgbucket.DataStoreName, docID string) (rest.DocVersion, db.Body) { + docBytes, _, _, err := p.getCollection(dsName).GetWithXattrs(p.ctx(), docID, []string{base.SyncXattrName, base.VvXattrName}) + require.NoError(p.tb, err) + // get hlv to construct DocVersion + var body db.Body + require.NoError(p.tb, base.JSONUnmarshal(docBytes, &body)) + return rest.EmptyDocVersion(), body +} + +// WriteDocument writes a document to the peer. The test will fail if the write does not succeed. +func (p *CouchbaseServerPeer) WriteDocument(dsName sgbucket.DataStoreName, docID string, body []byte) rest.DocVersion { + err := p.getCollection(dsName).Set(docID, 0, nil, body) + require.NoError(p.tb, err) + return rest.EmptyDocVersion() +} + +// WaitForDocVersion waits for a document to reach a specific version. The test will fail if the document does not reach the expected version in 20s. +func (p *CouchbaseServerPeer) WaitForDocVersion(dsName sgbucket.DataStoreName, docID string, expected rest.DocVersion) db.Body { + var docBytes []byte + require.EventuallyWithT(p.tb, func(c *assert.CollectT) { + var err error + docBytes, _, _, err = p.getCollection(dsName).GetWithXattrs(p.ctx(), docID, []string{base.SyncXattrName, base.VvXattrName}) + assert.NoError(c, err) + }, 5*time.Second, 100*time.Millisecond) + // get hlv to construct DocVersion + var body db.Body + require.NoError(p.tb, base.JSONUnmarshal(docBytes, &body), "couldn't unmarshal docID %s: %s", docID, docBytes) + return body +} + +// RequireDocNotFound asserts that a document does not exist on the peer. +func (p *CouchbaseServerPeer) RequireDocNotFound(dsName sgbucket.DataStoreName, docID string) { + _, err := p.getCollection(dsName).Get(docID, nil) + base.RequireDocNotFoundError(p.tb, err) +} + +// Close will shut down the peer and close any active replications on the peer. +func (p *CouchbaseServerPeer) Close() { + for _, r := range p.pullReplications { + assert.NoError(p.tb, r.Stop(p.ctx())) + } + for _, r := range p.pushReplications { + assert.NoError(p.tb, r.Stop(p.ctx())) + } +} + +// CreateReplication creates an XDCR manager. +func (p *CouchbaseServerPeer) CreateReplication(passivePeer Peer, config PeerReplicationConfig) PeerReplication { + switch config.direction { + case PeerReplicationDirectionPull: + _, ok := p.pullReplications[passivePeer] + if ok { + require.Fail(p.tb, fmt.Sprintf("pull replication already exists for %s-%s", p, passivePeer)) + } + r, err := xdcr.NewXDCR(p.ctx(), passivePeer.GetBackingBucket(), p.bucket, xdcr.XDCROptions{Mobile: xdcr.MobileOn}) + require.NoError(p.tb, err) + p.pullReplications[passivePeer] = r + + return &CouchbaseServerReplication{ + activePeer: p, + passivePeer: passivePeer, + t: p.tb.(*testing.T), + ctx: p.ctx(), + manager: r, + } + case PeerReplicationDirectionPush: + _, ok := p.pushReplications[passivePeer] + if ok { + require.Fail(p.tb, fmt.Sprintf("pull replication already exists for %s-%s", p, passivePeer)) + } + r, err := xdcr.NewXDCR(p.ctx(), p.bucket, passivePeer.GetBackingBucket(), xdcr.XDCROptions{Mobile: xdcr.MobileOn}) + require.NoError(p.tb, err) + p.pushReplications[passivePeer] = r + return &CouchbaseServerReplication{ + activePeer: p, + passivePeer: passivePeer, + t: p.tb.(*testing.T), + ctx: p.ctx(), + manager: r, + } + default: + require.Fail(p.tb, fmt.Sprintf("unsupported replication direction %d for %s-%s", config.direction, p, passivePeer)) + } + return nil +} + +// GetBackingBucket returns the backing bucket for the peer. +func (p *CouchbaseServerPeer) GetBackingBucket() base.Bucket { + return p.bucket +} diff --git a/topologytest/hlv_test.go b/topologytest/hlv_test.go new file mode 100644 index 0000000000..c55d6c339a --- /dev/null +++ b/topologytest/hlv_test.go @@ -0,0 +1,61 @@ +// Copyright 2024-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +package topologytest + +import ( + "fmt" + "testing" + + "github.com/couchbase/sync_gateway/base" + "github.com/couchbase/sync_gateway/db" + + "github.com/stretchr/testify/require" +) + +func getSingleDsName() base.ScopeAndCollectionName { + if base.TestsUseNamedCollections() { + return base.ScopeAndCollectionName{Scope: "sg_test_0", Collection: "sg_test_0"} + } + return base.DefaultScopeAndCollectionName() +} + +// TestHLVCreateDocumentSingleActor tests creating a document with a single actor in different topologies. +func TestHLVCreateDocumentSingleActor(t *testing.T) { + collectionName := getSingleDsName() + for i, tc := range Topologies { + t.Run(tc.description, func(t *testing.T) { + for peerID := range tc.peers { + t.Run("actor="+peerID, func(t *testing.T) { + peers := createPeers(t, tc.peers) + replications := CreatePeerReplications(t, peers, tc.replications) + for _, replication := range replications { + // temporarily start the replication before writing the document, limitation of CouchbaseLiteMockPeer as active peer since WriteDocument is calls PushRev + replication.Start() + } + docID := fmt.Sprintf("doc_%d_%s", i, peerID) + + docBody := []byte(fmt.Sprintf(`{"peer": "%s", "topology": "%s"}`, peerID, tc.description)) + docVersion := peers[peerID].WriteDocument(collectionName, docID, docBody) + for _, peer := range peers { + t.Logf("waiting for doc version on %s, written from %s", peer, peerID) + body := peer.WaitForDocVersion(collectionName, docID, docVersion) + // remove internal properties to do a comparison + stripInternalProperties(body) + require.JSONEq(t, string(docBody), string(base.MustJSONMarshal(t, body))) + } + }) + } + }) + } +} + +func stripInternalProperties(body db.Body) { + delete(body, "_rev") + delete(body, "_id") +} diff --git a/topologytest/main_test.go b/topologytest/main_test.go new file mode 100644 index 0000000000..923c992f9c --- /dev/null +++ b/topologytest/main_test.go @@ -0,0 +1,26 @@ +/* +Copyright 2020-Present Couchbase, Inc. + +Use of this software is governed by the Business Source License included in +the file licenses/BSL-Couchbase.txt. As of the Change Date specified in that +file, in accordance with the Business Source License, use of this software will +be governed by the Apache License, Version 2.0, included in the file +licenses/APL2.txt. +*/ + +package topologytest + +import ( + "context" + "testing" + + "github.com/couchbase/sync_gateway/base" + "github.com/couchbase/sync_gateway/db" +) + +func TestMain(m *testing.M) { + ctx := context.Background() // start of test process + tbpOptions := base.TestBucketPoolOptions{MemWatermarkThresholdMB: 2048} + // Do not create indexes for this test, so they are built by server_context.go + db.TestBucketPoolWithIndexes(ctx, m, tbpOptions) +} diff --git a/topologytest/peer_test.go b/topologytest/peer_test.go new file mode 100644 index 0000000000..396cce316a --- /dev/null +++ b/topologytest/peer_test.go @@ -0,0 +1,206 @@ +// Copyright 2024-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +// Package topologytest implements code to be able to test with Couchbase Server, Sync Gateway, and Couchbase Lite from a go test. This can be with Couchbase Server or rosmar depending on SG_TEST_BACKING_STORE. Couchbase Lite can either be an in memory implementation of a Couchbase Lite peer, or a real Couchbase Lite peer. +package topologytest + +import ( + "fmt" + "testing" + + sgbucket "github.com/couchbase/sg-bucket" + "github.com/couchbase/sync_gateway/base" + "github.com/couchbase/sync_gateway/db" + "github.com/couchbase/sync_gateway/rest" + "github.com/couchbase/sync_gateway/xdcr" + "github.com/stretchr/testify/require" +) + +// Peer represents a peer in an Mobile workflow. The types of Peers are Couchbase Server, Sync Gateway, or Couchbase Lite. +type Peer interface { + // GetDocument returns the latest version of a document. The test will fail the document does not exist. + GetDocument(dsName sgbucket.DataStoreName, docID string) (rest.DocVersion, db.Body) + // WriteDocument writes a document to the peer. The test will fail if the write does not succeed. + WriteDocument(dsName sgbucket.DataStoreName, docID string, body []byte) rest.DocVersion + + // WaitForDocVersion waits for a document to reach a specific version. The test will fail if the document does not reach the expected version in 20s. + WaitForDocVersion(dsName sgbucket.DataStoreName, docID string, expected rest.DocVersion) db.Body + + // RequireDocNotFound asserts that a document does not exist on the peer. + RequireDocNotFound(dsName sgbucket.DataStoreName, docID string) + + // CreateReplication creates a replication instance + CreateReplication(Peer, PeerReplicationConfig) PeerReplication + + // Close will shut down the peer and close any active replications on the peer. + Close() + + // GetBackingBucket returns the backing bucket for the peer. This is nil when the peer is a Couchbase Lite peer. + GetBackingBucket() base.Bucket +} + +// PeerReplication represents a replication between two peers. This replication is unidirectional since all bi-directional replications are represented by two unidirectional instances. +type PeerReplication interface { + // ActivePeer returns the peer sending documents + ActivePeer() Peer + // PassivePeer returns the peer receiving documents + PassivePeer() Peer + // Start starts the replication + Start() + // Stop halts the replication. The replication can be restarted after it is stopped. + Stop() +} + +var _ PeerReplication = &CouchbaseLiteMockReplication{} +var _ PeerReplication = &CouchbaseServerReplication{} +var _ PeerReplication = &CouchbaseServerReplication{} + +// PeerReplicationDirection represents the direction of a replication from the active peer. +type PeerReplicationDirection int + +const ( + // PeerReplicationDirectionPush pushes data from an active peer to a passive peer. + PeerReplicationDirectionPush PeerReplicationDirection = iota + // PeerReplicationDirectionPull pulls data from an active peer to a passive peer. + PeerReplicationDirectionPull +) + +// PeerReplicationConfig represents the configuration for a given replication. +type PeerReplicationConfig struct { + direction PeerReplicationDirection + // oneShot bool // not implemented, would only be supported for SG <-> CBL, XDCR is always continuous +} + +// PeerReplicationDefinition defines a pair of peers and a configuration. +type PeerReplicationDefinition struct { + activePeer string + passivePeer string + config PeerReplicationConfig +} + +var _ Peer = &CouchbaseServerPeer{} +var _ Peer = &CouchbaseLiteMockPeer{} +var _ Peer = &SyncGatewayPeer{} + +// PeerType represents the type of a peer. These will be: +// +// - Couchbase Server (backed by TestBucket) +// - rosmar default +// - Couchbase Server based on SG_TEST_BACKING_STORE=couchbase +// +// - Sync Gateway (backed by RestTester) +// +// - Couchbase Lite +// - CouchbaseLiteMockPeer is in memory backed by BlipTesterClient +// - CouchbaseLitePeer (backed by Test Server) Not Yet Implemented +type PeerType int + +const ( + // PeerTypeCouchbaseServer represents a Couchbase Server peer. This can be backed by rosmar or couchbase server (controlled by SG_TEST_BACKING_STORE). + PeerTypeCouchbaseServer PeerType = iota + // PeerTypeCouchbaseLite represents a Couchbase Lite peer. This is currently backed in memory but will be backed by in memory structure that will send and receive blip messages. Future expansion to real Couchbase Lite peer in CBG-4260. + PeerTypeCouchbaseLite + // PeerTypeSyncGateway represents a Sync Gateway peer backed by a RestTester. + PeerTypeSyncGateway +) + +// PeerBucketID represents a specific bucket for a test. This allows multiple Sync Gateway instances to point to the same bucket, or a different buckets. There is no significance to the numbering of the buckets. We can use as many buckets as the MainTestBucketPool allows. +type PeerBucketID int + +const ( + // PeerBucketNoBackingBucket represents a peer that does not have a backing bucket. This is used for Couchbase Lite peers. + PeerBucketNoBackingBucket PeerBucketID = iota + // PeerBucketID1 represents the first bucket in the test. + PeerBucketID1 // start at 1 to avoid 0 value being accidentally used + // PeerBucketID2 represents the second bucket in the test. + PeerBucketID2 +) + +// PeerOptions are options to create a peer. +type PeerOptions struct { + Type PeerType + BucketID PeerBucketID // BucketID is used to identify the bucket for a Couchbase Server or Sync Gateway peer. This option is ignored for Couchbase Lite peers. +} + +// NewPeer creates a new peer for replication. The buckets must be created before the peers are created. +func NewPeer(t *testing.T, name string, buckets map[PeerBucketID]*base.TestBucket, opts PeerOptions) Peer { + switch opts.Type { + case PeerTypeCouchbaseServer: + bucket, ok := buckets[opts.BucketID] + require.True(t, ok, "bucket not found for bucket ID %d", opts.BucketID) + return &CouchbaseServerPeer{ + name: name, + tb: t, + bucket: bucket, + pullReplications: make(map[Peer]xdcr.Manager), + pushReplications: make(map[Peer]xdcr.Manager), + } + case PeerTypeCouchbaseLite: + require.Equal(t, PeerBucketNoBackingBucket, opts.BucketID, "bucket should not be specified for Couchbase Lite peer %+v", opts) + _, ok := buckets[opts.BucketID] + require.False(t, ok, "bucket should not be specified for Couchbase Lite peer") + return &CouchbaseLiteMockPeer{ + t: t, + name: name, + blipClients: make(map[string]*PeerBlipTesterClient), + } + case PeerTypeSyncGateway: + bucket, ok := buckets[opts.BucketID] + require.True(t, ok, "bucket not found for bucket ID %d", opts.BucketID) + return newSyncGatewayPeer(t, name, bucket) + default: + require.Fail(t, fmt.Sprintf("unsupported peer type %T", opts.Type)) + } + return nil +} + +// CreatePeerReplications creates a list of peers and replications. The replications will not have started. +func CreatePeerReplications(t *testing.T, peers map[string]Peer, configs []PeerReplicationDefinition) []PeerReplication { + replications := make([]PeerReplication, 0, len(configs)) + for _, config := range configs { + activePeer, ok := peers[config.activePeer] + require.True(t, ok, "active peer %s not found", config.activePeer) + passivePeer, ok := peers[config.passivePeer] + require.True(t, ok, "passive peer %s not found", config.passivePeer) + replications = append(replications, activePeer.CreateReplication(passivePeer, config.config)) + } + return replications +} + +// getPeerBuckets returns a map of bucket IDs to buckets for a list of peers. This requires sufficient number of buckets in the bucket pool. The buckets will be released with a testing.T.Cleanup function. +func getPeerBuckets(t *testing.T, peerOptions map[string]PeerOptions) map[PeerBucketID]*base.TestBucket { + buckets := make(map[PeerBucketID]*base.TestBucket) + for _, p := range peerOptions { + if p.BucketID == PeerBucketNoBackingBucket { + continue + } + _, ok := buckets[p.BucketID] + if !ok { + bucket := base.GetTestBucket(t) + buckets[p.BucketID] = bucket + t.Cleanup(func() { + bucket.Close(base.TestCtx(t)) + }) + } + } + return buckets +} + +// createPeers will create a sets of peers. The underlying buckets will be created. The peers will be closed and the buckets will be destroyed. +func createPeers(t *testing.T, peersOptions map[string]PeerOptions) map[string]Peer { + buckets := getPeerBuckets(t, peersOptions) + peers := make(map[string]Peer, len(peersOptions)) + for id, peerOptions := range peersOptions { + peer := NewPeer(t, id, buckets, peerOptions) + t.Cleanup(func() { + peer.Close() + }) + peers[id] = peer + } + return peers +} diff --git a/topologytest/sync_gateway_peer_test.go b/topologytest/sync_gateway_peer_test.go new file mode 100644 index 0000000000..016352cb6e --- /dev/null +++ b/topologytest/sync_gateway_peer_test.go @@ -0,0 +1,94 @@ +// Copyright 2024-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +package topologytest + +import ( + "net/http" + "testing" + "time" + + sgbucket "github.com/couchbase/sg-bucket" + "github.com/couchbase/sync_gateway/base" + "github.com/couchbase/sync_gateway/db" + "github.com/couchbase/sync_gateway/rest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type SyncGatewayPeer struct { + rt *rest.RestTester + name string +} + +func newSyncGatewayPeer(t *testing.T, name string, bucket *base.TestBucket) Peer { + rt := rest.NewRestTester(t, &rest.RestTesterConfig{ + PersistentConfig: true, + CustomTestBucket: bucket.NoCloseClone(), + }) + config := rt.NewDbConfig() + config.AutoImport = base.BoolPtr(true) + rest.RequireStatus(t, rt.CreateDatabase(rest.SafeDatabaseName(t, name), config), http.StatusCreated) + return &SyncGatewayPeer{ + name: name, + rt: rt, + } +} + +func (p *SyncGatewayPeer) String() string { + return p.name +} + +// GetDocument returns the latest version of a document. The test will fail the document does not exist. +func (p *SyncGatewayPeer) GetDocument(dsName sgbucket.DataStoreName, docID string) (rest.DocVersion, db.Body) { + // this function is not yet collections aware + return p.rt.GetDoc(docID) +} + +// WriteDocument writes a document to the peer. The test will fail if the write does not succeed. +func (p *SyncGatewayPeer) WriteDocument(dsName sgbucket.DataStoreName, docID string, body []byte) rest.DocVersion { + // this function is not yet collections aware + return p.rt.PutDoc(docID, string(body)) +} + +// WaitForDocVersion waits for a document to reach a specific version. The test will fail if the document does not reach the expected version in 20s. +func (p *SyncGatewayPeer) WaitForDocVersion(dsName sgbucket.DataStoreName, docID string, expected rest.DocVersion) db.Body { + // this function is not yet collections aware + var body db.Body + require.EventuallyWithT(p.rt.TB(), func(c *assert.CollectT) { + response := p.rt.SendAdminRequest("GET", "/{{.keyspace}}/"+docID, "") + assert.Equal(c, http.StatusOK, response.Code) + body = nil + assert.NoError(c, base.JSONUnmarshal(response.Body.Bytes(), &body)) + // FIXME can't assert for a specific version right now, not everything returns the correct version. + // assert.Equal(c, expected.RevTreeID, body.ExtractRev()) + }, 10*time.Second, 100*time.Millisecond) + return body +} + +// RequireDocNotFound asserts that a document does not exist on the peer. +func (p *SyncGatewayPeer) RequireDocNotFound(dsName sgbucket.DataStoreName, docID string) { + // _, err := p.rt.GetDoc(docID) + // base.RequireDocNotFoundError(p.rt.TB(), err) +} + +// Close will shut down the peer and close any active replications on the peer. +func (p *SyncGatewayPeer) Close() { + p.rt.Close() +} + +// CreateReplication creates a replication instance. This is currently not supported for Sync Gateway peers. A future ISGR implementation will support this. +func (p *SyncGatewayPeer) CreateReplication(peer Peer, config PeerReplicationConfig) PeerReplication { + require.Fail(p.rt.TB(), "can not create a replication with Sync Gateway as an active peer") + return nil +} + +// GetBackingBucket returns the backing bucket for the peer. +func (p *SyncGatewayPeer) GetBackingBucket() base.Bucket { + return p.rt.Bucket() +} diff --git a/topologytest/topologies_test.go b/topologytest/topologies_test.go new file mode 100644 index 0000000000..563e243483 --- /dev/null +++ b/topologytest/topologies_test.go @@ -0,0 +1,314 @@ +// Copyright 2024-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +package topologytest + +// Topology defines a topology for a set of peers and replications. This can include Couchbase Server, Sync Gateway, and Couchbase Lite peers, with push or pull replications between them. +type Topology struct { + description string + peers map[string]PeerOptions + replications []PeerReplicationDefinition +} + +// Topologies represents user configurations of replications. +var Topologies = []Topology{ + { + /* + + - - - - - - + + ' +---------+ ' + ' | cbs1 | ' + ' +---------+ ' + ' +---------+ ' + ' | sg1 | ' + ' +---------+ ' + + - - - - - - + + ^ + | + | + v + +---------+ + | cbl1 | + +---------+ + */ + description: "CBL <-> Sync Gateway <-> CBS", + peers: map[string]PeerOptions{ + "cbs1": {Type: PeerTypeCouchbaseServer, BucketID: PeerBucketID1}, + "sg1": {Type: PeerTypeSyncGateway, BucketID: PeerBucketID1}, + "cbl1": {Type: PeerTypeCouchbaseLite}, + }, + replications: []PeerReplicationDefinition{ + { + activePeer: "cbl1", + passivePeer: "sg1", + config: PeerReplicationConfig{ + direction: PeerReplicationDirectionPull, + }, + }, + { + activePeer: "cbl1", + passivePeer: "sg1", + config: PeerReplicationConfig{ + direction: PeerReplicationDirectionPush, + }, + }, + }, + }, + { + /* + Test topology 1.2 + + + - - - - - - + +- - - - - - -+ + ' cluster A ' ' cluster B ' + ' +---------+ ' ' +---------+ ' + ' | cbs1 | ' <--> ' | cbs2 | ' + ' +---------+ ' ' +---------+ ' + ' +---------+ ' + - - - - - - + + ' | sg1 | ' + ' +---------+ ' + + - - - - - - + + ^ + | + | + v + +---------+ + | cbl1 | + +---------+ + */ + description: "CBL<->SG<->CBS1 CBS1<->CBS2", + peers: map[string]PeerOptions{ + "sg1": {Type: PeerTypeSyncGateway, BucketID: PeerBucketID1}, + "cbs1": {Type: PeerTypeCouchbaseServer, BucketID: PeerBucketID1}, + + "cbs2": {Type: PeerTypeCouchbaseServer, BucketID: PeerBucketID2}, + "cbl1": {Type: PeerTypeCouchbaseLite}, + }, + replications: []PeerReplicationDefinition{ + { + activePeer: "cbs2", + passivePeer: "cbs1", + config: PeerReplicationConfig{ + direction: PeerReplicationDirectionPull, + }, + }, + { + activePeer: "cbs2", + passivePeer: "cbs1", + config: PeerReplicationConfig{ + direction: PeerReplicationDirectionPush, + }, + }, + { + activePeer: "cbl1", + passivePeer: "sg1", + config: PeerReplicationConfig{ + direction: PeerReplicationDirectionPull, + }, + }, + { + activePeer: "cbl1", + passivePeer: "sg1", + config: PeerReplicationConfig{ + direction: PeerReplicationDirectionPush, + }, + }, + }, + }, + { + /* + Test topology 1.3 + + + - - - - - - + +- - - - - - -+ + ' cluster A ' ' cluster B ' + ' +---------+ ' ' +---------+ ' + ' | cbs1 | ' <--> ' | cbs2 | ' + ' +---------+ ' ' +---------+ ' + ' +---------+ ' ' +---------+ ' + ' | sg1 | ' ' | sg2 | ' + ' +---------+ ' ' +---------+ ' + + - - - - - - + +- - - - - - -+ + ^ ^ + | | + | | + v v + +---------+ +---------+ + | cbl1 | | cbl2 | + +---------+ +---------+ + */ + description: "2x CBL<->SG<->CBS XDCR only", + peers: map[string]PeerOptions{ + "cbs1": {Type: PeerTypeCouchbaseServer, BucketID: PeerBucketID1}, + "cbs2": {Type: PeerTypeCouchbaseServer, BucketID: PeerBucketID2}, + "sg1": {Type: PeerTypeSyncGateway, BucketID: PeerBucketID1}, + "cbl1": {Type: PeerTypeCouchbaseLite}, + // TODO: CBG-4270, push replication only exists empemerally + // "cbl1": {Type: PeerTypeCouchbaseLite}, + }, + replications: []PeerReplicationDefinition{ + { + activePeer: "cbs2", + passivePeer: "cbs1", + config: PeerReplicationConfig{ + direction: PeerReplicationDirectionPull, + }, + }, + { + activePeer: "cbs2", + passivePeer: "cbs1", + config: PeerReplicationConfig{ + direction: PeerReplicationDirectionPush, + }, + }, + { + activePeer: "cbl1", + passivePeer: "sg1", + config: PeerReplicationConfig{ + direction: PeerReplicationDirectionPull, + }, + }, + { + activePeer: "cbl1", + passivePeer: "sg1", + config: PeerReplicationConfig{ + direction: PeerReplicationDirectionPush, + }, + }, + }, + }, + // topology 1.4 not present, no P2P supported yet + { + /* + Test topology 1.5 + + + - - - - - - + +- - - - - - -+ + ' cluster A ' ' cluster B ' + ' +---------+ ' ' +---------+ ' + ' | cbs1 | ' <--> ' | cbs2 | ' + ' +---------+ ' ' +---------+ ' + ' +---------+ ' ' +---------+ ' + ' | sg1 | ' ' | sg2 | ' + ' +---------+ ' ' +---------+ ' + + - - - - - - + +- - - - - - -+ + ^ ^ + | | + | | + | | + | +------+ | + +---> | cbl1 | <---+ + +------+ + */ + /* This test doesn't work yet, CouchbaseLiteMockPeer doesn't support writing data to multiple Sync Gateway peers yet + description: "Sync Gateway -> Couchbase Server -> Couchbase Server", + peers: map[string]PeerOptions{ + "cbs1": {Type: PeerTypeCouchbaseServer, BucketID: PeerBucketID1}, + "cbs2": {Type: PeerTypeCouchbaseServer, BucketID: PeerBucketID2}, + "sg1": {Type: PeerTypeSyncGateway, BucketID: PeerBucketID1}, + "sg2": {Type: PeerTypeSyncGateway, BucketID: PeerBucketID2}, + "cbl1": {Type: PeerTypeCouchbaseLite}, + }, + replications: []PeerReplicationDefinition{ + { + activePeer: "cbs2", + passivePeer: "cbs1", + config: PeerReplicationConfig{ + direction: PeerReplicationDirectionPull, + }, + }, + { + activePeer: "cbs2", + passivePeer: "cbs1", + config: PeerReplicationConfig{ + direction: PeerReplicationDirectionPull, + }, + }, + + { + activePeer: "cbl1", + passivePeer: "sg1", + config: PeerReplicationConfig{ + direction: PeerReplicationDirectionPull, + }, + }, + { + activePeer: "cbl1", + passivePeer: "sg1", + config: PeerReplicationConfig{ + direction: PeerReplicationDirectionPush, + }, + }, + { + activePeer: "cbl1", + passivePeer: "sg2", + config: PeerReplicationConfig{ + direction: PeerReplicationDirectionPull, + }, + }, + { + activePeer: "cbl1", + passivePeer: "sg2", + config: PeerReplicationConfig{ + direction: PeerReplicationDirectionPush, + }, + }, + }, + */ + }, +} + +// simpleTopologies represents simplified topologies to make testing the integration test code easier. +// nolint: unused +var simpleTopologies = []Topology{ + { + + /* + +------+ +------+ + | cbs1 | --> | cbs2 | + +------+ +------+ + */ + description: "Couchbase Server -> Couchbase Server", + peers: map[string]PeerOptions{ + "cbs1": {Type: PeerTypeCouchbaseServer, BucketID: PeerBucketID1}, + "cbs2": {Type: PeerTypeCouchbaseServer, BucketID: PeerBucketID2}}, + replications: []PeerReplicationDefinition{ + { + activePeer: "cbs1", + passivePeer: "cbs2", + config: PeerReplicationConfig{ + direction: PeerReplicationDirectionPush, + }, + }, + }, + }, + { + /* + + - - - - - - + +- - - - - - -+ + ' cluster A ' ' cluster B ' + ' +---------+ ' ' +---------+ ' + ' | cbs1 | ' <--> ' | cbs2 | ' + ' +---------+ ' ' +---------+ ' + ' +---------+ ' + - - - - - - + + ' | sg1 | ' + ' +---------+ ' + + - - - - - - + + */ + description: "Couchbase Server (with SG) -> Couchbase Server", + peers: map[string]PeerOptions{ + "cbs1": {Type: PeerTypeCouchbaseServer, BucketID: PeerBucketID1}, + "sg1": {Type: PeerTypeSyncGateway, BucketID: PeerBucketID1}, + "cbs2": {Type: PeerTypeCouchbaseServer, BucketID: PeerBucketID2}, + }, + replications: []PeerReplicationDefinition{ + { + activePeer: "cbs1", + passivePeer: "cbs2", + config: PeerReplicationConfig{ + direction: PeerReplicationDirectionPush, + }, + }, + }, + }, +} diff --git a/xdcr/cbs_xdcr.go b/xdcr/cbs_xdcr.go index 02d620a72f..057d1ed271 100644 --- a/xdcr/cbs_xdcr.go +++ b/xdcr/cbs_xdcr.go @@ -21,7 +21,7 @@ import ( const ( cbsRemoteClustersEndpoint = "/pools/default/remoteClusters" - xdcrClusterName = "sync_gateway_xdcr" + xdcrClusterName = "sync_gateway_xdcr" // this is a hardcoded name for the local XDCR cluster totalDocsFilteredStat = "xdcr_docs_filtered_total" totalDocsWrittenStat = "xdcr_docs_written_total" ) @@ -62,22 +62,7 @@ func isClusterPresent(ctx context.Context, bucket *base.GocbV2Bucket) (bool, err return false, nil } -// deleteCluster deletes an XDCR cluster. The cluster must be present in order to delete it. -func deleteCluster(ctx context.Context, bucket *base.GocbV2Bucket) error { - method := http.MethodDelete - url := "/pools/default/remoteClusters/" + xdcrClusterName - output, statusCode, err := bucket.MgmtRequest(ctx, method, url, "application/x-www-form-urlencoded", nil) - if err != nil { - return err - } - - if statusCode != http.StatusOK { - return fmt.Errorf("Could not delete xdcr cluster: %s. %s %s -> (%d) %s", xdcrClusterName, http.MethodDelete, method, statusCode, output) - } - return nil -} - -// createCluster deletes an XDCR cluster. The cluster must be present in order to delete it. +// createCluster creates an XDCR cluster. func createCluster(ctx context.Context, bucket *base.GocbV2Bucket) error { serverURL, err := url.Parse(base.UnitTestUrl()) if err != nil { @@ -105,21 +90,18 @@ func createCluster(ctx context.Context, bucket *base.GocbV2Bucket) error { // newCouchbaseServerManager creates an instance of XDCR backed by Couchbase Server. This is not started until Start is called. func newCouchbaseServerManager(ctx context.Context, fromBucket *base.GocbV2Bucket, toBucket *base.GocbV2Bucket, opts XDCROptions) (*couchbaseServerManager, error) { + // there needs to be a global cluster present, this is a hostname + username + password. There can be only one per hostname, so create it lazily. isPresent, err := isClusterPresent(ctx, fromBucket) if err != nil { return nil, err } - if isPresent { - err := deleteCluster(ctx, fromBucket) + if !isPresent { + err := createCluster(ctx, fromBucket) if err != nil { return nil, err } } - err = createCluster(ctx, fromBucket) - if err != nil { - return nil, err - } return &couchbaseServerManager{ fromBucket: fromBucket, toBucket: toBucket, @@ -132,7 +114,7 @@ func newCouchbaseServerManager(ctx context.Context, fromBucket *base.GocbV2Bucke func (x *couchbaseServerManager) Start(ctx context.Context) error { method := http.MethodPost body := url.Values{} - body.Add("name", xdcrClusterName) + body.Add("name", fmt.Sprintf("%s_%s", x.fromBucket.GetName(), x.toBucket.GetName())) body.Add("fromBucket", x.fromBucket.GetName()) body.Add("toBucket", x.toBucket.GetName()) body.Add("toCluster", xdcrClusterName) @@ -172,7 +154,7 @@ func (x *couchbaseServerManager) Stop(ctx context.Context) error { url := "/controller/cancelXDCR/" + url.PathEscape(x.replicationID) output, statusCode, err := x.fromBucket.MgmtRequest(ctx, method, url, "application/x-www-form-urlencoded", nil) if err != nil { - return err + return fmt.Errorf("Could not %s to %s: %w", method, url, err) } if statusCode != http.StatusOK { return fmt.Errorf("Could not cancel XDCR replication: %s. %s %s -> (%d) %s", x.replicationID, method, url, statusCode, output) diff --git a/xdcr/rosmar_xdcr.go b/xdcr/rosmar_xdcr.go index 6daa60d158..673adefb02 100644 --- a/xdcr/rosmar_xdcr.go +++ b/xdcr/rosmar_xdcr.go @@ -167,7 +167,6 @@ func (r *rosmarManager) Start(ctx context.Context) error { args := sgbucket.FeedArguments{ ID: "xdcr-" + r.replicationID, - Backfill: sgbucket.FeedNoBackfill, Terminator: r.terminator, Scopes: scopes, }