diff --git a/NOTICE.md b/NOTICE.md new file mode 100644 index 0000000..4814843 --- /dev/null +++ b/NOTICE.md @@ -0,0 +1,14 @@ +## Changes from source project + +Part of this project uses a modified version of the public api extension and http extension of the edc-connector from: https://github.com/eclipse-edc/Connector + +The following files were modified from their original source: +- http-pull-connector/src/main/java/org/eclipse/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApiImpl.java +- http-pull-connector/src/main/java/org/eclipse/org/eclipse/edc/connector/dataplane/api/controller/DataFlowRequestSupplier.java +- http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ExtendedDataPlanePublicApiController.java +- http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/pipeline/ApiDataSink.java +- http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/pipeline/ApiDataSinkFactory.java +- http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java +- http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/datasink/HttpDataSink.java +- http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/datasource/HttpDataSource.java +- http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/DataPlaneHttpExtension.java \ No newline at end of file diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 01ee83c..9af76d7 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -4,7 +4,7 @@ format.version = "1.1" [versions] assertj = "3.24.2" awaitility = "4.2.0" -edc = "0.2.1" +edc = "0.4.1" jakarta-json = "2.0.1" junit-pioneer = "2.1.0" jupiter = "5.10.0" @@ -28,6 +28,7 @@ edc-catalog-spi = { module = "org.eclipse.edc:federated-catalog-spi", version.re edc-configuration-filesystem = { module = "org.eclipse.edc:configuration-filesystem", version.ref = "edc" } edc-connector-core = { module = "org.eclipse.edc:connector-core", version.ref = "edc" } edc-control-plane-core = { module = "org.eclipse.edc:control-plane-core", version.ref = "edc" } +edc-control-plane-api-client = { module = "org.eclipse.edc:control-plane-api-client", version.ref = "edc" } edc-control-plane-spi = { module = "org.eclipse.edc:control-plane-spi", version.ref = "edc" } edc-data-plane-api = { module = "org.eclipse.edc:data-plane-api", version.ref = "edc" } edc-data-plane-aws-s3 = { module = "org.eclipse.edc:data-plane-aws-s3", version.ref = "edc" } @@ -35,6 +36,7 @@ edc-data-plane-azure-storage = { module = "org.eclipse.edc:data-plane-azure-stor edc-data-plane-client = { module = "org.eclipse.edc:data-plane-client", version.ref = "edc" } edc-data-plane-core = { module = "org.eclipse.edc:data-plane-core", version.ref = "edc" } edc-data-plane-http = { module = "org.eclipse.edc:data-plane-http", version.ref = "edc" } +edc-data-plane-http-spi = { module = "org.eclipse.edc:data-plane-http-spi", version.ref = "edc" } edc-data-plane-selector-api = { module = "org.eclipse.edc:data-plane-selector-api", version.ref = "edc" } edc-data-plane-selector-client = { module = "org.eclipse.edc:data-plane-selector-client", version.ref = "edc" } edc-data-plane-selector-core = { module = "org.eclipse.edc:data-plane-selector-core", version.ref = "edc" } diff --git a/http-pull-connector/build.gradle.kts b/http-pull-connector/build.gradle.kts index f585725..282760d 100644 --- a/http-pull-connector/build.gradle.kts +++ b/http-pull-connector/build.gradle.kts @@ -12,6 +12,10 @@ repositories { dependencies { + implementation(libs.edc.data.plane.util) + implementation(libs.edc.util) + implementation(libs.edc.jersey.core) + implementation(libs.edc.control.plane.core) implementation(libs.edc.dsp) implementation(libs.edc.configuration.filesystem) @@ -22,13 +26,17 @@ dependencies { // implementation(libs.edc.transfer.pull.http.receiver) implementation(libs.edc.transfer.pull.http.dynamic.receiver) + implementation(libs.edc.control.plane.api.client) + implementation(libs.edc.data.plane.http.spi) + implementation(libs.edc.data.plane.selector.api) implementation(libs.edc.data.plane.selector.core) implementation(libs.edc.data.plane.selector.client) - implementation(libs.edc.data.plane.api) implementation(libs.edc.data.plane.core) - implementation(libs.edc.data.plane.http) +// must not be added so our implementation is used by the dataplane +// implementation(libs.edc.data.plane.api) +// implementation(libs.edc.data.plane.http) } diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java new file mode 100644 index 0000000..3ad868d --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java @@ -0,0 +1,81 @@ +package org.eclipse.edc.connector.dataplane.api; + +import org.eclipse.edc.connector.dataplane.api.controller.ExtendedDataPlanePublicApiController; +import org.eclipse.edc.connector.dataplane.api.pipeline.ApiDataSinkFactory; +import org.eclipse.edc.connector.dataplane.api.validation.ConsumerPullTransferDataAddressResolver; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataTransferExecutorServiceContainer; +import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; +import org.eclipse.edc.runtime.metamodel.annotation.Extension; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.runtime.metamodel.annotation.Setting; +import org.eclipse.edc.spi.http.EdcHttpClient; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.web.spi.WebServer; +import org.eclipse.edc.web.spi.WebService; +import org.eclipse.edc.web.spi.configuration.WebServiceConfigurer; +import org.eclipse.edc.web.spi.configuration.WebServiceSettings; + +/** + * This extension provides generic endpoints which are open to public participants of the Dataspace to execute + * requests on the actual data source. + */ +@Extension(value = DataPlanePublicApiExtension.NAME) +public class DataPlanePublicApiExtension implements ServiceExtension { + public static final String NAME = "Extended Data Plane Public API"; + private static final int DEFAULT_PUBLIC_PORT = 8185; + private static final String PUBLIC_API_CONFIG = "web.http.public"; + private static final String PUBLIC_CONTEXT_ALIAS = "public"; + private static final String PUBLIC_CONTEXT_PATH = "/api/v1/public"; + + @Setting + private static final String CONTROL_PLANE_VALIDATION_ENDPOINT = "edc.dataplane.token.validation.endpoint"; + + private static final WebServiceSettings PUBLIC_SETTINGS = WebServiceSettings.Builder.newInstance() + .apiConfigKey(PUBLIC_API_CONFIG) + .contextAlias(PUBLIC_CONTEXT_ALIAS) + .defaultPath(PUBLIC_CONTEXT_PATH) + .defaultPort(DEFAULT_PUBLIC_PORT) + .name(NAME) + .build(); + + @Inject + private WebServer webServer; + + @Inject + private WebServiceConfigurer webServiceConfigurer; + + @Inject + private PipelineService pipelineService; + + @Inject + private WebService webService; + + @Inject + private EdcHttpClient httpClient; + + @Inject + private TypeManager typeManager; + + @Inject + private DataTransferExecutorServiceContainer executorContainer; + + @Override + public String name() { + return NAME; + } + + @Override + public void initialize(ServiceExtensionContext context) { + var validationEndpoint = context.getConfig().getString(CONTROL_PLANE_VALIDATION_ENDPOINT); + var dataAddressResolver = new ConsumerPullTransferDataAddressResolver(httpClient, validationEndpoint, typeManager.getMapper()); + var configuration = webServiceConfigurer.configure(context, webServer, PUBLIC_SETTINGS); + + var sinkFactory = new ApiDataSinkFactory(context.getMonitor(), executorContainer.getExecutorService()); + pipelineService.registerFactory(sinkFactory); + + var publicApiController = new ExtendedDataPlanePublicApiController(pipelineService, dataAddressResolver); + webService.registerResource(configuration.getContextAlias(), publicApiController); + } +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApi.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApi.java new file mode 100644 index 0000000..03a1757 --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApi.java @@ -0,0 +1,54 @@ +package org.eclipse.edc.connector.dataplane.api.controller; + +import jakarta.ws.rs.container.ContainerRequestContext; + +import java.util.Map; + +/** + * Wrapper around {@link ContainerRequestContext} enabling mocking. + */ +public interface ContainerRequestContextApi { + + /** + * Get the request headers. Note that if more than one value is associated to a specific header, + * only the first one is retained. + * + * @return Headers map. + */ + Map headers(); + + /** + * Format query of the request as string, e.g. "hello=world\&foo=bar". + * + * @return Query param string. + */ + String queryParams(); + + /** + * Format the request body into a string. + * + * @return Request body. + */ + String body(); + + /** + * Get the media type from incoming request. + * + * @return Media type. + */ + String mediaType(); + + /** + * Return request path, e.g. "hello/world/foo/bar". + * + * @return Path string. + */ + String path(); + + /** + * Get http method from the incoming request, e.g. "GET", "POST"... + * + * @return Http method. + */ + String method(); +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApiImpl.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApiImpl.java new file mode 100644 index 0000000..f247cb9 --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApiImpl.java @@ -0,0 +1,94 @@ +package org.eclipse.edc.connector.dataplane.api.controller; + +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.MediaType; +import org.eclipse.edc.spi.EdcException; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * This class provides a set of API wrapping a {@link ContainerRequestContext}. + */ +public class ContainerRequestContextApiImpl implements ContainerRequestContextApi { + + private static final String QUERY_PARAM_SEPARATOR = "&"; + + private final ContainerRequestContext context; + + public ContainerRequestContextApiImpl(ContainerRequestContext context) { + this.context = context; + } + + @Override + public Map headers() { + return context.getHeaders().entrySet() + .stream() + .filter(entry -> !entry.getValue().isEmpty()) + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get(0))); + } + + @Override + public String queryParams() { + return context.getUriInfo().getQueryParameters().entrySet() + .stream() + .map(entry -> new QueryParam(entry.getKey(), entry.getValue())) + .filter(QueryParam::isValid) + .map(QueryParam::toString) + .collect(Collectors.joining(QUERY_PARAM_SEPARATOR)); + } + + @Override + public String body() { + try (BufferedReader br = new BufferedReader(new InputStreamReader(context.getEntityStream()))) { + return br.lines().collect(Collectors.joining("\n")); + } catch (IOException e) { + throw new EdcException("Failed to read request body: " + e.getMessage()); + } + } + + @Override + public String path() { + var pathInfo = context.getUriInfo().getPath(); + return pathInfo.startsWith("/") ? pathInfo.substring(1) : pathInfo; + } + + @Override + public String mediaType() { + return Optional.ofNullable(context.getMediaType()) + .map(MediaType::toString) + .orElse(null); + } + + @Override + public String method() { + return context.getMethod(); + } + + private static final class QueryParam { + + private final String key; + private final List values; + private final boolean valid; + + private QueryParam(String key, List values) { + this.key = key; + this.values = values; + this.valid = key != null && values != null && !values.isEmpty(); + } + + public boolean isValid() { + return valid; + } + + @Override + public String toString() { + return valid ? key + "=" + values.get(0) : ""; + } + } +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowRequestSupplier.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowRequestSupplier.java new file mode 100644 index 0000000..b41113d --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowRequestSupplier.java @@ -0,0 +1,65 @@ +package org.eclipse.edc.connector.dataplane.api.controller; + +import org.eclipse.edc.connector.dataplane.api.pipeline.ApiDataSinkFactory; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.function.BiFunction; + +import static org.eclipse.edc.connector.dataplane.api.pipeline.HttpPart.excludedHeaders; +import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.*; + +public class DataFlowRequestSupplier implements BiFunction { + + /** + * Create a {@link DataFlowRequest} based on incoming request and claims decoded from the access token. + * + * @param contextApi Api for accessing request properties. + * @param dataAddress Source data address. + * @return DataFlowRequest + */ + @Override + public DataFlowRequest apply(org.eclipse.edc.connector.dataplane.api.controller.ContainerRequestContextApi contextApi, DataAddress dataAddress) { + var props = createProps(contextApi); + return DataFlowRequest.Builder.newInstance() + .processId(UUID.randomUUID().toString()) + .sourceDataAddress(dataAddress) + .destinationDataAddress(DataAddress.Builder.newInstance() + .type(ApiDataSinkFactory.TYPE) + .build()) + .trackable(false) + .id(UUID.randomUUID().toString()) + .properties(props) + .build(); + } + + /** + * Put all properties of the incoming request (method, request body, query params...) into a map. + */ + private static Map createProps(ContainerRequestContextApi contextApi) { + var props = new HashMap(); + props.put(METHOD, contextApi.method()); + props.put(QUERY_PARAMS, contextApi.queryParams()); + props.put(PATH, contextApi.path()); + Optional.ofNullable(contextApi.mediaType()) + .ifPresent(mediaType -> { + props.put(MEDIA_TYPE, mediaType); + props.put(BODY, contextApi.body()); + }); + contextApi.headers().forEach((key, value) -> { + if (isAdditionalHeader(key)) { + props.put(key, value); + } + }); + return props; + } + + private static boolean isAdditionalHeader(String key) { + return !excludedHeaders.contains(key); + + } +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ExtendedDataPlanePublicApiController.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ExtendedDataPlanePublicApiController.java new file mode 100644 index 0000000..56596c8 --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ExtendedDataPlanePublicApiController.java @@ -0,0 +1,151 @@ +package org.eclipse.edc.connector.dataplane.api.controller; + +import jakarta.ws.rs.*; +import jakarta.ws.rs.container.AsyncResponse; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.container.Suspended; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; +import org.eclipse.edc.connector.dataplane.spi.resolver.DataAddressResolver; +import org.eclipse.edc.connector.dataplane.spi.response.TransferErrorResponse; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.web.spi.exception.NotAuthorizedException; + +import java.util.List; + +import static jakarta.ws.rs.core.Response.status; +import static java.lang.String.format; +import static java.lang.String.join; + +@Path("{any:.*}") +@Produces(MediaType.APPLICATION_JSON) +public class ExtendedDataPlanePublicApiController { + + private final PipelineService pipelineService; + private final DataAddressResolver dataAddressResolver; + private final org.eclipse.edc.connector.dataplane.api.controller.DataFlowRequestSupplier requestSupplier; + + public ExtendedDataPlanePublicApiController(PipelineService pipelineService, + DataAddressResolver dataAddressResolver) { + this.pipelineService = pipelineService; + this.dataAddressResolver = dataAddressResolver; + this.requestSupplier = new DataFlowRequestSupplier(); + } + + @GET + public void get(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + handle(requestContext, response); + } + + /** + * Sends a {@link DELETE} request to the data source and returns data. + * + * @param requestContext Request context. + * @param response Data fetched from the data source. + */ + @DELETE + public void delete(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + handle(requestContext, response); + } + + /** + * Sends a {@link PATCH} request to the data source and returns data. + * + * @param requestContext Request context. + * @param response Data fetched from the data source. + */ + @PATCH + public void patch(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + handle(requestContext, response); + } + + /** + * Sends a {@link PUT} request to the data source and returns data. + * + * @param requestContext Request context. + * @param response Data fetched from the data source. + */ + @PUT + public void put(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + handle(requestContext, response); + } + + /** + * Sends a {@link POST} request to the data source and returns data. + * + * @param requestContext Request context. + * @param response Data fetched from the data source. + */ + @POST + public void post(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + handle(requestContext, response); + } + + private void handle(ContainerRequestContext context, AsyncResponse response) { + var contextApi = new ContainerRequestContextApiImpl(context); + var token = contextApi.headers().get(HttpHeaders.AUTHORIZATION); + if (token == null) { + response.resume(badRequest(("Missing bearer token"))); + return; + } + + var dataAddress = extractSourceDataAddress(token); + var dataFlowRequest = requestSupplier.apply(contextApi, dataAddress); + + var validationResult = pipelineService.validate(dataFlowRequest); + if (validationResult.failed()) { + var errorMsg = validationResult.getFailureMessages().isEmpty() ? + format("Failed to validate request with id: %s", dataFlowRequest.getId()) : + join(",", validationResult.getFailureMessages()); + response.resume(badRequest(errorMsg)); + return; + } + + pipelineService.transfer(dataFlowRequest) + .whenComplete((result, throwable) -> { + if (throwable == null) { + if (result.succeeded()) { + response.resume(result.getContent()); + } else { + response.resume(internalServerError(result.getFailureMessages())); + } + } else { + response.resume(internalServerError("Unhandled exception occurred during data transfer: " + throwable.getMessage())); + } + }); + } + + /** + * Invoke the {@link DataAddressResolver} with the provided token to retrieve the source data address. + * + * @param token input token + * @return the source {@link DataAddress}. + * @throws NotAuthorizedException if {@link DataAddressResolver} invokation failed. + */ + private DataAddress extractSourceDataAddress(String token) { + var result = dataAddressResolver.resolve(token); + if (result.failed()) { + throw new NotAuthorizedException(String.join(", ", result.getFailureMessages())); + } + return result.getContent(); + } + + private Response badRequest(String error) { + return badRequest(List.of(error)); + } + + private Response badRequest(List errors) { + return status(Response.Status.BAD_REQUEST).entity(new TransferErrorResponse(errors)).build(); + } + + private Response internalServerError(String error) { + return internalServerError(List.of(error)); + } + + private Response internalServerError(List errors) { + return status(Response.Status.INTERNAL_SERVER_ERROR).entity(new TransferErrorResponse(errors)).build(); + } +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/pipeline/ApiDataSink.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/pipeline/ApiDataSink.java new file mode 100644 index 0000000..0b9fc9e --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/pipeline/ApiDataSink.java @@ -0,0 +1,90 @@ +package org.eclipse.edc.connector.dataplane.api.pipeline; + +import jakarta.ws.rs.core.Response; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSink; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; +import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.result.AbstractResult; +import org.eclipse.edc.spi.result.Result; + +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; + +import static java.lang.String.format; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.supplyAsync; +import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult.error; +import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult.failure; +import static org.eclipse.edc.util.async.AsyncUtils.asyncAllOf; + +/** + * Sends data to an output stream. The transfer is done asynchronously using the supplied executor service. + */ +public class ApiDataSink implements DataSink { + private final String requestId; + private final ByteArrayOutputStream stream; + private final ExecutorService executorService; + private final Monitor monitor; + private int status; + private Map headers; + + public ApiDataSink(String requestId, ExecutorService executorService, Monitor monitor) { + this.requestId = requestId; + this.stream = new ByteArrayOutputStream(); + this.executorService = executorService; + this.monitor = monitor; + } + + @Override + public CompletableFuture> transfer(DataSource source) { + var streamResult = source.openPartStream(); + if (streamResult.failed()) { + return completedFuture(failure(streamResult.getFailure())); + } + + try (var partStream = streamResult.getContent()) { + return partStream + .map(part -> supplyAsync(() -> transferData(part), executorService)) + .collect(asyncAllOf()) + .thenApply(results -> { + if (results.stream().anyMatch(AbstractResult::failed)) { + return error("Error transferring data"); + } + Response.ResponseBuilder responseBuilder = isOk() ? Response.ok(stream.toString()) : Response.status(status, getContent()); + headers.forEach(responseBuilder::header); + + return StreamResult.success(responseBuilder.build()); + }); + } catch (Exception e) { + var errorMessage = format("Error processing data transfer request - Request ID: %s", requestId); + monitor.severe(errorMessage, e); + return CompletableFuture.completedFuture(error(errorMessage)); + } + } + + private Result transferData(DataSource.Part part) { + try (var source = part.openStream()) { + source.transferTo(stream); + if (part instanceof HttpPart httpPart) { + status = httpPart.getStatusCode(); + headers = httpPart.getHeadersForRequest(); + } + return Result.success(); + } catch (Exception e) { + monitor.severe("Error writing data", e); + return Result.failure("Error writing data"); + } + } + + private boolean isOk() { + return (200 <= status && status < 300); + } + + private String getContent() { + return stream.toString(StandardCharsets.UTF_8); + } +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/pipeline/ApiDataSinkFactory.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/pipeline/ApiDataSinkFactory.java new file mode 100644 index 0000000..1671a99 --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/pipeline/ApiDataSinkFactory.java @@ -0,0 +1,48 @@ +package org.eclipse.edc.connector.dataplane.api.pipeline; + +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSink; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSinkFactory; +import org.eclipse.edc.connector.dataplane.util.sink.OutputStreamDataSink; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.ExecutorService; + +/** + * A sink factory whose sole purpose is to validate {@link DataFlowRequest} whose destination address type is OutputStream. + * This is required for synchronous data transfers in which an {@link OutputStreamDataSink} is used to convey the data to the call response. + */ +public class ApiDataSinkFactory implements DataSinkFactory { + + public static final String TYPE = "Api-response"; + private final Monitor monitor; + private final ExecutorService executorService; + + public ApiDataSinkFactory(Monitor monitor, ExecutorService executorService) { + this.monitor = monitor; + this.executorService = executorService; + } + + @Override + public boolean canHandle(DataFlowRequest request) { + return TYPE.equals(request.getDestinationDataAddress().getType()); + } + + @Override + public @NotNull Result validateRequest(DataFlowRequest request) { + if (!canHandle(request)) { + + return Result.failure(String.format("%s: Cannot handle destination data address with type: %s", + getClass().getSimpleName(), request.getDestinationDataAddress().getType())); + } + + return Result.success(); + } + + @Override + public DataSink createSink(DataFlowRequest request) { + return new ApiDataSink(request.getId(), executorService, monitor); + } +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/pipeline/HttpPart.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/pipeline/HttpPart.java new file mode 100644 index 0000000..b02c84c --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/pipeline/HttpPart.java @@ -0,0 +1,65 @@ +package org.eclipse.edc.connector.dataplane.api.pipeline; + +import jakarta.ws.rs.core.HttpHeaders; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; + +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class HttpPart implements DataSource.Part { + public static final List excludedHeaders = List.of(HttpHeaders.AUTHORIZATION, HttpHeaders.CONTENT_TYPE, HttpHeaders.ACCEPT); + private final String name; + private final Map> headers; + private final int statusCode; + private final InputStream content; + + public HttpPart(String name, Map> headers, int statusCode, InputStream content) { + this.name = name; + this.headers = headers; + this.statusCode = statusCode; + this.content = content; + } + + @Override + public String name() { + return name; + } + + @Override + public long size() { + return SIZE_UNKNOWN; + } + + @Override + public InputStream openStream() { + return content; + } + + public Map> getHeaders() { + return headers; + } + + public Map getHeadersForRequest() { + Map requestHeadersMap = new HashMap<>(); + headers.forEach((key, value) -> { + if (isRequestHeader(key)) { + requestHeadersMap.put(key, String.join(",", value)); + } + }); + return requestHeadersMap; + } + + public int getStatusCode() { + return statusCode; + } + + public boolean isOk() { + return 200 <= statusCode && statusCode < 300; + } + + private boolean isRequestHeader(String key) { + return !excludedHeaders.contains(key); + } +} \ No newline at end of file diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolver.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolver.java new file mode 100644 index 0000000..a856988 --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolver.java @@ -0,0 +1,53 @@ +package org.eclipse.edc.connector.dataplane.api.validation; + +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.ws.rs.core.HttpHeaders; +import okhttp3.Request; +import org.eclipse.edc.connector.dataplane.spi.resolver.DataAddressResolver; +import org.eclipse.edc.spi.http.EdcHttpClient; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.types.domain.DataAddress; + +import java.io.IOException; + +import static java.lang.String.format; + +public class ConsumerPullTransferDataAddressResolver implements DataAddressResolver { + + private final EdcHttpClient httpClient; + private final String endpoint; + private final ObjectMapper mapper; + + public ConsumerPullTransferDataAddressResolver(EdcHttpClient httpClient, String endpoint, ObjectMapper mapper) { + this.httpClient = httpClient; + this.endpoint = endpoint; + this.mapper = mapper; + } + + /** + * Resolves access token received in input of Data Plane public API (consumer pull) into the {@link DataAddress} + * of the requested data. + * + * @param token Access token received in input of the Data Plane public API + * @return Data address + */ + @Override + public Result resolve(String token) { + var request = new Request.Builder().url(endpoint).header(HttpHeaders.AUTHORIZATION, token).get().build(); + try (var response = httpClient.execute(request)) { + var body = response.body(); + var stringBody = body != null ? body.string() : null; + if (stringBody == null) { + return Result.failure("Token validation server returned null body"); + } + + if (response.isSuccessful()) { + return Result.success(mapper.readValue(stringBody, DataAddress.class)); + } else { + return Result.failure(format("Call to token validation sever failed: %s - %s. %s", response.code(), response.message(), stringBody)); + } + } catch (IOException e) { + return Result.failure("Unhandled exception occurred during call to token validation server: " + e.getMessage()); + } + } +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/DataPlaneHttpExtension.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/DataPlaneHttpExtension.java new file mode 100644 index 0000000..7dfefc7 --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/DataPlaneHttpExtension.java @@ -0,0 +1,63 @@ +package org.eclipse.edc.connector.dataplane.http; + +import org.eclipse.edc.connector.dataplane.http.params.HttpRequestFactory; +import org.eclipse.edc.connector.dataplane.http.params.HttpRequestParamsProviderImpl; +import org.eclipse.edc.connector.dataplane.http.pipeline.datasink.HttpDataSinkFactory; +import org.eclipse.edc.connector.dataplane.http.pipeline.datasource.HttpDataSourceFactory; +import org.eclipse.edc.connector.dataplane.http.spi.HttpRequestParamsProvider; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataTransferExecutorServiceContainer; +import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; +import org.eclipse.edc.runtime.metamodel.annotation.Extension; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.runtime.metamodel.annotation.Setting; +import org.eclipse.edc.spi.http.EdcHttpClient; +import org.eclipse.edc.spi.security.Vault; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.spi.types.TypeManager; + +@Extension(value = DataPlaneHttpExtension.NAME) +public class DataPlaneHttpExtension implements ServiceExtension { + public static final String NAME = "Data Plane Extended HTTP"; + private static final int DEFAULT_PART_SIZE = 5; + + @Setting + private static final String EDC_DATAPLANE_HTTP_SINK_PARTITION_SIZE = "edc.dataplane.http.sink.partition.size"; + + @Inject + private EdcHttpClient httpClient; + + @Inject + private PipelineService pipelineService; + + @Inject + private DataTransferExecutorServiceContainer executorContainer; + + @Inject + private Vault vault; + + @Inject + private TypeManager typeManager; + + @Override + public String name() { + return NAME; + } + + @Override + public void initialize(ServiceExtensionContext context) { + var monitor = context.getMonitor(); + var sinkPartitionSize = context.getSetting(EDC_DATAPLANE_HTTP_SINK_PARTITION_SIZE, DEFAULT_PART_SIZE); + + var paramsProvider = new HttpRequestParamsProviderImpl(vault, typeManager); + context.registerService(HttpRequestParamsProvider.class, paramsProvider); + + var httpRequestFactory = new HttpRequestFactory(); + + var sourceFactory = new HttpDataSourceFactory(httpClient, paramsProvider, monitor, httpRequestFactory); + pipelineService.registerFactory(sourceFactory); + + var sinkFactory = new HttpDataSinkFactory(httpClient, executorContainer.getExecutorService(), sinkPartitionSize, monitor, paramsProvider, httpRequestFactory); + pipelineService.registerFactory(sinkFactory); + } +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/params/HttpRequestFactory.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/params/HttpRequestFactory.java new file mode 100644 index 0000000..01bc81b --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/params/HttpRequestFactory.java @@ -0,0 +1,99 @@ +package org.eclipse.edc.connector.dataplane.http.params; + +import okhttp3.HttpUrl; +import okhttp3.Request; +import okhttp3.RequestBody; +import org.eclipse.edc.connector.dataplane.http.pipeline.ChunkedTransferRequestBody; +import org.eclipse.edc.connector.dataplane.http.pipeline.NonChunkedTransferRequestBody; +import org.eclipse.edc.connector.dataplane.http.pipeline.StringRequestBodySupplier; +import org.eclipse.edc.connector.dataplane.http.spi.HttpRequestParams; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.io.InputStream; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Supplier; + +import static org.eclipse.edc.util.string.StringUtils.isNullOrBlank; + +/** + * Permits to create a {@link Request} from a {@link HttpRequestParams} + */ +public class HttpRequestFactory { + + private static final String SLASH = "/"; + private static final String BACKSLASH = "\\"; + + /** + * Creates HTTP request from the provided set of parameters. + * + * @param params the http request parameters + * @return HTTP request. + */ + public Request toRequest(HttpRequestParams params) { + var bodySupplier = Optional.of(params) + .map(HttpRequestParams::getBody) + .map(StringRequestBodySupplier::new) + .orElse(null); + + return toRequest(params, Map.of(), bodySupplier); + } + + /** + * Creates HTTP request from the provided set of parameters and the request body supplier. + * + * @param params the http request parameters + * @param bodySupplier the request body supplier. + * @return HTTP request. + */ + public Request toRequest(HttpRequestParams params, Map additionalHeaders, Supplier bodySupplier) { + var requestBody = createRequestBody(params, bodySupplier); + var requestBuilder = new Request.Builder() + .url(toUrl(params)) + .method(params.getMethod(), requestBody); + params.getHeaders().forEach(requestBuilder::addHeader); + additionalHeaders.forEach(requestBuilder::addHeader); + return requestBuilder.build(); + } + + @Nullable + private RequestBody createRequestBody(HttpRequestParams params, @Nullable Supplier bodySupplier) { + var contentType = params.getContentType(); + if (bodySupplier == null || contentType == null) { + return null; + } + return params.isNonChunkedTransfer() + ? new NonChunkedTransferRequestBody(bodySupplier, contentType) + : new ChunkedTransferRequestBody(bodySupplier, contentType); + } + + /** + * Creates a URL from the base url, path and query parameters provided in input. + * + * @return The URL. + */ + private HttpUrl toUrl(HttpRequestParams params) { + var baseUrl = params.getBaseUrl(); + var parsedBaseUrl = HttpUrl.parse(baseUrl); + Objects.requireNonNull(parsedBaseUrl, "Failed to parse baseUrl: " + baseUrl); + + var builder = parsedBaseUrl.newBuilder(); + var path = params.getPath(); + if (!isNullOrBlank(path)) { + var sanitizedPath = startWithSlash(path) ? path.substring(1) : path; + builder.addPathSegments(sanitizedPath); + } + + var queryParams = params.getQueryParams(); + if (!isNullOrBlank(queryParams)) { + builder.query(queryParams); + } + return builder.build(); + } + + private static boolean startWithSlash(@NotNull String s) { + return s.startsWith(SLASH) || s.startsWith(BACKSLASH); + } +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/params/HttpRequestParamsProviderImpl.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/params/HttpRequestParamsProviderImpl.java new file mode 100644 index 0000000..e273ca2 --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/params/HttpRequestParamsProviderImpl.java @@ -0,0 +1,56 @@ +package org.eclipse.edc.connector.dataplane.http.params; + +import org.eclipse.edc.connector.dataplane.http.params.decorators.BaseCommonHttpParamsDecorator; +import org.eclipse.edc.connector.dataplane.http.params.decorators.BaseSinkHttpParamsDecorator; +import org.eclipse.edc.connector.dataplane.http.params.decorators.BaseSourceHttpParamsDecorator; +import org.eclipse.edc.connector.dataplane.http.spi.HttpDataAddress; +import org.eclipse.edc.connector.dataplane.http.spi.HttpParamsDecorator; +import org.eclipse.edc.connector.dataplane.http.spi.HttpRequestParams; +import org.eclipse.edc.connector.dataplane.http.spi.HttpRequestParamsProvider; +import org.eclipse.edc.spi.security.Vault; +import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; + +import java.util.ArrayList; +import java.util.List; + +public class HttpRequestParamsProviderImpl implements HttpRequestParamsProvider { + + private final List sourceDecorators = new ArrayList<>(); + private final List sinkDecorators = new ArrayList<>(); + + public HttpRequestParamsProviderImpl(Vault vault, TypeManager typeManager) { + var commonHttpParamsDecorator = new BaseCommonHttpParamsDecorator(vault, typeManager); + registerSinkDecorator(commonHttpParamsDecorator); + registerSourceDecorator(commonHttpParamsDecorator); + registerSourceDecorator(new BaseSourceHttpParamsDecorator()); + registerSinkDecorator(new BaseSinkHttpParamsDecorator()); + } + + @Override + public void registerSourceDecorator(HttpParamsDecorator decorator) { + sourceDecorators.add(decorator); + } + + @Override + public void registerSinkDecorator(HttpParamsDecorator decorator) { + sinkDecorators.add(decorator); + } + + @Override + public HttpRequestParams provideSourceParams(DataFlowRequest request) { + var params = HttpRequestParams.Builder.newInstance(); + var address = HttpDataAddress.Builder.newInstance().copyFrom(request.getSourceDataAddress()).build(); + sourceDecorators.forEach(decorator -> decorator.decorate(request, address, params)); + return params.build(); + } + + @Override + public HttpRequestParams provideSinkParams(DataFlowRequest request) { + var params = HttpRequestParams.Builder.newInstance(); + var address = HttpDataAddress.Builder.newInstance().copyFrom(request.getDestinationDataAddress()).build(); + sinkDecorators.forEach(decorator -> decorator.decorate(request, address, params)); + return params.build(); + } + +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/params/decorators/BaseCommonHttpParamsDecorator.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/params/decorators/BaseCommonHttpParamsDecorator.java new file mode 100644 index 0000000..ab04c28 --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/params/decorators/BaseCommonHttpParamsDecorator.java @@ -0,0 +1,90 @@ +package org.eclipse.edc.connector.dataplane.http.params.decorators; + +import org.eclipse.edc.connector.dataplane.http.spi.HttpDataAddress; +import org.eclipse.edc.connector.dataplane.http.spi.HttpParamsDecorator; +import org.eclipse.edc.connector.dataplane.http.spi.HttpRequestParams; +import org.eclipse.edc.spi.EdcException; +import org.eclipse.edc.spi.security.Vault; +import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; + +import java.util.Map; +import java.util.Optional; + +import static java.lang.String.format; +import static org.eclipse.edc.connector.dataplane.http.spi.HttpDataAddress.ADDITIONAL_HEADER; + +public class BaseCommonHttpParamsDecorator implements HttpParamsDecorator { + + private final Vault vault; + private final TypeManager typeManager; + + public BaseCommonHttpParamsDecorator(Vault vault, TypeManager typeManager) { + this.vault = vault; + this.typeManager = typeManager; + } + + @Override + public HttpRequestParams.Builder decorate(DataFlowRequest request, HttpDataAddress address, HttpRequestParams.Builder params) { + var requestId = request.getId(); + var baseUrl = Optional.ofNullable(address.getBaseUrl()) + .orElseThrow(() -> new EdcException(format("DataFlowRequest %s: 'baseUrl' property is missing in HttpDataAddress", requestId))); + + Optional.ofNullable(address.getAuthKey()) + .ifPresent(authKey -> params.header(authKey, extractAuthCode(requestId, address))); + request.getProperties().forEach((key, value) -> { + if (key.startsWith(ADDITIONAL_HEADER)) { + params.header(key.replace(ADDITIONAL_HEADER, ""), value); + } + }); + return params + .baseUrl(baseUrl) + .headers(address.getAdditionalHeaders()); + } + + /** + * Extract auth token for accessing data source API. + *

+ * First check the token is directly hardcoded within the data source. + * If not then use the secret to resolve it from the vault. + * In the vault the token could be stored directly as a string or in an object within the "token" field (look at the + * "oauth2-provision" extension for details.) + * + * @param requestId request identifier + * @param address address of the data source + * @return Secret to be used for authentication. + */ + private String extractAuthCode(String requestId, HttpDataAddress address) { + var secret = address.getAuthCode(); + if (secret != null) { + return secret; + } + + var secretName = address.getSecretName(); + if (secretName == null) { + throw new EdcException(format("DataFlowRequest %s: 'secretName' property is missing in HttpDataAddress", requestId)); + } + + var value = vault.resolveSecret(secretName); + + return Optional.ofNullable(value) + .map(it -> getTokenFromJson(it, requestId).orElse(it)) + .orElseThrow(() -> new EdcException(format("DataFlowRequest %s: no secret found in vault with name %s", requestId, secretName))); + } + + private Optional getTokenFromJson(String value, String requestId) { + Map map; + try { + map = typeManager.readValue(value, Map.class); + } catch (Exception e) { + return Optional.empty(); + } + + var token = map.get("token"); + if (token == null) { + throw new EdcException(format("DataFlowRequest %s: Field 'token' not found in the secret serialized as json: %s", requestId, value)); + } else { + return Optional.of(token.toString()); + } + } +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/params/decorators/BaseSinkHttpParamsDecorator.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/params/decorators/BaseSinkHttpParamsDecorator.java new file mode 100644 index 0000000..094b97a --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/params/decorators/BaseSinkHttpParamsDecorator.java @@ -0,0 +1,28 @@ +package org.eclipse.edc.connector.dataplane.http.params.decorators; + +import org.eclipse.edc.connector.dataplane.http.spi.HttpDataAddress; +import org.eclipse.edc.connector.dataplane.http.spi.HttpParamsDecorator; +import org.eclipse.edc.connector.dataplane.http.spi.HttpRequestParams; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; + +import java.util.Optional; + +public class BaseSinkHttpParamsDecorator implements HttpParamsDecorator { + private static final String DEFAULT_METHOD = "POST"; + + @Override + public HttpRequestParams.Builder decorate(DataFlowRequest request, HttpDataAddress address, HttpRequestParams.Builder params) { + var method = Optional.ofNullable(address.getMethod()).orElse(DEFAULT_METHOD); + params.method(method); + params.path(address.getPath()); + params.queryParams(null); + Optional.ofNullable(address.getContentType()) + .ifPresent(contentType -> { + params.contentType(contentType); + params.body(null); + }); + params.nonChunkedTransfer(address.getNonChunkedTransfer()); + return params; + } + +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/params/decorators/BaseSourceHttpParamsDecorator.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/params/decorators/BaseSourceHttpParamsDecorator.java new file mode 100644 index 0000000..67f6585 --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/params/decorators/BaseSourceHttpParamsDecorator.java @@ -0,0 +1,70 @@ +package org.eclipse.edc.connector.dataplane.http.params.decorators; + +import org.eclipse.edc.connector.dataplane.http.spi.HttpDataAddress; +import org.eclipse.edc.connector.dataplane.http.spi.HttpParamsDecorator; +import org.eclipse.edc.connector.dataplane.http.spi.HttpRequestParams; +import org.eclipse.edc.spi.EdcException; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; +import org.eclipse.edc.util.string.StringUtils; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.lang.String.format; +import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.*; + +public class BaseSourceHttpParamsDecorator implements HttpParamsDecorator { + + private static final String DEFAULT_METHOD = "GET"; + + @Override + public HttpRequestParams.Builder decorate(DataFlowRequest request, HttpDataAddress address, HttpRequestParams.Builder params) { + params.method(extractMethod(address, request)); + params.path(extractPath(address, request)); + params.queryParams(extractQueryParams(address, request)); + Optional.ofNullable(extractContentType(address, request)) + .ifPresent(ct -> { + params.contentType(ct); + params.body(extractBody(address, request)); + }); + params.nonChunkedTransfer(false); + return params; + } + + private @NotNull String extractMethod(HttpDataAddress address, DataFlowRequest request) { + if (Boolean.parseBoolean(address.getProxyMethod())) { + return Optional.ofNullable(request.getProperties().get(METHOD)) + .orElseThrow(() -> new EdcException(format("DataFlowRequest %s: 'method' property is missing", request.getId()))); + } + return Optional.ofNullable(address.getMethod()).orElse(DEFAULT_METHOD); + } + + private @Nullable String extractPath(HttpDataAddress address, DataFlowRequest request) { + return Boolean.parseBoolean(address.getProxyPath()) ? request.getProperties().get(PATH) : address.getPath(); + } + + private @Nullable String extractQueryParams(HttpDataAddress address, DataFlowRequest request) { + var queryParams = Stream.of(address.getQueryParams(), getRequestQueryParams(address, request)) + .filter(s -> !StringUtils.isNullOrBlank(s)) + .collect(Collectors.joining("&")); + return !queryParams.isEmpty() ? queryParams : null; + } + + @Nullable + private String extractContentType(HttpDataAddress address, DataFlowRequest request) { + return Boolean.parseBoolean(address.getProxyBody()) ? request.getProperties().get(MEDIA_TYPE) : address.getContentType(); + } + + @Nullable + private String extractBody(HttpDataAddress address, DataFlowRequest request) { + return Boolean.parseBoolean(address.getProxyBody()) ? request.getProperties().get(BODY) : null; + } + + @Nullable + private String getRequestQueryParams(HttpDataAddress address, DataFlowRequest request) { + return Boolean.parseBoolean(address.getProxyQueryParams()) ? request.getProperties().get(QUERY_PARAMS) : null; + } +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/AbstractTransferRequestBody.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/AbstractTransferRequestBody.java new file mode 100644 index 0000000..87c9059 --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/AbstractTransferRequestBody.java @@ -0,0 +1,23 @@ +package org.eclipse.edc.connector.dataplane.http.pipeline; + +import okhttp3.MediaType; +import okhttp3.RequestBody; + +/** + * Writes content into an OK HTTP buffered sink. + * + * @see OkHttp Dcoumentation + */ +public abstract class AbstractTransferRequestBody extends RequestBody { + + private final String contentType; + + protected AbstractTransferRequestBody(String contentType) { + this.contentType = contentType; + } + + @Override + public MediaType contentType() { + return MediaType.parse(contentType); + } +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/ChunkedTransferRequestBody.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/ChunkedTransferRequestBody.java new file mode 100644 index 0000000..75f0088 --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/ChunkedTransferRequestBody.java @@ -0,0 +1,32 @@ +package org.eclipse.edc.connector.dataplane.http.pipeline; + +import okio.BufferedSink; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.InputStream; +import java.util.function.Supplier; + +/** + * Streams content into an OK HTTP buffered sink in chunks. + *

+ * Due to OkHttp implementation an extra header will be created (no-overridable) Transfer-Encoding with value chunked + * + * @see OkHttp Dcoumentation + */ +public class ChunkedTransferRequestBody extends AbstractTransferRequestBody { + + private final Supplier bodySupplier; + + public ChunkedTransferRequestBody(Supplier bodySupplier, String contentType) { + super(contentType); + this.bodySupplier = bodySupplier; + } + + @Override + public void writeTo(@NotNull BufferedSink sink) throws IOException { + try (var os = sink.outputStream(); var is = bodySupplier.get()) { + is.transferTo(os); + } + } +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/NonChunkedTransferRequestBody.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/NonChunkedTransferRequestBody.java new file mode 100644 index 0000000..8feb2b0 --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/NonChunkedTransferRequestBody.java @@ -0,0 +1,47 @@ +package org.eclipse.edc.connector.dataplane.http.pipeline; + +import okio.BufferedSink; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.InputStream; +import java.util.function.Supplier; + +/** + * Writes content into an OK HTTP buffered sink. + *

+ * The extra Transfer-Encoding is not created because the Content-Length is provided upfront. + * Note that means that the all content is loaded into memory, so this method can be used for small files (up to 50MB) for e.g. + * + * @see OkHttp Dcoumentation + */ +public class NonChunkedTransferRequestBody extends AbstractTransferRequestBody { + + private byte[] bytes; + + public NonChunkedTransferRequestBody(Supplier contentSupplier, String contentType) { + super(contentType); + try (var is = contentSupplier.get()) { + this.bytes = is.readAllBytes(); + } catch (IOException e) { + //do nothing + } + } + + @Override + public long contentLength() { + return bytes == null ? 0 : bytes.length; + } + + @Override + public void writeTo(@NotNull BufferedSink sink) throws IOException { + if (bytes == null) { + return; + } + + try (var os = sink.outputStream()) { + os.write(bytes); + } + } +} + diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/StringRequestBodySupplier.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/StringRequestBodySupplier.java new file mode 100644 index 0000000..6fc329d --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/StringRequestBodySupplier.java @@ -0,0 +1,24 @@ +package org.eclipse.edc.connector.dataplane.http.pipeline; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.Objects; +import java.util.function.Supplier; + +/** + * Supplier for string request body. + */ +public class StringRequestBodySupplier implements Supplier { + + private final String body; + + public StringRequestBodySupplier(String requestBody) { + Objects.requireNonNull(requestBody); + this.body = requestBody; + } + + @Override + public InputStream get() { + return new ByteArrayInputStream(body.getBytes()); + } +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/datasink/HttpDataSink.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/datasink/HttpDataSink.java new file mode 100644 index 0000000..f826fd2 --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/datasink/HttpDataSink.java @@ -0,0 +1,106 @@ +package org.eclipse.edc.connector.dataplane.http.pipeline.datasink; + +import org.eclipse.edc.connector.dataplane.api.pipeline.HttpPart; +import org.eclipse.edc.connector.dataplane.http.params.HttpRequestFactory; +import org.eclipse.edc.connector.dataplane.http.spi.HttpRequestParams; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; +import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure; +import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; +import org.eclipse.edc.connector.dataplane.util.sink.ParallelSink; +import org.eclipse.edc.spi.http.EdcHttpClient; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static java.lang.String.format; + +/** + * Writes data in a streaming fashion to an HTTP endpoint. + */ +public class HttpDataSink extends ParallelSink { + private static final StreamResult ERROR_WRITING_DATA = StreamResult.error("Error writing data"); + + private HttpRequestParams params; + private EdcHttpClient httpClient; + private HttpRequestFactory requestFactory; + + @Override + protected StreamResult transferParts(List parts) { + for (var part : parts) { + Map additionalHeaders = Map.of(); + if (part instanceof HttpPart httpPart) { + if (!httpPart.isOk()) { + return failingStatusCodeToStreamResult(httpPart); + } + additionalHeaders = httpPart.getHeadersForRequest(); + } + + var request = requestFactory.toRequest(params, additionalHeaders, part::openStream); + try (var response = httpClient.execute(request)) { + if (!response.isSuccessful()) { + monitor.severe(format("Error {%s: %s} received writing HTTP data %s to endpoint %s for request: %s", + response.code(), response.message(), part.name(), request.url().url(), request)); + return ERROR_WRITING_DATA; + } + } catch (Exception e) { + monitor.severe(format("Error writing HTTP data %s to endpoint %s for request: %s", part.name(), request.url().url(), request), e); + return ERROR_WRITING_DATA; + } + } + return StreamResult.success(); + } + + private StreamResult failingStatusCodeToStreamResult(HttpPart httpPart) { + try(BufferedReader reader = new BufferedReader(new InputStreamReader(httpPart.openStream()))){ + String message = reader.lines().parallel().collect(Collectors.joining("\n")); + return switch (httpPart.getStatusCode()) { + case 401, 403 -> StreamResult.notAuthorized(); + case 404 -> StreamResult.notFound(); + default -> + StreamResult.failure(new StreamFailure(List.of(message), StreamFailure.Reason.GENERAL_ERROR)); + }; + } catch (IOException e) { + monitor.severe(format("Error writing error message of %s", httpPart.name()), e); + return ERROR_WRITING_DATA; + } + } + + private HttpDataSink() { + } + + public static class Builder extends ParallelSink.Builder { + + public static Builder newInstance() { + return new Builder(); + } + + private Builder() { + super(new HttpDataSink()); + } + + public Builder params(HttpRequestParams params) { + sink.params = params; + return this; + } + + public Builder httpClient(EdcHttpClient httpClient) { + sink.httpClient = httpClient; + return this; + } + + public Builder requestFactory(HttpRequestFactory requestFactory) { + sink.requestFactory = requestFactory; + return this; + } + + @Override + protected void validate() { + Objects.requireNonNull(sink.requestFactory, "requestFactory"); + } + } +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/datasink/HttpDataSinkFactory.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/datasink/HttpDataSinkFactory.java new file mode 100644 index 0000000..344cd04 --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/datasink/HttpDataSinkFactory.java @@ -0,0 +1,69 @@ +package org.eclipse.edc.connector.dataplane.http.pipeline.datasink; + +import org.eclipse.edc.connector.dataplane.http.params.HttpRequestFactory; +import org.eclipse.edc.connector.dataplane.http.spi.HttpRequestParamsProvider; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSink; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSinkFactory; +import org.eclipse.edc.dataaddress.httpdata.spi.HttpDataAddressSchema; +import org.eclipse.edc.spi.http.EdcHttpClient; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.ExecutorService; + +import static org.eclipse.edc.dataaddress.httpdata.spi.HttpDataAddressSchema.HTTP_DATA_TYPE; + +/** + * Instantiates {@link HttpDataSink}s for requests whose source data type is {@link HttpDataAddressSchema#HTTP_DATA_TYPE}. + */ +public class HttpDataSinkFactory implements DataSinkFactory { + private final EdcHttpClient httpClient; + private final ExecutorService executorService; + private final int partitionSize; + private final Monitor monitor; + private final HttpRequestParamsProvider requestParamsProvider; + private final HttpRequestFactory requestFactory; + + public HttpDataSinkFactory(EdcHttpClient httpClient, + ExecutorService executorService, + int partitionSize, + Monitor monitor, + HttpRequestParamsProvider requestParamsProvider, HttpRequestFactory requestFactory) { + this.httpClient = httpClient; + this.executorService = executorService; + this.partitionSize = partitionSize; + this.monitor = monitor; + this.requestParamsProvider = requestParamsProvider; + this.requestFactory = requestFactory; + } + + @Override + public boolean canHandle(DataFlowRequest request) { + return HTTP_DATA_TYPE.equals(request.getDestinationDataAddress().getType()); + } + + @Override + public @NotNull Result validateRequest(DataFlowRequest request) { + try { + createSink(request); + } catch (Exception e) { + return Result.failure("Failed to build HttpDataSink: " + e.getMessage()); + } + return Result.success(); + } + + @Override + public DataSink createSink(DataFlowRequest request) { + return HttpDataSink.Builder.newInstance() + .params(requestParamsProvider.provideSinkParams(request)) + .requestId(request.getId()) + .partitionSize(partitionSize) + .httpClient(httpClient) + .executorService(executorService) + .monitor(monitor) + .requestFactory(requestFactory) + .build(); + } +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/datasource/HttpDataSource.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/datasource/HttpDataSource.java new file mode 100644 index 0000000..80ed090 --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/datasource/HttpDataSource.java @@ -0,0 +1,104 @@ +package org.eclipse.edc.connector.dataplane.http.pipeline.datasource; + + +import org.eclipse.edc.connector.dataplane.api.pipeline.HttpPart; +import org.eclipse.edc.connector.dataplane.http.params.HttpRequestFactory; +import org.eclipse.edc.connector.dataplane.http.spi.HttpRequestParams; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; +import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; +import org.eclipse.edc.spi.EdcException; +import org.eclipse.edc.spi.http.EdcHttpClient; +import org.eclipse.edc.spi.monitor.Monitor; + +import java.io.IOException; +import java.util.Objects; +import java.util.stream.Stream; + +import static java.lang.String.format; + +public class HttpDataSource implements DataSource { + + private String name; + private HttpRequestParams params; + private String requestId; + private Monitor monitor; + private EdcHttpClient httpClient; + private HttpRequestFactory requestFactory; + + @Override + public StreamResult> openPartStream() { + var request = requestFactory.toRequest(params); + monitor.debug(() -> "Executing HTTP request: " + request.url()); + try { + // NB: Do not close the response as the body input stream needs to be read after this method returns. The response closes the body stream. + var response = httpClient.execute(request); + var body = response.body(); + if (body == null) { + throw new EdcException(format("Received empty response body transferring HTTP data for request %s: %s", requestId, response.code())); + } + return StreamResult.success(Stream.of(new HttpPart(name, response.headers().toMultimap(), response.code(), body.byteStream()))); + + } catch (IOException e) { + throw new EdcException(e); + } + + } + + private HttpDataSource() { + } + + @Override + public void close() { + + } + + public static class Builder { + private final HttpDataSource dataSource; + + public static Builder newInstance() { + return new Builder(); + } + + private Builder() { + dataSource = new HttpDataSource(); + } + + public Builder params(HttpRequestParams params) { + dataSource.params = params; + return this; + } + + public Builder name(String name) { + dataSource.name = name; + return this; + } + + public Builder requestId(String requestId) { + dataSource.requestId = requestId; + return this; + } + + public Builder httpClient(EdcHttpClient httpClient) { + dataSource.httpClient = httpClient; + return this; + } + + public Builder monitor(Monitor monitor) { + dataSource.monitor = monitor; + return this; + } + + public Builder requestFactory(HttpRequestFactory requestFactory) { + dataSource.requestFactory = requestFactory; + return this; + } + + public HttpDataSource build() { + Objects.requireNonNull(dataSource.requestId, "requestId"); + Objects.requireNonNull(dataSource.httpClient, "httpClient"); + Objects.requireNonNull(dataSource.monitor, "monitor"); + Objects.requireNonNull(dataSource.requestFactory, "requestFactory"); + return dataSource; + } + } +} diff --git a/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/datasource/HttpDataSourceFactory.java b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/datasource/HttpDataSourceFactory.java new file mode 100644 index 0000000..d0c094b --- /dev/null +++ b/http-pull-connector/src/main/java/org/eclipse/edc/connector/dataplane/http/pipeline/datasource/HttpDataSourceFactory.java @@ -0,0 +1,63 @@ +package org.eclipse.edc.connector.dataplane.http.pipeline.datasource; + +import org.eclipse.edc.connector.dataplane.http.params.HttpRequestFactory; +import org.eclipse.edc.connector.dataplane.http.spi.HttpDataAddress; +import org.eclipse.edc.connector.dataplane.http.spi.HttpRequestParamsProvider; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSourceFactory; +import org.eclipse.edc.dataaddress.httpdata.spi.HttpDataAddressSchema; +import org.eclipse.edc.spi.http.EdcHttpClient; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; +import org.jetbrains.annotations.NotNull; + +import static org.eclipse.edc.dataaddress.httpdata.spi.HttpDataAddressSchema.HTTP_DATA_TYPE; + + +/** + * Instantiates {@link HttpDataSourceFactory}s for requests whose source data type is {@link HttpDataAddressSchema#HTTP_DATA_TYPE}. + */ +public class HttpDataSourceFactory implements DataSourceFactory { + private final EdcHttpClient httpClient; + private final HttpRequestParamsProvider requestParamsProvider; + private final Monitor monitor; + private final HttpRequestFactory requestFactory; + + public HttpDataSourceFactory(EdcHttpClient httpClient, HttpRequestParamsProvider requestParamsProvider, Monitor monitor, HttpRequestFactory requestFactory) { + this.httpClient = httpClient; + this.requestParamsProvider = requestParamsProvider; + this.monitor = monitor; + this.requestFactory = requestFactory; + } + + @Override + public boolean canHandle(DataFlowRequest request) { + return HTTP_DATA_TYPE.equals(request.getSourceDataAddress().getType()); + } + + @Override + public @NotNull Result validateRequest(DataFlowRequest request) { + try { + createSource(request); + } catch (Exception e) { + return Result.failure("Failed to build ExtendedHttpDataSource: " + e.getMessage()); + } + return Result.success(); + } + + @Override + public DataSource createSource(DataFlowRequest request) { + var dataAddress = HttpDataAddress.Builder.newInstance() + .copyFrom(request.getSourceDataAddress()) + .build(); + return HttpDataSource.Builder.newInstance() + .httpClient(httpClient) + .monitor(monitor) + .requestId(request.getId()) + .name(dataAddress.getName()) + .params(requestParamsProvider.provideSourceParams(request)) + .requestFactory(requestFactory) + .build(); + } +} diff --git a/http-pull-connector/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/http-pull-connector/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension new file mode 100644 index 0000000..f991a9c --- /dev/null +++ b/http-pull-connector/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -0,0 +1,2 @@ +org.eclipse.edc.connector.dataplane.api.DataPlanePublicApiExtension +org.eclipse.edc.connector.dataplane.http.DataPlaneHttpExtension \ No newline at end of file