Not dropping a capability?
LorenzSelv opened this issue · 4 comments
I have the following timely program which has a single unary_notify
operator with no input. It asks to be notified at time 1
and whenever a notification is delivered it requests a notification for the following timestamp t+1
.
After the first notification (t=1
) is delivered, the program stalls and never delivers the notification for t=2
, which suggests someone is holding a capability for that timestamp. This "someone" should not be empty
as it dropped its capability.
I looked at progress updates messages and the frontier is still stuck at timestamp t=2
.
Who is holding a capability?
extern crate timely;
use timely::dataflow::operators::*;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::operator::empty;
use std::cell::RefCell;
use std::rc::Rc;
fn main() {
timely::execute_from_args(std::env::args(), |worker| {
let index = worker.index();
worker.dataflow(|scope| {
empty::<_, ()>(scope)
.unary_notify(Pipeline, "hello", vec![1], move |input, output, notificator| {
input.for_each(|time, data| {
panic!("no input, only notifications");
});
let ref1 = Rc::new(RefCell::new(None));
let ref2 = Rc::clone(&ref1);
notificator.for_each(|time, count, _self| {
println!("[W{}] notificator.for_each time={:?}, count={:?}", index, time.time(), count);
*ref1.borrow_mut() = Some(time.clone());
let out = *time.time();
let mut session = output.session(&time);
session.give(out);
});
let mut borrow = ref2.borrow_mut();
if let Some(time) = borrow.take() {
let next_time = *time.time() + 1;
println!("requested notification for time {}", next_time);
if next_time <= 10 {
notificator.notify_at(time.delayed(&next_time));
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
})
.inspect_batch(move |t, x| println!("[W{}@inspect] seen {:?} at time {:?}", index, x, t));
});
}).unwrap();
}
I had a quick peek. I reduced the example a bit, and added monitoring of the output frontier of "hello":
use timely::dataflow::operators::*;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::operator::{empty, Operator};
use std::cell::RefCell;
use std::rc::Rc;
fn main() {
timely::execute_from_args(std::env::args(), |worker| {
let index = worker.index();
worker.dataflow(|scope| {
empty::<_, ()>(scope)
.unary_notify(Pipeline, "hello", vec![1], move |input, output, notificator| {
input.for_each(|time, data| {
panic!("no input, only notifications");
});
let mut cap = None;
notificator.for_each(|time, count, _self| {
println!("[W{}] notificator.for_each time={:?}, count={:?}", index, time.time(), count);
cap = Some(time.clone());
let out = *time.time();
let mut session = output.session(&time);
session.give(out);
});
if let Some(time) = cap.take() {
let next_time = *time.time() + 1;
if next_time <= 10 {
println!("requested notification for time {}", next_time);
notificator.notify_at(time.delayed(&next_time));
}
}
eprintln!("notificator.frontier: {:?}", notificator.frontier(0).iter().collect::<Vec<_>>());
eprintln!("cap: {:?}", cap);
})
.unary_frontier(Pipeline, "hello2", move |_info, _cap| {
|input, output| {
input.for_each(|time, data| {
output.session(&time).give(());
});
eprintln!("frontier 2: {:?}", input.frontier().frontier().iter().collect::<Vec<_>>());
}
})
.inspect_batch(move |t, x| println!("[W{}@inspect] seen {:?} at time {:?}", index, x, t));
});
}).unwrap();
}
This hangs with:
notificator.frontier: [0]
cap: None
frontier 2: [0]
[W0] notificator.for_each time=1, count=1
requested notification for time 2
notificator.frontier: []
cap: None
frontier 2: [1]
[W0@inspect] seen [()] at time 1
frontier 2: [2]
It's still stepping, I checked, but neither operator is being scheduled. This may be an interaction with activator
.
Removing the notify_at
call or moving it into notificator.for_each
prevents the issue.
@utaal's diagnosis sounds right. My guess is that the notificator is not doing activation correctly (at all) when you request a notification for a time that has already passed. Putting the call in for_each
allows the notificator to serve the notification before dropping out of the closure.
Short version: timely dataflow schedules operators in response to three types of events:
- new messages
- new progress (input frontiers change)
- explicit activations
In this example, the empty
stream produces no data, and only one round of progress change. The operator likely gets scheduled once, and then in the call to notify_at
stashes a capability. However, the notification could be served immediately, and worse there is no reason to expect that an external force will cause the operator to be re-scheduled.
I think this is a bug in notificator, which lets you write code like this without handling activation for this perhap-corner-case. But a bug in either case. Lemme ponder for a bit to be sure (middle of the night here) but it seems like a notificator should have an activator for this sort of problem.
So I pondered a bit, and I'm not sure whether to classify this as a notificator bug or "user error" ("bad documentation", let's say). The event-driven timely has a strong requirement that operators should respond to all available signals each time they are scheduled, and re-activate themselves if they choose not to. For example, if an operator does not drain all of its inputs, or does not fully drain its notificator, that is "the operator's fault".
We could look in to writing operators that are more defensive, and which pro-actively re-schedule operators that appear to have lingering work. We could perhaps warn somewhere, or more aggressively explain what has gone wrong. I'm not certain about the best way to do either of these that doesn't impact operators that people might want to write. In personal experience, operators I have written wouldn't be harmed by these behaviors, but I don't know if that means everyone should have to.
If anyone has some philosophy on the right way to handle this I'm all ears. My guess is that core timely shouldn't do this sort of thing, but a reasonable layer atop it (which is what the notificator is starting to get at) might choose to.
Thanks for the comments.
I understand the cause now and what you are saying makes sense to me. This was just a simple program to stress the capability initialization for the new workers joining the cluster (for the rescaling project that I'm working on at the moment). I tried to came up with some operators doing some "unusual" things with capabilities, the problem occurred and I thought to bring it up :)
Also, one could achieve (almost) the same behavior by using the _self
parameter passed to the for_each
closure to schedule future notifications. So I agree it's not worth spending time trying to "fix" it.
Closing the issue.