forked from datasets/covid-19
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocess.py
92 lines (87 loc) · 3.26 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
from dataflows import Flow, load, unpivot, find_replace, set_type, dump_to_path, update_package, update_resource, join, join_with_self, add_computed_field, delete_fields, checkpoint, duplicate
# Directory holding the raw time-series CSVs; a file name from below is
# appended to it to form each download URL.
BASE_URL = (
    'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/'
    'csse_covid_19_data/csse_covid_19_time_series/'
)

# File names of the per-metric time-series CSVs found under BASE_URL.
CONFIRMED = 'time_series_19-covid-Confirmed.csv'
DEATH = 'time_series_19-covid-Deaths.csv'
RECOVERED = 'time_series_19-covid-Recovered.csv'
def to_normal_date(row):
    """Rewrite ``row['Date']`` from month-day-year to day-month-year order.

    The incoming value must consist of three ``-``-separated parts
    (month, day, year). Day and month are left-padded with ``0`` to two
    characters; the year part is kept exactly as received.

    Args:
        row: Mapping with a string ``'Date'`` entry; it is updated in place.

    Returns:
        None. The result is written back to ``row['Date']``.

    Raises:
        ValueError: If the value does not split into exactly three parts.
    """
    month, day, year = row['Date'].split('-')
    # zfill only pads values shorter than two characters, so '22' stays
    # '22' while '1' becomes '01'.
    row['Date'] = '-'.join([day.zfill(2), month.zfill(2), year])
# Columns whose header looks like a slash-separated date (e.g. 1/22/20) are
# unpivoted; the matched header text becomes the value of the 'Date' key.
# The pattern is a raw string so that '\/' reaches the regex engine verbatim
# instead of being treated as an (invalid) Python string escape.
unpivoting_fields = [
    {'name': r'([0-9]+\/[0-9]+\/[0-9]+)', 'keys': {'Date': r'\1'}},
]

# Key column created by the unpivot step. It is declared as a string here;
# the pipeline converts it to a real date in a later step.
extra_keys = [{'name': 'Date', 'type': 'string'}]

# Value column that receives the cell content of each unpivoted column.
extra_value = {'name': 'Case', 'type': 'number'}
# Build and run the whole pipeline: load the three time-series CSVs, reshape
# them to one row per (Province/State, Country/Region, Date), merge them into
# a single combined resource, then derive a per-date aggregate from it.
Flow(
    # One resource per source file. The join steps below address these
    # resources by the CSV file name without its extension.
    load(f'{BASE_URL}{CONFIRMED}'),
    load(f'{BASE_URL}{RECOVERED}'),
    load(f'{BASE_URL}{DEATH}'),
    checkpoint('load_data'),
    # Turn every date-named column into its own row with 'Date' and 'Case'.
    unpivot(unpivoting_fields, extra_keys, extra_value),
    # Normalise the date text: '/' -> '-', then reorder/zero-pad it so it
    # matches the '%d-%m-%y' format parsed by set_type below.
    find_replace([{'name': 'Date', 'patterns': [{'find': '/', 'replace': '-'}]}]),
    to_normal_date,
    set_type('Date', type='date', format='%d-%m-%y', resources=None),
    set_type('Case', type='number', resources=None),
    # Copy the confirmed counts onto the Deaths resource as 'Confirmed',
    # matching rows on province, country and date.
    join(
        source_name='time_series_19-covid-Confirmed',
        source_key=['Province/State', 'Country/Region', 'Date'],
        source_delete=True,
        target_name='time_series_19-covid-Deaths',
        target_key=['Province/State', 'Country/Region', 'Date'],
        fields=dict(Confirmed={
            'name': 'Case',
            'aggregate': 'first'
        })
    ),
    # Same again for the recovered counts, stored as 'Recovered'.
    join(
        source_name='time_series_19-covid-Recovered',
        source_key=['Province/State', 'Country/Region', 'Date'],
        source_delete=True,
        target_name='time_series_19-covid-Deaths',
        target_key=['Province/State', 'Country/Region', 'Date'],
        fields=dict(Recovered={
            'name': 'Case',
            'aggregate': 'first'
        })
    ),
    # The remaining 'Case' column is the Deaths resource's own value column;
    # expose it under the name 'Deaths' and drop the generic column.
    add_computed_field(
        target={'name': 'Deaths', 'type': 'number'},
        operation='format',
        with_='{Case}'
    ),
    delete_fields(['Case']),
    # Rename the merged resource and describe the package before writing it.
    update_resource('time_series_19-covid-Deaths', name='time-series-19-covid-combined', path='data/time-series-19-covid-combined.csv'),
    update_package(name='covid-19', title='Novel Coronavirus 2019'),
    # No output path is passed, so dataflows' default location is used.
    dump_to_path(),
    checkpoint('processed_data'),
    # Duplicate the combined resource so an aggregate can be built from the
    # copy while the original is kept as-is.
    # NOTE(review): unlike the combined resource, this target path has no
    # 'data/' prefix - confirm that is intended.
    duplicate(
        source='time-series-19-covid-combined',
        target_name='worldwide-aggregated',
        target_path='worldwide-aggregated.csv'
    ),
    # Collapse the copy to one row per Date, summing the three counters.
    join_with_self(
        resource_name='worldwide-aggregated',
        join_key=['Date'],
        fields=dict(
            Date={
                'name': 'Date'
            },
            Confirmed={
                'name': 'Confirmed',
                'aggregate': 'sum'
            },
            Recovered={
                'name': 'Recovered',
                'aggregate': 'sum'
            },
            Deaths={
                'name': 'Deaths',
                'aggregate': 'sum'
            }
        )
    ),
    dump_to_path()
# The rows previously returned by results() were discarded, so process() is
# used instead: it runs the same steps without collecting all rows in memory.
).process()