-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstorage.py
203 lines (166 loc) · 7.56 KB
/
storage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import abc
from inspect import signature
from io import BytesIO
import json
import os
import boto3
from typing import IO, Any
class StorageDriver(metaclass=abc.ABCMeta):
@classmethod
def __subclasshook__(cls, subclass):
return (hasattr(subclass, 'store') and
callable(subclass.store) and
len(signature(subclass.store).parameters) == 3 and
hasattr(subclass, 'delete') and
callable(subclass.delete) and
len(signature(subclass.store).parameters) == 2 and
hasattr(subclass, 'get') and
callable(subclass.get) and
len(signature(subclass.store).parameters) == 2 or
NotImplemented)
@abc.abstractmethod
def __init__(self, config) -> None:
"""
Initializes the storage driver object. The passed config parameter is guaranteed to contain all required config options.
:param config: configuration dictionary
:returns: None
"""
pass
@abc.abstractmethod
def store(self, id, file) -> (None | str):
"""
if not external storage driver: Takes the file object and stores it. The file must be retrievable using the id.
if external storage driver: Returns a json string containing a dictionary. The file will be uploaded to the url member of the dictionary.
The file name will be set to the 'file-name' value of the dictionary. All other members of the dictionary will be send as POST options for the request.
:param id: id of the file
:param file: none if external storage driver, file to upload otherwise
:return: None if not external storage driver, json string otherwise
"""
pass
@abc.abstractmethod
def delete(self, id) -> None:
"""
Deletes the file with the given id
:param id: id of the file to be deleted
:returns: None
"""
pass
@abc.abstractmethod
def get(self, id) -> (str | IO):
"""
if not external storage driver: Returns a IO object that contains the file with the given id
if external storage driver: returns url string where the file can be downloaded
:param id: id of the file to be retrieved
:returns: IO object to file if not external storage driver, string with download url otherwise
"""
pass
@staticmethod
@abc.abstractmethod
def required_config() -> dict[str, Any]:
"""
Defines the required config for this. Member names of the returned dictionary are the config parameter names. Values of the dictionary are set as default values.
:returns: dictionary with the required config parameters and their default values.
"""
pass
@property
def extern(self) -> bool:
"""
Determines if the storage driver is external or not. External storage drivers only returns urls for the upload and download of the file.
The user's webbrowser will then upload the files directly to the storage backend (e.g. using signed urls for S3). Non-external storage drivers handle the file objects directly.
"""
return False
class FileSystemStorageDriver(StorageDriver):
def __init__(self, config) -> None:
self.config = config
def store(self, id, file) -> (None | str):
# see FileUpload._copy_file
chunksize = 2 ** 16
with open(os.path.join(self.config['file_location'], id), 'wb') as f:
while True:
buf = file.read()
if not buf: break
f.write(buf)
def delete(self, id) -> None:
os.remove(os.path.join(self.config['file_location'], id))
def get(self, id) -> (str | IO):
return open(os.path.join(self.config['file_location'], id), 'rb')
@staticmethod
def required_config() -> dict[str, Any]:
return {'file_location': 'files'}
class S3StorageDriverExtern(StorageDriver):
def __init__(self, config):
self.config = config
try:
session = boto3.session.Session()
self._s3_client = session.client('s3',
region_name=config['S3_REGION'],
endpoint_url=config['S3_ENDPOINT'],
aws_access_key_id=config['S3_ACCESS_ID'],
aws_secret_access_key=config['S3_SECRET_KEY'])
except Exception as e:
raise Exception("""Could not connect to s3 instance.
Make sure S3_ACCESS_ID, S3_SECRET_KEY and S3_BUCKET are correctly defined in the config.""")
response = self._s3_client.list_buckets()
for bucket in response['Buckets']:
if bucket['Name'] == config['S3_BUCKET']:
return
raise Exception('Could not find bucket ' + config['S3_BUCKET'] + '.')
def store(self, id, file) -> (None | str):
response = self._s3_client.generate_presigned_post(self.config['S3_BUCKET'], id, ExpiresIn=60)
for k in response['fields'].keys():
response[k] = response['fields'][k]
del response['fields']
response['file-name'] = response['key']
return json.dumps(response)
def delete(self, id) -> None:
self._s3_client.delete_object(Bucket=self.config['S3_BUCKET'], Key=id)
def get(self, id) -> (str | IO):
params = {'Bucket': self.config['S3_BUCKET'],
'Key': id}
return self._s3_client.generate_presigned_url('get_object', Params=params, ExpiresIn=60)
@staticmethod
def required_config() -> dict[str, Any]:
return {'S3_ACCESS_KEY': '',
'S3_SECRET_KEY': '',
'S3_REGION': '',
'S3_ENDPOINT': '',
'S3_BUCKET': 'file_transfer'}
@property
def extern(self) -> bool:
return True
class S3StorageDriver(StorageDriver):
def __init__(self, config):
self.config = config
try:
session = boto3.session.Session()
self._s3_client = session.client('s3',
region_name=config['S3_REGION'],
endpoint_url=config['S3_ENDPOINT'],
aws_access_key_id=config['S3_ACCESS_ID'],
aws_secret_access_key=config['S3_SECRET_KEY'])
except Exception as e:
raise Exception("""Could not connect to s3 instance.
Make sure S3_ACCESS_ID, S3_SECRET_KEY and S3_BUCKET are correctly defined in the config.""")
response = self._s3_client.list_buckets()
for bucket in response['Buckets']:
if bucket['Name'] == config['S3_BUCKET']:
return
raise Exception('Could not find bucket ' + config['S3_BUCKET'] + '.')
def store(self, id, file) -> (None | str):
self._s3_client.upload_fileobj(file, self.config['S3_BUCKET'], id)
def delete(self, id) -> None:
self._s3_client.delete_object(Bucket=self.config['S3_BUCKET'], Key=id)
def get(self, id) -> (str | IO):
file_obj = BytesIO()
a = self._s3_client.get_object(Bucket=self.config['S3_BUCKET'], Key=id)['Body'].read()
return a
@staticmethod
def required_config() -> dict[str, Any]:
return {'S3_ACCESS_KEY': '',
'S3_SECRET_KEY': '',
'S3_REGION': '',
'S3_ENDPOINT': '',
'S3_BUCKET': 'file_transfer'}
@property
def extern(self) -> bool:
return False