Commit ac3bb3ac authored by Wim Taymans's avatar Wim Taymans

rtpjitterbuffer: handle large gaps with one lost event

When we have a large number of missing packets, generate one lost event for all
the packets that have no chance of being pushed out in time.
Fix and activate unit test for large gaps.
parent 26402e1c
...@@ -1721,35 +1721,64 @@ flushing: ...@@ -1721,35 +1721,64 @@ flushing:
} }
static void static GstFlowReturn
calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected, calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
guint16 seqnum, GstClockTime dts, gint gap) guint16 seqnum, GstClockTime dts, gint gap)
{ {
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstClockTime duration, expected_dts; GstFlowReturn ret = GST_FLOW_OK;
GstClockTime total_duration, duration, expected_dts;
TimerType type; TimerType type;
GST_DEBUG_OBJECT (jitterbuffer, GST_DEBUG_OBJECT (jitterbuffer,
"dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT, "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts)); GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
/* interpolate between the current time and the last time based on /* the total duration spanned by the missing packets */
* number of packets we are missing, this is the estimated duration
* for the missing packet based on equidistant packet spacing. Also make
* sure we never go negative. */
if (dts >= priv->last_in_dts) if (dts >= priv->last_in_dts)
duration = (dts - priv->last_in_dts) / (gap + 1); total_duration = dts - priv->last_in_dts;
else else
/* packet already lost, timer will timeout quickly */ total_duration = 0;
duration = 0;
/* interpolate between the current time and the last time based on
* number of packets we are missing, this is the estimated duration
* for the missing packet based on equidistant packet spacing. */
duration = total_duration / (gap + 1);
GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT, GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
GST_TIME_ARGS (duration)); GST_TIME_ARGS (duration));
if (total_duration > priv->latency_ns) {
GstClockTime gap_time;
guint lost_packets;
gap_time = total_duration - priv->latency_ns;
if (duration > 0)
lost_packets = gap_time / duration;
else
lost_packets = gap;
/* too many lost packets, some of the missing packets are already
* too late and we can generate lost packet events for them. */
GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
" > %" GST_TIME_FORMAT ", consider %u lost",
GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
lost_packets);
ret =
send_lost_event (jitterbuffer, expected, lost_packets,
priv->last_in_dts + duration, gap_time, TRUE);
expected += lost_packets;
priv->last_in_dts += gap_time;
}
expected_dts = priv->last_in_dts + duration; expected_dts = priv->last_in_dts + duration;
if (priv->do_retransmission) { if (priv->do_retransmission) {
type = TIMER_TYPE_EXPECTED; type = TIMER_TYPE_EXPECTED;
/* if we had a timer for the first missing packet, leave it. */
if (find_timer (jitterbuffer, type, expected)) if (find_timer (jitterbuffer, type, expected))
expected++; expected++;
} else { } else {
...@@ -1761,6 +1790,7 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected, ...@@ -1761,6 +1790,7 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
expected_dts += duration; expected_dts += duration;
expected++; expected++;
} }
return ret;
} }
static GstFlowReturn static GstFlowReturn
...@@ -1884,7 +1914,10 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, ...@@ -1884,7 +1914,10 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
} else { } else {
GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap); GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
/* fill in the gap with EXPECTED timers */ /* fill in the gap with EXPECTED timers */
calculate_expected (jitterbuffer, expected, seqnum, dts, gap); ret = calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
if (ret != GST_FLOW_OK)
goto out_flushing;
do_next_seqnum = TRUE; do_next_seqnum = TRUE;
} }
} }
......
...@@ -25,6 +25,8 @@ ...@@ -25,6 +25,8 @@
#include <gst/check/gstcheck.h> #include <gst/check/gstcheck.h>
#include <gst/check/gsttestclock.h> #include <gst/check/gsttestclock.h>
#include <gst/rtp/gstrtpbuffer.h>
/* For ease of programming we use globals to keep refs for our floating /* For ease of programming we use globals to keep refs for our floating
* src and sink pads we create; otherwise we always have to do get_pad, * src and sink pads we create; otherwise we always have to do get_pad,
* get_peer, and then remove references in every test function */ * get_peer, and then remove references in every test function */
...@@ -323,7 +325,6 @@ GST_START_TEST (test_basetime) ...@@ -323,7 +325,6 @@ GST_START_TEST (test_basetime)
GST_END_TEST; GST_END_TEST;
#if 0
static const guint payload_size = 160; static const guint payload_size = 160;
static const guint clock_rate = 8000; static const guint clock_rate = 8000;
static const guint pcmu_payload_type = 0; static const guint pcmu_payload_type = 0;
...@@ -357,25 +358,30 @@ generate_test_buffer (GstClockTime gst_ts, ...@@ -357,25 +358,30 @@ generate_test_buffer (GstClockTime gst_ts,
GstBuffer *buf; GstBuffer *buf;
guint8 *payload; guint8 *payload;
guint i; guint i;
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
buf = gst_rtp_buffer_new_allocate (payload_size, 0, 0); buf = gst_rtp_buffer_new_allocate (payload_size, 0, 0);
GST_BUFFER_TIMESTAMP (buf) = gst_ts; GST_BUFFER_TIMESTAMP (buf) = gst_ts;
GST_BUFFER_CAPS (buf) = generate_caps (); //GST_BUFFER_CAPS (buf) = generate_caps ();
gst_rtp_buffer_set_payload_type (buf, pcmu_payload_type);
gst_rtp_buffer_set_marker (buf, marker_bit); gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp);
gst_rtp_buffer_set_seq (buf, seq_num); gst_rtp_buffer_set_payload_type (&rtp, pcmu_payload_type);
gst_rtp_buffer_set_timestamp (buf, rtp_ts); gst_rtp_buffer_set_marker (&rtp, marker_bit);
gst_rtp_buffer_set_ssrc (buf, test_ssrc); gst_rtp_buffer_set_seq (&rtp, seq_num);
gst_rtp_buffer_set_timestamp (&rtp, rtp_ts);
payload = gst_rtp_buffer_get_payload (buf); gst_rtp_buffer_set_ssrc (&rtp, test_ssrc);
payload = gst_rtp_buffer_get_payload (&rtp);
for (i = 0; i < payload_size; i++) for (i = 0; i < payload_size; i++)
payload[i] = 0xff; payload[i] = 0xff;
gst_rtp_buffer_unmap (&rtp);
return buf; return buf;
} }
static GstFlowReturn static GstFlowReturn
test_sink_pad_chain_cb (GstPad * pad, GstBuffer * buffer) test_sink_pad_chain_cb (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{ {
TestData *data = gst_pad_get_element_private (pad); TestData *data = gst_pad_get_element_private (pad);
g_async_queue_push (data->buf_queue, buffer); g_async_queue_push (data->buf_queue, buffer);
...@@ -383,10 +389,13 @@ test_sink_pad_chain_cb (GstPad * pad, GstBuffer * buffer) ...@@ -383,10 +389,13 @@ test_sink_pad_chain_cb (GstPad * pad, GstBuffer * buffer)
} }
static gboolean static gboolean
test_sink_pad_event_cb (GstPad * pad, GstEvent * event) test_sink_pad_event_cb (GstPad * pad, GstObject * parent, GstEvent * event)
{ {
TestData *data = gst_pad_get_element_private (pad); TestData *data = gst_pad_get_element_private (pad);
const GstStructure *structure = gst_event_get_structure (event); const GstStructure *structure = gst_event_get_structure (event);
GST_DEBUG ("got event %" GST_PTR_FORMAT, event);
if (strcmp (gst_structure_get_name (structure), "GstRTPPacketLost") == 0) if (strcmp (gst_structure_get_name (structure), "GstRTPPacketLost") == 0)
data->lost_event_count++; data->lost_event_count++;
...@@ -398,6 +407,8 @@ static void ...@@ -398,6 +407,8 @@ static void
setup_testharness (TestData * data) setup_testharness (TestData * data)
{ {
GstPad *jb_sink_pad, *jb_src_pad; GstPad *jb_sink_pad, *jb_src_pad;
GstSegment seg;
GstMiniObject *obj;
// create the testclock // create the testclock
data->clock = gst_test_clock_new (); data->clock = gst_test_clock_new ();
...@@ -405,7 +416,7 @@ setup_testharness (TestData * data) ...@@ -405,7 +416,7 @@ setup_testharness (TestData * data)
gst_test_clock_set_time (GST_TEST_CLOCK (data->clock), 0); gst_test_clock_set_time (GST_TEST_CLOCK (data->clock), 0);
// rig up the jitter buffer // rig up the jitter buffer
data->jitter_buffer = gst_element_factory_make ("gstrtpjitterbuffer", NULL); data->jitter_buffer = gst_element_factory_make ("rtpjitterbuffer", NULL);
g_assert (data->jitter_buffer); g_assert (data->jitter_buffer);
gst_element_set_clock (data->jitter_buffer, data->clock); gst_element_set_clock (data->jitter_buffer, data->clock);
g_object_set (data->jitter_buffer, "do-lost", TRUE, NULL); g_object_set (data->jitter_buffer, "do-lost", TRUE, NULL);
...@@ -414,8 +425,7 @@ setup_testharness (TestData * data) ...@@ -414,8 +425,7 @@ setup_testharness (TestData * data)
// link in the test source-pad // link in the test source-pad
data->test_src_pad = gst_pad_new ("src", GST_PAD_SRC); data->test_src_pad = gst_pad_new ("src", GST_PAD_SRC);
gst_pad_set_caps (data->test_src_pad, generate_caps ()); jb_sink_pad = gst_element_get_static_pad (data->jitter_buffer, "sink");
jb_sink_pad = gst_element_get_pad (data->jitter_buffer, "sink");
g_assert_cmpint (gst_pad_link (data->test_src_pad, jb_sink_pad), ==, g_assert_cmpint (gst_pad_link (data->test_src_pad, jb_sink_pad), ==,
GST_PAD_LINK_OK); GST_PAD_LINK_OK);
g_assert (gst_pad_set_active (data->test_src_pad, TRUE)); g_assert (gst_pad_set_active (data->test_src_pad, TRUE));
...@@ -426,7 +436,7 @@ setup_testharness (TestData * data) ...@@ -426,7 +436,7 @@ setup_testharness (TestData * data)
gst_pad_set_caps (data->test_sink_pad, generate_caps ()); gst_pad_set_caps (data->test_sink_pad, generate_caps ());
gst_pad_set_chain_function (data->test_sink_pad, test_sink_pad_chain_cb); gst_pad_set_chain_function (data->test_sink_pad, test_sink_pad_chain_cb);
gst_pad_set_event_function (data->test_sink_pad, test_sink_pad_event_cb); gst_pad_set_event_function (data->test_sink_pad, test_sink_pad_event_cb);
jb_src_pad = gst_element_get_pad (data->jitter_buffer, "src"); jb_src_pad = gst_element_get_static_pad (data->jitter_buffer, "src");
g_assert_cmpint (gst_pad_link (jb_src_pad, data->test_sink_pad), ==, g_assert_cmpint (gst_pad_link (jb_src_pad, data->test_sink_pad), ==,
GST_PAD_LINK_OK); GST_PAD_LINK_OK);
g_assert (gst_pad_set_active (data->test_sink_pad, TRUE)); g_assert (gst_pad_set_active (data->test_sink_pad, TRUE));
...@@ -440,6 +450,16 @@ setup_testharness (TestData * data) ...@@ -440,6 +450,16 @@ setup_testharness (TestData * data)
data->lost_event_count = 0; data->lost_event_count = 0;
gst_pad_set_element_private (data->test_sink_pad, data); gst_pad_set_element_private (data->test_sink_pad, data);
gst_segment_init (&seg, GST_FORMAT_TIME);
gst_pad_push_event (data->test_src_pad,
gst_event_new_stream_start ("stream0"));
gst_pad_set_caps (data->test_src_pad, generate_caps ());
gst_pad_push_event (data->test_src_pad, gst_event_new_segment (&seg));
while ((obj = g_async_queue_try_pop (data->event_queue)))
gst_mini_object_unref (obj);
} }
static void static void
...@@ -501,12 +521,13 @@ verify_lost_event (GstEvent * event, guint32 expected_seqnum, ...@@ -501,12 +521,13 @@ verify_lost_event (GstEvent * event, guint32 expected_seqnum,
GST_START_TEST (test_only_one_lost_event_on_large_gaps) GST_START_TEST (test_only_one_lost_event_on_large_gaps)
{ {
TestData data; TestData data;
GstTestClockPendingID id; GstClockID id, test_id;
guint64 timeout; guint64 timeout;
GstBuffer *in_buf, *out_buf; GstBuffer *in_buf, *out_buf;
GstEvent *out_event; GstEvent *out_event;
gint jb_latency_ms = 200; gint jb_latency_ms = 200;
guint buffer_size_ms = (payload_size * 1000) / clock_rate; guint buffer_size_ms = (payload_size * 1000) / clock_rate;
GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
setup_testharness (&data); setup_testharness (&data);
timeout = 20 * G_USEC_PER_SEC; timeout = 20 * G_USEC_PER_SEC;
...@@ -519,14 +540,13 @@ GST_START_TEST (test_only_one_lost_event_on_large_gaps) ...@@ -519,14 +540,13 @@ GST_START_TEST (test_only_one_lost_event_on_large_gaps)
g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK); g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
// wait for the first buffer to be synced to timestamp + latency // wait for the first buffer to be synced to timestamp + latency
g_assert (gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
(data.clock), &id));
// increase the time to timestamp + latency and release the wait // increase the time to timestamp + latency and release the wait
gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), gst_test_clock_set_time (GST_TEST_CLOCK (data.clock),
jb_latency_ms * GST_MSECOND); jb_latency_ms * GST_MSECOND);
g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock)) g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock))
== id.clock_id); == id);
// check for the buffer coming out that was pushed in // check for the buffer coming out that was pushed in
out_buf = g_async_queue_timeout_pop (data.buf_queue, timeout); out_buf = g_async_queue_timeout_pop (data.buf_queue, timeout);
...@@ -550,25 +570,27 @@ GST_START_TEST (test_only_one_lost_event_on_large_gaps) ...@@ -550,25 +570,27 @@ GST_START_TEST (test_only_one_lost_event_on_large_gaps)
g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK); g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK);
// release the wait // release the wait
g_assert (gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK GST_DEBUG ("wait for id");
(data.clock), &id)); gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id);
g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock)) GST_DEBUG ("got wait id %p", id);
== id.clock_id); gst_test_clock_advance_time (GST_TEST_CLOCK (data.clock), GST_MSECOND * 20);
test_id = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
GST_DEBUG ("process id %p", test_id);
g_assert (id == test_id);
// we should now receive a packet-lost-event for buffers 1 through 489 // we should now receive a packet-lost-event for buffers 1 through 489
out_event = g_async_queue_timeout_pop (data.event_queue, timeout); out_event = g_async_queue_timeout_pop (data.event_queue, timeout);
g_assert (out_event != NULL); g_assert (out_event != NULL);
g_assert_cmpint (data.lost_event_count, ==, 1); g_assert_cmpint (data.lost_event_count, ==, 1);
verify_lost_event (out_event, 1, 1 * GST_MSECOND * 20, GST_MSECOND * 20 * 489, verify_lost_event (out_event, 1, 1 * GST_MSECOND * 20, GST_MSECOND * 20 * 490,
TRUE); TRUE);
// churn through sync_times until the new buffer gets pushed out // churn through sync_times until the new buffer gets pushed out
while (g_async_queue_length (data.buf_queue) < 1) { while (g_async_queue_length (data.buf_queue) < 1) {
if (gst_test_clock_peek_next_pending_id (GST_TEST_CLOCK (data.clock), &id)) { if (gst_test_clock_peek_next_pending_id (GST_TEST_CLOCK (data.clock), &id)) {
if (id.time > gst_clock_get_time (data.clock)) { GstClockTime t = gst_clock_id_get_time (id);
gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), id.time); if (t > gst_clock_get_time (data.clock)) {
g_print ("setting time to %" GST_TIME_FORMAT "\n", gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), t);
GST_TIME_ARGS (id.time));
} }
gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock)); gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock));
} }
...@@ -577,19 +599,21 @@ GST_START_TEST (test_only_one_lost_event_on_large_gaps) ...@@ -577,19 +599,21 @@ GST_START_TEST (test_only_one_lost_event_on_large_gaps)
out_buf = g_async_queue_timeout_pop (data.buf_queue, timeout); out_buf = g_async_queue_timeout_pop (data.buf_queue, timeout);
g_assert (out_buf != NULL); g_assert (out_buf != NULL);
g_assert (GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT)); g_assert (GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT));
g_assert_cmpint (gst_rtp_buffer_get_seq (out_buf), ==, 500); gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp);
g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, 500);
gst_rtp_buffer_unmap (&rtp);
g_assert_cmpint (GST_BUFFER_TIMESTAMP (out_buf), ==, (10 * GST_SECOND)); g_assert_cmpint (GST_BUFFER_TIMESTAMP (out_buf), ==, (10 * GST_SECOND));
// we get as many lost events as the the number of buffers the jitterbuffer // we get as many lost events as the the number of buffers the jitterbuffer
// is able to wait for (+ the one we already got) // is able to wait for (+ the one we already got)
g_assert_cmpint (data.lost_event_count, ==, g_assert_cmpint (data.lost_event_count, ==, jb_latency_ms / buffer_size_ms);
jb_latency_ms / buffer_size_ms + 1);
destroy_testharness (&data); destroy_testharness (&data);
} }
GST_END_TEST; GST_END_TEST;
#if 0
GST_START_TEST (test_two_lost_one_arrives_in_time) GST_START_TEST (test_two_lost_one_arrives_in_time)
{ {
TestData data; TestData data;
...@@ -840,8 +864,8 @@ rtpjitterbuffer_suite (void) ...@@ -840,8 +864,8 @@ rtpjitterbuffer_suite (void)
tcase_add_test (tc_chain, test_push_backward_seq); tcase_add_test (tc_chain, test_push_backward_seq);
tcase_add_test (tc_chain, test_push_unordered); tcase_add_test (tc_chain, test_push_unordered);
tcase_add_test (tc_chain, test_basetime); tcase_add_test (tc_chain, test_basetime);
#if 0
tcase_add_test (tc_chain, test_only_one_lost_event_on_large_gaps); tcase_add_test (tc_chain, test_only_one_lost_event_on_large_gaps);
#if 0
tcase_add_test (tc_chain, test_two_lost_one_arrives_in_time); tcase_add_test (tc_chain, test_two_lost_one_arrives_in_time);
tcase_add_test (tc_chain, test_late_packets_still_makes_lost_events); tcase_add_test (tc_chain, test_late_packets_still_makes_lost_events);
tcase_add_test (tc_chain, test_all_packets_are_timestamped_zero); tcase_add_test (tc_chain, test_all_packets_are_timestamped_zero);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment