From f32aa9b5ea10467637e7592d8dd7529d01276ae0 Mon Sep 17 00:00:00 2001 From: Matthew Marion Date: Fri, 14 Nov 2025 13:08:46 -0500 Subject: [PATCH 1/4] try upgrading to support 3.14 --- .../workflows/ci-build-release-wheels.yaml | 3 + .github/workflows/ci-pr-validation.yaml | 5 +- README.md | 2 +- dependencies.yaml | 2 +- pulsar/__init__.py | 670 +++++++++++------- setup.py | 53 +- 6 files changed, 447 insertions(+), 288 deletions(-) diff --git a/.github/workflows/ci-build-release-wheels.yaml b/.github/workflows/ci-build-release-wheels.yaml index 47155a5..79a179d 100644 --- a/.github/workflows/ci-build-release-wheels.yaml +++ b/.github/workflows/ci-build-release-wheels.yaml @@ -46,6 +46,7 @@ jobs: - {version: '3.11', spec: 'cp311-cp311'} - {version: '3.12', spec: 'cp312-cp312'} - {version: '3.13', spec: 'cp313-cp313'} + - {version: '3.14', spec: 'cp314-cp314'} cpu: - {arch: 'x86_64', platform: 'x86_64'} - {arch: 'aarch64', platform: 'arm64'} @@ -106,6 +107,7 @@ jobs: - {version: '3.11', version_long: '3.11.11'} - {version: '3.12', version_long: '3.12.8'} - {version: '3.13', version_long: '3.13.1'} + - {version: '3.14', version_long: '3.14.0'} steps: - name: checkout @@ -136,6 +138,7 @@ jobs: - {version: '3.11'} - {version: '3.12'} - {version: '3.13'} + - {version: '3.14'} steps: - uses: actions/checkout@v3 diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml index 60a9e7d..121210b 100644 --- a/.github/workflows/ci-pr-validation.yaml +++ b/.github/workflows/ci-pr-validation.yaml @@ -54,7 +54,7 @@ jobs: strategy: fail-fast: false matrix: - version: ['3.9', '3.13'] + version: ['3.9', '3.13', '3.14'] steps: - name: checkout @@ -116,6 +116,7 @@ jobs: - {name: 'manylinux_musl', py_suffix: '-alpine'} python: - {version: '3.13', spec: 'cp313-cp313'} + - {version: '3.14', spec: 'cp314-cp314'} cpu: - {arch: 'x86_64', platform: 'x86_64'} @@ -163,6 +164,7 @@ jobs: matrix: py: - {version: '3.13', version_long: '3.13.1'} + - {version: 
'3.14', version_long: '3.14.0'} steps: - name: checkout @@ -185,6 +187,7 @@ jobs: matrix: python: - version: '3.12' + - version: '3.14' steps: - uses: actions/checkout@v3 diff --git a/README.md b/README.md index f7ec0b0..b964d2b 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,7 @@ Pulsar Python clients support a variety of Pulsar features to enable building ap ## Requirements -- Python 3.9, 3.10, 3.11, 3.12 or 3.13 +- Python 3.9, 3.10, 3.11, 3.12, 3.13, or 3.14 - A C++ compiler that supports C++11 - CMake >= 3.18 - [Pulsar C++ client library](https://github.com/apache/pulsar-client-cpp) diff --git a/dependencies.yaml b/dependencies.yaml index 20dc5f5..71d31c8 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -18,6 +18,6 @@ # pulsar-cpp: 3.7.2 -pybind11: 2.10.1 +pybind11: 3.0.1 # The OpenSSL dependency is only used when building Python from source openssl: 1.1.1q diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 7dda4cd..3dc01b4 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -21,7 +21,7 @@ The Pulsar Python client library is based on the existing C++ client library. All the same features are exposed through the Python interface. -Currently, the supported Python versions are 3.7, 3.8, 3.9 and 3.10. +Currently, the supported Python versions are 3.9, 3.10, 3.11, 3.12, 3.13, and 3.14. 
================= Install from PyPI @@ -43,28 +43,40 @@ """ import logging -from typing import Callable, List, Tuple, Optional, Union +from typing import Callable, List, Optional, Tuple, Union import _pulsar +from _pulsar import ( + BatchingType, + BatchReceivePolicy, + CompressionType, + ConsumerCryptoFailureAction, + ConsumerType, + DeadLetterPolicyBuilder, # noqa: F401 + InitialPosition, + KeySharedMode, + KeySharedPolicy, + LoggerLevel, + PartitionsRoutingMode, + ProducerAccessMode, + RegexSubscriptionMode, + Result, +) -from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \ - LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode, RegexSubscriptionMode, \ - DeadLetterPolicyBuilder, ConsumerCryptoFailureAction # noqa: F401 - +from pulsar import schema from pulsar.__about__ import __version__ - from pulsar.exceptions import * +from pulsar.functions.context import Context +from pulsar.functions.function import Function +from pulsar.functions.serde import IdentitySerDe, PickleSerDe, SerDe from pulsar.schema.schema import BytesSchema from pulsar.tableview import TableView -from pulsar.functions.function import Function -from pulsar.functions.context import Context -from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe -from pulsar import schema _schema = schema import re -_retype = type(re.compile('x')) + +_retype = type(re.compile("x")) from datetime import timedelta @@ -83,7 +95,9 @@ class MessageId: """ def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1): - self._msg_id: _pulsar.MessageId = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index) + self._msg_id: _pulsar.MessageId = _pulsar.MessageId( + partition, ledger_id, entry_id, batch_index + ) earliest = _pulsar.MessageId.earliest latest = _pulsar.MessageId.latest @@ -166,6 +180,7 @@ def wrap(cls, msg_id: _pulsar.MessageId): self._msg_id = msg_id return self + class Message: 
""" Message objects are returned by a consumer, either by calling `receive` or @@ -253,7 +268,6 @@ def _wrap(_message): class MessageBatch: - def __init__(self): self._msg_batch = _pulsar.MessageBatch() @@ -277,6 +291,7 @@ class Authentication: Authentication provider object. Used to load authentication from an external shared library. """ + def __init__(self, dynamicLibPath, authParamsString): """ Create the authentication provider instance. @@ -289,8 +304,8 @@ def __init__(self, dynamicLibPath, authParamsString): authParamsString: str Comma-separated list of provider-specific configuration params """ - _check_type(str, dynamicLibPath, 'dynamicLibPath') - _check_type(str, authParamsString, 'authParamsString') + _check_type(str, dynamicLibPath, "dynamicLibPath") + _check_type(str, authParamsString, "authParamsString") self.auth = _pulsar.Authentication.create(dynamicLibPath, authParamsString) @@ -298,6 +313,7 @@ class AuthenticationTLS(Authentication): """ TLS Authentication implementation """ + def __init__(self, certificate_path, private_key_path): """ Create the TLS authentication provider instance. @@ -310,8 +326,8 @@ def __init__(self, certificate_path, private_key_path): private_key_path: str Path to private TLS key """ - _check_type(str, certificate_path, 'certificate_path') - _check_type(str, private_key_path, 'private_key_path') + _check_type(str, certificate_path, "certificate_path") + _check_type(str, private_key_path, "private_key_path") self.auth = _pulsar.AuthenticationTLS.create(certificate_path, private_key_path) @@ -319,6 +335,7 @@ class AuthenticationToken(Authentication): """ Token based authentication implementation """ + def __init__(self, token): """ Create the token authentication provider instance. 
@@ -330,7 +347,9 @@ def __init__(self, token): A string containing the token or a functions that provides a string with the token """ if not (isinstance(token, str) or callable(token)): - raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'") + raise ValueError( + "Argument token is expected to be of type 'str' or a function returning 'str'" + ) self.auth = _pulsar.AuthenticationToken.create(token) @@ -338,6 +357,7 @@ class AuthenticationAthenz(Authentication): """ Athenz Authentication implementation """ + def __init__(self, auth_params_string): """ Create the Athenz authentication provider instance. @@ -348,13 +368,15 @@ def __init__(self, auth_params_string): auth_params_string: str JSON encoded configuration for Athenz client """ - _check_type(str, auth_params_string, 'auth_params_string') + _check_type(str, auth_params_string, "auth_params_string") self.auth = _pulsar.AuthenticationAthenz.create(auth_params_string) + class AuthenticationOauth2(Authentication): """ Oauth2 Authentication implementation """ + def __init__(self, auth_params_string: str): """ Create the Oauth2 authentication provider instance. @@ -394,14 +416,18 @@ def __init__(self, auth_params_string: str): JSON encoded configuration for Oauth2 client """ - _check_type(str, auth_params_string, 'auth_params_string') + _check_type(str, auth_params_string, "auth_params_string") self.auth = _pulsar.AuthenticationOauth2.create(auth_params_string) + class AuthenticationBasic(Authentication): """ Basic Authentication implementation """ - def __init__(self, username=None, password=None, method='basic', auth_params_string=None): + + def __init__( + self, username=None, password=None, method="basic", auth_params_string=None + ): """ Create the Basic authentication provider instance. @@ -430,22 +456,26 @@ def __init__(self, username=None, password=None, method='basic', auth_params_str "basic" by default. 
""" if auth_params_string is not None: - _check_type(str, auth_params_string, 'auth_params_string') + _check_type(str, auth_params_string, "auth_params_string") self.auth = _pulsar.AuthenticationBasic.create(auth_params_string) else: - _check_type(str, username, 'username') - _check_type(str, password, 'password') - _check_type(str, method, 'method') + _check_type(str, username, "username") + _check_type(str, password, "password") + _check_type(str, method, "method") self.auth = _pulsar.AuthenticationBasic.create(username, password, method) + class ConsumerDeadLetterPolicy: """ Configuration for the "dead letter queue" feature in consumer. """ - def __init__(self, - max_redeliver_count: int, - dead_letter_topic: str = None, - initial_subscription_name: str = None): + + def __init__( + self, + max_redeliver_count: int, + dead_letter_topic: str = None, + initial_subscription_name: str = None, + ): """ Wrapper DeadLetterPolicy. @@ -497,10 +527,12 @@ def policy(self): """ return self._policy + class CryptoKeyReader: """ Default crypto key reader implementation """ + def __init__(self, public_key_path, private_key_path): """ Create crypto key reader. @@ -513,9 +545,12 @@ def __init__(self, public_key_path, private_key_path): private_key_path: str Path to private key """ - _check_type(str, public_key_path, 'public_key_path') - _check_type(str, private_key_path, 'private_key_path') - self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path) + _check_type(str, public_key_path, "public_key_path") + _check_type(str, private_key_path, "private_key_path") + self.cryptoKeyReader = _pulsar.CryptoKeyReader( + public_key_path, private_key_path + ) + class Client: """ @@ -526,24 +561,26 @@ class Client: producers and consumers. 
""" - def __init__(self, service_url, - authentication=None, - operation_timeout_seconds=30, - io_threads=1, - message_listener_threads=1, - concurrent_lookup_requests=50000, - log_conf_file_path=None, - stats_interval_in_seconds=600, - use_tls=False, - tls_trust_certs_file_path=None, - tls_allow_insecure_connection=False, - tls_validate_hostname=False, - logger=None, - connection_timeout_ms=10000, - listener_name=None, - tls_private_key_file_path: Optional[str] = None, - tls_certificate_file_path: Optional[str] = None, - ): + def __init__( + self, + service_url, + authentication=None, + operation_timeout_seconds=30, + io_threads=1, + message_listener_threads=1, + concurrent_lookup_requests=50000, + log_conf_file_path=None, + stats_interval_in_seconds=600, + use_tls=False, + tls_trust_certs_file_path=None, + tls_allow_insecure_connection=False, + tls_validate_hostname=False, + logger=None, + connection_timeout_ms=10000, + listener_name=None, + tls_private_key_file_path: Optional[str] = None, + tls_certificate_file_path: Optional[str] = None, + ): """ Create a new Pulsar client instance. @@ -613,22 +650,24 @@ def __init__(self, service_url, tls_certificate_file_path: str, optional The path to the TLS certificate file. 
""" - _check_type(str, service_url, 'service_url') - _check_type_or_none(Authentication, authentication, 'authentication') - _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds') - _check_type(int, connection_timeout_ms, 'connection_timeout_ms') - _check_type(int, io_threads, 'io_threads') - _check_type(int, message_listener_threads, 'message_listener_threads') - _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests') - _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path') - _check_type(int, stats_interval_in_seconds, 'stats_interval_in_seconds') - _check_type(bool, use_tls, 'use_tls') - _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path') - _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection') - _check_type(bool, tls_validate_hostname, 'tls_validate_hostname') - _check_type_or_none(str, listener_name, 'listener_name') - _check_type_or_none(str, tls_private_key_file_path, 'tls_private_key_file_path') - _check_type_or_none(str, tls_certificate_file_path, 'tls_certificate_file_path') + _check_type(str, service_url, "service_url") + _check_type_or_none(Authentication, authentication, "authentication") + _check_type(int, operation_timeout_seconds, "operation_timeout_seconds") + _check_type(int, connection_timeout_ms, "connection_timeout_ms") + _check_type(int, io_threads, "io_threads") + _check_type(int, message_listener_threads, "message_listener_threads") + _check_type(int, concurrent_lookup_requests, "concurrent_lookup_requests") + _check_type_or_none(str, log_conf_file_path, "log_conf_file_path") + _check_type(int, stats_interval_in_seconds, "stats_interval_in_seconds") + _check_type(bool, use_tls, "use_tls") + _check_type_or_none(str, tls_trust_certs_file_path, "tls_trust_certs_file_path") + _check_type( + bool, tls_allow_insecure_connection, "tls_allow_insecure_connection" + ) + _check_type(bool, tls_validate_hostname, "tls_validate_hostname") + 
_check_type_or_none(str, listener_name, "listener_name") + _check_type_or_none(str, tls_private_key_file_path, "tls_private_key_file_path") + _check_type_or_none(str, tls_certificate_file_path, "tls_certificate_file_path") conf = _pulsar.ClientConfiguration() if authentication: @@ -647,16 +686,23 @@ def __init__(self, service_url, elif isinstance(logger, FileLogger): conf.set_file_logger(logger.log_level, logger.log_file) elif logger is not None: - raise ValueError("Logger is expected to be either None, logger.Logger, pulsar.ConsoleLogger or pulsar.FileLogger") + raise ValueError( + "Logger is expected to be either None, logger.Logger, pulsar.ConsoleLogger or pulsar.FileLogger" + ) if listener_name: conf.listener_name(listener_name) - if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'): + if ( + use_tls + or service_url.startswith("pulsar+ssl://") + or service_url.startswith("https://") + ): conf.use_tls(True) if tls_trust_certs_file_path: conf.tls_trust_certs_file_path(tls_trust_certs_file_path) else: import certifi + conf.tls_trust_certs_file_path(certifi.where()) conf.tls_allow_insecure_connection(tls_allow_insecure_connection) conf.tls_validate_hostname(tls_validate_hostname) @@ -670,36 +716,40 @@ def __init__(self, service_url, @staticmethod def _prepare_logger(logger): import logging + def log(level, message): old_threads = logging.logThreads logging.logThreads = False logger.log(logging.getLevelName(level), message) logging.logThreads = old_threads + return log - def create_producer(self, topic, - producer_name=None, - schema=schema.BytesSchema(), - initial_sequence_id=None, - send_timeout_millis=30000, - compression_type: CompressionType = CompressionType.NONE, - max_pending_messages=1000, - max_pending_messages_across_partitions=50000, - block_if_queue_full=False, - batching_enabled=False, - batching_max_messages=1000, - batching_max_allowed_size_in_bytes=128*1024, - batching_max_publish_delay_ms=10, - 
chunking_enabled=False, - message_routing_mode: PartitionsRoutingMode = PartitionsRoutingMode.RoundRobinDistribution, - lazy_start_partitioned_producers=False, - properties=None, - batching_type: BatchingType = BatchingType.Default, - encryption_key=None, - crypto_key_reader: Union[None, CryptoKeyReader] = None, - access_mode: ProducerAccessMode = ProducerAccessMode.Shared, - message_router: Callable[[Message, int], int]=None, - ): + def create_producer( + self, + topic, + producer_name=None, + schema=schema.BytesSchema(), + initial_sequence_id=None, + send_timeout_millis=30000, + compression_type: CompressionType = CompressionType.NONE, + max_pending_messages=1000, + max_pending_messages_across_partitions=50000, + block_if_queue_full=False, + batching_enabled=False, + batching_max_messages=1000, + batching_max_allowed_size_in_bytes=128 * 1024, + batching_max_publish_delay_ms=10, + chunking_enabled=False, + message_routing_mode: PartitionsRoutingMode = PartitionsRoutingMode.RoundRobinDistribution, + lazy_start_partitioned_producers=False, + properties=None, + batching_type: BatchingType = BatchingType.Default, + encryption_key=None, + crypto_key_reader: Union[None, CryptoKeyReader] = None, + access_mode: ProducerAccessMode = ProducerAccessMode.Shared, + message_router: Callable[[Message, int], int] = None, + ): """ Create a new producer on a given topic. @@ -819,32 +869,44 @@ def create_producer(self, topic, and returns the partition index to which the message should be routed. If not provided, the default routing policy defined by `message_routing_mode` will be used. 
""" - _check_type(str, topic, 'topic') - _check_type_or_none(str, producer_name, 'producer_name') - _check_type(_schema.Schema, schema, 'schema') - _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id') - _check_type(int, send_timeout_millis, 'send_timeout_millis') - _check_type(CompressionType, compression_type, 'compression_type') - _check_type(int, max_pending_messages, 'max_pending_messages') - _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions') - _check_type(bool, block_if_queue_full, 'block_if_queue_full') - _check_type(bool, batching_enabled, 'batching_enabled') - _check_type(int, batching_max_messages, 'batching_max_messages') - _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes') - _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms') - _check_type(bool, chunking_enabled, 'chunking_enabled') - _check_type_or_none(dict, properties, 'properties') - _check_type(BatchingType, batching_type, 'batching_type') - _check_type_or_none(str, encryption_key, 'encryption_key') - _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') - _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers') - _check_type(ProducerAccessMode, access_mode, 'access_mode') + _check_type(str, topic, "topic") + _check_type_or_none(str, producer_name, "producer_name") + _check_type(_schema.Schema, schema, "schema") + _check_type_or_none(int, initial_sequence_id, "initial_sequence_id") + _check_type(int, send_timeout_millis, "send_timeout_millis") + _check_type(CompressionType, compression_type, "compression_type") + _check_type(int, max_pending_messages, "max_pending_messages") + _check_type( + int, + max_pending_messages_across_partitions, + "max_pending_messages_across_partitions", + ) + _check_type(bool, block_if_queue_full, "block_if_queue_full") + _check_type(bool, batching_enabled, "batching_enabled") + 
_check_type(int, batching_max_messages, "batching_max_messages") + _check_type( + int, + batching_max_allowed_size_in_bytes, + "batching_max_allowed_size_in_bytes", + ) + _check_type(int, batching_max_publish_delay_ms, "batching_max_publish_delay_ms") + _check_type(bool, chunking_enabled, "chunking_enabled") + _check_type_or_none(dict, properties, "properties") + _check_type(BatchingType, batching_type, "batching_type") + _check_type_or_none(str, encryption_key, "encryption_key") + _check_type_or_none(CryptoKeyReader, crypto_key_reader, "crypto_key_reader") + _check_type( + bool, lazy_start_partitioned_producers, "lazy_start_partitioned_producers" + ) + _check_type(ProducerAccessMode, access_mode, "access_mode") conf = _pulsar.ProducerConfiguration() conf.send_timeout_millis(send_timeout_millis) conf.compression_type(compression_type) conf.max_pending_messages(max_pending_messages) - conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions) + conf.max_pending_messages_across_partitions( + max_pending_messages_across_partitions + ) conf.block_if_queue_full(block_if_queue_full) conf.batching_enabled(batching_enabled) conf.batching_max_messages(batching_max_messages) @@ -856,7 +918,9 @@ def create_producer(self, topic, conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers) conf.access_mode(access_mode) if message_router is not None: - underlying_router = lambda msg, num_partitions: int(message_router(Message._wrap(msg), num_partitions)) + underlying_router = lambda msg, num_partitions: int( + message_router(Message._wrap(msg), num_partitions) + ) conf.message_router(underlying_router) if producer_name: @@ -874,7 +938,9 @@ def create_producer(self, topic, conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) if batching_enabled and chunking_enabled: - raise ValueError("Batching and chunking of messages can't be enabled together.") + raise ValueError( + "Batching and chunking of messages can't be enabled together." 
+ ) p = Producer() p._producer = self._client.create_producer(topic, conf) @@ -882,32 +948,35 @@ def create_producer(self, topic, p._client = self._client return p - def subscribe(self, topic, subscription_name, - consumer_type: ConsumerType = ConsumerType.Exclusive, - schema=schema.BytesSchema(), - message_listener=None, - receiver_queue_size=1000, - max_total_receiver_queue_size_across_partitions=50000, - consumer_name=None, - unacked_messages_timeout_ms=None, - broker_consumer_stats_cache_time_ms=30000, - negative_ack_redelivery_delay_ms=60000, - is_read_compacted=False, - properties=None, - pattern_auto_discovery_period=60, - initial_position: InitialPosition = InitialPosition.Latest, - crypto_key_reader: Union[None, CryptoKeyReader] = None, - replicate_subscription_state_enabled=False, - max_pending_chunked_message=10, - auto_ack_oldest_chunked_message_on_queue_full=False, - start_message_id_inclusive=False, - batch_receive_policy=None, - key_shared_policy=None, - batch_index_ack_enabled=False, - regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly, - dead_letter_policy: Union[None, ConsumerDeadLetterPolicy] = None, - crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL, - ): + def subscribe( + self, + topic, + subscription_name, + consumer_type: ConsumerType = ConsumerType.Exclusive, + schema=schema.BytesSchema(), + message_listener=None, + receiver_queue_size=1000, + max_total_receiver_queue_size_across_partitions=50000, + consumer_name=None, + unacked_messages_timeout_ms=None, + broker_consumer_stats_cache_time_ms=30000, + negative_ack_redelivery_delay_ms=60000, + is_read_compacted=False, + properties=None, + pattern_auto_discovery_period=60, + initial_position: InitialPosition = InitialPosition.Latest, + crypto_key_reader: Union[None, CryptoKeyReader] = None, + replicate_subscription_state_enabled=False, + max_pending_chunked_message=10, + auto_ack_oldest_chunked_message_on_queue_full=False, 
+ start_message_id_inclusive=False, + batch_receive_policy=None, + key_shared_policy=None, + batch_index_ack_enabled=False, + regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly, + dead_letter_policy: Union[None, ConsumerDeadLetterPolicy] = None, + crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL, + ): """ Subscribe to the given topic and subscription combination. @@ -1024,29 +1093,52 @@ def my_listener(consumer, message): message contains batch messages, client will not be able to retrieve individual messages in the batch. """ - _check_type(str, subscription_name, 'subscription_name') - _check_type(ConsumerType, consumer_type, 'consumer_type') - _check_type(_schema.Schema, schema, 'schema') - _check_type(int, receiver_queue_size, 'receiver_queue_size') - _check_type(int, max_total_receiver_queue_size_across_partitions, - 'max_total_receiver_queue_size_across_partitions') - _check_type_or_none(str, consumer_name, 'consumer_name') - _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms') - _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms') - _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms') - _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period') - _check_type(bool, is_read_compacted, 'is_read_compacted') - _check_type_or_none(dict, properties, 'properties') - _check_type(InitialPosition, initial_position, 'initial_position') - _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') - _check_type(int, max_pending_chunked_message, 'max_pending_chunked_message') - _check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 'auto_ack_oldest_chunked_message_on_queue_full') - _check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive') - _check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 'batch_receive_policy') 
- _check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy') - _check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled') - _check_type(RegexSubscriptionMode, regex_subscription_mode, 'regex_subscription_mode') - _check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action') + _check_type(str, subscription_name, "subscription_name") + _check_type(ConsumerType, consumer_type, "consumer_type") + _check_type(_schema.Schema, schema, "schema") + _check_type(int, receiver_queue_size, "receiver_queue_size") + _check_type( + int, + max_total_receiver_queue_size_across_partitions, + "max_total_receiver_queue_size_across_partitions", + ) + _check_type_or_none(str, consumer_name, "consumer_name") + _check_type_or_none( + int, unacked_messages_timeout_ms, "unacked_messages_timeout_ms" + ) + _check_type( + int, + broker_consumer_stats_cache_time_ms, + "broker_consumer_stats_cache_time_ms", + ) + _check_type( + int, negative_ack_redelivery_delay_ms, "negative_ack_redelivery_delay_ms" + ) + _check_type(int, pattern_auto_discovery_period, "pattern_auto_discovery_period") + _check_type(bool, is_read_compacted, "is_read_compacted") + _check_type_or_none(dict, properties, "properties") + _check_type(InitialPosition, initial_position, "initial_position") + _check_type_or_none(CryptoKeyReader, crypto_key_reader, "crypto_key_reader") + _check_type(int, max_pending_chunked_message, "max_pending_chunked_message") + _check_type( + bool, + auto_ack_oldest_chunked_message_on_queue_full, + "auto_ack_oldest_chunked_message_on_queue_full", + ) + _check_type(bool, start_message_id_inclusive, "start_message_id_inclusive") + _check_type_or_none( + ConsumerBatchReceivePolicy, batch_receive_policy, "batch_receive_policy" + ) + _check_type_or_none( + ConsumerKeySharedPolicy, key_shared_policy, "key_shared_policy" + ) + _check_type(bool, batch_index_ack_enabled, "batch_index_ack_enabled") + _check_type( + RegexSubscriptionMode, 
regex_subscription_mode, "regex_subscription_mode" + ) + _check_type( + ConsumerCryptoFailureAction, crypto_failure_action, "crypto_failure_action" + ) conf = _pulsar.ConsumerConfiguration() conf.consumer_type(consumer_type) @@ -1055,7 +1147,9 @@ def my_listener(consumer, message): if message_listener: conf.message_listener(_listener_wrapper(message_listener, schema)) conf.receiver_queue_size(receiver_queue_size) - conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions) + conf.max_total_receiver_queue_size_across_partitions( + max_total_receiver_queue_size_across_partitions + ) if consumer_name: conf.consumer_name(consumer_name) if unacked_messages_timeout_ms: @@ -1075,7 +1169,9 @@ def my_listener(consumer, message): conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled) conf.max_pending_chunked_message(max_pending_chunked_message) - conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full) + conf.auto_ack_oldest_chunked_message_on_queue_full( + auto_ack_oldest_chunked_message_on_queue_full + ) conf.start_message_id_inclusive(start_message_id_inclusive) if batch_receive_policy: conf.batch_receive_policy(batch_receive_policy.policy()) @@ -1096,9 +1192,13 @@ def my_listener(consumer, message): c._consumer = self._client.subscribe_topics(topic, subscription_name, conf) elif isinstance(topic, _retype): # Regex pattern - c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf) + c._consumer = self._client.subscribe_pattern( + topic.pattern, subscription_name, conf + ) else: - raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)") + raise ValueError( + "Argument 'topic' is expected to be of a type between (str, list, re.pattern)" + ) c._client = self c._schema = schema @@ -1106,17 +1206,20 @@ def my_listener(consumer, message): self._consumers.append(c) return c - def create_reader(self, 
topic, start_message_id, - schema=schema.BytesSchema(), - reader_listener=None, - receiver_queue_size=1000, - reader_name=None, - subscription_role_prefix=None, - is_read_compacted=False, - crypto_key_reader: Union[None, CryptoKeyReader] = None, - start_message_id_inclusive=False, - crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL, - ): + def create_reader( + self, + topic, + start_message_id, + schema=schema.BytesSchema(), + reader_listener=None, + receiver_queue_size=1000, + reader_name=None, + subscription_role_prefix=None, + is_read_compacted=False, + crypto_key_reader: Union[None, CryptoKeyReader] = None, + start_message_id_inclusive=False, + crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL, + ): """ Create a reader on a particular topic @@ -1195,16 +1298,18 @@ def my_listener(reader, message): if isinstance(start_message_id, MessageId): start_message_id = start_message_id._msg_id - _check_type(str, topic, 'topic') - _check_type(_pulsar.MessageId, start_message_id, 'start_message_id') - _check_type(_schema.Schema, schema, 'schema') - _check_type(int, receiver_queue_size, 'receiver_queue_size') - _check_type_or_none(str, reader_name, 'reader_name') - _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix') - _check_type(bool, is_read_compacted, 'is_read_compacted') - _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') - _check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive') - _check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action') + _check_type(str, topic, "topic") + _check_type(_pulsar.MessageId, start_message_id, "start_message_id") + _check_type(_schema.Schema, schema, "schema") + _check_type(int, receiver_queue_size, "receiver_queue_size") + _check_type_or_none(str, reader_name, "reader_name") + _check_type_or_none(str, subscription_role_prefix, "subscription_role_prefix") + 
_check_type(bool, is_read_compacted, "is_read_compacted") + _check_type_or_none(CryptoKeyReader, crypto_key_reader, "crypto_key_reader") + _check_type(bool, start_message_id_inclusive, "start_message_id_inclusive") + _check_type( + ConsumerCryptoFailureAction, crypto_failure_action, "crypto_failure_action" + ) conf = _pulsar.ReaderConfiguration() if reader_listener: @@ -1229,9 +1334,12 @@ def my_listener(reader, message): self._consumers.append(c) return c - def create_table_view(self, topic: str, - subscription_name: Optional[str] = None, - schema: schema.Schema = schema.BytesSchema()) -> TableView: + def create_table_view( + self, + topic: str, + subscription_name: Optional[str] = None, + schema: schema.Schema = schema.BytesSchema(), + ) -> TableView: """ Create a table view on a particular topic @@ -1253,9 +1361,9 @@ def create_table_view(self, topic: str, TableView A table view instance. """ - _check_type(str, topic, 'topic') - _check_type_or_none(str, subscription_name, 'subscription_name') - _check_type(_schema.Schema, schema, 'schema') + _check_type(str, topic, "topic") + _check_type_or_none(str, subscription_name, "subscription_name") + _check_type(_schema.Schema, schema, "schema") tv_conf = _pulsar.TableViewConfiguration() if subscription_name is not None: @@ -1286,7 +1394,7 @@ def get_topic_partitions(self, topic): list a list of partition name """ - _check_type(str, topic, 'topic') + _check_type(str, topic, "topic") return self._client.get_topic_partitions(topic) def shutdown(self): @@ -1349,17 +1457,19 @@ def last_sequence_id(self): """ return self._producer.last_sequence_id() - def send(self, content, - properties=None, - partition_key=None, - ordering_key=None, - sequence_id=None, - replication_clusters=None, - disable_replication=False, - event_timestamp=None, - deliver_at=None, - deliver_after=None, - ) -> _pulsar.MessageId: + def send( + self, + content, + properties=None, + partition_key=None, + ordering_key=None, + sequence_id=None, + 
replication_clusters=None, + disable_replication=False, + event_timestamp=None, + deliver_at=None, + deliver_after=None, + ) -> _pulsar.MessageId: """ Publish a message on the topic. Blocks until the message is acknowledged @@ -1397,22 +1507,34 @@ def send(self, content, ---------- A `_pulsar.MessageId` object that represents where the message is persisted. """ - msg = self._build_msg(content, properties, partition_key, ordering_key, sequence_id, - replication_clusters, disable_replication, event_timestamp, - deliver_at, deliver_after) + msg = self._build_msg( + content, + properties, + partition_key, + ordering_key, + sequence_id, + replication_clusters, + disable_replication, + event_timestamp, + deliver_at, + deliver_after, + ) return self._producer.send(msg) - def send_async(self, content, callback, - properties=None, - partition_key=None, - ordering_key=None, - sequence_id=None, - replication_clusters=None, - disable_replication=False, - event_timestamp=None, - deliver_at=None, - deliver_after=None, - ): + def send_async( + self, + content, + callback, + properties=None, + partition_key=None, + ordering_key=None, + sequence_id=None, + replication_clusters=None, + disable_replication=False, + event_timestamp=None, + deliver_at=None, + deliver_after=None, + ): """ Send a message asynchronously. @@ -1476,12 +1598,20 @@ def callback(res, msg_id): deliver_after: optional Specify a delay in timedelta for the delivery of the messages. 
""" - msg = self._build_msg(content, properties, partition_key, ordering_key, sequence_id, - replication_clusters, disable_replication, event_timestamp, - deliver_at, deliver_after) + msg = self._build_msg( + content, + properties, + partition_key, + ordering_key, + sequence_id, + replication_clusters, + disable_replication, + event_timestamp, + deliver_at, + deliver_after, + ) self._producer.send_async(msg, callback) - def flush(self): """ Flush all the messages buffered in the client and wait until all messages have been @@ -1489,28 +1619,37 @@ def flush(self): """ self._producer.flush() - def close(self): """ Close the producer. """ self._producer.close() - def _build_msg(self, content, properties, partition_key, ordering_key, sequence_id, - replication_clusters, disable_replication, event_timestamp, - deliver_at, deliver_after): + def _build_msg( + self, + content, + properties, + partition_key, + ordering_key, + sequence_id, + replication_clusters, + disable_replication, + event_timestamp, + deliver_at, + deliver_after, + ): data = self._schema.encode(content) - _check_type(bytes, data, 'data') - _check_type_or_none(dict, properties, 'properties') - _check_type_or_none(str, partition_key, 'partition_key') - _check_type_or_none(str, ordering_key, 'ordering_key') - _check_type_or_none(int, sequence_id, 'sequence_id') - _check_type_or_none(list, replication_clusters, 'replication_clusters') - _check_type(bool, disable_replication, 'disable_replication') - _check_type_or_none(int, event_timestamp, 'event_timestamp') - _check_type_or_none(int, deliver_at, 'deliver_at') - _check_type_or_none(timedelta, deliver_after, 'deliver_after') + _check_type(bytes, data, "data") + _check_type_or_none(dict, properties, "properties") + _check_type_or_none(str, partition_key, "partition_key") + _check_type_or_none(str, ordering_key, "ordering_key") + _check_type_or_none(int, sequence_id, "sequence_id") + _check_type_or_none(list, replication_clusters, "replication_clusters") + 
_check_type(bool, disable_replication, "disable_replication") + _check_type_or_none(int, event_timestamp, "event_timestamp") + _check_type_or_none(int, deliver_at, "deliver_at") + _check_type_or_none(timedelta, deliver_after, "deliver_after") mb = _pulsar.MessageBuilder() mb.content(data) @@ -1612,7 +1751,7 @@ def receive(self, timeout_millis=None): if timeout_millis is None: msg = self._consumer.receive() else: - _check_type(int, timeout_millis, 'timeout_millis') + _check_type(int, timeout_millis, "timeout_millis") msg = self._consumer.receive(timeout_millis) m = Message() @@ -1635,7 +1774,9 @@ def batch_receive(self): messages.append(m) return messages - def acknowledge(self, message: Union[Message, MessageId, _pulsar.Message, _pulsar.MessageId]): + def acknowledge( + self, message: Union[Message, MessageId, _pulsar.Message, _pulsar.MessageId] + ): """ Acknowledge the reception of a single message. @@ -1659,7 +1800,9 @@ def acknowledge(self, message: Union[Message, MessageId, _pulsar.Message, _pulsa else: self._consumer.acknowledge(message) - def acknowledge_cumulative(self, message: Union[Message, MessageId, _pulsar.Message, _pulsar.MessageId]): + def acknowledge_cumulative( + self, message: Union[Message, MessageId, _pulsar.Message, _pulsar.MessageId] + ): """ Acknowledge the reception of all the messages in the stream up to (and including) the provided message. @@ -1765,6 +1908,7 @@ def get_last_message_id(self): """ return self._consumer.get_last_message_id() + class ConsumerBatchReceivePolicy: """ Batch receive policy can limit the number and bytes of messages in a single batch, @@ -1773,6 +1917,7 @@ class ConsumerBatchReceivePolicy: A batch receive action is completed as long as any one of the conditions (the batch has enough number or size of messages, or the waiting timeout is passed) are met. """ + def __init__(self, max_num_message, max_num_bytes, timeout_ms): """ Wrapper BatchReceivePolicy. 
@@ -1792,15 +1937,17 @@ def policy(self): """ return self._policy + class ConsumerKeySharedPolicy: """ Consumer key shared policy is used to configure the consumer behaviour when the ConsumerType is KeyShared. """ + def __init__( - self, - key_shared_mode: KeySharedMode = KeySharedMode.AutoSplit, - allow_out_of_order_delivery: bool = False, - sticky_ranges: Optional[List[Tuple[int, int]]] = None, + self, + key_shared_mode: KeySharedMode = KeySharedMode.AutoSplit, + allow_out_of_order_delivery: bool = False, + sticky_ranges: Optional[List[Tuple[int, int]]] = None, ): """ Wrapper KeySharedPolicy. @@ -1823,7 +1970,9 @@ def __init__( Set the ranges used with sticky mode. The integers can be from 0 to 2^16 (0 <= val < 65,536) """ if key_shared_mode == KeySharedMode.Sticky and sticky_ranges is None: - raise ValueError("When using key_shared_mode = KeySharedMode.Sticky you must also provide sticky_ranges") + raise ValueError( + "When using key_shared_mode = KeySharedMode.Sticky you must also provide sticky_ranges" + ) self._policy = KeySharedPolicy() self._policy.set_key_shared_mode(key_shared_mode) @@ -1859,6 +2008,7 @@ def policy(self): """ return self._policy + class Reader: """ Pulsar topic reader. @@ -1886,7 +2036,7 @@ def read_next(self, timeout_millis=None): if timeout_millis is None: msg = self._reader.read_next() else: - _check_type(int, timeout_millis, 'timeout_millis') + _check_type(int, timeout_millis, "timeout_millis") msg = self._reader.read_next(timeout_millis) m = Message() @@ -1898,7 +2048,7 @@ def has_message_available(self): """ Check if there is any message available to read from the current position. 
""" - return self._reader.has_message_available(); + return self._reader.has_message_available() def seek(self, messageid: Union[MessageId, _pulsar.MessageId, int]): """ @@ -1939,8 +2089,9 @@ class ConsoleLogger: log_level: The logging level, eg: ``pulsar.LoggerLevel.Info`` """ + def __init__(self, log_level=_pulsar.LoggerLevel.Info): - _check_type(_pulsar.LoggerLevel, log_level, 'log_level') + _check_type(_pulsar.LoggerLevel, log_level, "log_level") self.log_level = log_level @@ -1956,23 +2107,28 @@ class FileLogger: log_file: The file where to write the logs """ + def __init__(self, log_level, log_file): - _check_type(_pulsar.LoggerLevel, log_level, 'log_level') - _check_type(str, log_file, 'log_file') + _check_type(_pulsar.LoggerLevel, log_level, "log_level") + _check_type(str, log_file, "log_file") self.log_level = log_level self.log_file = log_file def _check_type(var_type, var, name): if not isinstance(var, var_type): - raise ValueError("Argument %s is expected to be of type '%s' and not '%s'" - % (name, var_type.__name__, type(var).__name__)) + raise ValueError( + "Argument %s is expected to be of type '%s' and not '%s'" + % (name, var_type.__name__, type(var).__name__) + ) def _check_type_or_none(var_type, var, name): if var is not None and not isinstance(var, var_type): - raise ValueError("Argument %s is expected to be either None or of type '%s'" - % (name, var_type.__name__)) + raise ValueError( + "Argument %s is expected to be either None or of type '%s'" + % (name, var_type.__name__) + ) def _listener_wrapper(listener, schema): @@ -1983,8 +2139,10 @@ def wrapper(consumer, msg): m._message = msg m._schema = schema listener(c, m) + return wrapper + def _seek_arg_convert(seek_arg): if isinstance(seek_arg, MessageId): return seek_arg._msg_id diff --git a/setup.py b/setup.py index 76d929b..00e9aec 100755 --- a/setup.py +++ b/setup.py @@ -18,60 +18,60 @@ # under the License. 
# -from setuptools import setup +import platform +from distutils.command import build_ext from distutils.core import Extension from os import environ, path -import platform -from distutils.command import build_ext +from setuptools import setup def get_version(): root = path.dirname(path.realpath(__file__)) - version_file = path.join(root, 'pulsar', '__about__.py') + version_file = path.join(root, "pulsar", "__about__.py") version = {} with open(version_file) as fp: exec(fp.read(), version) - return version['__version__'] + return version["__version__"] def get_name(): - postfix = environ.get('NAME_POSTFIX', '') - base = 'pulsar-client' + postfix = environ.get("NAME_POSTFIX", "") + base = "pulsar-client" return base + postfix VERSION = get_version() NAME = get_name() -print('NAME: %s' % NAME) -print('VERSION: %s' % VERSION) +print("NAME: %s" % NAME) +print("VERSION: %s" % VERSION) # This is a workaround to have setuptools to include # the already compiled _pulsar.so library class my_build_ext(build_ext.build_ext): def build_extension(self, ext): - import shutil import os.path + import shutil try: os.makedirs(os.path.dirname(self.get_ext_fullpath(ext.name))) except OSError as e: if e.errno != 17: # already exists raise - if 'Windows' in platform.platform(): - shutil.copyfile('_pulsar.pyd', self.get_ext_fullpath(ext.name)) + if "Windows" in platform.platform(): + shutil.copyfile("_pulsar.pyd", self.get_ext_fullpath(ext.name)) else: try: - shutil.copyfile('_pulsar.so', self.get_ext_fullpath(ext.name)) + shutil.copyfile("_pulsar.so", self.get_ext_fullpath(ext.name)) except FileNotFoundError: - shutil.copyfile('lib_pulsar.so', self.get_ext_fullpath(ext.name)) + shutil.copyfile("lib_pulsar.so", self.get_ext_fullpath(ext.name)) # Core Client dependencies dependencies = [ - 'certifi', + "certifi", ] extras_require = {} @@ -79,20 +79,16 @@ def build_extension(self, ext): # functions dependencies extras_require["functions"] = sorted( { - "protobuf>=3.6.1", - "grpcio>=1.59.3", 
- "apache-bookkeeper-client>=4.16.1", - "prometheus_client", - "ratelimit" + "protobuf>=3.6.1", + "grpcio>=1.59.3", + "apache-bookkeeper-client>=4.16.1", + "prometheus_client", + "ratelimit", } ) # avro dependencies -extras_require["avro"] = sorted( - { - "fastavro>=1.9.2" - } -) +extras_require["avro"] = sorted({"fastavro>=1.9.2"}) # all dependencies extras_require["all"] = sorted(set(sum(extras_require.values(), []))) @@ -100,10 +96,9 @@ def build_extension(self, ext): setup( name=NAME, version=VERSION, - packages=['pulsar', 'pulsar.schema', 'pulsar.functions'], - cmdclass={'build_ext': my_build_ext}, - ext_modules=[Extension('_pulsar', [])], - + packages=["pulsar", "pulsar.schema", "pulsar.functions"], + cmdclass={"build_ext": my_build_ext}, + ext_modules=[Extension("_pulsar", [])], author="Pulsar Devs", author_email="dev@pulsar.apache.org", description="Apache Pulsar Python client library", From efeb155d952694730582768b626a4385f17c7169 Mon Sep 17 00:00:00 2001 From: Matthew Marion Date: Fri, 14 Nov 2025 13:15:57 -0500 Subject: [PATCH 2/4] revert setup.py formatting --- setup.py | 53 +++++++++++++++++++++++++++++------------------------ 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/setup.py b/setup.py index 00e9aec..76d929b 100755 --- a/setup.py +++ b/setup.py @@ -18,60 +18,60 @@ # under the License. 
# -import platform -from distutils.command import build_ext +from setuptools import setup from distutils.core import Extension from os import environ, path +import platform -from setuptools import setup +from distutils.command import build_ext def get_version(): root = path.dirname(path.realpath(__file__)) - version_file = path.join(root, "pulsar", "__about__.py") + version_file = path.join(root, 'pulsar', '__about__.py') version = {} with open(version_file) as fp: exec(fp.read(), version) - return version["__version__"] + return version['__version__'] def get_name(): - postfix = environ.get("NAME_POSTFIX", "") - base = "pulsar-client" + postfix = environ.get('NAME_POSTFIX', '') + base = 'pulsar-client' return base + postfix VERSION = get_version() NAME = get_name() -print("NAME: %s" % NAME) -print("VERSION: %s" % VERSION) +print('NAME: %s' % NAME) +print('VERSION: %s' % VERSION) # This is a workaround to have setuptools to include # the already compiled _pulsar.so library class my_build_ext(build_ext.build_ext): def build_extension(self, ext): - import os.path import shutil + import os.path try: os.makedirs(os.path.dirname(self.get_ext_fullpath(ext.name))) except OSError as e: if e.errno != 17: # already exists raise - if "Windows" in platform.platform(): - shutil.copyfile("_pulsar.pyd", self.get_ext_fullpath(ext.name)) + if 'Windows' in platform.platform(): + shutil.copyfile('_pulsar.pyd', self.get_ext_fullpath(ext.name)) else: try: - shutil.copyfile("_pulsar.so", self.get_ext_fullpath(ext.name)) + shutil.copyfile('_pulsar.so', self.get_ext_fullpath(ext.name)) except FileNotFoundError: - shutil.copyfile("lib_pulsar.so", self.get_ext_fullpath(ext.name)) + shutil.copyfile('lib_pulsar.so', self.get_ext_fullpath(ext.name)) # Core Client dependencies dependencies = [ - "certifi", + 'certifi', ] extras_require = {} @@ -79,16 +79,20 @@ def build_extension(self, ext): # functions dependencies extras_require["functions"] = sorted( { - "protobuf>=3.6.1", - 
"grpcio>=1.59.3", - "apache-bookkeeper-client>=4.16.1", - "prometheus_client", - "ratelimit", + "protobuf>=3.6.1", + "grpcio>=1.59.3", + "apache-bookkeeper-client>=4.16.1", + "prometheus_client", + "ratelimit" } ) # avro dependencies -extras_require["avro"] = sorted({"fastavro>=1.9.2"}) +extras_require["avro"] = sorted( + { + "fastavro>=1.9.2" + } +) # all dependencies extras_require["all"] = sorted(set(sum(extras_require.values(), []))) @@ -96,9 +100,10 @@ def build_extension(self, ext): setup( name=NAME, version=VERSION, - packages=["pulsar", "pulsar.schema", "pulsar.functions"], - cmdclass={"build_ext": my_build_ext}, - ext_modules=[Extension("_pulsar", [])], + packages=['pulsar', 'pulsar.schema', 'pulsar.functions'], + cmdclass={'build_ext': my_build_ext}, + ext_modules=[Extension('_pulsar', [])], + author="Pulsar Devs", author_email="dev@pulsar.apache.org", description="Apache Pulsar Python client library", From 12de19dba52d9968025ea1ca5bd6eeeec06f95a0 Mon Sep 17 00:00:00 2001 From: Matthew Marion Date: Fri, 14 Nov 2025 13:18:27 -0500 Subject: [PATCH 3/4] revert formatting --- pulsar/__init__.py | 636 +++++++++++++++++---------------------------- 1 file changed, 245 insertions(+), 391 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 3dc01b4..de6f4bf 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -76,7 +76,7 @@ import re -_retype = type(re.compile("x")) +_retype = type(re.compile('x')) from datetime import timedelta @@ -95,9 +95,7 @@ class MessageId: """ def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1): - self._msg_id: _pulsar.MessageId = _pulsar.MessageId( - partition, ledger_id, entry_id, batch_index - ) + self._msg_id: _pulsar.MessageId = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index) earliest = _pulsar.MessageId.earliest latest = _pulsar.MessageId.latest @@ -180,7 +178,6 @@ def wrap(cls, msg_id: _pulsar.MessageId): self._msg_id = msg_id return self - class Message: """ Message 
objects are returned by a consumer, either by calling `receive` or @@ -268,6 +265,7 @@ def _wrap(_message): class MessageBatch: + def __init__(self): self._msg_batch = _pulsar.MessageBatch() @@ -291,7 +289,6 @@ class Authentication: Authentication provider object. Used to load authentication from an external shared library. """ - def __init__(self, dynamicLibPath, authParamsString): """ Create the authentication provider instance. @@ -304,8 +301,8 @@ def __init__(self, dynamicLibPath, authParamsString): authParamsString: str Comma-separated list of provider-specific configuration params """ - _check_type(str, dynamicLibPath, "dynamicLibPath") - _check_type(str, authParamsString, "authParamsString") + _check_type(str, dynamicLibPath, 'dynamicLibPath') + _check_type(str, authParamsString, 'authParamsString') self.auth = _pulsar.Authentication.create(dynamicLibPath, authParamsString) @@ -313,7 +310,6 @@ class AuthenticationTLS(Authentication): """ TLS Authentication implementation """ - def __init__(self, certificate_path, private_key_path): """ Create the TLS authentication provider instance. @@ -326,8 +322,8 @@ def __init__(self, certificate_path, private_key_path): private_key_path: str Path to private TLS key """ - _check_type(str, certificate_path, "certificate_path") - _check_type(str, private_key_path, "private_key_path") + _check_type(str, certificate_path, 'certificate_path') + _check_type(str, private_key_path, 'private_key_path') self.auth = _pulsar.AuthenticationTLS.create(certificate_path, private_key_path) @@ -335,7 +331,6 @@ class AuthenticationToken(Authentication): """ Token based authentication implementation """ - def __init__(self, token): """ Create the token authentication provider instance. 
@@ -347,9 +342,7 @@ def __init__(self, token): A string containing the token or a functions that provides a string with the token """ if not (isinstance(token, str) or callable(token)): - raise ValueError( - "Argument token is expected to be of type 'str' or a function returning 'str'" - ) + raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'") self.auth = _pulsar.AuthenticationToken.create(token) @@ -357,7 +350,6 @@ class AuthenticationAthenz(Authentication): """ Athenz Authentication implementation """ - def __init__(self, auth_params_string): """ Create the Athenz authentication provider instance. @@ -368,15 +360,13 @@ def __init__(self, auth_params_string): auth_params_string: str JSON encoded configuration for Athenz client """ - _check_type(str, auth_params_string, "auth_params_string") + _check_type(str, auth_params_string, 'auth_params_string') self.auth = _pulsar.AuthenticationAthenz.create(auth_params_string) - class AuthenticationOauth2(Authentication): """ Oauth2 Authentication implementation """ - def __init__(self, auth_params_string: str): """ Create the Oauth2 authentication provider instance. @@ -416,18 +406,14 @@ def __init__(self, auth_params_string: str): JSON encoded configuration for Oauth2 client """ - _check_type(str, auth_params_string, "auth_params_string") + _check_type(str, auth_params_string, 'auth_params_string') self.auth = _pulsar.AuthenticationOauth2.create(auth_params_string) - class AuthenticationBasic(Authentication): """ Basic Authentication implementation """ - - def __init__( - self, username=None, password=None, method="basic", auth_params_string=None - ): + def __init__(self, username=None, password=None, method='basic', auth_params_string=None): """ Create the Basic authentication provider instance. @@ -456,26 +442,22 @@ def __init__( "basic" by default. 
""" if auth_params_string is not None: - _check_type(str, auth_params_string, "auth_params_string") + _check_type(str, auth_params_string, 'auth_params_string') self.auth = _pulsar.AuthenticationBasic.create(auth_params_string) else: - _check_type(str, username, "username") - _check_type(str, password, "password") - _check_type(str, method, "method") + _check_type(str, username, 'username') + _check_type(str, password, 'password') + _check_type(str, method, 'method') self.auth = _pulsar.AuthenticationBasic.create(username, password, method) - class ConsumerDeadLetterPolicy: """ Configuration for the "dead letter queue" feature in consumer. """ - - def __init__( - self, - max_redeliver_count: int, - dead_letter_topic: str = None, - initial_subscription_name: str = None, - ): + def __init__(self, + max_redeliver_count: int, + dead_letter_topic: str = None, + initial_subscription_name: str = None): """ Wrapper DeadLetterPolicy. @@ -527,12 +509,10 @@ def policy(self): """ return self._policy - class CryptoKeyReader: """ Default crypto key reader implementation """ - def __init__(self, public_key_path, private_key_path): """ Create crypto key reader. @@ -545,12 +525,9 @@ def __init__(self, public_key_path, private_key_path): private_key_path: str Path to private key """ - _check_type(str, public_key_path, "public_key_path") - _check_type(str, private_key_path, "private_key_path") - self.cryptoKeyReader = _pulsar.CryptoKeyReader( - public_key_path, private_key_path - ) - + _check_type(str, public_key_path, 'public_key_path') + _check_type(str, private_key_path, 'private_key_path') + self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path) class Client: """ @@ -561,26 +538,24 @@ class Client: producers and consumers. 
""" - def __init__( - self, - service_url, - authentication=None, - operation_timeout_seconds=30, - io_threads=1, - message_listener_threads=1, - concurrent_lookup_requests=50000, - log_conf_file_path=None, - stats_interval_in_seconds=600, - use_tls=False, - tls_trust_certs_file_path=None, - tls_allow_insecure_connection=False, - tls_validate_hostname=False, - logger=None, - connection_timeout_ms=10000, - listener_name=None, - tls_private_key_file_path: Optional[str] = None, - tls_certificate_file_path: Optional[str] = None, - ): + def __init__(self, service_url, + authentication=None, + operation_timeout_seconds=30, + io_threads=1, + message_listener_threads=1, + concurrent_lookup_requests=50000, + log_conf_file_path=None, + stats_interval_in_seconds=600, + use_tls=False, + tls_trust_certs_file_path=None, + tls_allow_insecure_connection=False, + tls_validate_hostname=False, + logger=None, + connection_timeout_ms=10000, + listener_name=None, + tls_private_key_file_path: Optional[str] = None, + tls_certificate_file_path: Optional[str] = None, + ): """ Create a new Pulsar client instance. @@ -650,24 +625,22 @@ def __init__( tls_certificate_file_path: str, optional The path to the TLS certificate file. 
""" - _check_type(str, service_url, "service_url") - _check_type_or_none(Authentication, authentication, "authentication") - _check_type(int, operation_timeout_seconds, "operation_timeout_seconds") - _check_type(int, connection_timeout_ms, "connection_timeout_ms") - _check_type(int, io_threads, "io_threads") - _check_type(int, message_listener_threads, "message_listener_threads") - _check_type(int, concurrent_lookup_requests, "concurrent_lookup_requests") - _check_type_or_none(str, log_conf_file_path, "log_conf_file_path") - _check_type(int, stats_interval_in_seconds, "stats_interval_in_seconds") - _check_type(bool, use_tls, "use_tls") - _check_type_or_none(str, tls_trust_certs_file_path, "tls_trust_certs_file_path") - _check_type( - bool, tls_allow_insecure_connection, "tls_allow_insecure_connection" - ) - _check_type(bool, tls_validate_hostname, "tls_validate_hostname") - _check_type_or_none(str, listener_name, "listener_name") - _check_type_or_none(str, tls_private_key_file_path, "tls_private_key_file_path") - _check_type_or_none(str, tls_certificate_file_path, "tls_certificate_file_path") + _check_type(str, service_url, 'service_url') + _check_type_or_none(Authentication, authentication, 'authentication') + _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds') + _check_type(int, connection_timeout_ms, 'connection_timeout_ms') + _check_type(int, io_threads, 'io_threads') + _check_type(int, message_listener_threads, 'message_listener_threads') + _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests') + _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path') + _check_type(int, stats_interval_in_seconds, 'stats_interval_in_seconds') + _check_type(bool, use_tls, 'use_tls') + _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path') + _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection') + _check_type(bool, tls_validate_hostname, 'tls_validate_hostname') + 
_check_type_or_none(str, listener_name, 'listener_name') + _check_type_or_none(str, tls_private_key_file_path, 'tls_private_key_file_path') + _check_type_or_none(str, tls_certificate_file_path, 'tls_certificate_file_path') conf = _pulsar.ClientConfiguration() if authentication: @@ -686,23 +659,16 @@ def __init__( elif isinstance(logger, FileLogger): conf.set_file_logger(logger.log_level, logger.log_file) elif logger is not None: - raise ValueError( - "Logger is expected to be either None, logger.Logger, pulsar.ConsoleLogger or pulsar.FileLogger" - ) + raise ValueError("Logger is expected to be either None, logger.Logger, pulsar.ConsoleLogger or pulsar.FileLogger") if listener_name: conf.listener_name(listener_name) - if ( - use_tls - or service_url.startswith("pulsar+ssl://") - or service_url.startswith("https://") - ): + if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'): conf.use_tls(True) if tls_trust_certs_file_path: conf.tls_trust_certs_file_path(tls_trust_certs_file_path) else: import certifi - conf.tls_trust_certs_file_path(certifi.where()) conf.tls_allow_insecure_connection(tls_allow_insecure_connection) conf.tls_validate_hostname(tls_validate_hostname) @@ -716,40 +682,36 @@ def __init__( @staticmethod def _prepare_logger(logger): import logging - def log(level, message): old_threads = logging.logThreads logging.logThreads = False logger.log(logging.getLevelName(level), message) logging.logThreads = old_threads - return log - def create_producer( - self, - topic, - producer_name=None, - schema=schema.BytesSchema(), - initial_sequence_id=None, - send_timeout_millis=30000, - compression_type: CompressionType = CompressionType.NONE, - max_pending_messages=1000, - max_pending_messages_across_partitions=50000, - block_if_queue_full=False, - batching_enabled=False, - batching_max_messages=1000, - batching_max_allowed_size_in_bytes=128 * 1024, - batching_max_publish_delay_ms=10, - chunking_enabled=False, - 
message_routing_mode: PartitionsRoutingMode = PartitionsRoutingMode.RoundRobinDistribution, - lazy_start_partitioned_producers=False, - properties=None, - batching_type: BatchingType = BatchingType.Default, - encryption_key=None, - crypto_key_reader: Union[None, CryptoKeyReader] = None, - access_mode: ProducerAccessMode = ProducerAccessMode.Shared, - message_router: Callable[[Message, int], int] = None, - ): + def create_producer(self, topic, + producer_name=None, + schema=schema.BytesSchema(), + initial_sequence_id=None, + send_timeout_millis=30000, + compression_type: CompressionType = CompressionType.NONE, + max_pending_messages=1000, + max_pending_messages_across_partitions=50000, + block_if_queue_full=False, + batching_enabled=False, + batching_max_messages=1000, + batching_max_allowed_size_in_bytes=128*1024, + batching_max_publish_delay_ms=10, + chunking_enabled=False, + message_routing_mode: PartitionsRoutingMode = PartitionsRoutingMode.RoundRobinDistribution, + lazy_start_partitioned_producers=False, + properties=None, + batching_type: BatchingType = BatchingType.Default, + encryption_key=None, + crypto_key_reader: Union[None, CryptoKeyReader] = None, + access_mode: ProducerAccessMode = ProducerAccessMode.Shared, + message_router: Callable[[Message, int], int]=None, + ): """ Create a new producer on a given topic. @@ -869,44 +831,32 @@ def create_producer( and returns the partition index to which the message should be routed. If not provided, the default routing policy defined by `message_routing_mode` will be used. 
""" - _check_type(str, topic, "topic") - _check_type_or_none(str, producer_name, "producer_name") - _check_type(_schema.Schema, schema, "schema") - _check_type_or_none(int, initial_sequence_id, "initial_sequence_id") - _check_type(int, send_timeout_millis, "send_timeout_millis") - _check_type(CompressionType, compression_type, "compression_type") - _check_type(int, max_pending_messages, "max_pending_messages") - _check_type( - int, - max_pending_messages_across_partitions, - "max_pending_messages_across_partitions", - ) - _check_type(bool, block_if_queue_full, "block_if_queue_full") - _check_type(bool, batching_enabled, "batching_enabled") - _check_type(int, batching_max_messages, "batching_max_messages") - _check_type( - int, - batching_max_allowed_size_in_bytes, - "batching_max_allowed_size_in_bytes", - ) - _check_type(int, batching_max_publish_delay_ms, "batching_max_publish_delay_ms") - _check_type(bool, chunking_enabled, "chunking_enabled") - _check_type_or_none(dict, properties, "properties") - _check_type(BatchingType, batching_type, "batching_type") - _check_type_or_none(str, encryption_key, "encryption_key") - _check_type_or_none(CryptoKeyReader, crypto_key_reader, "crypto_key_reader") - _check_type( - bool, lazy_start_partitioned_producers, "lazy_start_partitioned_producers" - ) - _check_type(ProducerAccessMode, access_mode, "access_mode") + _check_type(str, topic, 'topic') + _check_type_or_none(str, producer_name, 'producer_name') + _check_type(_schema.Schema, schema, 'schema') + _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id') + _check_type(int, send_timeout_millis, 'send_timeout_millis') + _check_type(CompressionType, compression_type, 'compression_type') + _check_type(int, max_pending_messages, 'max_pending_messages') + _check_type(int, max_pending_messages_across_partitions, 'max_pending_messages_across_partitions') + _check_type(bool, block_if_queue_full, 'block_if_queue_full') + _check_type(bool, batching_enabled, 
'batching_enabled') + _check_type(int, batching_max_messages, 'batching_max_messages') + _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes') + _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms') + _check_type(bool, chunking_enabled, 'chunking_enabled') + _check_type_or_none(dict, properties, 'properties') + _check_type(BatchingType, batching_type, 'batching_type') + _check_type_or_none(str, encryption_key, 'encryption_key') + _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') + _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers') + _check_type(ProducerAccessMode, access_mode, 'access_mode') conf = _pulsar.ProducerConfiguration() conf.send_timeout_millis(send_timeout_millis) conf.compression_type(compression_type) conf.max_pending_messages(max_pending_messages) - conf.max_pending_messages_across_partitions( - max_pending_messages_across_partitions - ) + conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions) conf.block_if_queue_full(block_if_queue_full) conf.batching_enabled(batching_enabled) conf.batching_max_messages(batching_max_messages) @@ -918,9 +868,7 @@ def create_producer( conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers) conf.access_mode(access_mode) if message_router is not None: - underlying_router = lambda msg, num_partitions: int( - message_router(Message._wrap(msg), num_partitions) - ) + underlying_router = lambda msg, num_partitions: int(message_router(Message._wrap(msg), num_partitions)) conf.message_router(underlying_router) if producer_name: @@ -938,9 +886,7 @@ def create_producer( conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) if batching_enabled and chunking_enabled: - raise ValueError( - "Batching and chunking of messages can't be enabled together." 
- ) + raise ValueError("Batching and chunking of messages can't be enabled together.") p = Producer() p._producer = self._client.create_producer(topic, conf) @@ -948,35 +894,32 @@ def create_producer( p._client = self._client return p - def subscribe( - self, - topic, - subscription_name, - consumer_type: ConsumerType = ConsumerType.Exclusive, - schema=schema.BytesSchema(), - message_listener=None, - receiver_queue_size=1000, - max_total_receiver_queue_size_across_partitions=50000, - consumer_name=None, - unacked_messages_timeout_ms=None, - broker_consumer_stats_cache_time_ms=30000, - negative_ack_redelivery_delay_ms=60000, - is_read_compacted=False, - properties=None, - pattern_auto_discovery_period=60, - initial_position: InitialPosition = InitialPosition.Latest, - crypto_key_reader: Union[None, CryptoKeyReader] = None, - replicate_subscription_state_enabled=False, - max_pending_chunked_message=10, - auto_ack_oldest_chunked_message_on_queue_full=False, - start_message_id_inclusive=False, - batch_receive_policy=None, - key_shared_policy=None, - batch_index_ack_enabled=False, - regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly, - dead_letter_policy: Union[None, ConsumerDeadLetterPolicy] = None, - crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL, - ): + def subscribe(self, topic, subscription_name, + consumer_type: ConsumerType = ConsumerType.Exclusive, + schema=schema.BytesSchema(), + message_listener=None, + receiver_queue_size=1000, + max_total_receiver_queue_size_across_partitions=50000, + consumer_name=None, + unacked_messages_timeout_ms=None, + broker_consumer_stats_cache_time_ms=30000, + negative_ack_redelivery_delay_ms=60000, + is_read_compacted=False, + properties=None, + pattern_auto_discovery_period=60, + initial_position: InitialPosition = InitialPosition.Latest, + crypto_key_reader: Union[None, CryptoKeyReader] = None, + replicate_subscription_state_enabled=False, + 
max_pending_chunked_message=10, + auto_ack_oldest_chunked_message_on_queue_full=False, + start_message_id_inclusive=False, + batch_receive_policy=None, + key_shared_policy=None, + batch_index_ack_enabled=False, + regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly, + dead_letter_policy: Union[None, ConsumerDeadLetterPolicy] = None, + crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL, + ): """ Subscribe to the given topic and subscription combination. @@ -1093,52 +1036,29 @@ def my_listener(consumer, message): message contains batch messages, client will not be able to retrieve individual messages in the batch. """ - _check_type(str, subscription_name, "subscription_name") - _check_type(ConsumerType, consumer_type, "consumer_type") - _check_type(_schema.Schema, schema, "schema") - _check_type(int, receiver_queue_size, "receiver_queue_size") - _check_type( - int, - max_total_receiver_queue_size_across_partitions, - "max_total_receiver_queue_size_across_partitions", - ) - _check_type_or_none(str, consumer_name, "consumer_name") - _check_type_or_none( - int, unacked_messages_timeout_ms, "unacked_messages_timeout_ms" - ) - _check_type( - int, - broker_consumer_stats_cache_time_ms, - "broker_consumer_stats_cache_time_ms", - ) - _check_type( - int, negative_ack_redelivery_delay_ms, "negative_ack_redelivery_delay_ms" - ) - _check_type(int, pattern_auto_discovery_period, "pattern_auto_discovery_period") - _check_type(bool, is_read_compacted, "is_read_compacted") - _check_type_or_none(dict, properties, "properties") - _check_type(InitialPosition, initial_position, "initial_position") - _check_type_or_none(CryptoKeyReader, crypto_key_reader, "crypto_key_reader") - _check_type(int, max_pending_chunked_message, "max_pending_chunked_message") - _check_type( - bool, - auto_ack_oldest_chunked_message_on_queue_full, - "auto_ack_oldest_chunked_message_on_queue_full", - ) - _check_type(bool, 
start_message_id_inclusive, "start_message_id_inclusive") - _check_type_or_none( - ConsumerBatchReceivePolicy, batch_receive_policy, "batch_receive_policy" - ) - _check_type_or_none( - ConsumerKeySharedPolicy, key_shared_policy, "key_shared_policy" - ) - _check_type(bool, batch_index_ack_enabled, "batch_index_ack_enabled") - _check_type( - RegexSubscriptionMode, regex_subscription_mode, "regex_subscription_mode" - ) - _check_type( - ConsumerCryptoFailureAction, crypto_failure_action, "crypto_failure_action" - ) + _check_type(str, subscription_name, 'subscription_name') + _check_type(ConsumerType, consumer_type, 'consumer_type') + _check_type(_schema.Schema, schema, 'schema') + _check_type(int, receiver_queue_size, 'receiver_queue_size') + _check_type(int, max_total_receiver_queue_size_across_partitions, + 'max_total_receiver_queue_size_across_partitions') + _check_type_or_none(str, consumer_name, 'consumer_name') + _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms') + _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms') + _check_type(int, negative_ack_redelivery_delay_ms, 'negative_ack_redelivery_delay_ms') + _check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period') + _check_type(bool, is_read_compacted, 'is_read_compacted') + _check_type_or_none(dict, properties, 'properties') + _check_type(InitialPosition, initial_position, 'initial_position') + _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') + _check_type(int, max_pending_chunked_message, 'max_pending_chunked_message') + _check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 'auto_ack_oldest_chunked_message_on_queue_full') + _check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive') + _check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 'batch_receive_policy') + _check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy') 
+ _check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled') + _check_type(RegexSubscriptionMode, regex_subscription_mode, 'regex_subscription_mode') + _check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action') conf = _pulsar.ConsumerConfiguration() conf.consumer_type(consumer_type) @@ -1147,9 +1067,7 @@ def my_listener(consumer, message): if message_listener: conf.message_listener(_listener_wrapper(message_listener, schema)) conf.receiver_queue_size(receiver_queue_size) - conf.max_total_receiver_queue_size_across_partitions( - max_total_receiver_queue_size_across_partitions - ) + conf.max_total_receiver_queue_size_across_partitions(max_total_receiver_queue_size_across_partitions) if consumer_name: conf.consumer_name(consumer_name) if unacked_messages_timeout_ms: @@ -1169,9 +1087,7 @@ def my_listener(consumer, message): conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled) conf.max_pending_chunked_message(max_pending_chunked_message) - conf.auto_ack_oldest_chunked_message_on_queue_full( - auto_ack_oldest_chunked_message_on_queue_full - ) + conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full) conf.start_message_id_inclusive(start_message_id_inclusive) if batch_receive_policy: conf.batch_receive_policy(batch_receive_policy.policy()) @@ -1192,13 +1108,9 @@ def my_listener(consumer, message): c._consumer = self._client.subscribe_topics(topic, subscription_name, conf) elif isinstance(topic, _retype): # Regex pattern - c._consumer = self._client.subscribe_pattern( - topic.pattern, subscription_name, conf - ) + c._consumer = self._client.subscribe_pattern(topic.pattern, subscription_name, conf) else: - raise ValueError( - "Argument 'topic' is expected to be of a type between (str, list, re.pattern)" - ) + raise ValueError("Argument 'topic' is expected to be of a type between (str, list, re.pattern)") c._client = self c._schema = schema @@ -1206,20 +1118,17 @@ 
def my_listener(consumer, message): self._consumers.append(c) return c - def create_reader( - self, - topic, - start_message_id, - schema=schema.BytesSchema(), - reader_listener=None, - receiver_queue_size=1000, - reader_name=None, - subscription_role_prefix=None, - is_read_compacted=False, - crypto_key_reader: Union[None, CryptoKeyReader] = None, - start_message_id_inclusive=False, - crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL, - ): + def create_reader(self, topic, start_message_id, + schema=schema.BytesSchema(), + reader_listener=None, + receiver_queue_size=1000, + reader_name=None, + subscription_role_prefix=None, + is_read_compacted=False, + crypto_key_reader: Union[None, CryptoKeyReader] = None, + start_message_id_inclusive=False, + crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL, + ): """ Create a reader on a particular topic @@ -1298,18 +1207,16 @@ def my_listener(reader, message): if isinstance(start_message_id, MessageId): start_message_id = start_message_id._msg_id - _check_type(str, topic, "topic") - _check_type(_pulsar.MessageId, start_message_id, "start_message_id") - _check_type(_schema.Schema, schema, "schema") - _check_type(int, receiver_queue_size, "receiver_queue_size") - _check_type_or_none(str, reader_name, "reader_name") - _check_type_or_none(str, subscription_role_prefix, "subscription_role_prefix") - _check_type(bool, is_read_compacted, "is_read_compacted") - _check_type_or_none(CryptoKeyReader, crypto_key_reader, "crypto_key_reader") - _check_type(bool, start_message_id_inclusive, "start_message_id_inclusive") - _check_type( - ConsumerCryptoFailureAction, crypto_failure_action, "crypto_failure_action" - ) + _check_type(str, topic, 'topic') + _check_type(_pulsar.MessageId, start_message_id, 'start_message_id') + _check_type(_schema.Schema, schema, 'schema') + _check_type(int, receiver_queue_size, 'receiver_queue_size') + _check_type_or_none(str, reader_name, 
'reader_name') + _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix') + _check_type(bool, is_read_compacted, 'is_read_compacted') + _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') + _check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive') + _check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action') conf = _pulsar.ReaderConfiguration() if reader_listener: @@ -1334,12 +1241,9 @@ def my_listener(reader, message): self._consumers.append(c) return c - def create_table_view( - self, - topic: str, - subscription_name: Optional[str] = None, - schema: schema.Schema = schema.BytesSchema(), - ) -> TableView: + def create_table_view(self, topic: str, + subscription_name: Optional[str] = None, + schema: schema.Schema = schema.BytesSchema()) -> TableView: """ Create a table view on a particular topic @@ -1361,9 +1265,9 @@ def create_table_view( TableView A table view instance. """ - _check_type(str, topic, "topic") - _check_type_or_none(str, subscription_name, "subscription_name") - _check_type(_schema.Schema, schema, "schema") + _check_type(str, topic, 'topic') + _check_type_or_none(str, subscription_name, 'subscription_name') + _check_type(_schema.Schema, schema, 'schema') tv_conf = _pulsar.TableViewConfiguration() if subscription_name is not None: @@ -1394,7 +1298,7 @@ def get_topic_partitions(self, topic): list a list of partition name """ - _check_type(str, topic, "topic") + _check_type(str, topic, 'topic') return self._client.get_topic_partitions(topic) def shutdown(self): @@ -1457,19 +1361,17 @@ def last_sequence_id(self): """ return self._producer.last_sequence_id() - def send( - self, - content, - properties=None, - partition_key=None, - ordering_key=None, - sequence_id=None, - replication_clusters=None, - disable_replication=False, - event_timestamp=None, - deliver_at=None, - deliver_after=None, - ) -> _pulsar.MessageId: + def send(self, content, + properties=None, + 
partition_key=None, + ordering_key=None, + sequence_id=None, + replication_clusters=None, + disable_replication=False, + event_timestamp=None, + deliver_at=None, + deliver_after=None, + ) -> _pulsar.MessageId: """ Publish a message on the topic. Blocks until the message is acknowledged @@ -1507,34 +1409,22 @@ def send( ---------- A `_pulsar.MessageId` object that represents where the message is persisted. """ - msg = self._build_msg( - content, - properties, - partition_key, - ordering_key, - sequence_id, - replication_clusters, - disable_replication, - event_timestamp, - deliver_at, - deliver_after, - ) + msg = self._build_msg(content, properties, partition_key, ordering_key, sequence_id, + replication_clusters, disable_replication, event_timestamp, + deliver_at, deliver_after) return self._producer.send(msg) - def send_async( - self, - content, - callback, - properties=None, - partition_key=None, - ordering_key=None, - sequence_id=None, - replication_clusters=None, - disable_replication=False, - event_timestamp=None, - deliver_at=None, - deliver_after=None, - ): + def send_async(self, content, callback, + properties=None, + partition_key=None, + ordering_key=None, + sequence_id=None, + replication_clusters=None, + disable_replication=False, + event_timestamp=None, + deliver_at=None, + deliver_after=None, + ): """ Send a message asynchronously. @@ -1598,20 +1488,12 @@ def callback(res, msg_id): deliver_after: optional Specify a delay in timedelta for the delivery of the messages. 
""" - msg = self._build_msg( - content, - properties, - partition_key, - ordering_key, - sequence_id, - replication_clusters, - disable_replication, - event_timestamp, - deliver_at, - deliver_after, - ) + msg = self._build_msg(content, properties, partition_key, ordering_key, sequence_id, + replication_clusters, disable_replication, event_timestamp, + deliver_at, deliver_after) self._producer.send_async(msg, callback) + def flush(self): """ Flush all the messages buffered in the client and wait until all messages have been @@ -1619,37 +1501,28 @@ def flush(self): """ self._producer.flush() + def close(self): """ Close the producer. """ self._producer.close() - def _build_msg( - self, - content, - properties, - partition_key, - ordering_key, - sequence_id, - replication_clusters, - disable_replication, - event_timestamp, - deliver_at, - deliver_after, - ): + def _build_msg(self, content, properties, partition_key, ordering_key, sequence_id, + replication_clusters, disable_replication, event_timestamp, + deliver_at, deliver_after): data = self._schema.encode(content) - _check_type(bytes, data, "data") - _check_type_or_none(dict, properties, "properties") - _check_type_or_none(str, partition_key, "partition_key") - _check_type_or_none(str, ordering_key, "ordering_key") - _check_type_or_none(int, sequence_id, "sequence_id") - _check_type_or_none(list, replication_clusters, "replication_clusters") - _check_type(bool, disable_replication, "disable_replication") - _check_type_or_none(int, event_timestamp, "event_timestamp") - _check_type_or_none(int, deliver_at, "deliver_at") - _check_type_or_none(timedelta, deliver_after, "deliver_after") + _check_type(bytes, data, 'data') + _check_type_or_none(dict, properties, 'properties') + _check_type_or_none(str, partition_key, 'partition_key') + _check_type_or_none(str, ordering_key, 'ordering_key') + _check_type_or_none(int, sequence_id, 'sequence_id') + _check_type_or_none(list, replication_clusters, 'replication_clusters') + 
_check_type(bool, disable_replication, 'disable_replication') + _check_type_or_none(int, event_timestamp, 'event_timestamp') + _check_type_or_none(int, deliver_at, 'deliver_at') + _check_type_or_none(timedelta, deliver_after, 'deliver_after') mb = _pulsar.MessageBuilder() mb.content(data) @@ -1751,7 +1624,7 @@ def receive(self, timeout_millis=None): if timeout_millis is None: msg = self._consumer.receive() else: - _check_type(int, timeout_millis, "timeout_millis") + _check_type(int, timeout_millis, 'timeout_millis') msg = self._consumer.receive(timeout_millis) m = Message() @@ -1774,9 +1647,7 @@ def batch_receive(self): messages.append(m) return messages - def acknowledge( - self, message: Union[Message, MessageId, _pulsar.Message, _pulsar.MessageId] - ): + def acknowledge(self, message: Union[Message, MessageId, _pulsar.Message, _pulsar.MessageId]): """ Acknowledge the reception of a single message. @@ -1800,9 +1671,7 @@ def acknowledge( else: self._consumer.acknowledge(message) - def acknowledge_cumulative( - self, message: Union[Message, MessageId, _pulsar.Message, _pulsar.MessageId] - ): + def acknowledge_cumulative(self, message: Union[Message, MessageId, _pulsar.Message, _pulsar.MessageId]): """ Acknowledge the reception of all the messages in the stream up to (and including) the provided message. @@ -1908,7 +1777,6 @@ def get_last_message_id(self): """ return self._consumer.get_last_message_id() - class ConsumerBatchReceivePolicy: """ Batch receive policy can limit the number and bytes of messages in a single batch, @@ -1917,7 +1785,6 @@ class ConsumerBatchReceivePolicy: A batch receive action is completed as long as any one of the conditions (the batch has enough number or size of messages, or the waiting timeout is passed) are met. """ - def __init__(self, max_num_message, max_num_bytes, timeout_ms): """ Wrapper BatchReceivePolicy. 
@@ -1937,17 +1804,15 @@ def policy(self):
         """
         return self._policy
 
-
 class ConsumerKeySharedPolicy:
     """
     Consumer key shared policy is used to configure the consumer behaviour when the ConsumerType is KeyShared.
     """
-
-    def __init__(
-        self,
-        key_shared_mode: KeySharedMode = KeySharedMode.AutoSplit,
-        allow_out_of_order_delivery: bool = False,
-        sticky_ranges: Optional[List[Tuple[int, int]]] = None,
+    def __init__(self,
+                 key_shared_mode: KeySharedMode = KeySharedMode.AutoSplit,
+                 allow_out_of_order_delivery: bool = False,
+                 sticky_ranges: Optional[List[Tuple[int, int]]] = None,
     ):
         """
         Wrapper KeySharedPolicy.
@@ -1970,9 +1835,7 @@ def __init__(
             Set the ranges used with sticky mode. The integers can be from 0 to 2^16 (0 <= val < 65,536)
         """
         if key_shared_mode == KeySharedMode.Sticky and sticky_ranges is None:
-            raise ValueError(
-                "When using key_shared_mode = KeySharedMode.Sticky you must also provide sticky_ranges"
-            )
+            raise ValueError("When using key_shared_mode = KeySharedMode.Sticky you must also provide sticky_ranges")
 
         self._policy = KeySharedPolicy()
         self._policy.set_key_shared_mode(key_shared_mode)
@@ -2008,7 +1871,6 @@ def policy(self):
         """
         return self._policy
 
-
 class Reader:
     """
     Pulsar topic reader.
@@ -2036,7 +1898,7 @@ def read_next(self, timeout_millis=None):
         if timeout_millis is None:
            msg = self._reader.read_next()
         else:
-            _check_type(int, timeout_millis, "timeout_millis")
+            _check_type(int, timeout_millis, 'timeout_millis')
             msg = self._reader.read_next(timeout_millis)
 
         m = Message()
@@ -2048,7 +1910,7 @@ def has_message_available(self):
         """
         Check if there is any message available to read from the current position.
""" - return self._reader.has_message_available() + return self._reader.has_message_available(); def seek(self, messageid: Union[MessageId, _pulsar.MessageId, int]): """ @@ -2089,9 +1951,8 @@ class ConsoleLogger: log_level: The logging level, eg: ``pulsar.LoggerLevel.Info`` """ - def __init__(self, log_level=_pulsar.LoggerLevel.Info): - _check_type(_pulsar.LoggerLevel, log_level, "log_level") + _check_type(_pulsar.LoggerLevel, log_level, 'log_level') self.log_level = log_level @@ -2107,28 +1968,23 @@ class FileLogger: log_file: The file where to write the logs """ - def __init__(self, log_level, log_file): - _check_type(_pulsar.LoggerLevel, log_level, "log_level") - _check_type(str, log_file, "log_file") + _check_type(_pulsar.LoggerLevel, log_level, 'log_level') + _check_type(str, log_file, 'log_file') self.log_level = log_level self.log_file = log_file def _check_type(var_type, var, name): if not isinstance(var, var_type): - raise ValueError( - "Argument %s is expected to be of type '%s' and not '%s'" - % (name, var_type.__name__, type(var).__name__) - ) + raise ValueError("Argument %s is expected to be of type '%s' and not '%s'" + % (name, var_type.__name__, type(var).__name__)) def _check_type_or_none(var_type, var, name): if var is not None and not isinstance(var, var_type): - raise ValueError( - "Argument %s is expected to be either None or of type '%s'" - % (name, var_type.__name__) - ) + raise ValueError("Argument %s is expected to be either None or of type '%s'" + % (name, var_type.__name__)) def _listener_wrapper(listener, schema): @@ -2139,10 +1995,8 @@ def wrapper(consumer, msg): m._message = msg m._schema = schema listener(c, m) - return wrapper - def _seek_arg_convert(seek_arg): if isinstance(seek_arg, MessageId): return seek_arg._msg_id From 43527603d3995ce1350f016914ee66e821b63af8 Mon Sep 17 00:00:00 2001 From: Matthew Marion Date: Fri, 14 Nov 2025 13:20:24 -0500 Subject: [PATCH 4/4] format revert --- pulsar/__init__.py | 34 
+++++++++++----------------------- 1 file changed, 11 insertions(+), 23 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index de6f4bf..f415387 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -21,7 +21,7 @@ The Pulsar Python client library is based on the existing C++ client library. All the same features are exposed through the Python interface. -Currently, the supported Python versions are 3.9, 3.10, 3.11, 3.12, 3.13, and 3.14. +Currently, the supported Python versions are 3.7, 3.8, 3.9, 3.10, 3.11, 3.12, 3.13, and 3.14. ================= Install from PyPI @@ -43,39 +43,27 @@ """ import logging -from typing import Callable, List, Optional, Tuple, Union +from typing import Callable, List, Tuple, Optional, Union import _pulsar -from _pulsar import ( - BatchingType, - BatchReceivePolicy, - CompressionType, - ConsumerCryptoFailureAction, - ConsumerType, - DeadLetterPolicyBuilder, # noqa: F401 - InitialPosition, - KeySharedMode, - KeySharedPolicy, - LoggerLevel, - PartitionsRoutingMode, - ProducerAccessMode, - RegexSubscriptionMode, - Result, -) -from pulsar import schema +from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \ + LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode, RegexSubscriptionMode, \ + DeadLetterPolicyBuilder, ConsumerCryptoFailureAction # noqa: F401 + from pulsar.__about__ import __version__ + from pulsar.exceptions import * -from pulsar.functions.context import Context -from pulsar.functions.function import Function -from pulsar.functions.serde import IdentitySerDe, PickleSerDe, SerDe from pulsar.schema.schema import BytesSchema from pulsar.tableview import TableView +from pulsar.functions.function import Function +from pulsar.functions.context import Context +from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe +from pulsar import schema _schema = schema import re - _retype = type(re.compile('x')) from 
datetime import timedelta