diff --git a/.github/workflows/verify.yaml b/.github/workflows/verify.yaml index b6ee42e3..e25e91d6 100644 --- a/.github/workflows/verify.yaml +++ b/.github/workflows/verify.yaml @@ -15,18 +15,6 @@ concurrency: cancel-in-progress: true jobs: - Checkstyle: - permissions: - id-token: write - checks: write - pull-requests: write - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: ./.github/actions/setup-build - - - name: Run Checkstyle - run: ./gradlew checkstyleMain checkstyleTest Build: runs-on: ubuntu-latest diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 409d1069..fea8b473 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -56,6 +56,7 @@ edc-runtime-metamodel = { module = "org.eclipse.edc:runtime-metamodel", version. edc-transfer-data-plane = { module = "org.eclipse.edc:transfer-data-plane", version.ref = "edc" } edc-transfer-process-api = { module = "org.eclipse.edc:transfer-process-api", version.ref = "edc" } edc-transfer-pull-http-receiver = { module = "org.eclipse.edc:transfer-pull-http-receiver", version.ref = "edc" } +edc-transfer-pull-http-dynamic-receiver = { module = "org.eclipse.edc:transfer-pull-http-dynamic-receiver", version.ref = "edc" } edc-transfer-spi = { module = "org.eclipse.edc:transfer-spi", version.ref = "edc" } edc-util = { module = "org.eclipse.edc:util", version.ref = "edc" } edc-vault-azure = { module = "org.eclipse.edc:vault-azure", version.ref = "edc" } diff --git a/settings.gradle.kts b/settings.gradle.kts index 48a316fa..7bddd3b8 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -28,12 +28,10 @@ dependencyResolutionManagement { } } -// basic include(":basic:basic-01-basic-connector") include(":basic:basic-02-health-endpoint") include(":basic:basic-03-configuration") -// transfer include(":transfer:transfer-00-prerequisites:connector") include(":transfer:transfer-04-event-consumer:consumer-with-listener") @@ -43,16 +41,15 @@ include(":transfer:transfer-05-file-transfer-cloud:cloud-transfer-consumer") include(":transfer:transfer-05-file-transfer-cloud:cloud-transfer-provider") include(":transfer:transfer-05-file-transfer-cloud:transfer-file-cloud") -include("transfer:streaming:streaming-01-http-to-http:streaming-01-runtime") -include("transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime") +include(":transfer:streaming:streaming-01-http-to-http:streaming-01-runtime") +include(":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime") +include(":transfer:streaming:streaming-03-kafka-broker:streaming-03-runtime") -// advanced include(":advanced:advanced-01-open-telemetry:open-telemetry-consumer") include(":advanced:advanced-01-open-telemetry:open-telemetry-provider") -// modules for code samples ------------------------------------------------------------------------ include(":other:custom-runtime") -include("util:http-request-logger") +include(":util:http-request-logger") include(":system-tests") diff --git a/system-tests/build.gradle.kts b/system-tests/build.gradle.kts index f7f3e6a8..deb02448 100644 --- a/system-tests/build.gradle.kts +++ b/system-tests/build.gradle.kts @@ -39,7 +39,8 @@ dependencies { testCompileOnly(project(":transfer:transfer-04-event-consumer:listener")) testCompileOnly(project(":transfer:streaming:streaming-01-http-to-http:streaming-01-runtime")) testCompileOnly(project(":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime")) + 
testCompileOnly(project(":transfer:streaming:streaming-03-kafka-broker:streaming-03-runtime")) testCompileOnly(project(":advanced:advanced-01-open-telemetry:open-telemetry-provider")) testCompileOnly(project(":advanced:advanced-01-open-telemetry:open-telemetry-consumer")) -} \ No newline at end of file +} diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/common/FileTransferCommon.java b/system-tests/src/test/java/org/eclipse/edc/samples/common/FileTransferCommon.java index 0e79a98b..1ff5f45e 100644 --- a/system-tests/src/test/java/org/eclipse/edc/samples/common/FileTransferCommon.java +++ b/system-tests/src/test/java/org/eclipse/edc/samples/common/FileTransferCommon.java @@ -15,10 +15,6 @@ package org.eclipse.edc.samples.common; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.restassured.http.ContentType; -import org.apache.http.HttpStatus; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.junit.testfixtures.TestUtils; import org.jetbrains.annotations.NotNull; @@ -26,52 +22,12 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; -import java.time.Duration; -import java.util.Map; - -import static io.restassured.RestAssured.given; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; -import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETED; -import static org.hamcrest.Matchers.emptyString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; /** * Encapsulates common settings, test steps, and helper methods for transfer samples */ public class FileTransferCommon { - static final ObjectMapper MAPPER = new ObjectMapper(); - - static final String MANAGEMENT_API_URL = "http://localhost:9192/management"; - static final String CONTRACT_OFFER_FILE_PATH = "transfer/transfer-01-file-transfer/contractoffer.json"; - static final String TRANSFER_FILE_PATH = "transfer/transfer-01-file-transfer/filetransfer.json"; - static final String API_KEY_HEADER_KEY = "X-Api-Key"; - static final String API_KEY_HEADER_VALUE = "password"; - - final String sampleAssetFilePath; - final File sampleAssetFile; - final File destinationFile; - Duration timeout = Duration.ofSeconds(30); - Duration pollInterval = Duration.ofMillis(500); - - String contractNegotiationId; - String contractAgreementId; - - /** - * Creates a new {@code FileTransferSampleTestCommon} instance. - * - * @param sampleAssetFilePath Relative path starting from the root of the project to a file which will be read from for transfer. - * @param destinationFilePath Relative path starting from the root of the project where the transferred file will be written to. - */ - public FileTransferCommon(@NotNull String sampleAssetFilePath, @NotNull String destinationFilePath) { - this.sampleAssetFilePath = sampleAssetFilePath; - sampleAssetFile = getFileFromRelativePath(sampleAssetFilePath); - - destinationFile = getFileFromRelativePath(destinationFilePath); - } - /** * Resolves a {@link File} instance from a relative path. */ @@ -93,141 +49,4 @@ public static String getFileContentFromRelativePath(String relativePath) { } } - /** - * Assert that prerequisites are fulfilled before running the test. - * This assertion checks only whether the file to be copied is not existing already. - */ - void assertTestPrerequisites() { - assertThat(destinationFile).doesNotExist(); - } - - /** - * Remove files created while running the tests. 
- * The copied file will be deleted. - */ - void cleanTemporaryTestFiles() { - destinationFile.delete(); - } - - /** - * Assert that the file to be copied exists at the expected location. - * This method waits a duration which is defined in {@link FileTransferCommon#timeout}. - */ - void assertDestinationFileContent() { - await().atMost(timeout).pollInterval(pollInterval).untilAsserted(() - -> assertThat(destinationFile).hasSameBinaryContentAs(sampleAssetFile)); - } - - /** - * Assert that the transfer process state on the consumer is completed. - */ - void assertTransferProcessStatusConsumerSide(String transferProcessId) { - await().atMost(timeout).pollInterval(pollInterval).untilAsserted(() -> { - var state = getTransferProcessState(transferProcessId); - - assertThat(state).isEqualTo(COMPLETED.name()); - }); - } - - /** - * Assert that a POST request to initiate a contract negotiation is successful. - * This method corresponds to the command in the sample: {@code curl -X POST -H "Content-Type: application/json" -H "X-Api-Key: password" -d @transfer/transfer-01-file-transfer/contractoffer.json "http://localhost:9192/management/v2/contractnegotiations"} - */ - void initiateContractNegotiation() { - initiateContractNegotiation(CONTRACT_OFFER_FILE_PATH); - } - - - void initiateContractNegotiation(String contractOfferFilePath) { - contractNegotiationId = given() - .headers(API_KEY_HEADER_KEY, API_KEY_HEADER_VALUE) - .contentType(ContentType.JSON) - .body(new File(TestUtils.findBuildRoot(), contractOfferFilePath)) - .when() - .post(MANAGEMENT_API_URL + "/v2/contractnegotiations") - .then() - .statusCode(HttpStatus.SC_OK) - .body("@id", not(emptyString())) - .extract() - .jsonPath() - .get("@id"); - } - - public String getTransferProcessState(String processId) { - return given() - .headers(API_KEY_HEADER_KEY, API_KEY_HEADER_VALUE) - .when() - .get(String.format("%s/%s", MANAGEMENT_API_URL + "/v2/transferprocesses", processId)) - .then() - .statusCode(HttpStatus.SC_OK) - .extract().body().jsonPath().getString("'edc:state'"); - } - - /** - * Assert that a GET request to look up a contract agreement is successful. - * This method corresponds to the command in the sample: {@code curl -X GET -H 'X-Api-Key: password' "http://localhost:9192/management/v2/contractnegotiations/{UUID}"} - */ - void lookUpContractAgreementId() { - // Wait for transfer to be completed. - await().atMost(timeout).pollInterval(pollInterval).untilAsserted(() -> contractAgreementId = given() - .headers(API_KEY_HEADER_KEY, API_KEY_HEADER_VALUE) - .when() - .get(MANAGEMENT_API_URL + "/v2/contractnegotiations/{id}", contractNegotiationId) - .then() - .statusCode(HttpStatus.SC_OK) - .body("'edc:state'", equalTo("FINALIZED")) - .body("'edc:contractAgreementId'", not(emptyString())) - .extract().body().jsonPath().getString("'edc:contractAgreementId'") - ); - } - - /** - * Assert that a POST request to initiate transfer process is successful. - * This method corresponds to the command in the sample: {@code curl -X POST -H "Content-Type: application/json" -H "X-Api-Key: password" -d @transfer/transfer-01-file-transfer/filetransfer.json "http://localhost:9192/management/v2/transferprocesses"} - * - * @throws IOException Thrown if there was an error accessing the transfer request file defined in {@link FileTransferCommon#TRANSFER_FILE_PATH}. 
- */ - String requestTransferFile(String transferFilePath) throws IOException { - var transferJsonFile = getFileFromRelativePath(transferFilePath); - var requestBody = readAndUpdateDataRequestFromJsonFile(transferJsonFile, contractAgreementId); - - var jsonPath = given() - .headers(API_KEY_HEADER_KEY, API_KEY_HEADER_VALUE) - .contentType(ContentType.JSON) - .body(requestBody) - .when() - .post(MANAGEMENT_API_URL + "/v2/transferprocesses") - .then() - .log().ifError() - .statusCode(HttpStatus.SC_OK) - .body("@id", not(emptyString())) - .extract() - .jsonPath(); - - var transferProcessId = jsonPath.getString("@id"); - - assertThat(transferProcessId).isNotEmpty(); - - return transferProcessId; - } - - String requestTransferFile() throws IOException { - return requestTransferFile(TRANSFER_FILE_PATH); - } - - /** - * Reads a transfer request file with changed value for contract agreement ID and file destination path. - * - * @param transferJsonFile A {@link File} instance pointing to a JSON transfer request file. - * @param contractAgreementId This string containing a UUID will be used as value for the contract agreement ID. - * @return An instance of {@link DataRequest} with changed values for contract agreement ID and file destination path. - * @throws IOException Thrown if there was an error accessing the file given in transferJsonFile. - */ - Map readAndUpdateDataRequestFromJsonFile(@NotNull File transferJsonFile, @NotNull String contractAgreementId) throws IOException { - var fileString = Files.readString(transferJsonFile.toPath()) - .replace("{path to destination file}", destinationFile.getAbsolutePath()) - .replace("{agreement ID}", contractAgreementId); - - return MAPPER.readValue(fileString, Map.class); - } } diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/KafkaSaslContainer.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/KafkaSaslContainer.java new file mode 100644 index 00000000..c2df6d60 --- /dev/null +++ b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/KafkaSaslContainer.java @@ -0,0 +1,37 @@ +package org.eclipse.edc.samples.transfer.streaming; + +import org.jetbrains.annotations.NotNull; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; + +/** + * Extension of the {@link KafkaContainer} that permits to set the SASL_PLAINTEXT security protocol + */ +public class KafkaSaslContainer extends KafkaContainer { + + private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.5.2"; + private final File envFile; + + public KafkaSaslContainer(@NotNull File envFile) { + super(DockerImageName.parse(KAFKA_IMAGE_NAME)); + this.withKraft(); + this.envFile = envFile; + } + + @Override + protected void configureKraft() { + super.configureKraft(); + try { + Files.readAllLines(envFile.toPath()) + .stream().map(it -> it.split("=", 2)) + .forEach(it -> this.withEnv(it[0], it[1])); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming03KafkaToKafkaTest.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming03KafkaToKafkaTest.java new file mode 100644 index 00000000..c12fdaee --- /dev/null +++ b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming03KafkaToKafkaTest.java @@ -0,0 +1,233 @@ +/* + * 
Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial test implementation for sample + * + */ + +package org.eclipse.edc.samples.transfer.streaming; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.json.Json; +import okhttp3.mockwebserver.MockWebServer; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.eclipse.edc.junit.annotations.EndToEndTest; +import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; +import org.eclipse.edc.junit.testfixtures.TestUtils; +import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.IOException; +import java.net.URI; +import java.nio.charset.Charset; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; + +import static java.time.Duration.ZERO; +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED; +import static org.eclipse.edc.samples.common.FileTransferCommon.getFileContentFromRelativePath; +import static org.eclipse.edc.samples.common.FileTransferCommon.getFileFromRelativePath; +import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; + +@Testcontainers +@EndToEndTest +public class Streaming03KafkaToKafkaTest { + + private static final String TOPIC = "topic-" + UUID.randomUUID(); + private static 
final String SAMPLE_NAME = "streaming-03-kafka-broker"; + private static final String RUNTIME_NAME = "streaming-03-runtime"; + private static final Path SAMPLE_FOLDER = Path.of("transfer", "streaming", SAMPLE_NAME); + private static final Path RUNTIME_PATH = SAMPLE_FOLDER.resolve(RUNTIME_NAME); + private static final Duration TIMEOUT = Duration.ofSeconds(60); + private static final Participant PROVIDER = Participant.Builder.newInstance() + .name("provider") + .id("provider") + .managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:18181/management"))) + .protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:18182/protocol"))) + .controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:18183/control"))) + .build(); + private static final Participant CONSUMER = Participant.Builder.newInstance() + .name("consumer") + .id("consumer") + .managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:28181/management"))) + .protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:28182/protocol"))) + .controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:28183/control"))) + .build(); + private static final String GROUP_ID = "group_id"; + + @Container + static KafkaContainer kafkaContainer = new KafkaSaslContainer(getFileFromRelativePath(SAMPLE_FOLDER.resolve("kafka.env").toString())) + .withLogConsumer(frame -> System.out.print(frame.getUtf8String())); + + @RegisterExtension + static EdcRuntimeExtension providerConnector = new EdcRuntimeExtension( + ":transfer:streaming:%s:%s".formatted(SAMPLE_NAME, RUNTIME_NAME), + "provider", + Map.of( + "edc.fs.config", + getFileFromRelativePath(RUNTIME_PATH.resolve("provider.properties").toString()) + .getAbsolutePath() + ) + ); + + @RegisterExtension + static EdcRuntimeExtension consumerConnector = new EdcRuntimeExtension( + ":transfer:streaming:%s:%s".formatted(SAMPLE_NAME, RUNTIME_NAME), + "consumer", + Map.of( + "edc.fs.config", + getFileFromRelativePath(RUNTIME_PATH.resolve("consumer.properties").toString()) + .getAbsolutePath() + ) + ); + + private final int httpReceiverPort = TestUtils.getFreePort(); + private final MockWebServer edrReceiverServer = new MockWebServer(); + + @BeforeEach + void setUp() throws IOException { + edrReceiverServer.start(httpReceiverPort); + } + + @Test + void streamData() throws InterruptedException, JsonProcessingException { + createAcls( + userCanAccess("User:alice", ResourceType.TOPIC, TOPIC), + userCanAccess("User:alice", ResourceType.GROUP, GROUP_ID) + ); + + PROVIDER.createAsset(getFileContentFromRelativePath(SAMPLE_FOLDER.resolve("1-asset.json").toString()) + .replace("{{bootstrap.servers}}", kafkaContainer.getBootstrapServers()) + .replace("{{topic}}", TOPIC)); + PROVIDER.createPolicyDefinition(getFileContentFromRelativePath(SAMPLE_FOLDER.resolve("2-policy-definition.json").toString())); + PROVIDER.createContractDefinition( + getFileContentFromRelativePath(SAMPLE_FOLDER.resolve("3-contract-definition.json").toString())); + + var destination = Json.createObjectBuilder() + .add("type", "Kafka") + .build(); + + var transferProcessPrivateProperties = Json.createObjectBuilder() + .add("receiverHttpEndpoint", "http://localhost:" + httpReceiverPort) + .build(); + var transferProcessId = CONSUMER.requestAsset(PROVIDER, "kafka-stream-asset", transferProcessPrivateProperties, destination); + + await().atMost(TIMEOUT).untilAsserted(() -> { + var state = CONSUMER.getTransferProcessState(transferProcessId); + 
assertThat(state).isEqualTo(STARTED.name()); + }); + + var producer = createKafkaProducer(); + var message = "message"; + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> producer + .send(new ProducerRecord<>(TOPIC, "key", message)), 0L, 100L, MICROSECONDS); + + var endpointDataReference = readEndpointDataReference(); + + try (var clientConsumer = createKafkaConsumer(endpointDataReference.getEndpoint(), endpointDataReference.getAuthKey(), endpointDataReference.getAuthCode())) { + clientConsumer.subscribe(List.of(endpointDataReference.getProperties().get(EDC_NAMESPACE + "topic").toString())); + + await().atMost(TIMEOUT).untilAsserted(() -> { + var records = clientConsumer.poll(ZERO); + assertThat(records.isEmpty()).isFalse(); + records.records(TOPIC).forEach(record -> assertThat(record.value()).isEqualTo(message)); + }); + } + + producer.close(); + } + + private EndpointDataReference readEndpointDataReference() throws InterruptedException, JsonProcessingException { + var request = edrReceiverServer.takeRequest(TIMEOUT.getSeconds(), SECONDS); + var body = request.getBody().readString(Charset.defaultCharset()); + return new ObjectMapper().readValue(body, EndpointDataReference.class); + } + + private AclBinding userCanAccess(String principal, ResourceType resourceType, String resourceName) { + var pattern = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL); + var entry = new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW); + return new AclBinding(pattern, entry); + } + + private void createAcls(AclBinding... bindings) { + var adminProperties = new Properties(); + adminProperties.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + adminProperties.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); + adminProperties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"); + adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); + try (var adminClient = AdminClient.create(adminProperties)) { + adminClient.createAcls(Arrays.stream(bindings).toList()).all().get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private Producer createKafkaProducer() { + var props = new Properties(); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); + props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + return new KafkaProducer<>(props); + } + + private static Consumer createKafkaConsumer(@NotNull String endpoint, @Nullable String authKey, @Nullable String authCode) { + var props = new Properties(); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); + props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";".formatted(authKey, authCode)); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint); + props.put(ConsumerConfig.GROUP_ID_CONFIG, 
GROUP_ID); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + return new KafkaConsumer<>(props); + } + +} diff --git a/transfer/streaming/streaming-03-kafka-broker/1-asset.json b/transfer/streaming/streaming-03-kafka-broker/1-asset.json new file mode 100644 index 00000000..a4b7d626 --- /dev/null +++ b/transfer/streaming/streaming-03-kafka-broker/1-asset.json @@ -0,0 +1,11 @@ +{ + "@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" }, + "@id": "kafka-stream-asset", + "properties": { + }, + "dataAddress": { + "type": "Kafka", + "kafka.bootstrap.servers": "{{bootstrap.servers}}", + "topic": "{{topic}}" + } +} diff --git a/transfer/streaming/streaming-03-kafka-broker/2-policy-definition.json b/transfer/streaming/streaming-03-kafka-broker/2-policy-definition.json new file mode 100644 index 00000000..4919c71a --- /dev/null +++ b/transfer/streaming/streaming-03-kafka-broker/2-policy-definition.json @@ -0,0 +1,10 @@ +{ + "@context": { + "@vocab": "https://w3id.org/edc/v0.0.1/ns/", + "odrl": "http://www.w3.org/ns/odrl/2/" + }, + "@id": "no-constraint-policy", + "policy": { + "@type": "odrl:use" + } +} diff --git a/transfer/streaming/streaming-03-kafka-broker/3-contract-definition.json b/transfer/streaming/streaming-03-kafka-broker/3-contract-definition.json new file mode 100644 index 00000000..d424ec90 --- /dev/null +++ b/transfer/streaming/streaming-03-kafka-broker/3-contract-definition.json @@ -0,0 +1,7 @@ +{ + "@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" }, + "@id": "contract-definition", + "accessPolicyId": "no-constraint-policy", + "contractPolicyId": "no-constraint-policy", + "assetsSelector": [] +} diff --git a/transfer/streaming/streaming-03-kafka-broker/4-get-dataset.json b/transfer/streaming/streaming-03-kafka-broker/4-get-dataset.json new file mode 100644 index 00000000..0ec57558 --- /dev/null +++ b/transfer/streaming/streaming-03-kafka-broker/4-get-dataset.json @@ -0,0 +1,7 @@ +{ + "@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" }, + "@type": "DatasetRequest", + "@id": "kafka-stream-asset", + "counterPartyAddress": "http://localhost:18182/protocol", + "protocol": "dataspace-protocol-http" +} diff --git a/transfer/streaming/streaming-03-kafka-broker/5-negotiate-contract.json b/transfer/streaming/streaming-03-kafka-broker/5-negotiate-contract.json new file mode 100644 index 00000000..b525b894 --- /dev/null +++ b/transfer/streaming/streaming-03-kafka-broker/5-negotiate-contract.json @@ -0,0 +1,23 @@ +{ + "@context": { + "edc": "https://w3id.org/edc/v0.0.1/ns/", + "odrl": "http://www.w3.org/ns/odrl/2/" + }, + "@type": "NegotiationInitiateRequestDto", + "connectorAddress": "http://localhost:18182/protocol", + "counterPartyAddress": "http://localhost:18182/protocol", + "providerId": "provider", + "protocol": "dataspace-protocol-http", + "offer": { + "offerId": "{{offerId}}", + "assetId": "kafka-stream-asset", + "policy": { + "@id": "{{offerId}}", + "@type": "use", + "odrl:permission": [], + "odrl:prohibition": [], + "odrl:obligation": [], + "odrl:target": "kafka-stream-asset" + } + } +} diff --git a/transfer/streaming/streaming-03-kafka-broker/6-transfer.json b/transfer/streaming/streaming-03-kafka-broker/6-transfer.json new file mode 100644 index 00000000..0e2c7d38 --- /dev/null +++ b/transfer/streaming/streaming-03-kafka-broker/6-transfer.json @@ -0,0 +1,14 @@ +{ + "@context": { + "edc": "https://w3id.org/edc/v0.0.1/ns/" 
+  },
+  "@type": "TransferRequest",
+  "dataDestination": {
+    "type": "Kafka"
+  },
+  "protocol": "dataspace-protocol-http",
+  "assetId": "kafka-stream-asset",
+  "contractId": "{{contract-agreement-id}}",
+  "connectorId": "provider",
+  "connectorAddress": "http://localhost:18182/protocol"
+}
diff --git a/transfer/streaming/streaming-03-kafka-broker/README.md b/transfer/streaming/streaming-03-kafka-broker/README.md
new file mode 100644
index 00000000..046c6c85
--- /dev/null
+++ b/transfer/streaming/streaming-03-kafka-broker/README.md
@@ -0,0 +1,205 @@
+# Streaming KAFKA to KAFKA
+
+This sample demonstrates how to set up the EDC to stream messages through Kafka.
+This code is only for demonstration purposes and should not be used in production.
+
+## Concept
+
+In this sample the data plane is not used; instead, the consumer sets up a Kafka client to poll messages from the broker
+using credentials obtained from the transfer process.
+
+The data flow is managed by the [KafkaToKafkaDataFlowController](streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaToKafkaDataFlowController.java),
+which on flow initialization creates an `EndpointDataReference` containing the credentials that the consumer then uses
+to poll the messages.
+
+### Run
+
+Build the connector runtime, which will be used for both the provider and the consumer:
+```shell
+./gradlew :transfer:streaming:streaming-03-kafka-broker:streaming-03-runtime:build
+```
+
+Run the provider and the consumer, which must be started from different terminal shells:
+```shell
+# provider
+export EDC_FS_CONFIG=transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/provider.properties
+java -jar transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/build/libs/connector.jar
+
+# consumer
+export EDC_FS_CONFIG=transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/consumer.properties
+java -jar transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/build/libs/connector.jar
+```
+
+### Start Kafka and configure ACLs
+
+Kafka will be started in [KRaft mode](https://developer.confluent.io/learn/kraft/) as a single broker with `SASL_PLAINTEXT`
+as the security protocol ([see config](kafka.env)). There will be an `admin` user, responsible for setting up ACLs and producing
+messages, and an `alice` user, which the consumer will use to consume the messages.
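+
+For reference, this is the entry in [kafka.env](kafka.env) that declares both users (and their passwords) for the
+`PLAINTEXT` listener, i.e. the SASL/PLAIN listener exposed on port 9093 that external clients connect to:
+```properties
+KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_alice="alice-secret";
+```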
+
+Run the Kafka container:
+```shell
+docker run --rm --name=kafka-kraft -h kafka-kraft -p 9093:9093 \
+  -v "$PWD/transfer/streaming/streaming-03-kafka-broker/kafka-config":/config \
+  --env-file transfer/streaming/streaming-03-kafka-broker/kafka.env \
+  -e KAFKA_NODE_ID=1 \
+  -e KAFKA_LISTENERS='PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094' \
+  -e KAFKA_ADVERTISED_LISTENERS='PLAINTEXT://localhost:9093,BROKER://localhost:9092' \
+  -e KAFKA_PROCESS_ROLES='broker,controller' \
+  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
+  -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
+  -e KAFKA_CONTROLLER_QUORUM_VOTERS='1@localhost:9094' \
+  -e KAFKA_INTER_BROKER_LISTENER_NAME='BROKER' \
+  -e KAFKA_CONTROLLER_LISTENER_NAMES='CONTROLLER' \
+  -e KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1 \
+  -e CLUSTER_ID='4L6g3nShT-eMCtK--X86sw' \
+  confluentinc/cp-kafka:7.5.2
+```
+
+Create the topic `kafka-stream-topic`:
+```shell
+docker exec -it kafka-kraft /bin/kafka-topics \
+  --topic kafka-stream-topic --create --partitions 1 --replication-factor 1 \
+  --command-config=/config/admin.properties \
+  --bootstrap-server localhost:9092
+```
+
+To give `alice` read permission on the topic and on the consumer group, set up these ACLs:
+```shell
+docker exec -it kafka-kraft /bin/kafka-acls --command-config /config/admin.properties \
+  --bootstrap-server localhost:9093 \
+  --add --allow-principal 'User:alice' \
+  --topic kafka-stream-topic \
+  --group group_id \
+  --operation Read
+```
+
+### Register Asset, Policy Definition and Contract Definition on provider
+
+Replace the placeholders for `kafka.bootstrap.servers` and `topic` in the [1-asset.json](1-asset.json) file like this:
+```json
+  "dataAddress": {
+    "type": "Kafka",
+    "kafka.bootstrap.servers": "localhost:9093",
+    "topic": "kafka-stream-topic"
+  }
+```
+
+Then create the Asset, the Policy Definition and the Contract Definition with these three calls:
+```shell
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/1-asset.json -X POST "http://localhost:18181/management/v3/assets"
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/2-policy-definition.json -X POST "http://localhost:18181/management/v2/policydefinitions"
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/3-contract-definition.json -X POST "http://localhost:18181/management/v2/contractdefinitions"
+```
+
+### Negotiate the contract
+
+The typical flow requires fetching the catalog from the consumer side and using the contract offer to negotiate a contract.
+However, in this sample case we already know the provider asset (`kafka-stream-asset`), so we can get the related dataset
+directly with this call:
+```shell
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/4-get-dataset.json -X POST "http://localhost:28181/management/v2/catalog/dataset/request" -s | jq
+```
+
+The output will be something like:
+```json
+{
+  "@id": "kafka-stream-asset",
+  "@type": "dcat:Dataset",
+  "odrl:hasPolicy": {
+    "@id": "Y29udHJhY3QtZGVmaW5pdGlvbg==:c3RyZWFtLWFzc2V0:NDlhYTUzZWEtMDUzMS00ZDkyLTg4Y2YtMGRjMTc4MmQ1NjY4",
+    "@type": "odrl:Set",
+    "odrl:permission": [],
+    "odrl:prohibition": [],
+    "odrl:obligation": [],
+    "odrl:target": "kafka-stream-asset"
+  },
+  "dcat:distribution": {
+    "@type": "dcat:Distribution",
+    "dct:format": {
+      "@id": "HttpData"
+    },
+    "dcat:accessService": "b24dfdbc-d17f-4d6e-9b5c-8fa71dacecfc"
+  },
+  "edc:id": "kafka-stream-asset",
+  "@context": {
+    "dct": "https://purl.org/dc/terms/",
+    "edc": "https://w3id.org/edc/v0.0.1/ns/",
+    "dcat": "https://www.w3.org/ns/dcat/",
+    "odrl": "http://www.w3.org/ns/odrl/2/",
+    "dspace": "https://w3id.org/dspace/v0.8/"
+  }
+}
+```
+
+Copy the `odrl:hasPolicy/@id` value into the `{{offerId}}` placeholders in the [negotiate-contract.json](5-negotiate-contract.json) file
+and negotiate the contract:
+```shell
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/5-negotiate-contract.json -X POST "http://localhost:28181/management/v2/contractnegotiations" -s | jq
+```
+
+### Start the transfer
+
+First we need to start the receiver server on the consumer side. It will receive the `EndpointDataReference` containing
+the address and credentials needed to connect to the broker and poll the messages from the topic. For this, open
+another terminal shell and run:
+```shell
+./gradlew util:http-request-logger:build
+HTTP_SERVER_PORT=4000 java -jar util/http-request-logger/build/libs/http-request-logger.jar
+```
+It will run on port 4000.
+
+At this point the contract agreement should already have been issued. To verify that, check the contract negotiation state with
+this call, replacing `{{contract-negotiation-id}}` with the id returned by the negotiate contract call:
+```shell
+curl "http://localhost:28181/management/v2/contractnegotiations/{{contract-negotiation-id}}" -s | jq
+```
+
+If the `edc:contractAgreementId` is set, use it to replace the `{{contract-agreement-id}}` placeholder in the [6-transfer.json](6-transfer.json)
+file, then start the transfer by calling the consumer connector with this command:
+```shell
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/6-transfer.json -X POST "http://localhost:28181/management/v2/transferprocesses" -s | jq
+```
+> Note that the receiver endpoint is `http://localhost:4000` (set via `edc.receiver.http.dynamic.endpoint` in [consumer.properties](streaming-03-runtime/consumer.properties)), because that is where our http server is listening.
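+
+If you prefer to script this step, here is a minimal sketch, assuming `jq` is available, the placeholder in
+[6-transfer.json](6-transfer.json) has already been replaced, and the response fields use the default `edc:` prefix seen elsewhere in this sample:
+```shell
+# start the transfer and capture the "@id" of the created transfer process
+TRANSFER_PROCESS_ID=$(curl -s -H 'Content-Type: application/json' \
+  -d @transfer/streaming/streaming-03-kafka-broker/6-transfer.json \
+  -X POST "http://localhost:28181/management/v2/transferprocesses" | jq -r '."@id"')
+
+# query the current state of the transfer process
+curl -s "http://localhost:28181/management/v2/transferprocesses/$TRANSFER_PROCESS_ID" | jq -r '."edc:state"'
+```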
+
+Wait until the transfer process reaches the `STARTED` state by executing this call, replacing `{{transfer-process-id}}` with the id returned
+by the start transfer call:
+```shell
+curl "http://localhost:28181/management/v2/transferprocesses/{{transfer-process-id}}" -s | jq
+```
+
+### Consume events
+
+The `EndpointDataReference` should now have appeared in the console of the `http-request-logger` we started before:
+```json
+{
+  "id":"8c52a781-2588-4c9b-8c70-4e5ad428eea9",
+  "endpoint":"localhost:9093",
+  "authKey":"alice",
+  "authCode":"alice-secret",
+  "properties": {
+    "https://w3id.org/edc/v0.0.1/ns/topic":"kafka-stream-topic"
+  }
+}
+```
+
+Using this information, on the consumer side we can run a `kafka-console-consumer` with the received credentials to consume
+messages from the topic:
+```shell
+docker exec -it kafka-kraft /bin/kafka-console-consumer --topic kafka-stream-topic \
+  --bootstrap-server localhost:9093 \
+  --consumer-property group.id=group_id \
+  --consumer-property security.protocol=SASL_PLAINTEXT \
+  --consumer-property sasl.mechanism=PLAIN \
+  --consumer-property sasl.jaas.config='org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";'
+```
+
+### Produce events
+
+In another shell we can put ourselves in the provider's shoes and produce messages from the producer shell:
+```shell
+docker exec -it kafka-kraft /bin/kafka-console-producer --topic kafka-stream-topic \
+  --producer.config=/config/admin.properties \
+  --bootstrap-server localhost:9093
+```
+
+For every message produced on the provider side we will see a corresponding message on the consumer side.
diff --git a/transfer/streaming/streaming-03-kafka-broker/kafka-config/admin.properties b/transfer/streaming/streaming-03-kafka-broker/kafka-config/admin.properties
new file mode 100644
index 00000000..65d00847
--- /dev/null
+++ b/transfer/streaming/streaming-03-kafka-broker/kafka-config/admin.properties
@@ -0,0 +1,3 @@
+security.protocol=SASL_PLAINTEXT
+sasl.mechanism=PLAIN
+sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
diff --git a/transfer/streaming/streaming-03-kafka-broker/kafka.env b/transfer/streaming/streaming-03-kafka-broker/kafka.env
new file mode 100644
index 00000000..f5faa1c0
--- /dev/null
+++ b/transfer/streaming/streaming-03-kafka-broker/kafka.env
@@ -0,0 +1,10 @@
+KAFKA_AUTHORIZER_CLASS_NAME=org.apache.kafka.metadata.authorizer.StandardAuthorizer
+KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
+KAFKA_LISTENER_NAME_BROKER_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret";
+KAFKA_LISTENER_NAME_CONTROLLER_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret";
+KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_alice="alice-secret";
+KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=BROKER:SASL_PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
+KAFKA_SASL_ENABLED_MECHANISMS=PLAIN
+KAFKA_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
+KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
+KAFKA_SUPER_USERS=User:admin
diff --git a/transfer/streaming/streaming-03-kafka-broker/kafka_broker_jaas.conf b/transfer/streaming/streaming-03-kafka-broker/kafka_broker_jaas.conf
new file mode 100644
index 00000000..12b5f8f9
--- /dev/null
+++ b/transfer/streaming/streaming-03-kafka-broker/kafka_broker_jaas.conf
@@ -0,0 +1,7 @@
+KafkaServer {
+    org.apache.kafka.common.security.plain.PlainLoginModule required
+    username="admin"
+    password="admin-secret"
+    user_admin="admin-secret"
+    user_alice="alice-secret";
+};
diff --git a/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/build.gradle.kts b/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/build.gradle.kts
new file mode 100644
index 00000000..5c6b5c0a
--- /dev/null
+++ b/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/build.gradle.kts
@@ -0,0 +1,48 @@
+/*
+ *  Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+ *
+ *  This program and the accompanying materials are made available under the
+ *  terms of the Apache License, Version 2.0 which is available at
+ *  https://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *
+ *  Contributors:
+ *       Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
+ *
+ */
+
+plugins {
+    `java-library`
+    id("application")
+    alias(libs.plugins.shadow)
+}
+
+dependencies {
+    implementation(libs.edc.control.plane.api.client)
+    implementation(libs.edc.control.plane.core)
+    implementation(libs.edc.data.plane.selector.core)
+    implementation(libs.edc.api.observability)
+    implementation(libs.edc.configuration.filesystem)
+    implementation(libs.edc.iam.mock)
+    implementation(libs.edc.management.api)
+    implementation(libs.edc.dsp)
+    implementation(libs.edc.data.plane.selector.api)
+    implementation(libs.edc.data.plane.selector.client)
+    implementation(libs.edc.transfer.data.plane)
+    implementation(libs.edc.transfer.pull.http.dynamic.receiver)
+    implementation(libs.edc.data.plane.spi)
+    implementation(libs.edc.data.plane.core)
+    implementation(libs.edc.data.plane.http)
+    implementation(libs.edc.data.plane.kafka)
+
+}
+
+application {
+    mainClass.set("org.eclipse.edc.boot.system.runtime.BaseRuntime")
+}
+
+tasks.withType<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar> {
+    mergeServiceFiles()
+    archiveFileName.set("connector.jar")
+}
diff --git a/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/consumer.properties b/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/consumer.properties
new file mode 100644
index 00000000..da471ae1
--- /dev/null
+++ b/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/consumer.properties
@@ -0,0 +1,11 @@
+web.http.port=28180
+web.http.path=/api
+web.http.management.port=28181
+web.http.management.path=/management
+web.http.protocol.port=28182
+web.http.protocol.path=/protocol
+web.http.control.port=28183
+web.http.control.path=/control
+edc.dsp.callback.address=http://localhost:28182/protocol
+edc.participant.id=consumer
+edc.receiver.http.dynamic.endpoint=http://localhost:4000/receiver
diff --git a/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/provider.properties b/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/provider.properties
new file mode 100644
index 00000000..a06dcb9f
--- /dev/null
+++ b/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/provider.properties
@@ -0,0 +1,13 @@
+web.http.port=18180
+web.http.path=/api
+web.http.management.port=18181
+web.http.management.path=/management
+web.http.protocol.port=18182
+web.http.protocol.path=/protocol
+web.http.control.port=18183
+web.http.control.path=/control
+edc.dsp.callback.address=http://localhost:18182/protocol
+edc.participant.id=provider
+edc.ids.id=urn:connector:provider +edc.dataplane.http.sink.partition.size=1 +edc.receiver.http.dynamic.endpoint=http://localhost/not/used/in/this/sample diff --git a/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaExtension.java b/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaExtension.java new file mode 100644 index 00000000..7657cfb2 --- /dev/null +++ b/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaExtension.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.samples.streaming; + +import org.eclipse.edc.connector.transfer.spi.flow.DataFlowManager; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; + +/** + * Kafka Broker flow extension + */ +public class KafkaExtension implements ServiceExtension { + + @Override + public String name() { + return "Kafka stream extension"; + } + + @Inject + private DataFlowManager dataFlowManager; + + @Override + public void initialize(ServiceExtensionContext context) { + dataFlowManager.register(10, new KafkaToKafkaDataFlowController()); + } + +} diff --git a/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaToKafkaDataFlowController.java b/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaToKafkaDataFlowController.java new file mode 100644 index 00000000..b1a11ecc --- /dev/null +++ b/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaToKafkaDataFlowController.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.samples.streaming; + +import org.eclipse.edc.connector.transfer.spi.flow.DataFlowController; +import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.dataplane.kafka.schema.KafkaDataAddressSchema; +import org.eclipse.edc.policy.model.Policy; +import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; +import org.jetbrains.annotations.NotNull; + +import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; + +class KafkaToKafkaDataFlowController implements DataFlowController { + @Override + public boolean canHandle(TransferProcess 
transferProcess) {
+        return "Kafka".equals(transferProcess.getContentDataAddress().getType()) && "Kafka".equals(transferProcess.getDestinationType());
+    }
+
+    @Override
+    public @NotNull StatusResult<DataFlowResponse> initiateFlow(TransferProcess transferProcess, Policy policy) {
+        // static credentials; in a production case these should be created dynamically and an ACL entry should be added
+        var username = "alice";
+        var password = "alice-secret";
+
+        var contentDataAddress = transferProcess.getContentDataAddress();
+        var kafkaDataAddress = DataAddress.Builder.newInstance()
+                .type(EndpointDataReference.EDR_SIMPLE_TYPE)
+                .property(EndpointDataReference.ID, transferProcess.getCorrelationId())
+                .property(EndpointDataReference.ENDPOINT, contentDataAddress.getStringProperty("kafka.bootstrap.servers"))
+                .property(EndpointDataReference.AUTH_KEY, username)
+                .property(EndpointDataReference.AUTH_CODE, password)
+                .property(EDC_NAMESPACE + KafkaDataAddressSchema.TOPIC, contentDataAddress.getStringProperty(KafkaDataAddressSchema.TOPIC))
+                .build();
+
+        return StatusResult.success(DataFlowResponse.Builder.newInstance().dataAddress(kafkaDataAddress).build());
+    }
+
+    // TODO: a "terminate data flow" method will be available in the next EDC version; it will permit revoking the user's permission to access the topic
+}
diff --git a/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension
new file mode 100644
index 00000000..849cf54a
--- /dev/null
+++ b/transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension
@@ -0,0 +1 @@
+org.eclipse.edc.samples.streaming.KafkaExtension