diff --git a/src/nhp/data/aggregated_data/ecds.py b/src/nhp/data/aggregated_data/ecds.py
index 3cfd0dc..258863f 100644
--- a/src/nhp/data/aggregated_data/ecds.py
+++ b/src/nhp/data/aggregated_data/ecds.py
@@ -27,6 +27,7 @@ def get_ecds_data(spark: SparkSession) -> DataFrame:
             F.col("type"),
             F.col("hsagrp"),
             F.col("ndggrp"),
+            F.col("capacity_conversion_group"),
             F.col("icb"),
             F.col("is_main_icb"),
             F.col("is_adult"),
diff --git a/src/nhp/data/aggregated_data/outpatients.py b/src/nhp/data/aggregated_data/outpatients.py
index c588dbd..7b53bca 100644
--- a/src/nhp/data/aggregated_data/outpatients.py
+++ b/src/nhp/data/aggregated_data/outpatients.py
@@ -25,6 +25,7 @@ def get_outpatients_data(spark: SparkSession) -> DataFrame:
             F.col("pod"),
             F.col("hsagrp"),
             F.col("ndggrp"),
+            F.col("capacity_conversion_group"),
             F.col("has_procedures"),
             F.col("sushrg").substr(1, 4).alias("sushrg_trimmed"),
             F.col("icb"),
diff --git a/src/nhp/data/raw_data/aae.py b/src/nhp/data/raw_data/aae.py
index e729e6b..b32b6fa 100644
--- a/src/nhp/data/raw_data/aae.py
+++ b/src/nhp/data/raw_data/aae.py
@@ -1,14 +1,19 @@
 """Generate the AAE data"""
 
+import pyspark.sql.functions as F
 from databricks.connect import DatabricksSession
+from pyspark.sql import DataFrame, SparkSession
+from pyspark.sql.types import *  # noqa: F403
 
-import pyspark.sql.functions as F
 from nhp.data.nhp_datasets.icbs import add_main_icb, icb_mapping
 from nhp.data.nhp_datasets.local_authorities import local_authority_successors
 from nhp.data.nhp_datasets.providers import read_data_with_provider
 from nhp.data.raw_data.helpers import add_age_group_column
-from pyspark.sql import DataFrame, SparkSession
-from pyspark.sql.types import *  # noqa: F403
+
+
+def create_capacity_conversion_group():
+    # can't create capacity groups on AAE data, so every row is labelled "aae-unknown"
+    return F.lit("aae-unknown")
 
 
 def get_aae_data(spark: SparkSession) -> DataFrame:
@@ -192,6 +197,7 @@ def get_aae_data(spark: SparkSession) -> DataFrame:
         .withColumn("tretspef_grouped", F.lit("Other"))
.withColumn("pod", F.concat(F.lit("aae_type-"), F.col("aedepttype")))
         .withColumn("ndggrp", F.col("group"))
+        .withColumn("capacity_conversion_group", create_capacity_conversion_group())
         .repartition("fyear", "provider")
     )
 
diff --git a/src/nhp/data/raw_data/ecds.py b/src/nhp/data/raw_data/ecds.py
index 4c91236..0b7b49b 100644
--- a/src/nhp/data/raw_data/ecds.py
+++ b/src/nhp/data/raw_data/ecds.py
@@ -3,14 +3,25 @@
 from itertools import chain
 
 from databricks.connect import DatabricksSession
+from pyspark.sql import DataFrame, SparkSession
+from pyspark.sql import functions as F
+from pyspark.sql.types import *  # noqa: F403
 
 from nhp.data.nhp_datasets.icbs import add_main_icb, icb_mapping
 from nhp.data.nhp_datasets.local_authorities import local_authority_successors
 from nhp.data.nhp_datasets.providers import add_provider
 from nhp.data.raw_data.helpers import add_age_group_column
-from pyspark.sql import DataFrame, SparkSession
-from pyspark.sql import functions as F
-from pyspark.sql.types import *  # noqa: F403
+
+
+def create_capacity_conversion_group():
+    is_child = F.col("age") <= 17
+
+    return (
+        F.when(F.col("acuity") == "immediate-resuscitation", "aae-resus")  # checked first
+        .when(is_child, "aae-childrens")  # age <= 17 and not resus
+        .when(F.col("acuity").isin(["urgent", "very-urgent"]), "aae-majors")  # non-child rows
+        .otherwise("aae-minors")  # every remaining row, incl. null acuity
+    )
 
 
 def get_ecds_data(spark: SparkSession) -> DataFrame:
@@ -239,6 +250,7 @@ def get_ecds_data(spark: SparkSession) -> DataFrame:
         .withColumn("tretspef_grouped", F.lit("Other"))
         .withColumn("pod", F.concat(F.lit("aae_type-"), F.col("aedepttype")))
         .withColumn("ndggrp", F.col("group"))
+        .withColumn("capacity_conversion_group", create_capacity_conversion_group())
         .repartition("fyear", "provider")
     )
 
diff --git a/src/nhp/data/raw_data/inpatients.py b/src/nhp/data/raw_data/inpatients.py
index f017ae9..99b234e 100644
--- a/src/nhp/data/raw_data/inpatients.py
+++ b/src/nhp/data/raw_data/inpatients.py
@@ -2,13 +2,81 @@
 
 from databricks.connect import DatabricksSession
 from 
delta.tables import DeltaTable
+from pyspark.sql import DataFrame, SparkSession
+from pyspark.sql import functions as F
+from pyspark.sql.types import *  # noqa: F403
 
 from nhp.data.nhp_datasets.apc import apc_primary_procedures, hes_apc
 from nhp.data.nhp_datasets.icbs import add_main_icb
 from nhp.data.raw_data.helpers import add_age_group_column, add_tretspef_grouped_column
-from pyspark.sql import DataFrame, SparkSession
-from pyspark.sql import functions as F
-from pyspark.sql.types import *  # noqa: F403
+
+
+def create_capacity_conversion_group():
+    is_child = F.col("age") <= 17
+    is_surgical_specialty = F.col("tretspef").rlike("^1(?!80|9[012])")  # 1xx except 180, 190-192
+    is_zero_los = F.col("speldur") == 0
+    is_elective = F.col("group") == "elective"
+    is_nonelective = F.col("group") == "non-elective"
+
+    # the when-chain below is evaluated in order and the first match wins, so later branches do not
+    # need things like an "is_adult" check once the is_child branch has been tried.
+    return (
+        # daycases
+        F.when(
+            F.col("classpat").isin(["2", "3"]),
+            F.when(is_child, "ip-daycase-childrens")
+            .when(F.col("tretspef").isin(["320", "321"]), "ip-daycase-cardiology")
+            .when(
+                F.col("tretspef").isin(["280", "811"]),
+                "ip-daycase-interventional_radiology",
+            )
+            # TODO: add endoscopy
+            .when(
+                F.col("tretspef").isin(["253", "260", "303", "370", "800"]),
+                "ip-daycase-oncology_haematology",
+            )
+            .when(is_surgical_specialty, "ip-daycase-surgical")
+            .otherwise("ip-daycase-non_surgical"),
+        )
+        # everything else will be non-daycase
+        # maternity admissions (tested before is_child, so these apply at any age)
+        .when(F.col("tretspef") == "501", "ip-maternity-obstetric")
+        .when(F.col("tretspef") == "560", "ip-maternity-midwife_led")
+        .when(F.col("group") == "maternity", "ip-maternity-unknown")
+        # paediatric admissions
+        .when(
+            is_child,
+            F.when(
+                is_zero_los & is_nonelective, "ip-childrens-assessment_unit"
+            ).otherwise("ip-childrens-inpatients"),
+        )
+        # adult admissions (rows that reach here did not match is_child)
+        # elective admissions
+        # TODO: add ip-stroke
+        .when(
+            is_elective,
+            F.when(
+                is_surgical_specialty,
+                F.when(
+                    F.col("speldur") <= 3, "ip-elective-surgical-short_stay"
+                ).otherwise("ip-elective-surgical-long_stay"),
+            ).otherwise(
+                F.when(
+                    F.col("speldur") <= 3, "ip-elective-non_surgical-short_stay"
+                ).otherwise("ip-elective-non_surgical-long_stay")
+            ),
+        )
+        # non-elective admissions (rows that reach here did not match is_elective)
+        .when(is_zero_los, "ip-adult_acute_assessment")
+        .when(
+            is_surgical_specialty,
+            F.when(F.col("speldur") <= 3, "ip-acute-surgical-short_stay").otherwise(
+                "ip-acute-surgical-longer_stay"  # NOTE(review): elective uses "long_stay"
+            ),
+        )
+        .when(F.col("speldur") <= 3, "ip-acute-non_surgical-short_stay")
+        .otherwise("ip-acute-non_surgical-longer_stay")
+    )
 
 
 def get_inpatients_data(spark: SparkSession) -> DataFrame:
