Issue #625

Closed
Created Oct 26, 2020 by Valmir Junior (@valmirjunior0088)

Idle pad probe in `tee` source pad does not get called sometimes

Hi!

We were debugging a hang that we observed when dynamically removing branches of a `tee` from a running pipeline. The program simply hangs, without any error whatsoever. We traced the hang to an idle pad probe that never gets called.

We were able to reproduce this behaviour with the snippet I'm attaching below. Are we doing something wrong?


use gstreamer::prelude::*;

use {
    gstreamer::{Element, ElementFactory, PadProbeReturn, PadProbeType, Pipeline, State},
    std::{error::Error, sync::mpsc, thread, time::Duration},
};

fn main() -> Result<(), Box<dyn Error>> {
    gstreamer::init()?;

    let videotestsrc = ElementFactory::make("videotestsrc", None)?;
    let tee = ElementFactory::make("tee", None)?;

    let pipeline = Pipeline::new(None);
    pipeline.add(&videotestsrc)?;
    pipeline.add(&tee)?;

    videotestsrc.link(&tee)?;

    pipeline.set_state(State::Playing)?;

    (0..100).try_for_each::<_, Result<(), Box<dyn Error>>>(|index| {
        println!("Iteration #[{}]", index);

        let fakesinks = (0..100)
            .map(|index| {
                println!("Adding fakesink #[{}]", index);

                let fakesink = ElementFactory::make("fakesink", None)?;
                pipeline.add(&fakesink)?;
                fakesink.sync_state_with_parent()?;
                tee.link(&fakesink)?;

                Ok((index, fakesink))
            })
            .collect::<Result<Vec<(usize, Element)>, Box<dyn Error>>>()?;

        thread::sleep(Duration::from_secs(5));

        fakesinks.into_iter().for_each(|(index, fakesink)| {
            println!("Removing fakesink #[{}]", index);

            // Channel used to block this thread until the probe callback has run.
            let (sender, receiver) = mpsc::sync_channel(1);

            let fakesink_sink_pad = fakesink.get_static_pad("sink").unwrap();
            let tee_src_pad = fakesink_sink_pad.get_peer().unwrap();

            // Idle probe on the tee source pad: unlinks and removes this branch.
            tee_src_pad.add_probe(PadProbeType::IDLE, {
                let pipeline = pipeline.downgrade();
                let tee = tee.downgrade();
                let fakesink = fakesink.downgrade();
                let fakesink_sink_pad = fakesink_sink_pad.downgrade();

                move |tee_src_pad, _| {
                    println!("Probe called");

                    if let Some(fakesink_sink_pad) = fakesink_sink_pad.upgrade() {
                        tee_src_pad.unlink(&fakesink_sink_pad).unwrap();
                    }

                    if let Some(tee) = tee.upgrade() {
                        tee.release_request_pad(tee_src_pad);
                    }

                    if let Some(pipeline) = pipeline.upgrade() {
                        if let Some(fakesink) = fakesink.upgrade() {
                            pipeline.remove(&fakesink).unwrap();
                            fakesink.set_state(State::Null).unwrap();
                        }
                    }

                    sender.send(()).unwrap_or(());

                    PadProbeReturn::Remove
                }
            });

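            // Wait for the probe to signal completion; if the probe is never
            // called, this is presumably where the program hangs.
            receiver.recv().unwrap_or(());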
        });

        Ok(())
    })?;

    pipeline.set_state(State::Null)?;

    Ok(())
}
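
To make the hang visible instead of silent, the blocking `receiver.recv()` in the snippet above could be swapped for a bounded wait. The following is only a minimal sketch using the standard library, not part of the original reproducer: `ProbeWait` and `wait_for_probe` are hypothetical names introduced here for illustration, and the sender in `main` is a plain thread standing in for the probe callback.

```rust
use std::{sync::mpsc, thread, time::Duration};

/// Outcome of waiting for the probe callback to signal completion.
/// (Hypothetical helper type, not part of the original snippet.)
#[derive(Debug, PartialEq)]
enum ProbeWait {
    /// The callback ran and sent its notification.
    Called,
    /// Nothing arrived within the timeout; the sender is still alive.
    TimedOut,
    /// The sender was dropped without sending anything.
    SenderDropped,
}

/// Waits at most `timeout` for a notification instead of blocking forever.
fn wait_for_probe(receiver: &mpsc::Receiver<()>, timeout: Duration) -> ProbeWait {
    match receiver.recv_timeout(timeout) {
        Ok(()) => ProbeWait::Called,
        Err(mpsc::RecvTimeoutError::Timeout) => ProbeWait::TimedOut,
        Err(mpsc::RecvTimeoutError::Disconnected) => ProbeWait::SenderDropped,
    }
}

fn main() {
    // Stand-in for a probe that fires: another thread sends on the channel.
    let (sender, receiver) = mpsc::sync_channel(1);
    let worker = thread::spawn(move || sender.send(()).unwrap_or(()));
    println!("{:?}", wait_for_probe(&receiver, Duration::from_secs(1)));
    worker.join().unwrap();

    // Stand-in for a probe that never fires: the sender stays alive,
    // but nothing is ever sent.
    let (sender, receiver) = mpsc::sync_channel::<()>(1);
    println!("{:?}", wait_for_probe(&receiver, Duration::from_millis(50)));
    drop(sender);
}
```

In the reproducer, a `TimedOut` result for a given fakesink would indicate which removal is stuck, and the loop could log it and move on rather than block.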

Edit: Unnecessary .try_for_each()

Edited Oct 26, 2020 by Valmir Junior