diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 716fbeb5c3f1e6d8825b51dc599ef79fc7eee59f..9ec2ca252c7619c952146df2ab9a62d8d8b3de39 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -83,6 +83,74 @@ }, "rank": "none" }, + "clocksync": { + "author": "Jan Schmidt ", + "description": "Synchronise buffers to the clock", + "hierarchy": [ + "GstClockSync", + "GstElement", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "klass": "Generic", + "long-name": "ClockSync", + "pad-templates": { + "sink": { + "caps": "ANY", + "direction": "sink", + "presence": "always" + }, + "src": { + "caps": "ANY", + "direction": "src", + "presence": "always" + } + }, + "properties": { + "signal-handoffs": { + "blurb": "Send a signal before pushing the buffer", + "construct": false, + "construct-only": false, + "default": "false", + "type-name": "gboolean", + "writable": true + }, + "sync": { + "blurb": "Synchronize to pipeline clock", + "construct": false, + "construct-only": false, + "default": "true", + "type-name": "gboolean", + "writable": true + }, + "ts-offset": { + "blurb": "Timestamp offset in nanoseconds for synchronisation, negative for earlier sync", + "construct": false, + "construct-only": false, + "default": "0", + "max": "9223372036854775807", + "min": "-9223372036854775808", + "type-name": "gint64", + "writable": true + } + }, + "rank": "none", + "signals": { + "handoff": { + "args": [ + "GstBuffer" + ], + "retval": "void" + }, + "handoff-list": { + "args": [ + "GstBufferList" + ], + "retval": "void" + } + } + }, "concat": { "author": "Sebastian Dr\u00f6ge ", "description": "Concatenate multiple streams", diff --git a/plugins/elements/gstclocksync.c b/plugins/elements/gstclocksync.c new file mode 100644 index 0000000000000000000000000000000000000000..fb5efbe435b4be83c5db173da6ad8acba063e79d --- /dev/null +++ b/plugins/elements/gstclocksync.c @@ -0,0 +1,583 @@ +/* + * GStreamer + * Copyright (C) 1999,2000 Erik Walthinsen + * 2000 Wim Taymans + * 2005 Wim Taymans + * Copyright (C) 2020 Jan Schmidt + * + * gstclocksync.c: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +/** + * SECTION:element-clocksync + * @title: clocksync + * + * Simple element that passes all buffers and buffer-lists intact, but + * synchronising them to the clock before passing. + * + * Synchronisation to the clock is on by default, but can be turned + * off by setting the 'sync' property to FALSE + * + * + * Example launch line + * |[ + * gst-launch -v -m videotestsrc ! clocksync ! fakesink silent=TRUE + * ]| + * + */ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include + +#include "gstclocksync.h" + +GST_DEBUG_CATEGORY_STATIC (gst_clock_sync_debug); +#define GST_CAT_DEFAULT gst_clock_sync_debug + +/* ClockSync signals and args */ +enum +{ + SIGNAL_HANDOFF, + SIGNAL_HANDOFF_LIST, + /* FILL ME */ + LAST_SIGNAL +}; + +#define DEFAULT_SYNC TRUE +#define DEFAULT_TS_OFFSET 0 +#define DEFAULT_SIGNAL_HANDOFFS FALSE + +enum +{ + PROP_0, + PROP_SYNC, + PROP_TS_OFFSET, + PROP_SIGNAL_HANDOFFS, +}; + +static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("ANY") + ); + +static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS ("ANY") + ); + +#define _do_init \ + GST_DEBUG_CATEGORY_INIT (gst_clock_sync_debug, "clocksync", 0, "clocksync element"); +#define gst_clock_sync_parent_class parent_class +G_DEFINE_TYPE_WITH_CODE (GstClockSync, gst_clock_sync, GST_TYPE_ELEMENT, + _do_init); + +static void gst_clock_sync_finalize (GObject * object); + +static void gst_clock_sync_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_clock_sync_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + +static gboolean gst_clock_sync_sink_event (GstPad * pad, GstObject * parent, + GstEvent * event); +static GstFlowReturn gst_clock_sync_chain (GstPad * pad, GstObject * parent, + GstBuffer * buf); +static GstFlowReturn gst_clock_sync_chain_list (GstPad * pad, + GstObject * parent, GstBufferList * buflist); +static gboolean gst_clock_sync_src_query (GstPad * pad, GstObject * parent, + GstQuery * query); +static GstStateChangeReturn gst_clocksync_change_state (GstElement * element, + GstStateChange transition); + +static guint gst_clocksync_signals[LAST_SIGNAL] = { 0 }; + +static void +gst_clock_sync_class_init (GstClockSyncClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + + gobject_class->set_property = gst_clock_sync_set_property; + gobject_class->get_property = gst_clock_sync_get_property; + gobject_class->finalize = gst_clock_sync_finalize; + + g_object_class_install_property (gobject_class, PROP_SYNC, + g_param_spec_boolean ("sync", "Synchronize", + "Synchronize to pipeline clock", DEFAULT_SYNC, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_TS_OFFSET, + g_param_spec_int64 ("ts-offset", "Timestamp offset for synchronisation", + "Timestamp offset in nanoseconds for synchronisation, negative for earlier sync", + G_MININT64, G_MAXINT64, DEFAULT_TS_OFFSET, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstClockSync::signal-handoffs + * + * If set to %TRUE, the clocksync will emit a handoff signal when handling a buffer. + * When set to %FALSE, no signal will be emitted, which might improve performance. + */ + g_object_class_install_property (gobject_class, PROP_SIGNAL_HANDOFFS, + g_param_spec_boolean ("signal-handoffs", + "Signal handoffs", "Send a signal before pushing the buffer", + DEFAULT_SIGNAL_HANDOFFS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + /** + * GstClockSync::handoff: + * @clocksync: the clocksync instance + * @buffer: the buffer that just has been received + * @pad: the pad that received it + * + * This signal gets emitted before passing the buffer downstream. + */ + gst_clocksync_signals[SIGNAL_HANDOFF] = + g_signal_new ("handoff", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, + G_STRUCT_OFFSET (GstClockSyncClass, handoff), NULL, NULL, + NULL, G_TYPE_NONE, 1, GST_TYPE_BUFFER | G_SIGNAL_TYPE_STATIC_SCOPE); + + /** + * GstClockSync::handoff-list: + * @clocksync: the clocksync instance + * @buffer_list: the buffer list that just has been received + * @pad: the pad that received it + * + * This signal gets emitted before passing the buffer list downstream. + */ + gst_clocksync_signals[SIGNAL_HANDOFF_LIST] = + g_signal_new ("handoff-list", G_TYPE_FROM_CLASS (klass), + G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstClockSyncClass, handoff_list), + NULL, NULL, NULL, G_TYPE_NONE, 1, + GST_TYPE_BUFFER_LIST | G_SIGNAL_TYPE_STATIC_SCOPE); + + gstelement_class->change_state = + GST_DEBUG_FUNCPTR (gst_clocksync_change_state); + + gst_element_class_set_static_metadata (gstelement_class, + "ClockSync", + "Generic", + "Synchronise buffers to the clock", "Jan Schmidt "); + + gst_element_class_add_static_pad_template (gstelement_class, &srctemplate); + gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate); +} + +static void +gst_clock_sync_finalize (GObject * object) +{ + GstClockSync *clocksync = GST_CLOCKSYNC (object); + + g_cond_clear (&clocksync->blocked_cond); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static void +gst_clock_sync_init (GstClockSync * clocksync) +{ + clocksync->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink"); + gst_pad_set_event_function (clocksync->sinkpad, + GST_DEBUG_FUNCPTR (gst_clock_sync_sink_event)); + gst_pad_set_chain_function (clocksync->sinkpad, + GST_DEBUG_FUNCPTR (gst_clock_sync_chain)); + gst_pad_set_chain_list_function (clocksync->sinkpad, + GST_DEBUG_FUNCPTR (gst_clock_sync_chain_list)); + GST_PAD_SET_PROXY_CAPS (clocksync->sinkpad); + gst_element_add_pad (GST_ELEMENT (clocksync), clocksync->sinkpad); + + clocksync->srcpad = gst_pad_new_from_static_template (&srctemplate, "src"); + + gst_pad_set_query_function (clocksync->srcpad, gst_clock_sync_src_query); + + GST_PAD_SET_PROXY_CAPS (clocksync->srcpad); + gst_element_add_pad (GST_ELEMENT (clocksync), clocksync->srcpad); + + clocksync->signal_handoffs = DEFAULT_SIGNAL_HANDOFFS; + clocksync->ts_offset = DEFAULT_TS_OFFSET; + clocksync->sync = DEFAULT_SYNC; + g_cond_init (&clocksync->blocked_cond); +} + +static void +gst_clock_sync_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstClockSync *clocksync = GST_CLOCKSYNC (object); + + switch (prop_id) { + case PROP_SYNC: + clocksync->sync = g_value_get_boolean (value); + break; + case PROP_TS_OFFSET: + clocksync->ts_offset = g_value_get_int64 (value); + break; + case PROP_SIGNAL_HANDOFFS: + clocksync->signal_handoffs = g_value_get_boolean (value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static void +gst_clock_sync_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstClockSync *clocksync = GST_CLOCKSYNC (object); + + switch (prop_id) { + case PROP_SYNC: + g_value_set_boolean (value, clocksync->sync); + break; + case PROP_TS_OFFSET: + g_value_set_int64 (value, clocksync->ts_offset); + break; + case PROP_SIGNAL_HANDOFFS: + g_value_set_boolean (value, clocksync->signal_handoffs); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +static GstFlowReturn +gst_clocksync_do_sync (GstClockSync * clocksync, GstClockTime running_time) +{ + GstFlowReturn ret = GST_FLOW_OK; + GstClock *clock; + + if (!clocksync->sync) + return GST_FLOW_OK; + + if (running_time == GST_CLOCK_TIME_NONE) + return GST_FLOW_OK; /* Can't sync on an invalid time either way */ + + if (clocksync->segment.format != GST_FORMAT_TIME) + return GST_FLOW_OK; + + GST_OBJECT_LOCK (clocksync); + + if (clocksync->flushing) { + GST_OBJECT_UNLOCK (clocksync); + return GST_FLOW_FLUSHING; + } + + while (clocksync->blocked && !clocksync->flushing) + g_cond_wait (&clocksync->blocked_cond, GST_OBJECT_GET_LOCK (clocksync)); + + if (clocksync->flushing) { + GST_OBJECT_UNLOCK (clocksync); + return GST_FLOW_FLUSHING; + } + + if ((clock = GST_ELEMENT (clocksync)->clock)) { + GstClockReturn cret; + GstClockTime timestamp; + GstClockTimeDiff ts_offset = clocksync->ts_offset; + + timestamp = running_time + GST_ELEMENT (clocksync)->base_time + + clocksync->upstream_latency; + if (ts_offset < 0) { + ts_offset = -ts_offset; + if (ts_offset < timestamp) + timestamp -= ts_offset; + else + timestamp = 0; + } else + timestamp += ts_offset; + + /* save id if we need to unlock */ + clocksync->clock_id = gst_clock_new_single_shot_id (clock, timestamp); + GST_OBJECT_UNLOCK (clocksync); + + cret = gst_clock_id_wait (clocksync->clock_id, NULL); + + GST_OBJECT_LOCK (clocksync); + if (clocksync->clock_id) { + gst_clock_id_unref (clocksync->clock_id); + clocksync->clock_id = NULL; + } + if (cret == GST_CLOCK_UNSCHEDULED || clocksync->flushing) + ret = GST_FLOW_FLUSHING; + } + GST_OBJECT_UNLOCK (clocksync); + + return ret; +} + +static gboolean +gst_clock_sync_sink_event (GstPad * pad, GstObject * parent, GstEvent * event) +{ + GstClockSync *clocksync = GST_CLOCKSYNC (parent); + gboolean ret; + + GST_LOG_OBJECT (clocksync, "Received %s event: %" GST_PTR_FORMAT, + GST_EVENT_TYPE_NAME (event), event); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_SEGMENT: + /* store the event for synching */ + gst_event_copy_segment (event, &clocksync->segment); + break; + case GST_EVENT_GAP: + { + GstClockTime start, dur; + + if (clocksync->segment.format != GST_FORMAT_TIME) + break; + + gst_event_parse_gap (event, &start, &dur); + if (GST_CLOCK_TIME_IS_VALID (start)) { + start = gst_segment_to_running_time (&clocksync->segment, + GST_FORMAT_TIME, start); + + gst_clocksync_do_sync (clocksync, start); + } + break; + } + case GST_EVENT_FLUSH_START: + GST_OBJECT_LOCK (clocksync); + clocksync->flushing = TRUE; + g_cond_broadcast (&clocksync->blocked_cond); + if (clocksync->clock_id) { + GST_DEBUG_OBJECT (clocksync, "unlock clock wait"); + gst_clock_id_unschedule (clocksync->clock_id); + } + GST_OBJECT_UNLOCK (clocksync); + break; + case GST_EVENT_FLUSH_STOP: + GST_OBJECT_LOCK (clocksync); + clocksync->flushing = FALSE; + gst_segment_init (&clocksync->segment, GST_FORMAT_UNDEFINED); + GST_OBJECT_UNLOCK (clocksync); + default: + break; + } + + /* Always handle all events as normal: */ + ret = gst_pad_event_default (pad, parent, event); + return ret; +} + +static GstFlowReturn +gst_clock_sync_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) +{ + GstClockSync *clocksync = GST_CLOCKSYNC (parent); + GstFlowReturn ret = GST_FLOW_OK; + + GST_LOG_OBJECT (clocksync, "Handling buffer %" GST_PTR_FORMAT, buf); + + if (clocksync->segment.format == GST_FORMAT_TIME) { + GstClockTime runtimestamp = 0; + GstClockTime rundts, runpts; + + rundts = gst_segment_to_running_time (&clocksync->segment, + GST_FORMAT_TIME, GST_BUFFER_DTS (buf)); + runpts = gst_segment_to_running_time (&clocksync->segment, + GST_FORMAT_TIME, GST_BUFFER_PTS (buf)); + + if (GST_CLOCK_TIME_IS_VALID (rundts)) + runtimestamp = rundts; + else if (GST_CLOCK_TIME_IS_VALID (runpts)) + runtimestamp = runpts; + + ret = gst_clocksync_do_sync (clocksync, runtimestamp); + if (ret != GST_FLOW_OK) { + GST_LOG_OBJECT (clocksync, + "Interrupted while waiting on the clock. Dropping buffer."); + gst_buffer_unref (buf); + return ret; + } + } + + if (clocksync->signal_handoffs) + g_signal_emit (clocksync, gst_clocksync_signals[SIGNAL_HANDOFF], 0, buf); + + /* Forward the buffer */ + return gst_pad_push (clocksync->srcpad, buf); +} + +static GstFlowReturn +gst_clock_sync_chain_list (GstPad * pad, GstObject * parent, + GstBufferList * buffer_list) +{ + GstClockSync *clocksync = GST_CLOCKSYNC (parent); + GstFlowReturn ret = GST_FLOW_OK; + GstBuffer *buf; + + GST_LOG_OBJECT (clocksync, "Handling buffer list %" GST_PTR_FORMAT, + buffer_list); + + if (gst_buffer_list_length (buffer_list) == 0) + goto done; + + buf = gst_buffer_list_get (buffer_list, 0); + + if (clocksync->segment.format == GST_FORMAT_TIME) { + GstClockTime runtimestamp = 0; + GstClockTime rundts, runpts; + + rundts = gst_segment_to_running_time (&clocksync->segment, + GST_FORMAT_TIME, GST_BUFFER_DTS (buf)); + runpts = gst_segment_to_running_time (&clocksync->segment, + GST_FORMAT_TIME, GST_BUFFER_PTS (buf)); + + if (GST_CLOCK_TIME_IS_VALID (rundts)) + runtimestamp = rundts; + else if (GST_CLOCK_TIME_IS_VALID (runpts)) + runtimestamp = runpts; + + ret = gst_clocksync_do_sync (clocksync, runtimestamp); + if (ret != GST_FLOW_OK) { + gst_buffer_list_unref (buffer_list); + return ret; + } + } + + if (clocksync->signal_handoffs) + g_signal_emit (clocksync, gst_clocksync_signals[SIGNAL_HANDOFF_LIST], 0, + buffer_list); + + /* Forward the buffer list */ +done: + return gst_pad_push_list (clocksync->srcpad, buffer_list); +} + +static gboolean +gst_clock_sync_src_query (GstPad * pad, GstObject * parent, GstQuery * query) +{ + GstClockSync *clocksync = GST_CLOCKSYNC (parent); + gboolean res; + + res = gst_pad_query_default (pad, parent, query); + + switch (GST_QUERY_TYPE (query)) { + case GST_QUERY_LATENCY:{ + gboolean live = FALSE; + GstClockTime min = 0, max = 0; + + if (res) { + gst_query_parse_latency (query, &live, &min, &max); + + if (clocksync->sync && max < min) { + GST_ELEMENT_WARNING (parent, CORE, CLOCK, (NULL), + ("Impossible to configure latency upstream of clocksync sync=true:" + " max %" GST_TIME_FORMAT " < min %" + GST_TIME_FORMAT ". Add queues or other buffering elements.", + GST_TIME_ARGS (max), GST_TIME_ARGS (min))); + } + } + + /* Ignore the upstream latency if it is not live */ + GST_OBJECT_LOCK (clocksync); + if (live) + clocksync->upstream_latency = min; + else { + clocksync->upstream_latency = 0; + /* if upstream is non-live source, then there is no + * limit on the maximum latency */ + max = -1; + } + + GST_OBJECT_UNLOCK (clocksync); + + GST_DEBUG_OBJECT (clocksync, + "Configured upstream latency = %" GST_TIME_FORMAT, + GST_TIME_ARGS (clocksync->upstream_latency)); + + gst_query_set_latency (query, live || clocksync->sync, min, max); + break; + } + default: + break; + } + + return res; +} + +static GstStateChangeReturn +gst_clocksync_change_state (GstElement * element, GstStateChange transition) +{ + GstClockSync *clocksync = GST_CLOCKSYNC (element); + GstStateChangeReturn ret; + gboolean no_preroll = FALSE; + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + GST_OBJECT_LOCK (clocksync); + clocksync->flushing = FALSE; + clocksync->blocked = TRUE; + GST_OBJECT_UNLOCK (clocksync); + if (clocksync->sync) + no_preroll = TRUE; + break; + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + GST_OBJECT_LOCK (clocksync); + clocksync->blocked = FALSE; + g_cond_broadcast (&clocksync->blocked_cond); + GST_OBJECT_UNLOCK (clocksync); + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: + GST_OBJECT_LOCK (clocksync); + clocksync->flushing = TRUE; + if (clocksync->clock_id) { + GST_DEBUG_OBJECT (clocksync, "unlock clock wait"); + gst_clock_id_unschedule (clocksync->clock_id); + } + clocksync->blocked = FALSE; + g_cond_broadcast (&clocksync->blocked_cond); + GST_OBJECT_UNLOCK (clocksync); + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + + switch (transition) { + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + GST_OBJECT_LOCK (clocksync); + clocksync->upstream_latency = 0; + clocksync->blocked = TRUE; + GST_OBJECT_UNLOCK (clocksync); + if (clocksync->sync) + no_preroll = TRUE; + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: + break; + case GST_STATE_CHANGE_READY_TO_NULL: + break; + default: + break; + } + + if (no_preroll && ret == GST_STATE_CHANGE_SUCCESS) + ret = GST_STATE_CHANGE_NO_PREROLL; + + return ret; +} diff --git a/plugins/elements/gstclocksync.h b/plugins/elements/gstclocksync.h new file mode 100644 index 0000000000000000000000000000000000000000..44ab48e27f9b6e58695f3d940de5266170fce3f3 --- /dev/null +++ b/plugins/elements/gstclocksync.h @@ -0,0 +1,81 @@ +/* GStreamer + * Copyright (C) 1999,2000 Erik Walthinsen + * 2000 Wim Taymans + * 2005 Wim Taymans + * Copyright (C) 2020 Jan Schmidt + * + * gstclocksync.h: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ + +#ifndef __GST_CLOCKSYNC_H__ +#define __GST_CLOCKSYNC_H__ + +#include + +G_BEGIN_DECLS + +/* #defines don't like whitespacey bits */ +#define GST_TYPE_CLOCKSYNC \ + (gst_clock_sync_get_type()) +#define GST_CLOCKSYNC(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_CLOCKSYNC,GstClockSync)) +#define GST_CLOCKSYNC_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_CLOCKSYNC,GstClockSyncClass)) +#define GST_IS_CLOCKSYNC(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_CLOCKSYNC)) +#define GST_IS_CLOCKSYNC_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_CLOCKSYNC)) + +typedef struct _GstClockSync GstClockSync; +typedef struct _GstClockSyncClass GstClockSyncClass; + +struct _GstClockSync +{ + GstElement element; + + /*< private >*/ + GstPad *sinkpad, *srcpad; + + gboolean signal_handoffs; + + GstSegment segment; + GstClockID clock_id; + gboolean flushing; + gboolean sync; + + GCond blocked_cond; + gboolean blocked; + GstClockTimeDiff ts_offset; + + GstClockTime upstream_latency; +}; + +struct _GstClockSyncClass +{ + GstElementClass parent_class; + + /* signals */ + void (*handoff) (GstElement *element, GstBuffer *buf); + void (*handoff_list) (GstElement *element, GstBufferList *buffer_list); +}; + +GType gst_clock_sync_get_type (void); + +G_END_DECLS + +#endif /* __GST_CLOCKSYNC_H__ */ diff --git a/plugins/elements/gstelements.c b/plugins/elements/gstelements.c index f0324392a005e575278ed9e5ed4002d959baa1d8..5ed328c2a5f1b3163dde4d4ef9b2f6b5b876af4e 100644 --- a/plugins/elements/gstelements.c +++ b/plugins/elements/gstelements.c @@ -32,6 +32,7 @@ #include #include "gstcapsfilter.h" +#include "gstclocksync.h" #include "gstconcat.h" #include "gstdataurisrc.h" #include "gstdownloadbuffer.h" @@ -59,6 +60,9 @@ plugin_init (GstPlugin * plugin) if (!gst_element_register (plugin, "capsfilter", GST_RANK_NONE, gst_capsfilter_get_type ())) return FALSE; + if (!gst_element_register (plugin, "clocksync", GST_RANK_NONE, + gst_clock_sync_get_type ())) + return FALSE; if (!gst_element_register (plugin, "concat", GST_RANK_NONE, gst_concat_get_type ())) return FALSE; diff --git a/plugins/elements/meson.build b/plugins/elements/meson.build index bb0bca35b6d4de31661bafb118ce61ee9d3957dc..c1d91d2f3406e40c4bd50ce07d76904b391406b9 100644 --- a/plugins/elements/meson.build +++ b/plugins/elements/meson.build @@ -1,5 +1,6 @@ gst_elements_sources = [ 'gstcapsfilter.c', + 'gstclocksync.c', 'gstconcat.c', 'gstdataurisrc.c', 'gstdownloadbuffer.c', diff --git a/tests/check/elements/clocksync.c b/tests/check/elements/clocksync.c new file mode 100644 index 0000000000000000000000000000000000000000..c9d4fb6347f2169ff6493d91002f17881686ee34 --- /dev/null +++ b/tests/check/elements/clocksync.c @@ -0,0 +1,235 @@ +/* GStreamer + * + * unit test for clocksync + * + * Copyright (C) <2005> Thomas Vander Stichele + * Copyright (C) <2015> Havard Graff + * Copyright (C) <2020> Jan Schmidt + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, + * Boston, MA 02110-1301, USA. + */ +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include +#include + +GST_START_TEST (test_one_buffer) +{ + GstHarness *h = gst_harness_new_parse ("clocksync sync=false"); + GstBuffer *buffer_in; + GstBuffer *buffer_out; + + gst_harness_set_src_caps_str (h, "mycaps"); + + buffer_in = gst_buffer_new_and_alloc (4); + ASSERT_BUFFER_REFCOUNT (buffer_in, "buffer", 1); + + gst_buffer_fill (buffer_in, 0, "data", 4); + + /* pushing gives away my reference ... */ + fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (h, buffer_in)); + + /* ... but it should end up being collected on GstHarness queue */ + fail_unless_equals_int (1, gst_harness_buffers_in_queue (h)); + buffer_out = gst_harness_pull (h); + + fail_unless (buffer_in == buffer_out); + ASSERT_BUFFER_REFCOUNT (buffer_out, "buffer", 1); + + /* cleanup */ + gst_buffer_unref (buffer_out); + gst_harness_teardown (h); +} + +GST_END_TEST; + +static void +handoff_func (GstElement * clocksync, GstBuffer * buf, GstBuffer ** ret) +{ + (void) clocksync; + *ret = buf; +} + +GST_START_TEST (test_signal_handoffs) +{ + GstHarness *h = gst_harness_new_parse ("clocksync sync=false name=c"); + GstBuffer *buffer_in; + GstBuffer *buffer_signaled = NULL; + GstElement *c = gst_bin_get_by_name (GST_BIN (h->element), "c"); + + gst_harness_set_src_caps_str (h, "mycaps"); + + /* connect to the handoff signal */ + g_signal_connect (c, "handoff", G_CALLBACK (handoff_func), &buffer_signaled); + + /* first, turn off signal-handoffs */ + g_object_set (c, "signal-handoffs", FALSE, NULL); + + /* then push a buffer */ + buffer_in = gst_buffer_new_and_alloc (4); + fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (h, buffer_in)); + + /* verify that we got no buffer signaled */ + fail_unless (buffer_signaled == NULL); + + /* now turn on signal-handoffs */ + g_object_set (c, "signal-handoffs", TRUE, NULL); + + /* then push another buffer */ + buffer_in = gst_buffer_new_and_alloc (4); + fail_unless_equals_int (GST_FLOW_OK, gst_harness_push (h, buffer_in)); + + /* verify the buffer signaled is equal to the one pushed in */ + fail_unless (buffer_signaled == buffer_in); + ASSERT_BUFFER_REFCOUNT (buffer_signaled, "buffer", 1); + + /* cleanup */ + gst_object_unref (c); + gst_harness_teardown (h); +} + +GST_END_TEST; + +GST_START_TEST (test_sync_on_timestamp) +{ + /* the reason to use the queue in front of the clocksync element + is to effectively make gst_harness_push asynchronous, not locking + up the test, waiting for gst_clock_id_wait */ + GstHarness *h = gst_harness_new_parse ("queue ! clocksync"); + GstBuffer *buf; + GstClock *clock; + GstClockTime timestamp = 123456789; + + /* use testclock */ + gst_harness_use_testclock (h); + gst_harness_set_src_caps_str (h, "mycaps"); + + /* make a buffer and set the timestamp */ + buf = gst_buffer_new (); + GST_BUFFER_PTS (buf) = timestamp; + + /* push the buffer, and verify it does *not* make it through */ + gst_harness_push (h, buf); + fail_unless_equals_int (0, gst_harness_buffers_in_queue (h)); + + /* verify the clocksync element has registered exactly one GstClockID */ + fail_unless (gst_harness_wait_for_clock_id_waits (h, 1, 42)); + + /* crank the clock and pull the buffer */ + gst_harness_crank_single_clock_wait (h); + buf = gst_harness_pull (h); + + /* verify that the buffer has the right timestamp, and that the time on + the clock is equal to the timestamp */ + fail_unless_equals_int64 (timestamp, GST_BUFFER_PTS (buf)); + clock = gst_element_get_clock (h->element); + fail_unless_equals_int64 (timestamp, gst_clock_get_time (clock)); + + /* cleanup */ + gst_object_unref (clock); + gst_buffer_unref (buf); + gst_harness_teardown (h); +} + +GST_END_TEST; + +GST_START_TEST (test_no_sync_on_timestamp) +{ + GstHarness *h = gst_harness_new_parse ("clocksync sync=false"); + GstBuffer *buf; + GstClockTime timestamp = 123456789; + + /* use testclock */ + gst_harness_use_testclock (h); + gst_harness_set_src_caps_str (h, "mycaps"); + + /* make a buffer and set the timestamp */ + buf = gst_buffer_new (); + GST_BUFFER_PTS (buf) = timestamp; + + /* push the buffer, and verify it was forwarded immediately */ + gst_harness_push (h, buf); + fail_unless_equals_int (1, gst_harness_buffers_in_queue (h)); + + buf = gst_harness_pull (h); + /* verify that the buffer has the right timestamp */ + fail_unless_equals_int64 (timestamp, GST_BUFFER_PTS (buf)); + + /* cleanup */ + gst_buffer_unref (buf); + gst_harness_teardown (h); +} + +GST_END_TEST; + +GST_START_TEST (test_stopping_element_unschedules_sync) +{ + /* the reason to use the queue in front of the clocksync element + is to effectively make gst_harness_push asynchronous, not locking + up the test, waiting for gst_clock_id_wait */ + GstHarness *h = gst_harness_new_parse ("queue ! clocksync sync=true"); + GstBuffer *buf; + GstClockTime timestamp = 123456789; + + /* use testclock */ + gst_harness_use_testclock (h); + gst_harness_set_src_caps_str (h, "mycaps"); + + /* make a buffer and set the timestamp */ + buf = gst_buffer_new (); + GST_BUFFER_PTS (buf) = timestamp; + + /* push the buffer, and verify it does *not* make it through */ + gst_harness_push (h, buf); + fail_unless_equals_int (0, gst_harness_buffers_in_queue (h)); + + /* verify the clocksync element has registered exactly one GstClockID */ + fail_unless (gst_harness_wait_for_clock_id_waits (h, 1, 42)); + + /* setting clocksync to READY should unschedule the sync */ + gst_element_set_state (h->element, GST_STATE_READY); + + /* verify the clocksync element no longer waits on the clock */ + fail_unless (gst_harness_wait_for_clock_id_waits (h, 0, 42)); + + /* and that the waiting buffer was dropped */ + fail_unless_equals_int (0, gst_harness_buffers_received (h)); + + gst_harness_teardown (h); +} + +GST_END_TEST; + +static Suite * +clocksync_suite (void) +{ + Suite *s = suite_create ("clocksync"); + TCase *tc_chain = tcase_create ("general"); + + suite_add_tcase (s, tc_chain); + tcase_add_test (tc_chain, test_one_buffer); + tcase_add_test (tc_chain, test_signal_handoffs); + tcase_add_test (tc_chain, test_sync_on_timestamp); + tcase_add_test (tc_chain, test_stopping_element_unschedules_sync); + tcase_add_test (tc_chain, test_no_sync_on_timestamp); + + + return s; +} + +GST_CHECK_MAIN (clocksync); diff --git a/tests/check/meson.build b/tests/check/meson.build index 2372931cb59222249836931176d6dec55353c8bf..b47325bce4716752134ef7de3ea4690b27c697a6 100644 --- a/tests/check/meson.build +++ b/tests/check/meson.build @@ -78,6 +78,7 @@ core_tests = [ [ 'libs/typefindhelper.c' ], [ 'libs/queuearray.c' ], [ 'elements/capsfilter.c', not gst_registry ], + [ 'elements/clocksync.c', not gst_registry or not gst_parse ], [ 'elements/concat.c', not gst_registry ], [ 'elements/dataurisrc.c', not gst_registry ], [ 'elements/fakesrc.c', not gst_registry ],