From f185349766f5e37b1fdc2c797ef0f25cf0eb89bf Mon Sep 17 00:00:00 2001 From: Adam Fraser Date: Tue, 9 Jan 2024 00:35:21 -0800 Subject: [PATCH] CBG-3354 Channel query support for current version (#6625) * CBG-3354 Channel query support for current version Adds current version to marshalled _sync.rev property for use with existing indexes. New struct RevAndVersion handles marshal/unmarshal of the rev property, and supports rev only (string) and rev/src/version (map). New structs SyncDataJSON and SyncDataAlias are used to encapsulate this handling at the persistence/marshalling layer. This avoids changes to use of SyncData.CurrentRev, and also avoids potential errors by not duplicating cv in SyncData. * Test updates based on PR feedback --- channels/log_entry.go | 4 +- db/change_cache.go | 8 +++ db/changes.go | 2 +- db/changes_view.go | 7 +- db/crud.go | 13 ++-- db/database.go | 4 +- db/database_test.go | 84 +++++++++++++++++++++++- db/document.go | 89 +++++++++++++++++++++++-- db/document_test.go | 90 ++++++++++++++++++++++++++ db/hybrid_logical_vector.go | 32 ++++++++- db/import_test.go | 16 ++--- db/query.go | 20 +++--- db/util_testing.go | 26 ++++++++ rest/api_test.go | 6 +- rest/changes_test.go | 2 + rest/changestest/changes_api_test.go | 97 +++++++++------------------- rest/importtest/import_test.go | 38 +++++------ rest/utilities_testing.go | 5 +- 18 files changed, 413 insertions(+), 130 deletions(-) diff --git a/channels/log_entry.go b/channels/log_entry.go index ca73896bef..c0b46eecfc 100644 --- a/channels/log_entry.go +++ b/channels/log_entry.go @@ -60,13 +60,15 @@ type LogEntry struct { func (l LogEntry) String() string { return fmt.Sprintf( - "seq: %d docid: %s revid: %s vbno: %d type: %v collectionID: %d", + "seq: %d docid: %s revid: %s vbno: %d type: %v collectionID: %d source: %s version: %d", l.Sequence, l.DocID, l.RevID, l.VbNo, l.Type, l.CollectionID, + l.SourceID, + l.Version, ) } diff --git a/db/change_cache.go b/db/change_cache.go index eabbdb238a..547df73d84 100644 --- a/db/change_cache.go +++ b/db/change_cache.go @@ -120,6 +120,14 @@ func (entry *LogEntry) SetDeleted() { entry.Flags |= channels.Deleted } +func (entry *LogEntry) SetRevAndVersion(rv RevAndVersion) { + entry.RevID = rv.RevTreeID + if rv.CurrentSource != "" { + entry.SourceID = rv.CurrentSource + entry.Version = base.HexCasToUint64(rv.CurrentVersion) + } +} + type LogEntries []*LogEntry // A priority-queue of LogEntries, kept ordered by increasing sequence #. diff --git a/db/changes.go b/db/changes.go index d3deb60c06..04e651862c 100644 --- a/db/changes.go +++ b/db/changes.go @@ -57,7 +57,7 @@ type ChangeEntry struct { principalDoc bool // Used to indicate _user/_role docs Revoked bool `json:"revoked,omitempty"` collectionID uint32 - CurrentVersion *Version `json:"current_version,omitempty"` // the current version of the change entry + CurrentVersion *Version `json:"-"` // the current version of the change entry. (Not marshalled, pending REST support for cv) } const ( diff --git a/db/changes_view.go b/db/changes_view.go index 2bbe671bfc..dcbd7eab12 100644 --- a/db/changes_view.go +++ b/db/changes_view.go @@ -25,7 +25,7 @@ type channelsViewRow struct { ID string Key []interface{} // Actually [channelName, sequence] Value struct { - Rev string + Rev RevAndVersion Flags uint8 } } @@ -42,13 +42,12 @@ func nextChannelViewEntry(ctx context.Context, results sgbucket.QueryResultItera entry := &LogEntry{ Sequence: uint64(viewRow.Key[1].(float64)), DocID: viewRow.ID, - RevID: viewRow.Value.Rev, Flags: viewRow.Value.Flags, TimeReceived: time.Now(), CollectionID: collectionID, } + entry.SetRevAndVersion(viewRow.Value.Rev) return entry, true - } func nextChannelQueryEntry(ctx context.Context, results sgbucket.QueryResultIterator, collectionID uint32) (*LogEntry, bool) { @@ -61,11 +60,11 @@ func nextChannelQueryEntry(ctx context.Context, results sgbucket.QueryResultIter entry := &LogEntry{ Sequence: queryRow.Sequence, DocID: queryRow.Id, - RevID: queryRow.Rev, Flags: queryRow.Flags, TimeReceived: time.Now(), CollectionID: collectionID, } + entry.SetRevAndVersion(queryRow.Rev) if queryRow.RemovalRev != "" { entry.RevID = queryRow.RemovalRev diff --git a/db/crud.go b/db/crud.go index 9097ceca91..78b22de8ec 100644 --- a/db/crud.go +++ b/db/crud.go @@ -2762,10 +2762,11 @@ func (db *DatabaseCollectionWithUser) CheckProposedRev(ctx context.Context, doci } const ( - xattrMacroCas = "cas" - xattrMacroValueCrc32c = "value_crc32c" - versionVectorVrsMacro = "_vv.vrs" - versionVectorCVCASMacro = "_vv.cvCas" + xattrMacroCas = "cas" // SyncData.Cas + xattrMacroValueCrc32c = "value_crc32c" // SyncData.Crc32c + xattrMacroCurrentRevVersion = "rev.vrs" // SyncDataJSON.RevAndVersion.CurrentVersion + versionVectorVrsMacro = "_vv.vrs" // PersistedHybridLogicalVector.Version + versionVectorCVCASMacro = "_vv.cvCas" // PersistedHybridLogicalVector.CurrentVersionCAS ) func macroExpandSpec(xattrName string) []sgbucket.MacroExpansionSpec { @@ -2785,6 +2786,10 @@ func xattrCrc32cPath(xattrKey string) string { return xattrKey + "." + xattrMacroValueCrc32c } +func xattrCurrentRevVersionPath(xattrKey string) string { + return xattrKey + "." + xattrMacroCurrentRevVersion +} + func xattrCurrentVersionPath(xattrKey string) string { return xattrKey + "." + versionVectorVrsMacro } diff --git a/db/database.go b/db/database.go index 2bd4c28ac7..25bf4c9261 100644 --- a/db/database.go +++ b/db/database.go @@ -970,7 +970,7 @@ func (c *DatabaseCollection) processForEachDocIDResults(ctx context.Context, cal found = results.Next(ctx, &viewRow) if found { docid = viewRow.Key - revid = viewRow.Value.RevID + revid = viewRow.Value.RevID.RevTreeID seq = viewRow.Value.Sequence channels = viewRow.Value.Channels } @@ -978,7 +978,7 @@ func (c *DatabaseCollection) processForEachDocIDResults(ctx context.Context, cal found = results.Next(ctx, &queryRow) if found { docid = queryRow.Id - revid = queryRow.RevID + revid = queryRow.RevID.RevTreeID seq = queryRow.Sequence channels = make([]string, 0) // Query returns all channels, but we only want to return active channels diff --git a/db/database_test.go b/db/database_test.go index 74b596992e..a2f60a6bee 100644 --- a/db/database_test.go +++ b/db/database_test.go @@ -1828,6 +1828,84 @@ func TestChannelView(t *testing.T) { log.Printf("View Query returned entry (%d): %v", i, entry) } assert.Equal(t, 1, len(entries)) + require.Equal(t, "doc1", entries[0].DocID) + collection.RequireCurrentVersion(t, "doc1", entries[0].SourceID, entries[0].Version) +} + +func TestChannelQuery(t *testing.T) { + + db, ctx := setupTestDB(t) + defer db.Close(ctx) + collection := GetSingleDatabaseCollectionWithUser(t, db) + _, err := collection.UpdateSyncFun(ctx, `function(doc, oldDoc) { + channel(doc.channels); + }`) + require.NoError(t, err) + + // Create doc + body := Body{"key1": "value1", "key2": 1234, "channels": "ABC"} + rev1ID, _, err := collection.Put(ctx, "doc1", body) + require.NoError(t, err, "Couldn't create doc1") + + // Create a doc to test removal handling. Needs three revisions so that the removal rev (2) isn't + // the current revision + removedDocID := "removed_doc" + removedDocRev1, _, err := collection.Put(ctx, removedDocID, body) + require.NoError(t, err, "Couldn't create removed_doc") + removalSource, removalVersion := collection.GetDocumentCurrentVersion(t, removedDocID) + + updatedChannelBody := Body{"_rev": removedDocRev1, "key1": "value1", "key2": 1234, "channels": "DEF"} + removalRev, _, err := collection.Put(ctx, removedDocID, updatedChannelBody) + require.NoError(t, err, "Couldn't update removed_doc") + + updatedChannelBody = Body{"_rev": removalRev, "key1": "value1", "key2": 2345, "channels": "DEF"} + removedDocRev3, _, err := collection.Put(ctx, removedDocID, updatedChannelBody) + require.NoError(t, err, "Couldn't update removed_doc") + + var entries LogEntries + + // Test query retrieval via star channel and named channel (queries use different indexes) + testCases := []struct { + testName string + channelName string + }{ + { + testName: "star channel", + channelName: "*", + }, + { + testName: "named channel", + channelName: "ABC", + }, + } + + for _, testCase := range testCases { + t.Run(testCase.testName, func(t *testing.T) { + entries, err = collection.getChangesInChannelFromQuery(ctx, testCase.channelName, 0, 100, 0, false) + require.NoError(t, err) + + for i, entry := range entries { + log.Printf("Channel Query returned entry (%d): %v", i, entry) + } + require.Len(t, entries, 2) + require.Equal(t, "doc1", entries[0].DocID) + require.Equal(t, rev1ID, entries[0].RevID) + collection.RequireCurrentVersion(t, "doc1", entries[0].SourceID, entries[0].Version) + + removedDocEntry := entries[1] + require.Equal(t, removedDocID, removedDocEntry.DocID) + if testCase.channelName == "*" { + require.Equal(t, removedDocRev3, removedDocEntry.RevID) + collection.RequireCurrentVersion(t, removedDocID, removedDocEntry.SourceID, removedDocEntry.Version) + } else { + require.Equal(t, removalRev, removedDocEntry.RevID) + // TODO: Pending channel removal rev handling, CBG-3213 + log.Printf("removal rev check of removal cv %s@%d is pending CBG-3213", removalSource, removalVersion) + //require.Equal(t, removalSource, removedDocEntry.SourceID) + //require.Equal(t, removalVersion, removedDocEntry.Version) + } + }) + } } @@ -2451,7 +2529,7 @@ func TestDeleteWithNoTombstoneCreationSupport(t *testing.T) { assert.NoError(t, err) var doc Body - var xattr Body + var xattr SyncData // Ensure document has been added waitAndAssertCondition(t, func() bool { @@ -2462,8 +2540,8 @@ func TestDeleteWithNoTombstoneCreationSupport(t *testing.T) { assert.Equal(t, int64(1), db.DbStats.SharedBucketImport().ImportCount.Value()) assert.Nil(t, doc) - assert.Equal(t, "1-2cac91faf7b3f5e5fd56ff377bdb5466", xattr["rev"]) - assert.Equal(t, float64(2), xattr["sequence"]) + assert.Equal(t, "1-2cac91faf7b3f5e5fd56ff377bdb5466", xattr.CurrentRev) + assert.Equal(t, uint64(2), xattr.Sequence) } func TestResyncUpdateAllDocChannels(t *testing.T) { diff --git a/db/document.go b/db/document.go index 9ab3d02702..af8448d119 100644 --- a/db/document.go +++ b/db/document.go @@ -65,7 +65,7 @@ type ChannelSetEntry struct { // The sync-gateway metadata stored in the "_sync" property of a Couchbase document. type SyncData struct { - CurrentRev string `json:"rev"` + CurrentRev string `json:"-"` // CurrentRev. Persisted as RevAndVersion in SyncDataJSON NewestRev string `json:"new_rev,omitempty"` // Newest rev, if different from CurrentRev Flags uint8 `json:"flags,omitempty"` Sequence uint64 `json:"sequence,omitempty"` @@ -192,7 +192,7 @@ type historyOnlySyncData struct { type revOnlySyncData struct { casOnlySyncData - CurrentRev string `json:"rev"` + CurrentRev RevAndVersion `json:"rev"` } type casOnlySyncData struct { @@ -1160,7 +1160,7 @@ func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalHistory). Error: %v", base.UD(doc.ID), unmarshalErr)) } doc.SyncData = SyncData{ - CurrentRev: historyOnlyMeta.CurrentRev, + CurrentRev: historyOnlyMeta.CurrentRev.RevTreeID, History: historyOnlyMeta.History, Cas: historyOnlyMeta.Cas, } @@ -1173,7 +1173,7 @@ func (doc *Document) UnmarshalWithXattr(ctx context.Context, data []byte, xdata return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalWithXattr() doc with id: %s (DocUnmarshalRev). Error: %v", base.UD(doc.ID), unmarshalErr)) } doc.SyncData = SyncData{ - CurrentRev: revOnlyMeta.CurrentRev, + CurrentRev: revOnlyMeta.CurrentRev.RevTreeID, Cas: revOnlyMeta.Cas, } doc._rawBody = data @@ -1230,7 +1230,7 @@ func (doc *Document) MarshalWithXattr() (data []byte, xdata []byte, err error) { } } - xdata, err = base.JSONMarshal(doc.SyncData) + xdata, err = base.JSONMarshal(&doc.SyncData) if err != nil { return nil, nil, pkgerrors.WithStack(base.RedactErrorf("Failed to MarshalWithXattr() doc SyncData with id: %s. Error: %v", base.UD(doc.ID), err)) } @@ -1251,3 +1251,82 @@ func (d *Document) HasCurrentVersion(cv Version) error { } return nil } + +// SyncDataAlias is an alias for SyncData that doesn't define custom MarshalJSON/UnmarshalJSON +type SyncDataAlias SyncData + +// SyncDataJSON is the persisted form of SyncData, with RevAndVersion populated at marshal time +type SyncDataJSON struct { + *SyncDataAlias + RevAndVersion RevAndVersion `json:"rev"` +} + +// MarshalJSON populates RevAndVersion using CurrentRev and the HLV (current) source and version. +// Marshals using SyncDataAlias to avoid recursion, and SyncDataJSON to add the combined RevAndVersion. +func (s SyncData) MarshalJSON() (data []byte, err error) { + + var sdj SyncDataJSON + var sd SyncDataAlias + sd = (SyncDataAlias)(s) + sdj.SyncDataAlias = &sd + sdj.RevAndVersion.RevTreeID = s.CurrentRev + if s.HLV != nil { + sdj.RevAndVersion.CurrentSource = s.HLV.SourceID + sdj.RevAndVersion.CurrentVersion = string(base.Uint64CASToLittleEndianHex(s.HLV.Version)) + } + return base.JSONMarshal(sdj) +} + +// UnmarshalJSON unmarshals using SyncDataJSON, then sets currentRev on SyncData based on the value in RevAndVersion. +// The HLV's current version stored in RevAndVersion is ignored at unmarshal time - the value in the HLV is the source +// of truth. +func (s *SyncData) UnmarshalJSON(data []byte) error { + + var sdj *SyncDataJSON + err := base.JSONUnmarshal(data, &sdj) + if err != nil { + return err + } + *s = SyncData(*sdj.SyncDataAlias) + s.CurrentRev = sdj.RevAndVersion.RevTreeID + return nil +} + +// RevAndVersion is used to store both revTreeID and currentVersion in a single property, for backwards compatibility +// with existing indexes using rev. When only RevTreeID is specified, is marshalled/unmarshalled as a string. Otherwise +// marshalled normally. +type RevAndVersion struct { + RevTreeID string `json:"rev,omitempty"` + CurrentSource string `json:"src,omitempty"` + CurrentVersion string `json:"vrs,omitempty"` // String representation of version +} + +// RevAndVersionJSON aliases RevAndVersion to support conditional unmarshalling from either string (revTreeID) or +// map (RevAndVersion) representations +type RevAndVersionJSON RevAndVersion + +// Marshals RevAndVersion as simple string when only RevTreeID is specified - otherwise performs standard +// marshalling +func (rv RevAndVersion) MarshalJSON() (data []byte, err error) { + + if rv.CurrentSource == "" { + return base.JSONMarshal(rv.RevTreeID) + } + return base.JSONMarshal(RevAndVersionJSON(rv)) +} + +// Unmarshals either from string (legacy, revID only) or standard RevAndVersion unmarshalling. +func (rv *RevAndVersion) UnmarshalJSON(data []byte) error { + + if len(data) == 0 { + return nil + } + switch data[0] { + case '"': + return base.JSONUnmarshal(data, &rv.RevTreeID) + case '{': + return base.JSONUnmarshal(data, (*RevAndVersionJSON)(rv)) + default: + return fmt.Errorf("unrecognized JSON format for RevAndVersion: %s", data) + } +} diff --git a/db/document_test.go b/db/document_test.go index 6301e99ec3..f28b750397 100644 --- a/db/document_test.go +++ b/db/document_test.go @@ -291,6 +291,96 @@ func TestParseVersionVectorSyncData(t *testing.T) { assert.True(t, reflect.DeepEqual(pv, doc.SyncData.HLV.PreviousVersions)) } +// TestRevAndVersion tests marshalling and unmarshalling rev and current version +func TestRevAndVersion(t *testing.T) { + + ctx := base.TestCtx(t) + testCases := []struct { + testName string + revTreeID string + source string + version uint64 + }{ + { + testName: "rev_and_version", + revTreeID: "1-abc", + source: "source1", + version: 1, + }, + { + testName: "both_empty", + revTreeID: "", + source: "", + version: 0, + }, + { + testName: "revTreeID_only", + revTreeID: "1-abc", + source: "", + version: 0, + }, + { + testName: "currentVersion_only", + revTreeID: "", + source: "source1", + version: 1, + }, + } + + var expectedSequence = uint64(100) + for _, test := range testCases { + t.Run(test.testName, func(t *testing.T) { + syncData := &SyncData{ + CurrentRev: test.revTreeID, + Sequence: expectedSequence, + } + if test.source != "" { + syncData.HLV = &HybridLogicalVector{ + SourceID: test.source, + Version: test.version, + } + } + // SyncData test + marshalledSyncData, err := base.JSONMarshal(syncData) + require.NoError(t, err) + log.Printf("marshalled:%s", marshalledSyncData) + + var newSyncData SyncData + err = base.JSONUnmarshal(marshalledSyncData, &newSyncData) + require.NoError(t, err) + require.Equal(t, test.revTreeID, newSyncData.CurrentRev) + require.Equal(t, expectedSequence, newSyncData.Sequence) + if test.source != "" { + require.NotNil(t, newSyncData.HLV) + require.Equal(t, test.source, newSyncData.HLV.SourceID) + require.Equal(t, test.version, newSyncData.HLV.Version) + } + + // Document test + document := NewDocument("docID") + document.SyncData.CurrentRev = test.revTreeID + document.SyncData.HLV = &HybridLogicalVector{ + SourceID: test.source, + Version: test.version, + } + marshalledDoc, marshalledXattr, err := document.MarshalWithXattr() + require.NoError(t, err) + + newDocument := NewDocument("docID") + err = newDocument.UnmarshalWithXattr(ctx, marshalledDoc, marshalledXattr, DocUnmarshalAll) + require.NoError(t, err) + require.Equal(t, test.revTreeID, newDocument.CurrentRev) + require.Equal(t, expectedSequence, newSyncData.Sequence) + if test.source != "" { + require.NotNil(t, newDocument.HLV) + require.Equal(t, test.source, newDocument.HLV.SourceID) + require.Equal(t, test.version, newDocument.HLV.Version) + } + //require.Equal(t, test.expectedCombinedVersion, newDocument.RevAndVersion) + }) + } +} + func TestParseXattr(t *testing.T) { zeroByte := byte(0) // Build payload for single xattr pair and body diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go index f3f1c2facc..5a44611ce8 100644 --- a/db/hybrid_logical_vector.go +++ b/db/hybrid_logical_vector.go @@ -12,6 +12,7 @@ import ( "encoding/base64" "fmt" "math" + "strings" sgbucket "github.com/couchbase/sg-bucket" "github.com/couchbase/sync_gateway/base" @@ -45,6 +46,20 @@ func CreateVersion(source string, version uint64) Version { } } +func CreateVersionFromString(versionString string) (version Version, err error) { + timestampString, sourceBase64, found := strings.Cut(versionString, "@") + if !found { + return version, fmt.Errorf("Malformed version string %s, delimiter not found", versionString) + } + sourceBytes, err := base64.StdEncoding.DecodeString(sourceBase64) + if err != nil { + return version, fmt.Errorf("Unable to decode sourceID for version %s: %w", versionString, err) + } + version.SourceID = string(sourceBytes) + version.Value = base.HexCasToUint64(timestampString) + return version, nil +} + // String returns a Couchbase Lite-compatible string representation of the version. func (v Version) String() string { timestamp := string(base.Uint64CASToLittleEndianHex(v.Value)) @@ -76,7 +91,19 @@ func (hlv *HybridLogicalVector) GetCurrentVersion() (string, uint64) { return hlv.SourceID, hlv.Version } -// IsInConflict tests to see if in memory HLV is conflicting with another HLV. +// GetCurrentVersion returns the current version in transport format +func (hlv *HybridLogicalVector) GetCurrentVersionString() string { + if hlv == nil || hlv.SourceID == "" { + return "" + } + version := Version{ + SourceID: hlv.SourceID, + Value: hlv.Version, + } + return version.String() +} + +// IsInConflict tests to see if in memory HLV is conflicting with another HLV func (hlv *HybridLogicalVector) IsInConflict(otherVector HybridLogicalVector) bool { // test if either HLV(A) or HLV(B) are dominating over each other. If so they are not in conflict if hlv.isDominating(otherVector) || otherVector.isDominating(*hlv) { @@ -354,6 +381,9 @@ func (hlv *HybridLogicalVector) computeMacroExpansions() []sgbucket.MacroExpansi if hlv.Version == hlvExpandMacroCASValue { spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionPath(base.SyncXattrName), sgbucket.MacroCas) outputSpec = append(outputSpec, spec) + // If version is being expanded, we need to also specify the macro expansion for the expanded rev property + currentRevSpec := sgbucket.NewMacroExpansionSpec(xattrCurrentRevVersionPath(base.SyncXattrName), sgbucket.MacroCas) + outputSpec = append(outputSpec, currentRevSpec) } if hlv.CurrentVersionCAS == hlvExpandMacroCASValue { spec := sgbucket.NewMacroExpansionSpec(xattrCurrentVersionCASPath(base.SyncXattrName), sgbucket.MacroCas) diff --git a/db/import_test.go b/db/import_test.go index 1d514d33b5..e1e8967fd3 100644 --- a/db/import_test.go +++ b/db/import_test.go @@ -424,14 +424,14 @@ func TestImportNullDocRaw(t *testing.T) { } func assertXattrSyncMetaRevGeneration(t *testing.T, dataStore base.DataStore, key string, expectedRevGeneration int) { - xattr := map[string]interface{}{} - _, err := dataStore.GetWithXattr(base.TestCtx(t), key, base.SyncXattrName, "", nil, &xattr, nil) - assert.NoError(t, err, "Error Getting Xattr") - revision, ok := xattr["rev"] - assert.True(t, ok) - generation, _ := ParseRevID(base.TestCtx(t), revision.(string)) - log.Printf("assertXattrSyncMetaRevGeneration generation: %d rev: %s", generation, revision) - assert.True(t, generation == expectedRevGeneration) + + var syncData SyncData + _, err := dataStore.GetWithXattr(base.TestCtx(t), key, base.SyncXattrName, "", nil, &syncData, nil) + require.NoError(t, err, "Error Getting Xattr") + require.True(t, syncData.CurrentRev != "") + generation, _ := ParseRevID(base.TestCtx(t), syncData.CurrentRev) + log.Printf("assertXattrSyncMetaRevGeneration generation: %d rev: %s", generation, syncData.CurrentRev) + assert.Equal(t, generation, expectedRevGeneration) } func TestEvaluateFunction(t *testing.T) { diff --git a/db/query.go b/db/query.go index 28c9685cd2..1472c51e68 100644 --- a/db/query.go +++ b/db/query.go @@ -154,12 +154,12 @@ var QuerySequences = SGQuery{ } type QueryChannelsRow struct { - Id string `json:"id,omitempty"` - Rev string `json:"rev,omitempty"` - Sequence uint64 `json:"seq,omitempty"` - Flags uint8 `json:"flags,omitempty"` - RemovalRev string `json:"rRev,omitempty"` - RemovalDel bool `json:"rDel,omitempty"` + Id string `json:"id,omitempty"` + Rev RevAndVersion `json:"rev,omitempty"` + Sequence uint64 `json:"seq,omitempty"` + Flags uint8 `json:"flags,omitempty"` + RemovalRev string `json:"rRev,omitempty"` + RemovalDel bool `json:"rDel,omitempty"` } var QueryPrincipals = SGQuery{ @@ -688,15 +688,15 @@ func (context *DatabaseContext) QueryAllRoles(ctx context.Context, startKey stri type AllDocsViewQueryRow struct { Key string Value struct { - RevID string `json:"r"` - Sequence uint64 `json:"s"` - Channels []string `json:"c"` + RevID RevAndVersion `json:"r"` + Sequence uint64 `json:"s"` + Channels []string `json:"c"` } } type AllDocsIndexQueryRow struct { Id string - RevID string `json:"r"` + RevID RevAndVersion `json:"r"` Sequence uint64 `json:"s"` Channels channels.ChannelMap `json:"c"` } diff --git a/db/util_testing.go b/db/util_testing.go index 1d6462ef3b..4cfa00d722 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -677,3 +677,29 @@ func createTestDocument(docID string, revID string, body Body, deleted bool, exp } return newDoc } + +// requireCurrentVersion fetches the document by key, and validates that cv matches. +func (c *DatabaseCollection) RequireCurrentVersion(t *testing.T, key string, source string, version uint64) { + ctx := base.TestCtx(t) + doc, err := c.GetDocument(ctx, key, DocUnmarshalSync) + require.NoError(t, err) + if doc.HLV == nil { + require.Equal(t, "", source) + require.Equal(t, "", version) + return + } + + require.Equal(t, doc.HLV.SourceID, source) + require.Equal(t, doc.HLV.Version, version) +} + +// GetDocumentCurrentVersion fetches the document by key and returns the current version +func (c *DatabaseCollection) GetDocumentCurrentVersion(t *testing.T, key string) (source string, version uint64) { + ctx := base.TestCtx(t) + doc, err := c.GetDocument(ctx, key, DocUnmarshalSync) + require.NoError(t, err) + if doc.HLV == nil { + return "", 0 + } + return doc.HLV.SourceID, doc.HLV.Version +} diff --git a/rest/api_test.go b/rest/api_test.go index 5f7cf18d30..2f2234f004 100644 --- a/rest/api_test.go +++ b/rest/api_test.go @@ -1652,10 +1652,10 @@ func TestWriteTombstonedDocUsingXattrs(t *testing.T) { // Fetch the xattr and make sure it contains the above value var retrievedVal map[string]interface{} - var retrievedXattr map[string]interface{} - _, err = rt.GetSingleDataStore().GetWithXattr(rt.Context(), "-21SK00U-ujxUO9fU2HezxL", base.SyncXattrName, "", &retrievedVal, &retrievedXattr, nil) + var retrievedSyncData db.SyncData + _, err = rt.GetSingleDataStore().GetWithXattr(rt.Context(), "-21SK00U-ujxUO9fU2HezxL", base.SyncXattrName, "", &retrievedVal, &retrievedSyncData, nil) assert.NoError(t, err, "Unexpected Error") - assert.Equal(t, "2-466a1fab90a810dc0a63565b70680e4e", retrievedXattr["rev"]) + assert.Equal(t, "2-466a1fab90a810dc0a63565b70680e4e", retrievedSyncData.CurrentRev) } diff --git a/rest/changes_test.go b/rest/changes_test.go index 91f53149d4..411509ebcd 100644 --- a/rest/changes_test.go +++ b/rest/changes_test.go @@ -275,6 +275,7 @@ func TestWebhookWinningRevChangedEvent(t *testing.T) { } func TestCVPopulationOnChangesViaAPI(t *testing.T) { + t.Skip("Disabled until REST support for version is added") rtConfig := RestTesterConfig{ SyncFn: `function(doc) {channel(doc.channels)}`, } @@ -306,6 +307,7 @@ func TestCVPopulationOnChangesViaAPI(t *testing.T) { } func TestCVPopulationOnDocIDChanges(t *testing.T) { + t.Skip("Disabled until REST support for version is added") rtConfig := RestTesterConfig{ SyncFn: `function(doc) {channel(doc.channels)}`, } diff --git a/rest/changestest/changes_api_test.go b/rest/changestest/changes_api_test.go index 3172ba71a5..cd87ffaedd 100644 --- a/rest/changestest/changes_api_test.go +++ b/rest/changestest/changes_api_test.go @@ -9,7 +9,6 @@ package changestest import ( - "encoding/json" "errors" "fmt" "log" @@ -901,9 +900,7 @@ func TestChangesFromCompoundSinceViaDocGrant(t *testing.T) { changes, err := rt.WaitForChanges(len(expectedResults), "/{{.keyspace}}/_changes", "bernard", false) require.NoError(t, err, "Error retrieving changes results") for index, result := range changes.Results { - var expectedChange db.ChangeEntry - require.NoError(t, base.JSONUnmarshal([]byte(expectedResults[index]), &expectedChange)) - assert.Equal(t, expectedChange, result) + assertChangeEntryMatches(t, expectedResults[index], result) } // create doc that dynamically grants both users access to PBS and HBO @@ -921,9 +918,7 @@ func TestChangesFromCompoundSinceViaDocGrant(t *testing.T) { fmt.Sprintf("/{{.keyspace}}/_changes?since=%s", changes.Last_Seq), "bernard", false) require.NoError(t, err, "Error retrieving changes results") for index, result := range changes.Results { - var expectedChange db.ChangeEntry - require.NoError(t, base.JSONUnmarshal([]byte(expectedResults[index]), &expectedChange)) - assert.Equal(t, expectedChange, result) + assertChangeEntryMatches(t, expectedResults[index], result) } // Write another doc @@ -947,9 +942,7 @@ func TestChangesFromCompoundSinceViaDocGrant(t *testing.T) { changes, err = rt.WaitForChanges(len(expectedResults), "/{{.keyspace}}/_changes?since=8:1", "alice", false) require.NoError(t, err, "Error retrieving changes results for alice") for index, result := range changes.Results { - var expectedChange db.ChangeEntry - require.NoError(t, base.JSONUnmarshal([]byte(expectedResults[index]), &expectedChange)) - assert.Equal(t, expectedChange, result) + assertChangeEntryMatches(t, expectedResults[index], result) } }) @@ -957,13 +950,33 @@ func TestChangesFromCompoundSinceViaDocGrant(t *testing.T) { changes, err = rt.WaitForChanges(len(expectedResults), "/{{.keyspace}}/_changes?since=8:1", "bernard", false) require.NoError(t, err, "Error retrieving changes results for bernard") for index, result := range changes.Results { - var expectedChange db.ChangeEntry - require.NoError(t, base.JSONUnmarshal([]byte(expectedResults[index]), &expectedChange)) - assert.Equal(t, expectedChange, result) + assertChangeEntryMatches(t, expectedResults[index], result) } }) } +// TODO: enhance to compare source/version when expectedChanges are updated to include +func assertChangeEntryMatches(t *testing.T, expectedChangeEntryString string, result db.ChangeEntry) { + var expectedChange db.ChangeEntry + require.NoError(t, base.JSONUnmarshal([]byte(expectedChangeEntryString), &expectedChange)) + assert.Equal(t, expectedChange.Seq, result.Seq) + assert.Equal(t, expectedChange.ID, result.ID) + assert.Equal(t, expectedChange.Changes, result.Changes) + assert.Equal(t, expectedChange.Deleted, result.Deleted) + assert.Equal(t, expectedChange.Removed, result.Removed) + + if expectedChange.Doc != nil { + // result.Doc is json.RawMessage, and properties may not be in the same order for a direct comparison + var expectedBody db.Body + var resultBody db.Body + assert.NoError(t, expectedBody.Unmarshal(expectedChange.Doc)) + assert.NoError(t, resultBody.Unmarshal(result.Doc)) + db.AssertEqualBodies(t, expectedBody, resultBody) + } else { + assert.Equal(t, expectedChange.Doc, result.Doc) + } +} + // Ensures that changes feed goroutines blocked on a ChangeWaiter are closed when the changes feed is terminated. // Reproduces CBG-1113 and #1329 (even with the fix in PR #1360) // Tests all combinations of HTTP feed types, admin/non-admin, and with and without a manual notify to wake up. @@ -2115,26 +2128,7 @@ func TestChangesIncludeDocs(t *testing.T) { assert.Equal(t, len(expectedResults), len(changes.Results)) for index, result := range changes.Results { - var expectedChange db.ChangeEntry - assert.NoError(t, base.JSONUnmarshal([]byte(expectedResults[index]), &expectedChange)) - - assert.Equal(t, expectedChange.ID, result.ID) - assert.Equal(t, expectedChange.Seq, result.Seq) - assert.Equal(t, expectedChange.Deleted, result.Deleted) - assert.Equal(t, expectedChange.Changes, result.Changes) - assert.Equal(t, expectedChange.Err, result.Err) - assert.Equal(t, expectedChange.Removed, result.Removed) - - if expectedChange.Doc != nil { - // result.Doc is json.RawMessage, and properties may not be in the same order for a direct comparison - var expectedBody db.Body - var resultBody db.Body - assert.NoError(t, expectedBody.Unmarshal(expectedChange.Doc)) - assert.NoError(t, resultBody.Unmarshal(result.Doc)) - db.AssertEqualBodies(t, expectedBody, resultBody) - } else { - assert.Equal(t, expectedChange.Doc, result.Doc) - } + assertChangeEntryMatches(t, expectedResults[index], result) } // Flush the rev cache, and issue changes again to ensure successful handling for rev cache misses @@ -2150,16 +2144,10 @@ func TestChangesIncludeDocs(t *testing.T) { assert.Equal(t, len(expectedResults), len(postFlushChanges.Results)) for index, result := range postFlushChanges.Results { + + assertChangeEntryMatches(t, expectedResults[index], result) var expectedChange db.ChangeEntry assert.NoError(t, base.JSONUnmarshal([]byte(expectedResults[index]), &expectedChange)) - - assert.Equal(t, expectedChange.ID, result.ID) - assert.Equal(t, expectedChange.Seq, result.Seq) - assert.Equal(t, expectedChange.Deleted, result.Deleted) - assert.Equal(t, expectedChange.Changes, result.Changes) - assert.Equal(t, expectedChange.Err, result.Err) - assert.Equal(t, expectedChange.Removed, result.Removed) - if expectedChange.Doc != nil { // result.Doc is json.RawMessage, and properties may not be in the same order for a direct comparison var expectedBody db.Body @@ -2186,14 +2174,12 @@ func TestChangesIncludeDocs(t *testing.T) { expectedStyleAllDocs[9] = `{"seq":26,"id":"doc_resolved_conflict","changes":[{"rev":"2-251ba04e5889887152df5e7a350745b4"},{"rev":"3-f25ad98ef169791adec6c1d385717b84"}]}` styleAllDocsChangesResponse := rt.SendUserRequest("GET", "/{{.keyspace}}/_changes?style=all_docs", "", "user1") - var allDocsChanges struct { - Results []*json.RawMessage - } + var allDocsChanges rest.ChangesResults err = base.JSONUnmarshal(styleAllDocsChangesResponse.Body.Bytes(), &allDocsChanges) assert.NoError(t, err, "Error unmarshalling changes response") assert.Equal(t, len(expectedStyleAllDocs), len(allDocsChanges.Results)) for index, result := range allDocsChanges.Results { - assert.Equal(t, expectedStyleAllDocs[index], fmt.Sprintf("%s", *result)) + assertChangeEntryMatches(t, expectedStyleAllDocs[index], result) } // Validate style=all_docs, include_docs=true permutations. Only modified doc from include_docs test is doc_conflict (adds open revisions) @@ -2206,26 +2192,7 @@ func TestChangesIncludeDocs(t *testing.T) { assert.Equal(t, len(expectedResults), len(combinedChanges.Results)) for index, result := range combinedChanges.Results { - var expectedChange db.ChangeEntry - assert.NoError(t, base.JSONUnmarshal([]byte(expectedResults[index]), &expectedChange)) - - assert.Equal(t, expectedChange.ID, result.ID) - assert.Equal(t, expectedChange.Seq, result.Seq) - assert.Equal(t, expectedChange.Deleted, result.Deleted) - assert.Equal(t, expectedChange.Changes, result.Changes) - assert.Equal(t, expectedChange.Err, result.Err) - assert.Equal(t, expectedChange.Removed, result.Removed) - - if expectedChange.Doc != nil { - // result.Doc is json.RawMessage, and properties may not be in the same order for a direct comparison - var expectedBody db.Body - var resultBody db.Body - assert.NoError(t, expectedBody.Unmarshal(expectedChange.Doc)) - assert.NoError(t, resultBody.Unmarshal(result.Doc)) - db.AssertEqualBodies(t, expectedBody, resultBody) - } else { - assert.Equal(t, expectedChange.Doc, result.Doc) - } + assertChangeEntryMatches(t, expectedResults[index], result) } } diff --git a/rest/importtest/import_test.go b/rest/importtest/import_test.go index 6c30ff714a..c087949e51 100644 --- a/rest/importtest/import_test.go +++ b/rest/importtest/import_test.go @@ -1612,7 +1612,7 @@ func TestImportRevisionCopy(t *testing.T) { var rawInsertResponse rest.RawResponse err = base.JSONUnmarshal(response.Body.Bytes(), &rawInsertResponse) assert.NoError(t, err, "Unable to unmarshal raw response") - rev1id := rawInsertResponse.Sync.Rev + rev1id := rawInsertResponse.Sync.Rev.RevTreeID // 3. Update via SDK updatedBody := make(map[string]interface{}) @@ -1673,7 +1673,7 @@ func TestImportRevisionCopyUnavailable(t *testing.T) { var rawInsertResponse rest.RawResponse err = base.JSONUnmarshal(response.Body.Bytes(), &rawInsertResponse) assert.NoError(t, err, "Unable to unmarshal raw response") - rev1id := rawInsertResponse.Sync.Rev + rev1id := rawInsertResponse.Sync.Rev.RevTreeID // 3. Flush the rev cache (simulates attempted retrieval by a different SG node, since testing framework isn't great // at simulating multiple SG instances) @@ -1903,7 +1903,7 @@ func TestImportRevisionCopyDisabled(t *testing.T) { var rawInsertResponse rest.RawResponse err = base.JSONUnmarshal(response.Body.Bytes(), &rawInsertResponse) assert.NoError(t, err, "Unable to unmarshal raw response") - rev1id := rawInsertResponse.Sync.Rev + rev1id := rawInsertResponse.Sync.Rev.RevTreeID // 3. Update via SDK updatedBody := make(map[string]interface{}) @@ -2078,13 +2078,12 @@ func rawDocWithSyncMeta() string { } func assertXattrSyncMetaRevGeneration(t *testing.T, dataStore base.DataStore, key string, expectedRevGeneration int) { - xattr := map[string]interface{}{} - _, err := dataStore.GetWithXattr(base.TestCtx(t), key, base.SyncXattrName, "", nil, &xattr, nil) + var syncData db.SyncData + _, err := dataStore.GetWithXattr(base.TestCtx(t), key, base.SyncXattrName, "", nil, &syncData, nil) assert.NoError(t, err, "Error Getting Xattr") - revision, ok := xattr["rev"] - assert.True(t, ok) - generation, _ := db.ParseRevID(base.TestCtx(t), revision.(string)) - log.Printf("assertXattrSyncMetaRevGeneration generation: %d rev: %s", generation, revision) + assert.True(t, syncData.CurrentRev != "") + generation, _ := db.ParseRevID(base.TestCtx(t), syncData.CurrentRev) + log.Printf("assertXattrSyncMetaRevGeneration generation: %d rev: %s", generation, syncData.CurrentRev) assert.True(t, generation == expectedRevGeneration) } @@ -2109,13 +2108,12 @@ func TestDeletedEmptyDocumentImport(t *testing.T) { // Get the doc and check deleted revision is getting imported response = rt.SendAdminRequest(http.MethodGet, "/{{.keyspace}}/_raw/"+docId, "") assert.Equal(t, http.StatusOK, response.Code) - rawResponse := make(map[string]interface{}) + var rawResponse rest.RawResponse err = base.JSONUnmarshal(response.Body.Bytes(), &rawResponse) require.NoError(t, err, "Unable to unmarshal raw response") - assert.True(t, rawResponse[db.BodyDeleted].(bool)) - syncMeta := rawResponse["_sync"].(map[string]interface{}) - assert.Equal(t, "2-5d3308aae9930225ed7f6614cf115366", syncMeta["rev"]) + assert.True(t, rawResponse.Deleted) + assert.Equal(t, "2-5d3308aae9930225ed7f6614cf115366", rawResponse.Sync.Rev.RevTreeID) } // Check deleted document via SDK is getting imported if it is included in through ImportFilter function. @@ -2149,10 +2147,9 @@ func TestDeletedDocumentImportWithImportFilter(t *testing.T) { endpoint := fmt.Sprintf("/{{.keyspace}}/_raw/%s?redact=false", key) response := rt.SendAdminRequest(http.MethodGet, endpoint, "") assert.Equal(t, http.StatusOK, response.Code) - var respBody db.Body + var respBody rest.RawResponse require.NoError(t, base.JSONUnmarshal(response.Body.Bytes(), &respBody)) - syncMeta := respBody[base.SyncPropertyName].(map[string]interface{}) - assert.NotEmpty(t, syncMeta["rev"].(string)) + assert.NotEmpty(t, respBody.Sync.Rev.RevTreeID) // Delete the document via SDK err = dataStore.Delete(key) @@ -2162,9 +2159,8 @@ func TestDeletedDocumentImportWithImportFilter(t *testing.T) { response = rt.SendAdminRequest(http.MethodGet, endpoint, "") assert.Equal(t, http.StatusOK, response.Code) require.NoError(t, base.JSONUnmarshal(response.Body.Bytes(), &respBody)) - assert.True(t, respBody[db.BodyDeleted].(bool)) - syncMeta = respBody[base.SyncPropertyName].(map[string]interface{}) - assert.NotEmpty(t, syncMeta["rev"].(string)) + assert.True(t, respBody.Deleted) + assert.NotEmpty(t, respBody.Sync.Rev.RevTreeID) } // CBG-1995: Test the support for using an underscore prefix in the top-level body of a document @@ -2333,7 +2329,7 @@ func TestImportTouch(t *testing.T) { var rawInsertResponse rest.RawResponse err = base.JSONUnmarshal(response.Body.Bytes(), &rawInsertResponse) require.NoError(t, err, "Unable to unmarshal raw response") - initialRev := rawInsertResponse.Sync.Rev + initialRev := rawInsertResponse.Sync.Rev.RevTreeID // 2. Test import behaviour after SDK touch _, err = dataStore.Touch(key, 1000000) @@ -2345,7 +2341,7 @@ func TestImportTouch(t *testing.T) { var rawUpdateResponse rest.RawResponse err = base.JSONUnmarshal(response.Body.Bytes(), &rawUpdateResponse) require.NoError(t, err, "Unable to unmarshal raw response") - require.Equal(t, initialRev, rawUpdateResponse.Sync.Rev) + require.Equal(t, initialRev, rawUpdateResponse.Sync.Rev.RevTreeID) } func TestImportingPurgedDocument(t *testing.T) { if !base.TestUseXattrs() { diff --git a/rest/utilities_testing.go b/rest/utilities_testing.go index e49349709c..07274b4d82 100644 --- a/rest/utilities_testing.go +++ b/rest/utilities_testing.go @@ -1036,12 +1036,13 @@ func (rt *RestTester) SetAdminChannels(username string, keyspace string, channel type SimpleSync struct { Channels map[string]interface{} - Rev string + Rev db.RevAndVersion Sequence uint64 } type RawResponse struct { - Sync SimpleSync `json:"_sync"` + Sync SimpleSync `json:"_sync"` + Deleted bool `json:"_deleted"` } // GetDocumentSequence looks up the sequence for a document using the _raw endpoint.