Skip to content

Commit

Permalink
IGNITE-21713 Data streamer does not fire all the CQ events (apache#11269
Browse files Browse the repository at this point in the history
)
  • Loading branch information
zstan authored Mar 11, 2024
1 parent e35f91f commit c758187
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1212,10 +1212,10 @@ private String taskName() {
else if (initUpdCntrs != null)
partCntrs = initUpdCntrs.get(partId);

rec = new CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer,
partCntrs != null ? partCntrs.get2() : null);

CacheContinuousQueryPartitionRecovery oldRec = rcvs.putIfAbsent(partId, rec);
T2<Long, Long> partCntrs0 = partCntrs;
CacheContinuousQueryPartitionRecovery oldRec = rcvs.computeIfAbsent(partId, k ->
new CacheContinuousQueryPartitionRecovery(ctx.log(CU.CONTINUOUS_QRY_LOG_CATEGORY), topVer,
partCntrs0 != null ? partCntrs0.get2() : null));

if (oldRec != null)
rec = oldRec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -891,15 +891,6 @@ public IgniteInternalFuture<UUID> startRoutine(GridContinuousHandler hnd,
// Whether local node is included in routine.
boolean locIncluded = prjPred == null || prjPred.apply(ctx.discovery().localNode());

AbstractContinuousMessage msg;

try {
msg = createStartMessage(routineId, hnd, bufSize, interval, autoUnsubscribe, prjPred);
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}

// Register per-routine notifications listener if ordered messaging is used.
registerMessageListener(hnd);

Expand All @@ -922,6 +913,8 @@ public IgniteInternalFuture<UUID> startRoutine(GridContinuousHandler hnd,
true);
}

AbstractContinuousMessage msg = createStartMessage(routineId, hnd, bufSize, interval, autoUnsubscribe, prjPred);

ctx.discovery().sendCustomEvent(msg);
}
catch (IgniteCheckedException e) {
Expand Down Expand Up @@ -1001,10 +994,15 @@ private AbstractContinuousMessage createStartMessage(UUID routineId,
reqData.p2pMarshal(marsh);
}

return new StartRoutineDiscoveryMessage(
routineId,
reqData,
reqData.handler().keepBinary());
StartRoutineDiscoveryMessage msg = new StartRoutineDiscoveryMessage(
routineId,
reqData,
reqData.handler().keepBinary());

if (hnd.updateCounters() != null)
msg.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters());

return msg;
}
else {
assert discoProtoVer == 2 : discoProtoVer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.CacheQueryEntryEvent;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
Expand All @@ -42,6 +44,9 @@
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteBiInClosure;
Expand All @@ -56,6 +61,7 @@
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;

/**
* Continuous queries counter tests.
Expand All @@ -76,6 +82,8 @@ public abstract class CacheContinuousQueryCounterAbstractTest extends GridCommon

((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);

cfg.setDataStreamerThreadPoolSize(2);

return cfg;
}

Expand Down Expand Up @@ -227,6 +235,75 @@ public void testAllEntries() throws Exception {
}
}

/**
* The main idea of the test is to emulate entries reordering after update counter is already defined.
* Thus we can obtain situation when entries from equal partition already obtained update counter but finally registered in different
* order due to some pauses in data streamer striped pool threads. Sprecial latch on event is for race emulation.
* This test use assumption that {@link org.apache.ignite.internal.processors.cache.GridCacheMapEntry#innerSet} raises
* {@code EVT_CACHE_OBJECT_PUT} after update counter was invoked.
*/
@Test
public void testDataStreamerItemsReordered() throws IgniteInterruptedCheckedException {
AtomicInteger partitionWithSlowThread = new AtomicInteger(-1);
CountDownLatch partLatch = new CountDownLatch(1);

CacheConfiguration cacheCfg = new CacheConfiguration("ds-cq-test");
cacheCfg.setAffinity(new RendezvousAffinityFunction(false, 2));
IgniteCache<Integer, Integer> cache = grid(0).getOrCreateCache(cacheCfg);

grid(0).events().enableLocal(EventType.EVT_CACHE_OBJECT_PUT);

grid(0).events().localListen(e -> {
CacheEvent ce = (CacheEvent)e;
if (partitionWithSlowThread.compareAndSet(-1, ce.partition())) {
try {
partLatch.await();
}
catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}

if (partitionWithSlowThread.get() == ce.partition()) {
partLatch.countDown();
}

return true;
}, EventType.EVT_CACHE_OBJECT_PUT);

ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();

ConcurrentHashMap<Integer, Integer> itemsHolder = new ConcurrentHashMap<>();

qry.setLocalListener(events -> {
for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : events) {
itemsHolder.put(evt.getKey(), evt.getValue());
}
});

cache.query(qry);

int itemsToProc = gridCount() * 5000;

try (IgniteDataStreamer<Integer, Integer> stmr = grid(0).dataStreamer("ds-cq-test")) {
stmr.allowOverwrite(true);
stmr.perNodeBufferSize(1024);
stmr.autoFlushFrequency(500);

// Stream entries.
for (int i = 0; i < itemsToProc; i++) {
stmr.addData(i, i);

if (i == 1024)
stmr.tryFlush();
}

stmr.flush();
}

assertTrue(waitForCondition(() -> itemsToProc == itemsHolder.size(), 2000));
}

/**
* @throws Exception If failed.
*/
Expand Down

0 comments on commit c758187

Please sign in to comment.