Skip to content

Commit

Permalink
Merge pull request #5 from onema/feature-attachments
Browse files Browse the repository at this point in the history
Large attachments
  • Loading branch information
onema authored Jan 26, 2019
2 parents 6512da0 + 391ad9d commit a47bd6f
Show file tree
Hide file tree
Showing 12 changed files with 283 additions and 111 deletions.
23 changes: 12 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ LambdaMailer
LambdaMailer is a simple Serverless mailing system for AWS SES build on [µServerless](https://github.com/onema/uServerless) and [Scala](https://www.scala-lang.org/)
and deployed using the [serverless framework](https://serverless.com).

The project defines three lambda functions (mailer, bouce, and forwarder) and a dynamodb table to keep track of bounces and complaints.
The project defines three lambda functions (mailer, bounce, and forwarder) and a dynamodb table to keep track of bounces and complaints.

![Serverless Lambda Mailer](docs/img/ServerlessLambdaMailer.png)

Expand All @@ -21,18 +21,20 @@ a JSON object with the following format:
"subject": "test01",
"body": "<h1>Test Body</h1>",
"replyTo": "no-reply@mailer.mydomain.com",
"raw": false
"raw": false,
"attachments": ["s3/path/in/the/attachments/bucket.png"]
}
```

| Field | type | description |
|-----------|--------------|---------------------------------------------------------------------------------------------|
| `to` | List\[String\] | A list of valid email addresses |
| `from` | String | Email address that will be use to send the email, this must be an approved email or domain. |
| `subject` | String | The subject of the email |
| `body` | String | HTML or Plain Text body of the email |
| `replyTo` | String | The reply-to email address, can be different from the `from` email |
| `raw` | Boolean | Specifies weather the body of the email is a raw email message (true) or text/html (false) |
| Field | type | description |
|-------------- |--------------|---------------------------------------------------------------------------------------------|
| `to` | List\[String\] | A list of valid email addresses |
| `from` | String | Email address that will be use to send the email, this must be an approved email or domain. |
| `subject` | String | The subject of the email |
| `body` | String | HTML or Plain Text body of the email |
| `replyTo` | String | The reply-to email address, can be different from the `from` email |
| `raw` | Boolean | Specifies weather the body of the email is a raw email message (true) or text/html (false) |
| `attachments` | List\[String\] | A list containing keys in the attachments bucket |


The mailer will check each email address against the bounce dynamodb table to prevent sending emails to
Expand Down Expand Up @@ -67,7 +69,6 @@ For more information see the installation instructions below.
## Installation
You must build the project before it is deployed using SBT:
```bash
sbt compile
sbt assembly
```

Expand Down
7 changes: 4 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@ lazy val root = (project in file("."))

name := "lambda-mailer",

version := "0.6.0",
version := "0.7.0",

scalaVersion := "2.12.7",

libraryDependencies ++= {
val awsSdkVersion = "1.11.451"
Seq(
// dependencies
"io.onema" % "userverless-core_2.12" % "0.1.0",
"org.apache.commons" % "commons-email" % "1.5",
"io.onema" % "userverless-core_2.12" % "0.2.2",
"io.onema" % "vff_2.12" % "0.5.2",
"org.apache.commons" % "commons-email" % "1.5",

// AWS Clients
"com.amazonaws" % "aws-java-sdk-ses" % awsSdkVersion,
Expand Down
Binary file modified docs/img/ServerlessLambdaMailer.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
15 changes: 9 additions & 6 deletions infrastructure/global-values.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ iamRoleStatements:
- sns:Publish
Resource:
- "arn:aws:sns:*:*:${self:custom.stageName}-mailer-bounce"
- "arn:aws:sns:*:*:${self:custom.stageName}-bounce-topic"
- "arn:aws:sns:*:*:${self:custom.stageName}-mailer"
- Effect: Allow
Action:
Expand Down Expand Up @@ -33,18 +32,22 @@ iamRoleStatements:
Resource:
- Fn::Join: ["/", [Fn::GetAtt: [ForwarderS3Bucket, Arn], "*"]]
- Fn::GetAtt: [ForwarderS3Bucket, Arn]
- Effect: Allow
Action:
- s3:PutObject
- s3:GetObject
Resource:
- Fn::Join: ["/", [Fn::GetAtt: [MailerAttachments, Arn], "*"]]
- Fn::GetAtt: [MailerAttachments, Arn]

environment:
STAGE_NAME: ${self:custom.stageName}
LOG_LEVEL: DEBUG
APP_NAME: ${self:service}
SNS_MAILER_TOPIC:
Fn::Join: [':', ["arn:aws:sns", Ref: "AWS::Region", Ref: "AWS::AccountId", "${self:custom.stageName}-mailer"]]
ATTACHMENT_BUCKET:
Ref: MailerAttachments

# The error topic is part of the bootstrap infrastructure for uServerless

custom:
stageName: ${opt:stage, self:provider.stage}

deadLetterQueue:
Fn::ImportValue: "${self:custom.stageName}-dead-letter-queue-arn"
55 changes: 42 additions & 13 deletions infrastructure/mailer-resources_cfn.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Resources:
LambdaMailerTable:
Type: AWS::DynamoDB::Table
Properties:
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName:
Ref: PartitionKey
Expand Down Expand Up @@ -56,20 +57,28 @@ Resources:
KeyType: HASH
Projection:
ProjectionType: ALL
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1

ProvisionedThroughput:
ReadCapacityUnits:
Ref: ReadCapacityUnits
WriteCapacityUnits:
Ref: WriteCapacityUnits
# Change the billing mode to PROVISIONED and uncomment if you want to define the read/write capacity units
# ProvisionedThroughput:
# ReadCapacityUnits: 1
# WriteCapacityUnits: 1
# ProvisionedThroughput:
# ReadCapacityUnits:
# Ref: ReadCapacityUnits
# WriteCapacityUnits:
# Ref: WriteCapacityUnits

TimeToLiveSpecification:
AttributeName: ExpirationTime
Enabled: true

MailerAttachments:
Type: AWS::S3::Bucket
Properties:
LifecycleConfiguration:
Rules:
- ExpirationInDays: 30
Id: delete-after-30-days
Status: Enabled
ForwarderS3Bucket:
Type: AWS::S3::Bucket
Properties:
Expand Down Expand Up @@ -97,19 +106,29 @@ Resources:
StringEquals:
aws:Referer:
Ref: AWS::AccountId
CreateTestDBPolicy:
Type: AWS::IAM::ManagedPolicy
Properties:
Description: Policy for third arties to add files to the mailer attachments bucket
Path: "/"
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Action: "s3:PutObject"
Resource:
Fn::GetAtt: [MailerAttachments, Arn]


Outputs:
LambdaMailerTopicArn:
Value:
Ref: SNSTopicMailer
Value: "arn:aws:sns:*:*:${self:custom.stageName}-mailer"
Export:
Name:
Fn::Join: ["-", [Ref: StageName, "lambda-mailer-topic-arn"]]

LambdaMailerBounceTopicArn:
Value:
Ref: SNSTopicMailerbounce
Value: "arn:aws:sns:*:*:${self:custom.stageName}-mailer-bounce"
Export:
Name:
Fn::Join: ["-", [Ref: StageName, "lambda-mailer-bounce-topic-arn"]]
Expand All @@ -118,6 +137,16 @@ Outputs:
Value:
Ref: LambdaMailerTable

MailerAttachmentsS3BucketArn:
Value:
Fn::GetAtt: [MailerAttachments, Arn]
Description: ARN of S3 bucket for storing mailer attachments

MailerAttachmentsS3BucketName:
Value:
Ref: MailerAttachments
Description: Name of S3 bucket for storing mailer attachments

ForwarderS3BucketArn:
Value:
Fn::GetAtt: [ForwarderS3Bucket, Arn]
Expand Down
9 changes: 4 additions & 5 deletions serverless.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,26 @@ functions:
# functions
mailer-bounce:
handler: io.onema.bounce.BounceFunction::lambdaHandler
onError: ${file(infrastructure/global-values.yml):deadLetterQueue}
environment:
TABLE_NAME:
Ref: LambdaMailerTable
events:
- sns: mailer-bounce
- sns: ${self:custom.stageName}-mailer-bounce

mailer:
handler: io.onema.mailer.MailerFunction::lambdaHandler
onError: ${file(infrastructure/global-values.yml):deadLetterQueue}
environment:
LOG_EMAIL: ${env:LOG_EMAIL, 'false'}
TABLE_NAME:
Ref: LambdaMailerTable
events:
- sns: mailer
- sns: ${self:custom.stageName}-mailer

forwarder:
handler: io.onema.forwarder.ForwarderFunction::lambdaHandler
onError: ${file(infrastructure/global-values.yml):deadLetterQueue}
environment:
SNS_MAILER_TOPIC:
Fn::Join: [":", ["arn:aws:sns", Ref: "AWS::Region", Ref: "AWS::AccountId", "${self:custom.stageName}-mailer"]]
EMAIL_MAPPING: ${env:EMAIL_MAPPING}
FORWARDER_S3_BUCKET:
Ref: ForwarderS3Bucket
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/io/onema/forwarder/ForwarderFunction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class ForwarderFunction extends LambdaHandler[SesEvent, Unit] with EnvLambdaConf
mailerTopic = getValue("sns/mailer/topic").get,
s3Client = AmazonS3ClientBuilder.defaultClient(),
bucketName = getValue("forwarder/s3/bucket").get,
attachmentBucket = getValue("attachment/bucket").get,
shouldLog
)

Expand Down
94 changes: 52 additions & 42 deletions src/main/scala/io/onema/forwarder/ForwarderLogic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,30 @@

package io.onema.forwarder

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util
import java.io.ByteArrayInputStream
import java.util.Properties

import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.ObjectMetadata
import com.amazonaws.services.sns.AmazonSNS
import com.sun.mail.smtp.SMTPMessage
import com.typesafe.scalalogging.Logger
import io.onema.forwarder.ForwarderLogic.SesMessage
import io.onema.json.Extensions._
import io.onema.mailer.MailerLogic.Email
import io.onema.userverless.monitoring.LogMetrics._
import javax.mail.internet.InternetAddress
import javax.mail.{Address, Message, Session}
import io.onema.vff.FileSystem
import io.onema.vff.adapter.AwsS3Adapter
import io.onema.vff.extensions.StreamExtensions._
import javax.activation.DataSource
import javax.mail.Session
import net.logstash.logback.argument.StructuredArguments._
import org.apache.commons.mail.util.MimeMessageParser
import org.json4s.FieldSerializer._
import org.json4s.jackson.Serialization
import org.json4s.{FieldSerializer, Formats, NoTypeHints}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.io.Source

object ForwarderLogic {
Expand Down Expand Up @@ -77,16 +79,17 @@ object ForwarderLogic {
}
}

class ForwarderLogic(val snsClient: AmazonSNS, val mailerTopic: String, val s3Client: AmazonS3, val bucketName: String, logEmail: Boolean = false) {
class ForwarderLogic(val snsClient: AmazonSNS, val mailerTopic: String, val s3Client: AmazonS3, val bucketName: String, val attachmentBucket: String, val logEmail: Boolean = false) {

//--- Fields ---
protected val log = Logger("forwarder-logic")
private val headersToRemove = ArrayBuffer("DKIM-Signature", "X-SES-DKIM-SIGNATURE").toArray
private val s3 = new FileSystem(new AwsS3Adapter(s3Client, attachmentBucket))

//--- Methods ---
def handleRequest(message: SesMessage, emailMapping: String): Unit = {

// The email mapping will get us all the addresses associated with the forwarder address
val messageId = message.mail.messageId
val allowedForwardingEmailMapping = parseEmailMapping(emailMapping)
val resultingForwardingEmails = getForwardingEmailAddresses(message.receipt.recipients, allowedForwardingEmailMapping)
log.debug(s"addresses found: $resultingForwardingEmails")
Expand All @@ -98,13 +101,14 @@ class ForwarderLogic(val snsClient: AmazonSNS, val mailerTopic: String, val s3Cl
// iterate over each mapped address and forward the email
resultingForwardingEmails.foreach{case (from, toEmails) =>
toEmails.foreach(to => {
val contentMessage = smtpMessage(originalContent, message, to, from)
val content = rawMessage(contentMessage)
val (parser, _) = parseMessage(originalContent)
val htmlContent = content(parser)
val att = attachments(parser, messageId)
val subject = message.mail.commonHeaders.subject
val originalSender = contentMessage.getReplyTo()(0).toString
val originalSender: String = message.mail.commonHeaders.from.mkString(",")

// Send message to mailer
val emailMessage = Email(Seq(to), from, subject, content, raw = true).asJson
val emailMessage = Email(Seq(to), from, subject, htmlContent, replyTo = Some(originalSender), attachments = att).asJson
log.debug(s"MESSAGE: $emailMessage", keyValue("SUBJECT", subject))
snsClient.publish(mailerTopic, emailMessage)
if(logEmail) count("ForwardingEmail", ("from email", originalSender))
Expand All @@ -120,41 +124,47 @@ class ForwarderLogic(val snsClient: AmazonSNS, val mailerTopic: String, val s3Cl
(parser.parse(), smtpMessage)
}

private def smtpMessage(content: String, message: SesMessage, to: String, from: String): SMTPMessage = {

// parse the content and get the SMTP Message
val (parser, smtpMessage) = parseMessage(content)

// Set reply-to address
val replyTo: Array[Address] = message.mail.commonHeaders.from.map(new InternetAddress(_)).toArray
smtpMessage.setReplyTo(replyTo)

// Set to address
val toAddresses: Array[Address] = Seq(new InternetAddress(to)).toArray
smtpMessage.setRecipients(Message.RecipientType.TO, toAddresses)

// Remove unwanted headers
headersToRemove.foreach(smtpMessage.removeHeader)
private def content(mimeMessageParser: MimeMessageParser): String = {
if(mimeMessageParser.hasHtmlContent) {
mimeMessageParser.getHtmlContent
} else {
mimeMessageParser.getPlainContent
}
}

// Set from address
smtpMessage.setFrom(from)
smtpMessage.setEnvelopeFrom(from)
smtpMessage.setHeader("Return-Path", from)
smtpMessage.setSender(new InternetAddress(from))
smtpMessage
private def attachments(mimeMessageParser: MimeMessageParser, messageId: String): Option[Seq[String]] = {
if(mimeMessageParser.hasAttachments) {
val attachmentMap = mimeMessageParser
.getContentIds.asScala
.map(x => (x, mimeMessageParser.findAttachmentByCid(x)))
.filter(_._2 != null)
.toMap
val files = if(attachmentMap.size == mimeMessageParser.getAttachmentList.size()) {
attachmentMap.map { case(cid, file) => uploadAttachment(file,cid, messageId)}
} else {
mimeMessageParser.getAttachmentList.asScala.map(uploadAttachment(_, messageId))
}
Some(files.toSeq)
} else {
None
}
}

private def rawMessage(smtpMessage: SMTPMessage): String = {
private def uploadAttachment(x: DataSource, messageId: String): String = {
val fileBytes = x.getInputStream.toBytes
val destinationName = s"/$messageId/${x.getName}"
log.debug(s"Uploading attachment to $destinationName")
s3.write(destinationName, fileBytes)
destinationName
}

// Recreate raw email
val os = new ByteArrayOutputStream()
smtpMessage.writeTo(os)
log.debug("RAW EMAIL info", keyValue("FROM", smtpMessage.getFrom), keyValue("REPLY-TO", s"${smtpMessage.getReplyTo}"))
val newMessage = os.toString()
if(newMessage.getBytes.length > 131072) {
log.warn(s"${newMessage.getBytes.length} byte payload is too large for the SNS Event invocation (limit 131072 bytes)")
}
newMessage
private def uploadAttachment(x: DataSource, contentId: String, messageId: String): String = {
val destinationName = s"$messageId/${x.getName}"
log.debug(s"Uploading attachment to $destinationName")
val metadata = new ObjectMetadata()
metadata.addUserMetadata("ContentId", contentId)
s3Client.putObject(attachmentBucket, destinationName, x.getInputStream, metadata)
destinationName
}

private def parseEmailMapping(mapping: String): Map[String, Seq[String]] = {
Expand Down
Loading

0 comments on commit a47bd6f

Please sign in to comment.