Skip to content

Commit 064321d

Browse files
committed
Merge branch 'master' of https://github.com/dromara/dynamic-tp
2 parents bd4525c + 87fa39c commit 064321d

File tree

15 files changed

+831
-29
lines changed

15 files changed

+831
-29
lines changed

core/src/main/java/org/dromara/dynamictp/core/aware/TaskEnhanceAware.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,19 @@ public interface TaskEnhanceAware extends DtpAware {
3535
/**
3636
* Enhance task
3737
*
38-
* @param command command
38+
* @param command command
3939
* @param taskWrappers task wrappers
4040
* @return enhanced task
4141
*/
4242
default Runnable getEnhancedTask(Runnable command, List<TaskWrapper> taskWrappers) {
43+
Runnable wrapRunnable = command;
4344
if (CollectionUtils.isNotEmpty(taskWrappers)) {
4445
for (TaskWrapper t : taskWrappers) {
45-
command = t.wrap(command);
46+
wrapRunnable = t.wrap(wrapRunnable);
4647
}
4748
}
48-
String taskName = (command instanceof NamedRunnable) ? ((NamedRunnable) command).getName() : null;
49-
command = new DtpRunnable(command, taskName);
50-
return command;
49+
String taskName = (wrapRunnable instanceof NamedRunnable) ? ((NamedRunnable) wrapRunnable).getName() : null;
50+
return new DtpRunnable(command, wrapRunnable, taskName);
5151
}
5252

5353
/**

core/src/main/java/org/dromara/dynamictp/core/executor/ExecutorType.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.dromara.dynamictp.core.executor.eager.EagerDtpExecutor;
2121
import lombok.Getter;
22+
import org.dromara.dynamictp.core.executor.priority.PriorityDtpExecutor;
2223

2324
/**
2425
* ExecutorType related
@@ -30,12 +31,29 @@
3031
public enum ExecutorType {
3132

3233
/**
33-
* Executor type.
34+
* Common executor type.
3435
*/
3536
COMMON("common", DtpExecutor.class),
37+
38+
/**
39+
* Eager executor type.
40+
*/
3641
EAGER("eager", EagerDtpExecutor.class),
42+
43+
/**
44+
* Scheduled executor type.
45+
*/
3746
SCHEDULED("scheduled", ScheduledDtpExecutor.class),
38-
ORDERED("ordered", OrderedDtpExecutor.class);
47+
48+
/**
49+
* Ordered executor type.
50+
*/
51+
ORDERED("ordered", OrderedDtpExecutor.class),
52+
53+
/**
54+
* Priority executor type.
55+
*/
56+
PRIORITY("priority", PriorityDtpExecutor.class);
3957

4058
private final String name;
4159

@@ -54,4 +72,5 @@ public static Class<?> getClass(String name) {
5472
}
5573
return COMMON.getClazz();
5674
}
75+
5776
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.dromara.dynamictp.core.executor.priority;
18+
19+
/**
20+
* Priority related
21+
*
22+
* @author <a href = "mailto:kamtohung@gmail.com">KamTo Hung</a>
23+
*/
24+
public interface Priority {
25+
26+
int getPriority();
27+
28+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.dromara.dynamictp.core.executor.priority;
18+
19+
import lombok.Getter;
20+
21+
import java.util.concurrent.Callable;
22+
23+
/**
24+
* PriorityCallable related
25+
*
26+
* @author <a href = "mailto:kamtohung@gmail.com">KamTo Hung</a>
27+
*/
28+
public class PriorityCallable<V> implements Priority, Callable<V> {
29+
30+
private final Callable<V> callable;
31+
32+
@Getter
33+
private final int priority;
34+
35+
private PriorityCallable(Callable<V> callable, int priority) {
36+
this.callable = callable;
37+
this.priority = priority;
38+
}
39+
40+
public static <T> Callable<T> of(Callable<T> task, int i) {
41+
return new PriorityCallable<>(task, i);
42+
}
43+
44+
@Override
45+
public V call() throws Exception {
46+
return callable.call();
47+
}
48+
49+
}
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.dromara.dynamictp.core.executor.priority;
18+
19+
import lombok.extern.slf4j.Slf4j;
20+
import org.dromara.dynamictp.core.executor.DtpExecutor;
21+
import org.dromara.dynamictp.core.support.task.runnable.DtpRunnable;
22+
23+
import java.util.Comparator;
24+
import java.util.concurrent.Callable;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.Future;
27+
import java.util.concurrent.PriorityBlockingQueue;
28+
import java.util.concurrent.RejectedExecutionHandler;
29+
import java.util.concurrent.RunnableFuture;
30+
import java.util.concurrent.ThreadFactory;
31+
import java.util.concurrent.TimeUnit;
32+
33+
/**
34+
* PriorityDtpExecutor related, extending DtpExecutor, implements priority feature
35+
*
36+
* @author <a href = "mailto:kamtohung@gmail.com">KamTo Hung</a>
37+
*/
38+
@Slf4j
39+
public class PriorityDtpExecutor extends DtpExecutor {
40+
41+
/**
42+
* The default priority.
43+
*/
44+
private static final int DEFAULT_PRIORITY = 0;
45+
46+
public PriorityDtpExecutor(int corePoolSize,
47+
int maximumPoolSize,
48+
long keepAliveTime,
49+
TimeUnit unit,
50+
int capacity) {
51+
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), Executors.defaultThreadFactory(), new AbortPolicy());
52+
}
53+
54+
public PriorityDtpExecutor(int corePoolSize,
55+
int maximumPoolSize,
56+
long keepAliveTime,
57+
TimeUnit unit,
58+
int capacity,
59+
ThreadFactory threadFactory) {
60+
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), threadFactory, new AbortPolicy());
61+
}
62+
63+
public PriorityDtpExecutor(int corePoolSize,
64+
int maximumPoolSize,
65+
long keepAliveTime,
66+
TimeUnit unit,
67+
int capacity,
68+
RejectedExecutionHandler handler) {
69+
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), Executors.defaultThreadFactory(), handler);
70+
}
71+
72+
public PriorityDtpExecutor(int corePoolSize,
73+
int maximumPoolSize,
74+
long keepAliveTime,
75+
TimeUnit unit,
76+
int capacity,
77+
ThreadFactory threadFactory,
78+
RejectedExecutionHandler handler) {
79+
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(capacity), threadFactory, handler);
80+
}
81+
82+
83+
public PriorityDtpExecutor(int corePoolSize,
84+
int maximumPoolSize,
85+
long keepAliveTime,
86+
TimeUnit unit,
87+
PriorityBlockingQueue<Runnable> workQueue) {
88+
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), new AbortPolicy());
89+
}
90+
91+
public PriorityDtpExecutor(int corePoolSize,
92+
int maximumPoolSize,
93+
long keepAliveTime,
94+
TimeUnit unit,
95+
PriorityBlockingQueue<Runnable> workQueue,
96+
ThreadFactory threadFactory) {
97+
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new AbortPolicy());
98+
}
99+
100+
public PriorityDtpExecutor(int corePoolSize,
101+
int maximumPoolSize,
102+
long keepAliveTime,
103+
TimeUnit unit,
104+
PriorityBlockingQueue<Runnable> workQueue,
105+
RejectedExecutionHandler handler) {
106+
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
107+
}
108+
109+
public PriorityDtpExecutor(int corePoolSize,
110+
int maximumPoolSize,
111+
long keepAliveTime,
112+
TimeUnit unit,
113+
PriorityBlockingQueue<Runnable> workQueue,
114+
ThreadFactory threadFactory,
115+
RejectedExecutionHandler handler) {
116+
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
117+
}
118+
119+
120+
@Override
121+
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
122+
return new PriorityFutureTask<>(runnable, value);
123+
}
124+
125+
@Override
126+
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
127+
return new PriorityFutureTask<>(callable);
128+
}
129+
130+
public void execute(Runnable command, int priority) {
131+
super.execute(PriorityRunnable.of(command, priority));
132+
}
133+
134+
@Override
135+
public Future<?> submit(Runnable task) {
136+
return super.submit(PriorityRunnable.of(task, DEFAULT_PRIORITY));
137+
}
138+
139+
public Future<?> submit(Runnable task, int priority) {
140+
return super.submit(PriorityRunnable.of(task, priority));
141+
}
142+
143+
@Override
144+
public <T> Future<T> submit(Runnable task, T result) {
145+
return super.submit(PriorityRunnable.of(task, DEFAULT_PRIORITY), result);
146+
}
147+
148+
public <T> Future<T> submit(Runnable task, T result, int priority) {
149+
return super.submit(PriorityRunnable.of(task, priority), result);
150+
}
151+
152+
153+
@Override
154+
public <T> Future<T> submit(Callable<T> task) {
155+
return super.submit(PriorityCallable.of(task, DEFAULT_PRIORITY));
156+
}
157+
158+
public <T> Future<T> submit(Callable<T> task, int priority) {
159+
return super.submit(PriorityCallable.of(task, priority));
160+
}
161+
162+
163+
/**
164+
* Priority Comparator
165+
*
166+
* @return Comparator
167+
*/
168+
public static Comparator<Runnable> getRunnableComparator() {
169+
return (o1, o2) -> {
170+
if (!(o1 instanceof DtpRunnable) || !(o2 instanceof DtpRunnable)) {
171+
return 0;
172+
}
173+
Runnable po1 = ((DtpRunnable) o1).getOriginRunnable();
174+
Runnable po2 = ((DtpRunnable) o2).getOriginRunnable();
175+
if (po1 instanceof Priority && po2 instanceof Priority) {
176+
return Integer.compare(((Priority) po1).getPriority(), ((Priority) po2).getPriority());
177+
}
178+
return 0;
179+
};
180+
}
181+
182+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.dromara.dynamictp.core.executor.priority;
18+
19+
20+
import java.util.concurrent.Callable;
21+
import java.util.concurrent.FutureTask;
22+
23+
/**
24+
* PriorityFutureTask related
25+
*
26+
* @author <a href = "mailto:kamtohung@gmail.com">KamTo Hung</a>
27+
*/
28+
public class PriorityFutureTask<V> extends FutureTask<V> implements Priority {
29+
30+
/**
31+
* The runnable.
32+
*/
33+
private final Priority obj;
34+
35+
private final int priority;
36+
37+
public PriorityFutureTask(Runnable runnable, V result) {
38+
super(runnable, result);
39+
this.obj = (PriorityRunnable) runnable;
40+
this.priority = this.obj.getPriority();
41+
}
42+
43+
public PriorityFutureTask(Callable<V> callable) {
44+
super(callable);
45+
this.obj = (PriorityCallable<V>) callable;
46+
this.priority = this.obj.getPriority();
47+
}
48+
49+
@Override
50+
public int getPriority() {
51+
return this.priority;
52+
}
53+
54+
}

0 commit comments

Comments
 (0)