Commit d5be929b authored by Wim Taymans's avatar Wim Taymans

gst/base/gstbasesink.*: Store and use discont values when syncing buffers as...

gst/base/gstbasesink.*: Store and use discont values when syncing buffers as described in design docs.

Original commit message from CVS:
* gst/base/gstbasesink.c: (gst_base_sink_handle_object),
(gst_base_sink_event), (gst_base_sink_get_times),
(gst_base_sink_do_sync), (gst_base_sink_change_state):
* gst/base/gstbasesink.h:
Store and use discont values when syncing buffers as described
in design docs.

* gst/base/gstbasesrc.c: (gst_base_src_send_discont),
(gst_base_src_do_seek), (gst_base_src_loop), (gst_base_src_start),
(gst_base_src_activate_push):
Push discont event when starting.

* gst/elements/gstidentity.c: (gst_identity_transform):
Small cleanups.

* gst/gstbin.c: (gst_bin_change_state):
Small cleanups in base_time  distribution.

* gst/gstelement.c: (gst_element_set_base_time),
(gst_element_get_base_time), (gst_element_change_state):
* gst/gstelement.h:
Added methods for the base_time of the element.
Some MT fixes.

* gst/gstpipeline.c: (gst_pipeline_send_event),
(gst_pipeline_change_state), (gst_pipeline_set_new_stream_time),
(gst_pipeline_get_last_stream_time):
* gst/gstpipeline.h:
MT fixes.
Handle seeking as described in design doc, remove stream_time
hack.
Cleanups clock and stream_time selection code. Added accessors
for the stream_time.
parent d3ffeea7
2005-07-16 Wim Taymans <wim@fluendo.com>
* gst/base/gstbasesink.c: (gst_base_sink_handle_object),
(gst_base_sink_event), (gst_base_sink_get_times),
(gst_base_sink_do_sync), (gst_base_sink_change_state):
* gst/base/gstbasesink.h:
Store and use discont values when syncing buffers as described
in design docs.
* gst/base/gstbasesrc.c: (gst_base_src_send_discont),
(gst_base_src_do_seek), (gst_base_src_loop), (gst_base_src_start),
(gst_base_src_activate_push):
Push discont event when starting.
* gst/elements/gstidentity.c: (gst_identity_transform):
Small cleanups.
* gst/gstbin.c: (gst_bin_change_state):
Small cleanups in base_time distribution.
* gst/gstelement.c: (gst_element_set_base_time),
(gst_element_get_base_time), (gst_element_change_state):
* gst/gstelement.h:
Added methods for the base_time of the element.
Some MT fixes.
* gst/gstpipeline.c: (gst_pipeline_send_event),
(gst_pipeline_change_state), (gst_pipeline_set_new_stream_time),
(gst_pipeline_get_last_stream_time):
* gst/gstpipeline.h:
MT fixes.
Handle seeking as described in design doc, remove stream_time
hack.
Cleanups clock and stream_time selection code. Added accessors
for the stream_time.
2005-07-16 Andy Wingo <wingo@pobox.com>
* gst/gsterror.c (_gst_core_errors_init): Use the magic word..
......
......@@ -455,16 +455,37 @@ gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad,
have_event = GST_IS_EVENT (obj);
if (have_event) {
GstEvent *event = GST_EVENT (obj);
switch (GST_EVENT_TYPE (obj)) {
case GST_EVENT_EOS:
basesink->preroll_queued++;
basesink->eos = TRUE;
break;
case GST_EVENT_DISCONTINUOUS:
/* the discont event is needed to bring the buffer timestamps to the
* stream time */
if (!gst_event_discont_get_value (event, GST_FORMAT_TIME,
&basesink->discont_start, &basesink->discont_stop)) {
basesink->discont_start = 0;
basesink->discont_stop = 0;
}
basesink->have_discont = TRUE;
GST_DEBUG ("received DISCONT %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
GST_TIME_ARGS (basesink->discont_start),
GST_TIME_ARGS (basesink->discont_stop));
break;
default:
break;
}
basesink->events_queued++;
} else {
if (!basesink->have_discont) {
GST_ELEMENT_ERROR (basesink, STREAM, STOPPED,
("received buffer without a discont"),
("received buffer without a discont"));
}
basesink->preroll_queued++;
basesink->buffers_queued++;
}
......@@ -641,9 +662,6 @@ gst_base_sink_event (GstPad * pad, GstEvent * event)
GstFlowReturn ret;
GST_STREAM_LOCK (pad);
if (basesink->clock) {
//gint64 value = GST_EVENT_DISCONT_OFFSET (event, 0).value;
}
ret =
gst_base_sink_handle_object (basesink, pad, GST_MINI_OBJECT (event));
GST_STREAM_UNLOCK (pad);
......@@ -693,7 +711,7 @@ gst_base_sink_event (GstPad * pad, GstEvent * event)
}
/* default implementation to calculate the start and end
* timestamps on a buffer, subclasses cna override
* timestamps on a buffer, subclasses can override
*/
static void
gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
......@@ -703,6 +721,10 @@ gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
timestamp = GST_BUFFER_TIMESTAMP (buffer);
if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
/* bring timestamp to stream time using last
* discont offset. */
timestamp -= basesink->discont_start;
/* get duration to calculate end time */
duration = GST_BUFFER_DURATION (buffer);
if (GST_CLOCK_TIME_IS_VALID (duration)) {
*end = timestamp + duration;
......@@ -739,11 +761,17 @@ gst_base_sink_do_sync (GstBaseSink * basesink, GstBuffer * buffer)
if (GST_CLOCK_TIME_IS_VALID (start)) {
GstClockReturn ret;
GstClockTime base_time;
/* save clock id so that we can unlock it if needed */
GST_LOCK (basesink);
base_time = GST_ELEMENT (basesink)->base_time;
GST_LOG_OBJECT (basesink,
"waiting for clock, base time %" GST_TIME_FORMAT,
GST_TIME_ARGS (base_time));
/* save clock id so that we can unlock it if needed */
basesink->clock_id = gst_clock_new_single_shot_id (basesink->clock,
start + GST_ELEMENT (basesink)->base_time);
start + base_time);
basesink->end_time = end;
GST_UNLOCK (basesink);
......@@ -995,6 +1023,9 @@ gst_base_sink_change_state (GstElement * element)
basesink->have_preroll = FALSE;
basesink->need_preroll = TRUE;
GST_PREROLL_UNLOCK (basesink->sinkpad);
basesink->have_discont = FALSE;
basesink->discont_start = 0;
basesink->discont_stop = 0;
ret = GST_STATE_ASYNC;
break;
case GST_STATE_PAUSED_TO_PLAYING:
......
......@@ -66,6 +66,10 @@ struct _GstBaseSink {
GstClockID clock_id;
GstClockTime end_time;
gboolean have_discont;
GstClockTime discont_start;
GstClockTime discont_stop;
gboolean eos;
gboolean need_preroll;
gboolean have_preroll;
......
......@@ -356,6 +356,18 @@ gst_base_src_get_event_mask (GstPad * pad)
}
#endif
static gboolean
gst_base_src_send_discont (GstBaseSrc * src)
{
GstEvent *event;
event = gst_event_new_discontinuous (1.0,
GST_FORMAT_BYTES,
(gint64) src->segment_start, (gint64) src->segment_end, NULL);
return gst_pad_push_event (src->srcpad, event);
}
static gboolean
gst_base_src_do_seek (GstBaseSrc * src, GstEvent * event)
{
......@@ -420,15 +432,7 @@ gst_base_src_do_seek (GstBaseSrc * src, GstEvent * event)
gst_pad_push_event (src->srcpad, gst_event_new_flush (TRUE));
/* now send discont */
{
GstEvent *event;
event = gst_event_new_discontinuous (1.0,
GST_FORMAT_BYTES,
(gint64) src->segment_start, (gint64) src->segment_end, NULL);
gst_pad_push_event (src->srcpad, event);
}
gst_base_src_send_discont (src);
/* and restart the task */
gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
......@@ -864,7 +868,7 @@ gst_base_src_start (GstBaseSrc * basesrc)
GST_DEBUG ("size %d %lld", result, basesrc->size);
/* we always run to the end */
basesrc->segment_end = -1;
basesrc->segment_end = basesrc->size;
/* check if we can seek, updates ->seekable */
gst_base_src_is_seekable (basesrc);
......@@ -952,6 +956,9 @@ gst_base_src_activate_push (GstPad * pad, gboolean active)
if (!gst_base_src_start (basesrc))
goto error_start;
/* now send discont */
gst_base_src_send_discont (basesrc);
return gst_pad_start_task (pad, (GstTaskFunction) gst_base_src_loop, pad);
} else {
return gst_base_src_deactivate (basesrc, pad);
......
......@@ -360,27 +360,28 @@ gst_identity_transform (GstBaseTransform * trans, GstBuffer * inbuf,
if (identity->sync) {
GstClock *clock;
GstClockReturn cret;
clock = GST_ELEMENT (identity)->clock;
GST_LOCK (identity);
if ((clock = GST_ELEMENT (identity)->clock)) {
GstClockReturn cret;
if (clock) {
/* save id if we need to unlock */
/* FIXME: actually unlock this somewhere if the state changes */
GST_LOCK (identity);
/* FIXME: actually unlock this somewhere in the state changes */
identity->clock_id = gst_clock_new_single_shot_id (clock,
GST_BUFFER_TIMESTAMP (outbuf) + GST_ELEMENT (identity)->base_time);
GST_UNLOCK (identity);
cret = gst_clock_id_wait (identity->clock_id, NULL);
GST_LOCK (identity);
if (identity->clock_id) {
gst_clock_id_unref (identity->clock_id);
identity->clock_id = NULL;
}
GST_UNLOCK (identity);
if (cret == GST_CLOCK_UNSCHEDULED)
ret = GST_FLOW_UNEXPECTED;
}
GST_UNLOCK (identity);
}
identity->offset += GST_BUFFER_SIZE (outbuf);
......
......@@ -1135,6 +1135,7 @@ gst_bin_change_state (GstElement * element)
GQueue *elem_queue; /* list of elements waiting for a state change */
GQueue *semi_queue; /* list of elements with no connected srcpads */
GQueue *temp; /* queue of leftovers */
GstClockTime base_time;
bin = GST_BIN (element);
......@@ -1167,6 +1168,9 @@ gst_bin_change_state (GstElement * element)
GST_LOCK (bin);
restart:
/* take base time */
base_time = element->base_time;
/* make sure queues are empty, they could be filled when
* restarting. */
clear_queue (elem_queue, TRUE);
......@@ -1286,7 +1290,9 @@ restart:
goto next_element;
/* set base time on element */
qelement->base_time = element->base_time;
gst_element_set_base_time (qelement, base_time);
/* then change state */
ret = gst_element_set_state (qelement, pending);
/* the set state could have cause elements to be added/removed,
......
......@@ -362,6 +362,52 @@ gst_element_get_clock (GstElement * element)
return NULL;
}
/**
* gst_element_set_base_time:
* @element: a #GstElement.
* @time: the base time to set.
*
* Set the base time of an element. See @gst_element_get_base_time().
*
* MT safe.
*/
void
gst_element_set_base_time (GstElement * element, GstClockTime time)
{
g_return_if_fail (GST_IS_ELEMENT (element));
GST_LOCK (element);
element->base_time = time;
GST_UNLOCK (element);
}
/**
* gst_element_get_base_time:
* @element: a #GstElement.
*
* Returns the base time of the element. The base time is the
* absolute time of the clock when this element was last put to
* PLAYING. Substracting the base time from the clock time gives
* the stream time of the element.
*
* Returns: the base time of the element.
*
* MT safe.
*/
GstClockTime
gst_element_get_base_time (GstElement * element)
{
GstClockTime result;
g_return_val_if_fail (GST_IS_ELEMENT (element), GST_CLOCK_TIME_NONE);
GST_LOCK (element);
result = element->base_time;
GST_UNLOCK (element);
return result;
}
#ifndef GST_DISABLE_INDEX
/**
* gst_element_is_indexable:
......@@ -1953,6 +1999,7 @@ gst_element_change_state (GstElement * element)
}
break;
case GST_STATE_PAUSED_TO_PLAYING:
/* FIXME really needed? */
GST_LOCK (element);
if (GST_ELEMENT_MANAGER (element)) {
element->base_time =
......@@ -1969,7 +2016,9 @@ gst_element_change_state (GstElement * element)
if (!gst_element_pads_activate (element, FALSE)) {
result = GST_STATE_FAILURE;
} else {
GST_LOCK (element);
element->base_time = 0;
GST_UNLOCK (element);
}
break;
default:
......
......@@ -281,6 +281,8 @@ gboolean gst_element_requires_clock (GstElement *element);
gboolean gst_element_provides_clock (GstElement *element);
GstClock* gst_element_get_clock (GstElement *element);
void gst_element_set_clock (GstElement *element, GstClock *clock);
void gst_element_set_base_time (GstElement *element, GstClockTime time);
GstClockTime gst_element_get_base_time (GstElement *element);
/* indexes */
gboolean gst_element_is_indexable (GstElement *element);
......
......@@ -229,7 +229,6 @@ gst_pipeline_send_event (GstElement * element, GstEvent * event)
GstElementState state;
GstEventType event_type = GST_EVENT_TYPE (event);
GTimeVal timeout;
gint64 offset = -1;
/* need to call _get_state() since a bin state is only updated
* with this call. */
......@@ -239,12 +238,6 @@ gst_pipeline_send_event (GstElement * element, GstEvent * event)
was_playing = state == GST_STATE_PLAYING;
if (event_type == GST_EVENT_SEEK) {
if (GST_EVENT_SEEK_FORMAT (event) != GST_FORMAT_TIME) {
GST_WARNING ("Pipelines only accept seek events with TIME format");
g_warning ("Pipelines only accept seek events with TIME format");
return FALSE;
}
offset = GST_EVENT_SEEK_OFFSET (event);
if (was_playing)
gst_element_set_state (element, GST_STATE_PAUSED);
}
......@@ -253,7 +246,7 @@ gst_pipeline_send_event (GstElement * element, GstEvent * event)
if (res && event_type == GST_EVENT_SEEK) {
/* need to set the stream time to the seek time */
GST_PIPELINE (element)->stream_time = offset;
gst_pipeline_set_new_stream_time (GST_PIPELINE (element), 0);
if (was_playing)
gst_element_set_state (element, GST_STATE_PLAYING);
}
......@@ -284,35 +277,42 @@ gst_pipeline_change_state (GstElement * element)
GstElementStateReturn result = GST_STATE_SUCCESS;
GstPipeline *pipeline = GST_PIPELINE (element);
gint transition = GST_STATE_TRANSITION (element);
GstClock *clock;
switch (transition) {
case GST_STATE_NULL_TO_READY:
GST_LOCK (element);
if (element->bus)
gst_bus_set_flushing (element->bus, FALSE);
GST_UNLOCK (element);
gst_scheduler_setup (GST_ELEMENT_SCHEDULER (pipeline));
break;
case GST_STATE_READY_TO_PAUSED:
{
GstClock *clock;
clock = gst_element_get_clock (element);
gst_element_set_clock (element, clock);
gst_object_unref (clock);
break;
}
case GST_STATE_PAUSED_TO_PLAYING:
if (element->clock) {
GstClockTime start_time = gst_clock_get_time (element->clock);
/* when going to playing, select a clock */
if ((clock = gst_element_get_clock (element))) {
GstClockTime start_time;
/* distribute the clock */
gst_element_set_clock (element, clock);
/* get start time */
start_time = gst_clock_get_time (clock);
gst_object_unref (clock);
GST_LOCK (element);
element->base_time = start_time -
pipeline->stream_time + pipeline->delay;
GST_DEBUG ("stream_time=%" GST_TIME_FORMAT ", start_time=%"
GST_TIME_FORMAT ", base time %" GST_TIME_FORMAT,
GST_TIME_ARGS (pipeline->stream_time),
GST_TIME_ARGS (start_time), GST_TIME_ARGS (element->base_time));
GST_UNLOCK (element);
} else {
element->base_time = 0;
GST_UNLOCK (element);
GST_DEBUG ("no clock, using base time of 0");
gst_element_set_base_time (element, 0);
}
break;
case GST_STATE_PLAYING_TO_PAUSED:
......@@ -325,28 +325,40 @@ gst_pipeline_change_state (GstElement * element)
switch (transition) {
case GST_STATE_READY_TO_PAUSED:
pipeline->stream_time = 0;
gst_pipeline_set_new_stream_time (pipeline, 0);
break;
case GST_STATE_PAUSED_TO_PLAYING:
break;
case GST_STATE_PLAYING_TO_PAUSED:
if (element->clock) {
GST_LOCK (element);
if ((clock = element->clock)) {
GstClockTime now;
now = gst_clock_get_time (element->clock);
gst_object_ref (clock);
GST_UNLOCK (element);
/* calculate the time when we stopped */
now = gst_clock_get_time (clock);
gst_object_unref (clock);
GST_LOCK (element);
/* store the current stream time */
pipeline->stream_time = now - element->base_time;
GST_DEBUG ("stream_time=%" GST_TIME_FORMAT ", now=%" GST_TIME_FORMAT
", base time %" GST_TIME_FORMAT,
GST_TIME_ARGS (pipeline->stream_time),
GST_TIME_ARGS (now), GST_TIME_ARGS (element->base_time));
}
GST_UNLOCK (element);
break;
case GST_STATE_PAUSED_TO_READY:
break;
case GST_STATE_READY_TO_NULL:
GST_LOCK (element);
if (element->bus) {
gst_bus_set_flushing (element->bus, TRUE);
}
GST_UNLOCK (element);
break;
}
......@@ -412,6 +424,55 @@ gst_pipeline_get_bus (GstPipeline * pipeline)
return gst_element_get_bus (GST_ELEMENT (pipeline));
}
/**
* gst_pipeline_set_new_stream_time:
* @pipeline: the pipeline
*
* Set the new stream time of the pipeline. The stream time is used to
* set the base time on the elements (see @gst_element_set_base_time())
* in the PAUSED->PLAYING state transition.
*
* MT safe.
*/
void
gst_pipeline_set_new_stream_time (GstPipeline * pipeline, GstClockTime time)
{
g_return_if_fail (GST_IS_PIPELINE (pipeline));
GST_LOCK (pipeline);
pipeline->stream_time = time;
GST_DEBUG ("%s: set new stream_time to %" GST_TIME_FORMAT,
GST_ELEMENT_NAME (pipeline), time);
GST_UNLOCK (pipeline);
}
/**
* gst_pipeline_get_last_stream_time:
* @pipeline: the pipeline
*
* Gets the last stream time of the pipeline. If the pipeline is PLAYING,
* the returned time is the stream time used to configure the elements
* in the PAUSED->PLAYING state. If the pipeline is PAUSED, the returned
* time is the stream time when the pipeline was paused.
*
* Returns: a GstClockTime
*
* MT safe.
*/
GstClockTime
gst_pipeline_get_last_stream_time (GstPipeline * pipeline)
{
GstClockTime result;
g_return_val_if_fail (GST_IS_PIPELINE (pipeline), GST_CLOCK_TIME_NONE);
GST_LOCK (pipeline);
result = pipeline->stream_time;
GST_UNLOCK (pipeline);
return result;
}
static GstClock *
gst_pipeline_get_clock_func (GstElement * element)
{
......
......@@ -70,6 +70,9 @@ GstElement* gst_pipeline_new (const gchar *name);
GstScheduler* gst_pipeline_get_scheduler (GstPipeline *pipeline);
GstBus* gst_pipeline_get_bus (GstPipeline *pipeline);
void gst_pipeline_set_new_stream_time (GstPipeline *pipeline, GstClockTime time);
GstClockTime gst_pipeline_get_last_stream_time (GstPipeline *pipeline);
void gst_pipeline_use_clock (GstPipeline *pipeline, GstClock *clock);
void gst_pipeline_set_clock (GstPipeline *pipeline, GstClock *clock);
GstClock* gst_pipeline_get_clock (GstPipeline *pipeline);
......
......@@ -455,16 +455,37 @@ gst_base_sink_handle_object (GstBaseSink * basesink, GstPad * pad,
have_event = GST_IS_EVENT (obj);
if (have_event) {
GstEvent *event = GST_EVENT (obj);
switch (GST_EVENT_TYPE (obj)) {
case GST_EVENT_EOS:
basesink->preroll_queued++;
basesink->eos = TRUE;
break;
case GST_EVENT_DISCONTINUOUS:
/* the discont event is needed to bring the buffer timestamps to the
* stream time */
if (!gst_event_discont_get_value (event, GST_FORMAT_TIME,
&basesink->discont_start, &basesink->discont_stop)) {
basesink->discont_start = 0;
basesink->discont_stop = 0;
}
basesink->have_discont = TRUE;
GST_DEBUG ("received DISCONT %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
GST_TIME_ARGS (basesink->discont_start),
GST_TIME_ARGS (basesink->discont_stop));
break;
default:
break;
}
basesink->events_queued++;
} else {
if (!basesink->have_discont) {
GST_ELEMENT_ERROR (basesink, STREAM, STOPPED,
("received buffer without a discont"),
("received buffer without a discont"));
}
basesink->preroll_queued++;
basesink->buffers_queued++;
}
......@@ -641,9 +662,6 @@ gst_base_sink_event (GstPad * pad, GstEvent * event)
GstFlowReturn ret;
GST_STREAM_LOCK (pad);
if (basesink->clock) {
//gint64 value = GST_EVENT_DISCONT_OFFSET (event, 0).value;
}
ret =
gst_base_sink_handle_object (basesink, pad, GST_MINI_OBJECT (event));
GST_STREAM_UNLOCK (pad);
......@@ -693,7 +711,7 @@ gst_base_sink_event (GstPad * pad, GstEvent * event)
}
/* default implementation to calculate the start and end
* timestamps on a buffer, subclasses cna override
* timestamps on a buffer, subclasses can override
*/
static void
gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
......@@ -703,6 +721,10 @@ gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
timestamp = GST_BUFFER_TIMESTAMP (buffer);
if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
/* bring timestamp to stream time using last
* discont offset. */
timestamp -= basesink->discont_start;
/* get duration to calculate end time */
duration = GST_BUFFER_DURATION (buffer);
if (GST_CLOCK_TIME_IS_VALID (duration)) {
*end = timestamp + duration;
......@@ -739,11 +761,17 @@ gst_base_sink_do_sync (GstBaseSink * basesink, GstBuffer * buffer)
if (GST_CLOCK_TIME_IS_VALID (start)) {
GstClockReturn ret;
GstClockTime base_time;
/* save clock id so that we can unlock it if needed */
GST_LOCK (basesink);
base_time = GST_ELEMENT (basesink)->base_time;
GST_LOG_OBJECT (basesink,
"waiting for clock, base time %" GST_TIME_FORMAT,
GST_TIME_ARGS (base_time));
/* save clock id so that we can unlock it if needed */
basesink->clock_id = gst_clock_new_single_shot_id (basesink->clock,
start + GST_ELEMENT (basesink)->base_time);
start + base_time);
basesink->end_time = end;
GST_UNLOCK (basesink);
......@@ -995,6 +1023,9 @@ gst_base_sink_change_state (GstElement * element)
basesink->have_preroll = FALSE;
basesink->need_preroll = TRUE;
GST_PREROLL_UNLOCK (basesink->sinkpad);
basesink->have_discont = FALSE;
basesink->discont_start = 0;
basesink->discont_stop = 0;