Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: notify omics result #4

Merged
merged 10 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
exclude: ".projen*|.git*|test/__*|benchling_packager.yaml"
exclude: ".projen*|.git*|test/__*|ts*|LICENSE|.npm*|cdk*"
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
Expand Down
2 changes: 1 addition & 1 deletion .projenrc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ const project = new awscdk.AwsCdkTypeScriptApp({
project.tryFindObjectFile('.github/workflows/build.yml')!.addOverride('jobs.build.env', {
CI: 'true',
AWS_ACCESS_KEY_ID: '${{ secrets.AWS_ACCESS_KEY_ID }}',
AWS_SECRET_ACCESS_KEY: '${{ secrets.AWS_SECRET_ACCESS_KEY }}',
AWS_ACCOUNT_ID: '${{ secrets.AWS_ACCOUNT_ID }}',
AWS_DEFAULT_REGION: '${{ secrets.AWS_DEFAULT_REGION }}',
AWS_SECRET_ACCESS_KEY: '${{ secrets.AWS_SECRET_ACCESS_KEY }}',
CDK_APP_NAME: '${{ secrets.CDK_APP_NAME }}',
CDK_DEFAULT_ACCOUNT: '${{ secrets.AWS_ACCOUNT_ID }}',
CDK_DEFAULT_REGION: '${{ secrets.AWS_DEFAULT_REGION }}',
Expand Down
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,30 @@
# omics-quilt-demo

Use CDK to create Quilt packages from AWS HealthOmics

## Usage

```bash
aws configure list-profiles
yarn install
npm run deploy
```

## Development

Uses [pre-commit](https://pre-commit.com/) to pre-lint files.

```bash
pre-commit install
pre-commit run --all-files
```

Uses [projen](https://github.com/projen/projen) to manage project files.

```bash
yarn install
npm run projen
npm run eslint
npm run build
npm run test:watch
```
6 changes: 3 additions & 3 deletions package.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,19 @@ export class Constants {
const template = handlebars.compile(data);
data = template(env);
}
var parsed: any;

if (extension === 'yaml' || extension === 'yml') {
return yaml.load(data) as KeyedConfig;
parsed = yaml.load(data);
} else if (extension === 'json') {
return JSON.parse(data);
parsed = JSON.parse(data);
} else {
throw new Error(`Unsupported file extension: ${extension}`);
}
if (Array.isArray(parsed)) {
return parsed[0] as KeyedConfig;
}
return parsed as KeyedConfig;
}

public static async LoadPipeline(pipeline: string, env: any = {}) {
Expand Down
File renamed without changes.
114 changes: 29 additions & 85 deletions src/omics-quilt.fastq.ts
Original file line number Diff line number Diff line change
@@ -1,50 +1,19 @@

import { createWriteStream, readFile } from 'fs';
import { promisify } from 'util';
import {
OmicsClient,
RunLogLevel,
StartRunCommand,
StartRunCommandInput,
WorkflowType,
} from '@aws-sdk/client-omics';
import {
GetObjectCommand,
} from '@aws-sdk/client-s3';
import { v4 as uuidv4 } from 'uuid';
import { Constants } from './constants';
import { Constants, KeyedConfig } from './constants';

const OUTPUT_S3_LOCATION = process.env.OUTPUT_S3_LOCATION!;
const OMICS_ROLE = process.env.OMICS_ROLE!;
const WORKFLOW_ID = process.env.WORKFLOW_ID!;
const LOG_LEVEL = process.env.LOG_LEVEL!;

/**
 * Downloads an S3 object, parses it with `Constants.LoadObjectData`, and
 * writes the parsed result to `local_file`.
 *
 * A missing object is logged and `local_file` is not created; every other
 * error is rethrown to the caller.
 *
 * @param bucket - Name of the S3 bucket holding the object.
 * @param key - Key of the object inside `bucket`.
 * @param local_file - Destination path; its lower-cased extension is handed
 *   to `Constants.LoadObjectData` to select the parser.
 */
async function download_s3_file(
  bucket: string,
  key: string,
  local_file: string,
) {
  const command = new GetObjectCommand({
    Bucket: bucket,
    Key: key,
  });
  const s3Client = Constants.DefaultS3();
  const extension = local_file.split('.').pop()?.toLowerCase();

  try {
    const response = await s3Client.send(command);
    const contents = await response.Body!.transformToString();
    const data = Constants.LoadObjectData(contents, extension!);
    // File streams only accept strings/buffers, so a parsed object has to be
    // serialized before it is written.
    // NOTE(review): assumes LoadObjectData is synchronous, as the original
    // un-awaited call implies — confirm.
    const payload = typeof data === 'string' ? data : JSON.stringify(data);
    // Open the stream only once there is something to write, and wait for
    // the flush so callers can read the file as soon as this resolves.
    await new Promise<void>((resolve, reject) => {
      const writer = createWriteStream(local_file);
      writer.on('error', reject);
      writer.end(payload, () => resolve());
    });
  } catch (e: any) {
    // AWS SDK v3 identifies service exceptions through `name`; the legacy
    // `code` check is kept as a fallback.
    if (e.name === 'NoSuchKey' || e.code === 'NoSuchKey') {
      console.error('The object does not exist.');
    } else {
      throw e;
    }
  }
}

async function start_omics_run(options: StartRunCommandInput) {
const omicsClient = new OmicsClient();
Expand All @@ -53,32 +22,19 @@ async function start_omics_run(options: StartRunCommandInput) {
return response;
}

export async function fastq_config_from_json(manifest_json_file: string) {
const contents = await promisify(readFile)(manifest_json_file, 'utf8');
console.debug(`fastq_config_from_json[${manifest_json_file}]:\n${contents}`);
const samples = JSON.parse(contents);
if (!Array.isArray(samples)) {
throw new Error(`samples is not an array: ${samples}`);
}
const samples_params = [];
for (const _sample of samples) {
if (typeof _sample !== 'object') {
throw new Error(`sample is not an object: ${_sample}`);
}
console.info(`Creating input payload for sample: ${_sample}`);
const _params: Record<string, any> = {};
_params.sample_name = _sample.sample_name;
_params.fastq_pairs = [];
_params.fastq_pairs.push({
read_group: _sample.read_group as string,
fastq_1: _sample.fastq_1 as string,
fastq_2: _sample.fastq_2 as string,
platform: _sample.platform as string,
});
samples_params.push(_params);
}

return samples_params;
/**
 * Loads a sample manifest from `uri` and reshapes it into workflow parameters.
 *
 * @param uri - Manifest location, passed unchanged to `Constants.LoadObjectURI`.
 * @returns An object with `sample_name` and a one-element `fastq_pairs` list
 *   carrying the manifest's `read_group`, `fastq_1`, `fastq_2` and `platform`.
 */
export async function fastq_config_from_uri(uri: string) {
  const manifest: KeyedConfig = await Constants.LoadObjectURI(uri);
  console.info(`Loaded JSON manifest:\n${JSON.stringify(manifest, null, 2)}`);

  // Assemble the payload in one literal; the manifest describes a single
  // sample, so exactly one FASTQ pair is emitted.
  const params: Record<string, any> = {
    sample_name: manifest.sample_name,
    fastq_pairs: [
      {
        read_group: manifest.read_group as string,
        fastq_1: manifest.fastq_1 as string,
        fastq_2: manifest.fastq_2 as string,
        platform: manifest.platform as string,
      },
    ],
  };
  return params;
}

export async function handler(event: any, context: any) {
Expand All @@ -96,38 +52,26 @@ export async function handler(event: any, context: any) {
} else {
throw new Error('Multiple s3 files in event not yet supported');
}

var local_file = context.local_file || '/tmp/sample_manifest.json';
if (!context.local_file) {
await download_s3_file(bucket_name, filename, local_file);
}
console.info(`Downloaded manifest JSON to: ${local_file}`);

const multi_sample_params = await fastq_config_from_json(local_file);
const uri = context.local_file || `s3://${bucket_name}/${filename}`;
const item = await fastq_config_from_uri(uri);
let error_count = 0;
for (const _item of multi_sample_params) {
error_count += await run_workflow(
_item,
bucket_name,
filename,
error_count,
context,
);
}
error_count += await run_workflow(
item,
uri,
context,
);

if (error_count > 0) {
throw new Error('Error launching some workflows, check logs');
}
return { message: 'Success' };
}
async function run_workflow(
_item: Record<string, string>,
bucket_name: string,
filename: string,
error_count: number,
item: Record<string, string>,
uri: string,
context: any,
) {
const _samplename = _item.sample_name;
const _samplename = item.sample_name;
console.info(`Starting workflow for sample: ${_samplename}`);
const uuid = uuidv4();
const run_name = `Sample_${_samplename}_${uuid}`;
Expand All @@ -137,13 +81,13 @@ async function run_workflow(
workflowId: WORKFLOW_ID,
name: run_name,
roleArn: OMICS_ROLE,
parameters: _item,
parameters: item,
logLevel: LOG_LEVEL as RunLogLevel,
outputUri: OUTPUT_S3_LOCATION,
tags: {
SOURCE: 'LAMBDA_WF1_FASTQ',
SOURCE: 'LAMBDA_FASTQ',
RUN_NAME: run_name,
SAMPLE_MANIFEST: `s3://${bucket_name}/${filename}`,
SAMPLE_MANIFEST: uri,
},
requestId: uuid,
};
Expand All @@ -158,7 +102,7 @@ async function run_workflow(
}
} catch (e: any) {
console.error('Error : ' + e.toString());
error_count += 1;
return 1;
}
return error_count;
return 0;
}
97 changes: 66 additions & 31 deletions src/omics-quilt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import {
BucketEncryption,
} from 'aws-cdk-lib/aws-s3';
import { Topic } from 'aws-cdk-lib/aws-sns';
import { EmailSubscription } from 'aws-cdk-lib/aws-sns-subscriptions';
import { type Construct } from 'constructs';
import { Constants } from './constants';

export class OmicsQuiltStack extends Stack {
public readonly inputBucket: Bucket;
public readonly outputBucket: Bucket;
public readonly statusTopic: Topic;

public readonly manifest_prefix: string;
public readonly manifest_suffix: string;
Expand All @@ -46,17 +46,58 @@ export class OmicsQuiltStack extends Stack {
this.inputBucket = this.makeBucket('input');
this.outputBucket = this.makeBucket('output');

// SNS Topic for failure notifications
const topicName = `${this.cc.app}_workflow_status_topic`;
this.statusTopic = new Topic(this, topicName, {
// SNS Topic for Workflow notifications
this.makeStatusNotifications(this.principal); // for debugging purposes

// Create an IAM service role for HealthOmics workflows
this.omicsRole = this.makeOmicsRole();

// Create an IAM role for the Lambda functions
this.lambdaRole = this.makeLambdaRole();

// Create Lambda function to submit initial HealthOmics workflow
const fastqLambda = this.makeLambda('fastq', {});
// Add S3 event source to Lambda
const fastqTrigger = new S3EventSource(this.inputBucket, {
events: [EventType.OBJECT_CREATED],
filters: [
{ prefix: this.manifest_prefix, suffix: this.manifest_suffix },
],
});
fastqLambda.addEventSource(fastqTrigger);
}

/**
 * Creates the status SNS topic and wires event rules to it.
 *
 * The topic gets an email subscription for the address stored under
 * `CDK_DEFAULT_EMAIL`, publish grants for EventBridge and for `principal`,
 * and is added as the target of the Omics rule and of one bucket rule each
 * for the input and output buckets.
 *
 * @param principal - Additional principal granted publish rights on the topic.
 * @returns The newly created topic.
 */
private makeStatusNotifications(principal: AccountPrincipal) {
  const name = `${this.cc.app}-status-topic`;
  const topic = new Topic(this, name, { displayName: name, topicName: name });
  topic.addSubscription(new EmailSubscription(this.cc.get('CDK_DEFAULT_EMAIL')));

  // EventBridge must be allowed to publish for the rule targets to deliver.
  topic.grantPublish(new ServicePrincipal('events.amazonaws.com'));
  topic.grantPublish(principal);

  // Omics status changes -> topic
  this.makeOmicsRule(`${name}-omics-rule`).addTarget(new SnsTopic(topic));

  // S3 bucket events -> topic, input bucket first, then output bucket
  const watched: Array<[Bucket, string]> = [
    [this.inputBucket, 'input'],
    [this.outputBucket, 'output'],
  ];
  for (const [bucket, label] of watched) {
    const bucketRule = this.makeBucketEventRule(bucket, `${name}-${label}-bucket-rule`);
    bucketRule.addTarget(new SnsTopic(topic));
  }
  return topic;
}

// Create an EventBridge rule that sends SNS notification on failure
const ruleWorkflowStatusTopic = new Rule(
this,
`${this.cc.app}_rule_workflow_status_topic`,
private makeOmicsRule(ruleName: string) {
const ruleOmics = new Rule(this, ruleName,
{
eventPattern: {
source: ['aws.omics'],
Expand All @@ -67,30 +108,24 @@ export class OmicsQuiltStack extends Stack {
},
},
);
return ruleOmics;
}

ruleWorkflowStatusTopic.addTarget(new SnsTopic(this.statusTopic));

const servicePrincipal = new ServicePrincipal('events.amazonaws.com');
this.statusTopic.grantPublish(servicePrincipal);
this.statusTopic.grantPublish(this.principal); // for debugging purposes

// Create an IAM service role for HealthOmics workflows
this.omicsRole = this.makeOmicsRole();

// Create an IAM role for the Lambda functions
this.lambdaRole = this.makeLambdaRole();

// Create Lambda function to submit initial HealthOmics workflow
const fastqWorkflowLambda = this.makeLambda('fastq', {});
// Add S3 event source to Lambda
fastqWorkflowLambda.addEventSource(
new S3EventSource(this.inputBucket, {
events: [EventType.OBJECT_CREATED],
filters: [
{ prefix: this.manifest_prefix, suffix: this.manifest_suffix },
],
}),
);
/**
 * Creates an EventBridge rule matching CloudTrail-recorded `PutObject` and
 * `CompleteMultipartUpload` calls against the given bucket.
 *
 * NOTE(review): presumably this only fires when CloudTrail data events are
 * enabled for the bucket — confirm.
 *
 * @param bucket - Bucket whose name is matched in the request parameters.
 * @param ruleName - Construct id for the new rule.
 * @returns The created rule, with no targets attached yet.
 */
private makeBucketEventRule(bucket: Bucket, ruleName: string) {
  const eventPattern = {
    source: ['aws.s3'],
    detailType: ['AWS API Call via CloudTrail'],
    detail: {
      eventSource: ['s3.amazonaws.com'],
      eventName: ['PutObject', 'CompleteMultipartUpload'],
      requestParameters: {
        bucketName: [bucket.bucketName],
      },
    },
  };
  return new Rule(this, ruleName, { eventPattern });
}

private makeBucket(type: string) {
Expand Down
Loading