Commit 25bf61d0 authored by Santiago, committed by Santiago

gstatomicqueue: Improve performance on push operations

Do not block fast writers until slower ones finish. Instead, store the data
at the right position and update the read index once all consecutive data
is available to readers.

Change-Id: I80546fed4f740ff20c3ddc30ae8f0ffc27108687

gstreamer/gstreamer#192
parent be56bd9a
Pipeline #48420 passed with stages in 110 minutes and 44 seconds
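For orientation before the diff: below is a minimal, hypothetical sketch of the bookkeeping the commit message describes, using GLib atomics on a single fixed-size array. Names such as sketch_push and SLOT_MASK are invented; array growth, the skip hand-off between arrays, and the LOW_MEM protection handled by the real patch are deliberately omitted.

#include <glib.h>

#define SLOT_MASK 31            /* 32 slots; like the queue, size is a mask */

static gpointer slots[SLOT_MASK + 1];
static volatile guint filled_mask;  /* bit n set: slot n written, unpublished */
static volatile gint tail_write;    /* next logical index a writer may claim */
static volatile gint tail_read;     /* readers may consume indices below this */

static void
sketch_push (gpointer data)
{
  gint tail;
  guint bit;

  /* claim a unique logical index; writers never wait for each other here */
  do {
    tail = g_atomic_int_get (&tail_write);
  } while (!g_atomic_int_compare_and_exchange (&tail_write, tail, tail + 1));

  bit = 1u << (tail & SLOT_MASK);

  /* store the data at the right position and flag the slot as filled */
  slots[tail & SLOT_MASK] = data;
  g_atomic_int_or (&filled_mask, bit);

  /* fast writer: if older slots are still being written, return immediately;
   * whoever completes the oldest slot publishes ours along with its own */
  if (!g_atomic_int_compare_and_exchange (&tail_read, tail, tail + 1))
    return;

  /* we completed the oldest slot; if our flag is already gone, a concurrent
   * sweeper published us and owns the sweep */
  if (!(g_atomic_int_and (&filled_mask, ~bit) & bit))
    return;

  /* publish every consecutive slot that faster writers already filled
   * (the real patch does this with compare-and-exchange plus a re-check
   * loop, and hops across grown arrays via the skip field) */
  tail++;
  while (g_atomic_int_and (&filled_mask, ~(1u << (tail & SLOT_MASK)))
      & (1u << (tail & SLOT_MASK)))
    g_atomic_int_set (&tail_read, ++tail);
}

int
main (void)
{
  sketch_push (GUINT_TO_POINTER (1));
  sketch_push (GUINT_TO_POINTER (2));
  g_print ("tail_read = %d\n", g_atomic_int_get (&tail_read));  /* prints 2 */
  return 0;
}

The key design point: a writer that loses the tail_read race simply leaves its filled_mask bit set and returns, so fast writers never spin on slow ones; the writer holding the oldest slot sweeps forward and publishes all consecutive filled slots in one pass.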
@@ -60,6 +60,8 @@ struct _GstAQueueMem
   volatile gint head;
   volatile gint tail_write;
   volatile gint tail_read;
+  volatile guint filled_mask;
+  volatile gint skip;
   GstAQueueMem *next;
   GstAQueueMem *free;
 };
@@ -90,6 +92,8 @@ new_queue_mem (guint size, gint pos)
   mem->tail_read = pos;
   mem->next = NULL;
   mem->free = NULL;
+  mem->filled_mask = 0;
+  mem->skip = -1;
 
   return mem;
 }
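A note on the index arithmetic used throughout the patch: size is stored as a power-of-two mask (upstream initializes it as clp2 (MAX (size, 16)) - 1), so tail & size yields the physical slot and a single bit of filled_mask can flag it. A tiny standalone illustration, with made-up values:

#include <stdio.h>

int
main (void)
{
  unsigned size = 31;          /* a 32-slot array stores size as 32 - 1 */
  int tail = 37;               /* logical index, monotonically growing */
  unsigned slot = tail & size; /* 37 & 31 == 5: physical slot in array[] */
  unsigned bit = 1u << slot;   /* the slot's flag bit inside filled_mask */

  printf ("slot=%u bit=0x%x\n", slot, bit); /* slot=5 bit=0x20 */
  return 0;
}

Note that a 32-bit filled_mask can only track 32 slots per array, so the scheme appears to assume arrays are never grown beyond that.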
@@ -337,8 +341,8 @@ gst_atomic_queue_pop (GstAtomicQueue * queue)
 void
 gst_atomic_queue_push (GstAtomicQueue * queue, gpointer data)
 {
-  GstAQueueMem *tail_mem;
-  gint head, tail, size;
+  GstAQueueMem *tail_mem, *current;
+  gint head, tail, next, flags, size;
 
   g_return_if_fail (queue != NULL);
@@ -373,19 +377,88 @@ gst_atomic_queue_push (GstAtomicQueue * queue, gpointer data)
        * manages to swap the pointer is the only one who can set the next
        * pointer to the new array */
       g_atomic_pointer_set (&tail_mem->next, mem);
+      /* Store the tail index that caused the new array to be created */
+      g_atomic_int_set (&tail_mem->skip, tail);
     }
   } while G_UNLIKELY
       (!g_atomic_int_compare_and_exchange (&tail_mem->tail_write, tail, tail + 1));
 
   tail_mem->array[tail & size] = data;
+  g_atomic_int_or (&tail_mem->filled_mask, 1 << (tail & size));
+
+  if (!g_atomic_int_compare_and_exchange (&tail_mem->tail_read, tail, tail + 1)) {
+    /* slower writers have not caught up yet; leave our filled flag set so
+     * that they update the tail_read index for us */
+    return;
+  }
+
+  /* clear our write flag */
+  flags = g_atomic_int_and (&tail_mem->filled_mask, (~(1 << (tail & size))));
+  if (!(flags & (1 << (tail & size)))) {
+    /* if our flag was already 0, it is because another writer beat us */
+    return;
+  }
+
+  /* check how many writers beat us so we can update the read index properly */
+  next = tail;
+  current = tail_mem;
+
+#ifdef LOW_MEM
+  /* Make sure no one frees the current memory while we are updating the
+   * read index: we may find it filled, with a new array already created,
+   * while other threads are still popping data from the array pointed to
+   * by current. */
+  g_atomic_int_inc (&queue->num_readers);
+#endif
-
-  /* now wait until all writers have completed their write before we move the
-   * tail_read to this new item. It is possible that other writers are still
-   * updating the previous array slots and we don't want to reveal their changes
-   * before they are done. FIXME, it would be nice if we didn't have to busy
-   * wait here. */
-  while G_UNLIKELY
-      (!g_atomic_int_compare_and_exchange (&tail_mem->tail_read, tail, tail + 1));
+
+  while (TRUE) {
+    gint skip;
+
+    do {
+      skip = g_atomic_int_get (&tail_mem->skip);
+      next++;
+      if (skip >= 0 && next >= skip) {
+        tail_mem = g_atomic_pointer_get (&tail_mem->next);
+        size = tail_mem->size;
+      }
+      flags =
+          g_atomic_int_and (&tail_mem->filled_mask, (~(1 << (next & size))));
+    } while G_UNLIKELY
+        (flags & (1 << (next & size)));
+
+    if (tail + 1 >= next) {
+      /* no more pending writers */
+      break;
+    }
+
+    tail_mem = current;
+    while G_UNLIKELY
+        (++tail < next) {
+      skip = g_atomic_int_get (&tail_mem->skip);
+      if (skip >= 0 && tail >= skip) {
+        tail_mem = g_atomic_pointer_get (&tail_mem->next);
+      }
+      /* Only the latest writer among the faster ones can update the read
+       * index, so we keep checking whether we are the elected one. We do
+       * not care about the return value: even if we update the index here,
+       * a newer array may need updating as well, so as long as the
+       * condition (tail < next) holds we must keep trying. */
+      g_atomic_int_compare_and_exchange (&tail_mem->tail_read, tail, next);
+    }
+
+    /* Check whether, while we were updating the tail_read index, a writer
+     * beat us and left the function without updating it. */
+    tail = --next;
+    current = tail_mem;
+  }
+
+#ifdef LOW_MEM
+  /* decrement the number of readers; when we reach 0 readers we can be sure
+   * that no one is accessing the memory in the free list and we can try to
+   * clean up */
+  if (g_atomic_int_dec_and_test (&queue->num_readers))
+    clear_free_list (queue);
+#endif
 }
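The #ifdef LOW_MEM bracket above reuses the queue's existing reclamation gate: any thread that may dereference a retired array first increments queue->num_readers, and the last thread to decrement it to zero calls clear_free_list. A stripped-down, hypothetical sketch of that pattern (Node, acquire, and release are invented names; the real code keeps retired GstAQueueMem blocks chained on queue->free_list):

#include <glib.h>

typedef struct _Node Node;
struct _Node
{
  Node *next;
};

static volatile gint num_readers;   /* threads that may touch retired memory */
static Node *free_list;             /* retired blocks awaiting release */

/* enter: pin retired memory so nothing is freed under us */
static void
acquire (void)
{
  g_atomic_int_inc (&num_readers);
}

/* leave: the last thread out detaches the whole list and frees it */
static void
release (void)
{
  Node *list, *next;

  if (!g_atomic_int_dec_and_test (&num_readers))
    return;

  do {
    list = g_atomic_pointer_get (&free_list);
    if (list == NULL)
      return;
  } while (!g_atomic_pointer_compare_and_exchange (&free_list, list, NULL));

  while (list != NULL) {
    next = list->next;
    g_free (list);
    list = next;
  }
}

int
main (void)
{
  g_atomic_pointer_set (&free_list, g_new0 (Node, 1));
  acquire ();
  /* ... safely dereference memory that may already be retired ... */
  release ();                   /* last one out frees the list */
  return 0;
}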
@@ -24,6 +24,10 @@
 #include <gst/gstatomicqueue.h>
 #include <gst/gst.h>
 
+#define NUMBER_OF_ELEMENTS 300
+
+static volatile gint number = 1;
+
 GST_START_TEST (test_create_free)
 {
   GstAtomicQueue *aq;
@@ -34,6 +38,121 @@ GST_START_TEST (test_create_free)
 GST_END_TEST;
 
+GST_START_TEST (test_simple_push_pop)
+{
+  GstAtomicQueue *aq;
+  gpointer p;
+  guint i;
+
+  aq = gst_atomic_queue_new (20);
+
+  for (i = 1; i <= NUMBER_OF_ELEMENTS; i++) {
+    gst_atomic_queue_push (aq, GUINT_TO_POINTER (i));
+  }
+
+  fail_if (gst_atomic_queue_length (aq) != NUMBER_OF_ELEMENTS);
+
+  for (i = 1; i <= NUMBER_OF_ELEMENTS; i++) {
+    p = gst_atomic_queue_pop (aq);
+    fail_if (GPOINTER_TO_UINT (p) != i);
+  }
+
+  gst_atomic_queue_unref (aq);
+}
+
+GST_END_TEST;
+struct thread_data
+{
+  GstAtomicQueue *queue;
+  GSList *list;
+  GMutex mutex;
+};
+
+static gint
+cmp_int_p (gconstpointer a, gconstpointer b, gpointer user_data)
+{
+  gint n1 = GPOINTER_TO_UINT (a), n2 = GPOINTER_TO_UINT (b);
+
+  if (n1 < n2) {
+    return -1;
+  } else if (n1 == n2) {
+    return 0;
+  } else {
+    return 1;
+  }
+}
+
+static void
+pop_data (struct thread_data *data)
+{
+  gpointer p = NULL;
+
+  while (p == NULL) {
+    p = gst_atomic_queue_pop (data->queue);
+  }
+
+  g_mutex_lock (&data->mutex);
+  data->list = g_slist_insert_sorted_with_data (data->list, p, cmp_int_p, NULL);
+  g_mutex_unlock (&data->mutex);
+}
+
+static void
+push_data (struct thread_data *data)
+{
+  gint n = g_atomic_int_add (&number, 1);
+
+  gst_atomic_queue_push (data->queue, GUINT_TO_POINTER (n));
+}
+GST_START_TEST (test_concurrent_access)
+{
+  GstTaskPool *w_pool, *r_pool;
+  struct thread_data data;
+  GError *err = NULL;
+  GSList *l;
+  guint i;
+
+  w_pool = gst_task_pool_new ();
+  r_pool = gst_task_pool_new ();
+
+  gst_task_pool_prepare (r_pool, &err);
+  fail_if (err != NULL);
+  gst_task_pool_prepare (w_pool, &err);
+  fail_if (err != NULL);
+
+  g_mutex_init (&data.mutex);
+  data.queue = gst_atomic_queue_new (20);
+  data.list = NULL;
+
+  for (i = 0; i < NUMBER_OF_ELEMENTS; i++) {
+    gst_task_pool_push (r_pool, (GstTaskPoolFunction) pop_data, &data, &err);
+    fail_if (err != NULL);
+    gst_task_pool_push (w_pool, (GstTaskPoolFunction) push_data, &data, &err);
+    fail_if (err != NULL);
+  }
+
+  gst_task_pool_cleanup (r_pool);
+  gst_task_pool_cleanup (w_pool);
+
+  g_mutex_clear (&data.mutex);
+  gst_atomic_queue_unref (data.queue);
+  g_object_unref (r_pool);
+  g_object_unref (w_pool);
+
+  fail_if (g_slist_length (data.list) != NUMBER_OF_ELEMENTS);
+  for (l = data.list, i = 1; l != NULL; l = g_slist_next (l), i++) {
+    fail_if (i != GPOINTER_TO_UINT (l->data));
+  }
+  g_slist_free (data.list);
+}
+
+GST_END_TEST;
 static Suite *
 gst_atomic_queue_suite (void)
 {
@@ -41,7 +160,10 @@ gst_atomic_queue_suite (void)
   TCase *tc_chain = tcase_create ("GstAtomicQueue tests");
 
   suite_add_tcase (s, tc_chain);
   tcase_add_test (tc_chain, test_create_free);
+  tcase_add_test (tc_chain, test_simple_push_pop);
+  tcase_add_test (tc_chain, test_concurrent_access);
 
   return s;
 }