Skip to content

Commit

Permalink
Beryllium: Rename SourceAndVersion to Version / Improve HLV comme…
Browse files Browse the repository at this point in the history
…nts. (#6614)

* - Rename `SourceAndVersion` to just `Version`, and rename `SourceAndVersion.Version` to `Value`.
- Add/Improve comments.

* Update db/hybrid_logical_vector.go
  • Loading branch information
bbrks committed Jan 10, 2024
1 parent 2586bf3 commit 481b531
Show file tree
Hide file tree
Showing 13 changed files with 110 additions and 97 deletions.
8 changes: 4 additions & 4 deletions db/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type ChangeEntry struct {
principalDoc bool // Used to indicate _user/_role docs
Revoked bool `json:"revoked,omitempty"`
collectionID uint32
CurrentVersion *SourceAndVersion `json:"current_version,omitempty"` // the current version of the change entry
CurrentVersion *Version `json:"current_version,omitempty"` // the current version of the change entry
}

const (
Expand Down Expand Up @@ -486,7 +486,7 @@ func makeChangeEntry(logEntry *LogEntry, seqID SequenceID, channel channels.ID)
// This allows current version to be nil in event of CV not being populated on log entry
// allowing omitempty to work as expected
if logEntry.SourceID != "" && logEntry.Version != 0 {
change.CurrentVersion = &SourceAndVersion{SourceID: logEntry.SourceID, Version: logEntry.Version}
change.CurrentVersion = &Version{SourceID: logEntry.SourceID, Value: logEntry.Version}
}
if logEntry.Flags&channels.Removed != 0 {
change.Removed = base.SetOf(channel.Name)
Expand Down Expand Up @@ -1289,8 +1289,8 @@ func createChangesEntry(ctx context.Context, docid string, db *DatabaseCollectio
row.SetBranched((populatedDoc.Flags & channels.Branched) != 0)

if populatedDoc.HLV != nil {
cv := SourceAndVersion{}
cv.SourceID, cv.Version = populatedDoc.HLV.GetCurrentVersion()
cv := Version{}
cv.SourceID, cv.Value = populatedDoc.HLV.GetCurrentVersion()
row.CurrentVersion = &cv
}

Expand Down
2 changes: 1 addition & 1 deletion db/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func TestCVPopulationOnChangeEntry(t *testing.T) {

assert.Equal(t, doc.ID, changes[0].ID)
assert.Equal(t, bucketUUID, changes[0].CurrentVersion.SourceID)
assert.Equal(t, doc.Cas, changes[0].CurrentVersion.Version)
assert.Equal(t, doc.Cas, changes[0].CurrentVersion.Value)
}

func TestDocDeletionFromChannelCoalesced(t *testing.T) {
Expand Down
16 changes: 8 additions & 8 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,9 +884,9 @@ func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent DocU
d.HLV.ImportCAS = d.Cas
} else {
// Otherwise this is an SDK mutation made by the local cluster that should be added to HLV.
newVVEntry := SourceAndVersion{}
newVVEntry := Version{}
newVVEntry.SourceID = db.dbCtx.BucketUUID
newVVEntry.Version = hlvExpandMacroCASValue
newVVEntry.Value = hlvExpandMacroCASValue
err := d.SyncData.HLV.AddVersion(newVVEntry)
if err != nil {
return nil, err
Expand All @@ -897,9 +897,9 @@ func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent DocU

case NewVersion, ExistingVersionWithUpdateToHLV:
// add a new entry to the version vector
newVVEntry := SourceAndVersion{}
newVVEntry := Version{}
newVVEntry.SourceID = db.dbCtx.BucketUUID
newVVEntry.Version = hlvExpandMacroCASValue
newVVEntry.Value = hlvExpandMacroCASValue
err := d.SyncData.HLV.AddVersion(newVVEntry)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1059,7 +1059,7 @@ func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, bod
return newRevID, doc, err
}

func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, docHLV HybridLogicalVector, existingDoc *sgbucket.BucketDocument) (doc *Document, cv *SourceAndVersion, newRevID string, err error) {
func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Context, newDoc *Document, docHLV HybridLogicalVector, existingDoc *sgbucket.BucketDocument) (doc *Document, cv *Version, newRevID string, err error) {
var matchRev string
if existingDoc != nil {
doc, unmarshalErr := unmarshalDocumentWithXattr(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattr, existingDoc.UserXattr, existingDoc.Cas, DocUnmarshalRev)
Expand Down Expand Up @@ -1146,11 +1146,11 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont

if doc != nil && doc.HLV != nil {
if cv == nil {
cv = &SourceAndVersion{}
cv = &Version{}
}
source, version := doc.HLV.GetCurrentVersion()
cv.SourceID = source
cv.Version = version
cv.Value = version
}

return doc, cv, newRevID, err
Expand Down Expand Up @@ -2190,7 +2190,7 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do
Expiry: doc.Expiry,
Deleted: doc.History[newRevID].Deleted,
_shallowCopyBody: storedDoc.Body(ctx),
CV: &SourceAndVersion{Version: doc.HLV.Version, SourceID: doc.HLV.SourceID},
CV: &Version{Value: doc.HLV.Version, SourceID: doc.HLV.SourceID},
}

if createNewRevIDSkipped {
Expand Down
4 changes: 2 additions & 2 deletions db/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1744,7 +1744,7 @@ func TestPutExistingCurrentVersion(t *testing.T) {
require.NoError(t, err)
// assert on returned CV
assert.Equal(t, "test", cv.SourceID)
assert.Equal(t, incomingVersion, cv.Version)
assert.Equal(t, incomingVersion, cv.Value)
assert.Equal(t, []byte(`{"key1":"value2"}`), doc._rawBody)

// assert on the sync data from the above update to the doc
Expand Down Expand Up @@ -1851,7 +1851,7 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {
assert.NotNil(t, doc)
// assert on returned CV value
assert.Equal(t, "test", cv.SourceID)
assert.Equal(t, incomingVersion, cv.Version)
assert.Equal(t, incomingVersion, cv.Value)
assert.Equal(t, []byte(`{"key1":"value2"}`), doc._rawBody)

// assert on the sync data from the above update to the doc
Expand Down
4 changes: 2 additions & 2 deletions db/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,7 @@ func TestConflicts(t *testing.T) {
Changes: []ChangeRev{{"rev": "2-b"}, {"rev": "2-a"}},
branched: true,
collectionID: collectionID,
CurrentVersion: &SourceAndVersion{SourceID: bucketUUID, Version: fetchedDoc.Cas},
CurrentVersion: &Version{SourceID: bucketUUID, Value: fetchedDoc.Cas},
}, changes[0],
)

Expand Down Expand Up @@ -1174,7 +1174,7 @@ func TestConflicts(t *testing.T) {
Changes: []ChangeRev{{"rev": "2-a"}, {"rev": rev3}},
branched: true,
collectionID: collectionID,
CurrentVersion: &SourceAndVersion{SourceID: bucketUUID, Version: doc.Cas},
CurrentVersion: &Version{SourceID: bucketUUID, Value: doc.Cas},
}, changes[0])

}
Expand Down
4 changes: 2 additions & 2 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -1239,14 +1239,14 @@ func (doc *Document) MarshalWithXattr() (data []byte, xdata []byte, err error) {
}

// HasCurrentVersion Compares the specified CV with the fetched documents CV, returns error on mismatch between the two
func (d *Document) HasCurrentVersion(cv SourceAndVersion) error {
func (d *Document) HasCurrentVersion(cv Version) error {
if d.HLV == nil {
return base.RedactErrorf("no HLV present in fetched doc %s", base.UD(d.ID))
}

// fetch the current version for the loaded doc and compare against the CV specified in the IDandCV key
fetchedDocSource, fetchedDocVersion := d.HLV.GetCurrentVersion()
if fetchedDocSource != cv.SourceID || fetchedDocVersion != cv.Version {
if fetchedDocSource != cv.SourceID || fetchedDocVersion != cv.Value {
return base.RedactErrorf("mismatch between specified current version and fetched document current version for doc %s", base.UD(d.ID))
}
return nil
Expand Down
51 changes: 32 additions & 19 deletions db/hybrid_logical_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package db

import (
"encoding/base64"
"fmt"
"math"

Expand All @@ -19,6 +20,7 @@ import (
// hlvExpandMacroCASValue causes the field to be populated by CAS value by macro expansion
const hlvExpandMacroCASValue = math.MaxUint64

// HybridLogicalVector (HLV) is a type that represents a vector of Hybrid Logical Clocks.
type HybridLogicalVector struct {
CurrentVersionCAS uint64 // current version cas (or cvCAS) stores the current CAS at the time of replication
ImportCAS uint64 // Set when an import modifies the document CAS but preserves the HLV (import of a version replicated by XDCR)
Expand All @@ -28,19 +30,30 @@ type HybridLogicalVector struct {
PreviousVersions map[string]uint64 // map of previous versions for fast efficient lookup
}

// SourceAndVersion is a structure used to add a new entry to a HLV
type SourceAndVersion struct {
// Version is representative of a single entry in a HybridLogicalVector.
type Version struct {
// SourceID is an ID representing the source of the value (e.g. Couchbase Lite ID)
SourceID string `json:"source_id"`
Version uint64 `json:"version"`
// Value is a Hybrid Logical Clock value (In Couchbase Server, CAS is a HLC)
Value uint64 `json:"version"`
}

func CreateVersion(source string, version uint64) SourceAndVersion {
return SourceAndVersion{
func CreateVersion(source string, version uint64) Version {
return Version{
SourceID: source,
Version: version,
Value: version,
}
}

// String returns a Couchbase Lite-compatible string representation of the version.
func (v Version) String() string {
timestamp := string(base.Uint64CASToLittleEndianHex(v.Value))
source := base64.StdEncoding.EncodeToString([]byte(v.SourceID))
return timestamp + "@" + source
}

// PersistedHybridLogicalVector is the marshalled format of HybridLogicalVector.
// This representation needs to be kept in sync with XDCR.
type PersistedHybridLogicalVector struct {
CurrentVersionCAS string `json:"cvCas,omitempty"`
ImportCAS string `json:"importCAS,omitempty"`
Expand All @@ -50,20 +63,20 @@ type PersistedHybridLogicalVector struct {
PreviousVersions map[string]string `json:"pv,omitempty"`
}

// NewHybridLogicalVector returns a HybridLogicalVector struct with maps initialised in the struct
// NewHybridLogicalVector returns an initialised HybridLogicalVector.
func NewHybridLogicalVector() HybridLogicalVector {
return HybridLogicalVector{
PreviousVersions: make(map[string]uint64),
MergeVersions: make(map[string]uint64),
}
}

// GetCurrentVersion return the current version vector from the HLV in memory
// GetCurrentVersion returns the current version from the HLV in memory.
func (hlv *HybridLogicalVector) GetCurrentVersion() (string, uint64) {
return hlv.SourceID, hlv.Version
}

// IsInConflict tests to see if in memory HLV is conflicting with another HLV
// IsInConflict tests to see if in memory HLV is conflicting with another HLV.
func (hlv *HybridLogicalVector) IsInConflict(otherVector HybridLogicalVector) bool {
// test if either HLV(A) or HLV(B) are dominating over each other. If so they are not in conflict
if hlv.isDominating(otherVector) || otherVector.isDominating(*hlv) {
Expand All @@ -73,21 +86,20 @@ func (hlv *HybridLogicalVector) IsInConflict(otherVector HybridLogicalVector) bo
return true
}

// AddVersion adds a version vector to the in memory representation of a HLV and moves current version vector to
// previous versions on the HLV if needed
func (hlv *HybridLogicalVector) AddVersion(newVersion SourceAndVersion) error {
if newVersion.Version < hlv.Version {
return fmt.Errorf("attempting to add new version vector entry with a CAS that is less than the current version CAS value. Current cas: %d new cas %d", hlv.Version, newVersion.Version)
// AddVersion adds newVersion to the in memory representation of the HLV.
func (hlv *HybridLogicalVector) AddVersion(newVersion Version) error {
if newVersion.Value < hlv.Version {
return fmt.Errorf("attempting to add new version vector entry with a CAS that is less than the current version CAS value. Current cas: %d new cas %d", hlv.Version, newVersion.Value)
}
// check if this is the first time we're adding a source - version pair
if hlv.SourceID == "" {
hlv.Version = newVersion.Version
hlv.Version = newVersion.Value
hlv.SourceID = newVersion.SourceID
return nil
}
// if new entry has the same source we simple just update the version
if newVersion.SourceID == hlv.SourceID {
hlv.Version = newVersion.Version
hlv.Version = newVersion.Value
return nil
}
// if we get here this is a new version from a different sourceID thus need to move current sourceID to previous versions and update current version
Expand All @@ -107,12 +119,13 @@ func (hlv *HybridLogicalVector) AddVersion(newVersion SourceAndVersion) error {
// source doesn't exist in PV so add
hlv.PreviousVersions[hlv.SourceID] = hlv.Version
}
hlv.Version = newVersion.Version
hlv.Version = newVersion.Value
hlv.SourceID = newVersion.SourceID
return nil
}

// Remove removes a vector from previous versions section of in memory HLV
// Remove removes a source from previous versions of the HLV.
// TODO: Does this need to remove source from current version as well? Merge Versions?
func (hlv *HybridLogicalVector) Remove(source string) error {
// if entry is not found in previous versions we return error
if hlv.PreviousVersions[source] == 0 {
Expand Down Expand Up @@ -215,7 +228,7 @@ func (hlv *HybridLogicalVector) AddNewerVersions(otherVector HybridLogicalVector

// create current version for incoming vector and attempt to add it to the local HLV, AddVersion will handle if attempting to add older
// version than local HLVs CV pair
otherVectorCV := SourceAndVersion{SourceID: otherVector.SourceID, Version: otherVector.Version}
otherVectorCV := Version{SourceID: otherVector.SourceID, Value: otherVector.Version}
err := hlv.AddVersion(otherVectorCV)
if err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions db/hybrid_logical_vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ func TestInternalHLVFunctions(t *testing.T) {
const newSource = "s_testsource"

// create a new version vector entry that will error method AddVersion
badNewVector := SourceAndVersion{
Version: 123345,
badNewVector := Version{
Value: 123345,
SourceID: currSourceId,
}
// create a new version vector entry that should be added to HLV successfully
newVersionVector := SourceAndVersion{
Version: newCAS,
newVersionVector := Version{
Value: newCAS,
SourceID: currSourceId,
}

Expand Down
4 changes: 2 additions & 2 deletions db/revision_cache_bypass.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (rc *BypassRevisionCache) GetWithRev(ctx context.Context, docID, revID stri
}

// GetWithCV fetches the Current Version for the given docID and CV immediately from the bucket.
func (rc *BypassRevisionCache) GetWithCV(ctx context.Context, docID string, cv *SourceAndVersion, includeBody, includeDelta bool) (docRev DocumentRevision, err error) {
func (rc *BypassRevisionCache) GetWithCV(ctx context.Context, docID string, cv *Version, includeBody, includeDelta bool) (docRev DocumentRevision, err error) {

unmarshalLevel := DocUnmarshalSync
if includeBody {
Expand Down Expand Up @@ -126,7 +126,7 @@ func (rc *BypassRevisionCache) RemoveWithRev(docID, revID string) {
// nop
}

func (rc *BypassRevisionCache) RemoveWithCV(docID string, cv *SourceAndVersion) {
func (rc *BypassRevisionCache) RemoveWithCV(docID string, cv *Version) {
// nop
}

Expand Down
18 changes: 9 additions & 9 deletions db/revision_cache_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type RevisionCache interface {
// GetWithCV returns the given revision by CV, and stores if not already cached.
// When includeBody=true, the returned DocumentRevision will include a mutable shallow copy of the marshaled body.
// When includeDelta=true, the returned DocumentRevision will include delta - requires additional locking during retrieval.
GetWithCV(ctx context.Context, docID string, cv *SourceAndVersion, includeBody, includeDelta bool) (DocumentRevision, error)
GetWithCV(ctx context.Context, docID string, cv *Version, includeBody, includeDelta bool) (DocumentRevision, error)

// GetActive returns the current revision for the given doc ID, and stores if not already cached.
// When includeBody=true, the returned DocumentRevision will include a mutable shallow copy of the marshaled body.
Expand All @@ -55,7 +55,7 @@ type RevisionCache interface {
RemoveWithRev(docID, revID string)

// RemoveWithCV evicts a revision from the cache using its current version.
RemoveWithCV(docID string, cv *SourceAndVersion)
RemoveWithCV(docID string, cv *Version)

// UpdateDelta stores the given toDelta value in the given rev if cached
UpdateDelta(ctx context.Context, docID, revID string, toDelta RevisionDelta)
Expand Down Expand Up @@ -128,7 +128,7 @@ type DocumentRevision struct {
Delta *RevisionDelta
Deleted bool
Removed bool // True if the revision is a removal.
CV *SourceAndVersion
CV *Version

_shallowCopyBody Body // an unmarshalled body that can produce shallow copies
}
Expand Down Expand Up @@ -262,7 +262,7 @@ func newRevCacheDelta(deltaBytes []byte, fromRevID string, toRevision DocumentRe

// This is the RevisionCacheLoaderFunc callback for the context's RevisionCache.
// Its job is to load a revision from the bucket when there's a cache miss.
func revCacheLoader(ctx context.Context, backingStore RevisionCacheBackingStore, id IDAndRev, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, fetchedCV *SourceAndVersion, err error) {
func revCacheLoader(ctx context.Context, backingStore RevisionCacheBackingStore, id IDAndRev, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, fetchedCV *Version, err error) {
var doc *Document
unmarshalLevel := DocUnmarshalSync
if unmarshalBody {
Expand All @@ -278,8 +278,8 @@ func revCacheLoader(ctx context.Context, backingStore RevisionCacheBackingStore,
// revCacheLoaderForCv will load a document from the bucket using the CV, comapre the fetched doc and the CV specified in the function,
// and will still return revid for purpose of populating the Rev ID lookup map on the cache
func revCacheLoaderForCv(ctx context.Context, backingStore RevisionCacheBackingStore, id IDandCV, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, err error) {
cv := SourceAndVersion{
Version: id.Version,
cv := Version{
Value: id.Version,
SourceID: id.Source,
}
var doc *Document
Expand All @@ -295,7 +295,7 @@ func revCacheLoaderForCv(ctx context.Context, backingStore RevisionCacheBackingS
}

// Common revCacheLoader functionality used either during a cache miss (from revCacheLoader), or directly when retrieving current rev from cache
func revCacheLoaderForDocument(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, revid string) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, fetchedCV *SourceAndVersion, err error) {
func revCacheLoaderForDocument(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, revid string) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, fetchedCV *Version, err error) {
if bodyBytes, body, attachments, err = backingStore.getRevision(ctx, doc, revid); err != nil {
// If we can't find the revision (either as active or conflicted body from the document, or as old revision body backup), check whether
// the revision was a channel removal. If so, we want to store as removal in the revision cache
Expand All @@ -320,15 +320,15 @@ func revCacheLoaderForDocument(ctx context.Context, backingStore RevisionCacheBa
history = encodeRevisions(ctx, doc.ID, validatedHistory)
channels = doc.History[revid].Channels
if doc.HLV != nil {
fetchedCV = &SourceAndVersion{SourceID: doc.HLV.SourceID, Version: doc.HLV.Version}
fetchedCV = &Version{SourceID: doc.HLV.SourceID, Value: doc.HLV.Version}
}

return bodyBytes, body, history, channels, removed, attachments, deleted, doc.Expiry, fetchedCV, err
}

// revCacheLoaderForDocumentCV used either during cache miss (from revCacheLoaderForCv), or used directly when getting current active CV from cache
// nolint:staticcheck
func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, cv SourceAndVersion) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, err error) {
func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCacheBackingStore, doc *Document, cv Version) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, err error) {
if bodyBytes, body, attachments, err = backingStore.getCurrentVersion(ctx, doc); err != nil {
// TODO: pending CBG-3213 support of channel removal for CV
// we need implementation of IsChannelRemoval for CV here.
Expand Down
Loading

0 comments on commit 481b531

Please sign in to comment.