Commit 7e743303 authored by Sebastian Dröge's avatar Sebastian Dröge 🍵
Browse files

Port everything over to gstreamer-rs

And remove all unneeded code and unused external crates.
parent 5ae66e19
......@@ -8,7 +8,7 @@ license = "MIT/Apache-2.0"
[dependencies]
url = "1.1"
gst-plugin = { path="../gst-plugin" }
slog = "2.0"
gstreamer = { git = "https://github.com/sdroege/gstreamer-rs", features = ["v1_10"] }
[lib]
name = "gstrsfile"
......
......@@ -15,11 +15,9 @@ use std::convert::From;
use gst_plugin::error::*;
use gst_plugin::sink::*;
use gst_plugin::buffer::*;
use gst_plugin::utils::*;
use gst_plugin::log::*;
use slog::Logger;
use gst;
use gst::prelude::*;
#[derive(Debug)]
enum StreamingState {
......@@ -30,30 +28,31 @@ enum StreamingState {
#[derive(Debug)]
pub struct FileSink {
streaming_state: StreamingState,
logger: Logger,
cat: gst::DebugCategory,
}
impl FileSink {
pub fn new(element: Element) -> FileSink {
pub fn new(_sink: &RsSinkWrapper) -> FileSink {
FileSink {
streaming_state: StreamingState::Stopped,
logger: Logger::root(
GstDebugDrain::new(Some(&element), "rsfilesink", 0, "Rust file sink"),
o!(),
cat: gst::DebugCategory::new(
"rsfilesink",
gst::DebugColorFlags::empty(),
"Rust file source",
),
}
}
pub fn new_boxed(element: Element) -> Box<Sink> {
Box::new(FileSink::new(element))
pub fn new_boxed(sink: &RsSinkWrapper) -> Box<Sink> {
Box::new(FileSink::new(sink))
}
}
fn validate_uri(uri: &Url) -> Result<(), UriError> {
let _ = try!(uri.to_file_path().or_else(|_| {
Err(UriError::new(
UriErrorKind::UnsupportedProtocol,
Some(format!("Unsupported file URI '{}'", uri.as_str())),
gst::URIError::UnsupportedProtocol,
format!("Unsupported file URI '{}'", uri.as_str()),
))
}));
Ok(())
......@@ -64,28 +63,37 @@ impl Sink for FileSink {
Box::new(validate_uri)
}
fn start(&mut self, uri: Url) -> Result<(), ErrorMessage> {
fn start(&mut self, sink: &RsSinkWrapper, uri: Url) -> Result<(), ErrorMessage> {
if let StreamingState::Started { .. } = self.streaming_state {
return Err(error_msg!(SinkError::Failure, ["Sink already started"]));
return Err(error_msg!(
gst::LibraryError::Failed,
["Sink already started"]
));
}
let location = try!(uri.to_file_path().or_else(|_| {
error!(self.logger, "Unsupported file URI '{}'", uri.as_str());
gst_error!(
self.cat,
obj: sink,
"Unsupported file URI '{}'",
uri.as_str()
);
Err(error_msg!(
SinkError::Failure,
gst::LibraryError::Failed,
["Unsupported file URI '{}'", uri.as_str()]
))
}));
let file = try!(File::create(location.as_path()).or_else(|err| {
error!(
self.logger,
gst_error!(
self.cat,
obj: sink,
"Could not open file for writing: {}",
err.to_string()
);
Err(error_msg!(
SinkError::OpenFailed,
gst::ResourceError::OpenWrite,
[
"Could not open file for writing '{}': {}",
location.to_str().unwrap_or("Non-UTF8 path"),
......@@ -94,7 +102,7 @@ impl Sink for FileSink {
))
}));
debug!(self.logger, "Opened file {:?}", file);
gst_debug!(self.cat, obj: sink, "Opened file {:?}", file);
self.streaming_state = StreamingState::Started {
file: file,
......@@ -104,17 +112,17 @@ impl Sink for FileSink {
Ok(())
}
fn stop(&mut self) -> Result<(), ErrorMessage> {
fn stop(&mut self, _sink: &RsSinkWrapper) -> Result<(), ErrorMessage> {
self.streaming_state = StreamingState::Stopped;
Ok(())
}
fn render(&mut self, buffer: &Buffer) -> Result<(), FlowError> {
let logger = &self.logger;
fn render(&mut self, sink: &RsSinkWrapper, buffer: &gst::BufferRef) -> Result<(), FlowError> {
let cat = self.cat;
let streaming_state = &mut self.streaming_state;
trace!(logger, "Rendering {:?}", buffer);
gst_trace!(cat, obj: sink, "Rendering {:?}", buffer);
let (file, position) = match *streaming_state {
StreamingState::Started {
......@@ -123,25 +131,26 @@ impl Sink for FileSink {
} => (file, position),
StreamingState::Stopped => {
return Err(FlowError::Error(
error_msg!(SinkError::Failure, ["Not started yet"]),
error_msg!(gst::LibraryError::Failed, ["Not started yet"]),
));
}
};
let map = match buffer.map_read() {
let map = match buffer.map_readable() {
None => {
return Err(FlowError::Error(
error_msg!(SinkError::Failure, ["Failed to map buffer"]),
));
return Err(FlowError::Error(error_msg!(
gst::LibraryError::Failed,
["Failed to map buffer"]
)));
}
Some(map) => map,
};
let data = map.as_slice();
try!(file.write_all(data).or_else(|err| {
error!(logger, "Failed to write: {}", err);
gst_error!(cat, obj: sink, "Failed to write: {}", err);
Err(FlowError::Error(error_msg!(
SinkError::WriteFailed,
gst::ResourceError::Write,
["Failed to write: {}", err]
)))
}));
......
......@@ -13,11 +13,9 @@ use url::Url;
use gst_plugin::error::*;
use gst_plugin::source::*;
use gst_plugin::buffer::*;
use gst_plugin::log::*;
use gst_plugin::utils::*;
use slog::Logger;
use gst;
use gst::prelude::*;
#[derive(Debug)]
enum StreamingState {
......@@ -28,30 +26,31 @@ enum StreamingState {
#[derive(Debug)]
pub struct FileSrc {
streaming_state: StreamingState,
logger: Logger,
cat: gst::DebugCategory,
}
impl FileSrc {
pub fn new(element: Element) -> FileSrc {
pub fn new(_src: &RsSrcWrapper) -> FileSrc {
FileSrc {
streaming_state: StreamingState::Stopped,
logger: Logger::root(
GstDebugDrain::new(Some(&element), "rsfilesrc", 0, "Rust file source"),
o!(),
cat: gst::DebugCategory::new(
"rsfilesrc",
gst::DebugColorFlags::empty(),
"Rust file source",
),
}
}
pub fn new_boxed(element: Element) -> Box<Source> {
Box::new(FileSrc::new(element))
pub fn new_boxed(src: &RsSrcWrapper) -> Box<Source> {
Box::new(FileSrc::new(src))
}
}
fn validate_uri(uri: &Url) -> Result<(), UriError> {
let _ = try!(uri.to_file_path().or_else(|_| {
Err(UriError::new(
UriErrorKind::UnsupportedProtocol,
Some(format!("Unsupported file URI '{}'", uri.as_str())),
gst::URIError::UnsupportedProtocol,
format!("Unsupported file URI '{}'", uri.as_str()),
))
}));
Ok(())
......@@ -62,11 +61,11 @@ impl Source for FileSrc {
Box::new(validate_uri)
}
fn is_seekable(&self) -> bool {
fn is_seekable(&self, _src: &RsSrcWrapper) -> bool {
true
}
fn get_size(&self) -> Option<u64> {
fn get_size(&self, _src: &RsSrcWrapper) -> Option<u64> {
if let StreamingState::Started { ref file, .. } = self.streaming_state {
file.metadata().ok().map(|m| m.len())
} else {
......@@ -74,27 +73,36 @@ impl Source for FileSrc {
}
}
fn start(&mut self, uri: Url) -> Result<(), ErrorMessage> {
fn start(&mut self, src: &RsSrcWrapper, uri: Url) -> Result<(), ErrorMessage> {
if let StreamingState::Started { .. } = self.streaming_state {
return Err(error_msg!(SourceError::Failure, ["Source already started"]));
return Err(error_msg!(
gst::LibraryError::Failed,
["Source already started"]
));
}
let location = try!(uri.to_file_path().or_else(|_| {
error!(self.logger, "Unsupported file URI '{}'", uri.as_str());
gst_error!(
self.cat,
obj: src,
"Unsupported file URI '{}'",
uri.as_str()
);
Err(error_msg!(
SourceError::Failure,
gst::LibraryError::Failed,
["Unsupported file URI '{}'", uri.as_str()]
))
}));
let file = try!(File::open(location.as_path()).or_else(|err| {
error!(
self.logger,
gst_error!(
self.cat,
obj: src,
"Could not open file for reading: {}",
err.to_string()
);
Err(error_msg!(
SourceError::OpenFailed,
gst::ResourceError::OpenRead,
[
"Could not open file for reading '{}': {}",
location.to_str().unwrap_or("Non-UTF8 path"),
......@@ -103,7 +111,7 @@ impl Source for FileSrc {
))
}));
debug!(self.logger, "Opened file {:?}", file);
gst_debug!(self.cat, obj: src, "Opened file {:?}", file);
self.streaming_state = StreamingState::Started {
file: file,
......@@ -113,14 +121,20 @@ impl Source for FileSrc {
Ok(())
}
fn stop(&mut self) -> Result<(), ErrorMessage> {
fn stop(&mut self, _src: &RsSrcWrapper) -> Result<(), ErrorMessage> {
self.streaming_state = StreamingState::Stopped;
Ok(())
}
fn fill(&mut self, offset: u64, _: u32, buffer: &mut Buffer) -> Result<(), FlowError> {
let logger = &self.logger;
fn fill(
&mut self,
src: &RsSrcWrapper,
offset: u64,
_: u32,
buffer: &mut gst::BufferRef,
) -> Result<(), FlowError> {
let cat = self.cat;
let streaming_state = &mut self.streaming_state;
let (file, position) = match *streaming_state {
......@@ -130,16 +144,16 @@ impl Source for FileSrc {
} => (file, position),
StreamingState::Stopped => {
return Err(FlowError::Error(
error_msg!(SourceError::Failure, ["Not started yet"]),
error_msg!(gst::LibraryError::Failed, ["Not started yet"]),
));
}
};
if *position != offset {
try!(file.seek(SeekFrom::Start(offset)).or_else(|err| {
error!(logger, "Failed to seek to {}: {:?}", offset, err);
gst_error!(cat, obj: src, "Failed to seek to {}: {:?}", offset, err);
Err(FlowError::Error(error_msg!(
SourceError::SeekFailed,
gst::ResourceError::Seek,
["Failed to seek to {}: {}", offset, err.to_string()]
)))
}));
......@@ -147,11 +161,12 @@ impl Source for FileSrc {
}
let size = {
let mut map = match buffer.map_readwrite() {
let mut map = match buffer.map_writable() {
None => {
return Err(FlowError::Error(
error_msg!(SourceError::Failure, ["Failed to map buffer"]),
));
return Err(FlowError::Error(error_msg!(
gst::LibraryError::Failed,
["Failed to map buffer"]
)));
}
Some(map) => map,
};
......@@ -159,9 +174,9 @@ impl Source for FileSrc {
let data = map.as_mut_slice();
try!(file.read(data).or_else(|err| {
error!(logger, "Failed to read: {:?}", err);
gst_error!(cat, obj: src, "Failed to read: {:?}", err);
Err(FlowError::Error(error_msg!(
SourceError::ReadFailed,
gst::ResourceError::Read,
["Failed to read at {}: {}", offset, err.to_string()]
)))
}))
......@@ -174,7 +189,7 @@ impl Source for FileSrc {
Ok(())
}
fn seek(&mut self, _: u64, _: Option<u64>) -> Result<(), ErrorMessage> {
fn seek(&mut self, _src: &RsSrcWrapper, _: u64, _: Option<u64>) -> Result<(), ErrorMessage> {
Ok(())
}
}
......@@ -11,10 +11,9 @@
#[macro_use]
extern crate gst_plugin;
#[macro_use]
extern crate slog;
extern crate gstreamer as gst;
extern crate url;
use gst_plugin::plugin::*;
use gst_plugin::source::*;
use gst_plugin::sink::*;
......@@ -24,7 +23,7 @@ mod filesink;
use filesrc::FileSrc;
use filesink::FileSink;
fn plugin_init(plugin: &Plugin) -> bool {
fn plugin_init(plugin: &gst::Plugin) -> bool {
source_register(
plugin,
SourceInfo {
......
......@@ -8,7 +8,8 @@ license = "MIT/Apache-2.0"
[dependencies]
url = "1.1"
gst-plugin = { path="../gst-plugin" }
slog = "2.0"
gstreamer = { git = "https://github.com/sdroege/gstreamer-rs", features = ["v1_10"] }
num-rational = { version = "0.1", default-features = false, features = [] }
nom = "3.0"
flavors = {git = "https://github.com/rust-av/flavors.git"}
muldiv = "0.1"
......
This diff is collapsed.
......@@ -11,22 +11,21 @@
extern crate flavors;
#[macro_use]
extern crate gst_plugin;
extern crate muldiv;
#[macro_use]
extern crate gstreamer as gst;
extern crate muldiv;
extern crate nom;
#[macro_use]
extern crate slog;
extern crate num_rational;
extern crate url;
use gst_plugin::plugin::*;
use gst_plugin::demuxer::*;
use gst_plugin::caps::*;
use gst::prelude::*;
mod flvdemux;
use flvdemux::FlvDemux;
fn plugin_init(plugin: &Plugin) -> bool {
fn plugin_init(plugin: &gst::Plugin) -> bool {
demuxer_register(
plugin,
&DemuxerInfo {
......@@ -37,8 +36,8 @@ fn plugin_init(plugin: &Plugin) -> bool {
author: "Sebastian Dröge <sebastian@centricular.com>",
rank: 256 + 100,
create_instance: FlvDemux::new_boxed,
input_caps: &Caps::new_simple("video/x-flv", &[]),
output_caps: &Caps::new_any(),
input_caps: &gst::Caps::new_simple("video/x-flv", &[]),
output_caps: &gst::Caps::new_any(),
},
);
......
......@@ -9,7 +9,7 @@ license = "MIT/Apache-2.0"
url = "1.1"
gst-plugin = { path="../gst-plugin" }
reqwest = "0.7"
slog = "2.0"
gstreamer = { git = "https://github.com/sdroege/gstreamer-rs", features = ["v1_10"] }
[lib]
name = "gstrshttp"
......
......@@ -15,11 +15,9 @@ use reqwest::header::{AcceptRanges, ByteRangeSpec, ContentLength, ContentRange,
use gst_plugin::error::*;
use gst_plugin::source::*;
use gst_plugin::buffer::*;
use gst_plugin::utils::*;
use gst_plugin::log::*;
use slog::Logger;
use gst;
use gst::prelude::*;
#[derive(Debug)]
enum StreamingState {
......@@ -38,32 +36,35 @@ enum StreamingState {
#[derive(Debug)]
pub struct HttpSrc {
streaming_state: StreamingState,
logger: Logger,
cat: gst::DebugCategory,
client: Client,
}
impl HttpSrc {
pub fn new(element: Element) -> HttpSrc {
pub fn new(_src: &RsSrcWrapper) -> HttpSrc {
HttpSrc {
streaming_state: StreamingState::Stopped,
logger: Logger::root(
GstDebugDrain::new(Some(&element), "rshttpsink", 0, "Rust http sink"),
o!(),
cat: gst::DebugCategory::new(
"rshttpsrc",
gst::DebugColorFlags::empty(),
"Rust HTTP source",
),
client: Client::new().unwrap(),
}
}
pub fn new_boxed(element: Element) -> Box<Source> {
Box::new(HttpSrc::new(element))
pub fn new_boxed(src: &RsSrcWrapper) -> Box<Source> {
Box::new(HttpSrc::new(src))
}
fn do_request(
&self,
src: &RsSrcWrapper,
uri: Url,
start: u64,
stop: Option<u64>,
) -> Result<StreamingState, ErrorMessage> {
let cat = self.cat;
let mut req = self.client.get(uri.clone()).unwrap();
match (start != 0, stop) {
......@@ -76,20 +77,20 @@ impl HttpSrc {
}
}
debug!(self.logger, "Doing new request {:?}", req);
gst_debug!(cat, obj: src, "Doing new request {:?}", req);
let response = try!(req.send().or_else(|err| {
error!(self.logger, "Request failed: {:?}", err);
gst_error!(cat, obj: src, "Request failed: {:?}", err);
Err(error_msg!(
SourceError::ReadFailed,
gst::ResourceError::Read,
["Failed to fetch {}: {}", uri, err.to_string()]
))
}));
if !response.status().is_success() {
error!(self.logger, "Request status failed: {:?}", response);
gst_error!(cat, obj: src, "Request status failed: {:?}", response);
return Err(error_msg!(
SourceError::ReadFailed,
gst::ResourceError::Read,
["Failed to fetch {}: {}", uri, response.status()]
));
}
......@@ -121,12 +122,12 @@ impl HttpSrc {
if position != start {
return Err(error_msg!(
SourceError::SeekFailed,
gst::ResourceError::Seek,
["Failed to seek to {}: Got {}", start, position]
));
}
debug!(self.logger, "Request successful: {:?}", response);
gst_debug!(cat, obj: src, "Request successful: {:?}", response);
Ok(StreamingState::Started {
uri: uri,
......@@ -143,8 +144,8 @@ impl HttpSrc {
fn validate_uri(uri: &Url) -> Result<(), UriError> {
if uri.scheme() != "http" && uri.scheme() != "https" {
return Err(UriError::new(
UriErrorKind::UnsupportedProtocol,
Some(format!("Unsupported URI '{}'", uri.as_str())),
gst::URIError::UnsupportedProtocol,
format!("Unsupported URI '{}'", uri.as_str()),
));
}
......@@ -156,34 +157,39 @@ impl Source for HttpSrc {
Box::new(validate_uri)
}
fn is_seekable(&self) -> bool {
fn is_seekable(&self, _src: &RsSrcWrapper) -> bool {
match self.streaming_state {
StreamingState::Started { seekable, .. } => seekable,
_ => false,
}
}
fn get_size(&self) -> Option<u64> {
fn get_size(&self, _src: &RsSrcWrapper) -> Option<u64> {
match self.streaming_state {
StreamingState::Started { size, .. } => size,
_ => None,
}
}
fn start(&mut self, uri: Url) -> Result<(), ErrorMessage> {
fn start(&mut self, src: &RsSrcWrapper, uri: Url) -> Result<(), ErrorMessage> {
self.streaming_state = StreamingState::Stopped;
self.streaming_state = try!(self.do_request(uri, 0, None));
self.streaming_state = try!(self.do_request(src, uri, 0, None));
Ok(())
}
fn stop(&mut self) -> Result<(), ErrorMessage> {
fn stop(&mut self, _src: &RsSrcWrapper) -> Result<(), ErrorMessage> {
self.streaming_state = StreamingState::Stopped;
Ok(())
}
fn seek(&mut self, start: u64, stop: Option<u64>) -> Result<(), ErrorMessage> {