
Commit

Merge branch 'develop' into feature/handle-missing-transforms
jameshawkes authored Sep 24, 2024
2 parents 7d20cf2 + 5a289fb commit c69fd62
Showing 3 changed files with 185 additions and 103 deletions.
16 changes: 7 additions & 9 deletions polytope_server/common/datasource/polytope.py
@@ -18,10 +18,10 @@
# does it submit to any jurisdiction.
#

import copy
import json
import logging
import os
import copy

import yaml
from polytope.utility.exceptions import PolytopeError
@@ -61,7 +61,7 @@ def retrieve(self, request):

# Set the "pre-path" for this request
pre_path = {}
for k,v in r.items():
for k, v in r.items():
if k in self.req_single_keys:
if isinstance(v, list):
v = v[0]
@@ -81,15 +81,13 @@ def retrieve(self, request):

polytope_mars_config["options"]["axis_config"] = transforms



polytope_mars = PolytopeMars(
polytope_mars_config,
log_context= {
"user": request.user.realm + ':' + request.user.username,
log_context={
"user": request.user.realm + ":" + request.user.username,
"id": request.id,
})

},
)

try:
self.output = polytope_mars.extract(r)
@@ -123,7 +121,7 @@ def match(self, request):
raise Exception("got {} : {}, but expected one of {}".format(k, r[k], v))

# Check that there is only one value if required
for k, v in r.items():
for k, v in r.items():
if k in self.req_single_keys:
v = [v] if isinstance(v, str) else v
if len(v) > 1:
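The retrieve() hunks above inject the per-request transforms into axis_config and pass a log_context that ties log lines back to the request. A minimal sketch of the resulting call shape, with an assumed import path and placeholder values standing in for the request object:

```python
# Sketch only: the import path, config shape and placeholder values are assumptions;
# the constructor and extract() signatures mirror the diff above.
from polytope_mars.api import PolytopeMars  # assumed import location

polytope_mars_config = {"options": {}}  # loaded from datasource config in practice
transforms = []                         # axis transforms resolved for this request
polytope_mars_config["options"]["axis_config"] = transforms

polytope_mars = PolytopeMars(
    polytope_mars_config,
    log_context={
        "user": "example-realm:example-user",  # request.user.realm + ":" + request.user.username
        "id": "example-request-id",            # request.id
    },
)
# output = polytope_mars.extract(r)  # r is the flattened request dict built earlier
```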
50 changes: 31 additions & 19 deletions polytope_server/common/staging/s3_boto3_staging.py
@@ -20,7 +20,6 @@

import json
import logging
import random
import time
from concurrent.futures import Future, ThreadPoolExecutor

@@ -59,7 +58,6 @@ def submit(self, fn, /, *args, **kwargs):

class S3Staging_boto3(staging.Staging):
def __init__(self, config):

self.bucket = config.get("bucket", "default")
self.url = config.get("url", None)

@@ -76,17 +74,9 @@ def __init__(self, config):
for name in ["boto", "urllib3", "s3transfer", "boto3", "botocore", "nose"]:
logging.getLogger(name).setLevel(logging.WARNING)

prefix = "https" if self.use_ssl else "http"

if config.get("random_host", False):
self.host = config.get("random_host", {}).get("host", self.host)
index = random.randint(0, config.get("random_host", {}).get("max", 1) - 1)
# replace %%ID%% in the host with the index
self.host = self.host.replace("%%ID%%", str(index))
self.url = self.url + "/" + str(index)
logging.info(f"Using random host: {self.host}")
self.prefix = "https" if self.use_ssl else "http"

self._internal_url = f"{prefix}://{self.host}:{self.port}"
self._internal_url = f"http://{self.host}:{self.port}"

# Setup Boto3 client
self.s3_client = boto3.client(
@@ -125,7 +115,10 @@ def create(self, name, data, content_type):
# else using content-disposition header
try:
multipart_upload = self.s3_client.create_multipart_upload(
Bucket=self.bucket, Key=name, ContentType=content_type, ContentDisposition="attachment"
Bucket=self.bucket,
Key=name,
ContentType=content_type,
ContentDisposition="attachment",
)
upload_id = multipart_upload["UploadId"]

@@ -140,7 +133,15 @@ def create(self, name, data, content_type):
else:
for part_data in self.iterator_buffer(data, self.buffer_size):
if part_data:
futures.append(executor.submit(self.upload_part, name, part_number, part_data, upload_id))
futures.append(
executor.submit(
self.upload_part,
name,
part_number,
part_data,
upload_id,
)
)
part_number += 1

for future in futures:
@@ -153,7 +154,10 @@ def create(self, name, data, content_type):
raise ValueError("No data retrieved")

self.s3_client.complete_multipart_upload(
Bucket=self.bucket, Key=name, UploadId=upload_id, MultipartUpload={"Parts": parts}
Bucket=self.bucket,
Key=name,
UploadId=upload_id,
MultipartUpload={"Parts": parts},
)

logging.info(f"Successfully uploaded {name} in {len(parts)} parts.")
@@ -168,7 +172,11 @@ def create(self, name, data, content_type):
def upload_part(self, name, part_number, data, upload_id):
logging.debug(f"Uploading part {part_number} of {name}, {len(data)} bytes")
response = self.s3_client.upload_part(
Bucket=self.bucket, Key=name, PartNumber=part_number, UploadId=upload_id, Body=data
Bucket=self.bucket,
Key=name,
PartNumber=part_number,
UploadId=upload_id,
Body=data,
)
return {"PartNumber": part_number, "ETag": response["ETag"]}

@@ -190,14 +198,14 @@ def set_bucket_policy(self):
},
{
"Sid": "AllowListBucket",
"Effect": "Allow",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:ListBucket",
"Resource": f"arn:aws:s3:::{self.bucket}",
},
{
"Sid": "AllowGetBucketLocation",
"Effect": "Allow",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:GetBucketLocation",
"Resource": f"arn:aws:s3:::{self.bucket}",
@@ -239,7 +247,11 @@ def stat(self, name):

def get_url(self, name):
if self.url:
return f"{self.url}/{self.bucket}/{name}"
if self.url.startswith("http"):
# This covers both http and https
return f"{self.url}/{self.bucket}/{name}"
else:
return f"{self.prefix}://{self.url}/{self.bucket}/{name}"
return None

def get_internal_url(self, name):
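get_url() now keeps a configured URL verbatim when it already carries a scheme and otherwise prepends the http/https prefix derived from use_ssl. A small standalone sketch of that branch with hypothetical values:

```python
def build_download_url(configured_url: str, prefix: str, bucket: str, name: str) -> str:
    """Mirror of the get_url() branch: keep an explicit scheme if present,
    otherwise prepend the prefix derived from use_ssl ("https" or "http")."""
    if configured_url.startswith("http"):
        # Covers both http:// and https:// URLs configured verbatim.
        return f"{configured_url}/{bucket}/{name}"
    return f"{prefix}://{configured_url}/{bucket}/{name}"

# Hypothetical values, not taken from any deployment config:
print(build_download_url("https://staging.example.org", "https", "default", "abc123"))
# -> https://staging.example.org/default/abc123
print(build_download_url("staging.example.org", "http", "default", "abc123"))
# -> http://staging.example.org/default/abc123
```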