Commit c0422acf authored by Sebastian Dröge's avatar Sebastian Dröge 🍵

Fix reference cycles and minor related problems in all examples and tutorials

These are now all leak-free.
parent ec8b55ec
......@@ -47,7 +47,7 @@ fn create_pipeline() -> Result<gst::Pipeline, Error> {
pipeline.add_many(&[&src, &sink])?;
src.link(&sink)?;
let appsink = sink.clone()
let appsink = sink
.dynamic_cast::<gst_app::AppSink>()
.expect("Sink element is expected to be an appsink!");
......
......@@ -46,7 +46,7 @@ fn create_pipeline() -> Result<gst::Pipeline, Error> {
pipeline.add_many(&[&src, &videoconvert, &sink])?;
gst::Element::link_many(&[&src, &videoconvert, &sink])?;
let appsrc = src.clone()
let appsrc = src
.dynamic_cast::<gst_app::AppSrc>()
.expect("Source element is expected to be an appsrc!");
......
......@@ -53,10 +53,12 @@ fn example_main() -> Result<(), Error> {
pipeline.add_many(&[&src, &decodebin])?;
gst::Element::link_many(&[&src, &decodebin])?;
// Need to move a new reference into the closure
let pipeline_clone = pipeline.clone();
let pipeline_weak = pipeline.downgrade();
decodebin.connect_pad_added(move |dbin, src_pad| {
let pipeline = &pipeline_clone;
let pipeline = match pipeline_weak.upgrade() {
Some(pipeline) => pipeline,
None => return,
};
let (is_audio, is_video) = {
let media_type = src_pad.get_current_caps().and_then(|caps| {
......
......@@ -17,11 +17,12 @@ fn example_main() {
let ret = pipeline.set_state(gst::State::Playing);
assert_ne!(ret, gst::StateChangeReturn::Failure);
let main_loop_clone = main_loop.clone();
let pipeline_clone = pipeline.clone();
let pipeline_weak = pipeline.downgrade();
glib::timeout_add_seconds(5, move || {
let pipeline = &pipeline_clone;
let pipeline = match pipeline_weak.upgrade() {
Some(pipeline) => pipeline,
None => return glib::Continue(false),
};
println!("sending eos");
......@@ -33,6 +34,7 @@ fn example_main() {
//bus.add_signal_watch();
//bus.connect_message(move |_, msg| {
let main_loop_clone = main_loop.clone();
bus.add_watch(move |_, msg| {
use gst::MessageView;
......@@ -61,6 +63,8 @@ fn example_main() {
let ret = pipeline.set_state(gst::State::Null);
assert_ne!(ret, gst::StateChangeReturn::Failure);
bus.remove_watch();
}
fn main() {
......
......@@ -10,9 +10,7 @@ extern crate gtk;
use gtk::prelude::*;
use std::env;
extern crate send_cell;
use send_cell::SendCell;
use std::cell::RefCell;
fn create_ui(app: &gtk::Application) {
let pipeline = gst::Pipeline::new(None);
......@@ -45,9 +43,13 @@ fn create_ui(app: &gtk::Application) {
app.add_window(&window);
let pipeline_clone = pipeline.clone();
gtk::timeout_add(500, move || {
let pipeline = &pipeline_clone;
let pipeline_weak = pipeline.downgrade();
let timeout_id = gtk::timeout_add(500, move || {
let pipeline = match pipeline_weak.upgrade() {
Some(pipeline) => pipeline,
None => return glib::Continue(true),
};
let position = pipeline
.query_position::<gst::ClockTime>()
.unwrap_or_else(|| 0.into());
......@@ -56,9 +58,13 @@ fn create_ui(app: &gtk::Application) {
glib::Continue(true)
});
let app_clone = app.clone();
let app_weak = app.downgrade();
window.connect_delete_event(move |_, _| {
let app = &app_clone;
let app = match app_weak.upgrade() {
Some(app) => app,
None => return Inhibit(false),
};
app.quit();
Inhibit(false)
});
......@@ -68,11 +74,15 @@ fn create_ui(app: &gtk::Application) {
let ret = pipeline.set_state(gst::State::Playing);
assert_ne!(ret, gst::StateChangeReturn::Failure);
let app_clone = SendCell::new(app.clone());
let app_weak = glib::SendWeakRef::from(app.downgrade());
bus.add_watch(move |_, msg| {
use gst::MessageView;
let app = app_clone.borrow();
let app = match app_weak.upgrade() {
Some(app) => app,
None => return glib::Continue(false),
};
match msg.view() {
MessageView::Eos(..) => gtk::main_quit(),
MessageView::Error(err) => {
......@@ -90,11 +100,17 @@ fn create_ui(app: &gtk::Application) {
glib::Continue(true)
});
let pipeline_clone = pipeline.clone();
// Pipeline reference is owned by the closure below, so will be
// destroyed once the app is destroyed
let timeout_id = RefCell::new(Some(timeout_id));
app.connect_shutdown(move |_| {
let pipeline = &pipeline_clone;
let ret = pipeline.set_state(gst::State::Null);
assert_ne!(ret, gst::StateChangeReturn::Failure);
bus.remove_watch();
if let Some(timeout_id) = timeout_id.borrow_mut().take() {
glib::source_remove(timeout_id);
}
});
}
......
......@@ -20,8 +20,7 @@ use std::env;
use std::os::raw::c_void;
extern crate send_cell;
use send_cell::SendCell;
use std::cell::RefCell;
use std::process;
......@@ -49,9 +48,13 @@ fn create_ui(app: &gtk::Application) {
video_window.set_size_request(320, 240);
let video_overlay = sink.clone()
.dynamic_cast::<gst_video::VideoOverlay>()
.unwrap();
.unwrap()
.downgrade();
video_window.connect_realize(move |video_window| {
let video_overlay = &video_overlay;
let video_overlay = match video_overlay.upgrade() {
Some(video_overlay) => video_overlay,
None => return,
};
let gdk_window = video_window.get_window().unwrap();
......@@ -108,9 +111,13 @@ fn create_ui(app: &gtk::Application) {
app.add_window(&window);
let pipeline_clone = pipeline.clone();
gtk::timeout_add(500, move || {
let pipeline = &pipeline_clone;
let pipeline_weak = pipeline.downgrade();
let timeout_id = gtk::timeout_add(500, move || {
let pipeline = match pipeline_weak.upgrade() {
Some(pipeline) => pipeline,
None => return glib::Continue(true),
};
let position = pipeline
.query_position::<gst::ClockTime>()
.unwrap_or_else(|| 0.into());
......@@ -119,9 +126,13 @@ fn create_ui(app: &gtk::Application) {
glib::Continue(true)
});
let app_clone = app.clone();
let app_weak = app.downgrade();
window.connect_delete_event(move |_, _| {
let app = &app_clone;
let app = match app_weak.upgrade() {
Some(app) => app,
None => return Inhibit(false),
};
app.quit();
Inhibit(false)
});
......@@ -131,11 +142,15 @@ fn create_ui(app: &gtk::Application) {
let ret = pipeline.set_state(gst::State::Playing);
assert_ne!(ret, gst::StateChangeReturn::Failure);
let app_clone = SendCell::new(app.clone());
let app_weak = glib::SendWeakRef::from(app.downgrade());
bus.add_watch(move |_, msg| {
use gst::MessageView;
let app = app_clone.borrow();
let app = match app_weak.upgrade() {
Some(app) => app,
None => return glib::Continue(false),
};
match msg.view() {
MessageView::Eos(..) => gtk::main_quit(),
MessageView::Error(err) => {
......@@ -153,11 +168,17 @@ fn create_ui(app: &gtk::Application) {
glib::Continue(true)
});
let pipeline_clone = pipeline.clone();
// Pipeline reference is owned by the closure below, so will be
// destroyed once the app is destroyed
let timeout_id = RefCell::new(Some(timeout_id));
app.connect_shutdown(move |_| {
let pipeline = &pipeline_clone;
let ret = pipeline.set_state(gst::State::Null);
assert_ne!(ret, gst::StateChangeReturn::Failure);
bus.remove_watch();
if let Some(timeout_id) = timeout_id.borrow_mut().take() {
glib::source_remove(timeout_id);
}
});
}
......
......@@ -50,6 +50,8 @@ fn example_main() {
let ret = pipeline.set_state(gst::State::Null);
assert_ne!(ret, gst::StateChangeReturn::Failure);
bus.remove_watch();
}
fn main() {
......
......@@ -17,12 +17,11 @@ fn example_main() {
"audiotestsrc name=src ! audio/x-raw,format={},channels=1 ! fakesink",
gst_audio::AUDIO_FORMAT_S16.to_string()
)).unwrap();
let bus = pipeline.get_bus().unwrap();
let pipeline = pipeline
.dynamic_cast::<gst::Pipeline>()
.unwrap();
let src = pipeline
.clone()
.dynamic_cast::<gst::Bin>()
.unwrap()
.get_by_name("src")
.unwrap();
let src_pad = src.get_static_pad("src").unwrap();
......@@ -53,6 +52,7 @@ fn example_main() {
let ret = pipeline.set_state(gst::State::Playing);
assert_ne!(ret, gst::StateChangeReturn::Failure);
let bus = pipeline.get_bus().unwrap();
while let Some(msg) = bus.timed_pop(gst::CLOCK_TIME_NONE) {
use gst::MessageView;
......
......@@ -32,21 +32,17 @@ fn main_loop(uri: &str) -> Result<(), Error> {
let error = Arc::new(Mutex::new(Ok(())));
let player_clone = player.clone();
let main_loop_clone = main_loop.clone();
player.connect_end_of_stream(move |_| {
player.connect_end_of_stream(move |player| {
let main_loop = &main_loop_clone;
let player = &player_clone;
player.stop();
main_loop.quit();
});
let player_clone = player.clone();
let main_loop_clone = main_loop.clone();
let error_clone = Arc::clone(&error);
player.connect_error(move |_, err| {
player.connect_error(move |player, err| {
let main_loop = &main_loop_clone;
let player = &player_clone;
let error = &error_clone;
*error.lock().unwrap() = Err(err.clone());
......
......@@ -23,9 +23,12 @@ fn example_main() {
let main_loop_clone = main_loop.clone();
let pipeline_clone = pipeline.clone();
glib::timeout_add_seconds(1, move || {
let pipeline = &pipeline_clone;
let pipeline_weak = pipeline.downgrade();
let timeout_id = glib::timeout_add_seconds(1, move || {
let pipeline = match pipeline_weak.upgrade() {
Some(pipeline) => pipeline,
None => return glib::Continue(true),
};
//let pos = pipeline.query_position(gst::Format::Time).unwrap_or(-1);
//let dur = pipeline.query_duration(gst::Format::Time).unwrap_or(-1);
......@@ -81,6 +84,9 @@ fn example_main() {
let ret = pipeline.set_state(gst::State::Null);
assert_ne!(ret, gst::StateChangeReturn::Failure);
bus.remove_watch();
glib::source_remove(timeout_id);
}
fn main() {
......
......@@ -205,9 +205,14 @@ fn example_main() -> Result<(), Error> {
let sinkpad = get_request_pad(&rtpbin, "recv_rtp_sink_0")?;
srcpad.link(&sinkpad).into_result()?;
let depay_clone = depay.clone();
let depay_weak = depay.downgrade();
rtpbin.connect_pad_added(move |rtpbin, src_pad| {
match connect_rtpbin_srcpad(&src_pad, &depay_clone) {
let depay = match depay_weak.upgrade() {
Some(depay) => depay,
None => return,
};
match connect_rtpbin_srcpad(&src_pad, &depay) {
Ok(_) => (),
Err(err) => {
gst_element_error!(
......@@ -245,6 +250,9 @@ fn example_main() -> Result<(), Error> {
match msg.view() {
MessageView::Eos(..) => break,
MessageView::Error(err) => {
let ret = pipeline.set_state(gst::State::Null);
assert_ne!(ret, gst::StateChangeReturn::Failure);
return Err(ErrorMessage {
src: msg.get_src()
.map(|s| s.get_path_string())
......
......@@ -183,6 +183,9 @@ fn example_main() -> Result<(), Error> {
match msg.view() {
MessageView::Eos(..) => break,
MessageView::Error(err) => {
let ret = pipeline.set_state(gst::State::Null);
assert_ne!(ret, gst::StateChangeReturn::Failure);
return Err(ErrorMessage {
src: msg.get_src()
.map(|s| s.get_path_string())
......
......@@ -94,7 +94,7 @@ fn main_loop() -> Result<(), Error> {
mounts.add_factory("/test", &factory);
server.attach(None);
let id = server.attach(None);
println!(
"Stream ready at rtsps://127.0.0.1:{}/test",
......@@ -103,6 +103,8 @@ fn main_loop() -> Result<(), Error> {
main_loop.run();
glib::source_remove(id);
Ok(())
}
......
......@@ -41,7 +41,7 @@ fn main_loop() -> Result<(), Error> {
mounts.add_factory("/test", &factory);
server.attach(None);
let id = server.attach(None);
println!(
"Stream ready at rtsp://127.0.0.1:{}/test",
......@@ -50,6 +50,8 @@ fn main_loop() -> Result<(), Error> {
main_loop.run();
glib::source_remove(id);
Ok(())
}
......
......@@ -30,9 +30,12 @@ fn example_main() {
gst::Element::link_many(&[&src, &decodebin]).unwrap();
// Need to move a new reference into the closure
let pipeline_clone = pipeline.clone();
let pipeline_weak = pipeline.downgrade();
decodebin.connect_pad_added(move |_, src_pad| {
let pipeline = &pipeline_clone;
let pipeline = match pipeline_weak.upgrade() {
Some(pipeline) => pipeline,
None => return,
};
let queue = gst::ElementFactory::make("queue", None).unwrap();
let sink = gst::ElementFactory::make("fakesink", None).unwrap();
......
......@@ -32,11 +32,18 @@ fn tutorial_main() {
.expect("Can't set uri property on uridecodebin");
// Connect the pad-added signal
let pipeline_clone = pipeline.clone();
let convert_clone = convert.clone();
let pipeline_weak = pipeline.downgrade();
let convert_weak = convert.downgrade();
source.connect_pad_added(move |_, src_pad| {
let pipeline = &pipeline_clone;
let convert = &convert_clone;
let pipeline = match pipeline_weak.upgrade() {
Some(pipeline) => pipeline,
None => return,
};
let convert = match convert_weak.upgrade() {
Some(convert) => convert,
None => return,
};
println!(
"Received new pad {} from {}",
......
......@@ -20,6 +20,33 @@ mod tutorial5 {
extern crate gstreamer_video as gst_video;
use self::gst_video::prelude::*;
use std::ops;
// Custom struct to keep our window reference alive
// and to store the timeout id so that we can remove
// it from the main context again later and drop the
// references it keeps inside its closures
struct AppWindow {
main_window: Window,
timeout_id: Option<glib::SourceId>,
}
impl ops::Deref for AppWindow {
type Target = Window;
fn deref(&self) -> &Window {
&self.main_window
}
}
impl Drop for AppWindow {
fn drop(&mut self) {
if let Some(source_id) = self.timeout_id.take() {
glib::source_remove(source_id);
}
}
}
// Extract tags from streams of @stype and add the info in the UI.
fn add_streams_info(
playbin: &gst::Element,
......@@ -79,7 +106,7 @@ mod tutorial5 {
}
// This creates all the GTK+ widgets that compose our application, and registers the callbacks
pub fn create_ui(playbin: &gst::Element) {
fn create_ui(playbin: &gst::Element) -> AppWindow {
let main_window = Window::new(WindowType::Toplevel);
main_window.connect_delete_event(|_, _| {
gtk::main_quit();
......@@ -144,7 +171,7 @@ mod tutorial5 {
let pipeline = playbin.clone();
let lslider = slider.clone();
// Update the UI (seekbar) every second
gtk::timeout_add_seconds(1, move || {
let timeout_id = gtk::timeout_add_seconds(1, move || {
let pipeline = &pipeline;
let lslider = &lslider;
......@@ -228,7 +255,7 @@ mod tutorial5 {
let streams_list = gtk::TextView::new();
streams_list.set_editable(false);
let pipeline = playbin.clone();
let pipeline_weak = playbin.downgrade();
let textbuf = SendCell::new(
streams_list
.get_buffer()
......@@ -239,6 +266,11 @@ mod tutorial5 {
.unwrap()
.connect_message(move |_, msg| match msg.view() {
gst::MessageView::Application(application) => {
let pipeline = match pipeline_weak.upgrade() {
Some(pipeline) => pipeline,
None => return,
};
if application.get_structure().map(|s| s.get_name()) == Some("tags-changed") {
analyze_streams(&pipeline, &textbuf);
}
......@@ -257,6 +289,11 @@ mod tutorial5 {
main_window.set_default_size(640, 480);
main_window.show_all();
AppWindow {
main_window,
timeout_id: Some(timeout_id),
}
}
// We are possibly in a GStreamer working thread, so we notify the main
......@@ -290,43 +327,45 @@ mod tutorial5 {
data/media/sintel_trailer-480p.webm";
let playbin = gst::ElementFactory::make("playbin", None).unwrap();
playbin
.set_property("uri", &glib::Value::from(uri))
.set_property("uri", &uri)
.unwrap();
let pipeline = playbin.clone();
playbin
.connect("video-tags-changed", false, move |_| {
.connect("video-tags-changed", false, |args| {
let pipeline = args[0].get::<gst::Element>().unwrap();
post_app_message(&pipeline);
None
})
.unwrap();
let pipeline = playbin.clone();
playbin
.connect("audio-tags-changed", false, move |_| {
.connect("audio-tags-changed", false, |args| {
let pipeline = args[0].get::<gst::Element>().unwrap();
post_app_message(&pipeline);
None
})
.unwrap();
let pipeline = playbin.clone();
playbin
.connect("text-tags-changed", false, move |_| {
.connect("text-tags-changed", false, move |args| {
let pipeline = args[0].get::<gst::Element>().unwrap();
post_app_message(&pipeline);
None
})
.unwrap();
create_ui(&playbin);
playbin
.set_state(gst::State::Playing)
.into_result()
.unwrap();
let window = create_ui(&playbin);
let bus = playbin.get_bus().unwrap();
let pipeline = playbin.clone();
bus.add_signal_watch();
playbin.get_bus().unwrap().connect_message(move |_, msg| {
let pipeline_weak = playbin.downgrade();
bus.connect_message(move |_, msg| {
let pipeline = match pipeline_weak.upgrade() {
Some(pipeline) => pipeline,
None => return,
};
match msg.view() {
// This is called when an End-Of-Stream message is posted on the bus.
// We just set the pipeline to READY (which stops playback).
......@@ -359,8 +398,16 @@ mod tutorial5 {
}
});
playbin
.set_state(gst::State::Playing)
.into_result()
.unwrap();
gtk::main();
window.hide();
playbin.set_state(gst::State::Null).into_result().unwrap();
bus.remove_signal_watch();
}
}
......
......@@ -127,30 +127,34 @@ fn main() {
let audio_caps = info.to_caps().unwrap();
let appsrc = appsrc
.clone()
.dynamic_cast::<AppSrc>()
.expect("Source element is expected to be an appsrc!");
appsrc.set_caps(&audio_caps);
appsrc.set_property_format(gst::Format::Time);
let appsink = appsink
.clone()
.dynamic_cast::<AppSink>()
.expect("Sink element is expected to be an appsink!");
let data: Arc<Mutex<CustomData>> = Arc::new(Mutex::new(CustomData::new(&appsrc, &appsink)));
let data_clone = Arc::clone(&data);
let data_weak = Arc::downgrade(&data);
appsrc.connect_need_data(move |_, _size| {
let data = &data_clone;
let data = match data_weak.upgrade() {
Some(data) => data,
None => return,
};
let mut d = data.lock().unwrap();
if d.source_id.is_none() {
println!("start feeding");
let data_clone = Arc::clone(data);
let data_weak = Arc::downgrade(&data);
d.source_id = Some(glib::source::idle_add(move || {
let data = &data_clone;
let data = match data_weak.upgrade() {
Some(data) => data,
None => return glib::Continue(false),
};
let (appsrc, buffer) = {
let mut data = data.lock().unwrap();
......@@ -198,9 +202,12 @@ fn main() {
}
});
let data_clone = Arc::clone(&data);
let data_weak = Arc::downgrade(&data);
appsrc.connect_enough_data(move |_| {
let data = &data_clone;
let data = match data_weak.upgrade() {
Some(data) => data,
None => return,
};
let mut data = data.lock().unwrap();
if let Some(source) = data.source_id.take() {
......@@ -213,10 +220,15 @@ fn main() {
appsink.set_emit_signals(true);
appsink.set_caps(&audio_caps);
let data_clone = Arc::clone(&data);
let data_weak = Arc::downgrade(&data);
appsink.connect_new_sample(move |_| {
let data = match data_weak.upgrade() {
Some(data) => data,
None => return gst::FlowReturn::Ok,
};
let appsink = {
let data = &data_clone.lock().unwrap();
let data = data.lock().unwrap();
data.appsink.clone()
};
......@@ -230,10 +242,6 @@ fn main() {
gst::FlowReturn::Ok
});
pipeline
.set_state(gst::State::Playing)
.into_result()
.expect("Unable to set the pipeline to the Playing state.");
let main_loop = glib::MainLoop::new(None, false);
let main_loop_clone = main_loop.clone();
let bus = pipeline.get_bus().unwrap();
......@@ -252,10 +260,17 @@ fn main() {
});
bus.add_signal_watch();
pipeline
.set_state(gst::State::Playing)
.into_result()
.expect("Unable to set the pipeline to the Playing state.");
main_loop.run();
pipeline
.set_state(gst::State::Null)
.into_result()
.expect("Unable to set the pipeline to the Null state.");
bus.remove_signal_watch();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment