gstadaptivedemux.c 142 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
/* GStreamer
 *
 * Copyright (C) 2014 Samsung Electronics. All rights reserved.
 *   Author: Thiago Santos <thiagoss@osg.samsung.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */

/**
 * SECTION:gstadaptivedemux
 * @short_description: Base class for adaptive demuxers
 * @see_also:
 *
 * What is an adaptive demuxer?
 * Adaptive demuxers are special demuxers in the sense that they don't
 * actually demux data received from upstream but download the data
 * themselves.
 *
 * Adaptive formats (HLS, DASH, MSS) are composed of a manifest file and
 * a set of fragments. The manifest describes the available media and
 * the sequence of fragments to use. Each fragments contains a small
 * part of the media (typically only a few seconds). It is possible for
 * the manifest to have the same media available in different configurations
 * (bitrates for example) so that the client can select the one that
 * best suits its scenario (network fluctuation, hardware requirements...).
 * It is possible to switch from one representation of the media to another
 * during playback. That's why it is called 'adaptive', because it can be
 * adapted to the client's needs.
 *
 * Architectural overview:
 * The manifest is received by the demuxer in its sink pad and, upon receiving
 * EOS, it parses the manifest and exposes the streams available in it. For
 * each stream a source element will be created and will download the list
 * of fragments one by one. Once a fragment is finished downloading, the next
 * URI is set to the source element and it starts fetching it and pushing
 * through the stream's pad. This implies that each stream is independent from
 * each other as it runs on a separate thread.
 *
 * After downloading each fragment, the download rate of it is calculated and
 * the demuxer has a chance to switch to a different bitrate if needed. The
 * switch can be done by simply pushing a new caps before the next fragment
 * when codecs are the same, or by exposing a new pad group if it needs
 * a codec change.
 *
 * Extra features:
 * - Not linked streams: Streams that are not-linked have their download threads
 *                       interrupted to save network bandwidth. When they are
 *                       relinked a reconfigure event is received and the
 *                       stream is restarted.
 *
 * Subclasses:
 * While GstAdaptiveDemux is responsible for the workflow, it knows nothing
 * about the intrinsics of the subclass formats, so the subclasses are
 * resposible for maintaining the manifest data structures and stream
 * information.
 */

71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
/*
MT safety.
The following rules were observed while implementing MT safety in adaptive demux:
1. If a variable is accessed from multiple threads and at least one thread
writes to it, then all the accesses needs to be done from inside a critical section.
2. If thread A wants to join thread B then at the moment it calls gst_task_join
it must not hold any mutexes that thread B might take.

Adaptive demux API can be called from several threads. More, adaptive demux
starts some threads to monitor the download of fragments. In order to protect
accesses to shared variables (demux and streams) all the API functions that
can be run in different threads will need to get a mutex (manifest_lock)
when they start and release it when they end. Because some of those functions
can indirectly call other API functions (eg they can generate events or messages
that are processed in the same thread) the manifest_lock must be recursive.

The manifest_lock will serialize the public API making access to shared
variables safe. But some of these functions will try at some moment to join
threads created by adaptive demux, or to change the state of src elements
(which will block trying to join the src element streaming thread). Because
of rule 2, those functions will need to release the manifest_lock during the
call of gst_task_join. During this time they can be interrupted by other API calls.
For example, during the precessing of a seek event, gst_adaptive_demux_stop_tasks
is called and this will join all threads. In order to prevent interruptions
during such period, all the API functions will also use a second lock: api_lock.
This will be taken at the beginning of the function and released at the end,
but this time this lock will not be temporarily released during join.
This lock will be used only by API calls (not by gst_adaptive_demux_stream_download_loop
or gst_adaptive_demux_updates_loop or _src_chain or _src_event) so it is safe
to hold it while joining the threads or changing the src element state. The
api_lock will serialise all external requests to adaptive demux. In order to
avoid deadlocks, if a function needs to acquire both manifest and api locks,
the api_lock will be taken first and the manifest_lock second.

By using the api_lock a thread is protected against other API calls. But when
temporarily dropping the manifest_lock, it will be vulnerable to changes from
threads that use only the manifest_lock and not the api_lock. These threads run
one of the following functions: gst_adaptive_demux_stream_download_loop,
gst_adaptive_demux_updates_loop, _src_chain, _src_event. In order to guarantee
that all operations during an API call are not impacted by other writes, the
above mentioned functions must check a cancelled flag every time they reacquire
the manifest_lock. If the flag is set, they must exit immediately, without
performing any changes on the shared data. In this way, an API call (eg seek
request) can set the cancel flag before releasing the manifest_lock and be sure
that the demux object and its streams are not changed by anybody else.
*/

118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include "gstadaptivedemux.h"
#include "gst/gst-i18n-plugin.h"
#include <gst/base/gstadapter.h>

GST_DEBUG_CATEGORY (adaptivedemux_debug);
#define GST_CAT_DEFAULT adaptivedemux_debug

#define GST_ADAPTIVE_DEMUX_GET_PRIVATE(obj)  \
    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_ADAPTIVE_DEMUX, \
        GstAdaptiveDemuxPrivate))

#define MAX_DOWNLOAD_ERROR_COUNT 3
#define DEFAULT_FAILED_COUNT 3
135
#define DEFAULT_CONNECTION_SPEED 0
136
#define DEFAULT_BITRATE_LIMIT 0.8f
137
#define SRC_QUEUE_MAX_BYTES 20 * 1024 * 1024    /* For safety. Large enough to hold a segment. */
138
#define NUM_LOOKBACK_FRAGMENTS 3
139

140
#define GST_MANIFEST_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->manifest_lock))
141 142 143 144 145 146 147 148 149 150
#define GST_MANIFEST_LOCK(d) G_STMT_START { \
    GST_TRACE("Locking from thread %p", g_thread_self()); \
    g_rec_mutex_lock (GST_MANIFEST_GET_LOCK (d)); \
    GST_TRACE("Locked from thread %p", g_thread_self()); \
 } G_STMT_END

#define GST_MANIFEST_UNLOCK(d) G_STMT_START { \
    GST_TRACE("Unlocking from thread %p", g_thread_self()); \
    g_rec_mutex_unlock (GST_MANIFEST_GET_LOCK (d)); \
 } G_STMT_END
151 152 153 154 155

#define GST_API_GET_LOCK(d) (&(GST_ADAPTIVE_DEMUX_CAST(d)->priv->api_lock))
#define GST_API_LOCK(d)   g_mutex_lock (GST_API_GET_LOCK (d));
#define GST_API_UNLOCK(d) g_mutex_unlock (GST_API_GET_LOCK (d));

156 157 158 159
#define GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK(d) (&GST_ADAPTIVE_DEMUX_CAST(d)->priv->segment_lock)
#define GST_ADAPTIVE_DEMUX_SEGMENT_LOCK(d) g_mutex_lock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))
#define GST_ADAPTIVE_DEMUX_SEGMENT_UNLOCK(d) g_mutex_unlock (GST_ADAPTIVE_DEMUX_SEGMENT_GET_LOCK (d))

160 161 162
enum
{
  PROP_0,
163
  PROP_CONNECTION_SPEED,
164
  PROP_BITRATE_LIMIT,
165 166
  PROP_LAST
};
167

168 169
/* Internal, so not using GST_FLOW_CUSTOM_SUCCESS_N */
#define GST_ADAPTIVE_DEMUX_FLOW_SWITCH (GST_FLOW_CUSTOM_SUCCESS_2 + 1)
170

171 172
struct _GstAdaptiveDemuxPrivate
{
173 174
  GstAdapter *input_adapter;    /* protected by manifest_lock */
  gboolean have_manifest;       /* protected by manifest_lock */
175

176
  GList *old_streams;           /* protected by manifest_lock */
177

178
  GstTask *updates_task;        /* MT safe */
179 180
  GRecMutex updates_lock;
  GMutex updates_timed_lock;
181 182 183 184
  GCond updates_timed_cond;     /* protected by updates_timed_lock */
  gboolean stop_updates_task;   /* protected by updates_timed_lock */

  /* used only from updates_task, no need to protect it */
185 186
  gint update_failed_count;

187 188 189 190 191 192
  guint32 segment_seqnum;       /* protected by manifest_lock */

  /* main lock used to protect adaptive demux and all its streams.
   * It serializes the adaptive demux public API.
   */
  GRecMutex manifest_lock;
193

194 195 196 197 198 199 200
  /* condition to wait for manifest updates on a live stream.
   * In order to signal the manifest_cond, the caller needs to hold both
   * manifest_lock and manifest_update_lock (taken in this order)
   */
  GCond manifest_cond;
  GMutex manifest_update_lock;

201 202 203 204 205
  /* Lock and condition for prerolling streams before exposing */
  GMutex preroll_lock;
  GCond preroll_cond;
  gint preroll_pending;

206
  GMutex api_lock;
207 208 209 210 211 212

  /* Protects demux and stream segment information
   * Needed because seeks can update segment information
   * without needing to stop tasks when they just want to
   * update the segment boundaries */
  GMutex segment_lock;
213 214
};

215 216
typedef struct _GstAdaptiveDemuxTimer
{
217
  volatile gint ref_count;
218 219 220 221 222 223
  GCond *cond;
  GMutex *mutex;
  GstClockID clock_id;
  gboolean fired;
} GstAdaptiveDemuxTimer;

224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
static GstBinClass *parent_class = NULL;
static void gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass);
static void gst_adaptive_demux_init (GstAdaptiveDemux * dec,
    GstAdaptiveDemuxClass * klass);
static void gst_adaptive_demux_finalize (GObject * object);
static GstStateChangeReturn gst_adaptive_demux_change_state (GstElement *
    element, GstStateChange transition);

static void gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg);

static gboolean gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
static GstFlowReturn gst_adaptive_demux_sink_chain (GstPad * pad,
    GstObject * parent, GstBuffer * buffer);
static gboolean gst_adaptive_demux_src_query (GstPad * pad, GstObject * parent,
    GstQuery * query);
static gboolean gst_adaptive_demux_src_event (GstPad * pad, GstObject * parent,
    GstEvent * event);

static gboolean
gst_adaptive_demux_push_src_event (GstAdaptiveDemux * demux, GstEvent * event);

static void gst_adaptive_demux_updates_loop (GstAdaptiveDemux * demux);
static void gst_adaptive_demux_stream_download_loop (GstAdaptiveDemuxStream *
    stream);
static void gst_adaptive_demux_reset (GstAdaptiveDemux * demux);
250
static gboolean gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
251
    gboolean first_and_live);
252
static gboolean gst_adaptive_demux_expose_streams (GstAdaptiveDemux * demux);
253 254
static gboolean gst_adaptive_demux_is_live (GstAdaptiveDemux * demux);
static GstFlowReturn gst_adaptive_demux_stream_seek (GstAdaptiveDemux * demux,
255 256
    GstAdaptiveDemuxStream * stream, gboolean forward, GstSeekFlags flags,
    GstClockTime ts, GstClockTime * final_ts);
257 258 259 260 261 262 263 264 265 266 267 268
static gboolean gst_adaptive_demux_stream_has_next_fragment (GstAdaptiveDemux *
    demux, GstAdaptiveDemuxStream * stream);
static gboolean gst_adaptive_demux_stream_select_bitrate (GstAdaptiveDemux *
    demux, GstAdaptiveDemuxStream * stream, guint64 bitrate);
static GstFlowReturn
gst_adaptive_demux_stream_update_fragment_info (GstAdaptiveDemux * demux,
    GstAdaptiveDemuxStream * stream);
static gint64
gst_adaptive_demux_stream_get_fragment_waiting_time (GstAdaptiveDemux * demux,
    GstAdaptiveDemuxStream * stream);
static GstFlowReturn gst_adaptive_demux_update_manifest (GstAdaptiveDemux *
    demux);
269 270
static GstFlowReturn
gst_adaptive_demux_update_manifest_default (GstAdaptiveDemux * demux);
271 272 273 274 275 276 277 278
static gboolean gst_adaptive_demux_has_next_period (GstAdaptiveDemux * demux);
static void gst_adaptive_demux_advance_period (GstAdaptiveDemux * demux);

static void gst_adaptive_demux_stream_free (GstAdaptiveDemuxStream * stream);
static GstFlowReturn
gst_adaptive_demux_stream_push_event (GstAdaptiveDemuxStream * stream,
    GstEvent * event);

279 280 281 282 283
static void gst_adaptive_demux_stop_manifest_update_task (GstAdaptiveDemux *
    demux);
static void gst_adaptive_demux_start_manifest_update_task (GstAdaptiveDemux *
    demux);

284 285
static void gst_adaptive_demux_start_tasks (GstAdaptiveDemux * demux,
    gboolean start_preroll_streams);
286 287
static void gst_adaptive_demux_stop_tasks (GstAdaptiveDemux * demux,
    gboolean stop_updates);
288 289
static GstFlowReturn gst_adaptive_demux_combine_flows (GstAdaptiveDemux *
    demux);
290 291 292
static void
gst_adaptive_demux_stream_fragment_download_finish (GstAdaptiveDemuxStream *
    stream, GstFlowReturn ret, GError * err);
293 294
static GstFlowReturn
gst_adaptive_demux_stream_data_received_default (GstAdaptiveDemux * demux,
295
    GstAdaptiveDemuxStream * stream, GstBuffer * buffer);
296 297 298
static GstFlowReturn
gst_adaptive_demux_stream_finish_fragment_default (GstAdaptiveDemux * demux,
    GstAdaptiveDemuxStream * stream);
299 300 301
static GstFlowReturn
gst_adaptive_demux_stream_advance_fragment_unlocked (GstAdaptiveDemux * demux,
    GstAdaptiveDemuxStream * stream, GstClockTime duration);
302 303 304 305 306
static gboolean
gst_adaptive_demux_wait_until (GstClock * clock, GCond * cond, GMutex * mutex,
    GstClockTime end_time);
static gboolean gst_adaptive_demux_clock_callback (GstClock * clock,
    GstClockTime time, GstClockID id, gpointer user_data);
307 308 309
static gboolean
gst_adaptive_demux_requires_periodical_playlist_update_default (GstAdaptiveDemux
    * demux);
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338

/* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
 * method to get to the padtemplates */
GType
gst_adaptive_demux_get_type (void)
{
  static volatile gsize type = 0;

  if (g_once_init_enter (&type)) {
    GType _type;
    static const GTypeInfo info = {
      sizeof (GstAdaptiveDemuxClass),
      NULL,
      NULL,
      (GClassInitFunc) gst_adaptive_demux_class_init,
      NULL,
      NULL,
      sizeof (GstAdaptiveDemux),
      0,
      (GInstanceInitFunc) gst_adaptive_demux_init,
    };

    _type = g_type_register_static (GST_TYPE_BIN,
        "GstAdaptiveDemux", &info, G_TYPE_FLAG_ABSTRACT);
    g_once_init_leave (&type, _type);
  }
  return type;
}

339 340 341 342 343 344
static void
gst_adaptive_demux_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec)
{
  GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);

345 346 347
  GST_API_LOCK (demux);
  GST_MANIFEST_LOCK (demux);

348
  switch (prop_id) {
349 350 351 352 353
    case PROP_CONNECTION_SPEED:
      demux->connection_speed = g_value_get_uint (value) * 1000;
      GST_DEBUG_OBJECT (demux, "Connection speed set to %u",
          demux->connection_speed);
      break;
354 355 356
    case PROP_BITRATE_LIMIT:
      demux->bitrate_limit = g_value_get_float (value);
      break;
357 358 359 360
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
361 362 363

  GST_MANIFEST_UNLOCK (demux);
  GST_API_UNLOCK (demux);
364 365 366 367 368 369 370 371
}

static void
gst_adaptive_demux_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec)
{
  GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX (object);

372 373
  GST_MANIFEST_LOCK (demux);

374
  switch (prop_id) {
375 376 377
    case PROP_CONNECTION_SPEED:
      g_value_set_uint (value, demux->connection_speed / 1000);
      break;
378 379 380
    case PROP_BITRATE_LIMIT:
      g_value_set_float (value, demux->bitrate_limit);
      break;
381 382 383 384
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
385 386

  GST_MANIFEST_UNLOCK (demux);
387 388
}

389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405
static void
gst_adaptive_demux_class_init (GstAdaptiveDemuxClass * klass)
{
  GObjectClass *gobject_class;
  GstElementClass *gstelement_class;
  GstBinClass *gstbin_class;

  gobject_class = G_OBJECT_CLASS (klass);
  gstelement_class = GST_ELEMENT_CLASS (klass);
  gstbin_class = GST_BIN_CLASS (klass);

  GST_DEBUG_CATEGORY_INIT (adaptivedemux_debug, "adaptivedemux", 0,
      "Base Adaptive Demux");

  parent_class = g_type_class_peek_parent (klass);
  g_type_class_add_private (klass, sizeof (GstAdaptiveDemuxPrivate));

406 407
  gobject_class->set_property = gst_adaptive_demux_set_property;
  gobject_class->get_property = gst_adaptive_demux_get_property;
408 409
  gobject_class->finalize = gst_adaptive_demux_finalize;

410 411 412 413 414 415
  g_object_class_install_property (gobject_class, PROP_CONNECTION_SPEED,
      g_param_spec_uint ("connection-speed", "Connection Speed",
          "Network connection speed in kbps (0 = calculate from downloaded"
          " fragments)", 0, G_MAXUINT / 1000, DEFAULT_CONNECTION_SPEED,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

416 417 418 419 420 421 422 423
  /* FIXME 2.0: rename this property to bandwidth-usage or any better name */
  g_object_class_install_property (gobject_class, PROP_BITRATE_LIMIT,
      g_param_spec_float ("bitrate-limit",
          "Bitrate limit in %",
          "Limit of the available bitrate to use when switching to alternates.",
          0, 1, DEFAULT_BITRATE_LIMIT,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

424 425 426
  gstelement_class->change_state = gst_adaptive_demux_change_state;

  gstbin_class->handle_message = gst_adaptive_demux_handle_message;
427 428 429

  klass->data_received = gst_adaptive_demux_stream_data_received_default;
  klass->finish_fragment = gst_adaptive_demux_stream_finish_fragment_default;
430
  klass->update_manifest = gst_adaptive_demux_update_manifest_default;
431 432 433
  klass->requires_periodical_playlist_update =
      gst_adaptive_demux_requires_periodical_playlist_update_default;

434 435 436 437 438 439 440
}

static void
gst_adaptive_demux_init (GstAdaptiveDemux * demux,
    GstAdaptiveDemuxClass * klass)
{
  GstPadTemplate *pad_template;
441 442
  GstClockType clock_type = GST_CLOCK_TYPE_OTHER;
  GObjectClass *gobject_class;
443 444 445 446 447

  GST_DEBUG_OBJECT (demux, "gst_adaptive_demux_init");

  demux->priv = GST_ADAPTIVE_DEMUX_GET_PRIVATE (demux);
  demux->priv->input_adapter = gst_adapter_new ();
448
  demux->downloader = gst_uri_downloader_new ();
449
  gst_uri_downloader_set_parent (demux->downloader, GST_ELEMENT_CAST (demux));
450
  demux->stream_struct_size = sizeof (GstAdaptiveDemuxStream);
451 452 453
  demux->priv->segment_seqnum = gst_util_seqnum_next ();
  demux->have_group_id = FALSE;
  demux->group_id = G_MAXUINT;
454 455 456

  gst_segment_init (&demux->segment, GST_FORMAT_TIME);

457 458
  gst_bin_set_suppressed_flags (GST_BIN_CAST (demux),
      GST_ELEMENT_FLAG_SOURCE | GST_ELEMENT_FLAG_SINK);
459

460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483
  demux->realtime_clock = gst_system_clock_obtain ();
  g_assert (demux->realtime_clock != NULL);
  gobject_class = G_OBJECT_GET_CLASS (demux->realtime_clock);
  if (g_object_class_find_property (gobject_class, "clock-type")) {
    g_object_get (demux->realtime_clock, "clock-type", &clock_type, NULL);
  } else {
    GST_WARNING_OBJECT (demux,
        "System clock does not have clock-type property");
  }
  if (clock_type == GST_CLOCK_TYPE_REALTIME) {
    demux->clock_offset = 0;
  } else {
    GDateTime *utc_now;
    GstClockTime rtc_now;
    GTimeVal gtv;

    utc_now = g_date_time_new_now_utc ();
    rtc_now = gst_clock_get_time (demux->realtime_clock);
    g_date_time_to_timeval (utc_now, &gtv);
    demux->clock_offset =
        gtv.tv_sec * G_TIME_SPAN_SECOND + gtv.tv_usec -
        GST_TIME_AS_USECONDS (rtc_now);
    g_date_time_unref (utc_now);
  }
484 485 486 487 488 489 490 491 492
  g_rec_mutex_init (&demux->priv->updates_lock);
  demux->priv->updates_task =
      gst_task_new ((GstTaskFunction) gst_adaptive_demux_updates_loop,
      demux, NULL);
  gst_task_set_lock (demux->priv->updates_task, &demux->priv->updates_lock);

  g_mutex_init (&demux->priv->updates_timed_lock);
  g_cond_init (&demux->priv->updates_timed_cond);

493 494 495 496 497
  g_cond_init (&demux->priv->manifest_cond);
  g_mutex_init (&demux->priv->manifest_update_lock);

  g_rec_mutex_init (&demux->priv->manifest_lock);
  g_mutex_init (&demux->priv->api_lock);
498
  g_mutex_init (&demux->priv->segment_lock);
499

500 501 502
  g_cond_init (&demux->priv->preroll_cond);
  g_mutex_init (&demux->priv->preroll_lock);

503 504 505 506
  pad_template =
      gst_element_class_get_pad_template (GST_ELEMENT_CLASS (klass), "sink");
  g_return_if_fail (pad_template != NULL);

507
  demux->sinkpad = gst_pad_new_from_template (pad_template, "sink");
508 509 510 511 512
  gst_pad_set_event_function (demux->sinkpad,
      GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_event));
  gst_pad_set_chain_function (demux->sinkpad,
      GST_DEBUG_FUNCPTR (gst_adaptive_demux_sink_chain));

513
  /* Properties */
514
  demux->bitrate_limit = DEFAULT_BITRATE_LIMIT;
515
  demux->connection_speed = DEFAULT_CONNECTION_SPEED;
516

517 518 519 520 521 522 523 524 525 526 527 528
  gst_element_add_pad (GST_ELEMENT (demux), demux->sinkpad);
}

static void
gst_adaptive_demux_finalize (GObject * object)
{
  GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (object);
  GstAdaptiveDemuxPrivate *priv = demux->priv;

  GST_DEBUG_OBJECT (object, "finalize");

  g_object_unref (priv->input_adapter);
529
  g_object_unref (demux->downloader);
530 531 532

  g_mutex_clear (&priv->updates_timed_lock);
  g_cond_clear (&priv->updates_timed_cond);
533 534
  g_mutex_clear (&demux->priv->manifest_update_lock);
  g_cond_clear (&demux->priv->manifest_cond);
535 536
  g_object_unref (priv->updates_task);
  g_rec_mutex_clear (&priv->updates_lock);
537 538
  g_rec_mutex_clear (&demux->priv->manifest_lock);
  g_mutex_clear (&demux->priv->api_lock);
539
  g_mutex_clear (&demux->priv->segment_lock);
540 541 542 543
  if (demux->realtime_clock) {
    gst_object_unref (demux->realtime_clock);
    demux->realtime_clock = NULL;
  }
544

545 546 547
  g_cond_clear (&demux->priv->preroll_cond);
  g_mutex_clear (&demux->priv->preroll_lock);

548 549 550 551 552 553 554 555 556 557
  G_OBJECT_CLASS (parent_class)->finalize (object);
}

static GstStateChangeReturn
gst_adaptive_demux_change_state (GstElement * element,
    GstStateChange transition)
{
  GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (element);
  GstStateChangeReturn result = GST_STATE_CHANGE_FAILURE;

558 559
  GST_API_LOCK (demux);

560 561
  switch (transition) {
    case GST_STATE_CHANGE_PAUSED_TO_READY:
562
      GST_MANIFEST_LOCK (demux);
563
      demux->running = FALSE;
564
      gst_adaptive_demux_reset (demux);
565
      GST_MANIFEST_UNLOCK (demux);
566
      break;
567 568
    case GST_STATE_CHANGE_READY_TO_PAUSED:
      GST_MANIFEST_LOCK (demux);
569
      gst_adaptive_demux_reset (demux);
570 571
      if (demux->priv->have_manifest)
        gst_adaptive_demux_start_manifest_update_task (demux);
572 573 574
      demux->running = TRUE;
      GST_MANIFEST_UNLOCK (demux);
      break;
575 576 577 578
    default:
      break;
  }

579 580 581 582 583
  /* this must be run without MANIFEST_LOCK taken.
   * For PLAYING to PLAYING state changes, it will want to take a lock in
   * src element and that lock is held while the streaming thread is running.
   * The streaming thread will take the MANIFEST_LOCK, leading to a deadlock.
   */
584 585
  result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);

586
  GST_API_UNLOCK (demux);
587 588 589 590 591 592 593 594 595
  return result;
}

static gboolean
gst_adaptive_demux_sink_event (GstPad * pad, GstObject * parent,
    GstEvent * event)
{
  GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
  GstAdaptiveDemuxClass *demux_class;
596
  gboolean ret;
597 598 599

  switch (event->type) {
    case GST_EVENT_FLUSH_STOP:
600 601 602
      GST_API_LOCK (demux);
      GST_MANIFEST_LOCK (demux);

603
      gst_adaptive_demux_reset (demux);
604 605 606 607 608 609 610

      ret = gst_pad_event_default (pad, parent, event);

      GST_MANIFEST_UNLOCK (demux);
      GST_API_UNLOCK (demux);

      return ret;
611 612 613 614 615
    case GST_EVENT_EOS:{
      GstQuery *query;
      gboolean query_res;
      gboolean ret = TRUE;
      gsize available;
616
      GstBuffer *manifest_buffer;
617

618 619 620
      GST_API_LOCK (demux);
      GST_MANIFEST_LOCK (demux);

621
      demux_class = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
622

623 624 625 626
      available = gst_adapter_available (demux->priv->input_adapter);

      if (available == 0) {
        GST_WARNING_OBJECT (demux, "Received EOS without a manifest.");
627 628 629 630 631 632
        ret = gst_pad_event_default (pad, parent, event);

        GST_MANIFEST_UNLOCK (demux);
        GST_API_UNLOCK (demux);

        return ret;
633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665
      }

      GST_DEBUG_OBJECT (demux, "Got EOS on the sink pad: manifest fetched");

      /* Need to get the URI to use it as a base to generate the fragment's
       * uris */
      query = gst_query_new_uri ();
      query_res = gst_pad_peer_query (pad, query);
      if (query_res) {
        gchar *uri, *redirect_uri;
        gboolean permanent;

        gst_query_parse_uri (query, &uri);
        gst_query_parse_uri_redirection (query, &redirect_uri);
        gst_query_parse_uri_redirection_permanent (query, &permanent);

        if (permanent && redirect_uri) {
          demux->manifest_uri = redirect_uri;
          demux->manifest_base_uri = NULL;
          g_free (uri);
        } else {
          demux->manifest_uri = uri;
          demux->manifest_base_uri = redirect_uri;
        }

        GST_DEBUG_OBJECT (demux, "Fetched manifest at URI: %s (base: %s)",
            demux->manifest_uri, GST_STR_NULL (demux->manifest_base_uri));
      } else {
        GST_WARNING_OBJECT (demux, "Upstream URI query failed.");
      }
      gst_query_unref (query);

      /* Let the subclass parse the manifest */
666
      manifest_buffer =
667
          gst_adapter_take_buffer (demux->priv->input_adapter, available);
668
      if (!demux_class->process_manifest (demux, manifest_buffer)) {
669 670 671 672 673 674
        /* In most cases, this will happen if we set a wrong url in the
         * source element and we have received the 404 HTML response instead of
         * the manifest */
        GST_ELEMENT_ERROR (demux, STREAM, DECODE, ("Invalid manifest."),
            (NULL));
        ret = FALSE;
675 676
      } else {
        demux->priv->have_manifest = TRUE;
677
      }
678
      gst_buffer_unref (manifest_buffer);
679 680 681

      gst_element_post_message (GST_ELEMENT_CAST (demux),
          gst_message_new_element (GST_OBJECT_CAST (demux),
682
              gst_structure_new (GST_ADAPTIVE_DEMUX_STATISTICS_MESSAGE_NAME,
683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708
                  "manifest-uri", G_TYPE_STRING,
                  demux->manifest_uri, "uri", G_TYPE_STRING,
                  demux->manifest_uri,
                  "manifest-download-start", GST_TYPE_CLOCK_TIME,
                  GST_CLOCK_TIME_NONE,
                  "manifest-download-stop", GST_TYPE_CLOCK_TIME,
                  gst_util_get_timestamp (), NULL)));

      if (ret) {
        /* Send duration message */
        if (!gst_adaptive_demux_is_live (demux)) {
          GstClockTime duration = demux_class->get_duration (demux);

          if (duration != GST_CLOCK_TIME_NONE) {
            GST_DEBUG_OBJECT (demux,
                "Sending duration message : %" GST_TIME_FORMAT,
                GST_TIME_ARGS (duration));
            gst_element_post_message (GST_ELEMENT (demux),
                gst_message_new_duration_changed (GST_OBJECT (demux)));
          } else {
            GST_DEBUG_OBJECT (demux,
                "media duration unknown, can not send the duration message");
          }
        }

        if (demux->next_streams) {
709
          gst_adaptive_demux_prepare_streams (demux,
710
              gst_adaptive_demux_is_live (demux));
711
          gst_adaptive_demux_start_tasks (demux, TRUE);
712
          gst_adaptive_demux_start_manifest_update_task (demux);
713 714 715 716 717 718 719 720 721 722 723
        } else {
          /* no streams */
          GST_WARNING_OBJECT (demux, "No streams created from manifest");
          GST_ELEMENT_ERROR (demux, STREAM, DEMUX,
              (_("This file contains no playable streams.")),
              ("No known stream formats found at the Manifest"));
          ret = FALSE;
        }

      }
      GST_MANIFEST_UNLOCK (demux);
724
      GST_API_UNLOCK (demux);
725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744

      gst_event_unref (event);
      return ret;
    }
    case GST_EVENT_SEGMENT:
      /* Swallow newsegments, we'll push our own */
      gst_event_unref (event);
      return TRUE;
    default:
      break;
  }

  return gst_pad_event_default (pad, parent, event);
}

static GstFlowReturn
gst_adaptive_demux_sink_chain (GstPad * pad, GstObject * parent,
    GstBuffer * buffer)
{
  GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (parent);
745 746 747

  GST_MANIFEST_LOCK (demux);

748 749 750 751 752
  gst_adapter_push (demux->priv->input_adapter, buffer);

  GST_INFO_OBJECT (demux, "Received manifest buffer, total size is %i bytes",
      (gint) gst_adapter_available (demux->priv->input_adapter));

753
  GST_MANIFEST_UNLOCK (demux);
754 755 756
  return GST_FLOW_OK;
}

757
/* must be called with manifest_lock taken */
758 759 760 761 762
static void
gst_adaptive_demux_reset (GstAdaptiveDemux * demux)
{
  GstAdaptiveDemuxClass *klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);
  GList *iter;
763 764 765 766 767 768 769 770
  GList *old_streams;
  GstEvent *eos;

  /* take ownership of old_streams before releasing the manifest_lock in
   * gst_adaptive_demux_stop_tasks
   */
  old_streams = demux->priv->old_streams;
  demux->priv->old_streams = NULL;
771

772
  gst_adaptive_demux_stop_tasks (demux, TRUE);
773 774 775 776

  if (klass->reset)
    klass->reset (demux);

777
  eos = gst_event_new_eos ();
778 779
  for (iter = demux->streams; iter; iter = g_list_next (iter)) {
    GstAdaptiveDemuxStream *stream = iter->data;
780 781 782 783
    if (stream->pad) {
      gst_pad_push_event (stream->pad, gst_event_ref (eos));
      gst_pad_set_active (stream->pad, FALSE);

784
      gst_element_remove_pad (GST_ELEMENT_CAST (demux), stream->pad);
785
    }
786 787
    gst_adaptive_demux_stream_free (stream);
  }
788
  gst_event_unref (eos);
789 790
  g_list_free (demux->streams);
  demux->streams = NULL;
791 792 793 794 795 796 797 798 799 800
  if (demux->prepared_streams) {
    g_list_free_full (demux->prepared_streams,
        (GDestroyNotify) gst_adaptive_demux_stream_free);
    demux->prepared_streams = NULL;
  }
  if (demux->next_streams) {
    g_list_free_full (demux->next_streams,
        (GDestroyNotify) gst_adaptive_demux_stream_free);
    demux->next_streams = NULL;
  }
801

802 803
  if (old_streams) {
    g_list_free_full (old_streams,
804 805 806
        (GDestroyNotify) gst_adaptive_demux_stream_free);
  }

807 808 809 810 811 812
  if (demux->priv->old_streams) {
    g_list_free_full (demux->priv->old_streams,
        (GDestroyNotify) gst_adaptive_demux_stream_free);
    demux->priv->old_streams = NULL;
  }

813 814 815 816 817 818
  g_free (demux->manifest_uri);
  g_free (demux->manifest_base_uri);
  demux->manifest_uri = NULL;
  demux->manifest_base_uri = NULL;

  gst_adapter_clear (demux->priv->input_adapter);
819
  demux->priv->have_manifest = FALSE;
820 821 822 823 824

  gst_segment_init (&demux->segment, GST_FORMAT_TIME);

  demux->have_group_id = FALSE;
  demux->group_id = G_MAXUINT;
825
  demux->priv->segment_seqnum = gst_util_seqnum_next ();
826 827 828 829 830 831 832 833 834 835
}

static void
gst_adaptive_demux_handle_message (GstBin * bin, GstMessage * msg)
{
  GstAdaptiveDemux *demux = GST_ADAPTIVE_DEMUX_CAST (bin);

  switch (GST_MESSAGE_TYPE (msg)) {
    case GST_MESSAGE_ERROR:{
      GList *iter;
836
      GstAdaptiveDemuxStream *stream = NULL;
837 838 839
      GError *err = NULL;
      gchar *debug = NULL;
      gchar *new_error = NULL;
840
      const GstStructure *details = NULL;
841

842 843
      GST_MANIFEST_LOCK (demux);

844
      for (iter = demux->streams; iter; iter = g_list_next (iter)) {
845
        GstAdaptiveDemuxStream *cur = iter->data;
846
        if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
847 848 849 850 851 852 853 854 855 856 857 858
                GST_OBJECT_CAST (cur->src))) {
          stream = cur;
          break;
        }
      }
      if (stream == NULL) {
        for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
          GstAdaptiveDemuxStream *cur = iter->data;
          if (gst_object_has_as_ancestor (GST_MESSAGE_SRC (msg),
                  GST_OBJECT_CAST (cur->src))) {
            stream = cur;
            break;
859
          }
860 861 862 863 864 865 866
        }
        if (stream == NULL) {
          GST_WARNING_OBJECT (demux,
              "Failed to locate stream for errored element");
          break;
        }
      }
867

868
      gst_message_parse_error (msg, &err, &debug);
869

870 871 872
      GST_WARNING_OBJECT (GST_ADAPTIVE_DEMUX_STREAM_PAD (stream),
          "Source posted error: %d:%d %s (%s)", err->domain, err->code,
          err->message, debug);
873

874 875 876 877 878
      if (debug)
        new_error = g_strdup_printf ("%s: %s\n", err->message, debug);
      if (new_error) {
        g_free (err->message);
        err->message = new_error;
879 880
      }

881 882 883 884 885 886 887 888 889 890 891 892 893
      gst_message_parse_error_details (msg, &details);
      if (details) {
        gst_structure_get_uint (details, "http-status-code",
            &stream->last_status_code);
      }

      /* error, but ask to retry */
      gst_adaptive_demux_stream_fragment_download_finish (stream,
          GST_FLOW_CUSTOM_ERROR, err);

      g_error_free (err);
      g_free (debug);

894 895
      GST_MANIFEST_UNLOCK (demux);

896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911
      gst_message_unref (msg);
      msg = NULL;
    }
      break;
    default:
      break;
  }

  if (msg)
    GST_BIN_CLASS (parent_class)->handle_message (bin, msg);
}

void
gst_adaptive_demux_set_stream_struct_size (GstAdaptiveDemux * demux,
    gsize struct_size)
{
912 913
  GST_API_LOCK (demux);
  GST_MANIFEST_LOCK (demux);
914
  demux->stream_struct_size = struct_size;
915 916
  GST_MANIFEST_UNLOCK (demux);
  GST_API_UNLOCK (demux);
917 918
}

919
/* must be called with manifest_lock taken */
920
static gboolean
921
gst_adaptive_demux_prepare_stream (GstAdaptiveDemux * demux,
922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954
    GstAdaptiveDemuxStream * stream)
{
  GstPad *pad = stream->pad;
  gchar *name = gst_pad_get_name (pad);
  GstEvent *event;
  gchar *stream_id;

  gst_pad_set_active (pad, TRUE);
  stream->need_header = TRUE;

  stream_id = gst_pad_create_stream_id (pad, GST_ELEMENT_CAST (demux), name);

  event =
      gst_pad_get_sticky_event (GST_ADAPTIVE_DEMUX_SINK_PAD (demux),
      GST_EVENT_STREAM_START, 0);
  if (event) {
    if (gst_event_parse_group_id (event, &demux->group_id))
      demux->have_group_id = TRUE;
    else
      demux->have_group_id = FALSE;
    gst_event_unref (event);
  } else if (!demux->have_group_id) {
    demux->have_group_id = TRUE;
    demux->group_id = gst_util_group_id_next ();
  }
  event = gst_event_new_stream_start (stream_id);
  if (demux->have_group_id)
    gst_event_set_group_id (event, demux->group_id);

  gst_pad_push_event (pad, event);
  g_free (stream_id);
  g_free (name);

955 956 957 958 959 960 961 962 963 964 965 966 967 968
  GST_DEBUG_OBJECT (demux, "Preparing srcpad %s:%s", GST_DEBUG_PAD_NAME (pad));

  stream->discont = TRUE;

  return TRUE;
}

