Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sample04 fix #2

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion basic/basic-01-basic-connector/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ application {
}

tasks.withType<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar> {
exclude("**/pom.properties", "**/pom.xm")
mergeServiceFiles()
archiveFileName.set("basic-connector.jar")
}
1 change: 0 additions & 1 deletion basic/basic-02-health-endpoint/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ application {
}

tasks.withType<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar> {
exclude("**/pom.properties", "**/pom.xm")
mergeServiceFiles()
archiveFileName.set("connector-health.jar")
}
1 change: 0 additions & 1 deletion basic/basic-03-configuration/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ application {
}

tasks.withType<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar> {
exclude("**/pom.properties", "**/pom.xm")
mergeServiceFiles()
archiveFileName.set("filesystem-config-connector.jar")
}
9 changes: 8 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ format.version = "1.1"
[versions]
assertj = "3.24.2"
awaitility = "4.2.0"
edc = "0.2.1"
edc = "0.3.0"
jakarta-json = "2.0.1"
junit-pioneer = "2.1.0"
jupiter = "5.10.0"
okhttp-mockwebserver = "5.0.0-alpha.11"
openTelemetry = "1.18.0"
restAssured = "5.3.2"
rsApi = "3.1.0"
kafkaClients = "3.6.0"
testContainers = "1.19.1"


[libraries]
Expand All @@ -24,6 +26,7 @@ edc-boot = { module = "org.eclipse.edc:boot", version.ref = "edc" }
edc-build-plugin = { module = "org.eclipse.edc.edc-build:org.eclipse.edc.edc-build.gradle.plugin", version.ref = "edc" }
edc-configuration-filesystem = { module = "org.eclipse.edc:configuration-filesystem", version.ref = "edc" }
edc-connector-core = { module = "org.eclipse.edc:connector-core", version.ref = "edc" }
edc-control-plane-api-client = { module = "org.eclipse.edc:control-plane-api-client", version.ref = "edc" }
edc-control-plane-core = { module = "org.eclipse.edc:control-plane-core", version.ref = "edc" }
edc-control-plane-spi = { module = "org.eclipse.edc:control-plane-spi", version.ref = "edc" }
edc-data-plane-api = { module = "org.eclipse.edc:data-plane-api", version.ref = "edc" }
Expand All @@ -32,6 +35,7 @@ edc-data-plane-azure-storage = { module = "org.eclipse.edc:data-plane-azure-stor
edc-data-plane-client = { module = "org.eclipse.edc:data-plane-client", version.ref = "edc" }
edc-data-plane-core = { module = "org.eclipse.edc:data-plane-core", version.ref = "edc" }
edc-data-plane-http = { module = "org.eclipse.edc:data-plane-http", version.ref = "edc" }
edc-data-plane-kafka = { module = "org.eclipse.edc:data-plane-kafka", version.ref = "edc" }
edc-data-plane-selector-api = { module = "org.eclipse.edc:data-plane-selector-api", version.ref = "edc" }
edc-data-plane-selector-client = { module = "org.eclipse.edc:data-plane-selector-client", version.ref = "edc" }
edc-data-plane-selector-core = { module = "org.eclipse.edc:data-plane-selector-core", version.ref = "edc" }
Expand Down Expand Up @@ -65,6 +69,9 @@ junit-pioneer = { module = "org.junit-pioneer:junit-pioneer", version.ref = "jun
okhttp-mockwebserver = { module = "com.squareup.okhttp3:mockwebserver", version.ref = "okhttp-mockwebserver" }
opentelemetry-annotations = { module = "io.opentelemetry:opentelemetry-extension-annotations", version.ref = "openTelemetry" }
restAssured = { module = "io.rest-assured:rest-assured", version.ref = "restAssured" }
kafka-clients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafkaClients" }
testcontainers-kafka = { module = "org.testcontainers:kafka", version.ref = "testContainers" }
testcontainers-junit = { module = "org.testcontainers:junit-jupiter", version.ref = "testContainers" }

[plugins]
shadow = { id = "com.github.johnrengelman.shadow", version = "8.1.1" }
1 change: 0 additions & 1 deletion other/custom-runtime/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ var distTar = tasks.getByName("distTar")
var distZip = tasks.getByName("distZip")

tasks.withType<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar> {
exclude("**/pom.properties", "**/pom.xm")
mergeServiceFiles()
archiveFileName.set("custom-runtime.jar")
dependsOn(distTar, distZip)
Expand Down
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ include("transfer:transfer-06-consumer-pull-http:http-pull-connector")
include("transfer:transfer-07-provider-push-http:http-push-connector")

include("transfer:streaming:streaming-01-http-to-http:streaming-01-runtime")
include("transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime")

include("util:http-request-logger")

Expand Down
4 changes: 4 additions & 0 deletions system-tests/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ dependencies {
testImplementation(libs.awaitility)
testImplementation(libs.okhttp.mockwebserver)
testImplementation(libs.restAssured)
testImplementation(libs.testcontainers.junit)
testImplementation(libs.testcontainers.kafka)
testImplementation(libs.kafka.clients)

// runtimes
testCompileOnly(project(":basic:basic-01-basic-connector"))
Expand All @@ -34,4 +37,5 @@ dependencies {
testCompileOnly(project(":transfer:transfer-02-file-transfer-listener:file-transfer-listener-consumer"))
testCompileOnly(project(":transfer:transfer-03-modify-transferprocess:modify-transferprocess-consumer"))
testCompileOnly(project(":transfer:streaming:streaming-01-http-to-http:streaming-01-runtime"))
testCompileOnly(project(":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime"))
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import jakarta.json.Json;
import jakarta.json.JsonArray;
import jakarta.json.JsonObject;
import org.eclipse.edc.connector.contract.spi.ContractId;
import org.eclipse.edc.connector.contract.spi.ContractOfferId;
import org.eclipse.edc.jsonld.TitaniumJsonLd;
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.jsonld.util.JacksonJsonLd;
Expand Down Expand Up @@ -89,7 +89,7 @@ public void registerDataPlane(List<String> sourceTypes, List<String> destination
.when()
.post("/v2/dataplanes")
.then()
.statusCode(204);
.statusCode(200);
}

public String createAsset(String requestBody) {
Expand Down Expand Up @@ -293,7 +293,7 @@ public String initiateTransfer(Participant provider, String contractAgreementId,
public String requestAsset(Participant provider, String assetId, JsonObject privateProperties, JsonObject destination) {
var dataset = getDatasetForAsset(provider, assetId);
var policy = dataset.getJsonArray(ODRL_POLICY_ATTRIBUTE).get(0).asJsonObject();
var contractDefinitionId = ContractId.parseId(policy.getString(ID))
var contractDefinitionId = ContractOfferId.parseId(policy.getString(ID))
.orElseThrow(failure -> new RuntimeException(failure.getFailureDetail()));
var contractAgreementId = negotiateContract(provider, contractDefinitionId.toString(), assetId, policy);
var transferProcessId = initiateTransfer(provider, contractAgreementId, assetId, privateProperties, destination);
Expand All @@ -317,11 +317,6 @@ public String getTransferProcessState(String id) {
.extract().body().jsonPath().getString("'edc:state'");
}

private ContractId extractContractDefinitionId(JsonObject dataset) {
var contractId = dataset.getJsonArray(ODRL_POLICY_ATTRIBUTE).get(0).asJsonObject().getString(ID);
return ContractId.parseId(contractId).orElseThrow(f -> new RuntimeException(f.getFailureDetail()));
}

private String getContractNegotiationState(String id) {
return managementEndpoint.baseRequest()
.contentType(JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED;
Expand Down Expand Up @@ -106,12 +109,20 @@ void streamData() throws IOException {
});

var eventBody = "message that will be sent".getBytes();
Files.write(source.resolve("message-" + UUID.randomUUID()), eventBody);
newScheduledThreadPool(1).scheduleAtFixedRate(() -> createMessage(source, eventBody), 0L, 200L, MILLISECONDS);

await().atMost(TIMEOUT).untilAsserted(() -> {
var request = consumerReceiverServer.takeRequest();
assertThat(request).isNotNull();
assertThat(request.getBody().readByteArray()).isEqualTo(eventBody);
});
}

/**
 * Writes {@code content} to a uniquely named {@code message-<uuid>} file inside
 * {@code source}; invoked repeatedly by the scheduled publisher in {@code streamData}
 * to feed new files into the source directory.
 *
 * @param source  directory the messages are written into
 * @param content raw payload of the message file
 * @throws UncheckedIOException if the file cannot be written, so the failure
 *         surfaces from the scheduled task instead of being silently lost
 */
private static void createMessage(Path source, byte[] content) {
    try {
        Files.write(source.resolve("message-" + UUID.randomUUID()), content);
    } catch (IOException e) {
        // UncheckedIOException (not bare RuntimeException) preserves the I/O nature
        // of the failure for callers that want to catch it specifically
        throw new UncheckedIOException("Failed to write message file into " + source, e);
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
 *  Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
 *
 *  This program and the accompanying materials are made available under the
 *  terms of the Apache License, Version 2.0 which is available at
 *  https://www.apache.org/licenses/LICENSE-2.0
 *
 *  SPDX-License-Identifier: Apache-2.0
 *
 *  Contributors:
 *       Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial test implementation for sample
 *
 */

package org.eclipse.edc.samples.transfer.streaming;

import jakarta.json.Json;
import okhttp3.mockwebserver.MockWebServer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.edc.junit.annotations.EndToEndTest;
import org.eclipse.edc.junit.extensions.EdcRuntimeExtension;
import org.eclipse.edc.junit.testfixtures.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED;
import static org.eclipse.edc.samples.transfer.FileTransferSampleTestCommon.getFileContentFromRelativePath;
import static org.eclipse.edc.samples.transfer.FileTransferSampleTestCommon.getFileFromRelativePath;

/**
 * End-to-end test for the streaming-02 sample: events produced to a Kafka topic on the
 * provider side are expected to arrive, via the EDC data plane, at an HTTP receiver
 * endpoint on the consumer side.
 */
@Testcontainers
@EndToEndTest
public class Streaming02KafkaToHttpTest {

    private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.4.0";
    private static final String TOPIC = "kafka-stream-topic";
    private static final String MAX_DURATION = "PT30S";
    private static final String SAMPLE_FOLDER = "transfer/streaming/streaming-02-kafka-to-http";
    private static final Duration TIMEOUT = Duration.ofSeconds(30);
    private static final Participant PROVIDER = Participant.Builder.newInstance()
            .name("provider")
            .id("provider")
            .managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:18181/management")))
            .protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:18182/protocol")))
            .controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:18183/control")))
            .build();
    private static final Participant CONSUMER = Participant.Builder.newInstance()
            .name("consumer")
            .id("consumer")
            .managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:28181/management")))
            .protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:28182/protocol")))
            .controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:28183/control")))
            .build();

    @Container
    static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
            .withKraft()
            .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
            .withEnv("KAFKA_CREATE_TOPICS", TOPIC.concat(":1:1"));

    @RegisterExtension
    static EdcRuntimeExtension providerConnector = new EdcRuntimeExtension(
            ":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime",
            "provider",
            Map.of(
                    "edc.fs.config",
                    getFileFromRelativePath(SAMPLE_FOLDER + "/streaming-02-runtime/provider.properties")
                            .getAbsolutePath()
            )
    );

    @RegisterExtension
    static EdcRuntimeExtension consumerConnector = new EdcRuntimeExtension(
            ":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime",
            "consumer",
            Map.of(
                    "edc.fs.config",
                    getFileFromRelativePath(SAMPLE_FOLDER + "/streaming-02-runtime/consumer.properties")
                            .getAbsolutePath()
            )
    );

    private final int httpReceiverPort = TestUtils.getFreePort();
    private final MockWebServer consumerReceiverServer = new MockWebServer();

    @BeforeEach
    void setUp() throws IOException {
        consumerReceiverServer.start(httpReceiverPort);
    }

    @AfterEach
    void tearDown() throws IOException {
        // release the receiver port so consecutive tests in the same JVM don't collide
        consumerReceiverServer.shutdown();
    }

    @Test
    void streamData() {

        PROVIDER.registerDataPlane(List.of("Kafka"), List.of("HttpData"));

        PROVIDER.createAsset(getFileContentFromRelativePath(SAMPLE_FOLDER + "/1-asset.json")
                .replace("{{bootstrap.servers}}", kafkaContainer.getBootstrapServers())
                .replace("{{max.duration}}", MAX_DURATION)
                .replace("{{topic}}", TOPIC));
        PROVIDER.createPolicyDefinition(getFileContentFromRelativePath(SAMPLE_FOLDER + "/2-policy-definition.json"));
        PROVIDER.createContractDefinition(
                getFileContentFromRelativePath(SAMPLE_FOLDER + "/3-contract-definition.json"));

        var destination = Json.createObjectBuilder()
                .add("type", "HttpData")
                .add("baseUrl", "http://localhost:" + httpReceiverPort)
                .build();

        var transferProcessId = CONSUMER.requestAsset(PROVIDER, "kafka-stream-asset",
                Json.createObjectBuilder().build(), destination);

        await().atMost(TIMEOUT).untilAsserted(() -> {
            String state = CONSUMER.getTransferProcessState(transferProcessId);
            assertThat(state).isEqualTo(STARTED.name());
        });

        var producer = createKafkaProducer();
        var message = "message";
        // Fix: the period was 100 MICROSECONDS (a send every 0.1 ms, ~10k msgs/s),
        // flooding the broker; 100 ms matches the cadence of the streaming-01 test.
        var publisher = Executors.newSingleThreadScheduledExecutor();
        publisher.scheduleAtFixedRate(
                () -> producer.send(new ProducerRecord<>(TOPIC, "key", message)), 0L, 100L, MILLISECONDS);

        try {
            await().atMost(TIMEOUT).untilAsserted(() -> {
                var request = consumerReceiverServer.takeRequest();
                assertThat(request).isNotNull();
                assertThat(request.getBody().readByteArray()).isEqualTo(message.getBytes());
            });
        } finally {
            // Stop publishing BEFORE closing the producer: the original closed the
            // producer while the scheduled task kept calling send() on it, and it
            // leaked both executor and producer when the assertion timed out.
            publisher.shutdownNow();
            producer.close();
        }
    }

    /**
     * Builds a plain string/string Kafka producer pointed at the test container's
     * bootstrap servers.
     */
    private Producer<String, String> createKafkaProducer() {
        var props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

}
1 change: 0 additions & 1 deletion transfer/streaming/streaming-01-http-to-http/.gitignore

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ plugins {
}

dependencies {
implementation(libs.edc.control.plane.api.client)
implementation(libs.edc.control.plane.core)
implementation(libs.edc.data.plane.selector.core)
implementation(libs.edc.api.observability)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"@context": {
"edc": "https://w3id.org/edc/v0.0.1/ns/"
},
"@id": "http-pull-provider-dataplane",
"url": "http://localhost:19192/control/transfer",
"allowedSourceTypes": [ "Kafka" ],
"allowedDestTypes": [ "HttpData" ]
}
12 changes: 12 additions & 0 deletions transfer/streaming/streaming-02-kafka-to-http/1-asset.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" },
"@id": "kafka-stream-asset",
"properties": {
},
"dataAddress": {
"type": "Kafka",
"kafka.bootstrap.servers": "{{bootstrap.servers}}",
"maxDuration": "{{max.duration}}",
"topic": "{{topic}}"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
},
"@id": "no-constraint-policy",
"policy": {
"@type": "odrl:use"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" },
"@id": "contract-definition",
"accessPolicyId": "no-constraint-policy",
"contractPolicyId": "no-constraint-policy",
"assetsSelector": []
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" },
"@type": "DatasetRequest",
"@id": "kafka-stream-asset",
"counterPartyAddress": "http://localhost:18182/protocol",
"protocol": "dataspace-protocol-http"
}
Loading