-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkinesis_reader.py
72 lines (56 loc) · 2.43 KB
/
kinesis_reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import argparse
import base64
import json
import os
import subprocess
import time
def list_shards(stream_name):
    """Return the IDs of every shard in a Kinesis stream.

    Runs ``aws kinesis list-shards`` and extracts ``ShardId`` from each
    entry of the ``Shards`` array in its JSON output.

    :param stream_name: name of the Kinesis stream.
    :return: list of shard ID strings, in the order the CLI reports them.
    :raises subprocess.CalledProcessError: if the ``aws`` command fails.
    """
    print('Listing shards.. ')
    # Arguments are passed as a list (no shell), so the stream name is never
    # parsed by a shell. check=True surfaces CLI failures instead of letting
    # json.loads fail on empty output.
    result = subprocess.run(
        ['aws', 'kinesis', 'list-shards', '--stream-name', stream_name],
        stdout=subprocess.PIPE, universal_newlines=True, check=True)
    data = json.loads(result.stdout)
    return [shard['ShardId'] for shard in data['Shards']]
def get_iterator(stream_name, shard_id, iterator_type='TRIM_HORIZON'):
    """Return a shard iterator for one shard of a Kinesis stream.

    :param stream_name: name of the Kinesis stream.
    :param shard_id: ID of the shard, e.g. one returned by ``list_shards``.
    :param iterator_type: value passed to ``--shard-iterator-type``
        (default ``TRIM_HORIZON``). Only types that need no extra CLI
        options (such as ``TRIM_HORIZON`` or ``LATEST``) work here.
    :return: the ``ShardIterator`` string from the CLI response.
    :raises subprocess.CalledProcessError: if the ``aws`` command fails.
    """
    # No shell involved: each value is passed as its own argument.
    result = subprocess.run(
        ['aws', 'kinesis', 'get-shard-iterator',
         '--stream-name', stream_name,
         '--shard-id', shard_id,
         '--shard-iterator-type', iterator_type],
        stdout=subprocess.PIPE, universal_newlines=True, check=True)
    data = json.loads(result.stdout)
    return data['ShardIterator']
def get_records(iterator):
    """Return the ``Data`` field of every record reachable from *iterator*.

    A single GetRecords call may return only part of a shard, or even no
    records while more exist further on. This function therefore keeps
    following ``NextShardIterator`` until one of these happens:

    - the response reports it has caught up (``MillisBehindLatest`` is 0
      or absent);
    - the shard is closed and no next iterator is returned.

    :param iterator: shard iterator string, e.g. from ``get_iterator``.
    :return: list of the records' ``Data`` values, in the order received.
    :raises subprocess.CalledProcessError: if an ``aws`` call fails.
    """
    records = []
    while iterator:
        result = subprocess.run(
            ['aws', 'kinesis', 'get-records', '--shard-iterator', iterator],
            stdout=subprocess.PIPE, universal_newlines=True, check=True)
        data = json.loads(result.stdout)
        records.extend(record['Data'] for record in data['Records'])
        # Zero (or a missing value) means nothing newer is waiting right now.
        # A missing value gives the old single-call behaviour.
        if not data.get('MillisBehindLatest'):
            break
        iterator = data.get('NextShardIterator')
        # GetRecords is limited to 5 calls per second per shard.
        time.sleep(0.2)
    return records
def _format_record(record):
    """Return one base64-encoded record payload as text ending in a newline."""
    payload = base64.b64decode(record)
    try:
        # Pretty-print JSON with a two-space indent, as `jq` did before.
        text = json.dumps(json.loads(payload), indent=2, ensure_ascii=False)
    except ValueError:
        # Not JSON. json.JSONDecodeError and UnicodeDecodeError are both
        # ValueError subclasses. Keep the decoded text rather than dropping
        # the record.
        text = payload.decode('utf-8', errors='replace')
    return text if text.endswith('\n') else text + '\n'


def store_into_file(dir_name, shard_name, records):
    """Decode a shard's records and write them to ``<dir_name>/<shard_name>.txt``.

    Each record is base64-decoded in-process. This avoids spawning
    ``echo | base64 -d | jq`` per record, which failed for very large
    payloads and silently dropped non-JSON ones. JSON payloads are
    pretty-printed; anything else is written as UTF-8 text. No file is
    created when *records* is empty.

    :param dir_name: existing directory to write into.
    :param shard_name: shard ID, used as the file's base name.
    :param records: base64-encoded record payloads (the ``Data`` fields).
    """
    print('Shard ' + shard_name + ' contains ' + str(len(records)) + ' records')
    if not records:
        print('Skipping...')
        return
    print('Writing records...')
    path = os.path.join(dir_name, shard_name + '.txt')
    with open(path, 'w', encoding='utf-8') as out:
        for record in records:
            out.write(_format_record(record))
def create_or_empty_directory(dir_name):
    """Ensure *dir_name* exists and contains no regular files.

    A missing directory is created together with any missing parents.
    In an existing directory every regular file is deleted.
    Sub-directories and their contents are left in place.

    :param dir_name: path of the output directory.
    """
    if not os.path.exists(dir_name):
        print('Creating "' + dir_name + '" subdirectory to store result..')
        # makedirs (unlike mkdir) also works for nested paths such as a/b.
        os.makedirs(dir_name)
        return
    entries = (os.path.join(dir_name, name) for name in os.listdir(dir_name))
    files = [path for path in entries if os.path.isfile(path)]
    # Only announce a cleanup when there is actually something to remove.
    if files:
        print('"' + dir_name + '" directory is not empty. Removing files...')
    for path in files:
        os.remove(path)
def main():
    """Dump every record of a Kinesis stream into one text file per shard."""
    parser = argparse.ArgumentParser()
    # dest is a valid identifier; metavar keeps the usage text "stream-name".
    parser.add_argument(
        'stream_name', metavar='stream-name',
        help='the name of the stream to dump records from')
    # The help text now reports the default the code actually uses.
    parser.add_argument(
        '--dir', default='Records',
        help='the name of the directory to store shards content '
             '(default value is "%(default)s")')
    args = parser.parse_args()
    # An explicit empty --dir still falls back to the default, as before.
    out_dir = args.dir or 'Records'
    create_or_empty_directory(out_dir)
    for shard in list_shards(args.stream_name):
        iterator = get_iterator(args.stream_name, shard)
        records = get_records(iterator)
        store_into_file(out_dir, shard, records)
    print('\nComplete!')
    print('Files written to ' + os.path.abspath(out_dir))


if __name__ == '__main__':
    main()