gstrtpbin.c 77.4 KB
Newer Older
1
/* GStreamer
2
 * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

/**
21
22
 * SECTION:element-gstrtpbin
 * @see_also: gstrtpjitterbuffer, gstrtpsession, gstrtpptdemux, gstrtpssrcdemux
23
 *
24
25
26
27
28
29
30
31
 * RTP bin combines the functions of #GstRtpSession, #GstRtpSsrcDemux,
 * #GstRtpJitterBuffer and #GstRtpPtDemux in one element. It allows for multiple
 * RTP sessions that will be synchronized together using RTCP SR packets.
 * 
 * #GstRtpBin is configured with a number of request pads that define the
 * functionality that is activated, similar to the #GstRtpSession element.
 * 
 * To use #GstRtpBin as an RTP receiver, request a recv_rtp_sink_%%d pad. The session
Wim Taymans's avatar
Wim Taymans committed
32
 * number must be specified in the pad name. 
33
 * Data received on the recv_rtp_sink_%%d pad will be processed in the gstrtpsession
34
35
 * manager and, after being validated, forwarded to a #GstRtpSsrcDemux element. Each
 * RTP stream is demuxed based on the SSRC and sent to a #GstRtpJitterBuffer. After
36
 * the packets are released from the jitterbuffer, they will be forwarded to a
37
 * #GstRtpPtDemux element. The #GstRtpPtDemux element will demux the packets based
Wim Taymans's avatar
Wim Taymans committed
38
 * on the payload type and will create a unique pad recv_rtp_src_%%d_%%d_%%d on
39
 * gstrtpbin with the session number, SSRC and payload type respectively as the pad
Wim Taymans's avatar
Wim Taymans committed
40
 * name.
41
42
 * 
 * To also use #GstRtpBin as an RTCP receiver, request a recv_rtcp_sink_%%d pad. The
Wim Taymans's avatar
Wim Taymans committed
43
 * session number must be specified in the pad name.
44
 * 
Wim Taymans's avatar
Wim Taymans committed
45
46
47
48
 * If you want the session manager to generate and send RTCP packets, request
 * the send_rtcp_src_%%d pad with the session number in the pad name. Packets pushed
 * on this pad contain SR/RR RTCP reports that should be sent to all participants
 * in the session.
49
50
 * 
 * To use #GstRtpBin as a sender, request a send_rtp_sink_%%d pad, which will
51
52
 * automatically create a send_rtp_src_%%d pad. If the session number is not provided,
 * the pad from the lowest available session will be returned. The session manager will modify the
Wim Taymans's avatar
Wim Taymans committed
53
54
 * SSRC in the RTP packets to its own SSRC and will forward the packets on the
 * send_rtp_src_%%d pad after updating its internal state.
55
 * 
Wim Taymans's avatar
Wim Taymans committed
56
 * The session manager needs the clock-rate of the payload types it is handling
57
58
 * and will signal the #GstRtpSession::request-pt-map signal when it needs such a
 * mapping. One can clear the cached values with the #GstRtpSession::clear-pt-map
Wim Taymans's avatar
Wim Taymans committed
59
 * signal.
60
61
 * 
 * <refsect2>
62
 * <title>Example pipelines</title>
63
 * |[
Wim Taymans's avatar
Wim Taymans committed
64
 * gst-launch udpsrc port=5000 caps="application/x-rtp, ..." ! .recv_rtp_sink_0 \
65
 *     gstrtpbin ! rtptheoradepay ! theoradec ! xvimagesink
66
67
 * ]| Receive RTP data from port 5000 and send to the session 0 in gstrtpbin.
 * |[
68
69
 * gst-launch gstrtpbin name=rtpbin \
 *         v4l2src ! ffmpegcolorspace ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \
70
71
72
73
74
75
 *                   rtpbin.send_rtp_src_0 ! udpsink port=5000                            \
 *                   rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false    \
 *                   udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0                           \
 *         audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1                   \
 *                   rtpbin.send_rtp_src_1 ! udpsink port=5002                            \
 *                   rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false    \
76
 *                   udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1
77
 * ]| Encode and payload H263 video captured from a v4l2src. Encode and payload AMR
78
79
80
81
82
83
 * audio generated from audiotestsrc. The video is sent to session 0 in rtpbin
 * and the audio is sent to session 1. Video packets are sent on UDP port 5000
 * and audio packets on port 5002. The video RTCP packets for session 0 are sent
 * on port 5001 and the audio RTCP packets for session 1 are sent on port 5003.
 * RTCP packets for session 0 are received on port 5005 and RTCP for session 1
 * is received on port 5007. Since RTCP packets from the sender should be sent
84
85
 * as soon as possible and do not participate in preroll, sync=false and 
 * async=false is configured on udpsink
86
87
 * |[
 * gst-launch -v gstrtpbin name=rtpbin                                          \
88
 *     udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \
89
90
91
92
93
94
95
96
97
 *             port=5000 ! rtpbin.recv_rtp_sink_0                                \
 *         rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink                    \
 *      udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0                               \
 *      rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false        \
 *     udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \
 *             port=5002 ! rtpbin.recv_rtp_sink_1                                \
 *         rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink                           \
 *      udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1                               \
 *      rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false
98
 * ]| Receive H263 on port 5000, send it through rtpbin in session 0, depayload,
99
100
101
102
103
104
105
106
 * decode and display the video.
 * Receive AMR on port 5002, send it through rtpbin in session 1, depayload,
 * decode and play the audio.
 * Receive server RTCP packets for session 0 on port 5001 and RTCP packets for
 * session 1 on port 5003. These packets will be used for session management and
 * synchronisation.
 * Send RTCP reports for session 0 on port 5005 and RTCP reports for session 1
 * on port 5007.
107
108
 * </refsect2>
 *
109
 * Last reviewed on 2007-08-30 (0.10.6)
110
111
112
113
114
115
116
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <string.h>

117
118
119
#include <gst/rtp/gstrtpbuffer.h>
#include <gst/rtp/gstrtcpbuffer.h>

120
#include "gstrtpbin-marshal.h"
121
#include "gstrtpbin.h"
122
#include "rtpsession.h"
123
#include "gstrtpsession.h"
124
#include "gstrtpjitterbuffer.h"
125

126
127
128
GST_DEBUG_CATEGORY_STATIC (gst_rtp_bin_debug);
#define GST_CAT_DEFAULT gst_rtp_bin_debug

129
130
/* elementfactory information */
static const GstElementDetails rtpbin_details = GST_ELEMENT_DETAILS ("RTP Bin",
Wim Taymans's avatar
Wim Taymans committed
131
    "Filter/Network/RTP",
132
    "Implement an RTP bin",
133
    "Wim Taymans <wim.taymans@gmail.com>");
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164

/* sink pads */
/* request pad for received RTP; %d is the session number */
static GstStaticPadTemplate rtpbin_recv_rtp_sink_template =
GST_STATIC_PAD_TEMPLATE ("recv_rtp_sink_%d",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS ("application/x-rtp")
    );

/* request pad for received RTCP; %d is the session number */
static GstStaticPadTemplate rtpbin_recv_rtcp_sink_template =
GST_STATIC_PAD_TEMPLATE ("recv_rtcp_sink_%d",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS ("application/x-rtcp")
    );

/* request pad for RTP that must be sent; %d is the session number */
static GstStaticPadTemplate rtpbin_send_rtp_sink_template =
GST_STATIC_PAD_TEMPLATE ("send_rtp_sink_%d",
    GST_PAD_SINK,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS ("application/x-rtp")
    );

/* src pads */
/* sometimes pad with the received and demuxed RTP; the three %d are the
 * session number, the SSRC and the payload type respectively */
static GstStaticPadTemplate rtpbin_recv_rtp_src_template =
GST_STATIC_PAD_TEMPLATE ("recv_rtp_src_%d_%d_%d",
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS ("application/x-rtp")
    );

165
166
static GstStaticPadTemplate rtpbin_send_rtcp_src_template =
GST_STATIC_PAD_TEMPLATE ("send_rtcp_src_%d",
167
168
169
170
171
172
173
174
175
176
177
178
179
    GST_PAD_SRC,
    GST_PAD_REQUEST,
    GST_STATIC_CAPS ("application/x-rtcp")
    );

static GstStaticPadTemplate rtpbin_send_rtp_src_template =
GST_STATIC_PAD_TEMPLATE ("send_rtp_src_%d",
    GST_PAD_SRC,
    GST_PAD_SOMETIMES,
    GST_STATIC_CAPS ("application/x-rtp")
    );

/* get the GstRtpBinPrivate instance data of @obj */
#define GST_RTP_BIN_GET_PRIVATE(obj) \
  (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate))
181

182
183
184
/* main lock of the bin. Functions documented as "Must be called with
 * RTP_BIN_LOCK" (the session and client lookups, session creation) expect the
 * caller to hold it. */
#define GST_RTP_BIN_LOCK(bin)   g_mutex_lock ((bin)->priv->bin_lock)
#define GST_RTP_BIN_UNLOCK(bin) g_mutex_unlock ((bin)->priv->bin_lock)

185
186
187
188
/* lock to protect dynamic callbacks, like pad-added and new ssrc. It is also
 * the lock taken by GST_RTP_BIN_SHUTDOWN_LOCK when the bin is not shutting
 * down. */
#define GST_RTP_BIN_DYN_LOCK(bin)    g_mutex_lock ((bin)->priv->dyn_lock)
#define GST_RTP_BIN_DYN_UNLOCK(bin)  g_mutex_unlock ((bin)->priv->dyn_lock)

189
190
191
192
193
/* lock for shutdown: take the dynamic lock unless the bin is shutting down.
 *
 * When the shutdown flag is set, control jumps to @label and the dynamic lock
 * is NOT held. The flag is checked again after the lock has been taken because
 * it can be set while we were waiting for the lock.
 *
 * @bin is parenthesized on every use so that any expression can be passed. */
#define GST_RTP_BIN_SHUTDOWN_LOCK(bin,label)       \
G_STMT_START {                                     \
  if (g_atomic_int_get (&(bin)->priv->shutdown))   \
    goto label;                                    \
  GST_RTP_BIN_DYN_LOCK (bin);                      \
  if (g_atomic_int_get (&(bin)->priv->shutdown)) { \
    GST_RTP_BIN_DYN_UNLOCK (bin);                  \
    goto label;                                    \
  }                                                \
} G_STMT_END

/* unlock for shutdown: release the dynamic lock taken by a successful
 * GST_RTP_BIN_SHUTDOWN_LOCK. Wrapped in G_STMT_START/G_STMT_END so that it
 * behaves as one statement (safe in an unbraced if/else) and does not end in
 * a line continuation that would pull the following line into the macro. */
#define GST_RTP_BIN_SHUTDOWN_UNLOCK(bin)         \
G_STMT_START {                                   \
  GST_RTP_BIN_DYN_UNLOCK (bin);                  \
} G_STMT_END
204

205
struct _GstRtpBinPrivate
206
{
207
  GMutex *bin_lock;
208

209
210
211
  /* lock protecting dynamic adding/removing */
  GMutex *dyn_lock;

212
  /* the time when we went to playing */
213
  GstClockTime ntp_ns_base;
214
215
216

  /* if we are shutting down or not */
  gint shutdown;
217
218
219
220
221
};

/* signals and args */
enum
{
  /* emitted from get_pt_map() when a payload type is not in the cache */
  SIGNAL_REQUEST_PT_MAP,
  SIGNAL_CLEAR_PT_MAP,
  SIGNAL_RESET_SYNC,
  SIGNAL_GET_INTERNAL_SESSION,

  /* forwarded from the session element, see the on_*() callbacks */
  SIGNAL_ON_NEW_SSRC,
  SIGNAL_ON_SSRC_COLLISION,
  SIGNAL_ON_SSRC_VALIDATED,
  SIGNAL_ON_SSRC_ACTIVE,
  SIGNAL_ON_SSRC_SDES,
  SIGNAL_ON_BYE_SSRC,
  SIGNAL_ON_BYE_TIMEOUT,
  SIGNAL_ON_TIMEOUT,
  SIGNAL_ON_SENDER_TIMEOUT,
  /* emitted from on_npt_stop() */
  SIGNAL_ON_NPT_STOP,

  /* number of signals, used to size gst_rtp_bin_signals[] */
  LAST_SIGNAL
};

240
/* default values */
#define DEFAULT_LATENCY_MS           200
#define DEFAULT_SDES_CNAME           NULL
#define DEFAULT_SDES_NAME            NULL
#define DEFAULT_SDES_EMAIL           NULL
#define DEFAULT_SDES_PHONE           NULL
#define DEFAULT_SDES_LOCATION        NULL
#define DEFAULT_SDES_TOOL            NULL
#define DEFAULT_SDES_NOTE            NULL
#define DEFAULT_DO_LOST              FALSE
249

250
251
/* property ids */
enum
{
  PROP_0,
  PROP_LATENCY,
  PROP_SDES_CNAME,
  PROP_SDES_NAME,
  PROP_SDES_EMAIL,
  PROP_SDES_PHONE,
  PROP_SDES_LOCATION,
  PROP_SDES_TOOL,
  PROP_SDES_NOTE,
  PROP_DO_LOST,
  PROP_LAST
};

265
/* helper objects */
266
267
268
typedef struct _GstRtpBinSession GstRtpBinSession;
typedef struct _GstRtpBinStream GstRtpBinStream;
typedef struct _GstRtpBinClient GstRtpBinClient;
269

270
271
static guint gst_rtp_bin_signals[LAST_SIGNAL] = { 0 };

272
static GstCaps *pt_map_requested (GstElement * element, guint pt,
273
    GstRtpBinSession * session);
274
275
276
static const gchar *sdes_type_to_name (GstRTCPSDESType type);
static void gst_rtp_bin_set_sdes_string (GstRtpBin * bin,
    GstRTCPSDESType type, const gchar * data);
277

278
279
static void free_stream (GstRtpBinStream * stream);

280
281
282
283
/* Manages the RTP stream for one SSRC.
 *
 * We pipe the stream (coming from the SSRC demuxer) into a jitterbuffer.
 * If we see an SDES RTCP packet that links multiple SSRCs together based on a
284
 * common CNAME, we create a GstRtpBinClient structure to group the SSRCs
285
286
 * together (see below).
 */
287
struct _GstRtpBinStream
288
289
290
{
  /* the SSRC of this stream */
  guint32 ssrc;
291

292
  /* parent bin */
293
  GstRtpBin *bin;
294

295
  /* the session this SSRC belongs to */
296
  GstRtpBinSession *session;
297

298
299
  /* the jitterbuffer of the SSRC */
  GstElement *buffer;
300
301
302
  gulong buffer_handlesync_sig;
  gulong buffer_ptreq_sig;
  gulong buffer_ntpstop_sig;
303

304
305
306
  /* the PT demuxer of the SSRC */
  GstElement *demux;
  gulong demux_newpad_sig;
307
  gulong demux_ptreq_sig;
308
  gulong demux_pt_change_sig;
309

310
  /* if we have calculated a valid unix_delta for this stream */
311
312
313
  gboolean have_sync;
  /* mapping to local RTP and NTP time */
  gint64 unix_delta;
314
315
};

316
317
318
/* per-session lock (the GMutex stored in the session's lock field); in this
 * file it is held while using the ptmap cache and while walking the list of
 * streams of a session */
#define GST_RTP_SESSION_LOCK(sess)   g_mutex_lock ((sess)->lock)
#define GST_RTP_SESSION_UNLOCK(sess) g_mutex_unlock ((sess)->lock)

319
320
321
322
323
324
/* Manages the receiving end of the packets.
 *
 * There is one such structure for each RTP session (audio/video/...).
 * We get the RTP/RTCP packets and stuff them into the session manager. From
 * there they are pushed into an SSRC demuxer that splits the stream based on
 * SSRC. Each of the SSRC streams go into their own jitterbuffer (managed with
325
 * the GstRtpBinStream above).
326
 */
327
struct _GstRtpBinSession
328
329
330
{
  /* session id */
  gint id;
331
  /* the parent bin */
332
  GstRtpBin *bin;
333
334
335
  /* the session element */
  GstElement *session;
  /* the SSRC demuxer */
336
337
338
  GstElement *demux;
  gulong demux_newpad_sig;

339
340
  GMutex *lock;

341
  /* list of GstRtpBinStream */
342
  GSList *streams;
343

344
345
346
  /* mapping of payload type to caps */
  GHashTable *ptmap;

347
348
  /* the pads of the session */
  GstPad *recv_rtp_sink;
349
  GstPad *recv_rtp_src;
350
  GstPad *recv_rtcp_sink;
351
  GstPad *sync_src;
352
  GstPad *send_rtp_sink;
353
  GstPad *send_rtp_src;
354
  GstPad *send_rtp_src_ghost;
355
  GstPad *send_rtcp_src;
356
};
357

358
359
360
361
362
363
364
365
366
367
368
369
370
371
/* Manages the RTP streams that come from one client and should therefore be
 * synchronized.
 */
struct _GstRtpBinClient
{
  /* the common CNAME for the streams; a private copy made with g_strndup()
   * in get_client() and released in free_client() */
  gchar *cname;
  /* the CNAME length as it was passed to get_client() */
  guint cname_len;

  /* the streams (GstRtpBinStream) associated with this CNAME and their
   * count. free_client() frees the list only, not the streams in it. */
  guint nstreams;
  GSList *streams;
};

372
/* find a session with the given id. Must be called with RTP_BIN_LOCK */
373
374
static GstRtpBinSession *
find_session_by_id (GstRtpBin * rtpbin, gint id)
375
{
376
  GSList *walk;
377

378
  for (walk = rtpbin->sessions; walk; walk = g_slist_next (walk)) {
379
    GstRtpBinSession *sess = (GstRtpBinSession *) walk->data;
380
381
382
383
384
385
386

    if (sess->id == id)
      return sess;
  }
  return NULL;
}

387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
/* Look up the session that owns the given request pad, that is the session
 * whose recv_rtp_sink, recv_rtcp_sink, send_rtp_sink or send_rtcp_src is @pad.
 * Must be called with RTP_BIN_LOCK.
 *
 * Returns: the matching session or NULL when there is none. */
static GstRtpBinSession *
find_session_by_pad (GstRtpBin * rtpbin, GstPad * pad)
{
  GSList *item = rtpbin->sessions;

  while (item != NULL) {
    GstRtpBinSession *candidate = item->data;

    if (candidate->recv_rtp_sink == pad)
      return candidate;
    if (candidate->recv_rtcp_sink == pad)
      return candidate;
    if (candidate->send_rtp_sink == pad)
      return candidate;
    if (candidate->send_rtcp_src == pad)
      return candidate;

    item = item->next;
  }
  return NULL;
}

404
static void
405
on_new_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
406
407
408
409
410
411
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_NEW_SSRC], 0,
      sess->id, ssrc);
}

static void
412
on_ssrc_collision (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
413
414
415
416
417
418
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_COLLISION], 0,
      sess->id, ssrc);
}

static void
419
on_ssrc_validated (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
420
421
422
423
424
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_SSRC_VALIDATED], 0,
      sess->id, ssrc);
}

425
426
427
428
429
430
431
/* "on-ssrc-active" handler of the session element (connected in
 * create_session()): forward it as a signal of the bin, adding the session
 * id in front of the SSRC */
static void
on_ssrc_active (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
{
  GstRtpBin *rtpbin = sess->bin;
  guint signal_id = gst_rtp_bin_signals[SIGNAL_ON_SSRC_ACTIVE];

  g_signal_emit (rtpbin, signal_id, 0, sess->id, ssrc);
}

432
433
434
435
436
437
438
/* "on-ssrc-sdes" handler of the session element (connected in
 * create_session()): forward it as a signal of the bin, adding the session
 * id in front of the SSRC */
static void
on_ssrc_sdes (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
{
  GstRtpBin *rtpbin = sess->bin;
  guint signal_id = gst_rtp_bin_signals[SIGNAL_ON_SSRC_SDES];

  g_signal_emit (rtpbin, signal_id, 0, sess->id, ssrc);
}

439
static void
440
on_bye_ssrc (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
441
442
443
444
445
446
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_SSRC], 0,
      sess->id, ssrc);
}

static void
447
on_bye_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
448
449
450
451
452
453
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_BYE_TIMEOUT], 0,
      sess->id, ssrc);
}

static void
454
on_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
455
456
457
458
459
{
  g_signal_emit (sess->bin, gst_rtp_bin_signals[SIGNAL_ON_TIMEOUT], 0,
      sess->id, ssrc);
}

460
461
462
463
464
465
466
/* "on-sender-timeout" handler of the session element (connected in
 * create_session()): forward it as a signal of the bin, adding the session
 * id in front of the SSRC */
static void
on_sender_timeout (GstElement * session, guint32 ssrc, GstRtpBinSession * sess)
{
  GstRtpBin *rtpbin = sess->bin;
  guint signal_id = gst_rtp_bin_signals[SIGNAL_ON_SENDER_TIMEOUT];

  g_signal_emit (rtpbin, signal_id, 0, sess->id, ssrc);
}

Wim Taymans's avatar
Wim Taymans committed
467
468
469
470
471
472
473
/* callback receiving a jitterbuffer (@jbuf) and the stream it belongs to
 * (presumably connected to a signal of the jitterbuffer elsewhere in the
 * file): emit on-npt-stop on the bin with the session id and the SSRC of the
 * stream */
static void
on_npt_stop (GstElement * jbuf, GstRtpBinStream * stream)
{
  GstRtpBin *rtpbin = stream->bin;
  guint signal_id = gst_rtp_bin_signals[SIGNAL_ON_NPT_STOP];

  g_signal_emit (rtpbin, signal_id, 0, stream->session->id, stream->ssrc);
}

474
/* create a session with the given id.  Must be called with RTP_BIN_LOCK */
475
476
static GstRtpBinSession *
create_session (GstRtpBin * rtpbin, gint id)
477
{
478
  GstRtpBinSession *sess;
479
  GstElement *session, *demux;
480
  gint i;
481

482
  if (!(session = gst_element_factory_make ("gstrtpsession", NULL)))
483
484
    goto no_session;

485
  if (!(demux = gst_element_factory_make ("gstrtpssrcdemux", NULL)))
486
487
    goto no_demux;

488
  sess = g_new0 (GstRtpBinSession, 1);
489
  sess->lock = g_mutex_new ();
490
  sess->id = id;
491
  sess->bin = rtpbin;
492
  sess->session = session;
493
  sess->demux = demux;
494
495
  sess->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
      (GDestroyNotify) gst_caps_unref);
496
497
  rtpbin->sessions = g_slist_prepend (rtpbin->sessions, sess);

498
499
  /* set NTP base or new session */
  g_object_set (session, "ntp-ns-base", rtpbin->priv->ntp_ns_base, NULL);
500
501
502
503
504
505
  /* configure SDES items */
  GST_OBJECT_LOCK (rtpbin);
  for (i = GST_RTCP_SDES_CNAME; i < GST_RTCP_SDES_PRIV; i++) {
    g_object_set (session, sdes_type_to_name (i), rtpbin->sdes[i], NULL);
  }
  GST_OBJECT_UNLOCK (rtpbin);
506

507
508
509
510
  /* provide clock_rate to the session manager when needed */
  g_signal_connect (session, "request-pt-map",
      (GCallback) pt_map_requested, sess);

511
512
513
514
515
516
  g_signal_connect (sess->session, "on-new-ssrc",
      (GCallback) on_new_ssrc, sess);
  g_signal_connect (sess->session, "on-ssrc-collision",
      (GCallback) on_ssrc_collision, sess);
  g_signal_connect (sess->session, "on-ssrc-validated",
      (GCallback) on_ssrc_validated, sess);
517
518
  g_signal_connect (sess->session, "on-ssrc-active",
      (GCallback) on_ssrc_active, sess);
519
520
  g_signal_connect (sess->session, "on-ssrc-sdes",
      (GCallback) on_ssrc_sdes, sess);
521
522
523
524
525
  g_signal_connect (sess->session, "on-bye-ssrc",
      (GCallback) on_bye_ssrc, sess);
  g_signal_connect (sess->session, "on-bye-timeout",
      (GCallback) on_bye_timeout, sess);
  g_signal_connect (sess->session, "on-timeout", (GCallback) on_timeout, sess);
526
527
  g_signal_connect (sess->session, "on-sender-timeout",
      (GCallback) on_sender_timeout, sess);
528

Wim Taymans's avatar
Wim Taymans committed
529
  /* FIXME, change state only to what's needed */
530
531
  gst_bin_add (GST_BIN_CAST (rtpbin), session);
  gst_element_set_state (session, GST_STATE_PLAYING);
532
533
  gst_bin_add (GST_BIN_CAST (rtpbin), demux);
  gst_element_set_state (demux, GST_STATE_PLAYING);
534
535
536
537
538
539

  return sess;

  /* ERRORS */
no_session:
  {
540
    g_warning ("gstrtpbin: could not create gstrtpsession element");
541
542
    return NULL;
  }
543
544
no_demux:
  {
545
    gst_object_unref (session);
546
    g_warning ("gstrtpbin: could not create gstrtpssrcdemux element");
547
548
549
550
    return NULL;
  }
}

551
552
553
554
555
556
557
/* Shut down and free a session: set both of its elements to NULL, release the
 * request pads that were taken from the session element, drop the pad
 * references, remove the elements from the bin and finally free the streams,
 * the lock, the ptmap cache and the structure itself.
 *
 * The session is not unlinked from the sessions list of the bin here. */
static void
free_session (GstRtpBinSession * sess)
{
  GstRtpBin *bin = sess->bin;
  GstElement *rtpsession = sess->session;
  GstElement *ssrcdemux = sess->demux;

  GST_DEBUG_OBJECT (bin, "freeing session %p", sess);

  gst_element_set_state (ssrcdemux, GST_STATE_NULL);
  gst_element_set_state (rtpsession, GST_STATE_NULL);

  /* the sink pads and the RTCP src pad are request pads of the session
   * element and are released before unreffing, the others are only
   * unreffed */
  if (sess->recv_rtp_sink) {
    gst_element_release_request_pad (rtpsession, sess->recv_rtp_sink);
    gst_object_unref (sess->recv_rtp_sink);
  }
  if (sess->recv_rtp_src)
    gst_object_unref (sess->recv_rtp_src);
  if (sess->recv_rtcp_sink) {
    gst_element_release_request_pad (rtpsession, sess->recv_rtcp_sink);
    gst_object_unref (sess->recv_rtcp_sink);
  }
  if (sess->sync_src)
    gst_object_unref (sess->sync_src);
  if (sess->send_rtp_sink) {
    gst_element_release_request_pad (rtpsession, sess->send_rtp_sink);
    gst_object_unref (sess->send_rtp_sink);
  }
  if (sess->send_rtp_src)
    gst_object_unref (sess->send_rtp_src);
  if (sess->send_rtcp_src) {
    gst_element_release_request_pad (rtpsession, sess->send_rtcp_src);
    gst_object_unref (sess->send_rtcp_src);
  }

  gst_bin_remove (GST_BIN_CAST (bin), rtpsession);
  gst_bin_remove (GST_BIN_CAST (bin), ssrcdemux);

  /* free every stream of the session, then the list that held them */
  g_slist_foreach (sess->streams, (GFunc) free_stream, NULL);
  g_slist_free (sess->streams);

  g_mutex_free (sess->lock);
  g_hash_table_destroy (sess->ptmap);

  g_free (sess);
}

598
#if 0
/* currently compiled out: look up the stream with the given SSRC in the list
 * of streams of the session, returns NULL when there is none */
static GstRtpBinStream *
find_stream_by_ssrc (GstRtpBinSession * session, guint32 ssrc)
{
  GSList *item = session->streams;

  while (item != NULL) {
    GstRtpBinStream *candidate = item->data;

    if (candidate->ssrc == ssrc)
      return candidate;

    item = item->next;
  }
  return NULL;
}
#endif

614
615
/* get the payload type caps for the specific payload @pt in @session */
static GstCaps *
616
get_pt_map (GstRtpBinSession * session, guint pt)
617
618
{
  GstCaps *caps = NULL;
619
  GstRtpBin *bin;
620
621
  GValue ret = { 0 };
  GValue args[3] = { {0}, {0}, {0} };
622

623
  GST_DEBUG ("searching pt %d in cache", pt);
624

625
626
  GST_RTP_SESSION_LOCK (session);

627
628
  /* first look in the cache */
  caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
629
630
  if (caps) {
    gst_caps_ref (caps);
631
    goto done;
632
  }
633
634
635
636
637
638

  bin = session->bin;

  GST_DEBUG ("emiting signal for pt %d in session %d", pt, session->id);

  /* not in cache, send signal to request caps */
639
640
641
642
643
644
645
646
647
648
  g_value_init (&args[0], GST_TYPE_ELEMENT);
  g_value_set_object (&args[0], bin);
  g_value_init (&args[1], G_TYPE_UINT);
  g_value_set_uint (&args[1], session->id);
  g_value_init (&args[2], G_TYPE_UINT);
  g_value_set_uint (&args[2], pt);

  g_value_init (&ret, GST_TYPE_CAPS);
  g_value_set_boxed (&ret, NULL);

649
650
  GST_RTP_SESSION_UNLOCK (session);

651
652
  g_signal_emitv (args, gst_rtp_bin_signals[SIGNAL_REQUEST_PT_MAP], 0, &ret);

653
654
  GST_RTP_SESSION_LOCK (session);

655
656
657
  g_value_unset (&args[0]);
  g_value_unset (&args[1]);
  g_value_unset (&args[2]);
658
659
660
661
662
663
664
665
666

  /* look in the cache again because we let the lock go */
  caps = g_hash_table_lookup (session->ptmap, GINT_TO_POINTER (pt));
  if (caps) {
    gst_caps_ref (caps);
    g_value_unset (&ret);
    goto done;
  }

667
668
  caps = (GstCaps *) g_value_dup_boxed (&ret);
  g_value_unset (&ret);
669
670
671
  if (!caps)
    goto no_caps;

672
673
  GST_DEBUG ("caching pt %d as %" GST_PTR_FORMAT, pt, caps);

674
675
676
  /* store in cache, take additional ref */
  g_hash_table_insert (session->ptmap, GINT_TO_POINTER (pt),
      gst_caps_ref (caps));
677
678

done:
679
680
  GST_RTP_SESSION_UNLOCK (session);

681
682
683
684
685
  return caps;

  /* ERRORS */
no_caps:
  {
686
    GST_RTP_SESSION_UNLOCK (session);
687
688
689
690
691
    GST_DEBUG ("no pt map could be obtained");
    return NULL;
  }
}

692
693
694
695
696
697
/* predicate that accepts every entry; passed to
 * g_hash_table_foreach_remove() in gst_rtp_bin_clear_pt_map() so that the
 * complete ptmap cache is emptied */
static gboolean
return_true (gpointer key, gpointer value, gpointer user_data)
{
  const gboolean matches = TRUE;

  return matches;
}

698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
/* Forget the synchronisation state of all streams of all clients. Takes
 * GST_RTP_BIN_LOCK itself while walking the clients. */
static void
gst_rtp_bin_reset_sync (GstRtpBin * rtpbin)
{
  GSList *citem;

  GST_DEBUG_OBJECT (rtpbin, "Reset sync on all clients");

  GST_RTP_BIN_LOCK (rtpbin);
  citem = rtpbin->clients;
  while (citem != NULL) {
    GstRtpBinClient *client = citem->data;
    GSList *sitem;

    /* reset sync on all streams for this client */
    for (sitem = client->streams; sitem != NULL; sitem = sitem->next) {
      GstRtpBinStream *stream = sitem->data;

      /* make us require a new SR packet for this stream before we attempt
       * new lip-sync */
      stream->unix_delta = 0;
      stream->have_sync = FALSE;
    }
    citem = citem->next;
  }
  GST_RTP_BIN_UNLOCK (rtpbin);
}

722
static void
723
gst_rtp_bin_clear_pt_map (GstRtpBin * bin)
724
{
725
  GSList *sessions, *streams;
726
727

  GST_RTP_BIN_LOCK (bin);
728
  GST_DEBUG_OBJECT (bin, "clearing pt map");
729
730
731
732
733
  for (sessions = bin->sessions; sessions; sessions = g_slist_next (sessions)) {
    GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;

    GST_DEBUG_OBJECT (bin, "clearing session %p", session);
    g_signal_emit_by_name (session->session, "clear-pt-map", NULL);
734
735

    GST_RTP_SESSION_LOCK (session);
736
    g_hash_table_foreach_remove (session->ptmap, return_true, NULL);
737
738
739
740
741
742
743
744

    for (streams = session->streams; streams; streams = g_slist_next (streams)) {
      GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;

      GST_DEBUG_OBJECT (bin, "clearing stream %p", stream);
      g_signal_emit_by_name (stream->buffer, "clear-pt-map", NULL);
      g_signal_emit_by_name (stream->demux, "clear-pt-map", NULL);
    }
745
746
747
    GST_RTP_SESSION_UNLOCK (session);
  }
  GST_RTP_BIN_UNLOCK (bin);
748
749
750

  /* reset sync too */
  gst_rtp_bin_reset_sync (bin);
751
752
}

753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
/* Get the "internal-session" object of the session element that belongs to
 * the session with id @session_id. Takes GST_RTP_BIN_LOCK for the lookup.
 *
 * Returns: the RTPSession, or NULL when there is no session with that id.
 * NOTE(review): g_object_get() presumably hands out a reference that the
 * caller has to release - confirm against the property definition. */
static RTPSession *
gst_rtp_bin_get_internal_session (GstRtpBin * bin, guint session_id)
{
  RTPSession *result = NULL;
  GstRtpBinSession *sess;

  GST_RTP_BIN_LOCK (bin);
  GST_DEBUG_OBJECT (bin, "retrieving internal RTPSession object, index: %d",
      session_id);

  sess = find_session_by_id (bin, (gint) session_id);
  if (sess != NULL)
    g_object_get (sess->session, "internal-session", &result, NULL);

  GST_RTP_BIN_UNLOCK (bin);

  return result;
}

772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
/* Set the property @name to @value on the jitterbuffer of every stream of
 * every session. The bin lock is held for the whole walk and the lock of a
 * session while its streams are visited. */
static void
gst_rtp_bin_propagate_property_to_jitterbuffer (GstRtpBin * bin,
    const gchar * name, const GValue * value)
{
  GSList *sitem;

  GST_RTP_BIN_LOCK (bin);
  sitem = bin->sessions;
  while (sitem != NULL) {
    GstRtpBinSession *session = sitem->data;
    GSList *titem;

    GST_RTP_SESSION_LOCK (session);
    for (titem = session->streams; titem != NULL; titem = titem->next) {
      GstRtpBinStream *stream = titem->data;

      g_object_set_property (G_OBJECT (stream->buffer), name, value);
    }
    GST_RTP_SESSION_UNLOCK (session);

    sitem = sitem->next;
  }
  GST_RTP_BIN_UNLOCK (bin);
}

793
/* get a client with the given SDES name. Must be called with RTP_BIN_LOCK */
794
static GstRtpBinClient *
795
get_client (GstRtpBin * bin, guint8 len, guint8 * data, gboolean * created)
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
{
  GstRtpBinClient *result = NULL;
  GSList *walk;

  for (walk = bin->clients; walk; walk = g_slist_next (walk)) {
    GstRtpBinClient *client = (GstRtpBinClient *) walk->data;

    if (len != client->cname_len)
      continue;

    if (!strncmp ((gchar *) data, client->cname, client->cname_len)) {
      GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client,
          client->cname);
      result = client;
      break;
    }
  }

  /* nothing found, create one */
  if (result == NULL) {
    result = g_new0 (GstRtpBinClient, 1);
    result->cname = g_strndup ((gchar *) data, len);
    result->cname_len = len;
    bin->clients = g_slist_prepend (bin->clients, result);
    GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result,
        result->cname);
  }
  return result;
}

826
static void
827
free_client (GstRtpBinClient * client)
828
{
829
  g_slist_free (client->streams);
830
831
832
833
  g_free (client->cname);
  g_free (client);
}

834
/* associate a stream to the given CNAME. This will make sure all streams for
835
836
 * that CNAME are synchronized together.
 * Must be called with GST_RTP_BIN_LOCK */
837
838
static void
gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len,
839
840
    guint8 * data, guint64 last_unix, guint64 last_extrtptime,
    guint64 clock_base, guint64 clock_base_time, guint clock_rate)
841
842
843
844
{
  GstRtpBinClient *client;
  gboolean created;
  GSList *walk;
845
846
  guint64 local_unix;
  guint64 local_rtp;
847
848

  /* first find or create the CNAME */
849
  client = get_client (bin, len, data, &created);
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870

  /* find stream in the client */
  for (walk = client->streams; walk; walk = g_slist_next (walk)) {
    GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;

    if (ostream == stream)
      break;
  }
  /* not found, add it to the list */
  if (walk == NULL) {
    GST_DEBUG_OBJECT (bin,
        "new association of SSRC %08x with client %p with CNAME %s",
        stream->ssrc, client, client->cname);
    client->streams = g_slist_prepend (client->streams, stream);
    client->nstreams++;
  } else {
    GST_DEBUG_OBJECT (bin,
        "found association of SSRC %08x with client %p with CNAME %s",
        stream->ssrc, client, client->cname);
  }

871
872
873
  /* take the extended rtptime we found in the SR packet and map it to the
   * local rtptime. The local rtp time is used to construct timestamps on the
   * buffers. */
874
  local_rtp = last_extrtptime - clock_base;
875
876
877

  GST_DEBUG_OBJECT (bin,
      "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT
878
879
      ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d", clock_base,
      last_extrtptime, local_rtp, clock_rate);
880

881
882
883
  /* calculate local NTP time in gstreamer timestamp, we essentially perform the
   * same conversion that a jitterbuffer would use to convert an rtp timestamp
   * into a corresponding gstreamer timestamp. */
884
885
886
  local_unix = gst_util_uint64_scale_int (local_rtp, GST_SECOND, clock_rate);
  local_unix += clock_base_time;

887
888
889
  /* calculate delta between server and receiver. last_unix is created by
   * converting the ntptime in the last SR packet to a gstreamer timestamp. This
   * delta expresses the difference to our timeline and the server timeline. */
890
891
  stream->unix_delta = last_unix - local_unix;
  stream->have_sync = TRUE;
892
893
894

  GST_DEBUG_OBJECT (bin,
      "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT
895
      ", delta %" G_GINT64_FORMAT, local_unix, last_unix, stream->unix_delta);
896

897
  /* recalc inter stream playout offset, but only if there is more than one
898
899
900
901
   * stream. */
  if (client->nstreams > 1) {
    gint64 min;

902
903
904
905
906
907
908
    /* calculate the min of all deltas, ignoring streams that did not yet have a
     * valid unix_delta because we did not yet receive an SR packet for those
     * streams. 
     * We calculate the mininum because we would like to only apply positive
     * offsets to streams, delaying their playback instead of trying to speed up
     * other streams (which might be imposible when we have to create negative
     * latencies).
909
     * The stream that has the smallest diff is selected as the reference stream,
910
     * all other streams will have a positive offset to this difference. */
911
912
913
914
    min = G_MAXINT64;
    for (walk = client->streams; walk; walk = g_slist_next (walk)) {
      GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;

915
916
917
918
      if (!ostream->have_sync)
        continue;

      if (ostream->unix_delta < min)
919
920
921
922
923
924
925
926
927
        min = ostream->unix_delta;
    }

    GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT, client,
        min);

    /* calculate offsets for each stream */
    for (walk = client->streams; walk; walk = g_slist_next (walk)) {
      GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data;
928
      gint64 ts_offset, prev_ts_offset;
929

930
931
932
933
934
935
936
937
      /* ignore streams for which we didn't receive an SR packet yet, we
       * can't synchronize them yet. We can however sync other streams just
       * fine. */
      if (!ostream->have_sync)
        continue;

      /* calculate offset to our reference stream, this should always give a
       * positive number. */
938
      ts_offset = ostream->unix_delta - min;
939

940
941
      g_object_get (ostream->buffer, "ts-offset", &prev_ts_offset, NULL);

942
      /* delta changed, see how much */
943
      if (prev_ts_offset != ts_offset) {
944
945
        gint64 diff;

946
947
        if (prev_ts_offset > ts_offset)
          diff = prev_ts_offset - ts_offset;
948
        else
949
          diff = ts_offset - prev_ts_offset;
950

951
952
        GST_DEBUG_OBJECT (bin,
            "ts-offset %" G_GUINT64_FORMAT ", prev %" G_GUINT64_FORMAT
953
            ", diff: %" G_GINT64_FORMAT, ts_offset, prev_ts_offset, diff);
954

955
        /* only change diff when it changed more than 4 milliseconds. This
956
957
         * compensates for rounding errors in NTP to RTP timestamp
         * conversions */
958
        if (diff > 4 * GST_MSECOND && diff < (3 * GST_SECOND)) {
959
          g_object_set (ostream->buffer, "ts-offset", ts_offset, NULL);
960
        }
961
962
      }
      GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT,
963
          ostream->ssrc, ts_offset);
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
    }
  }
  return;
}

/* Iteration helpers for RTCP data. Each one expands to a plain for statement,
 * so it must be followed by the loop body and break/continue inside that body
 * act on the loop introduced by the macro. In all of them @b is an lvalue that
 * receives the result of the first/next call; the loop runs for as long as
 * that result is non-zero. */

/* Visit every packet in @buffer. @packet is positioned on the first packet
 * with gst_rtcp_buffer_get_first_packet() and advanced with
 * gst_rtcp_packet_move_to_next(). */
#define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \
  for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \
          (b) = gst_rtcp_packet_move_to_next ((packet)))

/* Visit every item of the SDES @packet, using
 * gst_rtcp_packet_sdes_first_item() and gst_rtcp_packet_sdes_next_item(). */
#define GST_RTCP_SDES_FOR_ITEMS(b,packet) \
  for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \
          (b) = gst_rtcp_packet_sdes_next_item ((packet)))

/* Visit every entry of the current item of the SDES @packet, using
 * gst_rtcp_packet_sdes_first_entry() and gst_rtcp_packet_sdes_next_entry(). */
#define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \
  for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \
          (b) = gst_rtcp_packet_sdes_next_entry ((packet)))

981
982
983
/* Process sync information for @stream.
 *
 * @s provides the fields "base-rtptime", "base-time", "clock-rate",
 * "sr-ext-rtptime" and "sr-buffer". The RTCP packets in the "sr-buffer" are
 * scanned for the first SR that carries the SSRC of @stream and, once that SR
 * was seen, for the first SDES packet. Each CNAME entry in the SDES items of
 * that SSRC is passed to gst_rtp_bin_associate() while holding the
 * GST_RTP_BIN_LOCK. @jitterbuffer is not used in this function. */
static void
gst_rtp_bin_handle_sync (GstElement * jitterbuffer, GstStructure * s,
    GstRtpBinStream * stream)
{
  GstRtpBin *rtpbin = stream->bin;
  GstRTCPPacket rtcp;
  GstBuffer *sr_buffer;
  guint64 base_rtptime;
  guint64 base_time;
  guint64 sr_ext_rtptime;
  guint64 sr_ntptime;
  guint32 sender_ssrc;
  guint rate;
  gboolean seen_sr = FALSE;
  gboolean seen_sdes = FALSE;
  gboolean more_packets;

  GST_DEBUG_OBJECT (rtpbin, "sync handler called");

  /* The structure holds the most recent mapping between rtp timestamps and
   * gstreamer timestamps together with the SR that triggered this call. */
  base_rtptime =
      g_value_get_uint64 (gst_structure_get_value (s, "base-rtptime"));
  base_time = g_value_get_uint64 (gst_structure_get_value (s, "base-time"));
  rate = g_value_get_uint (gst_structure_get_value (s, "clock-rate"));
  sr_ext_rtptime =
      g_value_get_uint64 (gst_structure_get_value (s, "sr-ext-rtptime"));
  sr_buffer = gst_value_get_buffer (gst_structure_get_value (s, "sr-buffer"));

  GST_RTCP_BUFFER_FOR_PACKETS (more_packets, sr_buffer, &rtcp) {
    switch (gst_rtcp_packet_get_type (&rtcp)) {
      case GST_RTCP_TYPE_SR:
        /* one matching SR is enough, later ones are skipped so that malformed
         * compound packets do no harm */
        if (seen_sr)
          break;

        /* only the SSRC and the NTP time of the sender info are needed */
        gst_rtcp_packet_sr_get_sender_info (&rtcp, &sender_ssrc, &sr_ntptime,
            NULL, NULL, NULL);

        GST_DEBUG_OBJECT (rtpbin, "received sync packet from SSRC %08x",
            sender_ssrc);

        /* an SR from another sender does not count, keep looking */
        if (sender_ssrc == stream->ssrc)
          seen_sr = TRUE;
        break;
      case GST_RTCP_TYPE_SDES:
      {
        gboolean more_items, more_entries;

        /* the SDES is only useful after our SR was seen, and only the first
         * SDES is handled */
        if (!seen_sr || seen_sdes)
          break;

        GST_RTCP_SDES_FOR_ITEMS (more_items, &rtcp) {
          /* only items describing the SSRC of the sender are interesting */
          if (gst_rtcp_packet_sdes_get_ssrc (&rtcp) == sender_ssrc) {
            GST_RTCP_SDES_FOR_ENTRIES (more_entries, &rtcp) {
              GstRTCPSDESType entry_type;
              guint8 entry_len;
              guint8 *entry_data;

              gst_rtcp_packet_sdes_get_entry (&rtcp, &entry_type, &entry_len,
                  &entry_data);

              /* everything except the CNAME is ignored */
              if (entry_type != GST_RTCP_SDES_CNAME)
                continue;

              /* link the stream to the client identified by this CNAME */
              GST_RTP_BIN_LOCK (rtpbin);
              gst_rtp_bin_associate (rtpbin, stream, entry_len, entry_data,
                  gst_rtcp_ntp_to_unix (sr_ntptime), sr_ext_rtptime,
                  base_rtptime, base_time, rate);
              GST_RTP_BIN_UNLOCK (rtpbin);
            }
          }
        }
        seen_sdes = TRUE;
        break;
      }
      default:
        /* other packet types carry nothing needed here */
        break;
    }
  }
}

1077
1078
/* create a new stream with @ssrc in @session. Must be called with
 * RTP_SESSION_LOCK. */
1079
1080
static GstRtpBinStream *
create_stream (GstRtpBinSession * session, guint32 ssrc)
1081
1082
{
  GstElement *buffer, *demux;
1083
  GstRtpBinStream *stream;
1084

1085
  if (!(buffer = gst_element_factory_make ("gstrtpjitterbuffer", NULL)))
1086
1087
    goto no_jitterbuffer;

1088
  if (!(demux = gst_element_factory_make ("gstrtpptdemux", NULL)))
1089
1090
    goto no_demux;

1091
  stream = g_new0 (GstRtpBinStream, 1);
1092
1093
1094
1095
1096
  stream->ssrc = ssrc;
  stream->bin = session->bin;
  stream->session = session;
  stream->buffer = buffer;
  stream->demux = demux;
1097
  stream->have_sync = FALSE;
1098
  stream->unix_delta = 0;
1099
1100
  session->streams = g_slist_prepend (session->streams, stream);

1101
  /* provide clock_rate to the jitterbuffer when needed */
1102
  stream->buffer_ptreq_sig = g_signal_connect (buffer, "request-pt-map",
1103
      (GCallback) pt_map_requested, session);
1104
1105
  stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
      (GCallback) on_npt_stop, stream);
1106

1107
  /* configure latency and packet lost */
1108
  g_object_set (buffer, "latency", session->bin->latency, NULL);