webrtc-rs/interceptor

Wrong rtcp-reader flow when have more than a stream

giangndm opened this issue · 5 comments

When there is more than one stream, the logic below overwrites the previously bound reader. This issue is related to webrtc-rs/webrtc#138.

    /// bind_rtcp_reader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
    /// change in the future. The returned method will be called once per packet batch.
    async fn bind_rtcp_reader(
        &self,
        reader: Arc<dyn RTCPReader + Send + Sync>,
    ) -> Arc<dyn RTCPReader + Send + Sync> {
        {
            let mut parent_rtcp_reader = self.internal.parent_rtcp_reader.lock().await;
            *parent_rtcp_reader = Some(reader);
        }

        Arc::clone(&self.internal) as Arc<dyn RTCPReader + Send + Sync>
    }

We can resolve this by wrapping each bound reader in a new RTCPReader.

Thanks for reporting this issue. PRs are welcome to fix this issue.

@giangndm, I just checked the code. The bind_rtcp_reader overwrite in the NACK responder and the Report receiver is by design, since
the RTCPReader trait impl, i.e.,
https://github.com/webrtc-rs/interceptor/blob/main/src/nack/responder/mod.rs#L104
calls the previous (parent) rtcp_reader first.

Could you provide an example with two streams that causes wrong NACK or RR behavior so that we can investigate further? Thanks.

@rainliu I am using the simulcast example with the macOS Network Link Conditioner to simulate 1% packet loss (packet loss can also be simulated at the UDP socket). In this scenario the receiver cannot decode video frames because of the missing packets: NACKs are sent, but the packets are never resent.

I checked the interceptor code and found the following behavior when there is more than one stream:

- With local_stream1: interceptor.bind_rtcp_reader is called with stream1's srtp_rtcp_reader => sets internal.parent_rtcp_reader in nack
- With local_stream2: interceptor.bind_rtcp_reader is called with stream2's srtp_rtcp_reader => overwrites internal.parent_rtcp_reader in nack (the reference to stream1's rtcp_reader is lost)
- With local_stream3: interceptor.bind_rtcp_reader is called with stream3's srtp_rtcp_reader => overwrites internal.parent_rtcp_reader in nack (the reference to stream2's rtcp_reader is lost)

As a result, the rtcp_readers for stream1 and stream2 end up reading from stream3's srtp_rtcp_reader, so NACKs for stream1 and stream2 are never read.
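
The overwrite described in the list above can be reproduced in isolation. The following is a minimal sketch, not the interceptor crate's code: `Reader`, `StreamReader`, `SharedInternal`, `Interceptor`, `bind_reader` and `bind_three_streams` are hypothetical, synchronous stand-ins for `RTCPReader`, the per-stream srtp_rtcp_reader, `ResponderInternal` and `bind_rtcp_reader`, and each reader simply returns the name of the stream it reads from.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical, simplified stand-in for RTCPReader: synchronous, returns a label.
trait Reader: Send + Sync {
    fn read(&self) -> String;
}

// Stand-in for one stream's srtp_rtcp_reader.
struct StreamReader(&'static str);

impl Reader for StreamReader {
    fn read(&self) -> String {
        self.0.to_string()
    }
}

// Shared state with a single parent slot, mirroring the shape of ResponderInternal.
struct SharedInternal {
    parent: Mutex<Option<Arc<dyn Reader>>>,
}

impl Reader for SharedInternal {
    fn read(&self) -> String {
        // Always delegates to whichever parent was bound last.
        let guard = self.parent.lock().unwrap();
        match guard.as_ref() {
            Some(parent) => parent.read(),
            None => String::from("<none>"),
        }
    }
}

struct Interceptor {
    internal: Arc<SharedInternal>,
}

impl Interceptor {
    // Each call overwrites the single parent slot and hands back the same shared object.
    fn bind_reader(&self, reader: Arc<dyn Reader>) -> Arc<dyn Reader> {
        *self.internal.parent.lock().unwrap() = Some(reader);
        Arc::clone(&self.internal) as Arc<dyn Reader>
    }
}

// Binds three streams in order and reads once from each returned reader.
fn bind_three_streams() -> Vec<String> {
    let interceptor = Interceptor {
        internal: Arc::new(SharedInternal { parent: Mutex::new(None) }),
    };
    let r1 = interceptor.bind_reader(Arc::new(StreamReader("stream1")));
    let r2 = interceptor.bind_reader(Arc::new(StreamReader("stream2")));
    let r3 = interceptor.bind_reader(Arc::new(StreamReader("stream3")));
    vec![r1.read(), r2.read(), r3.read()]
}

fn main() {
    // Prints ["stream3", "stream3", "stream3"]
    println!("{:?}", bind_three_streams());
}
```

In this sketch all three returned readers point at the same shared object, so every read goes to the last bound parent.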

Now I see this issue, after spending a few days trying to work out why I wasn't able to get RTCP sender reports out of WebRTC.RS

I won't open another issue for that because it boils down to the same thing. (I've written the code to fix it, and I'll send a pull request tomorrow with the 'correct' code for all affected modules, pending your feedback on this comment.)

Essentially, you can end up with multiple video streams even if you have a single video stream if your web browser sends the relevant SDP to trigger the RTX_SSRC code path.

Consider that in *webrtc/src/rtp_transceiver/rtp_receiver/mod.rs* we have the following code

                    let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) =
                        self.transport
                            .streams_for_ssrc(encoding.ssrc, &stream_info, &interceptor)
                            .await?;

and

                let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = self
                    .transport
                    .streams_for_ssrc(rtx_ssrc, &stream_info, &interceptor)
                    .await?;

Both of these, in the case of the DTLS transport, trigger a call to bind_rtcp_reader on the same interceptor.

Consider the simplest sample I can find in the default interceptors code, the NACK reader above.

We have an Internal state object

pub struct ResponderInternal {
    log2_size: u8,
    streams: Arc<Mutex<HashMap<u32, Arc<ResponderStream>>>>,
    parent_rtcp_reader: Mutex<Option<Arc<dyn RTCPReader + Send + Sync>>>,
}

and the following code during a bind

    async fn bind_rtcp_reader(
        &self,
        reader: Arc<dyn RTCPReader + Send + Sync>,
    ) -> Arc<dyn RTCPReader + Send + Sync> {
        {
            let mut parent_rtcp_reader = self.internal.parent_rtcp_reader.lock().await;
            *parent_rtcp_reader = Some(reader);
        }

        Arc::clone(&self.internal) as Arc<dyn RTCPReader + Send + Sync>
    }

Glancing at this code, we might think that all is fine - we're doing a clone! What can go wrong? The answer is that the second call to bind_rtcp_reader still trashes the underlying state, which is shared across both calls: Arc::clone copies only the pointer, not the state behind it. In my case, when I'm trying to read RTCP, this means that we end up calling read_rtcp on the wrong stream and never get any data.

Looking at the Go implementation, the function returned from a call to BindRtcpReader is entirely stateless, and this should really be the case here too. A quick experiment in this single file results in the following code, where we keep the shared state for tracking streams/config and add a new struct that wraps a reader every time the function is called.

pub struct ResponderInternal {
    log2_size: u8,
    streams: Arc<Mutex<HashMap<u32, Arc<ResponderStream>>>>,
}

pub struct ResponderRtcpReader {
    parent_rtcp_reader: Arc<dyn RTCPReader + Send + Sync>,
    internal: Arc<ResponderInternal>
}

The code in both the bind and the implementation then gets a bit simpler, as we no longer need the mutexes (I'm not sure we need the mutexes anyway if we're using Arc, but I digress).

#[async_trait]
impl RTCPReader for ResponderRtcpReader {
    async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
        let (n, attr) = {
            self.parent_rtcp_reader.read(buf, a).await?
        };

        let mut b = &buf[..n];
        let pkts = rtcp::packet::unmarshal(&mut b)?;
        for p in &pkts {
            if let Some(nack) = p.as_any().downcast_ref::<TransportLayerNack>() {
                let nack = nack.clone();
                let streams = Arc::clone(&self.internal.streams);
                tokio::spawn(async move {
                    ResponderInternal::resend_packets(streams, nack).await;
                });
            }
        }

        Ok((n, attr))
    }
}

and

    async fn bind_rtcp_reader(
        &self,
        reader: Arc<dyn RTCPReader + Send + Sync>,
    ) -> Arc<dyn RTCPReader + Send + Sync> {
        Arc::new(ResponderRtcpReader {
            internal: Arc::clone(&self.internal),
            parent_rtcp_reader: reader
        }) as Arc<dyn RTCPReader + Send + Sync>
    }
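
The same idea can be checked without async or the rest of the crate. The following is a minimal sketch under simplifying assumptions, not the proposed patch itself: `Reader`, `StreamReader`, `SharedInternal`, `WrappedReader`, `bind_reader` and `bind_three_streams_fixed` are hypothetical names, the trait is synchronous, and the shared state is reduced to a read counter standing in for the streams/config kept in `ResponderInternal`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Hypothetical, simplified stand-in for RTCPReader: synchronous, returns a label.
trait Reader: Send + Sync {
    fn read(&self) -> String;
}

// Stand-in for one stream's srtp_rtcp_reader.
struct StreamReader(&'static str);

impl Reader for StreamReader {
    fn read(&self) -> String {
        self.0.to_string()
    }
}

// State that is still shared by all bound readers (a counter, for illustration only).
struct SharedInternal {
    reads: AtomicUsize,
}

// One wrapper per bind call: it owns its own parent, so nothing is overwritten.
struct WrappedReader {
    parent: Arc<dyn Reader>,
    internal: Arc<SharedInternal>,
}

impl Reader for WrappedReader {
    fn read(&self) -> String {
        // Shared bookkeeping still goes through the common state.
        self.internal.reads.fetch_add(1, Ordering::SeqCst);
        // The read itself is delegated to this wrapper's own parent.
        self.parent.read()
    }
}

struct Interceptor {
    internal: Arc<SharedInternal>,
}

impl Interceptor {
    // Returns a fresh wrapper each time instead of mutating shared state.
    fn bind_reader(&self, reader: Arc<dyn Reader>) -> Arc<dyn Reader> {
        Arc::new(WrappedReader {
            parent: reader,
            internal: Arc::clone(&self.internal),
        })
    }
}

// Binds three streams, reads once from each, and reports the shared read count.
fn bind_three_streams_fixed() -> (Vec<String>, usize) {
    let interceptor = Interceptor {
        internal: Arc::new(SharedInternal { reads: AtomicUsize::new(0) }),
    };
    let r1 = interceptor.bind_reader(Arc::new(StreamReader("stream1")));
    let r2 = interceptor.bind_reader(Arc::new(StreamReader("stream2")));
    let r3 = interceptor.bind_reader(Arc::new(StreamReader("stream3")));
    let labels = vec![r1.read(), r2.read(), r3.read()];
    (labels, interceptor.internal.reads.load(Ordering::SeqCst))
}

fn main() {
    // Prints (["stream1", "stream2", "stream3"], 3)
    println!("{:?}", bind_three_streams_fixed());
}
```

Here each returned reader keeps reading from the stream it was bound to, while the counter shows that the shared state is still common to all of them.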

I've made a similar change across all cases of bind_rtcp_reader in the current tree (and in my own interceptor), and now I see a flow of traffic. I can call read_rtcp without my code hanging, and all my traces tell me that the right streams are always being read from.

If this looks 'right' to you, then I'll make the changes properly and send a PR.

@robashton, Thank you for debugging this issue. Yes, please send a PR to fix this.