Skip to content

Commit d7ff22c

Browse files
authored
GEODE-10267: fix creating gw sender with non-existent disk store (#7643)
* GEODE-10267: fix creating gw sender with non-existent disk store
1 parent a10c4f8 commit d7ff22c

File tree

5 files changed

+92
-5
lines changed

5 files changed

+92
-5
lines changed

geode-wan/src/distributedTest/java/org/apache/geode/cache/CacheXml70GatewayDUnitTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ public void testGatewayReceiver() throws Exception {
125125
public void testParallelGatewaySender() throws Exception {
126126
getSystem();
127127
CacheCreation cache = new CacheCreation();
128+
DiskStoreFactory dsf = cache.createDiskStoreFactory();
129+
dsf.create("LNSender");
128130

129131
GatewaySenderFactory gatewaySenderFactory = cache.createGatewaySenderFactory();
130132
gatewaySenderFactory.setParallel(true);
@@ -163,6 +165,9 @@ public void testParallelGatewaySender() throws Exception {
163165
public void testSerialGatewaySender() throws Exception {
164166
getSystem();
165167
CacheCreation cache = new CacheCreation();
168+
DiskStoreFactory dsf = cache.createDiskStoreFactory();
169+
dsf.create("LNSender");
170+
166171
GatewaySenderFactory gatewaySenderFactory = cache.createGatewaySenderFactory();
167172
gatewaySenderFactory.setParallel(false);
168173
gatewaySenderFactory.setManualStart(true);

geode-wan/src/distributedTest/java/org/apache/geode/cache/CacheXml80GatewayDUnitTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ public void testAsyncEventQueueWithSubstitutionFilter() throws Exception {
105105
public void testGatewaySenderWithSubstitutionFilter() throws Exception {
106106
getSystem();
107107
CacheCreation cache = new CacheCreation();
108+
DiskStoreFactory dsf = cache.createDiskStoreFactory();
109+
dsf.create(DiskStoreFactory.DEFAULT_DISK_STORE_NAME);
108110

109111
// Create a GatewaySender with GatewayEventSubstitutionFilter.
110112
// Don't start the sender to avoid 'Locators must be configured before starting gateway-sender'
@@ -113,6 +115,7 @@ public void testGatewaySenderWithSubstitutionFilter() throws Exception {
113115
GatewaySenderFactory factory = cache.createGatewaySenderFactory();
114116
factory.setManualStart(true);
115117
factory.setGatewayEventSubstitutionFilter(new MyGatewayEventSubstitutionFilter());
118+
factory.setDiskStoreName(DiskStoreFactory.DEFAULT_DISK_STORE_NAME);
116119
GatewaySender sender = factory.create(id, 2);
117120

118121
// Verify the GatewayEventSubstitutionFilter is set on the GatewaySender.

geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderAttributes;
2020
import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderDoesNotExist;
2121
import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
22+
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
2223
import static org.assertj.core.api.Assertions.assertThat;
2324

2425
import java.io.Serializable;
@@ -66,6 +67,8 @@ public class CreateDestroyGatewaySenderCommandDUnitTest implements Serializable
6667
private static MemberVM server2;
6768
private static MemberVM server3;
6869

70+
String nonExistingDiskStore = "nonExistingDiskStore";
71+
6972
@BeforeClass
7073
public static void beforeClass() {
7174
Properties props = new Properties();
@@ -436,4 +439,41 @@ public void testCreateDestroyParallelGatewaySenderWithDispatcherThreads() {
436439
server3);
437440
}
438441

442+
@Test
443+
public void testCreateParallelGatewaySenderWithNonExistingDiskStore() {
444+
addIgnoredException(IllegalStateException.class);
445+
446+
String command = CliStrings.CREATE_GATEWAYSENDER + " --" + CliStrings.CREATE_GATEWAYSENDER__ID
447+
+ "=ny" + " --" + CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID + "=2" + " --"
448+
+ CliStrings.CREATE_GATEWAYSENDER__PARALLEL + "=true" + " --"
449+
+ CliStrings.CREATE_GATEWAYSENDER__DISKSTORENAME + "=" + nonExistingDiskStore;
450+
451+
gfsh.executeAndAssertThat(command).statusIsError()
452+
.hasTableSection().hasRowSize(3).hasColumn("Message").contains(
453+
" java.lang.IllegalStateException: Disk store " + nonExistingDiskStore + " not found",
454+
" java.lang.IllegalStateException: Disk store " + nonExistingDiskStore + " not found",
455+
" java.lang.IllegalStateException: Disk store " + nonExistingDiskStore + " not found");
456+
457+
gfsh.executeAndAssertThat("list gateways").statusIsSuccess().containsOutput(
458+
"GatewaySenders or GatewayReceivers are not available in cluster");
459+
}
460+
461+
@Test
462+
public void testCreateSerialGatewaySenderWithNonExistingDiskStore() {
463+
addIgnoredException(IllegalStateException.class);
464+
465+
String command = CliStrings.CREATE_GATEWAYSENDER + " --" + CliStrings.CREATE_GATEWAYSENDER__ID
466+
+ "=ny" + " --" + CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID + "=2" + " --"
467+
+ CliStrings.CREATE_GATEWAYSENDER__PARALLEL + "=false" + " --"
468+
+ CliStrings.CREATE_GATEWAYSENDER__DISKSTORENAME + "=" + nonExistingDiskStore;
469+
470+
gfsh.executeAndAssertThat(command).statusIsError()
471+
.hasTableSection().hasRowSize(3).hasColumn("Message").contains(
472+
" java.lang.IllegalStateException: Disk store " + nonExistingDiskStore + " not found",
473+
" java.lang.IllegalStateException: Disk store " + nonExistingDiskStore + " not found",
474+
" java.lang.IllegalStateException: Disk store " + nonExistingDiskStore + " not found");
475+
476+
gfsh.executeAndAssertThat("list gateways").statusIsSuccess().containsOutput(
477+
"GatewaySenders or GatewayReceivers are not available in cluster");
478+
}
439479
}

geode-wan/src/integrationTest/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,43 @@ public void test_GatewaySender_Serial_ZERO_DispatcherThread() {
223223
}
224224
}
225225

226+
@Test
227+
public void test_GatewaySender_Serial_NonExistingDiskStore() {
228+
cache = new CacheFactory().set(MCAST_PORT, "0").create();
229+
GatewaySenderFactory fact = cache.createGatewaySenderFactory();
230+
fact.setManualStart(true);
231+
fact.setDiskStoreName("FORNY");
232+
try {
233+
GatewaySender sender1 = fact.create("NYSender", 2);
234+
fail("Expected IllegalStateException but not thrown");
235+
} catch (Exception e) {
236+
if (e instanceof IllegalStateException
237+
&& e.getMessage().contains("Disk store FORNY not found")) {
238+
} else {
239+
fail("Expected IllegalStateException but received :" + e);
240+
}
241+
}
242+
}
243+
244+
@Test
245+
public void test_GatewaySender_Parallel_NonExistingDiskStore() {
246+
cache = new CacheFactory().set(MCAST_PORT, "0").create();
247+
GatewaySenderFactory fact = cache.createGatewaySenderFactory();
248+
fact.setManualStart(true);
249+
fact.setParallel(true);
250+
fact.setDiskStoreName("FORNY");
251+
try {
252+
GatewaySender sender1 = fact.create("NYSender", 2);
253+
fail("Expected IllegalStateException but not thrown");
254+
} catch (Exception e) {
255+
if (e instanceof IllegalStateException
256+
&& e.getMessage().contains("Disk store FORNY not found")) {
257+
} else {
258+
fail("Expected IllegalStateException but received :" + e);
259+
}
260+
}
261+
}
262+
226263
/**
227264
* Test to validate the gateway receiver attributes are correctly set
228265
*/
@@ -303,7 +340,6 @@ public void test_ValidateSerialGatewaySenderAttributes() {
303340
fact.setBatchSize(200);
304341
fact.setBatchTimeInterval(300);
305342
fact.setPersistenceEnabled(false);
306-
fact.setDiskStoreName("FORNY");
307343
fact.setMaximumQueueMemory(200);
308344
fact.setAlertThreshold(1200);
309345
GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
@@ -327,7 +363,6 @@ public void test_ValidateSerialGatewaySenderAttributes() {
327363
assertEquals(sender1.getBatchSize(), gatewaySender.getBatchSize());
328364
assertEquals(sender1.getBatchTimeInterval(), gatewaySender.getBatchTimeInterval());
329365
assertEquals(sender1.isPersistenceEnabled(), gatewaySender.isPersistenceEnabled());
330-
assertEquals(sender1.getDiskStoreName(), gatewaySender.getDiskStoreName());
331366
assertEquals(sender1.getMaximumQueueMemory(), gatewaySender.getMaximumQueueMemory());
332367
assertEquals(sender1.getAlertThreshold(), gatewaySender.getAlertThreshold());
333368
assertEquals(sender1.getGatewayEventFilters().size(),
@@ -350,7 +385,6 @@ public void test_ValidateParallelGatewaySenderAttributes() {
350385
fact.setBatchSize(200);
351386
fact.setBatchTimeInterval(300);
352387
fact.setPersistenceEnabled(false);
353-
fact.setDiskStoreName("FORNY");
354388
fact.setMaximumQueueMemory(200);
355389
fact.setAlertThreshold(1200);
356390
GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
@@ -374,7 +408,6 @@ public void test_ValidateParallelGatewaySenderAttributes() {
374408
assertEquals(sender1.getBatchSize(), gatewaySender.getBatchSize());
375409
assertEquals(sender1.getBatchTimeInterval(), gatewaySender.getBatchTimeInterval());
376410
assertEquals(sender1.isPersistenceEnabled(), gatewaySender.isPersistenceEnabled());
377-
assertEquals(sender1.getDiskStoreName(), gatewaySender.getDiskStoreName());
378411
assertEquals(sender1.getMaximumQueueMemory(), gatewaySender.getMaximumQueueMemory());
379412
assertEquals(sender1.getAlertThreshold(), gatewaySender.getAlertThreshold());
380413
assertEquals(sender1.getGatewayEventFilters().size(),

geode-wan/src/main/java/org/apache/geode/cache/wan/internal/GatewaySenderFactoryImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,6 @@ public GatewaySender create(String id, int remoteDSId) {
239239
String.format("GatewaySender %s can not be created with dispatcher threads less than 1",
240240
id));
241241
}
242-
243242
// Verify socket read timeout if a proper logger is available
244243
if (cache instanceof GemFireCacheImpl) {
245244
// If socket read timeout is less than the minimum, log a warning.
@@ -254,6 +253,13 @@ public GatewaySender create(String id, int remoteDSId) {
254253
attrs.setSocketReadTimeout(GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT);
255254
}
256255

256+
if (attrs.getDiskStoreName() != null
257+
&& cache.findDiskStore(attrs.getDiskStoreName()) == null) {
258+
throw new IllegalStateException(
259+
String.format("Disk store %s not found",
260+
attrs.getDiskStoreName()));
261+
}
262+
257263
// Log a warning if the old system property is set.
258264
if (GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED.compareAndSet(false, true)) {
259265
if (System.getProperty(GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY) != null) {

0 commit comments

Comments
 (0)