Commit
OTHER: added job naming to helpers (#55)
RachelTucker authored Mar 10, 2022
1 parent c26b12e commit b7d262d
Showing 2 changed files with 50 additions and 16 deletions.
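The change threads an optional job_name argument through the four helper entry points (put_objects, put_all_objects_in_directory, get_objects, get_all_files_in_bucket) so the BP jobs they create can be identified by name. A minimal usage sketch, assuming a reachable Black Pearl, the environment variables that createClientFromEnv expects, and placeholder names throughout (my-bucket, /tmp/data, and the job names are illustrative only):

    import os
    from ds3 import ds3, ds3Helpers

    client = ds3.createClientFromEnv()
    helper = ds3Helpers.Helper(client=client)

    # put_objects: a single named BP put job.
    put_objects = [ds3Helpers.HelperPutObject(object_name="backup.bin",
                                              file_path="/tmp/data/backup.bin",
                                              size=os.path.getsize("/tmp/data/backup.bin"))]
    job_id = helper.put_objects(put_objects, "my-bucket", job_name="nightly backup put")

    # The directory helpers may create several BP jobs; every job gets the same name.
    job_ids = helper.put_all_objects_in_directory(source_dir="/tmp/data", bucket="my-bucket",
                                                  job_name="nightly backup put")
    job_ids = helper.get_all_files_in_bucket(destination_dir="/tmp/restore", bucket="my-bucket",
                                             job_name="nightly restore get")

job_name defaults to None, so existing callers are unaffected.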
ds3/ds3Helpers.py (36 changes: 24 additions & 12 deletions)
@@ -150,7 +150,7 @@ def get_checksum_type(self, bucket_name: str) -> str:
         return policy_response.result['ChecksumType']
 
     def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threads: int = 5,
-                    calculate_checksum: bool = False) -> str:
+                    calculate_checksum: bool = False, job_name: str = None) -> str:
         """
         Puts a list of objects to a Black Pearl bucket.
@@ -168,6 +168,8 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
             if the client and BP checksums do not match. Note that calculating the checksum is processor intensive, and
             it also requires two reads of the object (first to calculate checksum, and secondly to send the data). The
             type of checksum calculated is determined by the data policy associated with the bucket.
+        job_name : str
+            The name to give the BP put job.
         """
         # If calculating checksum, then determine the checksum type from the data policy
         checksum_type = None
@@ -181,7 +183,7 @@ def put_objects(self, put_objects: List[HelperPutObject], bucket: str, max_threa
             put_objects_map[entry.object_name] = entry
 
         bulk_put = self.client.put_bulk_job_spectra_s3(
-            PutBulkJobSpectraS3Request(bucket_name=bucket, object_list=ds3_put_objects))
+            PutBulkJobSpectraS3Request(bucket_name=bucket, object_list=ds3_put_objects, name=job_name))
 
         job_id = bulk_put.result['JobId']
 
@@ -244,7 +246,8 @@ def put_blob(self, bucket: str, put_object: HelperPutObject, length: int, offset
         stream.close()
 
     def put_all_objects_in_directory(self, source_dir: str, bucket: str, objects_per_bp_job: int = 1000,
-                                     max_threads: int = 5, calculate_checksum: bool = False) -> List[str]:
+                                     max_threads: int = 5, calculate_checksum: bool = False,
+                                     job_name: str = None) -> List[str]:
         """
         Puts all files and subdirectories to a Black Pearl bucket.
@@ -267,6 +270,8 @@ def put_all_objects_in_directory(self, source_dir: str, bucket: str, objects_per
             and BP checksums do not match. Note that calculating the checksum is processor intensive, and it also
             requires two reads of the object (first to calculate checksum, and secondly to send the data). The type of
             checksum calculated is determined by the data policy associated with the bucket.
+        job_name : str
+            The name to give the BP put jobs. All BP jobs that are created will have the same name.
         """
         obj_list: List[HelperPutObject] = list()
         job_list: List[str] = list()
@@ -277,8 +282,8 @@ def put_all_objects_in_directory(self, source_dir: str, bucket: str, objects_per
                 size = os.path.getsize(obj_path)
                 obj_list.append(HelperPutObject(object_name=obj_name, file_path=obj_path, size=size))
                 if len(obj_list) >= objects_per_bp_job:
-                    job_list.append(self.put_objects(
-                        obj_list, bucket, max_threads=max_threads, calculate_checksum=calculate_checksum))
+                    job_list.append(self.put_objects(obj_list, bucket, max_threads=max_threads,
+                                                     calculate_checksum=calculate_checksum, job_name=job_name))
                     obj_list = []
 
             for name in dirs:
@@ -287,17 +292,18 @@ def put_all_objects_in_directory(self, source_dir: str, bucket: str, objects_per
                     path.join(path.normpath(path.relpath(path=dir_path, start=source_dir)), ""))
                 obj_list.append(HelperPutObject(object_name=dir_name, file_path=dir_path, size=0))
                 if len(obj_list) >= objects_per_bp_job:
-                    job_list.append(self.put_objects(
-                        obj_list, bucket, max_threads=max_threads, calculate_checksum=calculate_checksum))
+                    job_list.append(self.put_objects(obj_list, bucket, max_threads=max_threads,
+                                                     calculate_checksum=calculate_checksum, job_name=job_name))
                     obj_list = []
 
         if len(obj_list) > 0:
             job_list.append(self.put_objects(
-                obj_list, bucket, max_threads=max_threads, calculate_checksum=calculate_checksum))
+                obj_list, bucket, max_threads=max_threads, calculate_checksum=calculate_checksum, job_name=job_name))
 
         return job_list
 
-    def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threads: int = 5) -> str:
+    def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threads: int = 5,
+                    job_name: str = None) -> str:
         """
         Retrieves a list of objects from a Black Pearl bucket.
@@ -309,6 +315,8 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
             The name of the bucket where the objects are being retrieved from.
         max_threads : int
             The number of concurrent objects being transferred at once (default 5).
+        job_name : str
+            The name to give the BP get job.
         """
         ds3_get_objects: List[Ds3GetObject] = []
         get_objects_map: Dict[str, HelperGetObject] = dict()
@@ -317,7 +325,8 @@ def get_objects(self, get_objects: List[HelperGetObject], bucket: str, max_threa
             get_objects_map[entry.object_name] = entry
 
         bulk_get = self.client.get_bulk_job_spectra_s3(GetBulkJobSpectraS3Request(bucket_name=bucket,
-                                                                                  object_list=ds3_get_objects))
+                                                                                  object_list=ds3_get_objects,
+                                                                                  name=job_name))
 
         job_id = bulk_get.result['JobId']
 
@@ -369,7 +378,7 @@ def get_blob(self, bucket: str, get_object: HelperGetObject, offset: int, job_id
         stream.close()
 
     def get_all_files_in_bucket(self, destination_dir: str, bucket: str, objects_per_bp_job: int = 1000,
-                                max_threads: int = 5) -> List[str]:
+                                max_threads: int = 5, job_name: str = None) -> List[str]:
         """
         Retrieves all objects from a Black Pearl bucket.
@@ -385,6 +394,8 @@ def get_all_files_in_bucket(self, destination_dir: str, bucket: str, objects_per
             This determines how many objects to bundle per BP job.
         max_threads : int
             The number of concurrent objects being transferred at once (default 5).
+        job_name : str
+            The name to give the BP get jobs. All BP jobs that are created will have the same name.
         """
         truncated: str = 'true'
         marker = ""
@@ -423,7 +434,8 @@ def get_all_files_in_bucket(self, destination_dir: str, bucket: str, objects_per
                 get_objects.append(HelperGetObject(object_name=object_name, destination_path=object_destination))
 
             if len(get_objects) > 0:
-                job_id = self.get_objects(get_objects=get_objects, bucket=bucket, max_threads=max_threads)
+                job_id = self.get_objects(get_objects=get_objects, bucket=bucket, max_threads=max_threads,
+                                          job_name=job_name)
                 job_ids.append(job_id)
 
             truncated = list_bucket.result['IsTruncated']
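Note that only the helper layer changed here: PutBulkJobSpectraS3Request and GetBulkJobSpectraS3Request already accept an optional name field, and the helpers simply forward job_name to it, so leaving job_name at None preserves today's unnamed-job behavior. The assigned name can be read back from the job itself, as the tests below do; a short sketch, where client and job_id are assumed to come from the helper calls sketched above:

    # Fetch the job record and confirm the name that the helper attached.
    get_job = client.get_job_spectra_s3(ds3.GetJobSpectraS3Request(job_id=job_id))
    assert get_job.result['Name'] == "nightly backup put"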
tests/helpersTests.py (30 changes: 26 additions & 4 deletions)
@@ -108,11 +108,12 @@ def test_put_and_get_objects(self):
                                                include_dirs=False)
 
         # create the BP helper and perform the put all objects call
+        job_name = "python test job"
         client = ds3.createClientFromEnv()
         client.put_bucket_spectra_s3(ds3.PutBucketSpectraS3Request(name=bucket))
 
         helpers = ds3Helpers.Helper(client=client)
-        job_id = helpers.put_objects(bucket=bucket, put_objects=put_objects)
+        job_id = helpers.put_objects(bucket=bucket, put_objects=put_objects, job_name=job_name)
         self.assertNotEqual(job_id, "", "job id was returned")
 
         # verify all the files and directories are on the BP
@@ -123,6 +124,10 @@ def test_put_and_get_objects(self):
             head_obj = client.head_object(ds3.HeadObjectRequest(bucket_name=bucket, object_name=put_object.object_name))
             self.assertNotEqual(head_obj.result, "DOESNTEXIST")
 
+        # verify that the job was created with the desired name
+        get_job = client.get_job_spectra_s3(ds3.GetJobSpectraS3Request(job_id=job_id))
+        self.assertEqual(get_job.result['Name'], job_name)
+
         # retrieve the files from the BP
         destination = tempfile.TemporaryDirectory(prefix="ds3-python3-sdk-dst-")
         get_objects: List[ds3Helpers.HelperGetObject] = []
@@ -134,7 +139,7 @@ def test_put_and_get_objects(self):
             object_name_to_source[put_object.object_name] = put_object.file_path
 
         # perform the get objects call
-        job_id = helpers.get_objects(bucket=bucket, get_objects=get_objects)
+        job_id = helpers.get_objects(bucket=bucket, get_objects=get_objects, job_name=job_name)
         self.assertNotEqual(job_id, "", "job id was returned")
 
         for get_object in get_objects:
@@ -147,13 +152,18 @@ def test_put_and_get_objects(self):
             original_file.close()
             retrieved_file.close()
 
+        # verify that the job was created with the desired name
+        get_job = client.get_job_spectra_s3(ds3.GetJobSpectraS3Request(job_id=job_id))
+        self.assertEqual(get_job.result['Name'], job_name)
+
         # cleanup
         source.cleanup()
         destination.cleanup()
         client.delete_bucket_spectra_s3(ds3.DeleteBucketSpectraS3Request(bucket_name=bucket, force=True))
 
     def test_put_and_get_all_objects_in_directory(self):
         bucket = f'ds3-python3-sdk-test-{uuid.uuid1()}'
+        job_name = "python test job"
 
         # create temporary directory with some files and subdirectories
         source = tempfile.TemporaryDirectory(prefix="ds3-python3-sdk-src-")
@@ -173,19 +183,26 @@ def test_put_and_get_all_objects_in_directory(self):
         client.put_bucket(ds3.PutBucketRequest(bucket_name=bucket))
 
         helpers = ds3Helpers.Helper(client=client)
-        job_ids = helpers.put_all_objects_in_directory(source_dir=source.name, bucket=bucket, objects_per_bp_job=10)
+        job_ids = helpers.put_all_objects_in_directory(source_dir=source.name, bucket=bucket, objects_per_bp_job=10,
+                                                       job_name=job_name)
         self.assertGreaterEqual(len(job_ids), 1, "received at least one job id")
 
         # verify all the files and directories are on the BP
         for put_object in put_objects:
             head_obj = client.head_object(ds3.HeadObjectRequest(bucket_name=bucket, object_name=put_object.object_name))
             self.assertNotEqual(head_obj.result, "DOESNTEXIST")
 
+        # verify that all the jobs were created with the desired name
+        for job_id in job_ids:
+            get_job = client.get_job_spectra_s3(ds3.GetJobSpectraS3Request(job_id=job_id))
+            self.assertEqual(get_job.result['Name'], job_name)
+
         # retrieve the objects from the BP
         destination = tempfile.TemporaryDirectory(prefix="ds3-python3-sdk-dst-")
         job_ids = helpers.get_all_files_in_bucket(destination_dir=destination.name,
                                                   bucket=bucket,
-                                                  objects_per_bp_job=10)
+                                                  objects_per_bp_job=10,
+                                                  job_name=job_name)
 
         self.assertGreaterEqual(len(job_ids), 2, "multiple job ids returned")
 
@@ -199,6 +216,11 @@ def test_put_and_get_all_objects_in_directory(self):
             self.assertTrue(os.path.isfile(obj_destination), f'expected path to be file: {obj_destination}')
             self.assertEqual(put_object.size, os.path.getsize(obj_destination), 'file size')
 
+        # verify that all the jobs were created with the desired name
+        for job_id in job_ids:
+            get_job = client.get_job_spectra_s3(ds3.GetJobSpectraS3Request(job_id=job_id))
+            self.assertEqual(get_job.result['Name'], job_name)
+
         # cleanup
         source.cleanup()
         destination.cleanup()
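A design note on the directory helpers: the job name is a label, not an identifier. Every BP job spawned by one put_all_objects_in_directory or get_all_files_in_bucket call carries the same name, and nothing prevents unrelated jobs from reusing it, so callers that need to correlate jobs should keep the returned job IDs. If you do want to look jobs up by label, something like the following may work; GetJobsSpectraS3Request and the 'Job' list layout of its parsed result are assumptions, not part of this commit, so verify them against your SDK version:

    # List the bucket's jobs and keep the ones carrying our label (sketch only;
    # the request class and result layout are assumed, not taken from this commit).
    jobs = client.get_jobs_spectra_s3(ds3.GetJobsSpectraS3Request(bucket_name="my-bucket"))
    named = [job for job in jobs.result['Job'] if job['Name'] == "nightly backup put"]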
