Skip to content

Commit

Permalink
ci: moves ActiveMQ and RabbitMQ to new docker images (#3640)
Browse files Browse the repository at this point in the history
Notably, this removes a JUnit 4 dependency in activemq and should fix
flakes related to ActiveMQ integration tests.

Fixes #3633

Signed-off-by: Adrian Cole <adrian@tetrate.io>
  • Loading branch information
codefromthecrypt authored Dec 13, 2023
1 parent 6bfc39b commit e86bce0
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 60 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ jobs:
strategy:
matrix:
include:
- name: zipkin-collector-activemq
- name: zipkin-collector-kafka
- name: zipkin-collector-rabbitmq
- name: zipkin-storage-cassandra
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class ServerIntegratedBenchmark {

@Test void elasticsearch() throws Exception {
GenericContainer<?> elasticsearch =
new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-elasticsearch7:2.24.3"))
new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-elasticsearch7:2.24.4"))
.withNetwork(Network.SHARED)
.withNetworkAliases("elasticsearch")
.withLabel("name", "elasticsearch")
Expand All @@ -105,7 +105,7 @@ class ServerIntegratedBenchmark {

@Test void cassandra3() throws Exception {
GenericContainer<?> cassandra =
new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-cassandra:2.24.3"))
new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-cassandra:2.24.4"))
.withNetwork(Network.SHARED)
.withNetworkAliases("cassandra")
.withLabel("name", "cassandra")
Expand All @@ -119,7 +119,7 @@ class ServerIntegratedBenchmark {

@Test void mysql() throws Exception {
GenericContainer<?> mysql =
new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-mysql:2.24.3"))
new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-mysql:2.24.4"))
.withNetwork(Network.SHARED)
.withNetworkAliases("mysql")
.withLabel("name", "mysql")
Expand Down
16 changes: 0 additions & 16 deletions zipkin-collector/activemq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,5 @@
<artifactId>activemq-client</artifactId>
<version>${activemq.version}</version>
</dependency>

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<version>${activemq.version}</version>
<scope>test</scope>
</dependency>

<!-- Note: this is junit 4 -->
<dependency>
<groupId>org.apache.activemq.tooling</groupId>
<artifactId>activemq-junit</artifactId>
<version>${activemq.version}</version>
<scope>test</scope>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2016-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin2.collector.activemq;

import java.time.Duration;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.opentest4j.TestAbortedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;

import static org.testcontainers.utility.DockerImageName.parse;

class ActiveMQExtension implements BeforeAllCallback, AfterAllCallback {
static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQExtension.class);
static final int ACTIVEMQ_PORT = 61616;

ActiveMQContainer container = new ActiveMQContainer();

@Override public void beforeAll(ExtensionContext context) {
if (context.getRequiredTestClass().getEnclosingClass() != null) {
// Only run once in outermost scope.
return;
}

container.start();
LOGGER.info("Using brokerURL " + brokerURL());
}

@Override public void afterAll(ExtensionContext context) {
if (context.getRequiredTestClass().getEnclosingClass() != null) {
// Only run once in outermost scope.
return;
}

container.stop();
}

ActiveMQCollector.Builder newCollectorBuilder(String queue) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(brokerURL());
return ActiveMQCollector.builder().queue(queue).connectionFactory(connectionFactory);
}

String brokerURL() {
return "failover:tcp://" + container.getHost() + ":" + container.getMappedPort(ACTIVEMQ_PORT);
}

// mostly waiting for https://github.com/testcontainers/testcontainers-java/issues/3537
static final class ActiveMQContainer extends GenericContainer<ActiveMQContainer> {
ActiveMQContainer() {
super(parse("ghcr.io/openzipkin/zipkin-activemq:2.24.4"));
if ("true".equals(System.getProperty("docker.skip"))) {
throw new TestAbortedException("${docker.skip} == true");
}
withExposedPorts(ACTIVEMQ_PORT);
waitStrategy = Wait.forListeningPorts(ACTIVEMQ_PORT);
withStartupTimeout(Duration.ofSeconds(60));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,27 @@
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.junit.EmbeddedActiveMQBroker;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import zipkin2.Call;
import zipkin2.Callback;
import zipkin2.Component;
Expand All @@ -46,39 +56,39 @@
import static zipkin2.codec.SpanBytesEncoder.PROTO3;
import static zipkin2.codec.SpanBytesEncoder.THRIFT;

public class ITActiveMQCollector {
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@Timeout(60)
class ITActiveMQCollector {
@RegisterExtension ActiveMQExtension activemq = new ActiveMQExtension();
List<Span> spans = Arrays.asList(LOTS_OF_SPANS[0], LOTS_OF_SPANS[1]);

/** Managed directly as this is a JUnit 4, not 5 type. */
EmbeddedActiveMQBroker activemq;
public String testName;

InMemoryCollectorMetrics metrics = new InMemoryCollectorMetrics();
InMemoryCollectorMetrics activemqMetrics = metrics.forTransport("activemq");

CopyOnWriteArraySet<Thread> threadsProvidingSpans = new CopyOnWriteArraySet<>();
LinkedBlockingQueue<List<Span>> receivedSpans = new LinkedBlockingQueue<>();
SpanConsumer consumer = (spans) -> {
threadsProvidingSpans.add(Thread.currentThread());
receivedSpans.add(spans);
return Call.create(null);
};

SpanConsumer consumer;
ActiveMQCollector collector;

@BeforeEach void start(TestInfo testInfo) {
Optional<Method> testMethod = testInfo.getTestMethod();
if (testMethod.isPresent()) {
this.testName = testMethod.get().getName();
}
activemq = new EmbeddedActiveMQBroker();
activemq.start();
threadsProvidingSpans.clear();
receivedSpans.clear();
consumer = (spans) -> {
threadsProvidingSpans.add(Thread.currentThread());
receivedSpans.add(spans);
return Call.create(null);
};
activemqMetrics.clear();
collector = builder().build().start();
}

@AfterEach void stop() throws IOException {
activemq.stop();
collector.close();
}

Expand Down Expand Up @@ -106,9 +116,8 @@ public class ITActiveMQCollector {
* information.
*/
@Test void toStringContainsOnlySummaryInformation() {
assertThat(collector).hasToString(String.format("ActiveMQCollector{brokerURL=%s, queue=%s}",
activemq.getVmURL(), testName)
);
assertThat(collector).hasToString(
String.format("ActiveMQCollector{brokerURL=%s, queue=%s}", activemq.brokerURL(), testName));
}

/** Ensures list encoding works: a json encoded list of spans */
Expand All @@ -128,7 +137,7 @@ public class ITActiveMQCollector {

void messageWithMultipleSpans(SpanBytesEncoder encoder) throws Exception {
byte[] message = encoder.encodeList(spans);
activemq.pushMessage(collector.queue, message);
pushMessage(collector.queue, message);

assertThat(receivedSpans.take()).isEqualTo(spans);

Expand All @@ -143,18 +152,18 @@ void messageWithMultipleSpans(SpanBytesEncoder encoder) throws Exception {
@Test void skipsMalformedData() throws Exception {
byte[] malformed1 = "[\"='".getBytes(UTF_8); // screwed up json
byte[] malformed2 = "malformed".getBytes(UTF_8);
activemq.pushMessage(collector.queue, THRIFT.encodeList(spans));
activemq.pushMessage(collector.queue, new byte[0]);
activemq.pushMessage(collector.queue, malformed1);
activemq.pushMessage(collector.queue, malformed2);
activemq.pushMessage(collector.queue, THRIFT.encodeList(spans));
pushMessage(collector.queue, THRIFT.encodeList(spans));
pushMessage(collector.queue, new byte[0]);
pushMessage(collector.queue, malformed1);
pushMessage(collector.queue, malformed2);
pushMessage(collector.queue, THRIFT.encodeList(spans));

Thread.sleep(1000);

assertThat(activemqMetrics.messages()).isEqualTo(5);
assertThat(activemqMetrics.messagesDropped()).isEqualTo(2); // only malformed, not empty
assertThat(activemqMetrics.bytes())
.isEqualTo(THRIFT.encodeList(spans).length * 2 + malformed1.length + malformed2.length);
assertThat(activemqMetrics.bytes()).isEqualTo(
THRIFT.encodeList(spans).length * 2 + malformed1.length + malformed2.length);
assertThat(activemqMetrics.spans()).isEqualTo(spans.size() * 2);
assertThat(activemqMetrics.spansDropped()).isZero();
}
Expand Down Expand Up @@ -183,12 +192,12 @@ void messageWithMultipleSpans(SpanBytesEncoder encoder) throws Exception {
}
};

activemq.pushMessage(collector.queue, PROTO3.encodeList(spans));
activemq.pushMessage(collector.queue, PROTO3.encodeList(spans)); // tossed on error
activemq.pushMessage(collector.queue, PROTO3.encodeList(spans));

collector = builder().storage(buildStorage(consumer)).build().start();

pushMessage(collector.queue, PROTO3.encodeList(spans));
pushMessage(collector.queue, PROTO3.encodeList(spans)); // tossed on error
pushMessage(collector.queue, PROTO3.encodeList(spans));

assertThat(receivedSpans.take()).containsExactlyElementsOf(spans);
// the only way we could read this, is if the malformed span was skipped.
assertThat(receivedSpans.take()).containsExactlyElementsOf(spans);
Expand All @@ -214,9 +223,9 @@ void messageWithMultipleSpans(SpanBytesEncoder encoder) throws Exception {
return consumer.accept(spans);
})).build().start();

activemq.pushMessage(collector.queue, ""); // empty bodies don't go to storage
activemq.pushMessage(collector.queue, PROTO3.encodeList(spans));
activemq.pushMessage(collector.queue, PROTO3.encodeList(spans));
pushMessage(collector.queue, new byte[] {}); // empty bodies don't go to storage
pushMessage(collector.queue, PROTO3.encodeList(spans));
pushMessage(collector.queue, PROTO3.encodeList(spans));

assertThat(receivedSpans.take()).containsExactlyElementsOf(spans);
latch.countDown();
Expand All @@ -232,11 +241,10 @@ void messageWithMultipleSpans(SpanBytesEncoder encoder) throws Exception {
}

ActiveMQCollector.Builder builder() {
return ActiveMQCollector.builder()
.connectionFactory(activemq.createConnectionFactory())
// prevent test flakes by having each run in an individual queue
return activemq.newCollectorBuilder(testName)
.storage(buildStorage(consumer))
.metrics(metrics)
// prevent test flakes by having each run in an individual queue
.queue(testName);
}

Expand All @@ -251,4 +259,30 @@ static StorageComponent buildStorage(final SpanConsumer spanConsumer) {
}
};
}

void pushMessage(String queueName, byte[] message) throws Exception {
ActiveMQSpanConsumer consumer = collector.lazyInit.result;

// Look up the existing session for this queue, so that there is no chance of flakes.
QueueSession session = null;
Queue queue = null;
for (Map.Entry<QueueSession, QueueReceiver> entry : consumer.sessionToReceiver.entrySet()) {
if (entry.getValue().getQueue().getQueueName().equals(queueName)) {
session = entry.getKey();
queue = entry.getValue().getQueue();
break;
}
}
if (session == null) {
throw new NoSuchElementException("couldn't find session for queue " + queueName);
}

Connection conn = collector.lazyInit.result.connection;

try (QueueSender sender = session.createSender(queue)) {
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(message);
sender.send(bytesMessage);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ KafkaCollector.Builder newCollectorBuilder(String topic, int streams) {
// mostly waiting for https://github.com/testcontainers/testcontainers-java/issues/3537
static final class KafkaContainer extends GenericContainer<KafkaContainer> {
KafkaContainer() {
super(parse("ghcr.io/openzipkin/zipkin-kafka:2.24.3"));
super(parse("ghcr.io/openzipkin/zipkin-kafka:2.24.4"));
if ("true".equals(System.getProperty("docker.skip"))) {
throw new TestAbortedException("${docker.skip} == true");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ RabbitMQCollector.Builder newCollectorBuilder(String queue) {
void declareQueue(String queue) {
ExecResult result;
try {
result = container.execInContainer("rabbitmqadmin", "declare", "queue", "name=" + queue);
result = container.execInContainer("amqp-declare-queue", "-q", queue);
} catch (Throwable e) {
propagateIfFatal(e);
throw new TestAbortedException(
Expand All @@ -83,11 +83,11 @@ int port() {
// mostly waiting for https://github.com/testcontainers/testcontainers-java/issues/3537
static final class RabbitMQContainer extends GenericContainer<RabbitMQContainer> {
RabbitMQContainer() {
super(parse("ghcr.io/openzipkin/rabbitmq-management-alpine:latest"));
super(parse("ghcr.io/openzipkin/zipkin-rabbitmq:2.24.4"));
if ("true".equals(System.getProperty("docker.skip"))) {
throw new TestAbortedException("${docker.skip} == true");
}
withExposedPorts(RABBIT_PORT); // rabbit's image doesn't expose any port
withExposedPorts(RABBIT_PORT);
waitStrategy = Wait.forLogMessage(".*Server startup complete.*", 1);
withStartupTimeout(Duration.ofSeconds(60));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ static boolean poolInFlight(CqlSession session) {
// mostly waiting for https://github.com/testcontainers/testcontainers-java/issues/3537
static final class CassandraContainer extends GenericContainer<CassandraContainer> {
CassandraContainer() {
super(parse("ghcr.io/openzipkin/zipkin-cassandra:2.24.3"));
super(parse("ghcr.io/openzipkin/zipkin-cassandra:2.24.4"));
if ("true".equals(System.getProperty("docker.skip"))) {
throw new TestAbortedException("${docker.skip} == true");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ String baseUrl() {
// mostly waiting for https://github.com/testcontainers/testcontainers-java/issues/3537
static final class ElasticsearchContainer extends GenericContainer<ElasticsearchContainer> {
ElasticsearchContainer(int majorVersion) {
super(parse("ghcr.io/openzipkin/zipkin-elasticsearch" + majorVersion + ":2.24.3"));
super(parse("ghcr.io/openzipkin/zipkin-elasticsearch" + majorVersion + ":2.24.4"));
if ("true".equals(System.getProperty("docker.skip"))) {
throw new TestAbortedException("${docker.skip} == true");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ int port() {
// mostly waiting for https://github.com/testcontainers/testcontainers-java/issues/3537
static final class MySQLContainer extends GenericContainer<MySQLContainer> {
MySQLContainer() {
super(parse("ghcr.io/openzipkin/zipkin-mysql:2.24.3"));
super(parse("ghcr.io/openzipkin/zipkin-mysql:2.24.4"));
if ("true".equals(System.getProperty("docker.skip"))) {
throw new TestAbortedException("${docker.skip} == true");
}
Expand Down

0 comments on commit e86bce0

Please sign in to comment.