Skip to content

Commit

Permalink
Merge pull request #27 from Trendyol/checkpoint-api-experimental
Browse files Browse the repository at this point in the history
Checkpoint API
  • Loading branch information
yigitozgumus authored Nov 14, 2024
2 parents eb104ae + 9b11282 commit d156b64
Show file tree
Hide file tree
Showing 21 changed files with 419 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
package com.trendyol.transmission.components.features.input

import androidx.compose.ui.graphics.Color
import com.trendyol.transmission.DefaultDispatcher
import com.trendyol.transmission.ExperimentalTransmissionApi
import com.trendyol.transmission.components.features.InputUiState
import com.trendyol.transmission.components.features.colorpicker.ColorPickerEffect
import com.trendyol.transmission.components.features.multioutput.multiOutputTransformerIdentity
import com.trendyol.transmission.transformer.Transformer
import com.trendyol.transmission.transformer.dataholder.dataHolder
import com.trendyol.transmission.transformer.handler.Handlers
import com.trendyol.transmission.transformer.handler.createHandlers
import com.trendyol.transmission.transformer.handler.onEffect
import com.trendyol.transmission.transformer.handler.onSignal
import com.trendyol.transmission.transformer.request.Contracts
import com.trendyol.transmission.transformer.request.checkpointWithArgs
import com.trendyol.transmission.transformer.request.computation
import com.trendyol.transmission.transformer.request.computation.Computations
import com.trendyol.transmission.transformer.request.computation.createComputations
import com.trendyol.transmission.transformer.request.computation.register
import com.trendyol.transmission.transformer.request.computationWithArgs
import com.trendyol.transmission.transformer.request.dataHolder
import com.trendyol.transmission.components.features.InputUiState
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.delay
import javax.inject.Inject
Expand All @@ -37,20 +41,29 @@ class InputTransformer @Inject constructor(
}
}

@OptIn(ExperimentalTransmissionApi::class)
override val handlers: Handlers = createHandlers {
onSignal<InputSignal.InputUpdate> { signal ->
holder.update { it.copy(writtenText = signal.value) }
val color = pauseOn(colorCheckpoint)
send(
effect = ColorPickerEffect.SelectedColorUpdate(color),
identity = multiOutputTransformerIdentity
)
publish(effect = InputEffect.InputUpdate(signal.value))
}
onEffect<ColorPickerEffect.BackgroundColorUpdate> { effect ->
validate(colorCheckpoint, effect.color)
holder.update { it.copy(backgroundColor = effect.color) }
}
}

@OptIn(ExperimentalTransmissionApi::class)
companion object {
val writtenInputWithArgs = Contracts.computationWithArgs<String, WrittenInput>()
val writtenInputContract = Contracts.computation<WrittenInput>()
val holderContract = Contracts.dataHolder<InputUiState>()
val colorCheckpoint = Contracts.checkpointWithArgs<Color>()
}
}

Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.trendyol.transmissiontest

import com.trendyol.transmission.Transmission
import com.trendyol.transmission.effect.EffectWrapper

interface TransformerTestScope {
val dataStream: List<Transmission.Data>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.trendyol.transmission

/**
* Marks internal declarations in Transmission. Internal declarations must not be used outside the library.
* There are no backward compatibility guarantees between different versions of Transmission.
*/
@RequiresOptIn(level = RequiresOptIn.Level.ERROR)
@Retention(AnnotationRetention.BINARY)
annotation class InternalTransmissionApi

/**
* Marks experimental API in Transmission. An experimental API can be changed or removed at any time.
*/
@RequiresOptIn(level = RequiresOptIn.Level.WARNING)
@Retention(AnnotationRetention.BINARY)
annotation class ExperimentalTransmissionApi
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.trendyol.transmission.effect
import com.trendyol.transmission.Transmission
import com.trendyol.transmission.transformer.request.Contract

data class EffectWrapper(
internal data class EffectWrapper(
val effect: Transmission.Effect,
val identity: Contract.Identity? = null
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package com.trendyol.transmission.identifier
import kotlin.uuid.ExperimentalUuidApi
import kotlin.uuid.Uuid

/**
* Generates identifier for internal communication.
*/
@OptIn(ExperimentalUuidApi::class)
internal object IdentifierGenerator {
fun generateIdentifier(): String {
return Uuid.random().toString()
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
@file:OptIn(ExperimentalUuidApi::class)

package com.trendyol.transmission.router

import com.trendyol.transmission.Transmission
Expand All @@ -20,7 +18,6 @@ import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch
import kotlin.uuid.ExperimentalUuidApi

internal class RequestDelegate(
private val queryScope: CoroutineScope,
Expand Down Expand Up @@ -49,7 +46,7 @@ internal class RequestDelegate(

// region process queries

private suspend fun processQuery(query: Query) = queryScope.launch {
private fun processQuery(query: Query) = queryScope.launch {
when (query) {
is Query.Computation -> processComputationQuery(query)
is Query.Data -> processDataQuery(query)
Expand All @@ -60,7 +57,7 @@ internal class RequestDelegate(
}

private fun processDataQuery(
query: Query.Data
query: Query.Data,
) = queryScope.launch {
val dataHolder = routerRef.transformerSet
.filter { it.storage.isHolderStateInitialized() }
Expand All @@ -79,7 +76,7 @@ internal class RequestDelegate(
}

private fun processComputationQuery(
query: Query.Computation
query: Query.Computation,
) = queryScope.launch {
val computationHolder = routerRef.transformerSet
.find { it.storage.hasComputation(query.key) }
Expand Down Expand Up @@ -110,7 +107,7 @@ internal class RequestDelegate(
}

private fun <A : Any> processComputationQueryWithArgs(
query: Query.ComputationWithArgs<A>
query: Query.ComputationWithArgs<A>,
) = queryScope.launch {
val computationHolder = routerRef.transformerSet
.find { it.storage.hasComputation(query.key) }
Expand Down Expand Up @@ -145,7 +142,7 @@ internal class RequestDelegate(
}

private fun processExecution(
query: Query.Execution
query: Query.Execution,
) = queryScope.launch {
val executionHolder = routerRef.transformerSet
.find { it.storage.hasExecution(query.key) } ?: return@launch
Expand Down Expand Up @@ -180,7 +177,7 @@ internal class RequestDelegate(
}

private fun testDataQuery(
query: Query.Data
query: Query.Data,
) = queryScope.launch {
val dataToSend = QueryResult.Data(
owner = query.sender,
Expand All @@ -194,7 +191,7 @@ internal class RequestDelegate(
}

private fun testComputationQuery(
query: Query.Computation
query: Query.Computation,
) = queryScope.launch {
val computationToSend = QueryResult.Computation(
owner = query.sender,
Expand All @@ -206,7 +203,7 @@ internal class RequestDelegate(
}

private fun <A : Any> testComputationQueryWithArgs(
query: Query.ComputationWithArgs<A>
query: Query.ComputationWithArgs<A>,
) = queryScope.launch {
val computationToSend = QueryResult.Computation(
owner = query.sender,
Expand Down Expand Up @@ -236,7 +233,7 @@ internal class RequestDelegate(

override suspend fun <C : Contract.Computation<D>, D : Any> compute(
contract: C,
invalidate: Boolean
invalidate: Boolean,
): D? {
val queryIdentifier = IdentifierGenerator.generateIdentifier()
outGoingQuery.send(
Expand All @@ -256,7 +253,7 @@ internal class RequestDelegate(
override suspend fun <C : Contract.ComputationWithArgs<A, D>, A : Any, D : Any> compute(
contract: C,
args: A,
invalidate: Boolean
invalidate: Boolean,
): D? {
val queryIdentifier = IdentifierGenerator.generateIdentifier()
outGoingQuery.send(
Expand Down Expand Up @@ -284,7 +281,7 @@ internal class RequestDelegate(

override suspend fun <C : Contract.ExecutionWithArgs<A>, A : Any> execute(
contract: C,
args: A
args: A,
) {
outGoingQuery.send(
Query.ExecutionWithArgs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.trendyol.transmission.effect.EffectWrapper
import com.trendyol.transmission.router.builder.TransmissionRouterBuilderScope
import com.trendyol.transmission.router.loader.TransformerSetLoader
import com.trendyol.transmission.transformer.Transformer
import com.trendyol.transmission.transformer.checkpoint.CheckpointTracker
import com.trendyol.transmission.transformer.request.Contract
import com.trendyol.transmission.transformer.request.RequestHandler
import kotlinx.coroutines.CoroutineDispatcher
Expand Down Expand Up @@ -42,6 +43,8 @@ class TransmissionRouter internal constructor(
private val dataBroadcast = routerScope.createBroadcast<Transmission.Data>()
private val effectBroadcast = routerScope.createBroadcast<EffectWrapper>()

private val checkpointTracker = CheckpointTracker()

val dataStream = dataBroadcast.output
val effectStream: SharedFlow<Transmission.Effect> = effectBroadcast.output.map { it.effect }
.shareIn(routerScope, SharingStarted.WhileSubscribed())
Expand All @@ -51,6 +54,7 @@ class TransmissionRouter internal constructor(
routerRef = this@TransmissionRouter,
registry = registryScope
)

val requestHelper: RequestHandler = _requestDelegate

init {
Expand Down Expand Up @@ -97,6 +101,7 @@ class TransmissionRouter internal constructor(
}
transformerSet.forEach { transformer ->
transformer.run {
bindCheckpointTracker(checkpointTracker)
startSignalCollection(incoming = signalBroadcast.output)
startDataPublishing(data = dataBroadcast.producer)
startEffectProcessing(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.trendyol.transmission.transformer

import com.trendyol.transmission.ExperimentalTransmissionApi
import com.trendyol.transmission.Transmission
import com.trendyol.transmission.effect.EffectWrapper
import com.trendyol.transmission.transformer.handler.CommunicationScope
import com.trendyol.transmission.transformer.request.Contract
import com.trendyol.transmission.transformer.request.TransformerRequestDelegate
import kotlinx.coroutines.channels.Channel

@OptIn(ExperimentalTransmissionApi::class)
internal class CommunicationScopeBuilder(
private val effectChannel: Channel<EffectWrapper>,
private val dataChannel: Channel<Transmission.Data>,
Expand All @@ -29,32 +31,70 @@ internal class CommunicationScopeBuilder(
}

override suspend fun <C : Contract.DataHolder<D>, D : Transmission.Data> getData(contract: C): D? {
return requestDelegate.interactor.getData(contract)
return requestDelegate.requestHandler.getData(contract)
}

override suspend fun <C : Contract.Computation<D>, D : Any> compute(
contract: C,
invalidate: Boolean
): D? {
return requestDelegate.interactor.compute(contract, invalidate)
return requestDelegate.requestHandler.compute(contract, invalidate)
}

override suspend fun <C : Contract.ComputationWithArgs<A, D>, A : Any, D : Any> compute(
contract: C,
args: A,
invalidate: Boolean
): D? {
return requestDelegate.interactor.compute(contract, args, invalidate)
return requestDelegate.requestHandler.compute(contract, args, invalidate)
}

override suspend fun execute(contract: Contract.Execution) {
requestDelegate.interactor.execute(contract)
requestDelegate.requestHandler.execute(contract)
}

override suspend fun <C : Contract.ExecutionWithArgs<A>, A : Any> execute(
contract: C,
args: A
) {
requestDelegate.interactor.execute(contract, args)
requestDelegate.requestHandler.execute(contract, args)
}

@ExperimentalTransmissionApi
override suspend fun CommunicationScope.pauseOn(
contract: Contract.Checkpoint.Default
) {
with(requestDelegate.checkpointHandler) {
pauseOn(contract)
}
}

@ExperimentalTransmissionApi
override suspend fun CommunicationScope.pauseOn(
vararg contract: Contract.Checkpoint.Default
) {
with(requestDelegate.checkpointHandler) {
pauseOn(*contract)
}
}

@ExperimentalTransmissionApi
override suspend fun <C : Contract.Checkpoint.WithArgs<A>, A : Any> CommunicationScope.pauseOn(
contract: C
): A {
return with(requestDelegate.checkpointHandler) {
pauseOn(contract)
}
}

override suspend fun validate(contract: Contract.Checkpoint.Default) {
requestDelegate.checkpointHandler.validate(contract)
}

override suspend fun <C : Contract.Checkpoint.WithArgs<A>, A : Any> validate(
contract: C,
args: A
) {
requestDelegate.checkpointHandler.validate(contract, args)
}
}
Loading

0 comments on commit d156b64

Please sign in to comment.