diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 7a0f925c..f255523a 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -45,7 +45,8 @@ edc-http = { module = "org.eclipse.edc:http", version.ref = "edc" } edc-iam-mock = { module = "org.eclipse.edc:iam-mock", version.ref = "edc" } edc-jersey-micrometer = { module = "org.eclipse.edc:jersey-micrometer", version.ref = "edc" } edc-jetty-micrometer = { module = "org.eclipse.edc:jetty-micrometer", version.ref = "edc" } -edc-json-ld = { module = "org.eclipse.edc:json-ld", version.ref = "edc" } +edc-json-ld-lib = { module = "org.eclipse.edc:json-ld-lib", version.ref = "edc" } +edc-json-ld-spi = { module = "org.eclipse.edc:json-ld-spi", version.ref = "edc" } edc-junit = { module = "org.eclipse.edc:junit", version.ref = "edc" } edc-management-api = { module = "org.eclipse.edc:management-api", version.ref = "edc" } edc-micrometer-core = { module = "org.eclipse.edc:micrometer-core", version.ref = "edc" } diff --git a/system-tests/build.gradle.kts b/system-tests/build.gradle.kts index 30fad202..3f924704 100644 --- a/system-tests/build.gradle.kts +++ b/system-tests/build.gradle.kts @@ -18,7 +18,8 @@ plugins { dependencies { testImplementation(libs.edc.junit) - testImplementation(libs.edc.json.ld) + testImplementation(libs.edc.json.ld.lib) + testImplementation(libs.edc.json.ld.spi) testImplementation(libs.edc.control.plane.spi) testImplementation(libs.awaitility) testImplementation(libs.okhttp.mockwebserver) diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming01httpToHttpTest.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming01httpToHttpTest.java index 12b70552..1699c8d2 100644 --- a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming01httpToHttpTest.java +++ b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming01httpToHttpTest.java @@ -18,7 +18,7 @@ import okhttp3.mockwebserver.MockWebServer; import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; -import org.eclipse.edc.junit.testfixtures.TestUtils; +import org.eclipse.edc.util.io.Ports; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -79,7 +79,7 @@ public class Streaming01httpToHttpTest { "edc.fs.config", getFileFromRelativePath(SAMPLE_FOLDER + "/streaming-01-runtime/consumer.properties").getAbsolutePath() ) ); - private final int httpReceiverPort = TestUtils.getFreePort(); + private final int httpReceiverPort = Ports.getFreePort(); private final MockWebServer consumerReceiverServer = new MockWebServer(); @BeforeEach diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming02KafkaToHttpTest.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming02KafkaToHttpTest.java index de876440..266cc7ad 100644 --- a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming02KafkaToHttpTest.java +++ b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming02KafkaToHttpTest.java @@ -23,7 +23,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; -import org.eclipse.edc.junit.testfixtures.TestUtils; +import org.eclipse.edc.util.io.Ports; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -98,7 +98,7 @@ public class Streaming02KafkaToHttpTest { .getAbsolutePath() ) ); - private final int httpReceiverPort = TestUtils.getFreePort(); + private final int httpReceiverPort = Ports.getFreePort(); private final MockWebServer consumerReceiverServer = new MockWebServer(); @BeforeEach @@ -128,7 +128,7 @@ void streamData() { Json.createObjectBuilder().build(), destination); await().atMost(TIMEOUT).untilAsserted(() -> { - String state = CONSUMER.getTransferProcessState(transferProcessId); + var state = CONSUMER.getTransferProcessState(transferProcessId); assertThat(state).isEqualTo(STARTED.name()); }); diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming03KafkaToKafkaTest.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming03KafkaToKafkaTest.java index 2416592f..5f2882a5 100644 --- a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming03KafkaToKafkaTest.java +++ b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming03KafkaToKafkaTest.java @@ -40,8 +40,8 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; -import org.eclipse.edc.junit.testfixtures.TestUtils; import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; +import org.eclipse.edc.util.io.Ports; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.BeforeEach; @@ -126,7 +126,7 @@ public class Streaming03KafkaToKafkaTest { ) ); - private final int httpReceiverPort = TestUtils.getFreePort(); + private final int httpReceiverPort = Ports.getFreePort(); private final MockWebServer edrReceiverServer = new MockWebServer(); @BeforeEach diff --git a/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingDataSourceFactory.java b/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingDataSourceFactory.java index 45c2af8c..0fbb3d92 100644 --- a/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingDataSourceFactory.java +++ b/transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http/HttpStreamingDataSourceFactory.java @@ -18,7 +18,7 @@ import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSourceFactory; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.spi.result.Result; -import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.jetbrains.annotations.NotNull; import java.io.File; @@ -31,28 +31,27 @@ public class HttpStreamingDataSourceFactory implements DataSourceFactory { @Override - public boolean canHandle(DataFlowRequest request) { - return request.getSourceDataAddress().getType().equals("HttpStreaming"); + public boolean canHandle(DataFlowStartMessage dataFlowStartMessage) { + return dataFlowStartMessage.getSourceDataAddress().getType().equals("HttpStreaming"); } @Override - public DataSource createSource(DataFlowRequest request) { - return new HttpStreamingDataSource(sourceFolder(request).get()); + public DataSource createSource(DataFlowStartMessage dataFlowStartMessage) { + return new HttpStreamingDataSource(sourceFolder(dataFlowStartMessage).get()); } @Override - public @NotNull Result validateRequest(DataFlowRequest request) { - return sourceFolder(request) + public @NotNull Result validateRequest(DataFlowStartMessage dataFlowStartMessage) { + return sourceFolder(dataFlowStartMessage) .map(it -> Result.success()) .orElseGet(() -> Result.failure("sourceFolder is not found or it does not exist")); } - private Optional sourceFolder(DataFlowRequest request) { + private Optional sourceFolder(DataFlowStartMessage request) { return Optional.of(request) - .map(DataFlowRequest::getSourceDataAddress) + .map(DataFlowStartMessage::getSourceDataAddress) .map(it -> it.getStringProperty("sourceFolder")) .map(File::new) .filter(File::exists); } - } diff --git a/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaToKafkaDataFlowController.java b/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaToKafkaDataFlowController.java index a10b4b7c..7569b0c9 100644 --- a/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaToKafkaDataFlowController.java +++ b/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaToKafkaDataFlowController.java @@ -38,7 +38,7 @@ public boolean canHandle(TransferProcess transferProcess) { } @Override - public @NotNull StatusResult initiateFlow(TransferProcess transferProcess, Policy policy) { + public @NotNull StatusResult start(TransferProcess transferProcess, Policy policy) { // static credentials, in a production case these should be created dynamically and an ACLs entry should be added var username = "alice"; var password = "alice-secret"; @@ -57,6 +57,12 @@ public boolean canHandle(TransferProcess transferProcess) { return StatusResult.success(DataFlowResponse.Builder.newInstance().dataAddress(kafkaDataAddress).build()); } + @Override + public StatusResult suspend(TransferProcess transferProcess) { + // here the flow can be suspended, not something covered in this sample + return StatusResult.success(); + } + @Override public StatusResult terminate(TransferProcess transferProcess) { // here the flow can be terminated, not something covered in this sample diff --git a/transfer/transfer-01-negotiation/resources/create-policy.json b/transfer/transfer-01-negotiation/resources/create-policy.json index 4aab1e42..cf7a4703 100644 --- a/transfer/transfer-01-negotiation/resources/create-policy.json +++ b/transfer/transfer-01-negotiation/resources/create-policy.json @@ -5,9 +5,10 @@ }, "@id": "aPolicy", "policy": { + "@context": "http://www.w3.org/ns/odrl.jsonld", "@type": "Set", - "odrl:permission": [], - "odrl:prohibition": [], - "odrl:obligation": [] + "permission": [], + "prohibition": [], + "obligation": [] } } diff --git a/transfer/transfer-01-negotiation/resources/negotiate-contract.json b/transfer/transfer-01-negotiation/resources/negotiate-contract.json index 85762a75..ad6c8a95 100644 --- a/transfer/transfer-01-negotiation/resources/negotiate-contract.json +++ b/transfer/transfer-01-negotiation/resources/negotiate-contract.json @@ -6,7 +6,6 @@ "connectorId": "provider", "counterPartyAddress": "http://localhost:19194/protocol", "consumerId": "consumer", - "providerId": "provider", "protocol": "dataspace-protocol-http", "policy": { "@context": "http://www.w3.org/ns/odrl.jsonld", @@ -15,6 +14,7 @@ "permission": [], "prohibition": [], "obligation": [], + "assigner": "provider", "target": "assetId" } }