gstqueue2.c 101 KB
Newer Older
1 2 3
/* GStreamer
 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
 *                    2003 Colin Walters <cwalters@gnome.org>
Wim Taymans's avatar
Wim Taymans committed
4
 *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5
 *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6
 *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
 *
 * gstqueue2.c:
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

/**
 * SECTION:element-queue2
 *
 * Data is queued until one of the limits specified by the
30 31
 * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
 * #GstQueue2:max-size-time properties has been reached. Any attempt to push
32 33 34 35 36 37 38
 * more buffers into the queue will block the pushing thread until more space
 * becomes available.
 *
 * The queue will create a new thread on the source pad to decouple the
 * processing on sink and source pad.
 *
 * You can query how many buffers are queued by reading the
39
 * #GstQueue2:current-level-buffers property.
40 41 42
 *
 * The default queue size limits are 100 buffers, 2MB of data, or
 * two seconds worth of data, whichever is reached first.
43
 *
44 45 46 47
 * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the element
 * will allocate a random free filename and buffer data in the file.
 * By using this, it will buffer the entire stream data on the file independently
 * of the queue size limits, they will only be used for buffering statistics.
48
 *
49 50 51 52 53 54
 * Since 0.10.24, setting the temp-location property with a filename is deprecated
 * because it's impossible to securely open a temporary file in this way. The
 * property will still be used to notify the application of the allocated
 * filename, though.
 *
 * Last reviewed on 2009-07-10 (0.10.24)
55
 */
56

57 58 59
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
60

61 62
#include "gstqueue2.h"

63 64
#include <glib/gstdio.h>

65
#include "gst/gst-i18n-lib.h"
66
#include "gst/glib-compat-private.h"
67

68 69
#include <string.h>

LRN's avatar
LRN committed
70 71 72 73 74 75
#ifdef G_OS_WIN32
#include <io.h>                 /* lseek, open, close, read */
#undef lseek
#define lseek _lseeki64
#undef off_t
#define off_t guint64
76 77
#else
#include <unistd.h>
LRN's avatar
LRN committed
78 79
#endif

80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
/* Template for the always-present "sink" pad; accepts any caps. */
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

/* Template for the always-present "src" pad; any caps. */
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

/* General debug category of this element; selected as the file default
 * through GST_CAT_DEFAULT. */
GST_DEBUG_CATEGORY_STATIC (queue_debug);
#define GST_CAT_DEFAULT (queue_debug)
/* Second category, used explicitly by the STATUS() fill-level logging. */
GST_DEBUG_CATEGORY_STATIC (queue_dataflow);

/* No signals are defined in this enum; it only holds the terminating
 * LAST_SIGNAL entry. */
enum
{
  LAST_SIGNAL
};

99 100 101
/* other defines */
#define DEFAULT_BUFFER_SIZE 4096

/* Storage-mode predicates.  Each takes a pointer to the queue:
 *  - temp file:   a temp location was set or a filename template is configured
 *  - ring buffer: a non-zero ring buffer size is configured
 *  - queue:       neither of the above */
#define QUEUE_IS_USING_TEMP_FILE(queue) \
    ((queue)->temp_location_set || (queue)->temp_template != NULL)
#define QUEUE_IS_USING_RING_BUFFER(queue) \
    ((queue)->ring_buffer_max_size != 0)        /* for consistency with the above macro */
#define QUEUE_IS_USING_QUEUE(queue) \
    (!QUEUE_IS_USING_TEMP_FILE(queue) && !QUEUE_IS_USING_RING_BUFFER (queue))

/* Smaller of the configured byte limit and the ring buffer size. */
#define QUEUE_MAX_BYTES(queue) \
    MIN((queue)->max_level.bytes, (queue)->ring_buffer_max_size)

107 108 109 110 111 112
/* default property values
 *
 * Every expression-valued default is fully parenthesized so that it expands
 * safely next to any operator at the point of use. */
#define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
#define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
#define DEFAULT_MAX_SIZE_TIME      (2 * GST_SECOND)     /* 2 seconds */
#define DEFAULT_USE_BUFFERING      FALSE
#define DEFAULT_USE_RATE_ESTIMATE  TRUE
#define DEFAULT_LOW_PERCENT        10
#define DEFAULT_HIGH_PERCENT       99
#define DEFAULT_TEMP_REMOVE        TRUE
#define DEFAULT_RING_BUFFER_MAX_SIZE 0
117

118 119 120 121 122 123 124 125 126 127 128 129 130
/* GObject property identifiers, in the order the ids are assigned.
 * PROP_0 is the unused zero id; PROP_LAST terminates the list. */
enum
{
  PROP_0 = 0,
  /* current fill levels */
  PROP_CUR_LEVEL_BUFFERS,
  PROP_CUR_LEVEL_BYTES,
  PROP_CUR_LEVEL_TIME,
  /* configured limits */
  PROP_MAX_SIZE_BUFFERS,
  PROP_MAX_SIZE_BYTES,
  PROP_MAX_SIZE_TIME,
  /* buffering behaviour */
  PROP_USE_BUFFERING,
  PROP_USE_RATE_ESTIMATE,
  PROP_LOW_PERCENT,
  PROP_HIGH_PERCENT,
  /* temporary file handling */
  PROP_TEMP_TEMPLATE,
  PROP_TEMP_LOCATION,
  PROP_TEMP_REMOVE,
  /* ring buffer */
  PROP_RING_BUFFER_MAX_SIZE,
  PROP_LAST
};

138
/* Reset all four counters (buffers, bytes, time, rate_time) of the size-info
 * lvalue @l to zero.  @l is parenthesized so that expressions such as `*ptr`
 * can be passed as well as plain member accesses. */
#define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
  (l).buffers = 0;                                      \
  (l).bytes = 0;                                        \
  (l).time = 0;                                         \
  (l).rate_time = 0;                                    \
} G_STMT_END

/* Log the fill state of a queue to the queue_dataflow category.
 *
 *  q:   pointer to the queue element
 *  pad: pad whose name prefixes the message
 *  msg: string literal spliced into the format string
 *
 * The last value printed ("items") is the byte distance between the current
 * range's writing position and its max reading position when the element is
 * not in plain queue mode, and the length of the in-memory queue otherwise.
 *
 * The element parameter is deliberately NOT called `queue`: the body accesses
 * a struct member of that name, and a macro parameter with the same spelling
 * would be substituted into the member name too. */
#define STATUS(q, pad, msg) \
  GST_CAT_LOG_OBJECT (queue_dataflow, (q), \
                      "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
                      "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
                      " ns, %" G_GUINT64_FORMAT " items", \
                      GST_DEBUG_PAD_NAME (pad), \
                      (q)->cur_level.buffers, \
                      (q)->max_level.buffers, \
                      (q)->cur_level.bytes, \
                      (q)->max_level.bytes, \
                      (q)->cur_level.time, \
                      (q)->max_level.time, \
                      (guint64) (!QUEUE_IS_USING_QUEUE (q) ? \
                        (q)->current->writing_pos - (q)->current->max_reading_pos : \
                        (q)->queue.length))
160

161
/* Take the queue lock of @q.  @q is parenthesized so any pointer expression
 * may be passed. */
#define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
  g_mutex_lock (&(q)->qlock);                                            \
} G_STMT_END

/* Take the queue lock of @q, then jump to @label -- with the lock still
 * held -- when @res is anything other than GST_FLOW_OK.  @res is evaluated
 * after the lock has been acquired. */
#define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
  GST_QUEUE2_MUTEX_LOCK (q);                                            \
  if ((res) != GST_FLOW_OK)                                             \
    goto label;                                                         \
} G_STMT_END

/* Release the queue lock of @q. */
#define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
  g_mutex_unlock (&(q)->qlock);                                          \
} G_STMT_END

175
/* Block on the item_del condition of @q (queue lock must be held), flagging
 * waiting_del for the duration of the wait.  After waking up, @res is
 * evaluated; if it is not GST_FLOW_OK control jumps to @label.
 *
 * Only the macro's own parameter @q is used -- both for logging and for the
 * mutex handed to g_cond_wait() -- so the macro no longer depends on a
 * variable called `queue` being in scope at the call site.  @q is forwarded
 * to STATUS verbatim; STATUS is responsible for its own argument handling. */
#define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
  STATUS (q, (q)->sinkpad, "wait for DEL");                             \
  (q)->waiting_del = TRUE;                                              \
  g_cond_wait (&(q)->item_del, &(q)->qlock);                            \
  (q)->waiting_del = FALSE;                                             \
  if ((res) != GST_FLOW_OK) {                                           \
    STATUS (q, (q)->srcpad, "received DEL wakeup");                     \
    goto label;                                                         \
  }                                                                     \
  STATUS (q, (q)->sinkpad, "received DEL");                             \
} G_STMT_END

187
/* Block on the item_add condition of @q (queue lock must be held), flagging
 * waiting_add for the duration of the wait.  After waking up, @res is
 * evaluated; if it is not GST_FLOW_OK control jumps to @label.
 *
 * Logging uses the macro's own parameter @q rather than an outer-scope
 * variable named `queue`, so the call site may name its pointer freely.
 * @q is forwarded to STATUS verbatim; STATUS is responsible for its own
 * argument handling. */
#define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
  STATUS (q, (q)->srcpad, "wait for ADD");                              \
  (q)->waiting_add = TRUE;                                              \
  g_cond_wait (&(q)->item_add, &(q)->qlock);                            \
  (q)->waiting_add = FALSE;                                             \
  if ((res) != GST_FLOW_OK) {                                           \
    STATUS (q, (q)->srcpad, "received ADD wakeup");                     \
    goto label;                                                         \
  }                                                                     \
  STATUS (q, (q)->srcpad, "received ADD");                              \
} G_STMT_END

199
/* Signal the item_del condition of @q, but only if a thread has flagged
 * itself as waiting on it (waiting_del).  @q is parenthesized wherever it is
 * dereferenced; it is forwarded to STATUS verbatim. */
