/* GStreamer
 * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
 * Copyright (C) 2007 Jan Schmidt <jan@fluendo.com>
 * Copyright (C) 2007 Wim Taymans <wim@fluendo.com>
 * Copyright (C) 2011 Sebastian Dröge <sebastian.droege@collabora.co.uk>
 *
 * gstmultiqueue.c:
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

/**
 * SECTION:element-multiqueue
 * @see_also: #GstQueue
 *
 * <refsect2>
 * <para>
 * Multiqueue is similar to a normal #GstQueue with the following additional
 * features:
 * <orderedlist>
 * <listitem>
 *   <itemizedlist><title>Multiple stream handling</title>
 *   <listitem><para>
 *     The element handles queueing data on more than one stream at once. To
 *     achieve such a feature it has request sink pads (sink_&percnt;u) and
 *     'sometimes' src pads (src_&percnt;u).
 *   </para><para>
 *     When requesting a given sinkpad with gst_element_get_request_pad(),
 *     the associated srcpad for that stream will be created.
 *     Example: requesting sink_1 will generate src_1.
 *   </para></listitem>
 *   </itemizedlist>
 * </listitem>
 * <listitem>
 *   <itemizedlist><title>Non-starvation on multiple streams</title>
 *   <listitem><para>
 *     If more than one stream is used with the element, the streams' queues
 *     will be dynamically grown (up to a limit), in order to ensure that no
 *     stream is risking data starvation. This guarantees that at any given
 *     time there are at least N bytes queued and available for each individual
 *     stream.
 *   </para><para>
 *     If an EOS event comes through a srcpad, the associated queue will be
 *     considered as 'not-empty' in the queue-size-growing algorithm.
 *   </para></listitem>
 *   </itemizedlist>
 * </listitem>
 * <listitem>
 *   <itemizedlist><title>Non-linked srcpads graceful handling</title>
 *   <listitem><para>
 *     In order to better support dynamic switching between streams, the multiqueue
 *     (unlike the current GStreamer queue) continues to push buffers on non-linked
 *     pads rather than shutting down.
 *   </para><para>
 *     In addition, to prevent a non-linked stream from very quickly consuming all
 *     available buffers and thus 'racing ahead' of the other streams, the element
 *     must ensure that buffers and inlined events for a non-linked stream are pushed
 *     in the same order as they were received, relative to the other streams
 *     controlled by the element. This means that a buffer cannot be pushed to a
 *     non-linked pad any sooner than buffers in any other stream which were received
 *     before it.
 *   </para></listitem>
 *   </itemizedlist>
 * </listitem>
 * </orderedlist>
 * </para>
 * <para>
 *   Data is queued until one of the limits specified by the
 *   #GstMultiQueue:max-size-buffers, #GstMultiQueue:max-size-bytes and/or
 *   #GstMultiQueue:max-size-time properties has been reached. Any attempt to push
 *   more buffers into the queue will block the pushing thread until more space
 *   becomes available.
 * </para>
 * <para>
 *   #GstMultiQueue:extra-size-buffers, #GstMultiQueue:extra-size-bytes and
 *   #GstMultiQueue:extra-size-time are currently unused.
 * </para>
 * <para>
 *   The default queue size limits are 5 buffers, 10MB of data, or
 *   two seconds' worth of data, whichever is reached first. Note that the number
 *   of buffers will dynamically grow depending on the fill level of
 *   other queues.
 * </para>
 * <para>
 *   The #GstMultiQueue::underrun signal is emitted when all of the queues
 *   are empty. The #GstMultiQueue::overrun signal is emitted when one of the
 *   queues is filled.
 *   Both signals are emitted from the context of the streaming thread.
 * </para>
 * </refsect2>
 *
 * Last reviewed on 2008-01-25 (0.10.17)
 */

108 109 110 111 112
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include <gst/gst.h>
113
#include <stdio.h>
114
#include "gstmultiqueue.h"
115
#include <gst/glib-compat-private.h>
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138

/**
 * GstSingleQueue:
 * @sinkpad: associated sink #GstPad
 * @srcpad: associated source #GstPad
 *
 * Structure containing all information and properties about
 * a single queue.
 */
typedef struct _GstSingleQueue GstSingleQueue;

struct _GstSingleQueue
{
  /* unique identifier of the queue */
  guint id;

  GstMultiQueue *mqueue;

  GstPad *sinkpad;
  GstPad *srcpad;

  /* flowreturn of previous srcpad push */
  GstFlowReturn srcresult;
139 140

  /* segments */
141 142
  GstSegment sink_segment;
  GstSegment src_segment;
143

144 145 146 147 148
  /* position of src/sink */
  GstClockTime sinktime, srctime;
  /* TRUE if either position needs to be recalculated */
  gboolean sink_tainted, src_tainted;

149 150 151
  /* queue of data */
  GstDataQueue *queue;
  GstDataQueueSize max_size, extra_size;
152 153
  GstClockTime cur_time;
  gboolean is_eos;
154
  gboolean flushing;
155 156 157 158

  /* Protected by global lock */
  guint32 nextid;               /* ID of the next object waiting to be pushed */
  guint32 oldid;                /* ID of the last object pushed (last in a series) */
159
  guint32 last_oldid;           /* Previously observed old_id, reset to MAXUINT32 on flush */
160 161
  GstClockTime next_time;       /* End running time of next buffer to be pushed */
  GstClockTime last_time;       /* Start running time of last pushed buffer */
Wim Taymans's avatar
Wim Taymans committed
162
  GCond turn;                   /* SingleQueue turn waiting conditional */
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
};


/* Extension of GstDataQueueItem structure for our usage */
typedef struct _GstMultiQueueItem GstMultiQueueItem;

/* One entry handed to a single queue's data queue.
 *
 * NOTE(review): since this is described as an extension of GstDataQueueItem,
 * the leading members presumably have to stay layout-compatible with that
 * structure -- confirm against gstdataqueue.h before reordering or
 * inserting fields. */
struct _GstMultiQueueItem
{
  /* the queued mini object */
  GstMiniObject *object;
  /* presumably the size in bytes accounted for this item -- TODO confirm */
  guint size;
  /* presumably the duration accounted for this item -- TODO confirm units */
  guint64 duration;
  /* NOTE(review): looks like the flag that decides whether the item counts
   * towards the 'visible' (buffer count) level -- confirm */
  gboolean visible;

  /* callback used to dispose of the item */
  GDestroyNotify destroy;
  /* NOTE(review): presumably the ordering id compared against the
   * nextid/oldid fields of GstSingleQueue -- confirm */
  guint32 posid;
};

Wim Taymans's avatar
Wim Taymans committed
180
static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue, guint id);
181
static void gst_single_queue_free (GstSingleQueue * squeue);
182 183

static void wake_up_next_non_linked (GstMultiQueue * mq);
184
static void compute_high_id (GstMultiQueue * mq);
185
static void compute_high_time (GstMultiQueue * mq);
186
static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
187
static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
188

Wim Taymans's avatar
Wim Taymans committed
189
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink_%u",
190 191 192 193
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);

Wim Taymans's avatar
Wim Taymans committed
194
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src_%u",
195 196 197 198 199 200 201
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS_ANY);

GST_DEBUG_CATEGORY_STATIC (multi_queue_debug);
#define GST_CAT_DEFAULT (multi_queue_debug)

Wim Taymans's avatar
Wim Taymans committed
202 203 204 205 206 207 208 209
/* Indices into gst_multi_queue_signals[] */
enum
{
  SIGNAL_UNDERRUN = 0,
  SIGNAL_OVERRUN,
  LAST_SIGNAL                   /* number of signals, keep last */
};

210 211 212 213
/* Default limits: we try to keep up to 2 seconds of data and, if no time
 * information is available, up to 10 MB. The number of buffers is dynamically
 * scaled to make sure there is data in the queues. Normally, the byte and
 * time limits are not hit in these conditions.
 *
 * Expression macros are fully parenthesized so that they evaluate as a unit
 * when used inside a larger expression. */
#define DEFAULT_MAX_SIZE_BYTES (10 * 1024 * 1024)       /* 10 MB */
#define DEFAULT_MAX_SIZE_BUFFERS 5
#define DEFAULT_MAX_SIZE_TIME (2 * GST_SECOND)

/* Second limits. When we hit one of the above limits we are probably dealing
 * with a badly muxed file and we scale the limits to these emergency values.
 * This is currently not yet implemented.
 * Since we dynamically scale the queue buffer size up to the limits but avoid
 * going above the max-size-buffers when we can, we don't really need this
 * additional extra size. */
#define DEFAULT_EXTRA_SIZE_BYTES (10 * 1024 * 1024)     /* 10 MB */
#define DEFAULT_EXTRA_SIZE_BUFFERS 5
#define DEFAULT_EXTRA_SIZE_TIME (3 * GST_SECOND)

#define DEFAULT_USE_BUFFERING FALSE
#define DEFAULT_LOW_PERCENT   10
#define DEFAULT_HIGH_PERCENT  99
#define DEFAULT_SYNC_BY_RUNNING_TIME FALSE
232

233 234
/* GObject property identifiers; 0 is reserved by GObject */
enum
{
  PROP_0 = 0,
  PROP_EXTRA_SIZE_BYTES,
  PROP_EXTRA_SIZE_BUFFERS,
  PROP_EXTRA_SIZE_TIME,
  PROP_MAX_SIZE_BYTES,
  PROP_MAX_SIZE_BUFFERS,
  PROP_MAX_SIZE_TIME,
  PROP_USE_BUFFERING,
  PROP_LOW_PERCENT,
  PROP_HIGH_PERCENT,
  PROP_SYNC_BY_RUNNING_TIME,
  PROP_LAST                     /* keep last */
};

/* Take / release the multiqueue-wide lock (the qlock member of @q).
 * The argument is parenthesized so that any pointer expression can be
 * passed, not just a plain identifier. */
#define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START {                          \
  g_mutex_lock (&(q)->qlock);                                                 \
} G_STMT_END

#define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                        \
  g_mutex_unlock (&(q)->qlock);                                               \
} G_STMT_END

static void gst_multi_queue_finalize (GObject * object);
static void gst_multi_queue_set_property (GObject * object,
    guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_multi_queue_get_property (GObject * object,
    guint prop_id, GValue * value, GParamSpec * pspec);

static GstPad *gst_multi_queue_request_new_pad (GstElement * element,
264
    GstPadTemplate * temp, const gchar * name, const GstCaps * caps);
265
static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad);
266 267
static GstStateChangeReturn gst_multi_queue_change_state (GstElement *
    element, GstStateChange transition);
268

269 270
static void gst_multi_queue_loop (GstPad * pad);

271
#define _do_init \
272
  GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element");
273 274 275
#define gst_multi_queue_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstMultiQueue, gst_multi_queue, GST_TYPE_ELEMENT,
    _do_init);
276 277 278 279 280 281 282 283 284

static guint gst_multi_queue_signals[LAST_SIGNAL] = { 0 };

/* Class initialisation: hooks up the GObject and GstElement vmethods and
 * registers the signals, properties, element details and pad templates. */
static void
gst_multi_queue_class_init (GstMultiQueueClass * klass)
{
  GObjectClass *object_klass = G_OBJECT_CLASS (klass);
  GstElementClass *element_klass = GST_ELEMENT_CLASS (klass);
  /* every property of this element is read/write with static strings */
  const GParamFlags rw_flags = G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS;
  GParamSpec *pspec;

  /* GObject vmethods */
  object_klass->set_property = gst_multi_queue_set_property;
  object_klass->get_property = gst_multi_queue_get_property;
  object_klass->finalize = gst_multi_queue_finalize;

  /* SIGNALS */

  /**
   * GstMultiQueue::underrun:
   * @multiqueue: the multiqueue instance
   *
   * This signal is emitted from the streaming thread when there is
   * no data in any of the queues inside the multiqueue instance (underrun).
   *
   * This indicates either starvation or EOS from the upstream data sources.
   */
  gst_multi_queue_signals[SIGNAL_UNDERRUN] =
      g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
      G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);

  /**
   * GstMultiQueue::overrun:
   * @multiqueue: the multiqueue instance
   *
   * Reports that one of the queues in the multiqueue is full (overrun).
   * A queue is full if the total amount of data inside it (num-buffers, time,
   * size) is higher than the boundary values which can be set through the
   * GObject properties.
   *
   * This can be used as an indicator of pre-roll.
   */
  gst_multi_queue_signals[SIGNAL_OVERRUN] =
      g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
      G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);

  /* PROPERTIES */

  /* hard limits */
  pspec = g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
      "Max. amount of data in the queue (bytes, 0=disable)",
      0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, rw_flags);
  g_object_class_install_property (object_klass, PROP_MAX_SIZE_BYTES, pspec);

  pspec = g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
      "Max. number of buffers in the queue (0=disable)",
      0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, rw_flags);
  g_object_class_install_property (object_klass, PROP_MAX_SIZE_BUFFERS, pspec);

  pspec = g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
      "Max. amount of data in the queue (in ns, 0=disable)",
      0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, rw_flags);
  g_object_class_install_property (object_klass, PROP_MAX_SIZE_TIME, pspec);

  /* extra limits (advertised as not implemented) */
  pspec = g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
      "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)"
      " (NOT IMPLEMENTED)", 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, rw_flags);
  g_object_class_install_property (object_klass, PROP_EXTRA_SIZE_BYTES, pspec);

  pspec = g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
      "Amount of buffers the queues can grow if one of them is empty (0=disable)"
      " (NOT IMPLEMENTED)", 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, rw_flags);
  g_object_class_install_property (object_klass, PROP_EXTRA_SIZE_BUFFERS,
      pspec);

  pspec = g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
      "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)"
      " (NOT IMPLEMENTED)", 0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, rw_flags);
  g_object_class_install_property (object_klass, PROP_EXTRA_SIZE_TIME, pspec);

  /**
   * GstMultiQueue:use-buffering:
   *
   * Enable the buffering option in multiqueue so that BUFFERING messages are
   * emitted based on low-/high-percent thresholds.
   *
   * Since: 0.10.26
   */
  pspec = g_param_spec_boolean ("use-buffering", "Use buffering",
      "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
      DEFAULT_USE_BUFFERING, rw_flags);
  g_object_class_install_property (object_klass, PROP_USE_BUFFERING, pspec);

  /**
   * GstMultiQueue:low-percent:
   *
   * Low threshold percent for buffering to start.
   *
   * Since: 0.10.26
   */
  pspec = g_param_spec_int ("low-percent", "Low percent",
      "Low threshold for buffering to start", 0, 100,
      DEFAULT_LOW_PERCENT, rw_flags);
  g_object_class_install_property (object_klass, PROP_LOW_PERCENT, pspec);

  /**
   * GstMultiQueue:high-percent:
   *
   * High threshold percent for buffering to finish.
   *
   * Since: 0.10.26
   */
  pspec = g_param_spec_int ("high-percent", "High percent",
      "High threshold for buffering to finish", 0, 100,
      DEFAULT_HIGH_PERCENT, rw_flags);
  g_object_class_install_property (object_klass, PROP_HIGH_PERCENT, pspec);

  /**
   * GstMultiQueue:sync-by-running-time:
   *
   * If enabled multiqueue will synchronize deactivated or not-linked streams
   * to the activated and linked streams by taking the running time.
   * Otherwise multiqueue will synchronize the deactivated or not-linked
   * streams by keeping the order in which buffers and events arrived compared
   * to active and linked streams.
   *
   * Since: 0.10.36
   */
  pspec = g_param_spec_boolean ("sync-by-running-time", "Sync By Running Time",
      "Synchronize deactivated or not-linked streams by running time",
      DEFAULT_SYNC_BY_RUNNING_TIME, rw_flags);
  g_object_class_install_property (object_klass, PROP_SYNC_BY_RUNNING_TIME,
      pspec);

  /* element metadata and pad templates */
  gst_element_class_set_details_simple (element_klass,
      "MultiQueue",
      "Generic", "Multiple data queue", "Edward Hervey <edward@fluendo.com>");
  gst_element_class_add_pad_template (element_klass,
      gst_static_pad_template_get (&sinktemplate));
  gst_element_class_add_pad_template (element_klass,
      gst_static_pad_template_get (&srctemplate));

  /* GstElement vmethods */
  element_klass->request_new_pad =
      GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
  element_klass->release_pad = GST_DEBUG_FUNCPTR (gst_multi_queue_release_pad);
  element_klass->change_state =
      GST_DEBUG_FUNCPTR (gst_multi_queue_change_state);
}

static void
427
gst_multi_queue_init (GstMultiQueue * mqueue)
428 429 430 431 432 433 434 435 436 437 438 439
{
  mqueue->nbqueues = 0;
  mqueue->queues = NULL;

  mqueue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES;
  mqueue->max_size.visible = DEFAULT_MAX_SIZE_BUFFERS;
  mqueue->max_size.time = DEFAULT_MAX_SIZE_TIME;

  mqueue->extra_size.bytes = DEFAULT_EXTRA_SIZE_BYTES;
  mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS;
  mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME;

440
  mqueue->use_buffering = DEFAULT_USE_BUFFERING;
441 442
  mqueue->low_percent = DEFAULT_LOW_PERCENT;
  mqueue->high_percent = DEFAULT_HIGH_PERCENT;
443

444 445
  mqueue->sync_by_running_time = DEFAULT_SYNC_BY_RUNNING_TIME;

446
  mqueue->counter = 1;
447
  mqueue->highid = -1;
448
  mqueue->high_time = GST_CLOCK_TIME_NONE;
449

Wim Taymans's avatar
Wim Taymans committed
450
  g_mutex_init (&mqueue->qlock);
451 452 453 454 455 456 457
}

/* GObject finalize: frees every remaining single queue, releases the
 * instance lock and chains up to the parent class. */
static void
gst_multi_queue_finalize (GObject * object)
{
  GstMultiQueue *mqueue = GST_MULTI_QUEUE (object);

  /* Free each single queue together with its list node. Unlike
   * g_list_foreach() with a GFunc cast, the per-element callback is invoked
   * with exactly one argument, which matches gst_single_queue_free(). */
  g_list_free_full (mqueue->queues, (GDestroyNotify) gst_single_queue_free);
  mqueue->queues = NULL;
  /* the list changed */
  mqueue->queues_cookie++;

  /* free/unref instance data */
  g_mutex_clear (&mqueue->qlock);

  G_OBJECT_CLASS (parent_class)->finalize (object);
}

469 470 471 472
/* Copy mq->max_size.<format> into the max_size of every single queue in
 * mq->queues. @format is the member name (e.g. bytes, visible, time).
 * The caller is expected to hold the multiqueue lock, as all users in
 * set_property do.
 *
 * The macro's own locals carry a leading underscore so they cannot shadow
 * whatever identifier is passed as @mq, and @mq is parenthesized so any
 * pointer expression works. */
#define SET_CHILD_PROPERTY(mq,format) G_STMT_START {                    \
    GList *_walk;                                                       \
    for (_walk = (mq)->queues; _walk; _walk = g_list_next (_walk)) {    \
      GstSingleQueue *_sq = (GstSingleQueue *) _walk->data;             \
      _sq->max_size.format = (mq)->max_size.format;                     \
    }                                                                   \
} G_STMT_END

/* GObject set_property implementation.
 *
 * The three max-size limits are stored and pushed down to every existing
 * single queue while holding the multiqueue lock; all other properties are
 * simply stored on the instance. */
static void
gst_multi_queue_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec)
{
  GstMultiQueue *self = GST_MULTI_QUEUE (object);

  switch (prop_id) {
      /* limits mirrored into the single queues */
    case PROP_MAX_SIZE_BYTES:
      GST_MULTI_QUEUE_MUTEX_LOCK (self);
      self->max_size.bytes = g_value_get_uint (value);
      SET_CHILD_PROPERTY (self, bytes);
      GST_MULTI_QUEUE_MUTEX_UNLOCK (self);
      break;
    case PROP_MAX_SIZE_BUFFERS:
      GST_MULTI_QUEUE_MUTEX_LOCK (self);
      self->max_size.visible = g_value_get_uint (value);
      SET_CHILD_PROPERTY (self, visible);
      GST_MULTI_QUEUE_MUTEX_UNLOCK (self);
      break;
    case PROP_MAX_SIZE_TIME:
      GST_MULTI_QUEUE_MUTEX_LOCK (self);
      self->max_size.time = g_value_get_uint64 (value);
      SET_CHILD_PROPERTY (self, time);
      GST_MULTI_QUEUE_MUTEX_UNLOCK (self);
      break;

      /* extra limits, only stored */
    case PROP_EXTRA_SIZE_BYTES:
      self->extra_size.bytes = g_value_get_uint (value);
      break;
    case PROP_EXTRA_SIZE_BUFFERS:
      self->extra_size.visible = g_value_get_uint (value);
      break;
    case PROP_EXTRA_SIZE_TIME:
      self->extra_size.time = g_value_get_uint64 (value);
      break;

      /* buffering configuration */
    case PROP_USE_BUFFERING:
      self->use_buffering = g_value_get_boolean (value);
      break;
    case PROP_LOW_PERCENT:
      self->low_percent = g_value_get_int (value);
      break;
    case PROP_HIGH_PERCENT:
      self->high_percent = g_value_get_int (value);
      break;

    case PROP_SYNC_BY_RUNNING_TIME:
      self->sync_by_running_time = g_value_get_boolean (value);
      break;

    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

/* GObject get_property implementation: copies the requested instance field
 * into @value. The whole lookup runs with the multiqueue lock held. */
static void
gst_multi_queue_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec)
{
  GstMultiQueue *self = GST_MULTI_QUEUE (object);

  GST_MULTI_QUEUE_MUTEX_LOCK (self);

  switch (prop_id) {
      /* extra limits */
    case PROP_EXTRA_SIZE_BYTES:
      g_value_set_uint (value, self->extra_size.bytes);
      break;
    case PROP_EXTRA_SIZE_BUFFERS:
      g_value_set_uint (value, self->extra_size.visible);
      break;
    case PROP_EXTRA_SIZE_TIME:
      g_value_set_uint64 (value, self->extra_size.time);
      break;

      /* hard limits */
    case PROP_MAX_SIZE_BYTES:
      g_value_set_uint (value, self->max_size.bytes);
      break;
    case PROP_MAX_SIZE_BUFFERS:
      g_value_set_uint (value, self->max_size.visible);
      break;
    case PROP_MAX_SIZE_TIME:
      g_value_set_uint64 (value, self->max_size.time);
      break;

      /* buffering configuration */
    case PROP_USE_BUFFERING:
      g_value_set_boolean (value, self->use_buffering);
      break;
    case PROP_LOW_PERCENT:
      g_value_set_int (value, self->low_percent);
      break;
    case PROP_HIGH_PERCENT:
      g_value_set_int (value, self->high_percent);
      break;

    case PROP_SYNC_BY_RUNNING_TIME:
      g_value_set_boolean (value, self->sync_by_running_time);
      break;

    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }

  GST_MULTI_QUEUE_MUTEX_UNLOCK (self);
}

577
static GstIterator *
Wim Taymans's avatar
Wim Taymans committed
578
gst_multi_queue_iterate_internal_links (GstPad * pad, GstObject * parent)
579
{
580 581 582
  GstIterator *it = NULL;
  GstPad *opad;
  GstSingleQueue *squeue;
Wim Taymans's avatar
Wim Taymans committed
583
  GstMultiQueue *mq = GST_MULTI_QUEUE (parent);
584
  GValue val = { 0, };
585

586 587 588 589
  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
  squeue = gst_pad_get_element_private (pad);
  if (!squeue)
    goto out;
590

591 592 593 594 595 596
  if (squeue->sinkpad == pad)
    opad = gst_object_ref (squeue->srcpad);
  else if (squeue->srcpad == pad)
    opad = gst_object_ref (squeue->sinkpad);
  else
    goto out;
597

598 599 600 601
  g_value_init (&val, GST_TYPE_PAD);
  g_value_set_object (&val, opad);
  it = gst_iterator_new_single (GST_TYPE_PAD, &val);
  g_value_unset (&val);
602

603
  gst_object_unref (opad);
604

605 606
out:
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
607

608
  return it;
609 610
}

611

612 613 614 615 616 617
/*
 * GstElement methods
 */

static GstPad *
gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp,
618
    const gchar * name, const GstCaps * caps)
619 620 621
{
  GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
  GstSingleQueue *squeue;
Wim Taymans's avatar
Wim Taymans committed
622
  guint temp_id = -1;
623

624
  if (name) {
Wim Taymans's avatar
Wim Taymans committed
625
    sscanf (name + 4, "_%u", &temp_id);
626
    GST_LOG_OBJECT (element, "name : %s (id %d)", GST_STR_NULL (name), temp_id);
627
  }
628 629

  /* Create a new single queue, add the sink and source pad and return the sink pad */
630
  squeue = gst_single_queue_new (mqueue, temp_id);
631 632 633 634

  GST_DEBUG_OBJECT (mqueue, "Returning pad %s:%s",
      GST_DEBUG_PAD_NAME (squeue->sinkpad));

635
  return squeue ? squeue->sinkpad : NULL;
636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657
}

/* GstElement release_pad implementation.
 *
 * Looks up the single queue whose sink pad is @pad, unlinks it from the
 * multiqueue's list under the lock and then tears it down: the data queue is
 * set flushing, both pads are deactivated, detached from the single queue,
 * removed from the element, and finally the single queue is freed. */
static void
gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
{
  GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
  GstSingleQueue *sq = NULL;
  GList *link;

  GST_LOG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));

  GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);

  /* Find which single queue it belongs to, knowing that it should be a sinkpad */
  link = mqueue->queues;
  while (link != NULL) {
    sq = (GstSingleQueue *) link->data;
    if (sq->sinkpad == pad)
      break;
    link = g_list_next (link);
  }

  if (link == NULL) {
    GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
    GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
    return;
  }

  /* FIXME: The removal of the singlequeue should probably not happen until it
   * finishes draining */

  /* take it out of the list and flag the list as changed */
  mqueue->queues = g_list_delete_link (mqueue->queues, link);
  mqueue->queues_cookie++;

  /* FIXME : recompute next-non-linked */
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);

  /* tear the single queue down, outside of the lock */
  gst_data_queue_set_flushing (sq->queue, TRUE);

  gst_pad_set_active (sq->srcpad, FALSE);
  gst_pad_set_active (sq->sinkpad, FALSE);

  gst_pad_set_element_private (sq->srcpad, NULL);
  gst_pad_set_element_private (sq->sinkpad, NULL);

  gst_element_remove_pad (element, sq->srcpad);
  gst_element_remove_pad (element, sq->sinkpad);

  gst_single_queue_free (sq);
}

684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711
/* GstElement change_state implementation.
 *
 * READY->PAUSED clears the flushing flag of every single queue.
 * PAUSED->READY sets the flag and signals each queue's 'turn' condition.
 * Both happen under the multiqueue lock and before chaining up to the
 * parent class, whose result is returned. */
static GstStateChangeReturn
gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
{
  GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
  GList *walk;

  if (transition == GST_STATE_CHANGE_READY_TO_PAUSED) {
    /* Set all pads to non-flushing */
    GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
    for (walk = mqueue->queues; walk != NULL; walk = g_list_next (walk)) {
      GstSingleQueue *sq = (GstSingleQueue *) walk->data;

      sq->flushing = FALSE;
    }
    GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
  } else if (transition == GST_STATE_CHANGE_PAUSED_TO_READY) {
    /* Un-wait all waiting pads */
    GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
    for (walk = mqueue->queues; walk != NULL; walk = g_list_next (walk)) {
      GstSingleQueue *sq = (GstSingleQueue *) walk->data;

      sq->flushing = TRUE;
      g_cond_signal (&sq->turn);
    }
    GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
  }

  return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
}

734
static gboolean
735 736 737 738 739 740 741 742
gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
{
  gboolean result;

  GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"),
      sq->id);

  if (flush) {
743
    sq->srcresult = GST_FLOW_FLUSHING;
744 745
    gst_data_queue_set_flushing (sq->queue, TRUE);

746 747
    sq->flushing = TRUE;

748 749 750 751
    /* wake up non-linked task */
    GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
        sq->id);
    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
Wim Taymans's avatar
Wim Taymans committed
752
    g_cond_signal (&sq->turn);
753 754 755 756
    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);

    GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
    result = gst_pad_pause_task (sq->srcpad);
757
    sq->sink_tainted = sq->src_tainted = TRUE;
758 759 760 761
  } else {
    gst_data_queue_flush (sq->queue);
    gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
    gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
762
    /* All pads start off not-linked for a smooth kick-off */
763
    sq->srcresult = GST_FLOW_OK;
764
    sq->cur_time = 0;
765 766
    sq->max_size.visible = mq->max_size.visible;
    sq->is_eos = FALSE;
767 768
    sq->nextid = 0;
    sq->oldid = 0;
769
    sq->last_oldid = G_MAXUINT32;
770 771
    sq->next_time = GST_CLOCK_TIME_NONE;
    sq->last_time = GST_CLOCK_TIME_NONE;
772 773
    gst_data_queue_set_flushing (sq->queue, FALSE);

774 775 776 777 778
    /* Reset high time to be recomputed next */
    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
    mq->high_time = GST_CLOCK_TIME_NONE;
    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);

779 780
    sq->flushing = FALSE;

781 782 783 784 785 786 787 788
    GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
    result =
        gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
        sq->srcpad);
  }
  return result;
}

789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826
/* Recompute the fill percentage of @sq and, when @mq is in buffering mode,
 * post a BUFFERING message whenever the reported state changes.
 *
 * The fill level is 100 once @sq is EOS, otherwise the larger of the time
 * and byte fill ratios (each only considered when its limit is non-zero).
 * While buffering, the reported value never decreases and buffering ends
 * once the level reaches mq->high_percent; while not buffering, dropping
 * below mq->low_percent starts buffering again.
 *
 * Invoked from update_time_level(), which is documented as running with the
 * multiqueue lock taken. */
static void
update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
{
  GstDataQueueSize size;
  gint percent, tmp;
  gboolean post = FALSE;

  /* nothing to do when we are not in buffering mode */
  if (!mq->use_buffering)
    return;

  gst_data_queue_get_level (sq->queue, &size);

  GST_DEBUG_OBJECT (mq,
      "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
      G_GUINT64_FORMAT, sq->id, size.visible, sq->max_size.visible,
      size.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);

  /* get bytes and time percentages and take the max */
  if (sq->is_eos) {
    percent = 100;
  } else {
    percent = 0;
    if (sq->max_size.time > 0) {
      tmp = (sq->cur_time * 100) / sq->max_size.time;
      percent = MAX (percent, tmp);
    }
    if (sq->max_size.bytes > 0) {
      tmp = (size.bytes * 100) / sq->max_size.bytes;
      percent = MAX (percent, tmp);
    }
  }

  if (mq->buffering) {
    post = TRUE;
    if (percent >= mq->high_percent) {
      mq->buffering = FALSE;
    }
    /* make sure it increases */
    percent = MAX (mq->percent, percent);

    if (percent == mq->percent)
      /* don't post if nothing changed */
      post = FALSE;
    else
      /* else keep last value we posted */
      mq->percent = percent;
  } else {
    if (percent < mq->low_percent) {
      mq->buffering = TRUE;
      mq->percent = percent;
      post = TRUE;
    }
  }
  if (post) {
    GstMessage *message;

    /* scale to high percent so that it becomes the 100% mark. A high_percent
     * of 0 cannot be used as a divisor; every fill level already satisfies
     * such a threshold, so report the queue as completely buffered. */
    if (mq->high_percent > 0)
      percent = percent * 100 / mq->high_percent;
    else
      percent = 100;
    /* clip */
    if (percent > 100)
      percent = 100;

    GST_DEBUG_OBJECT (mq, "buffering %d percent", percent);
    message = gst_message_new_buffering (GST_OBJECT_CAST (mq), percent);

    gst_element_post_message (GST_ELEMENT_CAST (mq), message);
  } else {
    GST_DEBUG_OBJECT (mq, "filled %d percent", percent);
  }
}

861
/* calculate the diff between running time on the sink and src of the queue.
862 863
 * This is the total amount of time in the queue. 
 * WITH LOCK TAKEN */
864 865 866 867 868
static void
update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
{
  gint64 sink_time, src_time;

869 870 871
  if (sq->sink_tainted) {
    sink_time = sq->sinktime =
        gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
Wim Taymans's avatar
Wim Taymans committed
872
        sq->sink_segment.position);
873 874 875 876

    if (G_UNLIKELY (sink_time != GST_CLOCK_TIME_NONE))
      /* if we have a time, we become untainted and use the time */
      sq->sink_tainted = FALSE;
877 878 879 880 881 882
  } else
    sink_time = sq->sinktime;

  if (sq->src_tainted) {
    src_time = sq->srctime =
        gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME,
Wim Taymans's avatar
Wim Taymans committed
883
        sq->src_segment.position);
884 885 886
    /* if we have a time, we become untainted and use the time */
    if (G_UNLIKELY (src_time != GST_CLOCK_TIME_NONE))
      sq->src_tainted = FALSE;
887 888
  } else
    src_time = sq->srctime;
889 890 891 892 893

  GST_DEBUG_OBJECT (mq,
      "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
      GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));

894
  /* This allows for streams with out of order timestamping - sometimes the
895
   * emerging timestamp is later than the arriving one(s) */
896 897 898 899
  if (G_LIKELY (sink_time != -1 && src_time != -1 && sink_time > src_time))
    sq->cur_time = sink_time - src_time;
  else
    sq->cur_time = 0;
900

901 902
  /* updating the time level can change the buffering state */
  update_buffering (mq, sq);
903

904
  return;
905 906
}

Wim Taymans's avatar
Wim Taymans committed
907
/* Copy the segment carried by the SEGMENT @event into @segment and refresh
 * the time level of @sq. @segment is compared against &sq->sink_segment to
 * decide which side gets marked as tainted; any other pointer is treated as
 * the src side. Takes the multiqueue lock itself. */
static void
apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
    GstSegment * segment)
{
  gst_event_copy_segment (event, segment);

  /* Levels are tracked in time only. A segment in any other format is
   * replaced by a time segment that starts at 0 and has no known stop. */
  if (segment->format != GST_FORMAT_TIME) {
    segment->format = GST_FORMAT_TIME;
    segment->time = 0;
    segment->start = 0;
    segment->stop = -1;
  }

  GST_MULTI_QUEUE_MUTEX_LOCK (mq);

  /* force the matching running time to be recomputed */
  if (segment != &sq->sink_segment) {
    sq->src_tainted = TRUE;
  } else {
    sq->sink_tainted = TRUE;
  }

  GST_DEBUG_OBJECT (mq,
      "queue %d, configured SEGMENT %" GST_SEGMENT_FORMAT, sq->id, segment);

  /* a new segment can change the amount of time held in the queue */
  update_time_level (mq, sq);

  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}

/* take a buffer and update segment, updating the time level of the queue. */
static void
943 944
apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
    GstClockTime duration, GstSegment * segment)
945
{
946 947
  GST_MULTI_QUEUE_MUTEX_LOCK (mq);

948 949 950
  /* if no timestamp is set, assume it's continuous with the previous 
   * time */
  if (timestamp == GST_CLOCK_TIME_NONE)
Wim Taymans's avatar
Wim Taymans committed
951
    timestamp = segment->position;
952 953 954 955 956

  /* add duration */
  if (duration != GST_CLOCK_TIME_NONE)
    timestamp += duration;

Wim Taymans's avatar
Wim Taymans committed
957
  GST_DEBUG_OBJECT (mq, "queue %d, position updated to %" GST_TIME_FORMAT,
958 959
      sq->id, GST_TIME_ARGS (timestamp));

Wim Taymans's avatar
Wim Taymans committed
960
  segment->position = timestamp;
961

962 963 964 965 966
  if (segment == &sq->sink_segment)
    sq->sink_tainted = TRUE;
  else
    sq->src_tainted = TRUE;

967 968
  /* calc diff with other end */
  update_time_level (mq, sq);
969
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
970 971
}

972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989
/* Return the running time of @object relative to @segment, or
 * GST_CLOCK_TIME_NONE when none can be determined.
 *
 * - Buffer: running time of its timestamp (plus its duration when @end is
 *   TRUE and the duration is valid), clamped to segment->stop.
 * - Buffer list: with @end FALSE only the first buffer is looked at; with
 *   @end TRUE the value of the last buffer carrying a valid timestamp is
 *   returned.
 * - SEGMENT event in time format: running time of the start of the segment
 *   carried by the event (computed against that segment, not @segment).
 * - Anything else: GST_CLOCK_TIME_NONE. */
static GstClockTime
get_running_time (GstSegment * segment, GstMiniObject * object, gboolean end)
{
  GstClockTime running = GST_CLOCK_TIME_NONE;

  if (GST_IS_BUFFER (object)) {
    GstBuffer *buffer = GST_BUFFER_CAST (object);

    if (GST_BUFFER_TIMESTAMP_IS_VALID (buffer)) {
      running = GST_BUFFER_TIMESTAMP (buffer);
      if (end && GST_BUFFER_DURATION_IS_VALID (buffer)) {
        running += GST_BUFFER_DURATION (buffer);
      }
      /* never look past the end of the segment */
      running = MIN (running, segment->stop);
      running =
          gst_segment_to_running_time (segment, GST_FORMAT_TIME, running);
    }
  } else if (GST_IS_BUFFER_LIST (object)) {
    GstBufferList *buffers = GST_BUFFER_LIST_CAST (object);
    gint idx, count;

    count = gst_buffer_list_length (buffers);
    for (idx = 0; idx < count; idx++) {
      GstBuffer *entry = gst_buffer_list_get (buffers, idx);

      if (GST_BUFFER_TIMESTAMP_IS_VALID (entry)) {
        running = GST_BUFFER_TIMESTAMP (entry);
        if (end && GST_BUFFER_DURATION_IS_VALID (entry)) {
          running += GST_BUFFER_DURATION (entry);
        }
        /* never look past the end of the segment */
        running = MIN (running, segment->stop);
        running =
            gst_segment_to_running_time (segment, GST_FORMAT_TIME, running);
      }

      /* for the start time only the first buffer matters, whether or not it
       * carried a timestamp */
      if (!end) {
        break;
      }
    }
  } else if (GST_IS_EVENT (object)) {
    GstEvent *event = GST_EVENT_CAST (object);

    /* a SEGMENT event maps to the running time of its start position */
    if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) {
      const GstSegment *carried;

      gst_event_parse_segment (event, &carried);
      if (carried->format == GST_FORMAT_TIME) {
        running =
            gst_segment_to_running_time (carried, GST_FORMAT_TIME,
            carried->start);
      }
    }
  }

  return running;
}

1029
static GstFlowReturn
1030 1031 1032
gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
    GstMiniObject * object)
{
1033 1034
  GstFlowReturn result = GST_FLOW_OK;

1035
  if (GST_IS_BUFFER (object)) {
1036
    GstBuffer *buffer;
1037
    GstClockTime timestamp, duration;
1038

1039
    buffer = GST_BUFFER_CAST (object);
1040 1041
    timestamp = GST_BUFFER_TIMESTAMP (buffer);
    duration = GST_BUFFER_DURATION (buffer);
1042

1043
    apply_buffer (mq, sq, timestamp, duration, &sq->src_segment);
1044

1045 1046 1047
    /* Applying the buffer may have made the queue non-full again, unblock it if needed */
    gst_data_queue_limits_changed (sq->queue);

1048 1049 1050 1051
    GST_DEBUG_OBJECT (mq,
        "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
        sq->id, buffer, GST_TIME_ARGS (timestamp));

1052
    result = gst_pad_push (sq->srcpad, buffer);
1053
  } else if (GST_IS_EVENT (object)) {
1054 1055
    GstEvent *event;

1056 1057 1058 1059
    event = GST_EVENT_CAST (object);

    switch (GST_EVENT_TYPE (event)) {
      case GST_EVENT_EOS:
1060
        result = GST_FLOW_EOS;
1061
        break;
Wim Taymans's avatar
Wim Taymans committed
1062
      case GST_EVENT_SEGMENT:
1063
        apply_segment (mq, sq, event, &sq->src_segment);
1064 1065
        /* Applying the segment may have made the queue non-full again, unblock it if needed */
        gst_data_queue_limits_changed (sq->queue);
1066 1067 1068
        break;
      default:
        break;
1069
    }
1070

1071 1072 1073 1074
    GST_DEBUG_OBJECT (mq,
        "SingleQueue %d : Pushing event %p of type %s",
        sq->id, event, GST_EVENT_TYPE_NAME (event));

1075
    gst_pad_push_event (sq->srcpad, event);
1076 1077 1078 1079
  } else {
    g_warning ("Unexpected object in singlequeue %d (refcounting problem?)",
        sq->id);
  }
1080
  return result;
1081

1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093
  /* ERRORS */
}

/* Detach and return the mini object stored in @item. The item's object
 * pointer is cleared, so gst_multi_queue_item_destroy() will not unref the
 * returned object. */
static GstMiniObject *
gst_multi_queue_item_steal_object (GstMultiQueueItem * item)
{
  GstMiniObject *stolen = item->object;

  item->object = NULL;

  return stolen;
}

/* Release @item: unref the mini object it still holds, if any, and return
 * the item itself to the slice allocator. */
static void
gst_multi_queue_item_destroy (GstMultiQueueItem * item)
{
  if (item->object != NULL) {
    gst_mini_object_unref (item->object);
  }

  g_slice_free (GstMultiQueueItem, item);
}

1104
/* takes ownership of passed mini object! */
1105
static GstMultiQueueItem *
1106