From 5bd006623b69849d5867b3b81dc6e43d0c1eaaf9 Mon Sep 17 00:00:00 2001 From: Cornelius Roemer Date: Mon, 9 Sep 2024 17:51:10 +0200 Subject: [PATCH 1/3] feat: add table_update_tracker tracking last change per table co-authored with @anna-parker and Claude Sonnet try now it might work? Remove views fixup Change to a conditional update for useNewerProcessingPipelineIfPossible, add last_updated_db as a view. (#2741) Co-authored-by: Anna (Anya) Parker <50943381+anna-parker@users.noreply.github.com> Still return current newest version Return version from correct place Don't issue delete statement unless there's something to delete fix ktlint Adjust useNewerProcessingPipelineIfPossible to only update table when there are changes Table update tracker anya (#2764) * Add option to get-released-data to check last update time. * make name better * Make or short-circuiting * Only work with table_update_tracker - return lastTime in LAST_MODIFIED header * Remove unused functions * add back missing imports - somehow format removed this??? * Update bash scripts * Attempt to fix bash * 2nd little fix * echo value of last_modified not name * Do not update SILO if loculus responds with 304. * Always echo $lastSnapshot unless new data existed in loculus AND this was successfully sent to LAPIS - then echo $last_modified * save last snapshot to file * Update with loculus data anyways if longer than 1hour since last update * fix * Improve silo import scripts etc * add back BACKEND_BASE_URL * Log headers * Regularize * Better logs * Try to fix error * return 0 and never null in header * Fix read non-existent file bug * Better wrapper logging * fix delete after update snapshot * Make grep case insensitive * add back removed exit 0 * Improvement to logging * Add test * Add full process to test * Update documentation --------- Co-authored-by: Cornelius Roemer CR edits to table-update-tracker (#2767) * Add option to get-released-data to check last update time. * make name better * Make or short-circuiting * Only work with table_update_tracker - return lastTime in LAST_MODIFIED header * Remove unused functions * add back missing imports - somehow format removed this??? * Update bash scripts * Attempt to fix bash * 2nd little fix * echo value of last_modified not name * Do not update SILO if loculus responds with 304. * Always echo $lastSnapshot unless new data existed in loculus AND this was successfully sent to LAPIS - then echo $last_modified * save last snapshot to file * Update with loculus data anyways if longer than 1hour since last update * fix * Improve silo import scripts etc * add back BACKEND_BASE_URL * Log headers * Regularize * Better logs * Try to fix error * return 0 and never null in header * Fix read non-existent file bug * Better wrapper logging * fix delete after update snapshot * Make grep case insensitive * add back removed exit 0 * Improvement to logging * Add test * Add full process to test * Update documentation * Use etag instead of modified-since for spec compliance * ktlin fix * Move logic into model * Fix etag formatting * Fix etag yet again --------- Co-authored-by: Anna (Anya) Parker <50943381+anna-parker@users.noreply.github.com> refactor(backend): streamline control flow refactor(backend): extract function Update backend/src/main/kotlin/org/loculus/backend/controller/SubmissionController.kt Co-authored-by: Fabian Engelniederhammer <92720311+fengelniederhammer@users.noreply.github.com> Update backend/src/main/kotlin/org/loculus/backend/controller/SubmissionController.kt Co-authored-by: Fabian Engelniederhammer <92720311+fengelniederhammer@users.noreply.github.com> Update backend/src/test/kotlin/org/loculus/backend/controller/submission/GetReleasedDataEndpointTest.kt Co-authored-by: Fabian Engelniederhammer <92720311+fengelniederhammer@users.noreply.github.com> Enable etag for /extract-unprocessed-data Use etag in prepro to skip preprocessing match case refactor Try fix fix logic fix! i'm stupid Force refresh every hour Add response code annotations Update preprocessing/nextclade/src/loculus_preprocessing/prepro.py Co-authored-by: Anna (Anya) Parker <50943381+anna-parker@users.noreply.github.com> Add etag to dummy preprocessing # Conflicts: # backend/src/main/kotlin/org/loculus/backend/controller/SubmissionController.kt # backend/src/main/kotlin/org/loculus/backend/controller/SubmissionControllerDescriptions.kt # backend/src/main/kotlin/org/loculus/backend/model/ReleasedDataModel.kt # backend/src/main/kotlin/org/loculus/backend/service/submission/SubmissionDatabaseService.kt # backend/src/main/kotlin/org/loculus/backend/service/submission/UpdateTrackerTable.kt # backend/src/main/resources/db/migration/V1.2__add_table_update_tracker.sql # backend/src/test/kotlin/org/loculus/backend/controller/submission/GetReleasedDataEndpointTest.kt # kubernetes/loculus/silo_import_job.sh --- .../controller/SubmissionController.kt | 6 ++ .../SubmissionControllerDescriptions.kt | 2 + .../backend/model/ReleasedDataModel.kt | 2 + kubernetes/loculus/silo_import_job.sh | 5 +- preprocessing/dummy/main.py | 58 ++++++++++--------- preprocessing/nextclade/environment.yml | 3 +- .../src/loculus_preprocessing/backend.py | 58 +++++++++++++++---- .../src/loculus_preprocessing/datatypes.py | 4 +- .../src/loculus_preprocessing/prepro.py | 44 ++++++-------- .../processing_functions.py | 4 +- 10 files changed, 114 insertions(+), 72 deletions(-) diff --git a/backend/src/main/kotlin/org/loculus/backend/controller/SubmissionController.kt b/backend/src/main/kotlin/org/loculus/backend/controller/SubmissionController.kt index 5b1c16cb3..7bf230cd0 100644 --- a/backend/src/main/kotlin/org/loculus/backend/controller/SubmissionController.kt +++ b/backend/src/main/kotlin/org/loculus/backend/controller/SubmissionController.kt @@ -134,6 +134,7 @@ open class SubmissionController( ), ], ) + @ApiResponse(responseCode = "304", description = "Not Modified") @ApiResponse(responseCode = "422", description = EXTRACT_UNPROCESSED_DATA_ERROR_RESPONSE) @PostMapping("/extract-unprocessed-data", produces = [MediaType.APPLICATION_NDJSON_VALUE]) fun extractUnprocessedData( @@ -143,6 +144,7 @@ open class SubmissionController( message = "You can extract at max $MAX_EXTRACTED_SEQUENCE_ENTRIES sequence entries at once.", ) numberOfSequenceEntries: Int, @RequestParam pipelineVersion: Long, + @RequestHeader(value = HttpHeaders.IF_NONE_MATCH, required = false) ifNoneMatch: String?, ): ResponseEntity { val currentProcessingPipelineVersion = submissionDatabaseService.getCurrentProcessingPipelineVersion() if (pipelineVersion < currentProcessingPipelineVersion) { @@ -152,8 +154,12 @@ open class SubmissionController( ) } + val lastDatabaseWriteETag = releasedDataModel.getLastDatabaseWriteETag() + if (ifNoneMatch == lastDatabaseWriteETag) return ResponseEntity.status(HttpStatus.NOT_MODIFIED).build() + val headers = HttpHeaders() headers.contentType = MediaType.parseMediaType(MediaType.APPLICATION_NDJSON_VALUE) + headers.eTag = lastDatabaseWriteETag val streamBody = streamTransactioned { submissionDatabaseService.streamUnprocessedSubmissions(numberOfSequenceEntries, organism, pipelineVersion) } diff --git a/backend/src/main/kotlin/org/loculus/backend/controller/SubmissionControllerDescriptions.kt b/backend/src/main/kotlin/org/loculus/backend/controller/SubmissionControllerDescriptions.kt index 0f0be7b72..f88076596 100644 --- a/backend/src/main/kotlin/org/loculus/backend/controller/SubmissionControllerDescriptions.kt +++ b/backend/src/main/kotlin/org/loculus/backend/controller/SubmissionControllerDescriptions.kt @@ -146,6 +146,8 @@ and roll back the whole transaction. const val GET_RELEASED_DATA_DESCRIPTION = """ Get released data as a stream of NDJSON. This returns all accession versions that have the status 'APPROVED_FOR_RELEASE'. +Optionally add HttpHeader If-Modified-Since in unix timestamp (in seconds), +to only retrieve all released data if the database has changed since If-Modified-Since. """ const val GET_RELEASED_DATA_RESPONSE_DESCRIPTION = """ diff --git a/backend/src/main/kotlin/org/loculus/backend/model/ReleasedDataModel.kt b/backend/src/main/kotlin/org/loculus/backend/model/ReleasedDataModel.kt index 59f05a39b..2b670cb29 100644 --- a/backend/src/main/kotlin/org/loculus/backend/model/ReleasedDataModel.kt +++ b/backend/src/main/kotlin/org/loculus/backend/model/ReleasedDataModel.kt @@ -1,5 +1,6 @@ package org.loculus.backend.model +import UpdateTrackerTable import com.fasterxml.jackson.databind.node.BooleanNode import com.fasterxml.jackson.databind.node.IntNode import com.fasterxml.jackson.databind.node.LongNode @@ -9,6 +10,7 @@ import kotlinx.datetime.Clock import kotlinx.datetime.TimeZone import kotlinx.datetime.toLocalDateTime import mu.KotlinLogging +import org.jetbrains.exposed.sql.selectAll import org.loculus.backend.api.DataUseTerms import org.loculus.backend.api.GeneticSequence import org.loculus.backend.api.Organism diff --git a/kubernetes/loculus/silo_import_job.sh b/kubernetes/loculus/silo_import_job.sh index 7f1a66590..c2cc4091c 100755 --- a/kubernetes/loculus/silo_import_job.sh +++ b/kubernetes/loculus/silo_import_job.sh @@ -85,7 +85,6 @@ download_data() { http_status_code=$(curl -o "$new_input_data_path" --fail-with-body "$released_data_endpoint" -H "If-None-Match: $last_etag" -D "$new_input_header_path" -w "%{http_code}") exit_code=$? set -e - echo "Release data request returned with http status code: $http_status_code" if [ "$http_status_code" -eq 304 ]; then echo "State in Loculus backend has not changed: HTTP 304 Not Modified." @@ -109,8 +108,8 @@ download_data() { expected_record_count=$(grep -i '^x-total-records:' "$new_input_header_path" | awk '{print $2}' | tr -d '[:space:]') echo "Response should contain a total of : $expected_record_count records" - # jq validates each individual json object, to catch truncated lines - true_record_count=$(zstd -d -c "$new_input_data_path" | jq -c . | wc -l | tr -d '[:space:]') + # jq validates each individual json object, to catch truncated lines + true_record_count=$(zstd -d -c "$new_input_data_path" | jq -c . | wc -l | tr -d '[:space:]') echo "Response contained a total of : $true_record_count records" if [ "$true_record_count" -ne "$expected_record_count" ]; then diff --git a/preprocessing/dummy/main.py b/preprocessing/dummy/main.py index 9958a2d76..886e1b59d 100644 --- a/preprocessing/dummy/main.py +++ b/preprocessing/dummy/main.py @@ -87,21 +87,28 @@ class Sequence: ) -def fetch_unprocessed_sequences(n: int) -> List[Sequence]: +def fetch_unprocessed_sequences(etag: str | None, n: int) -> tuple[str | None, List[Sequence]]: url = backendHost + "/extract-unprocessed-data" params = {"numberOfSequenceEntries": n, "pipelineVersion": pipeline_version} - headers = {"Authorization": "Bearer " + get_jwt()} + headers = { + "Authorization": "Bearer " + get_jwt(), + **({"If-None-Match": etag} if etag else {}), + } response = requests.post(url, data=params, headers=headers) - if not response.ok: - if response.status_code == 422: - logging.debug("{}. Sleeping for a while.".format(response.text)) + match response.status_code: + case 200: + return response.headers.get("ETag"), parse_ndjson(response.text) + case 304: + return etag, [] + case 422: + logging.debug(f"{response.text}. Sleeping for a while.") time.sleep(60 * 10) - return [] - raise Exception( - "Fetching unprocessed data failed. Status code: {}".format(response.status_code), - response.text, - ) - return parse_ndjson(response.text) + return None, [] + case _: + raise Exception( + f"Fetching unprocessed data failed. Status code: {response.status_code}", + response.text, + ) def parse_ndjson(ndjson_data: str) -> List[Sequence]: @@ -181,7 +188,7 @@ def submit_processed_sequences(processed: List[Sequence]): response = requests.post(url, data=ndjson_string, headers=headers) if not response.ok: raise Exception( - "Submitting processed data failed. Status code: {}".format(response.status_code), + f"Submitting processed data failed. Status code: {response.status_code}", response.text, ) @@ -196,37 +203,36 @@ def get_jwt(): } response = requests.post(url, data=data) if not response.ok: - raise Exception( - "Fetching JWT failed. Status code: {}".format(response.status_code), response.text - ) + raise Exception(f"Fetching JWT failed. Status code: {response.status_code}", response.text) return response.json()["access_token"] def main(): total_processed = 0 locally_processed = 0 + etag = None + last_force_refresh = time.time() if watch_mode: logging.debug("Started in watch mode - waiting 10 seconds before fetching data.") time.sleep(10) - if args.maxSequences and args.maxSequences < 100: - sequences_to_fetch = args.maxSequences - else: - sequences_to_fetch = 100 + sequences_to_fetch = args.maxSequences if args.maxSequences and args.maxSequences < 100 else 100 while True: - unprocessed = fetch_unprocessed_sequences(sequences_to_fetch) + if last_force_refresh + 3600 < time.time(): + etag = None + last_force_refresh = time.time() + + etag, unprocessed = fetch_unprocessed_sequences(etag, sequences_to_fetch) if len(unprocessed) == 0: if watch_mode: - logging.debug( - "Processed {} sequences. Sleeping for 10 seconds.".format(locally_processed) - ) + logging.debug(f"Processed {locally_processed} sequences. Sleeping for 10 seconds.") time.sleep(2) locally_processed = 0 continue - else: - break + break + etag = None processed = process(unprocessed) submit_processed_sequences(processed) total_processed += len(processed) @@ -234,7 +240,7 @@ def main(): if args.maxSequences and total_processed >= args.maxSequences: break - logging.debug("Total processed sequences: {}".format(total_processed)) + logging.debug(f"Total processed sequences: {total_processed}") if __name__ == "__main__": diff --git a/preprocessing/nextclade/environment.yml b/preprocessing/nextclade/environment.yml index 131ac1446..a1e9811b4 100644 --- a/preprocessing/nextclade/environment.yml +++ b/preprocessing/nextclade/environment.yml @@ -2,11 +2,12 @@ name: loculus-nextclade channels: - conda-forge - bioconda + - nodefaults dependencies: - python=3.12 - biopython=1.83 - dpath=2.1 - - nextclade=3.5 + - nextclade=3.8 - pip=24.0 - PyYAML=6.0 - pyjwt=2.8 diff --git a/preprocessing/nextclade/src/loculus_preprocessing/backend.py b/preprocessing/nextclade/src/loculus_preprocessing/backend.py index 8a6028b56..9dbdc4883 100644 --- a/preprocessing/nextclade/src/loculus_preprocessing/backend.py +++ b/preprocessing/nextclade/src/loculus_preprocessing/backend.py @@ -16,6 +16,8 @@ from .config import Config from .datatypes import ( ProcessedEntry, + UnprocessedData, + UnprocessedEntry, ) @@ -66,24 +68,56 @@ def get_jwt(config: Config) -> str: raise Exception(error_msg) -def fetch_unprocessed_sequences(n: int, config: Config) -> str: +def parse_ndjson(ndjson_data: str) -> Sequence[UnprocessedEntry]: + entries = [] + for json_str in ndjson_data.split("\n"): + if len(json_str) == 0: + continue + # Loculus currently cannot handle non-breaking spaces. + json_str_processed = json_str.replace("\N{NO-BREAK SPACE}", " ") + json_object = json.loads(json_str_processed) + unprocessed_data = UnprocessedData( + submitter=json_object["submitter"], + metadata=json_object["data"]["metadata"], + unalignedNucleotideSequences=json_object["data"]["unalignedNucleotideSequences"], + ) + entry = UnprocessedEntry( + accessionVersion=f"{json_object['accession']}.{ + json_object['version']}", + data=unprocessed_data, + ) + entries.append(entry) + return entries + + +def fetch_unprocessed_sequences( + etag: str | None, config: Config +) -> tuple[str | None, Sequence[UnprocessedEntry] | None]: + n = config.batch_size url = config.backend_host.rstrip("/") + "/extract-unprocessed-data" logging.debug(f"Fetching {n} unprocessed sequences from {url}") params = {"numberOfSequenceEntries": n, "pipelineVersion": config.pipeline_version} - headers = {"Authorization": "Bearer " + get_jwt(config)} + headers = { + "Authorization": "Bearer " + get_jwt(config), + **({"If-None-Match": etag} if etag else {}), + } + logging.debug(f"Requesting data with ETag: {etag}") response = requests.post(url, data=params, headers=headers, timeout=10) - if not response.ok: - if response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY: + match response.status_code: + case HTTPStatus.NOT_MODIFIED: + return etag, None + case HTTPStatus.OK: + return response.headers["ETag"], parse_ndjson(response.text) + case HTTPStatus.UNPROCESSABLE_ENTITY: logging.debug(f"{response.text}.\nSleeping for a while.") time.sleep(60 * 1) - return "" - msg = f"Fetching unprocessed data failed. Status code: { - response.status_code}" - raise Exception( - msg, - response.text, - ) - return response.text + return None, None + case _: + msg = f"Fetching unprocessed data failed. Status code: {response.status_code}" + raise Exception( + msg, + response.text, + ) def submit_processed_sequences( diff --git a/preprocessing/nextclade/src/loculus_preprocessing/datatypes.py b/preprocessing/nextclade/src/loculus_preprocessing/datatypes.py index 8acebb6a9..215028624 100644 --- a/preprocessing/nextclade/src/loculus_preprocessing/datatypes.py +++ b/preprocessing/nextclade/src/loculus_preprocessing/datatypes.py @@ -1,7 +1,7 @@ # ruff: noqa: N815 from dataclasses import dataclass, field from enum import StrEnum, unique -from typing import List, Tuple, Any +from typing import Any AccessionVersion = str GeneName = str @@ -37,7 +37,7 @@ def __hash__(self): @dataclass(frozen=True) class ProcessingAnnotation: - source: Tuple[AnnotationSource, ...] + source: tuple[AnnotationSource, ...] message: str def __post_init__(self): diff --git a/preprocessing/nextclade/src/loculus_preprocessing/prepro.py b/preprocessing/nextclade/src/loculus_preprocessing/prepro.py index 715105b88..2c6474e75 100644 --- a/preprocessing/nextclade/src/loculus_preprocessing/prepro.py +++ b/preprocessing/nextclade/src/loculus_preprocessing/prepro.py @@ -52,28 +52,6 @@ # Functions related to reading and writing files -def parse_ndjson(ndjson_data: str) -> Sequence[UnprocessedEntry]: - entries = [] - for json_str in ndjson_data.split("\n"): - if len(json_str) == 0: - continue - # Loculus currently cannot handle non-breaking spaces. - json_str_processed = json_str.replace("\N{NO-BREAK SPACE}", " ") - json_object = json.loads(json_str_processed) - unprocessed_data = UnprocessedData( - submitter=json_object["submitter"], - metadata=json_object["data"]["metadata"], - unalignedNucleotideSequences=json_object["data"]["unalignedNucleotideSequences"], - ) - entry = UnprocessedEntry( - accessionVersion=f"{json_object['accession']}.{ - json_object['version']}", - data=unprocessed_data, - ) - entries.append(entry) - return entries - - def parse_nextclade_tsv( amino_acid_insertions: defaultdict[ AccessionVersion, defaultdict[GeneName, list[AminoAcidInsertion]] @@ -725,17 +703,29 @@ def run(config: Config) -> None: if config.nextclade_dataset_name: download_nextclade_dataset(dataset_dir, config) total_processed = 0 + etag = None + last_force_refresh = time.time() while True: logging.debug("Fetching unprocessed sequences") - unprocessed = parse_ndjson(fetch_unprocessed_sequences(config.batch_size, config)) - if len(unprocessed) == 0: + # Reset etag every hour just in case + if last_force_refresh + 3600 < time.time(): + etag = None + last_force_refresh = time.time() + etag, unprocessed = fetch_unprocessed_sequences(etag, config) + if not unprocessed: # sleep 1 sec and try again logging.debug("No unprocessed sequences found. Sleeping for 1 second.") time.sleep(1) continue - # Process the sequences, get result as dictionary - processed = process_all(unprocessed, dataset_dir, config) - # Submit the result + # Don't use etag if we just got data, preprocessing only asks for 100 sequences to process at a time, so there might be more + etag = None + try: + processed = process_all(unprocessed, dataset_dir, config) + except Exception as e: + logging.exception( + f"Processing failed. Traceback : {e}. Unprocessed data: {unprocessed}" + ) + continue try: submit_processed_sequences(processed, dataset_dir, config) except RuntimeError as e: diff --git a/preprocessing/nextclade/src/loculus_preprocessing/processing_functions.py b/preprocessing/nextclade/src/loculus_preprocessing/processing_functions.py index aeed9d114..780764d66 100644 --- a/preprocessing/nextclade/src/loculus_preprocessing/processing_functions.py +++ b/preprocessing/nextclade/src/loculus_preprocessing/processing_functions.py @@ -423,7 +423,9 @@ def identity( warnings.append( ProcessingAnnotation( source=[ - AnnotationSource(name=output_field, type=AnnotationSourceType.METADATA) + AnnotationSource( + name=output_field, type=AnnotationSourceType.METADATA + ) ], message=f"Invalid boolean value: {input_datum}. Defaulting to null.", ) From 1c27fa8fcb5e9450cb919928f037a636ee9a8c96 Mon Sep 17 00:00:00 2001 From: Cornelius Roemer Date: Thu, 19 Sep 2024 12:53:28 +0200 Subject: [PATCH 2/3] Fix description --- .../backend/controller/SubmissionControllerDescriptions.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/src/main/kotlin/org/loculus/backend/controller/SubmissionControllerDescriptions.kt b/backend/src/main/kotlin/org/loculus/backend/controller/SubmissionControllerDescriptions.kt index f88076596..49ec4eb96 100644 --- a/backend/src/main/kotlin/org/loculus/backend/controller/SubmissionControllerDescriptions.kt +++ b/backend/src/main/kotlin/org/loculus/backend/controller/SubmissionControllerDescriptions.kt @@ -146,8 +146,8 @@ and roll back the whole transaction. const val GET_RELEASED_DATA_DESCRIPTION = """ Get released data as a stream of NDJSON. This returns all accession versions that have the status 'APPROVED_FOR_RELEASE'. -Optionally add HttpHeader If-Modified-Since in unix timestamp (in seconds), -to only retrieve all released data if the database has changed since If-Modified-Since. +Optionally submit the etag received in previous request with If-None-Match +to only retrieve all released data if the database has changed since last request. """ const val GET_RELEASED_DATA_RESPONSE_DESCRIPTION = """ From d968edc863112617f3800c2355d41f32486fe69d Mon Sep 17 00:00:00 2001 From: Cornelius Roemer Date: Thu, 19 Sep 2024 12:59:19 +0200 Subject: [PATCH 3/3] fix imports --- .../main/kotlin/org/loculus/backend/model/ReleasedDataModel.kt | 2 -- 1 file changed, 2 deletions(-) diff --git a/backend/src/main/kotlin/org/loculus/backend/model/ReleasedDataModel.kt b/backend/src/main/kotlin/org/loculus/backend/model/ReleasedDataModel.kt index 2b670cb29..59f05a39b 100644 --- a/backend/src/main/kotlin/org/loculus/backend/model/ReleasedDataModel.kt +++ b/backend/src/main/kotlin/org/loculus/backend/model/ReleasedDataModel.kt @@ -1,6 +1,5 @@ package org.loculus.backend.model -import UpdateTrackerTable import com.fasterxml.jackson.databind.node.BooleanNode import com.fasterxml.jackson.databind.node.IntNode import com.fasterxml.jackson.databind.node.LongNode @@ -10,7 +9,6 @@ import kotlinx.datetime.Clock import kotlinx.datetime.TimeZone import kotlinx.datetime.toLocalDateTime import mu.KotlinLogging -import org.jetbrains.exposed.sql.selectAll import org.loculus.backend.api.DataUseTerms import org.loculus.backend.api.GeneticSequence import org.loculus.backend.api.Organism