gstmultiudpsink.c 26.1 KB
Newer Older
1
/* GStreamer
2
 * Copyright (C) <2007> Wim Taymans <wim.taymans@gmail.com>
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * SECTION:element-multiupdsink
 * @see_also: udpsink, multifdsink
 *
 * <refsect2>
 * <para>
 * multiudpsink is a network sink that sends UDP packets to multiple
 * clients.
 * It can be combined with rtp payload encoders to implement RTP streaming.
 * </para>
 * </refsect2>
 */

33
34
35
36
37
38
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "gstudp-marshal.h"
#include "gstmultiudpsink.h"

39
40
41
42
43
44
45
46
#include <stdio.h>
#include <stdlib.h>
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <errno.h>
#include <string.h>

47
GST_DEBUG_CATEGORY_STATIC (multiudpsink_debug);
48
49
50
51
52
53
54
55
#define GST_CAT_DEFAULT (multiudpsink_debug)

static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
    GST_PAD_SINK,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

/* elementfactory information */
56
static const GstElementDetails gst_multiudpsink_details =
57
58
59
GST_ELEMENT_DETAILS ("UDP packet sender",
    "Sink/Network",
    "Send data over the network via UDP",
60
    "Wim Taymans <wim.taymans@gmail.com>");
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78

/* MultiUDPSink signals and args */
enum
{
  /* methods */
  SIGNAL_ADD,
  SIGNAL_REMOVE,
  SIGNAL_CLEAR,
  SIGNAL_GET_STATS,

  /* signals */
  SIGNAL_CLIENT_ADDED,
  SIGNAL_CLIENT_REMOVED,

  /* FILL ME */
  LAST_SIGNAL
};

79
80
81
#define DEFAULT_SOCKFD             -1
#define DEFAULT_CLOSEFD            TRUE
#define DEFAULT_SOCK               -1
82
#define DEFAULT_CLIENTS            NULL
83
84
85
/* FIXME, this should be disabled by default, we don't need to join a multicast
 * group for sending, if this socket is also used for receiving, it should
 * be configured in the element that does the receive. */
86
#define DEFAULT_AUTO_MULTICAST     TRUE
87
88
#define DEFAULT_TTL                64
#define DEFAULT_LOOP               TRUE
89
#define DEFAULT_QOS_DSCP           -1
90

91
92
93
enum
{
  PROP_0,
94
95
  PROP_BYTES_TO_SERVE,
  PROP_BYTES_SERVED,
96
97
  PROP_SOCKFD,
  PROP_CLOSEFD,
98
  PROP_SOCK,
99
  PROP_CLIENTS,
100
101
  PROP_AUTO_MULTICAST,
  PROP_TTL,
102
103
104
  PROP_LOOP,
  PROP_QOS_DSCP,
  PROP_LAST
105
106
};

107
108
109
110
111
#define CLOSE_IF_REQUESTED(udpctx)                                        \
  if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd))   \
    CLOSE_SOCKET(udpctx->sock);                                           \
  udpctx->sock = -1;

112
static void gst_multiudpsink_base_init (gpointer g_class);
113
static void gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass);
114
115
116
117
118
static void gst_multiudpsink_init (GstMultiUDPSink * udpsink);
static void gst_multiudpsink_finalize (GObject * object);

static GstFlowReturn gst_multiudpsink_render (GstBaseSink * sink,
    GstBuffer * buffer);
119
120
static GstStateChangeReturn gst_multiudpsink_change_state (GstElement *
    element, GstStateChange transition);
121
122
123
124
125
126

static void gst_multiudpsink_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_multiudpsink_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);

127
128
129
130
131
static void gst_multiudpsink_add_internal (GstMultiUDPSink * sink,
    const gchar * host, gint port, gboolean lock);
static void gst_multiudpsink_clear_internal (GstMultiUDPSink * sink,
    gboolean lock);

132
133
static void free_client (GstUDPClient * client);

134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
static GstElementClass *parent_class = NULL;

static guint gst_multiudpsink_signals[LAST_SIGNAL] = { 0 };

GType
gst_multiudpsink_get_type (void)
{
  static GType multiudpsink_type = 0;

  if (!multiudpsink_type) {
    static const GTypeInfo multiudpsink_info = {
      sizeof (GstMultiUDPSinkClass),
      gst_multiudpsink_base_init,
      NULL,
      (GClassInitFunc) gst_multiudpsink_class_init,
      NULL,
      NULL,
      sizeof (GstMultiUDPSink),
      0,
      (GInstanceInitFunc) gst_multiudpsink_init,
      NULL
    };

    multiudpsink_type =
158
        g_type_register_static (GST_TYPE_BASE_SINK, "GstMultiUDPSink",
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
        &multiudpsink_info, 0);
  }
  return multiudpsink_type;
}

static void
gst_multiudpsink_base_init (gpointer g_class)
{
  GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);

  gst_element_class_add_pad_template (element_class,
      gst_static_pad_template_get (&sink_template));

  gst_element_class_set_details (element_class, &gst_multiudpsink_details);
}

static void
176
gst_multiudpsink_class_init (GstMultiUDPSinkClass * klass)
177
178
179
180
181
182
183
184
185
{
  GObjectClass *gobject_class;
  GstElementClass *gstelement_class;
  GstBaseSinkClass *gstbasesink_class;

  gobject_class = (GObjectClass *) klass;
  gstelement_class = (GstElementClass *) klass;
  gstbasesink_class = (GstBaseSinkClass *) klass;

186
  parent_class = g_type_class_peek_parent (klass);
187
188
189
190
191

  gobject_class->set_property = gst_multiudpsink_set_property;
  gobject_class->get_property = gst_multiudpsink_get_property;
  gobject_class->finalize = gst_multiudpsink_finalize;

192
193
194
195
196
  /**
   * GstMultiUDPSink::add:
   * @gstmultiudpsink: the sink on which the signal is emitted
   * @host: the hostname/IP address of the client to add
   * @port: the port of the client to add
197
198
199
   *
   * Add a client with destination @host and @port to the list of
   * clients.
200
   */
201
202
203
204
205
  gst_multiudpsink_signals[SIGNAL_ADD] =
      g_signal_new ("add", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
      G_STRUCT_OFFSET (GstMultiUDPSinkClass, add),
      NULL, NULL, gst_udp_marshal_VOID__STRING_INT, G_TYPE_NONE, 2,
      G_TYPE_STRING, G_TYPE_INT);
206
207
208
209
210
  /**
   * GstMultiUDPSink::remove:
   * @gstmultiudpsink: the sink on which the signal is emitted
   * @host: the hostname/IP address of the client to remove
   * @port: the port of the client to remove
211
212
213
   *
   * Remove the client with destination @host and @port from the list of
   * clients.
214
   */
215
216
217
218
219
  gst_multiudpsink_signals[SIGNAL_REMOVE] =
      g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
      G_STRUCT_OFFSET (GstMultiUDPSinkClass, remove),
      NULL, NULL, gst_udp_marshal_VOID__STRING_INT, G_TYPE_NONE, 2,
      G_TYPE_STRING, G_TYPE_INT);
220
221
222
223
224
225
  /**
   * GstMultiUDPSink::clear:
   * @gstmultiudpsink: the sink on which the signal is emitted
   *
   * Clear the list of clients.
   */
226
227
228
229
  gst_multiudpsink_signals[SIGNAL_CLEAR] =
      g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
      G_STRUCT_OFFSET (GstMultiUDPSinkClass, clear),
      NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
230
231
232
233
234
235
  /**
   * GstMultiUDPSink::get-stats:
   * @gstmultiudpsink: the sink on which the signal is emitted
   * @host: the hostname/IP address of the client to get stats on
   * @port: the port of the client to get stats on
   *
236
237
238
   * Get the statistics of the client with destination @host and @port.
   *
   * Returns: a GValueArray of uint64: bytes_sent, packets_sent,
239
240
   *           connect_time (in epoch seconds), disconnect_time (in epoch seconds)
   */
241
242
243
244
245
  gst_multiudpsink_signals[SIGNAL_GET_STATS] =
      g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST,
      G_STRUCT_OFFSET (GstMultiUDPSinkClass, get_stats),
      NULL, NULL, gst_udp_marshal_BOXED__STRING_INT, G_TYPE_VALUE_ARRAY, 2,
      G_TYPE_STRING, G_TYPE_INT);
246
247
248
249
250
  /**
   * GstMultiUDPSink::client-added:
   * @gstmultiudpsink: the sink emitting the signal
   * @host: the hostname/IP address of the added client
   * @port: the port of the added client
251
252
253
   *
   * Signal emited when a new client is added to the list of
   * clients.
254
   */
255
256
257
258
259
  gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED] =
      g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass, client_added),
      NULL, NULL, gst_udp_marshal_VOID__STRING_INT, G_TYPE_NONE, 2,
      G_TYPE_STRING, G_TYPE_INT);
260
261
262
263
264
  /**
   * GstMultiUDPSink::client-removed:
   * @gstmultiudpsink: the sink emitting the signal
   * @host: the hostname/IP address of the removed client
   * @port: the port of the removed client
265
266
267
   *
   * Signal emited when a client is removed from the list of
   * clients.
268
   */
269
270
271
272
273
274
  gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED] =
      g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass),
      G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiUDPSinkClass,
          client_removed), NULL, NULL, gst_udp_marshal_VOID__STRING_INT,
      G_TYPE_NONE, 2, G_TYPE_STRING, G_TYPE_INT);

275
276
277
278
279
280
281
282
  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_TO_SERVE,
      g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve",
          "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0,
          G_PARAM_READABLE));
  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BYTES_SERVED,
      g_param_spec_uint64 ("bytes-served", "Bytes served",
          "Total number of bytes send to all clients", 0, G_MAXUINT64, 0,
          G_PARAM_READABLE));
283
284
285
286
287
288
289
290
291
292
293
294
  g_object_class_install_property (gobject_class, PROP_SOCKFD,
      g_param_spec_int ("sockfd", "Socket Handle",
          "Socket to use for UDP sending. (-1 == allocate)",
          -1, G_MAXINT, DEFAULT_SOCKFD, G_PARAM_READWRITE));
  g_object_class_install_property (gobject_class, PROP_CLOSEFD,
      g_param_spec_boolean ("closefd", "Close sockfd",
          "Close sockfd if passed as property on state change",
          DEFAULT_CLOSEFD, G_PARAM_READWRITE));
  g_object_class_install_property (gobject_class, PROP_SOCK,
      g_param_spec_int ("sock", "Socket Handle",
          "Socket currently in use for UDP sending. (-1 == no socket)",
          -1, G_MAXINT, DEFAULT_SOCK, G_PARAM_READABLE));
295
296
297
298
  g_object_class_install_property (gobject_class, PROP_CLIENTS,
      g_param_spec_string ("clients", "Clients",
          "A comma separated list of host:port pairs with destinations",
          DEFAULT_CLIENTS, G_PARAM_READWRITE));
299
300
301
302
303
  g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
      g_param_spec_boolean ("auto-multicast",
          "Automatically join/leave multicast groups",
          "Automatically join/leave the multicast groups, FALSE means user"
          " has to do it himself", DEFAULT_AUTO_MULTICAST, G_PARAM_READWRITE));
304
305
306
307
308
309
310
311
  g_object_class_install_property (gobject_class, PROP_TTL,
      g_param_spec_int ("ttl", "Multicast TTL",
          "Used for setting the multicast TTL parameter",
          0, 255, DEFAULT_TTL, G_PARAM_READWRITE));
  g_object_class_install_property (gobject_class, PROP_LOOP,
      g_param_spec_boolean ("loop", "Multicast Loopback",
          "Used for setting the multicast loop parameter. TRUE = enable,"
          " FALSE = disable", DEFAULT_LOOP, G_PARAM_READWRITE));
312
  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_QOS_DSCP,
313
314
315
      g_param_spec_int ("qos-dscp", "QoS diff srv code point",
          "Quality of Service, differentiated services code point (-1 default)",
          -1, 63, DEFAULT_QOS_DSCP, G_PARAM_READWRITE));
316

317
318
319
  gstelement_class->change_state = gst_multiudpsink_change_state;

  gstbasesink_class->render = gst_multiudpsink_render;
320
321
322
323
  klass->add = gst_multiudpsink_add;
  klass->remove = gst_multiudpsink_remove;
  klass->clear = gst_multiudpsink_clear;
  klass->get_stats = gst_multiudpsink_get_stats;
324
325
326
327
328
329
330
331

  GST_DEBUG_CATEGORY_INIT (multiudpsink_debug, "multiudpsink", 0, "UDP sink");
}


static void
gst_multiudpsink_init (GstMultiUDPSink * sink)
{
332
333
  WSA_STARTUP (sink);

334
  sink->client_lock = g_mutex_new ();
335
336
337
338
  sink->sock = -1;
  sink->sockfd = DEFAULT_SOCKFD;
  sink->closefd = DEFAULT_CLOSEFD;
  sink->externalfd = (sink->sockfd != -1);
339
  sink->auto_multicast = DEFAULT_AUTO_MULTICAST;
340
341
  sink->ttl = DEFAULT_TTL;
  sink->loop = DEFAULT_LOOP;
342
  sink->qos_dscp = DEFAULT_QOS_DSCP;
343
344
345
346
347
348
349
350
351
}

static void
gst_multiudpsink_finalize (GObject * object)
{
  GstMultiUDPSink *sink;

  sink = GST_MULTIUDPSINK (object);

352
353
354
  g_list_foreach (sink->clients, (GFunc) free_client, NULL);
  g_list_free (sink->clients);

355
356
  g_mutex_free (sink->client_lock);

357
358
  WSA_CLEANUP (object);

359
360
361
362
363
364
365
  G_OBJECT_CLASS (parent_class)->finalize (object);
}

static GstFlowReturn
gst_multiudpsink_render (GstBaseSink * bsink, GstBuffer * buffer)
{
  GstMultiUDPSink *sink;
366
  gint ret, size, num = 0;
367
368
369
370
371
372
373
374
  guint8 *data;
  GList *clients;

  sink = GST_MULTIUDPSINK (bsink);

  size = GST_BUFFER_SIZE (buffer);
  data = GST_BUFFER_DATA (buffer);

375
376
  sink->bytes_to_serve += size;

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
377
  /* grab lock while iterating and sending to clients, this should be
378
   * fast as UDP never blocks */
379
  g_mutex_lock (sink->client_lock);
380
381
  GST_LOG_OBJECT (bsink, "about to send %d bytes", size);

382
383
384
385
  for (clients = sink->clients; clients; clients = g_list_next (clients)) {
    GstUDPClient *client;

    client = (GstUDPClient *) clients->data;
386
    num++;
387
    GST_LOG_OBJECT (sink, "sending %d bytes to client %p", size, client);
388
389
390
391
392
393

    while (TRUE) {
      ret = sendto (*client->sock, data, size, 0,
          (struct sockaddr *) &client->theiraddr, sizeof (client->theiraddr));

      if (ret < 0) {
394
395
396
397
        /* we get a non-posix EPERM on Linux when a firewall rule blocks this
         * destination. We will simply ignore this. */
        if (errno == EPERM)
          break;
398
399
400
        if (errno != EINTR && errno != EAGAIN) {
          goto send_error;
        }
401
402
      } else {
        client->bytes_sent += ret;
403
        client->packets_sent++;
404
        sink->bytes_served += ret;
405
        break;
406
      }
407
408
409
410
    }
  }
  g_mutex_unlock (sink->client_lock);

411
  GST_LOG_OBJECT (sink, "sent %d bytes to %d clients", size, num);
412
413
414

  return GST_FLOW_OK;

415
  /* ERRORS */
416
417
send_error:
  {
418
    /* if sendto returns an error, something is seriously wrong */
419
    g_mutex_unlock (sink->client_lock);
420
421
422
    GST_DEBUG_OBJECT (sink, "got send error %d: %s", errno, g_strerror (errno));
    GST_ELEMENT_ERROR (sink, STREAM, FAILED, (NULL),
        ("Got send error %d: %s", errno, g_strerror (errno)));
423
424
425
426
    return GST_FLOW_ERROR;
  }
}

427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
static void
gst_multiudpsink_set_clients_string (GstMultiUDPSink * sink,
    const gchar * string)
{
  gchar **clients;
  gint i;

  clients = g_strsplit (string, ",", 0);

  g_mutex_lock (sink->client_lock);
  /* clear all existing clients */
  gst_multiudpsink_clear_internal (sink, FALSE);
  for (i = 0; clients[i]; i++) {
    gchar *host, *p;
    gint port = 0;

    host = clients[i];
    p = strstr (clients[i], ":");
    if (p != NULL) {
      *p = '\0';
      port = atoi (p + 1);
    }
    if (port != 0)
      gst_multiudpsink_add_internal (sink, host, port, FALSE);
  }
  g_mutex_unlock (sink->client_lock);

  g_strfreev (clients);
}

static gchar *
gst_multiudpsink_get_clients_string (GstMultiUDPSink * sink)
{
  GString *str;
  GList *clients;

  str = g_string_new ("");

  g_mutex_lock (sink->client_lock);
  clients = sink->clients;
  while (clients) {
    GstUDPClient *client;

    client = (GstUDPClient *) clients->data;

    clients = g_list_next (clients);

    g_string_append_printf (str, "%s:%d%s", client->host, client->port,
        (clients ? "," : ""));
  }
  g_mutex_unlock (sink->client_lock);

  return g_string_free (str, FALSE);
}

482
483
484
485
486
static void
gst_multiudpsink_setup_qos_dscp (GstMultiUDPSink * sink)
{
  gint tos;

487
488
489
490
  /* don't touch on -1 */
  if (sink->qos_dscp < 0)
    return;

491
492
493
494
495
496
497
498
  if (sink->sock < 0)
    return;

  GST_DEBUG_OBJECT (sink, "setting TOS to %d", sink->qos_dscp);

  /* Extract and shift 6 bits of DSFIELD */
  tos = (sink->qos_dscp & 0x3f) << 2;

499
  if (setsockopt (sink->sock, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)) < 0) {
500
501
502
    GST_ERROR_OBJECT (sink, "could not set TOS: %s", g_strerror (errno));
  }
#ifdef IPV6_TCLASS
503
504
  if (setsockopt (sink->sock, IPPROTO_IPV6, IPV6_TCLASS, &tos,
          sizeof (tos)) < 0) {
505
506
507
508
509
    GST_ERROR_OBJECT (sink, "could not set TCLASS: %s", g_strerror (errno));
  }
#endif
}

510
511
512
513
514
515
516
517
518
static void
gst_multiudpsink_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec)
{
  GstMultiUDPSink *udpsink;

  udpsink = GST_MULTIUDPSINK (object);

  switch (prop_id) {
519
520
521
522
523
524
525
    case PROP_SOCKFD:
      udpsink->sockfd = g_value_get_int (value);
      GST_DEBUG_OBJECT (udpsink, "setting SOCKFD to %d", udpsink->sockfd);
      break;
    case PROP_CLOSEFD:
      udpsink->closefd = g_value_get_boolean (value);
      break;
526
527
528
    case PROP_CLIENTS:
      gst_multiudpsink_set_clients_string (udpsink, g_value_get_string (value));
      break;
529
530
531
    case PROP_AUTO_MULTICAST:
      udpsink->auto_multicast = g_value_get_boolean (value);
      break;
532
533
534
535
536
537
    case PROP_TTL:
      udpsink->ttl = g_value_get_int (value);
      break;
    case PROP_LOOP:
      udpsink->loop = g_value_get_boolean (value);
      break;
538
539
540
541
    case PROP_QOS_DSCP:
      udpsink->qos_dscp = g_value_get_uint (value);
      gst_multiudpsink_setup_qos_dscp (udpsink);
      break;
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

static void
gst_multiudpsink_get_property (GObject * object, guint prop_id, GValue * value,
    GParamSpec * pspec)
{
  GstMultiUDPSink *udpsink;

  udpsink = GST_MULTIUDPSINK (object);

  switch (prop_id) {
557
558
559
560
561
562
    case PROP_BYTES_TO_SERVE:
      g_value_set_uint64 (value, udpsink->bytes_to_serve);
      break;
    case PROP_BYTES_SERVED:
      g_value_set_uint64 (value, udpsink->bytes_served);
      break;
563
564
565
566
567
568
569
570
571
    case PROP_SOCKFD:
      g_value_set_int (value, udpsink->sockfd);
      break;
    case PROP_CLOSEFD:
      g_value_set_boolean (value, udpsink->closefd);
      break;
    case PROP_SOCK:
      g_value_set_int (value, udpsink->sock);
      break;
572
573
574
575
    case PROP_CLIENTS:
      g_value_take_string (value,
          gst_multiudpsink_get_clients_string (udpsink));
      break;
576
577
578
    case PROP_AUTO_MULTICAST:
      g_value_set_boolean (value, udpsink->auto_multicast);
      break;
579
580
581
582
583
584
    case PROP_TTL:
      g_value_set_int (value, udpsink->ttl);
      break;
    case PROP_LOOP:
      g_value_set_boolean (value, udpsink->loop);
      break;
585
586
587
    case PROP_QOS_DSCP:
      g_value_set_int (value, udpsink->qos_dscp);
      break;
588
589
590
591
592
593
594
595
596
597
598
599
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

/* create a socket for sending to remote machine */
static gboolean
gst_multiudpsink_init_send (GstMultiUDPSink * sink)
{
  guint bc_val;
  gint ret;
600
601
  GList *clients;
  GstUDPClient *client;
602

603
  if (sink->sockfd == -1) {
604
    /* create sender socket try IP6, fall back to IP4 */
605
606
607
    if ((sink->sock = socket (AF_INET6, SOCK_DGRAM, 0)) == -1)
      if ((sink->sock = socket (AF_INET, SOCK_DGRAM, 0)) == -1)
        goto no_socket;
608
609
610
611
612
613
614

    sink->externalfd = FALSE;
  } else {
    /* we use the configured socket */
    sink->sock = sink->sockfd;
    sink->externalfd = TRUE;
  }
615
616
617
618
619
620
621

  bc_val = 1;
  if ((ret =
          setsockopt (sink->sock, SOL_SOCKET, SO_BROADCAST, &bc_val,
              sizeof (bc_val))) < 0)
    goto no_broadcast;

622
623
624
  sink->bytes_to_serve = 0;
  sink->bytes_served = 0;

625
  gst_udp_set_loop_ttl (sink->sock, sink->loop, sink->ttl);
626
  gst_multiudpsink_setup_qos_dscp (sink);
627

628
  /* look for multicast clients and join multicast groups appropriately */
629
630
  for (clients = sink->clients; clients; clients = g_list_next (clients)) {
    client = (GstUDPClient *) clients->data;
631
    if (sink->auto_multicast && gst_udp_is_multicast (&client->theiraddr))
632
      gst_udp_join_group (*(client->sock), &client->theiraddr);
633
634
  }
  return TRUE;
635

636
637
638
  /* ERRORS */
no_socket:
  {
639
640
    GST_ELEMENT_ERROR (sink, RESOURCE, FAILED, (NULL),
        ("Could not create socket (%d): %s", errno, g_strerror (errno)));
641
642
643
644
    return FALSE;
  }
no_broadcast:
  {
645
    CLOSE_IF_REQUESTED (sink);
646
647
648
    GST_ELEMENT_ERROR (sink, RESOURCE, SETTINGS, (NULL),
        ("Could not set broadcast socket option (%d): %s", errno,
            g_strerror (errno)));
649
650
651
652
653
654
655
    return FALSE;
  }
}

static void
gst_multiudpsink_close (GstMultiUDPSink * sink)
{
656
  CLOSE_IF_REQUESTED (sink);
657
658
}

659
660
661
static void
gst_multiudpsink_add_internal (GstMultiUDPSink * sink, const gchar * host,
    gint port, gboolean lock)
662
663
{
  GstUDPClient *client;
664
  GTimeVal now;
665

666
  GST_DEBUG_OBJECT (sink, "adding client on host %s, port %d", host, port);
667
668
669
670
671
  client = g_new0 (GstUDPClient, 1);
  client->host = g_strdup (host);
  client->port = port;
  client->sock = &sink->sock;

672
673
  if (gst_udp_get_addr (host, port, &client->theiraddr) < 0)
    goto getaddrinfo_error;
674

675
676
677
  g_get_current_time (&now);
  client->connect_time = GST_TIMEVAL_TO_TIME (now);

678
679
680
681
682
683
  if (*client->sock > 0) {
    /* check if its a multicast address */
    if (gst_udp_is_multicast (&client->theiraddr)) {
      GST_DEBUG_OBJECT (sink, "multicast address detected");
      if (sink->auto_multicast) {
        GST_DEBUG_OBJECT (sink, "joining multicast group");
684
        gst_udp_join_group (*(client->sock), &client->theiraddr);
685
686
687
688
      }
    } else {
      GST_DEBUG_OBJECT (sink, "normal address detected");
    }
689
690
  }

691
692
  if (lock)
    g_mutex_lock (sink->client_lock);
693
  sink->clients = g_list_prepend (sink->clients, client);
694
695
  if (lock)
    g_mutex_unlock (sink->client_lock);
696

697
698
699
  g_signal_emit (G_OBJECT (sink),
      gst_multiudpsink_signals[SIGNAL_CLIENT_ADDED], 0, host, port);

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
700
  GST_DEBUG_OBJECT (sink, "added client on host %s, port %d", host, port);
701
702
703
  return;

  /* ERRORS */
704
getaddrinfo_error:
705
  {
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
706
707
    GST_DEBUG_OBJECT (sink, "did not add client on host %s, port %d", host,
        port);
708
    GST_WARNING_OBJECT (sink, "getaddrinfo lookup error?");
709
710
711
712
713
714
    g_free (client->host);
    g_free (client);
    return;
  }
}

715
716
717
718
719
720
void
gst_multiudpsink_add (GstMultiUDPSink * sink, const gchar * host, gint port)
{
  gst_multiudpsink_add_internal (sink, host, port, TRUE);
}

721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
static gint
client_compare (GstUDPClient * a, GstUDPClient * b)
{
  if ((a->port == b->port) && (strcmp (a->host, b->host) == 0))
    return 0;

  return 1;
}

static void
free_client (GstUDPClient * client)
{
  g_free (client->host);
  g_free (client);
}

737
738
739
void
gst_multiudpsink_remove (GstMultiUDPSink * sink, const gchar * host, gint port)
{
740
741
  GList *find;
  GstUDPClient udpclient;
742
743
  GstUDPClient *client;
  GTimeVal now;
744
745
746
747
748
749
750

  udpclient.host = (gchar *) host;
  udpclient.port = port;

  g_mutex_lock (sink->client_lock);
  find = g_list_find_custom (sink->clients, &udpclient,
      (GCompareFunc) client_compare);
751
752
  if (!find)
    goto not_found;
753

754
  GST_DEBUG_OBJECT (sink, "remove client with host %s, port %d", host, port);
755

756
  client = (GstUDPClient *) find->data;
757

758
759
  g_get_current_time (&now);
  client->disconnect_time = GST_TIMEVAL_TO_TIME (now);
760

761
762
  if (*(client->sock) != -1 && sink->auto_multicast
      && gst_udp_is_multicast (&client->theiraddr))
763
    gst_udp_leave_group (*(client->sock), &client->theiraddr);
764

765
766
767
768
769
  /* Unlock to emit signal before we delete the actual client */
  g_mutex_unlock (sink->client_lock);
  g_signal_emit (G_OBJECT (sink),
      gst_multiudpsink_signals[SIGNAL_CLIENT_REMOVED], 0, host, port);
  g_mutex_lock (sink->client_lock);
770

771
772
773
  sink->clients = g_list_delete_link (sink->clients, find);

  free_client (client);
774
  g_mutex_unlock (sink->client_lock);
775
776
777
778
779
780
781
782
783
784
785

  return;

  /* ERRORS */
not_found:
  {
    g_mutex_unlock (sink->client_lock);
    GST_WARNING_OBJECT (sink, "client at host %s, port %d not found",
        host, port);
    return;
  }
786
787
}

788
789
static void
gst_multiudpsink_clear_internal (GstMultiUDPSink * sink, gboolean lock)
790
{
791
  GST_DEBUG_OBJECT (sink, "clearing");
792
793
  /* we only need to remove the client structure, there is no additional
   * socket or anything to free for UDP */
794
795
  if (lock)
    g_mutex_lock (sink->client_lock);
796
797
798
  g_list_foreach (sink->clients, (GFunc) free_client, sink);
  g_list_free (sink->clients);
  sink->clients = NULL;
799
800
801
802
803
804
805
806
  if (lock)
    g_mutex_unlock (sink->client_lock);
}

void
gst_multiudpsink_clear (GstMultiUDPSink * sink)
{
  gst_multiudpsink_clear_internal (sink, TRUE);
807
808
809
810
811
812
}

GValueArray *
gst_multiudpsink_get_stats (GstMultiUDPSink * sink, const gchar * host,
    gint port)
{
813
814
815
816
  GstUDPClient *client;
  GValueArray *result = NULL;
  GstUDPClient udpclient;
  GList *find;
817
  GValue value = { 0 };
818
819
820
821
822
823
824
825

  udpclient.host = (gchar *) host;
  udpclient.port = port;

  g_mutex_lock (sink->client_lock);

  find = g_list_find_custom (sink->clients, &udpclient,
      (GCompareFunc) client_compare);
826
827
  if (!find)
    goto not_found;
828

829
  GST_DEBUG_OBJECT (sink, "stats for client with host %s, port %d", host, port);
830

831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
  client = (GstUDPClient *) find->data;

  /* Result is a value array of (bytes_sent, packets_sent,
   * connect_time, disconnect_time), all as uint64 */
  result = g_value_array_new (4);

  g_value_init (&value, G_TYPE_UINT64);
  g_value_set_uint64 (&value, client->bytes_sent);
  result = g_value_array_append (result, &value);
  g_value_unset (&value);

  g_value_init (&value, G_TYPE_UINT64);
  g_value_set_uint64 (&value, client->packets_sent);
  result = g_value_array_append (result, &value);
  g_value_unset (&value);

  g_value_init (&value, G_TYPE_UINT64);
  g_value_set_uint64 (&value, client->connect_time);
  result = g_value_array_append (result, &value);
  g_value_unset (&value);

  g_value_init (&value, G_TYPE_UINT64);
  g_value_set_uint64 (&value, client->disconnect_time);
  result = g_value_array_append (result, &value);
  g_value_unset (&value);

  g_mutex_unlock (sink->client_lock);
858
859

  return result;
860
861
862
863
864
865
866
867
868
869
870

  /* ERRORS */
not_found:
  {
    g_mutex_unlock (sink->client_lock);
    GST_WARNING_OBJECT (sink, "client with host %s, port %d not found",
        host, port);
    /* Apparently (see comment in gstmultifdsink.c) returning NULL from here may
     * confuse/break python bindings */
    return g_value_array_new (0);
  }
871
872
}

873
874
static GstStateChangeReturn
gst_multiudpsink_change_state (GstElement * element, GstStateChange transition)
875
{
876
  GstStateChangeReturn ret;
877
878
879
880
881
  GstMultiUDPSink *sink;

  sink = GST_MULTIUDPSINK (element);

  switch (transition) {
882
    case GST_STATE_CHANGE_READY_TO_PAUSED:
883
884
885
886
887
888
889
      if (!gst_multiudpsink_init_send (sink))
        goto no_init;
      break;
    default:
      break;
  }

890
  ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
891
892

  switch (transition) {
893
    case GST_STATE_CHANGE_PAUSED_TO_READY:
894
895
896
897
898
899
900
901
902
903
      gst_multiudpsink_close (sink);
      break;
    default:
      break;
  }
  return ret;

  /* ERRORS */
no_init:
  {
904
    /* _init_send() posted specific error already */
905
    return GST_STATE_CHANGE_FAILURE;
906
907
  }
}