From cbc0f1cf643ed1857efae4a832f87012956f3574 Mon Sep 17 00:00:00 2001 From: Sandeep Nishad Date: Mon, 7 Aug 2023 01:20:51 +0530 Subject: [PATCH] feat(weaver-corda): support array of remote views, consequent user flow call Signed-off-by: Sandeep Nishad --- .../src/main/kotlin/CordaDriver.kt | 12 +- .../imodule/corda/states/ExternalState.kt | 9 + .../corda/flows/WriteExternalStateFlows.kt | 289 +++++++++++------- .../client/AssetTransferManager.kt | 16 +- .../client/InteropManager.kt | 49 +-- .../contracts-kotlin/build.gradle | 2 + .../com/example/contract/SimpleContract.kt | 9 + .../kotlin/com/example/flow/SimpleFlow.kt | 122 ++++++++ weaver/sdks/corda/github.properties.rm | 3 + .../weaver/sdk/corda/InteroperableHelper.kt | 166 +++++++--- weaver/tests/network-setups/corda/makefile | 3 + 11 files changed, 487 insertions(+), 193 deletions(-) create mode 100644 weaver/sdks/corda/github.properties.rm diff --git a/weaver/core/drivers/corda-driver/src/main/kotlin/CordaDriver.kt b/weaver/core/drivers/corda-driver/src/main/kotlin/CordaDriver.kt index eaaa2b3f8f4..7acb8386746 100644 --- a/weaver/core/drivers/corda-driver/src/main/kotlin/CordaDriver.kt +++ b/weaver/core/drivers/corda-driver/src/main/kotlin/CordaDriver.kt @@ -18,6 +18,7 @@ import java.util.* import org.hyperledger.cacti.weaver.imodule.corda.flows.HandleExternalRequest import org.hyperledger.cacti.weaver.sdk.corda.InteroperableHelper +import org.hyperledger.cacti.weaver.sdk.corda.RelayOptions import org.hyperledger.cacti.weaver.protos.common.query.QueryOuterClass import org.hyperledger.cacti.weaver.protos.common.state.State import org.hyperledger.cacti.weaver.protos.corda.ViewDataOuterClass @@ -143,13 +144,16 @@ fun createAggregatedCordaView(views: List) : Either // TODO: if the first relay address fails, retry with other relay addresses in the list. + val relayOptions = RelayOptions( + useTlsForRelay = System.getenv("RELAY_TLS")?.toBoolean() ?: false, + relayTlsTrustStorePath = System.getenv("RELAY_TLSCA_TRUST_STORE")?.toString() ?: "", + relayTlsTrustStorePassword = System.getenv("RELAY_TLSCA_TRUST_STORE_PASSWORD")?.toString() ?: "", + tlsCACertPathsForRelay = System.getenv("RELAY_TLSCA_CERT_PATHS")?.toString() ?: "" + ) val channel = InteroperableHelper.getChannelToRelay( relayAddresses[0].host, relayAddresses[0].port, - System.getenv("RELAY_TLS")?.toBoolean() ?: false, - System.getenv("RELAY_TLSCA_TRUST_STORE")?.toString() ?: "", - System.getenv("RELAY_TLSCA_TRUST_STORE_PASSWORD")?.toString() ?: "", - System.getenv("RELAY_TLSCA_CERT_PATHS")?.toString() ?: "") + relayOptions) GrpcClient(channel) } } catch (e: Exception) { diff --git a/weaver/core/network/corda-interop-app/interop-contracts/src/main/kotlin/org/hyperledger/cacti/weaver/imodule/corda/states/ExternalState.kt b/weaver/core/network/corda-interop-app/interop-contracts/src/main/kotlin/org/hyperledger/cacti/weaver/imodule/corda/states/ExternalState.kt index 53423445ad3..acfacc24e78 100644 --- a/weaver/core/network/corda-interop-app/interop-contracts/src/main/kotlin/org/hyperledger/cacti/weaver/imodule/corda/states/ExternalState.kt +++ b/weaver/core/network/corda-interop-app/interop-contracts/src/main/kotlin/org/hyperledger/cacti/weaver/imodule/corda/states/ExternalState.kt @@ -11,6 +11,7 @@ import net.corda.core.contracts.BelongsToContract import net.corda.core.contracts.LinearState import net.corda.core.contracts.UniqueIdentifier import net.corda.core.identity.Party +import net.corda.core.serialization.CordaSerializable /** * A representation of state and proof retrieved from an external network. @@ -27,3 +28,11 @@ data class ExternalState( override val linearId: UniqueIdentifier = UniqueIdentifier(), override val participants: List = listOf() ) : LinearState + +@CordaSerializable +data class InvocationSpec( + val disableInvocation: Boolean = true, + val invokeFlowName: String = "", + val invokeFlowArgs: List = listOf(), + val interopArgsIndex: Int = -1 +) diff --git a/weaver/core/network/corda-interop-app/interop-workflows/src/main/kotlin/org/hyperledger/cacti/weaver/imodule/corda/flows/WriteExternalStateFlows.kt b/weaver/core/network/corda-interop-app/interop-workflows/src/main/kotlin/org/hyperledger/cacti/weaver/imodule/corda/flows/WriteExternalStateFlows.kt index 34dc878591f..adfab0bcdff 100644 --- a/weaver/core/network/corda-interop-app/interop-workflows/src/main/kotlin/org/hyperledger/cacti/weaver/imodule/corda/flows/WriteExternalStateFlows.kt +++ b/weaver/core/network/corda-interop-app/interop-workflows/src/main/kotlin/org/hyperledger/cacti/weaver/imodule/corda/flows/WriteExternalStateFlows.kt @@ -34,6 +34,7 @@ import org.hyperledger.cacti.weaver.protos.corda.ViewDataOuterClass import org.hyperledger.cacti.weaver.protos.common.interop_payload.InteropPayloadOuterClass import org.hyperledger.cacti.weaver.imodule.corda.contracts.ExternalStateContract import org.hyperledger.cacti.weaver.imodule.corda.states.ExternalState +import org.hyperledger.cacti.weaver.imodule.corda.states.InvocationSpec /** @@ -49,10 +50,11 @@ import org.hyperledger.cacti.weaver.imodule.corda.states.ExternalState class WriteExternalStateInitiator @JvmOverloads constructor( - val viewBase64String: String, - val address: String, + val views64: Array, + val addresses: Array, + val invokeObject: InvocationSpec = InvocationSpec(), val participants: List = listOf() - ): FlowLogic>() { + ): FlowLogic>() { /** * The call() method captures the logic to perform the proof validation and the construction of @@ -61,51 +63,82 @@ class WriteExternalStateInitiator * @return Returns the linearId of the newly created [ExternalState]. */ @Suspendable - override fun call(): Either = try { - println("External network returned view: $viewBase64String\n") + override fun call(): Either { + try { + var externalStatesLinearIdArray = Array(addresses.size) { UniqueIdentifier() } + for (i in 0..addresses.size-1) { + val viewBase64String = views64[i] + val address = addresses[i] + + println("External network returned view #${i}: $viewBase64String\n") + + val view = State.View.parseFrom(Base64.getDecoder().decode(viewBase64String)) - val view = State.View.parseFrom(Base64.getDecoder().decode(viewBase64String)) + // 1. Verify the proofs that are returned + val verifyResult = verifyView(view, address, serviceHub).fold({ + println("View verification failed with error: ${it.message}") + Left(Error("View verification failed with error: ${it.message}")) + }, { + println("View verification successful. Creating state to be stored in the vault.") + // 2. Create the state to be stored + var externalStateParticipants = if (participants.contains(ourIdentity)) { participants } else { listOf(ourIdentity) + participants } + val state = ExternalState( + linearId = UniqueIdentifier(), + participants = externalStateParticipants, + meta = view.meta.toByteArray(), + state = view.data.toByteArray()) + println("Storing ExternalState in the vault:\n\tLinear Id = ${state.linearId}\n\tParticipants = ${state.participants}\n\tMeta = ${view.meta}\tState = ${Base64.getEncoder().encodeToString(state.state)}\n") - // 1. Verify the proofs that are returned - verifyView(view, address, serviceHub).flatMap { - println("View verification successful. Creating state to be stored in the vault.") - // 2. Create the state to be stored - var externalStateParticipants = if (participants.contains(ourIdentity)) { participants } else { listOf(ourIdentity) + participants } - val state = ExternalState( - linearId = UniqueIdentifier(), - participants = externalStateParticipants, - meta = view.meta.toByteArray(), - state = view.data.toByteArray()) - println("Storing ExternalState in the vault:\n\tLinear Id = ${state.linearId}\n\tParticipants = ${state.participants}\n\tMeta = ${view.meta}\tState = ${Base64.getEncoder().encodeToString(state.state)}\n") + // 3. Build the transaction + val notary = serviceHub.networkMapCache.notaryIdentities.first() + val command = Command(ExternalStateContract.Commands.Create(), ourIdentity.owningKey) + val txBuilder = TransactionBuilder(notary) + .addOutputState(state, ExternalStateContract.ID) + .addCommand(command) - // 3. Build the transaction - val notary = serviceHub.networkMapCache.notaryIdentities.first() - val command = Command(ExternalStateContract.Commands.Create(), ourIdentity.owningKey) - val txBuilder = TransactionBuilder(notary) - .addOutputState(state, ExternalStateContract.ID) - .addCommand(command) + // 4. Verify and collect signatures on the transaction + txBuilder.verify(serviceHub) + val tx = serviceHub.signInitialTransaction(txBuilder) + var sessions = listOf() + for (party in externalStateParticipants) { + if (!ourIdentity.equals(party)) { + val session = initiateFlow(party) + session.send(address) + sessions += session + } + } + + val stx = subFlow(CollectSignaturesFlow(tx, sessions)) + subFlow(FinalityFlow(stx, sessions)) - // 4. Verify and collect signatures on the transaction - txBuilder.verify(serviceHub) - val tx = serviceHub.signInitialTransaction(txBuilder) - var sessions = listOf() - for (party in externalStateParticipants) { - if (!ourIdentity.equals(party)) { - val session = initiateFlow(party) - session.send(address) - sessions += session + // 5. Return the linearId of the state + println("State stored successfully.\n") + externalStatesLinearIdArray[i] = state.linearId + Right(Unit) + }) + if (verifyResult.isLeft()) { + return verifyResult } } - - val stx = subFlow(CollectSignaturesFlow(tx, sessions)) - subFlow(FinalityFlow(stx, sessions)) - - // 5. Return the linearId of the state - println("State stored successfully.\n") - Right(state.linearId) + if (invokeObject.disableInvocation) { + println("Invocation disabled!") + return Right(externalStatesLinearIdArray) + } else { + val argsMList = invokeObject.invokeFlowArgs.toMutableList() + argsMList[invokeObject.interopArgsIndex] = externalStatesLinearIdArray + println("Calling Workflow: ${invokeObject.invokeFlowName} with args: ${argsMList}") + val userFlow = resolveGenericFlow(invokeObject.invokeFlowName, argsMList.toList()) + return userFlow.fold({ + println("Error in resolving user flow: ${it.message}") + Left(Error("Error in resolving user flow: ${it.message}")) + }, { + Right(subFlow(it)) + }) + } + } catch (e: Exception) { + println("Error in WriteExternalState: ${e.message}") + return Left(Error("Error in WriteExternalState: ${e.message}")) } - } catch (e: Exception) { - Left(Error("Failed to store state in ledger: ${e.message}")) } } @@ -169,80 +202,7 @@ class GetExternalStateByLinearId( println("Error: Could not find external state with linearId $linearId") throw IllegalArgumentException("Error: Could not find external state with linearId $linearId") } else { - val viewMetaByteArray = states.first().state.data.meta - val viewDataByteArray = states.first().state.data.state - val meta = State.Meta.parseFrom(viewMetaByteArray) - - when (meta.protocol) { - State.Meta.Protocol.CORDA -> { - val cordaViewData = ViewDataOuterClass.ViewData.parseFrom(viewDataByteArray) - println("cordaViewData: $cordaViewData") - val interopPayload = InteropPayloadOuterClass.InteropPayload.parseFrom(cordaViewData.notarizedPayloadsList[0].payload) - val payloadString = interopPayload.payload.toStringUtf8() - println("response from remote: ${payloadString}.\n") - println("query address: ${interopPayload.address}.\n") - val viewData = ViewDataOuterClass.ViewData.newBuilder() - .addAllNotarizedPayloads(cordaViewData.notarizedPayloadsList) - .build() - - return viewData.toByteArray() - } - State.Meta.Protocol.FABRIC -> { - val fabricViewData = ViewData.FabricView.parseFrom(viewDataByteArray) - println("fabricViewData: $fabricViewData") - // TODO: We assume here that the response payloads have been matched earlier, but perhaps we should match them here too - val chaincodeAction = ProposalPackage.ChaincodeAction.parseFrom(fabricViewData.endorsedProposalResponsesList[0].payload.extension) - val interopPayload = InteropPayloadOuterClass.InteropPayload.parseFrom(chaincodeAction.response.payload) - val payloadString = interopPayload.payload.toStringUtf8() - println("response from remote: ${payloadString}.\n") - println("query address: ${interopPayload.address}.\n") - - val securityDomain = interopPayload.address.split("/")[1] - val proofStringPrefix = "Verified Proof: Endorsed by members: [" - val proofStringSuffix = "] of security domain: $securityDomain" - var mspIdList = "" - fabricViewData.endorsedProposalResponsesList.map { endorsedProposalResponse -> - val endorsement = endorsedProposalResponse.endorsement - val mspId = Identities.SerializedIdentity.parseFrom(endorsement.endorser).mspid - if (mspIdList.isNotEmpty()) { - mspIdList += ", " - } - mspIdList += mspId - } - val proofMessage = proofStringPrefix + mspIdList + proofStringSuffix - println("Proof Message: ${proofMessage}.\n") - - var notarizationList: List = listOf() - - fabricViewData.endorsedProposalResponsesList.map { endorsedProposalResponse -> - val endorsement = endorsedProposalResponse.endorsement - val serializedIdentity = Identities.SerializedIdentity.parseFrom(endorsement.endorser) - val mspId = serializedIdentity.mspid - val certString = Base64.getEncoder().encodeToString(serializedIdentity.idBytes.toByteArray()) - val signature = Base64.getEncoder().encodeToString(endorsement.signature.toByteArray()) - - val notarization = ViewDataOuterClass.ViewData.NotarizedPayload.newBuilder() - .setCertificate(certString) - .setSignature(signature) - .setId(mspId) - .setPayload(chaincodeAction.response.payload) - .build() - notarizationList = notarizationList + notarization - } - - val viewData = ViewDataOuterClass.ViewData.newBuilder() - .addAllNotarizedPayloads(notarizationList) - .build() - - return viewData.toByteArray() - } - else -> { - println("GetExternalStateByLinearId Error: Unrecognized protocol.\n") - throw IllegalArgumentException("Error: Unrecognized protocol.") - } - } - - + return getViewFromExternalState(states.first().state.data) } } @@ -282,3 +242,102 @@ class GetExternalStateAndRefByLinearId( } } + +fun getViewFromExternalState(state: ExternalState): ByteArray { + val viewMetaByteArray = state.meta + val viewDataByteArray = state.state + val meta = State.Meta.parseFrom(viewMetaByteArray) + var viewData = when (meta.protocol) { + State.Meta.Protocol.CORDA -> { + val cordaViewData = ViewDataOuterClass.ViewData.parseFrom(viewDataByteArray) + println("cordaViewData: $cordaViewData") + val interopPayload = InteropPayloadOuterClass.InteropPayload.parseFrom(cordaViewData.notarizedPayloadsList[0].payload) + val payloadString = interopPayload.payload.toStringUtf8() + println("response from remote: ${payloadString}.\n") + println("query address: ${interopPayload.address}.\n") + ViewDataOuterClass.ViewData.newBuilder() + .addAllNotarizedPayloads(cordaViewData.notarizedPayloadsList) + .build() + } + State.Meta.Protocol.FABRIC -> { + val fabricViewData = ViewData.FabricView.parseFrom(viewDataByteArray) + println("fabricViewData: $fabricViewData") + // TODO: We assume here that the response payloads have been matched earlier, but perhaps we should match them here too + val chaincodeAction = ProposalPackage.ChaincodeAction.parseFrom(fabricViewData.endorsedProposalResponsesList[0].payload.extension) + val interopPayload = InteropPayloadOuterClass.InteropPayload.parseFrom(chaincodeAction.response.payload) + val payloadString = interopPayload.payload.toStringUtf8() + println("response from remote: ${payloadString}.\n") + println("query address: ${interopPayload.address}.\n") + + val securityDomain = interopPayload.address.split("/")[1] + val proofStringPrefix = "Verified Proof: Endorsed by members: [" + val proofStringSuffix = "] of security domain: $securityDomain" + var mspIdList = "" + fabricViewData.endorsedProposalResponsesList.map { endorsedProposalResponse -> + val endorsement = endorsedProposalResponse.endorsement + val mspId = Identities.SerializedIdentity.parseFrom(endorsement.endorser).mspid + if (mspIdList.isNotEmpty()) { + mspIdList += ", " + } + mspIdList += mspId + } + val proofMessage = proofStringPrefix + mspIdList + proofStringSuffix + println("Proof Message: ${proofMessage}.\n") + + var notarizationList: List = listOf() + + fabricViewData.endorsedProposalResponsesList.map { endorsedProposalResponse -> + val endorsement = endorsedProposalResponse.endorsement + val serializedIdentity = Identities.SerializedIdentity.parseFrom(endorsement.endorser) + val mspId = serializedIdentity.mspid + val certString = Base64.getEncoder().encodeToString(serializedIdentity.idBytes.toByteArray()) + val signature = Base64.getEncoder().encodeToString(endorsement.signature.toByteArray()) + + val notarization = ViewDataOuterClass.ViewData.NotarizedPayload.newBuilder() + .setCertificate(certString) + .setSignature(signature) + .setId(mspId) + .setPayload(chaincodeAction.response.payload) + .build() + notarizationList = notarizationList + notarization + } + + ViewDataOuterClass.ViewData.newBuilder() + .addAllNotarizedPayloads(notarizationList) + .build() + } + else -> { + println("GetExternalStateByLinearId Error: Unrecognized protocol.\n") + throw IllegalArgumentException("Error: Unrecognized protocol.") + } + } + return viewData.toByteArray() +} + +fun getPaylaodFromView(viewBytes: ByteArray): ByteArray { + val externalStateView = ViewDataOuterClass.ViewData.parseFrom(viewBytes) + val interopPayload = InteropPayloadOuterClass.InteropPayload.parseFrom(externalStateView.notarizedPayloadsList[0].payload) + return interopPayload.payload.toByteArray() +} + +@Suppress("UNCHECKED_CAST") +fun resolveGenericFlow(flowName: String, flowArgs: List): Either> = try { + println("Attempting to resolve $flowName to a Corda flow.") + val kotlinClass = Class.forName(flowName).kotlin + val ctor = kotlinClass.constructors.first() + if (ctor.parameters.size != flowArgs.size) { + println("Flow Resolution Error: wrong number of arguments supplied.\n") + Left(Error("Flow Resolution Error: wrong number of arguments supplied")) + } else { + println("Resolved flow to ${ctor}") + try { + Right(ctor.call(*flowArgs.toTypedArray()) as FlowLogic) + } catch (e: Exception) { + println("Flow Resolution Error: $flowName not a flow: ${e.message}.\n") + Left(Error("Flow Resolution Error: $flowName not a flow")) + } + } +} catch (e: Exception) { + println("Flow Resolution Error: ${e.message} \n") + Left(Error("Flow Resolution Error: ${e.message}")) +} diff --git a/weaver/samples/corda/corda-simple-application/clients/src/main/kotlin/com/cordaSimpleApplication/client/AssetTransferManager.kt b/weaver/samples/corda/corda-simple-application/clients/src/main/kotlin/com/cordaSimpleApplication/client/AssetTransferManager.kt index e69f0fd9481..b9a9f9f1958 100644 --- a/weaver/samples/corda/corda-simple-application/clients/src/main/kotlin/com/cordaSimpleApplication/client/AssetTransferManager.kt +++ b/weaver/samples/corda/corda-simple-application/clients/src/main/kotlin/com/cordaSimpleApplication/client/AssetTransferManager.kt @@ -26,6 +26,7 @@ import java.util.Base64 import net.corda.core.identity.CordaX500Name import org.hyperledger.cacti.weaver.sdk.corda.AssetTransferSDK import org.hyperledger.cacti.weaver.sdk.corda.InteroperableHelper +import org.hyperledger.cacti.weaver.sdk.corda.RelayOptions import com.cordaSimpleApplication.contract.AssetContract import com.cordaSimpleApplication.contract.BondAssetContract import java.util.Calendar @@ -613,16 +614,19 @@ fun requestStateFromRemoteNetwork( val networkName = System.getenv("NETWORK_NAME") ?: "Corda_Network" try { + val relayOptions = RelayOptions( + useTlsForRelay = config["RELAY_TLS"]!!.toBoolean(), + relayTlsTrustStorePath = config["RELAY_TLSCA_TRUST_STORE"]!!, + relayTlsTrustStorePassword = config["RELAY_TLSCA_TRUST_STORE_PASSWORD"]!!, + tlsCACertPathsForRelay = config["RELAY_TLSCA_CERT_PATHS"]!! + ) InteroperableHelper.interopFlow( proxy, + arrayOf(externalStateAddress), localRelayAddress, - externalStateAddress, networkName, - externalStateParticipants, - config["RELAY_TLS"]!!.toBoolean(), - config["RELAY_TLSCA_TRUST_STORE"]!!, - config["RELAY_TLSCA_TRUST_STORE_PASSWORD"]!!, - config["RELAY_TLSCA_CERT_PATHS"]!! + externalStateParticipants = externalStateParticipants, + relayOptions = relayOptions ).fold({ println("Error in Interop Flow: ${it.message}") exitProcess(1) diff --git a/weaver/samples/corda/corda-simple-application/clients/src/main/kotlin/com/cordaSimpleApplication/client/InteropManager.kt b/weaver/samples/corda/corda-simple-application/clients/src/main/kotlin/com/cordaSimpleApplication/client/InteropManager.kt index 3af05e9540e..8e311709932 100644 --- a/weaver/samples/corda/corda-simple-application/clients/src/main/kotlin/com/cordaSimpleApplication/client/InteropManager.kt +++ b/weaver/samples/corda/corda-simple-application/clients/src/main/kotlin/com/cordaSimpleApplication/client/InteropManager.kt @@ -12,17 +12,20 @@ import arrow.core.Right import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.core.requireObject import com.github.ajalt.clikt.parameters.arguments.argument +import com.github.ajalt.clikt.parameters.options.option import io.grpc.ManagedChannelBuilder import java.lang.Exception import kotlinx.coroutines.* import net.corda.core.messaging.startFlow import java.util.* import net.corda.core.identity.Party +import net.corda.core.transactions.SignedTransaction import com.cordaSimpleApplication.flow.CreateState import com.cordaSimpleApplication.state.SimpleState import org.hyperledger.cacti.weaver.sdk.corda.InteroperableHelper +import org.hyperledger.cacti.weaver.sdk.corda.RelayOptions /** * The CLI command used to trigger a request for state from an external network. @@ -39,6 +42,7 @@ import org.hyperledger.cacti.weaver.sdk.corda.InteroperableHelper */ class RequestStateCommand : CliktCommand(help = "Requests state from a foreign network. " + "Requires the port number for the local relay and remote relay name") { + val key: String? by option("-k", "--key", help="key to store the external state") val localRelayAddress: String by argument() val externalStateAddress: String by argument() val config by requireObject>() @@ -50,32 +54,39 @@ class RequestStateCommand : CliktCommand(help = "Requests state from a foreign n password = "test", rpcPort = config["CORDA_PORT"]!!.toInt()) try { + val relayOptions = RelayOptions( + useTlsForRelay = config["RELAY_TLS"]!!.toBoolean(), + relayTlsTrustStorePath = config["RELAY_TLSCA_TRUST_STORE"]!!, + relayTlsTrustStorePassword = config["RELAY_TLSCA_TRUST_STORE_PASSWORD"]!!, + tlsCACertPathsForRelay = config["RELAY_TLSCA_CERT_PATHS"]!! + ) + val k: String = if (key != null) { + key!! + } else { + "external_state" + } + val args: List = listOf(k, arrayOf()) InteroperableHelper.interopFlow( - rpc.proxy, - localRelayAddress, - externalStateAddress, + rpc.proxy, + arrayOf(externalStateAddress), + localRelayAddress, networkName, - listOf(), - config["RELAY_TLS"]!!.toBoolean(), - config["RELAY_TLSCA_TRUST_STORE"]!!, - config["RELAY_TLSCA_TRUST_STORE_PASSWORD"]!!, - config["RELAY_TLSCA_CERT_PATHS"]!! + false, + "com.cordaSimpleApplication.flow.CreateFromExternalState", + args, + 1, + externalStateParticipants = listOf(), + relayOptions = relayOptions ).fold({ println("Error in Interop Flow: ${it.message}") }, { - val linearId = it.toString() - println("Interop flow successful and external-state was stored with linearId $linearId.\n") - val value = InteroperableHelper.getExternalStatePayloadString( - rpc.proxy, - linearId - ) - val key = externalStateAddress.split(":").last() - val createdState = rpc.proxy.startFlow(::CreateState, key, value) - .returnValue.get().tx.outputStates.first() as SimpleState - println("Created simplestate: ${createdState}") + println("Successful response: ${it}") + val stx = it as SignedTransaction + val createdState = stx.tx.outputStates.first() as SimpleState + println("Created simplestate: ${createdState}") }) } catch (e: Exception) { - println("Error: ${e.toString()}") + println("Error in request state: ${e.toString()}") } finally { rpc.close() } diff --git a/weaver/samples/corda/corda-simple-application/contracts-kotlin/build.gradle b/weaver/samples/corda/corda-simple-application/contracts-kotlin/build.gradle index 34a99f6a321..b30a10ceff0 100644 --- a/weaver/samples/corda/corda-simple-application/contracts-kotlin/build.gradle +++ b/weaver/samples/corda/corda-simple-application/contracts-kotlin/build.gradle @@ -22,6 +22,8 @@ dependencies { cordaCompile "$corda_core_release_group:corda-core:$corda_core_release_version" testCompile "$corda_release_group:corda-node-driver:$corda_release_version" + + implementation(group: 'org.hyperledger.cacti.weaver.imodule.corda', name: 'interop-contracts', version: "$cacti_version") } publishing { diff --git a/weaver/samples/corda/corda-simple-application/contracts-kotlin/src/main/kotlin/com/example/contract/SimpleContract.kt b/weaver/samples/corda/corda-simple-application/contracts-kotlin/src/main/kotlin/com/example/contract/SimpleContract.kt index e0709c0182f..98a711548bf 100644 --- a/weaver/samples/corda/corda-simple-application/contracts-kotlin/src/main/kotlin/com/example/contract/SimpleContract.kt +++ b/weaver/samples/corda/corda-simple-application/contracts-kotlin/src/main/kotlin/com/example/contract/SimpleContract.kt @@ -13,6 +13,8 @@ import net.corda.core.contracts.requireSingleCommand import net.corda.core.contracts.requireThat import net.corda.core.transactions.LedgerTransaction +import org.hyperledger.cacti.weaver.imodule.corda.states.ExternalState + /** * SimpleContract defines the rules for managing [SimpleState]. * @@ -52,6 +54,12 @@ class SimpleContract : Contract { val out = tx.outputsOfType().single() "The participant must be the signer." using (command.signers.containsAll(out.participants.map { it.owningKey })) } + is Commands.CreateFromExternal -> requireThat { + "One external state as input should be consumed when issuing a SimpleState from ExternalState." using (tx.inputsOfType().size == 1) + "Only one output state should be created." using (tx.outputs.size == 1) + val out = tx.outputsOfType().single() + "The participant must be the signer." using (command.signers.containsAll(out.participants.map { it.owningKey })) + } is Commands.Update -> requireThat { "There should be one input state" using (tx.inputs.size == 1) "The input state should be of type SimpleState" using (tx.inputs[0].state.data is SimpleState) @@ -79,6 +87,7 @@ class SimpleContract : Contract { */ interface Commands : CommandData { class Create : Commands + class CreateFromExternal: Commands class Update : Commands class Delete : Commands } diff --git a/weaver/samples/corda/corda-simple-application/workflows-kotlin/src/main/kotlin/com/example/flow/SimpleFlow.kt b/weaver/samples/corda/corda-simple-application/workflows-kotlin/src/main/kotlin/com/example/flow/SimpleFlow.kt index 03f5805a3a5..d72940f06be 100644 --- a/weaver/samples/corda/corda-simple-application/workflows-kotlin/src/main/kotlin/com/example/flow/SimpleFlow.kt +++ b/weaver/samples/corda/corda-simple-application/workflows-kotlin/src/main/kotlin/com/example/flow/SimpleFlow.kt @@ -12,6 +12,7 @@ import com.cordaSimpleApplication.state.SimpleState import javassist.NotFoundException import net.corda.core.contracts.Command import net.corda.core.contracts.UniqueIdentifier +import net.corda.core.contracts.requireThat import net.corda.core.flows.* import net.corda.core.node.services.Vault import net.corda.core.node.services.queryBy @@ -20,8 +21,15 @@ import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.ProgressTracker.Step +import net.corda.core.serialization.CordaSerializable import java.util.Arrays.asList +import org.hyperledger.cacti.weaver.imodule.corda.states.ExternalState +import org.hyperledger.cacti.weaver.imodule.corda.contracts.ExternalStateContract +import org.hyperledger.cacti.weaver.imodule.corda.flows.GetExternalStateAndRefByLinearId +import org.hyperledger.cacti.weaver.imodule.corda.flows.getViewFromExternalState +import org.hyperledger.cacti.weaver.imodule.corda.flows.getPaylaodFromView + /** * The CreateState flow is used to create a new [SimpleState]. * @@ -89,6 +97,120 @@ class CreateState(val key: String, val value: String) : FlowLogic +) : FlowLogic() { + /** + * The progress tracker checkpoints each stage of the flow and outputs the specified messages when each + * checkpoint is reached in the code. See the 'progressTracker.currentStep' expressions within the call() function. + */ + companion object { + object GENERATING_TRANSACTION : Step("Generating transaction based on new simple state.") + object VERIFYING_TRANSACTION : Step("Verifying contract constraints.") + object SIGNING_TRANSACTION : Step("Signing transaction with our private key.") + object FINALISING_TRANSACTION : Step("Obtaining notary signature and recording transaction.") { + override fun childProgressTracker() = FinalityFlow.tracker() + } + fun tracker() = ProgressTracker( + GENERATING_TRANSACTION, + VERIFYING_TRANSACTION, + SIGNING_TRANSACTION, + FINALISING_TRANSACTION + ) + } + + override val progressTracker = tracker() + + /** + * The call() method captures the logic to build and sign a transaction that creates a [SimpleState]. + * + * @return returns the signed transaction. + */ + @Suspendable + override fun call(): SignedTransaction { + println("Creating state from External State...") + // Obtain a reference to the notary we want to use. + val notary = serviceHub.networkMapCache.notaryIdentities[0] + val externalStateAndRef = subFlow(GetExternalStateAndRefByLinearId(externalStateLinearIds[0].toString())) + val value = getPaylaodFromView(getViewFromExternalState(externalStateAndRef.state.data)).toString(Charsets.UTF_8) + + progressTracker.currentStep = GENERATING_TRANSACTION + // Generate an unsigned transaction. + val simpleState = SimpleState(key, value, serviceHub.myInfo.legalIdentities.first()) + println("Storing simple state in the ledger: $simpleState\n") + val txCommand = Command(SimpleContract.Commands.CreateFromExternal(), simpleState.participants.map { it.owningKey }) + val externalStateConsumeCommand = Command(ExternalStateContract.Commands.Consume(), + externalStateAndRef.state.data.participants.map { it.owningKey } + ) + val txBuilder = TransactionBuilder(notary) + .addInputState(externalStateAndRef) + .addOutputState(simpleState, SimpleContract.ID) + .addCommand(txCommand) + .addCommand(externalStateConsumeCommand) + + // Stage 2. + progressTracker.currentStep = VERIFYING_TRANSACTION + // Verify that the transaction is valid. + txBuilder.verify(serviceHub) + + // Stage 3. + progressTracker.currentStep = SIGNING_TRANSACTION + // Sign the transaction. + val signedTx = serviceHub.signInitialTransaction(txBuilder) + + // Stage 5. + progressTracker.currentStep = FINALISING_TRANSACTION + var sessions = listOf() + for (party in externalStateAndRef.state.data.participants) { + if (!ourIdentity.equals(party)) { + val session = initiateFlow(party) + sessions += session + } + } + + val sTx = subFlow(CollectSignaturesFlow(signedTx, sessions)) + + // Notarise and record the transaction in the party's vault. + return subFlow(FinalityFlow(sTx, sessions, FINALISING_TRANSACTION.childProgressTracker())) + } +} +@InitiatedBy(CreateFromExternalState::class) +class CreateFromExternalStateAcceptor(val session: FlowSession) : FlowLogic() { + @Suspendable + override fun call(): SignedTransaction { + val signTransactionFlow = object : SignTransactionFlow(session) { + override fun checkTransaction(stx: SignedTransaction) = requireThat { + val tx = stx.tx.toLedgerTransaction(serviceHub) + + val externalState = tx.inputsOfType().single() + val out = tx.outputsOfType().single() + + val externalValue = getPaylaodFromView(getViewFromExternalState(externalState)).toString(Charsets.UTF_8) + + "Value in SimpleState should match with external state" using (out.value == externalValue) + } + } + try { + val txId = subFlow(signTransactionFlow).id + println("${ourIdentity} signed transaction.") + return subFlow(ReceiveFinalityFlow(session, expectedTxId = txId)) + } catch (e: Exception) { + val errorMsg = "Error signing create from external state transaction: ${e.message}\n" + println(errorMsg) + throw Error(errorMsg) + } + } +} + /** * The UpdateState flow is used to update an existing [SimpleState]. * diff --git a/weaver/sdks/corda/github.properties.rm b/weaver/sdks/corda/github.properties.rm new file mode 100644 index 00000000000..bca79fefb1b --- /dev/null +++ b/weaver/sdks/corda/github.properties.rm @@ -0,0 +1,3 @@ +username=sandeep.nishad1@ibm.com +password=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 +url=https://maven.pkg.github.com/hyperledger/cacti \ No newline at end of file diff --git a/weaver/sdks/corda/src/main/kotlin/org/hyperledger/cacti/weaver/sdk/corda/InteroperableHelper.kt b/weaver/sdks/corda/src/main/kotlin/org/hyperledger/cacti/weaver/sdk/corda/InteroperableHelper.kt index b5f7b0b36cb..b3ebd91a0ac 100644 --- a/weaver/sdks/corda/src/main/kotlin/org/hyperledger/cacti/weaver/sdk/corda/InteroperableHelper.kt +++ b/weaver/sdks/corda/src/main/kotlin/org/hyperledger/cacti/weaver/sdk/corda/InteroperableHelper.kt @@ -29,14 +29,22 @@ import net.corda.core.messaging.startFlow import net.corda.core.messaging.CordaRPCOps import net.corda.core.identity.Party +import org.hyperledger.cacti.weaver.imodule.corda.states.InvocationSpec import org.hyperledger.cacti.weaver.imodule.corda.flows.CreateExternalRequest import org.hyperledger.cacti.weaver.imodule.corda.flows.WriteExternalStateInitiator import org.hyperledger.cacti.weaver.imodule.corda.flows.GetExternalStateByLinearId import org.hyperledger.cacti.weaver.protos.common.interop_payload.InteropPayloadOuterClass -import org.hyperledger.cacti.weaver.protos.common.state.State +import org.hyperledger.cacti.weaver.protos.common.state.State.RequestState import org.hyperledger.cacti.weaver.protos.corda.ViewDataOuterClass import org.hyperledger.cacti.weaver.protos.networks.networks.Networks +data class RelayOptions( + val useTlsForRelay: Boolean = false, + val relayTlsTrustStorePath: String = "", + val relayTlsTrustStorePassword: String = "", + val tlsCACertPathsForRelay: String = "" +) + class InteroperableHelper { companion object { private val logger = LoggerFactory.getLogger(InteroperableHelper::class.java) @@ -85,22 +93,19 @@ class InteroperableHelper { fun getChannelToRelay ( localRelayHost: String, localRelayPort: Int, - useTlsForRelay: Boolean, - relayTlsTrustStorePath: String, - relayTlsTrustStorePassword: String, - tlsCACertPathsForRelay: String + relayOptions: RelayOptions ): ManagedChannel { - if (useTlsForRelay) { + if (relayOptions.useTlsForRelay) { var trustStore: KeyStore = KeyStore.getInstance(KeyStore.getDefaultType()) - if (relayTlsTrustStorePath.length > 0) { - if (relayTlsTrustStorePassword.length == 0) { + if (relayOptions.relayTlsTrustStorePath.length > 0) { + if (relayOptions.relayTlsTrustStorePassword.length == 0) { throw Exception("Password not supplied for JKS trust store") } - val trustStream = FileInputStream(relayTlsTrustStorePath) - trustStore.load(trustStream, relayTlsTrustStorePassword.toCharArray()) - } else if (tlsCACertPathsForRelay.length > 0) { + val trustStream = FileInputStream(relayOptions.relayTlsTrustStorePath) + trustStore.load(trustStream, relayOptions.relayTlsTrustStorePassword.toCharArray()) + } else if (relayOptions.tlsCACertPathsForRelay.length > 0) { trustStore.load(null, null) - val tlsCACertPaths = tlsCACertPathsForRelay.split(":") + val tlsCACertPaths = relayOptions.tlsCACertPathsForRelay.split(":") var tlsCACertCounter = 0 for (tlsCACertPath in tlsCACertPaths) { val certFactory = CertificateFactory.getInstance("X509") @@ -146,15 +151,17 @@ class InteroperableHelper { @JvmStatic @JvmOverloads fun interopFlow ( proxy: CordaRPCOps, + externalStateAddresses: Array, localRelayEndpoint: String, - externalStateAddress: String, networkName: String, - externalStateParticipants: List = listOf() , - useTlsForRelay: Boolean = false, - relayTlsTrustStorePath: String = "", - relayTlsTrustStorePassword: String = "", - tlsCACertPathsForRelay: String = "" - ): Either { + returnWithoutLocalInvocation: Boolean = true, + invokeFlowName: String = "", + invokeFlowArgs: List = listOf(), + interopArgsIndex: Int = -1, + externalStateParticipants: List = listOf(), + relayOptions: RelayOptions = RelayOptions() + ): Either { + // Create Relay client instance val localRelayHost = localRelayEndpoint.split(":").first() val localRelayPort = localRelayEndpoint.split(":").last().toInt() var channel: ManagedChannel @@ -162,38 +169,76 @@ class InteroperableHelper { channel = getChannelToRelay( localRelayHost, localRelayPort, - useTlsForRelay, - relayTlsTrustStorePath, - relayTlsTrustStorePassword, - tlsCACertPathsForRelay) + relayOptions) } catch(e: Exception) { logger.error("Error creating channel to relay: ${e.message}\n") return Left(Error("Error creating channel to relay: ${e.message}\n")) } val client = RelayClient(channel) + var responseStates = Array(externalStateAddresses.size) { null } + var responseError: Error? = null + + // Fetch remote views + runBlocking { + coroutineScope { // limits the scope of concurrency + externalStateAddresses.mapIndexed { index, externalStateAddress -> + async { // async means "concurrently", context goes here + getRemoteView( + proxy, + networkName, + externalStateAddress, + client + ).fold({ + logger.error("Error in Interop Flow for address ${externalStateAddress}: ${it.message}\n") + responseError = Error("Error in Interop Flow for address ${externalStateAddress}: ${it.message}\n") + },{ state -> + responseStates[index] = state + }) + } + }.awaitAll() // waits all of them + } + } + + if (responseError is Error){ + return Left(responseError!!) + } + + // Write external state + val views64 = responseStates.mapNotNull { + Base64.getEncoder().encodeToString(it!!.view.toByteArray()) + }.toTypedArray() + + return writeExternalStateToVault( + proxy, + views64, + externalStateAddresses, + externalStateParticipants, + returnWithoutLocalInvocation, + invokeFlowName, + invokeFlowArgs, + interopArgsIndex + ) + } + + fun getRemoteView( + proxy: CordaRPCOps, + networkName: String, + externalStateAddress: String, + client: RelayClient + ): Either { val eitherErrorQuery = constructNetworkQuery(proxy, externalStateAddress, networkName) logger.debug("Corda network returned: $eitherErrorQuery \n") - val result = eitherErrorQuery.fold({ + return eitherErrorQuery.fold({ logger.error("error ${it}") Left(it) }, { networkQuery -> logger.debug("Network query: $networkQuery") var eitherErrorResult = runBlocking { val ack = async { client.requestState(networkQuery) }.await() - pollForState(ack.requestId, client).fold({ - logger.error("Error in Interop Flow: ${it.message}\n") - Left(Error("Error in Interop Flow: ${it.message}\n")) - }, { state -> - writeExternalStateToVault( - proxy, - state, - externalStateAddress, - externalStateParticipants) - }) + pollForState(ack.requestId, client) } eitherErrorResult }) - return result } /** @@ -337,7 +382,7 @@ class InteroperableHelper { * Will have status "PENDING", "PENDING_ACK", "COMPLETED" or "ERROR". * @return Returns the request state when it has status "COMPLETED" or "ERROR". */ - suspend fun pollForState(requestId: String, client: RelayClient, retryCount: Int = 0): Either = coroutineScope { + suspend fun pollForState(requestId: String, client: RelayClient, retryCount: Int = 0): Either = coroutineScope { val timeout = 30 val delayTime = 500L val num = 30*1000/delayTime @@ -374,22 +419,45 @@ class InteroperableHelper { */ fun writeExternalStateToVault( proxy: CordaRPCOps, - requestState: State.RequestState, - address: String, - externalStateParticipants: List = listOf() - ): Either { + views64: Array, + addresses: Array, + externalStateParticipants: List = listOf(), + returnWithoutLocalInvocation: Boolean = true, + invokeFlowName: String = "", + invokeFlowArgs: List = listOf(), + interopArgsIndex: Int = -1 + ): Either { return try { logger.debug("Sending response to Corda for view verification.\n") - val stateId = runCatching { - val viewBase64String = Base64.getEncoder().encodeToString(requestState.view.toByteArray()) - proxy.startFlow(::WriteExternalStateInitiator, viewBase64String, address, externalStateParticipants) - .returnValue.get() - }.fold({ - it.map { linearId -> - logger.debug("Verification was successful and external-state was stored with linearId $linearId.\n") - linearId.toString() - } + val invokeObject = InvocationSpec( + disableInvocation = returnWithoutLocalInvocation, + invokeFlowName = invokeFlowName, + invokeFlowArgs = invokeFlowArgs, + interopArgsIndex = interopArgsIndex + ) + logger.debug("Invoke Object: ${invokeObject}") + val stateId: Either = runCatching { + proxy.startFlow(::WriteExternalStateInitiator, + views64, + addresses, + invokeObject, + externalStateParticipants + ).returnValue.get() + }.fold({ + it.fold({ + logger.error("WriteExternalStateInitiator flow error: ${it.message}\n") + Left(Error("WriteExternalStateInitiator flow error: ${it.message}\n")) + }, { result: Any -> + if (returnWithoutLocalInvocation) { + logger.debug("Verification was successful and external-state was stored with array of linearIds $result.\n") + Right(result.toString()) + } else { + logger.debug("Verification was successful and called flow: $invokeFlowName with result: $result.\n") + Right(result) + } + }) }, { + logger.error("Corda Network Error: Error running WriteExternalStateInitiator flow: ${it.message}\n") Left(Error("Corda Network Error: Error running WriteExternalStateInitiator flow: ${it.message}\n")) }) stateId diff --git a/weaver/tests/network-setups/corda/makefile b/weaver/tests/network-setups/corda/makefile index edfe9902cd3..2f241071770 100644 --- a/weaver/tests/network-setups/corda/makefile +++ b/weaver/tests/network-setups/corda/makefile @@ -60,6 +60,9 @@ restart-with-new-interop-app: stop cd ../../../core/network/corda-interop-app && ./gradlew clean jar ./scripts/get-cordapps.sh $(APP_NAME) local ./scripts/start-nodes.sh $(APP_NAME) $(PROFILE) + ./scripts/start-nodes.sh $(APP_NAME) $(PROFILE) Corda_Network2 + ./scripts/check-nodes-status.sh $(PROFILE) + ./scripts/check-nodes-status.sh $(PROFILE) Corda_Network2 .PHONY: stop-network1 stop-network1: