Commit 8f5770d4 authored by Manuel Stoeckl's avatar Manuel Stoeckl
Browse files

Only transfer bytes which have changed

parent f1e25cf6
......@@ -142,6 +142,7 @@ void cleanup_translation_map(struct fd_translation_map *map)
if (shadow->memsize != (size_t)-1) {
munmap(shadow->mem_local, shadow->memsize);
free(shadow->mem_mirror);
free(shadow->diff_buffer);
}
if (shadow->shm_buf_name[0]) {
shm_unlink(shadow->shm_buf_name);
......@@ -209,6 +210,95 @@ void translate_fds(struct fd_translation_map *map, int nfds, const int fds[],
}
}
}
/** Construct a very simple binary diff format, designed to be fast for small
 * changes in big files, and entire-file changes in essentially random files.
 * Tries not to read beyond the end of the input buffers, because they are often
 * mmap'd.
 *
 * Diff layout: a sequence of segments, each made of one 64-bit header followed
 * by `to - from` 64-bit data blocks. The header is `(from << 32) | to`, where
 * [from, to) is the range of 8-byte blocks of `changed` carried by the
 * segment. The last `size % 8` bytes of `changed` are always appended
 * verbatim after the final segment.
 *
 * Because block indices are packed into 32 bits each, buffers with 2^32 or
 * more 8-byte blocks cannot be represented.
 *
 * `base`, `changed` and `diff` are accessed as arrays of uint64_t, so they
 * must be suitably aligned for that type.
 *
 * Requires that `diff` point to a memory buffer of size `size + 8`.
 *
 * @param size      length in bytes of both `base` and `changed`
 * @param base      the old contents
 * @param changed   the new contents
 * @param diffsize  receives the number of bytes written to `diff`
 * @param diff      output buffer of at least `size + 8` bytes
 */
static void construct_diff(size_t size, const char *__restrict__ base,
		const char *__restrict__ changed, size_t *diffsize,
		char *__restrict__ diff)
{
	uint64_t nblocks = size / 8;
	const uint64_t *__restrict__ base_blocks = (const uint64_t *)base;
	const uint64_t *__restrict__ changed_blocks =
			(const uint64_t *)changed;
	uint64_t *__restrict__ diff_blocks = (uint64_t *)diff;
	uint64_t ntrailing = size - 8 * nblocks;
	/* number of consecutive unchanged blocks seen most recently */
	uint64_t nskip = 0;
	/* index of the next free 64-bit slot in the diff */
	uint64_t cursor = 0;
	diff_blocks[0] = 0;
	bool skipping = true;
	/* we paper over gaps of a given window size, to avoid fine grained
	 * context switches */
	const uint64_t window_size = 128;
	uint64_t last_header = 0;
	for (uint64_t i = 0; i < nblocks; i++) {
		if (skipping) {
			if (base_blocks[i] != changed_blocks[i]) {
				/* open a new segment starting at block i; the
				 * end index is OR'd in when it is closed */
				skipping = false;
				last_header = cursor++;
				diff_blocks[last_header] = i << 32;
				nskip = 0;
				diff_blocks[cursor++] = changed_blocks[i];
			} else {
				nskip++;
			}
		} else {
			if (base_blocks[i] == changed_blocks[i]) {
				nskip++;
			} else {
				nskip = 0;
			}
			if (nskip > window_size) {
				/* The gap is long enough to be worth a new
				 * header: close the segment and discard the
				 * `nskip - 1` unchanged blocks that were
				 * speculatively copied (block i itself was
				 * never copied). */
				skipping = true;
				cursor -= (nskip - 1);
				diff_blocks[last_header] |= i - (nskip - 1);
			} else {
				diff_blocks[cursor++] = changed_blocks[i];
			}
		}
	}
	// We do not add a final 'skip' block; the unpacking routine leaves
	// every block that is not covered by a segment untouched.
	if (!skipping) {
		/* Close the open segment at the last changed block. The
		 * `nskip` unchanged blocks copied after it must be dropped
		 * from the stream as well, otherwise the unpacker would
		 * misread them as segment headers. */
		cursor -= nskip;
		diff_blocks[last_header] |= nblocks - nskip;
	}
	if (ntrailing > 0) {
		memcpy(&diff[cursor * 8], &changed[nblocks * 8], ntrailing);
	}
	*diffsize = cursor * 8 + ntrailing;
}
/** Apply a diff in the format produced by construct_diff() to `base`, in
 * place.
 *
 * The diff is a sequence of segments, each a 64-bit header
 * `(from << 32) | to` followed by `to - from` 8-byte data blocks which
 * replace blocks [from, to) of `base`; the last `size % 8` bytes of the diff
 * replace the last `size % 8` bytes of `base`.
 *
 * The whole diff is validated before anything is written. If it is malformed
 * (trailing byte count mismatch, a segment with `to < from`, a segment
 * reaching past the end of `base`, or a segment whose data runs past the end
 * of the diff) an error is logged and `base` is left unmodified.
 *
 * @param size      length in bytes of `base`
 * @param base      buffer to update
 * @param diffsize  length in bytes of `diff`
 * @param diff      the diff to apply; may be unaligned
 */
static void apply_diff(size_t size, char *__restrict__ base, size_t diffsize,
		const char *__restrict__ diff)
{
	uint64_t nblocks = size / 8;
	uint64_t ndiffblocks = diffsize / 8;
	uint64_t ntrailing = size - 8 * nblocks;
	if (ntrailing != (diffsize - 8 * ndiffblocks)) {
		wp_log(WP_ERROR, "Trailing bytes mismatch for diff.");
		return;
	}
	/* First pass: walk the headers only and check every segment, so that
	 * a corrupt diff never causes an out-of-bounds copy or a partial
	 * update. */
	for (uint64_t i = 0; i < ndiffblocks;) {
		uint64_t header;
		/* memcpy instead of a pointer cast: the diff buffer is not
		 * guaranteed to be aligned for uint64_t */
		memcpy(&header, diff + 8 * i, sizeof(header));
		uint64_t nfrom = header >> 32;
		uint64_t nto = header & 0xffffffffu;
		if (nto < nfrom || nto > nblocks ||
				nto - nfrom > ndiffblocks - i - 1) {
			wp_log(WP_ERROR, "Invalid segment in diff.");
			return;
		}
		i += nto - nfrom + 1;
	}
	/* Second pass: copy each segment's data blocks into place. */
	for (uint64_t i = 0; i < ndiffblocks;) {
		uint64_t header;
		memcpy(&header, diff + 8 * i, sizeof(header));
		uint64_t nfrom = header >> 32;
		uint64_t nto = header & 0xffffffffu;
		memcpy(base + 8 * nfrom, diff + 8 * (i + 1),
				8 * (nto - nfrom));
		i += nto - nfrom + 1;
	}
	if (ntrailing > 0) {
		memcpy(&base[nblocks * 8], &diff[ndiffblocks * 8], ntrailing);
	}
}
void collect_updates(struct fd_translation_map *map, int *ntransfers,
struct transfer transfers[])
{
......@@ -222,15 +312,31 @@ void collect_updates(struct fd_translation_map *map, int *ntransfers,
if (!cur->mem_mirror) {
cur->mem_mirror = calloc(cur->memsize, 1);
// 8 extra bytes for worst case diff expansion
cur->diff_buffer = calloc(cur->memsize + 8, 1);
memcpy(cur->mem_mirror, cur->mem_local, cur->memsize);
// new transfer, we send file contents verbatim
transfers[nt].data = cur->mem_mirror;
transfers[nt].size = cur->memsize;
transfers[nt].obj_id = cur->remote_id;
nt++;
} else if (memcmp(cur->mem_local, cur->mem_mirror,
cur->memsize) == 0) {
continue;
}
memcpy(cur->mem_mirror, cur->mem_local, cur->memsize);
transfers[nt].data = cur->mem_mirror;
transfers[nt].size = cur->memsize;
transfers[nt].obj_id = cur->remote_id;
transfers[nt].data = cur->diff_buffer;
size_t diffsize;
wp_log(WP_DEBUG, "Diff construction start\n");
construct_diff(cur->memsize, cur->mem_mirror, cur->mem_local,
&diffsize, cur->diff_buffer);
// update mirror
apply_diff(cur->memsize, cur->mem_mirror, diffsize,
cur->diff_buffer);
wp_log(WP_DEBUG, "Diff construction end: %ld/%ld\n", diffsize,
cur->memsize);
transfers[nt].size = diffsize;
nt++;
}
*ntransfers = nt;
......@@ -356,12 +462,15 @@ static void apply_update(
cur = cur->next;
}
if (found) {
if (transf->size != cur->memsize) {
// `memsize+8` is the worst-case diff expansion
if (transf->size > cur->memsize + 8) {
wp_log(WP_ERROR, "Transfer size mismatch %ld %ld\n",
transf->size, cur->memsize);
}
memcpy(cur->mem_mirror, transf->data, transf->size);
memcpy(cur->mem_local, cur->mem_mirror, transf->size);
apply_diff(cur->memsize, cur->mem_mirror, transf->size,
transf->data);
apply_diff(cur->memsize, cur->mem_local, transf->size,
transf->data);
return;
}
......@@ -372,6 +481,7 @@ static void apply_update(
shadow->memsize = transf->size;
shadow->remote_id = transf->obj_id;
shadow->mem_mirror = calloc(shadow->memsize, 1);
// The first time only, the transfer data is a direct copy of the source
memcpy(shadow->mem_mirror, transf->data, transf->size);
sprintf(shadow->shm_buf_name, "/waypipe-data_%d", shadow->remote_id);
......
......@@ -63,8 +63,9 @@ struct shadow_fd {
int remote_id; // + if created serverside; - if created clientside
size_t memsize;
int fd_local;
char *mem_local; // mmap'd
char *mem_mirror; // malloc'd
char *mem_local; // mmap'd
char *mem_mirror; // malloc'd
char *diff_buffer; // malloc'd
char shm_buf_name[256];
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment