diff --git a/examples/getting_started/main.py b/examples/getting_started/main.py index 3d22fc0..7cd5033 100644 --- a/examples/getting_started/main.py +++ b/examples/getting_started/main.py @@ -82,7 +82,7 @@ def threaded_function(addr_queue): connection = create_connection() offset_specification = StreamFilterOptions() offset_specification.offset(10) - consumer = connection.consumer(addr_queue, handler=MyMessageHandler()) + consumer = connection.consumer(addr_queue, handler=MyMessageHandler(), stream_filter_options=offset_specification) try: consumer.run() except KeyboardInterrupt: diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 37b084f..815c9fd 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -45,8 +45,6 @@ def __init__(self): self._filter_set: Dict[symbol, Described] = {} def offset(self, offset: int): - #self._filter_set[symbol(STREAM_FILTER_SPEC)] = Described(symbol(STREAM_FILTER_SPEC), "first") - print("im here") self._filter_set[symbol('rabbitmq:stream-offset-spec')] = Described(symbol('rabbitmq:stream-offset-spec'), "first") def filters(self) -> Dict[symbol, Described]: diff --git a/rabbitmq_amqp_python_client/options.py b/rabbitmq_amqp_python_client/options.py index 1e9e194..c05f85a 100644 --- a/rabbitmq_amqp_python_client/options.py +++ b/rabbitmq_amqp_python_client/options.py @@ -1,6 +1,6 @@ from .qpid.proton._data import ( # noqa: E402 PropertyDict, - symbol, + symbol, Described, ) from .qpid.proton._endpoints import Link # noqa: E402 from .qpid.proton.reactor import LinkOption, Filter # noqa: E402 @@ -71,11 +71,7 @@ def __init__(self, addr: str, filter_options: StreamFilterOptions): def apply(self, link: Link) -> None: - link.target.address = self._addr - link.snd_settle_mode = Link.SND_UNSETTLED - link.rcv_settle_mode = Link.RCV_FIRST - link.properties = PropertyDict({symbol("paired"): True}) - link.source.dynamic = False + link.source.filter.put_dict(self.filter_set) def test(self, link: Link) -> bool: return bool(link.is_receiver)