Commit 4cee0bda authored by Manuel Stoeckl's avatar Manuel Stoeckl
Browse files

Introduce simple fd replication

parent 90c9f182
# todo, replace with meson
flags=-ggdb3 -fsanitize=address
way_libs := $(shell pkg-config --libs wayland-client wayland-server)
way_cflags := $(shell pkg-config --cflags wayland-client wayland-server)
all: waypipe
waypipe: waypipe.c server.c client.c util.c util.h Makefile
gcc -std=c11 -ggdb3 $(way_libs) $(way_cflags) -o waypipe waypipe.c server.c client.c util.h util.c
gcc $(flags) -std=c11 $(way_libs) $(way_cflags) -lrt -o waypipe waypipe.c server.c client.c util.h util.c
clean:
rm -f waypipe
......
......@@ -79,140 +79,144 @@ int run_client(const char *socket_path)
return EXIT_FAILURE;
}
int maxmsg = 4096;
char *buffer = calloc(1, maxmsg + 1);
wp_log(WP_DEBUG, "I'm a client on %s!\n", socket_path);
for (int i = 0; i < 1; i++) {
// Q: multiple parallel remote client support? then multiplex
// over all accepted clients?
int chanclient = accept(channelsock, NULL, NULL);
if (chanclient == -1) {
wp_log(WP_DEBUG, "Skipping connection\n");
continue;
}
// Q: multiple parallel remote client support? then multiplex
// over all accepted clients?
// TODO: fork the client on each acceptance, and have each forked
// version connect separately to the Wayland server. (At the very least,
// this will be necessary to ensure distinct pids for each client)
int chanclient = accept(channelsock, NULL, NULL);
if (chanclient == -1) {
wp_log(WP_DEBUG, "First connection failed\n");
return EXIT_FAILURE;
}
int bufsize = 4096;
char *buf = calloc(bufsize + 1, 1);
while (1) {
// pselect multiple.
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(chanclient, &readfds);
FD_SET(displayfd, &readfds);
struct timespec timeout = {
.tv_sec = 0, .tv_nsec = 700000000L};
int maxfd = chanclient > displayfd ? chanclient
: displayfd;
int r = pselect(maxfd + 1, &readfds, NULL, NULL,
&timeout, NULL);
if (r == -1) {
wp_log(WP_ERROR, "Select failed, stopping\n");
struct fd_translation_map fdtransmap = {
.local_sign = 1, .list = NULL, .max_local_id = 1};
const int maxmsg = 4096;
char *buffer = calloc(1, maxmsg + 1);
while (1) {
// pselect multiple.
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(chanclient, &readfds);
FD_SET(displayfd, &readfds);
struct timespec timeout = {.tv_sec = 0, .tv_nsec = 700000000L};
int maxfd = chanclient > displayfd ? chanclient : displayfd;
int r = pselect(maxfd + 1, &readfds, NULL, NULL, &timeout,
NULL);
if (r == -1) {
wp_log(WP_ERROR, "Select failed, stopping\n");
break;
}
wp_log(WP_DEBUG, "Post select %d %d %d\n", r,
FD_ISSET(chanclient, &readfds),
FD_ISSET(displayfd, &readfds));
if (FD_ISSET(chanclient, &readfds)) {
wp_log(WP_DEBUG, "client isset\n");
struct muxheader header;
int hr = read(chanclient, &header,
sizeof(struct muxheader));
if (hr >= 0 && hr < sizeof(struct muxheader)) {
wp_log(WP_ERROR,
"channel client connection closed %d\n",
hr);
break;
}
wp_log(WP_DEBUG, "Post select %d %d %d\n", r,
FD_ISSET(chanclient, &readfds),
FD_ISSET(displayfd, &readfds));
if (FD_ISSET(chanclient, &readfds)) {
wp_log(WP_DEBUG, "client isset\n");
struct muxheader header;
if (read(chanclient, &header,
sizeof(struct muxheader)) <
sizeof(struct muxheader)) {
wp_log(WP_ERROR,
"FD header read failure: %s\n",
strerror(errno));
break;
}
char *tmpbuf = calloc(header.length, 1);
int nread = 0;
while (nread < header.length) {
int nr = read(chanclient,
tmpbuf + nread,
header.length - nread);
if (nr <= 0) {
break;
}
nread += nr;
}
if (nread < header.length) {
wp_log(WP_ERROR,
"FD body read failure %ld/%ld: %s\n",
nread, header.length,
strerror(errno));
if (hr == -1) {
wp_log(WP_ERROR, "FD header read failure: %s\n",
strerror(errno));
break;
}
char *tmpbuf = calloc(header.length, 1);
int nread = 0;
while (nread < header.length) {
int nr = read(chanclient, tmpbuf + nread,
header.length - nread);
if (nr <= 0) {
break;
}
nread += nr;
}
if (nread < header.length) {
wp_log(WP_ERROR,
"FD body read failure %ld/%ld: %s\n",
nread, header.length,
strerror(errno));
break;
}
wp_log(WP_DEBUG, "read bytes: %d = %d\n", nread,
header.length);
int wc = iovec_write(displayfd, tmpbuf, nread,
NULL, NULL);
if (wc == -1) {
wp_log(WP_ERROR,
"FD Write failure %ld: %s\n",
wc, strerror(errno));
break;
}
free(tmpbuf);
wp_log(WP_DEBUG, "client done\n");
wp_log(WP_DEBUG, "read bytes: %d = %d\n", nread,
header.length);
int wc = iovec_write(displayfd, tmpbuf, (size_t)nread,
NULL, 0);
if (wc == -1) {
wp_log(WP_ERROR, "FD Write failure %ld: %s\n",
wc, strerror(errno));
break;
}
if (FD_ISSET(displayfd, &readfds)) {
wp_log(WP_DEBUG, "displayfd isset\n");
int fdbuf[28];
int nfds = 28;
int rc = iovec_read(displayfd, buffer, maxmsg,
fdbuf, &nfds);
if (rc == -1) {
free(tmpbuf);
wp_log(WP_DEBUG, "client done\n");
}
if (FD_ISSET(displayfd, &readfds)) {
wp_log(WP_DEBUG, "displayfd isset\n");
int fdbuf[28];
int nfds = 28;
int rc = iovec_read(displayfd, buffer, maxmsg, fdbuf,
&nfds);
if (rc == -1) {
wp_log(WP_ERROR, "CS Read failure %ld: %s\n",
rc, strerror(errno));
break;
}
if (rc > 0) {
int ids[28];
translate_fds(&fdtransmap, nfds, fdbuf, ids);
int ntransfers;
struct transfer transfers[50];
collect_updates(&fdtransmap, &ntransfers,
transfers);
char *msg = NULL;
size_t msglen;
pack_pipe_message(&msglen, &msg, rc, buffer,
nfds, ids, ntransfers,
transfers);
wp_log(WP_DEBUG,
"Packed message size (%d fds): %ld\n",
nfds, msglen);
if (write(chanclient, msg, msglen) == -1) {
free(msg);
wp_log(WP_ERROR,
"CS Read failure %ld: %s\n",
rc, strerror(errno));
break;
}
if (rc > 0) {
if (nfds > 0) {
for (int i = 0; i < nfds; i++) {
wp_log(WP_DEBUG,
"Got FD = %d\n",
fdbuf[i]);
identify_fd(fdbuf[i]);
}
}
struct muxheader header = {
.metadata = 0,
.length = rc};
if (write(chanclient, &header,
sizeof(header)) ==
-1) {
wp_log(WP_ERROR,
"CS write header failure: %s\n",
strerror(errno));
break;
}
if (write(chanclient, buffer, rc) ==
-1) {
wp_log(WP_ERROR,
"CS write body failure: %s\n",
strerror(errno));
break;
}
} else {
wp_log(WP_DEBUG,
"the display shut down\n");
"CS msg write failure: %s\n",
strerror(errno));
break;
}
free(msg);
} else {
wp_log(WP_DEBUG, "the display shut down\n");
break;
}
}
close(chanclient);
wp_log(WP_DEBUG, "...\n");
}
cleanup_translation_map(&fdtransmap);
free(buffer);
close(chanclient);
wp_log(WP_DEBUG, "...\n");
wp_log(WP_DEBUG, "Closing client\n");
close(displayfd);
close(channelsock);
unlink(socket_path);
wl_display_disconnect(display);
return EXIT_SUCCESS;
}
......@@ -74,12 +74,12 @@ int run_server(const char *socket_path, int app_argc, char *const app_argv[])
return EXIT_FAILURE;
}
struct wl_display *display = wl_display_create();
if (wl_display_add_socket_fd(display, csockpair[0]) == -1) {
wp_log(WP_ERROR, "Failed to add socket to display object\n");
wl_display_destroy(display);
return EXIT_FAILURE;
}
// struct wl_display *display = wl_display_create();
// if (wl_display_add_socket_fd(display, csockpair[0]) == -1) {
// wp_log(WP_ERROR, "Failed to add socket to display
// object\n"); wl_display_destroy(display); return
// EXIT_FAILURE;
// }
int status;
......@@ -117,17 +117,16 @@ int run_server(const char *socket_path, int app_argc, char *const app_argv[])
* csockpair[0] -> fd
* 1 second timer (poll waitpid) */
struct timespec timeout = {.tv_sec = 0, .tv_nsec = 500000000L};
fd_set readfds;
int iter = 0;
int maxmsg = 4096;
char *buffer = calloc(1, maxmsg + 1);
struct fd_translation_map fdtransmap = {
.local_sign = -1, .list = NULL, .max_local_id = 1};
while (true) {
iter++;
if (iter > 10) {
break;
}
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(channelfd, &readfds);
FD_SET(client_socket, &readfds);
......@@ -140,45 +139,58 @@ int run_server(const char *socket_path, int app_argc, char *const app_argv[])
strerror(errno));
return EXIT_FAILURE;
}
if (r == 0) {
// timeout!
wp_log(WP_DEBUG, "timeout,?? \n");
} else {
wp_log(WP_DEBUG, "%d are set\n", r);
}
wp_log(WP_DEBUG,
"Post select %d channelfd=%d client_socket=%d\n",
r, FD_ISSET(channelfd, &readfds),
FD_ISSET(client_socket, &readfds));
if (FD_ISSET(channelfd, &readfds)) {
wp_log(WP_DEBUG, "Readfd isset\n");
struct muxheader header;
if (read(channelfd, &header, sizeof(struct muxheader)) <
sizeof(struct muxheader)) {
ssize_t nbytes;
if (read(channelfd, &nbytes, sizeof(ssize_t)) <
(ssize_t)sizeof(ssize_t)) {
wp_log(WP_ERROR, "FD header read failure: %s\n",
strerror(errno));
break;
}
char *tmpbuf = calloc(header.length, 1);
int nread = 0;
while (nread < header.length) {
int nr = read(channelfd, tmpbuf + nread,
header.length - nread);
char *tmpbuf = calloc(nbytes, 1);
ssize_t nread = 0;
while (nread < nbytes) {
ssize_t nr = read(channelfd, tmpbuf + nread,
nbytes - nread);
if (nr <= 0) {
break;
}
nread += nr;
}
if (nread < header.length) {
if (nread < nbytes) {
wp_log(WP_ERROR,
"FD body read failure %ld/%ld: %s\n",
nread, header.length,
strerror(errno));
nread, nbytes, strerror(errno));
break;
}
char *waymsg;
int waylen;
int nids;
int ids[28];
int ntransfers;
struct transfer transfers[50];
unpack_pipe_message((size_t)nbytes, tmpbuf, &waylen,
&waymsg, &nids, ids, &ntransfers,
transfers);
apply_updates(&fdtransmap, ntransfers, transfers);
int fds[28];
memset(fds, 0, sizeof(fds));
untranslate_ids(&fdtransmap, nids, ids, fds);
wp_log(WP_DEBUG, "Read from conn %d = %d bytes\n",
nread, header.length);
int wc = iovec_write(client_socket, tmpbuf, nread, NULL,
NULL);
nread, nbytes);
int wc = iovec_write(client_socket, waymsg, waylen, fds,
nids);
if (wc == -1) {
wp_log(WP_ERROR, "FD Write failure %ld: %s\n",
wp_log(WP_ERROR, "FD Write failure %d: %s\n",
wc, strerror(errno));
break;
}
......@@ -219,9 +231,12 @@ int run_server(const char *socket_path, int app_argc, char *const app_argv[])
break;
}
}
cleanup_translation_map(&fdtransmap);
close(channelfd);
free(buffer);
// todo: scope manipulation, to ensure all cleanups are done
wp_log(WP_DEBUG, "Waiting for child process\n");
waitpid(pid, &status, 0);
wp_log(WP_DEBUG, "Program ended\n");
return EXIT_SUCCESS;
......
......@@ -8,7 +8,7 @@ program=`which demo.py`
($root/waypipe -d client /tmp/socket-client 2>&1 | sed 's/.*/\x1b[33m&\x1b[0m/') &
# ssh-to-self; should have a local keypair set up
(ssh -R/tmp/socket-server:/tmp/socket-client localhost $root/waypipe server -d /tmp/socket-server -- $program) 2>&1 | sed 's/.*/\x1b[34m&\x1b[0m/'
(ssh -R/tmp/socket-server:/tmp/socket-client localhost $root/waypipe server -d /tmp/socket-server -- $program) 2>&1 | sed 's/.*/\x1b[35m&\x1b[0m/'
kill %1
rm -f /tmp/socket-client
rm -f /tmp/socket-server
......@@ -28,6 +28,8 @@
#include "util.h"
#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
......@@ -40,7 +42,7 @@
#include <time.h>
#include <unistd.h>
log_cat_t wp_loglevel = WP_ERROR;
log_cat_t waypipe_loglevel = WP_ERROR;
const char *static_timestamp(void)
{
......@@ -92,9 +94,9 @@ int iovec_read(int conn, char *buf, size_t buflen, int *fds, int *numfds)
}
return ret;
}
int iovec_write(int conn, char *buf, size_t buflen, int *fds, int *numfds)
int iovec_write(int conn, const char *buf, size_t buflen, const int *fds,
int numfds)
{
// char cmsgdata[ (CMSG_LEN(28 * sizeof(int32_t))) ];
struct iovec the_iovec;
the_iovec.iov_len = buflen;
the_iovec.iov_base = buf;
......@@ -106,34 +108,295 @@ int iovec_write(int conn, char *buf, size_t buflen, int *fds, int *numfds)
msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
ssize_t ret = sendmsg(conn, &msg, 0);
// parse FDS
union {
char buf[CMSG_SPACE(sizeof(int) * 28)];
struct cmsghdr align;
} uc;
memset(uc.buf, 0, sizeof(uc.buf));
if (numfds > 0) {
msg.msg_control = uc.buf;
msg.msg_controllen = sizeof(uc.buf);
struct cmsghdr *frst = CMSG_FIRSTHDR(&msg);
frst->cmsg_level = SOL_SOCKET;
frst->cmsg_type = SCM_RIGHTS;
memcpy(CMSG_DATA(frst), fds, numfds * sizeof(int));
frst->cmsg_len = CMSG_LEN(numfds * sizeof(int));
msg.msg_controllen = CMSG_SPACE(numfds * sizeof(int));
wp_log(WP_DEBUG, "Writing %d fds to cmsg data\n", numfds);
}
ssize_t ret = sendmsg(conn, &msg, 0);
return ret;
}
void identify_fd(int fd)
void cleanup_translation_map(struct fd_translation_map *map)
{
struct stat fsdata;
memset(&fsdata, 0, sizeof(fsdata));
int ret = fstat(fd, &fsdata);
if (ret == -1) {
wp_log(WP_ERROR, "Failed to identify %d as a file: %s\n", fd,
strerror(errno));
} else {
wp_log(WP_DEBUG, "The filedesc %d is a file, of size %d!\n", fd,
fsdata.st_size);
// then we can open the file, read the contents, create a mirror
// file, make diffs, and transfer them out of band!
// memmap & clone, assuming that the file will not be resized.
char *data = mmap(NULL, fsdata.st_size, PROT_READ, MAP_SHARED,
fd, 0);
if (!data) {
wp_log(WP_ERROR, "Mmap failed!\n");
struct shadow_fd *cur = map->list;
map->list = NULL;
while (cur) {
struct shadow_fd *shadow = cur;
close(shadow->fd_local);
if (shadow->memsize != (size_t)-1) {
munmap(shadow->mem_local, shadow->memsize);
free(shadow->mem_mirror);
}
if (shadow->shm_buf_name[0]) {
shm_unlink(shadow->shm_buf_name);
}
cur = shadow->next;
shadow->next = NULL;
free(shadow);
}
}
void translate_fds(struct fd_translation_map *map, int nfds, const int fds[],
int ids[])
{
for (int i = 0; i < nfds; i++) {
struct shadow_fd *cur = map->list;
int the_fd = fds[i];
bool found = false;
while (cur) {
if (cur->fd_local == the_fd) {
ids[i] = cur->remote_id;
found = true;
break;
}
cur = cur->next;
}
if (found) {
continue;
}
// Create a new translation map.
struct shadow_fd *shadow = calloc(1, sizeof(struct shadow_fd));
shadow->next = map->list;
map->list = shadow;
shadow->fd_local = the_fd;
shadow->mem_local = NULL;
shadow->mem_mirror = NULL;
shadow->memsize = (size_t)-1;
shadow->remote_id = (map->max_local_id++) * map->local_sign;
ids[i] = shadow->remote_id;
wp_log(WP_DEBUG, "Creating new shadow buffer for local fd %d\n",
the_fd);
struct stat fsdata;
memset(&fsdata, 0, sizeof(fsdata));
int ret = fstat(the_fd, &fsdata);
if (ret != -1) {
// We have a file-like object
shadow->memsize = fsdata.st_size;
// both r/w permissions, because the size the allocates
// the memory does not always have to be the size that
// modifies it
shadow->mem_local = mmap(NULL, shadow->memsize,
PROT_READ | PROT_WRITE, MAP_SHARED,
the_fd, 0);
if (!shadow->mem_local) {
wp_log(WP_ERROR, "Mmap failed!\n");
continue;
}
// This will be created at the first transfer
shadow->mem_mirror = NULL;
} else {
wp_log(WP_ERROR, "The fd %d is not file-like\n",
the_fd);
}
}
}
void collect_updates(struct fd_translation_map *map, int *ntransfers,
struct transfer transfers[])
{
int nt = 0;
for (struct shadow_fd *cur = map->list; cur; cur = cur->next) {
if (cur->memsize == (size_t)-1) {
wp_log(WP_ERROR,
"shadowlist element not transferrable\n");
continue;
}
if (!cur->mem_mirror) {
cur->mem_mirror = calloc(cur->memsize, 1);
} else if (memcmp(cur->mem_local, cur->mem_mirror,
cur->memsize) == 0) {
continue;
}
memcpy(cur->mem_mirror, cur->mem_local, cur->memsize);
transfers[nt].data = cur->mem_mirror;
transfers[nt].size = cur->memsize;
transfers[nt].obj_id = cur->remote_id;
nt++;
}
*ntransfers = nt;
}
void pack_pipe_message(size_t *msglen, char **msg, int waylen,
const char *waymsg, int nids, const int ids[], int ntransfers,
const struct transfer transfers[])
{
// TODO: network byte order everything!
size_t size = sizeof(size_t); // including the header
size += nids * (2 * sizeof(int));
for (int i = 0; i < ntransfers; i++) {
size_t num_longs = (transfers[i].size + 7) / 8;
size += 2 * sizeof(int) + 8 * num_longs;
}
size_t waymsg_longs = (size_t)(waylen + 7) / 8;
size += 2 * sizeof(int) + 8 * waymsg_longs;
void *data = calloc(size, 1);
size_t *cursor = data;
*cursor++ = size - sizeof(size_t); // size excluding this header
for (int i = 0; i < nids; i++) {
int *sd = (int *)cursor;
sd[0] = ids[i];
sd[1] = -1;