-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.rs
152 lines (130 loc) · 5.26 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#[cfg(not(debug_assertions))]
use lambda_runtime::handler_fn;
mod config;
mod elastic;
mod handler;
mod html;
pub(crate) type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
#[tokio::main]
async fn main() -> Result<(), Error> {
// init the logger with the specified level
let tsub = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_ansi(false);
// time is not needed in CloudWatch, but is useful in console
#[cfg(not(debug_assertions))]
let tsub = tsub.without_time();
tsub.init();
#[cfg(debug_assertions)]
return proxy::run().await;
// call the actual handler of the request
#[cfg(not(debug_assertions))]
return lambda_runtime::run(handler_fn(handler::my_handler)).await;
}
/// This module is only used for local debugging via SQS and should
/// not be deployed to Lambda.
#[cfg(debug_assertions)]
mod proxy {
use lambda_runtime::Context;
use rusoto_core::region::Region;
use rusoto_sqs::{
DeleteMessageRequest, ReceiveMessageRequest, SendMessageRequest, Sqs, SqsClient,
};
use serde::Deserialize;
use serde_json::Value;
use tracing::info;
pub(crate) type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
const AWS_REGION: Region = Region::UsEast1; // replace with your preferred region
const REQUEST_QUEUE_URL_ENV: &str = "STM_HTML_LAMBDA_PROXY_REQ"; // add your queue URL there
const RESPONSE_QUEUE_URL_ENV: &str = "STM_HTML_LAMBDA_PROXY_RESP"; // add your queue URL there
#[derive(Deserialize, Debug)]
struct RequestPayload {
pub event: Value,
pub ctx: Context,
}
pub(crate) async fn run() -> Result<(), Error> {
#[cfg(debug_assertions)]
loop {
// get event and context details from the queue
let (payload, receipt_handle) = get_input().await?;
info!("New msg");
// invoke the handler
let response = crate::handler::my_handler(payload.event, payload.ctx).await?;
// send back the response and delete the message from the queue
send_output(response, receipt_handle).await?;
info!("Msg sent");
}
}
/// Read a message from the queue and return the payload as Lambda structures
async fn get_input() -> Result<(RequestPayload, String), Error> {
let client = SqsClient::new(AWS_REGION);
// start listening to the response
loop {
let resp = client
.receive_message(ReceiveMessageRequest {
max_number_of_messages: Some(1),
queue_url: std::env::var(REQUEST_QUEUE_URL_ENV)
.expect(&format!(
"Missing {} env var with the SQS request queue URL",
REQUEST_QUEUE_URL_ENV
))
.trim()
.to_string(),
wait_time_seconds: Some(20),
..Default::default()
})
.await?;
// wait until a message arrives or the function is killed by AWS
if resp.messages.is_none() {
continue;
}
// an empty list returns when the queue wait time expires
let msgs = resp.messages.expect("Failed to get list of messages");
if msgs.len() == 0 {
continue;
}
// the message receipt is needed to delete the message from the queue later
let receipt_handle = msgs[0]
.receipt_handle
.as_ref()
.expect("Failed to get msg receipt")
.to_owned();
// convert JSON encoded body into event + ctx structures as defined by Lambda Runtime
let body = msgs[0].body.as_ref().expect("Failed to get message body");
let payload: RequestPayload =
serde_json::from_str(body).expect("Failed to deserialize msg body");
return Ok((payload, receipt_handle));
}
}
/// Send back the response and delete the message from the queue.
async fn send_output(response: Value, receipt_handle: String) -> Result<(), Error> {
let client = SqsClient::new(AWS_REGION);
client
.send_message(SendMessageRequest {
message_body: response.to_string(),
queue_url: std::env::var(RESPONSE_QUEUE_URL_ENV)
.expect(&format!(
"Missing {} env var with the SQS response queue URL",
RESPONSE_QUEUE_URL_ENV
))
.trim()
.to_string(),
..Default::default()
})
.await?;
// delete the request msg from the queue so it cannot be replayed again
client
.delete_message(DeleteMessageRequest {
queue_url: std::env::var(REQUEST_QUEUE_URL_ENV)
.expect(&format!(
"Missing {} env var with the SQS request queue URL",
REQUEST_QUEUE_URL_ENV
))
.trim()
.to_string(),
receipt_handle,
})
.await?;
Ok(())
}
}