static gboolean
gst_adaptive_demux_expose_stream (GstAdaptiveDemux * demux,
    GstAdaptiveDemuxStream * stream)
{
  gboolean ret;
  GstPad *pad = stream->pad;
  GstCaps *caps;
969 970 971

  if (stream->pending_caps) {
    gst_pad_set_caps (pad, stream->pending_caps);
972
    caps = stream->pending_caps;
973
    stream->pending_caps = NULL;
974 975
  } else {
    caps = gst_pad_get_current_caps (pad);
976 977
  }

978 979 980 981
  GST_DEBUG_OBJECT (demux, "Exposing srcpad %s:%s with caps %" GST_PTR_FORMAT,
      GST_DEBUG_PAD_NAME (pad), caps);
  if (caps)
    gst_caps_unref (caps);
982 983 984

  gst_object_ref (pad);

985 986 987 988 989 990
  /* Don't hold the manifest lock while exposing a pad */
  GST_MANIFEST_UNLOCK (demux);
  ret = gst_element_add_pad (GST_ELEMENT_CAST (demux), pad);
  GST_MANIFEST_LOCK (demux);

  return ret;
991 992
}

993
/* must be called with manifest_lock taken */
994 995 996 997 998 999 1000 1001
static GstClockTime
gst_adaptive_demux_stream_get_presentation_offset (GstAdaptiveDemux * demux,
    GstAdaptiveDemuxStream * stream)
{
  GstAdaptiveDemuxClass *klass;

  klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);

1002 1003
  if (klass->get_presentation_offset == NULL)
    return 0;
1004 1005 1006 1007

  return klass->get_presentation_offset (demux, stream);
}

1008
/* must be called with manifest_lock taken */
1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021
static GstClockTime
gst_adaptive_demux_get_period_start_time (GstAdaptiveDemux * demux)
{
  GstAdaptiveDemuxClass *klass;

  klass = GST_ADAPTIVE_DEMUX_GET_CLASS (demux);

  if (klass->get_period_start_time == NULL)
    return 0;

  return klass->get_period_start_time (demux);
}

1022
/* must be called with manifest_lock taken */
1023
static gboolean
1024
gst_adaptive_demux_prepare_streams (GstAdaptiveDemux * demux,
1025
    gboolean first_and_live)
1026 1027
{
  GList *iter;
1028
  GstClockTime period_start, min_pts = GST_CLOCK_TIME_NONE;
1029 1030

  g_return_val_if_fail (demux->next_streams != NULL, FALSE);
1031 1032 1033 1034 1035 1036
  if (demux->prepared_streams != NULL) {
    /* Old streams that were never exposed, due to a seek or so */
    GST_FIXME_OBJECT (demux,
        "Preparing new streams without cleaning up old ones!");
    return FALSE;
  }
1037

1038
  demux->prepared_streams = demux->next_streams;
1039 1040
  demux->next_streams = NULL;

1041 1042 1043
  if (!demux->running) {
    GST_DEBUG_OBJECT (demux, "Not exposing pads due to shutdown");
    return TRUE;
1044 1045
  }

1046
  for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1047 1048
    GstAdaptiveDemuxStream *stream = iter->data;

1049 1050 1051
    stream->do_block = TRUE;

    if (!gst_adaptive_demux_prepare_stream (demux,
1052 1053 1054
            GST_ADAPTIVE_DEMUX_STREAM_CAST (stream))) {
      /* TODO act on error */
    }
1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067

    if (first_and_live) {
      /* TODO we only need the first timestamp, maybe create a simple function */
      gst_adaptive_demux_stream_update_fragment_info (demux, stream);

      if (GST_CLOCK_TIME_IS_VALID (min_pts)) {
        min_pts = MIN (min_pts, stream->fragment.timestamp);
      } else {
        min_pts = stream->fragment.timestamp;
      }
    }
  }

