diff --git a/bom/application/pom.xml b/bom/application/pom.xml index 6a4cbeaf71ea30..a79e7929fbdf2a 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -149,6 +149,7 @@ 1.2.3 3.11.6 2.16.0 + 3.0.0 3.1.0 1.0.0 2.0.0 @@ -5762,6 +5763,12 @@ + + io.cloudevents + cloudevents-api + ${cloudevents-api.version} + + com.microsoft.azure.functions azure-functions-java-library diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/pom.xml b/extensions/funqy/funqy-amazon-lambda/deployment/pom.xml index 2a03684b9a6752..159c1b235e5ad5 100644 --- a/extensions/funqy/funqy-amazon-lambda/deployment/pom.xml +++ b/extensions/funqy/funqy-amazon-lambda/deployment/pom.xml @@ -29,6 +29,16 @@ io.quarkus quarkus-arc-deployment + + io.quarkus + quarkus-junit5-internal + test + + + io.rest-assured + rest-assured + test + diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyAmazonLambdaProcessor.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyAmazonLambdaProcessor.java new file mode 100644 index 00000000000000..298f90d74d0e73 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyAmazonLambdaProcessor.java @@ -0,0 +1,46 @@ +package io.quarkus.funqy.deployment.bindings; + +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; +import com.amazonaws.services.lambda.runtime.events.models.kinesis.Record; + +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; +import io.quarkus.deployment.pkg.steps.NativeBuild; +import io.quarkus.funqy.lambda.model.cloudevents.CloudEventDataV1; +import io.quarkus.funqy.lambda.model.cloudevents.CloudEventV1; + +public class FunqyAmazonLambdaProcessor { + + @BuildStep(onlyIf = NativeBuild.class) + public void process(BuildProducer reflectiveClass) { + reflectiveClass.produce(ReflectiveClassBuildItem.builder( + // io CloudEvents + CloudEventV1.class.getName(), + CloudEventDataV1.class.getName(), + // SQS + SQSEvent.class.getName(), + SQSEvent.SQSMessage.class.getName(), + SQSEvent.MessageAttribute.class.getName(), + SQSBatchResponse.class.getName(), + SQSBatchResponse.BatchItemFailure.class.getName(), + // SNS + SNSEvent.class.getName(), + SNSEvent.SNSRecord.class.getName(), + SNSEvent.SNS.class.getName(), + // Kinesis + KinesisEvent.class.getName(), + KinesisEvent.KinesisEventRecord.class.getName(), + Record.class.getName(), + StreamsEventResponse.class.getName(), + StreamsEventResponse.BatchItemFailure.class.getName(), + // DynamoDB + DynamodbEvent.class.getName(), + DynamodbEvent.DynamodbStreamRecord.class.getName()).constructors().methods().fields().build()); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java index 6a8bcbdc213d9c..9633e155a1d49b 100644 --- a/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java @@ -20,6 +20,8 @@ import io.quarkus.funqy.deployment.FunctionBuildItem; import io.quarkus.funqy.deployment.FunctionInitializedBuildItem; import io.quarkus.funqy.lambda.FunqyLambdaBindingRecorder; +import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig; +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; import io.quarkus.funqy.runtime.FunqyConfig; import io.quarkus.runtime.LaunchMode; @@ -37,17 +39,20 @@ public void init(List functions, BuildProducer feature, Optional hasFunctions, LambdaObjectMapperInitializedBuildItem mapperDependency, - BeanContainerBuildItem beanContainer) throws Exception { + BeanContainerBuildItem beanContainer, + FunqyAmazonBuildTimeConfig buildTimeConfig) throws Exception { if (!hasFunctions.isPresent() || hasFunctions.get() == null) return; feature.produce(new FeatureBuildItem(FUNQY_AMAZON_LAMBDA)); - recorder.init(beanContainer.getValue()); + recorder.init(beanContainer.getValue(), buildTimeConfig); } @BuildStep @Record(RUNTIME_INIT) - public RuntimeComplete choose(FunqyConfig config, FunqyLambdaBindingRecorder recorder) { - recorder.chooseInvoker(config); + public RuntimeComplete choose(FunqyConfig config, + FunqyAmazonConfig amazonConfig, + FunqyLambdaBindingRecorder recorder) { + recorder.chooseInvoker(config, amazonConfig); return new RuntimeComplete(); } diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/AnyFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/AnyFunctionTest.java new file mode 100644 index 00000000000000..b80bcdb19343aa --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/AnyFunctionTest.java @@ -0,0 +1,52 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +/** + * Testing that the item-function can handle an event, which just represents the item itself. So no special aws event + * is used as envelope + */ +public class AnyFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("any-function.properties", "application.properties") + .addAsResource("events/any", "events") + .addClasses(TestFunctions.class, Item.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + response.then().statusCode(204); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + response.then().statusCode(500); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsEventFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsEventFunctionTest.java new file mode 100644 index 00000000000000..97f6c877a05e0a --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsEventFunctionTest.java @@ -0,0 +1,69 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.BodyDeserializer; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +/** + * Testing that the cloudevents-function with a cloud events specific model can handle events. + */ +public class CloudEventsEventFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .overrideRuntimeConfigKey("quarkus.funqy.export", "cloudevents-function") + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/cloudevents", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class, BodyDeserializer.class)); + + @Inject + BodyDeserializer deserializer; + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsFunctionTest.java new file mode 100644 index 00000000000000..793729fbd1fbf6 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/CloudEventsFunctionTest.java @@ -0,0 +1,68 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.BodyDeserializer; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +/** + * Testing that the item-function with a customer model can handle cloud events. + */ +public class CloudEventsFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/cloudevents", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class, BodyDeserializer.class)); + + @Inject + BodyDeserializer deserializer; + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbEventFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbEventFunctionTest.java new file mode 100644 index 00000000000000..28f735375f9ab0 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbEventFunctionTest.java @@ -0,0 +1,100 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.BodyDeserializer; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +/** + * Testing that the dynamodb-function with a customer model can handle cloud events. + */ +public class DynamoDbEventFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .overrideRuntimeConfigKey("quarkus.funqy.export", "dynamodb-function") + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/dynamodb", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class, BodyDeserializer.class)); + + @Inject + BodyDeserializer deserializer; + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } + + @Test + public void should_return_no_failures_if_processing_pipes_is_ok() { + // given + var body = getData("pipes-ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_pipes_fails() { + // given + var body = getData("pipes-fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbFunctionTest.java new file mode 100644 index 00000000000000..92782894170466 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/DynamoDbFunctionTest.java @@ -0,0 +1,72 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.BodyDeserializer; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +/** + * Testing that the item-function with a customer model cannot handle dynamodb events. Due to the structure we cannot + * really guess which data is relevant for the customer. But the impl will allow to use batching. + */ +public class DynamoDbFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/dynamodb", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class, BodyDeserializer.class)); + + @Inject + BodyDeserializer deserializer; + + @Test + public void should_fail_on_dynamodb_event_without_dynamodb_event_type() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + // It is not supported to transform the DynamoDB event record to an internal model. Therefore, if somebody + // would try this, the lambda would return every message as failure in batch item failures and log an error. + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(2)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItems("1", "2")); + } + + @Test + public void should_fail_on_dynamodb_event_via_pipes_without_dynamodb_event_type() { + // given + var body = getData("pipes-ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + // It is not supported to transform the DynamoDB event record to an internal model. Therefore, if somebody + // would try this, the lambda would return every message as failure in batch item failures and log an error. + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(2)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItems("1", "2")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/Item.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/Item.java new file mode 100644 index 00000000000000..26975491bde6db --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/Item.java @@ -0,0 +1,24 @@ +package io.quarkus.funqy.test; + +public class Item { + + String message; + + boolean throwError; + + public String getMessage() { + return message; + } + + public void setMessage(final String message) { + this.message = message; + } + + public boolean isThrowError() { + return throwError; + } + + public void setThrowError(final boolean throwError) { + this.throwError = throwError; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisEventFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisEventFunctionTest.java new file mode 100644 index 00000000000000..cfc283165bf2c8 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisEventFunctionTest.java @@ -0,0 +1,100 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.BodyDeserializer; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +/** + * Testing that the kinesis-function with a kinesis specific model can handle events. + */ +public class KinesisEventFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .overrideRuntimeConfigKey("quarkus.funqy.export", "kinesis-function") + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/kinesis", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class, BodyDeserializer.class)); + + @Inject + BodyDeserializer deserializer; + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } + + @Test + public void should_return_no_failures_if_processing_pipes_is_ok() { + // given + var body = getData("pipes-ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_pipes_fails() { + // given + var body = getData("pipes-fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisFunctionTest.java new file mode 100644 index 00000000000000..40d7ad0f216a00 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/KinesisFunctionTest.java @@ -0,0 +1,99 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.BodyDeserializer; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +/** + * Testing that the item-function with a customer model can handle kinesis events. + */ +public class KinesisFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/kinesis", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class, BodyDeserializer.class)); + + @Inject + BodyDeserializer deserializer; + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } + + @Test + public void should_return_no_failures_if_processing_pipes_is_ok() { + // given + var body = getData("pipes-ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_pipes_fails() { + // given + var body = getData("pipes-fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsEventFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsEventFunctionTest.java new file mode 100644 index 00000000000000..d95c66fd727b67 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsEventFunctionTest.java @@ -0,0 +1,53 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +/** + * Testing that the sns-function with an sns specific model can handle events. + */ +public class SnsEventFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .overrideRuntimeConfigKey("quarkus.funqy.export", "sns-function") + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/sns", "events") + .addClasses(TestFunctions.class, Item.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + response.then().statusCode(204); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + // SNS triggers have no error handling. + response.then().statusCode(204); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsFunctionTest.java new file mode 100644 index 00000000000000..7297a7452b6c6e --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SnsFunctionTest.java @@ -0,0 +1,52 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +/** + * Testing that the item-function with a customer model can handle sns events. + */ +public class SnsFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/sns", "events") + .addClasses(TestFunctions.class, Item.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + response.then().statusCode(204); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + // SNS triggers have no error handling. + response.then().statusCode(204); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsEventFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsEventFunctionTest.java new file mode 100644 index 00000000000000..88b6852b521583 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsEventFunctionTest.java @@ -0,0 +1,100 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.BodyDeserializer; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +/** + * Testing that the sqs-function with an sqs specific model can handle events. + */ +public class SqsEventFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .overrideRuntimeConfigKey("quarkus.funqy.export", "sqs-function") + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/sqs", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class, BodyDeserializer.class)); + + @Inject + BodyDeserializer deserializer; + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } + + @Test + public void should_return_no_failures_if_processing_pipes_is_ok() { + // given + var body = getData("pipes-ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_pipes_fails() { + // given + var body = getData("pipes-fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionNoBatchItemFailuresTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionNoBatchItemFailuresTest.java new file mode 100644 index 00000000000000..89bf3e585587d5 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionNoBatchItemFailuresTest.java @@ -0,0 +1,48 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class SqsFunctionNoBatchItemFailuresTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("no-batch-function.properties", "application.properties") + .addAsResource("events/sqs", "events") + .addClasses(TestFunctions.class, Item.class, + EventDataProvider.class)); + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + response.then().statusCode(204); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + response.then().statusCode(204); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionTest.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionTest.java new file mode 100644 index 00000000000000..a8ec58a725dea1 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/SqsFunctionTest.java @@ -0,0 +1,99 @@ +package io.quarkus.funqy.test; + +import static io.quarkus.funqy.test.util.EventDataProvider.getData; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.funqy.test.model.BatchItemFailures; +import io.quarkus.funqy.test.model.ItemFailure; +import io.quarkus.funqy.test.util.BodyDeserializer; +import io.quarkus.funqy.test.util.EventDataProvider; +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +/** + * Testing that the item-function with a customer model can handle sqs events. + */ +public class SqsFunctionTest { + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addAsResource("item-function.properties", "application.properties") + .addAsResource("events/sqs", "events") + .addClasses(TestFunctions.class, Item.class, + BatchItemFailures.class, ItemFailure.class, + EventDataProvider.class, BodyDeserializer.class)); + + @Inject + BodyDeserializer deserializer; + + @Test + public void should_return_no_failures_if_processing_is_ok() { + // given + var body = getData("ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_fails() { + // given + var body = getData("fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } + + @Test + public void should_return_no_failures_if_processing_pipes_is_ok() { + // given + var body = getData("pipes-ok.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), is(empty())); + } + + @Test + public void should_return_one_failure_if_processing_pipes_fails() { + // given + var body = getData("pipes-fail.json"); + + // when + var response = RestAssured.given().contentType("application/json") + .body(body) + .post("/"); + + // then + var respBody = deserializer.getBodyAs(response.then().statusCode(200), BatchItemFailures.class); + assertThat(respBody.batchItemFailures(), hasSize(1)); + assertThat(respBody.batchItemFailures().stream().map(ItemFailure::itemIdentifier).toList(), hasItem("1")); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/TestFunctions.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/TestFunctions.java new file mode 100644 index 00000000000000..18a47c49c37cee --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/TestFunctions.java @@ -0,0 +1,65 @@ +package io.quarkus.funqy.test; + +import java.nio.charset.StandardCharsets; + +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; + +import io.cloudevents.CloudEvent; +import io.quarkus.funqy.Funq; +import io.smallrye.mutiny.Uni; + +public class TestFunctions { + + @Funq("item-function") + public Uni itemFunction(Item item) { + if (item.isThrowError()) { + return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); + } + return Uni.createFrom().voidItem(); + } + + @Funq("sqs-function") + public Uni sqsFunction(SQSEvent.SQSMessage msg) { + if (msg.getBody().contains("true")) { + return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); + } + return Uni.createFrom().voidItem(); + } + + @Funq("sns-function") + public Uni snsFunction(SNSEvent.SNSRecord msg) { + if (msg.getSNS().getMessage().contains("true")) { + return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); + } + return Uni.createFrom().voidItem(); + } + + @Funq("cloudevents-function") + public Uni cloudEventsFunction(CloudEvent msg) { + // Due to jackson deserialization the base64 decoding already happened. + if (new String(msg.getData().toBytes(), StandardCharsets.UTF_8).contains("true")) { + return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); + } + return Uni.createFrom().voidItem(); + } + + @Funq("kinesis-function") + public Uni kinesisFunction(KinesisEvent.Record msg) { + // Due to jackson deserialization the base64 decoding already happened. + if (StandardCharsets.UTF_8.decode(msg.getData()).toString().contains("true")) { + return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); + } + return Uni.createFrom().voidItem(); + } + + @Funq("dynamodb-function") + public Uni dynamodbFunction(DynamodbEvent.DynamodbStreamRecord msg) { + if (msg.getDynamodb().getNewImage().get("ThrowError").getBOOL()) { + return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); + } + return Uni.createFrom().voidItem(); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/BatchItemFailures.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/BatchItemFailures.java new file mode 100644 index 00000000000000..4914a34a5f8017 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/BatchItemFailures.java @@ -0,0 +1,6 @@ +package io.quarkus.funqy.test.model; + +import java.util.List; + +public record BatchItemFailures(List batchItemFailures) { +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/ItemFailure.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/ItemFailure.java new file mode 100644 index 00000000000000..bd08b653841e27 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/model/ItemFailure.java @@ -0,0 +1,4 @@ +package io.quarkus.funqy.test.model; + +public record ItemFailure(String itemIdentifier) { +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/BodyDeserializer.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/BodyDeserializer.java new file mode 100644 index 00000000000000..c69081d484c724 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/BodyDeserializer.java @@ -0,0 +1,32 @@ +package io.quarkus.funqy.test.util; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.restassured.response.ValidatableResponse; + +@ApplicationScoped +public class BodyDeserializer { + + @Inject + ObjectMapper objectMapper; + + /** + * Allows to deserialize the response provided by RestAssured to the specified class. + * + * @param response RestAssured response + * @param clazz class to deserialize to + * @return the deserialized class + * @param type of the class to deserialize to + */ + public T getBodyAs(ValidatableResponse response, Class clazz) { + try { + return objectMapper.readValue(response.extract().body().asString(), clazz); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/EventDataProvider.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/EventDataProvider.java new file mode 100644 index 00000000000000..17e1efadc5fe1c --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/util/EventDataProvider.java @@ -0,0 +1,19 @@ +package io.quarkus.funqy.test.util; + +import java.io.IOException; +import java.nio.charset.Charset; + +import org.apache.commons.io.IOUtils; + +public class EventDataProvider { + + public static String getData(String path) { + try { + return IOUtils.toString( + EventDataProvider.class.getClassLoader().getResourceAsStream("events/" + path), + Charset.defaultCharset()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/any-function.properties b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/any-function.properties new file mode 100644 index 00000000000000..e9be9dfb08bdd4 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/any-function.properties @@ -0,0 +1 @@ +quarkus.funqy.export=item-function diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/fail.json new file mode 100644 index 00000000000000..2ec250d2e81a18 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/fail.json @@ -0,0 +1,4 @@ +{ + "message": "hello", + "throwError": true +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/ok.json new file mode 100644 index 00000000000000..47ae6aedbde1dd --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/any/ok.json @@ -0,0 +1,4 @@ +{ + "message": "hello", + "throwError": false +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/fail.json new file mode 100644 index 00000000000000..62f32e16345e37 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/fail.json @@ -0,0 +1,26 @@ +[ + { + "specversion" : "1.0", + "type" : "com.github.pull_request.opened", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "1", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "text/plain", + "data" : "{\"message\":\"hello\",\"throwError\":true}" + }, + { + "specversion" : "1.0", + "type" : "com.github.pull_request.opened", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "2", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "text/plain", + "data" : "{\"message\":\"fail\",\"throwError\":false}" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/ok.json new file mode 100644 index 00000000000000..e49c9ddcece017 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/cloudevents/ok.json @@ -0,0 +1,41 @@ +[ + { + "specversion" : "1.0", + "type" : "com.github.pull_request.opened", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "1", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "text/plain", + "data" : "{\"message\":\"hello\",\"throwError\":false}" + }, + { + "specversion" : "1.0", + "type" : "com.github.pull_request.opened", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "2", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "application/json", + "data" : { + "message": "ok", + "throwError": false + } + }, + { + "specversion" : "1.0", + "type" : "com.github.pull_request.opened", + "source" : "https://github.com/cloudevents/spec/pull", + "subject" : "123", + "id" : "3", + "time" : "2018-04-05T17:31:00Z", + "comexampleextension1" : "value", + "comexampleothervalue" : 5, + "datacontenttype" : "text/xml", + "data_base64" : "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/fail.json new file mode 100644 index 00000000000000..9bb03b7bc14898 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/fail.json @@ -0,0 +1,62 @@ +{ + "Records": [ + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "1" + } + }, + "NewImage": { + "Message": { + "S": "hello" + }, + "ThrowError": { + "BOOL": true + }, + "Id": { + "N": "1" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "1", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + }, + { + "eventID": "2", + "eventVersion": "1.0", + "dynamodb": { + "NewImage": { + "Message": { + "S": "fail" + }, + "ThrowError": { + "BOOL": false + }, + "Id": { + "N": "2" + } + }, + "SequenceNumber": "2", + "Keys": { + "Id": { + "N": "101" + } + }, + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/ok.json new file mode 100644 index 00000000000000..722e5aa15662cd --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/ok.json @@ -0,0 +1,64 @@ +{ + "Records": [ + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "ApproximateCreationDateTime": 1719318377.0, + "Keys": { + "Id": { + "N": "1" + } + }, + "NewImage": { + "Message": { + "S": "hello" + }, + "ThrowError": { + "BOOL": false + }, + "Id": { + "N": "1" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "1", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + }, + { + "eventID": "2", + "eventVersion": "1.0", + "dynamodb": { + "ApproximateCreationDateTime": 1719318377.0, + "NewImage": { + "Message": { + "S": "fail" + }, + "ThrowError": { + "BOOL": false + }, + "Id": { + "N": "2" + } + }, + "SequenceNumber": "2", + "Keys": { + "Id": { + "N": "101" + } + }, + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-fail.json new file mode 100644 index 00000000000000..98d791b29e8b59 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-fail.json @@ -0,0 +1,60 @@ +[ + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "1" + } + }, + "NewImage": { + "Message": { + "S": "hello" + }, + "ThrowError": { + "BOOL": true + }, + "Id": { + "N": "1" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "1", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + }, + { + "eventID": "2", + "eventVersion": "1.0", + "dynamodb": { + "NewImage": { + "Message": { + "S": "fail" + }, + "ThrowError": { + "BOOL": false + }, + "Id": { + "N": "2" + } + }, + "SequenceNumber": "2", + "Keys": { + "Id": { + "N": "101" + } + }, + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-ok.json new file mode 100644 index 00000000000000..d0d3816b1496c7 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/pipes-ok.json @@ -0,0 +1,60 @@ +[ + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "1" + } + }, + "NewImage": { + "Message": { + "S": "hello" + }, + "ThrowError": { + "BOOL": false + }, + "Id": { + "N": "1" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "1", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + }, + { + "eventID": "2", + "eventVersion": "1.0", + "dynamodb": { + "NewImage": { + "Message": { + "S": "fail" + }, + "ThrowError": { + "BOOL": false + }, + "Id": { + "N": "2" + } + }, + "SequenceNumber": "2", + "Keys": { + "Id": { + "N": "101" + } + }, + "SizeBytes": 59, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", + "eventSource": "aws:dynamodb" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/fail.json new file mode 100644 index 00000000000000..061f8dd6b880b3 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/fail.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "1", + "data": "eyJtZXNzYWdlIjoiaGVsbG8iLCJ0aHJvd0Vycm9yIjp0cnVlfQ==", + "approximateArrivalTimestamp": 1545084650.987 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + }, + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "2", + "data": "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==", + "approximateArrivalTimestamp": 1545084711.166 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/ok.json new file mode 100644 index 00000000000000..c8550929724b15 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/ok.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "1", + "data": "eyJtZXNzYWdlIjoiaGVsbG8iLCJ0aHJvd0Vycm9yIjpmYWxzZX0=", + "approximateArrivalTimestamp": 1545084650.987 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + }, + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "2", + "data": "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==", + "approximateArrivalTimestamp": 1545084711.166 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-fail.json new file mode 100644 index 00000000000000..3cc956e1cee0d7 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-fail.json @@ -0,0 +1,30 @@ +[ + { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "1", + "data": "eyJtZXNzYWdlIjoiaGVsbG8iLCJ0aHJvd0Vycm9yIjp0cnVlfQ==", + "approximateArrivalTimestamp": 1545084650.987, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + }, + { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "2", + "data": "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==", + "approximateArrivalTimestamp": 1545084711.166, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-ok.json new file mode 100644 index 00000000000000..1a40ebe10175d3 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/kinesis/pipes-ok.json @@ -0,0 +1,30 @@ +[ + { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "1", + "data": "eyJtZXNzYWdlIjoiaGVsbG8iLCJ0aHJvd0Vycm9yIjpmYWxzZX0=", + "approximateArrivalTimestamp": 1545084650.987, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + }, + { + "kinesisSchemaVersion": "1.0", + "partitionKey": "1", + "sequenceNumber": "2", + "data": "eyJtZXNzYWdlIjoiZmFpbCIsInRocm93RXJyb3IiOmZhbHNlfQ==", + "approximateArrivalTimestamp": 1545084711.166, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/fail.json new file mode 100644 index 00000000000000..4643568fdfe33a --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/fail.json @@ -0,0 +1,31 @@ +{ + "Records": [ + { + "EventVersion": "1.0", + "EventSubscriptionArn": "arn:aws:sns:us-east-1:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486", + "EventSource": "aws:sns", + "Sns": { + "SignatureVersion": "1", + "Timestamp": "2019-01-02T12:45:07.000Z", + "Signature": "tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==", + "SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem", + "MessageId": "1", + "Message": "{\"message\":\"hello\",\"throwError\":true}", + "MessageAttributes": { + "Test": { + "Type": "String", + "Value": "TestString" + }, + "TestBinary": { + "Type": "Binary", + "Value": "TestBinary" + } + }, + "Type": "Notification", + "UnsubscribeURL": "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:123456789012:test-lambda:21be56ed-a058-49f5-8c98-aedd2564c486", + "TopicArn":"arn:aws:sns:us-east-1:123456789012:sns-lambda", + "Subject": "TestInvoke" + } + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/ok.json new file mode 100644 index 00000000000000..c97c6511cbab23 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sns/ok.json @@ -0,0 +1,31 @@ +{ + "Records": [ + { + "EventVersion": "1.0", + "EventSubscriptionArn": "arn:aws:sns:us-east-1:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486", + "EventSource": "aws:sns", + "Sns": { + "SignatureVersion": "1", + "Timestamp": "2019-01-02T12:45:07.000Z", + "Signature": "tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==", + "SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem", + "MessageId": "1", + "Message": "{\"message\":\"hello\",\"throwError\":false}", + "MessageAttributes": { + "Test": { + "Type": "String", + "Value": "TestString" + }, + "TestBinary": { + "Type": "Binary", + "Value": "TestBinary" + } + }, + "Type": "Notification", + "UnsubscribeURL": "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:123456789012:test-lambda:21be56ed-a058-49f5-8c98-aedd2564c486", + "TopicArn":"arn:aws:sns:us-east-1:123456789012:sns-lambda", + "Subject": "TestInvoke" + } + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/fail.json new file mode 100644 index 00000000000000..b6b52c29187ec8 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/fail.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "messageId": "1", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "{\"message\":\"hello\",\"throwError\":true}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + }, + { + "messageId": "2", + "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", + "body": "{\"message\":\"fail\",\"throwError\":false}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082650636", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082650649" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/ok.json new file mode 100644 index 00000000000000..a5b3b937145053 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/ok.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "messageId": "1", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "{\"message\":\"hello\",\"throwError\":false}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + }, + { + "messageId": "2", + "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", + "body": "{\"message\":\"fail\",\"throwError\":false}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082650636", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082650649" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + } + ] +} \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-fail.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-fail.json new file mode 100644 index 00000000000000..76e02d9b27aaa2 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-fail.json @@ -0,0 +1,34 @@ +[ + { + "messageId": "1", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "{\"message\":\"hello\",\"throwError\":true}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + }, + { + "messageId": "2", + "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", + "body": "{\"message\":\"fail\",\"throwError\":false}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082650636", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082650649" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-ok.json new file mode 100644 index 00000000000000..83f6fb1449950a --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/sqs/pipes-ok.json @@ -0,0 +1,34 @@ +[ + { + "messageId": "1", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "{\"message\":\"hello\",\"throwError\":false}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + }, + { + "messageId": "2", + "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", + "body": "{\"message\":\"fail\",\"throwError\":false}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082650636", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082650649" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + } +] \ No newline at end of file diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/item-function.properties b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/item-function.properties new file mode 100644 index 00000000000000..3e2957b10e52cc --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/item-function.properties @@ -0,0 +1,2 @@ +quarkus.funqy.export=item-function +quarkus.funqy.amazon-lambda.advanced-event-handling.enabled=true diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/no-batch-function.properties b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/no-batch-function.properties new file mode 100644 index 00000000000000..ca2d666955c8fa --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/no-batch-function.properties @@ -0,0 +1,3 @@ +quarkus.funqy.export=item-function +quarkus.funqy.amazon-lambda.advanced-event-handling.enabled=true +quarkus.funqy.amazon-lambda.advanced-event-handling.sqs.report-batch-item-failures=false diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/pom.xml b/extensions/funqy/funqy-amazon-lambda/runtime/pom.xml index 4366bd5e63b3de..ccf8a40910d245 100644 --- a/extensions/funqy/funqy-amazon-lambda/runtime/pom.xml +++ b/extensions/funqy/funqy-amazon-lambda/runtime/pom.xml @@ -26,6 +26,20 @@ io.quarkus quarkus-jackson + + com.amazonaws + aws-lambda-java-events + + + joda-time + joda-time + + + + + io.cloudevents + cloudevents-api + diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java index d2726940abd858..29ae0fd378bb65 100644 --- a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java @@ -21,6 +21,11 @@ import io.quarkus.amazon.lambda.runtime.LambdaOutputWriter; import io.quarkus.arc.ManagedContext; import io.quarkus.arc.runtime.BeanContainer; +import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig; +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.AwsEventInputReader; +import io.quarkus.funqy.lambda.event.AwsEventOutputWriter; +import io.quarkus.funqy.lambda.event.EventProcessor; import io.quarkus.funqy.runtime.FunctionConstructor; import io.quarkus.funqy.runtime.FunctionInvoker; import io.quarkus.funqy.runtime.FunctionRecorder; @@ -42,11 +47,15 @@ public class FunqyLambdaBindingRecorder { private static BeanContainer beanContainer; private static LambdaInputReader reader; private static LambdaOutputWriter writer; + private static EventProcessor eventProcessor; + private static FunqyAmazonBuildTimeConfig amazonBuildTimeConfig; - public void init(BeanContainer bc) { + public void init(BeanContainer bc, FunqyAmazonBuildTimeConfig buildTimeConfig) { beanContainer = bc; FunctionConstructor.CONTAINER = bc; + amazonBuildTimeConfig = buildTimeConfig; ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper; + for (FunctionInvoker invoker : FunctionRecorder.registry.invokers()) { if (invoker.hasInput()) { JavaType javaInputType = objectMapper.constructType(invoker.getInputType()); @@ -61,7 +70,7 @@ public void init(BeanContainer bc) { } } - public void chooseInvoker(FunqyConfig config) { + public void chooseInvoker(FunqyConfig config, FunqyAmazonConfig amazonConfig) { // this is done at Runtime so that we can change it with an environment variable. if (config.export.isPresent()) { invoker = FunctionRecorder.registry.matchInvoker(config.export.get()); @@ -76,35 +85,59 @@ public void chooseInvoker(FunqyConfig config) { } else { invoker = FunctionRecorder.registry.invokers().iterator().next(); } + + ObjectReader objectReader = null; if (invoker.hasInput()) { - reader = new JacksonInputReader((ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName())); + objectReader = (ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName()); + + if (amazonBuildTimeConfig.advancedEventHandling().enabled()) { + // We create a copy, because the mapper will be reconfigured for the advanced event handling, + // and we do not want to adjust the ObjectMapper, which is available in arc context. + ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper.copy(); + reader = new AwsEventInputReader(objectMapper, objectReader, amazonBuildTimeConfig); + } else { + reader = new JacksonInputReader(objectReader); + } + } if (invoker.hasOutput()) { - writer = new JacksonOutputWriter((ObjectWriter) invoker.getBindingContext().get(ObjectWriter.class.getName())); + ObjectWriter objectWriter = (ObjectWriter) invoker.getBindingContext().get(ObjectWriter.class.getName()); + + if (!amazonBuildTimeConfig.advancedEventHandling().enabled()) { + writer = new JacksonOutputWriter(objectWriter); + } } + if (amazonBuildTimeConfig.advancedEventHandling().enabled()) { + ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper.copy(); + writer = new AwsEventOutputWriter(objectMapper); + eventProcessor = new EventProcessor(objectReader, amazonBuildTimeConfig, amazonConfig); + } } /** * Called by JVM handler wrapper * * @param inputStream + * {@link InputStream} of the AWS SDK {@link com.amazonaws.services.lambda.runtime.RequestStreamHandler} * @param outputStream + * {@link OutputStream} of the AWS SDK {@link com.amazonaws.services.lambda.runtime.RequestStreamHandler} * @param context + * AWS context information provided to the Lambda * @throws IOException + * Is thrown in case the (de)serialization fails */ public static void handle(InputStream inputStream, OutputStream outputStream, Context context) throws IOException { Object input = null; if (invoker.hasInput()) { input = reader.readValue(inputStream); } - FunqyServerResponse response = dispatch(input); + FunqyServerResponse response = dispatch(input, context); Object value = response.getOutput().await().indefinitely(); if (value != null) { writer.writeValue(outputStream, value); } - } @SuppressWarnings("rawtypes") @@ -114,7 +147,7 @@ public void startPollLoop(ShutdownContext context, LaunchMode launchMode) { @Override protected Object processRequest(Object input, AmazonLambdaContext context) throws Exception { - FunqyServerResponse response = dispatch(input); + FunqyServerResponse response = dispatch(input, context); return response.getOutput().await().indefinitely(); } @@ -143,6 +176,14 @@ protected void processRequest(InputStream input, OutputStream output, AmazonLamb } + private static FunqyServerResponse dispatch(Object input, Context context) throws IOException { + if (eventProcessor != null) { + return eventProcessor.handle(input, FunqyLambdaBindingRecorder::dispatch, context); + } else { + return dispatch(input); + } + } + private static FunqyServerResponse dispatch(Object input) { ManagedContext requestContext = beanContainer.requestContext(); requestContext.activate(); diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingBuildTimeConfig.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingBuildTimeConfig.java new file mode 100644 index 00000000000000..af4f2940277485 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingBuildTimeConfig.java @@ -0,0 +1,17 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.smallrye.config.WithDefault; + +/** + * Advanced event handling build time configuration + */ +@ConfigGroup +public interface AdvancedEventHandlingBuildTimeConfig { + + /** + * If advanced event handling should be enabled + */ + @WithDefault("true") + boolean enabled(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingConfig.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingConfig.java new file mode 100644 index 00000000000000..e5ca3b0afacac9 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/AdvancedEventHandlingConfig.java @@ -0,0 +1,30 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigGroup; + +/** + * Advanced event handling configuration + */ +@ConfigGroup +public interface AdvancedEventHandlingConfig { + + /** + * Sqs related config. + */ + Sqs sqs(); + + /** + * Sns related config. + */ + Sns sns(); + + /** + * Kinesis related config. + */ + Kinesis kinesis(); + + /** + * DynamoDb related config. + */ + DynamoDb dynamoDb(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/DynamoDb.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/DynamoDb.java new file mode 100644 index 00000000000000..6d182d725670bd --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/DynamoDb.java @@ -0,0 +1,17 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.smallrye.config.WithDefault; + +/** + * Kinesis event config + */ +@ConfigGroup +public interface DynamoDb { + + /** + * Allows functions to return partially successful responses for a batch of event records. + */ + @WithDefault("true") + boolean reportBatchItemFailures(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonBuildTimeConfig.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonBuildTimeConfig.java new file mode 100644 index 00000000000000..94d8c2da860bfb --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonBuildTimeConfig.java @@ -0,0 +1,17 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithName; + +@ConfigMapping(prefix = "quarkus.funqy.amazon-lambda") +@ConfigRoot(phase = ConfigPhase.BUILD_AND_RUN_TIME_FIXED) +public interface FunqyAmazonBuildTimeConfig { + + /** + * The advanced event handling config + */ + @WithName("advanced-event-handling") + AdvancedEventHandlingBuildTimeConfig advancedEventHandling(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonConfig.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonConfig.java new file mode 100644 index 00000000000000..ede409c6e90d1a --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/FunqyAmazonConfig.java @@ -0,0 +1,17 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigPhase; +import io.quarkus.runtime.annotations.ConfigRoot; +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithName; + +@ConfigMapping(prefix = "quarkus.funqy.amazon-lambda") +@ConfigRoot(phase = ConfigPhase.RUN_TIME) +public interface FunqyAmazonConfig { + + /** + * The advanced event handling config + */ + @WithName("advanced-event-handling") + AdvancedEventHandlingConfig advancedEventHandling(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Kinesis.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Kinesis.java new file mode 100644 index 00000000000000..b05562d93573a8 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Kinesis.java @@ -0,0 +1,17 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.smallrye.config.WithDefault; + +/** + * Kinesis event config + */ +@ConfigGroup +public interface Kinesis { + + /** + * Allows functions to return partially successful responses for a batch of event records. + */ + @WithDefault("true") + boolean reportBatchItemFailures(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sns.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sns.java new file mode 100644 index 00000000000000..cfb273a73550f4 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sns.java @@ -0,0 +1,11 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigGroup; + +/** + * Sns event config + */ +@ConfigGroup +public interface Sns { + +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sqs.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sqs.java new file mode 100644 index 00000000000000..75346e96e58655 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/config/Sqs.java @@ -0,0 +1,17 @@ +package io.quarkus.funqy.lambda.config; + +import io.quarkus.runtime.annotations.ConfigGroup; +import io.smallrye.config.WithDefault; + +/** + * Sqs event config + */ +@ConfigGroup +public interface Sqs { + + /** + * Allows functions to return partially successful responses for a batch of event records. + */ + @WithDefault("true") + boolean reportBatchItemFailures(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsEventInputReader.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsEventInputReader.java new file mode 100644 index 00000000000000..1973d7fa596103 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsEventInputReader.java @@ -0,0 +1,191 @@ +package io.quarkus.funqy.lambda.event; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Date; +import java.util.List; + +import org.jboss.logging.Logger; + +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.fasterxml.jackson.core.TreeNode; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import io.cloudevents.SpecVersion; +import io.quarkus.amazon.lambda.runtime.LambdaInputReader; +import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig; +import io.quarkus.funqy.lambda.model.cloudevents.CloudEventV1; +import io.quarkus.funqy.lambda.model.kinesis.PipesKinesisEvent; + +public class AwsEventInputReader implements LambdaInputReader { + + private static final Logger log = Logger.getLogger(AwsEventInputReader.class); + + private static final String SQS_EVENT_SOURCE = "aws:sqs"; + private static final String SNS_EVENT_SOURCE = "aws:sns"; + private static final String KINESIS_EVENT_SOURCE = "aws:kinesis"; + private static final String DYNAMODB_EVENT_SOURCE = "aws:dynamodb"; + + final ObjectMapper mapper; + final FunqyAmazonBuildTimeConfig amazonBuildTimeConfig; + final ObjectReader reader; + + public AwsEventInputReader(ObjectMapper mapper, ObjectReader reader, + FunqyAmazonBuildTimeConfig amazonBuildTimeConfig) { + // configure the mapper for advanced event handling + final SimpleModule simpleModule = new SimpleModule(); + simpleModule.addDeserializer(Date.class, new DateDeserializer()); + mapper.registerModule(simpleModule); + mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + + this.mapper = mapper; + this.amazonBuildTimeConfig = amazonBuildTimeConfig; + this.reader = reader; + } + + @Override + public Object readValue(InputStream is) throws IOException { + + final JsonNode rootNode = mapper.readTree(is); + + if (amazonBuildTimeConfig.advancedEventHandling().enabled()) { + if (rootNode.isObject() || rootNode.isArray()) { + if (rootNode.isObject()) { + // object + ObjectNode object = (ObjectNode) rootNode; + + if (object.has("Records") && object.get("Records").isArray()) { + // We need to look into the first record entry, to distinguish the different types. + for (JsonNode record : object.get("Records")) { + return deserializeEvent(record, object); + } + } + } else { + // array. This happens in case of AWS EventBridge usage, and is also the only way to enable + // CloudEvents usage + ArrayNode array = (ArrayNode) rootNode; + for (JsonNode record : array) { + return deserializeEvent(record, array); + } + } + } + log.debug("Could not detect event type. Try to deserialize to funqy method type"); + } else { + log.debug("Advanced event handling disabled. Try to deserialize to funqy method type"); + } + // We could not identify the event based on the content. Fallback to deserializing the funq method input type. + return deserializeFunqReturnType(rootNode); + } + + public Object deserializeEvent(JsonNode record, JsonNode rootNode) throws IOException { + Object eventClass = getEventType(record, rootNode); + + log.debugv("Detected event class: {0}", eventClass); + + if (eventClass != null) { + if (eventClass instanceof Class clazz) { + return mapper.convertValue(rootNode, clazz); + } else if (eventClass instanceof TypeReference typeReference) { + return mapper.convertValue(rootNode, typeReference); + } + } + + log.debug("Could not detect event type. Try to deserialize to funqy method type"); + + // We could not identify the event based on the content. Fallback to deserializing the funq method input type. + return deserializeFunqReturnType(rootNode); + } + + private Object getEventType(JsonNode record, JsonNode object) { + String eventSource = getEventSource(record); + + if (eventSource == null) { + eventSource = "default"; + } + + // See: https://docs.aws.amazon.com/lambda/latest/dg/lambda-services.html + // and for Pipes: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-source.html + Class eventType = null; + switch (eventSource) { + case SQS_EVENT_SOURCE: + if (object.isObject()) { + eventType = SQSEvent.class; + } else if (object.isArray()) { + // EventBridge Pipes + return new TypeReference>() { + }; + } + break; + case SNS_EVENT_SOURCE: + eventType = SNSEvent.class; + break; + case KINESIS_EVENT_SOURCE: + // Exclude Kinesis time window events. This would require to send a responsible with a state. + // This is very specific to AWS and maybe not the way funqy wants to handle things. + if (object.isObject() && !object.has("window")) { + eventType = KinesisEvent.class; + } else if (object.isArray()) { + // EventBridge Pipes + return new TypeReference>() { + }; + } + break; + case DYNAMODB_EVENT_SOURCE: + if (object.isObject()) { + eventType = DynamodbEvent.class; + } else if (object.isArray()) { + // EventBridge Pipes + return new TypeReference>() { + }; + } + break; + default: + break; + } + if (eventType == null && isCloudEvent(record)) { + return new TypeReference>() { + }; + } + return eventType; + } + + private boolean isCloudEvent(final JsonNode record) { + // this is the best guess we can do. We check for required attributes: + // https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#required-attributes + // A more tolerant way to check the type. We do not want the process to fail. We can fall back to + // the best guess logic. + return record.has("specversion") && record.get("specversion").isTextual() && + SpecVersion.V1.toString().equals(record.get("specversion").asText()) && + record.has("type"); + } + + private String getEventSource(JsonNode record) { + if (record.has("eventSource") && record.get("eventSource").isTextual()) { + return record.get("eventSource").asText(); + } + + // Unsure. In the AWS SNS documentation the key starts with a capital letter. I assume this is a mistake, + // but it should not hurt to be that tolerant as well. + if (record.has("EventSource") && record.get("EventSource").isTextual()) { + return record.get("EventSource").asText(); + } + return null; + } + + private Object deserializeFunqReturnType(TreeNode node) throws IOException { + if (reader != null) { + return reader.readValue(node.traverse()); + } + return null; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsEventOutputWriter.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsEventOutputWriter.java new file mode 100644 index 00000000000000..c28e60fe8fa3f8 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsEventOutputWriter.java @@ -0,0 +1,26 @@ +package io.quarkus.funqy.lambda.event; + +import java.io.IOException; +import java.io.OutputStream; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.quarkus.amazon.lambda.runtime.LambdaOutputWriter; + +/** + * Responsible for serializing the different data models of the events + */ +public class AwsEventOutputWriter implements LambdaOutputWriter { + + final ObjectMapper mapper; + + public AwsEventOutputWriter(ObjectMapper mapper) { + // At the moment no special configuration is needed. But we need an ObjectMapper due to different models. + this.mapper = mapper; + } + + @Override + public void writeValue(final OutputStream os, final Object obj) throws IOException { + mapper.writeValue(os, obj); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/DateDeserializer.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/DateDeserializer.java new file mode 100644 index 00000000000000..be4d61c9f39465 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/DateDeserializer.java @@ -0,0 +1,22 @@ +package io.quarkus.funqy.lambda.event; + +import java.io.IOException; +import java.util.Date; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; + +/** + * AWS uses double values. E.g. 1719318377.0 + * Therefore, a dedicated deserializer is needed + */ +public class DateDeserializer extends JsonDeserializer { + + @Override + public Date deserialize(JsonParser jsonParser, DeserializationContext ctxt) throws IOException { + double timestamp = jsonParser.getDoubleValue(); + long milliseconds = (long) (timestamp * 1000); + return new Date(milliseconds); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventErrorHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventErrorHandler.java new file mode 100644 index 00000000000000..fbef780f2e2210 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventErrorHandler.java @@ -0,0 +1,29 @@ +package io.quarkus.funqy.lambda.event; + +import java.util.ArrayList; +import java.util.List; + +import io.smallrye.mutiny.Uni; + +public class EventErrorHandler { + + private final List failures = new ArrayList<>(); + + public Uni collectFailures(Uni uni, String id) { + return uni.onTermination().invoke((item, failure, cancellation) -> { + Throwable actualFailure = null; + if (failure != null) { + actualFailure = failure; + } else if (cancellation) { + actualFailure = new RuntimeException("Stream cancelled"); + } + if (actualFailure != null) { + failures.add(id); + } + }).onFailure().recoverWithNull(); + } + + public List getFailures() { + return failures; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventHandler.java new file mode 100644 index 00000000000000..00defbd3e256b5 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventHandler.java @@ -0,0 +1,61 @@ +package io.quarkus.funqy.lambda.event; + +import java.io.InputStream; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; + +/** + * This interface described how events should be handled + * + * @param type of the event + * @param type of the message + * @param type of the response + */ +public interface EventHandler { + + /** + * Provides all messages from the event. Specially for events with multiple messages from a batch. + * + * @param event event to provide messages from + * @param amazonConfig config + * @return a stream of messages + */ + Stream streamEvent(E event, FunqyAmazonConfig amazonConfig); + + /** + * Get the identifier of a message. + * + * @param message message to extract the identifier from + * @param amazonConfig config + * @return the identifier + */ + String getIdentifier(M message, FunqyAmazonConfig amazonConfig); + + /** + * Get the body of a message as an {@link InputStream} + * + * @param message message to extract the body from + * @param amazonConfig config + * @return the body input stream + */ + Supplier getBody(M message, FunqyAmazonConfig amazonConfig); + + /** + * Create the response based on the collected failures. + * + * @param failures a list of message identifier, which failed + * @param amazonConfig config + * @return the created response + */ + R createResponse(List failures, FunqyAmazonConfig amazonConfig); + + /** + * The class of the message + * + * @return the class of the message + */ + Class getMessageClass(); +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventProcessor.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventProcessor.java new file mode 100644 index 00000000000000..ae44725ddd30b7 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventProcessor.java @@ -0,0 +1,153 @@ +package io.quarkus.funqy.lambda.event; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.jboss.logging.Logger; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.fasterxml.jackson.databind.ObjectReader; + +import io.quarkus.funqy.lambda.FunqyResponseImpl; +import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig; +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.cloudevents.CloudEventsHandler; +import io.quarkus.funqy.lambda.event.dynamodb.DynamoDbEventHandler; +import io.quarkus.funqy.lambda.event.dynamodb.PipesDynamoDbEventHandler; +import io.quarkus.funqy.lambda.event.kinesis.KinesisEventHandler; +import io.quarkus.funqy.lambda.event.kinesis.PipesKinesisEventHandler; +import io.quarkus.funqy.lambda.event.sns.SnsEventHandler; +import io.quarkus.funqy.lambda.event.sqs.PipesSqsEventHandler; +import io.quarkus.funqy.lambda.event.sqs.SqsEventHandler; +import io.quarkus.funqy.lambda.model.cloudevents.CloudEventV1; +import io.quarkus.funqy.lambda.model.kinesis.PipesKinesisEvent; +import io.quarkus.funqy.runtime.FunqyServerResponse; +import io.smallrye.mutiny.Uni; + +public class EventProcessor { + + private static final Logger log = Logger.getLogger(EventProcessor.class); + + private final ObjectReader objectReader; + private final FunqyAmazonConfig config; + private final Map, EventHandler> eventHandlers; + + public EventProcessor(final ObjectReader objectReader, + final FunqyAmazonBuildTimeConfig buildTimeConfig, final FunqyAmazonConfig config) { + this.objectReader = objectReader; + this.config = config; + + this.eventHandlers = new HashMap<>(); + if (buildTimeConfig.advancedEventHandling().enabled()) { + this.eventHandlers.put(SQSEvent.class, new SqsEventHandler()); + this.eventHandlers.put(SQSEvent.SQSMessage.class, new PipesSqsEventHandler()); + this.eventHandlers.put(SNSEvent.class, new SnsEventHandler()); + this.eventHandlers.put(KinesisEvent.class, new KinesisEventHandler()); + this.eventHandlers.put(PipesKinesisEvent.class, new PipesKinesisEventHandler()); + this.eventHandlers.put(DynamodbEvent.class, new DynamoDbEventHandler()); + this.eventHandlers.put(DynamodbEvent.DynamodbStreamRecord.class, new PipesDynamoDbEventHandler()); + this.eventHandlers.put(CloudEventV1.class, new CloudEventsHandler()); + } + } + + public FunqyServerResponse handle(Object event, + Function dispatcher, Context context) throws IOException { + + EventHandler handler = getHandler(event); + + if (handler != null) { + EventErrorHandler eventErrorHandler = new EventErrorHandler(); + + FunqyResponseImpl funqyResponse = new FunqyResponseImpl(); + funqyResponse.setOutput(handleEvent(handler, event, eventErrorHandler, dispatcher)); + return funqyResponse; + + } else { + // Unknown event type. We do what Funqy normally did in the past. + return dispatcher.apply(event); + } + } + + private EventHandler getHandler(Object event) { + if (event == null) { + return null; + } + if (event instanceof List list && !list.isEmpty()) { + // we need some special handling for lists + return eventHandlers.get(list.get(0).getClass()); + } + + return eventHandlers.get(event.getClass()); + } + + @SuppressWarnings("unchecked") + private Uni handleEvent(EventHandler handler, Object event, + EventErrorHandler eventErrorHandler, Function dispatcher) { + + // We collect all messages in a list first, so that we can execute them in parallel. + List> unis = handler.streamEvent((E) event, config) + .map(msg -> handleMessage(handler, eventErrorHandler, dispatcher, msg)).toList(); + + log.debugv("Received {0} messages in a batch.", unis.size()); + + return Uni.combine().all().unis(unis) + .collectFailures().discardItems() + .onFailure().invoke(err -> log.errorv(err, "An exception occurred during message handling.")) + .onFailure().recoverWithNull() + .replaceWith(() -> { + log.debugv("Detected {0} errors during message handling.", unis.size()); + return handler.createResponse(eventErrorHandler.getFailures(), config); + }); + } + + private Uni handleMessage(final EventHandler handler, final EventErrorHandler eventErrorHandler, + final Function dispatcher, final M msg) { + try { + // We check if the funqy method already uses the event model + final boolean isUsingEventModel = Optional.ofNullable(objectReader).map(ObjectReader::getValueType) + .map(type -> type.hasRawClass(handler.getMessageClass())) + .orElse(false); + + Object input; + if (isUsingEventModel) { + // If the funqy method is using the event model we do not need to deserialize the content + log.debug("Funqy method is using the event model. No further deserialization necessary."); + input = msg; + } else { + // The funqy method uses a custom model. We need to ask the handle to provide the content and then + // we deserialize it. + log.debug("Funqy method is using a custom model. Try to deserialize message."); + input = readMessageBody(handler.getBody(msg, config)); + } + + FunqyServerResponse response = dispatcher.apply(input); + + return eventErrorHandler.collectFailures(response.getOutput(), handler.getIdentifier(msg, config)); + } catch (Throwable e) { + log.errorv(e, """ + Event could not be handled. This can have multiple reasons: + 1. Message body could not be deserialized + 2. Using a not supported AWS event + """); + return eventErrorHandler.collectFailures(Uni.createFrom().failure(e), + handler.getIdentifier(msg, config)); + } + } + + private Object readMessageBody(Supplier is) throws IOException { + if (objectReader == null) { + return null; + } + return objectReader.readValue(is.get()); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/cloudevents/CloudEventsHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/cloudevents/CloudEventsHandler.java new file mode 100644 index 00000000000000..380e6d670df52d --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/cloudevents/CloudEventsHandler.java @@ -0,0 +1,46 @@ +package io.quarkus.funqy.lambda.event.cloudevents; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import io.cloudevents.CloudEvent; +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; +import io.quarkus.funqy.lambda.model.pipes.BatchItemFailures; +import io.quarkus.funqy.lambda.model.pipes.Response; + +public class CloudEventsHandler implements EventHandler, CloudEvent, Response> { + @Override + public Stream streamEvent(final List event, final FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.stream(); + } + + @Override + public String getIdentifier(final CloudEvent message, final FunqyAmazonConfig amazonConfig) { + return message.getId(); + } + + @Override + public Supplier getBody(final CloudEvent message, final FunqyAmazonConfig amazonConfig) { + return () -> new ByteArrayInputStream(message.getData().toBytes()); + } + + @Override + public Response createResponse(final List failures, final FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().sqs().reportBatchItemFailures()) { + return null; + } + return new Response(failures.stream().map(BatchItemFailures::new).toList()); + } + + @Override + public Class getMessageClass() { + return CloudEvent.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/DynamoDbEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/DynamoDbEventHandler.java new file mode 100644 index 00000000000000..e47ad7d92e33c9 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/DynamoDbEventHandler.java @@ -0,0 +1,53 @@ +package io.quarkus.funqy.lambda.event.dynamodb; + +import java.io.InputStream; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; + +public class DynamoDbEventHandler implements EventHandler { + + @Override + public Stream streamEvent(DynamodbEvent event, FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.getRecords().stream(); + } + + @Override + public String getIdentifier(DynamodbStreamRecord message, FunqyAmazonConfig amazonConfig) { + return message.getDynamodb().getSequenceNumber(); + } + + @Override + public Supplier getBody(DynamodbStreamRecord message, FunqyAmazonConfig amazonConfig) { + throw new IllegalStateException(""" + DynamoDB records are too specific. It is not supported to extract a message from them. \ + Use the DynamodbStreamRecord in your funq method, or use EventBridge Pipes with CloudEvents. + """); + } + + @Override + public StreamsEventResponse createResponse(List failures, FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().dynamoDb().reportBatchItemFailures()) { + return null; + } + return StreamsEventResponse.builder().withBatchItemFailures( + failures.stream().map(id -> StreamsEventResponse.BatchItemFailure.builder() + .withItemIdentifier(id).build()).toList()) + .build(); + } + + @Override + public Class getMessageClass() { + return DynamodbStreamRecord.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/PipesDynamoDbEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/PipesDynamoDbEventHandler.java new file mode 100644 index 00000000000000..6363bf1e82a484 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/dynamodb/PipesDynamoDbEventHandler.java @@ -0,0 +1,53 @@ +package io.quarkus.funqy.lambda.event.dynamodb; + +import java.io.InputStream; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; + +public class PipesDynamoDbEventHandler + implements EventHandler, DynamodbStreamRecord, StreamsEventResponse> { + + @Override + public Stream streamEvent(List event, FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.stream(); + } + + @Override + public String getIdentifier(DynamodbStreamRecord message, FunqyAmazonConfig amazonConfig) { + return message.getDynamodb().getSequenceNumber(); + } + + @Override + public Supplier getBody(DynamodbStreamRecord message, FunqyAmazonConfig amazonConfig) { + throw new IllegalStateException(""" + DynamoDB records are too specific. It is not supported to extract a message from them. \ + Use the DynamodbStreamRecord in your funq method, or use EventBridge Pipes with CloudEvents. + """); + } + + @Override + public StreamsEventResponse createResponse(List failures, FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().dynamoDb().reportBatchItemFailures()) { + return null; + } + return StreamsEventResponse.builder().withBatchItemFailures( + failures.stream().map(id -> StreamsEventResponse.BatchItemFailure.builder() + .withItemIdentifier(id).build()).toList()) + .build(); + } + + @Override + public Class getMessageClass() { + return DynamodbStreamRecord.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/KinesisEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/KinesisEventHandler.java new file mode 100644 index 00000000000000..ba9e2c8d7a33e3 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/KinesisEventHandler.java @@ -0,0 +1,54 @@ +package io.quarkus.funqy.lambda.event.kinesis; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse.BatchItemFailure; +import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; + +public class KinesisEventHandler implements EventHandler { + + @Override + public Stream streamEvent(KinesisEvent event, FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.getRecords().stream().map(KinesisEventRecord::getKinesis); + } + + @Override + public String getIdentifier(KinesisEvent.Record message, FunqyAmazonConfig amazonConfig) { + return message.getSequenceNumber(); + } + + @Override + public Supplier getBody(KinesisEvent.Record message, FunqyAmazonConfig amazonConfig) { + if (message.getData() == null) { + return ByteArrayInputStream::nullInputStream; + } + return () -> new ByteBufferBackedInputStream(message.getData()); + } + + @Override + public StreamsEventResponse createResponse(List failures, FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().kinesis().reportBatchItemFailures()) { + return null; + } + return StreamsEventResponse.builder().withBatchItemFailures( + failures.stream().map(id -> BatchItemFailure.builder().withItemIdentifier(id).build()).toList()).build(); + } + + @Override + public Class getMessageClass() { + return KinesisEvent.Record.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/PipesKinesisEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/PipesKinesisEventHandler.java new file mode 100644 index 00000000000000..a21ae47edad84a --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/PipesKinesisEventHandler.java @@ -0,0 +1,57 @@ +package io.quarkus.funqy.lambda.event.kinesis; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.List; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse.BatchItemFailure; +import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; +import io.quarkus.funqy.lambda.model.kinesis.PipesKinesisEvent; + +public class PipesKinesisEventHandler + implements EventHandler, KinesisEvent.Record, StreamsEventResponse> { + + @Override + public Stream streamEvent(List event, FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.stream().map(Function.identity()); + } + + @Override + public String getIdentifier(KinesisEvent.Record message, FunqyAmazonConfig amazonConfig) { + return message.getSequenceNumber(); + } + + @Override + public Supplier getBody(KinesisEvent.Record message, FunqyAmazonConfig amazonConfig) { + if (message.getData() == null) { + return ByteArrayInputStream::nullInputStream; + } + + return () -> new ByteBufferBackedInputStream(message.getData()); + } + + @Override + public StreamsEventResponse createResponse(List failures, FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().kinesis().reportBatchItemFailures()) { + return null; + } + return StreamsEventResponse.builder().withBatchItemFailures( + failures.stream().map(id -> BatchItemFailure.builder().withItemIdentifier(id).build()).toList()).build(); + } + + @Override + public Class getMessageClass() { + return KinesisEvent.Record.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sns/SnsEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sns/SnsEventHandler.java new file mode 100644 index 00000000000000..a263ce6e908128 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sns/SnsEventHandler.java @@ -0,0 +1,49 @@ +package io.quarkus.funqy.lambda.event.sns; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; + +public class SnsEventHandler implements EventHandler { + + @Override + public Stream streamEvent(SNSEvent event, FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.getRecords().stream(); + } + + @Override + public String getIdentifier(SNSRecord message, FunqyAmazonConfig amazonConfig) { + return message.getSNS().getMessageId(); + } + + @Override + public Supplier getBody(SNSRecord message, FunqyAmazonConfig amazonConfig) { + if (message.getSNS() == null) { + return ByteArrayInputStream::nullInputStream; + } + return () -> new ByteArrayInputStream(message.getSNS().getMessage().getBytes(StandardCharsets.UTF_8)); + } + + @Override + public Void createResponse(List failures, FunqyAmazonConfig amazonConfig) { + // SNS does not support batch item failures. We return nothing, which results in no response + return null; + } + + @Override + public Class getMessageClass() { + return SNSRecord.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/PipesSqsEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/PipesSqsEventHandler.java new file mode 100644 index 00000000000000..f9158e6b5a4469 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/PipesSqsEventHandler.java @@ -0,0 +1,49 @@ +package io.quarkus.funqy.lambda.event.sqs; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; +import io.quarkus.funqy.lambda.model.pipes.BatchItemFailures; +import io.quarkus.funqy.lambda.model.pipes.Response; + +public class PipesSqsEventHandler implements EventHandler, SQSMessage, Response> { + + @Override + public Stream streamEvent(final List event, final FunqyAmazonConfig amazonConfig) { + return event.stream(); + } + + @Override + public String getIdentifier(SQSMessage message, FunqyAmazonConfig amazonConfig) { + return message.getMessageId(); + } + + @Override + public Supplier getBody(SQSMessage message, FunqyAmazonConfig amazonConfig) { + if (message.getBody() == null) { + return ByteArrayInputStream::nullInputStream; + } + return () -> new ByteArrayInputStream(message.getBody().getBytes(StandardCharsets.UTF_8)); + } + + @Override + public Response createResponse(List failures, FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().sqs().reportBatchItemFailures()) { + return null; + } + return new Response(failures.stream().map(BatchItemFailures::new).toList()); + } + + @Override + public Class getMessageClass() { + return SQSMessage.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/SqsEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/SqsEventHandler.java new file mode 100644 index 00000000000000..ea39ff19dd0653 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/sqs/SqsEventHandler.java @@ -0,0 +1,54 @@ +package io.quarkus.funqy.lambda.event.sqs; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Stream; + +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse.BatchItemFailure; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; + +import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +import io.quarkus.funqy.lambda.event.EventHandler; + +public class SqsEventHandler implements EventHandler { + + @Override + public Stream streamEvent(SQSEvent event, FunqyAmazonConfig amazonConfig) { + if (event == null) { + return Stream.empty(); + } + return event.getRecords().stream(); + } + + @Override + public String getIdentifier(SQSMessage message, FunqyAmazonConfig amazonConfig) { + return message.getMessageId(); + } + + @Override + public Supplier getBody(SQSMessage message, FunqyAmazonConfig amazonConfig) { + if (message.getBody() == null) { + return ByteArrayInputStream::nullInputStream; + } + return () -> new ByteArrayInputStream(message.getBody().getBytes(StandardCharsets.UTF_8)); + } + + @Override + public SQSBatchResponse createResponse(List failures, FunqyAmazonConfig amazonConfig) { + if (!amazonConfig.advancedEventHandling().sqs().reportBatchItemFailures()) { + return null; + } + return SQSBatchResponse.builder().withBatchItemFailures( + failures.stream().map(id -> BatchItemFailure.builder().withItemIdentifier(id).build()).toList()).build(); + } + + @Override + public Class getMessageClass() { + return SQSMessage.class; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/FunqyMethod.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/FunqyMethod.java new file mode 100644 index 00000000000000..834b2dabcc4cc2 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/FunqyMethod.java @@ -0,0 +1,51 @@ +package io.quarkus.funqy.lambda.model; + +import java.util.Optional; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.ObjectWriter; + +public class FunqyMethod { + + private ObjectReader reader; + private ObjectWriter writer; + private JavaType inputType; + private JavaType outputType; + + public FunqyMethod setReader(final ObjectReader reader) { + this.reader = reader; + return this; + } + + public FunqyMethod setWriter(final ObjectWriter writer) { + this.writer = writer; + return this; + } + + public FunqyMethod setInputType(final JavaType inputType) { + this.inputType = inputType; + return this; + } + + public FunqyMethod setOutputType(final JavaType outputType) { + this.outputType = outputType; + return this; + } + + public Optional getReader() { + return Optional.ofNullable(reader); + } + + public Optional getWriter() { + return Optional.ofNullable(writer); + } + + public Optional getInputType() { + return Optional.ofNullable(inputType); + } + + public Optional getOutputType() { + return Optional.ofNullable(outputType); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventDataV1.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventDataV1.java new file mode 100644 index 00000000000000..9ba920623c2bfd --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventDataV1.java @@ -0,0 +1,27 @@ +package io.quarkus.funqy.lambda.model.cloudevents; + +import java.nio.charset.StandardCharsets; + +import io.cloudevents.CloudEventData; + +public class CloudEventDataV1 implements CloudEventData { + + private final byte[] data; + + public CloudEventDataV1(final String data) { + if (data == null) { + this.data = null; + } else { + this.data = data.getBytes(StandardCharsets.UTF_8); + } + } + + public CloudEventDataV1(final byte[] data) { + this.data = data; + } + + @Override + public byte[] toBytes() { + return this.data; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventV1.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventV1.java new file mode 100644 index 00000000000000..7f1b52bb5c01f0 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/cloudevents/CloudEventV1.java @@ -0,0 +1,172 @@ +package io.quarkus.funqy.lambda.model.cloudevents; + +import java.io.IOException; +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import com.fasterxml.jackson.annotation.JsonAnySetter; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; + +import io.cloudevents.CloudEvent; +import io.cloudevents.CloudEventData; +import io.cloudevents.SpecVersion; + +/** + * This class represents a {@link CloudEvent} in version 1.0 and is Jackson friendly + */ +public class CloudEventV1 implements CloudEvent { + + //private static final Pattern JSON_TYPE_PATTERN = Pattern.compile("^(application|text)/([a-zA-Z]+\\+)?json;?.*$"); + + private final CloudEventDataV1 data; + private final SpecVersion specVersion; + private final String id; + private final String type; + private final URI source; + private final String dataContentType; + private final URI dataSchema; + private final String subject; + private final OffsetDateTime time; + private final Map extensions; + + public CloudEventV1( + @JsonProperty("specversion") String specVersion, + @JsonProperty("id") String id, + @JsonProperty("type") String type, + @JsonProperty("source") URI source, + @JsonProperty("datacontenttype") String dataContentType, + @JsonProperty("dataschema") URI dataSchema, + @JsonProperty("subject") String subject, + @JsonProperty("time") OffsetDateTime time, + @JsonProperty("data") JsonNode data, + @JsonProperty("data_base64") JsonNode dataBase64) { + this.specVersion = SpecVersion.parse(specVersion); + this.id = id; + this.type = type; + this.source = source; + this.dataContentType = dataContentType; + this.dataSchema = dataSchema; + this.subject = subject; + this.time = time; + this.extensions = new HashMap<>(); + this.data = deserializeData(data, dataBase64, dataContentType); + } + + @JsonAnySetter + public void add(String property, String value) { + switch (property) { + case "specversion": + case "id": + case "source": + case "type": + case "datacontenttype": + case "dataschema": + case "data": + case "data_base64": + case "subject": + case "time": + // Those names are reserved + return; + } + extensions.put(property, value); + } + + private CloudEventDataV1 deserializeData(final JsonNode data, final JsonNode dataBase64, + final String dataContentType) { + if (dataBase64 != null) { + try { + return new CloudEventDataV1(dataBase64.binaryValue()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else if (data == null) { + return null; + } + + if (data.isTextual()) { + return new CloudEventDataV1(data.asText()); + } else { + // This should work for every other type. Even for application/json, because we need to serialize + // the data anyway for the interface. + return new CloudEventDataV1(data.toString()); + } + } + + @Override + public CloudEventData getData() { + return this.data; + } + + @Override + public SpecVersion getSpecVersion() { + return this.specVersion; + } + + @Override + public String getId() { + return this.id; + } + + @Override + public String getType() { + return this.type; + } + + @Override + public URI getSource() { + return this.source; + } + + @Override + public String getDataContentType() { + return this.dataContentType; + } + + @Override + public URI getDataSchema() { + return this.dataSchema; + } + + @Override + public String getSubject() { + return this.subject; + } + + @Override + public OffsetDateTime getTime() { + return this.time; + } + + @Override + public Object getAttribute(final String attributeName) throws IllegalArgumentException { + return switch (attributeName) { + case "specversion" -> getSpecVersion(); + case "id" -> getId(); + case "source" -> getSource(); + case "type" -> getType(); + case "datacontenttype" -> getDataContentType(); + case "dataschema" -> getDataSchema(); + case "subject" -> getSubject(); + case "time" -> getTime(); + default -> throw new IllegalArgumentException( + "The specified attribute name \"" + attributeName + "\" is not specified in version v1."); + }; + } + + @Override + public Object getExtension(final String s) { + if (s == null) { + throw new IllegalArgumentException("Extension name cannot be null"); + } + return this.extensions.get(s); + } + + @Override + public Set getExtensionNames() { + return this.extensions.keySet(); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/kinesis/PipesKinesisEvent.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/kinesis/PipesKinesisEvent.java new file mode 100644 index 00000000000000..784eba9ded3428 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/kinesis/PipesKinesisEvent.java @@ -0,0 +1,63 @@ +package io.quarkus.funqy.lambda.model.kinesis; + +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; + +/** + * For some reason AWS decided to flatten the model in EventBridge pipes for Kinesis. So there is no additional + * property called kinesis. We use the Record model and add the missing properties. Sadly I could not find a + * dedicated model for Kinesis in Pipes. So it is a combination of + * {@link com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord} and {@link KinesisEvent.Record} + */ +public class PipesKinesisEvent extends KinesisEvent.Record { + + private static final long serialVersionUID = 4365865918351932405L; + + private String eventSource; + + private String eventID; + + private String invokeIdentityArn; + + private String eventName; + + private String eventVersion; + + private String eventSourceARN; + + private String awsRegion; + + public PipesKinesisEvent setEventSource(final String eventSource) { + this.eventSource = eventSource; + return this; + } + + public PipesKinesisEvent setEventID(final String eventID) { + this.eventID = eventID; + return this; + } + + public PipesKinesisEvent setInvokeIdentityArn(final String invokeIdentityArn) { + this.invokeIdentityArn = invokeIdentityArn; + return this; + } + + public PipesKinesisEvent setEventName(final String eventName) { + this.eventName = eventName; + return this; + } + + public PipesKinesisEvent setEventVersion(final String eventVersion) { + this.eventVersion = eventVersion; + return this; + } + + public PipesKinesisEvent setEventSourceARN(final String eventSourceARN) { + this.eventSourceARN = eventSourceARN; + return this; + } + + public PipesKinesisEvent setAwsRegion(final String awsRegion) { + this.awsRegion = awsRegion; + return this; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/BatchItemFailures.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/BatchItemFailures.java new file mode 100644 index 00000000000000..99110edfa4ecf3 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/BatchItemFailures.java @@ -0,0 +1,14 @@ +package io.quarkus.funqy.lambda.model.pipes; + +public class BatchItemFailures { + + private final String itemIdentifier; + + public BatchItemFailures(final String itemIdentifier) { + this.itemIdentifier = itemIdentifier; + } + + public String getItemIdentifier() { + return itemIdentifier; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/Response.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/Response.java new file mode 100644 index 00000000000000..20b817d1b91f28 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/pipes/Response.java @@ -0,0 +1,16 @@ +package io.quarkus.funqy.lambda.model.pipes; + +import java.util.List; + +public class Response { + + private final List batchItemFailures; + + public Response(final List batchItemFailures) { + this.batchItemFailures = batchItemFailures; + } + + public List getBatchItemFailures() { + return batchItemFailures; + } +}