From e37a902d970cd50d590a34b901178578a2d965ac Mon Sep 17 00:00:00 2001 From: Erik Date: Mon, 2 Feb 2026 00:12:53 +0100 Subject: [PATCH 1/2] Add chaos integration tests for random delays and random failures in DataFixer executions. Include both single-threaded and multithreaded scenarios to validate delay handling, failure isolation, and fix chain resilience. --- .github/workflows/ci-nightly.yml | 158 ++++++ aether-datafixers-functional-tests/pom.xml | 10 + .../functional/chaos/RandomDelaysChaosIT.java | 354 +++++++++++++ .../chaos/RandomFailuresChaosIT.java | 423 ++++++++++++++++ .../functional/chaos/util/ChaosInjector.java | 250 ++++++++++ .../functional/chaos/util/FailingDataFix.java | 218 ++++++++ .../HighConcurrencyMigrationStressIT.java | 469 ++++++++++++++++++ .../stress/MemoryPressureStressIT.java | 387 +++++++++++++++ .../stress/MixedRegistryAccessStressIT.java | 288 +++++++++++ .../SustainedLoadMigrationStressIT.java | 405 +++++++++++++++ .../stress/util/StressTestConfig.java | 90 ++++ .../stress/util/StressTestMetrics.java | 191 +++++++ .../stress/util/ThreadOrchestrator.java | 208 ++++++++ pom.xml | 16 + 14 files changed, 3467 insertions(+) create mode 100644 .github/workflows/ci-nightly.yml create mode 100644 aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/RandomDelaysChaosIT.java create mode 100644 aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/RandomFailuresChaosIT.java create mode 100644 aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/util/ChaosInjector.java create mode 100644 aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/util/FailingDataFix.java create mode 100644 aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/HighConcurrencyMigrationStressIT.java create mode 100644 
aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/MemoryPressureStressIT.java create mode 100644 aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/MixedRegistryAccessStressIT.java create mode 100644 aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/SustainedLoadMigrationStressIT.java create mode 100644 aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/util/StressTestConfig.java create mode 100644 aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/util/StressTestMetrics.java create mode 100644 aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/util/ThreadOrchestrator.java diff --git a/.github/workflows/ci-nightly.yml b/.github/workflows/ci-nightly.yml new file mode 100644 index 0000000..2e1158d --- /dev/null +++ b/.github/workflows/ci-nightly.yml @@ -0,0 +1,158 @@ +name: Nightly Stress Tests + +on: + schedule: + # Run at 2 AM UTC every day + - cron: '0 2 * * *' + workflow_dispatch: + inputs: + duration_minutes: + description: 'Test duration in minutes' + required: false + default: '5' + thread_count: + description: 'Number of concurrent threads' + required: false + default: '100' + test_groups: + description: 'Test groups to run (comma-separated: stress, chaos)' + required: false + default: 'stress,chaos' + +permissions: + contents: read + checks: write + +jobs: + stress-tests: + name: Stress Tests (Java ${{ matrix.java }}) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + java: ['17', '21'] + + steps: + - name: Checkout repository + uses: actions/checkout@v6 + with: + fetch-depth: 1 + + - name: Set up JDK ${{ matrix.java }} + uses: actions/setup-java@v5 + with: + java-version: ${{ matrix.java }} + distribution: 'temurin' + cache: 'maven' + + - name: Run stress 
tests + run: | + mvn -B clean verify -Pstress \ + -pl aether-datafixers-functional-tests \ + -Dgroups=${{ github.event.inputs.test_groups || 'stress,chaos' }} \ + -Dstress.threads=${{ github.event.inputs.thread_count || '100' }} \ + -Dstress.duration.minutes=${{ github.event.inputs.duration_minutes || '5' }} \ + -Ddependency-check.skip=true + timeout-minutes: 60 + + - name: Upload stress test results + uses: actions/upload-artifact@v6 + if: always() + with: + name: stress-test-results-java${{ matrix.java }} + path: | + **/target/failsafe-reports/TEST-*.xml + **/target/failsafe-reports/*.txt + retention-days: 30 + + chaos-tests: + name: Chaos Tests (Java 21) + runs-on: ubuntu-latest + needs: stress-tests + if: always() && (needs.stress-tests.result == 'success' || needs.stress-tests.result == 'failure') + + steps: + - name: Checkout repository + uses: actions/checkout@v6 + with: + fetch-depth: 1 + + - name: Set up JDK 21 + uses: actions/setup-java@v5 + with: + java-version: '21' + distribution: 'temurin' + cache: 'maven' + + - name: Run chaos tests only + run: | + mvn -B clean verify -Pit \ + -pl aether-datafixers-functional-tests \ + -Dgroups=chaos \ + -Ddependency-check.skip=true + timeout-minutes: 30 + + - name: Upload chaos test results + uses: actions/upload-artifact@v6 + if: always() + with: + name: chaos-test-results + path: | + **/target/failsafe-reports/TEST-*.xml + retention-days: 30 + + reports: + name: Test Reports + runs-on: ubuntu-latest + needs: [stress-tests, chaos-tests] + if: always() + + permissions: + contents: read + checks: write + + steps: + - name: Download stress test results (Java 17) + uses: actions/download-artifact@v4 + continue-on-error: true + with: + name: stress-test-results-java17 + path: reports/stress-java17 + + - name: Download stress test results (Java 21) + uses: actions/download-artifact@v4 + continue-on-error: true + with: + name: stress-test-results-java21 + path: reports/stress-java21 + + - name: Download chaos test results 
+ uses: actions/download-artifact@v4 + continue-on-error: true + with: + name: chaos-test-results + path: reports/chaos + + - name: Publish Stress Test Report (Java 17) + uses: mikepenz/action-junit-report@v6 + if: always() + continue-on-error: true + with: + report_paths: 'reports/stress-java17/**/TEST-*.xml' + check_name: Stress Test Report (Java 17) + + - name: Publish Stress Test Report (Java 21) + uses: mikepenz/action-junit-report@v6 + if: always() + continue-on-error: true + with: + report_paths: 'reports/stress-java21/**/TEST-*.xml' + check_name: Stress Test Report (Java 21) + + - name: Publish Chaos Test Report + uses: mikepenz/action-junit-report@v6 + if: always() + continue-on-error: true + with: + report_paths: 'reports/chaos/**/TEST-*.xml' + check_name: Chaos Test Report diff --git a/aether-datafixers-functional-tests/pom.xml b/aether-datafixers-functional-tests/pom.xml index 7547f3b..4193bc7 100644 --- a/aether-datafixers-functional-tests/pom.xml +++ b/aether-datafixers-functional-tests/pom.xml @@ -139,6 +139,7 @@ + org.apache.maven.plugins maven-failsafe-plugin @@ -148,6 +149,15 @@ **/*IT.java **/*E2E.java + + + ${stress.threads} + ${stress.duration.minutes} + ${stress.operations.per.thread} + + + -Xms512M -Xmx2G + 1800 diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/RandomDelaysChaosIT.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/RandomDelaysChaosIT.java new file mode 100644 index 0000000..0a33f02 --- /dev/null +++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/RandomDelaysChaosIT.java @@ -0,0 +1,354 @@ +/* + * Copyright (c) 2025 Splatgames.de Software and Contributors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, 
including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package de.splatgames.aether.datafixers.functional.chaos; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import de.splatgames.aether.datafixers.api.DataVersion; +import de.splatgames.aether.datafixers.api.TypeReference; +import de.splatgames.aether.datafixers.api.dynamic.Dynamic; +import de.splatgames.aether.datafixers.api.fix.DataFix; +import de.splatgames.aether.datafixers.api.fix.DataFixer; +import de.splatgames.aether.datafixers.api.fix.DataFixerContext; +import de.splatgames.aether.datafixers.codec.json.gson.GsonOps; +import de.splatgames.aether.datafixers.core.fix.DataFixerBuilder; +import de.splatgames.aether.datafixers.functional.chaos.util.ChaosInjector; +import de.splatgames.aether.datafixers.functional.stress.util.ThreadOrchestrator; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; 
+import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Chaos tests introducing random delays into fix execution. + * + *

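<p>Tests that the DataFixer system correctly handles:
+ * <ul>
+ *   <li>Random delays during single-threaded fix execution</li>
+ *   <li>Widely varying delays between the fixes of a chain</li>
+ *   <li>Random delays under concurrent execution</li>
+ *   <li>Sequential processing within each thread despite delays</li>
+ * </ul>
+ *
+ * <p>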
Run with: {@code mvn verify -Pit -Dgroups=chaos}
+ */
+@DisplayName("Random Delays Chaos Tests")
+@Tag("chaos")
+@Tag("integration")
+class RandomDelaysChaosIT {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RandomDelaysChaosIT.class);
+    private static final TypeReference CHAOS_TYPE = new TypeReference("chaos_delay_entity");
+
+    /** Scenarios that run every migration on the calling thread. */
+    @Nested
+    @DisplayName("Single Thread Delays")
+    class SingleThreadDelays {
+
+        /**
+         * Runs 100 sequential v1 -> v2 migrations through a fix that may sleep
+         * 1-50 ms and checks that every result is correct and that the fix was
+         * applied exactly once per operation.
+         */
+        @Test
+        @Timeout(value = 5, unit = TimeUnit.MINUTES)
+        @DisplayName("completes correctly with random delays")
+        void completesCorrectlyWithRandomDelays() {
+            int operationCount = 100;
+            ChaosInjector chaos = ChaosInjector.builder()
+                    .delayRange(1, 50)
+                    .build();
+
+            AtomicInteger fixApplyCount = new AtomicInteger(0);
+
+            DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+                    .addFix(CHAOS_TYPE, createDelayingFix("delay_fix", 1, 2, chaos, fixApplyCount))
+                    .build();
+
+            for (int i = 0; i < operationCount; i++) {
+                JsonObject inputObj = new JsonObject();
+                inputObj.addProperty("id", i);
+                inputObj.addProperty("sequence", i);
+                Dynamic<JsonElement> input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+                Dynamic<JsonElement> result = fixer.update(
+                        CHAOS_TYPE, input,
+                        new DataVersion(1), new DataVersion(2)
+                );
+
+                // Verify result correctness despite delay
+                assertThat(result.get("processed").asBoolean().result()).contains(true);
+                assertThat(result.get("id").asInt().result()).contains(i);
+            }
+
+            assertThat(fixApplyCount.get()).isEqualTo(operationCount);
+            LOGGER.info("Completed {} operations with {} delays injected",
+                    operationCount, chaos.delayCount());
+        }
+
+        /**
+         * Builds a five-step fix chain in which even-numbered steps may sleep
+         * 0-100 ms and odd-numbered steps have no delay configured, then checks
+         * that every step incremented the {@code step} counter exactly once.
+         */
+        @Test
+        @Timeout(value = 5, unit = TimeUnit.MINUTES)
+        @DisplayName("handles extreme delay variations in fix chain")
+        void handlesExtremeDelayVariationsInChain() {
+            int chainLength = 5;
+            int operationCount = 50;
+
+            DataFixerBuilder builder = new DataFixerBuilder(new DataVersion(chainLength + 1));
+
+            // Each fix in the chain has different delay characteristics
+            for (int v = 1; v <= chainLength; v++) {
+                final int version = v;
+                // Vary delay range: some instant, some with significant delay
+                ChaosInjector chaos = (v % 2 == 0)
+                        ? ChaosInjector.builder().delayRange(0, 100).build()
+                        : ChaosInjector.builder().build(); // No delay
+
+                builder.addFix(CHAOS_TYPE, new DataFix<JsonElement>() {
+                    @Override
+                    public @NotNull String name() {
+                        return "chain_delay_fix_v" + version;
+                    }
+
+                    @Override
+                    public @NotNull DataVersion fromVersion() {
+                        return new DataVersion(version);
+                    }
+
+                    @Override
+                    public @NotNull DataVersion toVersion() {
+                        return new DataVersion(version + 1);
+                    }
+
+                    @Override
+                    public @NotNull Dynamic<JsonElement> apply(
+                            @NotNull TypeReference type,
+                            @NotNull Dynamic<JsonElement> input,
+                            @NotNull DataFixerContext context
+                    ) {
+                        chaos.maybeDelay();
+                        int step = input.get("step").asInt().result().orElse(0);
+                        return input.set("step", input.createInt(step + 1));
+                    }
+                });
+            }
+
+            DataFixer fixer = builder.build();
+
+            for (int i = 0; i < operationCount; i++) {
+                JsonObject inputObj = new JsonObject();
+                inputObj.addProperty("id", i);
+                inputObj.addProperty("step", 0);
+                Dynamic<JsonElement> input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+                Dynamic<JsonElement> result = fixer.update(
+                        CHAOS_TYPE, input,
+                        new DataVersion(1), new DataVersion(chainLength + 1)
+                );
+
+                // Verify all fixes in chain were applied correctly
+                assertThat(result.get("step").asInt().result())
+                        .as("All chain fixes applied for operation " + i)
+                        .contains(chainLength);
+            }
+
+            LOGGER.info("Completed {} chain operations with varying delays", operationCount);
+        }
+    }
+
+    /** Scenarios that share one fixer between many worker threads. */
+    @Nested
+    @DisplayName("Concurrent Delays")
+    class ConcurrentDelays {
+
+        /**
+         * Runs 50 threads x 100 migrations against one shared fixer whose fix
+         * may sleep 1-20 ms and checks that no task failed, every result kept
+         * its own {@code threadId}, and the fix ran once per operation.
+         */
+        @Test
+        @Timeout(value = 5, unit = TimeUnit.MINUTES)
+        @DisplayName("handles random delays in concurrent execution")
+        void handlesRandomDelaysInConcurrentExecution() throws Exception {
+            int threadCount = 50;
+            int opsPerThread = 100;
+
+            ChaosInjector chaos = ChaosInjector.builder()
+                    .delayRange(1, 20)
+                    .build();
+
+            AtomicInteger totalOps = new AtomicInteger(0);
+
+            DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+                    .addFix(CHAOS_TYPE, createDelayingFix("concurrent_delay_fix", 1, 2, chaos, totalOps))
+                    .build();
+
+            try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) {
+                for (int t = 0; t < threadCount; t++) {
+                    final int threadId = t;
+                    orchestrator.submit(() -> {
+                        for (int i = 0; i < opsPerThread; i++) {
+                            JsonObject inputObj = new JsonObject();
+                            inputObj.addProperty("id", threadId * 10_000L + i);
+                            inputObj.addProperty("threadId", threadId);
+                            Dynamic<JsonElement> input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+                            Dynamic<JsonElement> result = fixer.update(
+                                    CHAOS_TYPE, input,
+                                    new DataVersion(1), new DataVersion(2)
+                            );
+
+                            // Verify data integrity
+                            assertThat(result.get("processed").asBoolean().result())
+                                    .contains(true);
+                            assertThat(result.get("threadId").asInt().result())
+                                    .contains(threadId);
+                        }
+                    });
+                }
+
+                orchestrator.startAll();
+                boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(5));
+
+                assertThat(completed).isTrue();
+                assertThat(orchestrator.errors()).isEmpty();
+                assertThat(totalOps.get()).isEqualTo(threadCount * opsPerThread);
+            }
+
+            LOGGER.info("Completed {} concurrent operations with {} delays",
+                    totalOps.get(), chaos.delayCount());
+        }
+
+        /**
+         * Stamps every migrated value with {@link System#nanoTime()} and checks
+         * that, within a single worker thread, the stamps never go backwards.
+         */
+        @Test
+        @Timeout(value = 5, unit = TimeUnit.MINUTES)
+        @DisplayName("maintains ordering within each thread despite delays")
+        void maintainsOrderingWithinEachThread() throws Exception {
+            int threadCount = 20;
+            int opsPerThread = 50;
+
+            ChaosInjector chaos = ChaosInjector.builder()
+                    .delayRange(1, 30)
+                    .build();
+
+            DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+                    .addFix(CHAOS_TYPE, new DataFix<JsonElement>() {
+                        @Override
+                        public @NotNull String name() {
+                            return "ordering_delay_fix";
+                        }
+
+                        @Override
+                        public @NotNull DataVersion fromVersion() {
+                            return new DataVersion(1);
+                        }
+
+                        @Override
+                        public @NotNull DataVersion toVersion() {
+                            return new DataVersion(2);
+                        }
+
+                        @Override
+                        public @NotNull Dynamic<JsonElement> apply(
+                                @NotNull TypeReference type,
+                                @NotNull Dynamic<JsonElement> input,
+                                @NotNull DataFixerContext context
+                        ) {
+                            chaos.maybeDelay();
+                            long timestamp = System.nanoTime();
+                            return input
+                                    .set("processed", input.createBoolean(true))
+                                    .set("processedAt", input.createLong(timestamp));
+                        }
+                    })
+                    .build();
+
+            try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) {
+                for (int t = 0; t < threadCount; t++) {
+                    final int threadId = t;
+                    orchestrator.submit(() -> {
+                        // System.nanoTime() has an arbitrary origin and may be negative,
+                        // so there is no meaningful initial value to compare against:
+                        // the first stamp only seeds the comparison.
+                        boolean hasPrevious = false;
+                        long lastTimestamp = 0L;
+                        for (int i = 0; i < opsPerThread; i++) {
+                            JsonObject inputObj = new JsonObject();
+                            inputObj.addProperty("id", threadId * 10_000L + i);
+                            inputObj.addProperty("sequence", i);
+                            Dynamic<JsonElement> input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+                            Dynamic<JsonElement> result = fixer.update(
+                                    CHAOS_TYPE, input,
+                                    new DataVersion(1), new DataVersion(2)
+                            );
+
+                            // A missing stamp must fail the test instead of silently
+                            // defaulting to a value that satisfies the ordering check.
+                            long currentTimestamp = result.get("processedAt").asLong().result()
+                                    .orElseThrow(() -> new AssertionError(
+                                            "processedAt missing in thread " + threadId));
+
+                            // Within a single thread, processing should be sequential.
+                            // nanoTime values are compared through their difference.
+                            if (hasPrevious) {
+                                assertThat(currentTimestamp - lastTimestamp)
+                                        .as("Ordering within thread " + threadId)
+                                        .isGreaterThanOrEqualTo(0L);
+                            }
+
+                            lastTimestamp = currentTimestamp;
+                            hasPrevious = true;
+                        }
+                    });
+                }
+
+                orchestrator.startAll();
+                boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(5));
+
+                assertThat(completed).isTrue();
+                assertThat(orchestrator.errors()).isEmpty();
+            }
+
+            LOGGER.info("Verified ordering across {} threads with delays", threadCount);
+        }
+    }
+
+    // Helper methods
+
+    /**
+     * Creates a fix that may delay, counts its invocation and marks the input as processed.
+     *
+     * @param name    the fix name
+     * @param from    the source version number
+     * @param to      the target version number
+     * @param chaos   the injector asked for a delay before each application
+     * @param counter incremented once per application
+     * @return a fix that sets {@code processed} to {@code true}
+     */
+    private DataFix<JsonElement> createDelayingFix(
+            String name, int from, int to,
+            ChaosInjector chaos, AtomicInteger counter
+    ) {
+        return new DataFix<>() {
+            @Override
+            public @NotNull String name() {
+                return name;
+            }
+
+            @Override
+            public @NotNull DataVersion fromVersion() {
+                return new DataVersion(from);
+            }
+
+            @Override
+            public @NotNull DataVersion toVersion() {
+                return new DataVersion(to);
+            }
+
+            @Override
+            public @NotNull Dynamic<JsonElement> apply(
+                    @NotNull TypeReference type,
+                    @NotNull Dynamic<JsonElement> input,
+                    @NotNull DataFixerContext context
+            ) {
+                chaos.maybeDelay();
+                counter.incrementAndGet();
+                return input.set("processed", input.createBoolean(true));
+            }
+        };
+    }
+}
diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/RandomFailuresChaosIT.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/RandomFailuresChaosIT.java
new file mode 100644
index 0000000..bf84494
--- /dev/null
+++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/RandomFailuresChaosIT.java
@@ -0,0 +1,423 @@
+/*
+ * Copyright (c) 2025 Splatgames.de Software and Contributors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */ + +package de.splatgames.aether.datafixers.functional.chaos; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import de.splatgames.aether.datafixers.api.DataVersion; +import de.splatgames.aether.datafixers.api.TypeReference; +import de.splatgames.aether.datafixers.api.dynamic.Dynamic; +import de.splatgames.aether.datafixers.api.fix.DataFix; +import de.splatgames.aether.datafixers.api.fix.DataFixer; +import de.splatgames.aether.datafixers.api.fix.DataFixerContext; +import de.splatgames.aether.datafixers.codec.json.gson.GsonOps; +import de.splatgames.aether.datafixers.core.fix.DataFixerBuilder; +import de.splatgames.aether.datafixers.functional.chaos.util.ChaosInjector; +import de.splatgames.aether.datafixers.functional.stress.util.ThreadOrchestrator; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Chaos tests with random failures in fix execution. + * + *

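<p>Tests that the DataFixer system correctly handles:
+ * <ul>
+ *   <li>Sporadic fix failures and their propagation to the caller</li>
+ *   <li>A failing fix stopping the remaining fixes of a chain</li>
+ *   <li>Failure isolation between concurrently running threads</li>
+ *   <li>Continued operation of a fixer after earlier failures</li>
+ * </ul>
+ *
+ * <p>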
Run with: {@code mvn verify -Pit -Dgroups=chaos}
+ */
+@DisplayName("Random Failures Chaos Tests")
+@Tag("chaos")
+@Tag("integration")
+class RandomFailuresChaosIT {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RandomFailuresChaosIT.class);
+    private static final TypeReference CHAOS_TYPE = new TypeReference("chaos_failure_entity");
+
+    /** Scenarios that run every migration on the calling thread. */
+    @Nested
+    @DisplayName("Single Thread Failures")
+    class SingleThreadFailures {
+
+        /**
+         * Runs 200 migrations through a fix that fails with 5% probability and
+         * checks that some operations fail while most succeed.
+         */
+        @Test
+        @Timeout(value = 2, unit = TimeUnit.MINUTES)
+        @DisplayName("handles sporadic fix failures")
+        void handlesSporadicFixFailures() {
+            int operationCount = 200;
+            double failureProbability = 0.05; // 5% failure rate
+
+            ChaosInjector chaos = ChaosInjector.withFailureProbability(failureProbability);
+            AtomicInteger successCount = new AtomicInteger(0);
+            AtomicInteger failureCount = new AtomicInteger(0);
+
+            DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+                    .addFix(CHAOS_TYPE, createFailingFix("sporadic_fix", 1, 2, chaos))
+                    .build();
+
+            for (int i = 0; i < operationCount; i++) {
+                JsonObject inputObj = new JsonObject();
+                inputObj.addProperty("id", i);
+                Dynamic<JsonElement> input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+                try {
+                    Dynamic<JsonElement> result = fixer.update(
+                            CHAOS_TYPE, input,
+                            new DataVersion(1), new DataVersion(2)
+                    );
+
+                    assertThat(result.get("processed").asBoolean().result()).contains(true);
+                    successCount.incrementAndGet();
+                } catch (ChaosInjector.ChaosException expected) {
+                    // Expected chaos failure
+                    failureCount.incrementAndGet();
+                }
+            }
+
+            LOGGER.info("Operations: {}, Success: {}, Failures: {}",
+                    operationCount, successCount.get(), failureCount.get());
+
+            // With 5% failure rate over 200 operations, we expect some failures
+            assertThat(failureCount.get())
+                    .as("Some failures should have occurred")
+                    .isGreaterThan(0);
+
+            // But not all should fail
+            assertThat(successCount.get())
+                    .as("Most operations should succeed")
+                    .isGreaterThan(operationCount / 2);
+        }
+
+        /** Checks that an exception thrown inside a fix reaches the caller of {@code update}. */
+        @Test
+        @Timeout(value = 2, unit = TimeUnit.MINUTES)
+        @DisplayName("propagates fix exceptions correctly")
+        void propagatesFixExceptionsCorrectly() {
+            ChaosInjector chaos = ChaosInjector.withFailureProbability(1.0); // Always fail
+
+            DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+                    .addFix(CHAOS_TYPE, createFailingFix("always_fail_fix", 1, 2, chaos))
+                    .build();
+
+            JsonObject inputObj = new JsonObject();
+            inputObj.addProperty("id", 1);
+            Dynamic<JsonElement> input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+            assertThatThrownBy(() -> fixer.update(
+                    CHAOS_TYPE, input,
+                    new DataVersion(1), new DataVersion(2)
+            ))
+                    .isInstanceOf(ChaosInjector.ChaosException.class)
+                    .hasMessageContaining("Chaos-injected failure");
+        }
+
+        /**
+         * Builds a three-step chain whose middle fix always fails and checks
+         * that the first two fixes ran once and the third never ran.
+         */
+        @Test
+        @Timeout(value = 2, unit = TimeUnit.MINUTES)
+        @DisplayName("failure in chain stops subsequent fixes")
+        void failureInChainStopsSubsequentFixes() {
+            AtomicInteger fix1Count = new AtomicInteger(0);
+            AtomicInteger fix2Count = new AtomicInteger(0);
+            AtomicInteger fix3Count = new AtomicInteger(0);
+
+            ChaosInjector alwaysFail = ChaosInjector.withFailureProbability(1.0);
+            ChaosInjector neverFail = ChaosInjector.withFailureProbability(0.0);
+
+            DataFixer fixer = new DataFixerBuilder(new DataVersion(4))
+                    .addFix(CHAOS_TYPE, createCountingFix("chain_fix_1", 1, 2, neverFail, fix1Count))
+                    .addFix(CHAOS_TYPE, createCountingFix("chain_fix_2", 2, 3, alwaysFail, fix2Count))
+                    .addFix(CHAOS_TYPE, createCountingFix("chain_fix_3", 3, 4, neverFail, fix3Count))
+                    .build();
+
+            JsonObject inputObj = new JsonObject();
+            inputObj.addProperty("id", 1);
+            Dynamic<JsonElement> input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+            assertThatThrownBy(() -> fixer.update(
+                    CHAOS_TYPE, input,
+                    new DataVersion(1), new DataVersion(4)
+            )).isInstanceOf(ChaosInjector.ChaosException.class);
+
+            // Fix 1 should have run
+            assertThat(fix1Count.get()).isEqualTo(1);
+            // Fix 2 should have run and failed
+            assertThat(fix2Count.get()).isEqualTo(1);
+            // Fix 3 should NOT have run due to failure in fix 2
+            assertThat(fix3Count.get()).isEqualTo(0);
+        }
+    }
+
+    /** Scenarios that share one fixer between many worker threads. */
+    @Nested
+    @DisplayName("Concurrent Failure Isolation")
+    class ConcurrentFailureIsolation {
+
+        /**
+         * Runs 50 threads x 100 migrations through a fix that fails with 10%
+         * probability and checks that failures stay confined to the failing
+         * operation: successful results keep their own {@code threadId} and no
+         * task aborts.
+         */
+        @Test
+        @Timeout(value = 5, unit = TimeUnit.MINUTES)
+        @DisplayName("isolates failures between threads")
+        void isolatesFailuresBetweenThreads() throws Exception {
+            int threadCount = 50;
+            int opsPerThread = 100;
+            double failureProbability = 0.1; // 10% failure rate
+
+            ChaosInjector chaos = ChaosInjector.withFailureProbability(failureProbability);
+            AtomicInteger totalSuccess = new AtomicInteger(0);
+            AtomicInteger totalFailure = new AtomicInteger(0);
+            Set<Integer> threadsWithSuccess = ConcurrentHashMap.newKeySet();
+            Set<Integer> threadsWithFailure = ConcurrentHashMap.newKeySet();
+
+            DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+                    .addFix(CHAOS_TYPE, createFailingFix("isolation_fix", 1, 2, chaos))
+                    .build();
+
+            try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) {
+                for (int t = 0; t < threadCount; t++) {
+                    final int threadId = t;
+                    orchestrator.submit(() -> {
+                        for (int i = 0; i < opsPerThread; i++) {
+                            JsonObject inputObj = new JsonObject();
+                            inputObj.addProperty("id", threadId * 10_000L + i);
+                            inputObj.addProperty("threadId", threadId);
+                            Dynamic<JsonElement> input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+                            try {
+                                Dynamic<JsonElement> result = fixer.update(
+                                        CHAOS_TYPE, input,
+                                        new DataVersion(1), new DataVersion(2)
+                                );
+
+                                // Verify the thread's data wasn't corrupted by another thread's failure
+                                assertThat(result.get("threadId").asInt().result())
+                                        .contains(threadId);
+
+                                totalSuccess.incrementAndGet();
+                                threadsWithSuccess.add(threadId);
+                            } catch (ChaosInjector.ChaosException expected) {
+                                totalFailure.incrementAndGet();
+                                threadsWithFailure.add(threadId);
+                            }
+                        }
+                    });
+                }
+
+                orchestrator.startAll();
+                boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(5));
+
+                // We expect the orchestrator to complete even with failures
+                // because individual operation failures don't stop the thread
+                assertThat(completed).isTrue();
+                // Orchestrator should not collect ChaosExceptions as errors
+                // since they're caught within the task
+                assertThat(orchestrator.errors()).isEmpty();
+            }
+
+            LOGGER.info("Total operations: {}, Success: {}, Failures: {}",
+                    threadCount * opsPerThread, totalSuccess.get(), totalFailure.get());
+            LOGGER.info("Threads with success: {}, Threads with failure: {}",
+                    threadsWithSuccess.size(), threadsWithFailure.size());
+
+            // Every operation must have ended as exactly one success or one failure
+            assertThat(totalSuccess.get() + totalFailure.get())
+                    .as("All operations should be accounted for")
+                    .isEqualTo(threadCount * opsPerThread);
+
+            // Verify both successes and failures occurred
+            assertThat(totalSuccess.get())
+                    .as("Some operations should succeed")
+                    .isGreaterThan(0);
+            assertThat(totalFailure.get())
+                    .as("Some operations should fail")
+                    .isGreaterThan(0);
+
+            // Failures in some threads must not prevent the others from succeeding:
+            // more than half of the threads must have completed at least one operation
+            assertThat(threadsWithSuccess.size())
+                    .as("Multiple threads should have successes")
+                    .isGreaterThan(threadCount / 2);
+        }
+
+        /**
+         * Exercises a failure-prone fixer in two rounds (phases 1 and 3) with a
+         * failure-free control fixer in between (phase 2). Every successful
+         * result of the failure-prone fixer is verified, so the test shows that
+         * earlier failures do not corrupt its later output.
+         */
+        @Test
+        @Timeout(value = 5, unit = TimeUnit.MINUTES)
+        @DisplayName("fixer remains functional after failures")
+        void fixerRemainsFunctionalAfterFailures() throws Exception {
+            int opsPerPhase = 100;
+
+            // Phase 1: Some failures (failure-prone fixer)
+            // Phase 2: No failures (separate control fixer)
+            // Phase 3: Some failures again (same failure-prone fixer as phase 1)
+
+            AtomicInteger phase1Success = new AtomicInteger(0);
+            AtomicInteger phase1Failure = new AtomicInteger(0);
+            AtomicInteger phase2Success = new AtomicInteger(0);
+            AtomicInteger phase3Success = new AtomicInteger(0);
+            AtomicInteger phase3Failure = new AtomicInteger(0);
+
+            ChaosInjector chaosWith10Percent = ChaosInjector.withFailureProbability(0.1);
+            ChaosInjector chaosNoFailure = ChaosInjector.withFailureProbability(0.0);
+
+            // Failure-prone fixer shared by phases 1 and 3
+            DataFixer fixer = new DataFixerBuilder(new DataVersion(2))
+                    .addFix(CHAOS_TYPE, createFailingFix("resilience_fix", 1, 2, chaosWith10Percent))
+                    .build();
+
+            // Phase 1: With failures
+            for (int i = 0; i < opsPerPhase; i++) {
+                JsonObject inputObj = new JsonObject();
+                inputObj.addProperty("id", i);
+                inputObj.addProperty("phase", 1);
+                Dynamic<JsonElement> input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+                try {
+                    Dynamic<JsonElement> result = fixer.update(
+                            CHAOS_TYPE, input,
+                            new DataVersion(1), new DataVersion(2)
+                    );
+
+                    assertThat(result.get("processed").asBoolean().result()).contains(true);
+                    phase1Success.incrementAndGet();
+                } catch (ChaosInjector.ChaosException expected) {
+                    phase1Failure.incrementAndGet();
+                }
+            }
+
+            // Create a new fixer for phase 2 with no failures to verify isolation
+            DataFixer fixer2 = new DataFixerBuilder(new DataVersion(2))
+                    .addFix(CHAOS_TYPE, createFailingFix("stable_fix", 1, 2, chaosNoFailure))
+                    .build();
+
+            // Phase 2: No failures - the control fixer must be unaffected by phase 1
+            for (int i = 0; i < opsPerPhase; i++) {
+                JsonObject inputObj = new JsonObject();
+                inputObj.addProperty("id", i);
+                inputObj.addProperty("phase", 2);
+                Dynamic<JsonElement> input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+                Dynamic<JsonElement> result = fixer2.update(
+                        CHAOS_TYPE, input,
+                        new DataVersion(1), new DataVersion(2)
+                );
+
+                assertThat(result.get("processed").asBoolean().result()).contains(true);
+                phase2Success.incrementAndGet();
+            }
+
+            // Phase 3: With failures again using original fixer
+            for (int i = 0; i < opsPerPhase; i++) {
+                JsonObject inputObj = new JsonObject();
+                inputObj.addProperty("id", i);
+                inputObj.addProperty("phase", 3);
+                Dynamic<JsonElement> input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+                try {
+                    Dynamic<JsonElement> result = fixer.update(
+                            CHAOS_TYPE, input,
+                            new DataVersion(1), new DataVersion(2)
+                    );
+
+                    assertThat(result.get("processed").asBoolean().result()).contains(true);
+                    phase3Success.incrementAndGet();
+                } catch (ChaosInjector.ChaosException expected) {
+                    phase3Failure.incrementAndGet();
+                }
+            }
+
+            LOGGER.info("Phase 1: {} success, {} failure", phase1Success.get(), phase1Failure.get());
+            LOGGER.info("Phase 2: {} success (should be 100%)", phase2Success.get());
+            LOGGER.info("Phase 3: {} success, {} failure", phase3Success.get(), phase3Failure.get());
+
+            // Phase 2 should have 100% success
+            assertThat(phase2Success.get()).isEqualTo(opsPerPhase);
+
+            // Phase 1 and 3 should have some failures (10% rate)
+            assertThat(phase1Failure.get()).isGreaterThan(0);
+            assertThat(phase3Failure.get()).isGreaterThan(0);
+
+            // The failure-prone fixer must still produce results after it has failed
+            assertThat(phase1Success.get())
+                    .as("Original fixer should succeed for some phase 1 operations")
+                    .isGreaterThan(0);
+            assertThat(phase3Success.get())
+                    .as("Original fixer should still succeed after earlier failures")
+                    .isGreaterThan(0);
+        }
+    }
+
+    // Helper methods
+
+    /**
+     * Creates a fix that may fail and otherwise marks the input as processed.
+     *
+     * @param name  the fix name
+     * @param from  the source version number
+     * @param to    the target version number
+     * @param chaos the injector asked for a failure before each application
+     * @return a fix that sets {@code processed} to {@code true} unless chaos strikes
+     */
+    private DataFix<JsonElement> createFailingFix(
+            String name, int from, int to,
+            ChaosInjector chaos
+    ) {
+        return new DataFix<>() {
+            @Override
+            public @NotNull String name() {
+                return name;
+            }
+
+            @Override
+            public @NotNull DataVersion fromVersion() {
+                return new DataVersion(from);
+            }
+
+            @Override
+            public @NotNull DataVersion toVersion() {
+                return new DataVersion(to);
+            }
+
+            @Override
+            public @NotNull Dynamic<JsonElement> apply(
+                    @NotNull TypeReference type,
+                    @NotNull Dynamic<JsonElement> input,
+                    @NotNull DataFixerContext context
+            ) {
+                chaos.maybeFail();
+                return input.set("processed", input.createBoolean(true));
+            }
+        };
+    }
+
+    /**
+     * Creates a fix like {@link #createFailingFix} that also counts every
+     * invocation. The counter is incremented before the failure check, so
+     * failed applications are counted too.
+     *
+     * @param name    the fix name
+     * @param from    the source version number
+     * @param to      the target version number
+     * @param chaos   the injector asked for a failure during each application
+     * @param counter incremented once per application, successful or not
+     * @return a counting fix that sets {@code processed} to {@code true} unless chaos strikes
+     */
+    private DataFix<JsonElement> createCountingFix(
+            String name, int from, int to,
+            ChaosInjector chaos, AtomicInteger counter
+    ) {
+        return new DataFix<>() {
+            @Override
+            public @NotNull String name() {
+                return name;
+            }
+
+            @Override
+            public @NotNull DataVersion fromVersion() {
+                return new DataVersion(from);
+            }
+
+            @Override
+            public @NotNull DataVersion toVersion() {
+                return new DataVersion(to);
+            }
+
+            @Override
+            public @NotNull Dynamic<JsonElement> apply(
+                    @NotNull TypeReference type,
+                    @NotNull Dynamic<JsonElement> input,
+                    @NotNull DataFixerContext context
+            ) {
+                counter.incrementAndGet();
+                chaos.maybeFail();
+                return input.set("processed", input.createBoolean(true));
+            }
+        };
+    }
+}
diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/util/ChaosInjector.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/util/ChaosInjector.java
new file mode 100644
index 0000000..33dbf47
--- /dev/null
+++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/util/ChaosInjector.java
@@ -0,0 +1,250 @@
+/*
+ * Copyright (c) 2025 Splatgames.de Software and
Contributors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package de.splatgames.aether.datafixers.functional.chaos.util; + +import java.util.SplittableRandom; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Injects chaos into test execution for chaos engineering tests. + * + *

Provides controlled randomization for: + *

+ * + *

Uses {@link SplittableRandom} for high-performance, thread-local randomness + * without atomic contention. + * + *

Example usage: + *

{@code
+ * ChaosInjector chaos = ChaosInjector.builder()
+ *     .failureProbability(0.05)  // 5% failure rate
+ *     .delayRange(1, 50)         // 1-50ms random delays
+ *     .build();
+ *
+ * // In test code:
+ * chaos.maybeDelay();
+ * chaos.maybeFail("Simulated failure");
+ * }
+ */ +public final class ChaosInjector { + + private final double failureProbability; + private final int minDelayMs; + private final int maxDelayMs; + private final ThreadLocal random; + private final AtomicLong delayCount = new AtomicLong(); + private final AtomicLong failureCount = new AtomicLong(); + + private ChaosInjector(Builder builder) { + this.failureProbability = builder.failureProbability; + this.minDelayMs = builder.minDelayMs; + this.maxDelayMs = builder.maxDelayMs; + this.random = ThreadLocal.withInitial(SplittableRandom::new); + } + + /** + * Creates a builder for configuring a ChaosInjector. + * + * @return a new builder + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Creates a ChaosInjector with only failure probability configured. + * + * @param probability the probability of failure (0.0 to 1.0) + * @return a new ChaosInjector + */ + public static ChaosInjector withFailureProbability(double probability) { + return builder().failureProbability(probability).build(); + } + + /** + * Creates a ChaosInjector with only random delays configured. + * + * @param minMs minimum delay in milliseconds + * @param maxMs maximum delay in milliseconds + * @return a new ChaosInjector + */ + public static ChaosInjector withRandomDelay(int minMs, int maxMs) { + return builder().delayRange(minMs, maxMs).build(); + } + + /** + * Introduces a random delay based on configured parameters. + * + *

Does nothing if no delay range is configured. + */ + public void maybeDelay() { + if (maxDelayMs <= 0) { + return; + } + + int delay = minDelayMs + random.get().nextInt(maxDelayMs - minDelayMs + 1); + if (delay > 0) { + try { + Thread.sleep(delay); + delayCount.incrementAndGet(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * Throws an exception based on configured failure probability. + * + * @param message the exception message if failure is triggered + * @throws ChaosException if random chance triggers failure + */ + public void maybeFail(String message) { + if (failureProbability <= 0) { + return; + } + + if (random.get().nextDouble() < failureProbability) { + failureCount.incrementAndGet(); + throw new ChaosException(message); + } + } + + /** + * Throws an exception based on configured failure probability. + * + *

Uses a default message. + * + * @throws ChaosException if random chance triggers failure + */ + public void maybeFail() { + maybeFail("Chaos-injected failure"); + } + + /** + * Returns true based on configured failure probability. + * + *

Useful for conditional logic instead of exceptions. + * + * @return true if failure should be simulated + */ + public boolean shouldFail() { + if (failureProbability <= 0) { + return false; + } + return random.get().nextDouble() < failureProbability; + } + + /** + * Returns the total number of delays that were injected. + * + * @return delay count + */ + public long delayCount() { + return delayCount.get(); + } + + /** + * Returns the total number of failures that were injected. + * + * @return failure count + */ + public long failureCount() { + return failureCount.get(); + } + + /** + * Resets the delay and failure counters. + */ + public void resetCounters() { + delayCount.set(0); + failureCount.set(0); + } + + /** + * Builder for ChaosInjector. + */ + public static final class Builder { + private double failureProbability = 0.0; + private int minDelayMs = 0; + private int maxDelayMs = 0; + + private Builder() { + } + + /** + * Sets the failure probability. + * + * @param probability value between 0.0 (never fail) and 1.0 (always fail) + * @return this builder + */ + public Builder failureProbability(double probability) { + if (probability < 0.0 || probability > 1.0) { + throw new IllegalArgumentException( + "Probability must be between 0.0 and 1.0: " + probability + ); + } + this.failureProbability = probability; + return this; + } + + /** + * Sets the random delay range. + * + * @param minMs minimum delay in milliseconds (inclusive) + * @param maxMs maximum delay in milliseconds (inclusive) + * @return this builder + */ + public Builder delayRange(int minMs, int maxMs) { + if (minMs < 0 || maxMs < minMs) { + throw new IllegalArgumentException( + "Invalid delay range: " + minMs + " - " + maxMs + ); + } + this.minDelayMs = minMs; + this.maxDelayMs = maxMs; + return this; + } + + /** + * Builds the ChaosInjector. 
+ * + * @return a new ChaosInjector + */ + public ChaosInjector build() { + return new ChaosInjector(this); + } + } + + /** + * Exception thrown by chaos injection. + */ + public static final class ChaosException extends RuntimeException { + public ChaosException(String message) { + super(message); + } + } +} diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/util/FailingDataFix.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/util/FailingDataFix.java new file mode 100644 index 0000000..debc03e --- /dev/null +++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/util/FailingDataFix.java @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2025 Splatgames.de Software and Contributors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package de.splatgames.aether.datafixers.functional.chaos.util; + +import de.splatgames.aether.datafixers.api.DataVersion; +import de.splatgames.aether.datafixers.api.TypeReference; +import de.splatgames.aether.datafixers.api.dynamic.Dynamic; +import de.splatgames.aether.datafixers.api.fix.DataFix; +import de.splatgames.aether.datafixers.api.fix.DataFixerContext; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A DataFix that can be configured to fail randomly for chaos testing. + * + *

This fix wraps the chaos injection logic and can be used to test + * error handling in the data fixer pipeline. + * + * @param the element type + */ +public final class FailingDataFix implements DataFix { + + private final String name; + private final DataVersion fromVersion; + private final DataVersion toVersion; + private final ChaosInjector chaosInjector; + private final AtomicInteger applyCount = new AtomicInteger(0); + private final AtomicInteger failCount = new AtomicInteger(0); + + private FailingDataFix(Builder builder) { + this.name = builder.name; + this.fromVersion = builder.fromVersion; + this.toVersion = builder.toVersion; + this.chaosInjector = builder.chaosInjector; + } + + /** + * Creates a builder for a FailingDataFix. + * + * @param the element type + * @return a new builder + */ + public static Builder builder() { + return new Builder<>(); + } + + @Override + public @NotNull String name() { + return name; + } + + @Override + public @NotNull DataVersion fromVersion() { + return fromVersion; + } + + @Override + public @NotNull DataVersion toVersion() { + return toVersion; + } + + @Override + public @NotNull Dynamic apply( + @NotNull TypeReference type, + @NotNull Dynamic input, + @NotNull DataFixerContext context + ) { + applyCount.incrementAndGet(); + + // Inject delay if configured + chaosInjector.maybeDelay(); + + // Inject failure if configured + if (chaosInjector.shouldFail()) { + failCount.incrementAndGet(); + throw new ChaosInjector.ChaosException( + "Chaos failure in fix '" + name + "' at version " + fromVersion + " -> " + toVersion + ); + } + + // Mark as processed and return + return input.set("chaos_processed", input.createBoolean(true)); + } + + /** + * Returns the number of times this fix has been applied. + * + * @return apply count + */ + public int applyCount() { + return applyCount.get(); + } + + /** + * Returns the number of times this fix has triggered a failure. 
+ * + * @return fail count + */ + public int failCount() { + return failCount.get(); + } + + /** + * Resets the apply and fail counters. + */ + public void resetCounters() { + applyCount.set(0); + failCount.set(0); + } + + /** + * Builder for FailingDataFix. + * + * @param the element type + */ + public static final class Builder { + private String name = "chaos_fix"; + private DataVersion fromVersion = new DataVersion(1); + private DataVersion toVersion = new DataVersion(2); + private ChaosInjector chaosInjector = ChaosInjector.builder().build(); + + private Builder() { + } + + /** + * Sets the fix name. + * + * @param name the fix name + * @return this builder + */ + public Builder name(String name) { + this.name = name; + return this; + } + + /** + * Sets the version range. + * + * @param from the source version + * @param to the target version + * @return this builder + */ + public Builder versions(int from, int to) { + this.fromVersion = new DataVersion(from); + this.toVersion = new DataVersion(to); + return this; + } + + /** + * Sets the chaos injector to use. + * + * @param chaosInjector the chaos injector + * @return this builder + */ + public Builder chaosInjector(ChaosInjector chaosInjector) { + this.chaosInjector = chaosInjector; + return this; + } + + /** + * Configures the fix to fail with the given probability. + * + * @param probability failure probability (0.0 to 1.0) + * @return this builder + */ + public Builder failureProbability(double probability) { + this.chaosInjector = ChaosInjector.withFailureProbability(probability); + return this; + } + + /** + * Configures the fix to introduce random delays. + * + * @param minMs minimum delay in milliseconds + * @param maxMs maximum delay in milliseconds + * @return this builder + */ + public Builder randomDelay(int minMs, int maxMs) { + this.chaosInjector = ChaosInjector.builder() + .delayRange(minMs, maxMs) + .failureProbability( + this.chaosInjector != null ? 
0 : 0 // Keep existing probability if set + ) + .build(); + return this; + } + + /** + * Builds the FailingDataFix. + * + * @return a new FailingDataFix + */ + public FailingDataFix build() { + return new FailingDataFix<>(this); + } + } +} diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/HighConcurrencyMigrationStressIT.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/HighConcurrencyMigrationStressIT.java new file mode 100644 index 0000000..4dd0ff3 --- /dev/null +++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/HighConcurrencyMigrationStressIT.java @@ -0,0 +1,469 @@ +/* + * Copyright (c) 2025 Splatgames.de Software and Contributors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package de.splatgames.aether.datafixers.functional.stress; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import de.splatgames.aether.datafixers.api.DataVersion; +import de.splatgames.aether.datafixers.api.TypeReference; +import de.splatgames.aether.datafixers.api.dynamic.Dynamic; +import de.splatgames.aether.datafixers.api.fix.DataFix; +import de.splatgames.aether.datafixers.api.fix.DataFixer; +import de.splatgames.aether.datafixers.api.fix.DataFixerContext; +import de.splatgames.aether.datafixers.codec.json.gson.GsonOps; +import de.splatgames.aether.datafixers.core.fix.DataFixerBuilder; +import de.splatgames.aether.datafixers.functional.stress.util.StressTestConfig; +import de.splatgames.aether.datafixers.functional.stress.util.StressTestMetrics; +import de.splatgames.aether.datafixers.functional.stress.util.ThreadOrchestrator; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Stress tests for high-concurrency migration scenarios. + * + *

Tests DataFixer behavior with 100+ concurrent threads performing + * simultaneous migrations. Verifies: + *

+ * + *

Configuration via system properties: + *

+ * + *

Run with: {@code mvn verify -Pstress -Dgroups=stress} + */ +@DisplayName("High Concurrency Migration Stress Tests") +@Tag("stress") +@Tag("integration") +class HighConcurrencyMigrationStressIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(HighConcurrencyMigrationStressIT.class); + private static final TypeReference STRESS_TYPE = new TypeReference("stress_entity"); + + @Nested + @DisplayName("Concurrent Single Fix Migrations") + class ConcurrentSingleFix { + + @Test + @Timeout(value = 5, unit = TimeUnit.MINUTES) + @DisplayName("handles 100+ concurrent single-fix migrations") + void handles100ConcurrentSingleFixMigrations() throws Exception { + int threadCount = StressTestConfig.threadCount(); + int opsPerThread = StressTestConfig.operationsPerThread(); + + LOGGER.info("Starting stress test with {} threads, {} ops/thread", + threadCount, opsPerThread); + + AtomicInteger fixApplyCount = new AtomicInteger(0); + StressTestMetrics metrics = new StressTestMetrics(); + + DataFixer fixer = new DataFixerBuilder(new DataVersion(2)) + .addFix(STRESS_TYPE, createCountingFix("single_fix", 1, 2, fixApplyCount)) + .build(); + + try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) { + for (int t = 0; t < threadCount; t++) { + final int threadId = t; + orchestrator.submit(() -> { + for (int i = 0; i < opsPerThread; i++) { + long startNanos = System.nanoTime(); + boolean success = true; + + try { + JsonObject inputObj = new JsonObject(); + inputObj.addProperty("id", threadId * 100_000L + i); + inputObj.addProperty("threadId", threadId); + Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj); + + Dynamic result = fixer.update( + STRESS_TYPE, input, + new DataVersion(1), new DataVersion(2) + ); + + // Verify result + assertThat(result.get("processed").asBoolean().result()) + .contains(true); + assertThat(result.get("id").asLong().result()) + .contains(threadId * 100_000L + i); + } catch (Exception e) { + success = false; + throw 
e; + } finally { + metrics.recordOperation(System.nanoTime() - startNanos, success); + } + } + }); + } + + Instant startTime = Instant.now(); + orchestrator.startAll(); + boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(5)); + + Duration elapsed = Duration.between(startTime, Instant.now()); + metrics.report("handles100ConcurrentSingleFixMigrations", elapsed); + + assertThat(completed).as("All threads completed within timeout").isTrue(); + assertThat(orchestrator.errors()).as("No exceptions thrown").isEmpty(); + assertThat(fixApplyCount.get()) + .as("All fixes applied") + .isEqualTo(threadCount * opsPerThread); + } + } + + @Test + @Timeout(value = 5, unit = TimeUnit.MINUTES) + @DisplayName("maintains data integrity under high contention") + void maintainsDataIntegrityUnderHighContention() throws Exception { + int threadCount = StressTestConfig.threadCount(); + int opsPerThread = StressTestConfig.operationsPerThread(); + + // Track all processed IDs to verify no duplicates or losses + Set processedIds = ConcurrentHashMap.newKeySet(); + + DataFixer fixer = new DataFixerBuilder(new DataVersion(2)) + .addFix(STRESS_TYPE, createSimpleFix("integrity_fix", 1, 2)) + .build(); + + try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) { + for (int t = 0; t < threadCount; t++) { + final int threadId = t; + orchestrator.submit(() -> { + for (int i = 0; i < opsPerThread; i++) { + long uniqueId = threadId * 100_000L + i; + + JsonObject inputObj = new JsonObject(); + inputObj.addProperty("id", uniqueId); + Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj); + + Dynamic result = fixer.update( + STRESS_TYPE, input, + new DataVersion(1), new DataVersion(2) + ); + + // Verify the ID wasn't corrupted + Long resultId = result.get("id").asLong().result().orElse(null); + assertThat(resultId).isEqualTo(uniqueId); + + // Track that we processed this ID + processedIds.add(uniqueId); + } + }); + } + + orchestrator.startAll(); + boolean 
completed = orchestrator.awaitCompletion(Duration.ofMinutes(5)); + + assertThat(completed).isTrue(); + assertThat(orchestrator.errors()).isEmpty(); + + // Verify all expected IDs were processed exactly once + assertThat(processedIds) + .as("All unique IDs processed") + .hasSize(threadCount * opsPerThread); + } + } + } + + @Nested + @DisplayName("Concurrent Chain Migrations") + class ConcurrentChainMigration { + + @Test + @Timeout(value = 10, unit = TimeUnit.MINUTES) + @DisplayName("handles concurrent 10-fix chain migrations") + void handlesConcurrentChainMigrations() throws Exception { + int threadCount = Math.min(StressTestConfig.threadCount(), 50); // Reduce for chain tests + int opsPerThread = StressTestConfig.operationsPerThread() / 10; // Reduce ops for longer chains + + LOGGER.info("Starting chain migration test with {} threads, {} ops/thread, 10-fix chain", + threadCount, opsPerThread); + + AtomicInteger totalFixApplications = new AtomicInteger(0); + StressTestMetrics metrics = new StressTestMetrics(); + + // Build a fixer with 10 chained fixes + DataFixerBuilder builder = new DataFixerBuilder(new DataVersion(11)); + for (int v = 1; v <= 10; v++) { + final int version = v; + builder.addFix(STRESS_TYPE, new DataFix() { + @Override + public @NotNull String name() { + return "chain_fix_v" + version; + } + + @Override + public @NotNull DataVersion fromVersion() { + return new DataVersion(version); + } + + @Override + public @NotNull DataVersion toVersion() { + return new DataVersion(version + 1); + } + + @Override + public @NotNull Dynamic apply( + @NotNull TypeReference type, + @NotNull Dynamic input, + @NotNull DataFixerContext context + ) { + totalFixApplications.incrementAndGet(); + int currentStep = input.get("migration_steps").asInt().result().orElse(0); + return input.set("migration_steps", input.createInt(currentStep + 1)); + } + }); + } + DataFixer fixer = builder.build(); + + try (ThreadOrchestrator orchestrator = 
ThreadOrchestrator.withThreads(threadCount)) {
+                for (int t = 0; t < threadCount; t++) {
+                    final int threadId = t;
+                    orchestrator.submit(() -> {
+                        for (int i = 0; i < opsPerThread; i++) {
+                            long startNanos = System.nanoTime();
+                            boolean success = true;
+
+                            try {
+                                JsonObject inputObj = new JsonObject();
+                                inputObj.addProperty("id", threadId * 100_000L + i);
+                                inputObj.addProperty("migration_steps", 0);
+                                Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj);
+
+                                Dynamic result = fixer.update(
+                                        STRESS_TYPE, input,
+                                        new DataVersion(1), new DataVersion(11)
+                                );
+
+                                // Verify all 10 fixes were applied
+                                assertThat(result.get("migration_steps").asInt().result())
+                                        .contains(10);
+                            } catch (Throwable e) {
+                                // Throwable, not Exception: a failed assertThat raises
+                                // AssertionError (an Error), which would otherwise skip
+                                // this block and be recorded as a successful operation.
+                                // Precise rethrow keeps the thrown types unchanged.
+                                success = false;
+                                throw e;
+                            } finally {
+                                metrics.recordOperation(System.nanoTime() - startNanos, success);
+                            }
+                        }
+                    });
+                }
+
+                Instant startTime = Instant.now();
+                orchestrator.startAll();
+                boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(10));
+
+                Duration elapsed = Duration.between(startTime, Instant.now());
+                metrics.report("handlesConcurrentChainMigrations", elapsed);
+
+                assertThat(completed).isTrue();
+                assertThat(orchestrator.errors()).isEmpty();
+
+                // Each operation applies 10 fixes
+                assertThat(totalFixApplications.get())
+                        .isEqualTo(threadCount * opsPerThread * 10);
+            }
+        }
+
+        @Test
+        @Timeout(value = 10, unit = TimeUnit.MINUTES)
+        @DisplayName("handles mixed version range migrations concurrently")
+        void handlesMixedVersionRanges() throws Exception {
+            int threadCount = StressTestConfig.threadCount();
+            int opsPerThread = StressTestConfig.operationsPerThread() / 5;
+
+            // Build fixer with 5 fixes
+            DataFixerBuilder builder = new DataFixerBuilder(new DataVersion(6));
+            for (int v = 1; v <= 5; v++) {
+                builder.addFix(STRESS_TYPE, createSimpleFix("mix_fix_v" + v, v, v + 1));
+            }
+            DataFixer fixer = builder.build();
+
+            try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) {
+                for (int t = 0; t <
threadCount; t++) { + final int threadId = t; + orchestrator.submit(() -> { + for (int i = 0; i < opsPerThread; i++) { + // Different threads use different version ranges + int fromVersion = (threadId % 5) + 1; + int toVersion = 6; + + JsonObject inputObj = new JsonObject(); + inputObj.addProperty("id", threadId * 100_000L + i); + inputObj.addProperty("start_version", fromVersion); + Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj); + + Dynamic result = fixer.update( + STRESS_TYPE, input, + new DataVersion(fromVersion), new DataVersion(toVersion) + ); + + // Verify the data is still intact + assertThat(result.get("id").asLong().result()) + .contains(threadId * 100_000L + i); + } + }); + } + + orchestrator.startAll(); + boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(10)); + + assertThat(completed).isTrue(); + assertThat(orchestrator.errors()).isEmpty(); + } + } + } + + @Nested + @DisplayName("Throughput Scaling") + class ScalingAnalysis { + + @ParameterizedTest(name = "threads={0}") + @ValueSource(ints = {10, 25, 50, 100}) + @Timeout(value = 5, unit = TimeUnit.MINUTES) + @DisplayName("throughput scales with thread count") + void throughputScalesWithThreadCount(int threadCount) throws Exception { + int opsPerThread = 1000; + StressTestMetrics metrics = new StressTestMetrics(); + + DataFixer fixer = new DataFixerBuilder(new DataVersion(2)) + .addFix(STRESS_TYPE, createSimpleFix("scale_fix", 1, 2)) + .build(); + + try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) { + for (int t = 0; t < threadCount; t++) { + final int threadId = t; + orchestrator.submit(() -> { + for (int i = 0; i < opsPerThread; i++) { + long startNanos = System.nanoTime(); + + JsonObject inputObj = new JsonObject(); + inputObj.addProperty("id", threadId * 100_000L + i); + Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj); + + fixer.update( + STRESS_TYPE, input, + new DataVersion(1), new DataVersion(2) + ); + + 
metrics.recordOperation(System.nanoTime() - startNanos, true); + } + }); + } + + Instant startTime = Instant.now(); + orchestrator.startAll(); + boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(5)); + Duration elapsed = Duration.between(startTime, Instant.now()); + + LOGGER.info("Threads: {}, Throughput: {} ops/sec, Avg latency: {} us", + threadCount, + String.format("%.2f", metrics.throughputPerSecond(elapsed)), + metrics.averageLatencyNanos() / 1000); + + assertThat(completed).isTrue(); + assertThat(orchestrator.errors()).isEmpty(); + assertThat(metrics.totalOperations()).isEqualTo((long) threadCount * opsPerThread); + } + } + } + + // Helper methods + + private DataFix createSimpleFix(String name, int from, int to) { + return new DataFix<>() { + @Override + public @NotNull String name() { + return name; + } + + @Override + public @NotNull DataVersion fromVersion() { + return new DataVersion(from); + } + + @Override + public @NotNull DataVersion toVersion() { + return new DataVersion(to); + } + + @Override + public @NotNull Dynamic apply( + @NotNull TypeReference type, + @NotNull Dynamic input, + @NotNull DataFixerContext context + ) { + return input.set("processed", input.createBoolean(true)); + } + }; + } + + private DataFix createCountingFix(String name, int from, int to, AtomicInteger counter) { + return new DataFix<>() { + @Override + public @NotNull String name() { + return name; + } + + @Override + public @NotNull DataVersion fromVersion() { + return new DataVersion(from); + } + + @Override + public @NotNull DataVersion toVersion() { + return new DataVersion(to); + } + + @Override + public @NotNull Dynamic apply( + @NotNull TypeReference type, + @NotNull Dynamic input, + @NotNull DataFixerContext context + ) { + counter.incrementAndGet(); + return input.set("processed", input.createBoolean(true)); + } + }; + } +} diff --git 
a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/MemoryPressureStressIT.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/MemoryPressureStressIT.java new file mode 100644 index 0000000..759bc7c --- /dev/null +++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/MemoryPressureStressIT.java @@ -0,0 +1,387 @@ +/* + * Copyright (c) 2025 Splatgames.de Software and Contributors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package de.splatgames.aether.datafixers.functional.stress; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import de.splatgames.aether.datafixers.api.DataVersion; +import de.splatgames.aether.datafixers.api.TypeReference; +import de.splatgames.aether.datafixers.api.dynamic.Dynamic; +import de.splatgames.aether.datafixers.api.fix.DataFix; +import de.splatgames.aether.datafixers.api.fix.DataFixer; +import de.splatgames.aether.datafixers.api.fix.DataFixerContext; +import de.splatgames.aether.datafixers.codec.json.gson.GsonOps; +import de.splatgames.aether.datafixers.core.fix.DataFixerBuilder; +import de.splatgames.aether.datafixers.functional.stress.util.StressTestConfig; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests memory behavior under stress. + * + *

Verifies: + *

+ * + *

Run with: {@code mvn verify -Pstress -Dgroups=stress} + */ +@DisplayName("Memory Pressure Stress Tests") +@Tag("stress") +@Tag("integration") +class MemoryPressureStressIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(MemoryPressureStressIT.class); + private static final TypeReference MEMORY_TYPE = new TypeReference("memory_entity"); + private static final MemoryMXBean MEMORY_BEAN = ManagementFactory.getMemoryMXBean(); + + @Nested + @DisplayName("Memory Leak Detection") + class MemoryLeakDetection { + + @Test + @Timeout(value = 10, unit = TimeUnit.MINUTES) + @DisplayName("no memory leaks in repeated migration cycles") + void noMemoryLeaksInRepeatedMigrationCycles() throws Exception { + int cycles = 10; + int operationsPerCycle = 10_000; + + LOGGER.info("Starting memory leak detection: {} cycles, {} ops/cycle", + cycles, operationsPerCycle); + + List heapUsageAfterGc = new ArrayList<>(); + + for (int cycle = 0; cycle < cycles; cycle++) { + DataFixer fixer = new DataFixerBuilder(new DataVersion(2)) + .addFix(MEMORY_TYPE, createSimpleFix("memory_fix_" + cycle, 1, 2)) + .build(); + + // Perform many migrations + for (int i = 0; i < operationsPerCycle; i++) { + JsonObject inputObj = new JsonObject(); + inputObj.addProperty("id", cycle * 1_000_000L + i); + inputObj.addProperty("data", "test_data_" + i); + Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj); + + fixer.update( + MEMORY_TYPE, input, + new DataVersion(1), new DataVersion(2) + ); + } + + // Force GC and measure heap + forceGc(); + MemoryUsage heapUsage = MEMORY_BEAN.getHeapMemoryUsage(); + heapUsageAfterGc.add(heapUsage.getUsed()); + + LOGGER.info("Cycle {}: Heap after GC = {} MB", + cycle + 1, heapUsage.getUsed() / (1024 * 1024)); + } + + // Analyze heap growth trend + // First few cycles may show growth due to JIT compilation and class loading + // After warm-up (cycles 3+), heap should be relatively stable + List stablePhase = heapUsageAfterGc.subList(3, heapUsageAfterGc.size()); 
+ + long minHeap = Collections.min(stablePhase); + long maxHeap = Collections.max(stablePhase); + double growthRatio = (double) maxHeap / minHeap; + + LOGGER.info("Stable phase heap range: {} MB - {} MB (ratio: {})", + minHeap / (1024 * 1024), + maxHeap / (1024 * 1024), + String.format("%.2f", growthRatio)); + + // Allow up to 50% heap variation during stable phase + // This accounts for GC timing variations + assertThat(growthRatio) + .as("Heap growth ratio should be stable (no unbounded leak)") + .isLessThan(1.5); + } + + @Test + @Timeout(value = 5, unit = TimeUnit.MINUTES) + @DisplayName("fixer instances can be garbage collected") + void fixerInstancesCanBeGarbageCollected() throws Exception { + int fixerCount = 100; + int opsPerFixer = 1000; + + LOGGER.info("Creating and discarding {} fixers", fixerCount); + + long heapBefore = getHeapUsedAfterGc(); + + for (int f = 0; f < fixerCount; f++) { + DataFixer fixer = new DataFixerBuilder(new DataVersion(2)) + .addFix(MEMORY_TYPE, createSimpleFix("disposable_fix_" + f, 1, 2)) + .build(); + + for (int i = 0; i < opsPerFixer; i++) { + JsonObject inputObj = new JsonObject(); + inputObj.addProperty("id", i); + Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj); + + fixer.update( + MEMORY_TYPE, input, + new DataVersion(1), new DataVersion(2) + ); + } + // fixer goes out of scope and should be eligible for GC + } + + long heapAfter = getHeapUsedAfterGc(); + + LOGGER.info("Heap before: {} MB, after: {} MB", + heapBefore / (1024 * 1024), + heapAfter / (1024 * 1024)); + + // Heap should not have grown significantly + // Allow some growth for class metadata and JIT artifacts + long heapGrowth = heapAfter - heapBefore; + long maxAllowedGrowth = 50 * 1024 * 1024; // 50 MB + + assertThat(heapGrowth) + .as("Heap growth after disposing fixers") + .isLessThan(maxAllowedGrowth); + } + } + + @Nested + @DisplayName("GC Pressure Handling") + class GcPressureHandling { + + @Test + @Timeout(value = 5, unit = TimeUnit.MINUTES) + 
@DisplayName("handles GC pressure gracefully") + void handlesGcPressureGracefully() throws Exception { + int threadCount = 20; + int opsPerThread = 10_000; + + LOGGER.info("Starting GC pressure test: {} threads, {} ops/thread", + threadCount, opsPerThread); + + AtomicLong totalOps = new AtomicLong(0); + List errors = Collections.synchronizedList(new ArrayList<>()); + + DataFixer fixer = new DataFixerBuilder(new DataVersion(2)) + .addFix(MEMORY_TYPE, createLargeObjectFix("gc_pressure_fix", 1, 2)) + .build(); + + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + CountDownLatch latch = new CountDownLatch(threadCount); + + for (int t = 0; t < threadCount; t++) { + final int threadId = t; + executor.submit(() -> { + try { + for (int i = 0; i < opsPerThread; i++) { + // Create large input to generate allocation pressure + JsonObject inputObj = new JsonObject(); + inputObj.addProperty("id", threadId * 1_000_000L + i); + inputObj.addProperty("largeField", createLargeString(1024)); + Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj); + + Dynamic result = fixer.update( + MEMORY_TYPE, input, + new DataVersion(1), new DataVersion(2) + ); + + assertThat(result.get("processed").asBoolean().result()) + .contains(true); + + totalOps.incrementAndGet(); + } + } catch (Exception e) { + errors.add(e); + } finally { + latch.countDown(); + } + }); + } + + boolean completed = latch.await(5, TimeUnit.MINUTES); + executor.shutdown(); + + assertThat(completed).isTrue(); + assertThat(errors).isEmpty(); + assertThat(totalOps.get()) + .isEqualTo((long) threadCount * opsPerThread); + + LOGGER.info("Completed {} operations under GC pressure", totalOps.get()); + } + + @Test + @Timeout(value = 5, unit = TimeUnit.MINUTES) + @DisplayName("maintains correctness during GC pauses") + void maintainsCorrectnessDuringGcPauses() throws Exception { + int iterations = 100; + int opsPerIteration = 1000; + + LOGGER.info("Testing correctness across {} iterations with GC", 
iterations); + + DataFixer fixer = new DataFixerBuilder(new DataVersion(2)) + .addFix(MEMORY_TYPE, createSimpleFix("gc_correctness_fix", 1, 2)) + .build(); + + for (int iter = 0; iter < iterations; iter++) { + // Force GC at various points to trigger potential issues + if (iter % 10 == 0) { + System.gc(); + } + + for (int i = 0; i < opsPerIteration; i++) { + long uniqueId = iter * 1_000_000L + i; + + JsonObject inputObj = new JsonObject(); + inputObj.addProperty("id", uniqueId); + inputObj.addProperty("checksum", uniqueId * 31); + Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj); + + Dynamic result = fixer.update( + MEMORY_TYPE, input, + new DataVersion(1), new DataVersion(2) + ); + + // Verify data integrity + Long resultId = result.get("id").asLong().result().orElse(null); + Long resultChecksum = result.get("checksum").asLong().result().orElse(null); + + assertThat(resultId).isEqualTo(uniqueId); + assertThat(resultChecksum).isEqualTo(uniqueId * 31); + } + } + + LOGGER.info("Completed correctness test with {} total operations", + iterations * opsPerIteration); + } + } + + // Helper methods + + private void forceGc() { + System.gc(); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + System.gc(); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + private long getHeapUsedAfterGc() { + forceGc(); + return MEMORY_BEAN.getHeapMemoryUsage().getUsed(); + } + + private String createLargeString(int size) { + StringBuilder sb = new StringBuilder(size); + for (int i = 0; i < size; i++) { + sb.append((char) ('a' + (i % 26))); + } + return sb.toString(); + } + + private DataFix createSimpleFix(String name, int from, int to) { + return new DataFix<>() { + @Override + public @NotNull String name() { + return name; + } + + @Override + public @NotNull DataVersion fromVersion() { + return new DataVersion(from); + } + + @Override + public @NotNull 
DataVersion toVersion() { + return new DataVersion(to); + } + + @Override + public @NotNull Dynamic apply( + @NotNull TypeReference type, + @NotNull Dynamic input, + @NotNull DataFixerContext context + ) { + return input.set("processed", input.createBoolean(true)); + } + }; + } + + private DataFix createLargeObjectFix(String name, int from, int to) { + return new DataFix<>() { + @Override + public @NotNull String name() { + return name; + } + + @Override + public @NotNull DataVersion fromVersion() { + return new DataVersion(from); + } + + @Override + public @NotNull DataVersion toVersion() { + return new DataVersion(to); + } + + @Override + public @NotNull Dynamic apply( + @NotNull TypeReference type, + @NotNull Dynamic input, + @NotNull DataFixerContext context + ) { + // Create some intermediate objects to increase allocation pressure + String largeField = input.get("largeField").asString().result().orElse(""); + String processed = largeField.toUpperCase(); + return input + .set("processed", input.createBoolean(true)) + .set("processedField", input.createString(processed.substring(0, Math.min(100, processed.length())))); + } + }; + } +} diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/MixedRegistryAccessStressIT.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/MixedRegistryAccessStressIT.java new file mode 100644 index 0000000..021c726 --- /dev/null +++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/MixedRegistryAccessStressIT.java @@ -0,0 +1,288 @@ +/* + * Copyright (c) 2025 Splatgames.de Software and Contributors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, 
publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package de.splatgames.aether.datafixers.functional.stress; + +import de.splatgames.aether.datafixers.api.DataVersion; +import de.splatgames.aether.datafixers.api.TypeReference; +import de.splatgames.aether.datafixers.api.type.Type; +import de.splatgames.aether.datafixers.core.type.SimpleTypeRegistry; +import de.splatgames.aether.datafixers.functional.stress.util.StressTestConfig; +import de.splatgames.aether.datafixers.functional.stress.util.ThreadOrchestrator; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests mixed read/write patterns on registries under concurrent access. + * + *

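<p>Verifies:
+ * <ul>
+ *   <li>a frozen registry serves massive and interleaved concurrent reads</li>
+ *   <li>registrations attempted after freeze are rejected under contention</li>
+ *   <li>concurrent reads complete while types are being registered sequentially</li>
+ * </ul>
+ *
+ * <p>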

Run with: {@code mvn verify -Pstress -Dgroups=stress} + */ +@DisplayName("Mixed Registry Access Stress Tests") +@Tag("stress") +@Tag("integration") +class MixedRegistryAccessStressIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(MixedRegistryAccessStressIT.class); + + @Nested + @DisplayName("Post-Freeze Concurrent Reads") + class PostFreezeConcurrentReads { + + @Test + @Timeout(value = 5, unit = TimeUnit.MINUTES) + @DisplayName("frozen registry handles massive concurrent reads") + void frozenRegistryHandlesMassiveConcurrentReads() throws Exception { + int threadCount = StressTestConfig.threadCount(); + int readsPerThread = StressTestConfig.operationsPerThread(); + int typeCount = 100; + + LOGGER.info("Starting registry read stress test: {} threads, {} reads/thread, {} types", + threadCount, readsPerThread, typeCount); + + // Setup: Create and freeze a registry with many types + SimpleTypeRegistry registry = new SimpleTypeRegistry(); + for (int i = 0; i < typeCount; i++) { + Type type = Type.named("stress_type_" + i, Type.STRING); + registry.register(type); + } + registry.freeze(); + + AtomicInteger successfulReads = new AtomicInteger(0); + + try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) { + for (int t = 0; t < threadCount; t++) { + orchestrator.submit(() -> { + for (int i = 0; i < readsPerThread; i++) { + int typeIndex = i % typeCount; + TypeReference ref = new TypeReference("stress_type_" + typeIndex); + + Type type = registry.get(ref); + + assertThat(type).isNotNull(); + assertThat(type.reference().getId()).isEqualTo("stress_type_" + typeIndex); + + successfulReads.incrementAndGet(); + } + }); + } + + orchestrator.startAll(); + boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(5)); + + assertThat(completed).isTrue(); + assertThat(orchestrator.errors()).isEmpty(); + assertThat(successfulReads.get()) + .isEqualTo(threadCount * readsPerThread); + } + + LOGGER.info("Completed {} successful 
reads", successfulReads.get()); + } + + @Test + @Timeout(value = 2, unit = TimeUnit.MINUTES) + @DisplayName("rejects modifications after freeze under contention") + void rejectsModificationsAfterFreezeUnderContention() throws Exception { + int threadCount = 50; + int attemptsPerThread = 100; + + SimpleTypeRegistry registry = new SimpleTypeRegistry(); + registry.register(Type.named("initial_type", Type.STRING)); + registry.freeze(); + + AtomicInteger rejectionCount = new AtomicInteger(0); + + try (ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(threadCount)) { + for (int t = 0; t < threadCount; t++) { + final int threadId = t; + orchestrator.submit(() -> { + for (int i = 0; i < attemptsPerThread; i++) { + try { + Type newType = Type.named( + "new_type_" + threadId + "_" + i, + Type.STRING + ); + registry.register(newType); + // Should not reach here + throw new AssertionError("Expected IllegalStateException"); + } catch (IllegalStateException e) { + // Expected behavior + rejectionCount.incrementAndGet(); + } + } + }); + } + + orchestrator.startAll(); + boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(2)); + + assertThat(completed).isTrue(); + assertThat(orchestrator.errors()).isEmpty(); + assertThat(rejectionCount.get()) + .as("All modification attempts rejected") + .isEqualTo(threadCount * attemptsPerThread); + } + } + + @Test + @Timeout(value = 5, unit = TimeUnit.MINUTES) + @DisplayName("handles interleaved reads of different types") + void handlesInterleavedReadsOfDifferentTypes() throws Exception { + int threadCount = StressTestConfig.threadCount(); + int readsPerThread = StressTestConfig.operationsPerThread(); + int typeCount = 500; + + SimpleTypeRegistry registry = new SimpleTypeRegistry(); + for (int i = 0; i < typeCount; i++) { + registry.register(Type.named("interleaved_type_" + i, Type.STRING)); + } + registry.freeze(); + + AtomicInteger successfulReads = new AtomicInteger(0); + + try (ThreadOrchestrator orchestrator = 
ThreadOrchestrator.withThreads(threadCount)) { + for (int t = 0; t < threadCount; t++) { + final int threadId = t; + orchestrator.submit(() -> { + // Each thread accesses types in a different pattern + int startOffset = threadId * 7; // Prime number for better distribution + for (int i = 0; i < readsPerThread; i++) { + int typeIndex = (startOffset + i * 13) % typeCount; + TypeReference ref = new TypeReference("interleaved_type_" + typeIndex); + + Type type = registry.get(ref); + assertThat(type).isNotNull(); + + successfulReads.incrementAndGet(); + } + }); + } + + orchestrator.startAll(); + boolean completed = orchestrator.awaitCompletion(Duration.ofMinutes(5)); + + assertThat(completed).isTrue(); + assertThat(orchestrator.errors()).isEmpty(); + } + + LOGGER.info("Completed {} interleaved reads across {} types", + successfulReads.get(), typeCount); + } + } + + @Nested + @DisplayName("Pre-Freeze Concurrent Access") + class PreFreezeConcurrentAccess { + + @Test + @Timeout(value = 2, unit = TimeUnit.MINUTES) + @DisplayName("handles concurrent reads during sequential registration") + void handlesConcurrentReadsDuringSequentialRegistration() throws Exception { + int readerThreads = 20; + int readsPerThread = 1000; + int typeCount = 100; + + SimpleTypeRegistry registry = new SimpleTypeRegistry(); + AtomicInteger registeredCount = new AtomicInteger(0); + AtomicInteger successfulReads = new AtomicInteger(0); + + ExecutorService executor = Executors.newFixedThreadPool(readerThreads + 1); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch completionLatch = new CountDownLatch(readerThreads + 1); + List errors = Collections.synchronizedList(new ArrayList<>()); + + // Registration thread (sequential, to avoid race in the non-thread-safe registry) + executor.submit(() -> { + try { + startLatch.await(); + for (int i = 0; i < typeCount; i++) { + registry.register(Type.named("concurrent_type_" + i, Type.STRING)); + registeredCount.incrementAndGet(); + 
Thread.sleep(1); // Small delay to allow reads to interleave + } + } catch (Exception e) { + errors.add(e); + } finally { + completionLatch.countDown(); + } + }); + + // Reader threads + for (int t = 0; t < readerThreads; t++) { + executor.submit(() -> { + try { + startLatch.await(); + for (int i = 0; i < readsPerThread; i++) { + int currentRegistered = registeredCount.get(); + if (currentRegistered > 0) { + int typeIndex = i % currentRegistered; + TypeReference ref = new TypeReference("concurrent_type_" + typeIndex); + Type type = registry.get(ref); + if (type != null) { + successfulReads.incrementAndGet(); + } + } + Thread.yield(); // Allow other threads to run + } + } catch (Exception e) { + errors.add(e); + } finally { + completionLatch.countDown(); + } + }); + } + + startLatch.countDown(); + boolean completed = completionLatch.await(2, TimeUnit.MINUTES); + executor.shutdown(); + + assertThat(completed).isTrue(); + assertThat(registeredCount.get()).isEqualTo(typeCount); + + // Note: Some reads may have occurred before any types were registered, + // so we just verify the test completed without errors + LOGGER.info("Completed with {} registered types and {} successful reads", + registeredCount.get(), successfulReads.get()); + } + } +} diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/SustainedLoadMigrationStressIT.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/SustainedLoadMigrationStressIT.java new file mode 100644 index 0000000..63293ab --- /dev/null +++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/SustainedLoadMigrationStressIT.java @@ -0,0 +1,405 @@ +/* + * Copyright (c) 2025 Splatgames.de Software and Contributors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * 
in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package de.splatgames.aether.datafixers.functional.stress; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import de.splatgames.aether.datafixers.api.DataVersion; +import de.splatgames.aether.datafixers.api.TypeReference; +import de.splatgames.aether.datafixers.api.dynamic.Dynamic; +import de.splatgames.aether.datafixers.api.fix.DataFix; +import de.splatgames.aether.datafixers.api.fix.DataFixer; +import de.splatgames.aether.datafixers.api.fix.DataFixerContext; +import de.splatgames.aether.datafixers.codec.json.gson.GsonOps; +import de.splatgames.aether.datafixers.core.fix.DataFixerBuilder; +import de.splatgames.aether.datafixers.functional.stress.util.StressTestConfig; +import de.splatgames.aether.datafixers.functional.stress.util.StressTestMetrics; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests system behavior under sustained load for extended periods. + * + *

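<p>Verifies that the DataFixer system maintains:
+ * <ul>
+ *   <li>high throughput for the configured test duration</li>
+ *   <li>consistent latency under sustained load</li>
+ *   <li>complete processing of alternating burst and quiet traffic</li>
+ * </ul>
+ *
+ * <p>Configuration via system properties:
+ * <ul>
+ *   <li>{@code stress.duration.minutes} - test duration in minutes (default: 5)</li>
+ *   <li>{@code stress.threads} - number of worker threads (default: 100)</li>
+ * </ul>
+ *
+ * <p>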

Run with: {@code mvn verify -Pstress -Dgroups=stress} + */ +@DisplayName("Sustained Load Migration Stress Tests") +@Tag("stress") +@Tag("integration") +class SustainedLoadMigrationStressIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(SustainedLoadMigrationStressIT.class); + private static final TypeReference STRESS_TYPE = new TypeReference("sustained_entity"); + + @Nested + @DisplayName("Continuous Load") + class ContinuousLoad { + + @Test + @Timeout(value = 15, unit = TimeUnit.MINUTES) + @DisplayName("sustains high throughput for configured duration") + void sustainsHighThroughputForConfiguredDuration() throws Exception { + Duration testDuration = StressTestConfig.duration(); + int threadCount = Math.min(StressTestConfig.threadCount(), 50); + + LOGGER.info("Starting sustained load test: {} threads for {} minutes", + threadCount, testDuration.toMinutes()); + + StressTestMetrics metrics = new StressTestMetrics(); + AtomicBoolean running = new AtomicBoolean(true); + AtomicLong operationCount = new AtomicLong(0); + + DataFixer fixer = new DataFixerBuilder(new DataVersion(2)) + .addFix(STRESS_TYPE, createSimpleFix("sustained_fix", 1, 2)) + .build(); + + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + List> futures = new ArrayList<>(); + List errors = Collections.synchronizedList(new ArrayList<>()); + + Instant startTime = Instant.now(); + Instant endTime = startTime.plus(testDuration); + + // Submit worker tasks + for (int t = 0; t < threadCount; t++) { + final int threadId = t; + futures.add(executor.submit(() -> { + long localOps = 0; + while (running.get() && Instant.now().isBefore(endTime)) { + try { + long startNanos = System.nanoTime(); + + JsonObject inputObj = new JsonObject(); + inputObj.addProperty("id", threadId * 1_000_000_000L + localOps); + inputObj.addProperty("timestamp", System.currentTimeMillis()); + Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj); + + Dynamic result = fixer.update( + 
STRESS_TYPE, input, + new DataVersion(1), new DataVersion(2) + ); + + assertThat(result.get("processed").asBoolean().result()) + .contains(true); + + metrics.recordOperation(System.nanoTime() - startNanos, true); + operationCount.incrementAndGet(); + localOps++; + } catch (Exception e) { + errors.add(e); + metrics.recordOperation(0, false); + } + } + })); + } + + // Wait for duration to elapse + Thread.sleep(testDuration.toMillis()); + running.set(false); + + // Wait for all threads to finish + executor.shutdown(); + boolean terminated = executor.awaitTermination(30, TimeUnit.SECONDS); + + Duration actualDuration = Duration.between(startTime, Instant.now()); + metrics.report("sustainsHighThroughputForConfiguredDuration", actualDuration); + + assertThat(terminated).as("All threads terminated").isTrue(); + assertThat(errors).as("No errors during sustained load").isEmpty(); + assertThat(metrics.totalOperations()) + .as("Significant number of operations completed") + .isGreaterThan(threadCount * 100L); + + LOGGER.info("Completed {} operations in {} seconds ({} ops/sec)", + metrics.totalOperations(), + actualDuration.toSeconds(), + String.format("%.2f", metrics.throughputPerSecond(actualDuration))); + } + + @Test + @Timeout(value = 15, unit = TimeUnit.MINUTES) + @DisplayName("maintains consistent latency under sustained load") + void maintainsConsistentLatencyUnderLoad() throws Exception { + Duration testDuration = Duration.ofMinutes(Math.min(StressTestConfig.duration().toMinutes(), 2)); + int threadCount = 20; + int sampleInterval = 1000; // Sample every N operations + + LOGGER.info("Starting latency consistency test: {} threads for {} minutes", + threadCount, testDuration.toMinutes()); + + AtomicBoolean running = new AtomicBoolean(true); + List latencySamples = Collections.synchronizedList(new ArrayList<>()); + + DataFixer fixer = new DataFixerBuilder(new DataVersion(2)) + .addFix(STRESS_TYPE, createSimpleFix("latency_fix", 1, 2)) + .build(); + + ExecutorService 
executor = Executors.newFixedThreadPool(threadCount); + List errors = Collections.synchronizedList(new ArrayList<>()); + + Instant endTime = Instant.now().plus(testDuration); + + for (int t = 0; t < threadCount; t++) { + final int threadId = t; + executor.submit(() -> { + long localOps = 0; + while (running.get() && Instant.now().isBefore(endTime)) { + try { + long startNanos = System.nanoTime(); + + JsonObject inputObj = new JsonObject(); + inputObj.addProperty("id", threadId * 1_000_000_000L + localOps); + Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj); + + fixer.update( + STRESS_TYPE, input, + new DataVersion(1), new DataVersion(2) + ); + + long latencyNanos = System.nanoTime() - startNanos; + + // Sample periodically to avoid memory pressure from collecting all + if (localOps % sampleInterval == 0) { + latencySamples.add(latencyNanos); + } + + localOps++; + } catch (Exception e) { + errors.add(e); + } + } + }); + } + + Thread.sleep(testDuration.toMillis()); + running.set(false); + + executor.shutdown(); + executor.awaitTermination(30, TimeUnit.SECONDS); + + assertThat(errors).isEmpty(); + assertThat(latencySamples).isNotEmpty(); + + // Analyze latency distribution + List sortedSamples = new ArrayList<>(latencySamples); + Collections.sort(sortedSamples); + + int size = sortedSamples.size(); + long p50 = sortedSamples.get(size / 2); + long p95 = sortedSamples.get((int) (size * 0.95)); + long p99 = sortedSamples.get((int) (size * 0.99)); + long max = sortedSamples.get(size - 1); + + LOGGER.info("Latency distribution (us): p50={}, p95={}, p99={}, max={}", + p50 / 1000, p95 / 1000, p99 / 1000, max / 1000); + + // p99 should not be excessively higher than p50 (reasonable consistency) + // Allow up to 100x variance for stress conditions + assertThat(p99) + .as("p99 latency should be within reasonable bounds of p50") + .isLessThan(p50 * 100); + } + } + + @Nested + @DisplayName("Burst Traffic Patterns") + class BurstTraffic { + + @Test + @Timeout(value = 10, 
unit = TimeUnit.MINUTES) + @DisplayName("handles burst traffic patterns") + void handlesBurstTrafficPatterns() throws Exception { + int burstThreads = 50; + int quietThreads = 5; + int burstOps = 1000; + int quietOps = 100; + int cycles = 3; + + LOGGER.info("Starting burst traffic test: {} cycles of burst ({} threads) and quiet ({} threads)", + cycles, burstThreads, quietThreads); + + StressTestMetrics burstMetrics = new StressTestMetrics(); + StressTestMetrics quietMetrics = new StressTestMetrics(); + List errors = Collections.synchronizedList(new ArrayList<>()); + + DataFixer fixer = new DataFixerBuilder(new DataVersion(2)) + .addFix(STRESS_TYPE, createSimpleFix("burst_fix", 1, 2)) + .build(); + + for (int cycle = 0; cycle < cycles; cycle++) { + LOGGER.info("Cycle {}/{}: Burst phase", cycle + 1, cycles); + + // Burst phase + ExecutorService burstExecutor = Executors.newFixedThreadPool(burstThreads); + CountDownLatch burstLatch = new CountDownLatch(burstThreads); + + for (int t = 0; t < burstThreads; t++) { + final int threadId = t; + final int currentCycle = cycle; + burstExecutor.submit(() -> { + try { + for (int i = 0; i < burstOps; i++) { + long startNanos = System.nanoTime(); + + JsonObject inputObj = new JsonObject(); + inputObj.addProperty("id", currentCycle * 1_000_000L + threadId * 10_000L + i); + inputObj.addProperty("phase", "burst"); + Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj); + + fixer.update( + STRESS_TYPE, input, + new DataVersion(1), new DataVersion(2) + ); + + burstMetrics.recordOperation(System.nanoTime() - startNanos, true); + } + } catch (Exception e) { + errors.add(e); + } finally { + burstLatch.countDown(); + } + }); + } + + burstLatch.await(2, TimeUnit.MINUTES); + burstExecutor.shutdown(); + + LOGGER.info("Cycle {}/{}: Quiet phase", cycle + 1, cycles); + + // Quiet phase + ExecutorService quietExecutor = Executors.newFixedThreadPool(quietThreads); + CountDownLatch quietLatch = new CountDownLatch(quietThreads); + + for (int t 
= 0; t < quietThreads; t++) { + final int threadId = t; + final int currentCycle = cycle; + quietExecutor.submit(() -> { + try { + for (int i = 0; i < quietOps; i++) { + long startNanos = System.nanoTime(); + + JsonObject inputObj = new JsonObject(); + inputObj.addProperty("id", currentCycle * 1_000_000L + threadId * 10_000L + i); + inputObj.addProperty("phase", "quiet"); + Dynamic input = new Dynamic<>(GsonOps.INSTANCE, inputObj); + + fixer.update( + STRESS_TYPE, input, + new DataVersion(1), new DataVersion(2) + ); + + quietMetrics.recordOperation(System.nanoTime() - startNanos, true); + } + } catch (Exception e) { + errors.add(e); + } finally { + quietLatch.countDown(); + } + }); + } + + quietLatch.await(1, TimeUnit.MINUTES); + quietExecutor.shutdown(); + } + + assertThat(errors).isEmpty(); + + long expectedBurstOps = (long) cycles * burstThreads * burstOps; + long expectedQuietOps = (long) cycles * quietThreads * quietOps; + + assertThat(burstMetrics.totalOperations()) + .as("All burst operations completed") + .isEqualTo(expectedBurstOps); + + assertThat(quietMetrics.totalOperations()) + .as("All quiet operations completed") + .isEqualTo(expectedQuietOps); + + LOGGER.info("Burst phase: {} ops, avg latency {} us", + burstMetrics.totalOperations(), + burstMetrics.averageLatencyNanos() / 1000); + LOGGER.info("Quiet phase: {} ops, avg latency {} us", + quietMetrics.totalOperations(), + quietMetrics.averageLatencyNanos() / 1000); + } + } + + // Helper methods + + private DataFix createSimpleFix(String name, int from, int to) { + return new DataFix<>() { + @Override + public @NotNull String name() { + return name; + } + + @Override + public @NotNull DataVersion fromVersion() { + return new DataVersion(from); + } + + @Override + public @NotNull DataVersion toVersion() { + return new DataVersion(to); + } + + @Override + public @NotNull Dynamic apply( + @NotNull TypeReference type, + @NotNull Dynamic input, + @NotNull DataFixerContext context + ) { + return 
input.set("processed", input.createBoolean(true)); + } + }; + } +} diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/util/StressTestConfig.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/util/StressTestConfig.java new file mode 100644 index 0000000..efb642f --- /dev/null +++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/util/StressTestConfig.java @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2025 Splatgames.de Software and Contributors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package de.splatgames.aether.datafixers.functional.stress.util; + +import java.time.Duration; + +/** + * Configuration holder for stress test parameters. + * Reads from system properties with sensible defaults. + * + *

<p>Configurable via Maven system properties or command line:
+ * <pre>
+ * mvn verify -Pstress -Dstress.threads=200 -Dstress.duration.minutes=10
+ * </pre>
/**
 * Configuration holder for stress test parameters.
 *
 * <p>Every value is read from a system property on each call and falls back to its default
 * when the property is unset or cannot be parsed as a number. Values that parse but are not
 * positive are rejected with an {@link IllegalArgumentException}, so a misconfiguration is
 * reported here rather than later by an executor or a sleep call.
 *
 * <pre>
 * mvn verify -Pstress -Dstress.threads=200 -Dstress.duration.minutes=10
 * </pre>
 */
public final class StressTestConfig {

    /** Default number of concurrent threads. */
    public static final int DEFAULT_THREAD_COUNT = 100;

    /** Default number of operations per thread. */
    public static final int DEFAULT_OPERATIONS_PER_THREAD = 1000;

    /** Default test duration. */
    public static final Duration DEFAULT_DURATION = Duration.ofMinutes(5);

    private static final String THREADS_PROPERTY = "stress.threads";
    private static final String OPERATIONS_PROPERTY = "stress.operations.per.thread";
    private static final String DURATION_MINUTES_PROPERTY = "stress.duration.minutes";

    private StressTestConfig() {
        // Utility class
    }

    /**
     * Returns the configured thread count.
     *
     * @return thread count from the {@code stress.threads} system property,
     *         or {@link #DEFAULT_THREAD_COUNT} if not set
     * @throws IllegalArgumentException if the configured value is not positive
     */
    public static int threadCount() {
        return positiveInt(THREADS_PROPERTY, DEFAULT_THREAD_COUNT);
    }

    /**
     * Returns the configured operations per thread.
     *
     * @return operations per thread from the {@code stress.operations.per.thread}
     *         system property, or {@link #DEFAULT_OPERATIONS_PER_THREAD} if not set
     * @throws IllegalArgumentException if the configured value is not positive
     */
    public static int operationsPerThread() {
        return positiveInt(OPERATIONS_PROPERTY, DEFAULT_OPERATIONS_PER_THREAD);
    }

    /**
     * Returns the configured test duration.
     *
     * @return duration from the {@code stress.duration.minutes} system property,
     *         or {@link #DEFAULT_DURATION} if not set
     * @throws IllegalArgumentException if the configured value is not positive
     */
    public static Duration duration() {
        long minutes = Long.getLong(DURATION_MINUTES_PROPERTY, DEFAULT_DURATION.toMinutes());
        if (minutes <= 0) {
            throw new IllegalArgumentException(
                    "System property '" + DURATION_MINUTES_PROPERTY
                            + "' must be positive but was " + minutes);
        }
        return Duration.ofMinutes(minutes);
    }

    /**
     * Returns the total number of operations (threads * operations per thread).
     *
     * @return total operation count
     * @throws ArithmeticException if the product does not fit into an {@code int}
     */
    public static int totalOperations() {
        // multiplyExact reports an overflow instead of returning a wrapped (possibly negative) count.
        return Math.multiplyExact(threadCount(), operationsPerThread());
    }

    /** Reads an int system property, applies the default and rejects non-positive values. */
    private static int positiveInt(String property, int defaultValue) {
        int value = Integer.getInteger(property, defaultValue);
        if (value <= 0) {
            throw new IllegalArgumentException(
                    "System property '" + property + "' must be positive but was " + value);
        }
        return value;
    }
}
+ */
+
+package de.splatgames.aether.datafixers.functional.stress.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+
+/**
+ * Collects and reports metrics during stress tests.
+ *
+ * <p>Thread-safe metrics collector that tracks:
+ * <ul>
+ *   <li>Operation counts (total, successful, failed)</li>
+ *   <li>Throughput in operations per second</li>
+ *   <li>Latency statistics (minimum, maximum, average)</li>
+ * </ul>
+ */
+public final class StressTestMetrics {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(StressTestMetrics.class);
+
+    private final LongAdder totalOperations = new LongAdder();
+    private final LongAdder failedOperations = new LongAdder();
+    private final LongAdder totalLatencyNanos = new LongAdder();
+    private final AtomicLong minLatencyNanos = new AtomicLong(Long.MAX_VALUE);
+    private final AtomicLong maxLatencyNanos = new AtomicLong(0);
+
+    /**
+     * Records a completed operation.
+     *
+     * @param latencyNanos the operation latency in nanoseconds
+     * @param success true if the operation succeeded, false otherwise
+     */
+    public void recordOperation(long latencyNanos, boolean success) {
+        totalOperations.increment();
+        totalLatencyNanos.add(latencyNanos);
+
+        if (!success) {
+            failedOperations.increment();
+        }
+
+        // Update min/max atomically
+        updateMin(latencyNanos);
+        updateMax(latencyNanos);
+    }
+
+    private void updateMin(long latencyNanos) {
+        long current;
+        do {
+            current = minLatencyNanos.get();
+            if (latencyNanos >= current) {
+                return;
+            }
+        } while (!minLatencyNanos.compareAndSet(current, latencyNanos));
+    }
+
+    private void updateMax(long latencyNanos) {
+        long current;
+        do {
+            current = maxLatencyNanos.get();
+            if (latencyNanos <= current) {
+                return;
+            }
+        } while (!maxLatencyNanos.compareAndSet(current, latencyNanos));
+    }
+
+    /**
+     * Returns the total number of operations.
+     *
+     * @return total operation count
+     */
+    public long totalOperations() {
+        return totalOperations.sum();
+    }
+
+    /**
+     * Returns the number of successful operations.
+     *
+     * @return successful operation count
+     */
+    public long successfulOperations() {
+        return totalOperations.sum() - failedOperations.sum();
+    }
+
+    /**
+     * Returns the number of failed operations.
+     *
+     * @return failed operation count
+     */
+    public long failedOperations() {
+        return failedOperations.sum();
+    }
+
+    /**
+     * Calculates the throughput in operations per second.
+     *
+     * @param elapsed the elapsed time
+     * @return throughput in ops/sec, or 0 if {@code elapsed} is zero or negative
+     */
+    public double throughputPerSecond(Duration elapsed) {
+        if (elapsed.isZero() || elapsed.isNegative()) {
+            return 0.0;
+        }
+        return totalOperations.sum() * 1_000_000_000.0 / elapsed.toNanos(); // toSeconds() would truncate sub-second runs to 0
+    }
+
+    /**
+     * Returns the average latency in nanoseconds.
+     *
+     * @return average latency in nanoseconds, or 0 if no operations recorded
+     */
+    public long averageLatencyNanos() {
+        long total = totalOperations.sum();
+        if (total == 0) {
+            return 0;
+        }
+        return totalLatencyNanos.sum() / total;
+    }
+
+    /**
+     * Returns the minimum recorded latency in nanoseconds.
+     *
+     * @return minimum latency, or 0 if no operations recorded
+     */
+    public long minLatencyNanos() {
+        long min = minLatencyNanos.get();
+        return min == Long.MAX_VALUE ? 0 : min;
+    }
+
+    /**
+     * Returns the maximum recorded latency in nanoseconds.
+     *
+     * @return maximum latency
+     */
+    public long maxLatencyNanos() {
+        return maxLatencyNanos.get();
+    }
+
+    /**
+     * Logs a summary of the collected metrics.
+     *
+     * @param testName the name of the test for logging context
+     * @param elapsed the total elapsed time
+     */
+    public void report(String testName, Duration elapsed) {
+        LOGGER.info("=== Stress Test Report: {} ===", testName);
+        LOGGER.info("Duration: {} ms", elapsed.toMillis());
+        LOGGER.info("Total operations: {}", totalOperations());
+        LOGGER.info("Successful: {}", successfulOperations());
+        LOGGER.info("Failed: {}", failedOperations());
+        LOGGER.info("Throughput: {} ops/sec", String.format("%.2f", throughputPerSecond(elapsed)));
+        LOGGER.info("Latency (avg): {} us", averageLatencyNanos() / 1000);
+        LOGGER.info("Latency (min): {} us", minLatencyNanos() / 1000);
+        LOGGER.info("Latency (max): {} us", maxLatencyNanos() / 1000);
+    }
+
+    /**
+     * Resets all metrics to their initial state.
+     */
+    public void reset() {
+        totalOperations.reset();
+        failedOperations.reset();
+        totalLatencyNanos.reset();
+        minLatencyNanos.set(Long.MAX_VALUE);
+        maxLatencyNanos.set(0);
+    }
+}
diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/util/ThreadOrchestrator.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/util/ThreadOrchestrator.java
new file mode 100644
index 0000000..d86a0a8
--- /dev/null
+++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/stress/util/ThreadOrchestrator.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2025 Splatgames.de Software and Contributors
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package de.splatgames.aether.datafixers.functional.stress.util;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Utility for coordinating concurrent test threads.
+ *
+ * <p>Provides a fluent API for:
+ * <ul>
+ *   <li>Releasing all submitted tasks at the same moment via a start latch</li>
+ *   <li>Waiting, with a timeout, for every task to finish</li>
+ *   <li>Collecting the exceptions thrown by tasks</li>
+ *   <li>Shutting down the underlying executor</li>
+ * </ul>
+ *
+ * <p>Example usage:
+ * <pre>{@code
+ * ThreadOrchestrator orchestrator = ThreadOrchestrator.withThreads(100);
+ * for (int i = 0; i < 100; i++) {
+ *     orchestrator.submit(() -> {
+ *         // Concurrent work
+ *     });
+ * }
+ * orchestrator.startAll();
+ * orchestrator.awaitCompletion(Duration.ofMinutes(5));
+ * assertThat(orchestrator.errors()).isEmpty();
+ * }</pre>
+ */
+public final class ThreadOrchestrator implements AutoCloseable {
+
+    private final int threadCount;
+    private final ExecutorService executor;
+    private final CountDownLatch startLatch;
+    private final CountDownLatch completionLatch;
+    private final List<Throwable> errors;
+    private final AtomicBoolean started;
+    private int submittedTasks;
+
+    private ThreadOrchestrator(int threadCount) {
+        this.threadCount = threadCount;
+        this.executor = Executors.newFixedThreadPool(threadCount);
+        this.startLatch = new CountDownLatch(1);
+        this.completionLatch = new CountDownLatch(threadCount);
+        this.errors = Collections.synchronizedList(new ArrayList<>());
+        this.started = new AtomicBoolean(false);
+        this.submittedTasks = 0;
+    }
+
+    /**
+     * Creates a new orchestrator with the specified thread count.
+     *
+     * @param threadCount the number of threads
+     * @return a new orchestrator
+     */
+    public static ThreadOrchestrator withThreads(int threadCount) {
+        if (threadCount <= 0) {
+            throw new IllegalArgumentException("Thread count must be positive: " + threadCount);
+        }
+        return new ThreadOrchestrator(threadCount);
+    }
+
+    /**
+     * Submits a task to be executed.
+     *
+     * <p>The task will wait at a barrier until {@link #startAll()} is called,
+     * ensuring all threads begin simultaneously.
+     *
+     * @param task the task to execute
+     * @return this orchestrator for chaining
+     * @throws IllegalStateException if already started or all tasks submitted
+     */
+    public ThreadOrchestrator submit(Runnable task) {
+        if (started.get()) {
+            throw new IllegalStateException("Cannot submit tasks after startAll()");
+        }
+        if (submittedTasks >= threadCount) {
+            throw new IllegalStateException(
+                    "Cannot submit more than " + threadCount + " tasks"
+            );
+        }
+
+        executor.submit(() -> {
+            try {
+                // Wait for all threads to be ready
+                startLatch.await();
+                // Execute the actual work
+                task.run();
+            } catch (Throwable t) {
+                errors.add(t);
+            } finally {
+                completionLatch.countDown();
+            }
+        });
+
+        submittedTasks++;
+        return this;
+    }
+
+    /**
+     * Submits multiple identical tasks.
+     *
+     * @param count the number of tasks to submit
+     * @param task the task to execute
+     * @return this orchestrator for chaining
+     */
+    public ThreadOrchestrator submitAll(int count, Runnable task) {
+        for (int i = 0; i < count; i++) {
+            submit(task);
+        }
+        return this;
+    }
+
+    /**
+     * Releases all threads to start executing simultaneously.
+     *
+     * @throws IllegalStateException if not all tasks have been submitted
+     */
+    public void startAll() {
+        if (submittedTasks != threadCount) {
+            throw new IllegalStateException(
+                    "Expected " + threadCount + " tasks but only " + submittedTasks + " submitted"
+            );
+        }
+        if (!started.compareAndSet(false, true)) {
+            throw new IllegalStateException("Already started");
+        }
+        startLatch.countDown();
+    }
+
+    /**
+     * Waits for all tasks to complete.
+     *
+     * @param timeout the maximum time to wait
+     * @return true if all tasks completed within timeout, false if timeout elapsed
+     * @throws InterruptedException if interrupted while waiting
+     */
+    public boolean awaitCompletion(Duration timeout) throws InterruptedException {
+        return completionLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Returns the list of exceptions thrown by tasks.
+     *
+     * @return unmodifiable list of exceptions
+     */
+    public List<Throwable> errors() {
+        return List.copyOf(errors);
+    }
+
+    /**
+     * Returns true if any tasks threw exceptions.
+     *
+     * @return true if errors occurred
+     */
+    public boolean hasErrors() {
+        return !errors.isEmpty();
+    }
+
+    /**
+     * Shuts down the executor service.
+     */
+    public void shutdown() {
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            executor.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void close() {
+        shutdown();
+    }
+}
diff --git a/pom.xml b/pom.xml
index ac74cb0..9595a85 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,6 +31,11 @@ true + + 100 + 5 + 1000 + 26.0.2-1 2.13.2
@@ -398,6 +403,17 @@ + + + stress + + false + 100 + 5 + 1000 + + + qa
From 81da2f005fa8f61e293755b1432d95bb0951dddd Mon Sep 17 00:00:00 2001
From: Erik
Date: Mon, 2 Feb 2026 21:53:25 +0100
Subject: [PATCH 2/2] Simplify `randomDelay` implementation by leveraging `ChaosInjector.withRandomDelay`. Update documentation to clarify behavior and configuration options.
---
 .../functional/chaos/util/FailingDataFix.java | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/util/FailingDataFix.java b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/util/FailingDataFix.java
index debc03e..192f4ba 100644
--- a/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/util/FailingDataFix.java
+++ b/aether-datafixers-functional-tests/src/test/java/de/splatgames/aether/datafixers/functional/chaos/util/FailingDataFix.java
@@ -192,17 +192,16 @@ public Builder failureProbability(double probability) {
         /**
          * Configures the fix to introduce random delays.
          *
+         * <p>Note: This replaces any existing chaos configuration.
+         * To combine delays with failures, use {@link #chaosInjector(ChaosInjector)}
+         * with a fully configured injector.
+         *
          * @param minMs minimum delay in milliseconds
          * @param maxMs maximum delay in milliseconds
          * @return this builder
          */
         public Builder randomDelay(int minMs, int maxMs) {
-            this.chaosInjector = ChaosInjector.builder()
-                    .delayRange(minMs, maxMs)
-                    .failureProbability(
-                            this.chaosInjector != null ? 0 : 0 // Keep existing probability if set
-                    )
-                    .build();
+            this.chaosInjector = ChaosInjector.withRandomDelay(minMs, maxMs);
             return this;
         }
 