Skip to content

Commit f2f1851

Browse files
committed
Merge branch 'master' into pixi
2 parents 2d5dd96 + adf9363 commit f2f1851

11 files changed

+653
-56
lines changed

pfcon/base_storage.py

+75-1
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
"""
44

55
import logging
6+
import os
67
import abc
7-
import shutil
8+
import shutil, errno
89

910

1011
logger = logging.getLogger(__name__)
@@ -38,3 +39,76 @@ def delete_data(self, job_dir):
3839
Delete job data from the local storage.
3940
"""
4041
shutil.rmtree(job_dir)
42+
43+
def process_chrislink_files(self, job_incoming_dir):
44+
"""
45+
Rearrange the local job incoming directory tree by creating folders that trace
46+
the source dirs pointed by ChRIS link files.
47+
"""
48+
self.job_incoming_dir = job_incoming_dir
49+
self._linked_paths = set()
50+
self._nlinks = 0
51+
self._already_copied_src_set = set()
52+
53+
self._process_chrislink_files(job_incoming_dir)
54+
55+
linked_path_top_folders = set()
56+
for path in self._linked_paths:
57+
linked_path_top_folders.add(path.split('/', 1)[0])
58+
59+
for folder in linked_path_top_folders:
60+
if folder not in self._linked_paths:
61+
self.deletesrc(os.path.join(job_incoming_dir, folder))
62+
63+
return self._nlinks
64+
65+
def _process_chrislink_files(self, dir):
66+
"""
67+
Recursively expand (substitute by actual folders) and remove ChRIS link files.
68+
"""
69+
for root, dirs, files in os.walk(dir):
70+
for filename in files:
71+
if filename.endswith('.chrislink'):
72+
link_file_path = os.path.join(root, filename)
73+
74+
if not link_file_path.startswith(tuple(self._already_copied_src_set)): # only expand a link once
75+
with open(link_file_path, 'rb') as f:
76+
rel_path = f.read().decode().strip()
77+
abs_path = os.path.join(self.job_incoming_dir, rel_path)
78+
79+
if os.path.isfile(abs_path):
80+
rel_path = os.path.dirname(rel_path)
81+
abs_path = os.path.dirname(abs_path)
82+
83+
source_trace_dir = rel_path.replace('/', '_')
84+
dst_path = os.path.join(root, source_trace_dir)
85+
86+
if not os.path.isdir(dst_path): # only copy once to a dest path
87+
self.copysrc(abs_path, dst_path)
88+
self._already_copied_src_set.add(abs_path)
89+
self._process_chrislink_files(dst_path) # recursive call
90+
91+
self._linked_paths.add(rel_path)
92+
93+
os.remove(link_file_path)
94+
self._nlinks += 1
95+
96+
@staticmethod
97+
def copysrc(src, dst):
98+
try:
99+
shutil.copytree(src, dst)
100+
except OSError as e:
101+
if e.errno in (errno.ENOTDIR, errno.EINVAL):
102+
shutil.copy(src, dst)
103+
else:
104+
raise
105+
106+
@staticmethod
107+
def deletesrc(src):
108+
try:
109+
shutil.rmtree(src)
110+
except OSError as e:
111+
if e.errno in (errno.ENOTDIR, errno.EINVAL):
112+
os.remove(src)
113+
else:
114+
raise

pfcon/config.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def __init__(self):
2828

2929
if self.PFCON_INNETWORK:
3030
self.STORAGE_ENV = env('STORAGE_ENV', 'swift')
31-
if self.STORAGE_ENV not in ('swift', 'filesystem'):
31+
if self.STORAGE_ENV not in ('swift', 'filesystem', 'fslink'):
3232
raise ValueError(f"Unsupported value '{self.STORAGE_ENV}' for STORAGE_ENV")
3333
else:
3434
self.STORAGE_ENV = env('STORAGE_ENV', 'zipfile')

pfcon/filesystem_storage.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
"""
2-
Handle filesystem-based (eg. mounted directory) storage. This is used when pfcon is
3-
in-network and configured to directly access the data from a filesystem.
2+
Handle filesystem-based storage. This is used when pfcon is in-network and configured
3+
to directly access the data from a ChRIS shared filesystem. It assumes that both the
4+
input (read-only)and the output (read-write) directories in the shared storage are
5+
directly mounted into the plugin container.
46
"""
57

68
import logging
@@ -24,7 +26,7 @@ def __init__(self, config):
2426

2527
self.fs_mount_base_dir = config.get('STOREBASE_MOUNT')
2628

27-
def store_data(self, job_id, job_incoming_dir, data=None, **kwargs):
29+
def store_data(self, job_id, job_incoming_dir, data, **kwargs):
2830
"""
2931
Count the number of files in the specified job incoming directory.
3032
"""

pfcon/fslink_storage.py

+114
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
"""
2+
Handle filesystem-based storage. This is used when pfcon is in-network and configured
3+
to directly access the data from a ChRIS shared filesystem. It only assumes that the
4+
output (read-write) directory in the shared storage is directly mounted into the plugin
5+
container. Unlike the 'filesystem' storage this supports ChRIS links.
6+
"""
7+
8+
import logging
9+
import datetime
10+
import os
11+
import shutil
12+
13+
14+
from .filesystem_storage import FileSystemStorage
15+
16+
17+
logger = logging.getLogger(__name__)
18+
19+
20+
class FSLinkStorage(FileSystemStorage):
21+
22+
def store_data(self, job_id, job_incoming_dir, data, **kwargs):
23+
"""
24+
Copy all the files from the filesystem tree under each input folder (storage
25+
prefix) in the specified data list into the specified job incoming directory.
26+
"""
27+
self.job_id = job_id
28+
self.job_output_path = kwargs['job_output_path']
29+
30+
all_file_paths = set()
31+
32+
for storage_path in data:
33+
storage_path = storage_path.strip('/')
34+
file_paths = set()
35+
visited_paths = set()
36+
37+
self._find_all_file_paths(storage_path, file_paths, visited_paths)
38+
39+
for f_path in file_paths:
40+
if f_path not in all_file_paths: # copy a given file only once
41+
fs_file_path = os.path.join(self.fs_mount_base_dir, f_path)
42+
43+
rel_file_path = f_path.replace(storage_path, '', 1).lstrip('/')
44+
local_file_path = os.path.join(job_incoming_dir, rel_file_path)
45+
46+
try:
47+
shutil.copy(fs_file_path, local_file_path)
48+
except FileNotFoundError:
49+
os.makedirs(os.path.dirname(local_file_path))
50+
shutil.copy(fs_file_path, local_file_path)
51+
52+
all_file_paths.add(f_path)
53+
54+
nfiles = len(all_file_paths)
55+
logger.info(f'{nfiles} files fetched from the filesystem for job {job_id}')
56+
57+
nlinks = self.process_chrislink_files(job_incoming_dir)
58+
nfiles -= nlinks
59+
60+
return {
61+
'jid': job_id,
62+
'nfiles': nfiles,
63+
'timestamp': f'{datetime.datetime.now()}',
64+
'path': job_incoming_dir
65+
}
66+
67+
def delete_data(self, job_dir):
68+
"""
69+
Delete job data from the local storage.
70+
"""
71+
shutil.rmtree(job_dir)
72+
73+
def _find_all_file_paths(self, storage_path, file_paths, visited_paths):
74+
"""
75+
Find all file paths under the passed storage path (prefix) by
76+
recursively following ChRIS links. The resulting set of file paths is given
77+
by the file_paths set argument.
78+
"""
79+
if not storage_path.startswith(tuple(visited_paths)): # avoid infinite loops
80+
visited_paths.add(storage_path)
81+
job_id = self.job_id
82+
job_output_path = self.job_output_path
83+
fs_abs_path = os.path.join(self.fs_mount_base_dir, storage_path)
84+
85+
l_ls = []
86+
if os.path.isfile(fs_abs_path):
87+
l_ls.append(fs_abs_path)
88+
else:
89+
for root, dirs, files in os.walk(fs_abs_path):
90+
for filename in files:
91+
l_ls.append(os.path.join(root, filename))
92+
93+
for abs_file_path in l_ls:
94+
if abs_file_path.endswith('.chrislink'):
95+
try:
96+
with open(abs_file_path, 'rb') as f:
97+
linked_path = f.read().decode().strip()
98+
except Exception as e:
99+
logger.error(f'Failed to read file {abs_file_path} for '
100+
f'job {job_id}, detail: {str(e)}')
101+
raise
102+
103+
if f'{job_output_path}/'.startswith(linked_path.rstrip('/') + '/'):
104+
# link files are not allowed to point to the job output dir or
105+
# any of its ancestors
106+
logger.error(f'Found invalid input path {linked_path} for job '
107+
f'{job_id} pointing to an ancestor of the job '
108+
f'output dir: {job_output_path}')
109+
raise ValueError(f'Invalid input path: {linked_path}')
110+
111+
self._find_all_file_paths(linked_path, file_paths,
112+
visited_paths) # recursive call
113+
file_paths.add(abs_file_path.replace(self.fs_mount_base_dir, '',
114+
1).lstrip('/'))

pfcon/resources.py

+40-13
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from .zip_file_storage import ZipFileStorage
1414
from .swift_storage import SwiftStorage
1515
from .filesystem_storage import FileSystemStorage
16+
from .fslink_storage import FSLinkStorage
1617

1718

1819
logger = logging.getLogger(__name__)
@@ -80,7 +81,7 @@ def post(self):
8081
if self.pfcon_innetwork:
8182
if args.input_dirs is None:
8283
abort(400, message='input_dirs: field is required')
83-
if self.storage_env == 'filesystem' and args.output_dir is None:
84+
if args.output_dir is None:
8485
abort(400, message='output_dir: field is required')
8586
else:
8687
if request.files['data_file'] is None:
@@ -99,31 +100,50 @@ def post(self):
99100
input_dir = args.input_dirs[0].strip('/')
100101
output_dir = args.output_dir.strip('/')
101102
incoming_dir = os.path.join(self.storebase_mount, input_dir)
103+
102104
storage = FileSystemStorage(app.config)
103105
try:
104-
d_info = storage.store_data(job_id, incoming_dir)
106+
d_info = storage.store_data(job_id, incoming_dir, None)
105107
except Exception as e:
106108
logger.error(f'Error while accessing files from shared filesystem '
107109
f'for job {job_id}, detail: {str(e)}')
108-
abort(400, message='input_dirs: Error accessing files from shared '
109-
'filesystem')
110+
abort(400,
111+
message='input_dirs: Error accessing files from shared filesystem')
110112
else:
111113
incoming_dir = os.path.join(self.storebase_mount, input_dir)
112-
outgoing_dir = os.path.join(self.storebase_mount, output_dir)
113114
os.makedirs(incoming_dir, exist_ok=True)
114-
os.makedirs(outgoing_dir, exist_ok=True)
115115

116116
if self.pfcon_innetwork:
117117
if self.storage_env == 'swift':
118+
outgoing_dir = os.path.join(self.storebase_mount, output_dir)
119+
os.makedirs(outgoing_dir, exist_ok=True)
120+
118121
storage = SwiftStorage(app.config)
119122
try:
120-
d_info = storage.store_data(job_id, incoming_dir, args.input_dirs)
123+
d_info = storage.store_data(job_id, incoming_dir, args.input_dirs,
124+
job_output_path=args.output_dir.strip('/'))
121125
except ClientException as e:
122126
logger.error(f'Error while fetching files from swift and '
123127
f'storing job {job_id} data, detail: {str(e)}')
124-
abort(400, message='input_dirs: Error fetching files from swift')
128+
abort(400,
129+
message='input_dirs: Error fetching files from swift')
130+
131+
elif self.storage_env == 'fslink':
132+
output_dir = args.output_dir.strip('/')
133+
storage = FSLinkStorage(app.config)
134+
try:
135+
d_info = storage.store_data(job_id, incoming_dir, args.input_dirs,
136+
job_output_path=output_dir)
137+
except Exception as e:
138+
logger.error(f'Error while accessing files from shared filesystem '
139+
f'and storing job {job_id} data, detail: {str(e)}')
140+
abort(400,
141+
message='input_dirs: Error copying files from shared filesystem')
125142
else:
126143
if self.storage_env == 'zipfile':
144+
outgoing_dir = os.path.join(self.storebase_mount, output_dir)
145+
os.makedirs(outgoing_dir, exist_ok=True)
146+
127147
storage = ZipFileStorage(app.config)
128148
data_file = request.files['data_file']
129149
try:
@@ -187,11 +207,14 @@ def delete(self, job_id):
187207
storage = None
188208

189209
if self.pfcon_innetwork:
190-
if self.storage_env == 'swift':
210+
if self.storage_env == 'filesystem':
211+
storage = FileSystemStorage(app.config)
212+
213+
elif self.storage_env == 'swift':
191214
storage = SwiftStorage(app.config)
192215

193-
elif self.storage_env == 'filesystem':
194-
storage = FileSystemStorage(app.config)
216+
elif self.storage_env == 'fslink':
217+
storage = FSLinkStorage(app.config)
195218
else:
196219
if self.storage_env == 'zipfile':
197220
storage = ZipFileStorage(app.config)
@@ -229,7 +252,7 @@ def get(self, job_id):
229252
download_name = f'{job_id}.zip'
230253
mimetype = 'application/zip'
231254

232-
if self.pfcon_innetwork and self.storage_env == 'filesystem':
255+
if self.pfcon_innetwork and self.storage_env in ('filesystem', 'fslink'):
233256
job_output_path = request.args.get('job_output_path')
234257
if not job_output_path:
235258
abort(400, message='job_output_path: query parameter is required')
@@ -239,7 +262,11 @@ def get(self, job_id):
239262
if not os.path.isdir(outgoing_dir):
240263
abort(404)
241264

242-
storage = FileSystemStorage(app.config)
265+
if self.storage_env == 'filesystem':
266+
storage = FileSystemStorage(app.config)
267+
else:
268+
storage = FSLinkStorage(app.config)
269+
243270
content = storage.get_data(job_id, outgoing_dir,
244271
job_output_path=job_output_path)
245272
download_name = f'{job_id}.json'

0 commit comments

Comments
 (0)