gstdataqueue.c 23.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/* GStreamer
 * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
 *
 * gstdataqueue.c:
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */

/**
 * SECTION:gstdataqueue
 * @title: GstDataQueue
 * @short_description: Threadsafe queueing object
 *
 * #GstDataQueue is an object that handles threadsafe queueing of objects. It
 * also provides size-related functionality. This object should be used for
 * any #GstElement that wishes to provide some sort of queueing functionality.
 */
31 32 33
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
34 35

#include <gst/gst.h>
36
#include "string.h"
37
#include "gstdataqueue.h"
38
#include "gstqueuearray.h"
39
#include "gst/glib-compat-private.h"
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55

/* Category for the general GST_DEBUG() messages; it is this file's default. */
GST_DEBUG_CATEGORY_STATIC (data_queue_debug);
/* Category used by the lock tracing macros and by STATUS() level logging. */
GST_DEBUG_CATEGORY_STATIC (data_queue_dataflow);
#define GST_CAT_DEFAULT (data_queue_debug)


/* Indices into gst_data_queue_signals[]; LAST_SIGNAL is the table size. */
enum
{
  SIGNAL_EMPTY = 0,
  SIGNAL_FULL,
  LAST_SIGNAL
};

/* GObject property ids; PROP_0 is only a placeholder for the value 0 and is
 * never installed as a property in this file. */
enum
{
  PROP_0 = 0,
  PROP_CUR_LEVEL_VISIBLE,
  PROP_CUR_LEVEL_BYTES,
  PROP_CUR_LEVEL_TIME
};

63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
/* Private instance data of a #GstDataQueue. Apart from the members set once
 * in gst_data_queue_new(), the code in this file accesses it with @qlock
 * held. */
struct _GstDataQueuePrivate
{
  /* the array of data we're keeping our grubby hands on */
  GstQueueArray *queue;

  GstDataQueueSize cur_level;   /* size of the queue */
  GstDataQueueCheckFullFunction checkfull;      /* Callback to check if the queue is full */
  gpointer checkdata;           /* user data handed to checkfull, fullcallback
                                 * and emptycallback; a plain pointer, not a
                                 * pointer to a pointer */

  GMutex qlock;                 /* lock for queue (vs object lock) */
  gboolean waiting_add;         /* TRUE while a thread is blocked on item_add */
  GCond item_add;               /* signals buffers now available for reading */
  gboolean waiting_del;         /* TRUE while a thread is blocked on item_del */
  GCond item_del;               /* signals space now available for writing */
  gboolean flushing;            /* indicates whether conditions were signalled because
                                 * of external flushing */
  GstDataQueueFullCallback fullcallback;        /* called instead of emitting "full" when set */
  GstDataQueueEmptyCallback emptycallback;      /* called instead of emitting "empty" when set */
};

83
/* Takes the queue lock of @q, tracing the calling thread before and after the
 * lock is obtained. @q is parenthesized so that any pointer expression can be
 * passed. */
#define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START {                     \
    GST_CAT_TRACE (data_queue_dataflow,                                 \
        "locking qlock from thread %p",                                 \
        g_thread_self ());                                              \
    g_mutex_lock (&(q)->priv->qlock);                                   \
    GST_CAT_TRACE (data_queue_dataflow,                                 \
        "locked qlock from thread %p",                                  \
        g_thread_self ());                                              \
  } G_STMT_END

93 94
/* Takes the queue lock of @q and, if the queue is flushing, jumps to @label
 * with the lock still held: the code at @label is responsible for unlocking.
 * @q is parenthesized so that any pointer expression can be passed. */
#define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START {        \
    GST_DATA_QUEUE_MUTEX_LOCK (q);                                      \
    if ((q)->priv->flushing)                                            \
      goto label;                                                       \
  } G_STMT_END

/* Releases the queue lock of @q, tracing the calling thread first. @q is
 * parenthesized so that any pointer expression can be passed. */
#define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                   \
    GST_CAT_TRACE (data_queue_dataflow,                                 \
        "unlocking qlock from thread %p",                               \
        g_thread_self ());                                              \
    g_mutex_unlock (&(q)->priv->qlock);                                 \
  } G_STMT_END

106 107 108 109 110 111
/* Logs the current levels of @q (visible items, bytes, time and number of
 * queued elements), prefixed by the string literal @msg. Only the macro
 * parameter @q is used, so the macro no longer depends on the caller having
 * a variable that happens to be named `queue`. */
#define STATUS(q, msg)                                                  \
  GST_CAT_LOG (data_queue_dataflow,                                     \
               "queue:%p " msg ": %u visible items, %u "                \
               "bytes, %"G_GUINT64_FORMAT                               \
               " ns, %u elements",                                      \
               (q),                                                     \
               (q)->priv->cur_level.visible,                            \
               (q)->priv->cur_level.bytes,                              \
               (q)->priv->cur_level.time,                               \
               gst_queue_array_get_length ((q)->priv->queue))
116 117 118 119 120 121 122 123 124 125

/* GObject virtual methods implemented further down in this file */
static void gst_data_queue_get_property (GObject * object,
    guint prop_id, GValue * value, GParamSpec * pspec);
static void gst_data_queue_set_property (GObject * object,
    guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_data_queue_finalize (GObject * object);

/* signal ids, filled in by gst_data_queue_class_init() */
static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 };

126 127 128 129 130 131
#define _do_init \
{ \
  GST_DEBUG_CATEGORY_INIT (data_queue_debug, "dataqueue", 0, \
      "data queue object"); \
  GST_DEBUG_CATEGORY_INIT (data_queue_dataflow, "data_queue_dataflow", 0, \
      "dataflow inside the data queue object"); \
132 133
}

134
#define parent_class gst_data_queue_parent_class
135 136
G_DEFINE_TYPE_WITH_CODE (GstDataQueue, gst_data_queue, G_TYPE_OBJECT,
    G_ADD_PRIVATE (GstDataQueue) _do_init);
137

138 139 140 141 142
/* Class initializer: hooks up the GObject virtual methods and registers the
 * "empty"/"full" signals and the three read-only level properties. */
static void
gst_data_queue_class_init (GstDataQueueClass * klass)
{
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);

  gobject_class->set_property = gst_data_queue_set_property;
  gobject_class->get_property = gst_data_queue_get_property;
  gobject_class->finalize = gst_data_queue_finalize;

  /* signals */
  /**
   * GstDataQueue::empty: (skip)
   * @queue: the queue instance
   *
   * Reports that the queue became empty (empty).
   * Emitted by gst_data_queue_pop() and gst_data_queue_peek() when they find
   * no item in the queue and no empty callback was given to
   * gst_data_queue_new().
   */
  gst_data_queue_signals[SIGNAL_EMPTY] =
      g_signal_new ("empty", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
      G_STRUCT_OFFSET (GstDataQueueClass, empty), NULL, NULL,
      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);

  /**
   * GstDataQueue::full: (skip)
   * @queue: the queue instance
   *
   * Reports that the queue became full (full).
   * Emitted by gst_data_queue_push() when the #GstDataQueueCheckFullFunction
   * reports the queue as full and no full callback was given to
   * gst_data_queue_new().
   */
  gst_data_queue_signals[SIGNAL_FULL] =
      g_signal_new ("full", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
      G_STRUCT_OFFSET (GstDataQueueClass, full), NULL, NULL,
      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);

  /* properties (all read-only) */
  /* the value reported is cur_level.bytes, so the nick says bytes, not kB */
  g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
      g_param_spec_uint ("current-level-bytes", "Current level (bytes)",
          "Current amount of data in the queue (bytes)",
          0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_VISIBLE,
      g_param_spec_uint ("current-level-visible",
          "Current level (visible items)",
          "Current number of visible items in the queue", 0, G_MAXUINT, 0,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
      g_param_spec_uint64 ("current-level-time", "Current level (ns)",
          "Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0,
          G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
}

/* Instance initializer: attaches the private data and sets up an empty
 * queue together with its lock and its two condition variables. */
static void
gst_data_queue_init (GstDataQueue * queue)
{
  GstDataQueuePrivate *priv = gst_data_queue_get_instance_private (queue);

  queue->priv = priv;

  /* a fresh queue holds nothing and has no fullness check yet */
  priv->cur_level.visible = 0;
  priv->cur_level.bytes = 0;
  priv->cur_level.time = 0;
  priv->checkfull = NULL;

  g_mutex_init (&priv->qlock);
  g_cond_init (&priv->item_add);
  g_cond_init (&priv->item_del);

  /* backing array for the queued items */
  priv->queue = gst_queue_array_new (50);

  GST_DEBUG ("initialized queue's not_empty & not_full conditions");
}

/**
213
 * gst_data_queue_new: (skip)
214 215
 * @checkfull: the callback used to tell if the element considers the queue full
 * or not.
216 217
 * @fullcallback: the callback which will be called when the queue is considered full.
 * @emptycallback: the callback which will be called when the queue is considered empty.
218 219
 * @checkdata: a #gpointer that will be passed to the @checkfull, @fullcallback,
 *   and @emptycallback callbacks.
220
 *
221 222 223 224
 * Creates a new #GstDataQueue. If @fullcallback or @emptycallback are supplied, then
 * the #GstDataQueue will call the respective callback to signal full or empty condition.
 * If the callbacks are NULL the #GstDataQueue will instead emit 'full' and 'empty'
 * signals.
225
 *
226
 * Returns: a new #GstDataQueue.
227
 *
228
 * Since: 1.2
229 230
 */
GstDataQueue *
231
gst_data_queue_new (GstDataQueueCheckFullFunction checkfull,
232 233
    GstDataQueueFullCallback fullcallback,
    GstDataQueueEmptyCallback emptycallback, gpointer checkdata)
234 235 236 237 238
{
  GstDataQueue *ret;

  g_return_val_if_fail (checkfull != NULL, NULL);

239
  ret = g_object_new (GST_TYPE_DATA_QUEUE, NULL);
240 241 242 243
  ret->priv->checkfull = checkfull;
  ret->priv->checkdata = checkdata;
  ret->priv->fullcallback = fullcallback;
  ret->priv->emptycallback = emptycallback;
244 245 246 247 248 249 250

  return ret;
}

/* Empties the queue: pops every item and hands it to its own destroy
 * function, then resets all level counters to zero. Takes no lock itself. */
static void
gst_data_queue_cleanup (GstDataQueue * queue)
{
  GstQueueArray *array = queue->priv->queue;
  GstDataQueueSize *level = &queue->priv->cur_level;

  for (;;) {
    GstDataQueueItem *entry;

    if (gst_queue_array_is_empty (array))
      break;

    /* Just call the destroy notify on the item */
    entry = gst_queue_array_pop_head (array);
    entry->destroy (entry);
  }

  level->visible = 0;
  level->bytes = 0;
  level->time = 0;
}

/* GObject finalize implementation (called only once, as opposed to dispose):
 * destroys the remaining items, frees the backing array, clears the lock and
 * the condition variables, then chains up to the parent class. */
static void
gst_data_queue_finalize (GObject * object)
{
  GstDataQueue *self = GST_DATA_QUEUE (object);
  GstDataQueuePrivate *priv = self->priv;

  GST_DEBUG ("finalizing queue");

  /* destroy whatever is still queued before releasing the array itself */
  gst_data_queue_cleanup (self);
  gst_queue_array_free (priv->queue);

  GST_DEBUG ("free mutex");
  g_mutex_clear (&priv->qlock);
  GST_DEBUG ("done free mutex");

  g_cond_clear (&priv->item_add);
  g_cond_clear (&priv->item_del);

  G_OBJECT_CLASS (gst_data_queue_parent_class)->finalize (object);
}

286
static inline void
287 288
gst_data_queue_locked_flush (GstDataQueue * queue)
{
289 290
  GstDataQueuePrivate *priv = queue->priv;

291 292 293 294
  STATUS (queue, "before flushing");
  gst_data_queue_cleanup (queue);
  STATUS (queue, "after flushing");
  /* we deleted something... */
295 296
  if (priv->waiting_del)
    g_cond_signal (&priv->item_del);
297 298
}

299
static inline gboolean
300 301
gst_data_queue_locked_is_empty (GstDataQueue * queue)
{
302 303 304
  GstDataQueuePrivate *priv = queue->priv;

  return (gst_queue_array_get_length (priv->queue) == 0);
305 306
}

307
static inline gboolean
308 309
gst_data_queue_locked_is_full (GstDataQueue * queue)
{
310 311 312 313
  GstDataQueuePrivate *priv = queue->priv;

  return priv->checkfull (queue, priv->cur_level.visible,
      priv->cur_level.bytes, priv->cur_level.time, priv->checkdata);
314 315 316
}

/**
 * gst_data_queue_flush: (skip)
 * @queue: a #GstDataQueue.
 *
 * Flushes all the contents of the @queue: every queued item is handed to its
 * destroy function and the levels are reset. A thread waiting for space is
 * signalled afterwards.
 * MT safe.
 *
 * Since: 1.2
 */
void
gst_data_queue_flush (GstDataQueue * queue)
{
  GST_DEBUG ("queue:%p", queue);

  /* the actual work happens with the queue lock held */
  GST_DATA_QUEUE_MUTEX_LOCK (queue);

  gst_data_queue_locked_flush (queue);

  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
}

/**
 * gst_data_queue_is_empty: (skip)
 * @queue: a #GstDataQueue.
 *
 * Queries if there are any items in the @queue. The check is made with the
 * queue lock held.
 * MT safe.
 *
 * Returns: %TRUE if @queue is empty.
 *
 * Since: 1.2
 */
gboolean
gst_data_queue_is_empty (GstDataQueue * queue)
{
  gboolean empty;

  GST_DATA_QUEUE_MUTEX_LOCK (queue);
  empty = gst_data_queue_locked_is_empty (queue);
  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);

  return empty;
}

/**
 * gst_data_queue_is_full: (skip)
 * @queue: a #GstDataQueue.
 *
 * Queries if @queue is full. This check will be done using the
 * #GstDataQueueCheckFullFunction registered with @queue, which is called
 * with the queue lock held.
 * MT safe.
 *
 * Returns: %TRUE if @queue is full.
 *
 * Since: 1.2
 */
gboolean
gst_data_queue_is_full (GstDataQueue * queue)
{
  gboolean full;

  GST_DATA_QUEUE_MUTEX_LOCK (queue);
  full = gst_data_queue_locked_is_full (queue);
  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);

  return full;
}

/**
 * gst_data_queue_set_flushing: (skip)
 * @queue: a #GstDataQueue.
 * @flushing: a #gboolean stating if the queue will be flushing or not.
 *
 * Sets the queue to flushing state if @flushing is %TRUE. While the @queue is
 * in flushing state, gst_data_queue_push(), gst_data_queue_push_force(),
 * gst_data_queue_pop() and gst_data_queue_peek() return %FALSE. A call that
 * is blocked waiting for an item or for space is signalled so that it can
 * return %FALSE as well.
 *
 * MT Safe.
 *
 * Since: 1.2
 */
void
gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
{
  GstDataQueuePrivate *priv = queue->priv;

  GST_DEBUG ("queue:%p , flushing:%d", queue, flushing);

  GST_DATA_QUEUE_MUTEX_LOCK (queue);

  priv->flushing = flushing;

  /* entering flushing state: release push/pop functions */
  if (flushing && priv->waiting_add)
    g_cond_signal (&priv->item_add);
  if (flushing && priv->waiting_del)
    g_cond_signal (&priv->item_del);

  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
}

416 417 418 419 420 421 422 423 424 425 426 427 428 429 430
/* Appends @item to the backing array and adds it to the level counters,
 * without any fullness check and without any locking. */
static void
gst_data_queue_push_force_unlocked (GstDataQueue * queue,
    GstDataQueueItem * item)
{
  GstDataQueueSize *level = &queue->priv->cur_level;

  gst_queue_array_push_tail (queue->priv->queue, item);

  /* only visible items count towards the visible level */
  level->visible += item->visible ? 1 : 0;
  level->bytes += item->size;
  level->time += item->duration;
}

/**
 * gst_data_queue_push_force: (skip)
 * @queue: a #GstDataQueue.
 * @item: a #GstDataQueueItem.
 *
 * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
 * on the @queue. It ignores if the @queue is full or not and forces the @item
 * to be pushed anyway.
 * MT safe.
 *
 * Note that this function has slightly different semantics than gst_pad_push()
 * and gst_pad_push_event(): this function only takes ownership of @item and
 * the #GstMiniObject contained in @item if the push was successful. If %FALSE
 * is returned, the caller is responsible for freeing @item and its contents.
 *
 * Returns: %TRUE if the @item was successfully pushed on the @queue, %FALSE
 * if an argument is invalid or the @queue is flushing.
 *
 * Since: 1.2
 */
gboolean
gst_data_queue_push_force (GstDataQueue * queue, GstDataQueueItem * item)
{
  GstDataQueuePrivate *priv;

  g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
  g_return_val_if_fail (item != NULL, FALSE);

  /* only dereference @queue once it has been validated above */
  priv = queue->priv;

  GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);

  STATUS (queue, "before pushing");
  gst_data_queue_push_force_unlocked (queue, item);
  STATUS (queue, "after pushing");

  /* wake up a reader waiting for an item */
  if (priv->waiting_add)
    g_cond_signal (&priv->item_add);

  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);

  return TRUE;

  /* ERRORS */
flushing:
  {
    /* reached with the queue lock held */
    GST_DEBUG ("queue:%p, we are flushing", queue);
    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
    return FALSE;
  }
}

478
/**
 * gst_data_queue_push: (skip)
 * @queue: a #GstDataQueue.
 * @item: a #GstDataQueueItem.
 *
 * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
 * on the @queue. If the @queue is full, the full callback is called (or the
 * 'full' signal emitted) and the call then blocks until space is available,
 * OR the @queue is set to flushing state.
 * MT safe.
 *
 * Note that this function has slightly different semantics than gst_pad_push()
 * and gst_pad_push_event(): this function only takes ownership of @item and
 * the #GstMiniObject contained in @item if the push was successful. If %FALSE
 * is returned, the caller is responsible for freeing @item and its contents.
 *
 * Returns: %TRUE if the @item was successfully pushed on the @queue, %FALSE
 * if an argument is invalid or the @queue is flushing.
 *
 * Since: 1.2
 */
gboolean
gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
{
  GstDataQueuePrivate *priv;

  g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
  g_return_val_if_fail (item != NULL, FALSE);

  /* only dereference @queue once it has been validated above */
  priv = queue->priv;

  GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);

  STATUS (queue, "before pushing");

  /* We ALWAYS need to check for queue fillness */
  if (gst_data_queue_locked_is_full (queue)) {
    /* report the full condition with the lock released, then take it again;
     * the queue may have started flushing in the meantime */
    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
    if (G_LIKELY (priv->fullcallback))
      priv->fullcallback (queue, priv->checkdata);
    else
      g_signal_emit (queue, gst_data_queue_signals[SIGNAL_FULL], 0);
    GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);

    /* signal might have removed some items */
    while (gst_data_queue_locked_is_full (queue)) {
      priv->waiting_del = TRUE;
      g_cond_wait (&priv->item_del, &priv->qlock);
      priv->waiting_del = FALSE;
      if (priv->flushing)
        goto flushing;
    }
  }

  gst_data_queue_push_force_unlocked (queue, item);

  STATUS (queue, "after pushing");

  /* wake up a reader waiting for an item */
  if (priv->waiting_add)
    g_cond_signal (&priv->item_add);

  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);

  return TRUE;

  /* ERRORS */
flushing:
  {
    /* reached with the queue lock held */
    GST_DEBUG ("queue:%p, we are flushing", queue);
    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
    return FALSE;
  }
}

547 548 549 550 551 552 553 554 555 556 557 558 559 560 561
/* Waits on item_add until the queue holds at least one item. Returns TRUE
 * once an item is available and FALSE if the queue is found flushing after a
 * wake-up. g_cond_wait() is given qlock, so the caller must hold it. */
static gboolean
_gst_data_queue_wait_non_empty (GstDataQueue * queue)
{
  GstDataQueuePrivate *priv = queue->priv;

  for (;;) {
    if (!gst_data_queue_locked_is_empty (queue))
      return TRUE;

    /* flag ourselves as waiting so that writers know they have to signal */
    priv->waiting_add = TRUE;
    g_cond_wait (&priv->item_add, &priv->qlock);
    priv->waiting_add = FALSE;

    if (priv->flushing)
      return FALSE;
  }
}

562
/**
 * gst_data_queue_pop: (skip)
 * @queue: a #GstDataQueue.
 * @item: (out): pointer to store the returned #GstDataQueueItem.
 *
 * Retrieves the first @item available on the @queue. If the queue is currently
 * empty, the empty callback is called (or the 'empty' signal emitted) and the
 * call then blocks until at least one item is available, OR the @queue is set
 * to the flushing state.
 * MT safe.
 *
 * Returns: %TRUE if an @item was successfully retrieved from the @queue,
 * %FALSE if an argument is invalid or the @queue is flushing.
 *
 * Since: 1.2
 */
