diff --git a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs index 6511a13ecb7..0056f9d488e 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/fetch.rs @@ -145,7 +145,10 @@ impl FetchStreamTask { break; }; for Record { payload, .. } in mrecords { - if mrecord_buffer.len() + payload.len() > mrecord_buffer.capacity() { + // Accept at least one message + if !mrecord_buffer.is_empty() + && (mrecord_buffer.len() + payload.len() > mrecord_buffer.capacity()) + { has_drained_queue = false; break; } @@ -1199,6 +1202,81 @@ pub(super) mod tests { ); } + #[tokio::test] + async fn test_fetch_task_batch_num_bytes_less_than_record_payload() { + let tempdir = tempfile::tempdir().unwrap(); + let mrecordlog = Arc::new(RwLock::new(Some( + MultiRecordLogAsync::open(tempdir.path()).await.unwrap(), + ))); + let client_id = "test-client".to_string(); + let index_uid: IndexUid = IndexUid::for_test("test-index", 0); + let source_id = "test-source".to_string(); + let shard_id = ShardId::from(1); + let queue_id = queue_id(&index_uid, &source_id, &shard_id); + + let open_fetch_stream_request = OpenFetchStreamRequest { + client_id: client_id.clone(), + index_uid: Some(index_uid.clone()), + source_id: source_id.clone(), + shard_id: Some(shard_id.clone()), + from_position_exclusive: Some(Position::Beginning), + }; + let (shard_status_tx, shard_status_rx) = watch::channel(ShardStatus::default()); + let (mut fetch_stream, _fetch_task_handle) = FetchStreamTask::spawn( + open_fetch_stream_request, + mrecordlog.clone(), + shard_status_rx, + 10, //< we request batch larger than 10 bytes. + ); + + let mut mrecordlog_guard = mrecordlog.write().await; + + mrecordlog_guard + .as_mut() + .unwrap() + .create_queue(&queue_id) + .await + .unwrap(); + + mrecordlog_guard + .as_mut() + .unwrap() + .append_records( + &queue_id, + None, + // This doc is longer than 10 bytes. + std::iter::once(MRecord::new_doc("test-doc-foo").encode()), + ) + .await + .unwrap(); + + drop(mrecordlog_guard); + + let shard_status = (ShardState::Open, Position::offset(1u64)); + shard_status_tx.send(shard_status).unwrap(); + + let fetch_message = timeout(Duration::from_millis(100), fetch_stream.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + + let fetch_payload = into_fetch_payload(fetch_message); + + assert_eq!( + fetch_payload + .mrecord_batch + .as_ref() + .unwrap() + .mrecord_lengths, + [14] + ); + assert_eq!( + fetch_payload.mrecord_batch.as_ref().unwrap().mrecord_buffer, + "\0\0test-doc-foo" + ); + } + #[test] fn test_select_preferred_and_failover_ingesters() { let self_node_id: NodeId = "test-ingester-0".into();