Skip to content

Commit

Permalink
feat: provide kafka-to-kafka sample
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Nov 15, 2023
1 parent e9e41d5 commit 74c4dfc
Show file tree
Hide file tree
Showing 23 changed files with 742 additions and 201 deletions.
12 changes: 0 additions & 12 deletions .github/workflows/verify.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,6 @@ concurrency:
cancel-in-progress: true

jobs:
Checkstyle:
permissions:
id-token: write
checks: write
pull-requests: write
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/setup-build

- name: Run Checkstyle
run: ./gradlew checkstyleMain checkstyleTest

Build:
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ edc-runtime-metamodel = { module = "org.eclipse.edc:runtime-metamodel", version.
edc-transfer-data-plane = { module = "org.eclipse.edc:transfer-data-plane", version.ref = "edc" }
edc-transfer-process-api = { module = "org.eclipse.edc:transfer-process-api", version.ref = "edc" }
edc-transfer-pull-http-receiver = { module = "org.eclipse.edc:transfer-pull-http-receiver", version.ref = "edc" }
edc-transfer-pull-http-dynamic-receiver = { module = "org.eclipse.edc:transfer-pull-http-dynamic-receiver", version.ref = "edc" }
edc-transfer-spi = { module = "org.eclipse.edc:transfer-spi", version.ref = "edc" }
edc-util = { module = "org.eclipse.edc:util", version.ref = "edc" }
edc-vault-azure = { module = "org.eclipse.edc:vault-azure", version.ref = "edc" }
Expand Down
11 changes: 4 additions & 7 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ dependencyResolutionManagement {
}
}

// basic
include(":basic:basic-01-basic-connector")
include(":basic:basic-02-health-endpoint")
include(":basic:basic-03-configuration")

// transfer
include(":transfer:transfer-00-prerequisites:connector")

include(":transfer:transfer-04-event-consumer:consumer-with-listener")
Expand All @@ -43,16 +41,15 @@ include(":transfer:transfer-05-file-transfer-cloud:cloud-transfer-consumer")
include(":transfer:transfer-05-file-transfer-cloud:cloud-transfer-provider")
include(":transfer:transfer-05-file-transfer-cloud:transfer-file-cloud")

include("transfer:streaming:streaming-01-http-to-http:streaming-01-runtime")
include("transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime")
include(":transfer:streaming:streaming-01-http-to-http:streaming-01-runtime")
include(":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime")
include(":transfer:streaming:streaming-03-kafka-broker:streaming-03-runtime")

// advanced
include(":advanced:advanced-01-open-telemetry:open-telemetry-consumer")
include(":advanced:advanced-01-open-telemetry:open-telemetry-provider")

// modules for code samples ------------------------------------------------------------------------
include(":other:custom-runtime")

include("util:http-request-logger")
include(":util:http-request-logger")

include(":system-tests")
3 changes: 2 additions & 1 deletion system-tests/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ dependencies {
testCompileOnly(project(":transfer:transfer-04-event-consumer:listener"))
testCompileOnly(project(":transfer:streaming:streaming-01-http-to-http:streaming-01-runtime"))
testCompileOnly(project(":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime"))
testCompileOnly(project(":transfer:streaming:streaming-03-kafka-broker:streaming-03-runtime"))

testCompileOnly(project(":advanced:advanced-01-open-telemetry:open-telemetry-provider"))
testCompileOnly(project(":advanced:advanced-01-open-telemetry:open-telemetry-consumer"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,63 +15,19 @@

package org.eclipse.edc.samples.common;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.restassured.http.ContentType;
import org.apache.http.HttpStatus;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.junit.testfixtures.TestUtils;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;

import static io.restassured.RestAssured.given;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETED;
import static org.hamcrest.Matchers.emptyString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

/**
* Encapsulates common settings, test steps, and helper methods for transfer samples
*/
public class FileTransferCommon {

static final ObjectMapper MAPPER = new ObjectMapper();

static final String MANAGEMENT_API_URL = "http://localhost:9192/management";
static final String CONTRACT_OFFER_FILE_PATH = "transfer/transfer-01-file-transfer/contractoffer.json";
static final String TRANSFER_FILE_PATH = "transfer/transfer-01-file-transfer/filetransfer.json";
static final String API_KEY_HEADER_KEY = "X-Api-Key";
static final String API_KEY_HEADER_VALUE = "password";

final String sampleAssetFilePath;
final File sampleAssetFile;
final File destinationFile;
Duration timeout = Duration.ofSeconds(30);
Duration pollInterval = Duration.ofMillis(500);

String contractNegotiationId;
String contractAgreementId;

/**
* Creates a new {@code FileTransferSampleTestCommon} instance.
*
* @param sampleAssetFilePath Relative path starting from the root of the project to a file which will be read from for transfer.
* @param destinationFilePath Relative path starting from the root of the project where the transferred file will be written to.
*/
public FileTransferCommon(@NotNull String sampleAssetFilePath, @NotNull String destinationFilePath) {
this.sampleAssetFilePath = sampleAssetFilePath;
sampleAssetFile = getFileFromRelativePath(sampleAssetFilePath);

destinationFile = getFileFromRelativePath(destinationFilePath);
}

/**
* Resolves a {@link File} instance from a relative path.
*/
Expand All @@ -93,141 +49,4 @@ public static String getFileContentFromRelativePath(String relativePath) {
}
}

/**
* Assert that prerequisites are fulfilled before running the test.
* This assertion checks only whether the file to be copied is not existing already.
*/
void assertTestPrerequisites() {
assertThat(destinationFile).doesNotExist();
}

/**
* Remove files created while running the tests.
* The copied file will be deleted.
*/
void cleanTemporaryTestFiles() {
destinationFile.delete();
}

/**
* Assert that the file to be copied exists at the expected location.
* This method waits a duration which is defined in {@link FileTransferCommon#timeout}.
*/
void assertDestinationFileContent() {
await().atMost(timeout).pollInterval(pollInterval).untilAsserted(()
-> assertThat(destinationFile).hasSameBinaryContentAs(sampleAssetFile));
}

/**
* Assert that the transfer process state on the consumer is completed.
*/
void assertTransferProcessStatusConsumerSide(String transferProcessId) {
await().atMost(timeout).pollInterval(pollInterval).untilAsserted(() -> {
var state = getTransferProcessState(transferProcessId);

assertThat(state).isEqualTo(COMPLETED.name());
});
}

/**
* Assert that a POST request to initiate a contract negotiation is successful.
* This method corresponds to the command in the sample: {@code curl -X POST -H "Content-Type: application/json" -H "X-Api-Key: password" -d @transfer/transfer-01-file-transfer/contractoffer.json "http://localhost:9192/management/v2/contractnegotiations"}
*/
void initiateContractNegotiation() {
initiateContractNegotiation(CONTRACT_OFFER_FILE_PATH);
}


void initiateContractNegotiation(String contractOfferFilePath) {
contractNegotiationId = given()
.headers(API_KEY_HEADER_KEY, API_KEY_HEADER_VALUE)
.contentType(ContentType.JSON)
.body(new File(TestUtils.findBuildRoot(), contractOfferFilePath))
.when()
.post(MANAGEMENT_API_URL + "/v2/contractnegotiations")
.then()
.statusCode(HttpStatus.SC_OK)
.body("@id", not(emptyString()))
.extract()
.jsonPath()
.get("@id");
}

public String getTransferProcessState(String processId) {
return given()
.headers(API_KEY_HEADER_KEY, API_KEY_HEADER_VALUE)
.when()
.get(String.format("%s/%s", MANAGEMENT_API_URL + "/v2/transferprocesses", processId))
.then()
.statusCode(HttpStatus.SC_OK)
.extract().body().jsonPath().getString("'edc:state'");
}

/**
* Assert that a GET request to look up a contract agreement is successful.
* This method corresponds to the command in the sample: {@code curl -X GET -H 'X-Api-Key: password' "http://localhost:9192/management/v2/contractnegotiations/{UUID}"}
*/
void lookUpContractAgreementId() {
// Wait for transfer to be completed.
await().atMost(timeout).pollInterval(pollInterval).untilAsserted(() -> contractAgreementId = given()
.headers(API_KEY_HEADER_KEY, API_KEY_HEADER_VALUE)
.when()
.get(MANAGEMENT_API_URL + "/v2/contractnegotiations/{id}", contractNegotiationId)
.then()
.statusCode(HttpStatus.SC_OK)
.body("'edc:state'", equalTo("FINALIZED"))
.body("'edc:contractAgreementId'", not(emptyString()))
.extract().body().jsonPath().getString("'edc:contractAgreementId'")
);
}

/**
* Assert that a POST request to initiate transfer process is successful.
* This method corresponds to the command in the sample: {@code curl -X POST -H "Content-Type: application/json" -H "X-Api-Key: password" -d @transfer/transfer-01-file-transfer/filetransfer.json "http://localhost:9192/management/v2/transferprocesses"}
*
* @throws IOException Thrown if there was an error accessing the transfer request file defined in {@link FileTransferCommon#TRANSFER_FILE_PATH}.
*/
String requestTransferFile(String transferFilePath) throws IOException {
var transferJsonFile = getFileFromRelativePath(transferFilePath);
var requestBody = readAndUpdateDataRequestFromJsonFile(transferJsonFile, contractAgreementId);

var jsonPath = given()
.headers(API_KEY_HEADER_KEY, API_KEY_HEADER_VALUE)
.contentType(ContentType.JSON)
.body(requestBody)
.when()
.post(MANAGEMENT_API_URL + "/v2/transferprocesses")
.then()
.log().ifError()
.statusCode(HttpStatus.SC_OK)
.body("@id", not(emptyString()))
.extract()
.jsonPath();

var transferProcessId = jsonPath.getString("@id");

assertThat(transferProcessId).isNotEmpty();

return transferProcessId;
}

String requestTransferFile() throws IOException {
return requestTransferFile(TRANSFER_FILE_PATH);
}

/**
* Reads a transfer request file with changed value for contract agreement ID and file destination path.
*
* @param transferJsonFile A {@link File} instance pointing to a JSON transfer request file.
* @param contractAgreementId This string containing a UUID will be used as value for the contract agreement ID.
* @return An instance of {@link DataRequest} with changed values for contract agreement ID and file destination path.
* @throws IOException Thrown if there was an error accessing the file given in transferJsonFile.
*/
Map<String, Object> readAndUpdateDataRequestFromJsonFile(@NotNull File transferJsonFile, @NotNull String contractAgreementId) throws IOException {
var fileString = Files.readString(transferJsonFile.toPath())
.replace("{path to destination file}", destinationFile.getAbsolutePath())
.replace("{agreement ID}", contractAgreementId);

return MAPPER.readValue(fileString, Map.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.eclipse.edc.samples.transfer.streaming;

import org.jetbrains.annotations.NotNull;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

/**
* Extension of the {@link KafkaContainer} that permits to set the SASL_PLAINTEXT security protocol
*/
public class KafkaSaslContainer extends KafkaContainer {

private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.5.2";
private final File envFile;

public KafkaSaslContainer(@NotNull File envFile) {
super(DockerImageName.parse(KAFKA_IMAGE_NAME));
this.withKraft();
this.envFile = envFile;
}

@Override
protected void configureKraft() {
super.configureKraft();
try {
Files.readAllLines(envFile.toPath())
.stream().map(it -> it.split("=", 2))
.forEach(it -> this.withEnv(it[0], it[1]));
} catch (IOException e) {
throw new RuntimeException(e);
}
}

}
Loading

0 comments on commit 74c4dfc

Please sign in to comment.