tcp-bsd.c 12.8 KB
Newer Older
1 2 3
/*
 * This file is part of the Nice GLib ICE library.
 *
4 5 6
 * (C) 2008-2009 Collabora Ltd.
 *  Contact: Youness Alaoui
 * (C) 2008-2009 Nokia Corporation. All rights reserved.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 *
 * The contents of this file are subject to the Mozilla Public License Version
 * 1.1 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS IS" basis,
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
 * for the specific language governing rights and limitations under the
 * License.
 *
 * The Original Code is the Nice GLib ICE library.
 *
 * The Initial Developers of the Original Code are Collabora Ltd and Nokia
 * Corporation. All Rights Reserved.
 *
 * Contributors:
24
 *   Youness Alaoui, Collabora Ltd.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
 *
 * Alternatively, the contents of this file may be used under the terms of the
 * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
 * case the provisions of LGPL are applicable instead of those above. If you
 * wish to allow use of your version of this file only under the terms of the
 * LGPL and not to allow others to use your version of this file under the
 * MPL, indicate your decision by deleting the provisions above and replace
 * them with the notice and other provisions required by the LGPL. If you do
 * not delete the provisions above, a recipient may use your version of this
 * file under either the MPL or the LGPL.
 */

/*
 * Implementation of TCP relay socket interface using TCP Berkeley sockets. (See
 * http://en.wikipedia.org/wiki/Berkeley_sockets.)
 */
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif

#include "tcp-bsd.h"
#include "agent-priv.h"

#include <string.h>
#include <errno.h>
#include <fcntl.h>

#ifndef G_OS_WIN32
#include <unistd.h>
#endif

typedef struct {
  NiceAddress server_addr;
  GQueue send_queue;
  GMainContext *context;
  GSource *io_source;
61
  gboolean error;
62 63 64
} TcpPriv;

struct to_be_sent {
65 66
  guint8 *buf;
  gsize length;
67
  gboolean can_drop;
68 69
};

70
#define MAX_QUEUE_LENGTH 20
71 72

static void socket_close (NiceSocket *sock);
73 74
static gint socket_recv_messages (NiceSocket *sock,
    NiceInputMessage *recv_messages, guint n_recv_messages);
75 76
static gint socket_send_messages (NiceSocket *sock,
    const NiceOutputMessage *messages, guint n_messages);
77 78 79
static gboolean socket_is_reliable (NiceSocket *sock);


80 81
static void add_to_be_sent (NiceSocket *sock, const NiceOutputMessage *message,
    gsize message_offset, gboolean head);
82
static void free_to_be_sent (struct to_be_sent *tbs);
83
static gboolean socket_send_more (GSocket *gsocket, GIOCondition condition,
84 85 86
    gpointer data);

NiceSocket *
87
nice_tcp_bsd_socket_new (GMainContext *ctx, NiceAddress *addr)
88
{
89 90 91 92
  union {
    struct sockaddr_storage storage;
    struct sockaddr addr;
  } name;
Olivier Crête's avatar
Olivier Crête committed
93
  NiceSocket *sock;
94
  TcpPriv *priv;
Livio Madaro's avatar
Livio Madaro committed
95 96
  GSocket *gsock = NULL;
  GError *gerr = NULL;
97
  gboolean gret = FALSE;
Livio Madaro's avatar
Livio Madaro committed
98
  GSocketAddress *gaddr;
99

100 101 102
  if (addr == NULL) {
    /* We can't connect a tcp socket with no destination address */
    return NULL;
103 104
  }

Olivier Crête's avatar
Olivier Crête committed
105 106
  sock = g_slice_new0 (NiceSocket);

107
  nice_address_copy_to_sockaddr (addr, &name.addr);
108

Livio Madaro's avatar
Livio Madaro committed
109
  if (gsock == NULL) {
110
    if (name.storage.ss_family == AF_UNSPEC || name.storage.ss_family == AF_INET) {
Youness Alaoui's avatar
Youness Alaoui committed
111 112
      gsock = g_socket_new (G_SOCKET_FAMILY_IPV4, G_SOCKET_TYPE_STREAM,
          G_SOCKET_PROTOCOL_TCP, NULL);
Livio Madaro's avatar
Livio Madaro committed
113

114
      name.storage.ss_family = AF_INET;
115
#ifdef HAVE_SA_LEN
116
      name.storage.ss_len = sizeof (struct sockaddr_in);
117
#endif
118
    } else if (name.storage.ss_family == AF_INET6) {
Youness Alaoui's avatar
Youness Alaoui committed
119 120
      gsock = g_socket_new (G_SOCKET_FAMILY_IPV6, G_SOCKET_TYPE_STREAM,
          G_SOCKET_PROTOCOL_TCP, NULL);
121
      name.storage.ss_family = AF_INET6;
122
#ifdef HAVE_SA_LEN
123
      name.storage.ss_len = sizeof (struct sockaddr_in6);
124
#endif
125
    }
126 127
  }

Youness Alaoui's avatar
Youness Alaoui committed
128
  if (gsock == NULL) {
129 130
    g_slice_free (NiceSocket, sock);
    return NULL;
Youness Alaoui's avatar
Youness Alaoui committed
131
  }
132

Livio Madaro's avatar
Livio Madaro committed
133
  /* GSocket: All socket file descriptors are set to be close-on-exec. */
134 135
  g_socket_set_blocking (gsock, false);

136
  gaddr = g_socket_address_new_from_native (&name.addr, sizeof (name));
137 138 139 140

  if (gaddr != NULL) {
    gret = g_socket_connect (gsock, gaddr, NULL, &gerr);
    g_object_unref (gaddr);
141 142
  }

143 144 145 146 147 148 149 150 151
  if (gret == FALSE) {
    if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_PENDING) == FALSE) {
      g_socket_close (gsock, NULL);
      g_object_unref (gsock);
      g_slice_free (NiceSocket, sock);
      return NULL;
    }
    g_error_free(gerr);
  }
