Implement Automatic Scheduler Switching for P2P Downloads
PoloDyBala opened this issue · 0 comments
Description
To keep P2P downloads working when a Scheduler fails, this issue proposes an automatic Scheduler-switching feature on the Peer node. When a Scheduler fails and can no longer handle downloads, the Peer node uses a hash ring to switch to another Scheduler node and continue the download. This prevents download failures and improves the download success rate.
Requirements
- **Scheduler availability check during Peer registration:** Verify that the Scheduler is available while the Peer node registers.
- **Automatic Scheduler switching:** If the current Scheduler is unavailable, automatically switch to another Scheduler node, repeating until a usable Scheduler is found.
- **Feature development:** Implement the automatic switching logic and the Scheduler availability detection.
- **Testing:** Add unit tests and end-to-end (E2E) tests to verify the reliability and stability of the feature.
Solution
Add a Scheduler cooldown option to the configuration file. It works as follows:
- Cooldown mechanism: Before attempting to connect to a Scheduler, check whether it is still within its cooldown period after a failure. If it is, skip that node.
- Restoring availability after success: If a connection attempt made after the cooldown period succeeds, remove the node from the unavailable list so that it is available again.
File Path: dragonfly-client/src/dynconfig/mod.rs
/// Data is the dynamic configuration of the dfdaemon.
#[derive(Default)]
pub struct Data {
    /// schedulers is the schedulers of the dfdaemon.
    pub schedulers: ListSchedulersResponse,

    /// available_schedulers is the available schedulers of the dfdaemon.
    pub available_schedulers: Vec<Scheduler>,

    /// available_scheduler_cluster_id is the id of the available scheduler cluster of the
    /// dfdaemon.
    pub available_scheduler_cluster_id: Option<u64>,

    /// cooldown_duration_secs is the scheduler cooldown time, in seconds: after a scheduler
    /// fails to connect, it is skipped until this much time has passed since the failure.
    ///
    /// `#[derive(Default)]` initializes this to `0`, in which case a failed scheduler is never
    /// skipped and is retried on the next attempt.
    pub cooldown_duration_secs: u64,
}
File Path: dragonfly-client/src/grpc/scheduler.rs
impl SchedulerClient {
// client gets the grpc client of the scheduler.
#[instrument(skip(self))]
async fn client(
&self,
task_id: &str,
peer_id: Option<&str>,
) -> Result<SchedulerGRPCClient<Channel>> {
let mut last_err = None;
// 获取当前 HashRing 状态
let hashring = self.hashring.read().await;
let scheduler_keys: Vec<&VNode> = hashring.nodes().collect();
let mut unavailable_schedulers = self.unavailable_schedulers.lock().await;
let cooldown_duration = self.dynconfig.data.read().await.cooldown_duration;
for scheduler_key in scheduler_keys.iter().cycle().take(scheduler_keys.len()) {
let scheduler_addr = scheduler_key.addr;
// 检查是否在冷却时间内
if let Some(&instant) = unavailable_schedulers.get(&scheduler_addr) {
if instant.elapsed() < cooldown_duration {
continue;
}
}
match self.try_client(&scheduler_addr).await {
Ok(client) => {
// 成功连接后从不可用列表中移除
unavailable_schedulers.remove(&scheduler_addr);
return Ok(client);
}
Err(err) => {
error!("Scheduler {} is not available: {}", scheduler_addr, err);
last_err = Some(err);
// 如果调度器不在不可用列表中,则将其加入缓存
if !unavailable_schedulers.contains_key(&scheduler_addr) {
unavailable_schedulers.insert(scheduler_addr, Instant::now());
}
continue;
}
}
}
// 如果遍历完所有调度器之后仍然失败,强制刷新调度器列表
self.refresh_available_scheduler_addrs().await?;
Err(last_err.unwrap_or(Error::SchedulerUnavailable))
}
async fn try_client(&self, scheduler_addr: &SocketAddr) -> Result<SchedulerGRPCClient<Channel>> {
let channel = Channel::from_shared(format!("http://{}", scheduler_addr))
.map_err(|_| Error::InvalidURI(scheduler_addr.to_string()))?
.connect_timeout(super::CONNECT_TIMEOUT)
.connect()
.await
.map_err(|err| {
error!("connect to {} failed: {}", scheduler_addr, err);
err
})
.or_err(ErrorType::ConnectError)?;
Ok(SchedulerGRPCClient::new(channel))
}
#[instrument(skip(self))]
async fn mark_unavailable(&self, scheduler_addr: SocketAddr) {
let mut unavailable_schedulers = self.unavailable_schedulers.lock().await;
unavailable_schedulers.insert(scheduler_addr, Instant::now());
}
#[instrument(skip(self))]
async fn refresh_available_scheduler_addrs(&self) -> Result<()> {
self.dynconfig.refresh().await?;
let data = self.dynconfig.data.read().await;
if data.available_schedulers.is_empty() {
return Err(Error::AvailableSchedulersNotFound);
}
let mut new_available_schedulers = Vec::new();
let mut new_available_scheduler_addrs = Vec::new();
let mut new_hashring = HashRing::new();
for available_scheduler in data.available_schedulers.iter() {
let ip = match IpAddr::from_str(&available_scheduler.ip) {
Ok(ip) => ip,
Err(err) => {
error!("failed to parse ip: {}", err);
continue;
}
};
new_available_schedulers.push(available_scheduler.clone());
new_available_scheduler_addrs.push(SocketAddr::new(ip, available_scheduler.port as u16));
new_hashring.add(VNode {
addr: SocketAddr::new(ip, available_scheduler.port as u16),
});
}
let mut available_schedulers = self.available_schedulers.write().await;
*available_schedulers = new_available_schedulers;
let mut available_scheduler_addrs = self.available_scheduler_addrs.write().await;
*available_scheduler_addrs = new_available_scheduler_addrs;
let mut hashring = self.hashring.write().await;
*hashring = new_hashring;
info!(
"Refreshed available scheduler addresses: {:?}",
available_scheduler_addrs
.iter()
.map(|s| s.ip().to_string())
.collect::<Vec<String>>()
);
Ok(())
}
fn make_request<T>(request: T) -> tonic::Request<T> {
let mut request = tonic::Request::new(request);
request.set_timeout(super::REQUEST_TIMEOUT);
request
}
}