Skip to content

NeonGeckoCom/neon_mq_connector

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
Dec 6, 2024
Dec 10, 2024
Dec 5, 2024
Dec 10, 2024
Jul 13, 2021
Dec 11, 2023
Nov 25, 2021
Dec 10, 2024
Dec 3, 2024
Dec 5, 2024
Nov 25, 2021

Repository files navigation

Neon MQ Connector

The Neon MQ Connector is an MQ interface for microservices.

Configuration

By default, this package will use ovos-config to read default configuration. In general, configuration should be passed to the MQConnector object at init.

Legacy Configuration

A global configuration for the MQ Connector may be specified at ~/.config/neon/mq_config.json. This configuration file may contain the following keys:

  • server: The hostname or IP address of the MQ server to connect to. If left blank, this defaults to "localhost"
  • port: The port used by the MQ server. If left blank, this defaults to 5672
  • users: A mapping of service names to credentials. Note that not all users will have permissions required to access each service.
{
  "server": "localhost",
  "port": 5672,
  "users": {
    "<service_name>": {
      "username": "<username>",
      "password": "<password>"
    }
  }
}

Services

The MQConnector class should be extended by a class providing some specific service. Service classes will specify the following parameters.

  • service_name: Name of the service, used to identify credentials in configuration
  • vhost: Virtual host to connect to; messages are all constrained to this namespace.
  • consumers: Dict of names to ConsumerThread objects. A ConsumerThread will accept a connection to a particular connection, a queue, and a callback_func
    • connection: MQ connection to the vhost specified above.
    • queue: Queue to monitor within the vhost. A vhost may handle multiple queues.
    • callback_func: Function to call when a message arrives in the queue

Callback Functions

A callback function should have the following signature:

def handle_api_input(self,
                     channel: pika.channel.Channel,
                     method: pika.spec.Basic.Return,
                     properties: pika.spec.BasicProperties,
                     body: bytes):
    """
        Handles input requests from MQ to Neon API

        :param channel: MQ channel object (pika.channel.Channel)
        :param method: MQ return method (pika.spec.Basic.Return)
        :param properties: MQ properties (pika.spec.BasicProperties)
        :param body: request body (bytes)
    """

Generally, body should be decoded into a dict, and that dict should contain message_id. The message_id should be included in the body of any response to associate the response to the request. A response may be sent via:

 channel.queue_declare(queue='<queue>')

 channel.basic_publish(exchange='',
                       routing_key='<queue>',
                       body=<data>,
                       properties=pika.BasicProperties(expiration='1000')
                       )

Where <queue> is the queue to which the response will be published, and data is a bytes response (generally a base64-encoded dict).

Multi-part Responses

A callback function may choose to publish multiple response messages so the client may receive partial responses as they are being generated. If multiple responses will be returned, the following requirements must be met:

  • Each response must be a dict with _part and _is_final keys.
  • The final response must specify _is_final=True
  • The final response MUST NOT require the client to handle partial responses

Client Requests

Most client applications will interact with services via send_mq_request. This function will return a dict response to the input message.

Multi-part Responses

A caller may optionally include a stream_callback argument which may receive partial responses if supported by the service generating the response. The stream_callback will always be called with the final result that is returned by send_mq_request. Keep in mind that the timeout param passed to send_mq_request applies to the full response, so it may be desirable to increase the timeout if

[BETA] Asynchronous Consumers

Now there is a support for async-based consumers handling based on pika.SelectConnection

Enabling in a code

To enable creation of async consumers/subscribers, set the class-attribute async_consumers_enabled to True:

from neon_mq_connector import MQConnector

class MQConnectorChild(MQConnector):
   async_consumers_enabled  = True

About

MQ Connector module for Neon Modules

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Contributors 4

  •  
  •  
  •  
  •  

Languages