gstudpsrc.c 30.7 KB
Newer Older
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
1
/* GStreamer
2
 * Copyright (C) <2005> Wim Taymans <wim@fluendo.com>
3
 * Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

Wim Taymans's avatar
Wim Taymans committed
21 22 23 24 25 26
/**
 * SECTION:element-udpsrc
 * @see_also: udpsink, multifdsink
 *
 * udpsrc is a network source that reads UDP packets from the network.
 * It can be combined with RTP depayloaders to implement RTP streaming.
27
 *
Wim Taymans's avatar
Wim Taymans committed
28
 * The udpsrc element supports automatic port allocation by setting the
29 30 31
 * #GstUDPSrc:port property to 0. After setting the udpsrc to PAUSED, the
 * allocated port can be obtained by reading the port property.
 *
32
 * udpsrc can read from multicast groups by setting the #GstUDPSrc:multicast-group
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
 * property to the IP address of the multicast group.
 *
 * Alternatively one can provide a custom socket to udpsrc with the #GstUDPSrc:sockfd
 * property, udpsrc will then not allocate a socket itself but use the provided
 * one.
 *
 * The #GstUDPSrc:caps property is mainly used to give a type to the UDP packet
 * so that they can be autoplugged in GStreamer pipelines. This is very usefull
 * for RTP implementations where the contents of the UDP packets is transfered
 * out-of-bounds using SDP or other means.
 *
 * The #GstUDPSrc:buffer property is used to change the default kernel buffer
 * sizes used for receiving packets. The buffer size may be increased for
 * high-volume connections, or may be decreased to limit the possible backlog of
 * incoming data. The system places an absolute limit on these values, on Linux,
 * for example, the default buffer size is typically 50K and can be increased to
 * maximally 100K.
 *
 * The #GstUDPSrc:skip-first-bytes property is used to strip off an arbitrary
 * number of bytes from the start of the raw udp packet and can be used to strip
 * off proprietary header, for example. 
 *
 * The udpsrc is always a live source. It does however not provide a #GstClock,
 * this is left for upstream elements such as an RTP session manager or demuxer
 * (such as an MPEG demuxer). As with all live sources, the captured buffers
 * will have their timestamp set to the current running time of the pipeline.
 *
 * udpsrc implements a #GstURIHandler interface that handles udp://host:port
 * type URIs.
 *
 * If the #GstUDPSrc:timeout property is set to a value bigger than 0, udpsrc
 * will generate an element message named
65
 * <classname>&quot;GstUDPSrcTimeout&quot;</classname>
66 67 68 69 70 71
 * if no data was recieved in the given timeout.
 * The message's structure contains one field:
 * <itemizedlist>
 * <listitem>
 *   <para>
 *   #guint64
72 73
 *   <classname>&quot;timeout&quot;</classname>: the timeout in microseconds that
 *   expired when waiting for data.
74 75 76 77 78 79 80
 *   </para>
 * </listitem>
 * </itemizedlist>
 * The message is typically used to detect that no UDP arrives in the receiver
 * because it is blocked by a firewall.
 * </para>
 * <para>
81
 * A custom file descriptor can be configured with the 
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
 * #GstUDPSrc:sockfd property. The socket will be closed when setting the
 * element to READY by default. This behaviour can be
 * overriden with the #GstUDPSrc:closefd property, in which case the application
 * is responsible for closing the file descriptor.
 *
 * <refsect2>
 * <title>Examples</title>
 * |[
 * gst-launch -v udpsrc ! fakesink dump=1
 * ]| A pipeline to read from the default port and dump the udp packets.
 * To actually generate udp packets on the default port one can use the
 * udpsink element. When running the following pipeline in another terminal, the
 * above mentioned pipeline should dump data packets to the console.
 * |[
 * gst-launch -v audiotestsrc ! udpsink
 * ]|
 * |[
 * gst-launch -v udpsrc port=0 ! fakesink
 * ]| read udp packets from a free port.
Wim Taymans's avatar
Wim Taymans committed
101
 * </refsect2>
102 103
 *
 * Last reviewed on 2007-09-20 (0.10.7)
Wim Taymans's avatar
Wim Taymans committed
104
 */
105 106 107 108
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
109
#include "gstudpsrc.h"
110
#ifdef HAVE_UNISTD_H
111
#include <unistd.h>
112
#endif
113
#include <stdlib.h>
114

115 116 117 118
#if defined _MSC_VER && (_MSC_VER >= 1400)
#include <io.h>
#endif

119
#include <gst/netbuffer/gstnetbuffer.h>
120 121 122
#ifdef G_OS_WIN32
typedef int socklen_t;
#endif
123 124 125 126 127

#ifdef HAVE_FIONREAD_IN_SYS_FILIO
#include <sys/filio.h>
#endif

128
GST_DEBUG_CATEGORY_STATIC (udpsrc_debug);
129 130
#define GST_CAT_DEFAULT (udpsrc_debug)

Wim Taymans's avatar
Wim Taymans committed
131 132 133 134 135
static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
136 137
#define UDP_DEFAULT_PORT                4951
#define UDP_DEFAULT_MULTICAST_GROUP     "0.0.0.0"
138
#define UDP_DEFAULT_MULTICAST_IFACE     NULL
Wim Taymans's avatar
Wim Taymans committed
139
#define UDP_DEFAULT_URI                 "udp://"UDP_DEFAULT_MULTICAST_GROUP":"G_STRINGIFY(UDP_DEFAULT_PORT)
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
140
#define UDP_DEFAULT_CAPS                NULL
Wim Taymans's avatar
Wim Taymans committed
141
#define UDP_DEFAULT_SOCKFD              -1
142 143
#define UDP_DEFAULT_BUFFER_SIZE		0
#define UDP_DEFAULT_TIMEOUT             0
144
#define UDP_DEFAULT_SKIP_FIRST_BYTES	0
145
#define UDP_DEFAULT_CLOSEFD            TRUE
146
#define UDP_DEFAULT_SOCK                -1
147
#define UDP_DEFAULT_AUTO_MULTICAST     TRUE
148

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
149 150
enum
{
151
  PROP_0,
152

153 154
  PROP_PORT,
  PROP_MULTICAST_GROUP,
155
  PROP_MULTICAST_IFACE,
156
  PROP_URI,
157
  PROP_CAPS,
158
  PROP_SOCKFD,
159
  PROP_BUFFER_SIZE,
160
  PROP_TIMEOUT,
161
  PROP_SKIP_FIRST_BYTES,
162
  PROP_CLOSEFD,
163 164 165 166
  PROP_SOCK,
  PROP_AUTO_MULTICAST,

  PROP_LAST
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
167 168
};

169 170 171 172 173 174 175 176 177 178
#define CLOSE_IF_REQUESTED(udpctx)                                        \
G_STMT_START {                                                            \
  if ((!udpctx->externalfd) || (udpctx->externalfd && udpctx->closefd)) { \
    CLOSE_SOCKET(udpctx->sock.fd);                                        \
    if (udpctx->sock.fd == udpctx->sockfd)                                \
      udpctx->sockfd = UDP_DEFAULT_SOCKFD;                                \
  }                                                                       \
  udpctx->sock.fd = UDP_DEFAULT_SOCK;                                     \
} G_STMT_END

179 180
static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);

181
static GstCaps *gst_udpsrc_getcaps (GstBaseSrc * src);
182

183
static GstFlowReturn gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf);
184

185
static gboolean gst_udpsrc_start (GstBaseSrc * bsrc);
186

187
static gboolean gst_udpsrc_stop (GstBaseSrc * bsrc);
188

189
static gboolean gst_udpsrc_unlock (GstBaseSrc * bsrc);
190

191
static gboolean gst_udpsrc_unlock_stop (GstBaseSrc * bsrc);
192

193
static void gst_udpsrc_finalize (GObject * object);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
194

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
195 196 197 198
static void gst_udpsrc_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_udpsrc_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
199

200 201 202 203 204 205 206 207
static void
_do_init (GType type)
{
  static const GInterfaceInfo urihandler_info = {
    gst_udpsrc_uri_handler_init,
    NULL,
    NULL
  };
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
208

209
  g_type_add_interface_static (type, GST_TYPE_URI_HANDLER, &urihandler_info);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
210

211
  GST_DEBUG_CATEGORY_INIT (udpsrc_debug, "udpsrc", 0, "UDP src");
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
212 213
}

214 215 216
GST_BOILERPLATE_FULL (GstUDPSrc, gst_udpsrc, GstPushSrc, GST_TYPE_PUSH_SRC,
    _do_init);

Benjamin Otte's avatar
Benjamin Otte committed
217 218 219 220 221
static void
gst_udpsrc_base_init (gpointer g_class)
{
  GstElementClass *element_class = GST_ELEMENT_CLASS (g_class);

Wim Taymans's avatar
Wim Taymans committed
222 223 224
  gst_element_class_add_pad_template (element_class,
      gst_static_pad_template_get (&src_template));

225 226 227 228 229
  gst_element_class_set_details_simple (element_class, "UDP packet receiver",
      "Source/Network",
      "Receive data over the network via UDP",
      "Wim Taymans <wim@fluendo.com>, "
      "Thijs Vermeir <thijs.vermeir@barco.com>");
Benjamin Otte's avatar
Benjamin Otte committed
230 231
}

Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
232
static void
233
gst_udpsrc_class_init (GstUDPSrcClass * klass)
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
234 235
{
  GObjectClass *gobject_class;
236 237
  GstBaseSrcClass *gstbasesrc_class;
  GstPushSrcClass *gstpushsrc_class;
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
238

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
239
  gobject_class = (GObjectClass *) klass;
240 241
  gstbasesrc_class = (GstBaseSrcClass *) klass;
  gstpushsrc_class = (GstPushSrcClass *) klass;
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
242

Wim Taymans's avatar
Wim Taymans committed
243 244
  gobject_class->set_property = gst_udpsrc_set_property;
  gobject_class->get_property = gst_udpsrc_get_property;
245
  gobject_class->finalize = gst_udpsrc_finalize;
Wim Taymans's avatar
Wim Taymans committed
246

247
  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PORT,
248
      g_param_spec_int ("port", "Port",
Wim Taymans's avatar
Wim Taymans committed
249
          "The port to receive the packets from, 0=allocate", 0, G_MAXUINT16,
250 251
          UDP_DEFAULT_PORT, G_PARAM_READWRITE));
  g_object_class_install_property (gobject_class, PROP_MULTICAST_GROUP,
252
      g_param_spec_string ("multicast-group", "Multicast Group",
253 254
          "The Address of multicast group to join", UDP_DEFAULT_MULTICAST_GROUP,
          G_PARAM_READWRITE));
255
  g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
256
      g_param_spec_string ("multicast-iface", "Multicast Interface",
257 258
          "The network interface on which to join the multicast group",
          UDP_DEFAULT_MULTICAST_IFACE, G_PARAM_READWRITE));
259 260
  g_object_class_install_property (gobject_class, PROP_URI,
      g_param_spec_string ("uri", "URI",
261
          "URI in the form of udp://multicast_group:port", UDP_DEFAULT_URI,
262
          G_PARAM_READWRITE));
263 264 265
  g_object_class_install_property (gobject_class, PROP_CAPS,
      g_param_spec_boxed ("caps", "Caps",
          "The caps of the source pad", GST_TYPE_CAPS, G_PARAM_READWRITE));
266
  g_object_class_install_property (gobject_class, PROP_SOCKFD,
267
      g_param_spec_int ("sockfd", "Socket Handle",
Wim Taymans's avatar
Wim Taymans committed
268 269
          "Socket to use for UDP reception. (-1 == allocate)",
          -1, G_MAXINT, UDP_DEFAULT_SOCKFD, G_PARAM_READWRITE));
270 271 272 273
  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
      g_param_spec_int ("buffer-size", "Buffer Size",
          "Size of the kernel receive buffer in bytes, 0=default", 0, G_MAXINT,
          UDP_DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE));
274 275
  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
      g_param_spec_uint64 ("timeout", "Timeout",
276 277
          "Post a message after timeout microseconds (0 = disabled)", 0,
          G_MAXUINT64, UDP_DEFAULT_TIMEOUT, G_PARAM_READWRITE));
278 279 280 281
  g_object_class_install_property (G_OBJECT_CLASS (klass),
      PROP_SKIP_FIRST_BYTES, g_param_spec_int ("skip-first-bytes",
          "Skip first bytes", "number of bytes to skip for each udp packet", 0,
          G_MAXINT, UDP_DEFAULT_SKIP_FIRST_BYTES, G_PARAM_READWRITE));
282 283 284 285
  g_object_class_install_property (gobject_class, PROP_CLOSEFD,
      g_param_spec_boolean ("closefd", "Close sockfd",
          "Close sockfd if passed as property on state change",
          UDP_DEFAULT_CLOSEFD, G_PARAM_READWRITE));
286 287 288 289
  g_object_class_install_property (gobject_class, PROP_SOCK,
      g_param_spec_int ("sock", "Socket Handle",
          "Socket currently in use for UDP reception. (-1 = no socket)",
          -1, G_MAXINT, UDP_DEFAULT_SOCK, G_PARAM_READABLE));
290 291 292 293
  g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
      g_param_spec_boolean ("auto-multicast", "Auto Multicast",
          "Automatically join/leave multicast groups",
          UDP_DEFAULT_AUTO_MULTICAST, G_PARAM_READWRITE));
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
294

295 296
  gstbasesrc_class->start = gst_udpsrc_start;
  gstbasesrc_class->stop = gst_udpsrc_stop;
297
  gstbasesrc_class->unlock = gst_udpsrc_unlock;
298
  gstbasesrc_class->unlock_stop = gst_udpsrc_unlock_stop;
299
  gstbasesrc_class->get_caps = gst_udpsrc_getcaps;
300

301
  gstpushsrc_class->create = gst_udpsrc_create;
Zeeshan Ali's avatar
Zeeshan Ali committed
302
}
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
303 304

static void
305
gst_udpsrc_init (GstUDPSrc * udpsrc, GstUDPSrcClass * g_class)
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
306
{
307 308
  WSA_STARTUP (udpsrc);

Wim Taymans's avatar
Wim Taymans committed
309 310 311
  gst_udp_uri_init (&udpsrc->uri, UDP_DEFAULT_MULTICAST_GROUP,
      UDP_DEFAULT_PORT);

312
  udpsrc->sockfd = UDP_DEFAULT_SOCKFD;
313
  udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
314
  udpsrc->buffer_size = UDP_DEFAULT_BUFFER_SIZE;
315
  udpsrc->timeout = UDP_DEFAULT_TIMEOUT;
316
  udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES;
317 318
  udpsrc->closefd = UDP_DEFAULT_CLOSEFD;
  udpsrc->externalfd = (udpsrc->sockfd != -1);
319
  udpsrc->auto_multicast = UDP_DEFAULT_AUTO_MULTICAST;
320
  udpsrc->sock.fd = UDP_DEFAULT_SOCK;
321 322 323 324

  /* configure basesrc to be a live source */
  gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
  /* make basesrc output a segment in time */
325
  gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME);
326 327
  /* make basesrc set timestamps on outgoing buffers based on the running_time
   * when they were captured */
328
  gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
329 330
}

331 332 333 334 335 336 337 338 339
static void
gst_udpsrc_finalize (GObject * object)
{
  GstUDPSrc *udpsrc;

  udpsrc = GST_UDPSRC (object);

  if (udpsrc->caps)
    gst_caps_unref (udpsrc->caps);
Wim Taymans's avatar
Wim Taymans committed
340

341
  g_free (udpsrc->multi_iface);
Wim Taymans's avatar
Wim Taymans committed
342 343 344

  gst_udp_uri_free (&udpsrc->uri);
  g_free (udpsrc->uristr);
345

346 347 348 349
  if (udpsrc->sockfd >= 0 && udpsrc->closefd)
    CLOSE_SOCKET (udpsrc->sockfd);

  WSA_CLEANUP (object);
350

351 352 353
  G_OBJECT_CLASS (parent_class)->finalize (object);
}

354 355 356 357 358 359 360 361 362 363 364 365 366
static GstCaps *
gst_udpsrc_getcaps (GstBaseSrc * src)
{
  GstUDPSrc *udpsrc;

  udpsrc = GST_UDPSRC (src);

  if (udpsrc->caps)
    return gst_caps_ref (udpsrc->caps);
  else
    return gst_caps_new_any ();
}

367 368 369 370
/* read a message from the error queue */
static void
clear_error (GstUDPSrc * udpsrc)
{
Wim Taymans's avatar
Wim Taymans committed
371
#if defined (MSG_ERRQUEUE)
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394
  struct msghdr cmsg;
  char cbuf[128];
  char msgbuf[CMSG_SPACE (128)];
  struct iovec iov;

  /* Flush ERRORS from fd so next poll will not return at once */
  /* No need for address : We look for local error */
  cmsg.msg_name = NULL;
  cmsg.msg_namelen = 0;

  /* IOV */
  memset (&cbuf, 0, sizeof (cbuf));
  iov.iov_base = cbuf;
  iov.iov_len = sizeof (cbuf);
  cmsg.msg_iov = &iov;
  cmsg.msg_iovlen = 1;

  /* msg_control */
  memset (&msgbuf, 0, sizeof (msgbuf));
  cmsg.msg_control = &msgbuf;
  cmsg.msg_controllen = sizeof (msgbuf);

  recvmsg (udpsrc->sock.fd, &cmsg, MSG_ERRQUEUE);
Wim Taymans's avatar
Wim Taymans committed
395
#endif
396 397
}

398 399
static GstFlowReturn
gst_udpsrc_create (GstPushSrc * psrc, GstBuffer ** buf)
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
400 401
{
  GstUDPSrc *udpsrc;
402
  GstNetBuffer *outbuf;
403 404 405 406 407
  union gst_sockaddr
  {
    struct sockaddr sa;
    struct sockaddr_in sa_in;
    struct sockaddr_in6 sa_in6;
408
    struct sockaddr_storage sa_stor;
409
  } sa;
410
  socklen_t slen;
411
  guint8 *pktdata;
412
  gint pktsize;
413
#ifdef G_OS_UNIX
414
  gint readsize;
415
#elif defined G_OS_WIN32
416 417
  gulong readsize;
#endif
418
  GstClockTime timeout;
419 420
  gint ret;
  gboolean try_again;
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
421

422
  udpsrc = GST_UDPSRC_CAST (psrc);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
423

424
retry:
425 426
  /* quick check, avoid going in select when we already have data */
  readsize = 0;
427 428
  if (G_UNLIKELY ((ret =
              IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
429 430 431 432 433
    goto ioctl_failed;

  if (readsize > 0)
    goto no_select;

434 435 436 437 438
  if (udpsrc->timeout > 0) {
    timeout = udpsrc->timeout * GST_USECOND;
  } else {
    timeout = GST_CLOCK_TIME_NONE;
  }
439

440
  do {
441 442
    try_again = FALSE;

443 444 445
    GST_LOG_OBJECT (udpsrc, "doing select, timeout %" G_GUINT64_FORMAT,
        udpsrc->timeout);

446
    ret = gst_poll_wait (udpsrc->fdset, timeout);
447
    GST_LOG_OBJECT (udpsrc, "select returned %d", ret);
448
    if (G_UNLIKELY (ret < 0)) {
449 450
      if (errno == EBUSY)
        goto stopped;
451 452 453 454
#ifdef G_OS_WIN32
      if (WSAGetLastError () != WSAEINTR)
        goto select_error;
#else
455 456
      if (errno != EAGAIN && errno != EINTR)
        goto select_error;
457 458
#endif
      try_again = TRUE;
459
    } else if (G_UNLIKELY (ret == 0)) {
460 461 462 463 464 465
      /* timeout, post element message */
      gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
          gst_message_new_element (GST_OBJECT_CAST (udpsrc),
              gst_structure_new ("GstUDPSrcTimeout",
                  "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
      try_again = TRUE;
466
    }
467
  } while (G_UNLIKELY (try_again));
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
468

469 470 471 472
  /* ask how much is available for reading on the socket, this should be exactly
   * one UDP packet. We will check the return value, though, because in some
   * case it can return 0 and we don't want a 0 sized buffer. */
  readsize = 0;
473 474
  if (G_UNLIKELY ((ret =
              IOCTL_SOCKET (udpsrc->sock.fd, FIONREAD, &readsize)) < 0))
475
    goto ioctl_failed;
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
476

477
  /* if we get here and there is nothing to read from the socket, the select got
478
   * woken up by activity on the socket but it was not a read. We know someone
479 480
   * will also do something with the socket so that we don't go into an infinite
   * loop in the select(). */
481 482
  if (G_UNLIKELY (!readsize)) {
    clear_error (udpsrc);
483
    goto retry;
484
  }
485

486 487 488
no_select:
  GST_LOG_OBJECT (udpsrc, "ioctl says %d bytes available", (int) readsize);

489 490
  pktdata = g_malloc (readsize);
  pktsize = readsize;
Zeeshan Ali's avatar
Zeeshan Ali committed
491

492
  while (TRUE) {
493
    slen = sizeof (sa);
494
#ifdef G_OS_WIN32
495 496
    ret = recvfrom (udpsrc->sock.fd, (char *) pktdata, pktsize, 0, &sa.sa,
        &slen);
497
#else
498
    ret = recvfrom (udpsrc->sock.fd, pktdata, pktsize, 0, &sa.sa, &slen);
499
#endif
500
    if (G_UNLIKELY (ret < 0)) {
501 502 503 504 505 506 507 508 509 510 511 512
#ifdef G_OS_WIN32
      /* WSAECONNRESET for a UDP socket means that a packet sent with udpsink
       * generated a "port unreachable" ICMP response. We ignore that and try
       * again. */
      if (WSAGetLastError () == WSAECONNRESET) {
        g_free (pktdata);
        pktdata = NULL;
        goto retry;
      }
      if (WSAGetLastError () != WSAEINTR)
        goto receive_error;
#else
513 514
      if (errno != EAGAIN && errno != EINTR)
        goto receive_error;
515
#endif
516 517 518
    } else
      break;
  }
Zeeshan Ali's avatar
Zeeshan Ali committed
519

520
  /* special case buffer so receivers can also track the address */
521
  outbuf = gst_netbuffer_new ();
522 523 524
  GST_BUFFER_MALLOCDATA (outbuf) = pktdata;

  /* patch pktdata and len when stripping off the headers */
525
  if (G_UNLIKELY (udpsrc->skip_first_bytes != 0)) {
526 527 528 529 530 531 532
    if (G_UNLIKELY (readsize <= udpsrc->skip_first_bytes))
      goto skip_error;

    pktdata += udpsrc->skip_first_bytes;
    ret -= udpsrc->skip_first_bytes;
  }
  GST_BUFFER_DATA (outbuf) = pktdata;
533
  GST_BUFFER_SIZE (outbuf) = ret;
534

535
  switch (sa.sa.sa_family) {
536 537
    case AF_INET:
    {
538 539
      gst_netaddress_set_ip4_address (&outbuf->from, sa.sa_in.sin_addr.s_addr,
          sa.sa_in.sin_port);
540 541 542 543 544 545
    }
      break;
    case AF_INET6:
    {
      guint8 ip6[16];

546 547
      memcpy (ip6, &sa.sa_in6.sin6_addr, sizeof (ip6));
      gst_netaddress_set_ip6_address (&outbuf->from, ip6, sa.sa_in6.sin6_port);
548 549 550
    }
      break;
    default:
551 552 553
#ifdef G_OS_WIN32
      WSASetLastError (WSAEAFNOSUPPORT);
#else
554
      errno = EAFNOSUPPORT;
555
#endif
556 557
      goto receive_error;
  }
558
  GST_LOG_OBJECT (udpsrc, "read %d bytes", (int) readsize);
559

560
  *buf = GST_BUFFER_CAST (outbuf);
Zeeshan Ali's avatar
Zeeshan Ali committed
561

562
  return GST_FLOW_OK;
Zeeshan Ali's avatar
Zeeshan Ali committed
563

564
  /* ERRORS */
Wim Taymans's avatar
Wim Taymans committed
565 566
select_error:
  {
567 568 569 570 571 572 573 574 575 576 577 578 579
    GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
        ("select error %d: %s (%d)", ret, g_strerror (errno), errno));
    return GST_FLOW_ERROR;
  }
stopped:
  {
    GST_DEBUG ("stop called");
    return GST_FLOW_WRONG_STATE;
  }
ioctl_failed:
  {
    GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
        ("ioctl failed %d: %s (%d)", ret, g_strerror (errno), errno));
580
    return GST_FLOW_ERROR;
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
581
  }
Wim Taymans's avatar
Wim Taymans committed
582 583
receive_error:
  {
584
    g_free (pktdata);
585 586 587 588
#ifdef G_OS_WIN32
    GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
        ("receive error %d (WSA error: %d)", ret, WSAGetLastError ()));
#else
589 590
    GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
        ("receive error %d: %s (%d)", ret, g_strerror (errno), errno));
591
#endif
592
    return GST_FLOW_ERROR;
593
  }
594 595 596 597 598 599
skip_error:
  {
    GST_ELEMENT_ERROR (udpsrc, STREAM, DECODE, (NULL),
        ("UDP buffer to small to skip header"));
    return GST_FLOW_ERROR;
  }
600 601 602 603 604
}

static gboolean
gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri)
{
Wim Taymans's avatar
Wim Taymans committed
605 606
  if (gst_udp_parse_uri (uri, &src->uri) < 0)
    goto wrong_uri;
607

Wim Taymans's avatar
Wim Taymans committed
608 609
  if (src->uri.port == -1)
    src->uri.port = UDP_DEFAULT_PORT;
610 611 612

  return TRUE;

613
  /* ERRORS */
Wim Taymans's avatar
Wim Taymans committed
614
wrong_uri:
615
  {
616
    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
Wim Taymans's avatar
Wim Taymans committed
617
        ("error parsing uri %s", uri));
618 619
    return FALSE;
  }
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
620 621 622
}

static void
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
623 624
gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
    GParamSpec * pspec)
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
625
{
626
  GstUDPSrc *udpsrc = GST_UDPSRC (object);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
627 628

  switch (prop_id) {
629 630
    case PROP_BUFFER_SIZE:
      udpsrc->buffer_size = g_value_get_int (value);
631
      break;
632
    case PROP_PORT:
Wim Taymans's avatar
Wim Taymans committed
633
      gst_udp_uri_update (&udpsrc->uri, NULL, g_value_get_int (value));
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
634
      break;
635
    case PROP_MULTICAST_GROUP:
Wim Taymans's avatar
Wim Taymans committed
636 637
    {
      const gchar *group;
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
638

Wim Taymans's avatar
Wim Taymans committed
639 640
      if ((group = g_value_get_string (value)))
        gst_udp_uri_update (&udpsrc->uri, group, -1);
641
      else
Wim Taymans's avatar
Wim Taymans committed
642
        gst_udp_uri_update (&udpsrc->uri, UDP_DEFAULT_MULTICAST_GROUP, -1);
643
      break;
Wim Taymans's avatar
Wim Taymans committed
644
    }
645 646 647 648 649 650 651 652
    case PROP_MULTICAST_IFACE:
      g_free (udpsrc->multi_iface);

      if (g_value_get_string (value) == NULL)
        udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
      else
        udpsrc->multi_iface = g_value_dup_string (value);
      break;
653 654
    case PROP_URI:
      gst_udpsrc_set_uri (udpsrc, g_value_get_string (value));
655
      break;
656 657 658
    case PROP_CAPS:
    {
      const GstCaps *new_caps_val = gst_value_get_caps (value);
659

660
      GstCaps *new_caps;
661

662 663 664 665 666 667 668 669 670 671
      GstCaps *old_caps;

      if (new_caps_val == NULL) {
        new_caps = gst_caps_new_any ();
      } else {
        new_caps = gst_caps_copy (new_caps_val);
      }

      old_caps = udpsrc->caps;
      udpsrc->caps = new_caps;
Wim Taymans's avatar
Wim Taymans committed
672 673
      if (old_caps)
        gst_caps_unref (old_caps);
674
      gst_pad_set_caps (GST_BASE_SRC (udpsrc)->srcpad, new_caps);
675 676
      break;
    }
677
    case PROP_SOCKFD:
678 679 680
      if (udpsrc->sockfd >= 0 && udpsrc->sockfd != udpsrc->sock.fd &&
          udpsrc->closefd)
        CLOSE_SOCKET (udpsrc->sockfd);
681 682
      udpsrc->sockfd = g_value_get_int (value);
      GST_DEBUG ("setting SOCKFD to %d", udpsrc->sockfd);
683
      break;
684 685 686
    case PROP_TIMEOUT:
      udpsrc->timeout = g_value_get_uint64 (value);
      break;
687 688 689
    case PROP_SKIP_FIRST_BYTES:
      udpsrc->skip_first_bytes = g_value_get_int (value);
      break;
690 691 692
    case PROP_CLOSEFD:
      udpsrc->closefd = g_value_get_boolean (value);
      break;
693 694 695
    case PROP_AUTO_MULTICAST:
      udpsrc->auto_multicast = g_value_get_boolean (value);
      break;
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
696 697 698 699 700 701
    default:
      break;
  }
}

static void
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
702 703
gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
    GParamSpec * pspec)
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
704
{
705
  GstUDPSrc *udpsrc = GST_UDPSRC (object);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
706 707

  switch (prop_id) {
708 709
    case PROP_BUFFER_SIZE:
      g_value_set_int (value, udpsrc->buffer_size);
710
      break;
711
    case PROP_PORT:
Wim Taymans's avatar
Wim Taymans committed
712
      g_value_set_int (value, udpsrc->uri.port);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
713
      break;
714
    case PROP_MULTICAST_GROUP:
Wim Taymans's avatar
Wim Taymans committed
715
      g_value_set_string (value, udpsrc->uri.host);
716
      break;
717 718 719
    case PROP_MULTICAST_IFACE:
      g_value_set_string (value, udpsrc->multi_iface);
      break;
720
    case PROP_URI:
Wim Taymans's avatar
Wim Taymans committed
721
      g_value_take_string (value, gst_udp_uri_string (&udpsrc->uri));
722
      break;
723 724 725
    case PROP_CAPS:
      gst_value_set_caps (value, udpsrc->caps);
      break;
726
    case PROP_SOCKFD:
727
      g_value_set_int (value, udpsrc->sockfd);
728
      break;
729 730 731
    case PROP_TIMEOUT:
      g_value_set_uint64 (value, udpsrc->timeout);
      break;
732 733 734
    case PROP_SKIP_FIRST_BYTES:
      g_value_set_int (value, udpsrc->skip_first_bytes);
      break;
735 736 737
    case PROP_CLOSEFD:
      g_value_set_boolean (value, udpsrc->closefd);
      break;
738
    case PROP_SOCK:
739
      g_value_set_int (value, udpsrc->sock.fd);
740
      break;
741 742 743
    case PROP_AUTO_MULTICAST:
      g_value_set_boolean (value, udpsrc->auto_multicast);
      break;
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
744 745 746 747 748 749
    default:
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
      break;
  }
}

750
/* create a socket for sending to remote machine */
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
751
static gboolean
752
gst_udpsrc_start (GstBaseSrc * bsrc)
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
753
{
754
  guint bc_val;
755
  guint err_val;
756
  gint reuse;
757
  int port;
758
  GstUDPSrc *src;
759
  gint ret;
760
  int rcvsize;
761 762 763
#ifdef G_OS_WIN32
  gint len;
#else
764
  guint len;
765
#endif
766
  src = GST_UDPSRC (bsrc);
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
767

768 769
  if (src->sockfd == -1) {
    /* need to allocate a socket */
Wim Taymans's avatar
Wim Taymans committed
770 771
    GST_DEBUG_OBJECT (src, "allocating socket for %s:%d", src->uri.host,
        src->uri.port);
772
    if ((ret =
Wim Taymans's avatar
Wim Taymans committed
773
            gst_udp_get_addr (src->uri.host, src->uri.port, &src->myaddr)) < 0)
774
      goto getaddrinfo_error;
775

776
    if ((ret = socket (src->myaddr.ss_family, SOCK_DGRAM, IPPROTO_UDP)) < 0)
777 778
      goto no_socket;

779
    src->sock.fd = ret;
780
    src->externalfd = FALSE;
781

782 783
    GST_DEBUG_OBJECT (src, "got socket %d", src->sock.fd);

784 785
    reuse = 1;
    if ((ret =
786
            setsockopt (src->sock.fd, SOL_SOCKET, SO_REUSEADDR, &reuse,
787 788 789
                sizeof (reuse))) < 0)
      goto setsockopt_error;

Wim Taymans's avatar
Wim Taymans committed
790
    GST_DEBUG_OBJECT (src, "binding on port %d", src->uri.port);
791

792
    len = gst_udp_get_sockaddr_length (&src->myaddr);
793
    if ((ret = bind (src->sock.fd, (struct sockaddr *) &src->myaddr, len)) < 0)
794
      goto bind_error;
795 796 797 798 799

    len = sizeof (src->myaddr);
    if ((ret = getsockname (src->sock.fd, (struct sockaddr *) &src->myaddr,
                &len)) < 0)
      goto getsockname_error;
800
  } else {
801 802 803 804 805 806 807 808
    GST_DEBUG_OBJECT (src, "using provided socket %d", src->sockfd);
    /* we use the configured socket, try to get some info about it */
    len = sizeof (src->myaddr);
    if ((ret =
            getsockname (src->sockfd, (struct sockaddr *) &src->myaddr,
                &len)) < 0)
      goto getsockname_error;

809
    src->sock.fd = src->sockfd;
810
    src->externalfd = TRUE;
811
  }
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
812

813
  len = sizeof (rcvsize);
814 815
  if (src->buffer_size != 0) {
    rcvsize = src->buffer_size;
816 817 818 819 820

    GST_DEBUG_OBJECT (src, "setting udp buffer of %d bytes", rcvsize);
    /* set buffer size, Note that on Linux this is typically limited to a
     * maximum of around 100K. Also a minimum of 128 bytes is required on
     * Linux. */
821 822 823
    ret =
        setsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize,
        len);
824 825 826 827 828
    if (ret != 0) {
      GST_ELEMENT_WARNING (src, RESOURCE, SETTINGS, (NULL),
          ("Could not create a buffer of requested %d bytes, %d: %s (%d)",
              rcvsize, ret, g_strerror (errno), errno));
    }
829 830
  }

831 832 833
  /* read the value of the receive buffer. Note that on linux this returns 2x the
   * value we set because the kernel allocates extra memory for metadata.
   * The default on Linux is about 100K (which is about 50K without metadata) */
834 835
  ret =
      getsockopt (src->sock.fd, SOL_SOCKET, SO_RCVBUF, (void *) &rcvsize, &len);
836 837 838 839 840
  if (ret == 0)
    GST_DEBUG_OBJECT (src, "have udp buffer of %d bytes", rcvsize);
  else
    GST_DEBUG_OBJECT (src, "could not get udp buffer size");

841
  bc_val = 1;
842
  if ((ret = setsockopt (src->sock.fd, SOL_SOCKET, SO_BROADCAST, &bc_val,