
Commit

Merge remote-tracking branch 'origin/release/anemone' into CBG-3861-update-tests
torcolvin committed Sep 30, 2024
2 parents 1b333cf + edc6407 commit ffa5fa7
Showing 18 changed files with 1,440 additions and 44 deletions.
37 changes: 25 additions & 12 deletions db/attachment_compaction.go
@@ -193,9 +193,9 @@ type AttachmentsMetaMap struct {
Attachments map[string]AttachmentsMeta `json:"_attachments"`
}

// AttachmentCompactionData struct to unmarshal a document sync data into in order to process attachments during mark
// AttachmentCompactionSyncData struct to unmarshal a document sync data into in order to process attachments during mark
// phase. Contains only what is necessary
type AttachmentCompactionData struct {
type AttachmentCompactionSyncData struct {
Attachments map[string]AttachmentsMeta `json:"attachments"`
Flags uint8 `json:"flags"`
History struct {
@@ -204,29 +204,42 @@ type AttachmentCompactionData struct {
} `json:"history"`
}

// getAttachmentSyncData takes the data type and data from the DCP feed and will return a AttachmentCompactionData
// AttachmentCompactionGlobalSyncData is used to unmarshal a document's global xattr in order to process attachments during the mark phase.
type AttachmentCompactionGlobalSyncData struct {
Attachments map[string]AttachmentsMeta `json:"attachments_meta"`
}

// getAttachmentSyncData takes the data type and data from the DCP feed and will return an AttachmentCompactionSyncData
// struct containing data needed to process attachments on a document.
func getAttachmentSyncData(dataType uint8, data []byte) (*AttachmentCompactionData, error) {
var attachmentData *AttachmentCompactionData
func getAttachmentSyncData(dataType uint8, data []byte) (*AttachmentCompactionSyncData, error) {
var attachmentSyncData *AttachmentCompactionSyncData
var attachmentGlobalSyncData AttachmentCompactionGlobalSyncData
var documentBody []byte

if dataType&base.MemcachedDataTypeXattr != 0 {
body, xattrs, err := sgbucket.DecodeValueWithXattrs([]string{base.SyncXattrName}, data)
body, xattrs, err := sgbucket.DecodeValueWithXattrs([]string{base.SyncXattrName, base.GlobalXattrName}, data)
if err != nil {
if errors.Is(err, sgbucket.ErrXattrInvalidLen) {
return nil, nil
}
return nil, fmt.Errorf("Could not parse DCP attachment sync data: %w", err)
}
err = base.JSONUnmarshal(xattrs[base.SyncXattrName], &attachmentData)
err = base.JSONUnmarshal(xattrs[base.SyncXattrName], &attachmentSyncData)
if err != nil {
return nil, err
}
if xattrs[base.GlobalXattrName] != nil && attachmentSyncData.Attachments == nil {
err = base.JSONUnmarshal(xattrs[base.GlobalXattrName], &attachmentGlobalSyncData)
if err != nil {
return nil, err
}
attachmentSyncData.Attachments = attachmentGlobalSyncData.Attachments
}
documentBody = body

} else {
type AttachmentDataSync struct {
AttachmentData AttachmentCompactionData `json:"_sync"`
AttachmentData AttachmentCompactionSyncData `json:"_sync"`
}
var attachmentDataSync AttachmentDataSync
err := base.JSONUnmarshal(data, &attachmentDataSync)
@@ -235,21 +248,21 @@ func getAttachmentSyncData(dataType uint8, data []byte) (*AttachmentCompactionDa
}

documentBody = data
attachmentData = &attachmentDataSync.AttachmentData
attachmentSyncData = &attachmentDataSync.AttachmentData
}

// If we've not yet found any attachments, make a last-effort attempt to grab them from the body for pre-2.5 documents
if len(attachmentData.Attachments) == 0 {
if len(attachmentSyncData.Attachments) == 0 {
attachmentMetaMap, err := checkForInlineAttachments(documentBody)
if err != nil {
return nil, err
}
if attachmentMetaMap != nil {
attachmentData.Attachments = attachmentMetaMap.Attachments
attachmentSyncData.Attachments = attachmentMetaMap.Attachments
}
}

return attachmentData, nil
return attachmentSyncData, nil
}

// checkForInlineAttachments will scan a body for "_attachments" for pre-2.5 attachments and will return any attachments
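The net effect of this hunk is that the attachment-compaction mark phase now decodes both the sync xattr and the global xattr from each DCP event, and only falls back to the global xattr's attachments_meta when the sync data carries no attachments. A minimal standalone sketch of that precedence rule follows; the struct names and JSON fields are simplified stand-ins for AttachmentCompactionSyncData and AttachmentCompactionGlobalSyncData, not the package's real types.

package main

import (
	"encoding/json"
	"fmt"
)

// simplified stand-in for AttachmentCompactionSyncData
type compactionSyncData struct {
	Attachments map[string]any `json:"attachments"`
}

// simplified stand-in for AttachmentCompactionGlobalSyncData
type compactionGlobalSyncData struct {
	Attachments map[string]any `json:"attachments_meta"`
}

// attachmentsForMark mirrors the fallback added above: prefer attachments
// recorded in the sync xattr, and only consult the global xattr when the
// sync xattr has none.
func attachmentsForMark(rawSync, rawGlobal []byte) (map[string]any, error) {
	var s compactionSyncData
	if err := json.Unmarshal(rawSync, &s); err != nil {
		return nil, err
	}
	if s.Attachments != nil || rawGlobal == nil {
		return s.Attachments, nil
	}
	var g compactionGlobalSyncData
	if err := json.Unmarshal(rawGlobal, &g); err != nil {
		return nil, err
	}
	return g.Attachments, nil
}

func main() {
	rawSync := []byte(`{"attachments": null}`)
	rawGlobal := []byte(`{"attachments_meta": {"hello.txt": {"length": 11}}}`)
	atts, err := attachmentsForMark(rawSync, rawGlobal)
	fmt.Println(atts, err) // map[hello.txt:map[length:11]] <nil>
}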
26 changes: 26 additions & 0 deletions db/crud.go
@@ -941,6 +941,32 @@ func (db *DatabaseCollectionWithUser) updateHLV(d *Document, docUpdateEvent DocU
return d, nil
}

// MigrateAttachmentMetadata will move any attachment metadata defined in sync data to global sync xattr
func (c *DatabaseCollectionWithUser) MigrateAttachmentMetadata(ctx context.Context, docID string, cas uint64, syncData *SyncData) error {
globalData := GlobalSyncData{
GlobalAttachments: syncData.Attachments,
}
globalXattr, err := base.JSONMarshal(globalData)
if err != nil {
return base.RedactErrorf("Failed to Marshal global sync data when attempting to migrate sync data attachments to global xattr with id: %s. Error: %v", base.UD(docID), err)
}
syncData.Attachments = nil
rawSyncXattr, err := base.JSONMarshal(*syncData)
if err != nil {
return base.RedactErrorf("Failed to Marshal sync data when attempting to migrate sync data attachments to global xattr with id: %s. Error: %v", base.UD(docID), err)
}

// build macro expansion for sync data. This will avoid the update to xattrs causing an extra import event (i.e. sync cas will be == to doc cas)
opts := &sgbucket.MutateInOptions{}
spec := macroExpandSpec(base.SyncXattrName)
opts.MacroExpansion = spec
opts.PreserveExpiry = true // if doc has expiry, we should preserve this

updatedXattr := map[string][]byte{base.SyncXattrName: rawSyncXattr, base.GlobalXattrName: globalXattr}
_, err = c.dataStore.UpdateXattrs(ctx, docID, 0, cas, updatedXattr, opts)
return err
}

// Updates or creates a document.
// The new body's BodyRev property must match the current revision's, if any.
func (db *DatabaseCollectionWithUser) Put(ctx context.Context, docid string, body Body) (newRevID string, doc *Document, err error) {
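For context, MigrateAttachmentMetadata performs the move in a single mutation: it copies syncData.Attachments into a global xattr payload, clears the attachments from the sync xattr, and writes both via UpdateXattrs with macro expansion (so sync cas stays equal to doc cas and the rewrite does not trigger another import) and PreserveExpiry. The sketch below reproduces only the payload-building half with simplified local types standing in for SyncData and GlobalSyncData; it is an illustration, not the production code path.

package main

import (
	"encoding/json"
	"fmt"
)

// simplified stand-in for GlobalSyncData
type globalSyncData struct {
	GlobalAttachments map[string]any `json:"attachments_meta"`
}

// simplified stand-in for SyncData
type syncData struct {
	Attachments map[string]any `json:"attachments,omitempty"`
	Flags       uint8          `json:"flags"`
}

// buildMigrationXattrs returns the two xattr bodies the migration writes:
// the attachments relocated into the global xattr, and the sync xattr with
// its attachments field cleared.
func buildMigrationXattrs(sd syncData) (rawSync, rawGlobal []byte, err error) {
	rawGlobal, err = json.Marshal(globalSyncData{GlobalAttachments: sd.Attachments})
	if err != nil {
		return nil, nil, err
	}
	sd.Attachments = nil
	rawSync, err = json.Marshal(sd)
	if err != nil {
		return nil, nil, err
	}
	return rawSync, rawGlobal, nil
}

func main() {
	sd := syncData{Attachments: map[string]any{"hello.txt": map[string]any{"length": 11}}, Flags: 1}
	rawSync, rawGlobal, _ := buildMigrationXattrs(sd)
	fmt.Println(string(rawSync))   // {"flags":1}
	fmt.Println(string(rawGlobal)) // {"attachments_meta":{"hello.txt":{"length":11}}}
}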
2 changes: 1 addition & 1 deletion db/import.go
@@ -97,7 +97,7 @@ func (db *DatabaseCollectionWithUser) ImportDoc(ctx context.Context, docid strin
existingBucketDoc.Xattrs[base.MouXattrName], err = base.JSONMarshal(existingDoc.metadataOnlyUpdate)
}
} else {
existingBucketDoc.Body, existingBucketDoc.Xattrs[base.SyncXattrName], existingBucketDoc.Xattrs[base.VvXattrName], existingBucketDoc.Xattrs[base.MouXattrName], _, err = existingDoc.MarshalWithXattrs()
existingBucketDoc.Body, existingBucketDoc.Xattrs[base.SyncXattrName], existingBucketDoc.Xattrs[base.VvXattrName], existingBucketDoc.Xattrs[base.MouXattrName], existingBucketDoc.Xattrs[base.GlobalXattrName], err = existingDoc.MarshalWithXattrs()
}
}

9 changes: 8 additions & 1 deletion db/import_listener.go
@@ -188,13 +188,13 @@ func (il *importListener) ImportFeedEvent(ctx context.Context, collection *Datab
}
}

docID := string(event.Key)
// If syncData is nil, or if this was not an SG write, attempt to import
if syncData == nil || !isSGWrite {
isDelete := event.Opcode == sgbucket.FeedOpDeletion
if isDelete {
rawBody = nil
}
docID := string(event.Key)

// last attempt to exit processing if the importListener has been closed before attempting to write to the bucket
select {
@@ -220,6 +220,13 @@ func (il *importListener) ImportFeedEvent(ctx context.Context, collection *Datab
base.DebugfCtx(ctx, base.KeyImport, "Did not import doc %q - external update will not be accessible via Sync Gateway. Reason: %v", base.UD(docID), err)
}
}
} else if syncData != nil && syncData.Attachments != nil {
base.DebugfCtx(ctx, base.KeyImport, "Attachment metadata found in sync data for doc with id %s, migrating attachment metadata", base.UD(docID))
// we have attachments to migrate
err := collection.MigrateAttachmentMetadata(ctx, docID, event.Cas, syncData)
if err != nil {
base.WarnfCtx(ctx, "error migrating attachment metadata from sync data to global sync for doc %s. Error: %v", base.UD(docID), err)
}
}
}

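The new else-if branch in ImportFeedEvent means a mutation that was written by Sync Gateway but still carries attachment metadata in its sync xattr is no longer left untouched: instead of importing, the listener migrates the metadata to the global xattr. A reduced sketch of that decision logic, using a hypothetical decide helper and boolean inputs in place of the listener's real state:

package importsketch

// importAction is a stand-in for what the feed handler does with a DCP event.
type importAction int

const (
	actionSkip importAction = iota
	actionImport
	actionMigrateAttachments
)

// decide mirrors the branching added to ImportFeedEvent: docs with no sync
// data or written outside Sync Gateway are imported; SG writes that still
// hold attachment metadata in the sync xattr have that metadata migrated.
func decide(hasSyncData, isSGWrite, syncDataHasAttachments bool) importAction {
	if !hasSyncData || !isSGWrite {
		return actionImport
	}
	if syncDataHasAttachments {
		return actionMigrateAttachments
	}
	return actionSkip
}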
25 changes: 25 additions & 0 deletions db/util_testing.go
@@ -893,3 +893,28 @@ func RetrieveDocRevSeqNo(t *testing.T, docxattr []byte) uint64 {
require.NoError(t, err)
return revNo
}

// MoveAttachmentXattrFromGlobalToSync is a test-only function that moves any attachment metadata defined in the global xattr into the sync data xattr
func MoveAttachmentXattrFromGlobalToSync(t *testing.T, ctx context.Context, docID string, cas uint64, value, syncXattr []byte, attachments AttachmentsMeta, macroExpand bool, dataStore base.DataStore) {
var docSync SyncData
err := base.JSONUnmarshal(syncXattr, &docSync)
require.NoError(t, err)
docSync.Attachments = attachments

opts := &sgbucket.MutateInOptions{}
// this should be true for cases where we want to move the attachment metadata without causing a new import feed event
if macroExpand {
spec := macroExpandSpec(base.SyncXattrName)
opts.MacroExpansion = spec
} else {
opts = nil
docSync.Cas = ""
}

newSync, err := base.JSONMarshal(docSync)
require.NoError(t, err)

// change this to update xattr
_, err = dataStore.WriteWithXattrs(ctx, docID, 0, cas, value, map[string][]byte{base.SyncXattrName: newSync}, []string{base.GlobalXattrName}, opts)
require.NoError(t, err)
}
72 changes: 72 additions & 0 deletions rest/attachment_test.go
@@ -2252,6 +2252,78 @@ func TestAttachmentDeleteOnExpiry(t *testing.T) {
}

}

// TestUpdateViaBlipMigrateAttachment:
// - Tests a document update through blip to a doc with attachment metadata defined in sync data
// - Asserts that a doc updated this way will migrate the attachment metadata from sync data to global sync data
func TestUpdateViaBlipMigrateAttachment(t *testing.T) {
rtConfig := &RestTesterConfig{
GuestEnabled: true,
}

btcRunner := NewBlipTesterClientRunner(t)
btcRunner.SkipSubtest[VersionVectorSubtestName] = true // attachments not yet replicated in V4 protocol
const (
doc1ID = "doc1"
)
btcRunner.Run(func(t *testing.T, SupportedBLIPProtocols []string) {
rt := NewRestTester(t, rtConfig)
defer rt.Close()

opts := &BlipTesterClientOpts{SupportedBLIPProtocols: SupportedBLIPProtocols}
btc := btcRunner.NewBlipTesterClientOptsWithRT(rt, opts)
defer btc.Close()
ds := rt.GetSingleDataStore()
ctx := base.TestCtx(t)

initialVersion := btc.rt.PutDoc(doc1ID, `{"_attachments": {"hello.txt": {"data": "aGVsbG8gd29ybGQ="}}}`)
btc.rt.WaitForPendingChanges()
btcRunner.StartOneshotPull(btc.id)
btcRunner.WaitForVersion(btc.id, doc1ID, initialVersion)

value, xattrs, cas, err := ds.GetWithXattrs(ctx, doc1ID, []string{base.SyncXattrName, base.GlobalXattrName})
require.NoError(t, err)
syncXattr, ok := xattrs[base.SyncXattrName]
require.True(t, ok)
globalXattr, ok := xattrs[base.GlobalXattrName]
require.True(t, ok)

var attachs db.GlobalSyncData
err = base.JSONUnmarshal(globalXattr, &attachs)
require.NoError(t, err)

// move attachment metadata from global xattr to sync xattr
db.MoveAttachmentXattrFromGlobalToSync(t, ctx, doc1ID, cas, value, syncXattr, attachs.GlobalAttachments, true, ds)

// push revision from client
doc1Version, err := btcRunner.PushRev(btc.id, doc1ID, initialVersion, []byte(`{"new": "val", "_attachments": {"hello.txt": {"data": "aGVsbG8gd29ybGQ="}}}`))
require.NoError(t, err)
assert.NoError(t, rt.WaitForVersion(doc1ID, doc1Version))

// assert the pushed rev updates the doc in bucket and migrates attachment metadata in process
xattrs, _, err = ds.GetXattrs(ctx, doc1ID, []string{base.SyncXattrName, base.GlobalXattrName})
require.NoError(t, err)
syncXattr, ok = xattrs[base.SyncXattrName]
require.True(t, ok)
globalXattr, ok = xattrs[base.GlobalXattrName]
require.True(t, ok)

// empty global sync data
attachs = db.GlobalSyncData{}
err = base.JSONUnmarshal(globalXattr, &attachs)
require.NoError(t, err)
var syncData db.SyncData
err = base.JSONUnmarshal(syncXattr, &syncData)
require.NoError(t, err)

// assert that the attachment metadata has been moved
assert.NotNil(t, attachs.GlobalAttachments)
assert.Nil(t, syncData.Attachments)
att := attachs.GlobalAttachments["hello.txt"].(map[string]interface{})
assert.Equal(t, float64(11), att["length"])
})
}

func TestUpdateExistingAttachment(t *testing.T) {
rtConfig := &RestTesterConfig{
GuestEnabled: true,
3 changes: 1 addition & 2 deletions rest/config_test.go
@@ -2362,9 +2362,8 @@ func TestInvalidJavascriptFunctions(t *testing.T) {

for _, testCase := range testCases {
t.Run(testCase.Name, func(t *testing.T) {
safeDbName := strings.ToLower(strings.ReplaceAll(testCase.Name, " ", "-"))
dbConfig := DbConfig{
Name: safeDbName,
Name: SafeDatabaseName(t, testCase.Name),
}

if testCase.SyncFunction != nil {
