Skip to content

Commit bb0c8aa

Browse files
committed
keep batch locked until emit
1 parent 1289f86 commit bb0c8aa

File tree

1 file changed

+12
-5
lines changed

1 file changed

+12
-5
lines changed

src/main/scala/util/Batcher.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,25 @@ final class Batcher[Key, Elem, Batch](
1919
private val buffers = ConcurrentHashMap[Key, Buffer](initialCapacity)
2020

2121
def add(key: Key, elem: Elem): Unit =
22-
val newBuffer = buffers.compute(
22+
buffers.compute(
2323
key,
2424
(_, buffer) =>
2525
val prev = Option(buffer)
2626
if prev.isEmpty then scheduler.scheduleOnce(timeout, () => emitAndRemove(key))
27-
Buffer(
27+
val newBuffer = Buffer(
2828
append(prev.map(_.batch), elem),
2929
prev.fold(1)(_.counter + 1)
3030
)
31+
if newBuffer.counter >= maxBatchSize then
32+
emit(key, newBuffer.batch)
33+
null
34+
else newBuffer
3135
)
32-
if newBuffer.counter >= maxBatchSize then emitAndRemove(key)
3336

3437
private def emitAndRemove(key: Key): Unit =
35-
Option(buffers.remove(key)).foreach: buffer =>
36-
emit(key, buffer.batch)
38+
buffers.computeIfPresent(
39+
key,
40+
(_, buffer) =>
41+
emit(key, buffer.batch)
42+
null
43+
)

0 commit comments

Comments
 (0)