/* GStreamer
 * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
 * Copyright (C) 2007 Jan Schmidt <jan@fluendo.com>
 * Copyright (C) 2007 Wim Taymans <wim@fluendo.com>
 * Copyright (C) 2011 Sebastian Dröge <sebastian.droege@collabora.co.uk>
 *
 * gstmultiqueue.c:
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * SECTION:element-multiqueue
 * @see_also: #GstQueue
 *
 * <refsect2>
 * <para>
 * Multiqueue is similar to a normal #GstQueue with the following additional
 * features:
 * <orderedlist>
 * <listitem>
 *   <itemizedlist><title>Multiple streamhandling</title>
 *   <listitem><para>
 *     The element handles queueing data on more than one stream at once. To
 *     achieve such a feature it has request sink pads (sink_&percnt;u) and
 *     'sometimes' src pads (src_&percnt;u).
 *   </para><para>
 *     When requesting a given sinkpad with gst_element_get_request_pad(),
 *     the associated srcpad for that stream will be created.
 *     Example: requesting sink_1 will generate src_1.
 *   </para></listitem>
 *   </itemizedlist>
 * </listitem>
 * <listitem>
 *   <itemizedlist><title>Non-starvation on multiple streams</title>
 *   <listitem><para>
 *     If more than one stream is used with the element, the streams' queues
 *     will be dynamically grown (up to a limit), in order to ensure that no
 *     stream is risking data starvation. This guarantees that at any given
 *     time there are at least N bytes queued and available for each individual
 *     stream.
 *   </para><para>
 *     If an EOS event comes through a srcpad, the associated queue will be
 *     considered as 'not-empty' in the queue-size-growing algorithm.
 *   </para></listitem>
 *   </itemizedlist>
 * </listitem>
 * <listitem>
 *   <itemizedlist><title>Non-linked srcpads graceful handling</title>
 *   <listitem><para>
 *     In order to better support dynamic switching between streams, the multiqueue
 *     (unlike the current GStreamer queue) continues to push buffers on non-linked
 *     pads rather than shutting down.
 *   </para><para>
 *     In addition, to prevent a non-linked stream from very quickly consuming all
 *     available buffers and thus 'racing ahead' of the other streams, the element
 *     must ensure that buffers and inlined events for a non-linked stream are pushed
 *     in the same order as they were received, relative to the other streams
 *     controlled by the element. This means that a buffer cannot be pushed to a
 *     non-linked pad any sooner than buffers in any other stream which were received
 *     before it.
 *   </para></listitem>
 *   </itemizedlist>
 * </listitem>
 * </orderedlist>
 * </para>
 * <para>
 *   Data is queued until one of the limits specified by the
 *   #GstMultiQueue:max-size-buffers, #GstMultiQueue:max-size-bytes and/or
 *   #GstMultiQueue:max-size-time properties has been reached. Any attempt to push
 *   more buffers into the queue will block the pushing thread until more space
 *   becomes available.
 * </para>
 * <para>
 *   #GstMultiQueue:extra-size-buffers, #GstMultiQueue:extra-size-bytes and
 *   #GstMultiQueue:extra-size-time are currently unused.
 * </para>
 * <para>
 *   The default queue size limits are 5 buffers, 10MB of data, or
 *   two seconds worth of data, whichever is reached first. Note that the number
 *   of buffers will dynamically grow depending on the fill level of
 *   other queues.
 * </para>
 * <para>
 *   The #GstMultiQueue::underrun signal is emitted when all of the queues
 *   are empty. The #GstMultiQueue::overrun signal is emitted when one of the
 *   queues is filled.
 *   Both signals are emitted from the context of the streaming thread.
 * </para>
 * </refsect2>
 *
 * Last reviewed on 2008-01-25 (0.10.17)
 */

108
109
110
111
112
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include <gst/gst.h>
113
#include <stdio.h>
114
#include "gstmultiqueue.h"
115
#include <gst/glib-compat-private.h>
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138

/**
 * GstSingleQueue:
 * @sinkpad: associated sink #GstPad
 * @srcpad: associated source #GstPad
 *
 * Structure containing all information and properties about
 * a single queue.
 */
typedef struct _GstSingleQueue GstSingleQueue;

struct _GstSingleQueue
{
  /* unique identifier of the queue */
  guint id;

  GstMultiQueue *mqueue;

  GstPad *sinkpad;
  GstPad *srcpad;

  /* flowreturn of previous srcpad push */
  GstFlowReturn srcresult;
139
140

  /* segments */
141
142
  GstSegment sink_segment;
  GstSegment src_segment;
143

144
145
146
147
148
  /* position of src/sink */
  GstClockTime sinktime, srctime;
  /* TRUE if either position needs to be recalculated */
  gboolean sink_tainted, src_tainted;

149
150
151
  /* queue of data */
  GstDataQueue *queue;
  GstDataQueueSize max_size, extra_size;
152
153
  GstClockTime cur_time;
  gboolean is_eos;
154
  gboolean flushing;
155
156
157
158

  /* Protected by global lock */
  guint32 nextid;               /* ID of the next object waiting to be pushed */
  guint32 oldid;                /* ID of the last object pushed (last in a series) */
159
  guint32 last_oldid;           /* Previously observed old_id, reset to MAXUINT32 on flush */
160
161
  GstClockTime next_time;       /* End running time of next buffer to be pushed */
  GstClockTime last_time;       /* Start running time of last pushed buffer */
Wim Taymans's avatar
Wim Taymans committed
162
  GCond turn;                   /* SingleQueue turn waiting conditional */
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
};


/* Extension of GstDataQueueItem structure for our usage */
typedef struct _GstMultiQueueItem GstMultiQueueItem;

struct _GstMultiQueueItem
{
  /* NOTE(review): the leading members presumably mirror GstDataQueueItem so
   * that the item can be handed to a GstDataQueue - confirm the layout against
   * gstdataqueue.h before reordering anything here. */
  GstMiniObject *object;
  guint size;
  guint64 duration;
  gboolean visible;

  GDestroyNotify destroy;
  /* multiqueue-specific extension; presumably the position id assigned to the
   * item when it was queued - TODO confirm against the code that fills it */
  guint32 posid;
};

Wim Taymans's avatar
Wim Taymans committed
180
static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue, guint id);
181
static void gst_single_queue_free (GstSingleQueue * squeue);
182
183

static void wake_up_next_non_linked (GstMultiQueue * mq);
184
static void compute_high_id (GstMultiQueue * mq);
185
static void compute_high_time (GstMultiQueue * mq);
186
static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
187
static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
188

Wim Taymans's avatar
Wim Taymans committed
189
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
190
191
192
193
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);

Wim Taymans's avatar
Wim Taymans committed
194
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
195
196
197
198
199
200
201
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS_ANY);

GST_DEBUG_CATEGORY_STATIC (multi_queue_debug);
#define GST_CAT_DEFAULT (multi_queue_debug)

Wim Taymans's avatar
Wim Taymans committed
202
203
204
205
206
207
208
209
/* Signals and args */
enum
{
  SIGNAL_UNDERRUN,
  SIGNAL_OVERRUN,
  LAST_SIGNAL
};

210
211
212
213
/* default limits, we try to keep up to 2 seconds of data and if there is no
 * time, up to 10 MB. The number of buffers is dynamically scaled to make sure
 * there is data in the queues. Normally, the byte and time limits are not hit
 * in these conditions. */
214
/* Expression-valued defaults are parenthesized so that they keep their value
 * when expanded inside a larger expression (e.g. x % DEFAULT_MAX_SIZE_BYTES). */
#define DEFAULT_MAX_SIZE_BYTES (10 * 1024 * 1024)       /* 10 MB */
#define DEFAULT_MAX_SIZE_BUFFERS 5
#define DEFAULT_MAX_SIZE_TIME (2 * GST_SECOND)

/* second limits. When we hit one of the above limits we are probably dealing
 * with a badly muxed file and we scale the limits to these emergency values.
 * This is currently not yet implemented.
 * Since we dynamically scale the queue buffer size up to the limits but avoid
 * going above the max-size-buffers when we can, we don't really need this
 * additional extra size. */
#define DEFAULT_EXTRA_SIZE_BYTES (10 * 1024 * 1024)     /* 10 MB */
#define DEFAULT_EXTRA_SIZE_BUFFERS 5
#define DEFAULT_EXTRA_SIZE_TIME (3 * GST_SECOND)

#define DEFAULT_USE_BUFFERING FALSE
#define DEFAULT_LOW_PERCENT   10
#define DEFAULT_HIGH_PERCENT  99
#define DEFAULT_SYNC_BY_RUNNING_TIME FALSE
232

233
234
/* GObject property ids; PROP_0 is reserved by GObject and never installed. */
enum
{
  PROP_0,
  PROP_EXTRA_SIZE_BYTES,
  PROP_EXTRA_SIZE_BUFFERS,
  PROP_EXTRA_SIZE_TIME,
  PROP_MAX_SIZE_BYTES,
  PROP_MAX_SIZE_BUFFERS,
  PROP_MAX_SIZE_TIME,
  PROP_USE_BUFFERING,
  PROP_LOW_PERCENT,
  PROP_HIGH_PERCENT,
  PROP_SYNC_BY_RUNNING_TIME,
  PROP_LAST
};

/* Take the multiqueue-wide lock (qlock) of @q. The argument is parenthesized
 * so that any pointer expression can be passed. */
#define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START {                          \
  g_mutex_lock (&(q)->qlock);                                            \
} G_STMT_END

/* Release the multiqueue-wide lock (qlock) of @q. */
#define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                        \
  g_mutex_unlock (&(q)->qlock);                                          \
} G_STMT_END

static void gst_multi_queue_finalize (GObject * object);
static void gst_multi_queue_set_property (GObject * object,
    guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_multi_queue_get_property (GObject * object,
    guint prop_id, GValue * value, GParamSpec * pspec);

static GstPad *gst_multi_queue_request_new_pad (GstElement * element,
264
    GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
265
static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad);
266
267
static GstStateChangeReturn gst_multi_queue_change_state (GstElement *
    element, GstStateChange transition);
268

269
270
static void gst_multi_queue_loop (GstPad * pad);

271
#define _do_init \
272
  GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element");
273
274
275
#define gst_multi_queue_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstMultiQueue, gst_multi_queue, GST_TYPE_ELEMENT,
    _do_init);
276
277
278
279
280
281
282
283
284

static guint gst_multi_queue_signals[LAST_SIGNAL] = { 0 };

