Skip to content

Commit

Permalink
add test for update race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
pwinckles committed Nov 19, 2024
1 parent 4ef60a4 commit b4e6791
Showing 1 changed file with 102 additions and 0 deletions.
102 changes: 102 additions & 0 deletions dxm/src/test/java/io/pcp/parfait/dxm/PcpMmvWriterIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@
package io.pcp.parfait.dxm;

import io.pcp.parfait.dxm.semantics.Semantics;
import io.pcp.parfait.dxm.types.AbstractTypeHandler;
import io.pcp.parfait.dxm.types.MmvMetricType;
import org.hamcrest.Matcher;
import org.junit.BeforeClass;
import org.junit.Test;

import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;

import static io.pcp.parfait.dxm.IdentifierSourceSet.DEFAULT_SET;
import static io.pcp.parfait.dxm.MmvVersion.MMV_VERSION1;
Expand Down Expand Up @@ -109,6 +116,93 @@ public void resetShouldClearStrings() throws Exception {
assertStringsCount(pcpMmvWriterV2, 0);
}

@Test
public void metricUpdatesWhileResettingWriterShouldNotBeLost() throws Exception {
// The order the metrics are written is non-deterministic because they're pulled out of a hash map, so
// we must dynamically record their order.
List<String> order = new ArrayList<>();

pcpMmvWriterV1.reset();
pcpMmvWriterV1.addMetric(MetricName.parse("value1"), Semantics.COUNTER, ONE, 1,
new AbstractTypeHandler<Number>(MmvMetricType.I32, 4) {
public void putBytes(ByteBuffer buffer, Number value) {
order.add("value1");
buffer.putInt(value == null ? 0 : value.intValue());
}
});
pcpMmvWriterV1.addMetric(MetricName.parse("value2"), Semantics.COUNTER, ONE, 2,
new AbstractTypeHandler<Number>(MmvMetricType.I32, 4) {
public void putBytes(ByteBuffer buffer, Number value) {
order.add("value2");
buffer.putInt(value == null ? 0 : value.intValue());
}
});

pcpMmvWriterV1.start();

waitForReload();

assertMetric("mmv.value1", is("1.000"));
assertMetric("mmv.value2", is("2.000"));

pcpMmvWriterV1.reset();

// The idea here is that the 1st metric will be written immediately, but the 2nd will wait on the phaser to
// write. This gives us time to update the 1st metric value. The sleep is needed to ensure the start() method
// doesn't exit before updateMetric() is executed.
Phaser phaser = new Phaser(2);

pcpMmvWriterV1.addMetric(MetricName.parse("value1"), Semantics.COUNTER, ONE, 1,
new AbstractTypeHandler<Number>(MmvMetricType.I32, 4) {
public void putBytes(ByteBuffer buffer, Number value) {
boolean isNotFirst = !"value1".equals(order.get(0));
if (isNotFirst) {
phaser.arriveAndAwaitAdvance();
}
buffer.putInt(value == null ? 0 : value.intValue());
if (isNotFirst) {
sleep(1_000);
}
}
});
pcpMmvWriterV1.addMetric(MetricName.parse("value2"), Semantics.COUNTER, ONE, 2,
new AbstractTypeHandler<Number>(MmvMetricType.I32, 4) {
public void putBytes(ByteBuffer buffer, Number value) {
boolean isNotFirst = !"value2".equals(order.get(0));
if (isNotFirst) {
phaser.arriveAndAwaitAdvance();
}
buffer.putInt(value == null ? 0 : value.intValue());
if (isNotFirst) {
sleep(1_000);
}
}
});

CountDownLatch startDone = new CountDownLatch(1);

new Thread(() -> {
try {
pcpMmvWriterV1.start();
} catch (Exception e) {
e.printStackTrace();
} finally {
startDone.countDown();
}
}).start();

// Will not continue till after the 1st metric has been written
phaser.arriveAndAwaitAdvance();

pcpMmvWriterV1.updateMetric(MetricName.parse(order.get(0)), 10);

startDone.await();

waitForReload();

assertMetric("mmv." + order.get(0), is("10.000"));
}

private void assertMetric(String metricName, Matcher<String> expectedValue) throws Exception {
String actual = pcpClient.getMetric(metricName);
assertThat(actual, expectedValue);
Expand All @@ -125,4 +219,12 @@ private void waitForReload() throws InterruptedException {
Thread.sleep(1000);
}

private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

}

0 comments on commit b4e6791

Please sign in to comment.