diff --git a/.gitignore b/.gitignore
index 475785a..c2a7a0d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
 *~
+*.swp
 *.pyc
 *.pyo
 build/
diff --git a/README.md b/README.md
index 0241ce2..7f513f4 100644
--- a/README.md
+++ b/README.md
@@ -302,22 +302,22 @@ Similarly on Linux you can use the 'getfattr' and 'setfattr' commands:
     $ getfattr -d -m yas3fs file
     # file: file
-    yas3fs.URL="http://bucket.s3.amazonaws.com/key"
-    yas3fs.bucket="S3 bucket"
-    yas3fs.expiration="2592000 (default)"
-    yas3fs.key="S3 key"
-    yas3fs.signedURL="https://bucket.s3.amazonaws.com/..." (for default expiration)
+    user.yas3fs.URL="http://bucket.s3.amazonaws.com/key"
+    user.yas3fs.bucket="S3 bucket"
+    user.yas3fs.expiration="2592000 (default)"
+    user.yas3fs.key="S3 key"
+    user.yas3fs.signedURL="https://bucket.s3.amazonaws.com/..." (for default expiration)
 
-    $ setfattr -n yas3fs.expiration -v 3600
+    $ setfattr -n user.yas3fs.expiration -v 3600
 
     $ getfattr -d -m yas3fs file
     # file: file
-    yas3fs.URL="http://bucket.s3.amazonaws.com/key"
-    yas3fs.bucket="S3 bucket"
-    yas3fs.expiration="3600"
-    yas3fs.key="S3 key"
-    yas3fs.signedURL="https://bucket.s3.amazonaws.com/..." (for 1h expiration)
+    user.yas3fs.URL="http://bucket.s3.amazonaws.com/key"
+    user.yas3fs.bucket="S3 bucket"
+    user.yas3fs.expiration="3600"
+    user.yas3fs.key="S3 key"
+    user.yas3fs.signedURL="https://bucket.s3.amazonaws.com/..." (for 1h expiration)
 
-    $ setfattr -x yas3fs.expiration latest.zip # File specific expiration removed, the default is used again
+    $ setfattr -x user.yas3fs.expiration latest.zip # File specific expiration removed, the default is used again
 
 ### Notification Syntax & Use
 
@@ -341,7 +341,7 @@ The following `action`(s) are currently implemented:
 * `rename` (rename file or directory): `[ "node_id", "rename", "old_path", "new_path" ]`
 * `upload` (new or updated file): `[ "node_id", "upload", "path", "new_md5" ]` (`path` and `new_md5` are optional)
 * `md` (updated metadata, e.g. attr/xattr): `[ "node_id", "md", "path", "metadata_name" ]`
-* `reset` (reset cache): `[ "node_id", "reset" ]`
+* `reset` (reset cache): `[ "node_id", "reset", "path" ]` (`path` is optional)
 * `cache` (change cache config): `[ "node_id", "cache" , "entries" or "mem" or "disk", new_value ]`
 * `buffer` (change buffer config): `[ "node_id", "buffer", "size" or "prefetch", new_value ]`
 * `prefetch` (change prefetch config): `[ "node_id", "prefetch", "on" or "off" ]`
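With the new optional path, a node (or an external script) can ask the cluster to drop the cache for a single subtree instead of everything. A minimal sketch with boto -- the region, topic ARN and node ID below are placeholder values, not part of the patch:

    import json
    import boto.sns

    sns = boto.sns.connect_to_region('us-east-1')                      # assumed region
    topic_arn = 'arn:aws:sns:us-east-1:123456789012:yas3fs-topic'      # placeholder ARN

    # The first element is the sender's node ID; nodes discard their own messages.
    sns.publish(topic_arn, json.dumps(['external-script', 'reset', '/photos/2013']))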
diff --git a/yas3fs/__init__.py b/yas3fs/__init__.py
index fb406dd..f5fab3e 100755
--- a/yas3fs/__init__.py
+++ b/yas3fs/__init__.py
@@ -9,8 +9,8 @@
 import urllib
 import argparse
-import errno 
-import stat 
+import errno
+import stat
 import time
 import os
 import os.path
@@ -42,13 +42,13 @@ from fuse import FUSE, FuseOSError, Operations, LoggingMixIn, fuse_get_context
 
 import boto
-import boto.s3 
+import boto.s3
 import boto.sns
 import boto.sqs
 import boto.utils
 from boto.utils import compute_md5, compute_hash
-from boto.s3.key import Key 
+from boto.s3.key import Key
 
 from YAS3FSPlugin import YAS3FSPlugin
@@ -73,7 +73,7 @@ def __str__(self):
             return 'None'
         if isinstance(self.name, str):
             return self.name.decode('utf8', 'replace')
-        
+
         return self.name
 
     def compute_md5(self, fp, size=None):
@@ -152,16 +152,16 @@ def append(self, value):
     def popleft(self):
         with self.lock:
             if self.head.next != self.tail:
-                value = self.head.next.delete() 
-                del self.index[value] 
-                return value 
+                value = self.head.next.delete()
+                del self.index[value]
+                return value
             else:
-                return None 
+                return None
     def delete(self, value):
         with self.lock:
            if value in self.index:
-                self.index[value].delete() 
-                del self.index[value] 
+                self.index[value].delete()
+                del self.index[value]
     def move_to_the_tail(self, value):
         with self.lock:
             if value in self.index:
@@ -280,7 +280,7 @@ def get_content_as_string(self):
             with self.get_lock():
                 return self.content.getvalue()
         elif self.store == 'disk':
-            with self.get_lock(): 
+            with self.get_lock():
                 self.content.seek(0) # Go to the beginning
                 return self.content.read()
         else:
@@ -342,13 +342,13 @@ def delete(self, prop=None, wait_until_cleared_proplist = None):
                 if data_range:
                     logger.debug('wake after range delete')
                     data_range.wake(False) # To make downloading threads go on... and then exit
-    
+
                 # for https://github.com/danilop/yas3fs/issues/52
                 if prop == 'change' and 'invoke_after_change' in self.props:
                     logger.debug('FSData.props[change] removed, now executing invoke_after_change lambda for: ' + self.path)
                     self.get('invoke_after_change')(self.path)
                     del self.props['invoke_after_change'] # cLeanup
-    
+
     def rename(self, new_path):
         with self.get_lock():
             if self.store == 'disk':
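The `invoke_after_change` property handled in the `FSData.delete` hunk above is a small deferred-callback mechanism: when metadata cannot be written while a `change` flush is still pending (issue #52), `set_metadata` parks a lambda in `FSData.props`, and it runs once the `change` property is deleted. A stripped-down sketch of the idea -- the class below is illustrative, not the real `FSData`:

    class DeferredProps(object):
        """ Illustrative stand-in for FSData's props handling. """
        def __init__(self, path):
            self.path = path
            self.props = {}
        def set(self, prop, value):
            self.props[prop] = value
        def delete(self, prop):
            del self.props[prop]
            # Run a callback that was parked while 'change' was in flight.
            if prop == 'change' and 'invoke_after_change' in self.props:
                self.props['invoke_after_change'](self.path)
                del self.props['invoke_after_change']

    d = DeferredProps('/file')
    d.set('change', True)
    d.set('invoke_after_change', lambda path: None)  # e.g. retry set_metadata(path, 'attr')
    d.delete('change')  # the parked callback fires here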
@@ -431,7 +431,7 @@ def wait_until_cleared(self, path, proplist = None, max_retries = 10, wait_time
         if not cleared:
 #            import inspect
-#            inspect_stack = inspect.stack() 
+#            inspect_stack = inspect.stack()
 #            logger.critical("WAIT_UNTIL_CLEARED stack: '%s'"% pp.pformat(inspect_stack))
             logger.error("wait_until_cleared %s could not clear '%s'" % (prop, path))
@@ -508,12 +508,11 @@ def set(self, path, prop, value):
         self.lru.move_to_the_tail(path) # Move to the tail of the LRU cache
         with self.get_lock(path):
             if path in self.entries:
-                if prop in self.entries[path]: 
+                if prop in self.entries[path]:
                     self.delete(path, prop)
-                self.entries[path][prop] = value
-                return True
-            else:
-                return False
+                self.entries[path][prop] = value
+                return True
+            return False
     def inc(self, path, prop):
         self.lru.move_to_the_tail(path) # Move to the tail of the LRU cache
         with self.get_lock(path):
@@ -554,7 +553,7 @@ def is_empty(self, path): # To improve readability
         if self.has(path) and not self.has(path, 'attr'):
             return True
         else:
-            return False 
+            return False
         ###try:
         ###    return len(self.get(path)) <= 1 # Empty or just with 'lock'
         ###except TypeError: # if get returns None
@@ -563,12 +562,12 @@ def is_not_empty(self, path): # To improve readability
         if self.has(path) and self.has(path, 'attr'):
             return True
         else:
-            return False 
+            return False
         ###try:
         ###    return len(self.get(path)) > 1 # More than just 'lock'
         ###except TypeError: # if get returns None
         ###    return False
-    
+
 class SNS_HTTPServer(BaseHTTPServer.HTTPServer):
     """ HTTP Server to receive SNS notifications via HTTP """
     def set_fs(self, fs):
@@ -623,8 +622,8 @@ def do_POST(self):
             if verify_evp.verify_final(signature.decode('base64')):
                 self.send_response(200)
                 if message_type== 'Notification':
-                    message = message_content['Message'] 
-                    logger.debug('message = %s' % message) 
+                    message = message_content['Message']
+                    logger.debug('message = %s' % message)
                     self.server.fs.process_message(message)
                 elif message_type == 'SubscriptionConfirmation':
                     token = message_content['Token']
@@ -667,7 +666,7 @@ def seek(self, offset, whence=0):
             self.pos = self.pos + offset
         elif whence == 2:
             self.pos = self.length + offset
-    
+
     def tell(self):
         return self.pos
     def read(self, n=-1):
@@ -695,15 +694,15 @@ def __init__(self, options):
         # Some constants
         ### self.http_listen_path_length = 30
         self.running = True
-    
+
         self.check_status_interval = 5.0 # Seconds, no need to configure that
-    
+
         self.s3_retries = options.s3_retries # Maximum number of S3 retries (outside of boto)
         logger.info("s3-retries: '%i'" % self.s3_retries)
-    
+
         self.s3_retries_sleep = options.s3_retries_sleep # retry sleep in seconds
         logger.info("s3-retries-sleep: '%i' seconds" % self.s3_retries_sleep)
-    
+
         self.yas3fs_xattrs = [ 'user.yas3fs.bucket', 'user.yas3fs.key', 'user.yas3fs.URL', 'user.yas3fs.signedURL', 'user.yas3fs.expiration' ]
 
         self.multipart_uploads_in_progress = 0
@@ -757,7 +756,7 @@ def __init__(self, options):
 
         self.aws_managed_encryption = options.aws_managed_encryption
         logger.info("AWS Managed Encryption enabled: %s" % self.aws_managed_encryption)
-    
+
         self.st_blksize = None
         if options.st_blksize:
             self.st_blksize = options.st_blksize
@@ -778,20 +777,20 @@ def __init__(self, options):
         logger.info("Number of parallel S3 threads (0 to disable writeback): '%i'" % self.s3_num)
         self.download_num = options.download_num
         logger.info("Number of parallel downloading threads: '%i'" % self.download_num)
-    
-    
+
+
         # for https://github.com/danilop/yas3fs/issues/46
         self.download_retries_num = options.download_retries_num
         logger.info("Number download retry attempts: '%i'" % self.download_retries_num)
         self.download_retries_sleep = options.download_retries_sleep
         logger.info("Download retry sleep time seconds: '%i'" % self.download_retries_sleep)
-    
+
         self.read_retries_num = options.read_retries_num
         logger.info("Number read retry attempts: '%i'" % self.read_retries_num)
         self.read_retries_sleep = options.read_retries_sleep
         logger.info("Read retry sleep time seconds: '%i'" % self.read_retries_sleep)
-    
-    
+
+
         self.prefetch_num = options.prefetch_num
         logger.info("Number of parallel prefetching threads: '%i'" % self.prefetch_num)
         self.buffer_size = options.buffer_size * 1024 # To convert KB to bytes
@@ -812,7 +811,7 @@ def __init__(self, options):
         logger.info("Default expiration for signed URLs via xattrs: '%s'" % str(self.default_expiration))
         self.requester_pays = options.requester_pays
         logger.info("S3 Request Payer: '%s'" % str(self.requester_pays))
-    
+
         self.default_headers = {}
         if self.requester_pays:
             self.default_headers = { 'x-amz-request-payer' : 'requester' }
@@ -865,7 +864,7 @@ def __init__(self, options):
             unique_id_list.append(str(uuid.uuid4()))
         self.unique_id = '-'.join(unique_id_list)
         logger.info("Unique node ID: '%s'" % self.unique_id)
-    
+
         if self.sns_topic_arn:
             if not self.aws_region in (r.name for r in boto.sns.regions()):
                 error_and_exit("wrong AWS region '%s' for SNS" % self.aws_region)
@@ -891,7 +890,7 @@ def __init__(self, options):
             if not self.sqs:
                 error_and_exit("no SQS connection")
             if self.new_queue:
-                hostname_array = [] 
+                hostname_array = []
                 hostname = ''
                 if self.new_queue_with_hostname:
                     import socket
@@ -923,7 +922,7 @@ def __init__(self, options):
 
         if self.hostname or self.sns_http_port:
             if not self.sns_topic_arn:
-                error_and_exit("The SNS topic must be provided when the hostname/port to listen to SNS HTTP notifications is given") 
+                error_and_exit("The SNS topic must be provided when the hostname/port to listen to SNS HTTP notifications is given")
             if self.sns_http_port:
                 if not self.hostname:
@@ -1033,7 +1032,7 @@ def init(self, path):
         self.prefetch_threads = {}
         for i in range(self.prefetch_num):
             self.prefetch_threads[i] = None
-    
+
         self.publish_thread = None
         self.queue_listen_thread = None
         self.http_listen_thread = None
@@ -1074,7 +1073,7 @@ def flush_all_cache(self):
                 data = self.cache.get(path, 'data')
                 if data and data.has('change'):
                     self.upload_to_s3(path, data)
-    
+
     def destroy(self, path):
         logger.debug("destroy '%s'" % (path))
         # Cleanup for unmount
@@ -1085,7 +1084,7 @@ def destroy(self, path):
         if self.http_listen_thread:
             self.httpd.shutdown() # To stop HTTP listen thread
             logger.info("waiting for HTTP listen thread to shutdown...")
-            self.http_listen_thread.join(5.0) # 5 seconds should be enough 
+            self.http_listen_thread.join(5.0) # 5 seconds should be enough
             logger.info("HTTP listen thread ended")
             self.sns.unsubscribe(self.http_subscription)
             logger.info("Unsubscribed SNS HTTP endpoint")
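The retry settings logged above are exposed as command line options in `main()` (see the argparse hunks further down). A mount tuned for flaky connectivity might look like the following -- bucket, prefix, mount point and values are examples only, and the sleep option names are assumed to mirror the `options.*` attributes read here:

    $ yas3fs s3://mybucket/myprefix /mnt/yas3fs \
        --s3-retries 5 --s3-retries-sleep 2 \
        --download-retries-num 60 --download-retries-sleep 1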
@@ -1115,7 +1114,7 @@ def destroy(self, path):
             logger.info("waiting for check cache thread to shutdown...")
             self.check_cache_thread.join(self.cache_check_interval + 1.0)
         logger.info('File system unmounted.')
-    
+
     def listen_for_messages_over_http(self):
         logger.info("Listening on: '%s'" % self.http_listen_url)
         server_class = SNS_HTTPServer
@@ -1170,7 +1169,7 @@ def delete_cache(self, path):
 
     def process_message(self, messages):
         logger.debug("process_message '%s'" % (messages))
         c = json.loads(messages)
-        if c[0] == self.unique_id: 
+        if c[0] == self.unique_id: # discard message coming from itself
             logger.debug("process message from self discarded '%s'"%(c))
             return
@@ -1193,9 +1192,18 @@ def process_message(self, messages):
                 self.cache.delete(c[3], 'key')
                 self.cache.delete(c[3], c[2])
         elif c[1] == 'reset':
-            with self.cache.lock:
-                self.flush_all_cache()
-                self.cache.reset_all() # Completely reset the cache
+            if len(c) <= 2 or not c[2] or c[2] == '/':
+                with self.cache.lock:
+                    self.flush_all_cache()
+                    self.cache.reset_all() # Completely reset the cache
+            else:
+                # c[2] exists and is not the root directory
+                for path in self.cache.entries.keys():
+                    # If the reset path is a directory, this also deletes
+                    # the cached entries for everything underneath it,
+                    # since their paths all share the same prefix.
+                    if path.startswith(c[2]):
+                        self.delete_cache(path)
         elif c[1] == 'url':
             with self.cache.lock:
                 self.flush_all_cache()
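The guard on the new optional path matters: a full reset must happen when the message has no third element at all (`len(c) <= 2`), or when the path is empty or the root `/`; only otherwise is the prefix-based selective delete taken. A quick way to sanity-check that condition outside the filesystem (a hypothetical helper, not part of the module):

    def is_full_reset(c):
        # c is the decoded notification, e.g. ['node1', 'reset'] or ['node1', 'reset', '/photos']
        return len(c) <= 2 or not c[2] or c[2] == '/'

    assert is_full_reset(['node1', 'reset'])             # no path: reset everything
    assert is_full_reset(['node1', 'reset', '/'])        # root path: reset everything
    assert not is_full_reset(['node1', 'reset', '/a'])   # subtree: selective delete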
@@ -1267,7 +1275,7 @@ def publish_messages(self):
                 logger.error("publish exception: " + full_message.encode('ascii'))
                 raise e
 
-    
+
     def publish(self, message):
         if self.sns_topic_arn:
             logger.debug("publish '%s'" % (message))
@@ -1287,7 +1295,7 @@ def check_status(self):
             logger.info("entries, mem_size, disk_size, download_queue, prefetch_queue, s3_queue: %i, %i, %i, %i, %i, %i" % (num_entries, mem_size, disk_size, self.download_queue.qsize(), self.prefetch_queue.qsize(), s3q))
-    
+
             logger.info("multipart_uploads_in_progress = " + str(self.multipart_uploads_in_progress))
 
             if debug:
@@ -1303,7 +1311,7 @@ def check_status(self):
             time.sleep(self.check_status_interval)
 
     def check_cache_size(self):
-    
+
         logger.debug("check_cache_size")
 
         while self.cache_entries:
@@ -1439,7 +1447,7 @@ def get_key(self, path, cache=True):
         if self.cache.is_deleting(path):
             logger.debug("get_key path '%s' is deleting -- returning None" % (path))
             return None
-    
+
         if cache and self.cache.is_ready(path):
             key = self.cache.get(path, 'key')
             if key:
@@ -1484,11 +1492,11 @@ def get_key(self, path, cache=True):
 
     def get_metadata(self, path, metadata_name, key=None):
         logger.debug("get_metadata -> '%s' '%s' '%s'" % (path, metadata_name, key))
         with self.cache.get_lock(path): # To avoid consistency issues, e.g. with a concurrent purge
-            metadata_values = None 
+            metadata_values = None
             if self.cache.has(path, metadata_name):
                 metadata_values = self.cache.get(path, metadata_name)
-            if metadata_values == None: 
+            if metadata_values == None:
                 metadata_values = {}
                 if not key:
                     key = self.get_key(path)
@@ -1595,7 +1603,7 @@ def set_metadata(self, path, metadata_name=None, metadata_values=None, key=None)
                     ### key.set_contents_from_string('', headers={'Content-Type': 'application/x-directory'})
                     headers = { 'Content-Type': 'application/x-directory' }
                     headers.update(self.default_write_headers)
-    
+
                     cmds = [ [ 'set_contents_from_string', [ '' ], { 'headers': headers } ] ]
                     self.do_on_s3(key, pub, cmds)
                 else:
@@ -1604,19 +1612,19 @@ def set_metadata(self, path, metadata_name=None, metadata_values=None, key=None)
                         key_name = key.name.decode('utf-8')
                     else:
                         key_name = key.name
-    
+
                     cmds = [ [ 'copy', [ key.bucket.name, key_name, key.metadata ], { 'preserve_acl': False, 'encrypt_key':self.aws_managed_encryption } ] ]
                     self.do_on_s3(key, pub, cmds)
                 ###self.publish(['md', metadata_name, path])
-    
+
             # handle a request to set metadata, but we can't right now because the node is currently
             # in the middle of a 'change' https://github.com/danilop/yas3fs/issues/52
             elif self.write_metadata and data and data.has('change'):
                 if metadata_name == 'attr' and metadata_values == None:
                     logger.debug("set_metadata: 'change' already in progress, setting FSData.props[invoke_after_change] lambda for self.set_metadata("+path+",attr)")
                     data.set('invoke_after_change',(lambda path: self.set_metadata(path,'attr')))
-    
+
     def getattr(self, path, fh=None):
         logger.debug("getattr -> '%s' '%s'" % (path, fh))
         if self.cache.is_deleting(path):
@@ -1642,10 +1650,10 @@ def getattr(self, path, fh=None):
             if attr['st_size'] == 0 and stat.S_ISDIR(attr['st_mode']):
                 attr['st_size'] = 4096 # For compatibility...
             attr['st_nlink'] = 1 # Something better TODO ???
-    
+
             if self.st_blksize:
-                attr['st_blksize'] = self.st_blksize 
-    
+                attr['st_blksize'] = self.st_blksize
+
             if self.full_prefetch: # Prefetch
                 if stat.S_ISDIR(attr['st_mode']):
                     self.readdir(path)
@@ -1680,10 +1688,10 @@ def readdir(self, path, fh=None):
                 key_list = self.s3_bucket.list(full_path.encode('utf-8'), '/', headers = self.default_headers, encoding_type='url')
                 dirs = ['.', '..']
                 for k in key_list:
-    
+
                     # 'unquoting' for https://github.com/danilop/yas3fs/issues/56
-                    k.name = urllib.unquote_plus(str(k.name)).decode('utf-8') 
-    
+                    k.name = urllib.unquote_plus(str(k.name)).decode('utf-8')
+
                     logger.debug("readdir '%s' '%s' S3 list key '%s'" % (path, fh, k))
                     d = k.name[len(full_path):]
                     if len(d) > 0:
@@ -1701,7 +1709,7 @@ def readdir(self, path, fh=None):
                 for dir in dirs:
                     convertedDirs.append(unicode(dir))
                 dirs = convertedDirs
-    
+
                 self.cache.set(path, 'readdir', dirs)
 
         logger.debug("readdir '%s' '%s' '%s'" % (path, fh, dirs))
@@ -1757,7 +1765,7 @@ def mkdir(self, path, mode):
             ### self.publish(['mkdir', path])
 
         return 0
-    
+
     def symlink(self, path, link):
         logger.debug("symlink '%s' '%s'" % (path, link))
         with self.cache.get_lock(path):
@@ -1807,7 +1815,7 @@ def symlink(self, path, link):
 
         return 0
 
-    def check_data(self, path): 
+    def check_data(self, path):
         logger.debug("check_data '%s'" % (path))
         with self.cache.get_lock(path):
             data = self.cache.get(path, 'data')
@@ -1874,7 +1882,7 @@ def download(self, prefetch=False):
             if prefetch:
                 (path, start, end) = self.prefetch_queue.get(True, 1) # 1 second time-out
             else:
-    
+
                 (path, start, end) = self.download_queue.get(True, 1) # 1 second time-out
             self.download_data(path, start, end)
             if prefetch:
@@ -1936,18 +1944,18 @@ def download_data(self, path, start, end):
         # for https://github.com/danilop/yas3fs/issues/46
         retriesAttempted = 0
         while retry:
-    
+
            # for https://github.com/danilop/yas3fs/issues/62
            if key is None:
                logger.warn("download_data 'key' is None!.. exiting retry loop")
                break
-    
+
            retriesAttempted += 1
-    
+
            # for https://github.com/danilop/yas3fs/issues/46
            if retriesAttempted > self.download_retries_num:
                retry = False
-    
+
            logger.debug("download_data range '%s' '%s' [thread '%s'] max: %i sleep: %i retries: %i" % (path, range_headers, thread_name, self.download_retries_num, self.download_retries_sleep, retriesAttempted))
            try:
                if debug:
@@ -1959,7 +1967,7 @@ def download_data(self, path, start, end):
                 if debug:
                     n2=dt.datetime.now()
                 retry = False
-    
+
             except Exception as e:
                 logger.exception(e)
                 logger.info("download_data error '%s' %i-%i [thread '%s'] -> retrying max: %i sleep: %i retries: %i" % (path, start, end, thread_name, self.download_retries_num, self.download_retries_sleep, retriesAttempted))
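The `readdir` hunk above pairs with the `encoding_type='url'` listing introduced for issue #56: S3 returns percent-encoded key names, and `urllib.unquote_plus` restores the original name before it is cached. For example (Python 2, matching the module):

    import urllib

    quoted = 'photos/New%20York%2B2013.jpg'
    print urllib.unquote_plus(quoted).decode('utf-8')
    # photos/New York+2013.jpg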
@@ -2018,7 +2026,7 @@ def get_to_do_on_s3(self, i):
             (key, pub, cmds) = self.s3_queue[i].get(True, 1) # 1 second time-out
 
             # MUTABLE PROTECTION
             # various sections of do_cmd_on_s3_now have the potential
-            # of mutating pub, this tries to keep the queue clean 
+            # of mutating pub, this tries to keep the queue clean
             # in case a retry happens.
             pub = copy.copy(pub)
@@ -2041,9 +2049,9 @@ def do_cmd_on_s3_now(self, key, pub, action, args, kargs):
 
         # fuse/yas3fs is version unaware and all operation should
         # happen to the current version
-        # also we don't track updated key.version_id in self.cache 
+        # also we don't track updated key.version_id in self.cache
         # so it is likely that what was stored has been staled
-        key.version_id = None 
+        key.version_id = None
 
         try:
             if action == 'delete':
@@ -2057,7 +2065,7 @@ def do_cmd_on_s3_now(self, key, pub, action, args, kargs):
                 path = self.remove_prefix(args[1])
 
                 if path.endswith('/'):
-                    # this is a directory, but interally stored w/o
+                    # this is a directory, but internally stored w/o
                     # trailing slash
                     path = path[:-1]
@@ -2071,10 +2079,10 @@ def do_cmd_on_s3_now(self, key, pub, action, args, kargs):
                 key.set_contents_from_string(*args,**kargs)
             elif action == 'set_contents_from_file':
                 data = args[0] # First argument must be data
-    
+
                 if data.cache.is_deleting(data.path):
                     return None
-    
+
                 try:
                     # ignore deleting flag, though will fail w/ IOError
                     key.set_contents_from_file(data.get_content(wait_until_cleared_proplist = ['s3_busy']),**kargs)
@@ -2092,10 +2100,10 @@ def do_cmd_on_s3_now(self, key, pub, action, args, kargs):
             elif action == 'multipart_upload':
                 data = args[1] # Second argument must be data
-    
+
                 if data.cache.is_deleting(data.path):
                     return None
-    
+
                 full_size = args[2] # Third argument must be full_size
                 complete = self.multipart_upload(*args)
@@ -2196,7 +2204,7 @@ def readlink(self, path):
                 data.close()
 
         return link.decode('utf-8')
-    
+
     def rmdir(self, path):
         logger.debug("rmdir '%s'" % (path))
@@ -2337,12 +2345,12 @@ def rename_on_s3(self, key, source_path, target_path, dir):
         if dir:
             target += '/'
         pub = [ 'rename', source_path, target_path ]
-    
+
         if isinstance(target,str):
             target_for_cmd = target.decode('utf-8')
         else:
             target_for_cmd = target
-    
+
         cmds = [ [ 'copy', [ key.bucket.name, target_for_cmd, key.metadata ], { 'preserve_acl': False , 'encrypt_key':self.aws_managed_encryption } ], [ 'delete', [], { 'headers': self.default_headers } ] ]
@@ -2409,11 +2417,11 @@ def unlink(self, path):
             self.do_on_s3(k, pub, cmds)
 #            self.do_on_s3_now(k, pub, cmds)
 
-        return 0 
+        return 0
 
     def create(self, path, mode, fi=None):
         logger.debug("create '%s' '%i' '%s'" % (path, mode, fi))
-        return self.open(path, mode) 
+        return self.open(path, mode)
 
     def open(self, path, flags):
         logger.debug("open '%s' '%i'" % (path, flags))
@@ -2423,7 +2431,7 @@ def open(self, path, flags):
             self.mknod(path, flags)
             self.cache.get(path, 'data').open()
         logger.debug("open '%s' '%i' '%s'" % (path, flags, self.cache.get(path, 'data').get('open')))
-        return 0 
+        return 0
 
     def release(self, path, flags):
         logger.debug("release '%s' '%i'" % (path, flags))
@@ -2444,7 +2452,7 @@ def release(self, path, flags):
             logger.debug("release '%s' '%i' '%s'" % (path, flags, data.get('open')))
         else:
             logger.debug("release '%s' '%i'" % (path, flags))
-        return 0 
+        return 0
 
     def read(self, path, length, offset, fh=None):
         logger.debug("read '%s' '%i' '%i' '%s'" % (path, length, offset, fh))
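The `pub = copy.copy(pub)` under the MUTABLE PROTECTION comment above is a defensive shallow copy: `do_cmd_on_s3_now` can mutate the `pub` list it is handed, and if the command is later re-queued for a retry the queue must still hold the original value. Reduced to a sketch with hypothetical names:

    import copy

    queued_pub = ['rename', '/old/path', '/new/path']

    def handler(pub):
        pub.append('mutated')  # stands in for do_cmd_on_s3_now's side effects

    handler(copy.copy(queued_pub))  # the handler works on a copy...
    assert queued_pub == ['rename', '/old/path', '/new/path']  # ...so a retry sees the original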
@@ -2462,23 +2470,23 @@ def read(self, path, length, offset, fh=None):
         retriesAttempted = 0
         while retry:
             retriesAttempted += 1
-    
+
             # for https://github.com/danilop/yas3fs/issues/46
             if retriesAttempted > self.read_retries_num:
-                logger.error("read '%s' '%i' '%i' '%s' max read retries exceeded max: %i sleep: %i retries: %i, raising FuseOSError(errno.EIO) ''" % (path, length, offset, fh, self.read_retries_num, self.read_retries_sleep, retriesAttempted)) 
+                logger.error("read '%s' '%i' '%i' '%s' max read retries exceeded max: %i sleep: %i retries: %i, raising FuseOSError(errno.EIO) ''" % (path, length, offset, fh, self.read_retries_num, self.read_retries_sleep, retriesAttempted))
                 retry = False
                 self.invalidate_cache(path)
                 raise FuseOSError(errno.EIO)
-    
+
             data = self.cache.get(path, 'data')
             if not data:
-                logger.debug("read '%s' '%i' '%i' '%s' no data" % (path, length, offset, fh)) 
+                logger.debug("read '%s' '%i' '%i' '%s' no data" % (path, length, offset, fh))
                 return '' # Something better ???
             data_range = data.get('range')
             if data_range == None:
                 logger.debug("read '%s' '%i' '%i' '%s' no range" % (path, length, offset, fh))
                 break
-    
+
             attr = self.get_metadata(path, 'attr')
             file_size = attr['st_size']
             end_interval = min(offset + length, file_size) - 1
@@ -2496,15 +2504,15 @@ def read(self, path, length, offset, fh=None):
                 prefetch_interval = [prefetch_start, prefetch_end_interval]
                 if not data_range.interval.contains(prefetch_interval):
                     self.enqueue_download_data(path, prefetch_start, prefetch_length, prefetch=True)
-                logger.debug("read '%s' '%i' '%i' '%s' in range" % (path, length, offset, fh)) 
+                logger.debug("read '%s' '%i' '%i' '%s' in range" % (path, length, offset, fh))
                 break
             else:
                 # Note added max retries as this can go on forever... for https://github.com/danilop/yas3fs/issues/46
                 logger.debug("read '%s' '%i' '%i' '%s' out of range" % (path, length, offset, fh))
                 self.enqueue_download_data(path, offset, length)
                 time.sleep(self.read_retries_sleep)
-    
-    
+
+
             logger.debug("read wait '%s' '%i' '%i' '%s'" % (path, length, offset, fh))
             data_range.wait()
             logger.debug("read awake '%s' '%i' '%i' '%s'" % (path, length, offset, fh))
@@ -2529,7 +2537,7 @@ def write(self, path, new_data, offset, fh=None):
         if isinstance(new_data, unicode): # Fix for unicode
             logger.debug("write '%s' '%i' '%i' '%s' unicode fix" % (path, len(new_data), offset, fh))
             new_data = str(new_data.encode('utf-8'))
-        length = len(new_data) 
+        length = len(new_data)
 
         data = self.cache.get(path, 'data')
         data_range = data.get('range')
@@ -2537,21 +2545,21 @@ def write(self, path, new_data, offset, fh=None):
         if data_range:
             self.enqueue_download_data(path)
             while data_range:
-                logger.debug("write wait '%s' '%i' '%i' '%s'" % (path, len(new_data), offset, fh)) 
+                logger.debug("write wait '%s' '%i' '%i' '%s'" % (path, len(new_data), offset, fh))
                 data_range.wait()
-                logger.debug("write awake '%s' '%i' '%i' '%s'" % (path, len(new_data), offset, fh)) 
+                logger.debug("write awake '%s' '%i' '%i' '%s'" % (path, len(new_data), offset, fh))
                 data_range = data.get('range')
-    
-        with data.get_lock(): 
+
+        with data.get_lock():
             if not data.content:
-                logger.info("write awake '%s' '%i' '%i' '%s' no content" % (path, len(new_data), offset, fh)) 
+                logger.info("write awake '%s' '%i' '%i' '%s' no content" % (path, len(new_data), offset, fh))
                 return 0
             logger.debug("write '%s' '%i' '%i' '%s' '%s' content" % (path, len(new_data), offset, fh, data.content.name.decode('utf-8')))
             data.content.seek(offset)
             data.content.write(new_data)
             data.set('change', True)
-            now = get_current_time() 
-            attr = self.get_metadata(path, 'attr') 
+            now = get_current_time()
+            attr = self.get_metadata(path, 'attr')
             old_size = attr['st_size']
             new_size = max(old_size, offset + length)
             if new_size != old_size:
@@ -2560,7 +2568,7 @@ def write(self, path, new_data, offset, fh=None):
                 attr['st_mtime'] = now
                 attr['st_atime'] = now
         return length
-    
+
     def upload_to_s3(self, path, data):
         logger.debug("upload_to_s3 '%s'" % path)
         k = self.get_key(path)
@@ -2621,13 +2629,13 @@ def multipart_upload(self, key_path, data, full_size, headers, metadata):
         logger.debug("initiate_multipart_upload '%s' '%s'" % (key_path, headers))
         num_threads = min(part_num, self.multipart_num)
         logger.debug("multipart_upload '%s' num_threads '%s'" % (key_path, num_threads))
-    
+
         # encoding for https://github.com/danilop/yas3fs/issues/56
         mpu = self.s3_bucket.initiate_multipart_upload(key_path.encode('utf-8'), headers=headers, metadata=metadata)
-    
+
         self.multipart_uploads_in_progress += 1
-    
-        for i in range(num_threads): 
+
+        for i in range(num_threads):
             t = TracebackLoggingThread(target=self.part_upload, args=(mpu, part_queue))
             t.demon = True
             t.start()
@@ -2662,18 +2670,18 @@ def part_upload(self, mpu, part_queue):
                     # reset to initial position, before next retry
                     # this force fixes an issue where the position
                     # is off after an uncaught low-level connection
-                    # exception is thrown 
+                    # exception is thrown
                     part.pos = 0
 
                     logger.exception(e)
                     logger.info("error during multipart upload part %i retry %i part__ %s : %s" % (num, retry, str(part.__dict__), sys.exc_info()[0]))
-                    time.sleep(self.s3_retries_sleep) # Better wait N seconds before retrying 
+                    time.sleep(self.s3_retries_sleep) # Better wait N seconds before retrying
 
                 logger.debug("end upload of part %i retry %i part__ %s" % (num, retry, str(part.__dict__)))
             part_queue.task_done()
         except Queue.Empty:
             logger.debug("the queue is empty")
-    
+
     def chmod(self, path, mode):
         logger.debug("chmod '%s' '%i'" % (path, mode))
@@ -2742,6 +2750,9 @@ def utimens(self, path, times=None):
 
     def getxattr(self, path, name, position=0):
         logger.debug("getxattr '%s' '%s' '%i'" % (path, name, position))
+        if name in ['yas3fs.bucket', 'user.yas3fs.bucket']:
+            return self.s3_bucket_name
+
         if self.cache.is_deleting(path):
             logger.debug("getxattr path '%s' is deleting -- throwing ENOENT" % (path))
             raise FuseOSError(errno.ENOENT)
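With the early return above, `user.yas3fs.bucket` is answered before any cache or key lookup. From a client process the virtual attributes can be read programmatically as well as with `getfattr`; a sketch using `os.getxattr` (Linux, Python 3.3+; the mount path is a placeholder):

    import os

    path = '/mnt/yas3fs/some/file'
    bucket = os.getxattr(path, 'user.yas3fs.bucket')    # bytes, e.g. b'mybucket'
    url = os.getxattr(path, 'user.yas3fs.signedURL')    # time-limited signed URL
    print(bucket.decode('utf-8'), url.decode('utf-8'))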
@@ -2749,41 +2760,42 @@ def getxattr(self, path, name, position=0):
         if self.cache.is_empty(path):
             logger.debug("getxattr '%s' '%s' '%i' ENOENT" % (path, name, position))
             raise FuseOSError(errno.ENOENT)
-        if name in ['yas3fs.bucket', 'user.yas3fs.bucket']:
-            return self.s3_bucket_name
+
+        key = self.get_key(path)
+        if not key:
+            if self.darwin:
+                raise FuseOSError(errno.ENOENT) # Should return ENOATTR
+            else:
+                return '' # Should return ENOATTR
+
         if name in ['yas3fs.key', 'user.yas3fs.key']:
-            key = self.get_key(path)
-            if key:
-                return key.key
-        elif name in ['yas3fs.URL', 'user.yas3fs.URL']:
-            key = self.get_key(path)
-            if key:
-                tmp_key = copy.copy(key)
-                tmp_key.metadata = {} # To remove unnecessary metadata headers
-                tmp_key.version_id = None
-                return tmp_key.generate_url(expires_in=0, headers=self.default_headers, query_auth=False)
+            return key.key
+
+        if name in ['yas3fs.URL', 'user.yas3fs.URL']:
+            tmp_key = copy.copy(key)
+            tmp_key.metadata = {} # To remove unnecessary metadata headers
+            tmp_key.version_id = None
+            return tmp_key.generate_url(expires_in=0, headers=self.default_headers, query_auth=False)
+
         xattr = self.get_metadata(path, 'xattr')
         if xattr == None:
             logger.debug("getxattr <- '%s' '%s' '%i' ENOENT" % (path, name, position))
             raise FuseOSError(errno.ENOENT)
         if name in ['yas3fs.signedURL', 'user.yas3fs.signedURL']:
-            key = self.get_key(path)
-            if key:
-                try:
-                    seconds = int(xattr['user.yas3fs.expiration'])
-                except KeyError:
-                    seconds = self.default_expiration
-                tmp_key = copy.copy(key)
-                tmp_key.metadata = {} # To remove unnecessary metadata headers
-                tmp_key.version_id = None
-                return tmp_key.generate_url(expires_in=seconds, headers=self.default_headers)
-        elif name in ['yas3fs.expiration', 'user.yas3fs.expiration']:
-            key = self.get_key(path)
-            if key:
-                if name not in xattr:
-                    return str(self.default_expiration) + ' (default)'
+            try:
+                seconds = int(xattr['user.yas3fs.expiration'])
+            except KeyError:
+                seconds = self.default_expiration
+            tmp_key = copy.copy(key)
+            tmp_key.metadata = {} # To remove unnecessary metadata headers
+            tmp_key.version_id = None
+            return tmp_key.generate_url(expires_in=seconds, headers=self.default_headers)
+
+        if name in ['yas3fs.expiration', 'user.yas3fs.expiration']:
+            if 'user.yas3fs.expiration' not in xattr:
+                return str(self.default_expiration) + ' (default)'
+
         try:
             return xattr[name]
         except KeyError:
@@ -2815,10 +2827,15 @@ def removexattr(self, path, name):
             logger.debug("removexattr path '%s' is deleting -- throwing ENOENT" % (path))
             raise FuseOSError(errno.ENOENT)
 
+
         with self.cache.get_lock(path):
             if self.cache.is_empty(path):
                 logger.debug("removexattr '%s' '%s' ENOENT" % (path, name))
                 raise FuseOSError(errno.ENOENT)
+
+            if name in self.yas3fs_xattrs and name not in ['user.yas3fs.expiration']:
+                return 0 # Do nothing
+
             xattr = self.get_metadata(path, 'xattr')
             try:
                 del xattr[name]
@@ -2843,8 +2860,8 @@ def setxattr(self, path, name, value, options, position=0):
             if self.cache.is_empty(path):
                 logger.debug("setxattr '%s' '%s' ENOENT" % (path, name))
                 raise FuseOSError(errno.ENOENT)
-            if name in [ 'user.yas3fs.bucket', 'user.yas3fs.key', 'user.yas3fs.URL', 'user.yas3fs.signedURL' ]:
-                return 0 # Do nothing
+            if name in self.yas3fs_xattrs and name not in ['user.yas3fs.expiration']:
+                return 0 # Do nothing
             xattr = self.get_metadata(path, 'xattr')
             if xattr < 0:
                 return xattr
@@ -2882,7 +2899,7 @@ def run(self):
             raise
 
 class CompressedRotatingFileHandler(logging.handlers.RotatingFileHandler):
-    """ compress old files 
+    """ compress old files
     from http://roadtodistributed.blogspot.com/2011/04/compressed-rotatingfilehandler-for.html
     """
     def __init__(self, filename, mode='a', maxBytes=0, backupCount=0, encoding=None, delay=0):
@@ -2934,16 +2951,16 @@ def create_dirs(dirname):
     try:
         if not isinstance(dirname,str):
             dirname = dirname.encode('utf-8')
-    
+
         os.makedirs(dirname)
         logger.debug("create_dirs '%s' done" % dirname)
-    except OSError as exc: # Python >2.5 
+    except OSError as exc: # Python >2.5
         if exc.errno == errno.EEXIST and os.path.isdir(dirname):
             logger.debug("create_dirs '%s' already there" % dirname)
             pass
         else:
             raise
-    
+
     except Exception as exc: # Python >2.5
         logger.debug("create_dirs '%s' ERROR %s" % (dirname, exc))
         raise
@@ -3071,14 +3088,14 @@ def main():
                         help='interval between cache size checks in seconds (default is %(default)s seconds)')
     parser.add_argument('--s3-num', metavar='N', type=int, default=32,
                         help='number of parallel S3 calls (0 to disable writeback, default is %(default)s)')
-    
+
     parser.add_argument('--s3-retries', metavar='N', type=int, default=3,
                         help='number of of times to retry any s3 write operation (default is %(default)s)')
-    
+
     parser.add_argument('--s3-retries-sleep', metavar='N', type=int, default=1,
                         help='retry sleep in seconds between s3 write operations (default is %(default)s)')
-    
-    
+
+
     parser.add_argument('--download-num', metavar='N', type=int, default=4,
                         help='number of parallel downloads (default is %(default)s)')
     parser.add_argument('--download-retries-num', metavar='N', type=int, default=60,
@@ -3110,7 +3127,7 @@ def main():
                         '(0 to disable multipart upload, default is %(default)s)')
     parser.add_argument('--mp-retries', metavar='N', type=int, default=3,
                         help='max number of retries in uploading a part (default is %(default)s)')
-    parser.add_argument('--aws-managed-encryption', action='store_true', 
+    parser.add_argument('--aws-managed-encryption', action='store_true',
                         help='Enable AWS managed encryption (sets header x-amz-server-side-encryption = AES256)')
     parser.add_argument('--id',
                         help='a unique ID identifying this node in a cluster (default is a UUID)')
@@ -3180,7 +3197,7 @@ def main():
         logger.setLevel(logging.INFO)
 
     sys.excepthook = custom_sys_excepthook # This is not working for new threads that start afterwards
-    
+
     logger.debug("options = %s" % options)
 
     if options.mkdir: