diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml
new file mode 100644
index 0000000..27ffe10
--- /dev/null
+++ b/.github/workflows/push.yml
@@ -0,0 +1,36 @@
+name: Little Kidogo Interruption Parser
+on: [push]
+jobs:
+  build:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        python-version: [3.5]
+
+    steps:
+    - uses: actions/checkout@v2
+    - name: Set Up Python ${{matrix.python-version}}
+      uses: actions/setup-python@v2
+      with:
+        python-version: ${{ matrix.python-version }}
+    - name: Install Dependencies
+      run: |
+        python -m pip install --upgrade pip
+        pip install -r requirements.txt
+
+  compile:
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/checkout@v2
+    - name: Set Up Python 3.5
+      uses: actions/setup-python@v2
+      with:
+        python-version: 3.5
+    - name: Install Dependencies and test
+      run: |
+        python -m pip install --upgrade pip
+        pip install -r requirements.txt
+        python manage.py test
+
+
diff --git a/api/__init__.py b/api/__init__.py
index 54ce9af..37ae0dd 100644
--- a/api/__init__.py
+++ b/api/__init__.py
@@ -5,6 +5,7 @@
 from api.config import app_config
 from parser.parser import parse
 from api.util import validate_url
+import json
 
 document_url = None
 callback_url = None
@@ -30,8 +31,10 @@ def run_parse():
     response = {
         'error': 'None',
         'data': data
-    }
-    requests.post(callback_url, headers=headers, data=response)
+    }
+
+    res = json.dumps(response)
+    requests.post(callback_url, headers=headers, data=res)
 
 
 def create_app(config_name):
diff --git a/parser/parser.py b/parser/parser.py
index d230418..84e1e07 100644
--- a/parser/parser.py
+++ b/parser/parser.py
@@ -2,136 +2,155 @@
 from pdfminer.high_level import extract_text
 from shutil import copyfileobj
 import tempfile
-
-keywords = ['REGION', 'COUNTY', 'TIME', 'DATE','AREA', ',' ]
-
-class County:
-    name = None
-    area = None
-    time = None
-    date = None
-    locations = []
-
-    def serialize(self):
-        if self.name == None:
-            return
-
-        return { 'name': self.name,
-            'area': self.area,
-            'time': self.time,
-            'date': self.date,
-            'locations': self.locations
-        }
-
-class Region:
-    region = None
-    counties = []
-
-    def serialize(self):
-        ser_counties = []
-        for sc in self.counties:
-            ser_counties.append(sc.serialize())
-
-        return {'region': self.region,
-            'counties': ser_counties
-        }
-
-def download_file(url):
+from re import search, sub, IGNORECASE
+from .util import rlstrip_dot, composite_function
+
+# {
+#     "region": {
+#         "name": "Region name",
+#         "counties": [
+#             {
+#                 "name": "County Name",
+#                 "areas": [
+#                     {
+#                         "name": "Area name",
+#                         "details": {
+#                             "date": "Date",
+#                             "time": "Time",
+#                             "locations": ["location"]
+#                         }
+#                     }
+#                 ]
+#             }
+#         ]
+#     }
+# }
+
+def get_text(url):
     r = requests.get(url, stream=True)
     temFile = tempfile.TemporaryFile()
     copyfileobj(r.raw, temFile)
-    return temFile
-
-def get_text(file_):
-    return extract_text(file_)
-
-def check_for_keyword(lines):
-    new_lines = []
-    for line in lines:
-        for k in keywords:
-            if k in line:
-                new_lines.append(line)
-                break
-    return new_lines
-
-def take_lines(contents):
-    lines = []
-    contents = contents.split('\n')
-    lappend = lines.append
-    for line in contents:
-        if len(line) < 3: continue
-        lappend(line)
-    return check_for_keyword(lines[1:])
-
-def parse_(lines):
-    hit_county, hit_region, i = 0, 0, 0
-    regions = []
-    region = Region()
-    county = County()
-    rounds = len(lines)
-    for line in lines:
-        i += 1
-        line = line.replace('\n', '').lstrip().rstrip()
-        if 'REGION' in line:
-            if hit_region == 0:
-                region.region = line
-                hit_region = 1
-            elif hit_region == 1:
-                # another region encountered store current
-                region.counties.append(county)
-                county = County()
-                regions.append(region)
-                region = Region()
-                region.region = line
-
-        elif 'COUNTY' in line:
-            if hit_county == 0:
-                county.name = line
-                hit_county = 1
-            else:
-                region.counties.append(county)
-                county = County()
-                county.name = line
-
-        elif 'DATE' in line and 'TIME' in line:
-            date_str = ''
-            for x in line:
-                if x == ' ':
-                    continue
-
-                if x == 'T':
-                    county.date = date_str.replace('\n', '')
-                    date_str = x
-                    continue
-
-                date_str += x
-            county.time = date_str
-            county.locations = lines[i].replace('\n', '').rstrip().lstrip().split(',')
-
-        elif 'DATE' in line:
-            county.date = line[6:]
-
-        elif 'TIME' in line:
-            county.time = line[6:]
-            county.locations = lines[i].replace('\n', '').rstrip().lstrip().split(',')
-
-        elif 'AREA' in line:
-            county.area = line[6:]
-
-        if i == rounds-1:
-            region.counties.append(county)
-            regions.append(region)
-
-
+    text = extract_text(temFile)
+    text = text.replace("\n", '.')
+    text = sub(r"[\s]{2,}", ' ', text)
+    return text
+
+
+def get_regions(text):
+    regions = dict()
+    regex = r"[.]([a-zA-Z\s]+?REGION)(.+?)[.](?:[a-zA-Z\s]+?REGION)"
+    region_search = search(regex, text, IGNORECASE)
+    while region_search:
+        # Get the top region
+        region = dict()
+        region["name"] = region_search.group(1).strip()
+        region_key = '_'.join(region["name"].lower().split(' '))
+        region["counties"] = get_counties(region_search.group(2), regions, region_key)
+        regions[region_key] = region
+        # Remove the region
+        text = text.replace(region_search.group(1), '')
+        text = text.replace(region_search.group(2), '')
+
+        # Do the region search again
+        region_search = search(regex, text, IGNORECASE)
+
+    last_region_check = search(r"[.]([a-zA-Z\s]+?REGION)(.+?customers)", text, IGNORECASE)
+    if last_region_check:
+        # Get the last region
+        region = dict()
+        region["name"] = last_region_check.group(1).strip()
+        region_key = '_'.join(region["name"].lower().split(' '))
+        region["counties"] = get_counties(last_region_check.group(2), regions, region_key)
+        regions[region_key] = region
     return regions
+def get_counties(text, regions, region_key):
+    counties = list()
+    regex = r"[.]([a-zA-Z\s]+?COUNTY)(.+?)[.]([a-zA-Z\s]*?COUNTY)"
+    county_search = search(regex, text, IGNORECASE)
+    while county_search:
+        # Get the top county
+        county = dict()
+        county["name"] = county_search.group(1).strip()
+        county["areas"] = get_areas(county_search.group(2))
+
+        # Check if the region already exists
+        if region_key in regions.keys():
+            regions[region_key]["counties"].append(county)
+        else:
+            counties.append(county)
+
+        # Remove the county
+        text = text.replace(county_search.group(1), '')
+        text = text.replace(county_search.group(2), '')
+
+        # Do the county search again
+        county_search = search(regex, text, IGNORECASE)
+
+    last_county_check = search(r"[.]([a-zA-Z\s]+?COUNTY)(.+?)$", text, IGNORECASE)
+    if last_county_check:
+        # Get the last county
+        county = dict()
+        county["name"] = last_county_check.group(1).strip()
+        county["areas"] = get_areas(last_county_check.group(2))
+
+        # Check if the region already exists
+        if region_key in regions.keys():
+            regions[region_key]["counties"].append(county)
+        else:
+            counties.append(county)
+
+    return counties
+
+def get_areas(text):
+    areas = list()
+    regex = r"(AREA:[a-zA-Z\s,]+[.])(DATE.+?)AREA"
+    area_search = search(regex, text, IGNORECASE)
+    while area_search:
+        # Get the top area
+        area = dict()
+        area["name"] = area_search.group(1)
area["details"] = get_details(area_search.group(2)) + areas.append(area) + + # Remove the area + text = text.replace(area_search.group(1), '') + text = text.replace(area_search.group(2), '') + + # Do the county search again + area_search = search(regex, text, IGNORECASE) + + last_area_check = search(r"(AREA:[a-zA-Z\s,]+[.])(DATE.+?)$", text, IGNORECASE) + if last_area_check: + # Get the last area + area = dict() + area["name"] = last_area_check.group(1) + area["details"] = get_details(last_area_check.group(2)) + areas.append(area) + + return areas + +def get_details(text): + details = dict() + date_search = search(r"(DATE:)(.+?)TIME", text, IGNORECASE) + if date_search: + details["date"] = date_search.group(2).strip() + text = text.replace(date_search.group(1), '') + text = text.replace(date_search.group(2), '') + + time_search = search(r"(TIME:)(.+?P[.]M[.])", text, IGNORECASE) + if time_search: + details["time"] = time_search.group(2).strip() + text = text.replace(time_search.group(1), '') + text = text.replace(time_search.group(2), '') + + details["locations"] = get_locations(text) + + return details + +def get_locations(text): + stripSpaces = lambda location : location.strip() + return list(map(composite_function(stripSpaces, rlstrip_dot), text.split(','))) def parse(url): - tempFile = download_file(url) - file_data = take_lines(get_text(tempFile)) - all_data = parse_(file_data) - serialized_data = [] - append = serialized_data.append - for r in all_data: - append(r.serialize()) - return serialized_data + return get_regions(get_text(url)) diff --git a/parser/util.py b/parser/util.py new file mode 100644 index 0000000..2dd96e5 --- /dev/null +++ b/parser/util.py @@ -0,0 +1,17 @@ +from functools import reduce +from re import sub + +# strip leading and trailing dots +def rlstrip_dot(string): + return sub(r"^[\.]|\.\s+$", "", string) + + +# helper function + +# this function takes a number of functions and composes them +def composite_function(*func): + + def compose(f, g): + return lambda x : f(g(x)) + + return reduce(compose, func, lambda x : x) diff --git a/requirements.txt b/requirements.txt index 6b3e0a6..66a0aa4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,18 +1,18 @@ -certifi==2020.4.5.1 -chardet==3.0.4 -click==7.1.2 -decorator==4.4.2 -Flask==1.1.2 -Flask-Script==2.0.6 -idna==2.9 -itsdangerous==1.1.0 -Jinja2==2.11.2 -MarkupSafe==1.1.1 -pdfminer.six==20200517 -pycryptodome==3.9.7 -requests==2.23.0 -six==1.15.0 -sortedcontainers==2.1.0 -urllib3==1.25.9 -validators==0.15.0 -Werkzeug==1.0.1 +certifi +chardet +click +decorator +Flask +Flask-Script +idna +itsdangerous +Jinja2 +MarkupSafe +pdfminer.six +pycryptodome +requests +six +sortedcontainers +urllib3 +validators +Werkzeug diff --git a/tests/test_parser.py b/tests/test_parser.py new file mode 100644 index 0000000..104fa4f --- /dev/null +++ b/tests/test_parser.py @@ -0,0 +1,14 @@ +import unittest +from parser.parser import parse + +class ParserTest(unittest.TestCase): + + def setUp(self): + pass + + def test_parse(self): + result = parse("https://kplc.co.ke/img/full/zSxZeOMiFWWi_Interruptions%20-%2016.04.2020.pdf") + self.assertNotEqual(len(result), 0) + +if __name__ == '__main__': + unittest.main()