Skip to content

Commit

Permalink
gracefully handle feed error for merge filter
Browse files Browse the repository at this point in the history
  • Loading branch information
shouya committed Sep 20, 2024
1 parent bf8610a commit 8ef2178
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 8 deletions.
32 changes: 32 additions & 0 deletions src/feed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,38 @@ impl Feed {
}
}
}

pub fn add_item(&mut self, title: String, body: String, link: String) {
let guid = link.clone();

match self {
Feed::Rss(channel) => {
let mut item = rss::Item::default();
item.title = Some(title);
item.description = Some(body);
item.link = Some(link);
item.guid = Some(rss::Guid {
value: guid,
..Default::default()
});
channel.items.push(item);
}
Feed::Atom(feed) => {
let mut entry = atom_syndication::Entry::default();
entry.title = atom_syndication::Text::plain(title);
entry.content = Some(atom_syndication::Content {
value: Some(body),
..Default::default()
});
entry.links.push(atom_syndication::Link {
href: link,
..Default::default()
});
entry.id = guid;
feed.entries.push(entry);
}
};
}
}

#[cfg(test)]
Expand Down
72 changes: 64 additions & 8 deletions src/filter/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::client::{Client, ClientConfig};
use crate::feed::Feed;
use crate::filter_pipeline::{FilterPipeline, FilterPipelineConfig};
use crate::source::{SimpleSourceConfig, Source};
use crate::util::{ConfigError, Result, SingleOrVec};
use crate::util::{ConfigError, Error, Result, SingleOrVec};

use super::{FeedFilter, FeedFilterConfig, FilterContext};

Expand Down Expand Up @@ -107,36 +107,75 @@ pub struct Merge {
}

impl Merge {
async fn fetch_sources(&self, ctx: &FilterContext) -> Result<Vec<Feed>> {
stream::iter(self.sources.clone())
async fn fetch_sources(
&self,
ctx: &FilterContext,
) -> Result<(Vec<Feed>, Vec<(Source, Error)>)> {
let iter = stream::iter(self.sources.clone())
.map(|source: Source| {
let client = &self.client;
async move {
let feed = source.fetch_feed(ctx, Some(client)).await?;
Ok(feed)
let fetch_res = source.fetch_feed(ctx, Some(client)).await;
(source, fetch_res)
}
})
.buffered(self.parallelism)
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<Result<Vec<Feed>>>()
.into_iter();

collect_partial_oks(iter)
}
}

#[async_trait::async_trait]
impl FeedFilter for Merge {
async fn run(&self, ctx: &mut FilterContext, mut feed: Feed) -> Result<Feed> {
for new_feed in self.fetch_sources(ctx).await? {
let (new_feeds, errors) = self.fetch_sources(ctx).await?;

for new_feed in new_feeds {
let ctx = ctx.subcontext();
let filtered_new_feed = self.filters.run(ctx, new_feed).await?;
feed.merge(filtered_new_feed)?;
}

for (source, error) in errors {
let title = "Failed fetching source".to_owned();
let source_url = source.full_url(ctx).map(|u| u.to_string());
let source_desc = source_url
.clone()
.unwrap_or_else(|| format!("{:?}", source));

let body = format!("<p>Source: {source_desc}</p><p>Error: {error}</p>");
feed.add_item(title, body, source_url.unwrap_or_default());
}

feed.reorder();
Ok(feed)
}
}

// return Err(e) only if all results are Err.
// otherwise return a Vec<T> with failed results
fn collect_partial_oks<S, T, E>(
iter: impl Iterator<Item = (S, Result<T, E>)>,
) -> Result<(Vec<T>, Vec<(S, E)>), E> {
let mut oks = Vec::new();
let mut errs = Vec::new();
for (source, res) in iter {
match res {
Ok(ok) => oks.push(ok),
Err(err) => errs.push((source, err)),
}
}

if oks.is_empty() && !errs.is_empty() {
Err(errs.pop().map(|(_, e)| e).unwrap())
} else {
Ok((oks, errs))
}
}

#[cfg(test)]
mod test {
use crate::test_utils::fetch_endpoint;
Expand Down Expand Up @@ -206,4 +245,21 @@ mod test {
assert_eq!(titles.len(), 3);
}
}

#[test]
fn test_partial_collect() {
let results = vec![
(1, Ok(1)),
(2, Err("error2")),
(3, Ok(3)),
(4, Err("error4")),
];
let (oks, errs) = super::collect_partial_oks(results.into_iter()).unwrap();
assert_eq!(oks, vec![1, 3]);
assert_eq!(errs, vec![(2, "error2"), (4, "error4")]);

let results = vec![(1, Err::<(), _>("error1")), (2, Err("error2"))];
let err = super::collect_partial_oks(results.into_iter()).unwrap_err();
assert_eq!(err, "error1");
}
}
13 changes: 13 additions & 0 deletions src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,19 @@ impl Source {
}
}
}

pub fn full_url(&self, ctx: &FilterContext) -> Option<Url> {
match self {
Source::Dynamic => ctx.source().cloned(),
Source::AbsoluteUrl(url) => Some(url.clone()),
Source::RelativeUrl(path) => ctx
.base_expected()
.ok()
.map(|base| base.join(path).expect("failed to join base and path")),
Source::FromScratch(_) => None,
Source::Templated(_) => None,
}
}
}

fn split_with_delimiter<'a>(
Expand Down

0 comments on commit 8ef2178

Please sign in to comment.