1068 1069
  period_start = gst_adaptive_demux_get_period_start_time (demux);

1070 1071 1072 1073 1074 1075 1076 1077
  /* For live streams, the subclass is supposed to seek to the current
   * fragment and then tell us its timestamp in stream->fragment.timestamp.
   * We now also have to seek our demuxer segment to reflect this.
   *
   * FIXME: This needs some refactoring at some point.
   */
  if (first_and_live) {
    gst_segment_do_seek (&demux->segment, demux->segment.rate, GST_FORMAT_TIME,
1078 1079
        GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, min_pts + period_start,
        GST_SEEK_TYPE_NONE, -1, NULL);
1080 1081
  }

1082
  for (iter = demux->prepared_streams; iter; iter = g_list_next (iter)) {
1083
    GstAdaptiveDemuxStream *stream = iter->data;
1084
    GstClockTime offset;
1085

1086
    offset = gst_adaptive_demux_stream_get_presentation_offset (demux, stream);
1087
    stream->segment = demux->segment;
1088

1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115
    /* The demuxer segment is just built from seek events, but for each stream
     * we have to adjust segments according to the current period and the
     * stream specific presentation time offset.
     *
     * For each period, buffer timestamps start again from 0. Additionally the
     * buffer timestamps are shifted by the stream specific presentation time
     * offset, so the first buffer timestamp of a period is 0 + presentation
     * time offset. If the stream contains timestamps itself, this is also
     * supposed to be the presentation time stored inside the stream.
     *
     * The stream time over periods is supposed to be continuous, that is the
     * buffer timestamp 0 + presentation time offset should map to the start
     * time of the current period.
     *
     *
     * The adjustment of the stream segments as such works the following.
     *
     * If the demuxer segment start is bigger than the period start, this
     * means that we have to drop some media at the beginning of the current
     * period, e.g. because a seek into the middle of the period has
     * happened. The amount of media to drop is the difference between the
     * period start and the demuxer segment start, and as each period starts
     * again from 0, this difference is going to be the actual stream's
     * segment start. As all timestamps of the stream are shifted by the
     * presentation time offset, we will also have to move the segment start
     * by that offset.
     *
1116 1117 1118 1119 1120 1121 1122
     * Likewise, the demuxer segment stop value is adjusted in the same
     * fashion.
     *
     * Now the running time and stream time at the stream's segment start has
     * to be the one that is stored inside the demuxer's segment, which means
     * that segment.base and segment.time have to be copied over (done just
     * above)
1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135
     *
     *
     * If the demuxer segment start is smaller than the period start time,
     * this means that the whole period is inside the segment. As each period
     * starts timestamps from 0, and additionally timestamps are shifted by
     * the presentation time offset, the stream's first timestamp (and as such
     * the stream's segment start) has to be the presentation time offset.
     * The stream time at the segment start is supposed to be the stream time
     * of the period start according to the demuxer segment, so the stream
     * segment's time would be set to that. The same goes for the stream
     * segment's base, which is supposed to be the running time of the period
     * start according to the demuxer's segment.
     *
1136 1137 1138
     * The same logic applies for negative rates with the segment stop and
     * the period stop time (which gets clamped).
     *
1139 1140 1141 1142 1143
     *
     * For the first case where not the complete period is inside the segment,
     * the segment time and base as calculated by the second case would be
     * equivalent.
     */
1144 1145 1146 1147 1148 1149 1150 1151
    GST_DEBUG_OBJECT (demux, "Using demux segment %" GST_SEGMENT_FORMAT,
        &demux->segment);
    GST_DEBUG_OBJECT (demux,
        "period_start: %" GST_TIME_FORMAT " offset: %" GST_TIME_FORMAT,
        GST_TIME_ARGS (period_start), GST_TIME_ARGS (offset));
    /* note for readers:
     * Since stream->segment is initially a copy of demux->segment,
     * only the values that need updating are modified below. */
1152
    if (first_and_live) {
1153
      /* If first and live, demuxer did seek to the current position already */
1154
      stream->segment.start = demux->segment.start - period_start + offset;
1155 1156 1157
      if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
        stream->segment.stop = demux->segment.stop - period_start + offset;
      /* FIXME : Do we need to handle negative rates for this ? */
1158 1159
      stream->segment.position = stream->segment.start;
    } else if (demux->segment.start > period_start) {
1160
      /* seek within a period */
1161
      stream->segment.start = demux->segment.start - period_start + offset;
1162 1163 1164 1165 1166 1167
      if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
        stream->segment.stop = demux->segment.stop - period_start + offset;
      if (stream->segment.rate >= 0)
        stream->segment.position = offset;
      else
        stream->segment.position = stream->segment.stop;
1168 1169
    } else {
      stream->segment.start = offset;
1170 1171 1172 1173 1174 1175
      if (GST_CLOCK_TIME_IS_VALID (demux->segment.stop))
        stream->segment.stop = demux->segment.stop - period_start + offset;
      if (stream->segment.rate >= 0)
        stream->segment.position = offset;
      else
        stream->segment.position = stream->segment.stop;
1176 1177 1178
      stream->segment.time =
          gst_segment_to_stream_time (&demux->segment, GST_FORMAT_TIME,
          period_start);
1179
      stream->segment.base =
1180 1181 1182
          gst_segment_to_running_time (&demux->segment, GST_FORMAT_TIME,
          period_start);
    }
</