Commits (1)
......@@ -556,6 +556,7 @@ dependencies = [
"anyhow",
"futures",
"gstreamer",
"gstreamer-net",
]
[[package]]
......@@ -565,6 +566,7 @@ dependencies = [
"anyhow",
"futures",
"gstreamer",
"gstreamer-net",
"gstreamer-rtp",
"gstreamer-rtsp",
"gstreamer-rtsp-server",
......
......@@ -7,7 +7,7 @@ Please read the blog post for details.
The code below needs GStreamer from the git `main` branch at the time of
writing, or version 1.22 or newer.
## Rapid synchronization via RTP header extensions
## Inter-device network synchronization via NTP
This branch contains the code that retrieves the sender clock times on the
receiver and renders them over the video frames on the receiver and also
......@@ -15,6 +15,12 @@ prints them on stdout. The times are synchronized between the sender and
receiver via an RTP header extension from the very beginning instead of
relying on RTCP.
In addition the sender and receiver are synchronized to the same NTP clock and
the receiver is configured to do inter-device synchronization based on this.
If multiple receivers are running next to each other they will all play back
the stream completely synchronized, and in this configuration with exactly 2s
latency compared to when the frames were captured.
Both timestamps on the video frames should be the same: the one rendered on
the sender at the top and the one rendered on the receiver at the bottom.
......
......@@ -8,3 +8,4 @@ license = "MIT"
anyhow = "1"
futures = "0.3"
gst = { version = "0.18", package = "gstreamer", features = ["v1_20"] }
gst-net = { version = "0.18", package = "gstreamer-net", features = ["v1_20"] }
......@@ -18,6 +18,14 @@ fn main() -> Result<(), Error> {
_ => bail!("No RTSP URI provided"),
};
// Create the NTP clock and wait for synchronization.
let clock = gst_net::NtpClock::new(None, "pool.ntp.org", 123, gst::ClockTime::ZERO);
println!("Syncing to NTP clock");
clock
.wait_for_sync(gst::ClockTime::from_seconds(5))
.context("Syncing NTP clock")?;
println!("Synced to NTP clock");
// Create the playbin element...
let pipeline = gst::ElementFactory::make("playbin", None)
.context("Creating playbin")?
......@@ -27,6 +35,15 @@ fn main() -> Result<(), Error> {
// ... and set the RTSP URI on it.
pipeline.set_property("uri", &uri);
// Make sure it uses the NTP clock for synchronization.
pipeline.use_clock(Some(&clock));
// Configure a static latency of 2s. This needs to be the same on all receivers and higher than
// the sum of the sender latency and the receiver latency of the receiver with the highest
// latency. As this can't be known automatically and depends on many factors this has to be
// known for the overall system and configured accordingly.
pipeline.set_latency(gst::ClockTime::from_seconds(2));
// When the RTSP source is created, configure a latency of 40ms instead
// of the default of 2s on it and also add reference timestamp metadata
// with the sender clock times to each packet if possible.
......@@ -37,6 +54,12 @@ fn main() -> Result<(), Error> {
source.set_property("latency", 40u32);
source.set_property("add-reference-timestamp-meta", true);
// Configure for network synchronization via the RTP NTP timestamps.
// This requires that sender and receiver are synchronized to the same
// clock.
source.set_property_from_str("buffer-mode", "synced");
source.set_property("ntp-sync", true);
// Don't bother updating inter-stream offsets if the difference to the previous
// configuration is less than 1ms. The synchronization will have rounding errors
// in the range of the RTP clock rate, i.e. 1/90000s and 1/48000s in this case.
......
......@@ -9,6 +9,7 @@ anyhow = "1"
futures = "0.3"
once_cell = "1"
gst = { version = "0.18", package = "gstreamer", features = ["v1_20"] }
gst-net = { version = "0.18", package = "gstreamer-net", features = ["v1_20" ] }
gst-rtp = { version = "0.18", package = "gstreamer-rtp", features = ["v1_20" ] }
gst-rtsp = { version = "0.18", package = "gstreamer-rtsp", features = ["v1_20" ] }
gst-rtsp-server = { version = "0.18", package = "gstreamer-rtsp-server", features = ["v1_20"] }
......@@ -10,7 +10,7 @@ use gst_rtsp_server::prelude::*;
use anyhow::{Context, Error};
use once_cell::sync::Lazy;
use once_cell::sync::{Lazy, OnceCell};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
......@@ -20,10 +20,22 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
)
});
// Singleton NTP clock used by the server
static CLOCK: OnceCell<gst_net::NtpClock> = OnceCell::new();
fn main() -> Result<(), Error> {
// Initialize GStreamer.
gst::init()?;
// Create the NTP clock and wait for synchronization.
let clock = gst_net::NtpClock::new(None, "pool.ntp.org", 123, gst::ClockTime::ZERO);
println!("Syncing to NTP clock");
clock
.wait_for_sync(gst::ClockTime::from_seconds(5))
.context("Syncing NTP clock")?;
println!("Synced to NTP clock");
CLOCK.set(clock).expect("Failed to store clock");
// Create an RTSP server with the default settings.
let server = gst_rtsp_server::RTSPServer::new();
......@@ -72,6 +84,8 @@ mod media_factory {
// Let the factory create medias of our custom media type.
factory.set_media_gtype(super::media::Media::static_type());
// Make sure the media factory uses the NTP clock.
factory.set_clock(Some(CLOCK.get().expect("No clock set")));
}
}
......