Skip to content

Commit 6a7d6ab

Browse files
authored
Merge pull request #1 from egemenyavuz/master
switched to cherrypy framework,added envvars for auth, implemented support for upload, updated readme
2 parents 492e155 + 3bf416e commit 6a7d6ab

File tree

6 files changed

+286
-51
lines changed

6 files changed

+286
-51
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
11
.idea
22
venv
3+
env*list
4+
__pycache__
5+
*.pyc

Dockerfile

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1-
FROM python:3-alpine
1+
FROM python:3-slim
22
MAINTAINER Graham Moore "graham.moore@sesam.io"
3-
COPY ./service /service
3+
COPY ./service/*.py /service/
4+
COPY ./service/requirements.txt /service/requirements.txt
45
WORKDIR /service
5-
RUN pip install -r requirements.txt
6+
RUN pip install --upgrade pip \
7+
&& pip install -r requirements.txt
8+
69
EXPOSE 5000/tcp
710
ENTRYPOINT ["python"]
811
CMD ["proxy-service.py"]

README.md

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,96 @@
1+
[![Build Status](https://travis-ci.org/sesam-community/azure-storage.svg?branch=master)](https://travis-ci.org/sesam-community/azure-storage)
2+
3+
14
# azure-storage
2-
Sesam datasource that reads files from Azure Storage
5+
Can be used to
6+
* read files from Azure Storage (Blob/Files)
7+
* upload files to Azure Storage(Blob/Files)
38

4-
[![Build Status](https://travis-ci.org/sesam-community/azure-storage.svg?branch=master)](https://travis-ci.org/sesam-community/azure-storage)
9+
### Environment Parameters
10+
11+
| CONFIG_NAME | DESCRIPTION | IS_REQUIRED |DEFAULT_VALUE|
12+
| -------------------|---------------------|:------------:|:-----------:|
13+
| ACCOUNT_NAME | Azure Storage account name. | no, basic auth alternatively | n/a |
14+
| ACCOUNT_KEY | Azure Storage account key. | no, basic auth alternatively | n/a |
15+
| SAS_PARAMS | dict for the sas_token generation parameters | no | check the code |
16+
17+
18+
### Query Parameters
19+
20+
##### POST REQUEST:
21+
22+
| CONFIG_NAME | DESCRIPTION | IS_REQUIRED |DEFAULT_VALUE|
23+
| -------------------|---------------------|:------------:|:-----------:|
24+
| start_timedelta | time offset for the start of sas_token validity. <numeric_value> followed by M, H and D for minutes, hours and days, respectively. | no | 0M (immediate start) |
25+
| expiry_timedelta | time offset for the expiry of sas_token validity. <numeric_value> followed by M, H and D for minutes, hours and days, respectively. | no | 12H |
26+
27+
28+
29+
### An example of system config:
30+
31+
```json
32+
{
33+
"_id": "my_azure_storage",
34+
"type": "system:microservice",
35+
"connect_timeout": 60,
36+
"docker": {
37+
"environment": {
38+
"ACCOUNT_NAME":"myaccount",
39+
"ACCOUNT_KEY":"myaccount-key"
40+
},
41+
"image": "sesamcommunity/azure-storage:latest",
42+
"port": 5000
43+
},
44+
"read_timeout": 7200
45+
}
46+
```
47+
48+
### An example of input-pipe config:
49+
```json
50+
{
51+
"_id": "mydataset-from-azure-storage-file",
52+
"type": "pipe",
53+
"source": {
54+
"type": "json",
55+
"system": "my_azure_storage",
56+
"url": "/file/myshare/mydir/mydataset.json"
57+
},
58+
"transform": {
59+
"type": "dtl",
60+
"rules": {
61+
"default": [
62+
["copy", "*"]
63+
]
64+
}
65+
}
66+
}
67+
68+
```
69+
70+
### An example of output-pipe config:
71+
Note that uploading can be tricky due to batch size etc. You might utilize another microservice between Sesam and azure-storage for tailor-made solutions.
72+
```json
73+
{
74+
"_id": "mydataset-ftp-endpoint",
75+
"type": "pipe",
76+
"source": {
77+
"type": "dataset",
78+
"dataset": "mydataset-ftp"
79+
},
80+
"sink": {
81+
"type": "json",
82+
"system": "my_azure_storage",
83+
"batch_size": 1000000,
84+
"url": "/file/myshare/mydir/mydataset.json"
85+
},
86+
"transform": {
87+
"type": "dtl",
88+
"rules": {
89+
"default": [
90+
["copy", "*"]
91+
]
92+
}
93+
}
94+
}
95+
96+
```

service/logger.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import logging
2+
from paste.translogger import TransLogger
3+
4+
5+
def get_stdout_handler():
    """Build a StreamHandler that emits timestamped records to stdout."""
    handler = logging.StreamHandler()
    fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    handler.setFormatter(logging.Formatter(fmt))
    return handler
10+
11+
12+
def init_logger(logger_name, loglevel):
    """Return a named logger at *loglevel* that writes to stdout only.

    Propagation is disabled so records are not duplicated by ancestor
    loggers (e.g. the root logger).
    """
    new_logger = logging.getLogger(logger_name)
    new_logger.addHandler(get_stdout_handler())  # log to stdout
    new_logger.setLevel(loglevel)
    new_logger.propagate = False
    return new_logger
20+
21+
22+
def get_level_name(loglevel):
    """Return the textual name for *loglevel* (delegates to the logging module)."""
    name = logging.getLevelName(loglevel)
    return name
24+
25+
26+
def add_access_logger(app, logger):
    """Wrap *app*'s WSGI stack in a TransLogger that emits access-log lines.

    Each request is logged as '"METHOD URI HTTP/x.y" status bytes' through
    *logger*; the Flask app logger also gets a stdout handler attached.
    """
    access_format = ('"%(REQUEST_METHOD)s %(REQUEST_URI)s %(HTTP_VERSION)s" '
                     '%(status)s %(bytes)s')

    app.wsgi_app = TransLogger(
        app.wsgi_app,
        logger_name=logger.name,
        format=access_format,
        setup_console_handler=False,
        set_logger_level=logger.level,
    )
    app.logger.addHandler(get_stdout_handler())
    return app

service/proxy-service.py

Lines changed: 145 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,31 @@
11
from flask import Flask, request, Response, abort, send_file
22
from functools import wraps
3-
from azure.storage.file import FileService
4-
from azure.storage.blob import BlockBlobService
5-
import logging
3+
from azure.storage.file import FileService, SharePermissions
4+
from azure.storage.blob import BlockBlobService, BlobPermissions
5+
import logger as logging
66
import os
77
import io
8+
import json
9+
from datetime import datetime, timedelta
810

911
app = Flask(__name__)
1012

13+
logger = logging.init_logger('azure-storage-service', os.getenv('LOGLEVEL',"INFO"))
1114

12-
def get_var(var):
13-
envvar = None
14-
if var.upper() in os.environ:
15-
envvar = os.environ[var.upper()]
15+
DEFAULT_SAS_PARAMS = '{"start_timedelta": null, "expiry_timedelta": "12H"}'
16+
17+
PORT = int(os.getenv("PORT", 5000))
18+
ACCOUNT_NAME = os.getenv("ACCOUNT_NAME")
19+
ACCOUNT_KEY = os.getenv("ACCOUNT_KEY")
20+
SAS_PARAMS = json.loads(os.getenv("SAS_PARAMS",DEFAULT_SAS_PARAMS))
21+
22+
logger.info('starting azure-storage-service with \n\tPORT={}\n\tACCOUNT_NAME={}\n\tLOGLEVEL={}\n\tSAS_PARAMS={}'.format(PORT, ACCOUNT_NAME, os.getenv('LOGLEVEL',"INFO"),SAS_PARAMS))
23+
24+
def get_auth(auth):
    """Resolve storage credentials as (account_name, account_key).

    Basic-auth credentials from the request take precedence; otherwise the
    env-configured ACCOUNT_NAME/ACCOUNT_KEY are used.
    """
    if not auth:
        return ACCOUNT_NAME, ACCOUNT_KEY
    return auth.get('username'), auth.get('password')
2029

2130
def authenticate():
2231
"""Sends a 401 response that enables basic auth"""
@@ -29,61 +38,153 @@ def requires_auth(f):
2938
@wraps(f)
3039
def decorated(*args, **kwargs):
3140
auth = request.authorization
32-
if not auth:
41+
if not auth and not(ACCOUNT_NAME and ACCOUNT_KEY):
3342
return authenticate()
3443
return f(*args, **kwargs)
3544

3645
return decorated
3746

38-
@app.route('/file/<share_name>/<directory_name>/<file_name>', methods=['GET'])
47+
48+
def get_sas_params(args):
    """Extract SAS-token validity offsets from the request query parameters.

    Each offset is a string "<number>" followed by D, H, or M for days,
    hours, or minutes (case-insensitive). Missing values fall back to the
    SAS_PARAMS env config; None/empty or an unrecognized suffix yields
    timedelta(0), preserving the original behavior.

    Returns (start_timedelta, expiry_timedelta).
    """
    # Idiom fix: the nested helper's parameter used to be named `str`,
    # shadowing the builtin.
    def _to_timedelta(spec):
        if not spec:
            return timedelta()
        spec = spec.upper()
        amount, unit = spec[:-1], spec[-1]
        if unit == 'D':
            return timedelta(days=int(amount))
        if unit == 'H':
            return timedelta(hours=int(amount))
        if unit == 'M':
            return timedelta(minutes=int(amount))
        return timedelta()  # unknown suffix -> zero offset (silent, as before)

    start_spec = args.get('start_timedelta', SAS_PARAMS.get('start_timedelta'))
    expiry_spec = args.get('expiry_timedelta', SAS_PARAMS.get('expiry_timedelta'))
    return _to_timedelta(start_spec), _to_timedelta(expiry_spec)
67+
68+
69+
def get_location(fpath):
    """Split a slash-separated path into (directory_name, file_name).

    The directory part is None when the path contains no slash at all.
    """
    directory, sep, file_name = fpath.rpartition('/')
    if not sep:
        return None, file_name
    return directory, file_name
74+
75+
@app.route('/file/<share_name>/<path:path_to_file>', methods=['GET'])
@requires_auth
def get_file(share_name, path_to_file):
    """Download a file from an Azure Files share as an attachment."""
    try:
        account_name, account_key = get_auth(request.authorization)
        directory_name, file_name = get_location(path_to_file)
        service = FileService(account_name=account_name, account_key=account_key)
        stream = io.BytesIO()
        service.get_file_to_stream(share_name, directory_name, file_name,
                                   stream, max_connections=6)
        stream.seek(0)  # rewind before handing the buffer to send_file
        return send_file(stream, attachment_filename=file_name, as_attachment=True)
    except Exception as e:
        logger.exception(e)
        return abort(500, e)
90+
91+
@app.route('/file/<share_name>/<path:path_to_file>', methods=['POST'])
@requires_auth
def post_file(share_name, path_to_file):
    """Upload the request body to an Azure Files share and return a SAS URL.

    Responds with a plain-text URL carrying a read-only SAS token whose
    validity window comes from the start/expiry_timedelta query parameters
    (see get_sas_params).
    """
    try:
        account_name, account_key = get_auth(request.authorization)
        file_service = FileService(account_name=account_name, account_key=account_key)
        start_timedelta, expiry_timedelta = get_sas_params(request.args)
        directory_name, file_name = get_location(path_to_file)
        if request.headers.get('Transfer-Encoding') == 'chunked':
            # NOTE(review): count=4096 caps a chunked upload at 4 KiB — confirm intent.
            file_service.create_file_from_stream(share_name, directory_name, file_name,
                                                 request.stream, count=4096)
        else:
            file_service.create_file_from_bytes(share_name, directory_name, file_name,
                                                request.get_data())
        now = datetime.now()
        sas_token = file_service.generate_file_shared_access_signature(
            share_name,
            directory_name=directory_name,
            file_name=file_name,
            permission=SharePermissions(read=True),
            expiry=now + expiry_timedelta,
            # Bug fix: `start` was passed a bare timedelta; the SDK expects a
            # datetime (the expiry argument already did `datetime.now() + delta`).
            start=now + start_timedelta,
            id=None,
            ip=None,
            protocol='https',
            cache_control=request.headers.get('Cache-Control'),
            # Bug fix: the header key was 'Content-Disposition: attachment;',
            # which never matches a real header name.
            content_disposition=request.headers.get('Content-Disposition'),
            content_encoding=request.headers.get('Content-Encoding'),
            content_language=request.headers.get('Content-Language'),
            content_type=request.headers.get('Content-Type'))
        url = file_service.make_file_url(share_name, directory_name, file_name,
                                         protocol='https', sas_token=sas_token)
        return Response(response=url, status=200, content_type='text/plain')
    except Exception as e:
        logger.exception(e)
        return abort(500, e)
123+
124+
@app.route('/blob/<container_name>/<blob_name>', methods=['POST'])
@requires_auth
def post_blob(container_name, blob_name):
    """Upload the request body as a block blob and return a SAS URL.

    Responds with a plain-text URL carrying a read-only SAS token whose
    validity window comes from the start/expiry_timedelta query parameters.
    """
    try:
        account_name, account_key = get_auth(request.authorization)
        blob_service = BlockBlobService(account_name=account_name, account_key=account_key)
        start_timedelta, expiry_timedelta = get_sas_params(request.args)
        if request.headers.get('Transfer-Encoding') == 'chunked':
            # Bug fix: this branch previously called create_file_from_stream with
            # undefined directory_name/file_name (NameError at runtime) — it was
            # copy-pasted from the file endpoint. Use the blob API instead.
            # NOTE(review): count=4096 caps a chunked upload at 4 KiB — confirm intent.
            blob_service.create_blob_from_stream(container_name, blob_name,
                                                 request.stream, count=4096)
        else:
            blob_service.create_blob_from_bytes(container_name=container_name,
                                                blob_name=blob_name,
                                                blob=request.get_data())
        now = datetime.now()
        sas_token = blob_service.generate_blob_shared_access_signature(
            container_name=container_name,
            blob_name=blob_name,
            permission=BlobPermissions(read=True),
            expiry=now + expiry_timedelta,
            # Bug fix: `start` was passed a bare timedelta; the SDK expects a
            # datetime (the expiry argument already did `datetime.now() + delta`).
            start=now + start_timedelta,
            id=None,
            ip=None,
            protocol='https',
            cache_control=request.headers.get('Cache-Control'),
            # Bug fix: the header key was 'Content-Disposition: attachment;',
            # which never matches a real header name.
            content_disposition=request.headers.get('Content-Disposition'),
            content_encoding=request.headers.get('Content-Encoding'),
            content_language=request.headers.get('Content-Language'),
            content_type=request.headers.get('Content-Type'))
        url = blob_service.make_blob_url(container_name, blob_name,
                                         protocol='https', sas_token=sas_token)
        return Response(response=url, status=200, content_type='text/plain')
    except Exception as e:
        logger.exception(e)
        return abort(500, e)
51154

52155
@app.route('/blob/<container_name>/<blob_name>', methods=['GET'])
@requires_auth
def get_blob(container_name, blob_name):
    """Download a blob from a container as an attachment."""
    try:
        account_name, account_key = get_auth(request.authorization)
        service = BlockBlobService(account_name=account_name, account_key=account_key)
        stream = io.BytesIO()
        service.get_blob_to_stream(container_name=container_name, blob_name=blob_name,
                                   stream=stream, max_connections=6)
        stream.seek(0)  # rewind before handing the buffer to send_file
        return send_file(stream, attachment_filename=blob_name, as_attachment=True)
    except Exception as e:
        logger.exception(e)
        return abort(500, e)
65169

66170
if __name__ == '__main__':
    # WEBFRAMEWORK=flask selects Flask's builtin dev server; any other value
    # (including unset, the default) serves the app through CherryPy.
    use_flask = os.getenv('WEBFRAMEWORK', '').lower() == 'flask'
    if use_flask:
        app.run(debug=True, host='0.0.0.0', port=PORT)
    else:
        import cherrypy

        app = logging.add_access_logger(app, logger)
        cherrypy.tree.graft(app, '/')

        # Configure the web server for production mode.
        cherrypy.config.update({
            'server.socket_host': '0.0.0.0',
            'server.socket_port': PORT,
            'environment': 'production',
            'engine.autoreload_on': False,
            'log.screen': True,
        })

        # Start the CherryPy WSGI server and block until shutdown.
        cherrypy.engine.start()
        cherrypy.engine.block()

service/requirements.txt

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
1-
Flask==0.11
2-
azure
1+
requests==2.20.0
2+
Flask==1.0.2
3+
CherryPy==18.2.0
4+
paste==3.0.8
5+
azure==4.0.0

0 commit comments

Comments
 (0)