diff --git a/file_store/src/file_info_poller.rs b/file_store/src/file_info_poller.rs
index b682fb4a1..ecaa9bfdb 100644
--- a/file_store/src/file_info_poller.rs
+++ b/file_store/src/file_info_poller.rs
@@ -5,6 +5,7 @@ use derive_builder::Builder;
 use futures::{future::LocalBoxFuture, stream::BoxStream, StreamExt};
 use futures_util::TryFutureExt;
 use retainer::Cache;
+use sqlx::postgres::PgQueryResult;
 use std::{collections::VecDeque, marker::PhantomData, sync::Arc, time::Duration};
 use task_manager::ManagedTask;
 use tokio::sync::mpsc::{Receiver, Sender};
@@ -27,7 +28,13 @@ pub trait FileInfoPollerState: Send + Sync + 'static {
 
     async fn exists(&self, process_name: &str, file_info: &FileInfo) -> Result<bool>;
 
-    async fn clean(&self, process_name: &str, file_type: &str) -> Result;
+    // Returns number of items cleaned
+    async fn clean(
+        &self,
+        process_name: &str,
+        file_type: &str,
+        offset: DateTime<Utc>,
+    ) -> Result<u64>;
 }
 
 #[async_trait::async_trait]
@@ -272,11 +279,26 @@ where
     }
 
     async fn clean(&self, cache: &MemoryFileCache) -> Result {
+        let cache_before = cache.len().await;
         cache.purge(4, 0.25).await;
-        self.config
+        let cache_after = cache.len().await;
+
+        let db_removed = self
+            .config
             .state
-            .clean(&self.config.process_name, &self.config.prefix)
+            .clean(
+                &self.config.process_name,
+                &self.config.prefix,
+                self.after(self.latest_file_timestamp),
+            )
             .await?;
+
+        tracing::info!(
+            cache_removed = cache_before - cache_after,
+            db_removed,
+            "cache clean"
+        );
+
         Ok(())
     }
@@ -445,24 +467,52 @@ impl FileInfoPollerState for sqlx::Pool<sqlx::Postgres> {
             .map_err(Error::from)
     }
 
-    async fn clean(&self, process_name: &str, file_type: &str) -> Result {
-        sqlx::query(
+    async fn clean(
+        &self,
+        process_name: &str,
+        file_type: &str,
+        offset: DateTime<Utc>,
+    ) -> Result<u64> {
+        let t100_timestamp: Option<DateTime<Utc>> = sqlx::query_scalar(
+            r#"
+            SELECT file_timestamp
+            FROM files_processed
+            WHERE process_name = $1
+                AND file_type = $2
+            ORDER BY file_timestamp DESC
+            LIMIT 1 OFFSET 100;
+            "#,
+        )
+        .bind(process_name)
+        .bind(file_type)
+        .fetch_optional(self)
+        .await?;
+
+        let Some(t100) = t100_timestamp else {
+            // The cleaning limit has not been reached, remove nothing.
+            return Ok(0);
+        };
+
+        // To keep from reprocessing files, we need to make sure rows that exist
+        // within the offset window are not removed.
+        let older_than_limit = t100.min(offset);
+
+        let query_result: PgQueryResult = sqlx::query(
             r#"
-            DELETE FROM files_processed where file_name in (
-                SELECT file_name
-                FROM files_processed
-                WHERE process_name = $1 and file_type = $2
-                ORDER BY file_timestamp DESC
-                OFFSET 100
-            )
+            DELETE FROM files_processed
+            WHERE process_name = $1
+                AND file_type = $2
+                AND file_timestamp < $3
             "#,
         )
         .bind(process_name)
         .bind(file_type)
+        .bind(older_than_limit)
         .execute(self)
         .await
-        .map(|_| ())
-        .map_err(Error::from)
+        .map_err(Error::from)?;
+
+        Ok(query_result.rows_affected())
     }
 }
@@ -481,7 +531,7 @@ mod tests {
     #[async_trait::async_trait]
     impl FileInfoPollerParser<String> for TestParser {
         async fn parse(&self, _byte_stream: ByteStream) -> Result<Vec<String>> {
-            Ok(vec!["Hello".into(), "world".into()])
+            Ok(vec![])
         }
     }
@@ -490,106 +540,134 @@ mod tests {
         async fn list_all<A, B>(
             &self,
             _file_type: &str,
-            _after: A,
-            _before: B,
+            after: A,
+            before: B,
         ) -> Result<Vec<FileInfo>>
         where
             A: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
             B: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
         {
-            Ok(self.0.to_owned())
+            let after = after.into();
+            let before = before.into();
+
+            Ok(self
+                .0
+                .clone()
+                .into_iter()
+                .filter(|file_info| after.map_or(true, |v| file_info.timestamp > v))
+                .filter(|file_info| before.map_or(true, |v| file_info.timestamp <= v))
+                .collect())
         }
 
         async fn get_raw<K>(&self, _key: K) -> Result<ByteStream>
         where
             K: Into<String> + Send + Sync,
         {
-            Ok(ByteStream::from_static(
-                b"hello, is it me you're looking for?",
-            ))
+            Ok(ByteStream::default())
        }
    }
 
     #[sqlx::test]
-    async fn test_file_after_oldest_timestamp(pool: PgPool) -> anyhow::Result<()> {
+    async fn do_not_reprocess_files_when_offset_exceeds_earliest_file(
+        pool: PgPool,
+    ) -> anyhow::Result<()> {
+        // Cleaning the files_processed table should not cause files within the
+        // `FileInfoPoller.config.offset` window to be reprocessed.
+
+        // There is no auto-migration for tests in this lib workspace.
         pool.execute(
             r#"
             CREATE TABLE files_processed (
                 process_name TEXT NOT NULL DEFAULT 'default',
-                file_name VARCHAR PRIMARY KEY,
-                file_type VARCHAR NOT NULL,
-                file_timestamp TIMESTAMPTZ NOT NULL,
-                processed_at TIMESTAMPTZ NOT NULL
+                file_name      VARCHAR PRIMARY KEY,
+                file_type      VARCHAR NOT NULL,
+                file_timestamp TIMESTAMPTZ NOT NULL,
+                processed_at   TIMESTAMPTZ NOT NULL
             );
             "#,
         )
         .await?;
 
+        // The important aspect of this test is that all the files to be
+        // processed happen _within_ the lookback offset.
+        const EXPECTED_FILE_COUNT: i64 = 150;
         let mut infos = vec![];
-        for day_ago in 0..150 {
+        for seconds in 0..EXPECTED_FILE_COUNT {
             let file_info = FileInfo {
-                key: format!("key-{day_ago}"),
+                key: format!("key-{seconds}"),
                 prefix: "file_type".to_string(),
-                timestamp: Utc::now() - chrono::Duration::days(day_ago),
-                size: 125,
+                timestamp: Utc::now() - chrono::Duration::seconds(seconds),
+                size: 42,
             };
             infos.push(file_info);
         }
 
-        let (mut receiver, ingest_server) =
+        // To simulate a restart, we're going to make a new FileInfoPoller.
+        // This closure is to ensure they have the same settings.
+        let file_info_builder = || {
+            let six_hours = chrono::Duration::hours(6).to_std().unwrap();
             FileInfoPollerConfigBuilder::<String, _, _>::default()
                 .parser(TestParser)
                 .state(pool.clone())
                 .store(TestStore(infos.clone()))
-                .lookback(LookbackBehavior::StartAfter(Utc::now()))
-                .prefix("test-prefix".to_string())
+                .lookback(LookbackBehavior::Max(six_hours))
+                .prefix("file_type".to_string())
+                .offset(six_hours)
                 .create()
-                .await?;
+        };
 
+        // On the first startup of the file info poller, there is nothing to clean,
+        // and all file_infos will be returned for processing.
+        let (mut receiver, ingest_server) = file_info_builder().await?;
         let (trigger, shutdown) = triggered::trigger();
-        let _handle = tokio::spawn(async move {
-            let status = ingest_server.run(shutdown).await;
-            println!("ingest server: {status:?}");
+        tokio::spawn(async move {
+            if let Err(status) = ingest_server.run(shutdown).await {
+                println!("ingest server went down unexpectedly: {status:?}");
+            }
         });
 
-        // "process" all the files.
-        let mut idx = 0;
-        while idx < 150 {
-            let msg = timeout(Duration::from_secs(1), receiver.recv()).await?;
-            idx += 1;
-            println!("{idx} :: got {:?}", msg.map(|x| x.data));
+        // "process" all the files. They are not recorded in the database
+        // until the file is consumed as a stream.
+        let mut processed = 0;
+        while processed < EXPECTED_FILE_COUNT {
+            match timeout(Duration::from_secs(1), receiver.recv()).await? {
+                Some(msg) => {
+                    processed += 1;
+                    let mut txn = pool.begin().await?;
+                    let _x = msg.into_stream(&mut txn).await?;
+                    txn.commit().await?;
+                }
+                err => panic!("something went wrong: {err:?}"),
+            };
         }
 
         // Shutdown the ingest server, we're going to create a new one and start it.
-        // This is more or less equivalent to a restart of the service.
-        let _shutdown = trigger.trigger();
-
-        let (mut receiver, ingest_server) =
-            FileInfoPollerConfigBuilder::<String, _, _>::default()
-                .parser(TestParser)
-                .state(pool.clone())
-                .store(TestStore(infos))
-                .lookback(LookbackBehavior::StartAfter(Utc::now()))
-                .prefix("test-prefix".to_string())
-                .create()
-                .await?;
+        trigger.trigger();
 
-        // The service will immediately "clean" the database
+        // On the second startup of the file info poller, there are 100+ files
+        // that have already been processed. The initial clean should not remove
+        // processed files in a way that causes us to re-receive any files
+        // within our offset for processing.
+        let (mut receiver, ingest_server) = file_info_builder().await?;
         let (trigger, shutdown) = triggered::trigger();
         let _handle = tokio::spawn(async move {
-            let status = ingest_server.run(shutdown).await;
-            println!("ingest server: {status:?}");
+            if let Err(status) = ingest_server.run(shutdown).await {
+                println!("ingest server went down unexpectedly: {status:?}");
+            }
         });
 
-        // Requesting the "same" files from it, none should be returned, because we've already "proessed" them all.
-        let _err = match timeout(Duration::from_secs(1), receiver.recv()).await {
-            Err(_err) => _err,
+        // Attempt to receive files for processing. The timeout should fire
+        // because all the files we set up exist within the offset and should
+        // still be in the database.
+        match timeout(Duration::from_secs(1), receiver.recv()).await {
+            Err(_err) => (),
             Ok(msg) => {
                 panic!("we got something when we expected nothing.: {msg:?}");
             }
-        };
+        }
 
-        let _shutdown = trigger.trigger();
+        // Shut down for great good
+        trigger.trigger();
 
         Ok(())
     }
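
Reviewer note: a minimal sketch of the pruning rule the new `clean` implements, under stated assumptions. The helper name `deletion_cutoff` and the `main` driver are invented for illustration and are not part of the patch. The idea: the deletion cutoff is the older of (a) the 101st-newest `file_timestamp` (the `LIMIT 1 OFFSET 100` scalar) and (b) the lookback-offset boundary, so rows the poller could still re-list after a restart are never deleted.

    use chrono::{DateTime, Duration, Utc};

    // Illustrative only: choose the deletion cutoff for files_processed.
    // `t100` is the 101st-newest row's timestamp (None when the table holds
    // 100 rows or fewer); `offset_boundary` is roughly
    // latest_file_timestamp - config.offset, the oldest point the poller
    // may still re-list on startup.
    fn deletion_cutoff(
        t100: Option<DateTime<Utc>>,
        offset_boundary: DateTime<Utc>,
    ) -> Option<DateTime<Utc>> {
        // Under the 100-row limit there is nothing to prune.
        let t100 = t100?;
        // Never delete rows the poller could still see: take the older bound.
        Some(t100.min(offset_boundary))
    }

    fn main() {
        let now = Utc::now();
        // Mirrors the test: 150 files one second apart, six-hour offset.
        let t100 = Some(now - Duration::seconds(100));
        let boundary = now - Duration::hours(6);
        // min(now - 100s, now - 6h) == now - 6h, so nothing inside the
        // offset window is deleted and no file is reprocessed on restart.
        assert_eq!(deletion_cutoff(t100, boundary), Some(boundary));
    }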