Skip to content

Commit

Permalink
don't panic
Browse files Browse the repository at this point in the history
  • Loading branch information
fontanierh committed Nov 13, 2023
1 parent c5a80aa commit 391a5eb
Showing 1 changed file with 50 additions and 24 deletions.
74 changes: 50 additions & 24 deletions core/src/databases/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use anyhow::{anyhow, Result};
use crate::{project::Project, stores::store::Store, utils};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use serde_json::{Number, Value};
use serde_json::Value;
use std::collections::HashMap;

use super::table_schema::TableSchema;
Expand Down Expand Up @@ -156,6 +156,11 @@ impl Database {
let time_query_start = utils::now();
// Retrieve the DB schema and construct a SQL string.
let (schema, rows_by_table) = self.get_schema(project, store.clone(), true).await?;
let rows_by_table = match rows_by_table {
Some(rows) => rows,
None => return Err(anyhow!("No rows found")),
};

utils::done(&format!(
"DSSTRUCTSTAT Finished retrieving schema: duration={}ms",
utils::now() - time_query_start
Expand Down Expand Up @@ -204,14 +209,14 @@ impl Database {

// insert the rows in the DB
let insert_execute_start = utils::now();
for (table_name, rows) in rows_by_table.expect("No rows found") {
for (table_name, rows) in rows_by_table {
if rows.is_empty() {
continue;
}

let table_schema = table_schemas
.get(&table_name)
.expect("No schema found for table");
.ok_or_else(|| anyhow!("No schema found for table {}", table_name))?;

let mut insert_sql = "".to_string();
for row in rows {
Expand All @@ -228,33 +233,54 @@ impl Database {
));

let user_query_execute_start = utils::now();
let rows = stmt.query_map([], |row| {
let mut map = serde_json::Map::new();
for i in 0..column_count {
let column_name = column_names.get(i).expect("Invalid column name");
let value = match row.get(i).expect("Invalid value") {
rusqlite::types::Value::Integer(i) => Value::Number(i.into()),
rusqlite::types::Value::Real(f) => {
Value::Number(Number::from_f64(f).expect("invalid float value"))
}
rusqlite::types::Value::Text(t) => Value::String(t),
// convert blob into string
rusqlite::types::Value::Blob(b) => {
Value::String(String::from_utf8(b).expect("Invalid UTF-8 sequence"))
}

rusqlite::types::Value::Null => Value::Null,
};
map.insert(column_name.to_string(), value);
}
Ok(Value::Object(map))
// Execute the query and get an iterator over the mapped rows.
let mapped_rows = stmt.query_map([], |row| {
(0..column_count)
.map(|i| row.get::<usize, rusqlite::types::Value>(i))
.collect::<Result<Vec<rusqlite::types::Value>, rusqlite::Error>>()
})?;

let results = mapped_rows.map(|row_result| {
row_result
.map_err(|e| anyhow!("Failed to retrieve a row: {}", e))
.and_then(|row| {
column_names.iter().enumerate().try_fold(
serde_json::Map::new(),
|mut acc, (i, column_name)| {
row.get(i)
.ok_or_else(|| {
anyhow!("Missing value at index {} for column {}", i, column_name)
})
.and_then(|sql_value| {
let json_value = match sql_value {
rusqlite::types::Value::Integer(i) => serde_json::Value::Number((*i).into()),
rusqlite::types::Value::Real(f) => serde_json::Number::from_f64(*f)
.ok_or_else(|| {
anyhow!("Invalid float value for column {}", column_name)
})
.map(serde_json::Value::Number)?,
rusqlite::types::Value::Text(t) => serde_json::Value::String(t.clone()),
rusqlite::types::Value::Blob(b) => String::from_utf8(b.clone())
.map_err(|_| {
anyhow!("Invalid UTF-8 sequence for column {}", column_name)
})
.map(serde_json::Value::String)?,
rusqlite::types::Value::Null => serde_json::Value::Null,
};
acc.insert(column_name.clone(), json_value);
Ok(acc)
})
},
)
.map(serde_json::Value::Object)
})
})
.collect::<Result<Vec<serde_json::Value>, anyhow::Error>>()?;
utils::done(&format!(
"DSSTRUCTSTAT Finished executing user query: duration={}ms",
utils::now() - user_query_execute_start
));

let results = rows.collect::<Result<Vec<Value>, rusqlite::Error>>()?;
let results_refs = results.iter().collect::<Vec<&Value>>();

let infer_result_schema_start = utils::now();
Expand Down

0 comments on commit 391a5eb

Please sign in to comment.