Commit cddb626

Removed all debugging code because the bug is fixed now (#131).
rtar committed Jan 10, 2021
1 parent 3e8a105 commit cddb626
Showing 8 changed files with 12 additions and 40 deletions.
@@ -31,7 +31,7 @@ object ConsumerFlow {
   * Note, that topic specified by an appropriate parameter should contain a
   * journal in the format of `Kafka Journal` library.
   */
-  def of[F[_]: MonadThrowable: Timeout: LogOf](
+  def of[F[_]: MonadThrowable: LogOf](
    consumer: Consumer[F],
    topic: Topic,
    flowOf: TopicFlowOf[F],
@@ -48,7 +48,7 @@ object ConsumerFlow {
   * Note, that topics specified by an appropriate parameter should contain a
   * journal in the format of `Kafka Journal` library.
   */
-  def of[F[_]: MonadThrowable: Timeout: LogOf](
+  def of[F[_]: MonadThrowable: LogOf](
    consumer: Consumer[F],
    topics: NonEmptySet[Topic],
    flowOf: TopicFlowOf[F],
@@ -68,7 +68,7 @@ object ConsumerFlow {
   * Note, that topic specified by an appropriate parameter should contain a
   * journal in the format of `Kafka Journal` library.
   */
-  def apply[F[_]: MonadThrowable: Timeout: LogOf](
+  def apply[F[_]: MonadThrowable: LogOf](
    consumer: Consumer[F],
    flows: Map[Topic, TopicFlow[F]],
    config: ConsumerFlowConfig
@@ -92,7 +92,7 @@ object ConsumerFlow {

      def stream = for {
        _ <- Stream.lift(subscribe)
-       records <- Stream.repeat(Timeout[F].timeout(poll))
+       records <- Stream.repeat(poll)
        // we process empty polls to trigger timers, but do not return them
        if records.values.nonEmpty
      } yield records

@@ -20,7 +20,7 @@ object ConsumerFlowOf {
   * Note, that topic specified by an appropriate parameter should contain a
   * journal in the format of `Kafka Journal` library.
   */
-  def apply[F[_]: BracketThrowable: Timeout: LogOf](
+  def apply[F[_]: BracketThrowable: LogOf](
    topic: Topic,
    flowOf: TopicFlowOf[F],
    config: ConsumerFlowConfig = ConsumerFlowConfig()
@@ -33,7 +33,7 @@ object ConsumerFlowOf {
   * Note, that topics specified by an appropriate parameter should contain a
   * journal in the format of `Kafka Journal` library.
   */
-  def apply[F[_]: BracketThrowable: Timeout: LogOf](
+  def apply[F[_]: BracketThrowable: LogOf](
    topics: NonEmptySet[Topic],
    flowOf: TopicFlowOf[F],
  ): ConsumerFlowOf[F] = ConsumerFlowOf(topics, flowOf, ConsumerFlowConfig())
@@ -43,7 +43,7 @@ object ConsumerFlowOf {
   * Note, that topics specified by an appropriate parameter should contain a
   * journal in the format of `Kafka Journal` library.
   */
-  def apply[F[_]: BracketThrowable: Timeout: LogOf](
+  def apply[F[_]: BracketThrowable: LogOf](
    topics: NonEmptySet[Topic],
    flowOf: TopicFlowOf[F],
    config: ConsumerFlowConfig
24 changes: 0 additions & 24 deletions core/src/main/scala/com/evolutiongaming/kafka/flow/Timeout.scala

This file was deleted.
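Its contents are not reproduced in this view. Judging from the call sites removed elsewhere in this commit (Timeout[F].timeout(poll), Timeout.never, Timeout.of(10.minutes)), it was a small type class wrapping Concurrent.timeout. The following is a hypothetical sketch of its shape, not the actual deleted source; it assumes cats-effect 2, which the surrounding code uses:

import scala.concurrent.duration.FiniteDuration

import cats.effect.{Concurrent, Timer}

// Hypothetical reconstruction for illustration only.
trait Timeout[F[_]] {
  def timeout[A](fa: F[A]): F[A]
}

object Timeout {

  def apply[F[_]](implicit timeout: Timeout[F]): Timeout[F] = timeout

  // no-op instance: returns the effect unchanged
  def never[F[_]]: Timeout[F] = new Timeout[F] {
    def timeout[A](fa: F[A]): F[A] = fa
  }

  // fails the effect if it does not complete within `duration`
  def of[F[_]: Concurrent: Timer](duration: FiniteDuration): Timeout[F] = new Timeout[F] {
    def timeout[A](fa: F[A]): F[A] = Concurrent.timeout(fa, duration)
  }
}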

@@ -87,7 +87,6 @@ object ConsumerFlowSpec {
  type F[A] = StateT[SyncIO, Context, A]

  implicit val logOf: LogOf[F] = LogOf.empty
- implicit val timeout: Timeout[F] = Timeout.never

  case class StopException(context: Context) extends Exception

@@ -278,6 +278,4 @@ object KafkaFlowSpec {

  case object Error extends RuntimeException with NoStackTrace

- implicit val timeout: Timeout[F] = Timeout.never
-
}

1 change: 0 additions & 1 deletion docs/overview.md

@@ -70,7 +70,6 @@ implicit val bracketThrowable = BracketThrowable[IO]
implicit val measureDuration = MeasureDuration.empty[IO]
implicit val logOf = LogOf.empty[IO]
implicit val log = Log.empty[IO]
-implicit val timetout = Timeout.never[IO]
```

## KafkaFlow

@@ -64,7 +64,6 @@ class FlowSpec(val globalResources: GlobalResources) extends CassandraSpec {
        ConsumerRecords(Map(TopicPartition.empty -> records))
      }
      join <- {
-       implicit val timeout = Timeout.of(10.minutes)
        implicit val retry = Retry.empty[IO]
        KafkaFlow.resource(
          consumer = Resource.liftF(consumer),

@@ -27,8 +27,6 @@ object CassandraModule {

  def log[F[_]: LogOf]: F[Log[F]] = LogOf[F].apply(CassandraModule.getClass)

- val FutureTimeout: FiniteDuration = 5.minutes
-
  def clusterOf[F[_]: Sync](
    fromGFuture: FromGFuture[F]
  ): F[CassandraClusterOf[F]] = {
@@ -49,13 +47,16 @@

    for {
      log <- Resource.liftF(log[F])
+     // this is required to log all Cassandra errors before popping them up,
+     // which is useful because popped up errors might be lost in some cases
+     // while kafka-flow is accessing Cassandra in bracket/resource release
+     // routine
      fromGFuture = new FromGFuture[F] {
        val self = FromGFuture.lift[F]
        def apply[A](future: => ListenableFuture[A]) = {
-         val fa = self(future) onError { case e =>
+         self(future) onError { case e =>
            log.error("Cassandra request failed", e)
          }
-         Concurrent.timeout(fa, FutureTimeout)
        }
      }
      clusterOf <- Resource.liftF(clusterOf[F](fromGFuture))

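The comments added in the CassandraModule hunk explain why the logging wrapper stays while the timeout goes: errors raised while kafka-flow accesses Cassandra during bracket/resource release can otherwise be lost, so each request is logged on failure before the error is re-raised, and the 5-minute Concurrent.timeout is dropped. A minimal self-contained sketch of that log-then-rethrow pattern (hypothetical helper name; onError is the same cats combinator used in the diff):

import cats.effect.IO
import cats.syntax.applicativeError._

// Log a failure before re-raising it, so it stays visible even if the error
// itself is later swallowed, e.g. inside a bracket/Resource release step.
def loggedOnError[A](fa: IO[A])(log: Throwable => IO[Unit]): IO[A] =
  fa onError { case e => log(e) }

In the diff above the same shape is written inline inside the FromGFuture instance, with log.error("Cassandra request failed", e) as the logging action.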