Commit da0060d3 authored by Wim Taymans's avatar Wim Taymans
Browse files

pad: call new callbacks set in the block callback

Keep track of when a new callback is installed in the callback and call the new
callback in that case.
Add unit test for checking pad blocking.
Fixes #573823.
parent 646a7469
......@@ -1022,6 +1022,7 @@ gst_pad_set_blocked_async_full (GstPad * pad, gboolean blocked,
pad->block_callback = callback;
pad->block_data = user_data;
pad->block_destroy_data = destroy_data;
pad->abidata.ABI.block_callback_called = FALSE;
if (!callback) {
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "waiting for block");
GST_PAD_BLOCK_WAIT (pad);
......@@ -1039,6 +1040,7 @@ gst_pad_set_blocked_async_full (GstPad * pad, gboolean blocked,
pad->block_callback = callback;
pad->block_data = user_data;
pad->block_destroy_data = destroy_data;
pad->abidata.ABI.block_callback_called = FALSE;
GST_PAD_BLOCK_BROADCAST (pad);
if (!callback) {
......@@ -3786,31 +3788,38 @@ handle_pad_block (GstPad * pad)
* all taken when calling this function. */
gst_object_ref (pad);
/* we either have a callback installed to notify the block or
* some other thread is doing a GCond wait. */
callback = pad->block_callback;
if (callback) {
/* there is a callback installed, call it. We release the
* lock so that the callback can do something usefull with the
* pad */
user_data = pad->block_data;
GST_OBJECT_UNLOCK (pad);
callback (pad, TRUE, user_data);
GST_OBJECT_LOCK (pad);
while (GST_PAD_IS_BLOCKED (pad)) {
do {
/* we either have a callback installed to notify the block or
* some other thread is doing a GCond wait. */
callback = pad->block_callback;
pad->abidata.ABI.block_callback_called = TRUE;
if (callback) {
/* there is a callback installed, call it. We release the
* lock so that the callback can do something usefull with the
* pad */
user_data = pad->block_data;
GST_OBJECT_UNLOCK (pad);
callback (pad, TRUE, user_data);
GST_OBJECT_LOCK (pad);
/* we released the lock, recheck flushing */
if (GST_PAD_IS_FLUSHING (pad))
goto flushing;
} else {
/* no callback, signal the thread that is doing a GCond wait
* if any. */
GST_PAD_BLOCK_BROADCAST (pad);
}
/* we released the lock, recheck flushing */
if (GST_PAD_IS_FLUSHING (pad))
goto flushing;
} else {
/* no callback, signal the thread that is doing a GCond wait
* if any. */
GST_PAD_BLOCK_BROADCAST (pad);
}
} while (pad->abidata.ABI.block_callback_called == FALSE
&& GST_PAD_IS_BLOCKED (pad));
/* OBJECT_LOCK could have been released when we did the callback, which
* then could have made the pad unblock so we need to check the blocking
* condition again. */
if (!GST_PAD_IS_BLOCKED (pad))
break;
/* OBJECT_LOCK could have been released when we did the callback, which
* then could have made the pad unblock so we need to check the blocking
* condition again. */
while (GST_PAD_IS_BLOCKED (pad)) {
/* now we block the streaming thread. It can be unlocked when we
* deactivate the pad (which will also set the FLUSHING flag) or
* when the pad is unblocked. A flushing event will also unblock
......
......@@ -633,7 +633,12 @@ struct _GstPad {
GDestroyNotify block_destroy_data;
/*< private >*/
gpointer _gst_reserved[GST_PADDING - 2];
union {
struct {
gboolean block_callback_called;
} ABI;
gpointer _gst_reserved[GST_PADDING - 2];
} abidata;
};
struct _GstPadClass {
......
......@@ -809,6 +809,97 @@ GST_START_TEST (test_block_async_full_destroy_dispose)
GST_END_TEST;
static void
unblock_async_no_flush_cb (GstPad * pad, gboolean blocked, gpointer user_data)
{
gboolean *bool_user_data = (gboolean *) user_data;
/* here we should have blocked == 1 unblocked == 0 */
fail_unless (blocked == FALSE);
fail_unless (bool_user_data[0] == TRUE);
fail_unless (bool_user_data[1] == TRUE);
fail_unless (bool_user_data[2] == FALSE);
bool_user_data[2] = TRUE;
}
static void
unblock_async_not_called (GstPad * pad, gboolean blocked, gpointer user_data)
{
g_warn_if_reached ();
}
static void
block_async_second_no_flush (GstPad * pad, gboolean blocked, gpointer user_data)
{
gboolean *bool_user_data = (gboolean *) user_data;
fail_unless (blocked == TRUE);
fail_unless (bool_user_data[0] == TRUE);
fail_unless (bool_user_data[1] == FALSE);
fail_unless (bool_user_data[2] == FALSE);
bool_user_data[1] = TRUE;
fail_unless (gst_pad_set_blocked_async (pad, FALSE, unblock_async_no_flush_cb,
user_data));
}
static void
block_async_first_no_flush (GstPad * pad, gboolean blocked, gpointer user_data)
{
static int n_calls = 0;
gboolean *bool_user_data = (gboolean *) user_data;
fail_unless (blocked == TRUE);
if (++n_calls > 1)
/* we expect this callback to be called only once */
g_warn_if_reached ();
*bool_user_data = blocked;
fail_unless (bool_user_data[0] == TRUE);
fail_unless (bool_user_data[1] == FALSE);
fail_unless (bool_user_data[2] == FALSE);
fail_unless (gst_pad_set_blocked_async (pad, FALSE, unblock_async_not_called,
NULL));
/* replace block_async_first with block_async_second so next time the pad is
* blocked the latter should be called */
fail_unless (gst_pad_set_blocked_async (pad, TRUE,
block_async_second_no_flush, user_data));
}
GST_START_TEST (test_block_async_replace_callback_no_flush)
{
GstPad *pad;
gboolean bool_user_data[3] = { FALSE, FALSE, FALSE };
pad = gst_pad_new ("src", GST_PAD_SRC);
fail_unless (pad != NULL);
gst_pad_set_active (pad, TRUE);
fail_unless (gst_pad_set_blocked_async (pad, TRUE, block_async_first_no_flush,
bool_user_data));
gst_pad_push (pad, gst_buffer_new ());
fail_unless (bool_user_data[0] == TRUE);
fail_unless (bool_user_data[1] == TRUE);
fail_unless (bool_user_data[2] == TRUE);
gst_object_unref (pad);
}
GST_END_TEST;
static Suite *
gst_pad_suite (void)
{
......@@ -837,6 +928,7 @@ gst_pad_suite (void)
#endif
tcase_add_test (tc_chain, test_block_async_full_destroy);
tcase_add_test (tc_chain, test_block_async_full_destroy_dispose);
tcase_add_test (tc_chain, test_block_async_replace_callback_no_flush);
return s;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment