forked from andyhuynh3/target-jsonl
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtarget_c8.py
executable file
·129 lines (108 loc) · 3.85 KB
/
target_c8.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python3
import argparse
import io
import jsonschema
import simplejson as json
import sys
from c8 import C8Client
import singer
from jsonschema import Draft4Validator
from adjust_precision_for_schema import adjust_decimal_precision_for_schema
logger = singer.get_logger()
def emit_state(state):
    """Write ``state`` to stdout as one JSON line.

    Nothing is written when ``state`` is ``None``. Otherwise the state is
    serialized with ``json.dumps``, logged at debug level, written to
    ``sys.stdout`` followed by a newline, and stdout is flushed.
    """
    if state is None:
        return

    serialized = json.dumps(state)
    logger.debug('Emitting state {}'.format(serialized))

    out = sys.stdout
    out.write("{}\n".format(serialized))
    out.flush()
def persist_messages(messages, target):
    """Validate Singer messages and insert their records into C8 collections.

    All collection operations go through the module-level ``client``, which
    is assigned in ``main``.

    Args:
        messages: Iterable of raw Singer message lines.
        target: Name of a single collection that receives every record. When
            falsy (``None`` or empty), each stream is written to a collection
            named after the stream, created on first use if it is not
            already known.

    Returns:
        The value of the last STATE message when no RECORD message followed
        it, otherwise ``None``.

    Raises:
        json.decoder.JSONDecodeError: Re-raised after logging when a message
            cannot be parsed.
        Exception: If a RECORD arrives for a stream that has had no SCHEMA.
        jsonschema.ValidationError: Re-raised after logging when a record
            fails validation against its stream's schema.
    """
    state = None
    schemas = {}
    # Recorded per stream but not read anywhere in this function.
    key_properties = {}
    validators = {}

    # Truthiness covers both ``None`` and an empty name; decide once instead
    # of re-testing for every record.
    single_target = bool(target)
    # Names of collections known to exist (per-stream mode only); a set keeps
    # the per-record membership test cheap.
    known_collections = set()

    if single_target:
        if not client.has_collection(target):
            client.create_collection(name=target)
    else:
        known_collections.update(
            c['name'] for c in client.get_collections() if not c['system']
        )

    for message in messages:
        try:
            o = singer.parse_message(message).asdict()
        except json.decoder.JSONDecodeError:
            logger.error("Unable to parse:\n{}".format(message))
            raise

        message_type = o['type']
        if message_type == 'RECORD':
            stream = o['stream']
            if stream not in schemas:
                # The trailing space keeps the stream name from running into
                # the rest of the sentence.
                raise Exception(
                    "A record for stream {} "
                    "was encountered before a corresponding schema".format(stream)
                )

            try:
                validators[stream].validate(o['record'])
            except jsonschema.ValidationError:
                logger.error(f"Failed parsing the json schema for stream: {stream}.")
                raise

            if single_target:
                collection_name = target
            else:
                collection_name = stream
                if stream not in known_collections:
                    client.create_collection(name=stream)
                    known_collections.add(stream)

            # Get the collection handle and insert.
            coll = client.get_collection(collection_name)
            logger.info('Writing a record')
            try:
                coll.insert(o['record'])
            except TypeError as e:
                # TODO: This is temporary until json serializing issue for
                # Decimals are fixed in pyC8.
                # The record is skipped on purpose, but report it loudly
                # enough that the dropped record is noticed.
                logger.warning(
                    "pyC8 error occurred; record for stream {} "
                    "was not inserted: {}".format(stream, e)
                )

            # A record arrived after the last STATE, so that state is cleared
            # until a newer STATE message replaces it.
            state = None
        elif message_type == 'STATE':
            logger.debug('Setting state to {}'.format(o['value']))
            state = o['value']
        elif message_type == 'SCHEMA':
            stream = o['stream']
            schemas[stream] = o['schema']
            adjust_decimal_precision_for_schema(schemas[stream])
            validators[stream] = Draft4Validator(o['schema'])
            key_properties[stream] = o['key_properties']
        else:
            logger.warning("Unknown message type {} in message {}".format(o['type'], o))

    return state
def main():
    """Load the config, connect to C8, and persist Singer messages from stdin.

    Reads the JSON config file named by ``-c/--config``, builds a
    ``C8Client`` and stores it in the module-level ``client``, passes stdin
    (decoded as UTF-8) to ``persist_messages``, and emits the resulting state
    with ``emit_state``.

    Config keys read: ``region``, ``tenant``, ``fabric`` and ``password``
    (required), plus ``target_collection`` (optional; when absent or empty,
    ``persist_messages`` receives a falsy target).

    Raises:
        Exception: If ``--config`` was not provided.
        KeyError: If a required config key is missing.
    """
    # ``persist_messages`` reads this module-level name.
    global client

    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config', help='Config file')
    args = parser.parse_args()

    if not args.config:
        raise Exception(
            "Required '--config' parameter was not provided"
        )
    # Decode the config explicitly as UTF-8, matching how stdin is decoded
    # below, instead of relying on the platform default encoding.
    with open(args.config, encoding='utf-8') as input_json:
        config = json.load(input_json)

    region = config['region']
    tenant = config['tenant']
    fabric = config['fabric']
    password = config['password']
    # Optional: persist_messages already treats a None/empty target as
    # "one collection per stream", so a missing key must not raise.
    target_collection = config.get('target_collection')

    # Use the logger rather than print(): stdout is where emit_state writes
    # the state line, and a stray text line there would be mixed into it.
    logger.info("Create C8Client Connection")
    client = C8Client(
        protocol='https',
        host=region,
        port=443,
        email=tenant,
        password=password,
        geofabric=fabric
    )

    input_messages = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
    state = persist_messages(input_messages, target_collection)

    emit_state(state)
    logger.debug("Exiting normally")
# Run the target only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()