19 changes: 19 additions & 0 deletions docker/promxy-to-opensearch/Dockerfile
@@ -0,0 +1,19 @@
FROM gitlab-registry.cern.ch/linuxsupport/alma9-base

ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PATH=/usr/local/bin:$PATH \
    PYTHONPATH=/app:$PYTHONPATH

WORKDIR /app

RUN dnf -y install epel-release
RUN dnf -y install python3-opensearch-py+kerberos
RUN dnf -y install python3-requests-gssapi

COPY src/ ./src
COPY promxy_to_opensearch.py .
COPY entrypoint.sh .

CMD ["/app/entrypoint.sh"]

34 changes: 34 additions & 0 deletions docker/promxy-to-opensearch/README.md
@@ -0,0 +1,34 @@
# Promxy to OpenSearch

## Purpose

This is the codebase for the `promxy-to-opensearch` Docker image, published in the cmsmonitoring repository of the CERN Docker registry.

This image's main purpose is to serve as a flexible base for creating Kubernetes cronjobs that handle data pipelines between Promxy and OpenSearch.

## Deployment

The code for deploying the cronjob in charge of the data loads is in the [CMSKubernetes repository](https://github.com/dmwm/CMSKubernetes/blob/master/kubernetes/monitoring/crons/promxy-to-opensearch.yaml).
<!-- TODO: Update the README when the cronjob is moved to Terraform. -->

To deploy or update the cronjob from this YAML, first set the proper `kubectl` context and then apply the YAML with your changes:

``` bash
export KUBECONFIG=<path_to_kubeconfig>
kubectl apply -f <path_to_cronjob_yaml>
```

The most common update to the cronjob is changing the data it pushes to OpenSearch, which is done by changing the PromQL query passed through the cronjob environment variables. The full list of environment variables and their usage is below (a local dry-run sketch follows the list):

- PROMXY_URL: URL to use as the Promxy endpoint. It can be modified to choose between different Promxy instances or to switch between the `query` and `query_range` API endpoints (more info in the [official documentation](https://prometheus.io/docs/prometheus/latest/querying/basics/)). The default value for this variable is `"http://cms-monitoring.cern.ch:30082/api/v1/query_range"`.
- PROM_QUERY: PromQL query sent to Promxy. It can be used to retrieve raw data from a metric or a recording rule, or as a first filter before uploading any data to a different database. The default value is `"avg_over_time:rucio_report_used_space:1h"`.
- OPENSEARCH_HOST: URL of the OpenSearch instance the data is sent to. The default value is `"https://os-cms.cern.ch:443/os"`.
- OPENSEARCH_INDEX: OpenSearch index the documents are written to. The default value is `"rucio-used-space-1h"`.
- KRB5_CLIENT_KTNAME: Path to the keytab file used for Kerberos authentication against CERN internal services. The default value is `"/etc/secrets/keytab"`, and the file is mounted from a Kubernetes secret.
- START_DATE: Start of the time range to fetch data for, in `YYYY-MM-DD` format.
- END_DATE: End of the time range to fetch data for, in `YYYY-MM-DD` format. Both START_DATE and END_DATE must be set; if either is missing, the time range defaults to the past 30 days.
- STEP: Time step used when querying Promxy, in seconds. Smaller values mean more data points but might lead to data duplication, while larger values gather fewer points but risk missing some. The optimal value depends on the granularity of the time series (e.g. 3600 for hourly series or 86400 for daily ones). The default value is `"3600"`.
- DEBUG_MODE: If `"true"`, sets the logging level to DEBUG, increasing the verbosity of the logs.
- DRY_RUN: If `"true"`, runs the workload without uploading the data to OpenSearch. Useful for debugging without pushing wrong or duplicate data into OpenSearch.
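
As a rough reference until proper local-run instructions are added (see the TODO below), the job can be exercised locally in dry-run mode with something like the following sketch. The values are illustrative and mirror the defaults in `src/constants.py`; it assumes the repository is checked out, the Python dependencies installed in the Dockerfile (`python3-opensearch-py`, `python3-requests-gssapi`) are available, and the Promxy endpoint is reachable:

``` bash
# Hedged local dry-run sketch; dates and values are illustrative.
cd docker/promxy-to-opensearch

export PROM_QUERY="avg_over_time:rucio_report_used_space:1h"
export OPENSEARCH_INDEX="rucio-used-space-1h"
export START_DATE="2024-01-01"   # YYYY-MM-DD; both dates must be set
export END_DATE="2024-01-31"
export STEP="3600"               # hourly series
export DRY_RUN="true"            # generate and count docs, do not upload
export DEBUG_MODE="true"

python3 promxy_to_opensearch.py
```

With `DRY_RUN="true"` the script still queries Promxy, but it only logs a couple of sample documents and the total document count instead of writing to OpenSearch.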

<!-- TODO: Add setup instructions for a local run. -->
22 changes: 22 additions & 0 deletions docker/promxy-to-opensearch/entrypoint.sh
@@ -0,0 +1,22 @@
#!/bin/bash
set -euo pipefail

KEYTAB_PATH="${KRB5_CLIENT_KTNAME:-/etc/krb5.keytab}"

# Extract principal from keytab (last one listed)
principal=$(klist -k "$KEYTAB_PATH" | tail -1 | awk '{print $2}')

echo "[INFO] Using principal: $principal"

# Authenticate with Kerberos
if ! kinit "$principal" -k -t "$KEYTAB_PATH" >/dev/null; then
echo "[ERROR] Kerberos authentication failed using keytab: $KEYTAB_PATH"
exit 1
fi

# Strip @REALM to get short name (optional)
SHORT_NAME=$(echo "$principal" | grep -o '^[^@]*')
echo "[INFO] Kerberos auth succeeded. Principal short name: $SHORT_NAME"

python3 "/app/promxy_to_opensearch.py"

61 changes: 61 additions & 0 deletions docker/promxy-to-opensearch/promxy_to_opensearch.py
@@ -0,0 +1,61 @@
from logging import DEBUG
import itertools
import src.constants as const
from src.promxy import query_promxy, generate_os_docs
from src.helpers import generate_date_ranges
from src.os_utils import os_upload_docs_in_bulk
from src.logging import logger


logger.debug(
f"""
Environment Configuration:
PROMXY_URL = {const.PROMXY_URL}
PROM_QUERY = {const.PROM_QUERY}
OPENSEARCH_INDEX = {const.INDEX}
OPENSEARCH_HOST = {const.OS_HOST}
CERT_PATH = {const.CERT_PATH}
START_DATE = {const.START_DATE}
END_DATE = {const.END_DATE}
STEP = {const.STEP}
"""
)

# PromQL gives inconsistent responses for query ranges spanning longer
# than 30 days, so we split our date range in smaller chunks if necessary.
date_ranges = generate_date_ranges(const.START, const.END)
logger.debug(f"Date ranges: {date_ranges}")
responses = []
for range_start, range_end in date_ranges:
logger.info(f"Current date range: {range_start.date()} - {range_end.date()}")
promxy_request_params = {
"query": const.PROM_QUERY,
"start": int(range_start.timestamp()),
"end": int(range_end.timestamp()),
"step": const.STEP,
}
responses.append(
query_promxy(promxy_url=const.PROMXY_URL, params=promxy_request_params)
)

if const.DRY_RUN:
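    # Dry run: log a couple of sample documents and count the rest without
    # touching OpenSearch.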
    n_samples = 2
    generator = generate_os_docs(responses=responses, os_index=const.INDEX)
    doc_samples = list(itertools.islice(generator, n_samples))
    total_documents = len(doc_samples) + sum(1 for _ in generator)

    logger.info(f"Document samples: {doc_samples}")
    logger.info(f"[DRY RUN] {total_documents} docs generated.")
else:
logger.info(f"Uploading to OpenSearch: {const.OS_HOST},\n Index: {const.INDEX}")
successes, failures = os_upload_docs_in_bulk(
os_host=const.OS_HOST,
ca_cert_path=const.CERT_PATH,
doc_iterable=generate_os_docs(responses=responses, os_index=const.INDEX),
)
logger.info(f"Uploaded {successes} docs.")
if failures:
if logger.getEffectiveLevel() == DEBUG:
for failure in failures:
logger.debug(f"Failure detail: {failure}")
logger.error(f"{len(failures)} OpenSearch index failures occurred.")
33 changes: 33 additions & 0 deletions docker/promxy-to-opensearch/src/constants.py
@@ -0,0 +1,33 @@
import os
import logging
from datetime import datetime, timezone, timedelta


DEBUG_MODE = os.getenv("DEBUG_MODE", "false").lower() == "true"
LOG_LEVEL = logging.DEBUG if DEBUG_MODE else logging.INFO

DRY_RUN = os.getenv("DRY_RUN", "false").lower() == "true"

PROMXY_URL = os.environ.get(
"PROMXY_URL", "http://cms-monitoring.cern.ch:30082/api/v1/query_range"
)
PROM_QUERY = os.environ.get("PROM_QUERY", "avg_over_time:rucio_report_used_space:1h")
INDEX = os.environ.get("OPENSEARCH_INDEX", "rucio-used-space-1h")

OS_HOST = os.environ.get("OPENSEARCH_HOST", "https://os-cms.cern.ch:443/os")
CERT_PATH = os.environ.get("CERT_PATH", "/etc/pki/tls/certs/ca-bundle.trust.crt")

STEP = os.environ.get("STEP", "3600")
START_DATE = os.environ.get("START_DATE", None)
END_DATE = os.environ.get("END_DATE", None)

START = (
    datetime.strptime(START_DATE, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    if START_DATE and END_DATE
    else datetime.now(timezone.utc) - timedelta(days=30)
)
END = (
    datetime.strptime(END_DATE, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    if START_DATE and END_DATE
    else datetime.now(timezone.utc)
)
16 changes: 16 additions & 0 deletions docker/promxy-to-opensearch/src/helpers.py
@@ -0,0 +1,16 @@
from datetime import datetime, timedelta
from typing import Optional


def generate_date_ranges(
    start: Optional[datetime], end: Optional[datetime]
) -> list[tuple[datetime, datetime]]:
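    """Split [start, end] into consecutive sub-ranges of at most 30 days each.

    Each new sub-range starts one day after the previous one ends, so adjacent
    chunks do not overlap.
    """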
    if start > end:
        raise ValueError(f"Start date: {start} cannot be later than end date: {end}.")
    ranges = []
    current_start = start
    while current_start <= end:
        current_end = min(current_start + timedelta(days=30), end)
        ranges.append((current_start, current_end))
        current_start = current_end + timedelta(days=1)
    return ranges
23 changes: 23 additions & 0 deletions docker/promxy-to-opensearch/src/logging.py
@@ -0,0 +1,23 @@
import logging
import src.constants as const

APP_NAME = "promxy_to_opensearch"

def _get_logger() -> logging.Logger:
    logger = logging.getLogger(APP_NAME)
    logger.setLevel(const.LOG_LEVEL)

    # TODO: Look into redirecting full logs to logstash or some other cloud tool
    fh = logging.FileHandler(f'{__name__}.log')
    fh.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(const.LOG_LEVEL)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)

    logger.addHandler(fh)
    logger.addHandler(ch)
    return logger

logger: logging.Logger = _get_logger()
21 changes: 21 additions & 0 deletions docker/promxy-to-opensearch/src/os_utils.py
@@ -0,0 +1,21 @@
from typing import Iterable
from opensearchpy import OpenSearch, helpers, RequestsHttpConnection
from requests_gssapi import HTTPSPNEGOAuth, OPTIONAL

def get_opensearch_client(os_host: str, ca_cert_path: str) -> OpenSearch:
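    # Authentication is Kerberos/SPNEGO via requests-gssapi; the ticket cache is
    # populated by the kinit call in entrypoint.sh.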
    return OpenSearch(
        [os_host],
        use_ssl=True,
        verify_certs=True,
        ca_certs=ca_cert_path,
        connection_class=RequestsHttpConnection,
        http_auth=HTTPSPNEGOAuth(mutual_authentication=OPTIONAL),
    )


def os_upload_docs_in_bulk(os_host: str, ca_cert_path: str, doc_iterable: Iterable):
    os_client = get_opensearch_client(os_host=os_host, ca_cert_path=ca_cert_path)

    success, failures = helpers.bulk(os_client, doc_iterable, max_retries=3)

    return success, failures
78 changes: 78 additions & 0 deletions docker/promxy-to-opensearch/src/promxy.py
@@ -0,0 +1,78 @@
import requests
import sys
import datetime
from typing import TypedDict, Literal, Union, Iterator, Any

from src.logging import logger


# TODO: Pass metrics as ConfigMaps to the K8s cronjobs
# Also can be extended to all other parameters
class RucioUsageFields(TypedDict):
    __name__: str
    aggregate: str
    country: str
    job: str
    rse: str
    rse_type: str
    rucioInstance: str
    source: str


class PromSeriesData(TypedDict):
    metric: RucioUsageFields
    values: list[tuple[float, str]]


class InnerData(TypedDict):
    result: list[PromSeriesData]


class RangeQueryResponse(TypedDict):
    status: Literal["success"]
    data: dict[str, Union[str, list[InnerData]]]


def query_promxy(promxy_url: str, params: dict[str, str]) -> RangeQueryResponse:
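    # Query the Prometheus-compatible HTTP API exposed by Promxy; any request
    # error aborts the run so the cronjob execution is marked as failed.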
logger.info(f"Querying Promxy: {promxy_url}")

try:
resp = requests.get(
promxy_url,
params=params,
)
resp.raise_for_status()
except requests.RequestException as e:
logger.error(f"Failed to fetch from Promxy: {e}")
sys.exit(1)

logger.debug(f"Final URL: {resp.url}")
logger.debug(f"Promxy response status: {resp.status_code}")
# logging.debug(f"Promxy response sample: {resp.json()['data']['result'][:5]}")

return resp.json()


def generate_os_docs(responses: list[RangeQueryResponse], os_index: str) -> Iterator[dict[str, Any]]:
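    # Yield one OpenSearch bulk "index" action per (timestamp, value) sample, in
    # the format expected by opensearchpy.helpers.bulk.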
    for response in responses:
        for series in response["data"]["result"]:
            fields = series.get("metric", {})
            values = series.get("values", [])
            for ts, val in values:
                yield {
                    "_op_type": "index",
                    "_index": os_index,
                    "_source": {
                        "timestamp": datetime.datetime.fromtimestamp(
                            int(float(ts)), datetime.timezone.utc
                        ).strftime("%Y-%m-%dT%H:%M:%SZ"),
                        "rse": fields.get("rse"),
                        "rse_type": fields.get("rse_type"),
                        "tier": (fields.get("rse") or "")[:2],
                        "country": fields.get("country"),
                        "rucioInstance": fields.get("rucioInstance"),
                        "source": fields.get("source"),
                        "used_bytes": float(val),
                    },
                }