...
 
Commits (8)
......@@ -14,14 +14,14 @@ application forwarding similar to `ssh -X` [1] feasible.
a user-friendly command line pattern which prefixes a call to `ssh` and
automatically sets up a reverse tunnel for protocol data. For example,
waypipe ssh -C user@theserver weston-terminal
waypipe ssh user@theserver weston-terminal
will run `ssh`, connect to `theserver`, and remotely run `weston-terminal`,
using local and remote `waypipe` processes to synchronize the shared memory
buffers used by Wayland clients between both computers. Command line arguments
before `ssh` apply only to `waypipe`; those after `ssh` belong to `ssh`.
Alternatively, one can set up the local and remote processes by hand, with the
Alternatively, one can launch the local and remote processes by hand, with the
following set of shell commands:
/usr/bin/waypipe -s /tmp/socket-local client &
......@@ -39,24 +39,27 @@ For a more detailed example, see the man page.
Build with meson[0]. A typical incantation is
cd /path/to/waypipe/..
cd /path/to/waypipe/ && cd ..
mkdir build-waypipe
meson --buildtype debugoptimized waypipe build-waypipe
ninja -C build-waypipe install
Requirements:
Core build requirements:
* meson (build, >= 0.47. with dependencies `ninja`, `pkg-config`, `python3`)
* wayland (build, >= 1.10 for the `wl_surface::damage_buffer` request)
* wayland-protocols (build, >= 1.12, for the xdg-shell protocol, and others)
* liblz4 (optional, >=1.7.0)
* libzstd (optional, >= 0.4.6)
* libgbm (optional, to support programs using OpenGL via DMABUFs)
* libdrm (optional, same as for libgbm)
* ffmpeg (optional, >=3.1, needs avcodec/avutil/swscale for lossy video encoding)
* libva (optional, for hardware video encoding and decoding)
* scdoc (optional, to generate a man page)
* sys/sdt.h (optional, to provide static tracepoints for profiling)
Optional dependencies:
* liblz4 (for fast compression, >=1.7.0)
* libzstd (for slower compression, >= 0.4.6)
* libgbm (to support programs using OpenGL via DMABUFs)
* libdrm (same as for libgbm)
* ffmpeg (>=3.1, needs avcodec/avutil/swscale for lossy video encoding)
* libva (for hardware video encoding and decoding)
* scdoc (to generate a man page)
* sys/sdt.h (to provide static tracepoints for profiling)
* ssh (runtime, OpenSSH >= 6.7, for Unix domain socket forwarding)
* libx264 (ffmpeg runtime, for software video decoding and encoding)
......@@ -65,7 +68,7 @@ Requirements:
## Status
This is usable, but very unstable right now[0]. The main development
This is usable, but still unstable right now[0]. The main development
location[1], command-line interface, wire format, and project name may
yet change completely. Bug reports and patches are always welcome.
......
......@@ -8,7 +8,7 @@ project(
'warning_level=3',
'werror=true',
],
version: '0.6.0',
version: '0.6.1',
)
# DEFAULT_SOURCE implies POSIX_C_SOURCE 200809L + extras like CMSG_LEN
......
......@@ -134,8 +134,8 @@ static int run_single_client_reconnector(
close(newclient);
break;
}
bool update = (new_conn & CONN_UPDATE) != 0;
new_conn = new_conn & ~CONN_UPDATE;
bool update = (new_conn & CONN_UPDATE_BIT) != 0;
new_conn = new_conn & ~CONN_UPDATE_BIT;
if (new_conn != conn_id) {
close(newclient);
continue;
......@@ -220,33 +220,39 @@ static int run_single_client(int channelsock, pid_t *eol_pid,
if (retcode == EXIT_FAILURE || shutdown_flag || chanclient == -1) {
return retcode;
}
if (conn_id & CONN_UPDATE) {
if (conn_id & CONN_UPDATE_BIT) {
wp_error("Initial connection token had update flag set");
return retcode;
}
/* Fork a reconnection handler */
int linkfds[2];
if (socketpair(AF_UNIX, SOCK_STREAM, 0, linkfds) == -1) {
wp_error("Failed to create socketpair: %s", strerror(errno));
close(chanclient);
return EXIT_FAILURE;
}
/* Fork a reconnection handler, only if the connection is
* reconnectable/has a nonzero id */
int linkfds[2] = {-1, -1};
if (conn_id != 0) {
if (socketpair(AF_UNIX, SOCK_STREAM, 0, linkfds) == -1) {
wp_error("Failed to create socketpair: %s",
strerror(errno));
close(chanclient);
return EXIT_FAILURE;
}
pid_t reco_pid = fork();
if (reco_pid == -1) {
wp_debug("Fork failure");
close(chanclient);
return EXIT_FAILURE;
} else if (reco_pid == 0) {
close(linkfds[0]);
close(chanclient);
close(disp_fd);
int rc = run_single_client_reconnector(
channelsock, linkfds[1], conn_id);
exit(rc);
pid_t reco_pid = fork();
if (reco_pid == -1) {
wp_debug("Fork failure");
close(chanclient);
return EXIT_FAILURE;
} else if (reco_pid == 0) {
if (linkfds[0] != -1) {
close(linkfds[0]);
}
close(chanclient);
close(disp_fd);
int rc = run_single_client_reconnector(
channelsock, linkfds[1], conn_id);
exit(rc);
}
close(linkfds[1]);
}
close(linkfds[1]);
close(channelsock);
return main_interface_loop(
......@@ -263,8 +269,8 @@ static int handle_new_client_connection(int channelsock, int chanclient,
wp_error("Failed to get connection id");
goto fail_cc;
}
if (conn_id & CONN_UPDATE) {
conn_id = conn_id & ~CONN_UPDATE;
if (conn_id & CONN_UPDATE_BIT) {
conn_id = conn_id & ~CONN_UPDATE_BIT;
for (int i = 0; i < connmap->count; i++) {
if (connmap->data[i].token == conn_id) {
if (send_one_fd(connmap->data[i].linkfd,
......@@ -279,25 +285,32 @@ static int handle_new_client_connection(int channelsock, int chanclient,
close(chanclient);
return 0;
}
bool reconnectable = conn_id != 0;
if (buf_ensure_size(connmap->count + 1, sizeof(struct conn_addr),
&connmap->size, (void **)&connmap->data) == -1) {
if (reconnectable && buf_ensure_size(connmap->count + 1,
sizeof(struct conn_addr),
&connmap->size,
(void **)&connmap->data) == -1) {
wp_error("Failed to allocate space to track connection");
goto fail_cc;
}
int linkfds[2];
if (socketpair(AF_UNIX, SOCK_STREAM, 0, linkfds) == -1) {
wp_error("Failed to create socketpair: %s", strerror(errno));
goto fail_cc;
int linkfds[2] = {-1, -1};
if (reconnectable) {
if (socketpair(AF_UNIX, SOCK_STREAM, 0, linkfds) == -1) {
wp_error("Failed to create socketpair: %s",
strerror(errno));
goto fail_cc;
}
}
pid_t npid = fork();
if (npid == 0) {
// Run forked process, with the only
// shared state being the new channel
// socket
close(channelsock);
close(linkfds[0]);
if (reconnectable) {
close(linkfds[0]);
}
for (int i = 0; i < connmap->count; i++) {
close(connmap->data[i].linkfd);
}
......@@ -316,10 +329,15 @@ static int handle_new_client_connection(int channelsock, int chanclient,
goto fail_ps;
}
// Remove connection from this process
close(linkfds[1]);
close(chanclient);
connmap->data[connmap->count++] = (struct conn_addr){
.linkfd = linkfds[0], .token = conn_id, .pid = npid};
if (reconnectable) {
close(linkfds[1]);
connmap->data[connmap->count++] =
(struct conn_addr){.linkfd = linkfds[0],
.token = conn_id,
.pid = npid};
}
return 0;
fail_ps:
......
......@@ -61,8 +61,8 @@ size_t run_interval_diff_C(const int diff_window_size,
clear_exit = true;
break;
}
size_t last_header = dc++;
diff[last_header] = (uint64_t)((i - 1) * 2);
uint32_t *ctrl_blocks = (uint32_t *)&diff[dc++];
ctrl_blocks[0] = (uint32_t)((i - 1) * 2);
diff[dc++] = changed_val;
base[i - 1] = changed_val;
// changed_val != base_val, difference occurs at early
......@@ -80,7 +80,7 @@ size_t run_interval_diff_C(const int diff_window_size,
nskip *= (base_val == changed_val);
}
dc -= nskip;
diff[last_header] |= (uint64_t)((i - nskip) * 2) << 32;
ctrl_blocks[1] = (uint32_t)((i - nskip) * 2);
/* our sentinel, at worst, causes overcopy by one. this
* is fine
*/
......@@ -88,8 +88,9 @@ size_t run_interval_diff_C(const int diff_window_size,
/* If only the last block changed */
if ((clear_exit || i_start + 1 == i_end) && changed_val != base_val) {
diff[dc++] = ((uint64_t)(i_end * 2) << 32) |
(uint64_t)((i_end - 1) * 2);
uint32_t *ctrl_blocks = (uint32_t *)&diff[dc++];
ctrl_blocks[0] = (uint32_t)(i_end - 1) * 2;
ctrl_blocks[1] = (uint32_t)i_end * 2;
diff[dc++] = changed_val;
base[i_end - 1] = changed_val;
}
......
......@@ -51,7 +51,10 @@ struct globals {
*
* chanfd: connected socket to channel
* progfd: connected socket to Wayland program
* linkfd: optional connected socket providing new chanfds */
* linkfd: optional socket providing new chanfds. (-1 means not provided)
*
* Returns either EXIT_SUCCESS or EXIT_FAILURE (if exit caused by an error.)
*/
int main_interface_loop(int chanfd, int progfd, int linkfd,
const struct main_config *config, bool display_side);
/** Act as a Wayland server */
......
This diff is collapsed.
......@@ -38,24 +38,34 @@
#include <sys/wait.h>
#include <unistd.h>
/** Generate a token with a very low accidental collision probability */
/** Generate a token with a very low accidental collision probability.
* The token produced will be a) nonzero, and b) have it's low bit
* masked out, so that 'zero' tokens can be used to denote connections
* for which resets are not possible, and tokens with the low bit set
* to denote reconnection attempts. */
static uint64_t get_random_token(uint64_t last_token)
{
struct timespec tp;
clock_gettime(CLOCK_REALTIME, &tp);
uint64_t pid = (uint64_t)getpid();
uint64_t base = last_token + 1;
base += ((uint64_t)tp.tv_sec * 1000000000uL + (uint64_t)tp.tv_nsec) *
0x1000uL;
base += pid;
/* /dev/urandom isn't always available, e.g., when using chroot */
int devrand = open("/dev/urandom", O_RDONLY);
if (devrand != -1) {
errno = 0;
uint64_t offset = 0;
(void)read(devrand, &offset, sizeof(offset));
close(devrand);
base += offset;
uint64_t base = 0;
while (base == 0) {
struct timespec tp;
clock_gettime(CLOCK_REALTIME, &tp);
uint64_t pid = (uint64_t)getpid();
base = last_token + 1;
base += ((uint64_t)tp.tv_sec * 1000000000uL +
(uint64_t)tp.tv_nsec) *
0x1000uL;
base += pid;
/* /dev/urandom isn't always available, e.g., when using chroot
*/
int devrand = open("/dev/urandom", O_RDONLY);
if (devrand != -1) {
errno = 0;
uint64_t offset = 0;
(void)read(devrand, &offset, sizeof(offset));
close(devrand);
base += offset;
}
base = base & ~1uLL;
}
return base;
}
......@@ -109,7 +119,8 @@ static int run_single_server_reconnector(
/* Socket path was invalid */
continue;
}
uint64_t flagged_token = token | CONN_UPDATE;
uint64_t flagged_token =
token | CONN_UPDATE_BIT;
if (write(new_conn, &flagged_token,
sizeof(flagged_token)) !=
sizeof(flagged_token)) {
......@@ -144,22 +155,25 @@ static int run_single_server(int control_pipe, const char *socket_path,
if (unlink_at_end) {
unlink(socket_path);
}
bool reconnectable = control_pipe != -1;
uint64_t token = get_random_token(0);
uint64_t unflagged_token = token & ~CONN_UPDATE;
if (write(chanfd, &unflagged_token, sizeof(uint64_t)) !=
sizeof(uint64_t)) {
wp_error("Failed to write connection token to socket");
goto fail_cfd;
uint64_t token = 0;
if (reconnectable) {
token = get_random_token(0);
}
int linkfds[2];
if (socketpair(AF_UNIX, SOCK_STREAM, 0, linkfds) == -1) {
wp_error("Failed to create socketpair: %s", strerror(errno));
if (write(chanfd, &token, sizeof(uint64_t)) != sizeof(uint64_t)) {
wp_error("Failed to write connection token to socket");
goto fail_cfd;
}
int linkfds[2] = {-1, -1};
if (control_pipe != -1) {
if (socketpair(AF_UNIX, SOCK_STREAM, 0, linkfds) == -1) {
wp_error("Failed to create socketpair: %s",
strerror(errno));
goto fail_cfd;
}
pid_t reco_pid = fork();
if (reco_pid == -1) {
wp_debug("Fork failure");
......@@ -178,13 +192,8 @@ static int run_single_server(int control_pipe, const char *socket_path,
close(linkfds[1]);
}
/* If there is no reconnection process, the file descriptor linkfds[1]
* is kept alive in this process to avoid hangup spam */
int ret = main_interface_loop(
chanfd, server_link, linkfds[0], config, false);
if (control_pipe == -1) {
close(linkfds[1]);
}
return ret;
fail_cfd:
......@@ -199,29 +208,35 @@ static int handle_new_server_connection(const char *current_sockpath,
struct conn_map *connmap, const struct main_config *config,
uint64_t new_token)
{
if (buf_ensure_size(connmap->count + 1, sizeof(struct conn_addr),
&connmap->size, (void **)&connmap->data) == -1) {
bool reconnectable = control_pipe != -1;
if (reconnectable && buf_ensure_size(connmap->count + 1,
sizeof(struct conn_addr),
&connmap->size,
(void **)&connmap->data) == -1) {
wp_error("Failed to allocate memory to track new connection");
goto fail_appfd;
}
if (!reconnectable) {
new_token = 0;
}
int chanfd = connect_to_socket(current_sockpath);
if (chanfd == -1) {
goto fail_appfd;
}
uint64_t unflagged_token = new_token & ~CONN_UPDATE;
if (write(chanfd, &unflagged_token, sizeof(uint64_t)) !=
sizeof(uint64_t)) {
if (write(chanfd, &new_token, sizeof(uint64_t)) != sizeof(uint64_t)) {
wp_error("Failed to write connection token: %s",
strerror(errno));
goto fail_chanfd;
}
int linksocks[2];
if (socketpair(AF_UNIX, SOCK_STREAM, 0, linksocks) == -1) {
wp_error("Socketpair for process link failed: %s",
strerror(errno));
goto fail_chanfd;
int linksocks[2] = {-1, -1};
if (reconnectable) {
if (socketpair(AF_UNIX, SOCK_STREAM, 0, linksocks) == -1) {
wp_error("Socketpair for process link failed: %s",
strerror(errno));
goto fail_chanfd;
}
}
pid_t npid = fork();
......@@ -229,31 +244,39 @@ static int handle_new_server_connection(const char *current_sockpath,
// Run forked process, with the only shared state being the
// new channel socket
close(wdisplay_socket);
close(control_pipe);
close(linksocks[0]);
if (reconnectable) {
close(control_pipe);
close(linksocks[0]);
}
for (int i = 0; i < connmap->count; i++) {
close(connmap->data[i].linkfd);
if (connmap->data[i].linkfd != -1) {
close(connmap->data[i].linkfd);
}
}
int rc = main_interface_loop(
chanfd, appfd, linksocks[1], config, false);
exit(rc);
} else if (npid == -1) {
wp_debug("Fork failure");
close(linksocks[0]);
close(linksocks[1]);
if (reconnectable) {
close(linksocks[0]);
close(linksocks[1]);
}
goto fail_chanfd;
}
// This process no longer needs the application connection
close(chanfd);
close(appfd);
close(linksocks[1]);
if (reconnectable) {
close(linksocks[1]);
connmap->data[connmap->count++] = (struct conn_addr){
.token = new_token,
.pid = npid,
.linkfd = linksocks[0],
};
connmap->data[connmap->count++] = (struct conn_addr){
.token = new_token,
.pid = npid,
.linkfd = linksocks[0],
};
}
return 0;
fail_chanfd:
......@@ -274,7 +297,8 @@ static int update_connections(char current_sockpath[static 110],
path, strerror(errno));
return -1;
}
uint64_t flagged_token = connmap->data[i].token | CONN_UPDATE;
uint64_t flagged_token =
connmap->data[i].token | CONN_UPDATE_BIT;
if (write(chanfd, &flagged_token, sizeof(uint64_t)) !=
sizeof(uint64_t)) {
wp_error("Failed to write token to replacement connection: %s",
......
......@@ -78,7 +78,7 @@ int setup_nb_socket(const char *socket_path, int nmaxclients);
* successful, else -1.*/
int connect_to_socket(const char *socket_path);
/** A type to help keep track of the connection handling processes */
#define CONN_UPDATE 0x1uLL
#define CONN_UPDATE_BIT 0x1uLL
struct conn_addr {
uint64_t token;
pid_t pid;
......
......@@ -44,7 +44,6 @@
struct copy_setup {
int conn;
int wayl;
int link;
bool is_display_side;
struct main_config *mc;
};
......@@ -52,7 +51,7 @@ struct copy_setup {
static void *start_looper(void *data)
{
struct copy_setup *setup = (struct copy_setup *)data;
main_interface_loop(setup->conn, setup->wayl, setup->link, setup->mc,
main_interface_loop(setup->conn, setup->wayl, -1, setup->mc,
setup->is_display_side);
return NULL;
}
......@@ -90,12 +89,10 @@ int main(int argc, char **argv)
close(fd);
printf("Loaded %" PRId64 " bytes\n", len);
int srv_fds[2], cli_fds[2], conn_fds[2], srv_links[2], cli_links[2];
int srv_fds[2], cli_fds[2], conn_fds[2];
if (socketpair(AF_UNIX, SOCK_STREAM, 0, srv_fds) == -1 ||
socketpair(AF_UNIX, SOCK_STREAM, 0, cli_fds) == -1 ||
socketpair(AF_UNIX, SOCK_STREAM, 0, conn_fds) == -1 ||
socketpair(AF_UNIX, SOCK_STREAM, 0, srv_links) == -1 ||
socketpair(AF_UNIX, SOCK_STREAM, 0, cli_links) == -1) {
socketpair(AF_UNIX, SOCK_STREAM, 0, conn_fds) == -1) {
printf("Socketpair failed\n");
return EXIT_FAILURE;
}
......@@ -115,12 +112,10 @@ int main(int argc, char **argv)
pthread_t thread_a, thread_b;
struct copy_setup server_conf = {.conn = conn_fds[0],
.wayl = srv_fds[1],
.link = srv_links[1],
.is_display_side = true,
.mc = &config};
struct copy_setup client_conf = {.conn = conn_fds[1],
.wayl = cli_fds[1],
.link = cli_links[1],
.is_display_side = false,
.mc = &config};
if (pthread_create(&thread_a, NULL, start_looper, &server_conf) == -1) {
......@@ -287,9 +282,6 @@ int main(int argc, char **argv)
pthread_join(thread_a, NULL);
pthread_join(thread_b, NULL);
close(srv_links[0]);
close(cli_links[0]);
free(buf);
free(ignore_buf);
return EXIT_SUCCESS;
......
......@@ -44,7 +44,6 @@
struct copy_setup {
int conn;
int wayl;
int link;
bool is_display_side;
struct main_config *mc;
};
......@@ -52,7 +51,7 @@ struct copy_setup {
static void *start_looper(void *data)
{
struct copy_setup *setup = (struct copy_setup *)data;
main_interface_loop(setup->conn, setup->wayl, setup->link, setup->mc,
main_interface_loop(setup->conn, setup->wayl, -1, setup->mc,
setup->is_display_side);
return NULL;
}
......@@ -106,10 +105,9 @@ int main(int argc, char **argv)
}
printf("Loaded %zu bytes\n", len);
int way_fds[2], conn_fds[2], link_fds[2];
int way_fds[2], conn_fds[2];
if (socketpair(AF_UNIX, SOCK_STREAM, 0, way_fds) == -1 ||
socketpair(AF_UNIX, SOCK_STREAM, 0, conn_fds) == -1 ||
socketpair(AF_UNIX, SOCK_STREAM, 0, link_fds) == -1) {
socketpair(AF_UNIX, SOCK_STREAM, 0, conn_fds) == -1) {
printf("Socketpair failed\n");
return EXIT_FAILURE;
}
......@@ -129,7 +127,6 @@ int main(int argc, char **argv)
pthread_t thread;
struct copy_setup conf = {.conn = conn_fds[1],
.wayl = way_fds[1],
.link = link_fds[1],
.is_display_side = display_side,
.mc = &config};
if (pthread_create(&thread, NULL, start_looper, &conf) == -1) {
......@@ -295,7 +292,6 @@ int main(int argc, char **argv)
}
close(conn_fds[0]);
close(way_fds[0]);
close(link_fds[0]);
pthread_join(thread, NULL);
......
......@@ -98,6 +98,19 @@ struct wire_test {
int nwords;
};
static inline uint32_t pack_u32(uint8_t a0, uint8_t a1, uint8_t a2, uint8_t a3)
{
union {
uint8_t s[4];
uint32_t v;
} u;
u.s[0] = a0;
u.s[1] = a1;
u.s[2] = a2;
u.s[3] = a3;
return u.v;
}
log_handler_func_t log_funcs[2] = {test_log_handler, test_log_handler};
int main(int argc, char **argv)
{
......@@ -126,25 +139,46 @@ int main(int argc, char **argv)
struct wire_test tests[] = {
{call_xtype_req_blue, &intf_xtype.funcs[0][0], {7771},
{8, 0x61626162, 0x00616263, 4441,
yobj.obj_id, 3331, 4442,
xobj.obj_id, 0, 4443},
{8, pack_u32(0x62, 0x61, 0x62, 0x61),
pack_u32(0x63, 0x62,
0x61,
0),
4441, yobj.obj_id, 3331,
4442, xobj.obj_id, 0,
4443},
1, 10},
{call_xtype_evt_yellow, &intf_xtype.funcs[1][0], {0},
{4441}, 0, 1},
{call_ytype_req_green, &intf_ytype.funcs[0][0], {7771},
{4441, 4, 0x00616562, 0, 5, 0x63626263,
0x99999900, xobj.obj_id,
8, 0x80818081,
0x99999990},
{4441, 4, pack_u32(0x62, 0x65, 0x61, 0),
0, 5,
pack_u32(0x63, 0x62,
0x62,
0x63),
pack_u32(0, 0x99, 0x99,
0x99),
xobj.obj_id, 8,
pack_u32(0x81, 0x80,
0x81,
0x80),
pack_u32(0x90, 0x99,
0x99,
0x99)},
1, 11},
{call_ytype_evt_red, &intf_ytype.funcs[1][0],
{8881, 8882, 8883},
{7770, 33330, 7771, 33331, 33332, 7773,
33333, 44440, 6,
0x62616362, 0x99990061,
3, 0x11808080, 99990, 0,
yobj.obj_id,
pack_u32(0x62, 0x63,
0x61,
0x62),
pack_u32(0x61, 0, 0x99,
0x99),
3,
pack_u32(0x80, 0x80,
0x80,
0x11),
99990, 0, yobj.obj_id,
xobj.obj_id},
3, 17}};
......