upload.py
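"""Dataflow pipeline that loads a product CSV into Cloud Datastore.

Reads the input file line by line, converts each row to a dict, builds a
Datastore entity of kind 'Productx' keyed by SKU, and writes the entities
with the (legacy, Python 2) Beam Datastore connector.
"""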
import csv
import datetime
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Legacy v1 Datastore connector; this module and googledatastore are from the
# Python 2 era of Beam and were superseded by the v1new connector later on.
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper

from settings import PROJECT, BUCKET, INPUT_FILENAME


class CSVtoDict(beam.DoFn):
    """Converts a CSV line into a dict keyed by the supplied headers."""

    def process(self, element, headers):
        rec = []
        element = element.encode('utf-8')
        try:
            # csv.reader handles quoting and embedded commas; the input
            # here is always a single line.
            for row in csv.reader([element]):
                rec = row
            if len(rec) == len(headers):
                data = {header.strip(): val.strip()
                        for header, val in zip(headers, rec)}
                yield data
            else:
                # Skip rows whose field count does not match the headers.
                logging.warning('Skipping malformed row: %s', rec)
        except Exception:
            logging.exception('Failed to parse row: %r', element)


class CreateEntities(beam.DoFn):
    """Builds a Cloud Datastore entity protobuf from a row dict."""

    def process(self, element):
        entity = entity_pb2.Entity()
        # The SKU is removed from the properties and used as the key id.
        sku = int(element.pop('sku'))
        element['regularPrice'] = float(element['regularPrice'])
        element['salePrice'] = float(element['salePrice'])
        # Remaining fields are stored as unicode text (Python 2).
        element['name'] = unicode(element['name'].decode('utf-8'))
        element['type'] = unicode(element['type'].decode('utf-8'))
        element['url'] = unicode(element['url'].decode('utf-8'))
        element['image'] = unicode(element['image'].decode('utf-8'))
        element['inStoreAvailability'] = unicode(element['inStoreAvailability'])
        datastore_helper.add_key_path(entity.key, 'Productx', sku)
        datastore_helper.add_properties(entity, element)
        yield entity


def dataflow(run_local):
    # Read a local sample when run_local is set; otherwise read from GCS.
    if run_local:
        input_file_path = 'sample.csv'
    else:
        input_file_path = 'gs://' + BUCKET + '/' + INPUT_FILENAME

    JOB_NAME = 'datastore-upload-{}'.format(
        datetime.datetime.now().strftime('%Y-%m-%d-%H%M%S'))

    pipeline_options = {
        'project': PROJECT,
        'staging_location': 'gs://' + BUCKET + '/staging',
        'runner': 'DataflowRunner',
        'job_name': JOB_NAME,
        'disk_size_gb': 100,
        'temp_location': 'gs://' + BUCKET + '/temp',
        'save_main_session': True
    }
    if run_local:
        pipeline_options['runner'] = 'DirectRunner'

    options = PipelineOptions.from_dictionary(pipeline_options)
    with beam.Pipeline(options=options) as p:
        (p
         | 'Reading input file' >> beam.io.ReadFromText(input_file_path)
         | 'Converting from csv to dict' >> beam.ParDo(
             CSVtoDict(),
             ['sku', 'name', 'regularPrice', 'salePrice', 'type', 'url',
              'image', 'inStoreAvailability'])
         | 'Create entities' >> beam.ParDo(CreateEntities())
         | 'Write entities into Datastore' >> WriteToDatastore(PROJECT)
        )


if __name__ == '__main__':
    # Flip to True to run with the DirectRunner against the local sample.csv.
    run_locally = False
    dataflow(run_locally)
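
# The constants imported above live in a sibling settings.py. A minimal
# sketch, assuming placeholder values (replace with your own project/bucket):
#
#   # settings.py -- all three values below are hypothetical
#   PROJECT = 'my-gcp-project'        # GCP project id
#   BUCKET = 'my-dataflow-bucket'     # GCS bucket for input/staging/temp
#   INPUT_FILENAME = 'products.csv'   # CSV object name inside the bucket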