Improvements to error handling
rdettai committed Jul 12, 2024
1 parent c963ad4 commit f002a6f
Showing 5 changed files with 132 additions and 47 deletions.
2 changes: 0 additions & 2 deletions docker-compose.yml
@@ -39,8 +39,6 @@ services:
environment:
SERVICES: kinesis,s3,sqs
PERSISTENCE: 1
# avoid using the localstack.cloud domain in the generated queue URLs
SQS_ENDPOINT_STRATEGY: path
volumes:
- .localstack:/etc/localstack/init/ready.d
- localstack_data:/var/lib/localstack
14 changes: 14 additions & 0 deletions quickwit/quickwit-config/src/source_config/mod.rs
@@ -934,6 +934,20 @@ mod tests {
"Only one notification can be specified for now"
);
}
{
let json = r#"
{
"notifications": [
{
"queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/queue",
"message_type": "s3_notification"
}
]
}
"#;
let error = serde_json::from_str::<FileSourceParams>(json).unwrap_err();
assert!(error.to_string().contains("missing field `type`"));
}
}

#[test]
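The new negative test above checks that each notification entry must carry a `type` discriminator. For contrast, a minimal sketch of the positive case — assuming the SQS variant is tagged `"type": "sqs"` (consistent with the `FileSourceSqs` type imported in coordinator.rs below) and reusing the same test-module context where `FileSourceParams` is in scope:

    {
        let json = r#"
        {
            "notifications": [
                {
                    "type": "sqs",
                    "queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/queue",
                    "message_type": "s3_notification"
                }
            ]
        }
        "#;
        // With the tag present, deserialization is expected to succeed.
        serde_json::from_str::<FileSourceParams>(json).unwrap();
    }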
43 changes: 26 additions & 17 deletions quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs
@@ -23,17 +23,18 @@ use std::time::Duration;

use itertools::Itertools;
use quickwit_actors::{ActorExitStatus, Mailbox};
use quickwit_common::rate_limited_warn;
use quickwit_common::rate_limited_error;
use quickwit_config::{FileSourceMessageType, FileSourceSqs};
use quickwit_metastore::checkpoint::SourceCheckpoint;
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::SourceType;
use quickwit_storage::StorageResolver;
use serde::Serialize;
use tracing::info;
use ulid::Ulid;

use super::local_state::QueueLocalState;
use super::message::{MessageType, ReadyMessage};
use super::message::{MessageType, PreProcessingError, ReadyMessage};
use super::shared_state::{checkpoint_messages, QueueSharedState};
use super::visibility::{spawn_visibility_task, VisibilitySettings};
use super::{Queue, QueueReceiver};
@@ -156,23 +157,30 @@ impl QueueCoordinator {
.receive(1, self.visible_settings.deadline_for_receive)
.await?;

let mut invalid_messages = Vec::new();
let preprocessed_messages = raw_messages
.into_iter()
.map(|msg| msg.pre_process(self.message_type))
.filter_map(|res| res.map_err(|err| invalid_messages.push(err)).ok())
.collect::<Vec<_>>();
if !invalid_messages.is_empty() {
self.observable_state.num_messages_failed_preprocessing +=
invalid_messages.len() as u64;
rate_limited_warn!(
let mut format_errors = Vec::new();
let mut discardable_ack_ids = Vec::new();
let mut preprocessed_messages = Vec::new();
for message in raw_messages {
match message.pre_process(self.message_type) {
Ok(preprocessed_message) => preprocessed_messages.push(preprocessed_message),
Err(PreProcessingError::UnexpectedFormat(err)) => format_errors.push(err),
Err(PreProcessingError::Discardable { ack_id, reason }) => {
info!(reason, "acknowledge message without processing");
discardable_ack_ids.push(ack_id)
}
}
}
if !format_errors.is_empty() {
self.observable_state.num_messages_failed_preprocessing += format_errors.len() as u64;
rate_limited_error!(
limit_per_min = 10,
count = invalid_messages.len(),
last_err = ?invalid_messages.last().unwrap(),
"invalid messages skipped, use a dead letter queue to limit retries"
count = format_errors.len(),
last_err = ?format_errors.last().unwrap(),
"invalid message(s) not processed, use a dead letter queue to limit retries"
);
}
if preprocessed_messages.is_empty() {
self.queue.acknowledge(&discardable_ack_ids).await?;
return Ok(());
}

@@ -218,10 +226,11 @@ impl QueueCoordinator {
self.local_state.set_ready_for_read(ready_messages);

// Acknowledge messages that already have been processed
let ack_ids = already_completed
let mut ack_ids = already_completed
.iter()
.map(|msg| msg.metadata.ack_id.clone())
.collect::<Vec<_>>();
ack_ids.append(&mut discardable_ack_ids);
self.queue.acknowledge(&ack_ids).await?;

Ok(())
@@ -258,7 +267,7 @@ impl QueueCoordinator {
}
Err(err) => {
self.observable_state.num_messages_failed_opening += 1;
rate_limited_warn!(
rate_limited_error!(
limit_per_min = 5,
err = ?err,
"failed to start processing message"
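The heart of the coordinator change is that pre-processing results are now split three ways: messages ready for indexing, format errors that are only reported, and discardable messages whose ack IDs are collected so they can be acknowledged without being indexed. A minimal, self-contained sketch of that partitioning pattern, using hypothetical stand-in types rather than the actual Quickwit ones:

    // Hypothetical stand-ins for PreProcessingError / PreProcessedMessage.
    enum PreProcessingError {
        UnexpectedFormat(String),
        Discardable { ack_id: String },
    }
    struct PreProcessed(String);

    fn partition(
        results: Vec<Result<PreProcessed, PreProcessingError>>,
    ) -> (Vec<PreProcessed>, Vec<String>, Vec<String>) {
        let mut ready = Vec::new();
        let mut format_errors = Vec::new();
        let mut discardable_ack_ids = Vec::new();
        for result in results {
            match result {
                Ok(msg) => ready.push(msg),
                // Reported (rate limited) but not acknowledged: the queue's
                // redrive policy / dead letter queue decides their fate.
                Err(PreProcessingError::UnexpectedFormat(err)) => format_errors.push(err),
                // Acknowledged immediately so they are not redelivered.
                Err(PreProcessingError::Discardable { ack_id }) => {
                    discardable_ack_ids.push(ack_id)
                }
            }
        }
        (ready, format_errors, discardable_ack_ids)
    }

In the actual coordinator, the discardable ack IDs are passed to `queue.acknowledge` together with the ack IDs of messages that were already completed.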
89 changes: 75 additions & 14 deletions quickwit/quickwit-indexing/src/source/queue_sources/message.rs
@@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use core::fmt;
use std::io::read_to_string;
use std::str::FromStr;
use std::time::Instant;
@@ -27,6 +28,7 @@ use quickwit_metastore::checkpoint::PartitionId;
use quickwit_proto::types::Position;
use quickwit_storage::{OwnedBytes, StorageResolver};
use serde_json::Value;
use thiserror::Error;

use super::visibility::VisibilityTaskHandle;
use crate::source::doc_file_reader::ObjectUriBatchReader;
@@ -61,14 +63,39 @@ pub struct RawMessage {
pub payload: OwnedBytes,
}

impl fmt::Debug for RawMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RawMessage")
.field("metadata", &self.metadata)
.field("payload", &"<bytes>")
.finish()
}
}

#[derive(Error, Debug)]
pub enum PreProcessingError {
/// The message can be acknowledged without processing
#[error("skippable message: {reason}")]
Discardable {
reason: &'static str,
ack_id: String,
},
#[error("unexpected message format: {0}")]
UnexpectedFormat(#[from] anyhow::Error),
}

impl RawMessage {
pub fn pre_process(self, message_type: MessageType) -> anyhow::Result<PreProcessedMessage> {
pub fn pre_process(
self,
message_type: MessageType,
) -> Result<PreProcessedMessage, PreProcessingError> {
let payload = match message_type {
MessageType::S3Notification => {
PreProcessedPayload::ObjectUri(uri_from_s3_notification(&self.payload)?)
}
MessageType::S3Notification => PreProcessedPayload::ObjectUri(
uri_from_s3_notification(&self.payload, &self.metadata.ack_id)?,
),
MessageType::RawUri => {
PreProcessedPayload::ObjectUri(Uri::from_str(&read_to_string(self.payload)?)?)
let payload_str = read_to_string(self.payload).context("failed to read payload")?;
PreProcessedPayload::ObjectUri(Uri::from_str(&payload_str)?)
}
MessageType::RawData => unimplemented!(),
};
@@ -108,15 +135,22 @@ impl PreProcessedMessage {
}
}

fn uri_from_s3_notification(message: &OwnedBytes) -> anyhow::Result<Uri> {
let value: Value = serde_json::from_slice(message.as_slice())?;
fn uri_from_s3_notification(message: &OwnedBytes, ack_id: &str) -> Result<Uri, PreProcessingError> {
let value: Value =
serde_json::from_slice(message.as_slice()).context("invalid JSON message")?;
if matches!(value["Event"].as_str(), Some("s3:TestEvent")) {
return Err(PreProcessingError::Discardable {
reason: "S3 test event",
ack_id: ack_id.to_string(),
});
}
let key = value["Records"][0]["s3"]["object"]["key"]
.as_str()
.context("Invalid S3 notification")?;
.context("invalid S3 notification: Records[0].s3.object.key not found")?;
let bucket = value["Records"][0]["s3"]["bucket"]["name"]
.as_str()
.context("Invalid S3 notification")?;
Uri::from_str(&format!("s3://{}/{}", bucket, key))
.context("invalid S3 notification: Records[0].s3.bucket.name not found".to_string())?;
Uri::from_str(&format!("s3://{}/{}", bucket, key)).map_err(|e| e.into())
}

/// A message for which we know as much of the global processing status as
@@ -172,7 +206,7 @@ mod tests {
use super::*;

#[test]
fn test_uri_from_s3_notification() {
fn test_uri_from_s3_notification_valid() {
let test_message = r#"
{
"Records": [
@@ -214,10 +248,13 @@
]
}"#;
let actual_uri =
uri_from_s3_notification(&OwnedBytes::new(test_message.as_bytes())).unwrap();
uri_from_s3_notification(&OwnedBytes::new(test_message.as_bytes()), "myackid").unwrap();
let expected_uri = Uri::from_str("s3://mybucket/logs.json").unwrap();
assert_eq!(actual_uri, expected_uri);
}

#[test]
fn test_uri_from_s3_notification_invalid() {
let invalid_message = r#"{
"Records": [
{
@@ -229,7 +266,31 @@
}
]
}"#;
let result = uri_from_s3_notification(&OwnedBytes::new(invalid_message.as_bytes()));
assert!(result.is_err());
let result =
uri_from_s3_notification(&OwnedBytes::new(invalid_message.as_bytes()), "myackid");
assert!(matches!(
result,
Err(PreProcessingError::UnexpectedFormat(_))
));
}

#[test]
fn test_uri_from_s3_notification_skippable() {
let invalid_message = r#"{
"Service":"Amazon S3",
"Event":"s3:TestEvent",
"Time":"2014-10-13T15:57:02.089Z",
"Bucket":"bucketname",
"RequestId":"5582815E1AEA5ADF",
"HostId":"8cLeGAmw098X5cv4Zkwcmo8vvZa3eH3eKxsPzbB9wrR+YstdA6Knx4Ip8EXAMPLE"
}"#;
let result =
uri_from_s3_notification(&OwnedBytes::new(invalid_message.as_bytes()), "myackid");
if let Err(PreProcessingError::Discardable { reason, ack_id }) = result {
assert_eq!(reason, "S3 test event");
assert_eq!(ack_id, "myackid");
} else {
panic!("Expected skippable error");
}
}
}
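One detail worth noting in the new `PreProcessingError`: the `#[from] anyhow::Error` on the `UnexpectedFormat` variant is what lets the `.context(...)` calls above use `?` directly. A stripped-down sketch of that conversion, assuming only the `anyhow`, `thiserror`, and `serde_json` crates:

    use anyhow::Context;
    use thiserror::Error;

    #[derive(Error, Debug)]
    enum DemoError {
        #[error("unexpected message format: {0}")]
        UnexpectedFormat(#[from] anyhow::Error),
    }

    fn parse(payload: &[u8]) -> Result<serde_json::Value, DemoError> {
        // `.context(..)` wraps the serde error into an anyhow::Error, and `?`
        // then converts it into DemoError::UnexpectedFormat via the #[from] impl.
        let value = serde_json::from_slice(payload).context("invalid JSON message")?;
        Ok(value)
    }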
31 changes: 17 additions & 14 deletions quickwit/quickwit-indexing/src/source/queue_sources/sqs_queue.rs
@@ -72,7 +72,7 @@ impl Queue for SqsQueue {
// state that it starts when the message is returned.
let initial_deadline = Instant::now() + suggested_deadline;
let clamped_max_messages = std::cmp::min(max_messages, 10) as i32;
let receive_res = aws_retry(&self.receive_retries, || async {
let receive_output = aws_retry(&self.receive_retries, || async {
self.sqs_client
.receive_message()
.queue_url(&self.queue_url)
@@ -83,19 +83,7 @@
.send()
.await
})
.await;

let receive_output = match receive_res {
Ok(output) => output,
Err(err) => {
rate_limited_error!(
limit_per_min = 10,
first_err = ?err,
"failed to receive messages from SQS",
);
return Ok(Vec::new());
}
};
.await?;

receive_output
.messages
@@ -465,4 +453,19 @@ mod localstack_tests {
.unwrap();
assert_eq!(in_flight_count, 0);
}

#[tokio::test]
async fn test_receive_wrong_queue() {
let client = test_helpers::get_localstack_sqs_client().await.unwrap();
let queue_url = test_helpers::create_queue(&client, "test-receive-existing-msg").await;
let bad_queue_url = format!("{}wrong", queue_url);
let queue = Arc::new(SqsQueue::try_new(bad_queue_url).await.unwrap());
tokio::time::timeout(
Duration::from_millis(500),
queue.clone().receive(5, Duration::from_secs(60)),
)
.await
.unwrap()
.unwrap_err();
}
}
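The other behavioral change in this file is that `receive` now propagates SQS errors with `?` instead of logging them and returning an empty batch, which is what the new `test_receive_wrong_queue` test asserts. The `unwrap().unwrap_err()` chain works because `tokio::time::timeout` wraps the inner result; a hedged, self-contained sketch of the same assertion pattern, with a hypothetical failing future standing in for the SQS call and `tokio`/`anyhow` assumed as dev-dependencies:

    use std::time::Duration;

    #[tokio::test]
    async fn test_error_surfaces_quickly() {
        // Stand-in for `queue.receive(..)` against a misconfigured queue URL.
        let failing_receive = async { Err::<Vec<String>, _>(anyhow::anyhow!("no such queue")) };
        tokio::time::timeout(Duration::from_millis(500), failing_receive)
            .await
            // outer unwrap: the call returned before the 500ms timeout elapsed
            .unwrap()
            // inner unwrap_err: it surfaced an error instead of an empty batch
            .unwrap_err();
    }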
