Commit a017251: remove things

joyceyan committed Nov 7, 2024
1 parent c436039 commit a017251
Showing 14 changed files with 75 additions and 581 deletions.
175 changes: 40 additions & 135 deletions .happy/terraform/modules/sfn/main.tf
@@ -154,7 +154,7 @@ resource "aws_sfn_state_machine" "state_machine" {
       "Validate": {
         "Type": "Task",
         "Resource": "arn:aws:states:::batch:submitJob.sync",
-        "Next": "CxgSeuratParallel",
+        "Next": "Cxg",
         "Parameters": {
           "JobDefinition.$": "$.batch.JobDefinitionName",
           "JobName": "validate",
@@ -194,115 +194,56 @@ resource "aws_sfn_state_machine" "state_machine" {
           }
         ]
       },
-      "CxgSeuratParallel": {
-        "Type": "Parallel",
-        "Next": "HandleSuccess",
-        "Branches": [
-          {
-            "StartAt": "Cxg",
-            "States": {
-              "Cxg": {
-                "Type": "Task",
-                "End": true,
-                "Resource": "arn:aws:states:::batch:submitJob.sync",
-                "Parameters": {
-                  "JobDefinition.$": "$.batch.JobDefinitionName",
-                  "JobName": "cxg",
-                  "JobQueue.$": "$.job_queue",
-                  "ContainerOverrides": {
-                    "Environment": [
-                      {
-                        "Name": "DATASET_VERSION_ID",
-                        "Value.$": "$.dataset_version_id"
-                      },
-                      {
-                        "Name": "STEP_NAME",
-                        "Value": "cxg"
-                      }
-                    ]
-                  }
-                },
-                "Retry": [ {
-                  "ErrorEquals": ["AWS.Batch.TooManyRequestsException", "Batch.BatchException", "Batch.AWSBatchException"],
-                  "IntervalSeconds": 2,
-                  "MaxAttempts": 7,
-                  "BackoffRate": 5
-                } ],
-                "Catch": [
-                  {
-                    "ErrorEquals": [
-                      "States.ALL"
-                    ],
-                    "Next": "CatchCxgFailure",
-                    "ResultPath": "$.error"
-                  }
-                ],
-                "ResultPath": null,
-                "TimeoutSeconds": 360000
-              },
-              "CatchCxgFailure": {
-                "Type": "Pass",
-                "End": true
-              }
-            }
-          },
-          {
-            "StartAt": "Seurat",
-            "States": {
-              "Seurat": {
-                "Type": "Task",
-                "End": true,
-                "Resource": "arn:aws:states:::batch:submitJob.sync",
-                "Parameters": {
-                  "JobDefinition.$": "$.batch.JobDefinitionName",
-                  "JobName": "seurat",
-                  "JobQueue.$": "$.job_queue",
-                  "ContainerOverrides": {
-                    "Environment": [
-                      {
-                        "Name": "DATASET_VERSION_ID",
-                        "Value.$": "$.dataset_version_id"
-                      },
-                      {
-                        "Name": "STEP_NAME",
-                        "Value": "seurat"
-                      }
-                    ]
-                  }
-                },
-                "Retry": [ {
-                  "ErrorEquals": ["AWS.Batch.TooManyRequestsException", "Batch.BatchException", "Batch.AWSBatchException"],
-                  "IntervalSeconds": 2,
-                  "MaxAttempts": 7,
-                  "BackoffRate": 5
-                } ],
-                "Catch": [
-                  {
-                    "ErrorEquals": [
-                      "States.ALL"
-                    ],
-                    "Next": "CatchSeuratFailure",
-                    "ResultPath": "$.error"
-                  }
-                ],
-                "TimeoutSeconds": ${local.timeout}
-              },
-              "CatchSeuratFailure": {
-                "Type": "Pass",
-                "End": true
-              }
-            }
-          }
-        ]
-      },
+      "Cxg": {
+        "Type": "Task",
+        "End": true,
+        "Resource": "arn:aws:states:::batch:submitJob.sync",
+        "Parameters": {
+          "JobDefinition.$": "$.batch.JobDefinitionName",
+          "JobName": "cxg",
+          "JobQueue.$": "$.job_queue",
+          "ContainerOverrides": {
+            "Environment": [
+              {
+                "Name": "DATASET_VERSION_ID",
+                "Value.$": "$.dataset_version_id"
+              },
+              {
+                "Name": "STEP_NAME",
+                "Value": "cxg"
+              }
+            ]
+          }
+        },
+        "Retry": [ {
+          "ErrorEquals": ["AWS.Batch.TooManyRequestsException", "Batch.BatchException", "Batch.AWSBatchException"],
+          "IntervalSeconds": 2,
+          "MaxAttempts": 7,
+          "BackoffRate": 5
+        } ],
+        "Catch": [
+          {
+            "ErrorEquals": [
+              "States.ALL"
+            ],
+            "Next": "CatchCxgFailure",
+            "ResultPath": "$.error"
+          }
+        ],
+        "ResultPath": null,
+        "TimeoutSeconds": 360000
+      },
+      "CatchCxgFailure": {
+        "Type": "Pass",
+        "End": true
+      },
       "HandleSuccess": {
         "Type": "Task",
         "InputPath": "$",
         "Resource": "${var.lambda_success_handler}",
         "Parameters": {
           "execution_id.$": "$$.Execution.Id",
-          "cxg_job.$": "$[0]",
-          "seurat_job.$": "$[1]"
+          "cxg_job.$": "$[0]"
         },
         "Retry": [ {
           "ErrorEquals": ["Lambda.AWSLambdaException"],
@@ -403,42 +344,6 @@ resource "aws_sfn_state_machine" "state_machine" {
 EOF
 }
 
-resource "aws_sfn_state_machine" "state_machine_seurat" {
-  name     = "dp-${var.deployment_stage}-${var.custom_stack_name}-seurat-sfn"
-  role_arn = var.role_arn
-
-  definition = <<EOF
-{
-  "StartAt": "Seurat",
-  "States": {
-    "Seurat": {
-      "Type": "Task",
-      "End": true,
-      "Resource": "arn:aws:states:::batch:submitJob.sync",
-      "Parameters": {
-        "JobDefinition": "${var.job_definition_arn}",
-        "JobName": "seurat",
-        "JobQueue": "${var.job_queue_arn}",
-        "ContainerOverrides": {
-          "Environment": [
-            {
-              "Name": "DATASET_VERSION_ID",
-              "Value.$": "$.dataset_version_id"
-            },
-            {
-              "Name": "STEP_NAME",
-              "Value": "seurat"
-            }
-          ]
-        }
-      },
-      "TimeoutSeconds": ${local.timeout}
-    }
-  }
-}
-EOF
-}
-
 resource "aws_sfn_state_machine" "state_machine_cxg_remaster" {
   name     = "dp-${var.deployment_stage}-${var.custom_stack_name}-cxg-remaster-v2-sfn"
   role_arn = var.role_arn
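With the parallel branch gone, the rendered state machine runs Validate directly into a single Cxg task whose failures land in the CatchCxgFailure Pass state. A quick way to sanity-check the rendered definition is a small script like the following; this is a hypothetical check, not part of the commit, and "definition.json" stands in for whatever file the Terraform heredoc renders to.

import json

# Hypothetical check script: "definition.json" stands in for the rendered
# Amazon States Language document produced from the Terraform heredoc above.
with open("definition.json") as f:
    states = json.load(f)["States"]

# Validate now hands off directly to Cxg instead of the parallel state.
assert states["Validate"]["Next"] == "Cxg"
# The parallel wrapper and both Seurat states are gone.
for name in ("CxgSeuratParallel", "Seurat", "CatchSeuratFailure"):
    assert name not in states
# Cxg still routes its failures to the Pass state.
assert states["Cxg"]["Catch"][0]["Next"] == "CatchCxgFailure"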
81 changes: 2 additions & 79 deletions backend/layers/processing/dataset_metadata_update.py
@@ -9,7 +9,6 @@
 
 import scanpy
 import tiledb
-from rpy2.robjects import StrVector
 from rpy2.robjects.packages import importr
 
 from backend.common.utils.corpora_constants import CorporaConstants
@@ -34,7 +33,6 @@
 from backend.layers.thirdparty.uri_provider import UriProvider
 
 base = importr("base")
-seurat = importr("SeuratObject")
 
 configure_logging(level=logging.INFO)
 
@@ -136,48 +134,6 @@ def update_h5ad(
         finally:
             os.remove(h5ad_filename)
 
-    def update_rds(
-        self,
-        rds_uri: str,
-        new_key_prefix: str,
-        new_dataset_version_id: DatasetVersionId,
-        metadata_update: DatasetArtifactMetadataUpdate,
-    ):
-        seurat_filename = self.download_from_source_uri(
-            source_uri=rds_uri,
-            local_path=CorporaConstants.LABELED_RDS_ARTIFACT_FILENAME,
-        )
-
-        try:
-            self.update_processing_status(
-                new_dataset_version_id, DatasetStatusKey.RDS, DatasetConversionStatus.CONVERTING
-            )
-
-            rds_object = base.readRDS(seurat_filename)
-
-            for key, val in metadata_update.as_dict_without_none_values().items():
-                seurat_metadata = seurat.Misc(object=rds_object)
-                if seurat_metadata.rx2[key]:
-                    val = val if isinstance(val, list) else [val]
-                    seurat_metadata[seurat_metadata.names.index(key)] = StrVector(val)
-
-            base.saveRDS(rds_object, file=seurat_filename)
-
-            self.create_artifact(
-                seurat_filename,
-                DatasetArtifactType.RDS,
-                new_key_prefix,
-                new_dataset_version_id,
-                self.artifact_bucket,
-                DatasetStatusKey.RDS,
-                datasets_bucket=self.datasets_bucket,
-            )
-            self.update_processing_status(
-                new_dataset_version_id, DatasetStatusKey.RDS, DatasetConversionStatus.CONVERTED
-            )
-        finally:
-            os.remove(seurat_filename)
-
     def update_cxg(
         self,
         cxg_uri: str,
@@ -257,19 +213,6 @@ def update_h5ad(
             metadata_update,
         )
 
-    @staticmethod
-    def update_rds(
-        artifact_bucket: str,
-        datasets_bucket: str,
-        rds_uri: str,
-        new_key_prefix: str,
-        new_dataset_version_id: DatasetVersionId,
-        metadata_update: DatasetArtifactMetadataUpdate,
-    ):
-        DatasetMetadataUpdaterWorker(artifact_bucket, datasets_bucket).update_rds(
-            rds_uri, new_key_prefix, new_dataset_version_id, metadata_update
-        )
-
     @staticmethod
     def update_cxg(
         artifact_bucket: str,
@@ -353,28 +296,8 @@ def update_metadata(
             self.logger.error(f"Cannot find labeled H5AD artifact uri for {current_dataset_version_id}.")
             self.update_processing_status(new_dataset_version_id, DatasetStatusKey.H5AD, DatasetConversionStatus.FAILED)
 
-        if DatasetArtifactType.RDS in artifact_uris:
-            self.logger.info("Main: Starting thread for rds update")
-            rds_job = Process(
-                target=DatasetMetadataUpdater.update_rds,
-                args=(
-                    self.artifact_bucket,
-                    self.datasets_bucket,
-                    artifact_uris[DatasetArtifactType.RDS],
-                    new_artifact_key_prefix,
-                    new_dataset_version_id,
-                    metadata_update,
-                ),
-            )
-            artifact_jobs.append(rds_job)
-            rds_job.start()
-        elif current_dataset_version.status.rds_status == DatasetConversionStatus.SKIPPED:
-            self.update_processing_status(new_dataset_version_id, DatasetStatusKey.RDS, DatasetConversionStatus.SKIPPED)
-        else:
-            self.logger.error(
-                f"Cannot find RDS artifact uri for {current_dataset_version_id}, and Conversion Status is not SKIPPED."
-            )
-            self.update_processing_status(new_dataset_version_id, DatasetStatusKey.RDS, DatasetConversionStatus.FAILED)
+        # Mark all RDS conversions as skipped
+        self.update_processing_status(new_dataset_version_id, DatasetStatusKey.RDS, DatasetConversionStatus.SKIPPED)
 
         if DatasetArtifactType.CXG in artifact_uris:
             self.logger.info("Main: Starting thread for cxg update")
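With update_rds removed, update_metadata no longer branches on whether an RDS artifact exists: every dataset version simply has its RDS status marked SKIPPED, and only the remaining artifact types are fanned out as worker processes. A minimal sketch of that Process fan-out pattern follows; update_cxg here is a hypothetical stand-in for the real worker staticmethod, which also takes bucket names and a dataset version id.

from multiprocessing import Process

# Hypothetical stand-in for DatasetMetadataUpdater.update_cxg, which in the
# real module also receives bucket names and a DatasetVersionId.
def update_cxg(cxg_uri: str) -> None:
    print(f"updating cxg artifact at {cxg_uri}")

if __name__ == "__main__":
    artifact_jobs = []
    cxg_job = Process(target=update_cxg, args=("s3://bucket/dataset.cxg",))
    artifact_jobs.append(cxg_job)
    cxg_job.start()
    # Wait for every artifact update to finish before reporting status.
    for job in artifact_jobs:
        job.join()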
21 changes: 0 additions & 21 deletions backend/layers/processing/make_seurat.R

This file was deleted.

7 changes: 0 additions & 7 deletions backend/layers/processing/process.py
@@ -25,7 +25,6 @@
 from backend.layers.processing.process_cxg import ProcessCxg
 from backend.layers.processing.process_download import ProcessDownload
 from backend.layers.processing.process_logic import ProcessingLogic
-from backend.layers.processing.process_seurat import ProcessSeurat
 from backend.layers.processing.process_validate import ProcessValidate
 from backend.layers.processing.schema_migration import SchemaMigrate
 from backend.layers.thirdparty.s3_provider import S3Provider, S3ProviderInterface
@@ -44,7 +43,6 @@ class ProcessMain(ProcessingLogic):
     """
 
     process_validate: ProcessValidate
-    process_seurat: ProcessSeurat
     process_cxg: ProcessCxg
 
     def __init__(
@@ -63,7 +61,6 @@ def __init__(
         self.process_validate = ProcessValidate(
             self.business_logic, self.uri_provider, self.s3_provider, self.schema_validator
         )
-        self.process_seurat = ProcessSeurat(self.business_logic, self.uri_provider, self.s3_provider)
         self.process_cxg = ProcessCxg(self.business_logic, self.uri_provider, self.s3_provider)
         self.schema_migrate = SchemaMigrate(self.business_logic, self.schema_validator)
 
@@ -115,8 +112,6 @@ def process(
             self.process_cxg.process(dataset_version_id, artifact_bucket, cxg_bucket)
         elif step_name == "cxg_remaster":
             self.process_cxg.process(dataset_version_id, artifact_bucket, cxg_bucket, is_reprocess=True)
-        elif step_name == "seurat":
-            self.process_seurat.process(dataset_version_id, artifact_bucket, datasets_bucket)
         else:
             self.logger.error(f"Step function configuration error: Unexpected STEP_NAME '{step_name}'")
 
@@ -143,8 +138,6 @@ def process(
             self.logger.exception(f"An unexpected error occurred while processing the data set: {e}")
             if step_name in ["validate", "download"]:
                 self.update_processing_status(dataset_version_id, DatasetStatusKey.UPLOAD, DatasetUploadStatus.FAILED)
-            elif step_name == "seurat":
-                self.update_processing_status(dataset_version_id, DatasetStatusKey.RDS, DatasetConversionStatus.FAILED)
             elif step_name == "cxg" or step_name == "cxg_remaster":
                 self.update_processing_status(dataset_version_id, DatasetStatusKey.CXG, DatasetConversionStatus.FAILED)
             return False
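After this change, ProcessMain dispatches only the download/validate, cxg, and cxg_remaster steps. A minimal, self-contained sketch of the remaining dispatch; run_validate and run_cxg are hypothetical stand-ins for the real process_validate/process_cxg calls, which also take bucket and dataset-version arguments.

# Hypothetical stand-ins for ProcessValidate.process and ProcessCxg.process.
def run_validate() -> None:
    print("download + validate")

def run_cxg(is_reprocess: bool = False) -> None:
    print(f"cxg conversion (is_reprocess={is_reprocess})")

def dispatch(step_name: str) -> None:
    if step_name in ("download", "validate"):
        run_validate()
    elif step_name == "cxg":
        run_cxg()
    elif step_name == "cxg_remaster":
        run_cxg(is_reprocess=True)
    else:
        raise ValueError(f"Step function configuration error: Unexpected STEP_NAME '{step_name}'")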
2 changes: 1 addition & 1 deletion backend/layers/processing/process_cxg.py
@@ -19,7 +19,7 @@ class ProcessCxg(ProcessingLogic):
     1. Download the labeled h5ad artifact from S3 (uploaded by DownloadAndValidate)
     2. Convert to cxg
     3. Upload the cxg artifact (a directory) to S3
-    If this step completes successfully, and ProcessSeurat is completed, the handle_success lambda will be invoked
+    If this step completes successfully, the handle_success lambda will be invoked
     If this step fails, the handle_failures lambda will be invoked
     """
 
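For context on the docstring above: in the updated state machine, HandleSuccess passes only execution_id and cxg_job to this lambda. Below is a hypothetical sketch of the handler's entry point, inferred solely from those parameter names in the ASL; the real handler implementation is not part of this diff.

# Hypothetical sketch of the handle_success Lambda entry point.
def lambda_handler(event: dict, context) -> dict:
    execution_id = event["execution_id"]  # from "$$.Execution.Id" in the ASL
    cxg_job = event["cxg_job"]            # output of the Cxg Batch job ("$[0]")
    print(f"execution {execution_id}: cxg job complete: {cxg_job}")
    return {"status": "success"}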