From 59d45aa79e82a238cdfa58baa1c505f69c2c8c68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yi=C4=9Fit=20=C3=96zg=C3=BCm=C3=BC=C5=9F?= Date: Sun, 22 Sep 2024 23:59:21 +0300 Subject: [PATCH 01/17] Add annotations for Internal and Experimental APIs --- .../com/trendyol/transmission/Annotations.kt | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 transmission/src/main/java/com/trendyol/transmission/Annotations.kt diff --git a/transmission/src/main/java/com/trendyol/transmission/Annotations.kt b/transmission/src/main/java/com/trendyol/transmission/Annotations.kt new file mode 100644 index 0000000..e9812b3 --- /dev/null +++ b/transmission/src/main/java/com/trendyol/transmission/Annotations.kt @@ -0,0 +1,16 @@ +package com.trendyol.transmission + +/** + * Marks internal declarations in Transmission. Internal declarations must not be used outside the library. + * There are no backward compatibility guarantees between different versions of Transmission. + */ +@RequiresOptIn(level = RequiresOptIn.Level.ERROR) +@Retention(AnnotationRetention.BINARY) +annotation class InternalTransmissionApi + +/** + * Marks experimental API in Transmission. An experimental API can be changed or removed at any time. + */ +@RequiresOptIn(level = RequiresOptIn.Level.WARNING) +@Retention(AnnotationRetention.BINARY) +annotation class ExperimentalTransmissionApi From da8ff8e04fc57b953d5b8875e7da28e2c654f3ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yi=C4=9Fit=20=C3=96zg=C3=BCm=C3=BC=C5=9F?= Date: Sun, 22 Sep 2024 23:59:44 +0300 Subject: [PATCH 02/17] Add CheckpointHandler API methods --- .../checkpoint/CheckpointHandler.kt | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt new file mode 100644 index 0000000..7a9607e --- /dev/null +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt @@ -0,0 +1,32 @@ +package com.trendyol.transmission.transformer.checkpoint + +import com.trendyol.transmission.ExperimentalTransmissionApi +import com.trendyol.transmission.transformer.handler.CommunicationScope +import com.trendyol.transmission.transformer.request.Contract + +interface CheckpointHandler { + + @ExperimentalTransmissionApi + suspend fun CommunicationScope.pauseOn( + contract: Contract.Checkpoint, + resumeBlock: suspend CommunicationScope.() -> Unit + ) + + @ExperimentalTransmissionApi + suspend fun CommunicationScope.pauseOn( + vararg contract: Contract.Checkpoint, + resumeBlock: suspend CommunicationScope.() -> Unit + ) + + @ExperimentalTransmissionApi + suspend fun , A : Any> CommunicationScope.pauseOn( + contract: C, + resumeBlock: suspend CommunicationScope.(args: A) -> Unit + ) + + @ExperimentalTransmissionApi + suspend fun validate(contract: Contract.Checkpoint) + + @ExperimentalTransmissionApi + suspend fun , A : Any> validate(contract: C, args: A) +} From dcb552302ff8ebcc5a7a0aa94d91ee5059842b1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yi=C4=9Fit=20=C3=96zg=C3=BCm=C3=BC=C5=9F?= Date: Mon, 23 Sep 2024 00:00:06 +0300 Subject: [PATCH 03/17] Add CheckpointTracker Data structure to keep track of checkpoints --- .../checkpoint/CheckpointTracker.kt | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointTracker.kt diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointTracker.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointTracker.kt new file mode 100644 index 0000000..e68799d --- /dev/null +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointTracker.kt @@ -0,0 +1,31 @@ +package com.trendyol.transmission.transformer.checkpoint + +import com.trendyol.transmission.transformer.request.Contract +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentMap + +internal class CheckpointTracker { + private val tracker: ConcurrentMap> = + ConcurrentHashMap() + + fun putOrCreate(contract: Contract, barrierOwner: Contract.Identity, identifier: String) { + tracker + .putIfAbsent( + contract, + ArrayDeque().apply { + addLast( + IdentifierBundle( + barrierOwner, + identifier + ) + ) + }) + ?.addLast(IdentifierBundle(barrierOwner, identifier)) + } + + fun useIdentifier(contract: Contract): IdentifierBundle? { + return tracker[contract]?.removeFirstOrNull() + } +} + +internal class IdentifierBundle(val barrierOwner: Contract.Identity, val value: String) From a3a97512a393fbd5982e4b5012c9f8f4aec56c82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yi=C4=9Fit=20=C3=96zg=C3=BCm=C3=BC=C5=9F?= Date: Mon, 23 Sep 2024 00:00:17 +0300 Subject: [PATCH 04/17] Add Checkpoint validation frequency --- .../transmission/transformer/checkpoint/Frequency.kt | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/Frequency.kt diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/Frequency.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/Frequency.kt new file mode 100644 index 0000000..04d3028 --- /dev/null +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/Frequency.kt @@ -0,0 +1,7 @@ +package com.trendyol.transmission.transformer.checkpoint + + +sealed interface Frequency { + data object Once : Frequency + data object Continous : Frequency +} From bfc6009133cf55379b566ff24b62876c9538586f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yi=C4=9Fit=20=C3=96zg=C3=BCm=C3=BC=C5=9F?= Date: Mon, 23 Sep 2024 00:00:37 +0300 Subject: [PATCH 05/17] Make CommunicationScope also extend CheckpointHandler --- .../transmission/transformer/handler/CommunicationScope.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/handler/CommunicationScope.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/handler/CommunicationScope.kt index 9f9971e..53de3be 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/handler/CommunicationScope.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/handler/CommunicationScope.kt @@ -3,10 +3,11 @@ package com.trendyol.transmission.transformer.handler import com.trendyol.transmission.Transmission import com.trendyol.transmission.router.TransmissionRouter import com.trendyol.transmission.transformer.Transformer +import com.trendyol.transmission.transformer.checkpoint.CheckpointHandler import com.trendyol.transmission.transformer.request.Contract import com.trendyol.transmission.transformer.request.RequestHandler -interface CommunicationScope : RequestHandler { +interface CommunicationScope : RequestHandler, CheckpointHandler { /** * Sends data to [TransmissionRouter] * @param data of type [Transmission.Data] From 2bc5ac1dae64273fc316378479e676475c76456e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yi=C4=9Fit=20=C3=96zg=C3=BCm=C3=BC=C5=9F?= Date: Mon, 23 Sep 2024 00:01:03 +0300 Subject: [PATCH 06/17] Remove redundant generic type from execution register method --- .../transformer/request/execution/ExecutionExt.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/execution/ExecutionExt.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/execution/ExecutionExt.kt index 3ff1d0a..3b0e2e9 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/execution/ExecutionExt.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/execution/ExecutionExt.kt @@ -13,8 +13,8 @@ import com.trendyol.transmission.transformer.request.RequestHandler * Can be queried using [RequestHandler.execute] * @param execution execution to get the result [Transmission.Data] */ -fun ExecutionScope.register( - contract: C, +fun ExecutionScope.register( + contract: Contract.Execution, execution: suspend RequestHandler.() -> Unit, ) { this.executionRegistry.buildWith(contract.key, execution) From 204ca584d89b749c35dd28d14e34ce6ca4fa0e18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yi=C4=9Fit=20=C3=96zg=C3=BCm=C3=BC=C5=9F?= Date: Mon, 23 Sep 2024 00:01:22 +0300 Subject: [PATCH 07/17] Add Checkpoint and CheckpointWithArgs contracts, make Execution Inline --- .../transmission/transformer/request/Contract.kt | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/Contract.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/Contract.kt index 4a20b4f..7aa40a4 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/Contract.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/Contract.kt @@ -1,6 +1,7 @@ package com.trendyol.transmission.transformer.request import com.trendyol.transmission.Transmission +import com.trendyol.transmission.transformer.checkpoint.Frequency sealed interface Contract { @@ -20,9 +21,20 @@ sealed interface Contract { internal val useCache: Boolean = false ) : Contract - class Execution internal constructor(internal val key: String) : Contract + @JvmInline + value class Execution internal constructor(internal val key: String) : Contract class ExecutionWithArgs internal constructor( internal val key: String ) : Contract + + class Checkpoint internal constructor( + internal val key: String, + internal val frequency: Frequency + ) : Contract + + class CheckpointWithArgs internal constructor( + internal val key: String, + internal val frequency: Frequency + ) : Contract } From b55569a0031f8cbf40efb4a27591c21f76bf9301 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yi=C4=9Fit=20=C3=96zg=C3=BCm=C3=BC=C5=9F?= Date: Mon, 23 Sep 2024 00:01:39 +0300 Subject: [PATCH 08/17] Add Contract creation extension functions for checkpoints --- .../transformer/request/ContractExt.kt | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/ContractExt.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/ContractExt.kt index 4b9c02e..b229401 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/ContractExt.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/ContractExt.kt @@ -1,7 +1,9 @@ package com.trendyol.transmission.transformer.request +import com.trendyol.transmission.ExperimentalTransmissionApi import com.trendyol.transmission.Transmission import com.trendyol.transmission.identifier.IdentifierGenerator +import com.trendyol.transmission.transformer.checkpoint.Frequency object Contracts @@ -38,3 +40,21 @@ fun Contracts.execution(): Contract.Execution { fun Contracts.executionWithArgs(): Contract.ExecutionWithArgs { return Contract.ExecutionWithArgs(key = IdentifierGenerator.generateIdentifier()) } + +@ExperimentalTransmissionApi +fun Contracts.checkpoint(frequency: Frequency = Frequency.Once): Contract.Checkpoint { + return Contract.Checkpoint( + key = IdentifierGenerator.generateIdentifier(), + frequency = frequency + ) +} + +@ExperimentalTransmissionApi +fun Contracts.checkpointWithArgs( + frequency: Frequency = Frequency.Once +): Contract.CheckpointWithArgs { + return Contract.CheckpointWithArgs( + key = IdentifierGenerator.generateIdentifier(), + frequency = frequency + ) +} From d4642e3cbdb9e38cdb10271ec8649ab0070b3c39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yi=C4=9Fit=20=C3=96zg=C3=BCm=C3=BC=C5=9F?= Date: Mon, 23 Sep 2024 00:02:32 +0300 Subject: [PATCH 09/17] Add Query and QueryResult classes for Checkpoint --- .../trendyol/transmission/transformer/request/Query.kt | 7 +++++++ .../transmission/transformer/request/QueryResult.kt | 10 +++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/Query.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/Query.kt index 14818a0..b841da9 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/Query.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/Query.kt @@ -31,4 +31,11 @@ internal sealed interface Query { val key: String, val args: A, ) : Query + + class Checkpoint( + val sender: String, + val key: String, + val args: A, + val queryIdentifier: String, + ) : Query } diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/QueryResult.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/QueryResult.kt index 579ebbe..aa6a85b 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/QueryResult.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/QueryResult.kt @@ -2,7 +2,7 @@ package com.trendyol.transmission.transformer.request import com.trendyol.transmission.Transmission -sealed class QueryResult( +internal sealed class QueryResult( open val owner: String, open val key: String, ) { @@ -19,4 +19,12 @@ sealed class QueryResult( val data: D?, val resultIdentifier: String, ) : QueryResult(owner, key) + + class Checkpoint( + override val owner: String, + override val key: String, + val data: D, + val resultIdentifier: String, + ) : QueryResult(owner, key) + } From 7b0a35d82c3cc8e33d7163bc14ba9651e54c1ccb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yi=C4=9Fit=20=C3=96zg=C3=BCm=C3=BC=C5=9F?= Date: Mon, 23 Sep 2024 00:06:40 +0300 Subject: [PATCH 10/17] Add API implementations for Checkpoint API to scopeBuilder and RequestDelegate --- .../transformer/CommunicationScopeBuilder.kt | 50 ++++- .../request/TransformerRequestDelegate.kt | 194 +++++++++++++++++- 2 files changed, 237 insertions(+), 7 deletions(-) diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/CommunicationScopeBuilder.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/CommunicationScopeBuilder.kt index 5a15700..c485bc7 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/CommunicationScopeBuilder.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/CommunicationScopeBuilder.kt @@ -1,5 +1,6 @@ package com.trendyol.transmission.transformer +import com.trendyol.transmission.ExperimentalTransmissionApi import com.trendyol.transmission.Transmission import com.trendyol.transmission.effect.EffectWrapper import com.trendyol.transmission.transformer.handler.CommunicationScope @@ -7,6 +8,7 @@ import com.trendyol.transmission.transformer.request.Contract import com.trendyol.transmission.transformer.request.TransformerRequestDelegate import kotlinx.coroutines.channels.Channel +@OptIn(ExperimentalTransmissionApi::class) internal class CommunicationScopeBuilder( private val effectChannel: Channel, private val dataChannel: Channel, @@ -29,14 +31,14 @@ internal class CommunicationScopeBuilder( } override suspend fun , D : Transmission.Data> getData(contract: C): D? { - return requestDelegate.interactor.getData(contract) + return requestDelegate.requestHandler.getData(contract) } override suspend fun , D : Any> compute( contract: C, invalidate: Boolean ): D? { - return requestDelegate.interactor.compute(contract, invalidate) + return requestDelegate.requestHandler.compute(contract, invalidate) } override suspend fun , A : Any, D : Any> compute( @@ -44,17 +46,55 @@ internal class CommunicationScopeBuilder( args: A, invalidate: Boolean ): D? { - return requestDelegate.interactor.compute(contract, args, invalidate) + return requestDelegate.requestHandler.compute(contract, args, invalidate) } override suspend fun execute(contract: Contract.Execution) { - requestDelegate.interactor.execute(contract) + requestDelegate.requestHandler.execute(contract) } override suspend fun , A : Any> execute( contract: C, args: A ) { - requestDelegate.interactor.execute(contract, args) + requestDelegate.requestHandler.execute(contract, args) + } + + override suspend fun CommunicationScope.pauseOn( + contract: Contract.Checkpoint, + resumeBlock: suspend CommunicationScope.() -> Unit + ) { + with(requestDelegate.checkpointHandler) { + pauseOn(contract, resumeBlock) + } + } + + override suspend fun CommunicationScope.pauseOn( + vararg contract: Contract.Checkpoint, + resumeBlock: suspend CommunicationScope.() -> Unit + ) { + with(requestDelegate.checkpointHandler) { + pauseOn(contract = contract, resumeBlock = resumeBlock) + } + } + + override suspend fun , A : Any> CommunicationScope.pauseOn( + contract: C, + resumeBlock: suspend CommunicationScope.(args: A) -> Unit + ) { + with(requestDelegate.checkpointHandler) { + pauseOn(contract, resumeBlock) + } + } + + override suspend fun validate(contract: Contract.Checkpoint) { + requestDelegate.checkpointHandler.validate(contract) + } + + override suspend fun , A : Any> validate( + contract: C, + args: A + ) { + requestDelegate.checkpointHandler.validate(contract, args) } } diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt index 3ce398b..07a5ab2 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt @@ -1,20 +1,210 @@ package com.trendyol.transmission.transformer.request +import com.trendyol.transmission.ExperimentalTransmissionApi import com.trendyol.transmission.Transmission import com.trendyol.transmission.identifier.IdentifierGenerator import com.trendyol.transmission.router.createBroadcast +import com.trendyol.transmission.transformer.checkpoint.CheckpointHandler +import com.trendyol.transmission.transformer.checkpoint.CheckpointTracker +import com.trendyol.transmission.transformer.checkpoint.Frequency +import com.trendyol.transmission.transformer.handler.CommunicationScope import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.drop import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.first +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentMap -internal class TransformerRequestDelegate(scope: CoroutineScope, identity: Contract.Identity) { +@ExperimentalTransmissionApi +internal class TransformerRequestDelegate( + scope: CoroutineScope, + checkpointTrackerProvider: () -> CheckpointTracker?, + identity: Contract.Identity +) { val outGoingQuery: Channel = Channel(capacity = Channel.BUFFERED) val resultBroadcast = scope.createBroadcast() - val interactor: RequestHandler = object : RequestHandler { + private val frequencyTracker: MutableSet = mutableSetOf() + private val arbitraryFrequencyTracker: MutableSet> = mutableSetOf() + private val frequencyWithArgsTracker: ConcurrentMap, Any> = + ConcurrentHashMap() + + val checkpointHandler: CheckpointHandler by lazy { + object : CheckpointHandler { + + override suspend fun CommunicationScope.pauseOn( + contract: Contract.Checkpoint, + resumeBlock: suspend CommunicationScope.() -> Unit + ) { + when (contract.frequency) { + Frequency.Continous -> { + val queryIdentifier = IdentifierGenerator.generateIdentifier() + checkpointTrackerProvider()?.putOrCreate( + contract = contract, + barrierOwner = identity, + identifier = queryIdentifier + ) + resultBroadcast.output.filterIsInstance>() + .filter { it.resultIdentifier == queryIdentifier && it.key == contract.key } + .collect { + resumeBlock.invoke(this) + } + } + + Frequency.Once -> { + if (frequencyTracker.contains(contract)) { + resumeBlock.invoke(this) + } else { + val queryIdentifier = IdentifierGenerator.generateIdentifier() + checkpointTrackerProvider()?.putOrCreate( + contract = contract, + barrierOwner = identity, + identifier = queryIdentifier + ) + frequencyTracker.add(contract) + resultBroadcast.output.filterIsInstance>() + .filter { it.resultIdentifier == queryIdentifier && it.key == contract.key } + .collect { + resumeBlock.invoke(this) + } + } + } + } + } + + @ExperimentalTransmissionApi + override suspend fun CommunicationScope.pauseOn( + vararg contract: Contract.Checkpoint, + resumeBlock: suspend CommunicationScope.() -> Unit + ) { + val contractList = contract.toList() + check(contractList.distinctBy { it.frequency }.size == 1) { + "All Barriers should have the same frequency" + } + check(contractList.isNotEmpty()) { + "At least one barrier should be provided" + } + check(contractList.toSet().size == contractList.size) { + "All Barrier Contracts should be unique" + } + when (contractList.first().frequency) { + Frequency.Continous -> { + val queryIdentifier = IdentifierGenerator.generateIdentifier() + contractList.forEach { internalContract -> + checkpointTrackerProvider()?.putOrCreate( + contract = internalContract, + barrierOwner = identity, + identifier = queryIdentifier + ) + } + resultBroadcast.output.filterIsInstance>() + .filter { it.resultIdentifier == queryIdentifier } + .drop(contractList.size.dec()) + .collect { + resumeBlock.invoke(this) + } + + } + + Frequency.Once -> { + if (arbitraryFrequencyTracker.contains(contractList.toSet())) { + resumeBlock.invoke(this) + } else { + val queryIdentifier = IdentifierGenerator.generateIdentifier() + contractList.forEach { internalContract -> + checkpointTrackerProvider()?.putOrCreate( + contract = internalContract, + barrierOwner = identity, + identifier = queryIdentifier + ) + } + arbitraryFrequencyTracker.add(contractList.toSet()) + resultBroadcast.output.filterIsInstance>() + .filter { it.resultIdentifier == queryIdentifier } + .drop(contractList.size.dec()) + .collect { + resumeBlock.invoke(this) + } + } + } + } + + } + + override suspend fun , A : Any> CommunicationScope.pauseOn( + contract: C, + resumeBlock: suspend CommunicationScope.(args: A) -> Unit + ) { + when (contract.frequency) { + Frequency.Continous -> { + val queryIdentifier = IdentifierGenerator.generateIdentifier() + checkpointTrackerProvider()?.putOrCreate( + contract = contract, + barrierOwner = identity, + identifier = queryIdentifier + ) + resultBroadcast.output.filterIsInstance>() + .filter { it.resultIdentifier == queryIdentifier && it.key == contract.key } + .collect { + resumeBlock.invoke(this, it.data) + } + } + + Frequency.Once -> { + if (frequencyWithArgsTracker.containsKey(contract)) { + @Suppress("UNCHECKED_CAST") + resumeBlock.invoke(this, frequencyWithArgsTracker[contract] as A) + } else { + val queryIdentifier = IdentifierGenerator.generateIdentifier() + checkpointTrackerProvider()?.putOrCreate( + contract = contract, + barrierOwner = identity, + identifier = queryIdentifier + ) + resultBroadcast.output.filterIsInstance>() + .filter { it.resultIdentifier == queryIdentifier && it.key == contract.key } + .collect { + frequencyWithArgsTracker[contract] = it.data + resumeBlock.invoke(this, it.data) + } + } + } + } + } + + override suspend fun validate(contract: Contract.Checkpoint) { + val identifier = checkpointTrackerProvider()?.useIdentifier(contract) ?: return + outGoingQuery.send( + Query.Checkpoint( + sender = identifier.barrierOwner.key, + key = contract.key, + args = Unit, + queryIdentifier = identifier.value + ) + ) + } + + override suspend fun , A : Any> validate( + contract: C, + args: A + ) { + val identifier = checkpointTrackerProvider()?.useIdentifier(contract) ?: return + outGoingQuery.send( + Query.Checkpoint( + sender = identifier.barrierOwner.key, + key = contract.key, + args = args, + queryIdentifier = identifier.value + ) + ) + } + } + } + + val requestHandler: RequestHandler = object : RequestHandler { override suspend fun , D : Transmission.Data> getData(contract: C): D? { val queryIdentifier = IdentifierGenerator.generateIdentifier() From 314499e839c06096a86dda2d89306d09ad72f86e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yi=C4=9Fit=20=C3=96zg=C3=BCm=C3=BC=C5=9F?= Date: Mon, 23 Sep 2024 00:07:32 +0300 Subject: [PATCH 11/17] Update Transformer to correctly pass down CheckpointTracker provider --- .../transmission/transformer/Transformer.kt | 29 +++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/Transformer.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/Transformer.kt index 7307e91..e60b90b 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/Transformer.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/Transformer.kt @@ -1,8 +1,10 @@ package com.trendyol.transmission.transformer +import com.trendyol.transmission.ExperimentalTransmissionApi import com.trendyol.transmission.Transmission import com.trendyol.transmission.effect.EffectWrapper import com.trendyol.transmission.effect.RouterEffect +import com.trendyol.transmission.transformer.checkpoint.CheckpointTracker import com.trendyol.transmission.transformer.handler.CommunicationScope import com.trendyol.transmission.transformer.handler.HandlerRegistry import com.trendyol.transmission.transformer.request.Contract @@ -30,6 +32,7 @@ import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.launch import kotlinx.coroutines.supervisorScope +@OptIn(ExperimentalTransmissionApi::class) open class Transformer( identity: Contract.Identity = Contracts.identity(), dispatcher: CoroutineDispatcher = Dispatchers.Default, @@ -45,7 +48,14 @@ open class Transformer( private val _identity: Contract.Identity = identity private val effectChannel: Channel = Channel(capacity = Channel.BUFFERED) - private val requestDelegate = TransformerRequestDelegate(transformerScope, _identity) + private var checkpointProvider: () -> CheckpointTracker? = { null } + private val requestDelegate by lazy { + TransformerRequestDelegate( + scope = transformerScope, + checkpointTrackerProvider = checkpointProvider, + identity = _identity + ) + } internal val dataChannel: Channel = Channel(capacity = Channel.BUFFERED) internal val storage = TransformerStorage() @@ -57,11 +67,18 @@ open class Transformer( var currentEffectProcessing: Job? = null var currentSignalProcessing: Job? = null - val communicationScope: CommunicationScope = CommunicationScopeBuilder( - effectChannel = effectChannel, - dataChannel = dataChannel, - requestDelegate = requestDelegate - ) + val communicationScope: CommunicationScope by lazy { + CommunicationScopeBuilder( + effectChannel = effectChannel, + dataChannel = dataChannel, + requestDelegate = requestDelegate + ) + } + + + internal fun bindCheckpointTracker(tracker: CheckpointTracker) { + checkpointProvider = { tracker } + } internal fun startSignalCollection(incoming: SharedFlow) { transformerScope.launch { From 567267da5d3586b3796220b8a3f700721cbe8a17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yi=C4=9Fit=20=C3=96zg=C3=BCm=C3=BC=C5=9F?= Date: Mon, 23 Sep 2024 00:30:22 +0300 Subject: [PATCH 12/17] Update Request Delegate and Router for Checkpoint API update passing --- .../transmission/router/RequestDelegate.kt | 38 ++++++++++++------- .../transmission/router/TransmissionRouter.kt | 4 ++ 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/transmission/src/main/java/com/trendyol/transmission/router/RequestDelegate.kt b/transmission/src/main/java/com/trendyol/transmission/router/RequestDelegate.kt index 2864e46..0f7f27f 100644 --- a/transmission/src/main/java/com/trendyol/transmission/router/RequestDelegate.kt +++ b/transmission/src/main/java/com/trendyol/transmission/router/RequestDelegate.kt @@ -1,5 +1,3 @@ -@file:OptIn(ExperimentalUuidApi::class) - package com.trendyol.transmission.router import com.trendyol.transmission.Transmission @@ -20,7 +18,6 @@ import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.shareIn import kotlinx.coroutines.launch -import kotlin.uuid.ExperimentalUuidApi internal class RequestDelegate( private val queryScope: CoroutineScope, @@ -49,18 +46,30 @@ internal class RequestDelegate( // region process queries - private suspend fun processQuery(query: Query) = queryScope.launch { + private fun processQuery(query: Query) = queryScope.launch { when (query) { is Query.Computation -> processComputationQuery(query) is Query.Data -> processDataQuery(query) is Query.ComputationWithArgs<*> -> processComputationQueryWithArgs(query) is Query.Execution -> processExecution(query) is Query.ExecutionWithArgs<*> -> processExecutionWithArgs(query) + is Query.Checkpoint<*> -> processBarrier(query) } } + private fun processBarrier(query: Query.Checkpoint) = queryScope.launch { + queryResultChannel.send( + QueryResult.Checkpoint( + owner = query.sender, + key = query.key, + data = query.args, + resultIdentifier = query.queryIdentifier + ) + ) + } + private fun processDataQuery( - query: Query.Data + query: Query.Data, ) = queryScope.launch { val dataHolder = routerRef.transformerSet .filter { it.storage.isHolderStateInitialized() } @@ -79,7 +88,7 @@ internal class RequestDelegate( } private fun processComputationQuery( - query: Query.Computation + query: Query.Computation, ) = queryScope.launch { val computationHolder = routerRef.transformerSet .find { it.storage.hasComputation(query.key) } @@ -110,7 +119,7 @@ internal class RequestDelegate( } private fun processComputationQueryWithArgs( - query: Query.ComputationWithArgs + query: Query.ComputationWithArgs, ) = queryScope.launch { val computationHolder = routerRef.transformerSet .find { it.storage.hasComputation(query.key) } @@ -145,7 +154,7 @@ internal class RequestDelegate( } private fun processExecution( - query: Query.Execution + query: Query.Execution, ) = queryScope.launch { val executionHolder = routerRef.transformerSet .find { it.storage.hasExecution(query.key) } ?: return@launch @@ -176,11 +185,12 @@ internal class RequestDelegate( is Query.ComputationWithArgs<*> -> testComputationQueryWithArgs(query) is Query.Execution -> {} is Query.ExecutionWithArgs<*> -> {} + is Query.Checkpoint<*> -> TODO() } } private fun testDataQuery( - query: Query.Data + query: Query.Data, ) = queryScope.launch { val dataToSend = QueryResult.Data( owner = query.sender, @@ -194,7 +204,7 @@ internal class RequestDelegate( } private fun testComputationQuery( - query: Query.Computation + query: Query.Computation, ) = queryScope.launch { val computationToSend = QueryResult.Computation( owner = query.sender, @@ -206,7 +216,7 @@ internal class RequestDelegate( } private fun testComputationQueryWithArgs( - query: Query.ComputationWithArgs + query: Query.ComputationWithArgs, ) = queryScope.launch { val computationToSend = QueryResult.Computation( owner = query.sender, @@ -236,7 +246,7 @@ internal class RequestDelegate( override suspend fun , D : Any> compute( contract: C, - invalidate: Boolean + invalidate: Boolean, ): D? { val queryIdentifier = IdentifierGenerator.generateIdentifier() outGoingQuery.send( @@ -256,7 +266,7 @@ internal class RequestDelegate( override suspend fun , A : Any, D : Any> compute( contract: C, args: A, - invalidate: Boolean + invalidate: Boolean, ): D? { val queryIdentifier = IdentifierGenerator.generateIdentifier() outGoingQuery.send( @@ -284,7 +294,7 @@ internal class RequestDelegate( override suspend fun , A : Any> execute( contract: C, - args: A + args: A, ) { outGoingQuery.send( Query.ExecutionWithArgs( diff --git a/transmission/src/main/java/com/trendyol/transmission/router/TransmissionRouter.kt b/transmission/src/main/java/com/trendyol/transmission/router/TransmissionRouter.kt index de5258b..f4a7678 100644 --- a/transmission/src/main/java/com/trendyol/transmission/router/TransmissionRouter.kt +++ b/transmission/src/main/java/com/trendyol/transmission/router/TransmissionRouter.kt @@ -5,6 +5,7 @@ import com.trendyol.transmission.effect.EffectWrapper import com.trendyol.transmission.router.builder.TransmissionRouterBuilderScope import com.trendyol.transmission.router.loader.TransformerSetLoader import com.trendyol.transmission.transformer.Transformer +import com.trendyol.transmission.transformer.checkpoint.CheckpointTracker import com.trendyol.transmission.transformer.request.Contract import com.trendyol.transmission.transformer.request.RequestHandler import kotlinx.coroutines.CoroutineDispatcher @@ -42,6 +43,8 @@ class TransmissionRouter internal constructor( private val dataBroadcast = routerScope.createBroadcast() private val effectBroadcast = routerScope.createBroadcast() + private val checkpointTracker = CheckpointTracker() + val dataStream = dataBroadcast.output val effectStream: SharedFlow = effectBroadcast.output.map { it.effect } .shareIn(routerScope, SharingStarted.Lazily) @@ -97,6 +100,7 @@ class TransmissionRouter internal constructor( } transformerSet.forEach { transformer -> transformer.run { + bindCheckpointTracker(checkpointTracker) startSignalCollection(incoming = signalBroadcast.output) startDataPublishing(data = dataBroadcast.producer) startEffectProcessing( From f2ac244b7beea799969d346fc06a6c1de25fb409 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yi=C4=9Fit=20=C3=96zg=C3=BCm=C3=BC=C5=9F?= Date: Mon, 23 Sep 2024 00:30:39 +0300 Subject: [PATCH 13/17] Update components sample --- .../features/input/InputTransformer.kt | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/sample/src/main/java/com/trendyol/transmission/components/features/input/InputTransformer.kt b/sample/src/main/java/com/trendyol/transmission/components/features/input/InputTransformer.kt index 9da503d..a343eac 100644 --- a/sample/src/main/java/com/trendyol/transmission/components/features/input/InputTransformer.kt +++ b/sample/src/main/java/com/trendyol/transmission/components/features/input/InputTransformer.kt @@ -1,7 +1,11 @@ package com.trendyol.transmission.components.features.input +import androidx.compose.ui.graphics.Color import com.trendyol.transmission.DefaultDispatcher +import com.trendyol.transmission.ExperimentalTransmissionApi +import com.trendyol.transmission.components.features.InputUiState import com.trendyol.transmission.components.features.colorpicker.ColorPickerEffect +import com.trendyol.transmission.components.features.multioutput.multiOutputTransformerIdentity import com.trendyol.transmission.transformer.Transformer import com.trendyol.transmission.transformer.dataholder.dataHolder import com.trendyol.transmission.transformer.handler.HandlerRegistry @@ -9,14 +13,13 @@ import com.trendyol.transmission.transformer.handler.handlers import com.trendyol.transmission.transformer.handler.onEffect import com.trendyol.transmission.transformer.handler.onSignal import com.trendyol.transmission.transformer.request.Contracts +import com.trendyol.transmission.transformer.request.checkpointWithArgs import com.trendyol.transmission.transformer.request.computation import com.trendyol.transmission.transformer.request.computation.ComputationRegistry import com.trendyol.transmission.transformer.request.computation.computations import com.trendyol.transmission.transformer.request.computation.register import com.trendyol.transmission.transformer.request.computationWithArgs import com.trendyol.transmission.transformer.request.dataHolder -import com.trendyol.transmission.transformer.request.identity -import com.trendyol.transmission.components.features.InputUiState import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.delay import javax.inject.Inject @@ -38,19 +41,30 @@ class InputTransformer @Inject constructor( } } + @OptIn(ExperimentalTransmissionApi::class) override val handlers: HandlerRegistry = handlers { onSignal { signal -> holder.update { it.copy(writtenText = signal.value) } - publish(effect = InputEffect.InputUpdate(signal.value)) + pauseOn(colorCheckpoint) { color -> + send( + effect = ColorPickerEffect.SelectedColorUpdate(color), + identity = multiOutputTransformerIdentity + ) + publish(effect = InputEffect.InputUpdate(signal.value)) + } + pauseOn(colorCheckpoint, colorCheckpoint) {} } onEffect { effect -> + validate(colorCheckpoint, effect.color) holder.update { it.copy(backgroundColor = effect.color) } } } + @OptIn(ExperimentalTransmissionApi::class) companion object { val writtenInputWithArgs = Contracts.computationWithArgs() val writtenInputContract = Contracts.computation() val holderContract = Contracts.dataHolder() + val colorCheckpoint = Contracts.checkpointWithArgs() } } From cb8db2447011a0d9fb0fadce159c71bfabe080a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yi=C4=9Fit=20=C3=96zg=C3=BCm=C3=BC=C5=9F?= Date: Mon, 23 Sep 2024 00:40:09 +0300 Subject: [PATCH 14/17] Add pauseOn variant with CheckpointWithArgs for 2 checkpoints --- .../checkpoint/CheckpointHandler.kt | 8 +++ .../request/TransformerRequestDelegate.kt | 64 +++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt index 7a9607e..1051e4e 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt @@ -24,9 +24,17 @@ interface CheckpointHandler { resumeBlock: suspend CommunicationScope.(args: A) -> Unit ) + @ExperimentalTransmissionApi + suspend fun , C2 : Contract.CheckpointWithArgs, A : Any, B : Any> CommunicationScope.pauseOn( + contract: C, + contract2: C2, + resumeBlock: suspend CommunicationScope.(A, B) -> Unit + ) + @ExperimentalTransmissionApi suspend fun validate(contract: Contract.Checkpoint) @ExperimentalTransmissionApi suspend fun , A : Any> validate(contract: C, args: A) } + diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt index 07a5ab2..eadb18e 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt @@ -10,6 +10,7 @@ import com.trendyol.transmission.transformer.checkpoint.Frequency import com.trendyol.transmission.transformer.handler.CommunicationScope import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.drop import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterIsInstance @@ -175,6 +176,69 @@ internal class TransformerRequestDelegate( } } + @ExperimentalTransmissionApi + override suspend fun , C2 : Contract.CheckpointWithArgs, A : Any, B : Any> CommunicationScope.pauseOn( + contract: C, + contract2: C2, + resumeBlock: suspend CommunicationScope.(A, B) -> Unit + ) { + when (contract.frequency) { + Frequency.Continous -> { + val queryIdentifier = IdentifierGenerator.generateIdentifier() + listOf(contract, contract2).forEach { + checkpointTrackerProvider()?.putOrCreate( + contract = it, + barrierOwner = identity, + identifier = queryIdentifier + ) + } + val flow1 = + resultBroadcast.output.filterIsInstance>() + .filter { it.resultIdentifier == queryIdentifier && it.key == contract.key } + val flow2 = + resultBroadcast.output.filterIsInstance>() + .filter { it.resultIdentifier == queryIdentifier && it.key == contract.key } + combine(flow1, flow2) { checkpoint1, checkpoint2 -> + resumeBlock.invoke(this, checkpoint1.data, checkpoint2.data) + } + } + + Frequency.Once -> { + if (frequencyWithArgsTracker.containsKey(contract) && frequencyWithArgsTracker.containsKey( + contract2 + ) + ) { + @Suppress("UNCHECKED_CAST") + resumeBlock.invoke( + this, + frequencyWithArgsTracker[contract] as A, + frequencyWithArgsTracker[contract2] as B + ) + } else { + val queryIdentifier = IdentifierGenerator.generateIdentifier() + listOf(contract, contract2).forEach { + checkpointTrackerProvider()?.putOrCreate( + contract = it, + barrierOwner = identity, + identifier = queryIdentifier + ) + } + val flow1 = + resultBroadcast.output.filterIsInstance>() + .filter { it.resultIdentifier == queryIdentifier && it.key == contract.key } + val flow2 = + resultBroadcast.output.filterIsInstance>() + .filter { it.resultIdentifier == queryIdentifier && it.key == contract.key } + combine(flow1, flow2) { checkpoint1, checkpoint2 -> + frequencyWithArgsTracker[contract] = checkpoint1.data + frequencyWithArgsTracker[contract2] = checkpoint2.data + resumeBlock.invoke(this, checkpoint1.data, checkpoint2.data) + } + } + } + } + } + override suspend fun validate(contract: Contract.Checkpoint) { val identifier = checkpointTrackerProvider()?.useIdentifier(contract) ?: return outGoingQuery.send( From 3b303102a95bd47fdd34a819434644ab90996752 Mon Sep 17 00:00:00 2001 From: Yigit Ozgumus Date: Mon, 23 Sep 2024 18:02:06 +0300 Subject: [PATCH 15/17] Add implementation of 2 contract version to ScopeBuilder --- .../transformer/CommunicationScopeBuilder.kt | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/CommunicationScopeBuilder.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/CommunicationScopeBuilder.kt index c485bc7..1733e5b 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/CommunicationScopeBuilder.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/CommunicationScopeBuilder.kt @@ -87,6 +87,17 @@ internal class CommunicationScopeBuilder( } } + @ExperimentalTransmissionApi + override suspend fun , C2 : Contract.CheckpointWithArgs, A : Any, B : Any> CommunicationScope.pauseOn( + contract: C, + contract2: C2, + resumeBlock: suspend CommunicationScope.(A, B) -> Unit + ) { + with(requestDelegate.checkpointHandler) { + pauseOn(contract, contract2, resumeBlock) + } + } + override suspend fun validate(contract: Contract.Checkpoint) { requestDelegate.checkpointHandler.validate(contract) } From 12137ced72c666b929539468612df0f462c3c9f1 Mon Sep 17 00:00:00 2001 From: Yigit Ozgumus Date: Mon, 23 Sep 2024 23:26:20 +0300 Subject: [PATCH 16/17] Cleanup API and update documentation --- .../features/input/InputTransformer.kt | 1 - .../transmissiontest/TransformerTestScope.kt | 1 - .../transmission/effect/EffectWrapper.kt | 2 +- .../identifier/IdentifierGenerator.kt | 5 ++- .../trendyol/transmission/router/Broadcast.kt | 8 ++-- .../transmission/router/RequestDelegate.kt | 8 ++-- .../transmission/router/TransmissionRouter.kt | 4 +- .../checkpoint/CheckpointHandler.kt | 37 ++++++++++++++++++- .../checkpoint/CheckpointTracker.kt | 11 ++---- .../transformer/checkpoint/Frequency.kt | 14 ++++++- .../dataholder/TransmissionDataHolder.kt | 1 - .../transformer/request/RequestHandler.kt | 27 ++++++++++++++ .../request/TransformerRequestDelegate.kt | 37 +++++++++++-------- 13 files changed, 118 insertions(+), 38 deletions(-) diff --git a/sample/src/main/java/com/trendyol/transmission/components/features/input/InputTransformer.kt b/sample/src/main/java/com/trendyol/transmission/components/features/input/InputTransformer.kt index a343eac..0f7e909 100644 --- a/sample/src/main/java/com/trendyol/transmission/components/features/input/InputTransformer.kt +++ b/sample/src/main/java/com/trendyol/transmission/components/features/input/InputTransformer.kt @@ -52,7 +52,6 @@ class InputTransformer @Inject constructor( ) publish(effect = InputEffect.InputUpdate(signal.value)) } - pauseOn(colorCheckpoint, colorCheckpoint) {} } onEffect { effect -> validate(colorCheckpoint, effect.color) diff --git a/transmission-test/src/main/java/com/trendyol/transmissiontest/TransformerTestScope.kt b/transmission-test/src/main/java/com/trendyol/transmissiontest/TransformerTestScope.kt index 3fbd639..b01f656 100644 --- a/transmission-test/src/main/java/com/trendyol/transmissiontest/TransformerTestScope.kt +++ b/transmission-test/src/main/java/com/trendyol/transmissiontest/TransformerTestScope.kt @@ -1,7 +1,6 @@ package com.trendyol.transmissiontest import com.trendyol.transmission.Transmission -import com.trendyol.transmission.effect.EffectWrapper interface TransformerTestScope { val dataStream: List diff --git a/transmission/src/main/java/com/trendyol/transmission/effect/EffectWrapper.kt b/transmission/src/main/java/com/trendyol/transmission/effect/EffectWrapper.kt index dae229f..6994ed3 100644 --- a/transmission/src/main/java/com/trendyol/transmission/effect/EffectWrapper.kt +++ b/transmission/src/main/java/com/trendyol/transmission/effect/EffectWrapper.kt @@ -3,7 +3,7 @@ package com.trendyol.transmission.effect import com.trendyol.transmission.Transmission import com.trendyol.transmission.transformer.request.Contract -data class EffectWrapper( +internal data class EffectWrapper( val effect: Transmission.Effect, val identity: Contract.Identity? = null ) diff --git a/transmission/src/main/java/com/trendyol/transmission/identifier/IdentifierGenerator.kt b/transmission/src/main/java/com/trendyol/transmission/identifier/IdentifierGenerator.kt index c6416b4..772cb20 100644 --- a/transmission/src/main/java/com/trendyol/transmission/identifier/IdentifierGenerator.kt +++ b/transmission/src/main/java/com/trendyol/transmission/identifier/IdentifierGenerator.kt @@ -3,9 +3,12 @@ package com.trendyol.transmission.identifier import kotlin.uuid.ExperimentalUuidApi import kotlin.uuid.Uuid +/** + * Generates identifier for internal communication. + */ @OptIn(ExperimentalUuidApi::class) internal object IdentifierGenerator { fun generateIdentifier(): String { return Uuid.random().toString() } -} \ No newline at end of file +} diff --git a/transmission/src/main/java/com/trendyol/transmission/router/Broadcast.kt b/transmission/src/main/java/com/trendyol/transmission/router/Broadcast.kt index 98e3b22..64f3470 100644 --- a/transmission/src/main/java/com/trendyol/transmission/router/Broadcast.kt +++ b/transmission/src/main/java/com/trendyol/transmission/router/Broadcast.kt @@ -8,16 +8,18 @@ import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.shareIn -interface Broadcast { +internal interface Broadcast { val producer: SendChannel val output: SharedFlow } -fun CoroutineScope.createBroadcast(): Broadcast = object : Broadcast { +internal fun CoroutineScope.createBroadcast(): Broadcast = object : Broadcast { + private val _source = Channel(capacity = Channel.BUFFERED) override val producer: SendChannel = _source override val output by lazy { - _source.receiveAsFlow().shareIn(this@createBroadcast, SharingStarted.Lazily) + _source.receiveAsFlow() + .shareIn(this@createBroadcast, SharingStarted.Lazily) } } diff --git a/transmission/src/main/java/com/trendyol/transmission/router/RequestDelegate.kt b/transmission/src/main/java/com/trendyol/transmission/router/RequestDelegate.kt index 0f7f27f..9242230 100644 --- a/transmission/src/main/java/com/trendyol/transmission/router/RequestDelegate.kt +++ b/transmission/src/main/java/com/trendyol/transmission/router/RequestDelegate.kt @@ -53,13 +53,13 @@ internal class RequestDelegate( is Query.ComputationWithArgs<*> -> processComputationQueryWithArgs(query) is Query.Execution -> processExecution(query) is Query.ExecutionWithArgs<*> -> processExecutionWithArgs(query) - is Query.Checkpoint<*> -> processBarrier(query) + is Query.Checkpoint<*> -> processCheckpoint(query) } } - private fun processBarrier(query: Query.Checkpoint) = queryScope.launch { + private fun processCheckpoint(query: Query.Checkpoint) = queryScope.launch { queryResultChannel.send( - QueryResult.Checkpoint( + QueryResult.Checkpoint( owner = query.sender, key = query.key, data = query.args, @@ -185,7 +185,7 @@ internal class RequestDelegate( is Query.ComputationWithArgs<*> -> testComputationQueryWithArgs(query) is Query.Execution -> {} is Query.ExecutionWithArgs<*> -> {} - is Query.Checkpoint<*> -> TODO() + is Query.Checkpoint<*> -> {} } } diff --git a/transmission/src/main/java/com/trendyol/transmission/router/TransmissionRouter.kt b/transmission/src/main/java/com/trendyol/transmission/router/TransmissionRouter.kt index f4a7678..eab4fb6 100644 --- a/transmission/src/main/java/com/trendyol/transmission/router/TransmissionRouter.kt +++ b/transmission/src/main/java/com/trendyol/transmission/router/TransmissionRouter.kt @@ -46,7 +46,8 @@ class TransmissionRouter internal constructor( private val checkpointTracker = CheckpointTracker() val dataStream = dataBroadcast.output - val effectStream: SharedFlow = effectBroadcast.output.map { it.effect } + val effectStream: SharedFlow = effectBroadcast.output + .map { it.effect } .shareIn(routerScope, SharingStarted.Lazily) private val _requestDelegate = RequestDelegate( @@ -54,6 +55,7 @@ class TransmissionRouter internal constructor( routerRef = this@TransmissionRouter, registry = registryScope ) + val requestHelper: RequestHandler = _requestDelegate init { diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt index 1051e4e..205b889 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt @@ -1,29 +1,59 @@ package com.trendyol.transmission.transformer.checkpoint import com.trendyol.transmission.ExperimentalTransmissionApi +import com.trendyol.transmission.Transmission import com.trendyol.transmission.transformer.handler.CommunicationScope import com.trendyol.transmission.transformer.request.Contract interface CheckpointHandler { + /** + * Pauses the processing of a [Transmission.Signal] or [Transmission.Effect] using + * the given [Contract.Checkpoint]. + * @param contract Checkpoint to check for pause condition + * @param resumeBlock execution block that will run after [contract] is validated + */ @ExperimentalTransmissionApi suspend fun CommunicationScope.pauseOn( contract: Contract.Checkpoint, resumeBlock: suspend CommunicationScope.() -> Unit ) + /** + * Pauses the processing of a [Transmission.Signal] or [Transmission.Effect] using + * the given [Contract.Checkpoint]s. + * @param contract Checkpoints to check for pause condition. + * @param resumeBlock execution block that will run after [contract] is validated + * @throws IllegalStateException when [Contract.Checkpoint]s have different frequency + * @throws IllegalStateException when no [Contract.Checkpoint] is supplied + */ @ExperimentalTransmissionApi suspend fun CommunicationScope.pauseOn( vararg contract: Contract.Checkpoint, resumeBlock: suspend CommunicationScope.() -> Unit ) + /** + * Pauses the processing of a [Transmission.Signal] or [Transmission.Effect] using + * the given [Contract.CheckpointWithArgs]s. + * @param contract Checkpoint with Args to check for pause condition. + * @param resumeBlock execution block that will run after [contract] is validated. It accepts + * the type of the [Contract.CheckpointWithArgs] as argument. + */ @ExperimentalTransmissionApi suspend fun , A : Any> CommunicationScope.pauseOn( contract: C, resumeBlock: suspend CommunicationScope.(args: A) -> Unit ) + /** + * Pauses the processing of a [Transmission.Signal] or [Transmission.Effect] using + * the given [Contract.CheckpointWithArgs]s. + * @param contract Checkpoint with Args to check for pause condition. + * @param contract2 Second Checkpoint with Args to check for pause condition. + * @param resumeBlock execution block that will run after both [contract] and [contract2] is + * validated. It accepts the type of the [Contract.CheckpointWithArgs] as arguments. + */ @ExperimentalTransmissionApi suspend fun , C2 : Contract.CheckpointWithArgs, A : Any, B : Any> CommunicationScope.pauseOn( contract: C, @@ -31,10 +61,15 @@ interface CheckpointHandler { resumeBlock: suspend CommunicationScope.(A, B) -> Unit ) + /** + * Validates the given [Contract.Checkpoint] and resumes the execution added with [pauseOn] + */ @ExperimentalTransmissionApi suspend fun validate(contract: Contract.Checkpoint) + /** + * Validates the given [Contract.CheckpointWithArgs] and resumes the execution added with [pauseOn] + */ @ExperimentalTransmissionApi suspend fun , A : Any> validate(contract: C, args: A) } - diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointTracker.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointTracker.kt index e68799d..748532a 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointTracker.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointTracker.kt @@ -8,19 +8,14 @@ internal class CheckpointTracker { private val tracker: ConcurrentMap> = ConcurrentHashMap() - fun putOrCreate(contract: Contract, barrierOwner: Contract.Identity, identifier: String) { + fun putOrCreate(contract: Contract, checkpointOwner: Contract.Identity, identifier: String) { tracker .putIfAbsent( contract, ArrayDeque().apply { - addLast( - IdentifierBundle( - barrierOwner, - identifier - ) - ) + addLast(IdentifierBundle(checkpointOwner, identifier)) }) - ?.addLast(IdentifierBundle(barrierOwner, identifier)) + ?.addLast(IdentifierBundle(checkpointOwner, identifier)) } fun useIdentifier(contract: Contract): IdentifierBundle? { diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/Frequency.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/Frequency.kt index 04d3028..c02d9ba 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/Frequency.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/Frequency.kt @@ -1,7 +1,19 @@ package com.trendyol.transmission.transformer.checkpoint +import com.trendyol.transmission.transformer.request.Contract +/** + * Indicator of how frequent [Contract.Checkpoint] and [Contract.CheckpointWithArgs] will be + * validated. + */ sealed interface Frequency { + /** + * Validates the checkpoint only once + */ data object Once : Frequency - data object Continous : Frequency + + /** + * Validates the checkpoint each time execution encounters + */ + data object Continuous : Frequency } diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/dataholder/TransmissionDataHolder.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/dataholder/TransmissionDataHolder.kt index 965563b..f852813 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/dataholder/TransmissionDataHolder.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/dataholder/TransmissionDataHolder.kt @@ -16,7 +16,6 @@ interface TransmissionDataHolder { suspend fun updateAndGet(updater: (T) -> @UnsafeVariance T): T } -@PublishedApi internal class TransmissionDataHolderImpl( initialValue: T, publishUpdates: Boolean, diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/RequestHandler.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/RequestHandler.kt index c0d4ce9..c608bff 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/RequestHandler.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/RequestHandler.kt @@ -1,23 +1,50 @@ package com.trendyol.transmission.transformer.request import com.trendyol.transmission.Transmission +import com.trendyol.transmission.transformer.Transformer interface RequestHandler { + /** + * Gets the data using the provided [Contract.DataHolder] + * @param contract DataHolder Contract to be sent + */ suspend fun , D : Transmission.Data> getData(contract: C): D? + /** + * Starts computation in the target [Transformer] and returns the result data. + * @param contract Computation Contract to be sent + * @param invalidate if the Computation is cached, this invalidates the result make it compute + * again. If it is not cached, it doesn't have any effect. + */ suspend fun , D : Any> compute( contract: C, invalidate: Boolean = false, ): D? + /** + * Starts computation in the target [Transformer] and returns the result data. + * @param contract Computation Contract to be sent + * @param args data required by Computation + * @param invalidate if the Computation is cached, this invalidates the result make it compute + * again. If it is not cached, it doesn't have any effect. + */ suspend fun , A : Any, D : Any> compute( contract: C, args: A, invalidate: Boolean = false, ): D? + /** + * Starts Execution in the target [Transformer] + * @param contract Execution Contract to be sent + */ suspend fun execute(contract: Contract.Execution) + /** + * Starts Execution in the target [Transformer] + * @param contract Execution Contract to be sent + * @param args data required by Execution + */ suspend fun , A : Any> execute(contract: C, args: A) } diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt index eadb18e..0b86e70 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt @@ -41,11 +41,11 @@ internal class TransformerRequestDelegate( resumeBlock: suspend CommunicationScope.() -> Unit ) { when (contract.frequency) { - Frequency.Continous -> { + Frequency.Continuous -> { val queryIdentifier = IdentifierGenerator.generateIdentifier() checkpointTrackerProvider()?.putOrCreate( contract = contract, - barrierOwner = identity, + checkpointOwner = identity, identifier = queryIdentifier ) resultBroadcast.output.filterIsInstance>() @@ -62,7 +62,7 @@ internal class TransformerRequestDelegate( val queryIdentifier = IdentifierGenerator.generateIdentifier() checkpointTrackerProvider()?.putOrCreate( contract = contract, - barrierOwner = identity, + checkpointOwner = identity, identifier = queryIdentifier ) frequencyTracker.add(contract) @@ -83,21 +83,21 @@ internal class TransformerRequestDelegate( ) { val contractList = contract.toList() check(contractList.distinctBy { it.frequency }.size == 1) { - "All Barriers should have the same frequency" + "All Checkpoints should have the same frequency" } check(contractList.isNotEmpty()) { - "At least one barrier should be provided" + "At least one checkpoint should be provided" } check(contractList.toSet().size == contractList.size) { - "All Barrier Contracts should be unique" + "All Checkpoint Contracts should be unique" } when (contractList.first().frequency) { - Frequency.Continous -> { + Frequency.Continuous -> { val queryIdentifier = IdentifierGenerator.generateIdentifier() contractList.forEach { internalContract -> checkpointTrackerProvider()?.putOrCreate( contract = internalContract, - barrierOwner = identity, + checkpointOwner = identity, identifier = queryIdentifier ) } @@ -118,7 +118,7 @@ internal class TransformerRequestDelegate( contractList.forEach { internalContract -> checkpointTrackerProvider()?.putOrCreate( contract = internalContract, - barrierOwner = identity, + checkpointOwner = identity, identifier = queryIdentifier ) } @@ -140,11 +140,11 @@ internal class TransformerRequestDelegate( resumeBlock: suspend CommunicationScope.(args: A) -> Unit ) { when (contract.frequency) { - Frequency.Continous -> { + Frequency.Continuous -> { val queryIdentifier = IdentifierGenerator.generateIdentifier() checkpointTrackerProvider()?.putOrCreate( contract = contract, - barrierOwner = identity, + checkpointOwner = identity, identifier = queryIdentifier ) resultBroadcast.output.filterIsInstance>() @@ -162,7 +162,7 @@ internal class TransformerRequestDelegate( val queryIdentifier = IdentifierGenerator.generateIdentifier() checkpointTrackerProvider()?.putOrCreate( contract = contract, - barrierOwner = identity, + checkpointOwner = identity, identifier = queryIdentifier ) resultBroadcast.output.filterIsInstance>() @@ -182,13 +182,20 @@ internal class TransformerRequestDelegate( contract2: C2, resumeBlock: suspend CommunicationScope.(A, B) -> Unit ) { + val contractList = listOf(contract, contract2) + check(contractList.distinctBy { it.frequency }.size == 1) { + "All Checkpoint should have the same frequency" + } + check(contractList.toSet().size == contractList.size) { + "All Checkpoint Contracts should be unique" + } when (contract.frequency) { - Frequency.Continous -> { + Frequency.Continuous -> { val queryIdentifier = IdentifierGenerator.generateIdentifier() listOf(contract, contract2).forEach { checkpointTrackerProvider()?.putOrCreate( contract = it, - barrierOwner = identity, + checkpointOwner = identity, identifier = queryIdentifier ) } @@ -219,7 +226,7 @@ internal class TransformerRequestDelegate( listOf(contract, contract2).forEach { checkpointTrackerProvider()?.putOrCreate( contract = it, - barrierOwner = identity, + checkpointOwner = identity, identifier = queryIdentifier ) } From 9b112821efad9986d8bfa241aedf26638ee23621 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yi=C4=9Fit=20=C3=96zg=C3=BCm=C3=BC=C5=9F?= Date: Wed, 16 Oct 2024 22:28:50 +0300 Subject: [PATCH 17/17] Simplify checkpoint API implementation - Remove Frequency types - Update validating checkpoint mechanism --- .../features/input/InputTransformer.kt | 13 +- .../transmission/router/RequestDelegate.kt | 13 - .../transformer/CommunicationScopeBuilder.kt | 37 +-- .../checkpoint/CheckpointHandler.kt | 39 +-- .../checkpoint/CheckpointTracker.kt | 41 ++- .../checkpoint/CheckpointValidator.kt | 9 + .../transformer/checkpoint/Frequency.kt | 19 -- .../transformer/request/Contract.kt | 21 +- .../transformer/request/ContractExt.kt | 13 +- .../transmission/transformer/request/Query.kt | 6 - .../request/TransformerRequestDelegate.kt | 281 +++++------------- 11 files changed, 152 insertions(+), 340 deletions(-) create mode 100644 transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointValidator.kt delete mode 100644 transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/Frequency.kt diff --git a/sample/src/main/java/com/trendyol/transmission/components/features/input/InputTransformer.kt b/sample/src/main/java/com/trendyol/transmission/components/features/input/InputTransformer.kt index e2bfb5c..477691d 100644 --- a/sample/src/main/java/com/trendyol/transmission/components/features/input/InputTransformer.kt +++ b/sample/src/main/java/com/trendyol/transmission/components/features/input/InputTransformer.kt @@ -45,13 +45,12 @@ class InputTransformer @Inject constructor( override val handlers: Handlers = createHandlers { onSignal { signal -> holder.update { it.copy(writtenText = signal.value) } - pauseOn(colorCheckpoint) { color -> - send( - effect = ColorPickerEffect.SelectedColorUpdate(color), - identity = multiOutputTransformerIdentity - ) - publish(effect = InputEffect.InputUpdate(signal.value)) - } + val color = pauseOn(colorCheckpoint) + send( + effect = ColorPickerEffect.SelectedColorUpdate(color), + identity = multiOutputTransformerIdentity + ) + publish(effect = InputEffect.InputUpdate(signal.value)) } onEffect { effect -> validate(colorCheckpoint, effect.color) diff --git a/transmission/src/main/java/com/trendyol/transmission/router/RequestDelegate.kt b/transmission/src/main/java/com/trendyol/transmission/router/RequestDelegate.kt index 9242230..161e5ab 100644 --- a/transmission/src/main/java/com/trendyol/transmission/router/RequestDelegate.kt +++ b/transmission/src/main/java/com/trendyol/transmission/router/RequestDelegate.kt @@ -53,21 +53,9 @@ internal class RequestDelegate( is Query.ComputationWithArgs<*> -> processComputationQueryWithArgs(query) is Query.Execution -> processExecution(query) is Query.ExecutionWithArgs<*> -> processExecutionWithArgs(query) - is Query.Checkpoint<*> -> processCheckpoint(query) } } - private fun processCheckpoint(query: Query.Checkpoint) = queryScope.launch { - queryResultChannel.send( - QueryResult.Checkpoint( - owner = query.sender, - key = query.key, - data = query.args, - resultIdentifier = query.queryIdentifier - ) - ) - } - private fun processDataQuery( query: Query.Data, ) = queryScope.launch { @@ -185,7 +173,6 @@ internal class RequestDelegate( is Query.ComputationWithArgs<*> -> testComputationQueryWithArgs(query) is Query.Execution -> {} is Query.ExecutionWithArgs<*> -> {} - is Query.Checkpoint<*> -> {} } } diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/CommunicationScopeBuilder.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/CommunicationScopeBuilder.kt index 1733e5b..db6c627 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/CommunicationScopeBuilder.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/CommunicationScopeBuilder.kt @@ -60,49 +60,38 @@ internal class CommunicationScopeBuilder( requestDelegate.requestHandler.execute(contract, args) } + @ExperimentalTransmissionApi override suspend fun CommunicationScope.pauseOn( - contract: Contract.Checkpoint, - resumeBlock: suspend CommunicationScope.() -> Unit + contract: Contract.Checkpoint.Default ) { with(requestDelegate.checkpointHandler) { - pauseOn(contract, resumeBlock) + pauseOn(contract) } } + @ExperimentalTransmissionApi override suspend fun CommunicationScope.pauseOn( - vararg contract: Contract.Checkpoint, - resumeBlock: suspend CommunicationScope.() -> Unit + vararg contract: Contract.Checkpoint.Default ) { with(requestDelegate.checkpointHandler) { - pauseOn(contract = contract, resumeBlock = resumeBlock) - } - } - - override suspend fun , A : Any> CommunicationScope.pauseOn( - contract: C, - resumeBlock: suspend CommunicationScope.(args: A) -> Unit - ) { - with(requestDelegate.checkpointHandler) { - pauseOn(contract, resumeBlock) + pauseOn(*contract) } } @ExperimentalTransmissionApi - override suspend fun , C2 : Contract.CheckpointWithArgs, A : Any, B : Any> CommunicationScope.pauseOn( - contract: C, - contract2: C2, - resumeBlock: suspend CommunicationScope.(A, B) -> Unit - ) { - with(requestDelegate.checkpointHandler) { - pauseOn(contract, contract2, resumeBlock) + override suspend fun , A : Any> CommunicationScope.pauseOn( + contract: C + ): A { + return with(requestDelegate.checkpointHandler) { + pauseOn(contract) } } - override suspend fun validate(contract: Contract.Checkpoint) { + override suspend fun validate(contract: Contract.Checkpoint.Default) { requestDelegate.checkpointHandler.validate(contract) } - override suspend fun , A : Any> validate( + override suspend fun , A : Any> validate( contract: C, args: A ) { diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt index 205b889..f349db9 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt @@ -1,7 +1,6 @@ package com.trendyol.transmission.transformer.checkpoint import com.trendyol.transmission.ExperimentalTransmissionApi -import com.trendyol.transmission.Transmission import com.trendyol.transmission.transformer.handler.CommunicationScope import com.trendyol.transmission.transformer.request.Contract @@ -11,65 +10,45 @@ interface CheckpointHandler { * Pauses the processing of a [Transmission.Signal] or [Transmission.Effect] using * the given [Contract.Checkpoint]. * @param contract Checkpoint to check for pause condition - * @param resumeBlock execution block that will run after [contract] is validated + * @return Unit after the [contract] is validated */ @ExperimentalTransmissionApi suspend fun CommunicationScope.pauseOn( - contract: Contract.Checkpoint, - resumeBlock: suspend CommunicationScope.() -> Unit + contract: Contract.Checkpoint.Default, ) /** * Pauses the processing of a [Transmission.Signal] or [Transmission.Effect] using * the given [Contract.Checkpoint]s. * @param contract Checkpoints to check for pause condition. - * @param resumeBlock execution block that will run after [contract] is validated - * @throws IllegalStateException when [Contract.Checkpoint]s have different frequency + * @return Unit after the [contract]s are validated * @throws IllegalStateException when no [Contract.Checkpoint] is supplied */ @ExperimentalTransmissionApi suspend fun CommunicationScope.pauseOn( - vararg contract: Contract.Checkpoint, - resumeBlock: suspend CommunicationScope.() -> Unit + vararg contract: Contract.Checkpoint.Default, ) /** * Pauses the processing of a [Transmission.Signal] or [Transmission.Effect] using * the given [Contract.CheckpointWithArgs]s. * @param contract Checkpoint with Args to check for pause condition. - * @param resumeBlock execution block that will run after [contract] is validated. It accepts - * the type of the [Contract.CheckpointWithArgs] as argument. + * @return the type of argument depicted in [contract] after the checkpoint is validated */ @ExperimentalTransmissionApi - suspend fun , A : Any> CommunicationScope.pauseOn( + suspend fun , A : Any> CommunicationScope.pauseOn( contract: C, - resumeBlock: suspend CommunicationScope.(args: A) -> Unit - ) - - /** - * Pauses the processing of a [Transmission.Signal] or [Transmission.Effect] using - * the given [Contract.CheckpointWithArgs]s. - * @param contract Checkpoint with Args to check for pause condition. - * @param contract2 Second Checkpoint with Args to check for pause condition. - * @param resumeBlock execution block that will run after both [contract] and [contract2] is - * validated. It accepts the type of the [Contract.CheckpointWithArgs] as arguments. - */ - @ExperimentalTransmissionApi - suspend fun , C2 : Contract.CheckpointWithArgs, A : Any, B : Any> CommunicationScope.pauseOn( - contract: C, - contract2: C2, - resumeBlock: suspend CommunicationScope.(A, B) -> Unit - ) + ): A /** * Validates the given [Contract.Checkpoint] and resumes the execution added with [pauseOn] */ @ExperimentalTransmissionApi - suspend fun validate(contract: Contract.Checkpoint) + suspend fun validate(contract: Contract.Checkpoint.Default) /** * Validates the given [Contract.CheckpointWithArgs] and resumes the execution added with [pauseOn] */ @ExperimentalTransmissionApi - suspend fun , A : Any> validate(contract: C, args: A) + suspend fun , A : Any> validate(contract: C, args: A) } diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointTracker.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointTracker.kt index 748532a..f542158 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointTracker.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointTracker.kt @@ -1,26 +1,43 @@ package com.trendyol.transmission.transformer.checkpoint +import com.trendyol.transmission.InternalTransmissionApi import com.trendyol.transmission.transformer.request.Contract import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap +@OptIn(InternalTransmissionApi::class) internal class CheckpointTracker { - private val tracker: ConcurrentMap> = + private val tracker: ConcurrentMap>> = ConcurrentHashMap() + private val contractTracker: ConcurrentMap = ConcurrentHashMap() - fun putOrCreate(contract: Contract, checkpointOwner: Contract.Identity, identifier: String) { + fun registerContract(contract: Contract.Checkpoint, identifier: String) { + contractTracker.put(contract, identifier) + } + + fun putOrCreate( + identifier: String, + validator: CheckpointValidator + ) { tracker - .putIfAbsent( - contract, - ArrayDeque().apply { - addLast(IdentifierBundle(checkpointOwner, identifier)) - }) - ?.addLast(IdentifierBundle(checkpointOwner, identifier)) + .putIfAbsent(identifier, ArrayDeque>().apply { + addLast(validator) + })?.addLast(validator) } - fun useIdentifier(contract: Contract): IdentifierBundle? { - return tracker[contract]?.removeFirstOrNull() + fun useValidator(contract: Contract.Checkpoint): CheckpointValidator? { + val identifier = contractTracker[contract] ?: return null + return tracker[identifier]?.firstOrNull() as? CheckpointValidator } -} -internal class IdentifierBundle(val barrierOwner: Contract.Identity, val value: String) + fun removeValidator(contract: Contract.Checkpoint) { + val identifier = contractTracker[contract] ?: return + val contractsToRemove = contractTracker.entries + .filter { it.value == identifier } + .map { it.key } + contractsToRemove.forEach { + contractTracker.remove(it) + } + tracker[identifier]?.removeFirstOrNull() + } +} diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointValidator.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointValidator.kt new file mode 100644 index 0000000..ccf5d23 --- /dev/null +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointValidator.kt @@ -0,0 +1,9 @@ +package com.trendyol.transmission.transformer.checkpoint + +import com.trendyol.transmission.InternalTransmissionApi +import com.trendyol.transmission.transformer.request.Contract + +@InternalTransmissionApi +interface CheckpointValidator { + suspend fun validate(contract: C, args: A): Boolean +} \ No newline at end of file diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/Frequency.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/Frequency.kt deleted file mode 100644 index c02d9ba..0000000 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/Frequency.kt +++ /dev/null @@ -1,19 +0,0 @@ -package com.trendyol.transmission.transformer.checkpoint - -import com.trendyol.transmission.transformer.request.Contract - -/** - * Indicator of how frequent [Contract.Checkpoint] and [Contract.CheckpointWithArgs] will be - * validated. - */ -sealed interface Frequency { - /** - * Validates the checkpoint only once - */ - data object Once : Frequency - - /** - * Validates the checkpoint each time execution encounters - */ - data object Continuous : Frequency -} diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/Contract.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/Contract.kt index 7aa40a4..bbddd8b 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/Contract.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/Contract.kt @@ -1,7 +1,6 @@ package com.trendyol.transmission.transformer.request import com.trendyol.transmission.Transmission -import com.trendyol.transmission.transformer.checkpoint.Frequency sealed interface Contract { @@ -28,13 +27,15 @@ sealed interface Contract { internal val key: String ) : Contract - class Checkpoint internal constructor( - internal val key: String, - internal val frequency: Frequency - ) : Contract - - class CheckpointWithArgs internal constructor( - internal val key: String, - internal val frequency: Frequency - ) : Contract + sealed class Checkpoint( + internal open val key: String, + ) : Contract { + class Default internal constructor( + internal override val key: String, + ) : Checkpoint(key) + + class WithArgs internal constructor( + internal override val key: String, + ) : Checkpoint(key) + } } diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/ContractExt.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/ContractExt.kt index b229401..594a09c 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/ContractExt.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/ContractExt.kt @@ -3,7 +3,6 @@ package com.trendyol.transmission.transformer.request import com.trendyol.transmission.ExperimentalTransmissionApi import com.trendyol.transmission.Transmission import com.trendyol.transmission.identifier.IdentifierGenerator -import com.trendyol.transmission.transformer.checkpoint.Frequency object Contracts @@ -42,19 +41,15 @@ fun Contracts.executionWithArgs(): Contract.ExecutionWithArgs { } @ExperimentalTransmissionApi -fun Contracts.checkpoint(frequency: Frequency = Frequency.Once): Contract.Checkpoint { - return Contract.Checkpoint( +fun Contracts.checkpoint(): Contract.Checkpoint.Default { + return Contract.Checkpoint.Default( key = IdentifierGenerator.generateIdentifier(), - frequency = frequency ) } @ExperimentalTransmissionApi -fun Contracts.checkpointWithArgs( - frequency: Frequency = Frequency.Once -): Contract.CheckpointWithArgs { - return Contract.CheckpointWithArgs( +fun Contracts.checkpointWithArgs(): Contract.Checkpoint.WithArgs { + return Contract.Checkpoint.WithArgs( key = IdentifierGenerator.generateIdentifier(), - frequency = frequency ) } diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/Query.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/Query.kt index b841da9..2d2a8b5 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/Query.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/Query.kt @@ -32,10 +32,4 @@ internal sealed interface Query { val args: A, ) : Query - class Checkpoint( - val sender: String, - val key: String, - val args: A, - val queryIdentifier: String, - ) : Query } diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt index 0b86e70..d50924f 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt @@ -1,23 +1,26 @@ package com.trendyol.transmission.transformer.request import com.trendyol.transmission.ExperimentalTransmissionApi +import com.trendyol.transmission.InternalTransmissionApi import com.trendyol.transmission.Transmission import com.trendyol.transmission.identifier.IdentifierGenerator import com.trendyol.transmission.router.createBroadcast import com.trendyol.transmission.transformer.checkpoint.CheckpointHandler import com.trendyol.transmission.transformer.checkpoint.CheckpointTracker -import com.trendyol.transmission.transformer.checkpoint.Frequency +import com.trendyol.transmission.transformer.checkpoint.CheckpointValidator import com.trendyol.transmission.transformer.handler.CommunicationScope import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.combine -import kotlinx.coroutines.flow.drop import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.first +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.ConcurrentMap +import kotlin.coroutines.resume +@OptIn(InternalTransmissionApi::class) @ExperimentalTransmissionApi internal class TransformerRequestDelegate( scope: CoroutineScope, @@ -28,249 +31,107 @@ internal class TransformerRequestDelegate( val outGoingQuery: Channel = Channel(capacity = Channel.BUFFERED) val resultBroadcast = scope.createBroadcast() - private val frequencyTracker: MutableSet = mutableSetOf() - private val arbitraryFrequencyTracker: MutableSet> = mutableSetOf() - private val frequencyWithArgsTracker: ConcurrentMap, Any> = - ConcurrentHashMap() - val checkpointHandler: CheckpointHandler by lazy { object : CheckpointHandler { - override suspend fun CommunicationScope.pauseOn( - contract: Contract.Checkpoint, - resumeBlock: suspend CommunicationScope.() -> Unit - ) { - when (contract.frequency) { - Frequency.Continuous -> { - val queryIdentifier = IdentifierGenerator.generateIdentifier() - checkpointTrackerProvider()?.putOrCreate( - contract = contract, - checkpointOwner = identity, - identifier = queryIdentifier - ) - resultBroadcast.output.filterIsInstance>() - .filter { it.resultIdentifier == queryIdentifier && it.key == contract.key } - .collect { - resumeBlock.invoke(this) + @ExperimentalTransmissionApi + override suspend fun CommunicationScope.pauseOn(contract: Contract.Checkpoint.Default) { + val queryIdentifier = IdentifierGenerator.generateIdentifier() + suspendCancellableCoroutine { continuation -> + val validator = + object : CheckpointValidator { + + override suspend fun validate( + contract: Contract.Checkpoint.Default, + args: Unit + ): Boolean { + continuation.resume(Unit) + return true } - } - - Frequency.Once -> { - if (frequencyTracker.contains(contract)) { - resumeBlock.invoke(this) - } else { - val queryIdentifier = IdentifierGenerator.generateIdentifier() - checkpointTrackerProvider()?.putOrCreate( - contract = contract, - checkpointOwner = identity, - identifier = queryIdentifier - ) - frequencyTracker.add(contract) - resultBroadcast.output.filterIsInstance>() - .filter { it.resultIdentifier == queryIdentifier && it.key == contract.key } - .collect { - resumeBlock.invoke(this) - } } + checkpointTrackerProvider()?.run { + registerContract(contract, queryIdentifier) + putOrCreate(queryIdentifier, validator) } } } @ExperimentalTransmissionApi override suspend fun CommunicationScope.pauseOn( - vararg contract: Contract.Checkpoint, - resumeBlock: suspend CommunicationScope.() -> Unit + vararg contract: Contract.Checkpoint.Default ) { val contractList = contract.toList() - check(contractList.distinctBy { it.frequency }.size == 1) { - "All Checkpoints should have the same frequency" - } check(contractList.isNotEmpty()) { "At least one checkpoint should be provided" } check(contractList.toSet().size == contractList.size) { "All Checkpoint Contracts should be unique" } - when (contractList.first().frequency) { - Frequency.Continuous -> { - val queryIdentifier = IdentifierGenerator.generateIdentifier() - contractList.forEach { internalContract -> - checkpointTrackerProvider()?.putOrCreate( - contract = internalContract, - checkpointOwner = identity, - identifier = queryIdentifier - ) - } - resultBroadcast.output.filterIsInstance>() - .filter { it.resultIdentifier == queryIdentifier } - .drop(contractList.size.dec()) - .collect { - resumeBlock.invoke(this) - } - - } - - Frequency.Once -> { - if (arbitraryFrequencyTracker.contains(contractList.toSet())) { - resumeBlock.invoke(this) - } else { - val queryIdentifier = IdentifierGenerator.generateIdentifier() - contractList.forEach { internalContract -> - checkpointTrackerProvider()?.putOrCreate( - contract = internalContract, - checkpointOwner = identity, - identifier = queryIdentifier - ) - } - arbitraryFrequencyTracker.add(contractList.toSet()) - resultBroadcast.output.filterIsInstance>() - .filter { it.resultIdentifier == queryIdentifier } - .drop(contractList.size.dec()) - .collect { - resumeBlock.invoke(this) - } - } - } - } - - } - - override suspend fun , A : Any> CommunicationScope.pauseOn( - contract: C, - resumeBlock: suspend CommunicationScope.(args: A) -> Unit - ) { - when (contract.frequency) { - Frequency.Continuous -> { - val queryIdentifier = IdentifierGenerator.generateIdentifier() - checkpointTrackerProvider()?.putOrCreate( - contract = contract, - checkpointOwner = identity, - identifier = queryIdentifier - ) - resultBroadcast.output.filterIsInstance>() - .filter { it.resultIdentifier == queryIdentifier && it.key == contract.key } - .collect { - resumeBlock.invoke(this, it.data) + val queryIdentifier = IdentifierGenerator.generateIdentifier() + suspendCancellableCoroutine { continuation -> + val validator = + object : CheckpointValidator { + private val lock = Mutex() + private val contractMap = + ConcurrentHashMap() + .apply { putAll(contractList.map { it to false }) } + + override suspend fun validate( + contract: Contract.Checkpoint.Default, + args: Unit + ): Boolean { + lock.withLock { contractMap.put(contract, true) } + if (contractMap.values.all { it }) { + continuation.resume(Unit) + return true + } else return false } - } - - Frequency.Once -> { - if (frequencyWithArgsTracker.containsKey(contract)) { - @Suppress("UNCHECKED_CAST") - resumeBlock.invoke(this, frequencyWithArgsTracker[contract] as A) - } else { - val queryIdentifier = IdentifierGenerator.generateIdentifier() - checkpointTrackerProvider()?.putOrCreate( - contract = contract, - checkpointOwner = identity, - identifier = queryIdentifier - ) - resultBroadcast.output.filterIsInstance>() - .filter { it.resultIdentifier == queryIdentifier && it.key == contract.key } - .collect { - frequencyWithArgsTracker[contract] = it.data - resumeBlock.invoke(this, it.data) - } } + checkpointTrackerProvider()?.run { + contractList.forEach { registerContract(it, queryIdentifier) } + putOrCreate(queryIdentifier, validator) } } } @ExperimentalTransmissionApi - override suspend fun , C2 : Contract.CheckpointWithArgs, A : Any, B : Any> CommunicationScope.pauseOn( - contract: C, - contract2: C2, - resumeBlock: suspend CommunicationScope.(A, B) -> Unit - ) { - val contractList = listOf(contract, contract2) - check(contractList.distinctBy { it.frequency }.size == 1) { - "All Checkpoint should have the same frequency" - } - check(contractList.toSet().size == contractList.size) { - "All Checkpoint Contracts should be unique" - } - when (contract.frequency) { - Frequency.Continuous -> { - val queryIdentifier = IdentifierGenerator.generateIdentifier() - listOf(contract, contract2).forEach { - checkpointTrackerProvider()?.putOrCreate( - contract = it, - checkpointOwner = identity, - identifier = queryIdentifier - ) - } - val flow1 = - resultBroadcast.output.filterIsInstance>() - .filter { it.resultIdentifier == queryIdentifier && it.key == contract.key } - val flow2 = - resultBroadcast.output.filterIsInstance>() - .filter { it.resultIdentifier == queryIdentifier && it.key == contract.key } - combine(flow1, flow2) { checkpoint1, checkpoint2 -> - resumeBlock.invoke(this, checkpoint1.data, checkpoint2.data) + override suspend fun , A : Any> CommunicationScope.pauseOn( + contract: C + ): A { + val queryIdentifier = IdentifierGenerator.generateIdentifier() + return suspendCancellableCoroutine { continuation -> + val validator = object : CheckpointValidator { + override suspend fun validate(contract: C, args: A): Boolean { + continuation.resume(args) + return true } } - - Frequency.Once -> { - if (frequencyWithArgsTracker.containsKey(contract) && frequencyWithArgsTracker.containsKey( - contract2 - ) - ) { - @Suppress("UNCHECKED_CAST") - resumeBlock.invoke( - this, - frequencyWithArgsTracker[contract] as A, - frequencyWithArgsTracker[contract2] as B - ) - } else { - val queryIdentifier = IdentifierGenerator.generateIdentifier() - listOf(contract, contract2).forEach { - checkpointTrackerProvider()?.putOrCreate( - contract = it, - checkpointOwner = identity, - identifier = queryIdentifier - ) - } - val flow1 = - resultBroadcast.output.filterIsInstance>() - .filter { it.resultIdentifier == queryIdentifier && it.key == contract.key } - val flow2 = - resultBroadcast.output.filterIsInstance>() - .filter { it.resultIdentifier == queryIdentifier && it.key == contract.key } - combine(flow1, flow2) { checkpoint1, checkpoint2 -> - frequencyWithArgsTracker[contract] = checkpoint1.data - frequencyWithArgsTracker[contract2] = checkpoint2.data - resumeBlock.invoke(this, checkpoint1.data, checkpoint2.data) - } - } + checkpointTrackerProvider()?.run { + registerContract(contract, queryIdentifier) + putOrCreate(queryIdentifier, validator) } } } - override suspend fun validate(contract: Contract.Checkpoint) { - val identifier = checkpointTrackerProvider()?.useIdentifier(contract) ?: return - outGoingQuery.send( - Query.Checkpoint( - sender = identifier.barrierOwner.key, - key = contract.key, - args = Unit, - queryIdentifier = identifier.value - ) - ) + override suspend fun validate(contract: Contract.Checkpoint.Default) { + val validator = checkpointTrackerProvider() + ?.useValidator(contract) + if (validator?.validate(contract, Unit) == true) { + checkpointTrackerProvider() + ?.removeValidator(contract) + } } - override suspend fun , A : Any> validate( + override suspend fun , A : Any> validate( contract: C, args: A ) { - val identifier = checkpointTrackerProvider()?.useIdentifier(contract) ?: return - outGoingQuery.send( - Query.Checkpoint( - sender = identifier.barrierOwner.key, - key = contract.key, - args = args, - queryIdentifier = identifier.value - ) - ) + val validator = checkpointTrackerProvider() + ?.useValidator(contract) + if (validator?.validate(contract, args) == true) { + checkpointTrackerProvider() + ?.removeValidator(contract) + } } } }