From 246329757492da126cee5cc16f9fc30bb899b870 Mon Sep 17 00:00:00 2001 From: holomekc <30546982+holomekc@users.noreply.github.com> Date: Wed, 1 May 2024 19:18:51 +0200 Subject: [PATCH] feat: Funqy Amazon Lambda support more Amazon events Support for SQS, SNS, DynamoDB and Kinesis, with focus on the batching feature of AWS. Furthermore, add support for CloudEvents. Add tests for funqy amazon lambda --- bom/application/pom.xml | 7 + .../runtime/AbstractLambdaPollLoop.java | 1 + .../funqy-amazon-lambda/deployment/pom.xml | 10 + .../bindings/FunqyAmazonLambdaProcessor.java | 43 +++++ .../bindings/FunqyLambdaBuildStep.java | 12 +- .../quarkus/funqy/test/AnyFunctionTest.java | 48 +++++ .../test/CloudEventsEventFunctionTest.java | 62 ++++++ .../funqy/test/CloudEventsFunctionTest.java | 61 ++++++ .../funqy/test/DynamoDbEventFunctionTest.java | 95 ++++++++++ .../funqy/test/DynamoDbFunctionTest.java | 64 +++++++ .../test/java/io/quarkus/funqy/test/Item.java | 24 +++ .../funqy/test/KinesisFunctionTest.java | 94 ++++++++++ .../funqy/test/SnsEventFunctionTest.java | 50 +++++ .../quarkus/funqy/test/SnsFunctionTest.java | 49 +++++ .../funqy/test/SqsEventFunctionTest.java | 95 ++++++++++ .../SqsFunctionNoBatchItemFailuresTest.java | 48 +++++ .../quarkus/funqy/test/SqsFunctionTest.java | 94 ++++++++++ .../io/quarkus/funqy/test/TestFunctions.java | 63 +++++++ .../funqy/test/model/BatchItemFailures.java | 6 + .../quarkus/funqy/test/model/ItemFailure.java | 4 + .../funqy/test/util/EventDataProvider.java | 19 ++ .../test/resources/any-function.properties | 1 + .../src/test/resources/events/any/fail.json | 4 + .../src/test/resources/events/any/ok.json | 4 + .../resources/events/cloudevents/fail.json | 26 +++ .../test/resources/events/cloudevents/ok.json | 41 ++++ .../test/resources/events/dynamodb/fail.json | 62 ++++++ .../test/resources/events/dynamodb/ok.json | 62 ++++++ .../resources/events/dynamodb/pipes-fail.json | 60 ++++++ .../resources/events/dynamodb/pipes-ok.json | 60 ++++++ .../test/resources/events/kinesis/fail.json | 36 ++++ .../src/test/resources/events/kinesis/ok.json | 36 ++++ .../resources/events/kinesis/pipes-fail.json | 30 +++ .../resources/events/kinesis/pipes-ok.json | 30 +++ .../src/test/resources/events/sns/fail.json | 31 +++ .../src/test/resources/events/sns/ok.json | 31 +++ .../src/test/resources/events/sqs/fail.json | 36 ++++ .../src/test/resources/events/sqs/ok.json | 36 ++++ .../test/resources/events/sqs/pipes-fail.json | 34 ++++ .../test/resources/events/sqs/pipes-ok.json | 34 ++++ .../test/resources/item-function.properties | 2 + .../resources/no-batch-function.properties | 3 + .../funqy/funqy-amazon-lambda/runtime/pom.xml | 14 ++ .../lambda/FunqyLambdaBindingRecorder.java | 78 ++++---- .../AdvancedEventHandlingBuildTimeConfig.java | 17 ++ .../config/AdvancedEventHandlingConfig.java | 30 +++ .../quarkus/funqy/lambda/config/DynamoDb.java | 17 ++ .../config/FunqyAmazonBuildTimeConfig.java | 17 ++ .../lambda/config/FunqyAmazonConfig.java | 17 ++ .../quarkus/funqy/lambda/config/Kinesis.java | 17 ++ .../io/quarkus/funqy/lambda/config/Sns.java | 11 ++ .../io/quarkus/funqy/lambda/config/Sqs.java | 17 ++ .../quarkus/funqy/lambda/event/AwsModule.java | 42 +++++ .../funqy/lambda/event/EventDeserializer.java | 176 ++++++++++++++++++ .../funqy/lambda/event/EventErrorHandler.java | 29 +++ .../funqy/lambda/event/EventHandler.java | 20 ++ .../funqy/lambda/event/EventProcessor.java | 168 +++++++++++++++++ .../event/cloudevents/CloudEventsHandler.java | 46 +++++ .../event/dynamodb/DynamoDbEventHandler.java | 53 ++++++ .../dynamodb/PipesDynamoDbEventHandler.java | 53 ++++++ .../event/kinesis/DateDeserializer.java | 23 +++ .../event/kinesis/KinesisEventHandler.java | 54 ++++++ .../kinesis/PipesKinesisEventHandler.java | 54 ++++++ .../lambda/event/sns/SnsEventHandler.java | 49 +++++ .../event/sqs/PipesSqsEventHandler.java | 49 +++++ .../lambda/event/sqs/SqsEventHandler.java | 54 ++++++ .../funqy/lambda/model/FunqyMethod.java | 39 ++++ .../model/cloudevents/CloudEventDataV1.java | 27 +++ .../model/cloudevents/CloudEventV1.java | 172 +++++++++++++++++ .../model/kinesis/PipesKinesisEvent.java | 61 ++++++ .../lambda/model/pipes/BatchItemFailures.java | 14 ++ .../funqy/lambda/model/pipes/Response.java | 16 ++ 72 files changed, 3001 insertions(+), 41 deletions(-) create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyAmazonLambdaProcessor.java create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/AnyFunctionTest.java create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsEventFunctionTest.java create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsFunctionTest.java create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbEventFunctionTest.java create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbFunctionTest.java create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/Item.java create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisFunctionTest.java create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsEventFunctionTest.java create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsFunctionTest.java create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsEventFunctionTest.java create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionNoBatchItemFailuresTest.java create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionTest.java create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/TestFunctions.java create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/BatchItemFailures.java create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/ItemFailure.java create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/EventDataProvider.java create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/any-function.properties create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/fail.json create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/ok.json create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/fail.json create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/ok.json create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/fail.json create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/ok.json create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-fail.json create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-ok.json create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/fail.json create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/ok.json create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-fail.json create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-ok.json create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/fail.json create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/ok.json create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/fail.json create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/ok.json create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-fail.json create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-ok.json create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/item-function.properties create mode 100644 extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/no-batch-function.properties create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingBuildTimeConfig.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingConfig.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/DynamoDb.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonBuildTimeConfig.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonConfig.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Kinesis.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sns.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sqs.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsModule.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventDeserializer.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventErrorHandler.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventHandler.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventProcessor.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/cloudevents/CloudEventsHandler.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/DynamoDbEventHandler.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/PipesDynamoDbEventHandler.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/DateDeserializer.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/KinesisEventHandler.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/PipesKinesisEventHandler.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sns/SnsEventHandler.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/PipesSqsEventHandler.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/SqsEventHandler.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/FunqyMethod.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventDataV1.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventV1.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/kinesis/PipesKinesisEvent.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/BatchItemFailures.java create mode 100644 extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/Response.java diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 34b0452e2fe1f5..b29490c1ca6915 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -156,6 +156,7 @@ 1.2.3 3.11.4 2.15.2 + 3.0.0 3.1.0 1.0.0 1.9.23 @@ -5678,6 +5679,12 @@ + + io.cloudevents + cloudevents-api + ${cloudevents-api.version} + + com.microsoft.azure.functions azure-functions-java-library diff --git a/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AbstractLambdaPollLoop.java b/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AbstractLambdaPollLoop.java index c967b6064c5e3b..a493e1489f64bb 100644 --- a/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AbstractLambdaPollLoop.java +++ b/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AbstractLambdaPollLoop.java @@ -125,6 +125,7 @@ public void run() { URL url = AmazonLambdaApi.invocationResponse(baseUrl, requestId); if (isStream()) { HttpURLConnection responseConnection = responseStream(url); + responseConnection.setRequestProperty("Content-Type", "application/json"); if (running.get()) { processRequest(requestConnection.getInputStream(), responseConnection.getOutputStream(), createContext(requestConnection)); diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/pom.xml b/extensions/funqy/funqy-amazon-lambda/deployment/pom.xml index 2a03684b9a6752..159c1b235e5ad5 100644 --- a/extensions/funqy/funqy-amazon-lambda/deployment/pom.xml +++ b/extensions/funqy/funqy-amazon-lambda/deployment/pom.xml @@ -29,6 +29,16 @@ io.quarkus quarkus-arc-deployment + + io.quarkus + quarkus-junit5-internal + test + + + io.rest-assured + rest-assured + test + diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyAmazonLambdaProcessor.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyAmazonLambdaProcessor.java new file mode 100644 index 00000000000000..ee4d950cf9b1df --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyAmazonLambdaProcessor.java @@ -0,0 +1,43 @@ +package io.quarkus.funqy.deployment.bindings; + +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; +import com.amazonaws.services.lambda.runtime.events.models.kinesis.Record; + +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; +import io.quarkus.deployment.pkg.steps.NativeBuild; + +public class FunqyAmazonLambdaProcessor { + + @BuildStep(onlyIf = NativeBuild.class) + public void process(BuildProducer reflectiveClass) { + reflectiveClass.produce(ReflectiveClassBuildItem.builder( + // SQS + SQSEvent.class.getName(), + SQSEvent.SQSMessage.class.getName(), + SQSEvent.MessageAttribute.class.getName(), + SQSBatchResponse.class.getName(), + SQSBatchResponse.BatchItemFailure.class.getName(), + // SNS + SNSEvent.class.getName(), + SNSEvent.SNSRecord.class.getName(), + SNSEvent.SNS.class.getName(), + // Kinesis + KinesisEvent.class.getName(), + KinesisEvent.KinesisEventRecord.class.getName(), + Record.class.getName(), + StreamsEventResponse.class.getName(), + StreamsEventResponse.BatchItemFailure.class.getName(), + // DynamoDB + DynamodbEvent.class.getName(), + DynamodbEvent.DynamodbStreamRecord.class.getName() + ).constructors().methods().fields().build() + ); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java index 6a8bcbdc213d9c..9158ec43dcbbb5 100644 --- a/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java @@ -20,6 +20,8 @@ import io.quarkus.funqy.deployment.FunctionBuildItem; import io.quarkus.funqy.deployment.FunctionInitializedBuildItem; import io.quarkus.funqy.lambda.FunqyLambdaBindingRecorder; +import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig; +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; import io.quarkus.funqy.runtime.FunqyConfig; import io.quarkus.runtime.LaunchMode; @@ -37,17 +39,19 @@ public void init(List functions, BuildProducer feature, Optional hasFunctions, LambdaObjectMapperInitializedBuildItem mapperDependency, - BeanContainerBuildItem beanContainer) throws Exception { + BeanContainerBuildItem beanContainer, + FunqyAmazonBuildTimeConfig buildTimeConfig) throws Exception { if (!hasFunctions.isPresent() || hasFunctions.get() == null) return; feature.produce(new FeatureBuildItem(FUNQY_AMAZON_LAMBDA)); - recorder.init(beanContainer.getValue()); + recorder.init(beanContainer.getValue(), buildTimeConfig); } @BuildStep @Record(RUNTIME_INIT) - public RuntimeComplete choose(FunqyConfig config, FunqyLambdaBindingRecorder recorder) { - recorder.chooseInvoker(config); + public RuntimeComplete choose(FunqyConfig config, FunqyAmazonConfig amazonConfig, + FunqyLambdaBindingRecorder recorder) { + recorder.chooseInvoker(config, amazonConfig); return new RuntimeComplete(); } diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/AnyFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/AnyFunctionTest.java new file mode 100644 index 00000000000000..ac505ad6c16a29 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/AnyFunctionTest.java @@ -0,0 +1,48 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class AnyFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("any-function.properties", "application.properties") + .addAsResource("events/any", "events") + .addClasses(TestFunctions.class, Item.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + response.then().statusCode(204); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + response.then().statusCode(500); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsEventFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsEventFunctionTest.java new file mode 100644 index 00000000000000..a7de253a34c42d --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsEventFunctionTest.java @@ -0,0 +1,62 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class CloudEventsEventFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .overrideRuntimeConfigKey("quarkus.funqy.export", "cloudevents-function") + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/cloudevents", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsFunctionTest.java new file mode 100644 index 00000000000000..bbac786e904db6 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsFunctionTest.java @@ -0,0 +1,61 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class CloudEventsFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/cloudevents", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbEventFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbEventFunctionTest.java new file mode 100644 index 00000000000000..44739d1e7dd388 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbEventFunctionTest.java @@ -0,0 +1,95 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class DynamoDbEventFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .overrideRuntimeConfigKey("quarkus.funqy.export", "dynamodb-function") + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/dynamodb", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } + + @Test + public void should_return_no_failures_if_processing_pipes_is_ok() { + // given + var body = getData("pipes-ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_pipes_fails() { + // given + var body = getData("pipes-fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbFunctionTest.java new file mode 100644 index 00000000000000..c7451eff51403d --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbFunctionTest.java @@ -0,0 +1,64 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class DynamoDbFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/dynamodb", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class)); + + @Test + public void should_fail_on_dynamodb_event_without_dynamodb_event_type() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + // It is not supported to transform the DynamoDB event record to an internal model. Therefore, if somebody + // would try this, the lambda would return every message as failure in batch item failures and log an error. + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(2)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItems("1", "2")); + } + + @Test + public void should_fail_on_dynamodb_event_via_pipes_without_dynamodb_event_type() { + // given + var body = getData("pipes-ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + // It is not supported to transform the DynamoDB event record to an internal model. Therefore, if somebody + // would try this, the lambda would return every message as failure in batch item failures and log an error. + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(2)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItems("1", "2")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/Item.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/Item.java new file mode 100644 index 00000000000000..26975491bde6db --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/Item.java @@ -0,0 +1,24 @@ +package io.quarkus.funqy.test; + +public class Item { + + String message; + + boolean throwError; + + public String getMessage() { + return message; + } + + public void setMessage(final String message) { + this.message = message; + } + + public boolean isThrowError() { + return throwError; + } + + public void setThrowError(final boolean throwError) { + this.throwError = throwError; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisFunctionTest.java new file mode 100644 index 00000000000000..4371d5e47d36da --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisFunctionTest.java @@ -0,0 +1,94 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class KinesisFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/kinesis", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } + + @Test + public void should_return_no_failures_if_processing_pipes_is_ok() { + // given + var body = getData("pipes-ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_pipes_fails() { + // given + var body = getData("pipes-fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsEventFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsEventFunctionTest.java new file mode 100644 index 00000000000000..c01be98ef1ad40 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsEventFunctionTest.java @@ -0,0 +1,50 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class SnsEventFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .overrideRuntimeConfigKey("quarkus.funqy.export", "sns-function") + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/sns", "events") + .addClasses(TestFunctions.class, Item.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + response.then().statusCode(204); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + // SNS triggers have no error handling. + response.then().statusCode(204); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsFunctionTest.java new file mode 100644 index 00000000000000..5c0d7956077e3b --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsFunctionTest.java @@ -0,0 +1,49 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class SnsFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/sns", "events") + .addClasses(TestFunctions.class, Item.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + response.then().statusCode(204); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + // SNS triggers have no error handling. + response.then().statusCode(204); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsEventFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsEventFunctionTest.java new file mode 100644 index 00000000000000..40f77665e6c4cf --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsEventFunctionTest.java @@ -0,0 +1,95 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class SqsEventFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .overrideRuntimeConfigKey("quarkus.funqy.export", "sqs-function") + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/sqs", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } + + @Test + public void should_return_no_failures_if_processing_pipes_is_ok() { + // given + var body = getData("pipes-ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_pipes_fails() { + // given + var body = getData("pipes-fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionNoBatchItemFailuresTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionNoBatchItemFailuresTest.java new file mode 100644 index 00000000000000..89bf3e585587d5 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionNoBatchItemFailuresTest.java @@ -0,0 +1,48 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class SqsFunctionNoBatchItemFailuresTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("no-batch-function.properties", "application.properties") + .addAsResource("events/sqs", "events") + .addClasses(TestFunctions.class, Item.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + response.then().statusCode(204); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + response.then().statusCode(204); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionTest.java new file mode 100644 index 00000000000000..d7a6c367ea5e54 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionTest.java @@ -0,0 +1,94 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class SqsFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/sqs", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } + + @Test + public void should_return_no_failures_if_processing_pipes_is_ok() { + // given + var body = getData("pipes-ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_pipes_fails() { + // given + var body = getData("pipes-fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/TestFunctions.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/TestFunctions.java new file mode 100644 index 00000000000000..f8cb6f8582e292 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/TestFunctions.java @@ -0,0 +1,63 @@ +package io.quarkus.funqy.test; + +import java.nio.charset.StandardCharsets; + +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; + +import io.cloudevents.CloudEvent; +import io.quarkus.funqy.Funq; +import io.smallrye.mutiny.Uni; + +public class TestFunctions { + + @Funq("item-function") + public Uni itemFunction(Item item) { + if (item.isThrowError()) { + return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); + } + return Uni.createFrom().voidItem(); + } + + @Funq("sqs-function") + public Uni sqsFunction(SQSEvent.SQSMessage msg) { + if (msg.getBody().contains("true")) { + return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); + } + return Uni.createFrom().voidItem(); + } + + @Funq("sns-function") + public Uni snsFunction(SNSEvent.SNSRecord msg) { + if (msg.getSNS().getMessage().contains("true")) { + return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); + } + return Uni.createFrom().voidItem(); + } + + @Funq("cloudevents-function") + public Uni cloudEventsFunction(CloudEvent msg) { + if (new String(msg.getData().toBytes(), StandardCharsets.UTF_8).contains("true")) { + return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); + } + return Uni.createFrom().voidItem(); + } + + @Funq("kinesis-function") + public Uni kinesisFunction(KinesisEvent.Record msg) { + if (StandardCharsets.UTF_8.decode(msg.getData()).toString().contains("true")) { + return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); + } + return Uni.createFrom().voidItem(); + } + + @Funq("dynamodb-function") + public Uni dynamodbFunction(DynamodbEvent.DynamodbStreamRecord msg) { + if (msg.getDynamodb().getNewImage().get("ThrowError").getBOOL()) { + return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); + } + return Uni.createFrom().voidItem(); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/BatchItemFailures.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/BatchItemFailures.java new file mode 100644 index 00000000000000..4914a34a5f8017 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/BatchItemFailures.java @@ -0,0 +1,6 @@ +package io.quarkus.funqy.test.model; + +import java.util.List; + +public record BatchItemFailures(List batchItemFailures) { +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/ItemFailure.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/ItemFailure.java new file mode 100644 index 00000000000000..bd08b653841e27 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/ItemFailure.java @@ -0,0 +1,4 @@ +package io.quarkus.funqy.test.model; + +public record ItemFailure(String itemIdentifier) { +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/EventDataProvider.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/EventDataProvider.java new file mode 100644 index 00000000000000..17e1efadc5fe1c --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/EventDataProvider.java @@ -0,0 +1,19 @@ +package io.quarkus.funqy.test.util; + +import java.io.IOException; +import java.nio.charset.Charset; + +import org.apache.commons.io.IOUtils; + +public class EventDataProvider { + + public static String getData(String path) { + try { + return IOUtils.toString( + EventDataProvider.class.getClassLoader().getResourceAsStream("events/" + path), + Charset.defaultCharset()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/any-function.properties b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/any-function.properties new file mode 100644 index 00000000000000..e9be9dfb08bdd4 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/any-function.properties @@ -0,0 +1 @@ +quarkus.funqy.export=item-function diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/fail.json new file mode 100644 index 00000000000000..2ec250d2e81a18 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/fail.json @@ -0,0 +1,4 @@ +{ + "message": "hello", + "throwError": true +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/ok.json new file mode 100644 index 00000000000000..47ae6aedbde1dd --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/ok.json @@ -0,0 +1,4 @@ +{ + "message": "hello", + "throwError": false +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/fail.json new file mode 100644 index 00000000000000..62f32e16345e37 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/fail.json @@ -0,0 +1,26 @@ +[ + { + "specversion" : "1.0", + "type" : "com.github.pull_request.opened", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "1", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "text/plain", + "data" : "{\"message\":\"hello\",\"throwError\":true}" + }, + { + "specversion" : "1.0", + "type" : "com.github.pull_request.opened", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "2", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "text/plain", + "data" : "{\"message\":\"fail\",\"throwError\":false}" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/ok.json new file mode 100644 index 00000000000000..e49c9ddcece017 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/ok.json @@ -0,0 +1,41 @@ +[ + { + "specversion" : "1.0", + "type" : "com.github.pull_request.opened", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "1", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "text/plain", + "data" : "{\"message\":\"hello\",\"throwError\":false}" + }, + { + "specversion" : "1.0", + "type" : "com.github.pull_request.opened", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "2", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "application/json", + "data" : { + "message": "ok", + "throwError": false + } + }, + { + "specversion" : "1.0", + "type" : "com.github.pull_request.opened", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "3", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "text/xml", + "data_base64" : "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/fail.json new file mode 100644 index 00000000000000..9bb03b7bc14898 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/fail.json @@ -0,0 +1,62 @@ +{ + "Records": [ + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "1" + } + }, + "NewImage": { + "Message": { + "S": "hello" + }, + "ThrowError": { + "BOOL": true + }, + "Id": { + "N": "1" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "1", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + }, + { + "eventID": "2", + "eventVersion": "1.0", + "dynamodb": { + "NewImage": { + "Message": { + "S": "fail" + }, + "ThrowError": { + "BOOL": false + }, + "Id": { + "N": "2" + } + }, + "SequenceNumber": "2", + "Keys": { + "Id": { + "N": "101" + } + }, + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/ok.json new file mode 100644 index 00000000000000..4cc6db92f700e2 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/ok.json @@ -0,0 +1,62 @@ +{ + "Records": [ + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "1" + } + }, + "NewImage": { + "Message": { + "S": "hello" + }, + "ThrowError": { + "BOOL": false + }, + "Id": { + "N": "1" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "1", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + }, + { + "eventID": "2", + "eventVersion": "1.0", + "dynamodb": { + "NewImage": { + "Message": { + "S": "fail" + }, + "ThrowError": { + "BOOL": false + }, + "Id": { + "N": "2" + } + }, + "SequenceNumber": "2", + "Keys": { + "Id": { + "N": "101" + } + }, + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-fail.json new file mode 100644 index 00000000000000..98d791b29e8b59 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-fail.json @@ -0,0 +1,60 @@ +[ + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "1" + } + }, + "NewImage": { + "Message": { + "S": "hello" + }, + "ThrowError": { + "BOOL": true + }, + "Id": { + "N": "1" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "1", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + }, + { + "eventID": "2", + "eventVersion": "1.0", + "dynamodb": { + "NewImage": { + "Message": { + "S": "fail" + }, + "ThrowError": { + "BOOL": false + }, + "Id": { + "N": "2" + } + }, + "SequenceNumber": "2", + "Keys": { + "Id": { + "N": "101" + } + }, + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-ok.json new file mode 100644 index 00000000000000..d0d3816b1496c7 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-ok.json @@ -0,0 +1,60 @@ +[ + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "1" + } + }, + "NewImage": { + "Message": { + "S": "hello" + }, + "ThrowError": { + "BOOL": false + }, + "Id": { + "N": "1" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "1", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + }, + { + "eventID": "2", + "eventVersion": "1.0", + "dynamodb": { + "NewImage": { + "Message": { + "S": "fail" + }, + "ThrowError": { + "BOOL": false + }, + "Id": { + "N": "2" + } + }, + "SequenceNumber": "2", + "Keys": { + "Id": { + "N": "101" + } + }, + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/fail.json new file mode 100644 index 00000000000000..061f8dd6b880b3 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/fail.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "1", + "data": "eyJtZXNzYWdlIjoiaGVsbG8iLCJ0aHJvd0Vycm9yIjp0cnVlfQ==", + "approximateArrivalTimestamp": 1545084650.987 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + }, + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "2", + "data": "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==", + "approximateArrivalTimestamp": 1545084711.166 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/ok.json new file mode 100644 index 00000000000000..c8550929724b15 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/ok.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "1", + "data": "eyJtZXNzYWdlIjoiaGVsbG8iLCJ0aHJvd0Vycm9yIjpmYWxzZX0=", + "approximateArrivalTimestamp": 1545084650.987 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + }, + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "2", + "data": "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==", + "approximateArrivalTimestamp": 1545084711.166 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-fail.json new file mode 100644 index 00000000000000..3cc956e1cee0d7 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-fail.json @@ -0,0 +1,30 @@ +[ + { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "1", + "data": "eyJtZXNzYWdlIjoiaGVsbG8iLCJ0aHJvd0Vycm9yIjp0cnVlfQ==", + "approximateArrivalTimestamp": 1545084650.987, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + }, + { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "2", + "data": "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==", + "approximateArrivalTimestamp": 1545084711.166, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-ok.json new file mode 100644 index 00000000000000..1a40ebe10175d3 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-ok.json @@ -0,0 +1,30 @@ +[ + { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "1", + "data": "eyJtZXNzYWdlIjoiaGVsbG8iLCJ0aHJvd0Vycm9yIjpmYWxzZX0=", + "approximateArrivalTimestamp": 1545084650.987, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + }, + { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "2", + "data": "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==", + "approximateArrivalTimestamp": 1545084711.166, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/fail.json new file mode 100644 index 00000000000000..4643568fdfe33a --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/fail.json @@ -0,0 +1,31 @@ +{ + "Records": [ + { + "EventVersion": "1.0", + "EventSubscriptionArn": "arn:aws:sns:us-east-1:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486", + "EventSource": "aws:sns", + "Sns": { + "SignatureVersion": "1", + "Timestamp": "2019-01-02T12:45:07.000Z", + "Signature": "tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==", + "SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem", + "MessageId": "1", + "Message": "{\"message\":\"hello\",\"throwError\":true}", + "MessageAttributes": { + "Test": { + "Type": "String", + "Value": "TestString" + }, + "TestBinary": { + "Type": "Binary", + "Value": "TestBinary" + } + }, + "Type": "Notification", + "UnsubscribeURL": "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:123456789012:test-lambda:21be56ed-a058-49f5-8c98-aedd2564c486", + "TopicArn":"arn:aws:sns:us-east-1:123456789012:sns-lambda", + "Subject": "TestInvoke" + } + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/ok.json new file mode 100644 index 00000000000000..c97c6511cbab23 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/ok.json @@ -0,0 +1,31 @@ +{ + "Records": [ + { + "EventVersion": "1.0", + "EventSubscriptionArn": "arn:aws:sns:us-east-1:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486", + "EventSource": "aws:sns", + "Sns": { + "SignatureVersion": "1", + "Timestamp": "2019-01-02T12:45:07.000Z", + "Signature": "tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==", + "SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem", + "MessageId": "1", + "Message": "{\"message\":\"hello\",\"throwError\":false}", + "MessageAttributes": { + "Test": { + "Type": "String", + "Value": "TestString" + }, + "TestBinary": { + "Type": "Binary", + "Value": "TestBinary" + } + }, + "Type": "Notification", + "UnsubscribeURL": "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:123456789012:test-lambda:21be56ed-a058-49f5-8c98-aedd2564c486", + "TopicArn":"arn:aws:sns:us-east-1:123456789012:sns-lambda", + "Subject": "TestInvoke" + } + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/fail.json new file mode 100644 index 00000000000000..b6b52c29187ec8 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/fail.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "messageId": "1", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "{\"message\":\"hello\",\"throwError\":true}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + }, + { + "messageId": "2", + "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", + "body": "{\"message\":\"fail\",\"throwError\":false}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082650636", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082650649" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/ok.json new file mode 100644 index 00000000000000..a5b3b937145053 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/ok.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "messageId": "1", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "{\"message\":\"hello\",\"throwError\":false}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + }, + { + "messageId": "2", + "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", + "body": "{\"message\":\"fail\",\"throwError\":false}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082650636", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082650649" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-fail.json new file mode 100644 index 00000000000000..76e02d9b27aaa2 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-fail.json @@ -0,0 +1,34 @@ +[ + { + "messageId": "1", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "{\"message\":\"hello\",\"throwError\":true}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + }, + { + "messageId": "2", + "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", + "body": "{\"message\":\"fail\",\"throwError\":false}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082650636", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082650649" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-ok.json new file mode 100644 index 00000000000000..83f6fb1449950a --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-ok.json @@ -0,0 +1,34 @@ +[ + { + "messageId": "1", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "{\"message\":\"hello\",\"throwError\":false}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + }, + { + "messageId": "2", + "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", + "body": "{\"message\":\"fail\",\"throwError\":false}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082650636", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082650649" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/item-function.properties b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/item-function.properties new file mode 100644 index 00000000000000..3e2957b10e52cc --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/item-function.properties @@ -0,0 +1,2 @@ +quarkus.funqy.export=item-function +quarkus.funqy.amazon-lambda.advanced-event-handling.enabled=true diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/no-batch-function.properties b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/no-batch-function.properties new file mode 100644 index 00000000000000..ca2d666955c8fa --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/no-batch-function.properties @@ -0,0 +1,3 @@ +quarkus.funqy.export=item-function +quarkus.funqy.amazon-lambda.advanced-event-handling.enabled=true +quarkus.funqy.amazon-lambda.advanced-event-handling.sqs.report-batch-item-failures=false diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/pom.xml b/extensions/funqy/funqy-amazon-lambda/runtime/pom.xml index 4366bd5e63b3de..ccf8a40910d245 100644 --- a/extensions/funqy/funqy-amazon-lambda/runtime/pom.xml +++ b/extensions/funqy/funqy-amazon-lambda/runtime/pom.xml @@ -26,6 +26,20 @@ io.quarkus quarkus-jackson + + com.amazonaws + aws-lambda-java-events + + + joda-time + joda-time + + + + + io.cloudevents + cloudevents-api + diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java index d2726940abd858..bb4fba814d0735 100644 --- a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java @@ -11,16 +11,21 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.module.SimpleModule; import io.quarkus.amazon.lambda.runtime.AbstractLambdaPollLoop; import io.quarkus.amazon.lambda.runtime.AmazonLambdaContext; import io.quarkus.amazon.lambda.runtime.AmazonLambdaMapperRecorder; -import io.quarkus.amazon.lambda.runtime.JacksonInputReader; -import io.quarkus.amazon.lambda.runtime.JacksonOutputWriter; import io.quarkus.amazon.lambda.runtime.LambdaInputReader; import io.quarkus.amazon.lambda.runtime.LambdaOutputWriter; import io.quarkus.arc.ManagedContext; import io.quarkus.arc.runtime.BeanContainer; +import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig; +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.AwsModule; +import io.quarkus.funqy.lambda.event.EventDeserializer; +import io.quarkus.funqy.lambda.event.EventProcessor; +import io.quarkus.funqy.lambda.model.FunqyMethod; import io.quarkus.funqy.runtime.FunctionConstructor; import io.quarkus.funqy.runtime.FunctionInvoker; import io.quarkus.funqy.runtime.FunctionRecorder; @@ -40,28 +45,39 @@ public class FunqyLambdaBindingRecorder { private static FunctionInvoker invoker; private static BeanContainer beanContainer; - private static LambdaInputReader reader; - private static LambdaOutputWriter writer; + private static EventProcessor eventProcessor; - public void init(BeanContainer bc) { + public void init(BeanContainer bc, FunqyAmazonBuildTimeConfig buildTimeConfig) { beanContainer = bc; FunctionConstructor.CONTAINER = bc; - ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper; + // We create a copy, because we register a custom deserializer for everything. + ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper.copy(); + EventDeserializer eventDeserializer = new EventDeserializer(buildTimeConfig); + final SimpleModule simpleModule = new AwsModule(); + simpleModule.addDeserializer(Object.class, eventDeserializer); + objectMapper.registerModule(simpleModule); + for (FunctionInvoker invoker : FunctionRecorder.registry.invokers()) { + ObjectReader reader = null; + JavaType javaInputType = null; if (invoker.hasInput()) { - JavaType javaInputType = objectMapper.constructType(invoker.getInputType()); - ObjectReader reader = objectMapper.readerFor(javaInputType); - invoker.getBindingContext().put(ObjectReader.class.getName(), reader); + javaInputType = objectMapper.constructType(invoker.getInputType()); + reader = objectMapper.readerFor(javaInputType); } + ObjectWriter writer = null; + JavaType javaOutputType = null; if (invoker.hasOutput()) { - JavaType javaOutputType = objectMapper.constructType(invoker.getOutputType()); - ObjectWriter writer = objectMapper.writerFor(javaOutputType); - invoker.getBindingContext().put(ObjectWriter.class.getName(), writer); + javaOutputType = objectMapper.constructType(invoker.getOutputType()); + writer = objectMapper.writerFor(javaOutputType); } + invoker.getBindingContext().put(EventProcessor.class.getName(), + new EventProcessor(objectMapper, eventDeserializer, + new FunqyMethod(reader, writer, javaInputType, javaOutputType), + buildTimeConfig)); } } - public void chooseInvoker(FunqyConfig config) { + public void chooseInvoker(FunqyConfig config, FunqyAmazonConfig amazonConfig) { // this is done at Runtime so that we can change it with an environment variable. if (config.export.isPresent()) { invoker = FunctionRecorder.registry.matchInvoker(config.export.get()); @@ -76,35 +92,24 @@ public void chooseInvoker(FunqyConfig config) { } else { invoker = FunctionRecorder.registry.invokers().iterator().next(); } - if (invoker.hasInput()) { - reader = new JacksonInputReader((ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName())); - } - if (invoker.hasOutput()) { - writer = new JacksonOutputWriter((ObjectWriter) invoker.getBindingContext().get(ObjectWriter.class.getName())); - } - + eventProcessor = (EventProcessor) invoker.getBindingContext().get(EventProcessor.class.getName()); + eventProcessor.init(amazonConfig); } /** * Called by JVM handler wrapper * * @param inputStream + * {@link InputStream} of the AWS SDK {@link com.amazonaws.services.lambda.runtime.RequestStreamHandler} * @param outputStream + * {@link OutputStream} of the AWS SDK {@link com.amazonaws.services.lambda.runtime.RequestStreamHandler} * @param context + * AWS context information provided to the Lambda * @throws IOException + * Is thrown in case the (de)serialization fails */ public static void handle(InputStream inputStream, OutputStream outputStream, Context context) throws IOException { - Object input = null; - if (invoker.hasInput()) { - input = reader.readValue(inputStream); - } - FunqyServerResponse response = dispatch(input); - - Object value = response.getOutput().await().indefinitely(); - if (value != null) { - writer.writeValue(outputStream, value); - } - + eventProcessor.handle(inputStream, outputStream, FunqyLambdaBindingRecorder::dispatch, context); } @SuppressWarnings("rawtypes") @@ -114,29 +119,28 @@ public void startPollLoop(ShutdownContext context, LaunchMode launchMode) { @Override protected Object processRequest(Object input, AmazonLambdaContext context) throws Exception { - FunqyServerResponse response = dispatch(input); - return response.getOutput().await().indefinitely(); + throw new RuntimeException("Unreachable"); } @Override protected LambdaInputReader getInputReader() { - return reader; + throw new RuntimeException("Unreachable"); } @Override protected LambdaOutputWriter getOutputWriter() { - return writer; + throw new RuntimeException("Unreachable"); } @Override protected boolean isStream() { - return false; + return true; } @Override protected void processRequest(InputStream input, OutputStream output, AmazonLambdaContext context) throws Exception { - throw new RuntimeException("Unreachable!"); + handle(input, output, context); } }; loop.startPollLoop(context); diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingBuildTimeConfig.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingBuildTimeConfig.java new file mode 100644 index 00000000000000..3ac6babc1c95e9 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingBuildTimeConfig.java @@ -0,0 +1,17 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.smallrye.config.WithDefault; + +/** + * Advanced event handling build time configuration + */ +@ConfigGroup +public interface AdvancedEventHandlingBuildTimeConfig { + + /** + * If advanced event handling should be enabled + */ + @WithDefault("false") + boolean enabled(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingConfig.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingConfig.java new file mode 100644 index 00000000000000..e5ca3b0afacac9 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingConfig.java @@ -0,0 +1,30 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigGroup; + +/** + * Advanced event handling configuration + */ +@ConfigGroup +public interface AdvancedEventHandlingConfig { + + /** + * Sqs related config. + */ + Sqs sqs(); + + /** + * Sns related config. + */ + Sns sns(); + + /** + * Kinesis related config. + */ + Kinesis kinesis(); + + /** + * DynamoDb related config. + */ + DynamoDb dynamoDb(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/DynamoDb.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/DynamoDb.java new file mode 100644 index 00000000000000..6d182d725670bd --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/DynamoDb.java @@ -0,0 +1,17 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.smallrye.config.WithDefault; + +/** + * Kinesis event config + */ +@ConfigGroup +public interface DynamoDb { + + /** + * Allows functions to return partially successful responses for a batch of event records. + */ + @WithDefault("true") + boolean reportBatchItemFailures(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonBuildTimeConfig.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonBuildTimeConfig.java new file mode 100644 index 00000000000000..94d8c2da860bfb --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonBuildTimeConfig.java @@ -0,0 +1,17 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithName; + +@ConfigMapping(prefix = "quarkus.funqy.amazon-lambda") +@ConfigRoot(phase = ConfigPhase.BUILD_AND_RUN_TIME_FIXED) +public interface FunqyAmazonBuildTimeConfig { + + /** + * The advanced event handling config + */ + @WithName("advanced-event-handling") + AdvancedEventHandlingBuildTimeConfig advancedEventHandling(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonConfig.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonConfig.java new file mode 100644 index 00000000000000..ede409c6e90d1a --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonConfig.java @@ -0,0 +1,17 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithName; + +@ConfigMapping(prefix = "quarkus.funqy.amazon-lambda") +@ConfigRoot(phase = ConfigPhase.RUN_TIME) +public interface FunqyAmazonConfig { + + /** + * The advanced event handling config + */ + @WithName("advanced-event-handling") + AdvancedEventHandlingConfig advancedEventHandling(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Kinesis.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Kinesis.java new file mode 100644 index 00000000000000..b05562d93573a8 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Kinesis.java @@ -0,0 +1,17 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.smallrye.config.WithDefault; + +/** + * Kinesis event config + */ +@ConfigGroup +public interface Kinesis { + + /** + * Allows functions to return partially successful responses for a batch of event records. + */ + @WithDefault("true") + boolean reportBatchItemFailures(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sns.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sns.java new file mode 100644 index 00000000000000..cfb273a73550f4 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sns.java @@ -0,0 +1,11 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigGroup; + +/** + * Sns event config + */ +@ConfigGroup +public interface Sns { + +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sqs.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sqs.java new file mode 100644 index 00000000000000..75346e96e58655 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sqs.java @@ -0,0 +1,17 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.smallrye.config.WithDefault; + +/** + * Sqs event config + */ +@ConfigGroup +public interface Sqs { + + /** + * Allows functions to return partially successful responses for a batch of event records. + */ + @WithDefault("true") + boolean reportBatchItemFailures(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsModule.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsModule.java new file mode 100644 index 00000000000000..c75002af6af74e --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsModule.java @@ -0,0 +1,42 @@ +package io.quarkus.funqy.lambda.event; + +import java.util.Iterator; + +import com.amazonaws.services.lambda.runtime.events.models.kinesis.Record; +import com.fasterxml.jackson.databind.BeanDescription; +import com.fasterxml.jackson.databind.DeserializationConfig; +import com.fasterxml.jackson.databind.deser.BeanDeserializerBuilder; +import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier; +import com.fasterxml.jackson.databind.deser.SettableBeanProperty; +import com.fasterxml.jackson.databind.module.SimpleModule; + +import io.quarkus.funqy.lambda.event.kinesis.DateDeserializer; + +public class AwsModule extends SimpleModule { + + final String DATE_PROPERTY_NAME = "approximateArrivalTimestamp"; + + public AwsModule() { + this.setDeserializerModifier(new BeanDeserializerModifier() { + + @Override + public BeanDeserializerBuilder updateBuilder(final DeserializationConfig config, + final BeanDescription beanDesc, final BeanDeserializerBuilder builder) { + + for (final Iterator iterator = builder.getProperties(); iterator.hasNext();) { + SettableBeanProperty property = iterator.next(); + + // Kinesis records need some special treatment. The approximateArrivalTimestamp + // cannot be deserialized that easily. + if (Record.class.isAssignableFrom(property.getMember().getDeclaringClass()) + && DATE_PROPERTY_NAME.equalsIgnoreCase(property.getName())) { + final DateDeserializer deserializer = new DateDeserializer(); + property = property.withValueDeserializer(deserializer); + builder.addOrReplaceProperty(property, true); + } + } + return builder; + } + }); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventDeserializer.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventDeserializer.java new file mode 100644 index 00000000000000..44e0362c5b7c9a --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventDeserializer.java @@ -0,0 +1,176 @@ +package io.quarkus.funqy.lambda.event; + +import java.io.IOException; +import java.util.List; + +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.ObjectCodec; +import com.fasterxml.jackson.core.TreeNode; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import io.cloudevents.SpecVersion; +import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig; +import io.quarkus.funqy.lambda.model.cloudevents.CloudEventV1; +import io.quarkus.funqy.lambda.model.kinesis.PipesKinesisEvent; + +public class EventDeserializer extends JsonDeserializer { + + private final FunqyAmazonBuildTimeConfig buildTimeConfig; + + private ObjectReader funqyMethodObjReader; + private ObjectMapper objectMapper; + + public EventDeserializer(FunqyAmazonBuildTimeConfig buildTimeConfig) { + this.buildTimeConfig = buildTimeConfig; + } + + @Override + public Object deserialize(JsonParser jsonParser, DeserializationContext ctx) + throws IOException { + + ObjectCodec codec = jsonParser.getCodec(); + + TreeNode rootNode = codec.readTree(jsonParser); + + if (buildTimeConfig.advancedEventHandling().enabled() && (rootNode.isObject() || rootNode.isArray())) { + + if (rootNode.isObject()) { + // object + ObjectNode object = (ObjectNode) rootNode; + + if (object.has("Records") && object.get("Records").isArray()) { + // We need to look into the first record entry, to distinguish the different types. + for (JsonNode record : object.get("Records")) { + return deserializeEvent(jsonParser, record, object, codec); + } + } + } else { + // array. This happens in case of AWS EventBridge usage and is also the only way to enable + // CloudEvents usage + ArrayNode array = (ArrayNode) rootNode; + for (JsonNode record : array) { + return deserializeEvent(jsonParser, record, array, codec); + } + } + } + // We have no clue what it is. Fallback to serializing the output of the funqy method + return deserializeFunqReturnType(rootNode); + } + + public Object deserializeEvent(final JsonParser jsonParser, JsonNode record, JsonNode rootNode, ObjectCodec codec) + throws IOException { + + Object eventClass = getEventType(record, rootNode); + + if (eventClass != null) { + if (eventClass instanceof Class clazz) { + return objectMapper.convertValue(rootNode, clazz); + } else if (eventClass instanceof TypeReference typeReference) { + return objectMapper.convertValue(rootNode, typeReference); + } + } + + return deserializeFunqReturnType(rootNode); + } + + private Object getEventType(JsonNode record, JsonNode object) { + String eventSource = getEventSource(record); + + if (eventSource == null) { + eventSource = "default"; + } + + // See: https://docs.aws.amazon.com/lambda/latest/dg/lambda-services.html + // and for Pipes: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-source.html + Class eventType = null; + switch (eventSource) { + case "aws:sqs": + if (object.isObject()) { + eventType = SQSEvent.class; + } else { + // EventBridge Pipes + return new TypeReference>() { + }; + } + break; + case "aws:sns": + eventType = SNSEvent.class; + break; + case "aws:kinesis": + // Exclude Kinesis time window events. This would require to send a responsible with a state. + // This is very specific to AWS and maybe not the way funqy wants to handle things. + if (object.isObject() && !object.has("window")) { + eventType = KinesisEvent.class; + } else if (object.isArray()) { + // EventBridge Pipes + return new TypeReference>() { + }; + } + break; + case "aws:dynamodb": + if (object.isObject()) { + eventType = DynamodbEvent.class; + } else { + // EventBridge Pipes + return new TypeReference>() { + }; + } + break; + default: + break; + } + if (eventType == null && isCloudEvent(record)) { + return new TypeReference>() { + }; + } + return eventType; + } + + private boolean isCloudEvent(final JsonNode record) { + // this is the best guess we can do. We check for required attributes: + // https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#required-attributes + // A more tolerant way to check the type. We do not want to process to fail here. We can fall back to + // the best guess parting + return record.has("specversion") && record.get("specversion").isTextual() && + SpecVersion.V1.toString().equals(record.get("specversion").asText()) && + record.has("type"); + } + + private String getEventSource(JsonNode record) { + if (record.has("eventSource") && record.get("eventSource").isTextual()) { + return record.get("eventSource").asText(); + } + + // Unsure. In the AWS documentation SNS uses capital keys. I assume this is a mistake. + if (record.has("EventSource") && record.get("EventSource").isTextual()) { + return record.get("EventSource").asText(); + } + return null; + } + + private Object deserializeFunqReturnType(TreeNode node) throws IOException { + if (funqyMethodObjReader != null) { + return funqyMethodObjReader.readValue(node.traverse()); + } + return null; + } + + public void setFunqyMethodObjReader(ObjectReader funqyMethodObjReader) { + this.funqyMethodObjReader = funqyMethodObjReader; + } + + public void setObjectMapper(final ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventErrorHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventErrorHandler.java new file mode 100644 index 00000000000000..fbef780f2e2210 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventErrorHandler.java @@ -0,0 +1,29 @@ +package io.quarkus.funqy.lambda.event; + +import java.util.ArrayList; +import java.util.List; + +import io.smallrye.mutiny.Uni; + +public class EventErrorHandler { + + private final List failures = new ArrayList<>(); + + public Uni collectFailures(Uni uni, String id) { + return uni.onTermination().invoke((item, failure, cancellation) -> { + Throwable actualFailure = null; + if (failure != null) { + actualFailure = failure; + } else if (cancellation) { + actualFailure = new RuntimeException("Stream cancelled"); + } + if (actualFailure != null) { + failures.add(id); + } + }).onFailure().recoverWithNull(); + } + + public List getFailures() { + return failures; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventHandler.java new file mode 100644 index 00000000000000..1775904bd43520 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventHandler.java @@ -0,0 +1,20 @@ +package io.quarkus.funqy.lambda.event; + +import java.io.InputStream; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; + +public interface EventHandler { + Stream streamEvent(E event, FunqyAmazonConfig amazonConfig); + + String getIdentifier(M message, FunqyAmazonConfig amazonConfig); + + Supplier getBody(M message, FunqyAmazonConfig amazonConfig); + + R createResponse(List failures, FunqyAmazonConfig amazonConfig); + + Class getMessageClass(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventProcessor.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventProcessor.java new file mode 100644 index 00000000000000..30d27b9160661a --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventProcessor.java @@ -0,0 +1,168 @@ +package io.quarkus.funqy.lambda.event; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.jboss.logging.Logger; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; + +import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig; +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.cloudevents.CloudEventsHandler; +import io.quarkus.funqy.lambda.event.dynamodb.DynamoDbEventHandler; +import io.quarkus.funqy.lambda.event.dynamodb.PipesDynamoDbEventHandler; +import io.quarkus.funqy.lambda.event.kinesis.KinesisEventHandler; +import io.quarkus.funqy.lambda.event.kinesis.PipesKinesisEventHandler; +import io.quarkus.funqy.lambda.event.sns.SnsEventHandler; +import io.quarkus.funqy.lambda.event.sqs.PipesSqsEventHandler; +import io.quarkus.funqy.lambda.event.sqs.SqsEventHandler; +import io.quarkus.funqy.lambda.model.FunqyMethod; +import io.quarkus.funqy.lambda.model.cloudevents.CloudEventV1; +import io.quarkus.funqy.lambda.model.kinesis.PipesKinesisEvent; +import io.quarkus.funqy.runtime.FunqyServerResponse; +import io.smallrye.mutiny.Uni; + +public class EventProcessor { + + private static final Logger log = Logger.getLogger(EventProcessor.class); + + private final EventDeserializer eventDeserializer; + private final ObjectMapper objectMapper; + private final ObjectReader reader; + private final FunqyMethod funqyMethod; + private final Map, EventHandler> eventHandlers; + + private FunqyAmazonConfig amazonConfig; + + public EventProcessor(ObjectMapper objectMapper, EventDeserializer eventDeserializer, + FunqyMethod funqyMethod, + FunqyAmazonBuildTimeConfig buildTimeConfig) { + this.objectMapper = objectMapper; + this.eventDeserializer = eventDeserializer; + this.reader = objectMapper.readerFor(Object.class); + this.funqyMethod = funqyMethod; + + this.eventHandlers = new HashMap<>(); + if (buildTimeConfig.advancedEventHandling().enabled()) { + this.eventHandlers.put(SQSEvent.class, new SqsEventHandler()); + this.eventHandlers.put(SQSEvent.SQSMessage.class, new PipesSqsEventHandler()); + this.eventHandlers.put(SNSEvent.class, new SnsEventHandler()); + this.eventHandlers.put(KinesisEvent.class, new KinesisEventHandler()); + this.eventHandlers.put(PipesKinesisEvent.class, new PipesKinesisEventHandler()); + this.eventHandlers.put(DynamodbEvent.class, new DynamoDbEventHandler()); + this.eventHandlers.put(DynamodbEvent.DynamodbStreamRecord.class, new PipesDynamoDbEventHandler()); + this.eventHandlers.put(CloudEventV1.class, new CloudEventsHandler()); + } + } + + public void init(FunqyAmazonConfig amazonConfig) { + // This is a bit strange. We have some static values, which are initialized at some point in time. + // See FunqyLambdaBindingRecorder. Furthermore, we need to pass through the runtime config. + eventDeserializer.setFunqyMethodObjReader(funqyMethod.getReader().orElse(null)); + eventDeserializer.setObjectMapper(objectMapper); + this.amazonConfig = amazonConfig; + } + + public void handle(InputStream inputStream, OutputStream outputStream, + Function dispatcher, Context context) throws IOException { + // event might be null + Object event = reader.readValue(inputStream); + + EventHandler handler = getHandler(event); + if (handler != null) { + EventErrorHandler eventErrorHandler = new EventErrorHandler(); + + Object value = handleEvent(handler, event, eventErrorHandler, dispatcher) + .await().indefinitely(); + + if (value != null) { + objectMapper.writeValue(outputStream, value); + } + } else { + // Unknown event type. We do what Funqy normally did in the past. + FunqyServerResponse response = dispatcher.apply(event); + + Object value = response.getOutput().await().indefinitely(); + writeOutput(outputStream, value); + } + } + + private EventHandler getHandler(Object event) { + if (event == null) { + return null; + } + if (event instanceof List list && !list.isEmpty()) { + // we need some special handling for lists + return eventHandlers.get(list.get(0).getClass()); + } + + return eventHandlers.get(event.getClass()); + } + + @SuppressWarnings("unchecked") + private Uni handleEvent(EventHandler handler, Object event, + EventErrorHandler eventErrorHandler, Function dispatcher) { + + // We collect all messages in a list first, so that we can execute them in parallel. + List> unis = handler.streamEvent((E) event, amazonConfig) + .map(msg -> handleMessage(handler, eventErrorHandler, dispatcher, msg)).toList(); + + return Uni.combine().all().unis(unis) + .collectFailures().discardItems() + .onFailure().recoverWithNull() + .replaceWith(() -> handler.createResponse(eventErrorHandler.getFailures(), amazonConfig)); + } + + private Uni handleMessage(final EventHandler handler, final EventErrorHandler eventErrorHandler, + final Function dispatcher, final M msg) { + try { + final boolean isSuitableType = funqyMethod.getInputType().map(type -> type.hasRawClass(handler.getMessageClass())) + .orElse(false); + + Object input; + if (isSuitableType) { + input = msg; + } else { + input = readMessageBody(handler.getBody(msg, amazonConfig)); + } + + FunqyServerResponse response = dispatcher.apply(input); + + return eventErrorHandler.collectFailures(response.getOutput(), handler.getIdentifier(msg, amazonConfig)); + } catch (Throwable e) { + log.errorv(e, """ + Event could not be handled. This might happen, when the lambda is used with a not supported \ + trigger. If this happens you should disable the advanced event handling and handle the event \ + manually. + """); + return eventErrorHandler.collectFailures(Uni.createFrom().failure(e), + handler.getIdentifier(msg, amazonConfig)); + } + } + + private Object readMessageBody(Supplier is) throws IOException { + if (funqyMethod.getReader().isEmpty()) { + return null; + } + return funqyMethod.getReader().get().readValue(is.get()); + } + + private void writeOutput(final OutputStream outputStream, final Object value) throws IOException { + if (funqyMethod.getWriter().isPresent() && value != null) { + funqyMethod.getWriter().get().writeValue(outputStream, value); + } + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/cloudevents/CloudEventsHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/cloudevents/CloudEventsHandler.java new file mode 100644 index 00000000000000..380e6d670df52d --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/cloudevents/CloudEventsHandler.java @@ -0,0 +1,46 @@ +package io.quarkus.funqy.lambda.event.cloudevents; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import io.cloudevents.CloudEvent; +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; +import io.quarkus.funqy.lambda.model.pipes.BatchItemFailures; +import io.quarkus.funqy.lambda.model.pipes.Response; + +public class CloudEventsHandler implements EventHandler, CloudEvent, Response> { + @Override + public Stream streamEvent(final List event, final FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.stream(); + } + + @Override + public String getIdentifier(final CloudEvent message, final FunqyAmazonConfig amazonConfig) { + return message.getId(); + } + + @Override + public Supplier getBody(final CloudEvent message, final FunqyAmazonConfig amazonConfig) { + return () -> new ByteArrayInputStream(message.getData().toBytes()); + } + + @Override + public Response createResponse(final List failures, final FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().sqs().reportBatchItemFailures()) { + return null; + } + return new Response(failures.stream().map(BatchItemFailures::new).toList()); + } + + @Override + public Class getMessageClass() { + return CloudEvent.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/DynamoDbEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/DynamoDbEventHandler.java new file mode 100644 index 00000000000000..e47ad7d92e33c9 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/DynamoDbEventHandler.java @@ -0,0 +1,53 @@ +package io.quarkus.funqy.lambda.event.dynamodb; + +import java.io.InputStream; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; + +public class DynamoDbEventHandler implements EventHandler { + + @Override + public Stream streamEvent(DynamodbEvent event, FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.getRecords().stream(); + } + + @Override + public String getIdentifier(DynamodbStreamRecord message, FunqyAmazonConfig amazonConfig) { + return message.getDynamodb().getSequenceNumber(); + } + + @Override + public Supplier getBody(DynamodbStreamRecord message, FunqyAmazonConfig amazonConfig) { + throw new IllegalStateException(""" + DynamoDB records are too specific. It is not supported to extract a message from them. \ + Use the DynamodbStreamRecord in your funq method, or use EventBridge Pipes with CloudEvents. + """); + } + + @Override + public StreamsEventResponse createResponse(List failures, FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().dynamoDb().reportBatchItemFailures()) { + return null; + } + return StreamsEventResponse.builder().withBatchItemFailures( + failures.stream().map(id -> StreamsEventResponse.BatchItemFailure.builder() + .withItemIdentifier(id).build()).toList()) + .build(); + } + + @Override + public Class getMessageClass() { + return DynamodbStreamRecord.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/PipesDynamoDbEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/PipesDynamoDbEventHandler.java new file mode 100644 index 00000000000000..6363bf1e82a484 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/PipesDynamoDbEventHandler.java @@ -0,0 +1,53 @@ +package io.quarkus.funqy.lambda.event.dynamodb; + +import java.io.InputStream; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; + +public class PipesDynamoDbEventHandler + implements EventHandler, DynamodbStreamRecord, StreamsEventResponse> { + + @Override + public Stream streamEvent(List event, FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.stream(); + } + + @Override + public String getIdentifier(DynamodbStreamRecord message, FunqyAmazonConfig amazonConfig) { + return message.getDynamodb().getSequenceNumber(); + } + + @Override + public Supplier getBody(DynamodbStreamRecord message, FunqyAmazonConfig amazonConfig) { + throw new IllegalStateException(""" + DynamoDB records are too specific. It is not supported to extract a message from them. \ + Use the DynamodbStreamRecord in your funq method, or use EventBridge Pipes with CloudEvents. + """); + } + + @Override + public StreamsEventResponse createResponse(List failures, FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().dynamoDb().reportBatchItemFailures()) { + return null; + } + return StreamsEventResponse.builder().withBatchItemFailures( + failures.stream().map(id -> StreamsEventResponse.BatchItemFailure.builder() + .withItemIdentifier(id).build()).toList()) + .build(); + } + + @Override + public Class getMessageClass() { + return DynamodbStreamRecord.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/DateDeserializer.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/DateDeserializer.java new file mode 100644 index 00000000000000..07f19bbfda6a8f --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/DateDeserializer.java @@ -0,0 +1,23 @@ +package io.quarkus.funqy.lambda.event.kinesis; + +import java.io.IOException; +import java.util.Date; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; + +public class DateDeserializer extends JsonDeserializer { + + @Override + public Date deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException { + String fieldName = jsonParser.getCurrentName(); + if ("approximateArrivalTimestamp".equals(fieldName)) { + double timestamp = jsonParser.getDoubleValue(); + long milliseconds = (long) (timestamp * 1000); + return new Date(milliseconds); + } + // For other properties, delegate to default deserialization + return (Date) deserializationContext.handleUnexpectedToken(Date.class, jsonParser); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/KinesisEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/KinesisEventHandler.java new file mode 100644 index 00000000000000..bf70838bbbb1bd --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/KinesisEventHandler.java @@ -0,0 +1,54 @@ +package io.quarkus.funqy.lambda.event.kinesis; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse.BatchItemFailure; +import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; + +public class KinesisEventHandler implements EventHandler { + + @Override + public Stream streamEvent(KinesisEvent event, FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.getRecords().stream(); + } + + @Override + public String getIdentifier(KinesisEventRecord message, FunqyAmazonConfig amazonConfig) { + return message.getKinesis().getSequenceNumber(); + } + + @Override + public Supplier getBody(KinesisEventRecord message, FunqyAmazonConfig amazonConfig) { + if (message.getKinesis().getData() == null) { + return ByteArrayInputStream::nullInputStream; + } + return () -> new ByteBufferBackedInputStream(message.getKinesis().getData()); + } + + @Override + public StreamsEventResponse createResponse(List failures, FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().kinesis().reportBatchItemFailures()) { + return null; + } + return StreamsEventResponse.builder().withBatchItemFailures( + failures.stream().map(id -> BatchItemFailure.builder().withItemIdentifier(id).build()).toList()).build(); + } + + @Override + public Class getMessageClass() { + return KinesisEventRecord.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/PipesKinesisEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/PipesKinesisEventHandler.java new file mode 100644 index 00000000000000..20205d2903d721 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/PipesKinesisEventHandler.java @@ -0,0 +1,54 @@ +package io.quarkus.funqy.lambda.event.kinesis; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse.BatchItemFailure; +import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; +import io.quarkus.funqy.lambda.model.kinesis.PipesKinesisEvent; + +public class PipesKinesisEventHandler + implements EventHandler, PipesKinesisEvent, StreamsEventResponse> { + + @Override + public Stream streamEvent(List event, FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.stream(); + } + + @Override + public String getIdentifier(PipesKinesisEvent message, FunqyAmazonConfig amazonConfig) { + return message.getSequenceNumber(); + } + + @Override + public Supplier getBody(PipesKinesisEvent message, FunqyAmazonConfig amazonConfig) { + if (message.getData() == null) { + return ByteArrayInputStream::nullInputStream; + } + return () -> new ByteBufferBackedInputStream(message.getData()); + } + + @Override + public StreamsEventResponse createResponse(List failures, FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().kinesis().reportBatchItemFailures()) { + return null; + } + return StreamsEventResponse.builder().withBatchItemFailures( + failures.stream().map(id -> BatchItemFailure.builder().withItemIdentifier(id).build()).toList()).build(); + } + + @Override + public Class getMessageClass() { + return PipesKinesisEvent.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sns/SnsEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sns/SnsEventHandler.java new file mode 100644 index 00000000000000..a263ce6e908128 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sns/SnsEventHandler.java @@ -0,0 +1,49 @@ +package io.quarkus.funqy.lambda.event.sns; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; + +public class SnsEventHandler implements EventHandler { + + @Override + public Stream streamEvent(SNSEvent event, FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.getRecords().stream(); + } + + @Override + public String getIdentifier(SNSRecord message, FunqyAmazonConfig amazonConfig) { + return message.getSNS().getMessageId(); + } + + @Override + public Supplier getBody(SNSRecord message, FunqyAmazonConfig amazonConfig) { + if (message.getSNS() == null) { + return ByteArrayInputStream::nullInputStream; + } + return () -> new ByteArrayInputStream(message.getSNS().getMessage().getBytes(StandardCharsets.UTF_8)); + } + + @Override + public Void createResponse(List failures, FunqyAmazonConfig amazonConfig) { + // SNS does not support batch item failures. We return nothing, which results in no response + return null; + } + + @Override + public Class getMessageClass() { + return SNSRecord.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/PipesSqsEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/PipesSqsEventHandler.java new file mode 100644 index 00000000000000..f9158e6b5a4469 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/PipesSqsEventHandler.java @@ -0,0 +1,49 @@ +package io.quarkus.funqy.lambda.event.sqs; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; +import io.quarkus.funqy.lambda.model.pipes.BatchItemFailures; +import io.quarkus.funqy.lambda.model.pipes.Response; + +public class PipesSqsEventHandler implements EventHandler, SQSMessage, Response> { + + @Override + public Stream streamEvent(final List event, final FunqyAmazonConfig amazonConfig) { + return event.stream(); + } + + @Override + public String getIdentifier(SQSMessage message, FunqyAmazonConfig amazonConfig) { + return message.getMessageId(); + } + + @Override + public Supplier getBody(SQSMessage message, FunqyAmazonConfig amazonConfig) { + if (message.getBody() == null) { + return ByteArrayInputStream::nullInputStream; + } + return () -> new ByteArrayInputStream(message.getBody().getBytes(StandardCharsets.UTF_8)); + } + + @Override + public Response createResponse(List failures, FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().sqs().reportBatchItemFailures()) { + return null; + } + return new Response(failures.stream().map(BatchItemFailures::new).toList()); + } + + @Override + public Class getMessageClass() { + return SQSMessage.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/SqsEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/SqsEventHandler.java new file mode 100644 index 00000000000000..ea39ff19dd0653 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/SqsEventHandler.java @@ -0,0 +1,54 @@ +package io.quarkus.funqy.lambda.event.sqs; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse.BatchItemFailure; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; + +public class SqsEventHandler implements EventHandler { + + @Override + public Stream streamEvent(SQSEvent event, FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.getRecords().stream(); + } + + @Override + public String getIdentifier(SQSMessage message, FunqyAmazonConfig amazonConfig) { + return message.getMessageId(); + } + + @Override + public Supplier getBody(SQSMessage message, FunqyAmazonConfig amazonConfig) { + if (message.getBody() == null) { + return ByteArrayInputStream::nullInputStream; + } + return () -> new ByteArrayInputStream(message.getBody().getBytes(StandardCharsets.UTF_8)); + } + + @Override + public SQSBatchResponse createResponse(List failures, FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().sqs().reportBatchItemFailures()) { + return null; + } + return SQSBatchResponse.builder().withBatchItemFailures( + failures.stream().map(id -> BatchItemFailure.builder().withItemIdentifier(id).build()).toList()).build(); + } + + @Override + public Class getMessageClass() { + return SQSMessage.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/FunqyMethod.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/FunqyMethod.java new file mode 100644 index 00000000000000..d0f979ebac03c2 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/FunqyMethod.java @@ -0,0 +1,39 @@ +package io.quarkus.funqy.lambda.model; + +import java.util.Optional; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.ObjectWriter; + +public class FunqyMethod { + + private final ObjectReader reader; + private final ObjectWriter writer; + private final JavaType inputType; + private final JavaType outputType; + + public FunqyMethod(final ObjectReader reader, final ObjectWriter writer, final JavaType inputType, + final JavaType outputType) { + this.reader = reader; + this.writer = writer; + this.inputType = inputType; + this.outputType = outputType; + } + + public Optional getReader() { + return Optional.ofNullable(reader); + } + + public Optional getWriter() { + return Optional.ofNullable(writer); + } + + public Optional getInputType() { + return Optional.ofNullable(inputType); + } + + public Optional getOutputType() { + return Optional.ofNullable(outputType); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventDataV1.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventDataV1.java new file mode 100644 index 00000000000000..9ba920623c2bfd --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventDataV1.java @@ -0,0 +1,27 @@ +package io.quarkus.funqy.lambda.model.cloudevents; + +import java.nio.charset.StandardCharsets; + +import io.cloudevents.CloudEventData; + +public class CloudEventDataV1 implements CloudEventData { + + private final byte[] data; + + public CloudEventDataV1(final String data) { + if (data == null) { + this.data = null; + } else { + this.data = data.getBytes(StandardCharsets.UTF_8); + } + } + + public CloudEventDataV1(final byte[] data) { + this.data = data; + } + + @Override + public byte[] toBytes() { + return this.data; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventV1.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventV1.java new file mode 100644 index 00000000000000..7f1b52bb5c01f0 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventV1.java @@ -0,0 +1,172 @@ +package io.quarkus.funqy.lambda.model.cloudevents; + +import java.io.IOException; +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import com.fasterxml.jackson.annotation.JsonAnySetter; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.SpecVersion; + +/** + * This class represents a {@link CloudEvent} in version 1.0 and is Jackson friendly + */ +public class CloudEventV1 implements CloudEvent { + + //private static final Pattern JSON_TYPE_PATTERN = Pattern.compile("^(application|text)/([a-zA-Z]+\\+)?json;?.*$"); + + private final CloudEventDataV1 data; + private final SpecVersion specVersion; + private final String id; + private final String type; + private final URI source; + private final String dataContentType; + private final URI dataSchema; + private final String subject; + private final OffsetDateTime time; + private final Map extensions; + + public CloudEventV1( + @JsonProperty("specversion") String specVersion, + @JsonProperty("id") String id, + @JsonProperty("type") String type, + @JsonProperty("source") URI source, + @JsonProperty("datacontenttype") String dataContentType, + @JsonProperty("dataschema") URI dataSchema, + @JsonProperty("subject") String subject, + @JsonProperty("time") OffsetDateTime time, + @JsonProperty("data") JsonNode data, + @JsonProperty("data_base64") JsonNode dataBase64) { + this.specVersion = SpecVersion.parse(specVersion); + this.id = id; + this.type = type; + this.source = source; + this.dataContentType = dataContentType; + this.dataSchema = dataSchema; + this.subject = subject; + this.time = time; + this.extensions = new HashMap<>(); + this.data = deserializeData(data, dataBase64, dataContentType); + } + + @JsonAnySetter + public void add(String property, String value) { + switch (property) { + case "specversion": + case "id": + case "source": + case "type": + case "datacontenttype": + case "dataschema": + case "data": + case "data_base64": + case "subject": + case "time": + // Those names are reserved + return; + } + extensions.put(property, value); + } + + private CloudEventDataV1 deserializeData(final JsonNode data, final JsonNode dataBase64, + final String dataContentType) { + if (dataBase64 != null) { + try { + return new CloudEventDataV1(dataBase64.binaryValue()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else if (data == null) { + return null; + } + + if (data.isTextual()) { + return new CloudEventDataV1(data.asText()); + } else { + // This should work for every other type. Even for application/json, because we need to serialize + // the data anyway for the interface. + return new CloudEventDataV1(data.toString()); + } + } + + @Override + public CloudEventData getData() { + return this.data; + } + + @Override + public SpecVersion getSpecVersion() { + return this.specVersion; + } + + @Override + public String getId() { + return this.id; + } + + @Override + public String getType() { + return this.type; + } + + @Override + public URI getSource() { + return this.source; + } + + @Override + public String getDataContentType() { + return this.dataContentType; + } + + @Override + public URI getDataSchema() { + return this.dataSchema; + } + + @Override + public String getSubject() { + return this.subject; + } + + @Override + public OffsetDateTime getTime() { + return this.time; + } + + @Override + public Object getAttribute(final String attributeName) throws IllegalArgumentException { + return switch (attributeName) { + case "specversion" -> getSpecVersion(); + case "id" -> getId(); + case "source" -> getSource(); + case "type" -> getType(); + case "datacontenttype" -> getDataContentType(); + case "dataschema" -> getDataSchema(); + case "subject" -> getSubject(); + case "time" -> getTime(); + default -> throw new IllegalArgumentException( + "The specified attribute name \"" + attributeName + "\" is not specified in version v1."); + }; + } + + @Override + public Object getExtension(final String s) { + if (s == null) { + throw new IllegalArgumentException("Extension name cannot be null"); + } + return this.extensions.get(s); + } + + @Override + public Set getExtensionNames() { + return this.extensions.keySet(); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/kinesis/PipesKinesisEvent.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/kinesis/PipesKinesisEvent.java new file mode 100644 index 00000000000000..78ed01667a2096 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/kinesis/PipesKinesisEvent.java @@ -0,0 +1,61 @@ +package io.quarkus.funqy.lambda.model.kinesis; + +import com.amazonaws.services.lambda.runtime.events.models.kinesis.Record; + +/** + * For some reason AWS decided to flatten the model in EventBridge pipes for Kinesis. So there is no additional + * property called kinesis. We use the Record model and add the missing properties. Sadly I could not find a + * dedicated model for Kinesis in Pipes. So it is a combination of + * {@link com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord} and {@link Record} + */ +public class PipesKinesisEvent extends Record { + + private String eventSource; + + private String eventID; + + private String invokeIdentityArn; + + private String eventName; + + private String eventVersion; + + private String eventSourceARN; + + private String awsRegion; + + public PipesKinesisEvent setEventSource(final String eventSource) { + this.eventSource = eventSource; + return this; + } + + public PipesKinesisEvent setEventID(final String eventID) { + this.eventID = eventID; + return this; + } + + public PipesKinesisEvent setInvokeIdentityArn(final String invokeIdentityArn) { + this.invokeIdentityArn = invokeIdentityArn; + return this; + } + + public PipesKinesisEvent setEventName(final String eventName) { + this.eventName = eventName; + return this; + } + + public PipesKinesisEvent setEventVersion(final String eventVersion) { + this.eventVersion = eventVersion; + return this; + } + + public PipesKinesisEvent setEventSourceARN(final String eventSourceARN) { + this.eventSourceARN = eventSourceARN; + return this; + } + + public PipesKinesisEvent setAwsRegion(final String awsRegion) { + this.awsRegion = awsRegion; + return this; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/BatchItemFailures.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/BatchItemFailures.java new file mode 100644 index 00000000000000..99110edfa4ecf3 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/BatchItemFailures.java @@ -0,0 +1,14 @@ +package io.quarkus.funqy.lambda.model.pipes; + +public class BatchItemFailures { + + private final String itemIdentifier; + + public BatchItemFailures(final String itemIdentifier) { + this.itemIdentifier = itemIdentifier; + } + + public String getItemIdentifier() { + return itemIdentifier; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/Response.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/Response.java new file mode 100644 index 00000000000000..20b817d1b91f28 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/Response.java @@ -0,0 +1,16 @@ +package io.quarkus.funqy.lambda.model.pipes; + +import java.util.List; + +public class Response { + + private final List batchItemFailures; + + public Response(final List batchItemFailures) { + this.batchItemFailures = batchItemFailures; + } + + public List getBatchItemFailures() { + return batchItemFailures; + } +}