Skip to content

Commit 701e13e

Browse files
committed
Merge branch '11-zeitlich-gruppierte-update-queue' into 'main'
Resolve "Zeitlich gruppierte Update Queue" Closes #11 See merge request imprj/01-bgp-testbed/zettabgp!9
2 parents eb63d84 + ad99b2e commit 701e13e

File tree

3 files changed

+102
-23
lines changed

3 files changed

+102
-23
lines changed

README.md

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,24 @@ The process can be started from within ExaBGP.\
6262
The Messages will be received using the stdin pipe.
6363
```
6464
Options:
65-
-r, --no-rabbitmq
65+
-d, --no-rabbitmq-direct
66+
-g, --rabbitmq-grouped INTEGER Queue group interval in minutes. [default: (5)]
6667
-l, --no-mongodb-log
6768
-s, --no-mongodb-state
6869
-t, --no-mongodb-statistics
6970
```
7071

72+
##### Queue Group Interval
73+
Besides the direct RabbitMQ queue (Exchange: `zettabgp` with Routing Key: `direct`), a grouped queue can be activated.\
74+
The grouped queue (Exchange: `zettabgp` with Routing Key: `grouped`) will be filled white grouped route updates when enabled with the `-g` option.\
75+
The group interval defaults to 5 minutes.\
76+
Alternative intervals can be set as an argument to option `-g`.\
77+
A 10 minute interval can be set as following.
78+
```
79+
zettabgp exabgp -g 10
80+
```
81+
When no `-g` option is present, no grouped updates will appear at all.
82+
7183
#### `zettabgp mrt-simulation`
7284
The `mrt-simulation` subcommand is used for processing MRT files.\
7385
It is mendatory to provide a valid path to a mrt file.\
@@ -76,14 +88,18 @@ Arguments:
7688
MRT_FILE
7789
7890
Options:
79-
-r, --no-rabbitmq
91+
-d, --no-rabbitmq-direct
92+
-g, --rabbitmq-grouped INTEGER Queue group interval in minutes. [default: (5)]
8093
-l, --no-mongodb-log
8194
-s, --no-mongodb-state
8295
-t, --no-mongodb-statistics
8396
-p, --playback-speed INTEGER Playback speed in multiples of real time. [default: (1)]
84-
-o, --playback-interval INTEGER Playback interval in minutes. [default: (5)]
97+
-o, --playback-interval INTEGER Playback interval in minutes. [default: (5)]
8598
```
8699

100+
##### Queue Group Interval
101+
See [`exabgp` command reference](#queue-group-interval).
102+
87103
##### Playback Speed
88104
Without specifying a playback speed, `mrt-simulation` will replay all route updates at once.\
89105
When defining playback speed, the replay of the updates will be done in multiples of real time.\

src/adapters/rabbitmq.py

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
from src.parsers.route_update import RouteUpdateParser
22
from src.models.route_update import RouteUpdate
3-
import pika, os
3+
from datetime import timedelta, datetime
4+
import pika, json, os
45

56
class RabbitMQAdapter:
67
'''This class is responsible for receiving the parsed messages and forwarding them to the RabbitMQ message broker'''
7-
def __init__(self, parser: RouteUpdateParser):
8+
def __init__(self, parser: RouteUpdateParser, no_direct: bool, queue_interval: int):
89
connection = pika.BlockingConnection(
910
pika.ConnectionParameters(
1011
host=os.getenv('RABBIT_MQ_HOST', 'localhost'),
@@ -15,27 +16,65 @@ def __init__(self, parser: RouteUpdateParser):
1516
channel = connection.channel()
1617
channel.exchange_declare(
1718
exchange='zettabgp',
18-
exchange_type='fanout',
19+
exchange_type='direct',
1920
)
2021

22+
2123
'''Declares the test_bgp_updates queue and binds it to the zettabgp exchange'''
22-
def _declare_test_queue(queue_name: str):
24+
def _declare_test_queue(queue_name: str, routing_key: str):
2325
channel.queue_declare(
2426
queue=queue_name,
2527
)
2628
channel.queue_bind(
2729
exchange='zettabgp',
2830
queue=queue_name,
31+
routing_key=routing_key,
2932
)
3033

3134
_declare_test_queue(
32-
queue_name='test_bgp_updates',
35+
queue_name='test_direct_route_updates',
36+
routing_key='direct',
37+
)
38+
_declare_test_queue(
39+
queue_name='test_grouped_route_updates',
40+
routing_key='grouped',
3341
)
3442

35-
@parser.on_update
36-
def on_update(message: RouteUpdate):
37-
channel.basic_publish(
38-
exchange='zettabgp',
39-
body=message.model_dump_json(),
40-
routing_key='',
41-
)
43+
if not no_direct:
44+
@parser.on_update
45+
def direct(message: RouteUpdate):
46+
channel.basic_publish(
47+
exchange='zettabgp',
48+
body=message.model_dump_json(),
49+
routing_key='direct',
50+
)
51+
52+
if queue_interval:
53+
queue_interval_stop: datetime = None
54+
queue_interval_messages: list[RouteUpdate] = []
55+
56+
@parser.on_update
57+
def grouped(message: RouteUpdate):
58+
nonlocal queue_interval_stop
59+
60+
if queue_interval_stop:
61+
if message.timestamp >= queue_interval_stop:
62+
channel.basic_publish(
63+
exchange='zettabgp',
64+
body=json.dumps([
65+
message.model_dump(
66+
mode='json',
67+
)
68+
for message in queue_interval_messages
69+
]),
70+
routing_key='grouped',
71+
)
72+
73+
queue_interval_stop = queue_interval_stop + timedelta(minutes=queue_interval)
74+
queue_interval_messages.clear()
75+
else:
76+
# -1 seconds to avoid unpublished last interval when the next message is exactly at the stop time
77+
# this occurs when simulating with mrt-simulation and same -o value
78+
queue_interval_stop = message.timestamp + timedelta(minutes=queue_interval, seconds=-1)
79+
80+
queue_interval_messages.append(message)

src/main.py

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,20 @@ def cli():
1616
help='Process ExaBGP messages.',
1717
)
1818
@click.option(
19-
'--no-rabbitmq',
20-
'-r',
19+
'--no-rabbitmq-direct',
20+
'-d',
2121
is_flag=True,
2222
)
23+
@click.option(
24+
'--rabbitmq-grouped',
25+
'-g',
26+
type=int,
27+
default=None,
28+
show_default='5',
29+
is_flag=False,
30+
flag_value=5,
31+
help='Queue group interval in minutes.',
32+
)
2333
@click.option(
2434
'--no-mongodb-log',
2535
'-l',
@@ -35,12 +45,14 @@ def cli():
3545
'-t',
3646
is_flag=True,
3747
)
38-
def exabgp(no_rabbitmq: bool, no_mongodb_log: bool, no_mongodb_state: bool, no_mongodb_statistics: bool):
48+
def exabgp(no_rabbitmq_direct: bool, rabbitmq_grouped: int, no_mongodb_log: bool, no_mongodb_state: bool, no_mongodb_statistics: bool):
3949
parser = ExaBGPParser()
4050

41-
if not no_rabbitmq:
51+
if not no_rabbitmq_direct or rabbitmq_grouped:
4252
RabbitMQAdapter(
4353
parser=parser,
54+
no_direct=no_rabbitmq_direct,
55+
queue_interval=rabbitmq_grouped,
4456
)
4557

4658
if not no_mongodb_log or not no_mongodb_state or not no_mongodb_statistics:
@@ -64,10 +76,20 @@ def exabgp(no_rabbitmq: bool, no_mongodb_log: bool, no_mongodb_state: bool, no_m
6476
help='Process BGP4MP MRT files.',
6577
)
6678
@click.option(
67-
'--no-rabbitmq',
68-
'-r',
79+
'--no-rabbitmq-direct',
80+
'-d',
6981
is_flag=True,
7082
)
83+
@click.option(
84+
'--rabbitmq-grouped',
85+
'-g',
86+
type=int,
87+
default=None,
88+
show_default='5',
89+
is_flag=False,
90+
flag_value=5,
91+
help='Queue group interval in minutes.',
92+
)
7193
@click.option(
7294
'--no-mongodb-log',
7395
'-l',
@@ -110,12 +132,14 @@ def exabgp(no_rabbitmq: bool, no_mongodb_log: bool, no_mongodb_state: bool, no_m
110132
resolve_path=True,
111133
),
112134
)
113-
def mrt_simulation(no_rabbitmq: bool, no_mongodb_log: bool, no_mongodb_state: bool, no_mongodb_statistics: bool, playback_speed: int, playback_interval: int, mrt_file: str):
135+
def mrt_simulation(no_rabbitmq_direct: bool, rabbitmq_grouped: int, no_mongodb_log: bool, no_mongodb_state: bool, no_mongodb_statistics: bool, playback_speed: int, playback_interval: int, mrt_file: str):
114136
parser = MrtBgp4MpParser()
115137

116-
if not no_rabbitmq:
138+
if not no_rabbitmq_direct or rabbitmq_grouped:
117139
RabbitMQAdapter(
118140
parser=parser,
141+
no_direct=no_rabbitmq_direct,
142+
queue_interval=rabbitmq_grouped,
119143
)
120144

121145
if not no_mongodb_log or not no_mongodb_state or not no_mongodb_statistics:

0 commit comments

Comments
 (0)