Commit 9ffbc089 authored by Sebastian Dröge's avatar Sebastian Dröge 🍵
Browse files

WIP: Various fixups for the position query

parent bc607238
Pipeline #35910 failed with stages
in 10 minutes and 43 seconds
......@@ -100,6 +100,7 @@ struct State {
precomputed_key: box_::PrecomputedKey,
block_size: u32,
write_headers: bool,
pending_bytes: u64,
}
impl State {
......@@ -153,6 +154,7 @@ impl State {
nonce,
block_size: props.block_size,
write_headers: true,
pending_bytes: 0,
})
}
......@@ -183,6 +185,7 @@ impl State {
let buffer = self.adapter.take_buffer(block_size).unwrap();
let out_buf = self.encrypt_message(&buffer);
self.pending_bytes += out_buf.get_size() as u64;
buffers.push(out_buf);
}
......@@ -250,7 +253,9 @@ impl Encrypter {
// Write the block_size into the stream
headers.extend_from_slice(&state.block_size.to_le_bytes());
buffers.push(gst::Buffer::from_mut_slice(headers));
let out_buf = gst::Buffer::from_mut_slice(headers);
state.pending_bytes += out_buf.get_size() as u64;
buffers.push(out_buf);
state.write_headers = false;
}
......@@ -275,12 +280,24 @@ impl Encrypter {
drop(state_guard);
for buffer in buffers {
{
let mut state_guard = self.state.lock().unwrap();
let state = state_guard.as_mut().unwrap();
state.pending_bytes -= buffer.get_size() as u64;
}
self.srcpad.push(buffer).map_err(|err| {
gst_error!(CAT, obj: element, "Failed to push buffer {:?}", err);
err
})?;
}
{
let mut state_guard = self.state.lock().unwrap();
let state = state_guard.as_mut().unwrap();
assert_eq!(state.pending_bytes, 0);
}
Ok(gst::FlowSuccess::Ok)
}
......@@ -416,14 +433,14 @@ impl Encrypter {
}
/* First let's query the bytes duration upstream */
let mut q = gst::query::Query::new_position(gst::Format::Bytes);
let mut peer_query = gst::query::Query::new_position(gst::Format::Bytes);
if !self.sinkpad.peer_query(&mut q) {
if !self.sinkpad.peer_query(&mut peer_query) {
gst_error!(CAT, "Failed to query upstream duration");
return false;
}
let position = match q.get_result().try_into_bytes().unwrap() {
let position = match peer_query.get_result().try_into_bytes().unwrap() {
gst::format::Bytes(Some(size)) => size,
gst::format::Bytes(None) => {
gst_error!(CAT, "Failed to query upstream duration");
......@@ -440,12 +457,23 @@ impl Encrypter {
Some(s) => s,
};
dbg!(position);
let position = position - state.adapter.available() as u64;
dbg!(position);
let chunk_index = position as u64 / state.block_size as u64;
dbg!(chunk_index);
dbg!(state.block_size);
let position =
position + (chunk_index * box_::MACBYTES as u64) + super::HEADERS_SIZE as u64;
dbg!(box_::MACBYTES);
dbg!(super::HEADERS_SIZE);
dbg!(position);
dbg!(state.pending_bytes);
let position = position - state.pending_bytes;
dbg!(gst::format::Bytes::from(position));
q.set(gst::format::Bytes::from(position));
dbg!(q.get_structure());
true
}
_ => pad.query_default(element, query),
......
......@@ -29,6 +29,9 @@ extern crate gstrssodium;
use glib::prelude::*;
use gst::prelude::*;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
lazy_static! {
static ref RECEIVER_PUBLIC: glib::Bytes = {
let public = [
......@@ -109,65 +112,102 @@ fn encrypt_file() {
}
#[test]
fn test_querries() {
fn test_queries() {
init();
let input = include_bytes!("sample.mp3");
let expected_output = include_bytes!("encrypted_sample.enc");
let mut adapter = gst_base::UniqueAdapter::new();
let input_path = {
let mut r = PathBuf::new();
r.push(env!("CARGO_MANIFEST_DIR"));
r.push("tests");
r.push("sample");
r.set_extension("mp3");
r
};
let pipeline = gst::Pipeline::new(None);
let src = gst::ElementFactory::make("filesrc", None).unwrap();
src.set_property("location", &input_path.to_str().unwrap())
.unwrap();
let enc = gst::ElementFactory::make("rssodiumencrypter", None).unwrap();
let sink = gst::ElementFactory::make("appsink", None).unwrap();
enc.set_property("sender-key", &*SENDER_PRIVATE)
.expect("failed to set property");
enc.set_property("receiver-key", &*RECEIVER_PUBLIC)
.expect("failed to set property");
enc.set_property("block-size", &1024u32)
.expect("failed to set property");
pipeline.add_many(&[&src, &enc, &sink]).unwrap();
src.link_pads(Some("src"), &enc, Some("sink")).unwrap();
enc.link_pads(Some("src"), &sink, Some("sink")).unwrap();
let sink = sink.downcast::<gst_app::AppSink>().unwrap();
let adapter = Arc::new(Mutex::new(gst_base::UniqueAdapter::new()));
let adapter_clone = adapter.clone();
let enc_clone = enc.clone();
sink.set_callbacks(
gst_app::AppSinkCallbacks::new()
// Add a handler to the "new-sample" signal.
.new_sample(move |appsink| {
// Pull the sample in question out of the appsink's buffer.
let sample = appsink.pull_sample().ok_or(gst::FlowError::Eos)?;
let buffer = sample.get_buffer().ok_or(gst::FlowError::Error)?;
let mut adapter = adapter_clone.lock().unwrap();
adapter.push(buffer.to_owned());
let pos = enc_clone.query_position::<gst::format::Bytes>().unwrap();
dbg!(pos);
dbg!(adapter.available());
assert_eq!(gst::format::Bytes(Some(adapter.available() as u64)), pos);
let dur = enc_clone.query_duration::<gst::format::Bytes>().unwrap();
assert_eq!(gst::format::Bytes(Some(6043)), dur);
Ok(gst::FlowSuccess::Ok)
})
.build(),
);
let mut h = gst_check::Harness::new_with_element(&enc, None, None);
h.add_element_src_pad(&enc.get_static_pad("src").expect("failed to get src pad"));
h.add_element_sink_pad(&enc.get_static_pad("sink").expect("failed to get src pad"));
h.set_src_caps_str("application/x-sodium-encrypted");
pipeline.set_state(gst::State::Playing).unwrap();
let bus = pipeline.get_bus().unwrap();
for msg in bus.iter_timed(gst::CLOCK_TIME_NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Error(err) => {
eprintln!(
"Error received from element {:?}: {}",
err.get_src().map(|s| s.get_path_string()),
err.get_error()
);
eprintln!("Debugging information: {:?}", err.get_debug());
assert!(true);
break;
}
MessageView::Eos(..) => break,
_ => (),
}
}
let buf = gst::Buffer::from_mut_slice(Vec::from(&input[..]));
let expected_output = include_bytes!("encrypted_sample.enc");
let mut adapter = adapter.lock().unwrap();
let available = adapter.available();
assert_eq!(h.push(buf), Ok(gst::FlowSuccess::Ok));
h.push_event(gst::Event::new_eos().build());
// Query position/duration at eos
let pos = enc.query_position::<gst::format::Bytes>().unwrap();
assert_eq!(gst::format::Bytes(Some(expected_output.len() as u64)), pos);
assert_eq!(gst::format::Bytes(Some(available as u64)), pos);
// Query position at 0
let q1 = enc.query_position::<gst::format::Bytes>().unwrap();
assert_eq!(gst::format::Bytes(Some(0)), q1);
// Query position after a buffer pull
let buf1 = h.pull().unwrap();
let s1 = buf1.get_size() as u64;
let q1 = enc.query_position::<gst::format::Bytes>().unwrap();
assert_eq!(gst::format::Bytes(Some(s1)), q1);
adapter.push(buf1);
// Query position after 2 buffer pulls
let buf2 = h.pull().unwrap();
let s2 = buf2.get_size() as u64;
let q2 = enc.query_position::<gst::format::Bytes>().unwrap();
// query pos == b1 + b2 len()
assert_eq!(gst::format::Bytes(Some(s1 + s2)), q2);
adapter.push(buf2);
let dur = enc.query_duration::<gst::format::Bytes>().unwrap();
assert_eq!(gst::format::Bytes(Some(6043)), dur);
while let Some(buf) = h.pull() {
adapter.push(buf);
if adapter.available() >= expected_output.len() {
break;
}
}
assert_eq!(available, expected_output.len());
let output_buffer = adapter.take_buffer(available).unwrap();
let output = output_buffer.map_readable().unwrap();
assert_eq!(expected_output.as_ref(), output.as_ref());
// Query position at eos
let q_eos = enc.query_position::<gst::format::Bytes>().unwrap();
assert_eq!(
gst::format::Bytes(Some(expected_output.len() as u64)),
q_eos
);
assert_eq!(gst::format::Bytes(Some(adapter.available() as u64)), q_eos);
pipeline.set_state(gst::State::Null).unwrap();
}
#[test]
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment