Atostek/RustDDS

Reception lag of 2 samples with some QoS settings

Closed this issue · 3 comments

tot0k commented

This issue may be a duplicate of #255.

Problem description

With some QoS settings, when sending samples before a DataReader is ready, I see the following behavior:

Creating DomainParticipant
Creating subscriber
Creating data_writer
Starting Subscriber thread
Starting Publishing
Publishing: 0
DDS receiver thread started
Publishing: 1
Publishing: 2
Publishing: 3
DataReader event: SubscriptionMatched { total: CountWithChange { count: 1, count_change: 1 }, current: CountWithChange { count: 1, count_change: 1 } }
Publishing: 4
Received: 2
Publishing: 5
Received: 3
Publishing: 6
Received: 4

After the SubscriptionMatched event appears, samples are received with a lag of 2 that persists for the whole lifetime of the DataWriter.

Here are the QoS settings I tested and the behavior I observed:

'-' means default setting.

| Reliability | History | Observed behavior |
|---|---|---|
| - | - | The first n samples are lost, then the acquisition is working properly without lag. |
| - | KeepLast 10 | The first n samples are received all at once when the reader is ready, then the acquisition is working properly without lag. |
| Reliable (Duration infinite) | - | The first n samples are lost, then acquisition lag of 2 samples. |
| Reliable (Duration infinite) | KeepLast 10 | The first n samples are received all at once when the reader is ready, then the acquisition is working properly without lag. |

It seems that using the Reliable reliability QoS without a History setting produces a two-sample lag in the reader.

I also tested with and without the Durability QoS set to TransientLocal. The behavior was the same for every Reliability/History setup.

I can't understand this lag behavior. Is it normal?

Reproducible example

Comment out lines in the qos() function to change QoS settings.

Code

use futures::{executor, select, FutureExt, StreamExt};
use rustdds::{
    no_key::{DataReader, DataWriter},
    policy::Reliability,
    policy::{Durability, History},
    DomainParticipant, Duration, QosPolicies, QosPolicyBuilder, TopicKind,
};
use serde::{Deserialize, Serialize};
use std::thread;

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
struct ExampleMessage {
    count: u32,
}

static DOMAIN_ID: u16 = 0;

/// QoS settings under test (comment out lines to switch setups)
fn qos() -> QosPolicies {
    QosPolicyBuilder::new()
        //.durability(Durability::TransientLocal)
        .history(History::KeepLast { depth: 10 })
        .reliability(Reliability::Reliable {
            max_blocking_time: Duration::DURATION_INFINITE,
        })
        .build()
}

/// Create a publisher and return its writer.
fn create_data_writer<TopicType: Serialize>(
    ctx: &DomainParticipant,
    topic_name: &str,
) -> DataWriter<TopicType> {
    let topic_type = std::any::type_name::<TopicType>();
    let topic = ctx
        .create_topic(
            topic_name.to_string(),
            topic_type.to_string(),
            &qos(),
            TopicKind::NoKey,
        )
        .unwrap();
    let publisher = ctx.create_publisher(&qos()).unwrap();
    publisher
        .create_datawriter_no_key_cdr::<TopicType>(&topic, Some(qos()))
        .unwrap()
}

/// Publish a sample using the given data_writer
fn publish<TopicType: Serialize + std::fmt::Debug>(
    data_writer: &DataWriter<TopicType>,
    sample: TopicType,
) {
    let result = data_writer.write(sample, None);
    result
        .map_err(|e| println!("Failed to publish: {:?}", e))
        .ok();
}

/// Create a subscriber and return its reader
fn create_data_reader<TopicType: for<'de> Deserialize<'de> + 'static>(
    ctx: &DomainParticipant,
    topic_name: &str,
) -> DataReader<TopicType> {
    let topic_type = std::any::type_name::<TopicType>();
    let topic = ctx
        .create_topic(
            topic_name.to_string(),
            topic_type.to_string(),
            &qos(),
            TopicKind::NoKey,
        )
        .unwrap();
    let subscriber = ctx.create_subscriber(&qos()).unwrap();
    subscriber
        .create_datareader_no_key_cdr::<TopicType>(&topic, Some(qos()))
        .unwrap()
}

/// Create a domain participant, a publisher, and a subscriber.
/// Listen on the data_reader with an async sample stream in another thread and
/// publish every 0.5 sec on the main thread.
fn main() {
    println!("Creating DomainParticipant");
    let ctx = DomainParticipant::new(DOMAIN_ID).unwrap();

    println!("Creating subscriber");
    let data_reader = create_data_reader::<ExampleMessage>(&ctx, "hello/world");

    println!("Creating data_writer");
    let data_writer = create_data_writer::<ExampleMessage>(&ctx, "hello/world");

    println!("Starting Subscriber thread");
    // Run single thread with a future select
    let _ = thread::Builder::new()
        .name("ReceiverThread".to_string())
        .spawn(move || {
            println!("DDS receiver thread started");
            executor::block_on(async_infinite_loop(data_reader));
            println!("DDS receiver thread stopped");
        })
        .unwrap();

    println!("Starting Publishing");

    let mut sample = ExampleMessage { count: 0 };

    loop {
        println!("Publishing: {}", sample.count);
        publish(&data_writer, sample.clone());
        sample.count += 1;
        std::thread::sleep(std::time::Duration::from_millis(500));
    }
}

async fn async_infinite_loop(data_reader: DataReader<ExampleMessage>) {
    // Create the async streams
    let mut dr_stream = data_reader.async_sample_stream();
    let mut dr_event_stream = dr_stream.async_event_stream();

    loop {
        // Create the futures
        let next_some_dr_stream = dr_stream.select_next_some();
        let next_some_dr_event_stream = dr_event_stream.select_next_some();

        select! {
            result = next_some_dr_stream.fuse() => {
                match result {
                    Ok(sample) => println!("Received: {:}", sample.count),
                    Err(error) => println!("Error on sample read: {:?}", error)
                }
            }
            result = next_some_dr_event_stream.fuse() => {
                match result {
                    Ok(event) => println!("DataReader event: {event:?}"),
                    Err(error) => println!("Error while on reader event: {}", error)
                }
            }
        };
    }
}

Cargo.toml

[package]
name = "rdds-302"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures = "0.3.29"
rustdds = "0.8.4"
serde = "1.0.190"

I also tested on the latest RustDDS commit on master; this didn't change the behavior.

Packet captures and logs of each QoS setup

Default QoS settings

rdds-302_default_qos.zip

Creating DomainParticipant
Creating subscriber
Creating data_writer
Starting Subscriber thread
Starting Publishing
Publishing: 0
DDS receiver thread started
Received: 0
Publishing: 1
Publishing: 2
Publishing: 3
DataReader event: SubscriptionMatched { total: CountWithChange { count: 1, count_change: 1 }, current: CountWithChange { count: 1, count_change: 1 } }
Publishing: 4
Received: 4
Publishing: 5
Received: 5
Publishing: 6
Received: 6
Publishing: 7
Received: 7

History only

rdds-302_history_only.zip

Creating DomainParticipant
Creating subscriber
Creating data_writer
Starting Subscriber thread
Starting Publishing
Publishing: 0
DDS receiver thread started
Received: 0
Publishing: 1
Publishing: 2
Publishing: 3
Publishing: 4
DataReader event: SubscriptionMatched { total: CountWithChange { count: 1, count_change: 1 }, current: CountWithChange { count: 1, count_change: 1 } }
Publishing: 5
Received: 5
Publishing: 6
Received: 6
Publishing: 7
Received: 7

Reliable only

rdds-302_reliable_only.zip

Creating DomainParticipant
Creating subscriber
Creating data_writer
Starting Subscriber thread
Starting Publishing
Publishing: 0
DDS receiver thread started
Publishing: 1
Publishing: 2
Publishing: 3
Publishing: 4
DataReader event: SubscriptionMatched { total: CountWithChange { count: 1, count_change: 1 }, current: CountWithChange { count: 1, count_change: 1 } }
Publishing: 5
Publishing: 6
Received: 4
Publishing: 7
Received: 5
Publishing: 8
Received: 6

Reliable with History

rdds-302_reliable_history.zip

Creating DomainParticipant
Creating subscriber
Creating data_writer
Starting Subscriber thread
Starting Publishing
Publishing: 0
DDS receiver thread started
Publishing: 1
Publishing: 2
Publishing: 3
Publishing: 4
DataReader event: SubscriptionMatched { total: CountWithChange { count: 1, count_change: 1 }, current: CountWithChange { count: 1, count_change: 1 } }
Publishing: 5
Received: 4
Received: 5
Publishing: 6
Received: 6
Publishing: 7
Received: 7

tot0k commented

With Keyed topics, when I publish with a first "early" writer (that doesn't wait for the DataReader to be ready), then with a second "late" writer (that waits some time before starting publishing), I see a lag on the samples published by the early writer, and no lag for the late one:

Creating DomainParticipant
Creating subscriber
Creating data_writer
Starting Subscriber thread
Starting Publishing
Publishing Early: 0
DDS receiver thread started
Publishing Early: 1
Publishing Early: 2
Publishing Early: 3
Publishing Early: 4
DataReader event: SubscriptionMatched { total: CountWithChange { count: 1, count_change: 1 }, current: CountWithChange { count: 1, count_change: 1 } }
DataReader event: SubscriptionMatched { total: CountWithChange { count: 2, count_change: 1 }, current: CountWithChange { count: 2, count_change: 1 } }
Publishing Early: 5
Publishing Early: 6
Received: Value(ExampleMessage { id: 1, count: 4 })
Publishing Early: 7
Received: Value(ExampleMessage { id: 1, count: 5 })
Publishing Early: 8
Received: Value(ExampleMessage { id: 1, count: 6 })
Publishing Early: 9
Publishing Late: 1000
Received: Value(ExampleMessage { id: 1, count: 7 })
Received: Value(ExampleMessage { id: 2, count: 1000 })
Publishing Early: 10
Publishing Late: 1001
Received: Value(ExampleMessage { id: 1, count: 8 })
Received: Value(ExampleMessage { id: 2, count: 1001 })
Publishing Early: 11
Publishing Late: 1002
Received: Value(ExampleMessage { id: 1, count: 9 })
Received: Value(ExampleMessage { id: 2, count: 1002 })
Publishing Early: 12
Publishing Late: 1003
Received: Value(ExampleMessage { id: 1, count: 10 })
Received: Value(ExampleMessage { id: 2, count: 1003 })
Publishing Early: 13
Publishing Late: 1004
Received: Value(ExampleMessage { id: 1, count: 11 })
Received: Value(ExampleMessage { id: 2, count: 1004 })
Publishing Early: 14
Publishing Late: 1005
Received: Value(ExampleMessage { id: 1, count: 12 })
Received: Value(ExampleMessage { id: 2, count: 1005 })

Code

use futures::{executor, select, FutureExt, StreamExt};
use rustdds::{
    with_key::{DataReader, DataWriter},
    policy::Reliability,
    policy::{Durability, History},
    DomainParticipant, Duration, QosPolicies, QosPolicyBuilder, TopicKind, Keyed,
};
use serde::{Deserialize, Serialize};
use std::thread;

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
struct ExampleMessage {
    id: u8,
    count: u32,
}

impl Keyed for ExampleMessage {
    type K = u8;
    fn key(&self) -> Self::K {
        self.id
    }
}

static DOMAIN_ID: u16 = 0;

/// QoS settings under test (comment out lines to switch setups)
fn qos() -> QosPolicies {
    QosPolicyBuilder::new()
        //.durability(Durability::TransientLocal)
        //.history(History::KeepLast { depth: 10 })
        .reliability(Reliability::Reliable {
            max_blocking_time: Duration::DURATION_INFINITE,
        })
        .build()
}

/// Create a publisher and return its writer.
fn create_data_writer<TopicType: Serialize + Keyed>(
    ctx: &DomainParticipant,
    topic_name: &str,
) -> DataWriter<TopicType> {
    let topic_type = std::any::type_name::<TopicType>();
    let topic = ctx
        .create_topic(
            topic_name.to_string(),
            topic_type.to_string(),
            &qos(),
            TopicKind::WithKey,
        )
        .unwrap();
    let publisher = ctx.create_publisher(&qos()).unwrap();
    publisher
        .create_datawriter_cdr::<TopicType>(&topic, Some(qos()))
        .unwrap()
}

/// Publish a sample using the given data_writer
fn publish<TopicType: Serialize + std::fmt::Debug + Keyed>(
    data_writer: &DataWriter<TopicType>,
    sample: TopicType,
) {
    let result = data_writer.write(sample, None);
    result
        .map_err(|e| println!("Failed to publish: {:?}", e))
        .ok();
}

/// Create a subscriber and return its reader
fn create_data_reader<TopicType: for<'de> Deserialize<'de> + 'static + Keyed>(
    ctx: &DomainParticipant,
    topic_name: &str,
) -> DataReader<TopicType>
where for<'de> <TopicType as Keyed>::K: Deserialize<'de> {
    let topic_type = std::any::type_name::<TopicType>();
    let topic = ctx
        .create_topic(
            topic_name.to_string(),
            topic_type.to_string(),
            &qos(),
            TopicKind::WithKey,
        )
        .unwrap();
    let subscriber = ctx.create_subscriber(&qos()).unwrap();
    subscriber
        .create_datareader_cdr::<TopicType>(&topic, Some(qos()))
        .unwrap()
}

/// Create a domain participant, a publisher, and a subscriber.
/// Listen on the data_reader with an async sample stream in another thread and
/// publish every 0.5 sec on the main thread.
fn main() {
    println!("Creating DomainParticipant");
    let ctx = DomainParticipant::new(DOMAIN_ID).unwrap();

    println!("Creating subscriber");
    let data_reader = create_data_reader::<ExampleMessage>(&ctx, "hello/world");

    println!("Creating data_writer");
    let early_data_writer = create_data_writer::<ExampleMessage>(&ctx, "hello/world");
    let late_data_writer = create_data_writer::<ExampleMessage>(&ctx, "hello/world");

    println!("Starting Subscriber thread");
    // Run single thread with a future select
    let _ = thread::Builder::new()
        .name("ReceiverThread".to_string())
        .spawn(move || {
            println!("DDS receiver thread started");
            executor::block_on(async_infinite_loop(data_reader));
            println!("DDS receiver thread stopped");
        })
        .unwrap();

    println!("Starting Publishing");

    let mut sample_early = ExampleMessage { id: 1, count: 0 };
    let mut sample_late = ExampleMessage { id: 2, count: 1000 };

    loop {
        println!("Publishing Early: {}", sample_early.count);
        publish(&early_data_writer, sample_early.clone());
        sample_early.count += 1;

        if sample_early.count >= 10 {
            println!("Publishing Late: {}", sample_late.count);
            publish(&late_data_writer, sample_late.clone());
            sample_late.count += 1;
        }
        std::thread::sleep(std::time::Duration::from_millis(500));
    }
}

async fn async_infinite_loop(data_reader: DataReader<ExampleMessage>) {
    // Create the async streams
    let mut dr_stream = data_reader.async_sample_stream();
    let mut dr_event_stream = dr_stream.async_event_stream();

    loop {
        // Create the futures
        let next_some_dr_stream = dr_stream.select_next_some();
        let next_some_dr_event_stream = dr_event_stream.select_next_some();

        select! {
            result = next_some_dr_stream.fuse() => {
                match result {
                    Ok(sample) => println!("Received: {:?}", sample),
                    Err(error) => println!("Error on sample read: {:?}", error)
                }
            }
            result = next_some_dr_event_stream.fuse() => {
                match result {
                    Ok(event) => println!("DataReader event: {event:?}"),
                    Err(error) => println!("Error while on reader event: {}", error)
                }
            }
        };
    }
}

SelimV commented

Hi and thank you for reporting.

The problem was caused by readers not reacting properly to HEARTBEAT submessages that declare some data unavailable. If a reliable writer is set to keep only the latest sample, then after matching with the reader, the first sample it tries to send has, for example, sequence number 8. Along with it, the writer sends a HEARTBEAT stating that only sample 8 is available. At this point the reader should tell the topic cache, and by extension the datareaders, not to expect samples before 8, but this was not done properly.

Firstly, when skipping unavailable samples, the reader would only update its own value to state that samples up to 7 are not expected. It thus ignored the fact that it had just received sample 8, which made it request sample 8 again and then discard the resent copy. When sample 9 arrived, the corresponding HEARTBEAT would cause the reader to mark 8 as no longer expected and, since it had already been received, as the last sample received reliably. This causes the first round of delay.

Secondly, the reader would not propagate its own value to the topic cache when reacting to HEARTBEATs. Therefore the value indicating that 8 can be read by datareaders would only be updated when the DATA submessage containing sample 10 is received, causing the second round of delay.

If the writer history depth is 1, the issue caused the reader to never be up to date, so the two-round delay would persist. If the depth is 2 or more, the missing sample 7 would be requested and handled properly, bringing the reader up to date, so the problem would not appear.
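The two rounds of delay described above can be reproduced with a small toy model. This is only a sketch of my reading of the explanation, not RustDDS code: `ToyReader`, `next_expected`, `report_to_cache` and `simulate` are hypothetical names, the writer is assumed to have history depth 1 and to send each DATA followed by a HEARTBEAT whose first available sequence number equals that sample, and ACKNACKs and retransmissions are left out entirely.

```rust
use std::collections::BTreeSet;

/// Toy stand-in for a reliable reader. All names are hypothetical;
/// this is not how RustDDS stores its state.
struct ToyReader {
    fixed: bool,             // true = behave like the corrected reader
    received: BTreeSet<u64>, // DATA received but not yet released
    next_expected: u64,      // reader's own value: everything below is settled
    delivered: Vec<u64>,     // samples released to the application, in order
}

impl ToyReader {
    fn new(fixed: bool) -> Self {
        ToyReader { fixed, received: BTreeSet::new(), next_expected: 1, delivered: Vec::new() }
    }

    /// Step past every sample that has already arrived.
    fn advance_over_received(&mut self) {
        while self.received.contains(&self.next_expected) {
            self.next_expected += 1;
        }
    }

    /// Report the reader's value to the "topic cache": release everything below it.
    fn report_to_cache(&mut self) {
        let ready: Vec<u64> = self.received.range(..self.next_expected).copied().collect();
        for sn in ready {
            self.received.remove(&sn);
            self.delivered.push(sn);
        }
    }

    /// DATA handling is the same in both variants and always reports to the cache.
    fn on_data(&mut self, sn: u64) {
        self.received.insert(sn);
        if sn == self.next_expected {
            self.advance_over_received();
        }
        self.report_to_cache();
    }

    /// HEARTBEAT handling: samples below `first_available` will never arrive.
    fn on_heartbeat(&mut self, first_available: u64) {
        if first_available > self.next_expected {
            self.next_expected = first_available;
        }
        if self.fixed {
            // Corrected variant: account for samples already received
            // and propagate the new value to the cache right away.
            self.advance_over_received();
            self.report_to_cache();
        }
        // Buggy variant: neither step happens here, hence two rounds of delay.
    }
}

/// Feed `count` samples starting at `first_sn` and return what was delivered.
fn simulate(fixed: bool, first_sn: u64, count: u64) -> Vec<u64> {
    let mut reader = ToyReader::new(fixed);
    for sn in first_sn..first_sn + count {
        reader.on_data(sn);
        reader.on_heartbeat(sn);
    }
    reader.delivered
}

fn main() {
    println!("buggy: {:?}", simulate(false, 8, 5)); // prints "buggy: [8, 9, 10]"
    println!("fixed: {:?}", simulate(true, 8, 5)); // prints "fixed: [8, 9, 10, 11, 12]"
}
```

With samples 8 to 12 sent, the buggy variant has only released 8 to 10, which matches the constant lag of 2 seen in the logs, while the corrected variant releases each sample as soon as it arrives.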

It is now fixed in master with commit 1b90316.

tot0k commented

Hi @SelimV,

Wow, that seems to be a tough one to analyze, I didn't go that far!

Thank you for your fast answer and for the fix :)