Commit f9a39b11 authored by Guillaume Desmottes's avatar Guillaume Desmottes 🐐
Browse files

add uriplaylistbin plugin

uriplaylistbin plays a list of URIs sequentially, ensuring gapless transitions
and proper streams synchronization.
parent 97e6a89c
Pipeline #455129 passed with stages
in 24 minutes and 50 seconds
......@@ -16,6 +16,7 @@ members = [
"net/rusoto",
"utils/fallbackswitch",
"utils/togglerecord",
"utils/uriplaylistbin",
"video/cdg",
"video/closedcaption",
"video/videofx",
......@@ -46,6 +47,7 @@ default-members = [
"net/rusoto",
"utils/fallbackswitch",
"utils/togglerecord",
"utils/uriplaylistbin",
"video/cdg",
"video/ffv1",
"video/flavors",
......
......@@ -58,6 +58,7 @@ plugins = {
# https://github.com/qnighy/libwebp-sys2-rs/issues/4
'gst-plugin-webp': 'libgstrswebp',
'gst-plugin-videofx': 'libgstvideofx',
'gst-plugin-uriplaylistbin': 'libgsturiplaylistbin',
}
extra_env = {}
......
[package]
name = "gst-plugin-uriplaylistbin"
version = "0.8.0"
authors = ["Guillaume Desmottes <guillaume.desmottes@onestream.live>"]
repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
license = "MPL-2.0"
edition = "2018"
description = "Playlist Plugin"
[dependencies]
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"] }
once_cell = "1.0"
anyhow = "1"
crossbeam-channel = "0.5"
[dev-dependencies]
gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"]}
structopt = "0.3"
url = "2.2"
more-asserts = "0.2"
[lib]
name = "gsturiplaylistbin"
crate-type = ["cdylib", "rlib"]
path = "src/lib.rs"
[[example]]
name = "playlist"
path = "examples/playlist.rs"
[build-dependencies]
gst-plugin-version-helper = { path="../../version-helper" }
[features]
# GStreamer 1.14 is required for static linking
static = ["gst/v1_14"]
capi = []
[package.metadata.capi]
min_version = "0.8.0"
[package.metadata.capi.header]
enabled = false
[package.metadata.capi.library]
install_subdir = "gstreamer-1.0"
versioning = false
[package.metadata.capi.pkg_config]
requires_private = "gstreamer-1.0, gobject-2.0, glib-2.0, gmodule-2.0"
fn main() {
gst_plugin_version_helper::info()
}
// Copyright (C) 2021 OneStream Live <guillaume.desmottes@onestream.live>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use std::{
collections::HashMap,
path::Path,
sync::{Arc, Mutex},
};
use gst::prelude::*;
use structopt::StructOpt;
#[derive(Debug, StructOpt)]
#[structopt(name = "playlist", about = "An example of uriplaylistbin usage.")]
struct Opt {
#[structopt(default_value = "1", short = "i", long = "iterations")]
iterations: u32,
uris: Vec<String>,
}
fn create_pipeline(uris: Vec<String>, iterations: u32) -> anyhow::Result<gst::Pipeline> {
let pipeline = gst::Pipeline::new(None);
let playlist = gst::ElementFactory::make("uriplaylistbin", None)?;
pipeline.add(&playlist)?;
playlist.set_property("uris", &uris);
playlist.set_property("iterations", &iterations);
let sink_bins = Arc::new(Mutex::new(HashMap::new()));
let sink_bins_clone = sink_bins.clone();
let pipeline_weak = pipeline.downgrade();
playlist.connect_pad_added(move |_playlist, src_pad| {
let pipeline = match pipeline_weak.upgrade() {
None => return,
Some(pipeline) => pipeline,
};
let pad_name = src_pad.name();
let sink = if pad_name.starts_with("audio") {
gst::parse_bin_from_description("audioconvert ! audioresample ! autoaudiosink", true)
.unwrap()
} else if pad_name.starts_with("video") {
gst::parse_bin_from_description("videoconvert ! autovideosink", true).unwrap()
} else {
unimplemented!();
};
pipeline.add(&sink).unwrap();
sink.sync_state_with_parent().unwrap();
let sink_pad = sink.static_pad("sink").unwrap();
src_pad.link(&sink_pad).unwrap();
sink_bins.lock().unwrap().insert(pad_name, sink);
});
let pipeline_weak = pipeline.downgrade();
playlist.connect_pad_removed(move |_playlist, pad| {
let pipeline = match pipeline_weak.upgrade() {
None => return,
Some(pipeline) => pipeline,
};
// remove sink bin that was handling the pad
let sink_bins = sink_bins_clone.lock().unwrap();
let sink = sink_bins.get(&pad.name()).unwrap();
pipeline.remove(sink).unwrap();
let _ = sink.set_state(gst::State::Null);
});
Ok(pipeline)
}
fn main() -> anyhow::Result<()> {
gst::init().unwrap();
gsturiplaylistbin::plugin_register_static().expect("Failed to register uriplaylistbin plugin");
let opt = Opt::from_args();
if opt.uris.is_empty() {
anyhow::bail!("Need at least one URI to play");
}
let uris = opt
.uris
.into_iter()
.map(|uri| {
let p = Path::new(&uri);
match p.canonicalize() {
Ok(p) => format!("file://{}", p.to_str().unwrap().to_string()),
_ => uri,
}
})
.collect();
{
let pipeline = create_pipeline(uris, opt.iterations)?;
pipeline
.set_state(gst::State::Playing)
.expect("Unable to set the pipeline to the `Playing` state");
let bus = pipeline.bus().unwrap();
for msg in bus.iter_timed(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Error(err) => {
eprintln!(
"Error received from element {:?}: {}",
err.src().map(|s| s.path_string()),
err.error()
);
eprintln!("Debugging information: {:?}", err.debug());
break;
}
MessageView::Eos(..) => break,
_ => (),
}
}
pipeline
.set_state(gst::State::Null)
.expect("Unable to set the pipeline to the `Null` state");
}
unsafe {
gst::deinit();
}
Ok(())
}
// Copyright (C) 2021 OneStream Live <guillaume.desmottes@onestream.live>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
mod uriplaylistbin;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
uriplaylistbin::register(plugin)?;
Ok(())
}
gst::plugin_define!(
uriplaylistbin,
env!("CARGO_PKG_DESCRIPTION"),
plugin_init,
concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
"LGPL",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_REPOSITORY"),
env!("BUILD_REL_DATE")
);
This diff is collapsed.
// Copyright (C) 2021 OneStream Live <guillaume.desmottes@onestream.live>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
use gst::prelude::*;
mod imp;
glib::wrapper! {
pub struct UriPlaylistBin(ObjectSubclass<imp::UriPlaylistBin>) @extends gst::Bin, gst::Element, gst::Object;
}
// GStreamer elements need to be thread-safe. For the private implementation this is automatically
// enforced but for the public wrapper type we need to specify this manually.
unsafe impl Send for UriPlaylistBin {}
unsafe impl Sync for UriPlaylistBin {}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"uriplaylistbin",
gst::Rank::None,
UriPlaylistBin::static_type(),
)
}
// Copyright (C) 2021 OneStream Live <guillaume.desmottes@onestream.live>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use std::path::PathBuf;
use gst::prelude::*;
use gst::MessageView;
use more_asserts::assert_ge;
struct TestMedia {
uri: String,
len: gst::ClockTime,
}
fn file_name_to_uri(name: &str) -> String {
let input_path = {
let mut r = PathBuf::new();
r.push(env!("CARGO_MANIFEST_DIR"));
r.push("tests");
r.push(name);
r
};
let url = url::Url::from_file_path(&input_path).unwrap();
url.to_string()
}
impl TestMedia {
fn ogg() -> Self {
Self {
uri: file_name_to_uri("sample.ogg"),
len: gst::ClockTime::from_mseconds(510),
}
}
fn mkv() -> Self {
Self {
uri: file_name_to_uri("sample.mkv"),
len: gst::ClockTime::from_mseconds(510),
}
}
fn missing_file() -> Self {
Self {
uri: "file:///not-there.ogg".to_string(),
len: gst::ClockTime::from_mseconds(10),
}
}
fn missing_http() -> Self {
Self {
uri: "http:///not-there.ogg".to_string(),
len: gst::ClockTime::from_mseconds(10),
}
}
}
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
gsturiplaylistbin::plugin_register_static()
.expect("Failed to register uriplaylistbin plugin");
});
}
fn test(
medias: Vec<TestMedia>,
n_streams: u32,
iterations: u32,
check_streams: bool,
) -> Vec<gst::Message> {
init();
let playlist_len = medias.len() * (iterations as usize);
let pipeline = gst::Pipeline::new(None);
let playlist = gst::ElementFactory::make("uriplaylistbin", None).unwrap();
let mq = gst::ElementFactory::make("multiqueue", None).unwrap();
pipeline.add_many(&[&playlist, &mq]).unwrap();
let total_len = gst::ClockTime::from_nseconds(
medias
.iter()
.map(|t| t.len.nseconds() * (iterations as u64))
.sum(),
);
let uris: Vec<String> = medias.iter().map(|t| t.uri.clone()).collect();
playlist.set_property("uris", &uris);
playlist.set_property("iterations", &iterations);
let mq_clone = mq.clone();
playlist.connect_pad_added(move |_playlist, src_pad| {
let mq_sink = mq_clone.request_pad_simple("sink_%u").unwrap();
src_pad.link(&mq_sink).unwrap();
});
let pipeline_weak = pipeline.downgrade();
mq.connect_pad_added(move |_mq, pad| {
if pad.direction() != gst::PadDirection::Src {
return;
}
let pipeline = match pipeline_weak.upgrade() {
Some(pipeline) => pipeline,
None => return,
};
let sink = gst::ElementFactory::make("fakesink", None).unwrap();
pipeline.add(&sink).unwrap();
sink.sync_state_with_parent().unwrap();
pad.link(&sink.static_pad("sink").unwrap()).unwrap();
});
pipeline.set_state(gst::State::Playing).unwrap();
let bus = pipeline.bus().unwrap();
let mut events = vec![];
loop {
let msg = bus.iter_timed(gst::ClockTime::NONE).next().unwrap();
match msg.view() {
MessageView::Error(_) | MessageView::Eos(..) => {
events.push(msg.clone());
break;
}
// check stream related messages
MessageView::StreamCollection(_) | MessageView::StreamsSelected(_) => {
events.push(msg.clone())
}
_ => {}
}
}
// check we actually played all files and all streams
fn stream_end_ts(sink: &gst::Element) -> gst::ClockTime {
let sample: gst::Sample = sink.property("last-sample");
let buffer = sample.buffer().unwrap();
let pts = buffer.pts().unwrap();
let segment = sample.segment().unwrap();
let segment = segment.downcast_ref::<gst::ClockTime>().unwrap();
let rt = segment.to_running_time(pts).unwrap();
rt + buffer.duration().unwrap()
}
if check_streams {
// check all streams have been fully played
let mut n = 0;
for sink in pipeline.iterate_sinks() {
let sink = sink.unwrap();
assert_ge!(
stream_end_ts(&sink),
total_len,
"{}: {} < {}",
sink.name(),
stream_end_ts(&sink),
total_len
);
n += 1;
}
assert_eq!(n, n_streams);
// check stream-collection and streams-selected message ordering
let mut events = events.clone().into_iter();
for _ in 0..playlist_len {
let decodebin = assert_stream_collection(events.next().unwrap(), n_streams as usize);
assert_eq!(
assert_stream_selected(events.next().unwrap(), n_streams as usize),
decodebin
);
}
}
pipeline.set_state(gst::State::Null).unwrap();
events
}
fn assert_eos(msg: gst::Message) {
assert!(matches!(msg.view(), MessageView::Eos(_)));
}
fn assert_error(msg: gst::Message, failing: TestMedia) {
match msg.view() {
MessageView::Error(err) => {
let details = err.details().unwrap();
assert_eq!(details.get::<&str>("uri").unwrap(), failing.uri);
}
_ => {
panic!("last message is not an error");
}
}
}
fn assert_stream_collection(msg: gst::Message, n_streams: usize) -> gst::Object {
match msg.view() {
MessageView::StreamCollection(sc) => {
let collection = sc.stream_collection();
assert_eq!(collection.len(), n_streams);
sc.src().unwrap()
}
_ => {
panic!("message is not a stream collection");
}
}
}
fn assert_stream_selected(msg: gst::Message, n_streams: usize) -> gst::Object {
match msg.view() {
MessageView::StreamsSelected(ss) => {
let collection = ss.stream_collection();
assert_eq!(collection.len(), n_streams);
ss.src().unwrap()
}
_ => {
panic!("message is not stream selected");
}
}
}
#[test]
fn single_audio() {
let events = test(vec![TestMedia::ogg()], 1, 1, true).into_iter();
assert_eos(events.last().unwrap());
}
#[test]
fn single_video() {
let events = test(vec![TestMedia::mkv()], 2, 1, true).into_iter();
assert_eos(events.last().unwrap());
}
#[test]
fn multi_audio() {
let events = test(
vec![TestMedia::ogg(), TestMedia::ogg(), TestMedia::ogg()],
1,
1,
true,
)
.into_iter();
assert_eos(events.last().unwrap());
}
#[test]
fn multi_audio_video() {
let events = test(vec![TestMedia::mkv(), TestMedia::mkv()], 2, 1, true).into_iter();
assert_eos(events.last().unwrap());
}
#[test]
fn iterations() {
let events = test(vec![TestMedia::mkv(), TestMedia::mkv()], 2, 2, true).into_iter();
assert_eos(events.last().unwrap());
}
#[test]
fn nb_streams_increasing() {
let events = test(vec![TestMedia::ogg(), TestMedia::mkv()], 2, 1, false).into_iter();
assert_eos(events.last().unwrap());
}
#[test]
fn missing_file() {
let events = test(
vec![TestMedia::ogg(), TestMedia::missing_file()],
1,
1,
false,
)
.into_iter();
assert_error(events.last().unwrap(), TestMedia::missing_file());
}
#[test]
fn missing_http() {
let events = test(
vec![TestMedia::ogg(), TestMedia::missing_http()],
1,
1,
false,
)
.into_iter();
assert_error(events.last().unwrap(), TestMedia::missing_http());
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment