Skip to content

Commit

Permalink
Wip
Browse files Browse the repository at this point in the history
  • Loading branch information
levsh committed Apr 17, 2024
1 parent 4cad63c commit aaedd91
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 12 deletions.
2 changes: 1 addition & 1 deletion arrlio/backends/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ async def push_task_result(self, task_result: TaskResult, task_instance: TaskIns

result = results[task_id]

result[1].append(self.serializer.dumps_task_result(task_result, task_instance))
result[1].append(self.serializer.dumps_task_result(task_result, task_instance=task_instance))
result[0].set()

if task_instance.result_ttl is not None:
Expand Down
39 changes: 28 additions & 11 deletions arrlio/backends/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -985,11 +985,18 @@ async def _on_task_message(self, callback, channel: aiormq.Channel, message: aio
routing_key = message.header.properties.reply_to
if routing_key.startswith("amq.rabbitmq.reply-to."):
exchange = self._default_exchange
headers = {}
data = self.serializer.dumps_task_result(
TaskResult(res=None, exc=e, trb=None),
task_instance=task_instance,
headers=headers,
)
await exchange.publish(
self.serializer.dumps_task_result(TaskResult(res=None, exc=e, trb=None), task_instance),
data,
routing_key=routing_key,
properties={
"delivery_mode": 2,
"headers": headers,
"message_id": message.header.properties.message_id,
"timestamp": datetime.now(tz=timezone.utc),
},
Expand Down Expand Up @@ -1114,18 +1121,28 @@ async def _push_task_result(
task_result.pretty_repr(sanitize=settings.LOG_SANITIZE),
)

headers = {}
data = self.serializer.dumps_task_result(task_result, task_instance=task_instance, headers=headers)

properties = {
"delivery_mode": 2,
"message_type": "arrlio:result",
"headers": headers,
"message_id": f"{task_instance.task_id}",
"correlation_id": f"{task_instance.task_id}",
"timestamp": datetime.now(tz=timezone.utc),
}
if self.serializer.content_type is not None:
properties["content_type"] = self.serializer.content_type
if task_instance.result_ttl is not None:
properties["expiration"] = f"{int(task_instance.result_ttl * 1000)}"
# if task_instance.extra.get("app_id"):
# properties["app_id"] = task_instance.extra["app_id"]

await exchange.publish(
self.serializer.dumps_task_result(task_result, task_instance),
data,
routing_key=routing_key,
properties={
"delivery_mode": 2,
"message_id": f"{task_instance.task_id}",
"timestamp": datetime.now(tz=timezone.utc),
"expiration": f"{int(task_instance.result_ttl * 1000)}"
if task_instance.result_ttl is not None
else None,
"correlation_id": f"{task_instance.task_id}",
},
properties=properties,
timeout=self.config.timeout,
)

Expand Down
1 change: 1 addition & 0 deletions arrlio/serializers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def loads_task_instance(self, data: bytes | TaskInstance, **kwds) -> TaskInstanc
def dumps_task_result(
self,
task_result: TaskResult,
*,
task_instance: TaskInstance | None = None,
**kwds,
) -> bytes | TaskResult:
Expand Down

0 comments on commit aaedd91

Please sign in to comment.