Livio Madaro's avatar
Livio Madaro committed
152

153 154
  gaddr = g_socket_get_local_address (gsock, NULL);
  if (gaddr == NULL ||
155
      !g_socket_address_to_native (gaddr, &name.addr, sizeof (name), NULL)) {
156
    g_slice_free (NiceSocket, sock);
Youness Alaoui's avatar
Youness Alaoui committed
157 158
    g_socket_close (gsock, NULL);
    g_object_unref (gsock);
159 160
    return NULL;
  }
161
  g_object_unref (gaddr);
162

163
  nice_address_set_from_sockaddr (&sock->addr, &name.addr);
Youness Alaoui's avatar
Youness Alaoui committed
164

165 166
  sock->priv = priv = g_slice_new0 (TcpPriv);

167
  priv->context = g_main_context_ref (ctx);
168
  priv->server_addr = *addr;
169
  priv->error = FALSE;
170

Livio Madaro's avatar
Livio Madaro committed
171
  sock->fileno = gsock;
172
  sock->send_messages = socket_send_messages;
173
  sock->recv_messages = socket_recv_messages;
174 175 176 177 178 179 180 181 182 183 184 185
  sock->is_reliable = socket_is_reliable;
  sock->close = socket_close;

  return sock;
}


static void
socket_close (NiceSocket *sock)
{
  TcpPriv *priv = sock->priv;

Youness Alaoui's avatar
Youness Alaoui committed
186 187 188
  if (sock->fileno) {
    g_socket_close (sock->fileno, NULL);
    g_object_unref (sock->fileno);
Livio Madaro's avatar
Livio Madaro committed
189 190
    sock->fileno = NULL;
  }
191 192 193 194 195 196 197
  if (priv->io_source) {
    g_source_destroy (priv->io_source);
    g_source_unref (priv->io_source);
  }
  g_queue_foreach (&priv->send_queue, (GFunc) free_to_be_sent, NULL);
  g_queue_clear (&priv->send_queue);

198 199 200
  if (priv->context)
    g_main_context_unref (priv->context);

201 202 203 204
  g_slice_free(TcpPriv, sock->priv);
}

static gint
205 206
socket_recv_messages (NiceSocket *sock,
    NiceInputMessage *recv_messages, guint n_recv_messages)
207 208
{
  TcpPriv *priv = sock->priv;
209
  guint i;
210

211 212 213 214
  /* Don't try to access the socket if it had an error */
  if (priv->error)
    return -1;

215 216 217 218
  for (i = 0; i < n_recv_messages; i++) {
    gint flags = G_SOCKET_MSG_NONE;
    GError *gerr = NULL;
    gssize len;
219

220 221 222 223 224 225 226 227 228 229 230 231
    len = g_socket_receive_message (sock->fileno, NULL,
        recv_messages[i].buffers, recv_messages[i].n_buffers,
        NULL, NULL, &flags, NULL, &gerr);

    recv_messages[i].length = MAX (len, 0);

    /* recv returns 0 when the peer performed a shutdown.. we must return -1
     * here so that the agent destroys the g_source */
    if (len == 0) {
      priv->error = TRUE;
      break;
    }
232

233 234 235
    if (len < 0) {
      if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
        len = 0;
Livio Madaro's avatar
Livio Madaro committed
236

237 238 239 240 241 242 243 244 245
      g_error_free (gerr);
      return len;
    }

    if (recv_messages[i].from)
      *recv_messages[i].from = priv->server_addr;

    if (len <= 0)
      break;
246 247
  }

248 249 250 251 252
  /* Was there an error processing the first message? */
  if (priv->error && i == 0)
    return -1;

  return i;
253 254
}

255 256
static gssize
socket_send_message (NiceSocket *sock, const NiceOutputMessage *message)
257 258
{
  TcpPriv *priv = sock->priv;
259
  gssize ret;
Livio Madaro's avatar
Livio Madaro committed
260
  GError *gerr = NULL;
261

262
  /* Don't try to access the socket if it had an error, otherwise we risk a
263
   * crash with SIGPIPE (Broken pipe) */
264 265 266
  if (priv->error)
    return -1;

267
  /* First try to send the data, don't send it later if it can be sent now
268
   * this way we avoid allocating memory on every send */
269
  if (g_queue_is_empty (&priv->send_queue)) {
270 271 272
    ret = g_socket_send_message (sock->fileno, NULL, message->buffers,
        message->n_buffers, NULL, 0, G_SOCKET_MSG_NONE, NULL, &gerr);

273
    if (ret < 0) {
274 275 276 277 278
      if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
          g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_FAILED)) {
        /* Queue the message and send it later. */
        add_to_be_sent (sock, message, 0, FALSE);
        ret = message->length;
279
      }
280 281 282 283 284 285

      g_error_free (gerr);
    } else if ((gsize) ret < message->length) {
      /* Partial send. */
      add_to_be_sent (sock, message, ret, TRUE);
      ret = message->length;
286 287
    }
  } else {
288 289 290
    /* FIXME: This dropping will break http/socks5/etc
     * We probably need a way to the upper layer to control reliability
     */
291 292 293
    /* If the queue is too long, drop whatever packets we can. */
    if (g_queue_get_length (&priv->send_queue) >= MAX_QUEUE_LENGTH) {
      guint peek_idx = 0;
294
      struct to_be_sent *tbs = NULL;
295

296 297 298
      while ((tbs = g_queue_peek_nth (&priv->send_queue, peek_idx)) != NULL) {
        if (tbs->can_drop) {
          tbs = g_queue_pop_nth (&priv->send_queue, peek_idx);
299
          free_to_be_sent (tbs);
300 301 302 303 304 305
          break;
        } else {
          peek_idx++;
        }
      }
    }
306 307 308 309

    /* Queue the message and send it later. */
    add_to_be_sent (sock, message, 0, FALSE);
    ret = message->length;
310 311
  }

312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
  return ret;
}

/* Data sent to this function must be a single entity because buffers can be
 * dropped if the bandwidth isn't fast enough. So do not send a message in
 * multiple chunks. */
static gint
socket_send_messages (NiceSocket *sock, const NiceOutputMessage *messages,
    guint n_messages)
{
  guint i;

  for (i = 0; i < n_messages; i++) {
    const NiceOutputMessage *message = &messages[i];
    gssize len;

    len = socket_send_message (sock, message);

    if (len < 0) {
      /* Error. */
      return len;
    } else if (len == 0) {
      /* EWOULDBLOCK. */
      break;
    }
  }

  return i;
340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357
}

static gboolean
socket_is_reliable (NiceSocket *sock)
{
  return TRUE;
}


/*
 * Returns:
 * -1 = error
 * 0 = have more to send
 * 1 = sent everything
 */

static gboolean
socket_send_more (
358
  GSocket *gsocket,
359 360 361 362 363 364
  GIOCondition condition,
  gpointer data)
{
  NiceSocket *sock = (NiceSocket *) data;
  TcpPriv *priv = sock->priv;
  struct to_be_sent *tbs = NULL;
Livio Madaro's avatar
Livio Madaro committed
365
  GError *gerr = NULL;
366

367 368 369 370 371 372 373 374
  agent_lock ();

  if (g_source_is_destroyed (g_main_current_source ())) {
    nice_debug ("Source was destroyed. "
        "Avoided race condition in tcp-bsd.c:socket_send_more");
    agent_unlock ();
    return FALSE;
  }
375

376
  while ((tbs = g_queue_pop_head (&priv->send_queue)) != NULL) {
377 378
    int ret;

Livio Madaro's avatar
Livio Madaro committed
379 380 381
    if(condition & G_IO_HUP) {
      /* connection hangs up */
      ret = -1;
Youness Alaoui's avatar
Youness Alaoui committed
382
    } else {
383 384 385
      GOutputVector local_bufs = { tbs->buf, tbs->length };
      ret = g_socket_send_message (sock->fileno, NULL, &local_bufs, 1, NULL, 0,
          G_SOCKET_MSG_NONE, NULL, &gerr);
Youness Alaoui's avatar
Youness Alaoui committed
386
    }
387 388

    if (ret < 0) {
389
      if (gerr != NULL &&
Youness Alaoui's avatar
Youness Alaoui committed
390
          g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
391 392 393 394 395 396 397
        GOutputVector local_buf = { tbs->buf, tbs->length };
        NiceOutputMessage local_message = {
          &local_buf, 1, NULL, local_buf.size
        };

        add_to_be_sent (sock, &local_message, 0, TRUE);
        free_to_be_sent (tbs);
Youness Alaoui's avatar
Youness Alaoui committed
398
        g_error_free (gerr);
399 400
        break;
      }
Youness Alaoui's avatar
Youness Alaoui committed
401
      g_error_free (gerr);
402
    } else if (ret < (int) tbs->length) {
403 404 405 406 407 408 409
      GOutputVector local_buf = { tbs->buf + ret, tbs->length - ret };
      NiceOutputMessage local_message = {
        &local_buf, 1, NULL, local_buf.size
      };

      add_to_be_sent (sock, &local_message, 0, TRUE);
      free_to_be_sent (tbs);
410 411 412
      break;
    }

413
    free_to_be_sent (tbs);
414 415 416 417 418 419 420
  }

  if (g_queue_is_empty (&priv->send_queue)) {
    g_source_destroy (priv->io_source);
    g_source_unref (priv->io_source);
    priv->io_source = NULL;

421
    agent_unlock ();
422 423 424
    return FALSE;
  }

425
  agent_unlock ();
426 427 428 429
  return TRUE;
}


430 431
/* Queue data starting at byte offset @message_offset from @message’s
 * buffers. */
432
static void
433 434
add_to_be_sent (NiceSocket *sock, const NiceOutputMessage *message,
    gsize message_offset, gboolean head)
435 436
{
  TcpPriv *priv = sock->priv;
437 438 439
  struct to_be_sent *tbs;
  guint j;
  gsize offset = 0;
440

441
  if (message_offset >= message->length)
442 443
    return;

444
  tbs = g_slice_new0 (struct to_be_sent);
445 446
  tbs->buf = g_malloc (message->length - message_offset);
  tbs->length = message->length - message_offset;
447
  tbs->can_drop = !head;
448

449 450 451 452 453
  if (head)
    g_queue_push_head (&priv->send_queue, tbs);
  else
    g_queue_push_tail (&priv->send_queue, tbs);

Livio Madaro's avatar
Livio Madaro committed
454 455
  if (priv->io_source == NULL) {
    priv->io_source = g_socket_create_source(sock->fileno, G_IO_OUT, NULL);
456 457 458 459
    g_source_set_callback (priv->io_source, (GSourceFunc) socket_send_more,
        sock, NULL);
    g_source_attach (priv->io_source, priv->context);
  }
460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479

  /* Move the data into the buffer. */
  for (j = 0;
       (message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
       (message->n_buffers < 0 && message->buffers[j].buffer != NULL);
       j++) {
    const GOutputVector *buffer = &message->buffers[j];
    gsize len;

    /* Skip this buffer if it’s within @message_offset. */
    if (buffer->size <= message_offset) {
      message_offset -= buffer->size;
      continue;
    }

    len = MIN (tbs->length - offset, buffer->size - message_offset);
    memcpy (tbs->buf + offset, (guint8 *) buffer->buffer + message_offset, len);
    offset += len;
    message_offset -= len;
  }
480 481 482 483 484 485 486 487 488 489 490
}



static void
free_to_be_sent (struct to_be_sent *tbs)
{
  g_free (tbs->buf);
  g_slice_free (struct to_be_sent, tbs);
}