|
17 | 17 |
|
18 | 18 | import java.time.Instant; |
19 | 19 | import java.util.ArrayList; |
| 20 | +import java.util.LinkedList; |
20 | 21 | import java.util.List; |
21 | 22 | import java.util.concurrent.*; |
22 | 23 | import java.util.concurrent.atomic.AtomicBoolean; |
@@ -1437,11 +1438,8 @@ public void recoveryResetsToFirstAvailableSynchronizer() throws Exception { |
1437 | 1438 | Future<Void> startFuture = dataSource.start(); |
1438 | 1439 | startFuture.get(2, TimeUnit.SECONDS); |
1439 | 1440 |
|
1440 | | - // Wait for recovery timeout to trigger by waiting for multiple synchronizer calls |
1441 | | - // Recovery brings us back to first, so we should see multiple calls eventually |
1442 | | - for (int i = 0; i < 3; i++) { |
1443 | | - sink.awaitApplyCount(i + 1, 5, TimeUnit.SECONDS); |
1444 | | - } |
| 1441 | + // Wait for 3 applies with enough time for recovery (2s) + overhead |
| 1442 | + sink.awaitApplyCount(3, 5, TimeUnit.SECONDS); |
1445 | 1443 |
|
1446 | 1444 | // Should have called first synchronizer again after recovery |
1447 | 1445 | assertTrue(firstCallCount.get() >= 2 || secondCallCount.get() >= 1); |
@@ -1732,73 +1730,45 @@ public void multipleChangeSetsAppliedInOrder() throws Exception { |
1732 | 1730 | assertEquals(3, sink.getApplyCount()); |
1733 | 1731 | } |
1734 | 1732 |
|
1735 | | - @Test |
1736 | | - public void selectorEmptyStillCompletesIfAnyDataReceived() throws Exception { |
1737 | | - executor = Executors.newScheduledThreadPool(2); |
1738 | | - MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); |
1739 | | - |
1740 | | - CompletableFuture<FDv2SourceResult> initializerFuture = CompletableFuture.completedFuture( |
1741 | | - FDv2SourceResult.changeSet(makeChangeSet(false), false) |
1742 | | - ); |
1743 | | - |
1744 | | - ImmutableList<FDv2DataSource.DataSourceFactory<Initializer>> initializers = ImmutableList.of( |
1745 | | - () -> new MockInitializer(initializerFuture) |
1746 | | - ); |
1747 | | - |
1748 | | - CompletableFuture<FDv2SourceResult> synchronizerFuture = CompletableFuture.completedFuture( |
1749 | | - FDv2SourceResult.changeSet(makeChangeSet(false), false) |
1750 | | - ); |
1751 | | - |
1752 | | - ImmutableList<FDv2DataSource.DataSourceFactory<Synchronizer>> synchronizers = ImmutableList.of( |
1753 | | - () -> new MockSynchronizer(synchronizerFuture) |
1754 | | - ); |
1755 | | - |
1756 | | - FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); |
1757 | | - resourcesToClose.add(dataSource); |
1758 | | - |
1759 | | - Future<Void> startFuture = dataSource.start(); |
1760 | | - startFuture.get(2, TimeUnit.SECONDS); |
1761 | | - |
1762 | | - assertTrue(dataSource.isInitialized()); |
1763 | | - |
1764 | | - // Wait for the synchronizer to also run |
1765 | | - sink.awaitApplyCount(2, 2, TimeUnit.SECONDS); |
1766 | | - assertEquals(2, sink.getApplyCount()); // Both initializer and synchronizer |
1767 | | - } |
1768 | | - |
1769 | 1733 | @Test |
1770 | 1734 | public void selectorNonEmptyCompletesInitialization() throws Exception { |
1771 | 1735 | executor = Executors.newScheduledThreadPool(2); |
1772 | 1736 | MockDataSourceUpdateSink sink = new MockDataSourceUpdateSink(); |
1773 | 1737 |
|
1774 | | - CompletableFuture<FDv2SourceResult> initializerFuture = CompletableFuture.completedFuture( |
| 1738 | + CompletableFuture<FDv2SourceResult> firstInitializerFuture = CompletableFuture.completedFuture( |
1775 | 1739 | FDv2SourceResult.changeSet(makeChangeSet(true), false) |
1776 | 1740 | ); |
1777 | 1741 |
|
1778 | | - AtomicBoolean synchronizerCalled = new AtomicBoolean(false); |
| 1742 | + BlockingQueue<Boolean> secondInitializerCalledQueue = new LinkedBlockingQueue<>(); |
1779 | 1743 |
|
1780 | 1744 | ImmutableList<FDv2DataSource.DataSourceFactory<Initializer>> initializers = ImmutableList.of( |
1781 | | - () -> new MockInitializer(initializerFuture) |
1782 | | - ); |
1783 | | - |
1784 | | - ImmutableList<FDv2DataSource.DataSourceFactory<Synchronizer>> synchronizers = ImmutableList.of( |
| 1745 | + () -> new MockInitializer(firstInitializerFuture), |
1785 | 1746 | () -> { |
1786 | | - synchronizerCalled.set(true); |
1787 | | - return new MockSynchronizer(CompletableFuture.completedFuture( |
| 1747 | + secondInitializerCalledQueue.offer(true); |
| 1748 | + return new MockInitializer(CompletableFuture.completedFuture( |
1788 | 1749 | FDv2SourceResult.changeSet(makeChangeSet(false), false) |
1789 | 1750 | )); |
1790 | 1751 | } |
1791 | 1752 | ); |
1792 | 1753 |
|
| 1754 | + ImmutableList<FDv2DataSource.DataSourceFactory<Synchronizer>> synchronizers = ImmutableList.of( |
| 1755 | + () -> new MockSynchronizer(CompletableFuture.completedFuture( |
| 1756 | + FDv2SourceResult.changeSet(makeChangeSet(false), false) |
| 1757 | + )) |
| 1758 | + ); |
| 1759 | + |
1793 | 1760 | FDv2DataSource dataSource = new FDv2DataSource(initializers, synchronizers, sink, Thread.NORM_PRIORITY, logger, executor, 120, 300); |
1794 | 1761 | resourcesToClose.add(dataSource); |
1795 | 1762 |
|
1796 | 1763 | Future<Void> startFuture = dataSource.start(); |
1797 | 1764 | startFuture.get(2, TimeUnit.SECONDS); |
1798 | 1765 |
|
1799 | 1766 | assertTrue(dataSource.isInitialized()); |
1800 | | - assertFalse(synchronizerCalled.get()); // Should not proceed to synchronizers |
1801 | 1767 | assertEquals(1, sink.getApplyCount()); |
| 1768 | + |
| 1769 | + // Second initializer should not be called since first had non-empty selector |
| 1770 | + Boolean secondInitializerCalled = secondInitializerCalledQueue.poll(500, TimeUnit.MILLISECONDS); |
| 1771 | + assertNull("Second initializer should not be called when first returns non-empty selector", secondInitializerCalled); |
1802 | 1772 | } |
1803 | 1773 |
|
1804 | 1774 | @Test |
|
0 commit comments