From 7385e070ece31a14155ca75ede3581aaf5621a53 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Thu, 12 May 2022 01:09:58 +0200 Subject: [PATCH] ts-jitterbuffer: improve scheduling of lost events When we have detected that packets are equidistant and have determined a packet spacing, we can schedule lost events "on time" instead of pushing them at the same time as the next received packet. --- generic/threadshare/src/jitterbuffer/imp.rs | 171 ++++++++++++++++---- 1 file changed, 144 insertions(+), 27 deletions(-) diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs index 312eb852..d5bb7ed8 100644 --- a/generic/threadshare/src/jitterbuffer/imp.rs +++ b/generic/threadshare/src/jitterbuffer/imp.rs @@ -525,15 +525,15 @@ impl SinkHandler { let jb = element.imp(); let mut state = jb.state.lock().unwrap(); - let (latency, context_wait) = { + let (latency, context_wait, do_lost) = { let settings = jb.settings.lock().unwrap(); - (settings.latency, settings.context_wait) + (settings.latency, settings.context_wait, settings.do_lost) }; // Reschedule if needed let (_, next_wakeup) = jb.src_pad_handler - .next_wakeup(element, &state, latency, context_wait); + .next_wakeup(element, &state, do_lost, latency, context_wait); if let Some((next_wakeup, _)) = next_wakeup { if let Some((previous_next_wakeup, ref abort_handle)) = state.wait_handle { if previous_next_wakeup.is_none() @@ -840,10 +840,51 @@ impl SrcHandler { jb.src_pad.push(buffer).await } + // If there is a gap between the next seqnum we must output and the seqnum of + // the earliest item currently stored, we may want to wake up earlier in order + // to push the corresponding lost event, provided we are reasonably sure that + // packets are equidistant and we have calculated a packet spacing. + fn next_lost_wakeup( + &self, + state: &State, + do_lost: bool, + latency: gst::ClockTime, + context_wait: gst::ClockTime, + ) -> Option { + if do_lost && state.equidistant > 3 && !state.packet_spacing.is_zero() { + if let Some(gap) = state + .last_popped_seqnum + .zip(state.earliest_seqnum) + .map(|(last, earliest)| gst_rtp::compare_seqnum(last, earliest)) + { + if gap > 1 { + if let Some(last_popped_pts) = state.last_popped_pts { + return Some(last_popped_pts + latency - context_wait / 2); + } + } + } + } + + None + } + + // The time we should wake up at in order to push our earliest item on time + fn next_packet_wakeup( + &self, + state: &State, + latency: gst::ClockTime, + context_wait: gst::ClockTime, + ) -> Option { + state + .earliest_pts + .map(|earliest_pts| earliest_pts + latency - state.packet_spacing - context_wait / 2) + } + fn next_wakeup( &self, element: &super::JitterBuffer, state: &State, + do_lost: bool, latency: gst::ClockTime, context_wait: gst::ClockTime, ) -> ( @@ -868,27 +909,26 @@ impl SrcHandler { return (now, Some((now, Duration::ZERO))); } - if state.earliest_pts.is_none() { - return (now, None); - } - - let next_wakeup = state - .earliest_pts - .map(|earliest_pts| earliest_pts + latency - state.packet_spacing - context_wait / 2); + if let Some(next_wakeup) = self + .next_lost_wakeup(state, do_lost, latency, context_wait) + .or_else(|| self.next_packet_wakeup(state, latency, context_wait)) + { + let delay = next_wakeup + .opt_saturating_sub(now) + .unwrap_or(gst::ClockTime::ZERO); - let delay = next_wakeup - .opt_saturating_sub(now) - .unwrap_or(gst::ClockTime::ZERO); - - gst::debug!( - CAT, - obj: element, - "Next wakeup at {} with delay {}", - next_wakeup.display(), - delay - ); + gst::debug!( + CAT, + obj: element, + "Next wakeup at {} with delay {}", + next_wakeup.display(), + delay + ); - (now, Some((next_wakeup, delay.into()))) + (now, Some((Some(next_wakeup), delay.into()))) + } else { + (now, None) + } } } @@ -1090,9 +1130,9 @@ impl TaskImpl for JitterBufferTask { fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { async move { let jb = self.element.imp(); - let (latency, context_wait) = { + let (latency, context_wait, do_lost) = { let settings = jb.settings.lock().unwrap(); - (settings.latency, settings.context_wait) + (settings.latency, settings.context_wait, settings.do_lost) }; loop { @@ -1101,6 +1141,7 @@ impl TaskImpl for JitterBufferTask { let (_, next_wakeup) = self.src_pad_handler.next_wakeup( &self.element, &state, + do_lost, latency, context_wait, ); @@ -1138,13 +1179,14 @@ impl TaskImpl for JitterBufferTask { } } - let (head_pts, head_seq) = { - let state = jb.state.lock().unwrap(); + let (head_pts, head_seq, lost_events) = { + let mut state = jb.state.lock().unwrap(); // // Check earliest PTS as we have just taken the lock let (now, next_wakeup) = self.src_pad_handler.next_wakeup( &self.element, &state, + do_lost, latency, context_wait, ); @@ -1167,10 +1209,84 @@ impl TaskImpl for JitterBufferTask { } let (head_pts, head_seq) = state.jbuf.peek(); + let mut events = vec![]; + + // We may have woken up in order to push lost events on time + // (see next_packet_wakeup()) + if do_lost && state.equidistant > 3 && !state.packet_spacing.is_zero() { + loop { + if let Some((lost_seq, lost_pts)) = state.last_popped_seqnum + .zip(state.earliest_seqnum) + .and_then(|(last, earliest)| { + if let Some(last_popped_pts) = state.last_popped_pts { + let next = last.wrapping_add(1); + if next != earliest && (last_popped_pts + latency - context_wait / 2).opt_lt(now).unwrap_or(false) { + Some((next, last_popped_pts + state.packet_spacing)) + } else { + None + } + } else { + None + } + }) { + let s = gst::Structure::builder("GstRTPPacketLost") + .field("seqnum", lost_seq as u32) + .field("timestamp", lost_pts) + .field("duration", state.packet_spacing) + .field("retry", 0) + .build(); + + events.push(gst::event::CustomDownstream::new(s)); + state.stats.num_lost += 1; + state.last_popped_pts = lost_pts.opt_min(state.earliest_pts).or(Some(lost_pts)); + state.last_popped_seqnum = Some(lost_seq); + } else { + break; + } + } - (head_pts, head_seq) + } + + (head_pts, head_seq, events) }; + { + // Push any lost events we may have woken up to push on schedule + for event in lost_events { + gst::debug!(CAT, obj: jb.src_pad.gst_pad(), "Pushing lost event {:?}", event); + let _ = jb.src_pad.push_event(event).await; + } + + let state = jb.state.lock().unwrap(); + // + // Now recheck earliest PTS as we have just retaken the lock and may + // have advanced last_popped_* fields + let (now, next_wakeup) = self.src_pad_handler.next_wakeup( + &self.element, + &state, + do_lost, + latency, + context_wait, + ); + + gst::debug!( + CAT, + obj: &self.element, + "Woke up at {}, earliest_pts {}", + now.display(), + state.earliest_pts.display() + ); + + if let Some((next_wakeup, _)) = next_wakeup { + if next_wakeup.opt_gt(now).unwrap_or(false) { + // Reschedule and wait a bit longer in the next iteration + return Ok(()); + } + } else { + return Ok(()); + } + } + let res = self.src_pad_handler.pop_and_push(&self.element).await; { @@ -1190,6 +1306,7 @@ impl TaskImpl for JitterBufferTask { let (now, next_wakeup) = self.src_pad_handler.next_wakeup( &self.element, &state, + do_lost, latency, context_wait, ); -- GitLab