Skip to content

Commit

Permalink
feat(ci, ingest): Prevent duplicated ingest by preventing concurrent …
Browse files Browse the repository at this point in the history
…ingest + add regroup&revoke cronjob (#2473)

* Update ingest deployment config: remove job launched after new deployment

* Update ingest deployment config: have the ingest cronjob try to start every 2 minutes, add approve to the cronjob (thus triggering ingest after new syncs, i.e. preview deployments), and make argocd restart cronjobs on new sync. Enforce no concurrency at the cronjob level.

* Add an approval timeout (in minutes) to the approve snakemake rule; this ensures that the cronjob will stop before being removed by kubernetes after the activeDeadlineSeconds.

* Add a revoke_and_regroup cronjob that never runs on its schedule but can be launched manually if re-ingest finds metadata updates that change the grouping of segments for segmented viruses.

---------

Co-authored-by: Cornelius Roemer <cornelius.roemer@gmail.com>
  • Loading branch information
anna-parker and corneliusroemer authored Aug 22, 2024
1 parent 8a76ef0 commit e1cabe0
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 65 deletions.
16 changes: 13 additions & 3 deletions ingest/Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ COLUMN_MAPPING = config["column_mapping"]
LOG_LEVEL = config.get("log_level", "INFO")
NCBI_API_KEY = os.getenv("NCBI_API_KEY")
FILTER_FASTA_HEADERS = config.get("filter_fasta_headers", None)
APPROVE_TIMEOUT_MIN = config.get("approve_timeout_min") # time in minutes


def rename_columns(input_file, output_file, mapping=COLUMN_MAPPING):
Expand Down Expand Up @@ -58,7 +59,7 @@ rule fetch_ncbi_dataset_package:
dataset_package="results/ncbi_dataset.zip",
params:
taxon_id=TAXON_ID,
api_key=NCBI_API_KEY
api_key=NCBI_API_KEY,
shell:
"""
datasets download virus genome taxon {params.taxon_id} \
Expand Down Expand Up @@ -113,7 +114,7 @@ rule extract_ncbi_dataset_sequences:
> {output.ncbi_dataset_sequences}
"""


rule calculate_sequence_hashes:
"""Output JSON: {insdc_accession: md5_sequence_hash, ...}"""
input:
Expand All @@ -130,7 +131,9 @@ rule calculate_sequence_hashes:
--output-sequences {output.sequence_json}
"""


if FILTER_FASTA_HEADERS:

rule filter_fasta_headers:
input:
sequences="results/sequences.fasta",
Expand All @@ -153,7 +156,11 @@ if FILTER_FASTA_HEADERS:

rule align:
input:
sequences="results/sequences_filtered.fasta" if FILTER_FASTA_HEADERS else "results/sequences.fasta",
sequences=(
"results/sequences_filtered.fasta"
if FILTER_FASTA_HEADERS
else "results/sequences.fasta"
),
output:
results="results/nextclade_{segment}.tsv",
params:
Expand Down Expand Up @@ -411,6 +418,7 @@ rule revise:
fi
"""


rule regroup_and_revoke:
input:
script="scripts/call_loculus.py",
Expand Down Expand Up @@ -444,10 +452,12 @@ rule approve:
approved=touch("results/approved"),
params:
log_level=LOG_LEVEL,
approve_timeout_min=APPROVE_TIMEOUT_MIN,
shell:
"""
python {input.script} \
--mode approve \
--config-file {input.config} \
--log-level {params.log_level} \
--approve-timeout {params.approve_timeout_min}
"""
1 change: 1 addition & 0 deletions ingest/config/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,4 @@ username: insdc_ingest_user
password: insdc_ingest_user
keycloak_client_id: backend-client
subsample_fraction: 1.0
approve_timeout_min: "25" # Time in minutes; make approve stop on its own before the job is forcibly terminated by kubernetes (activeDeadlineSeconds of the ingest cronjob)
18 changes: 17 additions & 1 deletion ingest/scripts/call_loculus.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import os
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from http import HTTPMethod
from pathlib import Path
from time import sleep
from typing import Any, Literal

import click
import jsonlines
import pytz
import requests
import yaml

Expand All @@ -21,6 +23,8 @@
datefmt="%H:%M:%S",
)

_start_time: datetime | None = None


@dataclass
class Config:
Expand Down Expand Up @@ -413,10 +417,18 @@ def get_submitted(config: Config):
required=False,
type=click.Path(exists=True),
)
def submit_to_loculus(metadata, sequences, mode, log_level, config_file, output, revoke_map):
@click.option(
"--approve-timeout",
required=False,
type=int,
)
def submit_to_loculus(
metadata, sequences, mode, log_level, config_file, output, revoke_map, approve_timeout
):
"""
Submit data to Loculus.
"""
global _start_time
logger.setLevel(log_level)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
Expand Down Expand Up @@ -449,10 +461,14 @@ def record_factory(*args, **kwargs):

if mode == "approve":
while True:
if not _start_time:
_start_time = datetime.now(tz=pytz.utc)
logger.info("Approving sequences")
response = approve(config)
logger.info(f"Approved: {len(response)} sequences")
sleep(30)
if datetime.now(tz=pytz.utc) - timedelta(minutes=approve_timeout) > _start_time:
break

if mode == "regroup-and-revoke":
try:
Expand Down
5 changes: 3 additions & 2 deletions ingest/scripts/prepare_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

@dataclass
class Config:
organism: str
segmented: str
nucleotide_sequences: list[str]
slack_hook: str
Expand Down Expand Up @@ -52,8 +53,8 @@ def revocation_notification(config: Config, to_revoke: dict[str, dict[str, str]]
text = (
f"{config.backend_url}: Ingest pipeline wants to add the following sequences"
f" which will lead to revocations: {to_revoke}. "
"If you agree with this run the regroup_and_revoke rule in the ingest pod:"
" `kubectl exec -it INGEST_POD_NAME -- snakemake regroup_and_revoke`."
"If you agree with this manually run the regroup_and_revoke cronjob:"
f" `kubectl create job --from=cronjob/loculus-revoke-and-regroup-cronjob-{config.organism} <manual-job-name>`."
)
notify(config, text)

Expand Down
112 changes: 53 additions & 59 deletions kubernetes/loculus/templates/ingest-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,74 +3,66 @@
{{- range $key, $value := (.Values.organisms | default .Values.defaultOrganisms) }}
{{- if $value.ingest }}
---
apiVersion: apps/v1
kind: Deployment
apiVersion: batch/v1
kind: CronJob
metadata:
name: loculus-ingest-{{ $key }}
annotations:
argocd.argoproj.io/sync-options: Replace=true
reloader.stakater.com/auto: "true"
name: loculus-ingest-cronjob-{{ $key }}
spec:
replicas: 1
selector:
matchLabels:
app: loculus
component: loculus-ingest-{{ $key }}
template:
metadata:
labels:
app: loculus
component: loculus-ingest-{{ $key }}
schedule: "*/2 * * * *" # ingest every 2 minutes but forbid concurrency, have jobs run only for Values.ingestLimitSeconds
startingDeadlineSeconds: 60
concurrencyPolicy: Forbid
jobTemplate:
spec:
containers:
- name: ingest-{{ $key }}
image: {{ $value.ingest.image}}:{{ $dockerTag }}
imagePullPolicy: Always
resources:
requests:
memory: "80Mi"
cpu: "10m"
limits:
memory: "10Gi"
env:
- name: KEYCLOAK_INGEST_PASSWORD
valueFrom:
secretKeyRef:
name: service-accounts
key: insdcIngestUserPassword
- name: NCBI_API_KEY
valueFrom:
secretKeyRef:
name: ingest-ncbi
key: api-key
- name: SLACK_HOOK
valueFrom:
secretKeyRef:
name: slack-notifications
key: slack-hook
args:
- snakemake
- results/approved
- results/submitted # Remove in production, see #1777
- results/revised # Remove in production, see #1777
- --all-temp # Reduce disk usage by not keeping files around
{{- if $value.ingest.configFile }}
volumeMounts:
activeDeadlineSeconds: {{ $.Values.ingestLimitSeconds }}
template:
metadata:
labels:
app: loculus
component: loculus-ingest-cronjob-{{ $key }}
annotations:
argocd.argoproj.io/sync-options: Force=true,Replace=true
spec:
restartPolicy: Never
containers:
- name: ingest-{{ $key }}
image: {{ $value.ingest.image}}:{{ $dockerTag }}
imagePullPolicy: Always
resources:
requests:
memory: "1Gi"
cpu: "200m"
limits:
cpu: "200m"
memory: "10Gi"
env:
- name: KEYCLOAK_INGEST_PASSWORD
valueFrom:
secretKeyRef:
name: service-accounts
key: insdcIngestUserPassword
args:
- snakemake
- results/submitted
- results/revised
- results/approved
- --all-temp # Reduce disk usage by not keeping files around
{{- if $value.ingest.configFile }}
volumeMounts:
- name: loculus-ingest-config-volume-{{ $key }}
mountPath: /package/config/config.yaml
subPath: config.yaml
volumes:
- name: loculus-ingest-config-volume-{{ $key }}
mountPath: /package/config/config.yaml
subPath: config.yaml
volumes:
- name: loculus-ingest-config-volume-{{ $key }}
configMap:
name: loculus-ingest-config-{{ $key }}
{{- end }}
configMap:
name: loculus-ingest-config-{{ $key }}
{{- end }}
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: loculus-ingest-cronjob-{{ $key }}
name: loculus-revoke-and-regroup-cronjob-{{ $key }}
spec:
schedule: "*/30 * * * *" # ingest every 30 minutes (not more often to be kind to NCBI)
schedule: "0 0 31 2 *" # Never runs without manual trigger
startingDeadlineSeconds: 60
concurrencyPolicy: Forbid
jobTemplate:
Expand Down Expand Up @@ -107,6 +99,8 @@ spec:
- snakemake
- results/submitted
- results/revised
- results/revoked
- results/approved
- --all-temp # Reduce disk usage by not keeping files around
{{- if $value.ingest.configFile }}
volumeMounts:
Expand Down

0 comments on commit e1cabe0

Please sign in to comment.