Skip to content

Commit

Permalink
CBG-3861 support updating vv on xdcr (#7118)
Browse files Browse the repository at this point in the history
* CBG-3861 support updating vv on xdcr

- this code will only update cv when appropriate, does not yet
  support changing pv or mv (not supported in mobile xdcr anyway)
- stubs out some kinds of tests we can write

* Create docsProcessed stat to account for conflict rejected docs on server

* Update rosmar
  • Loading branch information
torcolvin authored Oct 2, 2024
1 parent 47c041b commit d583123
Show file tree
Hide file tree
Showing 10 changed files with 382 additions and 124 deletions.
5 changes: 5 additions & 0 deletions base/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,11 @@ func RequireDocNotFoundError(t testing.TB, e error) {
require.True(t, IsDocNotFoundError(e), fmt.Sprintf("Expected error to be a doc not found error, but was: %v", e))
}

// RequireXattrNotFoundError asserts that the given error represents an xattr not found error.
func RequireXattrNotFoundError(t testing.TB, e error) {
require.True(t, IsXattrNotFoundError(e), fmt.Sprintf("Expected error to be an xattr not found error, but was: %v", e))
}

func requireCasMismatchError(t testing.TB, err error) {
require.Error(t, err, "Expected an error of type IsCasMismatch %+v\n", err)
require.True(t, IsCasMismatch(err), "Expected error of type IsCasMismatch but got %+v\n", err)
Expand Down
6 changes: 3 additions & 3 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,8 @@ func ConnectToBucket(ctx context.Context, spec base.BucketSpec, failFast bool) (
return ibucket.(base.Bucket), nil
}

// Returns Couchbase Server Cluster UUID on a timeout. If running against walrus, do return an empty string.
func getServerUUID(ctx context.Context, bucket base.Bucket) (string, error) {
// GetServerUUID returns Couchbase Server Cluster UUID on a timeout. If running against rosmar, do return an empty string.
func GetServerUUID(ctx context.Context, bucket base.Bucket) (string, error) {
gocbV2Bucket, err := base.AsGocbV2Bucket(bucket)
if err != nil {
return "", nil
Expand Down Expand Up @@ -393,7 +393,7 @@ func NewDatabaseContext(ctx context.Context, dbName string, bucket base.Bucket,
return nil, err
}

serverUUID, err := getServerUUID(ctx, bucket)
serverUUID, err := GetServerUUID(ctx, bucket)
if err != nil {
return nil, err
}
Expand Down
30 changes: 2 additions & 28 deletions db/hybrid_logical_vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func TestHLVImport(t *testing.T) {
require.NoError(t, err)
encodedCAS = EncodeValue(cas)

docxattr, _ := existingXattrs[base.VirtualXattrRevSeqNo]
docxattr := existingXattrs[base.VirtualXattrRevSeqNo]
revSeqNo := RetrieveDocRevSeqNo(t, docxattr)

importOpts = importDocOptions{
Expand Down Expand Up @@ -466,7 +466,7 @@ func TestExtractHLVFromChangesMessage(t *testing.T) {

// TODO: When CBG-3662 is done, should be able to simplify base64 handling to treat source as a string
// that may represent a base64 encoding
base64EncodedHlvString := cblEncodeTestSources(test.hlvString)
base64EncodedHlvString := EncodeTestHistory(test.hlvString)
hlv, err := extractHLVFromBlipMessage(base64EncodedHlvString)
require.NoError(t, err)

Expand All @@ -491,29 +491,3 @@ func BenchmarkExtractHLVFromBlipMessage(b *testing.B) {
})
}
}

// cblEncodeTestSources converts the simplified versions in test data to CBL-style encoding
func cblEncodeTestSources(hlvString string) (base64HLVString string) {

vectorFields := strings.Split(hlvString, ";")
vectorLength := len(vectorFields)
if vectorLength == 0 {
return hlvString
}

// first vector field is single vector, cv
base64HLVString += EncodeTestVersion(vectorFields[0])
for _, field := range vectorFields[1:] {
base64HLVString += ";"
versions := strings.Split(field, ",")
if len(versions) == 0 {
continue
}
base64HLVString += EncodeTestVersion(versions[0])
for _, version := range versions[1:] {
base64HLVString += ","
base64HLVString += EncodeTestVersion(version)
}
}
return base64HLVString
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/couchbase/sg-bucket v0.0.0-20240606153601-d152b90edccb
github.com/couchbaselabs/go-fleecedelta v0.0.0-20220909152808-6d09efa7a338
github.com/couchbaselabs/gocbconnstr v1.0.5
github.com/couchbaselabs/rosmar v0.0.0-20240610211258-c856107e8e78
github.com/couchbaselabs/rosmar v0.0.0-20240924211003-933f0fd5bba0
github.com/elastic/gosigar v0.14.3
github.com/felixge/fgprof v0.9.4
github.com/go-jose/go-jose/v4 v4.0.2
Expand Down Expand Up @@ -71,7 +71,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/klauspost/compress v1.17.3 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/mattn/go-sqlite3 v1.14.22 // indirect
github.com/mattn/go-sqlite3 v1.14.23 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20240607131231-fb385523de28 h1:lh
github.com/couchbaselabs/gocbconnstr/v2 v2.0.0-20240607131231-fb385523de28/go.mod h1:o7T431UOfFVHDNvMBUmUxpHnhivwv7BziUao/nMl81E=
github.com/couchbaselabs/rosmar v0.0.0-20240610211258-c856107e8e78 h1:pdMO4naNb0W68OisY0Y7LEE6xOXlrlZow5IWmwow2Wc=
github.com/couchbaselabs/rosmar v0.0.0-20240610211258-c856107e8e78/go.mod h1:BZgg7zjF7c8e7BR5/JBuSXZ+PLIHgyrNKwE0eLFeglw=
github.com/couchbaselabs/rosmar v0.0.0-20240924211003-933f0fd5bba0 h1:CQil6oxiHYhJBITdKTlxEUOetPdcgN6bk8wOZd4maDM=
github.com/couchbaselabs/rosmar v0.0.0-20240924211003-933f0fd5bba0/go.mod h1:Abf5EPwi/7j5caDy2OPmo+L36I02H7sp9dkgek5t4bM=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -166,6 +168,8 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/mattn/go-sqlite3 v1.14.23 h1:gbShiuAP1W5j9UOksQ06aiiqPMxYecovVGwmTxWtuw0=
github.com/mattn/go-sqlite3 v1.14.23/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
Expand Down
42 changes: 29 additions & 13 deletions xdcr/cbs_xdcr.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package xdcr

import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
Expand All @@ -20,12 +21,15 @@ import (
)

const (
cbsRemoteClustersEndpoint = "/pools/default/remoteClusters"
xdcrClusterName = "sync_gateway_xdcr" // this is a hardcoded name for the local XDCR cluster
totalDocsFilteredStat = "xdcr_docs_filtered_total"
totalDocsWrittenStat = "xdcr_docs_written_total"
cbsRemoteClustersEndpoint = "/pools/default/remoteClusters"
xdcrClusterName = "sync_gateway_xdcr" // this is a hardcoded name for the local XDCR cluster
totalMobileDocsFiltered = "xdcr_mobile_docs_filtered_total"
totalDocsWrittenStat = "xdcr_docs_written_total"
totalDocsConflictResolutionRejected = "xdcr_docs_failed_cr_source_total"
)

var errNoXDCRMetrics = errors.New("No metric found")

// couchbaseServerManager implements a XDCR setup cluster on Couchbase Server.
type couchbaseServerManager struct {
fromBucket *base.GocbV2Bucket
Expand Down Expand Up @@ -170,15 +174,27 @@ func (x *couchbaseServerManager) Stats(ctx context.Context) (*Stats, error) {
return nil, err
}
stats := &Stats{}
stats.DocsFiltered, err = x.getValue(mf[totalDocsFilteredStat])
if err != nil {
return stats, err
}
stats.DocsWritten, err = x.getValue(mf[totalDocsWrittenStat])
if err != nil {
return stats, err

statMap := map[string]*uint64{
totalMobileDocsFiltered: &stats.MobileDocsFiltered,
totalDocsWrittenStat: &stats.DocsWritten,
totalDocsConflictResolutionRejected: &stats.TargetNewerDocs,
}
var errs *base.MultiError
for metricName, stat := range statMap {
metricFamily, ok := mf[metricName]
if !ok {
errs = errs.Append(fmt.Errorf("Could not find %s metric: %+v", metricName, mf))
continue
}
var err error
*stat, err = x.getValue(metricFamily)
if err != nil {
errs = errs.Append(err)
}
}
return stats, nil
stats.DocsProcessed = stats.DocsWritten + stats.MobileDocsFiltered + stats.TargetNewerDocs
return stats, errs.ErrorOrNil()
}

func (x *couchbaseServerManager) getValue(metrics *dto.MetricFamily) (uint64, error) {
Expand All @@ -204,7 +220,7 @@ outer:
return 0, fmt.Errorf("Do not have a relevant type for %v", metrics.Type)
}
}
return 0, fmt.Errorf("Could not find relevant value for metrics %v", metrics)
return 0, errNoXDCRMetrics
}

var _ Manager = &couchbaseServerManager{}
14 changes: 14 additions & 0 deletions xdcr/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"fmt"

"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/db"
"github.com/couchbaselabs/rosmar"
)

Expand Down Expand Up @@ -75,3 +76,16 @@ func NewXDCR(ctx context.Context, fromBucket, toBucket base.Bucket, opts XDCROpt
}
return newCouchbaseServerManager(ctx, gocbFromBucket, gocbToBucket, opts)
}

// getSourceID returns the source ID for a bucket.
func getSourceID(ctx context.Context, bucket base.Bucket) (string, error) {
serverUUID, err := db.GetServerUUID(ctx, bucket)
if err != nil {
return "", err
}
bucketUUID, err := bucket.UUID()
if err != nil {
return "", err
}
return db.CreateEncodedSourceID(bucketUUID, serverUUID)
}
80 changes: 57 additions & 23 deletions xdcr/rosmar_xdcr.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/db"
"github.com/couchbaselabs/rosmar"
)

Expand All @@ -27,21 +28,27 @@ type rosmarManager struct {
terminator chan bool
toBucketCollections map[uint32]*rosmar.Collection
fromBucket *rosmar.Bucket
fromBucketSourceID string
toBucket *rosmar.Bucket
replicationID string
docsFiltered atomic.Uint64
mobileDocsFiltered atomic.Uint64
docsWritten atomic.Uint64
errorCount atomic.Uint64
targetNewerDocs atomic.Uint64
}

// newRosmarManager creates an instance of XDCR backed by rosmar. This is not started until Start is called.
func newRosmarManager(_ context.Context, fromBucket, toBucket *rosmar.Bucket, opts XDCROptions) (Manager, error) {
func newRosmarManager(ctx context.Context, fromBucket, toBucket *rosmar.Bucket, opts XDCROptions) (Manager, error) {
if opts.Mobile != MobileOn {
return nil, errors.New("Only sgbucket.XDCRMobileOn is supported in rosmar")
}
fromBucketSourceID, err := getSourceID(ctx, fromBucket)
if err != nil {
return nil, fmt.Errorf("Could not get source ID for %s: %w", fromBucket.GetName(), err)
}
return &rosmarManager{
fromBucket: fromBucket,
fromBucketSourceID: fromBucketSourceID,
toBucket: toBucket,
replicationID: fmt.Sprintf("%s-%s", fromBucket.GetName(), toBucket.GetName()),
toBucketCollections: make(map[uint32]*rosmar.Collection),
Expand All @@ -67,18 +74,19 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve
// Filter out events if we have a non XDCR filter
if r.filterFunc != nil && !r.filterFunc(&event) {
base.TracefCtx(ctx, base.KeyWalrus, "Filtering doc %s", docID)
r.docsFiltered.Add(1)
r.mobileDocsFiltered.Add(1)
return true
}

toCas, err := col.Get(docID, nil)
// Have to use GetWithXattrs to get a cas value back if there are no xattrs (GetWithXattrs will not return a cas if there are no xattrs)
_, toXattrs, toCas, err := col.GetWithXattrs(ctx, docID, []string{base.VvXattrName, base.MouXattrName})
if err != nil && !base.IsDocNotFoundError(err) {
base.WarnfCtx(ctx, "Skipping replicating doc %s, could not perform a kv op get doc in toBucket: %s", event.Key, err)
r.errorCount.Add(1)
return false
}

/* full LWW conflict resolution is not implemented in rosmar yet
/* full LWW conflict resolution is not implemented in rosmar yet. There is no need to implement this since CAS will always be unique due to rosmar limitations.
CBS algorithm is:
Expand Down Expand Up @@ -115,7 +123,7 @@ func (r *rosmarManager) processEvent(ctx context.Context, event sgbucket.FeedEve
return true
}

err = opWithMeta(ctx, col, toCas, event)
err = opWithMeta(ctx, col, r.fromBucketSourceID, toCas, toXattrs, event)
if err != nil {
base.WarnfCtx(ctx, "Replicating doc %s, could not write doc: %s", event.Key, err)
r.errorCount.Add(1)
Expand Down Expand Up @@ -185,42 +193,68 @@ func (r *rosmarManager) Stop(_ context.Context) error {
return nil
}

// opWithMeta writes a document to the target datastore given a type of Deletion or Mutation event with a specific cas.
func opWithMeta(ctx context.Context, collection *rosmar.Collection, originalCas uint64, event sgbucket.FeedEvent) error {
var xattrs []byte
// opWithMeta writes a document to the target datastore given a type of Deletion or Mutation event with a specific cas. The originalXattrs will contain only the _vv and _mou xattr.
func opWithMeta(ctx context.Context, collection *rosmar.Collection, sourceID string, originalCas uint64, originalXattrs map[string][]byte, event sgbucket.FeedEvent) error {
var xattrs map[string][]byte
var body []byte
if event.DataType&sgbucket.FeedDataTypeXattr != 0 {
var err error
var dcpXattrs map[string][]byte
body, dcpXattrs, err = sgbucket.DecodeValueWithAllXattrs(event.Value)
body, xattrs, err = sgbucket.DecodeValueWithAllXattrs(event.Value)
if err != nil {
return err
}
xattrs, err = xattrToBytes(dcpXattrs)
} else {
xattrs = make(map[string][]byte, 1) // size one for _vv
body = event.Value
}
var vv *db.HybridLogicalVector
if bytes, ok := originalXattrs[base.VvXattrName]; ok {
err := json.Unmarshal(bytes, &vv)
if err != nil {
return err
return fmt.Errorf("Could not unmarshal the existing vv xattr %s: %w", string(bytes), err)
}
} else {
body = event.Value
newVv := db.NewHybridLogicalVector()
vv = &newVv
}
// TODO: read existing originalXattrs[base.VvXattrName] and update the pv CBG-4250

// TODO: clear _mou when appropriate CBG-4251

// update new cv with new source/cas
casBytes := string(base.Uint64CASToLittleEndianHex(event.Cas))
vv.SourceID = sourceID
vv.CurrentVersionCAS = casBytes
vv.Version = casBytes

var err error
xattrs[base.VvXattrName], err = json.Marshal(vv)
if err != nil {
return err
}
xattrBytes, err := xattrToBytes(xattrs)
if err != nil {
return err
}

if event.Opcode == sgbucket.FeedOpDeletion {
return collection.DeleteWithMeta(ctx, string(event.Key), originalCas, event.Cas, event.Expiry, xattrs)
return collection.DeleteWithMeta(ctx, string(event.Key), originalCas, event.Cas, event.Expiry, xattrBytes)
}

return collection.SetWithMeta(ctx, string(event.Key), originalCas, event.Cas, event.Expiry, xattrs, body, event.DataType)
return collection.SetWithMeta(ctx, string(event.Key), originalCas, event.Cas, event.Expiry, xattrBytes, body, event.DataType)

}

// Stats returns the stats of the XDCR replication.
func (r *rosmarManager) Stats(context.Context) (*Stats, error) {

return &Stats{
DocsWritten: r.docsWritten.Load(),
DocsFiltered: r.docsFiltered.Load(),
ErrorCount: r.errorCount.Load(),
TargetNewerDocs: r.targetNewerDocs.Load(),
}, nil
stats := &Stats{
DocsWritten: r.docsWritten.Load(),
MobileDocsFiltered: r.mobileDocsFiltered.Load(),
ErrorCount: r.errorCount.Load(),
TargetNewerDocs: r.targetNewerDocs.Load(),
}
stats.DocsProcessed = stats.DocsWritten + stats.MobileDocsFiltered + stats.TargetNewerDocs
return stats, nil
}

// xattrToBytes converts a map of xattrs of marshalled json.
Expand Down
6 changes: 4 additions & 2 deletions xdcr/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ package xdcr

// Stats represents the stats of a replication.
type Stats struct {
// DocsFiltered is the number of documents that have been filtered out and have not been replicated to the target cluster.
DocsFiltered uint64
// MobileDocsFiltered is the number of documents that have been filtered out and have not been replicated to the target cluster.
MobileDocsFiltered uint64
// DocsWritten is the number of documents written to the destination cluster, since the start or resumption of the current replication.
DocsWritten uint64
// DocsProcessed is the number of documents that have been processed by the replication.
DocsProcessed uint64
// ErrorCount is the number of errors that have occurred during the replication.
ErrorCount uint64

Expand Down
Loading

0 comments on commit d583123

Please sign in to comment.