-
Notifications
You must be signed in to change notification settings - Fork 249
Create SyncOrchestrator
#4176
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create SyncOrchestrator
#4176
Changes from 5 commits
85f7e5e
64f5a30
fa41a0a
4218140
64abdf6
68aa4a4
d1f2240
d1afcb4
27e841a
b38464c
ef8c524
efcc10c
07ed484
ecd76d8
b3bba14
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
/* | ||
* Copyright 2025 New Vector Ltd. | ||
* | ||
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial | ||
* Please see LICENSE files in the repository root for full details. | ||
*/ | ||
|
||
package io.element.android.appnav.di | ||
|
||
import dagger.assisted.Assisted | ||
import dagger.assisted.AssistedFactory | ||
import dagger.assisted.AssistedInject | ||
import io.element.android.features.networkmonitor.api.NetworkMonitor | ||
import io.element.android.features.networkmonitor.api.NetworkStatus | ||
import io.element.android.libraries.core.coroutine.CoroutineDispatchers | ||
import io.element.android.libraries.matrix.api.MatrixClient | ||
import io.element.android.libraries.matrix.api.sync.SyncState | ||
import io.element.android.services.appnavstate.api.AppForegroundStateService | ||
import io.element.android.services.appnavstate.api.SyncOrchestrator | ||
import kotlinx.coroutines.CoroutineName | ||
import kotlinx.coroutines.CoroutineScope | ||
import kotlinx.coroutines.FlowPreview | ||
import kotlinx.coroutines.cancel | ||
import kotlinx.coroutines.delay | ||
import kotlinx.coroutines.flow.combine | ||
import kotlinx.coroutines.flow.debounce | ||
import kotlinx.coroutines.flow.distinctUntilChanged | ||
import kotlinx.coroutines.flow.first | ||
import kotlinx.coroutines.launch | ||
import kotlinx.coroutines.sync.Mutex | ||
import timber.log.Timber | ||
import java.util.concurrent.atomic.AtomicBoolean | ||
import kotlin.time.Duration.Companion.milliseconds | ||
import kotlin.time.Duration.Companion.seconds | ||
|
||
class DefaultSyncOrchestrator @AssistedInject constructor( | ||
@Assisted matrixClient: MatrixClient, | ||
private val baseCoroutineScope: CoroutineScope = matrixClient.sessionCoroutineScope, | ||
private val appForegroundStateService: AppForegroundStateService, | ||
private val networkMonitor: NetworkMonitor, | ||
private val dispatchers: CoroutineDispatchers, | ||
) : SyncOrchestrator { | ||
@AssistedFactory | ||
interface Factory { | ||
fun create(matrixClient: MatrixClient): DefaultSyncOrchestrator | ||
} | ||
|
||
private val syncService = matrixClient.syncService() | ||
|
||
private val initialSyncMutex = Mutex() | ||
|
||
private var coroutineScope: CoroutineScope? = null | ||
|
||
private val tag = "SyncOrchestrator" | ||
|
||
private val started = AtomicBoolean(false) | ||
|
||
/** | ||
* Starting observing the app state and network state to start/stop the sync service. | ||
* | ||
* Before observing the state, a first attempt at starting the sync service will happen if it's not already running. | ||
*/ | ||
@OptIn(FlowPreview::class) | ||
override fun start() { | ||
if (!started.compareAndSet(false, true)) { | ||
Timber.tag(tag).d("already started, exiting early") | ||
return | ||
} | ||
|
||
Timber.tag(tag).d("start observing the app and network state") | ||
|
||
if (syncService.syncState.value != SyncState.Running) { | ||
Timber.tag(tag).d("initial startSync") | ||
baseCoroutineScope.launch(dispatchers.io) { | ||
try { | ||
initialSyncMutex.lock() | ||
syncService.startSync() | ||
bmarty marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Wait until it's running | ||
syncService.syncState.first { it == SyncState.Running } | ||
} finally { | ||
initialSyncMutex.unlock() | ||
} | ||
} | ||
} | ||
|
||
coroutineScope = CoroutineScope(baseCoroutineScope.coroutineContext + CoroutineName(tag) + dispatchers.io) | ||
|
||
coroutineScope?.launch { | ||
// Wait until the initial sync is done, either successfully or failing | ||
initialSyncMutex.lock() | ||
|
||
combine( | ||
// small debounce to avoid spamming startSync when the state is changing quickly in case of error. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe the debounced can be moved after the combine block (before .distinctUntilChanged())? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This causes issues in the tests, since it's preventing some intermediate states from being generated. I'd rather keep it were it is, if possible. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you try with this patch? |
||
syncService.syncState.debounce(100.milliseconds), | ||
networkMonitor.connectivity, | ||
appForegroundStateService.isInForeground, | ||
appForegroundStateService.isInCall, | ||
appForegroundStateService.isSyncingNotificationEvent, | ||
) { syncState, networkState, isInForeground, isInCall, isSyncingNotificationEvent -> | ||
val isAppActive = isInForeground || isInCall || isSyncingNotificationEvent | ||
val isNetworkAvailable = networkState == NetworkStatus.Connected | ||
|
||
Timber.tag(tag).d("isAppActive=$isAppActive, isNetworkAvailable=$isNetworkAvailable") | ||
if (syncState == SyncState.Running && !isAppActive) { | ||
// Don't stop the sync immediately, wait a bit to avoid starting/stopping the sync too often | ||
delay(3.seconds) | ||
SyncStateAction.StopSync | ||
} else if (syncState != SyncState.Running && isAppActive && isNetworkAvailable) { | ||
SyncStateAction.StartSync | ||
} else { | ||
SyncStateAction.NoOp | ||
} | ||
} | ||
.distinctUntilChanged() | ||
.collect { action -> | ||
when (action) { | ||
SyncStateAction.StartSync -> { | ||
syncService.startSync() | ||
} | ||
SyncStateAction.StopSync -> { | ||
syncService.stopSync() | ||
} | ||
SyncStateAction.NoOp -> Unit | ||
} | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Stop observing the app state and network state. | ||
*/ | ||
override fun stop() { | ||
bmarty marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (!started.compareAndSet(true, false)) { | ||
Timber.tag(tag).d("already stopped, exiting early") | ||
return | ||
} | ||
Timber.tag(tag).d("stop observing the app and network state") | ||
coroutineScope?.cancel() | ||
coroutineScope = null | ||
} | ||
} | ||
|
||
private enum class SyncStateAction { | ||
StartSync, | ||
StopSync, | ||
NoOp, | ||
} |
Uh oh!
There was an error while loading. Please reload this page.