Skip to content

Commit

Permalink
support
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Nov 7, 2024
1 parent dae04db commit e4770af
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 80 deletions.
2 changes: 0 additions & 2 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ use std::sync::Arc;
use futures::prelude::stream::StreamExt;
use futures_async_stream::try_stream;
use futures_util::pin_mut;
use iceberg::scan;
use prometheus::Histogram;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::scan_range;
use risingwave_hummock_sdk::{HummockReadEpoch, HummockVersionId};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
Expand Down
6 changes: 2 additions & 4 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::{Bound, Deref};
use std::ops::Deref;
use std::sync::Arc;

use futures::{pin_mut, StreamExt};
Expand All @@ -23,11 +23,9 @@ use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Schema};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::types::DataType;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::value_encoding::deserialize_datum;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{scan_range, PbScanRange};
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::as_of::AsOfType;
use risingwave_pb::plan_common::{as_of, PbAsOf, StorageTableDesc};
Expand Down
91 changes: 87 additions & 4 deletions src/common/src/array/struct_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,11 +361,27 @@ impl StructValue {
if !s.ends_with(')') {
return Err("Missing right parenthesis".into());
}
let mut fields = Vec::with_capacity(s.len());
for (s, ty) in s[1..s.len() - 1].split(',').zip_eq_debug(ty.types()) {
let datum = match s.trim() {
let s = &s[1..s.len() - 1];
let mut split_str = Vec::with_capacity(ty.len());
let mut left_parenthesis_num = 0;
let mut start = 0;
for (i, c) in s.char_indices() {
match c {
'(' => left_parenthesis_num += 1,
')' => left_parenthesis_num -= 1,
',' if left_parenthesis_num == 0 => {
split_str.push(&s[start..i]);
start = i + 1;
}
_ => {}
}
}
split_str.push(&s[start..=(s.len() - 1)]);
let mut fields = Vec::with_capacity(ty.len());
for (str, ty) in split_str.iter().zip_eq_debug(ty.types()) {
let datum = match str.trim() {
"" => None,
s => Some(ScalarImpl::from_text(s, ty)?),
s => Some(ScalarImpl::from_text(s, ty).unwrap()),
};
fields.push(datum);
}
Expand Down Expand Up @@ -832,4 +848,71 @@ mod tests {
test("{1,2}", r#""{1,2}""#);
test(r#"{"f": 1}"#, r#""{""f"": 1}""#);
}

#[test]
fn test_from_str_nested_struct() {
let struct_str = "(1,sad ,(3, 4.0),(1,( 2,(3,(4,(5, 6))) )) )";
let struct_type = StructType::unnamed(vec![
DataType::Int32,
DataType::Varchar,
DataType::new_unnamed_struct(vec![DataType::Int32, DataType::Float64]),
DataType::new_unnamed_struct(vec![
DataType::Int32,
DataType::new_unnamed_struct(vec![
DataType::Int32,
DataType::new_unnamed_struct(vec![
DataType::Int32,
DataType::new_unnamed_struct(vec![
DataType::Int32,
DataType::new_unnamed_struct(vec![DataType::Int32, DataType::Int32]),
]),
]),
]),
]),
]);
let struct_value = StructValue::from_str(struct_str, &struct_type).unwrap();
let expected = StructValue::new(vec![
Some(1.to_scalar_value()),
Some("sad".into()),
Some(
StructValue::new(vec![
Some(3.to_scalar_value()),
Some(ScalarImpl::Float64(4.0.into())),
])
.to_scalar_value(),
),
Some(
StructValue::new(vec![
Some(1.to_scalar_value()),
Some(
StructValue::new(vec![
Some(2.to_scalar_value()),
Some(
StructValue::new(vec![
Some(3.to_scalar_value()),
Some(
StructValue::new(vec![
Some(4.to_scalar_value()),
Some(
StructValue::new(vec![
Some(5.to_scalar_value()),
Some(6.to_scalar_value()),
])
.to_scalar_value(),
),
])
.to_scalar_value(),
),
])
.to_scalar_value(),
),
])
.to_scalar_value(),
),
])
.to_scalar_value(),
),
]);
assert_eq!(struct_value, expected);
}
}
7 changes: 4 additions & 3 deletions src/expr/impl/src/scalar/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ where
}

#[function("cast(varchar) -> struct", type_infer = "unreachable")]
pub fn str_parse_struct(elem: &str, ctx: &Context) -> Result<StructValue>{
pub fn str_parse_struct(elem: &str, ctx: &Context) -> Result<StructValue> {
match &ctx.return_type {
risingwave_common::types::DataType::Struct(s) => Ok(StructValue::from_str(elem, s).map_err(|e| ExprError::Parse(format!("error: {:?}",e.as_report()).into()))?),
_ => return Err(ExprError::Parse("unsupported type".into())),
risingwave_common::types::DataType::Struct(s) => Ok(StructValue::from_str(elem, s)
.map_err(|e| ExprError::Parse(format!("error: {:?}", e.as_report()).into()))?),
_ => Err(ExprError::Parse("unsupported type".into())),
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,9 @@ pub async fn handle(
Statement::Query(_)
| Statement::Insert { .. }
| Statement::Delete { .. }
| Statement::Update { .. } => Ok(query::handle_query(handler_args, stmt, formats).await.unwrap()),
| Statement::Update { .. } => Ok(query::handle_query(handler_args, stmt, formats)
.await
.unwrap()),
Statement::CreateView {
materialized,
if_not_exists,
Expand Down
16 changes: 7 additions & 9 deletions src/frontend/src/handler/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,10 @@ pub fn gen_query_from_table_name_order_by(
let mut values = vec![];
for ((name, _), seek_pk) in pks.iter().zip_eq_fast(seek_pk_rows.iter()) {
if let Some(seek_pk) = seek_pk {
pk_rows.push(
Expr::Identifier(Ident::with_quote_unchecked(
'"',
name.clone(),
))
);
pk_rows.push(Expr::Identifier(Ident::with_quote_unchecked(
'"',
name.clone(),
)));
values.push(String::from_utf8(seek_pk.clone().into()).unwrap());
}
}
Expand All @@ -293,7 +291,7 @@ pub fn gen_query_from_table_name_order_by(
op: BinaryOperator::Gt,
right: Box::new(right),
})
}else{
} else {
let left = Expr::Row(pk_rows);
let values = values.join(",");
let right = Expr::Value(Value::SingleQuotedString(format!("({})", values)));
Expand All @@ -303,13 +301,13 @@ pub fn gen_query_from_table_name_order_by(
right: Box::new(right),
})
}
} else{
} else {
None
};

let select = Select {
from,
projection,
from,
selection,
..Default::default()
};
Expand Down
6 changes: 1 addition & 5 deletions src/frontend/src/optimizer/plan_node/batch_sys_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound;

use itertools::Itertools;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::types::ScalarImpl;
use risingwave_common::util::scan_range::{is_full_range, ScanRange};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::SysRowSeqScanNode;
use risingwave_pb::plan_common::PbColumnDesc;

use super::batch::prelude::*;
use super::utils::{childless_record, range_to_string, scan_ranges_as_strs, Distill};
use super::utils::{childless_record, scan_ranges_as_strs, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch};
use crate::error::Result;
use crate::expr::{ExprRewriter, ExprVisitor};
Expand Down
80 changes: 28 additions & 52 deletions src/frontend/src/session/cursor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,20 @@ use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::StatementType;
use pgwire::types::{Format, Row};
use prost::Message;
use risingwave_common::catalog::Field;
use risingwave_common::error::BoxedError;
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::{DataType, ScalarImpl, StructType, StructValue};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::scan_range;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_pb::expr::expr_node::Type;
use risingwave_sqlparser::ast::{BinaryOperator, Expr, Ident, ObjectName, Statement, Value};
use risingwave_sqlparser::ast::{Ident, ObjectName, Statement};

use super::SessionImpl;
use crate::catalog::subscription_catalog::SubscriptionCatalog;
use crate::catalog::TableId;
use crate::error::{ErrorCode, Result};
use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, Literal};
use crate::expr::{ExprType, FunctionCall, InputRef, Literal};
use crate::handler::declare_cursor::create_chunk_stream_for_cursor;
use crate::handler::query::{
gen_batch_plan_by_statement, gen_batch_plan_fragmenter, BatchQueryPlanResult,
Expand All @@ -59,9 +56,7 @@ use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::PlanRoot;
use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
use crate::utils::Condition;
use crate::{
Binder, OptimizerContext, OptimizerContextRef, PgResponseStream, PlanRef, TableCatalog,
};
use crate::{Binder, OptimizerContext, OptimizerContextRef, PgResponseStream, TableCatalog};

pub enum CursorDataChunkStream {
LocalDataChunk(Option<LocalQueryStream>),
Expand Down Expand Up @@ -392,26 +387,6 @@ impl SubscriptionCursor {
&fields,
handler_args.session.clone(),
);
{
let (mut chunk_stream, fields, init_query_timer, pk_column_names) =
Self::initiate_query(
Some(rw_timestamp),
&self.dependent_table_id,
handler_args.clone(),
self.seek_pk_row.clone(),
)
.await?;
Self::init_row_stream(
&mut chunk_stream,
formats,
&from_snapshot,
&fields,
handler_args.session.clone(),
);
while let Some(a) = chunk_stream.next().await? {
println!("testtest {:?}", a);
}
}

self.cursor_need_drop_time = Instant::now()
+ Duration::from_secs(self.subscription.retention_seconds);
Expand Down Expand Up @@ -875,43 +850,42 @@ impl SubscriptionCursor {
.iter()
.map(|f| {
let pk = table_catalog.columns.get(f.column_index).unwrap();
(pk.name().to_string(), pk.data_type(), f.column_index)
(pk.data_type(), f.column_index)
})
.collect_vec();
let (scan, predicate) = if let Some(seek_pk_rows) = seek_pk_rows {
let mut pk_rows = vec![];
let mut values = vec![];
for (seek_pk, (name, data_type, column_index)) in seek_pk_rows
.into_iter()
.zip_eq_fast(pks.into_iter()) {
for (seek_pk, (data_type, column_index)) in
seek_pk_rows.into_iter().zip_eq_fast(pks.into_iter())
{
if let Some(seek_pk) = seek_pk {
pk_rows.push(
InputRef {
index: column_index,
data_type: data_type.clone(),
}
);
pk_rows.push(InputRef {
index: column_index,
data_type: data_type.clone(),
});
let value_string = String::from_utf8(seek_pk.clone().into()).unwrap();
let value_data =
ScalarImpl::from_text(&value_string, data_type).unwrap();
values.push((Some(value_data),data_type.clone()));
let value_data = ScalarImpl::from_text(&value_string, data_type).unwrap();
values.push((Some(value_data), data_type.clone()));
}
}
if pk_rows.is_empty() {
(vec![], None)
} else if pk_rows.len() == 1 {
let left = pk_rows.pop().unwrap();
let (right_data,right_type) = values.pop().unwrap();
let (right_data, right_type) = values.pop().unwrap();
let (scan, predicate) = Condition {
conjunctions: vec![FunctionCall::new(
ExprType::GreaterThan,
vec![left.into(), Literal::new(right_data, right_type).into()],
)?.into()],
}.split_to_scan_ranges(table_catalog.table_desc().into(), max_split_range_gap)?;
)?
.into()],
}
.split_to_scan_ranges(table_catalog.table_desc().into(), max_split_range_gap)?;
(scan, Some(predicate))
}else{
let (right_datas,right_types):(Vec<_>,Vec<_>) = values.into_iter().unzip();
let right_data = ScalarImpl::Struct(StructValue::new(right_datas));
} else {
let (right_data, right_types): (Vec<_>, Vec<_>) = values.into_iter().unzip();
let right_data = ScalarImpl::Struct(StructValue::new(right_data));
let right_type = DataType::Struct(StructType::unnamed(right_types));
let left = FunctionCall::new_unchecked(
ExprType::Row,
Expand All @@ -921,13 +895,15 @@ impl SubscriptionCursor {
let right = Literal::new(Some(right_data), right_type);
let (scan, predicate) = Condition {
conjunctions: vec![FunctionCall::new(
ExprType::GreaterThan,
vec![left.into(), right.into()],
)?.into()],
}.split_to_scan_ranges(table_catalog.table_desc().into(), max_split_range_gap)?;
ExprType::GreaterThan,
vec![left.into(), right.into()],
)?
.into()],
}
.split_to_scan_ranges(table_catalog.table_desc().into(), max_split_range_gap)?;
(scan, Some(predicate))
}
} else{
} else {
(vec![], None)
};

Expand Down

0 comments on commit e4770af

Please sign in to comment.