Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

amqp unsub routine now works, make broker logic more robust #7

Merged
merged 5 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ services:
- "15672:15672" # Web UI
environment:
# full list of env variables available at https://github.com/bitnami/containers/blob/main/bitnami/rabbitmq/README.md
RABBITMQ_PLUGINS: "rabbitmq_mqtt"
RABBITMQ_PLUGINS: "rabbitmq_management rabbitmq_mqtt"
RABBITMQ_USERNAME: "intersect_username"
RABBITMQ_PASSWORD: "intersect_password"
RABBITMQ_MANAGEMENT_ALLOW_WEB_ACCESS: "yes"
Expand Down
1 change: 1 addition & 0 deletions examples/1_hello_world_amqp/hello_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
)

logging.basicConfig(level=logging.INFO)
logging.getLogger('pika').setLevel(logging.WARNING)
marshallmcdonnell marked this conversation as resolved.
Show resolved Hide resolved


def simple_client_callback(
Expand Down
1 change: 1 addition & 0 deletions examples/1_hello_world_amqp/hello_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
)

logging.basicConfig(level=logging.INFO)
logging.getLogger('pika').setLevel(logging.WARNING)
marshallmcdonnell marked this conversation as resolved.
Show resolved Hide resolved
logger = logging.getLogger(__name__)


Expand Down
Empty file.
99 changes: 99 additions & 0 deletions examples/3_ping_pong_events_amqp/ping_pong_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import logging

from intersect_sdk import (
INTERSECT_JSON_VALUE,
IntersectClient,
IntersectClientCallback,
IntersectClientConfig,
default_intersect_lifecycle_loop,
)

logging.basicConfig(level=logging.INFO)
logging.getLogger('pika').setLevel(logging.WARNING)

logger = logging.getLogger(__name__)

PING_SERVICE = 'p-ng-organization.p-ng-facility.p-ng-system.p-ng-subsystem.ping-service'
PONG_SERVICE = 'p-ng-organization.p-ng-facility.p-ng-system.p-ng-subsystem.pong-service'


class SampleOrchestrator:
"""Basic orchestrator with an event callback.

The event callback switches between listening to 'ping' and 'pong' events; it never listens to both simultaneously.
"""

MAX_EVENTS_TO_PROCESS = 4

def __init__(self) -> None:
"""Straightforward constructor, just initializes global variable which counts events."""
self.events_encountered = 0

def event_callback(
self, _source: str, _operation: str, event_name: str, payload: INTERSECT_JSON_VALUE
) -> IntersectClientCallback:
"""Handles events from two Services at once.

With this handler, the order of instructions goes like this:

1. Listen for a ping event.
2. On ping event - Start listening for pong events and stop listening for ping events.
3. On pong event - Start listening for ping events and stop listening for pong events.
4. Repeat steps 2-3 until we have processed the maximum number of events we want to.
"""
self.events_encountered += 1
logger.info(payload)
print(payload)
if self.events_encountered == self.MAX_EVENTS_TO_PROCESS:
raise Exception

# we would normally also check the source here. With certain services, checking the operation can also be helpful.
if event_name == 'ping':
return IntersectClientCallback(
services_to_start_listening_for_events=[PONG_SERVICE],
services_to_stop_listening_for_events=[PING_SERVICE],
)
return IntersectClientCallback(
services_to_start_listening_for_events=[PING_SERVICE],
services_to_stop_listening_for_events=[PONG_SERVICE],
)


if __name__ == '__main__':
from_config_file = {
'data_stores': {
'minio': [
{
'username': 'AKIAIOSFODNN7EXAMPLE',
'password': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
'port': 9000,
},
],
},
'brokers': [
{
'username': 'intersect_username',
'password': 'intersect_password',
'port': 5672,
'protocol': 'amqp0.9.1',
},
],
}

# we initially only listen for ping service events,
config = IntersectClientConfig(
initial_message_event_config=IntersectClientCallback(
services_to_start_listening_for_events=[
PING_SERVICE,
]
),
**from_config_file,
)
orchestrator = SampleOrchestrator()
client = IntersectClient(
config=config,
event_callback=orchestrator.event_callback,
)
default_intersect_lifecycle_loop(
client,
)
36 changes: 36 additions & 0 deletions examples/3_ping_pong_events_amqp/ping_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Simple service which regularly emits an event."""

import logging
import threading
import time

from intersect_sdk import IntersectEventDefinition, intersect_event

from .service_runner import P_ngBaseCapabilityImplementation, run_service

logging.basicConfig(level=logging.INFO)
logging.getLogger('pika').setLevel(logging.WARNING)


class PingCapabiilityImplementation(P_ngBaseCapabilityImplementation):
"""Basic capability definition, very similar to the other capability except for the type of event it emits."""

def after_service_startup(self) -> None:
"""Called after service startup."""
self.counter_thread = threading.Thread(
target=self.ping_event,
daemon=True,
name='counter_thread',
)
self.counter_thread.start()

@intersect_event(events={'ping': IntersectEventDefinition(event_type=str)})
def ping_event(self) -> None:
"""Send out a ping event every 2 seconds."""
while True:
time.sleep(2.0)
self.intersect_sdk_emit_event('ping', 'ping')


if __name__ == '__main__':
run_service(PingCapabiilityImplementation())
140 changes: 140 additions & 0 deletions examples/3_ping_pong_events_amqp/ping_service_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
{
"asyncapi": "2.6.0",
"x-intersect-version": "0.6.2",
"info": {
"title": "p-ng-organization.p-ng-facility.p-ng-system.p-ng-subsystem.ping-service",
"version": "0.0.0"
},
"defaultContentType": "application/json",
"channels": {},
"events": {
"ping": {
"type": "string"
}
},
"components": {
"schemas": {},
"messageTraits": {
"commonHeaders": {
"messageHeaders": {
"$defs": {
"IntersectDataHandler": {
"description": "What data transfer type do you want to use for handling the request/response?\n\nDefault: MESSAGE",
"enum": [
0,
1
],
"title": "IntersectDataHandler",
"type": "integer"
}
},
"description": "Matches the current header definition for INTERSECT messages.\n\nALL messages should contain this header.",
"properties": {
"source": {
"description": "source of the message",
"pattern": "([-a-z0-9]+\\.)*[-a-z0-9]",
"title": "Source",
"type": "string"
},
"destination": {
"description": "destination of the message",
"pattern": "([-a-z0-9]+\\.)*[-a-z0-9]",
"title": "Destination",
"type": "string"
},
"created_at": {
"description": "the UTC timestamp of message creation",
"format": "date-time",
"title": "Created At",
"type": "string"
},
"sdk_version": {
"description": "SemVer string of SDK's version, used to check for compatibility",
"pattern": "^\\d+\\.\\d+\\.\\d+$",
"title": "Sdk Version",
"type": "string"
},
"data_handler": {
"allOf": [
{
"$ref": "#/components/messageTraits/commonHeaders/userspaceHeaders/$defs/IntersectDataHandler"
}
],
"default": 0,
"description": "Code signifying where data is stored."
},
"has_error": {
"default": false,
"description": "If this value is True, the payload will contain the error message (a string)",
"title": "Has Error",
"type": "boolean"
}
},
"required": [
"source",
"destination",
"created_at",
"sdk_version"
],
"title": "UserspaceMessageHeader",
"type": "object"
},
"eventHeaders": {
"$defs": {
"IntersectDataHandler": {
"description": "What data transfer type do you want to use for handling the request/response?\n\nDefault: MESSAGE",
"enum": [
0,
1
],
"title": "IntersectDataHandler",
"type": "integer"
}
},
"description": "Matches the current header definition for INTERSECT messages.\n\nALL messages should contain this header.",
"properties": {
"source": {
"description": "source of the message",
"pattern": "([-a-z0-9]+\\.)*[-a-z0-9]",
"title": "Source",
"type": "string"
},
"created_at": {
"description": "the UTC timestamp of message creation",
"format": "date-time",
"title": "Created At",
"type": "string"
},
"sdk_version": {
"description": "SemVer string of SDK's version, used to check for compatibility",
"pattern": "^\\d+\\.\\d+\\.\\d+$",
"title": "Sdk Version",
"type": "string"
},
"data_handler": {
"allOf": [
{
"$ref": "#/components/messageTraits/commonHeaders/eventHeaders/$defs/IntersectDataHandler"
}
],
"default": 0,
"description": "Code signifying where data is stored."
},
"event_name": {
"title": "Event Name",
"type": "string"
}
},
"required": [
"source",
"created_at",
"sdk_version",
"event_name"
],
"title": "EventMessageHeaders",
"type": "object"
}
}
}
}
}
36 changes: 36 additions & 0 deletions examples/3_ping_pong_events_amqp/pong_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Simple service which regularly emits an event."""

import logging
import threading
import time

from intersect_sdk import IntersectEventDefinition, intersect_event

from .service_runner import P_ngBaseCapabilityImplementation, run_service

logging.basicConfig(level=logging.INFO)
logging.getLogger('pika').setLevel(logging.WARNING)


class PongCapabiilityImplementation(P_ngBaseCapabilityImplementation):
"""Basic capability definition, very similar to the other capability except for the type of event it emits."""

def after_service_startup(self) -> None:
"""Called after service startup."""
self.counter_thread = threading.Thread(
target=self.pong_event,
daemon=True,
name='counter_thread',
)
self.counter_thread.start()

@intersect_event(events={'pong': IntersectEventDefinition(event_type=str)})
def pong_event(self) -> None:
"""Send out a pong event every 2 seconds."""
while True:
time.sleep(2.0)
self.intersect_sdk_emit_event('pong', 'pong')


if __name__ == '__main__':
run_service(PongCapabiilityImplementation())
Loading
Loading