Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(subscription): Support specified pk read log store #19274

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ message LogRowSeqScanNode {
common.BatchQueryEpoch old_epoch = 4;
common.BatchQueryEpoch new_epoch = 5;
bool ordered = 6;
repeated ScanRange scan_ranges = 7;
}

message InsertNode {
Expand Down
69 changes: 53 additions & 16 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::ScalarImpl;
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
Expand All @@ -33,7 +33,9 @@ use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::collect_data_chunk;
use risingwave_storage::{dispatch_state_store, StateStore};

use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
use super::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder, ScanRange,
};
use crate::error::{BatchError, Result};
use crate::monitor::BatchMetrics;
use crate::task::BatchTaskContext;
Expand All @@ -53,6 +55,7 @@ pub struct LogRowSeqScanExecutor<S: StateStore> {
new_epoch: u64,
version_id: HummockVersionId,
ordered: bool,
scan_ranges: Vec<ScanRange>,
}

impl<S: StateStore> LogRowSeqScanExecutor<S> {
Expand All @@ -65,6 +68,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
identity: String,
metrics: Option<BatchMetrics>,
ordered: bool,
scan_ranges: Vec<ScanRange>,
) -> Self {
let mut schema = table.schema().clone();
schema.fields.push(Field::with_name(
Expand All @@ -81,6 +85,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
new_epoch,
version_id,
ordered,
scan_ranges,
}
}
}
Expand Down Expand Up @@ -139,6 +144,28 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
let old_epoch = old_epoch.epoch;
let new_epoch = new_epoch.epoch;

let scan_ranges = {
let scan_ranges = &log_store_seq_scan_node.scan_ranges;
if scan_ranges.is_empty() {
vec![ScanRange::full()]
} else {
scan_ranges
.iter()
.map(|scan_range| {
let pk_types = table_desc.pk.iter().map(|order| {
DataType::from(
table_desc.columns[order.column_index as usize]
.column_type
.as_ref()
.unwrap(),
)
});
ScanRange::new(scan_range.clone(), pk_types)
})
.try_collect()?
}
};

dispatch_state_store!(source.context().state_store(), state_store, {
let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);
Ok(Box::new(LogRowSeqScanExecutor::new(
Expand All @@ -150,6 +177,7 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
source.plan_node().get_identity().clone(),
metrics,
log_store_seq_scan_node.ordered,
scan_ranges,
)))
})
}
Expand Down Expand Up @@ -180,6 +208,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
version_id,
schema,
ordered,
scan_ranges,
..
} = *self;
let table = std::sync::Arc::new(table);
Expand All @@ -191,20 +220,23 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
// Range Scan
// WARN: DO NOT use `select` to execute range scans concurrently
// it can consume too much memory if there're too many ranges.
let stream = Self::execute_range(
table.clone(),
old_epoch,
new_epoch,
version_id,
chunk_size,
histogram,
Arc::new(schema.clone()),
ordered,
);
#[for_await]
for chunk in stream {
let chunk = chunk?;
yield chunk;
for range in scan_ranges {
let stream = Self::execute_range(
table.clone(),
old_epoch,
new_epoch,
version_id,
chunk_size,
histogram,
Arc::new(schema.clone()),
ordered,
range,
);
#[for_await]
for chunk in stream {
let chunk = chunk?;
yield chunk;
}
}
}

Expand All @@ -218,13 +250,18 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
histogram: Option<impl Deref<Target = Histogram>>,
schema: Arc<Schema>,
ordered: bool,
scan_range: ScanRange,
) {
let pk_prefix = scan_range.pk_prefix.clone();
let range_bounds = scan_range.convert_to_range_bounds(table.clone());
// Range Scan.
let iter = table
.batch_iter_log_with_pk_bounds(
old_epoch,
HummockReadEpoch::BatchQueryCommitted(new_epoch, version_id),
ordered,
range_bounds,
pk_prefix,
)
.await?
.flat_map(|r| {
Expand Down
120 changes: 6 additions & 114 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::{Bound, Deref};
use std::ops::Deref;
use std::sync::Arc;

use futures::{pin_mut, StreamExt};
Expand All @@ -23,18 +23,17 @@ use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Schema};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::types::DataType;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::value_encoding::deserialize_datum;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{scan_range, PbScanRange};
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::as_of::AsOfType;
use risingwave_pb::plan_common::{as_of, PbAsOf, StorageTableDesc};
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::{dispatch_state_store, StateStore};

use super::ScanRange;
use crate::error::{BatchError, Result};
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
Expand All @@ -59,15 +58,6 @@ pub struct RowSeqScanExecutor<S: StateStore> {
as_of: Option<AsOf>,
}

/// Range for batch scan.
///
/// A range is an equality-matched prefix of the primary key plus an optional
/// pair of bounds on the primary key column that follows that prefix.
/// `ScanRange::full()` yields an empty prefix with both bounds unbounded.
pub struct ScanRange {
/// The prefix of the primary key.
///
/// `ScanRange::new` fills this with one datum per `eq_conds` entry, each
/// deserialized with the corresponding leading primary key column type.
pub pk_prefix: OwnedRow,

/// The range bounds of the next column.
///
/// Stored as `(lower, upper)`, built from the protobuf `lower_bound` and
/// `upper_bound`; a side that is absent is `Bound::Unbounded`.
pub next_col_bounds: (Bound<Datum>, Bound<Datum>),
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AsOf {
pub timestamp: i64,
Expand Down Expand Up @@ -98,64 +88,6 @@ impl From<&AsOf> for PbAsOf {
}
}

impl ScanRange {
    /// Create a scan range from the prost representation.
    ///
    /// `pk_types` must yield the primary key column types in key order. One
    /// type is consumed per entry of `scan_range.eq_conds`, and one more is
    /// consumed for the bounded column when a lower or upper bound is present.
    ///
    /// # Errors
    ///
    /// Returns an error if any `eq_conds` value or any bound value cannot be
    /// deserialized with its column type.
    ///
    /// # Panics
    ///
    /// Panics if `pk_types` yields fewer types than the range requires.
    pub fn new(
        scan_range: PbScanRange,
        mut pk_types: impl Iterator<Item = DataType>,
    ) -> Result<Self> {
        let pk_prefix = OwnedRow::new(
            scan_range
                .eq_conds
                .iter()
                .map(|v| {
                    let ty = pk_types.next().unwrap();
                    deserialize_datum(v.as_slice(), &ty)
                })
                .try_collect()?,
        );

        // Without any bound the range is "everything under this prefix".
        if scan_range.lower_bound.is_none() && scan_range.upper_bound.is_none() {
            return Ok(Self {
                pk_prefix,
                ..Self::full()
            });
        }

        // Both bounds apply to the first primary key column after the prefix.
        let bound_ty = pk_types.next().unwrap();

        // A missing side maps to `Unbounded`; a present side is deserialized and
        // any decoding failure is propagated to the caller instead of panicking.
        let build_bound = |bound: Option<&scan_range::Bound>| -> Result<Bound<Datum>> {
            match bound {
                None => Ok(Bound::Unbounded),
                Some(bound) => {
                    let datum = deserialize_datum(bound.value.as_slice(), &bound_ty)?;
                    Ok(if bound.inclusive {
                        Bound::Included(datum)
                    } else {
                        Bound::Excluded(datum)
                    })
                }
            }
        };

        let lower = build_bound(scan_range.lower_bound.as_ref())?;
        let upper = build_bound(scan_range.upper_bound.as_ref())?;

        Ok(Self {
            pk_prefix,
            next_col_bounds: (lower, upper),
        })
    }

    /// Create a scan range for full table scan.
    ///
    /// The prefix is empty and both bounds are `Bound::Unbounded`.
    pub fn full() -> Self {
        Self {
            pk_prefix: OwnedRow::default(),
            next_col_bounds: (Bound::Unbounded, Bound::Unbounded),
        }
    }
}

impl<S: StateStore> RowSeqScanExecutor<S> {
pub fn new(
table: StorageTable<S>,
Expand Down Expand Up @@ -419,55 +351,15 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
limit: Option<u64>,
histogram: Option<impl Deref<Target = Histogram>>,
) {
let ScanRange {
pk_prefix,
next_col_bounds,
} = scan_range;

let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()];
let (start_bound, end_bound) = if order_type.is_ascending() {
(next_col_bounds.0, next_col_bounds.1)
} else {
(next_col_bounds.1, next_col_bounds.0)
};

let start_bound_is_bounded = !matches!(start_bound, Bound::Unbounded);
let end_bound_is_bounded = !matches!(end_bound, Bound::Unbounded);

let pk_prefix = scan_range.pk_prefix.clone();
let range_bounds = scan_range.convert_to_range_bounds(table.clone());
// Range Scan.
assert!(pk_prefix.len() < table.pk_indices().len());
let iter = table
.batch_chunk_iter_with_pk_bounds(
epoch.into(),
&pk_prefix,
(
match start_bound {
Bound::Unbounded => {
if end_bound_is_bounded && order_type.nulls_are_first() {
// `NULL`s are at the start bound side, we should exclude them to meet SQL semantics.
Bound::Excluded(OwnedRow::new(vec![None]))
} else {
// Both start and end are unbounded, so we need to select all rows.
Bound::Unbounded
}
}
Bound::Included(x) => Bound::Included(OwnedRow::new(vec![x])),
Bound::Excluded(x) => Bound::Excluded(OwnedRow::new(vec![x])),
},
match end_bound {
Bound::Unbounded => {
if start_bound_is_bounded && order_type.nulls_are_last() {
// `NULL`s are at the end bound side, we should exclude them to meet SQL semantics.
Bound::Excluded(OwnedRow::new(vec![None]))
} else {
// Both start and end are unbounded, so we need to select all rows.
Bound::Unbounded
}
}
Bound::Included(x) => Bound::Included(OwnedRow::new(vec![x])),
Bound::Excluded(x) => Bound::Excluded(OwnedRow::new(vec![x])),
},
),
range_bounds,
ordered,
chunk_size,
PrefetchOptions::new(limit.is_none(), true),
Expand Down
Loading
Loading