Skip to content

Commit

Permalink
0.7.1 (#94)
Browse files Browse the repository at this point in the history
  • Loading branch information
NeonDaniel authored Dec 11, 2023
2 parents 314bf81 + 78fb74f commit b9566b2
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 65 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ jobs:
- name: Run Connector Tests
run: |
pytest tests/test_connector.py --doctest-modules --junitxml=tests/connector-test-results.xml
env:
MQ_TESTING: 1
# env:
# MQ_TESTING: 1
- name: Upload Connector test results
uses: actions/upload-artifact@v2
with:
Expand Down
49 changes: 33 additions & 16 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,35 +1,52 @@
# Changelog

## [0.6.1a10](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.6.1a10) (2023-06-06)
## [0.7.1](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.7.1) (2023-12-11)

[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.6.1a9...0.6.1a10)
[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.7.1a5...0.7.1)

**Closed issues:**

- remove hard-coded configuration of log level [\#87](https://github.com/NeonGeckoCom/neon_mq_connector/issues/87)

## [0.7.1a5](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.7.1a5) (2023-12-09)

[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.7.1a4...0.7.1a5)

**Merged pull requests:**

- Reduce message emit log to `DEBUG` [\#93](https://github.com/NeonGeckoCom/neon_mq_connector/pull/93) ([NeonDaniel](https://github.com/NeonDaniel))

## [0.7.1a4](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.7.1a4) (2023-08-11)

[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.7.1a3...0.7.1a4)

**Merged pull requests:**

- Updated Configuration, Permit Infinite Consumer Restarts [\#70](https://github.com/NeonGeckoCom/neon_mq_connector/pull/70) ([github-actions[bot]](https://github.com/apps/github-actions))
- Update neon\_utils dependency spec to support 1.0 [\#61](https://github.com/NeonGeckoCom/neon_mq_connector/pull/61) ([github-actions[bot]](https://github.com/apps/github-actions))
- Logging, testing, style updates and minor bugfixes [\#54](https://github.com/NeonGeckoCom/neon_mq_connector/pull/54) ([NeonDaniel](https://github.com/NeonDaniel))
- Updated Exchanges, Publishing, Chatbots support [\#42](https://github.com/NeonGeckoCom/neon_mq_connector/pull/42) ([github-actions[bot]](https://github.com/apps/github-actions))
- Update `message_id` handling for response routing [\#92](https://github.com/NeonGeckoCom/neon_mq_connector/pull/92) ([NeonDaniel](https://github.com/NeonDaniel))

## [0.6.1a9](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.6.1a9) (2023-05-01)
## [0.7.1a3](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.7.1a3) (2023-07-28)

[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.6.1a8...0.6.1a9)
[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.7.1a2...0.7.1a3)

## [0.6.1a8](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.6.1a8) (2023-04-27)
**Merged pull requests:**

[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.6.1a7...0.6.1a8)
- Update message\_id handling for iris compat [\#91](https://github.com/NeonGeckoCom/neon_mq_connector/pull/91) ([NeonDaniel](https://github.com/NeonDaniel))

## [0.6.1a7](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.6.1a7) (2023-04-21)
## [0.7.1a2](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.7.1a2) (2023-07-27)

[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.6.1a6...0.6.1a7)
[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.7.1a1...0.7.1a2)

## [0.6.1a6](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.6.1a6) (2023-04-21)
**Merged pull requests:**

[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.6.1a5...0.6.1a6)
- Update logging and response routing [\#89](https://github.com/NeonGeckoCom/neon_mq_connector/pull/89) ([NeonDaniel](https://github.com/NeonDaniel))

## [0.6.1a5](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.6.1a5) (2023-04-21)
## [0.7.1a1](https://github.com/NeonGeckoCom/neon_mq_connector/tree/0.7.1a1) (2023-07-26)

[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.7.0...0.7.1a1)

**Merged pull requests:**

[Full Changelog](https://github.com/NeonGeckoCom/neon_mq_connector/compare/0.6.0...0.6.1a5)
- Bug Fix: Setting Generated Message ID as default value, skipping it while already set [\#90](https://github.com/NeonGeckoCom/neon_mq_connector/pull/90) ([NeonKirill](https://github.com/NeonKirill))



Expand Down
82 changes: 44 additions & 38 deletions neon_mq_connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,16 +218,16 @@ def __basic_configurable_properties(self) -> Dict[str, Any]:
self.service_configurable_properties()
"""
return {
'sync_period': 10, # in seconds
'observe_period': 20, # in seconds
'vhost_prefix': '', # Could be used for scalability purposes
'default_testing_prefix': 'test',
'testing_envs': (f'{self.service_name.upper()}_TESTING',
'MQ_TESTING',), # order matters
'testing_prefix_envs': (f'{self.service_name.upper()}'
f'_TESTING_PREFIX',
'MQ_TESTING_PREFIX',) # order matters
}
'sync_period': 10, # in seconds
'observe_period': 20, # in seconds
'vhost_prefix': '', # Could be used for scalability purposes
'default_testing_prefix': 'test',
'testing_envs': (f'{self.service_name.upper()}_TESTING',
'MQ_TESTING',), # order matters
'testing_prefix_envs': (f'{self.service_name.upper()}'
f'_TESTING_PREFIX',
'MQ_TESTING_PREFIX',) # order matters
}

@property
def service_configurable_properties(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -363,29 +363,34 @@ def emit_mq_message(cls,
:raises ValueError: invalid request data provided
:returns message_id: id of the sent message
"""
if request_data and len(request_data) > 0 and isinstance(request_data,
dict):
message_id = cls.create_unique_id()
request_data['message_id'] = message_id
with connection.channel() as channel:
if exchange:
channel.exchange_declare(exchange=exchange,
exchange_type=exchange_type,
auto_delete=False)
if queue:
declared_queue = channel.queue_declare(queue=queue,
auto_delete=False)
if exchange_type == ExchangeType.fanout.value:
channel.queue_bind(queue=declared_queue.method.queue,
exchange=exchange)
channel.basic_publish(exchange=exchange or '',
routing_key=queue,
body=dict_to_b64(request_data),
properties=pika.BasicProperties(
expiration=str(expiration)))
return message_id
else:
raise ValueError(f'Invalid request data provided: {request_data}')
if not isinstance(request_data, dict):
raise TypeError(f"Expected dict and got {type(request_data)}")
if not request_data:
raise ValueError(f'No request data provided')

# Ensure `message_id` in data will match context in messagebus connector
request_data.setdefault('message_id', request_data.get("context", {})
.get("mq", {}).get("message_id") or
cls.create_unique_id())

with connection.channel() as channel:
if exchange:
channel.exchange_declare(exchange=exchange,
exchange_type=exchange_type,
auto_delete=False)
if queue:
declared_queue = channel.queue_declare(queue=queue,
auto_delete=False)
if exchange_type == ExchangeType.fanout.value:
channel.queue_bind(queue=declared_queue.method.queue,
exchange=exchange)
channel.basic_publish(exchange=exchange or '',
routing_key=queue,
body=dict_to_b64(request_data),
properties=pika.BasicProperties(
expiration=str(expiration)))
LOG.debug(f"sent message: {request_data['message_id']}")
return request_data['message_id']

@classmethod
def publish_message(cls,
Expand Down Expand Up @@ -437,25 +442,26 @@ def send_message(self,
vhost = self.vhost
if not connection_props:
connection_props = {}
LOG.debug(f'Opening connection on vhost={vhost}')
LOG.debug(f'Opening connection on vhost={vhost} queue={queue}')
with self.create_mq_connection(vhost=vhost,
**connection_props) as mq_conn:
if exchange_type in (ExchangeType.fanout,
ExchangeType.fanout.value,):
LOG.debug('Sending fanout request to MQ')
LOG.debug(f'Sending fanout request to exchange: {exchange}')
msg_id = self.publish_message(connection=mq_conn,
request_data=request_data,
exchange=exchange,
expiration=expiration)
else:
LOG.debug(f'Sending {exchange_type} request to MQ')
LOG.debug(f'Sending {exchange_type} request to exchange '
f'{exchange}')
msg_id = self.emit_mq_message(mq_conn,
queue=queue,
request_data=request_data,
exchange=exchange,
exchange_type=exchange_type,
expiration=expiration)
LOG.info(f'Message propagated, id={msg_id}')
LOG.debug(f'Message propagated, id={msg_id}')
return msg_id

@retry(use_self=True, num_retries=__run_retries__)
Expand Down Expand Up @@ -695,7 +701,7 @@ def observe_consumers(self):
Iteratively observes each consumer, and if it was launched but is not
alive - restarts it
"""
LOG.debug('Observers state observation')
# LOG.debug('Observers state observation')
consumers_dict = copy.copy(self.consumers)
for consumer_name, consumer_instance in consumers_dict.items():
if self.consumer_properties[consumer_name]['started'] and \
Expand Down
20 changes: 13 additions & 7 deletions neon_mq_connector/utils/client_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import logging
import uuid

from threading import Event
Expand All @@ -38,9 +37,6 @@

from neon_mq_connector.utils.network_utils import b64_to_dict

# TODO: Leave below to configuration
logging.getLogger("pika").setLevel(logging.CRITICAL)

_default_mq_config = {
"server": "api.neon.ai",
"port": 5672,
Expand Down Expand Up @@ -93,11 +89,21 @@ def on_error(thread, error):

def handle_mq_response(channel: Channel, method, _, body):
"""
Method that handles Neon API output.
In case received output message with the desired id, event stops
Method that handles Neon API output.
In case received output message with the desired id, event stops
"""
api_output = b64_to_dict(body)
api_output_msg_id = api_output.get('message_id', None)

# The Messagebus connector generates a unique `message_id` for each
# response message. Check context for the original one; otherwise,
# check in output directly as some APIs emit responses without a unique
# message_id
api_output_msg_id = \
api_output.get('context',
api_output).get('mq', api_output).get('message_id')
# TODO: One of these specs should be deprecated
if api_output_msg_id != api_output.get('message_id'):
LOG.debug(f"Handling message_id from response context")
if api_output_msg_id == message_id:
LOG.debug(f'MQ output: {api_output}')
channel.basic_ack(delivery_tag=method.delivery_tag)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_backward_compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def callback_func_1(self, channel, method, properties, body):

def __init__(self, config: dict, service_name: str):
super().__init__(config=config, service_name=service_name)
self.vhost = '/test'
self.vhost = '/neon_testing'
self.func_1_ok = False


Expand Down
1 change: 1 addition & 0 deletions tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(self, config: dict, service_name: str):
self.exception = None
self._consume_event = None
self._consumer_restarted_event = None
self._vhost = "/neon_testing"
self.observe_period = 10
self.register_consumer(name="error", vhost=self.vhost, queue="error",
callback=self.callback_func_error,
Expand Down
2 changes: 1 addition & 1 deletion version.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

__version__ = "0.7.0"
__version__ = "0.7.1"

0 comments on commit b9566b2

Please sign in to comment.