Enhancing OutputPort Backpressure Handling via RecvError::Lagged Management and Buffer Configurability
dcadenas opened this issue · 6 comments
I've noticed some silent drops on the receiver side under heavy load, mainly because there's no handling for RecvError::Lagged
in this section. Adjusting the broadcast buffer size could help temporarily, but directly addressing backpressure could offer a more durable solution.
Proposals:
Dealing with RecvError::Lagged
:
Implementing explicit handling for RecvError::Lagged
when backpressure interrupts the recv loop could greatly improve our insight into and control over backpressure issues. Logging or aborting cleanly in these cases would make such events more transparent.
Using Sender::len
for Proactive Management:
Checking the channel's capacity with Sender::len
before sending allows for preemptive action to avoid overloading. This approach could enable smarter decision-making on sending operations, based on the current state of the channel.
Configurable Broadcast Buffer Size:
Allowing the buffer size to be adjusted externally could provide the flexibility needed to optimize for varying workloads, helping to manage backpressure more effectively.
Integrating these suggestions could enhance how backpressure is managed, leading to a system that’s more robust and easier to fine-tune.
I just wanted to state I'm seeing these posts, and am thrilled with the progress. However a sick 5-month-old has prevented me from doing proper reviews here. Rest assured I will get to it, sorry for the delay
No worries at all! Family always comes first, and I hope your little one gets well soon. I'm just grateful for the work you've put into the library and am looking forward to your insights whenever you have the time. Take care and thank you for the update!
OK now I've actually had 20 minutes to read this!
I'm actually surprised this is an issue. In the OutputPort
, the spawned task is just doing a recv() -> map() -> send()
to the target actor, which has an unbounded buffer. Do you have a repro you can share of where this is missing messages under heavy load? (I understand if it's proprietary or something of course).
However if this is indeed a problem, it means the tokio scheduler is really busy that it can't do a simple dequeue of a port, then two blocking operations.
My only concern with pushing logic into the sender space, is that we effect QoS for all downstream targets for a single slow target, which kind of breaks the point of pub-sub models. Probably the correct approach (imho) is dealing with the LAGGED
signal as you suggest.
My only concern with pushing logic into the sender space, is that we effect QoS for all downstream targets for a single slow target, which kind of breaks the point of pub-sub models. Probably the correct approach (imho) is dealing with the LAGGED signal as you suggest.
I hadn't considered that, which makes a lot of sense. Here's some code to reproduce the issue:
use std::time::Duration;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use ractor::{Actor, ActorProcessingErr, ActorRef, OutputPort};
struct Counter;
struct CounterState {
count: i64,
output_port: OutputPort<i64>,
}
enum CounterMessage {
Increment(i64),
Subscribe(ActorRef<CounterMessage>),
DoSomethingSlow(i64),
}
#[ractor::async_trait]
impl Actor for Counter {
type Msg = CounterMessage;
type State = CounterState;
type Arguments = ();
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_: (),
) -> Result<Self::State, ActorProcessingErr> {
tracing::info!("Starting the counter actor");
// create the initial state
Ok(CounterState {
count: 0,
output_port: OutputPort::default(),
})
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
CounterMessage::Increment(how_much) => {
if state.count < 5 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
if state.count == 5 {
tracing::info!("Producer speed up, to ensure that we fill the 10 items broadcast buffer, receiver loop will be broken from lagging messages and silently drop");
}
state.count += how_much;
tracing::info!("Producer current count: {}", state.count);
state.output_port.send(how_much);
}
CounterMessage::Subscribe(subscriber) => {
state
.output_port
.subscribe(subscriber, |i64| Some(CounterMessage::DoSomethingSlow(i64)));
}
CounterMessage::DoSomethingSlow(value) => {
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
state.count += value;
tracing::info!("Consumer current count: {}", state.count);
}
}
Ok(())
}
}
fn init_logging() {
tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.init();
}
#[tokio::main]
async fn main() {
init_logging();
let (actor, handle) = Actor::spawn(Some("test_name".to_string()), Counter, ())
.await
.expect("Failed to start actor!");
let (actor_subscriber, handle_subscriber) =
Actor::spawn(Some("test_subscriber".to_string()), Counter, ())
.await
.expect("Failed to start actor!");
actor
.send_message(CounterMessage::Subscribe(actor_subscriber.clone()))
.expect("Failed to send message");
// +5 +10 -5 a few times, printing the value via RPC
for _i in 0..25 {
actor
.send_message(CounterMessage::Increment(1))
.expect("Failed to send message");
}
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(25)).await;
actor.stop(None);
actor_subscriber.stop(None);
});
handle.await.expect("Actor failed to exit cleanly");
handle_subscriber
.await
.expect("Actor failed to exit cleanly");
}
I'm happy to share the real use case because it's open source. It's a server that consumes direct messages from the Nostr network.
The server processes messages that can arrive very quickly from a websocket port connected to the Nostr network relays. Initially, I encountered no issues because the test messages from my local test server were fewer than 10. However, as soon as I exceeded that limit, the initial query returned too much data too quickly, so my message handler here wasn't being called due to a broken loop. I was puzzled because my subscriber wasn't being called. Eventually, I "solved" it by adding a limit to the initial query here. This workaround mostly prevents such initial large bursts, which would be rare for the moment, but it's not a real long term solution.
I'm pretty new to Rust and the actor model, so any feedback or suggestion on how to use the crate or anything else will always be welcome.
I'm trying to understand what's going on here, we're creating a broadcast channel with a "buffer" of 10 messages. We were exiting the subscription if we got a Lagged(num_dropped)
error which isn't right I think. In the event we're lagged, we should log it and probably try and continue on.
However I'm honestly surprised this is happening, because we spawn a dedicated task to take the output port's received messages and forward them to an actor's (unbounded) channel. The only thing I could think of is if the converter is really slow converting messages from type A -> B.
Like as long as the subscription is running, we shouldn't ever fill the buffer of 10 unless we're under super heavy cpu pressure I would hope.