diff --git a/extensions/replication/ReplicationQueuePopulator.js b/extensions/replication/ReplicationQueuePopulator.js index a16fed3e8..4ebb94c61 100644 --- a/extensions/replication/ReplicationQueuePopulator.js +++ b/extensions/replication/ReplicationQueuePopulator.js @@ -19,10 +19,7 @@ class ReplicationQueuePopulator extends QueuePopulatorExtension { if (entry.bucket === usersBucket) { return this._filterBucketOp(entry); } - if (!isMasterKey(entry.key)) { - return this._filterVersionedKey(entry); - } - return undefined; + return this._filterKeyOp(entry); } _filterBucketOp(entry) { @@ -40,7 +37,7 @@ class ReplicationQueuePopulator extends QueuePopulatorExtension { entry.bucket, JSON.stringify(publishedEntry)); } - _filterVersionedKey(entry) { + _filterKeyOp(entry) { if (entry.type !== 'put') { return; } @@ -51,8 +48,6 @@ class ReplicationQueuePopulator extends QueuePopulatorExtension { if (sanityCheckRes) { return; } - // Allow a non-versioned object if being replicated from an NFS bucket. - // Or if the master key is of a non versioned object if (!this._entryCanBeReplicated(queueEntry)) { return; } @@ -85,30 +80,28 @@ class ReplicationQueuePopulator extends QueuePopulatorExtension { } /** - * Filter if the entry is considered a valid master key entry. + * Accept the entry if considered a valid master key entry. * There is a case where a single null entry looks like a master key and * will not have a duplicate versioned key. They are created when you have a * non-versioned bucket with objects, and then convert bucket to versioned. * If no new versioned objects are added for given object(s), they look like * standalone master keys. The `isNull` case is undefined for these entries. - * Non-versioned objects if being replicated from an NFS bucket are also allowed * Null versions which are objects created after suspending versioning are allowed, * these only have a master object that has an internal versionId and a 'isNull' flag. - * @param {ObjectQueueEntry} entry - raw queue entry - * @return {Boolean} true if we should filter entry + * @param {ObjectQueueEntry} entry - queue entry + * @return {Boolean} true if we should accept entry */ _entryCanBeReplicated(entry) { const isMaster = isMasterKey(entry.getObjectVersionedKey()); - const isNFS = entry.getReplicationIsNFS(); // single null entries will have a version id as undefined or null. // do not filter single null entries const isNonVersionedMaster = entry.getVersionId() === undefined; const isNullVersionedMaster = entry.getIsNull(); - if (isMaster && !isNFS && !isNonVersionedMaster && !isNullVersionedMaster) { - this.log.trace('skipping master key entry'); - return false; + if (!isMaster || isNonVersionedMaster || isNullVersionedMaster) { + return true; } - return true; + this.log.trace('skipping master key entry'); + return false; } } diff --git a/package.json b/package.json index 92d04632f..1173096f0 100644 --- a/package.json +++ b/package.json @@ -37,7 +37,7 @@ "homepage": "https://github.com/scality/backbeat#readme", "dependencies": { "@hapi/joi": "^15.1.0", - "arsenal": "git+https://github.com/scality/arsenal#7.70.14", + "arsenal": "git+https://github.com/scality/arsenal#7.70.30", "async": "^2.3.0", "aws-sdk": "^2.1326.0", "backo": "^1.1.0", diff --git a/tests/unit/replication/ReplicationQueuePopulator.spec.js b/tests/unit/replication/ReplicationQueuePopulator.spec.js index d1f9a6bd4..bae3fa54b 100644 --- a/tests/unit/replication/ReplicationQueuePopulator.spec.js +++ b/tests/unit/replication/ReplicationQueuePopulator.spec.js @@ -120,7 +120,7 @@ describe('replication queue populator', () => { }, }, { value: JSON.stringify(kafkaValue) }); - rqp._filterVersionedKey(entry); + rqp._filterKeyOp(entry); sinon.assert.calledOnceWithExactly( params.metricsHandler.bytes, @@ -149,4 +149,68 @@ describe('replication queue populator', () => { // should not throw rqp._filterBucketOp(entry); }); + + // A "standalone null master key" is created when an object is placed in a non-versioned bucket, + // which is then converted to a versioned bucket. If no new versioned objects are added for that object, + // it appears as a standalone null master key with no version id. + it('should replicate standalone null master key', () => { + const customKafkaValue = { + ...kafkaValue, + }; + delete customKafkaValue.versionId; + const entry = Object.assign({}, { + type: 'put', + bucket: 'test-bucket-source', + key: '\x7FMkey0', + logReader: { + getMetricLabels: () => {}, + }, + }, { value: JSON.stringify(customKafkaValue) }); + + rqp._filterKeyOp(entry); + + const publishedMessage = rqp.getState(); + assert(publishedMessage.key); + }); + + it('should replicate master suspended null version', () => { + const customKafkaValue = { + ...kafkaValue, + versionId: '98285859405462999999RG001 ', + isNull: true, + }; + const entry = Object.assign({}, { + type: 'put', + bucket: 'test-bucket-source', + key: '\x7FMkey0', + logReader: { + getMetricLabels: () => {}, + }, + }, { value: JSON.stringify(customKafkaValue) }); + + rqp._filterKeyOp(entry); + + const publishedMessage = rqp.getState(); + assert(publishedMessage.key); + }); + + it('should not replicate non-null master', () => { + const customKafkaValue = { + ...kafkaValue, + versionId: '98285859405462999999RG001 ', + }; + const entry = Object.assign({}, { + type: 'put', + bucket: 'test-bucket-source', + key: '\x7FMkey0', + logReader: { + getMetricLabels: () => {}, + }, + }, { value: JSON.stringify(customKafkaValue) }); + + rqp._filterKeyOp(entry); + + const publishedMessage = rqp.getState(); + assert(!publishedMessage.key); + }); }); diff --git a/yarn.lock b/yarn.lock index ccc9802d5..3be69958b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -265,6 +265,11 @@ dependencies: "@hapi/hoek" "^9.0.0" +"@js-sdsl/ordered-set@^4.4.2": + version "4.4.2" + resolved "https://registry.yarnpkg.com/@js-sdsl/ordered-set/-/ordered-set-4.4.2.tgz#ab857eb63cf358b5a0f74fdd458b4601423779b7" + integrity sha512-ieYQ8WlBPKYzEo81H3q0DFbd8WtFRXXABb4+vRCF0AO3WWtJZFxYvRGdipUXGrd6tlSySmqhcPuO3J6SCodCxg== + "@npmcli/fs@^1.0.0": version "1.1.0" resolved "https://registry.yarnpkg.com/@npmcli/fs/-/fs-1.1.0.tgz#bec1d1b89c170d40e1b73ad6c943b0b75e7d2951" @@ -841,10 +846,11 @@ arraybuffer.slice@~0.0.7: optionalDependencies: ioctl "^2.0.2" -"arsenal@git+https://github.com/scality/arsenal#7.70.14": - version "7.70.14" - resolved "git+https://github.com/scality/arsenal#a99a6d9d971ec57500b396616cfadf50df1f725a" +"arsenal@git+https://github.com/scality/arsenal#7.70.30": + version "7.70.30" + resolved "git+https://github.com/scality/arsenal#a86cff46317e36f6018aa17ae96130a43b422188" dependencies: + "@js-sdsl/ordered-set" "^4.4.2" "@types/async" "^3.2.12" "@types/utf8" "^3.0.1" JSONStream "^1.0.0" @@ -876,7 +882,7 @@ arraybuffer.slice@~0.0.7: sproxydclient "github:scality/sproxydclient#8.0.4" utf8 "2.1.2" uuid "^3.0.1" - werelogs scality/werelogs#8.1.0 + werelogs scality/werelogs#8.1.4 xml2js "~0.4.23" optionalDependencies: ioctl "^2.0.2" @@ -6081,6 +6087,13 @@ werelogs@scality/werelogs#8.1.0: dependencies: safe-json-stringify "1.0.3" +werelogs@scality/werelogs#8.1.4: + version "8.1.4" + resolved "https://codeload.github.com/scality/werelogs/tar.gz/d6bec11df034c88a12959791eb7dd60913eb5f47" + dependencies: + fast-safe-stringify "^2.1.1" + safe-json-stringify "^1.2.0" + werelogs@scality/werelogs#GA7.2.0.5: version "7.2.0" resolved "https://codeload.github.com/scality/werelogs/tar.gz/bc034589ebf7810d6e6d61932f94327976de6eef"