cetra3/tmq

send &mut

skeet70 opened this issue · 9 comments

I've been trying to understand the code but the macros keep throwing me for a loop. Why does send require mut? Is there any way to prevent that?

It's leading to some pretty awkward code where I create an mpsc channel to share the socket across async stuff. It intuitively feels like I should be able to share the socket as is and send to it without adding the additional in memory channel.

I'd actually say that it's natural for a send method to require &mut, as sending typically requires unique ownership of the resource and it also mutates it in some way. We implement Sink for the sockets that are able to send data, so the send method that you mention is coming from SinkExt in tokio/futures: https://docs.rs/futures/0.3.4/futures/sink/trait.SinkExt.html#method.send.

The same holds, for example, for tokio's TcpStream: writing to it (through AsyncWriteExt) also requires &mut.

If you want to write and read at the same time, you can split the socket, which divides it into two halves - one for reading and one for writing. There are multiple ways of achieving that, we provide a split method on (some) sockets and there is also tokio::io::split.

If you just want to share the socket amongst multiple borrowers and only one of them writes at a time, a canonical solution for that is probably using something akin to Rc<RefCell<Socket>> to move the borrow check from compile time to runtime.
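A minimal sketch of the Rc<RefCell<...>> idea, using only std. FakeSocket, Producer and demo are hypothetical names invented for illustration and are not tmq types; the stand-in socket just records messages so the snippet stays dependency-free.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical stand-in for a socket whose `send` needs `&mut self`.
/// It records messages instead of touching the network.
#[derive(Default)]
pub struct FakeSocket {
    pub sent: Vec<String>,
}

impl FakeSocket {
    pub fn send(&mut self, msg: &str) {
        self.sent.push(msg.to_string());
    }
}

/// One of several borrowers; each holds its own `Rc` handle to the same socket.
pub struct Producer {
    name: &'static str,
    socket: Rc<RefCell<FakeSocket>>,
}

impl Producer {
    pub fn emit(&self, event: &str) {
        // The mutable borrow is checked at runtime and released at the end
        // of this statement.
        self.socket
            .borrow_mut()
            .send(&format!("{}: {}", self.name, event));
    }
}

/// Shares one socket between two producers and returns what was sent.
pub fn demo() -> Vec<String> {
    let socket = Rc::new(RefCell::new(FakeSocket::default()));
    let a = Producer { name: "a", socket: Rc::clone(&socket) };
    let b = Producer { name: "b", socket: Rc::clone(&socket) };
    a.emit("started");
    b.emit("started");
    a.emit("done");
    let sent = socket.borrow().sent.clone();
    sent
}
```

Because Rc is not Send, this only works when every borrower stays on the same thread.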

The &mut comes from the tokio & futures Stream/Sink trait definitions unfortunately. I have a PR open where I am experimenting by using the standard StreamExt split which will allow you some more flexibility.

The socket can only be used by one task at one time. If I have multiple tasks that all need to send/receive to the same socket I have either used mpsc or broadcast as you've done, but maybe we can make this a little easier to wire up inside tokio. Do you have some code that you can share as to what you're trying to achieve?

I don't have code I can easily share on hand, but if I get some time I can try to come up with a minimal example. Core of it though is a webservice that needs to send events to a ZeroMQ PUSH socket to be logged. There can be many parallel requests to the service generating events.

As far as I understood there wasn't a receive/read side of a PUSH socket, and since the version with it isn't published yet I was sorta floundering around the docs (note to self, I should generate them locally). It seemed like PUSH should just be write and intuitively it seemed like a black hole. If you successfully sent the message to the socket, that was it, which made it weird that it required mut to send. I guess I was expecting behaviour closer to what the Sender side of an mpsc channel provides (a clonable reference that returns from send as soon as it's placed into the buffer).

Right now I am going forward with something like (pseudocode):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (log_sender, mut log_receiver) = mpsc::channel::<String>(100);
    // A single task owns the PUSH socket and drains the channel into it.
    tokio::spawn(async move {
        let mut event_logger = logging::events::EventLogger::new(5555).unwrap(); // creates/contains a PUSH socket
        while let Some(event) = log_receiver.recv().await {
            event_logger.log_event(&event).await;
        }
    });

    // Each service gets its own clone of the sender.
    tokio::spawn(core_service(routes(log_sender.clone())).start(7777));
    tokio::spawn(caching_service(log_sender.clone()).start());
}

and threading the sender through a bunch of the logic in each service. Just felt like the mpsc part shouldn't be necessary if ZeroMQ was already going to be buffering messages in memory and handling its own threadpool for sending them. As you can tell this isn't my area of expertise, so take what I'm saying in the context of a naive user. I'm learning 0MQ and its concepts at the same time I'm learning how to use this library.

Beware that ZeroMQ is by design NOT thread safe, so it's not safe to send ZMQ sockets between threads (you can avoid this by using a single threaded tokio runtime). If you just want to have multiple shared references to the socket without going across threads and you don't want to use mpsc, simply wrap it in Rc<RefCell<Socket>>. Then you can share it and also take &mut from it, although this leads to a runtime error (a panic) if the socket is borrowed mutably more than once at the same time. It's important not to hold a &mut reference across awaits that could switch to another callsite which also borrows the same socket.
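To make the runtime-error case concrete, here is a small std-only sketch. FakeSocket and demo are hypothetical names, and the long-lived guard stands in for a borrow that is kept alive across an await; no real async code or tmq API is involved.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical stand-in for a socket whose `send` needs `&mut self`.
#[derive(Default)]
pub struct FakeSocket {
    pub sent: Vec<String>,
}

impl FakeSocket {
    pub fn send(&mut self, msg: &str) {
        self.sent.push(msg.to_string());
    }
}

/// Returns (conflict detected while the first borrow was alive,
/// borrow succeeded once the first borrow was released).
pub fn demo() -> (bool, bool) {
    let socket = Rc::new(RefCell::new(FakeSocket::default()));
    let other = Rc::clone(&socket);

    // First borrower keeps its mutable borrow alive, as it would if the
    // guard were held across an `.await`.
    let mut guard = socket.borrow_mut();
    guard.send("first half of a message");

    // A second call site now tries to borrow the same socket.
    // `borrow_mut()` would panic here; `try_borrow_mut()` reports the conflict.
    let conflict = other.try_borrow_mut().is_err();

    // Once the first borrow ends, the second call site can proceed.
    drop(guard);
    let ok_after_release = other.try_borrow_mut().is_ok();

    (conflict, ok_after_release)
}
```

Keeping each borrow_mut() confined to a single statement, with no await in between, avoids the conflict.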

I think the challenge here is that ZeroMQ & tokio are doing similar things and can step on each other's toes like you have found. Sometimes it's not clear when to use one or the other. ZeroMQ was definitely not designed in a time when Rust was a thing, and probably wayyy before async/futures was considered.

The sockets from ZeroMQ are unfortunately not threadsafe, i.e., you can't have 2 or more threads sending data at the same time. In Rust parlance they are Send but not Sync. Imagine if you were sending two messages at once & the bytes were interleaved!
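A rough std-only illustration of what "Send but not Sync" means in practice. FakeSocket is a hypothetical type that wraps a raw pointer (which it never dereferences) and opts in to Send only; it is not how tmq or zmq define their sockets.

```rust
use std::sync::Mutex;
use std::thread;

/// Hypothetical stand-in for a socket handle that wraps a raw pointer.
pub struct FakeSocket {
    pub handle: *mut u8,
    pub sent: Vec<String>,
}

// Raw pointers are neither `Send` nor `Sync` by default. Opting in to `Send`
// only says the value may be moved to another thread, not shared by reference.
// This is sound here because the pointer is never dereferenced.
unsafe impl Send for FakeSocket {}

impl FakeSocket {
    pub fn new() -> Self {
        FakeSocket { handle: std::ptr::null_mut(), sent: Vec::new() }
    }

    pub fn send(&mut self, msg: &str) {
        self.sent.push(msg.to_string());
    }
}

fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

/// Moves the socket to another thread and returns how many messages it sent.
pub fn demo() -> usize {
    assert_send::<FakeSocket>();
    // assert_sync::<FakeSocket>(); // would not compile: `FakeSocket` is not `Sync`

    // `Mutex<T>` is `Sync` whenever `T: Send`, so a lock makes sharing legal.
    assert_sync::<Mutex<FakeSocket>>();

    // Moving the whole socket into one other thread is fine because it is `Send`.
    let mut socket = FakeSocket::new();
    let worker = thread::spawn(move || {
        socket.send("hello");
        socket.sent.len()
    });
    worker.join().unwrap()
}
```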

There are a few options here that I can think of alongside @Kobzol's solution, but no silver bullet:

Firstly, keep it simple & run everything single threaded. The current client_worker example does this (but is changed in the PR I have active). The spawn_local allows for tasks that are not even Send. You will still have concurrency, as async is designed for this, but I feel like tokio is really geared towards having a thread pool, and a single-threaded configuration is a bit of a second-class citizen.

Secondly, you could have multiple sockets connecting to the same endpoint. I'm not sure what the overhead would be, especially if you have lots of tasks running, but maybe thread_local could be of use here.
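A sketch of the thread_local idea under the assumption that one socket per thread is acceptable. FakeSocket, SOCKET, log_event and demo are hypothetical; a real version would connect each thread's socket to the same endpoint instead of recording strings.

```rust
use std::cell::RefCell;
use std::thread;

/// Hypothetical stand-in for a socket whose `send` needs `&mut self`.
#[derive(Default)]
pub struct FakeSocket {
    pub sent: Vec<String>,
}

impl FakeSocket {
    pub fn send(&mut self, msg: &str) {
        self.sent.push(msg.to_string());
    }
}

thread_local! {
    // Each thread lazily creates its own socket on first use.
    static SOCKET: RefCell<FakeSocket> = RefCell::new(FakeSocket::default());
}

/// Sends on the calling thread's socket and returns how many messages
/// that particular socket has sent so far.
pub fn log_event(msg: &str) -> usize {
    SOCKET.with(|s| {
        let mut s = s.borrow_mut();
        s.send(msg);
        s.sent.len()
    })
}

/// Runs three threads that each send two events on their own socket.
pub fn demo() -> Vec<usize> {
    let workers: Vec<_> = (0..3)
        .map(|i| {
            thread::spawn(move || {
                log_event(&format!("worker {}: first", i));
                log_event(&format!("worker {}: second", i))
            })
        })
        .collect();
    workers.into_iter().map(|w| w.join().unwrap()).collect()
}
```

Each thread ends up with a count of 2 because no socket is ever shared between threads.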

Thirdly, you can use sync primitives inside tokio to assist with this. You have already stumbled upon mpsc but that has an internal buffer.

If you don't want the memory requirements of an mpsc, you could instead use a tokio Mutex. Not to be confused with a std mutex: the lock function is async. If the lock is not free it will just await until it is, yielding back to other tasks. Internally it uses a semaphore to guard access, and when the lock is released, other tasks that are awaiting the lock are notified.
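A rough sketch of the lock-guarded variant. To stay dependency-free it uses std::sync::Mutex and OS threads as a stand-in; with tokio::sync::Mutex the call would be socket.lock().await and a waiting task would yield instead of blocking its thread. FakeSocket and demo are hypothetical names.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a socket whose `send` needs `&mut self`.
#[derive(Default)]
pub struct FakeSocket {
    pub sent: Vec<String>,
}

impl FakeSocket {
    pub fn send(&mut self, msg: &str) {
        self.sent.push(msg.to_string());
    }
}

/// Four workers share one socket through a lock; returns the total sent.
pub fn demo() -> usize {
    let socket = Arc::new(Mutex::new(FakeSocket::default()));

    let workers: Vec<_> = (0..4)
        .map(|i| {
            let socket = Arc::clone(&socket);
            thread::spawn(move || {
                for n in 0..10 {
                    // Only the current lock holder can send; the others wait
                    // their turn. Nothing is queued besides the waiters.
                    socket
                        .lock()
                        .unwrap()
                        .send(&format!("worker {} event {}", i, n));
                }
            })
        })
        .collect();

    for w in workers {
        w.join().unwrap();
    }

    let total = socket.lock().unwrap().sent.len();
    total
}
```

Unlike the mpsc approach, a slow socket here applies back-pressure directly to the senders, since each one waits for the lock instead of dropping its message into a buffer.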

My rule of thumb is to use tmq at the edge, and use tokio for any internal task related communications. This means that your solution of using an mpsc is going to work, but obviously has some tradeoffs for memory, whereas the mutex would not buffer up messages itself. We could provide some wrapper types but they would still be using tokio primitives, and the challenge would be to ensure that they are performant and do what the API consumer would expect.

Makes sense, though it's too bad. Thanks for all the context and clarification. It may make sense for this library to present some of these patterns itself, since it's specifically tmq. You're right that the challenge would be making them performant and ergonomic, but that's going to be a challenge that anyone solving a problem with those primitives and the library will need to figure out too. May help solve a lot of people's average cases by having some compositions of the tokio primitives with sockets in the lib, with the ability to then document the tradeoffs and use cases.

Either way you've both been very helpful, thank you.

I'll keep this issue open as a possible enhancement to tmq

May I conclude that tmq (in its present version) is only suitable/safe for single threaded Tokio applications?

Rationale:

  1. the tmq examples only use tokio::task::spawn_local(...)
  2. In my application when I try to tokio::task::spawn(async move { run().await }) with
async fn run() -> Result<()> {
    let context = zmq::Context::new();
    let mut s_rep = tmq::reply(&context).bind("ipc:///tmp/s0")?;
    // Alternate between receiving a request and sending the reply back.
    loop {
        let (multipart, s_req) = s_rep.recv().await?;
        s_rep = s_req.send(multipart).await?;
    }
}

I obtain the following tmq induced compilation error:

error[E0277]: `*mut std::ffi::c_void` cannot be shared between threads safely
   --> src/bin/app/control/server.rs:60:20
    |
60  |         let h_am = tokio::spawn(async move {
    |                    ^^^^^^^^^^^ `*mut std::ffi::c_void` cannot be shared between threads safely
    | 
   ::: /home/ewf/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.20/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
    = help: within `tmq::socket_types::request_reply::RequestReceiver`, the trait `std::marker::Sync` is not implemented for `*mut std::ffi::c_void`
    = note: required because it appears within the type `zmq::Socket`
    = note: required because it appears within the type `tmq::socket::SocketWrapper`
    = note: required because it appears within the type `std::option::Option<tmq::socket::SocketWrapper>`
    = note: required because it appears within the type `tokio::io::poll_evented::PollEvented<tmq::socket::SocketWrapper>`
    = note: required because it appears within the type `tmq::poll::ZmqPoller`
    = note: required because it appears within the type `tmq::socket_types::request_reply::RequestReceiver`
    = note: required because of the requirements on the impl of `std::marker::Send` for `&tmq::socket_types::request_reply::RequestReceiver`
    = note: required because it appears within the type `[closure@tmq::socket_types::request_reply::RequestReceiver::recv::{{closure}}#0::{{closure}}#0 0:&tmq::socket_types::request_reply::RequestReceiver]`
    = note: required because it appears within the type `for<'r, 's> {std::future::ResumeTy, tmq::socket_types::request_reply::RequestReceiver, [closure@tmq::socket_types::request_reply::RequestReceiver::recv::{{closure}}#0::{{closure}}#0 0:&'r tmq::socket_types::request_reply::RequestReceiver], futures_util::future::poll_fn::PollFn<[closure@tmq::socket_types::request_reply::RequestReceiver::recv::{{closure}}#0::{{closure}}#0 0:&'s tmq::socket_types::request_reply::RequestReceiver]>, ()}`
    = note: required because it appears within the type `[static generator@tmq::socket_types::request_reply::RequestReceiver::recv::{{closure}}#0 0:tmq::socket_types::request_reply::RequestReceiver for<'r, 's> {std::future::ResumeTy, tmq::socket_types::request_reply::RequestReceiver, [closure@tmq::socket_types::request_reply::RequestReceiver::recv::{{closure}}#0::{{closure}}#0 0:&'r tmq::socket_types::request_reply::RequestReceiver], futures_util::future::poll_fn::PollFn<[closure@tmq::socket_types::request_reply::RequestReceiver::recv::{{closure}}#0::{{closure}}#0 0:&'s tmq::socket_types::request_reply::RequestReceiver]>, ()}]`
    = note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@tmq::socket_types::request_reply::RequestReceiver::recv::{{closure}}#0 0:tmq::socket_types::request_reply::RequestReceiver for<'r, 's> {std::future::ResumeTy, tmq::socket_types::request_reply::RequestReceiver, [closure@tmq::socket_types::request_reply::RequestReceiver::recv::{{closure}}#0::{{closure}}#0 0:&'r tmq::socket_types::request_reply::RequestReceiver], futures_util::future::poll_fn::PollFn<[closure@tmq::socket_types::request_reply::RequestReceiver::recv::{{closure}}#0::{{closure}}#0 0:&'s tmq::socket_types::request_reply::RequestReceiver]>, ()}]>`
    = note: required because it appears within the type `impl std::future::Future`
    = note: required because it appears within the type `impl std::future::Future`
    = note: required because it appears within the type `{std::future::ResumeTy, std::sync::Arc<config::Config>, zmq::Context, tmq::socket_types::request_reply::RequestReceiver, i32, impl std::future::Future, (), tmq::message::Multipart, tmq::socket_types::request_reply::RequestSender, impl std::future::Future}`
    = note: required because it appears within the type `[static generator@src/bin/app/control/process/appmngr.rs:28:26: 62:2 config:std::sync::Arc<config::Config> {std::future::ResumeTy, std::sync::Arc<config::Config>, zmq::Context, tmq::socket_types::request_reply::RequestReceiver, i32, impl std::future::Future, (), tmq::message::Multipart, tmq::socket_types::request_reply::RequestSender, impl std::future::Future}]`
    = note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@src/bin/app/control/process/appmngr.rs:28:26: 62:2 config:std::sync::Arc<config::Config> {std::future::ResumeTy, std::sync::Arc<config::Config>, zmq::Context, tmq::socket_types::request_reply::RequestReceiver, i32, impl std::future::Future, (), tmq::message::Multipart, tmq::socket_types::request_reply::RequestSender, impl std::future::Future}]>`
    = note: required because it appears within the type `impl std::future::Future`
    = note: required because it appears within the type `impl std::future::Future`
    = note: required because it appears within the type `{std::future::ResumeTy, std::sync::Arc<config::Config>, impl std::future::Future, ()}`
    = note: required because it appears within the type `[static generator@src/bin/app/control/server.rs:60:46: 64:10 cfg0:std::sync::Arc<config::Config> {std::future::ResumeTy, std::sync::Arc<config::Config>, impl std::future::Future, ()}]`
    = note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@src/bin/app/control/server.rs:60:46: 64:10 cfg0:std::sync::Arc<config::Config> {std::future::ResumeTy, std::sync::Arc<config::Config>, impl std::future::Future, ()}]>`
    = note: required because it appears within the type `impl std::future::Future`

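So the sockets are Send but not Sync. As the error shows, the future returned by recv holds a &RequestReceiver, and a reference is only Send if its target is Sync, which is why tokio::spawn rejects it. I have pushed the PR to use the standard split for other socket types but the REQ/REP pairing may take a bit more work to use ergonomically. I have used other socket types such as DEALER/ROUTER with the standard tokio runtime.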

See jobq for an example