Skip to content

Commit 136fe5f

Browse files
committed
full transition of result native-js-native
1 parent 6446dec commit 136fe5f

File tree

13 files changed

+523
-156
lines changed

13 files changed

+523
-156
lines changed

packages/cubejs-backend-native/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/cubejs-backend-native/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ crate-type = ["cdylib", "lib"]
1919
cubesqlplanner = { path = "../../rust/cubesqlplanner/cubesqlplanner" }
2020
cubeorchestrator = { path = "../../rust/cubeorchestrator" }
2121
cubenativeutils = { path = "../../rust/cubenativeutils" }
22+
cubesql = { path = "../../rust/cubesql/cubesql" }
2223
anyhow = "1.0"
2324
async-channel = { version = "2" }
2425
async-trait = "0.1.36"
2526
convert_case = "0.6.0"
2627
pin-project = "1.1.5"
27-
cubesql = { path = "../../rust/cubesql/cubesql" }
2828
findshlibs = "0.10.2"
2929
futures = "0.3.30"
3030
http-body-util = "0.1"

packages/cubejs-backend-native/js/ResultWrapper.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ export class ResultMultiWrapper extends BaseWrapper implements DataResult {
161161
}
162162
}
163163

164+
// This is consumed by native side via Transport Bridge
164165
export class ResultArrayWrapper extends BaseWrapper implements DataResult {
165166
public constructor(private readonly results: ResultWrapper[]) {
166167
super();
@@ -177,6 +178,9 @@ export class ResultArrayWrapper extends BaseWrapper implements DataResult {
177178
[[], [], []]
178179
);
179180

180-
return getFinalQueryResultArray(transformDataJson, rawData, resultDataJson);
181+
// It seems this is not needed anymore
182+
// return getFinalQueryResultArray(transformDataJson, rawData, resultDataJson);
183+
184+
return [transformDataJson, rawData, resultDataJson];
181185
}
182186
}

packages/cubejs-backend-native/js/index.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,9 +267,7 @@ function wrapNativeFunctionWithStream(
267267
} else if (response.error) {
268268
writerOrChannel.reject(errorString(response));
269269
} else if (response.isWrapper) { // Native wrapped result
270-
const resArBuf = await response.getFinalResult();
271-
const resStr = new TextDecoder().decode(resArBuf);
272-
writerOrChannel.resolve(resStr);
270+
writerOrChannel.resolve(await response.getFinalResult());
273271
} else {
274272
writerOrChannel.resolve(JSON.stringify(response));
275273
}

packages/cubejs-backend-native/src/channel.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,16 @@ use std::sync::atomic::{AtomicU64, Ordering};
55
use std::sync::Arc;
66

77
use crate::transport::MapCubeErrExt;
8+
use crate::utils::bind_method;
89
use async_trait::async_trait;
10+
use cubeorchestrator::query_result_transform::RequestResultArray;
911
use cubesql::transport::{SqlGenerator, SqlTemplates};
1012
use cubesql::CubeError;
1113
#[cfg(debug_assertions)]
1214
use log::trace;
1315
use neon::prelude::*;
1416
use tokio::sync::oneshot;
1517

16-
use crate::utils::bind_method;
17-
1818
type JsAsyncStringChannelCallback =
1919
Box<dyn FnOnce(Result<String, CubeError>) -> Result<(), CubeError> + Send>;
2020
type JsAsyncChannelCallback = Box<
@@ -194,6 +194,12 @@ where
194194
rx.await?
195195
}
196196

197+
#[derive(Debug)]
198+
pub enum ValueFromJs {
199+
String(String),
200+
RequestResultArray(RequestResultArray),
201+
}
202+
197203
#[allow(clippy::type_complexity)]
198204
pub async fn call_raw_js_with_channel_as_callback<T, R>(
199205
channel: Arc<Channel>,

packages/cubejs-backend-native/src/orchestrator.rs

Lines changed: 54 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -144,51 +144,78 @@ pub fn final_query_result(mut cx: FunctionContext) -> JsResult<JsPromise> {
144144
Ok(promise)
145145
}
146146

147-
pub fn final_query_result_array(mut cx: FunctionContext) -> JsResult<JsPromise> {
148-
let transform_data_array = cx.argument::<JsValue>(0)?;
149-
let deserializer = JsValueDeserializer::new(&mut cx, transform_data_array);
147+
pub type JsResultDataVectors = (
148+
Vec<TransformDataRequest>,
149+
Vec<Arc<QueryResult>>,
150+
Vec<RequestResultData>,
151+
);
152+
153+
pub fn convert_final_query_result_array_from_js(
154+
cx: &mut FunctionContext<'_>,
155+
transform_data_array: Handle<JsValue>,
156+
data_array: Handle<JsArray>,
157+
results_data_array: Handle<JsValue>,
158+
) -> NeonResult<JsResultDataVectors> {
159+
let deserializer = JsValueDeserializer::new(cx, transform_data_array);
150160
let transform_requests: Vec<TransformDataRequest> = match Deserialize::deserialize(deserializer)
151161
{
152162
Ok(data) => data,
153163
Err(err) => return cx.throw_error(err.to_string()),
154164
};
155165

156-
let data_array = cx.argument::<JsArray>(1)?;
157166
let mut cube_store_results: Vec<Arc<QueryResult>> = vec![];
158-
for data_arg in data_array.to_vec(&mut cx)? {
159-
match extract_query_result(&mut cx, data_arg) {
167+
for data_arg in data_array.to_vec(cx)? {
168+
match extract_query_result(cx, data_arg) {
160169
Ok(query_result) => cube_store_results.push(query_result),
161170
Err(err) => return cx.throw_error(err.to_string()),
162171
};
163172
}
164173

165-
let results_data_array = cx.argument::<JsValue>(2)?;
166-
let deserializer = JsValueDeserializer::new(&mut cx, results_data_array);
167-
let mut request_results: Vec<RequestResultData> = match Deserialize::deserialize(deserializer) {
174+
let deserializer = JsValueDeserializer::new(cx, results_data_array);
175+
let request_results: Vec<RequestResultData> = match Deserialize::deserialize(deserializer) {
168176
Ok(data) => data,
169177
Err(err) => return cx.throw_error(err.to_string()),
170178
};
171179

172-
let promise = cx
173-
.task(move || {
174-
get_final_cubestore_result_array(
175-
&transform_requests,
176-
&cube_store_results,
177-
&mut request_results,
178-
)?;
179-
180-
let final_obj = RequestResultArray {
181-
results: request_results,
182-
};
180+
Ok((transform_requests, cube_store_results, request_results))
181+
}
183182

184-
match serde_json::to_string(&final_obj) {
185-
Ok(json) => Ok(json),
186-
Err(err) => Err(anyhow::Error::from(err)),
187-
}
188-
})
189-
.promise(move |cx, json_data| json_to_array_buffer(cx, json_data));
183+
pub fn final_query_result_array(mut cx: FunctionContext) -> JsResult<JsPromise> {
184+
let transform_data_array = cx.argument::<JsValue>(0)?;
185+
let data_array = cx.argument::<JsArray>(1)?;
186+
let results_data_array = cx.argument::<JsValue>(2)?;
190187

191-
Ok(promise)
188+
let convert_res = convert_final_query_result_array_from_js(
189+
&mut cx,
190+
transform_data_array,
191+
data_array,
192+
results_data_array,
193+
);
194+
match convert_res {
195+
Ok((transform_requests, cube_store_results, mut request_results)) => {
196+
let promise = cx
197+
.task(move || {
198+
get_final_cubestore_result_array(
199+
&transform_requests,
200+
&cube_store_results,
201+
&mut request_results,
202+
)?;
203+
204+
let final_obj = RequestResultArray {
205+
results: request_results,
206+
};
207+
208+
match serde_json::to_string(&final_obj) {
209+
Ok(json) => Ok(json),
210+
Err(err) => Err(anyhow::Error::from(err)),
211+
}
212+
})
213+
.promise(move |cx, json_data| json_to_array_buffer(cx, json_data));
214+
215+
Ok(promise)
216+
}
217+
Err(err) => cx.throw_error(err.to_string()),
218+
}
192219
}
193220

194221
pub fn final_query_result_multi(mut cx: FunctionContext) -> JsResult<JsPromise> {

packages/cubejs-backend-native/src/transport.rs

Lines changed: 100 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,18 @@ use neon::prelude::*;
33
use std::collections::HashMap;
44
use std::fmt::Display;
55

6+
use crate::auth::NativeAuthContext;
7+
use crate::channel::{call_raw_js_with_channel_as_callback, NodeSqlGenerator, ValueFromJs};
8+
use crate::node_obj_serializer::NodeObjSerializer;
9+
use crate::orchestrator::convert_final_query_result_array_from_js;
10+
use crate::{
11+
auth::TransportRequest, channel::call_js_with_channel_as_callback,
12+
stream::call_js_with_stream_as_callback,
13+
};
614
use async_trait::async_trait;
15+
use cubeorchestrator::query_result_transform::{
16+
get_final_cubestore_result_array, RequestResultArray,
17+
};
718
use cubesql::compile::engine::df::scan::{MemberField, SchemaRef};
819
use cubesql::compile::engine::df::wrapper::SqlQuery;
920
use cubesql::transport::{
@@ -20,14 +31,6 @@ use serde::Serialize;
2031
use std::sync::Arc;
2132
use uuid::Uuid;
2233

23-
use crate::auth::NativeAuthContext;
24-
use crate::channel::{call_raw_js_with_channel_as_callback, NodeSqlGenerator};
25-
use crate::node_obj_serializer::NodeObjSerializer;
26-
use crate::{
27-
auth::TransportRequest, channel::call_js_with_channel_as_callback,
28-
stream::call_js_with_stream_as_callback,
29-
};
30-
3134
#[derive(Debug)]
3235
pub struct NodeBridgeTransport {
3336
channel: Arc<Channel>,
@@ -369,54 +372,116 @@ impl TransportService for NodeBridgeTransport {
369372
streaming: false,
370373
})?;
371374

372-
let result = call_js_with_channel_as_callback(
375+
let result = call_raw_js_with_channel_as_callback(
373376
self.channel.clone(),
374377
self.on_sql_api_load.clone(),
375-
Some(extra),
378+
extra,
379+
Box::new(|cx, v| Ok(cx.string(v).as_value(cx))),
380+
Box::new(move |cx, v| {
381+
// It's too heavy/slow to get instance of ResultArrayWrapper from JS
382+
// and then call/await the .getFinalResult() method which needs be
383+
// executed again on JS side to get the actual needed date,
384+
// instead we pass it directly from JS side.
385+
// In case of wrapped result it's actually a tuple of
386+
// (transformDataJson[], rawData[], resultDataJson[])
387+
if let Ok(result_wrapped) = v.downcast::<JsArray, _>(cx) {
388+
let res_wrapped_vec = result_wrapped.to_vec(cx).map_cube_err("Can't convert JS result to array")?;
389+
390+
if res_wrapped_vec.len() != 3 {
391+
return Err(CubeError::internal("Expected a tuple with 3 elements: transformDataJson[], rawData[], resultDataJson[]".to_string()));
392+
}
393+
394+
let transform_data_array = res_wrapped_vec.first().unwrap();
395+
let data_array = res_wrapped_vec.get(1).unwrap()
396+
.downcast_or_throw::<JsArray, _>(cx).map_cube_err("Can't downcast js data to array")?;
397+
let results_data_array = res_wrapped_vec.get(2).unwrap();
398+
399+
match convert_final_query_result_array_from_js(
400+
cx,
401+
*transform_data_array,
402+
data_array,
403+
*results_data_array,
404+
) {
405+
Ok((transform_requests, cube_store_results, mut request_results)) => {
406+
get_final_cubestore_result_array(
407+
&transform_requests,
408+
&cube_store_results,
409+
&mut request_results,
410+
).map_cube_err("Can't build result array")?;
411+
412+
Ok(ValueFromJs::RequestResultArray(RequestResultArray {
413+
results: request_results,
414+
}))
415+
}
416+
Err(err) => {
417+
Err(CubeError::internal(format!("Error converting result data: {:?}", err.to_string())))
418+
}
419+
}
420+
421+
} else if let Ok(str) = v.downcast::<JsString, _>(cx) {
422+
Ok(ValueFromJs::String(str.value(cx)))
423+
} else {
424+
Err(CubeError::internal("Can't downcast callback argument to string or resultWrapper object".to_string()))
425+
}
426+
})
376427
)
377428
.await;
429+
378430
if let Err(e) = &result {
379431
if e.message.to_lowercase().contains("continue wait") {
380432
continue;
381433
}
382434
}
383435

384-
let response: serde_json::Value = result?;
436+
match result? {
437+
ValueFromJs::String(result) => {
438+
let response: serde_json::Value = serde_json::Value::String(result);
385439

386-
#[cfg(debug_assertions)]
387-
trace!("[transport] Request <- {:?}", response);
388-
#[cfg(not(debug_assertions))]
389-
trace!("[transport] Request <- <hidden>");
440+
#[cfg(debug_assertions)]
441+
trace!("[transport] Request <- {:?}", response);
442+
#[cfg(not(debug_assertions))]
443+
trace!("[transport] Request <- <hidden>");
390444

391-
if let Some(error_value) = response.get("error") {
392-
match error_value {
393-
serde_json::Value::String(error) => {
394-
if error.to_lowercase() == *"continue wait" {
395-
debug!(
445+
if let Some(error_value) = response.get("error") {
446+
match error_value {
447+
serde_json::Value::String(error) => {
448+
if error.to_lowercase() == *"continue wait" {
449+
debug!(
396450
"[transport] load - retrying request (continue wait) requestId: {}",
397451
request_id
398452
);
399453

400-
continue;
401-
} else {
402-
return Err(CubeError::user(error.clone()));
403-
}
404-
}
405-
other => {
406-
error!(
454+
continue;
455+
} else {
456+
return Err(CubeError::user(error.clone()));
457+
}
458+
}
459+
other => {
460+
error!(
407461
"[transport] load - strange response, success which contains error: {:?}",
408462
other
409463
);
410464

411-
return Err(CubeError::internal(
412-
"Error response with broken data inside".to_string(),
413-
));
414-
}
415-
}
416-
};
465+
return Err(CubeError::internal(
466+
"Error response with broken data inside".to_string(),
467+
));
468+
}
469+
}
470+
};
417471

418-
break serde_json::from_value::<TransportLoadResponse>(response)
419-
.map_err(|err| CubeError::user(err.to_string()));
472+
break serde_json::from_value::<TransportLoadResponse>(response)
473+
.map_err(|err| CubeError::user(err.to_string()));
474+
}
475+
ValueFromJs::RequestResultArray(result) => {
476+
let response = TransportLoadResponse {
477+
pivot_query: None,
478+
slow_query: None,
479+
query_type: None,
480+
results: result.results.into_iter().map(|v| v.into()).collect(),
481+
};
482+
break Ok(response);
483+
}
484+
}
420485
}
421486
}
422487

0 commit comments

Comments
 (0)