Skip to content

Commit

Permalink
CBG-3212: add api to fetch a document by its CV value (#6579)
Browse files Browse the repository at this point in the history
* CBG-3212: add api to fetch a document by its CV value

* test fix

* rebased SourceAndVersion -> Version rename

* Update currentRevChannels on CV revcache load and doc.updateChannels

* fix spelling

* Remove currentRevChannels

* Move common GetRev/GetCV work into documentRevisionForRequest function

* Pass revision.RevID into authorizeUserForChannels

* Update db/crud.go

Co-authored-by: Tor Colvin <tor.colvin@couchbase.com>

---------

Co-authored-by: Ben Brooks <ben.brooks@couchbase.com>
Co-authored-by: Tor Colvin <tor.colvin@couchbase.com>
  • Loading branch information
3 people authored Jan 18, 2024
1 parent f185349 commit 07f4320
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 37 deletions.
52 changes: 41 additions & 11 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,14 +322,29 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
// No rev ID given, so load active revision
revision, err = db.revisionCache.GetActive(ctx, docid, includeBody)
}

if err != nil {
return DocumentRevision{}, err
}

return db.documentRevisionForRequest(ctx, docid, revision, &revid, nil, maxHistory, historyFrom)
}

// documentRevisionForRequest processes the given DocumentRevision and returns a version of it for a given client request, depending on access, deleted, etc.
func (db *DatabaseCollectionWithUser) documentRevisionForRequest(ctx context.Context, docID string, revision DocumentRevision, revID *string, cv *Version, maxHistory int, historyFrom []string) (DocumentRevision, error) {
// ensure only one of cv or revID is specified
if cv != nil && revID != nil {
return DocumentRevision{}, fmt.Errorf("must have one of cv or revID in documentRevisionForRequest (had cv=%v revID=%v)", cv, revID)
}
var requestedVersion string
if revID != nil {
requestedVersion = *revID
} else if cv != nil {
requestedVersion = cv.String()
}

if revision.BodyBytes == nil {
if db.ForceAPIForbiddenErrors() {
base.InfofCtx(ctx, base.KeyCRUD, "Doc: %s %s is missing", base.UD(docid), base.MD(revid))
base.InfofCtx(ctx, base.KeyCRUD, "Doc: %s %s is missing", base.UD(docID), base.MD(requestedVersion))
return DocumentRevision{}, ErrForbidden
}
return DocumentRevision{}, ErrMissing
Expand All @@ -348,16 +363,17 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
_, requestedHistory = trimEncodedRevisionsToAncestor(ctx, requestedHistory, historyFrom, maxHistory)
}

isAuthorized, redactedRev := db.authorizeUserForChannels(docid, revision.RevID, revision.Channels, revision.Deleted, requestedHistory)
isAuthorized, redactedRevision := db.authorizeUserForChannels(docID, revision.RevID, cv, revision.Channels, revision.Deleted, requestedHistory)
if !isAuthorized {
if revid == "" {
// client just wanted active revision, not a specific one
if requestedVersion == "" {
return DocumentRevision{}, ErrForbidden
}
if db.ForceAPIForbiddenErrors() {
base.InfofCtx(ctx, base.KeyCRUD, "Not authorized to view doc: %s %s", base.UD(docid), base.MD(revid))
base.InfofCtx(ctx, base.KeyCRUD, "Not authorized to view doc: %s %s", base.UD(docID), base.MD(requestedVersion))
return DocumentRevision{}, ErrForbidden
}
return redactedRev, nil
return redactedRevision, nil
}

// If the revision is a removal cache entry (no body), but the user has access to that removal, then just
Expand All @@ -366,13 +382,26 @@ func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid s
return DocumentRevision{}, ErrMissing
}

if revision.Deleted && revid == "" {
if revision.Deleted && requestedVersion == "" {
return DocumentRevision{}, ErrDeleted
}

return revision, nil
}

func (db *DatabaseCollectionWithUser) GetCV(ctx context.Context, docid string, cv *Version, includeBody bool) (revision DocumentRevision, err error) {
if cv != nil {
revision, err = db.revisionCache.GetWithCV(ctx, docid, cv, includeBody, RevCacheOmitDelta)
} else {
revision, err = db.revisionCache.GetActive(ctx, docid, includeBody)
}
if err != nil {
return DocumentRevision{}, err
}

return db.documentRevisionForRequest(ctx, docid, revision, nil, cv, 0, nil)
}

// GetDelta attempts to return the delta between fromRevId and toRevId. If the delta can't be generated,
// returns nil.
func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromRevID, toRevID string) (delta *RevisionDelta, redactedRev *DocumentRevision, err error) {
Expand Down Expand Up @@ -404,7 +433,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
if fromRevision.Delta != nil {
if fromRevision.Delta.ToRevID == toRevID {

isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory))
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, nil, fromRevision.Delta.ToChannels, fromRevision.Delta.ToDeleted, encodeRevisions(ctx, docID, fromRevision.Delta.RevisionHistory))
if !isAuthorized {
return nil, &redactedBody, nil
}
Expand All @@ -427,7 +456,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
}

deleted := toRevision.Deleted
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, toRevision.Channels, deleted, toRevision.History)
isAuthorized, redactedBody := db.authorizeUserForChannels(docID, toRevID, nil, toRevision.Channels, deleted, toRevision.History)
if !isAuthorized {
return nil, &redactedBody, nil
}
Expand Down Expand Up @@ -486,7 +515,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
return nil, nil, nil
}

func (col *DatabaseCollectionWithUser) authorizeUserForChannels(docID, revID string, channels base.Set, isDeleted bool, history Revisions) (isAuthorized bool, redactedRev DocumentRevision) {
func (col *DatabaseCollectionWithUser) authorizeUserForChannels(docID, revID string, cv *Version, channels base.Set, isDeleted bool, history Revisions) (isAuthorized bool, redactedRev DocumentRevision) {

if col.user != nil {
if err := col.user.AuthorizeAnyCollectionChannel(col.ScopeName, col.Name, channels); err != nil {
Expand All @@ -498,6 +527,7 @@ func (col *DatabaseCollectionWithUser) authorizeUserForChannels(docID, revID str
RevID: revID,
History: history,
Deleted: isDeleted,
CV: cv,
}
if isDeleted {
// Deletions are denoted by the deleted message property during 2.x replication
Expand Down Expand Up @@ -1064,7 +1094,7 @@ func (db *DatabaseCollectionWithUser) PutExistingCurrentVersion(ctx context.Cont
if existingDoc != nil {
doc, unmarshalErr := unmarshalDocumentWithXattr(ctx, newDoc.ID, existingDoc.Body, existingDoc.Xattr, existingDoc.UserXattr, existingDoc.Cas, DocUnmarshalRev)
if unmarshalErr != nil {
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling exsiting doc")
return nil, nil, "", base.HTTPErrorf(http.StatusBadRequest, "Error unmarshaling existing doc")
}
matchRev = doc.CurrentRev
}
Expand Down
179 changes: 179 additions & 0 deletions db/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

sgbucket "github.com/couchbase/sg-bucket"
"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/channels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -1867,3 +1868,181 @@ func TestPutExistingCurrentVersionWithNoExistingDoc(t *testing.T) {
assert.True(t, reflect.DeepEqual(syncData.HLV.PreviousVersions, pv))
assert.Equal(t, "1-3a208ea66e84121b528f05b5457d1134", syncData.CurrentRev)
}

// TestGetCVWithDocResidentInCache:
// - Two test cases, one with doc a user will have access to, one without
// - Purpose is to have a doc that is resident in rev cache and use the GetCV function to retrieve these docs
// - Assert that the doc the user has access to is corrected fetched
// - Assert the doc the user doesn't have access to is fetched but correctly redacted
func TestGetCVWithDocResidentInCache(t *testing.T) {
const docID = "doc1"

testCases := []struct {
name string
docChannels []string
access bool
}{
{
name: "getCVWithUserAccess",
docChannels: []string{"A"},
access: true,
},
{
name: "getCVWithoutUserAccess",
docChannels: []string{"B"},
access: false,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
db, ctx := setupTestDB(t)
defer db.Close(ctx)
collection := GetSingleDatabaseCollectionWithUser(t, db)
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)

// Create a user with access to channel A
authenticator := db.Authenticator(base.TestCtx(t))
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
require.NoError(t, err)
require.NoError(t, authenticator.Save(user))
collection.user, err = authenticator.GetUser("alice")
require.NoError(t, err)

// create doc with the channels for the test case
docBody := Body{"channels": testCase.docChannels}
rev, doc, err := collection.Put(ctx, docID, docBody)
require.NoError(t, err)

vrs := doc.HLV.Version
src := doc.HLV.SourceID
sv := &Version{Value: vrs, SourceID: src}
revision, err := collection.GetCV(ctx, docID, sv, true)
require.NoError(t, err)
if testCase.access {
assert.Equal(t, rev, revision.RevID)
assert.Equal(t, sv, revision.CV)
assert.Equal(t, docID, revision.DocID)
assert.Equal(t, []byte(`{"channels":["A"]}`), revision.BodyBytes)
} else {
assert.Equal(t, rev, revision.RevID)
assert.Equal(t, sv, revision.CV)
assert.Equal(t, docID, revision.DocID)
assert.Equal(t, []byte(RemovedRedactedDocument), revision.BodyBytes)
}
})
}
}

// TestGetByCVForDocNotResidentInCache:
// - Setup db with rev cache size of 1
// - Put two docs forcing eviction of the first doc
// - Use GetCV function to fetch the first doc, forcing the rev cache to load the doc from bucket
// - Assert the doc revision fetched is correct to the first doc we created
func TestGetByCVForDocNotResidentInCache(t *testing.T) {
db, ctx := SetupTestDBWithOptions(t, DatabaseContextOptions{
RevisionCacheOptions: &RevisionCacheOptions{
Size: 1,
},
})
defer db.Close(ctx)
collection := GetSingleDatabaseCollectionWithUser(t, db)
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)

// Create a user with access to channel A
authenticator := db.Authenticator(base.TestCtx(t))
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
require.NoError(t, err)
require.NoError(t, authenticator.Save(user))
collection.user, err = authenticator.GetUser("alice")
require.NoError(t, err)

const (
doc1ID = "doc1"
doc2ID = "doc2"
)

revBody := Body{"channels": []string{"A"}}
rev, doc, err := collection.Put(ctx, doc1ID, revBody)
require.NoError(t, err)

// put another doc that should evict first doc from cache
_, _, err = collection.Put(ctx, doc2ID, revBody)
require.NoError(t, err)

// get by CV should force a load from bucket and have a cache miss
vrs := doc.HLV.Version
src := doc.HLV.SourceID
sv := &Version{Value: vrs, SourceID: src}
revision, err := collection.GetCV(ctx, doc1ID, sv, true)
require.NoError(t, err)

// assert the fetched doc is the first doc we added and assert that we did in fact get cache miss
assert.Equal(t, int64(1), db.DbStats.Cache().RevisionCacheMisses.Value())
assert.Equal(t, rev, revision.RevID)
assert.Equal(t, sv, revision.CV)
assert.Equal(t, doc1ID, revision.DocID)
assert.Equal(t, []byte(`{"channels":["A"]}`), revision.BodyBytes)
}

// TestGetCVActivePathway:
// - Two test cases, one with doc a user will have access to, one without
// - Purpose is top specify nil CV to the GetCV function to force the GetActive code pathway
// - Assert doc that is created is fetched correctly when user has access to doc
// - Assert that correct error is returned when user has no access to the doc
func TestGetCVActivePathway(t *testing.T) {
const docID = "doc1"

testCases := []struct {
name string
docChannels []string
access bool
}{
{
name: "activeFetchWithUserAccess",
docChannels: []string{"A"},
access: true,
},
{
name: "activeFetchWithoutUserAccess",
docChannels: []string{"B"},
access: false,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
db, ctx := setupTestDB(t)
defer db.Close(ctx)
collection := GetSingleDatabaseCollectionWithUser(t, db)
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)

// Create a user with access to channel A
authenticator := db.Authenticator(base.TestCtx(t))
user, err := authenticator.NewUser("alice", "letmein", channels.BaseSetOf(t, "A"))
require.NoError(t, err)
require.NoError(t, authenticator.Save(user))
collection.user, err = authenticator.GetUser("alice")
require.NoError(t, err)

// test get active path by specifying nil cv
revBody := Body{"channels": testCase.docChannels}
rev, doc, err := collection.Put(ctx, docID, revBody)
require.NoError(t, err)
revision, err := collection.GetCV(ctx, docID, nil, true)

if testCase.access == true {
require.NoError(t, err)
vrs := doc.HLV.Version
src := doc.HLV.SourceID
sv := &Version{Value: vrs, SourceID: src}
assert.Equal(t, rev, revision.RevID)
assert.Equal(t, sv, revision.CV)
assert.Equal(t, docID, revision.DocID)
assert.Equal(t, []byte(`{"channels":["A"]}`), revision.BodyBytes)
} else {
require.Error(t, err)
assert.ErrorContains(t, err, ErrForbidden.Error())
assert.Equal(t, DocumentRevision{}, revision)
}
})
}
}
34 changes: 16 additions & 18 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ type SyncData struct {
removedRevisionBodyKeys map[string]string // keys of non-winning revisions that have been removed (and so may require deletion), indexed by revID
}

