Commit b4670721 authored by François Laignel's avatar François Laignel
Browse files

ts/tests/pad: minor ckeanups

parent c8ba7992
Pipeline #626171 passed with stages
in 23 minutes and 50 seconds
......@@ -21,7 +21,6 @@
use futures::channel::mpsc;
use futures::future::BoxFuture;
use futures::lock::Mutex as FutMutex;
use futures::prelude::*;
use gst::glib;
......@@ -33,13 +32,11 @@ use gst::EventView;
use once_cell::sync::Lazy;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex as StdMutex;
use std::sync::Mutex;
use std::time::Duration;
use gstthreadshare::runtime::prelude::*;
use gstthreadshare::runtime::{
Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak, Task, TaskState,
};
use gstthreadshare::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, Task, TaskState};
const DEFAULT_CONTEXT: &str = "";
const THROTTLING_DURATION: Duration = Duration::from_millis(2);
......@@ -87,25 +84,6 @@ mod imp_src {
#[derive(Clone, Debug)]
struct PadSrcTestHandler;
impl PadSrcTestHandler {
async fn push_item(
pad: &PadSrcRef<'_>,
item: Item,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::debug!(SRC_CAT, obj: pad.gst_pad(), "Handling {:?}", item);
match item {
Item::Event(event) => {
pad.push_event(event).await;
Ok(gst::FlowSuccess::Ok)
}
Item::Buffer(buffer) => pad.push(buffer).await,
Item::BufferList(list) => pad.push_list(list).await,
}
}
}
impl PadSrcHandler for PadSrcTestHandler {
type ElementImpl = ElementSrcTest;
......@@ -144,21 +122,12 @@ mod imp_src {
#[derive(Debug)]
struct ElementSrcTestTask {
element: super::ElementSrcTest,
src_pad: PadSrcWeak,
receiver: mpsc::Receiver<Item>,
}
impl ElementSrcTestTask {
fn new(
element: &super::ElementSrcTest,
src_pad: &PadSrc,
receiver: mpsc::Receiver<Item>,
) -> Self {
ElementSrcTestTask {
element: element.clone(),
src_pad: src_pad.downgrade(),
receiver,
}
fn new(element: super::ElementSrcTest, receiver: mpsc::Receiver<Item>) -> Self {
ElementSrcTestTask { element, receiver }
}
}
......@@ -167,6 +136,20 @@ mod imp_src {
// Purge the channel
while let Ok(Some(_item)) = self.receiver.try_next() {}
}
async fn push_item(&self, item: Item) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::debug!(SRC_CAT, obj: &self.element, "Handling {:?}", item);
let elementsrctest = self.element.imp();
match item {
Item::Event(event) => {
elementsrctest.src_pad.push_event(event).await;
Ok(gst::FlowSuccess::Ok)
}
Item::Buffer(buffer) => elementsrctest.src_pad.push(buffer).await,
Item::BufferList(list) => elementsrctest.src_pad.push_list(list).await,
}
}
}
impl TaskImpl for ElementSrcTestTask {
......@@ -182,8 +165,7 @@ mod imp_src {
}
};
let pad = self.src_pad.upgrade().expect("PadSrc no longer exists");
let res = PadSrcTestHandler::push_item(&pad, item).await;
let res = self.push_item(item).await;
match res {
Ok(_) => gst::log!(SRC_CAT, obj: &self.element, "Successfully pushed item"),
Err(gst::FlowError::Flushing) => {
......@@ -222,8 +204,8 @@ mod imp_src {
pub struct ElementSrcTest {
src_pad: PadSrc,
task: Task,
sender: StdMutex<Option<mpsc::Sender<Item>>>,
settings: StdMutex<Settings>,
sender: Mutex<Option<mpsc::Sender<Item>>>,
settings: Mutex<Settings>,
}
impl ElementSrcTest {
......@@ -259,10 +241,7 @@ mod imp_src {
*self.sender.lock().unwrap() = Some(sender);
self.task
.prepare(
ElementSrcTestTask::new(element, &self.src_pad, receiver),
context,
)
.prepare(ElementSrcTestTask::new(element.clone(), receiver), context)
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::Failed,
......@@ -316,8 +295,8 @@ mod imp_src {
PadSrcTestHandler,
),
task: Task::default(),
sender: StdMutex::new(None),
settings: StdMutex::new(Settings::default()),
sender: Mutex::new(None),
settings: Mutex::new(Settings::default()),
}
}
}
......@@ -564,7 +543,7 @@ mod imp_sink {
pub struct ElementSinkTest {
sink_pad: PadSink,
flushing: AtomicBool,
sender: FutMutex<Option<mpsc::Sender<Item>>>,
sender: Mutex<Option<mpsc::Sender<Item>>>,
}
impl ElementSinkTest {
......@@ -575,11 +554,14 @@ mod imp_sink {
) -> Result<gst::FlowSuccess, gst::FlowError> {
if !self.flushing.load(Ordering::SeqCst) {
gst::debug!(SINK_CAT, obj: element, "Fowarding {:?}", item);
self.sender
let mut sender = self
.sender
.lock()
.await
.unwrap()
.as_mut()
.expect("Item Sender not set")
.clone();
sender
.send(item)
.await
.map(|_| gst::FlowSuccess::Ok)
......@@ -647,7 +629,7 @@ mod imp_sink {
PadSinkTestHandler,
),
flushing: AtomicBool::new(true),
sender: FutMutex::new(None),
sender: Mutex::new(None),
}
}
}
......@@ -680,7 +662,7 @@ mod imp_sink {
.get::<&ItemSender>()
.expect("type checked upstream")
.clone();
*futures::executor::block_on(self.sender.lock()) = Some(sender);
*self.sender.lock().unwrap() = Some(sender);
}
_ => unimplemented!(),
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment