diff --git a/docker/promxy-to-opensearch/Dockerfile b/docker/promxy-to-opensearch/Dockerfile new file mode 100644 index 00000000..4fd5e92b --- /dev/null +++ b/docker/promxy-to-opensearch/Dockerfile @@ -0,0 +1,19 @@ +FROM gitlab-registry.cern.ch/linuxsupport/alma9-base + +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 \ + PATH=/usr/local/bin:$PATH \ + PYTHONPATH=/app:$PYTHONPATH + +WORKDIR /app + +RUN dnf -y install epel-release +RUN dnf -y install python3-opensearch-py+kerberos +RUN dnf -y install python3-requests-gssapi + +COPY src/ ./src +COPY promxy_to_opensearch.py . +COPY entrypoint.sh . + +CMD ["/app/entrypoint.sh"] + diff --git a/docker/promxy-to-opensearch/README.md b/docker/promxy-to-opensearch/README.md new file mode 100644 index 00000000..b01f583f --- /dev/null +++ b/docker/promxy-to-opensearch/README.md @@ -0,0 +1,34 @@ +# Promxy to Opensearch + +## Purpose + +This is the codebase for the `promxy-to-opensearch` docker image inside the cmsmonitoring repository of the CERN docker registry. + +This image's main purpose is to serve as a flexible base for creating Kubernetes cronjobs that handle data pipelines between Promxy and OpenSearch. + +## Deployment + +The code for deploying the cronjob in charge of the data loads is in the [CMSKubernetes repository](https://github.com/dmwm/CMSKubernetes/blob/master/kubernetes/monitoring/crons/promxy-to-opensearch.yaml). +[Comment] # TODO Update the readme when the cronjob is moved to terraform + +To deploy or update the cronjob using the yaml, you have to first set the proper `kubectl` context and then apply the yaml with the changes you made by running the following: + +``` bash +export KUBECONFIG= +kubectl apply -f +``` + +The most common update to the cronjob will be changing the data it pushes to Opensearch. This can be done by changing the PromQL query passed through the cronjob environment variables. The full list of the environment variables with their respective usage is: + +- PROMXY_URL: Url to use as the Promxy endpoint. Can be modified to choose between different Promxy instances or to switch between using the `query` or the `query_range` parameter (more info in the [official documentation](https://prometheus.io/docs/prometheus/latest/querying/basics/)).The default value for this variable is `"http://cms-monitoring.cern.ch:30082/api/v1/query_range"`. +- PROM_QUERY: PromQL query to be used when sending the request to Promxy. It can be used to retrieve raw data from a metric or a recording rule, or as a first filter before uploading any data to a different database. Its default value is `"avg_over_time:rucio_report_used_space:1h"` +- OPENSEARCH_HOST: Opensearch instance url to where we will send the data. The default value is `"https://os-cms.cern.ch:443/os"`. +- OPENSEARCH_INDEX: Index to which we will write the data inside Opensearch. +- KRB5_CLIENT_KTNAME: Path to the keytab file to be used for Kerberos authentication in all of CERN internal services. The default value is `"/etc/secrets/keytab"`, and the file is mounted from a Kubernetes secret. +- START_DATE: Start of the time range that we want the data from. If not specified, the time range will be set to the past month. +- END_DATE: End of the time range that we want the data from. If not specified, the time range will be set to the past month. +- STEP: Time step to be used when querying Promxy, in seconds. Smaller values means more data points but might lead to data duplication, while higher values will gather less data but risk missing some data points. The optimal value depends on the granularity of the time series (i.e. 3600 for hourly series or 86400 for daily ones). +- DEBUG_MODE: If `"true"`, sets the logging level to DEBUG, increasing the verbosity of the logs. +- DRY_RUN: If `"true"`, runs the workload without uploading the data to Opensearch. Useful for debugging purposes whithout getting wrong or duplicate data inside Opensearch. + +[Comment] # TODO Add setup instructions for local run. \ No newline at end of file diff --git a/docker/promxy-to-opensearch/entrypoint.sh b/docker/promxy-to-opensearch/entrypoint.sh new file mode 100644 index 00000000..b253928e --- /dev/null +++ b/docker/promxy-to-opensearch/entrypoint.sh @@ -0,0 +1,22 @@ +#!/bin/bash +set -euo pipefail + +KEYTAB_PATH="${KRB5_CLIENT_KTNAME:-/etc/krb5.keytab}" + +# Extract principal from keytab (last one listed) +principal=$(klist -k "$KEYTAB_PATH" | tail -1 | awk '{print $2}') + +echo "[INFO] Using principal: $principal" + +# Authenticate with Kerberos +if ! kinit "$principal" -k -t "$KEYTAB_PATH" >/dev/null; then + echo "[ERROR] Kerberos authentication failed using keytab: $KEYTAB_PATH" + exit 1 +fi + +# Strip @REALM to get short name (optional) +SHORT_NAME=$(echo "$principal" | grep -o '^[^@]*') +echo "[INFO] Kerberos auth succeeded. Principal short name: $SHORT_NAME" + +python3 "/app/promxy_to_opensearch.py" + diff --git a/docker/promxy-to-opensearch/promxy_to_opensearch.py b/docker/promxy-to-opensearch/promxy_to_opensearch.py new file mode 100644 index 00000000..7c75b958 --- /dev/null +++ b/docker/promxy-to-opensearch/promxy_to_opensearch.py @@ -0,0 +1,61 @@ +from logging import DEBUG +import itertools +import src.constants as const +from src.promxy import query_promxy, generate_os_docs +from src.helpers import generate_date_ranges +from src.os_utils import os_upload_docs_in_bulk +from src.logging import logger + + +logger.debug( + f""" +Environment Configuration: + PROMXY_URL = {const.PROMXY_URL} + PROM_QUERY = {const.PROM_QUERY} + OPENSEARCH_INDEX = {const.INDEX} + OPENSEARCH_HOST = {const.OS_HOST} + CERT_PATH = {const.CERT_PATH} + START_DATE = {const.START_DATE} + END_DATE = {const.END_DATE} + STEP = {const.STEP} +""" +) + +# PromQL gives inconsistent responses for query ranges spanning longer +# than 30 days, so we split our date range in smaller chunks if necessary. +date_ranges = generate_date_ranges(const.START, const.END) +logger.debug(f"Date ranges: {date_ranges}") +responses = [] +for range_start, range_end in date_ranges: + logger.info(f"Current date range: {range_start.date()} - {range_end.date()}") + promxy_request_params = { + "query": const.PROM_QUERY, + "start": int(range_start.timestamp()), + "end": int(range_end.timestamp()), + "step": const.STEP, + } + responses.append( + query_promxy(promxy_url=const.PROMXY_URL, params=promxy_request_params) + ) + +if const.DRY_RUN: + n_samples = 2 + generator = generate_os_docs(responses=responses, os_index=const.INDEX) + doc_samples = list(itertools.islice(generator, n_samples)) + total_documents = n_samples + sum(1 for _ in generator) + + logger.info(f"Document samples: {doc_samples}") + logger.info(f"[DRY RUN] {total_documents} docs generated.") +else: + logger.info(f"Uploading to OpenSearch: {const.OS_HOST},\n Index: {const.INDEX}") + successes, failures = os_upload_docs_in_bulk( + os_host=const.OS_HOST, + ca_cert_path=const.CERT_PATH, + doc_iterable=generate_os_docs(responses=responses, os_index=const.INDEX), + ) + logger.info(f"Uploaded {successes} docs.") + if failures: + if logger.getEffectiveLevel() == DEBUG: + for failure in failures: + logger.debug(f"Failure detail: {failure}") + logger.error(f"{len(failures)} OpenSearch index failures occurred.") diff --git a/docker/promxy-to-opensearch/src/constants.py b/docker/promxy-to-opensearch/src/constants.py new file mode 100644 index 00000000..4bddc7d8 --- /dev/null +++ b/docker/promxy-to-opensearch/src/constants.py @@ -0,0 +1,33 @@ +import os +import logging +from datetime import datetime, timezone, timedelta + + +DEBUG_MODE = os.getenv("DEBUG_MODE", "false").lower() == "true" +LOG_LEVEL = logging.DEBUG if DEBUG_MODE else logging.INFO + +DRY_RUN = os.getenv("DRY_RUN", "false").lower() == "true" + +PROMXY_URL = os.environ.get( + "PROMXY_URL", "http://cms-monitoring.cern.ch:30082/api/v1/query_range" +) +PROM_QUERY = os.environ.get("PROM_QUERY", "avg_over_time:rucio_report_used_space:1h") +INDEX = os.environ.get("OPENSEARCH_INDEX", "rucio-used-space-1h") + +OS_HOST = os.environ.get("OPENSEARCH_HOST", "https://os-cms.cern.ch:443/os") +CERT_PATH = os.environ.get("CERT_PATH", "/etc/pki/tls/certs/ca-bundle.trust.crt") + +STEP = os.environ.get("STEP", "3600") +START_DATE = os.environ.get("START_DATE", None) +END_DATE = os.environ.get("END_DATE", None) + +START = ( + datetime.strptime(START_DATE, "%Y-%m-%d") + if START_DATE and END_DATE + else datetime.now(timezone.utc) - timedelta(days=30) +) +END = ( + datetime.strptime(END_DATE, "%Y-%m-%d") + if START_DATE and END_DATE + else datetime.now(timezone.utc) +) diff --git a/docker/promxy-to-opensearch/src/helpers.py b/docker/promxy-to-opensearch/src/helpers.py new file mode 100644 index 00000000..b6bc5506 --- /dev/null +++ b/docker/promxy-to-opensearch/src/helpers.py @@ -0,0 +1,16 @@ +from datetime import datetime, timedelta +from typing import Optional + + +def generate_date_ranges( + start: Optional[datetime], end: Optional[datetime] +) -> list[tuple[datetime, datetime]]: + if start > end: + raise ValueError(f"Start date: {start} cannot be later than end date: {end}.") + ranges = [] + current_start = start + while current_start <= end: + current_end = min(current_start + timedelta(days=30), end) + ranges.append((current_start, current_end)) + current_start = current_end + timedelta(days=1) + return ranges diff --git a/docker/promxy-to-opensearch/src/logging.py b/docker/promxy-to-opensearch/src/logging.py new file mode 100644 index 00000000..f4664508 --- /dev/null +++ b/docker/promxy-to-opensearch/src/logging.py @@ -0,0 +1,23 @@ +import logging +import src.constants as const + +APP_NAME = "promxy_to_opensearch" + +def _get_logger() -> logging.Logger: + logger = logging.getLogger(APP_NAME) + logger.setLevel(const.LOG_LEVEL) + + # TODO: Look into redirecting full logs to logstash or some other cloud tool + fh = logging.FileHandler(f'{__name__}.log') + fh.setLevel(logging.DEBUG) + ch = logging.StreamHandler() + ch.setLevel(const.LOG_LEVEL) + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + fh.setFormatter(formatter) + ch.setFormatter(formatter) + + logger.addHandler(fh) + logger.addHandler(ch) + return logger + +logger: logging.Logger = _get_logger() \ No newline at end of file diff --git a/docker/promxy-to-opensearch/src/os_utils.py b/docker/promxy-to-opensearch/src/os_utils.py new file mode 100644 index 00000000..b0a23d3a --- /dev/null +++ b/docker/promxy-to-opensearch/src/os_utils.py @@ -0,0 +1,21 @@ +from typing import Iterable +from opensearchpy import OpenSearch, helpers, RequestsHttpConnection +from requests_gssapi import HTTPSPNEGOAuth, OPTIONAL + +def get_opensearch_client(os_host: str, ca_cert_path: str) -> int: + return OpenSearch( + [os_host], + use_ssl=True, + verify_certs=True, + ca_certs=ca_cert_path, + connection_class=RequestsHttpConnection, + http_auth=HTTPSPNEGOAuth(mutual_authentication=OPTIONAL), + ) + + +def os_upload_docs_in_bulk(os_host: str, ca_cert_path: str, doc_iterable: Iterable): + os_client = get_opensearch_client(os_host=os_host, ca_cert_path=ca_cert_path) + + success, failures = helpers.bulk(os_client, doc_iterable, max_retries=3) + + return success, failures diff --git a/docker/promxy-to-opensearch/src/promxy.py b/docker/promxy-to-opensearch/src/promxy.py new file mode 100644 index 00000000..5913db1f --- /dev/null +++ b/docker/promxy-to-opensearch/src/promxy.py @@ -0,0 +1,78 @@ +import requests +import sys +import datetime +from typing import TypedDict, Literal, Union, Iterator, Any + +from src.logging import logger + + +# TODO: Pass metrics as ConfigMaps to the K8s cronjobs +# Also can be extended to all other parameters +class RucioUsageFields(TypedDict): + __name__: str + aggregate: str + country: str + job: str + rse: str + rse_type: str + rucioInstance: str + source: str + + +class PromSeriesData(TypedDict): + metric: RucioUsageFields + values: list[list[int, str]] + + +class InnerData(TypedDict): + result: list[PromSeriesData] + + +class RangeQueryResponse(TypedDict): + status: Literal["success"] + data: dict[str, Union[str, list[InnerData]]] + + +def query_promxy(promxy_url: str, params: dict[str, str]) -> RangeQueryResponse: + logger.info(f"Querying Promxy: {promxy_url}") + + try: + resp = requests.get( + promxy_url, + params=params, + ) + resp.raise_for_status() + except requests.RequestException as e: + logger.error(f"Failed to fetch from Promxy: {e}") + sys.exit(1) + + logger.debug(f"Final URL: {resp.url}") + logger.debug(f"Promxy response status: {resp.status_code}") + # logging.debug(f"Promxy response sample: {resp.json()['data']['result'][:5]}") + + return resp.json() + + +def generate_os_docs(responses: list[RangeQueryResponse], os_index: str) -> Iterator[dict[str:Any]]: + for response in responses: + for series in response["data"]["result"]: + fields = series.get("metric", {}) + values = series.get("values", []) + for ts, val in values: + yield { + "_op_type": "index", + "_index": os_index, + "_source": { + "timestamp": datetime.datetime.fromtimestamp( + int(float(ts)), datetime.timezone.utc + ).isoformat() + + "Z", + "rse": fields.get("rse"), + "rse_type": fields.get("rse_type"), + "tier": fields.get("rse")[:2], + "country": fields.get("country"), + "rucioInstance": fields.get("rucioInstance"), + "source": fields.get("source"), + "used_bytes": float(val), + }, + }