groupWithin new implementation #3162
Conversation
val size: Long = os.size.toLong

// checking if it's empty to avoid early termination of the stream if the producer is faster than consumers
val streamExhausted: Boolean = supplyEnded && os.isEmpty
I wonder if these can just be `def`s, in which case I expect they will be inlined anyway.
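For illustration, a minimal sketch of what that might look like (using the type-parameterised `State` proposed further down; this is not the PR's actual code):

```scala
import fs2.Chunk

// Sketch of the suggestion only: defs instead of vals, which should be inlined anyway.
case class State[+O](os: Chunk[O], supplyEnded: Boolean) {
  def size: Long = os.size.toLong
  // empty check avoids terminating the stream early when the producer outruns the consumer
  def streamExhausted: Boolean = supplyEnded && os.isEmpty
}
```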
def emitChunk(n: Long): F2[Chunk[O]] = state.modify(_.splitAt(n))

val dequeue: F2[Chunk[O]] =
  F.race(bySize, byTime).flatMap {
I'm worried that a race condition where both sides of this race complete can lead to deadlock. Actually, this concern of mine is what inspired this Cats Effect issue.
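For context, a minimal sketch (my own, not fs2 code) of the shape of the hazard: if the `acquireN` completes at the same moment the timer fires, the race may still report the timeout, and the permits taken by the losing side are never returned.

```scala
import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.std.Semaphore

// Sketch only: if both sides of the race complete together, the loser's acquireN
// has already consumed the permits, and nothing here gives them back.
def dequeueOnce(supply: Semaphore[IO], groupSize: Long, timeout: FiniteDuration): IO[Unit] =
  IO.race(supply.acquireN(groupSize), IO.sleep(timeout)).flatMap {
    case Left(_)  => IO.unit             // permits consumed intentionally: emit the chunk
    case Right(_) => supply.acquire.void // timeout path: may block forever if the losing
                                         // acquireN already took the permits without releasing them
  }
```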
)(implicit F: Temporal[F2]): Stream[F2, Chunk[O]] = {

case class State(@uncheckedVariance os: Chunk[O], supplyEnded: Boolean) {
case class State(os: Chunk[O @uncheckedVariance], supplyEnded: Boolean) {
Instead of unchecked variance (which usually indicates possible unsoundness), can we just make it a type parameter?
Suggested change:
- case class State(os: Chunk[O @uncheckedVariance], supplyEnded: Boolean) {
+ case class State[+O](os: Chunk[O], supplyEnded: Boolean) {
val groupSize = chunkSize.toLong

val enqueue: F2[Unit] =
  evalTap(o => state.update(State.add[O](o)) *> supply.release)
Since you are not using the result afterwards:

Suggested change:
- evalTap(o => state.update(State.add[O](o)) *> supply.release)
+ foreach(o => state.update(State.add[O](o)) *> supply.release)
I'm not sure that removing the `demand` semaphore is valid: you basically lose all backpressure. If downstream starts pulling and then stops for a while, the producer will keep running and fill up the buffer.
Furthermore, this also removes all tracking of upstream errors, which isn't valid (and tbf we should add a test here, which is currently missing).
In other words, the additional complexity of the current implementation seems justified unless we can provide an alternative which is functionally equivalent :)
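For readers following along, a rough sketch (names and structure are mine, not the actual fs2 implementation) of the backpressure role a `demand`-style semaphore plays: the producer must acquire capacity before buffering, and only the consumer hands it back.

```scala
import cats.effect.{IO, Ref}
import cats.effect.std.Semaphore
import cats.syntax.all._

final case class Buffers[O](
    demand: Semaphore[IO],      // permits = remaining buffer capacity
    supply: Semaphore[IO],      // permits = elements ready for the consumer
    buffer: Ref[IO, Vector[O]]
)

// Producer side: blocks on `demand` when downstream stops pulling, so the buffer
// can never grow past its initial capacity.
def produce[O](b: Buffers[O])(o: O): IO[Unit] =
  b.demand.acquire *> b.buffer.update(_ :+ o) *> b.supply.release

// Consumer side: takes one element and returns one unit of capacity to the producer.
def consume[O](b: Buffers[O]): IO[O] =
  b.supply.acquire *> b.buffer.modify(v => (v.tail, v.head)).flatTap(_ => b.demand.release)
```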
@SystemFw yeah that makes sense. I understand the role of the `demand` semaphore now: it wasn't clear to me immediately. I've hooked up VisualVM and realised memory performance is much worse 😓 I have a question about error handling: what kind of errors can we expect when adding elements to the buffer?
Ah, that's not what I mean, I'm talking about propagation of upstream failures. Basically, if you have `val stream2 = stream1.groupWithin(100, 2.seconds)` I would expect `stream2` to fail if `stream1` does, but in this case `stream1` becomes just a […]

Basically, every combinator has a set of (non-trivial) things it needs to do before we even talk about its specific logic: error propagation, backpressure, chunking, etc. This is why it's generally a good idea to use existing combinators when possible, e.g. […]

(As a separate point, today I spoke with @armanbilge a bit about benchmarking the implementation in main not against its immediate predecessor based on […])
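To make the missing-test point concrete, a hedged sketch (not from the fs2 test suite) of the property being described here: an upstream failure must surface through `groupWithin`.

```scala
import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.{Chunk, Stream}

object GroupWithinErrorPropagation extends App {
  case object Boom extends Exception("boom")

  val stream1: Stream[IO, Int] =
    Stream.emits(1 to 10).covary[IO] ++ Stream.raiseError[IO](Boom)

  val stream2: Stream[IO, Chunk[Int]] = stream1.groupWithin(100, 2.seconds)

  // Expected behaviour: the failure of stream1 is not swallowed by the internal buffer.
  val result: Either[Throwable, List[Chunk[Int]]] =
    stream2.compile.toList.attempt.unsafeRunSync()

  assert(result == Left(Boom))
}
```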
Ok I see: thanks for the explanation. That was one of the things I didn't understand about the existing implementation. Will try to improve this one to be more robust, or rely on existing combinators if possible. I'll also have a look at the older implementations; might be able to find something interesting (the current one definitely is, learned a lot here 🚀)
@SystemFw @diesalbla @armanbilge thanks for the feedback/suggestions. I've made changes to take into account error propagation and memory utilization.

Change log (updated)
After these changes performance is still better (ops/sec up to 100% better, especially for large streams). I'm not familiar with […]

I've run the tests a few times, without failures (except for the flaky "accumulation & splitting"): hopefully you will get the same results.

Lastly, I've followed the discussion on Discord and I'm aware that the long term plan is to move away from semaphores and rely on fs2 data types. Will start looking at Pull et al in the next weeks: in the meantime hopefully these changes are a small step forward.

Let me know what you guys think and if there's anything that still needs fixing.
F.ifM(streamExhausted)(F.pure(Chunk.empty[O]), awaitAndEmitNext)

val dequeue: F2[Chunk[O]] =
  F.race(supply.acquireN(groupSize), F.sleep(timeout)).flatMap {
I've read the issue attached above (race condition). Am I right to think that this implementation is potentially better if the race is left-biased, since the timeout logic (on the right) is slightly more convoluted?
Less chance of deadlock is still not as good as no chance of deadlock 😁
I think the current implementation of `groupWithin` uses a hack, where it always returns the semaphore, and then attempts to re-acquire it. But there's no way to fix this properly without implementing the `Ior`-based race as described in that issue.
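For reference, a rough sketch (my own reading of the idea, not code from that issue) of what an `Ior`-based race could look like with `racePair`, so a loser that actually completed is observed instead of being discarded:

```scala
import cats.data.Ior
import cats.effect.{IO, Outcome}
import cats.syntax.all._

// Sketch only: report Ior.Both when the "loser" turns out to have completed too,
// so e.g. permits it acquired are not silently lost.
def raceIor[A, B](fa: IO[A], fb: IO[B]): IO[Ior[A, B]] =
  IO.racePair(fa, fb).flatMap {
    case Left((winA, fibB)) =>
      fibB.cancel *> fibB.join.flatMap {
        case Outcome.Succeeded(iob) => (winA.embedNever, iob).mapN((a, b) => Ior.both(a, b))
        case _                      => winA.embedNever.map(a => Ior.left(a))
      }
    case Right((fibA, winB)) =>
      fibA.cancel *> fibA.join.flatMap {
        case Outcome.Succeeded(ioa) => (ioa, winB.embedNever).mapN((a, b) => Ior.both(a, b))
        case _                      => winB.embedNever.map(b => Ior.right(b))
      }
  }
```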
I was struggling a bit to understand the problem (i.e. where the deadlock would occur) until I clicked through a few links from the issue linked above and stumbled across your PR comment here.

If I understood your concern correctly, the issue is that the semaphore might lose the race even if it acquired the permits successfully. So in that case we would need to restore them, in order to prevent a deadlock when the `onTimeout` logic tries to acquire a permit again (which simply won't be there if the semaphore acquired the permits without releasing them).

If that is the case, I believe this is not a problem in this implementation because of this logic:
val isBufferEmpty: F2[Boolean] =
buffer.size.map(_ == 0)
val awaitAndEmitNext: F2[Chunk[O]] = for {
isEmpty <- isBufferEmpty
awaited <- supply.acquire.whenA(isEmpty).as(if (isEmpty) 1 else 0)
flushed <- emitChunk
// lower supply by {flushed.size} (excluding element already awaited)
_ <- supply.tryAcquireN((flushed.size.toLong - awaited).max(0))
} yield flushed
Basically, if the buffer contains any element by the time the timeout expires, we won't attempt to acquire the permit in a blocking fashion in the for-comprehension.
So by definition the deadlock cannot occur. In other words, we should be able to rule it out because if the buffer is empty, then we know for a fact that the competing semaphore (the one in the race) hasn't acquired any permits (i.e. it does not need to release them).
Thanks for looking more closely at this! Yes, you've definitely understood the issue.
After looking at your code and thinking about it, I think you are right that it won't deadlock. However, I still suspect that the state can get corrupted. Something like this:
- The `race` times out, although the semaphore `acquireN` also succeeds.

  fs2/core/shared/src/main/scala/fs2/Stream.scala, lines 1456 to 1458 in d676edd:

  F.race(supply.acquireN(groupSize), F.sleep(timeout)).flatMap {
    case Left(_)  => emitChunk
    case Right(_) => onTimeout

- We successfully `emitChunk` via `onTimeout` and `awaitAndEmitNext`.

  fs2/core/shared/src/main/scala/fs2/Stream.scala, lines 1452 to 1453 in d676edd:

  val onTimeout = F.ifM(streamExhausted)(F.pure(Chunk.empty[O]), awaitAndEmitNext)

  fs2/core/shared/src/main/scala/fs2/Stream.scala, lines 1444 to 1447 in d676edd:

  val awaitAndEmitNext: F2[Chunk[O]] = for {
    isEmpty <- isBufferEmpty
    awaited <- supply.acquire.whenA(isEmpty).as(if (isEmpty) 1 else 0)
    flushed <- emitChunk

  fs2/core/shared/src/main/scala/fs2/Stream.scala, lines 1420 to 1421 in d676edd:

  val emitChunk: F2[Chunk[O]] = buffer.tryTakeN(Some(groupSize.toInt)).map(Chunk.seq)

- This unblocks the `buffer`, which concurrently gets filled with new stuff and releases permits.

  fs2/core/shared/src/main/scala/fs2/Stream.scala, lines 1441 to 1442 in d676edd:

  val enqueue: F2[Unit] = foreach(buffer.offer(_) <* supply.release).compile.drain.guarantee(endSupply)

- The `tryAcquireN` in `awaitAndEmitNext` succeeds.

  fs2/core/shared/src/main/scala/fs2/Stream.scala, lines 1448 to 1450 in d676edd:

  // lower supply by {flushed.size} (excluding element already awaited)
  _ <- supply.tryAcquireN((flushed.size.toLong - awaited).max(0))
  } yield flushed

- At this point, we've consumed `2*chunkSize` permits to emit `chunkSize` elements.
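To spell out the arithmetic with a concrete number (this trace is illustrative, not code from the PR), take `chunkSize = 3`:

```scala
// Worked example, chunkSize = 3:
//
//   supply.acquireN(3) succeeds                permits: 3 -> 0   (but the race reports the timeout)
//   onTimeout -> awaitAndEmitNext
//     buffer non-empty            => awaited = 0
//     emitChunk takes 3 elements
//   producer refills the buffer and releases   permits: 0 -> 3
//   tryAcquireN(3 - 0) succeeds                permits: 3 -> 0
//
// Net effect: 6 permits consumed to emit 3 elements, so the supply count no longer
// matches the number of buffered elements.
```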
I see, thanks for breaking this down 🙌🏾
Yes, I can see the problem now: if the semaphore loses the race but manages to acquire the permits, we would have a corrupted state and could potentially deadlock or slow down unnecessarily on the next race.
To be honest I didn't know this was a possibility (losing the race but securing the permits), but I think that it could be fixed as follows:
val onTimeout = {
val edgeCase = (supply.available.map(_ == 0), buffer.size.map(_ == groupSize)).mapN(_ && _)
val onExpiry = F.ifM(edgeCase)(supply.releaseN(groupSize) *> awaitAndEmitNext, awaitAndEmitNext)
F.ifM(streamExhausted)(F.pure(Chunk.empty[O]), onExpiry)
}
The `edgeCase` val effectively describes the problem you have brought up: if, by the time the timeout expires, there are no permits available but the buffer is full, then we know for sure that the semaphore has lost the race but succeeded in acquiring the permits. The fact that the queue is bounded means nothing can interfere at this point and change the values of the available supply or the buffer size (correct me if I'm wrong).
And it's probably an improvement compared to the hack in the current implementation that resets the count regardless, just to reacquire the permits shortly after:
val waitSupply = supply.acquireN(outputLong).guaranteeCase {
case Outcome.Succeeded(_) => supply.releaseN(outputLong)
case _ => F.unit
}
Out of curiosity I've run the benchmarks with the above modification, adding a `println` to see how often this would occur. It did not happen once!! So maybe this is very rare! And perhaps the race being left-biased is also a mitigating factor.

What do you think? Shall I handle the scenario this way? Or should I have a go and use the `bothOutcome`/`racePair` combinators mentioned in the issue above?

Also, what command should I run to run `scalafmt` on all sub-projects? CI keeps failing due to formatting 😓. I've run `sbt scalafmt` but that didn't work 🤔 (done 🤞🏾)
> Out of curiosity I've run the benchmarks with the above modification adding a `println` to see how often this would occur
@armanbilge I have a possible explanation as to why this wasn't happening: there's a bug in the timeout logic that causes the permits count to be inaccurate.
val enqueue = foreach(buffer.offer(_) <* supply.release) ...
Since the above is not atomic, then in the for-comprehension below:
val awaitAndEmitNext: F2[Chunk[O]] = for {
isEmpty <- isBufferEmpty
awaited <- supply.acquire.whenA(isEmpty).as(if (isEmpty) 1 else 0)
flushed <- emitChunk
// lower supply by {flushed.size} (excluding element already awaited)
_ <- supply.acquireN((flushed.size.toLong - awaited).max(0))
} yield flushed
even if the buffer contains one element, the permit count can be zero. This means that the number of permits acquired (`awaited`) can be off by one when we acquire them at the end of the for-comprehension. As a result the next race will be skewed towards the semaphore.
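An illustrative interleaving (again a sketch, not actual fs2 code) of the non-atomicity being described:

```scala
// The element is visible in the buffer *before* its permit is released, so an
// observer in between sees "buffer non-empty, available permits == 0".
//
// producer fiber                       observer (e.g. awaitAndEmitNext)
// ----------------------------------   --------------------------------------------
// buffer.offer(o)    // size = 1
//                                      buffer.size      // 1  => isEmpty = false, awaited = 0
//                                      supply.available // 0  => permit count lags the buffer
// supply.release     // only now does the permit for o appear
```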
That, and the fact that none of the tests have been able to reproduce the scenario.
I've fixed this in the other PR (will push the change sometime today) and I can see the edge case occurring, or at least I've got a test case where this occurs regularly.
The good thing is that the suggested fix seems to be working and can even be simplified to:
val onTimeout = {
val edgeCase = (supply.available.map(_ == 0), buffer.size.map(_ == groupSize)).mapN(_ && _)
// no need to release permits and await a chunk, we know permits have already been acquired
// and we know the buffer is full so we can emit immediately
val onExpiry = F.ifM(edgeCase)(emitNext, awaitAndEmitNext)
F.ifM(streamExhausted)(F.pure(Chunk.empty[O]), onExpiry)
}
So far tests look good, will update the other PR later today. Closing this for now, since the other one seems to be more accurate.
> the fact that none of the tests have been able to reproduce the scenario.
@Angel-O to reproduce this, try using `executeEmbed` and setting up a scenario where the semaphore gets permits at the exact same time that the timeout expires.
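For anyone trying this, a small sketch (assuming the cats-effect-testkit dependency; not an actual fs2 test) of driving such a scenario with `TestControl.executeEmbed`, which runs the program on a deterministic, virtual-time scheduler:

```scala
import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.std.Semaphore
import cats.effect.testkit.TestControl

val program: IO[(Either[Unit, Unit], Long)] =
  for {
    supply <- Semaphore[IO](0)
    // release the permits at exactly the same (virtual) instant the timeout fires
    _      <- (IO.sleep(1.second) *> supply.releaseN(4)).start
    winner <- IO.race(supply.acquireN(4), IO.sleep(1.second))
    left   <- supply.available
  } yield (winner, left)

// Virtual time makes the "acquire at the same moment the timeout expires" case
// reproducible, instead of depending on real-world scheduling.
val observed: IO[(Either[Unit, Unit], Long)] = TestControl.executeEmbed(program)
```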
> I've fixed this in the other PR (will push the change sometime today) and I can see the edge case occurring, or at least I've got a test case where this occurs regularly.
Oh missed this at first, nice work!
Nice!! Using `TestControl.executeEmbed` is a great idea for this! Will give it a go 👍🏾
Co-authored-by: Diego E. Alonso Blas <diesalbla@gmail.com>
force-pushed from d0ba384 to e417d17 (compare)
force-pushed from 400deb3 to 63d0cd7 (compare)
Closing this: I've discovered a bug that explains the huge performance gain (basically the stream terminates prematurely, not sure why). I'll raise another PR instead, adding tests.

EDIT: I'm reopening it: I discovered the bug.

Notes:
Closing in favour of a new PR as explained here.
Update
New change log here: #3162 (comment)
Old change log
Summary: groupWithin new implementation
A new implementation for the combinator that will hopefully: […]

Changes

- no more `demand` semaphore: the process is entirely controlled by the `supply` semaphore
- `Chunk` instead of `Vector`

Notes

- […] (`groupWithin1`) so you can compare benchmark results between the two without switching branches
- […] `jmh`, so by “performance improvements” I'm only looking at the result (ops/s)
- […] ~~including the `accumulation and splitting` test marked as flaky, hence I'm promoting the test to non-flaky status~~ (I'm seeing the first failures on this test, less frequently compared to the current implementation, but they're still present, so I'm reverting this)

Benchmarks

suggested implementation

current implementation