@@ -90,6 +158,8 @@ def get_inpatients_data(spark: SparkSession) -> DataFrame:
         # add in primary diagnosis and procedure columns
         .join(df_primary_diagnosis, ["epikey", "fyear", "procode3"], "left")
         .join(df_primary_procedure, ["epikey", "fyear", "procode3"], "left")
+        # capacity conversion
+        .withColumn("capacity_conversion_group", create_capacity_conversion_group())
         .select(
             F.col("epikey"),
             F.col("fyear"),
@@ -108,6 +178,7 @@ def get_inpatients_data(spark: SparkSession) -> DataFrame:
             F.col("tretspef_grouped"),
             F.col("hsagrp"),
             F.col("group"),
+            F.col("capacity_conversion_group"),
             F.col("admidate"),
             F.col("disdate"),
             F.col("speldur"),
diff --git a/src/nhp/data/raw_data/outpatients.py b/src/nhp/data/raw_data/outpatients.py
index a639572..dc91497 100644
--- a/src/nhp/data/raw_data/outpatients.py
+++ b/src/nhp/data/raw_data/outpatients.py
@@ -1,14 +1,30 @@
 """Generate outpatients data"""
 
 from databricks.connect import DatabricksSession
+from pyspark.sql import DataFrame, SparkSession
+from pyspark.sql import functions as F
+from pyspark.sql.types import *  # noqa: F403
 
 from nhp.data.nhp_datasets.icbs import add_main_icb, icb_mapping
 from nhp.data.nhp_datasets.local_authorities import local_authority_successors
 from nhp.data.nhp_datasets.providers import read_data_with_provider
 from 
nhp.data.raw_data.helpers import add_age_group_column, add_tretspef_grouped_column
-from pyspark.sql import DataFrame, SparkSession
-from pyspark.sql import functions as F
-from pyspark.sql.types import *  # noqa: F403
+
+
+def create_capacity_conversion_group():  # outpatient capacity group column
+    is_maternity = F.col("tretspef").isin(["424", "501", "505", "560"])
+    is_child = F.col("age") <= 17
+
+    return F.when(
+        F.col("has_procedures"),  # procedure attendances get the "op-procedure-*" groups
+        F.when(is_maternity, "op-procedure-maternity")  # maternity is tested before age
+        .when(is_child, "op-procedure-childrens")  # age <= 17 and not maternity
+        .otherwise("op-procedure-adult"),
+    ).otherwise(
+        F.when(is_maternity, "op-maternity")  # same order as the procedure branch
+        .when(is_child, "op-childrens")
+        .otherwise("op-adult")
+    )
 
 
 def get_outpatients_data(spark: SparkSession) -> DataFrame:
@@ -144,6 +160,7 @@ def get_outpatients_data(spark: SparkSession) -> DataFrame:
             .when(F.col("is_first"), "op_first")
             .otherwise("op_follow-up"),
         )
+        .withColumn("capacity_conversion_group", create_capacity_conversion_group())
         .withColumn("ndggrp", F.col("group"))
     )
 