Commit 4c7fad2d authored by Sebastian Dröge's avatar Sebastian Dröge 🍵
Browse files

rtspsrc: Use a mutex for protecting against concurrent send/receives

We currently send data to the RTSP connection from multiple threads:
whenever a command is to be handled and whenever RTCP is generated. This
can cause data corruption or worse if both happen at the same time.

As such, protect gst_rtsp_connection_send() and gst_rtsp_connection_receive()
calls with a mutex. While this means that we hold a mutex during the IO
operation, this is not actually a problem as the IO operation can be
interrupted (gst_rtsp_connection_flush()) at any time and is blocking by
itself anyway.
parent 0421fb04
......@@ -2052,29 +2052,35 @@ gst_rtspsrc_flush (GstRTSPSrc * src, gboolean flush, gboolean playing)
}
static GstRTSPResult
gst_rtspsrc_connection_send (GstRTSPSrc * src, GstRTSPConnection * conn,
gst_rtspsrc_connection_send (GstRTSPSrc * src, GstRTSPConnInfo * conninfo,
GstRTSPMessage * message, GTimeVal * timeout)
{
GstRTSPResult ret;
if (conn)
ret = gst_rtsp_connection_send (conn, message, timeout);
else
if (conninfo->connection) {
g_mutex_lock (&conninfo->send_lock);
ret = gst_rtsp_connection_send (conninfo->connection, message, timeout);
g_mutex_unlock (&conninfo->send_lock);
} else {
ret = GST_RTSP_ERROR;
}
return ret;
}
static GstRTSPResult
gst_rtspsrc_connection_receive (GstRTSPSrc * src, GstRTSPConnection * conn,
gst_rtspsrc_connection_receive (GstRTSPSrc * src, GstRTSPConnInfo * conninfo,
GstRTSPMessage * message, GTimeVal * timeout)
{
GstRTSPResult ret;
if (conn)
ret = gst_rtsp_connection_receive (conn, message, timeout);
else
if (conninfo->connection) {
g_mutex_lock (&conninfo->send_lock);
ret = gst_rtsp_connection_receive (conninfo->connection, message, timeout);
g_mutex_unlock (&conninfo->send_lock);
} else {
ret = GST_RTSP_ERROR;
}
return ret;
}
......@@ -2461,7 +2467,7 @@ gst_rtspsrc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
guint size;
GstRTSPResult ret;
GstRTSPMessage message = { 0 };
GstRTSPConnection *conn;
GstRTSPConnInfo *conninfo;
stream = (GstRTSPStream *) gst_pad_get_element_private (pad);
src = stream->parent;
......@@ -2476,12 +2482,12 @@ gst_rtspsrc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
gst_rtsp_message_take_body (&message, data, size);
if (stream->conninfo.connection)
conn = stream->conninfo.connection;
conninfo = &stream->conninfo;
else
conn = src->conninfo.connection;
conninfo = &src->conninfo;
GST_DEBUG_OBJECT (src, "sending %u bytes RTCP", size);
ret = gst_rtspsrc_connection_send (src, conn, &message, NULL);
ret = gst_rtspsrc_connection_send (src, conninfo, &message, NULL);
GST_DEBUG_OBJECT (src, "sent RTCP, %d", ret);
/* and steal it away again because we will free it when unreffing the
......@@ -4163,6 +4169,10 @@ gst_rtsp_conninfo_connect (GstRTSPSrc * src, GstRTSPConnInfo * info,
goto could_not_connect;
}
} while (!info->connected && retry);
g_mutex_init (&info->send_lock);
g_mutex_init (&info->recv_lock);
gst_rtsp_message_unset (&response);
return GST_RTSP_OK;
......@@ -4207,6 +4217,9 @@ gst_rtsp_conninfo_close (GstRTSPSrc * src, GstRTSPConnInfo * info,
gst_rtsp_connection_free (info->connection);
info->connection = NULL;
info->flushing = FALSE;
g_mutex_clear (&info->send_lock);
g_mutex_clear (&info->recv_lock);
}
GST_RTSP_STATE_UNLOCK (src);
return GST_RTSP_OK;
......@@ -4267,7 +4280,7 @@ gst_rtspsrc_init_request (GstRTSPSrc * src, GstRTSPMessage * msg,
/* FIXME, handle server request, reply with OK, for now */
static GstRTSPResult
gst_rtspsrc_handle_request (GstRTSPSrc * src, GstRTSPConnection * conn,
gst_rtspsrc_handle_request (GstRTSPSrc * src, GstRTSPConnInfo * conninfo,
GstRTSPMessage * request)
{
GstRTSPMessage response = { 0 };
......@@ -4296,7 +4309,7 @@ gst_rtspsrc_handle_request (GstRTSPSrc * src, GstRTSPConnection * conn,
if (src->debug)
gst_rtsp_message_dump (&response);
res = gst_rtspsrc_connection_send (src, conn, &response, NULL);
res = gst_rtspsrc_connection_send (src, conninfo, &response, NULL);
if (res < 0)
goto send_error;
......@@ -4348,9 +4361,7 @@ gst_rtspsrc_send_keep_alive (GstRTSPSrc * src)
if (src->debug)
gst_rtsp_message_dump (&request);
res =
gst_rtspsrc_connection_send (src, src->conninfo.connection, &request,
NULL);
res = gst_rtspsrc_connection_send (src, &src->conninfo, &request, NULL);
if (res < 0)
goto send_error;
......@@ -4631,7 +4642,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
/* protect the connection with the connection lock so that we can see when
* we are finished doing server communication */
res =
gst_rtspsrc_connection_receive (src, src->conninfo.connection,
gst_rtspsrc_connection_receive (src, &src->conninfo,
&message, src->ptcp_timeout);
switch (res) {
......@@ -4657,9 +4668,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
switch (message.type) {
case GST_RTSP_MESSAGE_REQUEST:
/* server sends us a request message, handle it */
res =
gst_rtspsrc_handle_request (src, src->conninfo.connection,
&message);
res = gst_rtspsrc_handle_request (src, &src->conninfo, &message);
if (res == GST_RTSP_EEOF)
goto server_eof;
else if (res < 0)
......@@ -4750,7 +4759,7 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
/* we should continue reading the TCP socket because the server might
* send us requests. When the session timeout expires, we need to send a
* keep-alive request to keep the session open. */
res = gst_rtspsrc_connection_receive (src, src->conninfo.connection,
res = gst_rtspsrc_connection_receive (src, &src->conninfo,
&message, &tv_timeout);
switch (res) {
......@@ -4790,9 +4799,7 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
switch (message.type) {
case GST_RTSP_MESSAGE_REQUEST:
/* server sends us a request message, handle it */
res =
gst_rtspsrc_handle_request (src, src->conninfo.connection,
&message);
res = gst_rtspsrc_handle_request (src, &src->conninfo, &message);
if (res == GST_RTSP_EEOF)
goto server_eof;
else if (res < 0)
......@@ -5334,7 +5341,7 @@ no_user_pass:
}
static GstRTSPResult
gst_rtspsrc_try_send (GstRTSPSrc * src, GstRTSPConnection * conn,
gst_rtspsrc_try_send (GstRTSPSrc * src, GstRTSPConnInfo * conninfo,
GstRTSPMessage * request, GstRTSPMessage * response,
GstRTSPStatusCode * code)
{
......@@ -5352,14 +5359,16 @@ again:
if (src->debug)
gst_rtsp_message_dump (request);
res = gst_rtspsrc_connection_send (src, conn, request, src->ptcp_timeout);
res = gst_rtspsrc_connection_send (src, conninfo, request, src->ptcp_timeout);
if (res < 0)
goto send_error;
gst_rtsp_connection_reset_timeout (conn);
gst_rtsp_connection_reset_timeout (conninfo->connection);
next:
res = gst_rtspsrc_connection_receive (src, conn, response, src->ptcp_timeout);
res =
gst_rtspsrc_connection_receive (src, conninfo, response,
src->ptcp_timeout);
if (res < 0)
goto receive_error;
......@@ -5368,7 +5377,7 @@ next:
switch (response->type) {
case GST_RTSP_MESSAGE_REQUEST:
res = gst_rtspsrc_handle_request (src, conn, response);
res = gst_rtspsrc_handle_request (src, conninfo, response);
if (res == GST_RTSP_EEOF)
goto server_eof;
else if (res < 0)
......@@ -5496,7 +5505,7 @@ server_eof:
* Returns: #GST_RTSP_OK if the processing was successful.
*/
static GstRTSPResult
gst_rtspsrc_send (GstRTSPSrc * src, GstRTSPConnection * conn,
gst_rtspsrc_send (GstRTSPSrc * src, GstRTSPConnInfo * conninfo,
GstRTSPMessage * request, GstRTSPMessage * response,
GstRTSPStatusCode * code)
{
......@@ -5518,7 +5527,8 @@ gst_rtspsrc_send (GstRTSPSrc * src, GstRTSPConnection * conn,
method = request->type_data.request.method;
if ((res =
gst_rtspsrc_try_send (src, conn, request, response, &int_code)) < 0)
gst_rtspsrc_try_send (src, conninfo, request, response,
&int_code)) < 0)
goto error;
switch (int_code) {
......@@ -5621,8 +5631,7 @@ static GstRTSPResult
gst_rtspsrc_send_cb (GstRTSPExtension * ext, GstRTSPMessage * request,
GstRTSPMessage * response, GstRTSPSrc * src)
{
return gst_rtspsrc_send (src, src->conninfo.connection, request, response,
NULL);
return gst_rtspsrc_send (src, &src->conninfo, request, response, NULL);
}
......@@ -5991,7 +6000,7 @@ gst_rtspsrc_setup_streams (GstRTSPSrc * src, gboolean async)
goto no_streams;
for (walk = src->streams; walk; walk = g_list_next (walk)) {
GstRTSPConnection *conn;
GstRTSPConnInfo *conninfo;
gchar *transports;
gint retry = 0;
guint mask = 0;
......@@ -6056,9 +6065,9 @@ gst_rtspsrc_setup_streams (GstRTSPSrc * src, gboolean async)
GST_DEBUG_OBJECT (src, "skipping stream %p, failed to connect", stream);
continue;
}
conn = stream->conninfo.connection;
conninfo = &stream->conninfo;
} else {
conn = src->conninfo.connection;
conninfo = &src->conninfo;
}
GST_DEBUG_OBJECT (src, "doing setup of stream %p with %s", stream,
stream->conninfo.location);
......@@ -6135,7 +6144,7 @@ gst_rtspsrc_setup_streams (GstRTSPSrc * src, gboolean async)
stream->id));
/* handle the code ourselves */
res = gst_rtspsrc_send (src, conn, &request, &response, &code);
res = gst_rtspsrc_send (src, conninfo, &request, &response, &code);
if (res < 0)
goto send_error;
......@@ -6655,7 +6664,7 @@ restart:
GST_ELEMENT_PROGRESS (src, CONTINUE, "open", ("Retrieving server options"));
if ((res =
gst_rtspsrc_send (src, src->conninfo.connection, &request, &response,
gst_rtspsrc_send (src, &src->conninfo, &request, &response,
NULL)) < 0)
goto send_error;
......@@ -6682,7 +6691,7 @@ restart:
GST_ELEMENT_PROGRESS (src, CONTINUE, "open", ("Retrieving media info"));
if ((res =
gst_rtspsrc_send (src, src->conninfo.connection, &request, &response,
gst_rtspsrc_send (src, &src->conninfo, &request, &response,
NULL)) < 0)
goto send_error;
......@@ -6905,9 +6914,7 @@ gst_rtspsrc_close (GstRTSPSrc * src, gboolean async, gboolean only_close)
if (async)
GST_ELEMENT_PROGRESS (src, CONTINUE, "close", ("Closing stream"));
if ((res =
gst_rtspsrc_send (src, info->connection, &request, &response,
NULL)) < 0)
if ((res = gst_rtspsrc_send (src, info, &request, &response, NULL)) < 0)
goto send_error;
/* FIXME, parse result? */
......@@ -7199,7 +7206,7 @@ restart:
for (walk = src->streams; walk; walk = g_list_next (walk)) {
GstRTSPStream *stream = (GstRTSPStream *) walk->data;
const gchar *setup_url;
GstRTSPConnection *conn;
GstRTSPConnInfo *conninfo;
/* try aggregate control first but do non-aggregate control otherwise */
if (control)
......@@ -7208,9 +7215,9 @@ restart:
continue;
if (src->conninfo.connection) {
conn = src->conninfo.connection;
conninfo = &src->conninfo;
} else if (stream->conninfo.connection) {
conn = stream->conninfo.connection;
conninfo = &stream->conninfo;
} else {
continue;
}
......@@ -7242,7 +7249,7 @@ restart:
if (async)
GST_ELEMENT_PROGRESS (src, CONTINUE, "request", ("Sending PLAY request"));
if ((res = gst_rtspsrc_send (src, conn, &request, &response, NULL)) < 0)
if ((res = gst_rtspsrc_send (src, conninfo, &request, &response, NULL)) < 0)
goto send_error;
if (src->need_redirect) {
......@@ -7420,7 +7427,7 @@ gst_rtspsrc_pause (GstRTSPSrc * src, gboolean async)
* aggregate control */
for (walk = src->streams; walk; walk = g_list_next (walk)) {
GstRTSPStream *stream = (GstRTSPStream *) walk->data;
GstRTSPConnection *conn;
GstRTSPConnInfo *conninfo;
const gchar *setup_url;
/* try aggregate control first but do non-aggregate control otherwise */
......@@ -7430,9 +7437,9 @@ gst_rtspsrc_pause (GstRTSPSrc * src, gboolean async)
continue;
if (src->conninfo.connection) {
conn = src->conninfo.connection;
conninfo = &src->conninfo;
} else if (stream->conninfo.connection) {
conn = stream->conninfo.connection;
conninfo = &stream->conninfo;
} else {
continue;
}
......@@ -7446,7 +7453,7 @@ gst_rtspsrc_pause (GstRTSPSrc * src, gboolean async)
setup_url)) < 0)
goto create_request_failed;
if ((res = gst_rtspsrc_send (src, conn, &request, &response, NULL)) < 0)
if ((res = gst_rtspsrc_send (src, conninfo, &request, &response, NULL)) < 0)
goto send_error;
gst_rtsp_message_unset (&request);
......
......@@ -86,6 +86,9 @@ struct _GstRTSPConnInfo {
GstRTSPConnection *connection;
gboolean connected;
gboolean flushing;
GMutex send_lock;
GMutex recv_lock;
};
typedef struct _GstRTSPStream GstRTSPStream;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment