Commit 1cda8197 authored by Wim Taymans's avatar Wim Taymans

Added support for live sources and other elements that cannot do preroll.

Original commit message from CVS:
Added support for live sources and other elements that
cannot do preroll.
Updated design docs, added live-source design doc.
Implemented live source functionality in basesrc
Fix error condition in _bin_get_state()
Implement live source handling in -launch.
Added check for live sources.
Fixed case in GstBin where elements were changed state
multiple times.
parent b8d13efa
2005-06-23 Wim Taymans <wim@fluendo.com>
* check/Makefile.am:
* check/states/sinks.c: (START_TEST), (gst_object_suite), (main):
* docs/design/part-live-source.txt:
* docs/design/part-states.txt:
* gst/base/gstbasesrc.c: (gst_basesrc_init),
(gst_basesrc_set_live), (gst_basesrc_is_live),
(gst_basesrc_get_range), (gst_basesrc_activate),
(gst_basesrc_change_state):
* gst/base/gstbasesrc.h:
* gst/elements/gstfakesrc.c: (gst_fakesrc_class_init),
(gst_fakesrc_set_property), (gst_fakesrc_get_property):
* gst/gstbin.c: (gst_bin_get_state), (gst_bin_change_state):
* gst/gstelement.c: (gst_element_get_state_func),
(gst_element_set_state):
* gst/gstelement.h:
* gst/gsttypes.h:
* tools/gst-launch.c: (event_loop), (main):
Added support for live sources and other elements that
cannot do preroll.
Updated design docs, added live-source design doc.
Implemented live source functionality in basesrc
Fix error condition in _bin_get_state()
Implement live source handling in -launch.
Added check for live sources.
Fixed case in GstBin where elements were changed state
multiple times.
2005-06-23 Andy Wingo <wingo@pobox.com>
* check/gst/gstpad.c (test_get_allowed_caps, test_refcount): Fix
......
......@@ -39,6 +39,7 @@ TESTS = $(top_builddir)/tools/gst-register \
gst/gstvalue \
pipelines/simple_launch_lines \
pipelines/cleanup \
states/sinks \
gst-libs/gdp
check_PROGRAMS = $(TESTS)
......
/* GStreamer
*
* unit test for sinks
*
* Copyright (C) <2005> Wim Taymans <wim at fluendo dot com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include "../gstcheck.h"
/* a sink should go ASYNC to PAUSE. forcing PLAYING is possible */
START_TEST (test_sink)
{
GstElement *sink;
GstElementStateReturn ret;
GstElementState current, pending;
sink = gst_element_factory_make ("fakesink", "sink");
ret = gst_element_set_state (sink, GST_STATE_PAUSED);
fail_unless (ret == GST_STATE_ASYNC, "no async state return");
ret = gst_element_set_state (sink, GST_STATE_PLAYING);
fail_unless (ret == GST_STATE_SUCCESS, "cannot force play");
ret = gst_element_get_state (sink, &current, &pending, NULL);
fail_unless (ret == GST_STATE_SUCCESS, "not playing");
fail_unless (current == GST_STATE_PLAYING, "not playing");
fail_unless (pending == GST_STATE_VOID_PENDING, "not playing");
}
END_TEST
/* a sink should go ASYNC to PAUSE. PAUSE should complete when
* prerolled. */
START_TEST (test_src_sink)
{
GstElement *sink, *src, *pipeline;
GstElementStateReturn ret;
GstElementState current, pending;
GstPad *srcpad, *sinkpad;
pipeline = gst_pipeline_new ("pipeline");
src = gst_element_factory_make ("fakesrc", "src");
sink = gst_element_factory_make ("fakesink", "sink");
gst_bin_add (GST_BIN (pipeline), src);
gst_bin_add (GST_BIN (pipeline), sink);
srcpad = gst_element_get_pad (src, "src");
sinkpad = gst_element_get_pad (sink, "sink");
gst_pad_link (srcpad, sinkpad);
gst_object_unref (GST_OBJECT (srcpad));
gst_object_unref (GST_OBJECT (sinkpad));
ret = gst_element_set_state (pipeline, GST_STATE_PAUSED);
fail_unless (ret == GST_STATE_SUCCESS, "no success state return");
ret = gst_element_set_state (pipeline, GST_STATE_PLAYING);
fail_unless (ret == GST_STATE_SUCCESS, "cannot start play");
ret = gst_element_get_state (pipeline, &current, &pending, NULL);
fail_unless (ret == GST_STATE_SUCCESS, "not playing");
fail_unless (current == GST_STATE_PLAYING, "not playing");
fail_unless (pending == GST_STATE_VOID_PENDING, "not playing");
}
END_TEST
/* a pipeline with live source should return NO_PREROLL in
* PAUSE. When removing the live source it should return ASYNC
* from the sink */
START_TEST (test_livesrc_remove)
{
GstElement *sink, *src, *pipeline;
GstElementStateReturn ret;
GstElementState current, pending;
GstPad *srcpad, *sinkpad;
GTimeVal tv;
pipeline = gst_pipeline_new ("pipeline");
src = gst_element_factory_make ("fakesrc", "src");
g_object_set (G_OBJECT (src), "is-live", TRUE, NULL);
sink = gst_element_factory_make ("fakesink", "sink");
gst_bin_add (GST_BIN (pipeline), src);
gst_bin_add (GST_BIN (pipeline), sink);
srcpad = gst_element_get_pad (src, "src");
sinkpad = gst_element_get_pad (sink, "sink");
gst_pad_link (srcpad, sinkpad);
gst_object_unref (GST_OBJECT (srcpad));
gst_object_unref (GST_OBJECT (sinkpad));
ret = gst_element_set_state (pipeline, GST_STATE_PAUSED);
fail_unless (ret == GST_STATE_NO_PREROLL, "no no_preroll state return");
ret = gst_element_get_state (src, &current, &pending, NULL);
fail_unless (ret == GST_STATE_NO_PREROLL, "not paused");
fail_unless (current == GST_STATE_PAUSED, "not paused");
fail_unless (pending == GST_STATE_VOID_PENDING, "not playing");
gst_bin_remove (GST_BIN (pipeline), src);
GST_TIME_TO_TIMEVAL (0, tv);
ret = gst_element_get_state (pipeline, &current, &pending, &tv);
fail_unless (ret == GST_STATE_ASYNC, "not async");
fail_unless (current == GST_STATE_PAUSED, "not paused");
fail_unless (pending == GST_STATE_VOID_PENDING, "not playing");
}
END_TEST
/* a sink should go ASYNC to PAUSE. PAUSE does not complete
* since we have a live source. */
START_TEST (test_livesrc_sink)
{
GstElement *sink, *src, *pipeline;
GstElementStateReturn ret;
GstElementState current, pending;
GstPad *srcpad, *sinkpad;
pipeline = gst_pipeline_new ("pipeline");
src = gst_element_factory_make ("fakesrc", "src");
g_object_set (G_OBJECT (src), "is-live", TRUE, NULL);
sink = gst_element_factory_make ("fakesink", "sink");
gst_bin_add (GST_BIN (pipeline), src);
gst_bin_add (GST_BIN (pipeline), sink);
srcpad = gst_element_get_pad (src, "src");
sinkpad = gst_element_get_pad (sink, "sink");
gst_pad_link (srcpad, sinkpad);
gst_object_unref (GST_OBJECT (srcpad));
gst_object_unref (GST_OBJECT (sinkpad));
ret = gst_element_set_state (pipeline, GST_STATE_PAUSED);
fail_unless (ret == GST_STATE_NO_PREROLL, "no no_preroll state return");
ret = gst_element_get_state (src, &current, &pending, NULL);
fail_unless (ret == GST_STATE_NO_PREROLL, "not paused");
fail_unless (current == GST_STATE_PAUSED, "not paused");
fail_unless (pending == GST_STATE_VOID_PENDING, "not playing");
ret = gst_element_get_state (pipeline, &current, &pending, NULL);
fail_unless (ret == GST_STATE_NO_PREROLL, "not paused");
fail_unless (current == GST_STATE_PAUSED, "not paused");
fail_unless (pending == GST_STATE_VOID_PENDING, "not playing");
ret = gst_element_set_state (pipeline, GST_STATE_PLAYING);
fail_unless (ret == GST_STATE_SUCCESS, "cannot force play");
ret = gst_element_get_state (pipeline, &current, &pending, NULL);
fail_unless (ret == GST_STATE_SUCCESS, "not playing");
fail_unless (current == GST_STATE_PLAYING, "not playing");
fail_unless (pending == GST_STATE_VOID_PENDING, "not playing");
}
END_TEST
/* test: try changing state of sinks */
Suite * gst_object_suite (void)
{
Suite *s = suite_create ("Sinks");
TCase *tc_chain = tcase_create ("general");
/* turn off timeout */
tcase_set_timeout (tc_chain, 60);
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_sink);
tcase_add_test (tc_chain, test_src_sink);
tcase_add_test (tc_chain, test_livesrc_remove);
tcase_add_test (tc_chain, test_livesrc_sink);
return s;
}
int
main (int argc, char **argv)
{
int nf;
Suite *s = gst_object_suite ();
SRunner *sr = srunner_create (s);
gst_check_init (&argc, &argv);
srunner_run_all (sr, CK_NORMAL);
nf = srunner_ntests_failed (sr);
srunner_free (sr);
return nf;
}
Live sources
------------
A live source such as an element capturing audio or video need to be handled
in a special way. It does not make sense to start the dataflow in the PAUSED
state for those devices as the user might wait a long time between going from
PAUSED to PLAYING, making the previously captured buffers irrelevant.
A live source therefore only produces buffers in the PLAYING state. This has
implications for sinks waiting for a buffer to complete the preroll state
since such a buffer might never arrive.
Live sources return NO_PREROLL when going to the PAUSED state to inform the
bin/pipeline that this element will not be able to produce data in the
PAUSED state.
When performing a get_state() on a bin with a non-zero timeout value, the
bin must be sure that there are no live sources in the pipeline because else
the get_state() function would block on the sinks.
A gstbin therefore always performs a zero timeout get_state() on its
elements to discover the NO_PREROLL (and ERROR) elements before performing
a blocking wait on all elements.
Scheduling
----------
Live sources can not produce data in the paused state. They block in the
getrange function or in the loop function until they go to PLAYING.
Latency
-------
The live source timestamps its data with the time of the clock at the
time the data was captured. Normally it will take some time to capture
the first sample of data and the last sample. This means that when the
buffer arrives at the sink, it will already be late and will be dropped.
The latency is the time it takes to construct one buffer of data.
......@@ -60,8 +60,11 @@ The _set_state() function can return 3 possible values:
change or for sinks that need to receive the first buffer
before they can complete the state change (preroll).
In the case of an async state change, it is not possible to proceed to the next
state until the current state change completed. After receiving an ASYNC return
GST_STATE_NO_PREROLL: The state change is completed successfully but the element
will not be able to produce data in the PAUSED state.
In the case of an async state change, it is possible to proceed to the next
state before the current state change completed. After receiving an ASYNC return
value, you can use _element_get_state() to poll the status of the element.
When setting the state of an element, the PENDING_STATE is set to the required
......@@ -129,15 +132,25 @@ on the elements.
If after calling the state function on all children, one of the children returned
ASYNC, the function returns ASYNC as well.
If after calling the state function on all children, one of the children returned
NO_PREROLL, the function returns NO_PREROLL as well.
The current state of the bin can be retrieved with _get_state(). This function will
call the _get_state() function on all the elements. If one of the children returns
FAILURE or ASYNC, the bin reports FAILURE or ASYNC respectively. The bin also
updates its state variables after polling its children, this means that the state
variables of the bin are only updated after calling _get_state() on the bin.
call the _get_state() function on all the elements.
First the bin will perform a _get_state() on all children with a 0 timeout. This
is to find any children with an ERROR/NO_PREROLL result value.
Then the bin performs the _get_state() with the requested timeout. The reason for
the 2 phases is that when an ERROR or NO_PREROLL result is found, a blocking
wait on the sinks might never return.
The _get_state() function will be called on the children with the same timout value
so the function can potentially block timeout*num_children.
The bin also updates its state variables after polling its children, this means that
the state variables of the bin are only updated after calling _get_state() on the bin.
Implementing states in elements
-------------------------------
......
......@@ -49,7 +49,7 @@ enum
PROP_0,
PROP_BLOCKSIZE,
PROP_HAS_LOOP,
PROP_HAS_GETRANGE
PROP_HAS_GETRANGE,
};
static GstElementClass *parent_class = NULL;
......@@ -164,6 +164,10 @@ gst_basesrc_init (GstBaseSrc * basesrc, gpointer g_class)
gst_pad_set_checkgetrange_function (pad, gst_basesrc_check_get_range);
basesrc->is_live = FALSE;
basesrc->live_lock = g_mutex_new ();
basesrc->live_cond = g_cond_new ();
/* hold ref to pad */
basesrc->srcpad = pad;
gst_element_add_pad (GST_ELEMENT (basesrc), pad);
......@@ -176,6 +180,26 @@ gst_basesrc_init (GstBaseSrc * basesrc, gpointer g_class)
GST_FLAG_UNSET (basesrc, GST_BASESRC_STARTED);
}
void
gst_basesrc_set_live (GstBaseSrc * src, gboolean live)
{
GST_LIVE_LOCK (src);
src->is_live = live;
GST_LIVE_UNLOCK (src);
}
gboolean
gst_basesrc_is_live (GstBaseSrc * src)
{
gboolean result;
GST_LIVE_LOCK (src);
result = src->is_live;
GST_LIVE_UNLOCK (src);
return result;
}
static void
gst_basesrc_set_dataflow_funcs (GstBaseSrc * this)
{
......@@ -460,6 +484,16 @@ gst_basesrc_get_range (GstPad * pad, guint64 offset, guint length,
src = GST_BASESRC (GST_OBJECT_PARENT (pad));
bclass = GST_BASESRC_GET_CLASS (src);
GST_LIVE_LOCK (src);
if (src->is_live) {
while (!src->live_running) {
GST_DEBUG ("live source waiting for running state");
GST_LIVE_WAIT (src);
GST_DEBUG ("live source unlocked");
}
}
GST_LIVE_UNLOCK (src);
if (!GST_FLAG_IS_SET (src, GST_BASESRC_STARTED))
goto not_started;
......@@ -725,6 +759,11 @@ gst_basesrc_activate (GstPad * pad, GstActivateMode mode)
gst_basesrc_stop (basesrc);
break;
case GST_ACTIVATE_NONE:
GST_LIVE_LOCK (basesrc);
basesrc->live_running = TRUE;
GST_LIVE_SIGNAL (basesrc);
GST_LIVE_UNLOCK (basesrc);
/* step 1, unblock clock sync (if any) */
gst_basesrc_unlock (basesrc);
......@@ -746,7 +785,8 @@ static GstElementStateReturn
gst_basesrc_change_state (GstElement * element)
{
GstBaseSrc *basesrc;
GstElementStateReturn result = GST_STATE_FAILURE;
GstElementStateReturn result = GST_STATE_SUCCESS;
GstElementStateReturn presult;
GstElementState transition;
basesrc = GST_BASESRC (element);
......@@ -757,17 +797,35 @@ gst_basesrc_change_state (GstElement * element)
case GST_STATE_NULL_TO_READY:
break;
case GST_STATE_READY_TO_PAUSED:
GST_LIVE_LOCK (element);
if (basesrc->is_live) {
result = GST_STATE_NO_PREROLL;
basesrc->live_running = FALSE;
}
GST_LIVE_UNLOCK (element);
break;
case GST_STATE_PAUSED_TO_PLAYING:
GST_LIVE_LOCK (element);
basesrc->live_running = TRUE;
GST_LIVE_SIGNAL (element);
GST_LIVE_UNLOCK (element);
break;
default:
break;
}
result = GST_ELEMENT_CLASS (parent_class)->change_state (element);
if ((presult = GST_ELEMENT_CLASS (parent_class)->change_state (element)) !=
GST_STATE_SUCCESS)
return presult;
switch (transition) {
case GST_STATE_PLAYING_TO_PAUSED:
GST_LIVE_LOCK (element);
if (basesrc->is_live) {
result = GST_STATE_NO_PREROLL;
basesrc->live_running = FALSE;
}
GST_LIVE_UNLOCK (element);
break;
case GST_STATE_PAUSED_TO_READY:
if (!gst_basesrc_stop (basesrc))
......
......@@ -54,11 +54,28 @@ typedef struct _GstBaseSrcClass GstBaseSrcClass;
#define GST_BASESRC_PAD(obj) (GST_BASESRC (obj)->srcpad)
#define GST_LIVE_GET_LOCK(elem) (GST_BASESRC(elem)->live_lock)
#define GST_LIVE_LOCK(elem) g_mutex_lock(GST_LIVE_GET_LOCK(elem))
#define GST_LIVE_TRYLOCK(elem) g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
#define GST_LIVE_UNLOCK(elem) g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
#define GST_LIVE_GET_COND(elem) (GST_BASESRC(elem)->live_cond)
#define GST_LIVE_WAIT(elem) g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
#define GST_LIVE_TIMED_WAIT(elem, timeval) g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
timeval)
#define GST_LIVE_SIGNAL(elem) g_cond_signal (GST_LIVE_GET_COND (elem));
#define GST_LIVE_BROADCAST(elem) g_cond_broadcast (GST_LIVE_GET_COND (elem));
struct _GstBaseSrc {
GstElement element;
GstPad *srcpad;
/*< protected >*/ /* with LIVE_LOCK */
GMutex *live_lock;
GCond *live_cond;
gboolean is_live;
gboolean live_running;
/*< protected >*/ /* with LOCK */
gint blocksize; /* size of buffers when operating push based */
gboolean has_loop; /* some scheduling properties */
......@@ -114,6 +131,9 @@ struct _GstBaseSrcClass {
GType gst_basesrc_get_type(void);
void gst_basesrc_set_live (GstBaseSrc *src, gboolean live);
gboolean gst_basesrc_is_live (GstBaseSrc *src);
G_END_DECLS
#endif /* __GST_BASESRC_H__ */
......@@ -90,7 +90,8 @@ enum
PROP_PARENTSIZE,
PROP_LAST_MESSAGE,
PROP_HAS_LOOP,
PROP_HAS_GETRANGE
PROP_HAS_GETRANGE,
PROP_IS_LIVE
};
#define GST_TYPE_FAKESRC_OUTPUT (gst_fakesrc_output_get_type())
......@@ -284,6 +285,10 @@ gst_fakesrc_class_init (GstFakeSrcClass * klass)
g_param_spec_boolean ("has-getrange", "Has getrange function",
"True if the element exposes a getrange function", TRUE,
G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_IS_LIVE,
g_param_spec_boolean ("is-live", "Is this a live source",
"True if the element cannot produce data in PAUSED", FALSE,
G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
gst_fakesrc_signals[SIGNAL_HANDOFF] =
g_signal_new ("handoff", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
......@@ -361,8 +366,10 @@ gst_fakesrc_set_property (GObject * object, guint prop_id, const GValue * value,
GParamSpec * pspec)
{
GstFakeSrc *src;
GstBaseSrc *basesrc;
src = GST_FAKESRC (object);
basesrc = GST_BASESRC (object);
switch (prop_id) {
case PROP_OUTPUT:
......@@ -428,6 +435,9 @@ gst_fakesrc_set_property (GObject * object, guint prop_id, const GValue * value,
g_return_if_fail (!GST_FLAG_IS_SET (object, GST_BASESRC_STARTED));
src->has_getrange = g_value_get_boolean (value);
break;
case PROP_IS_LIVE:
gst_basesrc_set_live (basesrc, g_value_get_boolean (value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
......@@ -439,11 +449,13 @@ gst_fakesrc_get_property (GObject * object, guint prop_id, GValue * value,
GParamSpec * pspec)
{
GstFakeSrc *src;
GstBaseSrc *basesrc;
/* it's not null if we got it, but it might not be ours */
g_return_if_fail (GST_IS_FAKESRC (object));
src = GST_FAKESRC (object);
basesrc = GST_BASESRC (object);
switch (prop_id) {
case PROP_OUTPUT:
......@@ -500,6 +512,9 @@ gst_fakesrc_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_HAS_GETRANGE:
g_value_set_boolean (value, src->has_getrange);
break;
case PROP_IS_LIVE:
g_value_set_boolean (value, gst_basesrc_is_live (basesrc));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
......
......@@ -822,8 +822,13 @@ gst_bin_iterate_sinks (GstBin * bin)
return result;
}
/* this functions loops over all children, as soon as one does
* not return SUCCESS, we return that value.
/* 2 phases:
* 1) check state of all children with 0 timeout to find ERROR and
* NO_PREROLL elements. return if found.
* 2) perform full blocking wait with requested timeout.
*
* 2) cannot be performed when 1) returns results as the sinks might
* not be able to complete the state change making 2) block forever.
*
* MT safe
*/
......@@ -832,18 +837,76 @@ gst_bin_get_state (GstElement * element, GstElementState * state,
GstElementState * pending, GTimeVal * timeout)
{
GstBin *bin = GST_BIN (element);
GstElementStateReturn ret;
GstElementStateReturn ret = GST_STATE_SUCCESS;
GList *children;
guint32 children_cookie;
gboolean zero_timeout;
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, "getting state");
/* we cannot take the state lock yet as we might block when querying
* the children, holding the lock too long for no reason. */
zero_timeout = timeout != NULL && timeout->tv_sec == 0
&& timeout->tv_usec == 0;
/* if we have a non zero timeout we must make sure not to block
* on the sinks when we have NO_PREROLL elements. This is why we do
* a quick check if there are still NO_PREROLL elements. We also
* catch the error elements this way. */
GST_STATE_LOCK (bin);
if (!zero_timeout) {
GST_LOCK (bin);
GTimeVal tv;
gboolean have_no_preroll = FALSE;
gboolean have_async = FALSE;
GST_CAT_INFO_OBJECT (GST_CAT_STATES, element, "checking for NO_PREROLL");
/* use 0 timeout so we don't block on the sinks */
GST_TIME_TO_TIMEVAL (0, tv);
children = bin->children;
while (children) {
GstElement *child = GST_ELEMENT_CAST (children->data);
ret = gst_element_get_state (child, NULL, NULL, &tv);
switch (ret) {
/* report FAILURE or NO_PREROLL immediatly */
case GST_STATE_FAILURE:
GST_UNLOCK (bin);
goto report;
case GST_STATE_NO_PREROLL:
/* we have to continue scanning as there might be
* ERRORS too */
have_no_preroll = TRUE;
break;
case GST_STATE_ASYNC:
have_async = TRUE;
break;
default:
break;
}
children = g_list_next (children);
}