diff --git a/docs/content/exporting/pushgateway.md b/docs/content/exporting/pushgateway.md index bf5eb112..d9f9a945 100644 --- a/docs/content/exporting/pushgateway.md +++ b/docs/content/exporting/pushgateway.md @@ -54,6 +54,20 @@ g.set_to_current_time() push_to_gateway('localhost:9091', job='batchA', registry=registry, handler=my_auth_handler) ``` +# Compressing data before sending to pushgateway +Pushgateway (version >= 1.5.0) supports gzip and snappy compression (v > 1.6.0). This can help in network constrained environments. +To compress a push request, set the `compression` argument to `'gzip'` or `'snappy'`: +```python +push_to_gateway( + 'localhost:9091', + job='batchA', + registry=registry, + handler=my_auth_handler, + compression='gzip', +) +``` +Snappy compression requires the optional [`python-snappy`](https://github.com/andrix/python-snappy) package. + TLS Auth is also supported when using the push gateway with a special handler. ```python diff --git a/prometheus_client/exposition.py b/prometheus_client/exposition.py index 0d471707..404b1282 100644 --- a/prometheus_client/exposition.py +++ b/prometheus_client/exposition.py @@ -9,7 +9,9 @@ import ssl import sys import threading -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union +from typing import ( + Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Union, +) from urllib.error import HTTPError from urllib.parse import parse_qs, quote_plus, urlparse from urllib.request import ( @@ -22,6 +24,13 @@ from .registry import CollectorRegistry, REGISTRY from .utils import floatToGoString, parse_version +try: + import snappy # type: ignore + SNAPPY_AVAILABLE = True +except ImportError: + snappy = None # type: ignore + SNAPPY_AVAILABLE = False + __all__ = ( 'CONTENT_TYPE_LATEST', 'CONTENT_TYPE_PLAIN_0_0_4', @@ -46,6 +55,7 @@ """Content type of the latest format""" CONTENT_TYPE_LATEST = CONTENT_TYPE_PLAIN_1_0_0 +CompressionType = Optional[Literal['gzip', 'snappy']] class _PrometheusRedirectHandler(HTTPRedirectHandler): @@ -596,6 +606,7 @@ def push_to_gateway( grouping_key: Optional[Dict[str, Any]] = None, timeout: Optional[float] = 30, handler: Callable = default_handler, + compression: CompressionType = None, ) -> None: """Push metrics to the given pushgateway. @@ -632,10 +643,12 @@ def push_to_gateway( failure. 'content' is the data which should be used to form the HTTP Message Body. + `compression` selects the payload compression. Supported values are 'gzip' + and 'snappy'. Defaults to None (no compression). This overwrites all metrics with the same job and grouping_key. This uses the PUT HTTP method.""" - _use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler) + _use_gateway('PUT', gateway, job, registry, grouping_key, timeout, handler, compression) def pushadd_to_gateway( @@ -645,6 +658,7 @@ def pushadd_to_gateway( grouping_key: Optional[Dict[str, Any]] = None, timeout: Optional[float] = 30, handler: Callable = default_handler, + compression: CompressionType = None, ) -> None: """PushAdd metrics to the given pushgateway. @@ -663,10 +677,12 @@ def pushadd_to_gateway( will be carried out by a default handler. See the 'prometheus_client.push_to_gateway' documentation for implementation requirements. + `compression` selects the payload compression. Supported values are 'gzip' + and 'snappy'. Defaults to None (no compression). This replaces metrics with the same name, job and grouping_key. This uses the POST HTTP method.""" - _use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler) + _use_gateway('POST', gateway, job, registry, grouping_key, timeout, handler, compression) def delete_from_gateway( @@ -706,6 +722,7 @@ def _use_gateway( grouping_key: Optional[Dict[str, Any]], timeout: Optional[float], handler: Callable, + compression: CompressionType = None, ) -> None: gateway_url = urlparse(gateway) # See https://bugs.python.org/issue27657 for details on urlparse in py>=3.7.6. @@ -715,24 +732,53 @@ def _use_gateway( gateway = gateway.rstrip('/') url = '{}/metrics/{}/{}'.format(gateway, *_escape_grouping_key("job", job)) - data = b'' - if method != 'DELETE': - if registry is None: - registry = REGISTRY - data = generate_latest(registry) - if grouping_key is None: grouping_key = {} url += ''.join( '/{}/{}'.format(*_escape_grouping_key(str(k), str(v))) for k, v in sorted(grouping_key.items())) + data = b'' + headers: List[Tuple[str, str]] = [] + if method != 'DELETE': + if registry is None: + registry = REGISTRY + data = generate_latest(registry) + data, headers = _compress_payload(data, compression) + else: + # DELETE requests still need Content-Type header per test expectations + headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)] + if compression is not None: + raise ValueError('Compression is not supported for DELETE requests.') + handler( url=url, method=method, timeout=timeout, - headers=[('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)], data=data, + headers=headers, data=data, )() +def _compress_payload(data: bytes, compression: CompressionType) -> Tuple[bytes, List[Tuple[str, str]]]: + headers = [('Content-Type', CONTENT_TYPE_PLAIN_0_0_4)] + if compression is None: + return data, headers + + encoding = compression.lower() + if encoding == 'gzip': + headers.append(('Content-Encoding', 'gzip')) + return gzip.compress(data), headers + if encoding == 'snappy': + if not SNAPPY_AVAILABLE: + raise RuntimeError('Snappy compression requires the python-snappy package to be installed.') + headers.append(('Content-Encoding', 'snappy')) + compressor = snappy.StreamCompressor() + compressed = compressor.compress(data) + flush = getattr(compressor, 'flush', None) + if callable(flush): + compressed += flush() + return compressed, headers + raise ValueError(f"Unsupported compression type: {compression}") + + def _escape_grouping_key(k, v): if v == "": # Per https://github.com/prometheus/pushgateway/pull/346. diff --git a/tests/test_exposition.py b/tests/test_exposition.py index 3dd5e378..aceff738 100644 --- a/tests/test_exposition.py +++ b/tests/test_exposition.py @@ -1,3 +1,4 @@ +import gzip from http.server import BaseHTTPRequestHandler, HTTPServer import os import threading @@ -404,6 +405,30 @@ def test_push_with_trailing_slash(self): self.assertNotIn('//', self.requests[0][0].path) + def test_push_with_gzip_compression(self): + push_to_gateway(self.address, "my_job", self.registry, compression='gzip') + request, body = self.requests[0] + self.assertEqual(request.headers.get('content-encoding'), 'gzip') + decompressed = gzip.decompress(body) + self.assertEqual(decompressed, b'# HELP g help\n# TYPE g gauge\ng 0.0\n') + + def test_push_with_snappy_compression(self): + snappy = pytest.importorskip('snappy') + push_to_gateway(self.address, "my_job", self.registry, compression='snappy') + request, body = self.requests[0] + self.assertEqual(request.headers.get('content-encoding'), 'snappy') + decompressor = snappy.StreamDecompressor() + decompressed = decompressor.decompress(body) + flush = getattr(decompressor, 'flush', None) + if callable(flush): + decompressed += flush() + self.assertEqual(decompressed, b'# HELP g help\n# TYPE g gauge\ng 0.0\n') + + def test_push_with_invalid_compression(self): + with self.assertRaisesRegex(ValueError, 'Unsupported compression type'): + push_to_gateway(self.address, "my_job", self.registry, compression='brotli') + self.assertEqual(self.requests, []) + def test_instance_ip_grouping_key(self): self.assertTrue('' != instance_ip_grouping_key()['instance']) diff --git a/tox.ini b/tox.ini index ccf95cc2..45a6baf3 100644 --- a/tox.ini +++ b/tox.ini @@ -10,6 +10,7 @@ deps = attrs {py3.9,pypy3.9}: twisted {py3.9,pypy3.9}: aiohttp + {py3.9}: python-snappy commands = coverage run --parallel -m pytest {posargs} [testenv:py3.9-nooptionals]