Reception lag of 2 samples with some QoS settings
This issue may be a duplicate of #255.
Problem description
With some QoS settings, when samples are sent before a DataReader is ready, I see the following behavior:
Creating DomainParticipant
Creating subscriber
Creating data_writer
Starting Subscriber thread
Starting Publishing
Publishing: 0
DDS receiver thread started
Publishing: 1
Publishing: 2
Publishing: 3
DataReader event: SubscriptionMatched { total: CountWithChange { count: 1, count_change: 1 }, current: CountWithChange { count: 1, count_change: 1 } }
Publishing: 4
Received: 2
Publishing: 5
Received: 3
Publishing: 6
Received: 4
We notice that after the SubscriptionMatched event appears, samples are received with a lag of 2, and this lag persists for the whole lifetime of the DataWriter.
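One way to quantify this lag is to compare each "Received" number with the most recent "Publishing" number in the log. The helper below is a hypothetical sketch (`receive_lags` is not part of RustDDS) and assumes the exact one-event-per-line "Publishing: N" / "Received: N" format printed by the reproducible example:

```rust
/// Hypothetical helper: for every "Received: N" line, report how many samples
/// behind the most recently published sample it is.
fn receive_lags(log: &str) -> Vec<(u32, u32)> {
    let mut last_published: Option<u32> = None;
    let mut lags = Vec::new();
    for line in log.lines() {
        let line = line.trim();
        if let Some(n) = line.strip_prefix("Publishing: ") {
            last_published = n.parse().ok();
        } else if let Some(n) = line.strip_prefix("Received: ") {
            if let (Ok(received), Some(published)) = (n.parse::<u32>(), last_published) {
                // (sample number, lag); a lag of 0 means the newest sample arrived.
                lags.push((received, published.saturating_sub(received)));
            }
        }
    }
    lags
}

fn main() {
    let log = "Publishing: 4\nReceived: 2\nPublishing: 5\nReceived: 3\nPublishing: 6\nReceived: 4";
    for (sample, lag) in receive_lags(log) {
        println!("sample {sample}: lag {lag}");
    }
}
```

Lines that match neither prefix (thread messages, DataReader events) are ignored, so the helper can be fed a whole log.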
Here are the QoS settings I tested and the behavior I observed ('-' means the default setting):
Reliability | History | Observed behavior |
---|---|---|
- | - | The first n samples are lost, then reception works properly without lag. |
- | KeepLast 10 | The first n samples are received all at once when the reader is ready, then reception works properly without lag. |
Reliable (infinite max_blocking_time) | - | The first n samples are lost, then reception lags by 2 samples. |
Reliable (infinite max_blocking_time) | KeepLast 10 | The first n samples are received all at once when the reader is ready, then reception works properly without lag. |
It seems that when using the Reliability QoS without History, we experience a 2-sample lag in the reader.
I also tested with and without the Durability QoS set to TransientLocal. The behavior was the same for every Reliability/History setup.
I can't understand this lag behavior. Is it normal?
Reproducible example
Comment out lines in the qos() function to change the QoS settings.
Code
use futures::{executor, select, FutureExt, StreamExt};
use rustdds::{
    no_key::{DataReader, DataWriter},
    policy::Reliability,
    policy::{Durability, History},
    DomainParticipant, Duration, QosPolicies, QosPolicyBuilder, TopicKind,
};
use serde::{Deserialize, Serialize};
use std::thread;

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
struct ExampleMessage {
    count: u32,
}

static DOMAIN_ID: u16 = 0;

/// QoS settings shared by the topic, publisher, subscriber, writer and reader.
/// Comment out lines here to test other combinations.
fn qos() -> QosPolicies {
    QosPolicyBuilder::new()
        //.durability(Durability::TransientLocal)
        .history(History::KeepLast { depth: 10 })
        .reliability(Reliability::Reliable {
            max_blocking_time: Duration::DURATION_INFINITE,
        })
        .build()
}

/// Create a publisher and return its writer.
fn create_data_writer<TopicType: Serialize>(
    ctx: &DomainParticipant,
    topic_name: &str,
) -> DataWriter<TopicType> {
    let topic_type = std::any::type_name::<TopicType>();
    let topic = ctx
        .create_topic(
            topic_name.to_string(),
            topic_type.to_string(),
            &qos(),
            TopicKind::NoKey,
        )
        .unwrap();
    let publisher = ctx.create_publisher(&qos()).unwrap();
    publisher
        .create_datawriter_no_key_cdr::<TopicType>(&topic, Some(qos()))
        .unwrap()
}

/// Publish a sample using the given data_writer.
fn publish<TopicType: Serialize + std::fmt::Debug>(
    data_writer: &DataWriter<TopicType>,
    sample: TopicType,
) {
    if let Err(e) = data_writer.write(sample, None) {
        println!("Failed to publish: {:?}", e);
    }
}

/// Create a subscriber and return its reader.
fn create_data_reader<TopicType: for<'de> Deserialize<'de> + 'static>(
    ctx: &DomainParticipant,
    topic_name: &str,
) -> DataReader<TopicType> {
    let topic_type = std::any::type_name::<TopicType>();
    let topic = ctx
        .create_topic(
            topic_name.to_string(),
            topic_type.to_string(),
            &qos(),
            TopicKind::NoKey,
        )
        .unwrap();
    let subscriber = ctx.create_subscriber(&qos()).unwrap();
    subscriber
        .create_datareader_no_key_cdr::<TopicType>(&topic, Some(qos()))
        .unwrap()
}

/// Create a domain participant, a publisher, and a subscriber.
/// Listen on the data_reader with an async sample stream in another thread and
/// publish every 0.5 s on the main thread.
fn main() {
    println!("Creating DomainParticipant");
    let ctx = DomainParticipant::new(DOMAIN_ID).unwrap();
    println!("Creating subscriber");
    let data_reader = create_data_reader::<ExampleMessage>(&ctx, "hello/world");
    println!("Creating data_writer");
    let data_writer = create_data_writer::<ExampleMessage>(&ctx, "hello/world");

    println!("Starting Subscriber thread");
    // Run a single receiver thread that awaits samples and events with select!
    let _ = thread::Builder::new()
        .name("ReceiverThread".to_string())
        .spawn(move || {
            println!("DDS receiver thread started");
            executor::block_on(async_infinite_loop(data_reader));
            println!("DDS receiver thread stopped");
        })
        .unwrap();

    println!("Starting Publishing");
    let mut sample = ExampleMessage { count: 0 };
    loop {
        println!("Publishing: {}", sample.count);
        publish(&data_writer, sample.clone());
        sample.count += 1;
        std::thread::sleep(std::time::Duration::from_millis(500));
    }
}

async fn async_infinite_loop(data_reader: DataReader<ExampleMessage>) {
    // Create the async streams
    let mut dr_stream = data_reader.async_sample_stream();
    let mut dr_event_stream = dr_stream.async_event_stream();
    loop {
        // Create the futures
        let next_some_dr_stream = dr_stream.select_next_some();
        let next_some_dr_event_stream = dr_event_stream.select_next_some();
        select! {
            result = next_some_dr_stream.fuse() => {
                match result {
                    Ok(sample) => println!("Received: {}", sample.count),
                    Err(error) => println!("Error on sample read: {:?}", error),
                }
            }
            result = next_some_dr_event_stream.fuse() => {
                match result {
                    Ok(event) => println!("DataReader event: {event:?}"),
                    Err(error) => println!("Error on reader event: {}", error),
                }
            }
        };
    }
}
Cargo.toml
[package]
name = "rdds-302"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
futures = "0.3.29"
rustdds = "0.8.4"
serde = "1.0.190"
I also tested with the latest RustDDS commit on master; this didn't change the behavior.
Packet captures and logs of each QoS setup
Default QoS settings
Creating DomainParticipant
Creating subscriber
Creating data_writer
Starting Subscriber thread
Starting Publishing
Publishing: 0
DDS receiver thread started
Received: 0
Publishing: 1
Publishing: 2
Publishing: 3
DataReader event: SubscriptionMatched { total: CountWithChange { count: 1, count_change: 1 }, current: CountWithChange { count: 1, count_change: 1 } }
Publishing: 4
Received: 4
Publishing: 5
Received: 5
Publishing: 6
Received: 6
Publishing: 7
Received: 7
History only
Creating DomainParticipant
Creating subscriber
Creating data_writer
Starting Subscriber thread
Starting Publishing
Publishing: 0
DDS receiver thread started
Received: 0
Publishing: 1
Publishing: 2
Publishing: 3
Publishing: 4
DataReader event: SubscriptionMatched { total: CountWithChange { count: 1, count_change: 1 }, current: CountWithChange { count: 1, count_change: 1 } }
Publishing: 5
Received: 5
Publishing: 6
Received: 6
Publishing: 7
Received: 7
Reliable only
Creating DomainParticipant
Creating subscriber
Creating data_writer
Starting Subscriber thread
Starting Publishing
Publishing: 0
DDS receiver thread started
Publishing: 1
Publishing: 2
Publishing: 3
Publishing: 4
DataReader event: SubscriptionMatched { total: CountWithChange { count: 1, count_change: 1 }, current: CountWithChange { count: 1, count_change: 1 } }
Publishing: 5
Publishing: 6
Received: 4
Publishing: 7
Received: 5
Publishing: 8
Received: 6
Reliable with History
Creating DomainParticipant
Creating subscriber
Creating data_writer
Starting Subscriber thread
Starting Publishing
Publishing: 0
DDS receiver thread started
Publishing: 1
Publishing: 2
Publishing: 3
Publishing: 4
DataReader event: SubscriptionMatched { total: CountWithChange { count: 1, count_change: 1 }, current: CountWithChange { count: 1, count_change: 1 } }
Publishing: 5
Received: 4
Received: 5
Publishing: 6
Received: 6
Publishing: 7
Received: 7
With keyed topics, when I publish with a first "early" writer (which doesn't wait for the DataReader to be ready) and then with a second "late" writer (which waits some time before it starts publishing), I see a lag on the samples published by the early writer and no lag on those published by the late one:
Creating DomainParticipant
Creating subscriber
Creating data_writer
Starting Subscriber thread
Starting Publishing
Publishing Early: 0
DDS receiver thread started
Publishing Early: 1
Publishing Early: 2
Publishing Early: 3
Publishing Early: 4
DataReader event: SubscriptionMatched { total: CountWithChange { count: 1, count_change: 1 }, current: CountWithChange { count: 1, count_change: 1 } }
DataReader event: SubscriptionMatched { total: CountWithChange { count: 2, count_change: 1 }, current: CountWithChange { count: 2, count_change: 1 } }
Publishing Early: 5
Publishing Early: 6
Received: Value(ExampleMessage { id: 1, count: 4 })
Publishing Early: 7
Received: Value(ExampleMessage { id: 1, count: 5 })
Publishing Early: 8
Received: Value(ExampleMessage { id: 1, count: 6 })
Publishing Early: 9
Publishing Late: 1000
Received: Value(ExampleMessage { id: 1, count: 7 })
Received: Value(ExampleMessage { id: 2, count: 1000 })
Publishing Early: 10
Publishing Late: 1001
Received: Value(ExampleMessage { id: 1, count: 8 })
Received: Value(ExampleMessage { id: 2, count: 1001 })
Publishing Early: 11
Publishing Late: 1002
Received: Value(ExampleMessage { id: 1, count: 9 })
Received: Value(ExampleMessage { id: 2, count: 1002 })
Publishing Early: 12
Publishing Late: 1003
Received: Value(ExampleMessage { id: 1, count: 10 })
Received: Value(ExampleMessage { id: 2, count: 1003 })
Publishing Early: 13
Publishing Late: 1004
Received: Value(ExampleMessage { id: 1, count: 11 })
Received: Value(ExampleMessage { id: 2, count: 1004 })
Publishing Early: 14
Publishing Late: 1005
Received: Value(ExampleMessage { id: 1, count: 12 })
Received: Value(ExampleMessage { id: 2, count: 1005 })
Code
use futures::{executor, select, FutureExt, StreamExt};
use rustdds::{
    with_key::{DataReader, DataWriter},
    policy::Reliability,
    policy::{Durability, History},
    DomainParticipant, Duration, QosPolicies, QosPolicyBuilder, TopicKind, Keyed,
};
use serde::{Deserialize, Serialize};
use std::thread;

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
struct ExampleMessage {
    id: u8,
    count: u32,
}

impl Keyed for ExampleMessage {
    type K = u8;
    fn key(&self) -> Self::K {
        self.id
    }
}

static DOMAIN_ID: u16 = 0;

/// QoS settings shared by the topic, publishers, subscriber, writers and reader.
/// Comment out lines here to test other combinations.
fn qos() -> QosPolicies {
    QosPolicyBuilder::new()
        //.durability(Durability::TransientLocal)
        //.history(History::KeepLast { depth: 10 })
        .reliability(Reliability::Reliable {
            max_blocking_time: Duration::DURATION_INFINITE,
        })
        .build()
}

/// Create a publisher and return its writer.
fn create_data_writer<TopicType: Serialize + Keyed>(
    ctx: &DomainParticipant,
    topic_name: &str,
) -> DataWriter<TopicType> {
    let topic_type = std::any::type_name::<TopicType>();
    let topic = ctx
        .create_topic(
            topic_name.to_string(),
            topic_type.to_string(),
            &qos(),
            TopicKind::WithKey,
        )
        .unwrap();
    let publisher = ctx.create_publisher(&qos()).unwrap();
    publisher
        .create_datawriter_cdr::<TopicType>(&topic, Some(qos()))
        .unwrap()
}

/// Publish a sample using the given data_writer.
fn publish<TopicType: Serialize + std::fmt::Debug + Keyed>(
    data_writer: &DataWriter<TopicType>,
    sample: TopicType,
) {
    if let Err(e) = data_writer.write(sample, None) {
        println!("Failed to publish: {:?}", e);
    }
}

/// Create a subscriber and return its reader.
fn create_data_reader<TopicType: for<'de> Deserialize<'de> + 'static + Keyed>(
    ctx: &DomainParticipant,
    topic_name: &str,
) -> DataReader<TopicType>
where
    for<'de> <TopicType as Keyed>::K: Deserialize<'de>,
{
    let topic_type = std::any::type_name::<TopicType>();
    let topic = ctx
        .create_topic(
            topic_name.to_string(),
            topic_type.to_string(),
            &qos(),
            TopicKind::WithKey,
        )
        .unwrap();
    let subscriber = ctx.create_subscriber(&qos()).unwrap();
    subscriber
        .create_datareader_cdr::<TopicType>(&topic, Some(qos()))
        .unwrap()
}

/// Create a domain participant, two publishers (early and late), and a subscriber.
/// Listen on the data_reader with an async sample stream in another thread and
/// publish every 0.5 s on the main thread; the late writer starts once the early
/// writer has published 10 samples.
fn main() {
    println!("Creating DomainParticipant");
    let ctx = DomainParticipant::new(DOMAIN_ID).unwrap();
    println!("Creating subscriber");
    let data_reader = create_data_reader::<ExampleMessage>(&ctx, "hello/world");
    println!("Creating data_writer");
    let early_data_writer = create_data_writer::<ExampleMessage>(&ctx, "hello/world");
    let late_data_writer = create_data_writer::<ExampleMessage>(&ctx, "hello/world");

    println!("Starting Subscriber thread");
    // Run a single receiver thread that awaits samples and events with select!
    let _ = thread::Builder::new()
        .name("ReceiverThread".to_string())
        .spawn(move || {
            println!("DDS receiver thread started");
            executor::block_on(async_infinite_loop(data_reader));
            println!("DDS receiver thread stopped");
        })
        .unwrap();

    println!("Starting Publishing");
    let mut sample_early = ExampleMessage { id: 1, count: 0 };
    let mut sample_late = ExampleMessage { id: 2, count: 1000 };
    loop {
        println!("Publishing Early: {}", sample_early.count);
        publish(&early_data_writer, sample_early.clone());
        sample_early.count += 1;
        if sample_early.count >= 10 {
            println!("Publishing Late: {}", sample_late.count);
            publish(&late_data_writer, sample_late.clone());
            sample_late.count += 1;
        }
        std::thread::sleep(std::time::Duration::from_millis(500));
    }
}

async fn async_infinite_loop(data_reader: DataReader<ExampleMessage>) {
    // Create the async streams
    let mut dr_stream = data_reader.async_sample_stream();
    let mut dr_event_stream = dr_stream.async_event_stream();
    loop {
        // Create the futures
        let next_some_dr_stream = dr_stream.select_next_some();
        let next_some_dr_event_stream = dr_event_stream.select_next_some();
        select! {
            result = next_some_dr_stream.fuse() => {
                match result {
                    Ok(sample) => println!("Received: {:?}", sample),
                    Err(error) => println!("Error on sample read: {:?}", error),
                }
            }
            result = next_some_dr_event_stream.fuse() => {
                match result {
                    Ok(event) => println!("DataReader event: {event:?}"),
                    Err(error) => println!("Error on reader event: {}", error),
                }
            }
        };
    }
}
Hi, and thank you for reporting.
The problem was caused by readers not reacting properly to HEARTBEAT submessages that declare data as unavailable. If the writer is reliable and keeps only the latest sample, then after matching the reader, the first sample it tries to send is, for example, sequence number 8. Along with it, the writer sends a HEARTBEAT stating that only sample 8 is available. The reader should then tell the topic cache, and by extension the DataReaders, not to expect samples before 8, but this was not done properly.
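The intended reaction can be sketched with a simplified model of a reliable reader's state for one writer. This is a hypothetical illustration, not RustDDS code: `WriterProxyModel`, `next_expected` and `cache_readable_up_to` are made-up names, and ACKNACKs and resends are left out. On a HEARTBEAT the reader stops expecting samples before `first_sn`, keeps samples it already holds (such as 8 itself), and pushes the result to the topic cache right away:

```rust
use std::collections::BTreeSet;

/// Hypothetical, simplified model of a reliable reader's state for one writer.
/// Not RustDDS code; ACKNACK/resend handling is omitted.
struct WriterProxyModel {
    /// Every sequence number below this is either received or known unavailable.
    next_expected: u64,
    /// Samples received out of order, waiting for the gap before them to close.
    received: BTreeSet<u64>,
    /// Topic cache view: DataReaders may read every sample <= this.
    cache_readable_up_to: u64,
}

impl WriterProxyModel {
    fn new() -> Self {
        Self { next_expected: 1, received: BTreeSet::new(), cache_readable_up_to: 0 }
    }

    /// DATA(sn) arrived.
    fn on_data(&mut self, sn: u64) {
        if sn >= self.next_expected {
            self.received.insert(sn);
        }
        self.advance_and_propagate();
    }

    /// HEARTBEAT(first_sn, last_sn): the writer can only provide first_sn..=last_sn.
    fn on_heartbeat(&mut self, first_sn: u64, _last_sn: u64) {
        // Samples before first_sn will never arrive, so stop expecting them...
        self.next_expected = self.next_expected.max(first_sn);
        // ...drop stale entries, but keep samples already held (e.g. first_sn).
        self.received = self.received.split_off(&self.next_expected);
        self.advance_and_propagate();
    }

    /// Consume contiguous received samples, then update the topic cache.
    /// Called after DATA and after HEARTBEAT alike.
    fn advance_and_propagate(&mut self) {
        while self.received.remove(&self.next_expected) {
            self.next_expected += 1;
        }
        self.cache_readable_up_to = self.next_expected - 1;
    }
}

fn main() {
    let mut proxy = WriterProxyModel::new();
    // Late match with a depth-1 writer: DATA(8), then HEARTBEAT(8, 8).
    proxy.on_data(8);
    println!("after DATA(8): readable up to {}", proxy.cache_readable_up_to);
    proxy.on_heartbeat(8, 8);
    println!("after HEARTBEAT(8, 8): readable up to {}", proxy.cache_readable_up_to);
}
```

In this model sample 8 becomes readable as soon as the HEARTBEAT is processed, with no extra round trip.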
First, when skipping unavailable samples, the reader only updated its own value to state that samples up to 7 were not expected. It thus ignored the fact that it had just received sample 8, so it requested sample 8 again and discarded the resent copy. When sample 9 arrived, the accompanying HEARTBEAT caused the reader to mark 8 as no longer expected and, at the same time, as the last sample received reliably, since it had already been received. This caused the first round of delay.
Second, the reader did not propagate its own value to the topic cache when reacting to HEARTBEATs. Therefore the value indicating that sample 8 can be read by DataReaders was only updated when the DATA submessage containing sample 10 was received, causing the second round of delay.
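The timing of the two bugs together can be reproduced with a round-by-round simulation. It is a hypothetical model built only from the description above: the function and variable names are illustrative, and the re-request and discard of sample 8 is reduced to "the reader forgets that it holds the sample". In round r the writer sends DATA(r) followed by HEARTBEAT(r, r), and the topic cache is only updated on DATA:

```rust
/// Hypothetical round-by-round simulation of the two bugs described above.
/// In round r a depth-1 reliable writer sends DATA(r), then HEARTBEAT(r, r).
/// Returns (sample, round in which it became readable through the topic cache).
fn buggy_delivery_rounds(first_round: u64, last_round: u64) -> Vec<(u64, u64)> {
    // Reader's own view: every sample below `next_expected` is settled.
    let mut next_expected: u64 = 1;
    // Topic cache view: DataReaders may read every sample <= `cache_up_to`.
    let mut cache_up_to: u64 = 0;
    let mut delivered = Vec::new();

    for round in first_round..=last_round {
        // DATA(round): second bug, the reader's value reaches the topic cache
        // only here, never when a HEARTBEAT is processed.
        let new_cache = next_expected - 1;
        for sn in (cache_up_to + 1)..=new_cache {
            // Sequence numbers below first_round were unavailable, not delivered.
            if sn >= first_round {
                delivered.push((sn, round));
            }
        }
        cache_up_to = cache_up_to.max(new_cache);

        // HEARTBEAT(round, round): first bug, the reader stops expecting samples
        // before `round` but forgets that it already holds `round` itself.
        next_expected = next_expected.max(round);
    }
    delivered
}

fn main() {
    for (sample, round) in buggy_delivery_rounds(8, 12) {
        println!("sample {sample} readable in round {round}");
    }
}
```

With the first sample at 8, sample 8 becomes readable in round 10, sample 9 in round 11, and so on: a constant lag of 2, like the one in the "Reliable only" log.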
If the writer history depth is 1, the issue kept the reader from ever being up to date, and the two-round delay persisted. If the depth is 2 or more, the missing sample 7 is requested and handled properly, which brings the reader up to date, so the problem does not appear.
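The effect of the history depth comes down to a little arithmetic. Assuming a single instance and a KeepLast writer that has written samples 1..=last_sn and still holds the newest `depth` of them, the HEARTBEAT a late-matched reader receives splits the sequence numbers into an unavailable prefix and a range it can still request. `heartbeat_ranges` is a hypothetical helper for illustration only:

```rust
use std::ops::RangeInclusive;

/// Hypothetical helper: a KeepLast(depth) writer has written samples 1..=last_sn
/// (single instance, nothing else removed) and still holds the newest `depth`.
/// Returns (unavailable, available) as announced by HEARTBEAT(first_sn, last_sn).
fn heartbeat_ranges(last_sn: u64, depth: u64) -> (RangeInclusive<u64>, RangeInclusive<u64>) {
    assert!(last_sn >= 1 && depth >= 1, "need at least one sample and depth >= 1");
    // Oldest sample still in the writer's history; never below 1.
    let first_sn = last_sn.saturating_sub(depth - 1).max(1);
    (1..=first_sn - 1, first_sn..=last_sn)
}

fn main() {
    for depth in [1, 2, 10] {
        let (unavailable, available) = heartbeat_ranges(8, depth);
        println!("depth {depth}: unavailable {unavailable:?}, available {available:?}");
    }
}
```

With depth 1 only sample 8 is available, so the reader takes the HEARTBEAT skip path described above; with depth 2, sample 7 is still available and is requested like any missing sample. A writer that only starts writing after matching, like the "late" writer in the keyed example, announces first_sn = 1 with nothing unavailable, which would be consistent with only the "early" writer's samples lagging.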
It is now fixed in master by commit 1b90316.