-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path02_postgres_taxi.yaml
268 lines (248 loc) · 10.6 KB
/
02_postgres_taxi.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# Kestra flow that loads one month of NYC taxi trip data (yellow or green)
# into Postgres: download CSV -> monthly table -> merge into cumulative table.
id: 02_postgres_taxi
namespace: zoomcamp
description: |
  The CSV Data used in the course: https://github.com/DataTalksClub/nyc-tlc-data/releases
# User-selectable parameters. Year and month are quoted so they stay strings
# (keeps the leading zero of the month and avoids integer typing of the year).
inputs:
# Which data set to load; drives every runIf condition in the tasks.
- id: taxi
  type: SELECT
  displayName: Select taxi type
  values:
  - yellow
  - green
  defaults: yellow
# Year component of the file and table names.
- id: year
  type: SELECT
  displayName: Select year
  values:
  - "2019"
  - "2020"
  defaults: "2019"
# Two-digit month component of the file and table names.
- id: month
  type: SELECT
  displayName: Select month
  values:
  - "01"
  - "02"
  - "03"
  - "04"
  - "05"
  - "06"
  - "07"
  - "08"
  - "09"
  - "10"
  - "11"
  - "12"
  defaults: "01"
# Values derived from the inputs. Tasks read them through render(vars.<name>)
# so that the nested {{...}} expressions are evaluated at the point of use.
variables:
  # Name of the extracted CSV, e.g. yellow_tripdata_2019-01.csv
  file: '{{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv'
  # Per-month table the CSV is copied into, e.g. public.yellow_tripdata_2019_01
  table: 'public.{{inputs.taxi}}_tripdata_{{inputs.year}}_{{inputs.month}}'
  # Cumulative per-taxi-type table the monthly rows are merged into
  final_table: 'public.{{inputs.taxi}}_tripdata'
  # Output file of the `extract` task, looked up under the same name as `file`
  data: "{{outputs.extract.outputFiles[inputs.taxi ~ '_tripdata_' ~ inputs.year ~ '-' ~ inputs.month ~ '.csv']}}"
tasks:
# Attach the processed file name and the taxi type as labels.
- id: set_label
  type: io.kestra.plugin.core.execution.Labels
  labels:
    taxi: "{{inputs.taxi}}"
    file: "{{render(vars.file)}}"
# Download the gzipped monthly CSV from the DataTalksClub release page and
# unpack it in the task's working directory; outputFiles then captures the
# resulting *.csv so later tasks can read it via vars.data.
- id: extract
  type: io.kestra.plugin.scripts.shell.Commands
  outputFiles:
  - "*.csv"
  taskRunner:
    type: io.kestra.plugin.core.runner.Process
  commands:
  # Download to disk first and chain with `&&` instead of piping wget into
  # gunzip: a pipeline reports only the exit status of its last command, so a
  # failed download (404, network error) would not be reported by wget itself.
  # URL and file names are quoted so the rendered values are passed as single
  # shell words. `gunzip -f` overwrites an existing CSV, like `>` did before.
  - >-
    wget -q -O "{{render(vars.file)}}.gz"
    "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz"
    && gunzip -f "{{render(vars.file)}}.gz"
# Yellow only: make sure the cumulative table (vars.final_table) exists.
# It carries the yellow CSV columns plus unique_row_id and filename, which
# the monthly table gains later in yellow_add_unique_id_and_filename.
- id: yellow_final_table
  type: io.kestra.plugin.jdbc.postgresql.Queries
  runIf: "{{inputs.taxi == 'yellow'}}"
  sql: |
    CREATE TABLE IF NOT EXISTS {{render(vars.final_table)}} (
        unique_row_id          text,
        filename               text,
        VendorID               text,
        tpep_pickup_datetime   timestamp,
        tpep_dropoff_datetime  timestamp,
        passenger_count        integer,
        trip_distance          double precision,
        RatecodeID             text,
        store_and_fwd_flag     text,
        PULocationID           text,
        DOLocationID           text,
        payment_type           integer,
        fare_amount            double precision,
        extra                  double precision,
        mta_tax                double precision,
        tip_amount             double precision,
        tolls_amount           double precision,
        improvement_surcharge  double precision,
        total_amount           double precision,
        congestion_surcharge   double precision
    );
# Green only: make sure the cumulative table (vars.final_table) exists.
# It carries the green CSV columns (lpep_* timestamps, ehail_fee, trip_type)
# plus unique_row_id and filename, which the monthly table gains later in
# green_add_unique_id_and_filename.
- id: green_final_table
  type: io.kestra.plugin.jdbc.postgresql.Queries
  runIf: "{{inputs.taxi == 'green'}}"
  sql: |
    CREATE TABLE IF NOT EXISTS {{render(vars.final_table)}} (
        unique_row_id          text,
        filename               text,
        VendorID               text,
        lpep_pickup_datetime   timestamp,
        lpep_dropoff_datetime  timestamp,
        store_and_fwd_flag     text,
        RatecodeID             text,
        PULocationID           text,
        DOLocationID           text,
        passenger_count        integer,
        trip_distance          double precision,
        fare_amount            double precision,
        extra                  double precision,
        mta_tax                double precision,
        tip_amount             double precision,
        tolls_amount           double precision,
        ehail_fee              double precision,
        improvement_surcharge  double precision,
        total_amount           double precision,
        payment_type           integer,
        trip_type              integer,
        congestion_surcharge   double precision
    );
# Yellow only: create the per-month table (vars.table) that the CSV is copied
# into. Its columns follow the column list used by yellow_copy_in; the
# unique_row_id / filename columns are added afterwards by a later task.
- id: yellow_monthly_table
  type: io.kestra.plugin.jdbc.postgresql.Queries
  runIf: "{{inputs.taxi == 'yellow'}}"
  sql: |
    CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
        VendorID               text,
        tpep_pickup_datetime   timestamp,
        tpep_dropoff_datetime  timestamp,
        passenger_count        integer,
        trip_distance          double precision,
        RatecodeID             text,
        store_and_fwd_flag     text,
        PULocationID           text,
        DOLocationID           text,
        payment_type           integer,
        fare_amount            double precision,
        extra                  double precision,
        mta_tax                double precision,
        tip_amount             double precision,
        tolls_amount           double precision,
        improvement_surcharge  double precision,
        total_amount           double precision,
        congestion_surcharge   double precision
    );
# Green only: create the per-month table (vars.table) that the CSV is copied
# into. Its columns follow the column list used by green_copy_in; the
# unique_row_id / filename columns are added afterwards by a later task.
- id: green_monthly_table
  type: io.kestra.plugin.jdbc.postgresql.Queries
  runIf: "{{inputs.taxi == 'green'}}"
  sql: |
    CREATE TABLE IF NOT EXISTS {{render(vars.table)}} (
        VendorID               text,
        lpep_pickup_datetime   timestamp,
        lpep_dropoff_datetime  timestamp,
        store_and_fwd_flag     text,
        RatecodeID             text,
        PULocationID           text,
        DOLocationID           text,
        passenger_count        integer,
        trip_distance          double precision,
        fare_amount            double precision,
        extra                  double precision,
        mta_tax                double precision,
        tip_amount             double precision,
        tolls_amount           double precision,
        ehail_fee              double precision,
        improvement_surcharge  double precision,
        total_amount           double precision,
        payment_type           integer,
        trip_type              integer,
        congestion_surcharge   double precision
    );
# Runs for both taxi types (no runIf): empty the per-month table before the
# CSV is copied in, so a rerun for the same month starts from a clean table.
- id: truncate_table
  type: io.kestra.plugin.jdbc.postgresql.Queries
  sql: |
    TRUNCATE TABLE {{render(vars.table)}};
# Green only: bulk-load the extracted CSV (vars.data) into the per-month table.
# The column list is in the same order as the green_monthly_table definition.
- id: green_copy_in
  type: io.kestra.plugin.jdbc.postgresql.CopyIn
  runIf: "{{inputs.taxi == 'green'}}"
  from: "{{render(vars.data)}}"
  table: "{{render(vars.table)}}"
  format: CSV
  # NOTE(review): presumably tells CopyIn the first CSV line is a header row.
  header: true
  columns:
  - VendorID
  - lpep_pickup_datetime
  - lpep_dropoff_datetime
  - store_and_fwd_flag
  - RatecodeID
  - PULocationID
  - DOLocationID
  - passenger_count
  - trip_distance
  - fare_amount
  - extra
  - mta_tax
  - tip_amount
  - tolls_amount
  - ehail_fee
  - improvement_surcharge
  - total_amount
  - payment_type
  - trip_type
  - congestion_surcharge
# Yellow only: bulk-load the extracted CSV (vars.data) into the per-month table.
# The column list is in the same order as the yellow_monthly_table definition.
- id: yellow_copy_in
  type: io.kestra.plugin.jdbc.postgresql.CopyIn
  runIf: "{{inputs.taxi == 'yellow'}}"
  from: "{{render(vars.data)}}"
  table: "{{render(vars.table)}}"
  format: CSV
  # NOTE(review): presumably tells CopyIn the first CSV line is a header row.
  header: true
  columns:
  - VendorID
  - tpep_pickup_datetime
  - tpep_dropoff_datetime
  - passenger_count
  - trip_distance
  - RatecodeID
  - store_and_fwd_flag
  - PULocationID
  - DOLocationID
  - payment_type
  - fare_amount
  - extra
  - mta_tax
  - tip_amount
  - tolls_amount
  - improvement_surcharge
  - total_amount
  - congestion_surcharge
# Yellow only: add unique_row_id and filename to the per-month table and fill
# them for every row (the UPDATE has no WHERE clause). unique_row_id is the
# md5 of seven trip fields concatenated as text, with NULLs treated as ''.
# NOTE(review): the fields are concatenated without a separator, so distinct
# value combinations can in principle yield the same string; changing this
# would alter the keys of rows that are already loaded, so it is left as is.
- id: yellow_add_unique_id_and_filename
  type: io.kestra.plugin.jdbc.postgresql.Queries
  runIf: "{{inputs.taxi == 'yellow'}}"
  sql: |
    ALTER TABLE {{render(vars.table)}}
      ADD COLUMN IF NOT EXISTS unique_row_id text,
      ADD COLUMN IF NOT EXISTS filename text;
    UPDATE {{render(vars.table)}}
    SET
      unique_row_id = md5(
        COALESCE(CAST(VendorID AS text), '') ||
        COALESCE(CAST(tpep_pickup_datetime AS text), '') ||
        COALESCE(CAST(tpep_dropoff_datetime AS text), '') ||
        COALESCE(PULocationID, '') ||
        COALESCE(DOLocationID, '') ||
        COALESCE(CAST(fare_amount AS text), '') ||
        COALESCE(CAST(trip_distance AS text), '')
      ),
      filename = '{{render(vars.file)}}';
# Green only: add unique_row_id and filename to the per-month table and fill
# them for every row (the UPDATE has no WHERE clause). unique_row_id is the
# md5 of seven trip fields concatenated as text, with NULLs treated as ''.
# NOTE(review): the fields are concatenated without a separator, so distinct
# value combinations can in principle yield the same string; changing this
# would alter the keys of rows that are already loaded, so it is left as is.
- id: green_add_unique_id_and_filename
  type: io.kestra.plugin.jdbc.postgresql.Queries
  runIf: "{{inputs.taxi == 'green'}}"
  sql: |
    ALTER TABLE {{render(vars.table)}}
      ADD COLUMN IF NOT EXISTS unique_row_id text,
      ADD COLUMN IF NOT EXISTS filename text;
    UPDATE {{render(vars.table)}}
    SET
      unique_row_id = md5(
        COALESCE(CAST(VendorID AS text), '') ||
        COALESCE(CAST(lpep_pickup_datetime AS text), '') ||
        COALESCE(CAST(lpep_dropoff_datetime AS text), '') ||
        COALESCE(PULocationID, '') ||
        COALESCE(DOLocationID, '') ||
        COALESCE(CAST(fare_amount AS text), '') ||
        COALESCE(CAST(trip_distance AS text), '')
      ),
      filename = '{{render(vars.file)}}';
# Yellow only: copy rows from the per-month table (S) into the cumulative
# table (T) when no row with the same unique_row_id exists there yet.
# There is no WHEN MATCHED clause, so rows already present are left untouched.
# NOTE(review): MERGE needs PostgreSQL 15 or newer - confirm the server version.
- id: yellow_merge_data
  type: io.kestra.plugin.jdbc.postgresql.Queries
  runIf: "{{inputs.taxi == 'yellow'}}"
  sql: |
    MERGE INTO {{render(vars.final_table)}} AS T
    USING {{render(vars.table)}} AS S
    ON T.unique_row_id = S.unique_row_id
    WHEN NOT MATCHED THEN
      INSERT (
        unique_row_id, filename, VendorID, tpep_pickup_datetime, tpep_dropoff_datetime,
        passenger_count, trip_distance, RatecodeID, store_and_fwd_flag, PULocationID,
        DOLocationID, payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount,
        improvement_surcharge, total_amount, congestion_surcharge
      )
      VALUES (
        S.unique_row_id, S.filename, S.VendorID, S.tpep_pickup_datetime, S.tpep_dropoff_datetime,
        S.passenger_count, S.trip_distance, S.RatecodeID, S.store_and_fwd_flag, S.PULocationID,
        S.DOLocationID, S.payment_type, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount,
        S.improvement_surcharge, S.total_amount, S.congestion_surcharge
      );
# Green only: copy rows from the per-month table (S) into the cumulative
# table (T) when no row with the same unique_row_id exists there yet.
# There is no WHEN MATCHED clause, so rows already present are left untouched.
# NOTE(review): MERGE needs PostgreSQL 15 or newer - confirm the server version.
- id: green_merge_data
  type: io.kestra.plugin.jdbc.postgresql.Queries
  runIf: "{{inputs.taxi == 'green'}}"
  sql: |
    MERGE INTO {{render(vars.final_table)}} AS T
    USING {{render(vars.table)}} AS S
    ON T.unique_row_id = S.unique_row_id
    WHEN NOT MATCHED THEN
      INSERT (
        unique_row_id, filename, VendorID, lpep_pickup_datetime, lpep_dropoff_datetime,
        store_and_fwd_flag, RatecodeID, PULocationID, DOLocationID, passenger_count,
        trip_distance, fare_amount, extra, mta_tax, tip_amount, tolls_amount, ehail_fee,
        improvement_surcharge, total_amount, payment_type, trip_type, congestion_surcharge
      )
      VALUES (
        S.unique_row_id, S.filename, S.VendorID, S.lpep_pickup_datetime, S.lpep_dropoff_datetime,
        S.store_and_fwd_flag, S.RatecodeID, S.PULocationID, S.DOLocationID, S.passenger_count,
        S.trip_distance, S.fare_amount, S.extra, S.mta_tax, S.tip_amount, S.tolls_amount, S.ehail_fee,
        S.improvement_surcharge, S.total_amount, S.payment_type, S.trip_type, S.congestion_surcharge
      );
# Connection settings for the io.kestra.plugin.jdbc.postgresql tasks in this
# flow (Queries and CopyIn), none of which set url/username/password themselves.
# NOTE(review): the credentials are stored in plain text. That may be fine for
# a local course setup, but consider a secret or KV lookup before reusing this
# flow against a shared database.
pluginDefaults:
- type: io.kestra.plugin.jdbc.postgresql
  values:
    url: "jdbc:postgresql://host.docker.internal:5432/kestra"
    username: "kestra"
    password: "k3str4"