gstqueue2.c 95.3 KB
Newer Older
1
2
3
/* GStreamer
 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
 *                    2003 Colin Walters <cwalters@gnome.org>
Wim Taymans's avatar
Wim Taymans committed
4
 *                    2000,2005,2007 Wim Taymans <wim.taymans@gmail.com>
5
 *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
6
 *                 SA 2010 ST-Ericsson <benjamin.gaignard@stericsson.com>
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
 *
 * gstqueue2.c:
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

/**
 * SECTION:element-queue2
 *
 * Data is queued until one of the limits specified by the
30
31
 * #GstQueue2:max-size-buffers, #GstQueue2:max-size-bytes and/or
 * #GstQueue2:max-size-time properties has been reached. Any attempt to push
32
33
34
35
36
37
38
 * more buffers into the queue will block the pushing thread until more space
 * becomes available.
 *
 * The queue will create a new thread on the source pad to decouple the
 * processing on sink and source pad.
 *
 * You can query how many buffers are queued by reading the
39
 * #GstQueue2:current-level-buffers property.
40
41
42
 *
 * The default queue size limits are 100 buffers, 2MB of data, or
 * two seconds worth of data, whichever is reached first.
43
 *
44
45
46
47
 * If you set temp-template to a value such as /tmp/gstreamer-XXXXXX, the
 * element will allocate a random free filename and buffer data in the file.
 * In this mode the entire stream is buffered in the file independently of
 * the queue size limits; the limits are then only used for buffering statistics.
48
 *
49
50
51
52
53
54
 * Since 0.10.24, setting the temp-location property with a filename is deprecated
 * because it's impossible to securely open a temporary file in this way. The
 * property will still be used to notify the application of the allocated
 * filename, though.
 *
 * Last reviewed on 2009-07-10 (0.10.24)
55
 */
56

57
58
59
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
60

61
62
#include "gstqueue2.h"

63
64
#include <glib/gstdio.h>

65
#include "gst/gst-i18n-lib.h"
66

67
68
#include <string.h>

LRN's avatar
LRN committed
69
70
71
72
73
74
#ifdef G_OS_WIN32
#include <io.h>                 /* lseek, open, close, read */
#undef lseek
#define lseek _lseeki64
#undef off_t
#define off_t guint64
75
76
#else
#include <unistd.h>
LRN's avatar
LRN committed
77
78
#endif

79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/* Static template for the always-present "sink" pad; it accepts any caps. */
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

/* Static template for the always-present "src" pad; it accepts any caps. */
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

/* queue_debug is the default debug category for this file (see
 * GST_CAT_DEFAULT below); queue_dataflow is used by the STATUS() macro. */
GST_DEBUG_CATEGORY_STATIC (queue_debug);
#define GST_CAT_DEFAULT (queue_debug)
GST_DEBUG_CATEGORY_STATIC (queue_dataflow);

/* Signal ids. No signals are listed; LAST_SIGNAL is the only enumerator. */
enum
{
  LAST_SIGNAL
};

98
99
100
/* other defines */
#define DEFAULT_BUFFER_SIZE 4096

/* Storage-mode predicates. Each one takes a GstQueue2 pointer.
 *  - temp file:   a temp location was set or a temp template is configured
 *  - ring buffer: ring_buffer_max_size is non-zero
 *  - queue:       neither of the above */
#define QUEUE_IS_USING_TEMP_FILE(queue) \
    ((queue)->temp_location_set || (queue)->temp_template != NULL)
#define QUEUE_IS_USING_RING_BUFFER(queue) \
    ((queue)->ring_buffer_max_size != 0)
#define QUEUE_IS_USING_QUEUE(queue) \
    (!QUEUE_IS_USING_TEMP_FILE (queue) && !QUEUE_IS_USING_RING_BUFFER (queue))

/* The smaller of the configured byte limit and the ring buffer size. */
#define QUEUE_MAX_BYTES(queue) \
    MIN ((queue)->max_level.bytes, (queue)->ring_buffer_max_size)

106
107
108
109
110
111
/* default property values */
#define DEFAULT_MAX_SIZE_BUFFERS   100  /* 100 buffers */
#define DEFAULT_MAX_SIZE_BYTES     (2 * 1024 * 1024)    /* 2 MB */
/* Parenthesized so the product stays one operand wherever the macro is
 * used (e.g. next to %, / or a cast). */
#define DEFAULT_MAX_SIZE_TIME      (2 * GST_SECOND)     /* 2 seconds */
#define DEFAULT_USE_BUFFERING      FALSE
#define DEFAULT_USE_RATE_ESTIMATE  TRUE
#define DEFAULT_LOW_PERCENT        10
#define DEFAULT_HIGH_PERCENT       99
#define DEFAULT_TEMP_REMOVE        TRUE
#define DEFAULT_RING_BUFFER_MAX_SIZE 0
116

117
118
119
120
121
122
123
124
125
126
127
128
129
/* GObject property ids, in the order used by class_init / set_property /
 * get_property. PROP_0 is the reserved zero id; PROP_LAST terminates the
 * list. */
enum
{
  PROP_0,
  /* current fill levels */
  PROP_CUR_LEVEL_BUFFERS,
  PROP_CUR_LEVEL_BYTES,
  PROP_CUR_LEVEL_TIME,
  /* configured limits */
  PROP_MAX_SIZE_BUFFERS,
  PROP_MAX_SIZE_BYTES,
  PROP_MAX_SIZE_TIME,
  /* buffering behaviour */
  PROP_USE_BUFFERING,
  PROP_USE_RATE_ESTIMATE,
  PROP_LOW_PERCENT,
  PROP_HIGH_PERCENT,
  /* temp file and ring buffer storage */
  PROP_TEMP_TEMPLATE,
  PROP_TEMP_LOCATION,
  PROP_TEMP_REMOVE,
  PROP_RING_BUFFER_MAX_SIZE,
  PROP_LAST
};

137
/* Reset every counter of the level structure @l (an lvalue, not a pointer)
 * to zero. The argument is parenthesized so expressions such as `*ptr` can
 * be passed. */
#define GST_QUEUE2_CLEAR_LEVEL(l) G_STMT_START {         \
  (l).buffers = 0;                                      \
  (l).bytes = 0;                                        \
  (l).time = 0;                                         \
  (l).rate_time = 0;                                    \
} G_STMT_END

/* Log the current and maximum fill levels of queue @q on the dataflow
 * category, prefixed with the name of @pad and the literal string @msg.
 * The trailing "items" figure is the GQueue length in plain queue mode, or
 * writing_pos - max_reading_pos of the current range otherwise.
 *
 * The first parameter is deliberately not called `queue`: the body reads the
 * struct member of that name, and a parameter with the same spelling would
 * be substituted into the member access as well. */
#define STATUS(q, pad, msg) \
  GST_CAT_LOG_OBJECT (queue_dataflow, (q), \
                      "(%s:%s) " msg ": %u of %u buffers, %u of %u " \
                      "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
                      " ns, %" G_GUINT64_FORMAT " items", \
                      GST_DEBUG_PAD_NAME (pad), \
                      (q)->cur_level.buffers, \
                      (q)->max_level.buffers, \
                      (q)->cur_level.bytes, \
                      (q)->max_level.bytes, \
                      (q)->cur_level.time, \
                      (q)->max_level.time, \
                      (guint64) (!QUEUE_IS_USING_QUEUE (q) ? \
                        (q)->current->writing_pos - (q)->current->max_reading_pos : \
                        (q)->queue->length))
159

160
/* Take the queue lock of @q. */
#define GST_QUEUE2_MUTEX_LOCK(q) G_STMT_START {                          \
  g_mutex_lock ((q)->qlock);                                            \
} G_STMT_END

/* Take the queue lock of @q, then jump to @label (with the lock still held)
 * when the flow return expression @res is not GST_FLOW_OK. @res is evaluated
 * after the lock has been acquired. */
#define GST_QUEUE2_MUTEX_LOCK_CHECK(q,res,label) G_STMT_START {         \
  GST_QUEUE2_MUTEX_LOCK (q);                                            \
  if ((res) != GST_FLOW_OK)                                             \
    goto label;                                                         \
} G_STMT_END

/* Release the queue lock of @q. */
#define GST_QUEUE2_MUTEX_UNLOCK(q) G_STMT_START {                        \
  g_mutex_unlock ((q)->qlock);                                          \
} G_STMT_END

174
/* Wait on the item_del condition of @q using its queue lock, flagging
 * waiting_del for the duration of the wait so GST_QUEUE2_SIGNAL_DEL knows a
 * waiter exists. After waking up, jump to @label when the flow return
 * expression @res is not GST_FLOW_OK.
 *
 * Only the macro parameter @q is referenced, so the macro no longer depends
 * on the caller having a variable named `queue` in scope. */
#define GST_QUEUE2_WAIT_DEL_CHECK(q, res, label) G_STMT_START {         \
  STATUS (q, (q)->sinkpad, "wait for DEL");                             \
  (q)->waiting_del = TRUE;                                              \
  g_cond_wait ((q)->item_del, (q)->qlock);                              \
  (q)->waiting_del = FALSE;                                             \
  if ((res) != GST_FLOW_OK) {                                           \
    STATUS (q, (q)->srcpad, "received DEL wakeup");                     \
    goto label;                                                         \
  }                                                                     \
  STATUS (q, (q)->sinkpad, "received DEL");                             \
} G_STMT_END

186
/* Wait on the item_add condition of @q using its queue lock, flagging
 * waiting_add for the duration of the wait so GST_QUEUE2_SIGNAL_ADD knows a
 * waiter exists. After waking up, jump to @label when the flow return
 * expression @res is not GST_FLOW_OK.
 *
 * Only the macro parameter @q is referenced, so the macro no longer depends
 * on the caller having a variable named `queue` in scope. */
#define GST_QUEUE2_WAIT_ADD_CHECK(q, res, label) G_STMT_START {         \
  STATUS (q, (q)->srcpad, "wait for ADD");                              \
  (q)->waiting_add = TRUE;                                              \
  g_cond_wait ((q)->item_add, (q)->qlock);                              \
  (q)->waiting_add = FALSE;                                             \
  if ((res) != GST_FLOW_OK) {                                           \
    STATUS (q, (q)->srcpad, "received ADD wakeup");                     \
    goto label;                                                         \
  }                                                                     \
  STATUS (q, (q)->srcpad, "received ADD");                              \
} G_STMT_END

198
/* Signal the item_del condition of @q, but only when waiting_del says
 * somebody is waiting on it. */
#define GST_QUEUE2_SIGNAL_DEL(q) G_STMT_START {                          \
  if ((q)->waiting_del) {                                               \
    STATUS (q, (q)->srcpad, "signal DEL");                              \
    g_cond_signal ((q)->item_del);                                      \
  }                                                                     \
} G_STMT_END

/* Signal the item_add condition of @q, but only when waiting_add says
 * somebody is waiting on it. */
#define GST_QUEUE2_SIGNAL_ADD(q) G_STMT_START {                          \
  if ((q)->waiting_add) {                                               \
    STATUS (q, (q)->sinkpad, "signal ADD");                             \
    g_cond_signal ((q)->item_add);                                      \
  }                                                                     \
} G_STMT_END

/* Type-registration hook handed to GST_BOILERPLATE_FULL below: sets up the
 * two debug categories used by this file. The macro argument is unused. */
#define _do_init(bla)                                                     \
    GST_DEBUG_CATEGORY_INIT (queue_debug, "queue2", 0, "queue element"); \
    GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue2_dataflow", 0,       \
        "dataflow inside the queue element");

/* Declares the GstQueue2 type with GstElement as its parent type. */
GST_BOILERPLATE_FULL (GstQueue2, gst_queue2, GstElement, GST_TYPE_ELEMENT,
    _do_init);
219

220
static void gst_queue2_finalize (GObject * object);
221

222
static void gst_queue2_set_property (GObject * object,
223
    guint prop_id, const GValue * value, GParamSpec * pspec);
224
static void gst_queue2_get_property (GObject * object,
225
226
    guint prop_id, GValue * value, GParamSpec * pspec);

227
228
static GstFlowReturn gst_queue2_chain (GstPad * pad, GstBuffer * buffer);
static GstFlowReturn gst_queue2_bufferalloc (GstPad * pad, guint64 offset,
229
    guint size, GstCaps * caps, GstBuffer ** buf);
230
231
static GstFlowReturn gst_queue2_push_one (GstQueue2 * queue);
static void gst_queue2_loop (GstPad * pad);
232

233
static gboolean gst_queue2_handle_sink_event (GstPad * pad, GstEvent * event);
234

235
236
static gboolean gst_queue2_handle_src_event (GstPad * pad, GstEvent * event);
static gboolean gst_queue2_handle_src_query (GstPad * pad, GstQuery * query);
237
238
static gboolean gst_queue2_handle_query (GstElement * element,
    GstQuery * query);
239

240
241
static GstCaps *gst_queue2_getcaps (GstPad * pad);
static gboolean gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps);
242

243
static GstFlowReturn gst_queue2_get_range (GstPad * pad, guint64 offset,
244
    guint length, GstBuffer ** buffer);
245
static gboolean gst_queue2_src_checkgetrange_function (GstPad * pad);
246

247
248
249
250
static gboolean gst_queue2_src_activate_pull (GstPad * pad, gboolean active);
static gboolean gst_queue2_src_activate_push (GstPad * pad, gboolean active);
static gboolean gst_queue2_sink_activate_push (GstPad * pad, gboolean active);
static GstStateChangeReturn gst_queue2_change_state (GstElement * element,
251
252
    GstStateChange transition);

253
254
static gboolean gst_queue2_is_empty (GstQueue2 * queue);
static gboolean gst_queue2_is_filled (GstQueue2 * queue);
255

256
257
258
static void update_cur_level (GstQueue2 * queue, GstQueue2Range * range);


259
/* static guint gst_queue2_signals[LAST_SIGNAL] = { 0 }; */
260

261
262
263
264
265
266
267
268
269
270
/* base_init: adds the src and sink pad templates (in that order) to the
 * element class and sets the element details. */
static void
gst_queue2_base_init (gpointer g_class)
{
  GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
  GstPadTemplate *tmpl;

  tmpl = gst_static_pad_template_get (&srctemplate);
  gst_element_class_add_pad_template (element_class, tmpl);

  tmpl = gst_static_pad_template_get (&sinktemplate);
  gst_element_class_add_pad_template (element_class, tmpl);

  gst_element_class_set_details_simple (element_class,
      "Queue 2",
      "Generic",
      "Simple data queue",
      "Erik Walthinsen <omega@cse.ogi.edu>, "
      "Wim Taymans <wim.taymans@gmail.com>");
}

278
static void
279
gst_queue2_class_init (GstQueue2Class * klass)
280
281
282
283
{
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);

284
285
  gobject_class->set_property = gst_queue2_set_property;
  gobject_class->get_property = gst_queue2_get_property;
286
287
288
289
290

  /* properties */
  g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
      g_param_spec_uint ("current-level-bytes", "Current level (kB)",
          "Current amount of data in the queue (bytes)",
291
          0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
292
293
294
  g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
      g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
          "Current number of buffers in the queue",
295
          0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
296
297
298
  g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
      g_param_spec_uint64 ("current-level-time", "Current level (ns)",
          "Current amount of data in the queue (in ns)",
299
          0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
300
301
302
303

  g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
      g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
          "Max. amount of data in the queue (bytes, 0=disable)",
304
305
          0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
306
307
  g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
      g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
308
309
310
          "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
          DEFAULT_MAX_SIZE_BUFFERS,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
311
312
  g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
      g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
313
314
          "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
          DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
315
316
317
318

  g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
      g_param_spec_boolean ("use-buffering", "Use buffering",
          "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
319
          DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
320
321
322
  g_object_class_install_property (gobject_class, PROP_USE_RATE_ESTIMATE,
      g_param_spec_boolean ("use-rate-estimate", "Use Rate Estimate",
          "Estimate the bitrate of the stream to calculate time level",
323
324
          DEFAULT_USE_RATE_ESTIMATE,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
325
326
  g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
      g_param_spec_int ("low-percent", "Low percent",
327
328
          "Low threshold for buffering to start. Only used if use-buffering is True",
          0, 100, DEFAULT_LOW_PERCENT,
329
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
330
331
  g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
      g_param_spec_int ("high-percent", "High percent",
332
333
334
          "High threshold for buffering to finish. Only used if use-buffering is True",
          0, 100, DEFAULT_HIGH_PERCENT,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
335

336
337
338
339
340
341
  g_object_class_install_property (gobject_class, PROP_TEMP_TEMPLATE,
      g_param_spec_string ("temp-template", "Temporary File Template",
          "File template to store temporary files in, should contain directory "
          "and XXXXXX. (NULL == disabled)",
          NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

342
343
  g_object_class_install_property (gobject_class, PROP_TEMP_LOCATION,
      g_param_spec_string ("temp-location", "Temporary File Location",
344
          "Location to store temporary files in (Deprecated: Only read this "
345
          "property, use temp-template to configure the name template)",
346
          NULL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
347

348
349
350
351
352
353
354
  /**
   * GstQueue2:temp-remove
   *
   * When temp-template is set, remove the temporary file when going to READY.
   *
   * Since: 0.10.26
   */
355
356
357
358
359
  g_object_class_install_property (gobject_class, PROP_TEMP_REMOVE,
      g_param_spec_boolean ("temp-remove", "Remove the Temporary File",
          "Remove the temp-location after use",
          DEFAULT_TEMP_REMOVE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

360
361
362
  /**
   * GstQueue2:ring-buffer-max-size
   *
363
364
   * The maximum size of the ring buffer in bytes. If set to 0, the ring
   * buffer is disabled. Default 0.
365
   *
366
   * Since: 0.10.31
367
368
   */
  g_object_class_install_property (gobject_class, PROP_RING_BUFFER_MAX_SIZE,
369
      g_param_spec_uint64 ("ring-buffer-max-size",
370
          "Max. ring buffer size (bytes)",
371
          "Max. amount of data in the ring buffer (bytes, 0 = disabled",
372
          0, G_MAXUINT64, DEFAULT_RING_BUFFER_MAX_SIZE,
373
374
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

375
  /* set several parent class virtual functions */
376
  gobject_class->finalize = gst_queue2_finalize;
377

378
  gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_queue2_change_state);
379
  gstelement_class->query = GST_DEBUG_FUNCPTR (gst_queue2_handle_query);
380
381
382
}

static void
383
gst_queue2_init (GstQueue2 * queue, GstQueue2Class * g_class)
384
385
386
387
{
  queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");

  gst_pad_set_chain_function (queue->sinkpad,
388
      GST_DEBUG_FUNCPTR (gst_queue2_chain));
389
  gst_pad_set_activatepush_function (queue->sinkpad,
390
      GST_DEBUG_FUNCPTR (gst_queue2_sink_activate_push));
391
  gst_pad_set_event_function (queue->sinkpad,
392
      GST_DEBUG_FUNCPTR (gst_queue2_handle_sink_event));
393
  gst_pad_set_getcaps_function (queue->sinkpad,
394
      GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
395
  gst_pad_set_acceptcaps_function (queue->sinkpad,
396
      GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
397
  gst_pad_set_bufferalloc_function (queue->sinkpad,
398
      GST_DEBUG_FUNCPTR (gst_queue2_bufferalloc));
399
400
401
402
  gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);

  queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");

403
  gst_pad_set_activatepull_function (queue->srcpad,
404
      GST_DEBUG_FUNCPTR (gst_queue2_src_activate_pull));
405
  gst_pad_set_activatepush_function (queue->srcpad,
406
      GST_DEBUG_FUNCPTR (gst_queue2_src_activate_push));
407
  gst_pad_set_getrange_function (queue->srcpad,
408
      GST_DEBUG_FUNCPTR (gst_queue2_get_range));
409
  gst_pad_set_checkgetrange_function (queue->srcpad,
410
      GST_DEBUG_FUNCPTR (gst_queue2_src_checkgetrange_function));
411
  gst_pad_set_getcaps_function (queue->srcpad,
412
      GST_DEBUG_FUNCPTR (gst_queue2_getcaps));
413
  gst_pad_set_acceptcaps_function (queue->srcpad,
414
      GST_DEBUG_FUNCPTR (gst_queue2_acceptcaps));
415
  gst_pad_set_event_function (queue->srcpad,
416
      GST_DEBUG_FUNCPTR (gst_queue2_handle_src_event));
417
  gst_pad_set_query_function (queue->srcpad,
418
      GST_DEBUG_FUNCPTR (gst_queue2_handle_src_query));
419
420
421
  gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);

  /* levels */
422
  GST_QUEUE2_CLEAR_LEVEL (queue->cur_level);
423
424
425
426
427
428
429
430
431
432
433
434
  queue->max_level.buffers = DEFAULT_MAX_SIZE_BUFFERS;
  queue->max_level.bytes = DEFAULT_MAX_SIZE_BYTES;
  queue->max_level.time = DEFAULT_MAX_SIZE_TIME;
  queue->max_level.rate_time = DEFAULT_MAX_SIZE_TIME;
  queue->use_buffering = DEFAULT_USE_BUFFERING;
  queue->use_rate_estimate = DEFAULT_USE_RATE_ESTIMATE;
  queue->low_percent = DEFAULT_LOW_PERCENT;
  queue->high_percent = DEFAULT_HIGH_PERCENT;

  gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
  gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);

435
436
437
438
439
  queue->sinktime = GST_CLOCK_TIME_NONE;
  queue->srctime = GST_CLOCK_TIME_NONE;
  queue->sink_tainted = TRUE;
  queue->src_tainted = TRUE;

440
  queue->srcresult = GST_FLOW_WRONG_STATE;
441
  queue->sinkresult = GST_FLOW_WRONG_STATE;
442
  queue->is_eos = FALSE;
443
444
  queue->in_timer = g_timer_new ();
  queue->out_timer = g_timer_new ();
445
446
447
448
449
450
451
452

  queue->qlock = g_mutex_new ();
  queue->waiting_add = FALSE;
  queue->item_add = g_cond_new ();
  queue->waiting_del = FALSE;
  queue->item_del = g_cond_new ();
  queue->queue = g_queue_new ();

453
454
  queue->buffering_percent = 100;

455
  /* tempfile related */
456
  queue->temp_template = NULL;
457
  queue->temp_location = NULL;
458
  queue->temp_location_set = FALSE;
459
  queue->temp_remove = DEFAULT_TEMP_REMOVE;
460

461
  queue->ring_buffer = NULL;
462
  queue->ring_buffer_max_size = DEFAULT_RING_BUFFER_MAX_SIZE;
463

464
465
466
467
468
469
  GST_DEBUG_OBJECT (queue,
      "initialized queue's not_empty & not_full conditions");
}

/* called only once, as opposed to dispose */
static void
470
gst_queue2_finalize (GObject * object)
471
{
472
  GstQueue2 *queue = GST_QUEUE2 (object);
473
474
475
476
477
478
479
480

  GST_DEBUG_OBJECT (queue, "finalizing queue");

  while (!g_queue_is_empty (queue->queue)) {
    GstMiniObject *data = g_queue_pop_head (queue->queue);

    gst_mini_object_unref (data);
  }
481

482
483
484
485
  g_queue_free (queue->queue);
  g_mutex_free (queue->qlock);
  g_cond_free (queue->item_add);
  g_cond_free (queue->item_del);
486
487
  g_timer_destroy (queue->in_timer);
  g_timer_destroy (queue->out_timer);
488

489
  /* temp_file path cleanup  */
490
491
  g_free (queue->temp_template);
  g_free (queue->temp_location);
492

493
494
495
  G_OBJECT_CLASS (parent_class)->finalize (object);
}

496
497
498
499
500
501
/* Log one debug line per entry of queue->ranges: stream offsets, ring buffer
 * offsets, reading position and whether the entry is the current range. */
static void
debug_ranges (GstQueue2 * queue)
{
  GstQueue2Range *r = queue->ranges;

  while (r != NULL) {
    const gchar *is_current = (r == queue->current) ? "**y**" : "  n  ";

    GST_DEBUG_OBJECT (queue,
        "range [%" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "] (rb [%"
        G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "]), reading %" G_GUINT64_FORMAT
        " current range? %s", r->offset, r->writing_pos, r->rb_offset,
        r->rb_writing_pos, r->reading_pos, is_current);

    r = r->next;
  }
}

/* Free the whole list of downloaded ranges and forget the current range. */
static void
clean_ranges (GstQueue2 * queue)
{
  GST_DEBUG_OBJECT (queue, "clean queue ranges");

  /* the list is chained through the `next` member of each range */
  g_slice_free_chain (GstQueue2Range, queue->ranges, next);

  queue->current = NULL;
  queue->ranges = NULL;
}

/* find a range that contains @offset or NULL when nothing does */
static GstQueue2Range *
524
find_range (GstQueue2 * queue, guint64 offset)
525
{
526
527
  GstQueue2Range *range = NULL;
  GstQueue2Range *walk;
528
529
530
531
532
533
534
535
536

  /* first do a quick check for the current range */
  for (walk = queue->ranges; walk; walk = walk->next) {
    if (offset >= walk->offset && offset <= walk->writing_pos) {
      /* we can reuse an existing range */
      range = walk;
      break;
    }
  }
Robert Swain's avatar
Robert Swain committed
537
538
539
540
541
542
543
  if (range) {
    GST_DEBUG_OBJECT (queue,
        "found range for %" G_GUINT64_FORMAT ": [%" G_GUINT64_FORMAT "-%"
        G_GUINT64_FORMAT "]", offset, range->offset, range->writing_pos);
  } else {
    GST_DEBUG_OBJECT (queue, "no range for %" G_GUINT64_FORMAT, offset);
  }
544
545
546
  return range;
}

Wim Taymans's avatar
Wim Taymans committed
547
548
549
550
551
552
553
554
555
556
557
558
559
560
/* Recompute queue->cur_level.bytes from @range: the distance between its
 * writing position and its max reading position, or 0 when the writing
 * position is not ahead of the max reading position. */
static void
update_cur_level (GstQueue2 * queue, GstQueue2Range * range)
{
  guint64 written = range->writing_pos;
  guint64 consumed = range->max_reading_pos;

  queue->cur_level.bytes = (written > consumed) ? written - consumed : 0;
}

561
562
563
564
565
566
567
568
/* make a new range for @offset or reuse an existing range.
 *
 * When an existing range covers @offset, its writing position is moved to
 * @offset. Otherwise a fresh zero-length range is allocated and linked into
 * queue->ranges, which is kept sorted by ascending offset. In both cases the
 * byte level of the queue is updated from the returned range. */
static GstQueue2Range *
add_range (GstQueue2 * queue, guint64 offset)
{
  GstQueue2Range *range;

  GST_DEBUG_OBJECT (queue, "find range for %" G_GUINT64_FORMAT, offset);

  range = find_range (queue, offset);
  if (range != NULL) {
    GST_DEBUG_OBJECT (queue,
        "reusing range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, range->offset,
        range->writing_pos);
    range->writing_pos = offset;
  } else {
    GstQueue2Range **link;

    GST_DEBUG_OBJECT (queue,
        "new range %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT, offset, offset);

    range = g_slice_new0 (GstQueue2Range);
    range->offset = offset;
    /* we want to write to the next location in the ring buffer */
    range->rb_offset = queue->current ? queue->current->rb_writing_pos : 0;
    range->writing_pos = offset;
    range->rb_writing_pos = range->rb_offset;
    range->reading_pos = offset;
    range->max_reading_pos = offset;

    /* insert sorted: advance to the first range starting beyond offset */
    link = &queue->ranges;
    while (*link != NULL && (*link)->offset <= offset)
      link = &(*link)->next;

    if (*link != NULL) {
      GST_DEBUG_OBJECT (queue,
          "insert before range %p, offset %" G_GUINT64_FORMAT, *link,
          (*link)->offset);
    }

    range->next = *link;
    *link = range;
  }
  debug_ranges (queue);

  /* update the stats for this range */
  update_cur_level (queue, range);

  return range;
}


/* Drop every known range and start over with a single range at offset 0,
 * which becomes the current range. */
static void
init_ranges (GstQueue2 * queue)
{
  GstQueue2Range *first;

  GST_DEBUG_OBJECT (queue, "init queue ranges");

  /* forget everything downloaded so far */
  clean_ranges (queue);

  /* the fresh list holds just the range for offset 0 */
  first = add_range (queue, 0);
  queue->current = first;
}

629
static gboolean
630
gst_queue2_acceptcaps (GstPad * pad, GstCaps * caps)
631
{
632
  GstQueue2 *queue;
633
634
635
  GstPad *otherpad;
  gboolean result;

636
  queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
637
638
639
640
641
642
643

  otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
  result = gst_pad_peer_accept_caps (otherpad, caps);

  return result;
}

644
static GstCaps *
645
gst_queue2_getcaps (GstPad * pad)
646
{
647
  GstQueue2 *queue;
648
649
650
  GstPad *otherpad;
  GstCaps *result;

651
652
653
  queue = GST_QUEUE2 (gst_pad_get_parent (pad));
  if (G_UNLIKELY (queue == NULL))
    return gst_caps_new_any ();
654
655
656
657
658
659

  otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
  result = gst_pad_peer_get_caps (otherpad);
  if (result == NULL)
    result = gst_caps_new_any ();

660
661
  gst_object_unref (queue);

662
663
664
665
  return result;
}

static GstFlowReturn
666
667
gst_queue2_bufferalloc (GstPad * pad, guint64 offset, guint size,
    GstCaps * caps, GstBuffer ** buf)
668
{
669
  GstQueue2 *queue;
670
671
  GstFlowReturn result;

672
  queue = GST_QUEUE2 (GST_PAD_PARENT (pad));
673
674
675
676
677
678
679
680
681
682

  /* Forward to src pad, without setting caps on the src pad */
  result = gst_pad_alloc_buffer (queue->srcpad, offset, size, caps, buf);

  return result;
}

/* calculate the diff between running time on the sink and src of the queue.
 * This is the total amount of time in the queue. */
static void
683
update_time_level (GstQueue2 * queue)
684
{
685
686
687
688
689
690
  if (queue->sink_tainted) {
    queue->sinktime =
        gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
        queue->sink_segment.last_stop);
    queue->sink_tainted = FALSE;
  }
691

692
693
694
695
696
697
  if (queue->src_tainted) {
    queue->srctime =
        gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
        queue->src_segment.last_stop);
    queue->src_tainted = FALSE;
  }
698
699

  GST_DEBUG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
700
      GST_TIME_ARGS (queue->sinktime), GST_TIME_ARGS (queue->srctime));
701

702
703
704
  if (queue->sinktime != GST_CLOCK_TIME_NONE
      && queue->srctime != GST_CLOCK_TIME_NONE
      && queue->sinktime >= queue->srctime)
705
    queue->cur_level.time = queue->sinktime - queue->srctime;
706
707
708
709
710
711
712
  else
    queue->cur_level.time = 0;
}

/* take a NEWSEGMENT event and apply the values to segment, updating the time
 * level of queue. */
static void
713
714
apply_segment (GstQueue2 * queue, GstEvent * event, GstSegment * segment,
    gboolean is_sink)
715
716
717
718
719
720
721
722
723
{
  gboolean update;
  GstFormat format;
  gdouble rate, arate;
  gint64 start, stop, time;

  gst_event_parse_new_segment_full (event, &update, &rate, &arate,
      &format, &start, &stop, &time);

724
725
726
727
  GST_DEBUG_OBJECT (queue,
      "received NEWSEGMENT update %d, rate %lf, applied rate %lf, "
      "format %d, "
      "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
728
      G_GINT64_FORMAT, update, rate, arate, format, start, stop, time);
729
730

  if (format == GST_FORMAT_BYTES) {
731
732
733
734
735
736
    if (QUEUE_IS_USING_TEMP_FILE (queue)) {
      /* start is where we'll be getting from and as such writing next */
      queue->current = add_range (queue, start);
      /* update the stats for this range */
      update_cur_level (queue, queue->current);
    }
737
738
  }

739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
  /* now configure the values, we use these to track timestamps on the
   * sinkpad. */
  if (format != GST_FORMAT_TIME) {
    /* non-time format, pretent the current time segment is closed with a
     * 0 start and unknown stop time. */
    update = FALSE;
    format = GST_FORMAT_TIME;
    start = 0;
    stop = -1;
    time = 0;
  }
  gst_segment_set_newsegment_full (segment, update,
      rate, arate, format, start, stop, time);

  GST_DEBUG_OBJECT (queue,
      "configured NEWSEGMENT %" GST_SEGMENT_FORMAT, segment);

756
757
758
759
760
  if (is_sink)
    queue->sink_tainted = TRUE;
  else
    queue->src_tainted = TRUE;

761
762
763
764
765
766
  /* segment can update the time level of the queue */
  update_time_level (queue);
}

/* take a buffer and update segment, updating the time level of the queue. */
static void
767
768
apply_buffer (GstQueue2 * queue, GstBuffer * buffer, GstSegment * segment,
    gboolean is_sink)
769
770
771
772
773
774
{
  GstClockTime duration, timestamp;

  timestamp = GST_BUFFER_TIMESTAMP (buffer);
  duration = GST_BUFFER_DURATION (buffer);

775
  /* if no timestamp is set, assume it's continuous with the previous
776
777
778
779
780
781
782
783
784
785
786
787
788
   * time */
  if (timestamp == GST_CLOCK_TIME_NONE)
    timestamp = segment->last_stop;

  /* add duration */
  if (duration != GST_CLOCK_TIME_NONE)
    timestamp += duration;

  GST_DEBUG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
      GST_TIME_ARGS (timestamp));

  gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);

789
790
791
792
793
  if (is_sink)
    queue->sink_tainted = TRUE;
  else
    queue->src_tainted = TRUE;

794
795
796
797
798
  /* calc diff with other end */
  update_time_level (queue);
}

static void
799
update_buffering (GstQueue2 * queue)
800
{
801
  gint64 percent;
802
803
  gboolean post = FALSE;

804
  if (queue->high_percent <= 0)
805
806
    return;

807
#define GET_PERCENT(format,alt_max) ((queue->max_level.format) > 0 ? (queue->cur_level.format) * 100 / ((alt_max) > 0 ? MIN ((alt_max), (queue->max_level.format)) : (queue->max_level.format)) : 0)
808
809
810

  if (queue->is_eos) {
    /* on EOS we are always 100% full, we set the var here so that it we can
Thiago Santos's avatar
Thiago Santos committed
811
     * reuse the logic below to stop buffering */
812
    percent = 100;
813
    GST_LOG_OBJECT (queue, "we are EOS");
814
815
  } else {
    /* figure out the percent we are filled, we take the max of all formats. */
816
817

    if (!QUEUE_IS_USING_RING_BUFFER (queue)) {
818
      percent = GET_PERCENT (bytes, 0);
819
820
    } else {
      guint64 rb_size = queue->ring_buffer_max_size;
821
      percent = GET_PERCENT (bytes, rb_size);
822
    }
823
824
    percent = MAX (percent, GET_PERCENT (time, 0));
    percent = MAX (percent, GET_PERCENT (buffers, 0));
825
826
827

    /* also apply the rate estimate when we need to */
    if (queue->use_rate_estimate)
828
      percent = MAX (percent, GET_PERCENT (rate_time, 0));
829
830
831
832
833
834
835
836
837
838
839
840
  }

  if (queue->is_buffering) {
    post = TRUE;
    /* if we were buffering see if we reached the high watermark */
    if (percent >= queue->high_percent)
      queue->is_buffering = FALSE;
  } else {
    /* we were not buffering, check if we need to start buffering if we drop
     * below the low threshold */
    if (percent < queue->low_percent) {
      queue->is_buffering = TRUE;
841
      queue->buffering_iteration++;
842
843
844
845
      post = TRUE;
    }
  }
  if (post) {
846
    GstMessage *message;
847
    GstBufferingMode mode;
848
    gint64 buffering_left = -1;
849

850
851
852
853
854
855
    /* scale to high percent so that it becomes the 100% mark */
    percent = percent * 100 / queue->high_percent;
    /* clip */
    if (percent > 100)
      percent = 100;

856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
    if (percent != queue->buffering_percent) {
      queue->buffering_percent = percent;

      if (!QUEUE_IS_USING_QUEUE (queue)) {
        GstFormat fmt = GST_FORMAT_BYTES;
        gint64 duration;

        if (QUEUE_IS_USING_RING_BUFFER (queue))
          mode = GST_BUFFERING_TIMESHIFT;
        else
          mode = GST_BUFFERING_DOWNLOAD;

        if (queue->byte_in_rate > 0) {
          if (gst_pad_query_peer_duration (queue->sinkpad, &fmt, &duration))
            buffering_left =
                (gdouble) ((duration -
                    queue->current->writing_pos) * 1000) / queue->byte_in_rate;
        } else {
          buffering_left = G_MAXINT64;
        }
876
      } else {
877
        mode = GST_BUFFERING_STREAM;
878
      }
879

880
881
882
883
884
      GST_DEBUG_OBJECT (queue, "buffering %d percent", (gint) percent);
      message = gst_message_new_buffering (GST_OBJECT_CAST (queue),
          (gint) percent);
      gst_message_set_buffering_stats (message, mode,
          queue->byte_in_rate, queue->byte_out_rate, buffering_left);
885

886
887
      gst_element_post_message (GST_ELEMENT_CAST (queue), message);
    }
888
  } else {
Thiago Santos's avatar
Thiago Santos committed
889
    GST_DEBUG_OBJECT (queue, "filled %d percent", (gint) percent);
890
891
892
893
894
895
  }

#undef GET_PERCENT
}

static void
896
reset_rate_timer (GstQueue2 * queue)
897
898
899
900
901
{
  queue->bytes_in = 0;
  queue->bytes_out = 0;
  queue->byte_in_rate = 0.0;
  queue->byte_out_rate = 0.0;
902
903
904
905
  queue->last_in_elapsed = 0.0;
  queue->last_out_elapsed = 0.0;
  queue->in_timer_started = FALSE;
  queue->out_timer_started = FALSE;
906
907
}

908
909
/* the interval in seconds to recalculate the rate */
#define RATE_INTERVAL    0.2
/* Tuning for rate estimation. We use a large window for the input rate because
 * it should be stable when connected to a network. The output rate is less
 * stable (the elements preroll, queues behind a demuxer fill, ...) and should
 * therefore adapt more quickly.
 *
 * Both macros compute a weighted moving average of the previous average @avg
 * and the new sample @val: 15/16 old + 1/16 new for the input, 3/4 old +
 * 1/4 new for the output. The whole expansion is parenthesized so that the
 * macros can be used safely inside larger expressions. */
#define AVG_IN(avg,val)  (((avg) * 15.0 + (val)) / 16.0)
#define AVG_OUT(avg,val) (((avg) * 3.0 + (val)) / 4.0)

static void
918
update_in_rates (GstQueue2 * queue)
919
920
921
922
{
  gdouble elapsed, period;
  gdouble byte_in_rate;

923
924
925
  if (!queue->in_timer_started) {
    queue->in_timer_started = TRUE;
    g_timer_start (queue->in_timer);
926
927
928
    return;
  }

929
  elapsed = g_timer_elapsed (queue->in_timer, NULL);
930
931

  /* recalc after each interval. */
932
933
  if (queue->last_in_elapsed + RATE_INTERVAL < elapsed) {
    period = elapsed - queue->last_in_elapsed;
934
935

    GST_DEBUG_OBJECT (queue,
936
        "rates: period %f, in %" G_GUINT64_FORMAT, period, queue->bytes_in);
937
938
939
940
941
942
943
944
945

    byte_in_rate = queue->bytes_in / period;

    if (queue->byte_in_rate == 0.0)
      queue->byte_in_rate = byte_in_rate;
    else
      queue->byte_in_rate = AVG_IN (queue->byte_in_rate, byte_in_rate);

    /* reset the values to calculate rate over the next interval */
946
    queue->last_in_elapsed = elapsed;
947
948
    queue->bytes_in = 0;
  }
949

950
951
952
953
  if (queue->byte_in_rate > 0.0) {
    queue->cur_level.rate_time =
        queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
  }
954
955
956
957
958
  GST_DEBUG_OBJECT (queue, "rates: in %f, time %" GST_TIME_FORMAT,
      queue->byte_in_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
}

static void
959
update_out_rates (GstQueue2 * queue)
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
{
  gdouble elapsed, period;
  gdouble byte_out_rate;

  if (!queue->out_timer_started) {
    queue->out_timer_started = TRUE;
    g_timer_start (queue->out_timer);
    return;
  }

  elapsed = g_timer_elapsed (queue->out_timer, NULL);

  /* recalc after each interval. */
  if (queue->last_out_elapsed + RATE_INTERVAL < elapsed) {
    period = elapsed - queue->last_out_elapsed;
975

976
977
978
979
980
981
982
983
984
985
986
987
988
989
    GST_DEBUG_OBJECT (queue,
        "rates: period %f, out %" G_GUINT64_FORMAT, period, queue->bytes_out);

    byte_out_rate = queue->bytes_out / period;

    if (queue->byte_out_rate == 0.0)
      queue->byte_out_rate = byte_out_rate;
    else
      queue->byte_out_rate = AVG_OUT (queue->byte_out_rate, byte_out_rate);

    /* reset the values to calculate rate over the next interval */
    queue->last_out_elapsed = elapsed;
    queue->bytes_out = 0;
  }
990
991
992
993
  if (queue->byte_in_rate > 0.0) {
    queue->cur_level.rate_time =
        queue->cur_level.bytes / queue->byte_in_rate * GST_SECOND;
  }
994
995
  GST_DEBUG_OBJECT (queue, "rates: out %f, time %" GST_TIME_FORMAT,
      queue->byte_out_rate, GST_TIME_ARGS (queue->cur_level.rate_time));
996
997
}

998
999
1000
1001
1002
1003
1004
1005
1006
1007
/* Record that @range has been read up to @pos: max_reading_pos of the range
 * is raised to @pos when @pos lies beyond it (it never decreases), after
 * which the current level is recomputed for that range. */
static void
update_cur_pos (GstQueue2 * queue, GstQueue2Range * range, guint64 pos)
{
  guint64 old_max = range->max_reading_pos;
  guint64 new_max = MAX (old_max, pos);

  GST_DEBUG_OBJECT (queue,
      "updating max_reading_pos from %" G_GUINT64_FORMAT " to %"
      G_GUINT64_FORMAT, old_max, new_max);

  range->max_reading_pos = new_max;
  update_cur_level (queue, range);
}

1016
1017
1018
1019
1020
1021
/* Push a flushing, accurate seek to the absolute byte position @offset
 * through the sinkpad.
 *
 * The queue lock is released while the event is pushed and taken again
 * afterwards (NOTE: this implies that callers are expected to hold the lock).
 * When the seek event was handled, the range returned by add_range() for
 * @offset becomes the current range.
 *
 * Returns: the result of pushing the seek event. */
static gboolean
perform_seek_to_offset (GstQueue2 * queue, guint64 offset)
{
  gboolean seeked;

  GST_QUEUE2_MUTEX_UNLOCK (queue);

  GST_DEBUG_OBJECT (queue, "Seeking to %" G_GUINT64_FORMAT, offset);

  seeked = gst_pad_push_event (queue->sinkpad,
      gst_event_new_seek (1.0, GST_FORMAT_BYTES,
          GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE, GST_SEEK_TYPE_SET,
          offset, GST_SEEK_TYPE_NONE, -1));

  GST_QUEUE2_MUTEX_LOCK (queue);

  /* only switch ranges when the seek went through */
  if (seeked)
    queue->current = add_range (queue, offset);

  return seeked;
}

/* see if there is enough data in the file to read a full buffer */
static gboolean
1042
gst_queue2_have_data (GstQueue2 * queue, guint64 offset, guint length)
1043
{
1044
  GstQueue2Range *range;
1045

1046
1047
  GST_DEBUG_OBJECT (queue, "looking for offset %" G_GUINT64_FORMAT ", len %u",
      offset, length);
1048

1049
  if ((range = find_range (queue, offset))) {
1050
    if (queue->current != range) {
1051
      GST_DEBUG_OBJECT (queue, "switching ranges, do seek to range position");
1052
1053
      perform_seek_to_offset (queue, range->writing_pos);
    }
1054

1055
    GST_INFO_OBJECT (queue, "cur_level.bytes %u (max %" G_GUINT64_FORMAT ")",
Wim Taymans's avatar
Wim Taymans committed
1056
        queue->cur_level.bytes, QUEUE_MAX_BYTES (queue));
1057

1058
1059
1060
1061
1062
    /* we have a range for offset */
    GST_DEBUG_OBJECT (queue,
        "we have a range %p, offset %" G_GUINT64_FORMAT ", writing_pos %"
        G_GUINT64_FORMAT, range, range->offset, range->writing_pos);

1063
    if (!QUEUE_IS_USING_RING_BUFFER (queue) && queue->is_eos)
1064
1065
      return TRUE;

1066
    if (offset + length <= range->writing_pos)
1067
      return TRUE;
1068
1069
1070
    else
      GST_DEBUG_OBJECT (queue,
          "Need more data (%" G_GUINT64_FORMAT " bytes more)",
1071
          (offset + length) - range->writing_pos);
1072
1073

  } else {
1074
    GST_INFO_OBJECT (queue, "not found in any range");
1075
1076
    /* we don't have the range, see how far away we are, FIXME, find a good
     * threshold based on the incomming rate. */
1077
    if (!queue->is_eos && queue->current) {
Wim Taymans's avatar
Wim Taymans committed
1078
1079
      if (QUEUE_IS_USING_RING_BUFFER (queue)) {
        if (offset < queue->current->offset || offset >
Robert Swain's avatar
Robert Swain committed
1080
            queue->current->writing_pos + QUEUE_MAX_BYTES (queue) -
Wim Taymans's avatar
Wim Taymans committed
1081
1082
            queue->cur_level.bytes) {
          perform_seek_to_offset (queue, offset);
Robert Swain's avatar
Robert Swain committed
1083
1084
1085
        } else {
          GST_INFO_OBJECT (queue,
              "requested data is within range, wait for data");
Wim Taymans's avatar
Wim Taymans committed
1086
        }
1087
      } else if (offset < queue->current->writing_pos + 200000) {
1088
1089
        update_cur_pos (queue, queue->current, offset + length);
        GST_INFO_OBJECT (queue, "wait for data");
1090
        return FALSE;
1091
1092
      }
    }
1093
1094

    /* too far away, do a seek */
1095
1096
    perform_seek_to_offset (queue, offset);
  }
1097
1098
1099
1100

  return FALSE;
}

1101
1102
1103
1104
1105
1106
1107
1108
/* FSEEK_FILE(file, offset): position @file at the absolute byte @offset
 * (SEEK_SET), using fseeko(), lseek() on the underlying descriptor or plain
 * fseek() depending on what is available.
 * Every variant evaluates to TRUE (non-zero) when the seek FAILED and to 0
 * when it succeeded, so it can be used directly as an error condition. */
#ifdef HAVE_FSEEKO
#define FSEEK_FILE(file,offset)  (fseeko (file, (off_t) offset, SEEK_SET) != 0)
#elif defined (G_OS_UNIX) || defined (G_OS_WIN32)
#define FSEEK_FILE(file,offset)  (lseek (fileno (file), (off_t) offset, SEEK_SET) == (off_t) -1)
#else
#define FSEEK_FILE(file,offset)  (fseek (file, offset, SEEK_SET) != 0)
#endif

1109
static gint64
1110
1111
gst_queue2_read_data_at_offset (GstQueue2 * queue, guint64 offset, guint length,
    guint8 * dst)
1112
{
1113
  guint8 *ring_buffer;
1114
1115
  size_t res;

1116
1117
1118
  ring_buffer = queue->ring_buffer;

  if (QUEUE_IS_USING_TEMP_FILE (queue) && FSEEK_FILE (queue->temp_file, offset))
Wim Taymans's avatar
Wim Taymans committed
1119
    goto seek_failed;
1120
1121

  /* this should not block */
1122
1123
  GST_LOG_OBJECT (queue, "Reading %d bytes from offset %" G_GUINT64_FORMAT,
      length, offset);
1124
1125
1126
1127
1128
1129
1130
  if (QUEUE_IS_USING_TEMP_FILE (queue)) {
    res = fread (dst, 1, length, queue->temp_file);
  } else {
    memcpy (dst, ring_buffer + offset, length);
    res = length;
  }

1131
  GST_LOG_OBJECT (queue, "read %" G_GSIZE_FORMAT " bytes", res);
1132

1133
  if (G_UNLIKELY (res < length)) {
1134
1135
    if (!QUEUE_IS_USING_TEMP_FILE (queue))
      goto could_not_read;
1136
1137
1138
1139
1140
1141
1142
    /* check for errors or EOF */
    if (ferror (queue->temp_file))
      goto could_not_read;
    if (feof (queue->temp_file) && length > 0)
      goto eos;
  }

1143
  return res;
1144
1145