Commit d66aa872 authored by Sebastian Dröge's avatar Sebastian Dröge 🍵 Committed by GStreamer Merge Bot

dtls: Handle errors/close_notify at all steps and propagate through the layers properly

Previously we simply logged errors but never reported them to elements
or even to the user. Fatal errors are now properly reported.

Additionally proper connection closing is implemented based on EOS:
- dtlsenc: EOS will cause close_notify to be sent to the peer and only
           if the peer also sent back close_notify we will forward the
           EOS event.
- dtlsdec: EOS will be forwarded normally, this only means that the
           unterlying transport was closed. On receiving a DTLS packet
           containing close_notify, return EOS and send EOS downstream.
parent a132138f
This diff is collapsed.
......@@ -26,7 +26,7 @@
#ifndef gstdtlsconnection_h
#define gstdtlsconnection_h
#include <glib-object.h>
#include <gst/gst.h>
G_BEGIN_DECLS
......@@ -84,7 +84,7 @@ struct _GstDtlsConnectionClass {
GType gst_dtls_connection_get_type(void) G_GNUC_CONST;
void gst_dtls_connection_start(GstDtlsConnection *, gboolean is_client);
gboolean gst_dtls_connection_start(GstDtlsConnection *, gboolean is_client, GError **err);
void gst_dtls_connection_check_timeout(GstDtlsConnection *);
/*
......@@ -108,15 +108,30 @@ void gst_dtls_connection_set_send_callback(GstDtlsConnection *, GstDtlsConnectio
/*
* Processes data that has been received, the transformation is done in-place.
* Returns the length of the plaintext data that was decoded, if no data is available, 0<= will be returned.
*
* Returns:
* - GST_FLOW_EOS if the receive side of the DTLS connection was closed by
* the peer, i.e. close_notify was sent by the peer
* - GST_FLOW_ERROR + err if an error happened
* - GST_FLOW_OK + written >= 0 if processing was successful. ptr then
* contains the decoded bytes
*/
gint gst_dtls_connection_process(GstDtlsConnection *, gpointer ptr, gint len);
GstFlowReturn gst_dtls_connection_process(GstDtlsConnection *, gpointer ptr, gsize len, gsize *written, GError **err);
/*
* If the DTLS handshake is completed this function will encode the given data.
* Returns the length of the data sent, or 0 if the DTLS handshake is not completed.
* Will encode and send the given data.
*
* Sending with len == 0 will close the send side of the DTLS connection and
* no further data can be sent anymore in the future. This will also send the
* close_notify to the peer.
*
* Returns:
* - GST_FLOW_EOS if the send side of the DTLS connection was closed, i.e.
* we received an EOS before.
* - GST_FLOW_ERROR + err if an error happened
* - GST_FLOW_OK + written >= 0 if processing was successful
*/
gint gst_dtls_connection_send(GstDtlsConnection *, gpointer ptr, gint len);
GstFlowReturn gst_dtls_connection_send(GstDtlsConnection *, gconstpointer ptr, gsize len, gsize *written, GError **err);
G_END_DECLS
......
......@@ -452,62 +452,105 @@ on_peer_certificate_received (GstDtlsConnection * connection, gchar * pem,
return TRUE;
}
static gint
static GstFlowReturn
process_buffer (GstDtlsDec * self, GstBuffer * buffer)
{
GstFlowReturn flow_ret;
GstMapInfo map_info;
gint size;
GError *err = NULL;
gsize written = 0;
if (!gst_buffer_map (buffer, &map_info, GST_MAP_READWRITE))
return 0;
return GST_FLOW_ERROR;
if (!map_info.size) {
gst_buffer_unmap (buffer, &map_info);
return 0;
return GST_FLOW_ERROR;
}
size =
flow_ret =
gst_dtls_connection_process (self->connection, map_info.data,
map_info.size);
map_info.size, &written, &err);
gst_buffer_unmap (buffer, &map_info);
if (size <= 0)
return size;
gst_buffer_set_size (buffer, size);
switch (flow_ret) {
case GST_FLOW_OK:
GST_LOG_OBJECT (self,
"Decoded buffer of size %" G_GSIZE_FORMAT " B to %" G_GSIZE_FORMAT,
map_info.size, written);
gst_buffer_set_size (buffer, written);
break;
case GST_FLOW_EOS:
gst_buffer_set_size (buffer, written);
GST_DEBUG_OBJECT (self, "Peer closed the connection");
break;
case GST_FLOW_ERROR:
GST_ERROR_OBJECT (self, "Error processing buffer: %s", err->message);
GST_ELEMENT_ERROR (self, RESOURCE, READ, (NULL), ("%s", err->message));
g_clear_error (&err);
break;
default:
g_assert_not_reached ();
}
g_assert (err == NULL);
return size;
return flow_ret;
}
typedef struct
{
GstDtlsDec *self;
GstFlowReturn flow_ret;
guint processed;
} ProcessListData;
static gboolean
process_buffer_from_list (GstBuffer ** buffer, guint idx, gpointer user_data)
{
GstDtlsDec *self = GST_DTLS_DEC (user_data);
gint size;
ProcessListData *process_list_data = user_data;
GstDtlsDec *self = GST_DTLS_DEC (process_list_data->self);
GstFlowReturn flow_ret;
*buffer = gst_buffer_make_writable (*buffer);
size = process_buffer (self, *buffer);
if (size <= 0)
flow_ret = process_buffer (self, *buffer);
process_list_data->flow_ret = flow_ret;
if (gst_buffer_get_size (*buffer) == 0)
gst_buffer_replace (buffer, NULL);
else if (flow_ret != GST_FLOW_ERROR)
process_list_data->processed++;
return TRUE;
return flow_ret == GST_FLOW_OK;
}
static GstFlowReturn
sink_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
{
GstDtlsDec *self = GST_DTLS_DEC (parent);
GstFlowReturn ret = GST_FLOW_OK;
GstPad *other_pad;
ProcessListData process_list_data = { self, GST_FLOW_OK, 0 };
list = gst_buffer_list_make_writable (list);
gst_buffer_list_foreach (list, process_buffer_from_list, self);
gst_buffer_list_foreach (list, process_buffer_from_list, &process_list_data);
/* If we successfully processed at least some buffers then forward those */
if (process_list_data.flow_ret != GST_FLOW_OK
&& process_list_data.processed == 0) {
GST_ERROR_OBJECT (self, "Failed to process buffer list: %s",
gst_flow_get_name (process_list_data.flow_ret));
gst_buffer_list_unref (list);
return process_list_data.flow_ret;
}
/* Remove all buffers after the first one that failed to be processed */
gst_buffer_list_remove (list, process_list_data.processed,
gst_buffer_list_length (list) - process_list_data.processed);
if (gst_buffer_list_length (list) == 0) {
GST_DEBUG_OBJECT (self, "Not produced any buffers");
gst_buffer_list_unref (list);
return GST_FLOW_OK;
return process_list_data.flow_ret;
}
g_mutex_lock (&self->src_mutex);
......@@ -517,17 +560,25 @@ sink_chain_list (GstPad * pad, GstObject * parent, GstBufferList * list)
g_mutex_unlock (&self->src_mutex);
if (other_pad) {
GST_LOG_OBJECT (self, "decoded buffer list with length %u, pushing",
gboolean was_eos = process_list_data.flow_ret == GST_FLOW_EOS;
GST_LOG_OBJECT (self, "pushing buffer list with length %u",
gst_buffer_list_length (list));
ret = gst_pad_push_list (other_pad, list);
process_list_data.flow_ret = gst_pad_push_list (other_pad, list);
/* If the peer closed the connection, signal that we're done here now */
if (was_eos)
gst_pad_push_event (other_pad, gst_event_new_eos ());
gst_object_unref (other_pad);
} else {
GST_LOG_OBJECT (self, "dropped buffer list with length %d, not linked",
GST_LOG_OBJECT (self,
"dropping buffer list with length %d, have no source pad",
gst_buffer_list_length (list));
gst_buffer_list_unref (list);
}
return ret;
return process_list_data.flow_ret;
}
static GstFlowReturn
......@@ -535,7 +586,6 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstDtlsDec *self = GST_DTLS_DEC (parent);
GstFlowReturn ret = GST_FLOW_OK;
gint size;
GstPad *other_pad;
if (!self->agent) {
......@@ -548,12 +598,12 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
self->connection_id, gst_buffer_get_size (buffer));
buffer = gst_buffer_make_writable (buffer);
size = process_buffer (self, buffer);
if (size <= 0) {
ret = process_buffer (self, buffer);
if (ret == GST_FLOW_ERROR) {
GST_ERROR_OBJECT (self, "Failed to process buffer: %s",
gst_flow_get_name (ret));
gst_buffer_unref (buffer);
return GST_FLOW_OK;
return ret;
}
g_mutex_lock (&self->src_mutex);
......@@ -563,11 +613,25 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
g_mutex_unlock (&self->src_mutex);
if (other_pad) {
GST_LOG_OBJECT (self, "decoded buffer with length %d, pushing", size);
ret = gst_pad_push (other_pad, buffer);
gboolean was_eos = (ret == GST_FLOW_EOS);
if (gst_buffer_get_size (buffer) > 0) {
GST_LOG_OBJECT (self, "pushing buffer");
ret = gst_pad_push (other_pad, buffer);
} else {
gst_buffer_unref (buffer);
}
/* If the peer closed the connection, signal that we're done here now */
if (was_eos) {
gst_pad_push_event (other_pad, gst_event_new_eos ());
if (ret == GST_FLOW_OK)
ret = GST_FLOW_EOS;
}
gst_object_unref (other_pad);
} else {
GST_LOG_OBJECT (self, "dropped buffer with length %d, not linked", size);
GST_LOG_OBJECT (self, "dropping buffer, have no source pad");
gst_buffer_unref (buffer);
}
......
......@@ -101,7 +101,7 @@ static gboolean sink_event (GstPad * pad, GstObject * parent, GstEvent * event);
static void on_key_received (GstDtlsConnection *, gpointer key, guint cipher,
guint auth, GstDtlsEnc *);
static gboolean on_send_data (GstDtlsConnection *, gconstpointer data,
gint length, GstDtlsEnc *);
gsize length, GstDtlsEnc *);
static void
gst_dtls_enc_class_init (GstDtlsEncClass * klass)
......@@ -326,10 +326,17 @@ gst_dtls_enc_change_state (GstElement * element, GstStateChange transition)
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
switch (transition) {
case GST_STATE_CHANGE_READY_TO_PAUSED:
case GST_STATE_CHANGE_READY_TO_PAUSED:{
GError *err = NULL;
GST_DEBUG_OBJECT (self, "starting connection %s", self->connection_id);
gst_dtls_connection_start (self->connection, self->is_client);
if (!gst_dtls_connection_start (self->connection, self->is_client, &err)) {
GST_ELEMENT_ERROR (self, RESOURCE, OPEN_WRITE, (NULL), ("%s",
err->message));
g_clear_error (&err);
}
break;
}
default:
break;
}
......@@ -460,17 +467,25 @@ src_task_loop (GstPad * pad)
GST_TRACE_OBJECT (self, "src loop: releasing lock");
ret = gst_pad_push (self->src, buffer);
if (check_connection_timeout)
gst_dtls_connection_check_timeout (self->connection);
if (buffer) {
ret = gst_pad_push (self->src, buffer);
if (check_connection_timeout)
gst_dtls_connection_check_timeout (self->connection);
if (G_UNLIKELY (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s",
gst_flow_get_name (ret));
if (G_UNLIKELY (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
GST_WARNING_OBJECT (self, "failed to push buffer on src pad: %s",
gst_flow_get_name (ret));
}
g_mutex_lock (&self->queue_lock);
self->src_ret = ret;
g_mutex_unlock (&self->queue_lock);
} else {
GST_DEBUG_OBJECT (self, "Peer and us closed the connection, sending EOS");
gst_pad_push_event (self->src, gst_event_new_eos ());
g_mutex_lock (&self->queue_lock);
self->src_ret = GST_FLOW_EOS;
g_mutex_unlock (&self->queue_lock);
}
g_mutex_lock (&self->queue_lock);
self->src_ret = ret;
g_mutex_unlock (&self->queue_lock);
}
static GstFlowReturn
......@@ -478,7 +493,9 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstDtlsEnc *self = GST_DTLS_ENC (parent);
GstMapInfo map_info;
gint ret;
GError *err = NULL;
gsize to_write, written = 0;
GstFlowReturn ret = GST_FLOW_OK;
g_mutex_lock (&self->queue_lock);
if (self->src_ret != GST_FLOW_OK) {
......@@ -495,28 +512,48 @@ sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
gst_buffer_map (buffer, &map_info, GST_MAP_READ);
if (map_info.size) {
to_write = map_info.size;
while (to_write > 0 && ret == GST_FLOW_OK) {
ret =
gst_dtls_connection_send (self->connection, map_info.data,
map_info.size);
if (ret != map_info.size) {
GST_WARNING_OBJECT (self,
"error sending data: %d B were written, expected value was %"
G_GSIZE_FORMAT " B", ret, map_info.size);
map_info.size, &written, &err);
switch (ret) {
case GST_FLOW_OK:
GST_DEBUG_OBJECT (self,
"Wrote %" G_GSIZE_FORMAT " B of %" G_GSIZE_FORMAT " B", written,
map_info.size);
g_assert (written <= to_write);
to_write -= written;
break;
case GST_FLOW_EOS:
GST_INFO_OBJECT (self, "Received data after the connection was closed");
break;
case GST_FLOW_ERROR:
GST_WARNING_OBJECT (self, "error sending data: %s", err->message);
GST_ELEMENT_ERROR (self, RESOURCE, WRITE, (NULL), ("%s", err->message));
g_clear_error (&err);
break;
default:
g_assert_not_reached ();
break;
}
g_assert (err == NULL);
}
gst_buffer_unmap (buffer, &map_info);
gst_buffer_unref (buffer);
return GST_FLOW_OK;
return ret;
}
static gboolean
sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
{
GstDtlsEnc *self = GST_DTLS_ENC (parent);
gboolean ret = FALSE;
switch (GST_EVENT_TYPE (event)) {
......@@ -528,6 +565,28 @@ sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
gst_event_unref (event);
ret = TRUE;
break;
case GST_EVENT_EOS:{
GstFlowReturn flow_ret;
/* Close the write side of the connection now */
flow_ret =
gst_dtls_connection_send (self->connection, NULL, 0, NULL, NULL);
if (flow_ret != GST_FLOW_OK)
GST_ERROR_OBJECT (self, "Failed to send close_notify");
/* Do not forward the EOS event unless the peer already closed to the
* connection itself. If it didn't yet then we'll later get the send
* callback called with no data and send EOS from there */
if (flow_ret == GST_FLOW_EOS) {
ret = gst_pad_event_default (pad, parent, event);
} else {
gst_event_unref (event);
ret = TRUE;
}
break;
}
default:
ret = gst_pad_event_default (pad, parent, event);
break;
......@@ -567,16 +626,17 @@ on_key_received (GstDtlsConnection * connection, gpointer key, guint cipher,
}
static gboolean
on_send_data (GstDtlsConnection * connection, gconstpointer data, gint length,
on_send_data (GstDtlsConnection * connection, gconstpointer data, gsize length,
GstDtlsEnc * self)
{
GstBuffer *buffer;
gboolean ret;
GST_DEBUG_OBJECT (self, "sending data from %s with length %d",
GST_DEBUG_OBJECT (self, "sending data from %s with length %" G_GSIZE_FORMAT,
self->connection_id, length);
buffer = gst_buffer_new_wrapped (g_memdup (data, length), length);
buffer =
data ? gst_buffer_new_wrapped (g_memdup (data, length), length) : NULL;
GST_TRACE_OBJECT (self, "send data: acquiring lock");
g_mutex_lock (&self->queue_lock);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment