From b1db425343c8402c61fd860e8013847a4848c8a1 Mon Sep 17 00:00:00 2001
From: Thibault Saunier <tsaunier@igalia.com>
Date: Thu, 29 Aug 2019 11:16:39 -0400
Subject: [PATCH] oggdemux: Move seeking in pull mode to the streaming thread

Flushing and teering down the streaming thread from the seeking thread
and simply letting the streaming thread handle the seek event in its
loop function.

Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/issues/639
---
 ext/ogg/gstoggdemux.c | 137 +++++++++++++++++++++++++-----------------
 ext/ogg/gstoggdemux.h |   4 +-
 2 files changed, 84 insertions(+), 57 deletions(-)

diff --git a/ext/ogg/gstoggdemux.c b/ext/ogg/gstoggdemux.c
index a4e10749bc..8d13dc6384 100644
--- a/ext/ogg/gstoggdemux.c
+++ b/ext/ogg/gstoggdemux.c
@@ -632,7 +632,7 @@ gst_ogg_demux_chain_peer (GstOggPad * pad, ogg_packet * packet,
             beyond = ipad->map.n_index
                 && ipad->map.index[ipad->map.n_index - 1].offset >= length;
             if (beyond) {
-              GST_WARNING_OBJECT (pad, "Index offsets beyong byte length");
+              GST_WARNING_OBJECT (pad, "Index offsets beyond byte length");
               if (ipad->discont) {
                 /* hole - the index is most likely screwed up */
                 GST_WARNING_OBJECT (ogg, "Discarding entire index");
@@ -1511,7 +1511,7 @@ gst_ogg_demux_estimate_bisection_target (GstOggDemux * ogg, float seek_quality)
   GST_DEBUG_OBJECT (ogg, "Raw best guess: %" G_GINT64_FORMAT, best);
 
   /* offset the guess down as we need to capture the start of the
-     page we are targetting - but only do so if we did not undershoot
+     page we are targeting - but only do so if we did not undershoot
      last time, as we're likely to still do this time */
   if (!ogg->seek_undershot) {
     /* very small packets are packed on pages, so offset by at least
@@ -3516,7 +3516,7 @@ gst_ogg_demux_perform_seek_pull (GstOggDemux * ogg, GstEvent * event)
 {
   GstOggChain *chain = NULL;
   gboolean res;
-  gboolean flush, accurate, keyframe;
+  gboolean accurate, keyframe;
   GstFormat format;
   gdouble rate;
   GstSeekFlags flags;
@@ -3524,7 +3524,6 @@ gst_ogg_demux_perform_seek_pull (GstOggDemux * ogg, GstEvent * event)
   gint64 start, stop;
   gboolean update;
   guint32 seqnum;
-  GstEvent *tevent;
 
   if (event) {
     GST_DEBUG_OBJECT (ogg, "seek with event");
@@ -3548,41 +3547,10 @@ gst_ogg_demux_perform_seek_pull (GstOggDemux * ogg, GstEvent * event)
 
   GST_DEBUG_OBJECT (ogg, "seek, rate %g", rate);
 
-  flush = flags & GST_SEEK_FLAG_FLUSH;
   accurate = flags & GST_SEEK_FLAG_ACCURATE;
   keyframe = flags & GST_SEEK_FLAG_KEY_UNIT;
 
-  /* first step is to unlock the streaming thread if it is
-   * blocked in a chain call, we do this by starting the flush. because
-   * we cannot yet hold any streaming lock, we have to protect the chains
-   * with their own lock. */
-  if (flush) {
-    gint i;
-
-    tevent = gst_event_new_flush_start ();
-    gst_event_set_seqnum (tevent, seqnum);
-
-    gst_event_ref (tevent);
-    gst_pad_push_event (ogg->sinkpad, tevent);
-
-    GST_CHAIN_LOCK (ogg);
-    for (i = 0; i < ogg->chains->len; i++) {
-      GstOggChain *chain = g_array_index (ogg->chains, GstOggChain *, i);
-      gint j;
-
-      for (j = 0; j < chain->streams->len; j++) {
-        GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, j);
-
-        gst_event_ref (tevent);
-        gst_pad_push_event (GST_PAD (pad), tevent);
-      }
-    }
-    GST_CHAIN_UNLOCK (ogg);
-
-    gst_event_unref (tevent);
-  } else {
-    gst_pad_pause_task (ogg->sinkpad);
-  }
+  gst_pad_pause_task (ogg->sinkpad);
 
   /* now grab the stream lock so that streaming cannot continue, for
    * non flushing seeks when the element is in PAUSED this could block
@@ -3598,14 +3566,6 @@ gst_ogg_demux_perform_seek_pull (GstOggDemux * ogg, GstEvent * event)
       GST_TIME_FORMAT, GST_TIME_ARGS (ogg->segment.start),
       GST_TIME_ARGS (ogg->segment.stop));
 
-  /* we need to stop flushing on the srcpad as we're going to use it
-   * next. We can do this as we have the STREAM lock now. */
-  if (flush) {
-    tevent = gst_event_new_flush_stop (TRUE);
-    gst_event_set_seqnum (tevent, seqnum);
-    gst_pad_push_event (ogg->sinkpad, tevent);
-  }
-
   {
     gint i;
 
@@ -3640,13 +3600,6 @@ gst_ogg_demux_perform_seek_pull (GstOggDemux * ogg, GstEvent * event)
     gint64 position, begin_time;
     GstSegment segment;
 
-    /* we have to send the flush to the old chain, not the new one */
-    if (flush) {
-      tevent = gst_event_new_flush_stop (TRUE);
-      gst_event_set_seqnum (tevent, seqnum);
-      gst_ogg_demux_send_event (ogg, tevent);
-    }
-
     /* we need this to see how far inside the chain we need to start */
     if (chain->begin_time != GST_CLOCK_TIME_NONE)
       begin_time = chain->begin_time;
@@ -3992,13 +3945,80 @@ error_locked:
   goto error;
 }
 
+static gboolean
+gst_ogg_demux_setup_seek_pull (GstOggDemux * ogg, GstEvent * event)
+{
+  gboolean flush;
+  GstSeekFlags flags;
+  GstEvent *tevent;
+  guint32 seqnum = gst_event_get_seqnum (event);
+
+  GST_DEBUG_OBJECT (ogg, "Scheduling seek: %" GST_PTR_FORMAT, event);
+  gst_event_parse_seek (event, NULL, NULL, &flags, NULL, NULL, NULL, NULL);
+
+  flush = flags & GST_SEEK_FLAG_FLUSH;
+
+  /* first step is to unlock the streaming thread if it is
+   * blocked in a chain call, we do this by starting the flush. because
+   * we cannot yet hold any streaming lock, we have to protect the chains
+   * with their own lock. */
+  if (flush) {
+    gint i;
+
+    tevent = gst_event_new_flush_start ();
+    gst_event_set_seqnum (tevent, seqnum);
+
+    gst_event_ref (tevent);
+    gst_pad_push_event (ogg->sinkpad, tevent);
+
+    GST_CHAIN_LOCK (ogg);
+    for (i = 0; i < ogg->chains->len; i++) {
+      GstOggChain *chain = g_array_index (ogg->chains, GstOggChain *, i);
+      gint j;
+
+      for (j = 0; j < chain->streams->len; j++) {
+        GstOggPad *pad = g_array_index (chain->streams, GstOggPad *, j);
+
+        gst_event_ref (tevent);
+        gst_pad_push_event (GST_PAD (pad), tevent);
+      }
+    }
+    GST_CHAIN_UNLOCK (ogg);
+
+    gst_event_unref (tevent);
+  }
+
+  gst_pad_pause_task (ogg->sinkpad);
+
+  /* now grab the stream lock so that streaming cannot continue, for
+   * non flushing seeks when the element is in PAUSED this could block
+   * forever. */
+  GST_PAD_STREAM_LOCK (ogg->sinkpad);
+
+  /* we need to stop flushing on the sinkpad as we're going to use it
+   * next. We can do this as we have the STREAM lock now. */
+  if (flush) {
+    tevent = gst_event_new_flush_stop (TRUE);
+    gst_event_set_seqnum (tevent, seqnum);
+    gst_pad_push_event (ogg->sinkpad, gst_event_ref (tevent));
+    gst_ogg_demux_send_event (ogg, tevent);
+  }
+
+  gst_event_replace (&ogg->seek_event, event);
+  gst_pad_start_task (ogg->sinkpad, (GstTaskFunction) gst_ogg_demux_loop,
+      ogg->sinkpad, NULL);
+  GST_PAD_STREAM_UNLOCK (ogg->sinkpad);
+
+  return TRUE;
+}
+
 static gboolean
 gst_ogg_demux_perform_seek (GstOggDemux * ogg, GstEvent * event)
 {
   gboolean res;
 
   if (ogg->pullmode) {
-    res = gst_ogg_demux_perform_seek_pull (ogg, event);
+    res = gst_ogg_demux_setup_seek_pull (ogg, event);
   } else {
     res = gst_ogg_demux_perform_seek_push (ogg, event);
   }
@@ -4026,7 +4046,7 @@ gst_ogg_demux_bisect_forward_serialno (GstOggDemux * ogg,
       "bisect begin: %" G_GINT64_FORMAT ", searched: %" G_GINT64_FORMAT
       ", end %" G_GINT64_FORMAT ", chain: %p", begin, searched, end, chain);
 
-  /* the below guards against garbage seperating the last and
+  /* the below guards against garbage separating the last and
    * first pages of two links. */
   while (searched < endsearched) {
     gint64 bisect;
@@ -4862,12 +4882,15 @@ static void
 gst_ogg_demux_loop (GstOggPad * pad)
 {
   GstOggDemux *ogg;
+  gboolean res;
   GstFlowReturn ret;
+  GstEvent *seek;
 
   ogg = GST_OGG_DEMUX (GST_OBJECT_PARENT (pad));
+  seek = ogg->seek_event;
+  ogg->seek_event = NULL;
 
   if (ogg->need_chains) {
-    gboolean res;
 
     /* this is the only place where we write chains and thus need to lock. */
     GST_CHAIN_LOCK (ogg);
@@ -4883,8 +4906,12 @@ gst_ogg_demux_loop (GstOggPad * pad)
     GST_OBJECT_UNLOCK (ogg);
 
     /* and seek to configured positions without FLUSH */
-    res = gst_ogg_demux_perform_seek_pull (ogg, NULL);
+    res = gst_ogg_demux_perform_seek_pull (ogg, seek);
 
+    if (!res)
+      goto seek_failed;
+  } else if (seek) {
+    res = gst_ogg_demux_perform_seek_pull (ogg, seek);
     if (!res)
       goto seek_failed;
   }
diff --git a/ext/ogg/gstoggdemux.h b/ext/ogg/gstoggdemux.h
index 016c14ac37..5ac81b7f32 100644
--- a/ext/ogg/gstoggdemux.h
+++ b/ext/ogg/gstoggdemux.h
@@ -97,7 +97,7 @@ struct _GstOggPad
   gint64 first_granule;         /* the granulepos of first page == first sample in next page */
   GstClockTime first_time;      /* the timestamp of the second page or granuletime of first page */
 
-  GstClockTime position;        /* position when last push occured; used to detect when we
+  GstClockTime position;        /* position when last push occurred; used to detect when we
                                  * need to send a newsegment update event for sparse streams */
 
   GList *continued;
@@ -203,7 +203,7 @@ struct _GstOggDemux
   ogg_sync_state sync;
   long chunk_size;
 
-  /* Seek events set up by the streaming thread in push mode */
+  /* Seek events set up by the streaming thread */
   GstEvent *seek_event;
   GThread *seek_event_thread;
   GMutex seek_event_mutex;
-- 
GitLab