diff --git a/base/util_testing.go b/base/util_testing.go index 6ec763f5a2..30ebae3fa7 100644 --- a/base/util_testing.go +++ b/base/util_testing.go @@ -788,6 +788,11 @@ func RequireDocNotFoundError(t testing.TB, e error) { require.True(t, IsDocNotFoundError(e), fmt.Sprintf("Expected error to be a doc not found error, but was: %v", e)) } +// RequireXattrNotFoundError asserts that the given error represents an xattr not found error. +func RequireXattrNotFoundError(t testing.TB, e error) { + require.True(t, IsXattrNotFoundError(e), fmt.Sprintf("Expected error to be an xattr not found error, but was: %v", e)) +} + func requireCasMismatchError(t testing.TB, err error) { require.Error(t, err, "Expected an error of type IsCasMismatch %+v\n", err) require.True(t, IsCasMismatch(err), "Expected error of type IsCasMismatch but got %+v\n", err) diff --git a/db/database.go b/db/database.go index 6e0f488d5b..7837ec208f 100644 --- a/db/database.go +++ b/db/database.go @@ -349,8 +349,8 @@ func ConnectToBucket(ctx context.Context, spec base.BucketSpec, failFast bool) ( return ibucket.(base.Bucket), nil } -// Returns Couchbase Server Cluster UUID on a timeout. If running against walrus, do return an empty string. -func getServerUUID(ctx context.Context, bucket base.Bucket) (string, error) { +// GetServerUUID returns Couchbase Server Cluster UUID on a timeout. If running against rosmar, do return an empty string. +func GetServerUUID(ctx context.Context, bucket base.Bucket) (string, error) { gocbV2Bucket, err := base.AsGocbV2Bucket(bucket) if err != nil { return "", nil @@ -393,7 +393,7 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket, return nil, err } - serverUUID, err := getServerUUID(ctx, bucket) + serverUUID, err := GetServerUUID(ctx, bucket) if err != nil { return nil, err } diff --git a/db/hybrid_logical_vector_test.go b/db/hybrid_logical_vector_test.go index 1729fa8c2b..36d1673259 100644 --- a/db/hybrid_logical_vector_test.go +++ b/db/hybrid_logical_vector_test.go @@ -307,7 +307,7 @@ func TestHLVImport(t *testing.T) { require.NoError(t, err) encodedCAS = EncodeValue(cas) - docxattr, _ := existingXattrs[base.VirtualXattrRevSeqNo] + docxattr := existingXattrs[base.VirtualXattrRevSeqNo] revSeqNo := RetrieveDocRevSeqNo(t, docxattr) importOpts = importDocOptions{ @@ -466,7 +466,7 @@ func TestExtractHLVFromChangesMessage(t *testing.T) { // TODO: When CBG-3662 is done, should be able to simplify base64 handling to treat source as a string // that may represent a base64 encoding - base64EncodedHlvString := cblEncodeTestSources(test.hlvString) + base64EncodedHlvString := EncodeTestHistory(test.hlvString) hlv, err := extractHLVFromBlipMessage(base64EncodedHlvString) require.NoError(t, err) @@ -491,29 +491,3 @@ func BenchmarkExtractHLVFromBlipMessage(b *testing.B) { }) } } - -// cblEncodeTestSources converts the simplified versions in test data to CBL-style encoding -func cblEncodeTestSources(hlvString string) (base64HLVString string) { - - vectorFields := strings.Split(hlvString, ";") - vectorLength := len(vectorFields) - if vectorLength == 0 { - return hlvString - } - - // first vector field is single vector, cv - base64HLVString += EncodeTestVersion(vectorFields[0]) - for _, field := range vectorFields[1:] { - base64HLVString += ";" - versions := strings.Split(field, ",") - if len(versions) == 0 { - continue - } - base64HLVString += EncodeTestVersion(versions[0]) - for _, version := range versions[1:] { - base64HLVString += "," - base64HLVString += EncodeTestVersion(version) - } - } - return base64HLVString -} diff --git a/xdcr/replication.go b/xdcr/replication.go index 2001ce0bae..92f1d8490b 100644 --- a/xdcr/replication.go +++ b/xdcr/replication.go @@ -14,6 +14,7 @@ import ( "fmt" "github.com/couchbase/sync_gateway/base" + "github.com/couchbase/sync_gateway/db" "github.com/couchbaselabs/rosmar" ) @@ -75,3 +76,16 @@ func NewXDCR(ctx context.Context, fromBucket, toBucket base.Bucket, opts XDCROpt } return newCouchbaseServerManager(ctx, gocbFromBucket, gocbToBucket, opts) } + +// getSourceID returns the source ID for a bucket. +func getSourceID(ctx context.Context, bucket base.Bucket) (string, error) { + serverUUID, err := db.GetServerUUID(ctx, bucket) + if err != nil { + return "", err + } + bucketUUID, err := bucket.UUID() + if err != nil { + return "", err + } + return db.CreateEncodedSourceID(bucketUUID, serverUUID) +} diff --git a/xdcr/rosmar_xdcr.go b/xdcr/rosmar_xdcr.go index 6daa60d158..6e23863f0e 100644 --- a/xdcr/rosmar_xdcr.go +++ b/xdcr/rosmar_xdcr.go @@ -18,6 +18,7 @@ import ( sgbucket "github.com/couchbase/sg-bucket" "github.com/couchbase/sync_gateway/base" + "github.com/couchbase/sync_gateway/db" "github.com/couchbaselabs/rosmar" ) @@ -27,6 +28,7 @@ type rosmarManager struct { terminator chan bool toBucketCollections map[uint32]*rosmar.Collection fromBucket *rosmar.Bucket + fromBucketSourceID string toBucket *rosmar.Bucket replicationID string docsFiltered atomic.Uint64 @@ -36,12 +38,17 @@ type rosmarManager struct { } // newRosmarManager creates an instance of XDCR backed by rosmar. This is not started until Start is called. -func newRosmarManager(_ context.Context, fromBucket, toBucket *rosmar.Bucket, opts XDCROptions) (Manager, error) { +func newRosmarManager(ctx context.Context, fromBucket, toBucket *rosmar.Bucket, opts XDCROptions) (Manager, error) { if opts.Mobile != MobileOn { return nil, errors.New("Only sgbucket.XDCRMobileOn is supported in rosmar") } + fromBucketSourceID, err := getSourceID(ctx, fromBucket) + if err != nil { + return nil, fmt.Errorf("Could not get source ID for %s: %w", fromBucket.GetName(), err) + } return &rosmarManager{ fromBucket: fromBucket, + fromBucketSourceID: fromBucketSourceID, toBucket: toBucket, replicationID: fmt.Sprintf("%s-%s", fromBucket.GetName(), toBucket.GetName()), toBucketCollections: make(map[uint32]*rosmar.Collection), @@ -71,14 +78,15 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve return true } - toCas, err := col.Get(docID, nil) + // Have to use GetWithXattrs to get a cas value back if there are no xattrs (GetWithXattrs will not return a cas if there are no xattrs) + _, toXattrs, toCas, err := col.GetWithXattrs(ctx, docID, []string{base.VvXattrName, base.MouXattrName}) if err != nil && !base.IsDocNotFoundError(err) { base.WarnfCtx(ctx, "Skipping replicating doc %s, could not perform a kv op get doc in toBucket: %s", event.Key, err) r.errorCount.Add(1) return false } - /* full LWW conflict resolution is not implemented in rosmar yet + /* full LWW conflict resolution is not implemented in rosmar yet. There is no need to implement this since CAS will always be unique due to rosmar limitations. CBS algorithm is: @@ -115,7 +123,7 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve return true } - err = opWithMeta(ctx, col, toCas, event) + err = opWithMeta(ctx, col, r.fromBucketSourceID, toCas, toXattrs, event) if err != nil { base.WarnfCtx(ctx, "Replicating doc %s, could not write doc: %s", event.Key, err) r.errorCount.Add(1) @@ -167,7 +175,6 @@ func (r *rosmarManager) Start(ctx context.Context) error { args := sgbucket.FeedArguments{ ID: "xdcr-" + r.replicationID, - Backfill: sgbucket.FeedNoBackfill, Terminator: r.terminator, Scopes: scopes, } @@ -186,30 +193,55 @@ func (r *rosmarManager) Stop(_ context.Context) error { return nil } -// opWithMeta writes a document to the target datastore given a type of Deletion or Mutation event with a specific cas. -func opWithMeta(ctx context.Context, collection *rosmar.Collection, originalCas uint64, event sgbucket.FeedEvent) error { - var xattrs []byte +// opWithMeta writes a document to the target datastore given a type of Deletion or Mutation event with a specific cas. The originalXattrs will contain only the _vv and _mou xattr. +func opWithMeta(ctx context.Context, collection *rosmar.Collection, sourceID string, originalCas uint64, originalXattrs map[string][]byte, event sgbucket.FeedEvent) error { + var xattrs map[string][]byte var body []byte if event.DataType&sgbucket.FeedDataTypeXattr != 0 { var err error - var dcpXattrs map[string][]byte - body, dcpXattrs, err = sgbucket.DecodeValueWithAllXattrs(event.Value) + body, xattrs, err = sgbucket.DecodeValueWithAllXattrs(event.Value) if err != nil { return err } - xattrs, err = xattrToBytes(dcpXattrs) + } else { + xattrs = make(map[string][]byte, 1) // size one for _vv + body = event.Value + } + var vv *db.HybridLogicalVector + if bytes, ok := originalXattrs[base.VvXattrName]; ok { + err := json.Unmarshal(bytes, &vv) if err != nil { - return err + return fmt.Errorf("Could not unmarshal the existing vv xattr %s: %w", string(bytes), err) } } else { - body = event.Value + newVv := db.NewHybridLogicalVector() + vv = &newVv + } + // TODO: read existing originalXattrs[base.VvXattrName] and update the pv CBG-4250 + + // TODO: clear _mou when appropriate CBG-4251 + + // update new cv with new source/cas + casBytes := string(base.Uint64CASToLittleEndianHex(event.Cas)) + vv.SourceID = sourceID + vv.CurrentVersionCAS = casBytes + vv.Version = casBytes + + var err error + xattrs[base.VvXattrName], err = json.Marshal(vv) + if err != nil { + return err + } + xattrBytes, err := xattrToBytes(xattrs) + if err != nil { + return err } if event.Opcode == sgbucket.FeedOpDeletion { - return collection.DeleteWithMeta(ctx, string(event.Key), originalCas, event.Cas, event.Expiry, xattrs) + return collection.DeleteWithMeta(ctx, string(event.Key), originalCas, event.Cas, event.Expiry, xattrBytes) } - return collection.SetWithMeta(ctx, string(event.Key), originalCas, event.Cas, event.Expiry, xattrs, body, event.DataType) + return collection.SetWithMeta(ctx, string(event.Key), originalCas, event.Cas, event.Expiry, xattrBytes, body, event.DataType) } diff --git a/xdcr/xdcr_test.go b/xdcr/xdcr_test.go index b861937340..4febe03601 100644 --- a/xdcr/xdcr_test.go +++ b/xdcr/xdcr_test.go @@ -13,7 +13,9 @@ import ( "testing" "time" + sgbucket "github.com/couchbase/sg-bucket" "github.com/couchbase/sync_gateway/base" + "github.com/couchbase/sync_gateway/db" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -50,74 +52,299 @@ func TestMobileXDCRNoSyncDataCopied(t *testing.T) { attachmentDoc = "_sync:att2:foo" normalDoc = "doc2" exp = 0 - body = `{"key":"value"}` - version = "ver" - source = "src" - curCAS = "cvCas" + startingBody = `{"key":"value"}` ) dataStores := map[base.DataStore]base.DataStore{ fromBucket.DefaultDataStore(): toBucket.DefaultDataStore(), } + var fromDs base.DataStore + var toDs base.DataStore if base.TestsUseNamedCollections() { - fromDs, err := fromBucket.GetNamedDataStore(0) + fromDs, err = fromBucket.GetNamedDataStore(0) require.NoError(t, err) - toDs, err := toBucket.GetNamedDataStore(0) + toDs, err = toBucket.GetNamedDataStore(0) require.NoError(t, err) dataStores[fromDs] = toDs + } else { + fromDs = fromBucket.DefaultDataStore() + toDs = toBucket.DefaultDataStore() + } + fromBucketSourceID, err := getSourceID(ctx, fromBucket) + require.NoError(t, err) + docCas := make(map[string]uint64) + for _, doc := range []string{syncDoc, attachmentDoc, normalDoc} { + var inputCas uint64 + var err error + docCas[doc], err = fromDs.WriteCas(doc, exp, inputCas, []byte(startingBody), 0) + require.NoError(t, err) + _, _, err = fromDs.GetXattrs(ctx, doc, []string{base.VvXattrName}) + // make sure that the doc does not have a version vector + base.RequireXattrNotFoundError(t, err) + } + + // make sure attachments are copied + for _, doc := range []string{normalDoc, attachmentDoc} { + var body []byte + var xattrs map[string][]byte + var cas uint64 + require.EventuallyWithT(t, func(c *assert.CollectT) { + var err error + body, xattrs, cas, err = toDs.GetWithXattrs(ctx, doc, []string{base.VvXattrName, base.MouXattrName}) + assert.NoError(c, err, "Could not get doc %s", doc) + }, time.Second*5, time.Millisecond*100) + require.Equal(t, docCas[doc], cas) + require.JSONEq(t, startingBody, string(body)) + require.NotContains(t, xattrs, base.MouXattrName) + if !base.TestSupportsMobileXDCR() { + require.Len(t, xattrs, 0) + continue + } + require.Contains(t, xattrs, base.VvXattrName) + casString := string(base.Uint64CASToLittleEndianHex(cas)) + var vv *db.HybridLogicalVector + require.NoError(t, base.JSONUnmarshal(xattrs[base.VvXattrName], &vv)) + require.Equal(t, &db.HybridLogicalVector{ + CurrentVersionCAS: casString, + SourceID: fromBucketSourceID, + Version: casString, + }, vv) } + + _, err = toDs.Get(syncDoc, nil) + base.RequireDocNotFoundError(t, err) + var totalDocsWritten uint64 var totalDocsFiltered uint64 - for fromDs, toDs := range dataStores { - for _, doc := range []string{syncDoc, attachmentDoc, normalDoc} { - _, err = fromDs.Add(doc, exp, body) - require.NoError(t, err) - } - // make sure attachments are copied - for _, doc := range []string{normalDoc, attachmentDoc} { - require.EventuallyWithT(t, func(c *assert.CollectT) { - var value string - _, err = toDs.Get(doc, &value) - assert.NoError(c, err, "Could not get doc %s", doc) - assert.Equal(c, body, value) - }, time.Second*5, time.Millisecond*100) - } + // stats are not updated in real time, so we need to wait a bit + require.EventuallyWithT(t, func(c *assert.CollectT) { + stats, err := xdcr.Stats(ctx) + assert.NoError(t, err) + assert.Equal(c, totalDocsFiltered+1, stats.DocsFiltered) + assert.Equal(c, totalDocsWritten+2, stats.DocsWritten) - var value any - _, err = toDs.Get(syncDoc, &value) - base.RequireDocNotFoundError(t, err) + }, time.Second*5, time.Millisecond*100) +} - // stats are not updated in real time, so we need to wait a bit - require.EventuallyWithT(t, func(c *assert.CollectT) { - stats, err := xdcr.Stats(ctx) - assert.NoError(t, err) - assert.Equal(c, totalDocsFiltered+1, stats.DocsFiltered) - assert.Equal(c, totalDocsWritten+2, stats.DocsWritten) +// getTwoBucketDataStores creates two data stores in separate buckets to run xdcr within. Returns a named collection or a default collection based on the global test configuration. +func getTwoBucketDataStores(t *testing.T) (base.Bucket, sgbucket.DataStore, base.Bucket, sgbucket.DataStore) { + ctx := base.TestCtx(t) + base.RequireNumTestBuckets(t, 2) + fromBucket := base.GetTestBucket(t) + t.Cleanup(func() { + fromBucket.Close(ctx) + }) + toBucket := base.GetTestBucket(t) + t.Cleanup(func() { + toBucket.Close(ctx) + }) + var fromDs base.DataStore + var toDs base.DataStore + if base.TestsUseNamedCollections() { + var err error + fromDs, err = fromBucket.GetNamedDataStore(0) + require.NoError(t, err) + toDs, err = toBucket.GetNamedDataStore(0) + require.NoError(t, err) + } else { + fromDs = fromBucket.DefaultDataStore() + toDs = toBucket.DefaultDataStore() + } + return fromBucket, fromDs, toBucket, toDs +} - }, time.Second*5, time.Millisecond*100) - totalDocsWritten += 2 - totalDocsFiltered++ - if base.UnitTestUrlIsWalrus() { - // TODO: CBG-3861 implement _vv support in rosmar - continue - } - // in mobile xdcr mode a version vector will be written - if base.TestSupportsMobileXDCR() { - // verify VV is written to docs that are replicated - for _, doc := range []string{normalDoc, attachmentDoc} { - require.EventuallyWithT(t, func(c *assert.CollectT) { - xattrs, _, err := toDs.GetXattrs(ctx, doc, []string{"_vv"}) - assert.NoError(c, err, "Could not get doc %s", doc) - vvXattrBytes, ok := xattrs["_vv"] - require.True(t, ok) - var vvXattrVal map[string]any - require.NoError(t, base.JSONUnmarshal(vvXattrBytes, &vvXattrVal)) - assert.NotNil(c, vvXattrVal[version]) - assert.NotNil(c, vvXattrVal[source]) - assert.NotNil(c, vvXattrVal[curCAS]) - - }, time.Second*5, time.Millisecond*100) - } - } +func TestReplicateVV(t *testing.T) { + fromBucket, fromDs, toBucket, toDs := getTwoBucketDataStores(t) + ctx := base.TestCtx(t) + fromBucketSourceID, err := getSourceID(ctx, fromBucket) + require.NoError(t, err) + + testCases := []struct { + name string + docID string + body string + preXDCRFunc func(t *testing.T, docID string) uint64 + }{ + { + name: "normal doc", + docID: "doc1", + body: `{"key":"value"}`, + preXDCRFunc: func(t *testing.T, docID string) uint64 { + cas, err := fromDs.WriteCas(docID, 0, 0, []byte(`{"key":"value"}`), 0) + require.NoError(t, err) + return cas + }, + }, + { + name: "dest doc older, expect overwrite", + docID: "doc2", + body: `{"datastore":"fromDs"}`, + preXDCRFunc: func(t *testing.T, docID string) uint64 { + _, err := toDs.WriteCas(docID, 0, 0, []byte(`{"datastore":"toDs"}`), 0) + require.NoError(t, err) + cas, err := fromDs.WriteCas(docID, 0, 0, []byte(`{"datastore":"fromDs"}`), 0) + require.NoError(t, err) + return cas + }, + }, + { + name: "dest doc newer, expect keep old version", + docID: "doc3", + body: `{"datastore":"fromDs"}`, + preXDCRFunc: func(t *testing.T, docID string) uint64 { + _, err := toDs.WriteCas(docID, 0, 0, []byte(`{"datastore":"toDs"}`), 0) + require.NoError(t, err) + cas, err := fromDs.WriteCas(docID, 0, 0, []byte(`{"datastore":"fromDs"}`), 0) + require.NoError(t, err) + return cas + }, + }, } + // tests write a document + // start xdcr + // verify result + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + fromCAS := testCase.preXDCRFunc(t, testCase.docID) + + xdcr := startXDCR(t, fromBucket, toBucket, XDCROptions{Mobile: MobileOn}) + defer func() { + assert.NoError(t, xdcr.Stop(ctx)) + }() + requireWaitForXDCRDocsWritten(t, xdcr, 1) + + body, xattrs, destCas, err := toDs.GetWithXattrs(ctx, testCase.docID, []string{base.VvXattrName, base.MouXattrName}) + require.NoError(t, err, "Could not get doc %s", testCase.docID) + require.Equal(t, fromCAS, destCas) + require.JSONEq(t, testCase.body, string(body)) + require.NotContains(t, xattrs, base.MouXattrName) + require.Contains(t, xattrs, base.VvXattrName) + casString := string(base.Uint64CASToLittleEndianHex(fromCAS)) + var vv *db.HybridLogicalVector + require.NoError(t, base.JSONUnmarshal(xattrs[base.VvXattrName], &vv)) + require.Equal(t, &db.HybridLogicalVector{ + CurrentVersionCAS: casString, + SourceID: fromBucketSourceID, + Version: casString, + }, vv) + + }) + } +} + +func TestVVWriteTwice(t *testing.T) { + fromBucket, fromDs, toBucket, toDs := getTwoBucketDataStores(t) + ctx := base.TestCtx(t) + fromBucketSourceID, err := getSourceID(ctx, fromBucket) + require.NoError(t, err) + + docID := "doc1" + ver1Body := `{"ver":1}` + fromCAS, err := fromDs.WriteCas(docID, 0, 0, []byte(ver1Body), 0) + require.NoError(t, err) + xdcr := startXDCR(t, fromBucket, toBucket, XDCROptions{Mobile: MobileOn}) + defer func() { + assert.NoError(t, xdcr.Stop(ctx)) + }() + requireWaitForXDCRDocsWritten(t, xdcr, 1) + + body, xattrs, destCas, err := toDs.GetWithXattrs(ctx, docID, []string{base.VvXattrName, base.MouXattrName}) + require.NoError(t, err) + require.Equal(t, fromCAS, destCas) + require.JSONEq(t, ver1Body, string(body)) + require.Contains(t, xattrs, base.VvXattrName) + casString := string(base.Uint64CASToLittleEndianHex(fromCAS)) + var vv *db.HybridLogicalVector + require.NoError(t, base.JSONUnmarshal(xattrs[base.VvXattrName], &vv)) + require.Equal(t, &db.HybridLogicalVector{ + CurrentVersionCAS: casString, + SourceID: fromBucketSourceID, + Version: casString, + }, vv) + + fromCAS2, err := fromDs.WriteCas(docID, 0, fromCAS, []byte(`{"ver":2}`), 0) + require.NoError(t, err) + requireWaitForXDCRDocsWritten(t, xdcr, 2) + + body, xattrs, destCas, err = toDs.GetWithXattrs(ctx, docID, []string{base.VvXattrName, base.MouXattrName}) + require.NoError(t, err) + require.Equal(t, fromCAS2, destCas) + require.JSONEq(t, `{"ver":2}`, string(body)) + require.Contains(t, xattrs, base.VvXattrName) + casString = string(base.Uint64CASToLittleEndianHex(fromCAS2)) + require.NoError(t, base.JSONUnmarshal(xattrs[base.VvXattrName], &vv)) + require.Equal(t, &db.HybridLogicalVector{ + CurrentVersionCAS: casString, + SourceID: fromBucketSourceID, + Version: casString, + }, vv) +} + +func TestVVWriteOnDestAfterInitialReplication(t *testing.T) { + fromBucket, fromDs, toBucket, toDs := getTwoBucketDataStores(t) + ctx := base.TestCtx(t) + fromBucketSourceID, err := getSourceID(ctx, fromBucket) + require.NoError(t, err) + + docID := "doc1" + ver1Body := `{"ver":1}` + fromCAS, err := fromDs.WriteCas(docID, 0, 0, []byte(ver1Body), 0) + require.NoError(t, err) + xdcr := startXDCR(t, fromBucket, toBucket, XDCROptions{Mobile: MobileOn}) + defer func() { + assert.NoError(t, xdcr.Stop(ctx)) + }() + requireWaitForXDCRDocsWritten(t, xdcr, 1) + + body, xattrs, destCas, err := toDs.GetWithXattrs(ctx, docID, []string{base.VvXattrName, base.MouXattrName}) + require.NoError(t, err) + require.Equal(t, fromCAS, destCas) + require.JSONEq(t, ver1Body, string(body)) + require.Contains(t, xattrs, base.VvXattrName) + casString := string(base.Uint64CASToLittleEndianHex(fromCAS)) + fmt.Printf("vv after write1: %+v\n", string(xattrs[base.VvXattrName])) + var vv *db.HybridLogicalVector + require.NoError(t, base.JSONUnmarshal(xattrs[base.VvXattrName], &vv)) + require.Equal(t, &db.HybridLogicalVector{ + CurrentVersionCAS: casString, + SourceID: fromBucketSourceID, + Version: casString, + }, vv) + + // write to dest bucket again + toCas2, err := toDs.WriteCas(docID, 0, fromCAS, []byte(`{"ver":3}`), 0) + require.NoError(t, err) + fmt.Printf("toCas2: %d\n", toCas2) + + body, xattrs, destCas, err = toDs.GetWithXattrs(ctx, docID, []string{base.VvXattrName, base.MouXattrName}) + require.NoError(t, err) + require.Equal(t, toCas2, destCas) + require.JSONEq(t, `{"ver":3}`, string(body)) + require.Contains(t, xattrs, base.VvXattrName) + casString = string(base.Uint64CASToLittleEndianHex(fromCAS)) + fmt.Printf("vv after write2: %+v\n", string(xattrs[base.VvXattrName])) + require.NoError(t, base.JSONUnmarshal(xattrs[base.VvXattrName], &vv)) + require.Equal(t, &db.HybridLogicalVector{ + CurrentVersionCAS: casString, + SourceID: fromBucketSourceID, + Version: casString, + }, vv) +} + +func startXDCR(t *testing.T, fromBucket base.Bucket, toBucket base.Bucket, opts XDCROptions) Manager { + ctx := base.TestCtx(t) + xdcr, err := NewXDCR(ctx, fromBucket, toBucket, opts) + require.NoError(t, err) + err = xdcr.Start(ctx) + require.NoError(t, err) + return xdcr +} + +func requireWaitForXDCRDocsWritten(t *testing.T, xdcr Manager, expectedDocsWritten uint64) { + ctx := base.TestCtx(t) + require.EventuallyWithT(t, func(c *assert.CollectT) { + stats, err := xdcr.Stats(ctx) + assert.NoError(t, err) + assert.Equal(c, expectedDocsWritten, stats.DocsWritten) + }, time.Second*5, time.Millisecond*100) }