Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: provide kafka-broker transfer streaming sample #151

Merged
merged 1 commit into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions .github/workflows/verify.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,6 @@ concurrency:
cancel-in-progress: true

jobs:
Checkstyle:
permissions:
id-token: write
checks: write
pull-requests: write
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/setup-build

- name: Run Checkstyle
run: ./gradlew checkstyleMain checkstyleTest

Build:
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ edc-runtime-metamodel = { module = "org.eclipse.edc:runtime-metamodel", version.
edc-transfer-data-plane = { module = "org.eclipse.edc:transfer-data-plane", version.ref = "edc" }
edc-transfer-process-api = { module = "org.eclipse.edc:transfer-process-api", version.ref = "edc" }
edc-transfer-pull-http-receiver = { module = "org.eclipse.edc:transfer-pull-http-receiver", version.ref = "edc" }
edc-transfer-pull-http-dynamic-receiver = { module = "org.eclipse.edc:transfer-pull-http-dynamic-receiver", version.ref = "edc" }
edc-transfer-spi = { module = "org.eclipse.edc:transfer-spi", version.ref = "edc" }
edc-util = { module = "org.eclipse.edc:util", version.ref = "edc" }
edc-vault-azure = { module = "org.eclipse.edc:vault-azure", version.ref = "edc" }
Expand Down
11 changes: 4 additions & 7 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ dependencyResolutionManagement {
}
}

// basic
include(":basic:basic-01-basic-connector")
include(":basic:basic-02-health-endpoint")
include(":basic:basic-03-configuration")

// transfer
include(":transfer:transfer-00-prerequisites:connector")

include(":transfer:transfer-04-event-consumer:consumer-with-listener")
Expand All @@ -43,16 +41,15 @@ include(":transfer:transfer-05-file-transfer-cloud:cloud-transfer-consumer")
include(":transfer:transfer-05-file-transfer-cloud:cloud-transfer-provider")
include(":transfer:transfer-05-file-transfer-cloud:transfer-file-cloud")

include("transfer:streaming:streaming-01-http-to-http:streaming-01-runtime")
include("transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime")
include(":transfer:streaming:streaming-01-http-to-http:streaming-01-runtime")
include(":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime")
include(":transfer:streaming:streaming-03-kafka-broker:streaming-03-runtime")

// advanced
include(":advanced:advanced-01-open-telemetry:open-telemetry-consumer")
include(":advanced:advanced-01-open-telemetry:open-telemetry-provider")

// modules for code samples ------------------------------------------------------------------------
include(":other:custom-runtime")

include("util:http-request-logger")
include(":util:http-request-logger")

include(":system-tests")
3 changes: 2 additions & 1 deletion system-tests/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ dependencies {
testCompileOnly(project(":transfer:transfer-04-event-consumer:listener"))
testCompileOnly(project(":transfer:streaming:streaming-01-http-to-http:streaming-01-runtime"))
testCompileOnly(project(":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime"))
testCompileOnly(project(":transfer:streaming:streaming-03-kafka-broker:streaming-03-runtime"))

testCompileOnly(project(":advanced:advanced-01-open-telemetry:open-telemetry-provider"))
testCompileOnly(project(":advanced:advanced-01-open-telemetry:open-telemetry-consumer"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,63 +15,19 @@

package org.eclipse.edc.samples.common;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.restassured.http.ContentType;
import org.apache.http.HttpStatus;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.junit.testfixtures.TestUtils;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;

import static io.restassured.RestAssured.given;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETED;
import static org.hamcrest.Matchers.emptyString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

/**
* Encapsulates common settings, test steps, and helper methods for transfer samples
*/
public class FileTransferCommon {

static final ObjectMapper MAPPER = new ObjectMapper();

static final String MANAGEMENT_API_URL = "http://localhost:9192/management";
static final String CONTRACT_OFFER_FILE_PATH = "transfer/transfer-01-file-transfer/contractoffer.json";
static final String TRANSFER_FILE_PATH = "transfer/transfer-01-file-transfer/filetransfer.json";
static final String API_KEY_HEADER_KEY = "X-Api-Key";
static final String API_KEY_HEADER_VALUE = "password";

final String sampleAssetFilePath;
final File sampleAssetFile;
final File destinationFile;
Duration timeout = Duration.ofSeconds(30);
Duration pollInterval = Duration.ofMillis(500);

String contractNegotiationId;
String contractAgreementId;

/**
* Creates a new {@code FileTransferSampleTestCommon} instance.
*
* @param sampleAssetFilePath Relative path starting from the root of the project to a file which will be read from for transfer.
* @param destinationFilePath Relative path starting from the root of the project where the transferred file will be written to.
*/
public FileTransferCommon(@NotNull String sampleAssetFilePath, @NotNull String destinationFilePath) {
this.sampleAssetFilePath = sampleAssetFilePath;
sampleAssetFile = getFileFromRelativePath(sampleAssetFilePath);

destinationFile = getFileFromRelativePath(destinationFilePath);
}

/**
* Resolves a {@link File} instance from a relative path.
*/
Expand All @@ -93,141 +49,4 @@ public static String getFileContentFromRelativePath(String relativePath) {
}
}

/**
* Assert that prerequisites are fulfilled before running the test.
* This assertion checks only whether the file to be copied is not existing already.
*/
void assertTestPrerequisites() {
assertThat(destinationFile).doesNotExist();
}

/**
* Remove files created while running the tests.
* The copied file will be deleted.
*/
void cleanTemporaryTestFiles() {
destinationFile.delete();
}

/**
* Assert that the file to be copied exists at the expected location.
* This method waits a duration which is defined in {@link FileTransferCommon#timeout}.
*/
void assertDestinationFileContent() {
await().atMost(timeout).pollInterval(pollInterval).untilAsserted(()
-> assertThat(destinationFile).hasSameBinaryContentAs(sampleAssetFile));
}

/**
* Assert that the transfer process state on the consumer is completed.
*/
void assertTransferProcessStatusConsumerSide(String transferProcessId) {
await().atMost(timeout).pollInterval(pollInterval).untilAsserted(() -> {
var state = getTransferProcessState(transferProcessId);

assertThat(state).isEqualTo(COMPLETED.name());
});
}

/**
* Assert that a POST request to initiate a contract negotiation is successful.
* This method corresponds to the command in the sample: {@code curl -X POST -H "Content-Type: application/json" -H "X-Api-Key: password" -d @transfer/transfer-01-file-transfer/contractoffer.json "http://localhost:9192/management/v2/contractnegotiations"}
*/
void initiateContractNegotiation() {
initiateContractNegotiation(CONTRACT_OFFER_FILE_PATH);
}


void initiateContractNegotiation(String contractOfferFilePath) {
contractNegotiationId = given()
.headers(API_KEY_HEADER_KEY, API_KEY_HEADER_VALUE)
.contentType(ContentType.JSON)
.body(new File(TestUtils.findBuildRoot(), contractOfferFilePath))
.when()
.post(MANAGEMENT_API_URL + "/v2/contractnegotiations")
.then()
.statusCode(HttpStatus.SC_OK)
.body("@id", not(emptyString()))
.extract()
.jsonPath()
.get("@id");
}

public String getTransferProcessState(String processId) {
return given()
.headers(API_KEY_HEADER_KEY, API_KEY_HEADER_VALUE)
.when()
.get(String.format("%s/%s", MANAGEMENT_API_URL + "/v2/transferprocesses", processId))
.then()
.statusCode(HttpStatus.SC_OK)
.extract().body().jsonPath().getString("'edc:state'");
}

/**
* Assert that a GET request to look up a contract agreement is successful.
* This method corresponds to the command in the sample: {@code curl -X GET -H 'X-Api-Key: password' "http://localhost:9192/management/v2/contractnegotiations/{UUID}"}
*/
void lookUpContractAgreementId() {
// Wait for transfer to be completed.
await().atMost(timeout).pollInterval(pollInterval).untilAsserted(() -> contractAgreementId = given()
.headers(API_KEY_HEADER_KEY, API_KEY_HEADER_VALUE)
.when()
.get(MANAGEMENT_API_URL + "/v2/contractnegotiations/{id}", contractNegotiationId)
.then()
.statusCode(HttpStatus.SC_OK)
.body("'edc:state'", equalTo("FINALIZED"))
.body("'edc:contractAgreementId'", not(emptyString()))
.extract().body().jsonPath().getString("'edc:contractAgreementId'")
);
}

/**
* Assert that a POST request to initiate transfer process is successful.
* This method corresponds to the command in the sample: {@code curl -X POST -H "Content-Type: application/json" -H "X-Api-Key: password" -d @transfer/transfer-01-file-transfer/filetransfer.json "http://localhost:9192/management/v2/transferprocesses"}
*
* @throws IOException Thrown if there was an error accessing the transfer request file defined in {@link FileTransferCommon#TRANSFER_FILE_PATH}.
*/
String requestTransferFile(String transferFilePath) throws IOException {
var transferJsonFile = getFileFromRelativePath(transferFilePath);
var requestBody = readAndUpdateDataRequestFromJsonFile(transferJsonFile, contractAgreementId);

var jsonPath = given()
.headers(API_KEY_HEADER_KEY, API_KEY_HEADER_VALUE)
.contentType(ContentType.JSON)
.body(requestBody)
.when()
.post(MANAGEMENT_API_URL + "/v2/transferprocesses")
.then()
.log().ifError()
.statusCode(HttpStatus.SC_OK)
.body("@id", not(emptyString()))
.extract()
.jsonPath();

var transferProcessId = jsonPath.getString("@id");

assertThat(transferProcessId).isNotEmpty();

return transferProcessId;
}

String requestTransferFile() throws IOException {
return requestTransferFile(TRANSFER_FILE_PATH);
}

/**
* Reads a transfer request file with changed value for contract agreement ID and file destination path.
*
* @param transferJsonFile A {@link File} instance pointing to a JSON transfer request file.
* @param contractAgreementId This string containing a UUID will be used as value for the contract agreement ID.
* @return An instance of {@link DataRequest} with changed values for contract agreement ID and file destination path.
* @throws IOException Thrown if there was an error accessing the file given in transferJsonFile.
*/
Map<String, Object> readAndUpdateDataRequestFromJsonFile(@NotNull File transferJsonFile, @NotNull String contractAgreementId) throws IOException {
var fileString = Files.readString(transferJsonFile.toPath())
.replace("{path to destination file}", destinationFile.getAbsolutePath())
.replace("{agreement ID}", contractAgreementId);

return MAPPER.readValue(fileString, Map.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.eclipse.edc.samples.transfer.streaming;

import org.jetbrains.annotations.NotNull;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

/**
* Extension of the {@link KafkaContainer} that permits to set the SASL_PLAINTEXT security protocol
*/
public class KafkaSaslContainer extends KafkaContainer {

private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.5.2";
private final File envFile;

public KafkaSaslContainer(@NotNull File envFile) {
super(DockerImageName.parse(KAFKA_IMAGE_NAME));
this.withKraft();
this.envFile = envFile;
}

@Override
protected void configureKraft() {
super.configureKraft();
try {
Files.readAllLines(envFile.toPath())
.stream().map(it -> it.split("=", 2))
.forEach(it -> this.withEnv(it[0], it[1]));
} catch (IOException e) {
throw new RuntimeException(e);
}
}

}
Loading
Loading