Commit fdfd764a authored by hoonhee.lee's avatar hoonhee.lee

decodebin3: introduce parse input lock for upstream changes

When upstream changes like dynamic stream changes is happened,
below symptom may be observed.
  1) DecodebinInputSteam is created in create_input_stream.
     But, it is not linked yet to multiqueue sinkpad and not assigned.
  2) custom-eos is delivered to multiqueue srcpad and it understands
    input stream is null and try to release slot by async call.
  3) The new input stream is linked to slot by link_input_to_slot.
  4) The slot is released by free_multiqueue_slot.

After that the DecodebinInputStream lost correponding slot and
It may cause unexpected behaviour or crash symptom.

To avoid this problem, take parse input lock for create_input_stream
and custom-eos is handled in multiqueue srcpad.
parent 779704b8
Pipeline #38636 passed with stages
in 58 minutes and 36 seconds
......@@ -49,6 +49,23 @@ _custom_eos_quark_get (void)
return g_quark;
}
#define PARSE_INPUT_LOCK(input) G_STMT_START { \
GST_LOG_OBJECT (input->dbin, \
"parse input locking from thread %p", \
g_thread_self ()); \
g_mutex_lock (&input->input_lock); \
GST_LOG_OBJECT (input->dbin, \
"parse input locked from thread %p", \
g_thread_self ()); \
} G_STMT_END
#define PARSE_INPUT_UNLOCK(input) G_STMT_START { \
GST_LOG_OBJECT (input->dbin, \
"parse input unlocking from thread %p", \
g_thread_self ()); \
g_mutex_unlock (&input->input_lock); \
} G_STMT_END
/* Streams that come from demuxers (input/upstream) */
/* FIXME : All this is hardcoded. Switch to tree of chains */
struct _DecodebinInputStream
......@@ -464,11 +481,13 @@ parsebin_buffer_probe (GstPad * pad, GstPadProbeInfo * info,
DecodebinInputStream *input_stream;
/* The remaining pads in pending_pads are the ones that require a new
* input stream */
PARSE_INPUT_LOCK (ppad->input);
input_stream = create_input_stream (dbin, stream, ppad->pad, ppad->input);
/* See if we can link it straight away */
input_stream->active_stream = stream;
SELECTION_LOCK (dbin);
PARSE_INPUT_UNLOCK (ppad->input);
slot = get_slot_for_input (dbin, input_stream);
link_input_to_slot (input_stream, slot);
SELECTION_UNLOCK (dbin);
......
......@@ -303,6 +303,8 @@ struct _DecodebinInput
/* HACK : Remove these fields */
/* List of PendingPad structures */
GList *pending_pads;
GMutex input_lock;
};
/* Multiqueue Slots */
......@@ -316,6 +318,7 @@ typedef struct _MultiQueueSlot
/* Linked input and output */
DecodebinInputStream *input;
DecodebinInput *parent_input;
/* pending => last stream received on sink pad */
GstStream *pending_stream;
......@@ -515,6 +518,8 @@ static void free_multiqueue_slot_async (GstDecodebin3 * dbin,
static GstStreamCollection *get_merged_collection (GstDecodebin3 * dbin);
static void update_requested_selection (GstDecodebin3 * dbin);
static DecodebinInput *get_input_for_slot (GstDecodebin3 * dbin,
MultiQueueSlot * slot);
/* FIXME: Really make all the parser stuff a self-contained helper object */
#include "gstdecodebin3-parse.c"
......@@ -971,6 +976,7 @@ create_new_input (GstDecodebin3 * dbin, gboolean main)
input->dbin = dbin;
input->is_main = main;
input->group_id = GST_GROUP_ID_INVALID;
g_mutex_init (&input->input_lock);
if (main)
input->ghost_sink = gst_ghost_pad_new_no_target ("sink", GST_PAD_SINK);
else {
......@@ -1291,6 +1297,50 @@ get_merged_collection (GstDecodebin3 * dbin)
return res;
}
/* Must be called with SELECTION_LOCK */
static DecodebinInput *
get_input_for_slot (GstDecodebin3 * dbin, MultiQueueSlot * slot)
{
DecodebinInput *input = NULL;
GList *tmp;
GST_DEBUG_OBJECT (dbin, "Finding for input for slot(%p)", slot);
if (slot->input && slot->input->input) {
GST_DEBUG_OBJECT (dbin, "Still alive inputstream(%p) and input(%p)",
slot->input, slot->input->input);
input = slot->input->input;
goto done;
}
if (slot->parent_input == dbin->main_input) {
GST_DEBUG_OBJECT (dbin,
"inputstream is not exist but, found in main input(%p)",
dbin->main_input);
input = dbin->main_input;
goto done;
}
for (tmp = dbin->other_inputs; tmp; tmp = tmp->next) {
DecodebinInput *cur = (DecodebinInput *) tmp->data;
if (slot->parent_input == cur) {
GST_DEBUG_OBJECT (dbin,
"inputstream is not exist but, found in other input(%p)", cur);
input = cur;
break;
}
}
done:
if (input)
GST_DEBUG_OBJECT (dbin, "Found input: %s",
input->parsebin ? GST_ELEMENT_NAME (input->parsebin) : "<NONE>");
else
GST_DEBUG_OBJECT (dbin, "No found input for slot(%p)", slot);
return input;
}
/* Call with INPUT_LOCK taken */
static DecodebinInput *
find_message_parsebin (GstDecodebin3 * dbin, GstElement * child)
......@@ -1802,12 +1852,28 @@ multiqueue_src_probe (GstPad * pad, GstPadProbeInfo * info,
/* Custom EOS handling first */
if (gst_mini_object_get_qdata (GST_MINI_OBJECT_CAST (ev),
CUSTOM_EOS_QUARK)) {
DecodebinInput *input = NULL;
gboolean need_input_lock = FALSE;
/* remove custom-eos */
gst_mini_object_set_qdata (GST_MINI_OBJECT_CAST (ev),
CUSTOM_EOS_QUARK, NULL, NULL);
GST_LOG_OBJECT (pad, "Received custom EOS");
ret = GST_PAD_PROBE_HANDLED;
SELECTION_LOCK (dbin);
input = get_input_for_slot (dbin, slot);
SELECTION_UNLOCK (dbin);
if (input) {
need_input_lock = TRUE;
PARSE_INPUT_LOCK (input);
}
SELECTION_LOCK (dbin);
input = get_input_for_slot (dbin, slot);
if (input && need_input_lock) {
PARSE_INPUT_UNLOCK (input);
}
if (slot->input == NULL) {
GST_DEBUG_OBJECT (pad,
"Got custom-eos from null input stream, remove output stream");
......@@ -2030,9 +2096,14 @@ link_input_to_slot (DecodebinInputStream * input, MultiQueueSlot * slot)
"Trying to link input to an already used slot");
return;
}
GST_DEBUG_OBJECT (input->srcpad, "Linking to slot(%s:%s)",
GST_DEBUG_PAD_NAME (slot->sink_pad));
gst_pad_link_full (input->srcpad, slot->sink_pad, GST_PAD_LINK_CHECK_NOTHING);
slot->pending_stream = input->active_stream;
slot->input = input;
slot->parent_input = input->input;
GST_DEBUG_OBJECT (input->srcpad, "Assigned to slot(%s:%s)",
GST_DEBUG_PAD_NAME (slot->sink_pad));
}
#if 0
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment