Commit eec9bd8d authored by Sebastian Dröge's avatar Sebastian Dröge 🍵

filesink: Implement workaround for some (network) filesystems that spuriously...

filesink: Implement workaround for some (network) filesystems that spuriously return EACCES on write

This seems to happen when another client is accessing the file at the
same time, and retrying after a short amount of time solves it.

Sometimes partial data is written at that point already but we have no
idea how much it is, or if what was written is correct (it sometimes
isn't) so we always first seek back to the current position and repeat
the whole failed write.

It happens at least on Linux and macOS on SMB/CIFS and NFS file systems.

Between write attempts that failed with EACCES we wait 10ms, and after
enough consecutive tries that failed with EACCES we simply time out.

In theory a valid EACCES for files to which we simply have no access
should've happened already during the call to open(), except for NFS
(see open(2)).

This can be enabled with the new max-transient-error-timeout property, and
a new o-sync boolean property was added to open the file in O_SYNC mode
as without that it's not guaranteed that we get EACCES for the actual
writev() call that failed but might only get it at a later time.

Fixes gstreamer/gstreamer#305
parent 162c59b4
......@@ -32,6 +32,7 @@
#ifdef HAVE_SYS_UIO_H
#include <sys/uio.h>
#endif
#include <sys/types.h>
#include <errno.h>
#include <string.h>
#include <string.h>
......@@ -39,6 +40,11 @@
#include "gstelements_private.h"
#ifdef G_OS_WIN32
# include <io.h> /* lseek, open, close, read */
# undef lseek
# define lseek _lseeki64
# undef off_t
# define off_t guint64
# define WIN32_LEAN_AND_MEAN /* prevents from including too many things */
# include <windows.h>
# undef WIN32_LEAN_AND_MEAN
......@@ -215,13 +221,20 @@ fill_vectors (struct iovec *vecs, GstMapInfo * maps, guint n, GstBuffer * buf)
GstFlowReturn
gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
GstBuffer ** buffers, guint num_buffers, guint8 * mem_nums,
guint total_mem_num, guint64 * bytes_written, guint64 skip)
guint total_mem_num, guint64 * bytes_written, guint64 skip,
gint max_transient_error_timeout, guint64 current_position,
gboolean * flushing)
{
struct iovec *vecs;
GstMapInfo *map_infos;
GstFlowReturn flow_ret;
gsize size = 0;
guint i, j;
gint64 start_time = 0;
max_transient_error_timeout *= 1000;
if (max_transient_error_timeout)
start_time = g_get_monotonic_time ();
GST_LOG_OBJECT (sink, "%u buffers, %u memories", num_buffers, total_mem_num);
......@@ -248,6 +261,11 @@ gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
}
do {
if (flushing != NULL && g_atomic_int_get (flushing)) {
GST_DEBUG_OBJECT (sink, "Flushing, exiting loop");
flow_ret = GST_FLOW_FLUSHING;
goto out;
}
#ifndef HAVE_WIN32
if (fdset != NULL) {
do {
......@@ -279,9 +297,45 @@ gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
/* do nothing, try again */
if (max_transient_error_timeout)
start_time = g_get_monotonic_time ();
} else if (ret < 0 && errno == EACCES && max_transient_error_timeout > 0) {
/* seek back to where we started writing and try again after sleeping
* for 10ms.
*
* Some network file systems report EACCES spuriously, presumably
* because at the same time another client is reading the file.
* It happens at least on Linux and macOS on SMB/CIFS and NFS file
* systems.
*
* Note that NFS does not check access permissions during open()
* but only on write()/read() according to open(2), so we would
* loop here in case of NFS.
*/
if (g_get_monotonic_time () > start_time + max_transient_error_timeout) {
GST_ERROR_OBJECT (sink, "Got EACCES for more than %dms, failing",
max_transient_error_timeout);
goto write_error;
}
GST_DEBUG_OBJECT (sink, "got EACCES, retry after 10ms sleep");
g_assert (current_position != -1);
g_usleep (10000);
/* Seek back to the current position, sometimes a partial write
* happened and we have no idea how much and if what was written
* is actually correct (it sometimes isn't)
*/
ret = lseek (fd, current_position + *bytes_written, SEEK_SET);
if (ret < 0 || ret != current_position + *bytes_written) {
GST_ERROR_OBJECT (sink,
"failed to seek back to current write position");
goto write_error;
}
} else if (ret < 0) {
goto write_error;
} else if (ret < left) {
} else { /* if (ret < left) */
if (max_transient_error_timeout)
start_time = g_get_monotonic_time ();
/* skip vectors that have been written in full */
while (ret >= vecs[0].iov_len) {
ret -= vecs[0].iov_len;
......
......@@ -37,7 +37,9 @@ G_GNUC_INTERNAL
GstFlowReturn gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
GstBuffer ** buffers, guint num_buffers,
guint8 * mem_nums, guint total_mem_num,
guint64 * bytes_written, guint64 skip);
guint64 * bytes_written, guint64 skip,
gint max_transient_error_timeout, guint64 current_position,
gboolean * flushing);
G_END_DECLS
......
......@@ -249,7 +249,8 @@ gst_fd_sink_render_buffers (GstFdSink * sink, GstBuffer ** buffers,
guint64 bytes_written = 0;
ret = gst_writev_buffers (GST_OBJECT_CAST (sink), sink->fd, sink->fdset,
buffers, num_buffers, mem_nums, total_mems, &bytes_written, skip);
buffers, num_buffers, mem_nums, total_mems, &bytes_written, skip,
0, -1, NULL);
sink->bytes_written += bytes_written;
sink->current_pos += bytes_written;
......
......@@ -49,6 +49,7 @@
#include "gstfilesink.h"
#include <string.h>
#include <sys/types.h>
#include <fcntl.h>
#ifdef G_OS_WIN32
#include <io.h> /* lseek, open, close, read */
......@@ -106,6 +107,8 @@ GST_DEBUG_CATEGORY_STATIC (gst_file_sink_debug);
#define DEFAULT_BUFFER_MODE GST_FILE_SINK_BUFFER_MODE_DEFAULT
#define DEFAULT_BUFFER_SIZE 64 * 1024
#define DEFAULT_APPEND FALSE
#define DEFAULT_O_SYNC FALSE
#define DEFAULT_MAX_TRANSIENT_ERROR_TIMEOUT 0
enum
{
......@@ -114,6 +117,8 @@ enum
PROP_BUFFER_MODE,
PROP_BUFFER_SIZE,
PROP_APPEND,
PROP_O_SYNC,
PROP_MAX_TRANSIENT_ERROR_TIMEOUT,
PROP_LAST
};
......@@ -121,12 +126,12 @@ enum
* use the 'file pointer' opened in glib (and returned from this function)
* in this library, as they may have unrelated C runtimes. */
static FILE *
gst_fopen (const gchar * filename, const gchar * mode)
gst_fopen (const gchar * filename, const gchar * mode, gboolean o_sync)
{
FILE *retval;
#ifdef G_OS_WIN32
wchar_t *wfilename = g_utf8_to_utf16 (filename, -1, NULL, NULL, NULL);
wchar_t *wmode;
FILE *retval;
int save_errno;
if (wfilename == NULL) {
......@@ -151,7 +156,23 @@ gst_fopen (const gchar * filename, const gchar * mode)
errno = save_errno;
return retval;
#else
return fopen (filename, mode);
int fd;
int flags = O_CREAT | O_WRONLY;
if (strcmp (mode, "wb") == 0)
flags |= O_TRUNC;
else if (strcmp (mode, "ab") == 0)
flags |= O_APPEND;
else
g_assert_not_reached ();
if (o_sync)
flags |= O_SYNC;
fd = open (filename, flags, 0666);
retval = fdopen (fd, mode);
return retval;
#endif
}
......@@ -172,6 +193,8 @@ static GstFlowReturn gst_file_sink_render (GstBaseSink * sink,
GstBuffer * buffer);
static GstFlowReturn gst_file_sink_render_list (GstBaseSink * sink,
GstBufferList * list);
static gboolean gst_file_sink_unlock (GstBaseSink * sink);
static gboolean gst_file_sink_unlock_stop (GstBaseSink * sink);
static gboolean gst_file_sink_do_seek (GstFileSink * filesink,
guint64 new_offset);
......@@ -230,6 +253,19 @@ gst_file_sink_class_init (GstFileSinkClass * klass)
"Append to an already existing file", DEFAULT_APPEND,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_O_SYNC,
g_param_spec_boolean ("o-sync", "Synchronous IO",
"Open the file with O_SYNC for enabling synchronous IO",
DEFAULT_O_SYNC, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class,
PROP_MAX_TRANSIENT_ERROR_TIMEOUT,
g_param_spec_int ("max-transient-error-timeout",
"Max Transient Error Timeout",
"Retry up to this many ms on transient errors (currently EACCES)", 0,
G_MAXINT, DEFAULT_MAX_TRANSIENT_ERROR_TIMEOUT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_element_class_set_static_metadata (gstelement_class,
"File Sink",
"Sink/File", "Write stream to a file",
......@@ -243,6 +279,9 @@ gst_file_sink_class_init (GstFileSinkClass * klass)
gstbasesink_class->render_list =
GST_DEBUG_FUNCPTR (gst_file_sink_render_list);
gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_file_sink_event);
gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_file_sink_unlock);
gstbasesink_class->unlock_stop =
GST_DEBUG_FUNCPTR (gst_file_sink_unlock_stop);
if (sizeof (off_t) < 8) {
GST_LOG ("No large file support, sizeof (off_t) = %" G_GSIZE_FORMAT "!",
......@@ -330,6 +369,12 @@ gst_file_sink_set_property (GObject * object, guint prop_id,
case PROP_APPEND:
sink->append = g_value_get_boolean (value);
break;
case PROP_O_SYNC:
sink->o_sync = g_value_get_boolean (value);
break;
case PROP_MAX_TRANSIENT_ERROR_TIMEOUT:
sink->max_transient_error_timeout = g_value_get_int (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
......@@ -355,6 +400,12 @@ gst_file_sink_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_APPEND:
g_value_set_boolean (value, sink->append);
break;
case PROP_O_SYNC:
g_value_set_boolean (value, sink->o_sync);
break;
case PROP_MAX_TRANSIENT_ERROR_TIMEOUT:
g_value_set_int (value, sink->max_transient_error_timeout);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
......@@ -369,9 +420,9 @@ gst_file_sink_open_file (GstFileSink * sink)
goto no_filename;
if (sink->append)
sink->file = gst_fopen (sink->filename, "ab");
sink->file = gst_fopen (sink->filename, "ab", sink->o_sync);
else
sink->file = gst_fopen (sink->filename, "wb");
sink->file = gst_fopen (sink->filename, "wb", sink->o_sync);
if (sink->file == NULL)
goto open_failed;
......@@ -652,13 +703,21 @@ static GstFlowReturn
gst_file_sink_render_buffers (GstFileSink * sink, GstBuffer ** buffers,
guint num_buffers, guint8 * mem_nums, guint total_mems, gsize size)
{
GstFlowReturn ret;
guint64 bytes_written = 0;
GST_DEBUG_OBJECT (sink,
"writing %u buffers (%u memories, %" G_GSIZE_FORMAT
" bytes) at position %" G_GUINT64_FORMAT, num_buffers, total_mems, size,
sink->current_pos);
return gst_writev_buffers (GST_OBJECT_CAST (sink), fileno (sink->file), NULL,
buffers, num_buffers, mem_nums, total_mems, &sink->current_pos, 0);
ret = gst_writev_buffers (GST_OBJECT_CAST (sink), fileno (sink->file), NULL,
buffers, num_buffers, mem_nums, total_mems, &bytes_written, 0,
sink->max_transient_error_timeout, sink->current_pos, &sink->flushing);
sink->current_pos += bytes_written;
return ret;
}
static GstFlowReturn
......@@ -857,13 +916,45 @@ gst_file_sink_render (GstBaseSink * sink, GstBuffer * buffer)
static gboolean
gst_file_sink_start (GstBaseSink * basesink)
{
return gst_file_sink_open_file (GST_FILE_SINK (basesink));
GstFileSink *filesink;
filesink = GST_FILE_SINK_CAST (basesink);
g_atomic_int_set (&filesink->flushing, FALSE);
return gst_file_sink_open_file (filesink);
}
static gboolean
gst_file_sink_stop (GstBaseSink * basesink)
{
gst_file_sink_close_file (GST_FILE_SINK (basesink));
GstFileSink *filesink;
filesink = GST_FILE_SINK_CAST (basesink);
gst_file_sink_close_file (filesink);
g_atomic_int_set (&filesink->flushing, TRUE);
return TRUE;
}
static gboolean
gst_file_sink_unlock (GstBaseSink * basesink)
{
GstFileSink *filesink;
filesink = GST_FILE_SINK_CAST (basesink);
g_atomic_int_set (&filesink->flushing, TRUE);
return TRUE;
}
static gboolean
gst_file_sink_unlock_stop (GstBaseSink * basesink)
{
GstFileSink *filesink;
filesink = GST_FILE_SINK_CAST (basesink);
g_atomic_int_set (&filesink->flushing, FALSE);
return TRUE;
}
......
......@@ -85,6 +85,10 @@ struct _GstFileSink {
guint current_buffer_size;
gboolean append;
gboolean o_sync;
gint max_transient_error_timeout;
gboolean flushing;
};
struct _GstFileSinkClass {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment