Commit ba5cecf3 authored by Jan Schmidt's avatar Jan Schmidt

examples/: Update a couple of the examples to work again.

Original commit message from CVS:
* examples/Makefile.am:
* examples/helloworld/helloworld.c: (event_loop), (main):
* examples/queue/queue.c: (event_loop), (main):
* examples/queue2/queue2.c: (main):
Update a couple of the examples to work again.

* gst/base/gstbasesink.c: (gst_base_sink_preroll_queue_empty),
(gst_base_sink_preroll_queue_flush), (gst_base_sink_handle_event):
Spelling corrections and extra debug.

* gst/gstbin.c: (gst_bin_class_init), (gst_bin_init), (is_eos),
(gst_bin_add_func), (bin_element_is_sink), (gst_bin_get_state),
(gst_bin_change_state), (gst_bin_dispose), (bin_bus_handler):
* gst/gstbin.h:
* gst/gstpipeline.c: (gst_pipeline_init), (gst_pipeline_dispose),
(gst_pipeline_change_state):
* gst/gstpipeline.h:
Move the bus handler for children to the GstBin, and create a
separate bus for receiving messages from children to the one the
bus sends 'upwards' on.
parent 03aa950f
......@@ -7,9 +7,6 @@ endif
dirs = \
helloworld \
queue \
queue2 \
queue3 \
queue4 \
launch \
thread \
plugins \
......@@ -20,6 +17,10 @@ dirs = \
pwg \
retag
#queue2 \
#queue3 \
#queue4
SUBDIRS = $(dirs) \
$(GST_LOADSAVE_DIRS)
......
#include <stdlib.h>
#include <gst/gst.h>
static void
event_loop (GstElement * pipe)
{
GstBus *bus;
GstMessageType revent;
GstMessage *message = NULL;
bus = gst_element_get_bus (GST_ELEMENT (pipe));
while (TRUE) {
revent = gst_bus_poll (bus, GST_MESSAGE_ANY, -1);
message = gst_bus_pop (bus);
g_assert (message != NULL);
switch (revent) {
case GST_MESSAGE_EOS:
gst_message_unref (message);
return;
case GST_MESSAGE_WARNING:
case GST_MESSAGE_ERROR:{
GError *gerror;
gchar *debug;
gst_message_parse_error (message, &gerror, &debug);
gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug);
gst_message_unref (message);
g_error_free (gerror);
g_free (debug);
return;
}
default:
gst_message_unref (message);
break;
}
}
}
int
main (int argc, char *argv[])
{
GstElement *bin, *filesrc, *decoder, *osssink;
GstElement *bin, *filesrc, *decoder, *audiosink;
gst_init (&argc, &argv);
......@@ -29,19 +67,20 @@ main (int argc, char *argv[])
return -1;
}
/* and an audio sink */
osssink = gst_element_factory_make ("osssink", "play_audio");
g_assert (osssink);
audiosink = gst_element_factory_make ("alsasink", "play_audio");
g_assert (audiosink);
/* add objects to the main pipeline */
gst_bin_add_many (GST_BIN (bin), filesrc, decoder, osssink, NULL);
gst_bin_add_many (GST_BIN (bin), filesrc, decoder, audiosink, NULL);
/* link the elements */
gst_element_link_many (filesrc, decoder, osssink, NULL);
gst_element_link_many (filesrc, decoder, audiosink, NULL);
/* start playing */
gst_element_set_state (bin, GST_STATE_PLAYING);
while (gst_bin_iterate (GST_BIN (bin)));
/* Run event loop listening for bus messages until EOS or ERROR */
event_loop (bin);
/* stop the bin */
gst_element_set_state (bin, GST_STATE_NULL);
......
#include <stdlib.h>
#include <gst/gst.h>
/* This example uses the queue element to create a buffer between 2 elements.
* The scheduler automatically uses 2 threads, 1 to feed and another to consume
* data from the queue buffer
*/
/* Event loop to listen to events posted on the GstBus from the pipeline. Exits
* on EOS or ERROR events
*/
static void
event_loop (GstElement * pipe)
{
GstBus *bus;
GstMessageType revent;
GstMessage *message = NULL;
bus = gst_element_get_bus (GST_ELEMENT (pipe));
while (TRUE) {
revent = gst_bus_poll (bus, GST_MESSAGE_ANY, -1);
message = gst_bus_pop (bus);
g_assert (message != NULL);
switch (revent) {
case GST_MESSAGE_EOS:
gst_message_unref (message);
return;
case GST_MESSAGE_WARNING:
case GST_MESSAGE_ERROR:{
GError *gerror;
gchar *debug;
gst_message_parse_error (message, &gerror, &debug);
gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug);
gst_message_unref (message);
g_error_free (gerror);
g_free (debug);
return;
}
default:
gst_message_unref (message);
break;
}
}
}
int
main (int argc, char *argv[])
{
GstElement *filesrc, *osssink, *parse, *decode, *queue;
GstElement *filesrc, *audiosink, *decode, *queue;
GstElement *pipeline;
GstElement *thread;
gst_init (&argc, &argv);
......@@ -15,10 +60,6 @@ main (int argc, char *argv[])
exit (-1);
}
/* create a new thread to hold the elements */
thread = gst_thread_new ("thread");
g_assert (thread != NULL);
/* create a new pipeline to hold the elements */
pipeline = gst_pipeline_new ("pipeline");
g_assert (pipeline != NULL);
......@@ -28,27 +69,27 @@ main (int argc, char *argv[])
g_assert (filesrc != NULL);
g_object_set (G_OBJECT (filesrc), "location", argv[1], NULL);
parse = gst_element_factory_make ("mp3parse", "parse");
decode = gst_element_factory_make ("mad", "decode");
g_assert (decode != NULL);
queue = gst_element_factory_make ("queue", "queue");
g_assert (queue != NULL);
/* and an audio sink */
osssink = gst_element_factory_make ("osssink", "play_audio");
g_assert (osssink != NULL);
audiosink = gst_element_factory_make ("alsasink", "play_audio");
g_assert (audiosink != NULL);
/* add objects to the main pipeline */
gst_bin_add_many (GST_BIN (pipeline), filesrc, parse, decode, queue, NULL);
gst_bin_add (GST_BIN (thread), osssink);
gst_bin_add (GST_BIN (pipeline), thread);
gst_bin_add_many (GST_BIN (pipeline), filesrc, decode, queue, audiosink,
NULL);
gst_element_link_many (filesrc, parse, decode, queue, osssink, NULL);
gst_element_link_many (filesrc, decode, queue, audiosink, NULL);
/* start playing */
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
while (gst_bin_iterate (GST_BIN (pipeline)));
/* Listen for EOS */
event_loop (pipeline);
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_NULL);
......
......@@ -15,9 +15,8 @@ eos (GstElement * element, gpointer data)
int
main (int argc, char *argv[])
{
GstElement *filesrc, *osssink, *queue;
GstElement *filesrc, *audiosink, *queue;
GstElement *pipeline;
GstElement *thread;
gst_init (&argc, &argv);
......@@ -26,10 +25,6 @@ main (int argc, char *argv[])
exit (-1);
}
/* create a new thread to hold the elements */
thread = gst_thread_new ("thread");
g_assert (thread != NULL);
/* create a new bin to hold the elements */
pipeline = gst_pipeline_new ("pipeline");
g_assert (pipeline != NULL);
......@@ -43,18 +38,18 @@ main (int argc, char *argv[])
queue = gst_element_factory_make ("queue", "queue");
/* and an audio sink */
osssink = gst_element_factory_make ("osssink", "play_audio");
g_assert (osssink != NULL);
audiosink = gst_element_factory_make ("alsasink", "play_audio");
g_assert (audiosink != NULL);
/* add objects to the main pipeline */
/*
gst_pipeline_add_src(GST_PIPELINE(pipeline), filesrc);
gst_pipeline_add_sink(GST_PIPELINE(pipeline), queue);
gst_bin_add(GST_BIN(thread), osssink);
gst_bin_add(GST_BIN (pipeline), audiosink);
gst_pad_link(gst_element_get_pad(queue,"src"),
gst_element_get_pad(osssink,"sink"));
gst_element_get_pad(audiosink,"sink"));
if (!gst_pipeline_autoplug(GST_PIPELINE(pipeline))) {
g_print("cannot autoplug pipeline\n");
......
......@@ -402,10 +402,10 @@ gst_base_sink_preroll_queue_empty (GstBaseSink * basesink, GstPad * pad)
GST_PREROLL_UNLOCK (pad);
if (is_buffer) {
GST_DEBUG ("poped buffer %p", obj);
GST_DEBUG ("popped buffer %p", obj);
ret = gst_base_sink_handle_buffer (basesink, GST_BUFFER (obj));
} else {
GST_DEBUG ("poped event %p", obj);
GST_DEBUG ("popped event %p", obj);
gst_base_sink_handle_event (basesink, GST_EVENT (obj));
ret = GST_FLOW_OK;
}
......@@ -427,7 +427,7 @@ gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
GST_DEBUG ("flushing queue %p", basesink);
if (q) {
while ((obj = g_queue_pop_head (q))) {
GST_DEBUG ("poped %p", obj);
GST_DEBUG ("popped %p", obj);
gst_mini_object_unref (obj);
}
}
......@@ -814,6 +814,7 @@ gst_base_sink_handle_event (GstBaseSink * basesink, GstEvent * event)
/* if we are still EOS, we can post the EOS message */
if (basesink->eos) {
/* ok, now we can post the message */
GST_DEBUG_OBJECT (basesink, "Now posting EOS");
gst_element_post_message (GST_ELEMENT (basesink),
gst_message_new_eos (GST_OBJECT (basesink)));
}
......
......@@ -68,10 +68,11 @@ static GstClock *gst_bin_get_clock_func (GstElement * element);
static void gst_bin_set_clock_func (GstElement * element, GstClock * clock);
static void gst_bin_set_manager (GstElement * element, GstPipeline * manager);
static void gst_bin_set_bus (GstElement * element, GstBus * bus);
static void gst_bin_set_scheduler (GstElement * element, GstScheduler * sched);
static gboolean gst_bin_send_event (GstElement * element, GstEvent * event);
static GstBusSyncReply bin_bus_handler (GstBus * bus,
GstMessage * message, GstBin * bin);
static gboolean gst_bin_query (GstElement * element, GstQuery * query);
#ifndef GST_DISABLE_LOADSAVE
......@@ -179,7 +180,6 @@ gst_bin_class_init (GstBinClass * klass)
gstelement_class->get_clock = GST_DEBUG_FUNCPTR (gst_bin_get_clock_func);
gstelement_class->set_clock = GST_DEBUG_FUNCPTR (gst_bin_set_clock_func);
gstelement_class->set_manager = GST_DEBUG_FUNCPTR (gst_bin_set_manager);
gstelement_class->set_bus = GST_DEBUG_FUNCPTR (gst_bin_set_bus);
gstelement_class->set_scheduler = GST_DEBUG_FUNCPTR (gst_bin_set_scheduler);
gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_bin_send_event);
......@@ -192,9 +192,23 @@ gst_bin_class_init (GstBinClass * klass)
static void
gst_bin_init (GstBin * bin)
{
GstBus *bus;
bin->numchildren = 0;
bin->children = NULL;
bin->children_cookie = 0;
bin->eosed = NULL;
/* Set up a bus for listening to child elements,
* and one for sending messages up the hierarchy */
bus = g_object_new (gst_bus_get_type (), NULL);
bin->child_bus = bus;
gst_bus_set_sync_handler (bus, (GstBusSyncHandler) bin_bus_handler, bin);
bus = g_object_new (gst_bus_get_type (), NULL);
gst_element_set_bus (GST_ELEMENT (bin), bus);
/* set_bus refs the bus via gst_object_replace, we drop our ref */
gst_object_unref (bus);
}
/**
......@@ -281,29 +295,6 @@ gst_bin_get_clock_func (GstElement * element)
return result;
}
/* set the bus on all of the children in this bin
*
* MT safe
*/
static void
gst_bin_set_bus (GstElement * element, GstBus * bus)
{
GList *children;
GstBin *bin;
bin = GST_BIN (element);
parent_class->set_bus (element, bus);
GST_LOCK (bin);
for (children = bin->children; children; children = g_list_next (children)) {
GstElement *child = GST_ELEMENT (children->data);
gst_element_set_bus (child, bus);
}
GST_UNLOCK (bin);
}
/* set the scheduler on all of the children in this bin
*
* MT safe
......@@ -348,6 +339,53 @@ gst_bin_set_manager (GstElement * element, GstPipeline * manager)
GST_UNLOCK (element);
}
static gboolean
is_eos (GstBin * bin)
{
GstIterator *sinks;
gboolean result = TRUE;
gboolean done = FALSE;
sinks = gst_bin_iterate_sinks (bin);
while (!done) {
gpointer data;
switch (gst_iterator_next (sinks, &data)) {
case GST_ITERATOR_OK:
{
GstElement *element = GST_ELEMENT (data);
GList *eosed;
gchar *name;
name = gst_element_get_name (element);
eosed = g_list_find (bin->eosed, element);
if (!eosed) {
GST_DEBUG ("element %s did not post EOS yet", name);
result = FALSE;
done = TRUE;
} else {
GST_DEBUG ("element %s posted EOS", name);
}
g_free (name);
gst_object_unref (element);
break;
}
case GST_ITERATOR_RESYNC:
result = TRUE;
gst_iterator_resync (sinks);
break;
case GST_ITERATOR_DONE:
done = TRUE;
break;
default:
g_assert_not_reached ();
break;
}
}
gst_iterator_free (sinks);
return result;
}
/* add an element to this bin
*
* MT safe
......@@ -389,7 +427,7 @@ gst_bin_add_func (GstBin * bin, GstElement * element)
bin->children_cookie++;
gst_element_set_manager (element, GST_ELEMENT (bin)->manager);
gst_element_set_bus (element, GST_ELEMENT (bin)->bus);
gst_element_set_bus (element, bin->child_bus);
gst_element_set_scheduler (element, GST_ELEMENT_SCHEDULER (bin));
gst_element_set_clock (element, GST_ELEMENT_CLOCK (bin));
......@@ -697,11 +735,11 @@ bin_element_is_sink (GstElement * child, GstBin * bin)
* get its name safely. */
GST_LOCK (child);
is_sink = GST_FLAG_IS_SET (child, GST_ELEMENT_IS_SINK);
GST_UNLOCK (child);
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin,
"child %s %s sink", GST_OBJECT_NAME (child), is_sink ? "is" : "is not");
GST_UNLOCK (child);
return is_sink ? 0 : 1;
}
......@@ -1112,6 +1150,12 @@ gst_bin_change_state (GstElement * element)
if (pending == GST_STATE_VOID_PENDING)
return GST_STATE_SUCCESS;
/* Clear eosed element list on READY-> PAUSED */
if (GST_STATE_TRANSITION (element) == GST_STATE_READY_TO_PAUSED) {
g_list_free (bin->eosed);
bin->eosed = NULL;
}
/* all elements added to these queues should have their refcount
* incremented */
elem_queue = g_queue_new ();
......@@ -1335,6 +1379,10 @@ gst_bin_dispose (GObject * object)
/* ref to not hit 0 again */
gst_object_ref (object);
g_list_free (bin->eosed);
gst_object_unref (bin->child_bus);
gst_element_set_bus (GST_ELEMENT (bin), NULL);
while (bin->children) {
gst_bin_remove (bin, GST_ELEMENT (bin->children->data));
}
......@@ -1393,6 +1441,39 @@ gst_bin_send_event (GstElement * element, GstEvent * event)
return res;
}
/* FIXME, make me threadsafe */
static GstBusSyncReply
bin_bus_handler (GstBus * bus, GstMessage * message, GstBin * bin)
{
/* we don't want messages from the streaming thread while we're doing the
* state change. We do want them from the state change functions. */
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_EOS:
GST_DEBUG_OBJECT (bin, "got EOS message from %s",
gst_object_get_name (GST_MESSAGE_SRC (message)));
GST_LOCK (bin->child_bus);
bin->eosed = g_list_prepend (bin->eosed, GST_MESSAGE_SRC (message));
GST_UNLOCK (bin->child_bus);
if (is_eos (bin)) {
GST_DEBUG_OBJECT (bin, "all sinks posted EOS");
gst_bus_post (GST_ELEMENT (bin)->bus,
gst_message_new_eos (GST_OBJECT (bin)));
}
/* we drop all EOS messages */
gst_message_unref (message);
break;
default:
/* Send all other messages upward */
gst_bus_post (GST_ELEMENT (bin)->bus, message);
break;
}
return GST_BUS_DROP;
}
static gboolean
gst_bin_query (GstElement * element, GstQuery * query)
{
......
......@@ -26,6 +26,7 @@
#include <gst/gstelement.h>
#include <gst/gstiterator.h>
#include <gst/gstbus.h>
G_BEGIN_DECLS
......@@ -70,6 +71,9 @@ struct _GstBin {
GList *children;
guint32 children_cookie;
GstBus *child_bus; /* Bus we set on our children */
GList *eosed; /* list of elements that posted EOS */
/*< private >*/
gpointer _gst_reserved[GST_PADDING];
};
......
......@@ -63,8 +63,6 @@ static void gst_pipeline_get_property (GObject * object, guint prop_id,
static gboolean gst_pipeline_send_event (GstElement * element,
GstEvent * event);
static GstBusSyncReply pipeline_bus_handler (GstBus * bus, GstMessage * message,
GstPipeline * pipeline);
static GstClock *gst_pipeline_get_clock_func (GstElement * element);
static GstElementStateReturn gst_pipeline_change_state (GstElement * element);
......@@ -141,7 +139,6 @@ gst_pipeline_init (GTypeInstance * instance, gpointer g_class)
{
GstScheduler *scheduler;
GstPipeline *pipeline = GST_PIPELINE (instance);
GstBus *bus;
/* get an instance of the default scheduler */
scheduler = gst_scheduler_factory_make (NULL, GST_ELEMENT (pipeline));
......@@ -159,18 +156,10 @@ gst_pipeline_init (GTypeInstance * instance, gpointer g_class)
gst_object_unref ((GstObject *) scheduler);
}
pipeline->eosed = NULL;
pipeline->delay = DEFAULT_DELAY;
pipeline->play_timeout = DEFAULT_PLAY_TIMEOUT;
/* we are our own manager */
GST_ELEMENT_MANAGER (pipeline) = pipeline;
bus = g_object_new (gst_bus_get_type (), NULL);
gst_bus_set_sync_handler (bus,
(GstBusSyncHandler) pipeline_bus_handler, pipeline);
gst_element_set_bus (GST_ELEMENT (pipeline), bus);
/* set_bus refs the bus via gst_object_replace, we drop our ref */
gst_object_unref ((GstObject *) bus);
}
static void
......@@ -178,7 +167,6 @@ gst_pipeline_dispose (GObject * object)
{
GstPipeline *pipeline = GST_PIPELINE (object);
gst_element_set_bus (GST_ELEMENT (pipeline), NULL);
gst_scheduler_reset (GST_ELEMENT_SCHEDULER (object));
gst_element_set_scheduler (GST_ELEMENT (pipeline), NULL);
gst_object_replace ((GstObject **) & pipeline->fixed_clock, NULL);
......@@ -228,53 +216,6 @@ gst_pipeline_get_property (GObject * object, guint prop_id,
GST_UNLOCK (pipeline);
}
static gboolean
is_eos (GstPipeline * pipeline)
{
GstIterator *sinks;
gboolean result = TRUE;
gboolean done = FALSE;
sinks = gst_bin_iterate_sinks (GST_BIN (pipeline));
while (!done) {
gpointer data;
switch (gst_iterator_next (sinks, &data)) {
case GST_ITERATOR_OK:
{
GstElement *element = GST_ELEMENT (data);
GList *eosed;
gchar *name;
name = gst_element_get_name (element);
eosed = g_list_find (pipeline->eosed, element);
if (!eosed) {
GST_DEBUG ("element %s did not post EOS yet", name);
result = FALSE;
done = TRUE;
} else {
GST_DEBUG ("element %s posted EOS", name);
}
g_free (name);
gst_object_unref (element);
break;
}
case GST_ITERATOR_RESYNC:
result = TRUE;
gst_iterator_resync (sinks);
break;
case GST_ITERATOR_DONE:
done = TRUE;
break;
default:
g_assert_not_reached ();
break;
}
}
gst_iterator_free (sinks);
return result;
}
/* sending an event on the pipeline pauses the pipeline if it
* was playing.
*/
......@@ -318,47 +259,6 @@ gst_pipeline_send_event (GstElement * element, GstEvent * event)
return res;
}
/* FIXME, make me threadsafe */
static GstBusSyncReply
pipeline_bus_handler (GstBus * bus, GstMessage * message,
GstPipeline * pipeline)
{
GstBusSyncReply result = GST_BUS_PASS;
gboolean posteos = FALSE;
/* we don't want messages from the streaming thread while we're doing the
* state change. We do want them from the state change functions. */
switch (GST_MESSAGE_TYPE (message)) {
case GST_MESSAGE_EOS:
if (GST_MESSAGE_SRC (message) != GST_OBJECT (pipeline)) {
GST_DEBUG ("got EOS message");
GST_LOCK (bus);
pipeline->eosed =
g_list_prepend (pipeline->eosed, GST_MESSAGE_SRC (message));
GST_UNLOCK (bus);
if (is_eos (pipeline)) {
posteos = TRUE;
GST_DEBUG ("all sinks posted EOS");
}
/* we drop all EOS messages */
result = GST_BUS_DROP;
gst_message_unref (message);
}
break;
case GST_MESSAGE_ERROR:
break;
default:
break;
}
if (posteos) {
gst_bus_post (bus, gst_message_new_eos (GST_OBJECT (pipeline)));
}
return result;
}
/**
* gst_pipeline_new:
* @name: name of new pipeline
......@@ -396,7 +296,6 @@ gst_pipeline_change_state (GstElement * element)
clock = gst_element_get_clock (element);
gst_element_set_clock (element, clock);
gst_object_unref (clock);
pipeline->eosed = NULL;
break;
}
case GST_STATE_PAUSED_TO_PLAYING:
......
......@@ -26,7 +26,6 @@
#include <gst/gsttypes.h>
#include <gst/gstbin.h>
#include <gst/gstbus.h>
G_BEGIN_DECLS
......@@ -54,8 +53,6 @@ struct _GstPipeline {
GstClockTime delay;
GstClockTime play_timeout;
GList *eosed; /* list of elements that posted EOS */
/*< private >*/
gpointer _gst_reserved[GST_PADDING];
};
......
......@@ -402,10 +402,10 @@ gst_base_sink_preroll_queue_empty (GstBaseSink * basesink, GstPad * pad)
GST_PREROLL_UNLOCK (pad);
if (is_buffer) {
GST_DEBUG ("poped buffer %p", obj);
GST_DEBUG ("popped buffer %p", obj);
ret = gst_base_sink_handle_buffer (basesink, GST_BUFFER (obj));
} else {
GST_DEBUG ("poped event %p", obj);
GST_DEBUG ("popped event %p", obj);
gst_base_sink_handle_event (basesink, GST_EVENT (obj));
ret = GST_FLOW_OK;
}
......@@ -427,7 +427,7 @@ gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
GST_DEBUG ("flushing queue %p", basesink);
if (q) {
while ((obj = g_queue_pop_head (q))) {
GST_DEBUG ("poped %p", obj);
GST_DEBUG ("popped %p", obj);
gst_mini_object_unref (obj);
}
}
......@@ -814,6 +814,7 @@ gst_base_sink_handle_event (GstBaseSink * basesink, GstEvent * event)
/* if we are still EOS, we can post the EOS message */
if (basesink->eos) {
/* ok, now we can post the message */
GST_DEBUG_OBJECT (basesink, "Now posting EOS");
gst_element_post_message (GST_ELEMENT (basesink),</