Skip to content

Commit

Permalink
Bump versions
Browse files Browse the repository at this point in the history
  • Loading branch information
pbernet committed Aug 4, 2024
1 parent ea4ed73 commit d73906d
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 35 deletions.
10 changes: 5 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ val pekkoConnectorVersion = "1.0.2"
val pekkoConnectorKafkaVersion = "1.0.0"

val kafkaVersion = "3.7.0"
val activemqVersion = "5.18.4" // We are stuck with 5.x
val artemisVersion = "2.35.0"
val testContainersVersion = "1.19.8"
val activemqVersion = "5.18.5" // We are stuck with 5.x
val artemisVersion = "2.36.0"
val testContainersVersion = "1.20.1"
val keycloakVersion = "24.0.4"
val sttpVersion = "3.9.0"
val influxdbVersion = "7.1.0"
Expand Down Expand Up @@ -90,7 +90,7 @@ libraryDependencies ++= Seq(

"com.influxdb" %% "influxdb-client-scala" % influxdbVersion,
"com.influxdb" % "flux-dsl" % influxdbVersion,
"org.influxdb" % "influxdb-java" % "2.23",
"org.influxdb" % "influxdb-java" % "2.24",

"ca.uhn.hapi" % "hapi-base" % "2.3",
"ca.uhn.hapi" % "hapi-structures-v23" % "2.3",
Expand All @@ -111,7 +111,7 @@ libraryDependencies ++= Seq(
"io.projectreactor" % "reactor-core" % "3.5.4",
"io.reactivex.rxjava3" % "rxjava" % "3.1.6",

"com.github.blemale" %% "scaffeine" % "5.2.1",
"com.github.blemale" %% "scaffeine" % "5.3.0",
"ch.qos.logback" % "logback-classic" % "1.4.12",

"org.testcontainers" % "testcontainers" % testContainersVersion,
Expand Down
17 changes: 10 additions & 7 deletions src/main/resources/broker.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@ Apache Artemis minimal broker config
</security-settings>

<!-- TODO sync with broker_docker.xml -->
<queues>
<queue name="jms.queue.exampleQueue">
<address>jms.queue.exampleQueue</address>
<durable>true</durable>
</queue>
</queues>

<addresses>
<address name="jms.queue.exampleQueue">
<multicast>
<!-- pre-configured shared durable subscription queue -->
<queue name="jms.queue.exampleQueue" max-consumers="10">
<durable>true</durable>
</queue>
</multicast>
</address>
</addresses>

<address-settings>
<address-setting match="jms.queue.exampleQueue">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import scala.util.control.NonFatal

/**
* Use case:
* - Process a stream of messages with reoccurring TRACE_ID
* - For the first TRACE_ID: download .zip file from FileServer, add path to cache and store file
 * - Process a stream of (random gen) msgs with recurring TRACE_ID
* - For the first TRACE_ID: download .zip file from FileServer, add path to local cache and store file
* - For subsequent TRACE_IDs: try to fetch path from local cache to avoid duplicate downloads per TRACE_ID
* - On downstream error: the path needs to be kept longer in the cache
* - On restart: populate cache from local filesystem
*
 * Before running this class: start [[alpakka.env.FileServer]] to simulate non-idempotent responses
* Monitor localFileCache dir with cmd: watch ls -ltr
* Monitor `localFileCache` dir with cmd: watch ls -ltr
*
* Doc:
* - Caffeine: https://github.com/ben-manes/caffeine
Expand All @@ -50,8 +50,8 @@ object LocalFileCacheCaffeine {
}

val scaleFactor = 1 // Raise to widen range of IDs and thus have more traffic
val evictionTime: FiniteDuration = 5.minutes // Lower eg to 5.seconds to see cache and file system deletes
val evictionTimeOnError: FiniteDuration = 10.minutes
val cacheEvictionTime: FiniteDuration = 5.minutes // Lower e.g. to 5.seconds to see cache and file system deletes
val cacheEvictionTimeOnError: FiniteDuration = 10.minutes
val localFileCache: Path = Paths.get(System.getProperty("java.io.tmpdir")).resolve("localFileCache")

logger.info(s"Starting with localFileCache dir: $localFileCache")
Expand All @@ -70,7 +70,7 @@ object LocalFileCacheCaffeine {
val cache: Cache[Int, Path] =
Scaffeine()
.recordStats()
.expireAfter[Int, Path]((_, _) => evictionTime, (_, _, _) => evictionTimeOnError, (_, _, _) => evictionTime)
.expireAfter[Int, Path]((_, _) => cacheEvictionTime, (_, _, _) => cacheEvictionTimeOnError, (_, _, _) => cacheEvictionTime)
.maximumSize(1000)
.removalListener((key, value, cause) => deleteFromFileStore(key, value, cause))
.build[Int, Path]()
Expand Down
29 changes: 12 additions & 17 deletions src/test/scala/alpakka/influxdb/InfluxdbIT.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package alpakka.influxdb;

import org.apache.pekko.Done;
import org.apache.pekko.actor.ActorSystem;
import org.junit.Ignore;
import org.junit.jupiter.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -13,12 +11,9 @@
import util.LogFileScanner;

import java.io.IOException;
import java.util.List;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
Expand Down Expand Up @@ -46,8 +41,8 @@ public static void setupBeforeClass() throws IOException, InterruptedException {
// Doc: https://docs.influxdata.com/influxdb/v2.1/reference/release-notes/influxdb/
LOGGER.info("InfluxDB container listening on port: {}. Running: {} ", influxDBContainer.getMappedPort(INFLUXDB_PORT), influxDBContainer.isRunning());
Container.ExecResult result = influxDBContainer.execInContainer("influx", "setup", "-b", "testbucket", "-f", "-o", "testorg", "-t", "abcdefgh", "-u", "admin", "-p", "adminadmin");
LOGGER.info("Result exit code: " + result.getExitCode());
LOGGER.info("Result stdout: " + result.getStdout());
LOGGER.info("Result exit code: {}", result.getExitCode());
LOGGER.info("Result stdout: {}", result.getStdout());
browserClient();
}

Expand All @@ -67,12 +62,15 @@ public void setupBeforeTest(TestInfo testInfo) {
@Order(1)
void testWriteAndRead() {
int maxClients = 5;
int nPoints = 1000;
int nPoints = 100;

List<CompletionStage<Done>> futList = IntStream.rangeClosed(1, maxClients).boxed().parallel()
.map(i -> influxDBWriter.writeTestPoints(nPoints, "sensor" + i))
.collect(Collectors.toList());
assertThat(CompletableFuture.allOf(futList.toArray(new CompletableFuture[futList.size()]))).succeedsWithin(5 * maxClients, TimeUnit.SECONDS);
assertThat(
CompletableFuture.allOf(
IntStream.rangeClosed(1, maxClients)
.mapToObj(i -> influxDBWriter.writeTestPoints(nPoints, "sensor" + i))
.toArray(CompletableFuture[]::new)
)
).succeedsWithin(Duration.ofSeconds(10 * maxClients));

assertThat(influxDBReader.getQuerySync("testMem").length()).isEqualTo(nPoints * maxClients);
assertThat(influxDBReader.fluxQueryCount("testMem")).isEqualTo(nPoints * maxClients);
Expand All @@ -81,13 +79,10 @@ void testWriteAndRead() {

@Test
@Order(2)
@Ignore
void testWriteAndReadLineProtocol() throws ExecutionException, InterruptedException {
int nPoints = 10;
influxDBWriter.writeTestPointsFromLineProtocolSync();
// TODO Activate, when "com.influxdb" %% "influxdb-client-scala" is available for pekko
//assertThat(influxDBReader.getQuerySync("testMemLP").length()).isEqualTo(nPoints);
assert (true);
assertThat(influxDBReader.getQuerySync("testMemLP").length()).isEqualTo(nPoints);
}

@Test
Expand Down

0 comments on commit d73906d

Please sign in to comment.