Skip to content

Commit 3228274

Browse files
committed
Add time unit
1 parent bc12e5a commit 3228274

File tree

7 files changed

+173
-55
lines changed

7 files changed

+173
-55
lines changed

tang-commons/src/main/kotlin/com/tang/commons/utils/queue/bootstrap/QueueBootstrap.kt

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,24 @@ import java.util.concurrent.TimeUnit
1313
*
1414
* @author Tang
1515
*/
16-
class QueueBootstrap {
16+
class QueueBootstrap(
17+
18+
/**
19+
* 槽位数量
20+
*/
21+
private val ticksPerWheel: Int = (1 shl 9), // aka 512
22+
23+
/**
24+
* 槽位时间间隔
25+
*/
26+
private val tickDuration: Long = 100,
27+
28+
/**
29+
* 时间单位
30+
*/
31+
private val unit: TimeUnit = TimeUnit.MILLISECONDS
32+
33+
) {
1734

1835
companion object {
1936
private val LOGGER: Logger = LogUtils.getLogger()
@@ -31,9 +48,9 @@ class QueueBootstrap {
3148
*/
3249
fun start(): WheelQueue {
3350
LOGGER.info("Timer wheel queue scanner starting...")
34-
val wheelQueue = WheelQueue(1 shl 10) // aka 1024
35-
val timerTask = QueueScanTimer(wheelQueue)
36-
newScheduledThreadPool.scheduleWithFixedDelay(timerTask, 0, 100, TimeUnit.MILLISECONDS)
51+
val wheelQueue = WheelQueue(ticksPerWheel, tickDuration, unit)
52+
val timerTask = QueueScanTimer(wheelQueue, ticksPerWheel, tickDuration, unit)
53+
newScheduledThreadPool.scheduleWithFixedDelay(timerTask, 0, tickDuration, unit)
3754
LOGGER.info("Timer wheel queue scanner start up.")
3855
return wheelQueue
3956
}

tang-commons/src/main/kotlin/com/tang/commons/utils/queue/bootstrap/QueueScanTimer.kt

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import com.tang.commons.utils.queue.factory.QueueDefaultThreadFactory
55
import com.tang.commons.utils.queue.wheel.SlotTask
66
import com.tang.commons.utils.queue.wheel.WheelQueue
77
import org.slf4j.Logger
8-
import java.time.LocalDateTime
98
import java.util.TimerTask
109
import java.util.concurrent.LinkedBlockingQueue
1110
import java.util.concurrent.ThreadFactory
@@ -22,16 +21,33 @@ class QueueScanTimer(
2221
/**
2322
* 环形队列
2423
*/
25-
private val queue: WheelQueue
24+
private val queue: WheelQueue,
25+
26+
/**
27+
* 槽位数量
28+
*/
29+
private val ticksPerWheel: Int,
30+
31+
/**
32+
* 槽位时间间隔
33+
*/
34+
private val tickDuration: Long,
35+
36+
/**
37+
* 时间单位
38+
*/
39+
private val unit: TimeUnit
2640

2741
) : TimerTask() {
2842

2943
companion object {
44+
3045
private val LOGGER: Logger = LogUtils.getLogger()
3146

3247
private val slotThreadFactory: ThreadFactory = QueueDefaultThreadFactory("slotThreadGroup")
3348

3449
private val taskThreadFactory: ThreadFactory = QueueDefaultThreadFactory("taskThreadGroup")
50+
3551
}
3652

3753
/**
@@ -45,11 +61,11 @@ class QueueScanTimer(
4561
private val taskPool = ThreadPoolExecutor(1000, 1000, 0L, TimeUnit.MILLISECONDS, LinkedBlockingQueue(), taskThreadFactory)
4662

4763
override fun run() {
48-
val now = LocalDateTime.now()
49-
val currentSecond = (now.minute * 60 + now.second) % (1 shl 10)
50-
val slot = queue.peek(currentSecond)
51-
LOGGER.debug("current slot: {}", currentSecond)
52-
slotPool.execute(SlotTask(slot.tasks, currentSecond, taskPool, queue))
64+
val now = System.nanoTime()
65+
val index = now / unit.toNanos(tickDuration) % ticksPerWheel
66+
val slot = queue.peek(index.toInt())
67+
LOGGER.debug("current slot: {}", index)
68+
slotPool.execute(SlotTask(slot.tasks, taskPool, queue))
5369
}
5470

5571
}
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.tang.commons.utils.queue.config
22

3+
import java.util.concurrent.TimeUnit
4+
35
/**
46
* 队列配置
57
*
@@ -8,8 +10,8 @@ package com.tang.commons.utils.queue.config
810
object QueueConfig {
911

1012
/**
11-
* 槽位数量
13+
* 默认延时单位
1214
*/
13-
const val QUEUE_SIZE: Int = 3600
15+
val DEFAULT_DELAY_UNIT: TimeUnit = TimeUnit.MILLISECONDS
1416

1517
}

tang-commons/src/main/kotlin/com/tang/commons/utils/queue/factory/QueueDefaultThreadFactory.kt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,14 @@ import java.util.concurrent.atomic.AtomicInteger
88
*
99
* @author Tang
1010
*/
11-
class QueueDefaultThreadFactory(groupName: String) : ThreadFactory {
11+
class QueueDefaultThreadFactory(
12+
13+
/**
14+
* 线程组名称
15+
*/
16+
groupName: String
17+
18+
) : ThreadFactory {
1219

1320
companion object {
1421
private val POOL_NUMBER = AtomicInteger(1)

tang-commons/src/main/kotlin/com/tang/commons/utils/queue/task/TaskAttribute.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.tang.commons.utils.queue.task
22

3-
import java.util.Date
3+
import java.time.LocalDateTime
44

55
/**
66
* 任务属性
@@ -17,11 +17,11 @@ class TaskAttribute {
1717
/**
1818
* 任务加入槽位的时间
1919
*/
20-
lateinit var joinTime: Date
20+
lateinit var joinTime: LocalDateTime
2121

2222
/**
2323
* 任务应该什么时候执行
2424
*/
25-
lateinit var executeTime: Date
25+
lateinit var executeTime: LocalDateTime
2626

2727
}

tang-commons/src/main/kotlin/com/tang/commons/utils/queue/wheel/SlotTask.kt

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package com.tang.commons.utils.queue.wheel
22

3-
import com.tang.commons.utils.LogUtils
43
import com.tang.commons.utils.queue.task.AbstractTask
5-
import org.slf4j.Logger
64
import java.util.concurrent.ConcurrentMap
75
import java.util.concurrent.ThreadPoolExecutor
86

@@ -12,34 +10,34 @@ import java.util.concurrent.ThreadPoolExecutor
1210
* @author Tang
1311
*/
1412
class SlotTask(
13+
14+
/**
15+
* 任务集合
16+
*/
1517
private var tasks: ConcurrentMap<String, AbstractTask>,
16-
private var currentSecond: Int,
18+
19+
/**
20+
* 任务线程池
21+
*/
1722
private var taskPool: ThreadPoolExecutor,
23+
24+
/**
25+
* 环形队列
26+
*/
1827
private var queue: WheelQueue
19-
) : Runnable {
2028

21-
companion object {
22-
private val LOGGER: Logger = LogUtils.getLogger()
23-
}
29+
) : Runnable {
2430

2531
override fun run() {
2632
val it = tasks.entries.iterator()
2733
while (it.hasNext()) {
2834
val entry = it.next()
2935
val task = entry.value
30-
LOGGER.debug(
31-
"running_current_slot:currentSecond => {}, task => {}, taskQueueSize => {}",
32-
currentSecond, task.toString(), tasks.size
33-
)
3436
if (task.cycleNum <= 0) {
3537
taskPool.execute(task)
3638
it.remove()
3739
queue.remove(entry.key)
3840
} else {
39-
LOGGER.debug(
40-
"decrementCycle#running_current_solt:currentSecond => {}, task => {}",
41-
currentSecond, task
42-
)
4341
task.decrementCycle()
4442
}
4543
}

tang-commons/src/main/kotlin/com/tang/commons/utils/queue/wheel/WheelQueue.kt

Lines changed: 101 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,151 @@
11
package com.tang.commons.utils.queue.wheel
22

33
import com.tang.commons.utils.LogUtils
4+
import com.tang.commons.utils.queue.config.QueueConfig
45
import com.tang.commons.utils.queue.task.AbstractTask
56
import com.tang.commons.utils.queue.task.TaskAttribute
67
import org.slf4j.Logger
8+
import java.time.Instant
79
import java.time.LocalDateTime
810
import java.time.ZoneId
9-
import java.util.Date
11+
import java.util.concurrent.TimeUnit
1012

1113
/**
1214
* 时间轮队列。像轮子一样转动的队列, 环形队列、循环队列。
1315
*
1416
* @author Tang
1517
*/
16-
class WheelQueue(private val ticksPerWheel: Int) {
18+
class WheelQueue(
19+
20+
/**
21+
* 槽位数量
22+
*/
23+
private val ticksPerWheel: Int,
24+
25+
/**
26+
* 槽位时间间隔
27+
*/
28+
private val tickDuration: Long,
29+
30+
/**
31+
* 时间单位
32+
*/
33+
private val tickUnit: TimeUnit
34+
35+
) {
1736

1837
companion object {
1938
private val LOGGER: Logger = LogUtils.getLogger()
2039
}
2140

2241
/**
23-
* 建立一个有3600个槽位的环形队列。每秒轮询一个槽位,3600个就是3600秒=1小时
42+
* 环形队列中的槽位
2443
*/
2544
private val slotQueue: Array<Slot> = Array(findNextPositivePowerOfTwo(ticksPerWheel)) { Slot() }
2645

2746
/**
2847
* 任务 ID 对应的槽位等任务属性
2948
*/
30-
private val taskSlotMapping: MutableMap<String, TaskAttribute> = HashMap(1 shl 10) // aka 1024
49+
private val taskSlotMapping: MutableMap<String, TaskAttribute> = HashMap(slotQueue.size shl 1)
3150

51+
/**
52+
* 计算下一个 2 的幂次方
53+
*/
3254
private fun findNextPositivePowerOfTwo(value: Int): Int {
3355
assert(value > Int.MIN_VALUE && value < 0x40000000)
3456
return 1 shl (32 - Integer.numberOfLeadingZeros(value - 1))
3557
}
3658

59+
/**
60+
* 添加一个任务到环形队列
61+
*
62+
* @param taskId 任务 ID
63+
* @param delay 延迟时间
64+
* @param task 任务
65+
*/
66+
fun add(taskId: String, delay: Int, task: Runnable) {
67+
add(taskId, delay, QueueConfig.DEFAULT_DELAY_UNIT, task)
68+
}
69+
70+
/**
71+
* 添加一个任务到环形队列
72+
*
73+
* @param taskId 任务 ID
74+
* @param delay 延迟时间
75+
* @param task 任务
76+
*/
3777
fun add(taskId: String, delay: Long, task: Runnable) {
78+
add(taskId, delay, QueueConfig.DEFAULT_DELAY_UNIT, task)
79+
}
80+
81+
/**
82+
* 添加一个任务到环形队列
83+
*
84+
* @param taskId 任务 ID
85+
* @param delay 延迟时间
86+
* @param unit 时间单位
87+
* @param task 任务
88+
*/
89+
fun add(taskId: String, delay: Int, unit: TimeUnit, task: Runnable) {
90+
add(object : AbstractTask(taskId) {
91+
override fun run() {
92+
task.run()
93+
}
94+
}, delay, unit)
95+
}
96+
97+
/**
98+
* 添加一个任务到环形队列
99+
*
100+
* @param taskId 任务 ID
101+
* @param delay 延迟时间
102+
* @param unit 时间单位
103+
* @param task 任务
104+
*/
105+
fun add(taskId: String, delay: Long, unit: TimeUnit, task: Runnable) {
106+
add(object : AbstractTask(taskId) {
107+
override fun run() {
108+
task.run()
109+
}
110+
}, delay, unit)
38111
}
39112

40113
/**
41114
* 添加一个任务到环形队列
42115
*
43-
* @param task 任务
44-
* @param secondsLater 以当前时间点为基准,多少秒以后执行
116+
* @param task 任务
117+
* @param delay 延迟时间
118+
* @param unit 时间单位
45119
*/
46-
fun add(task: AbstractTask, secondsLater: Int) {
120+
fun add(task: AbstractTask, delay: Int, unit: TimeUnit = QueueConfig.DEFAULT_DELAY_UNIT) {
121+
add(task, delay.toLong(), unit)
122+
}
123+
124+
/**
125+
* 添加一个任务到环形队列
126+
*
127+
* @param task 任务
128+
* @param delay 延迟时间
129+
* @param unit 时间单位
130+
*/
131+
fun add(task: AbstractTask, delay: Long, unit: TimeUnit = QueueConfig.DEFAULT_DELAY_UNIT) {
47132
//设置任务熟悉
48-
val slotIndex = setAttribute(secondsLater, task)
133+
val slotIndex = setAttribute(task, delay, unit)
49134
//加到对应槽位的集合中
50135
slotQueue[slotIndex].addTask(task)
51136
LOGGER.debug("join task.task => {}, slotIndex => {}", task, slotIndex)
52137
}
53138

54-
private fun setAttribute(
55-
secondsLater: Int,
56-
task: AbstractTask,
57-
taskSlotMapping: Map<String, TaskAttribute>
58-
): Int {
59-
val taskAttribute = TaskAttribute()
60-
val now = LocalDateTime.now()
61-
val currentSecond = now.minute * 60 + now.second
62-
val slotIndex = (currentSecond + secondsLater) % ticksPerWheel
63-
task.cycleNum = secondsLater / ticksPerWheel
64-
val future = now.plusSeconds(secondsLater.toLong())
139+
private fun setAttribute(task: AbstractTask, delay: Long, unit: TimeUnit): Int {
140+
val duration = unit.toNanos(delay)
141+
val now = System.nanoTime()
142+
val slotIndex = (now + duration) / tickUnit.toNanos(tickDuration) % ticksPerWheel
65143
val taskAttribute = TaskAttribute()
66-
taskAttribute.executeTime = Date.from(future.atZone(ZoneId.systemDefault()).toInstant())
67-
taskAttribute.slotIndex = slotIndex
68-
taskAttribute.joinTime = Date.from(now.atZone(ZoneId.systemDefault()).toInstant())
144+
taskAttribute.joinTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(now / 1_000_000_000), ZoneId.systemDefault())
145+
taskAttribute.slotIndex = slotIndex.toInt()
146+
taskAttribute.executeTime = LocalDateTime.ofInstant(Instant.ofEpochSecond((now + duration) / 1_000_000_000), ZoneId.systemDefault())
69147
taskSlotMapping[task.id] = taskAttribute
70-
return slotIndex
148+
return slotIndex.toInt()
71149
}
72150

73151
/**

0 commit comments

Comments
 (0)