Skip to content

Commit

Permalink
adapt
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Mar 28, 2024
1 parent d6f8ed8 commit e7a082b
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 20 deletions.
3 changes: 2 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ edc-http = { module = "org.eclipse.edc:http", version.ref = "edc" }
edc-iam-mock = { module = "org.eclipse.edc:iam-mock", version.ref = "edc" }
edc-jersey-micrometer = { module = "org.eclipse.edc:jersey-micrometer", version.ref = "edc" }
edc-jetty-micrometer = { module = "org.eclipse.edc:jetty-micrometer", version.ref = "edc" }
edc-json-ld = { module = "org.eclipse.edc:json-ld", version.ref = "edc" }
edc-json-ld-lib = { module = "org.eclipse.edc:json-ld-lib", version.ref = "edc" }
edc-json-ld-spi = { module = "org.eclipse.edc:json-ld-spi", version.ref = "edc" }
edc-junit = { module = "org.eclipse.edc:junit", version.ref = "edc" }
edc-management-api = { module = "org.eclipse.edc:management-api", version.ref = "edc" }
edc-micrometer-core = { module = "org.eclipse.edc:micrometer-core", version.ref = "edc" }
Expand Down
3 changes: 2 additions & 1 deletion system-tests/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ plugins {

dependencies {
testImplementation(libs.edc.junit)
testImplementation(libs.edc.json.ld)
testImplementation(libs.edc.json.ld.lib)
testImplementation(libs.edc.json.ld.spi)
testImplementation(libs.edc.control.plane.spi)
testImplementation(libs.awaitility)
testImplementation(libs.okhttp.mockwebserver)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import okhttp3.mockwebserver.MockWebServer;
import org.eclipse.edc.junit.annotations.EndToEndTest;
import org.eclipse.edc.junit.extensions.EdcRuntimeExtension;
import org.eclipse.edc.junit.testfixtures.TestUtils;
import org.eclipse.edc.util.io.Ports;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand Down Expand Up @@ -79,7 +79,7 @@ public class Streaming01httpToHttpTest {
"edc.fs.config", getFileFromRelativePath(SAMPLE_FOLDER + "/streaming-01-runtime/consumer.properties").getAbsolutePath()
)
);
private final int httpReceiverPort = TestUtils.getFreePort();
private final int httpReceiverPort = Ports.getFreePort();
private final MockWebServer consumerReceiverServer = new MockWebServer();

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.edc.junit.annotations.EndToEndTest;
import org.eclipse.edc.junit.extensions.EdcRuntimeExtension;
import org.eclipse.edc.junit.testfixtures.TestUtils;
import org.eclipse.edc.util.io.Ports;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand Down Expand Up @@ -98,7 +98,7 @@ public class Streaming02KafkaToHttpTest {
.getAbsolutePath()
)
);
private final int httpReceiverPort = TestUtils.getFreePort();
private final int httpReceiverPort = Ports.getFreePort();
private final MockWebServer consumerReceiverServer = new MockWebServer();

@BeforeEach
Expand Down Expand Up @@ -128,7 +128,7 @@ void streamData() {
Json.createObjectBuilder().build(), destination);

await().atMost(TIMEOUT).untilAsserted(() -> {
String state = CONSUMER.getTransferProcessState(transferProcessId);
var state = CONSUMER.getTransferProcessState(transferProcessId);
assertThat(state).isEqualTo(STARTED.name());
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.edc.junit.annotations.EndToEndTest;
import org.eclipse.edc.junit.extensions.EdcRuntimeExtension;
import org.eclipse.edc.junit.testfixtures.TestUtils;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.edc.util.io.Ports;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -126,7 +126,7 @@ public class Streaming03KafkaToKafkaTest {
)
);

private final int httpReceiverPort = TestUtils.getFreePort();
private final int httpReceiverPort = Ports.getFreePort();
private final MockWebServer edrReceiverServer = new MockWebServer();

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSourceFactory;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.jetbrains.annotations.NotNull;

import java.io.File;
Expand All @@ -31,28 +31,27 @@
public class HttpStreamingDataSourceFactory implements DataSourceFactory {

@Override
public boolean canHandle(DataFlowRequest request) {
return request.getSourceDataAddress().getType().equals("HttpStreaming");
public boolean canHandle(DataFlowStartMessage dataFlowStartMessage) {
return dataFlowStartMessage.getSourceDataAddress().getType().equals("HttpStreaming");
}

@Override
public DataSource createSource(DataFlowRequest request) {
return new HttpStreamingDataSource(sourceFolder(request).get());
public DataSource createSource(DataFlowStartMessage dataFlowStartMessage) {
return new HttpStreamingDataSource(sourceFolder(dataFlowStartMessage).get());
}

@Override
public @NotNull Result<Void> validateRequest(DataFlowRequest request) {
return sourceFolder(request)
public @NotNull Result<Void> validateRequest(DataFlowStartMessage dataFlowStartMessage) {
return sourceFolder(dataFlowStartMessage)
.map(it -> Result.success())
.orElseGet(() -> Result.failure("sourceFolder is not found or it does not exist"));
}

private Optional<File> sourceFolder(DataFlowRequest request) {
private Optional<File> sourceFolder(DataFlowStartMessage request) {
return Optional.of(request)
.map(DataFlowRequest::getSourceDataAddress)
.map(DataFlowStartMessage::getSourceDataAddress)
.map(it -> it.getStringProperty("sourceFolder"))
.map(File::new)
.filter(File::exists);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public boolean canHandle(TransferProcess transferProcess) {
}

@Override
public @NotNull StatusResult<DataFlowResponse> initiateFlow(TransferProcess transferProcess, Policy policy) {
public @NotNull StatusResult<DataFlowResponse> start(TransferProcess transferProcess, Policy policy) {
// static credentials, in a production case these should be created dynamically and an ACLs entry should be added
var username = "alice";
var password = "alice-secret";
Expand All @@ -57,6 +57,12 @@ public boolean canHandle(TransferProcess transferProcess) {
return StatusResult.success(DataFlowResponse.Builder.newInstance().dataAddress(kafkaDataAddress).build());
}

@Override
public StatusResult<Void> suspend(TransferProcess transferProcess) {
// here the flow can be suspended, not something covered in this sample
return StatusResult.success();
}

@Override
public StatusResult<Void> terminate(TransferProcess transferProcess) {
// here the flow can be terminated, not something covered in this sample
Expand Down

0 comments on commit e7a082b

Please sign in to comment.