Commit f7277db9 authored by George Kiagiadakis's avatar George Kiagiadakis Committed by Wim Taymans

tests/check: Add rtprtx::test_rtxsender_packet_retention

This unit test verifies that the rtxsend element correctly maintains
a buffer of already transmitted rtp packets and that it can
re-transmit all of them correctly on demand. It also verifies
that the limit of this buffer (max-size-packets property) is respected.
parent 71bdb5e0
......@@ -1058,19 +1058,248 @@ GST_START_TEST (test_drop_multiple_sender)
struct GenerateTestBuffersData
GstElement *src, *payloader, *sink;
GMutex mutex;
GCond cond;
GList *buffers;
gint num_buffers;
guint last_seqnum;
static void
fakesink_handoff (GstElement * sink, GstBuffer * buf, GstPad * pad,
gpointer user_data)
struct GenerateTestBuffersData *data = user_data;
g_mutex_lock (&data->mutex);
if (data->num_buffers > 0)
data->buffers = g_list_append (data->buffers, gst_buffer_ref (buf));
/* if we have collected enough buffers, unblock the main thread to stop */
if (--data->num_buffers <= 0)
g_cond_signal (&data->cond);
if (data->num_buffers == 0)
g_object_get (data->payloader, "seqnum", &data->last_seqnum, NULL);
g_mutex_unlock (&data->mutex);
static GList *
generate_test_buffers (const gint num_buffers, guint ssrc, guint * payload_type)
GstElement *bin;
gboolean res;
struct GenerateTestBuffersData data;
fail_unless (num_buffers > 0);
g_mutex_init (&data.mutex);
g_cond_init (&data.cond);
data.buffers = NULL;
data.num_buffers = num_buffers;
bin = gst_pipeline_new (NULL);
data.src = gst_element_factory_make ("videotestsrc", NULL);
data.payloader = gst_element_factory_make ("rtpvrawpay", NULL);
data.sink = gst_element_factory_make ("fakesink", NULL);
g_object_set (data.payloader, "seqnum-offset", 1, "ssrc", ssrc, NULL);
g_object_set (data.sink, "signal-handoffs", TRUE, NULL);
g_signal_connect (data.sink, "handoff", (GCallback) fakesink_handoff, &data);
gst_bin_add_many (GST_BIN (bin), data.src, data.payloader, data.sink, NULL);
res = gst_element_link_many (data.src, data.payloader, data.sink, NULL);
fail_unless_equals_int (res, TRUE);
g_mutex_lock (&data.mutex);
while (data.num_buffers > 0)
g_cond_wait (&data.cond, &data.mutex);
g_mutex_unlock (&data.mutex);
g_object_get (data.payloader, "pt", payload_type, NULL);
fail_unless_equals_int (g_list_length (data.buffers), num_buffers);
fail_unless_equals_int (num_buffers, data.last_seqnum);
g_mutex_clear (&data.mutex);
g_cond_clear (&data.cond);
gst_object_unref (bin);
return data.buffers;
static GstEvent *
create_rtx_event (guint seqnum, guint ssrc, guint payload_type)
return gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
gst_structure_new ("GstRTPRetransmissionRequest",
"seqnum", G_TYPE_UINT, seqnum,
"ssrc", G_TYPE_UINT, ssrc,
"payload-type", G_TYPE_UINT, payload_type, NULL));
GST_START_TEST (test_rtxsender_packet_retention)
const gint num_buffers = 10;
const gint half_buffers = num_buffers / 2;
const guint ssrc = 1234567;
const guint rtx_payload_type = 99;
GList *in_buffers, *node;
guint payload_type;
GstElement *rtxsend;
GstPad *srcpad, *sinkpad;
GstCaps *caps;
gint i, j;
gboolean res;
/* generate test data */
in_buffers = generate_test_buffers (num_buffers, ssrc, &payload_type);
/* clear the global buffers list, which we are going to use later */
gst_check_drop_buffers ();
/* setup element & pads */
rtxsend = gst_check_setup_element ("rtprtxsend");
g_object_set (rtxsend, "max-size-packets", half_buffers,
"rtx-payload-type", rtx_payload_type, NULL);
srcpad = gst_check_setup_src_pad (rtxsend, &srctemplate);
fail_unless_equals_int (gst_pad_set_active (srcpad, TRUE), TRUE);
sinkpad = gst_check_setup_sink_pad (rtxsend, &sinktemplate);
fail_unless_equals_int (gst_pad_set_active (sinkpad, TRUE), TRUE);
caps = gst_caps_from_string ("application/x-rtp, "
"media = (string)video, payload = (int)96, "
"clock-rate = (int)90000, encoding-name = (string)RAW");
gst_check_setup_events (srcpad, rtxsend, caps, GST_FORMAT_TIME);
gst_caps_unref (caps);
/* now push all buffers and request retransmission every time for all of them */
node = in_buffers;
for (i = 1; i <= num_buffers; i++) {
GstBuffer *buffer = GST_BUFFER (node->data);
/* verify that the original packets are correct */
res = gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
fail_unless_equals_int (res, TRUE);
fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp), ssrc);
fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp),
fail_unless_equals_int (gst_rtp_buffer_get_seq (&rtp), i);
gst_rtp_buffer_unmap (&rtp);
/* retransmit all the previous ones */
for (j = 1; j < i; j++) {
res = gst_pad_push_event (sinkpad,
create_rtx_event (j, ssrc, payload_type));
fail_unless_equals_int (res, TRUE);
/* push this one, triggering the retransmit in rtxsend's chain() function */
gst_pad_push (srcpad, gst_buffer_ref (buffer));
node = g_list_next (node);
/* verify the result. buffers should be in this order (numbers are seqnums):
* 1, 1rtx, 2, 2rtx, 1rtx, 3, ... , 9, 9rtx, 8rtx, 7rtx, 6rtx, 5rtx, 10 */
GstRTPBuffer orig_rtp = GST_RTP_BUFFER_INIT;
gint expected_rtx_requests, expected_rtx_packets;
gint real_rtx_requests, real_rtx_packets;
/* verify statistics first */
expected_rtx_packets = half_buffers * half_buffers +
((half_buffers - 1) / 2.0f) * half_buffers;
for (i = 1, expected_rtx_requests = 0; i < num_buffers; i++)
expected_rtx_requests += i;
g_object_get (rtxsend, "num-rtx-requests", &real_rtx_requests,
"num-rtx-packets", &real_rtx_packets, NULL);
fail_unless_equals_int (expected_rtx_requests, real_rtx_requests);
fail_unless_equals_int (expected_rtx_packets, real_rtx_packets);
/* and the number of actual buffers that we were pushed out of rtxsend */
fail_unless_equals_int (g_list_length (buffers),
num_buffers + expected_rtx_packets);
node = buffers;
for (i = 1; i <= num_buffers; i++) {
/* verify the normal rtp flow packet */
res = gst_rtp_buffer_map (GST_BUFFER (node->data), GST_MAP_READ, &rtp);
fail_unless_equals_int (res, TRUE);
fail_unless_equals_int (gst_rtp_buffer_get_ssrc (&rtp), ssrc);
fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp),
fail_unless_equals_int (gst_rtp_buffer_get_seq (&rtp), i);
gst_rtp_buffer_unmap (&rtp);
node = g_list_next (node);
/* there are no rtx packets after the last normal one */
if (i == num_buffers)
/* now verify the retransmission packets */
for (j = i; j > MAX (i - half_buffers, 0); j--) {
GST_INFO ("checking %d, %d", i, j);
res = gst_rtp_buffer_map (GST_BUFFER (node->data), GST_MAP_READ, &rtp);
fail_unless_equals_int (res, TRUE);
fail_if (gst_rtp_buffer_get_ssrc (&rtp) == ssrc);
fail_unless_equals_int (gst_rtp_buffer_get_payload_type (&rtp),
fail_unless_equals_int (GST_READ_UINT16_BE (gst_rtp_buffer_get_payload (&rtp)), j); /* j == rtx seqnum */
/* open the original packet for this rtx packet and verify timestamps */
res = gst_rtp_buffer_map (GST_BUFFER (g_list_nth_data (in_buffers, j)),
GST_MAP_READ, &orig_rtp);
fail_unless_equals_int (res, TRUE);
fail_unless_equals_int (gst_rtp_buffer_get_timestamp (&orig_rtp),
gst_rtp_buffer_get_timestamp (&rtp));
gst_rtp_buffer_unmap (&orig_rtp);
gst_rtp_buffer_unmap (&rtp);
node = g_list_next (node);
g_list_free_full (in_buffers, (GDestroyNotify) gst_buffer_unref);
gst_check_drop_buffers ();
gst_check_teardown_src_pad (rtxsend);
gst_check_teardown_sink_pad (rtxsend);
gst_check_teardown_element (rtxsend);
static Suite *
rtprtx_suite (void)
Suite *s = suite_create ("rtprtx");
TCase *tc_chain = tcase_create ("general");
tcase_set_timeout (tc_chain, 10000);
tcase_set_timeout (tc_chain, 10);
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_push_forward_seq);
tcase_add_test (tc_chain, test_drop_one_sender);
tcase_add_test (tc_chain, test_drop_multiple_sender);
tcase_add_test (tc_chain, test_rtxsender_packet_retention);
return s;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment