Commit 831996a3 authored by Mathieu Duponchelle's avatar Mathieu Duponchelle 🐸

rtpmanager: add new element, rtcpmanager

parent 0bd82cbf
Pipeline #9444 passed with stages
in 2 minutes and 15 seconds
......@@ -3491,6 +3491,21 @@ GST_TYPE_RTP_SESSION
gst_rtp_session_get_type
</SECTION>
<SECTION>
<FILE>element-rtcpmanager</FILE>
<TITLE>rtcpmanager</TITLE>
GstRTCPManager
<SUBSECTION Standard>
GstRTCPManagerClass
GST_TYPE_RTCP_MANAGER
GST_RTCP_MANAGER
GST_RTCP_MANAGER_CLASS
GST_IS_RTCP_MANAGER
GST_IS_RTCP_MANAGER_CLASS
<SUBSECTION Private>
gst_rtcp_manager_get_type
</SECTION>
<SECTION>
<FILE>element-rtpssrcdemux</FILE>
<TITLE>rtpssrcdemux</TITLE>
......
......@@ -15,7 +15,8 @@ libgstrtpmanager_la_SOURCES = gstrtpmanager.c \
rtpsource.c \
rtpstats.c \
gstrtpsession.c \
gstrtpfunnel.c
gstrtpfunnel.c \
gstrtcpmanager.c
noinst_HEADERS = gstrtpbin.h \
gstrtpdtmfmux.h \
......@@ -31,8 +32,8 @@ noinst_HEADERS = gstrtpbin.h \
rtpsource.h \
rtpstats.h \
gstrtpsession.h \
gstrtpfunnel.h
gstrtpfunnel.h \
gstrtcpmanager.h
libgstrtpmanager_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_CFLAGS) \
$(GST_NET_CFLAGS) $(WARNING_CFLAGS) $(ERROR_CFLAGS)
......
This diff is collapsed.
/* RTCP manager element for GStreamer
*
* Copyright <2017> Mathieu Duponchelle <mathieu@centricular.com>.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_RTCP_MANAGER_H__
#define __GST_RTCP_MANAGER_H__
#include <gst/gst.h>
G_BEGIN_DECLS
#define GST_TYPE_RTCP_MANAGER (gst_rtcp_manager_get_type())
#define GST_RTCP_MANAGER(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_RTCP_MANAGER, GstRTCPManager))
#define GST_RTCP_MANAGER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_RTCP_MANAGER, GstRTCPManager))
#define GST_IS_RTCP_MANAGER(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_RTCP_MANAGER))
#define GST_IS_RTCP_MANAGER_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_RTCP_MANAGER))
typedef struct _GstRTCPManager GstRTCPManager;
typedef struct _GstRTCPManagerClass GstRTCPManagerClass;
/**
* GstRTCPManager:
*
* The opaque #GstRTCPManager structure.
*/
struct _GstRTCPManager
{
GstElement mux;
GstPad *sinkpad;
GRecMutex lock; /* Protects all the fields below */
GstClock *sysclock; /* Cached system clock */
GSequence *timeouts; /* Sorted list of sessions */
GstClockID id; /* Clock shot ID */
GHashTable *sessions; /* RTPSession -> GSequenceIter mapping */
GHashTable *sender_ssrcs; /* SSRC -> RTPSession mapping */
GHashTable *receiver_ssrcs; /* SSRC -> RTPSession mapping */
/* RTCP dispatching task */
GstTask *task;
GRecMutex task_lock;
};
struct _GstRTCPManagerClass
{
GstElementClass parent_class;
void (*add_session) (GstRTCPManager *mgr, GObject *session);
void (*remove_session) (GstRTCPManager *mgr, GObject *session);
};
GType gst_rtcp_manager_get_type (void);
gboolean gst_rtcp_manager_plugin_init (GstPlugin * plugin);
G_END_DECLS
#endif /* __GST_RTCP_MANAGER_H__ */
......@@ -32,6 +32,7 @@
#include "gstrtpdtmfmux.h"
#include "gstrtpmux.h"
#include "gstrtpfunnel.h"
#include "gstrtcpmanager.h"
static gboolean
plugin_init (GstPlugin * plugin)
......@@ -74,6 +75,9 @@ plugin_init (GstPlugin * plugin)
GST_TYPE_RTP_FUNNEL))
return FALSE;
if (!gst_rtcp_manager_plugin_init (plugin))
return FALSE;
return TRUE;
}
......
......@@ -15,6 +15,7 @@ rtpmanager_sources = [
'rtpstats.c',
'gstrtpsession.c',
'gstrtpfunnel.c',
'gstrtcpmanager.c',
]
gstrtpmanager = library('gstrtpmanager',
......
noinst_PROGRAMS = server-alsasrc-PCMA client-PCMA \
client-rtpaux server-rtpaux client-rtpbundle server-rtpbundle
client-rtpaux server-rtpaux client-rtpbundle server-rtpbundle \
manage-rtcp
# FIXME 0.11: ignore GValueArray warnings for now until this is sorted
ERROR_CFLAGS=
......@@ -28,6 +29,10 @@ client_PCMA_SOURCES = client-PCMA.c
client_PCMA_CFLAGS = $(GST_CFLAGS)
client_PCMA_LDADD = $(GST_LIBS) $(LIBM)
manage_rtcp_SOURCES = manage-rtcp.c
manage_rtcp_CFLAGS = $(GST_CFLAGS)
manage_rtcp_LDADD = $(GST_LIBS) $(LIBM)
noinst_SCRIPTS=client-H263p-AMR.sh \
client-H263p-PCMA.sh \
client-H264-PCMA.sh \
......
/*
* GStreamer
* Copyright (C) 2018 Mathieu Duponchelle <mathieu@centricular.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#include <gst/gst.h>
static GMainLoop *loop = NULL;
#define MAKE_AND_ADD(var, pipe, name, label, elem_name) \
G_STMT_START { \
if (G_UNLIKELY (!(var = (gst_element_factory_make (name, elem_name))))) { \
GST_ERROR ("Could not create element %s", name); \
goto label; \
} \
if (G_UNLIKELY (!gst_bin_add (GST_BIN_CAST (pipe), var))) { \
GST_ERROR ("Could not add element %s", name); \
gst_object_unref (var); \
goto label; \
} \
} G_STMT_END
static void
manage_rtcp (GstElement * rtcp_manager, GstElement * rtpbin, guint session_idx)
{
GObject *session;
g_signal_emit_by_name (rtpbin, "get-internal-session", session_idx, &session);
g_signal_emit_by_name (rtcp_manager, "add-session", session);
}
static gboolean
plug_rtcp (GstElement * pipe, GstElement * rtpbin, guint16 send_rtcp_port,
guint16 recv_rtcp_port)
{
GstElement *rtcpsink, *rtcpsrc;
gboolean ret = FALSE;
MAKE_AND_ADD (rtcpsrc, pipe, "udpsrc", err, NULL);
MAKE_AND_ADD (rtcpsink, pipe, "udpsink", err, NULL);
g_object_set (rtcpsink, "port", send_rtcp_port, NULL);
g_object_set (rtcpsrc, "port", recv_rtcp_port, "address", "localhost", NULL);
if (!gst_element_link_pads (rtpbin, "send_rtcp_src_0", rtcpsink, "sink")) {
GST_ERROR ("Failed to link rtcpsink");
goto err;
}
if (!gst_element_link_pads (rtcpsrc, "src", rtpbin, "recv_rtcp_sink_0")) {
GST_ERROR ("Failed to link rtcp src");
goto err;
}
ret = TRUE;
done:
return ret;
err:
goto done;
}
static GstElement *
make_sender (guint16 rtp_port, guint16 send_rtcp_port, guint16 recv_rtcp_port,
GstElement * rtcp_manager)
{
GstElement *ret = gst_pipeline_new (NULL);
GstElement *src, *enc, *pay, *queue, *rtpbin, *rtpsink, *rtcpsink;
MAKE_AND_ADD (src, ret, "audiotestsrc", err, NULL);
MAKE_AND_ADD (enc, ret, "alawenc", err, NULL);
MAKE_AND_ADD (pay, ret, "rtppcmapay", err, NULL);
MAKE_AND_ADD (queue, ret, "queue", err, NULL);
MAKE_AND_ADD (rtpbin, ret, "rtpbin", err, NULL);
MAKE_AND_ADD (rtpsink, ret, "udpsink", err, NULL);
if (!(rtcpsink = gst_element_factory_make ("udpsink", NULL))) {
GST_ERROR ("Failed to make udpsink");
goto err;
}
if (!gst_element_link_many (src, enc, pay, queue, NULL)) {
GST_ERROR ("Failed to link media source");
goto err;
}
if (!gst_element_link_pads (queue, "src", rtpbin, "send_rtp_sink_0")) {
GST_ERROR ("Failed to link media source");
goto err;
}
if (!gst_element_link_pads (rtpbin, "send_rtp_src_0", rtpsink, "sink")) {
GST_ERROR ("Failed to link rtp sink");
goto err;
}
g_object_set (rtcpsink, "sync", FALSE, "async", FALSE, NULL);
if (rtcp_manager)
manage_rtcp (rtcp_manager, rtpbin, 0);
else if (!plug_rtcp (ret, rtpbin, send_rtcp_port, recv_rtcp_port))
goto err;
g_object_set (rtpsink, "port", rtp_port, NULL);
done:
return ret;
err:
gst_object_unref (ret);
ret = NULL;
goto done;
}
static void
rtpbin_pad_added_cb (GstElement * rtpbin, GstPad * srcpad, GstElement * sink)
{
GstPad *sinkpad = gst_element_get_static_pad (sink, "sink");
gst_pad_link (srcpad, sinkpad);
gst_object_unref (sinkpad);
}
static GstElement *
make_receiver (guint16 rtp_port, guint16 send_rtcp_port, guint16 recv_rtcp_port,
GstElement * rtcp_manager)
{
GstElement *ret = gst_pipeline_new (NULL);
GstElement *rtpsrc, *filter, *rtpbin, *sink;
GstCaps *caps;
MAKE_AND_ADD (rtpsrc, ret, "udpsrc", err, NULL);
MAKE_AND_ADD (filter, ret, "capsfilter", err, NULL);
MAKE_AND_ADD (rtpbin, ret, "rtpbin", err, NULL);
MAKE_AND_ADD (sink, ret, "fakesink", err, NULL);
if (!gst_element_link (rtpsrc, filter)) {
GST_ERROR ("Failed to link rtpsrc with capsfilter");
goto done;
}
if (!gst_element_link_pads (filter, "src", rtpbin, "recv_rtp_sink_0")) {
GST_ERROR ("Failed to link capsfilter to rtpbin");
goto err;
}
if (rtcp_manager)
manage_rtcp (rtcp_manager, rtpbin, 0);
else if (!plug_rtcp (ret, rtpbin, send_rtcp_port, recv_rtcp_port))
goto err;
g_object_set (rtpsrc, "port", rtp_port, "address", "localhost", NULL);
caps = gst_caps_new_simple ("application/x-rtp",
"media", G_TYPE_STRING, "audio",
"clock-rate", G_TYPE_INT, 8000,
"encoding-name", G_TYPE_STRING, "PCMA", "payload", G_TYPE_INT, 8, NULL);
g_object_set (filter, "caps", caps, NULL);
gst_caps_unref (caps);
g_signal_connect (rtpbin, "pad-added", (GCallback) rtpbin_pad_added_cb, sink);
done:
return ret;
err:
gst_object_unref (ret);
ret = NULL;
goto done;
}
static gboolean
message_handler (GstBus * bus, GstMessage * message, gpointer data)
{
if (message->type == GST_MESSAGE_ERROR) {
GST_ERROR ("Error: %" GST_PTR_FORMAT, message);
if (loop) {
g_main_loop_quit (loop);
g_main_loop_unref (loop);
loop = NULL;
}
}
return TRUE;
}
static void
rtcpmanager_pad_added_cb (GstElement * rtcpmanager, GstPad * srcpad,
GstElement * funnel)
{
GstPad *sinkpad = gst_element_get_request_pad (funnel, "sink_%u");
gst_pad_link (srcpad, sinkpad);
gst_object_unref (sinkpad);
}
static GstElement *
make_rtcp_pipeline (guint16 send_port, guint16 recv_port, GstElement ** manager)
{
GstElement *ret = gst_pipeline_new (NULL);
GstElement *src, *funnel, *sink;
GstBus *bus;
MAKE_AND_ADD (src, ret, "udpsrc", err, NULL);
MAKE_AND_ADD (*manager, ret, "rtcpmanager", err, NULL);
MAKE_AND_ADD (funnel, ret, "funnel", err, NULL);
MAKE_AND_ADD (sink, ret, "udpsink", err, NULL);
g_object_set (src, "address", "localhost", "port", recv_port, NULL);
if (!gst_element_link (src, *manager)) {
GST_ERROR ("Failed to link RTCP source to RTCP manager");
goto err;
}
if (!gst_element_link (funnel, sink)) {
GST_ERROR ("Failed to link RTCP source to RTCP manager");
goto err;
}
bus = gst_element_get_bus (ret);
gst_bus_add_watch (bus, message_handler, NULL);
gst_object_unref (bus);
g_signal_connect (*manager, "pad-added", (GCallback) rtcpmanager_pad_added_cb,
funnel);
g_object_set (sink, "sync", FALSE, "async", FALSE, "port", send_port, NULL);
if (gst_element_set_state (ret,
GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
GST_ERROR ("Failed to set RTCP pipeline to playing");
goto err;
}
done:
return ret;
err:
gst_object_unref (ret);
ret = NULL;
goto done;
}
static void
free_pipeline (GstElement * pipeline)
{
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (pipeline);
}
int
main (int ac, char **av)
{
gint ret = 0;
GstElement *pipe;
GPtrArray *pipelines =
g_ptr_array_new_with_free_func ((GDestroyNotify) free_pipeline);
guint i;
GstElement *send_rtcp_manager = NULL;
GstElement *recv_rtcp_manager = NULL;
GstBus *bus;
guint64 n_pipelines, managed;
gchar *endptr;
gst_init (NULL, NULL);
if (ac != 3) {
GST_ERROR ("Usage: %s N_PIPELINES MANAGE (example: %s 25 1)", av[0], av[0]);
goto err;
}
n_pipelines = g_ascii_strtoull (av[1], &endptr, 10);
if (*endptr != '\0') {
GST_ERROR ("Invalid number of pipelines: %s", av[1]);
goto err;
}
managed = (gboolean) g_ascii_strtoull (av[2], &endptr, 10);
if (*endptr != '\0') {
GST_ERROR ("Invalid managed value: %s", av[2]);
goto err;
}
if (managed) {
if (!(pipe = make_rtcp_pipeline (4098, 4099, &send_rtcp_manager)))
goto err;
g_ptr_array_add (pipelines, pipe);
if (!(pipe = make_rtcp_pipeline (4099, 4098, &recv_rtcp_manager)))
goto err;
g_ptr_array_add (pipelines, pipe);
}
for (i = 0; i < n_pipelines; i++) {
if (!(pipe =
make_receiver (5000 + i * 4, 5003 + i * 4, 5001 + i * 4,
recv_rtcp_manager)))
goto err;
g_ptr_array_add (pipelines, pipe);
bus = gst_element_get_bus (pipe);
gst_bus_add_watch (bus, message_handler, NULL);
gst_object_unref (bus);
if (gst_element_set_state (pipe,
GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
GST_ERROR ("Failed to set receiver to PLAYING");
goto err;
}
if (!(pipe =
make_sender (5000 + i * 4, 5001 + i * 4, 5003 + i * 4,
send_rtcp_manager)))
goto err;
g_ptr_array_add (pipelines, pipe);
bus = gst_element_get_bus (pipe);
gst_bus_add_watch (bus, message_handler, NULL);
gst_object_unref (bus);
if (gst_element_set_state (pipe,
GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
GST_ERROR ("Failed to set sender to PLAYING");
goto err;
}
}
loop = g_main_loop_new (NULL, FALSE);
g_main_loop_run (loop);
g_print ("Program finished successfully, cleaning up!");
done:
g_ptr_array_unref (pipelines);
return ret;
err:
g_printerr
("Program finished with error, run with GST_DEBUG=1 for details\n");
ret = -1;
goto done;
}
......@@ -5,6 +5,7 @@ rtp_progs = [
'server-rtpaux',
'client-rtpbundle',
'server-rtpbundle',
'manage-rtcp',
]
foreach prog : rtp_progs
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment