/lelet

golang like task executor

Primary LanguageRustGNU General Public License v3.0GPL-3.0

lelet

A golang like task executor

Crates.io version Download docs.rs docs

Lelet executor

Task executor that inspired by golang runtime.

The executor is running in thread pool, and when it detect blocking call inside a task, it will automatically scale the thread pool.

Because of this feature, it is always safe for you to do blocking operation in a task, you don't need to worry about blocking the entire executor thread.

Installation

With cargo add installed run:

$ cargo add lelet

Example

use std::thread;
use std::time::Duration;

use futures_timer::Delay;

fn main() {
    lelet::spawn(async {
        for _ in 0..10 {
            Delay::new(Duration::from_secs(1)).await;
            println!("Non-blocking Hello World");
        }
    });

    lelet::spawn(async {
        for _ in 0..10 {
            thread::sleep(Duration::from_secs(1));
            println!("Blocking Hello World");
        }
    });

    thread::sleep(Duration::from_secs(11));
}

How about IO

lelet-io is still on progress, in the meantime you can use async IO library from tokio

for example HTTP server using hyper

use std::convert::Infallible;
use std::thread;
use std::time::Duration;

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server, StatusCode};

async fn handler(request: Request<Body>) -> Result<Response<Body>, Infallible> {
    match request.uri().path() {
        "/" => Ok(Response::new(Body::from("Hello World!"))),
        "/blocking" => {
            thread::sleep(Duration::from_secs(5));
            Ok(Response::new(Body::from("Blocking Hello World!")))
        }
        _ => {
            let mut resp = Response::new(Body::from("404 Not Found"));
            *resp.status_mut() = StatusCode::NOT_FOUND;
            Ok(resp)
        }
    }
}

fn main() {
    tokio::runtime::Builder::new()
        .enable_io()
        .build()
        .unwrap()
        .block_on(async {
            let addr = "127.0.0.1:3000";
            let listener = tokio::net::TcpListener::bind(addr).await.unwrap();

            let make_svc =
                make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handler)) });

            let server = Server::builder(compat::TcpListener(listener))
                .executor(compat::Executor)
                .serve(make_svc);

            println!("Listening on http://{}", addr);
            server.await.unwrap();
        });
}

pub mod compat {
    use std::future::Future;
    use std::io;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use hyper::server::accept::Accept;
    use tokio::io::{AsyncRead, AsyncWrite};

    #[derive(Clone)]
    pub struct Executor;

    impl<F> hyper::rt::Executor<F> for Executor
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        fn execute(&self, fut: F) {
            lelet::spawn(fut);
        }
    }

    pub struct TcpListener(pub tokio::net::TcpListener);

    impl Accept for TcpListener {
        type Conn = TcpStream;
        type Error = io::Error;

        fn poll_accept(
            mut self: Pin<&mut Self>,
            cx: &mut Context,
        ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
            Pin::new(&mut self.0.incoming())
                .poll_accept(cx)
                .map(|result| Some(result.map(TcpStream)))
        }
    }

    pub struct TcpStream(pub tokio::net::TcpStream);

    impl AsyncRead for TcpStream {
        fn poll_read(
            mut self: Pin<&mut Self>,
            cx: &mut Context,
            buf: &mut [u8],
        ) -> Poll<io::Result<usize>> {
            Pin::new(&mut self.0).poll_read(cx, buf)
        }
    }

    impl AsyncWrite for TcpStream {
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            Pin::new(&mut self.0).poll_write(cx, buf)
        }

        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
            Pin::new(&mut self.0).poll_flush(cx)
        }

        fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
            Pin::new(&mut self.0).poll_shutdown(cx)
        }
    }
}

then test multiple request to /blocking, all of them will complete within 5 second

bash -c '
  date
  for i in {0..20}; do
    curl localhost:3000/blocking -s -o /dev/null &
  done
  wait
  date
'