gstudpsrc.c 54 KB
Newer Older
1
/* GStreamer
2
 * Copyright (C) <2005> Wim Taymans <wim@fluendo.com>
3
 * Copyright (C) <2005> Nokia Corporation <kai.vehmanen@nokia.com>
Sebastian Dröge's avatar
Sebastian Dröge committed
4 5
 * Copyright (C) <2012> Collabora Ltd.
 *   Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
6 7
 * Copyright (C) 2014 Tim-Philipp Müller <tim@centricular.com>
 * Copyright (C) 2014 Centricular Ltd
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
8 9 10 11 12 13 14 15 16 17 18 19 20
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the
Tim-Philipp Müller's avatar
Tim-Philipp Müller committed
21 22
 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
 * Boston, MA 02110-1301, USA.
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
23 24
 */

Wim Taymans's avatar
Wim Taymans committed
25 26 27 28 29 30
/**
 * SECTION:element-udpsrc
 * @see_also: udpsink, multifdsink
 *
 * udpsrc is a network source that reads UDP packets from the network.
 * It can be combined with RTP depayloaders to implement RTP streaming.
31
 *
Wim Taymans's avatar
Wim Taymans committed
32
 * The udpsrc element supports automatic port allocation by setting the
33 34 35
 * #GstUDPSrc:port property to 0. After setting the udpsrc to PAUSED, the
 * allocated port can be obtained by reading the port property.
 *
36
 * udpsrc can read from multicast groups by setting the #GstUDPSrc:multicast-group
37 38
 * property to the IP address of the multicast group.
 *
39
 * Alternatively one can provide a custom socket to udpsrc with the #GstUDPSrc:socket
40 41 42 43
 * property, udpsrc will then not allocate a socket itself but use the provided
 * one.
 *
 * The #GstUDPSrc:caps property is mainly used to give a type to the UDP packet
44
 * so that they can be autoplugged in GStreamer pipelines. This is very useful
45 46 47
 * for RTP implementations where the contents of the UDP packets is transferred
 * out-of-band using SDP or other means.
 *
48 49
 * The #GstUDPSrc:buffer-size property is used to change the default kernel
 * buffersizes used for receiving packets. The buffer size may be increased for
50 51 52 53 54 55 56
 * high-volume connections, or may be decreased to limit the possible backlog of
 * incoming data. The system places an absolute limit on these values, on Linux,
 * for example, the default buffer size is typically 50K and can be increased to
 * maximally 100K.
 *
 * The #GstUDPSrc:skip-first-bytes property is used to strip off an arbitrary
 * number of bytes from the start of the raw udp packet and can be used to strip
57
 * off proprietary header, for example.
58 59
 *
 * The udpsrc is always a live source. It does however not provide a #GstClock,
60
 * this is left for downstream elements such as an RTP session manager or demuxer
61 62 63 64 65 66 67 68
 * (such as an MPEG demuxer). As with all live sources, the captured buffers
 * will have their timestamp set to the current running time of the pipeline.
 *
 * udpsrc implements a #GstURIHandler interface that handles udp://host:port
 * type URIs.
 *
 * If the #GstUDPSrc:timeout property is set to a value bigger than 0, udpsrc
 * will generate an element message named
69
 * <classname>&quot;GstUDPSrcTimeout&quot;</classname>
70 71 72 73 74 75
 * if no data was received in the given timeout.
 * The message's structure contains one field:
 * <itemizedlist>
 * <listitem>
 *   <para>
 *   #guint64
76 77
 *   <classname>&quot;timeout&quot;</classname>: the timeout in microseconds that
 *   expired when waiting for data.
78 79 80 81 82
 *   </para>
 * </listitem>
 * </itemizedlist>
 * The message is typically used to detect that no UDP arrives in the receiver
 * because it is blocked by a firewall.
83
 *
84
 * A custom file descriptor can be configured with the
85 86 87 88
 * #GstUDPSrc:socket property. The socket will be closed when setting
 * the element to READY by default. This behaviour can be overridden
 * with the #GstUDPSrc:close-socket property, in which case the
 * application is responsible for closing the file descriptor.
89 90 91 92
 *
 * <refsect2>
 * <title>Examples</title>
 * |[
93
 * gst-launch-1.0 -v udpsrc ! fakesink dump=1
94 95 96 97 98
 * ]| A pipeline to read from the default port and dump the udp packets.
 * To actually generate udp packets on the default port one can use the
 * udpsink element. When running the following pipeline in another terminal, the
 * above mentioned pipeline should dump data packets to the console.
 * |[
99
 * gst-launch-1.0 -v audiotestsrc ! udpsink
100 101
 * ]|
 * |[
102
 * gst-launch-1.0 -v udpsrc port=0 ! fakesink
103
 * ]| read udp packets from a free port.
Wim Taymans's avatar
Wim Taymans committed
104 105
 * </refsect2>
 */
106 107 108 109
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

110 111 112 113
/* Needed to get struct in6_pktinfo.
 * Also all these have to be before glib.h is included as
 * otherwise struct in6_pktinfo is not defined completely
 * due to broken glibc headers */
114
#define _GNU_SOURCE
115 116
/* Needed for OSX/iOS to define the IPv6 variants */
#define __APPLE_USE_RFC_3542
117
#include <sys/types.h>
118
#ifdef HAVE_SYS_SOCKET_H
119
#include <sys/socket.h>
120
#endif
121

122
#include <string.h>
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
123
#include "gstudpsrc.h"
124

Wim Taymans's avatar
Wim Taymans committed
125
#include <gst/net/gstnetaddressmeta.h>
126

127
#include <gio/gnetworking.h>
128

129 130
/* Required for other parts of in_pktinfo / in6_pktinfo but only
 * on non-Windows and can be included after glib.h */
131
#ifndef G_PLATFORM_WIN32
132 133 134
#include <netinet/ip.h>
#endif

135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
/* Control messages for getting the destination address */
#ifdef IP_PKTINFO
/* GSocketControlMessage subclass that carries the fields of a received
 * IP_PKTINFO (IPv4) ancillary message. */
GType gst_ip_pktinfo_message_get_type (void);

#define GST_TYPE_IP_PKTINFO_MESSAGE         (gst_ip_pktinfo_message_get_type ())
#define GST_IP_PKTINFO_MESSAGE(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), GST_TYPE_IP_PKTINFO_MESSAGE, GstIPPktinfoMessage))
#define GST_IP_PKTINFO_MESSAGE_CLASS(c)     (G_TYPE_CHECK_CLASS_CAST ((c), GST_TYPE_IP_PKTINFO_MESSAGE, GstIPPktinfoMessageClass))
#define GST_IS_IP_PKTINFO_MESSAGE(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), GST_TYPE_IP_PKTINFO_MESSAGE))
#define GST_IS_IP_PKTINFO_MESSAGE_CLASS(c)  (G_TYPE_CHECK_CLASS_TYPE ((c), GST_TYPE_IP_PKTINFO_MESSAGE))
#define GST_IP_PKTINFO_MESSAGE_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), GST_TYPE_IP_PKTINFO_MESSAGE, GstIPPktinfoMessageClass))

typedef struct _GstIPPktinfoMessage GstIPPktinfoMessage;
typedef struct _GstIPPktinfoMessageClass GstIPPktinfoMessageClass;

struct _GstIPPktinfoMessageClass
{
  GSocketControlMessageClass parent_class;
};

struct _GstIPPktinfoMessage
{
  GSocketControlMessage parent;

  /* filled from in_pktinfo.ipi_ifindex by the deserializer */
  guint ifindex;
  /* ipi_spec_dst is only mirrored on platforms other than Windows and
   * NetBSD */
#if !defined (G_PLATFORM_WIN32) && !defined (__NetBSD__)
  struct in_addr spec_dst;
#endif
  /* filled from in_pktinfo.ipi_addr by the deserializer */
  struct in_addr addr;
};

G_DEFINE_TYPE (GstIPPktinfoMessage, gst_ip_pktinfo_message,
    G_TYPE_SOCKET_CONTROL_MESSAGE);

/* get_size vfunc: the payload of an IP_PKTINFO control message is one
 * struct in_pktinfo, independent of the instance. */
static gsize
gst_ip_pktinfo_message_get_size (GSocketControlMessage * message G_GNUC_UNUSED)
{
  return sizeof (struct in_pktinfo);
}

/* get_level vfunc: IP_PKTINFO messages live at the IPPROTO_IP level. */
static int
gst_ip_pktinfo_message_get_level (GSocketControlMessage * message G_GNUC_UNUSED)
{
  return IPPROTO_IP;
}

/* get_type vfunc: the control message type handled here is IP_PKTINFO. */
static int
gst_ip_pktinfo_message_get_msg_type (GSocketControlMessage *
    message G_GNUC_UNUSED)
{
  return IP_PKTINFO;
}

/* deserialize vfunc for IP_PKTINFO.
 *
 * @level, @type: level and type of the received control message
 * @size: number of bytes available at @data
 * @data: raw control message payload
 *
 * Returns a new GstIPPktinfoMessage holding the interface index and
 * address(es) copied out of @data, or NULL when the message is not an
 * IPPROTO_IP / IP_PKTINFO one or is shorter than struct in_pktinfo. */
static GSocketControlMessage *
gst_ip_pktinfo_message_deserialize (gint level,
    gint type, gsize size, gpointer data)
{
  const struct in_pktinfo *info = data;
  GstIPPktinfoMessage *msg;

  /* not ours, or truncated: decline without touching the payload */
  if (level != IPPROTO_IP || type != IP_PKTINFO
      || size < sizeof (struct in_pktinfo))
    return NULL;

  msg = g_object_new (GST_TYPE_IP_PKTINFO_MESSAGE, NULL);
  msg->ifindex = info->ipi_ifindex;
#if !defined (G_PLATFORM_WIN32) && !defined (__NetBSD__)
  msg->spec_dst = info->ipi_spec_dst;
#endif
  msg->addr = info->ipi_addr;

  return G_SOCKET_CONTROL_MESSAGE (msg);
}

/* Instance init: no per-instance setup is performed here. */
static void
gst_ip_pktinfo_message_init (GstIPPktinfoMessage * message G_GNUC_UNUSED)
{
}

/* Class init: hook the IP_PKTINFO implementations into the
 * GSocketControlMessage vtable. */
static void
gst_ip_pktinfo_message_class_init (GstIPPktinfoMessageClass * class)
{
  GSocketControlMessageClass *vtable = G_SOCKET_CONTROL_MESSAGE_CLASS (class);

  vtable->get_size = gst_ip_pktinfo_message_get_size;
  vtable->get_level = gst_ip_pktinfo_message_get_level;
  vtable->get_type = gst_ip_pktinfo_message_get_msg_type;
  vtable->deserialize = gst_ip_pktinfo_message_deserialize;
}
#endif

#ifdef IPV6_PKTINFO
/* GSocketControlMessage subclass that carries the fields of a received
 * IPV6_PKTINFO ancillary message. */
GType gst_ipv6_pktinfo_message_get_type (void);

/* Standard GObject cast / type-check convenience macros for the type */
#define GST_TYPE_IPV6_PKTINFO_MESSAGE         (gst_ipv6_pktinfo_message_get_type ())
#define GST_IPV6_PKTINFO_MESSAGE(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), GST_TYPE_IPV6_PKTINFO_MESSAGE, GstIPV6PktinfoMessage))
#define GST_IPV6_PKTINFO_MESSAGE_CLASS(c)     (G_TYPE_CHECK_CLASS_CAST ((c), GST_TYPE_IPV6_PKTINFO_MESSAGE, GstIPV6PktinfoMessageClass))
#define GST_IS_IPV6_PKTINFO_MESSAGE(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), GST_TYPE_IPV6_PKTINFO_MESSAGE))
#define GST_IS_IPV6_PKTINFO_MESSAGE_CLASS(c)  (G_TYPE_CHECK_CLASS_TYPE ((c), GST_TYPE_IPV6_PKTINFO_MESSAGE))
#define GST_IPV6_PKTINFO_MESSAGE_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), GST_TYPE_IPV6_PKTINFO_MESSAGE, GstIPV6PktinfoMessageClass))

typedef struct _GstIPV6PktinfoMessage GstIPV6PktinfoMessage;
typedef struct _GstIPV6PktinfoMessageClass GstIPV6PktinfoMessageClass;

/* Class structure: adds nothing to the parent class */
struct _GstIPV6PktinfoMessageClass
{
  GSocketControlMessageClass parent_class;

};

/* Instance structure */
struct _GstIPV6PktinfoMessage
{
  GSocketControlMessage parent;

  /* filled from in6_pktinfo.ipi6_ifindex by the deserializer */
  guint ifindex;
  /* filled from in6_pktinfo.ipi6_addr by the deserializer */
  struct in6_addr addr;
};

G_DEFINE_TYPE (GstIPV6PktinfoMessage, gst_ipv6_pktinfo_message,
    G_TYPE_SOCKET_CONTROL_MESSAGE);

/* get_size vfunc: the payload of an IPV6_PKTINFO control message is one
 * struct in6_pktinfo, independent of the instance. */
static gsize
gst_ipv6_pktinfo_message_get_size (GSocketControlMessage *
    message G_GNUC_UNUSED)
{
  return sizeof (struct in6_pktinfo);
}

/* get_level vfunc: IPV6_PKTINFO messages live at the IPPROTO_IPV6 level. */
static int
gst_ipv6_pktinfo_message_get_level (GSocketControlMessage *
    message G_GNUC_UNUSED)
{
  return IPPROTO_IPV6;
}

/* get_type vfunc: the control message type handled here is IPV6_PKTINFO. */
static int
gst_ipv6_pktinfo_message_get_msg_type (GSocketControlMessage *
    message G_GNUC_UNUSED)
{
  return IPV6_PKTINFO;
}

/* deserialize vfunc for IPV6_PKTINFO.
 *
 * @level, @type: level and type of the received control message
 * @size: number of bytes available at @data
 * @data: raw control message payload
 *
 * Returns a new GstIPV6PktinfoMessage holding the interface index and
 * address copied out of @data, or NULL when the message is not an
 * IPPROTO_IPV6 / IPV6_PKTINFO one or is shorter than struct in6_pktinfo. */
static GSocketControlMessage *
gst_ipv6_pktinfo_message_deserialize (gint level,
    gint type, gsize size, gpointer data)
{
  const struct in6_pktinfo *info = data;
  GstIPV6PktinfoMessage *msg;

  /* not ours, or truncated: decline without touching the payload */
  if (level != IPPROTO_IPV6 || type != IPV6_PKTINFO
      || size < sizeof (struct in6_pktinfo))
    return NULL;

  msg = g_object_new (GST_TYPE_IPV6_PKTINFO_MESSAGE, NULL);
  msg->ifindex = info->ipi6_ifindex;
  msg->addr = info->ipi6_addr;

  return G_SOCKET_CONTROL_MESSAGE (msg);
}

/* Instance init: no per-instance setup is performed here. */
static void
gst_ipv6_pktinfo_message_init (GstIPV6PktinfoMessage * message G_GNUC_UNUSED)
{
}

/* Class init: hook the IPV6_PKTINFO implementations into the
 * GSocketControlMessage vtable. */
static void
gst_ipv6_pktinfo_message_class_init (GstIPV6PktinfoMessageClass * class)
{
  GSocketControlMessageClass *vtable = G_SOCKET_CONTROL_MESSAGE_CLASS (class);

  vtable->get_size = gst_ipv6_pktinfo_message_get_size;
  vtable->get_level = gst_ipv6_pktinfo_message_get_level;
  vtable->get_type = gst_ipv6_pktinfo_message_get_msg_type;
  vtable->deserialize = gst_ipv6_pktinfo_message_deserialize;
}

#endif

#ifdef IP_RECVDSTADDR
/* GSocketControlMessage subclass that carries the destination address of a
 * received IP_RECVDSTADDR ancillary message. */
GType gst_ip_recvdstaddr_message_get_type (void);

/* Standard GObject cast / type-check convenience macros for the type */
#define GST_TYPE_IP_RECVDSTADDR_MESSAGE         (gst_ip_recvdstaddr_message_get_type ())
#define GST_IP_RECVDSTADDR_MESSAGE(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), GST_TYPE_IP_RECVDSTADDR_MESSAGE, GstIPRecvdstaddrMessage))
#define GST_IP_RECVDSTADDR_MESSAGE_CLASS(c)     (G_TYPE_CHECK_CLASS_CAST ((c), GST_TYPE_IP_RECVDSTADDR_MESSAGE, GstIPRecvdstaddrMessageClass))
#define GST_IS_IP_RECVDSTADDR_MESSAGE(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), GST_TYPE_IP_RECVDSTADDR_MESSAGE))
#define GST_IS_IP_RECVDSTADDR_MESSAGE_CLASS(c)  (G_TYPE_CHECK_CLASS_TYPE ((c), GST_TYPE_IP_RECVDSTADDR_MESSAGE))
#define GST_IP_RECVDSTADDR_MESSAGE_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), GST_TYPE_IP_RECVDSTADDR_MESSAGE, GstIPRecvdstaddrMessageClass))

typedef struct _GstIPRecvdstaddrMessage GstIPRecvdstaddrMessage;
typedef struct _GstIPRecvdstaddrMessageClass GstIPRecvdstaddrMessageClass;

/* Class structure: adds nothing to the parent class */
struct _GstIPRecvdstaddrMessageClass
{
  GSocketControlMessageClass parent_class;

};

/* Instance structure */
struct _GstIPRecvdstaddrMessage
{
  GSocketControlMessage parent;

  /* NOTE(review): the deserializer in this file never assigns ifindex */
  guint ifindex;
  /* copy of the struct in_addr payload of the control message */
  struct in_addr addr;
};

G_DEFINE_TYPE (GstIPRecvdstaddrMessage, gst_ip_recvdstaddr_message,
    G_TYPE_SOCKET_CONTROL_MESSAGE);

/* get_size vfunc: the payload of an IP_RECVDSTADDR control message is one
 * struct in_addr, independent of the instance. */
static gsize
gst_ip_recvdstaddr_message_get_size (GSocketControlMessage *
    message G_GNUC_UNUSED)
{
  return sizeof (struct in_addr);
}

/* get_level vfunc: IP_RECVDSTADDR messages live at the IPPROTO_IP level. */
static int
gst_ip_recvdstaddr_message_get_level (GSocketControlMessage *
    message G_GNUC_UNUSED)
{
  return IPPROTO_IP;
}

/* get_type vfunc: the control message type handled here is
 * IP_RECVDSTADDR. */
static int
gst_ip_recvdstaddr_message_get_msg_type (GSocketControlMessage *
    message G_GNUC_UNUSED)
{
  return IP_RECVDSTADDR;
}

/* deserialize vfunc for IP_RECVDSTADDR.
 *
 * @level, @type: level and type of the received control message
 * @size: number of bytes available at @data
 * @data: raw control message payload
 *
 * Returns a new GstIPRecvdstaddrMessage holding the address copied out of
 * @data, or NULL when the message is not an IPPROTO_IP / IP_RECVDSTADDR one
 * or is shorter than struct in_addr. */
static GSocketControlMessage *
gst_ip_recvdstaddr_message_deserialize (gint level,
    gint type, gsize size, gpointer data)
{
  const struct in_addr *dst = data;
  GstIPRecvdstaddrMessage *msg;

  /* not ours, or truncated: decline without touching the payload */
  if (level != IPPROTO_IP || type != IP_RECVDSTADDR
      || size < sizeof (struct in_addr))
    return NULL;

  msg = g_object_new (GST_TYPE_IP_RECVDSTADDR_MESSAGE, NULL);
  msg->addr = *dst;

  return G_SOCKET_CONTROL_MESSAGE (msg);
}

/* Instance init: no per-instance setup is performed here. */
static void
gst_ip_recvdstaddr_message_init (GstIPRecvdstaddrMessage *
    message G_GNUC_UNUSED)
{
}

/* Class init: hook the IP_RECVDSTADDR implementations into the
 * GSocketControlMessage vtable. */
static void
gst_ip_recvdstaddr_message_class_init (GstIPRecvdstaddrMessageClass * class)
{
  GSocketControlMessageClass *vtable = G_SOCKET_CONTROL_MESSAGE_CLASS (class);

  vtable->get_size = gst_ip_recvdstaddr_message_get_size;
  vtable->get_level = gst_ip_recvdstaddr_message_get_level;
  vtable->get_type = gst_ip_recvdstaddr_message_get_msg_type;
  vtable->deserialize = gst_ip_recvdstaddr_message_deserialize;
}
#endif

410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446
/* GstBaseSrc::decide_allocation implementation.
 *
 * Creates a fresh buffer pool whose buffer size is the configured mtu and
 * stores it in @query: it replaces pool 0 when the query already lists at
 * least one pool, otherwise it is appended. Always returns TRUE. */
static gboolean
gst_udpsrc_decide_allocation (GstBaseSrc * bsrc, GstQuery * query)
{
  GstUDPSrc *self = GST_UDPSRC (bsrc);
  GstCaps *caps = NULL;
  GstBufferPool *pool;
  GstStructure *config;
  gboolean have_pool;

  /* remember whether slot 0 exists before we start building our pool */
  have_pool = gst_query_get_n_allocation_pools (query) > 0;

  pool = gst_buffer_pool_new ();
  config = gst_buffer_pool_get_config (pool);
  gst_query_parse_allocation (query, &caps, NULL);
  gst_buffer_pool_config_set_params (config, caps, self->mtu, 0, 0);
  gst_buffer_pool_set_config (pool, config);

  if (have_pool) {
    gst_query_set_nth_allocation_pool (query, 0, pool, self->mtu, 0, 0);
  } else {
    gst_query_add_allocation_pool (query, pool, self->mtu, 0, 0);
  }

  /* drop the reference obtained from gst_buffer_pool_new() */
  gst_object_unref (pool);

  return TRUE;
}

447 448 449
/* not 100% correct, but a good upper bound for memory allocation purposes */
#define MAX_IPV4_UDP_PACKET_SIZE (65536 - 8)

450
GST_DEBUG_CATEGORY_STATIC (udpsrc_debug);
451 452
#define GST_CAT_DEFAULT (udpsrc_debug)

Wim Taymans's avatar
Wim Taymans committed
453 454 455 456 457
/* Template for the single, always-present source pad; it accepts any caps */
static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);

458
/* Default values for the element properties */
#define UDP_DEFAULT_PORT                5004
#define UDP_DEFAULT_MULTICAST_GROUP     "0.0.0.0"
#define UDP_DEFAULT_MULTICAST_IFACE     NULL
/* expands to "udp://0.0.0.0:5004" with the defaults above */
#define UDP_DEFAULT_URI                 "udp://"UDP_DEFAULT_MULTICAST_GROUP":"G_STRINGIFY(UDP_DEFAULT_PORT)
#define UDP_DEFAULT_CAPS                NULL
#define UDP_DEFAULT_SOCKET              NULL
#define UDP_DEFAULT_BUFFER_SIZE         0
#define UDP_DEFAULT_TIMEOUT             0
#define UDP_DEFAULT_SKIP_FIRST_BYTES    0
#define UDP_DEFAULT_CLOSE_SOCKET        TRUE
#define UDP_DEFAULT_USED_SOCKET         NULL
#define UDP_DEFAULT_AUTO_MULTICAST      TRUE
#define UDP_DEFAULT_REUSE               TRUE
#define UDP_DEFAULT_LOOP                TRUE
#define UDP_DEFAULT_RETRIEVE_SENDER_ADDRESS TRUE
#define UDP_DEFAULT_MTU                 (1492)
474

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
475 476
/* GObject property ids; the order defines the numeric ids, so new entries
 * go at the end. */
enum
{
  PROP_0,

  PROP_PORT,                    /* "port" */
  PROP_MULTICAST_GROUP,         /* "multicast-group" */
  PROP_MULTICAST_IFACE,         /* "multicast-iface" */
  PROP_URI,                     /* "uri" */
  PROP_CAPS,                    /* "caps" */
  PROP_SOCKET,                  /* "socket" */
  PROP_BUFFER_SIZE,             /* "buffer-size" */
  PROP_TIMEOUT,                 /* "timeout" */
  PROP_SKIP_FIRST_BYTES,        /* "skip-first-bytes" */
  PROP_CLOSE_SOCKET,            /* "close-socket" */
  PROP_USED_SOCKET,             /* "used-socket" */
  PROP_AUTO_MULTICAST,          /* "auto-multicast" */
  PROP_REUSE,                   /* "reuse" */
  PROP_ADDRESS,                 /* "address" */
  PROP_LOOP,                    /* "loop" */
  PROP_RETRIEVE_SENDER_ADDRESS, /* "retrieve-sender-address" */
  PROP_MTU,                     /* "mtu" */
};

498 499
static void gst_udpsrc_uri_handler_init (gpointer g_iface, gpointer iface_data);

Wim Taymans's avatar
Wim Taymans committed
500
static GstCaps *gst_udpsrc_getcaps (GstBaseSrc * src, GstCaps * filter);
501
static gboolean gst_udpsrc_close (GstUDPSrc * src);
502
static gboolean gst_udpsrc_unlock (GstBaseSrc * bsrc);
503
static gboolean gst_udpsrc_unlock_stop (GstBaseSrc * bsrc);
504
static GstFlowReturn gst_udpsrc_fill (GstPushSrc * psrc, GstBuffer * outbuf);
505

506
static void gst_udpsrc_finalize (GObject * object);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
507

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
508 509 510 511
static void gst_udpsrc_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_udpsrc_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
512

513 514 515
static GstStateChangeReturn gst_udpsrc_change_state (GstElement * element,
    GstStateChange transition);

Wim Taymans's avatar
Wim Taymans committed
516 517 518
/* Register GstUDPSrc as a GstPushSrc subclass implementing GstURIHandler.
 * The alias lets the code below refer to the generated parent class
 * pointer simply as parent_class. */
#define gst_udpsrc_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstUDPSrc, gst_udpsrc, GST_TYPE_PUSH_SRC,
    G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_udpsrc_uri_handler_init));
Benjamin Otte's avatar
Benjamin Otte committed
519

Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
520
static void
521
gst_udpsrc_class_init (GstUDPSrcClass * klass)
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
522 523
{
  GObjectClass *gobject_class;
Wim Taymans's avatar
Wim Taymans committed
524
  GstElementClass *gstelement_class;
525 526
  GstBaseSrcClass *gstbasesrc_class;
  GstPushSrcClass *gstpushsrc_class;
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
527

Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
528
  gobject_class = (GObjectClass *) klass;
Wim Taymans's avatar
Wim Taymans committed
529
  gstelement_class = (GstElementClass *) klass;
530 531
  gstbasesrc_class = (GstBaseSrcClass *) klass;
  gstpushsrc_class = (GstPushSrcClass *) klass;
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
532

Wim Taymans's avatar
Wim Taymans committed
533 534
  GST_DEBUG_CATEGORY_INIT (udpsrc_debug, "udpsrc", 0, "UDP src");

535 536 537 538 539 540 541 542 543 544
#ifdef IP_PKTINFO
  GST_TYPE_IP_PKTINFO_MESSAGE;
#endif
#ifdef IPV6_PKTINFO
  GST_TYPE_IPV6_PKTINFO_MESSAGE;
#endif
#ifdef IP_RECVDSTADDR
  GST_TYPE_IP_RECVDSTADDR_MESSAGE;
#endif

Wim Taymans's avatar
Wim Taymans committed
545 546
  gobject_class->set_property = gst_udpsrc_set_property;
  gobject_class->get_property = gst_udpsrc_get_property;
547
  gobject_class->finalize = gst_udpsrc_finalize;
Wim Taymans's avatar
Wim Taymans committed
548

549
  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_PORT,
550
      g_param_spec_int ("port", "Port",
Wim Taymans's avatar
Wim Taymans committed
551
          "The port to receive the packets from, 0=allocate", 0, G_MAXUINT16,
552
          UDP_DEFAULT_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
553
  /* FIXME 2.0: Remove multicast-group property */
554
#ifndef GST_REMOVE_DEPRECATED
555
  g_object_class_install_property (gobject_class, PROP_MULTICAST_GROUP,
556
      g_param_spec_string ("multicast-group", "Multicast Group",
557 558 559 560
          "The Address of multicast group to join. (DEPRECATED: "
          "Use address property instead)", UDP_DEFAULT_MULTICAST_GROUP,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS | G_PARAM_DEPRECATED));
#endif
561
  g_object_class_install_property (gobject_class, PROP_MULTICAST_IFACE,
562
      g_param_spec_string ("multicast-iface", "Multicast Interface",
563 564
          "The network interface on which to join the multicast group."
          "This allows multiple interfaces seperated by comma. (\"eth0,eth1\")",
565 566
          UDP_DEFAULT_MULTICAST_IFACE,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
567 568
  g_object_class_install_property (gobject_class, PROP_URI,
      g_param_spec_string ("uri", "URI",
569
          "URI in the form of udp://multicast_group:port", UDP_DEFAULT_URI,
570
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
571 572
  g_object_class_install_property (gobject_class, PROP_CAPS,
      g_param_spec_boxed ("caps", "Caps",
573 574
          "The caps of the source pad", GST_TYPE_CAPS,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
Sebastian Dröge's avatar
Sebastian Dröge committed
575 576 577 578
  g_object_class_install_property (gobject_class, PROP_SOCKET,
      g_param_spec_object ("socket", "Socket",
          "Socket to use for UDP reception. (NULL == allocate)",
          G_TYPE_SOCKET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
579 580 581
  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BUFFER_SIZE,
      g_param_spec_int ("buffer-size", "Buffer Size",
          "Size of the kernel receive buffer in bytes, 0=default", 0, G_MAXINT,
582
          UDP_DEFAULT_BUFFER_SIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
583 584
  g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TIMEOUT,
      g_param_spec_uint64 ("timeout", "Timeout",
Sebastian Dröge's avatar
Sebastian Dröge committed
585
          "Post a message after timeout nanoseconds (0 = disabled)", 0,
586 587
          G_MAXUINT64, UDP_DEFAULT_TIMEOUT,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
588 589 590
  g_object_class_install_property (G_OBJECT_CLASS (klass),
      PROP_SKIP_FIRST_BYTES, g_param_spec_int ("skip-first-bytes",
          "Skip first bytes", "number of bytes to skip for each udp packet", 0,
591 592
          G_MAXINT, UDP_DEFAULT_SKIP_FIRST_BYTES,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
Sebastian Dröge's avatar
Sebastian Dröge committed
593 594 595 596 597 598 599 600 601
  g_object_class_install_property (gobject_class, PROP_CLOSE_SOCKET,
      g_param_spec_boolean ("close-socket", "Close socket",
          "Close socket if passed as property on state change",
          UDP_DEFAULT_CLOSE_SOCKET,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
  g_object_class_install_property (gobject_class, PROP_USED_SOCKET,
      g_param_spec_object ("used-socket", "Socket Handle",
          "Socket currently in use for UDP reception. (NULL = no socket)",
          G_TYPE_SOCKET, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
602 603 604
  g_object_class_install_property (gobject_class, PROP_AUTO_MULTICAST,
      g_param_spec_boolean ("auto-multicast", "Auto Multicast",
          "Automatically join/leave multicast groups",
605 606
          UDP_DEFAULT_AUTO_MULTICAST,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
607 608
  g_object_class_install_property (gobject_class, PROP_REUSE,
      g_param_spec_boolean ("reuse", "Reuse", "Enable reuse of the port",
609
          UDP_DEFAULT_REUSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
610 611 612 613
  g_object_class_install_property (gobject_class, PROP_ADDRESS,
      g_param_spec_string ("address", "Address",
          "Address to receive packets for. This is equivalent to the "
          "multicast-group property for now", UDP_DEFAULT_MULTICAST_GROUP,
614
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
615 616 617 618 619
  /**
   * GstUDPSrc::loop:
   *
   * Can be used to disable multicast loopback.
   *
620
   * Since: 1.8
621
   */
622 623 624 625 626
  g_object_class_install_property (gobject_class, PROP_LOOP,
      g_param_spec_boolean ("loop", "Multicast Loopback",
          "Used for setting the multicast loop parameter. TRUE = enable,"
          " FALSE = disable", UDP_DEFAULT_LOOP,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642
  /**
   * GstUDPSrc::retrieve-sender-address:
   *
   * Whether to retrieve the sender address and add it to the buffers as
   * meta. Disabling this might result in minor performance improvements
   * in certain scenarios.
   *
   * Since: 1.10
   */
  g_object_class_install_property (gobject_class, PROP_RETRIEVE_SENDER_ADDRESS,
      g_param_spec_boolean ("retrieve-sender-address",
          "Retrieve Sender Address",
          "Whether to retrieve the sender address and add it to buffers as "
          "meta. Disabling this might result in minor performance improvements "
          "in certain scenarios", UDP_DEFAULT_RETRIEVE_SENDER_ADDRESS,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660
  /**
   * GstUDPSrc::mtu:
   *
   * Maximum expected packet size. This directly defines the allocation
   * size of the receive buffer pool.
   *
   * In case more data is received, a new #GstMemory is appended to the
   * output buffer, ensuring no data is lost, this however leads to that
   * buffer being freed and reallocated.
   *
   * Since: 1.14
   */
  g_object_class_install_property (gobject_class, PROP_MTU,
      g_param_spec_uint ("mtu", "Expected Maximum Transmission Unit",
          "Maximum expected packet size. This directly defines the allocation"
          "size of the receive buffer pool.",
          0, G_MAXINT, UDP_DEFAULT_MTU,
          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
661

662
  gst_element_class_add_static_pad_template (gstelement_class, &src_template);
Wim Taymans's avatar
Wim Taymans committed
663

664 665
  gst_element_class_set_static_metadata (gstelement_class,
      "UDP packet receiver", "Source/Network",
Wim Taymans's avatar
Wim Taymans committed
666 667 668 669
      "Receive data over the network via UDP",
      "Wim Taymans <wim@fluendo.com>, "
      "Thijs Vermeir <thijs.vermeir@barco.com>");

670 671
  gstelement_class->change_state = gst_udpsrc_change_state;

672
  gstbasesrc_class->unlock = gst_udpsrc_unlock;
673
  gstbasesrc_class->unlock_stop = gst_udpsrc_unlock_stop;
674
  gstbasesrc_class->get_caps = gst_udpsrc_getcaps;
675
  gstbasesrc_class->decide_allocation = gst_udpsrc_decide_allocation;
676

677
  gstpushsrc_class->fill = gst_udpsrc_fill;
Zeeshan Ali's avatar
Zeeshan Ali committed
678
}
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
679 680

static void
Wim Taymans's avatar
Wim Taymans committed
681
gst_udpsrc_init (GstUDPSrc * udpsrc)
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
682
{
Sebastian Dröge's avatar
Sebastian Dröge committed
683 684
  udpsrc->uri =
      g_strdup_printf ("udp://%s:%u", UDP_DEFAULT_MULTICAST_GROUP,
Wim Taymans's avatar
Wim Taymans committed
685 686
      UDP_DEFAULT_PORT);

687
  udpsrc->address = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
Sebastian Dröge's avatar
Sebastian Dröge committed
688 689
  udpsrc->port = UDP_DEFAULT_PORT;
  udpsrc->socket = UDP_DEFAULT_SOCKET;
690
  udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
691
  udpsrc->buffer_size = UDP_DEFAULT_BUFFER_SIZE;
692
  udpsrc->timeout = UDP_DEFAULT_TIMEOUT;
693
  udpsrc->skip_first_bytes = UDP_DEFAULT_SKIP_FIRST_BYTES;
Sebastian Dröge's avatar
Sebastian Dröge committed
694 695
  udpsrc->close_socket = UDP_DEFAULT_CLOSE_SOCKET;
  udpsrc->external_socket = (udpsrc->socket != NULL);
696
  udpsrc->auto_multicast = UDP_DEFAULT_AUTO_MULTICAST;
Sebastian Dröge's avatar
Sebastian Dröge committed
697
  udpsrc->used_socket = UDP_DEFAULT_USED_SOCKET;
698
  udpsrc->reuse = UDP_DEFAULT_REUSE;
699
  udpsrc->loop = UDP_DEFAULT_LOOP;
700
  udpsrc->retrieve_sender_address = UDP_DEFAULT_RETRIEVE_SENDER_ADDRESS;
701
  udpsrc->mtu = UDP_DEFAULT_MTU;
702 703 704 705

  /* configure basesrc to be a live source */
  gst_base_src_set_live (GST_BASE_SRC (udpsrc), TRUE);
  /* make basesrc output a segment in time */
706
  gst_base_src_set_format (GST_BASE_SRC (udpsrc), GST_FORMAT_TIME);
707 708
  /* make basesrc set timestamps on outgoing buffers based on the running_time
   * when they were captured */
709
  gst_base_src_set_do_timestamp (GST_BASE_SRC (udpsrc), TRUE);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
710 711
}

712 713 714 715 716 717 718 719 720
/* GObject::finalize: releases the caps, strings, sockets and spare memory
 * still held by the instance, then chains up to the parent class. */
static void
gst_udpsrc_finalize (GObject * object)
{
  GstUDPSrc *udpsrc = GST_UDPSRC (object);

  /* unrefs the current caps (if any) and leaves the field NULL */
  gst_caps_replace (&udpsrc->caps, NULL);

  g_free (udpsrc->multi_iface);
  udpsrc->multi_iface = NULL;

  g_free (udpsrc->uri);
  udpsrc->uri = NULL;

  g_free (udpsrc->address);
  udpsrc->address = NULL;

  /* g_clear_object() is a no-op on NULL and resets the pointer */
  g_clear_object (&udpsrc->socket);
  g_clear_object (&udpsrc->used_socket);

  if (udpsrc->extra_mem != NULL) {
    gst_memory_unref (udpsrc->extra_mem);
    udpsrc->extra_mem = NULL;
  }

  G_OBJECT_CLASS (parent_class)->finalize (object);
}

747
static GstCaps *
Wim Taymans's avatar
Wim Taymans committed
748
gst_udpsrc_getcaps (GstBaseSrc * src, GstCaps * filter)
749 750
{
  GstUDPSrc *udpsrc;
Wim Taymans's avatar
Wim Taymans committed
751
  GstCaps *caps, *result;
752 753 754

  udpsrc = GST_UDPSRC (src);

Wim Taymans's avatar
Wim Taymans committed
755 756 757 758 759 760 761 762 763 764 765 766
  GST_OBJECT_LOCK (src);
  if ((caps = udpsrc->caps))
    gst_caps_ref (caps);
  GST_OBJECT_UNLOCK (src);

  if (caps) {
    if (filter) {
      result = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
      gst_caps_unref (caps);
    } else {
      result = caps;
    }
Sebastian Dröge's avatar
Sebastian Dröge committed
767
  } else {
Wim Taymans's avatar
Wim Taymans committed
768
    result = (filter) ? gst_caps_ref (filter) : gst_caps_new_any ();
Sebastian Dröge's avatar
Sebastian Dröge committed
769
  }
Wim Taymans's avatar
Wim Taymans committed
770
  return result;
771 772
}

773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792
/* Installs a fresh GCancellable on @src and asks it to prepare a pollfd,
 * remembering in made_cancel_fd whether that succeeded so the fd can be
 * released again later. The pollfd itself is not used any further here. */
static void
gst_udpsrc_create_cancellable (GstUDPSrc * src)
{
  GCancellable *cancellable = g_cancellable_new ();
  GPollFD scratch_pollfd;

  src->cancellable = cancellable;
  src->made_cancel_fd =
      g_cancellable_make_pollfd (cancellable, &scratch_pollfd);
}

/* Tears down the GCancellable held by @src: gives back the cancel fd if
 * one was made, drops the reference and clears the field. */
static void
gst_udpsrc_free_cancellable (GstUDPSrc * src)
{
  GCancellable *cancellable = src->cancellable;

  if (src->made_cancel_fd) {
    g_cancellable_release_fd (cancellable);
    src->made_cancel_fd = FALSE;
  }

  g_object_unref (cancellable);
  src->cancellable = NULL;
}

793
static GstFlowReturn
794
gst_udpsrc_fill (GstPushSrc * psrc, GstBuffer * outbuf)
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
795 796
{
  GstUDPSrc *udpsrc;
Sebastian Dröge's avatar
Sebastian Dröge committed
797
  GSocketAddress *saddr = NULL;
798
  GSocketAddress **p_saddr;
799
  gint flags = G_SOCKET_MSG_NONE;
800
  gboolean try_again;
Sebastian Dröge's avatar
Sebastian Dröge committed
801
  GError *err = NULL;
802 803
  gssize res;
  gsize offset;
804
  GSocketControlMessage **msgs = NULL;
805
  GSocketControlMessage ***p_msgs;
806
  gint n_msgs = 0, i;
807 808 809
  GstMapInfo info;
  GstMapInfo extra_info;
  GInputVector ivec[2];
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
810

811
  udpsrc = GST_UDPSRC_CAST (psrc);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
812

813 814
  /* optimization: use messages only in multicast mode and
   * if we can't let the kernel do the filtering for us */
815 816 817
  p_msgs =
      (g_inet_address_get_is_multicast (g_inet_socket_address_get_address
          (udpsrc->addr))) ? &msgs : NULL;
818 819 820 821 822
#ifdef IP_MULTICAST_ALL
  if (g_inet_address_get_family (g_inet_socket_address_get_address
          (udpsrc->addr)) == G_SOCKET_FAMILY_IPV4)
    p_msgs = NULL;
#endif
823

824 825 826
  /* Retrieve sender address unless we've been configured not to do so */
  p_saddr = (udpsrc->retrieve_sender_address) ? &saddr : NULL;

827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858
  if (!gst_buffer_map (outbuf, &info, GST_MAP_READWRITE))
    goto buffer_map_error;

  ivec[0].buffer = info.data;
  ivec[0].size = info.size;

  /* Prepare memory in case the data size exceeds mtu */
  if (udpsrc->extra_mem == NULL) {
    GstBufferPool *pool;
    GstStructure *config;
    GstAllocator *allocator = NULL;
    GstAllocationParams params;

    pool = gst_base_src_get_buffer_pool (GST_BASE_SRC_CAST (psrc));
    config = gst_buffer_pool_get_config (pool);
    gst_buffer_pool_config_get_allocator (config, &allocator, &params);

    udpsrc->extra_mem =
        gst_allocator_alloc (allocator, MAX_IPV4_UDP_PACKET_SIZE, &params);

    gst_object_unref (pool);
    gst_structure_free (config);
    if (allocator)
      gst_object_unref (allocator);
  }

  if (!gst_memory_map (udpsrc->extra_mem, &extra_info, GST_MAP_READWRITE))
    goto memory_map_error;

  ivec[1].buffer = extra_info.data;
  ivec[1].size = extra_info.size;

859
retry:
860 861 862 863
  if (saddr != NULL) {
    g_object_unref (saddr);
    saddr = NULL;
  }
864

865
  do {
Wim Taymans's avatar
Wim Taymans committed
866 867
    gint64 timeout;

868 869
    try_again = FALSE;

Wim Taymans's avatar
Wim Taymans committed
870 871 872 873 874
    if (udpsrc->timeout)
      timeout = udpsrc->timeout / 1000;
    else
      timeout = -1;

Marc Leeman's avatar
Marc Leeman committed
875
    GST_LOG_OBJECT (udpsrc, "doing select, timeout %" G_GINT64_FORMAT, timeout);
876

Wim Taymans's avatar
Wim Taymans committed
877 878
    if (!g_socket_condition_timed_wait (udpsrc->used_socket, G_IO_IN | G_IO_PRI,
            timeout, udpsrc->cancellable, &err)) {
Sebastian Dröge's avatar
Sebastian Dröge committed
879 880
      if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY)
          || g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
881
        goto stopped;
Sebastian Dröge's avatar
Sebastian Dröge committed
882
      } else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
Mark Nauwelaerts's avatar
Mark Nauwelaerts committed
883
        g_clear_error (&err);
Sebastian Dröge's avatar
Sebastian Dröge committed
884 885 886 887 888 889
        /* timeout, post element message */
        gst_element_post_message (GST_ELEMENT_CAST (udpsrc),
            gst_message_new_element (GST_OBJECT_CAST (udpsrc),
                gst_structure_new ("GstUDPSrcTimeout",
                    "timeout", G_TYPE_UINT64, udpsrc->timeout, NULL)));
      } else {
890
        goto select_error;
Sebastian Dröge's avatar
Sebastian Dröge committed
891 892
      }

893
      try_again = TRUE;
894
    }
895
  } while (G_UNLIKELY (try_again));
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
896

897
  res =
898
      g_socket_receive_message (udpsrc->used_socket, p_saddr, ivec, 2,
899
      p_msgs, &n_msgs, &flags, udpsrc->cancellable, &err);
Sebastian Dröge's avatar
Sebastian Dröge committed
900

901
  if (G_UNLIKELY (res < 0)) {
902 903 904 905
    /* G_IO_ERROR_HOST_UNREACHABLE for a UDP socket means that a packet sent
     * with udpsink generated a "port unreachable" ICMP response. We ignore
     * that and try again.
     * On Windows we get G_IO_ERROR_CONNECTION_CLOSED instead */
906
#if GLIB_CHECK_VERSION(2,44,0)
907 908
    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_HOST_UNREACHABLE) ||
        g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED)) {
909 910 911
#else
    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_HOST_UNREACHABLE)) {
#endif
Sebastian Dröge's avatar
Sebastian Dröge committed
912
      g_clear_error (&err);
913 914
      goto retry;
    }
Sebastian Dröge's avatar
Sebastian Dröge committed
915
    goto receive_error;
916
  }
Zeeshan Ali's avatar
Zeeshan Ali committed
917

918 919
  /* Retry if multicast and the destination address is not ours. We don't want
   * to receive arbitrary packets */
920
  if (p_msgs) {
921 922 923 924 925
    GInetAddress *iaddr = g_inet_socket_address_get_address (udpsrc->addr);
    gboolean skip_packet = FALSE;
    gsize iaddr_size = g_inet_address_get_native_size (iaddr);
    const guint8 *iaddr_bytes = g_inet_address_to_bytes (iaddr);

926
    for (i = 0; i < n_msgs && !skip_packet; i++) {
927
#ifdef IP_PKTINFO
928 929
      if (GST_IS_IP_PKTINFO_MESSAGE (msgs[i])) {
        GstIPPktinfoMessage *msg = GST_IP_PKTINFO_MESSAGE (msgs[i]);
930

931 932 933 934
        if (sizeof (msg->addr) == iaddr_size
            && memcmp (iaddr_bytes, &msg->addr, sizeof (msg->addr)))
          skip_packet = TRUE;
      }
935 936
#endif
#ifdef IPV6_PKTINFO
937 938
      if (GST_IS_IPV6_PKTINFO_MESSAGE (msgs[i])) {
        GstIPV6PktinfoMessage *msg = GST_IPV6_PKTINFO_MESSAGE (msgs[i]);
939

940 941 942 943
        if (sizeof (msg->addr) == iaddr_size
            && memcmp (iaddr_bytes, &msg->addr, sizeof (msg->addr)))
          skip_packet = TRUE;
      }
944 945
#endif
#ifdef IP_RECVDSTADDR
946 947
      if (GST_IS_IP_RECVDSTADDR_MESSAGE (msgs[i])) {
        GstIPRecvdstaddrMessage *msg = GST_IP_RECVDSTADDR_MESSAGE (msgs[i]);
948

949 950 951
        if (sizeof (msg->addr) == iaddr_size
            && memcmp (iaddr_bytes, &msg->addr, sizeof (msg->addr)))
          skip_packet = TRUE;
952
      }
953
#endif
954 955 956 957 958 959 960 961 962 963 964 965 966 967
    }

    for (i = 0; i < n_msgs; i++) {
      g_object_unref (msgs[i]);
    }
    g_free (msgs);

    if (skip_packet) {
      GST_DEBUG_OBJECT (udpsrc,
          "Dropping packet for a different multicast address");
      goto retry;
    }
  }

968 969
  gst_buffer_unmap (outbuf, &info);
  gst_memory_unmap (udpsrc->extra_mem, &extra_info);
970

971 972 973 974 975 976
  /* If this is the case, the buffer will be freed once unreffed,
   * and the buffer pool will have to reallocate a new one.
   */
  if (res > udpsrc->mtu) {
    gst_buffer_append_memory (outbuf, udpsrc->extra_mem);
    udpsrc->extra_mem = NULL;
977
  }
Wim Taymans's avatar
Wim Taymans committed
978

979 980 981 982 983 984
  offset = udpsrc->skip_first_bytes;

  if (G_UNLIKELY (offset > 0 && res < offset))
    goto skip_error;

  gst_buffer_resize (outbuf, offset, res - offset);
985

986
  /* use buffer metadata so receivers can also track the address */
Sebastian Dröge's avatar
Sebastian Dröge committed
987 988 989
  if (saddr) {
    gst_buffer_add_net_address_meta (outbuf, saddr);
    g_object_unref (saddr);
990
    saddr = NULL;
991
  }
Sebastian Dröge's avatar
Sebastian Dröge committed
992

993
  GST_LOG_OBJECT (udpsrc, "read packet of %d bytes", (int) res);
994

995
  return GST_FLOW_OK;
Zeeshan Ali's avatar
Zeeshan Ali committed
996

997
  /* ERRORS */
998 999 1000 1001 1002 1003 1004
buffer_map_error:
  {
    GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
        ("Failed to map memory"));
    return GST_FLOW_ERROR;
  }
memory_map_error:
1005
  {
1006
    gst_buffer_unmap (outbuf, &info);
1007
    GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
1008
        ("Failed to map memory"));
1009 1010
    return GST_FLOW_ERROR;
  }
Wim Taymans's avatar
Wim Taymans committed
1011 1012
select_error:
  {
1013 1014
    gst_buffer_unmap (outbuf, &info);
    gst_memory_unmap (udpsrc->extra_mem, &extra_info);
1015
    GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
Sebastian Dröge's avatar
Sebastian Dröge committed
1016 1017
        ("select error: %s", err->message));
    g_clear_error (&err);
1018 1019 1020 1021
    return GST_FLOW_ERROR;
  }
stopped:
  {
1022 1023
    gst_buffer_unmap (outbuf, &info);
    gst_memory_unmap (udpsrc->extra_mem, &extra_info);
1024
    GST_DEBUG ("stop called");
Mark Nauwelaerts's avatar
Mark Nauwelaerts committed
1025
    g_clear_error (&err);
1026
    return GST_FLOW_FLUSHING;
1027
  }
Wim Taymans's avatar
Wim Taymans committed
1028 1029
receive_error:
  {
1030 1031
    gst_buffer_unmap (outbuf, &info);
    gst_memory_unmap (udpsrc->extra_mem, &extra_info);
Sebastian Dröge's avatar
Sebastian Dröge committed
1032 1033 1034
    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_BUSY) ||
        g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
      g_clear_error (&err);
1035
      return GST_FLOW_FLUSHING;
Sebastian Dröge's avatar
Sebastian Dröge committed
1036 1037
    } else {
      GST_ELEMENT_ERROR (udpsrc, RESOURCE, READ, (NULL),
1038
          ("receive error %" G_GSSIZE_FORMAT ": %s", res, err->message));
Sebastian Dröge's avatar
Sebastian Dröge committed
1039 1040 1041
      g_clear_error (&err);
      return GST_FLOW_ERROR;
    }
1042
  }
1043 1044 1045 1046 1047 1048
skip_error:
  {
    GST_ELEMENT_ERROR (udpsrc, STREAM, DECODE, (NULL),
        ("UDP buffer to small to skip header"));
    return GST_FLOW_ERROR;
  }
1049 1050 1051
}

static gboolean
1052
gst_udpsrc_set_uri (GstUDPSrc * src, const gchar * uri, GError ** error)
1053
{
1054
  gchar *address;
Sebastian Dröge's avatar
Sebastian Dröge committed
1055 1056
  guint16 port;

1057
  if (!gst_udp_parse_uri (uri, &address, &port))
Wim Taymans's avatar
Wim Taymans committed
1058
    goto wrong_uri;
1059

Wim Taymans's avatar
Wim Taymans committed
1060
  if (port == (guint16) - 1)
Sebastian Dröge's avatar
Sebastian Dröge committed
1061 1062
    port = UDP_DEFAULT_PORT;

1063 1064
  g_free (src->address);
  src->address = address;
Sebastian Dröge's avatar
Sebastian Dröge committed
1065 1066 1067 1068
  src->port = port;

  g_free (src->uri);
  src->uri = g_strdup (uri);
1069 1070 1071

  return TRUE;

1072
  /* ERRORS */
Wim Taymans's avatar
Wim Taymans committed
1073
wrong_uri:
1074
  {
1075
    GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
Wim Taymans's avatar
Wim Taymans committed
1076
        ("error parsing uri %s", uri));
1077 1078
    g_set_error_literal (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
        "Could not parse UDP URI");
1079 1080
    return FALSE;
  }
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
1081 1082 1083
}

static void
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
1084 1085
gst_udpsrc_set_property (GObject * object, guint prop_id, const GValue * value,
    GParamSpec * pspec)
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
1086
{
1087
  GstUDPSrc *udpsrc = GST_UDPSRC (object);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
1088 1089

  switch (prop_id) {
1090 1091
    case PROP_BUFFER_SIZE:
      udpsrc->buffer_size = g_value_get_int (value);
1092
      break;
1093
    case PROP_PORT:
Sebastian Dröge's avatar
Sebastian Dröge committed
1094 1095
      udpsrc->port = g_value_get_int (value);
      g_free (udpsrc->uri);
1096
      udpsrc->uri =
1097
          g_strdup_printf ("udp://%s:%u", udpsrc->address, udpsrc->port);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
1098
      break;
1099
    case PROP_MULTICAST_GROUP:
1100
    case PROP_ADDRESS:
Wim Taymans's avatar
Wim Taymans committed
1101 1102
    {
      const gchar *group;
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
1103

1104
      g_free (udpsrc->address);
Wim Taymans's avatar
Wim Taymans committed
1105
      if ((group = g_value_get_string (value)))
1106
        udpsrc->address = g_strdup (group);
1107
      else
1108
        udpsrc->address = g_strdup (UDP_DEFAULT_MULTICAST_GROUP);
Sebastian Dröge's avatar
Sebastian Dröge committed
1109 1110

      g_free (udpsrc->uri);
1111
      udpsrc->uri =
1112
          g_strdup_printf ("udp://%s:%u", udpsrc->address, udpsrc->port);
1113
      break;
Wim Taymans's avatar
Wim Taymans committed
1114
    }
1115 1116 1117 1118 1119 1120 1121 1122
    case PROP_MULTICAST_IFACE:
      g_free (udpsrc->multi_iface);

      if (g_value_get_string (value) == NULL)
        udpsrc->multi_iface = g_strdup (UDP_DEFAULT_MULTICAST_IFACE);
      else
        udpsrc->multi_iface = g_value_dup_string (value);
      break;
1123
    case PROP_URI:
1124
      gst_udpsrc_set_uri (udpsrc, g_value_get_string (value), NULL);
1125
      break;
1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137
    case PROP_CAPS:
    {
      const GstCaps *new_caps_val = gst_value_get_caps (value);
      GstCaps *new_caps;
      GstCaps *old_caps;

      if (new_caps_val == NULL) {
        new_caps = gst_caps_new_any ();
      } else {
        new_caps = gst_caps_copy (new_caps_val);
      }

Wim Taymans's avatar
Wim Taymans committed
1138
      GST_OBJECT_LOCK (udpsrc);
1139 1140
      old_caps = udpsrc->caps;
      udpsrc->caps = new_caps;
Wim Taymans's avatar
Wim Taymans committed
1141
      GST_OBJECT_UNLOCK (udpsrc);
Wim Taymans's avatar
Wim Taymans committed
1142 1143
      if (old_caps)
        gst_caps_unref (old_caps);
Wim Taymans's avatar
Wim Taymans committed
1144 1145

      gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (udpsrc));
1146 1147
      break;
    }
Sebastian Dröge's avatar
Sebastian Dröge committed
1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162
    case PROP_SOCKET:
      if (udpsrc->socket != NULL && udpsrc->socket != udpsrc->used_socket &&
          udpsrc->close_socket) {
        GError *err = NULL;

        if (!g_socket_close (udpsrc->socket, &err)) {
          GST_ERROR ("failed to close socket %p: %s", udpsrc->socket,
              err->message);
          g_clear_error (&err);
        }
      }
      if (udpsrc->socket)
        g_object_unref (udpsrc->socket);
      udpsrc->socket = g_value_dup_object (value);
      GST_DEBUG ("setting socket to %p", udpsrc->socket);
1163
      break;
1164 1165 1166
    case PROP_TIMEOUT:
      udpsrc->timeout = g_value_get_uint64 (value);
      break;
1167 1168 1169
    case PROP_SKIP_FIRST_BYTES:
      udpsrc->skip_first_bytes = g_value_get_int (value);
      break;
Sebastian Dröge's avatar
Sebastian Dröge committed
1170 1171
    case PROP_CLOSE_SOCKET:
      udpsrc->close_socket = g_value_get_boolean (value);
1172
      break;
1173 1174 1175
    case PROP_AUTO_MULTICAST:
      udpsrc->auto_multicast = g_value_get_boolean (value);
      break;
1176 1177 1178
    case PROP_REUSE:
      udpsrc->reuse = g_value_get_boolean (value);
      break;
1179 1180 1181
    case PROP_LOOP:
      udpsrc->loop = g_value_get_boolean (value);
      break;
1182 1183 1184
    case PROP_RETRIEVE_SENDER_ADDRESS:
      udpsrc->retrieve_sender_address = g_value_get_boolean (value);
      break;
1185 1186 1187
    case PROP_MTU:
      udpsrc->mtu = g_value_get_uint (value);
      break;
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
1188 1189 1190 1191 1192 1193
    default:
      break;
  }
}

static void
Thomas Vander Stichele's avatar
Thomas Vander Stichele committed
1194 1195
gst_udpsrc_get_property (GObject * object, guint prop_id, GValue * value,
    GParamSpec * pspec)
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
1196
{
1197
  GstUDPSrc *udpsrc = GST_UDPSRC (object);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
1198 1199

  switch (prop_id) {
1200 1201
    case PROP_BUFFER_SIZE:
      g_value_set_int (value, udpsrc->buffer_size);
1202
      break;
1203
    case PROP_PORT:
Sebastian Dröge's avatar
Sebastian Dröge committed
1204
      g_value_set_int (value, udpsrc->port);
Andy Wingo Wingo's avatar
Andy Wingo Wingo committed
1205
      break;
1206
    case PROP_MULTICAST_GROUP:
1207 1208
    case PROP_ADDRESS:
      g_value_set_string (value, udpsrc->address);