Commit ca0de588 authored by Tim-Philipp Müller 🐠

WIP: handle badly-interleaved ASF streams

When we feed the elementary streams directly to
hardware decoders' ES buffers, the amount of
space available for each ES stream is usually
limited. When the A/V interleaving is bad, one
of the ES buffers (usually the video buffer)
easily fills up while the audio buffer is still
empty or low, which leads to stuttering playback
in the best case, or deadlocks (e.g. no preroll
after seeking) in the worst case.

Detect bad A/V interleaving and stop processing
streams that run ahead until we've caught up on
the lagging streams, then jump back and process
the packets of the streams we skipped.

Needs more work and testing; should also be
made configurable, e.g. via a property.
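In outline the catch-up logic works like this (a
minimal standalone sketch; the Stream type, the
field and function names and the 1-second value
are illustrative only, the real code operates on
AsfStream/GstASFDemux and the thresholds defined
in the demuxer):

#include <gst/gst.h>

typedef enum {
  STATE_NORMAL,
  STATE_LAGGING,                /* audio stream is far behind the video */
  STATE_SKIPPING                /* video is skipped while audio catches up */
} State;

typedef struct {
  GstClockTime last_ts;         /* timestamp of the last payload pushed */
  State state;
  gint64 skip_start;            /* packet to rewind to after catching up */
} Stream;

/* illustrative only: called after every pushed payload */
static void
check_interleaving (Stream * video, Stream * audio, guint64 * cur_packet)
{
  GstClockTimeDiff lag = GST_CLOCK_DIFF (audio->last_ts, video->last_ts);

  if (video->state != STATE_SKIPPING && lag > (GstClockTimeDiff) GST_SECOND) {
    /* video has run ahead: remember where skipping starts and stop
     * pushing video payloads so the audio can catch up */
    video->state = STATE_SKIPPING;
    video->skip_start = *cur_packet;
  } else if (video->state == STATE_SKIPPING && lag <= 0) {
    /* audio has caught up: jump back and re-parse the packets whose
     * video payloads were skipped */
    *cur_packet = video->skip_start;
    video->state = STATE_NORMAL;
    video->skip_start = -1;
  }
}

The actual patch additionally uses a larger initial
threshold (3 seconds) before the fixup kicks in, and
a negative catch-up threshold so that the audio gets
a little ahead of the video before video payloads
are pushed again.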
parent d77ddd37
@@ -122,6 +122,29 @@ asf_payload_find_previous_fragment (AsfPayload * payload, AsfStream * stream)
return ret;
}
static gboolean
gst_asf_payload_check_skip (GstASFDemux * demux, AsfPayload * payload,
AsfStream * stream)
{
if (!GST_CLOCK_TIME_IS_VALID (stream->last_ts))
return FALSE;
/* if skipping, audio needs to catch up and we're skipping video; otherwise
* it might be video catching up again, in which case we need to skip the
* audio payloads which we've already sent when catching up with the video
* (i.e. scanning ahead in a badly interleaved stream) */
if (stream->state == ASF_STREAM_STATE_SKIPPING ||
payload->ts <= stream->last_ts) {
GST_INFO_OBJECT (stream->pad,
"skipping payload %" GST_TIME_FORMAT " (< %" GST_TIME_FORMAT
") state %d", GST_TIME_ARGS (payload->ts),
GST_TIME_ARGS (stream->last_ts), stream->state);
return TRUE;
}
return FALSE;
}
/* TODO: if we have another payload already queued for this stream and that
* payload doesn't have a duration, maybe we can calculate a duration for it
* (if the previous timestamp is smaller etc. etc.) */
@@ -131,6 +154,7 @@ gst_asf_payload_queue_for_stream (GstASFDemux * demux, AsfPayload * payload,
{
GST_DEBUG_OBJECT (demux, "Got payload for stream %d ts:%" GST_TIME_FORMAT,
stream->id, GST_TIME_ARGS (payload->ts));
/* remember the first timestamp in the stream */
if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (demux->first_ts) &&
GST_CLOCK_TIME_IS_VALID (payload->ts))) {
@@ -294,6 +318,8 @@ gst_asf_demux_parse_payload (GstASFDemux * demux, AsfPacket * packet,
payload.tff = FALSE;
payload.rff = FALSE;
payload.start_packet = packet->number;
payload.mo_number =
asf_packet_read_varlen_int (packet->prop_flags, 4, p_data, p_size);
payload.mo_offset =
@@ -377,6 +403,12 @@ gst_asf_demux_parse_payload (GstASFDemux * demux, AsfPacket * packet,
return FALSE;
}
if (gst_asf_payload_check_skip (demux, &payload, stream)) {
*p_data += payload_len;
*p_size -= payload_len;
return TRUE;
}
GST_LOG_OBJECT (demux, "media object offset : %u", payload.mo_offset);
GST_LOG_OBJECT (demux, "payload length: %u", payload_len);
@@ -470,12 +502,14 @@ gst_asf_demux_parse_payload (GstASFDemux * demux, AsfPacket * packet,
return FALSE;
}
- if (G_LIKELY (sub_payload_len > 0)) {
payload.ts = ts;
if (G_LIKELY (sub_payload_len > 0) &&
!gst_asf_payload_check_skip (demux, &payload, stream)) {
payload.buf = asf_packet_create_payload_buffer (packet,
&payload_data, &payload_len, sub_payload_len);
payload.buf_filled = sub_payload_len;
- payload.ts = ts;
if (G_LIKELY (ts_delta))
payload.duration = ts_delta;
else
@@ -510,6 +544,7 @@ gst_asf_demux_parse_packet (GstASFDemux * demux, GstBuffer * buf)
goto short_packet;
packet.buf = buf;
packet.number = demux->packet;
ec_flags = GST_READ_UINT8 (data);
@@ -43,6 +43,7 @@ typedef struct {
gboolean interlaced; /* default: FALSE */
gboolean tff;
gboolean rff;
guint64 start_packet; /* first packet with data for this object */
} AsfPayload;
typedef struct {
@@ -50,6 +51,7 @@ typedef struct {
guint length; /* packet length (unused) */
guint padding; /* length of padding at end of packet */
guint sequence; /* sequence (unused) */
guint64 number;
GstClockTime send_time;
GstClockTime duration;
@@ -243,6 +243,7 @@ gst_asf_demux_reset (GstASFDemux * demux, gboolean chain_reset)
demux->num_video_streams = 0;
}
demux->num_streams = 0;
demux->video_stream = NULL;
demux->activated_streams = FALSE;
demux->first_ts = GST_CLOCK_TIME_NONE;
demux->segment_ts = GST_CLOCK_TIME_NONE;
@@ -511,6 +512,9 @@ gst_asf_demux_reset_stream_state_after_discont (GstASFDemux * demux)
for (n = 0; n < demux->num_streams; n++) {
demux->stream[n].discont = TRUE;
demux->stream[n].last_flow = GST_FLOW_OK;
demux->stream[n].last_ts = GST_CLOCK_TIME_NONE;
demux->stream[n].state = ASF_STREAM_STATE_NORMAL;
demux->stream[n].skip_start = -1;
while (demux->stream[n].payloads->len > 0) {
AsfPayload *payload;
@@ -1342,6 +1346,107 @@ gst_asf_demux_find_stream_with_complete_payload (GstASFDemux * demux)
return best_stream;
}
/* When audio starts lagging more than this behind the video, take special
* measures to catch up and fix bad A/V interleaving */
#define MAX_AV_INTERLEAVING_INITIAL_THRESHOLD (3 * GST_SECOND)
/* Once A/V interleaving fixup is enabled: when audio starts lagging more
* than this value behind the video, take special measures to catch up
* the audio and fix bad A/V interleaving */
#define MAX_AV_INTERLEAVING (1 * GST_SECOND)
/* Once A/V interleaving fixup is enabled:
* Threshold when the audio has 'caught up'.
* Positive values mean audio is no more than that much behind the video,
* negative values mean audio is at least that much ahead of the video */
#define AV_CATCH_UP_THRESHOLD (-1 * GST_SECOND)
static void
check_streams (GstASFDemux * demux)
{
AsfStream *video_stream;
guint num_lagging = 0;
gint i;
video_stream = demux->video_stream;
/* no need to check A/V interleaving if we have no video stream */
if (video_stream == NULL || !GST_CLOCK_TIME_IS_VALID (video_stream->last_ts))
return;
/* .. or if we only have a video stream and no audio stream */
if (demux->num_audio_streams == 0)
return;
// FIXME: check default threshold, shortcut if disabled
/* check if any of the audio streams lag too much */
for (i = 0; i < demux->num_streams; ++i) {
GstClockTimeDiff ts_diff;
AsfStream *stream = &demux->stream[i];
if (stream->is_video)
continue;
if (!GST_CLOCK_TIME_IS_VALID (stream->last_ts))
continue;
ts_diff = video_stream->last_ts - stream->last_ts;
if (!demux->fix_av_interleaving)   /* fixup mode not enabled (yet): nothing to do */
continue;
if (video_stream->state == ASF_STREAM_STATE_SKIPPING) {
GstClockTimeDiff threshold = AV_CATCH_UP_THRESHOLD;
if (ts_diff < threshold) {
GST_DEBUG_OBJECT (demux, "Stream %s has caught up with video",
GST_PAD_NAME (stream->pad));
stream->state = ASF_STREAM_STATE_NORMAL;
} else {
GST_DEBUG_OBJECT (demux, "Stream %s still lags %.2fs behind video",
GST_PAD_NAME (stream->pad), (double) ts_diff / GST_SECOND);
stream->state = ASF_STREAM_STATE_LAGGING;
++num_lagging;
}
} else {
if (ts_diff > MAX_AV_INTERLEAVING) {
GST_INFO_OBJECT (demux, "Stream %s lags %.2fs behind video",
GST_PAD_NAME (stream->pad), (double) ts_diff / GST_SECOND);
stream->state = ASF_STREAM_STATE_LAGGING;
++num_lagging;
} else {
stream->state = ASF_STREAM_STATE_NORMAL;
}
}
}
if (num_lagging > 0 && video_stream->state != ASF_STREAM_STATE_SKIPPING) {
GST_INFO_OBJECT (demux, "audio streams lag too much, try to catch up");
video_stream->state = ASF_STREAM_STATE_SKIPPING;
if (video_stream->payloads->len > 0) {
AsfPayload *payload;
/* if there is an incomplete payload, we want to jump back to the
* packet where that incomplete payload started, which might be before
* the current packet (e.g. if there was an audio packet sandwiched in
* between partial video payloads) */
payload = &g_array_index (video_stream->payloads, AsfPayload, 0);
video_stream->skip_start = payload->start_packet;
} else {
video_stream->skip_start = demux->packet;
}
} else if (video_stream->state == ASF_STREAM_STATE_SKIPPING
&& num_lagging == 0) {
GST_INFO_OBJECT (demux,
"audio streams caught up, now go back and send "
"video payloads that we've skipped");
demux->packet = video_stream->skip_start;
video_stream->state = ASF_STREAM_STATE_NORMAL;
video_stream->skip_start = -1;
/* FIXME: restore payload array state ? */
/* FIXME: clear any partial payloads in streams? */
}
}
static GstFlowReturn
gst_asf_demux_push_complete_payloads (GstASFDemux * demux, gboolean force)
{
@@ -1473,20 +1578,39 @@ gst_asf_demux_push_complete_payloads (GstASFDemux * demux, gboolean force)
/* FIXME: we should really set durations on buffers if we can */
GST_LOG_OBJECT (stream->pad, "pushing buffer, ts=%" GST_TIME_FORMAT
GST_LOG_OBJECT (stream->pad, "%s buffer, ts=%" GST_TIME_FORMAT
", dur=%" GST_TIME_FORMAT " size=%u",
(stream->state == ASF_STREAM_STATE_SKIPPING) ? "skipping" : "pushing",
GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (payload->buf)),
GST_TIME_ARGS (GST_BUFFER_DURATION (payload->buf)),
GST_BUFFER_SIZE (payload->buf));
- ret = gst_pad_push (stream->pad, payload->buf);
- ret = gst_asf_demux_aggregate_flow_return (demux, stream, ret);
- payload->buf = NULL;
- g_array_remove_index (stream->payloads, 0);
if (stream->state != ASF_STREAM_STATE_SKIPPING) {
/* FIXME: use end time rather than ts? */
if (GST_BUFFER_TIMESTAMP_IS_VALID (payload->buf)) {
stream->last_ts = GST_BUFFER_TIMESTAMP (payload->buf);
}
#if 0
else if (GST_CLOCK_TIME_IS_VALID (stream->last_ts)) {
stream->last_ts += stream->ext_props.avg_time_per_frame * 100;
}
#endif
- /* Break out as soon as we have an issue */
- if (G_UNLIKELY (ret != GST_FLOW_OK))
- break;
ret = gst_pad_push (stream->pad, payload->buf);
ret = gst_asf_demux_aggregate_flow_return (demux, stream, ret);
payload->buf = NULL;
g_array_remove_index (stream->payloads, 0);
/* Break out as soon as we have an issue */
if (G_UNLIKELY (ret != GST_FLOW_OK))
break;
} else {
gst_buffer_replace (&payload->buf, NULL);
g_array_remove_index (stream->payloads, 0);
}
/* check A/V interleaving situation */
check_streams (demux);
}
return ret;
@@ -2128,6 +2252,11 @@ gst_asf_demux_setup_pad (GstASFDemux * demux, GstPad * src_pad,
stream->is_video = is_video;
stream->pending_tags = tags;
stream->discont = TRUE;
stream->last_ts = GST_CLOCK_TIME_NONE;
stream->state = ASF_STREAM_STATE_NORMAL;
stream->skip_start = -1;
if (is_video) {
GstStructure *st;
gint par_x, par_y;
@@ -2301,6 +2430,11 @@ gst_asf_demux_activate_stream (GstASFDemux * demux, AsfStream * stream)
GST_INFO_OBJECT (demux, "Activating stream %2u, pad %s, caps %"
GST_PTR_FORMAT, stream->id, GST_PAD_NAME (stream->pad), stream->caps);
gst_pad_set_active (stream->pad, TRUE);
if (stream->is_video && demux->video_stream == NULL) {
GST_INFO_OBJECT (demux, "%s is primary video stream",
GST_PAD_NAME (stream->pad));
demux->video_stream = stream;
}
gst_element_add_pad (GST_ELEMENT_CAST (demux), stream->pad);
stream->active = TRUE;
}
@@ -3367,6 +3501,7 @@ gst_asf_demux_process_ext_stream_props (GstASFDemux * demux, guint8 * data,
(esp.lang_idx < demux->num_languages) ?
GST_STR_NULL (demux->languages[esp.lang_idx]) : "??");
GST_INFO ("stream name count = %u", stream_name_count);
GST_INFO ("max object size = %u", esp.max_obj_size);
/* read stream names */
for (i = 0; i < stream_name_count; ++i) {
@@ -80,6 +80,12 @@ typedef struct
/* missing: stream names */
} AsfStreamExtProps;
typedef enum {
ASF_STREAM_STATE_NORMAL,
ASF_STREAM_STATE_LAGGING, /* audio stream is a lot behind the video stream */
ASF_STREAM_STATE_SKIPPING /* ignore this video stream while audio catches up */
} AsfStreamState;
typedef struct
{
AsfStreamType type;
@@ -111,6 +117,16 @@ typedef struct
/* extended stream properties (optional) */
AsfStreamExtProps ext_props;
/* timestamp of last payload pushed, for bad A/V interleaving detection */
GstClockTime last_ts;
/* keep track of a stream's state, so we can skip video packet parsing
* until the audio has caught up, to work around streams with extremely
* bad A/V interleaving */
AsfStreamState state;
guint64 skip_start; /* packet to jump back to when we're done
* skipping video and the other streams
* have caught up */
} AsfStream;
typedef enum {
@@ -157,6 +173,8 @@ struct _GstASFDemux {
guint32 num_video_streams;
guint32 num_streams;
AsfStream stream[GST_ASF_DEMUX_NUM_STREAMS];
AsfStream * video_stream; /* primary video stream, to check
* A/V interleaving against */
gboolean activated_streams;
/* for chained asf handling, we need to hold the old asf streams until
@@ -199,6 +217,9 @@ struct _GstASFDemux {
GstClockTime sidx_interval; /* interval between entries in ns */
guint sidx_num_entries; /* number of index entries */
AsfSimpleIndexEntry *sidx_entries; /* packet number for each entry */
/* whether A/V interleaving fixup mode is enabled */
gboolean fix_av_interleaving;
};
struct _GstASFDemuxClass {