Skip to content

Commit

Permalink
CBG-3354 Channel query support for current version (#6625)
Browse files Browse the repository at this point in the history
* CBG-3354 Channel query support for current version

Adds current version to marshalled _sync.rev property for use with existing indexes.

New struct RevAndVersion handles marshal/unmarshal of the rev property, and supports rev only (string) and rev/src/version (map).

New structs SyncDataJSON and SyncDataAlias are used to encapsulate this handling at the persistence/marshalling layer.  This avoids changes to use of SyncData.CurrentRev, and also avoids potential errors by not duplicating cv in SyncData.

* Test updates based on PR feedback
  • Loading branch information
adamcfraser authored and bbrks committed Jan 10, 2024
1 parent 481b531 commit f185349
Show file tree
Hide file tree
Showing 18 changed files with 413 additions and 130 deletions.
4 changes: 3 additions & 1 deletion channels/log_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,15 @@ type LogEntry struct {

func (l LogEntry) String() string {
return fmt.Sprintf(
"seq: %d docid: %s revid: %s vbno: %d type: %v collectionID: %d",
"seq: %d docid: %s revid: %s vbno: %d type: %v collectionID: %d source: %s version: %d",
l.Sequence,
l.DocID,
l.RevID,
l.VbNo,
l.Type,
l.CollectionID,
l.SourceID,
l.Version,
)
}

Expand Down
8 changes: 8 additions & 0 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ func (entry *LogEntry) SetDeleted() {
entry.Flags |= channels.Deleted
}

func (entry *LogEntry) SetRevAndVersion(rv RevAndVersion) {
entry.RevID = rv.RevTreeID
if rv.CurrentSource != "" {
entry.SourceID = rv.CurrentSource
entry.Version = base.HexCasToUint64(rv.CurrentVersion)
}
}

type LogEntries []*LogEntry

// A priority-queue of LogEntries, kept ordered by increasing sequence #.
Expand Down
2 changes: 1 addition & 1 deletion db/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type ChangeEntry struct {
principalDoc bool // Used to indicate _user/_role docs
Revoked bool `json:"revoked,omitempty"`
collectionID uint32
CurrentVersion *Version `json:"current_version,omitempty"` // the current version of the change entry
CurrentVersion *Version `json:"-"` // the current version of the change entry. (Not marshalled, pending REST support for cv)
}

const (
Expand Down
7 changes: 3 additions & 4 deletions db/changes_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type channelsViewRow struct {
ID string
Key []interface{} // Actually [channelName, sequence]
Value struct {
Rev string
Rev RevAndVersion
Flags uint8
}
}
Expand All @@ -42,13 +42,12 @@ func nextChannelViewEntry(ctx context.Context, results sgbucket.QueryResultItera
entry := &LogEntry{
Sequence: uint64(viewRow.Key[1].(float64)),
DocID: viewRow.ID,
RevID: viewRow.Value.Rev,
Flags: viewRow.Value.Flags,
TimeReceived: time.Now(),
CollectionID: collectionID,
}
entry.SetRevAndVersion(viewRow.Value.Rev)
return entry, true

}

func nextChannelQueryEntry(ctx context.Context, results sgbucket.QueryResultIterator, collectionID uint32) (*LogEntry, bool) {
Expand All @@ -61,11 +60,11 @@ func nextChannelQueryEntry(ctx context.Context, results sgbucket.QueryResultIter
entry := &LogEntry{
Sequence: queryRow.Sequence,
DocID: queryRow.Id,
RevID: queryRow.Rev,
Flags: queryRow.Flags,
TimeReceived: time.Now(),
CollectionID: collectionID,
}
entry.SetRevAndVersion(queryRow.Rev)

if queryRow.RemovalRev != "" {
entry.RevID = queryRow.RemovalRev
Expand Down
13 changes: 9 additions & 4 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2762,10 +2762,11 @@ func (db *DatabaseCollectionWithUser) CheckProposedRev(ctx context.Context, doci
}

const (
xattrMacroCas = "cas"
xattrMacroValueCrc32c = "value_crc32c"
versionVectorVrsMacro = "_vv.vrs"
versionVectorCVCASMacro = "_vv.cvCas"
xattrMacroCas = "cas" // SyncData.Cas
xattrMacroValueCrc32c = "value_crc32c" // SyncData.Crc32c
xattrMacroCurrentRevVersion = "rev.vrs" // SyncDataJSON.RevAndVersion.CurrentVersion
versionVectorVrsMacro = "_vv.vrs" // PersistedHybridLogicalVector.Version
versionVectorCVCASMacro = "_vv.cvCas" // PersistedHybridLogicalVector.CurrentVersionCAS
)

func macroExpandSpec(xattrName string) []sgbucket.MacroExpansionSpec {
Expand All @@ -2785,6 +2786,10 @@ func xattrCrc32cPath(xattrKey string) string {
return xattrKey + "." + xattrMacroValueCrc32c
}

func xattrCurrentRevVersionPath(xattrKey string) string {
return xattrKey + "." + xattrMacroCurrentRevVersion
}

func xattrCurrentVersionPath(xattrKey string) string {
return xattrKey + "." + versionVectorVrsMacro
}
Expand Down
4 changes: 2 additions & 2 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -970,15 +970,15 @@ func (c *DatabaseCollection) processForEachDocIDResults(ctx context.Context, cal
found = results.Next(ctx, &viewRow)
if found {
docid = viewRow.Key
revid = viewRow.Value.RevID
revid = viewRow.Value.RevID.RevTreeID
seq = viewRow.Value.Sequence
channels = viewRow.Value.Channels
}
} else {
found = results.Next(ctx, &queryRow)
if found {
docid = queryRow.Id
revid = queryRow.RevID
revid = queryRow.RevID.RevTreeID
seq = queryRow.Sequence
channels = make([]string, 0)
// Query returns all channels, but we only want to return active channels
Expand Down
84 changes: 81 additions & 3 deletions db/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1828,6 +1828,84 @@ func TestChannelView(t *testing.T) {
log.Printf("View Query returned entry (%d): %v", i, entry)
}
assert.Equal(t, 1, len(entries))
require.Equal(t, "doc1", entries[0].DocID)
collection.RequireCurrentVersion(t, "doc1", entries[0].SourceID, entries[0].Version)
}

func TestChannelQuery(t *testing.T) {

db, ctx := setupTestDB(t)
defer db.Close(ctx)
collection := GetSingleDatabaseCollectionWithUser(t, db)
_, err := collection.UpdateSyncFun(ctx, `function(doc, oldDoc) {
channel(doc.channels);
}`)
require.NoError(t, err)

// Create doc
body := Body{"key1": "value1", "key2": 1234, "channels": "ABC"}
rev1ID, _, err := collection.Put(ctx, "doc1", body)
require.NoError(t, err, "Couldn't create doc1")

// Create a doc to test removal handling. Needs three revisions so that the removal rev (2) isn't
// the current revision
removedDocID := "removed_doc"
removedDocRev1, _, err := collection.Put(ctx, removedDocID, body)
require.NoError(t, err, "Couldn't create removed_doc")
removalSource, removalVersion := collection.GetDocumentCurrentVersion(t, removedDocID)

updatedChannelBody := Body{"_rev": removedDocRev1, "key1": "value1", "key2": 1234, "channels": "DEF"}
removalRev, _, err := collection.Put(ctx, removedDocID, updatedChannelBody)
require.NoError(t, err, "Couldn't update removed_doc")

updatedChannelBody = Body{"_rev": removalRev, "key1": "value1", "key2": 2345, "channels": "DEF"}
removedDocRev3, _, err := collection.Put(ctx, removedDocID, updatedChannelBody)
require.NoError(t, err, "Couldn't update removed_doc")

var entries LogEntries

// Test query retrieval via star channel and named channel (queries use different indexes)
testCases := []struct {
testName string
channelName string
}{
{
testName: "star channel",
channelName: "*",
},
{
testName: "named channel",
channelName: "ABC",
},
}

for _, testCase := range testCases {
t.Run(testCase.testName, func(t *testing.T) {
entries, err = collection.getChangesInChannelFromQuery(ctx, testCase.channelName, 0, 100, 0, false)
require.NoError(t, err)

for i, entry := range entries {
log.Printf("Channel Query returned entry (%d): %v", i, entry)
}
require.Len(t, entries, 2)
require.Equal(t, "doc1", entries[0].DocID)
require.Equal(t, rev1ID, entries[0].RevID)
collection.RequireCurrentVersion(t, "doc1", entries[0].SourceID, entries[0].Version)

removedDocEntry := entries[1]
require.Equal(t, removedDocID, removedDocEntry.DocID)
if testCase.channelName == "*" {
require.Equal(t, removedDocRev3, removedDocEntry.RevID)
collection.RequireCurrentVersion(t, removedDocID, removedDocEntry.SourceID, removedDocEntry.Version)
} else {
require.Equal(t, removalRev, removedDocEntry.RevID)
// TODO: Pending channel removal rev handling, CBG-3213
log.Printf("removal rev check of removal cv %s@%d is pending CBG-3213", removalSource, removalVersion)
//require.Equal(t, removalSource, removedDocEntry.SourceID)
//require.Equal(t, removalVersion, removedDocEntry.Version)
}
})
}

}

Expand Down Expand Up @@ -2451,7 +2529,7 @@ func TestDeleteWithNoTombstoneCreationSupport(t *testing.T) {
assert.NoError(t, err)

var doc Body
var xattr Body
var xattr SyncData

// Ensure document has been added
waitAndAssertCondition(t, func() bool {
Expand All @@ -2462,8 +2540,8 @@ func TestDeleteWithNoTombstoneCreationSupport(t *testing.T) {
assert.Equal(t, int64(1), db.DbStats.SharedBucketImport().ImportCount.Value())

assert.Nil(t, doc)
assert.Equal(t, "1-2cac91faf7b3f5e5fd56ff377bdb5466", xattr["rev"])
assert.Equal(t, float64(2), xattr["sequence"])
assert.Equal(t, "1-2cac91faf7b3f5e5fd56ff377bdb5466", xattr.CurrentRev)
assert.Equal(t, uint64(2), xattr.Sequence)
}

func TestResyncUpdateAllDocChannels(t *testing.T) {
Expand Down
89 changes: 84 additions & 5 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type ChannelSetEntry struct {

// The sync-gateway metadata stored in the "_sync" property of a Couchbase document.
type SyncData struct {
CurrentRev string `json:"rev"`
CurrentRev string `json:"-"` // CurrentRev. Persisted as RevAndVersion in SyncDataJSON
NewestRev string `json:"new_rev,omitempty"` // Newest rev, if different from CurrentRev
Flags uint8 `json:"flags,omitempty"`
Sequence uint64 `json:"sequence,omitempty"`
Expand Down Expand Up @@ -192,7 +192,7 @@ type historyOnlySyncData struct {

type revOnlySyncData struct {
casOnlySyncData
CurrentRev string `json:"rev"`
CurrentRev RevAndVersion `json:"rev"`
}

type casOnlySyncData struct {
Expand Down Expand Up @@ -1160,7 +1160,7 @@ func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata
return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalHistory). Error: %v", base.UD(doc.ID), unmarshalErr))
}
doc.SyncData = SyncData{
CurrentRev: historyOnlyMeta.CurrentRev,
CurrentRev: historyOnlyMeta.CurrentRev.RevTreeID,
History: historyOnlyMeta.History,
Cas: historyOnlyMeta.Cas,
}
Expand All @@ -1173,7 +1173,7 @@ func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata
return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalRev). Error: %v", base.UD(doc.ID), unmarshalErr))
}
doc.SyncData = SyncData{
CurrentRev: revOnlyMeta.CurrentRev,
CurrentRev: revOnlyMeta.CurrentRev.RevTreeID,
Cas: revOnlyMeta.Cas,
}
doc._rawBody = data
Expand Down Expand Up @@ -1230,7 +1230,7 @@ func (doc *Document) MarshalWithXattr() (data []byte, xdata []byte, err error) {
}
}

xdata, err = base.JSONMarshal(doc.SyncData)
xdata, err = base.JSONMarshal(&doc.SyncData)
if err != nil {
return nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattr() doc SyncData with id: %s. Error: %v", base.UD(doc.ID), err))
}
Expand All @@ -1251,3 +1251,82 @@ func (d *Document) HasCurrentVersion(cv Version) error {
}
return nil
}

// SyncDataAlias is an alias for SyncData that doesn't define custom MarshalJSON/UnmarshalJSON
type SyncDataAlias SyncData

// SyncDataJSON is the persisted form of SyncData, with RevAndVersion populated at marshal time
type SyncDataJSON struct {
*SyncDataAlias
RevAndVersion RevAndVersion `json:"rev"`
}

// MarshalJSON populates RevAndVersion using CurrentRev and the HLV (current) source and version.
// Marshals using SyncDataAlias to avoid recursion, and SyncDataJSON to add the combined RevAndVersion.
func (s SyncData) MarshalJSON() (data []byte, err error) {

var sdj SyncDataJSON
var sd SyncDataAlias
sd = (SyncDataAlias)(s)
sdj.SyncDataAlias = &sd
sdj.RevAndVersion.RevTreeID = s.CurrentRev
if s.HLV != nil {
sdj.RevAndVersion.CurrentSource = s.HLV.SourceID
sdj.RevAndVersion.CurrentVersion = string(base.Uint64CASToLittleEndianHex(s.HLV.Version))
}
return base.JSONMarshal(sdj)
}

// UnmarshalJSON unmarshals using SyncDataJSON, then sets currentRev on SyncData based on the value in RevAndVersion.
// The HLV's current version stored in RevAndVersion is ignored at unmarshal time - the value in the HLV is the source
// of truth.
func (s *SyncData) UnmarshalJSON(data []byte) error {

var sdj *SyncDataJSON
err := base.JSONUnmarshal(data, &sdj)
if err != nil {
return err
}
*s = SyncData(*sdj.SyncDataAlias)
s.CurrentRev = sdj.RevAndVersion.RevTreeID
return nil
}

// RevAndVersion is used to store both revTreeID and currentVersion in a single property, for backwards compatibility
// with existing indexes using rev. When only RevTreeID is specified, is marshalled/unmarshalled as a string. Otherwise
// marshalled normally.
type RevAndVersion struct {
RevTreeID string `json:"rev,omitempty"`
CurrentSource string `json:"src,omitempty"`
CurrentVersion string `json:"vrs,omitempty"` // String representation of version
}

// RevAndVersionJSON aliases RevAndVersion to support conditional unmarshalling from either string (revTreeID) or
// map (RevAndVersion) representations
type RevAndVersionJSON RevAndVersion

// Marshals RevAndVersion as simple string when only RevTreeID is specified - otherwise performs standard
// marshalling
func (rv RevAndVersion) MarshalJSON() (data []byte, err error) {

if rv.CurrentSource == "" {
return base.JSONMarshal(rv.RevTreeID)
}
return base.JSONMarshal(RevAndVersionJSON(rv))
}

// Unmarshals either from string (legacy, revID only) or standard RevAndVersion unmarshalling.
func (rv *RevAndVersion) UnmarshalJSON(data []byte) error {

if len(data) == 0 {
return nil
}
switch data[0] {
case '"':
return base.JSONUnmarshal(data, &rv.RevTreeID)
case '{':
return base.JSONUnmarshal(data, (*RevAndVersionJSON)(rv))
default:
return fmt.Errorf("unrecognized JSON format for RevAndVersion: %s", data)
}
}
Loading

0 comments on commit f185349

Please sign in to comment.