feat: Add cache for all filters (#153)
## Background

In [PR #112](#112), @olehbozhok
implemented caching for the `full_text` filter, significantly boosting
performance on resource-constrained devices. This improvement is
particularly effective for filters requiring network fetches or HTML
processing.

## Current Implementation

This PR extends the caching behavior to all filters, with two levels of
cache granularity:

1. Feed-level cache (default):
   - Skips the filter entirely if the input feed is found in the cache
   - Expected to be effective because feeds rarely change between fetches

2. Post-level cache (opt-in):
   - Caches individual posts
   - Runs the filter on a modified feed containing only the posts not found in the cache
   - Correctness depends on the filter implementation, so only certain filters opt in (a sketch of the opt-in mechanism follows this list)
   - The feed-level cache is still checked first
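
For illustration, a filter opts into post-level caching by overriding the trait's `cache_granularity` method, whose default is `CacheGranularity::FeedOnly` (see the `src/filter.rs` hunk below). A minimal self-contained sketch, simplified to drop the async `run` method the real trait also has:

```rust
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum CacheGranularity {
  FeedOnly,
  FeedAndPost,
}

trait FeedFilter {
  // Default: only the feed-level cache applies.
  fn cache_granularity(&self) -> CacheGranularity {
    CacheGranularity::FeedOnly
  }
}

// A filter that transforms each post independently can safely opt in,
// as the sanitize filter does in this PR.
struct Sanitize;

impl FeedFilter for Sanitize {
  fn cache_granularity(&self) -> CacheGranularity {
    CacheGranularity::FeedAndPost
  }
}

fn main() {
  assert_eq!(Sanitize.cache_granularity(), CacheGranularity::FeedAndPost);
}
```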

Both cache levels use expiring LRU (Least Recently Used) caches with
hard expiration times:

- Feed-level: size=5, TTL=12 hours
- Post-level: size=40, TTL=1 hour
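
The `TimedLruCache` type itself lives in `src/util` and is not part of this diff; the following is a minimal self-contained sketch of the assumed "hard TTL" behavior (entries expire a fixed time after insertion, and reads never refresh the deadline). The LRU size bound is elided for brevity, and `HardTtlCache` is an illustrative name:

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::time::{Duration, Instant};

struct HardTtlCache<K, V> {
  ttl: Duration,
  map: HashMap<K, (Instant, V)>,
}

impl<K: Hash + Eq, V: Clone> HardTtlCache<K, V> {
  fn new(ttl: Duration) -> Self {
    Self { ttl, map: HashMap::new() }
  }

  fn insert(&mut self, key: K, value: V) {
    self.map.insert(key, (Instant::now(), value));
  }

  fn get_cached(&self, key: &K) -> Option<V> {
    let (inserted_at, value) = self.map.get(key)?;
    // "Hard" expiration: reads never refresh the deadline.
    (inserted_at.elapsed() < self.ttl).then(|| value.clone())
  }
}

fn main() {
  let mut cache = HardTtlCache::new(Duration::from_secs(3600));
  cache.insert("feed-url", "rendered feed");
  assert_eq!(cache.get_cached(&"feed-url"), Some("rendered feed"));
}
```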

## Additional Features

- Cache parameters are currently hard-coded but can be made configurable upon request
- Caches persist across hot-reloads triggered by config changes
  - Only the caches of changed filters are invalidated during a hot-reload (see the sketch below)
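
The reload wiring is not among the hunks shown below, so the following is only a hypothetical sketch of that invalidation policy: caches are carried over solely for filters whose config is unchanged. `FilterState`, `ConfigHash`, and `hot_reload` are illustrative names, and the stub `FilterCache` stands in for the type added in `src/filter_cache.rs`:

```rust
use std::collections::HashMap;

type ConfigHash = u64;

// Stand-in for the FilterCache added in src/filter_cache.rs.
struct FilterCache;
impl FilterCache {
  fn new() -> Self {
    FilterCache
  }
}

struct FilterState {
  config_hash: ConfigHash,
  cache: FilterCache,
}

fn hot_reload(
  mut old: HashMap<String, FilterState>,
  new_configs: Vec<(String, ConfigHash)>,
) -> HashMap<String, FilterState> {
  new_configs
    .into_iter()
    .map(|(name, config_hash)| {
      let state = match old.remove(&name) {
        // Unchanged filter: keep its warm cache.
        Some(state) if state.config_hash == config_hash => state,
        // New or changed filter: invalidate by starting cold.
        _ => FilterState { config_hash, cache: FilterCache::new() },
      };
      (name, state)
    })
    .collect()
}
```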
shouya authored Sep 28, 2024
1 parent 15dfebd commit 952e7da
Showing 14 changed files with 255 additions and 39 deletions.
3 changes: 1 addition & 2 deletions src/feed.rs
@@ -18,8 +18,7 @@ use crate::Result;

use extension::ExtensionExt;

use self::norm::NormalizedFeed;
pub use self::norm::NormalizedPost;
pub use self::norm::{NormalizedFeed, NormalizedPost};

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[serde(untagged)]
4 changes: 2 additions & 2 deletions src/feed/norm.rs
@@ -1,15 +1,15 @@
use chrono::{DateTime, FixedOffset};
use serde::Serialize;

#[derive(Debug, Serialize, Default)]
#[derive(Debug, Clone, Serialize, Hash, PartialEq, Eq, Default)]
pub struct NormalizedFeed {
  pub title: String,
  pub link: String,
  pub description: Option<String>,
  pub posts: Vec<NormalizedPost>,
}

#[derive(Debug, Serialize, PartialEq, Eq, Hash, Default)]
#[derive(Debug, Clone, Serialize, PartialEq, Eq, Hash, Default)]
pub struct NormalizedPost {
  pub title: String,
  pub author: Option<String>,
8 changes: 7 additions & 1 deletion src/filter.rs
@@ -20,7 +20,9 @@ use serde::{Deserialize, Serialize};
use serde_with::{formats::CommaSeparator, serde_as, StringWithSeparator};
use url::Url;

use crate::{feed::Feed, ConfigError, Error, Result};
use crate::{
  feed::Feed, filter_cache::CacheGranularity, ConfigError, Error, Result,
};

#[serde_as]
#[derive(Clone, Debug, Deserialize)]
@@ -122,6 +124,10 @@ impl FilterContext {
#[async_trait::async_trait]
pub trait FeedFilter {
  async fn run(&self, ctx: &mut FilterContext, feed: Feed) -> Result<Feed>;

  fn cache_granularity(&self) -> CacheGranularity {
    CacheGranularity::FeedOnly
  }
}

#[async_trait::async_trait]
31 changes: 5 additions & 26 deletions src/filter/full_text.rs
@@ -10,14 +10,10 @@ use url::Url;
use crate::client::{self, Client};
use crate::feed::{Feed, Post};
use crate::util::convert_relative_url;
use crate::util::TimedLruCache;
use crate::{ConfigError, Error, Result};

use super::html::{KeepElement, KeepElementConfig};
use super::{FeedFilter, FeedFilterConfig, FilterContext};
use crate::feed::NormalizedPost;

type PostCache = TimedLruCache<NormalizedPost, Post>;

const DEFAULT_PARALLELISM: usize = 20;

@@ -46,7 +42,6 @@ pub struct FullTextFilter {
  keep_element: Option<KeepElement>,
  simplify: bool,
  keep_guid: bool,
  post_cache: PostCache,
}

#[async_trait::async_trait]
@@ -66,10 +61,6 @@ impl FeedFilterConfig for FullTextConfig {
      None => None,
      Some(c) => Some(c.build().await?),
    };
    let post_cache = PostCache::new(
      conf_client.get_cache_size(),
      conf_client.get_cache_ttl(default_cache_ttl),
    );

    Ok(FullTextFilter {
      simplify,
@@ -78,7 +69,6 @@ impl FeedFilterConfig for FullTextConfig {
      append_mode,
      keep_guid,
      keep_element,
      post_cache,
    })
  }
}
@@ -151,24 +141,9 @@ impl FullTextFilter {
    }
  }

  async fn fetch_full_post_cached(&self, post: Post) -> Result<Post> {
    let normalized_post = post.normalize();
    if let Some(result_post) = self.post_cache.get_cached(&normalized_post) {
      return Ok(result_post);
    };

    match self.fetch_full_post(post).await {
      Ok(result_post) => {
        self.post_cache.insert(normalized_post, result_post.clone());
        Ok(result_post)
      }
      Err(e) => Err(e),
    }
  }

  async fn fetch_all_posts(&self, posts: Vec<Post>) -> Result<Vec<Post>> {
    stream::iter(posts)
      .map(|post| self.fetch_full_post_cached(post))
      .map(|post| self.fetch_full_post(post))
      .buffered(self.parallelism)
      .collect::<Vec<_>>()
      .await
@@ -189,6 +164,10 @@ impl FeedFilter for FullTextFilter {
    feed.set_posts(posts);
    Ok(feed)
  }

  fn cache_granularity(&self) -> super::CacheGranularity {
    super::CacheGranularity::FeedAndPost
  }
}

fn strip_post_content(
4 changes: 4 additions & 0 deletions src/filter/highlight.rs
@@ -226,6 +226,10 @@ impl FeedFilter for Highlight {

    Ok(feed)
  }

  fn cache_granularity(&self) -> super::CacheGranularity {
    super::CacheGranularity::FeedAndPost
  }
}

fn insert_sibling_fragment(
8 changes: 8 additions & 0 deletions src/filter/html.rs
@@ -101,6 +101,10 @@ impl FeedFilter for RemoveElement {
    feed.set_posts(posts);
    Ok(feed)
  }

  fn cache_granularity(&self) -> super::CacheGranularity {
    super::CacheGranularity::FeedAndPost
  }
}

/// Keep only selected elements from the post's body parsed as HTML.
@@ -192,6 +196,10 @@ impl FeedFilter for KeepElement {
    feed.set_posts(posts);
    Ok(feed)
  }

  fn cache_granularity(&self) -> super::CacheGranularity {
    super::CacheGranularity::FeedAndPost
  }
}

#[derive(
4 changes: 4 additions & 0 deletions src/filter/image_proxy.rs
@@ -168,6 +168,10 @@ impl FeedFilter for ImageProxy {
    feed.set_posts(posts);
    Ok(feed)
  }

  fn cache_granularity(&self) -> super::CacheGranularity {
    super::CacheGranularity::FeedAndPost
  }
}

fn rewrite_html(
4 changes: 4 additions & 0 deletions src/filter/magnet.rs
@@ -63,6 +63,10 @@ impl FeedFilter for Magnet {
    feed.set_posts(posts);
    Ok(feed)
  }

  fn cache_granularity(&self) -> super::CacheGranularity {
    super::CacheGranularity::FeedAndPost
  }
}

lazy_static::lazy_static! {
4 changes: 4 additions & 0 deletions src/filter/sanitize.rs
@@ -151,6 +151,10 @@ impl FeedFilter for Sanitize {
    feed.set_posts(posts);
    Ok(feed)
  }

  fn cache_granularity(&self) -> super::CacheGranularity {
    super::CacheGranularity::FeedAndPost
  }
}

#[cfg(test)]
4 changes: 4 additions & 0 deletions src/filter/simplify_html.rs
@@ -47,6 +47,10 @@ impl FeedFilter for SimplifyHtmlFilter {
    feed.set_posts(posts);
    Ok(feed)
  }

  fn cache_granularity(&self) -> super::CacheGranularity {
    super::CacheGranularity::FeedAndPost
  }
}

pub(super) fn simplify(text: &str, url: &str) -> Option<String> {
137 changes: 137 additions & 0 deletions src/filter_cache.rs
@@ -0,0 +1,137 @@
use crate::{
  feed::{Feed, NormalizedFeed, NormalizedPost, Post},
  util::TimedLruCache,
  Result,
};
use futures::Future;
use std::time::Duration;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum CacheGranularity {
  FeedOnly,
  FeedAndPost,
}

pub struct FilterCache {
  feed_cache: TimedLruCache<NormalizedFeed, Feed>,
  post_cache: TimedLruCache<NormalizedPost, Post>,
}

impl FilterCache {
  pub fn new() -> Self {
    Self {
      feed_cache: TimedLruCache::new(5, Duration::from_secs(12 * 3600)),
      post_cache: TimedLruCache::new(40, Duration::from_secs(3600)),
    }
  }

  pub async fn run<F, Fut>(
    &self,
    input_feed: Feed,
    granularity: CacheGranularity,
    f: F,
  ) -> Result<Feed>
  where
    F: FnOnce(Feed) -> Fut,
    Fut: Future<Output = Result<Feed>>,
  {
    let input_feed_norm = input_feed.normalize();

    // try to get the whole feed from cache first
    if let Some(cached_feed) = self.check_feed_cache(&input_feed_norm) {
      return Ok(cached_feed);
    }

    // decide what to do based on cache granularity
    let (uncached_input_feed, final_output_posts) = match granularity {
      CacheGranularity::FeedOnly => (input_feed.clone(), Vec::new()),
      CacheGranularity::FeedAndPost => {
        self.process_post_cache(input_feed.clone(), &input_feed_norm)
      }
    };

    // apply the filter function to the uncached portion
    let mut output_feed = f(uncached_input_feed.clone()).await?;

    // merge cached and newly processed posts
    if granularity == CacheGranularity::FeedAndPost {
      self.register_post_cache(uncached_input_feed, output_feed.clone());
      output_feed = self.reassemble_feed(output_feed, final_output_posts);
    }

    // update caches
    self.register_feed_cache(input_feed, output_feed.clone());

    Ok(output_feed)
  }

  // quick check: is the whole feed already in our cache?
  fn check_feed_cache(&self, input_feed_norm: &NormalizedFeed) -> Option<Feed> {
    self.feed_cache.get_cached(input_feed_norm)
  }

  // sort out which posts we need to process and which we can grab from cache
  fn process_post_cache(
    &self,
    mut input_feed: Feed,
    input_feed_norm: &NormalizedFeed,
  ) -> (Feed, Vec<Option<Post>>) {
    let all_posts = input_feed.take_posts();
    let mut final_output_posts = Vec::new();
    let mut uncached_input_posts = Vec::new();

    for (post_norm, post) in input_feed_norm.posts.iter().zip(all_posts) {
      if let Some(cached_post) = self.post_cache.get_cached(post_norm) {
        final_output_posts.push(Some(cached_post));
      } else {
        final_output_posts.push(None);
        uncached_input_posts.push(post);
      }
    }

    input_feed.set_posts(uncached_input_posts);
    (input_feed, final_output_posts)
  }

  // assemble feed with cached and newly processed posts, in the correct order
  fn reassemble_feed(
    &self,
    mut output_feed: Feed,
    mut final_output_posts: Vec<Option<Post>>,
  ) -> Feed {
    let mut output_posts = output_feed.take_posts();
    output_posts.reverse();

    for post in &mut final_output_posts {
      if post.is_none() {
        *post = output_posts.pop();
      }
    }

    // add any remaining posts from the output feed
    final_output_posts.extend(output_posts.into_iter().rev().map(Some));

    let final_output_posts = final_output_posts.into_iter().flatten().collect();
    output_feed.set_posts(final_output_posts);
    output_feed
  }

  // add processed posts to the post cache
  fn register_post_cache(&self, mut input_feed: Feed, mut output_feed: Feed) {
    let input_posts = input_feed.take_posts();
    let output_posts = output_feed.take_posts();
    if input_posts.len() != output_posts.len() {
      tracing::warn!("input and output post counts do not match");
    }

    for (input_post, output_post) in input_posts.into_iter().zip(output_posts) {
      let input_post_norm = input_post.normalize();
      self.post_cache.insert(input_post_norm, output_post);
    }
  }

  fn register_feed_cache(&self, input_feed: Feed, output_feed: Feed) {
    let input_feed_norm = input_feed.normalize();
    self.feed_cache.insert(input_feed_norm, output_feed);
  }
}
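
The subtlest step above is `reassemble_feed`: cached posts keep their original positions, and the freshly filtered posts fill the remaining holes in order. A self-contained sketch of that merge, with strings standing in for posts:

```rust
// Mirrors reassemble_feed: cached posts occupy their original slots (Some),
// and the freshly filtered posts fill the holes (None) in order.
fn reassemble(mut slots: Vec<Option<String>>, mut fresh: Vec<String>) -> Vec<String> {
  // Reversing lets pop() yield the fresh posts in their original order.
  fresh.reverse();
  for slot in slots.iter_mut() {
    if slot.is_none() {
      *slot = fresh.pop();
    }
  }
  // Any posts the filter produced beyond the available holes go at the end.
  slots.extend(fresh.into_iter().rev().map(Some));
  slots.into_iter().flatten().collect()
}

fn main() {
  // Posts b and d missed the post cache and were re-processed by the filter.
  let slots = vec![Some("a'".to_string()), None, Some("c'".to_string()), None];
  let fresh = vec!["b'".to_string(), "d'".to_string()];
  assert_eq!(reassemble(slots, fresh), vec!["a'", "b'", "c'", "d'"]);
}
```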