Skip to content

Commit

Permalink
⚡ 优化事务临时数据库
Browse files Browse the repository at this point in the history
事务的增删改都优先更新ui数据库,保证ui的优先显示,然后再异步请求,如果请求失败,最后放进临时数据库保存到下一次请求
985892345 committed Oct 10, 2024
1 parent 3d4f717 commit a93dcc5
Showing 2 changed files with 118 additions and 133 deletions.
Original file line number Diff line number Diff line change
@@ -23,12 +23,12 @@ import com.mredrock.cyxbs.lib.utils.service.ServiceManager
import com.mredrock.cyxbs.lib.utils.utils.judge.NetworkUtil
import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Maybe
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.internal.functions.Functions
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.*
import java.util.concurrent.Callable
import java.util.concurrent.TimeUnit
import kotlin.IllegalStateException

@@ -40,23 +40,28 @@ import kotlin.IllegalStateException
*/
@SuppressLint("CheckResult")
object AffairRepository {


// 以此 id 向下递减表示本地临时事物
// 后端下发的 remoteId 是大于 0 的
// 如果 remoteId 小于 0,则说明是本地临时添加的事务
private var AffairId = -10000

private val Api = AffairApiService.INSTANCE

private val DB = AffairDataBase.INSTANCE
private val AffairDao = DB.getAffairDao()
private val AffairCalendarDao = DB.getAffairCalendarDao()
private val LocalAddDao = DB.getLocalAddAffairDao()
private val LocalUpdateDao = DB.getLocalUpdateAffairDao()
private val LocalDeleteDao = DB.getLocalDeleteAffairDao()

private val mGson = Gson()

private fun List<AffairEntity.AtWhatTime>.toPostDateJson(): String {
// 不建议让 AtWhatTime 成为转 json 的类,应该转换成 AffairDateBean 转 json
return mGson.toJson(toAffairDateBean())
}

/**
* 观察当前登录人的事务
* - 支持换账号登录后返回新登录人的数据
@@ -81,7 +86,7 @@ object AffairRepository {
}
}
}

/**
* 刷新事务
*/
@@ -93,12 +98,12 @@ object AffairRepository {
.andThen(Api.getAffair())
.throwApiExceptionIfFail()
.map {
// 装换数据并插入数据库
// 转换数据并插入数据库
val affairIncompleteEntity = it.toAffairIncompleteEntity()
AffairDao.resetData(selfNum, affairIncompleteEntity)
}.subscribeOn(Schedulers.io())
}

/**
* 得到事务,但不建议你直接使用,应该用 [observeAffair] 来代替
*
@@ -112,7 +117,7 @@ object AffairRepository {
AffairDao.getAffairByStuNum(selfNum)
}
}

/**
* 添加事务,请使用 [observeAffair] 进行观察数据
*/
@@ -125,36 +130,30 @@ object AffairRepository {
val stuNum = ServiceManager(IAccountService::class).getUserService().getStuNum()
if (stuNum.isBlank()) return Completable.error(IllegalStateException("学号为空!"))
val dateJson = atWhatTime.toPostDateJson()
return Api.addAffair(time, title, content, dateJson)
.throwApiExceptionIfFail()
.map { it.remoteId }
.onErrorReturn {
// 发送给下流本地 remoteId,表示网络连接失败,需要添加进临时数据库
LocalRemoteId
}.map { remoteId ->
// 存在概率很小的同步问题,可能此时正在执行另一个 uploadLocalAffair(),所以使用 runInTransaction
DB.runInTransaction(
Callable {
// 插入数据库新数据,并返回给下游 onlyId
val entity = AffairIncompleteEntity(remoteId, time, title, content, atWhatTime)
val onlyId = AffairDao
.insertAffair(stuNum, entity)
.onlyId
if (remoteId == LocalRemoteId) {
// 为本地 remoteId 的话就插入到本地临时添加的事务中
val localEntity = LocalAddAffairEntity(stuNum, onlyId, time, title, content, dateJson)
LocalAddDao.insertLocalAddAffair(localEntity)
}
// 注意:这里返回给下游的是 onlyId,不是 remoteId
onlyId
}
)
}.doOnSuccess { onlyId ->
insertCalendarAfterClear(onlyId, time, title, content, atWhatTime)
}.flatMapCompletable { Completable.complete() }
return Single.create {
val entity = AffairIncompleteEntity(AffairId--, time, title, content, atWhatTime)
val onlyId = AffairDao
.insertAffair(stuNum, entity) // 优先添加进数据库,保证用户先看到 ui
.onlyId
it.onSuccess(onlyId)
}.doOnSuccess { onlyId ->
// 这里进行异步上传
Api.addAffair(time, title, content, dateJson)
.throwApiExceptionIfFail()
.doOnSuccess {
// 更新本地 remoteId
AffairDao.updateRemoteId(stuNum, onlyId, it.remoteId)
}.doOnError {
// 网络请求失败,保存进临时数据库
val localEntity = LocalAddAffairEntity(stuNum, onlyId, time, title, content, dateJson)
LocalAddDao.insertLocalAddAffair(localEntity)
}.unsafeSubscribeBy()
}.doOnSuccess { onlyId ->
insertCalendarAfterClear(onlyId, time, title, content, atWhatTime)
}.flatMapCompletable { Completable.complete() }
.subscribeOn(Schedulers.io())
}

/**
* 更新事务,请使用 [observeAffair] 进行观察数据
*/
@@ -172,7 +171,7 @@ object AffairRepository {
insertCalendarAfterClear(onlyId, time, title, content, atWhatTime)
}.subscribeOn(Schedulers.io())
}

@SuppressLint("CheckResult")
private fun updateAffairInternal(
stuNum: String,
@@ -182,60 +181,48 @@ object AffairRepository {
content: String,
atWhatTime: List<AffairEntity.AtWhatTime>
): Completable {
return Completable.create { emitter ->
try {
// 存在概率很小的同步问题,可能此时正在执行另一个 uploadLocalAffair(),所以使用 runInTransaction
DB.runInTransaction {
AffairDao.findAffairByOnlyId(stuNum, onlyId)
.toSingle() // 找不到时直接抛错
.map { it.remoteId }
.doOnSuccess { remoteId ->
val dateJson = atWhatTime.toPostDateJson()
if (remoteId == LocalRemoteId) {
// 如果是本地临时事务,就直接更新临时添加的事务
LocalAddDao
.updateLocalAddAffair(
LocalAddAffairEntity(stuNum, onlyId, time, title, content, dateJson)
return AffairDao.findAffairByOnlyId(stuNum, onlyId)
.toSingle() // 找不到时直接抛错,更新操作还能找不到?
.map { it.remoteId }
.doOnSuccess { remoteId ->
// 更新本地数据库,更新后 ui 就会同步刷新
AffairDao.updateAffair(
AffairEntity(
stuNum,
onlyId,
remoteId,
time,
title,
content,
atWhatTime
)
)
}.doOnSuccess { remoteId ->
val dateJson = atWhatTime.toPostDateJson()
if (remoteId < 0) {
// 如果是本地临时事务,就直接更新临时添加的事务
LocalAddDao
.updateLocalAddAffair(
LocalAddAffairEntity(stuNum, onlyId, time, title, content, dateJson)
)
} else {
// 不是本地临时事务就上传,这里异步上传
Api.updateAffair(remoteId, time, title, content, dateJson)
.throwApiExceptionIfFail()
.doOnError {
// 上传失败就暂时保存在本地临时更新的事务中
// insert 已改为 OnConflictStrategy.REPLACE,可进行替换插入
LocalUpdateDao
.insertLocalUpdateAffair(
LocalUpdateAffairEntity(
stuNum, onlyId, remoteId, time, title, content, dateJson
)
} else {
// 不是本地临时事务就上传
Api.updateAffair(remoteId, time, title, content, dateJson)
.throwApiExceptionIfFail()
.doOnError {
// 上传失败就暂时保存在本地临时更新的事务中
// insert 已改为 OnConflictStrategy.REPLACE,可进行替换插入
LocalUpdateDao
.insertLocalUpdateAffair(
LocalUpdateAffairEntity(
stuNum, onlyId, remoteId, time, title, content, dateJson
)
)
}.onErrorComplete { true } // 终止异常向下游传递
.blockingGet() // 直接堵塞,因为需要使用数据库的 runInTransaction,不能使用流来处理
}
}.doOnSuccess { remoteId ->
// 最后更新本地数据
AffairDao.updateAffair(
AffairEntity(
stuNum,
onlyId,
remoteId,
time,
title,
content,
atWhatTime
)
)
}.blockingGet() // 直接堵塞,因为需要使用数据库的 runInTransaction,不能使用流来处理
}.unsafeSubscribeBy()
}
// 在 runInTransaction 结束后再发送
emitter.onComplete()
} catch (e: Exception) {
emitter.tryOnError(e)
}
}
}.flatMapCompletable { Completable.complete() }
}

/**
* 删除事务,请使用 [observeAffair] 进行观察数据
*/
@@ -250,54 +237,44 @@ object AffairRepository {
}
}.subscribeOn(Schedulers.io())
}

@SuppressLint("CheckResult")
private fun deleteAffairInternal(stuNum: String, onlyId: Int): Completable {
return Completable.create { emitter ->
// 存在概率很小的同步问题,可能此时正在执行另一个 uploadLocalAffair(),所以使用 runInTransaction
return Single.create {
try {
DB.runInTransaction {
AffairDao.findAffairByOnlyId(stuNum, onlyId)
.toSingle() // 找不到时直接抛错
.map { it.remoteId }
.doOnSuccess { remoteId ->
if (remoteId == LocalRemoteId) {
// 如果是本地临时事务,就直接删除临时添加的事务
LocalAddDao.deleteLocalAddAffair(stuNum, onlyId)
} else {
// 不是本地临时事务就上传
Api.deleteAffair(remoteId)
.throwApiExceptionIfFail()
.doOnError {
// 上传失败就暂时保存在本地临时删除的事务中
LocalDeleteDao.insertLocalDeleteAffair(
LocalDeleteAffairEntity(stuNum, onlyId, remoteId)
)
// 然后尝试删除本地临时更新的事务,不管有没有
LocalUpdateDao.deleteLocalUpdateAffair(stuNum, onlyId)
}.onErrorComplete { true } // 终止异常向下游传递
.blockingGet() // 直接堵塞,因为需要使用数据库的 runInTransaction,不能使用流来处理
}
}.doOnSuccess {
// 最后删除本地数据
AffairDao.deleteAffair(stuNum, onlyId)
}.blockingGet() // 直接堵塞,因为需要使用数据库的 runInTransaction,不能使用流来处理
}
// 在 runInTransaction 结束后再发送
emitter.onComplete()
// 找不到时直接抛错,删除操作还能找不到?
val entity = AffairDao.deleteAffairReturn(stuNum, onlyId)!!
it.onSuccess(entity)
} catch (e: Exception) {
emitter.tryOnError(e)
it.tryOnError(e)
}
}
}.doOnSuccess { entity ->
if (entity.remoteId < 0) {
// 如果是本地临时事务,就直接删除临时添加的事务
LocalAddDao.deleteLocalAddAffair(stuNum, onlyId)
} else {
// 不是本地临时事务就上传,这里异步上传
Api.deleteAffair(entity.remoteId)
.throwApiExceptionIfFail()
.doOnError {
// 上传失败就暂时保存在本地临时删除的事务中
LocalDeleteDao.insertLocalDeleteAffair(
LocalDeleteAffairEntity(stuNum, onlyId, entity.remoteId)
)
// 然后尝试删除本地临时更新的事务,不管有没有
LocalUpdateDao.deleteLocalUpdateAffair(stuNum, onlyId)
}.unsafeSubscribeBy()
}
}.flatMapCompletable { Completable.complete() }
}

fun addTodo(pushWrapper: TodoListPushWrapper) = AffairApiService
.INSTANCE
.pushTodo(pushWrapper)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.INSTANCE
.pushTodo(pushWrapper)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())



/**
* 发送本地临时保存的事务
*
@@ -329,7 +306,7 @@ object AffairRepository {
}.blockingGet() // 直接同步请求,原因请看该方法注释
}
}

// 本地临时更新的事务
val localUpdate = {
LocalUpdateDao.getLocalUpdateAffair(stuNum).forEach { entity ->
@@ -347,7 +324,7 @@ object AffairRepository {
}.blockingGet() // 直接同步请求,原因请看该方法注释
}
}

// 本地临时删除的事务
val localDelete = {
LocalDeleteDao.getLocalDeleteAffair(stuNum).forEach { entity ->
@@ -360,7 +337,7 @@ object AffairRepository {
}.blockingGet() // 直接同步请求,原因请看该方法注释
}
}

return Completable.create { emitter ->
try {
// 必须使用 Transaction,保证数据库的同步性
@@ -379,7 +356,7 @@ object AffairRepository {
.delay(if (hasLocalAffair) 200 else 0, TimeUnit.MILLISECONDS)
}.observeOn(Schedulers.io())
}

/**
* 先清理 [onlyId] 已经添加进的手机日历
* 再添加进手机日历
Original file line number Diff line number Diff line change
@@ -67,7 +67,7 @@ data class AffairIncompleteEntity(
data class AffairEntity(
val stuNum: String,
val onlyId: Int, // 本地的唯一 id,由我们端上给出
val remoteId: Int, // 后端的 id,因为存在本地临时事务,所以会发生改变
val remoteId: Int, // 后端的 id,如果小于 0,则说明是本地临时添加的事务,并且可能会发生改变,业务侧不建议使用
val time: Int, // 提醒时间
val title: String,
val content: String,
@@ -140,6 +140,14 @@ abstract class AffairDao {
// 内部使用
@Insert(onConflict = OnConflictStrategy.REPLACE)
protected abstract fun insertAffair(affair: AffairEntity)

@Transaction
open fun deleteAffairReturn(stuNum: String, onlyId: Int): AffairEntity? {
return findAffairByOnlyId(stuNum, onlyId)
.doOnSuccess {
deleteAffair(it)
}.blockingGet()
}

/**
* 更新旧事务的 id

0 comments on commit a93dcc5

Please sign in to comment.