Skip to content

Draft: fallbacksrc: Add multi-stream support with the stream API

Closes #383

Marking as draft for now because of a few missing features and an upstream blocker.

This MR aims to bring support for handling more than 2 (1 video + 1 audio) streams to fallbacksrc.

fallbacksrc now fully supports the stream selection API and lets the app/user select which outputs are needed, by posting its own StreamCollection and respecting choices made via the select-streams message.

Internally fallbacksrc will still always provide the 2 default streams, and if used like the old version, it will still only expose pads according to enable_video/enable_audio properties.

However, as soon as the source posts its StreamCollection, fallbacksrc will create its own streams to match and will let the user/application know it can select more outputs in that case. Once selected, an output is guaranteed to stay present - if the source disappears, it'll switch to the fallback one.

There are a few things missing/blocking for now, WIP:

  • A decodebin3 race condition which is blocking us from selecting source streams correctly
  • A mapping system to allow user to specify which fallback streams should be picked once a given source stream disappears
  • CustomSource needs modifications to work in this new scenario
  • Seqnums usage needs to be more consistent (should match between various events at the beginning, after a seek etc.)
  • Streams need group IDs correctly set

Co-authored-by: @slomo


Example application of some sort

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;

use gst::glib;
use gst::prelude::*;

fn main() {
    gst::init().unwrap();

    let pipeline = gst::Pipeline::default();
    let src = gst::ElementFactory::make("fallbacksrc")
        .property("uri", "http://127.0.0.1:8000/output-single.mp4")
        .property("fallback-uri", "http://127.0.0.1:8000/fallback-single.mp4")
        .build()
        .unwrap();

    pipeline.add(&src).unwrap();

    let streams = Arc::new(Mutex::new(HashMap::new()));

    src.connect_pad_added({
        let streams = streams.clone();
        move |src, pad| {
            let pipeline = src.parent().and_downcast::<gst::Pipeline>().unwrap();
            let stream = pad.stream().unwrap();
            println!("Got pad for stream {:#?}", stream.debug());
            let stream_id = stream.stream_id().unwrap();

            if stream.stream_type().contains(gst::StreamType::AUDIO) {
                let audioconvert = gst::ElementFactory::make("audioconvert")
                    .name(format!("audioconvert-{stream_id}"))
                    .build()
                    .unwrap();
                let autoaudiosink = gst::ElementFactory::make("autoaudiosink")
                    .name(format!("autoaudiosink-{stream_id}"))
                    .build()
                    .unwrap();
                pipeline.add(&audioconvert).unwrap();
                pipeline.add(&autoaudiosink).unwrap();

                audioconvert.link(&autoaudiosink).unwrap();

                autoaudiosink.sync_state_with_parent().unwrap();
                audioconvert.sync_state_with_parent().unwrap();

                pad.link(&audioconvert.static_pad("sink").unwrap()).unwrap();
            } else {
                let videoconvert = gst::ElementFactory::make("videoconvert")
                    .name(format!("videoconvert-{stream_id}"))
                    .build()
                    .unwrap();
                let autovideosink = gst::ElementFactory::make("autovideosink")
                    .name(format!("autovideosink-{stream_id}"))
                    .build()
                    .unwrap();
                pipeline.add(&videoconvert).unwrap();
                pipeline.add(&autovideosink).unwrap();

                videoconvert.link(&autovideosink).unwrap();

                autovideosink.sync_state_with_parent().unwrap();
                videoconvert.sync_state_with_parent().unwrap();

                pad.link(&videoconvert.static_pad("sink").unwrap()).unwrap();
            }

            streams.lock().unwrap().insert(pad.clone(), stream);
        }
    });

    src.connect_pad_removed({
        let streams = streams.clone();
        move |src, pad| {
            let pipeline = src.parent().and_downcast::<gst::Pipeline>().unwrap();

            let Some(stream) = streams.lock().unwrap().get(pad).cloned() else {
                return;
            };

            println!("Lost pad for stream {:#?}", stream.debug());
            let stream_id = stream.stream_id().unwrap();

            let videoconvert = pipeline
                .by_name(&format!("videoconvert-{stream_id}"))
                .unwrap();
            let autovideosink = pipeline
                .by_name(&format!("autovideosink-{stream_id}"))
                .unwrap();
            pipeline.remove(&autovideosink).unwrap();
            pipeline.remove(&videoconvert).unwrap();

            autovideosink.set_state(gst::State::Null).unwrap();
            videoconvert.set_state(gst::State::Null).unwrap();
        }
    });

    let (bus_sender, bus_receiver) = async_channel::unbounded();
    let bus = pipeline.bus().unwrap();
    bus.set_sync_handler({
        let src = src.clone();
        move |_bus, msg| {
            use gst::MessageView;

            match msg.view() {
                MessageView::StreamCollection(msg) => {
                    let collection = msg.stream_collection();

                    println!("Received stream-collection: {:#?}", collection.debug());

                    let mut selected_streams = Vec::new();
                    for stream in &collection {
                        let caps = stream.caps().unwrap();
                        let stream_id = stream.stream_id().unwrap();
                        let s = caps.structure(0).unwrap();

                        if s.name() == "video/x-raw" {
                            println!("Selecting stream {stream_id}");
                            selected_streams.push(stream_id);
                        }
                    }

                    src.send_event(
                        gst::event::SelectStreams::builder(
                            &selected_streams
                                .iter()
                                .map(|s| s.as_str())
                                .collect::<Vec<_>>(),
                        )
                        .build(),
                    );
                }
                MessageView::StreamsSelected(msg) => {
                    let streams = msg.streams();

                    for stream in &streams {
                        let stream_id = stream.stream_id().unwrap();
                        println!("Selected {stream_id}");
                    }
                }
                _ => (),
            }

            bus_sender.send_blocking(msg.clone()).unwrap();

            gst::BusSyncReply::Drop
        }
    });

    pipeline.set_state(gst::State::Playing).unwrap();

    glib::MainContext::default().block_on(async {
        while let Ok(msg) = bus_receiver.recv().await {
            use gst::MessageView;

            match msg.view() {
                MessageView::Eos(..) => {
                    println!("EOS");
                    break;
                }
                MessageView::Error(msg) => {
                    println!(
                        "Error from {:?}: {} ({:?})",
                        msg.src().map(|s| s.path_string()),
                        msg.error(),
                        msg.debug()
                    );
                    break;
                }
                _ => (),
            };
        }
    });

    pipeline.set_state(gst::State::Null).unwrap();
}
Edited by Piotr Brzeziński

Merge request reports