From bc607238e5ff19836153e07dc93474fd7e1a5bda Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Tue, 7 May 2019 14:34:39 +0300 Subject: [PATCH 1/2] WIP: Implement and test position query --- gst-plugin-sodium/src/encrypter.rs | 42 ++++++++++++++++++- gst-plugin-sodium/tests/encrypter.rs | 62 ++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 2 deletions(-) diff --git a/gst-plugin-sodium/src/encrypter.rs b/gst-plugin-sodium/src/encrypter.rs index 565687ec..71299fda 100644 --- a/gst-plugin-sodium/src/encrypter.rs +++ b/gst-plugin-sodium/src/encrypter.rs @@ -389,8 +389,8 @@ impl Encrypter { } }; - let state = self.state.lock().unwrap(); - let state = match state.as_ref() { + let state_mutex = self.state.lock().unwrap(); + let state = match state_mutex.as_ref() { // If state isn't set, it means that the // element hasn't been activated yet. None => return false, @@ -410,6 +410,44 @@ impl Encrypter { true } + QueryView::Position(ref mut q) => { + if q.get_format() != gst::Format::Bytes { + return pad.query_default(element, query); + } + + /* First let's query the bytes duration upstream */ + let mut q = gst::query::Query::new_position(gst::Format::Bytes); + + if !self.sinkpad.peer_query(&mut q) { + gst_error!(CAT, "Failed to query upstream duration"); + return false; + } + + let position = match q.get_result().try_into_bytes().unwrap() { + gst::format::Bytes(Some(size)) => size, + gst::format::Bytes(None) => { + gst_error!(CAT, "Failed to query upstream duration"); + + return false; + } + }; + + let state_mutex = self.state.lock().unwrap(); + let state = match state_mutex.as_ref() { + // If state isn't set, it means that the + // element hasn't been activated yet. + None => return false, + Some(s) => s, + }; + + dbg!(position); + let chunk_index = position as u64 / state.block_size as u64; + let position = + position + (chunk_index * box_::MACBYTES as u64) + super::HEADERS_SIZE as u64; + + q.set(gst::format::Bytes::from(position)); + true + } _ => pad.query_default(element, query), } } diff --git a/gst-plugin-sodium/tests/encrypter.rs b/gst-plugin-sodium/tests/encrypter.rs index 93854e5f..61aa6a74 100644 --- a/gst-plugin-sodium/tests/encrypter.rs +++ b/gst-plugin-sodium/tests/encrypter.rs @@ -108,6 +108,68 @@ fn encrypt_file() { assert_eq!(map.as_ref(), expected_output.as_ref()); } +#[test] +fn test_querries() { + init(); + + let input = include_bytes!("sample.mp3"); + let expected_output = include_bytes!("encrypted_sample.enc"); + + let mut adapter = gst_base::UniqueAdapter::new(); + + let enc = gst::ElementFactory::make("rssodiumencrypter", None).unwrap(); + enc.set_property("sender-key", &*SENDER_PRIVATE) + .expect("failed to set property"); + enc.set_property("receiver-key", &*RECEIVER_PUBLIC) + .expect("failed to set property"); + enc.set_property("block-size", &1024u32) + .expect("failed to set property"); + + let mut h = gst_check::Harness::new_with_element(&enc, None, None); + h.add_element_src_pad(&enc.get_static_pad("src").expect("failed to get src pad")); + h.add_element_sink_pad(&enc.get_static_pad("sink").expect("failed to get src pad")); + h.set_src_caps_str("application/x-sodium-encrypted"); + + let buf = gst::Buffer::from_mut_slice(Vec::from(&input[..])); + + assert_eq!(h.push(buf), Ok(gst::FlowSuccess::Ok)); + h.push_event(gst::Event::new_eos().build()); + + // Query position at 0 + let q1 = enc.query_position::().unwrap(); + assert_eq!(gst::format::Bytes(Some(0)), q1); + + // Query position after a buffer pull + let buf1 = h.pull().unwrap(); + let s1 = buf1.get_size() as u64; + let q1 = enc.query_position::().unwrap(); + assert_eq!(gst::format::Bytes(Some(s1)), q1); + adapter.push(buf1); + + // Query position after 2 buffer pulls + let buf2 = h.pull().unwrap(); + let s2 = buf2.get_size() as u64; + let q2 = enc.query_position::().unwrap(); + // query pos == b1 + b2 len() + assert_eq!(gst::format::Bytes(Some(s1 + s2)), q2); + adapter.push(buf2); + + while let Some(buf) = h.pull() { + adapter.push(buf); + if adapter.available() >= expected_output.len() { + break; + } + } + + // Query position at eos + let q_eos = enc.query_position::().unwrap(); + assert_eq!( + gst::format::Bytes(Some(expected_output.len() as u64)), + q_eos + ); + assert_eq!(gst::format::Bytes(Some(adapter.available() as u64)), q_eos); +} + #[test] fn test_state_changes() { init(); -- GitLab From 9ffbc0896035fe6454b7c655794b4a3abe6a707e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 9 May 2019 12:29:19 +0300 Subject: [PATCH 2/2] WIP: Various fixups for the position query --- gst-plugin-sodium/src/encrypter.rs | 36 +++++++- gst-plugin-sodium/tests/encrypter.rs | 126 ++++++++++++++++++--------- 2 files changed, 115 insertions(+), 47 deletions(-) diff --git a/gst-plugin-sodium/src/encrypter.rs b/gst-plugin-sodium/src/encrypter.rs index 71299fda..56dbe3e1 100644 --- a/gst-plugin-sodium/src/encrypter.rs +++ b/gst-plugin-sodium/src/encrypter.rs @@ -100,6 +100,7 @@ struct State { precomputed_key: box_::PrecomputedKey, block_size: u32, write_headers: bool, + pending_bytes: u64, } impl State { @@ -153,6 +154,7 @@ impl State { nonce, block_size: props.block_size, write_headers: true, + pending_bytes: 0, }) } @@ -183,6 +185,7 @@ impl State { let buffer = self.adapter.take_buffer(block_size).unwrap(); let out_buf = self.encrypt_message(&buffer); + self.pending_bytes += out_buf.get_size() as u64; buffers.push(out_buf); } @@ -250,7 +253,9 @@ impl Encrypter { // Write the block_size into the stream headers.extend_from_slice(&state.block_size.to_le_bytes()); - buffers.push(gst::Buffer::from_mut_slice(headers)); + let out_buf = gst::Buffer::from_mut_slice(headers); + state.pending_bytes += out_buf.get_size() as u64; + buffers.push(out_buf); state.write_headers = false; } @@ -275,12 +280,24 @@ impl Encrypter { drop(state_guard); for buffer in buffers { + { + let mut state_guard = self.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); + state.pending_bytes -= buffer.get_size() as u64; + } + self.srcpad.push(buffer).map_err(|err| { gst_error!(CAT, obj: element, "Failed to push buffer {:?}", err); err })?; } + { + let mut state_guard = self.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); + assert_eq!(state.pending_bytes, 0); + } + Ok(gst::FlowSuccess::Ok) } @@ -416,14 +433,14 @@ impl Encrypter { } /* First let's query the bytes duration upstream */ - let mut q = gst::query::Query::new_position(gst::Format::Bytes); + let mut peer_query = gst::query::Query::new_position(gst::Format::Bytes); - if !self.sinkpad.peer_query(&mut q) { + if !self.sinkpad.peer_query(&mut peer_query) { gst_error!(CAT, "Failed to query upstream duration"); return false; } - let position = match q.get_result().try_into_bytes().unwrap() { + let position = match peer_query.get_result().try_into_bytes().unwrap() { gst::format::Bytes(Some(size)) => size, gst::format::Bytes(None) => { gst_error!(CAT, "Failed to query upstream duration"); @@ -440,12 +457,23 @@ impl Encrypter { Some(s) => s, }; + dbg!(position); + let position = position - state.adapter.available() as u64; dbg!(position); let chunk_index = position as u64 / state.block_size as u64; + dbg!(chunk_index); + dbg!(state.block_size); let position = position + (chunk_index * box_::MACBYTES as u64) + super::HEADERS_SIZE as u64; + dbg!(box_::MACBYTES); + dbg!(super::HEADERS_SIZE); + dbg!(position); + dbg!(state.pending_bytes); + let position = position - state.pending_bytes; + dbg!(gst::format::Bytes::from(position)); q.set(gst::format::Bytes::from(position)); + dbg!(q.get_structure()); true } _ => pad.query_default(element, query), diff --git a/gst-plugin-sodium/tests/encrypter.rs b/gst-plugin-sodium/tests/encrypter.rs index 61aa6a74..01f94bca 100644 --- a/gst-plugin-sodium/tests/encrypter.rs +++ b/gst-plugin-sodium/tests/encrypter.rs @@ -29,6 +29,9 @@ extern crate gstrssodium; use glib::prelude::*; use gst::prelude::*; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; + lazy_static! { static ref RECEIVER_PUBLIC: glib::Bytes = { let public = [ @@ -109,65 +112,102 @@ fn encrypt_file() { } #[test] -fn test_querries() { +fn test_queries() { init(); - let input = include_bytes!("sample.mp3"); - let expected_output = include_bytes!("encrypted_sample.enc"); - - let mut adapter = gst_base::UniqueAdapter::new(); + let input_path = { + let mut r = PathBuf::new(); + r.push(env!("CARGO_MANIFEST_DIR")); + r.push("tests"); + r.push("sample"); + r.set_extension("mp3"); + r + }; + let pipeline = gst::Pipeline::new(None); + let src = gst::ElementFactory::make("filesrc", None).unwrap(); + src.set_property("location", &input_path.to_str().unwrap()) + .unwrap(); let enc = gst::ElementFactory::make("rssodiumencrypter", None).unwrap(); + let sink = gst::ElementFactory::make("appsink", None).unwrap(); enc.set_property("sender-key", &*SENDER_PRIVATE) .expect("failed to set property"); enc.set_property("receiver-key", &*RECEIVER_PUBLIC) .expect("failed to set property"); enc.set_property("block-size", &1024u32) .expect("failed to set property"); + pipeline.add_many(&[&src, &enc, &sink]).unwrap(); + src.link_pads(Some("src"), &enc, Some("sink")).unwrap(); + enc.link_pads(Some("src"), &sink, Some("sink")).unwrap(); + let sink = sink.downcast::().unwrap(); + + let adapter = Arc::new(Mutex::new(gst_base::UniqueAdapter::new())); + + let adapter_clone = adapter.clone(); + let enc_clone = enc.clone(); + sink.set_callbacks( + gst_app::AppSinkCallbacks::new() + // Add a handler to the "new-sample" signal. + .new_sample(move |appsink| { + // Pull the sample in question out of the appsink's buffer. + let sample = appsink.pull_sample().ok_or(gst::FlowError::Eos)?; + let buffer = sample.get_buffer().ok_or(gst::FlowError::Error)?; + + let mut adapter = adapter_clone.lock().unwrap(); + adapter.push(buffer.to_owned()); + + let pos = enc_clone.query_position::().unwrap(); + dbg!(pos); + dbg!(adapter.available()); + assert_eq!(gst::format::Bytes(Some(adapter.available() as u64)), pos); + + let dur = enc_clone.query_duration::().unwrap(); + assert_eq!(gst::format::Bytes(Some(6043)), dur); + + Ok(gst::FlowSuccess::Ok) + }) + .build(), + ); - let mut h = gst_check::Harness::new_with_element(&enc, None, None); - h.add_element_src_pad(&enc.get_static_pad("src").expect("failed to get src pad")); - h.add_element_sink_pad(&enc.get_static_pad("sink").expect("failed to get src pad")); - h.set_src_caps_str("application/x-sodium-encrypted"); + pipeline.set_state(gst::State::Playing).unwrap(); + + let bus = pipeline.get_bus().unwrap(); + for msg in bus.iter_timed(gst::CLOCK_TIME_NONE) { + use gst::MessageView; + match msg.view() { + MessageView::Error(err) => { + eprintln!( + "Error received from element {:?}: {}", + err.get_src().map(|s| s.get_path_string()), + err.get_error() + ); + eprintln!("Debugging information: {:?}", err.get_debug()); + assert!(true); + break; + } + MessageView::Eos(..) => break, + _ => (), + } + } - let buf = gst::Buffer::from_mut_slice(Vec::from(&input[..])); + let expected_output = include_bytes!("encrypted_sample.enc"); + let mut adapter = adapter.lock().unwrap(); + let available = adapter.available(); - assert_eq!(h.push(buf), Ok(gst::FlowSuccess::Ok)); - h.push_event(gst::Event::new_eos().build()); + // Query position/duration at eos + let pos = enc.query_position::().unwrap(); + assert_eq!(gst::format::Bytes(Some(expected_output.len() as u64)), pos); + assert_eq!(gst::format::Bytes(Some(available as u64)), pos); - // Query position at 0 - let q1 = enc.query_position::().unwrap(); - assert_eq!(gst::format::Bytes(Some(0)), q1); - - // Query position after a buffer pull - let buf1 = h.pull().unwrap(); - let s1 = buf1.get_size() as u64; - let q1 = enc.query_position::().unwrap(); - assert_eq!(gst::format::Bytes(Some(s1)), q1); - adapter.push(buf1); - - // Query position after 2 buffer pulls - let buf2 = h.pull().unwrap(); - let s2 = buf2.get_size() as u64; - let q2 = enc.query_position::().unwrap(); - // query pos == b1 + b2 len() - assert_eq!(gst::format::Bytes(Some(s1 + s2)), q2); - adapter.push(buf2); + let dur = enc.query_duration::().unwrap(); + assert_eq!(gst::format::Bytes(Some(6043)), dur); - while let Some(buf) = h.pull() { - adapter.push(buf); - if adapter.available() >= expected_output.len() { - break; - } - } + assert_eq!(available, expected_output.len()); + let output_buffer = adapter.take_buffer(available).unwrap(); + let output = output_buffer.map_readable().unwrap(); + assert_eq!(expected_output.as_ref(), output.as_ref()); - // Query position at eos - let q_eos = enc.query_position::().unwrap(); - assert_eq!( - gst::format::Bytes(Some(expected_output.len() as u64)), - q_eos - ); - assert_eq!(gst::format::Bytes(Some(adapter.available() as u64)), q_eos); + pipeline.set_state(gst::State::Null).unwrap(); } #[test] -- GitLab