Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update config, remove deployment, add revoke cronjob #2473

Merged
merged 19 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions ingest/Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ COLUMN_MAPPING = config["column_mapping"]
LOG_LEVEL = config.get("log_level", "INFO")
NCBI_API_KEY = os.getenv("NCBI_API_KEY")
FILTER_FASTA_HEADERS = config.get("filter_fasta_headers", None)
APPROVE_TIMEOUT_MIN = config.get("approve_timeout_min") # time in minutes


def rename_columns(input_file, output_file, mapping=COLUMN_MAPPING):
Expand Down Expand Up @@ -58,7 +59,7 @@ rule fetch_ncbi_dataset_package:
dataset_package="results/ncbi_dataset.zip",
params:
taxon_id=TAXON_ID,
api_key=NCBI_API_KEY
api_key=NCBI_API_KEY,
shell:
"""
datasets download virus genome taxon {params.taxon_id} \
Expand Down Expand Up @@ -113,7 +114,7 @@ rule extract_ncbi_dataset_sequences:
> {output.ncbi_dataset_sequences}
"""


rule calculate_sequence_hashes:
"""Output JSON: {insdc_accession: md5_sequence_hash, ...}"""
input:
Expand All @@ -130,7 +131,9 @@ rule calculate_sequence_hashes:
--output-sequences {output.sequence_json}
"""


if FILTER_FASTA_HEADERS:

rule filter_fasta_headers:
input:
sequences="results/sequences.fasta",
Expand All @@ -153,7 +156,11 @@ if FILTER_FASTA_HEADERS:

rule align:
input:
sequences="results/sequences_filtered.fasta" if FILTER_FASTA_HEADERS else "results/sequences.fasta",
sequences=(
"results/sequences_filtered.fasta"
if FILTER_FASTA_HEADERS
else "results/sequences.fasta"
),
output:
results="results/nextclade_{segment}.tsv",
params:
Expand Down Expand Up @@ -411,6 +418,7 @@ rule revise:
fi
"""


rule regroup_and_revoke:
input:
script="scripts/call_loculus.py",
Expand Down Expand Up @@ -444,10 +452,12 @@ rule approve:
approved=touch("results/approved"),
params:
log_level=LOG_LEVEL,
approve_timeout_min=APPROVE_TIMEOUT_MIN,
shell:
"""
python {input.script} \
--mode approve \
--config-file {input.config} \
--log-level {params.log_level} \
--approve-timeout {params.approve_timeout_min}
"""
1 change: 1 addition & 0 deletions ingest/config/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,4 @@ username: insdc_ingest_user
password: insdc_ingest_user
keycloak_client_id: backend-client
subsample_fraction: 1.0
approve_timeout_min: "25" # Cronjobs run every 30min, make approve stop before it is forced to stop by argocd
18 changes: 17 additions & 1 deletion ingest/scripts/call_loculus.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import os
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from http import HTTPMethod
from pathlib import Path
from time import sleep
from typing import Any, Literal

import click
import jsonlines
import pytz
import requests
import yaml

Expand All @@ -21,6 +23,8 @@
datefmt="%H:%M:%S",
)

_start_time: datetime | None = None


@dataclass
class Config:
Expand Down Expand Up @@ -413,10 +417,18 @@ def get_submitted(config: Config):
required=False,
type=click.Path(exists=True),
)
def submit_to_loculus(metadata, sequences, mode, log_level, config_file, output, revoke_map):
@click.option(
"--approve-timeout",
required=False,
type=int,
)
def submit_to_loculus(
metadata, sequences, mode, log_level, config_file, output, revoke_map, approve_timeout
):
"""
Submit data to Loculus.
"""
global _start_time
logger.setLevel(log_level)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
Expand Down Expand Up @@ -449,10 +461,14 @@ def record_factory(*args, **kwargs):

if mode == "approve":
while True:
if not _start_time:
_start_time = datetime.now(tz=pytz.utc)
logger.info("Approving sequences")
response = approve(config)
logger.info(f"Approved: {len(response)} sequences")
sleep(30)
if datetime.now(tz=pytz.utc) - timedelta(minutes=approve_timeout) > _start_time:
break

if mode == "regroup-and-revoke":
try:
Expand Down
5 changes: 3 additions & 2 deletions ingest/scripts/prepare_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

@dataclass
class Config:
organism: str
segmented: str
nucleotide_sequences: list[str]
slack_hook: str
Expand Down Expand Up @@ -52,8 +53,8 @@ def revocation_notification(config: Config, to_revoke: dict[str, dict[str, str]]
text = (
f"{config.backend_url}: Ingest pipeline wants to add the following sequences"
f" which will lead to revocations: {to_revoke}. "
"If you agree with this run the regroup_and_revoke rule in the ingest pod:"
" `kubectl exec -it INGEST_POD_NAME -- snakemake regroup_and_revoke`."
"If you agree with this manually run the regroup_and_revoke cronjob:"
f" `kubectl create job --from=cronjob/loculus-revoke-and-regroup-cronjob-{config.organism} <manual-job-name>`."
)
notify(config, text)

Expand Down
112 changes: 53 additions & 59 deletions kubernetes/loculus/templates/ingest-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,74 +3,66 @@
{{- range $key, $value := (.Values.organisms | default .Values.defaultOrganisms) }}
{{- if $value.ingest }}
---
apiVersion: apps/v1
kind: Deployment
apiVersion: batch/v1
kind: CronJob
metadata:
name: loculus-ingest-{{ $key }}
annotations:
argocd.argoproj.io/sync-options: Replace=true
reloader.stakater.com/auto: "true"
name: loculus-ingest-cronjob-{{ $key }}
spec:
replicas: 1
selector:
matchLabels:
app: loculus
component: loculus-ingest-{{ $key }}
template:
metadata:
labels:
app: loculus
component: loculus-ingest-{{ $key }}
schedule: "*/2 * * * *" # ingest every 2 minutes but forbid concurrency, have jobs run only for Values.ingestLimitSeconds
startingDeadlineSeconds: 60
concurrencyPolicy: Forbid
jobTemplate:
spec:
containers:
- name: ingest-{{ $key }}
image: {{ $value.ingest.image}}:{{ $dockerTag }}
imagePullPolicy: Always
resources:
requests:
memory: "80Mi"
cpu: "10m"
limits:
memory: "10Gi"
env:
- name: KEYCLOAK_INGEST_PASSWORD
valueFrom:
secretKeyRef:
name: service-accounts
key: insdcIngestUserPassword
- name: NCBI_API_KEY
valueFrom:
secretKeyRef:
name: ingest-ncbi
key: api-key
- name: SLACK_HOOK
valueFrom:
secretKeyRef:
name: slack-notifications
key: slack-hook
args:
- snakemake
- results/approved
- results/submitted # Remove in production, see #1777
- results/revised # Remove in production, see #1777
- --all-temp # Reduce disk usage by not keeping files around
{{- if $value.ingest.configFile }}
volumeMounts:
activeDeadlineSeconds: {{ $.Values.ingestLimitSeconds }}
template:
metadata:
labels:
app: loculus
component: loculus-ingest-cronjob-{{ $key }}
annotations:
argocd.argoproj.io/sync-options: Force=true,Replace=true
spec:
restartPolicy: Never
containers:
- name: ingest-{{ $key }}
image: {{ $value.ingest.image}}:{{ $dockerTag }}
imagePullPolicy: Always
resources:
requests:
memory: "1Gi"
cpu: "200m"
limits:
cpu: "200m"
memory: "10Gi"
env:
- name: KEYCLOAK_INGEST_PASSWORD
valueFrom:
secretKeyRef:
name: service-accounts
key: insdcIngestUserPassword
args:
- snakemake
- results/submitted
- results/revised
- results/approved
- --all-temp # Reduce disk usage by not keeping files around
{{- if $value.ingest.configFile }}
volumeMounts:
- name: loculus-ingest-config-volume-{{ $key }}
mountPath: /package/config/config.yaml
subPath: config.yaml
volumes:
- name: loculus-ingest-config-volume-{{ $key }}
mountPath: /package/config/config.yaml
subPath: config.yaml
volumes:
- name: loculus-ingest-config-volume-{{ $key }}
configMap:
name: loculus-ingest-config-{{ $key }}
{{- end }}
configMap:
name: loculus-ingest-config-{{ $key }}
{{- end }}
---
apiVersion: batch/v1
kind: CronJob
metadata:
name: loculus-ingest-cronjob-{{ $key }}
name: loculus-revoke-and-regroup-cronjob-{{ $key }}
spec:
schedule: "*/30 * * * *" # ingest every 30 minutes (not more often to be kind to NCBI)
schedule: "0 0 31 2 *" # Never runs without manual trigger
anna-parker marked this conversation as resolved.
Show resolved Hide resolved
startingDeadlineSeconds: 60
concurrencyPolicy: Forbid
jobTemplate:
Expand Down Expand Up @@ -107,6 +99,8 @@ spec:
- snakemake
- results/submitted
- results/revised
- results/revoked
- results/approved
- --all-temp # Reduce disk usage by not keeping files around
{{- if $value.ingest.configFile }}
volumeMounts:
Expand Down
Loading