-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathS3KeyValueStore.py
135 lines (107 loc) · 4.37 KB
/
S3KeyValueStore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""
Implements a performant key-value store backed by S3, with every item stored
as an object under a specific prefix. In practice, this is more performant and
scales to a few hundred transactions per second for workloads without
atomicity requirements.
"""
import sys
import json
import cPickle
import hashlib
import boto3
from botocore.exceptions import ClientError
class Table(object):
"""
Implements the key-value store on top of S3 that provides get_item and put_item
semantics identical to a DynamoDB Table resource.
"""
def __init__(self, bucket, prefix, keys):
"""
Create an S3 key-value table object, by providing the bucket, the prefix,
and the list of keys that must be present in every object that will be used
to construct the S3 object key.
"""
self.bucket = bucket
self.prefix = prefix.strip('/')
self.client = boto3.client('s3')
self.keys = set(keys)
def get_item(self, **kwargs):
"""
Provides a method for getting an item with the same semantics as the DynamoDB
get_item function of a Table resource.
"""
if 'Key' not in kwargs:
raise KeyError("'Key' is a non-optional keyword argument")
item = kwargs['Key']
if not isinstance(item, dict):
raise TypeError("'Key' must be a dictionary")
try:
item_key = dict([(k, item[k]) for k in self.keys])
except KeyError as e:
item_key = dict()
if set(item_key.keys()) != self.keys:
raise ValueError("'Key' must have at least the following keys: %s"
% str(self.keys))
object_key = hashlib.sha256(
json.dumps(item_key, sort_keys=True)).hexdigest()
try:
value_object = self.client.get_object(
Bucket=self.bucket, Key=self.prefix + '/' + object_key)
except ClientError as exc:
if exc.response['Error']['Code'] == 'NoSuchKey' or exc.response[
'Error']['Code'] == 'AccessDenied':
value_object = None
else:
sys.stderr.write("%s %s\n" %
(self.bucket, self.prefix + '/' + object_key))
raise exc
ret = dict()
if value_object is not None:
ret['Item'] = cPickle.loads(value_object['Body'].read())
else:
pass
return ret
def put_item(self, **kwargs):
"""
Provides a method for getting an item with the same semantics as the DynamoDB
get_item function of a Table resource.
"""
if 'Item' not in kwargs:
raise KeyError("'Item' is a non-optional keyword argument")
item = kwargs['Item']
if not isinstance(item, dict):
raise TypeError("'Item' must be a dictionary")
try:
item_key = dict([(k, item[k]) for k in self.keys])
except KeyError:
item_key = dict()
if set(item_key.keys()) != self.keys:
raise ValueError("'Item' must have at least the following keys: %s"
% str(self.keys))
object_key = hashlib.sha256(
json.dumps(item_key, sort_keys=True)).hexdigest()
self.client.put_object(
Bucket=self.bucket,
Key=self.prefix + '/' + object_key,
Body=cPickle.dumps(item))
# def update_item(self, **kwargs):
# for kwarg in ["Key", "UpdateExpression", "ExpressionAttributeValues"]:
# if kwarg not in kwargs:
# raise KeyError("'%s' is a non-optional keyword argument" %
# kwarg)
# key = kwargs['Item']
# if not isinstance(item, dict):
# raise TypeError("'Item' must be a dictionary")
# try:
# item_key = dict([(k, item[k]) for k in self.keys])
# except KeyError:
# item_key = dict()
# if set(item_key.keys()) != self.keys:
# raise ValueError("'Item' must have at least the following keys: %s"
# % str(self.keys))
# object_key = hashlib.sha256(
# json.dumps(item_key, sort_keys=True)).hexdigest()
# self.client.put_object(
# Bucket=self.bucket,
# Key=self.prefix + '/' + object_key,
# Body=cPickle.dumps(item))
return dict()