#define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
  if ((q)->waiting_del) {                                               \
    STATUS (q, (q)->srcpad, "signal DEL");                              \
    g_cond_signal (&(q)->item_del);                                      \
  }                                                                     \
} G_STMT_END

/* Signal the item_add condition of @q, but only if a thread has flagged
 * itself as waiting on it (waiting_add). */
#define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
  if ((q)->waiting_add) {                                               \
    STATUS (q, (q)->sinkpad, "signal ADD");                             \
    g_cond_signal (&(q)->item_add);                                      \
  }                                                                     \
} G_STMT_END

213
#define _do_init \
214 215 216
    GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
    GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0, \
        "dataflow inside the queue element");
217 218
#define gst_queue2_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstQueue2, gst_queue2, GST_TYPE_ELEMENT, _do_init);
219

220
static void gst_queue2_finalize (GObject * object);
221

222
static void gst_queue2_set_property (GObject * object,
223
    guint prop_id, const GValue * value, GParamSpec * pspec);
224
static void gst_queue2_get_property (GObject * object,
225 226
    guint prop_id, GValue * value, GParamSpec * pspec);

227 228
static GstFlowReturn gst_queue2_chain (GstPad * pad, GstObject * parent,
    GstBuffer * buffer);
229
static GstFlowReturn gst_queue2_chain_list (GstPad * pad, GstObject * parent,
230
    GstBufferList * buffer_list);
231 232
static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
static void gst_queue2_loop (GstPad * pad);
233

234 235
static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
236 237
static gboolean gst_queue2_handle_sink_query (GstPad * pad, GstObject * parent,
    GstQuery * query);
238

239 240
static gboolean gst_queue2_handle_src_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
241 242
static gboolean gst_queue2_handle_src_query (GstPad * pad, GstObject * parent,
    GstQuery * query);
243
static gboolean gst_queue2_handle_query (GstElement * element,
244
    GstQuery * query);
245

246 247
static GstFlowReturn gst_queue2_get_range (GstPad * pad, GstObject * parent,
    guint64 offset, guint length, GstBuffer ** buffer);
248

249 250 251 252
static gboolean gst_queue2_src_activate_mode (GstPad * pad, GstObject * parent,
    GstPadMode mode, gboolean active);
static gboolean gst_queue2_sink_activate_mode (GstPad * pad, GstObject * parent,
    GstPadMode mode, gboolean active);
253
static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
254 255
    GstStateChange transition);

256 257
static gboolean gst_queue2_is_empty (GstQueue2 * queue);
static gboolean gst_queue2_is_filled (GstQueue2 * queue);
258

259 260
static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);

261 262 263 264
/* Kind of item held in the queue. */
typedef enum
{
  GST_QUEUE2_ITEM_TYPE_UNKNOWN = 0,
  GST_QUEUE2_ITEM_TYPE_BUFFER = 1,
  GST_QUEUE2_ITEM_TYPE_BUFFER_LIST = 2,
  GST_QUEUE2_ITEM_TYPE_EVENT = 3
} GstQueue2ItemType;
268

269
/* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
270 271

static void
272
gst_queue2_class_init (GstQueue2Class * klass)
273 274 275 276
{
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);

277 278
  gobject_class->set_property = gst_queue2_set_property;
  gobject_class->get_property = gst_queue2_get_property;
279 280 281 282 283

  /* properties */
  g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
      g_param_spec_uint ("current-level-bytes", "Current level (kB)",
          "Current amount of data in the queue (bytes)",
284
          0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
285 286 287
  g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
      g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
          "Current number of buffers in the queue",
288
          0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
289 290 291
  g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
      g_param_spec_uint64 ("current-level-time", "Current level (ns)",
          "Current amount of data in the queue (in ns)",
292
          0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
293 294 295 296

  g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
      g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
          "Max. amount of data in the queue (bytes, 0=disable)",
297 298
          0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
299 300
  g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
      g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
301 302 303
          "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
          DEFAULT_MAX_SIZE_BUFFERS,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
304 305
  g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
      g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
306 307
          "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
          DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
308 309 310 311

  g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
      g_param_spec_boolean ("use-buffering", "Use buffering",
          "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
312
          DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
313 314 315
  g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
      g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
          "Estimate the bitrate of the stream to calculate time level",
316 317
          DEFAULT_USE_RATE_ESTIMATE,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
318 319
  g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
      g_param_spec_int ("low-percent", "Low percent",
320 321
          "Low threshold for buffering to start. Only used if use-buffering is True",
          0, 100, DEFAULT_LOW_PERCENT,
322
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
323 324
  g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
      g_param_spec_int ("high-percent", "High percent",
325 326 327
          "High threshold for buffering to finish. Only used if use-buffering is True",
          0, 100, DEFAULT_HIGH_PERCENT,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
328

329 330 331 332 333 334
  g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
      g_param_spec_string ("temp-template", "Temporary File Template",
          "File template to store temporary files in, should contain directory "
          "and XXXXXX. (NULL == disabled)",
          NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

335 336
  g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
      g_param_spec_string ("temp-location", "Temporary File Location",
337
          "Location to store temporary files in (Deprecated: Only read this "
338
          "property, use temp-template to configure the name template)",
339
          NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
340

341 342 343 344 345 346 347
  /**
   * GstQueue2:temp-remove
   *
   * When temp-template is set, remove the temporary file when going to READY.
   *
   * Since: 0.10.26
   */
348 349 350 351 352
  g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
      g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
          "Remove the temp-location after use",
          DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

353 354 355
  /**
   * GstQueue2:ring-buffer-max-size
   *
356 357
   * The maximum size of the ring buffer in bytes. If set to 0, the ring
   * buffer is disabled. Default 0.
358
   *
359
   * Since: 0.10.31
360 361
   */
  g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
362
      g_param_spec_uint64 ("ring-buffer-max-size",
363
          "Max. ring buffer size (bytes)",
364
          "Max. amount of data in the ring buffer (bytes, 0 = disabled)",
365
          0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
366 367
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

368
  /* set several parent class virtual functions */
369
  gobject_class->finalize = gst_queue2_finalize;
370

371 372 373 374 375 376 377 378 379 380 381
  gst_element_class_add_pad_template (gstelement_class,
      gst_static_pad_template_get (&srctemplate));
  gst_element_class_add_pad_template (gstelement_class,
      gst_static_pad_template_get (&sinktemplate));

  gst_element_class_set_details_simple (gstelement_class, "Queue 2",
      "Generic",
      "Simple data queue",
      "Erik Walthinsen <omega@cse.ogi.edu>, "
      "Wim Taymans <wim.taymans@gmail.com>");

382
  gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
383
  gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
384 385 386
}

static void
387
gst_queue2_init (GstQueue2 * queue)
388 389 390 391
{
  queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");

  gst_pad_set_chain_function (queue->sinkpad,
392
      GST_DEBUG_FUNCPTR (gst_queue2_chain));
393 394
  gst_pad_set_chain_list_function (queue->sinkpad,
      GST_DEBUG_FUNCPTR (gst_queue2_chain_list));
395 396
  gst_pad_set_activatemode_function (queue->sinkpad,
      GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_mode));
397
  gst_pad_set_event_function (queue->sinkpad,
398
      GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
Wim Taymans's avatar
Wim Taymans committed
399 400
  gst_pad_set_query_function (queue->sinkpad,
      GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_query));
Wim Taymans's avatar
Wim Taymans committed
401 402
  GST_PAD_SET_PROXY_CAPS (queue->sinkpad);
  GST_PAD_SET_PROXY_ALLOCATION (queue->sinkpad);
403 404 405 406
  gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);

  queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");

407 408
  gst_pad_set_activatemode_function (queue->srcpad,
      GST_DEBUG_FUNCPTR (gst_queue2_src_activate_mode));
409
  gst_pad_set_getrange_function (queue->srcpad,
410
      GST_DEBUG_FUNCPTR (gst_queue2_get_range));
411
  gst_pad_set_event_function (queue->srcpad,
412
      GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
413
  gst_pad_set_query_function (queue->srcpad,
414
      GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
Wim Taymans's avatar
Wim Taymans committed
415
  GST_PAD_SET_PROXY_CAPS (queue->srcpad);
416 417 418
  gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);

  /* levels */
419
  GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
420 421 422 423 424 425 426 427 428 429 430 431
  queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
  queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
  queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
  queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
  queue->use_buffering = DEFAULT_USE_BUFFERING;
  queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
  queue->low_percent = DEFAULT_LOW_PERCENT;
  queue->high_percent = DEFAULT_HIGH_PERCENT;

  gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
  gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);

432 433 434 435 436
  queue->sinktime = GST_CLOCK_TIME_NONE;
  queue->srctime = GST_CLOCK_TIME_NONE;
  queue->sink_tainted = TRUE;
  queue->src_tainted = TRUE;

437 438
  queue->srcresult = GST_FLOW_FLUSHING;
  queue->sinkresult = GST_FLOW_FLUSHING;
439
  queue->is_eos = FALSE;
440 441
  queue->in_timer = g_timer_new ();
  queue->out_timer = g_timer_new ();
442

Wim Taymans's avatar
Wim Taymans committed
443
  g_mutex_init (&queue->qlock);
444
  queue->waiting_add = FALSE;
Wim Taymans's avatar
Wim Taymans committed
445
  g_cond_init (&queue->item_add);
446
  queue->waiting_del = FALSE;
Wim Taymans's avatar
Wim Taymans committed
447
  g_cond_init (&queue->item_del);
448
  g_queue_init (&queue->queue);
449

450 451
  queue->buffering_percent = 100;

452
  /* tempfile related */
453
  queue->temp_template = NULL;
454
  queue->temp_location = NULL;
455
  queue->temp_location_set = FALSE;
456
  queue->temp_remove = DEFAULT_TEMP_REMOVE;
457

458
  queue->ring_buffer = NULL;
459
  queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
460

461 462 463 464 465 466
  GST_DEBUG_OBJECT (queue,
      "initialized queue's not_empty & not_full conditions");
}

/* called only once, as opposed to dispose */
static void
467
gst_queue2_finalize (GObject * object)
468
{
469
  GstQueue2 *queue = GST_QUEUE2 (object);
470 471 472

  GST_DEBUG_OBJECT (queue, "finalizing queue");

473 474
  while (!g_queue_is_empty (&queue->queue)) {
    GstMiniObject *data = g_queue_pop_head (&queue->queue);
475 476 477

    gst_mini_object_unref (data);
  }
478

479
  g_queue_clear (&queue->queue);
Wim Taymans's avatar
Wim Taymans committed
480 481 482
  g_mutex_clear (&queue->qlock);
  g_cond_clear (&queue->item_add);
  g_cond_clear (&queue->item_del);
483 484
  g_timer_destroy (queue->in_timer);
  g_timer_destroy (queue->out_timer);
485

486
  /* temp_file path cleanup  */
487 488
  g_free (queue->temp_template);
  g_free (queue->temp_location);
489

490 491 492
  G_OBJECT_CLASS (parent_class)->finalize (object);
}

493 494 495 496 497 498
/* Log every range in the queue's range list at debug level, marking the
 * one that is the current range. */
static void
debug_ranges (GstQueue2 * queue)
{
  GstQueue2Range *r = queue->ranges;

  while (r != NULL) {
    const gchar *marker = (r == queue->current) ? "**y**" : "  n  ";

    GST_DEBUG_OBJECT (queue,
        "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
        G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
        " current range? %s", r->offset, r->writing_pos, r->rb_offset,
        r->rb_writing_pos, r->reading_pos, marker);

    r = r->next;
  }
}

/* Free the whole range list and leave the queue with neither a list nor a
 * current range. */
static void
clean_ranges (GstQueue2 * queue)
{
  GST_DEBUG_OBJECT (queue, "clean queue ranges");

  /* the current range is part of the chain freed below */
  queue->current = NULL;

  g_slice_free_chain (GstQueue2Range, queue->ranges, next);
  queue->ranges = NULL;
}

/* find a range that contains @offset or NULL when nothing does */
static GstQueue2Range *
521
find_range (GstQueue2 * queue, guint64 offset)
522
{
523 524
  GstQueue2Range *range = NULL;
  GstQueue2Range *walk;
525 526 527 528 529 530 531 532 533

  /* first do a quick check for the current range */
  for (walk = queue->ranges; walk; walk = walk->next) {
    if (offset >= walk->offset && offset <= walk->writing_pos) {
      /* we can reuse an existing range */
      range = walk;
      break;
    }
  }
Robert Swain's avatar
Robert Swain committed
534 535 536 537 538 539 540
  if (range) {
    GST_DEBUG_OBJECT (queue,
        "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
        G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
  } else {
    GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
  }
541 542 543
  return range;
}

Wim Taymans's avatar
Wim Taymans committed
544 545 546 547 548 549 550 551 552 553 554 555 556 557
/* Set the queue's current byte level to the distance between @range's
 * writing position and its max reading position, or to 0 when the writing
 * position is not ahead. */
static void
update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
{
  const guint64 written = range->writing_pos;
  const guint64 consumed = range->max_reading_pos;

  queue->cur_level.bytes = (written > consumed) ? written - consumed : 0;
}

558 559 560 561 562 563 564 565
/* Return a range positioned for writing at @offset.
 *
 * If an existing range already contains @offset it is reused and its writing
 * position is moved to @offset.  Otherwise a new, empty range starting at
 * @offset is allocated and linked into the list, which is kept sorted by
 * ascending offset.  In both cases the queue's byte level is refreshed from
 * the returned range. */
static GstQueue2Range *
add_range (GstQueue2 * queue, guint64 offset)
{
  GstQueue2Range *range;

  GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);

  range = find_range (queue, offset);
  if (range != NULL) {
    GST_DEBUG_OBJECT (queue,
        "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
        range->writing_pos);
    range->writing_pos = offset;
  } else {
    GstQueue2Range **link;

    GST_DEBUG_OBJECT (queue,
        "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);

    range = g_slice_new0 (GstQueue2Range);
    range->offset = offset;
    range->writing_pos = offset;
    range->reading_pos = offset;
    range->max_reading_pos = offset;
    /* we want to write to the next location in the ring buffer */
    range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
    range->rb_writing_pos = range->rb_offset;

    /* insert sorted: advance to the first link whose range starts beyond
     * @offset (or to the tail of the list) */
    link = &queue->ranges;
    while (*link != NULL && (*link)->offset <= offset)
      link = &(*link)->next;

    if (*link != NULL) {
      GST_DEBUG_OBJECT (queue,
          "insert before range %p, offset %" G_GUINT64_FORMAT, *link,
          (*link)->offset);
    }
    range->next = *link;
    *link = range;
  }
  debug_ranges (queue);

  /* update the stats for this range */
  update_cur_level (queue, range);

  return range;
}


/* Drop all existing ranges and start over with a single range at offset 0,
 * which becomes the current range. */
static void
init_ranges (GstQueue2 * queue)
{
  GstQueue2Range *first;

  GST_DEBUG_OBJECT (queue, "init queue ranges");

  /* get rid of all the current ranges */
  clean_ranges (queue);

  /* make a range for offset 0 */
  first = add_range (queue, 0);
  queue->current = first;
}

626 627 628
/* calculate the diff between running time on the sink and src of the queue.
 * This is the total amount of time in the queue. */
static void
629
update_time_level (GstQueue2 * queue)
630
{
631 632 633
  if (queue->sink_tainted) {
    queue->sinktime =
        gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
Wim Taymans's avatar
Wim Taymans committed
634
        queue->sink_segment.position);
635 636
    queue->sink_tainted = FALSE;
  }
637

638 639 640
  if (queue->src_tainted) {
    queue->srctime =
        gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
Wim Taymans's avatar
Wim Taymans committed
641
        queue->src_segment.position);
642 643
    queue->src_tainted = FALSE;
  }
644 645

  GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
646
      GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
647

648 649 650
  if (queue->sinktime != GST_CLOCK_TIME_NONE
      && queue->srctime != GST_CLOCK_TIME_NONE
      && queue->sinktime >= queue->srctime)
651
    queue->cur_level.time = queue->sinktime - queue->srctime;
652 653 654 655
  else
    queue->cur_level.time = 0;
}

Wim Taymans's avatar
Wim Taymans committed
656
/* take a SEGMENT event and apply the values to segment, updating the time
657 658
 * level of queue. */
static void
659 660
apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
    gboolean is_sink)
661
{
662
  gst_event_copy_segment (event, segment);
663

Wim Taymans's avatar
Wim Taymans committed
664
  if (segment->format == GST_FORMAT_BYTES) {
665 666
    if (QUEUE_IS_USING_TEMP_FILE (queue)) {
      /* start is where we'll be getting from and as such writing next */
Wim Taymans's avatar
Wim Taymans committed
667
      queue->current = add_range (queue, segment->start);
668 669 670
      /* update the stats for this range */
      update_cur_level (queue, queue->current);
    }
671 672
  }

673 674
  /* now configure the values, we use these to track timestamps on the
   * sinkpad. */
Wim Taymans's avatar
Wim Taymans committed
675
  if (segment->format != GST_FORMAT_TIME) {
676 677
    /* non-time format, pretent the current time segment is closed with a
     * 0 start and unknown stop time. */
Wim Taymans's avatar
Wim Taymans committed
678 679 680 681
    segment->format = GST_FORMAT_TIME;
    segment->start = 0;
    segment->stop = -1;
    segment->time = 0;
682 683
  }

Wim Taymans's avatar
Wim Taymans committed
684
  GST_DEBUG_OBJECT (queue, "configured SEGMENT %" GST_SEGMENT_FORMAT, segment);
685

686 687 688 689 690
  if (is_sink)
    queue->sink_tainted = TRUE;
  else
    queue->src_tainted = TRUE;

691 692 693 694 695 696
  /* segment can update the time level of the queue */
  update_time_level (queue);
}

/* take a buffer and update segment, updating the time level of the queue. */
static void
697 698
apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
    gboolean is_sink)
699 700 701 702 703 704
{
  GstClockTime duration, timestamp;

  timestamp = GST_BUFFER_TIMESTAMP (buffer);
  duration = GST_BUFFER_DURATION (buffer);

705
  /* if no timestamp is set, assume it's continuous with the previous
706 707
   * time */
  if (timestamp == GST_CLOCK_TIME_NONE)
Wim Taymans's avatar
Wim Taymans committed
708
    timestamp = segment->position;
709 710 711 712 713

  /* add duration */
  if (duration != GST_CLOCK_TIME_NONE)
    timestamp += duration;

Wim Taymans's avatar
Wim Taymans committed
714
  GST_DEBUG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
715 716
      GST_TIME_ARGS (timestamp));

Wim Taymans's avatar
Wim Taymans committed
717
  segment->position = timestamp;
718

719 720 721 722 723
  if (is_sink)
    queue->sink_tainted = TRUE;
  else
    queue->src_tainted = TRUE;

724 725 726 727
  /* calc diff with other end */
  update_time_level (queue);
}

728 729
/* Per-buffer callback for gst_buffer_list_foreach(); @data points to a
 * GstClockTime accumulator.  A valid buffer timestamp replaces the
 * accumulated value and a valid duration is added to it.  Always returns
 * TRUE. */
static gboolean
buffer_list_apply_time (GstBuffer ** buf, guint idx, gpointer data)
{
  GstClockTime *running = data;
  GstBuffer *buffer = *buf;

  GST_TRACE ("buffer %u has ts %" GST_TIME_FORMAT
      " duration %" GST_TIME_FORMAT, idx,
      GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer)),
      GST_TIME_ARGS (GST_BUFFER_DURATION (buffer)));

  if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer))
    *running = GST_BUFFER_TIMESTAMP (buffer);

  if (GST_BUFFER_DURATION_IS_VALID (buffer))
    *running += GST_BUFFER_DURATION (buffer);

  GST_TRACE ("ts now %" GST_TIME_FORMAT, GST_TIME_ARGS (*running));

  return TRUE;
}

