From c21aacc955e79d7168599f80a239e654268b8beb Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Tue, 15 Mar 2022 09:52:06 +0000 Subject: [PATCH] fixes --- .../main/scala/fs2/kafka/KafkaConsumer.scala | 43 +++++++++---------- .../kafka/internal/KafkaConsumerActor.scala | 4 +- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala index 8c31aedc4..516260b7c 100644 --- a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala +++ b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala @@ -275,32 +275,29 @@ object KafkaConsumer { streamId: StreamId, assignmentRef: Ref[F, Map[TopicPartition, Deferred[F, Unit]]], partitionsMapQueue: PartitionsMapQueue - ): F[Map[TopicPartition, Deferred[F, Unit]]] = - Deferred[F, Either[Throwable, SortedSet[TopicPartition]]].flatMap { deferred => - val request = - Request.Assignment[F]( - deferred.complete(_).void, - Some( - onRebalance( - streamId, - assignmentRef, - partitionsMapQueue - ) - ) + ): F[Map[TopicPartition, Deferred[F, Unit]]] = { + val assignment = this.assignment( + Some( + onRebalance( + streamId, + assignmentRef, + partitionsMapQueue ) - val assignment = requests.offer(request) >> deferred.get.rethrow - F.race(awaitTermination.attempt, assignment).flatMap { - case Left(_) => - F.pure(Map.empty) + ) + ) - case Right(assigned) => - assigned.toVector - .traverse { partition => - Deferred[F, Unit].map(partition -> _) - } - .map(_.toMap) - } + F.race(awaitTermination.attempt, assignment).flatMap { + case Left(_) => + F.pure(Map.empty) + + case Right(assigned) => + assigned.toVector + .traverse { partition => + Deferred[F, Unit].map(partition -> _) + } + .map(_.toMap) } + } def initialEnqueue( streamId: StreamId, diff --git a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala index 957ce8716..756c0842c 100644 --- a/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala +++ b/modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala @@ -444,7 +444,7 @@ private[kafka] final class KafkaConsumerActor[F[_], K, V]( def handle(request: Request[F, K, V]): F[Unit] = request match { - case Request.Poll() => poll + case Request.Poll() => poll case Request.Fetch(partition, streamId, callback) => fetch(partition, streamId, callback) case request @ Request.Commit(_, _) => commit(request) @@ -648,7 +648,7 @@ private[kafka] object KafkaConsumerActor { object Request { final case class Permit[F[_]](callback: Resource[F, Unit] => F[Unit]) extends Request[F, Any, Any] - + final case class Poll[F[_]]() extends Request[F, Any, Any] private[this] val pollInstance: Poll[Nothing] =