
Commit 783a03b

Merge pull request #48 from epandurski/master

Implement swpt-drainer CLI command, add missing queue.task_done() calls

2 parents: c9cbb2c + f297de6

38 files changed: +1470 −26 lines
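The second half of the commit message refers to `asyncio.Queue.task_done()`: every `get()` from a queue must eventually be balanced by a `task_done()` call, otherwise any coroutine waiting on `queue.join()` blocks forever. A minimal, self-contained illustration of the pattern (not code from this commit):

    import asyncio


    async def main() -> None:
        queue: asyncio.Queue[int] = asyncio.Queue()
        for i in range(3):
            queue.put_nowait(i)

        async def worker() -> None:
            while True:
                item = await queue.get()
                # ... process item ...
                queue.task_done()  # without this, queue.join() never returns

        task = asyncio.create_task(worker())
        await queue.join()  # unblocks once every item is marked done
        task.cancel()


    asyncio.run(main())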

README.md

Lines changed: 9 additions & 0 deletions

@@ -129,6 +129,15 @@ container allows you to execute the following *documented commands*:

   For more information, run `swpt-client --help`.
 
+* `swpt-drainer`
+
+  Consumes (drains) a RabbitMQ queue associated with an already
+  deactivated Swaptacular peer node, freeing up resources. A peer node
+  ID and a queue name should be specified as arguments. You can start
+  as many drainers simultaneously as you like.
+
+  For more information, run `swpt-drainer --help`.
+
 * `configure-queue`
 
   Configures a RabbitMQ queue that will contain messages which have to be
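A quick way to sanity-check the new entry point is click's built-in test runner. This is a hypothetical smoke test, not part of the commit; it assumes the `swpt_stomp` package is importable:

    # Hypothetical smoke test: verify the new command is wired up and
    # that its usage line renders.
    from click.testing import CliRunner
    from swpt_stomp.drainer import drainer

    runner = CliRunner()
    result = runner.invoke(drainer, ["--help"])
    assert result.exit_code == 0
    assert "PEER_NODE_ID" in result.output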

docker/entrypoint.sh

Lines changed: 3 additions & 0 deletions

@@ -47,6 +47,9 @@ case $1 in
     swpt-client)
         exec "$@"
         ;;
+    swpt-drainer)
+        exec "$@"
+        ;;
     configure-queue)
         exec "$@"
         ;;

pyproject.toml

Lines changed: 1 addition & 0 deletions

@@ -27,6 +27,7 @@ black = "^23.7.0"
 [tool.poetry.scripts]
 swpt-client = "swpt_stomp.client:client"
 swpt-server = "swpt_stomp.server:server"
+swpt-drainer = "swpt_stomp.drainer:drainer"
 configure-queue = "swpt_stomp.configure_queue:configure_queue"
 
 [tool.pylsp-mypy]

swpt_stomp/configure_queue.py

Lines changed: 3 additions & 0 deletions

@@ -79,6 +79,9 @@ async def bind_queue() -> None:
     if peer_data is None:
         _logger.error("Peer %s is not in the database.", peer_node_id)
         sys.exit(1)
+    if peer_data.is_deactivated:  # pragma: no cover
+        _logger.error("Peer %s has been deactivated.", peer_node_id)
+        sys.exit(1)
 
     connection = await aio_pika.connect(broker_url)
     channel = await connection.channel()
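Together with the check in drainer.py below, the two commands now enforce complementary invariants: `configure-queue` refuses peers that have been deactivated, while `swpt-drainer` refuses peers that have not. Restated as a standalone sketch (illustration only; `peer_data` stands for the object returned by `get_peer_data`):

    # Illustration only: complementary activation-state invariants.
    def may_bind_queue(peer_data) -> bool:
        # configure-queue: only live peers get new queue bindings.
        return peer_data is not None and not peer_data.is_deactivated


    def may_drain_queue(peer_data) -> bool:
        # swpt-drainer: only deactivated peers may be drained.
        return peer_data is not None and peer_data.is_deactivated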

swpt_stomp/drainer.py (new file)

Lines changed: 317 additions & 0 deletions

##############################################################################
# Implements a drainer that consumes messages from existing queues
# associated with already deactivated Swaptacular peers.
#
# Here is how the different parts fit together:
#
#       messages_queue                          responses_queue
#  /-------\  (messages)   /----------\  (resp. messages)  /-------\
#  |       |-------------->|          |------------------->|       |
#  |Rabbit |               | drainer  |                    |Rabbit |
#  |MQ     |  AMQP 0.9.1   | asyncio  |     AMQP 0.9.1     |MQ     |
#  |Server |               |  task    |                    |Server |
#  |       |<--------------|          |<-------------------|       |
#  \-------/   acks_queue  \----------/    confirms_queue  \-------/
#              (msg. acks)      |         (publ. confirms)
#                               |
#                               |
#                               V
#              /---------------------------------\
#              |       Node Peers Database       |
#              \---------------------------------/
#
# The "drainer asyncio task" reads messages from the RabbitMQ Server
# and acks them immediately. However, for some of the messages the
# drainer task will generate fake responses (and will publish them to
# the RabbitMQ Server), as if the responses were generated by the
# deactivated peer. The role of the fake responses is to free up
# resources.
#
# The "Node Peers Database" contains information about the peers of
# the given node.
##############################################################################

import logging
import sys
import asyncio
import click
from datetime import datetime, timezone
from swpt_stomp.loggers import configure_logging
from typing import Union, Optional
from functools import partial
from swpt_stomp.common import (
    WatermarkQueue,
    ServerError,
    Message,
    set_event_loop_policy,
)
from swpt_stomp import smp_schemas
from swpt_stomp.rmq import consume_from_queue, publish_to_exchange
from swpt_stomp.server import EXCHANGE_NAMES
from swpt_stomp.peer_data import get_database_instance
from swpt_stomp.process_messages import (
    transform_message,
    preprocess_message,
    parse_message_body,
)

_configure_account = smp_schemas.ConfigureAccountMessageSchema()
_finalize_transfer = smp_schemas.FinalizeTransferMessageSchema()
_rejected_config = smp_schemas.RejectedConfigMessageSchema()


def generate_optional_response(message: Message) -> Optional[Message]:
    msg_type = message.type
    if msg_type == "AccountUpdate":
        msg_data = parse_message_body(message)
        assert msg_data["type"] == msg_type
        response_type = "ConfigureAccount"
        response_json = _configure_account.dumps({
            "type": response_type,
            "creditor_id": msg_data["creditor_id"],
            "debtor_id": msg_data["debtor_id"],
            "negligible_amount": 1e30,
            "config_data": "",
            "config_flags": 1,  # scheduled for deletion
            "seqnum": 0,
            "ts": datetime.now(tz=timezone.utc),
        })
    elif msg_type == "PreparedTransfer":
        msg_data = parse_message_body(message)
        assert msg_data["type"] == msg_type
        response_type = "FinalizeTransfer"
        response_json = _finalize_transfer.dumps({
            "type": response_type,
            "creditor_id": msg_data["creditor_id"],
            "debtor_id": msg_data["debtor_id"],
            "transfer_id": msg_data["transfer_id"],
            "coordinator_type": msg_data["coordinator_type"],
            "coordinator_id": msg_data["coordinator_id"],
            "coordinator_request_id": msg_data["coordinator_request_id"],
            "committed_amount": 0,
            "transfer_note": "",
            "transfer_note_format": "",
            "ts": datetime.now(tz=timezone.utc),
        })
    elif msg_type == "ConfigureAccount":
        msg_data = parse_message_body(message)
        assert msg_data["type"] == msg_type
        response_type = "RejectedConfig"
        response_json = _rejected_config.dumps({
            "type": response_type,
            "creditor_id": msg_data["creditor_id"],
            "debtor_id": msg_data["debtor_id"],
            "config_ts": msg_data["ts"],
            "config_seqnum": msg_data["seqnum"],
            "config_flags": msg_data["config_flags"],
            "negligible_amount": msg_data["negligible_amount"],
            "config_data": msg_data["config_data"],
            "rejection_code": "NO_CONNECTION_TO_DEBTOR",
            "ts": datetime.now(tz=timezone.utc),
        })
    else:
        return None

    return Message(
        id=message.id,
        type=response_type,
        body=bytearray(response_json.encode("utf8")),
        content_type="application/json",
    )


async def drain(
    *,
    peer_node_id: str,
    nodedata_url: str,
    protocol_broker_url,
    protocol_broker_queue,
    client_queue_size: int,
    server_queue_size: int,
):
    db = get_database_instance(url=nodedata_url)
    owner_node_data = await db.get_node_data()
    peer_data = await db.get_peer_data(peer_node_id, active_peers_only=False)
    if peer_data is None:  # pragma: no cover
        raise RuntimeError(f"Peer {peer_node_id} is not in the database.")
    if not peer_data.is_deactivated:  # pragma: no cover
        raise RuntimeError(f"Peer {peer_node_id} has not been deactivated.")

    acks_queue: WatermarkQueue[Union[str, None]] = WatermarkQueue(
        client_queue_size
    )
    messages_queue: asyncio.Queue[
        Union[Message, None, ServerError]
    ] = asyncio.Queue(client_queue_size)

    responses_queue: WatermarkQueue[Union[Message, None]] = WatermarkQueue(
        server_queue_size
    )
    confirms_queue: asyncio.Queue[
        Union[str, None, ServerError]
    ] = asyncio.Queue(server_queue_size)

    async def ack_and_respond_if_necessary():
        while m := await messages_queue.get():
            if isinstance(m, ServerError):
                messages_queue.task_done()
                raise m

            if r := generate_optional_response(m):
                await responses_queue.put(r)

            await acks_queue.put(m.id)
            messages_queue.task_done()

        messages_queue.task_done()  # pragma: no cover

    async def ignore_confirmations():
        while True:
            await confirms_queue.get()
            confirms_queue.task_done()

    loop = asyncio.get_running_loop()
    read_messages_task = loop.create_task(
        consume_from_queue(
            messages_queue,
            acks_queue,
            url=protocol_broker_url,
            queue_name=protocol_broker_queue,
            transform_message=partial(
                transform_message, owner_node_data, peer_data
            ),
        )
    )
    process_messages_task = loop.create_task(
        ack_and_respond_if_necessary()
    )
    publish_responses_task = loop.create_task(
        publish_to_exchange(
            confirms_queue,
            responses_queue,
            url=protocol_broker_url,
            exchange_name=EXCHANGE_NAMES[owner_node_data.node_type],
            preprocess_message=partial(
                preprocess_message, owner_node_data, peer_data
            ),
        )
    )
    process_publish_confirmations_task = loop.create_task(
        ignore_confirmations()
    )

    tasks = [
        read_messages_task,
        process_messages_task,
        publish_responses_task,
        process_publish_confirmations_task,
    ]
    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        pass
    finally:
        for t in tasks:
            t.cancel()
        await asyncio.wait(tasks)


@click.command()
@click.argument("peer_node_id")
@click.argument("queue_name")
@click.option(
    "-n",
    "--nodedata-url",
    envvar="SWPT_NODEDATA_URL",
    default="file:///var/lib/swpt-nodedata",
    show_envvar=True,
    show_default=True,
    help=(
        "URL of the database that contains current node's data, including "
        "information about peer nodes."
    ),
)
@click.option(
    "-u",
    "--broker-url",
    envvar="PROTOCOL_BROKER_URL",
    default="amqp://guest:guest@localhost:5672",
    show_envvar=True,
    show_default=True,
    help="URL of the RabbitMQ broker to connect to.",
)
@click.option(
    "-b",
    "--client-buffer",
    type=int,
    envvar="SWPT_CLIENT_BUFFER",
    default=100,
    show_envvar=True,
    show_default=True,
    help="Maximum number of consumed messages to store in memory.",
)
@click.option(
    "-b",
    "--server-buffer",
    type=int,
    envvar="SWPT_SERVER_BUFFER",
    default=100,
    show_envvar=True,
    show_default=True,
    help="Maximum number of generated response messages to store in memory.",
)
@click.option(
    "-l",
    "--log-level",
    type=click.Choice(["error", "warning", "info", "debug"]),
    envvar="APP_LOG_LEVEL",
    default="info",
    show_envvar=True,
    show_default=True,
    help="Application log level.",
)
@click.option(
    "-f",
    "--log-format",
    type=click.Choice(["text", "json"]),
    envvar="APP_LOG_FORMAT",
    default="text",
    show_envvar=True,
    show_default=True,
    help="Application log format.",
)
def drainer(
    peer_node_id: str,
    queue_name: str,
    nodedata_url: str,
    broker_url: str,
    client_buffer: int,
    server_buffer: int,
    log_level: str,
    log_format: str,
):
    """Consumes (drains) a RabbitMQ queue associated with an already
    deactivated Swaptacular peer node, freeing up resources.

    PEER_NODE_ID: The node ID of the deactivated Swaptacular peer node.

    QUEUE_NAME: The name of the RabbitMQ queue to consume messages from.
    """
    set_event_loop_policy()
    configure_logging(level=log_level, format=log_format)

    asyncio.run(
        drain(
            peer_node_id=peer_node_id,
            nodedata_url=nodedata_url,
            client_queue_size=client_buffer,
            server_queue_size=server_buffer,
            protocol_broker_url=broker_url,
            protocol_broker_queue=queue_name,
        )
    )
    sys.exit(1)  # pragma: nocover


if __name__ == "__main__":  # pragma: nocover
    drainer()
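To summarize the draining rules implemented by `generate_optional_response`: each message type that can hold resources open on the peer's behalf gets a terminal fake reply, and every other message type is acked and dropped without a response. The dispatch, restated as a standalone sketch (illustration only):

    from typing import Optional


    def expected_fake_response_type(msg_type: str) -> Optional[str]:
        return {
            "AccountUpdate": "ConfigureAccount",     # schedule account for deletion
            "PreparedTransfer": "FinalizeTransfer",  # dismiss: committed_amount = 0
            "ConfigureAccount": "RejectedConfig",    # NO_CONNECTION_TO_DEBTOR
        }.get(msg_type)


    # Any other message type is simply acked:
    assert expected_fake_response_type("AccountTransfer") is None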
