Skip to content

Commit

Permalink
feat(structured data): add support for datetime columns (#2734)
Browse files Browse the repository at this point in the history
* feat(structured data): add support for datetime columns

* add the nanos

* align error messages

---------

Co-authored-by: Henry Fontanier <henry@dust.tt>
  • Loading branch information
fontanierh and Henry Fontanier authored Dec 1, 2023
1 parent 1edfe02 commit 35eda04
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 3 deletions.
1 change: 1 addition & 0 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,4 @@ deno_core = "0.200"
rayon = "1.8.0"
clap = { version = "4.4", features = ["derive"] }
async-recursion = "1.0"
chrono = "0.4.31"
106 changes: 103 additions & 3 deletions core/src/databases/table_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet};

use super::database::{DatabaseRow, HasValue};
use anyhow::{anyhow, Result};
use chrono::prelude::DateTime;
use itertools::Itertools;
use rusqlite::{types::ToSqlOutput, ToSql};
use serde::{Deserialize, Serialize};
Expand All @@ -14,6 +15,7 @@ pub enum TableSchemaFieldType {
Float,
Text,
Bool,
DateTime,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
Expand Down Expand Up @@ -48,6 +50,7 @@ impl std::fmt::Display for TableSchemaFieldType {
TableSchemaFieldType::Float => write!(f, "real"),
TableSchemaFieldType::Text => write!(f, "text"),
TableSchemaFieldType::Bool => write!(f, "boolean"),
TableSchemaFieldType::DateTime => write!(f, "timestamp"),
}
}
}
Expand Down Expand Up @@ -118,7 +121,11 @@ impl TableSchema {
Value::String(s) => {
format!("\"{}\"", s)
}
Value::Object(_) | Value::Array(_) | Value::Null => unreachable!(),
Value::Object(obj) => match Self::try_parse_date_object(obj) {
Some(date) => date,
None => unreachable!(),
},
Value::Array(_) | Value::Null => unreachable!(),
};

if s.len() > POSSIBLE_VALUES_MAX_LEN {
Expand Down Expand Up @@ -161,9 +168,18 @@ impl TableSchema {
}
}
Value::String(_) => TableSchemaFieldType::Text,
Value::Object(_) | Value::Array(_) => Err(anyhow!(
Value::Object(obj) => match Self::try_parse_date_object(obj) {
Some(_) => TableSchemaFieldType::DateTime,
None => Err(anyhow!(
"Field {} is not a primitive or datetime object type on row {} \
(non datetime object and arrays are not supported)",
k,
row_index,
))?,
},
Value::Array(_) => Err(anyhow!(
"Field {} is not a primitive type on row {} \
(object and arrays are not supported)",
(nondatetime object and arrays are not supported)",
k,
row_index,
))?,
Expand Down Expand Up @@ -208,6 +224,7 @@ impl TableSchema {
TableSchemaFieldType::Float => "REAL",
TableSchemaFieldType::Text => "TEXT",
TableSchemaFieldType::Bool => "BOOLEAN",
TableSchemaFieldType::DateTime => "TEXT",
};
format!("\"{}\" {}", column.name, sql_type)
})
Expand Down Expand Up @@ -259,6 +276,10 @@ impl TableSchema {
}
}
Some(Value::String(s)) => Ok(SqlParam::Text(s.clone())),
Some(Value::Object(obj)) => match Self::try_parse_date_object(obj) {
Some(date) => Ok(SqlParam::Text(date)),
None => Err(anyhow!("Unknown object type")),
},
None | Some(Value::Null) => Ok(SqlParam::Null),
_ => Err(anyhow!("Cannot convert value {:?} to SqlParam", object)),
})
Expand Down Expand Up @@ -348,6 +369,24 @@ impl TableSchema {

Ok(TableSchema(merged_schema))
}

/// Attempts to interpret a JSON object as a datetime value.
///
/// A datetime object has the shape `{"type": "datetime", "epoch": <integer>}`, where `epoch`
/// is handled as a number of milliseconds relative to the Unix epoch (it is split into whole
/// seconds and a millisecond remainder below). On success the timestamp is rendered with the
/// `%Y-%m-%d %H:%M:%S` format, so sub-second precision is not part of the returned string.
///
/// Returns `None` when:
/// - the `type` key is missing, is not a string, or is not exactly `"datetime"`;
/// - the `epoch` key is missing, is not a number, or does not fit in an `i64`
///   (e.g. a float or an out-of-range unsigned value);
/// - the resulting timestamp is rejected by `DateTime::from_timestamp`.
fn try_parse_date_object(maybe_date_obj: &serde_json::Map<String, Value>) -> Option<String> {
    match (maybe_date_obj.get("type"), maybe_date_obj.get("epoch")) {
        (Some(Value::String(date_type)), Some(Value::Number(epoch)))
            if date_type == "datetime" =>
        {
            let epoch_ms = epoch.as_i64()?;

            // Use Euclidean division so that the millisecond remainder is always in
            // `0..1000`, including for negative (pre-1970) epochs. With plain `/` and `%`
            // a negative remainder would wrap to a huge `u32` nanosecond count and the
            // timestamp would be rejected even though it is valid.
            let secs = epoch_ms.div_euclid(1000);
            // Lossless cast: the value is in `0..=999_000_000`, which fits in a `u32`.
            let nanos = (epoch_ms.rem_euclid(1000) * 1_000_000) as u32;

            DateTime::from_timestamp(secs, nanos)
                .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
        }
        _ => None,
    }
}
}

#[cfg(test)]
Expand Down Expand Up @@ -769,6 +808,67 @@ mod tests {
Ok(())
}

// Checks that datetime objects are inferred as a `DateTime` column, listed as formatted
// possible values, and stored in SQLite in a form that `datetime(...)` comparisons work on.
#[test]
fn test_table_schema_from_rows_with_datetime() -> Result<()> {
    // Epoch 946684800000 corresponds to 2000-01-01 00:00:00.
    let first_value = json!({
        "created_at": {
            "type": "datetime",
            "epoch": 946684800000_i64
        }
    });
    // Epoch 946771200000 corresponds to 2000-01-02 00:00:00.
    let second_value = json!({
        "created_at": {
            "type": "datetime",
            "epoch": 946771200000_i64
        }
    });
    let input_rows = vec![
        DatabaseRow::new("1".to_string(), first_value),
        DatabaseRow::new("2".to_string(), second_value),
    ];

    // Schema inference: a single datetime column with both formatted values listed.
    let inferred = TableSchema::from_rows(&input_rows)?;
    let wanted = TableSchema(vec![TableSchemaColumn {
        name: "created_at".to_string(),
        value_type: TableSchemaFieldType::DateTime,
        possible_values: Some(vec![
            "2000-01-01 00:00:00".to_string(),
            "2000-01-02 00:00:00".to_string(),
        ]),
    }]);
    assert_eq!(inferred, wanted);

    // Insert every fixture row into a fresh in-memory table, in order.
    let conn = setup_in_memory_db(&inferred)?;
    for db_row in &input_rows {
        let (insert_sql, field_names) = inferred.get_insert_sql("test_table");
        let insert_params = params_from_iter(inferred.get_insert_params(&field_names, db_row)?);
        let mut insert_stmt = conn.prepare(&insert_sql)?;
        insert_stmt.execute(insert_params)?;
    }

    // Only the second row is strictly later than 2000-01-01 00:00:00.
    let mut select_stmt = conn.prepare(
        "SELECT * FROM test_table where created_at > datetime('2000-01-01 00:00:00');",
    )?;
    let mut matches = select_stmt.query([])?;
    let first_match = matches.next()?.unwrap();
    let created_at: String = first_match.get("created_at")?;
    assert_eq!(created_at, "2000-01-02 00:00:00");
    // There should be no more rows.
    assert!(matches.next()?.is_none());

    Ok(())
}

// Helper function to set up an in-memory database with a test table
fn setup_in_memory_db(schema: &TableSchema) -> Result<Connection> {
let conn = Connection::open_in_memory()?;
Expand Down

0 comments on commit 35eda04

Please sign in to comment.