diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f6c831b4dae..286f970b1be 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -60,6 +60,7 @@ jobs: strategy: matrix: include: + - name: zipkin-collector-activemq - name: zipkin-collector-kafka - name: zipkin-collector-rabbitmq - name: zipkin-storage-cassandra diff --git a/benchmarks/src/test/java/zipkin2/server/ServerIntegratedBenchmark.java b/benchmarks/src/test/java/zipkin2/server/ServerIntegratedBenchmark.java index 49aa61a5f1b..b4d19923959 100644 --- a/benchmarks/src/test/java/zipkin2/server/ServerIntegratedBenchmark.java +++ b/benchmarks/src/test/java/zipkin2/server/ServerIntegratedBenchmark.java @@ -91,7 +91,7 @@ class ServerIntegratedBenchmark { @Test void elasticsearch() throws Exception { GenericContainer elasticsearch = - new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-elasticsearch7:2.24.3")) + new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-elasticsearch7:2.24.4")) .withNetwork(Network.SHARED) .withNetworkAliases("elasticsearch") .withLabel("name", "elasticsearch") @@ -105,7 +105,7 @@ class ServerIntegratedBenchmark { @Test void cassandra3() throws Exception { GenericContainer cassandra = - new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-cassandra:2.24.3")) + new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-cassandra:2.24.4")) .withNetwork(Network.SHARED) .withNetworkAliases("cassandra") .withLabel("name", "cassandra") @@ -119,7 +119,7 @@ class ServerIntegratedBenchmark { @Test void mysql() throws Exception { GenericContainer mysql = - new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-mysql:2.24.3")) + new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-mysql:2.24.4")) .withNetwork(Network.SHARED) .withNetworkAliases("mysql") .withLabel("name", "mysql") diff --git a/zipkin-collector/activemq/pom.xml b/zipkin-collector/activemq/pom.xml index 19b6fff1908..cdd7cec652d 100644 --- a/zipkin-collector/activemq/pom.xml +++ b/zipkin-collector/activemq/pom.xml @@ -44,21 +44,5 @@ activemq-client ${activemq.version} - - - org.apache.activemq - activemq-broker - ${activemq.version} - test - - - - - org.apache.activemq.tooling - activemq-junit - ${activemq.version} - test - - diff --git a/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ActiveMQExtension.java b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ActiveMQExtension.java new file mode 100644 index 00000000000..c6e662d5849 --- /dev/null +++ b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ActiveMQExtension.java @@ -0,0 +1,76 @@ +/* + * Copyright 2016-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.collector.activemq; + +import java.time.Duration; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.opentest4j.TestAbortedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +import static org.testcontainers.utility.DockerImageName.parse; + +class ActiveMQExtension implements BeforeAllCallback, AfterAllCallback { + static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQExtension.class); + static final int ACTIVEMQ_PORT = 61616; + + ActiveMQContainer container = new ActiveMQContainer(); + + @Override public void beforeAll(ExtensionContext context) { + if (context.getRequiredTestClass().getEnclosingClass() != null) { + // Only run once in outermost scope. + return; + } + + container.start(); + LOGGER.info("Using brokerURL " + brokerURL()); + } + + @Override public void afterAll(ExtensionContext context) { + if (context.getRequiredTestClass().getEnclosingClass() != null) { + // Only run once in outermost scope. + return; + } + + container.stop(); + } + + ActiveMQCollector.Builder newCollectorBuilder(String queue) { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); + connectionFactory.setBrokerURL(brokerURL()); + return ActiveMQCollector.builder().queue(queue).connectionFactory(connectionFactory); + } + + String brokerURL() { + return "failover:tcp://" + container.getHost() + ":" + container.getMappedPort(ACTIVEMQ_PORT); + } + + // mostly waiting for https://github.com/testcontainers/testcontainers-java/issues/3537 + static final class ActiveMQContainer extends GenericContainer { + ActiveMQContainer() { + super(parse("ghcr.io/openzipkin/zipkin-activemq:2.24.4")); + if ("true".equals(System.getProperty("docker.skip"))) { + throw new TestAbortedException("${docker.skip} == true"); + } + withExposedPorts(ACTIVEMQ_PORT); + waitStrategy = Wait.forListeningPorts(ACTIVEMQ_PORT); + withStartupTimeout(Duration.ofSeconds(60)); + } + } +} diff --git a/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java index 0367f8c9607..962c597f5ca 100644 --- a/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java +++ b/zipkin-collector/activemq/src/test/java/zipkin2/collector/activemq/ITActiveMQCollector.java @@ -18,17 +18,27 @@ import java.lang.reflect.Method; import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; import java.util.Optional; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Queue; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; +import javax.jms.QueueSession; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.junit.EmbeddedActiveMQBroker; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; import zipkin2.Call; import zipkin2.Callback; import zipkin2.Component; @@ -46,11 +56,12 @@ import static zipkin2.codec.SpanBytesEncoder.PROTO3; import static zipkin2.codec.SpanBytesEncoder.THRIFT; -public class ITActiveMQCollector { +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Timeout(60) +class ITActiveMQCollector { + @RegisterExtension ActiveMQExtension activemq = new ActiveMQExtension(); List spans = Arrays.asList(LOTS_OF_SPANS[0], LOTS_OF_SPANS[1]); - /** Managed directly as this is a JUnit 4, not 5 type. */ - EmbeddedActiveMQBroker activemq; public String testName; InMemoryCollectorMetrics metrics = new InMemoryCollectorMetrics(); @@ -58,12 +69,7 @@ public class ITActiveMQCollector { CopyOnWriteArraySet threadsProvidingSpans = new CopyOnWriteArraySet<>(); LinkedBlockingQueue> receivedSpans = new LinkedBlockingQueue<>(); - SpanConsumer consumer = (spans) -> { - threadsProvidingSpans.add(Thread.currentThread()); - receivedSpans.add(spans); - return Call.create(null); - }; - + SpanConsumer consumer; ActiveMQCollector collector; @BeforeEach void start(TestInfo testInfo) { @@ -71,14 +77,18 @@ public class ITActiveMQCollector { if (testMethod.isPresent()) { this.testName = testMethod.get().getName(); } - activemq = new EmbeddedActiveMQBroker(); - activemq.start(); + threadsProvidingSpans.clear(); + receivedSpans.clear(); + consumer = (spans) -> { + threadsProvidingSpans.add(Thread.currentThread()); + receivedSpans.add(spans); + return Call.create(null); + }; activemqMetrics.clear(); collector = builder().build().start(); } @AfterEach void stop() throws IOException { - activemq.stop(); collector.close(); } @@ -106,9 +116,8 @@ public class ITActiveMQCollector { * information. */ @Test void toStringContainsOnlySummaryInformation() { - assertThat(collector).hasToString(String.format("ActiveMQCollector{brokerURL=%s, queue=%s}", - activemq.getVmURL(), testName) - ); + assertThat(collector).hasToString( + String.format("ActiveMQCollector{brokerURL=%s, queue=%s}", activemq.brokerURL(), testName)); } /** Ensures list encoding works: a json encoded list of spans */ @@ -128,7 +137,7 @@ public class ITActiveMQCollector { void messageWithMultipleSpans(SpanBytesEncoder encoder) throws Exception { byte[] message = encoder.encodeList(spans); - activemq.pushMessage(collector.queue, message); + pushMessage(collector.queue, message); assertThat(receivedSpans.take()).isEqualTo(spans); @@ -143,18 +152,18 @@ void messageWithMultipleSpans(SpanBytesEncoder encoder) throws Exception { @Test void skipsMalformedData() throws Exception { byte[] malformed1 = "[\"='".getBytes(UTF_8); // screwed up json byte[] malformed2 = "malformed".getBytes(UTF_8); - activemq.pushMessage(collector.queue, THRIFT.encodeList(spans)); - activemq.pushMessage(collector.queue, new byte[0]); - activemq.pushMessage(collector.queue, malformed1); - activemq.pushMessage(collector.queue, malformed2); - activemq.pushMessage(collector.queue, THRIFT.encodeList(spans)); + pushMessage(collector.queue, THRIFT.encodeList(spans)); + pushMessage(collector.queue, new byte[0]); + pushMessage(collector.queue, malformed1); + pushMessage(collector.queue, malformed2); + pushMessage(collector.queue, THRIFT.encodeList(spans)); Thread.sleep(1000); assertThat(activemqMetrics.messages()).isEqualTo(5); assertThat(activemqMetrics.messagesDropped()).isEqualTo(2); // only malformed, not empty - assertThat(activemqMetrics.bytes()) - .isEqualTo(THRIFT.encodeList(spans).length * 2 + malformed1.length + malformed2.length); + assertThat(activemqMetrics.bytes()).isEqualTo( + THRIFT.encodeList(spans).length * 2 + malformed1.length + malformed2.length); assertThat(activemqMetrics.spans()).isEqualTo(spans.size() * 2); assertThat(activemqMetrics.spansDropped()).isZero(); } @@ -183,12 +192,12 @@ void messageWithMultipleSpans(SpanBytesEncoder encoder) throws Exception { } }; - activemq.pushMessage(collector.queue, PROTO3.encodeList(spans)); - activemq.pushMessage(collector.queue, PROTO3.encodeList(spans)); // tossed on error - activemq.pushMessage(collector.queue, PROTO3.encodeList(spans)); - collector = builder().storage(buildStorage(consumer)).build().start(); + pushMessage(collector.queue, PROTO3.encodeList(spans)); + pushMessage(collector.queue, PROTO3.encodeList(spans)); // tossed on error + pushMessage(collector.queue, PROTO3.encodeList(spans)); + assertThat(receivedSpans.take()).containsExactlyElementsOf(spans); // the only way we could read this, is if the malformed span was skipped. assertThat(receivedSpans.take()).containsExactlyElementsOf(spans); @@ -214,9 +223,9 @@ void messageWithMultipleSpans(SpanBytesEncoder encoder) throws Exception { return consumer.accept(spans); })).build().start(); - activemq.pushMessage(collector.queue, ""); // empty bodies don't go to storage - activemq.pushMessage(collector.queue, PROTO3.encodeList(spans)); - activemq.pushMessage(collector.queue, PROTO3.encodeList(spans)); + pushMessage(collector.queue, new byte[] {}); // empty bodies don't go to storage + pushMessage(collector.queue, PROTO3.encodeList(spans)); + pushMessage(collector.queue, PROTO3.encodeList(spans)); assertThat(receivedSpans.take()).containsExactlyElementsOf(spans); latch.countDown(); @@ -232,11 +241,10 @@ void messageWithMultipleSpans(SpanBytesEncoder encoder) throws Exception { } ActiveMQCollector.Builder builder() { - return ActiveMQCollector.builder() - .connectionFactory(activemq.createConnectionFactory()) + // prevent test flakes by having each run in an individual queue + return activemq.newCollectorBuilder(testName) .storage(buildStorage(consumer)) .metrics(metrics) - // prevent test flakes by having each run in an individual queue .queue(testName); } @@ -251,4 +259,30 @@ static StorageComponent buildStorage(final SpanConsumer spanConsumer) { } }; } + + void pushMessage(String queueName, byte[] message) throws Exception { + ActiveMQSpanConsumer consumer = collector.lazyInit.result; + + // Look up the existing session for this queue, so that there is no chance of flakes. + QueueSession session = null; + Queue queue = null; + for (Map.Entry entry : consumer.sessionToReceiver.entrySet()) { + if (entry.getValue().getQueue().getQueueName().equals(queueName)) { + session = entry.getKey(); + queue = entry.getValue().getQueue(); + break; + } + } + if (session == null) { + throw new NoSuchElementException("couldn't find session for queue " + queueName); + } + + Connection conn = collector.lazyInit.result.connection; + + try (QueueSender sender = session.createSender(queue)) { + BytesMessage bytesMessage = session.createBytesMessage(); + bytesMessage.writeBytes(message); + sender.send(bytesMessage); + } + } } diff --git a/zipkin-collector/kafka/src/test/java/zipkin2/collector/kafka/KafkaExtension.java b/zipkin-collector/kafka/src/test/java/zipkin2/collector/kafka/KafkaExtension.java index 5dd6711acf6..f2d9fd36dca 100644 --- a/zipkin-collector/kafka/src/test/java/zipkin2/collector/kafka/KafkaExtension.java +++ b/zipkin-collector/kafka/src/test/java/zipkin2/collector/kafka/KafkaExtension.java @@ -92,7 +92,7 @@ KafkaCollector.Builder newCollectorBuilder(String topic, int streams) { // mostly waiting for https://github.com/testcontainers/testcontainers-java/issues/3537 static final class KafkaContainer extends GenericContainer { KafkaContainer() { - super(parse("ghcr.io/openzipkin/zipkin-kafka:2.24.3")); + super(parse("ghcr.io/openzipkin/zipkin-kafka:2.24.4")); if ("true".equals(System.getProperty("docker.skip"))) { throw new TestAbortedException("${docker.skip} == true"); } diff --git a/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/RabbitMQExtension.java b/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/RabbitMQExtension.java index 683ae818db6..bc5a46805e3 100644 --- a/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/RabbitMQExtension.java +++ b/zipkin-collector/rabbitmq/src/test/java/zipkin2/collector/rabbitmq/RabbitMQExtension.java @@ -61,7 +61,7 @@ RabbitMQCollector.Builder newCollectorBuilder(String queue) { void declareQueue(String queue) { ExecResult result; try { - result = container.execInContainer("rabbitmqadmin", "declare", "queue", "name=" + queue); + result = container.execInContainer("amqp-declare-queue", "-q", queue); } catch (Throwable e) { propagateIfFatal(e); throw new TestAbortedException( @@ -83,11 +83,11 @@ int port() { // mostly waiting for https://github.com/testcontainers/testcontainers-java/issues/3537 static final class RabbitMQContainer extends GenericContainer { RabbitMQContainer() { - super(parse("ghcr.io/openzipkin/rabbitmq-management-alpine:latest")); + super(parse("ghcr.io/openzipkin/zipkin-rabbitmq:2.24.4")); if ("true".equals(System.getProperty("docker.skip"))) { throw new TestAbortedException("${docker.skip} == true"); } - withExposedPorts(RABBIT_PORT); // rabbit's image doesn't expose any port + withExposedPorts(RABBIT_PORT); waitStrategy = Wait.forLogMessage(".*Server startup complete.*", 1); withStartupTimeout(Duration.ofSeconds(60)); } diff --git a/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/CassandraStorageExtension.java b/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/CassandraStorageExtension.java index 502053c30e9..4644acd082d 100644 --- a/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/CassandraStorageExtension.java +++ b/zipkin-storage/cassandra/src/test/java/zipkin2/storage/cassandra/CassandraStorageExtension.java @@ -156,7 +156,7 @@ static boolean poolInFlight(CqlSession session) { // mostly waiting for https://github.com/testcontainers/testcontainers-java/issues/3537 static final class CassandraContainer extends GenericContainer { CassandraContainer() { - super(parse("ghcr.io/openzipkin/zipkin-cassandra:2.24.3")); + super(parse("ghcr.io/openzipkin/zipkin-cassandra:2.24.4")); if ("true".equals(System.getProperty("docker.skip"))) { throw new TestAbortedException("${docker.skip} == true"); } diff --git a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ElasticsearchExtension.java b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ElasticsearchExtension.java index 192f432aaad..48b0012acef 100644 --- a/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ElasticsearchExtension.java +++ b/zipkin-storage/elasticsearch/src/test/java/zipkin2/elasticsearch/integration/ElasticsearchExtension.java @@ -127,7 +127,7 @@ String baseUrl() { // mostly waiting for https://github.com/testcontainers/testcontainers-java/issues/3537 static final class ElasticsearchContainer extends GenericContainer { ElasticsearchContainer(int majorVersion) { - super(parse("ghcr.io/openzipkin/zipkin-elasticsearch" + majorVersion + ":2.24.3")); + super(parse("ghcr.io/openzipkin/zipkin-elasticsearch" + majorVersion + ":2.24.4")); if ("true".equals(System.getProperty("docker.skip"))) { throw new TestAbortedException("${docker.skip} == true"); } diff --git a/zipkin-storage/mysql-v1/src/test/java/zipkin2/storage/mysql/v1/MySQLExtension.java b/zipkin-storage/mysql-v1/src/test/java/zipkin2/storage/mysql/v1/MySQLExtension.java index 82593c06135..60050cca5f0 100644 --- a/zipkin-storage/mysql-v1/src/test/java/zipkin2/storage/mysql/v1/MySQLExtension.java +++ b/zipkin-storage/mysql-v1/src/test/java/zipkin2/storage/mysql/v1/MySQLExtension.java @@ -113,7 +113,7 @@ int port() { // mostly waiting for https://github.com/testcontainers/testcontainers-java/issues/3537 static final class MySQLContainer extends GenericContainer { MySQLContainer() { - super(parse("ghcr.io/openzipkin/zipkin-mysql:2.24.3")); + super(parse("ghcr.io/openzipkin/zipkin-mysql:2.24.4")); if ("true".equals(System.getProperty("docker.skip"))) { throw new TestAbortedException("${docker.skip} == true"); }