Commit f90dac8d authored by Sebastian Dröge's avatar Sebastian Dröge 🍵

rtsp-connection: Make use of new GstRTSPMessage API for directly storing a...

rtsp-connection: Make use of new GstRTSPMessage API for directly storing a body buffer and add API for writing multiple messages

By doing so we can send a whole GstBufferList and each memory in the
contained buffers without copying into a single memory area and with a
single writev() call. This improves performance considerably for
high-packet-rate streams.

This depends on https://gitlab.gnome.org/GNOME/glib/merge_requests/333
to be efficient, otherwise each chunk of memory is a separate write()
call.

#370
parent b3c0d8b8
......@@ -1840,6 +1840,7 @@ gst_rtsp_connection_write
gst_rtsp_connection_poll
gst_rtsp_connection_send
gst_rtsp_connection_send_messages
gst_rtsp_connection_receive
gst_rtsp_connection_next_timeout
......@@ -1892,6 +1893,7 @@ gst_rtsp_watch_unref
gst_rtsp_watch_attach
gst_rtsp_watch_reset
gst_rtsp_watch_send_message
gst_rtsp_watch_send_messages
gst_rtsp_watch_write_data
gst_rtsp_watch_get_send_backlog
gst_rtsp_watch_set_send_backlog
......
......@@ -89,6 +89,46 @@ typedef struct
guint coutl;
} DecodeCtx;
typedef struct
{
/* If %TRUE we only own data and none of the
* other fields
*/
gboolean borrowed;
/* Header or full message */
guint8 *data;
guint data_size;
gboolean data_is_data_header;
/* Payload following data, if any */
guint8 *body_data;
guint body_data_size;
/* or */
GstBuffer *body_buffer;
/* DATA packet header statically allocated for above */
guint8 data_header[4];
/* all below only for async writing */
guint data_offset; /* == data_size when done */
guint body_offset; /* into body_data or the buffer */
/* ID of the message for notification */
guint id;
} GstRTSPSerializedMessage;
static void
gst_rtsp_serialized_message_clear (GstRTSPSerializedMessage * msg)
{
if (!msg->borrowed) {
g_free (msg->body_data);
gst_buffer_replace (&msg->body_buffer, NULL);
}
g_free (msg->data);
}
#ifdef MSG_NOSIGNAL
#define SEND_FLAGS MSG_NOSIGNAL
#else
......@@ -1205,6 +1245,100 @@ error:
}
}
/* NOTE: This changes the values of vectors if multiple iterations are needed!
*
* Depends on https://gitlab.gnome.org/GNOME/glib/merge_requests/333
*/
#if GLIB_CHECK_VERSION(2, 59, 0)
static GstRTSPResult
writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors,
gsize * bytes_written, gboolean block, GCancellable * cancellable)
{
gsize _bytes_written = 0;
gsize written;
GError *err = NULL;
while (n_vectors > 0) {
gboolean res;
if (block)
res = g_output_stream_writev (stream, vectors, n_vectors, &written,
cancellable, &err);
else
res =
g_pollable_output_stream_writev_nonblocking (G_POLLABLE_OUTPUT_STREAM
(stream), vectors, n_vectors, &written, cancellable, &err);
_bytes_written += written;
if (G_UNLIKELY (!res))
goto error;
/* skip vectors that have been written in full */
while (written >= vectors[0].size) {
written -= vectors[0].size;
++vectors;
--n_vectors;
}
/* skip partially written vector data */
if (written > 0) {
vectors[0].size -= written;
vectors[0].buffer = ((guint8 *) vectors[0].buffer) + written;
}
}
*bytes_written = _bytes_written;
return GST_RTSP_OK;
/* ERRORS */
error:
{
*bytes_written = _bytes_written;
if (G_UNLIKELY (written == 0))
return GST_RTSP_EEOF;
GST_DEBUG ("%s", err->message);
if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
g_clear_error (&err);
return GST_RTSP_EINTR;
} else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_clear_error (&err);
return GST_RTSP_EINTR;
} else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) {
g_clear_error (&err);
return GST_RTSP_ETIMEOUT;
}
g_clear_error (&err);
return GST_RTSP_ESYS;
}
}
#else
static GstRTSPResult
writev_bytes (GOutputStream * stream, GOutputVector * vectors, gint n_vectors,
gsize * bytes_written, gboolean block, GCancellable * cancellable)
{
gsize _bytes_written = 0;
guint written;
gint i;
GstRTSPResult res;
for (i = 0; i < n_vectors; i++) {
written = 0;
res =
write_bytes (stream, vectors[i].buffer, &written, vectors[i].size,
block, cancellable);
_bytes_written += written;
if (G_UNLIKELY (res != GST_RTSP_OK))
break;
}
*bytes_written = _bytes_written;
return res;
}
#endif
static gint
fill_raw_bytes (GstRTSPConnection * conn, guint8 * buffer, guint size,
gboolean block, GError ** err)
......@@ -1465,6 +1599,7 @@ read_line (GstRTSPConnection * conn, guint8 * buffer, guint * idx, guint size,
*
* Returns: #GST_RTSP_OK on success.
*/
/* FIXME 2.0: This should've been static! */
GstRTSPResult
gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
guint size, GTimeVal * timeout)
......@@ -1490,15 +1625,22 @@ gst_rtsp_connection_write (GstRTSPConnection * conn, const guint8 * data,
return res;
}
static GString *
message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
static gboolean
serialize_message (GstRTSPConnection * conn, GstRTSPMessage * message,
GstRTSPSerializedMessage * serialized_message)
{
GString *str = NULL;
str = g_string_new ("");
memset (serialized_message, 0, sizeof (*serialized_message));
/* Initially we borrow the body_data / body_buffer fields from
* the message */
serialized_message->borrowed = TRUE;
switch (message->type) {
case GST_RTSP_MESSAGE_REQUEST:
str = g_string_new ("");
/* create request string, add CSeq */
g_string_append_printf (str, "%s %s RTSP/%s\r\n"
"CSeq: %d\r\n",
......@@ -1516,12 +1658,16 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
add_auth_header (conn, message);
break;
case GST_RTSP_MESSAGE_RESPONSE:
str = g_string_new ("");
/* create response string */
g_string_append_printf (str, "RTSP/%s %d %s\r\n",
gst_rtsp_version_as_text (message->type_data.response.version),
message->type_data.response.code, message->type_data.response.reason);
break;
case GST_RTSP_MESSAGE_HTTP_REQUEST:
str = g_string_new ("");
/* create request string */
g_string_append_printf (str, "%s %s HTTP/%s\r\n",
gst_rtsp_method_as_text (message->type_data.request.method),
......@@ -1531,6 +1677,8 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
add_auth_header (conn, message);
break;
case GST_RTSP_MESSAGE_HTTP_RESPONSE:
str = g_string_new ("");
/* create response string */
g_string_append_printf (str, "HTTP/%s %d %s\r\n",
gst_rtsp_version_as_text (message->type_data.request.version),
......@@ -1538,7 +1686,7 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
break;
case GST_RTSP_MESSAGE_DATA:
{
guint8 data_header[4];
guint8 *data_header = serialized_message->data_header;
/* prepare data header */
data_header[0] = '$';
......@@ -1546,16 +1694,22 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
data_header[2] = (message->body_size >> 8) & 0xff;
data_header[3] = message->body_size & 0xff;
/* create string with header and data */
str = g_string_append_len (str, (gchar *) data_header, 4);
str =
g_string_append_len (str, (gchar *) message->body,
message->body_size);
/* create serialized message with header and data */
serialized_message->data_is_data_header = TRUE;
serialized_message->data_size = 4;
if (message->body) {
serialized_message->body_data = message->body;
serialized_message->body_data_size = message->body_size;
} else {
g_assert (message->body_buffer != NULL);
serialized_message->body_buffer = message->body_buffer;
}
break;
}
default:
g_string_free (str, TRUE);
g_return_val_if_reached (NULL);
g_return_val_if_reached (FALSE);
break;
}
......@@ -1563,6 +1717,8 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
if (message->type != GST_RTSP_MESSAGE_DATA) {
gchar date_string[100];
g_assert (str != NULL);
gen_date_string (date_string, sizeof (date_string));
/* add date header */
......@@ -1573,7 +1729,7 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
gst_rtsp_message_append_headers (message, str);
/* append Content-Length and body if needed */
if (message->body != NULL && message->body_size > 0) {
if (message->body_size > 0) {
gchar *len;
len = g_strdup_printf ("%d", message->body_size);
......@@ -1582,16 +1738,24 @@ message_to_string (GstRTSPConnection * conn, GstRTSPMessage * message)
g_free (len);
/* header ends here */
g_string_append (str, "\r\n");
str =
g_string_append_len (str, (gchar *) message->body,
message->body_size);
if (message->body) {
serialized_message->body_data = message->body;
serialized_message->body_data_size = message->body_size;
} else {
g_assert (message->body_buffer != NULL);
serialized_message->body_buffer = message->body_buffer;
}
} else {
/* just end headers */
g_string_append (str, "\r\n");
}
serialized_message->data_size = str->len;
serialized_message->data = (guint8 *) g_string_free (str, FALSE);
}
return str;
return TRUE;
}
/**
......@@ -1612,36 +1776,190 @@ GstRTSPResult
gst_rtsp_connection_send (GstRTSPConnection * conn, GstRTSPMessage * message,
GTimeVal * timeout)
{
GString *string = NULL;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
return gst_rtsp_connection_send_messages (conn, message, 1, timeout);
}
/**
* gst_rtsp_connection_send_messages:
* @conn: a #GstRTSPConnection
* @messages: (array length=n_messages): the messages to send
* @n_messages: the number of messages to send
* @timeout: a timeout value or %NULL
*
* Attempt to send @messages to the connected @conn, blocking up to
* the specified @timeout. @timeout can be %NULL, in which case this function
* might block forever.
*
* This function can be cancelled with gst_rtsp_connection_flush().
*
* Returns: #GST_RTSP_OK on success.
*
* Since: 1.16
*/
GstRTSPResult
gst_rtsp_connection_send_messages (GstRTSPConnection * conn,
GstRTSPMessage * messages, guint n_messages, GTimeVal * timeout)
{
GstClockTime to;
GstRTSPResult res;
gchar *str;
gsize len;
GstRTSPSerializedMessage *serialized_messages;
GOutputVector *vectors;
GstMapInfo *map_infos;
guint n_vectors, n_memories;
gint i, j, k;
gsize bytes_to_write, bytes_written;
g_return_val_if_fail (conn != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (message != NULL, GST_RTSP_EINVAL);
g_return_val_if_fail (messages != NULL || n_messages == 0, GST_RTSP_EINVAL);
serialized_messages = g_newa (GstRTSPSerializedMessage, n_messages);
memset (serialized_messages, 0,
sizeof (GstRTSPSerializedMessage) * n_messages);
if (G_UNLIKELY (!(string = message_to_string (conn, message))))
for (i = 0, n_vectors = 0, n_memories = 0, bytes_to_write = 0; i < n_messages;
i++) {
if (G_UNLIKELY (!serialize_message (conn, &messages[i],
&serialized_messages[i])))
goto no_message;
if (conn->tunneled) {
str = g_base64_encode ((const guchar *) string->str, string->len);
g_string_free (string, TRUE);
len = strlen (str);
gint state = 0, save = 0;
gchar *base64_buffer, *out_buffer;
gsize written = 0;
gsize in_length;
in_length = serialized_messages[i].data_size;
if (serialized_messages[i].body_data)
in_length += serialized_messages[i].body_data_size;
else if (serialized_messages[i].body_buffer)
in_length += gst_buffer_get_size (serialized_messages[i].body_buffer);
in_length = (in_length / 3 + 1) * 4 + 4 + 1;
base64_buffer = out_buffer = g_malloc0 (in_length);
written =
g_base64_encode_step (serialized_messages[i].data_is_data_header ?
serialized_messages[i].data_header : serialized_messages[i].data,
serialized_messages[i].data_size, FALSE, out_buffer, &state, &save);
out_buffer += written;
if (serialized_messages[i].body_data) {
written =
g_base64_encode_step (serialized_messages[i].body_data,
serialized_messages[i].body_data_size, FALSE, out_buffer, &state,
&save);
out_buffer += written;
} else if (serialized_messages[i].body_buffer) {
guint j, n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
for (j = 0; j < n; j++) {
GstMemory *mem =
gst_buffer_peek_memory (serialized_messages[i].body_buffer, j);
GstMapInfo map;
gst_memory_map (mem, &map, GST_MAP_READ);
written = g_base64_encode_step (map.data, map.size,
FALSE, out_buffer, &state, &save);
out_buffer += written;
gst_memory_unmap (mem, &map);
}
}
written = g_base64_encode_close (FALSE, out_buffer, &state, &save);
out_buffer += written;
gst_rtsp_serialized_message_clear (&serialized_messages[i]);
memset (&serialized_messages[i], 0, sizeof (serialized_messages[i]));
serialized_messages[i].data = (guint8 *) base64_buffer;
serialized_messages[i].data_size = (out_buffer - base64_buffer) + 1;
n_vectors++;
} else {
str = string->str;
len = string->len;
g_string_free (string, FALSE);
n_vectors++;
if (serialized_messages[i].body_data) {
n_vectors++;
} else if (serialized_messages[i].body_buffer) {
n_vectors += gst_buffer_n_memory (serialized_messages[i].body_buffer);
n_memories += gst_buffer_n_memory (serialized_messages[i].body_buffer);
}
}
}
vectors = g_newa (GOutputVector, n_vectors);
map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
/* write request */
res = gst_rtsp_connection_write (conn, (guint8 *) str, len, timeout);
for (i = 0, j = 0, k = 0; i < n_messages; i++) {
vectors[j].buffer = serialized_messages[i].data_is_data_header ?
serialized_messages[i].data_header : serialized_messages[i].data;
vectors[j].size = serialized_messages[i].data_size;
bytes_to_write += vectors[j].size;
j++;
g_free (str);
if (serialized_messages[i].body_data) {
vectors[j].buffer = serialized_messages[i].body_data;
vectors[j].size = serialized_messages[i].body_data_size;
bytes_to_write += vectors[j].size;
j++;
} else if (serialized_messages[i].body_buffer) {
gint l, n;
n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
for (l = 0; l < n; l++) {
GstMemory *mem =
gst_buffer_peek_memory (serialized_messages[i].body_buffer, l);
gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
vectors[j].buffer = map_infos[k].data;
vectors[j].size = map_infos[k].size;
bytes_to_write += vectors[j].size;
k++;
j++;
}
}
}
/* write request: this is synchronous */
to = timeout ? GST_TIMEVAL_TO_TIME (*timeout) : 0;
g_socket_set_timeout (conn->write_socket, (to + GST_SECOND - 1) / GST_SECOND);
res =
writev_bytes (conn->output_stream, vectors, n_vectors, &bytes_written,
TRUE, conn->cancellable);
g_socket_set_timeout (conn->write_socket, 0);
g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
/* free everything */
for (i = 0, k = 0; i < n_messages; i++) {
if (serialized_messages[i].body_buffer) {
gint l, n;
n = gst_buffer_n_memory (serialized_messages[i].body_buffer);
for (l = 0; l < n; l++) {
GstMemory *mem =
gst_buffer_peek_memory (serialized_messages[i].body_buffer, l);
gst_memory_unmap (mem, &map_infos[k]);
k++;
}
}
g_free (serialized_messages[i].data);
}
return res;
no_message:
{
for (i = 0; i < n_messages; i++) {
gst_rtsp_serialized_message_clear (&serialized_messages[i]);
}
g_warning ("Wrong message");
return GST_RTSP_EINVAL;
}
......@@ -3136,13 +3454,6 @@ gst_rtsp_connection_get_remember_session_id (GstRTSPConnection * conn)
#define WRITE_ERR (G_IO_HUP | G_IO_ERR | G_IO_NVAL)
#define WRITE_COND (G_IO_OUT | WRITE_ERR)
typedef struct
{
guint8 *data;
guint size;
guint id;
} GstRTSPRec;
/* async functions */
struct _GstRTSPWatch
{
......@@ -3164,10 +3475,8 @@ struct _GstRTSPWatch
GMutex mutex;
GstQueueArray *messages;
gsize messages_bytes;
guint8 *write_data;
guint write_off;
guint write_size;
guint write_id;
guint messages_count;
gsize max_bytes;
guint max_messages;
GCond queue_not_full;
......@@ -3180,7 +3489,7 @@ struct _GstRTSPWatch
};
#define IS_BACKLOG_FULL(w) (((w)->max_bytes != 0 && (w)->messages_bytes >= (w)->max_bytes) || \
((w)->max_messages != 0 && gst_queue_array_get_length((w)->messages) >= (w)->max_messages))
((w)->max_messages != 0 && (w)->messages_count >= (w)->max_messages))
static gboolean
gst_rtsp_source_prepare (GSource * source, gint * timeout)
......@@ -3426,17 +3735,29 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
GstRTSPConnection *conn = watch->conn;
/* if this connection was already closed, stop now */
if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream)
if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
!watch->messages)
goto eof;
g_mutex_lock (&watch->mutex);
do {
if (watch->write_data == NULL) {
GstRTSPRec *rec_ptr, rec;
guint n_messages = gst_queue_array_get_length (watch->messages);
GOutputVector *vectors;
GstMapInfo *map_infos;
guint *ids;
gsize bytes_to_write, bytes_written;
guint n_vectors, n_memories, n_ids, drop_messages;
gint i, j, k, l;
GstRTSPSerializedMessage *msg;
/* get a new message from the queue */
rec_ptr = gst_queue_array_pop_head_struct (watch->messages);
if (rec_ptr == NULL) {
/* if this connection was already closed, stop now */
if (G_POLLABLE_OUTPUT_STREAM (conn->output_stream) != stream ||
!watch->messages) {
g_mutex_unlock (&watch->mutex);
goto eof;
}
if (n_messages == 0) {
if (watch->writesrc) {
if (!g_source_is_destroyed ((GSource *) watch))
g_source_remove_child_source ((GSource *) watch, watch->writesrc);
......@@ -3463,34 +3784,202 @@ gst_rtsp_source_dispatch_write (GPollableOutputStream * stream,
break;
}
rec = *rec_ptr;
watch->messages_bytes -= rec.size;
for (i = 0, n_vectors = 0, n_memories = 0, n_ids = 0; i < n_messages; i++) {
msg = gst_queue_array_peek_nth_struct (watch->messages, i);
if (msg->id != 0)
n_ids++;
if (msg->data_offset < msg->data_size)
n_vectors++;
if (msg->body_data && msg->body_offset < msg->body_data_size) {
n_vectors++;
} else if (msg->body_buffer) {
guint m, n;
guint offset = 0;
n = gst_buffer_n_memory (msg->body_buffer);
for (m = 0; m < n; m++) {
GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m);
/* Skip all memories we already wrote */
if (offset + mem->size < msg->body_offset) {
offset += mem->size;
continue;
}
offset += mem->size;
watch->write_off = 0;
watch->write_data = rec.data;
watch->write_size = rec.size;
watch->write_id = rec.id;
n_memories++;
n_vectors++;
}
}
}
res = write_bytes (conn->output_stream, watch->write_data,
&watch->write_off, watch->write_size, FALSE, conn->cancellable);
vectors = g_newa (GOutputVector, n_vectors);
map_infos = n_memories ? g_newa (GstMapInfo, n_memories) : NULL;
ids = n_ids ? g_newa (guint, n_ids + 1) : NULL;
if (ids)
memset (ids, 0, sizeof (guint) * (n_ids + 1));
for (i = 0, j = 0, k = 0, l = 0, bytes_to_write = 0; i < n_messages; i++) {
msg = gst_queue_array_peek_nth_struct (watch->messages, i);
if (msg->data_offset < msg->data_size) {
vectors[j].buffer = (msg->data_is_data_header ?
msg->data_header : msg->data) + msg->data_offset;
vectors[j].size = msg->data_size - msg->data_offset;
bytes_to_write += vectors[j].size;
j++;
}
if (msg->body_data) {
if (msg->body_offset < msg->body_data_size) {
vectors[j].buffer = msg->body_data + msg->body_offset;
vectors[j].size = msg->body_data_size - msg->body_offset;
bytes_to_write += vectors[j].size;
j++;
}
} else if (msg->body_buffer) {
guint m, n;
guint offset = 0;
n = gst_buffer_n_memory (msg->body_buffer);
for (m = 0; m < n; m++) {
GstMemory *mem = gst_buffer_peek_memory (msg->body_buffer, m);
guint off;
/* Skip all memories we already wrote */
if (offset + mem->size < msg->body_offset) {
offset += mem->size;
continue;
}
if (offset < msg->body_offset) {
off = msg->body_offset - offset;
} else {
offset += mem->size;
off = 0;
}
g_assert (off < mem->size);
gst_memory_map (mem, &map_infos[k], GST_MAP_READ);
vectors[j].buffer = map_infos[k].data + off;
vectors[j].size = map_infos[k].size - off;
bytes_to_write += vectors[j].size;
k++;
j++;
}
}
}
res =
writev_bytes (watch->conn->output_stream, vectors, n_vectors,
&bytes_written, FALSE, watch->conn->cancellable);
g_assert (bytes_written == bytes_to_write || res != GST_RTSP_OK);
/* First unmap all memories here, this simplifies the code below
* as we don't have to skip all memories that were already written
* before */
for (k = 0; k < n_memories; k++) {
gst_memory_unmap (map_infos[k].memory, &map_infos[k]);
}
if (bytes_written == bytes_to_write) {
/* fast path, just unmap all memories, free memory, drop all messages and notify them */
l = 0;
while ((msg = gst_queue_array_pop_head_struct (watch->messages))) {
if (msg->id) {
ids[l] = msg->id;
l++;
}
gst_rtsp_serialized_message_clear (msg);
}
g_assert (watch->messages_bytes >= bytes_written);
watch->messages_bytes -= bytes_written;
} else if (bytes_written > 0) {
/* not done, let's skip all messages that were sent already and free them */
for (i = 0, k = 0, drop_messages = 0; i < n_messages; i++) {
msg = gst_queue_array_peek_nth_struct (watch->messages, i);