# covid_data_getter.py
# Forked from tommertron/CovidReport
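#
# Usage: run directly; pass --csvdir if ontario_covid_data.csv lives in
# another directory, e.g. (the path below is a placeholder):
#
#     python covid_data_getter.py --csvdir /path/to/data
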
# Import modules
import csv
import json
import urllib.parse as prs
import urllib.request as ur
from csv import DictWriter
from datetime import date, timedelta

import pandas as pd

###---Global Variables---
# Name of the CSV file we'll be using to store data
file = "ontario_covid_data.csv"

# Today's date
today = date.today()

# How many days back to check for data in the CSV file
DATESBACK = 10

# The datasources dict stores information about how to query each dataset:
# its CKAN resource id, the name of its date column, and the fields to pull
datasources = {
    "CaseData": {
        "id": "ed270bb8-340b-41f9-a7c6-e8ef587e6d11",
        "datename": "Reported Date",
        "fields": [
            "Total Cases",
            "Number of patients hospitalized with COVID-19",
            "Number of patients in ICU due to COVID-19",
        ],
    },
    "VaccineData": {
        "id": "8a89caa9-511c-4568-af89-7f2174b4378c",
        "datename": "report_date",
        "fields": [
            "total_doses_administered",
            "total_individuals_at_least_one",
            "total_individuals_fully_vaccinated",
            "total_individuals_3doses",
        ],
    },
}
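
# Adding another dataset is a matter of appending an entry here. A sketch
# (the resource id and field name below are placeholders, not a real
# dataset):
#
#     datasources["TestingData"] = {
#         "id": "<ckan-resource-id>",
#         "datename": "date",
#         "fields": ["tests_completed"],
#     }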

###---Functions---
# Read the CSV into globals: field_names (the header row) and localData
# (a DataFrame indexed by date)
def OpenCSV():
    global localData
    global field_names
    with open(file) as csv_file:
        # DictReader.fieldnames gives us the header row directly
        field_names = csv.DictReader(csv_file).fieldnames
    localData = pd.read_csv(file)
    # Set the index to 'date' so we can reference rows by date
    localData.set_index("date", inplace=True)
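
# The CSV is assumed to already exist with a header row whose columns match
# the "fields" entries in datasources plus a leading date column, e.g.
# (sketch):
#
#     date,Total Cases,Number of patients hospitalized with COVID-19,...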

# Append a new row to the CSV file
def addRow(elements):
    # Open file in append mode
    with open(file, "a+", newline="") as write_obj:
        # Create a writer object from the csv module
        dict_writer = DictWriter(write_obj, fieldnames=field_names)
        # Add the dictionary as a row in the csv
        dict_writer.writerow(elements)
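
# For example, the main block below calls addRow({"date": checkdate}), which
# writes a row where every other column is left blank; blankfiller() fills
# those blanks in afterwards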

# Write a value into a given row and column, then save the CSV
def addValue(row, column, value):
    localData.loc[row, column] = value
    localData.to_csv(file)

# Check whether a given date already exists as a row in the CSV
def dateCheck(day):
    try:
        localData.loc[day]
        return True
    except KeyError:
        return False

# Return the list of fields that are still blank for a given date
# (returns None when nothing is missing)
def blankchecker(day, dataset):
    fieldquery = []
    for i in datasources[dataset]["fields"]:
        try:
            # int() raises ValueError on a blank (NaN) cell, which flags
            # the field as missing
            checker = 1 + int(localData.loc[day, i])
        except ValueError:
            fieldquery.append(i)
    if fieldquery:
        return fieldquery

# Build a datastore_search query against the Ontario Data Catalogue (CKAN)
# and return the parsed JSON response
def querier(dataset, qfields, qdate):
    urlstart = "https://data.ontario.ca/api/3/action/datastore_search?"
    params = {
        "resource_id": datasources[dataset]["id"],
        "fields": ",".join(qfields),
        "filters": json.dumps({datasources[dataset]["datename"]: [qdate]}),
    }
    # quote_via=prs.quote percent-encodes spaces as %20 rather than '+'
    queryurl = urlstart + prs.urlencode(params, quote_via=prs.quote)
    with ur.urlopen(queryurl) as fileobj:
        return json.loads(fileobj.read())
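
# For example, querier("CaseData", ["Total Cases"], "2022-01-15") requests
# roughly the following (shown before percent-encoding, for readability;
# the date is just an illustrative value):
#
#     https://data.ontario.ca/api/3/action/datastore_search?resource_id=ed270bb8-340b-41f9-a7c6-e8ef587e6d11&fields=Total Cases&filters={"Reported Date": ["2022-01-15"]}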

# Fill in any blank fields for a given date in a given dataset; the main
# block below runs this in a loop for each dataset for each day
def blankfiller(day, dataset):
    qfields = blankchecker(day, dataset)
    if qfields is not None:
        reqdata = querier(dataset, qfields, day)
        if reqdata["result"]["total"] > 0:
            for i in qfields:
                addValue(day, i, reqdata["result"]["records"][0][i])
            OpenCSV()
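
# The JSON that comes back (shape per the CKAN datastore_search API; values
# elided) looks roughly like:
#
#     {"result": {"total": 1, "records": [{"Total Cases": ...}]}}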

if __name__ == "__main__":
    import argparse
    import os

    parser = argparse.ArgumentParser()
    parser.add_argument("--csvdir", help="Directory of csv file")
    args = parser.parse_args()
    if args.csvdir:
        file = os.path.join(args.csvdir, file)

    # Open the CSV
    OpenCSV()

    # Make sure the CSV ends with a newline. (This is needed so that any
    # rows appended later start on their own line.)
    with open(file, "r") as f:
        lines = f.readlines()
    if lines and not lines[-1].endswith("\n"):
        with open(file, "a") as f:
            f.write("\n")

    # Check whether any rows are missing for the last DATESBACK days and
    # create them if so
    checkdate = today
    for _ in range(DATESBACK):
        OpenCSV()
        if dateCheck(str(checkdate)) is False:
            addRow({"date": checkdate})
        checkdate = checkdate - timedelta(days=1)

    # Populate missing values
    checkdate = today
    for _ in range(DATESBACK):
        for i in datasources:
            OpenCSV()
            blankfiller(str(checkdate), i)
        checkdate = checkdate - timedelta(days=1)

    # Sort newest-first by the date index and save
    OpenCSV()
    localData.sort_index(ascending=False, inplace=True)
    localData.to_csv(file)