Skip to content

Commit

Permalink
feat: Funqy Amazon Lambda support more Amazon events
Browse files Browse the repository at this point in the history
- Refactor, so that the implementation is less invasive.
  Using the JacksonInputReader and JacksonOutputWriter for
  the advanced event handling.
- Fix issue in Kinesis event handling
- Adjust and add tests
  • Loading branch information
holomekc committed Jul 9, 2024
1 parent a3ee94f commit 9ba8f44
Show file tree
Hide file tree
Showing 16 changed files with 407 additions and 348 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public void init(List<FunctionBuildItem> functions,

@BuildStep
@Record(RUNTIME_INIT)
public RuntimeComplete choose(FunqyConfig config, FunqyAmazonConfig amazonConfig,
public RuntimeComplete choose(FunqyConfig config,
FunqyAmazonConfig amazonConfig,
FunqyLambdaBindingRecorder recorder) {
recorder.chooseInvoker(config, amazonConfig);
return new RuntimeComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public Uni<Void> snsFunction(SNSEvent.SNSRecord msg) {

@Funq("cloudevents-function")
public Uni<Void> cloudEventsFunction(CloudEvent msg) {
// Due to jackson deserialization the base64 decoding already happened.
if (new String(msg.getData().toBytes(), StandardCharsets.UTF_8).contains("true")) {
return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error."));
}
Expand All @@ -47,6 +48,7 @@ public Uni<Void> cloudEventsFunction(CloudEvent msg) {

@Funq("kinesis-function")
public Uni<Void> kinesisFunction(KinesisEvent.Record msg) {
// Due to jackson deserialization the base64 decoding already happened.
if (StandardCharsets.UTF_8.decode(msg.getData()).toString().contains("true")) {
return Uni.createFrom().failure(new IllegalArgumentException("This is an expected error."));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"eventID": "1",
"eventVersion": "1.0",
"dynamodb": {
"ApproximateCreationDateTime": 1719318377.0,
"Keys": {
"Id": {
"N": "1"
Expand Down Expand Up @@ -33,6 +34,7 @@
"eventID": "2",
"eventVersion": "1.0",
"dynamodb": {
"ApproximateCreationDateTime": 1719318377.0,
"NewImage": {
"Message": {
"S": "fail"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,20 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.module.SimpleModule;

import io.quarkus.amazon.lambda.runtime.AbstractLambdaPollLoop;
import io.quarkus.amazon.lambda.runtime.AmazonLambdaContext;
import io.quarkus.amazon.lambda.runtime.AmazonLambdaMapperRecorder;
import io.quarkus.amazon.lambda.runtime.JacksonInputReader;
import io.quarkus.amazon.lambda.runtime.JacksonOutputWriter;
import io.quarkus.amazon.lambda.runtime.LambdaInputReader;
import io.quarkus.amazon.lambda.runtime.LambdaOutputWriter;
import io.quarkus.arc.ManagedContext;
import io.quarkus.arc.runtime.BeanContainer;
import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig;
import io.quarkus.funqy.lambda.config.FunqyAmazonConfig;
import io.quarkus.funqy.lambda.event.AwsModule;
import io.quarkus.funqy.lambda.event.EventDeserializer;
import io.quarkus.funqy.lambda.event.AwsEventInputReader;
import io.quarkus.funqy.lambda.event.AwsEventOutputWriter;
import io.quarkus.funqy.lambda.event.EventProcessor;
import io.quarkus.funqy.lambda.model.FunqyMethod;
import io.quarkus.funqy.runtime.FunctionConstructor;
Expand All @@ -45,35 +46,28 @@ public class FunqyLambdaBindingRecorder {

private static FunctionInvoker invoker;
private static BeanContainer beanContainer;
private static LambdaInputReader reader;
private static LambdaOutputWriter writer;
private static EventProcessor eventProcessor;
private static FunqyAmazonBuildTimeConfig amazonBuildTimeConfig;

public void init(BeanContainer bc, FunqyAmazonBuildTimeConfig buildTimeConfig) {
beanContainer = bc;
FunctionConstructor.CONTAINER = bc;
// We create a copy, because we register a custom deserializer for everything.
ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper.copy();
EventDeserializer eventDeserializer = new EventDeserializer(buildTimeConfig);
final SimpleModule simpleModule = new AwsModule();
simpleModule.addDeserializer(Object.class, eventDeserializer);
objectMapper.registerModule(simpleModule);
amazonBuildTimeConfig = buildTimeConfig;
ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper;

for (FunctionInvoker invoker : FunctionRecorder.registry.invokers()) {
ObjectReader reader = null;
JavaType javaInputType = null;
if (invoker.hasInput()) {
javaInputType = objectMapper.constructType(invoker.getInputType());
reader = objectMapper.readerFor(javaInputType);
JavaType javaInputType = objectMapper.constructType(invoker.getInputType());
ObjectReader reader = objectMapper.readerFor(javaInputType);
invoker.getBindingContext().put(ObjectReader.class.getName(), reader);
}
ObjectWriter writer = null;
JavaType javaOutputType = null;
if (invoker.hasOutput()) {
javaOutputType = objectMapper.constructType(invoker.getOutputType());
writer = objectMapper.writerFor(javaOutputType);
JavaType javaOutputType = objectMapper.constructType(invoker.getOutputType());
ObjectWriter writer = objectMapper.writerFor(javaOutputType);
invoker.getBindingContext().put(ObjectWriter.class.getName(), writer);
}
invoker.getBindingContext().put(EventProcessor.class.getName(),
new EventProcessor(objectMapper, eventDeserializer,
new FunqyMethod(reader, writer, javaInputType, javaOutputType),
buildTimeConfig));
}
}

Expand All @@ -92,8 +86,34 @@ public void chooseInvoker(FunqyConfig config, FunqyAmazonConfig amazonConfig) {
} else {
invoker = FunctionRecorder.registry.invokers().iterator().next();
}
eventProcessor = (EventProcessor) invoker.getBindingContext().get(EventProcessor.class.getName());
eventProcessor.init(amazonConfig);

ObjectReader objectReader = null;
if (invoker.hasInput()) {
objectReader = (ObjectReader) invoker.getBindingContext().get(ObjectReader.class.getName());

if (amazonBuildTimeConfig.advancedEventHandling().enabled()) {
// We create a copy, because the mapper will be reconfigured for the advanced event handling,
// and we do not want to adjust the ObjectMapper, which is available in arc context.
ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper.copy();
reader = new AwsEventInputReader(objectMapper, objectReader, amazonBuildTimeConfig);
} else {
reader = new JacksonInputReader(objectReader);
}

}
if (invoker.hasOutput()) {
ObjectWriter objectWriter = (ObjectWriter) invoker.getBindingContext().get(ObjectWriter.class.getName());

if (!amazonBuildTimeConfig.advancedEventHandling().enabled()) {
writer = new JacksonOutputWriter(objectWriter);
}
}
if (amazonBuildTimeConfig.advancedEventHandling().enabled()) {
ObjectMapper objectMapper = AmazonLambdaMapperRecorder.objectMapper.copy();
writer = new AwsEventOutputWriter(objectMapper);

eventProcessor = new EventProcessor(objectReader, amazonBuildTimeConfig, amazonConfig);
}
}

/**
Expand All @@ -109,7 +129,16 @@ public void chooseInvoker(FunqyConfig config, FunqyAmazonConfig amazonConfig) {
* Is thrown in case the (de)serialization fails
*/
public static void handle(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
eventProcessor.handle(inputStream, outputStream, FunqyLambdaBindingRecorder::dispatch, context);
Object input = null;
if (invoker.hasInput()) {
input = reader.readValue(inputStream);
}
FunqyServerResponse response = dispatch(input, context);

Object value = response.getOutput().await().indefinitely();
if (value != null) {
writer.writeValue(outputStream, value);
}
}

@SuppressWarnings("rawtypes")
Expand All @@ -119,34 +148,43 @@ public void startPollLoop(ShutdownContext context, LaunchMode launchMode) {

@Override
protected Object processRequest(Object input, AmazonLambdaContext context) throws Exception {
throw new RuntimeException("Unreachable");
FunqyServerResponse response = dispatch(input, context);
return response.getOutput().await().indefinitely();
}

@Override
protected LambdaInputReader getInputReader() {
throw new RuntimeException("Unreachable");
return reader;
}

@Override
protected LambdaOutputWriter getOutputWriter() {
throw new RuntimeException("Unreachable");
return writer;
}

@Override
protected boolean isStream() {
return true;
return false;
}

@Override
protected void processRequest(InputStream input, OutputStream output, AmazonLambdaContext context)
throws Exception {
handle(input, output, context);
throw new RuntimeException("Unreachable!");
}
};
loop.startPollLoop(context);

}

private static FunqyServerResponse dispatch(Object input, Context context) throws IOException {
if (eventProcessor != null) {
return eventProcessor.handle(input, FunqyLambdaBindingRecorder::dispatch, context);
} else {
return dispatch(input);
}
}

private static FunqyServerResponse dispatch(Object input) {
ManagedContext requestContext = beanContainer.requestContext();
requestContext.activate();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package io.quarkus.funqy.lambda.event;

import java.io.IOException;
import java.io.InputStream;
import java.util.Date;
import java.util.List;

import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import io.cloudevents.SpecVersion;
import io.quarkus.amazon.lambda.runtime.LambdaInputReader;
import io.quarkus.funqy.lambda.config.FunqyAmazonBuildTimeConfig;
import io.quarkus.funqy.lambda.model.cloudevents.CloudEventV1;
import io.quarkus.funqy.lambda.model.kinesis.PipesKinesisEvent;

public class AwsEventInputReader implements LambdaInputReader<Object> {

private static final String SQS_EVENT_SOURCE = "aws:sqs";
private static final String SNS_EVENT_SOURCE = "aws:sns";
private static final String KINESIS_EVENT_SOURCE = "aws:kinesis";
private static final String DYNAMODB_EVENT_SOURCE = "aws:dynamodb";

final ObjectMapper mapper;
final FunqyAmazonBuildTimeConfig amazonBuildTimeConfig;
final ObjectReader reader;

public AwsEventInputReader(ObjectMapper mapper, ObjectReader reader,
FunqyAmazonBuildTimeConfig amazonBuildTimeConfig) {
// configure the mapper for advanced event handling
final SimpleModule simpleModule = new SimpleModule();
simpleModule.addDeserializer(Date.class, new DateDeserializer());
mapper.registerModule(simpleModule);
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);

this.mapper = mapper;
this.amazonBuildTimeConfig = amazonBuildTimeConfig;
this.reader = reader;
}

@Override
public Object readValue(InputStream is) throws IOException {

final JsonNode rootNode = mapper.readTree(is);

if (amazonBuildTimeConfig.advancedEventHandling().enabled()) {
if (rootNode.isObject() || rootNode.isArray()) {
if (rootNode.isObject()) {
// object
ObjectNode object = (ObjectNode) rootNode;

if (object.has("Records") && object.get("Records").isArray()) {
// We need to look into the first record entry, to distinguish the different types.
for (JsonNode record : object.get("Records")) {
return deserializeEvent(record, object);
}
}
} else {
// array. This happens in case of AWS EventBridge usage, and is also the only way to enable
// CloudEvents usage
ArrayNode array = (ArrayNode) rootNode;
for (JsonNode record : array) {
return deserializeEvent(record, array);
}
}
}
}
// We could not identify the event based on the content. Fallback to deserializing the funq method input type.
return deserializeFunqReturnType(rootNode);
}

public Object deserializeEvent(JsonNode record, JsonNode rootNode) throws IOException {
Object eventClass = getEventType(record, rootNode);

if (eventClass != null) {
if (eventClass instanceof Class<?> clazz) {
return mapper.convertValue(rootNode, clazz);
} else if (eventClass instanceof TypeReference<?> typeReference) {
return mapper.convertValue(rootNode, typeReference);
}
}
// We could not identify the event based on the content. Fallback to deserializing the funq method input type.
return deserializeFunqReturnType(rootNode);
}

private Object getEventType(JsonNode record, JsonNode object) {
String eventSource = getEventSource(record);

if (eventSource == null) {
eventSource = "default";
}

// See: https://docs.aws.amazon.com/lambda/latest/dg/lambda-services.html
// and for Pipes: https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-source.html
Class<?> eventType = null;
switch (eventSource) {
case SQS_EVENT_SOURCE:
if (object.isObject()) {
eventType = SQSEvent.class;
} else if (object.isArray()) {
// EventBridge Pipes
return new TypeReference<List<SQSEvent.SQSMessage>>() {
};
}
break;
case SNS_EVENT_SOURCE:
eventType = SNSEvent.class;
break;
case KINESIS_EVENT_SOURCE:
// Exclude Kinesis time window events. This would require to send a responsible with a state.
// This is very specific to AWS and maybe not the way funqy wants to handle things.
if (object.isObject() && !object.has("window")) {
eventType = KinesisEvent.class;
} else if (object.isArray()) {
// EventBridge Pipes
return new TypeReference<List<PipesKinesisEvent>>() {
};
}
break;
case DYNAMODB_EVENT_SOURCE:
if (object.isObject()) {
eventType = DynamodbEvent.class;
} else if (object.isArray()) {
// EventBridge Pipes
return new TypeReference<List<DynamodbEvent.DynamodbStreamRecord>>() {
};
}
break;
default:
break;
}
if (eventType == null && isCloudEvent(record)) {
return new TypeReference<List<CloudEventV1>>() {
};
}
return eventType;
}

private boolean isCloudEvent(final JsonNode record) {
// this is the best guess we can do. We check for required attributes:
// https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#required-attributes
// A more tolerant way to check the type. We do not want the process to fail. We can fall back to
// the best guess logic.
return record.has("specversion") && record.get("specversion").isTextual() &&
SpecVersion.V1.toString().equals(record.get("specversion").asText()) &&
record.has("type");
}

private String getEventSource(JsonNode record) {
if (record.has("eventSource") && record.get("eventSource").isTextual()) {
return record.get("eventSource").asText();
}

// Unsure. In the AWS SNS documentation the key starts with a capital letter. I assume this is a mistake,
// but it should not hurt to be that tolerant as well.
if (record.has("EventSource") && record.get("EventSource").isTextual()) {
return record.get("EventSource").asText();
}
return null;
}

private Object deserializeFunqReturnType(TreeNode node) throws IOException {
if (reader != null) {
return reader.readValue(node.traverse());
}
return null;
}
}
Loading

0 comments on commit 9ba8f44

Please sign in to comment.