Skip to content

Commit 6a7df83

Browse files
authored
[Fix][Zeta] Fix task can not end cause by lock metrics failed (apache#7357)
1 parent 7b19df5 commit 6a7df83

File tree

5 files changed

+58
-47
lines changed

5 files changed

+58
-47
lines changed

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
import org.apache.seatunnel.engine.client.SeaTunnelClient;
2323
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
2424
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
25+
import org.apache.seatunnel.engine.common.Constant;
2526
import org.apache.seatunnel.engine.common.config.ConfigProvider;
2627
import org.apache.seatunnel.engine.common.config.JobConfig;
2728
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
2829
import org.apache.seatunnel.engine.core.job.JobResult;
2930
import org.apache.seatunnel.engine.core.job.JobStatus;
3031
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
32+
import org.apache.seatunnel.engine.server.execution.TaskLocation;
33+
import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
3134

3235
import org.junit.jupiter.api.AfterEach;
3336
import org.junit.jupiter.api.Assertions;
@@ -36,8 +39,10 @@
3639

3740
import com.hazelcast.client.config.ClientConfig;
3841
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
42+
import com.hazelcast.map.IMap;
3943
import lombok.extern.slf4j.Slf4j;
4044

45+
import java.util.HashMap;
4146
import java.util.concurrent.CompletableFuture;
4247
import java.util.concurrent.ExecutionException;
4348
import java.util.concurrent.TimeUnit;
@@ -73,11 +78,17 @@ public void testSayHello() {
7378

7479
@Test
7580
public void testExecuteJob() throws Exception {
81+
runJobFileWithAssertEndStatus(
82+
"batch_fakesource_to_file.conf", "fake_to_file", JobStatus.FINISHED);
83+
}
84+
85+
private static void runJobFileWithAssertEndStatus(
86+
String confFile, String name, JobStatus finished)
87+
throws ExecutionException, InterruptedException {
7688
Common.setDeployMode(DeployMode.CLIENT);
77-
String filePath = TestUtils.getResource("batch_fakesource_to_file.conf");
89+
String filePath = TestUtils.getResource(confFile);
7890
JobConfig jobConfig = new JobConfig();
79-
jobConfig.setName("fake_to_file");
80-
91+
jobConfig.setName(name);
8192
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
8293
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
8394
try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig)) {
@@ -94,11 +105,25 @@ public void testExecuteJob() throws Exception {
94105
() ->
95106
Assertions.assertTrue(
96107
objectCompletableFuture.isDone()
97-
&& JobStatus.FINISHED.equals(
108+
&& finished.equals(
98109
objectCompletableFuture.get())));
99110
}
100111
}
101112

113+
@Test
114+
public void testExecuteJobWithLockMetrics() throws Exception {
115+
// lock metrics map
116+
IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap =
117+
hazelcastInstance.getMap(Constant.IMAP_RUNNING_JOB_METRICS);
118+
metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
119+
try {
120+
runJobFileWithAssertEndStatus(
121+
"batch_fakesource_to_file.conf", "fake_to_file", JobStatus.FINISHED);
122+
} finally {
123+
metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
124+
}
125+
}
126+
102127
@Test
103128
public void cancelJobTest() throws Exception {
104129
Common.setDeployMode(DeployMode.CLIENT);
@@ -229,29 +254,9 @@ void afterClass() {
229254

230255
@Test
231256
public void testLastCheckpointErrorJob() throws Exception {
232-
Common.setDeployMode(DeployMode.CLIENT);
233-
String filePath = TestUtils.getResource("batch_last_checkpoint_error.conf");
234-
JobConfig jobConfig = new JobConfig();
235-
jobConfig.setName("batch_last_checkpoint_error");
236-
237-
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
238-
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
239-
try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig)) {
240-
ClientJobExecutionEnvironment jobExecutionEnv =
241-
engineClient.createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG);
242-
243-
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
244-
245-
CompletableFuture<JobStatus> objectCompletableFuture =
246-
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
247-
248-
await().atMost(600000, TimeUnit.MILLISECONDS)
249-
.untilAsserted(
250-
() ->
251-
Assertions.assertTrue(
252-
objectCompletableFuture.isDone()
253-
&& JobStatus.FAILED.equals(
254-
objectCompletableFuture.get())));
255-
}
257+
runJobFileWithAssertEndStatus(
258+
"batch_last_checkpoint_error.conf",
259+
"batch_last_checkpoint_error",
260+
JobStatus.FAILED);
256261
}
257262
}

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -968,10 +968,14 @@ void taskDone(Task task) {
968968
cancellationFutures.remove(taskGroupLocation);
969969
try {
970970
cancelAsyncFunction(taskGroupLocation);
971-
} catch (Throwable e) {
972-
throw new RuntimeException(e);
971+
} catch (Throwable t) {
972+
logger.severe("cancel async function failed", t);
973+
}
974+
try {
975+
updateMetricsContextInImap();
976+
} catch (Throwable t) {
977+
logger.severe("update metrics context in imap failed", t);
973978
}
974-
updateMetricsContextInImap();
975979
if (ex == null) {
976980
logger.info(
977981
String.format(

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,14 @@ private void subPlanDone(PipelineStatus pipelineStatus) {
308308
RetryUtils.retryWithException(
309309
() -> {
310310
jobMaster.savePipelineMetricsToHistory(getPipelineLocation());
311-
jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus);
311+
try {
312+
jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus);
313+
} catch (Throwable e) {
314+
log.error(
315+
"Remove metrics context for pipeline {} failed, with exception: {}",
316+
pipelineFullName,
317+
ExceptionUtils.getMessage(e));
318+
}
312319
notifyCheckpointManagerPipelineEnd(pipelineStatus);
313320
jobMaster.releasePipelineResource(this);
314321
return null;

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
import java.util.concurrent.ConcurrentHashMap;
9696
import java.util.concurrent.CopyOnWriteArrayList;
9797
import java.util.concurrent.ExecutorService;
98+
import java.util.concurrent.TimeUnit;
9899
import java.util.stream.Collectors;
99100

100101
import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
@@ -678,8 +679,13 @@ public void removeMetricsContext(
678679

679680
boolean lockedIMap = false;
680681
try {
681-
metricsImap.lock(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
682-
lockedIMap = true;
682+
lockedIMap =
683+
metricsImap.tryLock(
684+
Constant.IMAP_RUNNING_JOB_METRICS_KEY, 5, TimeUnit.SECONDS);
685+
if (!lockedIMap) {
686+
LOGGER.severe("lock imap failed in update metrics");
687+
return;
688+
}
683689

684690
HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
685691
metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
@@ -697,6 +703,8 @@ public void removeMetricsContext(
697703
collect.forEach(centralMap::remove);
698704
metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);
699705
}
706+
} catch (Exception e) {
707+
LOGGER.warning("failed to remove metrics context", e);
700708
} finally {
701709
if (lockedIMap) {
702710
boolean unLockedIMap = false;

seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TaskExecutionServiceTest.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.seatunnel.engine.server.execution.TestTask;
2929

3030
import org.junit.jupiter.api.BeforeAll;
31-
import org.junit.jupiter.api.Disabled;
3231
import org.junit.jupiter.api.RepeatedTest;
3332
import org.junit.jupiter.api.Test;
3433

@@ -65,8 +64,6 @@ public void before() {
6564
}
6665

6766
@Test
68-
@Disabled(
69-
"As we have more and more test cases the test the load of the test container will up, the test case may failed")
7067
public void testCancel() {
7168
TaskExecutionService taskExecutionService = server.getTaskExecutionService();
7269

@@ -92,8 +89,6 @@ public void testCancel() {
9289
}
9390

9491
@Test
95-
@Disabled(
96-
"As we have more and more test cases the test the load of the test container will up, the test case may failed")
9792
public void testCancelBlockTask() throws InterruptedException {
9893
TaskExecutionService taskExecutionService = server.getTaskExecutionService();
9994

@@ -118,8 +113,6 @@ public void testCancelBlockTask() throws InterruptedException {
118113
}
119114

120115
@Test
121-
@Disabled(
122-
"As we have more and more test cases the test the load of the test container will up, the test case may failed")
123116
public void testFinish() {
124117
TaskExecutionService taskExecutionService = server.getTaskExecutionService();
125118

@@ -150,8 +143,6 @@ public void testFinish() {
150143

151144
/** Test task execution time is the same as the timer timeout */
152145
@Test
153-
@Disabled(
154-
"As we have more and more test cases the test the load of the test container will up, the test case may failed")
155146
public void testCriticalCallTime() throws InterruptedException {
156147
AtomicBoolean stopMark = new AtomicBoolean(false);
157148
CopyOnWriteArrayList<Long> stopTime = new CopyOnWriteArrayList<>();
@@ -189,8 +180,6 @@ public void testCriticalCallTime() throws InterruptedException {
189180
}
190181

191182
@Test
192-
@Disabled(
193-
"As we have more and more test cases the test the load of the test container will up, the test case may failed")
194183
public void testThrowException() throws InterruptedException {
195184
TaskExecutionService taskExecutionService = server.getTaskExecutionService();
196185

@@ -264,8 +253,6 @@ public void testThrowException() throws InterruptedException {
264253
}
265254

266255
@RepeatedTest(2)
267-
@Disabled(
268-
"As we have more and more test cases the test the load of the test container will up, the test case may failed")
269256
public void testDelay() throws InterruptedException {
270257

271258
long lowLagSleep = 10;

0 commit comments

Comments
 (0)