Skip to content

Commit 25ca217

Browse files
committed
backend: re-work inputs API impl after v2
1 parent 1585e88 commit 25ca217

File tree

1 file changed

+78
-142
lines changed

1 file changed

+78
-142
lines changed

backend/windmill-api/src/inputs.rs

Lines changed: 78 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,16 @@ use axum::{
1515
use chrono::{DateTime, Utc};
1616
use serde::{Deserialize, Serialize};
1717
use serde_json::Value;
18-
use sqlx::{types::Uuid, FromRow};
19-
use std::{
20-
fmt::{Display, Formatter},
21-
vec,
22-
};
18+
use sqlx::types::Uuid;
19+
use std::fmt::{Display, Formatter};
2320
use windmill_common::{
2421
db::UserDB,
2522
error::JsonResult,
2623
jobs::JobKind,
2724
scripts::to_i64,
2825
utils::{not_found_if_none, paginate, Pagination},
2926
};
27+
3028
pub fn workspaced_service() -> Router {
3129
Router::new()
3230
.route("/history", get(get_input_history))
@@ -40,20 +38,7 @@ pub fn workspaced_service() -> Router {
4038
)
4139
}
4240

43-
#[derive(Debug, sqlx::FromRow, Serialize, Deserialize)]
44-
pub struct InputRow {
45-
pub id: Uuid,
46-
pub workspace_id: String,
47-
pub runnable_id: String,
48-
pub runnable_type: RunnableType,
49-
pub name: String,
50-
pub args: sqlx::types::Json<Box<serde_json::value::RawValue>>,
51-
pub created_at: DateTime<Utc>,
52-
pub created_by: String,
53-
pub is_public: bool,
54-
}
55-
56-
#[derive(Debug, Serialize, Deserialize, sqlx::Type)]
41+
#[derive(Debug, Serialize, Deserialize, sqlx::Type, Copy, Clone)]
5742
#[sqlx(type_name = "runnable_type")]
5843
pub enum RunnableType {
5944
ScriptHash,
@@ -71,24 +56,6 @@ impl Display for RunnableType {
7156
}
7257
}
7358

74-
impl RunnableType {
75-
fn job_kind(&self) -> JobKind {
76-
match self {
77-
RunnableType::ScriptHash => JobKind::Script,
78-
RunnableType::ScriptPath => JobKind::Script,
79-
RunnableType::FlowPath => JobKind::Flow,
80-
}
81-
}
82-
83-
fn column_name(&self) -> &'static str {
84-
match self {
85-
RunnableType::ScriptHash => "script_hash",
86-
RunnableType::ScriptPath => "script_path",
87-
RunnableType::FlowPath => "script_path",
88-
}
89-
}
90-
}
91-
9259
#[derive(Debug, Serialize, Deserialize)]
9360
pub struct RunnableParams {
9461
pub runnable_id: String,
@@ -99,22 +66,13 @@ pub struct RunnableParams {
9966
pub struct Input {
10067
id: Uuid,
10168
name: String,
102-
created_at: chrono::DateTime<chrono::Utc>,
103-
args: sqlx::types::Json<Box<serde_json::value::RawValue>>,
69+
created_at: DateTime<Utc>,
70+
args: Value,
10471
created_by: String,
10572
is_public: bool,
10673
success: bool,
10774
}
10875

109-
#[derive(Debug, Serialize, Deserialize, FromRow)]
110-
pub struct CompletedJobMini {
111-
id: Uuid,
112-
created_at: chrono::DateTime<chrono::Utc>,
113-
args: Option<sqlx::types::Json<Box<serde_json::value::RawValue>>>,
114-
created_by: String,
115-
success: bool,
116-
}
117-
11876
#[derive(Deserialize)]
11977
struct GetInputHistory {
12078
include_preview: Option<bool>,
@@ -132,60 +90,43 @@ async fn get_input_history(
13290

13391
let mut tx = user_db.begin(&authed).await?;
13492

135-
let sql = &format!(
136-
"select id, created_at, created_by, 'null'::jsonb as args, success from v2_completed_job \
137-
where {} = $1 and job_kind = any($2) and workspace_id = $3 \
138-
order by created_at desc limit $4 offset $5",
139-
r.runnable_type.column_name()
140-
);
141-
142-
let query = sqlx::query_as::<_, CompletedJobMini>(sql);
143-
144-
let query = match r.runnable_type {
145-
RunnableType::ScriptHash => query.bind(to_i64(&r.runnable_id)?),
146-
_ => query.bind(&r.runnable_id),
93+
let job_kinds = match (r.runnable_type, g.include_preview.unwrap_or(false)) {
94+
(RunnableType::FlowPath, true) => [JobKind::Flow, JobKind::FlowPreview].as_slice(),
95+
(RunnableType::FlowPath, _) => [JobKind::Flow].as_slice(),
96+
(_, true) => [JobKind::Script, JobKind::Preview].as_slice(),
97+
_ => [JobKind::Script].as_slice(),
14798
};
14899

149-
let job_kinds = match r.runnable_type.job_kind() {
150-
kind @ JobKind::Script if g.include_preview.unwrap_or(false) => {
151-
vec![kind, JobKind::Preview]
152-
}
153-
kind @ JobKind::Flow if g.include_preview.unwrap_or(false) => {
154-
vec![kind, JobKind::FlowPreview]
155-
}
156-
kind => vec![kind],
100+
let (runnable_id, runnable_path) = match r.runnable_type {
101+
RunnableType::ScriptHash => (Some(to_i64(&r.runnable_id)?), None),
102+
_ => (None, Some(&r.runnable_id)),
157103
};
158104

159-
let rows = query
160-
.bind(job_kinds)
161-
.bind(&w_id)
162-
.bind(per_page as i32)
163-
.bind(offset as i32)
164-
.fetch_all(&mut *tx)
165-
.await?;
105+
let inputs = sqlx::query!(
106+
"SELECT c.id, created_at, created_by, status = 'success'::job_status AS \"success!\"
107+
FROM v2_job_completed c JOIN v2_job USING (id)
108+
WHERE (runnable_id = $1 OR runnable_path = $2) AND kind = ANY($3) AND c.workspace_id = $4
109+
ORDER BY created_at DESC LIMIT $5 OFFSET $6",
110+
runnable_id,
111+
runnable_path,
112+
job_kinds as &[JobKind],
113+
&w_id,
114+
per_page as i32,
115+
offset as i32
116+
)
117+
.map(|r| Input {
118+
id: r.id,
119+
name: format!("{} {}", r.created_at.format("%H:%M %-d/%-m"), r.created_by),
120+
created_at: r.created_at,
121+
args: Value::Null,
122+
created_by: r.created_by,
123+
is_public: true,
124+
success: r.success,
125+
})
126+
.fetch_all(&mut *tx)
127+
.await?;
166128

167129
tx.commit().await?;
168-
169-
let mut inputs = vec![];
170-
171-
for row in rows {
172-
inputs.push(Input {
173-
id: row.id,
174-
name: format!(
175-
"{} {}",
176-
row.created_at.format("%H:%M %-d/%-m"),
177-
row.created_by
178-
),
179-
created_at: row.created_at,
180-
args: row.args.unwrap_or(sqlx::types::Json(
181-
serde_json::value::RawValue::from_string("null".to_string()).unwrap(),
182-
)),
183-
created_by: row.created_by,
184-
is_public: true,
185-
success: row.success,
186-
});
187-
}
188-
189130
Ok(Json(inputs))
190131
}
191132

@@ -194,6 +135,7 @@ struct GetArgs {
194135
input: Option<bool>,
195136
allow_large: Option<bool>,
196137
}
138+
197139
async fn get_args_from_history_or_saved_input(
198140
authed: ApiAuthed,
199141
Extension(user_db): Extension<UserDB>,
@@ -252,37 +194,26 @@ async fn list_saved_inputs(
252194

253195
let mut tx = user_db.begin(&authed).await?;
254196

255-
let rows = sqlx::query_as::<_, InputRow>(
256-
"select id, workspace_id, runnable_id, runnable_type, name, 'null'::jsonb as args, created_at, created_by, is_public from input \
257-
where runnable_id = $1 and runnable_type = $2 and workspace_id = $3 \
258-
and (is_public IS true OR created_by = $4) \
259-
order by created_at desc limit $5 offset $6",
197+
let inputs = sqlx::query_as!(
198+
Input,
199+
"SELECT id, name, 'null'::JSONB AS args, created_by, created_at, is_public,
200+
TRUE AS \"success!\"
201+
FROM input
202+
WHERE runnable_id = $1 AND runnable_type = $2 AND workspace_id = $3
203+
AND (is_public IS TRUE OR created_by = $4)
204+
ORDER BY created_at DESC LIMIT $5 OFFSET $6",
205+
&r.runnable_id,
206+
&r.runnable_type as &RunnableType,
207+
&w_id,
208+
&authed.username,
209+
per_page as i32,
210+
offset as i32
260211
)
261-
.bind(&r.runnable_id)
262-
.bind(&r.runnable_type)
263-
.bind(&w_id)
264-
.bind(&authed.username)
265-
.bind(per_page as i32)
266-
.bind(offset as i32)
267212
.fetch_all(&mut *tx)
268213
.await?;
269214

270215
tx.commit().await?;
271216

272-
let mut inputs: Vec<Input> = Vec::new();
273-
274-
for row in rows {
275-
inputs.push(Input {
276-
id: row.id,
277-
name: row.name,
278-
args: row.args,
279-
created_by: row.created_by,
280-
created_at: row.created_at,
281-
is_public: row.is_public,
282-
success: true,
283-
})
284-
}
285-
286217
Ok(Json(inputs))
287218
}
288219

@@ -303,16 +234,17 @@ async fn create_input(
303234

304235
let id = Uuid::new_v4();
305236

306-
sqlx::query(
307-
"INSERT INTO input (id, workspace_id, runnable_id, runnable_type, name, args, created_by) VALUES ($1, $2, $3, $4, $5, $6, $7)",
237+
sqlx::query!(
238+
"INSERT INTO input (id, workspace_id, runnable_id, runnable_type, name, args, created_by)
239+
VALUES ($1, $2, $3, $4, $5, $6, $7)",
240+
&id,
241+
&w_id,
242+
&r.runnable_id,
243+
&r.runnable_type as &RunnableType,
244+
&input.name,
245+
sqlx::types::Json(&input.args) as sqlx::types::Json<&Box<serde_json::value::RawValue>>,
246+
&authed.username
308247
)
309-
.bind(&id)
310-
.bind(&w_id)
311-
.bind(&r.runnable_id)
312-
.bind(&r.runnable_type)
313-
.bind(&input.name)
314-
.bind(sqlx::types::Json(&input.args))
315-
.bind(&authed.username)
316248
.execute(&mut *tx)
317249
.await?;
318250

@@ -336,13 +268,15 @@ async fn update_input(
336268
) -> JsonResult<String> {
337269
let mut tx = user_db.begin(&authed).await?;
338270

339-
sqlx::query("UPDATE input SET name = $1, is_public = $2 WHERE id = $3 and workspace_id = $4")
340-
.bind(&input.name)
341-
.bind(&input.is_public)
342-
.bind(&input.id)
343-
.bind(&w_id)
344-
.execute(&mut *tx)
345-
.await?;
271+
sqlx::query!(
272+
"UPDATE input SET name = $1, is_public = $2 WHERE id = $3 and workspace_id = $4",
273+
&input.name,
274+
&input.is_public,
275+
&input.id,
276+
&w_id
277+
)
278+
.execute(&mut *tx)
279+
.await?;
346280

347281
tx.commit().await?;
348282

@@ -356,11 +290,13 @@ async fn delete_input(
356290
) -> JsonResult<String> {
357291
let mut tx = user_db.begin(&authed).await?;
358292

359-
sqlx::query("DELETE FROM input WHERE id = $1 and workspace_id = $2")
360-
.bind(&i_id)
361-
.bind(&w_id)
362-
.execute(&mut *tx)
363-
.await?;
293+
sqlx::query!(
294+
"DELETE FROM input WHERE id = $1 and workspace_id = $2",
295+
&i_id,
296+
&w_id
297+
)
298+
.execute(&mut *tx)
299+
.await?;
364300

365301
tx.commit().await?;
366302

0 commit comments

Comments
 (0)