Fix rare case where the file info poller will re-process old files #854

Closed · wants to merge 2 commits · Changes from 1 commit
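
In brief: `clean` keeps only the newest rows of `files_processed`, and `exists` dedupes incoming files against that table. If the lookback window computed by `after` reaches back past the oldest row still in the table (possible when more files than the table retains fall inside the window), the purged files are no longer recognized as processed and get picked up again. The fix records the oldest surviving timestamp at startup and clamps `after` to it. A minimal standalone sketch of that clamping (the `clamp_after` helper and the concrete timestamps are illustrative only; the real logic lives in `fn after` in the diff below):

```rust
use chrono::{DateTime, Duration, Utc};

// Clamp the computed lookback so it never reaches back past the oldest
// timestamp still present in files_processed (mirrors `fn after` below).
fn clamp_after(lookback: DateTime<Utc>, oldest: Option<DateTime<Utc>>) -> DateTime<Utc> {
    oldest.map(|o| o.max(lookback)).unwrap_or(lookback)
}

fn main() {
    let now = Utc::now();
    // Hypothetical numbers: latest processed file at `now`, offset 10 min,
    // so the unclamped window starts at now - 10 min...
    let lookback = now - Duration::minutes(10);
    // ...but `clean` already purged everything recorded before now - 5 min.
    let oldest = Some(now - Duration::minutes(5));
    // Clamped: polling resumes at now - 5 min instead of re-fetching purged files.
    assert_eq!(clamp_after(lookback, oldest), now - Duration::minutes(5));
    // If nothing has been purged yet, there is no oldest row and no clamp.
    assert_eq!(clamp_after(lookback, None), lookback);
}
```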
87 changes: 63 additions & 24 deletions file_store/src/file_info_poller.rs
@@ -25,6 +25,12 @@ pub trait FileInfoPollerState: Send + Sync + 'static {
file_type: &str,
) -> Result<Option<DateTime<Utc>>>;

async fn oldest_timestamp(
&self,
process_name: &str,
file_type: &str,
) -> Result<Option<DateTime<Utc>>>;

async fn exists(&self, process_name: &str, file_info: &FileInfo) -> Result<bool>;

async fn clean(&self, process_name: &str, file_type: &str) -> Result;
@@ -108,6 +114,7 @@ pub struct FileInfoPollerServer<T, S, P> {
sender: Sender<FileInfoStream<T>>,
file_queue: VecDeque<FileInfo>,
latest_file_timestamp: Option<DateTime<Utc>>,
oldest_file_timestamp: Option<DateTime<Utc>>,
cache: MemoryFileCache,
}

@@ -127,6 +134,10 @@
.state
.latest_timestamp(&config.process_name, &config.prefix)
.await?;
let oldest_file_timestamp = config
.state
.oldest_timestamp(&config.process_name, &config.prefix)
.await?;

Ok((
receiver,
@@ -135,6 +146,7 @@
sender,
file_queue: VecDeque::new(),
latest_file_timestamp,
oldest_file_timestamp,
cache: create_cache(),
},
))
@@ -187,7 +199,7 @@
return Ok(file_info);
}

let after = self.after(self.latest_file_timestamp);
let after = self.after(self.latest_file_timestamp, self.oldest_file_timestamp);
let before = Utc::now();
let files = self
.config
@@ -241,15 +253,18 @@
Ok(())
}

fn after(&self, latest: Option<DateTime<Utc>>) -> DateTime<Utc> {
fn after(&self, latest: Option<DateTime<Utc>>, oldest: Option<DateTime<Utc>>) -> DateTime<Utc> {
let latest_offset = latest.map(|lt| lt - self.config.offset);
match self.config.lookback {
let lookback = match self.config.lookback {
LookbackBehavior::StartAfter(start_after) => latest_offset.unwrap_or(start_after),
LookbackBehavior::Max(max_lookback) => {
let max_ts = Utc::now() - max_lookback;
latest_offset.map(|lt| lt.max(max_ts)).unwrap_or(max_ts)
}
}
};
// If there is an oldest timestamp, use the max of it and the computed lookback
// to prevent us from re-processing old files.
oldest.map(|x| x.max(lookback)).unwrap_or(lookback)
}

async fn clean(&self, cache: &MemoryFileCache) -> Result {
@@ -375,6 +390,8 @@ impl FileInfoPollerStateRecorder for &mut sqlx::Transaction<'_, sqlx::Postgres>
}
}

pub const MAX_SIZE_OF_FILES_PROCESSED_TABLE: i64 = 100;

#[cfg(feature = "sqlx-postgres")]
#[async_trait::async_trait]
impl FileInfoPollerState for sqlx::Pool<sqlx::Postgres> {
@@ -383,9 +400,9 @@ impl FileInfoPollerState for sqlx::Pool<sqlx::Postgres> {
process_name: &str,
file_type: &str,
) -> Result<Option<DateTime<Utc>>> {
sqlx::query_scalar::<_, Option<DateTime<Utc>>>(
sqlx::query_scalar(
r#"
SELECT MAX(file_timestamp) FROM files_processed where process_name = $1 and file_type = $2
"#,
)
.bind(process_name)
@@ -395,36 +412,58 @@ impl FileInfoPollerState for sqlx::Pool<sqlx::Postgres> {
.map_err(Error::from)
}

async fn oldest_timestamp(
&self,
process_name: &str,
file_type: &str,
) -> Result<Option<DateTime<Utc>>> {
sqlx::query_scalar(
r#"
SELECT file_timestamp FROM files_processed
WHERE process_name = $1 AND file_type = $2
ORDER BY file_timestamp DESC
OFFSET $3
LIMIT 1
"#,
)
.bind(process_name)
.bind(file_type)
.bind(MAX_SIZE_OF_FILES_PROCESSED_TABLE - 1)
// fetch_optional: with fewer than MAX_SIZE rows in the table, the OFFSET
// query returns no row — nothing has been purged yet, so no clamp applies.
.fetch_optional(self)
.await
.map_err(Error::from)
}

async fn exists(&self, process_name: &str, file_info: &FileInfo) -> Result<bool> {
sqlx::query_scalar::<_, bool>(
sqlx::query_scalar(
r#"
SELECT EXISTS(SELECT 1 from files_processed where process_name = $1 and file_name = $2)
"#,
)
.bind(process_name)
.bind(&file_info.key)
.fetch_one(self)
.await
.map_err(Error::from)
}

async fn clean(&self, process_name: &str, file_type: &str) -> Result {
sqlx::query(
r#"
DELETE FROM files_processed where file_name in (
SELECT file_name
FROM files_processed
WHERE process_name = $1 and file_type = $2
ORDER BY file_timestamp DESC
OFFSET 100
)
DELETE FROM files_processed where file_name in (
SELECT file_name
FROM files_processed
WHERE process_name = $1 and file_type = $2
ORDER BY file_timestamp DESC
OFFSET $3
)
"#,
)
.bind(process_name)
.bind(file_type)
.bind(MAX_SIZE_OF_FILES_PROCESSED_TABLE)
.execute(self)
.await
.map(|_| ())
.map_err(Error::from)
.await?;
Ok(())
}
}
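
A note on the two `OFFSET` values above: `clean` deletes everything past `OFFSET $3`, so the `MAX_SIZE_OF_FILES_PROCESSED_TABLE` newest rows survive, while `oldest_timestamp` reads the single row at `OFFSET $3 - 1` — the 100th-newest row, i.e. the oldest row guaranteed to survive a clean. A rough in-memory model of that relationship (the `Vec`-based functions are illustrative only, not part of the PR):

```rust
const MAX_SIZE_OF_FILES_PROCESSED_TABLE: usize = 100;

// Rows modeled as integer timestamps sorted newest-first, mirroring
// ORDER BY file_timestamp DESC in both queries.
fn clean(rows_desc: &mut Vec<i64>) {
    // Everything past OFFSET $3 is deleted; the 100 newest rows remain.
    rows_desc.truncate(MAX_SIZE_OF_FILES_PROCESSED_TABLE);
}

fn oldest_timestamp(rows_desc: &[i64]) -> Option<i64> {
    // OFFSET $3 - 1, LIMIT 1: the 100th-newest row, i.e. the oldest row a
    // subsequent clean() keeps. None while the table is still small.
    rows_desc.get(MAX_SIZE_OF_FILES_PROCESSED_TABLE - 1).copied()
}

fn main() {
    let mut rows: Vec<i64> = (0..150).rev().collect(); // timestamps 149 (newest) ..= 0
    assert_eq!(oldest_timestamp(&rows), Some(50)); // the 100th-newest row...
    clean(&mut rows);
    assert_eq!(rows.last().copied(), Some(50)); // ...survives the clean
    assert_eq!(oldest_timestamp(&[1, 2, 3]), None); // fewer than 100 rows: no clamp
}
```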