Commit 0ac9f3f9 authored by Philip Withnall's avatar Philip Withnall Committed by Olivier Crête
Browse files

agent: Add GPollableInputStream support to NiceInputStream

parent 6d3a32a0
......@@ -60,13 +60,20 @@
# include "config.h"
#endif
#include <errno.h>
#include "inputstream.h"
#include "agent-priv.h"
static void streams_removed_cb (NiceAgent *agent, guint *stream_ids,
gpointer user_data);
static void nice_input_stream_init_pollable (
GPollableInputStreamInterface *iface);
G_DEFINE_TYPE (NiceInputStream, nice_input_stream, G_TYPE_INPUT_STREAM);
G_DEFINE_TYPE_WITH_CODE (NiceInputStream,
nice_input_stream, G_TYPE_INPUT_STREAM,
G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
nice_input_stream_init_pollable));
enum
{
......@@ -89,6 +96,11 @@ static void nice_input_stream_set_property (GObject *object, guint prop_id,
const GValue *value, GParamSpec *pspec);
static gssize nice_input_stream_read (GInputStream *stream, void *buffer,
gsize count, GCancellable *cancellable, GError **error);
static gboolean nice_input_stream_is_readable (GPollableInputStream *stream);
static gssize nice_input_stream_read_nonblocking (GPollableInputStream *stream,
void *buffer, gsize count, GError **error);
static GSource *nice_input_stream_create_source (GPollableInputStream *stream,
GCancellable *cancellable);
static void
nice_input_stream_class_init (NiceInputStreamClass *klass)
......@@ -239,6 +251,14 @@ nice_input_stream_init (NiceInputStream *stream)
g_weak_ref_init (&stream->priv->agent_ref, NULL);
}
static void
nice_input_stream_init_pollable (GPollableInputStreamInterface *iface)
{
iface->is_readable = nice_input_stream_is_readable;
iface->read_nonblocking = nice_input_stream_read_nonblocking;
iface->create_source = nice_input_stream_create_source;
}
/**
* nice_input_stream_new:
* @agent: A #NiceAgent
......@@ -279,8 +299,11 @@ nice_input_stream_read (GInputStream *stream, void *buffer, gsize count,
gssize len;
/* Closed streams are not readable. */
if (g_input_stream_is_closed (stream))
return 0;
if (g_input_stream_is_closed (stream)) {
g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
"Stream is closed.");
return -1;
}
/* Has the agent disappeared? */
agent = g_weak_ref_get (&priv->agent_ref);
......@@ -298,6 +321,133 @@ nice_input_stream_read (GInputStream *stream, void *buffer, gsize count,
return len;
}
static gboolean
nice_input_stream_is_readable (GPollableInputStream *stream)
{
NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
Component *component = NULL;
Stream *_stream = NULL;
gboolean retval = FALSE;
GSList *i;
NiceAgent *agent; /* owned */
/* Closed streams are not readable. */
if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
return FALSE;
/* Has the agent disappeared? */
agent = g_weak_ref_get (&priv->agent_ref);
if (agent == NULL)
return FALSE;
agent_lock ();
if (!agent_find_component (agent, priv->stream_id, priv->component_id,
&_stream, &component)) {
g_warning ("Could not find component %u in stream %u", priv->component_id,
priv->stream_id);
goto done;
}
/* If it’s a reliable agent, see if there’s any pending data in the pseudo-TCP
* buffer. */
if (agent->reliable && component->tcp != NULL &&
pseudo_tcp_socket_get_available_bytes (component->tcp) > 0) {
retval = TRUE;
goto done;
}
/* Check whether any of the component’s FDs are pollable. */
for (i = component->socket_sources; i != NULL; i = i->next) {
SocketSource *socket_source = i->data;
NiceSocket *socket = socket_source->socket;
if (g_socket_condition_check (socket->fileno, G_IO_IN) != 0) {
retval = TRUE;
break;
}
}
done:
agent_unlock ();
g_object_unref (agent);
return retval;
}
static gssize
nice_input_stream_read_nonblocking (GPollableInputStream *stream, void *buffer,
gsize count, GError **error)
{
NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
NiceAgent *agent; /* owned */
gssize len;
/* Closed streams are not readable. */
if (g_input_stream_is_closed (G_INPUT_STREAM (stream))) {
g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
"Stream is closed.");
return -1;
}
/* Has the agent disappeared? */
agent = g_weak_ref_get (&priv->agent_ref);
if (agent == NULL) {
g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
"Stream is closed due to the NiceAgent being finalised.");
return -1;
}
len = nice_agent_recv_nonblocking (agent, priv->stream_id,
priv->component_id, (guint8 *) buffer, count, NULL, error);
g_object_unref (agent);
return len;
}
static GSource *
nice_input_stream_create_source (GPollableInputStream *stream,
GCancellable *cancellable)
{
NiceInputStreamPrivate *priv = NICE_INPUT_STREAM (stream)->priv;
GSource *component_source = NULL;
Component *component = NULL;
Stream *_stream = NULL;
NiceAgent *agent; /* owned */
/* Closed streams cannot have sources. */
if (g_input_stream_is_closed (G_INPUT_STREAM (stream)))
return g_pollable_source_new (G_OBJECT (stream)); /* dummy */
/* Has the agent disappeared? */
agent = g_weak_ref_get (&priv->agent_ref);
if (agent == NULL)
return g_pollable_source_new (G_OBJECT (stream)); /* dummy */
agent_lock ();
/* Grab the socket for this component. */
if (!agent_find_component (agent, priv->stream_id, priv->component_id,
&_stream, &component)) {
g_warning ("Could not find component %u in stream %u", priv->component_id,
priv->stream_id);
component_source = g_pollable_source_new (G_OBJECT (stream)); /* dummy */
goto done;
}
component_source = component_source_new (component, G_OBJECT (stream),
G_IO_IN, cancellable);
done:
agent_unlock ();
g_object_unref (agent);
return component_source;
}
static void
streams_removed_cb (NiceAgent *agent, guint *stream_ids, gpointer user_data)
{
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment