Skip to content

Commit

Permalink
Fix EventInsertLiveObserver gets blocked by reverting and adding lock…
Browse files Browse the repository at this point in the history
… instead
  • Loading branch information
ganfra authored and bmarty committed Jun 13, 2022
1 parent 65bb1a7 commit c384a3d
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 52 deletions.
1 change: 1 addition & 0 deletions changelog.d/6278.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix regression on EventInsertLiveObserver getting blocked so there is no event being processed anymore.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ package org.matrix.android.sdk.internal.database
import com.zhuinden.monarchy.Monarchy
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.matrix.android.sdk.internal.database.mapper.asDomain
import org.matrix.android.sdk.internal.database.model.EventEntity
import org.matrix.android.sdk.internal.database.model.EventInsertEntity
Expand All @@ -37,65 +36,58 @@ internal class EventInsertLiveObserver @Inject constructor(@SessionDatabase real
private val processors: Set<@JvmSuppressWildcards EventInsertLiveProcessor>) :
RealmLiveEntityObserver<EventInsertEntity>(realmConfiguration) {

private val lock = Mutex()

override val query = Monarchy.Query {
it.where(EventInsertEntity::class.java).equalTo(EventInsertEntityFields.CAN_BE_PROCESSED, true)
}

private val onResultsChangedFlow = MutableSharedFlow<RealmResults<EventInsertEntity>>()

init {
onResultsChangedFlow
.onEach { handleChange(it) }
.launchIn(observerScope)
}

override fun onChange(results: RealmResults<EventInsertEntity>) {
if (!results.isLoaded || results.isEmpty()) {
return
}
observerScope.launch { onResultsChangedFlow.emit(results) }
}

private suspend fun handleChange(results: RealmResults<EventInsertEntity>) {
val idsToDeleteAfterProcess = ArrayList<String>()
val filteredEvents = ArrayList<EventInsertEntity>(results.size)
Timber.v("EventInsertEntity updated with ${results.size} results in db")
results.forEach {
if (shouldProcess(it)) {
// don't use copy from realm over there
val copiedEvent = EventInsertEntity(
eventId = it.eventId,
eventType = it.eventType
).apply {
insertType = it.insertType
observerScope.launch {
lock.withLock {
if (!results.isLoaded || results.isEmpty()) {
return@launch
}
filteredEvents.add(copiedEvent)
}
idsToDeleteAfterProcess.add(it.eventId)
}

awaitTransaction(realmConfiguration) { realm ->
Timber.v("##Transaction: There are ${filteredEvents.size} events to process ")
filteredEvents.forEach { eventInsert ->
val eventId = eventInsert.eventId
val event = EventEntity.where(realm, eventId).findFirst()
if (event == null) {
Timber.v("Event $eventId not found")
return@forEach
val idsToDeleteAfterProcess = ArrayList<String>()
val filteredEvents = ArrayList<EventInsertEntity>(results.size)
Timber.v("EventInsertEntity updated with ${results.size} results in db")
results.forEach {
if (shouldProcess(it)) {
// don't use copy from realm over there
val copiedEvent = EventInsertEntity(
eventId = it.eventId,
eventType = it.eventType
).apply {
insertType = it.insertType
}
filteredEvents.add(copiedEvent)
}
idsToDeleteAfterProcess.add(it.eventId)
}
val domainEvent = event.asDomain()
processors.filter {
it.shouldProcess(eventId, domainEvent.getClearType(), eventInsert.insertType)
}.forEach {
it.process(realm, domainEvent)
awaitTransaction(realmConfiguration) { realm ->
Timber.v("##Transaction: There are ${filteredEvents.size} events to process ")
filteredEvents.forEach { eventInsert ->
val eventId = eventInsert.eventId
val event = EventEntity.where(realm, eventId).findFirst()
if (event == null) {
Timber.v("Event $eventId not found")
return@forEach
}
val domainEvent = event.asDomain()
processors.filter {
it.shouldProcess(eventId, domainEvent.getClearType(), eventInsert.insertType)
}.forEach {
it.process(realm, domainEvent)
}
}
realm.where(EventInsertEntity::class.java)
.`in`(EventInsertEntityFields.EVENT_ID, idsToDeleteAfterProcess.toTypedArray())
.findAll()
.deleteAllFromRealm()
}
processors.forEach { it.onPostProcess() }
}
realm.where(EventInsertEntity::class.java)
.`in`(EventInsertEntityFields.EVENT_ID, idsToDeleteAfterProcess.toTypedArray())
.findAll()
.deleteAllFromRealm()
}
processors.forEach { it.onPostProcess() }
}

private fun shouldProcess(eventInsertEntity: EventInsertEntity): Boolean {
Expand Down

0 comments on commit c384a3d

Please sign in to comment.