Skip to content

Commit

Permalink
Fix sync and stuff.
Browse files Browse the repository at this point in the history
  • Loading branch information
mopsalarm committed Jun 11, 2018
1 parent af9b4aa commit 3f30974
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 30 deletions.
8 changes: 8 additions & 0 deletions app/src/main/java/com/pr0gramm/app/EagerBootstrap.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,28 @@ import com.pr0gramm.app.services.UserService
import com.pr0gramm.app.services.VoteService
import com.pr0gramm.app.services.preloading.PreloadManager
import com.pr0gramm.app.services.proxy.ProxyService
import com.pr0gramm.app.sync.SyncService
import com.pr0gramm.app.util.AndroidUtility
import com.pr0gramm.app.util.doInBackground
import org.slf4j.LoggerFactory

/**
* Bootstraps a few instances in some other thread.
*/
object EagerBootstrap {
private val logger = LoggerFactory.getLogger("EagerBootstrap")

fun initEagerSingletons(kodein: Kodein) {
doInBackground {
try {
logger.info("Bootstrapping instances...")

kodein.instance<ProxyService>()
kodein.instance<PreloadManager>()
kodein.instance<VoteService>()
kodein.instance<UserService>()
kodein.instance<SyncService>()

} catch (error: Throwable) {
AndroidUtility.logToCrashlytics(error)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class BookmarkService(private val database: Holder<SQLiteDatabase>) {
}

fun get(): Observable<List<Bookmark>> {
return onChange.subscribeOn(BackgroundScheduler.instance()).map { list() }
return onChange.observeOn(BackgroundScheduler.instance()).map { list() }
}

/**
Expand Down
71 changes: 53 additions & 18 deletions app/src/main/java/com/pr0gramm/app/services/SeenService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@ package com.pr0gramm.app.services
import android.content.Context
import com.google.common.primitives.UnsignedBytes
import com.pr0gramm.app.util.doInBackground
import com.pr0gramm.app.util.readStream
import com.pr0gramm.app.util.time
import org.slf4j.LoggerFactory
import java.io.*
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.File
import java.io.RandomAccessFile
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.channels.FileChannel
import java.util.concurrent.atomic.AtomicReference
import java.util.zip.Deflater
import java.util.zip.DeflaterInputStream
import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterInputStream
import kotlin.experimental.or


Expand All @@ -33,7 +37,7 @@ class SeenService(context: Context) {
try {
val file = File(context.filesDir, "seen-posts.bits")
buffer.set(mapByteBuffer(file))
} catch (error: IOException) {
} catch (error: Exception) {
logger.warn("Could not load the seen-Cache", error)
}
}
Expand Down Expand Up @@ -93,33 +97,64 @@ class SeenService(context: Context) {
fun merge(other: ByteArray) {
val buffer = this.buffer.get() ?: return

var updated = 0
var totalCount = 0
var updateCount = 0

synchronized(lock) {
logger.time("Merging values") {
ByteArrayInputStream(other).use { bi ->
DeflaterInputStream(bi).use { input ->
for (idx in 0 until buffer.limit()) {
val otherValue = input.read()
if (otherValue == -1)
break

// merge them by performing a bitwise 'or'
val previousValue = buffer.get(idx)
val mergedValue = previousValue or UnsignedBytes.saturatedCast(otherValue.toLong())
if (previousValue != mergedValue) {
updated++
buffer.put(idx, mergedValue)
InflaterInputStream(bi).use { input ->
val source = buffer.duplicate()
val target = buffer.duplicate()

readStream(input) { bytes, read ->
val stepSize = read.coerceAtMost(source.remaining())

var updatedInStep = 0
for (idx in 0 until stepSize) {
val previousValue = source.get()
val mergedValue = bytes[idx] or previousValue

if (previousValue != mergedValue) {
bytes[idx] = mergedValue
updatedInStep++
}
}

if (updatedInStep != 0) {
target.put(bytes, 0, stepSize)
} else {
target.position(target.position() + stepSize)
}

totalCount += stepSize
updateCount += updatedInStep
}
}
}
}

dirty = updated > 0
dirty = updateCount > 0
}

logger.info("Updated {} bytes in seen cache", updated)
logger.info("Updated {} out of {} bytes in seen cache", updateCount, totalCount)
}

/**
* Workaround to reset the lower n bytes.
*/
fun clearUpTo(n: Int) {
val buffer = this.buffer.get() ?: return

logger.info("Setting the first {} bits to zero.", n)

synchronized(lock) {
for (idx in 0 until n.coerceAtMost(buffer.limit())) {
buffer.put(idx, 0)
}

dirty = true
}
}

fun export(): ByteArray {
Expand Down
2 changes: 1 addition & 1 deletion app/src/main/java/com/pr0gramm/app/services/UserService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class UserService(private val api: Api,

.subscribe(
{ loginState -> updateLoginState({ loginState }) },
{ error -> logger.warn("Could not restore login state: " + error) })
{ error -> logger.warn("Could not restore login state:", error) })

}
}
Expand Down
47 changes: 42 additions & 5 deletions app/src/main/java/com/pr0gramm/app/sync/SyncService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import com.pr0gramm.app.Settings
import com.pr0gramm.app.Stats
import com.pr0gramm.app.services.*
import com.pr0gramm.app.ui.dialogs.ignoreError
import com.pr0gramm.app.util.AndroidUtility
import com.pr0gramm.app.util.subscribeOnBackground
import com.pr0gramm.app.util.*
import org.slf4j.LoggerFactory
import rx.Observable
import java.io.IOException
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock


/**
Expand All @@ -23,6 +25,17 @@ class SyncService(private val userService: UserService,
private val logger = LoggerFactory.getLogger("SyncService")
private val settings = Settings.get()

init {

// do a sync everytime the user token changes
userService.loginState()
.mapNotNull { state -> state.uniqueToken }
.doOnNext { logger.info("Unique token is now {}", it) }
.distinctUntilChanged()
.delaySubscription(1, TimeUnit.SECONDS)
.subscribe { syncSeenServiceAsync(it) }
}

fun syncStatistics() {
Stats.get().incrementCounter("jobs.sync-stats")

Expand Down Expand Up @@ -77,9 +90,32 @@ class SyncService(private val userService: UserService,
}
}

private fun syncSeenServiceAsync(token: String) {
private val seenSyncLock = ReentrantLock()

private fun syncSeenServiceAsync(token: String) = seenSyncLock.withTryLock {
if (!settings.backup) {
return
}

logger.info("Starting sync of seen bits.")

// the first implementation of this feature had a bug where it would deflate
// the stream again and by this wrongly set some of the lower random bytes.
//
// we will now just clear the lower bits
//
val fixObservable = if (singleShotService.isFirstTime("fix.clear-lower-seen-bits-2")) {
kvService.get(token, "seen")
.ofType<KVService.GetResult.Value>()
.doOnNext { seenService.clearUpTo(it.value.size * 150 / 100) }
.ignoreError()

} else {
Observable.empty()
}

val updateObservable = kvService
.update(token, "seen") { previous ->
.update(token, "seen-bits") { previous ->
if (previous != null) {
// merge the previous state into the current seen service
seenService.merge(previous)
Expand All @@ -93,7 +129,8 @@ class SyncService(private val userService: UserService,
}
}

updateObservable
fixObservable.ofType<KVService.PutResult.Version>()
.concatWith(updateObservable)
.doOnError { err ->
Stats.get().incrementCounter("seen.sync.error")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ internal class BookmarkActionItem(private val bookmarkService: BookmarkService,
return bookmarkService.get()
.take(1)
.flatMapIterable { it }
.exists { it.asFeedFilter() == filter }
.onErrorResumeEmpty()
.exists { it.asFeedFilter() == filter }
.toBlocking()
.first()
.single()
}

override fun activate() {
Expand All @@ -28,7 +28,6 @@ internal class BookmarkActionItem(private val bookmarkService: BookmarkService,
.take(1)
.flatMapIterable { it }
.filter { it.asFeedFilter() == filter }
.onErrorResumeEmpty()
.flatMapCompletable { bookmark -> bookmarkService.delete(bookmark).onErrorComplete() }
.subscribe()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ class SettingsActionItemsSlide : ActionItemsSlide("SettingsActionItemsSlide") {
val settings = Settings.get()

return listOf(
SettingActionItem(settings, "HTTPS verwenden", "pref_use_https"),
SettingActionItem(settings, "Immer mit 'sfw' starten", "pref_feed_start_at_sfw"),
SettingActionItem(settings, getString(R.string.pref_use_incognito_browser_title), "pref_use_incognito_browser"),
SettingActionItem(settings, getString(R.string.pref_double_tap_to_upvote), "pref_double_tap_to_upvote"),
Expand Down
18 changes: 17 additions & 1 deletion app/src/main/java/com/pr0gramm/app/util/Extensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import java.io.File
import java.io.InputStream
import java.lang.ref.WeakReference
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.Lock
import java.util.regex.Pattern
import kotlin.properties.Delegates
import kotlin.properties.ReadWriteProperty
Expand All @@ -58,7 +59,7 @@ fun <T> Observable<T>.subscribeOnBackground(): Observable<T> = subscribeOn(Backg

fun <T> Observable<T>.observeOnMain(): Observable<T> = observeOn(AndroidSchedulers.mainThread())

inline fun readStream(stream: InputStream, bufferSize: Int = 16 * 1042, fn: (ByteArray, Int) -> Unit): Unit {
inline fun readStream(stream: InputStream, bufferSize: Int = 16 * 1024, fn: (ByteArray, Int) -> Unit): Unit {
val buffer = ByteArray(bufferSize)

while (true) {
Expand Down Expand Up @@ -389,3 +390,18 @@ fun View.updatePadding(

setPadding(left, top, right, bottom)
}

fun <T : Any?, R : Any> Observable<T>.mapNotNull(fn: (T) -> R?): Observable<R> {
@Suppress("UNCHECKED_CAST")
return map { fn(it) }.filter { it != null } as Observable<R>
}

inline fun Lock.withTryLock(fn: () -> Unit) {
if (tryLock()) {
try {
fn()
} finally {
unlock()
}
}
}

0 comments on commit 3f30974

Please sign in to comment.