Skip to content

Commit

Permalink
refactor(crawler): 重构爬虫模块
Browse files Browse the repository at this point in the history
主要功能:
    1.接入prometheus,实时监控爬虫运行情况
    2.重新设计爬虫架构,简洁清晰
  • Loading branch information
Erinable committed Jan 1, 2025
1 parent 1f61c83 commit fb4a8a5
Show file tree
Hide file tree
Showing 23 changed files with 2,397 additions and 652 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ diesel-async = {version = "^0.5", features = ["postgres", "bb8"]}
dotenv = "0.15"
futures = "0.3"
governor = "0.6"
lazy_static = "1.5.0"
# Utilities
num_cpus = "1.16"
prometheus = {version = "0.13.4", features = ["process"]}
prometheus-client = "0.22.3"
quick-xml = "0.31"
r2d2 = "0.8"
rand = "0.8"
Expand All @@ -46,6 +49,7 @@ time = {version = "0.3", features = ["formatting"]}
# Async Runtime
tokio = {version = "1.32", features = ["full"]}
tokio-stream = "0.1"
tokio-util = {version = "0.7.13", features = ["rt"]}
# Logging and Tracing
tracing = {version = "0.1", features = ["attributes"]}
tracing-appender = "0.2"
Expand Down
304 changes: 280 additions & 24 deletions src/crawler/batch_processor/inserter.rs
Original file line number Diff line number Diff line change
@@ -1,53 +1,294 @@
use std::collections::HashMap;
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::{mpsc, Mutex, Semaphore};
use tokio::task::JoinHandle;
use tracing::{error, info};

use crate::crawler::batch_processor::processor::process_batch;
use crate::crawler::batch_processor::stats::{BatchStats, StatsSummary};
use crate::crawler::TaskResult;
use crate::infrastructure::error::{AppError, DomainError, DomainErrorKind};

/// Channel-backed batching front end: items are sent into an mpsc channel and
/// background tasks group them into batches and pass each batch to a
/// caller-supplied async insert function.
pub(crate) struct BatchInserter<T> {
/// Sending half of the item channel; `insert` sends on it and `finish` drops it.
tx: mpsc::Sender<T>,
/// Receiving half of the item channel, shared behind an async mutex so the
/// background monitor can lock it while collecting a batch.
rx: Arc<Mutex<mpsc::Receiver<T>>>,
/// Shared counter handed to the background tasks.
/// NOTE(review): `worker_thread` receives it as `_counter` and does not update it — confirm intent.
processed_count: Arc<AtomicUsize>,
/// Created with `max_concurrent_inserts` permits; each worker acquires one
/// permit before calling the insert function.
semaphore: Arc<Semaphore>,
/// Incremented when a worker is spawned and decremented when it completes.
active_workers: Arc<AtomicUsize>,
/// Capacity the channel was created with (`batch_size * concurrent_threads` in `new`).
channel_capacity: usize,
/// Initialised to 0 in `new`; no other use is visible in this part of the file.
total_count: usize,
/// Statistics sink: workers record each batch result here and
/// `get_stats_summary` reads the summary back.
stats: Arc<BatchStats>,
}

impl<T> BatchInserter<T>
where
T: Send + 'static,
T: Send + 'static + Clone,
{
pub fn new<F>(batch_size: usize, insert_fn: F) -> Self
pub fn new<F, Fut>(
batch_size: usize,
max_concurrent_inserts: usize,
insert_fn: F,
batch_timeout: Duration,
concurrent_threads: usize,
) -> Self
where
F: Fn(Vec<T>) -> Result<(), AppError> + Send + Sync + 'static,
F: Fn(Vec<T>) -> Fut + Send + Sync + 'static + Clone,
Fut: Future<Output = Result<(), AppError>> + Send,
{
let (tx, mut rx) = mpsc::channel(batch_size * 2);
let processed_count = Arc::new(AtomicUsize::new(0));
let counter = processed_count.clone();
println!(
"Creating BatchInserter: batch_size={}, max_concurrent_inserts={}, batch_timeout={:?}, concurrent_threads={}",
batch_size, max_concurrent_inserts, batch_timeout, concurrent_threads
);

tokio::spawn(async move {
let mut current_batch = Vec::with_capacity(batch_size);
// Calculate optimal channel capacity
let channel_capacity = batch_size * concurrent_threads;
let (tx, rx) = mpsc::channel(channel_capacity);
let rx = Arc::new(Mutex::new(rx));
let processed_count = Arc::new(AtomicUsize::new(0));
let semaphore = Arc::new(Semaphore::new(max_concurrent_inserts));
let active_workers = Arc::new(AtomicUsize::new(0));
let stats = Arc::new(BatchStats::new());

while let Some(item) = rx.recv().await {
current_batch.push(item);
let processed_count_clone = processed_count.clone();
let semaphore_clone = semaphore.clone();
let active_workers_clone = active_workers.clone();
let stats_clone = stats.clone();
let tx_clone = tx.clone();
let rx_clone = rx.clone();

if current_batch.len() >= batch_size {
Self::process_batch(&insert_fn, &mut current_batch, &counter).await;
}
}

// Process remaining items
if !current_batch.is_empty() {
Self::process_batch(&insert_fn, &mut current_batch, &counter).await;
}
tokio::spawn(async move {
let this = Self {
tx: tx_clone,
rx: rx_clone,
processed_count: processed_count_clone.clone(),
semaphore: semaphore_clone.clone(),
active_workers: active_workers_clone.clone(),
total_count: 0,
stats: stats_clone.clone(),
channel_capacity,
};
this.start_channel_monitor(
batch_size,
batch_timeout,
insert_fn.clone(),
processed_count_clone,
semaphore_clone,
active_workers_clone,
stats_clone,
)
.await
.await
});

Self {
tx,
rx,
processed_count,
semaphore,
active_workers,
total_count: 0,
stats,
channel_capacity,
}
}

async fn start_channel_monitor<F, Fut>(
&self,
batch_size: usize,
batch_timeout: Duration,
insert_fn: F,
processed_count: Arc<AtomicUsize>,
semaphore: Arc<Semaphore>,
active_workers: Arc<AtomicUsize>,
stats: Arc<BatchStats>,
) -> JoinHandle<()>
where
F: Fn(Vec<T>) -> Fut + Send + Sync + 'static + Clone,
Fut: Future<Output = Result<(), AppError>> + Send,
{
let rx_clone = self.rx.clone();
tokio::spawn(async move {
BatchInserter::channel_monitor(
rx_clone,
batch_size,
batch_timeout,
insert_fn,
processed_count,
semaphore,
active_workers,
stats,
)
.await
})
}

async fn channel_monitor<F, Fut>(
rx: Arc<Mutex<mpsc::Receiver<T>>>,
batch_size: usize,
batch_timeout: Duration,
insert_fn: F,
processed_count: Arc<AtomicUsize>,
semaphore: Arc<Semaphore>,
active_workers: Arc<AtomicUsize>,
stats: Arc<BatchStats>,
) where
F: Fn(Vec<T>) -> Fut + Send + Sync + 'static + Clone,
Fut: Future<Output = Result<(), AppError>> + Send,
{
loop {
// Collect a batch of items
let batch = Self::collect_batch(&rx, batch_size, batch_timeout).await;

// Skip if batch is empty
if batch.is_empty() {
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}

// Spawn a worker to process the batch
let worker_batch = batch.clone();
let worker_insert_fn = insert_fn.clone();
let worker_processed_count = processed_count.clone();
let worker_semaphore = semaphore.clone();
let worker_active_workers = active_workers.clone();
let worker_stats = stats.clone();

tokio::spawn(async move {
Self::spawn_worker(
worker_batch,
worker_insert_fn,
worker_processed_count,
worker_semaphore,
worker_active_workers,
worker_stats,
)
.await;
});
}
}

/// Gathers up to `batch_size` items from the shared receiver.
///
/// Collection stops when the batch is full, when `batch_timeout` has elapsed
/// since the call started, or when the channel reports that it is closed.
/// Each receive attempt waits at most 50 ms so the overall deadline is
/// re-checked regularly. The returned vector may be empty.
async fn collect_batch(
    rx: &Arc<Mutex<mpsc::Receiver<T>>>,
    batch_size: usize,
    batch_timeout: Duration,
) -> Vec<T> {
    let mut items = Vec::with_capacity(batch_size);
    let started = Instant::now();
    // Upper bound for a single receive attempt.
    let poll_interval = Duration::from_millis(50);

    while items.len() < batch_size && started.elapsed() < batch_timeout {
        // The lock is held only for the duration of one receive attempt.
        let mut receiver = rx.lock().await;
        let polled = tokio::time::timeout(poll_interval, receiver.recv()).await;
        drop(receiver);

        match polled {
            Ok(Some(item)) => items.push(item),
            // Channel closed: return whatever has been gathered so far.
            Ok(None) => break,
            // This attempt timed out; loop around and re-check the deadline.
            Err(_) => {}
        }
    }

    items
}

async fn spawn_worker<F, Fut>(
batch: Vec<T>,
insert_fn: F,
counter: Arc<AtomicUsize>,
semaphore: Arc<Semaphore>,
active_workers: Arc<AtomicUsize>,
stats: Arc<BatchStats>,
) where
F: Fn(Vec<T>) -> Fut + Send + Sync + 'static + Clone,
Fut: Future<Output = Result<(), AppError>> + Send,
{
println!(
"Spawning worker for batch of {} items. Active workers before spawn: {}",
batch.len(),
active_workers.load(Ordering::SeqCst)
);

active_workers.fetch_add(1, Ordering::SeqCst);

tokio::spawn(async move {
BatchInserter::worker_thread(
batch,
&insert_fn,
&counter,
&semaphore,
&active_workers,
&stats,
)
.await;
});
}

async fn worker_thread<F, Fut>(
batch: Vec<T>,
insert_fn: &F,
_counter: &Arc<AtomicUsize>,
semaphore: &Arc<Semaphore>,
active_workers: &Arc<AtomicUsize>,
stats: &Arc<BatchStats>,
) where
F: Fn(Vec<T>) -> Fut + Send + Sync + 'static + Clone,
Fut: Future<Output = Result<(), AppError>> + Send,
{
println!(
"Worker thread started with batch of {} items. Active workers: {}",
batch.len(),
active_workers.load(Ordering::SeqCst)
);

let _permit = semaphore.acquire().await.unwrap();
let start_time = Instant::now();
let batch_size = batch.len();

println!("Attempting to process batch of {} items", batch_size);

let task_result = match insert_fn(batch).await {
Ok(_) => {
println!("Batch processing successful");
TaskResult::success("".to_string(), batch_size, start_time.elapsed())
}
Err(e) => {
println!("Batch processing failed: {}", e);
TaskResult::failure("error".to_string(), e.to_string(), start_time.elapsed())
}
};

stats.record_result(&task_result);

println!(
"Worker thread completed. Batch size: {}, Duration: {:?}, Active workers before decrement: {}",
batch_size,
start_time.elapsed(),
active_workers.load(Ordering::SeqCst)
);

active_workers.fetch_sub(1, Ordering::SeqCst);
}

pub async fn insert(&self, item: T) {
println!("Attempting to insert item into batch inserter queue");
if let Err(e) = self.tx.send(item).await {
error!(
error = %e,
Expand All @@ -57,27 +298,42 @@ where
}

pub async fn finish(self) {
println!("Finishing batch inserter: Dropping sender to signal channel closure");
drop(self.tx);
}

async fn process_batch<F>(insert_fn: &F, batch: &mut Vec<T>, counter: &Arc<AtomicUsize>)
/// Fetches the current statistics summary, prints its formatted report to
/// stdout, and returns the summary to the caller.
pub async fn get_stats_summary(&self) -> StatsSummary {
    let snapshot = self.stats.get_summary().await;
    let report = snapshot.format_report();
    println!("Stats Summary:\n {}", report);
    snapshot
}

async fn process_batch<F, Fut>(insert_fn: &F, batch: &mut Vec<T>, counter: &Arc<AtomicUsize>)
where
F: Fn(Vec<T>) -> Result<(), AppError> + Send + Sync,
F: Fn(Vec<T>) -> Fut + Send + Sync + 'static + Clone,
Fut: Future<Output = Result<(), AppError>> + Send,
{
match insert_fn(std::mem::take(batch)) {
println!("Processing batch of {} items", batch.len());
match insert_fn(std::mem::take(batch)).await {
Ok(_) => {
counter.fetch_add(batch.len(), Ordering::SeqCst);
info!("Successfully processed batch of {} items", batch.len());
println!(
"Batch processed successfully. Total processed: {}",
counter.load(Ordering::SeqCst)
);
}
Err(e) => {
error!("Failed to process batch: {}", e);
println!("Batch processing failed with error: {}", e);
let error = DomainError::new(
DomainErrorKind::BatchProcessing,
format!("Batch processing error: {}", e),
Some(format!("Batch size: {}", batch.len())),
None,
Some(Box::new(e)),
);
error!("Batch error: {}", error);
println!("Detailed batch error: {}", error);
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/crawler/batch_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::crawler::url_utils;
use crate::crawler::TaskResult;
use crate::infrastructure::error::{AppError, DomainError, DomainErrorKind};

pub(crate) use inserter::BatchInserter;
pub(crate) use processor::process_batch;

impl<T> From<processor::TaskResult<T>> for TaskResult<T> {
Expand Down Expand Up @@ -100,7 +99,7 @@ where
let distributed_urls = url_utils::distribute_urls(&urls, insert_batch)?;

for (batch_index, batch_urls) in distributed_urls.iter().enumerate() {
let batch_results = processor::process_batch(
let batch_results = process_batch(
crawler.clone(),
batch_urls,
insert_batch,
Expand Down
Loading

0 comments on commit fb4a8a5

Please sign in to comment.