Skip to content

Commit

Permalink
Use etag in prepro to skip preprocessing
Browse files Browse the repository at this point in the history
  • Loading branch information
corneliusroemer committed Sep 11, 2024
1 parent 40ded2b commit cda70e5
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 19 deletions.
3 changes: 2 additions & 1 deletion preprocessing/nextclade/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ name: loculus-nextclade
channels:
- conda-forge
- bioconda
- nodefaults
dependencies:
- python=3.12
- biopython=1.83
- dpath=2.1
- nextclade=3.5
- nextclade=3.8
- pip=24.0
- PyYAML=6.0
- pyjwt=2.8
Expand Down
33 changes: 19 additions & 14 deletions preprocessing/nextclade/src/loculus_preprocessing/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,24 +66,29 @@ def get_jwt(config: Config) -> str:
raise Exception(error_msg)


def fetch_unprocessed_sequences(n: int, config: Config) -> str:
def fetch_unprocessed_sequences(etag: str, config: Config) -> tuple[str, str]:
n = config.batch_size
url = config.backend_host.rstrip("/") + "/extract-unprocessed-data"
logging.debug(f"Fetching {n} unprocessed sequences from {url}")
params = {"numberOfSequenceEntries": n, "pipelineVersion": config.pipeline_version}
headers = {"Authorization": "Bearer " + get_jwt(config)}
headers = {"Authorization": "Bearer " + get_jwt(config), "If-None-Match": etag}
response = requests.post(url, data=params, headers=headers, timeout=10)
if not response.ok:
if response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY:
logging.debug(f"{response.text}.\nSleeping for a while.")
time.sleep(60 * 1)
return ""
msg = f"Fetching unprocessed data failed. Status code: {
response.status_code}"
raise Exception(
msg,
response.text,
)
return response.text
match response.status_code:
case HTTPStatus.NOT_MODIFIED:
return etag, ""
case HTTPStatus.OK:
return response.headers["ETag"], response.text
case _:
if response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY:
logging.debug(f"{response.text}.\nSleeping for a while.")
time.sleep(60 * 1)
return ""
msg = f"Fetching unprocessed data failed. Status code: {
response.status_code}"
raise Exception(
msg,
response.text,
)


def submit_processed_sequences(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# ruff: noqa: N815
from dataclasses import dataclass, field
from enum import StrEnum, unique
from typing import List, Tuple, Any
from typing import Any

AccessionVersion = str
GeneName = str
Expand Down Expand Up @@ -37,7 +37,7 @@ def __hash__(self):

@dataclass(frozen=True)
class ProcessingAnnotation:
source: Tuple[AnnotationSource, ...]
source: tuple[AnnotationSource, ...]
message: str

def __post_init__(self):
Expand Down
3 changes: 2 additions & 1 deletion preprocessing/nextclade/src/loculus_preprocessing/prepro.py
Original file line number Diff line number Diff line change
Expand Up @@ -725,9 +725,10 @@ def run(config: Config) -> None:
if config.nextclade_dataset_name:
download_nextclade_dataset(dataset_dir, config)
total_processed = 0
etag = ""
while True:
logging.debug("Fetching unprocessed sequences")
unprocessed = parse_ndjson(fetch_unprocessed_sequences(config.batch_size, config))
etag, unprocessed = fetch_unprocessed_sequences(etag, config)
if len(unprocessed) == 0:
# sleep 1 sec and try again
logging.debug("No unprocessed sequences found. Sleeping for 1 second.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,9 @@ def identity(
warnings.append(
ProcessingAnnotation(
source=[
AnnotationSource(name=output_field, type=AnnotationSourceType.METADATA)
AnnotationSource(
name=output_field, type=AnnotationSourceType.METADATA
)
],
message=f"Invalid boolean value: {input_datum}. Defaulting to null.",
)
Expand Down

0 comments on commit cda70e5

Please sign in to comment.