gboolean
gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
{
  GstDataQueuePrivate *priv;

  g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
  g_return_val_if_fail (item != NULL, FALSE);

  /* only dereference @queue once it has been validated above */
  priv = queue->priv;

  GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);

  STATUS (queue, "before popping");

  if (gst_data_queue_locked_is_empty (queue)) {
    /* report the empty condition with the lock released, then take it
     * again; the queue may have started flushing in the meantime */
    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
    if (G_LIKELY (priv->emptycallback))
      priv->emptycallback (queue, priv->checkdata);
    else
      g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
    GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);

    if (!_gst_data_queue_wait_non_empty (queue))
      goto flushing;
  }

  /* Get the item from the queue array */
  *item = gst_queue_array_pop_head (priv->queue);

  /* update current level counter */
  if ((*item)->visible)
    priv->cur_level.visible--;
  priv->cur_level.bytes -= (*item)->size;
  priv->cur_level.time -= (*item)->duration;

  STATUS (queue, "after popping");

  /* wake up a writer waiting for space */
  if (priv->waiting_del)
    g_cond_signal (&priv->item_del);

  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);

  return TRUE;

  /* ERRORS */
flushing:
  {
    /* reached with the queue lock held */
    GST_DEBUG ("queue:%p, we are flushing", queue);
    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
    return FALSE;
  }
}

Edward Hervey's avatar
Edward Hervey committed
626 627 628
/* Comparison callback given to gst_queue_array_find() by
 * gst_data_queue_drop_head(): returns 0 when @a passes the instance type
 * check against the #GType packed into @b, and 1 otherwise.
 *
 * NOTE(review): @a is a queued element, not an obvious GTypeInstance; the
 * check presumably relies on the element starting with a pointer to its
 * #GstMiniObject - confirm against the GstDataQueueItem layout. */
static gint
is_of_type (gconstpointer a, gconstpointer b)
{
  GType wanted = GPOINTER_TO_SIZE (b);

  if (G_TYPE_CHECK_INSTANCE_TYPE (a, wanted))
    return 0;

  return 1;
}

632
/**
 * gst_data_queue_peek: (skip)
 * @queue: a #GstDataQueue.
 * @item: (out): pointer to store the returned #GstDataQueueItem.
 *
 * Retrieves the first @item available on the @queue without removing it.
 * If the queue is currently empty, the empty callback is called (or the
 * 'empty' signal emitted) and the call then blocks until at least one item is
 * available, OR the @queue is set to the flushing state.
 * MT safe.
 *
 * Returns: %TRUE if an @item was successfully retrieved from the @queue,
 * %FALSE if an argument is invalid or the @queue is flushing.
 *
 * Since: 1.2
 */
gboolean
gst_data_queue_peek (GstDataQueue * queue, GstDataQueueItem ** item)
{
  GstDataQueuePrivate *priv;

  g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
  g_return_val_if_fail (item != NULL, FALSE);

  /* only dereference @queue once it has been validated above */
  priv = queue->priv;

  GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);

  STATUS (queue, "before peeking");

  if (gst_data_queue_locked_is_empty (queue)) {
    /* report the empty condition with the lock released, then take it
     * again; the queue may have started flushing in the meantime */
    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
    if (G_LIKELY (priv->emptycallback))
      priv->emptycallback (queue, priv->checkdata);
    else
      g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
    GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);

    if (!_gst_data_queue_wait_non_empty (queue))
      goto flushing;
  }

  /* Look at the head item; it stays in the queue and the levels are left
   * untouched */
  *item = gst_queue_array_peek_head (priv->queue);

  STATUS (queue, "after peeking");
  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);

  return TRUE;

  /* ERRORS */
flushing:
  {
    /* reached with the queue lock held */
    GST_DEBUG ("queue:%p, we are flushing", queue);
    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
    return FALSE;
  }
}

687
/**
 * gst_data_queue_drop_head: (skip)
 * @queue: The #GstDataQueue to drop an item from.
 * @type: The #GType of the item to drop.
 *
 * Removes the head-most queued item matching the given #GType, subtracts it
 * from the current levels and calls the item's destroy function. A thread
 * waiting for space in gst_data_queue_push() is signalled afterwards.
 *
 * Returns: %TRUE if an element was removed.
 *
 * Since: 1.2
 */
gboolean
gst_data_queue_drop_head (GstDataQueue * queue, GType type)
{
  gboolean res = FALSE;
  GstDataQueueItem *leak = NULL;
  guint idx;
  GstDataQueuePrivate *priv;

  g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);

  /* only dereference @queue once it has been validated above */
  priv = queue->priv;

  GST_DEBUG ("queue:%p", queue);

  GST_DATA_QUEUE_MUTEX_LOCK (queue);
  idx = gst_queue_array_find (priv->queue, is_of_type, GSIZE_TO_POINTER (type));

  /* the "not found" result is -1 stored in an unsigned index; compare
   * against the same unsigned value explicitly */
  if (idx == (guint) -1)
    goto done;

  leak = gst_queue_array_drop_element (priv->queue, idx);

  if (leak->visible)
    priv->cur_level.visible--;
  priv->cur_level.bytes -= leak->size;
  priv->cur_level.time -= leak->duration;

  leak->destroy (leak);

  /* we deleted something: let a blocked writer recheck the fullness, as
   * gst_data_queue_pop() and the flush path do */
  if (priv->waiting_del)
    g_cond_signal (&priv->item_del);

  res = TRUE;

done:
  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);

  GST_DEBUG ("queue:%p , res:%d", queue, res);

  return res;
}

735
/**
 * gst_data_queue_limits_changed: (skip)
 * @queue: The #GstDataQueue
 *
 * Inform the queue that the limits for the fullness check have changed and that
 * any blocking gst_data_queue_push() should be unblocked to recheck the limits.
 *
 * Since: 1.2
 */
void
gst_data_queue_limits_changed (GstDataQueue * queue)
{
  GstDataQueuePrivate *priv;

  g_return_if_fail (GST_IS_DATA_QUEUE (queue));

  /* only dereference @queue once it has been validated above */
  priv = queue->priv;

  GST_DATA_QUEUE_MUTEX_LOCK (queue);
  /* a writer is waiting for space: wake it up so that it runs the fullness
   * check again */
  if (priv->waiting_del) {
    GST_DEBUG ("signal del");
    g_cond_signal (&priv->item_del);
  }
  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
}

/**
 * gst_data_queue_get_level: (skip)
 * @queue: The #GstDataQueue
 * @level: (out): the location to store the result
 *
 * Get the current level of the queue. The level is copied without taking the
 * queue lock.
 *
 * Since: 1.2
 */
void
gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level)
{
  g_return_if_fail (GST_IS_DATA_QUEUE (queue));
  g_return_if_fail (level != NULL);

  /* NOTE(review): deliberately left unlocked, as before. The check-full
   * callback is invoked with the queue lock held, so locking here would
   * deadlock a callback that calls this function - confirm whether any
   * caller needs a locked snapshot. */
  *level = queue->priv->cur_level;
}

776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791
/* GObject set_property implementation. This function handles no property
 * id, so every call ends in the invalid-property warning. */
static void
gst_data_queue_set_property (GObject * object,
    guint prop_id, const GValue * value, GParamSpec * pspec)
{
  G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
}

/* GObject get_property implementation: copies the requested current level
 * (visible items, bytes or time) into @value with the queue lock held. */
static void
gst_data_queue_get_property (GObject * object,
    guint prop_id, GValue * value, GParamSpec * pspec)
{
  GstDataQueue *self = GST_DATA_QUEUE (object);
  const GstDataQueueSize *level = &self->priv->cur_level;

  GST_DATA_QUEUE_MUTEX_LOCK (self);

  switch (prop_id) {
    case PROP_CUR_LEVEL_VISIBLE:
      g_value_set_uint (value, level->visible);
      break;
    case PROP_CUR_LEVEL_BYTES:
      g_value_set_uint (value, level->bytes);
      break;
    case PROP_CUR_LEVEL_TIME:
      g_value_set_uint64 (value, level->time);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }

  GST_DATA_QUEUE_MUTEX_UNLOCK (self);
}