Skip to content

Commit 504b7de

Browse files
authored
refactor(dfdaemon): parent selector for selecting piece (#1481)
Signed-off-by: Gaius <gaius.qi@gmail.com>
1 parent 1c29c46 commit 504b7de

File tree

9 files changed

+654
-1114
lines changed

9 files changed

+654
-1114
lines changed

dragonfly-client-config/src/dfdaemon.rs

Lines changed: 0 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -73,24 +73,6 @@ pub fn default_download_request_rate_limit() -> u64 {
7373
4000
7474
}
7575

76-
/// default_parent_selector_sync_interval is the default interval to sync host information.
77-
#[inline]
78-
fn default_parent_selector_sync_interval() -> Duration {
79-
Duration::from_millis(500)
80-
}
81-
82-
/// default_parent_selector_timeout is the default timeout for the sync host.
83-
#[inline]
84-
fn default_parent_selector_timeout() -> Duration {
85-
Duration::from_secs(3)
86-
}
87-
88-
/// default_parent_selector_capacity is the default capacity of the parent selector's gRPC connections.
89-
#[inline]
90-
pub fn default_parent_selector_capacity() -> usize {
91-
20
92-
}
93-
9476
/// default_host_hostname is the default hostname of the host.
9577
#[inline]
9678
fn default_host_hostname() -> String {
@@ -398,9 +380,6 @@ pub struct Download {
398380
#[serde(default = "default_download_protocol")]
399381
pub protocol: String,
400382

401-
/// parent_selector is the download parent selector configuration for dfdaemon.
402-
pub parent_selector: ParentSelector,
403-
404383
/// rate_limit is the rate limit of the download speed in GiB/Mib/Kib per second.
405384
#[serde(with = "bytesize_serde", default = "default_download_rate_limit")]
406385
pub rate_limit: ByteSize,
@@ -429,7 +408,6 @@ impl Default for Download {
429408
Download {
430409
server: DownloadServer::default(),
431410
protocol: default_download_protocol(),
432-
parent_selector: ParentSelector::default(),
433411
rate_limit: default_download_rate_limit(),
434412
piece_timeout: default_download_piece_timeout(),
435413
collected_piece_timeout: default_collected_download_piece_timeout(),
@@ -553,59 +531,6 @@ impl UploadClient {
553531
}
554532
}
555533

556-
/// ParentSelector is the download parent selector configuration for dfdaemon. It will synchronize
557-
/// the host info in real-time from the parents and then select the parents for downloading.
558-
///
559-
/// The workflow diagram is as follows:
560-
///
561-
///```text
562-
/// +----------+
563-
/// ----------------| Parent |---------------
564-
/// | +----------+ |
565-
/// Host Info Piece Metadata
566-
/// +------------|-----------------------------------------|------------+
567-
/// | | | |
568-
/// | | Peer | |
569-
/// | v v |
570-
/// | +------------------+ +------------------+ |
571-
/// | | ParentSelector | ---Optimal Parent---> | PieceCollector | |
572-
/// | +------------------+ +------------------+ |
573-
/// | | |
574-
/// | Piece Metadata |
575-
/// | | |
576-
/// | v |
577-
/// | +------------+ |
578-
/// | | Download | |
579-
/// | +------------+ |
580-
/// +-------------------------------------------------------------------+
581-
/// ```
582-
#[derive(Debug, Clone, Default, Validate, Deserialize)]
583-
#[serde(default, rename_all = "camelCase")]
584-
pub struct ParentSelector {
585-
/// enable indicates whether enable parent selector for downloading.
586-
///
587-
/// If `enable` is true, the `ParentSelector`'s sync loop will start. It will periodically fetch
588-
/// host information from parents and use this information to calculate scores for selecting the
589-
/// parents for downloading.
590-
pub enable: bool,
591-
592-
/// sync_interval is the interval to sync parents' host info by gRPC streaming.
593-
#[serde(
594-
default = "default_parent_selector_sync_interval",
595-
with = "humantime_serde"
596-
)]
597-
pub sync_interval: Duration,
598-
599-
/// timeout is the timeout for the sync host.
600-
#[serde(default = "default_parent_selector_timeout", with = "humantime_serde")]
601-
pub timeout: Duration,
602-
603-
/// capacity is the maximum number of gRPC connections that `DfdaemonUpload.SyncHost` maintains
604-
/// in the `ParentSelector`, the default value is 20.
605-
#[serde(default = "default_parent_selector_capacity")]
606-
pub capacity: usize,
607-
}
608-
609534
/// Upload is the upload configuration for dfdaemon.
610535
#[derive(Debug, Clone, Validate, Deserialize)]
611536
#[serde(default, rename_all = "camelCase")]

dragonfly-client-util/src/net/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub struct NetworkData {
5959
/// Interface methods provide functionality to get network interface information.
6060
impl Interface {
6161
/// DEFAULT_NETWORKS_REFRESH_INTERVAL is the default interval for refreshing network data.
62-
const DEFAULT_NETWORKS_REFRESH_INTERVAL: Duration = Duration::from_secs(2);
62+
const DEFAULT_NETWORKS_REFRESH_INTERVAL: Duration = Duration::from_secs(1);
6363

6464
/// new creates a new Interface instance based on the provided IP address and rate limit.
6565
pub fn new(ip: IpAddr, rate_limit: ByteSize) -> Interface {

dragonfly-client-util/src/request/selector.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,12 +150,8 @@ impl SeedPeerSelector {
150150
hashring.add(addr);
151151
hosts.insert(addr.to_string(), peer);
152152
}
153-
Ok(Err(err)) => {
154-
error!("health check error: {}", err);
155-
}
156-
Err(join_err) => {
157-
error!("task join error: {}", join_err);
158-
}
153+
Ok(Err(err)) => error!("health check failed: {}", err),
154+
Err(err) => error!("task join error: {}", err),
159155
}
160156
}
161157

dragonfly-client/src/bin/dfdaemon/main.rs

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@ use dragonfly_client::grpc::{
2424
};
2525
use dragonfly_client::health::Health;
2626
use dragonfly_client::proxy::Proxy;
27-
use dragonfly_client::resource::{
28-
parent_selector::ParentSelector, persistent_cache_task::PersistentCacheTask, task::Task,
29-
};
27+
use dragonfly_client::resource::{persistent_cache_task::PersistentCacheTask, task::Task};
3028
use dragonfly_client::stats::Stats;
3129
use dragonfly_client::tracing::init_tracing;
3230
use dragonfly_client_backend::BackendFactory;
@@ -247,16 +245,6 @@ async fn main() -> Result<(), anyhow::Error> {
247245
.build(),
248246
);
249247

250-
// Initialize parent selector.
251-
let parent_selector = ParentSelector::new(
252-
config.clone(),
253-
id_generator.host_id(),
254-
id_generator.peer_id(),
255-
shutdown.clone(),
256-
shutdown_complete_tx.clone(),
257-
);
258-
let parent_selector = Arc::new(parent_selector);
259-
260248
// Initialize task manager.
261249
let task = Task::new(
262250
config.clone(),
@@ -267,7 +255,8 @@ async fn main() -> Result<(), anyhow::Error> {
267255
download_rate_limiter.clone(),
268256
upload_rate_limiter.clone(),
269257
prefetch_rate_limiter.clone(),
270-
parent_selector.clone(),
258+
shutdown.clone(),
259+
shutdown_complete_tx.clone(),
271260
)?;
272261
let task = Arc::new(task);
273262

@@ -281,6 +270,8 @@ async fn main() -> Result<(), anyhow::Error> {
281270
download_rate_limiter.clone(),
282271
upload_rate_limiter.clone(),
283272
prefetch_rate_limiter.clone(),
273+
shutdown.clone(),
274+
shutdown_complete_tx.clone(),
284275
)?;
285276
let persistent_cache_task = Arc::new(persistent_cache_task);
286277

dragonfly-client/src/grpc/dfdaemon_upload.rs

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -972,7 +972,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
972972

973973
// Send the piece metadata to the stream.
974974
if piece.is_finished() {
975-
match out_stream_tx
975+
if let Err(err) = out_stream_tx
976976
.send_timeout(
977977
Ok(SyncPiecesResponse {
978978
number: piece.number,
@@ -986,19 +986,14 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
986986
)
987987
.await
988988
{
989-
Ok(_) => {
990-
info!("send piece metadata {}-{}", task_id, piece.number);
991-
}
992-
Err(err) => {
993-
error!(
994-
"send piece metadata {}-{} to stream: {}",
995-
task_id, interested_piece_number, err
996-
);
989+
error!(
990+
"send piece metadata {}-{} to stream: {}",
991+
task_id, interested_piece_number, err
992+
);
997993

998-
drop(out_stream_tx);
999-
return;
1000-
}
1001-
}
994+
drop(out_stream_tx);
995+
return;
996+
};
1002997

1003998
// Add the finished piece number to the finished piece numbers.
1004999
finished_piece_numbers.push(piece.number);
@@ -1177,18 +1172,12 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
11771172
// Get local interface.
11781173
let interface = self.interface.clone();
11791174

1180-
// DEFAULT_HOST_INFO_REFRESH_INTERVAL is the default interval for refreshing the host info.
1181-
const DEFAULT_HOST_INFO_REFRESH_INTERVAL: Duration = Duration::from_millis(500);
1182-
11831175
// Initialize stream channel.
11841176
let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024);
11851177
tokio::spawn(
11861178
async move {
11871179
// Start the host info update loop.
11881180
loop {
1189-
// Wait for the host info refresh interval.
1190-
tokio::time::sleep(DEFAULT_HOST_INFO_REFRESH_INTERVAL).await;
1191-
11921181
// Wait for getting the network data.
11931182
let network_data = interface.get_network_data().await;
11941183
debug!(
@@ -1200,7 +1189,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
12001189
);
12011190

12021191
// Send host info.
1203-
match out_stream_tx
1192+
if let Err(err) = out_stream_tx
12041193
.send(Ok(Host {
12051194
network: Some(Network {
12061195
max_rx_bandwidth: network_data.max_rx_bandwidth,
@@ -1213,15 +1202,12 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
12131202
}))
12141203
.await
12151204
{
1216-
Ok(_) => {}
1217-
Err(err) => {
1218-
error!(
1219-
"connection broken from remote host {}, err: {}",
1220-
remote_host_id, err
1221-
);
1205+
error!(
1206+
"connection broken from remote host {}, err: {}",
1207+
remote_host_id, err
1208+
);
12221209

1223-
return;
1224-
}
1210+
return;
12251211
};
12261212
}
12271213
}

0 commit comments

Comments
 (0)