/* Class initialisation: installs the property accessors, the "underrun" and
 * "overrun" signals, all element properties, the finalize handler, the element
 * details, both pad templates and the GstElement vmethods. */
static void
gst_multi_queue_class_init (GstMultiQueueClass * klass)
{
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);

  gobject_class->set_property = gst_multi_queue_set_property;
  gobject_class->get_property = gst_multi_queue_get_property;

  /* SIGNALS */

  /**
   * GstMultiQueue::underrun:
   * @multiqueue: the multiqueue instance
   *
   * This signal is emitted from the streaming thread when there is
   * no data in any of the queues inside the multiqueue instance (underrun).
   *
   * This indicates either starvation or EOS from the upstream data sources.
   */
  gst_multi_queue_signals[SIGNAL_UNDERRUN] =
      g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
      G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);

  /**
   * GstMultiQueue::overrun:
   * @multiqueue: the multiqueue instance
   *
   * Reports that one of the queues in the multiqueue is full (overrun).
   * A queue is full if the total amount of data inside it (num-buffers, time,
   * size) is higher than the boundary values which can be set through the
   * GObject properties.
   *
   * This can be used as an indicator of pre-roll.
   */
  gst_multi_queue_signals[SIGNAL_OVERRUN] =
      g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
      G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);

  /* PROPERTIES */

  /* The byte-valued properties are expressed in bytes, so their nicks say
   * "bytes" to agree with the property names and descriptions. */
  g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
      g_param_spec_uint ("max-size-bytes", "Max. size (bytes)",
          "Max. amount of data in the queue (bytes, 0=disable)",
          0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
      g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
          "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
          DEFAULT_MAX_SIZE_BUFFERS,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
      g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
          "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
          DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_BYTES,
      g_param_spec_uint ("extra-size-bytes", "Extra Size (bytes)",
          "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)"
          " (NOT IMPLEMENTED)",
          0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_BUFFERS,
      g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
          "Amount of buffers the queues can grow if one of them is empty (0=disable)"
          " (NOT IMPLEMENTED)",
          0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_TIME,
      g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
          "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)"
          " (NOT IMPLEMENTED)",
          0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstMultiQueue:use-buffering:
   *
   * Enable the buffering option in multiqueue so that BUFFERING messages are
   * emitted based on low-/high-percent thresholds.
   *
   * Since: 0.10.26
   */
  g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
      g_param_spec_boolean ("use-buffering", "Use buffering",
          "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
          DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstMultiQueue:low-percent:
   *
   * Low threshold percent for buffering to start.
   *
   * Since: 0.10.26
   */
  g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
      g_param_spec_int ("low-percent", "Low percent",
          "Low threshold for buffering to start", 0, 100,
          DEFAULT_LOW_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  /**
   * GstMultiQueue:high-percent:
   *
   * High threshold percent for buffering to finish.
   *
   * Since: 0.10.26
   */
  /* NOTE(review): the allowed range includes 0, while update_buffering()
   * divides by mq->high_percent when scaling the posted percentage - confirm
   * whether a minimum of 1 should be enforced. */
  g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
      g_param_spec_int ("high-percent", "High percent",
          "High threshold for buffering to finish", 0, 100,
          DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  /**
   * GstMultiQueue:sync-by-running-time:
   *
   * If enabled multiqueue will synchronize deactivated or not-linked streams
   * to the activated and linked streams by taking the running time.
   * Otherwise multiqueue will synchronize the deactivated or not-linked
   * streams by keeping the order in which buffers and events arrived compared
   * to active and linked streams.
   *
   * Since: 0.10.36
   */
  g_object_class_install_property (gobject_class, PROP_SYNC_BY_RUNNING_TIME,
      g_param_spec_boolean ("sync-by-running-time", "Sync By Running Time",
          "Synchronize deactivated or not-linked streams by running time",
          DEFAULT_SYNC_BY_RUNNING_TIME,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

  gobject_class->finalize = gst_multi_queue_finalize;

  gst_element_class_set_details_simple (gstelement_class,
      "MultiQueue",
      "Generic", "Multiple data queue", "Edward Hervey <edward@fluendo.com>");
  gst_element_class_add_pad_template (gstelement_class,
      gst_static_pad_template_get (&sinktemplate));
  gst_element_class_add_pad_template (gstelement_class,
      gst_static_pad_template_get (&srctemplate));

  gstelement_class->request_new_pad =
      GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
  gstelement_class->release_pad =
      GST_DEBUG_FUNCPTR (gst_multi_queue_release_pad);
  gstelement_class->change_state =
      GST_DEBUG_FUNCPTR (gst_multi_queue_change_state);
}

static void
427
gst_multi_queue_init (GstMultiQueue * mqueue)
428
429
430
431
432
433
434
435
436
437
438
439
{
  mqueue->nbqueues = 0;
  mqueue->queues = NULL;

  mqueue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES;
  mqueue->max_size.visible = DEFAULT_MAX_SIZE_BUFFERS;
  mqueue->max_size.time = DEFAULT_MAX_SIZE_TIME;

  mqueue->extra_size.bytes = DEFAULT_EXTRA_SIZE_BYTES;
  mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS;
  mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME;

440
  mqueue->use_buffering = DEFAULT_USE_BUFFERING;
441
442
  mqueue->low_percent = DEFAULT_LOW_PERCENT;
  mqueue->high_percent = DEFAULT_HIGH_PERCENT;
443

444
445
  mqueue->sync_by_running_time = DEFAULT_SYNC_BY_RUNNING_TIME;

446
  mqueue->counter = 1;
447
  mqueue->highid = -1;
448
  mqueue->high_time = GST_CLOCK_TIME_NONE;
449

Wim Taymans's avatar
Wim Taymans committed
450
  g_mutex_init (&mqueue->qlock);
451
452
453
454
455
456
457
}

/* GObject finalize handler: frees every remaining single queue, clears the
 * multiqueue lock and chains up to the parent class. */
static void
gst_multi_queue_finalize (GObject * object)
{
  GstMultiQueue *mqueue = GST_MULTI_QUEUE (object);

  /* Free each GstSingleQueue together with the list links. The free function
   * takes a single pointer argument, which is what GDestroyNotify expects. */
  g_list_free_full (mqueue->queues, (GDestroyNotify) gst_single_queue_free);
  mqueue->queues = NULL;
  /* the queue list changed */
  mqueue->queues_cookie++;

  /* free/unref instance data */
  g_mutex_clear (&mqueue->qlock);

  G_OBJECT_CLASS (parent_class)->finalize (object);
}

469
470
471
472
/* Copy mq->max_size.<format> into the max_size of every single queue in
 * mq->queues. @format is the member name (bytes, visible or time). Every
 * caller in this file holds the multiqueue lock around it. The locals carry a
 * prefix so they cannot capture an identifier passed in as @mq. */
#define SET_CHILD_PROPERTY(mq,format) G_STMT_START {	        \
    GList * _sq_link = (mq)->queues;				\
    while (_sq_link) {						\
      GstSingleQueue *_sq = (GstSingleQueue*)_sq_link->data;	\
      _sq->max_size.format = (mq)->max_size.format;             \
      _sq_link = g_list_next(_sq_link);				\
    }								\
} G_STMT_END

/* GObject set_property handler.
 *
 * The max-size-* properties are stored on the multiqueue and copied to every
 * existing single queue while holding the multiqueue lock. All other
 * properties are plain stores into the instance structure. */
static void
gst_multi_queue_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec)
{
  GstMultiQueue *mq = GST_MULTI_QUEUE (object);

  switch (prop_id) {
    case PROP_MAX_SIZE_BYTES:
      GST_MULTI_QUEUE_MUTEX_LOCK (mq);
      mq->max_size.bytes = g_value_get_uint (value);
      SET_CHILD_PROPERTY (mq, bytes);
      GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
      break;
    case PROP_MAX_SIZE_BUFFERS:
      GST_MULTI_QUEUE_MUTEX_LOCK (mq);
      mq->max_size.visible = g_value_get_uint (value);
      SET_CHILD_PROPERTY (mq, visible);
      GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
      break;
    case PROP_MAX_SIZE_TIME:
      GST_MULTI_QUEUE_MUTEX_LOCK (mq);
      mq->max_size.time = g_value_get_uint64 (value);
      SET_CHILD_PROPERTY (mq, time);
      GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
      break;
    case PROP_EXTRA_SIZE_BYTES:
      mq->extra_size.bytes = g_value_get_uint (value);
      break;
    case PROP_EXTRA_SIZE_BUFFERS:
      mq->extra_size.visible = g_value_get_uint (value);
      break;
    case PROP_EXTRA_SIZE_TIME:
      mq->extra_size.time = g_value_get_uint64 (value);
      break;
    case PROP_USE_BUFFERING:
      mq->use_buffering = g_value_get_boolean (value);
      break;
    case PROP_LOW_PERCENT:
      mq->low_percent = g_value_get_int (value);
      break;
    case PROP_HIGH_PERCENT:
      mq->high_percent = g_value_get_int (value);
      break;
    case PROP_SYNC_BY_RUNNING_TIME:
      mq->sync_by_running_time = g_value_get_boolean (value);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

/* GObject get_property handler. Every read is done while holding the
 * multiqueue lock. */
static void
gst_multi_queue_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec)
{
  GstMultiQueue *mq = GST_MULTI_QUEUE (object);

  GST_MULTI_QUEUE_MUTEX_LOCK (mq);

  switch (prop_id) {
    case PROP_EXTRA_SIZE_BYTES:
      g_value_set_uint (value, mq->extra_size.bytes);
      break;
    case PROP_EXTRA_SIZE_BUFFERS:
      g_value_set_uint (value, mq->extra_size.visible);
      break;
    case PROP_EXTRA_SIZE_TIME:
      g_value_set_uint64 (value, mq->extra_size.time);
      break;
    case PROP_MAX_SIZE_BYTES:
      g_value_set_uint (value, mq->max_size.bytes);
      break;
    case PROP_MAX_SIZE_BUFFERS:
      g_value_set_uint (value, mq->max_size.visible);
      break;
    case PROP_MAX_SIZE_TIME:
      g_value_set_uint64 (value, mq->max_size.time);
      break;
    case PROP_USE_BUFFERING:
      g_value_set_boolean (value, mq->use_buffering);
      break;
    case PROP_LOW_PERCENT:
      g_value_set_int (value, mq->low_percent);
      break;
    case PROP_HIGH_PERCENT:
      g_value_set_int (value, mq->high_percent);
      break;
    case PROP_SYNC_BY_RUNNING_TIME:
      g_value_set_boolean (value, mq->sync_by_running_time);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }

  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}

577
static GstIterator *
Wim Taymans's avatar
Wim Taymans committed
578
gst_multi_queue_iterate_internal_links (GstPad * pad, GstObject * parent)
579
{
580
581
582
  GstIterator *it = NULL;
  GstPad *opad;
  GstSingleQueue *squeue;
Wim Taymans's avatar
Wim Taymans committed
583
  GstMultiQueue *mq = GST_MULTI_QUEUE (parent);
584
  GValue val = { 0, };
585

586
587
588
589
  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
  squeue = gst_pad_get_element_private (pad);
  if (!squeue)
    goto out;
590

591
592
593
594
595
596
  if (squeue->sinkpad == pad)
    opad = gst_object_ref (squeue->srcpad);
  else if (squeue->srcpad == pad)
    opad = gst_object_ref (squeue->sinkpad);
  else
    goto out;
597

598
599
600
601
  g_value_init (&val, GST_TYPE_PAD);
  g_value_set_object (&val, opad);
  it = gst_iterator_new_single (GST_TYPE_PAD, &val);
  g_value_unset (&val);
602

603
  gst_object_unref (opad);
604

605
606
out:
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
607

608
  return it;
609
610
}

611

612
613
614
615
616
617
/*
 * GstElement methods
 */

static GstPad *
gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp,
618
    const gchar * name, const GstCaps * caps)
619
620
621
{
  GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
  GstSingleQueue *squeue;
Wim Taymans's avatar
Wim Taymans committed
622
  guint temp_id = -1;
623

624
  if (name) {
Wim Taymans's avatar
Wim Taymans committed
625
    sscanf (name + 4, "_%u", &temp_id);
626
    GST_LOG_OBJECT (element, "name : %s (id %d)", GST_STR_NULL (name), temp_id);
627
  }
628
629

  /* Create a new single queue, add the sink and source pad and return the sink pad */
630
  squeue = gst_single_queue_new (mqueue, temp_id);
631
632
633
634

  GST_DEBUG_OBJECT (mqueue, "Returning pad %s:%s",
      GST_DEBUG_PAD_NAME (squeue->sinkpad));

635
  return squeue ? squeue->sinkpad : NULL;
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
}

/* GstElement release_pad handler.
 *
 * Looks up the single queue whose sink pad is @pad, unlinks it from the
 * multiqueue's list under the lock, then shuts the queue down: flushes its
 * data queue, deactivates and removes both pads, and frees the single queue.
 * Logs a warning and returns if @pad is not one of our sink pads. */
static void
gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
{
  GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
  GstSingleQueue *sq = NULL;
  GList *tmp;

  GST_LOG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));

  GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
  /* Find which single queue it belongs to, knowing that it should be a sinkpad */
  for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
    sq = (GstSingleQueue *) tmp->data;

    if (sq->sinkpad == pad)
      break;
  }

  /* tmp is NULL when the loop ran to the end without a match */
  if (!tmp) {
    GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
    GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
    return;
  }

  /* FIXME: The removal of the singlequeue should probably not happen until it
   * finishes draining */

  /* remove it from the list */
  mqueue->queues = g_list_delete_link (mqueue->queues, tmp);
  mqueue->queues_cookie++;

  /* FIXME : recompute next-non-linked */
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);

  /* delete SingleQueue */
  gst_data_queue_set_flushing (sq->queue, TRUE);

  gst_pad_set_active (sq->srcpad, FALSE);
  gst_pad_set_active (sq->sinkpad, FALSE);
  /* detach the single queue from the pads before it is freed */
  gst_pad_set_element_private (sq->srcpad, NULL);
  gst_pad_set_element_private (sq->sinkpad, NULL);
  gst_element_remove_pad (element, sq->srcpad);
  gst_element_remove_pad (element, sq->sinkpad);
  gst_single_queue_free (sq);
}

684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
/* GstElement change_state handler.
 *
 * READY->PAUSED clears the flushing flag of every single queue.
 * PAUSED->READY sets it and signals each queue's 'turn' condition so that
 * waiting threads wake up. Both happen under the multiqueue lock and before
 * chaining up to the parent class, whose result is returned. */
static GstStateChangeReturn
gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
{
  GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
  GstSingleQueue *sq = NULL;

  switch (transition) {
    case GST_STATE_CHANGE_READY_TO_PAUSED:{
      GList *tmp;

      /* Set all pads to non-flushing */
      GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
      for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
        sq = (GstSingleQueue *) tmp->data;
        sq->flushing = FALSE;
      }
      GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
      break;
    }
    case GST_STATE_CHANGE_PAUSED_TO_READY:{
      GList *tmp;

      /* Un-wait all waiting pads */
      GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
      for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
        sq = (GstSingleQueue *) tmp->data;
        sq->flushing = TRUE;
        g_cond_signal (&sq->turn);
      }
      GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
      break;
    }
    default:
      break;
  }

  /* nothing needs doing after the parent class has handled the transition */
  return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
}

734
static gboolean
735
736
737
738
739
740
741
742
gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
{
  gboolean result;

  GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"),
      sq->id);

  if (flush) {
743
    sq->srcresult = GST_FLOW_FLUSHING;
744
745
    gst_data_queue_set_flushing (sq->queue, TRUE);

746
747
    sq->flushing = TRUE;

748
749
750
751
    /* wake up non-linked task */
    GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
        sq->id);
    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
Wim Taymans's avatar
Wim Taymans committed
752
    g_cond_signal (&sq->turn);
753
754
755
756
    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);

    GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
    result = gst_pad_pause_task (sq->srcpad);
757
    sq->sink_tainted = sq->src_tainted = TRUE;
758
759
760
761
  } else {
    gst_data_queue_flush (sq->queue);
    gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
    gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
762
    /* All pads start off not-linked for a smooth kick-off */
763
    sq->srcresult = GST_FLOW_OK;
764
    sq->cur_time = 0;
765
766
    sq->max_size.visible = mq->max_size.visible;
    sq->is_eos = FALSE;
767
768
    sq->nextid = 0;
    sq->oldid = 0;
769
    sq->last_oldid = G_MAXUINT32;
770
771
    sq->next_time = GST_CLOCK_TIME_NONE;
    sq->last_time = GST_CLOCK_TIME_NONE;
772
773
    gst_data_queue_set_flushing (sq->queue, FALSE);

774
775
776
777
778
    /* Reset high time to be recomputed next */
    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
    mq->high_time = GST_CLOCK_TIME_NONE;
    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);

779
780
    sq->flushing = FALSE;

781
782
783
784
785
786
787
788
    GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
    result =
        gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
        sq->srcpad);
  }
  return result;
}

789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
/* Recompute the fill level of @sq as a percentage and, when buffering is
 * enabled on @mq, update mq->buffering / mq->percent and post a BUFFERING
 * message whenever the reported value has to change.
 *
 * The level is the larger of the time and bytes fill ratios (100 on EOS).
 * While buffering, the stored percentage only ever increases; buffering stops
 * once the level reaches mq->high_percent and starts again when the level
 * drops below mq->low_percent. */
static void
update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
{
  GstDataQueueSize size;
  gint percent, tmp;
  guint64 level;
  gboolean post = FALSE;

  /* nothing to do when we are not in buffering mode */
  if (!mq->use_buffering)
    return;

  gst_data_queue_get_level (sq->queue, &size);

  GST_DEBUG_OBJECT (mq,
      "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
      G_GUINT64_FORMAT, sq->id, size.visible, sq->max_size.visible,
      size.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);

  /* get bytes and time percentages and take the max */
  if (sq->is_eos) {
    percent = 100;
  } else {
    percent = 0;
    if (sq->max_size.time > 0) {
      level = (sq->cur_time * 100) / sq->max_size.time;
      /* clamp before narrowing so a huge ratio cannot turn negative */
      tmp = (gint) MIN (level, (guint64) G_MAXINT);
      percent = MAX (percent, tmp);
    }
    if (sq->max_size.bytes > 0) {
      /* widen to 64 bits first: bytes * 100 wraps in 32-bit arithmetic once
       * the queue holds more than roughly 42 MB */
      level = ((guint64) size.bytes * 100) / sq->max_size.bytes;
      tmp = (gint) MIN (level, (guint64) G_MAXINT);
      percent = MAX (percent, tmp);
    }
  }

  if (mq->buffering) {
    post = TRUE;
    if (percent >= mq->high_percent) {
      mq->buffering = FALSE;
    }
    /* make sure it increases */
    percent = MAX (mq->percent, percent);

    if (percent == mq->percent)
      /* don't post if nothing changed */
      post = FALSE;
    else
      /* else keep last value we posted */
      mq->percent = percent;
  } else {
    if (percent < mq->low_percent) {
      mq->buffering = TRUE;
      mq->percent = percent;
      post = TRUE;
    }
  }
  if (post) {
    GstMessage *message;

    /* scale to high percent so that it becomes the 100% mark; a high mark
     * of 0 means any level already counts as full, and must not be used as
     * a divisor */
    if (mq->high_percent > 0) {
      gint64 scaled = ((gint64) percent * 100) / mq->high_percent;

      /* clip */
      percent = (gint) MIN (scaled, 100);
    } else {
      percent = 100;
    }

    GST_DEBUG_OBJECT (mq, "buffering %d percent", percent);
    message = gst_message_new_buffering (GST_OBJECT_CAST (mq), percent);

    gst_element_post_message (GST_ELEMENT_CAST (mq), message);
  } else {
    GST_DEBUG_OBJECT (mq, "filled %d percent", percent);
  }
}

861
/* calculate the diff between running time on the sink and src of the queue.
862
863
 * This is the total amount of time in the queue. 
 * WITH LOCK TAKEN */
864
865
866
867
868
static void
update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
{
  gint64 sink_time, src_time;

869
870
871
  if (sq->sink_tainted) {
    sink_time = sq->sinktime =
        gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
Wim Taymans's avatar
Wim Taymans committed
872
        sq->sink_segment.position);
873
874
875
876

    if (G_UNLIKELY (sink_time != GST_CLOCK_TIME_NONE))
      /* if we have a time, we become untainted and use the time */
      sq->sink_tainted = FALSE;
877
878
879
880
881
882
  } else
    sink_time = sq->sinktime;

  if (sq->src_tainted) {
    src_time = sq->srctime =
        gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME,
Wim Taymans's avatar
Wim Taymans committed
883
        sq->src_segment.position);
884
885
886
    /* if we have a time, we become untainted and use the time */
    if (G_UNLIKELY (src_time != GST_CLOCK_TIME_NONE))
      sq->src_tainted = FALSE;
887
888
  } else
    src_time = sq->srctime;
889
890
891
892
893

  GST_DEBUG_OBJECT (mq,
      "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
      GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));

894
  /* This allows for streams with out of order timestamping - sometimes the
895
   * emerging timestamp is later than the arriving one(s) */
896
897
898
899
  if (G_LIKELY (sink_time != -1 && src_time != -1 && sink_time > src_time))
    sq->cur_time = sink_time - src_time;
  else
    sq->cur_time = 0;
900

901
902
  /* updating the time level can change the buffering state */
  update_buffering (mq, sq);
903

904
  return;
905
906
}

Wim Taymans's avatar
Wim Taymans committed
907
/* Copy the segment carried by the SEGMENT @event into @segment and refresh
 * the time level of @sq. @segment is compared against &sq->sink_segment to
 * decide which side gets marked tainted; anything else is treated as the src
 * side. The multiqueue lock is taken around the taint/level update. */
static void
apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
    GstSegment * segment)
{
  gboolean on_sink_side = (segment == &sq->sink_segment);

  gst_event_copy_segment (event, segment);

  /* These values are what timestamps get tracked against. For a non-time
   * format, pretend the current time segment is closed with a 0 start and
   * an unknown stop time. */
  if (segment->format != GST_FORMAT_TIME) {
    segment->time = 0;
    segment->stop = -1;
    segment->start = 0;
    segment->format = GST_FORMAT_TIME;
  }

  GST_MULTI_QUEUE_MUTEX_LOCK (mq);

  /* force the running time of the affected side to be recomputed */
  if (on_sink_side) {
    sq->sink_tainted = TRUE;
  } else {
    sq->src_tainted = TRUE;
  }

  GST_DEBUG_OBJECT (mq,
      "queue %d, configured SEGMENT %" GST_SEGMENT_FORMAT, sq->id, segment);

  /* a new segment can change the time level of the queue */
  update_time_level (mq, sq);

  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}

/* take a buffer and update segment, updating the time level of the queue. */
static void
943
944
apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
    GstClockTime duration, GstSegment * segment)
945
{
946
947
  GST_MULTI_QUEUE_MUTEX_LOCK (mq);

948
949
950
  /* if no timestamp is set, assume it's continuous with the previous 
   * time */
  if (timestamp == GST_CLOCK_TIME_NONE)
Wim Taymans's avatar
Wim Taymans committed
951
    timestamp = segment->position;
952
953
954
955
956

  /* add duration */
  if (duration != GST_CLOCK_TIME_NONE)
    timestamp += duration;

Wim Taymans's avatar
Wim Taymans committed
957
  GST_DEBUG_OBJECT (mq, "queue %d, position updated to %" GST_TIME_FORMAT,
958
959
      sq->id, GST_TIME_ARGS (timestamp));

Wim Taymans's avatar
Wim Taymans committed
960
  segment->position = timestamp;
961

962
963
964
965
966
  if (segment == &sq->sink_segment)
    sq->sink_tainted = TRUE;
  else
    sq->src_tainted = TRUE;

967
968
  /* calc diff with other end */
  update_time_level (mq, sq);
969
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
970
971
}

972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
/* Return the running time of @object within @segment, or GST_CLOCK_TIME_NONE
 * when no time can be derived.
 *
 * - Buffer: its timestamp (plus its duration when @end is TRUE and the
 *   duration is valid), limited to segment->stop.
 * - Buffer list: with @end FALSE only the first buffer is looked at; with
 *   @end TRUE the value of the last buffer carrying a valid timestamp wins.
 * - SEGMENT event in TIME format: the running time of the new segment's
 *   start position, computed against the new segment itself.
 * - Anything else: GST_CLOCK_TIME_NONE. */
static GstClockTime
get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end)
{
  GstClockTime time = GST_CLOCK_TIME_NONE;

  if (GST_IS_BUFFER (object)) {
    GstBuffer *buf = GST_BUFFER_CAST (object);

    if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
      time = GST_BUFFER_TIMESTAMP (buf);
      if (end && GST_BUFFER_DURATION_IS_VALID (buf))
        time += GST_BUFFER_DURATION (buf);
      if (time > segment->stop)
        time = segment->stop;
      time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, time);
    }
  } else if (GST_IS_BUFFER_LIST (object)) {
    GstBufferList *list = GST_BUFFER_LIST_CAST (object);
    /* unsigned, matching the list length and index types */
    guint i, n;
    GstBuffer *buf;

    n = gst_buffer_list_length (list);
    for (i = 0; i < n; i++) {
      buf = gst_buffer_list_get (list, i);
      if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) {
        time = GST_BUFFER_TIMESTAMP (buf);
        if (end && GST_BUFFER_DURATION_IS_VALID (buf))
          time += GST_BUFFER_DURATION (buf);
        if (time > segment->stop)
          time = segment->stop;
        time = gst_segment_to_running_time (segment, GST_FORMAT_TIME, time);
      }
      /* for the start time only the very first buffer is considered,
       * whether or not it carried a timestamp */
      if (!end)
        break;
    }
  } else if (GST_IS_EVENT (object)) {
    GstEvent *event = GST_EVENT_CAST (object);

    /* For newsegment events return the running time of the start position */
    if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
      const GstSegment *new_segment;

      gst_event_parse_segment (event, &new_segment);
      if (new_segment->format == GST_FORMAT_TIME) {
        time =
            gst_segment_to_running_time (new_segment, GST_FORMAT_TIME,
            new_segment->start);
      }
    }
  }

  return time;
}

1029
static GstFlowReturn
1030
1031
1032
gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
    GstMiniObject * object)
{
1033
1034
  GstFlowReturn result = GST_FLOW_OK;

1035
  if (GST_IS_BUFFER (object)) {
1036
    GstBuffer *buffer;
1037
    GstClockTime timestamp, duration;
1038

1039
    buffer = GST_BUFFER_CAST (object);
1040
1041
    timestamp = GST_BUFFER_TIMESTAMP (buffer);
    duration = GST_BUFFER_DURATION (buffer);
1042

1043
    apply_buffer (mq, sq, timestamp, duration, &sq->src_segment);
1044

1045
1046
1047
    /* Applying the buffer may have made the queue non-full again, unblock it if needed */
    gst_data_queue_limits_changed (sq->queue);

1048
1049
1050
1051
    GST_DEBUG_OBJECT (mq,
        "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
        sq->id, buffer, GST_TIME_ARGS (timestamp));

1052
    result = gst_pad_push (sq->srcpad, buffer);
1053
  } else if (GST_IS_EVENT (object)) {
1054
1055
    GstEvent *event;

1056
1057
1058
1059
    event = GST_EVENT_CAST (object);

    switch (GST_EVENT_TYPE (event)) {
      case GST_EVENT_EOS:
1060
        result = GST_FLOW_EOS;
1061
        break;
Wim Taymans's avatar
Wim Taymans committed
1062
      case GST_EVENT_SEGMENT:
1063
        apply_segment (mq, sq, event, &sq->src_segment);
1064
1065
        /* Applying the segment may have made the queue non-full again, unblock it if needed */
        gst_data_queue_limits_changed (sq->queue);
1066
1067
1068
        break;
      default:
        break;
1069
    }
1070

1071
1072
1073
1074
    GST_DEBUG_OBJECT (mq,
        "SingleQueue %d : Pushing event %p of type %s",
        sq->id, event, GST_EVENT_TYPE_NAME (event));

1075
    gst_pad_push_event (sq->srcpad, event);
1076
1077
1078
1079
  } else {
    g_warning ("Unexpected object in singlequeue %d (refcounting problem?)",
        sq->id);
  }
1080
  return result;
1081

1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
  /* ERRORS */
}

/* Detach the mini object held by @item and return it. item->object is NULL
 * afterwards, so gst_multi_queue_item_destroy() will not unref it. */
static GstMiniObject *
gst_multi_queue_item_steal_object (GstMultiQueueItem * item)
{
  GstMiniObject *stolen = item->object;

  item->object = NULL;

  return stolen;
}

/* Free @item, dropping the reference on the mini object it still holds, if
 * any. */
static void
gst_multi_queue_item_destroy (GstMultiQueueItem * item)
{
  GstMiniObject *held = item->object;

  if (held != NULL) {
    gst_mini_object_unref (held);
  }

  g_slice_free (GstMultiQueueItem, item);
}

1104
/* takes ownership of passed mini object! */
1105
static GstMultiQueueItem *
1106
gst_multi_queue_buffer_item_new (GstMiniObject * object, guint32 curid)
1107
1108
1109
{
  GstMultiQueueItem *item;

1110
  item = g_slice_new (GstMultiQueueItem);
1111
  item->object = object;
1112
  item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
1113
  item->posid = curid;
1114

1115
  item->size = gst_buffer_get_size (GST_BUFFER_CAST (object));
1116
1117
  item->duration = GST_BUFFER_DURATION (object);
  if (item->duration == GST_CLOCK_TIME_NONE)
1118
    item->duration = 0;
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
  item->visible = TRUE;
  return item;
}

/* Wrap the event @object in a newly allocated queue item with position id
 * @curid. Event items are not visible and have zero size and duration. */
static GstMultiQueueItem *
gst_multi_queue_event_item_new (GstMiniObject * object, guint32 curid)
{
  GstMultiQueueItem *item = g_slice_new (GstMultiQueueItem);

  item->object = object;
  item->posid = curid;
  item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;

  item->visible = FALSE;
  item->duration = 0;
  item->size = 0;

  return item;
}

1139
1140
1141
1142
/* Each main loop attempts to push buffers until the return value
 * is not-linked. not-linked pads are not allowed to push data beyond
 * any linked pads, so they don't 'rush ahead of the pack'.
 */
1143
1144
1145
1146
1147
1148
1149
static void
gst_multi_queue_loop (GstPad * pad)
{
  GstSingleQueue *sq;
  GstMultiQueueItem *item;
  GstDataQueueItem *sitem;
  GstMultiQueue *mq;
1150
  GstMiniObject *object = NULL;
1151
  guint32 newid;
1152
  GstFlowReturn result;
1153
  GstClockTime next_time;
1154
1155

  sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
1156
  mq = sq->mqueue;
1157

1158
1159
  GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);

1160
1161
1162
  if (sq->flushing)
    goto out_flushing;

1163
1164
1165
1166
1167
1168
1169
1170