Skip to content

Commit

Permalink
Implement router and ingester (#3730)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored Aug 14, 2023
1 parent ba38252 commit c9472e4
Show file tree
Hide file tree
Showing 79 changed files with 8,478 additions and 647 deletions.
22 changes: 11 additions & 11 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use quickwit_config::{ConfigFormat, IndexConfig};
use quickwit_indexing::models::IndexingStatistics;
use quickwit_indexing::IndexingPipeline;
use quickwit_metastore::{IndexMetadata, Split, SplitState};
use quickwit_proto::{SortField, SortOrder};
use quickwit_proto::search::{SortField, SortOrder};
use quickwit_rest_client::models::IngestSource;
use quickwit_rest_client::rest_client::{CommitType, IngestEvent};
use quickwit_search::SearchResponseRest;
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use quickwit_indexing::models::{
};
use quickwit_indexing::IndexingPipeline;
use quickwit_metastore::Metastore;
use quickwit_proto::SearchResponse;
use quickwit_proto::search::SearchResponse;
use quickwit_search::{single_node_search, SearchResponseRest};
use quickwit_serve::{
search_request_from_api_request, BodyFormat, SearchRequestQueryString, SortBy,
Expand Down
6 changes: 4 additions & 2 deletions quickwit/quickwit-codegen/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use prost_build::{Method, Service, ServiceGenerator};
use quote::{quote, ToTokens};
use syn::Ident;

use crate::ProstConfig;

pub struct Codegen;

impl Codegen {
Expand All @@ -39,7 +41,7 @@ impl Codegen {
result_type_path,
error_type_path,
includes,
prost_build::Config::default(),
ProstConfig::default(),
)
}

Expand All @@ -49,7 +51,7 @@ impl Codegen {
result_type_path: &str,
error_type_path: &str,
includes: &[&str],
mut prost_config: prost_build::Config,
mut prost_config: ProstConfig,
) -> anyhow::Result<()> {
let service_generator = Box::new(QuickwitServiceGenerator::new(
result_type_path,
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-codegen/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
mod codegen;

pub use codegen::Codegen;
pub use prost_build::Config as ProstConfig;
23 changes: 12 additions & 11 deletions quickwit/quickwit-common/src/stream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::any::TypeId;
use std::fmt;
use std::pin::Pin;

use futures::{Stream, StreamExt, TryStreamExt};
use futures::{stream, Stream, TryStreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tracing::warn;
Expand Down Expand Up @@ -123,19 +123,20 @@ impl<T> From<tonic::Streaming<T>> for ServiceStream<T>
where T: Send + 'static
{
fn from(streaming: tonic::Streaming<T>) -> Self {
let ok_streaming = streaming.filter_map(|message| {
Box::pin(async move {
message
.map_err(|status| {
warn!(status=?status, "gRPC transport error.");
status
})
.ok()
let message_stream = stream::unfold(streaming, |mut streaming| {
Box::pin(async {
match streaming.message().await {
Ok(Some(message)) => Some((message, streaming)),
Ok(None) => None,
Err(error) => {
warn!(error=?error, "gRPC transport error.");
None
}
}
})
});

Self {
inner: Box::pin(ok_streaming),
inner: Box::pin(message_stream),
}
}
}
4 changes: 1 addition & 3 deletions quickwit/quickwit-common/src/tower/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub use box_service::BoxService;
pub use buffer::{Buffer, BufferError, BufferLayer};
pub use change::Change;
pub use estimate_rate::{EstimateRate, EstimateRateLayer};
use futures::{Future, Stream};
use futures::Future;
pub use metrics::{PrometheusMetrics, PrometheusMetricsLayer};
pub use pool::Pool;
pub use rate::{ConstantRate, Rate};
Expand All @@ -51,8 +51,6 @@ pub type BoxFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send + 's

pub type BoxFutureInfaillible<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

pub type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Unpin + 'static>>;

pub trait Cost {
fn cost(&self) -> u64;
}
3 changes: 2 additions & 1 deletion quickwit/quickwit-common/src/tower/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ use tower::discover::Change as TowerChange;
use tower::load::{CompleteOnResponse, PendingRequestsDiscover};
use tower::{BoxError, Service, ServiceExt};

use super::{BoxFuture, BoxStream, Change};
use super::{BoxFuture, Change};
use crate::BoxStream;

// Transforms a boxed stream of `Change<K, Channel>` into a stream of `Result<TowerChange<K,
// Channel>, Infallible>>` while keeping track of the number of connections.
Expand Down
33 changes: 31 additions & 2 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
use anyhow::Context;
use async_trait::async_trait;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox};
use quickwit_proto::control_plane::{NotifyIndexChangeRequest, NotifyIndexChangeResponse};
use quickwit_proto::control_plane::{
CloseShardsRequest, CloseShardsResponse, ControlPlaneResult, GetOpenShardsRequest,
GetOpenShardsResponse, NotifyIndexChangeRequest, NotifyIndexChangeResponse,
};
use tracing::debug;

use crate::scheduler::IndexingScheduler;
Expand Down Expand Up @@ -51,7 +54,7 @@ impl ControlPlane {

#[async_trait]
impl Handler<NotifyIndexChangeRequest> for ControlPlane {
type Reply = quickwit_proto::control_plane::Result<NotifyIndexChangeResponse>;
type Reply = ControlPlaneResult<NotifyIndexChangeResponse>;

async fn handle(
&mut self,
Expand All @@ -66,3 +69,29 @@ impl Handler<NotifyIndexChangeRequest> for ControlPlane {
Ok(Ok(NotifyIndexChangeResponse {}))
}
}

#[async_trait]
impl Handler<GetOpenShardsRequest> for ControlPlane {
type Reply = ControlPlaneResult<GetOpenShardsResponse>;

async fn handle(
&mut self,
_request: GetOpenShardsRequest,
_: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
unimplemented!()
}
}

#[async_trait]
impl Handler<CloseShardsRequest> for ControlPlane {
type Reply = ControlPlaneResult<CloseShardsResponse>;

async fn handle(
&mut self,
_request: CloseShardsRequest,
_: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
unimplemented!()
}
}
6 changes: 4 additions & 2 deletions quickwit/quickwit-control-plane/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use itertools::Itertools;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler};
use quickwit_config::SourceConfig;
use quickwit_metastore::Metastore;
use quickwit_proto::control_plane::{NotifyIndexChangeRequest, NotifyIndexChangeResponse};
use quickwit_proto::control_plane::{
ControlPlaneResult, NotifyIndexChangeRequest, NotifyIndexChangeResponse,
};
use quickwit_proto::indexing::{ApplyIndexingPlanRequest, IndexingService, IndexingTask};
use serde::Serialize;
use tracing::{debug, error, info, warn};
Expand Down Expand Up @@ -298,7 +300,7 @@ impl IndexingScheduler {

#[async_trait]
impl Handler<NotifyIndexChangeRequest> for IndexingScheduler {
type Reply = quickwit_proto::control_plane::Result<NotifyIndexChangeResponse>;
type Reply = ControlPlaneResult<NotifyIndexChangeResponse>;

async fn handle(
&mut self,
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mockall = { workspace = true, optional = true }
mrecordlog = { workspace = true }
once_cell = { workspace = true }
prost = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
Expand All @@ -41,10 +42,11 @@ rand_distr = { workspace = true }
tempfile = { workspace = true }

quickwit-actors = { workspace = true, features = ["testsuite"] }
quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-proto = { workspace = true, features = ["testsuite"] }

[build-dependencies]
quickwit-codegen = { workspace = true }
prost-build = { workspace = true }

[features]
testsuite = ["mockall"]
10 changes: 6 additions & 4 deletions quickwit/quickwit-ingest/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,20 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use quickwit_codegen::Codegen;
use quickwit_codegen::{Codegen, ProstConfig};

fn main() {
let mut config = prost_build::Config::default();
config.bytes(["DocBatch.doc_buffer"]);
// Legacy ingest codegen
let mut prost_config = ProstConfig::default();
prost_config.bytes(["DocBatch.doc_buffer"]);

Codegen::run_with_config(
&["src/ingest_service.proto"],
"src/codegen/",
"crate::Result",
"crate::IngestServiceError",
&[],
config,
prost_config,
)
.unwrap();
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/src/doc_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ where T: Buf + Default
}

/// Copies the command to the end of bytes::BufMut while returning the number of bytes copied
pub fn write(self, buf: &mut impl BufMut) -> usize {
pub fn write(self, mut buf: impl BufMut) -> usize {
let self_buf = self.into_buf();
let len = self_buf.remaining();
buf.put(self_buf);
Expand Down
Loading

0 comments on commit c9472e4

Please sign in to comment.