Commit f1ef8540 authored by Antonio Ospite's avatar Antonio Ospite

rtpsession: add support for buffer lists on the recv path

The send path in rtpsession processes the buffer list along the way,
sharing info and stats between packets in the same list, because it
assumes that all packets in a buffer list are from the same frame.

However, in the receiving path packets can arrive in all sorts of
arrangements:

  - different sources,
  - different frames (different timestamps),
  - different types (multiplexed RTP and RTCP, invalid RTP packets).

so a more general approach should be used to correctly support buffer
lists in the receive path.

It turns out that it's simpler and more robust to process buffers
individually inside the rtpsession element even if they come in a buffer
list, and then reassemble a new buffer list when pushing the buffers
downstream.

This avoids complicating the existing code to make all functions
buffer-list-aware with the risk of introducing regressions,

To support buffer lists in the receive path and reduce the "push
overhead" in the pipeline, a new private field named processed_list
added to GstRtpSessionPrivate, it is set in the chain_list handler and
used in the process_rtp callback; this is to achieve this following
effect:

  - iterate over the incoming buffer list;
  - process the packets one by one;
  - add the valid ones to a new buffer list;
  - push the new buffer list downstream.

The processed_list field is reset before pushing a buffer list to be on
the safe side in case a single buffer was to be pushed by upstream
at some later point.

NOTE:

The proposed modifications do not change the behavior of the send path.

The process_rtp callback is called in rtpsource.c by the push_rtp
callback (via source_push_rtp) only when the source is not internal.

So even though push_rtp is also called in the send path, it won't end up
using process_rtp in this case because the source would be internal in
the send path.

The reasoning from above may suggest a future refactoring: push_rtp
might be split to better differentiate the send and receive path.
parent 85894216
......@@ -282,6 +282,10 @@ struct _GstRtpSessionPrivate
gboolean rtcp_sync_send_time;
guint rtx_count;
/* The list of processed packets in the receive path when upstream pushed
* a buffer list. */
GstBufferList *processed_list;
};
/* callbacks to handle actions from the session manager */
......@@ -1300,8 +1304,7 @@ gst_rtp_session_clear_pt_map (GstRtpSession * rtpsession)
g_hash_table_foreach_remove (rtpsession->priv->ptmap, return_true, NULL);
}
/* called when the session manager has an RTP packet or a list of packets
* ready for further processing */
/* called when the session manager has an RTP packet ready to be pushed */
static GstFlowReturn
gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
GstBuffer * buffer, gpointer user_data)
......@@ -1318,8 +1321,14 @@ gst_rtp_session_process_rtp (RTPSession * sess, RTPSource * src,
GST_RTP_SESSION_UNLOCK (rtpsession);
if (rtp_src) {
GST_LOG_OBJECT (rtpsession, "pushing received RTP packet");
result = gst_pad_push (rtp_src, buffer);
if (rtpsession->priv->processed_list) {
GST_LOG_OBJECT (rtpsession, "queueing received RTP packet");
gst_buffer_list_add (rtpsession->priv->processed_list, buffer);
result = GST_FLOW_OK;
} else {
GST_LOG_OBJECT (rtpsession, "pushing received RTP packet");
result = gst_pad_push (rtp_src, buffer);
}
gst_object_unref (rtp_src);
} else {
GST_DEBUG_OBJECT (rtpsession, "dropping received RTP packet");
......@@ -1958,6 +1967,60 @@ push_error:
}
}
static gboolean
process_received_buffer_in_list (GstBuffer ** buffer, guint idx, gpointer data)
{
gint ret;
ret = gst_rtp_session_chain_recv_rtp (NULL, data, *buffer);
if (ret != GST_FLOW_OK)
GST_ERROR ("Processing individual buffer in a list failed");
/*
* The buffer has been processed, remove it from the original list, if it was
* a valid RTP buffer it has been added to the "processed" list in
* gst_rtp_session_process_rtp().
*/
*buffer = NULL;
return TRUE;
}
static GstFlowReturn
gst_rtp_session_chain_recv_rtp_list (GstPad * pad, GstObject * parent,
GstBufferList * list)
{
GstRtpSession *rtpsession = GST_RTP_SESSION (parent);
GstBufferList *processed_list;
processed_list = gst_buffer_list_new ();
/* Set some private data to detect that a buffer list is being pushed. */
rtpsession->priv->processed_list = processed_list;
/*
* Individually process the buffers from the incoming buffer list as the
* incoming RTP packets in the list can be mixed in all sorts of ways:
* - different frames,
* - different sources,
* - different types (RTP or RTCP)
*/
gst_buffer_list_foreach (list,
(GstBufferListFunc) process_received_buffer_in_list, parent);
gst_buffer_list_unref (list);
/* Clean up private data in case the next push does not use a buffer list. */
rtpsession->priv->processed_list = NULL;
if (gst_buffer_list_length (processed_list) == 0 || !rtpsession->recv_rtp_src) {
gst_buffer_list_unref (processed_list);
return GST_FLOW_OK;
}
GST_LOG_OBJECT (rtpsession, "pushing received RTP list");
return gst_pad_push_list (rtpsession->recv_rtp_src, processed_list);
}
static gboolean
gst_rtp_session_event_recv_rtcp_sink (GstPad * pad, GstObject * parent,
GstEvent * event)
......@@ -2359,6 +2422,8 @@ create_recv_rtp_sink (GstRtpSession * rtpsession)
"recv_rtp_sink");
gst_pad_set_chain_function (rtpsession->recv_rtp_sink,
gst_rtp_session_chain_recv_rtp);
gst_pad_set_chain_list_function (rtpsession->recv_rtp_sink,
gst_rtp_session_chain_recv_rtp_list);
gst_pad_set_event_function (rtpsession->recv_rtp_sink,
gst_rtp_session_event_recv_rtp_sink);
gst_pad_set_iterate_internal_links_function (rtpsession->recv_rtp_sink,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment