Commit dfab04cd authored by Philip Withnall's avatar Philip Withnall Committed by Olivier Crête

socket: Add vectored I/O support for receiving on sockets

Replace the recv() API with a recv_messages() API, which supports
receiving multiple messages, each with multiple buffers rather than a
single monolithic buffer.

This doesn’t break API, as the socket API is not exposed outside
libnice. It does introduce a new struct: NiceInputMessage, which is
analogous to struct mmsghdr.

This includes updates to the test-bsd test to cover the changed API.
parent bc527dcf
......@@ -181,4 +181,10 @@ component_io_cb (
gssize agent_recv_locked (NiceAgent *agent, Stream *stream,
Component *component, NiceSocket *socket, guint8 *buf, gsize buf_len);
gsize
memcpy_buffer_to_input_message (NiceInputMessage *message,
const guint8 *buffer, gsize buffer_length);
guint8 *
compact_input_message (NiceInputMessage *message, gsize *buffer_length);
#endif /*_NICE_AGENT_PRIV_H */
......@@ -2438,13 +2438,19 @@ agent_recv_locked (
GList *item;
guint8 local_buf[MAX_BUFFER_SIZE];
gsize local_buf_len = MAX_BUFFER_SIZE;
GInputVector local_bufs = { local_buf, local_buf_len };
NiceInputMessage local_messages = { &local_bufs, 1, &from, 0 };
gint n_valid_messages;
/* Returns -1 on error, 0 on EWOULDBLOCK, and > 0 on success.
*
* FIXME: We have to receive into a local buffer then copy out because
* otherwise, if @buf is too small, we could lose data, even when in
* reliable mode (because reliable streams are packetised). */
len = nice_socket_recv (socket, &from, local_buf_len, (gchar *) local_buf);
n_valid_messages = nice_socket_recv_messages (socket, &local_messages, 1);
len = (n_valid_messages == 1) ?
(gssize) local_messages.length : n_valid_messages;
if (len == 0) {
return 0;
......@@ -2555,6 +2561,99 @@ agent_recv_locked (
return len;
}
/* Print the composition of an array of messages. No-op if debugging is
* disabled. */
static void
nice_debug_message_composition (NiceInputMessage *messages, guint n_messages)
{
#ifndef NDEBUG
guint i;
for (i = 0; i < n_messages; i++) {
NiceInputMessage *message = &messages[i];
guint j;
nice_debug ("Message %p (from: %p, length: %" G_GSIZE_FORMAT ")", message,
message->from, message->length);
for (j = 0;
(message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
(message->n_buffers < 0 && message->buffers[j].buffer != NULL);
j++) {
GInputVector *buffer = &message->buffers[j];
nice_debug ("\tBuffer %p (length: %" G_GSIZE_FORMAT ")", buffer->buffer,
buffer->size);
}
}
#endif
}
/* Concatenate all the buffers in the given @recv_message into a single, newly
* allocated, monolithic buffer which is returned. The length of the new buffer
* is returned in @buffer_length, and should be equal to the length field of
* @recv_message.
*
* The return value must be freed with g_free(). */
guint8 *
compact_input_message (NiceInputMessage *message, gsize *buffer_length)
{
guint8 *buffer;
gsize offset = 0;
guint i;
nice_debug ("%s: **WARNING: SLOW PATH**", G_STRFUNC);
nice_debug_message_composition (message, 1);
*buffer_length = message->length;
buffer = g_malloc (*buffer_length);
for (i = 0;
(message->n_buffers >= 0 && i < (guint) message->n_buffers) ||
(message->n_buffers < 0 && message->buffers[i].buffer != NULL);
i++) {
gsize len = MIN (*buffer_length - offset, message->buffers[i].size);
memcpy (buffer + offset, message->buffers[i].buffer, len);
offset += len;
}
return buffer;
}
/* Returns the number of bytes copied. Silently drops any data from @buffer
* which doesn’t fit in @message. */
gsize
memcpy_buffer_to_input_message (NiceInputMessage *message,
const guint8 *buffer, gsize buffer_length)
{
guint i;
nice_debug ("%s: **WARNING: SLOW PATH**", G_STRFUNC);
message->length = 0;
for (i = 0;
buffer_length > 0 &&
((message->n_buffers >= 0 && i < (guint) message->n_buffers) ||
(message->n_buffers < 0 && message->buffers[i].buffer != NULL));
i++) {
gsize len;
len = MIN (message->buffers[i].size, buffer_length);
memcpy (message->buffers[i].buffer, buffer, len);
buffer += len;
buffer_length -= len;
message->buffers[i].size = len;
message->length += len;
}
nice_debug_message_composition (message, 1);
return message->length;
}
static gboolean
nice_agent_recv_cancelled_cb (GCancellable *cancellable, gpointer user_data)
{
......
......@@ -137,6 +137,40 @@ typedef struct _NiceAgent NiceAgent;
G_BEGIN_DECLS
/**
* NiceInputMessage:
* @buffers: (array length=n_buffers): unowned array of #GInputVector buffers to
* store data in for this message
* @n_buffers: number of #GInputVectors in @buffers, or -1 to indicate @buffers
* is %NULL-terminated
* @from: (allow-none): return location to store the address of the peer who
* transmitted the message, or %NULL
* @length: total number of valid bytes contiguously stored in @buffers
*
* Represents a single message received off the network. For reliable
* connections, this is essentially just an array of buffers (specifically,
* @from can be ignored). for non-reliable connections, it represents a single
* packet as received from the OS.
*
* @n_buffers may be -1 to indicate that @buffers is terminated by a
* #GInputVector with a %NULL buffer pointer.
*
* By providing arrays of #NiceInputMessages to functions like
* nice_agent_recv_messages(), multiple messages may be received with a single
* call, which is more efficient than making multiple calls in a loop. In this
* manner, nice_agent_recv_messages() is analogous to recvmmsg(); and
* #NiceInputMessage to struct mmsghdr.
*
* Since: 0.1.5
*/
typedef struct {
GInputVector *buffers;
gint n_buffers; /* may be -1 to indicate @buffers is NULL-terminated */
NiceAddress *from; /* return location for address of message sender */
gsize length; /* sum of the lengths of @buffers */
} NiceInputMessage;
#define NICE_TYPE_AGENT nice_agent_get_type()
#define NICE_AGENT(obj) \
......
......@@ -7,6 +7,7 @@ NiceComponentType
NiceProxyType
NiceCompatibility
NiceAgentRecvFunc
NiceInputMessage
NICE_AGENT_MAX_REMOTE_CANDIDATES
nice_agent_new
nice_agent_new_reliable
......
......@@ -70,9 +70,15 @@ typedef struct {
gchar *username;
gchar *password;
GQueue send_queue;
gchar *recv_buf;
gint recv_len;
gint content_length;
/* Ring buffer for receiving HTTP headers into before they’re parsed. */
guint8 *recv_buf;
gsize recv_buf_length; /* allocation size of @recv_buf */
gsize recv_buf_pos; /* offset from @recv_buf of the 0th byte in the buffer */
gsize recv_buf_fill; /* number of bytes occupied in the buffer */
/* Parsed from the Content-Length header provided by the other endpoint. */
gsize content_length;
} HttpPriv;
......@@ -84,8 +90,8 @@ struct to_be_sent {
static void socket_close (NiceSocket *sock);
static gint socket_recv (NiceSocket *sock, NiceAddress *from,
guint len, gchar *buf);
static gint socket_recv_messages (NiceSocket *sock,
NiceInputMessage *recv_messages, guint n_recv_messages);
static gboolean socket_send (NiceSocket *sock, const NiceAddress *to,
guint len, const gchar *buf);
static gboolean socket_is_reliable (NiceSocket *sock);
......@@ -111,14 +117,16 @@ nice_http_socket_new (NiceSocket *base_socket,
priv->username = g_strdup (username);
priv->password = g_strdup (password);
priv->recv_buf = NULL;
priv->recv_len = 0;
priv->recv_buf_length = 0;
priv->recv_buf_pos = 0;
priv->recv_buf_fill = 0;
priv->content_length = 0;
sock->fileno = priv->base_socket->fileno;
sock->addr = priv->base_socket->addr;
sock->send = socket_send;
sock->recv = socket_recv;
sock->recv_messages = socket_recv_messages;
sock->is_reliable = socket_is_reliable;
sock->close = socket_close;
......@@ -183,152 +191,349 @@ socket_close (NiceSocket *sock)
g_slice_free(HttpPriv, sock->priv);
}
static void
assert_ring_buffer_valid (HttpPriv *priv)
{
g_assert (priv->recv_buf_fill <= priv->recv_buf_length);
g_assert (priv->recv_buf_pos == 0 ||
priv->recv_buf_pos < priv->recv_buf_length);
g_assert (priv->recv_buf_length == 0 || priv->recv_buf != NULL);
}
/* Pops up to @buffer_length bytes off the ring buffer and copies them into
* @buffer. Returns the number of bytes copied. */
static gsize
memcpy_ring_buffer_to_buffer (HttpPriv *priv,
guint8 *buffer, gsize buffer_length)
{
gsize len, consumed = 0;
gboolean has_wrapped;
has_wrapped =
(priv->recv_buf_pos + priv->recv_buf_fill) > priv->recv_buf_length;
if (has_wrapped) {
len = MIN (priv->recv_buf_length - priv->recv_buf_pos, buffer_length);
memcpy (buffer, priv->recv_buf + priv->recv_buf_pos, len);
consumed += len;
buffer += len;
buffer_length -= len;
len = MIN (priv->recv_buf_fill - len, buffer_length);
memcpy (buffer, priv->recv_buf, len);
consumed += len;
} else {
len = MIN (priv->recv_buf_fill, buffer_length);
memcpy (buffer, priv->recv_buf + priv->recv_buf_pos, len);
consumed += len;
}
priv->recv_buf_pos =
(priv->recv_buf_pos + consumed) % priv->recv_buf_length;
priv->recv_buf_fill -= consumed;
return consumed;
}
/* Returns the number of messages touched. Silently drops any data from @buffer
* which doesn’t fit in @messages. Updates the ring buffer to pop the copied
* data off it. Treats all #GInputVectors in @messages the same; there is no
* differentiation between different #NiceInputMessages. */
static gint
memcpy_ring_buffer_to_input_messages (HttpPriv *priv,
NiceInputMessage *messages, guint n_messages)
{
guint i, j;
for (i = 0; priv->recv_buf_fill > 0 && i < n_messages; i++) {
NiceInputMessage *message = &messages[i];
for (j = 0;
priv->recv_buf_fill > 0 &&
((message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
(message->n_buffers < 0 && message->buffers[j].buffer != NULL));
j++) {
message->buffers[j].size =
memcpy_ring_buffer_to_buffer (priv,
message->buffers[j].buffer, message->buffers[j].size);
message->length += message->buffers[j].size;
}
}
return i;
}
/* FIXME: The current implementation of socket_recv_message() is a fast
* pass-through to nice_socket_recv_message() if the HTTP socket is connected,
* but is a slow state machine otherwise, using multiple memcpy()s. Spruce it up
* to better to use the recv_messages to avoid the memcpy()s. */
static gint
socket_recv (NiceSocket *sock, NiceAddress *from, guint len, gchar *buf)
socket_recv_messages (NiceSocket *sock,
NiceInputMessage *recv_messages, guint n_recv_messages)
{
HttpPriv *priv = sock->priv;
gint read = -1;
gint ret = -1;
if (from)
*from = priv->addr;
if (priv->state == HTTP_STATE_CONNECTED) {
guint i;
if (priv->base_socket)
read = nice_socket_recv (priv->base_socket, NULL, len, buf);
/* Fast path: pass through to the base socket once we’re connected. */
if (priv->base_socket) {
ret = nice_socket_recv_messages (priv->base_socket,
recv_messages, n_recv_messages);
}
if (ret <= 0)
return ret;
if (read <= 0 || priv->state == HTTP_STATE_CONNECTED) {
return read;
/* After successfully receiving into at least one NiceInputMessage,
* update the from address in each valid NiceInputMessage. */
for (i = 0; i < (guint) ret; i++) {
if (recv_messages[i].from != NULL)
*recv_messages[i].from = priv->addr;
}
return ret;
} else {
priv->recv_buf = g_realloc (priv->recv_buf, priv->recv_len + read);
memcpy (priv->recv_buf + priv->recv_len, buf, read);
priv->recv_len += read;
/* Slow path: read into a local ring buffer until we’re parsed enough of the
* headers. Double the buffer in size every time it fills up. */
gboolean has_wrapped;
GInputVector local_recv_bufs[2];
NiceInputMessage local_recv_message = { local_recv_bufs, 2, NULL, 0 };
/* Has the buffer filled up? Start with an initial buffer of 1KB, which
* should cover the average size of HTTP response headers. Source:
* http://dev.chromium.org/spdy/spdy-whitepaper */
if (priv->recv_buf_fill == priv->recv_buf_length) {
priv->recv_buf_length = MAX (priv->recv_buf_length * 2, 1024);
priv->recv_buf = g_realloc (priv->recv_buf, priv->recv_buf_length);
}
assert_ring_buffer_valid (priv);
/* Read some data into the buffer. Use two GInputVectors: one for the tail
* of the buffer and one for the head. */
has_wrapped =
(priv->recv_buf_pos + priv->recv_buf_fill) > priv->recv_buf_length;
if (has_wrapped) {
local_recv_bufs[0].buffer =
priv->recv_buf + (priv->recv_buf_pos + priv->recv_buf_fill) %
priv->recv_buf_length;
local_recv_bufs[0].size = priv->recv_buf_length - priv->recv_buf_fill;
local_recv_bufs[1].buffer = NULL;
local_recv_bufs[1].size = 0;
} else {
local_recv_bufs[0].buffer =
priv->recv_buf + priv->recv_buf_pos + priv->recv_buf_fill;
local_recv_bufs[0].size =
priv->recv_buf_length - (priv->recv_buf_pos + priv->recv_buf_fill);
local_recv_bufs[1].buffer = priv->recv_buf;
local_recv_bufs[1].size = priv->recv_buf_pos;
}
if (priv->base_socket) {
ret = nice_socket_recv_messages (priv->base_socket,
&local_recv_message, 1);
}
if (ret <= 0)
return ret;
/* Update the buffer’s metadata. */
priv->recv_buf_fill += local_recv_message.length;
assert_ring_buffer_valid (priv);
/* Fall through and try parsing the newly received data. */
}
retry:
nice_debug ("Receiving from HTTP proxy (state %d) : %d \n'%s'", priv->state, priv->recv_len, priv->recv_buf);
#define GET_BYTE(pos) \
priv->recv_buf[(pos + priv->recv_buf_pos) % priv->recv_buf_length]
#define EAT_WHITESPACE(pos) \
while (pos < priv->recv_buf_fill && GET_BYTE(pos) == ' ') \
pos++; \
if (pos >= priv->recv_buf_fill) \
goto not_enough_data;
retry:
nice_debug ("Receiving from HTTP proxy (state %d) : %" G_GSSIZE_FORMAT " \n"
"'%s'", priv->state, priv->recv_buf_fill,
priv->recv_buf + priv->recv_buf_pos);
switch (priv->state) {
case HTTP_STATE_INIT:
{
gint pos = 0;
/* This is a logical position in the recv_buf; add
* (priv->recv_buf + priv->recv_buf_pos) to get the actual byte in
* memory. */
guint pos = 0;
/* Remove any leading spaces (could happen!) */
while (pos < priv->recv_len && priv->recv_buf[pos] == ' ')
pos++;
/* Make sure we have enough data */
if (pos >= priv->recv_len)
goto not_enough_data;
/* Eat leading whitespace and check we have enough data. */
EAT_WHITESPACE (pos);
if (pos + 7 > priv->recv_len)
if (pos + 7 > priv->recv_buf_fill)
goto not_enough_data;
if (strncmp (priv->recv_buf + pos, "HTTP/1.", 7) != 0)
if (GET_BYTE (pos + 0) != 'H' ||
GET_BYTE (pos + 1) != 'T' ||
GET_BYTE (pos + 2) != 'T' ||
GET_BYTE (pos + 3) != 'P' ||
GET_BYTE (pos + 4) != '/' ||
GET_BYTE (pos + 5) != '1' ||
GET_BYTE (pos + 6) != '.')
goto error;
pos += 7;
if (pos >= priv->recv_len)
if (pos >= priv->recv_buf_fill)
goto not_enough_data;
if(priv->recv_buf[pos] != '0' && priv->recv_buf[pos] != '1')
if (GET_BYTE (pos) != '0' && GET_BYTE (pos) != '1')
goto error;
pos++;
/* Make sure we have a space after the HTTP version */
if (pos >= priv->recv_len)
if (pos >= priv->recv_buf_fill)
goto not_enough_data;
if (priv->recv_buf[pos] != ' ')
if (GET_BYTE (pos) != ' ')
goto error;
/* Skip all spaces (could be more than one!) */
while (pos < priv->recv_len && priv->recv_buf[pos] == ' ')
pos++;
if (pos >= priv->recv_len)
goto not_enough_data;
EAT_WHITESPACE (pos);
/* Check for a successfull 2xx code */
if (pos + 3 > priv->recv_len)
/* Check for a successful 2xx code */
if (pos + 3 > priv->recv_buf_fill)
goto not_enough_data;
if (priv->recv_buf[pos] != '2' ||
priv->recv_buf[pos+1] < '0' || priv->recv_buf[pos+1] > '9' ||
priv->recv_buf[pos+2] < '0' || priv->recv_buf[pos+2] > '9')
if (GET_BYTE (pos) != '2' ||
GET_BYTE (pos + 1) < '0' || GET_BYTE (pos + 1) > '9' ||
GET_BYTE (pos + 2) < '0' || GET_BYTE (pos + 2) > '9')
goto error;
/* Clear any trailing chars */
while (pos + 1 < priv->recv_len &&
priv->recv_buf[pos] != '\r' && priv->recv_buf[pos+1] != '\n')
while (pos + 1 < priv->recv_buf_fill &&
GET_BYTE (pos) != '\r' && GET_BYTE (pos + 1) != '\n')
pos++;
if (pos + 1 >= priv->recv_len)
if (pos + 1 >= priv->recv_buf_fill)
goto not_enough_data;
pos += 2;
/* consume the data we just parsed */
priv->recv_len -= pos;
memmove (priv->recv_buf, priv->recv_buf + pos, priv->recv_len);
priv->recv_buf = g_realloc (priv->recv_buf, priv->recv_len);
/* Consume the data we just parsed. */
priv->recv_buf_pos = (priv->recv_buf_pos + pos) % priv->recv_buf_length;
priv->recv_buf_fill -= pos;
priv->content_length = 0;
priv->state = HTTP_STATE_HEADERS;
goto retry;
}
break;
case HTTP_STATE_HEADERS:
{
gint pos = 0;
if (pos + 15 < priv->recv_len &&
g_ascii_strncasecmp (priv->recv_buf, "Content-Length:", 15) == 0) {
priv->content_length = atoi(priv->recv_buf + 15);
guint pos = 0;
if (pos + 15 < priv->recv_buf_fill &&
(GET_BYTE (pos + 0) == 'C' || GET_BYTE (pos + 0) == 'c') &&
(GET_BYTE (pos + 1) == 'o' || GET_BYTE (pos + 1) == 'O') &&
(GET_BYTE (pos + 2) == 'n' || GET_BYTE (pos + 2) == 'N') &&
(GET_BYTE (pos + 3) == 't' || GET_BYTE (pos + 3) == 'T') &&
(GET_BYTE (pos + 4) == 'e' || GET_BYTE (pos + 4) == 'E') &&
(GET_BYTE (pos + 5) == 'n' || GET_BYTE (pos + 5) == 'N') &&
(GET_BYTE (pos + 6) == 't' || GET_BYTE (pos + 6) == 'T') &&
GET_BYTE (pos + 7) == '-' &&
(GET_BYTE (pos + 8) == 'L' || GET_BYTE (pos + 8) == 'l') &&
(GET_BYTE (pos + 9) == 'e' || GET_BYTE (pos + 9) == 'E') &&
(GET_BYTE (pos + 10) == 'n' || GET_BYTE (pos + 10) == 'N') &&
(GET_BYTE (pos + 11) == 'g' || GET_BYTE (pos + 11) == 'G') &&
(GET_BYTE (pos + 12) == 't' || GET_BYTE (pos + 12) == 'T') &&
(GET_BYTE (pos + 13) == 'h' || GET_BYTE (pos + 13) == 'H') &&
GET_BYTE (pos + 14) == ':') {
/* Found a Content-Length header. Parse and store the value. Note that
* the HTTP standard allows for arbitrarily-big content lengths. We
* limit it to G_MAXSIZE for sanity’s sake.
*
* The code below is equivalent to strtoul(input, NULL, 10), but
* operates on a ring buffer. */
pos += 15;
EAT_WHITESPACE (pos);
priv->content_length = 0;
while (TRUE) {
guint8 byte = GET_BYTE (pos);
gint val = g_ascii_digit_value (byte);
if (byte == '\r') {
/* Reached the end of the value; fall out to the code below which
* will grab the \n. */
break;
} else if (val == -1) {
priv->content_length = 0;
goto error;
}
/* Check for overflow. Don’t flag it as an error; just fall through
* to the code below which will skip to the \r\n. */
if (priv->content_length > G_MAXSIZE / 10 ||
priv->content_length * 10 > G_MAXSIZE - val) {
priv->content_length = 0;
break;
}
priv->content_length = (priv->content_length * 10) + val;
if (pos + 1 > priv->recv_buf_fill)
goto not_enough_data;
pos++;
}
}
while (pos + 1 < priv->recv_len &&
priv->recv_buf[pos] != '\r' && priv->recv_buf[pos+1] != '\n')
/* Skip over the header. */
while (pos + 1 < priv->recv_buf_fill &&
GET_BYTE (pos) != '\r' && GET_BYTE (pos + 1) != '\n')
pos++;
nice_debug ("pos = %d, len = %d", pos, priv->recv_len);
if (pos + 1 >= priv->recv_len)
nice_debug ("pos = %u, fill = %" G_GSSIZE_FORMAT,
pos, priv->recv_buf_fill);
if (pos + 1 >= priv->recv_buf_fill)
goto not_enough_data;
pos += 2;
/* consume the data we just parsed */
priv->recv_len -= pos;
memmove (priv->recv_buf, priv->recv_buf + pos, priv->recv_len);
priv->recv_buf = g_realloc (priv->recv_buf, priv->recv_len);
/* Consume the data we just parsed. */
priv->recv_buf_pos = (priv->recv_buf_pos + pos) % priv->recv_buf_length;
priv->recv_buf_fill -= pos;
if (pos == 2)
priv->state = HTTP_STATE_BODY;
goto retry;
}
break;
case HTTP_STATE_BODY:
{
gint consumed = priv->content_length;
gsize consumed;
if (priv->content_length == 0) {
priv->state = HTTP_STATE_CONNECTED;
goto retry;
}
if (priv->recv_len == 0)
if (priv->recv_buf_fill == 0)
goto not_enough_data;
if (priv->content_length > priv->recv_len)
consumed = priv->recv_len;
consumed = MIN (priv->content_length, priv->recv_buf_fill);
priv->recv_len -= consumed;
priv->recv_buf_pos =
(priv->recv_buf_pos + consumed) % priv->recv_buf_length;
priv->recv_buf_fill -= consumed;
priv->content_length -= consumed;
memmove (priv->recv_buf, priv->recv_buf + consumed, priv->recv_len);
priv->recv_buf = g_realloc (priv->recv_buf, priv->recv_len);