Skip to content

Commit 7b99592

Browse files
Added compression support in pushgateway (#1144)
* feat(): Added compression support in pushgateway Signed-off-by: ritesh-avesha <ritesh@aveshasystems.com> * fix(): Incorporated changes for PR review comments Signed-off-by: ritesh-avesha <ritesh@aveshasystems.com> * fix(): Incorporated changes for PR review comments, lint issues Signed-off-by: ritesh-avesha <ritesh@aveshasystems.com> * fix(): lint issues Signed-off-by: ritesh-avesha <ritesh@aveshasystems.com> --------- Signed-off-by: ritesh-avesha <ritesh@aveshasystems.com>
1 parent 13df124 commit 7b99592

File tree

4 files changed

+96
-10
lines changed

4 files changed

+96
-10
lines changed

docs/content/exporting/pushgateway.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,20 @@ g.set_to_current_time()
5454
push_to_gateway('localhost:9091', job='batchA', registry=registry, handler=my_auth_handler)
5555
```
5656

57+
# Compressing data before sending to pushgateway
58+
Pushgateway (version >= 1.5.0) supports gzip and snappy compression (v > 1.6.0). This can help in network constrained environments.
59+
To compress a push request, set the `compression` argument to `'gzip'` or `'snappy'`:
60+
```python
61+
push_to_gateway(
62+
'localhost:9091',
63+
job='batchA',
64+
registry=registry,
65+
handler=my_auth_handler,
66+
compression='gzip',
67+
)
68+
```
69+
Snappy compression requires the optional [`python-snappy`](https://github.com/andrix/python-snappy) package.
70+
5771
TLS Auth is also supported when using the push gateway with a special handler.
5872

5973
```python

prometheus_client/exposition.py

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
import ssl
1010
import sys
1111
import threading
12-
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
12+
from typing import (
13+
Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union,
14+
)
1315
from urllib.error import HTTPError
1416
from urllib.parse import parse_qs, quote_plus, urlparse
1517
from urllib.request import (
@@ -22,6 +24,13 @@
2224
from .registry import Collector, REGISTRY
2325
from .utils import floatToGoString, parse_version
2426

27+
try:
28+
import snappy # type: ignore
29+
SNAPPY_AVAILABLE = True
30+
except ImportError:
31+
snappy = None # type: ignore
32+
SNAPPY_AVAILABLE = False
33+
2534
__all__ = (
2635
'CONTENT_TYPE_LATEST',
2736
'CONTENT_TYPE_PLAIN_0_0_4',
@@ -46,6 +55,7 @@
4655
"""Content type of the latest format"""
4756

4857
CONTENT_TYPE_LATEST = CONTENT_TYPE_PLAIN_1_0_0
58+
CompressionType = Optional[Literal['gzip', 'snappy']]
4959

5060

5161
class _PrometheusRedirectHandler(HTTPRedirectHandler):
@@ -596,6 +606,7 @@ def push_to_gateway(
596606
grouping_key: Optional[Dict[str, Any]] = None,
597607
timeout: Optional[float] = 30,
598608
handler: Callable = default_handler,
609+
compression: CompressionType = None,
599610
) -> None:
600611
"""Push metrics to the given pushgateway.
601612
@@ -632,10 +643,12 @@ def push_to_gateway(
632643
failure.
633644
'content' is the data which should be used to form the HTTP
634645
Message Body.
646+
`compression` selects the payload compression. Supported values are 'gzip'
647+
and 'snappy'. Defaults to None (no compression).
635648
636649
This overwrites all metrics with the same job and grouping_key.
637650
This uses the PUT HTTP method."""
638-
_use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler)
651+
_use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler, compression)
639652

640653

641654
def pushadd_to_gateway(
@@ -645,6 +658,7 @@ def pushadd_to_gateway(
645658
grouping_key: Optional[Dict[str, Any]] = None,
646659
timeout: Optional[float] = 30,
647660
handler: Callable = default_handler,
661+
compression: CompressionType = None,
648662
) -> None:
649663
"""PushAdd metrics to the given pushgateway.
650664
@@ -663,10 +677,12 @@ def pushadd_to_gateway(
663677
will be carried out by a default handler.
664678
See the 'prometheus_client.push_to_gateway' documentation
665679
for implementation requirements.
680+
`compression` selects the payload compression. Supported values are 'gzip'
681+
and 'snappy'. Defaults to None (no compression).
666682
667683
This replaces metrics with the same name, job and grouping_key.
668684
This uses the POST HTTP method."""
669-
_use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler)
685+
_use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler, compression)
670686

671687

672688
def delete_from_gateway(
@@ -706,6 +722,7 @@ def _use_gateway(
706722
grouping_key: Optional[Dict[str, Any]],
707723
timeout: Optional[float],
708724
handler: Callable,
725+
compression: CompressionType = None,
709726
) -> None:
710727
gateway_url = urlparse(gateway)
711728
# See https://bugs.python.org/issue27657 for details on urlparse in py>=3.7.6.
@@ -715,24 +732,53 @@ def _use_gateway(
715732
gateway = gateway.rstrip('/')
716733
url = '{}/metrics/{}/{}'.format(gateway, *_escape_grouping_key("job", job))
717734

718-
data = b''
719-
if method != 'DELETE':
720-
if registry is None:
721-
registry = REGISTRY
722-
data = generate_latest(registry)
723-
724735
if grouping_key is None:
725736
grouping_key = {}
726737
url += ''.join(
727738
'/{}/{}'.format(*_escape_grouping_key(str(k), str(v)))
728739
for k, v in sorted(grouping_key.items()))
729740

741+
data = b''
742+
headers: List[Tuple[str, str]] = []
743+
if method != 'DELETE':
744+
if registry is None:
745+
registry = REGISTRY
746+
data = generate_latest(registry)
747+
data, headers = _compress_payload(data, compression)
748+
else:
749+
# DELETE requests still need Content-Type header per test expectations
750+
headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)]
751+
if compression is not None:
752+
raise ValueError('Compression is not supported for DELETE requests.')
753+
730754
handler(
731755
url=url, method=method, timeout=timeout,
732-
headers=[('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)], data=data,
756+
headers=headers, data=data,
733757
)()
734758

735759

760+
def _compress_payload(data: bytes, compression: CompressionType) -> Tuple[bytes, List[Tuple[str, str]]]:
761+
headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)]
762+
if compression is None:
763+
return data, headers
764+
765+
encoding = compression.lower()
766+
if encoding == 'gzip':
767+
headers.append(('Content-Encoding', 'gzip'))
768+
return gzip.compress(data), headers
769+
if encoding == 'snappy':
770+
if not SNAPPY_AVAILABLE:
771+
raise RuntimeError('Snappy compression requires the python-snappy package to be installed.')
772+
headers.append(('Content-Encoding', 'snappy'))
773+
compressor = snappy.StreamCompressor()
774+
compressed = compressor.compress(data)
775+
flush = getattr(compressor, 'flush', None)
776+
if callable(flush):
777+
compressed += flush()
778+
return compressed, headers
779+
raise ValueError(f"Unsupported compression type: {compression}")
780+
781+
736782
def _escape_grouping_key(k, v):
737783
if v == "":
738784
# Per https://github.com/prometheus/pushgateway/pull/346.

tests/test_exposition.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import gzip
12
from http.server import BaseHTTPRequestHandler, HTTPServer
23
import os
34
import threading
@@ -404,6 +405,30 @@ def test_push_with_trailing_slash(self):
404405

405406
self.assertNotIn('//', self.requests[0][0].path)
406407

408+
def test_push_with_gzip_compression(self):
409+
push_to_gateway(self.address, "my_job", self.registry, compression='gzip')
410+
request, body = self.requests[0]
411+
self.assertEqual(request.headers.get('content-encoding'), 'gzip')
412+
decompressed = gzip.decompress(body)
413+
self.assertEqual(decompressed, b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
414+
415+
def test_push_with_snappy_compression(self):
416+
snappy = pytest.importorskip('snappy')
417+
push_to_gateway(self.address, "my_job", self.registry, compression='snappy')
418+
request, body = self.requests[0]
419+
self.assertEqual(request.headers.get('content-encoding'), 'snappy')
420+
decompressor = snappy.StreamDecompressor()
421+
decompressed = decompressor.decompress(body)
422+
flush = getattr(decompressor, 'flush', None)
423+
if callable(flush):
424+
decompressed += flush()
425+
self.assertEqual(decompressed, b'# HELP g help\n# TYPE g gauge\ng 0.0\n')
426+
427+
def test_push_with_invalid_compression(self):
428+
with self.assertRaisesRegex(ValueError, 'Unsupported compression type'):
429+
push_to_gateway(self.address, "my_job", self.registry, compression='brotli')
430+
self.assertEqual(self.requests, [])
431+
407432
def test_instance_ip_grouping_key(self):
408433
self.assertTrue('' != instance_ip_grouping_key()['instance'])
409434

tox.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ deps =
1010
attrs
1111
{py3.9,pypy3.9}: twisted
1212
{py3.9,pypy3.9}: aiohttp
13+
{py3.9}: python-snappy
1314
commands = coverage run --parallel -m pytest {posargs}
1415

1516
[testenv:py3.9-nooptionals]

0 commit comments

Comments
 (0)