Skip to content

Commit

Permalink
CBG-3861 support updating vv on xdcr
Browse files Browse the repository at this point in the history
- this code will only update cv when appropriate, does not yet
  support changing pv or mv (not supported in mobile xdcr anyway)
- stubs out some kinds of tests we can write
  • Loading branch information
torcolvin committed Sep 19, 2024
1 parent 05abda1 commit 0825ff2
Show file tree
Hide file tree
Showing 6 changed files with 352 additions and 100 deletions.
5 changes: 5 additions & 0 deletions base/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,11 @@ func RequireDocNotFoundError(t testing.TB, e error) {
require.True(t, IsDocNotFoundError(e), fmt.Sprintf("Expected error to be a doc not found error, but was: %v", e))
}

// RequireXattrNotFoundError asserts that the given error represents an xattr not found error.
func RequireXattrNotFoundError(t testing.TB, e error) {
require.True(t, IsXattrNotFoundError(e), fmt.Sprintf("Expected error to be an xattr not found error, but was: %v", e))
}

func requireCasMismatchError(t testing.TB, err error) {
require.Error(t, err, "Expected an error of type IsCasMismatch %+v\n", err)
require.True(t, IsCasMismatch(err), "Expected error of type IsCasMismatch but got %+v\n", err)
Expand Down
6 changes: 3 additions & 3 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,8 @@ func ConnectToBucket(ctx context.Context, spec base.BucketSpec, failFast bool) (
return ibucket.(base.Bucket), nil
}

// Returns Couchbase Server Cluster UUID on a timeout. If running against walrus, do return an empty string.
func getServerUUID(ctx context.Context, bucket base.Bucket) (string, error) {
// GetServerUUID returns Couchbase Server Cluster UUID on a timeout. If running against rosmar, do return an empty string.
func GetServerUUID(ctx context.Context, bucket base.Bucket) (string, error) {
gocbV2Bucket, err := base.AsGocbV2Bucket(bucket)
if err != nil {
return "", nil
Expand Down Expand Up @@ -393,7 +393,7 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket,
return nil, err
}

serverUUID, err := getServerUUID(ctx, bucket)
serverUUID, err := GetServerUUID(ctx, bucket)
if err != nil {
return nil, err
}
Expand Down
30 changes: 2 additions & 28 deletions db/hybrid_logical_vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func TestHLVImport(t *testing.T) {
require.NoError(t, err)
encodedCAS = EncodeValue(cas)

docxattr, _ := existingXattrs[base.VirtualXattrRevSeqNo]
docxattr := existingXattrs[base.VirtualXattrRevSeqNo]
revSeqNo := RetrieveDocRevSeqNo(t, docxattr)

importOpts = importDocOptions{
Expand Down Expand Up @@ -466,7 +466,7 @@ func TestExtractHLVFromChangesMessage(t *testing.T) {

// TODO: When CBG-3662 is done, should be able to simplify base64 handling to treat source as a string
// that may represent a base64 encoding
base64EncodedHlvString := cblEncodeTestSources(test.hlvString)
base64EncodedHlvString := EncodeTestHistory(test.hlvString)
hlv, err := extractHLVFromBlipMessage(base64EncodedHlvString)
require.NoError(t, err)

Expand All @@ -491,29 +491,3 @@ func BenchmarkExtractHLVFromBlipMessage(b *testing.B) {
})
}
}

// cblEncodeTestSources converts the simplified versions in test data to CBL-style encoding
func cblEncodeTestSources(hlvString string) (base64HLVString string) {

vectorFields := strings.Split(hlvString, ";")
vectorLength := len(vectorFields)
if vectorLength == 0 {
return hlvString
}

// first vector field is single vector, cv
base64HLVString += EncodeTestVersion(vectorFields[0])
for _, field := range vectorFields[1:] {
base64HLVString += ";"
versions := strings.Split(field, ",")
if len(versions) == 0 {
continue
}
base64HLVString += EncodeTestVersion(versions[0])
for _, version := range versions[1:] {
base64HLVString += ","
base64HLVString += EncodeTestVersion(version)
}
}
return base64HLVString
}
14 changes: 14 additions & 0 deletions xdcr/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"fmt"

"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/db"
"github.com/couchbaselabs/rosmar"
)

Expand Down Expand Up @@ -75,3 +76,16 @@ func NewXDCR(ctx context.Context, fromBucket, toBucket base.Bucket, opts XDCROpt
}
return newCouchbaseServerManager(ctx, gocbFromBucket, gocbToBucket, opts)
}

// getSourceID returns the source ID for a bucket.
func getSourceID(ctx context.Context, bucket base.Bucket) (string, error) {
serverUUID, err := db.GetServerUUID(ctx, bucket)
if err != nil {
return "", err
}
bucketUUID, err := bucket.UUID()
if err != nil {
return "", err
}
return db.CreateEncodedSourceID(bucketUUID, serverUUID)
}
62 changes: 47 additions & 15 deletions xdcr/rosmar_xdcr.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/db"
"github.com/couchbaselabs/rosmar"
)

Expand All @@ -27,6 +28,7 @@ type rosmarManager struct {
terminator chan bool
toBucketCollections map[uint32]*rosmar.Collection
fromBucket *rosmar.Bucket
fromBucketSourceID string
toBucket *rosmar.Bucket
replicationID string
docsFiltered atomic.Uint64
Expand All @@ -36,12 +38,17 @@ type rosmarManager struct {
}

// newRosmarManager creates an instance of XDCR backed by rosmar. This is not started until Start is called.
func newRosmarManager(_ context.Context, fromBucket, toBucket *rosmar.Bucket, opts XDCROptions) (Manager, error) {
func newRosmarManager(ctx context.Context, fromBucket, toBucket *rosmar.Bucket, opts XDCROptions) (Manager, error) {
if opts.Mobile != MobileOn {
return nil, errors.New("Only sgbucket.XDCRMobileOn is supported in rosmar")
}
fromBucketSourceID, err := getSourceID(ctx, fromBucket)
if err != nil {
return nil, fmt.Errorf("Could not get source ID for %s: %w", fromBucket.GetName(), err)
}
return &rosmarManager{
fromBucket: fromBucket,
fromBucketSourceID: fromBucketSourceID,
toBucket: toBucket,
replicationID: fmt.Sprintf("%s-%s", fromBucket.GetName(), toBucket.GetName()),
toBucketCollections: make(map[uint32]*rosmar.Collection),
Expand Down Expand Up @@ -71,14 +78,15 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve
return true
}

toCas, err := col.Get(docID, nil)
// Have to use GetWithXattrs to get a cas value back if there are no xattrs (GetWithXattrs will not return a cas if there are no xattrs)
_, toXattrs, toCas, err := col.GetWithXattrs(ctx, docID, []string{base.VvXattrName, base.MouXattrName})
if err != nil && !base.IsDocNotFoundError(err) {
base.WarnfCtx(ctx, "Skipping replicating doc %s, could not perform a kv op get doc in toBucket: %s", event.Key, err)
r.errorCount.Add(1)
return false
}

/* full LWW conflict resolution is not implemented in rosmar yet
/* full LWW conflict resolution is not implemented in rosmar yet. There is no need to implement this since CAS will always be unique due to rosmar limitations.
CBS algorithm is:
Expand Down Expand Up @@ -115,7 +123,7 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve
return true
}

err = opWithMeta(ctx, col, toCas, event)
err = opWithMeta(ctx, col, r.fromBucketSourceID, toCas, toXattrs, event)
if err != nil {
base.WarnfCtx(ctx, "Replicating doc %s, could not write doc: %s", event.Key, err)
r.errorCount.Add(1)
Expand Down Expand Up @@ -167,7 +175,6 @@ func (r *rosmarManager) Start(ctx context.Context) error {

args := sgbucket.FeedArguments{
ID: "xdcr-" + r.replicationID,
Backfill: sgbucket.FeedNoBackfill,
Terminator: r.terminator,
Scopes: scopes,
}
Expand All @@ -186,30 +193,55 @@ func (r *rosmarManager) Stop(_ context.Context) error {
return nil
}

// opWithMeta writes a document to the target datastore given a type of Deletion or Mutation event with a specific cas.
func opWithMeta(ctx context.Context, collection *rosmar.Collection, originalCas uint64, event sgbucket.FeedEvent) error {
var xattrs []byte
// opWithMeta writes a document to the target datastore given a type of Deletion or Mutation event with a specific cas. The originalXattrs will contain only the _vv and _mou xattr.
func opWithMeta(ctx context.Context, collection *rosmar.Collection, sourceID string, originalCas uint64, originalXattrs map[string][]byte, event sgbucket.FeedEvent) error {
var xattrs map[string][]byte
var body []byte
if event.DataType&sgbucket.FeedDataTypeXattr != 0 {
var err error
var dcpXattrs map[string][]byte
body, dcpXattrs, err = sgbucket.DecodeValueWithAllXattrs(event.Value)
body, xattrs, err = sgbucket.DecodeValueWithAllXattrs(event.Value)
if err != nil {
return err
}
xattrs, err = xattrToBytes(dcpXattrs)
} else {
xattrs = make(map[string][]byte, 1) // size one for _vv
body = event.Value
}
var vv *db.HybridLogicalVector
if bytes, ok := originalXattrs[base.VvXattrName]; ok {
err := json.Unmarshal(bytes, &vv)
if err != nil {
return err
return fmt.Errorf("Could not unmarshal the existing vv xattr %s: %w", string(bytes), err)
}
} else {
body = event.Value
newVv := db.NewHybridLogicalVector()
vv = &newVv
}
// TODO: read existing originalXattrs[base.VvXattrName] and update the pv CBG-4250

// TODO: clear _mou when appropriate CBG-4251

// update new cv with new source/cas
casBytes := string(base.Uint64CASToLittleEndianHex(event.Cas))
vv.SourceID = sourceID
vv.CurrentVersionCAS = casBytes
vv.Version = casBytes

var err error
xattrs[base.VvXattrName], err = json.Marshal(vv)
if err != nil {
return err
}
xattrBytes, err := xattrToBytes(xattrs)
if err != nil {
return err
}

if event.Opcode == sgbucket.FeedOpDeletion {
return collection.DeleteWithMeta(ctx, string(event.Key), originalCas, event.Cas, event.Expiry, xattrs)
return collection.DeleteWithMeta(ctx, string(event.Key), originalCas, event.Cas, event.Expiry, xattrBytes)
}

return collection.SetWithMeta(ctx, string(event.Key), originalCas, event.Cas, event.Expiry, xattrs, body, event.DataType)
return collection.SetWithMeta(ctx, string(event.Key), originalCas, event.Cas, event.Expiry, xattrBytes, body, event.DataType)

}

Expand Down
Loading

0 comments on commit 0825ff2

Please sign in to comment.