Skip to content

Commit

Permalink
Add etag to dummy preprocessing
Browse files Browse the repository at this point in the history
  • Loading branch information
corneliusroemer committed Sep 13, 2024
1 parent 8f85b3c commit b422c16
Showing 1 changed file with 32 additions and 26 deletions.
58 changes: 32 additions & 26 deletions preprocessing/dummy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,28 @@ class Sequence:
)


def fetch_unprocessed_sequences(n: int) -> List[Sequence]:
def fetch_unprocessed_sequences(etag: str | None, n: int) -> tuple[str | None, List[Sequence]]:
url = backendHost + "/extract-unprocessed-data"
params = {"numberOfSequenceEntries": n, "pipelineVersion": pipeline_version}
headers = {"Authorization": "Bearer " + get_jwt()}
headers = {
"Authorization": "Bearer " + get_jwt(),
**({"If-None-Match": etag} if etag else {}),
}
response = requests.post(url, data=params, headers=headers)
if not response.ok:
if response.status_code == 422:
logging.debug("{}. Sleeping for a while.".format(response.text))
match response.status_code:
case 200:
return response.headers.get("ETag"), parse_ndjson(response.text)
case 304:
return etag, []
case 422:
logging.debug(f"{response.text}. Sleeping for a while.")
time.sleep(60 * 10)
return []
raise Exception(
"Fetching unprocessed data failed. Status code: {}".format(response.status_code),
response.text,
)
return parse_ndjson(response.text)
return None, []
case _:
raise Exception(
f"Fetching unprocessed data failed. Status code: {response.status_code}",
response.text,
)


def parse_ndjson(ndjson_data: str) -> List[Sequence]:
Expand Down Expand Up @@ -181,7 +188,7 @@ def submit_processed_sequences(processed: List[Sequence]):
response = requests.post(url, data=ndjson_string, headers=headers)
if not response.ok:
raise Exception(
"Submitting processed data failed. Status code: {}".format(response.status_code),
f"Submitting processed data failed. Status code: {response.status_code}",
response.text,
)

Expand All @@ -196,45 +203,44 @@ def get_jwt():
}
response = requests.post(url, data=data)
if not response.ok:
raise Exception(
"Fetching JWT failed. Status code: {}".format(response.status_code), response.text
)
raise Exception(f"Fetching JWT failed. Status code: {response.status_code}", response.text)
return response.json()["access_token"]


def main():
total_processed = 0
locally_processed = 0
etag = None
last_force_refresh = time.time()

if watch_mode:
logging.debug("Started in watch mode - waiting 10 seconds before fetching data.")
time.sleep(10)

if args.maxSequences and args.maxSequences < 100:
sequences_to_fetch = args.maxSequences
else:
sequences_to_fetch = 100
sequences_to_fetch = args.maxSequences if args.maxSequences and args.maxSequences < 100 else 100

while True:
unprocessed = fetch_unprocessed_sequences(sequences_to_fetch)
if last_force_refresh + 3600 < time.time():
etag = None
last_force_refresh = time.time()

etag, unprocessed = fetch_unprocessed_sequences(etag, sequences_to_fetch)
if len(unprocessed) == 0:
if watch_mode:
logging.debug(
"Processed {} sequences. Sleeping for 10 seconds.".format(locally_processed)
)
logging.debug(f"Processed {locally_processed} sequences. Sleeping for 10 seconds.")
time.sleep(2)
locally_processed = 0
continue
else:
break
break
etag = None
processed = process(unprocessed)
submit_processed_sequences(processed)
total_processed += len(processed)
locally_processed += len(processed)

if args.maxSequences and total_processed >= args.maxSequences:
break
logging.debug("Total processed sequences: {}".format(total_processed))
logging.debug(f"Total processed sequences: {total_processed}")


if __name__ == "__main__":
Expand Down

0 comments on commit b422c16

Please sign in to comment.