Skip to content
Snippets Groups Projects

alphacombine: De-couple flush-start/stop events handling

Merged Philippe Normand requested to merge philn/gstreamer:issue-4174 into main
1 file
+ 83
14
Compare changes
  • Side-by-side
  • Inline
@@ -107,8 +107,16 @@ struct _GstAlphaCombine
GMutex buffer_lock;
GCond buffer_cond;
GstBuffer *alpha_buffer;
/* Ref-counted flushing state */
guint flushing;
/* flushing state, protected by buffer_lock */
gboolean flushing;
/* Number of flush-stop events received, used to know when all sink pads are
done flushing. Protected by buffer_lock */
guint flush_stops;
/* sink pad being blocked while waiting for the other pad to receive
* FLUSH_STOP. Protected by buffer_lock */
GstPad *blocked_pad;
gulong pad_block_id;
GstVideoInfo sink_vinfo;
GstVideoInfo alpha_vinfo;
@@ -152,33 +160,83 @@ static void
gst_alpha_combine_unlock (GstAlphaCombine * self)
{
g_mutex_lock (&self->buffer_lock);
self->flushing++;
self->flushing = TRUE;
g_cond_broadcast (&self->buffer_cond);
g_mutex_unlock (&self->buffer_lock);
}
static GstPadProbeReturn
pad_blocked (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
GstAlphaCombine *self = user_data;
GST_DEBUG_OBJECT (self, "pad %s:%s blocked", GST_DEBUG_PAD_NAME (pad));
return GST_PAD_PROBE_OK;
}
/* Has to be called with buffer_lock held */
static void
gst_alpha_combine_unlock_stop (GstAlphaCombine * self)
gst_alpha_combine_clear_sink_pad_probe (GstAlphaCombine * self)
{
g_mutex_lock (&self->buffer_lock);
g_assert (self->flushing);
self->flushing--;
if (!self->pad_block_id)
return;
g_assert (self->blocked_pad);
gst_pad_remove_probe (self->blocked_pad, self->pad_block_id);
self->pad_block_id = 0;
self->blocked_pad = NULL;
}
/* Has to be called with buffer_lock held */
static void
gst_alpha_combine_unlock_reset_flushing_state (GstAlphaCombine * self)
{
self->flushing = FALSE;
self->flush_stops = 0;
/* Reset the format cookies to ensure they are equal */
if (!self->flushing) {
self->sink_format_cookie = 0;
self->alpha_format_cookie = 0;
self->sink_format_cookie = 0;
self->alpha_format_cookie = 0;
}
static void
gst_alpha_combine_unlock_stop (GstAlphaCombine * self, GstPad * pad)
{
g_mutex_lock (&self->buffer_lock);
self->flush_stops++;
/* Keep flushing until we have received a FLUSH_STOP on every sink pad. */
if (self->flush_stops >= 1 && pad) {
if (self->flush_stops == 2) {
GST_DEBUG_OBJECT (self,
"Both sink pads received FLUSH_STOP, unblocking them");
gst_alpha_combine_clear_sink_pad_probe (self);
} else {
GST_DEBUG_OBJECT (pad, "FLUSH_STOP received, blocking");
g_assert (!self->pad_block_id);
self->pad_block_id =
gst_pad_add_probe (pad,
GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM,
pad_blocked, self, NULL);
self->blocked_pad = pad;
g_mutex_unlock (&self->buffer_lock);
return;
}
}
gst_alpha_combine_unlock_reset_flushing_state (self);
g_mutex_unlock (&self->buffer_lock);
}
static void
gst_alpha_combine_reset (GstAlphaCombine * self)
{
g_mutex_lock (&self->buffer_lock);
gst_alpha_combine_clear_sink_pad_probe (self);
gst_buffer_replace (&self->alpha_buffer, NULL);
gst_buffer_replace (&self->last_alpha_buffer, NULL);
self->last_flow_ret = GST_FLOW_OK;
g_mutex_unlock (&self->buffer_lock);
}
/*
@@ -492,7 +550,7 @@ gst_alpha_combine_sink_event (GstPad * pad, GstObject * object,
gst_alpha_combine_unlock (self);
break;
case GST_EVENT_FLUSH_STOP:
gst_alpha_combine_unlock_stop (self);
gst_alpha_combine_unlock_stop (self, pad);
break;
case GST_EVENT_CAPS:
{
@@ -523,7 +581,7 @@ gst_alpha_combine_alpha_event (GstPad * pad, GstObject * object,
gst_alpha_combine_unlock (self);
break;
case GST_EVENT_FLUSH_STOP:
gst_alpha_combine_unlock_stop (self);
gst_alpha_combine_unlock_stop (self, pad);
gst_alpha_combine_reset (self);
break;
case GST_EVENT_CAPS:
@@ -587,7 +645,9 @@ gst_alpha_combine_change_state (GstElement * element, GstStateChange transition)
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
gst_alpha_combine_unlock_stop (self);
g_mutex_lock (&self->buffer_lock);
gst_alpha_combine_unlock_reset_flushing_state (self);
g_mutex_unlock (&self->buffer_lock);
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
gst_alpha_combine_unlock (self);
@@ -600,7+660,7 @@
switch (transition) {
case GST_STATE_CHANGE_PAUSED_TO_READY:
gst_alpha_combine_reset (self);
self->src_format = GST_VIDEO_FORMAT_UNKNOWN;
gst_video_info_init (&self->sink_vinfo);
gst_video_info_init (&self->alpha_vinfo);
@@ -623,7+683,7 @@
g_clear_object (&self->alpha_pad);
g_clear_object (&self->src_pad);
if (G_UNLIKELY (self->blocked_pad)) {
g_assert (self->pad_block_id);
gst_pad_remove_probe (self->blocked_pad, self->pad_block_id);
}
g_clear_object (&self->blocked_pad);
G_OBJECT_CLASS (parent_class)->dispose (object);
}
@@ -669,7 +735,10 @@ gst_alpha_combine_init (GstAlphaCombine * self)
self->sink_pad = gst_element_get_static_pad (GST_ELEMENT (self), "sink");
self->alpha_pad = gst_element_get_static_pad (GST_ELEMENT (self), "alpha");
self->src_pad = gst_element_get_static_pad (GST_ELEMENT (self), "src");
self->flushing = 1;
self->flushing = FALSE;
self->flush_stops = 0;
self->pad_block_id = 0;
self->blocked_pad = NULL;
g_mutex_init (&self->buffer_lock);
g_cond_init (&self->buffer_cond);
Loading