/* GStreamer
 * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
 * Copyright (C) 2007 Jan Schmidt <jan@fluendo.com>
 * Copyright (C) 2007 Wim Taymans <wim@fluendo.com>
 *
 * gstmultiqueue.c:
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include <gst/gst.h>
#include "gstmultiqueue.h"

/**
 * GstSingleQueue:
 * @sinkpad: associated sink #GstPad
 * @srcpad: associated source #GstPad
 *
 * Structure containing all information and properties about
 * a single queue.
 */
typedef struct _GstSingleQueue GstSingleQueue;

struct _GstSingleQueue
{
  /* unique identifier of the queue */
  guint id;

  GstMultiQueue *mqueue;

  GstPad *sinkpad;
  GstPad *srcpad;

  /* flowreturn of previous srcpad push */
  GstFlowReturn srcresult;
53
54
  GstSegment sink_segment;
  GstSegment src_segment;
55
56
57
58

  /* queue of data */
  GstDataQueue *queue;
  GstDataQueueSize max_size, extra_size;
59
60
  GstClockTime cur_time;
  gboolean is_eos;
61
  gboolean inextra;             /* TRUE if the queue is currently in extradata mode */
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84

  /* Protected by global lock */
  guint32 nextid;               /* ID of the next object waiting to be pushed */
  guint32 oldid;                /* ID of the last object pushed (last in a series) */
  GCond *turn;                  /* SingleQueue turn waiting conditional */
};


/* Extension of GstDataQueueItem structure for our usage.
 *
 * Items of this type are cast to GstDataQueueItem when pushed to and popped
 * from the GstDataQueue, so the leading fields must stay in this order;
 * only posid is specific to the multiqueue. */
typedef struct _GstMultiQueueItem GstMultiQueueItem;

struct _GstMultiQueueItem
{
  GstMiniObject *object;        /* queued buffer or event; NULL once stolen */
  guint size;                   /* buffer size, 0 for non-buffers */
  guint64 duration;             /* buffer duration, 0 when unknown or for non-buffers */
  gboolean visible;             /* TRUE for buffers, FALSE for everything else */

  GDestroyNotify destroy;       /* set to gst_multi_queue_item_destroy */
  guint32 posid;                /* id taken from the multiqueue counter at enqueue time */
};

/* SingleQueue life cycle helpers (defined further down in this file) */
static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue);
static void gst_single_queue_free (GstSingleQueue * squeue);

/* not-linked scheduling helpers; the callers in this file invoke them with
 * the multiqueue lock held */
static void wake_up_next_non_linked (GstMultiQueue * mq);
static void compute_high_id (GstMultiQueue * mq);

/* Request sink pads ("sink%d"), accepting any caps */
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS_ANY);

/* Sometimes source pads ("src%d"), accepting any caps */
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src%d",
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS_ANY);

/* Debug category used by all GST_* logging macros in this file */
GST_DEBUG_CATEGORY_STATIC (multi_queue_debug);
#define GST_CAT_DEFAULT (multi_queue_debug)

/* default limits, we try to keep up to 2 seconds of data and if there is not
 * time, up to 10 MB. The number of buffers is dynamically scaled to make sure
 * there is data in the queues. Normally, the byte and time limits are not hit
 * in these conditions.
 *
 * The expansions are parenthesized so the macros stay correct when used
 * inside larger expressions. */
#define DEFAULT_MAX_SIZE_BYTES (10 * 1024 * 1024)       /* 10 MB */
#define DEFAULT_MAX_SIZE_BUFFERS 5
#define DEFAULT_MAX_SIZE_TIME (2 * GST_SECOND)

/* second limits. When we hit one of the above limits we are probably dealing
 * with a badly muxed file and we scale the limits to these emergency values.
 * This is currently not yet implemented. */
#define DEFAULT_EXTRA_SIZE_BYTES (10 * 1024 * 1024)     /* 10 MB */
#define DEFAULT_EXTRA_SIZE_BUFFERS 5
#define DEFAULT_EXTRA_SIZE_TIME (3 * GST_SECOND)

/* Signals and args */
enum
{
  SIGNAL_UNDERRUN,
  SIGNAL_OVERRUN,
  LAST_SIGNAL
};

/* GObject property ids */
enum
{
  ARG_0,
  ARG_EXTRA_SIZE_BYTES,
  ARG_EXTRA_SIZE_BUFFERS,
  ARG_EXTRA_SIZE_TIME,
  ARG_MAX_SIZE_BYTES,
  ARG_MAX_SIZE_BUFFERS,
  ARG_MAX_SIZE_TIME,
};

/* Take / release the multiqueue-wide lock (q->qlock). The argument is
 * parenthesized so that any pointer expression can be passed safely. */
#define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START {                          \
  g_mutex_lock ((q)->qlock);                                            \
} G_STMT_END

#define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START {                        \
  g_mutex_unlock ((q)->qlock);                                          \
} G_STMT_END

/* GObject vmethods */
static void gst_multi_queue_finalize (GObject * object);
static void gst_multi_queue_set_property (GObject * object,
    guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_multi_queue_get_property (GObject * object,
    guint prop_id, GValue * value, GParamSpec * pspec);

/* GstElement vmethods */
static GstPad *gst_multi_queue_request_new_pad (GstElement * element,
    GstPadTemplate * temp, const gchar * name);
static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad);

/* task function started on each source pad */
static void gst_multi_queue_loop (GstPad * pad);

/* Extra type-registration code: initializes the debug category of this
 * element. Passed to GST_BOILERPLATE_FULL below. */
#define _do_init(bla) \
  GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element");

GST_BOILERPLATE_FULL (GstMultiQueue, gst_multi_queue, GstElement,
    GST_TYPE_ELEMENT, _do_init);

/* Signal ids, filled in by gst_multi_queue_class_init() */
static guint gst_multi_queue_signals[LAST_SIGNAL] = { 0 };

/* Registers the element details and the sink/src pad templates on the
 * element class. */
static void
gst_multi_queue_base_init (gpointer g_class)
{
  GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);

  gst_element_class_set_details_simple (gstelement_class,
      "MultiQueue",
      "Generic", "Multiple data queue", "Edward Hervey <edward@fluendo.com>");

  gst_element_class_add_pad_template (gstelement_class,
      gst_static_pad_template_get (&sinktemplate));
  gst_element_class_add_pad_template (gstelement_class,
      gst_static_pad_template_get (&srctemplate));
}

/* Class initializer: hooks up the GObject and GstElement virtual methods,
 * creates the "underrun"/"overrun" signals and installs the size-limit
 * properties. */
static void
gst_multi_queue_class_init (GstMultiQueueClass * klass)
{
  GObjectClass *object_klass = G_OBJECT_CLASS (klass);
  GstElementClass *element_klass = GST_ELEMENT_CLASS (klass);
  GType klass_type = G_TYPE_FROM_CLASS (klass);

  /* GObject vmethods; the property accessors have to be in place before any
   * property gets installed below */
  object_klass->set_property =
      GST_DEBUG_FUNCPTR (gst_multi_queue_set_property);
  object_klass->get_property =
      GST_DEBUG_FUNCPTR (gst_multi_queue_get_property);
  object_klass->finalize = GST_DEBUG_FUNCPTR (gst_multi_queue_finalize);

  /* signals, both without arguments and without return value */
  gst_multi_queue_signals[SIGNAL_UNDERRUN] =
      g_signal_new ("underrun", klass_type, G_SIGNAL_RUN_FIRST,
      G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);

  gst_multi_queue_signals[SIGNAL_OVERRUN] =
      g_signal_new ("overrun", klass_type, G_SIGNAL_RUN_FIRST,
      G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
      g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);

  /* regular limits */
  g_object_class_install_property (object_klass, ARG_MAX_SIZE_BYTES,
      g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
          "Max. amount of data in the queue (bytes, 0=disable)",
          0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES, G_PARAM_READWRITE));
  g_object_class_install_property (object_klass, ARG_MAX_SIZE_BUFFERS,
      g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
          "Max. number of buffers in the queue (0=disable)",
          0, G_MAXUINT, DEFAULT_MAX_SIZE_BUFFERS, G_PARAM_READWRITE));
  g_object_class_install_property (object_klass, ARG_MAX_SIZE_TIME,
      g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
          "Max. amount of data in the queue (in ns, 0=disable)",
          0, G_MAXUINT64, DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE));

  /* extra room granted while another queue is empty */
  g_object_class_install_property (object_klass, ARG_EXTRA_SIZE_BYTES,
      g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
          "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)",
          0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES, G_PARAM_READWRITE));
  g_object_class_install_property (object_klass, ARG_EXTRA_SIZE_BUFFERS,
      g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
          "Amount of buffers the queues can grow if one of them is empty (0=disable)",
          0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS, G_PARAM_READWRITE));
  g_object_class_install_property (object_klass, ARG_EXTRA_SIZE_TIME,
      g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
          "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)",
          0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME, G_PARAM_READWRITE));

  /* GstElement vmethods: pads are created and destroyed on request */
  element_klass->request_new_pad =
      GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
  element_klass->release_pad =
      GST_DEBUG_FUNCPTR (gst_multi_queue_release_pad);
}

/* Instance initializer: starts with no single queues, the default limits,
 * the id counter at 1 and a freshly allocated lock. */
static void
gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass)
{
  mqueue->nbqueues = 0;
  mqueue->queues = NULL;

  mqueue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES;
  mqueue->max_size.visible = DEFAULT_MAX_SIZE_BUFFERS;
  mqueue->max_size.time = DEFAULT_MAX_SIZE_TIME;

  mqueue->extra_size.bytes = DEFAULT_EXTRA_SIZE_BYTES;
  mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS;
  mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME;

  /* ids handed out to queued items start at 1 */
  mqueue->counter = 1;
  mqueue->highid = -1;
  mqueue->nextnotlinked = -1;

  mqueue->qlock = g_mutex_new ();
}

/* GObject finalize: frees every remaining single queue, the list holding
 * them and the lock, then chains up to the parent class. */
static void
gst_multi_queue_finalize (GObject * object)
{
  GstMultiQueue *mqueue = GST_MULTI_QUEUE (object);

  g_list_foreach (mqueue->queues, (GFunc) gst_single_queue_free, NULL);
  g_list_free (mqueue->queues);
  mqueue->queues = NULL;

  /* free/unref instance data */
  g_mutex_free (mqueue->qlock);

  G_OBJECT_CLASS (parent_class)->finalize (object);
}

/* Copy mq->max_size.<format> into the max_size of every single queue of mq.
 * <format> is the name of a size field (bytes, visible or time). */
#define SET_CHILD_PROPERTY(mq,format) G_STMT_START {	        \
    GList * tmp = (mq)->queues;					\
    while (tmp) {						\
      GstSingleQueue *q = (GstSingleQueue*)tmp->data;		\
      q->max_size.format = (mq)->max_size.format;               \
      tmp = g_list_next(tmp);					\
    };								\
} G_STMT_END

/* GObject set_property implementation.
 *
 * The max-size-* properties are stored on the multiqueue and propagated to
 * every existing single queue; the extra-size-* properties are only stored
 * on the multiqueue.
 *
 * The multiqueue lock is held while updating: the list of single queues is
 * modified under that lock when pads are requested or released, and
 * get_property reads the same fields under it. */
static void
gst_multi_queue_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec)
{
  GstMultiQueue *mq = GST_MULTI_QUEUE (object);

  GST_MULTI_QUEUE_MUTEX_LOCK (mq);

  switch (prop_id) {
    case ARG_MAX_SIZE_BYTES:
      mq->max_size.bytes = g_value_get_uint (value);
      SET_CHILD_PROPERTY (mq, bytes);
      break;
    case ARG_MAX_SIZE_BUFFERS:
      mq->max_size.visible = g_value_get_uint (value);
      SET_CHILD_PROPERTY (mq, visible);
      break;
    case ARG_MAX_SIZE_TIME:
      mq->max_size.time = g_value_get_uint64 (value);
      SET_CHILD_PROPERTY (mq, time);
      break;
    case ARG_EXTRA_SIZE_BYTES:
      mq->extra_size.bytes = g_value_get_uint (value);
      break;
    case ARG_EXTRA_SIZE_BUFFERS:
      mq->extra_size.visible = g_value_get_uint (value);
      break;
    case ARG_EXTRA_SIZE_TIME:
      mq->extra_size.time = g_value_get_uint64 (value);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }

  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}

/* GObject get_property implementation: copies the requested limit into
 * @value while holding the multiqueue lock. */
static void
gst_multi_queue_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec)
{
  GstMultiQueue *self;

  self = GST_MULTI_QUEUE (object);

  GST_MULTI_QUEUE_MUTEX_LOCK (self);

  switch (prop_id) {
      /* regular limits */
    case ARG_MAX_SIZE_BYTES:
      g_value_set_uint (value, self->max_size.bytes);
      break;
    case ARG_MAX_SIZE_BUFFERS:
      g_value_set_uint (value, self->max_size.visible);
      break;
    case ARG_MAX_SIZE_TIME:
      g_value_set_uint64 (value, self->max_size.time);
      break;
      /* extra limits */
    case ARG_EXTRA_SIZE_BYTES:
      g_value_set_uint (value, self->extra_size.bytes);
      break;
    case ARG_EXTRA_SIZE_BUFFERS:
      g_value_set_uint (value, self->extra_size.visible);
      break;
    case ARG_EXTRA_SIZE_TIME:
      g_value_set_uint64 (value, self->extra_size.time);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }

  GST_MULTI_QUEUE_MUTEX_UNLOCK (self);
}

351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
GList *
gst_multi_queue_get_internal_links (GstPad * pad)
{
  GList *res = NULL;
  GstMultiQueue *mqueue;
  GstSingleQueue *sq = NULL;
  GList *tmp;

  g_return_val_if_fail (GST_IS_PAD (pad), NULL);

  mqueue = GST_MULTI_QUEUE (GST_PAD_PARENT (pad));
  if (!mqueue)
    goto no_parent;

  GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
  /* Find which single queue it belongs to */
  for (tmp = mqueue->queues; tmp && !res; tmp = g_list_next (tmp)) {
    sq = (GstSingleQueue *) tmp->data;

    if (sq->sinkpad == pad)
      res = g_list_prepend (res, sq->srcpad);
    if (sq->srcpad == pad)
      res = g_list_prepend (res, sq->sinkpad);
  }

  if (!res)
    GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);

  return res;

no_parent:
  {
    GST_DEBUG_OBJECT (pad, "no parent");
    return NULL;
  }
}

389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435

/*
 * GstElement methods
 */

/* GstElement request_new_pad implementation.
 *
 * Builds a new single queue for this multiqueue, registers it in the list of
 * queues (under the multiqueue lock) and hands back the sink pad of that
 * single queue. */
static GstPad *
gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp,
    const gchar * name)
{
  GstMultiQueue *mqueue;
  GstSingleQueue *squeue;
  GstPad *new_sinkpad;

  mqueue = GST_MULTI_QUEUE (element);

  GST_LOG_OBJECT (element, "name : %s", name);

  /* the single queue comes with its sink and source pad */
  squeue = gst_single_queue_new (mqueue);

  GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
  mqueue->queues = g_list_append (mqueue->queues, squeue);
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);

  new_sinkpad = squeue->sinkpad;

  GST_DEBUG_OBJECT (mqueue, "Returning pad %s:%s",
      GST_DEBUG_PAD_NAME (new_sinkpad));

  return new_sinkpad;
}

/* GstElement release_pad implementation.
 *
 * Looks up the single queue whose sink pad is @pad, unlinks it from the list
 * of queues under the multiqueue lock, then (outside the lock) sets its data
 * queue flushing, deactivates and removes both pads and frees the single
 * queue. A pad that does not belong to this element only triggers a
 * warning. */
static void
gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
{
  GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
  GstSingleQueue *sq = NULL;
  GList *tmp;

  GST_LOG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));

  GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
  /* Find which single queue it belongs to, knowing that it should be a sinkpad */
  for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
    sq = (GstSingleQueue *) tmp->data;

    if (sq->sinkpad == pad)
      break;
  }

  if (!tmp) {
    GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
    GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
    return;
  }

  /* FIXME: The removal of the singlequeue should probably not happen until it
   * finishes draining */

  /* remove it from the list */
  mqueue->queues = g_list_delete_link (mqueue->queues, tmp);

  /* FIXME : recompute next-non-linked */
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);

  /* delete SingleQueue */
  gst_data_queue_set_flushing (sq->queue, TRUE);

  gst_pad_set_active (sq->srcpad, FALSE);
  gst_pad_set_active (sq->sinkpad, FALSE);
  gst_element_remove_pad (element, sq->srcpad);
  gst_element_remove_pad (element, sq->sinkpad);
  gst_single_queue_free (sq);
}

static gboolean
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
{
  gboolean result;

  GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"),
      sq->id);

  if (flush) {
    sq->srcresult = GST_FLOW_WRONG_STATE;
    gst_data_queue_set_flushing (sq->queue, TRUE);

    /* wake up non-linked task */
    GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
        sq->id);
    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
    g_cond_signal (sq->turn);
    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);

    GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
    result = gst_pad_pause_task (sq->srcpad);
  } else {
    gst_data_queue_flush (sq->queue);
    gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
    gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
484
485
    /* All pads start off not-linked for a smooth kick-off */
    sq->srcresult = GST_FLOW_NOT_LINKED;
486
    sq->cur_time = 0;
487
488
489
    sq->max_size.visible = mq->max_size.visible;
    sq->is_eos = FALSE;
    sq->inextra = FALSE;
490
491
    sq->nextid = 0;
    sq->oldid = 0;
492
493
494
495
496
497
498
499
500
501
502
    gst_data_queue_set_flushing (sq->queue, FALSE);

    GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
    result =
        gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
        sq->srcpad);
  }
  return result;
}

/* calculate the diff between running time on the sink and src of the queue.
503
504
 * This is the total amount of time in the queue. 
 * WITH LOCK TAKEN */
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
static void
update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
{
  gint64 sink_time, src_time;

  sink_time =
      gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
      sq->sink_segment.last_stop);

  src_time = gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME,
      sq->src_segment.last_stop);

  GST_DEBUG_OBJECT (mq,
      "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
      GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));

521
522
  /* This allows for streams with out of order timestamping - sometimes the 
   * emerging timestamp is later than the arriving one(s) */
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
  if (sink_time >= src_time)
    sq->cur_time = sink_time - src_time;
  else
    sq->cur_time = 0;
}

/* take a NEWSEGMENT event and apply the values to segment, updating the time
 * level of queue.
 *
 * @segment is either the sink or the src segment of @sq. The multiqueue lock
 * is taken internally while the segment and the time level are updated. */
static void
apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
    GstSegment * segment)
{
  gboolean update;
  GstFormat format;
  gdouble rate, arate;
  gint64 start, stop, time;

  gst_event_parse_new_segment_full (event, &update, &rate, &arate,
      &format, &start, &stop, &time);

  /* now configure the values, we use these to track timestamps on the
   * sinkpad. */
  if (format != GST_FORMAT_TIME) {
    /* non-time format, pretend the current time segment is closed with a
     * 0 start and unknown stop time. */
    update = FALSE;
    format = GST_FORMAT_TIME;
    start = 0;
    stop = -1;
    time = 0;
  }

  GST_MULTI_QUEUE_MUTEX_LOCK (mq);

  gst_segment_set_newsegment_full (segment, update,
      rate, arate, format, start, stop, time);

  GST_DEBUG_OBJECT (mq,
      "queue %d, configured NEWSEGMENT %" GST_SEGMENT_FORMAT, sq->id, segment);

  /* segment can update the time level of the queue */
  update_time_level (mq, sq);

  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
}

/* take a buffer and update segment, updating the time level of the queue. */
static void
571
572
apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
    GstClockTime duration, GstSegment * segment)
573
{
574
575
  GST_MULTI_QUEUE_MUTEX_LOCK (mq);

576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
  /* if no timestamp is set, assume it's continuous with the previous 
   * time */
  if (timestamp == GST_CLOCK_TIME_NONE)
    timestamp = segment->last_stop;

  /* add duration */
  if (duration != GST_CLOCK_TIME_NONE)
    timestamp += duration;

  GST_DEBUG_OBJECT (mq, "queue %d, last_stop updated to %" GST_TIME_FORMAT,
      sq->id, GST_TIME_ARGS (timestamp));

  gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);

  /* calc diff with other end */
  update_time_level (mq, sq);
592
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
593
594
595
}

static GstFlowReturn
596
597
598
gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
    GstMiniObject * object)
{
599
600
  GstFlowReturn result = GST_FLOW_OK;

601
  if (GST_IS_BUFFER (object)) {
602
    GstBuffer *buffer;
603
    GstClockTime timestamp, duration;
604

605
    buffer = GST_BUFFER_CAST (object);
606
607
    timestamp = GST_BUFFER_TIMESTAMP (buffer);
    duration = GST_BUFFER_DURATION (buffer);
608

609
    apply_buffer (mq, sq, timestamp, duration, &sq->src_segment);
610

611
612
613
    /* Applying the buffer may have made the queue non-full again, unblock it if needed */
    gst_data_queue_limits_changed (sq->queue);

614
615
616
617
    GST_DEBUG_OBJECT (mq,
        "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
        sq->id, buffer, GST_TIME_ARGS (timestamp));

618
    result = gst_pad_push (sq->srcpad, buffer);
619
  } else if (GST_IS_EVENT (object)) {
620
621
    GstEvent *event;

622
623
624
625
626
627
628
629
    event = GST_EVENT_CAST (object);

    switch (GST_EVENT_TYPE (event)) {
      case GST_EVENT_EOS:
        result = GST_FLOW_UNEXPECTED;
        break;
      case GST_EVENT_NEWSEGMENT:
        apply_segment (mq, sq, event, &sq->src_segment);
630
631
        /* Applying the segment may have made the queue non-full again, unblock it if needed */
        gst_data_queue_limits_changed (sq->queue);
632
633
634
        break;
      default:
        break;
635
    }
636

637
638
639
640
    GST_DEBUG_OBJECT (mq,
        "SingleQueue %d : Pushing event %p of type %s",
        sq->id, event, GST_EVENT_TYPE_NAME (event));

641
    gst_pad_push_event (sq->srcpad, event);
642
643
644
645
  } else {
    g_warning ("Unexpected object in singlequeue %d (refcounting problem?)",
        sq->id);
  }
646
  return result;
647

648
649
650
651
652
653
654
655
656
657
658
659
  /* ERRORS */
}

/* Detach and return the mini object held by @item. The item's object pointer
 * is cleared, so destroying the item afterwards does not unref the object. */
static GstMiniObject *
gst_multi_queue_item_steal_object (GstMultiQueueItem * item)
{
  GstMiniObject *res;

  res = item->object;
  item->object = NULL;

  return res;
}

/* Free @item, dropping the reference on its mini object when the object has
 * not been stolen before. */
static void
gst_multi_queue_item_destroy (GstMultiQueueItem * item)
{
  if (item->object)
    gst_mini_object_unref (item->object);
  g_free (item);
}

/* takes ownership of passed mini object! */
671
static GstMultiQueueItem *
672
gst_multi_queue_item_new (GstMiniObject * object, guint32 curid)
673
674
675
{
  GstMultiQueueItem *item;

676
  item = g_new (GstMultiQueueItem, 1);
677
  item->object = object;
678
  item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
679
  item->posid = curid;
680
681
682
683
684
685
686

  if (GST_IS_BUFFER (object)) {
    item->size = GST_BUFFER_SIZE (object);
    item->duration = GST_BUFFER_DURATION (object);
    if (item->duration == GST_CLOCK_TIME_NONE)
      item->duration = 0;
    item->visible = TRUE;
687
688
689
690
  } else {
    item->size = 0;
    item->duration = 0;
    item->visible = FALSE;
691
692
693
694
  }
  return item;
}

695
696
697
698
/* Each main loop attempts to push buffers until the return value
 * is not-linked. not-linked pads are not allowed to push data beyond
 * any linked pads, so they don't 'rush ahead of the pack'.
 */
699
700
701
702
703
704
705
706
707
static void
gst_multi_queue_loop (GstPad * pad)
{
  GstSingleQueue *sq;
  GstMultiQueueItem *item;
  GstDataQueueItem *sitem;
  GstMultiQueue *mq;
  GstMiniObject *object;
  guint32 newid;
708
  guint32 oldid = G_MAXUINT32;
709
  GstFlowReturn result;
710
711

  sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
712
  mq = sq->mqueue;
713

714
715
  do {
    GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
716

717
718
719
720
    /* Get something from the queue, blocking until that happens, or we get
     * flushed */
    if (!(gst_data_queue_pop (sq->queue, &sitem)))
      goto out_flushing;
721

722
723
    item = (GstMultiQueueItem *) sitem;
    newid = item->posid;
724

725
726
727
    /* steal the object and destroy the item */
    object = gst_multi_queue_item_steal_object (item);
    gst_multi_queue_item_destroy (item);
728

729
730
    GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
        sq->id, newid, oldid);
731

732
733
734
735
736
737
738
739
740
    /* If we're not-linked, we do some extra work because we might need to
     * wait before pushing. If we're linked but there's a gap in the IDs,
     * or it's the first loop, or we just passed the previous highid, 
     * we might need to wake some sleeping pad up, so there's extra work 
     * there too */
    if (sq->srcresult == GST_FLOW_NOT_LINKED ||
        (oldid == -1) || (newid != (oldid + 1)) || oldid > mq->highid) {
      GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
          gst_flow_get_name (sq->srcresult));
741

742
743
744
      GST_MULTI_QUEUE_MUTEX_LOCK (mq);

      /* Update the nextid so other threads know when to wake us up */
745
746
      sq->nextid = newid;

747
748
749
      /* Update the oldid (the last ID we output) for highid tracking */
      if (oldid != -1)
        sq->oldid = oldid;
750

751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
      if (sq->srcresult == GST_FLOW_NOT_LINKED) {
        /* Go to sleep until it's time to push this buffer */

        /* Recompute the highid */
        compute_high_id (mq);
        while (newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) {
          GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with "
              "newid %u and highid %u", sq->id, newid, mq->highid);


          /* Wake up all non-linked pads before we sleep */
          wake_up_next_non_linked (mq);

          mq->numwaiting++;
          g_cond_wait (sq->turn, mq->qlock);
          mq->numwaiting--;

          GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
              "wakeup with newid %u and highid %u", sq->id, newid, mq->highid);
        }

        /* Re-compute the high_id in case someone else pushed */
        compute_high_id (mq);
      } else {
        compute_high_id (mq);
        /* Wake up all non-linked pads */
        wake_up_next_non_linked (mq);
      }
      /* We're done waiting, we can clear the nextid */
      sq->nextid = 0;
781
782
783

      GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
    }
784

785
786
    GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s",
        gst_flow_get_name (sq->srcresult));
787

788
789
790
    /* Try to push out the new object */
    result = gst_single_queue_push_one (mq, sq, object);
    sq->srcresult = result;
791

792
793
    if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED)
      goto out_flushing;
794

795
796
    GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
        gst_flow_get_name (sq->srcresult));
797

798
799
800
    oldid = newid;
  }
  while (TRUE);
801
802
803

out_flushing:
  {
804
805
806
807
808
809
    /* Need to make sure wake up any sleeping pads when we exit */
    GST_MULTI_QUEUE_MUTEX_LOCK (mq);
    compute_high_id (mq);
    wake_up_next_non_linked (mq);
    GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);

810
    gst_data_queue_set_flushing (sq->queue, TRUE);
811
812
813
814
    gst_pad_pause_task (sq->srcpad);
    GST_CAT_LOG_OBJECT (multi_queue_debug, mq,
        "SingleQueue[%d] task paused, reason:%s",
        sq->id, gst_flow_get_name (sq->srcresult));
815
    return;
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
  }
}

/**
 * gst_multi_queue_chain:
 *
 * This is similar to GstQueue's chain function, except:
 * _ we don't have leak behaviours,
 * _ we push with a unique id (curid)
 *
 * Wraps @buffer in a queue item tagged with the next multiqueue id and
 * pushes it into the single queue of @pad. Returns GST_FLOW_OK when the item
 * was queued; when the data queue refuses the item the item (and with it the
 * buffer) is destroyed and the last source pad flow return is returned.
 */
static GstFlowReturn
gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer)
{
  GstSingleQueue *sq;
  GstMultiQueue *mq;
  GstMultiQueueItem *item;
  GstFlowReturn ret = GST_FLOW_OK;
  guint32 curid;
  GstClockTime timestamp, duration;

  sq = gst_pad_get_element_private (pad);
  mq = (GstMultiQueue *) gst_pad_get_parent (pad);

  /* Get a unique incrementing id */
  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
  curid = mq->counter++;
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);

  GST_LOG_OBJECT (mq, "SingleQueue %d : about to enqueue buffer %p with id %d",
      sq->id, buffer, curid);

  item = gst_multi_queue_item_new (GST_MINI_OBJECT_CAST (buffer), curid);

  /* read the timing info now: once the item is queued the buffer is no
   * longer ours to look at */
  timestamp = GST_BUFFER_TIMESTAMP (buffer);
  duration = GST_BUFFER_DURATION (buffer);

  if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
    goto flushing;

  /* update time level, we must do this after pushing the data in the queue so
   * that we never end up filling the queue first. */
  apply_buffer (mq, sq, timestamp, duration, &sq->sink_segment);

done:
  /* drop the ref taken by gst_pad_get_parent() */
  gst_object_unref (mq);

  return ret;

  /* ERRORS */
flushing:
  {
    ret = sq->srcresult;
    GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
        sq->id, gst_flow_get_name (ret));
    gst_multi_queue_item_destroy (item);
    goto done;
  }
}

/* Push-mode (de)activation of a sink pad.
 *
 * On activation the single queue is marked not-linked; on deactivation it is
 * marked WRONG_STATE and its data queue is emptied. Always returns TRUE. */
static gboolean
gst_multi_queue_sink_activate_push (GstPad * pad, gboolean active)
{
  GstSingleQueue *sq;

  sq = (GstSingleQueue *) gst_pad_get_element_private (pad);

  if (active) {
    /* All pads start off not-linked for a smooth kick-off */
    sq->srcresult = GST_FLOW_NOT_LINKED;
  } else {
    sq->srcresult = GST_FLOW_WRONG_STATE;
    gst_data_queue_flush (sq->queue);
  }
  return TRUE;
}

/* Event function installed on the sink pads.
 *
 * - FLUSH_START / FLUSH_STOP are pushed on the matching source pad right
 *   away, followed by gst_single_queue_flush() with TRUE / FALSE.
 * - Any other event that is not serialized is pushed on the source pad
 *   directly.
 * - Serialized events are wrapped in a GstMultiQueueItem carrying a fresh id
 *   and pushed into the data queue. After a successful push, EOS sets
 *   sq->is_eos and NEWSEGMENT is applied to the sink segment.
 *
 * Returns: the result of gst_pad_push_event() for events forwarded directly,
 * otherwise the result of gst_data_queue_push().
 */
static gboolean
gst_multi_queue_sink_event (GstPad * pad, GstEvent * event)
{
  GstSingleQueue *sq;
  GstMultiQueue *mq;
  guint32 curid;
  GstMultiQueueItem *item;
  gboolean res;
  GstEventType type;
  GstEvent *sref = NULL;

  sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
  /* released again at the done label */
  mq = (GstMultiQueue *) gst_pad_get_parent (pad);

  /* remember the type; it is still needed after the event has been queued */
  type = GST_EVENT_TYPE (event);

  switch (type) {
    case GST_EVENT_FLUSH_START:
      GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event",
          sq->id);

      res = gst_pad_push_event (sq->srcpad, event);

      gst_single_queue_flush (mq, sq, TRUE);
      goto done;

    case GST_EVENT_FLUSH_STOP:
      GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event",
          sq->id);

      res = gst_pad_push_event (sq->srcpad, event);

      gst_single_queue_flush (mq, sq, FALSE);
      goto done;

    case GST_EVENT_NEWSEGMENT:
      /* take ref because the queue will take ownership and we need the event
       * afterwards to update the segment */
      sref = gst_event_ref (event);
      break;

    default:
      if (!(GST_EVENT_IS_SERIALIZED (event))) {
        /* not serialized with the data flow: bypass the queue */
        res = gst_pad_push_event (sq->srcpad, event);
        goto done;
      }
      break;
  }

  /* Get an unique incrementing id */
  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
  curid = mq->counter++;
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);

  item = gst_multi_queue_item_new ((GstMiniObject *) event, curid);

  GST_DEBUG_OBJECT (mq,
      "SingleQueue %d : Enqueuing event %p of type %s with id %d",
      sq->id, event, GST_EVENT_TYPE_NAME (event), curid);

  if (!(res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
    goto flushing;

  /* mark EOS when we received one, we must do that after putting the
   * event in the queue because EOS marks the queue as filled. No need to take
   * a lock, the _check_full happens from this thread only, right before pushing
   * into dataqueue. */
  switch (type) {
    case GST_EVENT_EOS:
      sq->is_eos = TRUE;
      break;
    case GST_EVENT_NEWSEGMENT:
      apply_segment (mq, sq, sref, &sq->sink_segment);
      gst_event_unref (sref);
      break;
    default:
      break;
  }

done:
  gst_object_unref (mq);
  return res;

flushing:
  {
    GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
        sq->id, gst_flow_get_name (sq->srcresult));
    /* drop the extra NEWSEGMENT ref taken above, if any */
    if (sref)
      gst_event_unref (sref);
    gst_multi_queue_item_destroy (item);
    goto done;
  }
}

/* getcaps function shared by the sink and source pads.
 *
 * Looks up the pad on the opposite side of the single queue and returns the
 * caps of that pad's peer, or ANY caps when gst_pad_peer_get_caps() yields
 * NULL.
 */
static GstCaps *
gst_multi_queue_getcaps (GstPad * pad)
{
  GstSingleQueue *squeue;
  GstPad *opposite;
  GstCaps *peercaps;

  squeue = gst_pad_get_element_private (pad);

  /* pick the pad on the other side of this single queue */
  if (pad == squeue->srcpad)
    opposite = squeue->sinkpad;
  else
    opposite = squeue->srcpad;

  GST_LOG_OBJECT (opposite, "Getting caps from the peer of this pad");

  peercaps = gst_pad_peer_get_caps (opposite);
  if (peercaps != NULL)
    return peercaps;

  return gst_caps_new_any ();
}

/* bufferalloc function of the sink pads: hands the allocation request over
 * to gst_pad_alloc_buffer() on the source pad of the same single queue and
 * returns its result unchanged. */
static GstFlowReturn
gst_multi_queue_bufferalloc (GstPad * pad, guint64 offset, guint size,
    GstCaps * caps, GstBuffer ** buf)
{
  GstSingleQueue *squeue;
  GstFlowReturn flow;

  squeue = gst_pad_get_element_private (pad);
  flow = gst_pad_alloc_buffer (squeue->srcpad, offset, size, caps, buf);

  return flow;
}

/* Push-mode activation function of the source pads.
 *
 * On activation gst_single_queue_flush() is called with FALSE. On
 * deactivation it is called with TRUE and the pad task is stopped afterwards.
 *
 * Returns: the flush result when activating; when deactivating, the flush
 * result OR-ed with the result of gst_pad_stop_task().
 */
static gboolean
gst_multi_queue_src_activate_push (GstPad * pad, gboolean active)
{
  GstMultiQueue *mq;
  GstSingleQueue *sq;
  gboolean result = FALSE;

  sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
  mq = sq->mqueue;

  GST_DEBUG_OBJECT (mq, "SingleQueue %d", sq->id);

  if (active) {
    result = gst_single_queue_flush (mq, sq, FALSE);
  } else {
    result = gst_single_queue_flush (mq, sq, TRUE);
    /* make sure streaming finishes */
    /* NOTE(review): with |= a successful stop_task hides a failed flush;
     * confirm that this is intended */
    result |= gst_pad_stop_task (pad);
  }
  return result;
}

/* acceptcaps function of the source pads: every caps is accepted, neither
 * @pad nor @caps is inspected. */
static gboolean
gst_multi_queue_acceptcaps (GstPad * pad, GstCaps * caps)
{
  const gboolean accepted = TRUE;

  return accepted;
}

/* Event function of the source pads: pushes @event on the sink pad of the
 * same single queue and returns the result of that push. */
static gboolean
gst_multi_queue_src_event (GstPad * pad, GstEvent * event)
{
  GstSingleQueue *squeue;
  gboolean handled;

  squeue = gst_pad_get_element_private (pad);
  handled = gst_pad_push_event (squeue->sinkpad, event);

  return handled;
}

/* Query function of the source pads.
 *
 * Passes @query to the peer of the single queue's sink pad.
 *
 * Returns: the result of gst_pad_query() on that peer, or FALSE when the
 * sink pad has no peer.
 */
static gboolean
gst_multi_queue_src_query (GstPad * pad, GstQuery * query)
{
  GstSingleQueue *sq = gst_pad_get_element_private (pad);
  GstPad *peerpad;
  gboolean res;

  /* FIXME, Handle position offset depending on queue size */

  /* default handling */
  if (!(peerpad = gst_pad_get_peer (sq->sinkpad)))
    goto no_peer;

  res = gst_pad_query (peerpad, query);

  /* drop the reference returned by gst_pad_get_peer() */
  gst_object_unref (peerpad);

  return res;

  /* ERRORS */
no_peer:
  {
    GST_LOG_OBJECT (sq->sinkpad, "Couldn't send query because we have no peer");
    return FALSE;
  }
}

/*
 * Next-non-linked functions
 */

/* WITH LOCK TAKEN
 *
 * Signals the "turn" condition of every single queue whose srcresult is
 * GST_FLOW_NOT_LINKED and whose nextid is non-zero and not above mq->highid.
 * Does nothing when mq->numwaiting is below 1.
 */
static void
wake_up_next_non_linked (GstMultiQueue * mq)
{
  GList *tmp;

  /* maybe no-one is waiting */
  if (mq->numwaiting < 1)
    return;

  /* Else figure out which singlequeue(s) need waking up */
  for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
    GstSingleQueue *sq = (GstSingleQueue *) tmp->data;

    if (sq->srcresult == GST_FLOW_NOT_LINKED) {
      /* a nextid of 0 is skipped; compute_high_id() treats that value as
       * "not waiting" */
      if (sq->nextid != 0 && sq->nextid <= mq->highid) {
        GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
        g_cond_signal (sq->turn);
      }
    }
  }
}

/* WITH LOCK TAKEN */
static void
1103
compute_high_id (GstMultiQueue * mq)
1104
{
1105
1106
  /* The high-id is either the highest id among the linked pads, or if all
   * pads are not-linked, it's the lowest not-linked pad */
1107
1108
  GList *tmp;
  guint32 lowest = G_MAXUINT32;
1109
  guint32 highid = G_MAXUINT32;
1110
1111
1112
1113

  for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
    GstSingleQueue *sq = (GstSingleQueue *) tmp->data;

1114
    GST_LOG_OBJECT (mq, "inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s",
1115
1116
        sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult));

1117
1118
1119
1120
1121
    if (sq->srcresult == GST_FLOW_NOT_LINKED) {
      /* No need to consider queues which are not waiting */
      if (sq->nextid == 0) {
        GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
        continue;
1122
1123
      }

1124
1125
1126
1127
1128
1129
1130
1131
1132
      if (sq->nextid < lowest)
        lowest = sq->nextid;
    } else if (sq->srcresult != GST_FLOW_UNEXPECTED) {
      /* If we don't have a global highid, or the global highid is lower than
       * this single queue's last outputted id, store the queue's one, 
       * unless the singlequeue is at EOS (srcresult = UNEXPECTED) */
      if ((highid == G_MAXUINT32) || (sq->oldid > highid))
        highid = sq->oldid;
    }
1133
1134
  }

1135
1136
1137
1138
1139
1140
1141
  if (highid == G_MAXUINT32 || lowest < highid)
    mq->highid = lowest;
  else
    mq->highid = highid;

  GST_LOG_OBJECT (mq, "Highid is now : %u, lowest non-linked %u", mq->highid,
      lowest);
1142
1143
}

/* TRUE when the max_size limit named by @format is enabled (non-zero) and
 * @value has reached it. The expansion refers to a variable named `sq`,
 * which must be in scope at the point of use. */
#define IS_FILLED(format, value) \
    (((sq->max_size.format) != 0) && ((value) >= (sq->max_size.format)))

/*
 * GstSingleQueue functions
 */
static void
1151
single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
1152
{
1153
1154
  GstMultiQueue *mq = sq->mqueue;
  GList *tmp;
1155
  GstDataQueueSize size;
1156
  gboolean filled = FALSE;
1157
1158

  gst_data_queue_get_level (sq->queue, &size);
1159

1160
  GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id);
1161

1162
1163
1164
  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
  for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
    GstSingleQueue *ssq = (GstSingleQueue *) tmp->data;
1165
    GstDataQueueSize ssize;
1166

1167
1168
    GST_LOG_OBJECT (mq, "Checking Queue %d", ssq->id);

1169
    if (gst_data_queue_is_empty (ssq->queue)) {
1170
      GST_LOG_OBJECT (mq, "Queue %d is empty", ssq->id);
1171
      if (IS_FILLED (visible, size.visible)) {
1172
        sq->max_size.visible++;
1173
        GST_DEBUG_OBJECT (mq,
1174
1175
            "Another queue is empty, bumping single queue %d max visible to %d",
            sq->id, sq->max_size.visible);
1176
      }
1177
1178
      GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
      goto beach;
1179
    }
1180
1181
1182
    /* check if we reached the hard time/bytes limits */
    gst_data_queue_get_level (ssq->queue, &ssize);

1183
1184
1185
1186
1187
    GST_DEBUG_OBJECT (mq,
        "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
        G_GUINT64_FORMAT, ssq->id, ssize.visible, sq->max_size.visible,
        ssize.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);

1188
    /* if this queue is filled completely we must signal overrun */
1189
1190
    if (IS_FILLED (bytes, ssize.bytes) || IS_FILLED (time, sq->cur_time)) {
      GST_LOG_OBJECT (mq, "Queue %d is filled", ssq->id);
1191
1192
      filled = TRUE;
    }
1193
  }
1194
  /* no queues were empty */
1195
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1196

1197
  /* Overrun is always forwarded, since this is blocking the upstream element */
1198
1199
1200
1201
  if (filled) {
    GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun");
    g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
  }
1202
1203
1204

beach:
  return;
1205
1206
1207
}

static void
1208
single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
1209
1210
{
  gboolean empty = TRUE;
1211
  GstMultiQueue *mq = sq->mqueue;
1212
1213
1214
  GList *tmp;

  GST_LOG_OBJECT (mq,
1215
      "Single Queue %d is empty, Checking other single queues", sq->id);
1216
1217
1218
1219
1220

  GST_MULTI_QUEUE_MUTEX_LOCK (mq);
  for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
    GstSingleQueue *sq = (GstSingleQueue *) tmp->data;

1221
1222
1223
1224
    if (gst_data_queue_is_full (sq->queue)) {
      GstDataQueueSize size;

      gst_data_queue_get_level (sq->queue, &size);
1225
      if (IS_FILLED (visible, size.visible)) {
1226
1227
1228
1229
1230
1231
        sq->max_size.visible++;
        GST_DEBUG_OBJECT (mq,
            "queue %d is filled, bumping its max visible to %d", sq->id,
            sq->max_size.visible);
        gst_data_queue_limits_changed (sq->queue);
      }
1232
    }
1233
1234
    if (!gst_data_queue_is_empty (sq->queue))
      empty = FALSE;
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
  }
  GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);

  if (empty) {
    GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it");
    g_signal_emit (G_OBJECT (mq), gst_multi_queue_signals[SIGNAL_UNDERRUN], 0);
  }
}

/* Check-full function handed to gst_data_queue_new() for every single queue.
 *
 * @visible: visible-item level supplied by the data queue
 * @bytes:   byte level supplied by the data queue
 * @time:    time level supplied by the data queue; not used here, the
 *           single queue's own cur_time is consulted instead
 * @sq:      the single queue whose limits apply
 *
 * Returns: TRUE when the queue counts as full. That is the case when the
 * queue has seen EOS, or when the visible-items limit is reached, or, if
 * neither holds, when the time limit is reached (sq->cur_time non-zero) or
 * the byte limit is reached (sq->cur_time zero).
 */
static gboolean
single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
    guint64 time, GstSingleQueue * sq)
{
  gboolean res;

  GST_DEBUG ("queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT
      "/%" G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes,
      sq->max_size.bytes, sq->cur_time, sq->max_size.time);

  /* we are always filled on EOS */
  if (sq->is_eos)
    return TRUE;

  /* we never go past the max visible items */
  if (IS_FILLED (visible, visible))
    return TRUE;

  if (sq->cur_time != 0) {
    /* if we have valid time in the queue, check */
    res = IS_FILLED (time, sq->cur_time);
  } else {
    /* no valid time, check bytes */
    res = IS_FILLED (bytes, bytes);
  }
  return res;
}

/* Releases a GstSingleQueue: flushes its data queue and drops the reference
 * to it, frees the "turn" condition and finally the structure itself. The
 * pads stored in @sq are not touched here. */
static void
gst_single_queue_free (GstSingleQueue * sq)
{
  /* DRAIN QUEUE */
  gst_data_queue_flush (sq->queue);
  g_object_unref (sq->queue);
  g_cond_free (sq->turn);
  g_free (sq);
}

static GstSingleQueue *
gst_single_queue_new (GstMultiQueue * mqueue)
{
  GstSingleQueue *sq;
  gchar *tmp;

  sq = g_new0 (GstSingleQueue, 1);

  GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
  sq->id = mqueue->nbqueues++;

  /* copy over max_size and extra_size so we don't need to take the lock
   * any longer when checking if the queue is full. */
  sq->max_size.visible = mqueue->max_size.visible;
  sq->max_size.bytes = mqueue->max_size.bytes;
  sq->max_size.time = mqueue->max_size.time;

  sq->extra_size.visible = mqueue->extra_size.visible;
  sq->extra_size.bytes = mqueue->extra_size.bytes;
  sq->extra_size.time = mqueue->extra_size.time;

  GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);

  GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id);

  sq->mqueue = mqueue;
1308
  sq->srcresult = GST_FLOW_WRONG_STATE;
1309
  sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction)
1310
      single_queue_check_full, sq);
1311
1312
1313
  sq->is_eos = FALSE;
  gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
  gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
1314

1315
1316
  sq->nextid = 0;
  sq->oldid = 0;
1317
1318
  sq->turn = g_cond_new ();

1319
  /* attach to underrun/overrun signals to handle non-starvation  */
1320
  g_signal_connect (G_OBJECT (sq->queue), "full",
1321
      G_CALLBACK (single_queue_overrun_cb), sq);
1322
  g_signal_connect (G_OBJECT (sq->queue), "empty",
1323
      G_CALLBACK (single_queue_underrun_cb), sq);
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338

  tmp = g_strdup_printf ("sink%d", sq->id);
  sq->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
  g_free (tmp);

  gst_pad_set_chain_function (sq->sinkpad,
      GST_DEBUG_FUNCPTR (gst_multi_queue_chain));
  gst_pad_set_activatepush_function (sq->sinkpad,
      GST_DEBUG_FUNCPTR (gst_multi_queue_sink_activate_push));
  gst_pad_set_event_function (sq->sinkpad,
      GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event));
  gst_pad_set_getcaps_function (sq->sinkpad,
      GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps));
  gst_pad_set_bufferalloc_function (sq->sinkpad,
      GST_DEBUG_FUNCPTR (gst_multi_queue_bufferalloc));
1339
1340
  gst_pad_set_internal_link_function (sq->sinkpad,
      GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links));
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355

  tmp = g_strdup_printf ("src%d", sq->id);
  sq->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
  g_free (tmp);

  gst_pad_set_activatepush_function (sq->srcpad,
      GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_push));
  gst_pad_set_acceptcaps_function (sq->srcpad,
      GST_DEBUG_FUNCPTR (gst_multi_queue_acceptcaps));
  gst_pad_set_getcaps_function (sq->srcpad,
      GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps));
  gst_pad_set_event_function (sq->srcpad,
      GST_DEBUG_FUNCPTR (gst_multi_queue_src_event));
  gst_pad_set_query_function (sq->srcpad,
      GST_DEBUG_FUNCPTR (gst_multi_queue_src_query));
1356
1357
  gst_pad_set_internal_link_function (sq->srcpad,
      GST_DEBUG_FUNCPTR (gst_multi_queue_get_internal_links));
1358
1359
1360
1361

  gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
  gst_pad_set_element_private (sq->srcpad, (gpointer) sq);

1362
  gst_pad_set_active (sq->srcpad, TRUE);
1363
1364
1365
  gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);

  gst_pad_set_active (sq->sinkpad, TRUE);