Skip to content

Commit

Permalink
BB-483 fix replicating null version
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolas2bert committed May 16, 2024
1 parent bd830a5 commit 2ba28c0
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 22 deletions.
25 changes: 9 additions & 16 deletions extensions/replication/ReplicationQueuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ class ReplicationQueuePopulator extends QueuePopulatorExtension {
if (entry.bucket === usersBucket) {
return this._filterBucketOp(entry);
}
if (!isMasterKey(entry.key)) {
return this._filterVersionedKey(entry);
}
return undefined;
return this._filterKeyOp(entry);
}

_filterBucketOp(entry) {
Expand All @@ -40,7 +37,7 @@ class ReplicationQueuePopulator extends QueuePopulatorExtension {
entry.bucket, JSON.stringify(publishedEntry));
}

_filterVersionedKey(entry) {
_filterKeyOp(entry) {
if (entry.type !== 'put') {
return;
}
Expand All @@ -51,8 +48,6 @@ class ReplicationQueuePopulator extends QueuePopulatorExtension {
if (sanityCheckRes) {
return;
}
// Allow a non-versioned object if being replicated from an NFS bucket.
// Or if the master key is of a non versioned object
if (!this._entryCanBeReplicated(queueEntry)) {
return;
}
Expand Down Expand Up @@ -85,30 +80,28 @@ class ReplicationQueuePopulator extends QueuePopulatorExtension {
}

/**
* Filter if the entry is considered a valid master key entry.
* Accept the entry if considered a valid master key entry.
* There is a case where a single null entry looks like a master key and
* will not have a duplicate versioned key. They are created when you have a
* non-versioned bucket with objects, and then convert bucket to versioned.
* If no new versioned objects are added for given object(s), they look like
* standalone master keys. The `isNull` case is undefined for these entries.
* Non-versioned objects if being replicated from an NFS bucket are also allowed
* Null versions which are objects created after suspending versioning are allowed,
* these only have a master object that has an internal versionId and a 'isNull' flag.
* @param {ObjectQueueEntry} entry - raw queue entry
* @return {Boolean} true if we should filter entry
* @param {ObjectQueueEntry} entry - queue entry
* @return {Boolean} true if we should accept entry
*/
_entryCanBeReplicated(entry) {
const isMaster = isMasterKey(entry.getObjectVersionedKey());
const isNFS = entry.getReplicationIsNFS();
// single null entries will have a version id as undefined or null.
// do not filter single null entries
const isNonVersionedMaster = entry.getVersionId() === undefined;
const isNullVersionedMaster = entry.getIsNull();
if (isMaster && !isNFS && !isNonVersionedMaster && !isNullVersionedMaster) {
this.log.trace('skipping master key entry');
return false;
if (!isMaster || isNonVersionedMaster || isNullVersionedMaster) {
return true;
}
return true;
this.log.trace('skipping master key entry');
return false;
}
}

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"homepage": "https://github.com/scality/backbeat#readme",
"dependencies": {
"@hapi/joi": "^15.1.0",
"arsenal": "git+https://github.com/scality/arsenal#7.70.14",
"arsenal": "git+https://github.com/scality/arsenal#7.70.30",
"async": "^2.3.0",
"aws-sdk": "^2.1326.0",
"backo": "^1.1.0",
Expand Down
66 changes: 65 additions & 1 deletion tests/unit/replication/ReplicationQueuePopulator.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ describe('replication queue populator', () => {
},
}, { value: JSON.stringify(kafkaValue) });

rqp._filterVersionedKey(entry);
rqp._filterKeyOp(entry);

sinon.assert.calledOnceWithExactly(
params.metricsHandler.bytes,
Expand Down Expand Up @@ -149,4 +149,68 @@ describe('replication queue populator', () => {
// should not throw
rqp._filterBucketOp(entry);
});

// A "standalone null master key" is created when an object is placed in a non-versioned bucket,
// which is then converted to a versioned bucket. If no new versioned objects are added for that object,
// it appears as a standalone null master key with no version id.
it('should replicate standalone null master key', () => {
const customKafkaValue = {
...kafkaValue,
};
delete customKafkaValue.versionId;
const entry = Object.assign({}, {
type: 'put',
bucket: 'test-bucket-source',
key: '\x7FMkey0',
logReader: {
getMetricLabels: () => {},
},
}, { value: JSON.stringify(customKafkaValue) });

rqp._filterKeyOp(entry);

const publishedMessage = rqp.getState();
assert(publishedMessage.key);
});

it('should replicate master suspended null version', () => {
const customKafkaValue = {
...kafkaValue,
versionId: '98285859405462999999RG001 ',
isNull: true,
};
const entry = Object.assign({}, {
type: 'put',
bucket: 'test-bucket-source',
key: '\x7FMkey0',
logReader: {
getMetricLabels: () => {},
},
}, { value: JSON.stringify(customKafkaValue) });

rqp._filterKeyOp(entry);

const publishedMessage = rqp.getState();
assert(publishedMessage.key);
});

it('should not replicate non-null master', () => {
const customKafkaValue = {
...kafkaValue,
versionId: '98285859405462999999RG001 ',
};
const entry = Object.assign({}, {
type: 'put',
bucket: 'test-bucket-source',
key: '\x7FMkey0',
logReader: {
getMetricLabels: () => {},
},
}, { value: JSON.stringify(customKafkaValue) });

rqp._filterKeyOp(entry);

const publishedMessage = rqp.getState();
assert(!publishedMessage.key);
});
});
21 changes: 17 additions & 4 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@
dependencies:
"@hapi/hoek" "^9.0.0"

"@js-sdsl/ordered-set@^4.4.2":
version "4.4.2"
resolved "https://registry.yarnpkg.com/@js-sdsl/ordered-set/-/ordered-set-4.4.2.tgz#ab857eb63cf358b5a0f74fdd458b4601423779b7"
integrity sha512-ieYQ8WlBPKYzEo81H3q0DFbd8WtFRXXABb4+vRCF0AO3WWtJZFxYvRGdipUXGrd6tlSySmqhcPuO3J6SCodCxg==

"@npmcli/fs@^1.0.0":
version "1.1.0"
resolved "https://registry.yarnpkg.com/@npmcli/fs/-/fs-1.1.0.tgz#bec1d1b89c170d40e1b73ad6c943b0b75e7d2951"
Expand Down Expand Up @@ -841,10 +846,11 @@ arraybuffer.slice@~0.0.7:
optionalDependencies:
ioctl "^2.0.2"

"arsenal@git+https://github.com/scality/arsenal#7.70.14":
version "7.70.14"
resolved "git+https://github.com/scality/arsenal#a99a6d9d971ec57500b396616cfadf50df1f725a"
"arsenal@git+https://github.com/scality/arsenal#7.70.30":
version "7.70.30"
resolved "git+https://github.com/scality/arsenal#a86cff46317e36f6018aa17ae96130a43b422188"
dependencies:
"@js-sdsl/ordered-set" "^4.4.2"
"@types/async" "^3.2.12"
"@types/utf8" "^3.0.1"
JSONStream "^1.0.0"
Expand Down Expand Up @@ -876,7 +882,7 @@ arraybuffer.slice@~0.0.7:
sproxydclient "github:scality/sproxydclient#8.0.4"
utf8 "2.1.2"
uuid "^3.0.1"
werelogs scality/werelogs#8.1.0
werelogs scality/werelogs#8.1.4
xml2js "~0.4.23"
optionalDependencies:
ioctl "^2.0.2"
Expand Down Expand Up @@ -6081,6 +6087,13 @@ werelogs@scality/werelogs#8.1.0:
dependencies:
safe-json-stringify "1.0.3"

werelogs@scality/werelogs#8.1.4:
version "8.1.4"
resolved "https://codeload.github.com/scality/werelogs/tar.gz/d6bec11df034c88a12959791eb7dd60913eb5f47"
dependencies:
fast-safe-stringify "^2.1.1"
safe-json-stringify "^1.2.0"

werelogs@scality/werelogs#GA7.2.0.5:
version "7.2.0"
resolved "https://codeload.github.com/scality/werelogs/tar.gz/bc034589ebf7810d6e6d61932f94327976de6eef"
Expand Down

0 comments on commit 2ba28c0

Please sign in to comment.