
feat: remove seurat [do not merge yet] #7380

Closed
wants to merge 12 commits
176 changes: 40 additions & 136 deletions .happy/terraform/modules/sfn/main.tf
@@ -1,4 +1,3 @@
# Same file as https://github.com/chanzuckerberg/single-cell-infra/blob/main/.happy/terraform/modules/sfn/main.tf
# This is used for environment (dev, staging, prod) deployments
locals {
timeout = 86400 # 24 hours
@@ -154,7 +153,7 @@ resource "aws_sfn_state_machine" "state_machine" {
"Validate": {
"Type": "Task",
"Resource": "arn:aws:states:::batch:submitJob.sync",
"Next": "CxgSeuratParallel",
"Next": "Cxg",
"Parameters": {
"JobDefinition.$": "$.batch.JobDefinitionName",
"JobName": "validate",
@@ -194,115 +193,56 @@ resource "aws_sfn_state_machine" "state_machine" {
}
]
},
"CxgSeuratParallel": {
"Type": "Parallel",
"Cxg": {
"Type": "Task",
"Next": "HandleSuccess",
"Branches": [
{
"StartAt": "Cxg",
"States": {
"Cxg": {
"Type": "Task",
"End": true,
"Resource": "arn:aws:states:::batch:submitJob.sync",
"Parameters": {
"JobDefinition.$": "$.batch.JobDefinitionName",
"JobName": "cxg",
"JobQueue.$": "$.job_queue",
"ContainerOverrides": {
"Environment": [
{
"Name": "DATASET_VERSION_ID",
"Value.$": "$.dataset_version_id"
},
{
"Name": "STEP_NAME",
"Value": "cxg"
}
]
}
},
"Retry": [ {
"ErrorEquals": ["AWS.Batch.TooManyRequestsException", "Batch.BatchException", "Batch.AWSBatchException"],
"IntervalSeconds": 2,
"MaxAttempts": 7,
"BackoffRate": 5
} ],
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "CatchCxgFailure",
"ResultPath": "$.error"
}
],
"ResultPath": null,
"TimeoutSeconds": 360000
"Resource": "arn:aws:states:::batch:submitJob.sync",
"Parameters": {
"JobDefinition.$": "$.batch.JobDefinitionName",
"JobName": "cxg",
"JobQueue.$": "$.job_queue",
"ContainerOverrides": {
"Environment": [
{
"Name": "DATASET_VERSION_ID",
"Value.$": "$.dataset_version_id"
},
"CatchCxgFailure": {
"Type": "Pass",
"End": true
{
"Name": "STEP_NAME",
"Value": "cxg"
}
}
},
]
}
},
"Retry": [ {
"ErrorEquals": ["AWS.Batch.TooManyRequestsException", "Batch.BatchException", "Batch.AWSBatchException"],
"IntervalSeconds": 2,
"MaxAttempts": 7,
"BackoffRate": 5
} ],
"Catch": [
{
"StartAt": "Seurat",
"States": {
"Seurat": {
"Type": "Task",
"End": true,
"Resource": "arn:aws:states:::batch:submitJob.sync",
"Parameters": {
"JobDefinition.$": "$.batch.JobDefinitionName",
"JobName": "seurat",
"JobQueue.$": "$.job_queue",
"ContainerOverrides": {
"Environment": [
{
"Name": "DATASET_VERSION_ID",
"Value.$": "$.dataset_version_id"
},
{
"Name": "STEP_NAME",
"Value": "seurat"
}
]
}
},
"Retry": [ {
"ErrorEquals": ["AWS.Batch.TooManyRequestsException", "Batch.BatchException", "Batch.AWSBatchException"],
"IntervalSeconds": 2,
"MaxAttempts": 7,
"BackoffRate": 5
} ],
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "CatchSeuratFailure",
"ResultPath": "$.error"
}
],
"TimeoutSeconds": ${local.timeout}
},
"CatchSeuratFailure": {
"Type": "Pass",
"End": true
}
}
"ErrorEquals": [
"States.ALL"
],
"Next": "CatchCxgFailure",
"ResultPath": "$.error"
}
]
],
"ResultPath": null,
"TimeoutSeconds": 360000
},
"CatchCxgFailure": {
"Type": "Pass",
"End": true
},
"HandleSuccess": {
"Type": "Task",
"InputPath": "$",
"Resource": "${var.lambda_success_handler}",
"Parameters": {
"execution_id.$": "$$.Execution.Id",
"cxg_job.$": "$[0]",
"seurat_job.$": "$[1]"
"cxg_job.$": "$"
},
"Retry": [ {
"ErrorEquals": ["Lambda.AWSLambdaException"],
@@ -351,7 +291,7 @@ resource "aws_sfn_state_machine" "state_machine" {
"Type": "Task",
"Next": "CheckForErrors",
"Parameters": {
"JobDefinition.$": "$[0].batch.JobDefinitionName"
"JobDefinition.$": "$.batch.JobDefinitionName"
},
"Resource": "arn:aws:states:::aws-sdk:batch:deregisterJobDefinition",
"Retry": [ {
@@ -403,42 +343,6 @@ resource "aws_sfn_state_machine" "state_machine" {
EOF
}

resource "aws_sfn_state_machine" "state_machine_seurat" {
name = "dp-${var.deployment_stage}-${var.custom_stack_name}-seurat-sfn"
role_arn = var.role_arn

definition = <<EOF
{
"StartAt": "Seurat",
"States": {
"Seurat": {
"Type": "Task",
"End": true,
"Resource": "arn:aws:states:::batch:submitJob.sync",
"Parameters": {
"JobDefinition": "${var.job_definition_arn}",
"JobName": "seurat",
"JobQueue": "${var.job_queue_arn}",
"ContainerOverrides": {
"Environment": [
{
"Name": "DATASET_VERSION_ID",
"Value.$": "$.dataset_version_id"
},
{
"Name": "STEP_NAME",
"Value": "seurat"
}
]
}
},
"TimeoutSeconds": ${local.timeout}
}
}
}
EOF
}

resource "aws_sfn_state_machine" "state_machine_cxg_remaster" {
name = "dp-${var.deployment_stage}-${var.custom_stack_name}-cxg-remaster-v2-sfn"
role_arn = var.role_arn
84 changes: 2 additions & 82 deletions backend/layers/processing/dataset_metadata_update.py
@@ -9,8 +9,6 @@

import scanpy
import tiledb
from rpy2.robjects import StrVector
from rpy2.robjects.packages import importr

from backend.common.utils.corpora_constants import CorporaConstants
from backend.layers.business.business import BusinessLogic
@@ -33,9 +31,6 @@
from backend.layers.thirdparty.s3_provider import S3Provider
from backend.layers.thirdparty.uri_provider import UriProvider

base = importr("base")
seurat = importr("SeuratObject")

configure_logging(level=logging.INFO)

# maps artifact name for metadata field to DB field name, if different
@@ -136,48 +131,6 @@ def update_h5ad(
finally:
os.remove(h5ad_filename)

def update_rds(
self,
rds_uri: str,
new_key_prefix: str,
new_dataset_version_id: DatasetVersionId,
metadata_update: DatasetArtifactMetadataUpdate,
):
seurat_filename = self.download_from_source_uri(
source_uri=rds_uri,
local_path=CorporaConstants.LABELED_RDS_ARTIFACT_FILENAME,
)

try:
self.update_processing_status(
new_dataset_version_id, DatasetStatusKey.RDS, DatasetConversionStatus.CONVERTING
)

rds_object = base.readRDS(seurat_filename)

for key, val in metadata_update.as_dict_without_none_values().items():
seurat_metadata = seurat.Misc(object=rds_object)
if seurat_metadata.rx2[key]:
val = val if isinstance(val, list) else [val]
seurat_metadata[seurat_metadata.names.index(key)] = StrVector(val)

base.saveRDS(rds_object, file=seurat_filename)

self.create_artifact(
seurat_filename,
DatasetArtifactType.RDS,
new_key_prefix,
new_dataset_version_id,
self.artifact_bucket,
DatasetStatusKey.RDS,
datasets_bucket=self.datasets_bucket,
)
self.update_processing_status(
new_dataset_version_id, DatasetStatusKey.RDS, DatasetConversionStatus.CONVERTED
)
finally:
os.remove(seurat_filename)

def update_cxg(
self,
cxg_uri: str,
@@ -257,19 +210,6 @@ def update_h5ad(
metadata_update,
)

@staticmethod
def update_rds(
artifact_bucket: str,
datasets_bucket: str,
rds_uri: str,
new_key_prefix: str,
new_dataset_version_id: DatasetVersionId,
metadata_update: DatasetArtifactMetadataUpdate,
):
DatasetMetadataUpdaterWorker(artifact_bucket, datasets_bucket).update_rds(
rds_uri, new_key_prefix, new_dataset_version_id, metadata_update
)

@staticmethod
def update_cxg(
artifact_bucket: str,
@@ -353,28 +293,8 @@ def update_metadata(
self.logger.error(f"Cannot find labeled H5AD artifact uri for {current_dataset_version_id}.")
self.update_processing_status(new_dataset_version_id, DatasetStatusKey.H5AD, DatasetConversionStatus.FAILED)

if DatasetArtifactType.RDS in artifact_uris:
self.logger.info("Main: Starting thread for rds update")
rds_job = Process(
target=DatasetMetadataUpdater.update_rds,
args=(
self.artifact_bucket,
self.datasets_bucket,
artifact_uris[DatasetArtifactType.RDS],
new_artifact_key_prefix,
new_dataset_version_id,
metadata_update,
),
)
artifact_jobs.append(rds_job)
rds_job.start()
elif current_dataset_version.status.rds_status == DatasetConversionStatus.SKIPPED:
self.update_processing_status(new_dataset_version_id, DatasetStatusKey.RDS, DatasetConversionStatus.SKIPPED)
else:
self.logger.error(
f"Cannot find RDS artifact uri for {current_dataset_version_id}, and Conversion Status is not SKIPPED."
)
self.update_processing_status(new_dataset_version_id, DatasetStatusKey.RDS, DatasetConversionStatus.FAILED)
# Mark all RDS conversions as skipped

Contributor Author:
this seemed like the simplest backwards compatible solution

Contributor:
this approach is consistent since we treat metadata updates as new versions, but we should double-check with Lattice/Brian that they're aware + cool with it
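
For anyone skimming this thread, a minimal sketch of the contract being discussed: every new dataset version gets its RDS conversion status stamped as SKIPPED, so existing status consumers need no schema change. The import path and the consumer-side helper are illustrative assumptions, not code from this PR; the enum and method names are taken from the diff.

```python
# Illustrative sketch only: the import path and helper names are assumptions,
# not part of this PR; enum and method names come from the diff above.
from backend.layers.common.entities import (  # assumed module location
    DatasetConversionStatus,
    DatasetStatusKey,
)


def mark_rds_skipped(updater, new_dataset_version_id) -> None:
    # After this change, update_metadata unconditionally stamps the new version,
    # whether or not the previous version carried an RDS artifact.
    updater.update_processing_status(
        new_dataset_version_id, DatasetStatusKey.RDS, DatasetConversionStatus.SKIPPED
    )


def has_rds_artifact(dataset_version) -> bool:
    # Backwards-compatible read path: SKIPPED is now the normal terminal state;
    # CONVERTED only appears on versions created before Seurat support was removed.
    return dataset_version.status.rds_status == DatasetConversionStatus.CONVERTED
```

Keeping the RDS status field and only changing its terminal value is what makes this read as backwards compatible; the open question above is just whether Lattice is aware of the new meaning.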

self.update_processing_status(new_dataset_version_id, DatasetStatusKey.RDS, DatasetConversionStatus.SKIPPED)

if DatasetArtifactType.CXG in artifact_uris:
self.logger.info("Main: Starting thread for cxg update")
21 changes: 0 additions & 21 deletions backend/layers/processing/make_seurat.R

This file was deleted.