// determine set of current channels based on removal entries.
func (sd *SyncData) getCurrentChannels() base.Set {
ch := base.SetOf()
for channelName, channelRemoval := range sd.Channels {
if channelRemoval == nil || channelRemoval.Seq == 0 {
ch.Add(channelName)
}
}
return ch
}

func (sd *SyncData) HashRedact(salt string) SyncData {

// Creating a new SyncData with the redacted info. We copy all the information which stays the same and create new
Expand Down Expand Up @@ -177,12 +188,11 @@ type Document struct {
Cas uint64 // Document cas
rawUserXattr []byte // Raw user xattr as retrieved from the bucket

Deleted bool
DocExpiry uint32
RevID string
DocAttachments AttachmentsMeta
inlineSyncData bool
currentRevChannels base.Set // A base.Set of the current revision's channels (determined by SyncData.Channels at UnmarshalJSON time)
Deleted bool
DocExpiry uint32
RevID string
DocAttachments AttachmentsMeta
inlineSyncData bool
}

type historyOnlySyncData struct {
Expand Down Expand Up @@ -970,7 +980,6 @@ func (doc *Document) updateChannels(ctx context.Context, newChannels base.Set) (
doc.updateChannelHistory(channel, doc.Sequence, true)
}
}
doc.currentRevChannels = newChannels
if changed != nil {
base.InfofCtx(ctx, base.KeyCRUD, "\tDoc %q / %q in channels %q", base.UD(doc.ID), doc.CurrentRev, base.UD(newChannels))
changedChannels, err = channels.SetFromArray(changed, channels.KeepStar)
Expand Down Expand Up @@ -1080,17 +1089,6 @@ func (doc *Document) UnmarshalJSON(data []byte) error {
doc.SyncData = *syncData.SyncData
}

// determine current revision's channels and store in-memory (avoids doc.Channels iteration at access-check time)
if len(doc.Channels) > 0 {
ch := base.SetOf()
for channelName, channelRemoval := range doc.Channels {
if channelRemoval == nil || channelRemoval.Seq == 0 {
ch.Add(channelName)
}
}
doc.currentRevChannels = ch
}

// Unmarshal the rest of the doc body as map[string]interface{}
if err := doc._body.Unmarshal(data); err != nil {
return pkgerrors.WithStack(base.RedactErrorf("Failed to UnmarshalJSON() doc with id: %s. Error: %v", base.UD(doc.ID), err))
Expand Down
4 changes: 2 additions & 2 deletions db/revision_cache_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func revCacheLoader(ctx context.Context, backingStore RevisionCacheBackingStore,
return revCacheLoaderForDocument(ctx, backingStore, doc, id.RevID)
}

// revCacheLoaderForCv will load a document from the bucket using the CV, comapre the fetched doc and the CV specified in the function,
// revCacheLoaderForCv will load a document from the bucket using the CV, compare the fetched doc and the CV specified in the function,
// and will still return revid for purpose of populating the Rev ID lookup map on the cache
func revCacheLoaderForCv(ctx context.Context, backingStore RevisionCacheBackingStore, id IDandCV, unmarshalBody bool) (bodyBytes []byte, body Body, history Revisions, channels base.Set, removed bool, attachments AttachmentsMeta, deleted bool, expiry *time.Time, revid string, err error) {
cv := Version{
Expand Down Expand Up @@ -337,7 +337,7 @@ func revCacheLoaderForDocumentCV(ctx context.Context, backingStore RevisionCache
if err = doc.HasCurrentVersion(cv); err != nil {
return bodyBytes, body, history, channels, removed, attachments, deleted, doc.Expiry, revid, err
}
channels = doc.currentRevChannels
channels = doc.SyncData.getCurrentChannels()
revid = doc.CurrentRev

return bodyBytes, body, history, channels, removed, attachments, deleted, doc.Expiry, revid, err
Expand Down
Loading

0 comments on commit 07f4320

Please sign in to comment.