Commit 0dd47bb7 authored by Manuel Stoeckl's avatar Manuel Stoeckl
Browse files

Compress large data transfer messages

waypipe can now compress the large data transfer blocks produced
by collect_update by itself, instead of relying on the channel
transport method (typically ssh -C) to do so. This change adds
dependencies for the two different compression methods, zstd and
lz4, both known for their high compression and decompression rates.
parent f198795a
......@@ -47,6 +47,8 @@ Requirements:
* wayland-protocols (>= 1.12, for the xdg-shell protocol, and others)
* libffi
* libgbm (from a recent version of mesa)
* libzstd
* liblz4
* scdoc (optional, to generate a man page)
* sys/sdt.h (optional, to provide static tracepoints for profiling)
* ssh (runtime, OpenSSH >= 6.7, for Unix domain socket forwarding)
......
......@@ -55,31 +55,27 @@ struct pidstack {
pid_t proc;
};
static int run_client_child(int chanfd, const char *drm_node, bool no_gpu,
const char *socket_path)
int run_client(const char *socket_path, const char *drm_node,
enum compression_mode compression, bool oneshot, bool no_gpu,
pid_t eol_pid)
{
wp_log(WP_DEBUG, "I'm a client on %s!", socket_path);
struct wl_display *display = wl_display_connect(NULL);
if (!display) {
wp_log(WP_ERROR, "Failed to connect to a wayland server.");
return EXIT_FAILURE;
}
int dispfd = wl_display_get_fd(display);
int retcode = main_interface_loop(
chanfd, dispfd, drm_node, no_gpu, true);
wl_display_disconnect(display);
return retcode;
}
int run_client(const char *socket_path, const char *drm_node, bool oneshot,
bool no_gpu, pid_t eol_pid)
{
if (verify_connection() == -1) {
wp_log(WP_ERROR, "Failed to connect to a wayland compositor.");
if (eol_pid) {
waitpid(eol_pid, NULL, 0);
struct wl_display *display = NULL;
if (oneshot) {
display = wl_display_connect(NULL);
if (!display) {
wp_log(WP_ERROR,
"Failed to connect to a wayland server.");
return EXIT_FAILURE;
}
} else {
if (verify_connection() == -1) {
wp_log(WP_ERROR,
"Failed to connect to a wayland compositor.");
if (eol_pid) {
waitpid(eol_pid, NULL, 0);
}
return EXIT_FAILURE;
}
return EXIT_FAILURE;
}
wp_log(WP_DEBUG, "A wayland compositor is available. Proceeding.");
......@@ -90,6 +86,9 @@ int run_client(const char *socket_path, const char *drm_node, bool oneshot,
if (eol_pid) {
waitpid(eol_pid, NULL, 0);
}
if (display) {
wl_display_disconnect(display);
}
return EXIT_FAILURE;
}
......@@ -143,8 +142,10 @@ int run_client(const char *socket_path, const char *drm_node, bool oneshot,
break;
} else {
if (oneshot) {
retcode = run_client_child(chanclient, drm_node,
no_gpu, socket_path);
retcode = main_interface_loop(chanclient,
wl_display_get_fd(display),
drm_node, compression, no_gpu,
true);
break;
} else {
pid_t npid = fork();
......@@ -160,8 +161,23 @@ int run_client(const char *socket_path, const char *drm_node, bool oneshot,
}
close(channelsock);
run_client_child(chanclient, drm_node,
no_gpu, socket_path);
struct wl_display *local_display =
wl_display_connect(
NULL);
if (!local_display) {
wp_log(WP_ERROR,
"Failed to connect to a wayland server.");
return EXIT_FAILURE;
}
int dispfd = wl_display_get_fd(
local_display);
// ignore retcode ?
main_interface_loop(chanclient, dispfd,
drm_node, compression,
no_gpu, true);
wl_display_disconnect(local_display);
// exit path?
return EXIT_SUCCESS;
} else if (npid == -1) {
......@@ -183,6 +199,9 @@ int run_client(const char *socket_path, const char *drm_node, bool oneshot,
}
}
if (display) {
wl_display_disconnect(display);
}
close(channelsock);
unlink(socket_path);
int cleanup_type = shutdown_flag ? WNOHANG : 0;
......
......@@ -165,7 +165,7 @@ static void collect_updates(struct fd_translation_map *map, int *ntransfers,
struct transfer transfers[])
{
for (struct shadow_fd *cur = map->list; cur; cur = cur->next) {
collect_update(cur, ntransfers, transfers);
collect_update(map, cur, ntransfers, transfers);
}
}
......@@ -290,7 +290,7 @@ static void pack_pipe_message(struct block_transfer *bt, int nids,
sd->id = transfers[i].obj_id;
sd->type = (int)transfers[i].type;
sd->size = (int)transfers[i].size;
sd->special = transfers[i].special;
sd->special = transfers[i].special.raw;
size_t tsize = transfers[i].size;
size_t num_longs = (tsize + sizeof(uint64_t) - 1) /
......@@ -299,7 +299,7 @@ static void pack_pipe_message(struct block_transfer *bt, int nids,
bt->blocks[2 * i + 1].iov_base = sd;
bt->blocks[2 * i + 2].iov_len = sizeof(uint64_t) * num_longs;
bt->blocks[2 * i + 2].iov_base = transfers[i].data;
bt->blocks[2 * i + 2].iov_base = (void *)transfers[i].data;
total_size += sizeof(struct pipe_elem_header) +
sizeof(uint64_t) * num_longs;
......@@ -351,7 +351,7 @@ static void unpack_pipe_message(size_t msglen, const char *msg, int *waylen,
transfers[nt].size = (size_t)sd->size;
transfers[nt].type = (fdcat_t)sd->type;
transfers[nt].data = (char *)data;
transfers[nt].special = sd->special;
transfers[nt].special.raw = sd->special;
nt++;
}
size_t nlongs = ((size_t)sd->size + 7) / 8;
......@@ -833,7 +833,8 @@ static int advance_waymsg_transfer(struct fd_translation_map *map,
}
int main_interface_loop(int chanfd, int progfd, const char *drm_node,
bool no_gpu, bool display_side)
enum compression_mode compression, bool no_gpu,
bool display_side)
{
const char *progdesc = display_side ? "compositor" : "application";
if (set_fnctl_flag(chanfd, O_NONBLOCK | O_CLOEXEC) == -1) {
......@@ -904,6 +905,7 @@ int main_interface_loop(int chanfd, int progfd, const char *drm_node,
.local_sign = (display_side ? -1 : 1),
.list = NULL,
.max_local_id = 1,
.compression = compression,
.rdata = {.disabled = no_gpu,
.drm_node_path = drm_node,
.drm_fd = -1,
......
......@@ -47,6 +47,8 @@ rt = cc.find_library('rt')
if cc.has_header('sys/sdt.h')
add_project_arguments('-DHAS_USDT=1', language : 'c')
endif
liblz4 = dependency('liblz4')
libzstd = dependency('libzstd')
subdir('protocols')
......@@ -56,6 +58,8 @@ executable(
dependencies : [
libgbm, # General GPU buffer creation, aligned with dmabuf proto
libffi, # To call wayland protocol functions
liblz4, # Fast compression option
libzstd, # Slow compression option
protos, # Wayland protocol data
rt, # For shared memory
wayland_client # For wl_display_connnect
......
......@@ -59,7 +59,8 @@ static int connect_to_channel(const char *socket_path)
return chanfd;
}
int run_server(const char *socket_path, const char *drm_node, bool oneshot,
int run_server(const char *socket_path, const char *drm_node,
enum compression_mode compression, bool oneshot,
bool unlink_at_end, const char *application,
char *const app_argv[])
{
......@@ -148,7 +149,7 @@ int run_server(const char *socket_path, const char *drm_node, bool oneshot,
wp_log(WP_DEBUG, "Oneshot connected");
if (chanfd != -1) {
retval = main_interface_loop(chanfd, server_link,
drm_node, false, false);
drm_node, compression, false, false);
} else {
retval = EXIT_FAILURE;
}
......@@ -222,7 +223,8 @@ int run_server(const char *socket_path, const char *drm_node, bool oneshot,
socket_path);
return main_interface_loop(chanfd,
appfd, drm_node, false,
appfd, drm_node,
compression, false,
false);
} else if (npid == -1) {
wp_log(WP_DEBUG, "Fork failure");
......
......@@ -38,6 +38,9 @@
#include <sys/stat.h>
#include <unistd.h>
#include <lz4.h>
#include <zstd.h>
bool fdcat_ispipe(fdcat_t t)
{
return t == FDC_PIPE_IR || t == FDC_PIPE_RW || t == FDC_PIPE_IW;
......@@ -67,15 +70,17 @@ static void destroy_unlinked_sfd(
{
if (shadow->type == FDC_FILE) {
munmap(shadow->file_mem_local, shadow->file_size);
free(shadow->file_mem_mirror);
free(shadow->file_diff_buffer);
free(shadow->mem_mirror);
free(shadow->diff_buffer);
free(shadow->compress_buffer);
if (shadow->file_shm_buf_name[0]) {
shm_unlink(shadow->file_shm_buf_name);
}
} else if (shadow->type == FDC_DMABUF) {
destroy_dmabuf(shadow->dmabuf_bo);
free(shadow->dmabuf_mem_mirror);
free(shadow->dmabuf_diff_buffer);
free(shadow->mem_mirror);
free(shadow->diff_buffer);
free(shadow->compress_buffer);
} else if (fdcat_ispipe(shadow->type)) {
if (shadow->pipe_fd != shadow->fd_local &&
shadow->pipe_fd != -1 &&
......@@ -155,6 +160,111 @@ fdcat_t get_fd_type(int fd, size_t *size)
}
}
static size_t compress_bufsize(struct fd_translation_map *map, size_t max_input)
{
switch (map->compression) {
default:
case COMP_NONE:
return 0;
case COMP_LZ4:
return (size_t)max(
LZ4_compressBound((int)max_input), max_input);
case COMP_ZSTD:
return ZSTD_compressBound(max_input);
}
}
/* With the selected compression method, compress the buffer {isize,ibuf},
* possibly modifying {msize,mbuf}, and setting {wsize,wbuf} to indicate
* the result */
static void compress_buffer(struct fd_translation_map *map, size_t isize,
const char *ibuf, size_t msize, char *mbuf, size_t *wsize,
const char **wbuf)
{
// Ensure inputs always nontrivial
if (isize == 0) {
*wsize = 0;
*wbuf = ibuf;
return;
}
switch (map->compression) {
case COMP_NONE:
*wsize = isize;
*wbuf = ibuf;
return;
case COMP_LZ4: {
int ws = LZ4_compress_default(
ibuf, mbuf, (int)isize, (int)msize);
if (ws == 0) {
wp_log(WP_ERROR,
"Lz4 compression failed for %d bytes in %d of space",
(int)isize, (int)msize);
}
*wsize = (size_t)ws;
*wbuf = mbuf;
return;
}
case COMP_ZSTD: {
size_t ws = ZSTD_compress(mbuf, msize, ibuf, isize, 5);
if (ZSTD_isError(ws)) {
wp_log(WP_ERROR,
"Zstd compression failed for %d bytes in %d of space: %s",
(int)isize, (int)msize,
ZSTD_getErrorName(ws));
}
*wsize = (size_t)ws;
*wbuf = mbuf;
return;
}
}
}
/* With the selected compression method, uncompress the buffer {isize,ibuf},
* possibly modifying {msize,mbuf}, and setting {wsize,wbuf} to indicate
* the result. msize should be set = the uncompressed buffer size, which
* should have been provided. */
static void uncompress_buffer(struct fd_translation_map *map, size_t isize,
const char *ibuf, size_t msize, char *mbuf, size_t *wsize,
const char **wbuf)
{
// Ensure inputs always nontrivial
if (isize == 0) {
*wsize = 0;
*wbuf = ibuf;
return;
}
switch (map->compression) {
case COMP_NONE:
*wsize = isize;
*wbuf = ibuf;
return;
case COMP_LZ4: {
int ws = LZ4_decompress_safe(
ibuf, mbuf, (int)isize, (int)msize);
if (ws < 0 || (size_t)ws != msize) {
wp_log(WP_ERROR,
"Lz4 decompression failed for %d bytes to %d of space, used %d",
(int)isize, (int)msize, ws);
}
*wsize = (size_t)ws;
*wbuf = mbuf;
return;
}
case COMP_ZSTD: {
size_t ws = ZSTD_decompress(mbuf, msize, ibuf, isize);
if (ZSTD_isError(ws) || (size_t)ws != msize) {
wp_log(WP_ERROR,
"Zstd decompression failed for %d bytes to %d of space: %s",
(int)isize, (int)msize,
ZSTD_getErrorName(ws));
}
*wsize = (size_t)ws;
*wbuf = mbuf;
return;
}
}
}
struct shadow_fd *translate_fd(struct fd_translation_map *map, int fd,
struct dmabuf_slice_data *info)
{
......@@ -172,7 +282,7 @@ struct shadow_fd *translate_fd(struct fd_translation_map *map, int fd,
map->list = shadow;
shadow->fd_local = fd;
shadow->file_mem_local = NULL;
shadow->file_mem_mirror = NULL;
shadow->mem_mirror = NULL;
shadow->file_size = (size_t)-1;
shadow->remote_id = (map->max_local_id++) * map->local_sign;
shadow->type = FDC_UNKNOWN;
......@@ -202,7 +312,7 @@ struct shadow_fd *translate_fd(struct fd_translation_map *map, int fd,
return shadow;
}
// This will be created at the first transfer
shadow->file_mem_mirror = NULL;
shadow->mem_mirror = NULL;
} else if (fdcat_ispipe(shadow->type)) {
// Make this end of the pipe nonblocking, so that we can include
// it in our main loop.
......@@ -230,8 +340,8 @@ struct shadow_fd *translate_fd(struct fd_translation_map *map, int fd,
// already zero initialized (no information).
}
// to be created on first transfer
shadow->dmabuf_mem_mirror = NULL;
shadow->dmabuf_diff_buffer = NULL;
shadow->mem_mirror = NULL;
shadow->diff_buffer = NULL;
shadow->type = FDC_DMABUF;
}
return shadow;
......@@ -356,8 +466,8 @@ static void apply_diff(size_t size, char *__restrict__ base, size_t diffsize,
}
}
void collect_update(struct shadow_fd *cur, int *ntransfers,
struct transfer transfers[])
void collect_update(struct fd_translation_map *map, struct shadow_fd *cur,
int *ntransfers, struct transfer transfers[])
{
if (cur->type == FDC_FILE) {
if (!cur->is_dirty) {
......@@ -374,55 +484,64 @@ void collect_update(struct shadow_fd *cur, int *ntransfers,
cur->dirty_interval_min = INT32_MAX;
cur->dirty_interval_max = INT32_MIN;
if (!cur->file_mem_mirror) {
if (!cur->mem_mirror) {
// increase space, to avoid overflow when
// writing this buffer along with padding
cur->file_mem_mirror =
calloc(align(cur->file_size, 8), 1);
cur->mem_mirror = calloc(align(cur->file_size, 8), 1);
// 8 extra bytes for worst case diff expansion
cur->file_diff_buffer =
cur->diff_buffer =
calloc(align(cur->file_size + 8, 8), 1);
memcpy(cur->file_mem_mirror, cur->file_mem_local,
memcpy(cur->mem_mirror, cur->file_mem_local,
cur->file_size);
cur->compress_space = compress_bufsize(
map, align(cur->file_size + 8, 8));
cur->compress_buffer = calloc(cur->compress_space, 1);
// new transfer, we send file contents verbatim
int nt = (*ntransfers)++;
transfers[nt].data = cur->file_mem_mirror;
transfers[nt].size = cur->file_size;
compress_buffer(map, cur->file_size, cur->mem_mirror,
cur->compress_space,
cur->compress_buffer,
&transfers[nt].size,
&transfers[nt].data);
transfers[nt].type = cur->type;
transfers[nt].obj_id = cur->remote_id;
transfers[nt].special = 0;
transfers[nt].special.file_actual_size = cur->file_size;
}
if (intv_min >= intv_max) {
return;
}
bool delta = memcmp(cur->file_mem_local + intv_min,
(cur->file_mem_mirror + intv_min),
(cur->mem_mirror + intv_min),
(size_t)(intv_max - intv_min)) !=
0;
if (!delta) {
return;
}
if (!cur->file_diff_buffer) {
if (!cur->diff_buffer) {
/* Create diff buffer by need for remote files
*/
cur->file_diff_buffer = calloc(cur->file_size + 8, 1);
cur->diff_buffer = calloc(cur->file_size + 8, 1);
}
size_t diffsize;
wp_log(WP_DEBUG, "Diff construction start");
construct_diff(cur->file_size, (size_t)intv_min,
(size_t)intv_max, cur->file_mem_mirror,
(size_t)intv_max, cur->mem_mirror,
cur->file_mem_local, &diffsize,
cur->file_diff_buffer);
cur->diff_buffer);
wp_log(WP_DEBUG, "Diff construction end: %ld/%ld", diffsize,
cur->file_size);
if (diffsize > 0) {
int nt = (*ntransfers)++;
transfers[nt].obj_id = cur->remote_id;
transfers[nt].data = cur->file_diff_buffer;
compress_buffer(map, diffsize, cur->diff_buffer,
cur->compress_space,
cur->compress_buffer,
&transfers[nt].size,
&transfers[nt].data);
transfers[nt].type = cur->type;
transfers[nt].size = diffsize;
transfers[nt].special = 0;
transfers[nt].special.file_actual_size = (int)diffsize;
}
} else if (cur->type == FDC_DMABUF) {
// If buffer is clean, do not check for changes
......@@ -432,15 +551,21 @@ void collect_update(struct shadow_fd *cur, int *ntransfers,
cur->is_dirty = false;
bool first = false;
if (!cur->dmabuf_mem_mirror) {
cur->dmabuf_mem_mirror = calloc(1, cur->dmabuf_size);
if (!cur->mem_mirror) {
cur->mem_mirror = calloc(1, cur->dmabuf_size);
// 8 extra bytes for diff messages, or
// alternatively for type header info
size_t diffb_size =
(size_t)max(sizeof(struct dmabuf_slice_data),
8) +
(size_t)align((int)cur->dmabuf_size, 8);
cur->dmabuf_diff_buffer = calloc(diffb_size, 1);
cur->diff_buffer = calloc(diffb_size, 1);
cur->compress_space = compress_bufsize(map, diffb_size);
cur->compress_buffer =
cur->compress_space
? calloc(cur->compress_space,
1)
: NULL;
first = true;
}
void *handle = NULL;
......@@ -455,25 +580,29 @@ void collect_update(struct shadow_fd *cur, int *ntransfers,
if (first) {
// Write diff with a header, and build mirror,
// only touching data once
memcpy(cur->dmabuf_mem_mirror, data, cur->dmabuf_size);
memcpy(cur->dmabuf_diff_buffer, &cur->dmabuf_info,
memcpy(cur->mem_mirror, data, cur->dmabuf_size);
memcpy(cur->diff_buffer, &cur->dmabuf_info,
sizeof(struct dmabuf_slice_data));
memcpy(cur->dmabuf_diff_buffer +
sizeof(struct dmabuf_slice_data),
cur->dmabuf_mem_mirror,
cur->dmabuf_size);
memcpy(cur->diff_buffer + sizeof(struct dmabuf_slice_data),
cur->mem_mirror, cur->dmabuf_size);
// new transfer, we send file contents verbatim
wp_log(WP_DEBUG, "Sending initial dmabuf");
int nt = (*ntransfers)++;
transfers[nt].data = cur->dmabuf_diff_buffer;
transfers[nt].size = cur->dmabuf_size +
sizeof(struct dmabuf_slice_data);
compress_buffer(map,
cur->dmabuf_size +
sizeof(struct dmabuf_slice_data),
cur->diff_buffer, cur->compress_space,
cur->compress_buffer,
&transfers[nt].size,
&transfers[nt].data);
transfers[nt].type = cur->type;
transfers[nt].obj_id = cur->remote_id;
transfers[nt].special = 0;
transfers[nt].special.file_actual_size =
cur->dmabuf_size +
sizeof(struct dmabuf_slice_data);
} else {
bool delta = memcmp(cur->dmabuf_mem_mirror, data,
bool delta = memcmp(cur->mem_mirror, data,
cur->dmabuf_size);
if (delta) {
// TODO: damage region support!
......@@ -481,19 +610,23 @@ void collect_update(struct shadow_fd *cur, int *ntransfers,
wp_log(WP_DEBUG, "Diff construction start");
construct_diff(cur->dmabuf_size, 0,
cur->dmabuf_size,
cur->dmabuf_mem_mirror, data,
&diffsize,
cur->dmabuf_diff_buffer);
cur->mem_mirror, data,
&diffsize, cur->diff_buffer);
wp_log(WP_DEBUG,
"Diff construction end: %ld/%ld",
diffsize, cur->dmabuf_size);
int nt = (*ntransfers)++;
transfers[nt].data = cur->diff_buffer;
compress_buffer(map, diffsize, cur->diff_buffer,
cur->compress_space,
cur->compress_buffer,
&transfers[nt].size,
&transfers[nt].data);
transfers[nt].obj_id = cur->remote_id;
transfers[nt].data = cur->dmabuf_diff_buffer;
transfers[nt].type = cur->type;
transfers[nt].size = diffsize;
transfers[nt].special = 0;
transfers[nt].special.file_actual_size =
diffsize;
}
}
if (unmap_dmabuf(cur->dmabuf_bo, handle) == -1) {
......@@ -519,9 +652,9 @@ void collect_update(struct shadow_fd *cur, int *ntransfers,
transfers[nt].size = cur->pipe_recv.used;
transfers[nt].type = cur->type;
transfers[nt].obj_id = cur->remote_id;
transfers[nt].special = 0;
transfers[nt].special.pipeclose = 0;
if (cur->pipe_lclosed && !cur->pipe_rclosed) {
transfers[nt].special = 1;
transfers[nt].special.pipeclose = 1;
cur->pipe_rclosed = true;
close(cur->pipe_fd);
cur->pipe_fd = -2;
......@@ -534,34 +667,30 @@ void collect_update(struct shadow_fd *cur, int *ntransfers,
void apply_update(struct fd_translation_map *map, const struct transfer *transf)
{
struct shadow_fd *cur = map->list;
bool found = false;
while (cur) {
if (cur->remote_id == transf->obj_id) {
found = true;
break;
}
cur = cur->next;
}
if (found) {
struct shadow_fd *cur = get_shadow_for_rid(map, transf->obj_id);
if (cur) {
if (cur->type == FDC_FILE) {
if (transf->type != cur->type) {
wp_log(WP_ERROR, "Transfer type mismatch %d %d",
transf->type, cur->type);
}
const char *act_buffer = NULL;
size_t act_size = 0;
uncompress_buffer(map, transf->size, transf->data,
transf->special.file_actual_size,
cur->compress_buffer, &act_size,
&act_buffer);
// `memsize+8` is the worst-case diff expansion
if (transf->size > cur->file_size + 8) {
if (act_size > cur->file_size + 8) {
wp_log(WP_ERROR,
"Transfer size mismatch %ld %ld",
transf->size, cur->file_size);
act_size, cur->file_size);
}
apply_diff(cur->file_size, cur->file_mem_mirror,
transf->size, transf->data);
apply_diff(cur->file_size, cur->mem_mirror, act_size,
act_buffer);
apply_diff(cur->file_size, cur->file_mem_local,