diff --git a/changelog.d/6278.bugfix b/changelog.d/6278.bugfix new file mode 100644 index 00000000000..2ee06eebac2 --- /dev/null +++ b/changelog.d/6278.bugfix @@ -0,0 +1 @@ +Fix regression on EventInsertLiveObserver getting blocked so there is no event being processed anymore. diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/EventInsertLiveObserver.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/EventInsertLiveObserver.kt index 751992fa7f8..38d80c34361 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/EventInsertLiveObserver.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/EventInsertLiveObserver.kt @@ -19,10 +19,9 @@ package org.matrix.android.sdk.internal.database import com.zhuinden.monarchy.Monarchy import io.realm.RealmConfiguration import io.realm.RealmResults -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import org.matrix.android.sdk.internal.database.mapper.asDomain import org.matrix.android.sdk.internal.database.model.EventEntity import org.matrix.android.sdk.internal.database.model.EventInsertEntity @@ -37,65 +36,58 @@ internal class EventInsertLiveObserver @Inject constructor(@SessionDatabase real private val processors: Set<@JvmSuppressWildcards EventInsertLiveProcessor>) : RealmLiveEntityObserver(realmConfiguration) { + private val lock = Mutex() + override val query = Monarchy.Query { it.where(EventInsertEntity::class.java).equalTo(EventInsertEntityFields.CAN_BE_PROCESSED, true) } - private val onResultsChangedFlow = MutableSharedFlow>() - - init { - onResultsChangedFlow - .onEach { handleChange(it) } - .launchIn(observerScope) - } - override fun onChange(results: RealmResults) { - if (!results.isLoaded || results.isEmpty()) { - return - } - observerScope.launch { onResultsChangedFlow.emit(results) } - } - - private suspend fun handleChange(results: RealmResults) { - val idsToDeleteAfterProcess = ArrayList() - val filteredEvents = ArrayList(results.size) - Timber.v("EventInsertEntity updated with ${results.size} results in db") - results.forEach { - if (shouldProcess(it)) { - // don't use copy from realm over there - val copiedEvent = EventInsertEntity( - eventId = it.eventId, - eventType = it.eventType - ).apply { - insertType = it.insertType + observerScope.launch { + lock.withLock { + if (!results.isLoaded || results.isEmpty()) { + return@launch } - filteredEvents.add(copiedEvent) - } - idsToDeleteAfterProcess.add(it.eventId) - } - - awaitTransaction(realmConfiguration) { realm -> - Timber.v("##Transaction: There are ${filteredEvents.size} events to process ") - filteredEvents.forEach { eventInsert -> - val eventId = eventInsert.eventId - val event = EventEntity.where(realm, eventId).findFirst() - if (event == null) { - Timber.v("Event $eventId not found") - return@forEach + val idsToDeleteAfterProcess = ArrayList() + val filteredEvents = ArrayList(results.size) + Timber.v("EventInsertEntity updated with ${results.size} results in db") + results.forEach { + if (shouldProcess(it)) { + // don't use copy from realm over there + val copiedEvent = EventInsertEntity( + eventId = it.eventId, + eventType = it.eventType + ).apply { + insertType = it.insertType + } + filteredEvents.add(copiedEvent) + } + idsToDeleteAfterProcess.add(it.eventId) } - val domainEvent = event.asDomain() - processors.filter { - it.shouldProcess(eventId, domainEvent.getClearType(), eventInsert.insertType) - }.forEach { - it.process(realm, domainEvent) + awaitTransaction(realmConfiguration) { realm -> + Timber.v("##Transaction: There are ${filteredEvents.size} events to process ") + filteredEvents.forEach { eventInsert -> + val eventId = eventInsert.eventId + val event = EventEntity.where(realm, eventId).findFirst() + if (event == null) { + Timber.v("Event $eventId not found") + return@forEach + } + val domainEvent = event.asDomain() + processors.filter { + it.shouldProcess(eventId, domainEvent.getClearType(), eventInsert.insertType) + }.forEach { + it.process(realm, domainEvent) + } + } + realm.where(EventInsertEntity::class.java) + .`in`(EventInsertEntityFields.EVENT_ID, idsToDeleteAfterProcess.toTypedArray()) + .findAll() + .deleteAllFromRealm() } + processors.forEach { it.onPostProcess() } } - realm.where(EventInsertEntity::class.java) - .`in`(EventInsertEntityFields.EVENT_ID, idsToDeleteAfterProcess.toTypedArray()) - .findAll() - .deleteAllFromRealm() } - processors.forEach { it.onPostProcess() } } private fun shouldProcess(eventInsertEntity: EventInsertEntity): Boolean {