Skip to content

Commit

Permalink
Merge pull request #13 from talatuyarer/master
Browse files Browse the repository at this point in the history
Upgrade Dependencies and Added new Metrics
  • Loading branch information
Nicolas Maquet authored Nov 21, 2021
2 parents 0959866 + 962e9ce commit 457859b
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 23 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ ENV PORT=8080
ENV BROKERS=dockerhost
ENV TOPIC=samza-metrics
ENV INCLUDE_JOBS_REGEX=.*
ENV GAUGES_TTL=600

CMD python -u /usr/local/bin/samza-prometheus-exporter --brokers $BROKERS --port $PORT --topic $TOPIC --include-jobs-regex "$INCLUDE_JOBS_REGEX"
CMD python -u /usr/local/bin/samza-prometheus-exporter --brokers $BROKERS --port $PORT --topic $TOPIC --include-jobs-regex "$INCLUDE_JOBS_REGEX" --ttl $GAUGES_TTL
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
kafka_python==1.3.3
prometheus_client==0.0.19
kafka_python==2.0.2
prometheus_client==0.12.0
41 changes: 23 additions & 18 deletions samza_prometheus_exporter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,24 @@
GAUGES_TTL = 60

def metric_name_escape(name):
    """Return *name* with every disallowed character replaced by ``_``.

    Any character outside ``[a-zA-Z0-9_:]`` is substituted, so dots, dashes,
    spaces, slashes, brackets and the like all collapse to underscores.
    Allowed characters are left untouched; in particular a leading digit is
    not altered by this function.
    """
    return re.sub(r"[^a-zA-Z0-9_:]", "_", name)

def setGaugeValue(name, labels, labelValues, value, description = ""):
    """Set the gauge stored under the escaped *name* to *value*.

    The gauge is created on first use from the escaped name, *description*
    and *labels*. When *labels* is truthy, the value is set on the child
    selected by *labelValues* and the update time is recorded under the
    ``(name, tuple(labelValues))`` key; otherwise the value is set on the
    gauge itself and the update time is recorded under ``name``.

    The whole operation runs while holding ``GAUGES_LOCK``. A ``ValueError``
    raised while creating or updating the gauge is printed together with the
    metric name and labels, and is not re-raised.
    """
    with GAUGES_LOCK:
        try:
            name = metric_name_escape(name)
            if name not in GAUGES:
                GAUGES[name] = Gauge(name, description, labels)
            if labels:
                GAUGES[name].labels(*labelValues).set(value)
                GAUGES_LABELS_LAST_UPDATE[(name, tuple(labelValues))] = time.time()
            else:
                GAUGES[name].set(value)
                GAUGES_LAST_UPDATE[name] = time.time()
        except ValueError as err:
            # Report and continue: the error is deliberately not propagated.
            print(f"Unexpected {err=}, {type(err)=}")
            print("Metric name: " + name + ", Metric labels: " + str(labels))

def process_metric(host, job_name, container_name, task_name, metric_class_name, metric_name, metric_value):
try:
Expand Down Expand Up @@ -85,8 +90,12 @@ def process_message(message, consumer, brokers, include_jobs_re):

def consume_topic(consumer, brokers, include_jobs_re):
    """Poll *consumer* forever and pass every received message on.

    Each call to ``consumer.poll`` returns a mapping whose values are lists
    of messages; every message is handed to ``process_message`` together
    with *consumer*, *brokers* and *include_jobs_re*. The function never
    returns normally; it ends only when an exception propagates out of the
    poll or out of message processing.
    """
    print('Starting consumption loop.')
    while True:
        # Block for a bounded time per poll. With the default timeout of 0
        # the call returns immediately and this loop would spin at full CPU
        # whenever the topic is idle.
        batches = consumer.poll(timeout_ms=1000)
        for partition_messages in batches.values():
            for message in partition_messages:
                process_message(message, consumer, brokers, include_jobs_re)

def set_gauges_ttl(ttl):
global GAUGES_TTL
Expand All @@ -100,6 +109,7 @@ def start_ttl_watchdog_thread():
def ttl_watchdog_unregister_old_metrics(now):
for (name, last_update) in list(GAUGES_LAST_UPDATE.items()):
if now - last_update > GAUGES_TTL:
print('INFO: Samza metric deleted: %s' % (name))
REGISTRY.unregister(GAUGES[name])
del GAUGES[name]
del GAUGES_LAST_UPDATE[name]
Expand Down Expand Up @@ -129,23 +139,18 @@ def main():
help='port to serve metrics to Prometheus (default: 8080)')
parser.add_argument('--topic', metavar='TOPIC', type=str, nargs='?',default='samza-metrics',
help='name of topic to consume (default: "samza-metrics")')
parser.add_argument('--from-beginning', action='store_const', const=True,
help='consume topic from offset 0')
parser.add_argument('--include-jobs-regex', metavar='INCLUDE_JOBS_REGEX', type=str, nargs='?', default='.*',
help='only include jobs which match the given regex')
parser.add_argument('--ttl', metavar='GAUGES_TTL', type=int, nargs='?',
help='time in seconds after which a metric (or label set) is no longer reported when not updated (default: 60s)')

args = parser.parse_args()
brokers = args.brokers.split(',')
consumer = KafkaConsumer(args.topic, group_id=KAFKA_GROUP_ID, bootstrap_servers=brokers)
consumer = KafkaConsumer(args.topic, group_id=KAFKA_GROUP_ID, bootstrap_servers=brokers, auto_offset_reset='earliest')
start_http_server(args.port)

set_gauges_ttl(args.ttl)

if args.from_beginning:
consumer.seek_to_beginning()

start_ttl_watchdog_thread()

try:
Expand Down
26 changes: 26 additions & 0 deletions samza_prometheus_exporter/samza.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ def store_metric(match):
}
}

def operator_metric(match):
    """Turn a two-group regex match into a metric descriptor.

    Group 1 of *match* becomes the value of the ``operator`` label and
    group 2 becomes the metric ``name``.
    """
    operator, metric = match.group(1, 2)
    return dict(name=metric, labels=dict(operator=operator))

def system_metric(match):
return {
'name': match.group(2),
Expand Down Expand Up @@ -63,6 +71,16 @@ def system_stream_partition_metric(match):
'partition': match.group(3)
}
}

def system_stream_partition_metric_with_prefix(match):
    """Turn a four-group regex match into a prefixed metric descriptor.

    Groups 1-3 of *match* become the ``system``, ``stream`` and
    ``partition`` labels. Group 4 is appended to the fixed
    ``system-stream-partition-`` prefix to form the metric ``name``.
    """
    system, stream, partition, suffix = match.group(1, 2, 3, 4)
    labels = {
        'system': system,
        'stream': stream,
        'partition': partition,
    }
    return {'name': 'system-stream-partition-' + suffix, 'labels': labels}

def partition_metric(match):
return {
Expand Down Expand Up @@ -233,6 +251,9 @@ def elasticsearch_system_metric(match):
(re('partition (\d+)-(lookup-restore-time)'), partition_metric),
(re('partition (\d+)-(restore-time)'), partition_metric),
(re('partition (\d+)-(.*)-(restore-time)'), partition_store_metric),
(re('systemstreampartition \[(.*), (.*), (.*)\]-(object-restore-time)'), system_stream_partition_metric_with_prefix),
(re('systemstreampartition \[(.*), (.*), (.*)\]-(lookup-restore-time)'), system_stream_partition_metric_with_prefix),
(re('systemstreampartition \[(.*), (.*), (.*)\]-(restore-time)'), system_stream_partition_metric_with_prefix),
},
'org.apache.samza.storage.kv.KeyValueStoreMetrics': {
(re('(.*)-(bytes-read)'), store_metric),
Expand Down Expand Up @@ -408,4 +429,9 @@ def elasticsearch_system_metric(match):
'job-coordinator': {
(re('(.*)-(partitionCount)'), store_metric),
},
'org.apache.samza.operators.impl.OperatorImpl': {
(re('(.*)-(messages)'), operator_metric),
(re('(.*)-(handle-message-ns)'), operator_metric),
(re('(.*)-(handle-timer-ns)'), operator_metric),
},
}
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
keywords='monitoring prometheus exporter apache samza',
packages=find_packages(exclude=['tests']),
install_requires=[
'kafka-python',
'prometheus-client>=0.0.13'
'kafka-python=2.0.2',
'prometheus-client>=0.12.0'
],
entry_points={
'console_scripts': [
Expand Down

0 comments on commit 457859b

Please sign in to comment.