Skip to content

Commit

Permalink
feat: add query_all function (#355)
Browse files Browse the repository at this point in the history
* feat: add query_all function

* feat: update

* feat: update

* feat: update

* update

* update

* update
  • Loading branch information
sundy-li authored Mar 4, 2024
1 parent 25b1195 commit 4c3041b
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 57 deletions.
2 changes: 2 additions & 0 deletions bindings/nodejs/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ export class Connection {
exec(sql: string): Promise<number>
/** Execute a SQL query, and only return the first row. */
queryRow(sql: string): Promise<Row | null>
/** Execute a SQL query and fetch all data into the result */
queryAll(sql: string): Promise<Array<Row>>
/** Execute a SQL query, and return all rows. */
queryIter(sql: string): Promise<RowIterator>
/** Execute a SQL query, and return all rows with schema and stats. */
Expand Down
13 changes: 13 additions & 0 deletions bindings/nodejs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,19 @@ impl Connection {
.map_err(format_napi_error)
}

/// Execute a SQL query and fetch all data into the result
#[napi]
pub async fn query_all(&self, sql: String) -> Result<Vec<Row>> {
Ok(self
.0
.query_all(&sql)
.await
.map_err(format_napi_error)?
.into_iter()
.map(|row| Row(row))
.collect())
}

/// Execute a SQL query, and return all rows.
#[napi]
pub async fn query_iter(&self, sql: String) -> Result<RowIterator> {
Expand Down
14 changes: 14 additions & 0 deletions bindings/python/src/asyncio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ impl AsyncDatabendConnection {
})
}

pub fn query_all<'p>(&self, py: Python<'p>, sql: String) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let rows: Vec<Row> = this
.query_all(&sql)
.await
.map_err(DriverError::new)?
.into_iter()
.map(Row::new)
.collect();
Ok(rows)
})
}

pub fn query_iter<'p>(&'p self, py: Python<'p>, sql: String) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
Expand Down
8 changes: 8 additions & 0 deletions bindings/python/src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ impl BlockingDatabendConnection {
Ok(ret.map(Row::new))
}

pub fn query_all(&self, py: Python, sql: String) -> PyResult<Vec<Row>> {
let this = self.0.clone();
let rows = wait_for_future(py, async move {
this.query_all(&sql).await.map_err(DriverError::new)
})?;
Ok(rows.into_iter().map(Row::new).collect())
}

pub fn query_iter(&self, py: Python, sql: String) -> PyResult<RowIterator> {
let this = self.0.clone();
let it = wait_for_future(py, async {
Expand Down
13 changes: 11 additions & 2 deletions cli/src/ast/tokenizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ pub enum TokenKind {
#[regex(r#"'([^'\\]|\\.|'')*'"#)]
QuotedString,

#[regex(r#"@([^\s`;'"])+"#)]
#[regex(r#"\$\$([^\$]|(\$[^\$]))*\$\$"#)]
CodeString,

#[regex(r#"@([^\s`;'"()]|\\\s|\\'|\\"|\\\\)+"#)]
AtString,

#[regex(r"[xX]'[a-fA-F0-9]*'")]
Expand Down Expand Up @@ -907,7 +910,12 @@ impl TokenKind {
pub fn is_literal(&self) -> bool {
matches!(
self,
LiteralInteger | LiteralFloat | QuotedString | PGLiteralHex | MySQLLiteralHex
LiteralInteger
| CodeString
| LiteralFloat
| QuotedString
| PGLiteralHex
| MySQLLiteralHex
)
}

Expand All @@ -916,6 +924,7 @@ impl TokenKind {
self,
Ident
| QuotedString
| CodeString
| PGLiteralHex
| MySQLLiteralHex
| LiteralInteger
Expand Down
88 changes: 39 additions & 49 deletions cli/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ pub struct Session {

settings: Settings,
query: String,
in_comment_block: bool,

keywords: Arc<Vec<String>>,
}
Expand Down Expand Up @@ -108,7 +107,6 @@ impl Session {
is_repl,
settings,
query: String::new(),
in_comment_block: false,
keywords: Arc::new(keywords),
})
}
Expand Down Expand Up @@ -316,7 +314,6 @@ impl Session {
}

pub fn append_query(&mut self, line: &str) -> Vec<String> {
let line = line.trim();
if line.is_empty() {
return vec![];
}
Expand All @@ -338,63 +335,56 @@ impl Session {
}
}

self.query.push(' ');

// consume self.query and get the result
let mut queries = Vec::new();
let mut tokenizer = Tokenizer::new(line);
let mut in_comment = false;
let mut start = 0;
let mut comment_block_start = 0;

while let Some(Ok(token)) = tokenizer.next() {
match token.kind {
TokenKind::SemiColon => {
if in_comment || self.in_comment_block {
continue;
} else {
let mut sql = self.query.trim().to_owned();
if sql.is_empty() {

if !self.query.is_empty() {
self.query.push('\n');
}
self.query.push_str(line);

'Parser: loop {
let mut tokenizer = Tokenizer::new(&self.query);

let mut in_comment = false;
let mut in_comment_block = false;

while let Some(Ok(token)) = tokenizer.next() {
match token.kind {
TokenKind::SemiColon => {
if in_comment_block || in_comment {
continue;
}
sql.push(';');

queries.push(sql);
self.query.clear();
// push to current and continue the tokenizer
let (sql, remain) = self.query.split_at(token.span.end);
if !sql.is_empty() {
queries.push(sql.to_string());
}
self.query = remain.to_string();
continue 'Parser;
}
}
TokenKind::Comment => {
in_comment = true;
}
TokenKind::EOI => {
in_comment = false;
}
TokenKind::Newline => {
in_comment = false;
self.query.push('\n');
}
TokenKind::CommentBlockStart => {
if !self.in_comment_block {
comment_block_start = token.span.start;
TokenKind::Comment => {
if in_comment_block {
continue;
}
in_comment = true;
}
self.in_comment_block = true;
}
TokenKind::CommentBlockEnd => {
self.in_comment_block = false;
self.query
.push_str(&line[comment_block_start..token.span.end]);
}
_ => {
if !in_comment && !self.in_comment_block {
self.query.push_str(&line[start..token.span.end]);
TokenKind::Newline => {
in_comment = false;
}
TokenKind::CommentBlockStart => {
in_comment_block = true;
}
TokenKind::CommentBlockEnd => {
in_comment_block = false;
}
_ => {}
}
}
start = token.span.end;
break;
}

if self.in_comment_block {
self.query.push_str(&line[comment_block_start..]);
}
queries
}

Expand Down
5 changes: 5 additions & 0 deletions cli/tests/00-base.result
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ a 1 true [1,2]
3
[] {}
with comment
1
2
"
a"
3
3.00 3.00 0.0000000170141183460469231731687303715884105727000 -0.0000000170141183460469231731687303715884105727000
Asia/Shanghai
0 0.00
Expand Down
10 changes: 10 additions & 0 deletions cli/tests/00-base.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ select [], {};

select /* ignore this block */ 'with comment';

select 1; select 2; select '
a'; select 3;

-- enable it after we support code string in databend
-- select $$aa$$;
-- select $$
-- def add(a, b):
-- a + b
-- $$;

/* ignore this block /* /*
select 'in comment block';
*/
Expand Down
4 changes: 4 additions & 0 deletions driver/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ pub trait Connection: DynClone + Send + Sync {

async fn exec(&self, sql: &str) -> Result<i64>;
async fn query_row(&self, sql: &str) -> Result<Option<Row>>;
async fn query_all(&self, sql: &str) -> Result<Vec<Row>> {
let rows = self.query_iter(sql).await?;
rows.collect().await
}
async fn query_iter(&self, sql: &str) -> Result<RowIterator>;
async fn query_iter_ext(&self, sql: &str) -> Result<RowStatsIterator>;

Expand Down
12 changes: 6 additions & 6 deletions driver/src/flight_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ impl Connection for FlightSQLConnection {
_file_format_options: Option<BTreeMap<&str, &str>>,
_copy_options: Option<BTreeMap<&str, &str>>,
) -> Result<ServerStats> {
return Err(Error::Protocol(
Err(Error::Protocol(
"LOAD DATA unavailable for FlightSQL".to_string(),
));
))
}

async fn load_file(
Expand All @@ -133,15 +133,15 @@ impl Connection for FlightSQLConnection {
_format_options: BTreeMap<&str, &str>,
_copy_options: Option<BTreeMap<&str, &str>>,
) -> Result<ServerStats> {
return Err(Error::Protocol(
Err(Error::Protocol(
"LOAD FILE unavailable for FlightSQL".to_string(),
));
))
}

async fn stream_load(&self, _sql: &str, _data: Vec<Vec<&str>>) -> Result<ServerStats> {
return Err(Error::Protocol(
Err(Error::Protocol(
"STREAM LOAD unavailable for FlightSQL".to_string(),
));
))
}
}

Expand Down

0 comments on commit 4c3041b

Please sign in to comment.