Manually dropping video frames in an element
I have been working on an element that selectively drops video frames in a video encoding pipeline according to a boolean value set in a Meta written to the buffer upstream. My frameselect plugin implementation is based on the identity example in the plugin repo and the metadata example in this repo. I have been struggling with two issues that have blocked progress for the past couple of days and would appreciate any insights. Please let me know if there is a more appropriate place for this issue.
The issues I am having:
1. Metadata is written upstream of the frameselect element, in a buffer probe on the source pad of a video decoder, but it cannot be read in the frameselect sink_chain method (the meta API type is read as glib::Type::Invalid). However, I am able to write and read this Meta successfully in a contrived source / sink element pipeline, as is done in this example.
2. Independently of passing the custom Meta, I tried to validate my approach to "dropping" frames from the decoder. As a test (to get around the failure to read the metadata), I set up a simple mutex-protected counter in the FrameSelect element and returned Ok(gst_base::BASE_TRANSFORM_FLOW_DROPPED) at a given frequency based on the counter value; otherwise I pushed the buffer downstream through the source pad and returned Ok(gst::FlowSuccess::Ok). Unfortunately, this fails to drop frames as expected.
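To make the second issue concrete, here is a minimal, std-only sketch of a mutex-protected counter that decides which buffers to drop. It is not the actual test code from the element: DropCounter, Action, and next_action are hypothetical names, and no GStreamer types are involved.

```rust
use std::sync::Mutex;

/// Outcome of handling one buffer in a chain function (hypothetical model type).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Action {
    /// Forward the buffer downstream.
    Push,
    /// Do not forward the buffer.
    Drop,
}

/// Mutex-protected frame counter (hypothetical stand-in for the element state).
pub struct DropCounter {
    count: Mutex<u64>,
    drop_every: u64,
}

impl DropCounter {
    /// `drop_every` = N means every N-th buffer is dropped; 0 disables dropping.
    pub fn new(drop_every: u64) -> Self {
        Self {
            count: Mutex::new(0),
            drop_every,
        }
    }

    /// Count one buffer and decide what to do with it.
    pub fn next_action(&self) -> Action {
        let mut count = self.count.lock().unwrap();
        *count += 1;
        if self.drop_every != 0 && *count % self.drop_every == 0 {
            Action::Drop
        } else {
            Action::Push
        }
    }
}
```

In the element, Action::Push would correspond to self.srcpad.push(buffer), and Action::Drop to returning from sink_chain without pushing the buffer.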
The custom metadata implementation:
use gst::prelude::*;
use std::fmt;
use std::ptr;

// Public Rust type for the custom meta.
#[repr(transparent)]
pub struct SelectMeta(imp::SelectMeta);

// Metas must be Send+Sync.
unsafe impl Send for SelectMeta {}
unsafe impl Sync for SelectMeta {}

impl SelectMeta {
    // Add a new custom meta to the buffer with the given select value.
    pub fn add(
        buffer: &mut gst::BufferRef,
        select: bool,
    ) -> gst::MetaRefMut<Self, gst::meta::Standalone> {
        unsafe {
            // First add it: custom_meta_init() stores the default select value (false).
            let meta = gst::ffi::gst_buffer_add_meta(
                buffer.as_mut_ptr(),
                imp::custom_meta_get_info(),
                ptr::null_mut(),
            ) as *mut imp::SelectMeta;

            // Then actually set the select value.
            {
                let meta = &mut *meta;
                meta.select = select;
            }

            Self::from_mut_ptr(buffer, meta)
        }
    }

    // Retrieve the stored select value.
    pub fn get_select(&self) -> bool {
        self.0.select
    }
}

// Trait to allow using the gst::Buffer API with this meta.
unsafe impl MetaAPI for SelectMeta {
    type GstType = imp::SelectMeta;

    fn get_meta_api() -> glib::Type {
        imp::custom_meta_api_get_type()
    }
}

impl fmt::Debug for SelectMeta {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("SelectMeta")
            .field("select", &self.get_select())
            .finish()
    }
}
// Actual unsafe implementation of the meta.
mod imp {
    use glib::translate::*;
    use once_cell::sync::Lazy;
    use std::mem;
    use std::ptr;

    // This is the C type that is actually stored as meta inside the buffers.
    #[repr(C)]
    pub struct SelectMeta {
        parent: gst::ffi::GstMeta,
        pub(super) select: bool,
    }

    // Function to register the meta API and get a type back.
    pub(super) fn custom_meta_api_get_type() -> glib::Type {
        static TYPE: Lazy<glib::Type> = Lazy::new(|| unsafe {
            let t = from_glib(gst::ffi::gst_meta_api_type_register(
                b"SelectMetaAPI\0".as_ptr() as *const _,
                // We provide no tags here as our meta is just a boolean flag and does
                // not refer to any specific aspect of the buffer
                [ptr::null::<std::os::raw::c_char>()].as_ptr() as *mut *const _,
            ));

            assert_ne!(t, glib::Type::Invalid);

            t
        });

        *TYPE
    }

    // Initialization function for our meta. This needs to ensure all fields are correctly
    // initialized. They will contain random memory before.
    unsafe extern "C" fn custom_meta_init(
        meta: *mut gst::ffi::GstMeta,
        _params: glib::ffi::gpointer,
        _buffer: *mut gst::ffi::GstBuffer,
    ) -> glib::ffi::gboolean {
        let meta = &mut *(meta as *mut SelectMeta);

        // Need to initialize all our fields correctly here
        ptr::write(&mut meta.select, false);

        true.to_glib()
    }

    // Free function for our meta. This needs to free/drop all memory we allocated.
    unsafe extern "C" fn custom_meta_free(
        meta: *mut gst::ffi::GstMeta,
        _buffer: *mut gst::ffi::GstBuffer,
    ) {
        let meta = &mut *(meta as *mut SelectMeta);

        // Need to free/drop all our fields here.
        ptr::drop_in_place(&mut meta.select);
    }

    // Transform function for our meta. This needs to get it from the old buffer to the new one
    // in a way that is compatible with the transformation type. In this case we just always
    // copy it over.
    unsafe extern "C" fn custom_meta_transform(
        dest: *mut gst::ffi::GstBuffer,
        meta: *mut gst::ffi::GstMeta,
        _buffer: *mut gst::ffi::GstBuffer,
        _type_: glib::ffi::GQuark,
        _data: glib::ffi::gpointer,
    ) -> glib::ffi::gboolean {
        let meta = &mut *(meta as *mut SelectMeta);

        // We simply copy over our meta here. Other metas might have to look at the type
        // and do things conditional on that, or even just drop the meta.
        super::SelectMeta::add(gst::BufferRef::from_mut_ptr(dest), meta.select.clone());

        true.to_glib()
    }

    // Register the meta itself with its functions.
    pub(super) fn custom_meta_get_info() -> *const gst::ffi::GstMetaInfo {
        struct MetaInfo(ptr::NonNull<gst::ffi::GstMetaInfo>);
        unsafe impl Send for MetaInfo {}
        unsafe impl Sync for MetaInfo {}

        static META_INFO: Lazy<MetaInfo> = Lazy::new(|| unsafe {
            MetaInfo(
                ptr::NonNull::new(gst::ffi::gst_meta_register(
                    custom_meta_api_get_type().to_glib(),
                    b"SelectMeta\0".as_ptr() as *const _,
                    mem::size_of::<SelectMeta>(),
                    Some(custom_meta_init),
                    Some(custom_meta_free),
                    Some(custom_meta_transform),
                ) as *mut gst::ffi::GstMetaInfo)
                .expect("Failed to register meta API"),
            )
        });

        META_INFO.0.as_ptr()
    }
}
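The init-then-set sequence used by SelectMeta::add() and custom_meta_init() can be illustrated without GStreamer. The following is only a std-only model under stated assumptions: FakeMetaHeader, FakeSelectMeta, fake_meta_init, and add_fake_meta are hypothetical stand-ins, and the real gst::ffi::GstMeta header has a different layout. It shows why the init function writes the field with ptr::write (the memory is uninitialized at that point) before the caller assigns the real value.

```rust
use std::mem::MaybeUninit;
use std::ptr;

/// Stand-in for gst::ffi::GstMeta (hypothetical; the real header differs).
#[repr(C)]
pub struct FakeMetaHeader {
    flags: u32,
}

/// Same layout idea as imp::SelectMeta: header first, payload after it.
#[repr(C)]
pub struct FakeSelectMeta {
    parent: FakeMetaHeader,
    select: bool,
}

impl FakeSelectMeta {
    /// Read back the stored flag.
    pub fn select(&self) -> bool {
        self.select
    }
}

/// Models custom_meta_init(): writes the payload without reading or
/// dropping whatever bytes were in the slot before.
///
/// Safety: `meta` must point to writable memory large enough for the struct.
pub unsafe fn fake_meta_init(meta: *mut FakeSelectMeta) -> bool {
    unsafe {
        ptr::write(ptr::addr_of_mut!((*meta).select), false);
    }
    true
}

/// Models SelectMeta::add(): allocate a slot, run init, then set the value.
pub fn add_fake_meta(select: bool) -> FakeSelectMeta {
    let mut slot = MaybeUninit::<FakeSelectMeta>::uninit();
    unsafe {
        let p = slot.as_mut_ptr();
        // In GStreamer the framework fills in the header; here it is done by hand.
        ptr::write(ptr::addr_of_mut!((*p).parent), FakeMetaHeader { flags: 0 });
        assert!(fake_meta_init(p));
        // Every field is initialized now, so a plain assignment is fine.
        (*p).select = select;
        slot.assume_init()
    }
}
```

For example, add_fake_meta(true).select() yields the value that was set after init, just as get_select() is expected to in the real meta.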
The upstream probe:
decoder.get_static_pad("src").unwrap().add_probe(
    gst::PadProbeType::BUFFER,
    |_pad, info| {
        if let Some(gst::PadProbeData::Buffer(ref mut buffer)) = &mut info.data {
            {
                let buffer = buffer.make_mut();
                selectmeta::SelectMeta::add(buffer, true);
            }
        }
        gst::PadProbeReturn::Ok
    },
);
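The probe relies on buffer.make_mut(), which yields a writable buffer and copies it first if it is shared. As a rough analogy only (GStreamer buffers are not Arc values), the same copy-on-write behavior can be sketched with std::sync::Arc::make_mut. FakeBuffer and tag are hypothetical names introduced for this illustration.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a buffer carrying an optional select flag.
#[derive(Clone, Debug, PartialEq)]
pub struct FakeBuffer {
    pub data: Vec<u8>,
    pub select: Option<bool>,
}

/// Attach a select flag, copying the buffer first if it is shared.
pub fn tag(buffer: &mut Arc<FakeBuffer>, select: bool) {
    // Arc::make_mut clones the inner value only when other references exist.
    let writable = Arc::make_mut(buffer);
    writable.select = Some(select);
}
```

When the buffer is uniquely owned, the flag is written in place. When another reference exists, the writer ends up holding a fresh copy and the other holder keeps the untagged original, which is why the probe writes through info.data rather than through a separate clone.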
The frame select element:
use gst::{gst_debug, gst_element_error, gst_info, gst_loggable_error};

use crate::selectmeta;
use glib::subclass;
use glib::subclass::prelude::*;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_log, gst_trace};

use once_cell::sync::Lazy;

static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
    gst::DebugCategory::new(
        "frameselect",
        gst::DebugColorFlags::empty(),
        Some("FrameSelect Element"),
    )
});

pub struct FrameSelect {
    srcpad: gst::Pad,
    sinkpad: gst::Pad,
}
impl FrameSelect {
    fn sink_chain(
        &self,
        pad: &gst::Pad,
        _element: &super::FrameSelect,
        buffer: gst::Buffer,
    ) -> Result<gst::FlowSuccess, gst::FlowError> {
        gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer);

        let meta = buffer
            .get_meta::<selectmeta::SelectMeta>()
            .expect("No custom meta found");

        match meta.get_select() {
            true => {
                self.srcpad.push(buffer)
            },
            false => {
                Ok(gst_base::BASE_TRANSFORM_FLOW_DROPPED)
            }
        }
    }

    fn sink_event(&self, pad: &gst::Pad, _element: &super::FrameSelect, event: gst::Event) -> bool {
        gst_log!(CAT, obj: pad, "Handling event {:?}", event);
        self.srcpad.push_event(event)
    }

    fn sink_query(
        &self,
        pad: &gst::Pad,
        _element: &super::FrameSelect,
        query: &mut gst::QueryRef,
    ) -> bool {
        gst_log!(CAT, obj: pad, "Handling query {:?}", query);
        self.srcpad.peer_query(query)
    }

    fn src_event(&self, pad: &gst::Pad, _element: &super::FrameSelect, event: gst::Event) -> bool {
        gst_log!(CAT, obj: pad, "Handling event {:?}", event);
        self.sinkpad.push_event(event)
    }

    fn src_query(
        &self,
        pad: &gst::Pad,
        _element: &super::FrameSelect,
        query: &mut gst::QueryRef,
    ) -> bool {
        gst_log!(CAT, obj: pad, "Handling query {:?}", query);
        self.sinkpad.peer_query(query)
    }
}
impl ObjectSubclass for FrameSelect {
    const NAME: &'static str = "frameselect";
    type Type = super::FrameSelect;
    type ParentType = gst::Element;
    type Instance = gst::subclass::ElementInstanceStruct<Self>;
    type Class = subclass::simple::ClassStruct<Self>;

    glib::glib_object_subclass!();

    fn with_class(klass: &Self::Class) -> Self {
        let templ = klass.get_pad_template("sink").unwrap();
        let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink"))
            .chain_function(|pad, parent, buffer| {
                FrameSelect::catch_panic_pad_function(
                    parent,
                    || Err(gst::FlowError::Error),
                    |frameselect, element| frameselect.sink_chain(pad, element, buffer),
                )
            })
            .event_function(|pad, parent, event| {
                FrameSelect::catch_panic_pad_function(
                    parent,
                    || false,
                    |frameselect, element| frameselect.sink_event(pad, element, event),
                )
            })
            .query_function(|pad, parent, query| {
                FrameSelect::catch_panic_pad_function(
                    parent,
                    || false,
                    |frameselect, element| frameselect.sink_query(pad, element, query),
                )
            })
            .build();

        let templ = klass.get_pad_template("src").unwrap();
        let srcpad = gst::Pad::builder_with_template(&templ, Some("src"))
            .event_function(|pad, parent, event| {
                FrameSelect::catch_panic_pad_function(
                    parent,
                    || false,
                    |frameselect, element| frameselect.src_event(pad, element, event),
                )
            })
            .query_function(|pad, parent, query| {
                FrameSelect::catch_panic_pad_function(
                    parent,
                    || false,
                    |frameselect, element| frameselect.src_query(pad, element, query),
                )
            })
            .build();

        Self { srcpad, sinkpad }
    }

    fn class_init(klass: &mut Self::Class) {
        klass.set_metadata(
            "FrameSelect",
            "Generic",
            "",
            "Ryan Brigden <rpb@hey.com>",
        );

        let caps = gst::Caps::new_any();
        let src_pad_template = gst::PadTemplate::new(
            "src",
            gst::PadDirection::Src,
            gst::PadPresence::Always,
            &caps,
        )
        .unwrap();
        klass.add_pad_template(src_pad_template);

        let sink_pad_template = gst::PadTemplate::new(
            "sink",
            gst::PadDirection::Sink,
            gst::PadPresence::Always,
            &caps,
        )
        .unwrap();
        klass.add_pad_template(sink_pad_template);
    }
}
impl ObjectImpl for FrameSelect {
    fn constructed(&self, obj: &Self::Type) {
        self.parent_constructed(obj);

        obj.add_pad(&self.sinkpad).unwrap();
        obj.add_pad(&self.srcpad).unwrap();
    }
}

impl ElementImpl for FrameSelect {
    fn change_state(
        &self,
        element: &Self::Type,
        transition: gst::StateChange,
    ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
        gst_trace!(CAT, obj: element, "Changing state {:?}", transition);

        self.parent_change_state(element, transition)
    }
}
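The decision made in sink_chain can be separated from the GStreamer calls and checked on its own. Below is a minimal, std-only sketch of that decision, assuming the meta lookup is modeled as an Option<bool>. ChainAction, MissingMetaPolicy, and decide are hypothetical names, and the handling of a missing meta is an added assumption: the element above panics via expect() in that case.

```rust
/// What the chain function should do with the current buffer (hypothetical).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum ChainAction {
    /// Would map to self.srcpad.push(buffer).
    Push,
    /// Would map to returning from sink_chain without pushing the buffer.
    Drop,
}

/// Policy for buffers that carry no SelectMeta at all (hypothetical knob).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum MissingMetaPolicy {
    Push,
    Drop,
}

/// `select` models `buffer.get_meta::<SelectMeta>().map(|m| m.get_select())`.
pub fn decide(select: Option<bool>, missing: MissingMetaPolicy) -> ChainAction {
    match select {
        Some(true) => ChainAction::Push,
        Some(false) => ChainAction::Drop,
        // No meta attached: fall back to the configured policy instead of panicking.
        None => match missing {
            MissingMetaPolicy::Push => ChainAction::Push,
            MissingMetaPolicy::Drop => ChainAction::Drop,
        },
    }
}
```

One related point, stated as an assumption rather than something verified against this pipeline: gst_base::BASE_TRANSFORM_FLOW_DROPPED is documented as a return value for GstBaseTransform subclasses, whereas FrameSelect derives from gst::Element. In a plain chain function, returning Ok(gst::FlowSuccess::Ok) without pushing the buffer may be the more conventional way to express ChainAction::Drop.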