Commit e7f54b1e authored by Wim Taymans's avatar Wim Taymans

- Added a function to get the currently executing cothread

Original commit message from CVS:
- Added a function to get the currently executing cothread
- Removed some useless includes
- _interrupt now returns a boolean so the behaviour after the interrupt
can be controlled by the scheduler.
- Added a better way to set/get the default scheduler.
- make thread and pipeline get the default scheduler.
parent 21987266
......@@ -308,6 +308,21 @@ cothread_current_main (void)
return ctx->threads[0];
}
/**
* cothread_current:
*
* Get the currenttly executing cothread
*
* Returns: the #cothread_state of the current cothread
*/
cothread_state *
cothread_current (void)
{
cothread_context *ctx = pthread_getspecific (_cothread_key);
return ctx->threads[ctx->current];
}
static void
cothread_stub (void)
{
......
......@@ -76,7 +76,6 @@ void cothread_setfunc (cothread_state *thread, cothread_func func,
int argc, char **argv);
void cothread_stop (cothread_state *thread);
int cothread_getcurrent (void);
void cothread_switch (cothread_state *thread);
void cothread_set_data (cothread_state *thread, gchar *key, gpointer data);
gpointer cothread_get_data (cothread_state *thread, gchar *key);
......@@ -87,5 +86,6 @@ void cothread_unlock (cothread_state *thread);
cothread_state* cothread_main (cothread_context *ctx);
cothread_state* cothread_current_main (void);
cothread_state* cothread_current (void);
#endif /* __COTHREAD_H__ */
......@@ -47,7 +47,6 @@
#include <gst/gstutils.h>
#include <gst/gsttrace.h>
#include <gst/gstxml.h>
#include <gst/cothreads.h>
#include <gst/gstscheduler.h>
#include <gst/gsttimecache.h>
#include <gst/gstevent.h>
......
......@@ -25,7 +25,6 @@
#define __GST_BIN_H__
#include <gst/gstelement.h>
#include <gst/cothreads.h>
#ifdef __cplusplus
extern "C" {
......
......@@ -932,7 +932,6 @@ gst_element_set_state (GstElement *element, GstElementState state)
g_return_val_if_fail (element != NULL, GST_STATE_FAILURE);
g_return_val_if_fail (GST_IS_ELEMENT (element), GST_STATE_FAILURE);
g_return_val_if_fail (element->sched != NULL, GST_STATE_FAILURE);
GST_DEBUG_ELEMENT (GST_CAT_STATES,element, "setting state from %s to %s\n",
gst_element_statename(GST_STATE(element)),
......@@ -1292,12 +1291,14 @@ gst_element_yield (GstElement *element)
}
}
void
gboolean
gst_element_interrupt (GstElement *element)
{
if (GST_ELEMENT_SCHED (element)) {
gst_scheduler_interrupt (GST_ELEMENT_SCHED (element), element);
return gst_scheduler_interrupt (GST_ELEMENT_SCHED (element), element);
}
else
return FALSE;
}
/**
......
......@@ -29,7 +29,6 @@
#include <gst/gsttypes.h>
#include <gst/gstobject.h>
#include <gst/gstpad.h>
#include <gst/cothreads.h>
#include <gst/gstpluginfeature.h>
#ifdef __cplusplus
......@@ -103,6 +102,7 @@ typedef enum {
#define GST_ELEMENT_IS_THREAD_SUGGESTED(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_THREAD_SUGGESTED))
#define GST_ELEMENT_IS_EOS(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_EOS))
#define GST_ELEMENT_IS_EVENT_AWARE(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_EVENT_AWARE))
#define GST_ELEMENT_IS_DECOUPLED(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_DECOUPLED))
#define GST_ELEMENT_NAME(obj) (GST_OBJECT_NAME(obj))
#define GST_ELEMENT_PARENT(obj) (GST_OBJECT_PARENT(obj))
......@@ -184,7 +184,7 @@ void gst_element_set_parent (GstElement *element, Gs
GstObject* gst_element_get_parent (GstElement *element);
void gst_element_yield (GstElement *element);
void gst_element_interrupt (GstElement *element);
gboolean gst_element_interrupt (GstElement *element);
void gst_element_set_sched (GstElement *element, GstScheduler *sched);
GstScheduler* gst_element_get_sched (GstElement *element);
......
......@@ -494,7 +494,9 @@ gst_pad_push_func(GstPad *pad, GstBuffer *buf)
GST_DEBUG_FUNCPTR_NAME(GST_RPAD_CHAINFUNC(GST_RPAD_PEER(pad))));
(GST_RPAD_CHAINFUNC(GST_RPAD_PEER(pad)))(pad,buf);
} else {
GST_DEBUG (GST_CAT_DATAFLOW,"got a problem here: default pad_push handler in place, no chain function\n");
GST_DEBUG (GST_CAT_DATAFLOW,"default pad_push handler in place, no chain function\n");
g_warning ("(internal error) default pad_push in place for pad %s:%s but it has no chain function",
GST_DEBUG_PAD_NAME (pad));
}
}
......
......@@ -28,7 +28,6 @@
#include <gst/gstobject.h>
#include <gst/gstbuffer.h>
#include <gst/cothreads.h>
#include <gst/gstcaps.h>
#include <gst/gstevent.h>
......
......@@ -97,21 +97,22 @@ gst_pipeline_class_init (GstPipelineClass *klass)
static void
gst_pipeline_init (GstPipeline *pipeline)
{
const gchar *schedname;
GstScheduler *scheduler;
/* we're a manager by default */
GST_FLAG_SET (pipeline, GST_BIN_FLAG_MANAGER);
scheduler = gst_schedulerfactory_make ("basic", GST_ELEMENT (pipeline));
schedname = gst_schedulerfactory_get_default_name ();
scheduler = gst_schedulerfactory_make (schedname, GST_ELEMENT (pipeline));
GST_ELEMENT_SCHED (pipeline) = scheduler;
gst_object_ref (GST_OBJECT (scheduler));
gst_object_sink (GST_OBJECT (scheduler));
gst_scheduler_setup (scheduler);
GST_DEBUG (GST_CAT_PIPELINE, "pipeline's scheduler is %p\n", scheduler);
}
static void
......@@ -121,8 +122,10 @@ gst_pipeline_dispose (GObject *object)
G_OBJECT_CLASS (parent_class)->dispose (object);
gst_scheduler_reset (GST_ELEMENT_SCHED (pipeline));
gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (pipeline)));
if (GST_ELEMENT_SCHED (pipeline)) {
gst_scheduler_reset (GST_ELEMENT_SCHED (pipeline));
gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (pipeline)));
}
}
/**
......
......@@ -58,6 +58,7 @@ GType gst_pipeline_get_type (void);
GstElement* gst_pipeline_new (const gchar *name);
#define gst_pipeline_destroy(pipeline) gst_object_destroy(GST_OBJECT(pipeline))
#ifdef __cplusplus
}
#endif /* __cplusplus */
......
......@@ -284,7 +284,6 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
reader = FALSE;
restart:
/* we have to lock the queue since we span threads */
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
......@@ -353,7 +352,8 @@ restart:
while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
g_mutex_unlock (queue->qlock);
gst_element_interrupt (GST_ELEMENT (queue));
if (gst_element_interrupt (GST_ELEMENT (queue)))
return;
goto restart;
}
if (GST_STATE (queue) != GST_STATE_PLAYING) {
......@@ -419,7 +419,6 @@ gst_queue_get (GstPad *pad)
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
writer = FALSE;
restart:
/* have to lock for thread-safety */
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
......@@ -434,7 +433,8 @@ restart:
while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
g_mutex_unlock (queue->qlock);
gst_element_interrupt (GST_ELEMENT (queue));
if (gst_element_interrupt (GST_ELEMENT (queue)))
return NULL;
goto restart;
}
if (GST_STATE (queue) != GST_STATE_PLAYING) {
......@@ -442,7 +442,7 @@ restart:
if (!queue->may_deadlock) {
g_mutex_unlock (queue->qlock);
gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
return NULL;
goto restart;
}
else {
gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart sink pad elements");
......
......@@ -31,6 +31,8 @@ static void gst_scheduler_init (GstScheduler *sched);
static GstObjectClass *parent_class = NULL;
static gchar *_default_name = NULL;
GType
gst_scheduler_get_type (void)
{
......@@ -282,15 +284,20 @@ gst_scheduler_yield (GstScheduler *sched, GstElement *element)
* @element: the element requesting an interrupt
*
* Tell the scheduler to interrupt execution of this element.
*
* Retruns: TRUE if the element should return NULL from the chain/get
* function.
*/
void
gboolean
gst_scheduler_interrupt (GstScheduler *sched, GstElement *element)
{
g_return_if_fail (GST_IS_SCHEDULER (sched));
g_return_if_fail (GST_IS_ELEMENT (element));
g_return_val_if_fail (GST_IS_SCHEDULER (sched), FALSE);
g_return_val_if_fail (GST_IS_ELEMENT (element), FALSE);
if (CLASS (sched)->interrupt)
CLASS (sched)->interrupt (sched, element);
return CLASS (sched)->interrupt (sched, element);
return FALSE;
}
/**
......@@ -387,6 +394,7 @@ gst_schedulerfactory_class_init (GstSchedulerFactoryClass *klass)
#endif
_gst_schedulerfactories = NULL;
_default_name = g_strdup ("basic");
}
static void
......@@ -536,6 +544,21 @@ gst_schedulerfactory_make (const gchar *name, GstElement *parent)
return gst_schedulerfactory_create (factory, parent);
}
void
gst_schedulerfactory_set_default_name (const gchar* name)
{
if (_default_name)
g_free (_default_name);
_default_name = g_strdup (name);
}
const gchar*
gst_schedulerfactory_get_default_name (void)
{
return _default_name;
}
#ifndef GST_DISABLE_REGISTRY
static xmlNodePtr
gst_schedulerfactory_save_thyself (GstObject *object, xmlNodePtr parent)
......
......@@ -77,7 +77,7 @@ struct _GstSchedulerClass {
void (*lock_element) (GstScheduler *sched, GstElement *element);
void (*unlock_element) (GstScheduler *sched, GstElement *element);
void (*yield) (GstScheduler *sched, GstElement *element);
void (*interrupt) (GstScheduler *sched, GstElement *element);
gboolean (*interrupt) (GstScheduler *sched, GstElement *element);
void (*error) (GstScheduler *sched, GstElement *element);
void (*pad_connect) (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
void (*pad_disconnect) (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
......@@ -101,7 +101,7 @@ GstElementStateReturn gst_scheduler_state_transition (GstScheduler *sched, GstEl
void gst_scheduler_lock_element (GstScheduler *sched, GstElement *element);
void gst_scheduler_unlock_element (GstScheduler *sched, GstElement *element);
void gst_scheduler_yield (GstScheduler *sched, GstElement *element);
void gst_scheduler_interrupt (GstScheduler *sched, GstElement *element);
gboolean gst_scheduler_interrupt (GstScheduler *sched, GstElement *element);
void gst_scheduler_error (GstScheduler *sched, GstElement *element);
void gst_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
void gst_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
......@@ -150,6 +150,9 @@ GList* gst_schedulerfactory_get_list (void);
GstScheduler* gst_schedulerfactory_create (GstSchedulerFactory *factory, GstElement *parent);
GstScheduler* gst_schedulerfactory_make (const gchar *name, GstElement *parent);
void gst_schedulerfactory_set_default_name (const gchar* name);
const gchar* gst_schedulerfactory_get_default_name (void);
#ifdef __cplusplus
}
......
......@@ -133,19 +133,27 @@ gst_thread_class_init (GstThreadClass *klass)
static void
gst_thread_init (GstThread *thread)
{
const gchar *schedname;
GstScheduler *scheduler;
GST_DEBUG (GST_CAT_THREAD,"initializing thread\n");
GST_DEBUG (GST_CAT_THREAD, "initializing thread\n");
/* we're a manager by default */
GST_FLAG_SET (thread, GST_BIN_FLAG_MANAGER);
thread->lock = g_mutex_new();
thread->cond = g_cond_new();
schedname = gst_schedulerfactory_get_default_name ();
GST_ELEMENT_SCHED(thread) = gst_schedulerfactory_make ("basic", GST_ELEMENT(thread));
GST_DEBUG(GST_CAT_THREAD, "thread's scheduler is %p\n",GST_ELEMENT_SCHED(thread));
scheduler = gst_schedulerfactory_make (schedname, GST_ELEMENT (thread));
thread->ppid = getpid();
GST_ELEMENT_SCHED (thread) = scheduler;
gst_object_ref (GST_OBJECT (scheduler));
gst_object_sink (GST_OBJECT (scheduler));
thread->lock = g_mutex_new ();
thread->cond = g_cond_new ();
thread->ppid = getpid ();
thread->thread_id = -1;
}
......@@ -154,16 +162,17 @@ gst_thread_dispose (GObject *object)
{
GstThread *thread = GST_THREAD (object);
GST_DEBUG (GST_CAT_REFCOUNTING,"dispose\n");
GST_DEBUG (GST_CAT_REFCOUNTING, "dispose\n");
g_mutex_free (thread->lock);
g_cond_free (thread->cond);
G_OBJECT_CLASS (parent_class)->dispose (object);
gst_object_destroy (GST_OBJECT (GST_ELEMENT_SCHED (thread)));
gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (thread)));
if (GST_ELEMENT_SCHED (thread)) {
gst_object_destroy (GST_OBJECT (GST_ELEMENT_SCHED (thread)));
gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (thread)));
}
}
static void
......
......@@ -101,7 +101,7 @@ static GstElementStateReturn
static void gst_basic_scheduler_lock_element (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_unlock_element (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_yield (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element);
static gboolean gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_error (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
static void gst_basic_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
......@@ -330,6 +330,7 @@ gst_basic_scheduler_src_wrapper (int argc, char *argv[])
}
}
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
exit:
GST_FLAG_UNSET (element, GST_ELEMENT_COTHREAD_STOPPING);
GST_DEBUG_LEAVE ("");
......@@ -1056,10 +1057,13 @@ gst_basic_scheduler_yield (GstScheduler *sched, GstElement *element)
}
}
static void
static gboolean
gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element)
{
GST_FLAG_SET (element, GST_ELEMENT_COTHREAD_STOPPING);
cothread_switch (cothread_current_main ());
return FALSE;
}
static void
......
......@@ -284,7 +284,6 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
reader = FALSE;
restart:
/* we have to lock the queue since we span threads */
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
......@@ -353,7 +352,8 @@ restart:
while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
g_mutex_unlock (queue->qlock);
gst_element_interrupt (GST_ELEMENT (queue));
if (gst_element_interrupt (GST_ELEMENT (queue)))
return;
goto restart;
}
if (GST_STATE (queue) != GST_STATE_PLAYING) {
......@@ -419,7 +419,6 @@ gst_queue_get (GstPad *pad)
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
writer = FALSE;
restart:
/* have to lock for thread-safety */
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
......@@ -434,7 +433,8 @@ restart:
while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
g_mutex_unlock (queue->qlock);
gst_element_interrupt (GST_ELEMENT (queue));
if (gst_element_interrupt (GST_ELEMENT (queue)))
return NULL;
goto restart;
}
if (GST_STATE (queue) != GST_STATE_PLAYING) {
......@@ -442,7 +442,7 @@ restart:
if (!queue->may_deadlock) {
g_mutex_unlock (queue->qlock);
gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
return NULL;
goto restart;
}
else {
gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart sink pad elements");
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment