diff --git a/flo/CHANGELOG.md b/flo/CHANGELOG.md new file mode 100644 index 00000000..d9beef3c --- /dev/null +++ b/flo/CHANGELOG.md @@ -0,0 +1,10 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [unspecified] + +- Initial dump from internal repo diff --git a/flo/apps/demo/build.gradle.kts b/flo/apps/demo/build.gradle.kts new file mode 100644 index 00000000..b0face60 --- /dev/null +++ b/flo/apps/demo/build.gradle.kts @@ -0,0 +1,49 @@ +plugins { + alias(libs.plugins.tidal.android.application) +} + +android { + namespace = "com.tidal.sdk.flo.demo" + + buildFeatures { + compose = false + } + + defaultConfig { + applicationId = "com.tidal.sdk.flo.demo" + versionCode = 1 + versionName = "0.1.0" + } + + flavorDimensions += "api" + productFlavors { + create("core") { + dimension = "api" + applicationIdSuffix = ".core" + } + create("kotlincoroutines") { + dimension = "api" + applicationIdSuffix = ".kotlincoroutines" + } + create("rxjava3") { + dimension = "api" + applicationIdSuffix = ".rxjava3" + } + create("rxjava2") { + dimension = "api" + applicationIdSuffix = ".rxjava2" + } + create("rxjava") { + dimension = "api" + applicationIdSuffix = ".rxjava" + } + } +} + +dependencies { + implementation(project(":flo")) + "kotlincoroutinesImplementation"(project(":flo:extensions:kotlincoroutines")) + "rxjava3Implementation"(project(":flo:extensions:rxjava3")) + "rxjava2Implementation"(project(":flo:extensions:rxjava2")) + "rxjavaImplementation"(project(":flo:extensions:rxjava")) +} diff --git a/flo/apps/demo/src/core/kotlin/com/tidal/sdk/flo/demo/CoreApiSubscriptionManager.kt b/flo/apps/demo/src/core/kotlin/com/tidal/sdk/flo/demo/CoreApiSubscriptionManager.kt new file mode 100644 index 00000000..a86e03a8 --- /dev/null +++ b/flo/apps/demo/src/core/kotlin/com/tidal/sdk/flo/demo/CoreApiSubscriptionManager.kt @@ -0,0 +1,27 @@ +package com.tidal.sdk.flo.demo + +import com.tidal.sdk.flo.core.FloClient +import com.tidal.sdk.flo.core.SubscriptionHandle + +internal typealias SelectedApiSubscriptionManager = CoreApiSubscriptionManager + +internal class CoreApiSubscriptionManager( + floClient: FloClient, + name: String, +) : SubscriptionManager(floClient, name) { + + private val topicToSubscriptionHandle = mutableMapOf() + + override fun subscribeInternal( + topic: String, + onMessage: (String) -> Unit, + onError: (Throwable) -> Unit, + tail: Int, + ) { + topicToSubscriptionHandle[topic] = floClient.subscribe(topic, onMessage, onError, tail) + } + + override fun unsubscribeInternal(topic: String) { + topicToSubscriptionHandle[topic]?.unsubscribe() + } +} diff --git a/flo/apps/demo/src/kotlincoroutines/kotlin/com/tidal/sdk/flo/demo/KotlinCoroutineApiSubscriptionManager.kt b/flo/apps/demo/src/kotlincoroutines/kotlin/com/tidal/sdk/flo/demo/KotlinCoroutineApiSubscriptionManager.kt new file mode 100644 index 00000000..714caa2f --- /dev/null +++ b/flo/apps/demo/src/kotlincoroutines/kotlin/com/tidal/sdk/flo/demo/KotlinCoroutineApiSubscriptionManager.kt @@ -0,0 +1,46 @@ +package com.tidal.sdk.flo.demo + +import com.tidal.sdk.flo.core.FloClient +import com.tidal.sdk.flo.core.FloException +import com.tidal.sdk.flo.extensions.kotlincoroutines.topicAsFlow +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.launch + +internal typealias SelectedApiSubscriptionManager = KotlinCoroutineApiSubscriptionManager + +internal class KotlinCoroutineApiSubscriptionManager( + floClient: FloClient, + name: String, +) : SubscriptionManager(floClient, name) { + + private val topicToJob = mutableMapOf() + + @Suppress("TooGenericExceptionCaught") // We don't want singled-out behavior + override fun subscribeInternal( + topic: String, + onMessage: (String) -> Unit, + onError: (Throwable) -> Unit, + tail: Int, + ) { + topicToJob[topic] = GlobalScope.launch { + floClient.topicAsFlow(topic, tail).collect { + try { + onMessage(it) + } catch (throwable: Throwable) { + when (throwable) { + is FloException -> onError(throwable) + else -> throw throwable + } + } + } + } + } + + override fun unsubscribeInternal(topic: String) { + topicToJob[topic]?.cancel(RequestedCancellationException()) + } +} + +private class RequestedCancellationException : CancellationException() diff --git a/flo/apps/demo/src/main/AndroidManifest.xml b/flo/apps/demo/src/main/AndroidManifest.xml new file mode 100644 index 00000000..5848c652 --- /dev/null +++ b/flo/apps/demo/src/main/AndroidManifest.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + diff --git a/flo/apps/demo/src/main/kotlin/com/tidal/sdk/flo/demo/MainActivity.kt b/flo/apps/demo/src/main/kotlin/com/tidal/sdk/flo/demo/MainActivity.kt new file mode 100644 index 00000000..4278e0da --- /dev/null +++ b/flo/apps/demo/src/main/kotlin/com/tidal/sdk/flo/demo/MainActivity.kt @@ -0,0 +1,129 @@ +package com.tidal.sdk.flo.demo + +import android.annotation.SuppressLint +import android.app.Activity +import android.os.Bundle +import android.text.InputType +import android.util.Log +import android.view.Gravity +import android.widget.Button +import android.widget.EditText +import android.widget.LinearLayout +import android.widget.TextView + +internal class MainActivity : Activity() { + + @Suppress("LongMethod") + @SuppressLint("SetTextI18n") + override fun onCreate(savedInstanceState: Bundle?) { + super.onCreate(savedInstanceState) + title = "API: ${BuildConfig.FLAVOR}" + val topicView = EditText(this).apply { + gravity = Gravity.CENTER + layoutParams = LinearLayout.LayoutParams( + LinearLayout.LayoutParams.MATCH_PARENT, + LinearLayout.LayoutParams.WRAP_CONTENT, + ) + } + val tailView = EditText(this).apply { + gravity = Gravity.CENTER + inputType = InputType.TYPE_CLASS_NUMBER + layoutParams = LinearLayout.LayoutParams( + LinearLayout.LayoutParams.MATCH_PARENT, + LinearLayout.LayoutParams.WRAP_CONTENT, + ) + } + val subscriptionView = TextView(this).apply { + gravity = Gravity.CENTER + } + val subscriptionManager = SelectedApiSubscriptionManager( + (applicationContext as MainApplication).demoFloClient, + "Demo FloClient", + ) + val updateSubscriptionView = { + subscriptionView.text = subscriptionManager.subscribedTopics + .joinToString(separator = "\n", postfix = "\n") { + "topic=$it (${subscriptionManager.name})" + } + } + setContentView( + LinearLayout(this).apply { + gravity = Gravity.TOP or Gravity.CENTER_HORIZONTAL + orientation = LinearLayout.VERTICAL + addView( + LinearLayout(this@MainActivity).apply { + gravity = Gravity.CENTER + addView( + Button(this@MainActivity).apply { + text = "Subscribe" + setOnClickListener { + val topic = topicView.text.toString() + subscriptionManager.unsubscribe(topic) + subscriptionManager + .subscribe( + topic, + { msg: String -> + Log.d( + "HOLA", + "topic $topic - MSG: $msg", + ) + }, + { e: Throwable -> + Log.d( + "HOLA", + "topic $topic - ERROR", + e, + ) + }, + tailView.text.toString().ifBlank { "0" }.toInt(), + ) + updateSubscriptionView() + } + }, + ) + addView( + Button(this@MainActivity).apply { + text = "Unsubscribe" + setOnClickListener { + val topic = topicView.text.toString() + subscriptionManager.unsubscribe(topic) + updateSubscriptionView() + } + }, + ) + }, + ) + addView( + LinearLayout(this@MainActivity).apply { + gravity = Gravity.CENTER + orientation = LinearLayout.HORIZONTAL + addView( + TextView(this@MainActivity).apply { + text = "Topic:" + }, + ) + addView(topicView) + }, + ) + addView( + LinearLayout(this@MainActivity).apply { + gravity = Gravity.CENTER + orientation = LinearLayout.HORIZONTAL + addView( + TextView(this@MainActivity).apply { + text = "Tail (for subscribing only):" + }, + ) + addView(tailView) + }, + ) + addView(subscriptionView).apply { + layoutParams = LinearLayout.LayoutParams( + LinearLayout.LayoutParams.MATCH_PARENT, + LinearLayout.LayoutParams.MATCH_PARENT, + ) + } + }, + ) + } +} diff --git a/flo/apps/demo/src/main/kotlin/com/tidal/sdk/flo/demo/MainApplication.kt b/flo/apps/demo/src/main/kotlin/com/tidal/sdk/flo/demo/MainApplication.kt new file mode 100644 index 00000000..a461f3c6 --- /dev/null +++ b/flo/apps/demo/src/main/kotlin/com/tidal/sdk/flo/demo/MainApplication.kt @@ -0,0 +1,27 @@ +package com.tidal.sdk.flo.demo + +import android.app.Application +import android.net.ConnectivityManager +import android.os.Handler +import android.os.HandlerThread +import android.os.Looper +import com.tidal.sdk.flo.core.FloClient + +internal class MainApplication : Application() { + + val demoFloClient by lazy { + FloClient( + getSystemService(ConnectivityManager::class.java), + { "atoken" }, + { false }, + TODO("URL to your Flo backend here"), + HandlerThread("DemoFloClient::Operations") + .also { + it.start() + }.looper.run { + Handler(this) + }, + Handler(Looper.getMainLooper()), + ) + } +} diff --git a/flo/apps/demo/src/main/kotlin/com/tidal/sdk/flo/demo/SubscriptionManager.kt b/flo/apps/demo/src/main/kotlin/com/tidal/sdk/flo/demo/SubscriptionManager.kt new file mode 100644 index 00000000..8131da98 --- /dev/null +++ b/flo/apps/demo/src/main/kotlin/com/tidal/sdk/flo/demo/SubscriptionManager.kt @@ -0,0 +1,36 @@ +package com.tidal.sdk.flo.demo + +import com.tidal.sdk.flo.core.FloClient + +abstract class SubscriptionManager( + protected val floClient: FloClient, + val name: String, +) { + + var subscribedTopics = listOf() + private set + + fun subscribe( + topic: String, + onMessage: (String) -> Unit, + onError: (Throwable) -> Unit, + tail: Int = 0, + ) = synchronized(this) { + subscribedTopics = subscribedTopics + topic + subscribeInternal(topic, onMessage, onError, tail) + } + + fun unsubscribe(topic: String) = synchronized(this) { + subscribedTopics = subscribedTopics - topic + unsubscribeInternal(topic) + } + + protected abstract fun subscribeInternal( + topic: String, + onMessage: (String) -> Unit, + onError: (Throwable) -> Unit, + tail: Int = 0, + ) + + protected abstract fun unsubscribeInternal(topic: String) +} diff --git a/flo/apps/demo/src/rxjava/kotlin/com/tidal/sdk/flo/demo/RxJavaApiSubscriptionManager.kt b/flo/apps/demo/src/rxjava/kotlin/com/tidal/sdk/flo/demo/RxJavaApiSubscriptionManager.kt new file mode 100644 index 00000000..b475b5e6 --- /dev/null +++ b/flo/apps/demo/src/rxjava/kotlin/com/tidal/sdk/flo/demo/RxJavaApiSubscriptionManager.kt @@ -0,0 +1,37 @@ +package com.tidal.sdk.flo.demo + +import com.tidal.sdk.flo.core.FloClient +import com.tidal.sdk.flo.extensions.rxjava.topicAsObservable +import rx.Emitter +import rx.Subscription + +internal typealias SelectedApiSubscriptionManager = RxJavaApiSubscriptionManager + +internal class RxJavaApiSubscriptionManager( + floClient: FloClient, + name: String, +) : SubscriptionManager(floClient, name) { + + private val topicToSubscription = mutableMapOf() + + override fun subscribeInternal( + topic: String, + onMessage: (String) -> Unit, + onError: (Throwable) -> Unit, + tail: Int, + ) { + topicToSubscription[topic] = floClient.topicAsObservable( + topic, + tail, + Emitter.BackpressureMode.LATEST, + ) + .subscribe( + onMessage, + onError, + ) + } + + override fun unsubscribeInternal(topic: String) { + topicToSubscription[topic]?.unsubscribe() + } +} diff --git a/flo/apps/demo/src/rxjava2/kotlin/com/tidal/sdk/flo/demo/RxJava2ApiSubscriptionManager.kt b/flo/apps/demo/src/rxjava2/kotlin/com/tidal/sdk/flo/demo/RxJava2ApiSubscriptionManager.kt new file mode 100644 index 00000000..9160efa4 --- /dev/null +++ b/flo/apps/demo/src/rxjava2/kotlin/com/tidal/sdk/flo/demo/RxJava2ApiSubscriptionManager.kt @@ -0,0 +1,34 @@ +package com.tidal.sdk.flo.demo + +import com.tidal.sdk.flo.core.FloClient +import com.tidal.sdk.flo.extensions.rxjava2.topicAsFlowable +import io.reactivex.BackpressureStrategy +import io.reactivex.disposables.Disposable + +internal typealias SelectedApiSubscriptionManager = RxJava2ApiSubscriptionManager + +internal class RxJava2ApiSubscriptionManager( + floClient: FloClient, + name: String, +) : SubscriptionManager(floClient, name) { + + private val topicToDisposable = mutableMapOf() + + override fun subscribeInternal( + topic: String, + onMessage: (String) -> Unit, + onError: (Throwable) -> Unit, + tail: Int, + ) { + topicToDisposable[topic] = + floClient.topicAsFlowable(topic, tail, BackpressureStrategy.LATEST) + .subscribe( + onMessage, + onError, + ) + } + + override fun unsubscribeInternal(topic: String) { + topicToDisposable[topic]?.dispose() + } +} diff --git a/flo/apps/demo/src/rxjava3/kotlin/com/tidal/sdk/flo/demo/RxJava3ApiSubscriptionManager.kt b/flo/apps/demo/src/rxjava3/kotlin/com/tidal/sdk/flo/demo/RxJava3ApiSubscriptionManager.kt new file mode 100644 index 00000000..5568c5d5 --- /dev/null +++ b/flo/apps/demo/src/rxjava3/kotlin/com/tidal/sdk/flo/demo/RxJava3ApiSubscriptionManager.kt @@ -0,0 +1,34 @@ +package com.tidal.sdk.flo.demo + +import com.tidal.sdk.flo.core.FloClient +import com.tidal.sdk.flo.extensions.rxjava3.topicAsFlowable +import io.reactivex.rxjava3.core.BackpressureStrategy +import io.reactivex.rxjava3.disposables.Disposable + +internal typealias SelectedApiSubscriptionManager = RxJava3ApiSubscriptionManager + +internal class RxJava3ApiSubscriptionManager( + floClient: FloClient, + name: String, +) : SubscriptionManager(floClient, name) { + + private val topicToDisposable = mutableMapOf() + + override fun subscribeInternal( + topic: String, + onMessage: (String) -> Unit, + onError: (Throwable) -> Unit, + tail: Int, + ) { + topicToDisposable[topic] = + floClient.topicAsFlowable(topic, tail, BackpressureStrategy.LATEST) + .subscribe( + onMessage, + onError, + ) + } + + override fun unsubscribeInternal(topic: String) { + topicToDisposable[topic]?.dispose() + } +} diff --git a/flo/build.gradle.kts b/flo/build.gradle.kts new file mode 100644 index 00000000..444b5540 --- /dev/null +++ b/flo/build.gradle.kts @@ -0,0 +1,15 @@ +plugins { + alias(libs.plugins.tidal.android.library) + alias(libs.plugins.google.devtools.ksp) +} + +android { + namespace = "com.tidal.sdk.flo" +} + +dependencies { + ksp(libs.moshi.codegen) + implementation(libs.moshi) + implementation(libs.moshi.adapters) + api(libs.okhttp.okhttp) +} diff --git a/flo/extensions/kotlincoroutines/build.gradle.kts b/flo/extensions/kotlincoroutines/build.gradle.kts new file mode 100644 index 00000000..6932f076 --- /dev/null +++ b/flo/extensions/kotlincoroutines/build.gradle.kts @@ -0,0 +1,12 @@ +plugins { + alias(libs.plugins.tidal.android.library) +} + +android { + namespace = "com.tidal.sdk.flo.extensions.kotlincoroutines" +} + +dependencies { + api(project(":flo")) + api(libs.kotlinxCoroutinesCore) +} diff --git a/flo/extensions/kotlincoroutines/src/main/kotlin/com/tidal/sdk/flo/extensions/kotlincoroutines/KotlinCoroutinesExt.kt b/flo/extensions/kotlincoroutines/src/main/kotlin/com/tidal/sdk/flo/extensions/kotlincoroutines/KotlinCoroutinesExt.kt new file mode 100644 index 00000000..07de83cc --- /dev/null +++ b/flo/extensions/kotlincoroutines/src/main/kotlin/com/tidal/sdk/flo/extensions/kotlincoroutines/KotlinCoroutinesExt.kt @@ -0,0 +1,27 @@ +package com.tidal.sdk.flo.extensions.kotlincoroutines + +import com.tidal.sdk.flo.core.FloClient +import kotlinx.coroutines.cancel +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.channels.trySendBlocking +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.callbackFlow + +/** + * Subscribes to a topic using a [Flow]-based API. Operations on the returned [Flow], such as + * buffering management via [Flow.buffer], are supported. + * + * @return A cold stream that can be used to control the subscription. + * @see [FloClient.subscribe] + */ +@JvmOverloads +fun FloClient.topicAsFlow(topic: String, tail: Int = 0) = callbackFlow { + val subscriptionHandle = subscribe( + topic, + { trySendBlocking(it) }, + { cancel("There has been a terminal error.", it) }, + tail, + ) + awaitClose { subscriptionHandle.unsubscribe() } +} diff --git a/flo/extensions/rxjava/build.gradle.kts b/flo/extensions/rxjava/build.gradle.kts new file mode 100644 index 00000000..5f7054d2 --- /dev/null +++ b/flo/extensions/rxjava/build.gradle.kts @@ -0,0 +1,12 @@ +plugins { + alias(libs.plugins.tidal.android.library) +} + +android { + namespace = "com.tidal.sdk.flo.extensions.rxjava" +} + +dependencies { + api(project(":flo")) + api(libs.rxjava) +} diff --git a/flo/extensions/rxjava/src/main/kotlin/com/tidal/sdk/flo/extensions/rxjava/RxJavaExt.kt b/flo/extensions/rxjava/src/main/kotlin/com/tidal/sdk/flo/extensions/rxjava/RxJavaExt.kt new file mode 100644 index 00000000..af7268ec --- /dev/null +++ b/flo/extensions/rxjava/src/main/kotlin/com/tidal/sdk/flo/extensions/rxjava/RxJavaExt.kt @@ -0,0 +1,34 @@ +package com.tidal.sdk.flo.extensions.rxjava + +import com.tidal.sdk.flo.core.FloClient +import rx.Emitter.BackpressureMode +import rx.Observable + +/** + * Subscribes to a topic using an [Observable]-based API. Operations on the returned [Observable] + * are supported. + * + * @param backpressureMode Specifies a [BackpressureMode] to apply if incoming messages outpace + * consumer capabilities. + * @return A cold stream that can be used to control the subscription. + * @see [FloClient.subscribe] + */ +@JvmOverloads +fun FloClient.topicAsObservable(topic: String, tail: Int = 0, backpressureMode: BackpressureMode) = + Observable.create( + { emitter -> + val subscriptionHandle = subscribe( + topic, + { emitter.onNext(it) }, + { + emitter.apply { + onError(it) + onCompleted() + } + }, + tail, + ) + emitter.setCancellation { subscriptionHandle.unsubscribe() } + }, + backpressureMode, + ) diff --git a/flo/extensions/rxjava2/build.gradle.kts b/flo/extensions/rxjava2/build.gradle.kts new file mode 100644 index 00000000..6dc8189c --- /dev/null +++ b/flo/extensions/rxjava2/build.gradle.kts @@ -0,0 +1,12 @@ +plugins { + alias(libs.plugins.tidal.android.library) +} + +android { + namespace = "com.tidal.sdk.flo.extensions.rxjava2" +} + +dependencies { + api(project(":flo")) + api(libs.rxjava2) +} diff --git a/flo/extensions/rxjava2/src/main/kotlin/com/tidal/sdk/flo/extensions/rxjava2/RxJava2Ext.kt b/flo/extensions/rxjava2/src/main/kotlin/com/tidal/sdk/flo/extensions/rxjava2/RxJava2Ext.kt new file mode 100644 index 00000000..3cc1846f --- /dev/null +++ b/flo/extensions/rxjava2/src/main/kotlin/com/tidal/sdk/flo/extensions/rxjava2/RxJava2Ext.kt @@ -0,0 +1,37 @@ +package com.tidal.sdk.flo.extensions.rxjava2 + +import com.tidal.sdk.flo.core.FloClient +import io.reactivex.BackpressureStrategy +import io.reactivex.Flowable + +/** + * Subscribes to a topic using a [Flowable]-based API. Operations on the returned [Flowable] are + * supported. + * + * @param backpressureStrategy Specifies a [BackpressureStrategy] to apply if incoming messages + * outpace consumer capabilities. + * @return A cold stream that can be used to control the subscription. + * @see [FloClient.subscribe] + */ +@JvmOverloads +fun FloClient.topicAsFlowable( + topic: String, + tail: Int = 0, + backpressureStrategy: BackpressureStrategy, +) = Flowable.create( + { emitter -> + val subscriptionHandle = subscribe( + topic, + { emitter.onNext(it) }, + { + emitter.apply { + onError(it) + onComplete() + } + }, + tail, + ) + emitter.setCancellable { subscriptionHandle.unsubscribe() } + }, + backpressureStrategy, +) diff --git a/flo/extensions/rxjava3/build.gradle.kts b/flo/extensions/rxjava3/build.gradle.kts new file mode 100644 index 00000000..154b0885 --- /dev/null +++ b/flo/extensions/rxjava3/build.gradle.kts @@ -0,0 +1,12 @@ +plugins { + alias(libs.plugins.tidal.android.library) +} + +android { + namespace = "com.tidal.sdk.flo.extensions.rxjava3" +} + +dependencies { + api(project(":flo")) + api(libs.rxjava3) +} diff --git a/flo/extensions/rxjava3/src/main/kotlin/com/tidal/sdk/flo/extensions/rxjava3/RxJava3Ext.kt b/flo/extensions/rxjava3/src/main/kotlin/com/tidal/sdk/flo/extensions/rxjava3/RxJava3Ext.kt new file mode 100644 index 00000000..bfe87301 --- /dev/null +++ b/flo/extensions/rxjava3/src/main/kotlin/com/tidal/sdk/flo/extensions/rxjava3/RxJava3Ext.kt @@ -0,0 +1,37 @@ +package com.tidal.sdk.flo.extensions.rxjava3 + +import com.tidal.sdk.flo.core.FloClient +import io.reactivex.rxjava3.core.BackpressureStrategy +import io.reactivex.rxjava3.core.Flowable + +/** + * Subscribes to a topic using a [Flowable]-based API. Operations on the returned [Flowable] are + * supported. + * + * @param backpressureStrategy Specifies a [BackpressureStrategy] to apply if incoming messages + * outpace consumer capabilities. + * @return A cold stream that can be used to control the subscription. + * @see [FloClient.subscribe] + */ +@JvmOverloads +fun FloClient.topicAsFlowable( + topic: String, + tail: Int = 0, + backpressureStrategy: BackpressureStrategy, +) = Flowable.create( + { emitter -> + val subscriptionHandle = subscribe( + topic, + { emitter.onNext(it) }, + { + emitter.apply { + onError(it) + onComplete() + } + }, + tail, + ) + emitter.setCancellable { subscriptionHandle.unsubscribe() } + }, + backpressureStrategy, +) diff --git a/flo/gradle.properties b/flo/gradle.properties new file mode 100644 index 00000000..1635b958 --- /dev/null +++ b/flo/gradle.properties @@ -0,0 +1,2 @@ +projectDescription=Client implementation of our websocket-baased pubsub protocol. +version=unspecified diff --git a/flo/src/main/AndroidManifest.xml b/flo/src/main/AndroidManifest.xml new file mode 100644 index 00000000..8f1cbde4 --- /dev/null +++ b/flo/src/main/AndroidManifest.xml @@ -0,0 +1,7 @@ + + + + + + + diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/FloClient.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/FloClient.kt new file mode 100644 index 00000000..a82ef2cb --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/FloClient.kt @@ -0,0 +1,56 @@ +package com.tidal.sdk.flo.core + +import android.net.ConnectivityManager +import android.os.Handler +import com.tidal.sdk.flo.core.internal.SubscriptionManager +import okhttp3.Response + +/** + * A client that allows subscribing to topics on a given connection. + * + * @param connectivityManager A handle for Android-specific network operations. + * @param tokenProvider A provider of tokens for authorization. + * @param retryUponAuthorizationError Criteria to determine whether to retry or not upon receiving + * an authorization error. If a retry should happen, we will query for a token again. + * @param url The location for the socket connection. + * @param operationHandler A [Handler] to run internal operations on. + * @param callbackHandler A [Handler] to invoke callbacks on. + */ +class FloClient( + connectivityManager: ConnectivityManager, + tokenProvider: () -> String?, + retryUponAuthorizationError: (Response) -> Boolean, + url: String, + operationHandler: Handler, + callbackHandler: Handler, +) { + + private val subscriptionManager = SubscriptionManager( + url, + tokenProvider, + retryUponAuthorizationError, + connectivityManager, + operationHandler, + callbackHandler, + ) + + /** + * Subscribe to a given topic. + * + * @param topic The target topic. + * @param onMessage Message callback, invoked on the main thread. + * @param onError Error callback, invoked on the main thread. This callback being invoked + * implies termination of this subscription (equivalent to calling + * [SubscriptionHandle.unsubscribe] on the return of this function), meaning you are encouraged + * to release any references to the returned [SubscriptionHandle] when this happens. + * @param tail Indicates the amount of messages to be replayed on subscription. + * @return A handle to terminate the subscription. + */ + @JvmOverloads + fun subscribe( + topic: String, + onMessage: (String) -> Unit, + onError: (FloException) -> Unit, + tail: Int = 0, + ) = subscriptionManager.add(topic, onMessage, onError, tail) +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/FloException.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/FloException.kt new file mode 100644 index 00000000..64dbcb11 --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/FloException.kt @@ -0,0 +1,8 @@ +package com.tidal.sdk.flo.core + +sealed class FloException(cause: Throwable) : Exception(cause) { + + class ConnectionLost internal constructor(cause: Throwable) : FloException(cause) + + class NotAuthorized internal constructor(cause: Throwable) : FloException(cause) +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/SubscriptionHandle.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/SubscriptionHandle.kt new file mode 100644 index 00000000..eaf9e3b6 --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/SubscriptionHandle.kt @@ -0,0 +1,20 @@ +package com.tidal.sdk.flo.core + +import com.tidal.sdk.flo.core.internal.SubscriptionHandleImpl + +/** + * A handle for subscription termination. + */ +class SubscriptionHandle internal constructor(delegate: SubscriptionHandleImpl?) { + internal var delegate = delegate + set(value) { + requireNotNull(value) + field = null + } + + /** + * Requests the associated subscription to be terminated. This method is safe to call at any + * point. + */ + fun unsubscribe() = delegate?.unsubscribe() +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/BackoffPolicy.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/BackoffPolicy.kt new file mode 100644 index 00000000..8e3ae3b9 --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/BackoffPolicy.kt @@ -0,0 +1,19 @@ +package com.tidal.sdk.flo.core.internal + +/** + * Describes how to act when a connection attempt fails due to non-authorization-related issues. + */ +internal interface BackoffPolicy { + + /** + * Implementations should return a delay to wait until the next attempt is performed, or null if + * no further attempts should be made. If return a non-null value, implementations should make + * sure that the value returned is strictly larger than 0. Doing otherwise may result in + * unpredictable behavior and is not supported. + * + * @param accumulatedFailedAttempts How many attempts have been made during the history of this + * backoff. Will never be less than 1. + * @return An amount of time to wait until the next attempt is performed, in milliseconds. + */ + fun onFailedAttempt(accumulatedFailedAttempts: Int): Long? +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/Command.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/Command.kt new file mode 100644 index 00000000..b41a573a --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/Command.kt @@ -0,0 +1,91 @@ +package com.tidal.sdk.flo.core.internal + +import com.squareup.moshi.JsonAdapter +import com.squareup.moshi.JsonClass +import com.squareup.moshi.JsonReader +import com.squareup.moshi.JsonWriter +import com.squareup.moshi.Moshi +import com.squareup.moshi.adapters.PolymorphicJsonAdapterFactory +import com.squareup.moshi.rawType + +internal sealed class Command(val type: Type, val topic: String) { + + @JsonClass(generateAdapter = true) + class Subscribe(topic: String, val data: Data?) : Command(Type.SUBSCRIBE, topic) { + + sealed interface Data { + + @JsonClass(generateAdapter = true) + class Tail(val tail: Int) : Data + + @JsonClass(generateAdapter = true) + class LastId(val lastId: String) : Data + + private class Adapter(private val moshi: Moshi) : JsonAdapter() { + + override fun fromJson(reader: JsonReader) = + throw UnsupportedOperationException("Unmarshalling this type is unsupported") + + override fun toJson(writer: JsonWriter, value: Data?) { + if (value == null) { + writer.nullValue() + return + } + writer.value( + when (value) { + is Tail -> moshi.adapter(Tail::class.java).toJson(value) + is LastId -> moshi.adapter(LastId::class.java).toJson(value) + }, + ) + } + } + + class JsonAdapterFactory : JsonAdapter.Factory { + + override fun create( + type: java.lang.reflect.Type, + annotations: MutableSet, + moshi: Moshi, + ): JsonAdapter<*>? { + if (type.rawType != Data::class.java) { + return null + } + return Adapter(moshi) + } + } + } + } + + @JsonClass(generateAdapter = true) + class Unsubscribe(topic: String) : Command(Type.UNSUBSCRIBE, topic) + + @JsonClass(generateAdapter = false) + enum class Type { + + SUBSCRIBE, + UNSUBSCRIBE, + } + + class JsonAdapterFactory : JsonAdapter.Factory { + + override fun create( + type: java.lang.reflect.Type, + annotations: MutableSet, + moshi: Moshi, + ): JsonAdapter<*>? { + if (type.rawType != Command::class.java) { + return null + } + return PolymorphicJsonAdapterFactory.of(Command::class.java, KEY_TYPE) + .withSubtype(Subscribe::class.java, VALUE_TYPE_SUBSCRIBE) + .withSubtype(Unsubscribe::class.java, VALUE_TYPE_UNSUBSCRIBE) + .create(type, annotations, moshi) + } + } + + companion object { + private const val KEY_TYPE = "type" + private const val VALUE_TYPE_SUBSCRIBE = "SUBSCRIBE" + private const val VALUE_TYPE_UNSUBSCRIBE = "UNSUBSCRIBE" + } +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/ConnectRunnable.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/ConnectRunnable.kt new file mode 100644 index 00000000..fc237f93 --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/ConnectRunnable.kt @@ -0,0 +1,114 @@ +package com.tidal.sdk.flo.core.internal + +import android.os.Handler +import android.os.SystemClock +import com.squareup.moshi.Moshi +import com.tidal.sdk.flo.core.FloException +import java.util.Locale +import okhttp3.OkHttpClient +import okhttp3.Request +import okhttp3.Response + +internal class ConnectRunnable +// No DI framework to avoid classpath bloat so need to pipe dependencies explicitly +@Suppress("LongParameterList") +constructor( + private val url: String, + private val connectionMutableState: SubscriptionManager.ConnectionMutableState, + private val operationHandler: Handler, + private val okHttpClient: OkHttpClient, + private val tokenProvider: () -> String?, + private val retryUponAuthorizationError: (Response) -> Boolean, + private val moshi: Moshi, + private val terminalErrorManager: TerminalErrorManager, + private val backoffPolicy: BackoffPolicy, + private val failedAttempts: Int = 0, +) : Runnable { + + // ReturnCount -> Preferred over indenting + // TooGenericExceptionCaught -> We don't want singled-out behavior + @Suppress("ReturnCount", "TooGenericExceptionCaught") + override fun run() { + if (!connectionMutableState.isConnectionRequired) { + return + } + when (val connectionState = connectionMutableState.socketConnectionState) { + is SocketConnectionState.NotConnected -> Unit + is SocketConnectionState.Connecting.AwaitingBackoffExpiry -> + if ( + failedAttempts != 0 && + connectionState.retryAtMillis > SystemClock.uptimeMillis() + ) { + return + } + + else -> return + } + connectionMutableState.socketConnectionState = SocketConnectionState.Connecting.ForReal + try { + okHttpClient.newWebSocket( + Request.Builder() + .apply { + val token = tokenProvider() ?: return + header( + HEADER_NAME_AUTHORIZATION, + TEMPLATE_HEADER_VALUE_BEARER_AUTHORIZATION.format( + token, + Locale.ENGLISH + ), + ) + } + .url(url) + .build(), + ConnectionWebSocketListener( + url, + connectionMutableState, + operationHandler, + okHttpClient, + tokenProvider, + retryUponAuthorizationError, + moshi, + terminalErrorManager, + backoffPolicy, + ), + ) + } catch (throwable: Throwable) { + retryIfBackoffAllows(throwable) + } + } + + private fun retryIfBackoffAllows(throwable: Throwable) { + val failedAttemptsIncludingSelf = failedAttempts + 1 + val nextAttemptDelayMillis = backoffPolicy.onFailedAttempt(failedAttemptsIncludingSelf) + if (nextAttemptDelayMillis == null) { + terminalErrorManager.dispatchErrorAndTerminateConnection( + connectionMutableState, + FloException.ConnectionLost(throwable), + ) + return + } + val nextAttemptAtMillis = SystemClock.uptimeMillis() + nextAttemptDelayMillis + connectionMutableState.socketConnectionState = + SocketConnectionState.Connecting.AwaitingBackoffExpiry(nextAttemptAtMillis) + operationHandler.postAtTime( + ConnectRunnable( + url, + connectionMutableState, + operationHandler, + okHttpClient, + tokenProvider, + retryUponAuthorizationError, + moshi, + terminalErrorManager, + backoffPolicy, + failedAttemptsIncludingSelf, + ), + nextAttemptAtMillis, + ) + } + + companion object { + private const val HEADER_NAME_AUTHORIZATION = "Authorization" + private const val TEMPLATE_HEADER_VALUE_BEARER_AUTHORIZATION = "Bearer %s" + } +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/ConnectionRestorationNetworkConnectivityCallback.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/ConnectionRestorationNetworkConnectivityCallback.kt new file mode 100644 index 00000000..b9b2120d --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/ConnectionRestorationNetworkConnectivityCallback.kt @@ -0,0 +1,51 @@ +package com.tidal.sdk.flo.core.internal + +import android.net.ConnectivityManager +import android.net.Network +import android.net.NetworkCapabilities +import android.os.Handler +import com.squareup.moshi.Moshi +import okhttp3.OkHttpClient +import okhttp3.Response + +internal class ConnectionRestorationNetworkConnectivityCallback +// No DI framework to avoid classpath bloat so need to pipe dependencies explicitly +@Suppress("LongParameterList") +constructor( + private val operationHandler: Handler, + private val mutableState: SubscriptionManager.MutableState, + private val url: String, + private val okHttpClient: OkHttpClient, + private val tokenProvider: () -> String?, + private val retryUponAuthorizationError: (Response) -> Boolean, + private val moshi: Moshi, + private val terminalErrorManager: TerminalErrorManager, + private val backoffPolicy: BackoffPolicy, +) : ConnectivityManager.NetworkCallback() { + + override fun onAvailable(network: Network) { + maybeDispatchConnect() + } + + override fun onCapabilitiesChanged(network: Network, networkCapabilities: NetworkCapabilities) { + if (networkCapabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)) { + maybeDispatchConnect() + } + } + + private fun maybeDispatchConnect() = mutableState.connectionMutableState?.let { + operationHandler.post( + ConnectRunnable( + url, + it, + operationHandler, + okHttpClient, + tokenProvider, + retryUponAuthorizationError, + moshi, + terminalErrorManager, + backoffPolicy, + ), + ) + } +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/ConnectionWebSocketListener.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/ConnectionWebSocketListener.kt new file mode 100644 index 00000000..2815a85e --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/ConnectionWebSocketListener.kt @@ -0,0 +1,99 @@ +package com.tidal.sdk.flo.core.internal + +import android.os.Handler +import com.squareup.moshi.Moshi +import com.tidal.sdk.flo.core.FloException +import okhttp3.OkHttpClient +import okhttp3.Response +import okhttp3.WebSocket +import okhttp3.WebSocketListener + +internal class ConnectionWebSocketListener +// No DI framework to avoid classpath bloat so need to pipe dependencies explicitly +@Suppress("LongParameterList") +constructor( + private val url: String, + private val connectionMutableState: SubscriptionManager.ConnectionMutableState, + private val operationHandler: Handler, + private val okHttpClient: OkHttpClient, + private val tokenProvider: () -> String?, + private val retryUponAuthorizationError: (Response) -> Boolean, + private val moshi: Moshi, + private val terminalErrorManager: TerminalErrorManager, + private val backoffPolicy: BackoffPolicy, +) : WebSocketListener() { + + override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { + ifRequiredOrClose(webSocket) { + if (response?.code == HTTP_CODE_401 && !retryUponAuthorizationError(response)) { + terminalErrorManager.dispatchErrorAndTerminateConnection( + connectionMutableState, + FloException.NotAuthorized(t), + ) + return@ifRequiredOrClose + } + connectionMutableState.socketConnectionState = SocketConnectionState.NotConnected + ConnectRunnable( + url, + connectionMutableState, + operationHandler, + okHttpClient, + tokenProvider, + retryUponAuthorizationError, + moshi, + terminalErrorManager, + backoffPolicy, + ).run() + } + } + + override fun onMessage(webSocket: WebSocket, text: String) { + ifRequiredOrClose(webSocket) { + val event = moshi.adapter(Event::class.java).fromJson(text) + if (event == null || event is Event.UnsubscribeSuccess) { + return@ifRequiredOrClose + } + val subscriptionIdentifier = SubscriptionIdentifier(event.topic) + val subscriptionDescriptor = + connectionMutableState.desiredSubscriptions[subscriptionIdentifier] + if (subscriptionDescriptor == null) { + UnsubscribeSendCommandRunnable(connectionMutableState, moshi, event.topic).run() + return@ifRequiredOrClose + } + if (event is Event.SubscribeSuccess) { + connectionMutableState.desiredSubscriptions[subscriptionIdentifier] = + SubscriptionDescriptor(subscriptionDescriptor, null) + } else if (event is Event.Message) { + if (event.data.id != null) { + connectionMutableState.desiredSubscriptions[subscriptionIdentifier] = + SubscriptionDescriptor( + subscriptionDescriptor, + SubscriptionDescriptor.ReplayStrategy.LastId(event.data.id), + ) + } + subscriptionDescriptor.onMessage(event.data.payload) + } + } + } + + override fun onOpen(webSocket: WebSocket, response: Response) { + ifRequiredOrClose(webSocket) { + val connected = SocketConnectionState.Connected(webSocket) + connectionMutableState.socketConnectionState = connected + connectionMutableState.desiredSubscriptions.forEach { + SubscribeSendCommandRunnable( + connectionMutableState, + moshi, + it.key.topic, + it.value.replayStrategy, + ) + .run() + } + } + } + + private fun ifRequiredOrClose(webSocket: WebSocket, block: () -> Unit) = + operationHandler.post(IfRequiredOrCloseRunnable(connectionMutableState, webSocket, block)) +} + +private const val HTTP_CODE_401 = 401 diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/DefaultBackoffPolicy.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/DefaultBackoffPolicy.kt new file mode 100644 index 00000000..c4378918 --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/DefaultBackoffPolicy.kt @@ -0,0 +1,20 @@ +package com.tidal.sdk.flo.core.internal + +import android.os.SystemClock + +internal class DefaultBackoffPolicy : BackoffPolicy { + + private val firstAttemptFailedAtMs by lazy { SystemClock.uptimeMillis() } + + override fun onFailedAttempt(accumulatedFailedAttempts: Int) = + if (firstAttemptFailedAtMs - SystemClock.uptimeMillis() > DROP_AFTER_MS) { + null + } else { + ATTEMPT_DELAY_MS + } + + companion object { + private const val DROP_AFTER_MS = 5 * 60 * 1_000 + private const val ATTEMPT_DELAY_MS = 10 * 1_000L + } +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/DispatchSubscribeRunnable.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/DispatchSubscribeRunnable.kt new file mode 100644 index 00000000..2ec0fe86 --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/DispatchSubscribeRunnable.kt @@ -0,0 +1,82 @@ +package com.tidal.sdk.flo.core.internal + +import android.net.ConnectivityManager +import android.os.Handler +import com.squareup.moshi.Moshi +import com.tidal.sdk.flo.core.FloException +import okhttp3.OkHttpClient +import okhttp3.Response + +internal class DispatchSubscribeRunnable +// LongParameterList -> No DI framework to avoid classpath bloat so need to pipe deps explicitly +@Suppress("LongParameterList", "MaxLineLength") +constructor( + private val url: String, + private val mutableState: SubscriptionManager.MutableState, + private val operationHandler: Handler, + private val callbackHandler: Handler, + private val connectivityManager: ConnectivityManager, + private val connectionRestorationNetworkConnectivityCallback: ConnectionRestorationNetworkConnectivityCallback, // ktlint-disable max-line-length parameter-wrapping + private val okHttpClient: OkHttpClient, + private val tokenProvider: () -> String?, + private val retryUponAuthorizationError: (Response) -> Boolean, + private val moshi: Moshi, + private val terminalErrorManager: TerminalErrorManager, + private val backoffPolicy: BackoffPolicy, + private val topic: String, + private val onMessage: (String) -> Unit, + private val onError: (FloException) -> Unit, + private val tail: Int, +) : Runnable { + + override fun run() { + val connectionMutableState = + mutableState.connectionMutableState ?: SubscriptionManager.ConnectionMutableState() + mutableState.connectionMutableState = connectionMutableState + val key = SubscriptionIdentifier(topic) + check(connectionMutableState.desiredSubscriptions.containsKey(key)) { + "Concurrent topic subscriptions on the same url are not supported - topic=$topic" + } + val subscriptionDescriptor = SubscriptionDescriptor( + PostRunnableToHandlerFunction( + callbackHandler, + InvokeFunction1Runnable.Factory(onMessage), + ), + PostRunnableToHandlerFunction( + callbackHandler, + InvokeFunction1Runnable.Factory(onError), + ), + SubscriptionDescriptor.ReplayStrategy.Tail.ifMeaningful(tail), + ) + connectionMutableState.desiredSubscriptions[key] = subscriptionDescriptor + if (!mutableState.isNetworkConnectivityCallbackCurrentlyRegistered) { + RegisterDefaultNetworkCallbackRunnable( + mutableState, + operationHandler, + connectivityManager, + connectionRestorationNetworkConnectivityCallback, + url, + 0, + ).run() + } + if (connectionMutableState.socketConnectionState is SocketConnectionState.NotConnected) { + ConnectRunnable( + url, + connectionMutableState, + operationHandler, + okHttpClient, + tokenProvider, + retryUponAuthorizationError, + moshi, + terminalErrorManager, + backoffPolicy, + ).run() + } + SubscribeSendCommandRunnable( + connectionMutableState, + moshi, + topic, + subscriptionDescriptor.replayStrategy, + ).run() + } +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/DispatchUnsubscribeRunnable.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/DispatchUnsubscribeRunnable.kt new file mode 100644 index 00000000..70c50745 --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/DispatchUnsubscribeRunnable.kt @@ -0,0 +1,40 @@ +package com.tidal.sdk.flo.core.internal + +import android.net.ConnectivityManager +import com.squareup.moshi.Moshi + +internal class DispatchUnsubscribeRunnable +@Suppress("MaxLineLength") +constructor( + private val mutableState: SubscriptionManager.MutableState, + private val connectivityManager: ConnectivityManager, + private val connectionRestorationNetworkConnectivityCallback: ConnectionRestorationNetworkConnectivityCallback, // ktlint-disable max-line-length parameter-wrapping + private val moshi: Moshi, + private val topic: String, +) : Runnable { + + override fun run() { + val connectionMutableState = mutableState.connectionMutableState ?: return + val key = SubscriptionIdentifier(topic) + connectionMutableState.desiredSubscriptions.remove(key) + UnsubscribeSendCommandRunnable(connectionMutableState, moshi, topic).run() + if (!connectionMutableState.isConnectionRequired) { + if (mutableState.isNetworkConnectivityCallbackCurrentlyRegistered) { + connectivityManager.unregisterNetworkCallback( + connectionRestorationNetworkConnectivityCallback, + ) + mutableState.isNetworkConnectivityCallbackCurrentlyRegistered = false + } + val connectionState = connectionMutableState.socketConnectionState + if (connectionState is SocketConnectionState.Connected) { + connectionState.webSocket.close(CLOSURE_STATUS_PURPOSE_FULFILLED, null) + connectionMutableState.socketConnectionState = SocketConnectionState.NotConnected + } + mutableState.connectionMutableState = null + } + } + + companion object { + private const val CLOSURE_STATUS_PURPOSE_FULFILLED = 1000 + } +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/Event.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/Event.kt new file mode 100644 index 00000000..0129976e --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/Event.kt @@ -0,0 +1,104 @@ +package com.tidal.sdk.flo.core.internal + +import com.squareup.moshi.JsonAdapter +import com.squareup.moshi.JsonClass +import com.squareup.moshi.JsonDataException +import com.squareup.moshi.JsonReader +import com.squareup.moshi.JsonWriter +import com.squareup.moshi.Moshi +import com.squareup.moshi.adapters.PolymorphicJsonAdapterFactory +import com.squareup.moshi.rawType +import java.lang.reflect.Type + +internal sealed class Event(val topic: String) { + + class Message(topic: String, val data: Data) : Event(topic) { + + @JsonClass(generateAdapter = true) + class Data( + val payload: String, + val id: String?, + ) + + private class Adapter(private val moshi: Moshi) : JsonAdapter() { + + override fun fromJson(reader: JsonReader): Message { + reader.beginObject() + var name: String? = null + var topic: String? = null + var data: Data? = null + while (reader.hasNext()) { + when (val key = reader.nextName()) { + KEY_NAME -> name = reader.nextString() + KEY_TOPIC -> topic = reader.nextString() + KEY_DATA -> + data = moshi.adapter(Data::class.java).fromJson(reader.nextString()) + + else -> throw JsonDataException("Unexpected key: \"$key\"") + } + } + reader.endObject() + if (!VALUE_NAME_MESSAGE.contentEquals(name)) { + throw JsonDataException("Invalid name: \"$name\"") + } + if (topic == null) { + throw JsonDataException("Missing key: \"$KEY_TOPIC\"") + } + if (data == null) { + throw JsonDataException("Missing key: \"$KEY_DATA\"") + } + return Message(topic, data) + } + + override fun toJson(writer: JsonWriter, value: Message?) = + throw UnsupportedOperationException("Marshalling this type is unsupported") + } + + class JsonAdapterFactory : JsonAdapter.Factory { + + override fun create( + type: Type, + annotations: MutableSet, + moshi: Moshi, + ): JsonAdapter<*>? { + if (type.rawType != Message::class.java) { + return null + } + return Adapter(moshi) + } + } + } + + @JsonClass(generateAdapter = true) + class SubscribeSuccess(topic: String) : Event(topic) + + @JsonClass(generateAdapter = true) + class UnsubscribeSuccess(topic: String) : Event(topic) + + class JsonAdapterFactory : JsonAdapter.Factory { + + override fun create( + type: Type, + annotations: MutableSet, + moshi: Moshi, + ): JsonAdapter<*>? { + if (type.rawType != Event::class.java) { + return null + } + return PolymorphicJsonAdapterFactory.of(Event::class.java, KEY_NAME) + .withSubtype(Message::class.java, VALUE_NAME_MESSAGE) + .withSubtype(SubscribeSuccess::class.java, VALUE_NAME_SUBSCRIBE_SUCCESS) + .withSubtype(UnsubscribeSuccess::class.java, VALUE_NAME_UNSUBSCRIBE_SUCCESS) + .create(type, annotations, moshi) + } + } + + companion object { + private const val KEY_NAME = "name" + private const val KEY_TOPIC = "topic" + private const val KEY_DATA = "data" + private const val VALUE_NAME_MESSAGE = "MESSAGE" + private const val VALUE_NAME_SUBSCRIBE_SUCCESS = "SUBSCRIBE_SUCCESS" + private const val VALUE_NAME_UNSUBSCRIBE_SUCCESS = "UNSUBSCRIBE_SUCCESS" + } +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/IfRequiredOrCloseRunnable.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/IfRequiredOrCloseRunnable.kt new file mode 100644 index 00000000..4f0848ad --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/IfRequiredOrCloseRunnable.kt @@ -0,0 +1,23 @@ +package com.tidal.sdk.flo.core.internal + +import okhttp3.WebSocket + +internal class IfRequiredOrCloseRunnable( + private val connectionMutableState: SubscriptionManager.ConnectionMutableState, + private val webSocket: WebSocket, + private val block: () -> Unit, +) : Runnable { + + override fun run() { + if (connectionMutableState.isConnectionRequired) { + block() + } else { + webSocket.close(CLOSURE_STATUS_GOING_AWAY, null) + connectionMutableState.socketConnectionState = SocketConnectionState.NotConnected + } + } + + companion object { + private const val CLOSURE_STATUS_GOING_AWAY = 1001 + } +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/InvokeFunction1Runnable.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/InvokeFunction1Runnable.kt new file mode 100644 index 00000000..98aece8c --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/InvokeFunction1Runnable.kt @@ -0,0 +1,12 @@ +package com.tidal.sdk.flo.core.internal + +internal class InvokeFunction1Runnable(private val function: (T) -> Unit, private val param: T) : + Runnable { + + override fun run() = function(param) + + class Factory(private val function: (T) -> Unit) : (T) -> InvokeFunction1Runnable { + + override operator fun invoke(param: T) = InvokeFunction1Runnable(function, param) + } +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/PostRunnableToHandlerFunction.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/PostRunnableToHandlerFunction.kt new file mode 100644 index 00000000..7fa568f6 --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/PostRunnableToHandlerFunction.kt @@ -0,0 +1,13 @@ +package com.tidal.sdk.flo.core.internal + +import android.os.Handler + +internal class PostRunnableToHandlerFunction( + private val targetHandler: Handler, + private val runnableFactory: (T) -> Runnable, +) : (T) -> Unit { + + override fun invoke(data: T) { + targetHandler.post(runnableFactory(data)) + } +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/RegisterDefaultNetworkCallbackRunnable.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/RegisterDefaultNetworkCallbackRunnable.kt new file mode 100644 index 00000000..bca58d15 --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/RegisterDefaultNetworkCallbackRunnable.kt @@ -0,0 +1,47 @@ +package com.tidal.sdk.flo.core.internal + +import android.net.ConnectivityManager +import android.os.Handler +import android.os.SystemClock +import kotlin.time.Duration.Companion.seconds + +internal class RegisterDefaultNetworkCallbackRunnable +@Suppress("MaxLineLength") +constructor( + private val mutableState: SubscriptionManager.MutableState, + private val operationHandler: Handler, + private val connectivityManager: ConnectivityManager, + private val connectionRestorationNetworkConnectivityCallback: ConnectionRestorationNetworkConnectivityCallback, // ktlint-disable max-line-length parameter-wrapping + private val url: String, + private val failureCount: Int, +) : Runnable { + + // SwallowedException -> It is useless + // TooGenericExceptionCaught -> We don't want singled-out behavior + @Suppress("SwallowedException", "TooGenericExceptionCaught") + override fun run() { + if (mutableState.isNetworkConnectivityCallbackCurrentlyRegistered) { + return + } + try { + connectivityManager.registerDefaultNetworkCallback( + connectionRestorationNetworkConnectivityCallback, + ) + mutableState.isNetworkConnectivityCallbackCurrentlyRegistered = true + } catch (tooManyRequestsException: RuntimeException) { + val failuresIncludingSelf = failureCount + 1 + operationHandler.postAtTime( + RegisterDefaultNetworkCallbackRunnable( + mutableState, + operationHandler, + connectivityManager, + connectionRestorationNetworkConnectivityCallback, + url, + failuresIncludingSelf, + ), + url, + SystemClock.uptimeMillis() + 1.seconds.inWholeMilliseconds * failuresIncludingSelf, + ) + } + } +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SendCommandRunnable.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SendCommandRunnable.kt new file mode 100644 index 00000000..9123a75a --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SendCommandRunnable.kt @@ -0,0 +1,18 @@ +package com.tidal.sdk.flo.core.internal + +import com.squareup.moshi.Moshi + +internal abstract class SendCommandRunnable( + private val connectionMutableState: SubscriptionManager.ConnectionMutableState, + private val moshi: Moshi, +) : Runnable { + + abstract val command: Command + + override fun run() { + val connectionState = connectionMutableState.socketConnectionState + if (connectionState is SocketConnectionState.Connected) { + connectionState.webSocket.send(moshi.adapter(Command::class.java).toJson(command)) + } + } +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SocketConnectionState.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SocketConnectionState.kt new file mode 100644 index 00000000..c7739c36 --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SocketConnectionState.kt @@ -0,0 +1,17 @@ +package com.tidal.sdk.flo.core.internal + +import okhttp3.WebSocket + +internal sealed interface SocketConnectionState { + + object NotConnected : SocketConnectionState + + sealed interface Connecting : SocketConnectionState { + + data class AwaitingBackoffExpiry(val retryAtMillis: Long) : Connecting + + object ForReal : Connecting + } + + data class Connected(val webSocket: WebSocket) : SocketConnectionState +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SubscribeSendCommandRunnable.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SubscribeSendCommandRunnable.kt new file mode 100644 index 00000000..76b1f856 --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SubscribeSendCommandRunnable.kt @@ -0,0 +1,23 @@ +package com.tidal.sdk.flo.core.internal + +import com.squareup.moshi.Moshi + +internal class SubscribeSendCommandRunnable( + connectionMutableState: SubscriptionManager.ConnectionMutableState, + moshi: Moshi, + topic: String, + replayStrategy: SubscriptionDescriptor.ReplayStrategy?, +) : SendCommandRunnable(connectionMutableState, moshi) { + + override val command = Command.Subscribe( + topic, + when (replayStrategy) { + null -> null + is SubscriptionDescriptor.ReplayStrategy.Tail -> + Command.Subscribe.Data.Tail(replayStrategy.tail) + + is SubscriptionDescriptor.ReplayStrategy.LastId -> + Command.Subscribe.Data.LastId(replayStrategy.lastId) + }, + ) +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SubscriptionDescriptor.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SubscriptionDescriptor.kt new file mode 100644 index 00000000..8d7f05e0 --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SubscriptionDescriptor.kt @@ -0,0 +1,34 @@ +package com.tidal.sdk.flo.core.internal + +import com.tidal.sdk.flo.core.FloException + +internal class SubscriptionDescriptor( + val onMessage: (String) -> Unit, + val onError: (FloException) -> Unit, + val replayStrategy: ReplayStrategy?, +) { + + constructor(subscriptionDescriptor: SubscriptionDescriptor, replayStrategy: ReplayStrategy?) : + this( + subscriptionDescriptor.onMessage, + subscriptionDescriptor.onError, + replayStrategy, + ) + + sealed interface ReplayStrategy { + + class Tail private constructor(val tail: Int) : ReplayStrategy { + + companion object { + + fun ifMeaningful(tail: Int) = if (tail > 0) { + Tail(tail) + } else { + null + } + } + } + + class LastId(val lastId: String) : ReplayStrategy + } +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SubscriptionHandleImpl.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SubscriptionHandleImpl.kt new file mode 100644 index 00000000..fddab841 --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SubscriptionHandleImpl.kt @@ -0,0 +1,9 @@ +package com.tidal.sdk.flo.core.internal + +internal class SubscriptionHandleImpl( + private val subscriptionManager: SubscriptionManager, + private val topic: String, +) { + + fun unsubscribe() = subscriptionManager.remove(topic) +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SubscriptionIdentifier.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SubscriptionIdentifier.kt new file mode 100644 index 00000000..74a28d38 --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SubscriptionIdentifier.kt @@ -0,0 +1,4 @@ +package com.tidal.sdk.flo.core.internal + +@JvmInline +internal value class SubscriptionIdentifier(val topic: String) diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SubscriptionManager.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SubscriptionManager.kt new file mode 100644 index 00000000..dc4a1a8c --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/SubscriptionManager.kt @@ -0,0 +1,101 @@ +package com.tidal.sdk.flo.core.internal + +import android.net.ConnectivityManager +import android.os.Handler +import com.squareup.moshi.Moshi +import com.tidal.sdk.flo.core.FloException +import com.tidal.sdk.flo.core.SubscriptionHandle +import okhttp3.OkHttpClient +import okhttp3.Response + +internal class SubscriptionManager( + private val url: String, + private val tokenProvider: () -> String?, + private val retryUponAuthorizationError: (Response) -> Boolean, + private val connectivityManager: ConnectivityManager, + private val operationHandler: Handler, + private val callbackHandler: Handler, +) { + + private val mutableState = MutableState() + private val okHttpClient = OkHttpClient() + private val moshi = Moshi.Builder() + .add(Event.JsonAdapterFactory()) + .add(Event.Message.JsonAdapterFactory()) + .add(Command.JsonAdapterFactory()) + .add(Command.Subscribe.Data.JsonAdapterFactory()) + .build() + private val terminalErrorManager = TerminalErrorManager() + private val defaultBackoffPolicy = DefaultBackoffPolicy() + private val connectionRestorationNetworkConnectivityCallback = + ConnectionRestorationNetworkConnectivityCallback( + operationHandler, + mutableState, + url, + okHttpClient, + tokenProvider, + retryUponAuthorizationError, + moshi, + terminalErrorManager, + defaultBackoffPolicy, + ) + + fun add( + topic: String, + onMessage: (String) -> Unit, + onError: (FloException) -> Unit, + tail: Int, + ): SubscriptionHandle { + val subscriptionHandle = SubscriptionHandle(SubscriptionHandleImpl(this, topic)) + operationHandler.post( + DispatchSubscribeRunnable( + url, + mutableState, + operationHandler, + callbackHandler, + connectivityManager, + connectionRestorationNetworkConnectivityCallback, + okHttpClient, + tokenProvider, + retryUponAuthorizationError, + moshi, + terminalErrorManager, + defaultBackoffPolicy, + topic, + onMessage, + { + subscriptionHandle.delegate = null + onError(it) + }, + tail, + ), + ) + return subscriptionHandle + } + + fun remove(topic: String) { + operationHandler.post( + DispatchUnsubscribeRunnable( + mutableState, + connectivityManager, + connectionRestorationNetworkConnectivityCallback, + moshi, + topic, + ), + ) + } + + class MutableState { + + var connectionMutableState: ConnectionMutableState? = null + var isNetworkConnectivityCallbackCurrentlyRegistered = false + } + + class ConnectionMutableState { + + var desiredSubscriptions = mutableMapOf() + var socketConnectionState: SocketConnectionState = SocketConnectionState.NotConnected + val isConnectionRequired: Boolean + get() = desiredSubscriptions.isNotEmpty() + } +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/TerminalErrorManager.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/TerminalErrorManager.kt new file mode 100644 index 00000000..a3abc218 --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/TerminalErrorManager.kt @@ -0,0 +1,19 @@ +package com.tidal.sdk.flo.core.internal + +import com.tidal.sdk.flo.core.FloException + +internal class TerminalErrorManager { + + fun dispatchErrorAndTerminateConnection( + connectionMutableState: SubscriptionManager.ConnectionMutableState, + floException: FloException, + ) { + if (connectionMutableState.isConnectionRequired) { + connectionMutableState.desiredSubscriptions.apply { + forEach { it.value.onError(floException) } + clear() + } + } + connectionMutableState.socketConnectionState = SocketConnectionState.NotConnected + } +} diff --git a/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/UnsubscribeSendCommandRunnable.kt b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/UnsubscribeSendCommandRunnable.kt new file mode 100644 index 00000000..f70593e4 --- /dev/null +++ b/flo/src/main/kotlin/com/tidal/sdk/flo/core/internal/UnsubscribeSendCommandRunnable.kt @@ -0,0 +1,12 @@ +package com.tidal.sdk.flo.core.internal + +import com.squareup.moshi.Moshi + +internal class UnsubscribeSendCommandRunnable( + connectionMutableState: SubscriptionManager.ConnectionMutableState, + moshi: Moshi, + topic: String, +) : SendCommandRunnable(connectionMutableState, moshi) { + + override val command = Command.Unsubscribe(topic) +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 4a038a5d..23abdf81 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -62,6 +62,7 @@ truetime = "com.github.instacart.truetime-android:library-extension-rx:3.5" # Moshi moshi = { module = "com.squareup.moshi:moshi", version.ref ="moshi"} +moshi-adapters = { module = "com.squareup.moshi:moshi-adapters", version.ref = "moshi" } moshi-codegen = { module = "com.squareup.moshi:moshi-kotlin-codegen", version.ref = "moshi" } # Networking @@ -80,10 +81,15 @@ kotlinx-datetime = { module = "org.jetbrains.kotlinx:kotlinx-datetime", version kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version = "1.6.3" } kotlinx-serialization-retrofit-converter = { module = "com.jakewharton.retrofit:retrofit2-kotlinx-serialization-converter", version = "1.0.0" } +rxjava3 = { module = "io.reactivex.rxjava3:rxjava", version = "3.1.10" } +rxjava2 = { module = "io.reactivex.rxjava2:rxjava", version = "2.2.21" } +rxjava = { module = "io.reactivex:rxjava", version = "1.3.8" } + # SDK tidal-sdk-common = { module = "com.tidal.sdk:common", version = "0.2.5" } tidal-sdk-auth = { module = "com.tidal.sdk:auth", version = "0.10.1" } tidal-sdk-eventproducer = { module = "com.tidal.sdk:eventproducer", version = "0.3.2" } +tidal-sdk-flo = { module = "com.tidal.sdk:flo", version = "1.0.0" } tidal-sdk-player = { module = "com.tidal.sdk:player", version = "0.0.39" } tidal-sdk-tidalapi = { module = "com.tidal.sdk:tidalapi", version = "0.1.0" } diff --git a/settings.gradle.kts b/settings.gradle.kts index 301706d2..2091cab0 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -50,4 +50,17 @@ listOf( includeFromDefaultHierarchy("auth") includeFromDefaultHierarchy("common") includeFromDefaultHierarchy("eventproducer") +includeFromDefaultHierarchy("flo") +listOf( + "kotlincoroutines", + "rxjava", + "rxjava2", + "rxjava3", +).forEach { + val projectName = "flo:extensions:$it" + include(projectName) + project(":$projectName").projectDir = project(":flo").projectDir + .resolve("extensions") + .resolve(it) +} includeFromDefaultHierarchy("tidalapi")