Finomnis/tokio-graceful-shutdown

Provide access to SubsystemHandle

Ten0 opened this issue · 4 comments

Ten0 commented

I have a use-case where I need to start a bunch of tasks in several loops.
That is very impractical because `Toplevel` doesn't provide an interface for starting subsystems that doesn't take ownership of it.
I think I'd be fine with just `subsystem(&self) -> &SubsystemHandle`. That also seems to be the most flexible option for other operations.
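A minimal std-only mock of the accessor shape proposed above, `subsystem(&self) -> &SubsystemHandle`. The `Toplevel` and `SubsystemHandle` types here are hypothetical stand-ins, not the crate's types: the handle merely records names, and `start_in_loop` is an illustrative helper. The point is that with a borrowing accessor, the toplevel can be passed around as `&Toplevel` and used inside loops without being moved or rebound.

```rust
use std::cell::RefCell;

/// Hypothetical stand-in for `SubsystemHandle`: it only records the names
/// of the subsystems started through it.
#[derive(Default)]
pub struct SubsystemHandle {
    started: RefCell<Vec<String>>,
}

impl SubsystemHandle {
    /// In this mock, starting a subsystem only needs `&self`.
    pub fn start(&self, name: &str) {
        self.started.borrow_mut().push(name.to_string());
    }

    /// Names of all subsystems started so far.
    pub fn started(&self) -> Vec<String> {
        self.started.borrow().clone()
    }
}

/// Hypothetical stand-in for `Toplevel`, owning its root handle.
#[derive(Default)]
pub struct Toplevel {
    handle: SubsystemHandle,
}

impl Toplevel {
    /// The requested accessor: borrow the handle without consuming `self`.
    pub fn subsystem(&self) -> &SubsystemHandle {
        &self.handle
    }
}

/// Hypothetical helper: starts subsystems in a loop through a shared
/// reference, while mutating caller-owned state between starts.
pub fn start_in_loop(toplevel: &Toplevel, counter: &mut u32) {
    for i in 0..3 {
        *counter += 1;
        toplevel.subsystem().start(&format!("Task {i}"));
    }
}

fn main() {
    let toplevel = Toplevel::default();
    let mut counter = 0;
    start_in_loop(&toplevel, &mut counter);
    println!("{:?} after {counter} updates", toplevel.subsystem().started());
}
```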

In addition, it is not possible to start a nested subsystem at the top level without requiring `Send + 'static` on all used items, because doing so again requires access to the `SubsystemHandle`. If I have, e.g., an `&mut something` to update as I start my nested subsystem, whose output is used to start another subsystem, that won't work.
Again, `subsystem(&self) -> &SubsystemHandle` solves this.
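The `Send + 'static` requirement mentioned above is the usual bound on spawned work: a spawned closure may outlive the current stack frame, so it cannot capture `&mut` borrows. The sketch below uses `std::thread::spawn` as a stand-in for `tokio::spawn` (both require `Send + 'static`), with hypothetical `MonitoringRegistry` and `init_thing` definitions, to show the common workaround: do the `&mut` work before spawning and move only the owned result into the task.

```rust
use std::thread;

/// Hypothetical registry that must be mutated while each worker is initialized.
#[derive(Debug, Default)]
pub struct MonitoringRegistry {
    pub metrics: Vec<String>,
}

/// Hypothetical init step: borrows the registry mutably, returns owned state.
fn init_thing(registry: &mut MonitoringRegistry, id: usize) -> String {
    registry.metrics.push(format!("thing_{id}"));
    format!("thing {id}")
}

/// Spawns one worker per id. `thread::spawn` needs a `Send + 'static`
/// closure, so the closure must not capture `&mut registry`; only the
/// owned value returned by `init_thing` is moved into it.
pub fn spawn_workers(n: usize) -> (MonitoringRegistry, Vec<String>) {
    let mut registry = MonitoringRegistry::default();
    let mut handles = Vec::new();
    for id in 0..n {
        // The mutable borrow of `registry` ends when `init_thing` returns.
        let thing = init_thing(&mut registry, id);
        handles.push(thread::spawn(move || format!("worked on {thing}")));
    }
    // Join in spawn order, so the results keep that order.
    let results: Vec<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    (registry, results)
}

fn main() {
    let (registry, results) = spawn_workers(2);
    println!("{registry:?}\n{results:?}");
}
```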

Just be aware that if you repeatedly spawn a bunch of tasks, they don't get fully cleaned up on shutdown. Tasks only get cleaned up when the (nested) toplevel gets destroyed, as can be seen from the output at the end of the toplevel. Just so you are aware; if you are already using nested toplevels, you have probably already worked around that issue.

This is a limitation of the current implementation, and I am currently experimenting with a complete rewrite to solve it... So I apologize.

Apart from that, would you mind sending me a use-case example of how you would use said `subsystem()` function in your code?
I understand the problem that `start` should take `&mut self`, and honestly, I agree. This was a pretty arbitrary decision to make it possible to spawn the toplevel in one long builder-style call chain (because `wait_for_shutdown` is consuming), but I see that it collides with your use case.
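The trade-off described above can be illustrated with a hypothetical builder (std only, not the crate's `Toplevel`). `start(self) -> Self` supports one long chain ending in a consuming `wait_for_shutdown`, and in loops requires rebinding, which is the `tasks = tasks.start(...)` pattern used in the reply further down this thread. `start_mut(&mut self) -> &mut Self` is loop-friendly, but it forces a named binding before the final consuming call.

```rust
/// Hypothetical builder illustrating consuming vs. borrowing `start`.
#[derive(Default)]
pub struct Builder {
    names: Vec<String>,
}

impl Builder {
    /// Consuming style: allows one long chain ending in a consuming call.
    pub fn start(mut self, name: &str) -> Self {
        self.names.push(name.to_string());
        self
    }

    /// Borrowing style: convenient in loops, but a chain off a temporary
    /// yields `&mut Builder`, which cannot be passed to a consuming method.
    pub fn start_mut(&mut self, name: &str) -> &mut Self {
        self.names.push(name.to_string());
        self
    }

    /// Stand-in for a consuming `wait_for_shutdown`.
    pub fn wait_for_shutdown(self) -> Vec<String> {
        self.names
    }
}

/// Consuming style in a loop: rebind the builder on every iteration.
pub fn consuming_loop(n: usize) -> Vec<String> {
    let mut b = Builder::default();
    for i in 0..n {
        b = b.start(&format!("Task {i}"));
    }
    b.wait_for_shutdown()
}

/// Borrowing style in a loop: a named binding is needed for the final call.
pub fn borrowing_loop(n: usize) -> Vec<String> {
    let mut b = Builder::default();
    for i in 0..n {
        b.start_mut(&format!("Task {i}"));
    }
    b.wait_for_shutdown()
}

fn main() {
    // One long chain works with the consuming style...
    let chained = Builder::default().start("A").start("B").wait_for_shutdown();
    // ...whereas `Builder::default().start_mut("A").wait_for_shutdown()` would
    // not compile: it tries to move out of a `&mut Builder`.
    println!("{chained:?} {:?} {:?}", consuming_loop(2), borrowing_loop(2));
}
```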

I don't fully understand the second half of your problem, though. Why exactly would that not work?

Ten0 commented

> I don't fully understand the second half of your problem, though. Why exactly would that not work?

Here is a test that showcases both use cases:
c435783 (#55)

Sorry about the delay.

How about this?

```rust
use tokio::sync::oneshot;
use tokio_graceful_shutdown::*;

#[derive(Debug)]
struct MonitoringRegistry;

struct Tasks {
    monitoring_sender: oneshot::Sender<MonitoringRegistry>,
}

#[async_trait::async_trait]
impl IntoSubsystem<anyhow::Error, anyhow::Error> for Tasks {
    async fn run(self, handle: SubsystemHandle<anyhow::Error>) -> anyhow::Result<()> {
        // Obviously these two things are more complex in reality:
        // `thing` needs `&mut monitoring_registry` to initialize, and then the
        // registry needs to be moved to the Monitoring task
        let mut monitoring_registry = MonitoringRegistry;
        fn init_thing(_monitoring_registry: &mut MonitoringRegistry) {}

        let mut tasks = Toplevel::nested(&handle, "Tasks");
        for task in 0..2 {
            let thing_to_work_on = init_thing(&mut monitoring_registry);
            // By the way, I would also like to avoid reallocating that string,
            // although it does not matter too much
            tasks = tasks.start(&format!("Task {task}"), move |subsystem| async move {
                // This simulates normal work; in reality it would `select!` on
                // `thing` and on `on_shutdown_requested()`
                let _thing = thing_to_work_on;
                subsystem.on_shutdown_requested().await;

                // Simulates the time it takes to shut down gracefully
                println!("Shutting down thing task {task}");
                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                println!("Shut down task {task}");
                Ok::<(), anyhow::Error>(())
            });
        }

        // Initialization over, propagate the registry to the monitoring task
        self.monitoring_sender.send(monitoring_registry).unwrap();

        tasks = tasks.start("Ctrl+C", |handle| async move {
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            println!("Time to shut down!");
            handle.request_shutdown();
            anyhow::Result::<()>::Ok(())
        });

        let res = tasks
            //.catch_signals() - normally we would put that here
            .handle_shutdown_requests(std::time::Duration::from_millis(300))
            .await;
        println!("All thing tasks have shut down, propagating up");
        handle.request_global_shutdown();

        res.map_err(Into::into)
    }
}

/// **This should show** (the order of the per-task lines may vary):
///
/// Time to shut down!
/// Shutting down thing task 1
/// Shutting down thing task 0
/// Shut down task 1
/// Shut down task 0
/// All thing tasks have shut down, propagating up
/// Stopping monitoring
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (monitoring_sender, monitoring_receiver) = oneshot::channel();

    Toplevel::<anyhow::Error>::new()
        .start("Tasks", Tasks { monitoring_sender }.into_subsystem())
        .start("Monitoring", move |ss| async move {
            let _monitoring = monitoring_receiver.await.unwrap();
            // Monitoring stops as soon as shutdown is requested
            ss.on_shutdown_requested().await;
            println!("Stopping monitoring");
            Ok::<_, anyhow::Error>(())
        })
        .handle_shutdown_requests(std::time::Duration::from_millis(300))
        .await
        .map_err(Into::into)
}
```
Output:

```
Time to shut down!
Shutting down thing task 1
Shutting down thing task 0
Shut down task 0
Shut down task 1
All thing tasks have shut down, propagating up
Stopping monitoring
```
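The ordering in the reply above can be sketched without the crate, using `std::thread` and `std::sync::mpsc` as stand-ins for tokio tasks and `tokio::sync::oneshot`. This is an assumption for illustration only; the real crate does its own shutdown signalling. Workers are initialized through the parent's `&mut` registry, the registry is then handed to monitoring over a channel, the workers are shut down and joined, and only afterwards is monitoring told to stop. As in the output above, the two per-task lines can appear in either order.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Runs the sketch and returns the recorded shutdown events in order.
pub fn run() -> Vec<String> {
    let log = Arc::new(Mutex::new(Vec::<String>::new()));
    // Stand-in for `tokio::sync::oneshot`: hands the registry to monitoring.
    let (registry_tx, registry_rx) = mpsc::channel::<Vec<String>>();
    // Separate stop signal for monitoring, sent only after the workers are done.
    let (monitor_stop_tx, monitor_stop_rx) = mpsc::channel::<()>();

    let monitor_log = Arc::clone(&log);
    let monitor = thread::spawn(move || {
        let registry = registry_rx.recv().unwrap(); // wait for initialization
        monitor_stop_rx.recv().unwrap(); // wait for the stop request
        monitor_log
            .lock()
            .unwrap()
            .push(format!("Stopping monitoring ({} metrics)", registry.len()));
    });

    // Initialization: each worker's setup mutates the parent-owned registry.
    let mut registry = Vec::new();
    let mut workers = Vec::new();
    let mut worker_stops = Vec::new();
    for id in 0..2 {
        registry.push(format!("metric_{id}"));
        let (stop_tx, stop_rx) = mpsc::channel::<()>();
        worker_stops.push(stop_tx);
        let worker_log = Arc::clone(&log);
        workers.push(thread::spawn(move || {
            stop_rx.recv().unwrap();
            worker_log.lock().unwrap().push(format!("Shut down task {id}"));
        }));
    }
    // Initialization over: move the registry to the monitoring thread.
    registry_tx.send(registry).unwrap();

    // Shut down only the workers (the "nested toplevel"), then wait for them.
    for stop in worker_stops {
        stop.send(()).unwrap();
    }
    for worker in workers {
        worker.join().unwrap();
    }
    log.lock()
        .unwrap()
        .push("All thing tasks have shut down, propagating up".to_string());

    // Only now stop monitoring, so it outlives every worker.
    monitor_stop_tx.send(()).unwrap();
    monitor.join().unwrap();

    let events = log.lock().unwrap().clone();
    events
}

fn main() {
    for event in run() {
        println!("{event}");
    }
}
```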

Closed due to inactivity.