DailyMed NDC to Label Image Mart #326

Merged: 49 commits (Oct 30, 2024)
Commits (49):
6a0b9b3  Update full with daily files (jrlegrand, Nov 28, 2023)
32cf795  Fix xml regex matching (jrlegrand, Nov 28, 2023)
69455a1  Update datasource table name (jrlegrand, Nov 28, 2023)
f23bf71  Update remaining staging file (jrlegrand, Nov 28, 2023)
8785ff2  Initial Dailymed dbt work (jrlegrand, Nov 29, 2023)
5d9060b  Convert all staging to dbt (jrlegrand, Nov 30, 2023)
9352bab  Remove extraneous DAG files (jrlegrand, Nov 30, 2023)
3d5e426  Convert organization metrics to intermediate model (jrlegrand, Nov 30, 2023)
f11d962  Remove daily version to focus on full (jrlegrand, Nov 30, 2023)
45676f1  Rename DAG to dailymed (jrlegrand, Nov 30, 2023)
182f560  Convert intermediate table to dbt (jrlegrand, Nov 30, 2023)
40bc54a  Update dailymed name change (jrlegrand, Nov 30, 2023)
d02259e  Convert DAG to Taskflow format (jrlegrand, Dec 1, 2023)
e2308d1  Change to daily only (jrlegrand, Jul 11, 2024)
18d4ae7  XPath for Media (jrlegrand, Jul 13, 2024)
cb49918  Point at package label section (jrlegrand, Jul 14, 2024)
f2be387  Update dbt models (jrlegrand, Jul 15, 2024)
318db6c  Update full with daily files (jrlegrand, Nov 28, 2023)
012b7bd  Fix xml regex matching (jrlegrand, Nov 28, 2023)
dc9c83b  Update datasource table name (jrlegrand, Nov 28, 2023)
00db7cf  Update remaining staging file (jrlegrand, Nov 28, 2023)
d865f6e  Initial Dailymed dbt work (jrlegrand, Nov 29, 2023)
49183ca  Convert all staging to dbt (jrlegrand, Nov 30, 2023)
6897c11  Remove extraneous DAG files (jrlegrand, Nov 30, 2023)
f3a5c74  Convert organization metrics to intermediate model (jrlegrand, Nov 30, 2023)
162b4ee  Remove daily version to focus on full (jrlegrand, Nov 30, 2023)
c7d4dd7  Rename DAG to dailymed (jrlegrand, Nov 30, 2023)
226c547  Convert intermediate table to dbt (jrlegrand, Nov 30, 2023)
38fef34  Update dailymed name change (jrlegrand, Nov 30, 2023)
132bb52  Convert DAG to Taskflow format (jrlegrand, Dec 1, 2023)
2b2ceea  Change to daily only (jrlegrand, Jul 11, 2024)
a707f87  XPath for Media (jrlegrand, Jul 13, 2024)
6970f0a  Point at package label section (jrlegrand, Jul 14, 2024)
afce6d0  Update dbt models (jrlegrand, Jul 15, 2024)
a587e7d  Dailymed work (jrlegrand, Sep 9, 2024)
b1564f8  Merge with origin (jrlegrand, Sep 11, 2024)
b3bc70f  Got sorting working (jrlegrand, Sep 16, 2024)
8588bc7  Initial image name work (jrlegrand, Sep 16, 2024)
236c3c6  Add todo (jrlegrand, Sep 18, 2024)
67dd3de  Account for Rx and OTC (jrlegrand, Sep 21, 2024)
70b7c63  Account for loading rx and otc (jrlegrand, Sep 21, 2024)
f16b966  Fix image NDC issue (jrlegrand, Sep 21, 2024)
ac7ecdc  Clean up mart (jrlegrand, Sep 22, 2024)
af9feff  Mart updates (jrlegrand, Sep 22, 2024)
fbc8f1d  Fix NDC whitespace issue (jrlegrand, Sep 25, 2024)
0925f3d  Account for RegEx whitespace (jrlegrand, Sep 27, 2024)
cd7df2a  Fix Observable bugs (jrlegrand, Oct 6, 2024)
c195753  Update mart with urls (jrlegrand, Oct 15, 2024)
d373931  Account for image references in XSLT (jrlegrand, Oct 24, 2024)

149 changes: 149 additions & 0 deletions airflow/dags/dailymed/dag.py
@@ -0,0 +1,149 @@
from pathlib import Path
import pendulum
import zipfile
import os

from airflow.decorators import dag, task
from airflow.hooks.subprocess import SubprocessHook

from lxml import etree

from sagerx import create_path, load_df_to_pg

@dag(
    schedule="0 0 10 * *",
    start_date=pendulum.yesterday(),
    catchup=False
)
def dailymed():
    dag_id = "dailymed"

    ds_folder = Path("/opt/airflow/dags") / dag_id
    data_folder = Path("/opt/airflow/data") / dag_id

    # NOTE: "dm_spl_release_human" accounts for both
    # rx and otc SPLs (but no other types of SPLs)
    # - "dm_spl_release_human_rx" for rx meds only
    # - "dm_spl_release_human_otc" for otc meds only
    # - "dm_spl_release_human_rx_part1" for a given part
    # - "dm_spl_daily_update_MMDDYYYY" for a given date
    #   (replace MMDDYYYY with the month, day, and year)
    file_set = "dm_spl_release_human_rx"

    def connect_to_ftp_dir(ftp_str: str, dir: str):
        import ftplib

        ftp = ftplib.FTP(ftp_str)
        ftp.login()

        ftp.cwd(dir)

        return ftp

    def obtain_ftp_file_list(ftp):
        import fnmatch

        file_list = []
        for file in ftp.nlst():
            if fnmatch.fnmatch(file, f"*{file_set}*"):
                file_list.append(file)
        return file_list

    def get_dailymed_files(ftp, file_name: str):
        zip_path = create_path(data_folder) / file_name

        with open(zip_path, "wb") as file:
            ftp.retrbinary(f"RETR {file_name}", file.write)

        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(data_folder.with_suffix(""))

        os.remove(zip_path)

    def transform_xml(input_xml, xslt):
        # load xml input
        dom = etree.parse(input_xml, etree.XMLParser(huge_tree=True))
        # load XSLT
        xslt_doc = etree.parse(xslt)
        xslt_transformer = etree.XSLT(xslt_doc)
        # apply XSLT on loaded dom
        new_xml = xslt_transformer(dom)
        return etree.tostring(new_xml, pretty_print=True).decode("utf-8")

    def load_xml_data(spl_type_data_folder: Path):
        import re
        import pandas as pd
        import sqlalchemy

        xslt = ds_folder / "template.xsl"

        db_conn_string = os.environ["AIRFLOW_CONN_POSTGRES_DEFAULT"]
        db_conn = sqlalchemy.create_engine(db_conn_string)

        data = []
        for zip_folder in spl_type_data_folder.iterdir():
            with zipfile.ZipFile(zip_folder) as unzipped_folder:
                zip_file = zip_folder.stem
                set_id = zip_file.split('_')[1]
                for subfile in unzipped_folder.infolist():
                    if re.search(r"\.xml$", subfile.filename):
                        xml_file = subfile.filename

                        # xslt transform
                        temp_xml_file = unzipped_folder.extract(subfile, spl_type_data_folder)
                        xml_content = transform_xml(temp_xml_file, xslt)
                        os.remove(temp_xml_file)

                        # append row to the data list
                        data.append({"set_id": set_id, "zip_file": zip_file, "xml_file": xml_file, "xml_content": xml_content})

        df = pd.DataFrame(
            data,
            columns=["set_id", "zip_file", "xml_file", "xml_content"],
        )

        load_df_to_pg(
            df,
            schema_name="sagerx_lake",
            table_name="dailymed",
            if_exists="append",  # TODO: make this better - maybe don't put stuff in multiple folders?
            index=False,
        )

    @task
    def extract():
        dailymed_ftp = "public.nlm.nih.gov"
        ftp_dir = "/nlmdata/.dailymed/"

        ftp = connect_to_ftp_dir(dailymed_ftp, ftp_dir)

        file_list = obtain_ftp_file_list(ftp)
        print(f'Extracting {file_list}')

        for file_name in file_list:
            get_dailymed_files(ftp, file_name)

    @task
    def load():
        spl_types = ['prescription', 'otc']

        for spl_type in spl_types:
            spl_type_data_folder = (
                data_folder
                / spl_type
            )
            if os.path.exists(spl_type_data_folder):
                print(f'Loading {spl_type} SPLs...')
                load_xml_data(spl_type_data_folder)


    # Task to transform data using dbt
    @task
    def transform():
        subprocess = SubprocessHook()
        result = subprocess.run_command(['dbt', 'run', '--select', 'models/staging/dailymed', 'models/intermediate/dailymed'], cwd='/dbt/sagerx')
        print("Result from dbt:", result)

    extract() >> load() >> transform()

dailymed()
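
To sanity-check the XSLT step outside Airflow, the same transform can be run against a single downloaded SPL archive. The sketch below is not part of this PR; it only mirrors the transform_xml helper above. The archive name (example_spl.zip), the scratch directory, and the relative path to template.xsl are placeholder assumptions to adjust for your checkout.

import zipfile
from pathlib import Path

from lxml import etree


def transform_xml(input_xml, xslt):
    # same logic as the DAG helper: parse the SPL XML and apply the stylesheet
    dom = etree.parse(input_xml, etree.XMLParser(huge_tree=True))
    transformer = etree.XSLT(etree.parse(xslt))
    return etree.tostring(transformer(dom), pretty_print=True).decode("utf-8")


if __name__ == "__main__":
    spl_zip = Path("example_spl.zip")   # placeholder: one SPL zip pulled from the DailyMed FTP
    workdir = Path("scratch")           # placeholder scratch directory
    workdir.mkdir(exist_ok=True)

    with zipfile.ZipFile(spl_zip) as zf:
        # each SPL archive carries a single XML document plus its images
        xml_name = next(n for n in zf.namelist() if n.endswith(".xml"))
        extracted = zf.extract(xml_name, workdir)

    print(transform_xml(extracted, "airflow/dags/dailymed/template.xsl"))
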
Changes to airflow/dags/dailymed/template.xsl (the XSLT stylesheet used by the DAG above):
@@ -13,6 +13,10 @@
  <dailymed>
    <xsl:apply-templates select="v3:document"/>

    <PackageLabels>
      <xsl:apply-templates select="//v3:section[v3:code[@code='51945-4']]"/>
    </PackageLabels>

    <NDCList>
      <xsl:apply-templates select="v3:document/v3:component"/>
    </NDCList>
@@ -67,6 +71,49 @@
    <ApplicationNumber><xsl:value-of select="//v3:subjectOf/v3:approval/v3:id/@extension"/></ApplicationNumber>
  </xsl:template>

  <!-- PackageLabels -->
  <xsl:template match="//v3:section[v3:code[@code='51945-4']]">
    <PackageLabel> <!-- there can be multiple PRINCIPAL DISPLAY PANEL sections in a SPL -->
      <MediaList>
        <!-- If v3:observationMedia exists, process only those -->
        <xsl:choose>
          <xsl:when test=".//v3:observationMedia">
            <xsl:for-each select=".//v3:observationMedia">
              <Media>
                <ID>
                  <xsl:value-of select="./@ID"/>
                </ID>
                <Image>
                  <xsl:value-of select="v3:value/v3:reference/@value"/>
                </Image>
              </Media>
            </xsl:for-each>
          </xsl:when>
          <!-- Else, process v3:renderMultiMedia -->
          <xsl:otherwise>
            <xsl:for-each select=".//v3:renderMultiMedia">
              <xsl:variable name="refID" select="@referencedObject"/>

              <!-- look for the corresponding observationMedia with the same ID -->
              <xsl:for-each select="//v3:observationMedia[@ID=$refID]">
                <Media>
                  <ID>
                    <xsl:value-of select="./@ID"/>
                  </ID>
                  <Image>
                    <xsl:value-of select="v3:value/v3:reference/@value"/>
                  </Image>
                </Media>
              </xsl:for-each>
            </xsl:for-each>
          </xsl:otherwise>
        </xsl:choose>
      </MediaList>
      <ID><xsl:value-of select=".//@root"/></ID>
      <Text><xsl:value-of select="."/></Text>
    </PackageLabel>
  </xsl:template>

  <!-- NDCList -->
  <xsl:template match="v3:document/v3:component">
    <xsl:for-each select="//v3:containerPackagedProduct/v3:code[@codeSystem='2.16.840.1.113883.6.69']">
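
For reference, the new PackageLabels template wraps each PRINCIPAL DISPLAY PANEL section in a PackageLabel element containing a MediaList of Media entries (each with an ID and an Image reference), plus the section ID and Text. Below is a rough, hypothetical sketch of reading that structure from the transformed XML with lxml; the sample document is fabricated and trimmed, and real output also carries the NDCList and the document-level fields.

from lxml import etree

# fabricated, trimmed example of what the transform emits for one package label
transformed = """
<dailymed>
  <PackageLabels>
    <PackageLabel>
      <MediaList>
        <Media><ID>MM1</ID><Image>example-carton.jpg</Image></Media>
      </MediaList>
      <ID>a1b2c3</ID>
      <Text>PRINCIPAL DISPLAY PANEL ...</Text>
    </PackageLabel>
  </PackageLabels>
</dailymed>
"""

root = etree.fromstring(transformed.encode("utf-8"))
for label in root.xpath("//PackageLabel"):
    label_id = label.findtext("ID")
    # image filenames referenced by this principal display panel
    images = [m.findtext("Image") for m in label.xpath("MediaList/Media")]
    print(label_id, images)
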
152 changes: 0 additions & 152 deletions airflow/dags/dailymed_daily/dailymed_daily_dag.py

This file was deleted.

16 changes: 0 additions & 16 deletions airflow/dags/dailymed_daily/organization_metrics.sql

This file was deleted.
