Commit afb27f91 authored by Göran Jönsson's avatar Göran Jönsson Committed by Sebastian Dröge

rtsp-server: remove recursive behavior

Introduce a threadpool to send rtp and rtcp to avoid recursive behavior.
parent 4be7424d
Pipeline #17362 passed with stages
in 13 minutes and 50 seconds
......@@ -2594,7 +2594,6 @@ handle_setup_request (GstRTSPClient * client, GstRTSPContext * ctx)
g_object_ref (trans);
add_data_seq (client, ct->interleaved.min);
add_data_seq (client, ct->interleaved.max);
gst_rtsp_stream_set_watch_context (stream, priv->watch_context);
}
/* create and serialize the server transport */
......
......@@ -181,7 +181,7 @@ struct _GstRTSPStreamPrivate
GHashTable *ptmap;
GstRTSPPublishClockMode publish_clock_mode;
GMainContext *watch_context;
GThreadPool *send_pool;
};
#define DEFAULT_CONTROL NULL
......@@ -298,7 +298,7 @@ gst_rtsp_stream_init (GstRTSPStream * stream)
NULL, (GDestroyNotify) gst_caps_unref);
priv->ptmap = g_hash_table_new_full (NULL, NULL, NULL,
(GDestroyNotify) gst_caps_unref);
priv->watch_context = NULL;
priv->send_pool = NULL;
}
typedef struct _UdpClientAddrInfo UdpClientAddrInfo;
......@@ -334,6 +334,8 @@ gst_rtsp_stream_finalize (GObject * obj)
/* we really need to be unjoined now */
g_return_if_fail (priv->joined_bin == NULL);
if (priv->send_pool)
g_thread_pool_free (priv->send_pool, TRUE, TRUE);
if (priv->mcast_addr_v4)
gst_rtsp_address_free (priv->mcast_addr_v4);
if (priv->mcast_addr_v6)
......@@ -378,9 +380,6 @@ gst_rtsp_stream_finalize (GObject * obj)
g_hash_table_unref (priv->keys);
g_hash_table_destroy (priv->ptmap);
if (priv->watch_context)
g_main_context_unref (priv->watch_context);
G_OBJECT_CLASS (gst_rtsp_stream_parent_class)->finalize (obj);
}
......@@ -2569,6 +2568,34 @@ send_tcp_message (GstRTSPStream * stream, gint idx)
g_mutex_lock (&priv->lock);
}
static void
send_thread_main (gpointer data, gpointer user_data)
{
GstRTSPStream *stream = user_data;
GstRTSPStreamPrivate *priv = stream->priv;
gint idx;
gint i;
g_mutex_lock (&priv->lock);
do {
idx = -1;
/* iterate from 1 and down, so we prioritize RTCP over RTP */
for (i = 1; i >= 0; i--) {
if (priv->have_buffer[i]) {
/* send message */
idx = i;
break;
}
}
if (idx != -1 && priv->n_outstanding == 0)
send_tcp_message (stream, idx);
} while (idx != -1 && priv->n_outstanding == 0);
GST_DEBUG_OBJECT (stream, "send thread done");
g_mutex_unlock (&priv->lock);
}
static GstFlowReturn
handle_new_sample (GstAppSink * sink, gpointer user_data)
{
......@@ -2579,6 +2606,12 @@ handle_new_sample (GstAppSink * sink, gpointer user_data)
g_mutex_lock (&priv->lock);
if (priv->send_pool == NULL) {
GST_DEBUG_OBJECT (stream, "create thread pool");
priv->send_pool =
g_thread_pool_new (send_thread_main, user_data, 1, TRUE, NULL);
}
for (i = 0; i < 2; i++)
if (GST_ELEMENT_CAST (sink) == priv->appsink[i]) {
priv->have_buffer[i] = TRUE;
......@@ -4360,37 +4393,12 @@ mcast_error:
}
}
static gboolean
cb_send_tcp_message (GstRTSPStream * stream)
{
GstRTSPStreamPrivate *priv = stream->priv;
gint idx = -1;
gint i;
g_mutex_lock (&priv->lock);
/* iterate from 1 and down, so we prioritize RTCP over RTP */
for (i = 1; i >= 0; i--) {
if (priv->have_buffer[i]) {
/* send message */
idx = i;
break;
}
}
if (idx != -1)
send_tcp_message (stream, idx);
g_mutex_unlock (&priv->lock);
return G_SOURCE_REMOVE;
}
static void
on_message_sent (gpointer user_data)
{
GstRTSPStream *stream = user_data;
GstRTSPStreamPrivate *priv = stream->priv;
gint idx = -1;
GSource *idle_src;
GST_DEBUG_OBJECT (stream, "message send complete");
......@@ -4416,24 +4424,12 @@ on_message_sent (gpointer user_data)
}
if (idx != -1) {
/* When appsink running this callback we want to send as much as we can
* But when idle callback or watch callback is running we will first
* queue an idle probe. This so we prevent a loop to occur were callback
* is sending more data that then call the callback that sends more data
* and so on. If the loop occur then it will starve out handling off
* other events that are handled by watch's context. */
if (priv->watch_context && g_main_context_is_owner (priv->watch_context)) {
/* underlaying layer is running this callback */
idle_src = g_idle_source_new ();
g_source_set_callback (idle_src, (GSourceFunc) cb_send_tcp_message,
g_object_ref (stream), g_object_unref);
g_source_attach (idle_src, priv->watch_context);
g_source_unref (idle_src);
} else {
/* appsink is running this callback */
send_tcp_message (stream, idx);
}
gint dummy;
GST_DEBUG_OBJECT (stream, "start thread");
g_thread_pool_push (priv->send_pool, &dummy, NULL);
}
g_mutex_unlock (&priv->lock);
return;
......@@ -5802,28 +5798,3 @@ gst_rtsp_stream_get_ulpfec_percentage (GstRTSPStream * stream)
return res;
}
/**
* gst_rtsp_stream_set_watch_context:
* @stream: a #GstRTSPStream
* @context: a #GMainContext
*
* Sets stream private watch_context.
*
*/
void
gst_rtsp_stream_set_watch_context (GstRTSPStream * stream,
GMainContext * context)
{
GstRTSPStreamPrivate *priv;
priv = stream->priv;
g_mutex_lock (&priv->lock);
if (priv->watch_context != NULL) {
g_main_context_unref (priv->watch_context);
priv->watch_context = NULL;
}
if (context)
priv->watch_context = g_main_context_ref (context);
g_mutex_unlock (&priv->lock);
}
......@@ -354,9 +354,6 @@ void gst_rtsp_stream_set_ulpfec_percentage (GstRTSPStream *stream,
GST_RTSP_SERVER_API
guint gst_rtsp_stream_get_ulpfec_percentage (GstRTSPStream *stream);
GST_RTSP_SERVER_API
void gst_rtsp_stream_set_watch_context (GstRTSPStream * stream, GMainContext * context);
/**
* GstRTSPStreamTransportFilterFunc:
* @stream: a #GstRTSPStream object
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment