/* GStreamer
 * Copyright (C) <2005> Philippe Khalaf <burger@speedy.org>
 * Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include "gstbasertpdepayload.h"

/* Debug category of this base class.  It is initialised in class_init and,
 * through GST_CAT_DEFAULT, is the category used by the GST_DEBUG_OBJECT
 * calls in this file. */
GST_DEBUG_CATEGORY (basertpdepayload_debug);
#define GST_CAT_DEFAULT (basertpdepayload_debug)

/* Filter signals and args */
/* Signal identifiers.  No signals are defined yet, so the enum only holds
 * the terminating LAST_SIGNAL entry. */
enum
{
  /* FILL ME */
  LAST_SIGNAL
};

/* Property identifiers.  ARG_0 is a placeholder so that the first real
 * property, ARG_QUEUE_DELAY, gets id 1. */
enum
{
  ARG_0,
  ARG_QUEUE_DELAY,
};

static GstElementClass *parent_class = NULL;

static void gst_base_rtp_depayload_base_init (GstBaseRTPDepayloadClass * klass);
static void gst_base_rtp_depayload_class_init (GstBaseRTPDepayloadClass *
    klass);
static void gst_base_rtp_depayload_init (GstBaseRTPDepayload * filter,
    gpointer g_class);

47
static void gst_base_rtp_depayload_push (GstBaseRTPDepayload * filter,
48
    GstBuffer * rtp_buf);
49

50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/* Returns the GType of GstBaseRTPDepayload, registering it as a static
 * subtype of GST_TYPE_ELEMENT on the first call. */
GType
gst_base_rtp_depayload_get_type (void)
{
  static GType depayload_type = 0;

  if (depayload_type == 0) {
    static const GTypeInfo depayload_info = {
      sizeof (GstBaseRTPDepayloadClass),        /* class_size */
      (GBaseInitFunc) gst_base_rtp_depayload_base_init, /* base_init */
      NULL,                     /* base_finalize */
      (GClassInitFunc) gst_base_rtp_depayload_class_init,       /* class_init */
      NULL,                     /* class_finalize */
      NULL,                     /* class_data */
      sizeof (GstBaseRTPDepayload),     /* instance_size */
      0,                        /* n_preallocs */
      (GInstanceInitFunc) gst_base_rtp_depayload_init,  /* instance_init */
    };

    depayload_type = g_type_register_static (GST_TYPE_ELEMENT,
        "GstBaseRTPDepayload", &depayload_info, 0);
  }

  return depayload_type;
}

static void gst_base_rtp_depayload_finalize (GObject * object);
static void gst_base_rtp_depayload_set_property (GObject * object,
    guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_base_rtp_depayload_get_property (GObject * object,
    guint prop_id, GValue * value, GParamSpec * pspec);

static gboolean gst_base_rtp_depayload_setcaps (GstPad * pad, GstCaps * caps);

static GstFlowReturn gst_base_rtp_depayload_chain (GstPad * pad,
    GstBuffer * in);
83
84
static gboolean
gst_base_rtp_depayload_handle_sink_event (GstPad * pad, GstEvent * event);
85

86
87
static GstStateChangeReturn gst_base_rtp_depayload_change_state (GstElement *
    element, GstStateChange transition);
88
static GstFlowReturn gst_base_rtp_depayload_add_to_queue (GstBaseRTPDepayload *
89
    filter, GstBuffer * in);
90
91
92

static void gst_base_rtp_depayload_set_gst_timestamp
    (GstBaseRTPDepayload * filter, guint32 timestamp, GstBuffer * buf);
93

94
95
static void
gst_base_rtp_depayload_wait (GstBaseRTPDepayload * filter, GstClockTime time);
96
97
98
99

/* base_init hook registered in the GTypeInfo.  The base class has nothing
 * to set up here, so the body is intentionally empty. */
static void
gst_base_rtp_depayload_base_init (GstBaseRTPDepayloadClass * klass)
{
}

/* Class initialiser: remembers the parent class, hooks up the GObject and
 * GstElement vmethods, installs the queue delay property, installs the
 * default add_to_queue / set_gst_timestamp implementations and initialises
 * the debug category. */
static void
gst_base_rtp_depayload_class_init (GstBaseRTPDepayloadClass * klass)
{
  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
  GstElementClass *gstelement_class = (GstElementClass *) klass;

  parent_class = g_type_class_peek_parent (klass);

  gobject_class->set_property = gst_base_rtp_depayload_set_property;
  gobject_class->get_property = gst_base_rtp_depayload_get_property;
  gobject_class->finalize = gst_base_rtp_depayload_finalize;

  /* the advertised default matches the value instance init assigns to
   * queue_delay (RTP_QUEUE_DELAY) instead of a misleading 0 */
  g_object_class_install_property (gobject_class, ARG_QUEUE_DELAY,
      g_param_spec_uint ("queue_delay", "Queue Delay",
          "Amount of ms to queue/buffer", 0, G_MAXUINT, RTP_QUEUE_DELAY,
          G_PARAM_READWRITE));

  gstelement_class->change_state = gst_base_rtp_depayload_change_state;

  /* defaults; subclasses may replace them */
  klass->add_to_queue = gst_base_rtp_depayload_add_to_queue;
  klass->set_gst_timestamp = gst_base_rtp_depayload_set_gst_timestamp;

  GST_DEBUG_CATEGORY_INIT (basertpdepayload_debug, "basertpdepayload", 0,
      "Base class for RTP Depayloaders");
}

/* Instance initialiser.
 *
 * Creates the "sink" and "src" pads from the pad templates registered on
 * g_class, installs the setcaps/chain/event functions on the sink pad,
 * allocates the packet queue and sets the initial field values.
 * Returns early (with a GLib critical) when either pad template is missing.
 */
static void
gst_base_rtp_depayload_init (GstBaseRTPDepayload * filter, gpointer g_class)
{
  GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);
  GstPadTemplate *pad_template;

  GST_DEBUG_OBJECT (filter, "init");

  /* sink pad: receives the RTP packets */
  pad_template = gst_element_class_get_pad_template (element_class, "sink");
  g_return_if_fail (pad_template != NULL);
  filter->sinkpad = gst_pad_new_from_template (pad_template, "sink");
  gst_pad_set_setcaps_function (filter->sinkpad,
      gst_base_rtp_depayload_setcaps);
  gst_pad_set_chain_function (filter->sinkpad, gst_base_rtp_depayload_chain);
  gst_pad_set_event_function (filter->sinkpad,
      gst_base_rtp_depayload_handle_sink_event);
  gst_element_add_pad (GST_ELEMENT (filter), filter->sinkpad);

  /* source pad: emits the depayloaded buffers */
  pad_template = gst_element_class_get_pad_template (element_class, "src");
  g_return_if_fail (pad_template != NULL);
  filter->srcpad = gst_pad_new_from_template (pad_template, "src");
  gst_element_add_pad (GST_ELEMENT (filter), filter->srcpad);

  filter->queue = g_queue_new ();

  filter->queue_delay = RTP_QUEUE_DELAY;
  filter->need_newsegment = TRUE;

  /* this one needs to be overwritten by child; the chain function refuses
   * buffers while it is still 0 */
  filter->clock_rate = 0;
}

/* GObject finalize: releases every buffer still waiting in the packet queue,
 * frees the queue itself and chains up to the parent class. */
static void
gst_base_rtp_depayload_finalize (GObject * object)
{
  GstBaseRTPDepayload *filter = GST_BASE_RTP_DEPAYLOAD (object);
  GstBuffer *pending;

  /* the queue owns the buffers added by add_to_queue; g_queue_free() only
   * frees the list nodes, so drop the buffer references first */
  while ((pending = g_queue_pop_head (filter->queue)) != NULL)
    gst_buffer_unref (pending);
  g_queue_free (filter->queue);

  if (G_OBJECT_CLASS (parent_class)->finalize)
    G_OBJECT_CLASS (parent_class)->finalize (object);
}

/* setcaps function of the sink pad.
 *
 * Forwards the caps to the subclass' set_caps vmethod when one is installed.
 *
 * Returns: the result of set_caps, TRUE when the subclass has no set_caps,
 * FALSE when the pad has no parent or the parent is not a
 * GstBaseRTPDepayload.
 */
static gboolean
gst_base_rtp_depayload_setcaps (GstPad * pad, GstCaps * caps)
{
  GstObject *parent;
  GstBaseRTPDepayload *filter;
  GstBaseRTPDepayloadClass *bclass;
  gboolean res = TRUE;

  /* gst_pad_get_parent() hands out a reference that must be dropped on
   * every path below */
  parent = gst_pad_get_parent (pad);
  g_return_val_if_fail (parent != NULL, FALSE);

  if (G_UNLIKELY (!GST_IS_BASE_RTP_DEPAYLOAD (parent))) {
    gst_object_unref (parent);
    g_return_val_if_reached (FALSE);
  }

  filter = GST_BASE_RTP_DEPAYLOAD (parent);
  bclass = GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter);

  if (bclass->set_caps)
    res = bclass->set_caps (filter, caps);

  gst_object_unref (filter);
  return res;
}

/* Chain function of the sink pad; takes ownership of @in.
 *
 * With a queue delay of 0 the packet is depayloaded and pushed right away,
 * otherwise it is handed to the class' add_to_queue vmethod.
 *
 * Returns: GST_FLOW_ERROR while the subclass has not set a clock rate,
 * the result of add_to_queue when queueing, GST_FLOW_OK otherwise.
 */
static GstFlowReturn
gst_base_rtp_depayload_chain (GstPad * pad, GstBuffer * in)
{
  GstBaseRTPDepayload *filter;
  GstBaseRTPDepayloadClass *bclass;
  GstFlowReturn ret = GST_FLOW_OK;

  filter = GST_BASE_RTP_DEPAYLOAD (GST_OBJECT_PARENT (pad));

  /* the clock rate is used as a divisor when timestamping; refuse the buffer
   * (and release it, we own it) instead of relying on a check that is
   * compiled out with G_DISABLE_CHECKS */
  if (G_UNLIKELY (!(filter->clock_rate > 0))) {
    gst_buffer_unref (in);
    g_return_val_if_reached (GST_FLOW_ERROR);
  }

  bclass = GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter);

  if (filter->queue_delay == 0) {
    GST_DEBUG_OBJECT (filter, "Pushing directly!");
    gst_base_rtp_depayload_push (filter, in);
  } else if (bclass->add_to_queue) {
    ret = bclass->add_to_queue (filter, in);
  } else {
    /* nobody takes the buffer, so drop our reference */
    gst_buffer_unref (in);
  }

  return ret;
}

/* Event function of the sink pad.
 *
 * A NEWSEGMENT event is swallowed while the packet scheduler thread is
 * active; need_newsegment is set instead so that a new segment is produced
 * for the next timestamped buffer.  Every other event, and NEWSEGMENT when
 * no thread is running, is forwarded on the source pad.
 *
 * Returns: TRUE for an intercepted NEWSEGMENT, otherwise the result of
 * gst_pad_push_event().
 */
static gboolean
gst_base_rtp_depayload_handle_sink_event (GstPad * pad, GstEvent * event)
{
  GstBaseRTPDepayload *filter =
      GST_BASE_RTP_DEPAYLOAD (GST_OBJECT_PARENT (pad));

  if (GST_EVENT_TYPE (event) == GST_EVENT_NEWSEGMENT) {
    /* intercept NEWSEGMENT events only if the packet scheduler thread
     * is active */
    if (filter->thread) {
      GST_DEBUG_OBJECT (filter,
          "Upstream sent a NEWSEGMENT, handle in worker thread.");
      /* the worker thread will assign a new RTP-TS<->GST-TS mapping
       * based on the next processed RTP packet */
      filter->need_newsegment = TRUE;
      gst_event_unref (event);
      return TRUE;
    }

    GST_DEBUG_OBJECT (filter, "Upstream sent a NEWSEGMENT, passing through.");
  }

  /* pass other events (and an unintercepted NEWSEGMENT) forward */
  return gst_pad_push_event (filter->srcpad, event);
}

static GstFlowReturn
gst_base_rtp_depayload_add_to_queue (GstBaseRTPDepayload * filter,
255
    GstBuffer * in)
256
257
{
  GQueue *queue = filter->queue;
Jens Granseuer's avatar
Jens Granseuer committed
258
  int i;
259

260
  /* our first packet, just push it */
261
  QUEUE_LOCK (filter);
262
263
  if (g_queue_is_empty (queue)) {
    g_queue_push_tail (queue, in);
264
    QUEUE_UNLOCK (filter);
265
266
267
268
  } else {
    guint16 seqnum, queueseq;
    guint32 timestamp;

269
270
    seqnum = gst_rtp_buffer_get_seq (in);
    queueseq = gst_rtp_buffer_get_seq (GST_BUFFER (g_queue_peek_head (queue)));
271

272
    /* look for right place to insert it */
Jens Granseuer's avatar
Jens Granseuer committed
273
    i = 0;
274

Zeeshan Ali's avatar
Zeeshan Ali committed
275
276
277
    while (seqnum > queueseq) {
      gpointer data;

278
      i++;
Zeeshan Ali's avatar
Zeeshan Ali committed
279
280
281
282
      data = g_queue_peek_nth (queue, i);
      if (!data)
        break;

283
      queueseq = gst_rtp_buffer_get_seq (GST_BUFFER (data));
284
285
    }

286
    /* now insert it at that place */
287
    g_queue_push_nth (queue, in, i);
288
    QUEUE_UNLOCK (filter);
289

290
    timestamp = gst_rtp_buffer_get_timestamp (in);
291

292
293
    GST_DEBUG_OBJECT (filter,
        "Packet added to queue %d at pos %d timestamp %u sn %d",
294
        g_queue_get_length (queue), i, timestamp, seqnum);
295
296
297
298
  }
  return GST_FLOW_OK;
}

static void
300
gst_base_rtp_depayload_push (GstBaseRTPDepayload * filter, GstBuffer * rtp_buf)
301
302
303
{
  GstBaseRTPDepayloadClass *bclass = GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter);
  GstBuffer *out_buf;
Zeeshan Ali's avatar
Zeeshan Ali committed
304
  GstCaps *srccaps;
305

306
  /* let's send it out to processing */
307
  out_buf = bclass->process (filter, rtp_buf);
308
  if (out_buf) {
309
    /* set the caps */
Zeeshan Ali's avatar
Zeeshan Ali committed
310
311
312
313
314
    srccaps = GST_PAD_CAPS (filter->srcpad);

    if (srccaps)
      gst_buffer_set_caps (GST_BUFFER (out_buf), srccaps);

315
316
317
318
319
320
    /* set the timestamp
     * I am assuming here that the timestamp of the last RTP buffer
     * is the same as the timestamp wanted on the collector
     * maybe i should add a way to override this timestamp from the
     * depayloader child class
     */
321
    bclass->set_gst_timestamp (filter, gst_rtp_buffer_get_timestamp (rtp_buf),
322
        out_buf);
323
    /* push it */
324
    GST_DEBUG_OBJECT (filter, "Pushing buffer size %d, timestamp %u",
325
326
        GST_BUFFER_SIZE (out_buf), GST_BUFFER_TIMESTAMP (out_buf));
    gst_pad_push (filter->srcpad, GST_BUFFER (out_buf));
327
    gst_buffer_unref (rtp_buf);
328
    GST_DEBUG_OBJECT (filter, "Pushed buffer");
329
330
331
332
333
334
  }
}

/* Default set_gst_timestamp implementation.
 *
 * Converts the RTP @timestamp (in clock_rate units) to nanoseconds, adds the
 * queue delay and stores the result as the timestamp of @buf.  When
 * need_newsegment is set, a NEWSEGMENT event starting at that timestamp is
 * pushed on the source pad first and the flag is cleared.
 *
 * filter->clock_rate is used as a divisor and must not be 0.
 */
static void
gst_base_rtp_depayload_set_gst_timestamp (GstBaseRTPDepayload * filter,
    guint32 timestamp, GstBuffer * buf)
{
  /* rtp timestamps are based on the clock_rate
   * gst timestamps are in nanoseconds
   */
  guint64 ts = ((timestamp * GST_SECOND) / filter->clock_rate);

  GST_DEBUG_OBJECT (filter, "calculating ts : timestamp : %u, clockrate : %u",
      timestamp, filter->clock_rate);

  /* add delay to timestamp */
  GST_BUFFER_TIMESTAMP (buf) = ts + (filter->queue_delay * GST_MSECOND);

  GST_DEBUG_OBJECT (filter, "calculated ts %"
      GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));

  /* if this is the first buf (or a new segment was requested), announce the
   * segment downstream before the buffer goes out */
  if (filter->need_newsegment) {
    GstEvent *event = gst_event_new_new_segment (FALSE, 1.0, GST_FORMAT_TIME,
        GST_BUFFER_TIMESTAMP (buf), GST_CLOCK_TIME_NONE, 0);

    gst_pad_push_event (filter->srcpad, event);
    filter->need_newsegment = FALSE;
    GST_DEBUG_OBJECT (filter, "Pushed newsegment event on this first buffer");
  }
}

/* Releases packets from the head of the queue while the queue spans more
 * than queue_delay milliseconds of RTP time.
 *
 * The span is the RTP timestamp of the newest packet (tail) minus that of
 * the oldest packet (head), compared against queue_delay converted to
 * clock_rate units.  Released packets are depayloaded and pushed with the
 * queue lock held.  Nothing is released when the subclass has no process
 * vmethod.
 */
static void
gst_base_rtp_depayload_queue_release (GstBaseRTPDepayload * filter)
{
  GQueue *queue = filter->queue;
  GstBaseRTPDepayloadClass *bclass = GST_BASE_RTP_DEPAYLOAD_GET_CLASS (filter);
  guint64 delay_units;
  guint32 maxtsunits;

  /* queue_delay is in ms, RTP timestamps tick clock_rate times per second;
   * integer maths avoids float rounding, clamping avoids truncation */
  delay_units = ((guint64) filter->clock_rate * filter->queue_delay) / 1000;
  maxtsunits = (guint32) MIN (delay_units, (guint64) G_MAXUINT32);

  QUEUE_LOCK (filter);

  /* without a process function nothing can be popped; looping would spin
   * forever with the lock held */
  if (bclass->process && !g_queue_is_empty (queue)) {
    GstBuffer *head;
    guint32 headts, tailts;

    /* the tail cannot change while we hold the lock */
    tailts =
        gst_rtp_buffer_get_timestamp (GST_BUFFER (g_queue_peek_tail (queue)));

    /* if our queue is getting too big (more than queue_delay ms of data)
     * release heading buffers */
    while ((head = g_queue_peek_head (queue)) != NULL) {
      headts = gst_rtp_buffer_get_timestamp (head);

      /* head is the oldest packet, so the span is tail - head; unsigned
       * arithmetic keeps this correct across RTP timestamp wraparound */
      if ((guint32) (tailts - headts) <= maxtsunits)
        break;

      GST_DEBUG_OBJECT (filter, "Poping packet from queue");
      gst_base_rtp_depayload_push (filter, g_queue_pop_head (queue));
    }
  }

  QUEUE_UNLOCK (filter);
}


/* Body of the queue release thread: alternates between releasing due
 * packets and waiting, until thread_running is cleared.
 *
 * Returns: always NULL.
 */
static gpointer
gst_base_rtp_depayload_thread (GstBaseRTPDepayload * filter)
{
  while (filter->thread_running) {
    gst_base_rtp_depayload_queue_release (filter);

    /* sleep for 5msec (XXX: 5msec is a value that works for audio and video,
     * should be adjusted based on frequency of incoming packets,
     * or by data consumption rate of the sink (depends on how
     * clock-drift compensation is implemented)) */
    gst_base_rtp_depayload_wait (filter, GST_MSECOND * 5);
  }

  return NULL;
}

/* Starts the queue release thread when a queue delay is configured.
 *
 * Returns: TRUE when no thread is needed or the thread was created,
 * FALSE when thread creation failed.
 */
static gboolean
gst_base_rtp_depayload_start_thread (GstBaseRTPDepayload * filter)
{
  GError *error = NULL;

  /* only launch the thread if processing is needed */
  if (!filter->queue_delay)
    return TRUE;

  GST_DEBUG_OBJECT (filter, "Starting queue release thread");

  /* set the flag first, the thread function polls it right away */
  filter->thread_running = TRUE;
  filter->thread =
      g_thread_create ((GThreadFunc) gst_base_rtp_depayload_thread, filter,
      TRUE, &error);

  if (filter->thread == NULL) {
    filter->thread_running = FALSE;
    GST_ERROR_OBJECT (filter, "Could not start queue release thread: %s",
        error ? error->message : "unknown error");
    g_clear_error (&error);
    return FALSE;
  }

  GST_DEBUG_OBJECT (filter, "Started queue release thread");
  return TRUE;
}

/* Asks the queue release thread to stop, joins it when one was started and
 * then releases the queue lock via QUEUE_LOCK_FREE.
 *
 * Returns: always TRUE.
 */
static gboolean
gst_base_rtp_depayload_stop_thread (GstBaseRTPDepayload * filter)
{
  /* the thread function leaves its loop once this flag is cleared */
  filter->thread_running = FALSE;

  if (filter->thread != NULL) {
    GThread *worker = filter->thread;

    g_thread_join (worker);
    filter->thread = NULL;
  }

  QUEUE_LOCK_FREE (filter);

  return TRUE;
}

/* Blocks the calling thread for the duration @time.
 *
 * The wait is done on the element's clock when it has one.  Before a clock
 * is set the thread simply sleeps, so that the release thread does not spin.
 */
static void
gst_base_rtp_depayload_wait (GstBaseRTPDepayload * filter, GstClockTime time)
{
  GstClock *clock;
  GstClockID id;

  g_return_if_fail (GST_CLOCK_TIME_IS_VALID (time));

  clock = GST_ELEMENT_CLOCK (filter);
  if (clock == NULL) {
    GST_DEBUG_OBJECT (filter, "No clock given yet");
    /* g_usleep() takes microseconds */
    g_usleep ((gulong) (time / GST_USECOND));
    return;
  }

  /* single shot ids fire at an absolute clock time, so turn the requested
   * duration into a deadline relative to now */
  id = gst_clock_new_single_shot_id (clock, gst_clock_get_time (clock) + time);

  gst_clock_id_wait (id, NULL);
  gst_clock_id_unref (id);
}

/* GstElement change_state implementation.
 *
 * Starts the queue release thread on NULL->READY, chains up to the parent
 * class for the actual state change and stops the thread on READY->NULL.
 *
 * Returns: GST_STATE_CHANGE_FAILURE when called from the release thread
 * itself or when the thread could not be started, otherwise the result of
 * the parent class' change_state.
 */
static GstStateChangeReturn
gst_base_rtp_depayload_change_state (GstElement * element,
    GstStateChange transition)
{
  GstBaseRTPDepayload *filter;
  GstStateChangeReturn ret;

  g_return_val_if_fail (GST_IS_BASE_RTP_DEPAYLOAD (element),
      GST_STATE_CHANGE_FAILURE);
  filter = GST_BASE_RTP_DEPAYLOAD (element);

  /* we disallow changing the state from the thread */
  if (g_thread_self () == filter->thread)
    return GST_STATE_CHANGE_FAILURE;

  /* upward transitions: prepare before the parent class runs */
  switch (transition) {
    case GST_STATE_CHANGE_NULL_TO_READY:
      if (!gst_base_rtp_depayload_start_thread (filter))
        goto start_failed;
      break;
    case GST_STATE_CHANGE_READY_TO_PAUSED:
    case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
    default:
      break;
  }

  /* the parent class performs the actual state change; without chaining up
   * the element never really changes state */
  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);

  /* downward transitions: clean up after the parent class ran */
  switch (transition) {
    case GST_STATE_CHANGE_READY_TO_NULL:
      gst_base_rtp_depayload_stop_thread (filter);
      break;
    case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
    case GST_STATE_CHANGE_PAUSED_TO_READY:
    default:
      break;
  }

  return ret;

  /* ERRORS */
start_failed:
  {
    return GST_STATE_CHANGE_FAILURE;
  }
}

/* GObject set_property implementation.
 *
 * ARG_QUEUE_DELAY stores the unsigned integer held by @value in
 * filter->queue_delay; any other id triggers the standard invalid-property
 * warning.
 */
static void
gst_base_rtp_depayload_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec)
{
  GstBaseRTPDepayload *filter;

  g_return_if_fail (GST_IS_BASE_RTP_DEPAYLOAD (object));
  filter = GST_BASE_RTP_DEPAYLOAD (object);

  switch (prop_id) {
    case ARG_QUEUE_DELAY:
      filter->queue_delay = g_value_get_uint (value);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

/* GObject get_property implementation.
 *
 * ARG_QUEUE_DELAY copies filter->queue_delay into @value; any other id
 * triggers the standard invalid-property warning.
 */
static void
gst_base_rtp_depayload_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec)
{
  GstBaseRTPDepayload *filter;

  g_return_if_fail (GST_IS_BASE_RTP_DEPAYLOAD (object));
  filter = GST_BASE_RTP_DEPAYLOAD (object);

  switch (prop_id) {
    case ARG_QUEUE_DELAY:
      g_value_set_uint (value, filter->queue_delay);
      break;
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}