-
Notifications
You must be signed in to change notification settings - Fork 62
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add streaming kafka-to-http sample
Co-authored-by: ndr_brt <andrea.bertagnolli@gmail.com>
- Loading branch information
1 parent
8ac69c0
commit 013c0a0
Showing
16 changed files
with
465 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
146 changes: 146 additions & 0 deletions
146
.../src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming02KafkaToHttpTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
/* | ||
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) | ||
* | ||
* This program and the accompanying materials are made available under the | ||
* terms of the Apache License, Version 2.0 which is available at | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* Contributors: | ||
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial test implementation for sample | ||
* | ||
*/ | ||
|
||
package org.eclipse.edc.samples.transfer.streaming; | ||
|
||
import jakarta.json.Json; | ||
import okhttp3.mockwebserver.MockWebServer; | ||
import org.apache.kafka.clients.producer.KafkaProducer; | ||
import org.apache.kafka.clients.producer.Producer; | ||
import org.apache.kafka.clients.producer.ProducerConfig; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
import org.eclipse.edc.junit.annotations.EndToEndTest; | ||
import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; | ||
import org.eclipse.edc.junit.testfixtures.TestUtils; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.api.extension.RegisterExtension; | ||
import org.testcontainers.containers.KafkaContainer; | ||
import org.testcontainers.junit.jupiter.Container; | ||
import org.testcontainers.junit.jupiter.Testcontainers; | ||
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ser.std.StringSerializer; | ||
import org.testcontainers.utility.DockerImageName; | ||
|
||
import java.io.IOException; | ||
import java.net.URI; | ||
import java.time.Duration; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
import static org.awaitility.Awaitility.await; | ||
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED; | ||
import static org.eclipse.edc.samples.transfer.FileTransferSampleTestCommon.getFileContentFromRelativePath; | ||
import static org.eclipse.edc.samples.transfer.FileTransferSampleTestCommon.getFileFromRelativePath; | ||
|
||
@Testcontainers | ||
@EndToEndTest | ||
public class Streaming02KafkaToHttpTest { | ||
|
||
private static final String KAFKA_IMAGE_NAME = "bashj79/kafka-kraft:3.0.0"; | ||
private static final String TOPIC = "kafka-stream-topic"; | ||
private static final String MAX_DURATION = "PT30S"; | ||
private static final String SAMPLE_FOLDER = "transfer/streaming/streaming-02-kafka-to-http"; | ||
private static final Duration TIMEOUT = Duration.ofSeconds(30); | ||
@Container | ||
public static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName | ||
.parse(KAFKA_IMAGE_NAME)).withEnv("KAFKA_CREATE_TOPICS", TOPIC.concat(":1:1")); | ||
|
||
private Producer<String, String> createKafkaProducer() { | ||
java.util.Properties props = new java.util.Properties(); | ||
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); | ||
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); | ||
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); | ||
return new KafkaProducer<>(props); | ||
} | ||
|
||
private static final Participant PROVIDER = Participant.Builder.newInstance() | ||
.name("provider") | ||
.id("provider") | ||
.managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:18181/management"))) | ||
.protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:18182/protocol"))) | ||
.controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:18183/control"))) | ||
.build(); | ||
|
||
private static final Participant CONSUMER = Participant.Builder.newInstance() | ||
.name("consumer") | ||
.id("consumer") | ||
.managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:28181/management"))) | ||
.protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:28182/protocol"))) | ||
.controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:28183/control"))) | ||
.build(); | ||
|
||
@RegisterExtension | ||
static EdcRuntimeExtension providerConnector = new EdcRuntimeExtension( | ||
":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime", | ||
"provider", | ||
Map.of( | ||
"edc.fs.config", getFileFromRelativePath(SAMPLE_FOLDER + "/streaming-02-runtime/provider.properties").getAbsolutePath() | ||
) | ||
); | ||
|
||
@RegisterExtension | ||
static EdcRuntimeExtension consumerConnector = new EdcRuntimeExtension( | ||
":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime", | ||
"provider", | ||
Map.of( | ||
"edc.fs.config", getFileFromRelativePath(SAMPLE_FOLDER + "/streaming-02-runtime/consumer.properties").getAbsolutePath() | ||
) | ||
); | ||
private final int httpReceiverPort = TestUtils.getFreePort(); | ||
private final MockWebServer consumerReceiverServer = new MockWebServer(); | ||
|
||
@BeforeEach | ||
void setUp() throws IOException { | ||
consumerReceiverServer.start(httpReceiverPort); | ||
} | ||
|
||
@Test | ||
void streamData() { | ||
|
||
PROVIDER.registerDataPlane(List.of("Kafka"), List.of("HttpData")); | ||
|
||
PROVIDER.createAsset(getFileContentFromRelativePath(SAMPLE_FOLDER + "/1-asset.json") | ||
.replace("{{bootstrap.servers}}", KAFKA_CONTAINER.getBootstrapServers()) | ||
.replace("{{max.duration}}", MAX_DURATION) | ||
.replace("{{topic}}", TOPIC)); | ||
PROVIDER.createPolicyDefinition(getFileContentFromRelativePath(SAMPLE_FOLDER + "/2-policy-definition.json")); | ||
PROVIDER.createContractDefinition(getFileContentFromRelativePath(SAMPLE_FOLDER + "/2-contract-definition.json")); | ||
|
||
var destination = Json.createObjectBuilder() | ||
.add("type", "HttpData") | ||
.add("baseUrl", "http://localhost:" + httpReceiverPort) | ||
.build(); | ||
|
||
var transferProcessId = CONSUMER.requestAsset(PROVIDER, "kafka-stream-asset", Json.createObjectBuilder().build(), destination); | ||
|
||
await().atMost(TIMEOUT).untilAsserted(() -> { | ||
String state = CONSUMER.getTransferProcessState(transferProcessId); | ||
assertThat(state).isEqualTo(STARTED.name()); | ||
}); | ||
|
||
var producer = createKafkaProducer(); | ||
var message = "fake message"; | ||
producer.send(new ProducerRecord<>(TOPIC, "key", message)); | ||
producer.send(new ProducerRecord<>(TOPIC, "key", message)); | ||
|
||
await().atMost(TIMEOUT).untilAsserted(() -> { | ||
var request = consumerReceiverServer.takeRequest(); | ||
assertThat(request).isNotNull(); | ||
assertThat(request.getBody().readByteArray()).isEqualTo(message.getBytes()); | ||
}); | ||
|
||
producer.close(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
source/* |
9 changes: 9 additions & 0 deletions
9
transfer/streaming/streaming-02-kafka-to-http/0-dataplane.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
{ | ||
"@context": { | ||
"edc": "https://w3id.org/edc/v0.0.1/ns/" | ||
}, | ||
"@id": "http-pull-provider-dataplane", | ||
"url": "http://localhost:19192/control/transfer", | ||
"allowedSourceTypes": [ "Kafka" ], | ||
"allowedDestTypes": [ "HttpData" ] | ||
} |
12 changes: 12 additions & 0 deletions
12
transfer/streaming/streaming-02-kafka-to-http/1-asset.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
{ | ||
"@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" }, | ||
"@id": "kafka-stream-asset", | ||
"properties": { | ||
}, | ||
"dataAddress": { | ||
"type": "Kafka", | ||
"kafka.bootstrap.servers": "{{bootstrap.servers}}", | ||
"maxDuration": "{{max.duration}}", | ||
"topic": "{{topic}}" | ||
} | ||
} |
10 changes: 10 additions & 0 deletions
10
transfer/streaming/streaming-02-kafka-to-http/2-policy-definition.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
{ | ||
"@context": { | ||
"@vocab": "https://w3id.org/edc/v0.0.1/ns/", | ||
"odrl": "http://www.w3.org/ns/odrl/2/" | ||
}, | ||
"@id": "no-constraint-policy", | ||
"policy": { | ||
"@type": "odrl:use" | ||
} | ||
} |
7 changes: 7 additions & 0 deletions
7
transfer/streaming/streaming-02-kafka-to-http/3-contract-definition.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
{ | ||
"@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" }, | ||
"@id": "contract-definition", | ||
"accessPolicyId": "no-constraint-policy", | ||
"contractPolicyId": "no-constraint-policy", | ||
"assetsSelector": [] | ||
} |
7 changes: 7 additions & 0 deletions
7
transfer/streaming/streaming-02-kafka-to-http/4-get-dataset.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
{ | ||
"@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" }, | ||
"@type": "DatasetRequest", | ||
"@id": "kafka-stream-asset", | ||
"counterPartyAddress": "http://localhost:18182/protocol", | ||
"protocol": "dataspace-protocol-http" | ||
} |
23 changes: 23 additions & 0 deletions
23
transfer/streaming/streaming-02-kafka-to-http/5-negotiate-contract.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
{ | ||
"@context": { | ||
"edc": "https://w3id.org/edc/v0.0.1/ns/", | ||
"odrl": "http://www.w3.org/ns/odrl/2/" | ||
}, | ||
"@type": "NegotiationInitiateRequestDto", | ||
"connectorAddress": "http://localhost:18182/protocol", | ||
"counterPartyAddress": "http://localhost:18182/protocol", | ||
"providerId": "provider", | ||
"protocol": "dataspace-protocol-http", | ||
"offer": { | ||
"offerId": "{{offerId}}", | ||
"assetId": "kafka-stream-asset", | ||
"policy": { | ||
"@id": "{{offerId}}", | ||
"@type": "use", | ||
"odrl:permission": [], | ||
"odrl:prohibition": [], | ||
"odrl:obligation": [], | ||
"odrl:target": "kafka-stream-asset" | ||
} | ||
} | ||
} |
15 changes: 15 additions & 0 deletions
15
transfer/streaming/streaming-02-kafka-to-http/6-transfer.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
{ | ||
"@context": { | ||
"edc": "https://w3id.org/edc/v0.0.1/ns/" | ||
}, | ||
"@type": "TransferRequest", | ||
"dataDestination": { | ||
"type": "HttpData", | ||
"baseUrl": "http://localhost:4000" | ||
}, | ||
"protocol": "dataspace-protocol-http", | ||
"assetId": "stream-asset", | ||
"contractId": "{{contract-agreement-id}}", | ||
"connectorId": "provider", | ||
"connectorAddress": "http://localhost:18182/protocol" | ||
} |
Oops, something went wrong.