Commit 2b6370a8 authored by Philip Withnall's avatar Philip Withnall Committed by Olivier Crête
Browse files

agent: Support invoking I/O callbacks in non-default contexts

If the Component’s I/O receiver machinery is invoked from a thread which
can’t acquire the main context specified for the I/O callbacks, the
callbacks need to be queued as idle handlers in that main context.

This is needed for the case where blocking reads are being performed in
one thread, with their callbacks needing to be delivered in another

This introduces a new fine-grained lock to Component: io_mutex. This
protects accesses to Component->io_callback, Component->io_user_data and
Component->pending_io_callbacks. If being locked at the same time as the
main agent lock, it must always be locked afterwards, but the agent lock
does not *have* to be held in order to lock io_mutex.
parent 3be5b428
......@@ -1043,7 +1043,9 @@ pseudo_tcp_socket_readable (PseudoTcpSocket *sock, gpointer user_data)
g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *)&agent);
do {
if (component->io_callback != NULL)
gboolean has_io_callback = component_has_io_callback (component);
if (has_io_callback)
len = pseudo_tcp_socket_recv (sock, (gchar *) buf, sizeof(buf));
len = 0;
......@@ -2696,7 +2698,7 @@ nice_agent_g_source_cb (
nice_debug ("Agent %p: unable to recv from socket %p. Detaching",
ctx->agent, ctx->socket);
component_detach_socket_source (component, ctx->socket);
} else if (len > 0 && component->io_callback) {
} else if (len > 0) {
component_emit_io_callback (component, buf, len);
......@@ -54,6 +54,14 @@
#include "agent-priv.h"
static void
component_schedule_io_callback (Component *component);
static void
component_deschedule_io_callback (Component *component);
/* Must *not* take the agent lock, since it’s called from within
* component_set_io_callback(), which holds the Component’s I/O lock. */
static void
socket_source_detach (SocketSource *source)
......@@ -73,7 +81,6 @@ socket_source_free (SocketSource *source)
g_slice_free (SocketSource, source);
Component *
component_new (guint id, NiceAgent *agent, Stream *stream)
......@@ -87,6 +94,16 @@ component_new (guint id, NiceAgent *agent, Stream *stream)
component->agent = agent;
component->stream = stream;
g_mutex_init (&component->io_mutex);
g_queue_init (&component->pending_io_messages);
component->io_callback_id = 0;
/* Start off with a fresh main context and all I/O paused. This
* will be updated when nice_agent_attach_recv() or nice_agent_recv() are
* called. */
component_set_io_context (component, NULL);
component_set_io_callback (component, NULL, NULL);
return component;
......@@ -95,6 +112,7 @@ component_free (Component *cmp)
GSList *i;
GList *item;
IOCallbackData *data;
for (i = cmp->local_candidates; i; i = i->next) {
NiceCandidate *candidate = i->data;
......@@ -146,11 +164,18 @@ component_free (Component *cmp)
cmp->tcp = NULL;
while ((data = g_queue_pop_head (&cmp->pending_io_messages)) != NULL)
io_callback_data_free (data);
component_deschedule_io_callback (cmp);
if (cmp->ctx != NULL) {
g_main_context_unref (cmp->ctx);
cmp->ctx = NULL;
g_mutex_clear (&cmp->io_mutex);
g_slice_free (Component, cmp);
......@@ -425,6 +450,9 @@ component_detach_socket_source (Component *component, NiceSocket *socket)
* Detaches socket handles of @component from the main context. Leaves the
* sockets themselves untouched.
* Must *not* take the agent lock, since it’s called from within
* component_set_io_callback(), which holds the Component’s I/O lock.
component_detach_socket_sources (Component *component)
......@@ -450,6 +478,8 @@ void
component_set_io_callback (Component *component, NiceAgentRecvFunc func,
gpointer user_data, GMainContext *context)
g_mutex_lock (&component->io_mutex);
/* Reference the context early so we don’t accidentally free it below. */
if (context != NULL && func != NULL)
g_main_context_ref (context);
......@@ -468,7 +498,117 @@ component_set_io_callback (Component *component, NiceAgentRecvFunc func,
component->io_callback = func;
component->io_user_data = user_data;
component->ctx = context; /* referenced above */
component_schedule_io_callback (component);
} else {
component_deschedule_io_callback (component);
g_mutex_unlock (&component->io_mutex);
component_has_io_callback (Component *component)
gboolean has_io_callback;
g_mutex_lock (&component->io_mutex);
has_io_callback = (component->io_callback != NULL);
g_mutex_unlock (&component->io_mutex);
return has_io_callback;
IOCallbackData *
io_callback_data_new (const guint8 *buf, gsize buf_len)
IOCallbackData *data;
data = g_slice_new0 (IOCallbackData);
data->buf = g_memdup (buf, buf_len);
data->buf_len = buf_len;
data->offset = 0;
return data;
io_callback_data_free (IOCallbackData *data)
g_free (data->buf);
g_slice_free (IOCallbackData, data);
/* This is called with the global agent lock released. It does not take that
* lock, but does take the io_mutex. */
static gboolean
emit_io_callback_cb (gpointer user_data)
Component *component = user_data;
IOCallbackData *data;
NiceAgentRecvFunc io_callback;
gpointer io_user_data;
guint stream_id, component_id;
NiceAgent *agent;
agent = component->agent;
g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *) &agent);
stream_id = component->stream->id;
component_id = component->id;
g_mutex_lock (&component->io_mutex);
/* The members of Component are guaranteed not to have changed since this
* GSource was attached in component_emit_io_callback(). The Component’s agent
* and stream are immutable after construction, as are the stream and
* component IDs. The callback and its user data may have changed, but are
* guaranteed to be non-%NULL at the start as the idle source is removed when
* the callback is set to %NULL. They may become %NULL during the io_callback,
* so must be re-checked every loop iteration. The data buffer is copied into
* the #IOCallbackData closure.
* If the component is destroyed (which happens if the agent or stream are
* destroyed) between attaching the GSource and firing it, the GSource is
* detached in component_free() and this callback is never invoked. If the
* agent is destroyed during an io_callback, its weak pointer will be
* nullified. Similarly, the Component needs to be re-queried for after every
* iteration, just in case the client has removed the stream in the
* callback. */
while (TRUE) {
io_callback = component->io_callback;
io_user_data = component->io_user_data;
data = g_queue_peek_head (&component->pending_io_messages);
if (data == NULL || io_callback == NULL)
g_mutex_unlock (&component->io_mutex);
io_callback (agent, stream_id, component_id,
data->buf_len - data->offset, (gchar *) data->buf + data->offset,
/* Check for the user destroying things underneath our feet. */
if (agent == NULL ||
!agent_find_component (agent, stream_id, component_id,
NULL, &component)) {
nice_debug ("%s: Agent or component destroyed.", G_STRFUNC);
g_queue_pop_head (&component->pending_io_messages);
io_callback_data_free (data);
g_mutex_lock (&component->io_mutex);
component->io_callback_id = 0;
g_mutex_unlock (&component->io_mutex);
g_object_remove_weak_pointer (G_OBJECT (agent), (gpointer *) &agent);
/* This must be called with the agent lock *held*. */
......@@ -488,18 +628,80 @@ component_emit_io_callback (Component *component,
agent = component->agent;
stream_id = component->stream->id;
component_id = component->id;
g_mutex_lock (&component->io_mutex);
io_callback = component->io_callback;
io_user_data = component->io_user_data;
g_mutex_unlock (&component->io_mutex);
/* Allow this to be called with a NULL io_callback, since the caller can’t
* lock io_mutex to check beforehand. */
if (io_callback == NULL)
g_assert (NICE_IS_AGENT (agent));
g_assert (stream_id > 0);
g_assert (component_id > 0);
g_assert (io_callback != NULL);
agent_unlock ();
/* Only allocate a closure if the callback is being deferred to an idle
* handler. */
if (g_main_context_is_owner (component->ctx)) {
/* Thread owns the main context, so invoke the callback directly. */
agent_unlock ();
io_callback (agent, stream_id,
component_id, buf_len, (gchar *) buf, io_user_data);
agent_lock ();
} else {
IOCallbackData *data;
g_mutex_lock (&component->io_mutex);
/* Slow path: Current thread doesn’t own the Component’s context at the
* moment, so schedule the callback in an idle handler. */
data = io_callback_data_new (buf, buf_len);
g_queue_push_tail (&component->pending_io_messages,
data); /* transfer ownership */
nice_debug ("%s: **WARNING: SLOW PATH**", G_STRFUNC);
component_schedule_io_callback (component);
g_mutex_unlock (&component->io_mutex);
/* Note: Must be called with the io_mutex held. */
static void
component_schedule_io_callback (Component *component)
GSource *source;
io_callback (agent, stream_id, component_id,
buf_len, (gchar *) buf, io_user_data);
/* Already scheduled or nothing to schedule? */
if (component->io_callback_id != 0 ||
g_queue_is_empty (&component->pending_io_messages))
/* Add the idle callback. If nice_agent_attach_recv() is called with a
* NULL callback before this source is dispatched, the source will be
* destroyed, but any pending data will remain in
* component->pending_io_messages, ready to be picked up when a callback
* is re-attached, or if nice_agent_recv() is called. */
source = g_idle_source_new ();
g_source_set_priority (source, G_PRIORITY_DEFAULT);
g_source_set_callback (source, emit_io_callback_cb, component, NULL);
component->io_callback_id = g_source_attach (source, component->ctx);
g_source_unref (source);
/* Note: Must be called with the io_mutex held. */
static void
component_deschedule_io_callback (Component *component)
/* Already descheduled? */
if (component->io_callback_id == 0)
agent_lock ();
g_source_remove (component->io_callback_id);
component->io_callback_id = 0;
......@@ -105,6 +105,31 @@ typedef struct {
GSource *source;
} SocketSource;
/* A buffer of data which has been received and processed (so is guaranteed not
* to be a STUN packet, or to contain pseudo-TCP header bytes, for example), but
* which hasn’t yet been sent to the client in an I/O callback. This could be
* due to the main context not being run, or due to the I/O callback being
* detached.
* The @offset member gives the byte offset into @buf which has already been
* sent to the client. #IOCallbackData buffers remain in the
* #Component::pending_io_messages queue until all of their bytes have been sent
* to the client.
* @offset is guaranteed to be smaller than @buf_len. */
typedef struct {
guint8 *buf; /* owned */
gsize buf_len;
gsize offset;
} IOCallbackData;
IOCallbackData *
io_callback_data_new (const guint8 *buf, gsize buf_len);
io_callback_data_free (IOCallbackData *data);
struct _Component
NiceComponentType type;
......@@ -119,13 +144,27 @@ struct _Component
see ICE 11.1. "Sending Media" (ID-19) */
NiceCandidate *restart_candidate; /**< for storing active remote candidate during a restart */
GMutex io_mutex; /**< protects io_callback, io_user_data,
pending_io_messages and io_callback_id.
immutable: can be accessed without
holding the agent lock; if the agent
lock is to be taken, it must always be
taken before this one */
NiceAgentRecvFunc io_callback; /**< function called on io cb */
gpointer io_user_data; /**< data passed to the io function */
GMainContext *ctx; /**< context for GSources for this
component */
NiceAgent *agent; /* unowned */
Stream *stream; /* unowned */
GQueue pending_io_messages; /**< queue of packets which have been
received but not passed to the client
in an I/O callback or recv() call yet.
each element is an owned
IOCallbackData */
guint io_callback_id; /* GSource ID of the I/O callback */
NiceAgent *agent; /* unowned, immutable: can be accessed without holding the
* agent lock */
Stream *stream; /* unowned, immutable: can be accessed without holding the
* agent lock */
PseudoTcpSocket *tcp;
GSource* tcp_clock;
......@@ -177,6 +216,9 @@ void
component_emit_io_callback (Component *component,
const guint8 *buf, gsize buf_len);
component_has_io_callback (Component *component);
#endif /* _NICE_COMPONENT_H */
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment