gstristsrc.c 42.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/* GStreamer RIST plugin
 * Copyright (C) 2019 Net Insight AB
 *     Author: Nicolas Dufresne <nicolas.dufresne@collabora.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
 */

/**
 * SECTION:element-ristsrc
 * @title: ristsrc
 * @see_also: ristsink
 *
 * This element implements RIST TR-06-1 Simple Profile receiver. The stream
27
 * produced by this element will be RTP payloaded. This element also implements
28
 * the URI scheme `rist://` allowing to render RIST streams in GStreamer based
Olivier Crête's avatar
Olivier Crête committed
29
 * media players. The RIST URI handler also allows setting properties through
30
31
 * the URI query.
 *
32
33
34
35
36
37
 * It also implements part of the RIST TR-06-2 Main Profile receiver. The
 * tunneling, multiplexing and encryption parts of the specification are not
 * included. This element will accept the RIST RTP header extension and restore
 * the null MPEG-TS packets if the extension is included. It will not currently
 * use the sequence number extension when sending RTCP NACK requests.
 *
Olivier Crête's avatar
Olivier Crête committed
38
 * ## Example gst-launch line
39
 * |[
Olivier Crête's avatar
Olivier Crête committed
40
 * gst-launch-1.0 ristsrc address=0.0.0.0 port=5004 ! rtpmp2tdepay ! udpsink
41
42
 * gst-play-1.0 "rist://0.0.0.0:5004?receiver-buffer=700"
 * ]|
43
 *
Olivier Crête's avatar
Olivier Crête committed
44
45
46
47
48
 * Additionally, this element supports link bonding, which means it
 * can receive the same stream from multiple addresses. Each address
 * will be mapped to its own RTP session. In order to enable bonding
 * support, one needs to configure the list of addresses through the
 * "bonding-addresses" property.
49
 *
Olivier Crête's avatar
Olivier Crête committed
50
 * ## Example gst-launch line for bonding
51
 * |[
52
 * gst-launch-1.0 ristsrc bonding-addresses="10.0.0.1:5004,11.0.0.1:5006" ! rtpmp2tdepay ! udpsink
53
54
 * gst-play-1.0 "rist://0.0.0.0:5004?bonding-addresses=10.0.0.1:5004,11.0.0.1:5006"
 * ]|
55
56
 */

57
58
/* using GValueArray, which has no replacement */
#define GLIB_DISABLE_DEPRECATION_WARNINGS
59
60
61
62
63
64
65
66
67

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <gio/gio.h>
#include <gst/net/net.h>
#include <gst/rtp/rtp.h>

68
69
70
/* for strtol() */
#include <stdlib.h>

71
72
73
74
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif

75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#include "gstrist.h"

GST_DEBUG_CATEGORY_STATIC (gst_rist_src_debug);
#define GST_CAT_DEFAULT gst_rist_src_debug

/* GObject property identifiers of the element. Numbering starts at 1 and
 * follows declaration order, so new entries must be appended at the end. */
enum
{
  PROP_ADDRESS = 1,
  PROP_PORT,
  PROP_RECEIVER_BUFFER,
  PROP_REORDER_SECTION,
  PROP_MAX_RTX_RETRIES,
  PROP_MIN_RTCP_INTERVAL,
  PROP_MAX_RTCP_BANDWIDTH,
  PROP_STATS_UPDATE_INTERVAL,
  PROP_STATS,
  PROP_CNAME,
  PROP_MULTICAST_LOOPBACK,
  PROP_MULTICAST_IFACE,
  PROP_MULTICAST_TTL,
  PROP_BONDING_ADDRESSES
};

/* Template of the always-present "src" pad; the element outputs
 * "application/x-rtp". */
static GstStaticPadTemplate src_templ = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS ("application/x-rtp"));


104
105
106
107
108
109
typedef struct
{
  guint session;
  gchar *address;
  gchar *multicast_iface;
  guint port;
110
111

  GstElement *rtcp_src;
112
  GstElement *rtp_src;
113
  GstElement *rtcp_sink;
114
  GstElement *rtx_receive;
115
116
117
118
  gulong rtcp_recv_probe;
  gulong rtcp_send_probe;
  GSocketAddress *rtcp_send_addr;

119
120
121
122
123
124
125
126
127
128
129
} RistReceiverBond;

struct _GstRistSrc
{
  GstBin parent;

  GstUri *uri;

  /* Common elements in the pipeline */
  GstElement *rtpbin;
  GstPad *srcpad;
130
  GstElement *rtxbin;
131
  GstElement *rtx_funnel;
132
  GstElement *rtpdeext;
133

134
  /* Common properties, protected by bonds_lock */
135
136
  guint reorder_section;
  guint max_rtx_retries;
137
138
139
140
141
142
143
144
145
146
147
  GstClockTime min_rtcp_interval;
  gdouble max_rtcp_bandwidth;
  gint multicast_loopback;
  gint multicast_ttl;

  /* Bonds */
  GPtrArray *bonds;
  /* this is needed as setting sibling properties will try to take the object
   * lock. Thus, any properties that affects the bonds will be protected with
   * that lock instead of the object lock. */
  GMutex bonds_lock;
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165

  /* For stats */
  guint stats_interval;
  guint32 rtp_ssrc;
  GstClockID stats_cid;
  GstElement *jitterbuffer;

  /* This is set whenever there is a pipeline construction failure, and used
   * to fail state changes later */
  gboolean construct_failed;
  const gchar *missing_plugin;
};

static void gst_rist_src_uri_init (gpointer g_iface, gpointer iface_data);

/* Registers GstRistSrc as a GstBin subclass that implements the URI handler
 * interface (through gst_rist_src_uri_init) and initializes the "ristsrc"
 * debug category as part of the type registration. */
G_DEFINE_TYPE_WITH_CODE (GstRistSrc, gst_rist_src, GST_TYPE_BIN,
    G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_rist_src_uri_init);
    GST_DEBUG_CATEGORY_INIT (gst_rist_src_debug, "ristsrc", 0, "RIST Source"));
166
167
/* Defines the registration of the "ristsrc" element, with primary rank. */
GST_ELEMENT_REGISTER_DEFINE (ristsrc, "ristsrc", GST_RANK_PRIMARY,
    GST_TYPE_RIST_SRC);
168

169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
/* Creates a new bond, wires its elements into the pipeline and appends it to
 * src->bonds. The bond's session id is its index in src->bonds and its
 * address defaults to "0.0.0.0".
 *
 * Returns the new bond (owned by src->bonds), or NULL when one of the UDP
 * elements could not be created. In that case src->missing_plugin is set and
 * neither the rtxbin nor the element itself has been modified.
 *
 * called with bonds lock */
static RistReceiverBond *
gst_rist_src_add_bond (GstRistSrc * src)
{
  RistReceiverBond *bond = g_slice_new0 (RistReceiverBond);
  GstPad *pad, *gpad;
  gchar name[32];

  bond->session = src->bonds->len;

  /* Create the elements that may be unavailable first, so that a failure
   * does not leave a half-configured RTX receiver and ghost pad behind. */
  g_snprintf (name, sizeof (name), "rist_rtp_udpsrc%u", bond->session);
  bond->rtp_src = gst_element_factory_make ("udpsrc", name);
  g_snprintf (name, sizeof (name), "rist_rtcp_udpsrc%u", bond->session);
  bond->rtcp_src = gst_element_factory_make ("udpsrc", name);
  g_snprintf (name, sizeof (name), "rist_rtcp_dynudpsink%u", bond->session);
  bond->rtcp_sink = gst_element_factory_make ("dynudpsink", name);
  if (!bond->rtp_src || !bond->rtcp_src || !bond->rtcp_sink) {
    g_clear_object (&bond->rtp_src);
    g_clear_object (&bond->rtcp_src);
    g_clear_object (&bond->rtcp_sink);
    g_slice_free (RistReceiverBond, bond);
    src->missing_plugin = "udp";
    return NULL;
  }

  bond->address = g_strdup ("0.0.0.0");

  /* RTX receiver for this session, feeding the shared funnel */
  g_snprintf (name, sizeof (name), "rist_rtx_receive%u", bond->session);
  bond->rtx_receive = gst_element_factory_make ("ristrtxreceive", name);
  gst_bin_add (GST_BIN (src->rtxbin), bond->rtx_receive);

  /* The funnel request pad and the rtxbin ghost pad share the same name */
  g_snprintf (name, sizeof (name), "sink_%u", bond->session);
  gst_element_link_pads (bond->rtx_receive, "src", src->rtx_funnel, name);

  pad = gst_element_get_static_pad (bond->rtx_receive, "sink");
  gpad = gst_ghost_pad_new (name, pad);
  gst_object_unref (pad);
  gst_element_add_pad (src->rtxbin, gpad);

  gst_bin_add_many (GST_BIN (src), bond->rtp_src, bond->rtcp_src,
      bond->rtcp_sink, NULL);
  g_object_set (bond->rtcp_sink, "sync", FALSE, "async", FALSE, NULL);
  /* The sink stays locked until the RTCP socket is shared with it */
  gst_element_set_locked_state (bond->rtcp_sink, TRUE);

  g_snprintf (name, sizeof (name), "recv_rtp_sink_%u", bond->session);
  gst_element_link_pads (bond->rtp_src, "src", src->rtpbin, name);
  g_snprintf (name, sizeof (name), "recv_rtcp_sink_%u", bond->session);
  gst_element_link_pads (bond->rtcp_src, "src", src->rtpbin, name);
  g_snprintf (name, sizeof (name), "send_rtcp_src_%u", bond->session);
  gst_element_link_pads (src->rtpbin, name, bond->rtcp_sink, "sink");

  g_ptr_array_add (src->bonds, bond);
  return bond;
}

223
224
225
226
227
228
229
230
231
232
233
234
235
/* "pad-added" handler of the internal rtpbin. Pads whose name starts with
 * "recv_rtp_src_0_" become the target of the element's ghost source pad;
 * every other pad is only traced. */
static void
gst_rist_src_pad_added (GstRistSrc * src, GstPad * new_pad, GstElement * rtpbin)
{
  GST_TRACE_OBJECT (src, "New pad '%s'.", GST_PAD_NAME (new_pad));

  if (!g_str_has_prefix (GST_PAD_NAME (new_pad), "recv_rtp_src_0_"))
    return;

  GST_DEBUG_OBJECT (src, "Using new pad '%s' as ghost pad target.",
      GST_PAD_NAME (new_pad));
  gst_ghost_pad_set_target (GST_GHOST_PAD (src->srcpad), new_pad);
}

static GstCaps *
236
237
gst_rist_src_request_pt_map (GstRistSrc * src, guint session_id, guint pt,
    GstElement * rtpbin)
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
{
  const GstRTPPayloadInfo *pt_info;
  GstCaps *ret;

  pt_info = gst_rtp_payload_info_for_pt (pt);
  if (!pt_info || !pt_info->clock_rate)
    return NULL;

  ret = gst_caps_new_simple ("application/x-rtp",
      "media", G_TYPE_STRING, pt_info->media,
      "encoding_name", G_TYPE_STRING, pt_info->encoding_name,
      "clock-rate", G_TYPE_INT, (gint) pt_info->clock_rate, NULL);

  /* FIXME add sprop-parameter-set if any */
  g_warn_if_fail (pt_info->encoding_parameters == NULL);

  return ret;
}

/* "request-aux-receiver" handler of the internal rtpbin: hands out a new
 * reference to the shared rtxbin, whatever the session. */
static GstElement *
gst_rist_src_request_aux_receiver (GstRistSrc * src, guint session_id,
    GstElement * rtpbin)
{
  GstElement *aux_receiver = src->rtxbin;

  return gst_object_ref (aux_receiver);
}

/* Overrides the nack creation. Right now we don't send mixed NACKS type, we
 * simply send a set of range NACK if it takes less space, or allow adding
 * more seqnum.
 *
 * @nacks holds the guint16 sequence numbers to request and @buffer is the
 * RTCP buffer being assembled.
 *
 * Returns the number of leading entries of @nacks that were written as RIST
 * range NACKs, or 0 when nothing was written. */
static guint
gst_rist_src_on_sending_nacks (GObject * session, guint sender_ssrc,
    guint media_ssrc, GArray * nacks, GstBuffer * buffer, gpointer user_data)
{
  GstRTCPBuffer rtcp = GST_RTCP_BUFFER_INIT;
  GstRTCPPacket packet;
  guint8 *app_data;
  guint nacked_seqnums = 0;
  guint range_size = 0;
  guint n_rg_nacks = 0;
  guint n_fb_nacks = 0;
  guint16 seqnum;
  guint i;
  gint diff;

  /* Nothing to request; also keeps the index 0 access below in bounds */
  if (nacks->len == 0)
    return 0;

  /* We'll assume that range will be best, and find how many generic NACK
   * would have been created. If this number ends up being smaller, we will
   * just remove the APP packet and return 0, leaving it to RTPSession to
   * create the generic NACK.*/

  if (!gst_rtcp_buffer_map (buffer, GST_MAP_READWRITE, &rtcp)) {
    GST_WARNING ("failed to map RTCP buffer");
    return 0;
  }

  if (!gst_rtcp_buffer_add_packet (&rtcp, GST_RTCP_TYPE_APP, &packet))
    /* exit because the packet is full, will put next request in a
     * further packet */
    goto done;

  gst_rtcp_packet_app_set_ssrc (&packet, media_ssrc);
  gst_rtcp_packet_app_set_name (&packet, "RIST");

  if (!gst_rtcp_packet_app_set_data_length (&packet, 1)) {
    gst_rtcp_packet_remove (&packet);
    GST_WARNING ("no range nacks fit in the packet");
    goto done;
  }

  app_data = gst_rtcp_packet_app_get_data (&packet);
  for (i = 0; i < nacks->len; i = nacked_seqnums) {
    guint j;
    seqnum = g_array_index (nacks, guint16, i);

    if (!gst_rtcp_packet_app_set_data_length (&packet, n_rg_nacks + 1))
      break;

    n_rg_nacks++;
    nacked_seqnums++;

    for (j = i + 1; j < nacks->len; j++) {
      guint16 next_seqnum = g_array_index (nacks, guint16, j);
      diff = gst_rtp_buffer_compare_seqnum (seqnum, next_seqnum);
      GST_TRACE ("[%u][%u] %u %u diff %i", i, j, seqnum, next_seqnum, diff);
      /* Compared as unsigned on purpose: a negative diff converts to a
       * large value and ends the range as well. */
      if ((guint) diff > (j - i))
        break;

      nacked_seqnums++;
    }

    range_size = j - i - 1;
    /* Widen before shifting: a guint16 is promoted to a signed int, and
     * shifting a value >= 0x8000 left by 16 would overflow it. */
    GST_WRITE_UINT32_BE (app_data, ((guint32) seqnum << 16) | range_size);
    app_data += 4;
  }

  /* count how many FB NACK it would take to wrap nacked_seqnums */
  seqnum = g_array_index (nacks, guint16, 0);
  n_fb_nacks = 1;
  for (i = 1; i < nacked_seqnums; i++) {
    guint16 next_seqnum = g_array_index (nacks, guint16, i);
    diff = gst_rtp_buffer_compare_seqnum (seqnum, next_seqnum);
    if (diff > 16) {
      n_fb_nacks++;
      seqnum = next_seqnum;
    }
  }

  if (n_fb_nacks <= n_rg_nacks) {
    GST_DEBUG ("Not sending %u range nacks, as %u FB nacks will be smaller",
        n_rg_nacks, n_fb_nacks);
    gst_rtcp_packet_remove (&packet);
    nacked_seqnums = 0;
    goto done;
  }

  GST_DEBUG ("Sent %u seqnums into %u Range NACKs", nacked_seqnums, n_rg_nacks);

done:
  gst_rtcp_buffer_unmap (&rtcp);
  return nacked_seqnums;
}

/* "on-new-ssrc" handler of the internal rtpbin.
 *
 * Odd SSRCs are treated as RTX streams: RTCP and probation are disabled on
 * their source. For even SSRCs the session's NACK generation is overridden
 * with gst_rist_src_on_sending_nacks. */
static void
gst_rist_src_on_new_ssrc (GstRistSrc * src, guint session_id, guint ssrc,
    GstElement * rtpbin)
{
  GObject *session = NULL;
  GObject *source = NULL;

  g_signal_emit_by_name (rtpbin, "get-internal-session", session_id, &session);
  if (!session) {
    GST_WARNING_OBJECT (src, "No internal session %u for SSRC %u",
        session_id, ssrc);
    return;
  }

  if (ssrc & 1) {
    g_signal_emit_by_name (session, "get-source-by-ssrc", ssrc, &source);
    if (source) {
      GST_DEBUG ("Disabling RTCP and probation on RTX stream "
          "(SSRC %u on session %u)", ssrc, session_id);
      g_object_set (source, "disable-rtcp", TRUE, "probation", 0, NULL);
      g_object_unref (source);
    } else {
      GST_WARNING_OBJECT (src, "No source for SSRC %u on session %u",
          ssrc, session_id);
    }
  } else if (g_signal_handler_find (session, G_SIGNAL_MATCH_FUNC, 0, 0, NULL,
          (gpointer) gst_rist_src_on_sending_nacks, NULL) == 0) {
    /* Connect only once per session, otherwise every additional media SSRC
     * would stack another copy of the same handler. */
    g_signal_connect (session, "on-sending-nacks",
        (GCallback) gst_rist_src_on_sending_nacks, NULL);
  }

  g_object_unref (session);
}

/* "new-jitterbuffer" handler of the internal rtpbin.
 *
 * Only session 0 is expected to get a jitterbuffer. The retransmission
 * settings are applied to it, and when the SSRC is even the jitterbuffer and
 * SSRC are remembered for statistics. */
static void
gst_rist_src_new_jitterbuffer (GstRistSrc * src, GstElement * jitterbuffer,
    guint session, guint ssrc, GstElement * rtpbin)
{
  if (session != 0) {
    GST_WARNING_OBJECT (rtpbin, "Unexpected jitterbuffer created.");
    return;
  }

  g_object_set (jitterbuffer, "rtx-delay", src->reorder_section,
      "rtx-max-retries", src->max_rtx_retries, NULL);

  /* Odd SSRCs are not tracked for statistics */
  if ((ssrc & 1) != 0)
    return;

  GST_INFO_OBJECT (src, "Saving jitterbuffer for session %u ssrc %u",
      session, ssrc);

  /* Replace any jitterbuffer saved earlier */
  g_clear_object (&src->jitterbuffer);
  src->jitterbuffer = gst_object_ref (jitterbuffer);
  src->rtp_ssrc = ssrc;
}

/* Instance initializer: builds the fixed part of the receiver pipeline
 * (ghost source pad, rtpbin, rtxbin with funnel and header-extension
 * remover) and creates the first bond.
 *
 * When a required element cannot be created, src->construct_failed is set
 * and src->missing_plugin names the plugin to report. */
static void
gst_rist_src_init (GstRistSrc * src)
{
  GstPad *pad, *gpad;
  GstPadTemplate *templ;
  GstStructure *sdes = NULL;
  RistReceiverBond *bond;

  g_mutex_init (&src->bonds_lock);
  src->bonds = g_ptr_array_new ();

  /* Construct the RIST RTP receiver pipeline.
   *
   * udpsrc -> [recv_rtp_sink_%u]  --------  [recv_rtp_src_%u_%u_%u]
   *                              | rtpbin |
   * udpsrc -> [recv_rtcp_sink_%u] --------  [send_rtcp_src_%u] -> udpsink
   *
   * This pipeline is fixed for now, note that optionally an FEC stream could
   * be added later.
   */
  /* gst_static_pad_template_get() returns a new reference that the ghost pad
   * constructor does not consume, so it is released here. */
  templ = gst_static_pad_template_get (&src_templ);
  src->srcpad = gst_ghost_pad_new_no_target_from_template ("src", templ);
  gst_object_unref (templ);
  gst_element_add_pad (GST_ELEMENT (src), src->srcpad);

  src->rtpbin = gst_element_factory_make ("rtpbin", "rist_recv_rtpbin");
  if (!src->rtpbin) {
    src->missing_plugin = "rtpmanager";
    goto missing_plugin;
  }

  /* RIST specification says the SDES should only contain the CNAME */
  g_object_get (src->rtpbin, "sdes", &sdes, NULL);
  gst_structure_remove_field (sdes, "tool");

  gst_bin_add (GST_BIN (src), src->rtpbin);
  g_object_set (src->rtpbin, "do-retransmission", TRUE,
      "rtp-profile", 3 /* AVPF */ ,
      "sdes", sdes, NULL);
  gst_structure_free (sdes);

  g_signal_connect_object (src->rtpbin, "request-pt-map",
      G_CALLBACK (gst_rist_src_request_pt_map), src, G_CONNECT_SWAPPED);
  g_signal_connect_object (src->rtpbin, "request-aux-receiver",
      G_CALLBACK (gst_rist_src_request_aux_receiver), src, G_CONNECT_SWAPPED);

  /* The rtxbin is handed out from "request-aux-receiver", keep our own
   * reference to it. */
  src->rtxbin = gst_bin_new ("rist_recv_rtxbin");
  g_object_ref_sink (src->rtxbin);

  src->rtx_funnel = gst_element_factory_make ("funnel", "rist_rtx_funnel");
  if (!src->rtx_funnel) {
    src->missing_plugin = "coreelements";
    goto missing_plugin;
  }
  gst_bin_add (GST_BIN (src->rtxbin), src->rtx_funnel);

  src->rtpdeext = gst_element_factory_make ("ristrtpdeext", "rist_rtp_de_ext");
  gst_bin_add (GST_BIN (src->rtxbin), src->rtpdeext);
  gst_element_link (src->rtx_funnel, src->rtpdeext);

  pad = gst_element_get_static_pad (src->rtpdeext, "src");
  gpad = gst_ghost_pad_new ("src_0", pad);
  gst_object_unref (pad);
  gst_element_add_pad (src->rtxbin, gpad);

  g_signal_connect_object (src->rtpbin, "pad-added",
      G_CALLBACK (gst_rist_src_pad_added), src, G_CONNECT_SWAPPED);
  g_signal_connect_object (src->rtpbin, "on-new-ssrc",
      G_CALLBACK (gst_rist_src_on_new_ssrc), src, G_CONNECT_SWAPPED);
  g_signal_connect_object (src->rtpbin, "new-jitterbuffer",
      G_CALLBACK (gst_rist_src_new_jitterbuffer), src, G_CONNECT_SWAPPED);

  bond = gst_rist_src_add_bond (src);
  if (!bond)
    goto missing_plugin;

  return;

missing_plugin:
  {
    GST_ERROR_OBJECT (src, "'%s' plugin is missing.", src->missing_plugin);
    src->construct_failed = TRUE;
  }
}

479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
/* GstBin::handle_message override. Stream-start and EOS messages are
 * swallowed; everything else goes to the parent class. */
static void
gst_rist_src_handle_message (GstBin * bin, GstMessage * message)
{
  const GstMessageType msg_type = GST_MESSAGE_TYPE (message);

  if (msg_type == GST_MESSAGE_STREAM_START || msg_type == GST_MESSAGE_EOS) {
    /* drop stream-start & eos from our internal udp sink(s);
       https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/issues/1368 */
    gst_message_unref (message);
    return;
  }

  GST_BIN_CLASS (gst_rist_src_parent_class)->handle_message (bin, message);
}

495
496
497
498
499
500
501
/* Pad probe installed on the RTCP udpsrc of unicast bonds.
 *
 * Records the sender address of the incoming RTCP data in the bond, so that
 * our own RTCP can be sent back to it. Data that carries no net address
 * meta leaves the stored address unchanged. Always lets the data through. */
static GstPadProbeReturn
gst_rist_src_on_recv_rtcp (GstPad * pad, GstPadProbeInfo * info,
    gpointer user_data)
{
  GstRistSrc *src = GST_RIST_SRC (user_data);
  GstBuffer *buffer = NULL;
  GstNetAddressMeta *meta = NULL;
  GstElement *rtcp_src;
  RistReceiverBond *bond = NULL;
  guint i;

  rtcp_src = GST_ELEMENT (gst_pad_get_parent (pad));

  g_mutex_lock (&src->bonds_lock);

  for (i = 0; i < src->bonds->len; i++) {
    RistReceiverBond *b = g_ptr_array_index (src->bonds, i);
    if (b->rtcp_src == rtcp_src) {
      bond = b;
      break;
    }
  }
  if (rtcp_src)
    gst_object_unref (rtcp_src);

  if (!bond) {
    GST_WARNING_OBJECT (src, "Unexpected RTCP source.");
    g_mutex_unlock (&src->bonds_lock);
    return GST_PAD_PROBE_OK;
  }

  /* info->type is a flag mask that also carries scheduling bits, so the data
   * kind has to be tested with '&' rather than '=='. */
  if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
    GstBufferList *buffer_list = info->data;
    if (buffer_list && gst_buffer_list_length (buffer_list) > 0)
      buffer = gst_buffer_list_get (buffer_list, 0);
  } else {
    buffer = info->data;
  }

  if (buffer)
    meta = gst_buffer_get_net_address_meta (buffer);

  /* Only replace the stored address when the buffer tells us the sender */
  if (meta && meta->addr) {
    g_clear_object (&bond->rtcp_send_addr);
    bond->rtcp_send_addr = g_object_ref (meta->addr);
  }

  g_mutex_unlock (&src->bonds_lock);

  return GST_PAD_PROBE_OK;
}

542
/* called with bonds lock */
543
static inline void
544
545
gst_rist_src_attach_net_address_meta (RistReceiverBond * bond,
    GstBuffer * buffer)
546
{
547
548
  if (bond->rtcp_send_addr)
    gst_buffer_add_net_address_meta (buffer, bond->rtcp_send_addr);
549
550
551
552
553
554
555
}

/* Pad probe installed on the sink pad of each bond's RTCP sink.
 *
 * Attaches the bond's RTCP destination address to every outgoing buffer.
 * Always lets the data through. */
static GstPadProbeReturn
gst_rist_src_on_send_rtcp (GstPad * pad, GstPadProbeInfo * info,
    gpointer user_data)
{
  GstRistSrc *src = GST_RIST_SRC (user_data);
  GstElement *rtcp_sink;
  RistReceiverBond *bond = NULL;
  guint i;

  rtcp_sink = GST_ELEMENT (gst_pad_get_parent (pad));

  g_mutex_lock (&src->bonds_lock);

  for (i = 0; i < src->bonds->len; i++) {
    RistReceiverBond *b = g_ptr_array_index (src->bonds, i);
    if (b->rtcp_sink == rtcp_sink) {
      bond = b;
      break;
    }
  }
  if (rtcp_sink)
    gst_object_unref (rtcp_sink);

  if (!bond) {
    GST_WARNING_OBJECT (src, "Unexpected RTCP sink.");
    g_mutex_unlock (&src->bonds_lock);
    return GST_PAD_PROBE_OK;
  }

  /* info->type is a flag mask that also carries scheduling bits, so the data
   * kind has to be tested with '&' rather than '=='. */
  if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
    GstBufferList *buffer_list = info->data;
    guint n_buffers;

    info->data = buffer_list = gst_buffer_list_make_writable (buffer_list);
    n_buffers = gst_buffer_list_length (buffer_list);
    for (i = 0; i < n_buffers; i++) {
      /* A writable list can still hold shared buffers, and metas can only
       * be added to writable buffers. */
      GstBuffer *buffer = gst_buffer_list_get_writable (buffer_list, i);
      gst_rist_src_attach_net_address_meta (bond, buffer);
    }
  } else {
    GstBuffer *buffer = info->data;
    info->data = buffer = gst_buffer_make_writable (buffer);
    gst_rist_src_attach_net_address_meta (bond, buffer);
  }

  g_mutex_unlock (&src->bonds_lock);

  return GST_PAD_PROBE_OK;
}

600
601
/* Configures RTCP sending for @bond.
 *
 * The socket opened by the bond's RTCP udpsrc is shared with the RTCP sink.
 * For a multicast address RTCP is sent to the group on port + 1; for unicast
 * a probe on the RTCP source learns the sender address instead. A probe on
 * the sink then tags outgoing buffers with that destination.
 *
 * Returns TRUE on success, FALSE when the source has no socket yet or the
 * bond address cannot be resolved (an element error is posted for the
 * latter). */
static gboolean
gst_rist_src_setup_rtcp_socket (GstRistSrc * src, RistReceiverBond * bond)
{
  GstPad *pad;
  GSocket *socket = NULL;
  GInetAddress *iaddr = NULL;
  guint port = bond->port + 1;
  GError *error = NULL;

  g_object_get (bond->rtcp_src, "used-socket", &socket, NULL);
  if (!socket)
    return FALSE;

  iaddr = g_inet_address_new_from_string (bond->address);
  if (!iaddr) {
    /* Not a literal IP address, try to resolve it as a host name */
    GResolver *resolver = g_resolver_get_default ();
    GList *results =
        g_resolver_lookup_by_name (resolver, bond->address, NULL, &error);

    g_object_unref (resolver);

    if (!results)
      goto dns_resolve_failed;

    iaddr = G_INET_ADDRESS (g_object_ref (results->data));
    g_resolver_free_addresses (results);
  }

  if (g_inet_address_get_is_multicast (iaddr)) {
    /* mc-ttl is not supported by dynudpsink */
    g_socket_set_multicast_ttl (socket, src->multicast_ttl);
    /* In multicast, send RTCP to the multicast group. Drop any address
     * left over from a previous run first. */
    g_clear_object (&bond->rtcp_send_addr);
    bond->rtcp_send_addr = g_inet_socket_address_new (iaddr, port);
  } else {
    /* In unicast, send RTCP to the detected sender address */
    pad = gst_element_get_static_pad (bond->rtcp_src, "src");
    bond->rtcp_recv_probe = gst_pad_add_probe (pad,
        GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
        gst_rist_src_on_recv_rtcp, src, NULL);
    gst_object_unref (pad);
  }
  g_object_unref (iaddr);

  pad = gst_element_get_static_pad (bond->rtcp_sink, "sink");
  bond->rtcp_send_probe = gst_pad_add_probe (pad,
      GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,
      gst_rist_src_on_send_rtcp, src, NULL);
  gst_object_unref (pad);

  if (bond->multicast_iface) {
#ifdef SO_BINDTODEVICE
    if (setsockopt (g_socket_get_fd (socket), SOL_SOCKET,
            SO_BINDTODEVICE, bond->multicast_iface,
            strlen (bond->multicast_iface)) < 0)
      GST_WARNING_OBJECT (src, "setsockopt SO_BINDTODEVICE failed: %s",
          strerror (errno));
#else
    GST_WARNING_OBJECT (src, "Tried to set a multicast interface while"
        " GStreamer was compiled on a platform without SO_BINDTODEVICE");
#endif
  }

  /* share the socket created by the source */
  g_object_set (bond->rtcp_sink, "socket", socket, "close-socket", FALSE, NULL);
  g_object_unref (socket);

  /* The sink was state-locked when the bond was created; release it now
   * that it has a socket. */
  gst_element_set_locked_state (bond->rtcp_sink, FALSE);
  gst_element_sync_state_with_parent (bond->rtcp_sink);

  return TRUE;

dns_resolve_failed:
  GST_ELEMENT_ERROR (src, RESOURCE, NOT_FOUND,
      ("Could not resolve hostname '%s'", GST_STR_NULL (bond->address)),
      ("DNS resolver reported: %s", error ? error->message : "unknown error"));
  g_clear_error (&error);
  /* Release the reference returned by the "used-socket" getter */
  g_object_unref (socket);
  return FALSE;
}

/* Applies the RTCP settings to each bond's RTP session and sets up its RTCP
 * socket.
 *
 * Returns GST_STATE_CHANGE_FAILURE when the pipeline construction failed
 * earlier (a missing-plugin error is posted) or when a bond's RTCP socket
 * cannot be set up, GST_STATE_CHANGE_SUCCESS otherwise. */
static GstStateChangeReturn
gst_rist_src_start (GstRistSrc * src)
{
  gint idx;

  if (src->construct_failed) {
    GST_ELEMENT_ERROR (src, CORE, MISSING_PLUGIN,
        ("Your GStreamer installation is missing plugin '%s'",
            src->missing_plugin), (NULL));
    return GST_STATE_CHANGE_FAILURE;
  }

  for (idx = 0; idx < src->bonds->len; idx++) {
    RistReceiverBond *current = g_ptr_array_index (src->bonds, idx);
    GObject *rtp_session = NULL;
    gboolean socket_ready;

    /* The session id of a bond is its index in the array */
    g_signal_emit_by_name (src->rtpbin, "get-session", idx, &rtp_session);
    g_object_set (rtp_session, "rtcp-min-interval", src->min_rtcp_interval,
        "rtcp-fraction", src->max_rtcp_bandwidth, NULL);
    g_object_unref (rtp_session);

    socket_ready = gst_rist_src_setup_rtcp_socket (src, current);
    if (!socket_ready)
      return GST_STATE_CHANGE_FAILURE;
  }

  return GST_STATE_CHANGE_SUCCESS;
}

/* Builds a "rist/x-receiver-stats" structure.
 *
 * It holds one "rist/x-receiver-session-stats" entry per bond whose internal
 * session exists (read from the RTP source matching src->rtp_ssrc) and the
 * totals taken from the saved jitterbuffer. Counters whose source is not
 * available are reported as 0.
 *
 * Returns a new structure owned by the caller. */
static GstStructure *
gst_rist_src_create_stats (GstRistSrc * src)
{
  GstStructure *ret;
  GValueArray *session_stats;
  guint64 total_dropped = 0, total_received = 0, recovered = 0, lost = 0;
  guint64 duplicates = 0, rtx_sent = 0, rtt = 0;
  guint i;

  ret = gst_structure_new_empty ("rist/x-receiver-stats");
  session_stats = g_value_array_new (src->bonds->len);

  for (i = 0; i < src->bonds->len; i++) {
    GObject *session = NULL, *source = NULL;
    GstStructure *sstats = NULL, *stats;
    const gchar *rtp_from = NULL, *rtcp_from = NULL;
    guint64 dropped = 0, received = 0;
    GValue value = G_VALUE_INIT;

    g_signal_emit_by_name (src->rtpbin, "get-internal-session", i, &session);
    if (!session)
      continue;

    stats = gst_structure_new_empty ("rist/x-receiver-session-stats");

    g_signal_emit_by_name (session, "get-source-by-ssrc", src->rtp_ssrc,
        &source);
    if (source)
      g_object_get (source, "stats", &sstats, NULL);
    if (sstats) {
      gint packet_lost = 0;

      /* The lost count is a signed value; only a positive one counts as
       * dropped. A missing field keeps the 0 default. */
      if (gst_structure_get_int (sstats, "packets-lost", &packet_lost))
        dropped = MAX (packet_lost, 0);
      gst_structure_get_uint64 (sstats, "packets-received", &received);
      /* These strings point into sstats, which is freed further down */
      rtp_from = gst_structure_get_string (sstats, "rtp-from");
      rtcp_from = gst_structure_get_string (sstats, "rtcp-from");
    }
    g_object_unref (session);

    gst_structure_set (stats, "session-id", G_TYPE_INT, (gint) i,
        "rtp-from", G_TYPE_STRING, rtp_from ? rtp_from : "",
        "rtcp-from", G_TYPE_STRING, rtcp_from ? rtcp_from : "",
        "dropped", G_TYPE_UINT64, dropped,
        "received", G_TYPE_UINT64, received, NULL);

    if (sstats)
      gst_structure_free (sstats);
    g_clear_object (&source);

    /* The GValue takes ownership of stats; appending copies the value */
    g_value_init (&value, GST_TYPE_STRUCTURE);
    g_value_take_boxed (&value, stats);
    g_value_array_append (session_stats, &value);
    g_value_unset (&value);

    total_dropped += dropped;
  }

  if (src->jitterbuffer) {
    GstStructure *stats = NULL;

    g_object_get (src->jitterbuffer, "stats", &stats, NULL);
    if (stats) {
      gst_structure_get (stats,
          "num-pushed", G_TYPE_UINT64, &total_received,
          "num-lost", G_TYPE_UINT64, &lost,
          "rtx-count", G_TYPE_UINT64, &rtx_sent,
          "num-duplicates", G_TYPE_UINT64, &duplicates,
          "rtx-success-count", G_TYPE_UINT64, &recovered,
          "rtx-rtt", G_TYPE_UINT64, &rtt, NULL);
      gst_structure_free (stats);
    }
  }

  gst_structure_set (ret, "dropped", G_TYPE_UINT64, total_dropped,
      "received", G_TYPE_UINT64, total_received,
      "recovered", G_TYPE_UINT64, recovered,
      "permanently-lost", G_TYPE_UINT64, lost,
      "duplicates", G_TYPE_UINT64, duplicates,
      "retransmission-requests-sent", G_TYPE_UINT64, rtx_sent,
      "rtx-roundtrip-time", G_TYPE_UINT64, rtt,
      "session-stats", G_TYPE_VALUE_ARRAY, session_stats, NULL);
  g_value_array_free (session_stats);

  return ret;
}

/* Periodic clock callback: prints the current statistics, prefixed with the
 * element name. Always returns TRUE. */
static gboolean
gst_rist_src_dump_stats (GstClock * clock, GstClockTime time, GstClockID id,
    gpointer user_data)
{
  GstRistSrc *src = GST_RIST_SRC (user_data);
  GstStructure *snapshot;

  snapshot = gst_rist_src_create_stats (src);
  gst_println ("%s: %" GST_PTR_FORMAT, GST_OBJECT_NAME (src), snapshot);
  gst_structure_free (snapshot);

  return TRUE;
}

/* Schedules gst_rist_src_dump_stats on the system clock every
 * src->stats_interval milliseconds. Does nothing when the interval is 0. */
static void
gst_rist_src_enable_stats_interval (GstRistSrc * src)
{
  GstClock *sysclock;
  GstClockTime period, first_shot;

  if (src->stats_interval == 0)
    return;

  period = src->stats_interval * GST_MSECOND;
  sysclock = gst_system_clock_obtain ();
  first_shot = gst_clock_get_time (sysclock) + period;

  src->stats_cid = gst_clock_new_periodic_id (sysclock, first_shot, period);
  /* The callback data holds its own reference to the element, dropped by
   * the destroy notify. */
  gst_clock_id_wait_async (src->stats_cid, gst_rist_src_dump_stats,
      gst_object_ref (src), (GDestroyNotify) gst_object_unref);

  gst_object_unref (sysclock);
}

/* Cancels the periodic statistics dump, if one is scheduled, and forgets
 * its clock id. */
static void
gst_rist_src_disable_stats_interval (GstRistSrc * src)
{
  if (!src->stats_cid)
    return;

  gst_clock_id_unschedule (src->stats_cid);
  gst_clock_id_unref (src->stats_cid);
  src->stats_cid = NULL;
}

/* Removes the RTCP receive and send pad probes of every bond and resets the
 * stored probe ids. */
static void
gst_rist_src_stop (GstRistSrc * src)
{
  gint idx;

  for (idx = 0; idx < src->bonds->len; idx++) {
    RistReceiverBond *current = g_ptr_array_index (src->bonds, idx);
    GstPad *probed_pad;

    /* Probe learning the sender address (only installed for unicast) */
    if (current->rtcp_recv_probe) {
      probed_pad = gst_element_get_static_pad (current->rtcp_src, "src");
      gst_pad_remove_probe (probed_pad, current->rtcp_recv_probe);
      current->rtcp_recv_probe = 0;
      gst_object_unref (probed_pad);
    }

    /* Probe tagging outgoing RTCP with its destination */
    if (current->rtcp_send_probe) {
      probed_pad = gst_element_get_static_pad (current->rtcp_sink, "sink");
      gst_pad_remove_probe (probed_pad, current->rtcp_send_probe);
      current->rtcp_send_probe = 0;
      gst_object_unref (probed_pad);
    }
  }
}

/* GstElement::change_state override.
 *
 * The statistics timer is stopped before going from PAUSED to READY. After
 * the parent class handled the transition, the RTCP setup is started on
 * NULL to READY, the statistics timer on READY to PAUSED, and the probes are
 * removed on READY to NULL.
 *
 * Returns the parent's result, or GST_STATE_CHANGE_FAILURE when starting
 * the element failed. */
static GstStateChangeReturn
gst_rist_src_change_state (GstElement * element, GstStateChange transition)
{
  GstRistSrc *src = GST_RIST_SRC (element);
  GstStateChangeReturn ret;

  switch (transition) {
    case GST_STATE_CHANGE_PAUSED_TO_READY:
      gst_rist_src_disable_stats_interval (src);
      break;
    default:
      break;
  }

  ret = GST_ELEMENT_CLASS (gst_rist_src_parent_class)->change_state (element,
      transition);

  switch (transition) {
    case GST_STATE_CHANGE_NULL_TO_READY:
      /* Propagate start failures (missing plugin, RTCP socket setup) so the
       * state change actually fails instead of silently succeeding. */
      if (ret != GST_STATE_CHANGE_FAILURE &&
          gst_rist_src_start (src) == GST_STATE_CHANGE_FAILURE)
        ret = GST_STATE_CHANGE_FAILURE;
      break;
    case GST_STATE_CHANGE_READY_TO_PAUSED:
      /* Do not arm the timer for a transition that did not happen */
      if (ret != GST_STATE_CHANGE_FAILURE)
        gst_rist_src_enable_stats_interval (src);
      break;
    case GST_STATE_CHANGE_READY_TO_NULL:
      gst_rist_src_stop (src);
      break;
    default:
      break;
  }

  return ret;
}

897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/* Stores a new address, port and optional multicast interface in @bond and
 * forwards them to its UDP sources: the RTP source uses @port, the RTCP
 * source @port + 1.
 *
 * called with bonds lock */
static void
gst_rist_src_update_bond_address (GstRistSrc * src, RistReceiverBond * bond,
    const gchar * address, guint port, const gchar * multicast_iface)
{
  const guint rtcp_port = port + 1;

  /* Drop the previous strings before storing copies of the new ones */
  g_free (bond->address);
  g_free (bond->multicast_iface);

  bond->address = g_strdup (address);
  if (multicast_iface)
    bond->multicast_iface = g_strdup (multicast_iface);
  else
    bond->multicast_iface = NULL;
  bond->port = port;

  g_object_set (G_OBJECT (bond->rtp_src), "address", address, "port", port,
      "multicast-iface", bond->multicast_iface, NULL);
  g_object_set (G_OBJECT (bond->rtcp_src), "address", address,
      "port", rtcp_port, "multicast-iface", bond->multicast_iface, NULL);

  /* TODO add runtime support
   *  - add blocking the pad probe
   *  - update RTCP socket
   *  - cycle elements through NULL state
   */
}

/* Serializes the bonds as a comma separated list of "address:port" entries,
 * each optionally followed by "/multicast-iface". This is the same format
 * gst_rist_src_set_bonds() parses.
 *
 * Returns a newly allocated string, to be freed with g_free().
 *
 * called with bonds lock */
static gchar *
gst_rist_src_get_bonds (GstRistSrc * src)
{
  GString *bonds = g_string_new ("");
  guint i;

  for (i = 0; i < src->bonds->len; i++) {
    RistReceiverBond *bond = g_ptr_array_index (src->bonds, i);

    /* Entries are separated by ','. A ':' would be indistinguishable from
     * the address/port separator and could not be parsed back. */
    if (bonds->len > 0)
      g_string_append_c (bonds, ',');

    g_string_append_printf (bonds, "%s:%u", bond->address, bond->port);

    if (bond->multicast_iface)
      g_string_append_printf (bonds, "/%s", bond->multicast_iface);
  }

  return g_string_free (bonds, FALSE);
}

/* One parsed entry of a bonding address list, as filled in by
 * gst_rist_src_set_bonds(). The strings there point into the split input
 * tokens rather than being separate copies. */
struct RistAddress
{
  /* Address part of the entry */
  gchar *address;
  /* Interface given after '/', left NULL when the entry has none */
  char *multicast_iface;
  /* Parsed port number */
  guint port;
};

/* called with bonds lock */
static void
gst_rist_src_set_bonds (GstRistSrc * src, const gchar * bonds)
{
  GStrv tokens = NULL;
  struct RistAddress *addrs;
  gint i;

  if (bonds == NULL)
    goto missing_address;

  tokens = g_strsplit (bonds, ",", 0);
  if (tokens[0] == NULL)
    goto missing_address;

  addrs = g_new0 (struct RistAddress, g_strv_length (tokens));

  /* parse the address list */
  for (i = 0; tokens[i]; i++) {
    gchar *address = tokens[i];
    char *port_ptr, *iface_ptr, *endptr;
    guint port;

    port_ptr = g_utf8_strrchr (address, -1, ':');
    iface_ptr = g_utf8_strrchr (address, -1, '/');

    if (!port_ptr)
      goto bad_parameter;
    if (!g_ascii_isdigit (port_ptr[1]))
      goto bad_parameter;

    if (iface_ptr) {
      if (iface_ptr < port_ptr)
        goto bad_parameter;
      iface_ptr[0] = '\0';
    }

    port = strtol (port_ptr + 1, &endptr, 0);
    if (endptr[0] != '\0')
      goto bad_parameter;

    /* port must be a multiple of 2 between 2 and 65534 */
    if (port < 2 || (port & 1) || port > G_MAXUINT16)
      goto invalid_port;

    port_ptr[0] = '\0';
    addrs[i].port = port;
    addrs[i].address = g_strstrip (address);
    if (iface_ptr)
      addrs[i].multicast_iface = g_strstrip (iface_ptr + 1);
  }

  /* configure the bonds */
For faster browsing, not all history is shown. View entire blame