// Copyright (C) 2016-2017 Sebastian Dröge <sebastian@centricular.com>
//               2016 Luis de Bethencourt <luisbg@osg.samsung.com>
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::fs::File;
use url::Url;

use std::io::Write;
use std::convert::From;

use gst_plugin::error::*;
use gst_plugin::sink::*;
use gst_plugin::buffer::*;
use gst_plugin::utils::*;
use gst_plugin::log::*;

use slog::*;

/// Whether the sink currently has an output file open.
#[derive(Debug)]
enum StreamingState {
    /// No file is open; this is the state a new sink starts in and the
    /// state `stop` returns it to.
    Stopped,
    /// A file has been opened by `start` and buffers may be rendered.
    Started {
        /// Handle that `render` writes buffer data into.
        file: File,
        /// Running total of bytes written; `render` adds each buffer's length.
        position: u64,
    },
}

30
31
#[derive(Debug)]
pub struct FileSink {
32
    streaming_state: StreamingState,
33
    logger: Logger,
34
35
36
}

impl FileSink {
37
38
39
40
41
42
43
44
45
    pub fn new(element: Element) -> FileSink {
        FileSink {
            streaming_state: StreamingState::Stopped,
            logger: Logger::root(GstDebugDrain::new(Some(&element),
                                                    "rsfilesink",
                                                    0,
                                                    "Rust file sink"),
                                 None),
        }
46
47
    }

48
49
    pub fn new_boxed(element: Element) -> Box<Sink> {
        Box::new(FileSink::new(element))
50
51
52
    }
}

53
fn validate_uri(uri: &Url) -> Result<(), UriError> {
54
55
    let _ = try!(uri.to_file_path().or_else(|_| {
                                                Err(UriError::new(UriErrorKind::UnsupportedProtocol,
56
                              Some(format!("Unsupported file URI '{}'", uri.as_str()))))
57
                                            }));
58
59
    Ok(())
}
60

61
62
63
impl Sink for FileSink {
    fn uri_validator(&self) -> Box<UriValidator> {
        Box::new(validate_uri)
64
65
    }

66
    fn start(&mut self, uri: Url) -> Result<(), ErrorMessage> {
67
        if let StreamingState::Started { .. } = self.streaming_state {
68
            return Err(error_msg!(SinkError::Failure, ["Sink already started"]));
Sebastian Dröge's avatar
Sebastian Dröge committed
69
        }
70

71
72
73
74
75
        let location = try!(uri.to_file_path().or_else(|_| {
            error!(self.logger, "Unsupported file URI '{}'", uri.as_str());
            Err(error_msg!(SinkError::Failure,
                           ["Unsupported file URI '{}'", uri.as_str()]))
        }));
76

77
78

        let file = try!(File::create(location.as_path()).or_else(|err| {
79
80
81
            error!(self.logger,
                   "Could not open file for writing: {}",
                   err.to_string());
82
83
84
85
86
87
            Err(error_msg!(SinkError::OpenFailed,
                           ["Could not open file for writing '{}': {}",
                            location.to_str().unwrap_or("Non-UTF8 path"),
                            err.to_string()]))
        }));

88
89
        debug!(self.logger, "Opened file {:?}", file);

90
        self.streaming_state = StreamingState::Started {
91
92
93
94
95
            file: file,
            position: 0,
        };

        Ok(())
96
97
    }

98
99
    fn stop(&mut self) -> Result<(), ErrorMessage> {
        self.streaming_state = StreamingState::Stopped;
100

101
        Ok(())
102
103
    }

104
    fn render(&mut self, buffer: &Buffer) -> Result<(), FlowError> {
105
106
107
108
109
        // FIXME: Because we borrow streaming state mutably below
        let logger = self.logger.clone();

        trace!(logger, "Rendering {:?}", buffer);

110
        let (file, position) = match self.streaming_state {
111
112
113
            StreamingState::Started { ref mut file, ref mut position } => (file, position),
            StreamingState::Stopped => {
                return Err(FlowError::Error(error_msg!(SinkError::Failure, ["Not started yet"])));
114
            }
115
116
        };

117
118
119
120
121
122
123
124
125
        let map = match buffer.map_read() {
            None => {
                return Err(FlowError::Error(error_msg!(SinkError::Failure,
                                                       ["Failed to map buffer"])));
            }
            Some(map) => map,
        };
        let data = map.as_slice();

126
        try!(file.write_all(data).or_else(|err| {
127
            error!(logger, "Failed to write: {}", err);
128
129
130
131
            Err(FlowError::Error(error_msg!(SinkError::WriteFailed, ["Failed to write: {}", err])))
        }));

        *position += data.len() as u64;
132

133
        Ok(())
134
    }
135
}