dynamodb_copy_table.py
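"""Copy every item from one DynamoDB table into another.

Usage:
    python dynamodb_copy_table.py <source_table_name> <destination_table_name>

The destination table is created from the source table's key schema,
attribute definitions and secondary indexes, then items are copied in
parallel using segmented scans. Credentials come from the local 'default'
AWS profile; the region is taken from AWS_DEFAULT_REGION (us-east-1 if unset).
"""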
import itertools
import multiprocessing
import os
import sys
from time import sleep

import boto3

# characters cycled through to draw a simple progress spinner
spinner = itertools.cycle(['-', '/', '|', '\\'])

def copy_items(src_table, dst_table, client, segment, total_segments):
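    """Scan one segment of src_table and batch-write every item into dst_table."""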
    # number of items this worker has copied so far
    item_count = 0
    paginator = client.get_paginator('scan')
    # PageSize=25 keeps each page within the 25-item BatchWriteItem limit
    for page in paginator.paginate(
            TableName=src_table,
            Select='ALL_ATTRIBUTES',
            ReturnConsumedCapacity='NONE',
            ConsistentRead=True,
            Segment=segment,
            TotalSegments=total_segments,
            PaginationConfig={"PageSize": 25}):
        batch = []
        for item in page['Items']:
            item_count += 1
            batch.append({
                'PutRequest': {
                    'Item': item
                }
            })
        print("Process %d put %d items" % (segment, item_count))
        if batch:
            client.batch_write_item(
                RequestItems={
                    dst_table: batch
                }
            )
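
# NOTE: batch_write_item accepts at most 25 put requests per call (hence
# PageSize=25 above) and may return UnprocessedItems when writes are
# throttled; a production-grade copy would retry those until none remain.
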
def create_table(src_table, dst_table, client):
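    """Create dst_table with the key schema, attribute definitions and
    secondary indexes of src_table, then wait for it to become ACTIVE."""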
    # get the source table and its schema
    try:
        table_schema = client.describe_table(TableName=src_table)["Table"]
    except client.exceptions.ResourceNotFoundException:
        print("!!! Table %s does not exist. Exiting..." % src_table)
        sys.exit(1)
    print('*** Reading key schema from %s table' % src_table)

    # build keyword args for the copy table
    keyword_args = {"TableName": dst_table}
    keyword_args['KeySchema'] = table_schema['KeySchema']
    keyword_args['AttributeDefinitions'] = table_schema['AttributeDefinitions']

    global_secondary_indexes = []
    local_secondary_indexes = []
    if table_schema.get("GlobalSecondaryIndexes"):
        for item in table_schema["GlobalSecondaryIndexes"]:
            index = {}
            for k, v in item.items():
                if k in ["IndexName", "KeySchema", "Projection", "ProvisionedThroughput"]:
                    if k == "ProvisionedThroughput":
                        # uncomment below to keep the same read/write capacity as the original index
                        # for key in list(v.keys()):
                        #     if key not in ["ReadCapacityUnits", "WriteCapacityUnits"]:
                        #         del v[key]
                        # comment below to keep the same read/write capacity as the original index
                        index[k] = {"ReadCapacityUnits": 3, "WriteCapacityUnits": 1200}
                        continue
                    index[k] = v
            global_secondary_indexes.append(index)
    if table_schema.get("LocalSecondaryIndexes"):
        for item in table_schema["LocalSecondaryIndexes"]:
            index = {}
            for k, v in item.items():
                if k in ["IndexName", "KeySchema", "Projection"]:
                    index[k] = v
            local_secondary_indexes.append(index)
    if global_secondary_indexes:
        keyword_args["GlobalSecondaryIndexes"] = global_secondary_indexes
    if local_secondary_indexes:
        keyword_args["LocalSecondaryIndexes"] = local_secondary_indexes

    # uncomment below to keep the same read/write capacity as the original table
    # provisioned_throughput = table_schema['ProvisionedThroughput']
    # for key in list(provisioned_throughput.keys()):
    #     if key not in ["ReadCapacityUnits", "WriteCapacityUnits"]:
    #         del provisioned_throughput[key]
    # keyword_args["ProvisionedThroughput"] = provisioned_throughput
    # comment below to keep the same read/write capacity as the original table
    keyword_args["ProvisionedThroughput"] = {"ReadCapacityUnits": 3, "WriteCapacityUnits": 1200}
    if table_schema.get('StreamSpecification'):
        keyword_args['StreamSpecification'] = table_schema['StreamSpecification']

    # create the copy table
    try:
        client.describe_table(TableName=dst_table)
        print('!!! Table %s already exists. Exiting...' % dst_table)
        sys.exit(0)
    except client.exceptions.ResourceNotFoundException:
        client.create_table(**keyword_args)
        print('*** Waiting for the new table %s to become active' % dst_table)
        sleep(5)
        while client.describe_table(TableName=dst_table)['Table']['TableStatus'] != 'ACTIVE':
            sys.stdout.write(next(spinner))
            sys.stdout.flush()
            sleep(0.1)
            sys.stdout.write('\b')
        print('*** New table %s is now active!' % dst_table)
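
# NOTE: boto3 also ships a built-in waiter, client.get_waiter('table_exists'),
# which could replace the manual polling loop above.
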
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print('Usage: %s <source_table_name> <destination_table_name>' % sys.argv[0])
        sys.exit(1)

    table_1 = sys.argv[1]
    table_2 = sys.argv[2]
    region = os.getenv('AWS_DEFAULT_REGION', 'us-east-1')

    session = boto3.session.Session(profile_name='default', region_name=region)
    db_client = session.client('dynamodb')

    create_table(table_1, table_2, db_client)
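
    # NOTE: the same boto3 client object is shared with the worker processes
    # below. boto3 clients are not documented as fork-safe, so a more
    # defensive variant would create a fresh session/client inside copy_items.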
    pool_size = 8  # tested with 4, which took about 5 minutes to copy 150,000+ items
    pool = []
    for i in range(pool_size):
        worker = multiprocessing.Process(
            target=copy_items,
            kwargs={
                'src_table': table_1,
                'dst_table': table_2,
                'client': db_client,
                'segment': i,
                'total_segments': pool_size
            }
        )
        pool.append(worker)
        worker.start()

    for process in pool:
        process.join()

    print('*** All Jobs Done. Exiting... ***')