Skip to content

Commit 00dbe72

Browse files
committed
Added basic functionality for read and write to HUAWEI Object Storage Service (OBS)
1 parent 5a82613 commit 00dbe72

File tree

5 files changed

+491
-1
lines changed

5 files changed

+491
-1
lines changed

setup.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ def read(fname):
4242
http_deps = ['requests']
4343
ssh_deps = ['paramiko']
4444
zst_deps = ['zstandard']
45+
obs_deps = ['esdk-obs-python']
4546

46-
all_deps = aws_deps + gcs_deps + azure_deps + http_deps + ssh_deps + zst_deps
47+
all_deps = aws_deps + gcs_deps + azure_deps + http_deps + ssh_deps + zst_deps + obs_deps
4748
tests_require = all_deps + [
4849
'moto[server]',
4950
'responses',
@@ -83,6 +84,7 @@ def read(fname):
8384
'webhdfs': http_deps,
8485
'ssh': ssh_deps,
8586
'zst': zst_deps,
87+
'obs': obs_deps,
8688
},
8789
python_requires=">=3.7,<4.0",
8890

smart_open/obs.py

Lines changed: 371 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,371 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Copyright (C) 2024 Sergei Sokolov <sv.sokolov@gmail.com>
4+
#
5+
# This code is distributed under the terms and conditions
6+
# from the MIT License (MIT).
7+
#
8+
"""Implements file-like objects for reading and writing from/to HUAWEI Object Storage Service (OBS)."""
9+
from __future__ import annotations
10+
11+
import io
12+
import logging
13+
import os
14+
import struct
15+
import sys
16+
from typing import Optional
17+
18+
from smart_open.utils import set_defaults
19+
20+
try:
21+
import obs.client
22+
from obs.searchmethod import get_token
23+
from obs import loadtoken
24+
except ImportError:
25+
MISSING_DEPS = True
26+
27+
import smart_open.bytebuffer
28+
import smart_open.utils
29+
30+
from smart_open import constants
31+
32+
logger = logging.getLogger(__name__)
33+
34+
SCHEMES = ('obs',)
35+
URI_EXAMPLES = (
36+
'obs://bucket_id.server:port/object_key',
37+
)
38+
39+
DEFAULT_CHUNK_SIZE = 65536
40+
DEFAULT_HTTP_PROTOCOL = 'https'
41+
DEFAULT_SECURITY_PROVIDER_POLICY = 'ENV'
42+
43+
default_client_kwargs = {
44+
'security_provider_policy': DEFAULT_SECURITY_PROVIDER_POLICY,
45+
}
46+
47+
48+
def parse_uri(uri_as_string):
49+
split_uri = smart_open.utils.safe_urlsplit(uri_as_string)
50+
assert split_uri.scheme in SCHEMES
51+
52+
bucket_id, server = split_uri.netloc.split('.', 1)
53+
object_key = split_uri.path[1:]
54+
55+
return dict(
56+
scheme=split_uri.scheme,
57+
bucket_id=bucket_id,
58+
object_key=object_key,
59+
server=server,
60+
)
61+
62+
63+
def open_uri(uri, mode, transport_params):
64+
parsed_uri = parse_uri(uri)
65+
kwargs = smart_open.utils.check_kwargs(open, transport_params)
66+
67+
http_protocol = transport_params.get('http_protocol', DEFAULT_HTTP_PROTOCOL)
68+
client_kwargs = {
69+
'server': f'{http_protocol}://{parsed_uri["server"]}',
70+
}
71+
client_kwargs.update(default_client_kwargs)
72+
73+
kwargs['client'] = transport_params.get('client', client_kwargs)
74+
75+
default_kwarg = {
76+
'use_obs_client_write_mode':
77+
os.environ.get('SMART_OPEN_OBS_USE_CLIENT_WRITE_MODE', 'false').lower() in ('true'),
78+
'decrypt_ak_sk':
79+
os.environ.get('SMART_OPEN_OBS_DECRYPT_AK_SK', 'false').lower() in ('true'),
80+
'scc_lib_path':
81+
os.environ.get('SMART_OPEN_OBS_SCC_LIB_PATH', None),
82+
'scc_conf_path':
83+
os.environ.get('SMART_OPEN_OBS_SCC_CONF_PATH', None),
84+
}
85+
86+
set_defaults(kwargs, default_kwarg)
87+
88+
return open(parsed_uri['bucket_id'], parsed_uri['object_key'], mode, **kwargs)
89+
90+
91+
def open(
92+
bucket_id,
93+
object_key,
94+
mode,
95+
buffer_size=DEFAULT_CHUNK_SIZE,
96+
client: Optional[obs.ObsClient | dict] = None,
97+
headers: Optional[obs.PutObjectHeader | obs.GetObjectHeader] = None,
98+
use_obs_client_write_mode: bool = False,
99+
decrypt_ak_sk: bool = False,
100+
scc_lib_path: Optional[str] = None,
101+
scc_conf_path: Optional[str] = None):
102+
"""Open an OBS object for reading or writing.
103+
104+
Parameters
105+
----------
106+
bucket_id: str
107+
The name of the bucket this object resides in.
108+
object_key: str
109+
The name of the key within the bucket.
110+
mode: str
111+
The mode for opening the object. Must be either "rb" or "wb".
112+
buffer_size: int
113+
The buffer size to use when performing I/O.
114+
client: Optional[obs.ObsClient | dict]
115+
The initialized OBS client or dict with args that will be supplied to obs.ObsClient constructor.
116+
Please see docs for esdk-obs-python.
117+
headers: Optional[obs.PutObjectHeader | obs.GetObjectHeader]
118+
The optional additional headers of the request.
119+
Please see docs for esdk-obs-python.
120+
use_obs_client_write_mode: bool
121+
True if we will use readable object to get bytes.
122+
For writing only.
123+
Please see docs for ObsClient.putContent api
124+
decrypt_ak_sk: bool
125+
True if we need decrypt Access key, Secret key and Security token.
126+
It required to install CryptoAPI libs.
127+
https://support.huawei.com/enterprise/en/software/260510077-ESW2000847337
128+
scc_lib_path: Optional[str]
129+
The path to CryptoAPI libs.
130+
scc_conf_path: Optional[str]
131+
The path to scc.conf.
132+
"""
133+
134+
logger.debug('%r', locals())
135+
if mode not in constants.BINARY_MODES:
136+
raise NotImplementedError('bad mode: %r expected one of %r' % (mode, constants.BINARY_MODES))
137+
138+
_client = client if isinstance(client, obs.ObsClient) else create_obs_client(
139+
client_config=client,
140+
decrypt_ak_sk=decrypt_ak_sk,
141+
scc_lib_path=scc_lib_path,
142+
scc_conf_path=scc_conf_path)
143+
144+
if mode == constants.READ_BINARY:
145+
fileobj = ObsReader(bucket_id=bucket_id,
146+
object_key=object_key,
147+
client=_client,
148+
headers=headers)
149+
elif mode == constants.WRITE_BINARY:
150+
fileobj = ObsWriter(bucket_id=bucket_id,
151+
object_key=object_key,
152+
client=_client,
153+
headers=headers,
154+
use_obs_client_write_mode=use_obs_client_write_mode)
155+
else:
156+
assert False, 'unexpected mode: %r' % mode
157+
return fileobj
158+
159+
160+
def create_obs_client(client_config: dict,
161+
decrypt_ak_sk: bool = False,
162+
scc_lib_path: Optional[str] = None,
163+
scc_conf_path: Optional[str] = None) -> obs.ObsClient:
164+
"""Initializes the ObsClient.
165+
"""
166+
if not decrypt_ak_sk:
167+
return obs.ObsClient(**client_config)
168+
169+
decrypted_config = _decrypt_ak_sk(client_config=client_config,
170+
scc_lib_path=scc_lib_path,
171+
scc_conf_path=scc_conf_path)
172+
173+
set_defaults(decrypted_config, client_config)
174+
return obs.ObsClient(**decrypted_config)
175+
176+
177+
def _decrypt_ak_sk(client_config: dict,
178+
scc_lib_path: Optional[str] = None,
179+
scc_conf_path: Optional[str] = None) -> dict:
180+
crypto_provider = CryptoProvider(scc_lib_path=scc_lib_path,
181+
scc_conf_path=scc_conf_path)
182+
183+
if 'access_key_id' in client_config:
184+
access_key_id = client_config.get('access_key_id')
185+
secret_access_key = client_config.get('secret_access_key')
186+
security_token = client_config.get('security_token', None)
187+
else:
188+
tokens = get_token(security_providers=loadtoken.ENV)
189+
access_key_id = tokens.get('accessKey')
190+
secret_access_key = tokens.get('secretKey')
191+
security_token = tokens.get('securityToken')
192+
193+
return {
194+
access_key_id: crypto_provider.decrypt(access_key_id),
195+
secret_access_key: crypto_provider.decrypt(secret_access_key),
196+
security_token: crypto_provider.decrypt(security_token),
197+
}
198+
199+
200+
class ObsReader(io.RawIOBase):
201+
"""Read an OBS Object.
202+
"""
203+
204+
def __init__(self,
205+
bucket_id: str,
206+
object_key: str,
207+
client: obs.ObsClient,
208+
headers: Optional[obs.GetObjectHeader] = None,
209+
buffer_size: int = DEFAULT_CHUNK_SIZE):
210+
self.name = object_key
211+
self.bucket_id = bucket_id
212+
self.object_key = object_key
213+
self.buffer_size = buffer_size
214+
self._client = client
215+
self._buffer = smart_open.bytebuffer.ByteBuffer(buffer_size)
216+
self._resp = self._client.getObject(bucketName=bucket_id,
217+
objectKey=object_key,
218+
headers=headers)
219+
if self._resp.status >= 300:
220+
raise RuntimeError(
221+
f'Failed to read: {self.object_key}! '
222+
f'errorCode: {self._resp.errorCode}, '
223+
f'errorMessage: {self._resp.errorMessage}')
224+
225+
def readinto(self, __buffer):
226+
data = self.read(len(__buffer))
227+
if not data:
228+
return 0
229+
__buffer[:len(data)] = data
230+
return len(data)
231+
232+
def readinto1(self, __buffer):
233+
return self.readinto(__buffer)
234+
235+
def read(self, size=-1):
236+
if size == 0:
237+
return b''
238+
239+
if self._resp is None:
240+
raise RuntimeError(f'No response received while reading: {self.object_key}')
241+
242+
if size > 0:
243+
chunk = self._resp.body.response.read(size)
244+
return chunk
245+
else:
246+
while True:
247+
chunk = self._resp.body.response.read(self.buffer_size)
248+
if not chunk:
249+
break
250+
self._buffer.fill(struct.unpack(str(len(chunk)) + 'c', chunk))
251+
return self._buffer.read()
252+
253+
def read1(self, size=-1):
254+
return self.read(size)
255+
256+
def close(self):
257+
self.__del__()
258+
259+
def seekable(self):
260+
return False
261+
262+
def detach(self):
263+
"""Unsupported."""
264+
raise io.UnsupportedOperation
265+
266+
def __del__(self):
267+
try:
268+
if self._client:
269+
self._resp = None
270+
self._client.close()
271+
self._client = None
272+
except Exception as ex:
273+
logger.warning(ex)
274+
275+
276+
class ObsWriter(io.RawIOBase):
277+
"""Write an OBS Object.
278+
279+
If use_obs_client_write_mode set to False:
280+
this class buffers all of its input in memory until its `close` method is called.
281+
Only then the data will be written to OBS and the buffer is released.
282+
283+
If use_obs_client_write_mode set to True:
284+
`write` method of the ObsWriter will accept any readable object or path to file.
285+
In this case will be used internal implementation in obs.ObsClient.putContent to read bytes
286+
Write to OBS will be triggered in `close` method.
287+
"""
288+
289+
def __init__(self,
290+
bucket_id: str,
291+
object_key: str,
292+
client: obs.ObsClient,
293+
headers: Optional[obs.PutObjectHeader] = None,
294+
use_obs_client_write_mode: bool = False
295+
):
296+
self.name = object_key
297+
self.bucket_id = bucket_id
298+
self.object_key = object_key
299+
self._client = client
300+
self._headers = headers
301+
self._content: Optional[str | io.BytesIO | io.BufferedReader] = None
302+
self.use_obs_client_write_mode = use_obs_client_write_mode
303+
304+
def write(self, __buffer):
305+
if not __buffer:
306+
return None
307+
308+
if self.use_obs_client_write_mode:
309+
self._content = __buffer
310+
else:
311+
if not self._content:
312+
self._content = io.BytesIO()
313+
self._content.write(__buffer)
314+
return None
315+
316+
def close(self):
317+
if not self._content:
318+
self._client.close()
319+
return
320+
321+
if isinstance(self._content, io.BytesIO):
322+
self._content.seek(0)
323+
324+
self._client.putContent(bucketName=self.bucket_id,
325+
objectKey=self.object_key,
326+
content=self._content,
327+
headers=self._headers)
328+
self._content = None
329+
330+
def seekable(self):
331+
return False
332+
333+
def writable(self):
334+
return self._content is not None
335+
336+
def detach(self):
337+
"""Unsupported."""
338+
raise io.UnsupportedOperation
339+
340+
341+
class CryptoProvider:
342+
"""Decrypt Access Key, Secret Key, Security Token.
343+
344+
This class use Huawei CloudGuard CSP seccomponent to decrypt AK, SK and ST.
345+
"""
346+
347+
def __init__(self, scc_lib_path: Optional[str] = None, scc_conf_path: Optional[str] = None):
348+
self._scc_lib_path = scc_lib_path
349+
self._scc_conf_path = scc_conf_path
350+
351+
if scc_lib_path and scc_lib_path not in sys.path:
352+
sys.path.append(scc_lib_path)
353+
354+
try:
355+
from CryptoAPI import CryptoAPI
356+
except ImportError:
357+
raise RuntimeError('Failed to use CryptoAPI module. Please install CloudGuard CSP seccomponent.')
358+
359+
self._api = CryptoAPI()
360+
361+
if self._scc_conf_path:
362+
self._api.initialize(self._scc_conf_path)
363+
else:
364+
self._api.initialize()
365+
366+
def __del__(self):
367+
if self._api:
368+
self._api.finalize()
369+
370+
def decrypt(self, encrypted: Optional[str]) -> Optional[str]:
371+
return self._api.decrypt(encrypted) if encrypted else None

0 commit comments

Comments
 (0)