Newer
Older
/*
* Copyright (c) 2015, Collabora Ltd.
*
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright notice, this
* list of conditions and the following disclaimer in the documentation and/or other
* materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
* OF SUCH DAMAGE.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include "sctpassociation.h"
#include <gst/gst.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
Sebastian Dröge
committed
GST_DEBUG_CATEGORY_STATIC (gst_sctp_association_debug_category);
#define GST_CAT_DEFAULT gst_sctp_association_debug_category
GST_DEBUG_CATEGORY_STATIC (gst_sctp_debug_category);
Sebastian Dröge
committed
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#define GST_SCTP_ASSOCIATION_STATE_TYPE (gst_sctp_association_state_get_type())
static GType
gst_sctp_association_state_get_type (void)
{
static const GEnumValue values[] = {
{GST_SCTP_ASSOCIATION_STATE_NEW, "state-new", "state-new"},
{GST_SCTP_ASSOCIATION_STATE_READY, "state-ready", "state-ready"},
{GST_SCTP_ASSOCIATION_STATE_CONNECTING, "state-connecting",
"state-connecting"},
{GST_SCTP_ASSOCIATION_STATE_CONNECTED, "state-connected",
"state-connected"},
{GST_SCTP_ASSOCIATION_STATE_DISCONNECTING, "state-disconnecting",
"state-disconnecting"},
{GST_SCTP_ASSOCIATION_STATE_DISCONNECTED, "state-disconnected",
"state-disconnected"},
{GST_SCTP_ASSOCIATION_STATE_ERROR, "state-error", "state-error"},
{0, NULL, NULL}
};
static volatile GType id = 0;
if (g_once_init_enter ((gsize *) & id)) {
GType _id;
_id = g_enum_register_static ("GstSctpAssociationState", values);
g_once_init_leave ((gsize *) & id, _id);
}
return id;
}
G_DEFINE_TYPE (GstSctpAssociation, gst_sctp_association, G_TYPE_OBJECT);
enum
{
SIGNAL_STREAM_RESET,
LAST_SIGNAL
};
enum
{
PROP_0,
PROP_ASSOCIATION_ID,
PROP_LOCAL_PORT,
PROP_REMOTE_PORT,
PROP_STATE,
PROP_USE_SOCK_STREAM,
NUM_PROPERTIES
};
static guint signals[LAST_SIGNAL] = { 0 };
static GParamSpec *properties[NUM_PROPERTIES];
#define DEFAULT_NUMBER_OF_SCTP_STREAMS 1024
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#define DEFAULT_LOCAL_SCTP_PORT 0
#define DEFAULT_REMOTE_SCTP_PORT 0
static GHashTable *associations = NULL;
G_LOCK_DEFINE_STATIC (associations_lock);
static guint32 number_of_associations = 0;
/* Interface implementations */
static void gst_sctp_association_finalize (GObject * object);
static void gst_sctp_association_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec);
static void gst_sctp_association_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static struct socket *create_sctp_socket (GstSctpAssociation *
gst_sctp_association);
static struct sockaddr_conn get_sctp_socket_address (GstSctpAssociation *
gst_sctp_association, guint16 port);
static gboolean client_role_connect (GstSctpAssociation * self);
static int sctp_packet_out (void *addr, void *buffer, size_t length, guint8 tos,
guint8 set_df);
static int receive_cb (struct socket *sock, union sctp_sockstore addr,
void *data, size_t datalen, struct sctp_rcvinfo rcv_info, gint flags,
void *ulp_info);
static void handle_notification (GstSctpAssociation * self,
const union sctp_notification *notification, size_t length);
static void handle_association_changed (GstSctpAssociation * self,
const struct sctp_assoc_change *sac);
static void handle_stream_reset_event (GstSctpAssociation * self,
const struct sctp_stream_reset_event *ssr);
static void handle_message (GstSctpAssociation * self, guint8 * data,
guint32 datalen, guint16 stream_id, guint32 ppid);
static void maybe_set_state_to_ready (GstSctpAssociation * self);
Sebastian Dröge
committed
static gboolean gst_sctp_association_change_state (GstSctpAssociation * self,
Sebastian Dröge
committed
GstSctpAssociationState new_state, gboolean lock);
static void
gst_sctp_association_class_init (GstSctpAssociationClass * klass)
{
GObjectClass *gobject_class;
gobject_class = (GObjectClass *) klass;
gobject_class->finalize = gst_sctp_association_finalize;
gobject_class->set_property = gst_sctp_association_set_property;
gobject_class->get_property = gst_sctp_association_get_property;
signals[SIGNAL_STREAM_RESET] =
g_signal_new ("stream-reset", G_OBJECT_CLASS_TYPE (klass),
G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSctpAssociationClass,
on_sctp_stream_reset), NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_UINT);
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
properties[PROP_ASSOCIATION_ID] = g_param_spec_uint ("association-id",
"The SCTP association-id", "The SCTP association-id.", 0, G_MAXUSHORT,
DEFAULT_LOCAL_SCTP_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
properties[PROP_LOCAL_PORT] = g_param_spec_uint ("local-port", "Local SCTP",
"The local SCTP port for this association", 0, G_MAXUSHORT,
DEFAULT_LOCAL_SCTP_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
properties[PROP_REMOTE_PORT] =
g_param_spec_uint ("remote-port", "Remote SCTP",
"The remote SCTP port for this association", 0, G_MAXUSHORT,
DEFAULT_LOCAL_SCTP_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
properties[PROP_STATE] = g_param_spec_enum ("state", "SCTP Association state",
"The state of the SCTP association", GST_SCTP_ASSOCIATION_STATE_TYPE,
GST_SCTP_ASSOCIATION_STATE_NEW,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
properties[PROP_USE_SOCK_STREAM] =
g_param_spec_boolean ("use-sock-stream", "Use sock-stream",
"When set to TRUE, a sequenced, reliable, connection-based connection is used."
"When TRUE the partial reliability parameters of the channel is ignored.",
FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
}
#define USRSCTP_GST_DEBUG_LEVEL GST_LEVEL_DEBUG
static void
gst_usrsctp_debug (const gchar * format, ...)
{
va_list varargs;
va_start (varargs, format);
gst_debug_log_valist (gst_sctp_debug_category, USRSCTP_GST_DEBUG_LEVEL,
__FILE__, GST_FUNCTION, __LINE__, NULL, format, varargs);
va_end (varargs);
}
static void
gst_sctp_association_init (GstSctpAssociation * self)
{
/* No need to lock mutex here as long as the function is only called from gst_sctp_association_get */
if (number_of_associations == 0) {
usrsctp_init (0, sctp_packet_out, gst_usrsctp_debug);
/* Explicit Congestion Notification */
usrsctp_sysctl_set_sctp_ecn_enable (0);
/* Do not send ABORTs in response to INITs (1).
* Do not send ABORTs for received Out of the Blue packets (2).
*/
usrsctp_sysctl_set_sctp_blackhole (2);
/* Enable interleaving messages for different streams (incoming)
* See: https://tools.ietf.org/html/rfc6458#section-8.1.20
*/
usrsctp_sysctl_set_sctp_default_frag_interleave (2);
usrsctp_sysctl_set_sctp_nr_outgoing_streams_default
(DEFAULT_NUMBER_OF_SCTP_STREAMS);
#if defined(SCTP_DEBUG) && !defined(GST_DISABLE_GST_DEBUG)
if (USRSCTP_GST_DEBUG_LEVEL <= GST_LEVEL_MAX
&& USRSCTP_GST_DEBUG_LEVEL <= _gst_debug_min
&& USRSCTP_GST_DEBUG_LEVEL <=
gst_debug_category_get_threshold (gst_sctp_debug_category)) {
usrsctp_sysctl_set_sctp_debug_on (SCTP_DEBUG_ALL);
}
#endif
}
number_of_associations++;
self->local_port = DEFAULT_LOCAL_SCTP_PORT;
self->remote_port = DEFAULT_REMOTE_SCTP_PORT;
self->sctp_ass_sock = NULL;
Sebastian Dröge
committed
g_mutex_init (&self->association_mutex);
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
self->state = GST_SCTP_ASSOCIATION_STATE_NEW;
self->use_sock_stream = FALSE;
usrsctp_register_address ((void *) self);
}
static void
gst_sctp_association_finalize (GObject * object)
{
GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object);
G_LOCK (associations_lock);
g_hash_table_remove (associations, GUINT_TO_POINTER (self->association_id));
usrsctp_deregister_address ((void *) self);
number_of_associations--;
if (number_of_associations == 0) {
usrsctp_finish ();
}
G_UNLOCK (associations_lock);
G_OBJECT_CLASS (gst_sctp_association_parent_class)->finalize (object);
}
static void
gst_sctp_association_set_property (GObject * object, guint prop_id,
const GValue * value, GParamSpec * pspec)
{
GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object);
Sebastian Dröge
committed
g_mutex_lock (&self->association_mutex);
if (self->state != GST_SCTP_ASSOCIATION_STATE_NEW) {
switch (prop_id) {
case PROP_LOCAL_PORT:
case PROP_REMOTE_PORT:
Sebastian Dröge
committed
GST_ERROR_OBJECT (self, "These properties cannot be set in this state");
goto error;
}
}
switch (prop_id) {
case PROP_ASSOCIATION_ID:
self->association_id = g_value_get_uint (value);
break;
case PROP_LOCAL_PORT:
self->local_port = g_value_get_uint (value);
break;
case PROP_REMOTE_PORT:
self->remote_port = g_value_get_uint (value);
break;
case PROP_STATE:
self->state = g_value_get_enum (value);
break;
case PROP_USE_SOCK_STREAM:
self->use_sock_stream = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
break;
}
Sebastian Dröge
committed
g_mutex_unlock (&self->association_mutex);
if (prop_id == PROP_LOCAL_PORT || prop_id == PROP_REMOTE_PORT)
maybe_set_state_to_ready (self);
return;
error:
Sebastian Dröge
committed
g_mutex_unlock (&self->association_mutex);
}
static void
maybe_set_state_to_ready (GstSctpAssociation * self)
{
gboolean signal_ready_state = FALSE;
Sebastian Dröge
committed
g_mutex_lock (&self->association_mutex);
if ((self->state == GST_SCTP_ASSOCIATION_STATE_NEW) &&
(self->local_port != 0 && self->remote_port != 0)
&& (self->packet_out_cb != NULL) && (self->packet_received_cb != NULL)) {
Sebastian Dröge
committed
signal_ready_state =
gst_sctp_association_change_state (self,
GST_SCTP_ASSOCIATION_STATE_READY, FALSE);
Sebastian Dröge
committed
g_mutex_unlock (&self->association_mutex);
Sebastian Dröge
committed
g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_STATE]);
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
}
static void
gst_sctp_association_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec)
{
GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object);
switch (prop_id) {
case PROP_ASSOCIATION_ID:
g_value_set_uint (value, self->association_id);
break;
case PROP_LOCAL_PORT:
g_value_set_uint (value, self->local_port);
break;
case PROP_REMOTE_PORT:
g_value_set_uint (value, self->remote_port);
break;
case PROP_STATE:
g_value_set_enum (value, self->state);
break;
case PROP_USE_SOCK_STREAM:
g_value_set_boolean (value, self->use_sock_stream);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
break;
}
}
/* Public functions */
GstSctpAssociation *
gst_sctp_association_get (guint32 association_id)
{
GstSctpAssociation *association;
G_LOCK (associations_lock);
Sebastian Dröge
committed
GST_DEBUG_CATEGORY_INIT (gst_sctp_association_debug_category,
"sctpassociation", 0, "debug category for sctpassociation");
GST_DEBUG_CATEGORY_INIT (gst_sctp_debug_category,
"sctplib", 0, "debug category for messages from usrsctp");
Sebastian Dröge
committed
if (!associations) {
associations =
g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, NULL);
}
association =
g_hash_table_lookup (associations, GUINT_TO_POINTER (association_id));
if (!association) {
association =
g_object_new (GST_SCTP_TYPE_ASSOCIATION, "association-id",
association_id, NULL);
g_hash_table_insert (associations, GUINT_TO_POINTER (association_id),
association);
} else {
g_object_ref (association);
}
G_UNLOCK (associations_lock);
return association;
}
gboolean
gst_sctp_association_start (GstSctpAssociation * self)
{
if (self->state != GST_SCTP_ASSOCIATION_STATE_READY) {
Sebastian Dröge
committed
GST_WARNING_OBJECT (self,
"SCTP association is in wrong state and cannot be started");
goto configure_required;
}
if ((self->sctp_ass_sock = create_sctp_socket (self)) == NULL)
goto error;
/* TODO: Support both server and client role */
if (!client_role_connect (self)) {
gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_ERROR,
TRUE);
goto error;
}
Sebastian Dröge
committed
gst_sctp_association_change_state (self,
GST_SCTP_ASSOCIATION_STATE_CONNECTING, TRUE);
return TRUE;
error:
gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_ERROR,
TRUE);
return FALSE;
configure_required:
return FALSE;
}
void
gst_sctp_association_set_on_packet_out (GstSctpAssociation * self,
GstSctpAssociationPacketOutCb packet_out_cb, gpointer user_data,
GDestroyNotify destroy_notify)
{
g_return_if_fail (GST_SCTP_IS_ASSOCIATION (self));
Sebastian Dröge
committed
g_mutex_lock (&self->association_mutex);
if (self->packet_out_destroy_notify)
self->packet_out_destroy_notify (self->packet_out_user_data);
self->packet_out_cb = packet_out_cb;
self->packet_out_user_data = user_data;
self->packet_out_destroy_notify = destroy_notify;
Sebastian Dröge
committed
g_mutex_unlock (&self->association_mutex);
maybe_set_state_to_ready (self);
}
void
gst_sctp_association_set_on_packet_received (GstSctpAssociation * self,
GstSctpAssociationPacketReceivedCb packet_received_cb, gpointer user_data,
GDestroyNotify destroy_notify)
{
g_return_if_fail (GST_SCTP_IS_ASSOCIATION (self));
Sebastian Dröge
committed
g_mutex_lock (&self->association_mutex);
if (self->packet_received_destroy_notify)
self->packet_received_destroy_notify (self->packet_received_user_data);
self->packet_received_cb = packet_received_cb;
self->packet_received_user_data = user_data;
self->packet_received_destroy_notify = destroy_notify;
Sebastian Dröge
committed
g_mutex_unlock (&self->association_mutex);
maybe_set_state_to_ready (self);
}
void
gst_sctp_association_incoming_packet (GstSctpAssociation * self,
const guint8 * buf, guint32 length)
{
usrsctp_conninput ((void *) self, (const void *) buf, (size_t) length, 0);
}
Sebastian Dröge
committed
GstFlowReturn
gst_sctp_association_send_data (GstSctpAssociation * self, const guint8 * buf,
guint32 length, guint16 stream_id, guint32 ppid, gboolean ordered,
Sebastian Dröge
committed
GstSctpAssociationPartialReliability pr, guint32 reliability_param,
guint32 * bytes_sent_)
Sebastian Dröge
committed
GstFlowReturn flow_ret;
Sebastian Dröge
committed
gint32 bytes_sent = 0;
struct sockaddr_conn remote_addr;
Sebastian Dröge
committed
g_mutex_lock (&self->association_mutex);
if (self->state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
Sebastian Dröge
committed
if (self->state == GST_SCTP_ASSOCIATION_STATE_DISCONNECTED ||
self->state == GST_SCTP_ASSOCIATION_STATE_DISCONNECTING) {
GST_ERROR_OBJECT (self, "Disconnected");
flow_ret = GST_FLOW_EOS;
Sebastian Dröge
committed
g_mutex_unlock (&self->association_mutex);
Sebastian Dröge
committed
goto end;
} else {
GST_ERROR_OBJECT (self, "Association not connected yet");
flow_ret = GST_FLOW_ERROR;
Sebastian Dröge
committed
g_mutex_unlock (&self->association_mutex);
Sebastian Dröge
committed
goto end;
}
Sebastian Dröge
committed
remote_addr = get_sctp_socket_address (self, self->remote_port);
g_mutex_unlock (&self->association_mutex);
/* TODO: We probably want to split too large chunks into multiple packets
* and only set the SCTP_EOR flag on the last one. Firefox is using 0x4000
* as the maximum packet size
*/
memset (&spa, 0, sizeof (spa));
spa.sendv_sndinfo.snd_ppid = g_htonl (ppid);
spa.sendv_sndinfo.snd_sid = stream_id;
spa.sendv_sndinfo.snd_flags = SCTP_EOR | (ordered ? 0 : SCTP_UNORDERED);
spa.sendv_sndinfo.snd_context = 0;
spa.sendv_sndinfo.snd_assoc_id = 0;
spa.sendv_flags = SCTP_SEND_SNDINFO_VALID;
if (pr != GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE) {
spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
spa.sendv_prinfo.pr_value = g_htonl (reliability_param);
if (pr == GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL)
spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL;
else if (pr == GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX)
spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX;
else if (pr == GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF)
spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_BUF;
}
bytes_sent =
usrsctp_sendv (self->sctp_ass_sock, buf, length,
(struct sockaddr *) &remote_addr, 1, (void *) &spa,
(socklen_t) sizeof (struct sctp_sendv_spa), SCTP_SENDV_SPA, 0);
if (bytes_sent < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
bytes_sent = 0;
/* Resending this buffer is taken care of by the gstsctpenc */
Sebastian Dröge
committed
flow_ret = GST_FLOW_OK;
GST_ERROR_OBJECT (self, "Error sending data on stream %u: (%u) %s",
stream_id, errno, g_strerror (errno));
Sebastian Dröge
committed
flow_ret = GST_FLOW_ERROR;
Sebastian Dröge
committed
flow_ret = GST_FLOW_OK;
Sebastian Dröge
committed
if (bytes_sent_)
*bytes_sent_ = bytes_sent;
Sebastian Dröge
committed
return flow_ret;
}
void
gst_sctp_association_reset_stream (GstSctpAssociation * self, guint16 stream_id)
{
struct sctp_reset_streams *srs;
socklen_t length;
length = (socklen_t) (sizeof (struct sctp_reset_streams) + sizeof (guint16));
srs = (struct sctp_reset_streams *) g_malloc0 (length);
srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
srs->srs_number_streams = 1;
srs->srs_stream_list[0] = stream_id;
usrsctp_setsockopt (self->sctp_ass_sock, IPPROTO_SCTP, SCTP_RESET_STREAMS,
srs, length);
g_free (srs);
}
void
gst_sctp_association_force_close (GstSctpAssociation * self)
{
if (self->sctp_ass_sock) {
Sebastian Dröge
committed
struct socket *s = self->sctp_ass_sock;
Sebastian Dröge
committed
usrsctp_close (s);
}
Sebastian Dröge
committed
gst_sctp_association_change_state (self,
GST_SCTP_ASSOCIATION_STATE_DISCONNECTED, TRUE);
}
static struct socket *
create_sctp_socket (GstSctpAssociation * self)
{
struct socket *sock;
struct linger l;
struct sctp_event event;
struct sctp_assoc_value stream_reset;
int buf_size = 1024 * 1024;
int value = 1;
guint16 event_types[] = {
SCTP_ASSOC_CHANGE,
SCTP_PEER_ADDR_CHANGE,
SCTP_REMOTE_ERROR,
SCTP_SEND_FAILED,
SCTP_SEND_FAILED_EVENT,
SCTP_SHUTDOWN_EVENT,
SCTP_ADAPTATION_INDICATION,
SCTP_PARTIAL_DELIVERY_EVENT,
/*SCTP_AUTHENTICATION_EVENT, */
SCTP_STREAM_RESET_EVENT,
/*SCTP_SENDER_DRY_EVENT, */
/*SCTP_NOTIFICATIONS_STOPPED_EVENT, */
/*SCTP_ASSOC_RESET_EVENT, */
SCTP_STREAM_CHANGE_EVENT
};
guint32 i;
guint sock_type = self->use_sock_stream ? SOCK_STREAM : SOCK_SEQPACKET;
if ((sock =
usrsctp_socket (AF_CONN, sock_type, IPPROTO_SCTP, receive_cb, NULL, 0,
Sebastian Dröge
committed
(void *) self)) == NULL) {
GST_ERROR_OBJECT (self, "Could not open SCTP socket: (%u) %s", errno,
g_strerror (errno));
Sebastian Dröge
committed
}
if (usrsctp_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
(const void *) &buf_size, sizeof (buf_size)) < 0) {
GST_ERROR_OBJECT (self, "Could not change receive buffer size: (%u) %s",
errno, g_strerror (errno));
goto error;
}
if (usrsctp_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
(const void *) &buf_size, sizeof (buf_size)) < 0) {
GST_ERROR_OBJECT (self, "Could not change send buffer size: (%u) %s",
errno, g_strerror (errno));
goto error;
}
Sebastian Dröge
committed
/* Properly return errors */
if (usrsctp_set_non_blocking (sock, 1) < 0) {
Sebastian Dröge
committed
GST_ERROR_OBJECT (self,
"Could not set non-blocking mode on SCTP socket: (%u) %s", errno,
g_strerror (errno));
goto error;
}
memset (&l, 0, sizeof (l));
l.l_onoff = 1;
l.l_linger = 0;
if (usrsctp_setsockopt (sock, SOL_SOCKET, SO_LINGER, (const void *) &l,
(socklen_t) sizeof (struct linger)) < 0) {
Sebastian Dröge
committed
GST_ERROR_OBJECT (self, "Could not set SO_LINGER on SCTP socket: (%u) %s",
errno, g_strerror (errno));
if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_REUSE_PORT, &value,
sizeof (int))) {
GST_DEBUG_OBJECT (self, "Could not set SCTP_REUSE_PORT: (%u) %s", errno,
g_strerror (errno));
}
if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_NODELAY, &value,
sizeof (int))) {
GST_DEBUG_OBJECT (self, "Could not set SCTP_NODELAY: (%u) %s", errno,
g_strerror (errno));
goto error;
}
if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_EXPLICIT_EOR, &value,
sizeof (int))) {
GST_ERROR_OBJECT (self, "Could not set SCTP_EXPLICIT_EOR: (%u) %s", errno,
Sebastian Dröge
committed
g_strerror (errno));
goto error;
}
memset (&stream_reset, 0, sizeof (stream_reset));
stream_reset.assoc_id = SCTP_ALL_ASSOC;
stream_reset.assoc_value =
SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET,
&stream_reset, sizeof (stream_reset))) {
GST_ERROR_OBJECT (self,
"Could not set SCTP_ENABLE_STREAM_RESET | SCTP_ENABLE_CHANGE_ASSOC_REQ: (%u) %s",
Sebastian Dröge
committed
errno, g_strerror (errno));
goto error;
}
memset (&event, 0, sizeof (event));
event.se_assoc_id = SCTP_ALL_ASSOC;
event.se_on = 1;
for (i = 0; i < sizeof (event_types) / sizeof (event_types[0]); i++) {
event.se_type = event_types[i];
if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_EVENT,
&event, sizeof (event)) < 0) {
Sebastian Dröge
committed
GST_ERROR_OBJECT (self, "Failed to register event %u: (%u) %s",
event_types[i], errno, g_strerror (errno));
}
}
return sock;
error:
Sebastian Dröge
committed
if (sock)
usrsctp_close (sock);
return NULL;
}
static struct sockaddr_conn
get_sctp_socket_address (GstSctpAssociation * gst_sctp_association,
guint16 port)
{
struct sockaddr_conn addr;
memset ((void *) &addr, 0, sizeof (struct sockaddr_conn));
#ifdef __APPLE__
addr.sconn_len = sizeof (struct sockaddr_conn);
#endif
addr.sconn_family = AF_CONN;
addr.sconn_port = g_htons (port);
addr.sconn_addr = (void *) gst_sctp_association;
return addr;
}
static gboolean
client_role_connect (GstSctpAssociation * self)
{
Sebastian Dröge
committed
struct sockaddr_conn local_addr, remote_addr;
struct sctp_paddrparams paddrparams;
socklen_t opt_len;
Sebastian Dröge
committed
g_mutex_lock (&self->association_mutex);
local_addr = get_sctp_socket_address (self, self->local_port);
remote_addr = get_sctp_socket_address (self, self->remote_port);
g_mutex_unlock (&self->association_mutex);
Sebastian Dröge
committed
usrsctp_bind (self->sctp_ass_sock, (struct sockaddr *) &local_addr,
sizeof (struct sockaddr_conn));
if (ret < 0) {
Sebastian Dröge
committed
GST_ERROR_OBJECT (self, "usrsctp_bind() error: (%u) %s", errno,
g_strerror (errno));
goto error;
}
ret =
Sebastian Dröge
committed
usrsctp_connect (self->sctp_ass_sock, (struct sockaddr *) &remote_addr,
sizeof (struct sockaddr_conn));
if (ret < 0 && errno != EINPROGRESS) {
Sebastian Dröge
committed
GST_ERROR_OBJECT (self, "usrsctp_connect() error: (%u) %s", errno,
g_strerror (errno));
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
memset (&paddrparams, 0, sizeof (struct sctp_paddrparams));
memcpy (&paddrparams.spp_address, &remote_addr,
sizeof (struct sockaddr_conn));
opt_len = (socklen_t) sizeof (struct sctp_paddrparams);
ret =
usrsctp_getsockopt (self->sctp_ass_sock, IPPROTO_SCTP,
SCTP_PEER_ADDR_PARAMS, &paddrparams, &opt_len);
if (ret < 0) {
GST_WARNING_OBJECT (self,
"usrsctp_getsockopt(SCTP_PEER_ADDR_PARAMS) error: (%u) %s", errno,
g_strerror (errno));
} else {
/* draft-ietf-rtcweb-data-channel-13 section 5: max initial MTU IPV4 1200, IPV6 1280 */
paddrparams.spp_pathmtu = 1200;
paddrparams.spp_flags &= ~SPP_PMTUD_ENABLE;
paddrparams.spp_flags |= SPP_PMTUD_DISABLE;
opt_len = (socklen_t) sizeof (struct sctp_paddrparams);
ret = usrsctp_setsockopt (self->sctp_ass_sock, IPPROTO_SCTP,
SCTP_PEER_ADDR_PARAMS, &paddrparams, opt_len);
if (ret < 0) {
GST_WARNING_OBJECT (self,
"usrsctp_setsockopt(SCTP_PEER_ADDR_PARAMS) error: (%u) %s", errno,
g_strerror (errno));
} else {
GST_DEBUG_OBJECT (self, "PMTUD disabled, MTU set to %u",
paddrparams.spp_pathmtu);
}
}
return TRUE;
error:
return FALSE;
}
static int
sctp_packet_out (void *addr, void *buffer, size_t length, guint8 tos,
guint8 set_df)
{
GstSctpAssociation *self = GST_SCTP_ASSOCIATION (addr);
Sebastian Dröge
committed
g_mutex_lock (&self->association_mutex);
if (self->packet_out_cb) {
self->packet_out_cb (self, buffer, length, self->packet_out_user_data);
}
Sebastian Dröge
committed
g_mutex_unlock (&self->association_mutex);
return 0;
}
static int
receive_cb (struct socket *sock, union sctp_sockstore addr, void *data,
size_t datalen, struct sctp_rcvinfo rcv_info, gint flags, void *ulp_info)
{
GstSctpAssociation *self = GST_SCTP_ASSOCIATION (ulp_info);
if (!data) {
/* Not sure if this can happend. */
Sebastian Dröge
committed
GST_WARNING_OBJECT (self, "Received empty data buffer");
} else {
if (flags & MSG_NOTIFICATION) {
handle_notification (self, (const union sctp_notification *) data,
datalen);
/* We use this instead of a bare `free()` so that we use the `free` from
* the C runtime that usrsctp was built with. This makes a difference on
* Windows where libusrstcp and GStreamer can be linked to two different
* CRTs. */
usrsctp_freedumpbuffer (data);
} else {
handle_message (self, data, datalen, rcv_info.rcv_sid,
ntohl (rcv_info.rcv_ppid));
}
}
return 1;
}
static void
handle_notification (GstSctpAssociation * self,
const union sctp_notification *notification, size_t length)
{
g_assert (notification->sn_header.sn_length == length);
switch (notification->sn_header.sn_type) {
case SCTP_ASSOC_CHANGE:
Sebastian Dröge
committed
GST_DEBUG_OBJECT (self, "Event: SCTP_ASSOC_CHANGE");
handle_association_changed (self, ¬ification->sn_assoc_change);
break;
case SCTP_PEER_ADDR_CHANGE:
Sebastian Dröge
committed
GST_DEBUG_OBJECT (self, "Event: SCTP_PEER_ADDR_CHANGE");
break;
case SCTP_REMOTE_ERROR:
Sebastian Dröge
committed
GST_ERROR_OBJECT (self, "Event: SCTP_REMOTE_ERROR (%u)",
notification->sn_remote_error.sre_error);
break;
case SCTP_SEND_FAILED:
Sebastian Dröge
committed
GST_ERROR_OBJECT (self, "Event: SCTP_SEND_FAILED");
break;
case SCTP_SHUTDOWN_EVENT:
Sebastian Dröge
committed
GST_DEBUG_OBJECT (self, "Event: SCTP_SHUTDOWN_EVENT");
Sebastian Dröge
committed
gst_sctp_association_change_state (self,
GST_SCTP_ASSOCIATION_STATE_DISCONNECTING, TRUE);
break;
case SCTP_ADAPTATION_INDICATION:
Sebastian Dröge
committed
GST_DEBUG_OBJECT (self, "Event: SCTP_ADAPTATION_INDICATION");
break;
case SCTP_PARTIAL_DELIVERY_EVENT:
Sebastian Dröge
committed
GST_DEBUG_OBJECT (self, "Event: SCTP_PARTIAL_DELIVERY_EVENT");
break;
case SCTP_AUTHENTICATION_EVENT:
Sebastian Dröge
committed
GST_DEBUG_OBJECT (self, "Event: SCTP_AUTHENTICATION_EVENT");
break;
case SCTP_STREAM_RESET_EVENT:
Sebastian Dröge
committed
GST_DEBUG_OBJECT (self, "Event: SCTP_STREAM_RESET_EVENT");
handle_stream_reset_event (self, ¬ification->sn_strreset_event);
break;
case SCTP_SENDER_DRY_EVENT:
Sebastian Dröge
committed
GST_DEBUG_OBJECT (self, "Event: SCTP_SENDER_DRY_EVENT");
break;
case SCTP_NOTIFICATIONS_STOPPED_EVENT:
Sebastian Dröge
committed
GST_DEBUG_OBJECT (self, "Event: SCTP_NOTIFICATIONS_STOPPED_EVENT");
break;
case SCTP_ASSOC_RESET_EVENT:
Sebastian Dröge
committed
GST_DEBUG_OBJECT (self, "Event: SCTP_ASSOC_RESET_EVENT");
break;
case SCTP_STREAM_CHANGE_EVENT:
Sebastian Dröge
committed
GST_DEBUG_OBJECT (self, "Event: SCTP_STREAM_CHANGE_EVENT");
break;
case SCTP_SEND_FAILED_EVENT:
Sebastian Dröge
committed
GST_ERROR_OBJECT (self, "Event: SCTP_SEND_FAILED_EVENT (%u)",
notification->sn_send_failed_event.ssfe_error);
break;
default:
break;
}
}
static void
handle_association_changed (GstSctpAssociation * self,
const struct sctp_assoc_change *sac)
{
gboolean change_state = FALSE;
GstSctpAssociationState new_state;
switch (sac->sac_state) {
case SCTP_COMM_UP:
Sebastian Dröge
committed
GST_DEBUG_OBJECT (self, "SCTP_COMM_UP");
Sebastian Dröge
committed
g_mutex_lock (&self->association_mutex);
if (self->state == GST_SCTP_ASSOCIATION_STATE_CONNECTING) {
change_state = TRUE;
new_state = GST_SCTP_ASSOCIATION_STATE_CONNECTED;
Sebastian Dröge
committed
GST_DEBUG_OBJECT (self, "SCTP association connected!");
} else if (self->state == GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
Sebastian Dröge
committed
GST_FIXME_OBJECT (self, "SCTP association already open");
Sebastian Dröge
committed
GST_WARNING_OBJECT (self, "SCTP association in unexpected state");
Sebastian Dröge
committed
g_mutex_unlock (&self->association_mutex);
break;
case SCTP_COMM_LOST:
Sebastian Dröge
committed
GST_WARNING_OBJECT (self, "SCTP event SCTP_COMM_LOST received");
Sebastian Dröge
committed
change_state = TRUE;
new_state = GST_SCTP_ASSOCIATION_STATE_ERROR;
break;
case SCTP_RESTART:
Sebastian Dröge
committed
GST_DEBUG_OBJECT (self, "SCTP event SCTP_RESTART received");
break;
case SCTP_SHUTDOWN_COMP:
Sebastian Dröge
committed
GST_DEBUG_OBJECT (self, "SCTP event SCTP_SHUTDOWN_COMP received");
change_state = TRUE;
new_state = GST_SCTP_ASSOCIATION_STATE_DISCONNECTED;
break;
case SCTP_CANT_STR_ASSOC:
Sebastian Dröge
committed
GST_WARNING_OBJECT (self, "SCTP event SCTP_CANT_STR_ASSOC received");
Sebastian Dröge
committed
change_state = TRUE;
new_state = GST_SCTP_ASSOCIATION_STATE_ERROR;
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
break;
}
if (change_state)
gst_sctp_association_change_state (self, new_state, TRUE);
}
static void
handle_stream_reset_event (GstSctpAssociation * self,
const struct sctp_stream_reset_event *sr)
{
guint32 i, n;
if (!(sr->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
!(sr->strreset_flags & SCTP_STREAM_RESET_DENIED)) {
n = (sr->strreset_length -
sizeof (struct sctp_stream_reset_event)) / sizeof (uint16_t);
for (i = 0; i < n; i++) {
if (sr->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
g_signal_emit (self, signals[SIGNAL_STREAM_RESET], 0,
sr->strreset_stream_list[i]);
}
}
}
}
static void
handle_message (GstSctpAssociation * self, guint8 * data, guint32 datalen,
guint16 stream_id, guint32 ppid)
{
Sebastian Dröge
committed
g_mutex_lock (&self->association_mutex);
if (self->packet_received_cb) {
/* It's the callbacks job to free the data correctly */
self->packet_received_cb (self, data, datalen, stream_id, ppid,
self->packet_received_user_data);
} else {
/* We use this instead of a bare `free()` so that we use the `free` from
* the C runtime that usrsctp was built with. This makes a difference on
* Windows where libusrstcp and GStreamer can be linked to two different
* CRTs. */
usrsctp_freedumpbuffer ((gchar *) data);
Sebastian Dröge
committed
g_mutex_unlock (&self->association_mutex);
Sebastian Dröge
committed
/* Returns TRUE if lock==FALSE and notification is needed later.
* Takes the mutex shortly if lock==TRUE! */
Sebastian Dröge
committed
static gboolean
gst_sctp_association_change_state (GstSctpAssociation * self,
Sebastian Dröge
committed
GstSctpAssociationState new_state, gboolean lock)
Sebastian Dröge
committed
if (lock)
g_mutex_lock (&self->association_mutex);
Sebastian Dröge
committed
if (self->state != new_state
&& self->state != GST_SCTP_ASSOCIATION_STATE_ERROR) {
self->state = new_state;
Sebastian Dröge
committed
if (lock) {
g_mutex_unlock (&self->association_mutex);
Sebastian Dröge
committed
g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_STATE]);
return FALSE;
} else {
return TRUE;
}
} else {
Sebastian Dröge
committed
if (lock)
g_mutex_unlock (&self->association_mutex);
Sebastian Dröge
committed
return FALSE;
}