From c596bdda38305f3e58a315d30c82d2549bef59aa Mon Sep 17 00:00:00 2001 From: Nicolas Dufresne Date: Tue, 27 Nov 2018 17:10:57 -0500 Subject: [PATCH 1/3] test: rtpssrcdemux: Test event forwarding This the first unit test of this element. It adds a test that verify that events are forwarded correctly. --- tests/check/Makefile.am | 1 + tests/check/elements/rtpssrcdemux.c | 189 ++++++++++++++++++++++++++++ tests/check/meson.build | 1 + 3 files changed, 191 insertions(+) create mode 100644 tests/check/elements/rtpssrcdemux.c diff --git a/tests/check/Makefile.am b/tests/check/Makefile.am index 5d9e74b7f..87b3ca2b6 100644 --- a/tests/check/Makefile.am +++ b/tests/check/Makefile.am @@ -256,6 +256,7 @@ check_rtpmanager = \ elements/rtpred \ elements/rtpulpfec \ elements/rtpfunnel + elements/rtpssrcdemux \ else check_rtpmanager = endif diff --git a/tests/check/elements/rtpssrcdemux.c b/tests/check/elements/rtpssrcdemux.c new file mode 100644 index 000000000..a8b093b1c --- /dev/null +++ b/tests/check/elements/rtpssrcdemux.c @@ -0,0 +1,189 @@ +/* GStreamer + * + * Copyright (C) 2018 Collabora Ltd. + * Author: Nicolas Dufresne + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +#include +#include +#include + +#define TEST_BUF_CLOCK_RATE 8000 +#define TEST_BUF_PT 0 +#define TEST_BUF_SSRC 0x01BADBAD +#define TEST_BUF_MS 20 +#define TEST_BUF_DURATION (TEST_BUF_MS * GST_MSECOND) +#define TEST_BUF_SIZE (64000 * TEST_BUF_MS / 1000) +#define TEST_RTP_TS_DURATION (TEST_BUF_CLOCK_RATE * TEST_BUF_MS / 1000) + +static GstCaps * +generate_caps (void) +{ + return gst_caps_new_simple ("application/x-rtp", + "media", G_TYPE_STRING, "audio", + "clock-rate", G_TYPE_INT, TEST_BUF_CLOCK_RATE, NULL); +} + +static GstBuffer * +create_buffer (guint seq_num, guint32 ssrc) +{ + GstBuffer *buf; + guint8 *payload; + guint i; + GstClockTime dts = seq_num * TEST_BUF_DURATION; + guint32 rtp_ts = seq_num * TEST_RTP_TS_DURATION; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; + + buf = gst_rtp_buffer_new_allocate (TEST_BUF_SIZE, 0, 0); + GST_BUFFER_DTS (buf) = dts; + + gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp); + gst_rtp_buffer_set_payload_type (&rtp, TEST_BUF_PT); + gst_rtp_buffer_set_seq (&rtp, seq_num); + gst_rtp_buffer_set_timestamp (&rtp, rtp_ts); + gst_rtp_buffer_set_ssrc (&rtp, ssrc); + + payload = gst_rtp_buffer_get_payload (&rtp); + for (i = 0; i < TEST_BUF_SIZE; i++) + payload[i] = 0xff; + + gst_rtp_buffer_unmap (&rtp); + + return buf; +} + +typedef struct +{ + GstHarness *rtp_sink; + GstHarness *rtcp_sink; + GstHarness *rtp_src; + GstHarness *rtcp_src; +} TestContext; + +static void +rtpssrcdemux_pad_added (G_GNUC_UNUSED GstElement * demux, GstPad * src_pad, + TestContext * ctx) +{ + GstHarness *h; + + h = gst_harness_new_with_element (ctx->rtp_sink->element, NULL, + GST_PAD_NAME (src_pad)); + + /* FIXME We should also check that pads have current caps, but this is not + * currently the case as both pads are created when the first pad receive a + * buffer. If the other pad is not linked, you'll get a pad without caps. + * Changing this implies not having both pads on 'on-new-ssrc' which would + * break rtpbin assumption. */ + + if (g_str_has_prefix (GST_PAD_NAME (src_pad), "src_")) { + g_assert (ctx->rtp_src == NULL); + ctx->rtp_src = h; + } else if (g_str_has_prefix (GST_PAD_NAME (src_pad), "rtcp_src_")) { + g_assert (ctx->rtcp_src == NULL); + ctx->rtcp_src = h; + } else { + g_assert_not_reached (); + } +} + +GST_START_TEST (test_event_forwarding) +{ + TestContext ctx = { NULL, }; + GstHarness *h; + GstEvent *event; + GstCaps *caps; + GstStructure *s; + guint ssrc; + + ctx.rtp_sink = h = gst_harness_new_with_padnames ("rtpssrcdemux", "sink", + NULL); + g_signal_connect (h->element, "pad_added", + G_CALLBACK (rtpssrcdemux_pad_added), &ctx); + + ctx.rtcp_sink = gst_harness_new_with_element (h->element, "rtcp_sink", NULL); + + gst_harness_set_src_caps (h, generate_caps ()); + gst_harness_push (h, create_buffer (0, TEST_BUF_SSRC)); + + g_assert (ctx.rtp_src); + g_assert (ctx.rtcp_src); + + gst_harness_push_event (h, gst_event_new_eos ()); + + /* We expect stream-start/caps/segment/eos */ + g_assert_cmpint (gst_harness_events_in_queue (ctx.rtp_src), ==, 4); + + event = gst_harness_pull_event (ctx.rtp_src); + g_assert_cmpint (event->type, ==, GST_EVENT_STREAM_START); + gst_event_unref (event); + + event = gst_harness_pull_event (ctx.rtp_src); + g_assert_cmpint (event->type, ==, GST_EVENT_CAPS); + gst_event_parse_caps (event, &caps); + s = gst_caps_get_structure (caps, 0); + g_assert (gst_structure_has_field (s, "ssrc")); + g_assert (gst_structure_get_uint (s, "ssrc", &ssrc)); + g_assert_cmpuint (ssrc, ==, TEST_BUF_SSRC); + gst_event_unref (event); + + event = gst_harness_pull_event (ctx.rtp_src); + g_assert_cmpint (event->type, ==, GST_EVENT_SEGMENT); + gst_event_unref (event); + + event = gst_harness_pull_event (ctx.rtp_src); + g_assert_cmpint (event->type, ==, GST_EVENT_EOS); + gst_event_unref (event); + + /* We pushed on the RTP pad, no events should have reached the RTCP pad */ + g_assert_cmpint (gst_harness_events_in_queue (ctx.rtcp_src), ==, 0); + + /* push EOS on the rtcp sink pad, to make sure it EOS properly, the harness + * will create the missing stream-start */ + gst_harness_push_event (ctx.rtcp_sink, gst_event_new_eos ()); + + g_assert_cmpint (gst_harness_events_in_queue (ctx.rtp_src), ==, 0); + g_assert_cmpint (gst_harness_events_in_queue (ctx.rtcp_src), ==, 2); + + event = gst_harness_pull_event (ctx.rtcp_src); + g_assert_cmpint (event->type, ==, GST_EVENT_STREAM_START); + gst_event_unref (event); + + event = gst_harness_pull_event (ctx.rtcp_src); + g_assert_cmpint (event->type, ==, GST_EVENT_EOS); + gst_event_unref (event); + + gst_harness_teardown (ctx.rtp_src); + gst_harness_teardown (ctx.rtcp_src); + gst_harness_teardown (ctx.rtcp_sink); + gst_harness_teardown (ctx.rtp_sink); +} + +GST_END_TEST; + +static Suite * +rtpssrcdemux_suite (void) +{ + Suite *s = suite_create ("rtpssrcdemux"); + TCase *tc_chain = tcase_create ("general"); + + suite_add_tcase (s, tc_chain); + tcase_add_test (tc_chain, test_event_forwarding); + + return s; +} + +GST_CHECK_MAIN (rtpssrcdemux); diff --git a/tests/check/meson.build b/tests/check/meson.build index 4b7f3f952..7ba0cb289 100644 --- a/tests/check/meson.build +++ b/tests/check/meson.build @@ -83,6 +83,7 @@ good_tests = [ [ 'elements/rtpstorage' ], [ 'elements/rtpred' ], [ 'elements/rtpulpfec' ], + [ 'elements/rtpssrcdemux' ], [ 'elements/souphttpsrc', not libsoup_dep.found(), [libsoup_dep] ], [ 'elements/spectrum' ], [ 'elements/shapewipe' ], -- GitLab From 40daf6322d07bed9c7bf30c89328d9f444257a97 Mon Sep 17 00:00:00 2001 From: Nicolas Dufresne Date: Wed, 28 Nov 2018 17:14:11 -0500 Subject: [PATCH 2/3] rtpssrcdemux: Hold on internal stream lock while pushing sticky This reverts "6f3734c305 rtpssrcdemux: Only forward stick events while holding the sinkpad stream lock" and actually hold on the internal stream lock. This prevents in some needed case having a second streaming thread poping in and messing up event ordering. --- gst/rtpmanager/gstrtpssrcdemux.c | 45 +++++--------------------------- 1 file changed, 6 insertions(+), 39 deletions(-) diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index dfccbefdc..9cd33f80b 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -142,9 +142,6 @@ struct _GstRtpSsrcDemuxPad GstPad *rtp_pad; GstCaps *caps; GstPad *rtcp_pad; - - gboolean pushed_initial_rtp_events; - gboolean pushed_initial_rtcp_events; }; /* find a src pad for a given SSRC, returns NULL if the SSRC was not found @@ -197,6 +194,7 @@ struct ForwardStickyEventData guint32 ssrc; }; +/* With internal stream lock held */ static gboolean forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data) { @@ -210,6 +208,7 @@ forward_sticky_events (GstPad * pad, GstEvent ** event, gpointer user_data) return TRUE; } +/* With internal stream lock held */ static void forward_initial_events (GstRtpSsrcDemux * demux, guint32 ssrc, GstPad * pad, PadType padtype) @@ -240,28 +239,17 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, gchar *padname; GstRtpSsrcDemuxPad *demuxpad; GstPad *retpad; - gulong rtp_block, rtcp_block; GST_PAD_LOCK (demux); demuxpad = find_demux_pad_for_ssrc (demux, ssrc); if (demuxpad != NULL) { - gboolean forward = FALSE; - switch (padtype) { case RTP_PAD: retpad = gst_object_ref (demuxpad->rtp_pad); - if (!demuxpad->pushed_initial_rtp_events) { - forward = TRUE; - demuxpad->pushed_initial_rtp_events = TRUE; - } break; case RTCP_PAD: retpad = gst_object_ref (demuxpad->rtcp_pad); - if (!demuxpad->pushed_initial_rtcp_events) { - forward = TRUE; - demuxpad->pushed_initial_rtcp_events = TRUE; - } break; default: retpad = NULL; @@ -270,8 +258,6 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, GST_PAD_UNLOCK (demux); - if (forward) - forward_initial_events (demux, ssrc, retpad, padtype); return retpad; } @@ -312,15 +298,8 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, gst_pad_use_fixed_caps (rtcp_pad); gst_pad_set_active (rtcp_pad, TRUE); - if (padtype == RTP_PAD) { - demuxpad->pushed_initial_rtp_events = TRUE; - forward_initial_events (demux, ssrc, rtp_pad, padtype); - } else if (padtype == RTCP_PAD) { - demuxpad->pushed_initial_rtcp_events = TRUE; - forward_initial_events (demux, ssrc, rtcp_pad, padtype); - } else { - g_assert_not_reached (); - } + forward_initial_events (demux, ssrc, rtp_pad, RTP_PAD); + forward_initial_events (demux, ssrc, rtcp_pad, RTCP_PAD); gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad); gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad); @@ -340,18 +319,10 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, gst_object_ref (rtp_pad); gst_object_ref (rtcp_pad); - rtp_block = gst_pad_add_probe (rtp_pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, - NULL, NULL, NULL); - rtcp_block = gst_pad_add_probe (rtcp_pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, - NULL, NULL, NULL); - - GST_PAD_UNLOCK (demux); - g_signal_emit (G_OBJECT (demux), gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad); - gst_pad_remove_probe (rtp_pad, rtp_block); - gst_pad_remove_probe (rtcp_pad, rtcp_block); + GST_PAD_UNLOCK (demux); gst_object_unref (rtp_pad); gst_object_unref (rtcp_pad); @@ -565,11 +536,7 @@ forward_event (GstPad * pad, gpointer user_data) for (walk = fdata->demux->srcpads; walk; walk = walk->next) { GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data; - /* Only forward the event if the initial events have been through first, - * the initial events should be forwarded before any other event - * or buffer is pushed */ - if ((pad == dpad->rtp_pad && dpad->pushed_initial_rtp_events) || - (pad == dpad->rtcp_pad && dpad->pushed_initial_rtcp_events)) { + if (pad == dpad->rtp_pad || pad == dpad->rtcp_pad) { newevent = add_ssrc_and_ref (fdata->event, dpad->ssrc); break; } -- GitLab From d637567ab3f27e7e1ae8f278365b4fde6fd4f5db Mon Sep 17 00:00:00 2001 From: Nicolas Dufresne Date: Thu, 29 Nov 2018 14:54:06 -0500 Subject: [PATCH 3/3] rtpssrcdemux: Rename confusingly name lock macros This is an extra internal recurisve lock use to avoid having to take both sink pad streams lock all the time. This patch renamed it INTERLNAL_STREAM_LOCK/UNLOCK() to avoid confusion with possible upstream GST_PAD API. --- gst/rtpmanager/gstrtpssrcdemux.c | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/gst/rtpmanager/gstrtpssrcdemux.c b/gst/rtpmanager/gstrtpssrcdemux.c index 9cd33f80b..3bcd07be0 100644 --- a/gst/rtpmanager/gstrtpssrcdemux.c +++ b/gst/rtpmanager/gstrtpssrcdemux.c @@ -80,8 +80,8 @@ GST_STATIC_PAD_TEMPLATE ("rtcp_src_%u", GST_STATIC_CAPS ("application/x-rtcp") ); -#define GST_PAD_LOCK(obj) (g_rec_mutex_lock (&(obj)->padlock)) -#define GST_PAD_UNLOCK(obj) (g_rec_mutex_unlock (&(obj)->padlock)) +#define INTERNAL_STREAM_LOCK(obj) (g_rec_mutex_lock (&(obj)->padlock)) +#define INTERNAL_STREAM_UNLOCK(obj) (g_rec_mutex_unlock (&(obj)->padlock)) typedef enum { @@ -240,7 +240,7 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, GstRtpSsrcDemuxPad *demuxpad; GstPad *retpad; - GST_PAD_LOCK (demux); + INTERNAL_STREAM_LOCK (demux); demuxpad = find_demux_pad_for_ssrc (demux, ssrc); if (demuxpad != NULL) { @@ -256,7 +256,7 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, g_assert_not_reached (); } - GST_PAD_UNLOCK (demux); + INTERNAL_STREAM_UNLOCK (demux); return retpad; } @@ -322,7 +322,7 @@ find_or_create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc, g_signal_emit (G_OBJECT (demux), gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad); - GST_PAD_UNLOCK (demux); + INTERNAL_STREAM_UNLOCK (demux); gst_object_unref (rtp_pad); gst_object_unref (rtcp_pad); @@ -483,17 +483,17 @@ gst_rtp_ssrc_demux_clear_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) { GstRtpSsrcDemuxPad *dpad; - GST_PAD_LOCK (demux); + INTERNAL_STREAM_LOCK (demux); dpad = find_demux_pad_for_ssrc (demux, ssrc); if (dpad == NULL) { - GST_PAD_UNLOCK (demux); + INTERNAL_STREAM_UNLOCK (demux); goto unknown_pad; } GST_DEBUG_OBJECT (demux, "clearing pad for SSRC %08x", ssrc); demux->srcpads = g_slist_remove (demux->srcpads, dpad); - GST_PAD_UNLOCK (demux); + INTERNAL_STREAM_UNLOCK (demux); gst_pad_set_active (dpad->rtp_pad, FALSE); gst_pad_set_active (dpad->rtcp_pad, FALSE); @@ -532,7 +532,7 @@ forward_event (GstPad * pad, gpointer user_data) GSList *walk = NULL; GstEvent *newevent = NULL; - GST_PAD_LOCK (fdata->demux); + INTERNAL_STREAM_LOCK (fdata->demux); for (walk = fdata->demux->srcpads; walk; walk = walk->next) { GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) walk->data; @@ -541,7 +541,7 @@ forward_event (GstPad * pad, gpointer user_data) break; } } - GST_PAD_UNLOCK (fdata->demux); + INTERNAL_STREAM_UNLOCK (fdata->demux); if (newevent) fdata->res &= gst_pad_push_event (pad, newevent); @@ -600,13 +600,13 @@ gst_rtp_ssrc_demux_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) if (ret != GST_FLOW_OK) { /* check if the ssrc still there, may have been removed */ - GST_PAD_LOCK (demux); + INTERNAL_STREAM_LOCK (demux); dpad = find_demux_pad_for_ssrc (demux, ssrc); if (dpad == NULL || dpad->rtp_pad != srcpad) { /* SSRC was removed during the push ... ignore the error */ ret = GST_FLOW_OK; } - GST_PAD_UNLOCK (demux); + INTERNAL_STREAM_UNLOCK (demux); } gst_object_unref (srcpad); @@ -687,13 +687,13 @@ gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstObject * parent, if (ret != GST_FLOW_OK) { /* check if the ssrc still there, may have been removed */ - GST_PAD_LOCK (demux); + INTERNAL_STREAM_LOCK (demux); dpad = find_demux_pad_for_ssrc (demux, ssrc); if (dpad == NULL || dpad->rtcp_pad != srcpad) { /* SSRC was removed during the push ... ignore the error */ ret = GST_FLOW_OK; } - GST_PAD_UNLOCK (demux); + INTERNAL_STREAM_UNLOCK (demux); } gst_object_unref (srcpad); @@ -783,7 +783,7 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent) demux = GST_RTP_SSRC_DEMUX (parent); - GST_PAD_LOCK (demux); + INTERNAL_STREAM_LOCK (demux); for (current = demux->srcpads; current; current = g_slist_next (current)) { GstRtpSsrcDemuxPad *dpad = (GstRtpSsrcDemuxPad *) current->data; @@ -804,7 +804,7 @@ gst_rtp_ssrc_demux_iterate_internal_links_src (GstPad * pad, GstObject * parent) g_value_unset (&val); } - GST_PAD_UNLOCK (demux); + INTERNAL_STREAM_UNLOCK (demux); return it; } -- GitLab