add scraping job
zaknesler committed May 25, 2024
1 parent a73884c commit d781658
Showing 18 changed files with 135 additions and 100 deletions.
1 change: 1 addition & 0 deletions crates/blend-db/migrations/20240430000000_entries.sql
@@ -7,6 +7,7 @@ CREATE TABLE IF NOT EXISTS entries (
summary_html TEXT,
content_html TEXT,
content_scraped_html TEXT,
media_url TEXT,
published_at DATETIME,
updated_at DATETIME,
read_at DATETIME,
21 changes: 0 additions & 21 deletions crates/blend-db/migrations/20240504000000_feeds_stats.sql

This file was deleted.

1 change: 1 addition & 0 deletions crates/blend-db/src/model/entry.rs
@@ -17,6 +17,7 @@ pub struct Entry {
pub content_html: Option<String>,
#[sqlx(default)]
pub content_scraped_html: Option<String>,
pub media_url: Option<String>,
pub published_at: Option<DateTime<Utc>>,
pub updated_at: Option<DateTime<Utc>>,
pub read_at: Option<DateTime<Utc>>,
38 changes: 33 additions & 5 deletions crates/blend-db/src/repo/entry.rs
@@ -16,7 +16,7 @@ pub struct CreateEntryParams {
pub title: Option<String>,
pub summary_html: Option<String>,
pub content_html: Option<String>,
pub content_scraped_html: Option<String>,
pub media_url: Option<String>,
pub published_at: Option<DateTime<Utc>>,
pub updated_at: Option<DateTime<Utc>>,
}
@@ -87,7 +87,7 @@ impl EntryRepo {
let el = filter.sort.query_elements();
let el_inv = filter.sort.query_elements_inverse();

let mut query = QueryBuilder::<Sqlite>::new("SELECT uuid, feed_uuid, id, url, title, summary_html, published_at, updated_at, read_at, saved_at, scraped_at FROM entries WHERE 1=1");
let mut query = QueryBuilder::<Sqlite>::new("SELECT uuid, feed_uuid, id, url, title, summary_html, media_url, published_at, updated_at, read_at, saved_at, scraped_at FROM entries WHERE 1=1");

match filter.view {
View::All => query.push(""),
@@ -147,6 +147,17 @@ impl EntryRepo {
.map_err(|err| err.into())
}

pub async fn get_entries_to_scrape(
&self,
feed_uuid: &uuid::Uuid,
) -> DbResult<Vec<model::Entry>> {
sqlx::query_as::<_, model::Entry>("SELECT * FROM entries WHERE feed_uuid = ?1 AND content_scraped_html IS NULL AND scraped_at IS NULL")
.bind(feed_uuid)
.fetch_all(&self.db)
.await
.map_err(|err| err.into())
}

pub async fn update_entry_as_read(&self, entry_uuid: &uuid::Uuid) -> DbResult<bool> {
let rows_affected = sqlx::query("UPDATE entries SET read_at = ?1 WHERE uuid = ?2")
.bind(Utc::now())
@@ -177,7 +188,7 @@ impl EntryRepo {
return Ok(vec![]);
}

let mut query = QueryBuilder::<Sqlite>::new("INSERT INTO entries (feed_uuid, uuid, id, url, title, summary_html, content_html, content_scraped_html, published_at, updated_at) ");
let mut query = QueryBuilder::<Sqlite>::new("INSERT INTO entries (feed_uuid, uuid, id, url, title, summary_html, content_html, media_url, published_at, updated_at) ");
query.push_values(entries.iter(), |mut b, entry| {
b.push_bind(feed_uuid)
.push_bind(uuid::Uuid::new_v4())
@@ -186,7 +197,7 @@
.push_bind(entry.title.clone())
.push_bind(entry.summary_html.clone())
.push_bind(entry.content_html.clone())
.push_bind(entry.content_scraped_html.clone())
.push_bind(entry.media_url.clone())
.push_bind(entry.published_at)
.push_bind(entry.updated_at);
});
@@ -198,7 +209,6 @@
title = excluded.title,
summary_html = excluded.summary_html,
content_html = excluded.content_html,
content_scraped_html = excluded.content_scraped_html,
updated_at = excluded.updated_at
RETURNING uuid
"#,
@@ -212,4 +222,22 @@
.map(|row| row.try_get("uuid").map_err(|err| err.into()))
.collect::<DbResult<Vec<uuid::Uuid>>>()
}

pub async fn update_scraped_entry(
&self,
entry_uuid: &uuid::Uuid,
content_scraped_html: Option<String>,
) -> DbResult<bool> {
let rows_affected = sqlx::query(
"UPDATE entries SET content_scraped_html = ?1, scraped_at = ?2 WHERE uuid = ?3",
)
.bind(content_scraped_html)
.bind(Utc::now())
.bind(entry_uuid)
.execute(&self.db)
.await?
.rows_affected();

Ok(rows_affected > 0)
}
}
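Together, the two new repository methods define a scrape-once contract: an entry is a candidate only while both content_scraped_html and scraped_at are NULL, and update_scraped_entry stamps scraped_at even when scraping produced nothing, so the entry is not retried. A minimal sketch of that flow, assuming an existing SqlitePool named db, a known feed_uuid, and an async context that can use `?`; the literal HTML is purely illustrative:

```rust
// Sketch only: exercises the scrape-once contract of the new EntryRepo methods.
let repo = EntryRepo::new(db);

// First pass: entries with no scraped content and no scrape attempt yet.
for entry in repo.get_entries_to_scrape(&feed_uuid).await? {
    // scraped_at is set whether or not content was found, so this entry
    // will not be returned by get_entries_to_scrape again.
    repo.update_scraped_entry(&entry.uuid, Some("<p>scraped body</p>".into())).await?;
}

// Second pass: empty, because every candidate now has scraped_at set.
assert!(repo.get_entries_to_scrape(&feed_uuid).await?.is_empty());
```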
19 changes: 15 additions & 4 deletions crates/blend-db/src/repo/feed.rs
@@ -27,10 +27,21 @@ impl FeedRepo {
}

pub async fn get_stats(&self) -> DbResult<Vec<model::FeedStats>> {
sqlx::query_as::<_, model::FeedStats>("SELECT * from feeds_stats")
.fetch_all(&self.db)
.await
.map_err(|err| err.into())
sqlx::query_as::<_, model::FeedStats>(
r#"
SELECT
feeds.uuid,
COUNT(entries.uuid) as count_total,
COUNT(CASE WHEN entries.read_at IS NULL THEN 1 ELSE NULL END) as count_unread,
COUNT(CASE WHEN entries.saved_at IS NOT NULL THEN 1 ELSE NULL END) as count_saved
FROM feeds
INNER JOIN entries ON feeds.uuid = entries.feed_uuid
GROUP BY feeds.uuid
"#,
)
.fetch_all(&self.db)
.await
.map_err(|err| err.into())
}

pub async fn get_feed(&self, feed_uuid: uuid::Uuid) -> DbResult<Option<model::Feed>> {
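The query above replaces the dropped feeds_stats migration by computing the per-feed counts inline. The model::FeedStats struct it maps into is not part of this excerpt; judging from the column aliases, it presumably looks roughly like this (a sketch, not the actual definition):

```rust
// Assumed shape of model::FeedStats, inferred from the SELECT aliases above;
// the real definition lives in blend-db's model module and is not shown here.
#[derive(sqlx::FromRow)]
pub struct FeedStats {
    pub uuid: uuid::Uuid,
    pub count_total: i64,
    pub count_unread: i64,
    pub count_saved: i64,
}
```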
2 changes: 2 additions & 0 deletions crates/blend-feed/src/lib.rs
@@ -6,7 +6,9 @@ pub use error::FeedError as Error;

pub mod model;
mod readability;

mod scrape;
pub use scrape::scrape_entry;

mod parse;
pub use parse::{parse_entries, parse_feed};
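With scrape_entry re-exported at the crate root, callers outside blend-feed can fetch and extract readable content directly. A sketch of how it might be used from an async context; the signature (URL taken by value, an Option of HTML inside the crate's result type) is inferred from its call sites later in this commit, and the URL is illustrative:

```rust
// Sketch: usage of the newly re-exported scrape_entry; signature inferred
// from handler.rs below. Assumes an async context that can use `?`.
let url = String::from("https://example.com/some-post");
if let Some(html) = blend_feed::scrape_entry(url).await? {
    let cleaned = blend_feed::extract_html(&html);
    println!("scraped {} bytes of cleaned HTML", cleaned.len());
}
```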
1 change: 0 additions & 1 deletion crates/blend-feed/src/model.rs
@@ -19,7 +19,6 @@ pub struct ParsedEntry {
pub title: Option<String>,
pub summary_html: Option<String>,
pub content_html: Option<String>,
pub content_scraped_html: Option<String>,
pub media_url: Option<String>,
pub published_at: Option<DateTime<Utc>>,
pub updated_at: Option<DateTime<Utc>>,
15 changes: 1 addition & 14 deletions crates/blend-feed/src/parse.rs
@@ -2,7 +2,6 @@ use crate::{
error::FeedResult,
extract::*,
model::{self, ParsedEntry, ParsedFeed},
scrape,
};

/// Fetch feed and handle edge cases
@@ -36,7 +35,7 @@ pub async fn parse_feed(url: &str) -> FeedResult<ParsedFeed> {
pub async fn parse_entries(url: &str) -> FeedResult<Vec<ParsedEntry>> {
let feed = get_feed(url).await?;

let mut entries = feed
let entries = feed
.entries
.iter()
.cloned()
@@ -58,24 +57,12 @@ pub async fn parse_entries(url: &str) -> FeedResult<Vec<ParsedEntry>> {
content_html: entry
.content
.and_then(|content| content.body.map(|content| extract_html(&content))),
content_scraped_html: None,
media_url,
published_at: entry.published,
updated_at: entry.updated,
}
})
.collect::<Vec<model::ParsedEntry>>();

for entry in entries.iter_mut() {
if entry.content_html.is_some() || entry.media_url.is_some() {
continue;
}

if let Some(url) = entry.url.clone() {
entry.content_scraped_html =
scrape::scrape_entry(url).await?.map(|html| extract_html(&html));
}
}

Ok(entries)
}
4 changes: 2 additions & 2 deletions crates/blend-web/src/context.rs
@@ -7,6 +7,6 @@ use tokio::sync::{broadcast, mpsc, Mutex};
pub struct Context {
pub blend: Config,
pub db: SqlitePool,
pub jobs: Arc<Mutex<mpsc::Sender<blend_worker::Job>>>,
pub notifs: Arc<Mutex<broadcast::Sender<blend_worker::Notification>>>,
pub job_tx: Arc<Mutex<mpsc::Sender<blend_worker::Job>>>,
pub notif_tx: Arc<Mutex<broadcast::Sender<blend_worker::Notification>>>,
}
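The renamed fields make the channel roles explicit: job_tx feeds the worker's mpsc job queue, while notif_tx fans notifications out over a broadcast channel. A sketch of how the Context is presumably assembled at startup; the channel capacities and the config and pool values are assumptions rather than part of this diff:

```rust
// Sketch: wiring up the senders stored on Context. Only the field names and
// types come from the diff above; capacities, `config`, and `pool` are assumed.
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc, Mutex};

let (job_tx, job_rx) = mpsc::channel::<blend_worker::Job>(32);
let (notif_tx, _notif_rx) = broadcast::channel::<blend_worker::Notification>(32);

let ctx = Context {
    blend: config,
    db: pool,
    job_tx: Arc::new(Mutex::new(job_tx)),
    notif_tx: Arc::new(Mutex::new(notif_tx)),
};
```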
16 changes: 8 additions & 8 deletions crates/blend-web/src/router/feed.rs
@@ -53,7 +53,7 @@ async fn create(
})
.await?;

let worker = ctx.jobs.lock().await;
let worker = ctx.job_tx.lock().await;
worker.send(blend_worker::Job::FetchMetadata(feed.clone())).await?;
worker.send(blend_worker::Job::FetchEntries(feed.clone())).await?;

@@ -92,23 +92,23 @@ async fn refresh_feed(
.await?
.ok_or_else(|| WebError::NotFoundError)?;

let notifier = ctx.notifs.lock().await;
let dispatcher = ctx.jobs.lock().await;
let notif_tx = ctx.notif_tx.lock().await;
let job_tx = ctx.job_tx.lock().await;

notifier.send(blend_worker::Notification::StartedFeedRefresh {
notif_tx.send(blend_worker::Notification::StartedFeedRefresh {
feed_uuid: feed.uuid,
})?;
dispatcher.send(blend_worker::Job::FetchMetadata(feed.clone())).await?;
dispatcher.send(blend_worker::Job::FetchEntries(feed.clone())).await?;
job_tx.send(blend_worker::Job::FetchMetadata(feed.clone())).await?;
job_tx.send(blend_worker::Job::FetchEntries(feed.clone())).await?;

Ok(Json(json!({ "success": true })))
}

async fn refresh_feeds(State(ctx): State<crate::Context>) -> WebResult<impl IntoResponse> {
let feeds = repo::feed::FeedRepo::new(ctx.db).get_feeds().await?;

let notifier = ctx.notifs.lock().await;
let dispatcher = ctx.jobs.lock().await;
let notifier = ctx.notif_tx.lock().await;
let dispatcher = ctx.job_tx.lock().await;

for feed in feeds {
notifier.send(blend_worker::Notification::StartedFeedRefresh {
2 changes: 1 addition & 1 deletion crates/blend-web/src/router/ws.rs
@@ -23,7 +23,7 @@ async fn notifications(
}

async fn handle_socket(socket: WebSocket, ctx: crate::Context) {
let mut rx = ctx.notifs.lock().await.subscribe();
let mut rx = ctx.notif_tx.lock().await.subscribe();
let mut rx = ctx.notif_tx.lock().await.subscribe();
let (mut ws_sender, _) = socket.split();

while let Ok(notif) = rx.recv().await {
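The receive loop above is truncated; its body presumably serializes each notification and forwards it over the socket. A sketch under the assumptions that Notification implements serde::Serialize and that futures::SinkExt is in scope, neither of which is shown in this excerpt:

```rust
// Sketch of the truncated forwarding body; serialization and error handling
// here are assumptions, not taken from this commit.
while let Ok(notif) = rx.recv().await {
    let Ok(payload) = serde_json::to_string(&notif) else { continue };
    if ws_sender.send(Message::Text(payload)).await.is_err() {
        break; // client disconnected; stop forwarding
    }
}
```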
49 changes: 37 additions & 12 deletions crates/blend-worker/src/handler.rs
@@ -1,20 +1,20 @@
use crate::{error::WorkerResult, Notification};
use crate::{error::WorkerResult, Job, Notification};
use blend_db::{
model::Feed,
repo::entry::{CreateEntryParams, EntryRepo},
};
use blend_feed::parse_entries;
use sqlx::SqlitePool;
use std::sync::Arc;
use tokio::sync::{broadcast, Mutex};
use tokio::sync::{broadcast, mpsc, Mutex};

/// Parse entries from a feed, and scrape content if necessary
pub async fn fetch_entries(
feed: Feed,
db: SqlitePool,
notifs: Arc<Mutex<broadcast::Sender<Notification>>>,
job_tx: Arc<Mutex<mpsc::Sender<Job>>>,
notif_tx: Arc<Mutex<broadcast::Sender<Notification>>>,
) -> WorkerResult<()> {
let mapped = parse_entries(&feed.url_feed)
let mapped = blend_feed::parse_entries(&feed.url_feed)
.await?
.into_iter()
.map(|entry| CreateEntryParams {
@@ -23,22 +23,47 @@ pub async fn fetch_entries(
title: entry.title,
summary_html: entry.summary_html,
content_html: entry.content_html,
content_scraped_html: entry.content_scraped_html,
media_url: entry.media_url,
published_at: entry.published_at,
updated_at: entry.updated_at,
})
.collect::<Vec<_>>();

let entry_uuids = EntryRepo::new(db).upsert_entries(&feed.uuid, &mapped).await?;
EntryRepo::new(db).upsert_entries(&feed.uuid, &mapped).await?;

let notifier = notifs.lock().await;

notifier.send(Notification::FinishedFeedRefresh {
// Notify that we've finished a feed refresh
notif_tx.lock().await.send(Notification::FinishedFeedRefresh {
feed_uuid: feed.uuid,
})?;
notifier.send(Notification::EntriesFetched {

// Initiate job to scrape entries
job_tx.lock().await.send(Job::ScrapeEntries(feed)).await?;

Ok(())
}

/// Scrape content for a feed's entries that have no content and haven't been scraped yet
pub async fn scrape_entries(
feed: Feed,
db: SqlitePool,
notif_tx: Arc<Mutex<broadcast::Sender<Notification>>>,
) -> WorkerResult<()> {
let repo = EntryRepo::new(db);

// Fetch all entries that have no content and haven't been scraped yet
let entries_to_scrape = repo.get_entries_to_scrape(&feed.uuid).await?;

// Scrape the content for each URL and update in the DB
for entry in entries_to_scrape {
let content_scraped_html = blend_feed::scrape_entry(entry.url)
.await?
.map(|html| blend_feed::extract_html(&html));
repo.update_scraped_entry(&entry.uuid, content_scraped_html).await?;
}

// Notify that we've finished scraping for this feed
notif_tx.lock().await.send(Notification::FinishedScrapingEntries {
feed_uuid: feed.uuid,
entry_uuids,
})?;

Ok(())
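The Notification enum itself is defined in one of the files not shown above. From the call sites in this handler, it presumably gains a FinishedScrapingEntries variant carrying only the feed UUID, while the entry_uuids payload used by the old EntriesFetched notification is no longer sent. A sketch of the inferred variants; other variants and derives are omitted because they are not visible here:

```rust
// Inferred from call sites only; the actual enum is in a file not shown here.
pub enum Notification {
    StartedFeedRefresh { feed_uuid: uuid::Uuid },
    FinishedFeedRefresh { feed_uuid: uuid::Uuid },
    FinishedScrapingEntries { feed_uuid: uuid::Uuid },
    // ...other variants not visible in this diff
}
```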
2 changes: 2 additions & 0 deletions crates/blend-worker/src/job.rs
@@ -7,6 +7,7 @@ pub enum Job {
pub enum Job {
FetchEntries(model::Feed),
FetchMetadata(model::Feed),
ScrapeEntries(model::Feed),
}

impl Display for Job {
@@ -24,6 +25,7 @@ impl Display for Job {
match self {
Job::FetchEntries(feed) => write_job_str("fetch entries", feed.uuid),
Job::FetchMetadata(feed) => write_job_str("fetch metadata", feed.uuid),
Job::ScrapeEntries(feed) => write_job_str("scrape entries", feed.uuid),
}
}
}
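The worker loop that consumes these jobs is also among the files not shown. Presumably it gains a match arm routing Job::ScrapeEntries to the new scrape_entries handler; in the sketch below, only the Job variants and the fetch_entries/scrape_entries signatures are grounded in this commit, while the receiver, the fetch_metadata signature, and the logging are assumptions:

```rust
// Sketch of the dispatch the worker presumably performs for the new job.
while let Some(job) = job_rx.recv().await {
    let result = match job {
        Job::FetchEntries(feed) => {
            handler::fetch_entries(feed, db.clone(), job_tx.clone(), notif_tx.clone()).await
        }
        Job::ScrapeEntries(feed) => {
            handler::scrape_entries(feed, db.clone(), notif_tx.clone()).await
        }
        // fetch_metadata's signature is assumed; it is not part of this excerpt.
        Job::FetchMetadata(feed) => handler::fetch_metadata(feed, db.clone(), notif_tx.clone()).await,
    };

    if let Err(err) = result {
        eprintln!("worker job failed: {err}"); // placeholder logging
    }
}
```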