diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index 8679a67..7355a17 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -39,8 +39,8 @@ jobs: - name: Run Connector Tests run: | pytest tests/test_connector.py --doctest-modules --junitxml=tests/connector-test-results.xml - env: - MQ_TESTING: 1 +# env: +# MQ_TESTING: 1 - name: Upload Connector test results uses: actions/upload-artifact@v2 with: diff --git a/CHANGELOG.md b/CHANGELOG.md index 95e2605..84b3cac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,35 +1,52 @@ # Changelog -## [0.6.1a10](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.6.1a10) (2023-06-06) +## [0.7.1](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.7.1) (2023-12-11) -[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.6.1a9...0.6.1a10) +[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.7.1a5...0.7.1) + +**Closed issues:** + +- remove hard-coded configuration of log level [\#87](https://github.com/NeonGeckoCom/neon_mq_connector/issues/87) + +## [0.7.1a5](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.7.1a5) (2023-12-09) + +[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.7.1a4...0.7.1a5) + +**Merged pull requests:** + +- Reduce message emit log to `DEBUG` [\#93](https://github.com/NeonGeckoCom/neon_mq_connector/pull/93) ([NeonDaniel](https://github.com/NeonDaniel)) + +## [0.7.1a4](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.7.1a4) (2023-08-11) + +[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.7.1a3...0.7.1a4) **Merged pull requests:** -- Updated Configuration, Permit Infinite Consumer Restarts [\#70](https://github.com/NeonGeckoCom/neon_mq_connector/pull/70) ([github-actions[bot]](https://github.com/apps/github-actions)) -- Update neon\_utils dependency spec to support 1.0 [\#61](https://github.com/NeonGeckoCom/neon_mq_connector/pull/61) ([github-actions[bot]](https://github.com/apps/github-actions)) -- Logging, testing, style updates and minor bugfixes [\#54](https://github.com/NeonGeckoCom/neon_mq_connector/pull/54) ([NeonDaniel](https://github.com/NeonDaniel)) -- Updated Exchanges, Publishing, Chatbots support [\#42](https://github.com/NeonGeckoCom/neon_mq_connector/pull/42) ([github-actions[bot]](https://github.com/apps/github-actions)) +- Update `message_id` handling for response routing [\#92](https://github.com/NeonGeckoCom/neon_mq_connector/pull/92) ([NeonDaniel](https://github.com/NeonDaniel)) -## [0.6.1a9](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.6.1a9) (2023-05-01) +## [0.7.1a3](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.7.1a3) (2023-07-28) -[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.6.1a8...0.6.1a9) +[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.7.1a2...0.7.1a3) -## [0.6.1a8](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.6.1a8) (2023-04-27) +**Merged pull requests:** -[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.6.1a7...0.6.1a8) +- Update message\_id handling for iris compat [\#91](https://github.com/NeonGeckoCom/neon_mq_connector/pull/91) ([NeonDaniel](https://github.com/NeonDaniel)) -## [0.6.1a7](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.6.1a7) (2023-04-21) +## [0.7.1a2](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.7.1a2) (2023-07-27) -[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.6.1a6...0.6.1a7) +[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.7.1a1...0.7.1a2) -## [0.6.1a6](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.6.1a6) (2023-04-21) +**Merged pull requests:** -[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.6.1a5...0.6.1a6) +- Update logging and response routing [\#89](https://github.com/NeonGeckoCom/neon_mq_connector/pull/89) ([NeonDaniel](https://github.com/NeonDaniel)) -## [0.6.1a5](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.6.1a5) (2023-04-21) +## [0.7.1a1](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.7.1a1) (2023-07-26) + +[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.7.0...0.7.1a1) + +**Merged pull requests:** -[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.6.0...0.6.1a5) +- Bug Fix: Setting Generated Message ID as default value, skipping it while already set [\#90](https://github.com/NeonGeckoCom/neon_mq_connector/pull/90) ([NeonKirill](https://github.com/NeonKirill)) diff --git a/neon_mq_connector/connector.py b/neon_mq_connector/connector.py index d15e5fd..b16da48 100644 --- a/neon_mq_connector/connector.py +++ b/neon_mq_connector/connector.py @@ -218,16 +218,16 @@ def __basic_configurable_properties(self) -> Dict[str, Any]: self.service_configurable_properties() """ return { - 'sync_period': 10, # in seconds - 'observe_period': 20, # in seconds - 'vhost_prefix': '', # Could be used for scalability purposes - 'default_testing_prefix': 'test', - 'testing_envs': (f'{self.service_name.upper()}_TESTING', - 'MQ_TESTING',), # order matters - 'testing_prefix_envs': (f'{self.service_name.upper()}' - f'_TESTING_PREFIX', - 'MQ_TESTING_PREFIX',) # order matters - } + 'sync_period': 10, # in seconds + 'observe_period': 20, # in seconds + 'vhost_prefix': '', # Could be used for scalability purposes + 'default_testing_prefix': 'test', + 'testing_envs': (f'{self.service_name.upper()}_TESTING', + 'MQ_TESTING',), # order matters + 'testing_prefix_envs': (f'{self.service_name.upper()}' + f'_TESTING_PREFIX', + 'MQ_TESTING_PREFIX',) # order matters + } @property def service_configurable_properties(self) -> Dict[str, Any]: @@ -363,29 +363,34 @@ def emit_mq_message(cls, :raises ValueError: invalid request data provided :returns message_id: id of the sent message """ - if request_data and len(request_data) > 0 and isinstance(request_data, - dict): - message_id = cls.create_unique_id() - request_data['message_id'] = message_id - with connection.channel() as channel: - if exchange: - channel.exchange_declare(exchange=exchange, - exchange_type=exchange_type, - auto_delete=False) - if queue: - declared_queue = channel.queue_declare(queue=queue, - auto_delete=False) - if exchange_type == ExchangeType.fanout.value: - channel.queue_bind(queue=declared_queue.method.queue, - exchange=exchange) - channel.basic_publish(exchange=exchange or '', - routing_key=queue, - body=dict_to_b64(request_data), - properties=pika.BasicProperties( - expiration=str(expiration))) - return message_id - else: - raise ValueError(f'Invalid request data provided: {request_data}') + if not isinstance(request_data, dict): + raise TypeError(f"Expected dict and got {type(request_data)}") + if not request_data: + raise ValueError(f'No request data provided') + + # Ensure `message_id` in data will match context in messagebus connector + request_data.setdefault('message_id', request_data.get("context", {}) + .get("mq", {}).get("message_id") or + cls.create_unique_id()) + + with connection.channel() as channel: + if exchange: + channel.exchange_declare(exchange=exchange, + exchange_type=exchange_type, + auto_delete=False) + if queue: + declared_queue = channel.queue_declare(queue=queue, + auto_delete=False) + if exchange_type == ExchangeType.fanout.value: + channel.queue_bind(queue=declared_queue.method.queue, + exchange=exchange) + channel.basic_publish(exchange=exchange or '', + routing_key=queue, + body=dict_to_b64(request_data), + properties=pika.BasicProperties( + expiration=str(expiration))) + LOG.debug(f"sent message: {request_data['message_id']}") + return request_data['message_id'] @classmethod def publish_message(cls, @@ -437,25 +442,26 @@ def send_message(self, vhost = self.vhost if not connection_props: connection_props = {} - LOG.debug(f'Opening connection on vhost={vhost}') + LOG.debug(f'Opening connection on vhost={vhost} queue={queue}') with self.create_mq_connection(vhost=vhost, **connection_props) as mq_conn: if exchange_type in (ExchangeType.fanout, ExchangeType.fanout.value,): - LOG.debug('Sending fanout request to MQ') + LOG.debug(f'Sending fanout request to exchange: {exchange}') msg_id = self.publish_message(connection=mq_conn, request_data=request_data, exchange=exchange, expiration=expiration) else: - LOG.debug(f'Sending {exchange_type} request to MQ') + LOG.debug(f'Sending {exchange_type} request to exchange ' + f'{exchange}') msg_id = self.emit_mq_message(mq_conn, queue=queue, request_data=request_data, exchange=exchange, exchange_type=exchange_type, expiration=expiration) - LOG.info(f'Message propagated, id={msg_id}') + LOG.debug(f'Message propagated, id={msg_id}') return msg_id @retry(use_self=True, num_retries=__run_retries__) @@ -695,7 +701,7 @@ def observe_consumers(self): Iteratively observes each consumer, and if it was launched but is not alive - restarts it """ - LOG.debug('Observers state observation') + # LOG.debug('Observers state observation') consumers_dict = copy.copy(self.consumers) for consumer_name, consumer_instance in consumers_dict.items(): if self.consumer_properties[consumer_name]['started'] and \ diff --git a/neon_mq_connector/utils/client_utils.py b/neon_mq_connector/utils/client_utils.py index 004677b..8768326 100644 --- a/neon_mq_connector/utils/client_utils.py +++ b/neon_mq_connector/utils/client_utils.py @@ -26,7 +26,6 @@ # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import logging import uuid from threading import Event @@ -38,9 +37,6 @@ from neon_mq_connector.utils.network_utils import b64_to_dict -# TODO: Leave below to configuration -logging.getLogger("pika").setLevel(logging.CRITICAL) - _default_mq_config = { "server": "api.neon.ai", "port": 5672, @@ -93,11 +89,21 @@ def on_error(thread, error): def handle_mq_response(channel: Channel, method, _, body): """ - Method that handles Neon API output. - In case received output message with the desired id, event stops + Method that handles Neon API output. + In case received output message with the desired id, event stops """ api_output = b64_to_dict(body) - api_output_msg_id = api_output.get('message_id', None) + + # The Messagebus connector generates a unique `message_id` for each + # response message. Check context for the original one; otherwise, + # check in output directly as some APIs emit responses without a unique + # message_id + api_output_msg_id = \ + api_output.get('context', + api_output).get('mq', api_output).get('message_id') + # TODO: One of these specs should be deprecated + if api_output_msg_id != api_output.get('message_id'): + LOG.debug(f"Handling message_id from response context") if api_output_msg_id == message_id: LOG.debug(f'MQ output: {api_output}') channel.basic_ack(delivery_tag=method.delivery_tag) diff --git a/tests/test_backward_compatibility.py b/tests/test_backward_compatibility.py index 8651cd1..08ae240 100644 --- a/tests/test_backward_compatibility.py +++ b/tests/test_backward_compatibility.py @@ -42,7 +42,7 @@ def callback_func_1(self, channel, method, properties, body): def __init__(self, config: dict, service_name: str): super().__init__(config=config, service_name=service_name) - self.vhost = '/test' + self.vhost = '/neon_testing' self.func_1_ok = False diff --git a/tests/test_connector.py b/tests/test_connector.py index d837b62..1ba40c9 100644 --- a/tests/test_connector.py +++ b/tests/test_connector.py @@ -54,6 +54,7 @@ def __init__(self, config: dict, service_name: str): self.exception = None self._consume_event = None self._consumer_restarted_event = None + self._vhost = "/neon_testing" self.observe_period = 10 self.register_consumer(name="error", vhost=self.vhost, queue="error", callback=self.callback_func_error, diff --git a/version.py b/version.py index 93da92a..7680be6 100644 --- a/version.py +++ b/version.py @@ -26,4 +26,4 @@ # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -__version__ = "0.7.0" +__version__ = "0.7.1"