203 changes: 150 additions & 53 deletions lib/reindex/s3_bucketd.py
@@ -1,5 +1,6 @@
import argparse
import concurrent.futures as futures
import functools
import itertools
import json
import logging
@@ -8,6 +9,7 @@
import sys
import time
import urllib
from pathlib import Path
from collections import defaultdict, namedtuple
from concurrent.futures import ThreadPoolExecutor

@@ -31,11 +33,38 @@ def get_options():
parser.add_argument("-n", "--sentinel-cluster-name", default='scality-s3', help="Redis cluster name")
parser.add_argument("-s", "--bucketd-addr", default='http://127.0.0.1:9000', help="URL of the bucketd server")
parser.add_argument("-w", "--worker", default=10, type=int, help="Number of workers")
parser.add_argument("-b", "--bucket", default=None, help="Bucket to be processed")
parser.add_argument("-r", "--max-retries", default=2, type=int, help="Max retries before failing a bucketd request")
parser.add_argument("--only-latest-when-locked", action='store_true', help="Only index the latest version of a key when the bucket has a default object lock policy")
parser.add_argument("--debug", action='store_true', help="Enable debug logging")
return parser.parse_args()
parser.add_argument("--dry-run", action="store_true", help="Do not update redis")
group = parser.add_mutually_exclusive_group()
group.add_argument("-a", "--account", default=[], help="account canonical ID (all account buckets will be processed)", action="append", type=nonempty_string('account'))
group.add_argument("--account-file", default=None, help="file containing account canonical IDs, one ID per line", type=existing_file)
group.add_argument("-b", "--bucket", default=[], help="bucket name", action="append", type=nonempty_string('bucket'))
group.add_argument("--bucket-file", default=None, help="file containing bucket names, one bucket name per line", type=existing_file)

options = parser.parse_args()
if options.bucket_file:
with open(options.bucket_file) as f:
options.bucket = [line.strip() for line in f if line.strip()]
elif options.account_file:
with open(options.account_file) as f:
options.account = [line.strip() for line in f if line.strip()]

return options

def nonempty_string(flag):
def inner(value):
if not value.strip():
raise argparse.ArgumentTypeError("%s: value must not be empty"%flag)
return value
return inner

def existing_file(path):
path = Path(path).resolve()
if not path.exists():
raise argparse.ArgumentTypeError("File does not exist: %s"%path)
return path
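Editor's note on the new selection flags: `-a`/`-b` use `action="append"`, so each may be repeated, while the mutually exclusive group rejects mixing account and bucket selectors; the `type=` callables (`nonempty_string`, `existing_file`) surface bad values as argparse usage errors rather than tracebacks. A minimal standalone sketch of the same pattern (hypothetical parser, not part of this change):

```python
import argparse

def nonempty_string(flag):
    def inner(value):
        if not value.strip():
            raise argparse.ArgumentTypeError("%s: value must not be empty" % flag)
        return value
    return inner

parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group()
group.add_argument("-a", "--account", default=[], action="append",
                   type=nonempty_string('account'))
group.add_argument("-b", "--bucket", default=[], action="append",
                   type=nonempty_string('bucket'))

# Repeated flags of the same kind accumulate into a list:
assert parser.parse_args(["-a", "c1", "-a", "c2"]).account == ["c1", "c2"]

# Mixing members of the group is rejected with a usage error (SystemExit):
# parser.parse_args(["-a", "c1", "-b", "bkt"])
```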

def chunks(iterable, size):
it = iter(iterable)
@@ -62,6 +91,10 @@ class InvalidListing(Exception):
def __init__(self, bucket):
super().__init__('Invalid contents found while listing bucket %s'%bucket)

class BucketNotFound(Exception):
def __init__(self, bucket):
super().__init__('Bucket %s not found'%bucket)

class BucketDClient:

'''Performs Listing calls against bucketd'''
Expand Down Expand Up @@ -141,6 +174,7 @@ def _list_bucket(self, bucket, **kwargs):
else:
is_truncated = len(payload) > 0

@functools.lru_cache(maxsize=16)
def _get_bucket_attributes(self, name):
url = self.__url_attribute_format.format(addr=self._bucketd_addr, bucket=name)
try:
@@ -149,7 +183,7 @@ def _get_bucket_attributes(self, name):
return resp.json()
else:
_log.error('Error getting bucket attributes bucket:%s status_code:%s'%(name, resp.status_code))
raise InvalidListing(name)
raise BucketNotFound(name)
except ValueError as e:
_log.exception(e)
_log.error('Invalid attributes response body! bucket:%s'%name)
@@ -162,7 +196,15 @@ def _get_bucket_attributes(self, name):
_log.error('Unhandled exception getting bucket attributes bucket:%s'%name)
raise

def list_buckets(self, name = None):
def get_bucket_md(self, name):
md = self._get_bucket_attributes(name)
canonId = md.get('owner')
if canonId is None:
_log.error('No owner found for bucket %s'%name)
raise InvalidListing(name)
return Bucket(canonId, name, md.get('objectLockEnabled', False))
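One subtlety in the new `@functools.lru_cache` on `_get_bucket_attributes`: caching a bound method keys entries on `(self, name)` and keeps a strong reference to the instance. With a single long-lived `BucketDClient` per run that is harmless, and repeated attribute lookups (`get_bucket_md` plus the object-lock check in `list_buckets`) hit bucketd only once per bucket. A toy illustration of the caching behaviour (hypothetical class, not this file's client):

```python
import functools

class Client:
    backend_calls = 0

    @functools.lru_cache(maxsize=16)
    def get_attrs(self, name):
        # Only the first lookup per (self, name) pair reaches the backend.
        Client.backend_calls += 1
        return {'owner': 'canon-%s' % name}

c = Client()
c.get_attrs('photos')
c.get_attrs('photos')  # served from the cache
c.get_attrs('logs')
assert Client.backend_calls == 2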

def list_buckets(self, account=None):

def get_next_marker(p):
if p is None:
@@ -174,25 +216,24 @@ def get_next_marker(p):
'maxKeys': 1000,
'marker': get_next_marker
}

if account is not None:
params['prefix'] = '%s..|..' % account

for _, payload in self._list_bucket(USERS_BUCKET, **params):
buckets = []
for result in payload.get('Contents', []):
match = re.match(r"(\w+)..\|..(\w+.*)", result['key'])
bucket = Bucket(*match.groups(), False)
if name is None or bucket.name == name:
# We need to get the attributes for each bucket to determine if it is locked
if self._only_latest_when_locked:
bucket_attrs = self._get_bucket_attributes(bucket.name)
object_lock_enabled = bucket_attrs.get('objectLockEnabled', False)
bucket = bucket._replace(object_lock_enabled=object_lock_enabled)
buckets.append(bucket)
# We need to get the attributes for each bucket to determine if it is locked
if self._only_latest_when_locked:
bucket_attrs = self._get_bucket_attributes(bucket.name)
object_lock_enabled = bucket_attrs.get('objectLockEnabled', False)
bucket = bucket._replace(object_lock_enabled=object_lock_enabled)
buckets.append(bucket)

if buckets:
yield buckets
if name is not None:
# Break on the first matching bucket if a name is given
break
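For readers new to the users-bucket layout: each key there is `<canonicalId>..|..<bucketName>`, which is what the regex above parses and what the new `prefix` parameter exploits to scope a listing to a single account. A small sketch with hypothetical keys:

```python
import re

keys = ['canon1..|..photos', 'canon1..|..logs', 'canon2..|..backups']

account = 'canon1'
prefix = '%s..|..' % account
for key in keys:
    if not key.startswith(prefix):
        continue
    owner, bucket = re.match(r"(\w+)..\|..(\w+.*)", key).groups()
    print(owner, bucket)  # canon1 photos, then canon1 logs
```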


def list_mpus(self, bucket):
_bucket = MPU_SHADOW_BUCKET_PREFIX + bucket.name
@@ -255,7 +296,7 @@ def _sum_objects(self, bucket, listing, only_latest_when_locked = False):
total_size += size

except InvalidListing:
_log.error('Invalid contents in listing. bucket:%s status_code:%s'%(bucket.name, status_code))
_log.error('Invalid contents in listing. bucket:%s'%bucket.name)
raise InvalidListing(bucket.name)
return count, total_size

@@ -324,6 +365,23 @@ def get_next_marker(p):
total_size=total_size
)

def list_all_buckets(bucket_client):
return bucket_client.list_buckets()

def list_specific_accounts(bucket_client, accounts):
for account in accounts:
yield from bucket_client.list_buckets(account=account)

def list_specific_buckets(bucket_client, buckets):
batch = []
for bucket in buckets:
try:
batch.append(bucket_client.get_bucket_md(bucket))
except BucketNotFound:
_log.error('Failed to list bucket %s. Removing from results.'%bucket)
continue

yield batch
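These three helpers give the main loop a single shape to consume: each yields batches (lists) of `Bucket` tuples, whether the batch came from a full listing, per-account prefixes, or explicit names, and `list_specific_buckets` drops unknown buckets rather than aborting the run. A rough sketch of that shared contract (stub client and names are hypothetical):

```python
from collections import namedtuple

Bucket = namedtuple('Bucket', ['userid', 'name', 'object_lock_enabled'])

class StubClient:
    def get_bucket_md(self, name):
        if name == 'missing':
            raise KeyError(name)  # stands in for BucketNotFound
        return Bucket('canon1', name, False)

def list_specific_buckets(client, names):
    batch = []
    for name in names:
        try:
            batch.append(client.get_bucket_md(name))
        except KeyError:
            continue  # skip the bad bucket and keep going, as the real helper does
    yield batch

for batch in list_specific_buckets(StubClient(), ['photos', 'missing', 'logs']):
    print([b.name for b in batch])  # ['photos', 'logs']
```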

def index_bucket(client, bucket):
'''
@@ -399,18 +457,24 @@ def log_report(resource, name, obj_count, total_size):

if __name__ == '__main__':
options = get_options()
if options.bucket is not None and not options.bucket.strip():
print('You must provide a bucket name with the --bucket flag')
sys.exit(1)
if options.debug:
_log.setLevel(logging.DEBUG)

bucket_client = BucketDClient(options.bucketd_addr, options.max_retries, options.only_latest_when_locked)
redis_client = get_redis_client(options)
account_reports = {}
observed_buckets = set()
failed_accounts = set()

if options.account:
batch_generator = list_specific_accounts(bucket_client, options.account)
elif options.bucket:
batch_generator = list_specific_buckets(bucket_client, options.bucket)
else:
batch_generator = list_all_buckets(bucket_client)

with ThreadPoolExecutor(max_workers=options.worker) as executor:
for batch in bucket_client.list_buckets(options.bucket):
for batch in batch_generator:
bucket_reports = {}
jobs = { executor.submit(index_bucket, bucket_client, b): b for b in batch }
for job in futures.as_completed(jobs.keys()):
@@ -429,51 +493,84 @@ def log_report(resource, name, obj_count, total_size):
update_report(account_reports, total.bucket.userid, total.obj_count, total.total_size)

# Bucket reports can be updated as we get them
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket, report in bucket_reports.items():
update_redis(pipeline, 'buckets', bucket, report['obj_count'], report['total_size'])
log_report('buckets', bucket, report['obj_count'], report['total_size'])
pipeline.execute()
if options.dry_run:
for bucket, report in bucket_reports.items():
_log.info(
"DryRun: resource buckets [%s] would be updated with obj_count %i and total_size %i" % (
bucket, report['obj_count'], report['total_size']
)
)
else:
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket, report in bucket_reports.items():
update_redis(pipeline, 'buckets', bucket, report['obj_count'], report['total_size'])
log_report('buckets', bucket, report['obj_count'], report['total_size'])
pipeline.execute()
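On the `transaction=False` pipelines used throughout: redis-py batches the queued commands into one round trip either way, but without `MULTI`/`EXEC` the server is not asked to execute them atomically, which keeps load down; these metric writes do not need atomicity. A minimal sketch (assumes a reachable Redis and the redis-py package; the key names are made up, not utapi's schema):

```python
import redis

r = redis.Redis(host='127.0.0.1', port=6379)
reports = {'photos': {'obj_count': 12, 'total_size': 4096}}

pipe = r.pipeline(transaction=False)  # plain batch, no MULTI/EXEC wrapper
for bucket, report in reports.items():
    pipe.set('demo:buckets:%s:numberOfObjects' % bucket, report['obj_count'])
    pipe.set('demo:buckets:%s:storageUtilized' % bucket, report['total_size'])
pipe.execute()  # one round trip for the whole batch
```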

stale_buckets = set()
recorded_buckets = set(get_resources_from_redis(redis_client, 'buckets'))
if options.bucket is None:
stale_buckets = recorded_buckets.difference(observed_buckets)
elif observed_buckets and options.bucket not in recorded_buckets:
# The provided bucket does not exist, so clean up any metrics
stale_buckets = { options.bucket }
if options.bucket:
stale_buckets = { b for b in options.bucket if b not in observed_buckets }
elif options.account:
_log.warning('Stale buckets will not be cleared when using the --account or --account-file flags')
else:
stale_buckets = set()
stale_buckets = recorded_buckets.difference(observed_buckets)

_log.info('Found %s stale buckets' % len(stale_buckets))
for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket in chunk:
update_redis(pipeline, 'buckets', bucket, 0, 0)
log_report('buckets', bucket, 0, 0)
pipeline.execute()
if options.dry_run:
_log.info("DryRun: not updating stale buckets")
else:
for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket in chunk:
update_redis(pipeline, 'buckets', bucket, 0, 0)
log_report('buckets', bucket, 0, 0)
pipeline.execute()
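The stale-metric loops lean on the `chunks` helper (its body is cut off in this diff view) to cap how many updates each pipeline carries. A standard itertools completion of that helper, offered as a sketch in case the file's own version differs:

```python
import itertools

def chunks(iterable, size):
    it = iter(iterable)
    chunk = tuple(itertools.islice(it, size))
    while chunk:
        yield chunk
        chunk = tuple(itertools.islice(it, size))

assert list(chunks(range(5), 2)) == [(0, 1), (2, 3), (4,)]
```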

# Account metrics are not updated if a bucket is specified
if options.bucket is None:
if options.bucket:
_log.warning('Account metrics will not be updated when using the --bucket or --bucket-file flags')
else:
# Don't update any accounts with failed listings
without_failed = filter(lambda x: x[0] not in failed_accounts, account_reports.items())
# Update total account reports in chunks
for chunk in chunks(without_failed, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for userid, report in chunk:
update_redis(pipeline, 'accounts', userid, report['obj_count'], report['total_size'])
log_report('accounts', userid, report['obj_count'], report['total_size'])
pipeline.execute()
if options.dry_run:
for userid, report in account_reports.items():
_log.info(
"DryRun: resource account [%s] would be updated with obj_count %i and total_size %i" % (
userid, report['obj_count'], report['total_size']
)
)
else:
# Update total account reports in chunks
for chunk in chunks(without_failed, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for userid, report in chunk:
update_redis(pipeline, 'accounts', userid, report['obj_count'], report['total_size'])
log_report('accounts', userid, report['obj_count'], report['total_size'])
pipeline.execute()

if options.account:
for account in options.account:
if account in failed_accounts:
_log.error("No metrics updated for account %s, one or more buckets failed" % account)

# Include failed_accounts in observed_accounts to avoid clearing metrics
observed_accounts = failed_accounts.union(set(account_reports.keys()))
recorded_accounts = set(get_resources_from_redis(redis_client, 'accounts'))

# Stale accounts and buckets are ones that do not appear in the listing, but have recorded values
stale_accounts = recorded_accounts.difference(observed_accounts)
if options.account:
stale_accounts = { a for a in options.account if a not in observed_accounts }
else:
# Stale accounts and buckets are ones that do not appear in the listing, but have recorded values
stale_accounts = recorded_accounts.difference(observed_accounts)

_log.info('Found %s stale accounts' % len(stale_accounts))
for chunk in chunks(stale_accounts, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for account in chunk:
update_redis(pipeline, 'accounts', account, 0, 0)
log_report('accounts', account, 0, 0)
pipeline.execute()
if options.dry_run:
_log.info("DryRun: not updating stale accounts")
else:
for chunk in chunks(stale_accounts, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for account in chunk:
update_redis(pipeline, 'accounts', account, 0, 0)
log_report('accounts', account, 0, 0)
pipeline.execute()