diff --git a/src/connectors/s3.js b/src/connectors/s3.js index d7468d1..2993a2d 100644 --- a/src/connectors/s3.js +++ b/src/connectors/s3.js @@ -84,6 +84,16 @@ class Connector { .then(async (response) => ({ ...response, Body: await response.Body.transformToString() })); } + getObjectAsByteArray(inputParams, ctx) { + const params = { + Bucket: this.bucketName, + ...inputParams, + }; + + return this._sendCommand(new GetObjectCommand(params), ctx) + .then(async (response) => ({ ...response, Body: await response.Body.transformToByteArray() })); + } + getObjectStream(inputParams, ctx) { const params = { Bucket: this.bucketName, diff --git a/src/queries/s3.js b/src/queries/s3.js index dc3683d..00d296e 100644 --- a/src/queries/s3.js +++ b/src/queries/s3.js @@ -80,6 +80,35 @@ export const getObjectFromS3AsStream = ({ .flatMap(getObject); }; +export const getObjectFromS3AsByteArray = ({ + id: pipelineId, + debug = d('s3'), + bucketName = process.env.BUCKET_NAME, + getRequestField = 'getRequest', + getResponseField = 'getResponse', + parallel = Number(process.env.S3_PARALLEL) || Number(process.env.PARALLEL) || 8, + step = 'get', + ...opt +} = {}) => { + const connector = new Connector({ + pipelineId, debug, bucketName, ...opt, + }); + + const getObject = (uow) => { + if (!uow[getRequestField]) return _(Promise.resolve(uow)); + + const p = () => connector.getObjectAsByteArray(uow[getRequestField], uow) + .then((getResponse) => ({ ...uow, [getResponseField]: getResponse })) // TODO decompress + .catch(rejectWithFault(uow)); + + return _(uow.metrics?.w(p, step) || p()); // wrap promise in a stream + }; + + return (s) => s + .map(getObject) + .parallel(parallel); +}; + export const splitS3Object = ({ delimiter = '\n', getResponseField = 'getResponse', diff --git a/test/unit/connectors/s3.test.js b/test/unit/connectors/s3.test.js index e12b4c6..baf11be 100644 --- a/test/unit/connectors/s3.test.js +++ b/test/unit/connectors/s3.test.js @@ -95,6 +95,26 @@ describe('connectors/s3.js', () => { }); expect(data).to.deep.equal({ Body: 'b' }); }); + it('should get object as byte array', async () => { + const arr = new Uint8Array([104, 101, 108, 108, 111]); + const spy = sinon.spy(() => ({ Body: sdkStreamMixin(Readable.from('hello')) })); + mockS3.on(GetObjectCommand).callsFake(spy); + + const inputParams = { + Key: 'k1', + }; + + const data = await new Connector({ + debug: debug('s3'), + bucketName: 'b1', + }).getObjectAsByteArray(inputParams); + + expect(spy).to.have.been.calledWith({ + Bucket: 'b1', + Key: 'k1', + }); + expect(data).to.deep.equal({ Body: arr }); + }); it('should get object as stream', (done) => { const spy = sinon.spy(() => ({ Body: sdkStreamMixin(Readable.from(Buffer.from('data'))) })); diff --git a/test/unit/queries/s3.test.js b/test/unit/queries/s3.test.js index c44cb56..4251599 100644 --- a/test/unit/queries/s3.test.js +++ b/test/unit/queries/s3.test.js @@ -13,6 +13,7 @@ import { toGetObjectRequest2, getObjectFromS3AsStream, headS3Object, + getObjectFromS3AsByteArray, } from '../../../src/queries/s3'; import Connector from '../../../src/connectors/s3'; @@ -54,6 +55,63 @@ describe('queries/s3.js', () => { .done(done); }); + it('should get object as byte array', (done) => { + const hello = new Uint8Array([104, 101, 108, 108, 111]); + const stub = sinon.stub(Connector.prototype, 'getObjectAsByteArray').resolves({ + Body: hello, + }); + + const uows = [{ + getRequest: { + Key: 'k1', + }, + }]; + + _(uows) + .through(getObjectFromS3AsByteArray()) + .collect() + .tap((collected) => { + // console.log(JSON.stringify(collected, null, 2)); + + expect(stub).to.have.been.calledWith({ + Key: 'k1', + }); + + expect(collected.length).to.equal(1); + expect(collected[0]).to.deep.equal({ + getRequest: { + Key: 'k1', + }, + getResponse: { + Body: hello, + }, + }); + }) + .done(done); + }); + it('should get object as byte array - missing get request field', (done) => { + const hello = new Uint8Array([104, 101, 108, 108, 111]); + const stub = sinon.stub(Connector.prototype, 'getObjectAsByteArray').resolves({ + Body: hello, + }); + + const uows = [{ + // missing get request + }]; + + _(uows) + .through(getObjectFromS3AsByteArray()) + .collect() + .tap((collected) => { + // console.log(JSON.stringify(collected, null, 2)); + + expect(stub).to.have.not.been.called; + expect(collected.length).to.equal(1); + expect(collected[0]).to.deep.equal({}); + }) + .done(done); + }); + it('should get and split object', (done) => { sinon.stub(Connector.prototype, 'getObject').resolves(GET_RESPONSE);