Merged

26 commits
1cc4bd7 abstracts table names into table_names.py (tomjemmett, Oct 28, 2025)
5218ce9 fix typing/linting issues (tomjemmett, Oct 28, 2025)
9ec981a removes arguments to get_spark (tomjemmett, Oct 28, 2025)
9054ab3 removes arguments that called get_spark as a default value (tomjemmett, Oct 28, 2025)
58ed27a moves the path for model data extraction to table_names (tomjemmett, Oct 28, 2025)
8739757 switches to single get_spark method (tomjemmett, Oct 28, 2025)
da69470 fixes some table names (tomjemmett, Oct 29, 2025)
4baba90 changes current environment (tomjemmett, Oct 29, 2025)
28e9355 switches to using the base data object rather than raw_data table to … (tomjemmett, Oct 29, 2025)
3301ac3 removes unused file (tomjemmett, Oct 29, 2025)
383b8eb removes comment, covered elsewhere (tomjemmett, Oct 29, 2025)
ef223d7 uses correct tables in filter_acute_providers calls (tomjemmett, Oct 29, 2025)
554469d moves inputs save path into table names module (tomjemmett, Oct 29, 2025)
1e15a6c fix typo in variable name (tomjemmett, Oct 29, 2025)
9c537d9 moves save path into table names for population projections (tomjemmett, Oct 29, 2025)
1f6771d removes custom population paths path, uses standard path instead (tomjemmett, Oct 29, 2025)
a7d76bd uses path from table_names in day procedures mitigator (tomjemmett, Oct 29, 2025)
8a13906 moves hrg trim point file to reference data module (tomjemmett, Oct 29, 2025)
7b4c3ce moves hrgs.json into reference data (tomjemmett, Oct 29, 2025)
79b8e14 updates todo note (tomjemmett, Oct 29, 2025)
afcdc70 refactor (tomjemmett, Oct 29, 2025)
8d14c62 remove unused import (tomjemmett, Oct 29, 2025)
f3c1750 fix typo in docstring (tomjemmett, Oct 29, 2025)
44f0410 explicitly state the null value used in the reference data file (tomjemmett, Oct 29, 2025)
4f9b00a uses get_spark wherever SparkSession is used (tomjemmett, Oct 29, 2025)
dc0206d moves hes_datasets to raw_data.mitigators.ip, because that is the onl… (tomjemmett, Oct 29, 2025)
23 changes: 10 additions & 13 deletions databricks_workflows/nhp_data-extract_nhp_for_containers.yaml
@@ -17,7 +17,7 @@ resources:
package_name: nhp_data
entry_point: model_data-ip
parameters:
- "{{job.parameters.save_path}}/{{job.parameters.data_version}}"
- "{{job.parameters.data_version}}"
- "{{input}}"
job_cluster_key: run_nhp_extracts_cluster
libraries:
@@ -33,7 +33,7 @@ resources:
package_name: nhp_data
entry_point: model_data-op
parameters:
- "{{job.parameters.save_path}}/{{job.parameters.data_version}}"
- "{{job.parameters.data_version}}"
- "{{input}}"
job_cluster_key: run_nhp_extracts_cluster
libraries:
@@ -49,7 +49,7 @@ resources:
package_name: nhp_data
entry_point: model_data-aae
parameters:
- "{{job.parameters.save_path}}/{{job.parameters.data_version}}"
- "{{job.parameters.data_version}}"
- "{{input}}"
job_cluster_key: run_nhp_extracts_cluster
libraries:
@@ -65,7 +65,7 @@ resources:
package_name: nhp_data
entry_point: model_data-demographic_factors
parameters:
- "{{job.parameters.save_path}}/{{job.parameters.data_version}}"
- "{{job.parameters.data_version}}"
- "{{input}}"
- "{{job.parameters.projection_year}}"
job_cluster_key: run_nhp_extracts_cluster
@@ -82,7 +82,7 @@ resources:
package_name: nhp_data
entry_point: model_data-birth_factors
parameters:
- "{{job.parameters.save_path}}/{{job.parameters.data_version}}"
- "{{job.parameters.data_version}}"
- "{{input}}"
- "{{job.parameters.projection_year}}"
job_cluster_key: run_nhp_extracts_cluster
@@ -99,7 +99,7 @@ resources:
package_name: nhp_data
entry_point: model_data-inequalities
parameters:
- "{{job.parameters.save_path}}/{{job.parameters.data_version}}"
- "{{job.parameters.data_version}}"
- "{{input}}"
job_cluster_key: run_nhp_extracts_cluster
libraries:
@@ -111,7 +111,7 @@ resources:
package_name: nhp_data
entry_point: model_data-health_status_adjustment-generate_provider_gams
parameters:
- "{{job.parameters.save_path}}/{{job.parameters.data_version}}"
- "{{job.parameters.data_version}}"
job_cluster_key: run_nhp_extracts_cluster
libraries:
- pypi:
@@ -124,7 +124,7 @@ resources:
package_name: nhp_data
entry_point: model_data-health_status_adjustment-generate_icb_gams
parameters:
- "{{job.parameters.save_path}}/{{job.parameters.data_version}}"
- "{{job.parameters.data_version}}"
job_cluster_key: run_nhp_extracts_cluster
libraries:
- pypi:
@@ -137,7 +137,7 @@ resources:
package_name: nhp_data
entry_point: model_data-health_status_adjustment-generate_national_gams
parameters:
- "{{job.parameters.save_path}}/{{job.parameters.data_version}}"
- "{{job.parameters.data_version}}"
job_cluster_key: run_nhp_extracts_cluster
libraries:
- pypi:
@@ -154,7 +154,6 @@ resources:
package_name: nhp_data
entry_point: model_data-generate_synthetic_data
parameters:
- "{{job.parameters.save_path}}"
- "{{input}}"
- "20251001"
job_cluster_key: run_nhp_extracts_cluster
@@ -167,7 +166,7 @@ resources:
package_name: nhp_data
entry_point: model_data-clean_up
parameters:
- "{{job.parameters.save_path}}/{{job.parameters.data_version}}"
- "{{job.parameters.data_version}}"
job_cluster_key: run_nhp_extracts_cluster
libraries:
- whl: ../dist/*.whl
@@ -202,8 +201,6 @@ resources:
parameters:
- name: data_version
default: dev
-  - name: save_path
-    default: /Volumes/nhp/model_data/files
- name: years
default: "[202324]"
- name: projection_year
18 changes: 0 additions & 18 deletions databricks_workflows/nhp_data-inputs.yaml
@@ -20,8 +20,6 @@ resources:
python_wheel_task:
package_name: nhp_data
entry_point: inputs_data-catchments
-parameters:
-  - "{{job.parameters.save_path}}"
job_cluster_key: nhp_inputs_data_extract
libraries:
- whl: ../dist/*.whl
@@ -31,8 +29,6 @@ resources:
python_wheel_task:
package_name: nhp_data
entry_point: inputs_data-age_sex
-parameters:
-  - "{{job.parameters.save_path}}"
job_cluster_key: nhp_inputs_data_extract
libraries:
- whl: ../dist/*.whl
@@ -42,8 +38,6 @@ resources:
python_wheel_task:
package_name: nhp_data
entry_point: inputs_data-baseline
-parameters:
-  - "{{job.parameters.save_path}}"
job_cluster_key: nhp_inputs_data_extract
libraries:
- whl: ../dist/*.whl
@@ -53,8 +47,6 @@ resources:
python_wheel_task:
package_name: nhp_data
entry_point: inputs_data-diagnoses
-parameters:
-  - "{{job.parameters.save_path}}"
job_cluster_key: nhp_inputs_data_extract
libraries:
- whl: ../dist/*.whl
@@ -64,8 +56,6 @@ resources:
python_wheel_task:
package_name: nhp_data
entry_point: inputs_data-expat_repat
-parameters:
-  - "{{job.parameters.save_path}}"
job_cluster_key: nhp_inputs_data_extract
libraries:
- whl: ../dist/*.whl
@@ -75,8 +65,6 @@ resources:
python_wheel_task:
package_name: nhp_data
entry_point: inputs_data-procedures
-parameters:
-  - "{{job.parameters.save_path}}"
job_cluster_key: nhp_inputs_data_extract
libraries:
- whl: ../dist/*.whl
@@ -86,8 +74,6 @@ resources:
python_wheel_task:
package_name: nhp_data
entry_point: inputs_data-rates
-parameters:
-  - "{{job.parameters.save_path}}"
job_cluster_key: nhp_inputs_data_extract
libraries:
- whl: ../dist/*.whl
@@ -97,8 +83,6 @@ resources:
python_wheel_task:
package_name: nhp_data
entry_point: inputs_data-inequalities
-parameters:
-  - "{{job.parameters.save_path}}"
job_cluster_key: nhp_inputs_data_extract
libraries:
- whl: ../dist/*.whl
@@ -123,5 +107,3 @@ resources:
parameters:
- name: run_inputs_data
default: "True"
-  - name: save_path
-    default: /Volumes/nhp/inputs_data/files/dev
5 changes: 0 additions & 5 deletions databricks_workflows/nhp_data-population_projections.yaml
@@ -24,7 +24,6 @@ resources:
package_name: nhp_data
entry_point: population_projections-get_ons_files_2022
parameters:
- "{{job.parameters.path}}"
- "{{job.parameters.projection_year}}"
job_cluster_key: generate_nhp_population
libraries:
@@ -44,7 +43,6 @@
package_name: nhp_data
entry_point: population_projections-snpp
parameters:
- "{{job.parameters.path}}"
- "{{job.parameters.projection_year}}"
- "{{input}}"
job_cluster_key: generate_nhp_population
@@ -61,7 +59,6 @@
package_name: nhp_data
entry_point: population_projections-npp
parameters:
- "{{job.parameters.path}}"
- "{{job.parameters.projection_year}}"
- "{{input}}"
job_cluster_key: generate_nhp_population
@@ -92,8 +89,6 @@
queue:
enabled: true
parameters:
-  - name: path
-    default: /Volumes/nhp/population_projections/files
- name: projection_year
default: "2022"
- name: run_population_data
3 changes: 0 additions & 3 deletions databricks_workflows/nhp_data-reference_data.yaml
@@ -62,7 +62,6 @@
package_name: nhp_data
entry_point: reference-day_procedures
parameters:
- "{{job.parameters.day_procedures_save_path}}"
- "{{job.parameters.day_procedures_base_year}}"
job_cluster_key: generate_nhp_reference
job_clusters:
@@ -88,8 +87,6 @@
parameters:
- name: run_reference_data
default: "True"
-  - name: day_procedures_save_path
-    default: /Volumes/nhp/reference/files/day_procedures.json
- name: day_procedures_base_year
default: "202324"
- name: population_by_imd_decile_base_year
10 changes: 6 additions & 4 deletions src/nhp/data/aggregated_data/ecds.py
@@ -1,15 +1,17 @@
"""Generate ECDS Data"""

-from databricks.connect import DatabricksSession
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import * # noqa: F403

+from nhp.data.get_spark import get_spark
+from nhp.data.table_names import table_names


def get_ecds_data(spark: SparkSession) -> DataFrame:
"""Get ECDS Data"""
return (
spark.read.table("nhp.raw_data.ecds")
spark.read.table(table_names.raw_data_ecds)
.groupBy(
F.col("fyear"),
F.col("provider"),
@@ -49,12 +51,12 @@ def generate_ecds_data(spark: SparkSession, ecds: DataFrame) -> None:
.write.partitionBy("fyear", "provider")
.mode("overwrite")
.option("mergeSchema", "true")
.saveAsTable("nhp.aggregated_data.ecds")
.saveAsTable(table_names.aggregated_data_ecds)
)


def main() -> None:
"""main method"""
-spark = DatabricksSession.builder.getOrCreate()
+spark = get_spark()
ecds = get_ecds_data(spark)
generate_ecds_data(spark, ecds)
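The new table_names module referenced throughout these diffs is not itself shown in the PR; below is a minimal sketch of the shape the call sites imply, assuming a simple namespace of fully qualified table names. The attribute names come from the diffs, while the string values are assumptions inferred from the literals they replace.

from types import SimpleNamespace

# Hypothetical sketch of src/nhp/data/table_names.py, not the actual module.
# Each attribute maps a short identifier to a fully qualified Unity Catalog name.
table_names = SimpleNamespace(
    raw_data_ecds="nhp.raw_data.ecds",
    raw_data_opa="nhp.raw_data.opa",
    aggregated_data_ecds="nhp.aggregated_data.ecds",
    aggregated_data_opa="nhp.aggregated_data.opa",
    reference_ods_trusts="nhp.reference.ods_trusts",
)

Centralising the names this way means a catalog or schema rename touches one module instead of every spark.read.table call.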
10 changes: 6 additions & 4 deletions src/nhp/data/aggregated_data/outpatients.py
@@ -1,15 +1,17 @@
"""Generate Outpatients Data"""

-from databricks.connect import DatabricksSession
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import * # noqa: F403

+from nhp.data.get_spark import get_spark
+from nhp.data.table_names import table_names


def get_outpatients_data(spark: SparkSession) -> DataFrame:
"""Get Outpatients Data"""
return (
spark.read.table("nhp.raw_data.opa")
spark.read.table(table_names.raw_data_opa)
.groupBy(
F.col("fyear"),
F.col("provider"),
@@ -51,12 +53,12 @@ def generate_outpatients_data(spark: SparkSession, opa: DataFrame) -> None:
opa.write.partitionBy("fyear", "provider")
.mode("overwrite")
.option("mergeSchema", "true")
.saveAsTable("nhp.aggregated_data.opa")
.saveAsTable(table_names.aggregated_data_opa)
)


def main() -> None:
"""main method"""
-spark = DatabricksSession.builder.getOrCreate()
+spark = get_spark()
opa = get_outpatients_data(spark)
generate_outpatients_data(spark, opa)
16 changes: 3 additions & 13 deletions src/nhp/data/get_spark.py
@@ -1,25 +1,15 @@
"""Helper method to get the spark session."""

-import os
-
from databricks.connect import DatabricksSession
from pyspark.sql import SparkSession


-def get_spark(schema: str, catalog: str | None = None) -> SparkSession:
+def get_spark() -> SparkSession:
"""Get spark session

-:param schema: which schema to use
-:type schema: str
-:param catalog: catalog to use, defaults to the environment variable "nhp_catalog" (or, "nhp" if
-that environment variable is not set)
-:type catalog: str | None, optional

:return: get the spark session to use
:rtype: SparkSession
"""

-spark: SparkSession = DatabricksSession.builder.getOrCreate()
-spark.catalog.setCurrentCatalog(catalog or os.environ.get("nhp_catalog", "nhp"))
-spark.catalog.setCurrentDatabase(schema)
+spark = DatabricksSession.builder.getOrCreate()
+spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
return spark
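With the schema and catalog arguments gone, every entry point can share the same zero-argument session factory and address tables by fully qualified name. A minimal before/after sketch of a call site follows; the old call pattern is an assumption reconstructed from the removed signature.

from nhp.data.get_spark import get_spark
from nhp.data.table_names import table_names

# before (assumed): get_spark("raw_data") set the current catalog/schema,
# so callers could read short names like spark.read.table("ecds")
# after: one zero-argument factory, fully qualified names everywhere
spark = get_spark()
ecds = spark.read.table(table_names.raw_data_ecds)  # e.g. "nhp.raw_data.ecds"

Setting spark.sql.sources.partitionOverwriteMode to dynamic in one place also means the partitioned overwrites above (.write.partitionBy(...).mode("overwrite")) replace only the partitions present in the written data rather than the whole table.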
4 changes: 3 additions & 1 deletion src/nhp/data/inputs_data/acute_providers.py
@@ -5,6 +5,8 @@
import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession

+from nhp.data.table_names import table_names


@cache
def get_acute_providers(spark: SparkSession) -> DataFrame:
@@ -18,7 +20,7 @@ def get_acute_providers(spark: SparkSession) -> DataFrame:
:rtype: DataFrame
"""
acute_df = (
spark.read.table("nhp.reference.ods_trusts")
spark.read.table(table_names.reference_ods_trusts)
.filter(F.col("org_type").startswith("ACUTE"))
.persist()
)
3 changes: 2 additions & 1 deletion src/nhp/data/inputs_data/ae/__init__.py
@@ -7,6 +7,7 @@

from nhp.data.inputs_data.acute_providers import filter_acute_providers
from nhp.data.inputs_data.helpers import inputs_age_group
+from nhp.data.table_names import table_names


def get_ae_df(spark: SparkSession) -> DataFrame:
@@ -18,7 +19,7 @@ def get_ae_df(spark: SparkSession) -> DataFrame:
:rtype: DataFrame
"""
return (
filter_acute_providers(spark, "ecds")
filter_acute_providers(spark, table_names.raw_data_ecds)
.filter(F.isnotnull("age"))
.drop("age_group")
.join(inputs_age_group(spark), "age")
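filter_acute_providers now receives a fully qualified name from table_names rather than the bare suffix "ecds". Its definition is not shown in this diff; the sketch below is a hypothetical implementation consistent with both call sites and with get_acute_providers above, and the provider join column and the semi join are assumptions.

from pyspark.sql import DataFrame, SparkSession

from nhp.data.inputs_data.acute_providers import get_acute_providers


def filter_acute_providers(spark: SparkSession, table: str) -> DataFrame:
    """Keep only rows of `table` belonging to acute providers (sketch)."""
    acute = get_acute_providers(spark)  # ods_trusts filtered to org_type ACUTE*
    return spark.read.table(table).join(
        acute.select("provider"),  # join column name is an assumption
        on="provider",
        how="left_semi",  # semi join filters rows without adding columns
    )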