Log #158

169 changes: 145 additions & 24 deletions canine/localization/file_handlers.py
@@ -3,6 +3,7 @@
import google.auth
import glob, google_crc32c, json, hashlib, base64, binascii, os, re, requests, shlex, subprocess, threading
import pandas as pd
import urllib.parse

from google.auth.transport.requests import AuthorizedSession
from ..utils import sha1_base32, canine_logging
@@ -124,27 +125,49 @@ def get_requester_pays(self) -> bool:
Returns True if the requested gs:// object or bucket resides in a
requester pays bucket
"""
bucket = re.match(r"gs://(.*?)/.*", self.path)[1]

gcs_cl = gcloud_storage_client()
bucket_obj = google.cloud.storage.Bucket(gcs_cl, bucket, user_project = self.extra_args.get("project"))
bucket_obj.reload()
return bucket_obj.requester_pays

# TODO: handle case where permissions disallow bucket inspection?
# ret = subprocess.run('gsutil requesterpays get gs://{}'.format(bucket), shell = True, capture_output = True)
# if b'requester pays bucket but no user project provided' in ret.stderr:
# return True
# else:
# # Try again ls-ing the object itself
# # sometimes permissions can disallow bucket inspection
# # but allow object inspection
# ret = subprocess.run('gsutil ls {}'.format(self.path), shell = True, capture_output = True)
# return b'requester pays bucket but no user project provided' in ret.stderr
#
# if ret.returncode == 1 and b'BucketNotFoundException: 404' in ret.stderr:
# canine_logging.error(ret.stderr.decode())
# raise subprocess.CalledProcessError(ret.returncode, "")
# Try GCS API first (fastest)
try:
bucket = re.match(r"gs://(.*?)/.*", self.path)[1]
gcs_cl = gcloud_storage_client()
bucket_obj = google.cloud.storage.Bucket(gcs_cl, bucket, user_project = self.extra_args.get("project"))
bucket_obj.reload()
return bucket_obj.requester_pays
except Exception as e:
# Fall back to the gsutil approach when the GCS API call fails (e.g., 403 permissions)
# Note: log self.path rather than bucket, since bucket is unbound if the regex above did not match
canine_logging.info1(f"GCS API failed for {self.path}, falling back to gsutil: {e}")

# Extract the bucket and path the same way the base class does
if self.path.startswith('gs://'):
path = self.path[5:]
else:
path = self.path
bucket = path.split('/')[0]

# Try gsutil requesterpays get command
command = 'gsutil requesterpays get gs://{}'.format(bucket)
ret = subprocess.run(command, shell = True, capture_output = True)
text = ret.stderr

if ret.returncode == 0 or b'BucketNotFoundException: 404' not in text:
# Check both stderr (for error messages) and stdout (for success messages)
return (
b'requester pays bucket but no user project provided' in text
or 'gs://{}: Enabled'.format(bucket).encode() in ret.stdout
)
else:
# Try again ls-ing the object itself
# sometimes permissions can disallow bucket inspection
# but allow object inspection
command = 'gsutil ls gs://{}'.format(path)
ret = subprocess.run(command, shell = True, capture_output = True)
text = ret.stderr

if ret.returncode == 1 and b'BucketNotFoundException: 404' in text:
canine_logging.error(text.decode())
raise subprocess.CalledProcessError(ret.returncode, command)

# Check if this indicates requester pays
return b'requester pays bucket but no user project provided' in text

def __init__(self, path, **kwargs):
super().__init__(path, **kwargs)
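
Side note on the two bucket-extraction strategies used in get_requester_pays above: the regex in the API branch requires an object component after the bucket name, while the split-based fallback also accepts a bare bucket path. A minimal standalone sketch (the example paths are hypothetical):

import re

def extract_bucket(path: str) -> str:
    # API-branch strategy: regex that expects "gs://bucket/object..."
    m = re.match(r"gs://(.*?)/.*", path)
    if m is not None:
        return m[1]
    # gsutil-fallback strategy: strip the scheme and take the first path component
    stripped = path[5:] if path.startswith("gs://") else path
    return stripped.split("/")[0]

print(extract_bucket("gs://my-bucket/dir/file.txt"))  # my-bucket
print(extract_bucket("gs://my-bucket"))               # my-bucket (the regex alone returns None here)
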
@@ -534,16 +557,74 @@ def __init__(self, path, **kwargs):

try:
metadata = resp.json()
self.path = metadata["fileName"]
self._size = metadata["size"]

# Extract the actual filename from the metadata
if "fileName" in metadata and metadata["fileName"]:
provided_filename = metadata["fileName"]
canine_logging.info1(f"DRShub-provided fileName: {provided_filename}")

# Check if the fileName is just the UUID (common issue with DRShub)
# Extract UUID from the DRS URI for comparison
uri_uuid = self.uri.split(':')[-1] if ':' in self.uri else None

if provided_filename == uri_uuid:
# DRShub gave us the UUID as filename, try to extract real filename from accessUrl
canine_logging.warning(f"DRShub returned UUID as fileName for {self.uri}, attempting to extract real filename from accessUrl")

# Make another call to get the accessUrl
access_data = {"url": self.uri, "fields": ["accessUrl"]}
access_resp = drshub_session.post(type(self).drs_resolver,
headers={"Content-type": "application/json"}, json=access_data)

try:
access_metadata = access_resp.json()
if "accessUrl" in access_metadata and ("url" in access_metadata["accessUrl"]):
signed_url = access_metadata["accessUrl"]["url"]

# Extract filename from the signed URL path
# URLs typically look like: https://domain.com/bucket/uuid/actual_filename.ext?params
parsed_url = urllib.parse.urlparse(signed_url)
url_path = parsed_url.path

# Split path and get the last part (should be the real filename)
path_parts = url_path.strip('/').split('/')
if len(path_parts) >= 2:
real_filename = path_parts[-1] # Last part should be the real filename
if real_filename and (real_filename != uri_uuid):
canine_logging.info1(f"Extracted real filename from accessUrl: {real_filename}")
self.path = real_filename
else:
canine_logging.warning(f"Could not extract valid filename from accessUrl path: {url_path}")
self.path = provided_filename # Fall back to UUID
else:
canine_logging.warning(f"Unexpected accessUrl path format: {url_path}")
self.path = provided_filename # Fall back to UUID
else:
canine_logging.warning(f"No accessUrl in DRShub response, using provided fileName: {provided_filename}")
self.path = provided_filename
except Exception as e:
canine_logging.warning(f"Error extracting filename from accessUrl: {e}, using provided fileName: {provided_filename}")
self.path = provided_filename
else:
# DRShub gave us a proper filename
self.path = provided_filename
else:
# This should not happen - if DRShub doesn't provide fileName, something is wrong
canine_logging.error(f"DRShub did not provide fileName for {self.uri}")
canine_logging.error(f"Available fields: {list(metadata.keys())}")
raise ValueError(f"DRShub response missing fileName for {self.uri}")

self._size = metadata.get("size")
self._hash = metadata.get("hashes", {}).get("md5")
except:

except Exception as e:
try:
msg = json.dumps(resp.json())
except:
msg = resp.text
canine_logging.error("Error resolving DRS URI; see details:")
canine_logging.error(f"Response code: {resp.status_code}")
canine_logging.error(f"Error: {e}")
canine_logging.error(msg)
raise
self.localized_path = self.path
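
The accessUrl handling above amounts to taking the last path component of the signed URL, ignoring the query string. A self-contained sketch with a made-up URL of the shape the code expects:

import urllib.parse

# Hypothetical signed URL of the form https://<host>/<bucket>/<uuid>/<real filename>?<signature params>
signed_url = "https://storage.example.com/my-bucket/1234-abcd/sample.cram?X-Goog-Signature=abc"

parsed = urllib.parse.urlparse(signed_url)      # .path excludes the query string
path_parts = parsed.path.strip("/").split("/")  # ["my-bucket", "1234-abcd", "sample.cram"]
real_filename = path_parts[-1] if len(path_parts) >= 2 else None
print(real_filename)  # sample.cram
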
@@ -599,6 +680,45 @@ def localization_command(self, dest):
return "\n".join(cmd)


class HandleGCSSignedURL(FileType):
localization_mode = "url"

def __init__(self, path, **kwargs):
super().__init__(path, **kwargs)

self.url = self.path
# Extract the object path from signed GCS URLs to preserve the original filename
# Signed URLs look like: https://storage.googleapis.com/bucket/path/to/file.ext?GoogleAccessId=...
# or: https://storage.cloud.google.com/bucket/path/to/file.ext?GoogleAccessId=...
url_parse = re.match(r"https://storage\.(?:googleapis|cloud\.google)\.com/[^/]+/(.+?)(?:\?|$)", self.url)
if url_parse is None:
raise ValueError(f"Signed GCS URL format not recognized: {self.url}")

# Keep the full object path to preserve directory structure and original filename
object_path = url_parse[1]
self.path = os.path.basename(object_path) # Use just the filename for localization
self.localized_path = self.path

# get file size from the server headers
try:
# quote the URL: signed URLs contain '&' in the query string, which the shell would otherwise interpret
resp_size = subprocess.run("curl -sIL {url} | grep -i Content-Length".format(url=shlex.quote(self.url)), shell=True, capture_output=True)
self._size = int(re.match(r"[cC]ontent.[lL]ength.*?(\d+)", resp_size.stdout.decode())[1])
except:
raise ValueError("Could not determine file size from headers")

def _get_hash(self):
return sha1_base32(bytearray(self.url, "utf-8"), 4)

def localization_command(self, dest):
dest_dir = shlex.quote(os.path.dirname(dest))
dest_file = shlex.quote(os.path.basename(dest))
self.localized_path = os.path.join(dest_dir, dest_file)
cmd = []
cmd += ["[ ! -d {dest_dir} ] && mkdir -p {dest_dir} || :; curl -C - -o {path} '{url}'".format(dest_dir = dest_dir, path = self.localized_path, url = self.url)]

# md5 checking currently not supported
return "\n".join(cmd)

class HandleOtherURL(FileType):
localization_mode = "url"

@@ -745,6 +865,7 @@ def get_file_handler(path, url_map = None, **kwargs):
r"^drs://" : HandleDRSURI,
r"^https://api.gdc.cancer.gov" : HandleGDCHTTPURL,
r"^https://api.awg.gdc.cancer.gov" : HandleGDCHTTPURL,
r"^https://storage\.(?:googleapis|cloud\.google)\.com/" : HandleGCSSignedURL,
r"^rodisk://" : HandleRODISKURL,
r"^(?:ftp|https|http)://" : HandleOtherURL
} if url_map is None else url_map
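
Note that the generic ^(?:ftp|https|http):// entry also matches a signed GCS URL, so the new pattern presumably has to be consulted before it for HandleGCSSignedURL to win. A minimal illustration (handler names stand in for the classes; the URL is made up):

import re

url_map = {
    r"^https://storage\.(?:googleapis|cloud\.google)\.com/" : "HandleGCSSignedURL",
    r"^(?:ftp|https|http)://" : "HandleOtherURL",
}

url = "https://storage.cloud.google.com/my-bucket/reads.bam?GoogleAccessId=svc"
matches = [name for pattern, name in url_map.items() if re.match(pattern, url)]
print(matches)  # ['HandleGCSSignedURL', 'HandleOtherURL'] -- the first match should take precedence
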
51 changes: 36 additions & 15 deletions canine/orchestrator.py
@@ -86,7 +86,7 @@
break
else
echo "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" >&2
echo -e "!!!! JOB FAILED! (EXIT CODE !!!!\e[29G$CANINE_JOB_RC)" >&2
printf "!!!! JOB FAILED! (EXIT CODE %-16s!!!!\n" "$CANINE_JOB_RC)" >&2
echo "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!" >&2
echo $(($([ -f $CANINE_JOB_ROOT/.job_failure_count ] && cat $CANINE_JOB_ROOT/.job_failure_count || echo -n 0)+1)) > $CANINE_JOB_ROOT/.job_failure_count
echo '++++ STARTING JOB CLEANUP ++++' >&2
@@ -211,25 +211,46 @@ def load_acct_from_disk(job_spec, localizer, batch_id):

jobs_dir = localizer.environment("local")["CANINE_JOBS"]
acct = {}
placeholder_fields = { "State" : np.nan, "ExitCode": "-", "CPUTimeRAW" : -1, "Submit": np.datetime64('nat'), "n_preempted" : -1 }
placeholder_fields = { "State" : np.nan, "ExitCode": "-", "CPUTimeRAW" : -1, "Submit": np.datetime64('nat'), "NodeList" : "-", "Partition" : "-","ReqCPUS" : -1, "NCPUS" : -1, "ReqMem" : "-", "n_preempted" : -1}

with localizer.transport_context() as tr:
for j, v in job_spec.items():
sacct_path = os.path.join(jobs_dir, j, ".sacct")
jid = str(batch_id) + "_" + j
if tr.exists(sacct_path):
with tr.open(sacct_path, "r") as f:
acct[jid] = pd.read_csv(
f,
header = None,
sep = "\t",
names = [
"State", "ExitCode", "CPUTimeRAW", "Submit", "n_preempted"
]
).astype({
'CPUTimeRAW': int,
"Submit" : np.datetime64
})
try:
acct[jid] = pd.read_csv(
f,
header = None,
sep = "\t",
names = [
"State", "ExitCode", "CPUTimeRAW", "Submit","NodeList","Partition","ReqCPUS","NCPUS","ReqMem", "n_preempted"
]
).astype({
'CPUTimeRAW': int,
"Submit" : np.datetime64,
"ReqCPUS" : int,
"NCPUS" : int,
})
except:
# legacy .sacct files lack the node/partition/CPU columns; re-read with the
# original five-column layout and fill placeholders for the new fields
a = pd.read_csv(
f,
header = None,
sep = "\t",
names = [
"State", "ExitCode", "CPUTimeRAW", "Submit", "n_preempted"
]
).astype({
'CPUTimeRAW': int,
"Submit" : np.datetime64,
})
a["NodeList"] = "-"
a["Partition"] = "-"
a["ReqCPUS"] = -1
a["NCPUS"] = -1
a["ReqMem"] = "-"
acct[jid] = a[["State", "ExitCode", "CPUTimeRAW", "Submit","NodeList","Partition","ReqCPUS","NCPUS","ReqMem", "n_preempted"]]
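
The try/except above distinguishes the new ten-column .sacct layout from the legacy five-column one: a legacy file read with the extended column names leaves the trailing columns as NaN, so the integer casts fail and parsing falls into the except branch. A self-contained sketch (the sample rows are fabricated for illustration):

import io
import pandas as pd

new_cols = ["State", "ExitCode", "CPUTimeRAW", "Submit", "NodeList",
            "Partition", "ReqCPUS", "NCPUS", "ReqMem", "n_preempted"]
old_cols = ["State", "ExitCode", "CPUTimeRAW", "Submit", "n_preempted"]

new_row = "COMPLETED\t0:0\t120\t2024-01-01T00:00:00\tnode-1\tmain\t4\t4\t16G\t0\n"
old_row = "COMPLETED\t0:0\t120\t2024-01-01T00:00:00\t0\n"

# New-format rows parse cleanly with the extended column list
df_new = pd.read_csv(io.StringIO(new_row), header=None, sep="\t", names=new_cols)

# Legacy rows read with the extended names would leave the extra columns as NaN,
# so casting ReqCPUS/NCPUS to int raises; the fallback re-reads with the five
# legacy names and fills placeholder values for the missing fields.
df_old = pd.read_csv(io.StringIO(old_row), header=None, sep="\t", names=old_cols)
for col, default in [("NodeList", "-"), ("Partition", "-"),
                     ("ReqCPUS", -1), ("NCPUS", -1), ("ReqMem", "-")]:
    df_old[col] = default
print(df_new.shape, df_old.shape)  # (1, 10) (1, 10)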

# sacct info is blank (write error?)
if acct[jid].empty:
@@ -534,8 +555,8 @@ def grouper(g):
acct = self.backend.sacct(
"D",
job = batch_id,
format = "JobId%50,State,ExitCode,CPUTimeRAW,PlannedCPURAW,Submit"
).astype({'CPUTimeRAW': int, "PlannedCPURAW" : float, "Submit" : np.datetime64})
format = "JobId%50,State,ExitCode,CPUTimeRAW,PlannedCPURAW,Submit,NodeList%50,Partition%50,ReqCPUS,NCPUS,ReqMem"
).astype({'CPUTimeRAW': int, "PlannedCPURAW" : float, "Submit" : np.datetime64, "ReqCPUS" : int, "NCPUS" : int})
# sometimes sacct can lag when the cluster is under load and return nothing; retry with exponential backoff
if len(acct) > 0:
break