diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 34b0452e2fe1f..b29490c1ca691 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -156,6 +156,7 @@ 1.2.3 3.11.4 2.15.2 + 3.0.0 3.1.0 1.0.0 1.9.23 @@ -5678,6 +5679,12 @@ + + io.cloudevents + cloudevents-api + ${cloudevents-api.version} + + com.microsoft.azure.functions azure-functions-java-library diff --git a/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AbstractLambdaPollLoop.java b/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AbstractLambdaPollLoop.java index c967b6064c5e3..a493e1489f64b 100644 --- a/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AbstractLambdaPollLoop.java +++ b/extensions/amazon-lambda/common-runtime/src/main/java/io/quarkus/amazon/lambda/runtime/AbstractLambdaPollLoop.java @@ -125,6 +125,7 @@ public void run() { URL url = AmazonLambdaApi.invocationResponse(baseUrl, requestId); if (isStream()) { HttpURLConnection responseConnection = responseStream(url); + responseConnection.setRequestProperty("Content-Type", "application/json"); if (running.get()) { processRequest(requestConnection.getInputStream(), responseConnection.getOutputStream(), createContext(requestConnection)); diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/pom.xml b/extensions/funqy/funqy-amazon-lambda/deployment/pom.xml index 2a03684b9a675..159c1b235e5ad 100644 --- a/extensions/funqy/funqy-amazon-lambda/deployment/pom.xml +++ b/extensions/funqy/funqy-amazon-lambda/deployment/pom.xml @@ -29,6 +29,16 @@ io.quarkus quarkus-arc-deployment + + io.quarkus + quarkus-junit5-internal + test + + + io.rest-assured + rest-assured + test + diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyAmazonLambdaProcessor.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyAmazonLambdaProcessor.java new file mode 100644 index 0000000000000..ee4d950cf9b1d --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyAmazonLambdaProcessor.java @@ -0,0 +1,43 @@ +package io.quarkus.funqy.deployment.bindings; + +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; +import com.amazonaws.services.lambda.runtime.events.models.kinesis.Record; + +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; +import io.quarkus.deployment.pkg.steps.NativeBuild; + +public class FunqyAmazonLambdaProcessor { + + @BuildStep(onlyIf = NativeBuild.class) + public void process(BuildProducer reflectiveClass) { + reflectiveClass.produce(ReflectiveClassBuildItem.builder( + // SQS + SQSEvent.class.getName(), + SQSEvent.SQSMessage.class.getName(), + SQSEvent.MessageAttribute.class.getName(), + SQSBatchResponse.class.getName(), + SQSBatchResponse.BatchItemFailure.class.getName(), + // SNS + SNSEvent.class.getName(), + SNSEvent.SNSRecord.class.getName(), + SNSEvent.SNS.class.getName(), + // Kinesis + KinesisEvent.class.getName(), + KinesisEvent.KinesisEventRecord.class.getName(), + Record.class.getName(), + StreamsEventResponse.class.getName(), + StreamsEventResponse.BatchItemFailure.class.getName(), + // DynamoDB + DynamodbEvent.class.getName(), + DynamodbEvent.DynamodbStreamRecord.class.getName() + ).constructors().methods().fields().build() + ); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java index 6a8bcbdc213d9..9158ec43dcbbb 100644 --- a/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java @@ -20,6 +20,8 @@ import io.quarkus.funqy.deployment.FunctionBuildItem; import io.quarkus.funqy.deployment.FunctionInitializedBuildItem; import io.quarkus.funqy.lambda.FunqyLambdaBindingRecorder; +import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig; +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; import io.quarkus.funqy.runtime.FunqyConfig; import io.quarkus.runtime.LaunchMode; @@ -37,17 +39,19 @@ public void init(List functions, BuildProducer feature, Optional hasFunctions, LambdaObjectMapperInitializedBuildItem mapperDependency, - BeanContainerBuildItem beanContainer) throws Exception { + BeanContainerBuildItem beanContainer, + FunqyAmazonBuildTimeConfig buildTimeConfig) throws Exception { if (!hasFunctions.isPresent() || hasFunctions.get() == null) return; feature.produce(new FeatureBuildItem(FUNQY_AMAZON_LAMBDA)); - recorder.init(beanContainer.getValue()); + recorder.init(beanContainer.getValue(), buildTimeConfig); } @BuildStep @Record(RUNTIME_INIT) - public RuntimeComplete choose(FunqyConfig config, FunqyLambdaBindingRecorder recorder) { - recorder.chooseInvoker(config); + public RuntimeComplete choose(FunqyConfig config, FunqyAmazonConfig amazonConfig, + FunqyLambdaBindingRecorder recorder) { + recorder.chooseInvoker(config, amazonConfig); return new RuntimeComplete(); } diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/AnyFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/AnyFunctionTest.java new file mode 100644 index 0000000000000..ac505ad6c16a2 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/AnyFunctionTest.java @@ -0,0 +1,48 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class AnyFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("any-function.properties", "application.properties") + .addAsResource("events/any", "events") + .addClasses(TestFunctions.class, Item.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + response.then().statusCode(204); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + response.then().statusCode(500); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsEventFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsEventFunctionTest.java new file mode 100644 index 0000000000000..a7de253a34c42 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsEventFunctionTest.java @@ -0,0 +1,62 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class CloudEventsEventFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .overrideRuntimeConfigKey("quarkus.funqy.export", "cloudevents-function") + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/cloudevents", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsFunctionTest.java new file mode 100644 index 0000000000000..bbac786e904db --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsFunctionTest.java @@ -0,0 +1,61 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class CloudEventsFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/cloudevents", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbEventFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbEventFunctionTest.java new file mode 100644 index 0000000000000..44739d1e7dd38 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbEventFunctionTest.java @@ -0,0 +1,95 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class DynamoDbEventFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .overrideRuntimeConfigKey("quarkus.funqy.export", "dynamodb-function") + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/dynamodb", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } + + @Test + public void should_return_no_failures_if_processing_pipes_is_ok() { + // given + var body = getData("pipes-ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_pipes_fails() { + // given + var body = getData("pipes-fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbFunctionTest.java new file mode 100644 index 0000000000000..c7451eff51403 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbFunctionTest.java @@ -0,0 +1,64 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class DynamoDbFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/dynamodb", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class)); + + @Test + public void should_fail_on_dynamodb_event_without_dynamodb_event_type() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + // It is not supported to transform the DynamoDB event record to an internal model. Therefore, if somebody + // would try this, the lambda would return every message as failure in batch item failures and log an error. + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(2)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItems("1", "2")); + } + + @Test + public void should_fail_on_dynamodb_event_via_pipes_without_dynamodb_event_type() { + // given + var body = getData("pipes-ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + // It is not supported to transform the DynamoDB event record to an internal model. Therefore, if somebody + // would try this, the lambda would return every message as failure in batch item failures and log an error. + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(2)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItems("1", "2")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/Item.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/Item.java new file mode 100644 index 0000000000000..26975491bde6d --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/Item.java @@ -0,0 +1,24 @@ +package io.quarkus.funqy.test; + +public class Item { + + String message; + + boolean throwError; + + public String getMessage() { + return message; + } + + public void setMessage(final String message) { + this.message = message; + } + + public boolean isThrowError() { + return throwError; + } + + public void setThrowError(final boolean throwError) { + this.throwError = throwError; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisFunctionTest.java new file mode 100644 index 0000000000000..4371d5e47d36d --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisFunctionTest.java @@ -0,0 +1,94 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class KinesisFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/kinesis", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } + + @Test + public void should_return_no_failures_if_processing_pipes_is_ok() { + // given + var body = getData("pipes-ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_pipes_fails() { + // given + var body = getData("pipes-fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsEventFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsEventFunctionTest.java new file mode 100644 index 0000000000000..c01be98ef1ad4 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsEventFunctionTest.java @@ -0,0 +1,50 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class SnsEventFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .overrideRuntimeConfigKey("quarkus.funqy.export", "sns-function") + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/sns", "events") + .addClasses(TestFunctions.class, Item.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + response.then().statusCode(204); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + // SNS triggers have no error handling. + response.then().statusCode(204); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsFunctionTest.java new file mode 100644 index 0000000000000..5c0d7956077e3 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsFunctionTest.java @@ -0,0 +1,49 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class SnsFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/sns", "events") + .addClasses(TestFunctions.class, Item.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + response.then().statusCode(204); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + // SNS triggers have no error handling. + response.then().statusCode(204); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsEventFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsEventFunctionTest.java new file mode 100644 index 0000000000000..40f77665e6c4c --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsEventFunctionTest.java @@ -0,0 +1,95 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class SqsEventFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .overrideRuntimeConfigKey("quarkus.funqy.export", "sqs-function") + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/sqs", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } + + @Test + public void should_return_no_failures_if_processing_pipes_is_ok() { + // given + var body = getData("pipes-ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_pipes_fails() { + // given + var body = getData("pipes-fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionNoBatchItemFailuresTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionNoBatchItemFailuresTest.java new file mode 100644 index 0000000000000..89bf3e585587d --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionNoBatchItemFailuresTest.java @@ -0,0 +1,48 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class SqsFunctionNoBatchItemFailuresTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("no-batch-function.properties", "application.properties") + .addAsResource("events/sqs", "events") + .addClasses(TestFunctions.class, Item.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + response.then().statusCode(204); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + response.then().statusCode(204); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionTest.java new file mode 100644 index 0000000000000..d7a6c367ea5e5 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionTest.java @@ -0,0 +1,94 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class SqsFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/sqs", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } + + @Test + public void should_return_no_failures_if_processing_pipes_is_ok() { + // given + var body = getData("pipes-ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_pipes_fails() { + // given + var body = getData("pipes-fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = response.then().statusCode(200) + .extract().body().as(BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/TestFunctions.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/TestFunctions.java new file mode 100644 index 0000000000000..f8cb6f8582e29 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/TestFunctions.java @@ -0,0 +1,63 @@ +package io.quarkus.funqy.test; + +import java.nio.charset.StandardCharsets; + +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; + +import io.cloudevents.CloudEvent; +import io.quarkus.funqy.Funq; +import io.smallrye.mutiny.Uni; + +public class TestFunctions { + + @Funq("item-function") + public Uni itemFunction(Item item) { + if (item.isThrowError()) { + return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); + } + return Uni.createFrom().voidItem(); + } + + @Funq("sqs-function") + public Uni sqsFunction(SQSEvent.SQSMessage msg) { + if (msg.getBody().contains("true")) { + return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); + } + return Uni.createFrom().voidItem(); + } + + @Funq("sns-function") + public Uni snsFunction(SNSEvent.SNSRecord msg) { + if (msg.getSNS().getMessage().contains("true")) { + return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); + } + return Uni.createFrom().voidItem(); + } + + @Funq("cloudevents-function") + public Uni cloudEventsFunction(CloudEvent msg) { + if (new String(msg.getData().toBytes(), StandardCharsets.UTF_8).contains("true")) { + return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); + } + return Uni.createFrom().voidItem(); + } + + @Funq("kinesis-function") + public Uni kinesisFunction(KinesisEvent.Record msg) { + if (StandardCharsets.UTF_8.decode(msg.getData()).toString().contains("true")) { + return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); + } + return Uni.createFrom().voidItem(); + } + + @Funq("dynamodb-function") + public Uni dynamodbFunction(DynamodbEvent.DynamodbStreamRecord msg) { + if (msg.getDynamodb().getNewImage().get("ThrowError").getBOOL()) { + return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); + } + return Uni.createFrom().voidItem(); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/BatchItemFailures.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/BatchItemFailures.java new file mode 100644 index 0000000000000..4914a34a5f801 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/BatchItemFailures.java @@ -0,0 +1,6 @@ +package io.quarkus.funqy.test.model; + +import java.util.List; + +public record BatchItemFailures(List batchItemFailures) { +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/ItemFailure.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/ItemFailure.java new file mode 100644 index 0000000000000..bd08b653841e2 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/ItemFailure.java @@ -0,0 +1,4 @@ +package io.quarkus.funqy.test.model; + +public record ItemFailure(String itemIdentifier) { +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/EventDataProvider.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/EventDataProvider.java new file mode 100644 index 0000000000000..17e1efadc5fe1 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/EventDataProvider.java @@ -0,0 +1,19 @@ +package io.quarkus.funqy.test.util; + +import java.io.IOException; +import java.nio.charset.Charset; + +import org.apache.commons.io.IOUtils; + +public class EventDataProvider { + + public static String getData(String path) { + try { + return IOUtils.toString( + EventDataProvider.class.getClassLoader().getResourceAsStream("events/" + path), + Charset.defaultCharset()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/any-function.properties b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/any-function.properties new file mode 100644 index 0000000000000..e9be9dfb08bdd --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/any-function.properties @@ -0,0 +1 @@ +quarkus.funqy.export=item-function diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/fail.json new file mode 100644 index 0000000000000..2ec250d2e81a1 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/fail.json @@ -0,0 +1,4 @@ +{ + "message": "hello", + "throwError": true +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/ok.json new file mode 100644 index 0000000000000..47ae6aedbde1d --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/ok.json @@ -0,0 +1,4 @@ +{ + "message": "hello", + "throwError": false +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/fail.json new file mode 100644 index 0000000000000..62f32e16345e3 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/fail.json @@ -0,0 +1,26 @@ +[ + { + "specversion" : "1.0", + "type" : "com.github.pull_request.opened", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "1", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "text/plain", + "data" : "{\"message\":\"hello\",\"throwError\":true}" + }, + { + "specversion" : "1.0", + "type" : "com.github.pull_request.opened", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "2", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "text/plain", + "data" : "{\"message\":\"fail\",\"throwError\":false}" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/ok.json new file mode 100644 index 0000000000000..e49c9ddcece01 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/ok.json @@ -0,0 +1,41 @@ +[ + { + "specversion" : "1.0", + "type" : "com.github.pull_request.opened", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "1", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "text/plain", + "data" : "{\"message\":\"hello\",\"throwError\":false}" + }, + { + "specversion" : "1.0", + "type" : "com.github.pull_request.opened", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "2", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "application/json", + "data" : { + "message": "ok", + "throwError": false + } + }, + { + "specversion" : "1.0", + "type" : "com.github.pull_request.opened", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "3", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "text/xml", + "data_base64" : "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/fail.json new file mode 100644 index 0000000000000..9bb03b7bc1489 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/fail.json @@ -0,0 +1,62 @@ +{ + "Records": [ + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "1" + } + }, + "NewImage": { + "Message": { + "S": "hello" + }, + "ThrowError": { + "BOOL": true + }, + "Id": { + "N": "1" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "1", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + }, + { + "eventID": "2", + "eventVersion": "1.0", + "dynamodb": { + "NewImage": { + "Message": { + "S": "fail" + }, + "ThrowError": { + "BOOL": false + }, + "Id": { + "N": "2" + } + }, + "SequenceNumber": "2", + "Keys": { + "Id": { + "N": "101" + } + }, + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/ok.json new file mode 100644 index 0000000000000..4cc6db92f700e --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/ok.json @@ -0,0 +1,62 @@ +{ + "Records": [ + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "1" + } + }, + "NewImage": { + "Message": { + "S": "hello" + }, + "ThrowError": { + "BOOL": false + }, + "Id": { + "N": "1" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "1", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + }, + { + "eventID": "2", + "eventVersion": "1.0", + "dynamodb": { + "NewImage": { + "Message": { + "S": "fail" + }, + "ThrowError": { + "BOOL": false + }, + "Id": { + "N": "2" + } + }, + "SequenceNumber": "2", + "Keys": { + "Id": { + "N": "101" + } + }, + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-fail.json new file mode 100644 index 0000000000000..98d791b29e8b5 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-fail.json @@ -0,0 +1,60 @@ +[ + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "1" + } + }, + "NewImage": { + "Message": { + "S": "hello" + }, + "ThrowError": { + "BOOL": true + }, + "Id": { + "N": "1" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "1", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + }, + { + "eventID": "2", + "eventVersion": "1.0", + "dynamodb": { + "NewImage": { + "Message": { + "S": "fail" + }, + "ThrowError": { + "BOOL": false + }, + "Id": { + "N": "2" + } + }, + "SequenceNumber": "2", + "Keys": { + "Id": { + "N": "101" + } + }, + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-ok.json new file mode 100644 index 0000000000000..d0d3816b1496c --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-ok.json @@ -0,0 +1,60 @@ +[ + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "1" + } + }, + "NewImage": { + "Message": { + "S": "hello" + }, + "ThrowError": { + "BOOL": false + }, + "Id": { + "N": "1" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "1", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + }, + { + "eventID": "2", + "eventVersion": "1.0", + "dynamodb": { + "NewImage": { + "Message": { + "S": "fail" + }, + "ThrowError": { + "BOOL": false + }, + "Id": { + "N": "2" + } + }, + "SequenceNumber": "2", + "Keys": { + "Id": { + "N": "101" + } + }, + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/fail.json new file mode 100644 index 0000000000000..061f8dd6b880b --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/fail.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "1", + "data": "eyJtZXNzYWdlIjoiaGVsbG8iLCJ0aHJvd0Vycm9yIjp0cnVlfQ==", + "approximateArrivalTimestamp": 1545084650.987 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + }, + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "2", + "data": "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==", + "approximateArrivalTimestamp": 1545084711.166 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/ok.json new file mode 100644 index 0000000000000..c8550929724b1 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/ok.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "1", + "data": "eyJtZXNzYWdlIjoiaGVsbG8iLCJ0aHJvd0Vycm9yIjpmYWxzZX0=", + "approximateArrivalTimestamp": 1545084650.987 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + }, + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "2", + "data": "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==", + "approximateArrivalTimestamp": 1545084711.166 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-fail.json new file mode 100644 index 0000000000000..3cc956e1cee0d --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-fail.json @@ -0,0 +1,30 @@ +[ + { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "1", + "data": "eyJtZXNzYWdlIjoiaGVsbG8iLCJ0aHJvd0Vycm9yIjp0cnVlfQ==", + "approximateArrivalTimestamp": 1545084650.987, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + }, + { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "2", + "data": "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==", + "approximateArrivalTimestamp": 1545084711.166, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-ok.json new file mode 100644 index 0000000000000..1a40ebe10175d --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-ok.json @@ -0,0 +1,30 @@ +[ + { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "1", + "data": "eyJtZXNzYWdlIjoiaGVsbG8iLCJ0aHJvd0Vycm9yIjpmYWxzZX0=", + "approximateArrivalTimestamp": 1545084650.987, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + }, + { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "2", + "data": "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==", + "approximateArrivalTimestamp": 1545084711.166, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/fail.json new file mode 100644 index 0000000000000..4643568fdfe33 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/fail.json @@ -0,0 +1,31 @@ +{ + "Records": [ + { + "EventVersion": "1.0", + "EventSubscriptionArn": "arn:aws:sns:us-east-1:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486", + "EventSource": "aws:sns", + "Sns": { + "SignatureVersion": "1", + "Timestamp": "2019-01-02T12:45:07.000Z", + "Signature": "tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==", + "SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem", + "MessageId": "1", + "Message": "{\"message\":\"hello\",\"throwError\":true}", + "MessageAttributes": { + "Test": { + "Type": "String", + "Value": "TestString" + }, + "TestBinary": { + "Type": "Binary", + "Value": "TestBinary" + } + }, + "Type": "Notification", + "UnsubscribeURL": "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:123456789012:test-lambda:21be56ed-a058-49f5-8c98-aedd2564c486", + "TopicArn":"arn:aws:sns:us-east-1:123456789012:sns-lambda", + "Subject": "TestInvoke" + } + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/ok.json new file mode 100644 index 0000000000000..c97c6511cbab2 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/ok.json @@ -0,0 +1,31 @@ +{ + "Records": [ + { + "EventVersion": "1.0", + "EventSubscriptionArn": "arn:aws:sns:us-east-1:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486", + "EventSource": "aws:sns", + "Sns": { + "SignatureVersion": "1", + "Timestamp": "2019-01-02T12:45:07.000Z", + "Signature": "tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==", + "SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem", + "MessageId": "1", + "Message": "{\"message\":\"hello\",\"throwError\":false}", + "MessageAttributes": { + "Test": { + "Type": "String", + "Value": "TestString" + }, + "TestBinary": { + "Type": "Binary", + "Value": "TestBinary" + } + }, + "Type": "Notification", + "UnsubscribeURL": "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:123456789012:test-lambda:21be56ed-a058-49f5-8c98-aedd2564c486", + "TopicArn":"arn:aws:sns:us-east-1:123456789012:sns-lambda", + "Subject": "TestInvoke" + } + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/fail.json new file mode 100644 index 0000000000000..b6b52c29187ec --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/fail.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "messageId": "1", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "{\"message\":\"hello\",\"throwError\":true}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + }, + { + "messageId": "2", + "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", + "body": "{\"message\":\"fail\",\"throwError\":false}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082650636", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082650649" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/ok.json new file mode 100644 index 0000000000000..a5b3b93714505 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/ok.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "messageId": "1", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "{\"message\":\"hello\",\"throwError\":false}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + }, + { + "messageId": "2", + "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", + "body": "{\"message\":\"fail\",\"throwError\":false}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082650636", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082650649" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-fail.json new file mode 100644 index 0000000000000..76e02d9b27aaa --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-fail.json @@ -0,0 +1,34 @@ +[ + { + "messageId": "1", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "{\"message\":\"hello\",\"throwError\":true}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + }, + { + "messageId": "2", + "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", + "body": "{\"message\":\"fail\",\"throwError\":false}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082650636", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082650649" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-ok.json new file mode 100644 index 0000000000000..83f6fb1449950 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-ok.json @@ -0,0 +1,34 @@ +[ + { + "messageId": "1", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "{\"message\":\"hello\",\"throwError\":false}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + }, + { + "messageId": "2", + "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", + "body": "{\"message\":\"fail\",\"throwError\":false}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082650636", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082650649" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/item-function.properties b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/item-function.properties new file mode 100644 index 0000000000000..3e2957b10e52c --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/item-function.properties @@ -0,0 +1,2 @@ +quarkus.funqy.export=item-function +quarkus.funqy.amazon-lambda.advanced-event-handling.enabled=true diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/no-batch-function.properties b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/no-batch-function.properties new file mode 100644 index 0000000000000..ca2d666955c8f --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/no-batch-function.properties @@ -0,0 +1,3 @@ +quarkus.funqy.export=item-function +quarkus.funqy.amazon-lambda.advanced-event-handling.enabled=true +quarkus.funqy.amazon-lambda.advanced-event-handling.sqs.report-batch-item-failures=false diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/pom.xml b/extensions/funqy/funqy-amazon-lambda/runtime/pom.xml index 4366bd5e63b3d..ccf8a40910d24 100644 --- a/extensions/funqy/funqy-amazon-lambda/runtime/pom.xml +++ b/extensions/funqy/funqy-amazon-lambda/runtime/pom.xml @@ -26,6 +26,20 @@ io.quarkus quarkus-jackson + + com.amazonaws + aws-lambda-java-events + + + joda-time + joda-time + + + + + io.cloudevents + cloudevents-api + diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java index d2726940abd85..bb4fba814d073 100644 --- a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java @@ -11,16 +11,21 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.module.SimpleModule; import io.quarkus.amazon.lambda.runtime.AbstractLambdaPollLoop; import io.quarkus.amazon.lambda.runtime.AmazonLambdaContext; import io.quarkus.amazon.lambda.runtime.AmazonLambdaMapperRecorder; -import io.quarkus.amazon.lambda.runtime.JacksonInputReader; -import io.quarkus.amazon.lambda.runtime.JacksonOutputWriter; import io.quarkus.amazon.lambda.runtime.LambdaInputReader; import io.quarkus.amazon.lambda.runtime.LambdaOutputWriter; import io.quarkus.arc.ManagedContext; import io.quarkus.arc.runtime.BeanContainer; +import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig; +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.AwsModule; +import io.quarkus.funqy.lambda.event.EventDeserializer; +import io.quarkus.funqy.lambda.event.EventProcessor; +import io.quarkus.funqy.lambda.model.FunqyMethod; import io.quarkus.funqy.runtime.FunctionConstructor; import io.quarkus.funqy.runtime.FunctionInvoker; import io.quarkus.funqy.runtime.FunctionRecorder; @@ -40,28 +45,39 @@ public class FunqyLambdaBindingRecorder { private static FunctionInvoker invoker; private static BeanContainer beanContainer; - private static LambdaInputReader reader; - private static LambdaOutputWriter writer; + private static EventProcessor eventProcessor; - public void init(BeanContainer bc) { + public void init(BeanContainer bc, FunqyAmazonBuildTimeConfig buildTimeConfig) { beanContainer = bc; FunctionConstructor.CONTAINER = bc; - ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper; + // We create a copy, because we register a custom deserializer for everything. + ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper.copy(); + EventDeserializer eventDeserializer = new EventDeserializer(buildTimeConfig); + final SimpleModule simpleModule = new AwsModule(); + simpleModule.addDeserializer(Object.class, eventDeserializer); + objectMapper.registerModule(simpleModule); + for (FunctionInvoker invoker : FunctionRecorder.registry.invokers()) { + ObjectReader reader = null; + JavaType javaInputType = null; if (invoker.hasInput()) { - JavaType javaInputType = objectMapper.constructType(invoker.getInputType()); - ObjectReader reader = objectMapper.readerFor(javaInputType); - invoker.getBindingContext().put(ObjectReader.class.getName(), reader); + javaInputType = objectMapper.constructType(invoker.getInputType()); + reader = objectMapper.readerFor(javaInputType); } + ObjectWriter writer = null; + JavaType javaOutputType = null; if (invoker.hasOutput()) { - JavaType javaOutputType = objectMapper.constructType(invoker.getOutputType()); - ObjectWriter writer = objectMapper.writerFor(javaOutputType); - invoker.getBindingContext().put(ObjectWriter.class.getName(), writer); + javaOutputType = objectMapper.constructType(invoker.getOutputType()); + writer = objectMapper.writerFor(javaOutputType); } + invoker.getBindingContext().put(EventProcessor.class.getName(), + new EventProcessor(objectMapper, eventDeserializer, + new FunqyMethod(reader, writer, javaInputType, javaOutputType), + buildTimeConfig)); } } - public void chooseInvoker(FunqyConfig config) { + public void chooseInvoker(FunqyConfig config, FunqyAmazonConfig amazonConfig) { // this is done at Runtime so that we can change it with an environment variable. if (config.export.isPresent()) { invoker = FunctionRecorder.registry.matchInvoker(config.export.get()); @@ -76,35 +92,24 @@ public void chooseInvoker(FunqyConfig config) { } else { invoker = FunctionRecorder.registry.invokers().iterator().next(); } - if (invoker.hasInput()) { - reader = new JacksonInputReader((ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName())); - } - if (invoker.hasOutput()) { - writer = new JacksonOutputWriter((ObjectWriter) invoker.getBindingContext().get(ObjectWriter.class.getName())); - } - + eventProcessor = (EventProcessor) invoker.getBindingContext().get(EventProcessor.class.getName()); + eventProcessor.init(amazonConfig); } /** * Called by JVM handler wrapper * * @param inputStream + * {@link InputStream} of the AWS SDK {@link com.amazonaws.services.lambda.runtime.RequestStreamHandler} * @param outputStream + * {@link OutputStream} of the AWS SDK {@link com.amazonaws.services.lambda.runtime.RequestStreamHandler} * @param context + * AWS context information provided to the Lambda * @throws IOException + * Is thrown in case the (de)serialization fails */ public static void handle(InputStream inputStream, OutputStream outputStream, Context context) throws IOException { - Object input = null; - if (invoker.hasInput()) { - input = reader.readValue(inputStream); - } - FunqyServerResponse response = dispatch(input); - - Object value = response.getOutput().await().indefinitely(); - if (value != null) { - writer.writeValue(outputStream, value); - } - + eventProcessor.handle(inputStream, outputStream, FunqyLambdaBindingRecorder::dispatch, context); } @SuppressWarnings("rawtypes") @@ -114,29 +119,28 @@ public void startPollLoop(ShutdownContext context, LaunchMode launchMode) { @Override protected Object processRequest(Object input, AmazonLambdaContext context) throws Exception { - FunqyServerResponse response = dispatch(input); - return response.getOutput().await().indefinitely(); + throw new RuntimeException("Unreachable"); } @Override protected LambdaInputReader getInputReader() { - return reader; + throw new RuntimeException("Unreachable"); } @Override protected LambdaOutputWriter getOutputWriter() { - return writer; + throw new RuntimeException("Unreachable"); } @Override protected boolean isStream() { - return false; + return true; } @Override protected void processRequest(InputStream input, OutputStream output, AmazonLambdaContext context) throws Exception { - throw new RuntimeException("Unreachable!"); + handle(input, output, context); } }; loop.startPollLoop(context); diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingBuildTimeConfig.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingBuildTimeConfig.java new file mode 100644 index 0000000000000..3ac6babc1c95e --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingBuildTimeConfig.java @@ -0,0 +1,17 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.smallrye.config.WithDefault; + +/** + * Advanced event handling build time configuration + */ +@ConfigGroup +public interface AdvancedEventHandlingBuildTimeConfig { + + /** + * If advanced event handling should be enabled + */ + @WithDefault("false") + boolean enabled(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingConfig.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingConfig.java new file mode 100644 index 0000000000000..e5ca3b0afacac --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingConfig.java @@ -0,0 +1,30 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigGroup; + +/** + * Advanced event handling configuration + */ +@ConfigGroup +public interface AdvancedEventHandlingConfig { + + /** + * Sqs related config. + */ + Sqs sqs(); + + /** + * Sns related config. + */ + Sns sns(); + + /** + * Kinesis related config. + */ + Kinesis kinesis(); + + /** + * DynamoDb related config. + */ + DynamoDb dynamoDb(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/DynamoDb.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/DynamoDb.java new file mode 100644 index 0000000000000..6d182d725670b --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/DynamoDb.java @@ -0,0 +1,17 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.smallrye.config.WithDefault; + +/** + * Kinesis event config + */ +@ConfigGroup +public interface DynamoDb { + + /** + * Allows functions to return partially successful responses for a batch of event records. + */ + @WithDefault("true") + boolean reportBatchItemFailures(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonBuildTimeConfig.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonBuildTimeConfig.java new file mode 100644 index 0000000000000..94d8c2da860bf --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonBuildTimeConfig.java @@ -0,0 +1,17 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithName; + +@ConfigMapping(prefix = "quarkus.funqy.amazon-lambda") +@ConfigRoot(phase = ConfigPhase.BUILD_AND_RUN_TIME_FIXED) +public interface FunqyAmazonBuildTimeConfig { + + /** + * The advanced event handling config + */ + @WithName("advanced-event-handling") + AdvancedEventHandlingBuildTimeConfig advancedEventHandling(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonConfig.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonConfig.java new file mode 100644 index 0000000000000..ede409c6e90d1 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonConfig.java @@ -0,0 +1,17 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithName; + +@ConfigMapping(prefix = "quarkus.funqy.amazon-lambda") +@ConfigRoot(phase = ConfigPhase.RUN_TIME) +public interface FunqyAmazonConfig { + + /** + * The advanced event handling config + */ + @WithName("advanced-event-handling") + AdvancedEventHandlingConfig advancedEventHandling(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Kinesis.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Kinesis.java new file mode 100644 index 0000000000000..b05562d93573a --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Kinesis.java @@ -0,0 +1,17 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.smallrye.config.WithDefault; + +/** + * Kinesis event config + */ +@ConfigGroup +public interface Kinesis { + + /** + * Allows functions to return partially successful responses for a batch of event records. + */ + @WithDefault("true") + boolean reportBatchItemFailures(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sns.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sns.java new file mode 100644 index 0000000000000..cfb273a73550f --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sns.java @@ -0,0 +1,11 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigGroup; + +/** + * Sns event config + */ +@ConfigGroup +public interface Sns { + +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sqs.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sqs.java new file mode 100644 index 0000000000000..75346e96e5865 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sqs.java @@ -0,0 +1,17 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.smallrye.config.WithDefault; + +/** + * Sqs event config + */ +@ConfigGroup +public interface Sqs { + + /** + * Allows functions to return partially successful responses for a batch of event records. + */ + @WithDefault("true") + boolean reportBatchItemFailures(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsModule.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsModule.java new file mode 100644 index 0000000000000..c75002af6af74 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsModule.java @@ -0,0 +1,42 @@ +package io.quarkus.funqy.lambda.event; + +import java.util.Iterator; + +import com.amazonaws.services.lambda.runtime.events.models.kinesis.Record; +import com.fasterxml.jackson.databind.BeanDescription; +import com.fasterxml.jackson.databind.DeserializationConfig; +import com.fasterxml.jackson.databind.deser.BeanDeserializerBuilder; +import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier; +import com.fasterxml.jackson.databind.deser.SettableBeanProperty; +import com.fasterxml.jackson.databind.module.SimpleModule; + +import io.quarkus.funqy.lambda.event.kinesis.DateDeserializer; + +public class AwsModule extends SimpleModule { + + final String DATE_PROPERTY_NAME = "approximateArrivalTimestamp"; + + public AwsModule() { + this.setDeserializerModifier(new BeanDeserializerModifier() { + + @Override + public BeanDeserializerBuilder updateBuilder(final DeserializationConfig config, + final BeanDescription beanDesc, final BeanDeserializerBuilder builder) { + + for (final Iterator iterator = builder.getProperties(); iterator.hasNext();) { + SettableBeanProperty property = iterator.next(); + + // Kinesis records need some special treatment. The approximateArrivalTimestamp + // cannot be deserialized that easily. + if (Record.class.isAssignableFrom(property.getMember().getDeclaringClass()) + && DATE_PROPERTY_NAME.equalsIgnoreCase(property.getName())) { + final DateDeserializer deserializer = new DateDeserializer(); + property = property.withValueDeserializer(deserializer); + builder.addOrReplaceProperty(property, true); + } + } + return builder; + } + }); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventDeserializer.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventDeserializer.java new file mode 100644 index 0000000000000..44e0362c5b7c9 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventDeserializer.java @@ -0,0 +1,176 @@ +package io.quarkus.funqy.lambda.event; + +import java.io.IOException; +import java.util.List; + +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.ObjectCodec; +import com.fasterxml.jackson.core.TreeNode; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import io.cloudevents.SpecVersion; +import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig; +import io.quarkus.funqy.lambda.model.cloudevents.CloudEventV1; +import io.quarkus.funqy.lambda.model.kinesis.PipesKinesisEvent; + +public class EventDeserializer extends JsonDeserializer { + + private final FunqyAmazonBuildTimeConfig buildTimeConfig; + + private ObjectReader funqyMethodObjReader; + private ObjectMapper objectMapper; + + public EventDeserializer(FunqyAmazonBuildTimeConfig buildTimeConfig) { + this.buildTimeConfig = buildTimeConfig; + } + + @Override + public Object deserialize(JsonParser jsonParser, DeserializationContext ctx) + throws IOException { + + ObjectCodec codec = jsonParser.getCodec(); + + TreeNode rootNode = codec.readTree(jsonParser); + + if (buildTimeConfig.advancedEventHandling().enabled() && (rootNode.isObject() || rootNode.isArray())) { + + if (rootNode.isObject()) { + // object + ObjectNode object = (ObjectNode) rootNode; + + if (object.has("Records") && object.get("Records").isArray()) { + // We need to look into the first record entry, to distinguish the different types. + for (JsonNode record : object.get("Records")) { + return deserializeEvent(jsonParser, record, object, codec); + } + } + } else { + // array. This happens in case of AWS EventBridge usage and is also the only way to enable + // CloudEvents usage + ArrayNode array = (ArrayNode) rootNode; + for (JsonNode record : array) { + return deserializeEvent(jsonParser, record, array, codec); + } + } + } + // We have no clue what it is. Fallback to serializing the output of the funqy method + return deserializeFunqReturnType(rootNode); + } + + public Object deserializeEvent(final JsonParser jsonParser, JsonNode record, JsonNode rootNode, ObjectCodec codec) + throws IOException { + + Object eventClass = getEventType(record, rootNode); + + if (eventClass != null) { + if (eventClass instanceof Class clazz) { + return objectMapper.convertValue(rootNode, clazz); + } else if (eventClass instanceof TypeReference typeReference) { + return objectMapper.convertValue(rootNode, typeReference); + } + } + + return deserializeFunqReturnType(rootNode); + } + + private Object getEventType(JsonNode record, JsonNode object) { + String eventSource = getEventSource(record); + + if (eventSource == null) { + eventSource = "default"; + } + + // See: https://docs.aws.amazon.com/lambda/latest/dg/lambda-services.html + // and for Pipes: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-source.html + Class eventType = null; + switch (eventSource) { + case "aws:sqs": + if (object.isObject()) { + eventType = SQSEvent.class; + } else { + // EventBridge Pipes + return new TypeReference>() { + }; + } + break; + case "aws:sns": + eventType = SNSEvent.class; + break; + case "aws:kinesis": + // Exclude Kinesis time window events. This would require to send a responsible with a state. + // This is very specific to AWS and maybe not the way funqy wants to handle things. + if (object.isObject() && !object.has("window")) { + eventType = KinesisEvent.class; + } else if (object.isArray()) { + // EventBridge Pipes + return new TypeReference>() { + }; + } + break; + case "aws:dynamodb": + if (object.isObject()) { + eventType = DynamodbEvent.class; + } else { + // EventBridge Pipes + return new TypeReference>() { + }; + } + break; + default: + break; + } + if (eventType == null && isCloudEvent(record)) { + return new TypeReference>() { + }; + } + return eventType; + } + + private boolean isCloudEvent(final JsonNode record) { + // this is the best guess we can do. We check for required attributes: + // https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#required-attributes + // A more tolerant way to check the type. We do not want to process to fail here. We can fall back to + // the best guess parting + return record.has("specversion") && record.get("specversion").isTextual() && + SpecVersion.V1.toString().equals(record.get("specversion").asText()) && + record.has("type"); + } + + private String getEventSource(JsonNode record) { + if (record.has("eventSource") && record.get("eventSource").isTextual()) { + return record.get("eventSource").asText(); + } + + // Unsure. In the AWS documentation SNS uses capital keys. I assume this is a mistake. + if (record.has("EventSource") && record.get("EventSource").isTextual()) { + return record.get("EventSource").asText(); + } + return null; + } + + private Object deserializeFunqReturnType(TreeNode node) throws IOException { + if (funqyMethodObjReader != null) { + return funqyMethodObjReader.readValue(node.traverse()); + } + return null; + } + + public void setFunqyMethodObjReader(ObjectReader funqyMethodObjReader) { + this.funqyMethodObjReader = funqyMethodObjReader; + } + + public void setObjectMapper(final ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventErrorHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventErrorHandler.java new file mode 100644 index 0000000000000..fbef780f2e221 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventErrorHandler.java @@ -0,0 +1,29 @@ +package io.quarkus.funqy.lambda.event; + +import java.util.ArrayList; +import java.util.List; + +import io.smallrye.mutiny.Uni; + +public class EventErrorHandler { + + private final List failures = new ArrayList<>(); + + public Uni collectFailures(Uni uni, String id) { + return uni.onTermination().invoke((item, failure, cancellation) -> { + Throwable actualFailure = null; + if (failure != null) { + actualFailure = failure; + } else if (cancellation) { + actualFailure = new RuntimeException("Stream cancelled"); + } + if (actualFailure != null) { + failures.add(id); + } + }).onFailure().recoverWithNull(); + } + + public List getFailures() { + return failures; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventHandler.java new file mode 100644 index 0000000000000..1775904bd4352 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventHandler.java @@ -0,0 +1,20 @@ +package io.quarkus.funqy.lambda.event; + +import java.io.InputStream; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; + +public interface EventHandler { + Stream streamEvent(E event, FunqyAmazonConfig amazonConfig); + + String getIdentifier(M message, FunqyAmazonConfig amazonConfig); + + Supplier getBody(M message, FunqyAmazonConfig amazonConfig); + + R createResponse(List failures, FunqyAmazonConfig amazonConfig); + + Class getMessageClass(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventProcessor.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventProcessor.java new file mode 100644 index 0000000000000..30d27b9160661 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventProcessor.java @@ -0,0 +1,168 @@ +package io.quarkus.funqy.lambda.event; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.jboss.logging.Logger; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; + +import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig; +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.cloudevents.CloudEventsHandler; +import io.quarkus.funqy.lambda.event.dynamodb.DynamoDbEventHandler; +import io.quarkus.funqy.lambda.event.dynamodb.PipesDynamoDbEventHandler; +import io.quarkus.funqy.lambda.event.kinesis.KinesisEventHandler; +import io.quarkus.funqy.lambda.event.kinesis.PipesKinesisEventHandler; +import io.quarkus.funqy.lambda.event.sns.SnsEventHandler; +import io.quarkus.funqy.lambda.event.sqs.PipesSqsEventHandler; +import io.quarkus.funqy.lambda.event.sqs.SqsEventHandler; +import io.quarkus.funqy.lambda.model.FunqyMethod; +import io.quarkus.funqy.lambda.model.cloudevents.CloudEventV1; +import io.quarkus.funqy.lambda.model.kinesis.PipesKinesisEvent; +import io.quarkus.funqy.runtime.FunqyServerResponse; +import io.smallrye.mutiny.Uni; + +public class EventProcessor { + + private static final Logger log = Logger.getLogger(EventProcessor.class); + + private final EventDeserializer eventDeserializer; + private final ObjectMapper objectMapper; + private final ObjectReader reader; + private final FunqyMethod funqyMethod; + private final Map, EventHandler> eventHandlers; + + private FunqyAmazonConfig amazonConfig; + + public EventProcessor(ObjectMapper objectMapper, EventDeserializer eventDeserializer, + FunqyMethod funqyMethod, + FunqyAmazonBuildTimeConfig buildTimeConfig) { + this.objectMapper = objectMapper; + this.eventDeserializer = eventDeserializer; + this.reader = objectMapper.readerFor(Object.class); + this.funqyMethod = funqyMethod; + + this.eventHandlers = new HashMap<>(); + if (buildTimeConfig.advancedEventHandling().enabled()) { + this.eventHandlers.put(SQSEvent.class, new SqsEventHandler()); + this.eventHandlers.put(SQSEvent.SQSMessage.class, new PipesSqsEventHandler()); + this.eventHandlers.put(SNSEvent.class, new SnsEventHandler()); + this.eventHandlers.put(KinesisEvent.class, new KinesisEventHandler()); + this.eventHandlers.put(PipesKinesisEvent.class, new PipesKinesisEventHandler()); + this.eventHandlers.put(DynamodbEvent.class, new DynamoDbEventHandler()); + this.eventHandlers.put(DynamodbEvent.DynamodbStreamRecord.class, new PipesDynamoDbEventHandler()); + this.eventHandlers.put(CloudEventV1.class, new CloudEventsHandler()); + } + } + + public void init(FunqyAmazonConfig amazonConfig) { + // This is a bit strange. We have some static values, which are initialized at some point in time. + // See FunqyLambdaBindingRecorder. Furthermore, we need to pass through the runtime config. + eventDeserializer.setFunqyMethodObjReader(funqyMethod.getReader().orElse(null)); + eventDeserializer.setObjectMapper(objectMapper); + this.amazonConfig = amazonConfig; + } + + public void handle(InputStream inputStream, OutputStream outputStream, + Function dispatcher, Context context) throws IOException { + // event might be null + Object event = reader.readValue(inputStream); + + EventHandler handler = getHandler(event); + if (handler != null) { + EventErrorHandler eventErrorHandler = new EventErrorHandler(); + + Object value = handleEvent(handler, event, eventErrorHandler, dispatcher) + .await().indefinitely(); + + if (value != null) { + objectMapper.writeValue(outputStream, value); + } + } else { + // Unknown event type. We do what Funqy normally did in the past. + FunqyServerResponse response = dispatcher.apply(event); + + Object value = response.getOutput().await().indefinitely(); + writeOutput(outputStream, value); + } + } + + private EventHandler getHandler(Object event) { + if (event == null) { + return null; + } + if (event instanceof List list && !list.isEmpty()) { + // we need some special handling for lists + return eventHandlers.get(list.get(0).getClass()); + } + + return eventHandlers.get(event.getClass()); + } + + @SuppressWarnings("unchecked") + private Uni handleEvent(EventHandler handler, Object event, + EventErrorHandler eventErrorHandler, Function dispatcher) { + + // We collect all messages in a list first, so that we can execute them in parallel. + List> unis = handler.streamEvent((E) event, amazonConfig) + .map(msg -> handleMessage(handler, eventErrorHandler, dispatcher, msg)).toList(); + + return Uni.combine().all().unis(unis) + .collectFailures().discardItems() + .onFailure().recoverWithNull() + .replaceWith(() -> handler.createResponse(eventErrorHandler.getFailures(), amazonConfig)); + } + + private Uni handleMessage(final EventHandler handler, final EventErrorHandler eventErrorHandler, + final Function dispatcher, final M msg) { + try { + final boolean isSuitableType = funqyMethod.getInputType().map(type -> type.hasRawClass(handler.getMessageClass())) + .orElse(false); + + Object input; + if (isSuitableType) { + input = msg; + } else { + input = readMessageBody(handler.getBody(msg, amazonConfig)); + } + + FunqyServerResponse response = dispatcher.apply(input); + + return eventErrorHandler.collectFailures(response.getOutput(), handler.getIdentifier(msg, amazonConfig)); + } catch (Throwable e) { + log.errorv(e, """ + Event could not be handled. This might happen, when the lambda is used with a not supported \ + trigger. If this happens you should disable the advanced event handling and handle the event \ + manually. + """); + return eventErrorHandler.collectFailures(Uni.createFrom().failure(e), + handler.getIdentifier(msg, amazonConfig)); + } + } + + private Object readMessageBody(Supplier is) throws IOException { + if (funqyMethod.getReader().isEmpty()) { + return null; + } + return funqyMethod.getReader().get().readValue(is.get()); + } + + private void writeOutput(final OutputStream outputStream, final Object value) throws IOException { + if (funqyMethod.getWriter().isPresent() && value != null) { + funqyMethod.getWriter().get().writeValue(outputStream, value); + } + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/cloudevents/CloudEventsHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/cloudevents/CloudEventsHandler.java new file mode 100644 index 0000000000000..380e6d670df52 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/cloudevents/CloudEventsHandler.java @@ -0,0 +1,46 @@ +package io.quarkus.funqy.lambda.event.cloudevents; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import io.cloudevents.CloudEvent; +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; +import io.quarkus.funqy.lambda.model.pipes.BatchItemFailures; +import io.quarkus.funqy.lambda.model.pipes.Response; + +public class CloudEventsHandler implements EventHandler, CloudEvent, Response> { + @Override + public Stream streamEvent(final List event, final FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.stream(); + } + + @Override + public String getIdentifier(final CloudEvent message, final FunqyAmazonConfig amazonConfig) { + return message.getId(); + } + + @Override + public Supplier getBody(final CloudEvent message, final FunqyAmazonConfig amazonConfig) { + return () -> new ByteArrayInputStream(message.getData().toBytes()); + } + + @Override + public Response createResponse(final List failures, final FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().sqs().reportBatchItemFailures()) { + return null; + } + return new Response(failures.stream().map(BatchItemFailures::new).toList()); + } + + @Override + public Class getMessageClass() { + return CloudEvent.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/DynamoDbEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/DynamoDbEventHandler.java new file mode 100644 index 0000000000000..e47ad7d92e33c --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/DynamoDbEventHandler.java @@ -0,0 +1,53 @@ +package io.quarkus.funqy.lambda.event.dynamodb; + +import java.io.InputStream; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; + +public class DynamoDbEventHandler implements EventHandler { + + @Override + public Stream streamEvent(DynamodbEvent event, FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.getRecords().stream(); + } + + @Override + public String getIdentifier(DynamodbStreamRecord message, FunqyAmazonConfig amazonConfig) { + return message.getDynamodb().getSequenceNumber(); + } + + @Override + public Supplier getBody(DynamodbStreamRecord message, FunqyAmazonConfig amazonConfig) { + throw new IllegalStateException(""" + DynamoDB records are too specific. It is not supported to extract a message from them. \ + Use the DynamodbStreamRecord in your funq method, or use EventBridge Pipes with CloudEvents. + """); + } + + @Override + public StreamsEventResponse createResponse(List failures, FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().dynamoDb().reportBatchItemFailures()) { + return null; + } + return StreamsEventResponse.builder().withBatchItemFailures( + failures.stream().map(id -> StreamsEventResponse.BatchItemFailure.builder() + .withItemIdentifier(id).build()).toList()) + .build(); + } + + @Override + public Class getMessageClass() { + return DynamodbStreamRecord.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/PipesDynamoDbEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/PipesDynamoDbEventHandler.java new file mode 100644 index 0000000000000..6363bf1e82a48 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/PipesDynamoDbEventHandler.java @@ -0,0 +1,53 @@ +package io.quarkus.funqy.lambda.event.dynamodb; + +import java.io.InputStream; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; + +public class PipesDynamoDbEventHandler + implements EventHandler, DynamodbStreamRecord, StreamsEventResponse> { + + @Override + public Stream streamEvent(List event, FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.stream(); + } + + @Override + public String getIdentifier(DynamodbStreamRecord message, FunqyAmazonConfig amazonConfig) { + return message.getDynamodb().getSequenceNumber(); + } + + @Override + public Supplier getBody(DynamodbStreamRecord message, FunqyAmazonConfig amazonConfig) { + throw new IllegalStateException(""" + DynamoDB records are too specific. It is not supported to extract a message from them. \ + Use the DynamodbStreamRecord in your funq method, or use EventBridge Pipes with CloudEvents. + """); + } + + @Override + public StreamsEventResponse createResponse(List failures, FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().dynamoDb().reportBatchItemFailures()) { + return null; + } + return StreamsEventResponse.builder().withBatchItemFailures( + failures.stream().map(id -> StreamsEventResponse.BatchItemFailure.builder() + .withItemIdentifier(id).build()).toList()) + .build(); + } + + @Override + public Class getMessageClass() { + return DynamodbStreamRecord.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/DateDeserializer.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/DateDeserializer.java new file mode 100644 index 0000000000000..07f19bbfda6a8 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/DateDeserializer.java @@ -0,0 +1,23 @@ +package io.quarkus.funqy.lambda.event.kinesis; + +import java.io.IOException; +import java.util.Date; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; + +public class DateDeserializer extends JsonDeserializer { + + @Override + public Date deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException { + String fieldName = jsonParser.getCurrentName(); + if ("approximateArrivalTimestamp".equals(fieldName)) { + double timestamp = jsonParser.getDoubleValue(); + long milliseconds = (long) (timestamp * 1000); + return new Date(milliseconds); + } + // For other properties, delegate to default deserialization + return (Date) deserializationContext.handleUnexpectedToken(Date.class, jsonParser); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/KinesisEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/KinesisEventHandler.java new file mode 100644 index 0000000000000..bf70838bbbb1b --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/KinesisEventHandler.java @@ -0,0 +1,54 @@ +package io.quarkus.funqy.lambda.event.kinesis; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse.BatchItemFailure; +import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; + +public class KinesisEventHandler implements EventHandler { + + @Override + public Stream streamEvent(KinesisEvent event, FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.getRecords().stream(); + } + + @Override + public String getIdentifier(KinesisEventRecord message, FunqyAmazonConfig amazonConfig) { + return message.getKinesis().getSequenceNumber(); + } + + @Override + public Supplier getBody(KinesisEventRecord message, FunqyAmazonConfig amazonConfig) { + if (message.getKinesis().getData() == null) { + return ByteArrayInputStream::nullInputStream; + } + return () -> new ByteBufferBackedInputStream(message.getKinesis().getData()); + } + + @Override + public StreamsEventResponse createResponse(List failures, FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().kinesis().reportBatchItemFailures()) { + return null; + } + return StreamsEventResponse.builder().withBatchItemFailures( + failures.stream().map(id -> BatchItemFailure.builder().withItemIdentifier(id).build()).toList()).build(); + } + + @Override + public Class getMessageClass() { + return KinesisEventRecord.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/PipesKinesisEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/PipesKinesisEventHandler.java new file mode 100644 index 0000000000000..20205d2903d72 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/PipesKinesisEventHandler.java @@ -0,0 +1,54 @@ +package io.quarkus.funqy.lambda.event.kinesis; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse.BatchItemFailure; +import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; +import io.quarkus.funqy.lambda.model.kinesis.PipesKinesisEvent; + +public class PipesKinesisEventHandler + implements EventHandler, PipesKinesisEvent, StreamsEventResponse> { + + @Override + public Stream streamEvent(List event, FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.stream(); + } + + @Override + public String getIdentifier(PipesKinesisEvent message, FunqyAmazonConfig amazonConfig) { + return message.getSequenceNumber(); + } + + @Override + public Supplier getBody(PipesKinesisEvent message, FunqyAmazonConfig amazonConfig) { + if (message.getData() == null) { + return ByteArrayInputStream::nullInputStream; + } + return () -> new ByteBufferBackedInputStream(message.getData()); + } + + @Override + public StreamsEventResponse createResponse(List failures, FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().kinesis().reportBatchItemFailures()) { + return null; + } + return StreamsEventResponse.builder().withBatchItemFailures( + failures.stream().map(id -> BatchItemFailure.builder().withItemIdentifier(id).build()).toList()).build(); + } + + @Override + public Class getMessageClass() { + return PipesKinesisEvent.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sns/SnsEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sns/SnsEventHandler.java new file mode 100644 index 0000000000000..a263ce6e90812 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sns/SnsEventHandler.java @@ -0,0 +1,49 @@ +package io.quarkus.funqy.lambda.event.sns; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; + +public class SnsEventHandler implements EventHandler { + + @Override + public Stream streamEvent(SNSEvent event, FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.getRecords().stream(); + } + + @Override + public String getIdentifier(SNSRecord message, FunqyAmazonConfig amazonConfig) { + return message.getSNS().getMessageId(); + } + + @Override + public Supplier getBody(SNSRecord message, FunqyAmazonConfig amazonConfig) { + if (message.getSNS() == null) { + return ByteArrayInputStream::nullInputStream; + } + return () -> new ByteArrayInputStream(message.getSNS().getMessage().getBytes(StandardCharsets.UTF_8)); + } + + @Override + public Void createResponse(List failures, FunqyAmazonConfig amazonConfig) { + // SNS does not support batch item failures. We return nothing, which results in no response + return null; + } + + @Override + public Class getMessageClass() { + return SNSRecord.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/PipesSqsEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/PipesSqsEventHandler.java new file mode 100644 index 0000000000000..f9158e6b5a446 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/PipesSqsEventHandler.java @@ -0,0 +1,49 @@ +package io.quarkus.funqy.lambda.event.sqs; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; +import io.quarkus.funqy.lambda.model.pipes.BatchItemFailures; +import io.quarkus.funqy.lambda.model.pipes.Response; + +public class PipesSqsEventHandler implements EventHandler, SQSMessage, Response> { + + @Override + public Stream streamEvent(final List event, final FunqyAmazonConfig amazonConfig) { + return event.stream(); + } + + @Override + public String getIdentifier(SQSMessage message, FunqyAmazonConfig amazonConfig) { + return message.getMessageId(); + } + + @Override + public Supplier getBody(SQSMessage message, FunqyAmazonConfig amazonConfig) { + if (message.getBody() == null) { + return ByteArrayInputStream::nullInputStream; + } + return () -> new ByteArrayInputStream(message.getBody().getBytes(StandardCharsets.UTF_8)); + } + + @Override + public Response createResponse(List failures, FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().sqs().reportBatchItemFailures()) { + return null; + } + return new Response(failures.stream().map(BatchItemFailures::new).toList()); + } + + @Override + public Class getMessageClass() { + return SQSMessage.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/SqsEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/SqsEventHandler.java new file mode 100644 index 0000000000000..ea39ff19dd065 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/SqsEventHandler.java @@ -0,0 +1,54 @@ +package io.quarkus.funqy.lambda.event.sqs; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse.BatchItemFailure; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; + +public class SqsEventHandler implements EventHandler { + + @Override + public Stream streamEvent(SQSEvent event, FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.getRecords().stream(); + } + + @Override + public String getIdentifier(SQSMessage message, FunqyAmazonConfig amazonConfig) { + return message.getMessageId(); + } + + @Override + public Supplier getBody(SQSMessage message, FunqyAmazonConfig amazonConfig) { + if (message.getBody() == null) { + return ByteArrayInputStream::nullInputStream; + } + return () -> new ByteArrayInputStream(message.getBody().getBytes(StandardCharsets.UTF_8)); + } + + @Override + public SQSBatchResponse createResponse(List failures, FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().sqs().reportBatchItemFailures()) { + return null; + } + return SQSBatchResponse.builder().withBatchItemFailures( + failures.stream().map(id -> BatchItemFailure.builder().withItemIdentifier(id).build()).toList()).build(); + } + + @Override + public Class getMessageClass() { + return SQSMessage.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/FunqyMethod.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/FunqyMethod.java new file mode 100644 index 0000000000000..d0f979ebac03c --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/FunqyMethod.java @@ -0,0 +1,39 @@ +package io.quarkus.funqy.lambda.model; + +import java.util.Optional; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.ObjectWriter; + +public class FunqyMethod { + + private final ObjectReader reader; + private final ObjectWriter writer; + private final JavaType inputType; + private final JavaType outputType; + + public FunqyMethod(final ObjectReader reader, final ObjectWriter writer, final JavaType inputType, + final JavaType outputType) { + this.reader = reader; + this.writer = writer; + this.inputType = inputType; + this.outputType = outputType; + } + + public Optional getReader() { + return Optional.ofNullable(reader); + } + + public Optional getWriter() { + return Optional.ofNullable(writer); + } + + public Optional getInputType() { + return Optional.ofNullable(inputType); + } + + public Optional getOutputType() { + return Optional.ofNullable(outputType); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventDataV1.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventDataV1.java new file mode 100644 index 0000000000000..9ba920623c2bf --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventDataV1.java @@ -0,0 +1,27 @@ +package io.quarkus.funqy.lambda.model.cloudevents; + +import java.nio.charset.StandardCharsets; + +import io.cloudevents.CloudEventData; + +public class CloudEventDataV1 implements CloudEventData { + + private final byte[] data; + + public CloudEventDataV1(final String data) { + if (data == null) { + this.data = null; + } else { + this.data = data.getBytes(StandardCharsets.UTF_8); + } + } + + public CloudEventDataV1(final byte[] data) { + this.data = data; + } + + @Override + public byte[] toBytes() { + return this.data; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventV1.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventV1.java new file mode 100644 index 0000000000000..7f1b52bb5c01f --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventV1.java @@ -0,0 +1,172 @@ +package io.quarkus.funqy.lambda.model.cloudevents; + +import java.io.IOException; +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import com.fasterxml.jackson.annotation.JsonAnySetter; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.SpecVersion; + +/** + * This class represents a {@link CloudEvent} in version 1.0 and is Jackson friendly + */ +public class CloudEventV1 implements CloudEvent { + + //private static final Pattern JSON_TYPE_PATTERN = Pattern.compile("^(application|text)/([a-zA-Z]+\\+)?json;?.*$"); + + private final CloudEventDataV1 data; + private final SpecVersion specVersion; + private final String id; + private final String type; + private final URI source; + private final String dataContentType; + private final URI dataSchema; + private final String subject; + private final OffsetDateTime time; + private final Map extensions; + + public CloudEventV1( + @JsonProperty("specversion") String specVersion, + @JsonProperty("id") String id, + @JsonProperty("type") String type, + @JsonProperty("source") URI source, + @JsonProperty("datacontenttype") String dataContentType, + @JsonProperty("dataschema") URI dataSchema, + @JsonProperty("subject") String subject, + @JsonProperty("time") OffsetDateTime time, + @JsonProperty("data") JsonNode data, + @JsonProperty("data_base64") JsonNode dataBase64) { + this.specVersion = SpecVersion.parse(specVersion); + this.id = id; + this.type = type; + this.source = source; + this.dataContentType = dataContentType; + this.dataSchema = dataSchema; + this.subject = subject; + this.time = time; + this.extensions = new HashMap<>(); + this.data = deserializeData(data, dataBase64, dataContentType); + } + + @JsonAnySetter + public void add(String property, String value) { + switch (property) { + case "specversion": + case "id": + case "source": + case "type": + case "datacontenttype": + case "dataschema": + case "data": + case "data_base64": + case "subject": + case "time": + // Those names are reserved + return; + } + extensions.put(property, value); + } + + private CloudEventDataV1 deserializeData(final JsonNode data, final JsonNode dataBase64, + final String dataContentType) { + if (dataBase64 != null) { + try { + return new CloudEventDataV1(dataBase64.binaryValue()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else if (data == null) { + return null; + } + + if (data.isTextual()) { + return new CloudEventDataV1(data.asText()); + } else { + // This should work for every other type. Even for application/json, because we need to serialize + // the data anyway for the interface. + return new CloudEventDataV1(data.toString()); + } + } + + @Override + public CloudEventData getData() { + return this.data; + } + + @Override + public SpecVersion getSpecVersion() { + return this.specVersion; + } + + @Override + public String getId() { + return this.id; + } + + @Override + public String getType() { + return this.type; + } + + @Override + public URI getSource() { + return this.source; + } + + @Override + public String getDataContentType() { + return this.dataContentType; + } + + @Override + public URI getDataSchema() { + return this.dataSchema; + } + + @Override + public String getSubject() { + return this.subject; + } + + @Override + public OffsetDateTime getTime() { + return this.time; + } + + @Override + public Object getAttribute(final String attributeName) throws IllegalArgumentException { + return switch (attributeName) { + case "specversion" -> getSpecVersion(); + case "id" -> getId(); + case "source" -> getSource(); + case "type" -> getType(); + case "datacontenttype" -> getDataContentType(); + case "dataschema" -> getDataSchema(); + case "subject" -> getSubject(); + case "time" -> getTime(); + default -> throw new IllegalArgumentException( + "The specified attribute name \"" + attributeName + "\" is not specified in version v1."); + }; + } + + @Override + public Object getExtension(final String s) { + if (s == null) { + throw new IllegalArgumentException("Extension name cannot be null"); + } + return this.extensions.get(s); + } + + @Override + public Set getExtensionNames() { + return this.extensions.keySet(); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/kinesis/PipesKinesisEvent.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/kinesis/PipesKinesisEvent.java new file mode 100644 index 0000000000000..78ed01667a209 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/kinesis/PipesKinesisEvent.java @@ -0,0 +1,61 @@ +package io.quarkus.funqy.lambda.model.kinesis; + +import com.amazonaws.services.lambda.runtime.events.models.kinesis.Record; + +/** + * For some reason AWS decided to flatten the model in EventBridge pipes for Kinesis. So there is no additional + * property called kinesis. We use the Record model and add the missing properties. Sadly I could not find a + * dedicated model for Kinesis in Pipes. So it is a combination of + * {@link com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord} and {@link Record} + */ +public class PipesKinesisEvent extends Record { + + private String eventSource; + + private String eventID; + + private String invokeIdentityArn; + + private String eventName; + + private String eventVersion; + + private String eventSourceARN; + + private String awsRegion; + + public PipesKinesisEvent setEventSource(final String eventSource) { + this.eventSource = eventSource; + return this; + } + + public PipesKinesisEvent setEventID(final String eventID) { + this.eventID = eventID; + return this; + } + + public PipesKinesisEvent setInvokeIdentityArn(final String invokeIdentityArn) { + this.invokeIdentityArn = invokeIdentityArn; + return this; + } + + public PipesKinesisEvent setEventName(final String eventName) { + this.eventName = eventName; + return this; + } + + public PipesKinesisEvent setEventVersion(final String eventVersion) { + this.eventVersion = eventVersion; + return this; + } + + public PipesKinesisEvent setEventSourceARN(final String eventSourceARN) { + this.eventSourceARN = eventSourceARN; + return this; + } + + public PipesKinesisEvent setAwsRegion(final String awsRegion) { + this.awsRegion = awsRegion; + return this; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/BatchItemFailures.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/BatchItemFailures.java new file mode 100644 index 0000000000000..99110edfa4ecf --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/BatchItemFailures.java @@ -0,0 +1,14 @@ +package io.quarkus.funqy.lambda.model.pipes; + +public class BatchItemFailures { + + private final String itemIdentifier; + + public BatchItemFailures(final String itemIdentifier) { + this.itemIdentifier = itemIdentifier; + } + + public String getItemIdentifier() { + return itemIdentifier; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/Response.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/Response.java new file mode 100644 index 0000000000000..20b817d1b91f2 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/Response.java @@ -0,0 +1,16 @@ +package io.quarkus.funqy.lambda.model.pipes; + +import java.util.List; + +public class Response { + + private final List batchItemFailures; + + public Response(final List batchItemFailures) { + this.batchItemFailures = batchItemFailures; + } + + public List getBatchItemFailures() { + return batchItemFailures; + } +}