diff --git a/preprocessing/nextclade/src/loculus_preprocessing/backend.py b/preprocessing/nextclade/src/loculus_preprocessing/backend.py index ae249787b..aafa1352e 100644 --- a/preprocessing/nextclade/src/loculus_preprocessing/backend.py +++ b/preprocessing/nextclade/src/loculus_preprocessing/backend.py @@ -16,6 +16,8 @@ from .config import Config from .datatypes import ( ProcessedEntry, + UnprocessedData, + UnprocessedEntry, ) @@ -66,7 +68,31 @@ def get_jwt(config: Config) -> str: raise Exception(error_msg) -def fetch_unprocessed_sequences(etag: str, config: Config) -> tuple[str, str]: +def parse_ndjson(ndjson_data: str) -> Sequence[UnprocessedEntry]: + entries = [] + for json_str in ndjson_data.split("\n"): + if len(json_str) == 0: + continue + # Loculus currently cannot handle non-breaking spaces. + json_str_processed = json_str.replace("\N{NO-BREAK SPACE}", " ") + json_object = json.loads(json_str_processed) + unprocessed_data = UnprocessedData( + submitter=json_object["submitter"], + metadata=json_object["data"]["metadata"], + unalignedNucleotideSequences=json_object["data"]["unalignedNucleotideSequences"], + ) + entry = UnprocessedEntry( + accessionVersion=f"{json_object['accession']}.{ + json_object['version']}", + data=unprocessed_data, + ) + entries.append(entry) + return entries + + +def fetch_unprocessed_sequences( + etag: str, config: Config +) -> tuple[str, Sequence[UnprocessedEntry] | None]: n = config.batch_size url = config.backend_host.rstrip("/") + "/extract-unprocessed-data" logging.debug(f"Fetching {n} unprocessed sequences from {url}") @@ -78,7 +104,7 @@ def fetch_unprocessed_sequences(etag: str, config: Config) -> tuple[str, str]: case HTTPStatus.NOT_MODIFIED: return etag, None case HTTPStatus.OK: - return response.headers["ETag"], response.text + return response.headers["ETag"], parse_ndjson(response.text) case HTTPStatus.UNPROCESSABLE_ENTITY: logging.debug(f"{response.text}.\nSleeping for a while.") time.sleep(60 * 1) diff --git a/preprocessing/nextclade/src/loculus_preprocessing/prepro.py b/preprocessing/nextclade/src/loculus_preprocessing/prepro.py index d49e15d47..c659b050d 100644 --- a/preprocessing/nextclade/src/loculus_preprocessing/prepro.py +++ b/preprocessing/nextclade/src/loculus_preprocessing/prepro.py @@ -52,28 +52,6 @@ # Functions related to reading and writing files -def parse_ndjson(ndjson_data: str) -> Sequence[UnprocessedEntry]: - entries = [] - for json_str in ndjson_data.split("\n"): - if len(json_str) == 0: - continue - # Loculus currently cannot handle non-breaking spaces. - json_str_processed = json_str.replace("\N{NO-BREAK SPACE}", " ") - json_object = json.loads(json_str_processed) - unprocessed_data = UnprocessedData( - submitter=json_object["submitter"], - metadata=json_object["data"]["metadata"], - unalignedNucleotideSequences=json_object["data"]["unalignedNucleotideSequences"], - ) - entry = UnprocessedEntry( - accessionVersion=f"{json_object['accession']}.{ - json_object['version']}", - data=unprocessed_data, - ) - entries.append(entry) - return entries - - def parse_nextclade_tsv( amino_acid_insertions: defaultdict[ AccessionVersion, defaultdict[GeneName, list[AminoAcidInsertion]]