Commit e7e5c668 authored by George Kiagiadakis's avatar George Kiagiadakis
Browse files

lib: introduce WpObjectManager

* rework how global objects are stored in the core
* rework how users get notified about global objects
  and proxies of remote global objects

The purpose of this change is to have a class that can manage
objects that are registered in the core or signalled through the
registry. This object can declare interest on certain types
of global objects and only keep & signal those objects that it is
interested in. Additionally, it can prepare proxy features and
asynchronously deliver an 'objects-changed' signal, which is
basically telling us that the list of objects has changed.

This is useful to simplify port proxies management in WpAudioStream.
Now the stream object can declare that it is interested in ports
that have "node.id" == X and the object manager will only maintain
a list of those. Additionally, it will emit the 'objects-changed'
signal when the list of ports is complete, so there is no reason to
do complex operations and core syncs in the WpAudioStream class
in order to figure out when the list of ports is ready.

As a side effect, this also reduces resource management. Now we
don't construct a WpProxy for every global that pipewire reports;
we only construct proxies when there is interest in them!

Another interesting side effect is that we can now register an
object manager at any point in time and get immediately notified
about remote globals that already exist. i.e. when you register
an object manager that is interested in nodes, it will be immediately
notified about all the existing nodes in the graph. This is useful
to avoid race conditions between connecting the signal and objects
beting created in pipewire
parent a93cbdf8
......@@ -7,12 +7,14 @@
*/
#include "core.h"
#include "object-manager.h"
#include "proxy.h"
#include "wpenums.h"
#include "private.h"
#include <pipewire/pipewire.h>
#include <spa/utils/result.h>
#include <spa/debug/types.h>
/*
* Integration between the PipeWire main loop and GMainLoop
......@@ -94,19 +96,9 @@ struct _WpCore
struct pw_registry_proxy *registry_proxy;
struct spa_hook registry_listener;
/* local proxies */
GHashTable *proxies;
GHashTable *default_features;
/* local global objects */
GPtrArray *global_objects;
};
struct global_object
{
GQuark key;
gpointer object;
GDestroyNotify destroy;
GPtrArray *globals; // elementy-type: WpGlobal*
GPtrArray *objects; // element-type: GObject*
GPtrArray *object_managers; // element-type: WpObjectManager*
};
enum {
......@@ -120,10 +112,6 @@ enum {
enum {
SIGNAL_REMOTE_STATE_CHANGED,
SIGNAL_GLOBAL_ADDED,
SIGNAL_GLOBAL_REMOVED,
SIGNAL_REMOTE_GLOBAL_ADDED,
SIGNAL_REMOTE_GLOBAL_REMOVED,
NUM_SIGNALS
};
......@@ -131,61 +119,59 @@ static guint32 signals[NUM_SIGNALS];
G_DEFINE_TYPE (WpCore, wp_core, G_TYPE_OBJECT)
static void
on_proxy_ready (GObject * obj, GAsyncResult * res, gpointer data)
{
WpCore *self = WP_CORE (data);
WpProxy *proxy = WP_PROXY (obj);
g_autoptr (GError) error = NULL;
if (!wp_proxy_augment_finish (proxy, res, &error)) {
g_warning ("Failed to augment WpProxy (%p): %s", obj, error->message);
return;
}
g_signal_emit (self, signals[SIGNAL_REMOTE_GLOBAL_ADDED],
wp_proxy_get_interface_quark (proxy), proxy);
}
static void
registry_global (void *data, uint32_t id, uint32_t permissions,
uint32_t type, uint32_t version, const struct spa_dict *props)
{
WpCore *self = WP_CORE (data);
WpProxy *proxy;
WpProxyFeatures features;
g_autoptr (WpProperties) properties = wp_properties_new_copy_dict (props);
WpGlobal *global = NULL;
guint i;
g_return_if_fail (self->globals->len <= id ||
g_ptr_array_index (self->globals, id) == NULL);
g_debug ("registry global:%u perm:0x%x type:%u/%u (%s)",
id, permissions, type, version,
spa_debug_type_find_name (pw_type_info (), type));
g_return_if_fail (!g_hash_table_contains (self->proxies, GUINT_TO_POINTER (id)));
/* construct & store the global */
global = wp_global_new ();
global->id = id;
global->type = type;
global->version = version;
global->permissions = permissions;
global->properties = wp_properties_new_copy_dict (props);
/* construct & store WpProxy */
proxy = wp_proxy_new_global (self, id, permissions, properties,
type, version);
g_hash_table_insert (self->proxies, GUINT_TO_POINTER (id), proxy);
if (self->globals->len <= id)
g_ptr_array_set_size (self->globals, id + 1);
g_debug ("registry global:%u perm:0x%x type:%u/%u -> %s:%p",
id, permissions, type, version, G_OBJECT_TYPE_NAME (proxy), proxy);
g_ptr_array_index (self->globals, id) = global;
/* augment */
features = GPOINTER_TO_UINT (g_hash_table_lookup (self->default_features,
GUINT_TO_POINTER (G_TYPE_FROM_INSTANCE (proxy))));
wp_proxy_augment (proxy, features, NULL, on_proxy_ready, self);
/* notify object managers */
for (i = 0; i < self->object_managers->len; i++) {
WpObjectManager *om = g_ptr_array_index (self->object_managers, i);
wp_object_manager_add_global (om, global);
}
}
static void
registry_global_remove (void *data, uint32_t id)
{
WpCore *self = WP_CORE (data);
g_autoptr (WpProxy) proxy = NULL;
g_autoptr (WpGlobal) global = NULL;
guint i;
g_hash_table_steal_extended (self->proxies, GUINT_TO_POINTER (id), NULL,
(gpointer *) &proxy);
global = g_steal_pointer (&g_ptr_array_index (self->globals, id));
g_debug ("registry global removed: %u (%p)", id, proxy);
g_debug ("registry global removed:%u type:%u/%u (%s)", id,
global->type, global->version,
spa_debug_type_find_name (pw_type_info (), global->type));
if (proxy)
g_signal_emit (data, signals[SIGNAL_REMOTE_GLOBAL_REMOVED],
wp_proxy_get_interface_quark (proxy), proxy);
/* notify object managers */
for (i = 0; i < self->object_managers->len; i++) {
WpObjectManager *om = g_ptr_array_index (self->object_managers, i);
wp_object_manager_rm_global (om, id);
}
}
static const struct pw_registry_proxy_events registry_proxy_events = {
......@@ -232,25 +218,21 @@ static const struct pw_remote_events remote_events = {
.state_changed = on_remote_state_changed,
};
static void
free_global_object (gpointer p)
/* wrapper around wp_global_unref because
the WpGlobal pointers in self->globals can be NULL */
static inline void
free_global (WpGlobal * g)
{
struct global_object *g = p;
/* Destroy the object */
if (g->destroy)
g->destroy(g->object);
g_slice_free (struct global_object, p);
if (g)
wp_global_unref (g);
}
static void
wp_core_init (WpCore * self)
{
self->proxies = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
g_object_unref);
self->default_features = g_hash_table_new (g_direct_hash, g_direct_equal);
self->global_objects = g_ptr_array_new_with_free_func (free_global_object);
self->globals = g_ptr_array_new_with_free_func ((GDestroyNotify) free_global);
self->objects = g_ptr_array_new_with_free_func (g_object_unref);
self->object_managers = g_ptr_array_new ();
}
static void
......@@ -274,22 +256,62 @@ wp_core_constructed (GObject *object)
G_OBJECT_CLASS (wp_core_parent_class)->constructed (object);
}
static void object_manager_destroyed (gpointer data, GObject * om);
static void
wp_core_dispose (GObject * obj)
{
WpCore *self = WP_CORE (obj);
g_autoptr (GPtrArray) global_objects;
struct global_object *global;
global_objects = g_steal_pointer (&self->global_objects);
/* Remove and emit the removed signal for all globals */
while (global_objects->len > 0) {
global = g_ptr_array_steal_index_fast (global_objects,
global_objects->len - 1);
g_signal_emit (self, signals[SIGNAL_GLOBAL_REMOVED], global->key,
global->key, global->object);
free_global_object (global);
/* remove pipewire globals */
{
g_autoptr (GPtrArray) objlist = g_steal_pointer (&self->globals);
while (objlist->len > 0) {
guint i;
g_autoptr (WpGlobal) global = g_ptr_array_steal_index_fast (objlist,
objlist->len - 1);
if (!global)
continue;
for (i = 0; i < self->object_managers->len; i++) {
WpObjectManager *om = g_ptr_array_index (self->object_managers, i);
wp_object_manager_rm_global (om, global->id);
}
}
}
/* remove all the registered objects
this will normally also destroy the object managers, eventually, since
they are normally ref'ed by modules, which are registered objects */
{
g_autoptr (GPtrArray) objlist = g_steal_pointer (&self->objects);
while (objlist->len > 0) {
guint i;
g_autoptr (GObject) object = g_ptr_array_steal_index_fast (objlist,
objlist->len - 1);
for (i = 0; i < self->object_managers->len; i++) {
WpObjectManager *om = g_ptr_array_index (self->object_managers, i);
wp_object_manager_rm_object (om, object);
}
}
}
/* in case there are any object managers left,
remove the weak ref on them and let them be... */
{
g_autoptr (GPtrArray) object_mgrs;
GObject *om;
object_mgrs = g_steal_pointer (&self->object_managers);
while (object_mgrs->len > 0) {
om = g_ptr_array_steal_index_fast (object_mgrs, object_mgrs->len - 1);
g_object_weak_unref (om, object_manager_destroyed, self);
}
}
G_OBJECT_CLASS (wp_core_parent_class)->dispose (obj);
......@@ -300,8 +322,6 @@ wp_core_finalize (GObject * obj)
{
WpCore *self = WP_CORE (obj);
g_clear_pointer (&self->proxies, g_hash_table_unref);
g_clear_pointer (&self->default_features, g_hash_table_unref);
g_clear_pointer (&self->pw_remote, pw_remote_destroy);
self->core_proxy= NULL;
self->registry_proxy = NULL;
......@@ -402,22 +422,6 @@ wp_core_class_init (WpCoreClass * klass)
signals[SIGNAL_REMOTE_STATE_CHANGED] = g_signal_new ("remote-state-changed",
G_TYPE_FROM_CLASS (klass), G_SIGNAL_DETAILED | G_SIGNAL_RUN_LAST,
0, NULL, NULL, NULL, G_TYPE_NONE, 1, WP_TYPE_REMOTE_STATE);
signals[SIGNAL_GLOBAL_ADDED] = g_signal_new ("global-added",
G_TYPE_FROM_CLASS (klass), G_SIGNAL_DETAILED | G_SIGNAL_RUN_LAST, 0, NULL,
NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_POINTER);
signals[SIGNAL_GLOBAL_REMOVED] = g_signal_new ("global-removed",
G_TYPE_FROM_CLASS (klass), G_SIGNAL_DETAILED | G_SIGNAL_RUN_LAST, 0, NULL,
NULL, NULL, G_TYPE_NONE, 2, G_TYPE_UINT, G_TYPE_POINTER);
signals[SIGNAL_REMOTE_GLOBAL_ADDED] = g_signal_new ("remote-global-added",
G_TYPE_FROM_CLASS (klass), G_SIGNAL_DETAILED | G_SIGNAL_RUN_LAST,
0, NULL, NULL, NULL, G_TYPE_NONE, 1, WP_TYPE_PROXY);
signals[SIGNAL_REMOTE_GLOBAL_REMOVED] = g_signal_new ("remote-global-removed",
G_TYPE_FROM_CLASS (klass), G_SIGNAL_DETAILED | G_SIGNAL_RUN_LAST,
0, NULL, NULL, NULL, G_TYPE_NONE, 1, WP_TYPE_PROXY);
}
WpCore *
......@@ -485,16 +489,6 @@ wp_core_get_remote_state (WpCore * self, const gchar ** error)
return (WpRemoteState) pw_remote_get_state (self->pw_remote, error);
}
void
wp_core_set_default_proxy_features (WpCore * self,
GType proxy_type, WpProxyFeatures features)
{
g_return_if_fail (WP_IS_CORE (self));
g_hash_table_insert (self->default_features, GUINT_TO_POINTER (proxy_type),
GUINT_TO_POINTER (features));
}
WpProxy *
wp_core_create_remote_object (WpCore *self,
const gchar *factory_name, guint32 interface_type,
......@@ -527,130 +521,137 @@ wp_core_get_pw_registry_proxy (WpCore * self)
}
/**
* wp_core_get_global: (method)
* wp_core_find_object: (skip)
* @self: the core
* @key: the key of the global
* @func: (scope call): a function that takes the object being searched
* as the first argument and @data as the second. it should return TRUE if
* the object is found or FALSE otherwise
* @data: the second argument to @func
*
* Finds a registered object
*
* Returns: (type GObject*) (nullable) (transfer none): the global object
* associated with @key; if multiple globals with the same key exist, the
* first one found is returned
* Returns: (transfer full) (type GObject *) (nullable): the registered object
* or NULL if not found
*/
gpointer
wp_core_get_global (WpCore * self, GQuark key)
wp_core_find_object (WpCore * self, GEqualFunc func, gconstpointer data)
{
gint i;
struct global_object *global;
GObject *object;
guint i;
g_return_val_if_fail (WP_IS_CORE (self), NULL);
if (G_UNLIKELY (!self->global_objects))
/* prevent bad things when called from within _dispose() */
if (G_UNLIKELY (!self->objects))
return NULL;
for (i = 0; i < self->global_objects->len; i++) {
global = g_ptr_array_index (self->global_objects, i);
if (global->key == key)
return global->object;
for (i = 0; i < self->objects->len; i++) {
object = g_ptr_array_index (self->objects, i);
if (func (object, data))
return g_object_ref (object);
}
return NULL;
}
/**
* wp_core_foreach_global: (method)
* wp_core_register_object: (skip)
* @self: the core
* @callback: (scope call): the function to call for each global object
* @user_data: data to passs to @callback
* @obj: (transfer full) (type GObject*): the object to register
*
* Calls @callback for every global object registered
* Registers @obj with the core, making it appear on #WpObjectManager
* instances as well. The core will also maintain a ref to that object
* until it is removed.
*/
void
wp_core_foreach_global (WpCore * self, WpCoreForeachGlobalFunc callback,
gpointer user_data)
wp_core_register_object (WpCore * self, gpointer obj)
{
gint i;
struct global_object *global;
guint i;
g_return_if_fail (WP_IS_CORE (self));
g_return_if_fail (G_IS_OBJECT (obj));
if (G_UNLIKELY (!self->global_objects))
/* prevent bad things when called from within _dispose() */
if (G_UNLIKELY (!self->objects)) {
g_object_unref (obj);
return;
}
for (i = 0; i < self->global_objects->len; i++) {
global = g_ptr_array_index (self->global_objects, i);
if (!callback (global->key, global->object, user_data))
break;
g_ptr_array_add (self->objects, obj);
/* notify object managers */
for (i = 0; i < self->object_managers->len; i++) {
WpObjectManager *om = g_ptr_array_index (self->object_managers, i);
wp_object_manager_add_object (om, obj);
}
}
/**
* wp_core_register_global: (method)
* wp_core_remove_object: (skip)
* @self: the core
* @key: the key for this global
* @obj: (transfer full): the global object to attach
* @destroy_obj: the destroy function for @obj
* @obj: (transfer none) (type GObject*): a pointer to the object to remove
*
* Registers @obj as a global object associated with @key
* Detaches and unrefs the specified object from this core
*/
void
wp_core_register_global (WpCore * self, GQuark key, gpointer obj,
GDestroyNotify destroy_obj)
wp_core_remove_object (WpCore * self, gpointer obj)
{
struct global_object *global;
guint i;
g_return_if_fail (WP_IS_CORE(self));
g_return_if_fail (WP_IS_CORE (self));
g_return_if_fail (G_IS_OBJECT (obj));
if (G_UNLIKELY (!self->global_objects)) {
if (destroy_obj)
destroy_obj (obj);
/* prevent bad things when called from within _dispose() */
if (G_UNLIKELY (!self->objects))
return;
/* notify object managers */
for (i = 0; i < self->object_managers->len; i++) {
WpObjectManager *om = g_ptr_array_index (self->object_managers, i);
wp_object_manager_rm_object (om, obj);
}
global = g_slice_new0 (struct global_object);
global->key = key;
global->object = obj;
global->destroy = destroy_obj;
g_ptr_array_add (self->global_objects, global);
g_ptr_array_remove_fast (self->objects, obj);
}
g_signal_emit (self, signals[SIGNAL_GLOBAL_ADDED], key, key, obj);
static void
object_manager_destroyed (gpointer data, GObject * om)
{
WpCore *self = WP_CORE (data);
g_ptr_array_remove_fast (self->object_managers, om);
}
/**
* wp_core_remove_global: (method)
* wp_core_install_object_manager: (method)
* @self: the core
* @key: the key for this global
* @obj: (nullable): a pointer to the global object to match, if there are
* multiple ones with the same key
* @om: (transfer none): a #WpObjectManager
*
* Detaches and unrefs the specified global from this core
* Installs the object manager on this core, activating its internal management
* engine. This will immediately emit signals about objects added on @om
* if objects that the @om is interested in were in existence already.
*/
void
wp_core_remove_global (WpCore * self, GQuark key, gpointer obj)
wp_core_install_object_manager (WpCore * self, WpObjectManager * om)
{
gint i;
struct global_object *global;
guint i;
g_return_if_fail (WP_IS_CORE (self));
if (G_UNLIKELY (!self->global_objects))
return;
for (i = 0; i < self->global_objects->len; i++) {
global = g_ptr_array_index (self->global_objects, i);
if (global->key == key && (!obj || global->object == obj))
break;
g_return_if_fail (WP_IS_OBJECT_MANAGER (om));
g_object_weak_ref (G_OBJECT (om), object_manager_destroyed, self);
g_ptr_array_add (self->object_managers, om);
g_object_set (om, "core", self, NULL);
/* add pre-existing objects to the object manager,
in case it's interested in them */
for (i = 0; i < self->globals->len; i++) {
WpGlobal *g = g_ptr_array_index (self->globals, i);
/* check if null because the globals array can have gaps */
if (g)
wp_object_manager_add_global (om, g);
}
if (i < self->global_objects->len) {
global = g_ptr_array_steal_index_fast (self->global_objects, i);
g_signal_emit (self, signals[SIGNAL_GLOBAL_REMOVED], key,
key, global->object);
free_global_object (global);
for (i = 0; i < self->objects->len; i++) {
GObject *o = g_ptr_array_index (self->objects, i);
wp_object_manager_add_object (om, o);
}
}
G_DEFINE_QUARK (endpoint, wp_global_endpoint)
G_DEFINE_QUARK (factory, wp_global_factory)
G_DEFINE_QUARK (module, wp_global_module)
G_DEFINE_QUARK (policy-manager, wp_global_policy_manager)
......@@ -10,6 +10,7 @@
#define __WIREPLUMBER_CORE_H__
#include <glib-object.h>
#include "object-manager.h"
#include "proxy.h"
G_BEGIN_DECLS
......@@ -45,13 +46,12 @@ struct pw_remote * wp_core_get_pw_remote (WpCore * self);
gboolean wp_core_connect (WpCore * self);
WpRemoteState wp_core_get_remote_state (WpCore * self, const gchar ** error);
void wp_core_set_default_proxy_features (
WpCore * self, GType proxy_type, WpProxyFeatures features);
WpProxy * wp_core_create_remote_object (WpCore * self,
const gchar * factory_name, guint32 interface_type,
guint32 interface_version, WpProperties * properties);
void wp_core_install_object_manager (WpCore * self, WpObjectManager * om);
G_END_DECLS
#endif
......@@ -356,8 +356,7 @@ wp_endpoint_register (WpEndpoint * self)
g_info ("WpEndpoint:%p registering '%s' (%s)", self, priv->name,
priv->media_class);
wp_core_register_global (core, WP_GLOBAL_ENDPOINT, g_object_ref (self),
g_object_unref);
wp_core_register_object (core, g_object_ref (self));
}
/**
......@@ -387,82 +386,10 @@ wp_endpoint_unregister (WpEndpoint * self)
g_info ("WpEndpoint:%p unregistering '%s' (%s)", self, priv->name,
priv->media_class);
wp_core_remove_global (core, WP_GLOBAL_ENDPOINT, self);
wp_core_remove_object (core, self);
}
}
struct endpoints_foreach_data
{
GPtrArray *result;
const gchar *lookup;
};
static inline gboolean
media_class_matches (const gchar * media_class, const gchar * lookup)
{
const gchar *c1 = media_class, *c2 = lookup;
/* empty lookup matches all classes */
if (!lookup)
return TRUE;
/* compare until we reach the end of the lookup string */
for (; *c2 != '\0'; c1++, c2++) {
if (*c1 != *c2)
return FALSE;
}
/* the lookup may not end in a slash, however it must match up
* to the end of a submedia_class. i.e.:
* match: media_class: Audio/Source/Virtual
* lookup: Audio/Source
*
* NO match: media_class: Audio/Source/Virtual
* lookup: Audio/Sou
*
* if *c1 is not /, also check the previous char, because the lookup
* may actually end in a slash:
*
* match: media_class: Audio/Source/Virtual
* lookup: Audio/Source/
*/
if (!(*c1 == '/' || *c1 == '\0' || *(c1 - 1) == '/'))
return FALSE;
return TRUE;
}
static gboolean
find_endpoints (GQuark key, gpointer global, gpointer user_data)
{
struct endpoints_foreach_data * data = user_data;
if (key == WP_GLOBAL_ENDPOINT &&
media_class_matches (wp_endpoint_get_media_class (WP_ENDPOINT (global)),
data->lookup))
g_ptr_array_add (data->result, g_object_ref (global));
return WP_CORE_FOREACH_GLOBAL_CONTINUE;
}
/**
* wp_endpoint_find:
* @core: the core
* @media_class_lookup: the media class lookup string
*
* Returns: (element-type WpEndpoint) (transfer full): an array with all the
* endpoints matching the media class lookup string
*/
GPtrArray *
wp_endpoint_find (WpCore * core, const gchar * media_class_lookup)
{
struct endpoints_foreach_data data;
data.result = g_ptr_array_new_with_free_func (g_object_unref);
data.lookup = media_class_lookup;