webrtc-rs/webrtc

ReadRTCP doesn't work

Closed this issue · 8 comments

Digging into how/why reading RTCP results in the data stream being corrupt.

I've

  • Confirmed that in Pion, the code that I am trying to write works
  • Isolated the issue to a simple repro
  • Got suspicions that the issue is caused by a shared lock between RTP and RTCP, meaning the RTP only gets read when an RTCP packet comes in.

I'll dig deeper, and amend this issue as I do.

Discussed in #150

Originally posted by robashton January 13, 2022
As mentioned over on the Interceptor repo in webrtc-rs/interceptor#2, and indeed the subsequent pull request webrtc-rs/interceptor#3, I want to get hold of the RTCP (specifically, the sender reports) being sent by the web browser so I can do some A/V sync downstream of the Rust.

The documentation for Pion suggests that we should use the receiver passed to us in OnTrack to perform this function, as this will run any interceptors and give you the final RTCP. (Hence how I discovered the issues there). ( https://github.com/pion/webrtc/blob/master/examples/rtcp-processing/main.go#L31 )

Indeed, on ingest - without the call to read_rtcp, those interceptors never get executed and presumably the rtcp being sent to us is just dropped on the floor (as far as I can tell). If I call read_rtcp, it breaks the actual rtp somehow, even if I do nothing with the results.

Sticking the following code into the on_track code in the save-to-disk example means the interceptors get run and I get some RTCP (including the sender reports in that println), but it also means the data stream breaks.

                    tokio::spawn(async move {
                        loop{
                            tokio::select! {
                                result = receiver.read_rtcp() => { 
                                    println!("RTCP {:?}", result);
                                }
                            }
                        }
                    });

What am I missing? How are we supposed to make sure the interceptors get executed on ingest and things like sender reports are made available to us?

I think I have a handle on this, I'd like to propose a couple of changes (Happy to send as a PR, but would like some feedback first).

Consider the following go

func (r *RTPReceiver) Read(b []byte) (n int, a interceptor.Attributes, err error) {
	select {
	case <-r.received:
		return r.tracks[0].rtcpInterceptor.Read(b, a)
	case <-r.closed:
		return 0, nil, io.ErrClosedPipe
	}
}

I am not a Go programmer, and what I know about Go I've learned in the last couple of days whilst reading Pion and putting things into an online runner. As far as I can tell, all that those <-r.received reads are effectively doing is using the channel to signal that 'some data was received, so please perform this action'. There could be multiple of these queued up in different goroutines, and there is no guarantee (as far as I can tell) about which one will actually receive from the channel. Effectively it's a "take your turn and don't endlessly spin" guard of some sort.

Consider the following rust

    async fn read(&self, b: &mut [u8], rid: &str) -> Result<(usize, Attributes)> {
        let mut received_rx = self.received_rx.lock().await;

        tokio::select! {
            _ = received_rx.recv() => {
                // do something that might take some time because there isn't actually the right sort of data yet
            }
            _ = self.closed_rx.notified() => {
                Err(Error::ErrClosedPipe)
            }
        }
    }

Effectively, we have multiple entrants into this code: ReadRTCP in our outside loop, and ReadRTP being called by Track. ReadRTCP is going to take the lock on the channel (which it should in order to read from it - this is correct, channels are not meant to be shared after all), but it then holds onto that lock until some data flows through. Effectively this means blocking the ReadRTP call.

Changing this code to something like

    async fn read(&self, b: &mut [u8], rid: &str) -> Result<(usize, Attributes)> {
        {
            let mut received_rx = self.received_rx.lock().await;
            tokio::select! {
                _ = received_rx.recv() => {}
                _ = self.closed_rx.notified() => {
                    return Err(Error::ErrClosedPipe);
                }
            }
        } // the lock on the channel is released here
        // do something that might take some time because there isn't actually the right sort of data yet
    }

makes it more (morally) equivalent to the original Go (as far as I understand it - which is to say, not a lot).
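
To make the difference between the two shapes concrete, here is a minimal sketch using only the standard library. It is not the webrtc-rs code: std threads and std::sync::Mutex stand in for tokio tasks and the async mutex, and SignalGate, read_holding_lock, read_scoped_lock and the two driver functions are hypothetical names invented for the illustration. The point is only where the guard is dropped relative to the slow work.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the receiver: a "data arrived" signal channel
/// shared by several readers behind a mutex.
pub struct SignalGate {
    received_rx: Mutex<Receiver<()>>,
}

impl SignalGate {
    pub fn pair() -> (Sender<()>, Arc<SignalGate>) {
        let (tx, rx) = mpsc::channel();
        (tx, Arc::new(SignalGate { received_rx: Mutex::new(rx) }))
    }

    /// Problem shape: the guard lives until the end of the function,
    /// so the slow work runs with the channel still locked.
    pub fn read_holding_lock<F: FnOnce()>(&self, slow_work: F) {
        let rx = self.received_rx.lock().unwrap();
        rx.recv().unwrap();
        slow_work(); // other readers are stuck in `lock()` during this
    }

    /// Fixed shape: the guard only covers waiting for the signal.
    pub fn read_scoped_lock<F: FnOnce()>(&self, slow_work: F) {
        {
            let rx = self.received_rx.lock().unwrap();
            rx.recv().unwrap();
        } // guard dropped here, before the slow work starts
        slow_work();
    }
}

/// Runs two readers that hold the lock during the slow work and returns the
/// largest number of readers that were inside the slow work at once.
pub fn max_overlap_holding_lock() -> usize {
    let (tx, gate) = SignalGate::pair();
    tx.send(()).unwrap();
    tx.send(()).unwrap();
    let active = Arc::new(AtomicUsize::new(0));
    let max_seen = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let gate = Arc::clone(&gate);
            let active = Arc::clone(&active);
            let max_seen = Arc::clone(&max_seen);
            thread::spawn(move || {
                gate.read_holding_lock(|| {
                    let now = active.fetch_add(1, Ordering::SeqCst) + 1;
                    max_seen.fetch_max(now, Ordering::SeqCst);
                    thread::sleep(Duration::from_millis(20));
                    active.fetch_sub(1, Ordering::SeqCst);
                });
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    max_seen.load(Ordering::SeqCst)
}

/// Runs two readers with the scoped lock. The slow work is a two-party
/// barrier, so this only returns if both readers are in it together.
pub fn both_readers_overlap_scoped_lock() -> bool {
    let (tx, gate) = SignalGate::pair();
    tx.send(()).unwrap();
    tx.send(()).unwrap();
    let barrier = Arc::new(Barrier::new(2));
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let gate = Arc::clone(&gate);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || gate.read_scoped_lock(|| { barrier.wait(); }))
        })
        .collect();
    handles.into_iter().all(|handle| handle.join().is_ok())
}
```

With the guard held, the two readers can only ever run their slow work one after the other. With the scoped guard, both get past the signal and overlap, which is the behaviour RTP and RTCP reads need. The barrier would deadlock if used with read_holding_lock, which is the same failure in miniature.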

Now, that isn't everything. I note that internal.tracks is held inside a Mutex, which has the same problem: again, we're requesting a lock for the duration of the operation. If we read the original Go, we see that it doesn't bother doing this. Presumably Go has some copy-only semantics around access to this sort of shared state (or the original authors don't care, I don't know), but there is a RwLock created for controlling write access to that shared state.

We can do the same in the Rust (and indeed I have in my local code), instead of

tracks: Mutex<Vec<TrackStreams>>,

We can have

tracks: RwLock<Vec<TrackStreams>>,

For write operations we can take out a write lock, for most operations we can take out a read lock - and Rust will complain if we do the wrong thing so that's all safe.
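
As a rough illustration of that split, here is a sketch with std::sync::RwLock. The async code in the crate would presumably use an async-aware lock instead, and TrackStreams here is a hypothetical one-field stand-in, as are ReceiverInternal, add_track, first_ssrc and two_readers_share_the_lock. It shows a short write lock for mutation and two readers holding the read lock at the same moment.

```rust
use std::sync::{Arc, Barrier, RwLock};
use std::thread;

/// Hypothetical, simplified stand-in for the per-track state.
#[derive(Debug, Clone, PartialEq)]
pub struct TrackStreams {
    pub ssrc: u32,
}

/// Hypothetical holder of the shared track list.
pub struct ReceiverInternal {
    tracks: RwLock<Vec<TrackStreams>>,
}

impl ReceiverInternal {
    pub fn new() -> Self {
        ReceiverInternal { tracks: RwLock::new(Vec::new()) }
    }

    /// Mutation takes the exclusive write lock, and only briefly.
    pub fn add_track(&self, ssrc: u32) {
        self.tracks.write().unwrap().push(TrackStreams { ssrc });
    }

    /// Lookups take the shared read lock; many readers may hold it at once.
    pub fn first_ssrc(&self) -> Option<u32> {
        self.tracks.read().unwrap().first().map(|t| t.ssrc)
    }
}

/// Two threads take the read lock and then meet at a barrier while still
/// holding it. With a Mutex in place of the RwLock this would deadlock.
pub fn two_readers_share_the_lock() -> Vec<Option<u32>> {
    let internal = Arc::new(ReceiverInternal::new());
    internal.add_track(1234);
    let barrier = Arc::new(Barrier::new(2));
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let internal = Arc::clone(&internal);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                let tracks = internal.tracks.read().unwrap();
                barrier.wait(); // both threads hold a read guard here
                let ssrc = tracks.first().map(|t| t.ssrc);
                ssrc
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

The compiler enforces the split: pushing to the list through a read guard does not type-check, because the read guard only gives shared access to the Vec.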

With these two changes made in the specific code path I'm looking at, I see

Read some RTCP
Read some RTP
Read some RTP
Read some RTP
Read some RTP
Read some RTP
Read some RTP
Read some RTP
Read some RTP
Read some RTP
Read some RTP
Read some RTP
Read some RTP
Read some RTP
Read some RTP
Read some RTP

in my debug output, and video and audio can now be synchronised using the RTCP Sender Report.

If this looks good, I'll go ahead and make the change properly across the affected files and send a pull request.

Only a couple of instances anyway, leave it here for you!

@robashton, with the fix (#152), did you see any other issues with read_rtp/rtcp? If not, I think it is worth releasing a new version to include the recent fixes you have done. What do you think? Thanks.

As far as I know, this makes it all work properly.

What I can say with some degree of confidence is that nobody has code that uses RTCP, because before this fix there is no way it would have worked, so a release wouldn't break that.

I'll have this integrated into a pretty full workflow (at work) shortly, I just need to feed that timing information further downstream and validate the results - if I see lipsync then we can be reasonably confident I've not broken anything, and indeed have fixed the original issue!

It looks like this does indeed work (I've got a lovely lip-synced MP4 sat in front of me).

I've got a few lines of trace out of the h264 parser that cause me a little bit of concern, as they indicate I'm losing a few packets. This might be a result of my code, my usage of Webrtc.rs, or Webrtc.rs itself, so I'll dig into that tomorrow. Unless anybody is screaming out for RTCP support, I'd suggest leaving it for a few days while I

  • Verify that what I have works properly and completely
  • Document 'the blessed mechanism' by which we expect all of this to hang together/work

Cool.

Sure, will wait.

Rightio, I've tested this out properly and it does indeed work. I had some worries that something wasn't cancellable but that doesn't seem to be the case. A tagged release would make me happy, as I can update our repos then.

Cheers

@robashton, webrtc crate v0.4.0 was just released.