Skip to content

Commit

Permalink
Test for failures in integration tests instead (#131)
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed Jan 8, 2021
1 parent 6780d7a commit 9afd78f
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 101 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.evolutiongaming.kafka.flow

import cats.effect.concurrent.Ref
import cats.syntax.all._
import com.datastax.driver.core.Statement
import com.evolutiongaming.catshelper.MonadThrowable
import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraSession
import com.evolutiongaming.sstream.Stream

object CassandraSessionStub {

def alwaysFails[F[_]: MonadThrowable]: CassandraSession[F] = new CassandraSession[F] {
def fail[T]: F[T] = MonadThrowable[F].raiseError {
new RuntimeException("CassandraSessionStub: always fails")
}
def prepare(query: String) = fail
def execute(statement: Statement) = Stream.lift(fail)
def unsafe = sys.error("CassandraSessionStub: no unsafe session")
}

def injectFailures[F[_]: MonadThrowable](
session: CassandraSession[F],
failAfter: Ref[F, Int]
): CassandraSession[F] = new CassandraSession[F] {

def fail[T]: F[T] = MonadThrowable[F].raiseError {
new RuntimeException("CassandraSessionStub: failing after proper calls exhausted")
}

val failed = failAfter modify { failAfter =>
(failAfter - 1, failAfter <= 0)
}

def prepare(query: String) = failed.ifM(fail, session.prepare(query))

def execute(statement: Statement) = Stream.lift(failed) flatMap { failed =>
if (failed) Stream.lift(fail) else session.execute(statement)
}

def unsafe = sys.error("CassandraSessionStub: no unsafe session")

}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.evolutiongaming.kafka.flow.journal

import cats.effect.IO
import cats.effect.concurrent.Ref
import com.evolutiongaming.kafka.flow.CassandraSessionStub
import com.evolutiongaming.kafka.flow.CassandraSpec
import com.evolutiongaming.kafka.flow.KafkaKey
import com.evolutiongaming.kafka.journal.ConsRecord
Expand Down Expand Up @@ -37,4 +39,15 @@ class JournalSpec(val globalResources: GlobalResources) extends CassandraSpec {
}
}

test("failures") { cassandra =>
val key = KafkaKey("JournalSpec", "integration-tests-1", TopicPartition.empty, "failures")
for {
failAfter <- Ref.of(100)
session = CassandraSessionStub.injectFailures(cassandra.session, failAfter)
journals <- CassandraJournals.withSchema(session, cassandra.sync)
_ <- failAfter.set(1)
records <- journals.get(key).toList.attempt
} yield expect(records.isLeft)
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package com.evolutiongaming.kafka.flow.key

import cats.effect.IO
import cats.effect.concurrent.Ref
import cats.syntax.all._
import com.evolutiongaming.kafka.flow.CassandraSessionStub
import com.evolutiongaming.kafka.flow.CassandraSpec
import com.evolutiongaming.kafka.flow.KafkaKey
import com.evolutiongaming.kafka.flow.journal.CassandraJournals
import com.evolutiongaming.kafka.flow.key.CassandraKeys
import com.evolutiongaming.kafka.journal.ConsRecord
import com.evolutiongaming.skafka.Offset
import com.evolutiongaming.skafka.Partition
import com.evolutiongaming.skafka.TopicPartition
import com.evolutiongaming.skafka.consumer.WithSize
import scodec.bits.ByteVector
import weaver.GlobalResources

class KeySpec(val globalResources: GlobalResources) extends CassandraSpec {
Expand Down Expand Up @@ -43,4 +50,14 @@ class KeySpec(val globalResources: GlobalResources) extends CassandraSpec {
}
}

}
test("failures") { cassandra =>
for {
failAfter <- Ref.of(100)
session = CassandraSessionStub.injectFailures(cassandra.session, failAfter)
keys <- CassandraKeys.withSchema(session, cassandra.sync)
_ <- failAfter.set(1)
keys <- keys.all("KeySpec", "integration-tests-1").toList.attempt
} yield expect(keys.isLeft)
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.evolutiongaming.kafka.flow.snapshot

import cats.effect.IO
import cats.effect.concurrent.Ref
import com.evolutiongaming.kafka.flow.CassandraSessionStub
import com.evolutiongaming.kafka.flow.CassandraSpec
import com.evolutiongaming.kafka.flow.KafkaKey
import com.evolutiongaming.skafka.Offset
Expand All @@ -27,4 +29,15 @@ class SnapshotSpec(val globalResources: GlobalResources) extends CassandraSpec {
}
}

test("failures") { cassandra =>
val key = KafkaKey("SnapshotSpec", "integration-tests-1", TopicPartition.empty, "queries")
for {
failAfter <- Ref.of(100)
session = CassandraSessionStub.injectFailures(cassandra.session, failAfter)
snapshots <- CassandraSnapshots.withSchema[IO, String](session, cassandra.sync)
_ <- failAfter.set(1)
snapshots <- snapshots.get(key).attempt
} yield expect(snapshots.isLeft)
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.evolutiongaming.kafka.flow.timer

import cats.effect.IO
import cats.effect.concurrent.Ref
import com.evolutiongaming.kafka.flow.CassandraSessionStub
import com.evolutiongaming.kafka.flow.CassandraSpec
import com.evolutiongaming.kafka.flow.KafkaKey
import com.evolutiongaming.kafka.journal.ConsRecord
Expand Down Expand Up @@ -31,4 +33,15 @@ class TimerSpec(val globalResources: GlobalResources) extends CassandraSpec {
}
}

test("failures") { cassandra =>
val key = KafkaKey("TimerSpec", "integration-tests-1", TopicPartition.empty, "failures")
for {
failAfter <- Ref.of(100)
session = CassandraSessionStub.injectFailures(cassandra.session, failAfter)
journals <- CassandraTimers.withSchema(session, cassandra.sync)
_ <- failAfter.set(1)
records <- journals.get(key).toList.attempt
} yield expect(records.isLeft)
}

}

This file was deleted.

This file was deleted.

This file was deleted.

0 comments on commit 9afd78f

Please sign in to comment.