From 90c18cb9c1a0ecc09a6df273ce961f234a3c6153 Mon Sep 17 00:00:00 2001 From: ndr_brt Date: Tue, 14 May 2024 08:36:44 +0200 Subject: [PATCH] build: bump EDC to 0.6.4 (#259) * build: bump EDC to 0.6.4 * refactor participant --- .../resources/start-transfer.json | 4 +- .../edc/sample/runtime/CustomRuntime.java | 19 +- build.gradle.kts | 7 - gradle/libs.versions.toml | 3 +- .../policy/PolicyFunctionsExtension.java | 4 +- system-tests/build.gradle.kts | 1 + .../advanced/Advanced01openTelemetryTest.java | 4 +- .../transfer/Transfer02consumerPullTest.java | 2 +- .../transfer/Transfer03providerPushTest.java | 2 +- .../transfer/Transfer04eventConsumerTest.java | 2 +- .../transfer/streaming/Participant.java | 450 ------------------ .../streaming/Streaming01httpToHttpTest.java | 27 +- .../streaming/Streaming02KafkaToHttpTest.java | 26 +- .../Streaming03KafkaToKafkaTest.java | 26 +- .../streaming/StreamingParticipant.java | 119 +++++ .../edc/samples/util/TransferUtil.java | 2 +- .../http/HttpStreamingDataSourceFactory.java | 6 +- .../edc/samples/streaming/KafkaExtension.java | 2 +- .../KafkaToKafkaDataFlowController.java | 10 +- .../resources/start-transfer.json | 4 +- .../resources/start-transfer.json | 1 + transfer/transfer-04-event-consumer/README.md | 2 +- .../TransferProcessStartedListener.java | 4 +- ...ansferProcessStartedListenerExtension.java | 2 +- .../transfer/CloudTransferExtension.java | 14 +- 25 files changed, 203 insertions(+), 540 deletions(-) delete mode 100644 system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Participant.java create mode 100644 system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/StreamingParticipant.java diff --git a/advanced/advanced-01-open-telemetry/resources/start-transfer.json b/advanced/advanced-01-open-telemetry/resources/start-transfer.json index 171cc529..1dc97872 100644 --- a/advanced/advanced-01-open-telemetry/resources/start-transfer.json +++ b/advanced/advanced-01-open-telemetry/resources/start-transfer.json @@ -8,7 +8,5 @@ "contractId": "{{contract-agreement-id}}", "assetId": "assetId", "protocol": "dataspace-protocol-http", - "dataDestination": { - "type": "HttpProxy" - } + "transferType": "HttpData-PULL" } diff --git a/advanced/advanced-02-custom-runtime/src/main/java/org/eclipse/edc/sample/runtime/CustomRuntime.java b/advanced/advanced-02-custom-runtime/src/main/java/org/eclipse/edc/sample/runtime/CustomRuntime.java index f1e23189..3f5d209b 100644 --- a/advanced/advanced-02-custom-runtime/src/main/java/org/eclipse/edc/sample/runtime/CustomRuntime.java +++ b/advanced/advanced-02-custom-runtime/src/main/java/org/eclipse/edc/sample/runtime/CustomRuntime.java @@ -17,12 +17,10 @@ import org.eclipse.edc.boot.system.DefaultServiceExtensionContext; import org.eclipse.edc.boot.system.runtime.BaseRuntime; import org.eclipse.edc.spi.monitor.Monitor; -import org.eclipse.edc.spi.system.ConfigurationExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.spi.system.configuration.Config; import org.jetbrains.annotations.NotNull; -import java.util.List; - public class CustomRuntime extends BaseRuntime { /** @@ -34,18 +32,13 @@ public static void main(String[] args) { } @Override - protected String getRuntimeName(ServiceExtensionContext context) { - return "CUSTOM-RUNTIME"; - } - - @Override - protected @NotNull ServiceExtensionContext createContext(Monitor monitor) { + protected @NotNull ServiceExtensionContext createContext(Monitor monitor, Config config) { //override the default service extension context with a super customized one - return new SuperCustomExtensionContext(monitor, loadConfigurationExtensions()); + return new SuperCustomExtensionContext(monitor, config); } @Override - protected void shutdown() { + public void shutdown() { super.shutdown(); //this is the custom part here: @@ -53,8 +46,8 @@ protected void shutdown() { } private static class SuperCustomExtensionContext extends DefaultServiceExtensionContext { - SuperCustomExtensionContext(Monitor monitor, List configurationExtensions) { - super(monitor, configurationExtensions); + SuperCustomExtensionContext(Monitor monitor, Config config) { + super(monitor, config); } } } diff --git a/build.gradle.kts b/build.gradle.kts index 1915c302..8b773e65 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -47,13 +47,6 @@ allprojects { configDirectory.set(rootProject.file("resources")) } - // EdcRuntimeExtension uses this to determine the runtime classpath of the module to run. - tasks.register("printClasspath") { - doLast { - println(sourceSets["main"].runtimeClasspath.asPath) - } - } - tasks.test { testLogging { showStandardStreams = true diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 43ef1314..51326c15 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -4,7 +4,7 @@ format.version = "1.1" [versions] assertj = "3.25.3" awaitility = "4.2.1" -edc = "0.6.0" +edc = "0.6.4" jakarta-json = "2.0.1" junit-pioneer = "2.2.0" jupiter = "5.10.2" @@ -49,6 +49,7 @@ edc-json-ld-lib = { module = "org.eclipse.edc:json-ld-lib", version.ref = "edc" edc-json-ld-spi = { module = "org.eclipse.edc:json-ld-spi", version.ref = "edc" } edc-junit = { module = "org.eclipse.edc:junit", version.ref = "edc" } edc-management-api = { module = "org.eclipse.edc:management-api", version.ref = "edc" } +edc-management-api-test-fixtures = { module = "org.eclipse.edc:management-api-test-fixtures", version.ref = "edc" } edc-micrometer-core = { module = "org.eclipse.edc:micrometer-core", version.ref = "edc" } edc-monitor-jdk-logger = { module = "org.eclipse.edc:monitor-jdk-logger", version.ref = "edc" } edc-provision-aws-s3 = { module = "org.eclipse.edc:provision-aws-s3", version.ref = "edc" } diff --git a/policy/policy-01-policy-enforcement/policy-functions/src/main/java/org/eclipse/edc/sample/extension/policy/PolicyFunctionsExtension.java b/policy/policy-01-policy-enforcement/policy-functions/src/main/java/org/eclipse/edc/sample/extension/policy/PolicyFunctionsExtension.java index 85881a7f..0d1018d7 100644 --- a/policy/policy-01-policy-enforcement/policy-functions/src/main/java/org/eclipse/edc/sample/extension/policy/PolicyFunctionsExtension.java +++ b/policy/policy-01-policy-enforcement/policy-functions/src/main/java/org/eclipse/edc/sample/extension/policy/PolicyFunctionsExtension.java @@ -21,10 +21,10 @@ import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; -import static org.eclipse.edc.connector.contract.spi.validation.ContractValidationService.NEGOTIATION_SCOPE; +import static org.eclipse.edc.connector.controlplane.contract.spi.validation.ContractValidationService.NEGOTIATION_SCOPE; import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.ODRL_USE_ACTION_ATTRIBUTE; import static org.eclipse.edc.policy.engine.spi.PolicyEngine.ALL_SCOPES; -import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; +import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; public class PolicyFunctionsExtension implements ServiceExtension { private static final String LOCATION_CONSTRAINT_KEY = EDC_NAMESPACE + "location"; diff --git a/system-tests/build.gradle.kts b/system-tests/build.gradle.kts index 3f924704..82cabece 100644 --- a/system-tests/build.gradle.kts +++ b/system-tests/build.gradle.kts @@ -21,6 +21,7 @@ dependencies { testImplementation(libs.edc.json.ld.lib) testImplementation(libs.edc.json.ld.spi) testImplementation(libs.edc.control.plane.spi) + testImplementation(testFixtures(libs.edc.management.api.test.fixtures)) testImplementation(libs.awaitility) testImplementation(libs.okhttp.mockwebserver) testImplementation(libs.restAssured) diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/advanced/Advanced01openTelemetryTest.java b/system-tests/src/test/java/org/eclipse/edc/samples/advanced/Advanced01openTelemetryTest.java index 39cec4e0..6566766e 100644 --- a/system-tests/src/test/java/org/eclipse/edc/samples/advanced/Advanced01openTelemetryTest.java +++ b/system-tests/src/test/java/org/eclipse/edc/samples/advanced/Advanced01openTelemetryTest.java @@ -16,7 +16,6 @@ package org.eclipse.edc.samples.advanced; import org.apache.http.HttpStatus; -import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.junit.annotations.EndToEndTest; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -31,6 +30,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; +import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTED; import static org.eclipse.edc.samples.common.FileTransferCommon.getFileContentFromRelativePath; import static org.eclipse.edc.samples.common.FileTransferCommon.getFileFromRelativePath; import static org.eclipse.edc.samples.common.NegotiationCommon.createAsset; @@ -75,7 +75,7 @@ void runSampleSteps() { var contractNegotiationId = negotiateContract(NEGOTIATE_CONTRACT_FILE_PATH, catalogDatasetId); var contractAgreementId = getContractAgreementId(contractNegotiationId); var transferProcessId = startTransfer(getFileContentFromRelativePath(START_TRANSFER_FILE_PATH), contractAgreementId); - checkTransferStatus(transferProcessId, TransferProcessStates.STARTED); + checkTransferStatus(transferProcessId, STARTED); assertJaegerState(); } diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/Transfer02consumerPullTest.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/Transfer02consumerPullTest.java index 8d7210f1..1ba92337 100644 --- a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/Transfer02consumerPullTest.java +++ b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/Transfer02consumerPullTest.java @@ -16,7 +16,7 @@ package org.eclipse.edc.samples.transfer; import org.apache.http.HttpStatus; -import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; import org.eclipse.edc.samples.util.HttpRequestLoggerConsumer; diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/Transfer03providerPushTest.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/Transfer03providerPushTest.java index a607c159..02e7af66 100644 --- a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/Transfer03providerPushTest.java +++ b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/Transfer03providerPushTest.java @@ -15,7 +15,7 @@ package org.eclipse.edc.samples.transfer; -import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; import org.eclipse.edc.samples.util.HttpRequestLoggerConsumer; diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/Transfer04eventConsumerTest.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/Transfer04eventConsumerTest.java index 60f42781..93c5c792 100644 --- a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/Transfer04eventConsumerTest.java +++ b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/Transfer04eventConsumerTest.java @@ -15,7 +15,7 @@ package org.eclipse.edc.samples.transfer; -import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; import org.eclipse.edc.samples.util.HttpRequestLoggerContainer; diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Participant.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Participant.java deleted file mode 100644 index 7d6b7be4..00000000 --- a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Participant.java +++ /dev/null @@ -1,450 +0,0 @@ -/* - * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - -package org.eclipse.edc.samples.transfer.streaming; - -import com.fasterxml.jackson.databind.ObjectMapper; -import io.restassured.specification.RequestSpecification; -import jakarta.json.Json; -import jakarta.json.JsonArray; -import jakarta.json.JsonObject; -import org.eclipse.edc.jsonld.TitaniumJsonLd; -import org.eclipse.edc.jsonld.spi.JsonLd; -import org.eclipse.edc.jsonld.util.JacksonJsonLd; -import org.eclipse.edc.spi.EdcException; -import org.eclipse.edc.spi.monitor.ConsoleMonitor; - -import java.net.URI; -import java.time.Duration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; - -import static io.restassured.RestAssured.given; -import static io.restassured.http.ContentType.JSON; -import static jakarta.json.Json.createArrayBuilder; -import static jakarta.json.Json.createObjectBuilder; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; -import static org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates.FINALIZED; -import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.CONTEXT; -import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.ID; -import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; -import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_DATASET_ATTRIBUTE; -import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.ODRL_ASSIGNER_ATTRIBUTE; -import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.ODRL_POLICY_ATTRIBUTE; -import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.ODRL_TARGET_ATTRIBUTE; -import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; -import static org.eclipse.edc.spi.CoreConstants.EDC_PREFIX; - -/** - * Essentially a wrapper around the management API enabling to test interactions with other participants, eg. catalog, transfer... - */ -public class Participant { - - private static final String DSP_PROTOCOL = "dataspace-protocol-http"; - private static final Duration TIMEOUT = Duration.ofSeconds(30); - - protected String id; - protected String name; - protected Endpoint managementEndpoint; - protected Endpoint protocolEndpoint; - protected Endpoint controlEndpoint; - protected JsonLd jsonLd; - protected ObjectMapper objectMapper; - - protected Participant() { - } - - public String getName() { - return name; - } - - public void registerDataPlane(List sourceTypes, List destinationTypes) { - var jsonObject = Json.createObjectBuilder() - .add(CONTEXT, createObjectBuilder().add(EDC_PREFIX, EDC_NAMESPACE)) - .add(ID, UUID.randomUUID().toString()) - .add(EDC_NAMESPACE + "url", controlEndpoint.url + "/transfer") - .add(EDC_NAMESPACE + "allowedSourceTypes", createArrayBuilder(sourceTypes)) - .add(EDC_NAMESPACE + "allowedDestTypes", createArrayBuilder(destinationTypes)) - .build(); - - managementEndpoint.baseRequest() - .contentType(JSON) - .body(jsonObject.toString()) - .when() - .post("/v2/dataplanes") - .then() - .statusCode(200); - } - - public String createAsset(String requestBody) { - return managementEndpoint.baseRequest() - .contentType(JSON) - .body(requestBody) - .when() - .post("/v3/assets") - .then() - .statusCode(200) - .contentType(JSON) - .extract().jsonPath().getString(ID); - } - - public String createPolicyDefinition(String requestBody) { - return managementEndpoint.baseRequest() - .contentType(JSON) - .body(requestBody) - .when() - .post("/v2/policydefinitions") - .then() - .statusCode(200) - .contentType(JSON) - .extract().jsonPath().getString(ID); - } - - public String createContractDefinition(String requestBody) { - return managementEndpoint.baseRequest() - .contentType(JSON) - .body(requestBody) - .when() - .post("/v2/contractdefinitions") - .then() - .statusCode(200) - .extract().jsonPath().getString(ID); - } - - /** - * Request provider catalog. - * - * @param provider data provider - * @return list of {@link org.eclipse.edc.catalog.spi.Dataset}. - */ - public JsonArray getCatalogDatasets(Participant provider) { - var datasetReference = new AtomicReference(); - var requestBody = createObjectBuilder() - .add(CONTEXT, createObjectBuilder().add(EDC_PREFIX, EDC_NAMESPACE)) - .add(TYPE, "CatalogRequest") - .add("counterPartyAddress", provider.protocolEndpoint.url.toString()) - .add("protocol", DSP_PROTOCOL) - .build(); - - await().atMost(TIMEOUT).untilAsserted(() -> { - var response = managementEndpoint.baseRequest() - .contentType(JSON) - .when() - .body(requestBody) - .post("/v2/catalog/request") - .then() - .log().ifValidationFails() - .statusCode(200) - .extract().body().asString(); - - var responseBody = objectMapper.readValue(response, JsonObject.class); - - var catalog = jsonLd.expand(responseBody).orElseThrow(f -> new EdcException(f.getFailureDetail())); - - var datasets = catalog.getJsonArray(DCAT_DATASET_ATTRIBUTE); - assertThat(datasets).hasSizeGreaterThan(0); - - datasetReference.set(datasets); - }); - - return datasetReference.get(); - } - - /** - * Get first {@link org.eclipse.edc.catalog.spi.Dataset} from provider matching the given asset id. - * - * @param provider data provider - * @param assetId asset id - * @return dataset. - */ - public JsonObject getDatasetForAsset(Participant provider, String assetId) { - var datasetReference = new AtomicReference(); - var requestBody = createObjectBuilder() - .add(CONTEXT, createObjectBuilder().add(EDC_PREFIX, EDC_NAMESPACE)) - .add(TYPE, "DatasetRequest") - .add(ID, assetId) - .add("counterPartyAddress", provider.protocolEndpoint.url.toString()) - .add("protocol", DSP_PROTOCOL) - .build(); - - await().atMost(TIMEOUT).untilAsserted(() -> { - var response = managementEndpoint.baseRequest() - .contentType(JSON) - .when() - .body(requestBody) - .post("/v2/catalog/dataset/request") - .then() - .log().ifValidationFails() - .statusCode(200) - .extract().body().asString(); - - var compacted = objectMapper.readValue(response, JsonObject.class); - - var dataset = jsonLd.expand(compacted).orElseThrow(f -> new EdcException(f.getFailureDetail())); - - datasetReference.set(dataset); - }); - - return datasetReference.get(); - } - - /** - * Initiate negotiation with a provider. - * - * @param provider data provider - * @param offer the contract offer - * @return id of the contract agreement. - */ - public String negotiateContract(Participant provider, JsonObject offer) { - var requestBody = createObjectBuilder() - .add(CONTEXT, createObjectBuilder().add(EDC_PREFIX, EDC_NAMESPACE)) - .add(TYPE, "ContractRequestDto") - .add("providerId", provider.id) - .add("counterPartyAddress", provider.protocolEndpoint.url.toString()) - .add("protocol", DSP_PROTOCOL) - .add("policy", jsonLd.compact(offer).getContent()) - .build(); - - var negotiationId = managementEndpoint.baseRequest() - .contentType(JSON) - .body(requestBody) - .when() - .post("/v2/contractnegotiations") - .then() - .log().ifValidationFails() - .statusCode(200) - .extract().body().jsonPath().getString(ID); - - await().atMost(TIMEOUT).untilAsserted(() -> { - var state = getContractNegotiationState(negotiationId); - assertThat(state).isEqualTo(FINALIZED.name()); - }); - - return getContractAgreementId(negotiationId); - } - - /** - * Initiate data transfer. - * - * @param provider data provider - * @param contractAgreementId contract agreement id - * @param assetId asset id - * @param privateProperties private properties - * @param destination data destination address - * @return id of the transfer process. - */ - public String initiateTransfer(Participant provider, String contractAgreementId, String assetId, JsonObject privateProperties, JsonObject destination) { - var requestBody = createObjectBuilder() - .add(CONTEXT, createObjectBuilder().add(EDC_PREFIX, EDC_NAMESPACE)) - .add(TYPE, "TransferRequest") - .add("dataDestination", destination) - .add("protocol", DSP_PROTOCOL) - .add("assetId", assetId) - .add("contractId", contractAgreementId) - .add("connectorId", provider.id) - .add("counterPartyAddress", provider.protocolEndpoint.url.toString()) - .add("privateProperties", privateProperties) - .build(); - - return managementEndpoint.baseRequest() - .contentType(JSON) - .body(requestBody) - .when() - .post("/v2/transferprocesses") - .then() - .log().ifError() - .statusCode(200) - .extract().body().jsonPath().getString(ID); - } - - /** - * Request a provider asset: - * - retrieves the contract definition associated with the asset, - * - handles the contract negotiation, - * - initiate the data transfer. - * - * @param provider data provider - * @param assetId asset id - * @param privateProperties private properties of the data request - * @param destination data destination - * @return transfer process id. - */ - public String requestAsset(Participant provider, String assetId, JsonObject privateProperties, JsonObject destination) { - var offer = getOfferForAsset(provider, assetId); - - var contractAgreementId = negotiateContract(provider, offer); - var transferProcessId = initiateTransfer(provider, contractAgreementId, assetId, privateProperties, destination); - assertThat(transferProcessId).isNotNull(); - return transferProcessId; - } - - private JsonObject getOfferForAsset(Participant provider, String assetId) { - var dataset = getDatasetForAsset(provider, assetId); - var policy = dataset.getJsonArray(ODRL_POLICY_ATTRIBUTE).get(0).asJsonObject(); - return createObjectBuilder(policy) - .add(ODRL_ASSIGNER_ATTRIBUTE, createObjectBuilder().add(ID, provider.id)) - .add(ODRL_TARGET_ATTRIBUTE, createObjectBuilder().add(ID, dataset.get(ID))) - .build(); - } - - /** - * Get current state of a transfer process. - * - * @param id transfer process id - * @return state of the transfer process. - */ - public String getTransferProcessState(String id) { - return managementEndpoint.baseRequest() - .contentType(JSON) - .when() - .get("/v2/transferprocesses/{id}/state", id) - .then() - .statusCode(200) - .extract().body().jsonPath().getString("state"); - } - - private String getContractNegotiationState(String id) { - return managementEndpoint.baseRequest() - .contentType(JSON) - .when() - .get("/v2/contractnegotiations/{id}/state", id) - .then() - .statusCode(200) - .extract().body().jsonPath().getString("state"); - } - - - private String getContractAgreementId(String negotiationId) { - var contractAgreementIdAtomic = new AtomicReference(); - - await().atMost(TIMEOUT).untilAsserted(() -> { - var agreementId = getContractNegotiationField(negotiationId, "contractAgreementId"); - assertThat(agreementId).isNotNull().isInstanceOf(String.class); - - contractAgreementIdAtomic.set(agreementId); - }); - - var contractAgreementId = contractAgreementIdAtomic.get(); - assertThat(id).isNotEmpty(); - return contractAgreementId; - } - - private String getContractNegotiationField(String negotiationId, String fieldName) { - return managementEndpoint.baseRequest() - .contentType(JSON) - .when() - .get("/v2/contractnegotiations/{id}", negotiationId) - .then() - .statusCode(200) - .extract().body().jsonPath() - .getString(fieldName); - } - - /** - * Represent an endpoint exposed by a {@link Participant}. - */ - public static class Endpoint { - private final URI url; - private final Map headers; - - public Endpoint(URI url) { - this.url = url; - this.headers = new HashMap<>(); - } - - public Endpoint(URI url, Map headers) { - this.url = url; - this.headers = headers; - } - - public RequestSpecification baseRequest() { - return given().baseUri(url.toString()).headers(headers); - } - - public URI getUrl() { - return url; - } - } - - public static class Builder

> { - protected final P participant; - - protected Builder(P participant) { - this.participant = participant; - } - - public static > Builder newInstance() { - return new Builder<>(new Participant()); - } - - public B id(String id) { - participant.id = id; - return self(); - } - - public B name(String name) { - participant.name = name; - return self(); - } - - public B managementEndpoint(Endpoint managementEndpoint) { - participant.managementEndpoint = managementEndpoint; - return self(); - } - - public B protocolEndpoint(Endpoint protocolEndpoint) { - participant.protocolEndpoint = protocolEndpoint; - return self(); - } - - public B controlEndpoint(Endpoint controlEndpoint) { - participant.controlEndpoint = controlEndpoint; - return self(); - } - - public B jsonLd(JsonLd jsonLd) { - participant.jsonLd = jsonLd; - return self(); - } - - public B objectMapper(ObjectMapper objectMapper) { - participant.objectMapper = objectMapper; - return self(); - } - - public Participant build() { - Objects.requireNonNull(participant.id, "id"); - Objects.requireNonNull(participant.name, "name"); - Objects.requireNonNull(participant.managementEndpoint, "managementEndpoint"); - Objects.requireNonNull(participant.protocolEndpoint, "protocolEndpoint"); - if (participant.jsonLd == null) { - participant.jsonLd = new TitaniumJsonLd(new ConsoleMonitor()); - } - if (participant.objectMapper == null) { - participant.objectMapper = JacksonJsonLd.createObjectMapper(); - } - return participant; - } - - @SuppressWarnings("unchecked") - private B self() { - return (B) this; - } - } -} diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming01httpToHttpTest.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming01httpToHttpTest.java index 1699c8d2..7f59cb28 100644 --- a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming01httpToHttpTest.java +++ b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming01httpToHttpTest.java @@ -36,7 +36,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED; +import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTED; import static org.eclipse.edc.samples.common.FileTransferCommon.getFileContentFromRelativePath; import static org.eclipse.edc.samples.common.FileTransferCommon.getFileFromRelativePath; @@ -46,20 +46,20 @@ public class Streaming01httpToHttpTest { private static final String SAMPLE_FOLDER = "transfer/streaming/streaming-01-http-to-http"; private static final Duration TIMEOUT = Duration.ofSeconds(30); - private static final Participant PROVIDER = Participant.Builder.newInstance() + private static final StreamingParticipant PROVIDER = StreamingParticipant.Builder.newStreamingInstance() .name("provider") .id("provider") - .managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:18181/management"))) - .protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:18182/protocol"))) - .controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:18183/control"))) + .managementEndpoint(new StreamingParticipant.Endpoint(URI.create("http://localhost:18181/management"))) + .protocolEndpoint(new StreamingParticipant.Endpoint(URI.create("http://localhost:18182/protocol"))) + .controlEndpoint(new StreamingParticipant.Endpoint(URI.create("http://localhost:18183/control"))) .build(); - private static final Participant CONSUMER = Participant.Builder.newInstance() + private static final StreamingParticipant CONSUMER = StreamingParticipant.Builder.newStreamingInstance() .name("consumer") .id("consumer") - .managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:28181/management"))) - .protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:28182/protocol"))) - .controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:28183/control"))) + .managementEndpoint(new StreamingParticipant.Endpoint(URI.create("http://localhost:28181/management"))) + .protocolEndpoint(new StreamingParticipant.Endpoint(URI.create("http://localhost:28182/protocol"))) + .controlEndpoint(new StreamingParticipant.Endpoint(URI.create("http://localhost:28183/control"))) .build(); @RegisterExtension @@ -90,7 +90,7 @@ void setUp() throws IOException { @Test void streamData() throws IOException { var source = Files.createTempDirectory("source"); - PROVIDER.registerDataPlane(List.of("HttpStreaming"), List.of("HttpData")); + PROVIDER.registerDataPlane(List.of("HttpStreaming"), List.of("HttpData"), List.of("HttpData-PUSH")); PROVIDER.createAsset(getFileContentFromRelativePath(SAMPLE_FOLDER + "/asset.json") .replace("{{sourceFolder}}", source.toString())); @@ -101,10 +101,13 @@ void streamData() throws IOException { .add("type", "HttpData") .add("baseUrl", "http://localhost:" + httpReceiverPort) .build(); - var transferProcessId = CONSUMER.requestAsset(PROVIDER, "stream-asset", Json.createObjectBuilder().build(), destination); + var transferProcessId = CONSUMER.requestAssetFrom("stream-asset", PROVIDER) + .withDestination(destination) + .withTransferType("HttpData-PUSH") + .execute(); await().atMost(TIMEOUT).untilAsserted(() -> { - String state = CONSUMER.getTransferProcessState(transferProcessId); + var state = CONSUMER.getTransferProcessState(transferProcessId); assertThat(state).isEqualTo(STARTED.name()); }); diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming02KafkaToHttpTest.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming02KafkaToHttpTest.java index 266cc7ad..383b0014 100644 --- a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming02KafkaToHttpTest.java +++ b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming02KafkaToHttpTest.java @@ -43,7 +43,7 @@ import static java.util.concurrent.TimeUnit.MICROSECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED; +import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTED; import static org.eclipse.edc.samples.common.FileTransferCommon.getFileContentFromRelativePath; import static org.eclipse.edc.samples.common.FileTransferCommon.getFileFromRelativePath; @@ -56,19 +56,19 @@ public class Streaming02KafkaToHttpTest { private static final String MAX_DURATION = "PT30S"; private static final String SAMPLE_FOLDER = "transfer/streaming/streaming-02-kafka-to-http"; private static final Duration TIMEOUT = Duration.ofSeconds(30); - private static final Participant PROVIDER = Participant.Builder.newInstance() + private static final StreamingParticipant PROVIDER = StreamingParticipant.Builder.newStreamingInstance() .name("provider") .id("provider") - .managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:18181/management"))) - .protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:18182/protocol"))) - .controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:18183/control"))) + .managementEndpoint(new StreamingParticipant.Endpoint(URI.create("http://localhost:18181/management"))) + .protocolEndpoint(new StreamingParticipant.Endpoint(URI.create("http://localhost:18182/protocol"))) + .controlEndpoint(new StreamingParticipant.Endpoint(URI.create("http://localhost:18183/control"))) .build(); - private static final Participant CONSUMER = Participant.Builder.newInstance() + private static final StreamingParticipant CONSUMER = StreamingParticipant.Builder.newStreamingInstance() .name("consumer") .id("consumer") - .managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:28181/management"))) - .protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:28182/protocol"))) - .controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:28183/control"))) + .managementEndpoint(new StreamingParticipant.Endpoint(URI.create("http://localhost:28181/management"))) + .protocolEndpoint(new StreamingParticipant.Endpoint(URI.create("http://localhost:28182/protocol"))) + .controlEndpoint(new StreamingParticipant.Endpoint(URI.create("http://localhost:28183/control"))) .build(); @Container @@ -109,7 +109,7 @@ void setUp() throws IOException { @Test void streamData() { - PROVIDER.registerDataPlane(List.of("Kafka"), List.of("HttpData")); + PROVIDER.registerDataPlane(List.of("Kafka"), List.of("HttpData"), List.of("HttpData-PUSH")); PROVIDER.createAsset(getFileContentFromRelativePath(SAMPLE_FOLDER + "/1-asset.json") .replace("{{bootstrap.servers}}", kafkaContainer.getBootstrapServers()) @@ -124,8 +124,10 @@ void streamData() { .add("baseUrl", "http://localhost:" + httpReceiverPort) .build(); - var transferProcessId = CONSUMER.requestAsset(PROVIDER, "kafka-stream-asset", - Json.createObjectBuilder().build(), destination); + var transferProcessId = CONSUMER.requestAssetFrom("kafka-stream-asset", PROVIDER) + .withDestination(destination) + .withTransferType("HttpData-PUSH") + .execute(); await().atMost(TIMEOUT).untilAsserted(() -> { var state = CONSUMER.getTransferProcessState(transferProcessId); diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming03KafkaToKafkaTest.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming03KafkaToKafkaTest.java index 5f2882a5..0b8bf86a 100644 --- a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming03KafkaToKafkaTest.java +++ b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming03KafkaToKafkaTest.java @@ -69,10 +69,10 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED; +import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTED; import static org.eclipse.edc.samples.common.FileTransferCommon.getFileContentFromRelativePath; import static org.eclipse.edc.samples.common.FileTransferCommon.getFileFromRelativePath; -import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; +import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; @Testcontainers @EndToEndTest @@ -84,19 +84,19 @@ public class Streaming03KafkaToKafkaTest { private static final Path SAMPLE_FOLDER = Path.of("transfer", "streaming", SAMPLE_NAME); private static final Path RUNTIME_PATH = SAMPLE_FOLDER.resolve(RUNTIME_NAME); private static final Duration TIMEOUT = Duration.ofSeconds(60); - private static final Participant PROVIDER = Participant.Builder.newInstance() + private static final StreamingParticipant PROVIDER = StreamingParticipant.Builder.newStreamingInstance() .name("provider") .id("provider") - .managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:18181/management"))) - .protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:18182/protocol"))) - .controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:18183/control"))) + .managementEndpoint(new StreamingParticipant.Endpoint(URI.create("http://localhost:18181/management"))) + .protocolEndpoint(new StreamingParticipant.Endpoint(URI.create("http://localhost:18182/protocol"))) + .controlEndpoint(new StreamingParticipant.Endpoint(URI.create("http://localhost:18183/control"))) .build(); - private static final Participant CONSUMER = Participant.Builder.newInstance() + private static final StreamingParticipant CONSUMER = StreamingParticipant.Builder.newStreamingInstance() .name("consumer") .id("consumer") - .managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:28181/management"))) - .protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:28182/protocol"))) - .controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:28183/control"))) + .managementEndpoint(new StreamingParticipant.Endpoint(URI.create("http://localhost:28181/management"))) + .protocolEndpoint(new StreamingParticipant.Endpoint(URI.create("http://localhost:28182/protocol"))) + .controlEndpoint(new StreamingParticipant.Endpoint(URI.create("http://localhost:28183/control"))) .build(); private static final String GROUP_ID = "group_id"; @@ -155,7 +155,11 @@ void streamData() throws InterruptedException, JsonProcessingException { var transferProcessPrivateProperties = Json.createObjectBuilder() .add("receiverHttpEndpoint", "http://localhost:" + httpReceiverPort) .build(); - var transferProcessId = CONSUMER.requestAsset(PROVIDER, "kafka-stream-asset", transferProcessPrivateProperties, destination); + var transferProcessId = CONSUMER.requestAssetFrom("kafka-stream-asset", PROVIDER) + .withPrivateProperties(transferProcessPrivateProperties) + .withDestination(destination) + .withTransferType("KafkaBroker-PULL") + .execute(); await().atMost(TIMEOUT).untilAsserted(() -> { var state = CONSUMER.getTransferProcessState(transferProcessId); diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/StreamingParticipant.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/StreamingParticipant.java new file mode 100644 index 00000000..274473d3 --- /dev/null +++ b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/StreamingParticipant.java @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.samples.transfer.streaming; + +import jakarta.json.Json; +import org.eclipse.edc.connector.controlplane.test.system.utils.Participant; + +import java.util.List; +import java.util.UUID; + +import static io.restassured.http.ContentType.JSON; +import static jakarta.json.Json.createArrayBuilder; +import static jakarta.json.Json.createObjectBuilder; +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.CONTEXT; +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.ID; +import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; +import static org.eclipse.edc.spi.constants.CoreConstants.EDC_PREFIX; + +/** + * Essentially a wrapper around the management API enabling to test interactions with other participants, eg. catalog, transfer... + */ +public class StreamingParticipant extends Participant { + + protected Endpoint controlEndpoint; + + protected StreamingParticipant() { + } + + public String getName() { + return name; + } + + public void registerDataPlane(List sourceTypes, List destinationTypes, List transferTypes) { + var jsonObject = Json.createObjectBuilder() + .add(CONTEXT, createObjectBuilder().add(EDC_PREFIX, EDC_NAMESPACE)) + .add(ID, UUID.randomUUID().toString()) + .add(EDC_NAMESPACE + "url", controlEndpoint.getUrl() + "/transfer") + .add(EDC_NAMESPACE + "allowedSourceTypes", createArrayBuilder(sourceTypes)) + .add(EDC_NAMESPACE + "allowedDestTypes", createArrayBuilder(destinationTypes)) + .add(EDC_NAMESPACE + "allowedTransferTypes", createArrayBuilder(transferTypes)) + .build(); + + managementEndpoint.baseRequest() + .contentType(JSON) + .body(jsonObject.toString()) + .when() + .post("/v2/dataplanes") + .then() + .statusCode(200); + } + + public String createAsset(String requestBody) { + return managementEndpoint.baseRequest() + .contentType(JSON) + .body(requestBody) + .when() + .post("/v3/assets") + .then() + .statusCode(200) + .contentType(JSON) + .extract().jsonPath().getString(ID); + } + + public String createPolicyDefinition(String requestBody) { + return managementEndpoint.baseRequest() + .contentType(JSON) + .body(requestBody) + .when() + .post("/v2/policydefinitions") + .then() + .statusCode(200) + .contentType(JSON) + .extract().jsonPath().getString(ID); + } + + public String createContractDefinition(String requestBody) { + return managementEndpoint.baseRequest() + .contentType(JSON) + .body(requestBody) + .when() + .post("/v2/contractdefinitions") + .then() + .statusCode(200) + .extract().jsonPath().getString(ID); + } + + public static class Builder

> extends Participant.Builder { + + protected Builder(P participant) { + super(participant); + } + + public static > Builder newStreamingInstance() { + return new Builder<>(new StreamingParticipant()); + } + + public B controlEndpoint(Endpoint controlEndpoint) { + participant.controlEndpoint = controlEndpoint; + return self(); + } + + @Override + public StreamingParticipant build() { + return (StreamingParticipant) super.build(); + } + } +} diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/util/TransferUtil.java b/system-tests/src/test/java/org/eclipse/edc/samples/util/TransferUtil.java index 769ed50c..4f443640 100644 --- a/system-tests/src/test/java/org/eclipse/edc/samples/util/TransferUtil.java +++ b/system-tests/src/test/java/org/eclipse/edc/samples/util/TransferUtil.java @@ -17,7 +17,7 @@ import io.restassured.http.ContentType; import org.apache.http.HttpStatus; -import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates; import java.time.Duration; diff --git a/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingDataSourceFactory.java b/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingDataSourceFactory.java index 0fbb3d92..93dc0a4d 100644 --- a/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingDataSourceFactory.java +++ b/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingDataSourceFactory.java @@ -14,9 +14,9 @@ package org.eclipse.edc.samples.transfer.streaming.http; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess; import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSourceFactory; -import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.jetbrains.annotations.NotNull; @@ -31,8 +31,8 @@ public class HttpStreamingDataSourceFactory implements DataSourceFactory { @Override - public boolean canHandle(DataFlowStartMessage dataFlowStartMessage) { - return dataFlowStartMessage.getSourceDataAddress().getType().equals("HttpStreaming"); + public String supportedType() { + return "HttpStreaming"; } @Override diff --git a/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaExtension.java b/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaExtension.java index 7657cfb2..4a93b703 100644 --- a/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaExtension.java +++ b/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaExtension.java @@ -14,7 +14,7 @@ package org.eclipse.edc.samples.streaming; -import org.eclipse.edc.connector.transfer.spi.flow.DataFlowManager; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowManager; import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; diff --git a/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaToKafkaDataFlowController.java b/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaToKafkaDataFlowController.java index ca659483..fb8a7ce9 100644 --- a/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaToKafkaDataFlowController.java +++ b/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaToKafkaDataFlowController.java @@ -14,14 +14,14 @@ package org.eclipse.edc.samples.streaming; -import org.eclipse.edc.connector.transfer.spi.flow.DataFlowController; -import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse; -import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset; +import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowController; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess; import org.eclipse.edc.dataaddress.kafka.spi.KafkaDataAddressSchema; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.types.domain.DataAddress; -import org.eclipse.edc.spi.types.domain.asset.Asset; import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; import org.jetbrains.annotations.NotNull; @@ -33,7 +33,7 @@ class KafkaToKafkaDataFlowController implements DataFlowController { @Override public boolean canHandle(TransferProcess transferProcess) { - return KAFKA_TYPE.equals(transferProcess.getContentDataAddress().getType()) && "KafkaBroker".equals(transferProcess.getTransferType()); + return KAFKA_TYPE.equals(transferProcess.getContentDataAddress().getType()) && "KafkaBroker-PULL".equals(transferProcess.getTransferType()); } @Override diff --git a/transfer/transfer-02-consumer-pull/resources/start-transfer.json b/transfer/transfer-02-consumer-pull/resources/start-transfer.json index 48a2cc31..c9959b7b 100644 --- a/transfer/transfer-02-consumer-pull/resources/start-transfer.json +++ b/transfer/transfer-02-consumer-pull/resources/start-transfer.json @@ -8,7 +8,5 @@ "contractId": "{{contract-agreement-id}}", "assetId": "assetId", "protocol": "dataspace-protocol-http", - "dataDestination": { - "type": "HttpProxy" - } + "transferType": "HttpData-PULL" } diff --git a/transfer/transfer-03-provider-push/resources/start-transfer.json b/transfer/transfer-03-provider-push/resources/start-transfer.json index 65fe1541..51e6706b 100644 --- a/transfer/transfer-03-provider-push/resources/start-transfer.json +++ b/transfer/transfer-03-provider-push/resources/start-transfer.json @@ -8,6 +8,7 @@ "contractId": "{{contract-agreement-id}}", "assetId": "assetId", "protocol": "dataspace-protocol-http", + "transferType": "HttpData-PUSH", "dataDestination": { "type": "HttpData", "baseUrl": "http://localhost:4000/api/consumer/store" diff --git a/transfer/transfer-04-event-consumer/README.md b/transfer/transfer-04-event-consumer/README.md index 3358da19..ede0926c 100644 --- a/transfer/transfer-04-event-consumer/README.md +++ b/transfer/transfer-04-event-consumer/README.md @@ -101,7 +101,7 @@ The consumer should spew out logs similar to: ```bash DEBUG 2023-10-16T09:29:45.316908 [TransferProcessManagerImpl] TransferProcess 762b5a0c-43fb-4b8b-8022-669043c8fa81 is now in state REQUESTED -DEBUG 2023-10-16T09:29:46.269998 DSP: Incoming TransferStartMessage for class org.eclipse.edc.connector.transfer.spi.types.TransferProcess process: 762b5a0c-43fb-4b8b-8022-669043c8fa81 +DEBUG 2023-10-16T09:29:46.269998 DSP: Incoming TransferStartMessage for class org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess process: 762b5a0c-43fb-4b8b-8022-669043c8fa81 DEBUG 2023-10-16T09:29:46.271592 TransferProcessStartedListener received STARTED event <---------------------------- DEBUG 2023-10-16T09:29:46.27174 TransferProcess 762b5a0c-43fb-4b8b-8022-669043c8fa81 is now in state STARTED ``` diff --git a/transfer/transfer-04-event-consumer/listener/src/main/java/org/eclipse/edc/sample/extension/listener/TransferProcessStartedListener.java b/transfer/transfer-04-event-consumer/listener/src/main/java/org/eclipse/edc/sample/extension/listener/TransferProcessStartedListener.java index 515a9e26..acc94405 100644 --- a/transfer/transfer-04-event-consumer/listener/src/main/java/org/eclipse/edc/sample/extension/listener/TransferProcessStartedListener.java +++ b/transfer/transfer-04-event-consumer/listener/src/main/java/org/eclipse/edc/sample/extension/listener/TransferProcessStartedListener.java @@ -14,8 +14,8 @@ package org.eclipse.edc.sample.extension.listener; -import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessListener; -import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.connector.controlplane.transfer.spi.observe.TransferProcessListener; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess; import org.eclipse.edc.spi.monitor.Monitor; public class TransferProcessStartedListener implements TransferProcessListener { diff --git a/transfer/transfer-04-event-consumer/listener/src/main/java/org/eclipse/edc/sample/extension/listener/TransferProcessStartedListenerExtension.java b/transfer/transfer-04-event-consumer/listener/src/main/java/org/eclipse/edc/sample/extension/listener/TransferProcessStartedListenerExtension.java index a2e5f47e..96ab60d1 100644 --- a/transfer/transfer-04-event-consumer/listener/src/main/java/org/eclipse/edc/sample/extension/listener/TransferProcessStartedListenerExtension.java +++ b/transfer/transfer-04-event-consumer/listener/src/main/java/org/eclipse/edc/sample/extension/listener/TransferProcessStartedListenerExtension.java @@ -14,7 +14,7 @@ package org.eclipse.edc.sample.extension.listener; -import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessObservable; +import org.eclipse.edc.connector.controlplane.transfer.spi.observe.TransferProcessObservable; import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; diff --git a/transfer/transfer-05-file-transfer-cloud/transfer-file-cloud/src/main/java/org/eclipse/edc/sample/extension/transfer/CloudTransferExtension.java b/transfer/transfer-05-file-transfer-cloud/transfer-file-cloud/src/main/java/org/eclipse/edc/sample/extension/transfer/CloudTransferExtension.java index accb343d..78d29cda 100644 --- a/transfer/transfer-05-file-transfer-cloud/transfer-file-cloud/src/main/java/org/eclipse/edc/sample/extension/transfer/CloudTransferExtension.java +++ b/transfer/transfer-05-file-transfer-cloud/transfer-file-cloud/src/main/java/org/eclipse/edc/sample/extension/transfer/CloudTransferExtension.java @@ -14,19 +14,19 @@ package org.eclipse.edc.sample.extension.transfer; -import org.eclipse.edc.connector.contract.spi.offer.store.ContractDefinitionStore; -import org.eclipse.edc.connector.contract.spi.types.offer.ContractDefinition; -import org.eclipse.edc.connector.policy.spi.PolicyDefinition; -import org.eclipse.edc.connector.policy.spi.store.PolicyDefinitionStore; +import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset; +import org.eclipse.edc.connector.controlplane.asset.spi.index.AssetIndex; +import org.eclipse.edc.connector.controlplane.contract.spi.offer.store.ContractDefinitionStore; +import org.eclipse.edc.connector.controlplane.contract.spi.types.offer.ContractDefinition; +import org.eclipse.edc.connector.controlplane.policy.spi.PolicyDefinition; +import org.eclipse.edc.connector.controlplane.policy.spi.store.PolicyDefinitionStore; import org.eclipse.edc.policy.model.Action; import org.eclipse.edc.policy.model.Permission; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.runtime.metamodel.annotation.Inject; -import org.eclipse.edc.spi.asset.AssetIndex; import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; import org.eclipse.edc.spi.types.domain.DataAddress; -import org.eclipse.edc.spi.types.domain.asset.Asset; import static org.eclipse.edc.spi.query.Criterion.criterion; @@ -49,7 +49,7 @@ public void initialize(ServiceExtensionContext context) { policyDefinitionStore.create(policy); registerDataEntries(); - registerContractDefinition(policy.getUid()); + registerContractDefinition(policy.getId()); } public void registerDataEntries() {