Commit 905aa839 authored by Philip Withnall's avatar Philip Withnall Committed by Olivier Crête
Browse files

tests: Update test-send-recv to support vectored I/O

parent 253be348
......@@ -81,6 +81,20 @@ typedef enum {
} BufferSizeStrategy;
#define BUFFER_SIZE_STRATEGY_N_ELEMENTS (BUFFER_SIZE_RANDOM + 1)
typedef enum {
BUFFER_COUNT_CONSTANT_ONE, /* always a single buffer */
BUFFER_COUNT_CONSTANT_TWO, /* always two buffers */
BUFFER_COUNT_RANDOM, /* random every time */
} BufferCountStrategy;
#define BUFFER_COUNT_STRATEGY_N_ELEMENTS (BUFFER_COUNT_RANDOM + 1)
typedef enum {
MESSAGE_COUNT_CONSTANT_ONE, /* always a single message */
MESSAGE_COUNT_CONSTANT_TWO, /* always two messages */
MESSAGE_COUNT_RANDOM, /* random every time */
} MessageCountStrategy;
#define MESSAGE_COUNT_STRATEGY_N_ELEMENTS (MESSAGE_COUNT_RANDOM + 1)
typedef enum {
BUFFER_DATA_CONSTANT, /* fill with 0xfe */
BUFFER_DATA_ASCENDING, /* ascending values for each byte */
......@@ -92,8 +106,16 @@ typedef struct {
/* Test configuration (immutable per test run). */
gboolean reliable;
StreamApi stream_api;
BufferSizeStrategy transmit_buffer_size_strategy;
BufferSizeStrategy receive_buffer_size_strategy;
struct {
BufferSizeStrategy buffer_size_strategy;
BufferCountStrategy buffer_count_strategy;
MessageCountStrategy message_count_strategy;
} transmit;
struct {
BufferSizeStrategy buffer_size_strategy;
BufferCountStrategy buffer_count_strategy;
MessageCountStrategy message_count_strategy;
} receive;
BufferDataStrategy buffer_data_strategy;
gsize n_bytes;
......@@ -137,11 +159,28 @@ stream_api_is_reliable_only (StreamApi stream_api)
}
}
/* Generate a size for the @buffer_index-th buffer. Guaranteed to be in
* the interval [1, 1 << 16). ((1 << 16) is the maximum message size.) */
/* Whether @stream_api supports vectored I/O (multiple buffers or messages). */
static gboolean
stream_api_supports_vectored_io (StreamApi stream_api)
{
switch (stream_api) {
case STREAM_AGENT:
case STREAM_AGENT_NONBLOCKING:
return TRUE;
case STREAM_GSOURCE:
case STREAM_GIO:
return FALSE;
default:
g_assert_not_reached ();
}
}
/* Generate a size for the buffer containing the @buffer_offset-th byte.
* Guaranteed to be in the interval [1, 1 << 16). ((1 << 16) is the maximum
* message size.) */
static gsize
generate_buffer_size (BufferSizeStrategy strategy, GRand *rand,
guint buffer_index)
gsize buffer_offset)
{
switch (strategy) {
case BUFFER_SIZE_CONSTANT_LARGE:
......@@ -154,7 +193,7 @@ generate_buffer_size (BufferSizeStrategy strategy, GRand *rand,
return 1;
case BUFFER_SIZE_ASCENDING:
return CLAMP (1L << buffer_index, 1, (1 << 16) - 1);
return CLAMP (1L << buffer_offset, 1, (1 << 16) - 1);
case BUFFER_SIZE_RANDOM:
return g_rand_int_range (rand, 1, 1 << 16);
......@@ -164,6 +203,50 @@ generate_buffer_size (BufferSizeStrategy strategy, GRand *rand,
}
}
/* Generate a number of buffers to allocate when receiving the @buffer_offset-th
* byte. Guaranteed to be in the interval [1, 100], where 100 was chosen
* arbitrarily.*/
static guint
generate_buffer_count (BufferCountStrategy strategy, GRand *rand,
gsize buffer_offset)
{
switch (strategy) {
case BUFFER_COUNT_CONSTANT_ONE:
return 1;
case BUFFER_COUNT_CONSTANT_TWO:
return 2;
case BUFFER_COUNT_RANDOM:
return g_rand_int_range (rand, 1, 100 + 1);
default:
g_assert_not_reached ();
}
}
/* Generate a number of messages to allocate and receive into when receiving the
* @buffer_offset-th byte. Guaranteed to be in the interval [1, 100], where 100
* was chosen arbitrarily.*/
static guint
generate_message_count (MessageCountStrategy strategy, GRand *rand,
guint buffer_index)
{
switch (strategy) {
case MESSAGE_COUNT_CONSTANT_ONE:
return 1;
case MESSAGE_COUNT_CONSTANT_TWO:
return 2;
case MESSAGE_COUNT_RANDOM:
return g_rand_int_range (rand, 1, 100 + 1);
default:
g_assert_not_reached ();
}
}
/* Fill the given @buf with @buf_len bytes of generated data. The data is
* deterministically generated, so that:
* generate_buffer_data(_, I, buf, 2)
......@@ -231,7 +314,7 @@ generate_buffer_to_receive (TestIOStreamThreadData *data, gsize buffer_offset,
TestData *test_data = data->user_data;
/* Allocate the buffer. */
*buf_len = generate_buffer_size (test_data->receive_buffer_size_strategy,
*buf_len = generate_buffer_size (test_data->receive.buffer_size_strategy,
test_data->receive_size_rand, buffer_offset);
*buf = g_malloc (*buf_len);
......@@ -239,6 +322,70 @@ generate_buffer_to_receive (TestIOStreamThreadData *data, gsize buffer_offset,
memset (*buf, 0xaa, *buf_len);
}
/* Similar to generate_buffer_to_receive(), but generate an entire message array
* with multiple buffers instead.
*
* @max_buffer_size may be used to limit the total size of all the buffers in
* all the messages, for example to avoid blocking on receiving data which will
* never be sent.
*
* @messages must be freed with g_free(), as must all of the buffer arrays and
* the buffers themselves. */
static void
generate_messages_to_receive (TestIOStreamThreadData *data, gsize buffer_offset,
NiceInputMessage **messages, guint *n_messages, gsize max_buffer_size)
{
TestData *test_data = data->user_data;
guint i;
/* Allocate the messages. */
*n_messages =
generate_message_count (test_data->receive.message_count_strategy,
test_data->receive_size_rand, buffer_offset);
*messages = g_malloc_n (*n_messages, sizeof (NiceInputMessage));
for (i = 0; i < *n_messages; i++) {
NiceInputMessage *message = &((*messages)[i]);
guint j;
message->n_buffers =
generate_buffer_count (test_data->receive.buffer_count_strategy,
test_data->receive_size_rand, buffer_offset);
message->buffers = g_malloc_n (message->n_buffers, sizeof (GInputVector));
message->from = NULL;
message->length = 0;
for (j = 0; j < (guint) message->n_buffers; j++) {
GInputVector *buffer = &message->buffers[j];
gsize buf_len;
buf_len =
generate_buffer_size (test_data->receive.buffer_size_strategy,
test_data->receive_size_rand, buffer_offset);
/* Trim the buffer length if it would otherwise cause the API to block. */
if (data->reliable)
buf_len = MIN (buf_len, max_buffer_size);
max_buffer_size -= buf_len;
buffer->size = buf_len;
buffer->buffer = g_malloc (buffer->size);
/* Fill it with poison to try and detect incorrect writes. */
memset (buffer->buffer, 0xaa, buffer->size);
/* If we’ve hit the max_buffer_size, adjust the buffer and message counts
* and run away. */
if (max_buffer_size == 0) {
message->n_buffers = j + 1;
*n_messages = i + 1;
return;
}
}
}
}
/* Validate the length and data of a received buffer of length @buf_len, filled
* with @len valid bytes. Updates the internal state machine to mark the bytes
* as received. This consumes @buf. */
......@@ -268,6 +415,73 @@ validate_received_buffer (TestIOStreamThreadData *data, gsize buffer_offset,
g_free (*buf);
}
/* Similar to validate_received_buffer(), except it validates a message array
* instead of a single buffer. This consumes @messages. */
static void
validate_received_messages (TestIOStreamThreadData *data, gsize buffer_offset,
NiceInputMessage **messages, guint n_messages, gint n_valid_messages)
{
TestData *test_data = data->user_data;
guint i;
gsize prev_message_len = G_MAXSIZE;
g_assert_cmpint (n_valid_messages, <=, n_messages);
g_assert_cmpint (n_valid_messages, >=, 0);
if (stream_api_is_blocking (test_data->stream_api))
g_assert_cmpint (n_valid_messages, ==, n_messages);
/* Validate the message contents. */
for (i = 0; i < (guint) n_valid_messages; i++) {
NiceInputMessage *message = &((*messages)[i]);
guint j;
gsize total_buf_len = 0;
gsize message_len_remaining = message->length;
g_assert_cmpint (message->n_buffers, >, 0);
for (j = 0; j < (guint) message->n_buffers; j++) {
GInputVector *buffer = &message->buffers[j];
guint8 *expected_buf;
gsize valid_len;
total_buf_len += buffer->size;
valid_len = MIN (message_len_remaining, buffer->size);
expected_buf = g_malloc (buffer->size);
memset (expected_buf, 0xaa, buffer->size);
generate_buffer_data (test_data->buffer_data_strategy, buffer_offset,
expected_buf, valid_len);
g_assert (memcmp (buffer->buffer, expected_buf, buffer->size) == 0);
g_free (expected_buf);
test_data->received_bytes += valid_len;
buffer_offset += valid_len;
message_len_remaining -= valid_len;
g_free (buffer->buffer);
}
g_assert_cmpuint (message->length, <=, total_buf_len);
g_assert_cmpuint (message->length, >=, 0);
/* No non-empty messages can follow an empty message. */
if (prev_message_len == 0)
g_assert_cmpuint (message->length, ==, 0);
prev_message_len = message->length;
/* If the API was blocking, it should have completely filled the message. */
if (stream_api_is_blocking (test_data->stream_api))
g_assert_cmpuint (message->length, ==, total_buf_len);
g_assert (message->from == NULL);
g_free (message->buffers);
}
g_free (*messages);
}
/* Determine a size for the next transmit buffer, allocate it, and fill it with
* data to be transmitted. */
static void
......@@ -277,7 +491,7 @@ generate_buffer_to_transmit (TestIOStreamThreadData *data, gsize buffer_offset,
TestData *test_data = data->user_data;
/* Allocate the buffer. */
*buf_len = generate_buffer_size (test_data->transmit_buffer_size_strategy,
*buf_len = generate_buffer_size (test_data->transmit.buffer_size_strategy,
test_data->transmit_size_rand, buffer_offset);
*buf_len = MIN (*buf_len, test_data->n_bytes - test_data->transmitted_bytes);
*buf = g_malloc (*buf_len);
......@@ -304,7 +518,7 @@ notify_transmitted_buffer (TestIOStreamThreadData *data, gsize buffer_offset,
}
/*
* Implementation using nice_agent_recv() and nice_agent_send().
* Implementation using nice_agent_recv_messages() and nice_agent_send().
*/
static void
read_thread_agent_cb (GInputStream *input_stream, TestIOStreamThreadData *data)
......@@ -319,27 +533,22 @@ read_thread_agent_cb (GInputStream *input_stream, TestIOStreamThreadData *data)
while (test_data->received_bytes < test_data->n_bytes) {
GError *error = NULL;
guint8 *buf = NULL;
gsize buf_len = 0;
gssize len;
NiceInputMessage *messages;
guint n_messages;
gint n_valid_messages;
/* Initialise a receive buffer. */
generate_buffer_to_receive (data, test_data->received_bytes, &buf,
&buf_len);
/* Trim the receive buffer to avoid blocking on bytes which will never
* appear. */
if (data->reliable)
buf_len = MIN (buf_len, test_data->n_bytes - test_data->received_bytes);
/* Initialise an array of messages to receive into. */
generate_messages_to_receive (data, test_data->received_bytes, &messages,
&n_messages, test_data->n_bytes - test_data->received_bytes);
/* Block on receiving some data. */
len = nice_agent_recv (data->agent, stream_id, component_id, buf, buf_len,
NULL, &error);
n_valid_messages = nice_agent_recv_messages (data->agent, stream_id,
component_id, messages, n_messages, NULL, &error);
g_assert_no_error (error);
/* Check the buffer and update the test’s state machine. */
validate_received_buffer (data, test_data->received_bytes, &buf, buf_len,
len);
/* Check the messages and update the test’s state machine. */
validate_received_messages (data, test_data->received_bytes, &messages,
n_messages, n_valid_messages);
}
check_for_termination (data, &test_data->received_bytes,
......@@ -413,30 +622,26 @@ read_thread_agent_nonblocking_cb (GInputStream *input_stream,
while (test_data->received_bytes < test_data->n_bytes) {
GError *error = NULL;
guint8 *buf = NULL;
gsize buf_len = 0;
gssize len;
NiceInputMessage *messages;
guint n_messages;
gint n_valid_messages;
/* Initialise a receive buffer. */
generate_buffer_to_receive (data, test_data->received_bytes, &buf,
&buf_len);
/* Trim the receive buffer to avoid consuming the ‘done’ message. */
if (data->reliable)
buf_len = MIN (buf_len, test_data->n_bytes - test_data->received_bytes);
/* Initialise an array of messages to receive into. */
generate_messages_to_receive (data, test_data->received_bytes, &messages,
&n_messages, test_data->n_bytes - test_data->received_bytes);
/* Busy loop on receiving some data. */
do {
g_clear_error (&error);
len = nice_agent_recv_nonblocking (data->agent, stream_id, component_id,
buf, buf_len, NULL, &error);
} while (len == -1 &&
n_valid_messages = nice_agent_recv_messages_nonblocking (data->agent,
stream_id, component_id, messages, n_messages, NULL, &error);
} while (n_valid_messages == -1 &&
g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK));
g_assert_no_error (error);
/* Check the buffer and update the test’s state machine. */
validate_received_buffer (data, test_data->received_bytes, &buf, buf_len,
len);
/* Check the messages and update the test’s state machine. */
validate_received_messages (data, test_data->received_bytes, &messages,
n_messages, n_valid_messages);
}
check_for_termination (data, &test_data->received_bytes,
......@@ -685,15 +890,23 @@ write_thread_gsource_cb (GOutputStream *output_stream,
static void
test_data_init (TestData *data, gboolean reliable, StreamApi stream_api,
gsize n_bytes, BufferSizeStrategy transmit_buffer_size_strategy,
BufferCountStrategy transmit_buffer_count_strategy,
MessageCountStrategy transmit_message_count_strategy,
BufferSizeStrategy receive_buffer_size_strategy,
BufferCountStrategy receive_buffer_count_strategy,
MessageCountStrategy receive_message_count_strategy,
BufferDataStrategy buffer_data_strategy, guint32 transmit_seed,
guint32 receive_seed, gsize *other_received_bytes)
{
data->reliable = reliable;
data->stream_api = stream_api;
data->n_bytes = n_bytes;
data->transmit_buffer_size_strategy = transmit_buffer_size_strategy;
data->receive_buffer_size_strategy = receive_buffer_size_strategy;
data->transmit.buffer_size_strategy = transmit_buffer_size_strategy;
data->transmit.buffer_count_strategy = transmit_buffer_count_strategy;
data->transmit.message_count_strategy = transmit_message_count_strategy;
data->receive.buffer_size_strategy = receive_buffer_size_strategy;
data->receive.buffer_count_strategy = receive_buffer_count_strategy;
data->receive.message_count_strategy = receive_message_count_strategy;
data->buffer_data_strategy = buffer_data_strategy;
data->transmit_size_rand = g_rand_new_with_seed (transmit_seed);
data->receive_size_rand = g_rand_new_with_seed (receive_seed);
......@@ -715,7 +928,11 @@ test_data_clear (TestData *data)
static void
test (gboolean reliable, StreamApi stream_api, gsize n_bytes,
BufferSizeStrategy transmit_buffer_size_strategy,
BufferCountStrategy transmit_buffer_count_strategy,
MessageCountStrategy transmit_message_count_strategy,
BufferSizeStrategy receive_buffer_size_strategy,
BufferCountStrategy receive_buffer_count_strategy,
MessageCountStrategy receive_message_count_strategy,
BufferDataStrategy buffer_data_strategy,
guint32 transmit_seed, guint32 receive_seed,
guint deadlock_timeout)
......@@ -734,11 +951,15 @@ test (gboolean reliable, StreamApi stream_api, gsize n_bytes,
};
test_data_init (&l_data, reliable, stream_api, n_bytes,
transmit_buffer_size_strategy, receive_buffer_size_strategy,
transmit_buffer_size_strategy, transmit_buffer_count_strategy,
transmit_message_count_strategy, receive_buffer_size_strategy,
receive_buffer_count_strategy, receive_message_count_strategy,
buffer_data_strategy, transmit_seed, receive_seed,
&r_data.received_bytes);
test_data_init (&r_data, reliable, stream_api, n_bytes,
transmit_buffer_size_strategy, receive_buffer_size_strategy,
transmit_buffer_size_strategy, transmit_buffer_count_strategy,
transmit_message_count_strategy, receive_buffer_size_strategy,
receive_buffer_count_strategy, receive_message_count_strategy,
buffer_data_strategy, transmit_seed, receive_seed,
&l_data.received_bytes);
......@@ -776,7 +997,11 @@ main (int argc, char *argv[])
gboolean reliable;
StreamApi stream_api;
BufferSizeStrategy transmit_buffer_size_strategy;
BufferCountStrategy transmit_buffer_count_strategy;
MessageCountStrategy transmit_message_count_strategy;
BufferSizeStrategy receive_buffer_size_strategy;
BufferCountStrategy receive_buffer_count_strategy;
MessageCountStrategy receive_message_count_strategy;
BufferDataStrategy buffer_data_strategy;
guint32 transmit_seed;
guint32 receive_seed;
......@@ -836,68 +1061,87 @@ main (int argc, char *argv[])
transmit_buffer_size_strategy = BUFFER_SIZE_RANDOM;
buffer_data_strategy = BUFFER_DATA_PSEUDO_RANDOM;
if (stream_api_supports_vectored_io (stream_api)) {
transmit_buffer_count_strategy = BUFFER_COUNT_RANDOM;
transmit_message_count_strategy = MESSAGE_COUNT_RANDOM;
receive_buffer_count_strategy = BUFFER_COUNT_RANDOM;
receive_message_count_strategy = MESSAGE_COUNT_RANDOM;
} else {
transmit_buffer_count_strategy = BUFFER_COUNT_CONSTANT_ONE;
transmit_message_count_strategy = MESSAGE_COUNT_CONSTANT_ONE;
receive_buffer_count_strategy = BUFFER_COUNT_CONSTANT_ONE;
receive_message_count_strategy = MESSAGE_COUNT_CONSTANT_ONE;
}
g_debug ("Running test (%u, %u, %" G_GSIZE_FORMAT ", %u, "
"%u, %u, %u, %u)…",
reliable, stream_api, n_bytes, transmit_buffer_size_strategy,
receive_buffer_size_strategy, buffer_data_strategy,
transmit_seed, receive_seed);
test (reliable, stream_api, n_bytes, transmit_buffer_size_strategy,
receive_buffer_size_strategy, buffer_data_strategy,
transmit_buffer_count_strategy, transmit_message_count_strategy,
receive_buffer_size_strategy, receive_buffer_count_strategy,
receive_message_count_strategy, buffer_data_strategy,
transmit_seed, receive_seed,
deadlock_timeout / 20 /* arbitrary reduction */);
deadlock_timeout);
}
}
goto done;
}
/* Transmit buffer strategy. */
for (transmit_buffer_size_strategy = 0;
(guint) transmit_buffer_size_strategy < BUFFER_SIZE_STRATEGY_N_ELEMENTS;
transmit_buffer_size_strategy++) {
/* Receive buffer strategy. */
for (receive_buffer_size_strategy = 0;
(guint) receive_buffer_size_strategy < BUFFER_SIZE_STRATEGY_N_ELEMENTS;
receive_buffer_size_strategy++) {
/* Transmit data strategy. */
for (buffer_data_strategy = 0;
(guint) buffer_data_strategy < BUFFER_DATA_STRATEGY_N_ELEMENTS;
buffer_data_strategy++) {
/* Reliability. */
for (reliable = 0; reliable < 2; reliable++) {
/* Stream API. */
for (stream_api = 0;
(guint) stream_api < STREAM_API_N_ELEMENTS;
stream_api++) {
/* GIO streams must always be reliable. */
if (!reliable && stream_api_is_reliable_only (stream_api))
continue;
/* Non-reliable socket receives require large buffers. We don’t
* claim to support using them with small (<< 65535B) buffers, so
* don’t test them. */
if (!reliable &&
receive_buffer_size_strategy != BUFFER_SIZE_CONSTANT_LARGE)
continue;
/* Non-reliable socket transmits will always block with huge
* buffers. */
if (!reliable &&
transmit_buffer_size_strategy == BUFFER_SIZE_CONSTANT_LARGE)
continue;
g_debug ("Running test (%u, %u, %" G_GSIZE_FORMAT ", %u, "
"%u, %u, %u, %u)…",
reliable, stream_api, n_bytes, transmit_buffer_size_strategy,
receive_buffer_size_strategy, buffer_data_strategy,
transmit_seed, receive_seed);
test (reliable, stream_api, n_bytes, transmit_buffer_size_strategy,
receive_buffer_size_strategy, buffer_data_strategy,
transmit_seed, receive_seed,
deadlock_timeout);
}
}
}
#define STRATEGY_LOOP(V, L) for (V = 0; (guint) V < L##_N_ELEMENTS; V++)
STRATEGY_LOOP(transmit_buffer_size_strategy, BUFFER_SIZE_STRATEGY)
STRATEGY_LOOP(transmit_buffer_count_strategy, BUFFER_COUNT_STRATEGY)
STRATEGY_LOOP(transmit_message_count_strategy, MESSAGE_COUNT_STRATEGY)
STRATEGY_LOOP(receive_buffer_size_strategy, BUFFER_SIZE_STRATEGY)
STRATEGY_LOOP(receive_buffer_count_strategy, BUFFER_COUNT_STRATEGY)
STRATEGY_LOOP(receive_message_count_strategy, MESSAGE_COUNT_STRATEGY)
STRATEGY_LOOP(buffer_data_strategy, BUFFER_DATA_STRATEGY)
/* Reliability. */
for (reliable = 0; reliable < 2; reliable++) {
/* Stream API. */
for (stream_api = 0;
(guint) stream_api < STREAM_API_N_ELEMENTS;
stream_api++) {
/* GIO streams must always be reliable. */
if (!reliable && stream_api_is_reliable_only (stream_api))
continue;
/* Non-reliable socket receives require large buffers. We don’t claim to
* support using them with small (< 65536B) buffers, so don’t test
* them. */
if (!reliable &&
receive_buffer_size_strategy != BUFFER_SIZE_CONSTANT_LARGE)
continue;
/* Non-reliable socket transmits will always block with huge buffers. */
if (!reliable &&
transmit_buffer_size_strategy == BUFFER_SIZE_CONSTANT_LARGE)
continue;
/* Stream APIs which don’t support vectored I/O must not be passed
* I/O vectors. */
if (!stream_api_supports_vectored_io (stream_api) &&
(transmit_buffer_count_strategy != BUFFER_COUNT_CONSTANT_ONE ||
transmit_message_count_strategy != MESSAGE_COUNT_CONSTANT_ONE ||
receive_buffer_count_strategy != BUFFER_COUNT_CONSTANT_ONE ||
receive_message_count_strategy != MESSAGE_COUNT_CONSTANT_ONE))
continue;
g_debug ("Running test (%u, %u, %" G_GSIZE_FORMAT ", %u, "
"%u, %u, %u, %u, %u, %u, %u, %u)…",
reliable, stream_api, n_bytes, transmit_buffer_size_strategy,
transmit_buffer_count_strategy, transmit_message_count_strategy,
receive_buffer_size_strategy, receive_buffer_count_strategy,
receive_message_count_strategy, buffer_data_strategy,
transmit_seed, receive_seed);
test (reliable, stream_api, n_bytes, transmit_buffer_size_strategy,
transmit_buffer_count_strategy, transmit_message_count_strategy,
receive_buffer_size_strategy, receive_buffer_count_strategy,
receive_message_count_strategy, buffer_data_strategy,
transmit_seed, receive_seed,
deadlock_timeout);
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment