dragonflyoss/client

Implement Automatic Scheduler Switching for P2P Downloads

PoloDyBala opened this issue · 0 comments

Description

To address potential issues with Scheduler failures during P2P downloads, this project aims to implement an automatic switching feature at the Peer node. When a Scheduler encounters an issue and cannot properly handle downloads, the Peer node will switch to other Scheduler nodes using a hash ring to continue the download, thereby preventing download failures and improving success rates.

Requirements

  1. Scheduler Availability Check During Peer Registration: Ensure that the Scheduler is available during the Peer node registration process.

  2. Automatic Scheduler Switching: If the current Scheduler is unavailable, automatically switch to another Scheduler node until a usable Scheduler is found.

  3. Feature Development: Complete the development of the automatic switching logic and Scheduler availability detection.

  4. Testing: Implement unit tests and end-to-end (E2E) tests to ensure the reliability and stability of the feature.

Solution

Add a cooling time option to the configuration file, specifically:

- Cooling time mechanism: Before attempting to connect to a scheduler, check whether it is still within its cooling time, i.e. whether it was marked unavailable less than the configured cooling time ago. If it is, skip that node and try the next one.

- Restore available state after success: If the attempt to connect is successful after the cooling time, remove the node from the unavailable list and make it available again.

File Path: dragonfly-client/src/dynconfig/mod.rs

// Data is the dynamic configuration of the dfdaemon.
//
// The struct derives Default, so every field starts at its type's default value
// (an empty list, None, or 0).
#[derive(Default)]
pub struct Data {
    // schedulers is the schedulers of the dfdaemon.
    pub schedulers: ListSchedulersResponse,

    // available_schedulers is the available schedulers of the dfdaemon.
    pub available_schedulers: Vec<Scheduler>,

    // available_scheduler_cluster_id is the id of the available scheduler cluster of the dfdaemon.
    pub available_scheduler_cluster_id: Option<u64>,

    // cooldown_duration_secs is the cooling time of a scheduler, in seconds.
    // With the derived Default this is 0.
    pub cooldown_duration_secs: u64,
}

File Path: dragonfly-client/src/grpc/scheduler.rs


impl SchedulerClient {
    // client gets the grpc client of the scheduler.
    #[instrument(skip(self))]
    async fn client(
        &self,
        task_id: &str,
        peer_id: Option<&str>,
    ) -> Result<SchedulerGRPCClient<Channel>> {
        let mut last_err = None;

        // 获取当前 HashRing 状态
        let hashring = self.hashring.read().await;
        let scheduler_keys: Vec<&VNode> = hashring.nodes().collect();
        let mut unavailable_schedulers = self.unavailable_schedulers.lock().await;
        let cooldown_duration = self.dynconfig.data.read().await.cooldown_duration;

        for scheduler_key in scheduler_keys.iter().cycle().take(scheduler_keys.len()) {
            let scheduler_addr = scheduler_key.addr;

            // 检查是否在冷却时间内
            if let Some(&instant) = unavailable_schedulers.get(&scheduler_addr) {
                if instant.elapsed() < cooldown_duration {
                    continue;
                }
            }

            match self.try_client(&scheduler_addr).await {
                Ok(client) => {
                    // 成功连接后从不可用列表中移除
                    unavailable_schedulers.remove(&scheduler_addr);
                    return Ok(client);
                }
                Err(err) => {
                    error!("Scheduler {} is not available: {}", scheduler_addr, err);
                    last_err = Some(err);
                    // 如果调度器不在不可用列表中,则将其加入缓存
                    if !unavailable_schedulers.contains_key(&scheduler_addr) {
                        unavailable_schedulers.insert(scheduler_addr, Instant::now());
                    }
                    continue;
                }
            }
        }

        // 如果遍历完所有调度器之后仍然失败,强制刷新调度器列表
        self.refresh_available_scheduler_addrs().await?;
        Err(last_err.unwrap_or(Error::SchedulerUnavailable))
    }

    async fn try_client(&self, scheduler_addr: &SocketAddr) -> Result<SchedulerGRPCClient<Channel>> {
        let channel = Channel::from_shared(format!("http://{}", scheduler_addr))
            .map_err(|_| Error::InvalidURI(scheduler_addr.to_string()))?
            .connect_timeout(super::CONNECT_TIMEOUT)
            .connect()
            .await
            .map_err(|err| {
                error!("connect to {} failed: {}", scheduler_addr, err);
                err
            })
            .or_err(ErrorType::ConnectError)?;

        Ok(SchedulerGRPCClient::new(channel))
    }

    // mark_unavailable records the scheduler address in the unavailable list together with
    // the current time, replacing any earlier entry for the same address.
    #[instrument(skip(self))]
    async fn mark_unavailable(&self, scheduler_addr: SocketAddr) {
        self.unavailable_schedulers
            .lock()
            .await
            .insert(scheduler_addr, Instant::now());
    }

    #[instrument(skip(self))]
    async fn refresh_available_scheduler_addrs(&self) -> Result<()> {
        self.dynconfig.refresh().await?;

        let data = self.dynconfig.data.read().await;

        if data.available_schedulers.is_empty() {
            return Err(Error::AvailableSchedulersNotFound);
        }

        let mut new_available_schedulers = Vec::new();
        let mut new_available_scheduler_addrs = Vec::new();
        let mut new_hashring = HashRing::new();

        for available_scheduler in data.available_schedulers.iter() {
            let ip = match IpAddr::from_str(&available_scheduler.ip) {
                Ok(ip) => ip,
                Err(err) => {
                    error!("failed to parse ip: {}", err);
                    continue;
                }
            };

            new_available_schedulers.push(available_scheduler.clone());
            new_available_scheduler_addrs.push(SocketAddr::new(ip, available_scheduler.port as u16));
            new_hashring.add(VNode {
                addr: SocketAddr::new(ip, available_scheduler.port as u16),
            });
        }

        let mut available_schedulers = self.available_schedulers.write().await;
        *available_schedulers = new_available_schedulers;

        let mut available_scheduler_addrs = self.available_scheduler_addrs.write().await;
        *available_scheduler_addrs = new_available_scheduler_addrs;

        let mut hashring = self.hashring.write().await;
        *hashring = new_hashring;

        info!(
            "Refreshed available scheduler addresses: {:?}",
            available_scheduler_addrs
                .iter()
                .map(|s| s.ip().to_string())
                .collect::<Vec<String>>()
        );

        Ok(())
    }

    // make_request wraps the message in a tonic request and sets `super::REQUEST_TIMEOUT`
    // as the timeout of that request.
    fn make_request<T>(request: T) -> tonic::Request<T> {
        let mut timed_request = tonic::Request::new(request);
        timed_request.set_timeout(super::REQUEST_TIMEOUT);
        timed_request
    }
}