Merge pull request #7 from Informatievlaanderen/feat/httppart
Feat/httppart
pj-cegeka authored Jan 25, 2024
2 parents 20bf3fa + f397b32 commit 390d110
Showing 27 changed files with 1,604 additions and 3 deletions.
14 changes: 14 additions & 0 deletions NOTICE.md
@@ -0,0 +1,14 @@
## Changes from source project

Part of this project uses a modified version of the public api extension and http extension of the edc-connector from: https://github.com/eclipse-edc/Connector

The following files were modified from their original source:
- http-pull-connector/src/main/java/org/eclipse/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApiImpl.java
- http-pull-connector/src/main/java/org/eclipse/org/eclipse/edc/connector/dataplane/api/controller/DataFlowRequestSupplier.java
- http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ExtendedDataPlanePublicApiController.java
- http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/pipeline/ApiDataSink.java
- http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/pipeline/ApiDataSinkFactory.java
- http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java
- http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/datasink/HttpDataSink.java
- http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/datasource/HttpDataSource.java
- http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/DataPlaneHttpExtension.java
4 changes: 3 additions & 1 deletion gradle/libs.versions.toml
@@ -4,7 +4,7 @@ format.version = "1.1"
[versions]
assertj = "3.24.2"
awaitility = "4.2.0"
edc = "0.2.1"
edc = "0.4.1"
jakarta-json = "2.0.1"
junit-pioneer = "2.1.0"
jupiter = "5.10.0"
@@ -28,13 +28,15 @@ edc-catalog-spi = { module = "org.eclipse.edc:federated-catalog-spi", version.ref = "edc" }
edc-configuration-filesystem = { module = "org.eclipse.edc:configuration-filesystem", version.ref = "edc" }
edc-connector-core = { module = "org.eclipse.edc:connector-core", version.ref = "edc" }
edc-control-plane-core = { module = "org.eclipse.edc:control-plane-core", version.ref = "edc" }
edc-control-plane-api-client = { module = "org.eclipse.edc:control-plane-api-client", version.ref = "edc" }
edc-control-plane-spi = { module = "org.eclipse.edc:control-plane-spi", version.ref = "edc" }
edc-data-plane-api = { module = "org.eclipse.edc:data-plane-api", version.ref = "edc" }
edc-data-plane-aws-s3 = { module = "org.eclipse.edc:data-plane-aws-s3", version.ref = "edc" }
edc-data-plane-azure-storage = { module = "org.eclipse.edc:data-plane-azure-storage", version.ref = "edc" }
edc-data-plane-client = { module = "org.eclipse.edc:data-plane-client", version.ref = "edc" }
edc-data-plane-core = { module = "org.eclipse.edc:data-plane-core", version.ref = "edc" }
edc-data-plane-http = { module = "org.eclipse.edc:data-plane-http", version.ref = "edc" }
edc-data-plane-http-spi = { module = "org.eclipse.edc:data-plane-http-spi", version.ref = "edc" }
edc-data-plane-selector-api = { module = "org.eclipse.edc:data-plane-selector-api", version.ref = "edc" }
edc-data-plane-selector-client = { module = "org.eclipse.edc:data-plane-selector-client", version.ref = "edc" }
edc-data-plane-selector-core = { module = "org.eclipse.edc:data-plane-selector-core", version.ref = "edc" }
12 changes: 10 additions & 2 deletions http-pull-connector/build.gradle.kts
@@ -12,6 +12,10 @@ repositories {

dependencies {

implementation(libs.edc.data.plane.util)
implementation(libs.edc.util)
implementation(libs.edc.jersey.core)

implementation(libs.edc.control.plane.core)
implementation(libs.edc.dsp)
implementation(libs.edc.configuration.filesystem)
@@ -22,13 +26,17 @@ dependencies {
// implementation(libs.edc.transfer.pull.http.receiver)
implementation(libs.edc.transfer.pull.http.dynamic.receiver)

implementation(libs.edc.control.plane.api.client)
implementation(libs.edc.data.plane.http.spi)

implementation(libs.edc.data.plane.selector.api)
implementation(libs.edc.data.plane.selector.core)
implementation(libs.edc.data.plane.selector.client)

implementation(libs.edc.data.plane.api)
implementation(libs.edc.data.plane.core)
implementation(libs.edc.data.plane.http)
// must not be added, so that our implementation is used by the data plane
// implementation(libs.edc.data.plane.api)
// implementation(libs.edc.data.plane.http)

}

81 changes: 81 additions & 0 deletions http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java
@@ -0,0 +1,81 @@
package org.eclipse.edc.connector.dataplane.api;

import org.eclipse.edc.connector.dataplane.api.controller.ExtendedDataPlanePublicApiController;
import org.eclipse.edc.connector.dataplane.api.pipeline.ApiDataSinkFactory;
import org.eclipse.edc.connector.dataplane.api.validation.ConsumerPullTransferDataAddressResolver;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataTransferExecutorServiceContainer;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.http.EdcHttpClient;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.web.spi.WebServer;
import org.eclipse.edc.web.spi.WebService;
import org.eclipse.edc.web.spi.configuration.WebServiceConfigurer;
import org.eclipse.edc.web.spi.configuration.WebServiceSettings;

/**
* This extension provides generic endpoints, open to public participants of the Dataspace, for executing
* requests against the actual data source.
*/
@Extension(value = DataPlanePublicApiExtension.NAME)
public class DataPlanePublicApiExtension implements ServiceExtension {
public static final String NAME = "Extended Data Plane Public API";
private static final int DEFAULT_PUBLIC_PORT = 8185;
private static final String PUBLIC_API_CONFIG = "web.http.public";
private static final String PUBLIC_CONTEXT_ALIAS = "public";
private static final String PUBLIC_CONTEXT_PATH = "/api/v1/public";

@Setting
private static final String CONTROL_PLANE_VALIDATION_ENDPOINT = "edc.dataplane.token.validation.endpoint";

private static final WebServiceSettings PUBLIC_SETTINGS = WebServiceSettings.Builder.newInstance()
.apiConfigKey(PUBLIC_API_CONFIG)
.contextAlias(PUBLIC_CONTEXT_ALIAS)
.defaultPath(PUBLIC_CONTEXT_PATH)
.defaultPort(DEFAULT_PUBLIC_PORT)
.name(NAME)
.build();

@Inject
private WebServer webServer;

@Inject
private WebServiceConfigurer webServiceConfigurer;

@Inject
private PipelineService pipelineService;

@Inject
private WebService webService;

@Inject
private EdcHttpClient httpClient;

@Inject
private TypeManager typeManager;

@Inject
private DataTransferExecutorServiceContainer executorContainer;

@Override
public String name() {
return NAME;
}

@Override
public void initialize(ServiceExtensionContext context) {
var validationEndpoint = context.getConfig().getString(CONTROL_PLANE_VALIDATION_ENDPOINT);
var dataAddressResolver = new ConsumerPullTransferDataAddressResolver(httpClient, validationEndpoint, typeManager.getMapper());
var configuration = webServiceConfigurer.configure(context, webServer, PUBLIC_SETTINGS);

var sinkFactory = new ApiDataSinkFactory(context.getMonitor(), executorContainer.getExecutorService());
pipelineService.registerFactory(sinkFactory);

var publicApiController = new ExtendedDataPlanePublicApiController(pipelineService, dataAddressResolver);
webService.registerResource(configuration.getContextAlias(), publicApiController);
}
}
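
A minimal sketch of how a consumer participant might call the public endpoint this extension registers, using the JDK HTTP client. The host, sub-path, query string and token value are assumptions for illustration; the default port and context path come from PUBLIC_SETTINGS above, and the Authorization header is assumed to carry the EDR token that ConsumerPullTransferDataAddressResolver validates against edc.dataplane.token.validation.endpoint.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PublicApiCallSketch {
    public static void main(String[] args) throws Exception {
        var client = HttpClient.newHttpClient();
        // Hypothetical provider host; 8185 and /api/v1/public are the defaults configured by PUBLIC_SETTINGS.
        var request = HttpRequest.newBuilder()
                .uri(URI.create("http://provider.example.com:8185/api/v1/public/orders?limit=10"))
                // Assumed to be the EDR token from the consumer-pull transfer; validated by the data plane.
                .header("Authorization", "<EDR-token>")
                .GET()
                .build();
        var response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + ": " + response.body());
    }
}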
54 changes: 54 additions & 0 deletions http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApi.java
@@ -0,0 +1,54 @@
package org.eclipse.edc.connector.dataplane.api.controller;

import jakarta.ws.rs.container.ContainerRequestContext;

import java.util.Map;

/**
* Wrapper around {@link ContainerRequestContext} enabling mocking.
*/
public interface ContainerRequestContextApi {

/**
* Get the request headers. Note that if more than one value is associated with a specific header,
* only the first one is retained.
*
* @return Headers map.
*/
Map<String, String> headers();

/**
* Format the query of the request as a string, e.g. "hello=world&amp;foo=bar".
*
* @return Query param string.
*/
String queryParams();

/**
* Format the request body into a string.
*
* @return Request body.
*/
String body();

/**
* Get the media type from the incoming request.
*
* @return Media type.
*/
String mediaType();

/**
* Return the request path, e.g. "hello/world/foo/bar".
*
* @return Path string.
*/
String path();

/**
* Get the HTTP method from the incoming request, e.g. "GET", "POST".
*
* @return Http method.
*/
String method();
}
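
Since the interface is a thin, mockable facade, a unit test can stub it directly. A minimal sketch assuming Mockito is on the test classpath; all stubbed values are hypothetical.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Map;

class ContainerRequestContextApiStubSketch {
    void example() {
        // Hypothetical stubbed request, e.g. for exercising DataFlowRequestSupplier in isolation.
        ContainerRequestContextApi contextApi = mock(ContainerRequestContextApi.class);
        when(contextApi.method()).thenReturn("POST");
        when(contextApi.path()).thenReturn("hello/world");
        when(contextApi.queryParams()).thenReturn("foo=bar");
        when(contextApi.mediaType()).thenReturn("application/json");
        when(contextApi.body()).thenReturn("{\"message\":\"hi\"}");
        when(contextApi.headers()).thenReturn(Map.of("x-custom-header", "1"));
    }
}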
94 changes: 94 additions & 0 deletions http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApiImpl.java
@@ -0,0 +1,94 @@
package org.eclipse.edc.connector.dataplane.api.controller;

import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.edc.spi.EdcException;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* This class provides a set of APIs wrapping a {@link ContainerRequestContext}.
*/
public class ContainerRequestContextApiImpl implements ContainerRequestContextApi {

private static final String QUERY_PARAM_SEPARATOR = "&";

private final ContainerRequestContext context;

public ContainerRequestContextApiImpl(ContainerRequestContext context) {
this.context = context;
}

@Override
public Map<String, String> headers() {
return context.getHeaders().entrySet()
.stream()
.filter(entry -> !entry.getValue().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get(0)));
}

@Override
public String queryParams() {
return context.getUriInfo().getQueryParameters().entrySet()
.stream()
.map(entry -> new QueryParam(entry.getKey(), entry.getValue()))
.filter(QueryParam::isValid)
.map(QueryParam::toString)
.collect(Collectors.joining(QUERY_PARAM_SEPARATOR));
}

@Override
public String body() {
try (BufferedReader br = new BufferedReader(new InputStreamReader(context.getEntityStream()))) {
return br.lines().collect(Collectors.joining("\n"));
} catch (IOException e) {
throw new EdcException("Failed to read request body: " + e.getMessage());
}
}

@Override
public String path() {
var pathInfo = context.getUriInfo().getPath();
return pathInfo.startsWith("/") ? pathInfo.substring(1) : pathInfo;
}

@Override
public String mediaType() {
return Optional.ofNullable(context.getMediaType())
.map(MediaType::toString)
.orElse(null);
}

@Override
public String method() {
return context.getMethod();
}

private static final class QueryParam {

private final String key;
private final List<String> values;
private final boolean valid;

private QueryParam(String key, List<String> values) {
this.key = key;
this.values = values;
this.valid = key != null && values != null && !values.isEmpty();
}

public boolean isValid() {
return valid;
}

@Override
public String toString() {
return valid ? key + "=" + values.get(0) : "";
}
}
}
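
A sketch of the documented behaviour of headers(), queryParams() and path(), driving the implementation with a mocked jakarta ContainerRequestContext. It assumes Mockito and the Jakarta REST API are available; the values are illustrative only.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.core.MultivaluedHashMap;
import jakarta.ws.rs.core.UriInfo;
import java.util.List;

class ContainerRequestContextApiImplSketch {
    void example() {
        var headers = new MultivaluedHashMap<String, String>();
        headers.put("Accept", List.of("application/json", "text/plain")); // only the first value is kept
        var queryParams = new MultivaluedHashMap<String, String>();
        queryParams.put("hello", List.of("world"));
        queryParams.put("foo", List.of("bar"));

        var uriInfo = mock(UriInfo.class);
        when(uriInfo.getQueryParameters()).thenReturn(queryParams);
        when(uriInfo.getPath()).thenReturn("/hello/world");

        var context = mock(ContainerRequestContext.class);
        when(context.getHeaders()).thenReturn(headers);
        when(context.getUriInfo()).thenReturn(uriInfo);

        var api = new ContainerRequestContextApiImpl(context);
        System.out.println(api.headers());     // {Accept=application/json}
        System.out.println(api.queryParams()); // e.g. "hello=world&foo=bar" (entry order depends on the map)
        System.out.println(api.path());        // "hello/world" (leading slash stripped)
    }
}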
65 changes: 65 additions & 0 deletions http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowRequestSupplier.java
@@ -0,0 +1,65 @@
package org.eclipse.edc.connector.dataplane.api.controller;

import org.eclipse.edc.connector.dataplane.api.pipeline.ApiDataSinkFactory;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiFunction;

import static org.eclipse.edc.connector.dataplane.api.pipeline.HttpPart.excludedHeaders;
import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.*;

public class DataFlowRequestSupplier implements BiFunction<ContainerRequestContextApi, DataAddress, DataFlowRequest> {

/**
* Create a {@link DataFlowRequest} based on the incoming request and the source {@link DataAddress} resolved from the access token.
*
* @param contextApi Api for accessing request properties.
* @param dataAddress Source data address.
* @return DataFlowRequest
*/
@Override
public DataFlowRequest apply(ContainerRequestContextApi contextApi, DataAddress dataAddress) {
var props = createProps(contextApi);
return DataFlowRequest.Builder.newInstance()
.processId(UUID.randomUUID().toString())
.sourceDataAddress(dataAddress)
.destinationDataAddress(DataAddress.Builder.newInstance()
.type(ApiDataSinkFactory.TYPE)
.build())
.trackable(false)
.id(UUID.randomUUID().toString())
.properties(props)
.build();
}

/**
* Put all properties of the incoming request (method, request body, query params...) into a map.
*/
private static Map<String, String> createProps(ContainerRequestContextApi contextApi) {
var props = new HashMap<String, String>();
props.put(METHOD, contextApi.method());
props.put(QUERY_PARAMS, contextApi.queryParams());
props.put(PATH, contextApi.path());
Optional.ofNullable(contextApi.mediaType())
.ifPresent(mediaType -> {
props.put(MEDIA_TYPE, mediaType);
props.put(BODY, contextApi.body());
});
contextApi.headers().forEach((key, value) -> {
if (isAdditionalHeader(key)) {
props.put(key, value);
}
});
return props;
}

private static boolean isAdditionalHeader(String key) {
return !excludedHeaders.contains(key);
}
}
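
A sketch of the request-to-DataFlowRequest mapping this supplier performs, using a hand-rolled ContainerRequestContextApi and a hypothetical source address (in the real flow the source address is resolved from the access token). Which of the incoming headers end up in the properties depends on HttpPart.excludedHeaders, which is part of this commit but not shown above.

import org.eclipse.edc.spi.types.domain.DataAddress;

import java.util.Map;

class DataFlowRequestSupplierSketch {
    void example() {
        ContainerRequestContextApi contextApi = new ContainerRequestContextApi() {
            @Override public Map<String, String> headers() { return Map.of("x-custom-header", "1"); }
            @Override public String queryParams() { return "limit=10"; }
            @Override public String body() { return "{\"q\":\"items\"}"; }
            @Override public String mediaType() { return "application/json"; }
            @Override public String path() { return "data/items"; }
            @Override public String method() { return "POST"; }
        };
        // Hypothetical source address type; the resolver normally supplies the real one.
        var sourceAddress = DataAddress.Builder.newInstance().type("HttpData").build();

        var request = new DataFlowRequestSupplier().apply(contextApi, sourceAddress);
        // Properties carry method, path, query params, media type and body, plus any header
        // that is not listed in HttpPart.excludedHeaders; the destination type is ApiDataSinkFactory.TYPE.
        System.out.println(request.getProperties());
        System.out.println(request.getDestinationDataAddress().getType());
    }
}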