Skip to content

Commit

Permalink
cleanup synchronize_user_tables script and working create_derived_tab…
Browse files Browse the repository at this point in the history
…les_in_clickhouse_database_by_profile
  • Loading branch information
importer system account committed Nov 11, 2024
1 parent 99c7538 commit 0affe74
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 62 deletions.
Original file line number Diff line number Diff line change
@@ -1,78 +1,139 @@
#!/usr/bin/env python3

import argparse
import datetime
import os
import re
import subprocess
import sys

def get_list_of_genetic_profile_ids():
    """Ask the clickhouse client for the ids of the genetic profiles to process.

    The query selects every genetic_profile_id whose genetic_alteration_type is
    not GENERIC_ASSAY, MUTATION_EXTENDED or STRUCTURAL_VARIANT.

    Returns:
        A (profile_ids, returncode) tuple: the lines of the client's stdout as a
        list of strings, and the exit status of the clickhouse client process.
    """
    #TODO remove hardcode of clickhouse config file and accept (or generate) this file based on command like arguments
    profile_id_query = "SELECT genetic_profile_id FROM genetic_profile WHERE genetic_alteration_type NOT IN ('GENERIC_ASSAY', 'MUTATION_EXTENDED', 'STRUCTURAL_VARIANT')"
    command = [
        "clickhouse",
        "client",
        "--config-file=clickhouse_client_config_2024-10-14-09-03-02.yaml",
        "--query={0}".format(profile_id_query),
    ]
    completed = subprocess.run(command, shell=False, capture_output=True)
    profile_ids = completed.stdout.decode("utf-8").splitlines()
    return profile_ids, completed.returncode
# Text the genetic_alteration_derived insert statement read from the SQL file must start with.
EXPECTED_GENETIC_ALTERATION_INSERT_STATEMENT_START = "INSERT INTO TABLE genetic_alteration_derived"
# WHERE fragment that must occur in that statement; it is swapped for a per-profile filter.
EXPECTED_GENETIC_ALTERATION_WHERE_FOR_PROFILE_TYPE = "gp.genetic_alteration_type NOT IN ('GENERIC_ASSAY', 'MUTATION_EXTENDED', 'STRUCTURAL_VARIANT')"
# Text the generic_assay_data_derived insert statement read from the SQL file must start with.
EXPECTED_GENERIC_ASSAY_INSERT_STATEMENT_START = "INSERT INTO TABLE generic_assay_data_derived"
# WHERE fragment that must occur in that statement; it is swapped for a per-profile filter.
EXPECTED_GENERIC_ASSAY_WHERE_FOR_PROFILE_TYPE = "gp.generic_assay_type IS NOT NULL"
# Query listing the profile ids to process for genetic_alteration_derived.
GET_GENETIC_PROFILE_ID_LIST_QUERY = "SELECT genetic_profile_id FROM genetic_profile WHERE genetic_alteration_type NOT IN ('GENERIC_ASSAY', 'MUTATION_EXTENDED', 'STRUCTURAL_VARIANT')"
# Query listing the profile ids to process for generic_assay_data_derived.
GET_GENERIC_ASSAY_PROFILE_ID_LIST_QUERY = "SELECT genetic_profile_id FROM genetic_profile WHERE generic_assay_type IS NOT NULL"
# Per-profile WHERE filter; {0} is filled with a genetic_profile_id via str.format.
PROFILE_WHERE_REPLACEMENT_STRING = "gp.genetic_profile_id={0}"
# Lookup of a profile's stable id and its study identifier; {0} is a genetic_profile_id.
GET_FAILED_PROFILE_STUDY_ID_QUERY = "SELECT genetic_profile.stable_id, cancer_study.cancer_study_identifier FROM genetic_profile JOIN cancer_study ON genetic_profile.cancer_study_id=cancer_study.cancer_study_id WHERE genetic_profile.genetic_profile_id={0}"

#TODO read SQL statement templates from external files rather than hardcoding
# Hardcoded insert statement for genetic_alteration_derived, restricted to a
# single genetic profile; {0} is filled with the genetic_profile_id via str.format.
INSERT_EVENTS_INTO_GENETIC_ALTERATION_DERIVED_QUERY_TEMPLATE = '''
INSERT INTO TABLE genetic_alteration_derived
SELECT
sample_unique_id,
cancer_study_identifier,
hugo_gene_symbol,
replaceOne(stable_id, concat(sd.cancer_study_identifier, '_'), '') as profile_type,
alteration_value
FROM
(SELECT
sample_id,
hugo_gene_symbol,
stable_id,
alteration_value
FROM
(SELECT
g.hugo_gene_symbol AS hugo_gene_symbol,
gp.stable_id as stable_id,
arrayMap(x -> (x = '' ? NULL : x), splitByString(',', assumeNotNull(substring(ga.values,1,-1)))) AS alteration_value,
arrayMap(x -> (x = '' ? NULL : toInt32(x)), splitByString(',', assumeNotNull(substring(gps.ordered_sample_list,1,-1)))) AS sample_id
FROM
genetic_profile gp
JOIN genetic_profile_samples gps ON gp.genetic_profile_id = gps.genetic_profile_id
JOIN genetic_alteration ga ON gp.genetic_profile_id = ga.genetic_profile_id
JOIN gene g ON ga.genetic_entity_id = g.genetic_entity_id
WHERE
gp.genetic_profile_id={0})
ARRAY JOIN alteration_value, sample_id
WHERE alteration_value != 'NA') AS subquery
JOIN sample_derived sd ON sd.internal_id = subquery.sample_id'''
# Same statement with every run of whitespace collapsed to one space, so it can be
# passed as a single --query argument. The pattern is a raw string because '\s' in
# a normal string literal is an invalid escape sequence.
INSERT_EVENTS_INTO_GENETIC_ALTERATION_DERIVED_QUERY_TEMPLATE_1LINE = re.sub(r'\s+', ' ', INSERT_EVENTS_INTO_GENETIC_ALTERATION_DERIVED_QUERY_TEMPLATE).strip()
def process_was_successful(status):
    """Return True when the given process exit status equals zero."""
    succeeded = (status == 0)
    return succeeded

#TODO add generic assay query too and allow selection of which to run by command line argument
def get_failed_profile_study_id_info(yaml_config_filename, profile_id):
    """Look up the stable id and study identifier of a genetic profile.

    Runs GET_FAILED_PROFILE_STUDY_ID_QUERY for ``profile_id`` through the
    clickhouse client and splits the client's stdout on whitespace.

    Args:
        yaml_config_filename: path passed to the client as --config-file.
        profile_id: genetic_profile_id substituted into the lookup query.

    Returns:
        A (profile_stable_id, cancer_study_identifier, returncode) tuple. When
        the client's output holds fewer than two fields (the client failed, or
        no row matched), both identifiers are None so the caller can still
        inspect the return code instead of hitting an IndexError.
    """
    lookup_query = GET_FAILED_PROFILE_STUDY_ID_QUERY.format(profile_id)
    command = [
        "clickhouse",
        "client",
        f"--config-file={yaml_config_filename}",
        f"--query={lookup_query}",
    ]
    completed = subprocess.run(command, shell=False, capture_output=True)
    fields = completed.stdout.decode("utf-8").split()
    if len(fields) < 2:
        # Nothing usable came back; do not index into an empty/short list.
        return None, None, completed.returncode
    return fields[0], fields[1], completed.returncode

def insert_event_records_for_profile_into_derived_table(derived_table_name, genetic_profile_id):
    """Run the hardcoded genetic_alteration_derived insert for one profile.

    Formats the one-line insert template with ``genetic_profile_id``, runs it
    through the clickhouse client and prints the client's stderr. The
    ``derived_table_name`` argument is not used by this version, and nothing is
    returned. A later definition with the same name appears further down in
    this listing.
    """
    statement = INSERT_EVENTS_INTO_GENETIC_ALTERATION_DERIVED_QUERY_TEMPLATE_1LINE.format(genetic_profile_id)
    command = [
        "clickhouse",
        "client",
        "--config-file=clickhouse_client_config_2024-10-14-09-03-02.yaml",
        "--query={0}".format(statement),
    ]
    completed = subprocess.run(command, shell=False, capture_output=True)
    print("stderr from insert:")
    print(completed.stderr.decode("utf-8"))
def get_list_of_profile_ids_from_clickhouse(yaml_config_filename, get_profile_id_list_query):
    """Run a profile-id listing query through the clickhouse client.

    Args:
        yaml_config_filename: path passed to the client as --config-file.
        get_profile_id_list_query: SQL text passed to the client as --query.

    Returns:
        A (profile_ids, returncode) tuple: the lines of the client's stdout as a
        list of strings, and the exit status of the client process.
    """
    command = [
        "clickhouse",
        "client",
        f"--config-file={yaml_config_filename}",
        f"--query={get_profile_id_list_query}",
    ]
    completed = subprocess.run(command, shell=False, capture_output=True)
    stdout_text = completed.stdout.decode("utf-8")
    return stdout_text.splitlines(), completed.returncode

def convert_insert_to_be_by_genetic_profile(original_insert_sql_statement, expected_where_for_profile_type):
    """Turn a whole-table insert statement into a per-profile str.format template.

    Every occurrence of ``expected_where_for_profile_type`` is replaced by
    PROFILE_WHERE_REPLACEMENT_STRING (which carries the ``{0}`` placeholder for
    the profile id), and each run of whitespace is collapsed to a single space.

    Literal ``{`` / ``}`` characters in the SQL are doubled first, because the
    result is later passed through ``str.format``; without this, any brace in
    the statement would be misread as a replacement field. After formatting,
    the doubled braces come back out as the original single braces.

    Returns:
        The per-profile insert statement template as a single-line string.
    """
    def _escape_braces(text):
        return text.replace('{', '{{').replace('}', '}}')

    escaped_statement = _escape_braces(original_insert_sql_statement)
    escaped_where = _escape_braces(expected_where_for_profile_type)
    insert_query_by_profile_template = escaped_statement.replace(escaped_where, PROFILE_WHERE_REPLACEMENT_STRING)
    # Raw string: '\s' in a plain literal is an invalid escape sequence.
    return re.sub(r'\s+', ' ', insert_query_by_profile_template)

def insert_event_records_for_profile_into_derived_table(yaml_config_filename, insert_query_by_profile_template, genetic_profile_id):
    """Run the per-profile insert statement for one genetic profile.

    Args:
        yaml_config_filename: path passed to the client as --config-file.
        insert_query_by_profile_template: str.format template whose placeholder
            is filled with ``genetic_profile_id``.
        genetic_profile_id: id of the profile whose records are inserted.

    Returns:
        The exit status of the clickhouse client process.
    """
    statement = insert_query_by_profile_template.format(genetic_profile_id)
    command = [
        "clickhouse",
        "client",
        f"--config-file={yaml_config_filename}",
        f"--query={statement}",
    ]
    completed = subprocess.run(command, shell=False, capture_output=True)
    return completed.returncode

def main():
print(datetime.datetime.now())
genetic_profile_id_list, returncode = get_list_of_genetic_profile_ids()
print(genetic_profile_id_list)
print("return code was {0}".format(returncode))
def create_arg_parser():
    """Build the command-line parser for this script.

    The parser takes three positional arguments, in order:
    ``derived_table_name``, ``yaml_config_filename`` and ``sql_filepath``.

    Returns:
        The configured argparse.ArgumentParser.
    """
    parser = argparse.ArgumentParser(
        prog='create_derived_tables_in_clickhouse_database_by_profile.py',
        description='Generates derived clickhouse tables')
    parser.add_argument(
        'derived_table_name',
        help="derived table to populate: 'genetic_alteration_derived' or 'generic_assay_data_derived'")
    parser.add_argument(
        'yaml_config_filename',
        help='configuration file passed to the clickhouse client via --config-file')
    parser.add_argument(
        'sql_filepath',
        help='file containing the insert statement for the derived table')
    return parser

def exit_if_args_are_invalid(args):
    """Validate the parsed arguments; exit with status 1 on the first problem.

    Checks, in order: that ``args.derived_table_name`` is one of the two
    supported table names, that ``args.yaml_config_filename`` is an existing
    file, and that ``args.sql_filepath`` is an existing file. The first failed
    check is reported on stderr before exiting.
    """
    supported_table_names = ('genetic_alteration_derived', 'generic_assay_data_derived')
    problem = None
    if args.derived_table_name not in supported_table_names:
        problem = f"Argument '{args.derived_table_name}' must be either 'genetic_alteration_derived' or 'generic_assay_data_derived'."
    elif not os.path.isfile(args.yaml_config_filename):
        problem = f"File '{args.yaml_config_filename}' does not exist or is not a file"
    elif not os.path.isfile(args.sql_filepath):
        problem = f"File '{args.sql_filepath}' does not exist or is not a file"
    if problem is not None:
        print(problem, file=sys.stderr)
        sys.exit(1)

def insert_sql_statement_matches_expectations(original_insert_sql_statement, expected_insert_statement_start, expected_where_for_profile_type):
    """Check that an insert statement has the shape this script knows how to rewrite.

    Returns True only when the statement starts with
    ``expected_insert_statement_start`` and contains
    ``expected_where_for_profile_type`` somewhere in its text.
    """
    # Only the start and the WHERE fragment are checked; the rest of the
    # statement is assumed unchanged, which is a risk if the query is edited.
    has_expected_start = original_insert_sql_statement.startswith(expected_insert_statement_start)
    return has_expected_start and expected_where_for_profile_type in original_insert_sql_statement

def create_sql_insert_statement_template(derived_table_name, sql_filepath):
    """Build the per-profile insert template for the requested derived table.

    Picks the expected statement start and WHERE fragment that belong to
    ``derived_table_name`` and delegates to
    create_sql_insert_statement_template_from_clickhouse. An unrecognized table
    name is reported on stderr and the process exits with status 1.
    """
    if derived_table_name == 'genetic_alteration_derived':
        expected_start = EXPECTED_GENETIC_ALTERATION_INSERT_STATEMENT_START
        expected_where = EXPECTED_GENETIC_ALTERATION_WHERE_FOR_PROFILE_TYPE
    elif derived_table_name == 'generic_assay_data_derived':
        expected_start = EXPECTED_GENERIC_ASSAY_INSERT_STATEMENT_START
        expected_where = EXPECTED_GENERIC_ASSAY_WHERE_FOR_PROFILE_TYPE
    else:
        # We should never get here if args were properly validated
        print(f"Internal Error : derived_table_name argument had unexpected value : {derived_table_name}", file=sys.stderr)
        sys.exit(1)
    return create_sql_insert_statement_template_from_clickhouse(sql_filepath, expected_start, expected_where)

def create_sql_insert_statement_template_from_clickhouse(sql_filepath, expected_insert_statement_start, expected_where_for_profile_type):
    """Read an insert statement from a file and convert it to a per-profile template.

    The file's stripped content must start with
    ``expected_insert_statement_start`` and contain
    ``expected_where_for_profile_type``; otherwise the mismatch is reported on
    stderr and the process exits with status 1.

    Returns:
        The template produced by convert_insert_to_be_by_genetic_profile.
    """
    with open(sql_filepath, 'r') as sql_file:
        statement_text = sql_file.read().strip()
    format_is_ok = insert_sql_statement_matches_expectations(statement_text, expected_insert_statement_start, expected_where_for_profile_type)
    if not format_is_ok:
        print(f"Error: original insert query '{sql_filepath}' does not have the expected format", file=sys.stderr)
        print(f"\tExpected query start is '{expected_insert_statement_start}'", file=sys.stderr)
        print(f"\tExpected query contains '{expected_where_for_profile_type}'", file=sys.stderr)
        sys.exit(1)
    return convert_insert_to_be_by_genetic_profile(statement_text, expected_where_for_profile_type)

def insert_event_records_for_all_profiles(yaml_config_filename, genetic_profile_id_list, sql_insert_statement_template):
    """Insert derived-table records for every profile, one profile at a time.

    For each id a timestamp is printed and the per-profile insert is run. A
    successful insert prints a progress dot; a failed insert prints a warning on
    stderr naming the profile and its study when they can be looked up, or the
    raw profile id when they cannot. Processing continues after a failure, and
    a summary count is printed at the end.

    Args:
        yaml_config_filename: path passed to the clickhouse client.
        genetic_profile_id_list: ids of the profiles to process.
        sql_insert_statement_template: per-profile insert statement template.
    """
    successful_profile_count = 0
    for genetic_profile_id in genetic_profile_id_list:
        print(datetime.datetime.now())
        returncode = insert_event_records_for_profile_into_derived_table(yaml_config_filename, sql_insert_statement_template, genetic_profile_id)
        if returncode == 0:
            print('.', end='')
            sys.stdout.flush()
            successful_profile_count += 1
            continue
        profile_stable_id, cancer_study_identifier, select_returncode = get_failed_profile_study_id_info(yaml_config_filename, genetic_profile_id)
        if select_returncode != 0 or profile_stable_id is None:
            # f-string so the profile id is actually interpolated into the warning.
            print(f"WARNING: Error occurred during insertion of record for profile id '{genetic_profile_id}'. We could not retrieve the cancer_study_identifier. The derived table records for this profile are missing or incomplete.", file=sys.stderr)
        else:
            print(f"WARNING: Error occurred during insertion of record for profile '{profile_stable_id}' in study '{cancer_study_identifier}'. The derived table records for this profile are missing or incomplete.", file=sys.stderr)
    print(f"\nSuccessfully processed {successful_profile_count} out of {len(genetic_profile_id_list)} profiles.")

def get_list_of_profile_ids(derived_table_name, yaml_config_filename):
    """Fetch the profile ids that feed the requested derived table.

    Chooses the listing query that belongs to ``derived_table_name`` and runs it
    via get_list_of_profile_ids_from_clickhouse, returning that function's
    result unchanged. An unrecognized table name is reported on stderr and the
    process exits with status 1.
    """
    if derived_table_name == 'genetic_alteration_derived':
        listing_query = GET_GENETIC_PROFILE_ID_LIST_QUERY
    elif derived_table_name == 'generic_assay_data_derived':
        listing_query = GET_GENERIC_ASSAY_PROFILE_ID_LIST_QUERY
    else:
        # We should never get here if args were properly validated
        print(f"Internal Error : derived_table_name argument had unexpected value : {derived_table_name}", file=sys.stderr)
        sys.exit(1)
    return get_list_of_profile_ids_from_clickhouse(yaml_config_filename, listing_query)

def main():
    """Command-line entry point.

    Parses and validates the arguments, fetches the list of profile ids for the
    requested derived table, builds the per-profile insert template from the
    SQL file, and runs the insert for every profile. Exits with status 1 when
    the profile id list cannot be fetched.
    """
    parser = create_arg_parser()
    args = parser.parse_args()
    exit_if_args_are_invalid(args)
    profile_id_list, returncode = get_list_of_profile_ids(args.derived_table_name, args.yaml_config_filename)
    if not process_was_successful(returncode):
        # Errors go to stderr, like every other error report in this script.
        print(f"ERROR: Failed to get list of profile ids. Return code was '{returncode}'. Please check the properties file containing your database credentials.", file=sys.stderr)
        sys.exit(1)
    sql_insert_statement_template = create_sql_insert_statement_template(args.derived_table_name, args.sql_filepath)
    insert_event_records_for_all_profiles(args.yaml_config_filename, profile_id_list, sql_insert_statement_template)

# Run the command-line entry point only when this file is executed as a script.
if __name__ == '__main__':
    main()
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ function delete_output_stream_files() {

function shutdown_main_and_clean_up() {
shutdown_mysql_command_line_functions
shutdown_clickhouse_client_command_line_functions
delete_output_stream_files
unset my_properties
unset mysql_source_database_name
Expand Down

0 comments on commit 0affe74

Please sign in to comment.