Commit 4d42fae3 authored by Manuel Stoeckl's avatar Manuel Stoeckl
Browse files

Rewrite main proxy loop to use nonblocking sockets

Previously, the main proxy loops in `run_server_child` and
`run_client_child` alternated between making transfers from their shared
channel to their individual wayland compositor/application channels.

By using socket connections in nonblocking mode, the proxy now can
simultaneously maintain transfers in both directions. This ensures that
large transfers (such as a full-screen image) do not prevent e.g. key
release events from being transferred.

Additionally, as the two proxy loops were very similar, they have been
 combined into the `main_interface_loop` routine.

The code still does not correctly handle the case where a read from
the wayland compositor/application fills the entire read buffer and
truncates a sent message, but it is now easier to adjust for that case.
parent 1c834fdf
......@@ -67,169 +67,9 @@ static int run_client_child(int chanfd, const char *socket_path)
return EXIT_FAILURE;
}
int dispfd = wl_display_get_fd(display);
struct fd_translation_map fdtransmap = {
.local_sign = 1, .list = NULL, .max_local_id = 1};
struct message_tracker mtracker;
init_message_tracker(&mtracker);
const int maxmsg = 4096;
char *buffer = calloc(1, maxmsg + 1);
struct pollfd *pfds = NULL;
while (!shutdown_flag) {
int npoll = 2 + count_npipes(&fdtransmap);
free(pfds);
// todo: resizing logic
pfds = calloc(npoll, sizeof(struct pollfd));
pfds[0].fd = chanfd;
pfds[0].events = POLL_IN;
pfds[1].fd = dispfd;
pfds[1].events = POLL_IN;
fill_with_pipes(&fdtransmap, pfds + 2);
int r = poll(pfds, (nfds_t)npoll, -1);
if (r == -1) {
wp_log(WP_ERROR, "poll failed, stopping\n");
break;
}
mark_pipe_object_statuses(&fdtransmap, npoll - 2, pfds + 2);
if (pfds[0].revents & POLLIN) {
// chanfd
char *tmpbuf;
wp_log(WP_DEBUG, "Channel read begun\n");
ssize_t nbytes = read_size_then_buf(chanfd, &tmpbuf);
if (nbytes == 0) {
wp_log(WP_DEBUG,
"Channel read connection closed\n");
break;
}
if (nbytes == -1) {
wp_log(WP_ERROR, "Channel read failure: %s\n",
strerror(errno));
break;
}
char *waymsg = NULL;
int waylen = 0;
int nids = 0;
int ids[28];
int ntransfers = 0;
struct transfer transfers[50];
unpack_pipe_message((size_t)nbytes, tmpbuf, &waylen,
&waymsg, &nids, ids, &ntransfers,
transfers);
wp_log(WP_DEBUG,
"Read %ld byte msg, %d fds, %d transfers\n",
nbytes, nids, ntransfers);
apply_updates(&fdtransmap, ntransfers, transfers);
int fds[28];
memset(fds, 0, sizeof(fds));
untranslate_ids(&fdtransmap, nids, ids, fds);
if (waymsg) {
parse_and_prune_messages(&mtracker, &fdtransmap,
true, true, waymsg, &waylen,
fds, &nids);
}
if (waylen > 0) {
ssize_t wc = iovec_write(dispfd, waymsg, waylen,
fds, nids);
decref_transferred_fds(&fdtransmap, nids, fds);
free(tmpbuf);
if (wc == -1) {
wp_log(WP_ERROR,
"dispfd write failure %ld: %s\n",
wc, strerror(errno));
break;
}
close_local_pipe_ends(&fdtransmap);
} else {
free(tmpbuf);
}
flush_writable_pipes(&fdtransmap);
close_rclosed_pipes(&fdtransmap);
}
int ntransfers = 0;
// the wayland message is a zeroth transfer
struct transfer transfers[50];
int nfds = 0;
int ids[28];
if (pfds[1].revents & POLLIN) {
int fdbuf[28];
ssize_t rc = iovec_read(dispfd, buffer, maxmsg, fdbuf,
&nfds, 28);
if (rc == -1) {
wp_log(WP_ERROR,
"dispfd read failure %ld: %s\n",
rc, strerror(errno));
break;
}
if (rc > 0) {
translate_fds(&fdtransmap, nfds, fdbuf, ids);
int nrc = (int)rc;
parse_and_prune_messages(&mtracker, &fdtransmap,
true, false, buffer, &nrc,
fdbuf, &nfds);
rc = nrc;
}
if (rc > 0) {
transfers[0].obj_id = 0;
transfers[0].size = (size_t)rc;
transfers[0].data = buffer;
transfers[0].type = FDC_UNKNOWN;
ntransfers = 1;
} else {
wp_log(WP_DEBUG, "The display shut down\n");
break;
}
}
read_readable_pipes(&fdtransmap);
collect_updates(&fdtransmap, &ntransfers, transfers);
if (ntransfers > 0) {
char *msg = NULL;
size_t msglen;
pack_pipe_message(&msglen, &msg, nfds, ids, ntransfers,
transfers);
decref_transferred_rids(&fdtransmap, nfds, ids);
wp_log(WP_DEBUG,
"Packed message size (%d fds, %d blobs): %ld\n",
nfds, ntransfers, msglen);
if (write(chanfd, msg, msglen) == -1) {
free(msg);
wp_log(WP_ERROR, "chanfd write failure: %s\n",
strerror(errno));
break;
}
free(msg);
wp_log(WP_DEBUG, "Channel write complete\n");
}
}
free(pfds);
cleanup_translation_map(&fdtransmap);
cleanup_message_tracker(&mtracker);
free(buffer);
close(chanfd);
wp_log(WP_DEBUG, "...\n");
wp_log(WP_DEBUG, "Closing client\n");
close(dispfd);
int retcode = main_interface_loop(chanfd, dispfd, true);
wl_display_disconnect(display);
return EXIT_SUCCESS;
return retcode;
}
int run_client(const char *socket_path, bool oneshot, pid_t eol_pid)
......
......@@ -295,8 +295,9 @@ static void invoke_msg_handler(const struct wl_message *msg, bool is_event,
continue;
len_overflow:
wp_log(WP_ERROR,
"Message parse length overflow, bytes=%d/%d, fds=%d/%d, c=%c\n",
4 * i, 4 * paylen, *fds_used, fdlen, *c);
"Message '%s' parse length overflow, bytes=%d/%d, fds=%d/%d, c=%c\n",
msg->name, 4 * i, 4 * paylen, *fds_used, fdlen,
*c);
return;
}
if (i != paylen) {
......@@ -313,8 +314,9 @@ static void invoke_msg_handler(const struct wl_message *msg, bool is_event,
}
}
bool handle_message(struct message_tracker *mt, struct fd_translation_map *map,
bool display_side, bool from_client, void *data, int data_len,
enum message_action handle_message(struct message_tracker *mt,
struct fd_translation_map *map, bool display_side,
bool from_client, void *data, int data_len,
int *consumed_length, int *fds, int fds_len,
int *n_consumed_fds, bool *unidentified_changes)
{
......@@ -322,84 +324,86 @@ bool handle_message(struct message_tracker *mt, struct fd_translation_map *map,
uint32_t obj = header[0];
int meth = (int)((header[1] << 16) >> 16);
int len = (int)(header[1] >> 16);
*consumed_length = len;
if (len > data_len) {
wp_log(WP_ERROR,
"Message length overflow: %d claimed vs %d available. Keeping message, uninterpreted",
"Message length overflow: %d claimed vs %d available. Keeping message, uninterpreted\n",
len, data_len);
return false;
*consumed_length = 0;
return MESSACT_DELAY;
}
if (len < 2 * (int)sizeof(uint32_t)) {
wp_log(WP_ERROR,
"Message length underflow (%d), probably parsing error\n",
len);
*consumed_length = 0;
return MESSACT_ERROR;
}
*consumed_length = len;
// display: object = 0?
struct wp_object *objh = listset_get(&mt->objects, obj);
if (objh && objh->type) {
const struct wl_interface *intf = objh->type;
const struct wl_message *msg = NULL;
if (from_client) {
if (meth < intf->method_count && meth >= 0) {
msg = &intf->methods[meth];
} else {
wp_log(WP_DEBUG,
"Unidentified request #%d (of %d) on interface %s\n",
meth, intf->method_count,
intf->name);
}
if (!objh || !objh->type) {
wp_log(WP_DEBUG, "Unidentified object %d with %s\n", obj,
from_client ? "request" : "event");
*unidentified_changes = true;
return MESSACT_KEEP;
}
const struct wl_interface *intf = objh->type;
const struct wl_message *msg = NULL;
if (from_client) {
if (meth < intf->method_count && meth >= 0) {
msg = &intf->methods[meth];
} else {
if (meth < intf->event_count && meth >= 0) {
msg = &intf->events[meth];
} else {
wp_log(WP_ERROR,
"Unidentified event #%d on interface %s\n",
meth, intf->name);
}
wp_log(WP_DEBUG,
"Unidentified request #%d (of %d) on interface %s\n",
meth, intf->method_count, intf->name);
}
if (msg) {
const struct msg_handler *handler =
get_handler_for_interface(objh->type);
void (*fn)(void) = NULL;
if (handler) {
if (from_client && handler->request_handlers) {
fn = ((void (*const *)(
void))handler->request_handlers)
[meth];
}
if (!from_client && handler->event_handlers) {
fn = ((void (*const *)(
void))handler->event_handlers)
[meth];
}
}
struct context ctx;
ctx.mt = mt;
ctx.map = map;
ctx.obj = objh;
ctx.on_display_side = display_side;
ctx.drop_this_msg = false;
const uint32_t *payload = header + 2;
invoke_msg_handler(msg, !from_client, payload,
len / 4 - 2, &fds[*n_consumed_fds],
fds_len - *n_consumed_fds,
n_consumed_fds, fn, &ctx, mt);
// Flag set by the protocol handler function
if (ctx.drop_this_msg) {
return false; // DROP
}
*unidentified_changes = false;
} else {
if (meth < intf->event_count && meth >= 0) {
msg = &intf->events[meth];
} else {
wp_log(WP_DEBUG, "Unidentified %s from known object\n",
from_client ? "request" : "event");
*unidentified_changes = true;
wp_log(WP_ERROR,
"Unidentified event #%d on interface %s\n",
meth, intf->name);
}
} else {
wp_log(WP_DEBUG, "Unidentified object %d with %s\n", obj,
}
if (!msg) {
wp_log(WP_DEBUG, "Unidentified %s from known object\n",
from_client ? "request" : "event");
*unidentified_changes = true;
return MESSACT_KEEP;
}
*unidentified_changes = false;
const struct msg_handler *handler =
get_handler_for_interface(objh->type);
void (*fn)(void) = NULL;
if (handler) {
if (from_client && handler->request_handlers) {
fn = ((void (*const *)(
void))handler->request_handlers)[meth];
}
if (!from_client && handler->event_handlers) {
fn = ((void (*const *)(
void))handler->event_handlers)[meth];
}
}
struct context ctx;
ctx.mt = mt;
ctx.map = map;
ctx.obj = objh;
ctx.on_display_side = display_side;
ctx.drop_this_msg = false;
const uint32_t *payload = header + 2;
invoke_msg_handler(msg, !from_client, payload, len / 4 - 2,
&fds[*n_consumed_fds], fds_len - *n_consumed_fds,
n_consumed_fds, fn, &ctx, mt);
// Flag set by the protocol handler function
if (ctx.drop_this_msg) {
return MESSACT_DROP;
}
(void)fds;
(void)fds_len;
(void)n_consumed_fds;
return true; // keep
return MESSACT_KEEP;
}
......@@ -45,171 +45,7 @@
/* Closes both provided file descriptors */
static int run_server_child(int chanfd, int appfd)
{
int maxmsg = 4096;
char *buffer = calloc(1, maxmsg + 1);
struct fd_translation_map fdtransmap = {
.local_sign = -1, .list = NULL, .max_local_id = 1};
struct message_tracker mtracker;
init_message_tracker(&mtracker);
struct pollfd *pfds = NULL;
while (!shutdown_flag) {
int npoll = 2 + count_npipes(&fdtransmap);
free(pfds);
// todo: resizing logic
pfds = calloc(npoll, sizeof(struct pollfd));
pfds[0].fd = chanfd;
pfds[0].events = POLL_IN;
pfds[1].fd = appfd;
pfds[1].events = POLL_IN;
fill_with_pipes(&fdtransmap, pfds + 2);
int r = poll(pfds, (nfds_t)npoll, -1);
if (r == -1) {
if (errno == EINTR) {
wp_log(WP_ERROR,
"poll interrupted: shutdown=%c\n",
shutdown_flag ? 'Y' : 'n');
} else {
wp_log(WP_ERROR,
"poll failed due to, stopping: %s\n",
strerror(errno));
break;
}
}
mark_pipe_object_statuses(&fdtransmap, npoll - 2, pfds + 2);
if (pfds[0].revents & POLLIN) {
// chanfd
char *tmpbuf;
wp_log(WP_DEBUG, "Channel read begun\n");
ssize_t nbytes = read_size_then_buf(chanfd, &tmpbuf);
if (nbytes == 0) {
wp_log(WP_ERROR,
"Channel read connection closed\n");
break;
}
if (nbytes == -1) {
wp_log(WP_ERROR, "Channel read failure: %s\n",
strerror(errno));
break;
}
char *waymsg = NULL;
int waylen = 0;
int nids = 0;
int ids[28];
int ntransfers = 0;
struct transfer transfers[50];
unpack_pipe_message((size_t)nbytes, tmpbuf, &waylen,
&waymsg, &nids, ids, &ntransfers,
transfers);
wp_log(WP_DEBUG,
"Read %ld byte msg, %d fds, %d transfers\n",
nbytes, nids, ntransfers);
apply_updates(&fdtransmap, ntransfers, transfers);
int fds[28];
memset(fds, 0, sizeof(fds));
untranslate_ids(&fdtransmap, nids, ids, fds);
if (waymsg) {
parse_and_prune_messages(&mtracker, &fdtransmap,
false, false, waymsg, &waylen,
fds, &nids);
}
if (waymsg) {
ssize_t wc = iovec_write(appfd, waymsg, waylen,
fds, nids);
decref_transferred_fds(&fdtransmap, nids, fds);
free(tmpbuf);
if (wc == -1) {
wp_log(WP_ERROR,
"appfd write failure %ld: %s\n",
wc, strerror(errno));
break;
}
close_local_pipe_ends(&fdtransmap);
} else {
free(tmpbuf);
}
flush_writable_pipes(&fdtransmap);
close_rclosed_pipes(&fdtransmap);
}
int ntransfers = 0;
// the wayland message is a zeroth transfer
struct transfer transfers[50];
int nfds = 0;
int ids[28];
if (pfds[1].revents & POLLIN) {
int fdbuf[28];
ssize_t rc = iovec_read(appfd, buffer, maxmsg, fdbuf,
&nfds, 28);
if (rc == -1) {
wp_log(WP_ERROR, "appfd read failure %ld: %s\n",
rc, strerror(errno));
break;
}
if (rc > 0) {
translate_fds(&fdtransmap, nfds, fdbuf, ids);
int nrc = (int)rc;
parse_and_prune_messages(&mtracker, &fdtransmap,
false, true, buffer, &nrc,
fdbuf, &nfds);
rc = nrc;
}
if (rc > 0) {
wp_log(WP_DEBUG,
"appfd: read %ld byte waymsg, %d fds\n",
rc, nfds);
transfers[0].obj_id = 0;
transfers[0].size = (size_t)rc;
transfers[0].data = buffer;
transfers[0].type = FDC_UNKNOWN;
ntransfers = 1;
} else {
wp_log(WP_DEBUG, "The client shut down\n");
break;
}
}
read_readable_pipes(&fdtransmap);
collect_updates(&fdtransmap, &ntransfers, transfers);
if (ntransfers > 0) {
char *msg = NULL;
size_t msglen;
pack_pipe_message(&msglen, &msg, nfds, ids, ntransfers,
transfers);
decref_transferred_rids(&fdtransmap, nfds, ids);
wp_log(WP_DEBUG,
"Packed message size (%d fds, %d blobs): %ld\n",
nfds, ntransfers, msglen);
if (write(chanfd, msg, msglen) == -1) {
free(msg);
wp_log(WP_ERROR, "chanfd write failure: %s\n",
strerror(errno));
break;
}
free(msg);
wp_log(WP_DEBUG, "Channel write complete\n");
}
}
free(pfds);
cleanup_translation_map(&fdtransmap);
cleanup_message_tracker(&mtracker);
close(chanfd);
close(appfd);
free(buffer);
return EXIT_SUCCESS;
return main_interface_loop(chanfd, appfd, false);
}
static int connect_to_channel(const char *socket_path)
......
This diff is collapsed.
......@@ -48,6 +48,8 @@ ssize_t iovec_read(int socket, char *buf, size_t buflen, int *fds, int *numfds,
ssize_t iovec_write(int conn, const char *buf, size_t buflen, const int *fds,
int numfds);
int main_interface_loop(int chanfd, int progfd, bool display_side);
typedef enum { WP_DEBUG = 1, WP_ERROR = 2 } log_cat_t;
extern char waypipe_log_mode;
......@@ -160,9 +162,12 @@ ssize_t read_size_then_buf(int fd, char **msg);
/** Count the number of pipe fds being maintained by the translation map */
int count_npipes(const struct fd_translation_map *map);
/** Fill in pollfd entries, with POLL_IN | POLLOUT */
/** Fill in pollfd entries, with POLL_IN | POLLOUT, for applicable pipe objects.
* Specifically, if check_read is true, indicate all readable pipes.
* Also, indicate all writeable pipes for which we also something to write. */
struct pollfd;
void fill_with_pipes(const struct fd_translation_map *map, struct pollfd *pfds);
int fill_with_pipes(const struct fd_translation_map *map, struct pollfd *pfds,
bool check_read);
/** mark pipe shadows as being ready to read or write */
void mark_pipe_object_statuses(
......@@ -240,8 +245,11 @@ struct context {
/**
* Given a set of messages and fds, parse the messages, and if indicated by
* parsing logic, compact the message buffer by removing selected messages.
*
* Returns the number of trailing bytes which were originally part of a
* truncated message and should be reused for the next read cycle.
*/
void parse_and_prune_messages(struct message_tracker *mt,
int parse_and_prune_messages(struct message_tracker *mt,
struct fd_translation_map *map, bool on_display_side,
bool from_client, char *data, int *len, int *fds, int *nfds);
......@@ -253,14 +261,22 @@ struct wp_object *listset_get(struct obj_list *lst, uint32_t id);
void init_message_tracker(struct message_tracker *mt);
void cleanup_message_tracker(struct message_tracker *mt);
enum message_action {
MESSACT_KEEP,
MESSACT_DROP,
MESSACT_ERROR,
MESSACT_DELAY
};
/**
* The return value is false iff the given message should be dropped.
* The flag `unidentified_changes` is set to true if the message does
* not correspond to a known protocol.
*/
bool handle_message(struct message_tracker *mt, struct fd_translation_map *map,
bool on_display_side, bool from_client, void *data,
int data_len, int *consumed_length, int *fds, int fds_len,
enum message_action handle_message(struct message_tracker *mt,
struct fd_translation_map *map, bool on_display_side,
bool from_client, void *data, int data_len,
int *consumed_length, int *fds, int fds_len,
int *n_consumed_fds, bool *unidentified_changes);
struct wl_interface;
struct wp_object *make_wp_object(uint32_t id, const struct wl_interface *type);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment