diff --git a/Dockerfile b/Dockerfile
index acf3cbc..dfdf736 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -16,5 +16,6 @@ ENV PORT=8080
 ENV BROKERS=dockerhost
 ENV TOPIC=samza-metrics
 ENV INCLUDE_JOBS_REGEX=.*
+ENV GAUGES_TTL=600
-CMD python -u /usr/local/bin/samza-prometheus-exporter --brokers $BROKERS --port $PORT --topic $TOPIC --include-jobs-regex "$INCLUDE_JOBS_REGEX"
+CMD python -u /usr/local/bin/samza-prometheus-exporter --brokers $BROKERS --port $PORT --topic $TOPIC --include-jobs-regex "$INCLUDE_JOBS_REGEX" --ttl $GAUGES_TTL
diff --git a/requirements.txt b/requirements.txt
index a192b7a..817bdf0 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,2 @@
-kafka_python==1.3.3
-prometheus_client==0.0.19
+kafka_python==2.0.2
+prometheus_client==0.12.0
diff --git a/samza_prometheus_exporter/__init__.py b/samza_prometheus_exporter/__init__.py
index bcfbd94..72750e4 100644
--- a/samza_prometheus_exporter/__init__.py
+++ b/samza_prometheus_exporter/__init__.py
@@ -19,19 +19,24 @@ GAUGES_TTL = 60
 
 def metric_name_escape(name):
-    return name.replace(".", "_").replace("-", "_").replace(" ", "_")
+    return re.sub("[^a-zA-Z0-9_:]", "_", name)
 
 def setGaugeValue(name, labels, labelValues, value, description = ""):
     with GAUGES_LOCK:
-        name = metric_name_escape(name)
-        if name not in GAUGES:
-            GAUGES[name] = Gauge(name, description, labels)
-        if labels:
-            GAUGES[name].labels(*labelValues).set(value)
-            GAUGES_LABELS_LAST_UPDATE[(name, tuple(labelValues))] = time.time()
-        else:
-            GAUGES[name].set(value)
-            GAUGES_LAST_UPDATE[name] = time.time()
+        try:
+            name = metric_name_escape(name)
+            if name not in GAUGES:
+                GAUGES[name] = Gauge(name, description, labels)
+            if labels:
+                GAUGES[name].labels(*labelValues).set(value)
+                GAUGES_LABELS_LAST_UPDATE[(name, tuple(labelValues))] = time.time()
+            else:
+                GAUGES[name].set(value)
+                GAUGES_LAST_UPDATE[name] = time.time()
+        except ValueError as err:
+            print(f"Unexpected {err=}, {type(err)=}")
+            print("Metric name: " + name + ", Metric labels: " + str(labels))
+            pass
 
 def process_metric(host, job_name, container_name, task_name, metric_class_name, metric_name, metric_value):
     try:
@@ -85,8 +90,12 @@ def process_message(message, consumer, brokers, include_jobs_re):
 
 def consume_topic(consumer, brokers, include_jobs_re):
     print('Starting consumption loop.')
-    for message in consumer:
-        process_message(message, consumer, brokers, include_jobs_re)
+    while True:
+        messages = consumer.poll()
+        for topic_partition in messages.items():
+            partition_messages = topic_partition[1]
+            for message in partition_messages:
+                process_message(message, consumer, brokers, include_jobs_re)
 
 def set_gauges_ttl(ttl):
     global GAUGES_TTL
@@ -100,6 +109,7 @@ def start_ttl_watchdog_thread():
 def ttl_watchdog_unregister_old_metrics(now):
     for (name, last_update) in list(GAUGES_LAST_UPDATE.items()):
         if now - last_update > GAUGES_TTL:
+            print('INFO: Samza metric deleted: %s' % (name))
             REGISTRY.unregister(GAUGES[name])
             del GAUGES[name]
             del GAUGES_LAST_UPDATE[name]
@@ -129,8 +139,6 @@ def main():
                         help='port to serve metrics to Prometheus (default: 8080)')
     parser.add_argument('--topic', metavar='TOPIC', type=str, nargs='?',default='samza-metrics',
                         help='name of topic to consume (default: "samza-metrics")')
-    parser.add_argument('--from-beginning', action='store_const', const=True,
-                        help='consume topic from offset 0')
     parser.add_argument('--include-jobs-regex', metavar='INCLUDE_JOBS_REGEX', type=str, nargs='?', default='.*',
                         help='only include jobs which match the given regex')
     parser.add_argument('--ttl', metavar='GAUGES_TTL', type=int, nargs='?',
@@ -138,14 +146,11 @@ def main():
     args = parser.parse_args()
 
     brokers = args.brokers.split(',')
-    consumer = KafkaConsumer(args.topic, group_id=KAFKA_GROUP_ID, bootstrap_servers=brokers)
+    consumer = KafkaConsumer(args.topic, group_id=KAFKA_GROUP_ID, bootstrap_servers=brokers, auto_offset_reset='earliest')
     start_http_server(args.port)
     set_gauges_ttl(args.ttl)
-    if args.from_beginning:
-        consumer.seek_to_beginning()
-
     start_ttl_watchdog_thread()
 
     try:
diff --git a/samza_prometheus_exporter/samza.py b/samza_prometheus_exporter/samza.py
index 51612ec..d4feb2a 100644
--- a/samza_prometheus_exporter/samza.py
+++ b/samza_prometheus_exporter/samza.py
@@ -19,6 +19,14 @@ def store_metric(match):
         }
     }
 
+def operator_metric(match):
+    return {
+        'name': match.group(2),
+        'labels': {
+            'operator': match.group(1)
+        }
+    }
+
 def system_metric(match):
     return {
         'name': match.group(2),
@@ -63,6 +71,16 @@ def system_stream_partition_metric(match):
             'system': match.group(1),
             'stream': match.group(2),
             'partition': match.group(3)
         }
     }
+
+def system_stream_partition_metric_with_prefix(match):
+    return {
+        'name': 'system-stream-partition-' + match.group(4),
+        'labels': {
+            'system': match.group(1),
+            'stream': match.group(2),
+            'partition': match.group(3)
+        }
+    }
 
 def partition_metric(match):
     return {
@@ -233,6 +251,9 @@ def elasticsearch_system_metric(match):
         (re('partition (\d+)-(lookup-restore-time)'), partition_metric),
         (re('partition (\d+)-(restore-time)'), partition_metric),
         (re('partition (\d+)-(.*)-(restore-time)'), partition_store_metric),
+        (re('systemstreampartition \[(.*), (.*), (.*)\]-(object-restore-time)'), system_stream_partition_metric_with_prefix),
+        (re('systemstreampartition \[(.*), (.*), (.*)\]-(lookup-restore-time)'), system_stream_partition_metric_with_prefix),
+        (re('systemstreampartition \[(.*), (.*), (.*)\]-(restore-time)'), system_stream_partition_metric_with_prefix),
     },
     'org.apache.samza.storage.kv.KeyValueStoreMetrics': {
         (re('(.*)-(bytes-read)'), store_metric),
@@ -408,4 +429,9 @@ def elasticsearch_system_metric(match):
     'job-coordinator': {
         (re('(.*)-(partitionCount)'), store_metric),
     },
+    'org.apache.samza.operators.impl.OperatorImpl': {
+        (re('(.*)-(messages)'), operator_metric),
+        (re('(.*)-(handle-message-ns)'), operator_metric),
+        (re('(.*)-(handle-timer-ns)'), operator_metric),
+    },
 }
diff --git a/setup.py b/setup.py
index 17f4b4b..8d4683b 100644
--- a/setup.py
+++ b/setup.py
@@ -20,8 +20,8 @@
     keywords='monitoring prometheus exporter apache samza',
     packages=find_packages(exclude=['tests']),
     install_requires=[
-        'kafka-python',
-        'prometheus-client>=0.0.13'
+        'kafka-python==2.0.2',
+        'prometheus-client>=0.12.0'
     ],
     entry_points={
         'console_scripts': [