Idle pad probe in `tee` source pad does not get called sometimes
Hi!
We were debugging a behaviour that we had observed when we tried to dynamically remove branches of a tee
from a running pipeline. We noticed that it would just hang without any kind of error whatsoever. We then noticed that it would hang because an idle pad probe was never getting called.
We were able to reproduce this behaviour with the snippet I'm attaching below. Are we doing something wrong?
use gstreamer::prelude::*;
use {
gstreamer::{Element, ElementFactory, PadProbeReturn, PadProbeType, Pipeline, State},
std::{error::Error, sync::mpsc, thread, time::Duration},
};
fn main() -> Result<(), Box<dyn Error>> {
gstreamer::init()?;
let videotestsrc = ElementFactory::make("videotestsrc", None)?;
let tee = ElementFactory::make("tee", None)?;
let pipeline = Pipeline::new(None);
pipeline.add(&videotestsrc)?;
pipeline.add(&tee)?;
videotestsrc.link(&tee)?;
pipeline.set_state(State::Playing)?;
(0..100).try_for_each::<_, Result<(), Box<dyn Error>>>(|index| {
println!("Iteration #[{}]", index);
let fakesinks = (0..100)
.map(|index| {
println!("Adding fakesink #[{}]", index);
let fakesink = ElementFactory::make("fakesink", None)?;
pipeline.add(&fakesink)?;
fakesink.sync_state_with_parent()?;
tee.link(&fakesink)?;
Ok((index, fakesink))
})
.collect::<Result<Vec<(usize, Element)>, Box<dyn Error>>>()?;
thread::sleep(Duration::from_secs(5));
fakesinks.into_iter().for_each(|(index, fakesink)| {
println!("Removing fakesink #[{}]", index);
let (sender, receiver) = mpsc::sync_channel(1);
let fakesink_sink_pad = fakesink.get_static_pad("sink").unwrap();
let tee_src_pad = fakesink_sink_pad.get_peer().unwrap();
tee_src_pad.add_probe(PadProbeType::IDLE, {
let pipeline = pipeline.downgrade();
let tee = tee.downgrade();
let fakesink = fakesink.downgrade();
let fakesink_sink_pad = fakesink_sink_pad.downgrade();
move |tee_src_pad, _| {
println!("Probe called");
if let Some(fakesink_sink_pad) = fakesink_sink_pad.upgrade() {
tee_src_pad.unlink(&fakesink_sink_pad).unwrap();
}
if let Some(tee) = tee.upgrade() {
tee.release_request_pad(tee_src_pad);
}
if let Some(pipeline) = pipeline.upgrade() {
if let Some(fakesink) = fakesink.upgrade() {
pipeline.remove(&fakesink).unwrap();
fakesink.set_state(State::Null).unwrap();
}
}
sender.send(()).unwrap_or(());
PadProbeReturn::Remove
}
});
receiver.recv().unwrap_or(());
});
Ok(())
})?;
pipeline.set_state(State::Null)?;
Ok(())
}
Edit: Unnecessary .try_for_each()