Commit aff2b8ff authored by Marcin Kolny's avatar Marcin Kolny

Add plugin to stream data to AWS S3 bucket

parent 42d4d10b
Pipeline #45050 passed with stages
in 14 minutes and 11 seconds
......@@ -35,3 +35,15 @@ $ gst-launch-1.0 \
s3src uri=s3://ap-south-1/my-bucket/my-object-key/which-can-have-slashes?version=my-optional-version !
filesink name=my-object.out
```
## s3sink
Writes data to a specified S3 bucket. The `region` parameter is optional, and if not specified, the default parameter will be used (from `.aws/config` file).
```
$ gst-launch-1.0 \
videotestsrc ! \
theoraenc ! \
oggmux ! \
s3sink bucket=example-bucket key=my/file.ogv region=us-west-1
```
......@@ -14,11 +14,16 @@ extern crate glib;
extern crate gstreamer as gst;
extern crate gstreamer_base as gst_base;
mod s3sink;
mod s3src;
mod s3url;
mod s3utils;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
s3src::register(plugin)
s3sink::register(plugin)?;
s3src::register(plugin)?;
Ok(())
}
gst_plugin_define!(
......
This diff is collapsed.
......@@ -26,6 +26,7 @@ use gst_base::prelude::*;
use gst_base::subclass::prelude::*;
use crate::s3url::*;
use crate::s3utils;
enum StreamingState {
Stopped,
......@@ -64,36 +65,6 @@ impl S3Src {
}
}
fn wait<F>(&self, future: F) -> Result<F::Item, Option<gst::ErrorMessage>>
where
F: Send + Future<Error = gst::ErrorMessage> + 'static,
F::Item: Send,
{
let mut canceller = self.canceller.lock().unwrap();
let (sender, receiver) = oneshot::channel::<Bytes>();
canceller.replace(sender);
let unlock_error = gst_error_msg!(gst::ResourceError::Busy, ["unlock"]);
let res = oneshot::spawn(future, &self.runtime.executor())
.select(receiver.then(|_| Err(unlock_error.clone())))
.wait()
.map(|v| v.0)
.map_err(|err| {
if err.0 == unlock_error {
None
} else {
Some(err.0)
}
});
/* Clear out the canceller */
*canceller = None;
res
}
fn connect(self: &S3Src, url: &GstS3Url) -> Result<S3Client, gst::ErrorMessage> {
Ok(S3Client::new(url.region.clone()))
}
......@@ -137,19 +108,22 @@ impl S3Src {
let response = client.head_object(request);
let output = self
.wait(response.map_err(|err| {
let output = s3utils::wait(
&self.canceller,
&self.runtime,
response.map_err(|err| {
gst_error_msg!(
gst::ResourceError::NotFound,
["Failed to HEAD object: {}", err]
)
}))
.map_err(|err| {
err.unwrap_or(gst_error_msg!(
gst::LibraryError::Failed,
["Interrupted during start"]
))
})?;
}),
)
.map_err(|err| {
err.unwrap_or(gst_error_msg!(
gst::LibraryError::Failed,
["Interrupted during start"]
))
})?;
if let Some(size) = output.content_length {
gst_info!(
......@@ -212,9 +186,13 @@ impl S3Src {
* interruptible */
drop(state);
let output = self.wait(response.map_err(|err| {
gst_error_msg!(gst::ResourceError::Read, ["Could not read: {}", err])
}))?;
let output = s3utils::wait(
&self.canceller,
&self.runtime,
response.map_err(|err| {
gst_error_msg!(gst::ResourceError::Read, ["Could not read: {}", err])
}),
)?;
gst_debug!(
self.cat,
......@@ -223,7 +201,9 @@ impl S3Src {
output.content_length.unwrap()
);
self.wait(
s3utils::wait(
&self.canceller,
&self.runtime,
output.body.unwrap().concat2().map_err(|err| {
gst_error_msg!(gst::ResourceError::Read, ["Could not read: {}", err])
}),
......
// Copyright (C) 2017 Author: Arun Raghavan <arun@arunraghavan.net>
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use futures::sync::oneshot;
use futures::Future;
use std::sync::Mutex;
use tokio::runtime;
pub fn wait<F, T>(
canceller: &Mutex<Option<oneshot::Sender<T>>>,
runtime: &runtime::Runtime,
future: F,
) -> Result<F::Item, Option<gst::ErrorMessage>>
where
F: Send + Future<Error = gst::ErrorMessage> + 'static,
F::Item: Send,
{
let mut canceller = canceller.lock().unwrap();
let (sender, receiver) = oneshot::channel::<T>();
canceller.replace(sender);
let unlock_error = gst_error_msg!(gst::ResourceError::Busy, ["unlock"]);
let res = oneshot::spawn(future, &runtime.executor())
.select(receiver.then(|_| Err(unlock_error.clone())))
.wait()
.map(|v| v.0)
.map_err(|err| {
if err.0 == unlock_error {
None
} else {
Some(err.0)
}
});
/* Clear out the canceller */
*canceller = None;
res
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment