diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java index 9158ec43dcbbb..9633e155a1d49 100644 --- a/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/main/java/io/quarkus/funqy/deployment/bindings/FunqyLambdaBuildStep.java @@ -49,7 +49,8 @@ public void init(List functions, @BuildStep @Record(RUNTIME_INIT) - public RuntimeComplete choose(FunqyConfig config, FunqyAmazonConfig amazonConfig, + public RuntimeComplete choose(FunqyConfig config, + FunqyAmazonConfig amazonConfig, FunqyLambdaBindingRecorder recorder) { recorder.chooseInvoker(config, amazonConfig); return new RuntimeComplete(); diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/TestFunctions.java b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/TestFunctions.java index f8cb6f8582e29..18a47c49c37ce 100644 --- a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/TestFunctions.java +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/java/io/quarkus/funqy/test/TestFunctions.java @@ -39,6 +39,7 @@ public Uni snsFunction(SNSEvent.SNSRecord msg) { @Funq("cloudevents-function") public Uni cloudEventsFunction(CloudEvent msg) { + // Due to jackson deserialization the base64 decoding already happened. if (new String(msg.getData().toBytes(), StandardCharsets.UTF_8).contains("true")) { return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); } @@ -47,6 +48,7 @@ public Uni cloudEventsFunction(CloudEvent msg) { @Funq("kinesis-function") public Uni kinesisFunction(KinesisEvent.Record msg) { + // Due to jackson deserialization the base64 decoding already happened. if (StandardCharsets.UTF_8.decode(msg.getData()).toString().contains("true")) { return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error.")); } diff --git a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/ok.json b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/ok.json index 4cc6db92f700e..722e5aa15662c 100644 --- a/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/ok.json +++ b/extensions/funqy/funqy-amazon-lambda/deployment/src/test/resources/events/dynamodb/ok.json @@ -4,6 +4,7 @@ "eventID": "1", "eventVersion": "1.0", "dynamodb": { + "ApproximateCreationDateTime": 1719318377.0, "Keys": { "Id": { "N": "1" @@ -33,6 +34,7 @@ "eventID": "2", "eventVersion": "1.0", "dynamodb": { + "ApproximateCreationDateTime": 1719318377.0, "NewImage": { "Message": { "S": "fail" diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java index bb4fba814d073..1faa97887076b 100644 --- a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/FunqyLambdaBindingRecorder.java @@ -11,19 +11,20 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.ObjectWriter; -import com.fasterxml.jackson.databind.module.SimpleModule; import io.quarkus.amazon.lambda.runtime.AbstractLambdaPollLoop; import io.quarkus.amazon.lambda.runtime.AmazonLambdaContext; import io.quarkus.amazon.lambda.runtime.AmazonLambdaMapperRecorder; +import io.quarkus.amazon.lambda.runtime.JacksonInputReader; +import io.quarkus.amazon.lambda.runtime.JacksonOutputWriter; import io.quarkus.amazon.lambda.runtime.LambdaInputReader; import io.quarkus.amazon.lambda.runtime.LambdaOutputWriter; import io.quarkus.arc.ManagedContext; import io.quarkus.arc.runtime.BeanContainer; import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig; import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; -import io.quarkus.funqy.lambda.event.AwsModule; -import io.quarkus.funqy.lambda.event.EventDeserializer; +import io.quarkus.funqy.lambda.event.AwsEventInputReader; +import io.quarkus.funqy.lambda.event.AwsEventOutputWriter; import io.quarkus.funqy.lambda.event.EventProcessor; import io.quarkus.funqy.lambda.model.FunqyMethod; import io.quarkus.funqy.runtime.FunctionConstructor; @@ -45,35 +46,28 @@ public class FunqyLambdaBindingRecorder { private static FunctionInvoker invoker; private static BeanContainer beanContainer; + private static LambdaInputReader reader; + private static LambdaOutputWriter writer; private static EventProcessor eventProcessor; + private static FunqyAmazonBuildTimeConfig amazonBuildTimeConfig; public void init(BeanContainer bc, FunqyAmazonBuildTimeConfig buildTimeConfig) { beanContainer = bc; FunctionConstructor.CONTAINER = bc; - // We create a copy, because we register a custom deserializer for everything. - ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper.copy(); - EventDeserializer eventDeserializer = new EventDeserializer(buildTimeConfig); - final SimpleModule simpleModule = new AwsModule(); - simpleModule.addDeserializer(Object.class, eventDeserializer); - objectMapper.registerModule(simpleModule); + amazonBuildTimeConfig = buildTimeConfig; + ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper; for (FunctionInvoker invoker : FunctionRecorder.registry.invokers()) { - ObjectReader reader = null; - JavaType javaInputType = null; if (invoker.hasInput()) { - javaInputType = objectMapper.constructType(invoker.getInputType()); - reader = objectMapper.readerFor(javaInputType); + JavaType javaInputType = objectMapper.constructType(invoker.getInputType()); + ObjectReader reader = objectMapper.readerFor(javaInputType); + invoker.getBindingContext().put(ObjectReader.class.getName(), reader); } - ObjectWriter writer = null; - JavaType javaOutputType = null; if (invoker.hasOutput()) { - javaOutputType = objectMapper.constructType(invoker.getOutputType()); - writer = objectMapper.writerFor(javaOutputType); + JavaType javaOutputType = objectMapper.constructType(invoker.getOutputType()); + ObjectWriter writer = objectMapper.writerFor(javaOutputType); + invoker.getBindingContext().put(ObjectWriter.class.getName(), writer); } - invoker.getBindingContext().put(EventProcessor.class.getName(), - new EventProcessor(objectMapper, eventDeserializer, - new FunqyMethod(reader, writer, javaInputType, javaOutputType), - buildTimeConfig)); } } @@ -92,8 +86,34 @@ public void chooseInvoker(FunqyConfig config, FunqyAmazonConfig amazonConfig) { } else { invoker = FunctionRecorder.registry.invokers().iterator().next(); } - eventProcessor = (EventProcessor) invoker.getBindingContext().get(EventProcessor.class.getName()); - eventProcessor.init(amazonConfig); + + ObjectReader objectReader = null; + if (invoker.hasInput()) { + objectReader = (ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName()); + + if (amazonBuildTimeConfig.advancedEventHandling().enabled()) { + // We create a copy, because the mapper will be reconfigured for the advanced event handling, + // and we do not want to adjust the ObjectMapper, which is available in arc context. + ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper.copy(); + reader = new AwsEventInputReader(objectMapper, objectReader, amazonBuildTimeConfig); + } else { + reader = new JacksonInputReader(objectReader); + } + + } + if (invoker.hasOutput()) { + ObjectWriter objectWriter = (ObjectWriter) invoker.getBindingContext().get(ObjectWriter.class.getName()); + + if (!amazonBuildTimeConfig.advancedEventHandling().enabled()) { + writer = new JacksonOutputWriter(objectWriter); + } + } + if (amazonBuildTimeConfig.advancedEventHandling().enabled()) { + ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper.copy(); + writer = new AwsEventOutputWriter(objectMapper); + + eventProcessor = new EventProcessor(objectReader, amazonBuildTimeConfig, amazonConfig); + } } /** @@ -109,7 +129,16 @@ public void chooseInvoker(FunqyConfig config, FunqyAmazonConfig amazonConfig) { * Is thrown in case the (de)serialization fails */ public static void handle(InputStream inputStream, OutputStream outputStream, Context context) throws IOException { - eventProcessor.handle(inputStream, outputStream, FunqyLambdaBindingRecorder::dispatch, context); + Object input = null; + if (invoker.hasInput()) { + input = reader.readValue(inputStream); + } + FunqyServerResponse response = dispatch(input, context); + + Object value = response.getOutput().await().indefinitely(); + if (value != null) { + writer.writeValue(outputStream, value); + } } @SuppressWarnings("rawtypes") @@ -119,34 +148,43 @@ public void startPollLoop(ShutdownContext context, LaunchMode launchMode) { @Override protected Object processRequest(Object input, AmazonLambdaContext context) throws Exception { - throw new RuntimeException("Unreachable"); + FunqyServerResponse response = dispatch(input, context); + return response.getOutput().await().indefinitely(); } @Override protected LambdaInputReader getInputReader() { - throw new RuntimeException("Unreachable"); + return reader; } @Override protected LambdaOutputWriter getOutputWriter() { - throw new RuntimeException("Unreachable"); + return writer; } @Override protected boolean isStream() { - return true; + return false; } @Override protected void processRequest(InputStream input, OutputStream output, AmazonLambdaContext context) throws Exception { - handle(input, output, context); + throw new RuntimeException("Unreachable!"); } }; loop.startPollLoop(context); } + private static FunqyServerResponse dispatch(Object input, Context context) throws IOException { + if (eventProcessor != null) { + return eventProcessor.handle(input, FunqyLambdaBindingRecorder::dispatch, context); + } else { + return dispatch(input); + } + } + private static FunqyServerResponse dispatch(Object input) { ManagedContext requestContext = beanContainer.requestContext(); requestContext.activate(); diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsEventInputReader.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsEventInputReader.java new file mode 100644 index 0000000000000..824802a1ebb12 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsEventInputReader.java @@ -0,0 +1,179 @@ +package io.quarkus.funqy.lambda.event; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Date; +import java.util.List; + +import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.fasterxml.jackson.core.TreeNode; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import io.cloudevents.SpecVersion; +import io.quarkus.amazon.lambda.runtime.LambdaInputReader; +import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig; +import io.quarkus.funqy.lambda.model.cloudevents.CloudEventV1; +import io.quarkus.funqy.lambda.model.kinesis.PipesKinesisEvent; + +public class AwsEventInputReader implements LambdaInputReader { + + private static final String SQS_EVENT_SOURCE = "aws:sqs"; + private static final String SNS_EVENT_SOURCE = "aws:sns"; + private static final String KINESIS_EVENT_SOURCE = "aws:kinesis"; + private static final String DYNAMODB_EVENT_SOURCE = "aws:dynamodb"; + + final ObjectMapper mapper; + final FunqyAmazonBuildTimeConfig amazonBuildTimeConfig; + final ObjectReader reader; + + public AwsEventInputReader(ObjectMapper mapper, ObjectReader reader, + FunqyAmazonBuildTimeConfig amazonBuildTimeConfig) { + // configure the mapper for advanced event handling + final SimpleModule simpleModule = new SimpleModule(); + simpleModule.addDeserializer(Date.class, new DateDeserializer()); + mapper.registerModule(simpleModule); + mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + + this.mapper = mapper; + this.amazonBuildTimeConfig = amazonBuildTimeConfig; + this.reader = reader; + } + + @Override + public Object readValue(InputStream is) throws IOException { + + final JsonNode rootNode = mapper.readTree(is); + + if (amazonBuildTimeConfig.advancedEventHandling().enabled()) { + if (rootNode.isObject() || rootNode.isArray()) { + if (rootNode.isObject()) { + // object + ObjectNode object = (ObjectNode) rootNode; + + if (object.has("Records") && object.get("Records").isArray()) { + // We need to look into the first record entry, to distinguish the different types. + for (JsonNode record : object.get("Records")) { + return deserializeEvent(record, object); + } + } + } else { + // array. This happens in case of AWS EventBridge usage, and is also the only way to enable + // CloudEvents usage + ArrayNode array = (ArrayNode) rootNode; + for (JsonNode record : array) { + return deserializeEvent(record, array); + } + } + } + } + // We could not identify the event based on the content. Fallback to deserializing the funq method input type. + return deserializeFunqReturnType(rootNode); + } + + public Object deserializeEvent(JsonNode record, JsonNode rootNode) throws IOException { + Object eventClass = getEventType(record, rootNode); + + if (eventClass != null) { + if (eventClass instanceof Class clazz) { + return mapper.convertValue(rootNode, clazz); + } else if (eventClass instanceof TypeReference typeReference) { + return mapper.convertValue(rootNode, typeReference); + } + } + // We could not identify the event based on the content. Fallback to deserializing the funq method input type. + return deserializeFunqReturnType(rootNode); + } + + private Object getEventType(JsonNode record, JsonNode object) { + String eventSource = getEventSource(record); + + if (eventSource == null) { + eventSource = "default"; + } + + // See: https://docs.aws.amazon.com/lambda/latest/dg/lambda-services.html + // and for Pipes: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-source.html + Class eventType = null; + switch (eventSource) { + case SQS_EVENT_SOURCE: + if (object.isObject()) { + eventType = SQSEvent.class; + } else if (object.isArray()) { + // EventBridge Pipes + return new TypeReference>() { + }; + } + break; + case SNS_EVENT_SOURCE: + eventType = SNSEvent.class; + break; + case KINESIS_EVENT_SOURCE: + // Exclude Kinesis time window events. This would require to send a responsible with a state. + // This is very specific to AWS and maybe not the way funqy wants to handle things. + if (object.isObject() && !object.has("window")) { + eventType = KinesisEvent.class; + } else if (object.isArray()) { + // EventBridge Pipes + return new TypeReference>() { + }; + } + break; + case DYNAMODB_EVENT_SOURCE: + if (object.isObject()) { + eventType = DynamodbEvent.class; + } else if (object.isArray()) { + // EventBridge Pipes + return new TypeReference>() { + }; + } + break; + default: + break; + } + if (eventType == null && isCloudEvent(record)) { + return new TypeReference>() { + }; + } + return eventType; + } + + private boolean isCloudEvent(final JsonNode record) { + // this is the best guess we can do. We check for required attributes: + // https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#required-attributes + // A more tolerant way to check the type. We do not want the process to fail. We can fall back to + // the best guess logic. + return record.has("specversion") && record.get("specversion").isTextual() && + SpecVersion.V1.toString().equals(record.get("specversion").asText()) && + record.has("type"); + } + + private String getEventSource(JsonNode record) { + if (record.has("eventSource") && record.get("eventSource").isTextual()) { + return record.get("eventSource").asText(); + } + + // Unsure. In the AWS SNS documentation the key starts with a capital letter. I assume this is a mistake, + // but it should not hurt to be that tolerant as well. + if (record.has("EventSource") && record.get("EventSource").isTextual()) { + return record.get("EventSource").asText(); + } + return null; + } + + private Object deserializeFunqReturnType(TreeNode node) throws IOException { + if (reader != null) { + return reader.readValue(node.traverse()); + } + return null; + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsEventOutputWriter.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsEventOutputWriter.java new file mode 100644 index 0000000000000..c28e60fe8fa3f --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsEventOutputWriter.java @@ -0,0 +1,26 @@ +package io.quarkus.funqy.lambda.event; + +import java.io.IOException; +import java.io.OutputStream; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.quarkus.amazon.lambda.runtime.LambdaOutputWriter; + +/** + * Responsible for serializing the different data models of the events + */ +public class AwsEventOutputWriter implements LambdaOutputWriter { + + final ObjectMapper mapper; + + public AwsEventOutputWriter(ObjectMapper mapper) { + // At the moment no special configuration is needed. But we need an ObjectMapper due to different models. + this.mapper = mapper; + } + + @Override + public void writeValue(final OutputStream os, final Object obj) throws IOException { + mapper.writeValue(os, obj); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsModule.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsModule.java deleted file mode 100644 index c75002af6af74..0000000000000 --- a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/AwsModule.java +++ /dev/null @@ -1,42 +0,0 @@ -package io.quarkus.funqy.lambda.event; - -import java.util.Iterator; - -import com.amazonaws.services.lambda.runtime.events.models.kinesis.Record; -import com.fasterxml.jackson.databind.BeanDescription; -import com.fasterxml.jackson.databind.DeserializationConfig; -import com.fasterxml.jackson.databind.deser.BeanDeserializerBuilder; -import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier; -import com.fasterxml.jackson.databind.deser.SettableBeanProperty; -import com.fasterxml.jackson.databind.module.SimpleModule; - -import io.quarkus.funqy.lambda.event.kinesis.DateDeserializer; - -public class AwsModule extends SimpleModule { - - final String DATE_PROPERTY_NAME = "approximateArrivalTimestamp"; - - public AwsModule() { - this.setDeserializerModifier(new BeanDeserializerModifier() { - - @Override - public BeanDeserializerBuilder updateBuilder(final DeserializationConfig config, - final BeanDescription beanDesc, final BeanDeserializerBuilder builder) { - - for (final Iterator iterator = builder.getProperties(); iterator.hasNext();) { - SettableBeanProperty property = iterator.next(); - - // Kinesis records need some special treatment. The approximateArrivalTimestamp - // cannot be deserialized that easily. - if (Record.class.isAssignableFrom(property.getMember().getDeclaringClass()) - && DATE_PROPERTY_NAME.equalsIgnoreCase(property.getName())) { - final DateDeserializer deserializer = new DateDeserializer(); - property = property.withValueDeserializer(deserializer); - builder.addOrReplaceProperty(property, true); - } - } - return builder; - } - }); - } -} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/DateDeserializer.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/DateDeserializer.java new file mode 100644 index 0000000000000..be4d61c9f3946 --- /dev/null +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/DateDeserializer.java @@ -0,0 +1,22 @@ +package io.quarkus.funqy.lambda.event; + +import java.io.IOException; +import java.util.Date; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; + +/** + * AWS uses double values. E.g. 1719318377.0 + * Therefore, a dedicated deserializer is needed + */ +public class DateDeserializer extends JsonDeserializer { + + @Override + public Date deserialize(JsonParser jsonParser, DeserializationContext ctxt) throws IOException { + double timestamp = jsonParser.getDoubleValue(); + long milliseconds = (long) (timestamp * 1000); + return new Date(milliseconds); + } +} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventDeserializer.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventDeserializer.java deleted file mode 100644 index 44e0362c5b7c9..0000000000000 --- a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventDeserializer.java +++ /dev/null @@ -1,176 +0,0 @@ -package io.quarkus.funqy.lambda.event; - -import java.io.IOException; -import java.util.List; - -import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; -import com.amazonaws.services.lambda.runtime.events.KinesisEvent; -import com.amazonaws.services.lambda.runtime.events.SNSEvent; -import com.amazonaws.services.lambda.runtime.events.SQSEvent; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.ObjectCodec; -import com.fasterxml.jackson.core.TreeNode; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectReader; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; - -import io.cloudevents.SpecVersion; -import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig; -import io.quarkus.funqy.lambda.model.cloudevents.CloudEventV1; -import io.quarkus.funqy.lambda.model.kinesis.PipesKinesisEvent; - -public class EventDeserializer extends JsonDeserializer { - - private final FunqyAmazonBuildTimeConfig buildTimeConfig; - - private ObjectReader funqyMethodObjReader; - private ObjectMapper objectMapper; - - public EventDeserializer(FunqyAmazonBuildTimeConfig buildTimeConfig) { - this.buildTimeConfig = buildTimeConfig; - } - - @Override - public Object deserialize(JsonParser jsonParser, DeserializationContext ctx) - throws IOException { - - ObjectCodec codec = jsonParser.getCodec(); - - TreeNode rootNode = codec.readTree(jsonParser); - - if (buildTimeConfig.advancedEventHandling().enabled() && (rootNode.isObject() || rootNode.isArray())) { - - if (rootNode.isObject()) { - // object - ObjectNode object = (ObjectNode) rootNode; - - if (object.has("Records") && object.get("Records").isArray()) { - // We need to look into the first record entry, to distinguish the different types. - for (JsonNode record : object.get("Records")) { - return deserializeEvent(jsonParser, record, object, codec); - } - } - } else { - // array. This happens in case of AWS EventBridge usage and is also the only way to enable - // CloudEvents usage - ArrayNode array = (ArrayNode) rootNode; - for (JsonNode record : array) { - return deserializeEvent(jsonParser, record, array, codec); - } - } - } - // We have no clue what it is. Fallback to serializing the output of the funqy method - return deserializeFunqReturnType(rootNode); - } - - public Object deserializeEvent(final JsonParser jsonParser, JsonNode record, JsonNode rootNode, ObjectCodec codec) - throws IOException { - - Object eventClass = getEventType(record, rootNode); - - if (eventClass != null) { - if (eventClass instanceof Class clazz) { - return objectMapper.convertValue(rootNode, clazz); - } else if (eventClass instanceof TypeReference typeReference) { - return objectMapper.convertValue(rootNode, typeReference); - } - } - - return deserializeFunqReturnType(rootNode); - } - - private Object getEventType(JsonNode record, JsonNode object) { - String eventSource = getEventSource(record); - - if (eventSource == null) { - eventSource = "default"; - } - - // See: https://docs.aws.amazon.com/lambda/latest/dg/lambda-services.html - // and for Pipes: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-source.html - Class eventType = null; - switch (eventSource) { - case "aws:sqs": - if (object.isObject()) { - eventType = SQSEvent.class; - } else { - // EventBridge Pipes - return new TypeReference>() { - }; - } - break; - case "aws:sns": - eventType = SNSEvent.class; - break; - case "aws:kinesis": - // Exclude Kinesis time window events. This would require to send a responsible with a state. - // This is very specific to AWS and maybe not the way funqy wants to handle things. - if (object.isObject() && !object.has("window")) { - eventType = KinesisEvent.class; - } else if (object.isArray()) { - // EventBridge Pipes - return new TypeReference>() { - }; - } - break; - case "aws:dynamodb": - if (object.isObject()) { - eventType = DynamodbEvent.class; - } else { - // EventBridge Pipes - return new TypeReference>() { - }; - } - break; - default: - break; - } - if (eventType == null && isCloudEvent(record)) { - return new TypeReference>() { - }; - } - return eventType; - } - - private boolean isCloudEvent(final JsonNode record) { - // this is the best guess we can do. We check for required attributes: - // https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#required-attributes - // A more tolerant way to check the type. We do not want to process to fail here. We can fall back to - // the best guess parting - return record.has("specversion") && record.get("specversion").isTextual() && - SpecVersion.V1.toString().equals(record.get("specversion").asText()) && - record.has("type"); - } - - private String getEventSource(JsonNode record) { - if (record.has("eventSource") && record.get("eventSource").isTextual()) { - return record.get("eventSource").asText(); - } - - // Unsure. In the AWS documentation SNS uses capital keys. I assume this is a mistake. - if (record.has("EventSource") && record.get("EventSource").isTextual()) { - return record.get("EventSource").asText(); - } - return null; - } - - private Object deserializeFunqReturnType(TreeNode node) throws IOException { - if (funqyMethodObjReader != null) { - return funqyMethodObjReader.readValue(node.traverse()); - } - return null; - } - - public void setFunqyMethodObjReader(ObjectReader funqyMethodObjReader) { - this.funqyMethodObjReader = funqyMethodObjReader; - } - - public void setObjectMapper(final ObjectMapper objectMapper) { - this.objectMapper = objectMapper; - } -} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventHandler.java index 1775904bd4352..7a90372c0b7f8 100644 --- a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventHandler.java +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventHandler.java @@ -7,14 +7,49 @@ import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; +/** + * This interface described how events should be handled + * @param type of the event + * @param type of the message + * @param type of the response + */ public interface EventHandler { + + /** + * Provides all messages from the event. Specially for events with multiple messages from a batch. + * @param event event to provide messages from + * @param amazonConfig config + * @return a stream of messages + */ Stream streamEvent(E event, FunqyAmazonConfig amazonConfig); + /** + * Get the identifier of a message. + * @param message message to extract the identifier from + * @param amazonConfig config + * @return the identifier + */ String getIdentifier(M message, FunqyAmazonConfig amazonConfig); + /** + * Get the body of a message as an {@link InputStream} + * @param message message to extract the body from + * @param amazonConfig config + * @return the body input stream + */ Supplier getBody(M message, FunqyAmazonConfig amazonConfig); + /** + * Create the response based on the collected failures. + * @param failures a list of message identifier, which failed + * @param amazonConfig config + * @return the created response + */ R createResponse(List failures, FunqyAmazonConfig amazonConfig); + /** + * The class of the message + * @return the class of the message + */ Class getMessageClass(); } diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventProcessor.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventProcessor.java index 30d27b9160661..90604d0c5a822 100644 --- a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventProcessor.java +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/EventProcessor.java @@ -2,10 +2,10 @@ import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Function; import java.util.function.Supplier; @@ -16,9 +16,9 @@ import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.SNSEvent; import com.amazonaws.services.lambda.runtime.events.SQSEvent; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; +import io.quarkus.funqy.lambda.FunqyResponseImpl; import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig; import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; import io.quarkus.funqy.lambda.event.cloudevents.CloudEventsHandler; @@ -29,7 +29,6 @@ import io.quarkus.funqy.lambda.event.sns.SnsEventHandler; import io.quarkus.funqy.lambda.event.sqs.PipesSqsEventHandler; import io.quarkus.funqy.lambda.event.sqs.SqsEventHandler; -import io.quarkus.funqy.lambda.model.FunqyMethod; import io.quarkus.funqy.lambda.model.cloudevents.CloudEventV1; import io.quarkus.funqy.lambda.model.kinesis.PipesKinesisEvent; import io.quarkus.funqy.runtime.FunqyServerResponse; @@ -39,21 +38,14 @@ public class EventProcessor { private static final Logger log = Logger.getLogger(EventProcessor.class); - private final EventDeserializer eventDeserializer; - private final ObjectMapper objectMapper; - private final ObjectReader reader; - private final FunqyMethod funqyMethod; + private final ObjectReader objectReader; + private final FunqyAmazonConfig config; private final Map, EventHandler> eventHandlers; - private FunqyAmazonConfig amazonConfig; - - public EventProcessor(ObjectMapper objectMapper, EventDeserializer eventDeserializer, - FunqyMethod funqyMethod, - FunqyAmazonBuildTimeConfig buildTimeConfig) { - this.objectMapper = objectMapper; - this.eventDeserializer = eventDeserializer; - this.reader = objectMapper.readerFor(Object.class); - this.funqyMethod = funqyMethod; + public EventProcessor(final ObjectReader objectReader, + final FunqyAmazonBuildTimeConfig buildTimeConfig, final FunqyAmazonConfig config) { + this.objectReader = objectReader; + this.config = config; this.eventHandlers = new HashMap<>(); if (buildTimeConfig.advancedEventHandling().enabled()) { @@ -68,35 +60,21 @@ public EventProcessor(ObjectMapper objectMapper, EventDeserializer eventDeserial } } - public void init(FunqyAmazonConfig amazonConfig) { - // This is a bit strange. We have some static values, which are initialized at some point in time. - // See FunqyLambdaBindingRecorder. Furthermore, we need to pass through the runtime config. - eventDeserializer.setFunqyMethodObjReader(funqyMethod.getReader().orElse(null)); - eventDeserializer.setObjectMapper(objectMapper); - this.amazonConfig = amazonConfig; - } - - public void handle(InputStream inputStream, OutputStream outputStream, + public FunqyServerResponse handle(Object event, Function dispatcher, Context context) throws IOException { - // event might be null - Object event = reader.readValue(inputStream); EventHandler handler = getHandler(event); + if (handler != null) { EventErrorHandler eventErrorHandler = new EventErrorHandler(); - Object value = handleEvent(handler, event, eventErrorHandler, dispatcher) - .await().indefinitely(); + FunqyResponseImpl funqyResponse = new FunqyResponseImpl(); + funqyResponse.setOutput(handleEvent(handler, event, eventErrorHandler, dispatcher)); + return funqyResponse; - if (value != null) { - objectMapper.writeValue(outputStream, value); - } } else { // Unknown event type. We do what Funqy normally did in the past. - FunqyServerResponse response = dispatcher.apply(event); - - Object value = response.getOutput().await().indefinitely(); - writeOutput(outputStream, value); + return dispatcher.apply(event); } } @@ -117,31 +95,36 @@ private Uni handleEvent(EventHandler handler, Object event EventErrorHandler eventErrorHandler, Function dispatcher) { // We collect all messages in a list first, so that we can execute them in parallel. - List> unis = handler.streamEvent((E) event, amazonConfig) + List> unis = handler.streamEvent((E) event, config) .map(msg -> handleMessage(handler, eventErrorHandler, dispatcher, msg)).toList(); return Uni.combine().all().unis(unis) .collectFailures().discardItems() .onFailure().recoverWithNull() - .replaceWith(() -> handler.createResponse(eventErrorHandler.getFailures(), amazonConfig)); + .replaceWith(() -> handler.createResponse(eventErrorHandler.getFailures(), config)); } private Uni handleMessage(final EventHandler handler, final EventErrorHandler eventErrorHandler, final Function dispatcher, final M msg) { try { - final boolean isSuitableType = funqyMethod.getInputType().map(type -> type.hasRawClass(handler.getMessageClass())) + // We check if the funqy method already uses the event model + final boolean isUsingEventModel = Optional.ofNullable(objectReader).map(ObjectReader::getValueType) + .map(type -> type.hasRawClass(handler.getMessageClass())) .orElse(false); Object input; - if (isSuitableType) { + if (isUsingEventModel) { + // If the funqy method is using the event model we do not need to deserialize the content input = msg; } else { - input = readMessageBody(handler.getBody(msg, amazonConfig)); + // The funqy method uses a custom model. We need to ask the handle to provide the content and then + // we deserialize it. + input = readMessageBody(handler.getBody(msg, config)); } FunqyServerResponse response = dispatcher.apply(input); - return eventErrorHandler.collectFailures(response.getOutput(), handler.getIdentifier(msg, amazonConfig)); + return eventErrorHandler.collectFailures(response.getOutput(), handler.getIdentifier(msg, config)); } catch (Throwable e) { log.errorv(e, """ Event could not be handled. This might happen, when the lambda is used with a not supported \ @@ -149,20 +132,14 @@ private Uni handleMessage(final EventHandler handler, fina manually. """); return eventErrorHandler.collectFailures(Uni.createFrom().failure(e), - handler.getIdentifier(msg, amazonConfig)); + handler.getIdentifier(msg, config)); } } private Object readMessageBody(Supplier is) throws IOException { - if (funqyMethod.getReader().isEmpty()) { + if (objectReader == null) { return null; } - return funqyMethod.getReader().get().readValue(is.get()); - } - - private void writeOutput(final OutputStream outputStream, final Object value) throws IOException { - if (funqyMethod.getWriter().isPresent() && value != null) { - funqyMethod.getWriter().get().writeValue(outputStream, value); - } + return objectReader.readValue(is.get()); } } diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/DateDeserializer.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/DateDeserializer.java deleted file mode 100644 index 07f19bbfda6a8..0000000000000 --- a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/DateDeserializer.java +++ /dev/null @@ -1,23 +0,0 @@ -package io.quarkus.funqy.lambda.event.kinesis; - -import java.io.IOException; -import java.util.Date; - -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; - -public class DateDeserializer extends JsonDeserializer { - - @Override - public Date deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException { - String fieldName = jsonParser.getCurrentName(); - if ("approximateArrivalTimestamp".equals(fieldName)) { - double timestamp = jsonParser.getDoubleValue(); - long milliseconds = (long) (timestamp * 1000); - return new Date(milliseconds); - } - // For other properties, delegate to default deserialization - return (Date) deserializationContext.handleUnexpectedToken(Date.class, jsonParser); - } -} diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/KinesisEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/KinesisEventHandler.java index bf70838bbbb1b..ba9e2c8d7a33e 100644 --- a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/KinesisEventHandler.java +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/KinesisEventHandler.java @@ -15,27 +15,27 @@ import io.quarkus.funqy.lambda.config.FunqyAmazonConfig; import io.quarkus.funqy.lambda.event.EventHandler; -public class KinesisEventHandler implements EventHandler { +public class KinesisEventHandler implements EventHandler { @Override - public Stream streamEvent(KinesisEvent event, FunqyAmazonConfig amazonConfig) { + public Stream streamEvent(KinesisEvent event, FunqyAmazonConfig amazonConfig) { if (event == null) { return Stream.empty(); } - return event.getRecords().stream(); + return event.getRecords().stream().map(KinesisEventRecord::getKinesis); } @Override - public String getIdentifier(KinesisEventRecord message, FunqyAmazonConfig amazonConfig) { - return message.getKinesis().getSequenceNumber(); + public String getIdentifier(KinesisEvent.Record message, FunqyAmazonConfig amazonConfig) { + return message.getSequenceNumber(); } @Override - public Supplier getBody(KinesisEventRecord message, FunqyAmazonConfig amazonConfig) { - if (message.getKinesis().getData() == null) { + public Supplier getBody(KinesisEvent.Record message, FunqyAmazonConfig amazonConfig) { + if (message.getData() == null) { return ByteArrayInputStream::nullInputStream; } - return () -> new ByteBufferBackedInputStream(message.getKinesis().getData()); + return () -> new ByteBufferBackedInputStream(message.getData()); } @Override @@ -48,7 +48,7 @@ public StreamsEventResponse createResponse(List failures, FunqyAmazonCon } @Override - public Class getMessageClass() { - return KinesisEventRecord.class; + public Class getMessageClass() { + return KinesisEvent.Record.class; } } diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/PipesKinesisEventHandler.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/PipesKinesisEventHandler.java index 20205d2903d72..89712d4e7e849 100644 --- a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/PipesKinesisEventHandler.java +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/event/kinesis/PipesKinesisEventHandler.java @@ -2,10 +2,13 @@ import java.io.ByteArrayInputStream; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Stream; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse.BatchItemFailure; import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream; @@ -15,26 +18,27 @@ import io.quarkus.funqy.lambda.model.kinesis.PipesKinesisEvent; public class PipesKinesisEventHandler - implements EventHandler, PipesKinesisEvent, StreamsEventResponse> { + implements EventHandler, KinesisEvent.Record, StreamsEventResponse> { @Override - public Stream streamEvent(List event, FunqyAmazonConfig amazonConfig) { + public Stream streamEvent(List event, FunqyAmazonConfig amazonConfig) { if (event == null) { return Stream.empty(); } - return event.stream(); + return event.stream().map(Function.identity()); } @Override - public String getIdentifier(PipesKinesisEvent message, FunqyAmazonConfig amazonConfig) { + public String getIdentifier(KinesisEvent.Record message, FunqyAmazonConfig amazonConfig) { return message.getSequenceNumber(); } @Override - public Supplier getBody(PipesKinesisEvent message, FunqyAmazonConfig amazonConfig) { + public Supplier getBody(KinesisEvent.Record message, FunqyAmazonConfig amazonConfig) { if (message.getData() == null) { return ByteArrayInputStream::nullInputStream; } + return () -> new ByteBufferBackedInputStream(message.getData()); } @@ -48,7 +52,7 @@ public StreamsEventResponse createResponse(List failures, FunqyAmazonCon } @Override - public Class getMessageClass() { - return PipesKinesisEvent.class; + public Class getMessageClass() { + return KinesisEvent.Record.class; } } diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/FunqyMethod.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/FunqyMethod.java index d0f979ebac03c..834b2dabcc4cc 100644 --- a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/FunqyMethod.java +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/FunqyMethod.java @@ -8,17 +8,29 @@ public class FunqyMethod { - private final ObjectReader reader; - private final ObjectWriter writer; - private final JavaType inputType; - private final JavaType outputType; + private ObjectReader reader; + private ObjectWriter writer; + private JavaType inputType; + private JavaType outputType; - public FunqyMethod(final ObjectReader reader, final ObjectWriter writer, final JavaType inputType, - final JavaType outputType) { + public FunqyMethod setReader(final ObjectReader reader) { this.reader = reader; + return this; + } + + public FunqyMethod setWriter(final ObjectWriter writer) { this.writer = writer; + return this; + } + + public FunqyMethod setInputType(final JavaType inputType) { this.inputType = inputType; + return this; + } + + public FunqyMethod setOutputType(final JavaType outputType) { this.outputType = outputType; + return this; } public Optional getReader() { diff --git a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/kinesis/PipesKinesisEvent.java b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/kinesis/PipesKinesisEvent.java index 78ed01667a209..784eba9ded342 100644 --- a/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/kinesis/PipesKinesisEvent.java +++ b/extensions/funqy/funqy-amazon-lambda/runtime/src/main/java/io/quarkus/funqy/lambda/model/kinesis/PipesKinesisEvent.java @@ -1,14 +1,16 @@ package io.quarkus.funqy.lambda.model.kinesis; -import com.amazonaws.services.lambda.runtime.events.models.kinesis.Record; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; /** * For some reason AWS decided to flatten the model in EventBridge pipes for Kinesis. So there is no additional * property called kinesis. We use the Record model and add the missing properties. Sadly I could not find a * dedicated model for Kinesis in Pipes. So it is a combination of - * {@link com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord} and {@link Record} + * {@link com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord} and {@link KinesisEvent.Record} */ -public class PipesKinesisEvent extends Record { +public class PipesKinesisEvent extends KinesisEvent.Record { + + private static final long serialVersionUID = 4365865918351932405L; private String eventSource;