Skip to content

Commit

Permalink
More cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Half-Shot committed Feb 12, 2024
1 parent 4b626ee commit bf88aab
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 65 deletions.
64 changes: 54 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ rand = "0.8.5"
uuid = { version = "1.7.0", features = ["v4"] }
async-std = "1.12.0"
redis = { version = "0.24.0", features = ["aio", "tokio-comp"] }
futures = "0.3.30"

[build-dependencies]
napi-build = "2"
7 changes: 4 additions & 3 deletions src/feeds/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct FeedItem {
pub pubdate: Option<String>,
pub summary: Option<String>,
pub author: Option<String>,
pub hash_id: Option<String>,
pub hash_id: String,
}

#[derive(Serialize, Debug, Deserialize)]
Expand Down Expand Up @@ -71,7 +71,8 @@ fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel {
.or(item.link.clone())
.or(item.title.clone())
.and_then(|f| hash_id(f).ok())
.and_then(|f| Some(format!("md5:{}", f))),
.and_then(|f| Some(format!("md5:{}", f)))
.unwrap(), // TODO: Handle error
})
.collect(),
}
Expand Down Expand Up @@ -118,7 +119,7 @@ fn parse_feed_to_js_result(feed: &Feed) -> JsRssChannel {
.map(|date| date.to_rfc2822()),
summary: item.summary().map(|v| v.value.clone()),
author: authors_to_string(item.authors()),
hash_id: hash_id(item.id.clone()).ok(),
hash_id: hash_id(item.id.clone()).unwrap(),
})
.collect(),
}
Expand Down
89 changes: 52 additions & 37 deletions src/feeds/reader.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::collections::{HashMap, HashSet};
use std::future;
use crate::util::QueueWithBackoff;
use std::time::{Duration, Instant};
use napi::bindgen_prelude::{Error as JsError, Status};
use napi::bindgen_prelude::Error as JsError;
use napi::tokio::sync::RwLock;
use std::sync::Arc;
use uuid::Uuid;
use futures::future::{Future, select_all};
use crate::feeds::parser::{js_read_feed, ReadFeedOptions};
use crate::stores::memory::MemoryStorageProvider;
use crate::stores::traits::StorageProvider;
Expand Down Expand Up @@ -51,7 +53,6 @@ pub struct FeedReader {
queue: QueueWithBackoff,
feeds_to_retain: HashSet<String>,
cache_times: Arc<RwLock<HashMap<String, CacheTime>>>,
storage_provider: Box<impl StorageProvider>,
poll_interval_seconds: f64,
poll_concurrency: u8,
poll_timeout_seconds: i64,
Expand All @@ -70,14 +71,12 @@ impl FeedReader {
pub fn new(poll_interval_seconds: f64, poll_concurrency: u8, poll_timeout_seconds: i64) -> Self {
let mut cache_times: HashMap<String, CacheTime> = HashMap::new();
let mut lock = Arc::new(RwLock::new(cache_times));
let mut storage_provider = MemoryStorageProvider::new();
FeedReader {
queue: QueueWithBackoff::new(
BACKOFF_TIME_MS,
BACKOFF_POW,
BACKOFF_TIME_MAX_MS,
),
storage_provider,
feeds_to_retain: HashSet::new(),
poll_interval_seconds,
poll_concurrency,
Expand Down Expand Up @@ -105,7 +104,7 @@ impl FeedReader {

}

async fn poll_feed(&self, url: &String, cache_times: Arc<RwLock<HashMap<String, CacheTime>>>) -> Result<Option<HookshotFeedInfo>, JsError> {
async fn poll_feed(&self, url: &String, cache_times: Arc<RwLock<HashMap<String, CacheTime>>>, mut storage: &impl StorageProvider) -> Result<Option<HookshotFeedInfo>, JsError> {
let seen_entries_changed = false;
let fetch_key = Uuid::new_v4().to_string();

Expand All @@ -128,13 +127,17 @@ impl FeedReader {
});
drop(c_t_w);

let initial_sync = false; // TODO: Implement
let seen_items: HashSet<String> = HashSet::new(); // TODO: Implement
let initial_sync = storage.has_seen_feed(url).await;
let mut new_guids: Vec<String> = Vec::new();
let new_entries: Vec<HookshotFeedEntry> = Vec::new();


if let Some(feed) = result.feed {
let items = feed.items.iter().map(|x| x.hash_id.clone()).collect::<Vec<_>>();
let seen_items = storage.has_seen_feed_guids(
url,
&items,
).await;
println!("Got feed result!");
let mut feed_info = HookshotFeedInfo {
title: feed.title,
Expand All @@ -144,26 +147,25 @@ impl FeedReader {
};
for item in feed.items {
println!("Got feed result! {:?}", item);
if let Some(hash_id) = item.hash_id {
if seen_items.contains(&hash_id) {
continue;
}
// TODO: Drop unwrap
new_guids.push(hash_id);

if initial_sync {
// Skip.
continue;
}
feed_info.entries.push(HookshotFeedEntry {
title: item.title,
pubdate: item.pubdate,
summary: item.summary,
author: item.author,
link: item.link,
});
if seen_items.contains(&item.hash_id) {
continue;
}

new_guids.push(item.hash_id.clone());

if initial_sync {
// Skip.
continue;
}
feed_info.entries.push(HookshotFeedEntry {
title: item.title,
pubdate: item.pubdate,
summary: item.summary,
author: item.author,
link: item.link,
});
}
storage.store_feed_guids(&url, &new_guids).await;
return Ok(Some(feed_info));
} else {
// TODO: Implement
Expand All @@ -178,20 +180,33 @@ impl FeedReader {
return (self.poll_interval_seconds * 1000.0) / self.queue.length() as f64;
}

/// Polls a single feed URL once, then sleeps out the remainder of the
/// configured polling interval so each feed is visited at a steady cadence.
///
/// The URL is inserted into `feeds_to_retain` for the duration of the fetch
/// so it is not dropped from tracking while the poll is in flight.
///
/// NOTE(review): the poll outcome is currently discarded — fetch/parse
/// failures are silently ignored. TODO: surface or log errors from
/// `poll_feed` instead of dropping them.
async unsafe fn poll_feed_int(&mut self, url: &String, storage_provider: &impl StorageProvider) {
    let mut sleep_for = self.sleeping_interval();
    // Mark this feed as in-flight so it is retained while being polled.
    self.feeds_to_retain.insert(url.clone());
    let now = Instant::now();
    // Pass the provider through as-is: wrapping it in another `&` would
    // produce a `&&impl StorageProvider`, which does not satisfy the
    // `&impl StorageProvider` parameter of `poll_feed` (references do not
    // implement the trait). The binding also needs no `mut`.
    let _result = self.poll_feed(url, self.cache_times.clone(), storage_provider).await;
    self.feeds_to_retain.remove(url);
    // Deduct the time spent polling so the whole cycle stays at the
    // configured interval; clamp at zero when polling overran it.
    let elapsed = now.elapsed();
    sleep_for = (sleep_for - (elapsed.as_millis() as f64)).max(0.0);
    async_std::task::sleep(Duration::from_millis(sleep_for as u64)).await;
}

#[napi]
pub async unsafe fn poll_feeds(&mut self) -> Result<(), JsError> {
let mut sleep_for = self.sleeping_interval();
if let Some(url) = self.queue.pop() {
self.feeds_to_retain.insert(url.clone());
let now = Instant::now();
let result = self.poll_feed(&url, self.cache_times.clone()).await?;
self.feeds_to_retain.remove(&url);
let elapsed = now.elapsed();
sleep_for = (sleep_for - (elapsed.as_millis() as f64)).max(0.0);
} else {
println!("No feeds available");
let concurrency = self.poll_concurrency as usize;
let mut storage_provider = MemoryStorageProvider::new();
let mut future_set: Vec<_> = Vec::new();
loop {
if let Some(url) = self.queue.pop() {
let result = Box::pin(self.poll_feed_int(&url, &storage_provider));
future_set.push(result);
} else {
async_std::task::sleep(Duration::from_millis(self.sleeping_interval() as u64)).await;
}
if future_set.len() >= concurrency {
let (item_resolved, ready_future_index, _remaining_futures) =
select_all(future_set).await;
}
}
async_std::task::sleep(Duration::from_millis(sleep_for as u64)).await;
Ok(())
}
}
18 changes: 7 additions & 11 deletions src/stores/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,19 @@ impl MemoryStorageProvider {
}

impl StorageProvider for MemoryStorageProvider {
async fn store_feed_guids(&mut self, url: &String, guids: &Vec<String>) -> Result<(), Err<String>> {
let mut guid_set = self.guids.get(url).or_else(|| {
let new = HashSet::new();
self.guids.insert(url.clone(), new);
Some(&new)
}).unwrap();
async fn store_feed_guids(&mut self, url: &String, guids: &Vec<String>) {
let guid_set = self.guids.entry(url.clone()).or_insert(HashSet::default());

for guid in guids {
guid_set.insert(guid.clone());
}
Ok(())
}

async fn has_seen_feed(&self, url: &String, guids: &Vec<String>) -> Result<bool, Err<String>> {
Ok(self.guids.contains_key(url))
/// Returns whether any GUIDs have ever been stored for this feed URL —
/// i.e. whether the feed has been polled and recorded at least once.
async fn has_seen_feed(&self, url: &String) -> bool {
    self.guids.get(url).is_some()
}

async fn has_seen_feed_guids(&self,url: &String, guids: &Vec<String>) -> Result<Vec<String>, Err<String>> {
async fn has_seen_feed_guids(&self,url: &String, guids: &Vec<String>) -> Vec<String> {
let mut seen_guids = Vec::default();
if let Some(existing_guids) = self.guids.get(url) {
for guid in guids {
Expand All @@ -39,6 +35,6 @@ impl StorageProvider for MemoryStorageProvider {
}
}
}
Ok(seen_guids)
seen_guids
}
}
8 changes: 4 additions & 4 deletions src/stores/traits.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
trait StorageProvider {
async fn store_feed_guids(&mut self, url: &String, guids: &Vec<String>) -> Result<Ok, Err<String>>;
async fn has_seen_feed(&self, url: &String, guids: &Vec<String>) -> Result<bool, Err<String>>;
async fn has_seen_feed_guids(&self, url: &String, guids: &Vec<String>) -> Result<Vec<String>, Err<String>>;
/// Persistence abstraction for tracking which feed entries (GUIDs) have
/// already been seen, keyed by feed URL. Backed in-memory by
/// `MemoryStorageProvider`; other backends can implement the same contract.
pub trait StorageProvider {
/// Records every GUID in `guids` as seen for the feed at `url`.
async fn store_feed_guids(&mut self, url: &String, guids: &Vec<String>);
/// True once any GUIDs have been stored for `url` — used to detect an
/// initial sync, where existing entries should be skipped rather than
/// delivered as new.
async fn has_seen_feed(&self, url: &String) -> bool;
/// Returns the subset of `guids` that have already been recorded for `url`.
async fn has_seen_feed_guids(&self, url: &String, guids: &Vec<String>) -> Vec<String>;
}

0 comments on commit bf88aab

Please sign in to comment.