diff --git a/core/Cargo.lock b/core/Cargo.lock index 8c76c2b4b129..89951568b259 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -1008,6 +1008,7 @@ dependencies = [ "bb8-postgres", "blake3", "bstr 0.2.17", + "chrono", "clap", "cloud-storage", "deno_core", diff --git a/core/Cargo.toml b/core/Cargo.toml index f9bafb278f94..e42696f0cf09 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -57,3 +57,4 @@ deno_core = "0.200" rayon = "1.8.0" clap = { version = "4.4", features = ["derive"] } async-recursion = "1.0" +chrono = "0.4.31" diff --git a/core/src/databases/table_schema.rs b/core/src/databases/table_schema.rs index 5e3b84933b0b..77fece13cc79 100644 --- a/core/src/databases/table_schema.rs +++ b/core/src/databases/table_schema.rs @@ -2,6 +2,7 @@ use std::collections::{HashMap, HashSet}; use super::database::{DatabaseRow, HasValue}; use anyhow::{anyhow, Result}; +use chrono::prelude::DateTime; use itertools::Itertools; use rusqlite::{types::ToSqlOutput, ToSql}; use serde::{Deserialize, Serialize}; @@ -14,6 +15,7 @@ pub enum TableSchemaFieldType { Float, Text, Bool, + DateTime, } #[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] @@ -48,6 +50,7 @@ impl std::fmt::Display for TableSchemaFieldType { TableSchemaFieldType::Float => write!(f, "real"), TableSchemaFieldType::Text => write!(f, "text"), TableSchemaFieldType::Bool => write!(f, "boolean"), + TableSchemaFieldType::DateTime => write!(f, "timestamp"), } } } @@ -118,7 +121,11 @@ impl TableSchema { Value::String(s) => { format!("\"{}\"", s) } - Value::Object(_) | Value::Array(_) | Value::Null => unreachable!(), + Value::Object(obj) => match Self::try_parse_date_object(obj) { + Some(date) => date, + None => unreachable!(), + }, + Value::Array(_) | Value::Null => unreachable!(), }; if s.len() > POSSIBLE_VALUES_MAX_LEN { @@ -161,9 +168,18 @@ impl TableSchema { } } Value::String(_) => TableSchemaFieldType::Text, - Value::Object(_) | Value::Array(_) => Err(anyhow!( + Value::Object(obj) => match Self::try_parse_date_object(obj) { + Some(_) => TableSchemaFieldType::DateTime, + None => Err(anyhow!( + "Field {} is not a primitive or datetime object type on row {} \ + (non datetime object and arrays are not supported)", + k, + row_index, + ))?, + }, + Value::Array(_) => Err(anyhow!( "Field {} is not a primitive type on row {} \ - (object and arrays are not supported)", + (nondatetime object and arrays are not supported)", k, row_index, ))?, @@ -208,6 +224,7 @@ impl TableSchema { TableSchemaFieldType::Float => "REAL", TableSchemaFieldType::Text => "TEXT", TableSchemaFieldType::Bool => "BOOLEAN", + TableSchemaFieldType::DateTime => "TEXT", }; format!("\"{}\" {}", column.name, sql_type) }) @@ -259,6 +276,10 @@ impl TableSchema { } } Some(Value::String(s)) => Ok(SqlParam::Text(s.clone())), + Some(Value::Object(obj)) => match Self::try_parse_date_object(obj) { + Some(date) => Ok(SqlParam::Text(date)), + None => Err(anyhow!("Unknown object type")), + }, None | Some(Value::Null) => Ok(SqlParam::Null), _ => Err(anyhow!("Cannot convert value {:?} to SqlParam", object)), }) @@ -348,6 +369,24 @@ impl TableSchema { Ok(TableSchema(merged_schema)) } + + fn try_parse_date_object(maybe_date_obj: &serde_json::Map) -> Option { + match (maybe_date_obj.get("type"), maybe_date_obj.get("epoch")) { + (Some(Value::String(date_type)), Some(Value::Number(epoch))) => { + if date_type == "datetime" { + let epoch = match epoch.as_i64() { + Some(epoch) => epoch, + None => return None, + }; + DateTime::from_timestamp(epoch / 1000, ((epoch % 1000) * 1_000_000) as u32) + .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string()) + } else { + None + } + } + _ => None, + } + } } #[cfg(test)] @@ -769,6 +808,67 @@ mod tests { Ok(()) } + #[test] + fn test_table_schema_from_rows_with_datetime() -> Result<()> { + let row_1 = json!({ + "created_at": { + "type": "datetime", + "epoch": 946684800000_i64 // Corresponds to 2000-01-01 00:00:00 + } + }); + let row_2 = json!({ + "created_at": { + "type": "datetime", + "epoch": 946771200000_i64 // Corresponds to 2000-01-02 00:00:00 + } + }); + let rows = &vec![ + DatabaseRow::new("1".to_string(), row_1.clone()), + DatabaseRow::new("2".to_string(), row_2.clone()), + ]; + + let schema = TableSchema::from_rows(rows)?; + + let expected_schema = TableSchema(vec![TableSchemaColumn { + name: "created_at".to_string(), + value_type: TableSchemaFieldType::DateTime, + possible_values: Some(vec![ + "2000-01-01 00:00:00".to_string(), + "2000-01-02 00:00:00".to_string(), + ]), + }]); + + assert_eq!(schema, expected_schema); + + let conn = setup_in_memory_db(&schema)?; + + let (sql, field_names) = schema.get_insert_sql("test_table"); + let params = params_from_iter( + schema.get_insert_params(&field_names, &DatabaseRow::new("1".to_string(), row_1))?, + ); + let mut stmt = conn.prepare(&sql)?; + stmt.execute(params)?; + + let (sql, field_names) = schema.get_insert_sql("test_table"); + let params = params_from_iter( + schema.get_insert_params(&field_names, &DatabaseRow::new("2".to_string(), row_2))?, + ); + let mut stmt = conn.prepare(&sql)?; + stmt.execute(params)?; + + let mut stmt = conn.prepare( + "SELECT * FROM test_table where created_at > datetime('2000-01-01 00:00:00');", + )?; + let mut rows = stmt.query([])?; + let row = rows.next()?.unwrap(); + let created_at: String = row.get("created_at")?; + assert_eq!(created_at, "2000-01-02 00:00:00"); + // There should be no more rows. + assert!(rows.next()?.is_none()); + + Ok(()) + } + // Helper function to set up an in-memory database with a test table fn setup_in_memory_db(schema: &TableSchema) -> Result { let conn = Connection::open_in_memory()?;