Commit 4afdd2c9 authored by Sebastian Dröge's avatar Sebastian Dröge 🍵
Browse files

Move thread-handling out of the actual Source/Sink implementation

parent af6d6f80
......@@ -25,6 +25,8 @@ use std::fmt::{Display, Formatter};
use std::fmt::Error as FmtError;
use std::borrow::Cow;
use url::Url;
use utils::*;
macro_rules! error_msg(
......@@ -115,6 +117,49 @@ impl ErrorMessage {
line: line,
}
}
pub unsafe fn post(&self, element: *mut c_void) {
extern "C" {
fn gst_rs_element_error(sink: *mut c_void,
error_domain: u32,
error_code: i32,
message: *const c_char,
debug: *const c_char,
filename: *const c_char,
function: *const c_char,
line: u32);
}
let ErrorMessage { error_domain,
error_code,
ref message,
ref debug,
filename,
function,
line } = *self;
let message_cstr = message.as_ref().map(|m| CString::new(m.as_bytes()).unwrap());
let message_ptr = message_cstr.as_ref().map_or(ptr::null(), |m| m.as_ptr());
let debug_cstr = debug.as_ref().map(|m| CString::new(m.as_bytes()).unwrap());
let debug_ptr = debug_cstr.as_ref().map_or(ptr::null(), |m| m.as_ptr());
let file_cstr = CString::new(filename.as_bytes()).unwrap();
let file_ptr = file_cstr.as_ptr();
let function_cstr = CString::new(function.as_bytes()).unwrap();
let function_ptr = function_cstr.as_ptr();
gst_rs_element_error(element,
error_domain,
error_code,
message_ptr,
debug_ptr,
file_ptr,
function_ptr,
line);
}
}
#[derive(Debug)]
......@@ -246,3 +291,5 @@ impl Error for UriError {
}
}
}
pub type UriValidator = Fn(&Url) -> Result<(), UriError>;
......@@ -43,3 +43,12 @@ GST_PLUGIN_DEFINE (GST_VERSION_MAJOR,
rsplugin,
"Rust Plugin", plugin_init, VERSION, "LGPL", GST_PACKAGE_NAME,
GST_PACKAGE_ORIGIN);
void
gst_rs_element_error (GstElement * element, GQuark error_domain,
gint error_code, const gchar * message, const gchar * debug,
const gchar * file, const gchar * function, guint line)
{
gst_element_message_full (element, GST_MESSAGE_ERROR, error_domain,
error_code, g_strdup (message), g_strdup (debug), file, function, line);
}
......@@ -17,21 +17,14 @@
// Boston, MA 02110-1301, USA.
use std::fs::File;
use std::path::PathBuf;
use url::Url;
use std::io::Write;
use std::sync::Mutex;
use std::convert::From;
use error::*;
use rssink::*;
#[derive(Debug)]
struct Settings {
location: Option<PathBuf>,
}
#[derive(Debug)]
enum StreamingState {
Stopped,
......@@ -40,68 +33,44 @@ enum StreamingState {
#[derive(Debug)]
pub struct FileSink {
controller: SinkController,
settings: Mutex<Settings>,
streaming_state: Mutex<StreamingState>,
streaming_state: StreamingState,
}
unsafe impl Sync for FileSink {}
unsafe impl Send for FileSink {}
impl FileSink {
pub fn new(controller: SinkController) -> FileSink {
FileSink {
controller: controller,
settings: Mutex::new(Settings { location: None }),
streaming_state: Mutex::new(StreamingState::Stopped),
}
pub fn new() -> FileSink {
FileSink { streaming_state: StreamingState::Stopped }
}
pub fn new_boxed(controller: SinkController) -> Box<Sink> {
Box::new(FileSink::new(controller))
pub fn new_boxed() -> Box<Sink> {
Box::new(FileSink::new())
}
}
impl Sink for FileSink {
fn get_controller(&self) -> &SinkController {
&self.controller
}
fn set_uri(&self, uri: Option<Url>) -> Result<(), UriError> {
let location = &mut self.settings.lock().unwrap().location;
*location = None;
match uri {
None => Ok(()),
Some(ref uri) => {
*location = Some(try!(uri.to_file_path()
.or_else(|_| {
Err(UriError::new(UriErrorKind::UnsupportedProtocol,
Some(format!("Unsupported file URI '{}'", uri.as_str()))))
})));
Ok(())
}
}
}
fn validate_uri(uri: &Url) -> Result<(), UriError> {
let _ = try!(uri.to_file_path()
.or_else(|_| {
Err(UriError::new(UriErrorKind::UnsupportedProtocol,
Some(format!("Unsupported file URI '{}'", uri.as_str()))))
}));
Ok(())
}
fn get_uri(&self) -> Option<Url> {
let location = &self.settings.lock().unwrap().location;
location.as_ref()
.map(|l| Url::from_file_path(l).ok())
.and_then(|i| i) // join()
impl Sink for FileSink {
fn uri_validator(&self) -> Box<UriValidator> {
Box::new(validate_uri)
}
fn start(&self) -> Result<(), ErrorMessage> {
let location = &self.settings.lock().unwrap().location;
let mut streaming_state = self.streaming_state.lock().unwrap();
if let StreamingState::Started { .. } = *streaming_state {
fn start(&mut self, uri: &Url) -> Result<(), ErrorMessage> {
if let StreamingState::Started { .. } = self.streaming_state {
return Err(error_msg!(SinkError::Failure, ["Sink already started"]));
}
let location = &try!(location.as_ref().ok_or_else(|| {
error_msg!(SinkError::Failure, ["No URI provided"])
}));
let location = try!(uri.to_file_path()
.or_else(|_| {
Err(error_msg!(SinkError::Failure,
["Unsupported file URI '{}'", uri.as_str()]))
}));
let file = try!(File::create(location.as_path()).or_else(|err| {
Err(error_msg!(SinkError::OpenFailed,
......@@ -110,7 +79,7 @@ impl Sink for FileSink {
err.to_string()]))
}));
*streaming_state = StreamingState::Started {
self.streaming_state = StreamingState::Started {
file: file,
position: 0,
};
......@@ -118,17 +87,14 @@ impl Sink for FileSink {
Ok(())
}
fn stop(&self) -> Result<(), ErrorMessage> {
let mut streaming_state = self.streaming_state.lock().unwrap();
*streaming_state = StreamingState::Stopped;
fn stop(&mut self) -> Result<(), ErrorMessage> {
self.streaming_state = StreamingState::Stopped;
Ok(())
}
fn render(&self, data: &[u8]) -> Result<(), FlowError> {
let mut streaming_state = self.streaming_state.lock().unwrap();
let (file, position) = match *streaming_state {
fn render(&mut self, data: &[u8]) -> Result<(), FlowError> {
let (file, position) = match self.streaming_state {
StreamingState::Started { ref mut file, ref mut position } => (file, position),
StreamingState::Stopped => {
return Err(FlowError::Error(error_msg!(SinkError::Failure, ["Not started yet"])));
......@@ -140,6 +106,7 @@ impl Sink for FileSink {
}));
*position += data.len() as u64;
Ok(())
}
}
......@@ -18,18 +18,11 @@
use std::u64;
use std::io::{Read, Seek, SeekFrom};
use std::fs::File;
use std::path::PathBuf;
use std::sync::Mutex;
use url::Url;
use error::*;
use rssource::*;
#[derive(Debug)]
struct Settings {
location: Option<PathBuf>,
}
#[derive(Debug)]
enum StreamingState {
Stopped,
......@@ -38,58 +31,31 @@ enum StreamingState {
#[derive(Debug)]
pub struct FileSrc {
controller: SourceController,
settings: Mutex<Settings>,
streaming_state: Mutex<StreamingState>,
streaming_state: StreamingState,
}
unsafe impl Sync for FileSrc {}
unsafe impl Send for FileSrc {}
impl FileSrc {
pub fn new(controller: SourceController) -> FileSrc {
FileSrc {
controller: controller,
settings: Mutex::new(Settings { location: None }),
streaming_state: Mutex::new(StreamingState::Stopped),
}
pub fn new() -> FileSrc {
FileSrc { streaming_state: StreamingState::Stopped }
}
pub fn new_boxed(controller: SourceController) -> Box<Source> {
Box::new(FileSrc::new(controller))
pub fn new_boxed() -> Box<Source> {
Box::new(FileSrc::new())
}
}
impl Source for FileSrc {
fn get_controller(&self) -> &SourceController {
&self.controller
}
fn set_uri(&self, uri: Option<Url>) -> Result<(), UriError> {
let location = &mut self.settings.lock().unwrap().location;
match uri {
None => {
*location = None;
Ok(())
}
Some(ref uri) => {
*location = Some(try!(uri.to_file_path()
.or_else(|_| {
Err(UriError::new(UriErrorKind::UnsupportedProtocol,
Some(format!("Unsupported file URI '{}'", uri.as_str()))))
})));
Ok(())
}
}
}
fn get_uri(&self) -> Option<Url> {
let location = &self.settings.lock().unwrap().location;
fn validate_uri(uri: &Url) -> Result<(), UriError> {
let _ = try!(uri.to_file_path()
.or_else(|_| {
Err(UriError::new(UriErrorKind::UnsupportedProtocol,
Some(format!("Unsupported file URI '{}'", uri.as_str()))))
}));
Ok(())
}
location.as_ref()
.map(|l| Url::from_file_path(l).ok())
.and_then(|i| i) // join()
impl Source for FileSrc {
fn uri_validator(&self) -> Box<UriValidator> {
Box::new(validate_uri)
}
fn is_seekable(&self) -> bool {
......@@ -97,9 +63,7 @@ impl Source for FileSrc {
}
fn get_size(&self) -> u64 {
let streaming_state = self.streaming_state.lock().unwrap();
if let StreamingState::Started { ref file, .. } = *streaming_state {
if let StreamingState::Started { ref file, .. } = self.streaming_state {
file.metadata()
.ok()
.map_or(u64::MAX, |m| m.len())
......@@ -108,16 +72,16 @@ impl Source for FileSrc {
}
}
fn start(&self) -> Result<(), ErrorMessage> {
let location = &self.settings.lock().unwrap().location;
let mut streaming_state = self.streaming_state.lock().unwrap();
if let StreamingState::Started { .. } = *streaming_state {
fn start(&mut self, uri: &Url) -> Result<(), ErrorMessage> {
if let StreamingState::Started { .. } = self.streaming_state {
return Err(error_msg!(SourceError::Failure, ["Source already started"]));
}
let location = &try!(location.as_ref()
.ok_or_else(|| error_msg!(SourceError::Failure, ["No URI provided"])));
let location = try!(uri.to_file_path()
.or_else(|_| {
Err(error_msg!(SourceError::Failure,
["Unsupported file URI '{}'", uri.as_str()]))
}));
let file = try!(File::open(location.as_path()).or_else(|err| {
Err(error_msg!(SourceError::OpenFailed,
......@@ -126,7 +90,7 @@ impl Source for FileSrc {
err.to_string()]))
}));
*streaming_state = StreamingState::Started {
self.streaming_state = StreamingState::Started {
file: file,
position: 0,
};
......@@ -134,18 +98,14 @@ impl Source for FileSrc {
Ok(())
}
fn stop(&self) -> Result<(), ErrorMessage> {
let mut streaming_state = self.streaming_state.lock().unwrap();
*streaming_state = StreamingState::Stopped;
fn stop(&mut self) -> Result<(), ErrorMessage> {
self.streaming_state = StreamingState::Stopped;
Ok(())
}
fn fill(&self, offset: u64, data: &mut [u8]) -> Result<usize, FlowError> {
let mut streaming_state = self.streaming_state.lock().unwrap();
let (file, position) = match *streaming_state {
fn fill(&mut self, offset: u64, data: &mut [u8]) -> Result<usize, FlowError> {
let (file, position) = match self.streaming_state {
StreamingState::Started { ref mut file, ref mut position } => (file, position),
StreamingState::Stopped => {
return Err(FlowError::Error(error_msg!(SourceError::Failure, ["Not started yet"])));
......@@ -172,7 +132,7 @@ impl Source for FileSrc {
Ok(size)
}
fn do_seek(&self, _: u64, _: u64) -> Result<(), ErrorMessage> {
fn seek(&mut self, _: u64, _: u64) -> Result<(), ErrorMessage> {
Ok(())
}
}
......@@ -23,20 +23,14 @@ use hyper::header::{ContentLength, ContentRange, ContentRangeSpec, Range, ByteRa
use hyper::client::Client;
use hyper::client::response::Response;
use std::sync::Mutex;
use error::*;
use rssource::*;
#[derive(Debug)]
struct Settings {
url: Option<Url>,
}
#[derive(Debug)]
enum StreamingState {
Stopped,
Started {
uri: Url,
response: Response,
seekable: bool,
position: u64,
......@@ -48,37 +42,24 @@ enum StreamingState {
#[derive(Debug)]
pub struct HttpSrc {
controller: SourceController,
settings: Mutex<Settings>,
streaming_state: Mutex<StreamingState>,
streaming_state: StreamingState,
client: Client,
}
unsafe impl Sync for HttpSrc {}
unsafe impl Send for HttpSrc {}
impl HttpSrc {
pub fn new(controller: SourceController) -> HttpSrc {
pub fn new() -> HttpSrc {
HttpSrc {
controller: controller,
settings: Mutex::new(Settings { url: None }),
streaming_state: Mutex::new(StreamingState::Stopped),
streaming_state: StreamingState::Stopped,
client: Client::new(),
}
}
pub fn new_boxed(controller: SourceController) -> Box<Source> {
Box::new(HttpSrc::new(controller))
pub fn new_boxed() -> Box<Source> {
Box::new(HttpSrc::new())
}
fn do_request(&self, start: u64, stop: u64) -> Result<StreamingState, ErrorMessage> {
let url = &self.settings.lock().unwrap().url;
let url = try!(url.as_ref()
.ok_or_else(|| error_msg!(SourceError::Failure, ["No URI provided"])));
let mut req = self.client.get(url.clone());
fn do_request(&self, uri: Url, start: u64, stop: u64) -> Result<StreamingState, ErrorMessage> {
let mut req = self.client.get(uri.clone());
if start != 0 || stop != u64::MAX {
req = if stop == u64::MAX {
......@@ -90,12 +71,12 @@ impl HttpSrc {
let response = try!(req.send().or_else(|err| {
Err(error_msg!(SourceError::ReadFailed,
["Failed to fetch {}: {}", url, err.to_string()]))
["Failed to fetch {}: {}", uri, err.to_string()]))
}));
if !response.status.is_success() {
return Err(error_msg!(SourceError::ReadFailed,
["Failed to fetch {}: {}", url, response.status]));
["Failed to fetch {}: {}", uri, response.status]));
}
let size = if let Some(&ContentLength(content_length)) = response.headers.get() {
......@@ -128,6 +109,7 @@ impl HttpSrc {
}
Ok(StreamingState::Started {
uri: uri,
response: response,
seekable: seekable,
position: 0,
......@@ -138,95 +120,69 @@ impl HttpSrc {
}
}
impl Source for HttpSrc {
fn get_controller(&self) -> &SourceController {
&self.controller
fn validate_uri(uri: &Url) -> Result<(), UriError> {
if uri.scheme() != "http" && uri.scheme() != "https" {
return Err(UriError::new(UriErrorKind::UnsupportedProtocol,
Some(format!("Unsupported URI '{}'", uri.as_str()))));
}
fn set_uri(&self, uri: Option<Url>) -> Result<(), UriError> {
let url = &mut self.settings.lock().unwrap().url;
match uri {
None => {
*url = None;
Ok(())
}
Some(uri) => {
if uri.scheme() != "http" && uri.scheme() != "https" {
*url = None;
return Err(UriError::new(UriErrorKind::UnsupportedProtocol,
Some(format!("Unsupported URI '{}'", uri.as_str()))));
}
*url = Some(uri);
Ok(())
}
}
}
Ok(())
}
fn get_uri(&self) -> Option<Url> {
let url = &self.settings.lock().unwrap().url;
url.as_ref().cloned()
impl Source for HttpSrc {
fn uri_validator(&self) -> Box<UriValidator> {
Box::new(validate_uri)
}
fn is_seekable(&self) -> bool {
let streaming_state = self.streaming_state.lock().unwrap();
match *streaming_state {
match self.streaming_state {
StreamingState::Started { seekable, .. } => seekable,
_ => false,
}
}
fn get_size(&self) -> u64 {
let streaming_state = self.streaming_state.lock().unwrap();
match *streaming_state {
match self.streaming_state {
StreamingState::Started { size, .. } => size,
_ => u64::MAX,
}
}
fn start(&self) -> Result<(), ErrorMessage> {
let mut streaming_state = self.streaming_state.lock().unwrap();
*streaming_state = StreamingState::Stopped;
let new_state = try!(self.do_request(0, u64::MAX));
fn start(&mut self, uri: &Url) -> Result<(), ErrorMessage> {
self.streaming_state = StreamingState::Stopped;
self.streaming_state = try!(self.do_request(uri.clone(), 0, u64::MAX));
*streaming_state = new_state;
Ok(())
}
fn stop(&self) -> Result<(), ErrorMessage> {
let mut streaming_state = self.streaming_state.lock().unwrap();
*streaming_state = StreamingState::Stopped;
fn stop(&mut self) -> Result<(), ErrorMessage> {
self.streaming_state = StreamingState::Stopped;
Ok(())
}
fn do_seek(&self, start: u64, stop: u64) -> Result<(), ErrorMessage> {
let mut streaming_state = self.streaming_state.lock().unwrap();
*streaming_state = StreamingState::Stopped;
fn seek(&mut self, start: u64, stop: u64) -> Result<(), ErrorMessage> {
let (position, old_stop, uri) = match self.streaming_state {
StreamingState::Started { position, stop, ref uri, .. } => {
(position, stop, uri.clone())
}
StreamingState::Stopped => {
return Err(error_msg!(SourceError::Failure, ["Not started yet"]));
}
};
if position == start && old_stop == stop {
return Ok(());
}
let new_state = try!(self.do_request(start, stop));
self.streaming_state = StreamingState::Stopped;
self.streaming_state = try!(self.do_request(uri, start, stop));
*streaming_state = new_state;
Ok(())
}
fn fill(&self, offset: u64, data: &mut [u8]) -> Result<usize, FlowError> {
let mut streaming_state = self.streaming_state.lock().unwrap();
if let StreamingState::Started { position, stop, .. } = *streaming_state {
if position != offset {
*streaming_state = StreamingState::Stopped;
let new_state = try!(self.do_request(offset, stop)
.or_else(|err| Err(FlowError::Error(err))));
*streaming_state = new_state;
}
}
let (response, position) = match *streaming_state {
fn fill(&mut self, offset: u64, data: &mut [u8]) -> Result<usize, FlowError> {
let (response, position) = match self.streaming_state {
StreamingState::Started { ref mut response, ref mut position, .. } => {