
Salvage OCR / Barcode Scanning code for future optimization #329

Closed
jrlegrand opened this issue Oct 30, 2024 · 4 comments
Assignees
Labels
data source (New data source, or data source related), optimization (Nice to have, but not critical)

Comments

@jrlegrand
Member

Problem Statement

Right now, @lprzychodzien has some great starter code for doing OCR / Barcode scanning in one of the two branches below:

  1. https://github.com/coderxio/sagerx/tree/dailymed - Lukasz's original branch
  2. https://github.com/coderxio/sagerx/tree/dailymed_images_joey - Joey's fork of Lukasz's original branch

We want to pull the good parts out of those branches into a separate Python script (maybe ocr.py) in the airflow/dags/dailymed folder so we can save it for later, reference it from issue #328, and then delete the branches above to avoid confusion and reduce our branch count.

Criteria for Success

All the useful OCR / barcode scanning code is safely stored in the main branch for future work, and we no longer have any dailymed branches.

Additional Information

Make note of the dependencies needed in the Python code, but don't actually add them to requirements.txt in this PR.
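For example, the lazily imported packages in the scripts shared later in this thread could be recorded at the top of the salvaged script as a comment. This is just a sketch; the package list is inferred from the imports in the code in this thread, not a definitive manifest:

```python
# Dependencies needed by this salvaged OCR / barcode scanning code.
# NOTE: intentionally NOT added to requirements.txt for this PR (see #329).
#
#   Pillow       - PIL.Image / PIL.ImageOps (image loading and preprocessing)
#   pyzbar       - pyzbar.pyzbar.decode (barcode scanning; needs the zbar shared library)
#   pytesseract  - pytesseract.image_to_string (OCR; needs the tesseract binary)
```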

@jrlegrand
Member Author

jrlegrand commented Jan 18, 2025

I accidentally blew up the important branch for this because it was really old and just cluttering up our branches... whoops.

Good news is I still had a local version of the branch. This is the most important file for OCR-related work, I believe.

airflow/dags/dailymed/dailymed_images.py

import os
import pandas as pd
from dailymed.dailymed import DailyMed
from xml_functions import parse_dm_xml_to_dict
from sagerx import load_df_to_pg
import logging


class DailyMedImages(DailyMed):
    def __init__(self, data_folder: os.PathLike) -> None:
        super().__init__(data_folder)

    def get_full_ndc_variants(self, ndcs):
        ndcs_11 = [self.convert_ndc_10_to_11(ndc) for ndc in ndcs]
        ndcs.extend(ndcs_11)
        ndcs_nd = [self.convert_ndc_no_dash(ndc) for ndc in ndcs]
        ndcs.extend(ndcs_nd)
        ndcs.sort(key=lambda s: len(s), reverse=True)
        return ndcs
    
    def get_ndc_from_image_filename(self, ndc_variants, image_id):
        # attempt to regex an NDC from image file name
        ndc_matches = self.ndc_format(image_id)

        # if NDC match found
        if ndc_matches:
            image_ndc = ndc_matches[0]
            # compare it against all valid NDC variants in SPL
            # TODO: convert ndc_variants to a dict and iterate
            # through items so that it compares to the list of variants,
            # but returns the original NDC that is represented by those variants
            for ndc in ndc_variants:
                if ndc == image_ndc:
                    return ndc
            # if no valid NDC variant match, assume it is a
            # random NDC-length number and disregard match
            return None
        # if no NDC match found in image file name, return None
        else:
            return None
        
    def find_image_components(self,xml_doc):
        components = []
        for component in xml_doc['document']['component']['structuredBody']['component']:
            if component['section']['code']['@code'] == '51945-4':
                components.append(component)
        return components
    
    def get_ndcs_from_image_components(self, xml_doc, ndc_ids, image_ids):
        mapped_dict = {}
        #print(xml_doc)
        packages = xml_doc.get('PackageData', {}).get('Package', [])
        if not isinstance(packages, list):
            packages = [packages]

        # loop through the packages and apply the regex
        for package in packages:
            text = package.get('Text', '')

            # there can be multiple Media in a Package
            # for some reason the xmltodict and/or XML
            # stores as a non-list if only one element
            media_list = package.get('MediaList', {})
            if media_list:
                medias = media_list.get('Media', [])
                if not isinstance(medias, list):
                    medias = [medias]

                # add all valid images
                images = []
                for media in medias:
                    image = media.get('Image', '')
                    # TODO: not sure we need the below check
                    # since we are starting with a subset of components
                    # we know/believe to be package label info
                    if image in image_ids:
                        images.append(image)
                
                print(images)

                # check if the text matches the regex pattern
                ndc_matches = self.ndc_format(text)
                print(f'initial ndc_matches: {ndc_matches}')
                # get distinct ndc_matches because
                # sometimes the NDC is repeated multiples
                # times in a component
                if ndc_matches:
                    # keep the list of ndc_matches in the same order it was found
                    # but filter out NDCs that are not in the SPL
                    final_ndc_matches = []
                    found_ndc_matches = set()

                    # compare ndc_matches to list of SPL NDCs
                    # maintain the order of NDCs found in the free text
                    # remove duplicate NDCs found in the free text
                    for ndc in ndc_matches:
                        if ndc in ndc_ids and ndc not in found_ndc_matches:
                            final_ndc_matches.append(ndc)
                            found_ndc_matches.add(ndc)
                    print(f'de-dup ndc_matches: {final_ndc_matches}')
                    # if the number of NDC matches equals
                    # the number of images
                    if len(final_ndc_matches) == len(images):
                        for idx, ndc_match in enumerate(final_ndc_matches):
                            # map the NDC to the image in the
                            # same list position
                            # NOTE: this is an assumption and needs
                            # to be validated / verified
                            mapped_dict[ndc_match] = images[idx]

        return mapped_dict

    def extract_and_upload_mapped_ndcs_from_image_files(self):
        mapping_dict = self.file_mapping

        image_ndc_mapping = {}

        for spl, mapping in mapping_dict.items():
            # get all image file names associated with the SPL
            image_files = mapping.get('image_files')
            # get all NDCs associated with the SPL
            ndcs = mapping.get('ndcIds')
            # get all variants of each NDC to check against potential
            # different formatting in the image name
            # TODO: reconfigure this as a dict so that the original NDC
            # points to a list of all variations of itself, including itself
            ndc_variants = self.get_full_ndc_variants(ndcs)            

            for image_file in image_files:
                # attempt to regex an NDC out of each image
                # and also ensure that the NDC matches an NDC
                # from the SPL - not a random NDC-length number
                matched_ndc = self.get_ndc_from_image_filename(ndc_variants, image_file)
                
                # if a match is found, add it to a mapping dict
                if matched_ndc:
                    image_ndc_mapping[matched_ndc] = {
                        'image_file':image_file,
                        'spl':spl, 
                        'methodology':'image_filename',
                        'confidence_level':1,
                        'matched':1
                    }
            
            # add un-matched NDCs to the list
            # NOTE: maybe instead, we add un-matched images to the list?
            for ndc in ndcs:
                if ndc not in image_ndc_mapping.keys():
                    image_ndc_mapping[ndc] = {
                        'image_file':'',
                        'spl':spl, 
                        'methodology':'image_filename',
                        'confidence_level':1,
                        'matched':0
                    }
            
        df = pd.DataFrame.from_dict(image_ndc_mapping, orient='index')
        df = df.reset_index().rename(columns={'index':'ndc'})
        load_df_to_pg(df,"sagerx_lake","dailymed_image_ndc","append")


    def extract_and_upload_mapped_ndcs_from_image_components(self):
        mapping_dict = self.file_mapping

        image_ndc_mapping = {}

        for spl, mapping in mapping_dict.items():
            # get all image file names associated with the SPL
            image_files = mapping.get('image_files')
            # get all NDCs associated with the SPL
            ndcs = mapping.get('ndcIds')

            # Get NDCs from XML components
            spl_folder_name = mapping.get("spl_folder_name")
            xml_file_path = self.get_file_path(spl_folder_name, mapping.get("xml_file"))
            xml_doc = self.find_xml_package_data(xml_file_path)
            
            matched_components = self.get_ndcs_from_image_components(xml_doc, ndcs, image_files)

            for ndc,image_file in matched_components.items():
                image_ndc_mapping[ndc] = {
                        'image_file':image_file,
                        'spl':spl, 
                        'methodology':'image_component',
                        'confidence_level':0.75,
                        'matched':1} 
            
            for ndc in ndcs:
                if ndc not in image_ndc_mapping.keys():
                    image_ndc_mapping[ndc] = {
                        'image_file':'',
                        'spl':spl, 
                        'methodology':'image_component',
                        'confidence_level':1,
                        'matched':0} 
                
        df = pd.DataFrame.from_dict(image_ndc_mapping, orient='index')
        df = df.reset_index().rename(columns={'index':'ndc'})
        load_df_to_pg(df,"sagerx_lake","dailymed_image_ndc","append")


    def barcode_to_ndc(self,data):
        # scanned barcodes carry a prefix and a trailing check digit
        # around the 10-digit NDC, so strip those off first
        if len(data) > 11:
            data = data[:-1]  # drop trailing check digit
            data = data[2:]   # drop leading prefix
        # zero-pad a 10-digit NDC to 11 digits
        if len(data) == 10:
            data = data[:-1] + '0' + data[-1]

        return data

    def extract_and_upload_mapped_ndcs_from_image_barcode(self):
        from PIL import Image, ImageOps
        from pyzbar.pyzbar import decode

        mapping_dict = self.file_mapping
        image_ndc_mapping = {}

        for spl, mapping in mapping_dict.items():
            logging.debug(f"image barcode check for {spl}")

            ndcs = mapping.get('ndcIds')
            ndcs = self.get_full_ndc_variants(ndcs)
            image_files = mapping.get('image_files')

            spl_folder_name = mapping.get("spl_folder_name")

            for image_file in image_files:
                image_file_path = self.get_file_path(spl_folder_name, image_file)
                
                img = Image.open(image_file_path)
                img = img.convert('L')  
                img = ImageOps.autocontrast(img)
                barcodes = decode(img)
        
                if not barcodes:
                    logging.debug("No barcode found in the image.")
                    continue  # skip this image instead of aborting the whole run
                
                for barcode in barcodes:
                    # pyzbar returns Decoded objects; the scanned value is in .data (bytes)
                    barcode_ndc = self.barcode_to_ndc(barcode.data.decode('utf-8'))
                    if barcode_ndc in ndcs:
                        image_ndc_mapping[barcode_ndc] = {
                        'image_file':image_file,
                        'spl':spl, 
                        'methodology':'image_barcode',
                        'confidence_level':0.5,
                        'matched':1} 

            for ndc in ndcs:
                if ndc not in image_ndc_mapping.keys():
                    image_ndc_mapping[ndc] = {
                        'image_file':'',
                        'spl':spl, 
                        'methodology':'image_barcode',
                        'confidence_level':0.5,
                        'matched':0} 
                    
        df = pd.DataFrame.from_dict(image_ndc_mapping, orient='index')
        df = df.reset_index().rename(columns={'index':'ndc'})
        load_df_to_pg(df,"sagerx_lake","dailymed_image_ndc","append")



    def extract_and_upload_mapped_ndcs_from_image_ocr(self):
        import pytesseract
        from PIL import Image

        mapping_dict = self.file_mapping
        image_ndc_mapping = {}

        for spl, mapping in mapping_dict.items():
            logging.debug(f"image OCR check for {spl}")

            ndcs = mapping.get('ndcIds')
            ndcs = self.get_full_ndc_variants(ndcs)
            image_files = mapping.get('image_files')

            spl_folder_name = mapping.get("spl_folder_name")

            for image_file in image_files:
                image_file_path = self.get_file_path(spl_folder_name, image_file)
                
                img = Image.open(image_file_path)
                ocr_text = pytesseract.image_to_string(img)
                lines = ocr_text.splitlines()

                for line in lines:
                    # ndc_format returns a list of matches (or None),
                    # so iterate rather than using the list as a dict key
                    matched_ndcs = self.ndc_format(line)

                    if matched_ndcs:
                        for matched_ndc in matched_ndcs:
                            image_ndc_mapping[matched_ndc] = {
                                'image_file':image_file,
                                'spl':spl,
                                'methodology':'image_ocr',
                                'confidence_level':0.25,
                                'matched':1}
        
            # this loop belongs inside the spl loop, like the other methods,
            # otherwise it only runs for the last SPL processed
            for ndc in ndcs:
                if ndc not in image_ndc_mapping.keys():
                    image_ndc_mapping[ndc] = {
                        'image_file':'',
                        'spl':spl,
                        'methodology':'image_ocr',
                        'confidence_level':0.25,
                        'matched':0}
                    
        df = pd.DataFrame.from_dict(image_ndc_mapping, orient='index')
        df = df.reset_index().rename(columns={'index':'ndc'})
        load_df_to_pg(df,"sagerx_lake","dailymed_image_ndc","append")
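Since `barcode_to_ndc` is the trickiest bit of the salvage, here is a standalone sketch of its digit-slicing. My reading (an assumption, not verified against real labels): it expects an EAN-13-style value, i.e. a 2-character prefix, the 10-digit NDC, and a trailing check digit; the sample value below is hypothetical.

```python
def barcode_to_ndc(data: str) -> str:
    """Normalize a scanned barcode value to an 11-digit NDC.

    Mirrors the slicing in DailyMedImages.barcode_to_ndc: drop the
    trailing check digit and the 2-character prefix, then zero-pad a
    10-digit NDC to 11 digits (inserting '0' before the last digit).
    """
    if len(data) > 11:
        data = data[:-1]  # drop trailing check digit
        data = data[2:]   # drop leading prefix (e.g. '03' on an EAN-13)
    if len(data) == 10:
        data = data[:-1] + "0" + data[-1]
    return data


# hypothetical EAN-13 value: '03' prefix + 10-digit NDC + check digit
print(barcode_to_ndc("0312345678904"))  # 12345678900
```

One thing worth verifying in #328: if the scanner returns a 12-digit UPC-A string instead, the `data[2:]` slice would eat the first NDC digit.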

@jrlegrand
Member Author

I think this file is less important, but sharing anyway.

airflow/dags/dailymed/dailymed.py

import os
from pathlib import Path
from xml_functions import transform_xml_to_dict, get_xsl_template_path, transform_xml
import pandas as pd
from sagerx import load_df_to_pg
import logging
from airflow import configuration as conf
import re


class DailyMed():
    def __init__(self, data_folder: os.PathLike) -> None:
        self.data_folder = data_folder
        self.rx_folder = Path(data_folder) / "prescription"

        airflow_logging_level = conf.get('logging', 'logging_level')

        if airflow_logging_level == 'DEBUG':
            logging.debug("Logging level is DEBUG; debug messages will be emitted.")
        else:
            logging.info("Logging level is above DEBUG; debug messages will be suppressed.")


        ### 
        # Supplementary Functions
        ### 

        # order of patterns is important
        # largest to smallest
        self.ndc_pattern = re.compile(r"""
                        \d{11}              | # 11 digit
                        \d{10}              | # 10 digit
                        \d{5}-\d{5}         | # 5-5
                        \d{5}-\d{4}-\d{2}   | # 5-4-2
                        \d{5}-\d{4}-\d{1}   | # 5-4-1
                        \d{5}-\d{3}-\d{2}   | # 5-3-2
                        \d{4}-\d{6}         | # 4-6
                        \d{4}-\d{4}-\d{2}     # 4-4-2
                    """, re.X)

    def ndc_format(self,text_data):
        matches = re.findall(self.ndc_pattern, text_data)
        if matches:
            return matches      
        return None
    
    def convert_ndc_10_to_11(self,ndc):
        parts = ndc.split('-')
        if len(parts[-1]) == 1:
            parts[-1] = '0' + parts[-1]
        return '-'.join(parts)
    
    def convert_ndc_no_dash(self,ndc):
        return ndc.replace("-","")

    ###
    # XML Processing 
    ###

    def find_xml_image_ids(self, xml_doc) -> list:
        xslt = get_xsl_template_path("package_data.xsl")
        results = transform_xml_to_dict(xml_doc,xslt)
        return list(set(results.get('Image',[])))

    def find_xml_ndc_numbers(self, xml_doc) -> list:
        xslt = get_xsl_template_path("ndcs.xsl")
        results = transform_xml_to_dict(xml_doc,xslt)
        return list(set(results.get('NDCs', {}).get('NDC', [])))
    
    def find_xml_metadata(self, xml_doc) -> dict:
        xslt = get_xsl_template_path("doc_metadata.xsl")
        results = transform_xml_to_dict(xml_doc,xslt)
        return results
    
    def find_xml_package_data(self, xml_doc) -> dict:
        xslt = get_xsl_template_path("package_data.xsl")
        results = transform_xml_to_dict(xml_doc,xslt)
        return results

    def metadata_dict_cleanup(self, metadata):
        new_dict = {}
        for key, value in metadata.items():
            if isinstance(value, list) and len(value) == 1:
                new_dict[key] = str(value[0])
            elif isinstance(value, list) and len(value) > 1:
                new_dict[key] = value
        return new_dict


    def process_xml_doc(self, xml_doc):
        image_ids = self.find_xml_image_ids(xml_doc)
        ndc_ids = self.find_xml_ndc_numbers(xml_doc)

        metadata = self.find_xml_metadata(xml_doc)

        metadata['imageIds'] = image_ids
        metadata['ndcIds'] = ndc_ids
        return metadata

    ### 
    # File Processing
    ###
    
    def unzip_data(self) ->  None:
        import zipfile
        for zip_folder in self.rx_folder.iterdir():
            if zip_folder.is_file() and zip_folder.suffix == '.zip':
                logging.debug(zip_folder)
                with zipfile.ZipFile(zip_folder) as unzipped_folder:
                    folder_name = zip_folder.stem
                    extracted_folder_path = self.rx_folder / folder_name
                    extracted_folder_path.mkdir(exist_ok=True)

                    for subfile in unzipped_folder.infolist():
                        unzipped_folder.extract(subfile, extracted_folder_path)

                os.remove(zip_folder)

    def map_files(self):
        file_mapping ={}
        for spl_folder in self.rx_folder.iterdir():
            if spl_folder.name == '.DS_Store':
                continue

            image_files = []
            xml_file_name = ""

            for subfile in spl_folder.iterdir():
                if subfile.suffix == '.xml':
                    xml_file_name = subfile.name
                elif subfile.suffix == '.jpg':
                    image_files.append(subfile.name)

            spl = spl_folder.name.split("_")[1]

            xml_path = self.get_file_path(spl_folder, xml_file_name)
            metadata = self.process_xml_doc(xml_path)

            file_dict = {
                "xml_file":xml_file_name,
                "image_files": image_files,
                "spl_folder_name": spl_folder.name
            }
            file_mapping[spl] = dict(file_dict, **metadata)
        self.file_mapping = file_mapping
        logging.debug(file_mapping)

    
    def get_file_path(self, spl_folder_name, file_name):
        return os.path.join(self.rx_folder,spl_folder_name,file_name)

    ###
    # Data Extraction for DailyMed Daily
    ###

    def extract_and_upload_dmd_base_data(self):
        xslt = get_xsl_template_path("dailymed_prescription.xsl")

            
        for spl, mapping in self.file_mapping.items():
            spl_folder_name = mapping.get("spl_folder_name")
            xml_file = self.get_file_path(spl_folder_name, mapping.get("xml_file"))
            xml_content = transform_xml(xml_file, xslt)

            df = pd.DataFrame(
                columns=["spl","spl_folder_name", "xml_file_name", "xml_content","image_files"],
                data=[[spl, spl_folder_name, mapping.get("xml_file"), xml_content, mapping.get("image_files")]],
            )

            load_df_to_pg(df,"sagerx_lake","dailymed_daily","append")

@jrlegrand
Member Author

One last one. This was converted to a .txt file, I think because it was old or because we didn't want Airflow to pick it up as a DAG.

airflow/dags/dailymed/dag.txt

import pendulum

from airflow_operator import create_dag
from airflow.utils.helpers import chain

from airflow.decorators import task
import pandas as pd

from airflow.dags.dailymed_images.dailymed_images import *
from sagerx import load_df_to_pg

dag_id = "dailymed_images"

dag = create_dag(
    dag_id=dag_id,
    schedule="0 8 * * 1-5",  # 8:00 am, Monday through Friday
    start_date=pendulum.yesterday(),
    catchup=False,
    max_active_runs=1,
    concurrency=2,
)

"""
Process

1. get xml data from dailymed_daily
2. read xml data
3. find ndc and image data in xml
4. map ndc and image data
5. map ndc and image ids together
5.a. check to see if NDC is in the image name; if so, map them together
5.b. check to see if NDC is under the same 51945-4 component; if so, map them together
5.c. run image OCR to pull NDCs from the image text
6. upload to postgres
"""


@task
def get_dailymed_data():
    from airflow.hooks.postgres_hook import PostgresHook

    query = "SELECT * FROM sagerx_lake.dailymed_daily"
    pg_hook = PostgresHook(postgres_conn_id="postgres_default")
    engine = pg_hook.get_sqlalchemy_engine()
    df = pd.read_sql(query, con=engine)
    print(f"DF length of {len(df)} with columns: {df.columns}")

    df['raw_xml_content'] = df.apply(parse_xml_content, axis=1)
    df['set_id'] = df.apply(lambda x: extract_set_id(x['raw_xml_content']), axis=1)
    df['ndc_ids'] = df.apply(lambda x: find_ndc_numbers(x['raw_xml_content']), axis=1)
    df['image_ids'] = df.apply(lambda x: find_image_ids(x['raw_xml_content']), axis=1)
    df['ndc_image_mapping'] = df.apply(map_ndcs_parent_function, axis=1)

    load_df_to_pg(df[['spl','file_name','set_id','ndc_ids','image_ids','ndc_image_mapping']],"sagerx_lake","dailymed_images","replace",dtype_name="ndc_image_mapping")

    #df['med_dict'] = df.apply(extract_data, axis=1)

    # dfs = []
    # for md in df['med_dict']:
    #     df_temp = pd.DataFrame.from_dict(md,orient='index')
    #     df_temp.index.name = 'ndc'
    #     dfs.append(df_temp)

    # df_final = pd.concat(dfs)
    # df_final = df_final.reset_index()

    #print(df_final)
    # return df
     
    
with dag:
    get_dailymed_data()
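A recurring wart in both files above is xmltodict's single-element behavior (noted in the comments of `get_ndcs_from_image_components`): a repeated tag parses to a list, while a single occurrence parses to a bare dict or string. Any future rework might centralize that with a tiny helper like this sketch (the name `ensure_list` is mine, not from the branches):

```python
def ensure_list(value):
    """Normalize xmltodict output to a list.

    xmltodict returns a list for repeated elements but a bare
    dict/str for a single occurrence, and the key may be absent.
    """
    if value is None:
        return []
    return value if isinstance(value, list) else [value]


print(ensure_list({"Image": "a.jpg"}))  # [{'Image': 'a.jpg'}]
print(ensure_list([{"Image": "a.jpg"}, {"Image": "b.jpg"}]))
print(ensure_list(None))  # []
```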

@jrlegrand added the data source label Jan 18, 2025
@jrlegrand
Member Author

Marking as complete. This was prework for related issue #328. The key pieces of information are in my most recent comments.

@github-project-automation github-project-automation bot moved this from Todo to Done in SageRx Sprint Board Jan 19, 2025