From f6e0cf42902ff7dd393049c762f7c6e75edcea19 Mon Sep 17 00:00:00 2001 From: carlosbogo Date: Mon, 11 Aug 2025 18:35:58 +0200 Subject: [PATCH 01/10] Create promxy-to-opensearch docker image --- docker/promxy-to-opensearch/Dockerfile | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 docker/promxy-to-opensearch/Dockerfile diff --git a/docker/promxy-to-opensearch/Dockerfile b/docker/promxy-to-opensearch/Dockerfile new file mode 100644 index 00000000..4fd5e92b --- /dev/null +++ b/docker/promxy-to-opensearch/Dockerfile @@ -0,0 +1,19 @@ +FROM gitlab-registry.cern.ch/linuxsupport/alma9-base + +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 \ + PATH=/usr/local/bin:$PATH \ + PYTHONPATH=/app:$PYTHONPATH + +WORKDIR /app + +RUN dnf -y install epel-release +RUN dnf -y install python3-opensearch-py+kerberos +RUN dnf -y install python3-requests-gssapi + +COPY src/ ./src +COPY promxy_to_opensearch.py . +COPY entrypoint.sh . + +CMD ["/app/entrypoint.sh"] + From d38aac802cacec5bfaff12998ba4eeb4284bf396 Mon Sep 17 00:00:00 2001 From: carlosbogo Date: Mon, 11 Aug 2025 18:36:54 +0200 Subject: [PATCH 02/10] Write entrypoint script --- docker/promxy-to-opensearch/entrypoint.sh | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 docker/promxy-to-opensearch/entrypoint.sh diff --git a/docker/promxy-to-opensearch/entrypoint.sh b/docker/promxy-to-opensearch/entrypoint.sh new file mode 100644 index 00000000..b253928e --- /dev/null +++ b/docker/promxy-to-opensearch/entrypoint.sh @@ -0,0 +1,22 @@ +#!/bin/bash +set -euo pipefail + +KEYTAB_PATH="${KRB5_CLIENT_KTNAME:-/etc/krb5.keytab}" + +# Extract principal from keytab (last one listed) +principal=$(klist -k "$KEYTAB_PATH" | tail -1 | awk '{print $2}') + +echo "[INFO] Using principal: $principal" + +# Authenticate with Kerberos +if ! 
kinit "$principal" -k -t "$KEYTAB_PATH" >/dev/null; then + echo "[ERROR] Kerberos authentication failed using keytab: $KEYTAB_PATH" + exit 1 +fi + +# Strip @REALM to get short name (optional) +SHORT_NAME=$(echo "$principal" | grep -o '^[^@]*') +echo "[INFO] Kerberos auth succeeded. Principal short name: $SHORT_NAME" + +python3 "/app/promxy_to_opensearch.py" + From 9f1ee1c4d90bf08b1284ced0110ddbc28d68297c Mon Sep 17 00:00:00 2001 From: carlosbogo Date: Mon, 11 Aug 2025 18:37:18 +0200 Subject: [PATCH 03/10] Create constants module --- docker/promxy-to-opensearch/src/constants.py | 33 ++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 docker/promxy-to-opensearch/src/constants.py diff --git a/docker/promxy-to-opensearch/src/constants.py b/docker/promxy-to-opensearch/src/constants.py new file mode 100644 index 00000000..4bddc7d8 --- /dev/null +++ b/docker/promxy-to-opensearch/src/constants.py @@ -0,0 +1,33 @@ +import os +import logging +from datetime import datetime, timezone, timedelta + + +DEBUG_MODE = os.getenv("DEBUG_MODE", "false").lower() == "true" +LOG_LEVEL = logging.DEBUG if DEBUG_MODE else logging.INFO + +DRY_RUN = os.getenv("DRY_RUN", "false").lower() == "true" + +PROMXY_URL = os.environ.get( + "PROMXY_URL", "http://cms-monitoring.cern.ch:30082/api/v1/query_range" +) +PROM_QUERY = os.environ.get("PROM_QUERY", "avg_over_time:rucio_report_used_space:1h") +INDEX = os.environ.get("OPENSEARCH_INDEX", "rucio-used-space-1h") + +OS_HOST = os.environ.get("OPENSEARCH_HOST", "https://os-cms.cern.ch:443/os") +CERT_PATH = os.environ.get("CERT_PATH", "/etc/pki/tls/certs/ca-bundle.trust.crt") + +STEP = os.environ.get("STEP", "3600") +START_DATE = os.environ.get("START_DATE", None) +END_DATE = os.environ.get("END_DATE", None) + +START = ( + datetime.strptime(START_DATE, "%Y-%m-%d") + if START_DATE and END_DATE + else datetime.now(timezone.utc) - timedelta(days=30) +) +END = ( + datetime.strptime(END_DATE, "%Y-%m-%d") + if START_DATE and END_DATE + else 
datetime.now(timezone.utc) +) From 204abb89407d03043ca256990e6b9d0af85ab7c5 Mon Sep 17 00:00:00 2001 From: carlosbogo Date: Mon, 11 Aug 2025 18:37:39 +0200 Subject: [PATCH 04/10] Create logging module --- docker/promxy-to-opensearch/src/logging.py | 23 ++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 docker/promxy-to-opensearch/src/logging.py diff --git a/docker/promxy-to-opensearch/src/logging.py b/docker/promxy-to-opensearch/src/logging.py new file mode 100644 index 00000000..f4664508 --- /dev/null +++ b/docker/promxy-to-opensearch/src/logging.py @@ -0,0 +1,23 @@ +import logging +import src.constants as const + +APP_NAME = "promxy_to_opensearch" + +def _get_logger() -> logging.Logger: + logger = logging.getLogger(APP_NAME) + logger.setLevel(const.LOG_LEVEL) + + # TODO: Look into redirecting full logs to logstash or some other cloud tool + fh = logging.FileHandler(f'{__name__}.log') + fh.setLevel(logging.DEBUG) + ch = logging.StreamHandler() + ch.setLevel(const.LOG_LEVEL) + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + fh.setFormatter(formatter) + ch.setFormatter(formatter) + + logger.addHandler(fh) + logger.addHandler(ch) + return logger + +logger: logging.Logger = _get_logger() \ No newline at end of file From f4f53c9b13fc91a5f40cc9e8728bddf77ee5f658 Mon Sep 17 00:00:00 2001 From: carlosbogo Date: Mon, 11 Aug 2025 18:37:57 +0200 Subject: [PATCH 05/10] Create helpers module --- docker/promxy-to-opensearch/src/helpers.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 docker/promxy-to-opensearch/src/helpers.py diff --git a/docker/promxy-to-opensearch/src/helpers.py b/docker/promxy-to-opensearch/src/helpers.py new file mode 100644 index 00000000..b6bc5506 --- /dev/null +++ b/docker/promxy-to-opensearch/src/helpers.py @@ -0,0 +1,16 @@ +from datetime import datetime, timedelta +from typing import Optional + + +def generate_date_ranges( + start: Optional[datetime], end: 
Optional[datetime] +) -> list[tuple[datetime, datetime]]: + if start > end: + raise ValueError(f"Start date: {start} cannot be later than end date: {end}.") + ranges = [] + current_start = start + while current_start <= end: + current_end = min(current_start + timedelta(days=30), end) + ranges.append((current_start, current_end)) + current_start = current_end + timedelta(days=1) + return ranges From 33cb03d3d7198884411734f01f0a172412b1e1b6 Mon Sep 17 00:00:00 2001 From: carlosbogo Date: Mon, 11 Aug 2025 18:38:22 +0200 Subject: [PATCH 06/10] Create opensearch module --- docker/promxy-to-opensearch/src/os_utils.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 docker/promxy-to-opensearch/src/os_utils.py diff --git a/docker/promxy-to-opensearch/src/os_utils.py b/docker/promxy-to-opensearch/src/os_utils.py new file mode 100644 index 00000000..b0a23d3a --- /dev/null +++ b/docker/promxy-to-opensearch/src/os_utils.py @@ -0,0 +1,21 @@ +from typing import Iterable +from opensearchpy import OpenSearch, helpers, RequestsHttpConnection +from requests_gssapi import HTTPSPNEGOAuth, OPTIONAL + +def get_opensearch_client(os_host: str, ca_cert_path: str) -> int: + return OpenSearch( + [os_host], + use_ssl=True, + verify_certs=True, + ca_certs=ca_cert_path, + connection_class=RequestsHttpConnection, + http_auth=HTTPSPNEGOAuth(mutual_authentication=OPTIONAL), + ) + + +def os_upload_docs_in_bulk(os_host: str, ca_cert_path: str, doc_iterable: Iterable): + os_client = get_opensearch_client(os_host=os_host, ca_cert_path=ca_cert_path) + + success, failures = helpers.bulk(os_client, doc_iterable, max_retries=3) + + return success, failures From 1168848d754a3f59402afa4face35fd02ebe80bc Mon Sep 17 00:00:00 2001 From: carlosbogo Date: Mon, 11 Aug 2025 18:38:39 +0200 Subject: [PATCH 07/10] Create promxy querying module --- docker/promxy-to-opensearch/src/promxy.py | 77 +++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 
docker/promxy-to-opensearch/src/promxy.py diff --git a/docker/promxy-to-opensearch/src/promxy.py b/docker/promxy-to-opensearch/src/promxy.py new file mode 100644 index 00000000..5609b0a1 --- /dev/null +++ b/docker/promxy-to-opensearch/src/promxy.py @@ -0,0 +1,77 @@ +import requests +import sys +import datetime +from typing import TypedDict, Literal, Union, Iterator, Any + +from src.logging import logger + + +# TODO: Pass metrics as ConfigMaps to the K8s cronjobs +# Also can be extended to all other parameters +class RucioUsageFields(TypedDict): + __name__: str + aggregate: str + country: str + job: str + rse: str + rse_type: str + rucioInstance: str + source: str + + +class PromSeriesData(TypedDict): + metric: RucioUsageFields + values: list[list[int, str]] + + +class InnerData(TypedDict): + result: list[PromSeriesData] + + +class RangeQueryResponse(TypedDict): + status: Literal["success"] + data: dict[str, Union[str, list[InnerData]]] + + +def query_promxy(promxy_url: str, params: dict[str, str]) -> RangeQueryResponse: + logger.info(f"Querying Promxy: {promxy_url}") + + try: + resp = requests.get( + promxy_url, + params=params, + ) + resp.raise_for_status() + except requests.RequestException as e: + logger.error(f"Failed to fetch from Promxy: {e}") + sys.exit(1) + + logger.debug(f"Final URL: {resp.url}") + logger.debug(f"Promxy response status: {resp.status_code}") + # logging.debug(f"Promxy response sample: {resp.json()['data']['result'][:5]}") + + return resp.json() + + +def generate_os_docs(responses: list[RangeQueryResponse], os_index: str) -> Iterator[dict[str:Any]]: + for response in responses: + for series in response["data"]["result"]: + fields = series.get("metric", {}) + values = series.get("values", []) + for ts, val in values: + yield { + "_op_type": "index", + "_index": os_index, + "_source": { + "timestamp": datetime.datetime.fromtimestamp( + int(float(ts)), datetime.timezone.utc + ).isoformat() + + "Z", + "rse": fields.get("rse"), + "rse_type": 
fields.get("rse_type"), + "country": fields.get("country"), + "rucioInstance": fields.get("rucioInstance"), + "source": fields.get("source"), + "used_bytes": float(val), + }, + } From 226b2e7c10c9f65f275460bf6e8d80a43d3010f6 Mon Sep 17 00:00:00 2001 From: carlosbogo Date: Mon, 11 Aug 2025 18:39:05 +0200 Subject: [PATCH 08/10] Write main python script --- .../promxy_to_opensearch.py | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 docker/promxy-to-opensearch/promxy_to_opensearch.py diff --git a/docker/promxy-to-opensearch/promxy_to_opensearch.py b/docker/promxy-to-opensearch/promxy_to_opensearch.py new file mode 100644 index 00000000..7c75b958 --- /dev/null +++ b/docker/promxy-to-opensearch/promxy_to_opensearch.py @@ -0,0 +1,61 @@ +from logging import DEBUG +import itertools +import src.constants as const +from src.promxy import query_promxy, generate_os_docs +from src.helpers import generate_date_ranges +from src.os_utils import os_upload_docs_in_bulk +from src.logging import logger + + +logger.debug( + f""" +Environment Configuration: + PROMXY_URL = {const.PROMXY_URL} + PROM_QUERY = {const.PROM_QUERY} + OPENSEARCH_INDEX = {const.INDEX} + OPENSEARCH_HOST = {const.OS_HOST} + CERT_PATH = {const.CERT_PATH} + START_DATE = {const.START_DATE} + END_DATE = {const.END_DATE} + STEP = {const.STEP} +""" +) + +# PromQL gives inconsistent responses for query ranges spanning longer +# than 30 days, so we split our date range in smaller chunks if necessary. 
+date_ranges = generate_date_ranges(const.START, const.END) +logger.debug(f"Date ranges: {date_ranges}") +responses = [] +for range_start, range_end in date_ranges: + logger.info(f"Current date range: {range_start.date()} - {range_end.date()}") + promxy_request_params = { + "query": const.PROM_QUERY, + "start": int(range_start.timestamp()), + "end": int(range_end.timestamp()), + "step": const.STEP, + } + responses.append( + query_promxy(promxy_url=const.PROMXY_URL, params=promxy_request_params) + ) + +if const.DRY_RUN: + n_samples = 2 + generator = generate_os_docs(responses=responses, os_index=const.INDEX) + doc_samples = list(itertools.islice(generator, n_samples)) + total_documents = n_samples + sum(1 for _ in generator) + + logger.info(f"Document samples: {doc_samples}") + logger.info(f"[DRY RUN] {total_documents} docs generated.") +else: + logger.info(f"Uploading to OpenSearch: {const.OS_HOST},\n Index: {const.INDEX}") + successes, failures = os_upload_docs_in_bulk( + os_host=const.OS_HOST, + ca_cert_path=const.CERT_PATH, + doc_iterable=generate_os_docs(responses=responses, os_index=const.INDEX), + ) + logger.info(f"Uploaded {successes} docs.") + if failures: + if logger.getEffectiveLevel() == DEBUG: + for failure in failures: + logger.debug(f"Failure detail: {failure}") + logger.error(f"{len(failures)} OpenSearch index failures occurred.") From e2142b31d2c846905b955cd4d90135a75bbf8a6b Mon Sep 17 00:00:00 2001 From: carlosbogo Date: Fri, 22 Aug 2025 18:16:24 +0200 Subject: [PATCH 09/10] Add README.md --- docker/promxy-to-opensearch/README.md | 34 +++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 docker/promxy-to-opensearch/README.md diff --git a/docker/promxy-to-opensearch/README.md b/docker/promxy-to-opensearch/README.md new file mode 100644 index 00000000..b01f583f --- /dev/null +++ b/docker/promxy-to-opensearch/README.md @@ -0,0 +1,34 @@ +# Promxy to Opensearch + +## Purpose + +This is the codebase for the 
`promxy-to-opensearch` docker image inside the cmsmonitoring repository of the CERN docker registry. + +This image's main purpose is to serve as a flexible base for creating Kubernetes cronjobs that handle data pipelines between Promxy and OpenSearch. + +## Deployment + +The code for deploying the cronjob in charge of the data loads is in the [CMSKubernetes repository](https://github.com/dmwm/CMSKubernetes/blob/master/kubernetes/monitoring/crons/promxy-to-opensearch.yaml). +[Comment] # TODO Update the readme when the cronjob is moved to terraform + +To deploy or update the cronjob using the yaml, you have to first set the proper `kubectl` context and then apply the yaml with the changes you made by running the following: + +``` bash +export KUBECONFIG= +kubectl apply -f +``` + +The most common update to the cronjob will be changing the data it pushes to Opensearch. This can be done by changing the PromQL query passed through the cronjob environment variables. The full list of the environment variables with their respective usage is: + +- PROMXY_URL: URL to use as the Promxy endpoint. Can be modified to choose between different Promxy instances or to switch between using the `query` or the `query_range` parameter (more info in the [official documentation](https://prometheus.io/docs/prometheus/latest/querying/basics/)). The default value for this variable is `"http://cms-monitoring.cern.ch:30082/api/v1/query_range"`. +- PROM_QUERY: PromQL query to be used when sending the request to Promxy. It can be used to retrieve raw data from a metric or a recording rule, or as a first filter before uploading any data to a different database. Its default value is `"avg_over_time:rucio_report_used_space:1h"`. +- OPENSEARCH_HOST: Opensearch instance URL to where we will send the data. The default value is `"https://os-cms.cern.ch:443/os"`. +- OPENSEARCH_INDEX: Index to which we will write the data inside Opensearch. 
+- KRB5_CLIENT_KTNAME: Path to the keytab file to be used for Kerberos authentication in all of CERN internal services. The default value is `"/etc/secrets/keytab"`, and the file is mounted from a Kubernetes secret. +- START_DATE: Start of the time range that we want the data from. If not specified, the time range will be set to the past month. +- END_DATE: End of the time range that we want the data from. If not specified, the time range will be set to the past month. +- STEP: Time step to be used when querying Promxy, in seconds. Smaller values mean more data points but might lead to data duplication, while higher values will gather less data but risk missing some data points. The optimal value depends on the granularity of the time series (e.g. 3600 for hourly series or 86400 for daily ones). +- DEBUG_MODE: If `"true"`, sets the logging level to DEBUG, increasing the verbosity of the logs. +- DRY_RUN: If `"true"`, runs the workload without uploading the data to Opensearch. Useful for debugging purposes without getting wrong or duplicate data inside Opensearch. + +[Comment] # TODO Add setup instructions for local run. \ No newline at end of file From c7821c3b2c31658791329bc97ab3cfb2d2750104 Mon Sep 17 00:00:00 2001 From: carlosbogo Date: Tue, 9 Sep 2025 10:11:38 +0200 Subject: [PATCH 10/10] Add tier field to OS data --- docker/promxy-to-opensearch/src/promxy.py | 1 + 1 file changed, 1 insertion(+) diff --git a/docker/promxy-to-opensearch/src/promxy.py b/docker/promxy-to-opensearch/src/promxy.py index 5609b0a1..5913db1f 100644 --- a/docker/promxy-to-opensearch/src/promxy.py +++ b/docker/promxy-to-opensearch/src/promxy.py @@ -69,6 +69,7 @@ def generate_os_docs(responses: list[RangeQueryResponse], os_index: str) -> Iter + "Z", "rse": fields.get("rse"), "rse_type": fields.get("rse_type"), + "tier": fields.get("rse")[:2], "country": fields.get("country"), "rucioInstance": fields.get("rucioInstance"), "source": fields.get("source"),