Skip to content

Commit d475227

Browse files
committed
Adapt handshakes.
1 parent 57650a7 commit d475227

File tree

2 files changed

+37
-14
lines changed

2 files changed

+37
-14
lines changed

src/main/java/mx/kenzie/jupiter/stream/impl/HandshakeChunkStream.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ public void feed(Element element) {
3838
}
3939
}
4040

41+
@Override
42+
public Element[] readAllRemaining(Element[] array) {
43+
final List<Element> list = new ArrayList<>();
44+
while (!this.isClosed()) list.add(this.read());
45+
return list.toArray(array);
46+
}
47+
4148
@Override
4249
public Element read() {
4350
boolean read;
@@ -61,13 +68,6 @@ public Element read() {
6168
return element;
6269
}
6370

64-
@Override
65-
public Element[] readAllRemaining(Element[] array) {
66-
final List<Element> list = new ArrayList<>();
67-
while (!this.isClosed()) list.add(this.read());
68-
return list.toArray(array);
69-
}
70-
7171
@Override
7272
public synchronized boolean isClosed() {
7373
return closed;

src/main/java/mx/kenzie/jupiter/stream/impl/QueuedChunkStream.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,32 @@
11
package mx.kenzie.jupiter.stream.impl;
22

3+
import mx.kenzie.jupiter.iterator.LazyIterator;
34
import mx.kenzie.jupiter.stream.ChunkStream;
5+
import org.jetbrains.annotations.NotNull;
46

57
import java.util.ArrayList;
68
import java.util.Deque;
9+
import java.util.Iterator;
710
import java.util.List;
811
import java.util.concurrent.LinkedBlockingDeque;
12+
import java.util.concurrent.atomic.AtomicLong;
913

1014
public class QueuedChunkStream<Element> extends ChunkStream<Element> {
1115

1216
protected final Deque<Element> queue;
17+
protected final AtomicLong counter;
1318
protected volatile boolean closed;
1419

1520
public QueuedChunkStream() {
1621
this.queue = new LinkedBlockingDeque<>();
22+
this.counter = new AtomicLong();
1723
}
1824

1925
@Override
2026
public void feed(Element element) {
2127
this.queue.addLast(element);
2228
}
2329

24-
@Override
25-
public Element read() {
26-
return this.queue.getFirst();
27-
}
28-
2930
@Override
3031
public Element[] readAllRemaining(Element[] array) {
3132
final List<Element> list = new ArrayList<>();
@@ -34,15 +35,37 @@ public Element[] readAllRemaining(Element[] array) {
3435
}
3536

3637
@Override
37-
public synchronized boolean isClosed() {
38-
return closed;
38+
public @NotNull Iterator<Element> iterator() {
39+
return new LazyIterator<>() {
40+
@Override
41+
public boolean hasNext() {
42+
return !closed || counter.get() + 1 < queue.size();
43+
}
44+
45+
@Override
46+
public Element next() {
47+
return read();
48+
}
49+
};
3950
}
4051

4152
@Override
4253
public boolean canRead() {
4354
return !this.queue.isEmpty();
4455
}
4556

57+
@Override
58+
public Element read() {
59+
final Element element = this.queue.getFirst();
60+
this.counter.getAndIncrement();
61+
return element;
62+
}
63+
64+
@Override
65+
public synchronized boolean isClosed() {
66+
return closed;
67+
}
68+
4669
@Override
4770
public synchronized void close() {
4871
this.closed = true;

0 commit comments

Comments
 (0)