diff --git a/gst-plugin-sodium/src/encrypter.rs b/gst-plugin-sodium/src/encrypter.rs index 565687ec14539e5772695db5d57c159e8c396ff6..56dbe3e15eda25d29c808b4ce35fa9dde2407145 100644 --- a/gst-plugin-sodium/src/encrypter.rs +++ b/gst-plugin-sodium/src/encrypter.rs @@ -100,6 +100,7 @@ struct State { precomputed_key: box_::PrecomputedKey, block_size: u32, write_headers: bool, + pending_bytes: u64, } impl State { @@ -153,6 +154,7 @@ impl State { nonce, block_size: props.block_size, write_headers: true, + pending_bytes: 0, }) } @@ -183,6 +185,7 @@ impl State { let buffer = self.adapter.take_buffer(block_size).unwrap(); let out_buf = self.encrypt_message(&buffer); + self.pending_bytes += out_buf.get_size() as u64; buffers.push(out_buf); } @@ -250,7 +253,9 @@ impl Encrypter { // Write the block_size into the stream headers.extend_from_slice(&state.block_size.to_le_bytes()); - buffers.push(gst::Buffer::from_mut_slice(headers)); + let out_buf = gst::Buffer::from_mut_slice(headers); + state.pending_bytes += out_buf.get_size() as u64; + buffers.push(out_buf); state.write_headers = false; } @@ -275,12 +280,24 @@ impl Encrypter { drop(state_guard); for buffer in buffers { + { + let mut state_guard = self.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); + state.pending_bytes -= buffer.get_size() as u64; + } + self.srcpad.push(buffer).map_err(|err| { gst_error!(CAT, obj: element, "Failed to push buffer {:?}", err); err })?; } + { + let mut state_guard = self.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); + assert_eq!(state.pending_bytes, 0); + } + Ok(gst::FlowSuccess::Ok) } @@ -389,8 +406,8 @@ impl Encrypter { } }; - let state = self.state.lock().unwrap(); - let state = match state.as_ref() { + let state_mutex = self.state.lock().unwrap(); + let state = match state_mutex.as_ref() { // If state isn't set, it means that the // element hasn't been activated yet. None => return false, @@ -410,6 +427,55 @@ impl Encrypter { true } + QueryView::Position(ref mut q) => { + if q.get_format() != gst::Format::Bytes { + return pad.query_default(element, query); + } + + /* First let's query the bytes duration upstream */ + let mut peer_query = gst::query::Query::new_position(gst::Format::Bytes); + + if !self.sinkpad.peer_query(&mut peer_query) { + gst_error!(CAT, "Failed to query upstream duration"); + return false; + } + + let position = match peer_query.get_result().try_into_bytes().unwrap() { + gst::format::Bytes(Some(size)) => size, + gst::format::Bytes(None) => { + gst_error!(CAT, "Failed to query upstream duration"); + + return false; + } + }; + + let state_mutex = self.state.lock().unwrap(); + let state = match state_mutex.as_ref() { + // If state isn't set, it means that the + // element hasn't been activated yet. + None => return false, + Some(s) => s, + }; + + dbg!(position); + let position = position - state.adapter.available() as u64; + dbg!(position); + let chunk_index = position as u64 / state.block_size as u64; + dbg!(chunk_index); + dbg!(state.block_size); + let position = + position + (chunk_index * box_::MACBYTES as u64) + super::HEADERS_SIZE as u64; + dbg!(box_::MACBYTES); + dbg!(super::HEADERS_SIZE); + dbg!(position); + dbg!(state.pending_bytes); + let position = position - state.pending_bytes; + dbg!(gst::format::Bytes::from(position)); + + q.set(gst::format::Bytes::from(position)); + dbg!(q.get_structure()); + true + } _ => pad.query_default(element, query), } } diff --git a/gst-plugin-sodium/tests/encrypter.rs b/gst-plugin-sodium/tests/encrypter.rs index 93854e5fba174cd47f05e59c8bf8d50d8ba05368..01f94bca52e58fdb9dcd27ef93cab60393e0e4d3 100644 --- a/gst-plugin-sodium/tests/encrypter.rs +++ b/gst-plugin-sodium/tests/encrypter.rs @@ -29,6 +29,9 @@ extern crate gstrssodium; use glib::prelude::*; use gst::prelude::*; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; + lazy_static! { static ref RECEIVER_PUBLIC: glib::Bytes = { let public = [ @@ -108,6 +111,105 @@ fn encrypt_file() { assert_eq!(map.as_ref(), expected_output.as_ref()); } +#[test] +fn test_queries() { + init(); + + let input_path = { + let mut r = PathBuf::new(); + r.push(env!("CARGO_MANIFEST_DIR")); + r.push("tests"); + r.push("sample"); + r.set_extension("mp3"); + r + }; + + let pipeline = gst::Pipeline::new(None); + let src = gst::ElementFactory::make("filesrc", None).unwrap(); + src.set_property("location", &input_path.to_str().unwrap()) + .unwrap(); + let enc = gst::ElementFactory::make("rssodiumencrypter", None).unwrap(); + let sink = gst::ElementFactory::make("appsink", None).unwrap(); + enc.set_property("sender-key", &*SENDER_PRIVATE) + .expect("failed to set property"); + enc.set_property("receiver-key", &*RECEIVER_PUBLIC) + .expect("failed to set property"); + enc.set_property("block-size", &1024u32) + .expect("failed to set property"); + pipeline.add_many(&[&src, &enc, &sink]).unwrap(); + src.link_pads(Some("src"), &enc, Some("sink")).unwrap(); + enc.link_pads(Some("src"), &sink, Some("sink")).unwrap(); + let sink = sink.downcast::().unwrap(); + + let adapter = Arc::new(Mutex::new(gst_base::UniqueAdapter::new())); + + let adapter_clone = adapter.clone(); + let enc_clone = enc.clone(); + sink.set_callbacks( + gst_app::AppSinkCallbacks::new() + // Add a handler to the "new-sample" signal. + .new_sample(move |appsink| { + // Pull the sample in question out of the appsink's buffer. + let sample = appsink.pull_sample().ok_or(gst::FlowError::Eos)?; + let buffer = sample.get_buffer().ok_or(gst::FlowError::Error)?; + + let mut adapter = adapter_clone.lock().unwrap(); + adapter.push(buffer.to_owned()); + + let pos = enc_clone.query_position::().unwrap(); + dbg!(pos); + dbg!(adapter.available()); + assert_eq!(gst::format::Bytes(Some(adapter.available() as u64)), pos); + + let dur = enc_clone.query_duration::().unwrap(); + assert_eq!(gst::format::Bytes(Some(6043)), dur); + + Ok(gst::FlowSuccess::Ok) + }) + .build(), + ); + + pipeline.set_state(gst::State::Playing).unwrap(); + + let bus = pipeline.get_bus().unwrap(); + for msg in bus.iter_timed(gst::CLOCK_TIME_NONE) { + use gst::MessageView; + match msg.view() { + MessageView::Error(err) => { + eprintln!( + "Error received from element {:?}: {}", + err.get_src().map(|s| s.get_path_string()), + err.get_error() + ); + eprintln!("Debugging information: {:?}", err.get_debug()); + assert!(true); + break; + } + MessageView::Eos(..) => break, + _ => (), + } + } + + let expected_output = include_bytes!("encrypted_sample.enc"); + let mut adapter = adapter.lock().unwrap(); + let available = adapter.available(); + + // Query position/duration at eos + let pos = enc.query_position::().unwrap(); + assert_eq!(gst::format::Bytes(Some(expected_output.len() as u64)), pos); + assert_eq!(gst::format::Bytes(Some(available as u64)), pos); + + let dur = enc.query_duration::().unwrap(); + assert_eq!(gst::format::Bytes(Some(6043)), dur); + + assert_eq!(available, expected_output.len()); + let output_buffer = adapter.take_buffer(available).unwrap(); + let output = output_buffer.map_readable().unwrap(); + assert_eq!(expected_output.as_ref(), output.as_ref()); + + pipeline.set_state(gst::State::Null).unwrap(); +} + #[test] fn test_state_changes() { init();