Virtual time can't actually be controlled #268

Open
mightyguava opened this issue Sep 24, 2023 · 29 comments

Comments

@mightyguava

While trying to test delays with Turbine, I noticed that controlling virtual time didn't actually do anything. Take this test, for example:

@Test
fun virtualTimeCanBeControlled() = runTest {
  flow {
    delay(5000)
    emit("1")
    delay(5000)
    emit("2")
  }.test {
    expectNoEvents()
    advanceTimeBy(5000)
    expectNoEvents()
    runCurrent()
    assertEquals("1", awaitItem())
    advanceTimeBy(5000)
    expectNoEvents()
    runCurrent()
    assertEquals("2", awaitItem())
    awaitComplete()
  }
}

If you remove all of the advanceTimeBy() and runCurrent() calls, it still passes:

  @Test
  fun virtualTimeCanBeControlled() = runTest {
    flow {
      delay(5000)
      emit("1")
      delay(5000)
      emit("2")
    }.test {
      expectNoEvents()
      expectNoEvents()
      assertEquals("1", awaitItem())
      expectNoEvents()
      assertEquals("2", awaitItem())
      awaitComplete()
    }
  }

I'd expect the lack of advanceTimeBy() calls to fail the first assertEquals(). I think what's happening is that since awaitItem() will yield, the TestDispatcher will advance time automatically. This makes virtual time meaningless when testing with Turbine unless all you want to do is verify ordering.

There's also another annoying problem that an awaitItem() call will cause other coroutines in the same scope that are running a while (true) { delay(...); doSomething(...) } to constantly spin.

@JakeWharton
Collaborator

I'd expect the lack of advanceTimeBy() calls to fail the first assertEquals().

That is not how runTest works. If you do not take control of the time, delays are automatically skipped. We do not control this behavior, and you'll see the same behavior with or without the use of Turbine.

There's also another annoying problem that an awaitItem() call will cause other coroutines in the same scope that are running a while (true) { delay(...); doSomething(...) } to constantly spin.

Once again, we are not in control of this. If awaitItem has to suspend, other coroutines will run. If you are using a dispatcher that skips delays and the looping coroutine does not otherwise suspend, it will spin. You'll see the same behavior with or without the use of Turbine.
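
To illustrate the point that delay skipping comes from runTest rather than from Turbine, here is a minimal sketch that uses only kotlinx-coroutines-test. It assumes a JVM target (where runTest blocks until the body finishes) and kotlinx-coroutines-test 1.6 or later. The function name virtualMillisAfterDelay is made up for this example.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest

// Hypothetical helper: reports how far the virtual clock moved across a delay.
@OptIn(ExperimentalCoroutinesApi::class)
fun virtualMillisAfterDelay(): Long {
    var elapsed = -1L
    runTest {
        val start = currentTime   // virtual clock, in milliseconds
        delay(5_000)              // skipped by the test dispatcher, not really awaited
        elapsed = currentTime - start
    }
    return elapsed
}
```

No Turbine call is involved, yet the delay completes without real waiting and the virtual clock has moved forward by the full 5000 ms.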

@mightyguava
Author

I understand that the root of the issue is that runTest doesn't offer a way to use virtual time without delay skipping. But I think we can improve Turbine's interaction with virtual time if Turbine lets you use virtual time instead of wall time for timeouts.

If I make a change to withAppropriateTimeout to use withTimeout instead of withWallclockTimeout inside runTest, the test behaves as expected. This makes sense since Turbine is now within the same delay-skipping scope and timeouts run on virtual time. There's a comment there that says

// withTimeout uses virtual time, which will hang.

which does not reproduce for me. I wonder if it's defending against behavior from an older implementation of runTest?

Could there maybe be an option to create a Turbine that waits on virtual time instead of wall time?

@jingibus
Collaborator

I wonder if it's defending against behavior from an older implementation of runTest?

It's there because for most tests, the expectation is that in failing cases, awaitItem() will fail after some period of wall clock time and produce a useful stack trace.

My expectation is that if you make that change to withAppropriateTimeout, many of our tests validating that awaitItem() and friends throw on timeout will fail.

If that's not the case, then our decision here is worth reconsidering! It would certainly simplify the code a lot.

@mightyguava
Author

mightyguava commented Sep 24, 2023

@jingibus making that change causes 4 tests to fail, 3 of which fail because the flow under test has delays that use real time with Dispatchers.Default. I only tested on the JVM, though.

@mightyguava
Author

mightyguava commented Sep 24, 2023

Made a PR and fixed those tests. All tests appear to pass, although the run failed on lint: https://github.com/mightyguava/turbine/actions/runs/6291465123/job/17079856671

Also added a test for delay testing

@JakeWharton
Collaborator

Delay testing remains the same as any other suspend function. You either need to take control of virtual time and assert against the effects of delayed code, measure virtual time before and after a function call, or you can wrap your calls in withTimeout or withTimeoutOrNull. Turbine should not be changing what you do here.

Our timeout is always a wall clock timeout because it guards against a mismatched await without an associated item that would otherwise hang the test and produce an unhelpful stacktrace when runTest or similar fails.
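
As a rough sketch of the withTimeoutOrNull option listed above, the following applies it to a plain suspend block rather than to a Turbine await. Under runTest, the timeout itself runs on virtual time. The sketch assumes a JVM target and kotlinx-coroutines-test 1.6 or later, and the helper name timeoutInVirtualTime is hypothetical.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: returns the block's result and the virtual time afterwards.
@OptIn(ExperimentalCoroutinesApi::class)
fun timeoutInVirtualTime(): Pair<String?, Long> {
    var result: Pair<String?, Long>? = null
    runTest {
        // The block needs 5 virtual seconds but is only given 1.
        val value = withTimeoutOrNull(1_000) {
            delay(5_000)
            "late"
        }
        result = value to currentTime
    }
    return checkNotNull(result)
}
```

The block is cut off after one virtual second, so the value is null and the virtual clock stops at 1000 ms instead of running on to 5000 ms.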

@mightyguava
Author

Delay testing remains the same as any other suspend function. You either need to take control of virtual time and assert against the effects of delayed code, measure virtual time before and after a function call, or you can wrap your calls in withTimeout or withTimeoutOrNull. Turbine should not be changing what you do here.

If the test writer forgets to take control of virtual time in their test, Turbine will advance it for them, rendering their test for delays useless. That might be "correct" behavior and is due to runTest, but it is surprising behavior from my point of view.

I'm interested in the use of withTimeout or withTimeoutOrNull to make delay testing work in a more expected way though. Would you have the time to show an example of how it can be applied to 570fe47, which fails on trunk and passes on my branch?

Our timeout is always a wall clock timeout because it guards against a mismatched await without an associated item that would otherwise hang the test and produce an unhelpful stacktrace when runTest or similar fails.

I think this test that @jingibus suggested exercises the behavior you are describing? The test passes with virtual time on my PR.

@Test fun awaitFailsOnVirtualTime() = runTestTurbine {
  assertFailsWith<AssertionError> {
    flow<Nothing> {
      awaitCancellation()
    }.test {
      awaitComplete()
    }
  }
}

@JakeWharton
Collaborator

Turbine will advance it for them, rendering their test for delays useless. That might be "correct" behavior and is due to runTest, but it is surprising behavior from my point of view.

This behavior comes from runTest. If you use runBlocking instead, delays aren't skipped. Once again, this has nothing to do with Turbine; it is the behavior that all coroutines inherit from the enclosing dispatcher.
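
The contrast between the two entry points can be sketched by timing the same delay with a real clock. This is an illustration only: it assumes a JVM target and Kotlin 1.9 or later (for kotlin.time.measureTime), and both function names are invented for the example.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import kotlin.time.Duration
import kotlin.time.measureTime

// Wall-clock time spent on a 1-second delay under runBlocking: the delay is real.
fun wallTimeUnderRunBlocking(): Duration = measureTime {
    runBlocking { delay(1_000) }
}

// Wall-clock time for the same delay under runTest: the delay is skipped.
fun wallTimeUnderRunTest(): Duration = measureTime {
    runTest { delay(1_000) }
}
```

The runBlocking version should take about a second of real time, while the runTest version should return almost immediately apart from startup overhead.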

@matejdro

matejdro commented Sep 25, 2023

What I do in these cases is use expectMostRecentItem instead of awaitItem. The expect methods do not advance time.

To me, it seems reasonable that awaitItem advances virtual time, since it's supposed to await (i.e. wait until an item arrives from the flow).

@mightyguava
Author

mightyguava commented Sep 25, 2023

expectMostRecentItem works if you expect your flow to produce just one more item. I don't believe that would be the case for many flows, particularly non-finite flows.

I agree it's reasonable for awaitItem to advance virtual time. However, it should advance virtual time by at most the timeout (3s by default). It currently advances virtual time by however far the scheduler gets within 3 seconds of wall time.

As runTest puts all coroutines into virtual time by default, it's unexpected for Turbine to deal with timeouts in wall time. There seems to have been a bad interaction with runTest in the past where using virtual time timeout would cause Turbine to hang, but that issue no longer appears to be present.

@JakeWharton
Collaborator

There seems to have been a bad interaction with runTest in the past where using virtual time timeout would cause Turbine to hang, but that issue no longer appears to be present.

Turbine's timeout has nothing to do with runTest and predates the function even existing. I explained its use above as a guard against hung tests due to mismatched item emission and awaits such that the failure exception indicates the problematic await.

@mightyguava
Author

mightyguava commented Sep 25, 2023

I explained its use above as a guard against hung tests due to mismatched item emission and awaits such that the failure exception indicates the problematic await.

I didn't know that the use of wall time here predates runTest; that makes sense. It then follows from the discussion above that Turbine's timeout doesn't need to use wall time when running within runTest, because awaits do not hang on virtual time.

as a guard against hung tests due to mismatched item emission and awaits such that the failure exception indicates the problematic await.

^ this hang does not happen in virtual time.

@JakeWharton
Collaborator

This test verifies the timeout behavior:

@Test fun failsOnDefaultTimeout() = runTest {
  neverFlow().test {
    val actual = assertFailsWith<AssertionError> {
      awaitItem()
    }
    assertEquals("No value produced in 3s", actual.message)
    assertCallSitePresentInStackTraceOnJvm(
      throwable = actual,
      entryPoint = "ChannelTurbine\$awaitItem",
      callSite = "FlowTest\$failsOnDefaultTimeout",
    )
  }
}

@mightyguava
Author

mightyguava commented Sep 25, 2023

Here's the test, modified to measure how long it takes to run:

  @Test fun failsOnDefaultTimeout() = runTest {
    val took = measureTime {
      neverFlow().test {
        val actual = assertFailsWith<AssertionError> {
          awaitItem()
        }
        assertEquals("No value produced in 3s", actual.message)
        assertCallSitePresentInStackTraceOnJvm(
          throwable = actual,
          entryPoint = "ChannelTurbine\$awaitItem",
          callSite = "FlowTest\$failsOnDefaultTimeout",
        )
      }
    }
    println("This test actually took $took")
  }

With the timeout using virtual time instead of wall time, it passes, and prints:

This test actually took 50.354666ms

@mightyguava
Author

@JakeWharton @jingibus I'm wondering whether there's enough evidence here that it would be safe to add an option to Turbine to do timeouts in virtual time instead of wall time. It would benefit people who use runTest and would like to test delays within flows.

This statement has been mentioned several times:

I explained its use above as a guard against hung tests due to mismatched item emission and awaits such that the failure exception indicates the problematic await.

I tried my best to reproduce the issue described, and tried the test above, but as far as I can tell it does not happen using virtual time with the current versions of Turbine and Kotlin.

@JakeWharton
Collaborator

I do not believe we need that option, no. The existing timeout not running on wall clock time is just a bug that needs fixed. Someone will get to it eventually.

If you want to test delays you should either take control of virtual time with manual advancement or monitor the virtual clock to see how much time has passed between events. You can also use withTimeout on our await functions the same way you can for any suspend function.
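
A minimal sketch of the first option (manual advancement), using only kotlinx-coroutines-test and no Turbine. It assumes a JVM target and kotlinx-coroutines-test 1.6 or later, where advanceTimeBy does not run tasks scheduled exactly at the target time. The helper name manualAdvancement is hypothetical.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest

// Hypothetical helper: snapshots of the recorded events after each manual step.
@OptIn(ExperimentalCoroutinesApi::class)
fun manualAdvancement(): List<List<String>> {
    val snapshots = mutableListOf<List<String>>()
    runTest {
        val events = mutableListOf<String>()
        launch {
            delay(5_000)
            events.add("1")
        }
        // Moves the clock to 5000 ms but leaves the task due at exactly 5000 ms pending.
        advanceTimeBy(5_000)
        snapshots.add(events.toList())
        // Runs the tasks that are due at the current virtual time.
        runCurrent()
        snapshots.add(events.toList())
    }
    return snapshots
}
```

The first snapshot is empty and the second contains "1", which mirrors the advanceTimeBy / runCurrent pairs in the test at the top of this issue.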

@mightyguava
Author

The existing timeout not running on wall clock time is just a bug that needs fixed. Someone will get to it eventually.

What bug is this? If you are referring to #268 (comment), that result is from my branch in #269, where I moved Turbine off wall clock time for timeouts. This behavior is not a bug, in my opinion, as I've said above. There doesn't appear to be any benefit to Turbine timing out in wall time when the test is running in virtual time. There are no hangs after moving to virtual time, it's easier to test delays, and tests run faster.

Anyways, point taken, I'll look into alternatives for testing delays.

@ryanholden8

ryanholden8 commented Apr 19, 2024

Hi 👋 not sure I follow all the low level technical things here but it seems, at the end of the day, it's not currently possible to use this library when delays are involved and want to assert the delay (rather than skip). But please feel free to correct me if I'm misunderstanding.

For example:

Can't write a test like this in Turbine:

@Test
fun minDelayBetweenUpdatesTest() = runTest {
    val emittedValues = mutableListOf<Int>()

    val flow = MutableSharedFlow<Int>(replay = 3)

    flow
        .minDelayBetweenUpdates(
            delay = 1.seconds,
            shouldDelay = { newItem, _ -> newItem != 2 },
            now = { Instant.fromEpochMilliseconds(testScheduler.currentTime) },
        )
        .onEach { emittedValues.add(it) }
        // auto close flow when test completes so test does not get stuck
        .launchIn(backgroundScope)

    // Emit 3 values very fast
    flow.emit(1)
    flow.emit(2)
    flow.emit(3)

    // Assert: Nothing emitted yet cause time has not advanced
    emittedValues isEqualTo emptyList()

    // Act: Move just enough for the 1st item
    advanceTimeBy(1.milliseconds)

    // Assert: 1st and 2nd item emitted because shouldDelay returns false for item 2
    emittedValues isEqualTo listOf(1, 2)

    // Act: Move up just before the 3rd item is emitted
    advanceTimeBy(0.9.seconds)

    // Assert: The 3rd item has not been emitted
    emittedValues isEqualTo listOf(1, 2)

    // Act: Advance just enough for the 3rd item to be emitted
    advanceTimeBy(0.1.seconds)

    // Assert: The 3rd item has been emitted
    emittedValues isEqualTo listOf(1, 2, 3)

    // Act: Move time forward close to the delay parameter
    advanceTimeBy(0.9.seconds)
    // Act: Emit a 4th item
    flow.emit(4)

    // Assert: The 4th item has not been emitted because it's not past the min delay parameter
    emittedValues isEqualTo listOf(1, 2, 3)

    // Act: Advance just enough for the 4th item to be emitted (taking it the 1 second min delay)
    advanceTimeBy(0.1.seconds)

    // Assert: The 4th item has been emitted
    emittedValues isEqualTo listOf(1, 2, 3, 4)

    // Act: Advance just enough for the 5th item to be emitted without any delay because it's past the min delay mark
    advanceTimeBy(1.seconds)
    // Act: Emit a 5th item
    flow.emit(5)
    // Act: Allow the flow to emit
    advanceTimeBy(1.milliseconds)

    // Assert: The 5th item has been emitted
    emittedValues isEqualTo listOf(1, 2, 3, 4, 5)
}

/**
 * Ensures there is a minimum time between emitted values. Does not delay more than required by calculating how much
 * time has already passed between updates and not delaying if [shouldDelay] returns false.
 *
 * For example, if [delay] is 1 second and the next value was emitted 0.2 seconds later, then the update is
 * only delayed 0.8 seconds. No delay would occur if an update arrived after 1 second.
 *
 * This is useful when a service provides updates faster than is reasonable for UI to display. Slows down updates to
 * allow the UI to update without "flickering".
 */
fun <T> Flow<T>.minDelayBetweenUpdates(
    delay: Duration,
    shouldDelay: (newItem: T, currentItem: T?) -> Boolean = { _, _ -> true },
    now: () -> kotlinx.datetime.Instant = { Clock.System.now() },
) : Flow<T> {
    var lastUpdate: kotlinx.datetime.Instant? = null
    var previousItem: T? = null

    return onEach {
        // Only space time if needed
        if (shouldDelay(it, previousItem)) {
            // How long since our last update?
            val durationSinceLastUpdate = lastUpdate?.let { lastUpdate -> now() - lastUpdate }

            // Only space time if enough time has not passed since the last update
            if (durationSinceLastUpdate != null && delay > durationSinceLastUpdate) {
                // Calculate how much time is left to reach the desired time spacing
                val timeLeftToMeetDelayRequirement = delay - durationSinceLastUpdate

                delay(timeLeftToMeetDelayRequirement)
            }
        }

        // Track when we updated to know how much time has passed for the next update
        lastUpdate = now()
        previousItem = it
    }
}

@jingibus
Collaborator

Hi 👋 not sure I follow all the low level technical things here but it seems, at the end of the day, it's not currently possible to use this library when delays are involved and want to assert the delay (rather than skip). But please feel free to correct me if I'm misunderstanding.

Kotlin's implementation of virtual time does not allow for (or, at least, encourage) the style of testing you present in your example. You can't take a stopwatch and advance it by 1s, 2s, 3s, etc and observe what happens at each time period.

The design intent is to flip the relationship on its head: instead of driving the test with the stopwatch, drive the test with the events you wish to observe. When each event occurs, observe the value of the stopwatch to see whether it arrived at the time you expected.

Turbine mostly has no opinion on this (if you want to work around this paradigm, go right ahead - Turbine will not make your life any easier or harder than the rest of coroutines+flow), except that (as @JakeWharton explains above) it takes pains to timeout using wall clock time. This is because Turbine's timeouts are intended to detect bugs, not programmatic delays: i.e. I expected to observe event X, and event X never occurred. It would be inappropriate if that test were to fail because of a call to delay(10000).

@ryanholden8

Thanks for taking the time to reply @jingibus !

Kotlin's implementation of virtual time does not allow for (or, at least, encourage) the style of testing you present in your example. You can't take a stopwatch and advance it by 1s, 2s, 3s, etc and observe what happens at each time period.

Not sure I understand, advanceTimeBy is a standard function that seems to do directly this?

The design intent is to flip the relationship on its head: instead of driving the test with the stopwatch, drive the test with the events you wish to observe. When each event occurs, observe the value of the stopwatch to see whether it arrived at the time you expected.

Ah I can see that design working well.. is there docs or an example somewhere?

Turbine mostly has no opinion on this (if you want to work around this paradigm, go right ahead - Turbine will not make your life any easier or harder than the rest of coroutines+flow), except that (as @JakeWharton explains above) it takes pains to timeout using wall clock time. This is because Turbine's timeouts are intended to detect bugs, not programmatic delays: i.e. I expected to observe event X, and event X never occurred. It would be inappropriate if that test were to fail because of a call to delay(10000).

Reading over this, I'm sure I lack knowledge on this subject. From my standpoint, I'm just a developer trying to understand how to write a test for a flow where I need to actually assert delay behavior, not skip it.

@jingibus
Collaborator

Not sure I understand, advanceTimeBy is a standard function that seems to do directly this?

Yes, it is there, and yes, it does advance the virtual time as in your example. But the test scheduler will also advance virtual time automatically.

Ah I can see that design working well.. is there docs or an example somewhere?

JetBrains mostly seems content to let us figure this out ourselves from the reference documentation, rather than provide a clear guide on best practices. (I don't love this.)

From my standpoint, I'm just a developer trying to understand how to write a test for a flow where I need to actually assert delay behavior, not skip it.

I would try something like this:

flow.test {
  val start = currentTime

  assertThat(awaitItem()).isEqualTo(1)
  assertThat(currentTime - start).isEqualTo(1000)

  assertThat(awaitItem()).isEqualTo(2)
  assertThat(currentTime - start).isEqualTo(1000)

  assertThat(awaitItem()).isEqualTo(3)
  assertThat(currentTime - start).isEqualTo(2000)
}
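
The snippet above leaves flow undefined. A self-contained variant of the same idea, sketched here without Turbine by collecting directly inside runTest, could look like the following. The flow's shape (a 1-second delay before items 1 and 2, and another before item 3) is an assumption chosen to match the asserted values, and timestampedEmissions is a hypothetical name. A JVM target and kotlinx-coroutines-test 1.6 or later are assumed.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest

// Hypothetical helper: pairs each emitted item with the virtual time at which it arrived.
@OptIn(ExperimentalCoroutinesApi::class)
fun timestampedEmissions(): List<Pair<Int, Long>> {
    val seen = mutableListOf<Pair<Int, Long>>()
    runTest {
        val start = currentTime
        flow {
            delay(1_000)
            emit(1)
            emit(2)       // emitted right after item 1, with no delay in between
            delay(1_000)
            emit(3)
        }.collect { item ->
            // Let the events drive the test, then read the "stopwatch" for each one.
            seen.add(item to (currentTime - start))
        }
    }
    return seen
}
```

Items 1 and 2 are recorded at 1000 ms and item 3 at 2000 ms of virtual time, without any call to advanceTimeBy.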

@kevincianfarini
Contributor

You can use the TestScope.testTimeSource to measure and get the current virtual time. For example:

@Test 
fun test_delay_period() = runTest {
  val flow = flow { delay(50); emit(true) }
  flow.test {
    val duration = testTimeSource.measureTime { awaitItem() }
    assertEquals(50.milliseconds, duration)
  }
}

@ryanholden8

You can use the TestScope.testTimeSource to measure and get the current virtual time. For example:

@Test 
fun test_delay_period() = runTest {
  val flow = flow { delay(50); emit(true) }
  flow.test {
    val duration = testTimeSource.measureTime { awaitItem() }
    assertEquals(50.milliseconds, duration)
  }
}

Thank you @kevincianfarini for taking the time to provide this example test code. However, it does not seem to work as expected when more than one delay is used, which is the main use case. It appears that all the delays are applied immediately, so assertEquals(50.milliseconds, duration1) fails because duration1 is 150 ms instead of the expected 50 ms.

@Test
fun test_delay_period() = runTest {
    val flow = flow {
        delay(50)
        emit(true)
        delay(100)
        emit(false)
    }
    flow.test {
        val duration1 = testTimeSource.measureTime { awaitItem() }
        assertEquals(50.milliseconds, duration1)

        val duration2 = testTimeSource.measureTime { awaitItem() }
        assertEquals(100.milliseconds, duration2)

        awaitComplete()
    }
}

@ryanholden8

I would try something like this:

flow.test {
  val start = currentTime

  assertThat(awaitItem()).isEqualTo(1)
  assertThat(currentTime - start).isEqualTo(1000)

  assertThat(awaitItem()).isEqualTo(2)
  assertThat(currentTime - start).isEqualTo(1000)

  assertThat(awaitItem()).isEqualTo(3)
  assertThat(currentTime - start).isEqualTo(2000)
}

Thank you too @jingibus for the sample code! Just wasn't sure how to create the flow variable for the expected assertions to work.

This code fails for the same reason as in my reply to @kevincianfarini above (all delays are applied right away):

@Test
fun test_delay_period() = runTest {
    flow {
        // Emit 3 values very fast
        emit(1)
        emit(2)
        emit(3)
        // Emit after 0.9 seconds, so additional delay should only be 0.1 seconds from minDelayBetweenUpdates (if we applied it here, not applying to get basic test working)
        delay(0.9.seconds)
        emit(4)
        // Emit after 1 second, no additional delay should occur from minDelayBetweenUpdates (if we applied it here, not applying to get basic test working)
        delay(1.1.seconds)
        emit(5)

    }.test {
        val startTime = currentTime

        assertThat(awaitItem()).isEqualTo(1)
        assertThat(currentTime - startTime).isEqualTo(0)

        assertThat(awaitItem()).isEqualTo(2)
        assertThat(currentTime - startTime).isEqualTo(0)

        assertThat(awaitItem()).isEqualTo(3)
        assertThat(currentTime - startTime).isEqualTo(0)

        assertThat(awaitItem()).isEqualTo(4)
        assertThat(currentTime - startTime).isEqualTo(900)

        assertThat(awaitItem()).isEqualTo(5)
        assertThat(currentTime - startTime).isEqualTo(2000)

        awaitComplete()
    }
}

Fails on the first assertThat(currentTime - startTime).isEqualTo(0) with expected: 0 but was : 2000

If I use advanceTimeBy instead of delay though, it fails on the 900 assertion assertThat(currentTime - startTime).isEqualTo(900) with expected: 900 but was : 0.

So to sum up, I still don't understand how to use the Turbine design pattern to control virtual time in tests. The code in my earlier comment, which does not use Turbine, does work and allows control of virtual time.

@kevincianfarini
Contributor

kevincianfarini commented Apr 23, 2024

@ryanholden8 I'm not quite sure why my example isn't working for you. The following test passes locally for me.

@OptIn(ExperimentalTime::class)
@Test fun foo() = runTest {
    val flow = flow {
        delay(50)
        emit(1)
        delay(100)
        emit(2)
    }

    flow.test {
        val duration1 = testTimeSource.measureTime { awaitItem() }
        assertEquals(50.milliseconds, duration1)
        val duration2 = testTimeSource.measureTime { awaitItem() }
        assertEquals(100.milliseconds, duration2)
        awaitComplete()
    }
}

In the debugger while running the test, you can see the values of duration1 and duration2 as well.

(Screenshot: debugger view showing the values of duration1 and duration2.)

Which versions of Turbine and Coroutines are you using?

@ryanholden8

ryanholden8 commented Apr 23, 2024

@ryanholden8 I'm not quite sure why my example isn't working for you. The following test passes locally for me.

Which versions of Turbine and Coroutines are you using?

Ah, good instinct. I was using 0.12.0; upgrading to 1.1.0 fixes the failing test.

@kevincianfarini
Contributor

Awesome 👍

@ryanholden8

ryanholden8 commented Apr 23, 2024

Here's the final test using Turbine to assert virtual time, in case anybody finds this thread. Thanks @kevincianfarini & @jingibus for the practical help! The test is much cleaner than the original test using advanceTimeBy in #268 (comment)

@Test
fun minDelayBetweenUpdates() = runTest {
    flow {
        // Emit 3 values very fast
        emit(1)
        emit(2)
        emit(3)
        // Emit after 0.9 seconds, so minDelayBetweenUpdates should only add an additional delay of 0.1 seconds
        delay(0.9.seconds)
        emit(4)
        // Emit after 5 seconds, no additional delay should occur from minDelayBetweenUpdates
        delay(5.seconds)
        emit(5)

    }
        .minDelayBetweenUpdates(
            delay = 1.seconds,
            // Don't delay item 2
            shouldDelay = { newItem, _ -> newItem != 2 },
            now = { Instant.fromEpochMilliseconds(currentTime) },
        )
        .test {
            with (testTimeSource) {
                // Item 1 is not delayed
                measureTimedValue { awaitItem() } isEqualTo TimedValue(1, 0.seconds)
                // Item 2 is not delayed because shouldDelay returns false
                measureTimedValue { awaitItem() } isEqualTo TimedValue(2, 0.seconds)
                // Item 3 was emitted at the same time as 1 and 2 but should have a delay of 1 second
                measureTimedValue { awaitItem() } isEqualTo TimedValue(3, 1.seconds)
                // Item 4 was emitted at 0.9 seconds after item 3 so it should only be delayed 0.1 seconds
                // taking it up to the min delay of 1 second
                measureTimedValue { awaitItem() } isEqualTo TimedValue(4, 1.seconds)
                // Item 5 was emitted at 5 seconds after item 4, so there should be no additional delay
                measureTimedValue { awaitItem() } isEqualTo TimedValue(5, 5.seconds)
            }

            awaitComplete()
        }
}

/**
 * Ensures there is a minimum time between emitted values. Does not delay more than required by calculating how much
 * time has already passed between updates and not delaying if [shouldDelay] returns false.
 *
 * For example, if [delay] is 1 second and the next value was emitted 0.2 seconds later, then the update is
 * only delayed 0.8 seconds. No delay would occur if an update arrived after 1 second.
 *
 * This is useful when a service provides updates faster than is reasonable for UI to display. Slows down updates to
 * allow the UI to update without "flickering".
 */
fun <T> Flow<T>.minDelayBetweenUpdates(
    delay: Duration,
    shouldDelay: (newItem: T, currentItem: T?) -> Boolean = { _, _ -> true },
    now: () -> kotlinx.datetime.Instant = { Clock.System.now() },
    delayDispatcher: CoroutineDispatcher? = null,
): Flow<T> {
    var lastUpdate: kotlinx.datetime.Instant? = null
    var previousItem: T? = null

    return onEach {
        // Only space time if needed
        if (shouldDelay(it, previousItem)) {
            // How long since our last update?
            val durationSinceLastUpdate = lastUpdate?.let { lastUpdate -> now() - lastUpdate }

            // Only space time if enough time has not passed since the last update
            if (durationSinceLastUpdate != null && delay > durationSinceLastUpdate) {
                // Calculate how much time is left to reach the desired time spacing
                val timeLeftToMeetDelayRequirement = delay - durationSinceLastUpdate

                if (delayDispatcher != null) {
                    withContext(delayDispatcher) {
                        delay(timeLeftToMeetDelayRequirement)
                    }
                } else {
                    delay(timeLeftToMeetDelayRequirement)
                }
            }
        }

        // Track when we updated to know how much time has passed for the next update
        lastUpdate = now()
        previousItem = it
    }
}

/**
 * Asserts that this object is equal to [expected].
 * Syntax sugar to help unit test assertions read more like factual statements.
 */
infix fun <T> T?.isEqualTo(expected: T?) = Truth.assertThat(this).isEqualTo(expected)

@jingibus
Collaborator

That's a great example.

@JakeWharton, I don't think we can fix this as long as we are heavily buffering the internal channel; we'd need to switch it to RENDEZVOUS.
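
For context on the buffering remark, here is a small sketch of the general channel behavior involved. It does not use or reproduce Turbine's internals; it only shows how far a producer can run ahead of a receiver with an unlimited buffer compared with a rendezvous channel. The helper name sendsCompletedBeforeAnyReceive is hypothetical, and a JVM target with kotlinx-coroutines-test 1.6 or later is assumed.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest

// Hypothetical helper: counts how many sends finish when nobody ever receives.
@OptIn(ExperimentalCoroutinesApi::class)
fun sendsCompletedBeforeAnyReceive(capacity: Int): Int {
    var completed = 0
    runTest {
        val channel = Channel<Int>(capacity)
        val producer = launch {
            repeat(3) {
                channel.send(it)   // suspends only if the channel cannot accept the item
                completed++
            }
        }
        runCurrent()       // let the producer run as far as it can without a receiver
        producer.cancel()  // release a producer that is still suspended in send
    }
    return completed
}
```

With Channel.UNLIMITED all three sends complete before anything is received, whereas with Channel.RENDEZVOUS the producer is still suspended in its first send, so a rendezvous channel keeps the producer in step with the consumer.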
