gstudpsrc.c 29 KB
Newer Older
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
1
/* GStreamer
2
 * Copyright (C) <2005> Wim Taymans <wim@fluendo.com>
3
 * Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

Wim Taymans's avatar
Wim Taymans committed
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
 * SECTION:element-udpsrc
 * @see_also: udpsink, multifdsink
 *
 * <refsect2>
 * <para>
 * udpsrc is a network source that reads UDP packets from the network.
 * It can be combined with RTP depayloaders to implement RTP streaming.
 * </para>
 * <title>Examples</title>
 * <para>
 * Here is a simple pipeline to read from the default port and dump the udp packets.
 * <programlisting>
 * gst-launch -v udpsrc ! fakesink dump=1
 * </programlisting>
 * To actually generate udp packets on the default port one can use the
 * udpsink element. When running the following pipeline in another terminal, the
 * above mentioned pipeline should dump data packets to the console.
 * <programlisting>
 * gst-launch -v audiotestsrc ! udpsink
 * </programlisting>
 * </para>
 * <para>
 * The udpsrc element supports automatic port allocation by setting the
 * "port" property to 0. the following pipeline reads UDP from a free port.
 * <programlisting>
 * gst-launch -v udpsrc port=0 ! fakesink
 * </programlisting>
49
50
 * After setting the udpsrc to PAUSED, the allocated port can be obtained by
 * reading the port property.
Wim Taymans's avatar
Wim Taymans committed
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
 * </para>
 * <para>
 * udpsrc can read from multicast groups by setting the multicast_group property
 * to the IP address of the multicast group.
 * </para>
 * <para>
 * Alternatively one can provide a custom socket to udpsrc with the "sockfd" property,
 * udpsrc will then not allocate a socket itself but use the provided one.
 * </para>
 * <para>
 * The "caps" property is mainly used to give a type to the UDP packet so that they
 * can be autoplugged in GStreamer pipelines. This is very usefull for RTP 
 * implementations where the contents of the UDP packets is transfered out-of-bounds
 * using SDP or other means. 
 * </para>
 * <para>
67
68
69
70
71
72
73
 * The "buffer" property is used to change the default kernel buffer sizes used for
 * receiving packets. The buffer size may be increased for high-volume connections,
 * or may be decreased to limit the possible backlog of incoming data.
 * The system places an absolute limit on these values, on Linux, for example, the
 * default buffer size is typically 50K and can be increased to maximally 100K.
 * </para>
 * <para>
74
 * The "skip-first-bytes" property is used to strip off an arbitrary number of
Wim Taymans's avatar
Wim Taymans committed
75
76
 * bytes from the start of the raw udp packet and can be used to strip off
 * proprietary header, for example. 
77
78
 * </para>
 * <para>
Wim Taymans's avatar
Wim Taymans committed
79
80
 * The udpsrc is always a live source. It does however not provide a GstClock, this
 * is left for upstream elements such as an RTP session manager or demuxer (such
81
82
 * as an MPEG demuxer). As with all live sources, the captured buffers will have
 * their timestamp set to the current running time of the pipeline.
Wim Taymans's avatar
Wim Taymans committed
83
84
85
86
87
 * </para>
 * <para>
 * udpsrc implements a GstURIHandler interface that handles udp://host:port type
 * URIs.
 * </para>
Wim Taymans's avatar
Wim Taymans committed
88
 * <para>
89
90
 * If the <link linkend="GstUDPSrc--timeout">timeout property</link> is set to a
 * value bigger than 0, udpsrc will generate an element message named
91
 * <classname>&quot;GstUDPSrcTimeout&quot;</classname>
92
93
94
95
96
97
 * if no data was recieved in the given timeout.
 * The message's structure contains one field:
 * <itemizedlist>
 * <listitem>
 *   <para>
 *   #guint64
98
99
 *   <classname>&quot;timeout&quot;</classname>: the timeout in microseconds that
 *   expired when waiting for data.
100
101
102
103
104
105
106
 *   </para>
 * </listitem>
 * </itemizedlist>
 * The message is typically used to detect that no UDP arrives in the receiver
 * because it is blocked by a firewall.
 * </para>
 * <para>
107
108
109
110
111
112
113
 * A custom file descriptor can be configured with the 
 * <link linkend="GstUDPSrc--sockfd">sockfd property</link>. The socket will be
 * closed when setting the element to READY by default. This behaviour can be
 * overriden with the <link linkend="GstUDPSrc--closefd">closefd property</link>,
 * in which case the application is responsible for closing the file descriptor.
 * </para>
 * <para>
114
 * Last reviewed on 2007-09-20 (0.10.7)
Wim Taymans's avatar
Wim Taymans committed
115
 * </para>
Wim Taymans's avatar
Wim Taymans committed
116
117
 * </refsect2>
 */
118
119
120
121
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
122
#include "gstudpsrc.h"
123
#include <unistd.h>
124
#include <stdlib.h>
125

126
#include <gst/netbuffer/gstnetbuffer.h>
127
128
129
#ifdef G_OS_WIN32
typedef int socklen_t;
#endif
130
131
132
133
134

#ifdef HAVE_FIONREAD_IN_SYS_FILIO
#include <sys/filio.h>
#endif

135
GST_DEBUG_CATEGORY_STATIC (udpsrc_debug);
136
137
138
139
140
141
142
143
144
145
#define GST_CAT_DEFAULT (udpsrc_debug)

/* the select call is also performed on the control sockets, that way
 * we can send special commands to unblock or restart the select call */
#define CONTROL_RESTART        'R'      /* restart the select call */
#define CONTROL_STOP           'S'      /* stop the select call */
#define CONTROL_SOCKETS(src)   src->control_sock
#define WRITE_SOCKET(src)      src->control_sock[1]
#define READ_SOCKET(src)       src->control_sock[0]

146
147
148
149
#define SEND_COMMAND(src, command, res)          \
G_STMT_START {                                   \
  unsigned char c; c = command;                  \
  res = write (WRITE_SOCKET(src), &c, 1);        \
150
151
} G_STMT_END

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
152
153
#define READ_COMMAND(src, command, res)         \
G_STMT_START {                                  \
154
155
  res = read(READ_SOCKET(src), &command, 1);    \
} G_STMT_END
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
156

157
158
159
160
161
#define CLOSE_IF_REQUESTED(udpctx)                                        \
  if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd))   \
    CLOSE_SOCKET(udpctx->sock);                                           \
  udpctx->sock = -1;

Wim Taymans's avatar
Wim Taymans committed
162
163
164
165
166
static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

167
static const GstElementDetails gst_udpsrc_details =
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
168
169
170
GST_ELEMENT_DETAILS ("UDP packet receiver",
    "Source/Network",
    "Receive data over the network via UDP",
171
172
    "Wim Taymans <wim@fluendo.com>\n"
    "Thijs Vermeir <thijs.vermeir@barco.com>");
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
173

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
174
175
#define UDP_DEFAULT_PORT                4951
#define UDP_DEFAULT_MULTICAST_GROUP     "0.0.0.0"
Wim Taymans's avatar
Wim Taymans committed
176
#define UDP_DEFAULT_URI                 "udp://"UDP_DEFAULT_MULTICAST_GROUP":"G_STRINGIFY(UDP_DEFAULT_PORT)
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
177
#define UDP_DEFAULT_CAPS                NULL
Wim Taymans's avatar
Wim Taymans committed
178
#define UDP_DEFAULT_SOCKFD              -1
179
180
#define UDP_DEFAULT_BUFFER_SIZE		0
#define UDP_DEFAULT_TIMEOUT             0
181
#define UDP_DEFAULT_SKIP_FIRST_BYTES	0
182
#define UDP_DEFAULT_CLOSEFD            TRUE
183
#define UDP_DEFAULT_SOCK                -1
184

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
185
186
enum
{
187
188
189
190
  PROP_0,
  PROP_PORT,
  PROP_MULTICAST_GROUP,
  PROP_URI,
191
  PROP_CAPS,
192
  PROP_SOCKFD,
193
  PROP_BUFFER_SIZE,
194
  PROP_TIMEOUT,
195
  PROP_SKIP_FIRST_BYTES,
196
197
  PROP_CLOSEFD,
  PROP_SOCK
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
198
199
};

200
201
static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);

202
static GstCaps *gst_udpsrc_getcaps (GstBaseSrc * src);
203
204
205
static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf);
static gboolean gst_udpsrc_start (GstBaseSrc * bsrc);
static gboolean gst_udpsrc_stop (GstBaseSrc * bsrc);
206
static gboolean gst_udpsrc_unlock (GstBaseSrc * bsrc);
207
static gboolean gst_udpsrc_unlock_stop (GstBaseSrc * bsrc);
208
static void gst_udpsrc_finalize (GObject * object);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
209

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
210
211
212
213
static void gst_udpsrc_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_udpsrc_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
214

215
216
217
218
219
220
221
222
static void
_do_init (GType type)
{
  static const GInterfaceInfo urihandler_info = {
    gst_udpsrc_uri_handler_init,
    NULL,
    NULL
  };
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
223

224
  g_type_add_interface_static (type, GST_TYPE_URI_HANDLER, &urihandler_info);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
225

226
  GST_DEBUG_CATEGORY_INIT (udpsrc_debug, "udpsrc", 0, "UDP src");
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
227
228
}

229
230
231
GST_BOILERPLATE_FULL (GstUDPSrc, gst_udpsrc, GstPushSrc, GST_TYPE_PUSH_SRC,
    _do_init);

Benjamin Otte's avatar
Benjamin Otte committed
232
233
234
235
236
static void
gst_udpsrc_base_init (gpointer g_class)
{
  GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);

Wim Taymans's avatar
Wim Taymans committed
237
238
239
  gst_element_class_add_pad_template (element_class,
      gst_static_pad_template_get (&src_template));

Benjamin Otte's avatar
Benjamin Otte committed
240
241
242
  gst_element_class_set_details (element_class, &gst_udpsrc_details);
}

Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
243
static void
244
gst_udpsrc_class_init (GstUDPSrcClass * klass)
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
245
246
{
  GObjectClass *gobject_class;
247
248
  GstBaseSrcClass *gstbasesrc_class;
  GstPushSrcClass *gstpushsrc_class;
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
249

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
250
  gobject_class = (GObjectClass *) klass;
251
252
  gstbasesrc_class = (GstBaseSrcClass *) klass;
  gstpushsrc_class = (GstPushSrcClass *) klass;
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
253

Wim Taymans's avatar
Wim Taymans committed
254
255
  gobject_class->set_property = gst_udpsrc_set_property;
  gobject_class->get_property = gst_udpsrc_get_property;
256
  gobject_class->finalize = gst_udpsrc_finalize;
Wim Taymans's avatar
Wim Taymans committed
257

258
  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PORT,
259
      g_param_spec_int ("port", "Port",
Wim Taymans's avatar
Wim Taymans committed
260
          "The port to receive the packets from, 0=allocate", 0, G_MAXUINT16,
261
262
          UDP_DEFAULT_PORT, G_PARAM_READWRITE));
  g_object_class_install_property (gobject_class, PROP_MULTICAST_GROUP,
263
      g_param_spec_string ("multicast_group", "Multicast Group",
264
265
266
267
          "The Address of multicast group to join", UDP_DEFAULT_MULTICAST_GROUP,
          G_PARAM_READWRITE));
  g_object_class_install_property (gobject_class, PROP_URI,
      g_param_spec_string ("uri", "URI",
268
          "URI in the form of udp://multicast_group:port", UDP_DEFAULT_URI,
269
          G_PARAM_READWRITE));
270
271
272
  g_object_class_install_property (gobject_class, PROP_CAPS,
      g_param_spec_boxed ("caps", "Caps",
          "The caps of the source pad", GST_TYPE_CAPS, G_PARAM_READWRITE));
273
  g_object_class_install_property (gobject_class, PROP_SOCKFD,
274
      g_param_spec_int ("sockfd", "Socket Handle",
Wim Taymans's avatar
Wim Taymans committed
275
276
          "Socket to use for UDP reception. (-1 == allocate)",
          -1, G_MAXINT, UDP_DEFAULT_SOCKFD, G_PARAM_READWRITE));
277
278
279
280
  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
      g_param_spec_int ("buffer-size", "Buffer Size",
          "Size of the kernel receive buffer in bytes, 0=default", 0, G_MAXINT,
          UDP_DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE));
281
282
  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
      g_param_spec_uint64 ("timeout", "Timeout",
283
284
          "Post a message after timeout microseconds (0 = disabled)", 0,
          G_MAXUINT64, UDP_DEFAULT_TIMEOUT, G_PARAM_READWRITE));
285
286
287
288
  g_object_class_install_property (G_OBJECT_CLASS (klass),
      PROP_SKIP_FIRST_BYTES, g_param_spec_int ("skip-first-bytes",
          "Skip first bytes", "number of bytes to skip for each udp packet", 0,
          G_MAXINT, UDP_DEFAULT_SKIP_FIRST_BYTES, G_PARAM_READWRITE));
289
290
291
292
  g_object_class_install_property (gobject_class, PROP_CLOSEFD,
      g_param_spec_boolean ("closefd", "Close sockfd",
          "Close sockfd if passed as property on state change",
          UDP_DEFAULT_CLOSEFD, G_PARAM_READWRITE));
293
294
295
296
  g_object_class_install_property (gobject_class, PROP_SOCK,
      g_param_spec_int ("sock", "Socket Handle",
          "Socket currently in use for UDP reception. (-1 = no socket)",
          -1, G_MAXINT, UDP_DEFAULT_SOCK, G_PARAM_READABLE));
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
297

298
299
  gstbasesrc_class->start = gst_udpsrc_start;
  gstbasesrc_class->stop = gst_udpsrc_stop;
300
  gstbasesrc_class->unlock = gst_udpsrc_unlock;
301
  gstbasesrc_class->unlock_stop = gst_udpsrc_unlock_stop;
302
  gstbasesrc_class->get_caps = gst_udpsrc_getcaps;
303

304
  gstpushsrc_class->create = gst_udpsrc_create;
Zeeshan Ali's avatar
Zeeshan Ali committed
305
}
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
306
307

static void
308
gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class)
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
309
{
310
311
  WSA_STARTUP (udpsrc);

312
  gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
313
  udpsrc->port = UDP_DEFAULT_PORT;
314
  udpsrc->sockfd = UDP_DEFAULT_SOCKFD;
315
  udpsrc->multi_group = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
316
  udpsrc->uri = g_strdup (UDP_DEFAULT_URI);
317
  udpsrc->buffer_size = UDP_DEFAULT_BUFFER_SIZE;
318
  udpsrc->timeout = UDP_DEFAULT_TIMEOUT;
319
  udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES;
320
321
  udpsrc->closefd = UDP_DEFAULT_CLOSEFD;
  udpsrc->externalfd = (udpsrc->sockfd != -1);
322

323
  udpsrc->sock = UDP_DEFAULT_SOCK;
324
325
  udpsrc->control_sock[0] = -1;
  udpsrc->control_sock[1] = -1;
326
327
  gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME);
  gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
328
329
}

330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
static void
gst_udpsrc_finalize (GObject * object)
{
  GstUDPSrc *udpsrc;

  udpsrc = GST_UDPSRC (object);

  if (udpsrc->caps)
    gst_caps_unref (udpsrc->caps);
  g_free (udpsrc->multi_group);
  g_free (udpsrc->uri);

  G_OBJECT_CLASS (parent_class)->finalize (object);
}

345
346
347
348
349
350
351
352
353
354
355
356
357
static GstCaps *
gst_udpsrc_getcaps (GstBaseSrc * src)
{
  GstUDPSrc *udpsrc;

  udpsrc = GST_UDPSRC (src);

  if (udpsrc->caps)
    return gst_caps_ref (udpsrc->caps);
  else
    return gst_caps_new_any ();
}

358
359
static GstFlowReturn
gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
360
361
{
  GstUDPSrc *udpsrc;
362
  GstNetBuffer *outbuf;
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
363
  struct sockaddr_in tmpaddr;
364
  socklen_t len;
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
365
  fd_set read_fds;
366
  guint max_sock;
367
  guint8 *pktdata;
368
  gint pktsize;
369
370

#ifdef G_OS_UNIX
371
  gint readsize;
372
#elif defined G_OS_WIN32
373
374
  gulong readsize;
#endif
375
376
  gint ret;
  gboolean try_again;
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
377

378
  udpsrc = GST_UDPSRC (psrc);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
379

380
381
382
383
384
385
386
387
  /* quick check, avoid going in select when we already have data */
  readsize = 0;
  if ((ret = IOCTL_SOCKET (udpsrc->sock, FIONREAD, &readsize)) < 0)
    goto ioctl_failed;

  if (readsize > 0)
    goto no_select;

388
389
  do {
    gboolean stop;
390
391
392
393
394
395
396
397
    struct timeval timeval, *timeout;

    FD_ZERO (&read_fds);
    FD_SET (udpsrc->sock, &read_fds);
#ifndef G_OS_WIN32
    FD_SET (READ_SOCKET (udpsrc), &read_fds);
#endif
    max_sock = MAX (udpsrc->sock, READ_SOCKET (udpsrc));
398
399
400
401

    try_again = FALSE;
    stop = FALSE;

402
403
404
405
    GST_LOG_OBJECT (udpsrc, "doing select, timeout %" G_GUINT64_FORMAT,
        udpsrc->timeout);

    if (udpsrc->timeout > 0) {
406
407
      timeval.tv_sec = udpsrc->timeout / 1000000;
      timeval.tv_usec = udpsrc->timeout % 1000000;
408
409
410
411
412
      timeout = &timeval;
    } else {
      timeout = NULL;
    }

413
414
415
#ifdef G_OS_WIN32
    if (((max_sock + 1) != READ_SOCKET (udpsrc)) ||
        ((max_sock + 1) != WRITE_SOCKET (udpsrc))) {
416
      ret = select (max_sock + 1, &read_fds, NULL, NULL, timeout);
417
418
419
420
    } else {
      ret = 1;
    }
#else
421
    ret = select (max_sock + 1, &read_fds, NULL, NULL, timeout);
422
#endif
423
    GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
424
    if (ret < 0) {
425
426
427
428
#ifdef G_OS_WIN32
      if (WSAGetLastError () != WSAEINTR)
        goto select_error;
#else
429
430
      if (errno != EAGAIN && errno != EINTR)
        goto select_error;
431
432
#endif
      try_again = TRUE;
433
434
435
436
437
438
439
    } else if (ret == 0) {
      /* timeout, post element message */
      gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
          gst_message_new_element (GST_OBJECT_CAST (udpsrc),
              gst_structure_new ("GstUDPSrcTimeout",
                  "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
      try_again = TRUE;
440
    } else {
441
442
      if (FD_ISSET (READ_SOCKET (udpsrc), &read_fds))
        goto stopped;
443
444
    }
  } while (try_again);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
445

446
447
448
449
  /* ask how much is available for reading on the socket, this should be exactly
   * one UDP packet. We will check the return value, though, because in some
   * case it can return 0 and we don't want a 0 sized buffer. */
  readsize = 0;
450
  if ((ret = IOCTL_SOCKET (udpsrc->sock, FIONREAD, &readsize)) < 0)
451
    goto ioctl_failed;
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
452

453
454
455
  if (!readsize)
    goto nothing_to_read;

456
457
458
no_select:
  GST_LOG_OBJECT (udpsrc, "ioctl says %d bytes available", (int) readsize);

459
460
  pktdata = g_malloc (readsize);
  pktsize = readsize;
Zeeshan Ali's avatar
Zeeshan Ali committed
461

462
  while (TRUE) {
463
    len = sizeof (struct sockaddr);
464
    ret = recvfrom (udpsrc->sock, pktdata, pktsize,
465
        0, (struct sockaddr *) &tmpaddr, &len);
466
    if (ret < 0) {
467
468
469
470
471
      if (errno != EAGAIN && errno != EINTR)
        goto receive_error;
    } else
      break;
  }
Zeeshan Ali's avatar
Zeeshan Ali committed
472

473
  /* special case buffer so receivers can also track the address */
474
  outbuf = gst_netbuffer_new ();
475
476
477
478
479
480
481
482
483
484
485
  GST_BUFFER_MALLOCDATA (outbuf) = pktdata;

  /* patch pktdata and len when stripping off the headers */
  if (udpsrc->skip_first_bytes != 0) {
    if (G_UNLIKELY (readsize <= udpsrc->skip_first_bytes))
      goto skip_error;

    pktdata += udpsrc->skip_first_bytes;
    ret -= udpsrc->skip_first_bytes;
  }
  GST_BUFFER_DATA (outbuf) = pktdata;
486
  GST_BUFFER_SIZE (outbuf) = ret;
487

488
489
490
  gst_netaddress_set_ip4_address (&outbuf->from, tmpaddr.sin_addr.s_addr,
      tmpaddr.sin_port);

491
  gst_buffer_set_caps (GST_BUFFER_CAST (outbuf), udpsrc->caps);
492

493
  GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize);
494

495
  *buf = GST_BUFFER_CAST (outbuf);
Zeeshan Ali's avatar
Zeeshan Ali committed
496

497
  return GST_FLOW_OK;
Zeeshan Ali's avatar
Zeeshan Ali committed
498

499
  /* ERRORS */
Wim Taymans's avatar
Wim Taymans committed
500
501
select_error:
  {
502
503
504
505
506
507
508
509
510
511
512
513
514
    GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
        ("select error %d: %s (%d)", ret, g_strerror (errno), errno));
    return GST_FLOW_ERROR;
  }
stopped:
  {
    GST_DEBUG ("stop called");
    return GST_FLOW_WRONG_STATE;
  }
ioctl_failed:
  {
    GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
        ("ioctl failed %d: %s (%d)", ret, g_strerror (errno), errno));
515
    return GST_FLOW_ERROR;
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
516
  }
517
518
519
520
521
522
523
nothing_to_read:
  {
    GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
        ("ioctl returned readsize 0 %d: %s (%d)", ret, g_strerror (errno),
            errno));
    return GST_FLOW_ERROR;
  }
Wim Taymans's avatar
Wim Taymans committed
524
525
receive_error:
  {
526
527
528
    g_free (pktdata);
    GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
        ("receive error %d: %s (%d)", ret, g_strerror (errno), errno));
529
    return GST_FLOW_ERROR;
530
  }
531
532
533
534
535
536
skip_error:
  {
    GST_ELEMENT_ERROR (udpsrc, STREAM, DECODE, (NULL),
        ("UDP buffer to small to skip header"));
    return GST_FLOW_ERROR;
  }
537
538
}

539
540
541
542
543
544
545
/* Call this function when multicastgroup and/or port are updated */

static void
gst_udpsrc_update_uri (GstUDPSrc * src)
{
  g_free (src->uri);
  src->uri = g_strdup_printf ("udp://%s:%d", src->multi_group, src->port);
546
547

  GST_DEBUG_OBJECT (src, "updated uri to %s", src->uri);
548
549
}

550
551
552
553
554
555
556
557
558
559
560
561
562
static gboolean
gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri)
{
  gchar *protocol;
  gchar *location;
  gchar *colptr;

  protocol = gst_uri_get_protocol (uri);
  if (strcmp (protocol, "udp") != 0)
    goto wrong_protocol;
  g_free (protocol);

  location = gst_uri_get_location (uri);
563
564
  if (!location)
    return FALSE;
565
566
  colptr = strstr (location, ":");
  if (colptr != NULL) {
567
568
    g_free (src->multi_group);
    src->multi_group = g_strndup (location, colptr - location);
569
    src->port = atoi (colptr + 1);
570
571
572
573
  } else {
    g_free (src->multi_group);
    src->multi_group = g_strdup (location);
    src->port = UDP_DEFAULT_PORT;
574
575
576
  }
  g_free (location);

577
  gst_udpsrc_update_uri (src);
578
579
580

  return TRUE;

581
  /* ERRORS */
582
583
584
wrong_protocol:
  {
    g_free (protocol);
585
    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
586
        ("error parsing uri %s: wrong protocol (%s != udp)", uri, protocol));
587
588
    return FALSE;
  }
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
589
590
591
}

static void
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
592
593
gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
    GParamSpec * pspec)
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
594
{
595
  GstUDPSrc *udpsrc = GST_UDPSRC (object);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
596
597

  switch (prop_id) {
598
599
    case PROP_BUFFER_SIZE:
      udpsrc->buffer_size = g_value_get_int (value);
600
      break;
601
    case PROP_PORT:
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
602
      udpsrc->port = g_value_get_int (value);
603
      gst_udpsrc_update_uri (udpsrc);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
604
      break;
605
    case PROP_MULTICAST_GROUP:
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
606
607
      g_free (udpsrc->multi_group);

608
      if (g_value_get_string (value) == NULL)
609
        udpsrc->multi_group = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
610
      else
611
        udpsrc->multi_group = g_value_dup_string (value);
612
      gst_udpsrc_update_uri (udpsrc);
613
614
615
      break;
    case PROP_URI:
      gst_udpsrc_set_uri (udpsrc, g_value_get_string (value));
616
      break;
617
618
619
620
621
622
623
624
625
626
627
628
629
630
    case PROP_CAPS:
    {
      const GstCaps *new_caps_val = gst_value_get_caps (value);
      GstCaps *new_caps;
      GstCaps *old_caps;

      if (new_caps_val == NULL) {
        new_caps = gst_caps_new_any ();
      } else {
        new_caps = gst_caps_copy (new_caps_val);
      }

      old_caps = udpsrc->caps;
      udpsrc->caps = new_caps;
Wim Taymans's avatar
Wim Taymans committed
631
632
      if (old_caps)
        gst_caps_unref (old_caps);
633
      gst_pad_set_caps (GST_BASE_SRC (udpsrc)->srcpad, new_caps);
634
635
      break;
    }
636
    case PROP_SOCKFD:
637
638
      udpsrc->sockfd = g_value_get_int (value);
      GST_DEBUG ("setting SOCKFD to %d", udpsrc->sockfd);
639
      break;
640
641
642
    case PROP_TIMEOUT:
      udpsrc->timeout = g_value_get_uint64 (value);
      break;
643
644
645
    case PROP_SKIP_FIRST_BYTES:
      udpsrc->skip_first_bytes = g_value_get_int (value);
      break;
646
647
648
    case PROP_CLOSEFD:
      udpsrc->closefd = g_value_get_boolean (value);
      break;
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
649
650
651
652
653
654
    default:
      break;
  }
}

static void
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
655
656
gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
    GParamSpec * pspec)
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
657
{
658
  GstUDPSrc *udpsrc = GST_UDPSRC (object);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
659
660

  switch (prop_id) {
661
662
    case PROP_BUFFER_SIZE:
      g_value_set_int (value, udpsrc->buffer_size);
663
      break;
664
    case PROP_PORT:
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
665
666
      g_value_set_int (value, udpsrc->port);
      break;
667
    case PROP_MULTICAST_GROUP:
668
669
      g_value_set_string (value, udpsrc->multi_group);
      break;
670
671
672
    case PROP_URI:
      g_value_set_string (value, udpsrc->uri);
      break;
673
674
675
    case PROP_CAPS:
      gst_value_set_caps (value, udpsrc->caps);
      break;
676
    case PROP_SOCKFD:
677
      g_value_set_int (value, udpsrc->sockfd);
678
      break;
679
680
681
    case PROP_TIMEOUT:
      g_value_set_uint64 (value, udpsrc->timeout);
      break;
682
683
684
    case PROP_SKIP_FIRST_BYTES:
      g_value_set_int (value, udpsrc->skip_first_bytes);
      break;
685
686
687
    case PROP_CLOSEFD:
      g_value_set_boolean (value, udpsrc->closefd);
      break;
688
689
690
    case PROP_SOCK:
      g_value_set_int (value, udpsrc->sock);
      break;
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
691
692
693
694
695
696
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

697
/* create a socket for sending to remote machine */
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
698
static gboolean
699
gst_udpsrc_start (GstBaseSrc * bsrc)
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
700
{
701
  guint bc_val;
702
  gint reuse;
703
  struct sockaddr_in my_addr;
704
705
  guint len;
  int port;
706
  GstUDPSrc *src;
707
  gint ret;
708
  int rcvsize;
709
710

  src = GST_UDPSRC (bsrc);
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
711

712
713
714
715
716
717
718
719
#ifdef G_OS_WIN32
  GST_DEBUG_OBJECT (src, "creating pipe");

  /* This should work on UNIX too. PF_UNIX sockets replaced with pipe */
  /* pipe( CONTROL_SOCKETS(src) ) */
  if ((ret = pipe (CONTROL_SOCKETS (src))) < 0)
    goto no_socket_pair;
#else
720
  GST_DEBUG_OBJECT (src, "creating socket pair");
721
722
723
724
725
  if ((ret = socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (src))) < 0)
    goto no_socket_pair;

  fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
  fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
726
#endif
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
727

728
729
730
  if (!inet_aton (src->multi_group, &(src->multi_addr.imr_multiaddr)))
    src->multi_addr.imr_multiaddr.s_addr = 0;

731
732
  if (src->sockfd == -1) {
    /* need to allocate a socket */
733
    if ((ret = socket (PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)
734
735
736
      goto no_socket;

    src->sock = ret;
737
    src->externalfd = FALSE;
738
739
740
741
742
743
744
745
746
747

    reuse = 1;
    if ((ret =
            setsockopt (src->sock, SOL_SOCKET, SO_REUSEADDR, &reuse,
                sizeof (reuse))) < 0)
      goto setsockopt_error;

    memset (&src->myaddr, 0, sizeof (src->myaddr));
    src->myaddr.sin_family = AF_INET;   /* host byte order */
    src->myaddr.sin_port = htons (src->port);   /* short, network byte order */
748
749
750
751
752

    if (src->multi_addr.imr_multiaddr.s_addr)
      src->myaddr.sin_addr.s_addr = src->multi_addr.imr_multiaddr.s_addr;
    else
      src->myaddr.sin_addr.s_addr = INADDR_ANY;
753
754

    GST_DEBUG_OBJECT (src, "binding on port %d", src->port);
755
    if ((ret = bind (src->sock, (struct sockaddr *) &src->myaddr,
756
757
                sizeof (src->myaddr))) < 0)
      goto bind_error;
758
759
760
761
  } else {
    /* we use the configured socket */
    src->sock = src->sockfd;
    src->externalfd = TRUE;
762
  }
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
763

764
765
766
767
768
769
  if (src->multi_addr.imr_multiaddr.s_addr) {
    src->multi_addr.imr_interface.s_addr = INADDR_ANY;
    if ((ret =
            setsockopt (src->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
                &src->multi_addr, sizeof (src->multi_addr))) < 0)
      goto membership;
770
771
  }

772
  len = sizeof (my_addr);
773
  if ((ret = getsockname (src->sock, (struct sockaddr *) &my_addr, &len)) < 0)
774
775
    goto getsockname_error;

776
  len = sizeof (rcvsize);
777
778
  if (src->buffer_size != 0) {
    rcvsize = src->buffer_size;
779
780
781
782
783
784
785
786

    GST_DEBUG_OBJECT (src, "setting udp buffer of %d bytes", rcvsize);
    /* set buffer size, Note that on Linux this is typically limited to a
     * maximum of around 100K. Also a minimum of 128 bytes is required on
     * Linux. */
    ret = setsockopt (src->sock, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, len);
    if (ret != 0)
      goto udpbuffer_error;
787
788
  }

789
790
791
792
793
794
795
796
797
  /* read the value of the receive buffer. Note that on linux this returns 2x the
   * value we set because the kernel allocates extra memory for metadata.
   * The default on Linux is about 100K (which is about 50K without metadata) */
  ret = getsockopt (src->sock, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, &len);
  if (ret == 0)
    GST_DEBUG_OBJECT (src, "have udp buffer of %d bytes", rcvsize);
  else
    GST_DEBUG_OBJECT (src, "could not get udp buffer size");

798
  bc_val = 1;
799
  if ((ret = setsockopt (src->sock, SOL_SOCKET, SO_BROADCAST, &bc_val,
800
801
802
              sizeof (bc_val))) < 0)
    goto no_broadcast;

803
804
805
806
807
808
809
810
  port = ntohs (my_addr.sin_port);
  GST_DEBUG_OBJECT (src, "bound, on port %d", port);
  if (port != src->port) {
    src->port = port;
    GST_DEBUG_OBJECT (src, "notifying %d", port);
    g_object_notify (G_OBJECT (src), "port");
  }

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
811
812
  src->myaddr.sin_port = htons (src->port + 1);

Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
813
  return TRUE;
814

815
  /* ERRORS */
816
817
818
819
820
821
no_socket_pair:
  {
    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
        ("no socket pair %d: %s (%d)", ret, g_strerror (errno), errno));
    return FALSE;
  }
822
no_socket:
823
  {
824
825
    GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
        ("no socket error %d: %s (%d)", ret, g_strerror (errno), errno));
826
827
    return FALSE;
  }
828
829
setsockopt_error:
  {
830
    CLOSE_IF_REQUESTED (src);
831
832
    GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
        ("setsockopt failed %d: %s (%d)", ret, g_strerror (errno), errno));
833
    return FALSE;
834
  }
835
836
bind_error:
  {
837
    CLOSE_IF_REQUESTED (src);
838
839
840
841
842
843
    GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
        ("bind failed %d: %s (%d)", ret, g_strerror (errno), errno));
    return FALSE;
  }
membership:
  {
844
    CLOSE_IF_REQUESTED (src);
845
846
    GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
        ("could add membership %d: %s (%d)", ret, g_strerror (errno), errno));
847
848
849
850
    return FALSE;
  }
getsockname_error:
  {
851
    CLOSE_IF_REQUESTED (src);
852
853
854
855
    GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
        ("getsockname failed %d: %s (%d)", ret, g_strerror (errno), errno));
    return FALSE;
  }
856
857
udpbuffer_error:
  {
858
    CLOSE_IF_REQUESTED (src);
859
860
861
862
863
    GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
        ("Could not create a buffer of the size requested, %d: %s (%d)", ret,
            g_strerror (errno), errno));
    return FALSE;
  }
864
865
no_broadcast:
  {
866
    CLOSE_IF_REQUESTED (src);
867
868
869
    GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL),
        ("could not configure socket for broadcast %d: %s (%d)", ret,
            g_strerror (errno), errno));
870
    return FALSE;
Wim Taymans's avatar
Wim Taymans committed
871
  }
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
872
873
}

874
875
876
877
static gboolean
gst_udpsrc_unlock (GstBaseSrc * bsrc)
{
  GstUDPSrc *src;
878
  gint res;
879
880
881

  src = GST_UDPSRC (bsrc);

882
  GST_LOG_OBJECT (src, "sending stop command");
883
  SEND_COMMAND (src, CONTROL_STOP, res);
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
  GST_LOG_OBJECT (src, "sent stop command %d", res);

  return TRUE;
}

static gboolean
gst_udpsrc_unlock_stop (GstBaseSrc * bsrc)
{
  GstUDPSrc *src;

  src = GST_UDPSRC (bsrc);

  GST_LOG_OBJECT (src, "clearing unlock command queue");

  while (TRUE) {
    gchar command;
    int res;

    GST_LOG_OBJECT (src, "reading command");

    READ_COMMAND (src, command, res);
    if (res <= 0) {
      GST_LOG_OBJECT (src, "no more commands");
      /* no more commands */
      break;
    }
  }
911
912
913
914

  return TRUE;
}

915
916
static gboolean
gst_udpsrc_stop (GstBaseSrc * bsrc)
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
917
{
Wim Taymans's avatar
Wim Taymans committed
918
919
  GstUDPSrc *src;

920
  src = GST_UDPSRC (bsrc);
Wim Taymans's avatar
Wim Taymans committed
921

922
923
  GST_DEBUG ("stopping, closing sockets");

924
  if (src->sock != -1) {
925
    CLOSE_IF_REQUESTED (src);
Wim Taymans's avatar
Wim Taymans committed
926
  }
927

928
  /* pipes on WIN32 else sockets */
929
930
931
932
933
934
935
936
  if (src->control_sock[0] != -1) {
    close (src->control_sock[0]);
    src->control_sock[0] = -1;
  }
  if (src->control_sock[1] != -1) {
    close (src->control_sock[1]);
    src->control_sock[1] = -1;
  }
937
938
939

  WSA_CLEANUP (src);

940
  return TRUE;
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
941
}
942
943
944

/*** GSTURIHANDLER INTERFACE *************************************************/

945
static GstURIType
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
gst_udpsrc_uri_get_type (void)
{
  return GST_URI_SRC;
}
static gchar **
gst_udpsrc_uri_get_protocols (void)
{
  static gchar *protocols[] = { "udp", NULL };

  return protocols;
}

static const gchar *
gst_udpsrc_uri_get_uri (GstURIHandler * handler)
{
  GstUDPSrc *src = GST_UDPSRC (handler);

963
  return src->uri;
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
}

static gboolean
gst_udpsrc_uri_set_uri (GstURIHandler * handler, const gchar * uri)
{
  gboolean ret;
  GstUDPSrc *src = GST_UDPSRC (handler);

  ret = gst_udpsrc_set_uri (src, uri);

  return ret;
}

static void
gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data)
{
  GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;

  iface->get_type = gst_udpsrc_uri_get_type;
  iface->get_protocols = gst_udpsrc_uri_get_protocols;
  iface->get_uri = gst_udpsrc_uri_get_uri;
  iface->set_uri = gst_udpsrc_uri_set_uri;
}