Skip to content

Commit a4d9e85

Browse files
implement application properties filter (#78)
* This PR implements application properties filtering for AMQP streams, enabling consumers to filter messages based on their application-level properties. This completes the feature request tracked in issue #42. Adds support for filtering messages by application properties in stream consumers Updates the example to demonstrate both message properties and application properties filtering Extends the StreamConsumerOptions class with application properties filter handling --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 4146b5f commit a4d9e85

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

examples/streams_with_filters/example_streams_with_filters.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@ def __init__(self):
2929

3030
def on_amqp_message(self, event: Event):
3131
# only messages with banana filters and with subject yellow
32+
# and application property from = italy get received
3233
self._count = self._count + 1
3334
logger.info(
34-
"Received message: {}, subject {}.[Total Consumed: {}]".format(
35+
"Received message: {}, subject {} application properties {} .[Total Consumed: {}]".format(
3536
Converter.bytes_to_string(event.message.body),
3637
event.message.subject,
38+
event.message.application_properties,
3739
self._count,
3840
)
3941
)
@@ -88,13 +90,15 @@ def main() -> None:
8890
addr_queue,
8991
message_handler=MyMessageHandler(),
9092
# the consumer will only receive messages with filter value banana and subject yellow
93+
# and application property from = italy
9194
stream_consumer_options=StreamConsumerOptions(
9295
offset_specification=OffsetSpecification.first,
9396
filter_options=StreamFilterOptions(
9497
values=["banana"],
9598
message_properties=MessageProperties(
9699
subject="yellow",
97100
),
101+
application_properties={"from": "italy"},
98102
),
99103
),
100104
)
@@ -108,11 +112,13 @@ def main() -> None:
108112
# publish with a filter of apple
109113
for i in range(MESSAGES_TO_PUBLISH):
110114
color = "green" if i % 2 == 0 else "yellow"
115+
from_value = "italy" if i % 3 == 0 else "spain"
111116
publisher.publish(
112117
Message(
113118
Converter.string_to_bytes(body="apple: " + str(i)),
114119
annotations={"x-stream-filter-value": "apple"},
115120
subject=color,
121+
application_properties={"from": from_value},
116122
)
117123
)
118124

@@ -121,11 +127,13 @@ def main() -> None:
121127
# publish with a filter of banana
122128
for i in range(MESSAGES_TO_PUBLISH):
123129
color = "green" if i % 2 == 0 else "yellow"
130+
from_value = "italy" if i % 3 == 0 else "spain"
124131
publisher.publish(
125132
Message(
126133
body=Converter.string_to_bytes("banana: " + str(i)),
127134
annotations={"x-stream-filter-value": "banana"},
128135
subject=color,
136+
application_properties={"from": from_value},
129137
)
130138
)
131139

rabbitmq_amqp_python_client/entities.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
STREAM_OFFSET_SPEC = "rabbitmq:stream-offset-spec"
1212
STREAM_FILTER_MATCH_UNFILTERED = "rabbitmq:stream-match-unfiltered"
1313
AMQP_PROPERTIES_FILTER = "amqp:properties-filter"
14+
AMQP_APPLICATION_PROPERTIES_FILTER = "amqp:application-properties-filter"
1415

1516

1617
@dataclass
@@ -252,6 +253,11 @@ def __init__(
252253

253254
if filter_options is not None and filter_options.message_properties is not None:
254255
self._filter_message_properties(filter_options.message_properties)
256+
if (
257+
filter_options is not None
258+
and filter_options.application_properties is not None
259+
):
260+
self._filter_application_properties(filter_options.application_properties)
255261

256262
def _offset(self, offset_specification: Union[OffsetSpecification, int]) -> None:
257263
"""
@@ -316,6 +322,18 @@ def _filter_message_properties(
316322
symbol(AMQP_PROPERTIES_FILTER), filter_prop
317323
)
318324

325+
def _filter_application_properties(
326+
self, application_properties: Optional[dict[str, Any]]
327+
) -> None:
328+
app_prop = {}
329+
if application_properties is not None:
330+
app_prop = application_properties.copy()
331+
332+
if len(app_prop) > 0:
333+
self._filter_set[symbol(AMQP_APPLICATION_PROPERTIES_FILTER)] = (
334+
Described(symbol(AMQP_APPLICATION_PROPERTIES_FILTER), app_prop)
335+
)
336+
319337
def filter_set(self) -> Dict[symbol, Described]:
320338
"""
321339
Get the current filter set configuration.

0 commit comments

Comments
 (0)