Skip to content

Commit 6cf0014

Browse files
filiphrtijsrademakers
authored andcommitted
Do not delete and create new jobs when unacquiring and make sure that cleaning up execution deletes jobs in bulk without revision check
1 parent 7d5dbeb commit 6cf0014

File tree

8 files changed

+329
-31
lines changed

8 files changed

+329
-31
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/* Licensed under the Apache License, Version 2.0 (the "License");
2+
* you may not use this file except in compliance with the License.
3+
* You may obtain a copy of the License at
4+
*
5+
* http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*/
13+
package org.flowable.engine.test.bpmn.async;
14+
15+
import static org.assertj.core.api.Assertions.assertThat;
16+
17+
import java.util.Collections;
18+
import java.util.List;
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicLong;
22+
23+
import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType;
24+
import org.flowable.common.engine.api.delegate.event.FlowableEntityEvent;
25+
import org.flowable.common.engine.api.delegate.event.FlowableEvent;
26+
import org.flowable.common.engine.api.delegate.event.FlowableEventListener;
27+
import org.flowable.common.engine.impl.interceptor.Command;
28+
import org.flowable.common.engine.impl.interceptor.CommandConfig;
29+
import org.flowable.common.engine.impl.interceptor.CommandExecutor;
30+
import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
31+
import org.flowable.engine.impl.interceptor.CommandInvoker;
32+
import org.flowable.engine.impl.jobexecutor.AsyncContinuationJobHandler;
33+
import org.flowable.engine.impl.jobexecutor.ParallelMultiInstanceActivityCompletionJobHandler;
34+
import org.flowable.engine.runtime.ProcessInstance;
35+
import org.flowable.engine.test.Deployment;
36+
import org.flowable.engine.test.impl.CustomConfigurationFlowableTestCase;
37+
import org.flowable.job.api.FlowableUnrecoverableJobException;
38+
import org.flowable.job.api.Job;
39+
import org.flowable.job.service.impl.cmd.LockExclusiveJobCmd;
40+
import org.flowable.job.service.impl.persistence.entity.JobEntity;
41+
import org.junit.jupiter.api.Test;
42+
43+
/**
44+
* @author Filip Hrisafov
45+
*/
46+
class ParallelMultiInstanceAsyncNonExclusiveTest extends CustomConfigurationFlowableTestCase {
47+
48+
protected CustomCommandInvoker customCommandInvoker;
49+
protected CustomEventListener customEventListener;
50+
protected CollectingAsyncRunnableExecutionExceptionHandler executionExceptionHandler;
51+
52+
public ParallelMultiInstanceAsyncNonExclusiveTest() {
53+
super("parallelMultiInstanceAsyncNonExclusiveTest");
54+
}
55+
56+
@Override
57+
protected void configureConfiguration(ProcessEngineConfigurationImpl processEngineConfiguration) {
58+
customCommandInvoker = new CustomCommandInvoker();
59+
processEngineConfiguration.setCommandInvoker(customCommandInvoker);
60+
processEngineConfiguration.getAsyncExecutorConfiguration().setGlobalAcquireLockEnabled(true);
61+
executionExceptionHandler = new CollectingAsyncRunnableExecutionExceptionHandler();
62+
processEngineConfiguration.setCustomAsyncRunnableExecutionExceptionHandlers(Collections.singletonList(executionExceptionHandler));
63+
customEventListener = new CustomEventListener();
64+
processEngineConfiguration.setEventListeners(Collections.singletonList(customEventListener));
65+
66+
}
67+
68+
@Test
69+
@Deployment
70+
public void parallelMultiInstanceNonExclusiveJobs() {
71+
// This test is trying to cause an optimistic locking exception when using non-exclusive parallel multi instance jobs.
72+
// This is mimicking the following scenario:
73+
// 4 async jobs complete in the same time, and thus they create 4 parallel-multi-instance-complete exclusive jobs
74+
// 3 of those jobs will fail to get the exclusive lock and unacquire their jobs and 1 will get the lock
75+
// the one that will get the lock will continue to the next step of the process and perform the multi instance cleanup
76+
// the cleanup of the multi instance should not fail.
77+
78+
ProcessInstance processInstance = runtimeService.createProcessInstanceBuilder()
79+
.processDefinitionKey("parallelScriptTask")
80+
.start();
81+
82+
List<Job> jobs = managementService.createJobQuery().list();
83+
assertThat(jobs).hasSize(4);
84+
assertThat(jobs)
85+
.extracting(Job::getJobHandlerType)
86+
.containsOnly(AsyncContinuationJobHandler.TYPE);
87+
customCommandInvoker.lockExclusiveCounter = new AtomicLong(0L);
88+
89+
customCommandInvoker.executeLockReleaseLatch = new CountDownLatch(1);
90+
customEventListener.parallelMultiInstanceCompleteLatch = customCommandInvoker.executeLockReleaseLatch;
91+
92+
customCommandInvoker.executeAsyncRunnableLatch = new CountDownLatch(4);
93+
customEventListener.asyncContinuationLatch = new CountDownLatch(4);
94+
95+
customCommandInvoker.executeLockCountLatch = new CountDownLatch(3);
96+
customEventListener.parallelMultiInstanceWaitCompleteLatch = customCommandInvoker.executeLockCountLatch;
97+
98+
waitForJobExecutorToProcessAllJobs(15_000, 200);
99+
100+
assertThat(executionExceptionHandler.getExceptions()).isEmpty();
101+
assertThat(managementService.createJobQuery().processInstanceId(processInstance.getId()).list()).isEmpty();
102+
assertThat(managementService.createDeadLetterJobQuery().processInstanceId(processInstance.getId()).list()).isEmpty();
103+
}
104+
105+
protected static class CustomCommandInvoker extends CommandInvoker {
106+
107+
protected AtomicLong lockExclusiveCounter = new AtomicLong();
108+
protected CountDownLatch executeLockCountLatch;
109+
protected CountDownLatch executeLockReleaseLatch;
110+
protected CountDownLatch executeAsyncRunnableLatch;
111+
112+
protected CustomCommandInvoker() {
113+
super(((commandContext, runnable) -> runnable.run()), null);
114+
}
115+
116+
@Override
117+
public <T> T execute(CommandConfig config, Command<T> command, CommandExecutor commandExecutor) {
118+
if (command instanceof LockExclusiveJobCmd) {
119+
if (lockExclusiveCounter.incrementAndGet() > 1) {
120+
// We let the first exclusive to run without waiting
121+
// we then wait to complete this transaction until the execute lock exclusive is released
122+
try {
123+
executeLockCountLatch.countDown();
124+
executeLockReleaseLatch.await(4, TimeUnit.SECONDS);
125+
} catch (InterruptedException e) {
126+
Thread.currentThread().interrupt();
127+
throw new RuntimeException(e);
128+
}
129+
}
130+
}
131+
return super.execute(config, command, commandExecutor);
132+
}
133+
}
134+
135+
protected static class CustomEventListener implements FlowableEventListener {
136+
137+
protected CountDownLatch asyncContinuationLatch;
138+
protected CountDownLatch parallelMultiInstanceCompleteLatch;
139+
protected CountDownLatch parallelMultiInstanceWaitCompleteLatch;
140+
141+
@Override
142+
public void onEvent(FlowableEvent event) {
143+
if (FlowableEngineEventType.JOB_EXECUTION_SUCCESS.equals(event.getType()) && event instanceof FlowableEntityEvent) {
144+
JobEntity entity = (JobEntity) ((FlowableEntityEvent) event).getEntity();
145+
String jobHandlerType = entity.getJobHandlerType();
146+
if (AsyncContinuationJobHandler.TYPE.equals(jobHandlerType)) {
147+
// We are going to wait for all the async jobs to complete in the same time
148+
asyncContinuationLatch.countDown();
149+
try {
150+
if (!asyncContinuationLatch.await(4, TimeUnit.SECONDS)) {
151+
throw new FlowableUnrecoverableJobException("asyncContinuationLatch did not reach 0");
152+
}
153+
} catch (InterruptedException e) {
154+
Thread.currentThread().interrupt();
155+
throw new RuntimeException(e);
156+
}
157+
} else if (ParallelMultiInstanceActivityCompletionJobHandler.TYPE.equals(jobHandlerType)) {
158+
// There will be one multi instance complete job, so we count it down to release the rest of the lock exclusive commands
159+
parallelMultiInstanceCompleteLatch.countDown();
160+
161+
try {
162+
// Wait for the rest of the lock exclusive commands to complete before resuming this transaction
163+
if (!parallelMultiInstanceWaitCompleteLatch.await(4, TimeUnit.SECONDS)) {
164+
throw new FlowableUnrecoverableJobException("parallelMultiInstanceWaitLatch did not reach 0");
165+
}
166+
} catch (InterruptedException e) {
167+
Thread.currentThread().interrupt();
168+
throw new RuntimeException(e);
169+
}
170+
}
171+
172+
}
173+
174+
}
175+
176+
@Override
177+
public boolean isFailOnException() {
178+
return true;
179+
}
180+
181+
@Override
182+
public boolean isFireOnTransactionLifecycleEvent() {
183+
return false;
184+
}
185+
186+
@Override
187+
public String getOnTransaction() {
188+
return null;
189+
}
190+
}
191+
}

modules/flowable-engine/src/test/java/org/flowable/engine/test/jobexecutor/ResetExpiredJobsTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,11 @@ public void testResetExpiredJobTimeout() {
150150
managementService.executeCommand(new ResetExpiredJobsCmd(jobIds, jobServiceConfiguration.getJobEntityManager(), jobServiceConfiguration));
151151
assertThat(managementService.executeCommand(new FindExpiredJobsCmd(expiredJobsPagesSize, jobServiceConfiguration.getJobEntityManager(), jobServiceConfiguration))).isEmpty();
152152

153-
assertThat(managementService.createJobQuery().jobId(job.getId()).singleResult()).isNull();
154-
assertThat(managementService.createJobQuery().singleResult()).isNotNull();
153+
JobEntity jobAfterExpiry = (JobEntity) managementService.createJobQuery().singleResult();
154+
assertThat(jobAfterExpiry).isNotNull();
155+
assertThat(jobAfterExpiry.getId()).isEqualTo(job.getId());
156+
assertThat(jobAfterExpiry.getLockExpirationTime()).isNull();
157+
assertThat(jobAfterExpiry.getLockOwner()).isNull();
155158
}
156159

157160
@Test
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:flowable="http://flowable.org/bpmn"
3+
xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC"
4+
xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" xmlns:design="http://flowable.org/design" typeLanguage="http://www.w3.org/2001/XMLSchema"
5+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
6+
xsi:schemaLocation="http://www.omg.org/spec/BPMN/20100524/MODEL http://www.omg.org/spec/BPMN/2.0/20100501/BPMN20.xsd"
7+
expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://flowable.org/test" design:palette="flowable-engage-process-palette">
8+
<process id="parallelScriptTask" name="Parallel Script Task" isExecutable="true" flowable:candidateStarterGroups="flowableUser">
9+
<extensionElements>
10+
<design:stencilid><![CDATA[BPMNDiagram]]></design:stencilid>
11+
<design:creationdate><![CDATA[2023-11-14T09:15:53.641Z]]></design:creationdate>
12+
<design:modificationdate><![CDATA[2023-11-14T09:16:52.191Z]]></design:modificationdate>
13+
</extensionElements>
14+
<scriptTask id="bpmnTask_1" name="Script task" flowable:async="true" flowable:exclusive="false" scriptFormat="groovy"
15+
flowable:autoStoreVariables="false">
16+
<extensionElements>
17+
<design:stencilid><![CDATA[ScriptTask]]></design:stencilid>
18+
<design:stencilsuperid><![CDATA[Task]]></design:stencilsuperid>
19+
</extensionElements>
20+
<multiInstanceLoopCharacteristics isSequential="false">
21+
<extensionElements></extensionElements>
22+
<loopCardinality>4</loopCardinality>
23+
</multiInstanceLoopCharacteristics>
24+
<script><![CDATA[println "Executing Test"]]></script>
25+
</scriptTask>
26+
<userTask id="bpmnTask_5" name="User task" flowable:assignee="${initiator}" flowable:formFieldValidation="false">
27+
<extensionElements>
28+
<flowable:task-candidates-type><![CDATA[all]]></flowable:task-candidates-type>
29+
<design:stencilid><![CDATA[FormTask]]></design:stencilid>
30+
<design:stencilsuperid><![CDATA[Task]]></design:stencilsuperid>
31+
</extensionElements>
32+
</userTask>
33+
<startEvent id="startnoneevent1" flowable:initiator="initiator" flowable:formFieldValidation="false">
34+
<extensionElements>
35+
<flowable:work-form-field-validation><![CDATA[false]]></flowable:work-form-field-validation>
36+
<design:stencilid><![CDATA[StartNoneEvent]]></design:stencilid>
37+
</extensionElements>
38+
</startEvent>
39+
<endEvent id="bpmnEndEvent_7">
40+
<extensionElements>
41+
<design:stencilid><![CDATA[EndNoneEvent]]></design:stencilid>
42+
</extensionElements>
43+
</endEvent>
44+
<sequenceFlow id="bpmnSequenceFlow_6" sourceRef="bpmnTask_1" targetRef="bpmnTask_5">
45+
<extensionElements>
46+
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
47+
</extensionElements>
48+
</sequenceFlow>
49+
<sequenceFlow id="bpmnSequenceFlow_8" sourceRef="bpmnTask_5" targetRef="bpmnEndEvent_7">
50+
<extensionElements>
51+
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
52+
</extensionElements>
53+
</sequenceFlow>
54+
<sequenceFlow id="bpmnSequenceFlow_2" sourceRef="startnoneevent1" targetRef="bpmnTask_1">
55+
<extensionElements>
56+
<design:stencilid><![CDATA[SequenceFlow]]></design:stencilid>
57+
</extensionElements>
58+
</sequenceFlow>
59+
</process>
60+
<bpmndi:BPMNDiagram id="BPMNDiagram_parallelScriptTask">
61+
<bpmndi:BPMNPlane bpmnElement="parallelScriptTask" id="BPMNPlane_parallelScriptTask">
62+
<bpmndi:BPMNShape bpmnElement="bpmnTask_1" id="BPMNShape_bpmnTask_1">
63+
<omgdc:Bounds height="80.0" width="100.0" x="395.0" y="236.0"></omgdc:Bounds>
64+
</bpmndi:BPMNShape>
65+
<bpmndi:BPMNShape bpmnElement="bpmnTask_5" id="BPMNShape_bpmnTask_5">
66+
<omgdc:Bounds height="80.0" width="100.0" x="545.0" y="236.0"></omgdc:Bounds>
67+
</bpmndi:BPMNShape>
68+
<bpmndi:BPMNShape bpmnElement="startnoneevent1" id="BPMNShape_startnoneevent1">
69+
<omgdc:Bounds height="30.0" width="30.0" x="315.0" y="261.0"></omgdc:Bounds>
70+
</bpmndi:BPMNShape>
71+
<bpmndi:BPMNShape bpmnElement="bpmnEndEvent_7" id="BPMNShape_bpmnEndEvent_7">
72+
<omgdc:Bounds height="28.0" width="28.0" x="695.0" y="262.0"></omgdc:Bounds>
73+
</bpmndi:BPMNShape>
74+
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_2" id="BPMNEdge_bpmnSequenceFlow_2" flowable:sourceDockerX="15.0" flowable:sourceDockerY="15.0"
75+
flowable:targetDockerX="50.0" flowable:targetDockerY="40.0">
76+
<omgdi:waypoint x="345.0" y="276.0"></omgdi:waypoint>
77+
<omgdi:waypoint x="395.0" y="276.0"></omgdi:waypoint>
78+
</bpmndi:BPMNEdge>
79+
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_8" id="BPMNEdge_bpmnSequenceFlow_8" flowable:sourceDockerX="50.0" flowable:sourceDockerY="40.0"
80+
flowable:targetDockerX="14.0" flowable:targetDockerY="14.0">
81+
<omgdi:waypoint x="645.0" y="276.0"></omgdi:waypoint>
82+
<omgdi:waypoint x="695.0" y="276.0"></omgdi:waypoint>
83+
</bpmndi:BPMNEdge>
84+
<bpmndi:BPMNEdge bpmnElement="bpmnSequenceFlow_6" id="BPMNEdge_bpmnSequenceFlow_6" flowable:sourceDockerX="50.0" flowable:sourceDockerY="40.0"
85+
flowable:targetDockerX="50.0" flowable:targetDockerY="40.0">
86+
<omgdi:waypoint x="495.0" y="276.0"></omgdi:waypoint>
87+
<omgdi:waypoint x="545.0" y="276.0"></omgdi:waypoint>
88+
</bpmndi:BPMNEdge>
89+
</bpmndi:BPMNPlane>
90+
</bpmndi:BPMNDiagram>
91+
</definitions>

modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/JobServiceImpl.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@
1717

1818
import org.flowable.common.engine.api.FlowableIllegalArgumentException;
1919
import org.flowable.common.engine.api.delegate.event.FlowableEngineEventType;
20+
import org.flowable.common.engine.api.delegate.event.FlowableEventDispatcher;
21+
import org.flowable.common.engine.impl.persistence.entity.ByteArrayRef;
2022
import org.flowable.job.api.DeadLetterJobQuery;
2123
import org.flowable.job.api.HistoryJobQuery;
2224
import org.flowable.job.api.JobQuery;
2325
import org.flowable.job.api.SuspendedJobQuery;
2426
import org.flowable.job.api.TimerJobQuery;
27+
import org.flowable.job.service.InternalJobManager;
2528
import org.flowable.job.service.JobService;
2629
import org.flowable.job.service.JobServiceConfiguration;
2730
import org.flowable.job.service.event.impl.FlowableJobEventBuilder;
@@ -200,13 +203,30 @@ public void deleteJob(JobEntity job) {
200203
public void deleteJobsByExecutionId(String executionId) {
201204
JobEntityManager jobEntityManager = getJobEntityManager();
202205
Collection<JobEntity> jobsForExecution = jobEntityManager.findJobsByExecutionId(executionId);
206+
if (jobsForExecution.isEmpty()) {
207+
return;
208+
}
209+
210+
InternalJobManager internalJobManager = configuration.getInternalJobManager();
211+
FlowableEventDispatcher eventDispatcher = getEventDispatcher();
212+
boolean eventDispatcherEnabled = eventDispatcher != null && eventDispatcher.isEnabled();
203213
for (JobEntity job : jobsForExecution) {
204-
getJobEntityManager().delete(job);
205-
if (getEventDispatcher() != null && getEventDispatcher().isEnabled()) {
206-
getEventDispatcher().dispatchEvent(FlowableJobEventBuilder.createEntityEvent(
214+
if (internalJobManager != null) {
215+
internalJobManager.handleJobDelete(job);
216+
}
217+
218+
deleteByteArrayRef(job.getExceptionByteArrayRef());
219+
deleteByteArrayRef(job.getCustomValuesByteArrayRef());
220+
221+
if (eventDispatcherEnabled) {
222+
eventDispatcher.dispatchEvent(FlowableJobEventBuilder.createEntityEvent(
223+
FlowableEngineEventType.ENTITY_DELETED, job), configuration.getEngineName());
224+
eventDispatcher.dispatchEvent(FlowableJobEventBuilder.createEntityEvent(
207225
FlowableEngineEventType.JOB_CANCELED, job), configuration.getEngineName());
208226
}
209227
}
228+
229+
jobEntityManager.deleteJobsByExecutionId(executionId);
210230
}
211231

212232
@Override
@@ -235,4 +255,10 @@ public void deleteDeadLetterJobsByExecutionId(String executionId) {
235255
}
236256
}
237257

258+
protected void deleteByteArrayRef(ByteArrayRef jobByteArrayRef) {
259+
if (jobByteArrayRef != null) {
260+
jobByteArrayRef.delete(configuration.getEngineName());
261+
}
262+
}
263+
238264
}

0 commit comments

Comments
 (0)