-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcontrolsMagix.kt
70 lines (61 loc) · 2.33 KB
/
controlsMagix.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package space.kscience.controls.client
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import space.kscience.controls.api.DeviceMessage
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.hubMessageFlow
import space.kscience.controls.manager.respondHubMessage
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.logger
import space.kscience.magix.api.*
internal val controlsMagixFormat: MagixFormat<DeviceMessage> = MagixFormat(
DeviceMessage.serializer(),
setOf("controls-kt")
)
/**
* A magix message format to work with Controls-kt data
*/
public val DeviceManager.Companion.magixFormat: MagixFormat<DeviceMessage> get() = controlsMagixFormat
internal fun generateId(request: MagixMessage): String = if (request.id != null) {
"${request.id}.response"
} else {
"controls[${request.payload.hashCode().toString(16)}"
}
/**
* Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1)
*
* Accepts messages with target that equals [endpointID] or null (broadcast messages)
*/
public fun DeviceManager.launchMagixService(
endpoint: MagixEndpoint,
endpointID: String = controlsMagixFormat.defaultFormat,
): Job = context.launch {
endpoint.subscribe(controlsMagixFormat, targetFilter = listOf(endpointID, null)).onEach { (request, payload) ->
val responsePayload = respondHubMessage(payload)
responsePayload.forEach {
endpoint.send(
format = controlsMagixFormat,
payload = it,
source = endpointID,
target = request.sourceEndpoint,
id = generateId(request),
parentId = request.id
)
}
}.catch { error ->
logger.error(error) { "Error while responding to message: ${error.message}" }
}.launchIn(this)
hubMessageFlow().onEach { payload ->
endpoint.send(
format = controlsMagixFormat,
payload = payload,
source = endpointID,
id = "df[${payload.hashCode()}]"
)
}.catch { error ->
logger.error(error) { "Error while sending a message: ${error.message}" }
}.launchIn(this)
}