Commit c50f4477 authored by Mathieu Duponchelle's avatar Mathieu Duponchelle 🐸 Committed by GStreamer Merge Bot

video-converter: switch to using a task pool ..

.. and make use of that API in videoaggregator.

When setting certain properties, such as cropping or the scaled
size of pads, a new converter is created by videoaggregator.

Before that patch, this implied spawning new threads, potentially
at each aggregate cycle when interpolating pad properties. This
is obviously wasteful, and re-using a task pool removes that
overhead.

Part-of: <gstreamer/gst-plugins-base!896>
parent f3dc83d2
......@@ -415,6 +415,8 @@ struct _GstVideoAggregatorConvertPadPrivate
* and as such are protected with the object lock */
GstStructure *converter_config;
gboolean converter_config_changed;
GstTaskPool *task_pool;
};
G_DEFINE_TYPE_WITH_PRIVATE (GstVideoAggregatorConvertPad,
......@@ -433,6 +435,11 @@ gst_video_aggregator_convert_pad_finalize (GObject * o)
gst_structure_free (vaggpad->priv->converter_config);
vaggpad->priv->converter_config = NULL;
if (vaggpad->priv->task_pool)
gst_task_pool_cleanup (vaggpad->priv->task_pool);
gst_object_replace ((GstObject **) & vaggpad->priv->task_pool, NULL);
G_OBJECT_CLASS (gst_video_aggregator_pad_parent_class)->finalize (o);
}
......@@ -447,6 +454,15 @@ static void
GST_OBJECT_UNLOCK (pad);
}
static guint
get_opt_uint (const GstStructure * config, const gchar * opt, guint def)
{
guint res;
if (!gst_structure_get_uint (config, opt, &res))
res = def;
return res;
}
static gboolean
gst_video_aggregator_convert_pad_prepare_frame (GstVideoAggregatorPad * vpad,
GstVideoAggregator * vagg, GstBuffer * buffer,
......@@ -475,10 +491,22 @@ gst_video_aggregator_convert_pad_prepare_frame (GstVideoAggregatorPad * vpad,
pad->priv->convert = NULL;
if (!gst_video_info_is_equal (&vpad->info, &pad->priv->conversion_info)) {
if (pad->priv->converter_config) {
guint n_threads = get_opt_uint (pad->priv->converter_config,
GST_VIDEO_CONVERTER_OPT_THREADS, 1);
if (n_threads == 0 || n_threads > g_get_num_processors ())
n_threads = g_get_num_processors ();
gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (pad->priv->
task_pool), n_threads);
}
pad->priv->convert =
gst_video_converter_new (&vpad->info, &pad->priv->conversion_info,
pad->priv->converter_config ? gst_structure_copy (pad->priv->
converter_config) : NULL);
gst_video_converter_new_with_pool (&vpad->info,
&pad->priv->conversion_info,
pad->priv->converter_config ? gst_structure_copy (pad->
priv->converter_config) : NULL, pad->priv->task_pool);
if (!pad->priv->convert) {
GST_WARNING_OBJECT (pad, "No path found for conversion");
return FALSE;
......@@ -486,8 +514,8 @@ gst_video_aggregator_convert_pad_prepare_frame (GstVideoAggregatorPad * vpad,
GST_DEBUG_OBJECT (pad, "This pad will be converted from %s to %s",
gst_video_format_to_string (GST_VIDEO_INFO_FORMAT (&vpad->info)),
gst_video_format_to_string (GST_VIDEO_INFO_FORMAT (&pad->
priv->conversion_info)));
gst_video_format_to_string (GST_VIDEO_INFO_FORMAT (&pad->priv->
conversion_info)));
} else {
GST_DEBUG_OBJECT (pad, "This pad will not need conversion");
}
......@@ -689,8 +717,10 @@ gst_video_aggregator_convert_pad_init (GstVideoAggregatorConvertPad * vaggpad)
vaggpad->priv->convert = NULL;
vaggpad->priv->converter_config = NULL;
vaggpad->priv->converter_config_changed = FALSE;
}
vaggpad->priv->task_pool = gst_shared_task_pool_new ();
gst_task_pool_prepare (vaggpad->priv->task_pool, NULL);
}
/**
* gst_video_aggregator_convert_pad_update_conversion_info:
......
......@@ -34,6 +34,7 @@
#include <glib.h>
#include <string.h>
#include <math.h>
#include <gst/base/base.h>
#include "video-orc.h"
......@@ -120,161 +121,107 @@ typedef void (*GstParallelizedTaskFunc) (gpointer user_data);
typedef struct _GstParallelizedTaskRunner GstParallelizedTaskRunner;
typedef struct _GstParallelizedTaskThread GstParallelizedTaskThread;
struct _GstParallelizedTaskThread
{
GstParallelizedTaskRunner *runner;
guint idx;
GThread *thread;
};
struct _GstParallelizedTaskRunner
{
GstTaskPool *pool;
gboolean own_pool;
guint n_threads;
GstParallelizedTaskThread *threads;
GstQueueArray *tasks;
GstParallelizedTaskFunc func;
gpointer *task_data;
GMutex lock;
GCond cond_todo, cond_done;
gint n_todo, n_done;
gboolean quit;
gint n_todo;
};
static gpointer
static void
gst_parallelized_task_thread_func (gpointer data)
{
GstParallelizedTaskThread *self = data;
#if 0
#ifdef HAVE_PTHREAD
{
pthread_t thread = pthread_self ();
cpu_set_t cpuset;
int r;
CPU_ZERO (&cpuset);
CPU_SET (self->idx, &cpuset);
if ((r = pthread_setaffinity_np (thread, sizeof (cpuset), &cpuset)) != 0)
GST_ERROR ("Failed to set thread affinity for thread %d: %s", self->idx,
g_strerror (r));
}
#endif
#endif
g_mutex_lock (&self->runner->lock);
self->runner->n_done++;
if (self->runner->n_done == self->runner->n_threads - 1)
g_cond_signal (&self->runner->cond_done);
do {
gint idx;
while (self->runner->n_todo == -1 && !self->runner->quit)
g_cond_wait (&self->runner->cond_todo, &self->runner->lock);
if (self->runner->quit)
break;
idx = self->runner->n_todo--;
g_assert (self->runner->n_todo >= -1);
g_mutex_unlock (&self->runner->lock);
GstParallelizedTaskRunner *runner = data;
gint idx;
g_assert (self->runner->func != NULL);
g_mutex_lock (&runner->lock);
idx = runner->n_todo--;
g_assert (runner->n_todo >= -1);
g_mutex_unlock (&runner->lock);
self->runner->func (self->runner->task_data[idx]);
g_assert (runner->func != NULL);
g_mutex_lock (&self->runner->lock);
self->runner->n_done++;
if (self->runner->n_done == self->runner->n_threads - 1)
g_cond_signal (&self->runner->cond_done);
} while (TRUE);
runner->func (runner->task_data[idx]);
}
g_mutex_unlock (&self->runner->lock);
static void
gst_parallelized_task_runner_join (GstParallelizedTaskRunner * self)
{
gboolean joined = FALSE;
return NULL;
while (!joined) {
g_mutex_lock (&self->lock);
if (!(joined = gst_queue_array_is_empty (self->tasks))) {
gpointer task = gst_queue_array_pop_head (self->tasks);
g_mutex_unlock (&self->lock);
gst_task_pool_join (self->pool, task);
} else {
g_mutex_unlock (&self->lock);
}
}
}
static void
gst_parallelized_task_runner_free (GstParallelizedTaskRunner * self)
{
guint i;
g_mutex_lock (&self->lock);
self->quit = TRUE;
g_cond_broadcast (&self->cond_todo);
g_mutex_unlock (&self->lock);
for (i = 1; i < self->n_threads; i++) {
if (!self->threads[i].thread)
continue;
g_thread_join (self->threads[i].thread);
}
gst_parallelized_task_runner_join (self);
gst_queue_array_free (self->tasks);
if (self->own_pool)
gst_task_pool_cleanup (self->pool);
gst_object_unref (self->pool);
g_mutex_clear (&self->lock);
g_cond_clear (&self->cond_todo);
g_cond_clear (&self->cond_done);
g_free (self->threads);
g_free (self);
}
static GstParallelizedTaskRunner *
gst_parallelized_task_runner_new (guint n_threads)
gst_parallelized_task_runner_new (guint n_threads, GstTaskPool * pool)
{
GstParallelizedTaskRunner *self;
guint i;
GError *err = NULL;
if (n_threads == 0)
n_threads = g_get_num_processors ();
self = g_new0 (GstParallelizedTaskRunner, 1);
if (pool) {
self->pool = g_object_ref (pool);
self->own_pool = FALSE;
/* No reason to split up the work between more threads than the
* pool can spawn */
if (GST_IS_SHARED_TASK_POOL (pool))
n_threads =
MIN (n_threads,
gst_shared_task_pool_get_max_threads (GST_SHARED_TASK_POOL (pool)));
} else {
self->pool = gst_shared_task_pool_new ();
self->own_pool = TRUE;
gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (self->pool),
n_threads);
gst_task_pool_prepare (self->pool, NULL);
}
self->tasks = gst_queue_array_new (n_threads);
self->n_threads = n_threads;
self->threads = g_new0 (GstParallelizedTaskThread, n_threads);
self->quit = FALSE;
self->n_todo = -1;
self->n_done = 0;
g_mutex_init (&self->lock);
g_cond_init (&self->cond_todo);
g_cond_init (&self->cond_done);
/* Set when scheduling a job */
self->func = NULL;
self->task_data = NULL;
for (i = 0; i < n_threads; i++) {
self->threads[i].runner = self;
self->threads[i].idx = i;
/* First thread is the one calling run() */
if (i > 0) {
self->threads[i].thread =
g_thread_try_new ("videoconvert", gst_parallelized_task_thread_func,
&self->threads[i], &err);
if (!self->threads[i].thread)
goto error;
}
}
g_mutex_lock (&self->lock);
while (self->n_done < self->n_threads - 1)
g_cond_wait (&self->cond_done, &self->lock);
self->n_done = 0;
g_mutex_unlock (&self->lock);
return self;
error:
{
GST_ERROR ("Failed to start thread %u: %s", i, err->message);
g_clear_error (&err);
gst_parallelized_task_runner_free (self);
return NULL;
}
}
static void
......@@ -287,22 +234,24 @@ gst_parallelized_task_runner_run (GstParallelizedTaskRunner * self,
self->task_data = task_data;
if (n_threads > 1) {
guint i = 0;
g_mutex_lock (&self->lock);
self->n_todo = self->n_threads - 2;
self->n_done = 0;
g_cond_broadcast (&self->cond_todo);
for (i = 1; i < n_threads; i++) {
gpointer task =
gst_task_pool_push (self->pool, gst_parallelized_task_thread_func,
self, NULL);
/* The return value of push() is unfortunately nullable, and we can't deal with that */
g_assert (task != NULL);
gst_queue_array_push_tail (self->tasks, task);
}
g_mutex_unlock (&self->lock);
}
self->func (self->task_data[self->n_threads - 1]);
if (n_threads > 1) {
g_mutex_lock (&self->lock);
while (self->n_done < self->n_threads - 1)
g_cond_wait (&self->cond_done, &self->lock);
self->n_done = 0;
g_mutex_unlock (&self->lock);
}
gst_parallelized_task_runner_join (self);
self->func = NULL;
self->task_data = NULL;
......@@ -2281,21 +2230,25 @@ convert_get_alpha_mode (GstVideoConverter * convert)
}
/**
* gst_video_converter_new: (skip)
* gst_video_converter_new_with_pool: (skip)
* @in_info: a #GstVideoInfo
* @out_info: a #GstVideoInfo
* @config: (transfer full): a #GstStructure with configuration options
* @pool: (nullable): a #GstTaskPool to spawn threads from
*
* Create a new converter object to convert between @in_info and @out_info
* with @config.
*
* The optional @pool can be used to spawn threads, this is useful when
* creating new converters rapidly, for example when updating cropping.
*
* Returns: a #GstVideoConverter or %NULL if conversion is not possible.
*
* Since: 1.6
* Since: 1.20
*/
GstVideoConverter *
gst_video_converter_new (GstVideoInfo * in_info, GstVideoInfo * out_info,
GstStructure * config)
gst_video_converter_new_with_pool (GstVideoInfo * in_info,
GstVideoInfo * out_info, GstStructure * config, GstTaskPool * pool)
{
GstVideoConverter *convert;
GstLineCache *prev;
......@@ -2431,7 +2384,8 @@ gst_video_converter_new (GstVideoInfo * in_info, GstVideoInfo * out_info,
if (n_threads < 1)
n_threads = 1;
convert->conversion_runner = gst_parallelized_task_runner_new (n_threads);
convert->conversion_runner =
gst_parallelized_task_runner_new (n_threads, pool);
if (video_converter_lookup_fastpath (convert))
goto done;
......@@ -2520,6 +2474,26 @@ no_pack_func:
}
}
/**
* gst_video_converter_new: (skip)
* @in_info: a #GstVideoInfo
* @out_info: a #GstVideoInfo
* @config: (transfer full): a #GstStructure with configuration options
*
* Create a new converter object to convert between @in_info and @out_info
* with @config.
*
* Returns: a #GstVideoConverter or %NULL if conversion is not possible.
*
* Since: 1.6
*/
GstVideoConverter *
gst_video_converter_new (GstVideoInfo * in_info, GstVideoInfo * out_info,
GstStructure * config)
{
return gst_video_converter_new_with_pool (in_info, out_info, config, NULL);
}
static void
clear_matrix_data (MatrixData * data)
{
......
......@@ -280,6 +280,12 @@ GstVideoConverter * gst_video_converter_new (GstVideoInfo *in_info,
GstVideoInfo *out_info,
GstStructure *config);
GST_VIDEO_API
GstVideoConverter * gst_video_converter_new_with_pool (GstVideoInfo * in_info,
GstVideoInfo * out_info,
GstStructure * config,
GstTaskPool * pool);
GST_VIDEO_API
void gst_video_converter_free (GstVideoConverter * convert);
......
......@@ -2719,6 +2719,84 @@ GST_START_TEST (test_video_convert)
GST_END_TEST;
GST_START_TEST (test_video_convert_multithreading)
{
GstVideoInfo ininfo, outinfo;
GstVideoFrame inframe, outframe, refframe;
GstBuffer *inbuffer, *outbuffer, *refbuffer;
GstVideoConverter *convert;
GstMapInfo info;
GstTaskPool *pool;
/* Large enough input resolution for video-converter to actually use
* 4 threads if required */
fail_unless (gst_video_info_set_format (&ininfo, GST_VIDEO_FORMAT_ARGB, 1280,
720));
inbuffer = gst_buffer_new_and_alloc (ininfo.size);
gst_buffer_memset (inbuffer, 0, 0, -1);
gst_video_frame_map (&inframe, &ininfo, inbuffer, GST_MAP_READ);
fail_unless (gst_video_info_set_format (&outinfo, GST_VIDEO_FORMAT_BGRx, 400,
300));
outbuffer = gst_buffer_new_and_alloc (outinfo.size);
refbuffer = gst_buffer_new_and_alloc (outinfo.size);
gst_video_frame_map (&outframe, &outinfo, outbuffer, GST_MAP_WRITE);
gst_video_frame_map (&refframe, &outinfo, refbuffer, GST_MAP_WRITE);
/* Single threaded-conversion */
convert = gst_video_converter_new (&ininfo, &outinfo,
gst_structure_new_empty ("options"));
gst_video_converter_frame (convert, &inframe, &refframe);
gst_video_converter_free (convert);
/* Multithreaded conversion, converter creates pool */
convert = gst_video_converter_new (&ininfo, &outinfo,
gst_structure_new ("options",
GST_VIDEO_CONVERTER_OPT_THREADS, G_TYPE_UINT, 4, NULL)
);
gst_video_converter_frame (convert, &inframe, &outframe);
gst_video_converter_free (convert);
gst_video_frame_unmap (&outframe);
gst_video_frame_unmap (&refframe);
gst_buffer_map (outbuffer, &info, GST_MAP_READ);
fail_unless (gst_buffer_memcmp (refbuffer, 0, info.data, info.size) == 0);
gst_buffer_unmap (outbuffer, &info);
gst_video_frame_map (&outframe, &outinfo, outbuffer, GST_MAP_WRITE);
gst_video_frame_map (&refframe, &outinfo, refbuffer, GST_MAP_WRITE);
/* Multi-threaded conversion, user-provided pool */
pool = gst_shared_task_pool_new ();
gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (pool), 4);
gst_task_pool_prepare (pool, NULL);
convert = gst_video_converter_new_with_pool (&ininfo, &outinfo,
gst_structure_new ("options",
GST_VIDEO_CONVERTER_OPT_THREADS, G_TYPE_UINT, 4, NULL), pool);
gst_video_converter_frame (convert, &inframe, &outframe);
gst_video_converter_free (convert);
gst_task_pool_cleanup (pool);
gst_object_unref (pool);
gst_video_frame_unmap (&outframe);
gst_video_frame_unmap (&refframe);
gst_buffer_map (outbuffer, &info, GST_MAP_READ);
fail_unless (gst_buffer_memcmp (refbuffer, 0, info.data, info.size) == 0);
gst_buffer_unmap (outbuffer, &info);
gst_buffer_unref (refbuffer);
gst_buffer_unref (outbuffer);
gst_video_frame_unmap (&inframe);
gst_buffer_unref (inbuffer);
}
GST_END_TEST;
GST_START_TEST (test_video_transfer)
{
gint i, j;
......@@ -3889,6 +3967,7 @@ video_suite (void)
tcase_add_test (tc_chain, test_video_color_convert_other);
tcase_add_test (tc_chain, test_video_size_convert);
tcase_add_test (tc_chain, test_video_convert);
tcase_add_test (tc_chain, test_video_convert_multithreading);
tcase_add_test (tc_chain, test_video_transfer);
tcase_add_test (tc_chain, test_overlay_blend);
tcase_add_test (tc_chain, test_video_center_rect);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment