Commit 24658952 authored by Sebastian Dröge's avatar Sebastian Dröge 🍵

streamsynchronizer: Implement grouping of streams via the group-id

https://bugzilla.gnome.org/show_bug.cgi?id=704427
https://bugzilla.gnome.org/show_bug.cgi?id=704408
parent 9bd140d3
......@@ -76,6 +76,7 @@ typedef struct
* default: G_MAXUINT32 */
guint32 stream_start_seqnum;
guint32 segment_seqnum;
guint group_id;
} GstStream;
/* Must be called with lock! */
......@@ -243,86 +244,119 @@ gst_stream_synchronizer_sink_event (GstPad * pad, GstObject * parent,
{
GstStream *stream, *ostream;
guint32 seqnum = gst_event_get_seqnum (event);
guint group_id;
gboolean have_group_id;
GList *l;
gboolean all_wait = TRUE;
gboolean new_stream = TRUE;
have_group_id = gst_event_parse_group_id (event, &group_id);
GST_STREAM_SYNCHRONIZER_LOCK (self);
self->have_group_id &= have_group_id;
have_group_id = self->have_group_id;
stream = gst_pad_get_element_private (pad);
if (stream && stream->stream_start_seqnum != seqnum) {
if (!stream) {
GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source");
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
break;
}
if ((have_group_id && stream->group_id != group_id) || (!have_group_id
&& stream->stream_start_seqnum != seqnum)) {
stream->is_eos = FALSE;
stream->stream_start_seqnum = seqnum;
stream->group_id = group_id;
stream->drop_discont = TRUE;
/* Check if this belongs to a stream that is already there,
* e.g. we got the visualizations for an audio stream */
for (l = self->streams; l; l = l->next) {
ostream = l->data;
if (!have_group_id) {
/* Check if this belongs to a stream that is already there,
* e.g. we got the visualizations for an audio stream */
for (l = self->streams; l; l = l->next) {
ostream = l->data;
if (ostream != stream && ostream->stream_start_seqnum == seqnum
&& !ostream->wait) {
new_stream = FALSE;
break;
}
}
if (ostream != stream && ostream->stream_start_seqnum == seqnum
&& !ostream->wait) {
new_stream = FALSE;
if (!new_stream) {
GST_DEBUG_OBJECT (pad,
"Stream %d belongs to running stream %d, no waiting",
stream->stream_number, ostream->stream_number);
stream->wait = FALSE;
stream->new_stream = FALSE;
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
break;
}
} else if (group_id == self->group_id) {
GST_DEBUG_OBJECT (pad, "Stream %d belongs to running group %d, "
"no waiting", stream->stream_number, group_id);
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
break;
}
if (!new_stream) {
GST_DEBUG_OBJECT (pad,
"Stream %d belongs to running stream %d, no waiting",
stream->stream_number, ostream->stream_number);
stream->wait = FALSE;
stream->new_stream = FALSE;
} else {
GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
GST_DEBUG_OBJECT (pad, "Stream %d changed", stream->stream_number);
stream->wait = TRUE;
stream->new_stream = TRUE;
stream->wait = TRUE;
stream->new_stream = TRUE;
for (l = self->streams; l; l = l->next) {
GstStream *ostream = l->data;
for (l = self->streams; l; l = l->next) {
GstStream *ostream = l->data;
all_wait = all_wait && ostream->wait;
if (!all_wait)
break;
}
if (all_wait) {
gint64 position = 0;
all_wait = all_wait && ostream->wait && (!have_group_id
|| ostream->group_id == group_id);
if (!all_wait)
break;
}
if (all_wait) {
gint64 position = 0;
if (have_group_id)
GST_DEBUG_OBJECT (self,
"All streams have changed to group id %u -- unblocking",
group_id);
else
GST_DEBUG_OBJECT (self, "All streams have changed -- unblocking");
for (l = self->streams; l; l = l->next) {
GstStream *ostream = l->data;
gint64 stop_running_time;
gint64 position_running_time;
ostream->wait = FALSE;
if (ostream->segment.format == GST_FORMAT_TIME) {
stop_running_time =
gst_segment_to_running_time (&ostream->segment,
GST_FORMAT_TIME, ostream->segment.stop);
position_running_time =
gst_segment_to_running_time (&ostream->segment,
GST_FORMAT_TIME, ostream->segment.position);
position =
MAX (position, MAX (stop_running_time,
position_running_time));
}
self->group_id = group_id;
for (l = self->streams; l; l = l->next) {
GstStream *ostream = l->data;
gint64 stop_running_time;
gint64 position_running_time;
ostream->wait = FALSE;
if (ostream->segment.format == GST_FORMAT_TIME) {
stop_running_time =
gst_segment_to_running_time (&ostream->segment,
GST_FORMAT_TIME, ostream->segment.stop);
position_running_time =
gst_segment_to_running_time (&ostream->segment,
GST_FORMAT_TIME, ostream->segment.position);
position =
MAX (position, MAX (stop_running_time,
position_running_time));
}
position = MAX (0, position);
self->group_start_time = MAX (self->group_start_time, position);
}
position = MAX (0, position);
self->group_start_time = MAX (self->group_start_time, position);
GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
GST_TIME_ARGS (self->group_start_time));
GST_DEBUG_OBJECT (self, "New group start time: %" GST_TIME_FORMAT,
GST_TIME_ARGS (self->group_start_time));
for (l = self->streams; l; l = l->next) {
GstStream *ostream = l->data;
g_cond_broadcast (&ostream->stream_finish_cond);
}
for (l = self->streams; l; l = l->next) {
GstStream *ostream = l->data;
g_cond_broadcast (&ostream->stream_finish_cond);
}
}
} else {
GST_DEBUG_OBJECT (self, "No stream or STREAM_START from same source");
}
GST_STREAM_SYNCHRONIZER_UNLOCK (self);
......@@ -657,6 +691,7 @@ gst_stream_synchronizer_request_new_pad (GstElement * element,
g_cond_init (&stream->stream_finish_cond);
stream->stream_start_seqnum = G_MAXUINT32;
stream->segment_seqnum = G_MAXUINT32;
stream->group_id = G_MAXUINT;
tmp = g_strdup_printf ("sink_%u", self->current_stream_number);
stream->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
......@@ -717,6 +752,10 @@ gst_stream_synchronizer_release_stream (GstStreamSynchronizer * self,
}
}
g_assert (l != NULL);
if (self->streams == NULL) {
self->have_group_id = TRUE;
self->group_id = G_MAXUINT;
}
/* we can drop the lock, since stream exists now only local.
* Moreover, we should drop, to prevent deadlock with STREAM_LOCK
......@@ -799,6 +838,8 @@ gst_stream_synchronizer_change_state (GstElement * element,
case GST_STATE_CHANGE_READY_TO_PAUSED:
GST_DEBUG_OBJECT (self, "State change READY->PAUSED");
self->group_start_time = 0;
self->have_group_id = TRUE;
self->group_id = G_MAXUINT;
self->shutdown = FALSE;
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:{
......
......@@ -52,6 +52,9 @@ struct _GstStreamSynchronizer
guint current_stream_number;
GstClockTime group_start_time;
gboolean have_group_id;
guint group_id;
};
struct _GstStreamSynchronizerClass
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment