Commit 92396be1 authored by Wim Taymans's avatar Wim Taymans
Browse files

gst/rtsp/gstrtspsrc.*: Fix sending RTCP to the right place.

Original commit message from CVS:
* gst/rtsp/gstrtspsrc.c: (gst_rtspsrc_init),
(gst_rtspsrc_finalize), (gst_rtspsrc_stream_configure_transport),
(gst_rtspsrc_handle_request), (gst_rtspsrc_loop_interleaved),
(gst_rtspsrc_send_keep_alive), (gst_rtspsrc_loop_udp),
(gst_rtspsrc_loop_send_cmd), (gst_rtspsrc_try_send),
(gst_rtspsrc_open), (gst_rtspsrc_handle_message):
* gst/rtsp/gstrtspsrc.h:
Fix sending RTCP to the right place.
Fix bug in reffing the wrong UDP element.
Use new pad names for the session manager.
Implement handling server requests in interleaved and UDP modes.
Handle session keep-alive in UDP modes.
Remove GCond for handling UDP timeouts.
* gst/rtsp/rtspconnection.c: (rtsp_connection_connect),
(rtsp_connection_send), (rtsp_connection_read), (read_body),
(rtsp_connection_receive), (rtsp_connection_close):
* gst/rtsp/rtspconnection.h:
Store connection IP address for later.
Add timeout args to all operations that might block forever.
Parse session timeout.
Only close sockets when not already closed.
* gst/rtsp/rtspdefs.c:
* gst/rtsp/rtspdefs.h:
Add timeout return value and error string.
* gst/rtsp/rtspmessage.c: (rtsp_message_init_response):
Add small comment.
parent f34fce9d
2007-05-02 Wim Taymans <wim@fluendo.com>
* gst/rtsp/gstrtspsrc.c: (gst_rtspsrc_init),
(gst_rtspsrc_finalize), (gst_rtspsrc_stream_configure_transport),
(gst_rtspsrc_handle_request), (gst_rtspsrc_loop_interleaved),
(gst_rtspsrc_send_keep_alive), (gst_rtspsrc_loop_udp),
(gst_rtspsrc_loop_send_cmd), (gst_rtspsrc_try_send),
(gst_rtspsrc_open), (gst_rtspsrc_handle_message):
* gst/rtsp/gstrtspsrc.h:
Fix sending RTCP to the right place.
Fix bug in reffing the wrong UDP element.
Use new pad names for the session manager.
Implement handling server requests in interleaved and UDP modes.
Handle session keep-alive in UDP modes.
Remove GCond for handling UDP timeouts.
* gst/rtsp/rtspconnection.c: (rtsp_connection_connect),
(rtsp_connection_send), (rtsp_connection_read), (read_body),
(rtsp_connection_receive), (rtsp_connection_close):
* gst/rtsp/rtspconnection.h:
Store connection IP address for later.
Add timeout args to all operations that might block forever.
Parse session timeout.
Only close sockets when not already closed.
* gst/rtsp/rtspdefs.c:
* gst/rtsp/rtspdefs.h:
Add timeout return value and error string.
* gst/rtsp/rtspmessage.c: (rtsp_message_init_response):
Add small comment.
2007-05-01 Wim Taymans <wim@fluendo.com>
Patch by: Sjoerd Simons <sjoerd at luon dot net>
......
......@@ -290,8 +290,6 @@ gst_rtspsrc_init (GstRTSPSrc * src, GstRTSPSrcClass * g_class)
src->stream_rec_lock = g_new (GStaticRecMutex, 1);
g_static_rec_mutex_init (src->stream_rec_lock);
src->loop_cond = g_cond_new ();
src->location = g_strdup (DEFAULT_LOCATION);
src->url = NULL;
......@@ -316,7 +314,6 @@ gst_rtspsrc_finalize (GObject * object)
g_static_rec_mutex_free (rtspsrc->stream_rec_lock);
g_free (rtspsrc->stream_rec_lock);
g_cond_free (rtspsrc->loop_cond);
g_free (rtspsrc->location);
g_free (rtspsrc->req_location);
g_free (rtspsrc->content_base);
......@@ -1280,8 +1277,8 @@ use_no_manager:
goto no_element;
/* take ownership */
gst_object_ref (stream->udpsrc[0]);
gst_object_sink (stream->udpsrc[0]);
gst_object_ref (stream->udpsrc[1]);
gst_object_sink (stream->udpsrc[1]);
gst_element_set_state (stream->udpsrc[1], GST_STATE_READY);
}
......@@ -1350,9 +1347,11 @@ use_no_manager:
else
port = transport->server_port.max;
destination = transport->destination;
/* first take the source, then the endpoint to figure out where to send
* the RTCP. */
destination = transport->source;
if (destination == NULL)
destination = src->addr;
destination = src->connection->ip;
GST_DEBUG_OBJECT (src, "configure UDP sink for %s:%d", destination, port);
......@@ -1375,7 +1374,7 @@ use_no_manager:
stream->rtcppad = gst_element_get_pad (stream->udpsink, "sink");
/* get session RTCP pad */
name = g_strdup_printf ("rtcp_src_%d", stream->id);
name = g_strdup_printf ("send_rtcp_src_%d", stream->id);
pad = gst_element_get_request_pad (src->session, name);
g_free (name);
......@@ -1557,10 +1556,43 @@ gst_rtspsrc_push_event (GstRTSPSrc * src, GstEvent * event)
gst_event_unref (event);
}
/* FIXME, handle server request, reply with OK, for now */
static RTSPResult
gst_rtspsrc_handle_request (GstRTSPSrc * src, RTSPMessage * request)
{
RTSPMessage response = { 0 };
RTSPResult res;
GST_DEBUG_OBJECT (src, "got server request message");
if (src->debug)
rtsp_message_dump (request);
res = rtsp_message_init_response (&response, RTSP_STS_OK, "OK", request);
if (res < 0)
goto send_error;
GST_DEBUG_OBJECT (src, "replying with OK");
if (src->debug)
rtsp_message_dump (&response);
if ((res = rtsp_connection_send (src->connection, &response, NULL)) < 0)
goto send_error;
return RTSP_OK;
/* ERRORS */
send_error:
{
return res;
}
}
static void
gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
{
RTSPMessage response = { 0 };
RTSPMessage message = { 0 };
RTSPResult res;
gint channel;
GList *lstream;
......@@ -1570,18 +1602,38 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
guint size;
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *buf;
gboolean is_rtcp = FALSE;
gboolean is_rtcp, have_data;
have_data = FALSE;
do {
GST_DEBUG_OBJECT (src, "doing receive");
if ((res = rtsp_connection_receive (src->connection, &response)) < 0)
if ((res = rtsp_connection_receive (src->connection, &message, NULL)) < 0)
goto receive_error;
GST_DEBUG_OBJECT (src, "got packet type %d", response.type);
switch (message.type) {
case RTSP_MESSAGE_REQUEST:
/* server sends us a request message, handle it */
if ((res = gst_rtspsrc_handle_request (src, &message)) < 0)
goto handle_request_failed;
break;
case RTSP_MESSAGE_RESPONSE:
/* we ignore response messages */
GST_DEBUG_OBJECT (src, "ignoring response message");
break;
case RTSP_MESSAGE_DATA:
GST_DEBUG_OBJECT (src, "got data message");
have_data = TRUE;
break;
default:
GST_WARNING_OBJECT (src, "ignoring unknown message type %d",
message.type);
break;
}
}
while (response.type != RTSP_MESSAGE_DATA);
while (!have_data);
channel = response.type_data.data.channel;
channel = message.type_data.data.channel;
lstream = g_list_find_custom (src->streams, GINT_TO_POINTER (channel),
(GCompareFunc) find_stream_by_channel);
......@@ -1591,13 +1643,16 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
stream = (GstRTSPStream *) lstream->data;
if (channel == stream->channel[0]) {
outpad = stream->channelpad[0];
is_rtcp = FALSE;
} else if (channel == stream->channel[1]) {
outpad = stream->channelpad[1];
is_rtcp = TRUE;
} else {
is_rtcp = FALSE;
}
/* take a look at the body to figure out what we have */
rtsp_message_get_body (&response, &data, &size);
rtsp_message_get_body (&message, &data, &size);
if (size < 2)
goto invalid_length;
......@@ -1612,8 +1667,8 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
if (outpad == NULL)
goto unknown_stream;
/* and chain buffer to internal element */
rtsp_message_steal_body (&response, &data, &size);
/* take the message body for further processing */
rtsp_message_steal_body (&message, &data, &size);
/* strip the trailing \0 */
size -= 1;
......@@ -1624,7 +1679,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
GST_BUFFER_SIZE (buf) = size;
/* don't need message anymore */
rtsp_message_unset (&response);
rtsp_message_unset (&message);
GST_DEBUG_OBJECT (src, "pushing data of size %d on channel %d", size,
channel);
......@@ -1652,7 +1707,7 @@ gst_rtspsrc_loop_interleaved (GstRTSPSrc * src)
unknown_stream:
{
GST_DEBUG_OBJECT (src, "unknown stream on channel %d, ignored", channel);
rtsp_message_unset (&response);
rtsp_message_unset (&message);
return;
}
receive_error:
......@@ -1664,9 +1719,20 @@ receive_error:
g_free (str);
if (src->debug)
rtsp_message_dump (&response);
rtsp_message_dump (&message);
rtsp_message_unset (&response);
rtsp_message_unset (&message);
ret = GST_FLOW_UNEXPECTED;
goto need_pause;
}
handle_request_failed:
{
gchar *str = rtsp_strresult (res);
GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL),
("Could not send message. (%s)", str));
g_free (str);
rtsp_message_unset (&message);
ret = GST_FLOW_UNEXPECTED;
goto need_pause;
}
......@@ -1674,7 +1740,7 @@ invalid_length:
{
GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
("Short message received."));
rtsp_message_unset (&response);
rtsp_message_unset (&message);
return;
}
need_pause:
......@@ -1707,21 +1773,118 @@ need_pause:
}
}
/* send server keep-alive */
static RTSPResult
gst_rtspsrc_send_keep_alive (GstRTSPSrc * src)
{
RTSPMessage request = { 0 };
RTSPMessage response = { 0 };
RTSPResult res;
GST_DEBUG_OBJECT (src, "creating server keep-alive");
res =
rtsp_message_init_request (&request, RTSP_GET_PARAMETER,
src->req_location);
if (res < 0)
goto send_error;
if (!gst_rtspsrc_send (src, &request, &response, NULL))
goto send_error;
rtsp_message_unset (&request);
return RTSP_OK;
/* ERRORS */
send_error:
{
gchar *str = rtsp_strresult (res);
rtsp_message_unset (&request);
GST_ELEMENT_WARNING (src, RESOURCE, WRITE, (NULL),
("Could not send keep-alive. (%s)", str));
g_free (str);
return res;
}
}
static void
gst_rtspsrc_loop_udp (GstRTSPSrc * src)
{
gboolean restart = FALSE;
RTSPResult res;
GST_OBJECT_LOCK (src);
if (src->loop_cmd == CMD_STOP)
goto stopping;
/* FIXME, we should continue reading the TCP socket because the server might
* send us requests */
while (src->loop_cmd == CMD_WAIT) {
GST_DEBUG_OBJECT (src, "waiting");
GST_RTSP_LOOP_WAIT (src);
GST_DEBUG_OBJECT (src, "waiting done");
GTimeVal tv_timeout;
gint timeout;
GST_OBJECT_UNLOCK (src);
while (TRUE) {
RTSPMessage message = { 0 };
/* calculate the session timeout. We should send the keep-alive request a
* little earlier to compensate for the round trip time to the server. We
* subtract 1 second here. */
timeout = src->connection->timeout;
if (timeout > 1)
timeout -= 1;
/* use the session timeout for receiving data */
tv_timeout.tv_sec = timeout;
tv_timeout.tv_usec = 0;
GST_DEBUG_OBJECT (src, "doing receive with timeout %d seconds", timeout);
/* we should continue reading the TCP socket because the server might
* send us requests. When the session timeout expires, we need to send a
* keep-alive request to keep the session open. */
res = rtsp_connection_receive (src->connection, &message, &tv_timeout);
switch (res) {
case RTSP_OK:
GST_DEBUG_OBJECT (src, "we received a server message");
break;
case RTSP_EINTR:
/* we got interrupted, see what we have to do */
GST_DEBUG_OBJECT (src, "we got interrupted");
/* unset flushing so we can do something else */
rtsp_connection_flush (src->connection, FALSE);
goto interrupt;
case RTSP_ETIMEOUT:
/* ignore result, a warning was posted */
GST_DEBUG_OBJECT (src, "timout, sending keep-alive");
res = gst_rtspsrc_send_keep_alive (src);
continue;
default:
goto receive_error;
}
switch (message.type) {
case RTSP_MESSAGE_REQUEST:
/* server sends us a request message, handle it */
if ((res = gst_rtspsrc_handle_request (src, &message)) < 0)
goto handle_request_failed;
break;
case RTSP_MESSAGE_RESPONSE:
case RTSP_MESSAGE_DATA:
/* we ignore response and data messages */
GST_DEBUG_OBJECT (src, "ignoring message");
break;
default:
GST_WARNING_OBJECT (src, "ignoring unknown message type %d",
message.type);
break;
}
}
interrupt:
GST_OBJECT_LOCK (src);
GST_DEBUG_OBJECT (src, "we have command %d", src->loop_cmd);
if (src->loop_cmd == CMD_STOP)
goto stopping;
}
......@@ -1762,7 +1925,7 @@ gst_rtspsrc_loop_udp (GstRTSPSrc * src)
gst_rtspsrc_close (src);
/* see if we have TCP left to try */
if (!(src->cur_protocols & RTSP_LOWER_TRANS_TCP))
if (!(src->protocols & RTSP_LOWER_TRANS_TCP))
goto no_protocols;
/* open new connection using tcp */
......@@ -1790,6 +1953,24 @@ stopping:
gst_task_pause (src->task);
return;
}
receive_error:
{
gchar *str = rtsp_strresult (res);
GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
("Could not receive message. (%s)", str));
g_free (str);
return;
}
handle_request_failed:
{
gchar *str = rtsp_strresult (res);
GST_ELEMENT_WARNING (src, RESOURCE, READ, (NULL),
("Could not handle server message. (%s)", str));
g_free (str);
return;
}
no_protocols:
{
src->cur_protocols = 0;
......@@ -1815,7 +1996,8 @@ gst_rtspsrc_loop_send_cmd (GstRTSPSrc * src, gint cmd)
{
GST_OBJECT_LOCK (src);
src->loop_cmd = cmd;
GST_RTSP_LOOP_SIGNAL (src);
if (cmd != CMD_WAIT)
rtsp_connection_flush (src->connection, TRUE);
GST_OBJECT_UNLOCK (src);
}
......@@ -1828,36 +2010,6 @@ gst_rtspsrc_loop (GstRTSPSrc * src)
gst_rtspsrc_loop_udp (src);
}
static RTSPResult
gst_rtspsrc_handle_request (GstRTSPSrc * src, RTSPMessage * request)
{
RTSPMessage response = { 0 };
RTSPResult res;
res = rtsp_message_init_response (&response, RTSP_STS_OK, "OK", request);
if (res < 0)
goto send_error;
if (src->debug)
rtsp_message_dump (&response);
if ((res = rtsp_connection_send (src->connection, &response)) < 0)
goto send_error;
return RTSP_OK;
/* ERRORS */
send_error:
{
gchar *str = rtsp_strresult (res);
GST_ELEMENT_ERROR (src, RESOURCE, WRITE, (NULL),
("Could not send message. (%s)", str));
g_free (str);
return res;
}
}
#ifndef GST_DISABLE_GST_DEBUG
const gchar *
rtsp_auth_method_to_string (RTSPAuthMethod method)
......@@ -2085,11 +2237,11 @@ gst_rtspsrc_try_send (GstRTSPSrc * src, RTSPMessage * request,
if (src->debug)
rtsp_message_dump (request);
if ((res = rtsp_connection_send (src->connection, request)) < 0)
if ((res = rtsp_connection_send (src->connection, request, NULL)) < 0)
goto send_error;
next:
if ((res = rtsp_connection_receive (src->connection, response)) < 0)
if ((res = rtsp_connection_receive (src->connection, response, NULL)) < 0)
goto receive_error;
if (src->debug)
......@@ -2097,16 +2249,17 @@ next:
switch (response->type) {
case RTSP_MESSAGE_REQUEST:
/* FIXME, handle server request, reply with OK, for now */
if ((res = gst_rtspsrc_handle_request (src, response)) < 0)
goto handle_request_failed;
goto next;
case RTSP_MESSAGE_RESPONSE:
/* ok, a response is good */
GST_DEBUG_OBJECT (src, "received response message");
break;
default:
case RTSP_MESSAGE_DATA:
/* get next response */
GST_DEBUG_OBJECT (src, "ignoring data response message");
goto next;
}
......@@ -2587,7 +2740,7 @@ gst_rtspsrc_open (GstRTSPSrc * src)
/* connect */
GST_DEBUG_OBJECT (src, "connecting (%s)...", src->req_location);
if ((res = rtsp_connection_connect (src->connection)) < 0)
if ((res = rtsp_connection_connect (src->connection, NULL)) < 0)
goto could_not_connect;
/* create OPTIONS */
......@@ -3116,8 +3269,8 @@ gst_rtspsrc_handle_message (GstBin * bin, GstMessage * message)
gst_message_unref (message);
break;
/* fatal our not our message, forward */
forward:
/* fatal but not our message, forward */
GST_BIN_CLASS (parent_class)->handle_message (bin, message);
break;
}
......
......@@ -71,10 +71,6 @@ typedef struct _GstRTSPSrcClass GstRTSPSrcClass;
#define GST_RTSP_STATE_LOCK(rtsp) (g_mutex_lock (GST_RTSP_STATE_GET_LOCK(rtsp)))
#define GST_RTSP_STATE_UNLOCK(rtsp) (g_mutex_unlock (GST_RTSP_STATE_GET_LOCK(rtsp)))
#define GST_RTSP_LOOP_GET_COND(rtsp) (GST_RTSPSRC_CAST(rtsp)->loop_cond)
#define GST_RTSP_LOOP_WAIT(rtsp) (g_cond_wait(GST_RTSP_LOOP_GET_COND (rtsp), GST_OBJECT_GET_LOCK (rtsp)))
#define GST_RTSP_LOOP_SIGNAL(rtsp) (g_cond_signal(GST_RTSP_LOOP_GET_COND (rtsp)))
typedef struct _GstRTSPStream GstRTSPStream;
#include "rtspext.h"
......@@ -126,7 +122,6 @@ struct _GstRTSPSrc {
gint free_channel;
/* cond to signal loop */
GCond *loop_cond;
gint loop_cmd;
GMutex *state_lock;
......
......@@ -160,7 +160,7 @@ no_socket_pair:
}
RTSPResult
rtsp_connection_connect (RTSPConnection * conn)
rtsp_connection_connect (RTSPConnection * conn, GTimeVal * timeout)
{
gint fd;
struct sockaddr_in sin;
......@@ -210,6 +210,7 @@ rtsp_connection_connect (RTSPConnection * conn)
goto sys_error;
conn->fd = fd;
conn->ip = ip;
return RTSP_OK;
......@@ -259,7 +260,8 @@ append_auth_header (RTSPConnection * conn, RTSPMessage * message, GString * str)
}
RTSPResult
rtsp_connection_send (RTSPConnection * conn, RTSPMessage * message)
rtsp_connection_send (RTSPConnection * conn, RTSPMessage * message,
GTimeVal * timeout)
{
GString *str;
gint towrite;
......@@ -543,11 +545,13 @@ no_column:
}
RTSPResult
rtsp_connection_read (RTSPConnection * conn, gpointer data, guint size)
rtsp_connection_read (RTSPConnection * conn, gpointer data, guint size,
GTimeVal * timeout)
{
fd_set readfds;
guint toread;
gint retval;
struct timeval tv_timeout, *ptv_timeout = NULL;
#ifndef G_OS_WIN32
gint avail;
......@@ -570,6 +574,13 @@ rtsp_connection_read (RTSPConnection * conn, gpointer data, guint size)
else if (avail >= toread)
goto do_read;
/* configure timeout if any */
if (timeout != NULL) {
tv_timeout.tv_sec = timeout->tv_sec;
tv_timeout.tv_usec = timeout->tv_usec;
ptv_timeout = &tv_timeout;
}
FD_ZERO (&readfds);
FD_SET (conn->fd, &readfds);
FD_SET (READ_SOCKET (conn), &readfds);
......@@ -578,12 +589,16 @@ rtsp_connection_read (RTSPConnection * conn, gpointer data, guint size)
gint bytes;
do {
retval = select (FD_SETSIZE, &readfds, NULL, NULL, NULL);
retval = select (FD_SETSIZE, &readfds, NULL, NULL, ptv_timeout);
} while ((retval == -1 && errno == EINTR));
if (retval == -1)
goto select_error;
/* check for timeout */
if (retval == 0)
goto select_timeout;
if (FD_ISSET (READ_SOCKET (conn), &readfds)) {
/* read all stop commands */
while (TRUE) {
......@@ -621,6 +636,10 @@ select_error:
{
return RTSP_ESYS;
}
select_timeout:
{
return RTSP_ETIMEOUT;
}
stopped:
{
return RTSP_EINTR;
......@@ -636,7 +655,8 @@ read_error:
}
static RTSPResult
read_body (RTSPConnection * conn, glong content_length, RTSPMessage * msg)
read_body (RTSPConnection * conn, glong content_length, RTSPMessage * msg,
GTimeVal * timeout)
{
gchar *body;
RTSPResult res;
......@@ -650,7 +670,8 @@ read_body (RTSPConnection * conn, glong content_length, RTSPMessage * msg)
body = g_malloc (content_length + 1);
body[content_length] = '\0';
RTSP_CHECK (rtsp_connection_read (conn, body, content_length), read_error);
RTSP_CHECK (rtsp_connection_read (conn, body, content_length, timeout),
read_error);
content_length += 1;
......@@ -668,7 +689,8 @@ read_error:
}
RTSPResult
rtsp_connection_receive (RTSPConnection * conn, RTSPMessage * msg)
rtsp_connection_receive (RTSPConnection * conn, RTSPMessage * msg,