Commit
fix reprocessing files when more than the limit arrives within the lookback offset

When the cleaning process is triggered, it gets the timestamp of the file
just past the 100 most recently processed files. If that timestamp is newer
than the lookback offset, we only remove files older than the lookback.
Otherwise, we remove any file older than that entry. Either way, the deletion
cutoff is the earlier of the two timestamps, so nothing inside the lookback
window or among the newest 100 entries is removed.
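
In code, that decision reduces to taking the earlier of the two timestamps. A minimal sketch of the cutoff computation, using the `t100` and `offset` names from the diff below:

use chrono::{DateTime, Utc};

/// Pick the deletion cutoff for `files_processed` rows: never delete
/// into the lookback window, and never delete any of the 100 most
/// recently processed files.
fn deletion_cutoff(t100: DateTime<Utc>, offset: DateTime<Utc>) -> DateTime<Utc> {
    // The minimum satisfies both guarantees: rows newer than either
    // boundary survive the clean.
    t100.min(offset)
}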

- add cache clean logging
- break out getting `FileInfo` from S3 behind a `FileInfoPollerStore` trait (sketched below)
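
A minimal sketch of that trait, reconstructed from the `TestStore` impl further down this diff (the real definition may use different bounds):

#[async_trait::async_trait]
pub trait FileInfoPollerStore: Send + Sync + 'static {
    // List the FileInfos of `file_type` whose timestamps fall in the
    // (after, before] window; `None` leaves that side unbounded.
    async fn list_all<A, B>(&self, file_type: &str, after: A, before: B) -> Result<Vec<FileInfo>>
    where
        A: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
        B: Into<Option<DateTime<Utc>>> + Send + Sync + Copy;

    // Fetch the raw byte stream for a stored file key.
    async fn get_raw<K>(&self, key: K) -> Result<ByteStream>
    where
        K: Into<String> + Send + Sync;
}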
michaeldjeffrey committed Aug 20, 2024
1 parent 7a90c4a commit 3986bbf
Showing 1 changed file with 141 additions and 63 deletions.

file_store/src/file_info_poller.rs
@@ -5,6 +5,7 @@ use derive_builder::Builder;
use futures::{future::LocalBoxFuture, stream::BoxStream, StreamExt};
use futures_util::TryFutureExt;
use retainer::Cache;
use sqlx::postgres::PgQueryResult;
use std::{collections::VecDeque, marker::PhantomData, sync::Arc, time::Duration};
use task_manager::ManagedTask;
use tokio::sync::mpsc::{Receiver, Sender};
@@ -27,7 +28,13 @@ pub trait FileInfoPollerState: Send + Sync + 'static {

async fn exists(&self, process_name: &str, file_info: &FileInfo) -> Result<bool>;

async fn clean(&self, process_name: &str, file_type: &str) -> Result;
// Returns the number of items cleaned
async fn clean(
&self,
process_name: &str,
file_type: &str,
offset: DateTime<Utc>,
) -> Result<u64>;
}

#[async_trait::async_trait]
@@ -272,11 +279,26 @@
}

async fn clean(&self, cache: &MemoryFileCache) -> Result {
let cache_before = cache.len().await;
cache.purge(4, 0.25).await;
self.config
let cache_after = cache.len().await;

let db_removed = self
.config
.state
.clean(&self.config.process_name, &self.config.prefix)
.clean(
&self.config.process_name,
&self.config.prefix,
self.after(self.latest_file_timestamp),
)
.await?;

tracing::info!(
cache_removed = cache_before - cache_after,
db_removed,
"cache clean"
);

Ok(())
}

@@ -445,24 +467,52 @@ impl FileInfoPollerState for sqlx::Pool<sqlx::Postgres> {
.map_err(Error::from)
}

async fn clean(&self, process_name: &str, file_type: &str) -> Result {
sqlx::query(
async fn clean(
&self,
process_name: &str,
file_type: &str,
offset: DateTime<Utc>,
) -> Result<u64> {
let t100_timestamp: Option<DateTime<Utc>> = sqlx::query_scalar(
r#"
SELECT file_timestamp
FROM files_processed
WHERE process_name = $1
AND file_type = $2
ORDER BY file_timestamp DESC
LIMIT 1 OFFSET 100;
"#,
)
.bind(process_name)
.bind(file_type)
.fetch_optional(self)
.await?;

let Some(t100) = t100_timestamp else {
// The cleaning limit has not been reached; remove nothing.
return Ok(0);
};

// To keep from reprocessing files, we need to make sure rows that exist
// within the offset window are not removed.
let older_than_limit = t100.min(offset);

let query_result: PgQueryResult = sqlx::query(
r#"
DELETE FROM files_processed where file_name in (
SELECT file_name
FROM files_processed
WHERE process_name = $1 and file_type = $2
ORDER BY file_timestamp DESC
OFFSET 100
)
DELETE FROM files_processed
WHERE process_name = $1
AND file_type = $2
AND file_timestamp < $3
"#,
)
.bind(process_name)
.bind(file_type)
.bind(older_than_limit)
.execute(self)
.await
.map(|_| ())
.map_err(Error::from)
.map_err(Error::from)?;

Ok(query_result.rows_affected())
}
}

@@ -481,7 +531,7 @@ mod tests {
#[async_trait::async_trait]
impl FileInfoPollerParser<String> for TestParser {
async fn parse(&self, _byte_stream: ByteStream) -> Result<Vec<String>> {
Ok(vec!["Hello".into(), "world".into()])
Ok(vec![])
}
}

@@ -490,106 +540,134 @@
async fn list_all<A, B>(
&self,
_file_type: &str,
_after: A,
_before: B,
after: A,
before: B,
) -> Result<Vec<FileInfo>>
where
A: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
B: Into<Option<DateTime<Utc>>> + Send + Sync + Copy,
{
Ok(self.0.to_owned())
let after = after.into();
let before = before.into();

Ok(self
.0
.clone()
.into_iter()
.filter(|file_info| after.map_or(true, |v| file_info.timestamp > v))
.filter(|file_info| before.map_or(true, |v| file_info.timestamp <= v))
.collect())
}

async fn get_raw<K>(&self, _key: K) -> Result<ByteStream>
where
K: Into<String> + Send + Sync,
{
Ok(ByteStream::from_static(
b"hello, is it me you're looking for?",
))
Ok(ByteStream::default())
}
}

#[sqlx::test]
async fn test_file_after_oldest_timestamp(pool: PgPool) -> anyhow::Result<()> {
async fn do_not_reprocess_files_when_offset_exceeds_earliest_file(
pool: PgPool,
) -> anyhow::Result<()> {
// Cleaning the files_processed table should not cause files within the
// `FileInfoPoller.config.offset` window to be reprocessed.

// There is no auto-migration for tests in this lib workspace.
pool.execute(
r#"
CREATE TABLE files_processed (
process_name TEXT NOT NULL DEFAULT 'default',
file_name VARCHAR PRIMARY KEY,
file_type VARCHAR NOT NULL,
file_timestamp TIMESTAMPTZ NOT NULL,
processed_at TIMESTAMPTZ NOT NULL
file_name VARCHAR PRIMARY KEY,
file_type VARCHAR NOT NULL,
file_timestamp TIMESTAMPTZ NOT NULL,
processed_at TIMESTAMPTZ NOT NULL
);
"#,
)
.await?;

// The important aspect of this test is that all the files to be
// processed happen _within_ the lookback offset.
const EXPECTED_FILE_COUNT: i64 = 150;
let mut infos = vec![];
for day_ago in 0..150 {
for seconds in 0..EXPECTED_FILE_COUNT {
let file_info = FileInfo {
key: format!("key-{day_ago}"),
key: format!("key-{seconds}"),
prefix: "file_type".to_string(),
timestamp: Utc::now() - chrono::Duration::days(day_ago),
size: 125,
timestamp: Utc::now() - chrono::Duration::seconds(seconds),
size: 42,
};
infos.push(file_info);
}

let (mut receiver, ingest_server) =
// To simulate a restart, we're going to make a new FileInfoPoller.
// This closure ensures both get the same settings.
let file_info_builder = || {
let six_hours = chrono::Duration::hours(6).to_std().unwrap();
FileInfoPollerConfigBuilder::<String, _, TestStore, _>::default()
.parser(TestParser)
.state(pool.clone())
.store(TestStore(infos.clone()))
.lookback(LookbackBehavior::StartAfter(Utc::now()))
.prefix("test-prefix".to_string())
.lookback(LookbackBehavior::Max(six_hours))
.prefix("file_type".to_string())
.offset(six_hours)
.create()
.await?;
};

// On the first startup of the file info poller there is nothing to
// clean, and all file_infos will be returned for processing.
let (mut receiver, ingest_server) = file_info_builder().await?;
let (trigger, shutdown) = triggered::trigger();
let _handle = tokio::spawn(async move {
let status = ingest_server.run(shutdown).await;
println!("ingest server: {status:?}");
tokio::spawn(async move {
if let Err(status) = ingest_server.run(shutdown).await {
println!("ingest server went down unexpectedly: {status:?}");
}
});

// "process" all the files.
let mut idx = 0;
while idx < 150 {
let msg = timeout(Duration::from_secs(1), receiver.recv()).await?;
idx += 1;
println!("{idx} :: got {:?}", msg.map(|x| x.data));
// "process" all the files. They are not recorded into the database
// until the file is consumed as a stream.
let mut processed = 0;
while processed < EXPECTED_FILE_COUNT {
match timeout(Duration::from_secs(1), receiver.recv()).await? {
Some(msg) => {
processed += 1;
let mut txn = pool.begin().await?;
let _x = msg.into_stream(&mut txn).await?;
txn.commit().await?;
}
err => panic!("something went wrong: {err:?}"),
};
}

// Shut down the ingest server; we're going to create a new one and start it.
// This is more or less equivalent to a restart of the service.
let _shutdown = trigger.trigger();

let (mut receiver, ingest_server) =
FileInfoPollerConfigBuilder::<String, _, TestStore, _>::default()
.parser(TestParser)
.state(pool.clone())
.store(TestStore(infos))
.lookback(LookbackBehavior::StartAfter(Utc::now()))
.prefix("test-prefix".to_string())
.create()
.await?;
trigger.trigger();

// The service will immediately "clean" the database
// On the second startup of the file info poller, there are 100+ files that
// have been processed. The initial clean should not remove processed
// files in a way that causes us to re-receive any files within our
// offset for processing.
let (mut receiver, ingest_server) = file_info_builder().await?;
let (trigger, shutdown) = triggered::trigger();
let _handle = tokio::spawn(async move {
let status = ingest_server.run(shutdown).await;
println!("ingest server: {status:?}");
if let Err(status) = ingest_server.run(shutdown).await {
println!("ingest server went down unexpectedly: {status:?}");
}
});

// Requesting the "same" files from it, none should be returned, because we've already "proessed" them all.
let _err = match timeout(Duration::from_secs(1), receiver.recv()).await {
Err(_err) => _err,
// Attempt to receive files for processing. The timeout should fire
// because all the files we set up exist within the offset and
// should still be in the database.
match timeout(Duration::from_secs(1), receiver.recv()).await {
Err(_err) => (),
Ok(msg) => {
panic!("we got something when we expected nothing.: {msg:?}");
}
};
}

let _shutdown = trigger.trigger();
// Shut down for great good
trigger.trigger();

Ok(())
}
