diff --git a/proto/catalog.proto b/proto/catalog.proto index f792eccc0cab..8eeb75843244 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -419,13 +419,13 @@ message Table { // Total vnode count of the table. // - // Can be unset if... - // - The catalog is generated by the frontend and not yet persisted, this is - // because the vnode count of each fragment (then internal tables) is determined - // by the meta service. - // - The table is created in older versions where variable vnode count is not + // Use `VnodeCountCompat::vnode_count` to access it. + // + // - Can be unset if the table is created in older versions where variable vnode count is not // supported, in which case a default value of 256 should be used. - // Use `VnodeCountCompat::vnode_count` to access it. + // - Can be placeholder value `Some(0)` if the catalog is generated by the frontend and the + // corresponding job is still in `Creating` status, in which case calling `vnode_count` + // will panic. // // Please note that this field is not intended to describe the expected vnode count // for a streaming job. Instead, refer to `stream_plan.StreamFragmentGraph.max_parallelism`. diff --git a/src/common/src/catalog/physical_table.rs b/src/common/src/catalog/physical_table.rs index df1b30fd41ee..680f23fb0dc1 100644 --- a/src/common/src/catalog/physical_table.rs +++ b/src/common/src/catalog/physical_table.rs @@ -21,7 +21,7 @@ use risingwave_pb::plan_common::StorageTableDesc; use super::{ColumnDesc, ColumnId, TableId}; use crate::catalog::get_dist_key_in_pk_indices; -use crate::hash::VnodeCountCompat; +use crate::hash::{VnodeCount, VnodeCountCompat}; use crate::util::sort_util::ColumnOrder; /// Includes necessary information for compute node to access data of the table. @@ -117,7 +117,7 @@ impl TableDesc { versioned: self.versioned, stream_key: self.stream_key.iter().map(|&x| x as u32).collect(), vnode_col_idx_in_pk, - maybe_vnode_count: Some(self.vnode_count as u32), + maybe_vnode_count: VnodeCount::set(self.vnode_count).to_protobuf(), }) } diff --git a/src/common/src/hash/consistent_hash/compat.rs b/src/common/src/hash/consistent_hash/compat.rs index 0c86fbb12bcd..c659da2fc605 100644 --- a/src/common/src/hash/consistent_hash/compat.rs +++ b/src/common/src/hash/consistent_hash/compat.rs @@ -12,15 +12,87 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::num::NonZeroUsize; + use super::vnode::VirtualNode; +/// The different cases of `maybe_vnode_count` field in the protobuf message. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] +pub enum VnodeCount { + /// The field is a placeholder and has to be filled first before using it. + #[default] + Placeholder, + /// The field is set to a specific value. + Set(NonZeroUsize), + /// The field is unset because it's persisted in an older version. + Compat, +} + +impl VnodeCount { + /// Creates a `VnodeCount` set to the given value. + pub fn set(v: impl TryInto + Copy + std::fmt::Debug) -> Self { + let v = (v.try_into().ok()) + .filter(|v| (1..=VirtualNode::MAX_COUNT).contains(v)) + .unwrap_or_else(|| panic!("invalid vnode count {v:?}")); + + VnodeCount::Set(NonZeroUsize::new(v).unwrap()) + } + + /// Creates a `VnodeCount` set to the value for testing. + /// + /// Equivalent to `VnodeCount::set(VirtualNode::COUNT_FOR_TEST)`. + pub fn for_test() -> Self { + Self::set(VirtualNode::COUNT_FOR_TEST) + } + + /// Converts to protobuf representation for `maybe_vnode_count`. + pub fn to_protobuf(self) -> Option { + match self { + VnodeCount::Placeholder => Some(0), + VnodeCount::Set(v) => Some(v.get() as _), + VnodeCount::Compat => None, + } + } + + /// Converts from protobuf representation of `maybe_vnode_count`. + pub fn from_protobuf(v: Option) -> Self { + match v { + Some(0) => VnodeCount::Placeholder, + Some(v) => VnodeCount::set(v as usize), + None => VnodeCount::Compat, + } + } + + /// Returns the value of the vnode count, or `None` if it's a placeholder. + pub fn value_opt(self) -> Option { + match self { + VnodeCount::Placeholder => None, + VnodeCount::Set(v) => Some(v.get()), + VnodeCount::Compat => Some(VirtualNode::COUNT_FOR_COMPAT), + } + } + + /// Returns the value of the vnode count. Panics if it's a placeholder. + pub fn value(self) -> usize { + self.value_opt() + .expect("vnode count is a placeholder that must be filled by the meta service first") + } +} + /// A trait for accessing the vnode count field with backward compatibility. pub trait VnodeCountCompat { + /// Get the `maybe_vnode_count` field. + fn vnode_count_inner(&self) -> VnodeCount; + /// Returns the vnode count, or [`VirtualNode::COUNT_FOR_COMPAT`] if the vnode count is not set, - /// typically for backward compatibility. + /// typically for backward compatibility. Panics if the field is a placeholder. + /// + /// Equivalent to `self.vnode_count_inner().value()`. /// /// See the documentation on the field of the implementing type for more details. - fn vnode_count(&self) -> usize; + fn vnode_count(&self) -> usize { + self.vnode_count_inner().value() + } } /// Implement the trait for given types by delegating to the `maybe_vnode_count` field. @@ -36,9 +108,8 @@ macro_rules! impl_maybe_vnode_count_compat { ($($ty:ty),* $(,)?) => { $( impl VnodeCountCompat for $ty { - fn vnode_count(&self) -> usize { - self.maybe_vnode_count - .map_or(VirtualNode::COUNT_FOR_COMPAT, |v| v as _) + fn vnode_count_inner(&self) -> VnodeCount { + VnodeCount::from_protobuf(self.maybe_vnode_count) } } )* diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 8c33e55a0b16..b533e6d95668 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::assert_matches::assert_matches; use std::collections::{HashMap, HashSet}; use fixedbitset::FixedBitSet; @@ -20,7 +21,7 @@ use risingwave_common::catalog::{ ColumnCatalog, ConflictBehavior, CreateType, Field, Schema, StreamJobStatus, TableDesc, TableId, TableVersionId, }; -use risingwave_common::hash::VnodeCountCompat; +use risingwave_common::hash::{VnodeCount, VnodeCountCompat}; use risingwave_common::util::epoch::Epoch; use risingwave_common::util::sort_util::ColumnOrder; use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbTableType, PbTableVersion}; @@ -174,19 +175,13 @@ pub struct TableCatalog { /// Total vnode count of the table. /// - /// Can be unset if the catalog is generated by the frontend and not yet persisted. This is - /// because the vnode count of each fragment (then internal tables) is determined by the - /// meta service. See also [`StreamMaterialize::derive_table_catalog`] and - /// [`TableCatalogBuilder::build`]. - /// - /// On the contrary, if this comes from a [`PbTable`], the field must be `Some` no matter - /// whether the table is created before or after the version we introduced variable vnode - /// count support. This is because we've already handled backward compatibility during - /// conversion. + /// Can be [`VnodeCount::Placeholder`] if the catalog is generated by the frontend and the + /// corresponding job is still in `Creating` status, in which case calling [`Self::vnode_count`] + /// will panic. /// /// [`StreamMaterialize::derive_table_catalog`]: crate::optimizer::plan_node::StreamMaterialize::derive_table_catalog /// [`TableCatalogBuilder::build`]: crate::optimizer::plan_node::utils::TableCatalogBuilder::build - pub vnode_count: Option, + pub vnode_count: VnodeCount, } #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] @@ -408,8 +403,7 @@ impl TableCatalog { /// /// Panics if it's called on an incomplete (and not yet persisted) table catalog. pub fn vnode_count(&self) -> usize { - self.vnode_count - .expect("vnode count unset, called on an incomplete table catalog?") + self.vnode_count.value() } pub fn to_prost(&self, schema_id: SchemaId, database_id: DatabaseId) -> PbTable { @@ -457,7 +451,7 @@ impl TableCatalog { initialized_at_cluster_version: self.initialized_at_cluster_version.clone(), retention_seconds: self.retention_seconds, cdc_table_id: self.cdc_table_id.clone(), - maybe_vnode_count: self.vnode_count.map(|v| v as _), + maybe_vnode_count: self.vnode_count.to_protobuf(), } } @@ -563,7 +557,13 @@ impl From for TableCatalog { OptionalAssociatedSourceId::AssociatedSourceId(id) => id, }); let name = tb.name.clone(); - let vnode_count = tb.vnode_count(); + + let vnode_count = tb.vnode_count_inner(); + if let VnodeCount::Placeholder = vnode_count { + // Only allow placeholder vnode count for creating tables. + // After the table is created, an `Update` notification will be used to update the vnode count field. + assert_matches!(stream_job_status, PbStreamJobStatus::Creating); + } let mut col_names = HashSet::new(); let mut col_index: HashMap = HashMap::new(); @@ -634,7 +634,7 @@ impl From for TableCatalog { .map(TableId::from) .collect_vec(), cdc_table_id: tb.cdc_table_id, - vnode_count: Some(vnode_count), /* from existing (persisted) tables, vnode_count must be set */ + vnode_count, } } } @@ -725,7 +725,7 @@ mod tests { initialized_at_cluster_version: None, version_column_index: None, cdc_table_id: None, - maybe_vnode_count: Some(233), + maybe_vnode_count: VnodeCount::set(233).to_protobuf(), } .into(); @@ -789,7 +789,7 @@ mod tests { dependent_relations: vec![], version_column_index: None, cdc_table_id: None, - vnode_count: Some(233), + vnode_count: VnodeCount::set(233), } ); assert_eq!(table, TableCatalog::from(table.to_prost(0, 0))); diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index e9ecb5713cb1..88e886ad667b 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -19,6 +19,7 @@ use anyhow::{anyhow, Context}; use itertools::Itertools; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::ColumnCatalog; +use risingwave_common::hash::VnodeCount; use risingwave_common::types::DataType; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::{bail, bail_not_implemented}; @@ -244,7 +245,7 @@ pub async fn get_replace_table_plan( // Set some fields ourselves so that the meta service does not need to maintain them. let mut table = table; table.incoming_sinks = incoming_sink_ids.iter().copied().collect(); - table.maybe_vnode_count = Some(original_catalog.vnode_count() as _); + table.maybe_vnode_count = VnodeCount::set(original_catalog.vnode_count()).to_protobuf(); Ok((source, table, graph, col_index_mapping, job_type)) } diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 88878667b841..fa5f60c2a99e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -21,6 +21,7 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ ColumnCatalog, ConflictBehavior, CreateType, StreamJobStatus, TableId, OBJECT_ID_PLACEHOLDER, }; +use risingwave_common::hash::VnodeCount; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_pb::stream_plan::stream_node::PbNodeBody; @@ -283,7 +284,7 @@ impl StreamMaterialize { created_at_cluster_version: None, retention_seconds: retention_seconds.map(|i| i.into()), cdc_table_id: None, - vnode_count: None, // will be filled in by the meta service later + vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later }) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 4aefa1e54c88..a96e1284a9f7 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -27,6 +27,7 @@ use risingwave_common::catalog::{ use risingwave_common::constants::log_store::v2::{ KV_LOG_STORE_PREDEFINED_COLUMNS, PK_ORDERING, VNODE_COLUMN_INDEX, }; +use risingwave_common::hash::VnodeCount; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use crate::catalog::table_catalog::TableType; @@ -179,7 +180,7 @@ impl TableCatalogBuilder { created_at_cluster_version: None, retention_seconds: None, cdc_table_id: None, - vnode_count: None, // will be filled in by the meta service later + vnode_count: VnodeCount::Placeholder, // will be filled in by the meta service later } } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index d169874d5b3f..2d40328bab40 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -473,7 +473,7 @@ pub(crate) mod tests { ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, StreamJobStatus, DEFAULT_SUPER_USER_ID, }; - use risingwave_common::hash::{VirtualNode, WorkerSlotId, WorkerSlotMapping}; + use risingwave_common::hash::{VirtualNode, VnodeCount, WorkerSlotId, WorkerSlotMapping}; use risingwave_common::types::DataType; use risingwave_pb::common::worker_node::Property; use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType}; @@ -589,7 +589,7 @@ pub(crate) mod tests { initialized_at_cluster_version: None, created_at_cluster_version: None, cdc_table_id: None, - vnode_count: Some(vnode_count), + vnode_count: VnodeCount::set(vnode_count), }; let batch_plan_node: PlanRef = LogicalScan::create( "".to_string(), diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index ac8784fde4dd..14befbaeb735 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -28,6 +28,7 @@ use risingwave_common::catalog::{ FunctionId, IndexId, TableId, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID, NON_RESERVED_USER_ID, PG_CATALOG_SCHEMA_NAME, RW_CATALOG_SCHEMA_NAME, }; +use risingwave_common::hash::{VirtualNode, VnodeCount, VnodeCountCompat}; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::util::cluster_limit::ClusterLimit; @@ -281,6 +282,7 @@ impl CatalogWriter for MockCatalogWriter { ) -> Result<()> { table.id = self.gen_id(); table.stream_job_status = PbStreamJobStatus::Created as _; + table.maybe_vnode_count = VnodeCount::for_test().to_protobuf(); self.catalog.write().create_table(&table); self.add_table_or_source_id(table.id, table.schema_id, table.database_id); self.hummock_snapshot_manager @@ -320,6 +322,7 @@ impl CatalogWriter for MockCatalogWriter { _job_type: TableJobType, ) -> Result<()> { table.stream_job_status = PbStreamJobStatus::Created as _; + assert_eq!(table.vnode_count(), VirtualNode::COUNT_FOR_TEST); self.catalog.write().update_table(&table); Ok(()) } @@ -353,6 +356,7 @@ impl CatalogWriter for MockCatalogWriter { ) -> Result<()> { index_table.id = self.gen_id(); index_table.stream_job_status = PbStreamJobStatus::Created as _; + index_table.maybe_vnode_count = VnodeCount::for_test().to_protobuf(); self.catalog.write().create_table(&index_table); self.add_table_or_index_id( index_table.id, diff --git a/src/meta/model/src/table.rs b/src/meta/model/src/table.rs index 0a47208ff735..1d950f277396 100644 --- a/src/meta/model/src/table.rs +++ b/src/meta/model/src/table.rs @@ -209,7 +209,15 @@ impl From for ActiveModel { fn from(pb_table: PbTable) -> Self { let table_type = pb_table.table_type(); let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior(); - let vnode_count = pb_table.vnode_count(); + + // `PbTable` here should be sourced from the wire, not from persistence. + // A placeholder `maybe_vnode_count` field should be treated as `NotSet`, instead of calling + // the compatibility code. + let vnode_count = pb_table + .vnode_count_inner() + .value_opt() + .map(|v| v as _) + .map_or(NotSet, Set); let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER { NotSet @@ -258,7 +266,7 @@ impl From for ActiveModel { retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)), incoming_sinks: Set(pb_table.incoming_sinks.into()), cdc_table_id: Set(pb_table.cdc_table_id), - vnode_count: Set(vnode_count as _), + vnode_count, } } } diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index e118ad3c9683..a79b890cade2 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -19,7 +19,7 @@ use std::mem::swap; use anyhow::Context; use itertools::Itertools; use risingwave_common::bail; -use risingwave_common::hash::{VnodeCountCompat, WorkerSlotId}; +use risingwave_common::hash::{VnodeCount, VnodeCountCompat, WorkerSlotId}; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_common::util::worker_util::WorkerNodeId; use risingwave_meta_model::actor::ActorStatus; @@ -496,7 +496,7 @@ impl CatalogController { actors: pb_actors, state_table_ids: pb_state_table_ids, upstream_fragment_ids: pb_upstream_fragment_ids, - maybe_vnode_count: Some(vnode_count as _), + maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(), }; Ok((pb_fragment, pb_actor_status, pb_actor_splits)) @@ -1497,7 +1497,7 @@ mod tests { use std::collections::{BTreeMap, HashMap}; use itertools::Itertools; - use risingwave_common::hash::{ActorMapping, VirtualNode}; + use risingwave_common::hash::{ActorMapping, VirtualNode, VnodeCount}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model::actor::ActorStatus; @@ -1627,7 +1627,7 @@ mod tests { .values() .flat_map(|m| m.keys().map(|x| *x as _)) .collect(), - maybe_vnode_count: Some(VirtualNode::COUNT_FOR_TEST as _), + maybe_vnode_count: VnodeCount::for_test().to_protobuf(), }; let pb_actor_status = (0..actor_count) diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index cd0bb8530acb..c7cf45daad9e 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -15,6 +15,7 @@ use std::collections::BTreeMap; use anyhow::anyhow; +use risingwave_common::hash::VnodeCount; use risingwave_common::util::epoch::Epoch; use risingwave_meta_model::{ connection, database, function, index, object, schema, secret, sink, source, subscription, @@ -164,7 +165,7 @@ impl From> for PbTable { created_at_cluster_version: value.1.created_at_cluster_version, retention_seconds: value.0.retention_seconds.map(|id| id as u32), cdc_table_id: value.0.cdc_table_id, - maybe_vnode_count: Some(value.0.vnode_count as _), + maybe_vnode_count: VnodeCount::set(value.0.vnode_count).to_protobuf(), } } } diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index 211f609a13fe..86a2197a9d5b 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -24,6 +24,7 @@ use risingwave_common::bail; use risingwave_common::catalog::{ generate_internal_table_name_with_type, TableId, CDC_SOURCE_COLUMN_NUM, }; +use risingwave_common::hash::VnodeCount; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::stream_graph_visitor; use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont; @@ -1126,7 +1127,7 @@ impl CompleteStreamFragmentGraph { actors, state_table_ids, upstream_fragment_ids, - maybe_vnode_count: Some(vnode_count as _), + maybe_vnode_count: VnodeCount::set(vnode_count).to_protobuf(), } }