From e45aebdcaa4bd1198fd8e9dbdc0191850c61e342 Mon Sep 17 00:00:00 2001 From: Travis Barnette Date: Fri, 11 Jul 2025 12:49:53 -0400 Subject: [PATCH 1/5] adding the ability to run headObject on the s3 connector --- src/connectors/s3.js | 11 ++++++++++- test/unit/connectors/s3.test.js | 18 +++++++++++++++++- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/src/connectors/s3.js b/src/connectors/s3.js index 7202df98..d7468d10 100644 --- a/src/connectors/s3.js +++ b/src/connectors/s3.js @@ -3,7 +3,7 @@ import { Readable } from 'stream'; import { CopyObjectCommand, - DeleteObjectCommand, GetObjectCommand, ListObjectsV2Command, PutObjectCommand, S3Client, + DeleteObjectCommand, GetObjectCommand, HeadObjectCommand, ListObjectsV2Command, PutObjectCommand, S3Client, } from '@aws-sdk/client-s3'; import { NodeHttpHandler } from '@smithy/node-http-handler'; import Promise from 'bluebird'; @@ -47,6 +47,15 @@ class Connector { return this.clients[pipelineId]; } + headObject(inputParams, ctx) { + const params = { + Bucket: this.bucketName, + ...inputParams, + }; + + return this._sendCommand(new HeadObjectCommand(params), ctx); + } + putObject(inputParams, ctx) { const params = { Bucket: this.bucketName, diff --git a/test/unit/connectors/s3.test.js b/test/unit/connectors/s3.test.js index d1e70521..e12b4c6b 100644 --- a/test/unit/connectors/s3.test.js +++ b/test/unit/connectors/s3.test.js @@ -6,7 +6,7 @@ import { Readable } from 'stream'; import { mockClient } from 'aws-sdk-client-mock'; import { CopyObjectCommand, - DeleteObjectCommand, GetObjectCommand, ListObjectsV2Command, PutObjectCommand, S3Client, + DeleteObjectCommand, GetObjectCommand, HeadObjectCommand, ListObjectsV2Command, PutObjectCommand, S3Client, } from '@aws-sdk/client-s3'; import { sdkStreamMixin } from '@smithy/util-stream'; @@ -199,4 +199,20 @@ describe('connectors/s3.js', () => { }); expect(data).to.deep.equal({}); }); + it('should head object', async () => { + const spy = sinon.spy(() => ({})); + mockS3.on(HeadObjectCommand).callsFake(spy); + const inputParams = { + Key: 'k1', + }; + const data = await new Connector({ + debug: debug('s3'), + bucketName: 'b1', + }).headObject(inputParams); + expect(spy).to.have.been.calledWith({ + Key: 'k1', + Bucket: 'b1', + }); + expect(data).to.deep.equal({}); + }); }); From 55f0500dcd51ae06adf6bd8a2c1b9575c95a5a4f Mon Sep 17 00:00:00 2001 From: Travis Barnette Date: Fri, 11 Jul 2025 13:03:24 -0400 Subject: [PATCH 2/5] adding query for headObject query --- src/queries/s3.js | 29 +++++++++++++++++ test/unit/queries/s3.test.js | 63 ++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/src/queries/s3.js b/src/queries/s3.js index 30e26ea2..ee64c4ce 100644 --- a/src/queries/s3.js +++ b/src/queries/s3.js @@ -193,3 +193,32 @@ export const pageObjectsFromS3 = ({ .map(listObjects) .parallel(parallel); }; + +export const headS3Object= ({ + id: pipelineId, + debug = d('s3'), + bucketName = process.env.BUCKET_NAME, + headRequestField = 'headRequest', + headResponseField = 'headResponse', + parallel = Number(process.env.S3_PARALLEL) || Number(process.env.PARALLEL) || 8, + step = 'get', + ...opt +} = {}) => { + const connector = new Connector({ + pipelineId, debug, bucketName, ...opt, + }); + + const headObject = (uow) => { + if (!uow[headRequestField]) return _(Promise.resolve(uow)); + + const p = () => connector.headObject(uow[headRequestField], uow) + .then((headResponse) => ({ ...uow, [headResponseField]: headResponse })) + .catch(rejectWithFault(uow)); + + return _(uow.metrics?.w(p, step) || p()); // wrap promise in a stream + }; + + return (s) => s + .map(headObject) + .parallel(parallel); +}; diff --git a/test/unit/queries/s3.test.js b/test/unit/queries/s3.test.js index 5736aae5..1a63cd75 100644 --- a/test/unit/queries/s3.test.js +++ b/test/unit/queries/s3.test.js @@ -12,6 +12,7 @@ import { toGetObjectRequest, toGetObjectRequest2, getObjectFromS3AsStream, + headS3Object, } from '../../../src/queries/s3'; import Connector from '../../../src/connectors/s3'; @@ -329,6 +330,68 @@ describe('queries/s3.js', () => { }) .done(done); }); + it('should head object', (done) => { + const stub = sinon.stub(Connector.prototype, 'headObject').resolves({ + Metadata: { + testkey: '1', + }, + }); + + const uows = [{ + headRequest: { + Key: 'k1', + }, + }]; + + _(uows) + .through(headS3Object()) + .collect() + .tap((collected) => { + console.log(JSON.stringify(collected, null, 2)); + + expect(stub).to.have.been.calledWith({ + Key: 'k1', + }); + + expect(collected.length).to.equal(1); + expect(collected[0]).to.deep.equal({ + headRequest: { + Key: 'k1', + }, + headResponse: { + Metadata: { + testkey: '1', + }, + }, + }); + }) + .done(done); + }); + it('should head object missing headRequestField', (done) => { + const stub = sinon.stub(Connector.prototype, 'headObject').resolves({ + Metadata: { + testkey: '1', + }, + }); + + const uows = [{ + // headRequest: { + // Key: 'k1', + // }, + }]; + + _(uows) + .through(headS3Object()) + .collect() + .tap((collected) => { + // console.log(JSON.stringify(collected, null, 2)); + + expect(collected[0]).to.deep.equal({}); + + expect(collected.length).to.equal(1); + }) + .done(done); + }); }); const GET_RESPONSE = { From 32bc3967f93cf964c49cca0c339fd0b2b004fa33 Mon Sep 17 00:00:00 2001 From: Travis Barnette Date: Fri, 11 Jul 2025 13:04:29 -0400 Subject: [PATCH 3/5] remove log --- test/unit/queries/s3.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/unit/queries/s3.test.js b/test/unit/queries/s3.test.js index 1a63cd75..3e141c29 100644 --- a/test/unit/queries/s3.test.js +++ b/test/unit/queries/s3.test.js @@ -347,7 +347,7 @@ describe('queries/s3.js', () => { .through(headS3Object()) .collect() .tap((collected) => { - console.log(JSON.stringify(collected, null, 2)); + // console.log(JSON.stringify(collected, null, 2)); expect(stub).to.have.been.calledWith({ Key: 'k1', From b7ef93fab654c9c7411a52a64ba745bfd7fb4f06 Mon Sep 17 00:00:00 2001 From: Travis Barnette Date: Fri, 11 Jul 2025 14:48:12 -0400 Subject: [PATCH 4/5] reformat --- src/queries/s3.js | 2 +- test/unit/queries/s3.test.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/queries/s3.js b/src/queries/s3.js index ee64c4ce..dc3683d5 100644 --- a/src/queries/s3.js +++ b/src/queries/s3.js @@ -194,7 +194,7 @@ export const pageObjectsFromS3 = ({ .parallel(parallel); }; -export const headS3Object= ({ +export const headS3Object = ({ id: pipelineId, debug = d('s3'), bucketName = process.env.BUCKET_NAME, diff --git a/test/unit/queries/s3.test.js b/test/unit/queries/s3.test.js index 3e141c29..c44cb569 100644 --- a/test/unit/queries/s3.test.js +++ b/test/unit/queries/s3.test.js @@ -368,7 +368,7 @@ describe('queries/s3.js', () => { .done(done); }); it('should head object missing headRequestField', (done) => { - const stub = sinon.stub(Connector.prototype, 'headObject').resolves({ + sinon.stub(Connector.prototype, 'headObject').resolves({ Metadata: { testkey: '1', }, From f64e1737c254bf9e5b137c5c284f9f02d52e7a08 Mon Sep 17 00:00:00 2001 From: Travis Barnette Date: Fri, 11 Jul 2025 14:49:34 -0400 Subject: [PATCH 5/5] bump version --- package-lock.json | 4 ++-- package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index 76805711..0e3589aa 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "aws-lambda-stream", - "version": "1.1.6", + "version": "1.1.7", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "aws-lambda-stream", - "version": "1.1.6", + "version": "1.1.7", "license": "MIT", "dependencies": { "object-sizeof": "^2.6.0" diff --git a/package.json b/package.json index 0b05acf9..a981db11 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.1.6", + "version": "1.1.7", "description": "Create stream processors with AWS Lambda functions.", "keywords": [ "aws",