/*
 * This file is part of the Nice GLib ICE library.
 *
 * (C) 2008-2009 Collabora Ltd.
 *  Contact: Youness Alaoui
 * (C) 2008-2009 Nokia Corporation. All rights reserved.
 *
 * The contents of this file are subject to the Mozilla Public License Version
 * 1.1 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * http://www.mozilla.org/MPL/
 *
 * Software distributed under the License is distributed on an "AS IS" basis,
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
 * for the specific language governing rights and limitations under the
 * License.
 *
 * The Original Code is the Nice GLib ICE library.
 *
 * The Initial Developers of the Original Code are Collabora Ltd and Nokia
 * Corporation. All Rights Reserved.
 *
 * Contributors:
 *   Youness Alaoui, Collabora Ltd.
 *
 * Alternatively, the contents of this file may be used under the terms of the
 * GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
 * case the provisions of LGPL are applicable instead of those above. If you
 * wish to allow use of your version of this file only under the terms of the
 * LGPL and not to allow others to use your version of this file under the
 * MPL, indicate your decision by deleting the provisions above and replace
 * them with the notice and other provisions required by the LGPL. If you do
 * not delete the provisions above, a recipient may use your version of this
 * file under either the MPL or the LGPL.
 */

/*
 * Implementation of TCP relay socket interface using TCP Berkeley sockets. (See
 * http://en.wikipedia.org/wiki/Berkeley_sockets.)
 */
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif

#include "tcp-bsd.h"
#include "agent-priv.h"

#include <string.h>
#include <errno.h>
#include <fcntl.h>

#ifndef G_OS_WIN32
#include <unistd.h>
#endif

typedef struct {
  NiceAddress server_addr;
  GQueue send_queue;
  GMainContext *context;
  GSource *io_source;
61
  gboolean error;
62
63
64
} TcpPriv;

struct to_be_sent {
65
66
  guint8 *buf;
  gsize length;
67
  gboolean can_drop;
68
69
};

70
#define MAX_QUEUE_LENGTH 20
71
72

static void socket_close (NiceSocket *sock);
73
74
static gint socket_recv_messages (NiceSocket *sock,
    NiceInputMessage *recv_messages, guint n_recv_messages);
75
76
static gint socket_send_messages (NiceSocket *sock,
    const NiceOutputMessage *messages, guint n_messages);
77
78
79
static gboolean socket_is_reliable (NiceSocket *sock);


80
81
static void add_to_be_sent (NiceSocket *sock, const NiceOutputMessage *message,
    gsize message_offset, gboolean head);
82
static void free_to_be_sent (struct to_be_sent *tbs);
83
static gboolean socket_send_more (GSocket *gsocket, GIOCondition condition,
84
85
86
    gpointer data);

NiceSocket *
87
nice_tcp_bsd_socket_new (GMainContext *ctx, NiceAddress *addr)
88
{
89
90
91
92
  union {
    struct sockaddr_storage storage;
    struct sockaddr addr;
  } name;
Olivier Crête's avatar
Olivier Crête committed
93
  NiceSocket *sock;
94
  TcpPriv *priv;
Livio Madaro's avatar
Livio Madaro committed
95
96
  GSocket *gsock = NULL;
  GError *gerr = NULL;
97
  gboolean gret = FALSE;
Livio Madaro's avatar
Livio Madaro committed
98
  GSocketAddress *gaddr;
99

100
101
102
  if (addr == NULL) {
    /* We can't connect a tcp socket with no destination address */
    return NULL;
103
104
  }

Olivier Crête's avatar
Olivier Crête committed
105
106
  sock = g_slice_new0 (NiceSocket);

107
  nice_address_copy_to_sockaddr (addr, &name.addr);
108

Livio Madaro's avatar
Livio Madaro committed
109
  if (gsock == NULL) {
110
    if (name.storage.ss_family == AF_UNSPEC || name.storage.ss_family == AF_INET) {
Youness Alaoui's avatar
Youness Alaoui committed
111
112
      gsock = g_socket_new (G_SOCKET_FAMILY_IPV4, G_SOCKET_TYPE_STREAM,
          G_SOCKET_PROTOCOL_TCP, NULL);
Livio Madaro's avatar
Livio Madaro committed
113

114
      name.storage.ss_family = AF_INET;
115
#ifdef HAVE_SA_LEN
116
      name.storage.ss_len = sizeof (struct sockaddr_in);
117
#endif
118
    } else if (name.storage.ss_family == AF_INET6) {
Youness Alaoui's avatar
Youness Alaoui committed
119
120
      gsock = g_socket_new (G_SOCKET_FAMILY_IPV6, G_SOCKET_TYPE_STREAM,
          G_SOCKET_PROTOCOL_TCP, NULL);
121
      name.storage.ss_family = AF_INET6;
122
#ifdef HAVE_SA_LEN
123
      name.storage.ss_len = sizeof (struct sockaddr_in6);
124
#endif
125
    }
126
127
  }

Youness Alaoui's avatar
Youness Alaoui committed
128
  if (gsock == NULL) {
129
130
    g_slice_free (NiceSocket, sock);
    return NULL;
Youness Alaoui's avatar
Youness Alaoui committed
131
  }
132

Livio Madaro's avatar
Livio Madaro committed
133
  /* GSocket: All socket file descriptors are set to be close-on-exec. */
134
135
  g_socket_set_blocking (gsock, false);

136
  gaddr = g_socket_address_new_from_native (&name.addr, sizeof (name));
137
138
139
140

  if (gaddr != NULL) {
    gret = g_socket_connect (gsock, gaddr, NULL, &gerr);
    g_object_unref (gaddr);
141
142
  }

143
144
145
146
147
148
149
150
151
  if (gret == FALSE) {
    if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_PENDING) == FALSE) {
      g_socket_close (gsock, NULL);
      g_object_unref (gsock);
      g_slice_free (NiceSocket, sock);
      return NULL;
    }
    g_error_free(gerr);
  }
Livio Madaro's avatar
Livio Madaro committed
152

153
154
  gaddr = g_socket_get_local_address (gsock, NULL);
  if (gaddr == NULL ||
155
      !g_socket_address_to_native (gaddr, &name.addr, sizeof (name), NULL)) {
156
    g_slice_free (NiceSocket, sock);
Youness Alaoui's avatar
Youness Alaoui committed
157
158
    g_socket_close (gsock, NULL);
    g_object_unref (gsock);
159
160
    return NULL;
  }
161
  g_object_unref (gaddr);
162

163
  nice_address_set_from_sockaddr (&sock->addr, &name.addr);
Youness Alaoui's avatar
Youness Alaoui committed
164

165
166
  sock->priv = priv = g_slice_new0 (TcpPriv);

167
  priv->context = g_main_context_ref (ctx);
168
  priv->server_addr = *addr;
169
  priv->error = FALSE;
170

Livio Madaro's avatar
Livio Madaro committed
171
  sock->fileno = gsock;
172
  sock->send_messages = socket_send_messages;
173
  sock->recv_messages = socket_recv_messages;
174
175
176
177
178
179
180
181
182
183
184
185
  sock->is_reliable = socket_is_reliable;
  sock->close = socket_close;

  return sock;
}


/*
 * NiceSocket::close implementation: close the GSocket, remove the write watch,
 * discard everything still queued for sending and free the private state.
 *
 * sock->fileno and sock->priv are reset to NULL so that a repeated call is a
 * no-op instead of touching freed memory. The NiceSocket itself is not freed
 * here.
 */
static void
socket_close (NiceSocket *sock)
{
  TcpPriv *priv = sock->priv;

  if (sock->fileno) {
    g_socket_close (sock->fileno, NULL);
    g_object_unref (sock->fileno);
    sock->fileno = NULL;
  }

  /* Already closed once: nothing left to release. */
  if (priv == NULL)
    return;

  if (priv->io_source) {
    g_source_destroy (priv->io_source);
    g_source_unref (priv->io_source);
    priv->io_source = NULL;
  }

  g_queue_foreach (&priv->send_queue, (GFunc) free_to_be_sent, NULL);
  g_queue_clear (&priv->send_queue);

  if (priv->context)
    g_main_context_unref (priv->context);

  g_slice_free (TcpPriv, priv);
  sock->priv = NULL;
}

static gint
205
206
socket_recv_messages (NiceSocket *sock,
    NiceInputMessage *recv_messages, guint n_recv_messages)
207
208
{
  TcpPriv *priv = sock->priv;
209
  guint i;
210

211
212
213
214
  /* Don't try to access the socket if it had an error */
  if (priv->error)
    return -1;

215
216
217
218
  for (i = 0; i < n_recv_messages; i++) {
    gint flags = G_SOCKET_MSG_NONE;
    GError *gerr = NULL;
    gssize len;
219

220
221
222
223
224
225
226
227
228
229
230
231
    len = g_socket_receive_message (sock->fileno, NULL,
        recv_messages[i].buffers, recv_messages[i].n_buffers,
        NULL, NULL, &flags, NULL, &gerr);

    recv_messages[i].length = MAX (len, 0);

    /* recv returns 0 when the peer performed a shutdown.. we must return -1
     * here so that the agent destroys the g_source */
    if (len == 0) {
      priv->error = TRUE;
      break;
    }
232

233
234
235
    if (len < 0) {
      if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
        len = 0;
Livio Madaro's avatar
Livio Madaro committed
236

237
238
239
240
241
242
243
244
245
      g_error_free (gerr);
      return len;
    }

    if (recv_messages[i].from)
      *recv_messages[i].from = priv->server_addr;

    if (len <= 0)
      break;
246
247
  }

248
249
250
251
252
  /* Was there an error processing the first message? */
  if (priv->error && i == 0)
    return -1;

  return i;
253
254
}

255
256
static gssize
socket_send_message (NiceSocket *sock, const NiceOutputMessage *message)
257
258
{
  TcpPriv *priv = sock->priv;
259
  gssize ret;
Livio Madaro's avatar
Livio Madaro committed
260
  GError *gerr = NULL;
261

262
  /* Don't try to access the socket if it had an error, otherwise we risk a
263
   * crash with SIGPIPE (Broken pipe) */
264
265
266
  if (priv->error)
    return -1;

267
  /* First try to send the data, don't send it later if it can be sent now
268
   * this way we avoid allocating memory on every send */
269
  if (g_queue_is_empty (&priv->send_queue)) {
270
271
272
    ret = g_socket_send_message (sock->fileno, NULL, message->buffers,
        message->n_buffers, NULL, 0, G_SOCKET_MSG_NONE, NULL, &gerr);

273
    if (ret < 0) {
274
275
276
277
278
      if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
          g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_FAILED)) {
        /* Queue the message and send it later. */
        add_to_be_sent (sock, message, 0, FALSE);
        ret = message->length;
279
      }
280
281
282
283
284
285

      g_error_free (gerr);
    } else if ((gsize) ret < message->length) {
      /* Partial send. */
      add_to_be_sent (sock, message, ret, TRUE);
      ret = message->length;
286
287
    }
  } else {
288
289
290
    /* FIXME: This dropping will break http/socks5/etc
     * We probably need a way to the upper layer to control reliability
     */
291
292
293
    /* If the queue is too long, drop whatever packets we can. */
    if (g_queue_get_length (&priv->send_queue) >= MAX_QUEUE_LENGTH) {
      guint peek_idx = 0;
294
      struct to_be_sent *tbs = NULL;
295

296
297
298
      while ((tbs = g_queue_peek_nth (&priv->send_queue, peek_idx)) != NULL) {
        if (tbs->can_drop) {
          tbs = g_queue_pop_nth (&priv->send_queue, peek_idx);
299
          free_to_be_sent (tbs);
300
301
302
303
304
305
          break;
        } else {
          peek_idx++;
        }
      }
    }
306
307
308
309

    /* Queue the message and send it later. */
    add_to_be_sent (sock, message, 0, FALSE);
    ret = message->length;
310
311
  }

312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
  return ret;
}

/*
 * NiceSocket::send_messages implementation.
 *
 * Data sent to this function must be a single entity because buffers can be
 * dropped if the bandwidth isn't fast enough. So do not send a message in
 * multiple chunks.
 *
 * Returns: the number of messages accepted (sent or queued), or the negative
 * value returned by socket_send_message() as soon as one message fails.
 */
static gint
socket_send_messages (NiceSocket *sock, const NiceOutputMessage *messages,
    guint n_messages)
{
  guint i;

  for (i = 0; i < n_messages; i++) {
    const NiceOutputMessage *message = &messages[i];
    gssize len;

    len = socket_send_message (sock, message);

    if (len < 0) {
      /* Error. */
      return len;
    } else if (len == 0) {
      /* EWOULDBLOCK. */
      break;
    }
  }

  return i;
}

/* NiceSocket::is_reliable implementation: this socket type always reports
 * itself as reliable, whatever @sock is. */
static gboolean
socket_is_reliable (NiceSocket *sock)
{
  (void) sock;

  return TRUE;
}


/*
 * Returns:
 * -1 = error
 * 0 = have more to send
 * 1 = sent everything
 */

static gboolean
socket_send_more (
358
  GSocket *gsocket,
359
360
361
362
363
364
  GIOCondition condition,
  gpointer data)
{
  NiceSocket *sock = (NiceSocket *) data;
  TcpPriv *priv = sock->priv;
  struct to_be_sent *tbs = NULL;
Livio Madaro's avatar
Livio Madaro committed
365
  GError *gerr = NULL;
366

367
368
369
370
371
372
373
374
  agent_lock ();

  if (g_source_is_destroyed (g_main_current_source ())) {
    nice_debug ("Source was destroyed. "
        "Avoided race condition in tcp-bsd.c:socket_send_more");
    agent_unlock ();
    return FALSE;
  }
375

376
  while ((tbs = g_queue_pop_head (&priv->send_queue)) != NULL) {
377
378
    int ret;

Livio Madaro's avatar
Livio Madaro committed
379
380
381
    if(condition & G_IO_HUP) {
      /* connection hangs up */
      ret = -1;
Youness Alaoui's avatar
Youness Alaoui committed
382
    } else {
383
384
385
      GOutputVector local_bufs = { tbs->buf, tbs->length };
      ret = g_socket_send_message (sock->fileno, NULL, &local_bufs, 1, NULL, 0,
          G_SOCKET_MSG_NONE, NULL, &gerr);
Youness Alaoui's avatar
Youness Alaoui committed
386
    }
387
388

    if (ret < 0) {
389
      if (gerr != NULL &&
Youness Alaoui's avatar
Youness Alaoui committed
390
          g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
391
392
393
394
395
396
397
        GOutputVector local_buf = { tbs->buf, tbs->length };
        NiceOutputMessage local_message = {
          &local_buf, 1, NULL, local_buf.size
        };

        add_to_be_sent (sock, &local_message, 0, TRUE);
        free_to_be_sent (tbs);
Youness Alaoui's avatar
Youness Alaoui committed
398
        g_error_free (gerr);
399
400
        break;
      }
Youness Alaoui's avatar
Youness Alaoui committed
401
      g_error_free (gerr);
402
    } else if (ret < (int) tbs->length) {
403
404
405
406
407
408
409
      GOutputVector local_buf = { tbs->buf + ret, tbs->length - ret };
      NiceOutputMessage local_message = {
        &local_buf, 1, NULL, local_buf.size
      };

      add_to_be_sent (sock, &local_message, 0, TRUE);
      free_to_be_sent (tbs);
410
411
412
      break;
    }

413
    free_to_be_sent (tbs);
414
415
416
417
418
419
420
  }

  if (g_queue_is_empty (&priv->send_queue)) {
    g_source_destroy (priv->io_source);
    g_source_unref (priv->io_source);
    priv->io_source = NULL;

421
    agent_unlock ();
422
423
424
    return FALSE;
  }

425
  agent_unlock ();
426
427
428
429
  return TRUE;
}


430
431
/* Queue data starting at byte offset @message_offset from @message’s
 * buffers. */
432
static void
433
434
add_to_be_sent (NiceSocket *sock, const NiceOutputMessage *message,
    gsize message_offset, gboolean head)
435
436
{
  TcpPriv *priv = sock->priv;
437
438
439
  struct to_be_sent *tbs;
  guint j;
  gsize offset = 0;
440

441
  if (message_offset >= message->length)
442
443
    return;

444
  tbs = g_slice_new0 (struct to_be_sent);
445
446
  tbs->buf = g_malloc (message->length - message_offset);
  tbs->length = message->length - message_offset;
447
  tbs->can_drop = !head;
448

449
450
451
452
453
  if (head)
    g_queue_push_head (&priv->send_queue, tbs);
  else
    g_queue_push_tail (&priv->send_queue, tbs);

Livio Madaro's avatar
Livio Madaro committed
454
455
  if (priv->io_source == NULL) {
    priv->io_source = g_socket_create_source(sock->fileno, G_IO_OUT, NULL);
456
457
458
459
    g_source_set_callback (priv->io_source, (GSourceFunc) socket_send_more,
        sock, NULL);
    g_source_attach (priv->io_source, priv->context);
  }
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479

  /* Move the data into the buffer. */
  for (j = 0;
       (message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
       (message->n_buffers < 0 && message->buffers[j].buffer != NULL);
       j++) {
    const GOutputVector *buffer = &message->buffers[j];
    gsize len;

    /* Skip this buffer if it’s within @message_offset. */
    if (buffer->size <= message_offset) {
      message_offset -= buffer->size;
      continue;
    }

    len = MIN (tbs->length - offset, buffer->size - message_offset);
    memcpy (tbs->buf + offset, (guint8 *) buffer->buffer + message_offset, len);
    offset += len;
    message_offset -= len;
  }
480
481
482
483
484
485
486
487
488
489
490
}



static void
free_to_be_sent (struct to_be_sent *tbs)
{
  g_free (tbs->buf);
  g_slice_free (struct to_be_sent, tbs);
}