/* Advance @segment's position across every buffer in @buffer_list and update
 * the time level of the queue.  The walk starts from the segment's current
 * position, which is what buffers without a timestamp continue from.
 * @is_sink selects which side's cached running time is invalidated. */
static void
apply_buffer_list (GstQueue2 * queue, GstBufferList * buffer_list,
    GstSegment * segment, gboolean is_sink)
{
  /* if no timestamp is set, assume it's continuous with the previous time */
  GstClockTime position = segment->position;

  gst_buffer_list_foreach (buffer_list, buffer_list_apply_time, &position);

  GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
      GST_TIME_ARGS (position));

  segment->position = position;

  if (!is_sink)
    queue->src_tainted = TRUE;
  else
    queue->sink_tainted = TRUE;

  /* calc diff with other end */
  update_time_level (queue);
}

774
static void
775
update_buffering (GstQueue2 * queue)
776
{
777
  gint64 percent;
778 779
  gboolean post = FALSE;

780
  if (queue->high_percent <= 0)
781 782
    return;

783
#define GET_PERCENT(format,alt_max) ((queue->max_level.format) > 0 ? (queue->cur_level.format) * 100 / ((alt_max) > 0 ? MIN ((alt_max), (queue->max_level.format)) : (queue->max_level.format)) : 0)
784 785 786

  if (queue->is_eos) {
    /* on EOS we are always 100% full, we set the var here so that it we can
Thiago Santos's avatar
Thiago Santos committed
787
     * reuse the logic below to stop buffering */
788
    percent = 100;
789
    GST_LOG_OBJECT (queue, "we are EOS");
790 791
  } else {
    /* figure out the percent we are filled, we take the max of all formats. */
792 793

    if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
794
      percent = GET_PERCENT (bytes, 0);
795 796
    } else {
      guint64 rb_size = queue->ring_buffer_max_size;
797
      percent = GET_PERCENT (bytes, rb_size);
798
    }
799 800
    percent = MAX (percent, GET_PERCENT (time, 0));
    percent = MAX (percent, GET_PERCENT (buffers, 0));
801 802 803

    /* also apply the rate estimate when we need to */
    if (queue->use_rate_estimate)
804
      percent = MAX (percent, GET_PERCENT (rate_time, 0));
805 806 807 808 809 810 811 812 813 814 815 816
  }

  if (queue->is_buffering) {
    post = TRUE;
    /* if we were buffering see if we reached the high watermark */
    if (percent >= queue->high_percent)
      queue->is_buffering = FALSE;
  } else {
    /* we were not buffering, check if we need to start buffering if we drop
     * below the low threshold */
    if (percent < queue->low_percent) {
      queue->is_buffering = TRUE;
817
      queue->buffering_iteration++;
818 819 820 821
      post = TRUE;
    }
  }
  if (post) {
822
    GstMessage *message;
823
    GstBufferingMode mode;
824
    gint64 buffering_left = -1;
825

826 827 828 829 830 831
    /* scale to high percent so that it becomes the 100% mark */
    percent = percent * 100 / queue->high_percent;
    /* clip */
    if (percent > 100)
      percent = 100;

832 833 834 835 836 837 838 839 840 841 842 843
    if (percent != queue->buffering_percent) {
      queue->buffering_percent = percent;

      if (!QUEUE_IS_USING_QUEUE (queue)) {
        gint64 duration;

        if (QUEUE_IS_USING_RING_BUFFER (queue))
          mode = GST_BUFFERING_TIMESHIFT;
        else
          mode = GST_BUFFERING_DOWNLOAD;

        if (queue->byte_in_rate > 0) {
844
          if (gst_pad_peer_query_duration (queue->sinkpad, GST_FORMAT_BYTES,
845
                  &duration)) {
846 847 848
            buffering_left =
                (gdouble) ((duration -
                    queue->current->writing_pos) * 1000) / queue->byte_in_rate;
849
          }
850 851 852
        } else {
          buffering_left = G_MAXINT64;
        }
853
      } else {
854
        mode = GST_BUFFERING_STREAM;
855
      }
856

857 858 859 860 861
      GST_DEBUG_OBJECT (queue, "buffering %d percent", (gint) percent);
      message = gst_message_new_buffering (GST_OBJECT_CAST (queue),
          (gint) percent);
      gst_message_set_buffering_stats (message, mode,
          queue->byte_in_rate, queue->byte_out_rate, buffering_left);
862

863 864
      gst_element_post_message (GST_ELEMENT_CAST (queue), message);
    }
865
  } else {
Thiago Santos's avatar
Thiago Santos committed
866
    GST_DEBUG_OBJECT (queue, "filled %d percent", (gint) percent);
867 868 869 870 871 872
  }

#undef GET_PERCENT
}

static void
873
reset_rate_timer (GstQueue2 * queue)
874 875 876 877
{
  queue->bytes_in = 0;
  queue->bytes_out = 0;
  queue->byte_in_rate = 0.0;
878
  queue->byte_in_period = 0;
879
  queue->byte_out_rate = 0.0;
880 881 882 883
  queue->last_in_elapsed = 0.0;
  queue->last_out_elapsed = 0.0;
  queue->in_timer_started = FALSE;
  queue->out_timer_started = FALSE;
884 885
}

886 887
/* the interval in seconds to recalculate the rate */
#define RATE_INTERVAL    0.2
/* Tuning for rate estimation. We use a large window for the input rate because
 * it should be stable when connected to a network. The output rate is less
 * stable (the elements preroll, queues behind a demuxer fill, ...) and should
 * therefore adapt more quickly.
 * However, initial input rate may be subject to a burst, and should therefore
 * initially also adapt more quickly to changes, and only later on give higher
 * weight to previous values.
 *
 * AVG_IN is the average of @avg and @val weighted by @w1 and @w2; AVG_OUT
 * weighs the previous average 3:1 against the new value. Both expansions are
 * fully parenthesized so they can be used inside larger expressions. */
#define AVG_IN(avg,val,w1,w2)  (((avg) * (w1) + (val) * (w2)) / ((w1) + (w2)))
#define AVG_OUT(avg,val) (((avg) * 3.0 + (val)) / 4.0)

