gstqueue.c 22.3 KB
Newer Older
Erik Walthinsen's avatar
Erik Walthinsen committed
1
2
3
4
/* GStreamer
 * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
 *                    2000 Wim Taymans <wtay@chello.be>
 *
Wim Taymans's avatar
Wim Taymans committed
5
 * gstqueue.c:
Erik Walthinsen's avatar
Erik Walthinsen committed
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

23
24
/* #define DEBUG_ENABLED */
/* #define STATUS_ENABLED */
25

Wim Taymans's avatar
Wim Taymans committed
26
#ifdef STATUS_ENABLED
27
#define STATUS(A) GST_DEBUG(GST_CAT_DATAFLOW, A, GST_ELEMENT_NAME(queue))
Wim Taymans's avatar
Wim Taymans committed
28
#else
Wim Taymans's avatar
Wim Taymans committed
29
#define STATUS(A)
Wim Taymans's avatar
Wim Taymans committed
30
#endif
Erik Walthinsen's avatar
Erik Walthinsen committed
31

Wim Taymans's avatar
Wim Taymans committed
32
33
34

#include "config.h"
#include "gst_private.h"
Erik Walthinsen's avatar
Erik Walthinsen committed
35

Wim Taymans's avatar
Wim Taymans committed
36
#include "gstqueue.h"
37
#include "gstscheduler.h"
38
#include "gstevent.h"
39
#include "gstlog.h"
Erik Walthinsen's avatar
Erik Walthinsen committed
40
41
42

GstElementDetails gst_queue_details = {
  "Queue",
43
  "Generic",
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
44
  "LGPL",
Erik Walthinsen's avatar
Erik Walthinsen committed
45
46
47
48
49
50
51
52
53
  "Simple data queue",
  VERSION,
  "Erik Walthinsen <omega@cse.ogi.edu>",
  "(C) 1999",
};


/* Queue signals and args */
enum {
54
55
  LOW_WATERMARK,
  HIGH_WATERMARK,
Erik Walthinsen's avatar
Erik Walthinsen committed
56
57
58
59
60
  LAST_SIGNAL
};

enum {
  ARG_0,
61
62
63
64
65
66
67
  ARG_LEVEL_BUFFERS,
  ARG_LEVEL_BYTES,
  ARG_LEVEL_TIME,
  ARG_SIZE_BUFFERS,
  ARG_SIZE_BYTES,
  ARG_SIZE_TIME,
  ARG_LEAKY,
Erik Walthinsen's avatar
Erik Walthinsen committed
68
  ARG_LEVEL,
69
  ARG_MAX_LEVEL,
70
  ARG_MAY_DEADLOCK,
71
  ARG_BLOCK_TIMEOUT,
Erik Walthinsen's avatar
Erik Walthinsen committed
72
73
74
};


75
76
static void			gst_queue_class_init		(GstQueueClass *klass);
static void			gst_queue_init			(GstQueue *queue);
77
static void 			gst_queue_dispose 		(GObject *object);
Erik Walthinsen's avatar
Erik Walthinsen committed
78

79
80
81
82
static void			gst_queue_set_property		(GObject *object, guint prop_id, 
								 const GValue *value, GParamSpec *pspec);
static void			gst_queue_get_property		(GObject *object, guint prop_id, 
								 GValue *value, GParamSpec *pspec);
Wim Taymans's avatar
Wim Taymans committed
83

84
85
86
static void			gst_queue_chain			(GstPad *pad, GstBuffer *buf);
static GstBuffer *		gst_queue_get			(GstPad *pad);
static GstBufferPool* 		gst_queue_get_bufferpool 	(GstPad *pad);
87
	
88
89
90
91
static gboolean 		gst_queue_handle_src_event 	(GstPad *pad, GstEvent *event);


static void			gst_queue_locked_flush		(GstQueue *queue);
Wim Taymans's avatar
Wim Taymans committed
92

93
static GstElementStateReturn	gst_queue_change_state		(GstElement *element);
94
static gboolean			gst_queue_release_locks		(GstElement *element);
Wim Taymans's avatar
Wim Taymans committed
95

96
  
97
#define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type())
98
static GType
99
queue_leaky_get_type(void) {
100
101
  static GType queue_leaky_type = 0;
  static GEnumValue queue_leaky[] = {
102
103
104
    { GST_QUEUE_NO_LEAK, 		"0", "Not Leaky" },
    { GST_QUEUE_LEAK_UPSTREAM, 		"1", "Leaky on Upstream" },
    { GST_QUEUE_LEAK_DOWNSTREAM, 	"2", "Leaky on Downstream" },
105
106
107
    { 0, NULL, NULL },
  };
  if (!queue_leaky_type) {
108
    queue_leaky_type = g_enum_register_static("GstQueueLeaky", queue_leaky);
109
110
111
  }
  return queue_leaky_type;
}
Erik Walthinsen's avatar
Erik Walthinsen committed
112

113
static GstElementClass *parent_class = NULL;
114
/* static guint gst_queue_signals[LAST_SIGNAL] = { 0 }; */
Erik Walthinsen's avatar
Erik Walthinsen committed
115

116
GType
117
118
gst_queue_get_type(void) 
{
119
  static GType queue_type = 0;
Erik Walthinsen's avatar
Erik Walthinsen committed
120
121

  if (!queue_type) {
122
    static const GTypeInfo queue_info = {
Erik Walthinsen's avatar
Erik Walthinsen committed
123
      sizeof(GstQueueClass),
124
125
126
127
128
129
130
131
      NULL,
      NULL,
      (GClassInitFunc)gst_queue_class_init,
      NULL,
      NULL,
      sizeof(GstQueue),
      4,
      (GInstanceInitFunc)gst_queue_init,
132
      NULL
Erik Walthinsen's avatar
Erik Walthinsen committed
133
    };
134
    queue_type = g_type_register_static (GST_TYPE_ELEMENT, "GstQueue", &queue_info, 0);
Erik Walthinsen's avatar
Erik Walthinsen committed
135
136
137
138
  }
  return queue_type;
}

Wim Taymans's avatar
Wim Taymans committed
139
140
static void
gst_queue_class_init (GstQueueClass *klass)
Wim Taymans's avatar
Wim Taymans committed
141
{
142
  GObjectClass *gobject_class;
Wim Taymans's avatar
Wim Taymans committed
143
  GstElementClass *gstelement_class;
Erik Walthinsen's avatar
Erik Walthinsen committed
144

145
  gobject_class = (GObjectClass*)klass;
Wim Taymans's avatar
Wim Taymans committed
146
  gstelement_class = (GstElementClass*)klass;
Erik Walthinsen's avatar
Erik Walthinsen committed
147

148
  parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
Erik Walthinsen's avatar
Erik Walthinsen committed
149

150
151
152
153
154
155
156
157
158
159
160
161
  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEAKY,
    g_param_spec_enum ("leaky", "Leaky", "Where the queue leaks, if at all.",
                       GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEVEL,
    g_param_spec_int ("level", "Level", "How many buffers are in the queue.",
                      0, G_MAXINT, 0, G_PARAM_READABLE));
  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL,
    g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.",
                      0, G_MAXINT, 100, G_PARAM_READWRITE));
  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK,
    g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING",
                      TRUE, G_PARAM_READWRITE));
162
163
164
165
166
  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BLOCK_TIMEOUT,
    g_param_spec_int ("block_timeout", "Timeout for Block", 
                      "Microseconds until blocked queue times out and returns filler event. "
                      "Value of -1 disables timeout",
                      -1, G_MAXINT, -1, G_PARAM_READWRITE));
167
168
169
170

  gobject_class->dispose                = GST_DEBUG_FUNCPTR (gst_queue_dispose);
  gobject_class->set_property 		= GST_DEBUG_FUNCPTR (gst_queue_set_property);
  gobject_class->get_property 		= GST_DEBUG_FUNCPTR (gst_queue_get_property);
Wim Taymans's avatar
Wim Taymans committed
171

172
173
  gstelement_class->change_state  = GST_DEBUG_FUNCPTR(gst_queue_change_state);
  gstelement_class->release_locks = GST_DEBUG_FUNCPTR(gst_queue_release_locks);
Erik Walthinsen's avatar
Erik Walthinsen committed
174
175
}

176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
static GstPadConnectReturn
gst_queue_connect (GstPad *pad, GstCaps *caps)
{
  GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
  GstPad *otherpad;

  if (pad == queue->srcpad) 
    otherpad = queue->sinkpad;
  else
    otherpad = queue->srcpad;

  return gst_pad_proxy_connect (otherpad, caps);
}

static GstCaps*
gst_queue_getcaps (GstPad *pad, GstCaps *caps)
{
  GstQueue *queue = GST_QUEUE (gst_pad_get_parent (pad));
  GstPad *otherpad;

  if (pad == queue->srcpad) 
    otherpad = queue->sinkpad;
  else
    otherpad = queue->srcpad;

  return gst_pad_get_allowed_caps (otherpad);
}

Wim Taymans's avatar
Wim Taymans committed
204
205
static void
gst_queue_init (GstQueue *queue)
Wim Taymans's avatar
Wim Taymans committed
206
{
207
  /* scheduling on this kind of element is, well, interesting */
208
  GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
209
  GST_FLAG_SET (queue, GST_ELEMENT_EVENT_AWARE);
210

Wim Taymans's avatar
Wim Taymans committed
211
  queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
212
  gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_chain));
213
  gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
214
215
216
  gst_pad_set_bufferpool_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_get_bufferpool));
  gst_pad_set_connect_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_connect));
  gst_pad_set_getcaps_function (queue->sinkpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps));
217

Wim Taymans's avatar
Wim Taymans committed
218
  queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
219
  gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
Wim Taymans's avatar
Wim Taymans committed
220
  gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
221
222
  gst_pad_set_connect_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_connect));
  gst_pad_set_getcaps_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps));
223
  gst_pad_set_event_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_handle_src_event));
Erik Walthinsen's avatar
Erik Walthinsen committed
224

225
  queue->leaky = GST_QUEUE_NO_LEAK;
Erik Walthinsen's avatar
Erik Walthinsen committed
226
227
228
  queue->queue = NULL;
  queue->level_buffers = 0;
  queue->level_bytes = 0;
229
  queue->level_time = 0LL;
230
231
232
  queue->size_buffers = 100;		/* 100 buffers */
  queue->size_bytes = 100 * 1024;	/* 100KB */
  queue->size_time = 1000000000LL;	/* 1sec */
233
  queue->may_deadlock = TRUE;
234
  queue->block_timeout = -1;
235
236
  queue->interrupt = FALSE;
  queue->flush = FALSE;
Erik Walthinsen's avatar
Erik Walthinsen committed
237

238
239
240
241
242
  queue->qlock = g_mutex_new ();
  queue->reader = FALSE;
  queue->writer = FALSE;
  queue->not_empty = g_cond_new ();
  queue->not_full = g_cond_new ();
243
  queue->events = g_async_queue_new();
244
  queue->queue = g_queue_new ();
245
  GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions");
Erik Walthinsen's avatar
Erik Walthinsen committed
246
247
}

248
249
250
251
252
253
254
255
static void
gst_queue_dispose (GObject *object)
{
  GstQueue *queue = GST_QUEUE (object);

  g_mutex_free (queue->qlock);
  g_cond_free (queue->not_empty);
  g_cond_free (queue->not_full);
256
257
  gst_queue_locked_flush (queue);
  g_queue_free (queue->queue);
258

259
260
  g_async_queue_unref(queue->events);

261
262
263
  G_OBJECT_CLASS (parent_class)->dispose (object);
}

264
265
266
267
268
269
270
271
272
273
static GstBufferPool*
gst_queue_get_bufferpool (GstPad *pad)
{
  GstQueue *queue;

  queue = GST_QUEUE (GST_OBJECT_PARENT (pad));

  return gst_pad_get_bufferpool (queue->srcpad);
}

Wim Taymans's avatar
Wim Taymans committed
274
static void
275
gst_queue_cleanup_data (gpointer data, const gpointer user_data)
276
{
277
  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p", data);
278

279
  gst_data_unref (GST_DATA (data));
280
281
}

Wim Taymans's avatar
Wim Taymans committed
282
static void
283
gst_queue_locked_flush (GstQueue *queue)
Wim Taymans's avatar
Wim Taymans committed
284
{
285
286
287
288
289
  gpointer data;
  
  while ((data = g_queue_pop_head (queue->queue))) {
    gst_queue_cleanup_data (data, (gpointer) queue);
  }
Wim Taymans's avatar
Wim Taymans committed
290
  queue->timeval = NULL;
291
292
293
  queue->level_buffers = 0;
  queue->level_bytes = 0;
  queue->level_time = 0LL;
294
295
  /* make sure any pending buffers to be added are flushed too */
  queue->flush = TRUE;
Wim Taymans's avatar
Wim Taymans committed
296
297
}

Wim Taymans's avatar
Wim Taymans committed
298
299
static void
gst_queue_chain (GstPad *pad, GstBuffer *buf)
Wim Taymans's avatar
Wim Taymans committed
300
{
Erik Walthinsen's avatar
Erik Walthinsen committed
301
  GstQueue *queue;
302
  gboolean reader;
Erik Walthinsen's avatar
Erik Walthinsen committed
303

Wim Taymans's avatar
Wim Taymans committed
304
305
306
  g_return_if_fail (pad != NULL);
  g_return_if_fail (GST_IS_PAD (pad));
  g_return_if_fail (buf != NULL);
Erik Walthinsen's avatar
Erik Walthinsen committed
307

308
  queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
309
310
311
312
313
  
  /* check for events to send upstream */
  g_async_queue_lock(queue->events);
  while (g_async_queue_length_unlocked(queue->events) > 0){
    GstEvent *event = (GstEvent*)g_async_queue_pop_unlocked(queue->events);
Steve Baker Baker's avatar
Steve Baker Baker committed
314
    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "sending event upstream\n");
315
    gst_pad_event_default (pad, event);
Steve Baker Baker's avatar
Steve Baker Baker committed
316
    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "event sent\n");
317
318
  }
  g_async_queue_unlock(queue->events);
Erik Walthinsen's avatar
Erik Walthinsen committed
319

320
restart:
321
  /* we have to lock the queue since we span threads */
322
  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ());
323
  g_mutex_lock (queue->qlock);
324
  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ());
325

326
327
328
  /* assume don't need to flush this buffer when the queue is filled */
  queue->flush = FALSE;

329
330
  if (GST_IS_EVENT (buf)) {
    switch (GST_EVENT_TYPE (buf)) {
331
      case GST_EVENT_FLUSH:
332
        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n");
333
        gst_queue_locked_flush (queue);
334
335
	break;
      case GST_EVENT_EOS:
336
337
	GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "eos in on %s %d\n", 
			   GST_ELEMENT_NAME (queue), queue->level_buffers);
338
	break;
339
      default:
340
	/* we put the event in the queue, we don't have to act ourselves */
341
	break;
342
    }
343
344
  }

345
  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d",buf,GST_BUFFER_SIZE(buf));
346

347
  if (queue->level_buffers == queue->size_buffers) {
348
    /* if this is a leaky queue... */
349
    if (queue->leaky) {
350
351
      /* FIXME don't want to leak events! */
      /* if we leak on the upstream side, drop the current buffer */
352
      if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
353
        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end");
354
355
356
357
        if (GST_IS_EVENT (buf))
          fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
              GST_ELEMENT_NAME(GST_ELEMENT(queue)),
              GST_EVENT_TYPE(GST_EVENT(buf)));
358
          GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end");
359
        gst_buffer_unref(buf);
360
        /* now we have to clean up and exit right away */
361
        g_mutex_unlock (queue->qlock);
362
363
        return;
      }
364
      /* otherwise we have to push a buffer off the other end */
365
      else {
366
        gpointer front;
367
        GstBuffer *leakbuf;
368

369
        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end");
370
371
372
373
374

        front = g_queue_pop_head (queue->queue);
        leakbuf = (GstBuffer *)(front);

        if (GST_IS_EVENT (leakbuf)) {
375
376
377
          fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
              GST_ELEMENT_NAME(GST_ELEMENT(queue)),
              GST_EVENT_TYPE(GST_EVENT(leakbuf)));
378
	}
379
380
        queue->level_buffers--;
        queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
381
        gst_data_unref (GST_DATA (leakbuf));
382
383
      }
    }
Erik Walthinsen's avatar
Erik Walthinsen committed
384

385
    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d",
386
387
        		queue->level_buffers, queue->size_buffers);

388
    while (queue->level_buffers == queue->size_buffers) {
389
390
      /* if there's a pending state change for this queue or its manager, switch */
      /* back to iterator so bottom half of state change executes */
391
      if (queue->interrupt) {
392
        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!");
393
        g_mutex_unlock (queue->qlock);
394
	if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->sinkpad), GST_ELEMENT (queue)))
395
          return;
396
397
398
399
400
401
402
	/* if we got here bacause we were unlocked after a flush, we don't need
	 * to add the buffer to the queue again */
	if (queue->flush) {
          GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "not adding pending buffer after flush");
	  return;
	}
        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding pending buffer after interrupt");
403
	goto restart;
404
      }
405
406
407
408
      if (GST_STATE (queue) != GST_STATE_PLAYING) {
	/* this means the other end is shut down */
	/* try to signal to resolve the error */
	if (!queue->may_deadlock) {
409
          gst_data_unref (GST_DATA (buf));
410
411
412
413
414
          g_mutex_unlock (queue->qlock);
          gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
	  return;
	}
	else {
415
          g_print ("%s: waiting for the app to restart source pad elements\n", GST_ELEMENT_NAME (queue));
416
417
	}
      }
418

419
420
      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d", 
		      queue->level_buffers, queue->size_buffers);
421
      if (queue->writer)
422
        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!");
423
424
425
      queue->writer = TRUE;
      g_cond_wait (queue->not_full, queue->qlock);
      queue->writer = FALSE;
426
      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal");
427
    }
428
    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d",
429
        queue->level_buffers, queue->size_buffers);
430
431
  }

432
  /* put the buffer on the tail of the list */
433
434
  g_queue_push_tail (queue->queue, buf);

435
  queue->level_buffers++;
436
  queue->level_bytes += GST_BUFFER_SIZE(buf);
437

438
439
440
  /* this assertion _has_ to hold */
  g_assert (queue->queue->length == queue->level_buffers);

441
  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d",
442
443
444
445
446
447
448
      GST_DEBUG_PAD_NAME(pad),
      queue->level_buffers, queue->size_buffers);

  /* reader waiting on an empty queue */
  reader = queue->reader;

  g_mutex_unlock (queue->qlock);
Erik Walthinsen's avatar
Erik Walthinsen committed
449

450
  if (reader)
451
  {
452
    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty");
453
    g_cond_signal (queue->not_empty);
Erik Walthinsen's avatar
Erik Walthinsen committed
454
455
456
  }
}

457
static GstBuffer *
Wim Taymans's avatar
Wim Taymans committed
458
gst_queue_get (GstPad *pad)
Wim Taymans's avatar
Wim Taymans committed
459
{
460
  GstQueue *queue;
Erik Walthinsen's avatar
Erik Walthinsen committed
461
  GstBuffer *buf = NULL;
462
  gpointer front;
463
  gboolean writer;
464

465
466
  g_assert(pad != NULL);
  g_assert(GST_IS_PAD(pad));
467
468
469
470
  g_return_val_if_fail (pad != NULL, NULL);
  g_return_val_if_fail (GST_IS_PAD (pad), NULL);

  queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
471

472
restart:
Erik Walthinsen's avatar
Erik Walthinsen committed
473
  /* have to lock for thread-safety */
474
  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ());
475
  g_mutex_lock (queue->qlock);
476
  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%p %p", g_thread_self (), queue->not_empty);
477

478
  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers);
479
  while (queue->level_buffers == 0) {
480
481
482
    /* if there's a pending state change for this queue or its manager, switch
     * back to iterator so bottom half of state change executes
     */ 
483
484
    //while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
    if (queue->interrupt) {
485
      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!");
486
      g_mutex_unlock (queue->qlock);
487
      if (gst_scheduler_interrupt (gst_pad_get_scheduler (queue->srcpad), GST_ELEMENT (queue)))
Wim Taymans's avatar
Wim Taymans committed
488
        return NULL;
489
      goto restart;
490
    }
491
492
493
494
495
    if (GST_STATE (queue) != GST_STATE_PLAYING) {
      /* this means the other end is shut down */
      if (!queue->may_deadlock) {
        g_mutex_unlock (queue->qlock);
        gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
496
        goto restart;
497
498
      }
      else {
499
        g_print ("%s: waiting for the app to restart source pad elements\n", GST_ELEMENT_NAME (queue));
500
501
      }
    }
502

503
    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d", queue->level_buffers, queue->size_buffers);
504
    if (queue->reader)
505
      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!");
506
    queue->reader = TRUE;
507
    
508
509
    //if (queue->block_timeout > -1){
    if (FALSE) {
510
511
512
513
514
      GTimeVal timeout;
      g_get_current_time(&timeout);
      g_time_val_add(&timeout, queue->block_timeout);
      if (!g_cond_timed_wait (queue->not_empty, queue->qlock, &timeout)){
        g_mutex_unlock (queue->qlock);
515
	g_warning ("filler");
516
517
518
519
520
521
        return GST_BUFFER(gst_event_new_filler());
      }
    }
    else {
      g_cond_wait (queue->not_empty, queue->qlock);
    }
522
    queue->reader = FALSE;
523
    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal");
Erik Walthinsen's avatar
Erik Walthinsen committed
524
  }
525
  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers);
Erik Walthinsen's avatar
Erik Walthinsen committed
526

527
528
  front = g_queue_pop_head (queue->queue);
  buf = (GstBuffer *)(front);
529
  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue", buf);
530

531
532
  queue->level_buffers--;
  queue->level_bytes -= GST_BUFFER_SIZE(buf);
533

534
  GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d",
535
536
537
      GST_DEBUG_PAD_NAME(pad),
      queue->level_buffers, queue->size_buffers);

538
  /* this assertion _has_ to hold */
539
  g_assert (queue->queue->length == queue->level_buffers);
540

541
542
  /* writer waiting on a full queue */
  writer = queue->writer;
543

544
545
546
547
  g_mutex_unlock (queue->qlock);

  if (writer)
  {
548
    GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full");
549
550
551
    g_cond_signal (queue->not_full);
  }

552
  /* FIXME where should this be? locked? */
553
554
555
556
  if (GST_IS_EVENT(buf)) {
    GstEvent *event = GST_EVENT(buf);
    switch (GST_EVENT_TYPE(event)) {
      case GST_EVENT_EOS:
557
        GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos", GST_ELEMENT_NAME (queue));
558
        gst_element_set_eos (GST_ELEMENT (queue));
559
560
561
562
563
        break;
      default:
        break;
    }
  }
564

565
  return buf;
Erik Walthinsen's avatar
Erik Walthinsen committed
566
567
}

568
569
570
571
572

static gboolean
gst_queue_handle_src_event (GstPad *pad, GstEvent *event)
{
  GstQueue *queue;
573
  gboolean res;
574
575
576

  queue = GST_QUEUE (GST_OBJECT_PARENT (pad));

577
  g_mutex_lock (queue->qlock);
578
579
580
581
582
583
584
585
586
587
588
589

  if (gst_element_get_state (GST_ELEMENT (queue)) == GST_STATE_PLAYING) {
    /* push the event to the queue for upstream consumption */
    g_async_queue_push(queue->events, event);
    g_mutex_unlock (queue->qlock);
    g_warning ("FIXME: sending event in a running queue");
    /* FIXME wait for delivery of the event here, then return the result
     * instead of FALSE */
    return FALSE;
  }

  res = gst_pad_event_default (pad, event); 
590
591
592
593
594
595
  switch (GST_EVENT_TYPE (event)) {
    case GST_EVENT_FLUSH:
      GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n");
      gst_queue_locked_flush (queue);
      break;
    case GST_EVENT_SEEK:
596
      if (GST_EVENT_SEEK_FLAGS (event) & GST_SEEK_FLAG_FLUSH) {
597
        gst_queue_locked_flush (queue);
598
      }
599
600
    default:
      break;
601
602
  }
  g_mutex_unlock (queue->qlock);
603

604
605
  /* we have to claim success, but we don't really know */
  return TRUE;
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
}

static gboolean
gst_queue_release_locks (GstElement *element)
{
  GstQueue *queue;

  queue = GST_QUEUE (element);

  g_mutex_lock (queue->qlock);
  queue->interrupt = TRUE;
  g_cond_signal (queue->not_full);
  g_cond_signal (queue->not_empty); 
  g_mutex_unlock (queue->qlock);

  return TRUE;
}

Wim Taymans's avatar
Wim Taymans committed
624
625
static GstElementStateReturn
gst_queue_change_state (GstElement *element)
Wim Taymans's avatar
Wim Taymans committed
626
{
Wim Taymans's avatar
Wim Taymans committed
627
  GstQueue *queue;
628
  GstElementStateReturn ret;
629
  GstElementState new_state;
Wim Taymans's avatar
Wim Taymans committed
630
  g_return_val_if_fail (GST_IS_QUEUE (element), GST_STATE_FAILURE);
Wim Taymans's avatar
Wim Taymans committed
631

Wim Taymans's avatar
Wim Taymans committed
632
  queue = GST_QUEUE (element);
633

634
635
  GST_DEBUG_ENTER("('%s')", GST_ELEMENT_NAME (element));

636
637
638
  /* lock the queue so another thread (not in sync with this thread's state)
   * can't call this queue's _get (or whatever)
   */
639
  g_mutex_lock (queue->qlock);
Wim Taymans's avatar
Wim Taymans committed
640

641
  new_state = GST_STATE_PENDING (element);
Wim Taymans's avatar
Wim Taymans committed
642

643
  if (new_state == GST_STATE_READY) {
644
    gst_queue_locked_flush (queue);
645
  }
646
  else if (new_state == GST_STATE_PLAYING) {
Wim Taymans's avatar
Wim Taymans committed
647
    if (!GST_PAD_IS_USABLE (queue->sinkpad)) {
Wim Taymans's avatar
Wim Taymans committed
648
      GST_DEBUG_ELEMENT (GST_CAT_STATES, queue, "queue %s is not connected", GST_ELEMENT_NAME (queue));
649
      /* FIXME can this be? */
650
651
652
653
654
655
      if (queue->reader)
        g_cond_signal (queue->not_empty);
      g_mutex_unlock (queue->qlock);

      return GST_STATE_FAILURE;
    }
656
    queue->interrupt = FALSE;
657
658
659
660
  }

  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
  g_mutex_unlock (queue->qlock);
Wim Taymans's avatar
Wim Taymans committed
661

662
  GST_DEBUG_LEAVE("('%s')", GST_ELEMENT_NAME (element));
663
  return ret;
Wim Taymans's avatar
Wim Taymans committed
664
665
}

Erik Walthinsen's avatar
Erik Walthinsen committed
666

Wim Taymans's avatar
Wim Taymans committed
667
static void
668
gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec)
Wim Taymans's avatar
Wim Taymans committed
669
{
Erik Walthinsen's avatar
Erik Walthinsen committed
670
671
672
  GstQueue *queue;

  /* it's not null if we got it, but it might not be ours */
Wim Taymans's avatar
Wim Taymans committed
673
  g_return_if_fail (GST_IS_QUEUE (object));
Wim Taymans's avatar
Wim Taymans committed
674

Wim Taymans's avatar
Wim Taymans committed
675
  queue = GST_QUEUE (object);
Erik Walthinsen's avatar
Erik Walthinsen committed
676

677
  switch (prop_id) {
678
    case ARG_LEAKY:
679
      queue->leaky = g_value_get_enum (value);
680
      break;
681
    case ARG_MAX_LEVEL:
682
683
684
685
      queue->size_buffers = g_value_get_int (value);
      break;
    case ARG_MAY_DEADLOCK:
      queue->may_deadlock = g_value_get_boolean (value);
686
      break;
687
688
689
    case ARG_BLOCK_TIMEOUT:
      queue->block_timeout = g_value_get_int (value);
      break;
Erik Walthinsen's avatar
Erik Walthinsen committed
690
    default:
691
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
Erik Walthinsen's avatar
Erik Walthinsen committed
692
693
694
695
      break;
  }
}

Wim Taymans's avatar
Wim Taymans committed
696
static void
697
gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec)
Wim Taymans's avatar
Wim Taymans committed
698
{
Erik Walthinsen's avatar
Erik Walthinsen committed
699
700
701
  GstQueue *queue;

  /* it's not null if we got it, but it might not be ours */
Wim Taymans's avatar
Wim Taymans committed
702
  g_return_if_fail (GST_IS_QUEUE (object));
Wim Taymans's avatar
Wim Taymans committed
703

Wim Taymans's avatar
Wim Taymans committed
704
  queue = GST_QUEUE (object);
Erik Walthinsen's avatar
Erik Walthinsen committed
705

706
  switch (prop_id) {
707
    case ARG_LEAKY:
708
      g_value_set_enum (value, queue->leaky);
709
      break;
Erik Walthinsen's avatar
Erik Walthinsen committed
710
    case ARG_LEVEL:
711
      g_value_set_int (value, queue->level_buffers);
Erik Walthinsen's avatar
Erik Walthinsen committed
712
      break;
713
    case ARG_MAX_LEVEL:
714
715
716
717
      g_value_set_int (value, queue->size_buffers);
      break;
    case ARG_MAY_DEADLOCK:
      g_value_set_boolean (value, queue->may_deadlock);
718
      break;
719
720
721
    case ARG_BLOCK_TIMEOUT:
      g_value_set_int (value, queue->block_timeout);
      break;
Erik Walthinsen's avatar
Erik Walthinsen committed
722
    default:
723
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
Erik Walthinsen's avatar
Erik Walthinsen committed
724
725
726
      break;
  }
}