Skip to content

Commit

Permalink
feat: structured data core endpoints for creating / updating database…
Browse files Browse the repository at this point in the history
…s and tables (#2472)

* Add endpoint for registering new databases.

* endpoints for databases and database tables

* databases list remove pagination from API
  • Loading branch information
fontanierh authored Nov 10, 2023
1 parent f82fa0e commit 4258e04
Show file tree
Hide file tree
Showing 4 changed files with 427 additions and 18 deletions.
254 changes: 254 additions & 0 deletions core/bin/dust_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1641,6 +1641,235 @@ async fn data_sources_delete(
}
}

// Databases

#[derive(serde::Deserialize)]
struct DatabasesRegisterPayload {
database_id: String,
name: String,
}

async fn databases_register(
extract::Path((project_id, data_source_id)): extract::Path<(i64, String)>,
extract::Json(payload): extract::Json<DatabasesRegisterPayload>,
extract::Extension(state): extract::Extension<Arc<APIState>>,
) -> (StatusCode, Json<APIResponse>) {
let project = project::Project::new_from_id(project_id);
match state
.store
.register_database(
&project,
&data_source_id,
&payload.database_id,
&payload.name,
)
.await
{
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
"Failed to register database",
Some(e),
),
Ok(db) => (
StatusCode::OK,
Json(APIResponse {
error: None,
response: Some(json!({
"database": db
})),
}),
),
}
}

async fn databases_retrieve(
extract::Path((project_id, data_source_id, database_id)): extract::Path<(i64, String, String)>,
extract::Extension(state): extract::Extension<Arc<APIState>>,
) -> (StatusCode, Json<APIResponse>) {
let project = project::Project::new_from_id(project_id);

match state
.store
.load_database(&project, &data_source_id, &database_id)
.await
{
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
"Failed to retrieve database",
Some(e),
),
Ok(db) => match db {
None => error_response(
StatusCode::NOT_FOUND,
"database_not_found",
&format!("No database found for id `{}`", database_id),
None,
),
Some(db) => (
StatusCode::OK,
Json(APIResponse {
error: None,
response: Some(json!({
"database": db
})),
}),
),
},
}
}

#[derive(serde::Deserialize)]
struct DatabasesListQuery {
data_source_id: Option<String>,
}

async fn databases_list(
extract::Path(project_id): extract::Path<i64>,
extract::Query(query): extract::Query<DatabasesListQuery>,
extract::Extension(state): extract::Extension<Arc<APIState>>,
) -> (StatusCode, Json<APIResponse>) {
let project = project::Project::new_from_id(project_id);

match state
.store
.list_databases(&project, &query.data_source_id)
.await
{
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
"Failed to list databases",
Some(e),
),
Ok(dbs) => (
StatusCode::OK,
Json(APIResponse {
error: None,
response: Some(json!({
"databases": dbs
})),
}),
),
}
}

#[derive(serde::Deserialize)]
struct DatabasesTablesUpsertPayload {
table_id: String,
name: String,
description: String,
}

async fn databases_tables_upsert(
extract::Path((project_id, data_source_id, database_id)): extract::Path<(i64, String, String)>,
extract::Json(payload): extract::Json<DatabasesTablesUpsertPayload>,
extract::Extension(state): extract::Extension<Arc<APIState>>,
) -> (StatusCode, Json<APIResponse>) {
let project = project::Project::new_from_id(project_id);

match state
.store
.upsert_database_table(
&project,
&data_source_id,
&database_id,
&payload.table_id,
&payload.name,
&payload.description,
)
.await
{
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
"Failed to upsert database table",
Some(e),
),
Ok(table) => (
StatusCode::OK,
Json(APIResponse {
error: None,
response: Some(json!({
"table": table
})),
}),
),
}
}

async fn databases_tables_retrieve(
extract::Path((project_id, data_source_id, database_id, table_id)): extract::Path<(
i64,
String,
String,
String,
)>,
extract::Extension(state): extract::Extension<Arc<APIState>>,
) -> (StatusCode, Json<APIResponse>) {
let project = project::Project::new_from_id(project_id);

match state
.store
.load_database_table(&project, &data_source_id, &database_id, &table_id)
.await
{
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
"Failed to retrieve database table",
Some(e),
),
Ok(table) => match table {
None => error_response(
StatusCode::NOT_FOUND,
"database_table_not_found",
&format!("No database table found for id `{}`", table_id),
None,
),
Some(table) => (
StatusCode::OK,
Json(APIResponse {
error: None,
response: Some(json!({
"table": table
})),
}),
),
},
}
}

async fn databases_tables_list(
extract::Path((project_id, data_source_id, database_id)): extract::Path<(i64, String, String)>,
extract::Extension(state): extract::Extension<Arc<APIState>>,
) -> (StatusCode, Json<APIResponse>) {
let project = project::Project::new_from_id(project_id);

match state
.store
.list_databases_tables(&project, &data_source_id, &database_id, None)
.await
{
Err(e) => error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"internal_server_error",
"Failed to list database tables",
Some(e),
),
Ok((tables, _)) => (
StatusCode::OK,
Json(APIResponse {
error: None,
response: Some(json!({
"tables": tables,
})),
}),
),
}
}

// Misc

#[derive(serde::Deserialize)]
Expand Down Expand Up @@ -1821,6 +2050,31 @@ fn main() {
"/projects/:project_id/data_sources/:data_source_id",
delete(data_sources_delete),
)
// Databases
.route(
"/projects/:project_id/data_sources/:data_source_id/databases",
post(databases_register),
)
.route(
"/projects/:project_id/data_sources/:data_source_id/databases/:database_id",
get(databases_retrieve),
)
.route(
"/projects/:project_id",
get(databases_list),
)
.route(
"/projects/:project_id/data_sources/:data_source_id/databases/:database_id/tables",
post(databases_tables_upsert),
)
.route(
"/projects/:project_id/data_sources/:data_source_id/databases/:database_id/tables/:table_id",
get(databases_tables_retrieve),
)
.route(
"/projects/:project_id/data_sources/:data_source_id/databases/:database_id/tables",
get(databases_tables_list),
)
// Misc
.route("/tokenize", post(tokenize))

Expand Down
36 changes: 20 additions & 16 deletions core/src/stores/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1840,7 +1840,7 @@ impl Store for PostgresStore {
async fn list_databases(
&self,
project: &Project,
data_source_id: Option<&str>,
data_source_id: &Option<String>,
) -> Result<Vec<Database>> {
let project_id = project.project_id();

Expand Down Expand Up @@ -1931,7 +1931,7 @@ impl Store for PostgresStore {
// Upsert Database Table.
let stmt = c
.prepare(
"INSERT INTO database_tables \
"INSERT INTO databases_tables \
(id, database, created, table_id, name, description) \
VALUES (DEFAULT, $1, $2, $3, $4, $5) \
ON CONFLICT (table_id, database) DO UPDATE \
Expand Down Expand Up @@ -1978,7 +1978,7 @@ impl Store for PostgresStore {

let stmt = c
.prepare(
"SELECT created, table_id, name, description FROM database_tables \
"SELECT created, table_id, name, description FROM databases_tables \
WHERE database IN (
SELECT id FROM databases WHERE data_source IN (
SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2
Expand Down Expand Up @@ -2012,7 +2012,7 @@ impl Store for PostgresStore {
}
}

async fn list_database_tables(
async fn list_databases_tables(
&self,
project: &Project,
data_source_id: &str,
Expand Down Expand Up @@ -2058,7 +2058,7 @@ impl Store for PostgresStore {
None => {
let stmt = c
.prepare(
"SELECT created, table_id, name, description FROM database_tables \
"SELECT created, table_id, name, description FROM databases_tables \
WHERE database = $1",
)
.await?;
Expand All @@ -2067,7 +2067,7 @@ impl Store for PostgresStore {
Some((limit, offset)) => {
let stmt = c
.prepare(
"SELECT created, table_id, name, description FROM database_tables \
"SELECT created, table_id, name, description FROM databases_tables \
WHERE database = $1 LIMIT $2 OFFSET $3",
)
.await?;
Expand Down Expand Up @@ -2101,7 +2101,7 @@ impl Store for PostgresStore {
None => tables.len(),
Some(_) => {
let stmt = c
.prepare("SELECT COUNT(*) FROM database_tables WHERE database = $1")
.prepare("SELECT COUNT(*) FROM databases_tables WHERE database = $1")
.await?;
let t: i64 = c.query_one(&stmt, &[&database_row_id]).await?.get(0);
t as usize
Expand Down Expand Up @@ -2157,7 +2157,9 @@ impl Store for PostgresStore {

// get the table row id
let stmt = c
.prepare("SELECT id FROM database_tables WHERE database = $1 AND table_id = $2 LIMIT 1")
.prepare(
"SELECT id FROM databases_tables WHERE database = $1 AND table_id = $2 LIMIT 1",
)
.await?;
let r = c.query(&stmt, &[&database_row_id, &table_id]).await?;
let table_row_id: i64 = match r.len() {
Expand Down Expand Up @@ -2213,7 +2215,7 @@ impl Store for PostgresStore {
.prepare(
"SELECT created, row_id, content FROM database_rows \
WHERE database_table IN (
SELECT id FROM database_tables WHERE database IN (
SELECT id FROM databases_tables WHERE database IN (
SELECT id FROM databases WHERE data_source IN (
SELECT id FROM data_sources WHERE project = $1 AND data_source_id = $2
) AND database_id = $3
Expand Down Expand Up @@ -2295,15 +2297,15 @@ impl Store for PostgresStore {

let mut params: Vec<&(dyn ToSql + Sync)> = vec![&database_row_id];
let mut query = "SELECT database_rows.created, database_rows.row_id, \
database_rows.content, database_tables.table_id \
database_rows.content, databases_tables.table_id \
FROM database_rows \
INNER JOIN database_tables ON database_rows.database_table = database_tables.id \
WHERE database_tables.database = $1".to_string();
INNER JOIN databases_tables ON database_rows.database_table = databases_tables.id \
WHERE databases_tables.database = $1".to_string();

let table_id_str: String;
if let Some(table_id) = table_id {
table_id_str = table_id.to_string();
query.push_str(" AND database_tables.table_id = $2");
query.push_str(" AND databases_tables.table_id = $2");
params.push(&table_id_str);
};

Expand Down Expand Up @@ -2345,8 +2347,8 @@ impl Store for PostgresStore {
None => rows.len(),
Some(_) => {
let count_sql = match table_id {
Some(_) => "SELECT COUNT(*) FROM database_rows WHERE database_table = (SELECT id FROM database_tables WHERE database = $1 AND table_id = $2)",
None => "SELECT COUNT(*) FROM database_rows WHERE database_table IN (SELECT id FROM database_tables WHERE database = $1)",
Some(_) => "SELECT COUNT(*) FROM database_rows WHERE database_table = (SELECT id FROM databases_tables WHERE database = $1 AND table_id = $2)",
None => "SELECT COUNT(*) FROM database_rows WHERE database_table IN (SELECT id FROM databases_tables WHERE database = $1)",
};
let t: i64 = c.query_one(count_sql, &params).await?.get(0);
t as usize
Expand Down Expand Up @@ -2399,7 +2401,9 @@ impl Store for PostgresStore {

// get the table row id
let stmt = c
.prepare("SELECT id FROM database_tables WHERE database = $1 AND table_id = $2 LIMIT 1")
.prepare(
"SELECT id FROM databases_tables WHERE database = $1 AND table_id = $2 LIMIT 1",
)
.await?;
let r = c.query(&stmt, &[&database_row_id, &table_id]).await?;
let table_row_id: i64 = match r.len() {
Expand Down
Loading

0 comments on commit 4258e04

Please sign in to comment.