Skip to content

Commit

Permalink
fastq_config_from_uri
Browse files Browse the repository at this point in the history
  • Loading branch information
drernie committed Dec 7, 2023
1 parent 32aa93e commit 4ef070d
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 110 deletions.
9 changes: 7 additions & 2 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,19 @@ export class Constants {
const template = handlebars.compile(data);
data = template(env);
}
var parsed: any;

if (extension === 'yaml' || extension === 'yml') {
return yaml.load(data) as KeyedConfig;
parsed = yaml.load(data);
} else if (extension === 'json') {
return JSON.parse(data);
parsed = JSON.parse(data);
} else {
throw new Error(`Unsupported file extension: ${extension}`);
}
if (Array.isArray(parsed)) {
return parsed[0] as KeyedConfig;
}
return parsed as KeyedConfig;
}

public static async LoadPipeline(pipeline: string, env: any = {}) {
Expand Down
114 changes: 29 additions & 85 deletions src/omics-quilt.fastq.ts
Original file line number Diff line number Diff line change
@@ -1,50 +1,19 @@

import { createWriteStream, readFile } from 'fs';
import { promisify } from 'util';
import {
OmicsClient,
RunLogLevel,
StartRunCommand,
StartRunCommandInput,
WorkflowType,
} from '@aws-sdk/client-omics';
import {
GetObjectCommand,
} from '@aws-sdk/client-s3';
import { v4 as uuidv4 } from 'uuid';
import { Constants } from './constants';
import { Constants, KeyedConfig } from './constants';

const OUTPUT_S3_LOCATION = process.env.OUTPUT_S3_LOCATION!;
const OMICS_ROLE = process.env.OMICS_ROLE!;
const WORKFLOW_ID = process.env.WORKFLOW_ID!;
const LOG_LEVEL = process.env.LOG_LEVEL!;

/**
 * Download an S3 object, parse it, and save the parsed result to a local file.
 *
 * The object body is read as text and handed to `Constants.LoadObjectData`
 * together with the lower-cased extension of `local_file`; the value that comes
 * back is then written to `local_file`. The function resolves only after the
 * file has been fully flushed, so callers may read it immediately afterwards.
 *
 * A missing object (`NoSuchKey`) is logged and otherwise ignored, in which case
 * no local file is created. Any other failure is rethrown.
 *
 * @param bucket - Name of the S3 bucket holding the object.
 * @param key - Key of the object inside the bucket.
 * @param local_file - Destination path; its extension is passed to the parser.
 * @throws Any download, parse, or write error other than `NoSuchKey`.
 */
async function download_s3_file(
  bucket: string,
  key: string,
  local_file: string,
) {
  const command = new GetObjectCommand({
    Bucket: bucket,
    Key: key,
  });
  const s3Client = Constants.DefaultS3();
  // `split` always yields at least one element, so the fallback is only there
  // to satisfy the type checker without a non-null assertion.
  const extension = local_file.split('.').pop()?.toLowerCase() ?? '';

  try {
    const response = await s3Client.send(command);
    if (!response.Body) {
      throw new Error(`Empty response body for s3://${bucket}/${key}`);
    }
    const contents = await response.Body.transformToString();
    const data = Constants.LoadObjectData(contents, extension);

    // A file stream only accepts strings/buffers. NOTE(review): LoadObjectData
    // appears to return a parsed object, so it is serialized as JSON here;
    // confirm that JSON is the desired on-disk form for YAML sources too.
    const payload = typeof data === 'string' ? data : JSON.stringify(data);

    // Open the stream only once there is something to write (so a failed
    // download leaves no empty file behind) and wait for the flush to finish.
    await new Promise<void>((resolve, reject) => {
      const writer = createWriteStream(local_file);
      writer.on('error', reject);
      writer.on('finish', () => resolve());
      writer.end(payload);
    });
  } catch (e: any) {
    // AWS SDK v3 exposes the error type on `name`; `code` is kept for
    // compatibility with older error shapes.
    if (e?.name === 'NoSuchKey' || e?.code === 'NoSuchKey') {
      console.error('The object does not exist.');
    } else {
      throw e;
    }
  }
}

async function start_omics_run(options: StartRunCommandInput) {
const omicsClient = new OmicsClient();
Expand All @@ -53,32 +22,19 @@ async function start_omics_run(options: StartRunCommandInput) {
return response;
}

export async function fastq_config_from_json(manifest_json_file: string) {
const contents = await promisify(readFile)(manifest_json_file, 'utf8');
console.debug(`fastq_config_from_json[${manifest_json_file}]:\n${contents}`);
const samples = JSON.parse(contents);
if (!Array.isArray(samples)) {
throw new Error(`samples is not an array: ${samples}`);
}
const samples_params = [];
for (const _sample of samples) {
if (typeof _sample !== 'object') {
throw new Error(`sample is not an object: ${_sample}`);
}
console.info(`Creating input payload for sample: ${_sample}`);
const _params: Record<string, any> = {};
_params.sample_name = _sample.sample_name;
_params.fastq_pairs = [];
_params.fastq_pairs.push({
read_group: _sample.read_group as string,
fastq_1: _sample.fastq_1 as string,
fastq_2: _sample.fastq_2 as string,
platform: _sample.platform as string,
});
samples_params.push(_params);
}

return samples_params;
/**
 * Load a single-sample manifest from `uri` and shape it into workflow
 * parameters.
 *
 * The manifest is fetched through `Constants.LoadObjectURI`, logged, and then
 * mapped to an object holding the sample name plus a one-element `fastq_pairs`
 * list built from the manifest's read group, both FASTQ locations and platform.
 *
 * @param uri - Location of the manifest, passed unchanged to the loader.
 * @returns Parameters with `sample_name` and a single-entry `fastq_pairs` array.
 */
export async function fastq_config_from_uri(uri: string) {
  const manifest: KeyedConfig = await Constants.LoadObjectURI(uri);
  console.info(`Loaded JSON manifest:\n${JSON.stringify(manifest, null, 2)}`);

  // Built in one literal; properties are read in the same order as before.
  const result: Record<string, any> = {
    sample_name: manifest.sample_name,
    fastq_pairs: [
      {
        read_group: manifest.read_group as string,
        fastq_1: manifest.fastq_1 as string,
        fastq_2: manifest.fastq_2 as string,
        platform: manifest.platform as string,
      },
    ],
  };
  return result;
}

export async function handler(event: any, context: any) {
Expand All @@ -96,38 +52,26 @@ export async function handler(event: any, context: any) {
} else {
throw new Error('Multiple s3 files in event not yet supported');
}

var local_file = context.local_file || '/tmp/sample_manifest.json';
if (!context.local_file) {
await download_s3_file(bucket_name, filename, local_file);
}
console.info(`Downloaded manifest JSON to: ${local_file}`);

const multi_sample_params = await fastq_config_from_json(local_file);
const uri = context.local_file || `s3://${bucket_name}/${filename}`;
const item = await fastq_config_from_uri(uri);
let error_count = 0;
for (const _item of multi_sample_params) {
error_count += await run_workflow(
_item,
bucket_name,
filename,
error_count,
context,
);
}
error_count += await run_workflow(
item,
uri,
context,
);

if (error_count > 0) {
throw new Error('Error launching some workflows, check logs');
}
return { message: 'Success' };
}
async function run_workflow(
_item: Record<string, string>,
bucket_name: string,
filename: string,
error_count: number,
item: Record<string, string>,
uri: string,
context: any,
) {
const _samplename = _item.sample_name;
const _samplename = item.sample_name;
console.info(`Starting workflow for sample: ${_samplename}`);
const uuid = uuidv4();
const run_name = `Sample_${_samplename}_${uuid}`;
Expand All @@ -137,13 +81,13 @@ async function run_workflow(
workflowId: WORKFLOW_ID,
name: run_name,
roleArn: OMICS_ROLE,
parameters: _item,
parameters: item,
logLevel: LOG_LEVEL as RunLogLevel,
outputUri: OUTPUT_S3_LOCATION,
tags: {
SOURCE: 'LAMBDA_WF1_FASTQ',
SOURCE: 'LAMBDA_FASTQ',
RUN_NAME: run_name,
SAMPLE_MANIFEST: `s3://${bucket_name}/${filename}`,
SAMPLE_MANIFEST: uri,
},
requestId: uuid,
};
Expand All @@ -158,7 +102,7 @@ async function run_workflow(
}
} catch (e: any) {
console.error('Error : ' + e.toString());
error_count += 1;
return 1;
}
return error_count;
return 0;
}
2 changes: 1 addition & 1 deletion test/constants.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ describe('Constants', () => {
}

it('should return the value for a given key path', () => {
checkKeyPathValue('0.platform', 'illumina');
checkKeyPathValue('platform', 'illumina');
});

it('should return undefined if the key path does not exist', () => {
Expand Down
39 changes: 39 additions & 0 deletions test/events/event-fastq.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{
"Records": [
{
"eventVersion": "2.1",
"eventSource": "aws:s3",
"awsRegion": "us-west-2",
"eventTime": "2023-12-07T00:01:19.408Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "AWS:AIDA4MFXGKBG5A5H7C2P6"
},
"requestParameters": {
"sourceIPAddress": "104.28.111.127"
},
"responseElements": {
"x-amz-request-id": "ETYMDP7H46PCZ7E5",
"x-amz-id-2": "wgTZZp3L8U7wCcWgVCIOM9Sq0JNjpAo4bn96b8h+2Rd/9Nl+HVlAeGSSmjhb0rEIhF3h3QIHoyTC2NS7qZR6flN4JEAYThSa"
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "MzBiMzdiNmEtZDlkNS00M2E0LWE2YmUtNzU5MWVjMDU0MjA0",
"bucket": {
"name": "omics-quilt-omicsquiltckainput850787717197uswest2-mxbkungl5wod",
"ownerIdentity": {
"principalId": "A2UU9N6TQHWEL4"
},
"arn": "arn:aws:s3:::omics-quilt-omicsquiltckainput850787717197uswest2-mxbkungl5wod"
},
"object": {
"key": "fastq/us-west-2/us-west-2.json",
"size": 435,
"eTag": "0d3747d0a320638bde5a5e4900d9a93b",
"versionId": "jOV.tQlvWPncrW6HDgLhpnZrL9KnJVAy",
"sequencer": "0065710B4F5CF19691"
}
}
}
]
}
2 changes: 1 addition & 1 deletion test/fastq-config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { fastqConfig } from '../src/fastq-config';

describe('fastqConfig', () => {
it('should return the correct regional manifest', () => {
const region = 'us-test-0';
const region = 'us-west-2';
const timestamp = '2020-01-01T00:00:00.000Z';
const expected_folder = `workflows/fastq/${region}`;
const expected_file = `${expected_folder}/${region}.json`;
Expand Down
14 changes: 5 additions & 9 deletions test/omics-quilt.fastq.test.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
// import exp from 'constants'
import { TEST_EVENT } from './fixture';
import {
fastq_config_from_json,
fastq_config_from_uri,
handler,
} from '../src/omics-quilt.fastq';


const CONTEXT = {
debug: true,
local_file: 'workflows/fastq/aws_region.json',
local_file: './workflows/fastq/aws_region.json',
};

describe('fastq_config_from_json', () => {
it('should return a list of sample params', async () => {
// Make the test function async
const result: any[] = await fastq_config_from_json(CONTEXT.local_file);
expect(result).toBeDefined();
expect(result.length).toEqual(1);
const sample = result[0];
describe('fastq_config_from_uri', () => {
it('should return a single sample', async () => {
const sample = await fastq_config_from_uri(CONTEXT.local_file);
expect(sample).toBeDefined();
expect(typeof sample).toEqual('object');
expect(sample.sample_name).toEqual('NA12878');
Expand Down
11 changes: 0 additions & 11 deletions workflows/fastq/us-test-0/us-test-0.json

This file was deleted.

2 changes: 1 addition & 1 deletion workflows/fastq/us-west-2/us-west-2.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
"fastq_2": "s3://aws-genomics-static-us-west-2/omics-tutorials/data/fastq/NA12878/Sample_U0a/U0a_CGATGT_L001_R2_001.fastq.gz",
"platform": "illumina",
"region": "us-west-2",
"timestamp": "1701733322002"
"timestamp": "2020-01-01T00:00:00.000Z"
}
]

0 comments on commit 4ef070d

Please sign in to comment.