static void
899
update_in_rates (GstQueue2 * queue)
900 901 902 903
{
  gdouble elapsed, period;
  gdouble byte_in_rate;

904 905 906
  if (!queue->in_timer_started) {
    queue->in_timer_started = TRUE;
    g_timer_start (queue->in_timer);
907 908 909
    return;
  }

910
  elapsed = g_timer_elapsed (queue->in_timer, NULL);
911 912

  /* recalc after each interval. */
913 914
  if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
    period = elapsed - queue->last_in_elapsed;
915 916

    GST_DEBUG_OBJECT (queue,
917 918
        "rates: period %f, in %" G_GUINT64_FORMAT ", global period %f",
        period, queue->bytes_in, queue->byte_in_period);
919 920 921 922 923 924

    byte_in_rate = queue->bytes_in / period;

    if (queue->byte_in_rate == 0.0)
      queue->byte_in_rate = byte_in_rate;
    else
925 926 927 928 929 930
      queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate,
          (double) queue->byte_in_period, period);

    /* another data point, cap at 16 for long time running average */
    if (queue->byte_in_period < 16 * RATE_INTERVAL)
      queue->byte_in_period += period;
931 932

    /* reset the values to calculate rate over the next interval */
933
    queue->last_in_elapsed = elapsed;
