diff --git a/CMakeLists.txt b/CMakeLists.txt index 660a1b0..28b377c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -48,7 +48,7 @@ message(STATUS "PULSAR_LIBRARY: ${PULSAR_LIBRARY}") find_path(PULSAR_INCLUDE pulsar/Client.h) message(STATUS "PULSAR_INCLUDE: ${PULSAR_INCLUDE}") -SET(CMAKE_CXX_STANDARD 11) +SET(CMAKE_CXX_STANDARD 17) find_package (Python3 REQUIRED COMPONENTS Development.Module) MESSAGE(STATUS "PYTHON: " ${Python3_VERSION} " - " ${Python3_INCLUDE_DIRS}) diff --git a/build-support/dep-url.sh b/build-support/dep-url.sh index 7670bb9..b949426 100644 --- a/build-support/dep-url.sh +++ b/build-support/dep-url.sh @@ -23,7 +23,7 @@ pulsar_cpp_base_url() { exit 1 fi VERSION=$1 - echo "https://archive.apache.org/dist/pulsar/pulsar-client-cpp-${VERSION}" + echo "https://dist.apache.org/repos/dist/dev/pulsar/pulsar-client-cpp/pulsar-client-cpp-${VERSION}-candidate-1" } download_dependency() { diff --git a/dependencies.yaml b/dependencies.yaml index e6f598f..f24d9a6 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -17,7 +17,7 @@ # under the License. # -pulsar-cpp: 3.8.0 +pulsar-cpp: 4.0.0 pybind11: 2.10.1 # The OpenSSL dependency is only used when building Python from source openssl: 1.1.1q diff --git a/tests/custom_logger_test.py b/tests/custom_logger_test.py index 60f3315..89bb460 100755 --- a/tests/custom_logger_test.py +++ b/tests/custom_logger_test.py @@ -21,6 +21,7 @@ from unittest import TestCase, main import asyncio import logging +import threading from pulsar import Client class CustomLoggingTest(TestCase): @@ -49,6 +50,35 @@ async def async_get(value): client.close() + def test_logger_thread_leaks(self): + def _do_connect(close): + logger = logging.getLogger(str(threading.current_thread().ident)) + logger.setLevel(logging.INFO) + client = Client( + service_url="pulsar://localhost:6650", + io_threads=4, + message_listener_threads=4, + operation_timeout_seconds=1, + log_conf_file_path=None, + authentication=None, + logger=logger, + ) + client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test") + if close: + client.close() + + for should_close in (True, False): + self.assertEqual(threading.active_count(), 1, "Explicit close: {}; baseline is 1 thread".format(should_close)) + _do_connect(should_close) + self.assertEqual(threading.active_count(), 1, "Explicit close: {}; synchronous connect doesn't leak threads".format(should_close)) + threads = [] + for _ in range(10): + threads.append(threading.Thread(target=_do_connect, args=(should_close))) + threads[-1].start() + for thread in threads: + thread.join() + assert threading.active_count() == 1, "Explicit close: {}; threaded connect in parallel doesn't leak threads".format(should_close) + if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) main() diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index d24faf3..ee10579 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -20,8 +20,6 @@ import random -import threading -import logging from typing import Optional from unittest import TestCase, main import time @@ -1529,34 +1527,6 @@ def test_json_schema_encode(self): self.assertEqual(first_encode, second_encode) - def test_logger_thread_leaks(self): - def _do_connect(close): - logger = logging.getLogger(str(threading.current_thread().ident)) - logger.setLevel(logging.INFO) - client = pulsar.Client( - service_url="pulsar://localhost:6650", - io_threads=4, - message_listener_threads=4, - operation_timeout_seconds=1, - log_conf_file_path=None, - authentication=None, - logger=logger, - ) - client.get_topic_partitions("persistent://public/default/partitioned_topic_name_test") - if close: - client.close() - - for should_close in (True, False): - self.assertEqual(threading.active_count(), 1, "Explicit close: {}; baseline is 1 thread".format(should_close)) - _do_connect(should_close) - self.assertEqual(threading.active_count(), 1, "Explicit close: {}; synchronous connect doesn't leak threads".format(should_close)) - threads = [] - for _ in range(10): - threads.append(threading.Thread(target=_do_connect, args=(should_close))) - threads[-1].start() - for thread in threads: - thread.join() - assert threading.active_count() == 1, "Explicit close: {}; threaded connect in parallel doesn't leak threads".format(should_close) def test_chunking(self): client = Client(self.serviceUrl)