Add batch consuming option to consumers (#298)
* Add batch consumer logic to documentation generation

* Modify consumer loop to include batch option for consuming

* Add batch consuming guide

* Fix mypy and add test to Tester

* Merge origin/main into 18-add-batching-for-consumers

* Add docs

* Implement review changes
sternakt authored May 19, 2023
1 parent 74ed08a commit 4b47d6a
Showing 16 changed files with 1,639 additions and 396 deletions.
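The headline feature: annotating a consumer's msg parameter as a list opts that consumer into batch consumption. A minimal usage sketch follows (hedged: the HelloWorld model and broker config are illustrative and not from this commit; the decorator usage assumes FastKafka's documented @consumes API):

from typing import List

from pydantic import BaseModel, Field

from fastkafka import FastKafka

kafka_brokers = {
    "localhost": {"url": "localhost", "description": "local dev broker", "port": 9092}
}
app = FastKafka(kafka_brokers=kafka_brokers)


class HelloWorld(BaseModel):
    msg: str = Field(..., description="Message text")


# Annotating msg as List[HelloWorld] is what switches this consumer into
# batch mode: the modified consumer loop detects the list annotation and
# delivers all records returned by one consumer.getmany() call together.
@app.consumes()
async def on_hello_world(msg: List[HelloWorld]) -> None:
    print(f"Received batch of {len(msg)} messages")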
16 changes: 9 additions & 7 deletions fastkafka/_application/tester.py
@@ -13,7 +13,9 @@

 from .. import KafkaEvent
 from .app import FastKafka
+from .._components.helpers import unwrap_list_type
 from .._components.meta import delegates, export, patch
+from .._components.producer_decorator import unwrap_from_kafka_event
 from .._testing.apache_kafka_broker import ApacheKafkaBroker
 from .._testing.in_memory_broker import InMemoryBroker
 from .._testing.local_redpanda_broker import LocalRedpandaBroker
@@ -148,11 +150,7 @@ async def __aexit__(self, *args: Any) -> None:
 def mirror_producer(topic: str, producer_f: Callable[..., Any]) -> Callable[..., Any]:
     msg_type = inspect.signature(producer_f).return_annotation
 
-    if hasattr(msg_type, "__origin__") and msg_type.__origin__ == KafkaEvent:
-        msg_type = msg_type.__args__[0]
-
-    if hasattr(msg_type, "__origin__") and msg_type.__origin__ == list:
-        msg_type = msg_type.__args__[0]
+    msg_type_unwrapped = unwrap_list_type(unwrap_from_kafka_event(msg_type))
 
     async def skeleton_func(msg: BaseModel) -> None:
         pass
@@ -168,7 +166,7 @@ async def skeleton_func(msg: BaseModel) -> None:
         parameters=[
             inspect.Parameter(
                 name="msg",
-                annotation=msg_type,
+                annotation=msg_type_unwrapped,
                 kind=inspect.Parameter.POSITIONAL_OR_KEYWORD,
             )
         ]
@@ -182,6 +180,8 @@ async def skeleton_func(msg: BaseModel) -> None:
 def mirror_consumer(topic: str, consumer_f: Callable[..., Any]) -> Callable[..., Any]:
     msg_type = inspect.signature(consumer_f).parameters["msg"]
 
+    msg_type_unwrapped = unwrap_list_type(msg_type)
+
     async def skeleton_func(msg: BaseModel) -> BaseModel:
         return msg
 
@@ -192,7 +192,9 @@ async def skeleton_func(msg: BaseModel) -> BaseModel:
     mirror_func.__name__ = "to_" + topic
 
     # adjust arg and return val
-    sig = sig.replace(parameters=[msg_type], return_annotation=msg_type.annotation)
+    sig = sig.replace(
+        parameters=[msg_type], return_annotation=msg_type_unwrapped.annotation
+    )
 
     mirror_func.__signature__ = sig  # type: ignore
     return mirror_func
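unwrap_list_type and unwrap_from_kafka_event replace the inline __origin__ checks deleted above. Below is a hypothetical type-level re-implementation of the list unwrapping, for illustration only; the real helper in fastkafka._components.helpers must also accept inspect.Parameter objects, since mirror_consumer passes one:

from typing import List, get_args, get_origin

from pydantic import BaseModel


class HelloWorld(BaseModel):
    msg: str


def unwrap_list_type_sketch(msg_type: type) -> type:
    """Return X for List[X]; pass any other type through unchanged.

    Hypothetical equivalent of the removed `__origin__ == list` check.
    """
    if get_origin(msg_type) is list:
        return get_args(msg_type)[0]
    return msg_type


assert unwrap_list_type_sketch(List[HelloWorld]) is HelloWorld
assert unwrap_list_type_sketch(HelloWorld) is HelloWorld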
203 changes: 152 additions & 51 deletions fastkafka/_components/aiokafka_consumer_loop.py
@@ -60,6 +60,14 @@ class EventMetadata:

     @staticmethod
     def create_event_metadata(record: ConsumerRecord) -> "EventMetadata":  # type: ignore
+        """Creates an instance of EventMetadata from a ConsumerRecord.
+
+        Args:
+            record: The Kafka ConsumerRecord.
+
+        Returns:
+            The created EventMetadata instance.
+        """
         return EventMetadata(
             topic=record.topic,
             partition=record.partition,
@@ -75,10 +83,15 @@ def create_event_metadata(record: ConsumerRecord) -> "EventMetadata":
         )
 
 # %% ../../nbs/011_ConsumerLoop.ipynb 11
-AsyncConsume = Callable[[BaseModel], Awaitable[None]]
-AsyncConsumeMeta = Callable[[BaseModel, EventMetadata], Awaitable[None]]
-SyncConsume = Callable[[BaseModel], None]
-SyncConsumeMeta = Callable[[BaseModel, EventMetadata], None]
+AsyncConsume = Callable[[Union[List[BaseModel], BaseModel]], Awaitable[None]]
+AsyncConsumeMeta = Callable[
+    [Union[List[BaseModel], BaseModel], Union[List[EventMetadata], EventMetadata]],
+    Awaitable[None],
+]
+SyncConsume = Callable[[Union[List[BaseModel], BaseModel]], None]
+SyncConsumeMeta = Callable[
+    [Union[List[BaseModel], BaseModel], Union[List[EventMetadata], EventMetadata]], None
+]
 
 ConsumeCallable = Union[AsyncConsume, AsyncConsumeMeta, SyncConsume, SyncConsumeMeta]

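In concrete terms, the widened aliases now admit callbacks like the following sketch (HelloWorld is an illustrative stand-in, and the import assumes EventMetadata is reachable from this module, as the hunk above suggests):

from typing import List

from pydantic import BaseModel

from fastkafka._components.aiokafka_consumer_loop import EventMetadata


class HelloWorld(BaseModel):
    msg: str


# Matches AsyncConsume in single-message mode
async def on_single(msg: HelloWorld) -> None:
    print(msg.msg)


# Matches AsyncConsumeMeta in batch mode: messages and their metadata
# arrive as parallel lists
async def on_batch(msg: List[HelloWorld], meta: List[EventMetadata]) -> None:
    for m, md in zip(msg, meta):
        print(md.topic, md.partition, m.msg)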
@@ -96,26 +109,30 @@ def _callback_parameters_wrapper(
"""

async def _params_wrap(
msg: BaseModel,
meta: EventMetadata,
msg: Union[BaseModel, List[BaseModel]],
meta: Union[EventMetadata, List[EventMetadata]],
callback: Union[AsyncConsume, AsyncConsumeMeta] = callback,
) -> None:
types = list(get_type_hints(callback).values())
args: List[Union[BaseModel, EventMetadata]] = [msg]
args: List[
Union[BaseModel, List[BaseModel], EventMetadata, List[EventMetadata]]
] = [msg]
if EventMetadata in types:
args.insert(types.index(EventMetadata), meta)
if List[EventMetadata] in types:
args.insert(types.index(List[EventMetadata]), meta)
await callback(*args) # type: ignore

return _params_wrap

# %% ../../nbs/011_ConsumerLoop.ipynb 16
# %% ../../nbs/011_ConsumerLoop.ipynb 17
def _prepare_callback(callback: ConsumeCallable) -> AsyncConsumeMeta:
"""
Prepares a callback to be used in the consumer loop.
1. If callback is sync, asyncify it
2. Wrap the callback into a safe callback for exception handling
Params:
Args:
callback: async callable that will be prepared for use in consumer
Returns:
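The wrapper decides whether, and at which position, to pass metadata purely from the callback's type hints. A runnable sketch of that dispatch rule (the models are illustrative, not from the diff):

from typing import List, get_type_hints

from pydantic import BaseModel

from fastkafka._components.aiokafka_consumer_loop import EventMetadata


class HelloWorld(BaseModel):
    msg: str


async def with_meta(msg: List[HelloWorld], meta: List[EventMetadata]) -> None:
    ...


async def without_meta(msg: List[HelloWorld]) -> None:
    ...


# _params_wrap only inserts meta into the argument list when the hint is present
print(List[EventMetadata] in get_type_hints(with_meta).values())     # True
print(List[EventMetadata] in get_type_hints(without_meta).values())  # False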
@@ -126,35 +143,113 @@ def _prepare_callback(callback: ConsumeCallable) -> AsyncConsumeMeta:
     )
     return _callback_parameters_wrapper(async_callback)
 
-# %% ../../nbs/011_ConsumerLoop.ipynb 18
-async def _stream_msgs(  # type: ignore
-    msgs: Dict[TopicPartition, bytes],
-    send_stream: anyio.streams.memory.MemoryObjectSendStream[Any],
-) -> None:
-    """
-    Decodes and streams the message and topic to the send_stream.
-
-    Params:
-        msgs:
-        send_stream:
-    """
-    for topic_partition, topic_msgs in msgs.items():
-        topic = topic_partition.topic
-        try:
-            await send_stream.send(topic_msgs)
-        except Exception as e:
-            logger.warning(
-                f"_stream_msgs(): Unexpected exception '{e.__repr__()}' caught and ignored for topic='{topic_partition.topic}', partition='{topic_partition.partition}' and messages: {topic_msgs!r}"
-            )
-
-def _decode_streamed_msgs(  # type: ignore
-    msgs: List[ConsumerRecord], msg_type: BaseModel
-) -> List[BaseModel]:
-    decoded_msgs = [msg_type.parse_raw(msg.value.decode("utf-8")) for msg in msgs]
-    return decoded_msgs
-
-# %% ../../nbs/011_ConsumerLoop.ipynb 23
+# %% ../../nbs/011_ConsumerLoop.ipynb 24
+def _get_single_msg_handlers(  # type: ignore
+    *,
+    consumer: AIOKafkaConsumer,
+    callback: AsyncConsumeMeta,
+    decoder_fn: Callable[[bytes, ModelMetaclass], Any],
+    msg_type: Type[BaseModel],
+    **kwargs: Any,
+) -> Tuple[
+    Callable[
+        [
+            ConsumerRecord,
+            AsyncConsumeMeta,
+            Callable[[bytes, ModelMetaclass], Any],
+            Type[BaseModel],
+        ],
+        Awaitable[None],
+    ],
+    Callable[[AIOKafkaConsumer, Any], Awaitable[List[ConsumerRecord]]],
+]:
+    """
+    Retrieves the message handlers for consuming single messages from a Kafka topic.
+
+    Args:
+        consumer: The Kafka consumer instance.
+        callback: The callback function to handle the consumed message.
+        decoder_fn: The function to decode the consumed message.
+        msg_type: The type of the consumed message.
+        **kwargs: Additional keyword arguments for the consumer.
+
+    Returns:
+        The handle_msg function and poll_consumer function.
+    """
+
+    async def handle_msg(  # type: ignore
+        record: ConsumerRecord,
+        callback: AsyncConsumeMeta = callback,
+        decoder_fn: Callable[[bytes, ModelMetaclass], Any] = decoder_fn,
+        msg_type: Type[BaseModel] = msg_type,
+    ) -> None:
+        await callback(
+            decoder_fn(record.value, msg_type),
+            EventMetadata.create_event_metadata(record),
+        )
+
+    async def poll_consumer(  # type: ignore
+        consumer: AIOKafkaConsumer = consumer, kwargs: Any = kwargs
+    ) -> List[ConsumerRecord]:
+        msgs = await consumer.getmany(**kwargs)
+        return [msg for msg_group in msgs.values() for msg in msg_group]
+
+    return handle_msg, poll_consumer
+
+# %% ../../nbs/011_ConsumerLoop.ipynb 26
+def _get_batch_msg_handlers(  # type: ignore
+    *,
+    consumer: AIOKafkaConsumer,
+    callback: AsyncConsumeMeta,
+    decoder_fn: Callable[[bytes, ModelMetaclass], Any],
+    msg_type: Type[BaseModel],
+    **kwargs: Any,
+) -> Tuple[
+    Callable[
+        [
+            List[ConsumerRecord],
+            AsyncConsumeMeta,
+            Callable[[bytes, ModelMetaclass], Any],
+            Type[BaseModel],
+        ],
+        Awaitable[None],
+    ],
+    Callable[[AIOKafkaConsumer, Any], Awaitable[List[List[ConsumerRecord]]]],
+]:
+    """
+    Retrieves the message handlers for consuming messages in batches from a Kafka topic.
+
+    Args:
+        consumer: The Kafka consumer instance.
+        callback: The callback function to handle the consumed messages.
+        decoder_fn: The function to decode the consumed messages.
+        msg_type: The type of the consumed messages.
+        **kwargs: Additional keyword arguments for the consumer.
+
+    Returns:
+        The handle_msg function and poll_consumer function.
+    """
+
+    async def handle_msg(  # type: ignore
+        records: List[ConsumerRecord],
+        callback: AsyncConsumeMeta = callback,
+        decoder_fn: Callable[[bytes, ModelMetaclass], Any] = decoder_fn,
+        msg_type: Type[BaseModel] = msg_type,
+    ) -> None:
+        await callback(
+            [decoder_fn(record.value, msg_type) for record in records],
+            [EventMetadata.create_event_metadata(record) for record in records],
+        )
+
+    async def poll_consumer(  # type: ignore
+        consumer: AIOKafkaConsumer = consumer, kwargs: Any = kwargs
+    ) -> List[List[ConsumerRecord]]:
+        msgs = await consumer.getmany(**kwargs)
+        return [value for value in msgs.values() if len(value) > 0]
+
+    return handle_msg, poll_consumer
+
+# %% ../../nbs/011_ConsumerLoop.ipynb 28
 @delegates(AIOKafkaConsumer.getmany)
 async def _aiokafka_consumer_loop(  # type: ignore
     consumer: AIOKafkaConsumer,
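The two poll_consumer variants differ only in how they reshape the dict of per-partition record lists returned by consumer.getmany(). A plain-Python sketch, with strings standing in for ConsumerRecord objects and made-up partition keys:

# consumer.getmany() returns {TopicPartition: [ConsumerRecord, ...]}
polled = {
    "topic_a-0": ["rec1", "rec2"],
    "topic_a-1": [],
    "topic_b-0": ["rec3"],
}

# single-message mode flattens everything into one stream of records
singles = [msg for group in polled.values() for msg in group]
assert singles == ["rec1", "rec2", "rec3"]

# batch mode keeps each non-empty partition's records together, so the
# callback receives a List[ConsumerRecord] per batch
batches = [group for group in polled.values() if len(group) > 0]
assert batches == [["rec1", "rec2"], ["rec3"]]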
@@ -163,7 +258,7 @@ async def _aiokafka_consumer_loop(  # type: ignore
     decoder_fn: Callable[[bytes, ModelMetaclass], Any],
     callback: ConsumeCallable,
     max_buffer_size: int = 100_000,
-    msg_type: Type[BaseModel],
+    msg_type: Union[Type[List[BaseModel]], Type[BaseModel]],
     is_shutting_down_f: Callable[[], bool],
     executor: Union[str, StreamExecutor, None] = None,
     **kwargs: Any,
@@ -172,7 +267,7 @@ async def _aiokafka_consumer_loop(  # type: ignore
     Consumer loop for infinite pooling of the AIOKafka consumer for new messages. Calls consumer.getmany()
     and after the consumer return messages or times out, messages are decoded and streamed to defined callback.
 
-    Params:
+    Args:
         topic: Topic to subscribe
         decoder_fn: Function to decode the messages consumed from the topic
         callbacks: Dict of callbacks mapped to their respective topics
@@ -184,29 +279,35 @@ async def _aiokafka_consumer_loop(  # type: ignore

     prepared_callback = _prepare_callback(callback)
 
-    async def handle_msg(  # type: ignore
-        record: ConsumerRecord,
-        callback: AsyncConsumeMeta = prepared_callback,
-        decoder_fn: Callable[[bytes, ModelMetaclass], Any] = decoder_fn,
-        msg_type: Type[BaseModel] = msg_type,
-    ) -> None:
-        await callback(
-            decoder_fn(record.value, msg_type),
-            EventMetadata.create_event_metadata(record),
-        )
+    if hasattr(msg_type, "__origin__") and msg_type.__origin__ == list:
+        handle_msg, poll_consumer = _get_batch_msg_handlers(
+            consumer=consumer,
+            callback=prepared_callback,
+            decoder_fn=decoder_fn,
+            msg_type=msg_type.__args__[0],  # type: ignore
+            **kwargs,
+        )
+    else:
+        handle_msg, poll_consumer = _get_single_msg_handlers(
+            consumer=consumer,
+            callback=prepared_callback,
+            decoder_fn=decoder_fn,
+            msg_type=msg_type,  # type: ignore
+            **kwargs,
+        )
 
-    async def poll_consumer(kwargs: Any = kwargs) -> List[ConsumerRecord]:  # type: ignore
-        msgs = await consumer.getmany(**kwargs)
-        return [msg for msg_group in msgs.values() for msg in msg_group]
-
-    await get_executor(executor).run(is_shutting_down_f, poll_consumer, handle_msg)
+    await get_executor(executor).run(
+        is_shutting_down_f=is_shutting_down_f,
+        generator=poll_consumer,  # type: ignore
+        processor=handle_msg,  # type: ignore
+    )
 
-# %% ../../nbs/011_ConsumerLoop.ipynb 28
+# %% ../../nbs/011_ConsumerLoop.ipynb 35
 def sanitize_kafka_config(**kwargs: Any) -> Dict[str, Any]:
     """Sanitize Kafka config"""
     return {k: "*" * len(v) if "pass" in k.lower() else v for k, v in kwargs.items()}
 
-# %% ../../nbs/011_ConsumerLoop.ipynb 30
+# %% ../../nbs/011_ConsumerLoop.ipynb 37
 @delegates(AIOKafkaConsumer)
 @delegates(_aiokafka_consumer_loop, keep=True)
 async def aiokafka_consumer_loop(
@@ -216,7 +317,7 @@ async def aiokafka_consumer_loop(
     timeout_ms: int = 100,
     max_buffer_size: int = 100_000,
     callback: ConsumeCallable,
-    msg_type: Type[BaseModel],
+    msg_type: Union[Type[List[BaseModel]], Type[BaseModel]],
     is_shutting_down_f: Callable[[], bool],
     executor: Union[str, StreamExecutor, None] = None,
     **kwargs: Any,
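From the preceding hunks, sanitize_kafka_config masks any config value whose key contains "pass" before it is logged. A self-contained copy of the same one-liner with a quick check:

from typing import Any, Dict


def sanitize_kafka_config(**kwargs: Any) -> Dict[str, Any]:
    """Mask values of password-like keys before logging."""
    return {k: "*" * len(v) if "pass" in k.lower() else v for k, v in kwargs.items()}


print(sanitize_kafka_config(sasl_plain_username="user", sasl_plain_password="secret123"))
# {'sasl_plain_username': 'user', 'sasl_plain_password': '*********'}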