diff --git a/.github/workflows/ci-nightly.yml b/.github/workflows/ci-nightly.yml
new file mode 100644
index 0000000..2e1158d
--- /dev/null
+++ b/.github/workflows/ci-nightly.yml
@@ -0,0 +1,158 @@
+name: Nightly Stress Tests
+
+on:
+ schedule:
+ # Run at 2 AM UTC every day
+ - cron: '0 2 * * *'
+ workflow_dispatch:
+ inputs:
+ duration_minutes:
+ description: 'Test duration in minutes'
+ required: false
+ default: '5'
+ thread_count:
+ description: 'Number of concurrent threads'
+ required: false
+ default: '100'
+ test_groups:
+ description: 'Test groups to run (comma-separated: stress, chaos)'
+ required: false
+ default: 'stress,chaos'
+
+permissions:
+ contents: read
+ checks: write
+
+jobs:
+ stress-tests:
+ name: Stress Tests (Java ${{ matrix.java }})
+ runs-on: ubuntu-latest
+ strategy:
+ fail-fast: false
+ matrix:
+ java: ['17', '21']
+
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v6
+ with:
+ fetch-depth: 1
+
+ - name: Set up JDK ${{ matrix.java }}
+ uses: actions/setup-java@v5
+ with:
+ java-version: ${{ matrix.java }}
+ distribution: 'temurin'
+ cache: 'maven'
+
+ - name: Run stress tests
+ run: |
+ mvn -B clean verify -Pstress \
+ -pl aether-datafixers-functional-tests \
+ -Dgroups=${{ github.event.inputs.test_groups || 'stress,chaos' }} \
+ -Dstress.threads=${{ github.event.inputs.thread_count || '100' }} \
+ -Dstress.duration.minutes=${{ github.event.inputs.duration_minutes || '5' }} \
+ -Ddependency-check.skip=true
+ timeout-minutes: 60
+
+ - name: Upload stress test results
+ uses: actions/upload-artifact@v6
+ if: always()
+ with:
+ name: stress-test-results-java${{ matrix.java }}
+ path: |
+ **/target/failsafe-reports/TEST-*.xml
+ **/target/failsafe-reports/*.txt
+ retention-days: 30
+
+ chaos-tests:
+ name: Chaos Tests (Java 21)
+ runs-on: ubuntu-latest
+ needs: stress-tests
+ if: always() && (needs.stress-tests.result == 'success' || needs.stress-tests.result == 'failure')
+
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v6
+ with:
+ fetch-depth: 1
+
+ - name: Set up JDK 21
+ uses: actions/setup-java@v5
+ with:
+ java-version: '21'
+ distribution: 'temurin'
+ cache: 'maven'
+
+ - name: Run chaos tests only
+ run: |
+ mvn -B clean verify -Pit \
+ -pl aether-datafixers-functional-tests \
+ -Dgroups=chaos \
+ -Ddependency-check.skip=true
+ timeout-minutes: 30
+
+ - name: Upload chaos test results
+ uses: actions/upload-artifact@v6
+ if: always()
+ with:
+ name: chaos-test-results
+ path: |
+ **/target/failsafe-reports/TEST-*.xml
+ retention-days: 30
+
+ reports:
+ name: Test Reports
+ runs-on: ubuntu-latest
+ needs: [stress-tests, chaos-tests]
+ if: always()
+
+ permissions:
+ contents: read
+ checks: write
+
+ steps:
+ - name: Download stress test results (Java 17)
+ uses: actions/download-artifact@v6
+ continue-on-error: true
+ with:
+ name: stress-test-results-java17
+ path: reports/stress-java17
+
+ - name: Download stress test results (Java 21)
+ uses: actions/download-artifact@v6
+ continue-on-error: true
+ with:
+ name: stress-test-results-java21
+ path: reports/stress-java21
+
+ - name: Download chaos test results
+ uses: actions/download-artifact@v6
+ continue-on-error: true
+ with:
+ name: chaos-test-results
+ path: reports/chaos
+
+ - name: Publish Stress Test Report (Java 17)
+ uses: mikepenz/action-junit-report@v6
+ if: always()
+ continue-on-error: true
+ with:
+ report_paths: 'reports/stress-java17/**/TEST-*.xml'
+ check_name: Stress Test Report (Java 17)
+
+ - name: Publish Stress Test Report (Java 21)
+ uses: mikepenz/action-junit-report@v6
+ if: always()
+ continue-on-error: true
+ with:
+ report_paths: 'reports/stress-java21/**/TEST-*.xml'
+ check_name: Stress Test Report (Java 21)
+
+ - name: Publish Chaos Test Report
+ uses: mikepenz/action-junit-report@v6
+ if: always()
+ continue-on-error: true
+ with:
+ report_paths: 'reports/chaos/**/TEST-*.xml'
+ check_name: Chaos Test Report
diff --git a/aether-datafixers-functional-tests/pom.xml b/aether-datafixers-functional-tests/pom.xml
index 4f4e7a3..6b190c9 100644
--- a/aether-datafixers-functional-tests/pom.xml
+++ b/aether-datafixers-functional-tests/pom.xml
@@ -139,6 +139,7 @@
+
org.apache.maven.plugins
maven-failsafe-plugin
@@ -148,6 +149,15 @@
**/*IT.java
**/*E2E.java
+
+
+ ${stress.threads}
+ ${stress.duration.minutes}
+ ${stress.operations.per.thread}
+
+
+ -Xms512M -Xmx2G
+ 1800
diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/RandomDelaysChaosIT.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/RandomDelaysChaosIT.java
new file mode 100644
index 0000000..0a33f02
--- /dev/null
+++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/RandomDelaysChaosIT.java
@@ -0,0 +1,354 @@
+/*
+ * Copyright (c) 2025 Splatgames.de Software and Contributors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package de.splatgames.aether.datafixers.functional.chaos;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import de.splatgames.aether.datafixers.api.DataVersion;
+import de.splatgames.aether.datafixers.api.TypeReference;
+import de.splatgames.aether.datafixers.api.dynamic.Dynamic;
+import de.splatgames.aether.datafixers.api.fix.DataFix;
+import de.splatgames.aether.datafixers.api.fix.DataFixer;
+import de.splatgames.aether.datafixers.api.fix.DataFixerContext;
+import de.splatgames.aether.datafixers.codec.json.gson.GsonOps;
+import de.splatgames.aether.datafixers.core.fix.DataFixerBuilder;
+import de.splatgames.aether.datafixers.functional.chaos.util.ChaosInjector;
+import de.splatgames.aether.datafixers.functional.stress.util.ThreadOrchestrator;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Chaos tests introducing random delays into fix execution.
+ *
+ * <p>Tests that the DataFixer system correctly handles:
+ *
+ * - Variable latency in fix execution
+ * - Ordering correctness despite delays
+ * - Concurrent execution with mixed delay patterns
+ *
+ *
+ * Run with: {@code mvn verify -Pit -Dgroups=chaos}
+ */
+@DisplayName("Random Delays Chaos Tests")
+@Tag("chaos")
+@Tag("integration")
+class RandomDelaysChaosIT {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RandomDelaysChaosIT.class);
+ private static final TypeReference CHAOS_TYPE = new TypeReference("chaos_delay_entity");
+
+ @Nested
+ @DisplayName("Single Thread Delays")
+ class SingleThreadDelays {
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.MINUTES)
+ @DisplayName("completes correctly with random delays")
+ void completesCorrectlyWithRandomDelays() {
+ int operationCount = 100;
+ ChaosInjector chaos = ChaosInjector.builder()
+ .delayRange(1, 50)
+ .build();
+
+ AtomicInteger fixApplyCount = new AtomicInteger(0);
+
+ DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+ .addFix(CHAOS_TYPE, createDelayingFix("delay_fix", 1, 2, chaos, fixApplyCount))
+ .build();
+
+ for (int i = 0; i < operationCount; i++) {
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", i);
+ inputObj.addProperty("sequence", i);
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ Dynamic result = fixer.update(
+ CHAOS_TYPE, input,
+ new DataVersion(1), new DataVersion(2)
+ );
+
+ // Verify result correctness despite delay
+ assertThat(result.get("processed").asBoolean().result()).contains(true);
+ assertThat(result.get("id").asInt().result()).contains(i);
+ }
+
+ assertThat(fixApplyCount.get()).isEqualTo(operationCount);
+ LOGGER.info("Completed {} operations with {} delays injected",
+ operationCount, chaos.delayCount());
+ }
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.MINUTES)
+ @DisplayName("handles extreme delay variations in fix chain")
+ void handlesExtremeDelayVariationsInChain() {
+ int chainLength = 5;
+ int operationCount = 50;
+
+ DataFixerBuilder builder = new DataFixerBuilder(new DataVersion(chainLength + 1));
+
+ // Each fix in the chain has different delay characteristics
+ for (int v = 1; v <= chainLength; v++) {
+ final int version = v;
+ // Vary delay range: some instant, some with significant delay
+ ChaosInjector chaos = (v % 2 == 0)
+ ? ChaosInjector.builder().delayRange(0, 100).build()
+ : ChaosInjector.builder().build(); // No delay
+
+ builder.addFix(CHAOS_TYPE, new DataFix() {
+ @Override
+ public @NotNull String name() {
+ return "chain_delay_fix_v" + version;
+ }
+
+ @Override
+ public @NotNull DataVersion fromVersion() {
+ return new DataVersion(version);
+ }
+
+ @Override
+ public @NotNull DataVersion toVersion() {
+ return new DataVersion(version + 1);
+ }
+
+ @Override
+ public @NotNull Dynamic apply(
+ @NotNull TypeReference type,
+ @NotNull Dynamic input,
+ @NotNull DataFixerContext context
+ ) {
+ chaos.maybeDelay();
+ int step = input.get("step").asInt().result().orElse(0);
+ return input.set("step", input.createInt(step + 1));
+ }
+ });
+ }
+
+ DataFixer fixer = builder.build();
+
+ for (int i = 0; i < operationCount; i++) {
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", i);
+ inputObj.addProperty("step", 0);
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ Dynamic result = fixer.update(
+ CHAOS_TYPE, input,
+ new DataVersion(1), new DataVersion(chainLength + 1)
+ );
+
+ // Verify all fixes in chain were applied correctly
+ assertThat(result.get("step").asInt().result())
+ .as("All chain fixes applied for operation " + i)
+ .contains(chainLength);
+ }
+
+ LOGGER.info("Completed {} chain operations with varying delays", operationCount);
+ }
+ }
+
+ @Nested
+ @DisplayName("Concurrent Delays")
+ class ConcurrentDelays {
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.MINUTES)
+ @DisplayName("handles random delays in concurrent execution")
+ void handlesRandomDelaysInConcurrentExecution() throws Exception {
+ int threadCount = 50;
+ int opsPerThread = 100;
+
+ ChaosInjector chaos = ChaosInjector.builder()
+ .delayRange(1, 20)
+ .build();
+
+ AtomicInteger totalOps = new AtomicInteger(0);
+
+ DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+ .addFix(CHAOS_TYPE, createDelayingFix("concurrent_delay_fix", 1, 2, chaos, totalOps))
+ .build();
+
+ try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) {
+ for (int t = 0; t < threadCount; t++) {
+ final int threadId = t;
+ orchestrator.submit(() -> {
+ for (int i = 0; i < opsPerThread; i++) {
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", threadId * 10_000L + i);
+ inputObj.addProperty("threadId", threadId);
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ Dynamic result = fixer.update(
+ CHAOS_TYPE, input,
+ new DataVersion(1), new DataVersion(2)
+ );
+
+ // Verify data integrity
+ assertThat(result.get("processed").asBoolean().result())
+ .contains(true);
+ assertThat(result.get("threadId").asInt().result())
+ .contains(threadId);
+ }
+ });
+ }
+
+ orchestrator.startAll();
+ boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(5));
+
+ assertThat(completed).isTrue();
+ assertThat(orchestrator.errors()).isEmpty();
+ assertThat(totalOps.get()).isEqualTo(threadCount * opsPerThread);
+ }
+
+ LOGGER.info("Completed {} concurrent operations with {} delays",
+ totalOps.get(), chaos.delayCount());
+ }
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.MINUTES)
+ @DisplayName("maintains ordering within each thread despite delays")
+ void maintainsOrderingWithinEachThread() throws Exception {
+ int threadCount = 20;
+ int opsPerThread = 50;
+
+ ChaosInjector chaos = ChaosInjector.builder()
+ .delayRange(1, 30)
+ .build();
+
+ DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+ .addFix(CHAOS_TYPE, new DataFix() {
+ @Override
+ public @NotNull String name() {
+ return "ordering_delay_fix";
+ }
+
+ @Override
+ public @NotNull DataVersion fromVersion() {
+ return new DataVersion(1);
+ }
+
+ @Override
+ public @NotNull DataVersion toVersion() {
+ return new DataVersion(2);
+ }
+
+ @Override
+ public @NotNull Dynamic apply(
+ @NotNull TypeReference type,
+ @NotNull Dynamic input,
+ @NotNull DataFixerContext context
+ ) {
+ chaos.maybeDelay();
+ long timestamp = System.nanoTime();
+ return input
+ .set("processed", input.createBoolean(true))
+ .set("processedAt", input.createLong(timestamp));
+ }
+ })
+ .build();
+
+ try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) {
+ for (int t = 0; t < threadCount; t++) {
+ final int threadId = t;
+ orchestrator.submit(() -> {
+ long lastTimestamp = 0;
+ for (int i = 0; i < opsPerThread; i++) {
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", threadId * 10_000L + i);
+ inputObj.addProperty("sequence", i);
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ Dynamic result = fixer.update(
+ CHAOS_TYPE, input,
+ new DataVersion(1), new DataVersion(2)
+ );
+
+ long currentTimestamp = result.get("processedAt").asLong().result().orElse(0L);
+
+ // Within a single thread, processing should be sequential
+ assertThat(currentTimestamp)
+ .as("Ordering within thread " + threadId)
+ .isGreaterThanOrEqualTo(lastTimestamp);
+
+ lastTimestamp = currentTimestamp;
+ }
+ });
+ }
+
+ orchestrator.startAll();
+ boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(5));
+
+ assertThat(completed).isTrue();
+ assertThat(orchestrator.errors()).isEmpty();
+ }
+
+ LOGGER.info("Verified ordering across {} threads with delays", threadCount);
+ }
+ }
+
+ // Helper methods
+
+ private DataFix createDelayingFix(
+ String name, int from, int to,
+ ChaosInjector chaos, AtomicInteger counter
+ ) {
+ return new DataFix<>() {
+ @Override
+ public @NotNull String name() {
+ return name;
+ }
+
+ @Override
+ public @NotNull DataVersion fromVersion() {
+ return new DataVersion(from);
+ }
+
+ @Override
+ public @NotNull DataVersion toVersion() {
+ return new DataVersion(to);
+ }
+
+ @Override
+ public @NotNull Dynamic apply(
+ @NotNull TypeReference type,
+ @NotNull Dynamic input,
+ @NotNull DataFixerContext context
+ ) {
+ chaos.maybeDelay();
+ counter.incrementAndGet();
+ return input.set("processed", input.createBoolean(true));
+ }
+ };
+ }
+}
diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/RandomFailuresChaosIT.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/RandomFailuresChaosIT.java
new file mode 100644
index 0000000..bf84494
--- /dev/null
+++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/RandomFailuresChaosIT.java
@@ -0,0 +1,423 @@
+/*
+ * Copyright (c) 2025 Splatgames.de Software and Contributors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package de.splatgames.aether.datafixers.functional.chaos;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import de.splatgames.aether.datafixers.api.DataVersion;
+import de.splatgames.aether.datafixers.api.TypeReference;
+import de.splatgames.aether.datafixers.api.dynamic.Dynamic;
+import de.splatgames.aether.datafixers.api.fix.DataFix;
+import de.splatgames.aether.datafixers.api.fix.DataFixer;
+import de.splatgames.aether.datafixers.api.fix.DataFixerContext;
+import de.splatgames.aether.datafixers.codec.json.gson.GsonOps;
+import de.splatgames.aether.datafixers.core.fix.DataFixerBuilder;
+import de.splatgames.aether.datafixers.functional.chaos.util.ChaosInjector;
+import de.splatgames.aether.datafixers.functional.stress.util.ThreadOrchestrator;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Chaos tests with random failures in fix execution.
+ *
+ * Tests that the DataFixer system correctly handles:
+ *
+ * - Sporadic failures during fix execution
+ * - Failure isolation between concurrent operations
+ * - Proper exception propagation
+ *
+ *
+ * Run with: {@code mvn verify -Pit -Dgroups=chaos}
+ */
+@DisplayName("Random Failures Chaos Tests")
+@Tag("chaos")
+@Tag("integration")
+class RandomFailuresChaosIT {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RandomFailuresChaosIT.class);
+ private static final TypeReference CHAOS_TYPE = new TypeReference("chaos_failure_entity");
+
+ @Nested
+ @DisplayName("Single Thread Failures")
+ class SingleThreadFailures {
+
+ @Test
+ @Timeout(value = 2, unit = TimeUnit.MINUTES)
+ @DisplayName("handles sporadic fix failures")
+ void handlesSporadicFixFailures() {
+ int operationCount = 200;
+ double failureProbability = 0.05; // 5% failure rate
+
+ ChaosInjector chaos = ChaosInjector.withFailureProbability(failureProbability);
+ AtomicInteger successCount = new AtomicInteger(0);
+ AtomicInteger failureCount = new AtomicInteger(0);
+
+ DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+ .addFix(CHAOS_TYPE, createFailingFix("sporadic_fix", 1, 2, chaos))
+ .build();
+
+ for (int i = 0; i < operationCount; i++) {
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", i);
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ try {
+ Dynamic result = fixer.update(
+ CHAOS_TYPE, input,
+ new DataVersion(1), new DataVersion(2)
+ );
+
+ assertThat(result.get("processed").asBoolean().result()).contains(true);
+ successCount.incrementAndGet();
+ } catch (ChaosInjector.ChaosException e) {
+ // Expected chaos failure
+ failureCount.incrementAndGet();
+ }
+ }
+
+ LOGGER.info("Operations: {}, Success: {}, Failures: {}",
+ operationCount, successCount.get(), failureCount.get());
+
+ // With 5% failure rate over 200 operations, we expect some failures
+ assertThat(failureCount.get())
+ .as("Some failures should have occurred")
+ .isGreaterThan(0);
+
+ // But not all should fail
+ assertThat(successCount.get())
+ .as("Most operations should succeed")
+ .isGreaterThan(operationCount / 2);
+ }
+
+ @Test
+ @Timeout(value = 2, unit = TimeUnit.MINUTES)
+ @DisplayName("propagates fix exceptions correctly")
+ void propagatesFixExceptionsCorrectly() {
+ ChaosInjector chaos = ChaosInjector.withFailureProbability(1.0); // Always fail
+
+ DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+ .addFix(CHAOS_TYPE, createFailingFix("always_fail_fix", 1, 2, chaos))
+ .build();
+
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", 1);
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ assertThatThrownBy(() -> fixer.update(
+ CHAOS_TYPE, input,
+ new DataVersion(1), new DataVersion(2)
+ ))
+ .isInstanceOf(ChaosInjector.ChaosException.class)
+ .hasMessageContaining("Chaos-injected failure");
+ }
+
+ @Test
+ @Timeout(value = 2, unit = TimeUnit.MINUTES)
+ @DisplayName("failure in chain stops subsequent fixes")
+ void failureInChainStopsSubsequentFixes() {
+ AtomicInteger fix1Count = new AtomicInteger(0);
+ AtomicInteger fix2Count = new AtomicInteger(0);
+ AtomicInteger fix3Count = new AtomicInteger(0);
+
+ ChaosInjector alwaysFail = ChaosInjector.withFailureProbability(1.0);
+ ChaosInjector neverFail = ChaosInjector.withFailureProbability(0.0);
+
+ DataFixer fixer = new DataFixerBuilder(new DataVersion(4))
+ .addFix(CHAOS_TYPE, createCountingFix("chain_fix_1", 1, 2, neverFail, fix1Count))
+ .addFix(CHAOS_TYPE, createCountingFix("chain_fix_2", 2, 3, alwaysFail, fix2Count))
+ .addFix(CHAOS_TYPE, createCountingFix("chain_fix_3", 3, 4, neverFail, fix3Count))
+ .build();
+
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", 1);
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ assertThatThrownBy(() -> fixer.update(
+ CHAOS_TYPE, input,
+ new DataVersion(1), new DataVersion(4)
+ )).isInstanceOf(ChaosInjector.ChaosException.class);
+
+ // Fix 1 should have run
+ assertThat(fix1Count.get()).isEqualTo(1);
+ // Fix 2 should have run and failed
+ assertThat(fix2Count.get()).isEqualTo(1);
+ // Fix 3 should NOT have run due to failure in fix 2
+ assertThat(fix3Count.get()).isEqualTo(0);
+ }
+ }
+
+ @Nested
+ @DisplayName("Concurrent Failure Isolation")
+ class ConcurrentFailureIsolation {
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.MINUTES)
+ @DisplayName("isolates failures between threads")
+ void isolatesFailuresBetweenThreads() throws Exception {
+ int threadCount = 50;
+ int opsPerThread = 100;
+ double failureProbability = 0.1; // 10% failure rate
+
+ ChaosInjector chaos = ChaosInjector.withFailureProbability(failureProbability);
+ AtomicInteger totalSuccess = new AtomicInteger(0);
+ AtomicInteger totalFailure = new AtomicInteger(0);
+ Set threadsWithSuccess = ConcurrentHashMap.newKeySet();
+ Set threadsWithFailure = ConcurrentHashMap.newKeySet();
+
+ DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+ .addFix(CHAOS_TYPE, createFailingFix("isolation_fix", 1, 2, chaos))
+ .build();
+
+ try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) {
+ for (int t = 0; t < threadCount; t++) {
+ final int threadId = t;
+ orchestrator.submit(() -> {
+ for (int i = 0; i < opsPerThread; i++) {
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", threadId * 10_000L + i);
+ inputObj.addProperty("threadId", threadId);
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ try {
+ Dynamic result = fixer.update(
+ CHAOS_TYPE, input,
+ new DataVersion(1), new DataVersion(2)
+ );
+
+ // Verify the thread's data wasn't corrupted by another thread's failure
+ assertThat(result.get("threadId").asInt().result())
+ .contains(threadId);
+
+ totalSuccess.incrementAndGet();
+ threadsWithSuccess.add(threadId);
+ } catch (ChaosInjector.ChaosException e) {
+ totalFailure.incrementAndGet();
+ threadsWithFailure.add(threadId);
+ }
+ }
+ });
+ }
+
+ orchestrator.startAll();
+ boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(5));
+
+ // We expect the orchestrator to complete even with failures
+ // because individual operation failures don't stop the thread
+ assertThat(completed).isTrue();
+ // Orchestrator should not collect ChaosExceptions as errors
+ // since they're caught within the task
+ assertThat(orchestrator.errors()).isEmpty();
+ }
+
+ LOGGER.info("Total operations: {}, Success: {}, Failures: {}",
+ threadCount * opsPerThread, totalSuccess.get(), totalFailure.get());
+ LOGGER.info("Threads with success: {}, Threads with failure: {}",
+ threadsWithSuccess.size(), threadsWithFailure.size());
+
+ // Verify both successes and failures occurred across different threads
+ assertThat(totalSuccess.get())
+ .as("Some operations should succeed")
+ .isGreaterThan(0);
+ assertThat(totalFailure.get())
+ .as("Some operations should fail")
+ .isGreaterThan(0);
+
+ // Most threads should have had both successes and failures
+ // (with 10% failure rate over 100 ops, very likely to see both)
+ assertThat(threadsWithSuccess.size())
+ .as("Multiple threads should have successes")
+ .isGreaterThan(threadCount / 2);
+ }
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.MINUTES)
+ @DisplayName("fixer remains functional after failures")
+ void fixerRemainsFunctionalAfterFailures() throws Exception {
+ int phases = 3;
+ int opsPerPhase = 100;
+
+ // Phase 1: Some failures
+ // Phase 2: No failures
+ // Phase 3: Some failures again
+ // Verify fixer works correctly throughout
+
+ AtomicInteger phase1Success = new AtomicInteger(0);
+ AtomicInteger phase1Failure = new AtomicInteger(0);
+ AtomicInteger phase2Success = new AtomicInteger(0);
+ AtomicInteger phase3Success = new AtomicInteger(0);
+ AtomicInteger phase3Failure = new AtomicInteger(0);
+
+ ChaosInjector chaosWith10Percent = ChaosInjector.withFailureProbability(0.1);
+ ChaosInjector chaosNoFailure = ChaosInjector.withFailureProbability(0.0);
+
+ // Use a single fixer throughout all phases
+ DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+ .addFix(CHAOS_TYPE, createFailingFix("resilience_fix", 1, 2, chaosWith10Percent))
+ .build();
+
+ // Phase 1: With failures
+ for (int i = 0; i < opsPerPhase; i++) {
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", i);
+ inputObj.addProperty("phase", 1);
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ try {
+ fixer.update(CHAOS_TYPE, input, new DataVersion(1), new DataVersion(2));
+ phase1Success.incrementAndGet();
+ } catch (ChaosInjector.ChaosException e) {
+ phase1Failure.incrementAndGet();
+ }
+ }
+
+ // Create a new fixer for phase 2 with no failures to verify isolation
+ DataFixer fixer2 = new DataFixerBuilder(new DataVersion(2))
+ .addFix(CHAOS_TYPE, createFailingFix("stable_fix", 1, 2, chaosNoFailure))
+ .build();
+
+ // Phase 2: No failures - verify fixer still works
+ for (int i = 0; i < opsPerPhase; i++) {
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", i);
+ inputObj.addProperty("phase", 2);
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ Dynamic result = fixer2.update(
+ CHAOS_TYPE, input,
+ new DataVersion(1), new DataVersion(2)
+ );
+
+ assertThat(result.get("processed").asBoolean().result()).contains(true);
+ phase2Success.incrementAndGet();
+ }
+
+ // Phase 3: With failures again using original fixer
+ for (int i = 0; i < opsPerPhase; i++) {
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", i);
+ inputObj.addProperty("phase", 3);
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ try {
+ fixer.update(CHAOS_TYPE, input, new DataVersion(1), new DataVersion(2));
+ phase3Success.incrementAndGet();
+ } catch (ChaosInjector.ChaosException e) {
+ phase3Failure.incrementAndGet();
+ }
+ }
+
+ LOGGER.info("Phase 1: {} success, {} failure", phase1Success.get(), phase1Failure.get());
+ LOGGER.info("Phase 2: {} success (should be 100%)", phase2Success.get());
+ LOGGER.info("Phase 3: {} success, {} failure", phase3Success.get(), phase3Failure.get());
+
+ // Phase 2 should have 100% success
+ assertThat(phase2Success.get()).isEqualTo(opsPerPhase);
+
+ // Phase 1 and 3 should have some failures (10% rate)
+ assertThat(phase1Failure.get()).isGreaterThan(0);
+ assertThat(phase3Failure.get()).isGreaterThan(0);
+ }
+ }
+
+ // Helper methods
+
+ private DataFix createFailingFix(
+ String name, int from, int to,
+ ChaosInjector chaos
+ ) {
+ return new DataFix<>() {
+ @Override
+ public @NotNull String name() {
+ return name;
+ }
+
+ @Override
+ public @NotNull DataVersion fromVersion() {
+ return new DataVersion(from);
+ }
+
+ @Override
+ public @NotNull DataVersion toVersion() {
+ return new DataVersion(to);
+ }
+
+ @Override
+ public @NotNull Dynamic apply(
+ @NotNull TypeReference type,
+ @NotNull Dynamic input,
+ @NotNull DataFixerContext context
+ ) {
+ chaos.maybeFail();
+ return input.set("processed", input.createBoolean(true));
+ }
+ };
+ }
+
+ private DataFix createCountingFix(
+ String name, int from, int to,
+ ChaosInjector chaos, AtomicInteger counter
+ ) {
+ return new DataFix<>() {
+ @Override
+ public @NotNull String name() {
+ return name;
+ }
+
+ @Override
+ public @NotNull DataVersion fromVersion() {
+ return new DataVersion(from);
+ }
+
+ @Override
+ public @NotNull DataVersion toVersion() {
+ return new DataVersion(to);
+ }
+
+ @Override
+ public @NotNull Dynamic apply(
+ @NotNull TypeReference type,
+ @NotNull Dynamic input,
+ @NotNull DataFixerContext context
+ ) {
+ counter.incrementAndGet();
+ chaos.maybeFail();
+ return input.set("processed", input.createBoolean(true));
+ }
+ };
+ }
+}
diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/util/ChaosInjector.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/util/ChaosInjector.java
new file mode 100644
index 0000000..33dbf47
--- /dev/null
+++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/util/ChaosInjector.java
@@ -0,0 +1,250 @@
+/*
+ * Copyright (c) 2025 Splatgames.de Software and Contributors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package de.splatgames.aether.datafixers.functional.chaos.util;
+
+import java.util.SplittableRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Injects chaos into test execution for chaos engineering tests.
+ *
+ * <p>Provides controlled randomization for:
+ * <ul>
+ *   <li>Random delays to simulate variable latency</li>
+ *   <li>Random failures to test error handling</li>
+ * </ul>
+ *
+ * <p>Uses {@link SplittableRandom} for high-performance, thread-local randomness
+ * without atomic contention.
+ *
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * ChaosInjector chaos = ChaosInjector.builder()
+ * .failureProbability(0.05) // 5% failure rate
+ * .delayRange(1, 50) // 1-50ms random delays
+ * .build();
+ *
+ * // In test code:
+ * chaos.maybeDelay();
+ * chaos.maybeFail("Simulated failure");
+ * }</pre>
+ */
+public final class ChaosInjector {
+
+ private final double failureProbability;
+ private final int minDelayMs;
+ private final int maxDelayMs;
+ private final ThreadLocal<SplittableRandom> random;
+ private final AtomicLong delayCount = new AtomicLong();
+ private final AtomicLong failureCount = new AtomicLong();
+
+ private ChaosInjector(Builder builder) {
+ this.failureProbability = builder.failureProbability;
+ this.minDelayMs = builder.minDelayMs;
+ this.maxDelayMs = builder.maxDelayMs;
+ this.random = ThreadLocal.withInitial(SplittableRandom::new);
+ }
+
+ /**
+ * Creates a builder for configuring a ChaosInjector.
+ *
+ * @return a new builder
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Creates a ChaosInjector with only failure probability configured.
+ *
+ * @param probability the probability of failure (0.0 to 1.0)
+ * @return a new ChaosInjector
+ */
+ public static ChaosInjector withFailureProbability(double probability) {
+ return builder().failureProbability(probability).build();
+ }
+
+ /**
+ * Creates a ChaosInjector with only random delays configured.
+ *
+ * @param minMs minimum delay in milliseconds
+ * @param maxMs maximum delay in milliseconds
+ * @return a new ChaosInjector
+ */
+ public static ChaosInjector withRandomDelay(int minMs, int maxMs) {
+ return builder().delayRange(minMs, maxMs).build();
+ }
+
+ /**
+ * Introduces a random delay based on configured parameters.
+ *
+ * <p>Does nothing if no delay range is configured.
+ */
+ public void maybeDelay() {
+ if (maxDelayMs <= 0) {
+ return;
+ }
+
+ int delay = minDelayMs + random.get().nextInt(maxDelayMs - minDelayMs + 1);
+ if (delay > 0) {
+ try {
+ Thread.sleep(delay);
+ delayCount.incrementAndGet();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Throws an exception based on configured failure probability.
+ *
+ * @param message the exception message if failure is triggered
+ * @throws ChaosException if random chance triggers failure
+ */
+ public void maybeFail(String message) {
+ if (failureProbability <= 0) {
+ return;
+ }
+
+ if (random.get().nextDouble() < failureProbability) {
+ failureCount.incrementAndGet();
+ throw new ChaosException(message);
+ }
+ }
+
+ /**
+ * Throws an exception based on configured failure probability.
+ *
+ *
+ * <p>Uses a default message.
+ *
+ * @throws ChaosException if random chance triggers failure
+ */
+ public void maybeFail() {
+ maybeFail("Chaos-injected failure");
+ }
+
+ /**
+ * Returns true based on configured failure probability.
+ *
+ *
+ * <p>Useful for conditional logic instead of exceptions.
+ *
+ * @return true if failure should be simulated
+ */
+ public boolean shouldFail() {
+ if (failureProbability <= 0) {
+ return false;
+ }
+ return random.get().nextDouble() < failureProbability;
+ }
+
+ /**
+ * Returns the total number of delays that were injected.
+ *
+ * @return delay count
+ */
+ public long delayCount() {
+ return delayCount.get();
+ }
+
+ /**
+ * Returns the total number of failures that were injected.
+ *
+ * @return failure count
+ */
+ public long failureCount() {
+ return failureCount.get();
+ }
+
+ /**
+ * Resets the delay and failure counters.
+ */
+ public void resetCounters() {
+ delayCount.set(0);
+ failureCount.set(0);
+ }
+
+ /**
+ * Builder for ChaosInjector.
+ */
+ public static final class Builder {
+ private double failureProbability = 0.0;
+ private int minDelayMs = 0;
+ private int maxDelayMs = 0;
+
+ private Builder() {
+ }
+
+ /**
+ * Sets the failure probability.
+ *
+ * @param probability value between 0.0 (never fail) and 1.0 (always fail)
+ * @return this builder
+ */
+ public Builder failureProbability(double probability) {
+ if (probability < 0.0 || probability > 1.0) {
+ throw new IllegalArgumentException(
+ "Probability must be between 0.0 and 1.0: " + probability
+ );
+ }
+ this.failureProbability = probability;
+ return this;
+ }
+
+ /**
+ * Sets the random delay range.
+ *
+ * @param minMs minimum delay in milliseconds (inclusive)
+ * @param maxMs maximum delay in milliseconds (inclusive)
+ * @return this builder
+ */
+ public Builder delayRange(int minMs, int maxMs) {
+ if (minMs < 0 || maxMs < minMs) {
+ throw new IllegalArgumentException(
+ "Invalid delay range: " + minMs + " - " + maxMs
+ );
+ }
+ this.minDelayMs = minMs;
+ this.maxDelayMs = maxMs;
+ return this;
+ }
+
+ /**
+ * Builds the ChaosInjector.
+ *
+ * @return a new ChaosInjector
+ */
+ public ChaosInjector build() {
+ return new ChaosInjector(this);
+ }
+ }
+
+ /**
+ * Exception thrown by chaos injection.
+ */
+ public static final class ChaosException extends RuntimeException {
+ public ChaosException(String message) {
+ super(message);
+ }
+ }
+}
diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/util/FailingDataFix.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/util/FailingDataFix.java
new file mode 100644
index 0000000..192f4ba
--- /dev/null
+++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/util/FailingDataFix.java
@@ -0,0 +1,217 @@
+/*
+ * Copyright (c) 2025 Splatgames.de Software and Contributors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package de.splatgames.aether.datafixers.functional.chaos.util;
+
+import de.splatgames.aether.datafixers.api.DataVersion;
+import de.splatgames.aether.datafixers.api.TypeReference;
+import de.splatgames.aether.datafixers.api.dynamic.Dynamic;
+import de.splatgames.aether.datafixers.api.fix.DataFix;
+import de.splatgames.aether.datafixers.api.fix.DataFixerContext;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A DataFix that can be configured to fail randomly for chaos testing.
+ *
+ *
+ * <p>This fix wraps the chaos injection logic and can be used to test
+ * error handling in the data fixer pipeline.
+ *
+ * @param <T> the element type
+ */
+public final class FailingDataFix<T> implements DataFix<T> {
+
+ private final String name;
+ private final DataVersion fromVersion;
+ private final DataVersion toVersion;
+ private final ChaosInjector chaosInjector;
+ private final AtomicInteger applyCount = new AtomicInteger(0);
+ private final AtomicInteger failCount = new AtomicInteger(0);
+
+ private FailingDataFix(Builder<T> builder) {
+ this.name = builder.name;
+ this.fromVersion = builder.fromVersion;
+ this.toVersion = builder.toVersion;
+ this.chaosInjector = builder.chaosInjector;
+ }
+
+ /**
+ * Creates a builder for a FailingDataFix.
+ *
+ * @param <T> the element type
+ * @return a new builder
+ */
+ public static <T> Builder<T> builder() {
+ return new Builder<>();
+ }
+
+ @Override
+ public @NotNull String name() {
+ return name;
+ }
+
+ @Override
+ public @NotNull DataVersion fromVersion() {
+ return fromVersion;
+ }
+
+ @Override
+ public @NotNull DataVersion toVersion() {
+ return toVersion;
+ }
+
+ @Override
+ public @NotNull Dynamic<T> apply(
+ @NotNull TypeReference type,
+ @NotNull Dynamic<T> input,
+ @NotNull DataFixerContext context
+ ) {
+ applyCount.incrementAndGet();
+
+ // Inject delay if configured
+ chaosInjector.maybeDelay();
+
+ // Inject failure if configured
+ if (chaosInjector.shouldFail()) {
+ failCount.incrementAndGet();
+ throw new ChaosInjector.ChaosException(
+ "Chaos failure in fix '" + name + "' at version " + fromVersion + " -> " + toVersion
+ );
+ }
+
+ // Mark as processed and return
+ return input.set("chaos_processed", input.createBoolean(true));
+ }
+
+ /**
+ * Returns the number of times this fix has been applied.
+ *
+ * @return apply count
+ */
+ public int applyCount() {
+ return applyCount.get();
+ }
+
+ /**
+ * Returns the number of times this fix has triggered a failure.
+ *
+ * @return fail count
+ */
+ public int failCount() {
+ return failCount.get();
+ }
+
+ /**
+ * Resets the apply and fail counters.
+ */
+ public void resetCounters() {
+ applyCount.set(0);
+ failCount.set(0);
+ }
+
+ /**
+ * Builder for FailingDataFix.
+ *
+ * @param <T> the element type
+ */
+ public static final class Builder<T> {
+ private String name = "chaos_fix";
+ private DataVersion fromVersion = new DataVersion(1);
+ private DataVersion toVersion = new DataVersion(2);
+ private ChaosInjector chaosInjector = ChaosInjector.builder().build();
+
+ private Builder() {
+ }
+
+ /**
+ * Sets the fix name.
+ *
+ * @param name the fix name
+ * @return this builder
+ */
+ public Builder<T> name(String name) {
+ this.name = name;
+ return this;
+ }
+
+ /**
+ * Sets the version range.
+ *
+ * @param from the source version
+ * @param to the target version
+ * @return this builder
+ */
+ public Builder<T> versions(int from, int to) {
+ this.fromVersion = new DataVersion(from);
+ this.toVersion = new DataVersion(to);
+ return this;
+ }
+
+ /**
+ * Sets the chaos injector to use.
+ *
+ * @param chaosInjector the chaos injector
+ * @return this builder
+ */
+ public Builder<T> chaosInjector(ChaosInjector chaosInjector) {
+ this.chaosInjector = chaosInjector;
+ return this;
+ }
+
+ /**
+ * Configures the fix to fail with the given probability.
+ *
+ * @param probability failure probability (0.0 to 1.0)
+ * @return this builder
+ */
+ public Builder<T> failureProbability(double probability) {
+ this.chaosInjector = ChaosInjector.withFailureProbability(probability);
+ return this;
+ }
+
+ /**
+ * Configures the fix to introduce random delays.
+ *
+ * <p>Note: This replaces any existing chaos configuration.
+ * To combine delays with failures, use {@link #chaosInjector(ChaosInjector)}
+ * with a fully configured injector.
+ *
+ * @param minMs minimum delay in milliseconds
+ * @param maxMs maximum delay in milliseconds
+ * @return this builder
+ */
+ public Builder<T> randomDelay(int minMs, int maxMs) {
+ this.chaosInjector = ChaosInjector.withRandomDelay(minMs, maxMs);
+ return this;
+ }
+
+ /**
+ * Builds the FailingDataFix.
+ *
+ * @return a new FailingDataFix
+ */
+ public FailingDataFix<T> build() {
+ return new FailingDataFix<>(this);
+ }
+ }
+}
diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/HighConcurrencyMigrationStressIT.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/HighConcurrencyMigrationStressIT.java
new file mode 100644
index 0000000..4dd0ff3
--- /dev/null
+++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/HighConcurrencyMigrationStressIT.java
@@ -0,0 +1,469 @@
+/*
+ * Copyright (c) 2025 Splatgames.de Software and Contributors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package de.splatgames.aether.datafixers.functional.stress;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import de.splatgames.aether.datafixers.api.DataVersion;
+import de.splatgames.aether.datafixers.api.TypeReference;
+import de.splatgames.aether.datafixers.api.dynamic.Dynamic;
+import de.splatgames.aether.datafixers.api.fix.DataFix;
+import de.splatgames.aether.datafixers.api.fix.DataFixer;
+import de.splatgames.aether.datafixers.api.fix.DataFixerContext;
+import de.splatgames.aether.datafixers.codec.json.gson.GsonOps;
+import de.splatgames.aether.datafixers.core.fix.DataFixerBuilder;
+import de.splatgames.aether.datafixers.functional.stress.util.StressTestConfig;
+import de.splatgames.aether.datafixers.functional.stress.util.StressTestMetrics;
+import de.splatgames.aether.datafixers.functional.stress.util.ThreadOrchestrator;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Stress tests for high-concurrency migration scenarios.
+ *
+ * <p>Tests DataFixer behavior with 100+ concurrent threads performing
+ * simultaneous migrations. Verifies:
+ * <ul>
+ *   <li>Thread safety under high contention</li>
+ *   <li>Data integrity across concurrent operations</li>
+ *   <li>Throughput scaling with thread count</li>
+ * </ul>
+ *
+ * <p>Configuration via system properties:
+ * <ul>
+ *   <li>{@code stress.threads} - Number of concurrent threads (default: 100)</li>
+ *   <li>{@code stress.operations.per.thread} - Operations per thread (default: 1000)</li>
+ * </ul>
+ *
+ * <p>Run with: {@code mvn verify -Pstress -Dgroups=stress}
+ */
+@DisplayName("High Concurrency Migration Stress Tests")
+@Tag("stress")
+@Tag("integration")
+class HighConcurrencyMigrationStressIT {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(HighConcurrencyMigrationStressIT.class);
+ private static final TypeReference STRESS_TYPE = new TypeReference("stress_entity");
+
+ @Nested
+ @DisplayName("Concurrent Single Fix Migrations")
+ class ConcurrentSingleFix {
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.MINUTES)
+ @DisplayName("handles 100+ concurrent single-fix migrations")
+ void handles100ConcurrentSingleFixMigrations() throws Exception {
+ int threadCount = StressTestConfig.threadCount();
+ int opsPerThread = StressTestConfig.operationsPerThread();
+
+ LOGGER.info("Starting stress test with {} threads, {} ops/thread",
+ threadCount, opsPerThread);
+
+ AtomicInteger fixApplyCount = new AtomicInteger(0);
+ StressTestMetrics metrics = new StressTestMetrics();
+
+ DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+ .addFix(STRESS_TYPE, createCountingFix("single_fix", 1, 2, fixApplyCount))
+ .build();
+
+ try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) {
+ for (int t = 0; t < threadCount; t++) {
+ final int threadId = t;
+ orchestrator.submit(() -> {
+ for (int i = 0; i < opsPerThread; i++) {
+ long startNanos = System.nanoTime();
+ boolean success = true;
+
+ try {
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", threadId * 100_000L + i);
+ inputObj.addProperty("threadId", threadId);
+ Dynamic<JsonElement> input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ Dynamic<JsonElement> result = fixer.update(
+ STRESS_TYPE, input,
+ new DataVersion(1), new DataVersion(2)
+ );
+
+ // Verify result
+ assertThat(result.get("processed").asBoolean().result())
+ .contains(true);
+ assertThat(result.get("id").asLong().result())
+ .contains(threadId * 100_000L + i);
+ } catch (Exception e) {
+ success = false;
+ throw e;
+ } finally {
+ metrics.recordOperation(System.nanoTime() - startNanos, success);
+ }
+ }
+ });
+ }
+
+ Instant startTime = Instant.now();
+ orchestrator.startAll();
+ boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(5));
+
+ Duration elapsed = Duration.between(startTime, Instant.now());
+ metrics.report("handles100ConcurrentSingleFixMigrations", elapsed);
+
+ assertThat(completed).as("All threads completed within timeout").isTrue();
+ assertThat(orchestrator.errors()).as("No exceptions thrown").isEmpty();
+ assertThat(fixApplyCount.get())
+ .as("All fixes applied")
+ .isEqualTo(threadCount * opsPerThread);
+ }
+ }
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.MINUTES)
+ @DisplayName("maintains data integrity under high contention")
+ void maintainsDataIntegrityUnderHighContention() throws Exception {
+ int threadCount = StressTestConfig.threadCount();
+ int opsPerThread = StressTestConfig.operationsPerThread();
+
+ // Track all processed IDs to verify no duplicates or losses
+ Set<Long> processedIds = ConcurrentHashMap.newKeySet();
+
+ DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+ .addFix(STRESS_TYPE, createSimpleFix("integrity_fix", 1, 2))
+ .build();
+
+ try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) {
+ for (int t = 0; t < threadCount; t++) {
+ final int threadId = t;
+ orchestrator.submit(() -> {
+ for (int i = 0; i < opsPerThread; i++) {
+ long uniqueId = threadId * 100_000L + i;
+
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", uniqueId);
+ Dynamic<JsonElement> input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ Dynamic<JsonElement> result = fixer.update(
+ STRESS_TYPE, input,
+ new DataVersion(1), new DataVersion(2)
+ );
+
+ // Verify the ID wasn't corrupted
+ Long resultId = result.get("id").asLong().result().orElse(null);
+ assertThat(resultId).isEqualTo(uniqueId);
+
+ // Track that we processed this ID
+ processedIds.add(uniqueId);
+ }
+ });
+ }
+
+ orchestrator.startAll();
+ boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(5));
+
+ assertThat(completed).isTrue();
+ assertThat(orchestrator.errors()).isEmpty();
+
+ // Verify all expected IDs were processed exactly once
+ assertThat(processedIds)
+ .as("All unique IDs processed")
+ .hasSize(threadCount * opsPerThread);
+ }
+ }
+ }
+
+ @Nested
+ @DisplayName("Concurrent Chain Migrations")
+ class ConcurrentChainMigration {
+
+ @Test
+ @Timeout(value = 10, unit = TimeUnit.MINUTES)
+ @DisplayName("handles concurrent 10-fix chain migrations")
+ void handlesConcurrentChainMigrations() throws Exception {
+ int threadCount = Math.min(StressTestConfig.threadCount(), 50); // Reduce for chain tests
+ int opsPerThread = StressTestConfig.operationsPerThread() / 10; // Reduce ops for longer chains
+
+ LOGGER.info("Starting chain migration test with {} threads, {} ops/thread, 10-fix chain",
+ threadCount, opsPerThread);
+
+ AtomicInteger totalFixApplications = new AtomicInteger(0);
+ StressTestMetrics metrics = new StressTestMetrics();
+
+ // Build a fixer with 10 chained fixes
+ DataFixerBuilder builder = new DataFixerBuilder(new DataVersion(11));
+ for (int v = 1; v <= 10; v++) {
+ final int version = v;
+ builder.addFix(STRESS_TYPE, new DataFix<JsonElement>() {
+ @Override
+ public @NotNull String name() {
+ return "chain_fix_v" + version;
+ }
+
+ @Override
+ public @NotNull DataVersion fromVersion() {
+ return new DataVersion(version);
+ }
+
+ @Override
+ public @NotNull DataVersion toVersion() {
+ return new DataVersion(version + 1);
+ }
+
+ @Override
+ public @NotNull Dynamic<JsonElement> apply(
+ @NotNull TypeReference type,
+ @NotNull Dynamic<JsonElement> input,
+ @NotNull DataFixerContext context
+ ) {
+ totalFixApplications.incrementAndGet();
+ int currentStep = input.get("migration_steps").asInt().result().orElse(0);
+ return input.set("migration_steps", input.createInt(currentStep + 1));
+ }
+ });
+ }
+ DataFixer fixer = builder.build();
+
+ try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) {
+ for (int t = 0; t < threadCount; t++) {
+ final int threadId = t;
+ orchestrator.submit(() -> {
+ for (int i = 0; i < opsPerThread; i++) {
+ long startNanos = System.nanoTime();
+ boolean success = true;
+
+ try {
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", threadId * 100_000L + i);
+ inputObj.addProperty("migration_steps", 0);
+ Dynamic<JsonElement> input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ Dynamic<JsonElement> result = fixer.update(
+ STRESS_TYPE, input,
+ new DataVersion(1), new DataVersion(11)
+ );
+
+ // Verify all 10 fixes were applied
+ assertThat(result.get("migration_steps").asInt().result())
+ .contains(10);
+ } catch (Exception e) {
+ success = false;
+ throw e;
+ } finally {
+ metrics.recordOperation(System.nanoTime() - startNanos, success);
+ }
+ }
+ });
+ }
+
+ Instant startTime = Instant.now();
+ orchestrator.startAll();
+ boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(10));
+
+ Duration elapsed = Duration.between(startTime, Instant.now());
+ metrics.report("handlesConcurrentChainMigrations", elapsed);
+
+ assertThat(completed).isTrue();
+ assertThat(orchestrator.errors()).isEmpty();
+
+ // Each operation applies 10 fixes
+ assertThat(totalFixApplications.get())
+ .isEqualTo(threadCount * opsPerThread * 10);
+ }
+ }
+
+ @Test
+ @Timeout(value = 10, unit = TimeUnit.MINUTES)
+ @DisplayName("handles mixed version range migrations concurrently")
+ void handlesMixedVersionRanges() throws Exception {
+ int threadCount = StressTestConfig.threadCount();
+ int opsPerThread = StressTestConfig.operationsPerThread() / 5;
+
+ // Build fixer with 5 fixes
+ DataFixerBuilder builder = new DataFixerBuilder(new DataVersion(6));
+ for (int v = 1; v <= 5; v++) {
+ builder.addFix(STRESS_TYPE, createSimpleFix("mix_fix_v" + v, v, v + 1));
+ }
+ DataFixer fixer = builder.build();
+
+ try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) {
+ for (int t = 0; t < threadCount; t++) {
+ final int threadId = t;
+ orchestrator.submit(() -> {
+ for (int i = 0; i < opsPerThread; i++) {
+ // Different threads use different version ranges
+ int fromVersion = (threadId % 5) + 1;
+ int toVersion = 6;
+
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", threadId * 100_000L + i);
+ inputObj.addProperty("start_version", fromVersion);
+ Dynamic<JsonElement> input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ Dynamic<JsonElement> result = fixer.update(
+ STRESS_TYPE, input,
+ new DataVersion(fromVersion), new DataVersion(toVersion)
+ );
+
+ // Verify the data is still intact
+ assertThat(result.get("id").asLong().result())
+ .contains(threadId * 100_000L + i);
+ }
+ });
+ }
+
+ orchestrator.startAll();
+ boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(10));
+
+ assertThat(completed).isTrue();
+ assertThat(orchestrator.errors()).isEmpty();
+ }
+ }
+ }
+
+ @Nested
+ @DisplayName("Throughput Scaling")
+ class ScalingAnalysis {
+
+ @ParameterizedTest(name = "threads={0}")
+ @ValueSource(ints = {10, 25, 50, 100})
+ @Timeout(value = 5, unit = TimeUnit.MINUTES)
+ @DisplayName("throughput scales with thread count")
+ void throughputScalesWithThreadCount(int threadCount) throws Exception {
+ int opsPerThread = 1000;
+ StressTestMetrics metrics = new StressTestMetrics();
+
+ DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+ .addFix(STRESS_TYPE, createSimpleFix("scale_fix", 1, 2))
+ .build();
+
+ try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) {
+ for (int t = 0; t < threadCount; t++) {
+ final int threadId = t;
+ orchestrator.submit(() -> {
+ for (int i = 0; i < opsPerThread; i++) {
+ long startNanos = System.nanoTime();
+
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", threadId * 100_000L + i);
+ Dynamic<JsonElement> input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ fixer.update(
+ STRESS_TYPE, input,
+ new DataVersion(1), new DataVersion(2)
+ );
+
+ metrics.recordOperation(System.nanoTime() - startNanos, true);
+ }
+ });
+ }
+
+ Instant startTime = Instant.now();
+ orchestrator.startAll();
+ boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(5));
+ Duration elapsed = Duration.between(startTime, Instant.now());
+
+ LOGGER.info("Threads: {}, Throughput: {} ops/sec, Avg latency: {} us",
+ threadCount,
+ String.format("%.2f", metrics.throughputPerSecond(elapsed)),
+ metrics.averageLatencyNanos() / 1000);
+
+ assertThat(completed).isTrue();
+ assertThat(orchestrator.errors()).isEmpty();
+ assertThat(metrics.totalOperations()).isEqualTo((long) threadCount * opsPerThread);
+ }
+ }
+ }
+
+ // Helper methods
+
+ private DataFix<JsonElement> createSimpleFix(String name, int from, int to) { // builds a minimal fix that only marks payloads as processed
+ return new DataFix<>() {
+ @Override
+ public @NotNull String name() {
+ return name;
+ }
+
+ @Override
+ public @NotNull DataVersion fromVersion() {
+ return new DataVersion(from);
+ }
+
+ @Override
+ public @NotNull DataVersion toVersion() {
+ return new DataVersion(to);
+ }
+
+ @Override
+ public @NotNull Dynamic<JsonElement> apply(
+ @NotNull TypeReference type,
+ @NotNull Dynamic<JsonElement> input,
+ @NotNull DataFixerContext context
+ ) {
+ return input.set("processed", input.createBoolean(true));
+ }
+ };
+ }
+
+ private DataFix<JsonElement> createCountingFix(String name, int from, int to, AtomicInteger counter) { // like createSimpleFix, but counts every application
+ return new DataFix<>() {
+ @Override
+ public @NotNull String name() {
+ return name;
+ }
+
+ @Override
+ public @NotNull DataVersion fromVersion() {
+ return new DataVersion(from);
+ }
+
+ @Override
+ public @NotNull DataVersion toVersion() {
+ return new DataVersion(to);
+ }
+
+ @Override
+ public @NotNull Dynamic<JsonElement> apply(
+ @NotNull TypeReference type,
+ @NotNull Dynamic<JsonElement> input,
+ @NotNull DataFixerContext context
+ ) {
+ counter.incrementAndGet();
+ return input.set("processed", input.createBoolean(true));
+ }
+ };
+ }
+}
diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/MemoryPressureStressIT.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/MemoryPressureStressIT.java
new file mode 100644
index 0000000..759bc7c
--- /dev/null
+++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/MemoryPressureStressIT.java
@@ -0,0 +1,387 @@
+/*
+ * Copyright (c) 2025 Splatgames.de Software and Contributors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package de.splatgames.aether.datafixers.functional.stress;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import de.splatgames.aether.datafixers.api.DataVersion;
+import de.splatgames.aether.datafixers.api.TypeReference;
+import de.splatgames.aether.datafixers.api.dynamic.Dynamic;
+import de.splatgames.aether.datafixers.api.fix.DataFix;
+import de.splatgames.aether.datafixers.api.fix.DataFixer;
+import de.splatgames.aether.datafixers.api.fix.DataFixerContext;
+import de.splatgames.aether.datafixers.codec.json.gson.GsonOps;
+import de.splatgames.aether.datafixers.core.fix.DataFixerBuilder;
+import de.splatgames.aether.datafixers.functional.stress.util.StressTestConfig;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests memory behavior under stress.
+ *
+ * <p>Verifies:</p>
+ * <ul>
+ * <li>No memory leaks in repeated migration cycles</li>
+ * <li>Stable memory usage under sustained load</li>
+ * <li>Graceful handling of GC pressure</li>
+ * </ul>
+ *
+ * Run with: {@code mvn verify -Pstress -Dgroups=stress}
+ */
+@DisplayName("Memory Pressure Stress Tests")
+@Tag("stress")
+@Tag("integration")
+class MemoryPressureStressIT {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MemoryPressureStressIT.class);
+ private static final TypeReference MEMORY_TYPE = new TypeReference("memory_entity");
+ private static final MemoryMXBean MEMORY_BEAN = ManagementFactory.getMemoryMXBean();
+
+ // Detects unbounded heap growth across repeated migration workloads by
+ // sampling heap usage after forced GC (see forceGc(); System.gc() is only
+ // advisory, so these are best-effort measurements).
+ @Nested
+ @DisplayName("Memory Leak Detection")
+ class MemoryLeakDetection {
+
+ @Test
+ @Timeout(value = 10, unit = TimeUnit.MINUTES)
+ @DisplayName("no memory leaks in repeated migration cycles")
+ void noMemoryLeaksInRepeatedMigrationCycles() throws Exception {
+ int cycles = 10;
+ int operationsPerCycle = 10_000;
+
+ LOGGER.info("Starting memory leak detection: {} cycles, {} ops/cycle",
+ cycles, operationsPerCycle);
+
+ // NOTE(review): raw "List" — the generic parameter (presumably
+ // List<Long>, given Collections.min/max below) looks stripped from
+ // this patch; restore from VCS.
+ List heapUsageAfterGc = new ArrayList<>();
+
+ for (int cycle = 0; cycle < cycles; cycle++) {
+ DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+ .addFix(MEMORY_TYPE, createSimpleFix("memory_fix_" + cycle, 1, 2))
+ .build();
+
+ // Perform many migrations
+ for (int i = 0; i < operationsPerCycle; i++) {
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", cycle * 1_000_000L + i);
+ inputObj.addProperty("data", "test_data_" + i);
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ fixer.update(
+ MEMORY_TYPE, input,
+ new DataVersion(1), new DataVersion(2)
+ );
+ }
+
+ // Force GC and measure heap
+ forceGc();
+ MemoryUsage heapUsage = MEMORY_BEAN.getHeapMemoryUsage();
+ heapUsageAfterGc.add(heapUsage.getUsed());
+
+ LOGGER.info("Cycle {}: Heap after GC = {} MB",
+ cycle + 1, heapUsage.getUsed() / (1024 * 1024));
+ }
+
+ // Analyze heap growth trend
+ // First few cycles may show growth due to JIT compilation and class loading
+ // After warm-up (cycles 3+), heap should be relatively stable
+ // NOTE(review): subList(3, ...) assumes cycles > 3; add a guard if
+ // "cycles" is ever made configurable.
+ List stablePhase = heapUsageAfterGc.subList(3, heapUsageAfterGc.size());
+
+ long minHeap = Collections.min(stablePhase);
+ long maxHeap = Collections.max(stablePhase);
+ double growthRatio = (double) maxHeap / minHeap;
+
+ LOGGER.info("Stable phase heap range: {} MB - {} MB (ratio: {})",
+ minHeap / (1024 * 1024),
+ maxHeap / (1024 * 1024),
+ String.format("%.2f", growthRatio));
+
+ // Allow up to 50% heap variation during stable phase
+ // This accounts for GC timing variations
+ assertThat(growthRatio)
+ .as("Heap growth ratio should be stable (no unbounded leak)")
+ .isLessThan(1.5);
+ }
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.MINUTES)
+ @DisplayName("fixer instances can be garbage collected")
+ void fixerInstancesCanBeGarbageCollected() throws Exception {
+ int fixerCount = 100;
+ int opsPerFixer = 1000;
+
+ LOGGER.info("Creating and discarding {} fixers", fixerCount);
+
+ long heapBefore = getHeapUsedAfterGc();
+
+ for (int f = 0; f < fixerCount; f++) {
+ DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+ .addFix(MEMORY_TYPE, createSimpleFix("disposable_fix_" + f, 1, 2))
+ .build();
+
+ for (int i = 0; i < opsPerFixer; i++) {
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", i);
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ fixer.update(
+ MEMORY_TYPE, input,
+ new DataVersion(1), new DataVersion(2)
+ );
+ }
+ // fixer goes out of scope and should be eligible for GC
+ }
+
+ long heapAfter = getHeapUsedAfterGc();
+
+ LOGGER.info("Heap before: {} MB, after: {} MB",
+ heapBefore / (1024 * 1024),
+ heapAfter / (1024 * 1024));
+
+ // Heap should not have grown significantly
+ // Allow some growth for class metadata and JIT artifacts
+ long heapGrowth = heapAfter - heapBefore;
+ long maxAllowedGrowth = 50 * 1024 * 1024; // 50 MB
+
+ assertThat(heapGrowth)
+ .as("Heap growth after disposing fixers")
+ .isLessThan(maxAllowedGrowth);
+ }
+ }
+
+ // Exercises the fixer under heavy allocation pressure and explicit GC calls,
+ // asserting both liveness (all operations complete) and data integrity.
+ @Nested
+ @DisplayName("GC Pressure Handling")
+ class GcPressureHandling {
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.MINUTES)
+ @DisplayName("handles GC pressure gracefully")
+ void handlesGcPressureGracefully() throws Exception {
+ int threadCount = 20;
+ int opsPerThread = 10_000;
+
+ LOGGER.info("Starting GC pressure test: {} threads, {} ops/thread",
+ threadCount, opsPerThread);
+
+ AtomicLong totalOps = new AtomicLong(0);
+ // NOTE(review): raw "List" — generic parameter (presumably
+ // List<Exception>) looks stripped from this patch; restore from VCS.
+ List errors = Collections.synchronizedList(new ArrayList<>());
+
+ DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+ .addFix(MEMORY_TYPE, createLargeObjectFix("gc_pressure_fix", 1, 2))
+ .build();
+
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ CountDownLatch latch = new CountDownLatch(threadCount);
+
+ for (int t = 0; t < threadCount; t++) {
+ final int threadId = t;
+ executor.submit(() -> {
+ try {
+ for (int i = 0; i < opsPerThread; i++) {
+ // Create large input to generate allocation pressure
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", threadId * 1_000_000L + i);
+ inputObj.addProperty("largeField", createLargeString(1024));
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ Dynamic result = fixer.update(
+ MEMORY_TYPE, input,
+ new DataVersion(1), new DataVersion(2)
+ );
+
+ // NOTE(review): a failing assertThat here throws
+ // AssertionError (an Error), which is NOT caught by the
+ // catch (Exception) below — it only releases the latch via
+ // finally. The shortfall would still surface through the
+ // totalOps equality check, but consider catching Throwable.
+ assertThat(result.get("processed").asBoolean().result())
+ .contains(true);
+
+ totalOps.incrementAndGet();
+ }
+ } catch (Exception e) {
+ errors.add(e);
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ boolean completed = latch.await(5, TimeUnit.MINUTES);
+ executor.shutdown();
+
+ assertThat(completed).isTrue();
+ assertThat(errors).isEmpty();
+ assertThat(totalOps.get())
+ .isEqualTo((long) threadCount * opsPerThread);
+
+ LOGGER.info("Completed {} operations under GC pressure", totalOps.get());
+ }
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.MINUTES)
+ @DisplayName("maintains correctness during GC pauses")
+ void maintainsCorrectnessDuringGcPauses() throws Exception {
+ int iterations = 100;
+ int opsPerIteration = 1000;
+
+ LOGGER.info("Testing correctness across {} iterations with GC", iterations);
+
+ DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+ .addFix(MEMORY_TYPE, createSimpleFix("gc_correctness_fix", 1, 2))
+ .build();
+
+ for (int iter = 0; iter < iterations; iter++) {
+ // Force GC at various points to trigger potential issues
+ if (iter % 10 == 0) {
+ System.gc();
+ }
+
+ for (int i = 0; i < opsPerIteration; i++) {
+ long uniqueId = iter * 1_000_000L + i;
+
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", uniqueId);
+ // Derived value lets the test detect any corruption of either field.
+ inputObj.addProperty("checksum", uniqueId * 31);
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ Dynamic result = fixer.update(
+ MEMORY_TYPE, input,
+ new DataVersion(1), new DataVersion(2)
+ );
+
+ // Verify data integrity
+ Long resultId = result.get("id").asLong().result().orElse(null);
+ Long resultChecksum = result.get("checksum").asLong().result().orElse(null);
+
+ assertThat(resultId).isEqualTo(uniqueId);
+ assertThat(resultChecksum).isEqualTo(uniqueId * 31);
+ }
+ }
+
+ LOGGER.info("Completed correctness test with {} total operations",
+ iterations * opsPerIteration);
+ }
+ }
+
+ // Helper methods
+
+ // Best-effort attempt to settle the heap before measurement: two System.gc()
+ // requests separated by short sleeps. System.gc() is only a hint to the JVM,
+ // so callers must treat subsequent heap readings as approximate.
+ private void forceGc() {
+ System.gc();
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // Preserve the interrupt flag for the caller; don't abort the test here.
+ Thread.currentThread().interrupt();
+ }
+ System.gc();
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ // Returns the used heap size (bytes) after a best-effort GC; see forceGc()
+ // for why the value is approximate.
+ private long getHeapUsedAfterGc() {
+ forceGc();
+ return MEMORY_BEAN.getHeapMemoryUsage().getUsed();
+ }
+
+ // Builds a deterministic string of the given length cycling through 'a'-'z';
+ // used purely to generate allocation pressure in GC tests.
+ private String createLargeString(int size) {
+ StringBuilder sb = new StringBuilder(size);
+ for (int i = 0; i < size; i++) {
+ sb.append((char) ('a' + (i % 26)));
+ }
+ return sb.toString();
+ }
+
+ // Minimal anonymous DataFix that stamps processed=true onto the input; the
+ // cheap baseline fix for this class's stress scenarios.
+ // NOTE(review): generic type parameters appear stripped from this patch (raw
+ // "DataFix"/"Dynamic" alongside "new DataFix<>()"); restore from VCS.
+ private DataFix createSimpleFix(String name, int from, int to) {
+ return new DataFix<>() {
+ @Override
+ public @NotNull String name() {
+ return name;
+ }
+
+ @Override
+ public @NotNull DataVersion fromVersion() {
+ return new DataVersion(from);
+ }
+
+ @Override
+ public @NotNull DataVersion toVersion() {
+ return new DataVersion(to);
+ }
+
+ @Override
+ public @NotNull Dynamic apply(
+ @NotNull TypeReference type,
+ @NotNull Dynamic input,
+ @NotNull DataFixerContext context
+ ) {
+ // Only mutation: mark the record as migrated.
+ return input.set("processed", input.createBoolean(true));
+ }
+ };
+ }
+
+ // DataFix variant that deliberately allocates intermediate strings per apply()
+ // to amplify GC pressure; also writes a truncated processedField for sanity.
+ // NOTE(review): generic type parameters appear stripped from this patch (raw
+ // "DataFix"/"Dynamic"); restore from VCS.
+ private DataFix createLargeObjectFix(String name, int from, int to) {
+ return new DataFix<>() {
+ @Override
+ public @NotNull String name() {
+ return name;
+ }
+
+ @Override
+ public @NotNull DataVersion fromVersion() {
+ return new DataVersion(from);
+ }
+
+ @Override
+ public @NotNull DataVersion toVersion() {
+ return new DataVersion(to);
+ }
+
+ @Override
+ public @NotNull Dynamic apply(
+ @NotNull TypeReference type,
+ @NotNull Dynamic input,
+ @NotNull DataFixerContext context
+ ) {
+ // Create some intermediate objects to increase allocation pressure
+ String largeField = input.get("largeField").asString().result().orElse("");
+ // NOTE(review): toUpperCase() is default-locale sensitive; harmless
+ // for the a-z input produced by createLargeString, but consider
+ // Locale.ROOT for determinism.
+ String processed = largeField.toUpperCase();
+ return input
+ .set("processed", input.createBoolean(true))
+ .set("processedField", input.createString(processed.substring(0, Math.min(100, processed.length()))));
+ }
+ };
+ }
+}
diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/MixedRegistryAccessStressIT.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/MixedRegistryAccessStressIT.java
new file mode 100644
index 0000000..021c726
--- /dev/null
+++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/MixedRegistryAccessStressIT.java
@@ -0,0 +1,288 @@
+/*
+ * Copyright (c) 2025 Splatgames.de Software and Contributors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package de.splatgames.aether.datafixers.functional.stress;
+
+import de.splatgames.aether.datafixers.api.DataVersion;
+import de.splatgames.aether.datafixers.api.TypeReference;
+import de.splatgames.aether.datafixers.api.type.Type;
+import de.splatgames.aether.datafixers.core.type.SimpleTypeRegistry;
+import de.splatgames.aether.datafixers.functional.stress.util.StressTestConfig;
+import de.splatgames.aether.datafixers.functional.stress.util.ThreadOrchestrator;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests mixed read/write patterns on registries under concurrent access.
+ *
+ * <p>Verifies:</p>
+ * <ul>
+ * <li>Frozen registry handles massive concurrent reads</li>
+ * <li>Pre-freeze concurrent access patterns</li>
+ * <li>Proper rejection of post-freeze modifications</li>
+ * </ul>
+ *
+ * Run with: {@code mvn verify -Pstress -Dgroups=stress}
+ */
+@DisplayName("Mixed Registry Access Stress Tests")
+@Tag("stress")
+@Tag("integration")
+class MixedRegistryAccessStressIT {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MixedRegistryAccessStressIT.class);
+
+ // Read-heavy stress against a frozen SimpleTypeRegistry: massive parallel
+ // lookups, rejection of writes after freeze, and interleaved access patterns.
+ @Nested
+ @DisplayName("Post-Freeze Concurrent Reads")
+ class PostFreezeConcurrentReads {
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.MINUTES)
+ @DisplayName("frozen registry handles massive concurrent reads")
+ void frozenRegistryHandlesMassiveConcurrentReads() throws Exception {
+ int threadCount = StressTestConfig.threadCount();
+ int readsPerThread = StressTestConfig.operationsPerThread();
+ int typeCount = 100;
+
+ LOGGER.info("Starting registry read stress test: {} threads, {} reads/thread, {} types",
+ threadCount, readsPerThread, typeCount);
+
+ // Setup: Create and freeze a registry with many types
+ SimpleTypeRegistry registry = new SimpleTypeRegistry();
+ for (int i = 0; i < typeCount; i++) {
+ Type type = Type.named("stress_type_" + i, Type.STRING);
+ registry.register(type);
+ }
+ registry.freeze();
+
+ AtomicInteger successfulReads = new AtomicInteger(0);
+
+ try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) {
+ for (int t = 0; t < threadCount; t++) {
+ orchestrator.submit(() -> {
+ for (int i = 0; i < readsPerThread; i++) {
+ int typeIndex = i % typeCount;
+ TypeReference ref = new TypeReference("stress_type_" + typeIndex);
+
+ // NOTE(review): "Type>" is syntax-garbled in this patch —
+ // presumably "Type<?>"; restore from VCS.
+ Type> type = registry.get(ref);
+
+ assertThat(type).isNotNull();
+ assertThat(type.reference().getId()).isEqualTo("stress_type_" + typeIndex);
+
+ successfulReads.incrementAndGet();
+ }
+ });
+ }
+
+ orchestrator.startAll();
+ boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(5));
+
+ assertThat(completed).isTrue();
+ assertThat(orchestrator.errors()).isEmpty();
+ // NOTE(review): int multiplication here can overflow for large
+ // configured values — other tests in this suite use
+ // (long) threadCount * opsPerThread; align for consistency.
+ assertThat(successfulReads.get())
+ .isEqualTo(threadCount * readsPerThread);
+ }
+
+ LOGGER.info("Completed {} successful reads", successfulReads.get());
+ }
+
+ @Test
+ @Timeout(value = 2, unit = TimeUnit.MINUTES)
+ @DisplayName("rejects modifications after freeze under contention")
+ void rejectsModificationsAfterFreezeUnderContention() throws Exception {
+ int threadCount = 50;
+ int attemptsPerThread = 100;
+
+ SimpleTypeRegistry registry = new SimpleTypeRegistry();
+ registry.register(Type.named("initial_type", Type.STRING));
+ registry.freeze();
+
+ AtomicInteger rejectionCount = new AtomicInteger(0);
+
+ try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) {
+ for (int t = 0; t < threadCount; t++) {
+ final int threadId = t;
+ orchestrator.submit(() -> {
+ for (int i = 0; i < attemptsPerThread; i++) {
+ try {
+ Type newType = Type.named(
+ "new_type_" + threadId + "_" + i,
+ Type.STRING
+ );
+ registry.register(newType);
+ // Should not reach here
+ // (AssertionError is deliberately NOT caught below, so a
+ // non-rejecting register() surfaces via orchestrator.errors().)
+ throw new AssertionError("Expected IllegalStateException");
+ } catch (IllegalStateException e) {
+ // Expected behavior
+ rejectionCount.incrementAndGet();
+ }
+ }
+ });
+ }
+
+ orchestrator.startAll();
+ boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(2));
+
+ assertThat(completed).isTrue();
+ assertThat(orchestrator.errors()).isEmpty();
+ assertThat(rejectionCount.get())
+ .as("All modification attempts rejected")
+ .isEqualTo(threadCount * attemptsPerThread);
+ }
+ }
+
+ @Test
+ @Timeout(value = 5, unit = TimeUnit.MINUTES)
+ @DisplayName("handles interleaved reads of different types")
+ void handlesInterleavedReadsOfDifferentTypes() throws Exception {
+ int threadCount = StressTestConfig.threadCount();
+ int readsPerThread = StressTestConfig.operationsPerThread();
+ int typeCount = 500;
+
+ SimpleTypeRegistry registry = new SimpleTypeRegistry();
+ for (int i = 0; i < typeCount; i++) {
+ registry.register(Type.named("interleaved_type_" + i, Type.STRING));
+ }
+ registry.freeze();
+
+ AtomicInteger successfulReads = new AtomicInteger(0);
+
+ try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) {
+ for (int t = 0; t < threadCount; t++) {
+ final int threadId = t;
+ orchestrator.submit(() -> {
+ // Each thread accesses types in a different pattern
+ int startOffset = threadId * 7; // Prime number for better distribution
+ for (int i = 0; i < readsPerThread; i++) {
+ int typeIndex = (startOffset + i * 13) % typeCount;
+ TypeReference ref = new TypeReference("interleaved_type_" + typeIndex);
+
+ // NOTE(review): "Type>" garbled — presumably "Type<?>".
+ Type> type = registry.get(ref);
+ assertThat(type).isNotNull();
+
+ successfulReads.incrementAndGet();
+ }
+ });
+ }
+
+ orchestrator.startAll();
+ boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(5));
+
+ assertThat(completed).isTrue();
+ assertThat(orchestrator.errors()).isEmpty();
+ }
+
+ LOGGER.info("Completed {} interleaved reads across {} types",
+ successfulReads.get(), typeCount);
+ }
+ }
+
+ // Readers race against a single sequential writer thread on an unfrozen
+ // registry; the registry is documented below as non-thread-safe, so writes
+ // stay on one thread and readers tolerate missing entries.
+ @Nested
+ @DisplayName("Pre-Freeze Concurrent Access")
+ class PreFreezeConcurrentAccess {
+
+ @Test
+ @Timeout(value = 2, unit = TimeUnit.MINUTES)
+ @DisplayName("handles concurrent reads during sequential registration")
+ void handlesConcurrentReadsDuringSequentialRegistration() throws Exception {
+ int readerThreads = 20;
+ int readsPerThread = 1000;
+ int typeCount = 100;
+
+ SimpleTypeRegistry registry = new SimpleTypeRegistry();
+ AtomicInteger registeredCount = new AtomicInteger(0);
+ AtomicInteger successfulReads = new AtomicInteger(0);
+
+ ExecutorService executor = Executors.newFixedThreadPool(readerThreads + 1);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch completionLatch = new CountDownLatch(readerThreads + 1);
+ // NOTE(review): raw "List" — generic parameter (presumably
+ // List<Exception>) looks stripped from this patch; restore from VCS.
+ List errors = Collections.synchronizedList(new ArrayList<>());
+
+ // Registration thread (sequential, to avoid race in the non-thread-safe registry)
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ for (int i = 0; i < typeCount; i++) {
+ registry.register(Type.named("concurrent_type_" + i, Type.STRING));
+ registeredCount.incrementAndGet();
+ Thread.sleep(1); // Small delay to allow reads to interleave
+ }
+ } catch (Exception e) {
+ errors.add(e);
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+
+ // Reader threads
+ for (int t = 0; t < readerThreads; t++) {
+ executor.submit(() -> {
+ try {
+ startLatch.await();
+ for (int i = 0; i < readsPerThread; i++) {
+ int currentRegistered = registeredCount.get();
+ if (currentRegistered > 0) {
+ int typeIndex = i % currentRegistered;
+ TypeReference ref = new TypeReference("concurrent_type_" + typeIndex);
+ // NOTE(review): "Type>" garbled — presumably "Type<?>".
+ Type> type = registry.get(ref);
+ if (type != null) {
+ successfulReads.incrementAndGet();
+ }
+ }
+ Thread.yield(); // Allow other threads to run
+ }
+ } catch (Exception e) {
+ errors.add(e);
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+ }
+
+ startLatch.countDown();
+ boolean completed = completionLatch.await(2, TimeUnit.MINUTES);
+ executor.shutdown();
+
+ assertThat(completed).isTrue();
+ assertThat(registeredCount.get()).isEqualTo(typeCount);
+
+ // NOTE(review): "errors" is collected but never asserted — add
+ // assertThat(errors).isEmpty() so registration/read failures fail the test.
+ // Note: Some reads may have occurred before any types were registered,
+ // so we just verify the test completed without errors
+ LOGGER.info("Completed with {} registered types and {} successful reads",
+ registeredCount.get(), successfulReads.get());
+ }
+ }
+}
diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/SustainedLoadMigrationStressIT.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/SustainedLoadMigrationStressIT.java
new file mode 100644
index 0000000..63293ab
--- /dev/null
+++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/SustainedLoadMigrationStressIT.java
@@ -0,0 +1,405 @@
+/*
+ * Copyright (c) 2025 Splatgames.de Software and Contributors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package de.splatgames.aether.datafixers.functional.stress;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import de.splatgames.aether.datafixers.api.DataVersion;
+import de.splatgames.aether.datafixers.api.TypeReference;
+import de.splatgames.aether.datafixers.api.dynamic.Dynamic;
+import de.splatgames.aether.datafixers.api.fix.DataFix;
+import de.splatgames.aether.datafixers.api.fix.DataFixer;
+import de.splatgames.aether.datafixers.api.fix.DataFixerContext;
+import de.splatgames.aether.datafixers.codec.json.gson.GsonOps;
+import de.splatgames.aether.datafixers.core.fix.DataFixerBuilder;
+import de.splatgames.aether.datafixers.functional.stress.util.StressTestConfig;
+import de.splatgames.aether.datafixers.functional.stress.util.StressTestMetrics;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests system behavior under sustained load for extended periods.
+ *
+ * <p>Verifies that the DataFixer system maintains:</p>
+ * <ul>
+ * <li>Consistent throughput over time</li>
+ * <li>Stable latency without degradation</li>
+ * <li>Correct behavior under burst traffic patterns</li>
+ * </ul>
+ *
+ * <p>Configuration via system properties:</p>
+ * <ul>
+ * <li>{@code stress.duration.minutes} - Test duration (default: 5)</li>
+ * <li>{@code stress.threads} - Worker thread count (default: 100)</li>
+ * </ul>
+ *
+ * Run with: {@code mvn verify -Pstress -Dgroups=stress}
+ */
+@DisplayName("Sustained Load Migration Stress Tests")
+@Tag("stress")
+@Tag("integration")
+class SustainedLoadMigrationStressIT {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SustainedLoadMigrationStressIT.class);
+ private static final TypeReference STRESS_TYPE = new TypeReference("sustained_entity");
+
+ // Long-running load tests: fixed worker pools hammer a single DataFixer for a
+ // wall-clock duration, checking throughput and latency-distribution stability.
+ @Nested
+ @DisplayName("Continuous Load")
+ class ContinuousLoad {
+
+ @Test
+ @Timeout(value = 15, unit = TimeUnit.MINUTES)
+ @DisplayName("sustains high throughput for configured duration")
+ void sustainsHighThroughputForConfiguredDuration() throws Exception {
+ Duration testDuration = StressTestConfig.duration();
+ // Cap at 50 threads so CI runners aren't oversubscribed.
+ int threadCount = Math.min(StressTestConfig.threadCount(), 50);
+
+ LOGGER.info("Starting sustained load test: {} threads for {} minutes",
+ threadCount, testDuration.toMinutes());
+
+ StressTestMetrics metrics = new StressTestMetrics();
+ AtomicBoolean running = new AtomicBoolean(true);
+ AtomicLong operationCount = new AtomicLong(0);
+
+ DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+ .addFix(STRESS_TYPE, createSimpleFix("sustained_fix", 1, 2))
+ .build();
+
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ // NOTE(review): "List>" is garbled — presumably List<Future<?>>.
+ // Also: the futures are collected but never get()-checked, so an
+ // AssertionError thrown by the in-worker assertThat (an Error, which
+ // bypasses catch (Exception)) would be silently swallowed; the
+ // totalOperations assertion below only partially covers this.
+ List> futures = new ArrayList<>();
+ List errors = Collections.synchronizedList(new ArrayList<>());
+
+ Instant startTime = Instant.now();
+ Instant endTime = startTime.plus(testDuration);
+
+ // Submit worker tasks
+ for (int t = 0; t < threadCount; t++) {
+ final int threadId = t;
+ futures.add(executor.submit(() -> {
+ long localOps = 0;
+ while (running.get() && Instant.now().isBefore(endTime)) {
+ try {
+ long startNanos = System.nanoTime();
+
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", threadId * 1_000_000_000L + localOps);
+ inputObj.addProperty("timestamp", System.currentTimeMillis());
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ Dynamic result = fixer.update(
+ STRESS_TYPE, input,
+ new DataVersion(1), new DataVersion(2)
+ );
+
+ assertThat(result.get("processed").asBoolean().result())
+ .contains(true);
+
+ metrics.recordOperation(System.nanoTime() - startNanos, true);
+ operationCount.incrementAndGet();
+ localOps++;
+ } catch (Exception e) {
+ errors.add(e);
+ metrics.recordOperation(0, false);
+ }
+ }
+ }));
+ }
+
+ // Wait for duration to elapse
+ Thread.sleep(testDuration.toMillis());
+ running.set(false);
+
+ // Wait for all threads to finish
+ executor.shutdown();
+ boolean terminated = executor.awaitTermination(30, TimeUnit.SECONDS);
+
+ Duration actualDuration = Duration.between(startTime, Instant.now());
+ metrics.report("sustainsHighThroughputForConfiguredDuration", actualDuration);
+
+ assertThat(terminated).as("All threads terminated").isTrue();
+ assertThat(errors).as("No errors during sustained load").isEmpty();
+ assertThat(metrics.totalOperations())
+ .as("Significant number of operations completed")
+ .isGreaterThan(threadCount * 100L);
+
+ LOGGER.info("Completed {} operations in {} seconds ({} ops/sec)",
+ metrics.totalOperations(),
+ actualDuration.toSeconds(),
+ String.format("%.2f", metrics.throughputPerSecond(actualDuration)));
+ }
+
+ @Test
+ @Timeout(value = 15, unit = TimeUnit.MINUTES)
+ @DisplayName("maintains consistent latency under sustained load")
+ void maintainsConsistentLatencyUnderLoad() throws Exception {
+ // Cap this test at 2 minutes regardless of the configured duration.
+ Duration testDuration = Duration.ofMinutes(Math.min(StressTestConfig.duration().toMinutes(), 2));
+ int threadCount = 20;
+ int sampleInterval = 1000; // Sample every N operations
+
+ LOGGER.info("Starting latency consistency test: {} threads for {} minutes",
+ threadCount, testDuration.toMinutes());
+
+ AtomicBoolean running = new AtomicBoolean(true);
+ // NOTE(review): raw "List" — presumably List<Long>; restore from VCS.
+ List latencySamples = Collections.synchronizedList(new ArrayList<>());
+
+ DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+ .addFix(STRESS_TYPE, createSimpleFix("latency_fix", 1, 2))
+ .build();
+
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ List errors = Collections.synchronizedList(new ArrayList<>());
+
+ Instant endTime = Instant.now().plus(testDuration);
+
+ for (int t = 0; t < threadCount; t++) {
+ final int threadId = t;
+ executor.submit(() -> {
+ long localOps = 0;
+ while (running.get() && Instant.now().isBefore(endTime)) {
+ try {
+ long startNanos = System.nanoTime();
+
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", threadId * 1_000_000_000L + localOps);
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ fixer.update(
+ STRESS_TYPE, input,
+ new DataVersion(1), new DataVersion(2)
+ );
+
+ long latencyNanos = System.nanoTime() - startNanos;
+
+ // Sample periodically to avoid memory pressure from collecting all
+ if (localOps % sampleInterval == 0) {
+ latencySamples.add(latencyNanos);
+ }
+
+ localOps++;
+ } catch (Exception e) {
+ errors.add(e);
+ }
+ }
+ });
+ }
+
+ Thread.sleep(testDuration.toMillis());
+ running.set(false);
+
+ executor.shutdown();
+ // NOTE(review): awaitTermination's boolean result is ignored here —
+ // assert it (as the sibling test does) so stuck workers fail the test.
+ executor.awaitTermination(30, TimeUnit.SECONDS);
+
+ assertThat(errors).isEmpty();
+ assertThat(latencySamples).isNotEmpty();
+
+ // Analyze latency distribution
+ List sortedSamples = new ArrayList<>(latencySamples);
+ Collections.sort(sortedSamples);
+
+ int size = sortedSamples.size();
+ long p50 = sortedSamples.get(size / 2);
+ long p95 = sortedSamples.get((int) (size * 0.95));
+ long p99 = sortedSamples.get((int) (size * 0.99));
+ long max = sortedSamples.get(size - 1);
+
+ LOGGER.info("Latency distribution (us): p50={}, p95={}, p99={}, max={}",
+ p50 / 1000, p95 / 1000, p99 / 1000, max / 1000);
+
+ // p99 should not be excessively higher than p50 (reasonable consistency)
+ // Allow up to 100x variance for stress conditions
+ assertThat(p99)
+ .as("p99 latency should be within reasonable bounds of p50")
+ .isLessThan(p50 * 100);
+ }
+ }
+
+ @Nested
+ @DisplayName("Burst Traffic Patterns")
+ class BurstTraffic {
+
+ @Test
+ @Timeout(value = 10, unit = TimeUnit.MINUTES)
+ @DisplayName("handles burst traffic patterns")
+ void handlesBurstTrafficPatterns() throws Exception {
+ int burstThreads = 50;
+ int quietThreads = 5;
+ int burstOps = 1000;
+ int quietOps = 100;
+ int cycles = 3;
+
+ LOGGER.info("Starting burst traffic test: {} cycles of burst ({} threads) and quiet ({} threads)",
+ cycles, burstThreads, quietThreads);
+
+ StressTestMetrics burstMetrics = new StressTestMetrics();
+ StressTestMetrics quietMetrics = new StressTestMetrics();
+ List errors = Collections.synchronizedList(new ArrayList<>());
+
+ DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+ .addFix(STRESS_TYPE, createSimpleFix("burst_fix", 1, 2))
+ .build();
+
+ for (int cycle = 0; cycle < cycles; cycle++) {
+ LOGGER.info("Cycle {}/{}: Burst phase", cycle + 1, cycles);
+
+ // Burst phase
+ ExecutorService burstExecutor = Executors.newFixedThreadPool(burstThreads);
+ CountDownLatch burstLatch = new CountDownLatch(burstThreads);
+
+ for (int t = 0; t < burstThreads; t++) {
+ final int threadId = t;
+ final int currentCycle = cycle;
+ burstExecutor.submit(() -> {
+ try {
+ for (int i = 0; i < burstOps; i++) {
+ long startNanos = System.nanoTime();
+
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", currentCycle * 1_000_000L + threadId * 10_000L + i);
+ inputObj.addProperty("phase", "burst");
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ fixer.update(
+ STRESS_TYPE, input,
+ new DataVersion(1), new DataVersion(2)
+ );
+
+ burstMetrics.recordOperation(System.nanoTime() - startNanos, true);
+ }
+ } catch (Exception e) {
+ errors.add(e);
+ } finally {
+ burstLatch.countDown();
+ }
+ });
+ }
+
+ burstLatch.await(2, TimeUnit.MINUTES);
+ burstExecutor.shutdown();
+
+ LOGGER.info("Cycle {}/{}: Quiet phase", cycle + 1, cycles);
+
+ // Quiet phase
+ ExecutorService quietExecutor = Executors.newFixedThreadPool(quietThreads);
+ CountDownLatch quietLatch = new CountDownLatch(quietThreads);
+
+ for (int t = 0; t < quietThreads; t++) {
+ final int threadId = t;
+ final int currentCycle = cycle;
+ quietExecutor.submit(() -> {
+ try {
+ for (int i = 0; i < quietOps; i++) {
+ long startNanos = System.nanoTime();
+
+ JsonObject inputObj = new JsonObject();
+ inputObj.addProperty("id", currentCycle * 1_000_000L + threadId * 10_000L + i);
+ inputObj.addProperty("phase", "quiet");
+ Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+ fixer.update(
+ STRESS_TYPE, input,
+ new DataVersion(1), new DataVersion(2)
+ );
+
+ quietMetrics.recordOperation(System.nanoTime() - startNanos, true);
+ }
+ } catch (Exception e) {
+ errors.add(e);
+ } finally {
+ quietLatch.countDown();
+ }
+ });
+ }
+
+ quietLatch.await(1, TimeUnit.MINUTES);
+ quietExecutor.shutdown();
+ }
+
+ assertThat(errors).isEmpty();
+
+ long expectedBurstOps = (long) cycles * burstThreads * burstOps;
+ long expectedQuietOps = (long) cycles * quietThreads * quietOps;
+
+ assertThat(burstMetrics.totalOperations())
+ .as("All burst operations completed")
+ .isEqualTo(expectedBurstOps);
+
+ assertThat(quietMetrics.totalOperations())
+ .as("All quiet operations completed")
+ .isEqualTo(expectedQuietOps);
+
+ LOGGER.info("Burst phase: {} ops, avg latency {} us",
+ burstMetrics.totalOperations(),
+ burstMetrics.averageLatencyNanos() / 1000);
+ LOGGER.info("Quiet phase: {} ops, avg latency {} us",
+ quietMetrics.totalOperations(),
+ quietMetrics.averageLatencyNanos() / 1000);
+ }
+ }
+
+ // Helper methods
+
    /**
     * Creates a minimal anonymous {@link DataFix} for stress scenarios.
     *
     * <p>The returned fix migrates from {@code from} to {@code to} and its
     * {@code apply} marks the payload by setting {@code "processed"} to
     * {@code true} on the input dynamic; no other transformation occurs.</p>
     *
     * NOTE(review): generic type parameters appear to have been stripped from
     * this file during extraction (raw {@code DataFix} return type) — confirm
     * against the original source before relying on the exact signature.
     *
     * @param name the fix name reported by {@code name()}
     * @param from the source data version
     * @param to   the target data version
     * @return a stateless, side-effect-free fix suitable for concurrent use in tests
     */
    private DataFix createSimpleFix(String name, int from, int to) {
        return new DataFix<>() {
            @Override
            public @NotNull String name() {
                return name;
            }

            @Override
            public @NotNull DataVersion fromVersion() {
                return new DataVersion(from);
            }

            @Override
            public @NotNull DataVersion toVersion() {
                return new DataVersion(to);
            }

            @Override
            public @NotNull Dynamic apply(
                    @NotNull TypeReference type,
                    @NotNull Dynamic input,
                    @NotNull DataFixerContext context
            ) {
                // Tag the value so tests can assert the fix actually ran.
                return input.set("processed", input.createBoolean(true));
            }
        };
    }
+}
diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/util/StressTestConfig.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/util/StressTestConfig.java
new file mode 100644
index 0000000..efb642f
--- /dev/null
+++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/util/StressTestConfig.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 2025 Splatgames.de Software and Contributors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package de.splatgames.aether.datafixers.functional.stress.util;
+
+import java.time.Duration;
+
/**
 * Configuration holder for stress test parameters.
 * Reads from system properties with sensible defaults.
 *
 * <p>Configurable via Maven system properties or command line:</p>
 * <pre>{@code
 * mvn verify -Pstress -Dstress.threads=200 -Dstress.duration.minutes=10
 * }</pre>
 */
public final class StressTestConfig {

    /** Default number of concurrent threads. */
    public static final int DEFAULT_THREAD_COUNT = 100;

    /** Default number of operations per thread. */
    public static final int DEFAULT_OPERATIONS_PER_THREAD = 1000;

    /** Default test duration. */
    public static final Duration DEFAULT_DURATION = Duration.ofMinutes(5);

    private StressTestConfig() {
        // Utility class: no instances.
    }

    /**
     * Returns the configured thread count.
     *
     * @return thread count from {@code stress.threads} system property,
     *         or {@link #DEFAULT_THREAD_COUNT} if not set
     */
    public static int threadCount() {
        return Integer.getInteger("stress.threads", DEFAULT_THREAD_COUNT);
    }

    /**
     * Returns the configured operations per thread.
     *
     * @return operations per thread from {@code stress.operations.per.thread}
     *         system property, or {@link #DEFAULT_OPERATIONS_PER_THREAD} if not set
     */
    public static int operationsPerThread() {
        return Integer.getInteger("stress.operations.per.thread", DEFAULT_OPERATIONS_PER_THREAD);
    }

    /**
     * Returns the configured test duration.
     *
     * @return duration from {@code stress.duration.minutes} system property,
     *         or {@link #DEFAULT_DURATION} if not set
     */
    public static Duration duration() {
        long minutes = Long.getLong("stress.duration.minutes", DEFAULT_DURATION.toMinutes());
        return Duration.ofMinutes(minutes);
    }

    /**
     * Returns the total number of operations (threads * operations per thread).
     *
     * <p>Both factors come from unbounded system properties, so the product is
     * computed in {@code long} and saturated at {@link Integer#MAX_VALUE}
     * instead of silently overflowing {@code int} as the original did.</p>
     *
     * @return total operation count, capped at {@code Integer.MAX_VALUE}
     */
    public static int totalOperations() {
        long total = (long) threadCount() * operationsPerThread();
        return (int) Math.min(total, Integer.MAX_VALUE);
    }
}
diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/util/StressTestMetrics.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/util/StressTestMetrics.java
new file mode 100644
index 0000000..14871e7
--- /dev/null
+++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/util/StressTestMetrics.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright (c) 2025 Splatgames.de Software and Contributors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package de.splatgames.aether.datafixers.functional.stress.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * Collects and reports metrics during stress tests.
+ *
+ * Thread-safe metrics collector that tracks:
+ *
+ * - Total operations (success + failure)
+ * - Failed operations
+ * - Latency statistics (sum, min, max)
+ *
+ */
+public final class StressTestMetrics {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(StressTestMetrics.class);
+
+ private final LongAdder totalOperations = new LongAdder();
+ private final LongAdder failedOperations = new LongAdder();
+ private final LongAdder totalLatencyNanos = new LongAdder();
+ private final AtomicLong minLatencyNanos = new AtomicLong(Long.MAX_VALUE);
+ private final AtomicLong maxLatencyNanos = new AtomicLong(0);
+
+ /**
+ * Records a completed operation.
+ *
+ * @param latencyNanos the operation latency in nanoseconds
+ * @param success true if the operation succeeded, false otherwise
+ */
+ public void recordOperation(long latencyNanos, boolean success) {
+ totalOperations.increment();
+ totalLatencyNanos.add(latencyNanos);
+
+ if (!success) {
+ failedOperations.increment();
+ }
+
+ // Update min/max atomically
+ updateMin(latencyNanos);
+ updateMax(latencyNanos);
+ }
+
+ private void updateMin(long latencyNanos) {
+ long current;
+ do {
+ current = minLatencyNanos.get();
+ if (latencyNanos >= current) {
+ return;
+ }
+ } while (!minLatencyNanos.compareAndSet(current, latencyNanos));
+ }
+
+ private void updateMax(long latencyNanos) {
+ long current;
+ do {
+ current = maxLatencyNanos.get();
+ if (latencyNanos <= current) {
+ return;
+ }
+ } while (!maxLatencyNanos.compareAndSet(current, latencyNanos));
+ }
+
+ /**
+ * Returns the total number of operations.
+ *
+ * @return total operation count
+ */
+ public long totalOperations() {
+ return totalOperations.sum();
+ }
+
+ /**
+ * Returns the number of successful operations.
+ *
+ * @return successful operation count
+ */
+ public long successfulOperations() {
+ return totalOperations.sum() - failedOperations.sum();
+ }
+
+ /**
+ * Returns the number of failed operations.
+ *
+ * @return failed operation count
+ */
+ public long failedOperations() {
+ return failedOperations.sum();
+ }
+
+ /**
+ * Calculates the throughput in operations per second.
+ *
+ * @param elapsed the elapsed time
+ * @return throughput in ops/sec
+ */
+ public double throughputPerSecond(Duration elapsed) {
+ if (elapsed.isZero()) {
+ return 0.0;
+ }
+ return (double) totalOperations.sum() / elapsed.toSeconds();
+ }
+
+ /**
+ * Returns the average latency in nanoseconds.
+ *
+ * @return average latency in nanoseconds, or 0 if no operations recorded
+ */
+ public long averageLatencyNanos() {
+ long total = totalOperations.sum();
+ if (total == 0) {
+ return 0;
+ }
+ return totalLatencyNanos.sum() / total;
+ }
+
+ /**
+ * Returns the minimum recorded latency in nanoseconds.
+ *
+ * @return minimum latency, or 0 if no operations recorded
+ */
+ public long minLatencyNanos() {
+ long min = minLatencyNanos.get();
+ return min == Long.MAX_VALUE ? 0 : min;
+ }
+
+ /**
+ * Returns the maximum recorded latency in nanoseconds.
+ *
+ * @return maximum latency
+ */
+ public long maxLatencyNanos() {
+ return maxLatencyNanos.get();
+ }
+
+ /**
+ * Logs a summary of the collected metrics.
+ *
+ * @param testName the name of the test for logging context
+ * @param elapsed the total elapsed time
+ */
+ public void report(String testName, Duration elapsed) {
+ LOGGER.info("=== Stress Test Report: {} ===", testName);
+ LOGGER.info("Duration: {} ms", elapsed.toMillis());
+ LOGGER.info("Total operations: {}", totalOperations());
+ LOGGER.info("Successful: {}", successfulOperations());
+ LOGGER.info("Failed: {}", failedOperations());
+ LOGGER.info("Throughput: {} ops/sec", String.format("%.2f", throughputPerSecond(elapsed)));
+ LOGGER.info("Latency (avg): {} us", averageLatencyNanos() / 1000);
+ LOGGER.info("Latency (min): {} us", minLatencyNanos() / 1000);
+ LOGGER.info("Latency (max): {} us", maxLatencyNanos() / 1000);
+ }
+
+ /**
+ * Resets all metrics to their initial state.
+ */
+ public void reset() {
+ totalOperations.reset();
+ failedOperations.reset();
+ totalLatencyNanos.reset();
+ minLatencyNanos.set(Long.MAX_VALUE);
+ maxLatencyNanos.set(0);
+ }
+}
diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/util/ThreadOrchestrator.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/util/ThreadOrchestrator.java
new file mode 100644
index 0000000..d86a0a8
--- /dev/null
+++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/util/ThreadOrchestrator.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2025 Splatgames.de Software and Contributors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package de.splatgames.aether.datafixers.functional.stress.util;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
 * Utility for coordinating concurrent test threads.
 *
 * <p>Provides a fluent API for:</p>
 * <ul>
 *   <li>Submitting tasks to a thread pool</li>
 *   <li>Synchronizing all threads to start simultaneously</li>
 *   <li>Waiting for completion with timeout</li>
 *   <li>Collecting exceptions from all threads</li>
 * </ul>
 *
 * <p>Example usage:</p>
 * <pre>{@code
 * ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(100);
 * for (int i = 0; i < 100; i++) {
 *     orchestrator.submit(() -> {
 *         // Concurrent work
 *     });
 * }
 * orchestrator.startAll();
 * orchestrator.awaitCompletion(Duration.ofMinutes(5));
 * assertThat(orchestrator.errors()).isEmpty();
 * }</pre>
 *
 * <p>Submission is expected to happen from a single (test) thread:
 * {@code submittedTasks} is a plain field, so concurrent {@link #submit(Runnable)}
 * calls are not safe. Task execution and error collection are thread-safe.</p>
 */
public final class ThreadOrchestrator implements AutoCloseable {

    private final int threadCount;
    private final ExecutorService executor;
    // Gate that holds every submitted task until startAll() releases them all at once.
    private final CountDownLatch startLatch;
    // Counted down once per task (success or failure) for awaitCompletion().
    private final CountDownLatch completionLatch;
    private final List<Throwable> errors;
    private final AtomicBoolean started;
    private int submittedTasks;

    private ThreadOrchestrator(int threadCount) {
        this.threadCount = threadCount;
        this.executor = Executors.newFixedThreadPool(threadCount);
        this.startLatch = new CountDownLatch(1);
        this.completionLatch = new CountDownLatch(threadCount);
        this.errors = Collections.synchronizedList(new ArrayList<>());
        this.started = new AtomicBoolean(false);
        this.submittedTasks = 0;
    }

    /**
     * Creates a new orchestrator with the specified thread count.
     *
     * @param threadCount the number of threads
     * @return a new orchestrator
     * @throws IllegalArgumentException if {@code threadCount} is not positive
     */
    public static ThreadOrchestrator withThreads(int threadCount) {
        if (threadCount <= 0) {
            throw new IllegalArgumentException("Thread count must be positive: " + threadCount);
        }
        return new ThreadOrchestrator(threadCount);
    }

    /**
     * Submits a task to be executed.
     *
     * <p>The task will wait at a barrier until {@link #startAll()} is called,
     * ensuring all threads begin simultaneously.</p>
     *
     * @param task the task to execute
     * @return this orchestrator for chaining
     * @throws IllegalStateException if already started or all tasks submitted
     */
    public ThreadOrchestrator submit(Runnable task) {
        if (started.get()) {
            throw new IllegalStateException("Cannot submit tasks after startAll()");
        }
        if (submittedTasks >= threadCount) {
            throw new IllegalStateException(
                    "Cannot submit more than " + threadCount + " tasks"
            );
        }

        executor.submit(() -> {
            try {
                // Wait for all threads to be ready.
                startLatch.await();
                // Execute the actual work.
                task.run();
            } catch (Throwable t) {
                errors.add(t);
            } finally {
                completionLatch.countDown();
            }
        });

        submittedTasks++;
        return this;
    }

    /**
     * Submits multiple identical tasks.
     *
     * @param count the number of tasks to submit
     * @param task  the task to execute
     * @return this orchestrator for chaining
     */
    public ThreadOrchestrator submitAll(int count, Runnable task) {
        for (int i = 0; i < count; i++) {
            submit(task);
        }
        return this;
    }

    /**
     * Releases all threads to start executing simultaneously.
     *
     * @throws IllegalStateException if not all tasks have been submitted,
     *                               or if already started
     */
    public void startAll() {
        if (submittedTasks != threadCount) {
            throw new IllegalStateException(
                    "Expected " + threadCount + " tasks but only " + submittedTasks + " submitted"
            );
        }
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Already started");
        }
        startLatch.countDown();
    }

    /**
     * Waits for all tasks to complete.
     *
     * @param timeout the maximum time to wait
     * @return true if all tasks completed within timeout, false if timeout elapsed
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitCompletion(Duration timeout) throws InterruptedException {
        return completionLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Returns the list of exceptions thrown by tasks.
     *
     * @return unmodifiable snapshot of the exceptions collected so far
     */
    public List<Throwable> errors() {
        // Copy under the synchronized list's lock, then wrap: callers get a
        // stable view even while tasks are still failing.
        return Collections.unmodifiableList(new ArrayList<>(errors));
    }

    /**
     * Returns true if any tasks threw exceptions.
     *
     * @return true if errors occurred
     */
    public boolean hasErrors() {
        return !errors.isEmpty();
    }

    /**
     * Shuts down the executor service, escalating to {@code shutdownNow()}
     * if tasks do not finish within 10 seconds.
     */
    public void shutdown() {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            // Preserve the caller's interrupt status.
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void close() {
        shutdown();
    }
}
diff --git a/pom.xml b/pom.xml
index ac74cb0..9595a85 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,6 +31,11 @@
true
+
+ 100
+ 5
+ 1000
+
26.0.2-1
2.13.2
@@ -398,6 +403,17 @@
+
+
+ stress
+
+ false
+ 100
+ 5
+ 1000
+
+
+
qa