934 935
    queue->bytes_in = 0;
  }
936

937 938 939 940
  if (queue->byte_in_rate > 0.0) {
    queue->cur_level.rate_time =
        queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
  }
941 942 943 944 945
  GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
      queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
}

static void
946
update_out_rates (GstQueue2 * queue)
947 948 949 950 951 952 953 954 955 956 957 958 959 960 961
{
  gdouble elapsed, period;
  gdouble byte_out_rate;

  if (!queue->out_timer_started) {
    queue->out_timer_started = TRUE;
    g_timer_start (queue->out_timer);
    return;
  }

  elapsed = g_timer_elapsed (queue->out_timer, NULL);

  /* recalc after each interval. */
  if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
    period = elapsed - queue->last_out_elapsed;
962

963 964 965 966 967 968 969 970 971 972 973 974 975 976
    GST_DEBUG_OBJECT (queue,
        "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);

    byte_out_rate = queue->bytes_out / period;

    if (queue->byte_out_rate == 0.0)
      queue->byte_out_rate = byte_out_rate;
    else
      queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);

    /* reset the values to calculate rate over the next interval */
    queue->last_out_elapsed = elapsed;
    queue->bytes_out = 0;
  }
977 978 979 980
  if (queue->byte_in_rate > 0.0) {
    queue->cur_level.rate_time =
        queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
  }
981 982
  GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
      queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
983 984
}

985 986 987 988 989 990 991 992 993 994
/* Raise the max_reading_pos of @range to @pos when @pos lies beyond it
 * (it never moves backwards) and recompute the current level of @queue
 * for that range. */
static void
update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
{
  guint64 previous_max = range->max_reading_pos;
  guint64 new_max = MAX (previous_max, pos);

  GST_DEBUG_OBJECT (queue,
      "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
      G_GUINT64_FORMAT, previous_max, new_max);

  range->max_reading_pos = new_max;

  update_cur_level (queue, range);
}

1003 1004 1005 1006 1007 1008
static gboolean
perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
{
  GstEvent *event;
  gboolean res;

1009 1010
  GST_QUEUE2_MUTEX_UNLOCK (queue);

1011 1012 1013 1014 1015 1016 1017 1018 1019 1020
  GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);

  event =
      gst_event_new_seek (1.0, GST_FORMAT_BYTES,
      GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET, offset,
      GST_SEEK_TYPE_NONE, -1);

  res = gst_pad_push_event (queue->sinkpad, event);
  GST_QUEUE2_MUTEX_LOCK (queue);

Wim Taymans's avatar
Wim Taymans committed
1021 1022 1023
  if (res)
    queue->current = add_range (queue, offset);