Skip to content

Commit 421f9c3

Browse files
committed
nits: add abel suggestion
1 parent b76484e commit 421f9c3

File tree

5 files changed

+45
-45
lines changed

5 files changed

+45
-45
lines changed

backend/Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/windmill-api/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ zip = ["dep:async_zip"]
2828
oauth2 = ["dep:async-oauth2"]
2929
http_trigger = ["dep:matchit"]
3030
static_frontend = ["dep:rust-embed"]
31-
database = ["dep:rust-postgres", "dep:pg_escape", "dep:byteorder", "dep:memchr", "dep:thiserror", "dep:rust_decimal"]
31+
database = ["dep:rust-postgres", "dep:pg_escape", "dep:byteorder", "dep:thiserror", "dep:rust_decimal"]
3232

3333
[dependencies]
3434
windmill-queue.workspace = true
@@ -115,6 +115,5 @@ ulid.workspace = true
115115
rust-postgres = { workspace = true, optional = true }
116116
pg_escape = { workspace = true, optional = true }
117117
byteorder = { workspace = true, optional = true }
118-
memchr = { workspace = true, optional = true }
119118
thiserror = { workspace = true, optional = true }
120119
rust_decimal = { workspace = true, optional = true }

backend/windmill-api/src/postgres_triggers/replication_message.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,12 @@
33
use core::str;
44
use std::{
55
cmp,
6-
io::{self, Read},
6+
io::{self, Cursor, Read},
77
str::Utf8Error,
88
};
99

1010
use byteorder::{BigEndian, ReadBytesExt};
1111
use bytes::Bytes;
12-
use memchr::memchr;
1312
use rust_postgres::types::{Oid, Type};
1413
use thiserror::Error;
1514

@@ -20,13 +19,12 @@ const X_LOG_DATA_BYTE: u8 = b'w';
2019
/**
2120
* This implementation is inspired by Postgres replication functionality
2221
* from https://github.com/supabase/pg_replicate
23-
*
24-
* Original implementation:
22+
*
23+
* Original implementation:
2524
* - https://github.com/supabase/pg_replicate/blob/main/pg_replicate/src/conversions/cdc_event.rs
26-
*
25+
*
2726
*/
2827

29-
3028
#[derive(Debug)]
3129
pub struct PrimaryKeepAliveBody {
3230
pub wal_end: u64,
@@ -255,7 +253,7 @@ impl Buffer {
255253
}
256254

257255
fn read_cstr(&mut self) -> Result<String, ConversionError> {
258-
match memchr(0, self.slice()) {
256+
match self.slice().iter().position(|&x| x == 0) {
259257
Some(pos) => {
260258
let start = self.idx;
261259
let end = start + pos;

backend/windmill-api/src/postgres_triggers/trigger.rs

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{collections::HashMap, pin::Pin};
22

33
use crate::{
4+
db::DB,
45
postgres_triggers::{
56
get_database_resource,
67
relation::RelationConverter,
@@ -10,7 +11,6 @@ use crate::{
1011
},
1112
run_job,
1213
},
13-
db::DB,
1414
users::fetch_api_authed,
1515
};
1616
use bytes::{BufMut, Bytes, BytesMut};
@@ -141,8 +141,8 @@ async fn update_ping(
141141
db: &DB,
142142
postgres_trigger: &DatabaseTrigger,
143143
error: Option<&str>,
144-
) -> Option<()> {
145-
match sqlx::query_scalar!(
144+
) -> Result<bool, sqlx::Error> {
145+
let updated = sqlx::query_scalar!(
146146
r#"
147147
UPDATE
148148
postgres_trigger
@@ -162,8 +162,9 @@ async fn update_ping(
162162
*INSTANCE_NAME
163163
)
164164
.fetch_optional(db)
165-
.await
166-
{
165+
.await;
166+
167+
match updated {
167168
Ok(updated) => {
168169
if updated.flatten().is_none() {
169170
// allow faster restart of database trigger
@@ -187,27 +188,32 @@ async fn update_ping(
187188
"Database {} changed, disabled, or deleted, stopping...",
188189
postgres_trigger.path
189190
);
190-
return None;
191+
return Ok(false);
191192
}
192193
}
193-
Err(err) => {
194-
tracing::warn!(
195-
"Error updating ping of database {}: {:?}",
196-
postgres_trigger.path,
197-
err
198-
);
199-
return None;
200-
}
194+
Err(err) => return Err(err),
201195
};
202196

203-
Some(())
197+
Ok(true)
204198
}
205199

206200
async fn loop_ping(db: &DB, postgres_trigger: &DatabaseTrigger, error: Option<&str>) {
207201
loop {
208-
if update_ping(db, postgres_trigger, error).await.is_none() {
209-
return;
202+
let result = update_ping(db, postgres_trigger, error).await;
203+
204+
match result {
205+
Ok(false) => return,
206+
Err(err) => {
207+
tracing::warn!(
208+
"Error updating ping of database {}: {:?}",
209+
postgres_trigger.path,
210+
err
211+
);
212+
return;
213+
}
214+
_ => {}
210215
}
216+
211217
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
212218
}
213219
}
@@ -271,7 +277,7 @@ async fn listen_to_transactions(
271277

272278
if let Err(err) = &message {
273279
tracing::debug!("{}", err.to_string());
274-
update_ping(&db, postgres_trigger, Some(&err.to_string())).await;
280+
let _ = update_ping(&db, postgres_trigger, Some(&err.to_string())).await;
275281
return;
276282
}
277283

@@ -281,7 +287,7 @@ async fn listen_to_transactions(
281287
Ok(logical_message) => logical_message,
282288
Err(err) => {
283289
tracing::debug!("{}", err.to_string());
284-
update_ping(&db, postgres_trigger, Some(&err.to_string())).await;
290+
let _ = update_ping(&db, postgres_trigger, Some(&err.to_string())).await;
285291
return;
286292
}
287293
};
@@ -297,7 +303,7 @@ async fn listen_to_transactions(
297303
let logical_replication_message = match x_log_data.parse(&logicail_replication_settings) {
298304
Ok(logical_replication_message) => logical_replication_message,
299305
Err(err) => {
300-
update_ping(&db, postgres_trigger, Some(&err.to_string())).await;
306+
let _ = update_ping(&db, postgres_trigger, Some(&err.to_string())).await;
301307
return;
302308
}
303309
};
@@ -325,7 +331,7 @@ async fn listen_to_transactions(
325331
let relation = match relations.get_relation(o_id) {
326332
Ok(relation) => relation,
327333
Err(err) => {
328-
update_ping(&db, postgres_trigger, Some(&err.to_string())).await;
334+
let _ = update_ping(&db, postgres_trigger, Some(&err.to_string())).await;
329335
return;
330336
}
331337
};
@@ -386,7 +392,7 @@ async fn try_to_listen_to_database_transactions(
386392
tokio::spawn(async move {
387393
let result = listen_to_transactions(&pg_trigger, db.clone(), killpill_rx).await;
388394
if let Err(e) = result {
389-
update_ping(&db, &pg_trigger, Some(e.to_string().as_str())).await;
395+
let _ = update_ping(&db, &pg_trigger, Some(e.to_string().as_str())).await;
390396
};
391397
});
392398
} else {

backend/windmill-api/src/variables.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -691,29 +691,27 @@ pub fn decrypt(mc: &MagicCrypt256, value: String) -> Result<String> {
691691
})
692692
}
693693

694-
struct Variable {
695-
value: String,
696-
is_secret: bool,
697-
}
698-
699694
pub async fn get_variable_or_self(path: String, db: &DB, w_id: &str) -> Result<String> {
700695
if !path.starts_with("$var:") {
701696
return Ok(path);
702697
}
703698
let path = path.strip_prefix("$var:").unwrap().to_string();
704-
let mut variable = sqlx::query_as!(
705-
Variable,
706-
"SELECT value, is_secret
707-
FROM variable
708-
WHERE path = $1 AND workspace_id = $2",
699+
700+
let record = sqlx::query!(
701+
"SELECT value, is_secret
702+
FROM variable
703+
WHERE path = $1 AND workspace_id = $2",
709704
&path,
710705
&w_id
711706
)
712707
.fetch_one(db)
713708
.await?;
714-
if variable.is_secret {
709+
710+
let mut value = record.value;
711+
if record.is_secret {
715712
let mc = build_crypt(db, w_id).await?;
716-
variable.value = decrypt(&mc, variable.value)?;
713+
value = decrypt(&mc, value)?;
717714
}
718-
Ok(variable.value)
715+
716+
Ok(value)
719717
}

0 commit comments

Comments
 (0)