diff --git a/README.md b/README.md
index 298cbb4..9c3d84e 100644
--- a/README.md
+++ b/README.md
@@ -20,14 +20,14 @@ xtool 是一个小小的 Java 工具集,遵循简单、可靠的原则,不
com.igeeksky.xtool
xtool
- 1.1.0
+ 1.1.1
```
### 2.2.Gradle
```groovy
-implementation group: 'com.igeeksky.xtool', name: 'xtool', version: '1.1.0'
+implementation group: 'com.igeeksky.xtool', name: 'xtool', version: '1.1.1'
```
### 2.3.编译安装
@@ -76,32 +76,121 @@ mvn clean install
**!!!总之,欢迎 pr,欢迎 issue!!!**
+
+
## 4. 更新日志
-| 版本 | 说明 |
-| ------ | ------------------------------------------------------------ |
-| 1.1.0 | 1. remove @ParameterNames
2. add ObjectUtils
3. tuple move to root package |
-| 1.0.22 | add ImmutableByteArray |
-| 1.0.21 | KeyValue 和 ByteArray 增加静态方法 |
-| 1.0.20 | Futures 增加 awaitAll 方法 |
-| 1.0.19 | 优化 SimpleJSON 实现:如果 field 为 public,即使无 readMethod,也读取属性值 |
-| 1.0.18 | 优化 RandomUtils 实现,通过 threadId 计算 index ,并获取数组对应 index 的 Random 实例 |
-| 1.0.17 | 优化 SimpleJSON 实现,仅获取 readMethod,支持非标准 JavaBean |
-| 1.0.16 | remove ThreadLocalRandom,change to Random[] |
-| 1.0.15 | add ArrayUtils.fill |
-| 1.0.14 | 1. add Codec
2. add Compressor |
-| 1.0.13 | 1. add Futures
2. add ByteArray
3. add KeyValue & ExpireKeyValue |
-| 1.0.12 | 1. 修改部分文档说明 |
-| 1.0.11 | 1. 添加 ConcurrentHashSet
2. 添加 PlatformThreadFactory
3. 添加 VirtualThreadFactory
4. 升级为支持 JDK 21,不再支持 JDK 17 |
-| 1.0.10 | 1. 添加 SimpleJSON(仅实现对象转 JSONString)
2. 升级为支持 JDK 17,不再支持 JDK 8 |
-| 1.0.9 | 1. 添加 RandomUtils |
-| 1.0.8 | 1. 添加 Maps.newHashMap方法 |
-| 1.0.7 | 1. 添加 IOUtils.closeQuietly方法 |
-| 1.0.6 | 1. Found 删除 node 字段,NodeHelper 增加精确匹配方法 |
-| 1.0.5 | 1. 调整部分方法返回值 |
-| 1.0.4 | 1. 增加测试用例;2. 编写参考文档;3.调整部分代码 |
-| 1.0.3 | 1. 调整DigestUtils默认小写 |
-| 1.0.2 | 1. 补充完整注释 |
-| 1.0.1 | 1. 增加测试用例 2. 删除 Lists类 |
-| 1.0.0 | 1. 添加常用工具类 2. 添加 ConcurrentHashTrie 字典树 |
+### 1.1.1
+
+1. add RingBuffer
+2. add FuturesTest
+
+### 1.1.0
+
+1. remove @ParameterNames
+2. add ObjectUtils
+3. tuple move to root package
+
+### 1.0.22
+
+1. add ImmutableByteArray
+
+### 1.0.21
+
+1. KeyValue 和 ByteArray 增加静态方法
+
+### 1.0.20
+
+1. Futures 增加 awaitAll 方法
+
+### 1.0.19
+
+1. 优化 SimpleJSON 实现:如果 field 为 public,即使无 readMethod,也读取属性值
+
+### 1.0.18
+
+1. 优化 RandomUtils 实现,通过 threadId 计算 index ,并获取数组对应 index 的 Random 实例
+
+### 1.0.17
+
+1. 优化 SimpleJSON 实现,仅获取 readMethod,支持非标准 JavaBean
+
+### 1.0.16
+
+1. remove ThreadLocalRandom,change to Random[]
+
+### 1.0.15
+
+1. add ArrayUtils.fill
+
+### 1.0.14
+
+1. add Codec
+2. add Compressor
+
+### 1.0.13
+
+1. add Futures
+2. add ByteArray
+3. add KeyValue & ExpireKeyValue
+
+### 1.0.12
+
+1. 修改部分文档说明
+
+### 1.0.11
+
+1. 添加 ConcurrentHashSet
+2. 添加 PlatformThreadFactory
+3. 添加 VirtualThreadFactory
+4. 升级为支持 JDK 21,不再支持 JDK 17
+
+### 1.0.10
+
+1. 添加 SimpleJSON(仅实现对象转 JSONString)
+2. 升级为支持 JDK 17,不再支持 JDK 8
+
+### 1.0.9
+
+1. 添加 RandomUtils
+
+### 1.0.8
+
+1. 添加 Maps.newHashMap方法
+
+### 1.0.7
+
+1. 添加 IOUtils.closeQuietly方法
+
+### 1.0.6
+
+1. Found 删除 node 字段,NodeHelper 增加精确匹配方法
+
+### 1.0.5
+
+1. 调整部分方法返回值
+
+### 1.0.4
+
+1. 增加测试用例
+2. 编写参考文档
+3. 调整部分代码
+
+### 1.0.3
+
+1. 调整DigestUtils默认小写
+
+### 1.0.2
+
+1. 补充完整注释
+
+### 1.0.1
+
+1. 增加测试用例
+2. 删除 Lists类
+
+### 1.0.0
+
+1. 添加常用工具类
+2. 添加 ConcurrentHashTrie 字典树
diff --git a/docs/Reference.md b/docs/Reference.md
index 8b1c7de..5ecaab0 100644
--- a/docs/Reference.md
+++ b/docs/Reference.md
@@ -1,6 +1,6 @@
## xtool 参考文档
-Author: [Patrick.Lau](mailto:patricklauxx@gmail.com) Version: 1.1.0
+Author: [Patrick.Lau](mailto:patricklauxx@gmail.com) Version: 1.1.1
[](https://www.apache.org/licenses/LICENSE-2.0.html) [](https://github.com/patricklaux/xtool/releases) [](https://search.maven.org/search?q=g:%22com.igeeksky.xtool%22%20AND%20a:%22xtool%22) [](https://codecov.io/gh/patricklaux/xtool) [](https://github.com/patricklaux/xtool/commits) [](https://gitter.im/igeeksky/xtool?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
@@ -26,14 +26,14 @@ xtool 是一个小小的 Java 工具集,遵循简单、可靠的原则,不
com.igeeksky.xtool
xtool
- 1.1.0
+ 1.1.1
```
#### 1.2.2.Gradle
```groovy
-implementation group: 'com.igeeksky.xtool', name: 'xtool', version: '1.1.0'
+implementation group: 'com.igeeksky.xtool', name: 'xtool', version: '1.1.1'
```
#### 1.2.3.编译安装
diff --git a/docs/TODO.md b/docs/TODO.md
index 34368e5..d24e6f2 100644
--- a/docs/TODO.md
+++ b/docs/TODO.md
@@ -4,6 +4,7 @@
- [X] 【开发】 KeyValue, ExpireKeyValue
- [X] 【开发】 Codec
- [X] 【开发】 Compressor
+- [X] 【开发】 RingBuffer
### 待完成任务
diff --git a/pom.xml b/pom.xml
index c1549ae..62e53dd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,14 +6,14 @@
com.igeeksky.xtool
xtool
- 1.1.0
+ 1.1.1
xtool
xtool is a very small set of Java tools.
https://github.com/patricklaux/xtool
- 5.11.4
+ 5.14.2
2.18.2
@@ -62,16 +62,16 @@
- org.junit.jupiter
- junit-jupiter-api
- ${jupiter.version}
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${jackson.version}
test
true
- com.fasterxml.jackson.core
- jackson-databind
- ${jackson.version}
+ org.mockito
+ mockito-junit-jupiter
+ ${mockito.version}
test
true
diff --git a/src/main/java/com/igeeksky/xtool/core/collection/RingBuffer.java b/src/main/java/com/igeeksky/xtool/core/collection/RingBuffer.java
new file mode 100644
index 0000000..729f977
--- /dev/null
+++ b/src/main/java/com/igeeksky/xtool/core/collection/RingBuffer.java
@@ -0,0 +1,183 @@
+package com.igeeksky.xtool.core.collection;
+
+import com.igeeksky.xtool.core.lang.Assert;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.Consumer;
+
+/**
+ * 环形缓冲区
+ *
+ * @author Patrick.Lau
+ * @since 1.1.1
+ */
+public class RingBuffer {
+
+ private static final int MAXIMUM_SIZE = 1 << 30;
+ private static final int MINIMUM_SIZE = 1 << 4;
+
+ private final int cap;
+ private final int mask;
+
+ private final AtomicReferenceArray buffer;
+ private final AtomicLong readCounter = new AtomicLong(0);
+ private final AtomicLong writeCounter = new AtomicLong(0);
+
+ /**
+ * 创建一个固定容量的环形缓冲区
+ *
+ * @param capacity 固定容量(如非 2的幂,则自动调整为最接近的2的幂)
+ */
+ public RingBuffer(int capacity) {
+ this.cap = tableSizeFor(capacity);
+ this.mask = this.cap - 1;
+ this.buffer = new AtomicReferenceArray<>(this.cap);
+ }
+
+ /**
+ * 添加元素,如果缓冲区已满,则返回 false
+ *
+ * @param element 元素
+ * @return 是否添加成功
+ */
+ public boolean offer(E element) {
+ Assert.notNull(element, "element must not be null");
+ long head = readCounter.get();
+ long tail = writeCounter.getOpaque();
+ while (tail - head < cap) {
+ if (writeCounter.weakCompareAndSetVolatile(tail, tail + 1)) {
+ int index = (int) (tail & mask);
+ buffer.setRelease(index, element);
+ return true;
+ }
+ head = readCounter.get();
+ tail = writeCounter.get();
+ }
+ return false;
+ }
+
+ /**
+ * 获取并移除缓冲区头部元素,如果缓冲区为空,则返回 null
+ *
+ * @return 元素
+ */
+ public E poll() {
+ long head = readCounter.getOpaque();
+ long tail = writeCounter.get();
+ if (head < tail) {
+ int index = (int) (head & mask);
+ E element = buffer.getAcquire(index);
+ if (element == null) {
+ return null;
+ }
+ // 使用CAS操作尝试将读取位置向前移动,以确保线程安全
+ if (readCounter.weakCompareAndSetVolatile(head, head + 1)) {
+ buffer.weakCompareAndSetVolatile(index, element, null);
+ return element;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * 将队列中的所有元素传递给指定的消费者,直到队列为空
+ * 此方法通过比较读取和写入计数器来确定队列是否为空,并使用CAS操作来确保线程安全
+ *
+ * @param consumer 消费者接口,用于处理队列中的元素
+ */
+ public void drainTo(Consumer consumer) {
+ // 获取当前读取位置
+ long head = readCounter.getOpaque();
+ // 获取当前写入位置
+ long tail = writeCounter.get();
+ // 当读取位置小于写入位置时,表示队列不为空
+ while (head < tail) {
+ int index = (int) (head & mask);
+ E element = buffer.getAcquire(index);
+ if (element == null) {
+ return;
+ }
+ // 使用CAS操作尝试将读取位置向前移动,以确保线程安全
+ if (readCounter.weakCompareAndSetVolatile(head, head + 1)) {
+ buffer.weakCompareAndSetVolatile(index, element, null);
+ consumer.accept(element);
+ }
+ // 重新获取当前读取和写入位置,以准备下一轮循环
+ head = readCounter.get();
+ tail = writeCounter.get();
+ }
+ }
+
+ /**
+ * 获取缓冲区元素数量
+ *
+ * @return 元素数量
+ */
+ public int size() {
+ return (int) (writeCounter.get() - readCounter.get());
+ }
+
+ /**
+ * 判断缓冲区是否为空
+ *
+ * @return 是否为空
+ */
+ public boolean isEmpty() {
+ return size() == 0;
+ }
+
+ /**
+ * 判断缓冲区是否已满
+ *
+ * @return 是否已满
+ */
+ public boolean isFull() {
+ return size() == cap;
+ }
+
+ /**
+ * 获取缓冲区容量
+ *
+ * @return 容量
+ */
+ public int capacity() {
+ return cap;
+ }
+
+ /**
+ * 获取缓冲区读取计数器
+ *
+ * @return 读取计数器
+ */
+ public long reads() {
+ return readCounter.get();
+ }
+
+ /**
+ * 获取缓冲区写入计数器
+ *
+ * @return 写入计数器
+ */
+ public long writes() {
+ return writeCounter.get();
+ }
+
+ /**
+ * 计算最接近2的幂的容量
+ *
+ * @param size 容量
+ * @return 最接近2的幂的容量
+ */
+ private static int tableSizeFor(int size) {
+ if (size <= MINIMUM_SIZE) {
+ return MINIMUM_SIZE;
+ }
+ if (size >= MAXIMUM_SIZE) {
+ return MAXIMUM_SIZE;
+ }
+ int n = -1 >>> Integer.numberOfLeadingZeros(size - 1);
+ return (n <= MINIMUM_SIZE) ? MINIMUM_SIZE : (n >= MAXIMUM_SIZE) ? MAXIMUM_SIZE : n + 1;
+ }
+
+}
diff --git a/src/main/java/com/igeeksky/xtool/core/concurrent/Futures.java b/src/main/java/com/igeeksky/xtool/core/concurrent/Futures.java
index 8a8d625..21a3693 100644
--- a/src/main/java/com/igeeksky/xtool/core/concurrent/Futures.java
+++ b/src/main/java/com/igeeksky/xtool/core/concurrent/Futures.java
@@ -123,7 +123,6 @@ public static int awaitAll(long timeout, TimeUnit unit, int start, ArrayList[] futures, boolean mayInterrup
if (future.isDone()) {
continue;
}
- if (future.isCancelled()) {
- continue;
- }
future.cancel(mayInterruptIfRunning);
}
}
@@ -202,7 +198,7 @@ public static void cancelAll(int start, Future>[] futures, boolean mayInterrup
* @param start 起始位置,从此位置开始取消未完成的任务
* @param futures 任务列表
*/
- public static void cancelAll(int start, ArrayList> futures) {
+ public static void cancelAll(int start, ArrayList> futures, boolean mayInterruptIfRunning) {
int i = start, len = futures.size();
for (; i < len; i++) {
Future> future = futures.get(i);
@@ -210,10 +206,7 @@ public static void cancelAll(int start, ArrayList> futures) {
if (future.isDone()) {
continue;
}
- if (future.isCancelled()) {
- continue;
- }
- future.cancel(true);
+ future.cancel(mayInterruptIfRunning);
}
}
}
diff --git a/src/test/java/com/igeeksky/xtool/core/collection/RingBufferTest.java b/src/test/java/com/igeeksky/xtool/core/collection/RingBufferTest.java
new file mode 100644
index 0000000..8834c1b
--- /dev/null
+++ b/src/test/java/com/igeeksky/xtool/core/collection/RingBufferTest.java
@@ -0,0 +1,453 @@
+package com.igeeksky.xtool.core.collection;
+
+
+import com.igeeksky.xtool.core.lang.Assert;
+import org.junit.jupiter.api.Test;
+
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * RingBuffer 测试用例
+ *
+ * @author Patrick.Lau
+ * @since 1.1.1
+ */
+public class RingBufferTest {
+
+ @Test
+ void size_maxCapacity_ReturnsMaxCapacity() {
+ RingBuffer buffer = new RingBuffer<>((1 << 30) + 1);
+ assertEquals(1 << 30, buffer.capacity());
+ assertEquals(0, buffer.size());
+ assertTrue(buffer.isEmpty());
+ assertFalse(buffer.isFull());
+ }
+
+ @Test
+ void size_minCapacity_ReturnsMaxCapacity() {
+ RingBuffer buffer = new RingBuffer<>(1);
+ assertEquals(16, buffer.capacity());
+ assertEquals(0, buffer.size());
+ assertTrue(buffer.isEmpty());
+ assertFalse(buffer.isFull());
+
+ for (int i = 0; i < 16; i++) {
+ buffer.offer("element" + i);
+ }
+
+ assertTrue(buffer.isFull());
+ }
+
+ @Test
+ void offer_BufferNotFull_ElementAdded() {
+ RingBuffer buffer = new RingBuffer<>(16);
+ assertTrue(buffer.offer("element1"));
+ assertEquals(1, buffer.size());
+ }
+
+ @Test
+ void offer_BufferFull_ElementNotAdded() {
+ RingBuffer buffer = new RingBuffer<>(16);
+ for (int i = 0; i < 16; i++) {
+ buffer.offer("element" + i);
+ }
+ assertFalse(buffer.offer("element17"));
+ assertEquals(16, buffer.size());
+ }
+
+ @Test
+ void poll_BufferNotEmpty_ElementRemoved() {
+ RingBuffer buffer = new RingBuffer<>(16);
+ buffer.offer("element1");
+ assertEquals("element1", buffer.poll());
+ assertEquals(0, buffer.size());
+ }
+
+ @Test
+ void poll_BufferEmpty_ReturnsNull() {
+ RingBuffer buffer = new RingBuffer<>(16);
+ assertNull(buffer.poll());
+ }
+
+ @Test
+ void size_BufferEmpty_SizeZero() {
+ RingBuffer buffer = new RingBuffer<>(16);
+ assertEquals(0, buffer.size());
+ }
+
+ @Test
+ void size_BufferPartiallyFilled_SizeNonZero() {
+ RingBuffer buffer = new RingBuffer<>(16);
+ buffer.offer("element1");
+ assertEquals(1, buffer.size());
+ }
+
+ @Test
+ void size_BufferFull_SizeEqualsCapacity() {
+ RingBuffer buffer = new RingBuffer<>(16);
+ for (int i = 0; i < 10; i++) {
+ buffer.offer("element" + i);
+ }
+ assertEquals(10, buffer.size());
+ }
+
+ @Test
+ void reads_CounterReflectsReadOperations() {
+ RingBuffer buffer = new RingBuffer<>(16);
+ buffer.offer("element1");
+ buffer.poll();
+ assertEquals(1, buffer.reads());
+ }
+
+ @Test
+ void writes_CounterReflectsWriteOperations() {
+ RingBuffer buffer = new RingBuffer<>(16);
+ buffer.offer("element1");
+ assertEquals(1, buffer.writes());
+ }
+
+ @Test
+ void drainTo_EmptyBuffer_ConsumerNotCalled() {
+ RingBuffer buffer = new RingBuffer<>(16);
+ StringConsumer consumer = new StringConsumer();
+
+ buffer.drainTo(consumer);
+ assertEquals(0, consumer.getCount());
+ }
+
+ @Test
+ void drainTo_NonEmptyBuffer_AllElementsConsumed() {
+ RingBuffer buffer = new RingBuffer<>(16);
+ StringConsumer consumer = new StringConsumer();
+
+ buffer.offer("element1");
+ buffer.offer("element2");
+ buffer.offer("element3");
+
+ buffer.drainTo(consumer);
+
+ assertEquals(3, consumer.getCount());
+ }
+
+ @Test
+ void test_TwoOffer_Concurrently() throws InterruptedException {
+ RingBuffer buffer = new RingBuffer<>(2048);
+ CountDownLatch latch = new CountDownLatch(2);
+
+ Thread thread1 = new Thread(() -> {
+ for (int i = 0; i < 1000; ) {
+ if (buffer.offer("element" + i)) {
+ i++;
+ }
+ }
+ latch.countDown();
+ });
+
+ Thread thread2 = new Thread(() -> {
+ for (int i = 1000; i < 2000; ) {
+ if (buffer.offer("element" + i)) {
+ i++;
+ }
+ }
+ latch.countDown();
+ });
+
+ thread1.start();
+ thread2.start();
+
+ latch.await();
+
+ assertEquals(2000, buffer.size());
+ assertEquals(2000, buffer.writes());
+ assertEquals(0, buffer.reads());
+ }
+
+ @Test
+ void test_TwoPoll_Concurrently() throws InterruptedException {
+ RingBuffer buffer = new RingBuffer<>(2048);
+ StringConsumer consumer = new StringConsumer();
+ CountDownLatch latch = new CountDownLatch(2);
+
+ for (int i = 0; i < 2000; ) {
+ if (buffer.offer("element" + i)) {
+ i++;
+ }
+ }
+
+ Thread thread1 = new Thread(() -> {
+ String polled;
+ while ((polled = buffer.poll()) != null) {
+ consumer.accept(polled);
+ }
+ latch.countDown();
+ });
+
+ Thread thread2 = new Thread(() -> {
+ String polled;
+ while ((polled = buffer.poll()) != null) {
+ consumer.accept(polled);
+ }
+ latch.countDown();
+ });
+
+ thread1.start();
+ thread2.start();
+
+ latch.await();
+
+ Set values = consumer.getValues();
+ for (int i = 0; i < 2000; i++) {
+ String element = "element" + i;
+ if (!values.contains(element)) {
+ System.out.println(element + " is not exist!");
+ }
+ }
+
+ assertEquals(2000, consumer.getCount());
+ assertEquals(2000, buffer.writes());
+ assertEquals(2000, buffer.reads());
+ }
+
+ @Test
+ void test_TwoOffer_OneConsumer_Concurrently() {
+ RingBuffer buffer = new RingBuffer<>(16);
+ StringConsumer consumer = new StringConsumer(2000000);
+ CountDownLatch latch = new CountDownLatch(2);
+
+ Thread thread1 = new Thread(() -> {
+ for (int i = 0; i < 1000000; ) {
+ if (buffer.offer("element" + i)) {
+ i++;
+ }
+ }
+ latch.countDown();
+ });
+ Thread thread2 = new Thread(() -> {
+ for (int i = 1000000; i < 2000000; ) {
+ if (buffer.offer("element" + i)) {
+ i++;
+ }
+ }
+ latch.countDown();
+ });
+
+ long startTime = System.currentTimeMillis();
+
+ thread1.start();
+ thread2.start();
+
+ while (latch.getCount() != 0) {
+ buffer.drainTo(consumer);
+ }
+
+ buffer.drainTo(consumer);
+
+ long endTime = System.currentTimeMillis();
+
+ System.out.println("Time cost: " + (endTime - startTime) + "ms");
+
+ Set values = consumer.getValues();
+ for (int i = 0; i < 2000000; i++) {
+ String element = "element" + i;
+ if (!values.contains(element)) {
+ System.out.println(element + " is not exist!");
+ }
+ }
+
+ assertEquals(2000000, consumer.getCount());
+ assertEquals(2000000, buffer.writes());
+ assertEquals(2000000, buffer.reads());
+ }
+
+ @Test
+ void test_TwoOffer_OneConsumer_Concurrently_ArrayBlockingQueue() {
+ ArrayBlockingQueue buffer = new ArrayBlockingQueue<>(16);
+ ConcurrentHashSet consumer = Sets.newConcurrentHashSet(2000000);
+ CountDownLatch latch = new CountDownLatch(2);
+
+ Thread thread1 = new Thread(() -> {
+ for (int i = 0; i < 1000000; ) {
+ if (buffer.offer("element" + i)) {
+ i++;
+ }
+ }
+ latch.countDown();
+ });
+ Thread thread2 = new Thread(() -> {
+ for (int i = 1000000; i < 2000000; ) {
+ if (buffer.offer("element" + i)) {
+ i++;
+ }
+ }
+ latch.countDown();
+ });
+
+ long startTime = System.currentTimeMillis();
+
+ thread1.start();
+ thread2.start();
+
+ while (latch.getCount() != 0) {
+ buffer.drainTo(consumer);
+ }
+
+ buffer.drainTo(consumer);
+
+ long endTime = System.currentTimeMillis();
+
+ System.out.println("Time cost: " + (endTime - startTime) + "ms");
+
+ assertEquals(2000000, consumer.size());
+ }
+
+ @Test
+ void test_TwoOffer_TwoConsumer_Concurrently() throws InterruptedException {
+ RingBuffer buffer = new RingBuffer<>(16);
+ StringConsumer consumer1 = new StringConsumer();
+ StringConsumer consumer2 = new StringConsumer();
+ CountDownLatch offerLatch = new CountDownLatch(2);
+ CountDownLatch consumerLatch = new CountDownLatch(2);
+
+ Thread offerThread1 = new Thread(() -> {
+ for (int i = 0; i < 1000; ) {
+ if (buffer.offer("element" + i)) {
+ i++;
+ }
+ }
+ offerLatch.countDown();
+ });
+
+ Thread offerThread2 = new Thread(() -> {
+ for (int i = 1000; i < 2000; ) {
+ if (buffer.offer("element" + i)) {
+ i++;
+ }
+ }
+ offerLatch.countDown();
+ });
+
+ Thread consumerThread1 = new Thread(() -> {
+ while (offerLatch.getCount() != 0) {
+ buffer.drainTo(consumer1);
+ // LockSupport.parkNanos(10);
+ }
+
+ buffer.drainTo(consumer1);
+ consumerLatch.countDown();
+ });
+
+ Thread consumerThread2 = new Thread(() -> {
+ while (offerLatch.getCount() != 0) {
+ buffer.drainTo(consumer2);
+ }
+ buffer.drainTo(consumer2);
+ consumerLatch.countDown();
+ });
+
+ offerThread1.start();
+ offerThread2.start();
+ consumerThread1.start();
+ consumerThread2.start();
+
+ consumerLatch.await();
+
+ Set values1 = consumer1.getValues();
+ Set values2 = consumer2.getValues();
+ for (int i = 0; i < 2000; i++) {
+ String element = "element" + i;
+ if (!values1.contains(element) && !values2.contains(element)) {
+ System.out.println(element + " is not exist!");
+ }
+ }
+
+ assertEquals(2000, consumer1.getCount() + consumer2.getCount());
+ assertEquals(2000, buffer.writes());
+ assertEquals(2000, buffer.reads());
+ }
+
+ @Test
+ void test_TwoOffer_OnePoll_Concurrently() {
+ RingBuffer buffer = new RingBuffer<>(2048);
+ StringConsumer consumer = new StringConsumer();
+ CountDownLatch latch = new CountDownLatch(2);
+
+ Thread thread1 = new Thread(() -> {
+ for (int i = 0; i < 1000; ) {
+ if (buffer.offer("element" + i)) {
+ i++;
+ }
+ }
+ latch.countDown();
+ });
+
+ Thread thread2 = new Thread(() -> {
+ for (int i = 1000; i < 2000; ) {
+ if (buffer.offer("element" + i)) {
+ i++;
+ }
+ }
+ latch.countDown();
+ });
+
+ thread1.start();
+ thread2.start();
+
+ for (int i = 0; i < 2001; i++) {
+ String polled = buffer.poll();
+ if (polled != null) {
+ consumer.accept(polled);
+ continue;
+ }
+ assertEquals(consumer.getCount(), buffer.reads());
+ LockSupport.parkNanos(100);
+ }
+
+ Set values = consumer.getValues();
+ for (int i = 0; i < 2000; i++) {
+ String element = "element" + i;
+ if (!values.contains(element)) {
+ System.out.println(element + " is not exist!");
+ }
+ }
+
+ assertEquals(2000, consumer.getCount());
+ }
+
+ private static class StringConsumer implements Consumer {
+
+ private final AtomicInteger count = new AtomicInteger();
+ private final Set values;
+
+ public StringConsumer() {
+ this(2048);
+ }
+
+ public StringConsumer(int capacity) {
+ values = Sets.newConcurrentHashSet(capacity);
+ }
+
+ @Override
+ public void accept(String s) {
+ Assert.notNull(s, "element must not be null");
+ count.incrementAndGet();
+ values.add(s);
+ }
+
+ int getCount() {
+ return count.get();
+ }
+
+ Set getValues() {
+ return values;
+ }
+
+ }
+
+}
diff --git a/src/test/java/com/igeeksky/xtool/core/concurrent/FuturesTest.java b/src/test/java/com/igeeksky/xtool/core/concurrent/FuturesTest.java
new file mode 100644
index 0000000..0ec9b54
--- /dev/null
+++ b/src/test/java/com/igeeksky/xtool/core/concurrent/FuturesTest.java
@@ -0,0 +1,128 @@
+package com.igeeksky.xtool.core.concurrent;
+
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.*;
+
+public class FuturesTest {
+
+ private Future> future1;
+ private Future> future2;
+ private Future> future3;
+
+ @BeforeEach
+ public void setUp() {
+ future1 = mock(Future.class);
+ future2 = mock(Future.class);
+ future3 = mock(Future.class);
+ }
+
+ @Test
+ public void awaitAll_FuturesCompleted_NoException() {
+ when(future1.isDone()).thenReturn(true);
+ when(future2.isDone()).thenReturn(true);
+ when(future3.isDone()).thenReturn(true);
+
+ Futures.awaitAll(new Future>[]{future1, future2, future3});
+ }
+
+ @Test
+ public void awaitAll_FuturesNotCompleted_WaitForCompletion() throws Exception {
+ when(future1.isDone()).thenReturn(false);
+ when(future2.isDone()).thenReturn(false);
+ when(future3.isDone()).thenReturn(false);
+
+ Futures.awaitAll(new Future>[]{future1, future2, future3});
+
+ verify(future1).get();
+ verify(future2).get();
+ verify(future3).get();
+ }
+
+ @Test
+ public void awaitAll_FuturesMixedState_WaitForUncompleted() throws Exception {
+ when(future1.isDone()).thenReturn(true);
+ when(future2.isDone()).thenReturn(false);
+ when(future3.isDone()).thenReturn(true);
+
+ Futures.awaitAll(new Future>[]{future1, future2, future3});
+
+ verify(future2).get();
+ }
+
+ @Test
+ public void awaitAll_EmptyList_NoException() {
+ Futures.awaitAll(new Future>[]{});
+ }
+
+ @Test
+ public void awaitAll_ListWithNulls_NoException() {
+ Futures.awaitAll(new Future>[]{future1, null, future2});
+ }
+
+ @Test
+ public void awaitAll_Timeout_AllCompletedBeforeTimeout() {
+ when(future1.isDone()).thenReturn(true);
+ when(future2.isDone()).thenReturn(true);
+ when(future3.isDone()).thenReturn(true);
+
+ int result = Futures.awaitAll(1000, TimeUnit.MILLISECONDS, 0, new Future>[]{future1, future2, future3});
+ assertEquals(3, result);
+ }
+
+ @Test
+ public void checkAll_AllCompleted_ReturnsLength() {
+ when(future1.isDone()).thenReturn(true);
+ when(future2.isDone()).thenReturn(true);
+ when(future3.isDone()).thenReturn(true);
+
+ int result = Futures.checkAll(0, new Future>[]{future1, future2, future3});
+ assertEquals(3, result);
+ }
+
+ @Test
+ public void checkAll_NotAllCompleted_ReturnsFirstUncompletedIndex() {
+ when(future1.isDone()).thenReturn(true);
+ when(future2.isDone()).thenReturn(false);
+ when(future3.isDone()).thenReturn(true);
+
+ int result = Futures.checkAll(0, new Future>[]{future1, future2, future3});
+ assertEquals(1, result);
+ }
+
+ @Test
+ public void checkAll_EmptyList_ReturnsZero() {
+ int result = Futures.checkAll(0, new Future>[]{});
+ assertEquals(0, result);
+ }
+
+ @Test
+ public void cancelAll_AllCompleted_NoCancellation() {
+ when(future1.isDone()).thenReturn(true);
+ when(future2.isDone()).thenReturn(true);
+ when(future3.isDone()).thenReturn(true);
+
+ Futures.cancelAll(0, new Future>[]{future1, future2, future3}, false);
+
+ verify(future1, never()).cancel(anyBoolean());
+ verify(future2, never()).cancel(anyBoolean());
+ verify(future3, never()).cancel(anyBoolean());
+ }
+
+ @Test
+ public void cancelAll_EmptyList_NoCancellation() {
+ Futures.cancelAll(0, new Future>[]{}, false);
+ }
+
+ @Test
+ public void cancelAll_ListWithNulls_NoCancellation() {
+ Futures.cancelAll(0, new Future>[]{future1, null, future2}, false);
+ }
+
+}