From 067af4b668b437150a49c87d4ef5d32ef286ab58 Mon Sep 17 00:00:00 2001 From: Taddes Date: Thu, 4 Dec 2025 16:31:49 -0500 Subject: [PATCH 1/6] put bso --- syncstorage-postgres/src/db/db_impl.rs | 100 ++++++++++++++++++++++++- syncstorage-postgres/src/lib.rs | 1 + 2 files changed, 98 insertions(+), 3 deletions(-) diff --git a/syncstorage-postgres/src/db/db_impl.rs b/syncstorage-postgres/src/db/db_impl.rs index c360af50f6..f37d994981 100644 --- a/syncstorage-postgres/src/db/db_impl.rs +++ b/syncstorage-postgres/src/db/db_impl.rs @@ -11,8 +11,9 @@ use diesel::{ }; use diesel_async::{AsyncConnection, RunQueryDsl, TransactionManager}; use futures::TryStreamExt; +use std::collections::HashMap; use syncstorage_db_common::{ - error::DbErrorIntrospect, params, results, util::SyncTimestamp, Db, Sorting, + error::DbErrorIntrospect, params, results, util::SyncTimestamp, Db, Sorting, DEFAULT_BSO_TTL, }; use syncstorage_settings::Quota; @@ -416,8 +417,101 @@ impl Db for PgDb { Ok(modified) } - async fn put_bso(&mut self, params: params::PutBso) -> DbResult { - todo!() + async fn put_bso(&mut self, bso: params::PutBso) -> Result { + let collection_id = self.get_or_create_collection_id(&bso.collection).await?; + let user_id: u64 = bso.user_id.legacy_id; + let timestamp = self.timestamp().as_i64(); + if self.quota.enabled { + let usage = self + .get_quota_usage(params::GetQuotaUsage { + user_id: bso.user_id.clone(), + collection: bso.collection.clone(), + collection_id, + }) + .await?; + if usage.total_bytes >= self.quota.size { + let mut tags = HashMap::default(); + tags.insert("collection".to_owned(), bso.collection.clone()); + self.metrics.incr_with_tags("storage.quota.at_limit", tags); + if self.quota.enforced { + return Err(DbError::quota()); + } else { + warn!("Quota at limit for user's collection ({} bytes)", usage.total_bytes; "collection"=>bso.collection.clone()); + } + } + } + + let payload = bso.payload.as_deref().unwrap_or_default(); + let sortindex = bso.sortindex; + let ttl = bso.ttl.map_or(DEFAULT_BSO_TTL, |ttl| ttl); + + // This method is an upsert operation, which allows the update of an existing row + // or inserts a new one if it doesn’t exist. Postgres does not have `UPSERT` but + // achieves this though `INSERT...ON CONFLICT`. + let q: String = r#" + INSERT INTO bso (user_id, collection_id, bso_id, sortindex, payload, modified, expiry) + VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (user_id, collection_id, bso_id) + DO UPDATE SET + user_id = EXCLUDED.user_id, + collection_id = EXCLUDED.collection_id, + bso_id = EXCLUDED.bso_id, + "# + .to_string(); + + let q = format!( + "{}{}", + q, + if bso.sortindex.is_some() { + ", sortindex = VALUES(sortindex)" + } else { + "" + }, + ); + let q = format!( + "{}{}", + q, + if bso.payload.is_some() { + ", payload = VALUES(payload)" + } else { + "" + }, + ); + let q = format!( + "{}{}", + q, + if bso.ttl.is_some() { + "expiry = VALUES(expiry)" + } else { + "" + }, + ); + let q = format!( + "{}{}", + q, + if bso.payload.is_some() || bso.sortindex.is_some() { + "modified = VALUES(modified)" + } else { + "" + }, + ); + sql_query(q) + .bind::(user_id as i64) // XXX: + .bind::(&collection_id) + .bind::(&bso.id) + .bind::, _>(sortindex) + .bind::(payload) + .bind::(timestamp) + .bind::(timestamp + (i64::from(ttl) * 1000)) // remember: this is in millis + .execute(&mut self.conn) + .await?; + + self.update_collection(params::UpdateCollection { + user_id: bso.user_id, + collection_id, + collection: bso.collection, + }) + .await } async fn get_collection_id(&mut self, name: &str) -> DbResult { diff --git a/syncstorage-postgres/src/lib.rs b/syncstorage-postgres/src/lib.rs index 68f77c08cc..5712418266 100644 --- a/syncstorage-postgres/src/lib.rs +++ b/syncstorage-postgres/src/lib.rs @@ -2,6 +2,7 @@ #[macro_use] extern crate diesel; extern crate diesel_migrations; +#[macro_use] extern crate slog_scope; mod db; From d27916d5869ea9bc552ae583e152765496181327 Mon Sep 17 00:00:00 2001 From: Taddes Date: Thu, 4 Dec 2025 16:35:29 -0500 Subject: [PATCH 2/6] post bsos --- syncstorage-postgres/src/db/db_impl.rs | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/syncstorage-postgres/src/db/db_impl.rs b/syncstorage-postgres/src/db/db_impl.rs index f37d994981..2247ef0405 100644 --- a/syncstorage-postgres/src/db/db_impl.rs +++ b/syncstorage-postgres/src/db/db_impl.rs @@ -357,7 +357,28 @@ impl Db for PgDb { } async fn post_bsos(&mut self, params: params::PostBsos) -> DbResult { - todo!() + let collection_id = self.get_or_create_collection_id(¶ms.collection).await?; + let modified = self.timestamp(); + + for pbso in params.bsos { + self.put_bso(params::PutBso { + user_id: params.user_id.clone(), + collection: params.collection.clone(), + id: pbso.id.clone(), + payload: pbso.payload, + sortindex: pbso.sortindex, + ttl: pbso.ttl, + }) + .await?; + } + self.update_collection(params::UpdateCollection { + user_id: params.user_id, + collection_id, + collection: params.collection, + }) + .await?; + + Ok(modified) } async fn delete_bso(&mut self, params: params::DeleteBso) -> DbResult { @@ -417,7 +438,7 @@ impl Db for PgDb { Ok(modified) } - async fn put_bso(&mut self, bso: params::PutBso) -> Result { + async fn put_bso(&mut self, bso: params::PutBso) -> DbResult { let collection_id = self.get_or_create_collection_id(&bso.collection).await?; let user_id: u64 = bso.user_id.legacy_id; let timestamp = self.timestamp().as_i64(); From 8c1336982e36989df30bcd876fbee8f6108c5aed Mon Sep 17 00:00:00 2001 From: Taddes Date: Fri, 5 Dec 2025 09:15:02 -0500 Subject: [PATCH 3/6] orm variant --- syncstorage-postgres/src/db/db_impl.rs | 132 ++++++++++++++----------- 1 file changed, 77 insertions(+), 55 deletions(-) diff --git a/syncstorage-postgres/src/db/db_impl.rs b/syncstorage-postgres/src/db/db_impl.rs index 2247ef0405..7a7c5f5f91 100644 --- a/syncstorage-postgres/src/db/db_impl.rs +++ b/syncstorage-postgres/src/db/db_impl.rs @@ -469,61 +469,83 @@ impl Db for PgDb { // This method is an upsert operation, which allows the update of an existing row // or inserts a new one if it doesn’t exist. Postgres does not have `UPSERT` but // achieves this though `INSERT...ON CONFLICT`. - let q: String = r#" - INSERT INTO bso (user_id, collection_id, bso_id, sortindex, payload, modified, expiry) - VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (user_id, collection_id, bso_id) - DO UPDATE SET - user_id = EXCLUDED.user_id, - collection_id = EXCLUDED.collection_id, - bso_id = EXCLUDED.bso_id, - "# - .to_string(); - - let q = format!( - "{}{}", - q, - if bso.sortindex.is_some() { - ", sortindex = VALUES(sortindex)" - } else { - "" - }, - ); - let q = format!( - "{}{}", - q, - if bso.payload.is_some() { - ", payload = VALUES(payload)" - } else { - "" - }, - ); - let q = format!( - "{}{}", - q, - if bso.ttl.is_some() { - "expiry = VALUES(expiry)" - } else { - "" - }, - ); - let q = format!( - "{}{}", - q, - if bso.payload.is_some() || bso.sortindex.is_some() { - "modified = VALUES(modified)" - } else { - "" - }, - ); - sql_query(q) - .bind::(user_id as i64) // XXX: - .bind::(&collection_id) - .bind::(&bso.id) - .bind::, _>(sortindex) - .bind::(payload) - .bind::(timestamp) - .bind::(timestamp + (i64::from(ttl) * 1000)) // remember: this is in millis + // let q: String = r#" + // INSERT INTO bso (user_id, collection_id, bso_id, sortindex, payload, modified, expiry) + // VALUES ($1, $2, $3, $4, $5, $6, $7) + // ON CONFLICT (user_id, collection_id, bso_id) + // DO UPDATE SET + // user_id = EXCLUDED.user_id, + // collection_id = EXCLUDED.collection_id, + // bso_id = EXCLUDED.bso_id, + // "# + // .to_string(); + + // let q = format!( + // "{}{}", + // q, + // if bso.sortindex.is_some() { + // ", sortindex = VALUES(sortindex)" + // } else { + // "" + // }, + // ); + // let q = format!( + // "{}{}", + // q, + // if bso.payload.is_some() { + // ", payload = VALUES(payload)" + // } else { + // "" + // }, + // ); + // let q = format!( + // "{}{}", + // q, + // if bso.ttl.is_some() { + // "expiry = VALUES(expiry)" + // } else { + // "" + // }, + // ); + // let q = format!( + // "{}{}", + // q, + // if bso.payload.is_some() || bso.sortindex.is_some() { + // "modified = VALUES(modified)" + // } else { + // "" + // }, + // ); + // sql_query(q) + // .bind::(user_id as i64) // XXX: + // .bind::(&collection_id) + // .bind::(&bso.id) + // .bind::, _>(sortindex) + // .bind::(payload) + // .bind::(timestamp) + // .bind::(timestamp + (i64::from(ttl) * 1000)) // remember: this is in millis + // .execute(&mut self.conn) + // .await?; + + let expiry_ts = SyncTimestamp::from_i64(timestamp + (i64::from(ttl) * 1000))?; // remember: original milli conversion + let modified_ts = SyncTimestamp::from_i64(timestamp)?; + diesel::insert_into(bsos::table) + .values(( + bsos::user_id.eq(user_id as i64), + bsos::collection_id.eq(&collection_id), + bsos::bso_id.eq(&bso.id), + bsos::sortindex.eq(sortindex), + bsos::payload.eq(payload), + bsos::modified.eq(modified_ts.as_naive_datetime()?), + bsos::expiry.eq(expiry_ts.as_naive_datetime()?), + )) + .on_conflict((bsos::user_id, bsos::collection_id, bsos::bso_id)) + .do_update() + .set(( + bsos::user_id.eq(user_id as i64), + bsos::collection_id.eq(&collection_id), + bsos::bso_id.eq(&bso.id), + )) .execute(&mut self.conn) .await?; From b7a86a77dd93fd611f4fff23dfdf8336d14d391e Mon Sep 17 00:00:00 2001 From: Taddes Date: Fri, 5 Dec 2025 12:30:44 -0500 Subject: [PATCH 4/6] implement bso changeset struct to pass to put_bso --- syncstorage-postgres/src/db/db_impl.rs | 35 ++++++++++++++++++++++---- syncstorage-postgres/src/orm_models.rs | 11 +++++++- 2 files changed, 40 insertions(+), 6 deletions(-) diff --git a/syncstorage-postgres/src/db/db_impl.rs b/syncstorage-postgres/src/db/db_impl.rs index 7a7c5f5f91..2ef095ba2d 100644 --- a/syncstorage-postgres/src/db/db_impl.rs +++ b/syncstorage-postgres/src/db/db_impl.rs @@ -21,6 +21,7 @@ use super::PgDb; use crate::{ bsos_query, db::CollectionLock, + orm_models::BsoChangeset, pool::Conn, schema::{bsos, user_collections}, DbError, DbResult, @@ -529,6 +530,34 @@ impl Db for PgDb { let expiry_ts = SyncTimestamp::from_i64(timestamp + (i64::from(ttl) * 1000))?; // remember: original milli conversion let modified_ts = SyncTimestamp::from_i64(timestamp)?; + + let expiry_dt = expiry_ts.as_naive_datetime()?; + let modified_dt = modified_ts.as_naive_datetime()?; + + // The changeset utilizes Diesel's `AsChangeset` trait. + // This allows selective updates of fields if and only if they are `Some()` + let changeset = BsoChangeset { + sortindex: if bso.sortindex.is_some() { + Some(sortindex) + } else { + None + }, + payload: if bso.payload.is_some() { + Some(payload) + } else { + None + }, + expiry: if bso.ttl.is_some() { + Some(expiry_dt) + } else { + None + }, + modified: if bso.payload.is_some() || bso.sortindex.is_some() { + Some(modified_dt) + } else { + None + }, + }; diesel::insert_into(bsos::table) .values(( bsos::user_id.eq(user_id as i64), @@ -541,11 +570,7 @@ impl Db for PgDb { )) .on_conflict((bsos::user_id, bsos::collection_id, bsos::bso_id)) .do_update() - .set(( - bsos::user_id.eq(user_id as i64), - bsos::collection_id.eq(&collection_id), - bsos::bso_id.eq(&bso.id), - )) + .set(changeset) .execute(&mut self.conn) .await?; diff --git a/syncstorage-postgres/src/orm_models.rs b/syncstorage-postgres/src/orm_models.rs index c33929e15f..c7bc816788 100644 --- a/syncstorage-postgres/src/orm_models.rs +++ b/syncstorage-postgres/src/orm_models.rs @@ -2,7 +2,7 @@ use chrono::NaiveDateTime; use uuid::Uuid; use crate::schema::{batch_bsos, batches, bsos, collections, user_collections}; -use diesel::{Identifiable, Queryable}; +use diesel::{AsChangeset, Identifiable, Queryable}; #[allow(clippy::all)] #[derive(Queryable, Debug, Identifiable)] @@ -39,6 +39,15 @@ pub struct Bso { pub expiry: NaiveDateTime, } +#[derive(AsChangeset)] +#[diesel(table_name = bsos)] +pub struct BsoChangeset<'a> { + pub sortindex: Option>, + pub payload: Option<&'a str>, + pub modified: Option, + pub expiry: Option, +} + #[derive(Queryable, Debug, Identifiable)] #[diesel(primary_key(collection_id))] pub struct Collection { From 281ac16d7f8f5f5ae4c9ed4c62657d48b84ca159 Mon Sep 17 00:00:00 2001 From: Taddes Date: Fri, 5 Dec 2025 12:37:32 -0500 Subject: [PATCH 5/6] clean up --- syncstorage-postgres/src/db/db_impl.rs | 70 +++----------------------- 1 file changed, 6 insertions(+), 64 deletions(-) diff --git a/syncstorage-postgres/src/db/db_impl.rs b/syncstorage-postgres/src/db/db_impl.rs index 2ef095ba2d..5797560154 100644 --- a/syncstorage-postgres/src/db/db_impl.rs +++ b/syncstorage-postgres/src/db/db_impl.rs @@ -467,73 +467,15 @@ impl Db for PgDb { let sortindex = bso.sortindex; let ttl = bso.ttl.map_or(DEFAULT_BSO_TTL, |ttl| ttl); - // This method is an upsert operation, which allows the update of an existing row - // or inserts a new one if it doesn’t exist. Postgres does not have `UPSERT` but - // achieves this though `INSERT...ON CONFLICT`. - // let q: String = r#" - // INSERT INTO bso (user_id, collection_id, bso_id, sortindex, payload, modified, expiry) - // VALUES ($1, $2, $3, $4, $5, $6, $7) - // ON CONFLICT (user_id, collection_id, bso_id) - // DO UPDATE SET - // user_id = EXCLUDED.user_id, - // collection_id = EXCLUDED.collection_id, - // bso_id = EXCLUDED.bso_id, - // "# - // .to_string(); - - // let q = format!( - // "{}{}", - // q, - // if bso.sortindex.is_some() { - // ", sortindex = VALUES(sortindex)" - // } else { - // "" - // }, - // ); - // let q = format!( - // "{}{}", - // q, - // if bso.payload.is_some() { - // ", payload = VALUES(payload)" - // } else { - // "" - // }, - // ); - // let q = format!( - // "{}{}", - // q, - // if bso.ttl.is_some() { - // "expiry = VALUES(expiry)" - // } else { - // "" - // }, - // ); - // let q = format!( - // "{}{}", - // q, - // if bso.payload.is_some() || bso.sortindex.is_some() { - // "modified = VALUES(modified)" - // } else { - // "" - // }, - // ); - // sql_query(q) - // .bind::(user_id as i64) // XXX: - // .bind::(&collection_id) - // .bind::(&bso.id) - // .bind::, _>(sortindex) - // .bind::(payload) - // .bind::(timestamp) - // .bind::(timestamp + (i64::from(ttl) * 1000)) // remember: this is in millis - // .execute(&mut self.conn) - // .await?; - - let expiry_ts = SyncTimestamp::from_i64(timestamp + (i64::from(ttl) * 1000))?; // remember: original milli conversion + // original required millisecond conversion + let expiry_ts = SyncTimestamp::from_i64(timestamp + (i64::from(ttl) * 1000))?; let modified_ts = SyncTimestamp::from_i64(timestamp)?; let expiry_dt = expiry_ts.as_naive_datetime()?; let modified_dt = modified_ts.as_naive_datetime()?; + let modified = self.timestamp().as_naive_datetime()?; + let expiry = modified_dt + TimeDelta::seconds(ttl as i64); // The changeset utilizes Diesel's `AsChangeset` trait. // This allows selective updates of fields if and only if they are `Some()` let changeset = BsoChangeset { @@ -565,8 +507,8 @@ impl Db for PgDb { bsos::bso_id.eq(&bso.id), bsos::sortindex.eq(sortindex), bsos::payload.eq(payload), - bsos::modified.eq(modified_ts.as_naive_datetime()?), - bsos::expiry.eq(expiry_ts.as_naive_datetime()?), + bsos::modified.eq(modified_dt), + bsos::expiry.eq(expiry_dt), )) .on_conflict((bsos::user_id, bsos::collection_id, bsos::bso_id)) .do_update() From 070757897371a1033e00c1b2b0fa8263fbd7b7da Mon Sep 17 00:00:00 2001 From: Taddes Date: Wed, 10 Dec 2025 20:49:11 -0500 Subject: [PATCH 6/6] r --- syncstorage-postgres/src/db/db_impl.rs | 24 +++++++++--------------- syncstorage-postgres/src/orm_models.rs | 2 +- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/syncstorage-postgres/src/db/db_impl.rs b/syncstorage-postgres/src/db/db_impl.rs index 5797560154..dc7d996755 100644 --- a/syncstorage-postgres/src/db/db_impl.rs +++ b/syncstorage-postgres/src/db/db_impl.rs @@ -1,7 +1,7 @@ #![allow(unused_variables)] // XXX: use async_trait::async_trait; -use chrono::NaiveDateTime; +use chrono::{NaiveDateTime, TimeDelta}; use diesel::{ delete, dsl::{count, max, sql}, @@ -442,7 +442,6 @@ impl Db for PgDb { async fn put_bso(&mut self, bso: params::PutBso) -> DbResult { let collection_id = self.get_or_create_collection_id(&bso.collection).await?; let user_id: u64 = bso.user_id.legacy_id; - let timestamp = self.timestamp().as_i64(); if self.quota.enabled { let usage = self .get_quota_usage(params::GetQuotaUsage { @@ -467,20 +466,15 @@ impl Db for PgDb { let sortindex = bso.sortindex; let ttl = bso.ttl.map_or(DEFAULT_BSO_TTL, |ttl| ttl); - // original required millisecond conversion - let expiry_ts = SyncTimestamp::from_i64(timestamp + (i64::from(ttl) * 1000))?; - let modified_ts = SyncTimestamp::from_i64(timestamp)?; - - let expiry_dt = expiry_ts.as_naive_datetime()?; - let modified_dt = modified_ts.as_naive_datetime()?; - let modified = self.timestamp().as_naive_datetime()?; - let expiry = modified_dt + TimeDelta::seconds(ttl as i64); + // Expiry originally required millisecond conversion + let expiry = modified + TimeDelta::seconds(ttl as i64); + // The changeset utilizes Diesel's `AsChangeset` trait. // This allows selective updates of fields if and only if they are `Some()` let changeset = BsoChangeset { sortindex: if bso.sortindex.is_some() { - Some(sortindex) + sortindex // sortindex is already an Option of type `Option` } else { None }, @@ -490,12 +484,12 @@ impl Db for PgDb { None }, expiry: if bso.ttl.is_some() { - Some(expiry_dt) + Some(expiry) } else { None }, modified: if bso.payload.is_some() || bso.sortindex.is_some() { - Some(modified_dt) + Some(modified) } else { None }, @@ -507,8 +501,8 @@ impl Db for PgDb { bsos::bso_id.eq(&bso.id), bsos::sortindex.eq(sortindex), bsos::payload.eq(payload), - bsos::modified.eq(modified_dt), - bsos::expiry.eq(expiry_dt), + bsos::modified.eq(modified), + bsos::expiry.eq(expiry), )) .on_conflict((bsos::user_id, bsos::collection_id, bsos::bso_id)) .do_update() diff --git a/syncstorage-postgres/src/orm_models.rs b/syncstorage-postgres/src/orm_models.rs index c7bc816788..ab4aeb4caa 100644 --- a/syncstorage-postgres/src/orm_models.rs +++ b/syncstorage-postgres/src/orm_models.rs @@ -42,7 +42,7 @@ pub struct Bso { #[derive(AsChangeset)] #[diesel(table_name = bsos)] pub struct BsoChangeset<'a> { - pub sortindex: Option>, + pub sortindex: Option, pub payload: Option<&'a str>, pub modified: Option, pub expiry: Option,