tcp-bsd.c 13 KB
Newer Older
1 2 3
/*
 * This file is part of the Nice GLib ICE library.
 *
4 5 6
 * (C) 2008-2009 Collabora Ltd.
 *  Contact: Youness Alaoui
 * (C) 2008-2009 Nokia Corporation. All rights reserved.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 *
 * The contents of this file are subject to the Mozilla Public License Version
 * 1.1 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS IS" basis,
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
 * for the specific language governing rights and limitations under the
 * License.
 *
 * The Original Code is the Nice GLib ICE library.
 *
 * The Initial Developers of the Original Code are Collabora Ltd and Nokia
 * Corporation. All Rights Reserved.
 *
 * Contributors:
24
 *   Youness Alaoui, Collabora Ltd.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
 *
 * Alternatively, the contents of this file may be used under the terms of the
 * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
 * case the provisions of LGPL are applicable instead of those above. If you
 * wish to allow use of your version of this file only under the terms of the
 * LGPL and not to allow others to use your version of this file under the
 * MPL, indicate your decision by deleting the provisions above and replace
 * them with the notice and other provisions required by the LGPL. If you do
 * not delete the provisions above, a recipient may use your version of this
 * file under either the MPL or the LGPL.
 */

/*
 * Implementation of TCP relay socket interface using TCP Berkeley sockets. (See
 * http://en.wikipedia.org/wiki/Berkeley_sockets.)
 */
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif

#include "tcp-bsd.h"
#include "agent-priv.h"

#include <string.h>
#include <errno.h>
#include <fcntl.h>

#ifndef G_OS_WIN32
#include <unistd.h>
#endif

typedef struct {
  NiceAddress server_addr;
  GQueue send_queue;
  GMainContext *context;
  GSource *io_source;
61
  gboolean error;
62 63 64
} TcpPriv;

struct to_be_sent {
65 66
  guint8 *buf;
  gsize length;
67
  gboolean can_drop;
68 69
};

70
#define MAX_QUEUE_LENGTH 20
71 72

static void socket_close (NiceSocket *sock);
73 74
static gint socket_recv_messages (NiceSocket *sock,
    NiceInputMessage *recv_messages, guint n_recv_messages);
75
static gint socket_send_messages (NiceSocket *sock, const NiceAddress *to,
76
    const NiceOutputMessage *messages, guint n_messages);
77 78 79
static gboolean socket_is_reliable (NiceSocket *sock);


80
static void add_to_be_sent (NiceSocket *sock, const NiceOutputMessage *message,
81
    gsize message_offset, gsize message_len, gboolean head);
82
static void free_to_be_sent (struct to_be_sent *tbs);
83
static gboolean socket_send_more (GSocket *gsocket, GIOCondition condition,
84 85 86
    gpointer data);

NiceSocket *
87
nice_tcp_bsd_socket_new (GMainContext *ctx, NiceAddress *addr)
88
{
89 90 91 92
  union {
    struct sockaddr_storage storage;
    struct sockaddr addr;
  } name;
93
  NiceSocket *sock;
94
  TcpPriv *priv;
95 96
  GSocket *gsock = NULL;
  GError *gerr = NULL;
97
  gboolean gret = FALSE;
98
  GSocketAddress *gaddr;
99

100 101 102
  if (addr == NULL) {
    /* We can't connect a tcp socket with no destination address */
    return NULL;
103 104
  }

105 106
  sock = g_slice_new0 (NiceSocket);

107
  nice_address_copy_to_sockaddr (addr, &name.addr);
108

109
  if (gsock == NULL) {
110
    if (name.storage.ss_family == AF_UNSPEC || name.storage.ss_family == AF_INET) {
Youness Alaoui's avatar
Youness Alaoui committed
111 112
      gsock = g_socket_new (G_SOCKET_FAMILY_IPV4, G_SOCKET_TYPE_STREAM,
          G_SOCKET_PROTOCOL_TCP, NULL);
113

114
      name.storage.ss_family = AF_INET;
115
#ifdef HAVE_SA_LEN
116
      name.storage.ss_len = sizeof (struct sockaddr_in);
117
#endif
118
    } else if (name.storage.ss_family == AF_INET6) {
Youness Alaoui's avatar
Youness Alaoui committed
119 120
      gsock = g_socket_new (G_SOCKET_FAMILY_IPV6, G_SOCKET_TYPE_STREAM,
          G_SOCKET_PROTOCOL_TCP, NULL);
121
      name.storage.ss_family = AF_INET6;
122
#ifdef HAVE_SA_LEN
123
      name.storage.ss_len = sizeof (struct sockaddr_in6);
124
#endif
125
    }
126 127
  }

Youness Alaoui's avatar
Youness Alaoui committed
128
  if (gsock == NULL) {
129 130
    g_slice_free (NiceSocket, sock);
    return NULL;
Youness Alaoui's avatar
Youness Alaoui committed
131
  }
132

133
  /* GSocket: All socket file descriptors are set to be close-on-exec. */
134 135
  g_socket_set_blocking (gsock, false);

136
  gaddr = g_socket_address_new_from_native (&name.addr, sizeof (name));
137 138 139 140

  if (gaddr != NULL) {
    gret = g_socket_connect (gsock, gaddr, NULL, &gerr);
    g_object_unref (gaddr);
141 142
  }

143 144 145 146 147 148 149 150 151
  if (gret == FALSE) {
    if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_PENDING) == FALSE) {
      g_socket_close (gsock, NULL);
      g_object_unref (gsock);
      g_slice_free (NiceSocket, sock);
      return NULL;
    }
    g_error_free(gerr);
  }
152

153 154
  gaddr = g_socket_get_local_address (gsock, NULL);
  if (gaddr == NULL ||
155
      !g_socket_address_to_native (gaddr, &name.addr, sizeof (name), NULL)) {
156
    g_slice_free (NiceSocket, sock);
Youness Alaoui's avatar
Youness Alaoui committed
157 158
    g_socket_close (gsock, NULL);
    g_object_unref (gsock);
159 160
    return NULL;
  }
161
  g_object_unref (gaddr);
162

163
  nice_address_set_from_sockaddr (&sock->addr, &name.addr);
Youness Alaoui's avatar
Youness Alaoui committed
164

165 166
  sock->priv = priv = g_slice_new0 (TcpPriv);

167 168
  if (ctx == NULL)
    ctx = g_main_context_default ();
169
  priv->context = g_main_context_ref (ctx);
170
  priv->server_addr = *addr;
171
  priv->error = FALSE;
172

173
  sock->fileno = gsock;
174
  sock->send_messages = socket_send_messages;
175
  sock->recv_messages = socket_recv_messages;
176 177 178 179 180 181 182 183 184 185 186 187
  sock->is_reliable = socket_is_reliable;
  sock->close = socket_close;

  return sock;
}


static void
socket_close (NiceSocket *sock)
{
  TcpPriv *priv = sock->priv;

Youness Alaoui's avatar
Youness Alaoui committed
188 189 190
  if (sock->fileno) {
    g_socket_close (sock->fileno, NULL);
    g_object_unref (sock->fileno);
191 192
    sock->fileno = NULL;
  }
193 194 195 196 197 198 199
  if (priv->io_source) {
    g_source_destroy (priv->io_source);
    g_source_unref (priv->io_source);
  }
  g_queue_foreach (&priv->send_queue, (GFunc) free_to_be_sent, NULL);
  g_queue_clear (&priv->send_queue);

200 201 202
  if (priv->context)
    g_main_context_unref (priv->context);

203 204 205 206
  g_slice_free(TcpPriv, sock->priv);
}

static gint
207 208
socket_recv_messages (NiceSocket *sock,
    NiceInputMessage *recv_messages, guint n_recv_messages)
209 210
{
  TcpPriv *priv = sock->priv;
211
  guint i;
212

213 214 215 216
  /* Don't try to access the socket if it had an error */
  if (priv->error)
    return -1;

217 218 219 220
  for (i = 0; i < n_recv_messages; i++) {
    gint flags = G_SOCKET_MSG_NONE;
    GError *gerr = NULL;
    gssize len;
221

222 223 224 225 226 227 228 229 230 231 232 233
    len = g_socket_receive_message (sock->fileno, NULL,
        recv_messages[i].buffers, recv_messages[i].n_buffers,
        NULL, NULL, &flags, NULL, &gerr);

    recv_messages[i].length = MAX (len, 0);

    /* recv returns 0 when the peer performed a shutdown.. we must return -1
     * here so that the agent destroys the g_source */
    if (len == 0) {
      priv->error = TRUE;
      break;
    }
234

235 236 237
    if (len < 0) {
      if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
        len = 0;
238

239 240 241 242 243 244 245 246 247
      g_error_free (gerr);
      return len;
    }

    if (recv_messages[i].from)
      *recv_messages[i].from = priv->server_addr;

    if (len <= 0)
      break;
248 249
  }

250 251 252 253 254
  /* Was there an error processing the first message? */
  if (priv->error && i == 0)
    return -1;

  return i;
255 256
}

257 258
static gssize
socket_send_message (NiceSocket *sock, const NiceOutputMessage *message)
259 260
{
  TcpPriv *priv = sock->priv;
261
  gssize ret;
262
  GError *gerr = NULL;
263
  gsize message_len;
264

265
  /* Don't try to access the socket if it had an error, otherwise we risk a
266
   * crash with SIGPIPE (Broken pipe) */
267 268 269
  if (priv->error)
    return -1;

270 271
  message_len = output_message_get_size (message);

272
  /* First try to send the data, don't send it later if it can be sent now
273
   * this way we avoid allocating memory on every send */
274
  if (g_queue_is_empty (&priv->send_queue)) {
275 276 277
    ret = g_socket_send_message (sock->fileno, NULL, message->buffers,
        message->n_buffers, NULL, 0, G_SOCKET_MSG_NONE, NULL, &gerr);

278
    if (ret < 0) {
279 280 281
      if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
          g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_FAILED)) {
        /* Queue the message and send it later. */
282 283
        add_to_be_sent (sock, message, 0, message_len, FALSE);
        ret = message_len;
284
      }
285 286

      g_error_free (gerr);
287
    } else if ((gsize) ret < message_len) {
288
      /* Partial send. */
289 290
      add_to_be_sent (sock, message, ret, message_len, TRUE);
      ret = message_len;
291 292
    }
  } else {
293 294 295
    /* FIXME: This dropping will break http/socks5/etc
     * We probably need a way to the upper layer to control reliability
     */
296 297 298
    /* If the queue is too long, drop whatever packets we can. */
    if (g_queue_get_length (&priv->send_queue) >= MAX_QUEUE_LENGTH) {
      guint peek_idx = 0;
299
      struct to_be_sent *tbs = NULL;
300

301 302 303
      while ((tbs = g_queue_peek_nth (&priv->send_queue, peek_idx)) != NULL) {
        if (tbs->can_drop) {
          tbs = g_queue_pop_nth (&priv->send_queue, peek_idx);
304
          free_to_be_sent (tbs);
305 306 307 308 309 310
          break;
        } else {
          peek_idx++;
        }
      }
    }
311 312

    /* Queue the message and send it later. */
313 314
    add_to_be_sent (sock, message, 0, message_len, FALSE);
    ret = message_len;
315 316
  }

317 318 319 320 321 322 323
  return ret;
}

/* Data sent to this function must be a single entity because buffers can be
 * dropped if the bandwidth isn't fast enough. So do not send a message in
 * multiple chunks. */
static gint
324 325
socket_send_messages (NiceSocket *sock, const NiceAddress *to,
    const NiceOutputMessage *messages, guint n_messages)
326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344
{
  guint i;

  for (i = 0; i < n_messages; i++) {
    const NiceOutputMessage *message = &messages[i];
    gssize len;

    len = socket_send_message (sock, message);

    if (len < 0) {
      /* Error. */
      return len;
    } else if (len == 0) {
      /* EWOULDBLOCK. */
      break;
    }
  }

  return i;
345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362
}

static gboolean
socket_is_reliable (NiceSocket *sock)
{
  return TRUE;
}


/*
 * Returns:
 * -1 = error
 * 0 = have more to send
 * 1 = sent everything
 */

static gboolean
socket_send_more (
363
  GSocket *gsocket,
364 365 366 367 368 369
  GIOCondition condition,
  gpointer data)
{
  NiceSocket *sock = (NiceSocket *) data;
  TcpPriv *priv = sock->priv;
  struct to_be_sent *tbs = NULL;
370
  GError *gerr = NULL;
371

372 373 374 375 376 377 378 379
  agent_lock ();

  if (g_source_is_destroyed (g_main_current_source ())) {
    nice_debug ("Source was destroyed. "
        "Avoided race condition in tcp-bsd.c:socket_send_more");
    agent_unlock ();
    return FALSE;
  }
380

381
  while ((tbs = g_queue_pop_head (&priv->send_queue)) != NULL) {
382 383
    int ret;

384 385 386
    if(condition & G_IO_HUP) {
      /* connection hangs up */
      ret = -1;
Youness Alaoui's avatar
Youness Alaoui committed
387
    } else {
388 389 390
      GOutputVector local_bufs = { tbs->buf, tbs->length };
      ret = g_socket_send_message (sock->fileno, NULL, &local_bufs, 1, NULL, 0,
          G_SOCKET_MSG_NONE, NULL, &gerr);
Youness Alaoui's avatar
Youness Alaoui committed
391
    }
392 393

    if (ret < 0) {
394
      if (gerr != NULL &&
Youness Alaoui's avatar
Youness Alaoui committed
395
          g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
396
        GOutputVector local_buf = { tbs->buf, tbs->length };
397
        NiceOutputMessage local_message = {&local_buf, 1};
398

399
        add_to_be_sent (sock, &local_message, 0, local_buf.size, TRUE);
400
        free_to_be_sent (tbs);
Youness Alaoui's avatar
Youness Alaoui committed
401
        g_error_free (gerr);
402 403
        break;
      }
Youness Alaoui's avatar
Youness Alaoui committed
404
      g_error_free (gerr);
405
    } else if (ret < (int) tbs->length) {
406
      GOutputVector local_buf = { tbs->buf + ret, tbs->length - ret };
407
      NiceOutputMessage local_message = {&local_buf, 1};
408

409
      add_to_be_sent (sock, &local_message, 0, local_buf.size, TRUE);
410
      free_to_be_sent (tbs);
411 412 413
      break;
    }

414
    free_to_be_sent (tbs);
415 416 417 418 419 420 421
  }

  if (g_queue_is_empty (&priv->send_queue)) {
    g_source_destroy (priv->io_source);
    g_source_unref (priv->io_source);
    priv->io_source = NULL;

422
    agent_unlock ();
423 424 425
    return FALSE;
  }

426
  agent_unlock ();
427 428 429 430
  return TRUE;
}


431
/* Queue data starting at byte offset @message_offset from @message’s
432 433 434
 * buffers.
 *
 * Returns the message's length */
435
static void
436
add_to_be_sent (NiceSocket *sock, const NiceOutputMessage *message,
437
    gsize message_offset, gsize message_len, gboolean head)
438 439
{
  TcpPriv *priv = sock->priv;
440 441 442
  struct to_be_sent *tbs;
  guint j;
  gsize offset = 0;
443

444
  if (message_offset >= message_len)
445 446
    return;

447
  tbs = g_slice_new0 (struct to_be_sent);
448 449
  tbs->length = message_len - message_offset;
  tbs->buf = g_malloc (tbs->length);
450
  tbs->can_drop = !head;
451

452 453 454 455 456
  if (head)
    g_queue_push_head (&priv->send_queue, tbs);
  else
    g_queue_push_tail (&priv->send_queue, tbs);

457 458
  if (priv->io_source == NULL) {
    priv->io_source = g_socket_create_source(sock->fileno, G_IO_OUT, NULL);
459 460 461 462
    g_source_set_callback (priv->io_source, (GSourceFunc) socket_send_more,
        sock, NULL);
    g_source_attach (priv->io_source, priv->context);
  }
463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482

  /* Move the data into the buffer. */
  for (j = 0;
       (message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
       (message->n_buffers < 0 && message->buffers[j].buffer != NULL);
       j++) {
    const GOutputVector *buffer = &message->buffers[j];
    gsize len;

    /* Skip this buffer if it’s within @message_offset. */
    if (buffer->size <= message_offset) {
      message_offset -= buffer->size;
      continue;
    }

    len = MIN (tbs->length - offset, buffer->size - message_offset);
    memcpy (tbs->buf + offset, (guint8 *) buffer->buffer + message_offset, len);
    offset += len;
    message_offset -= len;
  }
483 484 485 486 487 488 489 490 491 492 493
}



static void
free_to_be_sent (struct to_be_sent *tbs)
{
  g_free (tbs->buf);
  g_slice_free (struct to_be_sent, tbs);
}