From 15deda7aff6f19948271373ee5cd1269f8be2982 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=ADs=20Duarte?= Date: Fri, 23 Aug 2024 10:03:15 +0100 Subject: [PATCH] feat: add email worker support --- Cargo.lock | 8 ++ examples/email/Cargo.toml | 18 +++ examples/email/src/lib.rs | 22 ++++ examples/email/wrangler.toml | 6 + worker-build/src/js/shim.js | 5 + worker-macros/src/event.rs | 41 +++++++ worker-sys/src/types.rs | 2 + worker-sys/src/types/email.rs | 64 +++++++++++ worker/src/email.rs | 207 ++++++++++++++++++++++++++++++++++ worker/src/env.rs | 6 + worker/src/lib.rs | 2 + 11 files changed, 381 insertions(+) create mode 100644 examples/email/Cargo.toml create mode 100644 examples/email/src/lib.rs create mode 100644 examples/email/wrangler.toml create mode 100644 worker-sys/src/types/email.rs create mode 100644 worker/src/email.rs diff --git a/Cargo.lock b/Cargo.lock index 4c8462f9..f027cc62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -703,6 +703,14 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "email" +version = "0.1.0" +dependencies = [ + "console_error_panic_hook", + "worker", +] + [[package]] name = "encode_unicode" version = "0.3.6" diff --git a/examples/email/Cargo.toml b/examples/email/Cargo.toml new file mode 100644 index 00000000..ff13ac1b --- /dev/null +++ b/examples/email/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "email" +version = "0.1.0" +edition = "2021" + +[package.metadata.release] +release = false + +# https://github.com/rustwasm/wasm-pack/issues/1247 +[package.metadata.wasm-pack.profile.release] +wasm-opt = false + +[lib] +crate-type = ["cdylib"] + +[dependencies] +worker = { workspace=true } +console_error_panic_hook = { version = "0.1.1" } diff --git a/examples/email/src/lib.rs b/examples/email/src/lib.rs new file mode 100644 index 00000000..7f6bd5bd --- /dev/null +++ b/examples/email/src/lib.rs @@ -0,0 +1,22 @@ +use core::str; + +use worker::*; + +#[event(email)] +async fn email(message: ForwardableEmailMessage, _env: Env, _ctx: Context) -> Result<()> { + console_error_panic_hook::set_once(); + + let allow_list = vec!["another@example.com", "coworker@example.com"]; + let from = message.from_envelope(); + + let raw: Vec = message.raw_bytes().await?; + let raw = str::from_utf8(&raw)?; + console_log!("Raw email: {}", raw); + + if allow_list.contains(&from.as_str()) { + message.forward("mailbox@anotherexample.com", None).await?; + } else { + message.set_reject("Address not allowed")?; + } + Ok(()) +} diff --git a/examples/email/wrangler.toml b/examples/email/wrangler.toml new file mode 100644 index 00000000..8bbe1d6b --- /dev/null +++ b/examples/email/wrangler.toml @@ -0,0 +1,6 @@ +name = "email-worker" +main = "build/worker/shim.mjs" +compatibility_date = "2024-08-13" + +[build] +command = "cargo install -q worker-build && worker-build --release" \ No newline at end of file diff --git a/worker-build/src/js/shim.js b/worker-build/src/js/shim.js index 19d55198..6efdbdd5 100644 --- a/worker-build/src/js/shim.js +++ b/worker-build/src/js/shim.js @@ -20,6 +20,10 @@ class Entrypoint extends WorkerEntrypoint { async scheduled(event) { return await imports.scheduled(event, this.env, this.ctx) } + + async email(message) { + return await imports.email(message, this.env, this.ctx) + } } const EXCLUDE_EXPORT = [ @@ -33,6 +37,7 @@ const EXCLUDE_EXPORT = [ "fetch", "queue", "scheduled", + "email", "getMemory" ]; diff --git a/worker-macros/src/event.rs b/worker-macros/src/event.rs index 07d7f22c..656ee655 100644 --- a/worker-macros/src/event.rs +++ b/worker-macros/src/event.rs @@ -12,6 +12,7 @@ pub fn expand_macro(attr: TokenStream, item: TokenStream) -> TokenStream { Start, #[cfg(feature = "queue")] Queue, + Email, } use HandlerType::*; @@ -28,6 +29,7 @@ pub fn expand_macro(attr: TokenStream, item: TokenStream) -> TokenStream { "respond_with_errors" => { respond_with_errors = true; } + "email" => handler_type = Some(Email), _ => panic!("Invalid attribute: {}", attr), } } @@ -183,6 +185,45 @@ pub fn expand_macro(attr: TokenStream, item: TokenStream) -> TokenStream { TokenStream::from(output) } + Email => { + let input_fn_ident = Ident::new( + &(input_fn.sig.ident.to_string() + "_email_glue"), + input_fn.sig.ident.span(), + ); + let wrapper_fn_ident = Ident::new("email", input_fn.sig.ident.span()); + // rename the original attributed fn + input_fn.sig.ident = input_fn_ident.clone(); + + let wrapper_fn = quote! { + pub async fn #wrapper_fn_ident(message: ::worker::worker_sys::ForwardableEmailMessage, env: ::worker::Env, ctx: ::worker::worker_sys::Context) { + // call the original fn + let ctx = worker::Context::new(ctx); + match #input_fn_ident(::worker::ForwardableEmailMessage::from(message), env, ctx).await { + Ok(()) => {}, + Err(e) => { + ::worker::console_log!("{}", &e); + panic!("{}", e); + } + } + } + }; + + let wasm_bindgen_code = + wasm_bindgen_macro_support::expand(TokenStream::new().into(), wrapper_fn) + .expect("wasm_bindgen macro failed to expand"); + + let output = quote! { + #input_fn + + mod _worker_email { + use ::worker::{wasm_bindgen, wasm_bindgen_futures}; + use super::#input_fn_ident; + #wasm_bindgen_code + } + }; + + TokenStream::from(output) + } Start => { // save original fn name for re-use in the wrapper fn let input_fn_ident = Ident::new( diff --git a/worker-sys/src/types.rs b/worker-sys/src/types.rs index 76235254..af3818d3 100644 --- a/worker-sys/src/types.rs +++ b/worker-sys/src/types.rs @@ -4,6 +4,7 @@ mod crypto; mod d1; mod durable_object; mod dynamic_dispatcher; +mod email; mod fetcher; mod fixed_length_stream; mod hyperdrive; @@ -23,6 +24,7 @@ pub use crypto::*; pub use d1::*; pub use durable_object::*; pub use dynamic_dispatcher::*; +pub use email::*; pub use fetcher::*; pub use fixed_length_stream::*; pub use hyperdrive::*; diff --git a/worker-sys/src/types/email.rs b/worker-sys/src/types/email.rs new file mode 100644 index 00000000..b73dbbf7 --- /dev/null +++ b/worker-sys/src/types/email.rs @@ -0,0 +1,64 @@ +use js_sys::Promise; +use wasm_bindgen::prelude::*; +use web_sys::{Headers, ReadableStream}; + +#[wasm_bindgen] +extern "C" { + #[wasm_bindgen(extends=js_sys::Object)] + #[derive(Clone, PartialEq, Eq)] + pub type EmailMessage; + + // TODO(lduarte): see if also accepting string is needed + #[wasm_bindgen(constructor, catch)] + pub fn new(from: &str, to: &str, raw: &str) -> Result; + + #[wasm_bindgen(method, getter)] + pub fn from(this: &EmailMessage) -> String; + + #[wasm_bindgen(method, getter)] + pub fn to(this: &EmailMessage) -> String; +} + +#[wasm_bindgen] +extern "C" { + #[wasm_bindgen(extends=js_sys::Object)] + #[derive(Clone, PartialEq, Eq)] + pub type ForwardableEmailMessage; + + #[wasm_bindgen(method, getter)] + pub fn from(this: &ForwardableEmailMessage) -> String; + + #[wasm_bindgen(method, getter)] + pub fn to(this: &ForwardableEmailMessage) -> String; + + #[wasm_bindgen(method, getter)] + pub fn raw(this: &ForwardableEmailMessage) -> ReadableStream; + + // File size will never pass over 4GB so u32 is enough + #[wasm_bindgen(method, getter, js_name=rawSize)] + pub fn raw_size(this: &ForwardableEmailMessage) -> u32; + + #[wasm_bindgen(method, catch, js_name=setReject)] + pub fn set_reject(this: &ForwardableEmailMessage, reason: &str) -> Result<(), JsValue>; + + #[wasm_bindgen(method, catch)] + pub fn forward( + this: &ForwardableEmailMessage, + rcpt_to: &str, + headers: Headers, + ) -> Result; + + #[wasm_bindgen(method, catch)] + pub fn reply(this: &ForwardableEmailMessage, email: EmailMessage) -> Result; +} + +#[wasm_bindgen] +extern "C" { + #[wasm_bindgen(extends=js_sys::Object)] + #[derive(Clone, PartialEq, Eq)] + pub type SendEmail; + + #[wasm_bindgen(method, catch)] + pub fn send(this: &SendEmail, email: EmailMessage) -> Result; + +} diff --git a/worker/src/email.rs b/worker/src/email.rs new file mode 100644 index 00000000..c929086c --- /dev/null +++ b/worker/src/email.rs @@ -0,0 +1,207 @@ +use futures_util::TryStreamExt; +use wasm_bindgen::JsCast; +use wasm_bindgen::JsValue; +use wasm_bindgen_futures::JsFuture; +use worker_sys::EmailMessage as EmailMessageExt; +use worker_sys::ForwardableEmailMessage as ForwardableEmailMessageExt; +use worker_sys::SendEmail as SendEmailExt; + +use crate::ByteStream; +use crate::EnvBinding; +use crate::Error; +use crate::Headers; + +pub struct EmailMessage(EmailMessageExt); + +unsafe impl Send for EmailMessage {} +unsafe impl Sync for EmailMessage {} + +impl JsCast for EmailMessage { + fn instanceof(val: &JsValue) -> bool { + val.is_instance_of::() + } + + fn unchecked_from_js(val: JsValue) -> Self { + EmailMessage(val.unchecked_into()) + } + + fn unchecked_from_js_ref(val: &JsValue) -> &Self { + unsafe { &*(val as *const JsValue as *const Self) } + } +} + +impl AsRef for EmailMessage { + fn as_ref(&self) -> &JsValue { + &self.0 + } +} + +impl From for EmailMessage { + fn from(val: JsValue) -> Self { + EmailMessage(val.unchecked_into()) + } +} + +impl From for JsValue { + fn from(sec: EmailMessage) -> Self { + sec.0.into() + } +} + +impl EmailMessage { + pub fn new(from: &str, to: &str, raw: &str) -> Result { + Ok(EmailMessage(EmailMessageExt::new(&from, &to, &raw)?)) + } + + // method from is renamed compared to the JS version, because `from` conflicts with the From trait + // to was also renamed for consistency + pub fn from_envelope(&self) -> String { + self.0.from() + } + + pub fn to_envelope(&self) -> String { + self.0.to() + } +} + +pub struct ForwardableEmailMessage(ForwardableEmailMessageExt); + +unsafe impl Send for ForwardableEmailMessage {} +unsafe impl Sync for ForwardableEmailMessage {} + +impl JsCast for ForwardableEmailMessage { + fn instanceof(val: &JsValue) -> bool { + val.is_instance_of::() + } + + fn unchecked_from_js(val: JsValue) -> Self { + ForwardableEmailMessage(val.unchecked_into()) + } + + fn unchecked_from_js_ref(val: &JsValue) -> &Self { + unsafe { &*(val as *const JsValue as *const Self) } + } +} + +impl AsRef for ForwardableEmailMessage { + fn as_ref(&self) -> &JsValue { + &self.0 + } +} + +impl From for ForwardableEmailMessage { + fn from(val: JsValue) -> Self { + ForwardableEmailMessage(val.unchecked_into()) + } +} + +impl From for JsValue { + fn from(sec: ForwardableEmailMessage) -> Self { + sec.0.into() + } +} + +impl From for ForwardableEmailMessage { + fn from(value: ForwardableEmailMessageExt) -> Self { + ForwardableEmailMessage(value) + } +} + +impl ForwardableEmailMessage { + // method from is renamed compared to the JS version, because `from` conflicts with the From trait + // to was also renamed for consistency + pub fn from_envelope(&self) -> String { + self.0.from() + } + + pub fn to_envelope(&self) -> String { + self.0.to() + } + + pub fn raw(&self) -> Result { + Ok(ByteStream { + inner: wasm_streams::ReadableStream::from_raw(self.0.raw().dyn_into()?).into_stream(), + }) + } + + pub async fn raw_bytes(&self) -> Result, Error> { + self.raw()? + .try_fold(Vec::new(), |mut bytes, mut chunk| async move { + bytes.append(&mut chunk); + Ok(bytes) + }) + .await + } + + pub fn raw_size(&self) -> u32 { + self.0.raw_size() + } + + pub fn set_reject(&self, reason: &str) -> Result<(), JsValue> { + self.0.set_reject(reason) + } + + pub async fn forward(&self, rcpt_to: &str, headers: Option) -> Result<(), JsValue> { + JsFuture::from(self.0.forward(rcpt_to, headers.unwrap_or_default().0)?).await?; + Ok(()) + } + + pub async fn reply(&self, message: EmailMessage) -> Result<(), JsValue> { + JsFuture::from(self.0.reply(message.0)?).await?; + Ok(()) + } +} + +pub struct SendEmail(SendEmailExt); + +unsafe impl Send for SendEmail {} +unsafe impl Sync for SendEmail {} + +impl JsCast for SendEmail { + fn instanceof(val: &JsValue) -> bool { + val.is_instance_of::() + } + + fn unchecked_from_js(val: JsValue) -> Self { + SendEmail(val.unchecked_into()) + } + + fn unchecked_from_js_ref(val: &JsValue) -> &Self { + unsafe { &*(val as *const JsValue as *const Self) } + } +} + +impl AsRef for SendEmail { + fn as_ref(&self) -> &JsValue { + &self.0 + } +} + +impl From for SendEmail { + fn from(val: JsValue) -> Self { + SendEmail(val.unchecked_into()) + } +} + +impl From for JsValue { + fn from(sec: SendEmail) -> Self { + sec.0.into() + } +} + +impl From for SendEmail { + fn from(value: SendEmailExt) -> Self { + SendEmail(value) + } +} + +impl EnvBinding for SendEmail { + const TYPE_NAME: &'static str = "SendEmail"; +} + +impl SendEmail { + pub async fn send(&self, email: EmailMessage) -> Result<(), JsValue> { + JsFuture::from(self.0.send(email.0)?).await?; + Ok(()) + } +} diff --git a/worker/src/env.rs b/worker/src/env.rs index fe79d03a..e2a271ca 100644 --- a/worker/src/env.rs +++ b/worker/src/env.rs @@ -4,6 +4,7 @@ use std::fmt::Display; use crate::d1::D1Database; #[cfg(feature = "queue")] use crate::Queue; +use crate::SendEmail; use crate::{durable::ObjectNamespace, Bucket, DynamicDispatcher, Fetcher, Result}; use crate::{error::Error, hyperdrive::Hyperdrive}; @@ -89,6 +90,11 @@ impl Env { pub fn hyperdrive(&self, binding: &str) -> Result { self.get_binding(binding) } + + /// Access send_email binding for sending emails to verified destinations configured in your wrangler.toml file. + pub fn send_email(&self, binding: &str) -> Result { + self.get_binding(binding) + } } pub trait EnvBinding: Sized + JsCast { diff --git a/worker/src/lib.rs b/worker/src/lib.rs index bbaac220..21cb0680 100644 --- a/worker/src/lib.rs +++ b/worker/src/lib.rs @@ -172,6 +172,7 @@ pub use crate::date::{Date, DateInit}; pub use crate::delay::Delay; pub use crate::durable::*; pub use crate::dynamic_dispatch::*; +pub use crate::email::*; pub use crate::env::{Env, EnvBinding, Secret, Var}; pub use crate::error::Error; pub use crate::fetcher::Fetcher; @@ -207,6 +208,7 @@ mod date; mod delay; pub mod durable; mod dynamic_dispatch; +mod email; mod env; mod error; mod fetcher;