Commit 8aa69577 authored by Julian Bouzas's avatar Julian Bouzas
Browse files

tests: config-policy: run the main loop in the same thread

parent 3fd65ac9
Pipeline #113055 passed with stage
in 2 minutes and 16 seconds
......@@ -37,8 +37,8 @@ struct _WpConfigPolicy
WpConfiguration *config;
gboolean pending_rescan;
WpBaseEndpoint *pending_endpoint;
gboolean endpoint_handled;
gboolean pending_links;
gboolean pending_done;
};
enum {
......@@ -56,9 +56,29 @@ static guint signals[N_SIGNALS];
G_DEFINE_TYPE (WpConfigPolicy, wp_config_policy, WP_TYPE_POLICY)
static void
on_endpoint_link_created (GObject *initable, GAsyncResult *res, gpointer p)
wp_config_policy_rescan_done (WpConfigPolicy *self)
{
WpConfigPolicy *self = p;
/* Reset pending flags */
self->pending_rescan = FALSE;
self->pending_links = FALSE;
self->pending_done = FALSE;
/* Emit the done signal */
g_signal_emit (self, signals[SIGNAL_DONE], 0);
}
static void
wp_config_policy_sync_done (WpCore *core, GAsyncResult *res, gpointer data)
{
WpConfigPolicy *self = WP_CONFIG_POLICY (data);
wp_config_policy_rescan_done (self);
}
static void
on_endpoint_link_created (GObject *initable, GAsyncResult *res, gpointer data)
{
WpConfigPolicy *self = WP_CONFIG_POLICY (data);
g_autoptr (WpCore) core = wp_policy_get_core (WP_POLICY (self));
g_autoptr (WpBaseEndpointLink) link = NULL;
g_autoptr (GError) error = NULL;
g_autoptr (WpBaseEndpoint) src_ep = NULL;
......@@ -79,15 +99,12 @@ on_endpoint_link_created (GObject *initable, GAsyncResult *res, gpointer p)
g_info ("Sucessfully linked '%s' to '%s'\n", wp_base_endpoint_get_name (src_ep),
wp_base_endpoint_get_name (sink_ep));
/* Emit the done signal */
if (self->pending_endpoint) {
gboolean is_capture =
wp_base_endpoint_get_direction (self->pending_endpoint) == PW_DIRECTION_INPUT;
if (self->pending_endpoint == (is_capture ? sink_ep : src_ep)) {
g_autoptr (WpBaseEndpoint) pending_endpoint =
g_steal_pointer (&self->pending_endpoint);
g_signal_emit (self, signals[SIGNAL_DONE], 0, pending_endpoint, link);
}
/* Finish rescanning if no pending done */
g_return_if_fail (core);
if (!self->pending_done) {
self->pending_done = TRUE;
wp_core_sync (core, NULL, (GAsyncReadyCallback)wp_config_policy_sync_done,
self);
}
}
......@@ -393,7 +410,7 @@ links_table_handle_foreach (gpointer key, gpointer value, gpointer data)
if (i == 0 || li->keep)
if (wp_config_policy_handle_pending_link (self, li, target))
self->endpoint_handled = li->ep == self->pending_endpoint;
self->pending_links = TRUE;
}
}
......@@ -406,8 +423,8 @@ wp_config_policy_sync_rescan (WpCore *core, GAsyncResult *res, gpointer data)
g_debug ("rescanning");
/* Set handle to false to know if pending endpoint was handled in this loop */
self->endpoint_handled = FALSE;
/* Set pending-links to false */
self->pending_links = FALSE;
/* Handle all endpoints when rescanning */
endpoints = wp_policy_manager_list_endpoints (pmgr, NULL);
......@@ -429,51 +446,39 @@ wp_config_policy_sync_rescan (WpCore *core, GAsyncResult *res, gpointer data)
g_clear_pointer (&links_table, g_hash_table_unref);
}
/* If endpoint was not handled, we are done */
if (!self->endpoint_handled) {
g_signal_emit (self, signals[SIGNAL_DONE], 0, self->pending_endpoint,
NULL);
g_clear_object (&self->pending_endpoint);
}
self->pending_rescan = FALSE;
/* Finish rescanning if no pending links */
if (!self->pending_links)
wp_config_policy_rescan_done (self);
}
static void
wp_config_policy_rescan (WpConfigPolicy *self, WpBaseEndpoint *ep)
wp_config_policy_rescan (WpConfigPolicy *self)
{
if (self->pending_rescan)
return;
/* Check if there is a pending link while a new endpoint is added/removed */
if (self->pending_endpoint) {
g_warning ("Not handling endpoint '%s' beacause of pending link",
wp_base_endpoint_get_name (ep));
return;
}
g_autoptr (WpCore) core = wp_policy_get_core (WP_POLICY (self));
if (!core)
return;
self->pending_endpoint = g_object_ref (ep);
wp_core_sync (core, NULL, (GAsyncReadyCallback)wp_config_policy_sync_rescan,
self);
self->pending_rescan = TRUE;
/* Scan if no pending scan */
if (!self->pending_rescan) {
self->pending_rescan = TRUE;
wp_core_sync (core, NULL, (GAsyncReadyCallback)wp_config_policy_sync_rescan,
self);
}
}
static void
wp_config_policy_endpoint_added (WpPolicy *policy, WpBaseEndpoint *ep)
{
WpConfigPolicy *self = WP_CONFIG_POLICY (policy);
wp_config_policy_rescan (self, ep);
wp_config_policy_rescan (self);
}
static void
wp_config_policy_endpoint_removed (WpPolicy *policy, WpBaseEndpoint *ep)
{
WpConfigPolicy *self = WP_CONFIG_POLICY (policy);
wp_config_policy_rescan (self, ep);
wp_config_policy_rescan (self);
}
static void
......@@ -541,7 +546,6 @@ wp_config_policy_finalize (GObject *object)
static void
wp_config_policy_init (WpConfigPolicy *self)
{
self->pending_rescan = FALSE;
}
static void
......@@ -568,7 +572,7 @@ wp_config_policy_class_init (WpConfigPolicyClass *klass)
/* Signals */
signals[SIGNAL_DONE] = g_signal_new ("done",
G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0, NULL, NULL, NULL,
G_TYPE_NONE, 2, WP_TYPE_BASE_ENDPOINT, WP_TYPE_BASE_ENDPOINT_LINK);
G_TYPE_NONE, 0);
}
WpConfigPolicy *
......
This diff is collapsed.
......@@ -20,65 +20,27 @@ struct _WpConfigPolicyContext
/* Props */
GWeakRef core;
GMainLoop *loop;
char *config_path;
GMutex mutex;
GCond cond;
WpBaseEndpoint *endpoint;
WpBaseEndpointLink *link;
WpConfigPolicy *policy;
GWeakRef last_endpoint;
};
enum {
PROP_0,
PROP_CORE,
PROP_LOOP,
PROP_CONFIG_PATH,
};
G_DEFINE_TYPE (WpConfigPolicyContext, wp_config_policy_context, G_TYPE_OBJECT);
static WpBaseEndpoint *
wait_for_endpoint (WpConfigPolicyContext *self, WpBaseEndpointLink **link)
{
gint64 end_time;
g_mutex_lock (&self->mutex);
/* Wait for endpoint to be set */
end_time = g_get_monotonic_time () + 3 * G_TIME_SPAN_SECOND;
while (!self->endpoint)
if (!g_cond_wait_until (&self->cond, &self->mutex, end_time)) {
/* Abort when timeout has passed */
g_warning ("Aborting due to timeout when waiting for endpoint");
abort();
}
/* Set endpoint to a local value and clear global value */
WpBaseEndpoint *endpoint = g_object_ref (self->endpoint);
g_clear_object (&self->endpoint);
/* Set link to a local value and clear global value */
if (link)
*link = self->link ? g_object_ref (self->link) : NULL;
g_clear_object (&self->link);
g_mutex_unlock (&self->mutex);
return endpoint;
}
static void
on_done (WpConfigPolicy *cp, WpBaseEndpoint *ep, WpBaseEndpointLink *link,
WpConfigPolicyContext *self)
on_done (WpConfigPolicyContext *self)
{
if (!ep)
return;
g_mutex_lock (&self->mutex);
self->endpoint = g_object_ref (ep);
self->link = link ? g_object_ref (link) : NULL;
g_cond_signal (&self->cond);
g_mutex_unlock (&self->mutex);
g_return_if_fail (self->loop);
g_main_loop_quit (self->loop);
}
static void
......@@ -97,11 +59,12 @@ wp_config_policy_context_constructed (GObject *object)
wp_configuration_add_path (config, self->config_path);
/* Register the config policy */
g_autoptr (WpConfigPolicy) cp = wp_config_policy_new (config);
wp_policy_register (WP_POLICY (cp), core);
self->policy = wp_config_policy_new (config);
wp_policy_register (WP_POLICY (self->policy), core);
/* Handle done and link-created signals */
g_signal_connect (cp, "done", (GCallback) on_done, self);
/* Handle the done signal */
g_signal_connect_object (self->policy, "done", (GCallback) on_done, self,
G_CONNECT_SWAPPED);
G_OBJECT_CLASS (wp_config_policy_context_parent_class)->constructed (object);
}
......@@ -116,6 +79,9 @@ wp_config_policy_context_set_property (GObject * object, guint property_id,
case PROP_CORE:
g_weak_ref_set (&self->core, g_value_get_object (value));
break;
case PROP_LOOP:
self->loop = g_value_get_pointer (value);
break;
case PROP_CONFIG_PATH:
self->config_path = g_value_dup_string (value);
break;
......@@ -135,6 +101,9 @@ wp_config_policy_context_get_property (GObject * object, guint property_id,
case PROP_CORE:
g_value_take_object (value, g_weak_ref_get (&self->core));
break;
case PROP_LOOP:
g_value_set_pointer (value, self->loop);
break;
case PROP_CONFIG_PATH:
g_value_set_string (value, self->config_path);
break;
......@@ -149,9 +118,13 @@ wp_config_policy_context_finalize (GObject * object)
{
WpConfigPolicyContext *self = WP_CONFIG_POLICY_CONTEXT (object);
g_mutex_clear (&self->mutex);
g_cond_clear (&self->cond);
g_weak_ref_clear (&self->last_endpoint);
if (self->policy)
wp_policy_unregister (WP_POLICY (self->policy));
g_clear_object (&self->policy);
self->loop = NULL;
g_weak_ref_clear (&self->core);
G_OBJECT_CLASS (wp_config_policy_context_parent_class)->finalize (object);
......@@ -161,9 +134,9 @@ static void
wp_config_policy_context_init (WpConfigPolicyContext * self)
{
g_weak_ref_init (&self->core, NULL);
self->loop = NULL;
g_mutex_init (&self->mutex);
g_cond_init (&self->cond);
g_weak_ref_init (&self->last_endpoint, NULL);
}
static void
......@@ -180,6 +153,10 @@ wp_config_policy_context_class_init (WpConfigPolicyContextClass * klass)
g_param_spec_object ("core", "core", "The wireplumber core", WP_TYPE_CORE,
G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (object_class, PROP_LOOP,
g_param_spec_pointer ("loop", "loop", "The main loop where pipewire runs",
G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (object_class, PROP_CONFIG_PATH,
g_param_spec_string ("config-path", "config-path",
"The config-path of the context", NULL,
......@@ -187,24 +164,30 @@ wp_config_policy_context_class_init (WpConfigPolicyContextClass * klass)
}
WpConfigPolicyContext *
wp_config_policy_context_new (WpCore *core, const char *config_path)
wp_config_policy_context_new (WpCore *core, GMainLoop *loop,
const char *config_path)
{
return g_object_new (wp_config_policy_context_get_type (),
"core", core,
"loop", loop,
"config-path", config_path,
NULL);
}
static void
on_endpoint_created (GObject *initable, GAsyncResult *res, gpointer d)
on_endpoint_created (GObject *initable, GAsyncResult *res, gpointer data)
{
g_autoptr (WpBaseEndpoint) ep = NULL;
WpConfigPolicyContext *self = data;
GError *error = NULL;
g_autoptr (WpBaseEndpoint) ep = NULL;
ep = wp_base_endpoint_new_finish (initable, res, &error);
g_return_if_fail (!error);
g_return_if_fail (ep);
/* Update last endpoint weak ref */
g_weak_ref_set (&self->last_endpoint, ep);
/* Register the endpoint */
wp_base_endpoint_register (ep);
}
......@@ -212,7 +195,7 @@ on_endpoint_created (GObject *initable, GAsyncResult *res, gpointer d)
WpBaseEndpoint *
wp_config_policy_context_add_endpoint (WpConfigPolicyContext *self,
const char *name, const char *media_class, guint direction,
WpProperties *props, const char *role, guint streams, WpBaseEndpointLink **link)
WpProperties *props, const char *role, guint streams)
{
g_autoptr (WpCore) core = g_weak_ref_get (&self->core);
g_return_val_if_fail (core, NULL);
......@@ -220,7 +203,8 @@ wp_config_policy_context_add_endpoint (WpConfigPolicyContext *self,
wp_fake_endpoint_new_async (core, name, media_class, direction, props, role,
streams, on_endpoint_created, self);
return wait_for_endpoint (self, link);
g_main_loop_run (self->loop);
return g_weak_ref_get(&self->last_endpoint);
}
void
......@@ -230,6 +214,5 @@ wp_config_policy_context_remove_endpoint (WpConfigPolicyContext *self,
g_return_if_fail (ep);
wp_base_endpoint_unregister (ep);
wait_for_endpoint (self, NULL);
g_main_loop_run (self->loop);
}
......@@ -19,11 +19,10 @@ G_DECLARE_FINAL_TYPE (WpConfigPolicyContext, wp_config_policy_context, WP,
CONFIG_POLICY_CONTEXT, GObject);
WpConfigPolicyContext *wp_config_policy_context_new (WpCore *core,
const char *config_path);
GMainLoop *loop, const char *config_path);
WpBaseEndpoint *wp_config_policy_context_add_endpoint (WpConfigPolicyContext *self,
const char *name, const char *media_class, guint direction,
WpProperties *props, const char *role, guint streams,
WpBaseEndpointLink **link);
WpProperties *props, const char *role, guint streams);
void wp_config_policy_context_remove_endpoint (WpConfigPolicyContext *self,
WpBaseEndpoint *ep);
......
......@@ -146,12 +146,15 @@ wp_fake_endpoint_finalize (GObject * object)
G_OBJECT_CLASS (wp_fake_endpoint_parent_class)->finalize (object);
}
static void
wp_fake_endpoint_finish_creation (WpCore *core, GAsyncResult *res,
WpFakeEndpoint *self)
static gboolean
wp_fake_endpoint_finish_creation (gpointer d)
{
WpFakeEndpoint *self = d;
g_task_return_boolean (self->init_task, TRUE);
g_clear_object (&self->init_task);
return FALSE;
}
static void
......@@ -165,10 +168,10 @@ wp_fake_endpoint_init_async (GAsyncInitable *initable, int io_priority,
wp_fake_endpoint_parent_interface->init_async (initable, io_priority,
cancellable, callback, data);
g_autoptr (WpCore) core = wp_base_endpoint_get_core(WP_BASE_ENDPOINT(self));
if (core)
wp_core_sync (core, NULL,
(GAsyncReadyCallback) wp_fake_endpoint_finish_creation, self);
g_autoptr (WpCore) core = wp_base_endpoint_get_core (WP_BASE_ENDPOINT(self));
g_return_if_fail (core);
wp_core_idle_add (core, wp_fake_endpoint_finish_creation, self, NULL);
}
static void
......
......@@ -88,12 +88,15 @@ wp_fake_endpoint_link_finalize (GObject * object)
G_OBJECT_CLASS (wp_fake_endpoint_link_parent_class)->finalize (object);
}
static void
wp_fake_endpoint_link_finish_creation (WpCore *core, GAsyncResult *res,
WpFakeEndpointLink *self)
static gboolean
wp_fake_endpoint_link_finish_creation (gpointer d)
{
WpFakeEndpointLink *self = d;
g_task_return_boolean (self->init_task, TRUE);
g_clear_object (&self->init_task);
return FALSE;
}
static void
......@@ -108,9 +111,9 @@ wp_fake_endpoint_link_init_async (GAsyncInitable *initable, int io_priority,
io_priority, cancellable, callback, data);
g_autoptr (WpCore) core = g_weak_ref_get (&self->core);
if (core)
wp_core_sync (core, NULL,
(GAsyncReadyCallback) wp_fake_endpoint_link_finish_creation, self);
g_return_if_fail (core);
wp_core_idle_add (core, wp_fake_endpoint_link_finish_creation, self, NULL);
}
static void
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment