Commit cdb7cad9 authored by Youness Alaoui's avatar Youness Alaoui Committed by Olivier Crête

Factor out the add_to_be_sent into a helper API for sockets

The add_to_be_sent was duplicated in http/socks5/pseudossl/tcp-bsd and
had some small differences. It's better to factor it out so bug fixes
get applied to all of them and code is cleaner.
parent d176333b
......@@ -21,6 +21,7 @@ noinst_LTLIBRARIES = libsocket.la
libsocket_la_SOURCES = \
socket.h \
socket-priv.h \
socket.c \
udp-bsd.h \
udp-bsd.c \
......
......@@ -44,6 +44,7 @@
#include "http.h"
#include "agent-priv.h"
#include "socket-priv.h"
#include <string.h>
#include <stdlib.h>
......@@ -83,13 +84,6 @@ typedef struct {
} HttpPriv;
struct to_be_sent {
guint length;
gchar *buf;
NiceAddress to;
};
static void socket_close (NiceSocket *sock);
static gint socket_recv_messages (NiceSocket *sock,
NiceInputMessage *recv_messages, guint n_recv_messages);
......@@ -99,11 +93,6 @@ static gint socket_send_messages_reliable (NiceSocket *sock,
const NiceAddress *to, const NiceOutputMessage *messages, guint n_messages);
static gboolean socket_is_reliable (NiceSocket *sock);
static void add_to_be_sent (NiceSocket *sock, const NiceAddress *to,
const NiceOutputMessage *messages, guint n_messages);
static void free_to_be_sent (struct to_be_sent *tbs);
NiceSocket *
nice_http_socket_new (NiceSocket *base_socket,
NiceAddress *addr, gchar *username, gchar *password)
......@@ -198,8 +187,7 @@ socket_close (NiceSocket *sock)
if (priv->recv_buf)
g_free (priv->recv_buf);
g_queue_foreach (&priv->send_queue, (GFunc) free_to_be_sent, NULL);
g_queue_clear (&priv->send_queue);
nice_socket_free_send_queue (&priv->send_queue);
g_slice_free(HttpPriv, sock->priv);
}
......@@ -542,19 +530,13 @@ retry:
case HTTP_STATE_CONNECTED:
{
gsize len;
struct to_be_sent *tbs = NULL;
len = memcpy_ring_buffer_to_input_messages (priv,
recv_messages, n_recv_messages);
/* Send the pending data */
while ((tbs = g_queue_pop_head (&priv->send_queue))) {
/* We only queue reliable data */
nice_socket_send_reliable (priv->base_socket, &tbs->to,
tbs->length, tbs->buf);
g_free (tbs->buf);
g_slice_free (struct to_be_sent, tbs);
}
nice_socket_flush_send_queue (priv->base_socket,
&priv->send_queue);
return len;
}
......@@ -616,7 +598,7 @@ socket_send_messages_reliable (NiceSocket *sock, const NiceAddress *to,
} else if (priv->state == HTTP_STATE_ERROR) {
return -1;
} else {
add_to_be_sent (sock, to, messages, n_messages);
nice_socket_queue_send (&priv->send_queue, to, messages, n_messages);
}
return n_messages;
......@@ -629,55 +611,3 @@ socket_is_reliable (NiceSocket *sock)
return nice_socket_is_reliable (priv->base_socket);
}
static void
add_to_be_sent (NiceSocket *sock, const NiceAddress *to,
const NiceOutputMessage *messages, guint n_messages)
{
HttpPriv *priv = sock->priv;
guint i;
if (n_messages == 0)
return;
/* Compact the message’s buffers before queueing. */
for (i = 0; i < n_messages; i++) {
const NiceOutputMessage *message = &messages[i];
struct to_be_sent *tbs = NULL;
guint j;
gsize message_len_remaining = output_message_get_size (message);
gsize offset = 0;
if (message_len_remaining == 0)
continue;
tbs = g_slice_new0 (struct to_be_sent);
tbs->buf = g_malloc (message_len_remaining);
tbs->length = message_len_remaining;
if (to)
tbs->to = *to;
g_queue_push_tail (&priv->send_queue, tbs);
for (j = 0;
(message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
(message->n_buffers < 0 && message->buffers[j].buffer != NULL);
j++) {
const GOutputVector *buffer = &message->buffers[j];
gsize len;
len = MIN (buffer->size, message_len_remaining);
memcpy (tbs->buf + offset, buffer->buffer, len);
message_len_remaining -= len;
offset += len;
}
}
}
static void
free_to_be_sent (struct to_be_sent *tbs)
{
g_free (tbs->buf);
g_slice_free (struct to_be_sent, tbs);
}
......@@ -44,6 +44,7 @@
#include "pseudossl.h"
#include "agent-priv.h"
#include "socket-priv.h"
#include <string.h>
......@@ -59,13 +60,6 @@ typedef struct {
} PseudoSSLPriv;
struct to_be_sent {
guint8 *buf; /* owned */
gsize length;
NiceAddress to;
};
static const gchar SSL_SERVER_GOOGLE_HANDSHAKE[] = {
0x16, 0x03, 0x01, 0x00, 0x4a, 0x02, 0x00, 0x00,
0x46, 0x03, 0x01, 0x42, 0x85, 0x45, 0xa7, 0x27,
......@@ -120,11 +114,6 @@ static gint socket_send_messages_reliable (NiceSocket *sock,
const NiceAddress *to, const NiceOutputMessage *messages, guint n_messages);
static gboolean socket_is_reliable (NiceSocket *sock);
static void add_to_be_sent (NiceSocket *sock, const NiceAddress *to,
const NiceOutputMessage *messages, guint n_messages);
static void free_to_be_sent (struct to_be_sent *tbs);
NiceSocket *
nice_pseudossl_socket_new (NiceSocket *base_socket,
NicePseudoSSLSocketCompatibility compatibility)
......@@ -176,8 +165,7 @@ socket_close (NiceSocket *sock)
if (priv->base_socket)
nice_socket_free (priv->base_socket);
g_queue_foreach (&priv->send_queue, (GFunc) free_to_be_sent, NULL);
g_queue_clear (&priv->send_queue);
nice_socket_free_send_queue (&priv->send_queue);
g_slice_free(PseudoSSLPriv, sock->priv);
}
......@@ -238,15 +226,8 @@ socket_recv_messages (NiceSocket *sock,
if (ret <= 0) {
return ret;
} else if (ret == 1 && server_handshake_valid(sock, &local_recv_buf)) {
struct to_be_sent *tbs = NULL;
priv->handshaken = TRUE;
while ((tbs = g_queue_pop_head (&priv->send_queue))) {
/* We only queue reliable data */
nice_socket_send_reliable (priv->base_socket, &tbs->to, tbs->length,
(const gchar *) tbs->buf);
g_free (tbs->buf);
g_slice_free (struct to_be_sent, tbs);
}
nice_socket_flush_send_queue (priv->base_socket, &priv->send_queue);
} else {
if (priv->base_socket)
nice_socket_free (priv->base_socket);
......@@ -294,7 +275,7 @@ socket_send_messages_reliable (NiceSocket *sock, const NiceAddress *to,
return nice_socket_send_messages_reliable (priv->base_socket, to, messages,
n_messages);
} else {
add_to_be_sent (sock, to, messages, n_messages);
nice_socket_queue_send (&priv->send_queue, to, messages, n_messages);
}
return n_messages;
}
......@@ -306,52 +287,3 @@ socket_is_reliable (NiceSocket *sock)
return nice_socket_is_reliable (priv->base_socket);
}
static void
add_to_be_sent (NiceSocket *sock, const NiceAddress *to,
const NiceOutputMessage *messages, guint n_messages)
{
PseudoSSLPriv *priv = sock->priv;
guint i;
for (i = 0; i < n_messages; i++) {
struct to_be_sent *tbs;
const NiceOutputMessage *message = &messages[i];
guint j;
gsize offset = 0;
gsize message_len;
tbs = g_slice_new0 (struct to_be_sent);
message_len = output_message_get_size (message);
/* Compact the buffer. */
tbs->buf = g_malloc (message_len);
tbs->length = message_len;
if (to != NULL)
tbs->to = *to;
g_queue_push_tail (&priv->send_queue, tbs);
for (j = 0;
(message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
(message->n_buffers < 0 && message->buffers[j].buffer != NULL);
j++) {
const GOutputVector *buffer = &message->buffers[j];
gsize len;
len = MIN (message_len - offset, buffer->size);
memcpy (tbs->buf + offset, buffer->buffer, len);
offset += len;
}
g_assert (offset == message_len);
}
}
static void
free_to_be_sent (struct to_be_sent *tbs)
{
g_free (tbs->buf);
g_slice_free (struct to_be_sent, tbs);
}
/*
* This file is part of the Nice GLib ICE library.
*
* (C) 2008-2009 Collabora Ltd.
* Contact: Youness Alaoui
* (C) 2008-2009 Nokia Corporation. All rights reserved.
*
* The contents of this file are subject to the Mozilla Public License Version
* 1.1 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.mozilla.org/MPL/
*
* Software distributed under the License is distributed on an "AS IS" basis,
* WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
* for the specific language governing rights and limitations under the
* License.
*
* The Original Code is the Nice GLib ICE library.
*
* The Initial Developers of the Original Code are Collabora Ltd and Nokia
* Corporation. All Rights Reserved.
*
* Contributors:
* Youness Alaoui, Collabora Ltd.
*
* Alternatively, the contents of this file may be used under the terms of the
* the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
* case the provisions of LGPL are applicable instead of those above. If you
* wish to allow use of your version of this file only under the terms of the
* LGPL and not to allow others to use your version of this file under the
* MPL, indicate your decision by deleting the provisions above and replace
* them with the notice and other provisions required by the LGPL. If you do
* not delete the provisions above, a recipient may use your version of this
* file under either the MPL or the LGPL.
*/
#ifndef _SOCKET_PRIV_H
#define _SOCKET_PRIV_H
#include "socket.h"
G_BEGIN_DECLS
/**
* nice_socket_queue_send:
* @send_queue: The queue to add to
* @to : Destination
* @messages: Messages to queue
* @n_messages: Number of messages to queue
*
* Queue messages to be sent later into the GQueue
*/
void nice_socket_queue_send (GQueue *send_queue, const NiceAddress *to,
const NiceOutputMessage *messages, guint n_messages);
/**
* nice_socket_queue_send_with_callback:
* @send_queue: The queue to add to
* @message: The message to queue
* @message_offset: Number of bytes to skip in the message
* @message_len: Total length of the message
* @head: Whether to add the message to the head of the queue or the tail
* @gsock: The #GSocket to create the callback on
* @io_source: Pointer to #GSource pointer to store the created source
* @context: #GMainContext to attach the @io_source to
* @cb: Callback function to call when the @gsock is writable
* @user_data: User data for @cb
*
* Queue (partial) message to be sent later and create a source to call @cb
* when the @gsock becomes writable.
* The @message_offset can be used if a partial write happened and some bytes
* were already written, in which case @head should be set to TRUE to add the
* message to the head of the queue.
*/
void nice_socket_queue_send_with_callback (GQueue *send_queue,
const NiceOutputMessage *message, gsize message_offset, gsize message_len,
gboolean head, GSocket *gsock, GSource **io_source, GMainContext *context,
GSourceFunc cb, gpointer user_data);
/**
* nice_socket_flush_send_queue:
* @base_socket: Base socket to send on
* @send_queue: Queue to flush
*
* Send all the queued messages reliably to the base socket. We assume only
* reliable messages were queued and the underlying socket will handle the
* send.
*/
void nice_socket_flush_send_queue (NiceSocket *base_socket, GQueue *send_queue);
/**
* nice_socket_flush_send_queue_to_socket:
* @gsock: GSocket to send on
* @send_queue: Queue to flush
*
* Send all the queued messages to the socket. If any message fails to be sent
* it will be readded to the queue and #FALSE will be returned, in which case
* the IO source must be kept to allow flushing the next time the socket
* is writable.
* If the queue gets flushed, #TRUE will be returned, in which case, the IO
* source should be destroyed.
*
* Returns: #TRUE if the queue was emptied, #FALSE if the socket would block.
*/
gboolean nice_socket_flush_send_queue_to_socket (GSocket *gsock,
GQueue *send_queue);
/**
* nice_socket_free_send_queue:
* @send_queue: The send queue
*
* Frees every item in the send queue without sending them and empties the queue
*/
void nice_socket_free_send_queue (GQueue *send_queue);
G_END_DECLS
#endif /* _SOCKET_PRIV_H */
......@@ -41,7 +41,22 @@
#include <glib.h>
#include "socket.h"
#include "socket-priv.h"
#include "agent-priv.h"
#include <string.h>
#ifndef G_OS_WIN32
#include <unistd.h>
#endif
typedef struct _NiceSocketQueuedSend NiceSocketQueuedSend;
struct _NiceSocketQueuedSend {
guint8 *buf; /* owned */
gsize length;
NiceAddress to;
};
/**
* nice_socket_recv_messages:
......@@ -225,3 +240,175 @@ nice_socket_free (NiceSocket *sock)
g_slice_free (NiceSocket,sock);
}
}
static void
nice_socket_free_queued_send (NiceSocketQueuedSend *tbs)
{
g_free (tbs->buf);
g_slice_free (NiceSocketQueuedSend, tbs);
}
void
nice_socket_queue_send (GQueue *send_queue, const NiceAddress *to,
const NiceOutputMessage *messages, guint n_messages)
{
guint i;
if (n_messages == 0)
return;
/* Compact the message’s buffers before queueing. */
for (i = 0; i < n_messages; i++) {
NiceSocketQueuedSend *tbs;
const NiceOutputMessage *message = &messages[i];
gsize message_len_remaining = output_message_get_size (message);
guint j;
gsize offset = 0;
if (message_len_remaining == 0)
continue;
/* Compact the buffer. */
tbs = g_slice_new0 (NiceSocketQueuedSend);
tbs->buf = g_malloc (message_len_remaining);
tbs->length = message_len_remaining;
if (to)
tbs->to = *to;
else
memset (&tbs->to, 0, sizeof(NiceAddress));
g_queue_push_tail (send_queue, tbs);
for (j = 0;
(message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
(message->n_buffers < 0 && message->buffers[j].buffer != NULL);
j++) {
const GOutputVector *buffer = &message->buffers[j];
gsize len;
len = MIN (buffer->size, message_len_remaining);
memcpy (tbs->buf + offset, buffer->buffer, len);
message_len_remaining -= len;
offset += len;
}
g_assert (offset == tbs->length);
}
}
void nice_socket_queue_send_with_callback (GQueue *send_queue,
const NiceOutputMessage *message, gsize message_offset, gsize message_len,
gboolean head, GSocket *gsock, GSource **io_source, GMainContext *context,
GSourceFunc cb, gpointer user_data)
{
NiceSocketQueuedSend *tbs;
guint j;
gsize offset = 0;
if (message_offset >= message_len)
return;
tbs = g_slice_new0 (NiceSocketQueuedSend);
tbs->length = message_len - message_offset;
tbs->buf = g_malloc (tbs->length);
if (head)
g_queue_push_head (send_queue, tbs);
else
g_queue_push_tail (send_queue, tbs);
if (io_source && gsock && context && cb && *io_source == NULL) {
*io_source = g_socket_create_source(gsock, G_IO_OUT, NULL);
g_source_set_callback (*io_source, (GSourceFunc) cb, user_data, NULL);
g_source_attach (*io_source, context);
}
/* Move the data into the buffer. */
for (j = 0;
(message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
(message->n_buffers < 0 && message->buffers[j].buffer != NULL);
j++) {
const GOutputVector *buffer = &message->buffers[j];
gsize len;
/* Skip this buffer if it’s within @message_offset. */
if (buffer->size <= message_offset) {
message_offset -= buffer->size;
continue;
}
len = MIN (tbs->length - offset, buffer->size - message_offset);
memcpy (tbs->buf + offset, (guint8 *) buffer->buffer + message_offset, len);
offset += len;
if (message_offset >= len)
message_offset -= len;
else
message_offset = 0;
}
}
void nice_socket_flush_send_queue (NiceSocket *base_socket, GQueue *send_queue)
{
NiceSocketQueuedSend *tbs;
while ((tbs = g_queue_pop_head (send_queue))) {
NiceAddress *to = &tbs->to;
if (!nice_address_is_valid (to))
to = NULL;
/* We only queue reliable data */
nice_socket_send_reliable (base_socket, to,
tbs->length, (const gchar *) tbs->buf);
nice_socket_free_queued_send (tbs);
}
}
gboolean nice_socket_flush_send_queue_to_socket (GSocket *gsock,
GQueue *send_queue)
{
NiceSocketQueuedSend *tbs;
GError *gerr = NULL;
while ((tbs = g_queue_pop_head (send_queue)) != NULL) {
int ret;
GOutputVector local_bufs = { tbs->buf, tbs->length };
ret = g_socket_send_message (gsock, NULL, &local_bufs, 1, NULL, 0,
G_SOCKET_MSG_NONE, NULL, &gerr);
if (ret < 0) {
if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
GOutputVector local_buf = { tbs->buf, tbs->length };
NiceOutputMessage local_message = {&local_buf, 1};
nice_socket_queue_send_with_callback (send_queue, &local_message,
0, local_buf.size, TRUE, NULL, NULL, NULL, NULL, NULL);
nice_socket_free_queued_send (tbs);
g_error_free (gerr);
return FALSE;
}
g_clear_error (&gerr);
} else if (ret < (int) tbs->length) {
GOutputVector local_buf = { tbs->buf + ret, tbs->length - ret };
NiceOutputMessage local_message = {&local_buf, 1};
nice_socket_queue_send_with_callback (send_queue, &local_message,
0, local_buf.size, TRUE, NULL, NULL, NULL, NULL, NULL);
nice_socket_free_queued_send (tbs);
return FALSE;
}
nice_socket_free_queued_send (tbs);
}
return TRUE;
}
void
nice_socket_free_send_queue (GQueue *send_queue)
{
g_queue_foreach (send_queue, (GFunc) nice_socket_free_queued_send, NULL);
g_queue_clear (send_queue);
}
......@@ -44,6 +44,7 @@
#include "socks5.h"
#include "agent-priv.h"
#include "socket-priv.h"
#include <string.h>
......@@ -69,13 +70,6 @@ typedef struct {
} Socks5Priv;
struct to_be_sent {
guint8 *buf; /* owned */
gsize length;
NiceAddress to;
};
static void socket_close (NiceSocket *sock);
static gint socket_recv_messages (NiceSocket *sock,
NiceInputMessage *recv_messages, guint n_recv_messages);
......@@ -85,10 +79,6 @@ static gint socket_send_messages_reliable (NiceSocket *sock,
const NiceAddress *to, const NiceOutputMessage *messages, guint n_messages);
static gboolean socket_is_reliable (NiceSocket *sock);
static void add_to_be_sent (NiceSocket *sock, const NiceAddress *to,
const NiceOutputMessage *messages, guint n_messages);
static void free_to_be_sent (struct to_be_sent *tbs);
NiceSocket *
nice_socks5_socket_new (NiceSocket *base_socket,
......@@ -157,8 +147,7 @@ socket_close (NiceSocket *sock)
if (priv->password)
g_free (priv->password);
g_queue_foreach (&priv->send_queue, (GFunc) free_to_be_sent, NULL);
g_queue_clear (&priv->send_queue);
nice_socket_free_send_queue (&priv->send_queue);
g_slice_free(Socks5Priv, sock->priv);
}
......@@ -309,7 +298,6 @@ socket_recv_messages (NiceSocket *sock,
switch (data[1]) {
case 0x00:
if (data[2] == 0x00) {
struct to_be_sent *tbs = NULL;
switch (data[3]) {
case 0x01: /* IPV4 bound address */
local_recv_buf.size = 6;
......@@ -333,13 +321,8 @@ socket_recv_messages (NiceSocket *sock,
/* Unsupported address type */
goto error;
}
while ((tbs = g_queue_pop_head (&priv->send_queue))) {
/* We only queue reliable data */
nice_socket_send_reliable (priv->base_socket, &tbs->to,
tbs->length, (const gchar *) tbs->buf);
g_free (tbs->buf);
g_slice_free (struct to_be_sent, tbs);
}
nice_socket_flush_send_queue (priv->base_socket,
&priv->send_queue);
priv->state = SOCKS_STATE_CONNECTED;
} else {
/* Wrong reserved value */
......@@ -462,7 +445,7 @@ socket_send_messages_reliable (NiceSocket *sock, const NiceAddress *to,
} else if (priv->state == SOCKS_STATE_ERROR) {
return -1;
} else {
add_to_be_sent (sock, to, messages, n_messages);
nice_socket_queue_send (&priv->send_queue, to, messages, n_messages);
}
return n_messages;
}
......@@ -476,49 +459,3 @@ socket_is_reliable (NiceSocket *sock)
return nice_socket_is_reliable (priv->base_socket);
}
static void
add_to_be_sent (NiceSocket *sock, const NiceAddress *to,
const NiceOutputMessage *messages, guint n_messages)
{
Socks5Priv *priv = sock->priv;
guint i;
for (i = 0; i < n_messages; i++) {
struct to_be_sent *tbs;
const NiceOutputMessage *message = &messages[i];
guint j;
gsize offset = 0;
tbs = g_slice_new0 (struct to_be_sent);
/* Compact the buffer. */
tbs->length = output_message_get_size (message);
tbs->buf = g_malloc (tbs->length);
if (to != NULL)
tbs->to = *to;
g_queue_push_tail (&priv->send_queue, tbs);
for (j = 0;
(message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
(message->n_buffers < 0 && message->buffers[j].buffer != NULL);
j++) {
const GOutputVector *buffer = &message->buffers[j];
gsize len;
len = MIN (tbs->length - offset, buffer->size);
memcpy (tbs->buf + offset, buffer->buffer, len);
offset += len;
}
g_assert (offset == tbs->length);
}
}
static void
free_to_be_sent (struct to_be_sent *tbs)
{
g_free (tbs->buf);
g_slice_free (struct to_be_sent, tbs);
}
......@@ -44,6 +44,7 @@
#include "tcp-bsd.h"
#include "agent-priv.h"
#include "socket-priv.h"
#include <string.h>
#include <errno.h>
......@@ -62,11 +63,6 @@ typedef struct {
gboolean reliable;
} TcpPriv;
struct to_be_sent {
guint8 *buf;
gsize length;
};
#define MAX_QUEUE_LENGTH 20
static void socket_close (NiceSocket *sock);
......@@ -78,10 +74,6 @@ static gint socket_send_messages_reliable (NiceSocket *sock,
const NiceAddress *to, const NiceOutputMessage *messages, guint n_messages);
static gboolean socket_is_reliable (NiceSocket *sock);
static void add_to_be_sent (NiceSocket *sock, const NiceOutputMessage *message,
gsize message_offset, gsize message_len, gboolean head);
static void free_to_be_sent (struct to_be_sent *tbs);
static gboolean socket_send_more (GSocket *gsocket, GIOCondition condition,
gpointer data);
......@@ -201,8 +193,8 @@ socket_close (NiceSocket *sock)
g_source_destroy (priv->io_source);
g_source_unref (priv->io_source);
}
g_queue_foreach (&priv->send_queue, (GFunc) free_to_be_sent, NULL);
g_queue_clear (&priv->send_queue);
nice_socket_free_send_queue (&priv->send_queue);
if (priv->context)
g_main_context_unref (priv->context);
......@@ -287,21 +279,27 @@ socket_send_message (NiceSocket *sock,
if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_FAILED)) {
/* Queue the message and send it later. */
add_to_be_sent (sock, message, 0, message_len, FALSE);
nice_socket_queue_send_with_callback (&priv->send_queue,
message, 0, message_len, FALSE, sock->fileno, &priv->io_source,
priv->context, (GSourceFunc) socket_send_more, sock);
ret = message_len;
}
g_error_free (gerr);