Commit d3f19741 authored by Sebastian Dröge's avatar Sebastian Dröge 🍵

pad: Fix race condition causing the same probe to be called multiple times

Probes were remembering a cookie that was used to check if the probe was
already called this time before the probes list changed. However the
same probes could've been called by another thread in between and thus
gotten a new cookie, and would then be called a second time.

https://bugzilla.gnome.org/show_bug.cgi?id=795987
parent fbf6e287
......@@ -143,7 +143,6 @@ struct _GstPadPrivate
gint using;
guint probe_list_cookie;
guint probe_cookie;
/* counter of how many idle probes are running directly from the add_probe
* call. Used to block any data flowing in the pad while the idle callback
......@@ -159,10 +158,8 @@ struct _GstPadPrivate
typedef struct
{
GHook hook;
guint cookie;
} GstProbe;
#define PROBE_COOKIE(h) (((GstProbe *)(h))->cookie)
#define GST_PAD_IS_RUNNING_IDLE_PROBE(p) \
(((GstPad *)(p))->priv->idle_running > 0)
......@@ -174,7 +171,11 @@ typedef struct
gboolean pass;
gboolean handled;
gboolean marshalled;
guint cookie;
GHook **called_probes;
guint n_called_probes;
guint called_probes_size;
gboolean retry;
} ProbeMarshall;
static void gst_pad_dispose (GObject * object);
......@@ -1455,7 +1456,6 @@ gst_pad_add_probe (GstPad * pad, GstPadProbeType mask,
hook->func = callback;
hook->data = user_data;
hook->destroy = destroy_data;
PROBE_COOKIE (hook) = (pad->priv->probe_cookie - 1);
/* add the probe */
g_hook_append (&pad->probes, hook);
......@@ -3459,6 +3459,8 @@ gst_pad_query_default (GstPad * pad, GstObject * parent, GstQuery * query)
return ret;
}
#define N_STACK_ALLOCATE_PROBES (16)
static void
probe_hook_marshal (GHook * hook, ProbeMarshall * data)
{
......@@ -3468,16 +3470,36 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
GstPadProbeCallback callback;
GstPadProbeReturn ret;
gpointer original_data;
guint i;
/* if we have called this callback, do nothing */
if (PROBE_COOKIE (hook) == data->cookie) {
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"hook %lu, cookie %u already called", hook->hook_id,
PROBE_COOKIE (hook));
return;
/* if we have called this callback, do nothing. But only check
* if we're actually calling probes a second time */
if (data->retry) {
for (i = 0; i < data->n_called_probes; i++) {
if (data->called_probes[i] == hook) {
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"hook %lu already called", hook->hook_id);
return;
}
}
}
PROBE_COOKIE (hook) = data->cookie;
/* reallocate on the heap if we had more than 16 probes */
if (data->n_called_probes == data->called_probes_size) {
if (data->called_probes_size > N_STACK_ALLOCATE_PROBES) {
data->called_probes_size *= 2;
data->called_probes =
g_renew (GHook *, data->called_probes, data->called_probes_size);
} else {
GHook **tmp = data->called_probes;
data->called_probes_size *= 2;
data->called_probes = g_new (GHook *, data->called_probes_size);
memcpy (data->called_probes, tmp,
N_STACK_ALLOCATE_PROBES * sizeof (GHook *));
}
}
data->called_probes[data->n_called_probes++] = hook;
flags = hook->flags >> G_HOOK_FLAG_USER_SHIFT;
type = info->type;
......@@ -3525,8 +3547,7 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
goto no_match;
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"hook %lu, cookie %u with flags 0x%08x matches", hook->hook_id,
PROBE_COOKIE (hook), flags);
"hook %lu with flags 0x%08x matches", hook->hook_id, flags);
data->marshalled = TRUE;
......@@ -3582,8 +3603,8 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
no_match:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"hook %lu, cookie %u with flags 0x%08x does not match %08x",
hook->hook_id, PROBE_COOKIE (hook), flags, info->type);
"hook %lu with flags 0x%08x does not match %08x",
hook->hook_id, flags, info->type);
return;
}
}
......@@ -3668,6 +3689,7 @@ do_probe_callbacks (GstPad * pad, GstPadProbeInfo * info,
ProbeMarshall data;
guint cookie;
gboolean is_block;
GHook *called_probes[N_STACK_ALLOCATE_PROBES];
data.pad = pad;
data.info = info;
......@@ -3675,7 +3697,14 @@ do_probe_callbacks (GstPad * pad, GstPadProbeInfo * info,
data.handled = FALSE;
data.marshalled = FALSE;
data.dropped = FALSE;
data.cookie = ++pad->priv->probe_cookie;
/* We stack-allocate for N_STACK_ALLOCATE_PROBES hooks as a first step. If more are needed,
* we will re-allocate with g_malloc(). This should usually never be needed
*/
data.called_probes = called_probes;
data.n_called_probes = 0;
data.called_probes_size = N_STACK_ALLOCATE_PROBES;
data.retry = FALSE;
is_block =
(info->type & GST_PAD_PROBE_TYPE_BLOCK) == GST_PAD_PROBE_TYPE_BLOCK;
......@@ -3686,18 +3715,18 @@ do_probe_callbacks (GstPad * pad, GstPadProbeInfo * info,
}
again:
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"do probes cookie %u", data.cookie);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "do probes");
cookie = pad->priv->probe_list_cookie;
g_hook_list_marshal (&pad->probes, TRUE,
(GHookMarshaller) probe_hook_marshal, &data);
/* if the list changed, call the new callbacks (they will not have their
* cookie set to data.cookie */
/* if the list changed, call the new callbacks (they will not be in
* called_probes yet) */
if (cookie != pad->priv->probe_list_cookie) {
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"probe list changed, restarting");
data.retry = TRUE;
goto again;
}
......@@ -3739,11 +3768,12 @@ again:
GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_BLOCKING);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "We got unblocked");
/* if the list changed, call the new callbacks (they will not have their
* cookie set to data.cookie */
/* if the list changed, call the new callbacks (they will not be in
* called_probes yet) */
if (cookie != pad->priv->probe_list_cookie) {
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"probe list changed, restarting");
data.retry = TRUE;
goto again;
}
......@@ -3752,28 +3782,39 @@ again:
}
}
if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
g_free (data.called_probes);
return defaultval;
/* ERRORS */
flushing:
{
GST_DEBUG_OBJECT (pad, "pad is flushing");
if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
g_free (data.called_probes);
return GST_FLOW_FLUSHING;
}
dropped:
{
GST_DEBUG_OBJECT (pad, "data is dropped");
if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
g_free (data.called_probes);
return GST_FLOW_CUSTOM_SUCCESS;
}
passed:
{
/* FIXME : Should we return FLOW_OK or the defaultval ?? */
GST_DEBUG_OBJECT (pad, "data is passed");
if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
g_free (data.called_probes);
return GST_FLOW_OK;
}
handled:
{
GST_DEBUG_OBJECT (pad, "data was handled");
if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
g_free (data.called_probes);
return GST_FLOW_CUSTOM_SUCCESS_1;
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment