Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/connectors/s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ class Connector {
.then(async (response) => ({ ...response, Body: await response.Body.transformToString() }));
}

getObjectAsByteArray(inputParams, ctx) {
const params = {
Bucket: this.bucketName,
...inputParams,
};

return this._sendCommand(new GetObjectCommand(params), ctx)
.then(async (response) => ({ ...response, Body: await response.Body.transformToByteArray() }));
}

getObjectStream(inputParams, ctx) {
const params = {
Bucket: this.bucketName,
Expand Down
29 changes: 29 additions & 0 deletions src/queries/s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,35 @@ export const getObjectFromS3AsStream = ({
.flatMap(getObject);
};

export const getObjectFromS3AsByteArray = ({
id: pipelineId,
debug = d('s3'),
bucketName = process.env.BUCKET_NAME,
getRequestField = 'getRequest',
getResponseField = 'getResponse',
parallel = Number(process.env.S3_PARALLEL) || Number(process.env.PARALLEL) || 8,
step = 'get',
...opt
} = {}) => {
const connector = new Connector({
pipelineId, debug, bucketName, ...opt,
});

const getObject = (uow) => {
if (!uow[getRequestField]) return _(Promise.resolve(uow));

const p = () => connector.getObjectAsByteArray(uow[getRequestField], uow)
.then((getResponse) => ({ ...uow, [getResponseField]: getResponse })) // TODO decompress
.catch(rejectWithFault(uow));

return _(uow.metrics?.w(p, step) || p()); // wrap promise in a stream
};

return (s) => s
.map(getObject)
.parallel(parallel);
};

export const splitS3Object = ({
delimiter = '\n',
getResponseField = 'getResponse',
Expand Down
20 changes: 20 additions & 0 deletions test/unit/connectors/s3.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,26 @@ describe('connectors/s3.js', () => {
});
expect(data).to.deep.equal({ Body: 'b' });
});
it('should get object as byte array', async () => {
const arr = new Uint8Array([104, 101, 108, 108, 111]);
const spy = sinon.spy(() => ({ Body: sdkStreamMixin(Readable.from('hello')) }));
mockS3.on(GetObjectCommand).callsFake(spy);

const inputParams = {
Key: 'k1',
};

const data = await new Connector({
debug: debug('s3'),
bucketName: 'b1',
}).getObjectAsByteArray(inputParams);

expect(spy).to.have.been.calledWith({
Bucket: 'b1',
Key: 'k1',
});
expect(data).to.deep.equal({ Body: arr });
});

it('should get object as stream', (done) => {
const spy = sinon.spy(() => ({ Body: sdkStreamMixin(Readable.from(Buffer.from('data'))) }));
Expand Down
58 changes: 58 additions & 0 deletions test/unit/queries/s3.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
toGetObjectRequest2,
getObjectFromS3AsStream,
headS3Object,
getObjectFromS3AsByteArray,
} from '../../../src/queries/s3';

import Connector from '../../../src/connectors/s3';
Expand Down Expand Up @@ -54,6 +55,63 @@ describe('queries/s3.js', () => {
.done(done);
});

it('should get object as byte array', (done) => {
const hello = new Uint8Array([104, 101, 108, 108, 111]);
const stub = sinon.stub(Connector.prototype, 'getObjectAsByteArray').resolves({
Body: hello,
});

const uows = [{
getRequest: {
Key: 'k1',
},
}];

_(uows)
.through(getObjectFromS3AsByteArray())
.collect()
.tap((collected) => {
// console.log(JSON.stringify(collected, null, 2));

expect(stub).to.have.been.calledWith({
Key: 'k1',
});

expect(collected.length).to.equal(1);
expect(collected[0]).to.deep.equal({
getRequest: {
Key: 'k1',
},
getResponse: {
Body: hello,
},
});
})
.done(done);
});
it('should get object as byte array - missing get request field', (done) => {
const hello = new Uint8Array([104, 101, 108, 108, 111]);
const stub = sinon.stub(Connector.prototype, 'getObjectAsByteArray').resolves({
Body: hello,
});

const uows = [{
// missing get request
}];

_(uows)
.through(getObjectFromS3AsByteArray())
.collect()
.tap((collected) => {
// console.log(JSON.stringify(collected, null, 2));

expect(stub).to.have.not.been.called;
expect(collected.length).to.equal(1);
expect(collected[0]).to.deep.equal({});
})
.done(done);
});

it('should get and split object', (done) => {
sinon.stub(Connector.prototype, 'getObject').resolves(GET_RESPONSE);

Expand Down