Commit d2c5ea9a authored by Wim Taymans's avatar Wim Taymans Committed by Wim Taymans
Browse files

TaskPool: remove _set_func()

Remove the static function set on the TaskPool before _prepare() is called and
allow for assigning a function to a Task when we _push().
Update the examples
parent 4cc2e056
...@@ -2173,8 +2173,8 @@ gst_tag_setter_get_type ...@@ -2173,8 +2173,8 @@ gst_tag_setter_get_type
<TITLE>GstTaskPool</TITLE> <TITLE>GstTaskPool</TITLE>
GstTaskPool GstTaskPool
GstTaskPoolClass GstTaskPoolClass
GstTaskPoolFunction
gst_task_pool_new gst_task_pool_new
gst_task_pool_set_func
gst_task_pool_prepare gst_task_pool_prepare
gst_task_pool_push gst_task_pool_push
gst_task_pool_join gst_task_pool_join
......
...@@ -92,7 +92,7 @@ static void gst_task_class_init (GstTaskClass * klass); ...@@ -92,7 +92,7 @@ static void gst_task_class_init (GstTaskClass * klass);
static void gst_task_init (GstTask * task); static void gst_task_init (GstTask * task);
static void gst_task_finalize (GObject * object); static void gst_task_finalize (GObject * object);
static void gst_task_func (GstTask * task, GstTaskClass * tclass); static void gst_task_func (GstTask * task);
static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT; static GStaticMutex pool_lock = G_STATIC_MUTEX_INIT;
...@@ -112,7 +112,6 @@ init_klass_pool (GstTaskClass * klass) ...@@ -112,7 +112,6 @@ init_klass_pool (GstTaskClass * klass)
gst_object_unref (klass->pool); gst_object_unref (klass->pool);
} }
klass->pool = gst_task_pool_new (); klass->pool = gst_task_pool_new ();
gst_task_pool_set_func (klass->pool, (GFunc) gst_task_func, klass);
gst_task_pool_prepare (klass->pool, NULL); gst_task_pool_prepare (klass->pool, NULL);
g_static_mutex_unlock (&pool_lock); g_static_mutex_unlock (&pool_lock);
} }
...@@ -177,7 +176,7 @@ gst_task_finalize (GObject * object) ...@@ -177,7 +176,7 @@ gst_task_finalize (GObject * object)
} }
static void static void
gst_task_func (GstTask * task, GstTaskClass * tclass) gst_task_func (GstTask * task)
{ {
GStaticRecMutex *lock; GStaticRecMutex *lock;
GThread *tself; GThread *tself;
...@@ -564,7 +563,9 @@ start_task (GstTask * task) ...@@ -564,7 +563,9 @@ start_task (GstTask * task)
/* push on the thread pool, we remember the original pool because the user /* push on the thread pool, we remember the original pool because the user
* could change it later on and then we join to the wrong pool. */ * could change it later on and then we join to the wrong pool. */
priv->pool_id = gst_object_ref (priv->pool); priv->pool_id = gst_object_ref (priv->pool);
priv->id = gst_task_pool_push (priv->pool_id, task, &error); priv->id =
gst_task_pool_push (priv->pool_id, (GstTaskPoolFunction) gst_task_func,
task, &error);
if (error != NULL) { if (error != NULL) {
g_warning ("failed to create thread: %s", error->message); g_warning ("failed to create thread: %s", error->message);
......
...@@ -51,12 +51,30 @@ static void gst_task_pool_finalize (GObject * object); ...@@ -51,12 +51,30 @@ static void gst_task_pool_finalize (GObject * object);
G_DEFINE_TYPE_WITH_CODE (GstTaskPool, gst_task_pool, GST_TYPE_OBJECT, _do_init); G_DEFINE_TYPE_WITH_CODE (GstTaskPool, gst_task_pool, GST_TYPE_OBJECT, _do_init);
typedef struct
{
GstTaskPoolFunction func;
gpointer user_data;
} TaskData;
static void static void
default_prepare (GstTaskPool * pool, GFunc func, gpointer user_data, default_func (TaskData * tdata, GstTaskPool * pool)
GError ** error) {
GstTaskPoolFunction func;
gpointer user_data;
func = tdata->func;
user_data = tdata->user_data;
g_slice_free (TaskData, tdata);
func (user_data);
}
static void
default_prepare (GstTaskPool * pool, GError ** error)
{ {
GST_OBJECT_LOCK (pool); GST_OBJECT_LOCK (pool);
pool->pool = g_thread_pool_new ((GFunc) func, user_data, -1, FALSE, NULL); pool->pool = g_thread_pool_new ((GFunc) default_func, pool, -1, FALSE, NULL);
GST_OBJECT_UNLOCK (pool); GST_OBJECT_UNLOCK (pool);
} }
...@@ -75,11 +93,21 @@ default_cleanup (GstTaskPool * pool) ...@@ -75,11 +93,21 @@ default_cleanup (GstTaskPool * pool)
} }
static gpointer static gpointer
default_push (GstTaskPool * pool, gpointer data, GError ** error) default_push (GstTaskPool * pool, GstTaskPoolFunction func,
gpointer user_data, GError ** error)
{ {
TaskData *tdata;
tdata = g_slice_new (TaskData);
tdata->func = func;
tdata->user_data = user_data;
GST_OBJECT_LOCK (pool); GST_OBJECT_LOCK (pool);
if (pool->pool) if (pool->pool)
g_thread_pool_push (pool->pool, data, error); g_thread_pool_push (pool->pool, tdata, error);
else {
g_slice_free (TaskData, tdata);
}
GST_OBJECT_UNLOCK (pool); GST_OBJECT_UNLOCK (pool);
return NULL; return NULL;
...@@ -88,7 +116,7 @@ default_push (GstTaskPool * pool, gpointer data, GError ** error) ...@@ -88,7 +116,7 @@ default_push (GstTaskPool * pool, gpointer data, GError ** error)
static void static void
default_join (GstTaskPool * pool, gpointer id) default_join (GstTaskPool * pool, gpointer id)
{ {
/* does nothing, we can't join for threads from the threadpool */ /* we do nothing here, we can't join from the pools */
} }
static void static void
...@@ -141,16 +169,6 @@ gst_task_pool_new (void) ...@@ -141,16 +169,6 @@ gst_task_pool_new (void)
return pool; return pool;
} }
void
gst_task_pool_set_func (GstTaskPool * pool, GFunc func, gpointer user_data)
{
g_return_if_fail (GST_IS_TASK_POOL (pool));
pool->func = func;
pool->user_data = user_data;
}
/** /**
* gst_task_pool_prepare: * gst_task_pool_prepare:
* @pool: a #GstTaskPool * @pool: a #GstTaskPool
...@@ -170,7 +188,7 @@ gst_task_pool_prepare (GstTaskPool * pool, GError ** error) ...@@ -170,7 +188,7 @@ gst_task_pool_prepare (GstTaskPool * pool, GError ** error)
klass = GST_TASK_POOL_GET_CLASS (pool); klass = GST_TASK_POOL_GET_CLASS (pool);
if (klass->prepare) if (klass->prepare)
klass->prepare (pool, pool->func, pool->user_data, error); klass->prepare (pool, error);
} }
/** /**
...@@ -198,7 +216,8 @@ gst_task_pool_cleanup (GstTaskPool * pool) ...@@ -198,7 +216,8 @@ gst_task_pool_cleanup (GstTaskPool * pool)
/** /**
* gst_task_pool_push: * gst_task_pool_push:
* @pool: a #GstTaskPool * @pool: a #GstTaskPool
* @data: data to pass to the thread function * @func: the function to call
* @user_data: data to pass to @func
* @error: return location for an error * @error: return location for an error
* *
* Start the execution of a new thread from @pool. * Start the execution of a new thread from @pool.
...@@ -208,7 +227,8 @@ gst_task_pool_cleanup (GstTaskPool * pool) ...@@ -208,7 +227,8 @@ gst_task_pool_cleanup (GstTaskPool * pool)
* errors. * errors.
*/ */
gpointer gpointer
gst_task_pool_push (GstTaskPool * pool, gpointer data, GError ** error) gst_task_pool_push (GstTaskPool * pool, GstTaskPoolFunction func,
gpointer user_data, GError ** error)
{ {
GstTaskPoolClass *klass; GstTaskPoolClass *klass;
...@@ -219,7 +239,7 @@ gst_task_pool_push (GstTaskPool * pool, gpointer data, GError ** error) ...@@ -219,7 +239,7 @@ gst_task_pool_push (GstTaskPool * pool, gpointer data, GError ** error)
if (klass->push == NULL) if (klass->push == NULL)
goto not_supported; goto not_supported;
return klass->push (pool, data, error); return klass->push (pool, func, user_data, error);
/* ERRORS */ /* ERRORS */
not_supported: not_supported:
......
...@@ -38,6 +38,8 @@ G_BEGIN_DECLS ...@@ -38,6 +38,8 @@ G_BEGIN_DECLS
typedef struct _GstTaskPool GstTaskPool; typedef struct _GstTaskPool GstTaskPool;
typedef struct _GstTaskPoolClass GstTaskPoolClass; typedef struct _GstTaskPoolClass GstTaskPoolClass;
typedef void (*GstTaskPoolFunction) (void *data);
/** /**
* GstTaskPool: * GstTaskPool:
* *
...@@ -47,9 +49,6 @@ struct _GstTaskPool { ...@@ -47,9 +49,6 @@ struct _GstTaskPool {
GstObject object; GstObject object;
/*< private >*/ /*< private >*/
GFunc func;
gpointer user_data;
GThreadPool *pool; GThreadPool *pool;
gpointer _gst_reserved[GST_PADDING]; gpointer _gst_reserved[GST_PADDING];
...@@ -69,11 +68,11 @@ struct _GstTaskPoolClass { ...@@ -69,11 +68,11 @@ struct _GstTaskPoolClass {
GstObjectClass parent_class; GstObjectClass parent_class;
/*< public >*/ /*< public >*/
void (*prepare) (GstTaskPool *pool, GFunc func, void (*prepare) (GstTaskPool *pool, GError **error);
gpointer user_data, GError **error);
void (*cleanup) (GstTaskPool *pool); void (*cleanup) (GstTaskPool *pool);
gpointer (*push) (GstTaskPool *pool, gpointer data, GError **error); gpointer (*push) (GstTaskPool *pool, GstTaskPoolFunction func,
gpointer user_data, GError **error);
void (*join) (GstTaskPool *pool, gpointer id); void (*join) (GstTaskPool *pool, gpointer id);
/*< private >*/ /*< private >*/
...@@ -83,14 +82,10 @@ struct _GstTaskPoolClass { ...@@ -83,14 +82,10 @@ struct _GstTaskPoolClass {
GType gst_task_pool_get_type (void); GType gst_task_pool_get_type (void);
GstTaskPool * gst_task_pool_new (void); GstTaskPool * gst_task_pool_new (void);
void gst_task_pool_set_func (GstTaskPool *pool,
GFunc func, gpointer user_data);
void gst_task_pool_prepare (GstTaskPool *pool, GError **error); void gst_task_pool_prepare (GstTaskPool *pool, GError **error);
gpointer gst_task_pool_push (GstTaskPool *pool, gpointer data, gpointer gst_task_pool_push (GstTaskPool *pool, GstTaskPoolFunction func,
GError **error); gpointer user_data, GError **error);
void gst_task_pool_join (GstTaskPool *pool, gpointer id); void gst_task_pool_join (GstTaskPool *pool, gpointer id);
void gst_task_pool_cleanup (GstTaskPool *pool); void gst_task_pool_cleanup (GstTaskPool *pool);
......
stream-status stream-status
rtpool-test
*.bb *.bb
*.bbg *.bbg
*.da *.da
......
...@@ -17,18 +17,26 @@ ...@@ -17,18 +17,26 @@
* Boston, MA 02111-1307, USA. * Boston, MA 02111-1307, USA.
*/ */
#include <pthread.h>
#include "testrtpool.h" #include "testrtpool.h"
static void test_rt_pool_class_init (TestRTPoolClass * klass); static void test_rt_pool_class_init (TestRTPoolClass * klass);
static void test_rt_pool_init (TestRTPool * pool); static void test_rt_pool_init (TestRTPool * pool);
static void test_rt_pool_finalize (GObject * object); static void test_rt_pool_finalize (GObject * object);
typedef struct
{
pthread_t thread;
} TestRTId;
G_DEFINE_TYPE (TestRTPool, test_rt_pool, GST_TYPE_TASK_POOL); G_DEFINE_TYPE (TestRTPool, test_rt_pool, GST_TYPE_TASK_POOL);
static void static void
default_prepare (GstTaskPool * pool, GFunc func, gpointer user_data, default_prepare (GstTaskPool * pool, GError ** error)
GError ** error)
{ {
/* we don't do anything here. We could construct a pool of threads here that
* we could reuse later but we don't */
g_message ("prepare Realtime pool %p", pool); g_message ("prepare Realtime pool %p", pool);
} }
...@@ -39,19 +47,47 @@ default_cleanup (GstTaskPool * pool) ...@@ -39,19 +47,47 @@ default_cleanup (GstTaskPool * pool)
} }
static gpointer static gpointer
default_push (GstTaskPool * pool, gpointer data, GError ** error) default_push (GstTaskPool * pool, GstTaskPoolFunction func, gpointer data,
GError ** error)
{ {
g_message ("pushing Realtime pool %p", pool); TestRTId *tid;
gint res;
pthread_attr_t attr;
//struct sched_param param;
g_message ("pushing Realtime pool %p, %p", pool, func);
*error = g_error_new (1, 1, "not supported"); tid = g_slice_new0 (TestRTId);
return NULL; pthread_attr_init (&attr);
/*
pthread_attr_setschedpolicy (&attr, SCHED_RR);
param.sched_priority = 50;
pthread_attr_setschedparam (&attr, &param);
*/
res = pthread_create (&tid->thread, &attr, (void *(*)(void *)) func, data);
if (res < 0) {
g_set_error (error, G_THREAD_ERROR, G_THREAD_ERROR_AGAIN,
"Error creating thread: %s", g_strerror (res));
g_slice_free (TestRTId, tid);
tid = NULL;
}
return tid;
} }
static void static void
default_join (GstTaskPool * pool, gpointer id) default_join (GstTaskPool * pool, gpointer id)
{ {
TestRTId *tid = (TestRTId *) id;
g_message ("joining Realtime pool %p", pool); g_message ("joining Realtime pool %p", pool);
pthread_join (tid->thread, NULL);
g_slice_free (TestRTId, tid);
} }
static void static void
......
...@@ -950,7 +950,6 @@ EXPORTS ...@@ -950,7 +950,6 @@ EXPORTS
gst_task_pool_new gst_task_pool_new
gst_task_pool_prepare gst_task_pool_prepare
gst_task_pool_push gst_task_pool_push
gst_task_pool_set_func
gst_task_set_lock gst_task_set_lock
gst_task_set_pool gst_task_set_pool
gst_task_set_priority gst_task_set_priority
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment