Skip to content

Commit

Permalink
Synchronize integration tests with :telemetry (#8)
Browse files Browse the repository at this point in the history
Get rid of `:timer.sleep/1`
  • Loading branch information
mgibowski authored Nov 4, 2023
1 parent 524b3f0 commit b3d7a7d
Showing 1 changed file with 29 additions and 5 deletions.
34 changes: 29 additions & 5 deletions test/kafkaesque/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,29 @@ defmodule Kafkaesque.IntegrationTest do
Kafkaesque.Test.Helpers.create_topics()
end

setup %{sync_on_telemetry_event: telemetry_event} do
test_pid = self()

:telemetry.attach(
"test_notifier",
telemetry_event,
fn _, _, _, _ -> send(test_pid, :telemetry_event_occured) end,
:no_config
)

on_exit(fn -> :telemetry.detach("test_notifier") end)
end

defp await_telemetry_event do
receive do
:telemetry_event_occured ->
:ok
after
1_000 ->
raise "Synchronization timeout after 1 sec"
end
end

defmodule MyApp.Kafka do
use Kafkaesque, repo: Repo

Expand All @@ -19,6 +42,7 @@ defmodule Kafkaesque.IntegrationTest do
end
end

@tag sync_on_telemetry_event: [:kafkaesque, :acknowledge, :stop]
test "integration: publishes messages to kafka" do
{:ok, _} =
Kafkaesque.start_link(
Expand All @@ -36,14 +60,14 @@ defmodule Kafkaesque.IntegrationTest do
assert message.body == Jason.encode!(%{hello: :kafka})
assert message.state == :pending

# Could perform some synchronization to avoid sleeping
:timer.sleep(1000)
# Await message acknowledgement
await_telemetry_event()

message2 = Repo.reload(message)

assert message2.state == :published
end

@tag sync_on_telemetry_event: [:kafkaesque, :publish, :start]
test "integration: complete flow including termination" do
# Hack: since there is no synchronization, we execute this test a few times
# to have increase its chance to fail if there's a bug
Expand All @@ -70,8 +94,8 @@ defmodule Kafkaesque.IntegrationTest do
# Monitoring so we can wait for the process to die
ref = Process.monitor(main_pid)

# Giving it some time to start publishing messages
:timer.sleep(200)
# Await the start of publishing messages
await_telemetry_event()

# Sending shutdown and waiting for it
Process.exit(main_pid, :shutdown)
Expand Down

0 comments on commit b3d7a7d

Please sign in to comment.