Commit 6ae7d4ec authored by Mathieu Duponchelle's avatar Mathieu Duponchelle 🐸 Committed by Tim-Philipp Müller
Browse files

aggregator: make peek() has() pop() drop() buffer API threadsafe

Enforce that the last buffer that was peeked (or had its existence
checked) on a pad is the one that gets popped / dropped, resetting
at the end of each aggregation cycle.

Fixes gstreamer/gstreamer#603

Part-of: <gstreamer/gstreamer!675>
parent a613240c
......@@ -46,6 +46,24 @@
* has been taken with pop_buffer (), a new buffer can be queued
* on that pad.
*
* * When gst_aggregator_pad_peek_buffer() or gst_aggregator_pad_has_buffer()
* are called, a reference is taken to the returned buffer, which stays
* valid until either:
*
* - gst_aggregator_pad_pop_buffer() is called, in which case the caller
* is guaranteed that the buffer they receive is the same as the peeked
* buffer.
* - gst_aggregator_pad_drop_buffer() is called, in which case the caller
* is guaranteed that the dropped buffer is the one that was peeked.
* - the subclass implementation of #GstAggregatorClass.aggregate returns.
*
* Subsequent calls to gst_aggregator_pad_peek_buffer() or
* gst_aggregator_pad_has_buffer() return / check the same buffer that was
* returned / checked, until one of the conditions listed above is met.
*
* Subclasses are only allowed to call these methods from the aggregate
* thread.
*
* * If the subclass wishes to push a buffer downstream in its aggregate
* implementation, it should do so through the
* gst_aggregator_finish_buffer() method. This method will take care
......@@ -127,7 +145,7 @@ static GstClockTime gst_aggregator_get_latency_property (GstAggregator * agg);
static GstClockTime gst_aggregator_get_latency_unlocked (GstAggregator * self);
static void gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad,
GstBuffer * buffer);
GstBuffer * buffer, gboolean dequeued);
GST_DEBUG_CATEGORY_STATIC (aggregator_debug);
#define GST_CAT_DEFAULT aggregator_debug
......@@ -239,6 +257,7 @@ struct _GstAggregatorPadPrivate
GQueue data; /* buffers, events and queries */
GstBuffer *clipped_buffer;
guint num_buffers;
GstBuffer *peeked_buffer;
/* used to track fill state of queues, only used with live-src and when
* latency property is set to > 0 */
......@@ -951,7 +970,8 @@ gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
if (GST_IS_BUFFER (item->data)
&& klass->skip_buffer (aggpad, agg, item->data)) {
GST_LOG_OBJECT (aggpad, "Skipping %" GST_PTR_FORMAT, item->data);
gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data));
gst_aggregator_pad_buffer_consumed (aggpad, GST_BUFFER (item->data),
TRUE);
gst_buffer_unref (item->data);
g_queue_delete_link (&aggpad->priv->data, item);
} else {
......@@ -966,6 +986,22 @@ gst_aggregator_pad_skip_buffers (GstElement * self, GstPad * epad,
return TRUE;
}
static gboolean
gst_aggregator_pad_reset_peeked_buffer (GstElement * self, GstPad * epad,
gpointer user_data)
{
GstAggregatorPad *aggpad = (GstAggregatorPad *) epad;
PAD_LOCK (aggpad);
gst_buffer_replace (&aggpad->priv->peeked_buffer, NULL);
PAD_UNLOCK (aggpad);
return TRUE;
}
static void
gst_aggregator_pad_set_flushing (GstAggregatorPad * aggpad,
GstFlowReturn flow_return, gboolean full)
......@@ -1300,8 +1336,11 @@ gst_aggregator_aggregate_func (GstAggregator * self)
/* Ensure we have buffers ready (either in clipped_buffer or at the head of
* the queue */
if (!gst_aggregator_wait_and_check (self, &timeout))
if (!gst_aggregator_wait_and_check (self, &timeout)) {
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
gst_aggregator_pad_reset_peeked_buffer, NULL);
continue;
}
if (gst_pad_check_reconfigure (GST_AGGREGATOR_SRC_PAD (self))) {
if (!gst_aggregator_negotiate_unlocked (self)) {
......@@ -1319,6 +1358,9 @@ gst_aggregator_aggregate_func (GstAggregator * self)
flow_return = klass->aggregate (self, timeout);
}
gst_element_foreach_sink_pad (GST_ELEMENT_CAST (self),
gst_aggregator_pad_reset_peeked_buffer, NULL);
if (!priv->selected_samples_called_or_warned) {
GST_FIXME_OBJECT (self,
"Subclass should call gst_aggregator_selected_samples() from its "
......@@ -3249,9 +3291,12 @@ gst_aggregator_pad_init (GstAggregatorPad * pad)
/* Must be called with the PAD_LOCK held */
static void
gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer)
gst_aggregator_pad_buffer_consumed (GstAggregatorPad * pad, GstBuffer * buffer,
gboolean dequeued)
{
pad->priv->num_buffers--;
if (dequeued)
pad->priv->num_buffers--;
if (buffer && pad->priv->emit_signals) {
g_signal_emit (pad, gst_aggregator_pad_signals[PAD_SIGNAL_BUFFER_CONSUMED],
0, buffer);
......@@ -3292,7 +3337,7 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
buffer = aggclass->clip (self, pad, buffer);
if (buffer == NULL) {
gst_aggregator_pad_buffer_consumed (pad, buffer);
gst_aggregator_pad_buffer_consumed (pad, buffer, TRUE);
GST_TRACE_OBJECT (pad, "Clipping consumed the buffer");
}
}
......@@ -3316,22 +3361,44 @@ gst_aggregator_pad_clip_buffer_unlocked (GstAggregatorPad * pad)
GstBuffer *
gst_aggregator_pad_pop_buffer (GstAggregatorPad * pad)
{
GstBuffer *buffer;
GstBuffer *buffer = NULL;
PAD_LOCK (pad);
if (pad->priv->flow_return != GST_FLOW_OK) {
PAD_UNLOCK (pad);
return NULL;
/* If the subclass has already peeked a buffer, we guarantee
* that it receives the same buffer, no matter if the pad has
* errored out / been flushed in the meantime.
*/
if (pad->priv->peeked_buffer) {
buffer = pad->priv->peeked_buffer;
goto done;
}
gst_aggregator_pad_clip_buffer_unlocked (pad);
if (pad->priv->flow_return != GST_FLOW_OK)
goto done;
gst_aggregator_pad_clip_buffer_unlocked (pad);
buffer = pad->priv->clipped_buffer;
done:
if (buffer) {
pad->priv->clipped_buffer = NULL;
gst_aggregator_pad_buffer_consumed (pad, buffer);
if (pad->priv->clipped_buffer != NULL) {
/* Here we still hold a reference to both the clipped buffer
* and possibly the peeked buffer, we transfer the first and
* potentially release the second
*/
gst_aggregator_pad_buffer_consumed (pad, buffer, TRUE);
pad->priv->clipped_buffer = NULL;
gst_buffer_replace (&pad->priv->peeked_buffer, NULL);
} else {
/* Here our clipped buffer has already been released, for
* example because of a flush. We thus transfer the reference
* to the peeked buffer to the caller, and we don't decrement
* pad.num_buffers as it has already been done elsewhere
*/
gst_aggregator_pad_buffer_consumed (pad, buffer, FALSE);
pad->priv->peeked_buffer = NULL;
}
GST_DEBUG_OBJECT (pad, "Consumed: %" GST_PTR_FORMAT, buffer);
}
......@@ -3373,24 +3440,29 @@ gst_aggregator_pad_drop_buffer (GstAggregatorPad * pad)
GstBuffer *
gst_aggregator_pad_peek_buffer (GstAggregatorPad * pad)
{
GstBuffer *buffer;
GstBuffer *buffer = NULL;
PAD_LOCK (pad);
if (pad->priv->flow_return != GST_FLOW_OK) {
PAD_UNLOCK (pad);
return NULL;
if (pad->priv->peeked_buffer) {
buffer = gst_buffer_ref (pad->priv->peeked_buffer);
goto done;
}
if (pad->priv->flow_return != GST_FLOW_OK)
goto done;
gst_aggregator_pad_clip_buffer_unlocked (pad);
if (pad->priv->clipped_buffer) {
buffer = gst_buffer_ref (pad->priv->clipped_buffer);
pad->priv->peeked_buffer = gst_buffer_ref (buffer);
} else {
buffer = NULL;
}
PAD_UNLOCK (pad);
done:
PAD_UNLOCK (pad);
return buffer;
}
......@@ -3412,8 +3484,15 @@ gst_aggregator_pad_has_buffer (GstAggregatorPad * pad)
gboolean has_buffer;
PAD_LOCK (pad);
gst_aggregator_pad_clip_buffer_unlocked (pad);
has_buffer = (pad->priv->clipped_buffer != NULL);
if (pad->priv->peeked_buffer) {
has_buffer = TRUE;
} else {
gst_aggregator_pad_clip_buffer_unlocked (pad);
has_buffer = (pad->priv->clipped_buffer != NULL);
if (has_buffer)
pad->priv->peeked_buffer = gst_buffer_ref (pad->priv->clipped_buffer);
}
PAD_UNLOCK (pad);
return has_buffer;
......
......@@ -58,6 +58,7 @@ struct _GstTestAggregator
guint64 timestamp;
gboolean gap_expected;
gboolean do_flush_on_aggregate;
};
struct _GstTestAggregatorClass
......@@ -100,7 +101,21 @@ gst_test_aggregator_aggregate (GstAggregator * aggregator, gboolean timeout)
testagg->gap_expected = FALSE;
}
gst_aggregator_pad_drop_buffer (pad);
if (testagg->do_flush_on_aggregate) {
GstBuffer *popped_buf;
buf = gst_aggregator_pad_peek_buffer (pad);
GST_DEBUG_OBJECT (pad, "Flushing on aggregate");
gst_pad_send_event (GST_PAD (pad), gst_event_new_flush_start ());
popped_buf = gst_aggregator_pad_pop_buffer (pad);
fail_unless (buf == popped_buf);
gst_buffer_unref (buf);
gst_buffer_unref (popped_buf);
} else {
gst_aggregator_pad_drop_buffer (pad);
}
g_value_reset (&value);
break;
......@@ -1259,6 +1274,37 @@ GST_START_TEST (test_change_state_intensive)
GST_END_TEST;
GST_START_TEST (test_flush_on_aggregate)
{
GThread *thread1, *thread2;
ChainData data1 = { 0, };
ChainData data2 = { 0, };
TestData test = { 0, };
_test_data_init (&test, FALSE);
((GstTestAggregator *) test.aggregator)->do_flush_on_aggregate = TRUE;
_chain_data_init (&data1, test.aggregator, gst_buffer_new (), NULL);
_chain_data_init (&data2, test.aggregator, gst_buffer_new (), NULL);
thread1 = g_thread_try_new ("gst-check", push_data, &data1, NULL);
thread2 = g_thread_try_new ("gst-check", push_data, &data2, NULL);
g_main_loop_run (test.ml);
g_source_remove (test.timeout_id);
/* these will return immediately as when the data is popped the threads are
* unlocked and will terminate */
g_thread_join (thread1);
g_thread_join (thread2);
_chain_data_clear (&data1);
_chain_data_clear (&data2);
_test_data_clear (&test);
}
GST_END_TEST;
static Suite *
gst_aggregator_suite (void)
{
......@@ -1286,6 +1332,7 @@ gst_aggregator_suite (void)
tcase_add_test (general, test_timeout_pipeline_with_wait);
tcase_add_test (general, test_add_remove);
tcase_add_test (general, test_change_state_intensive);
tcase_add_test (general, test_flush_on_aggregate);
return suite;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment