Convenient API to create stream based on server-side filtering (match rules)
use futures::StreamExt;
use tokio::task::JoinHandle;
type AnyResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;
#[tokio::main]
async fn main() -> AnyResult<()> {
let dbus1 = zbus::Connection::system().await?;
let dbus2 = dbus1.clone();
let task1: JoinHandle<AnyResult<()>> = tokio::spawn(async move {
let proxy = zbus::fdo::DBusProxy::new(&dbus1).await?;
proxy
.add_match(
"type='signal',\
path='/org/freedesktop/NetworkManager',\
interface='org.freedesktop.DBus.Properties',\
member='PropertiesChanged'",
)
.await?;
let mut stream = zbus::MessageStream::from(dbus1);
let mut cnt = 0;
loop {
if let Some(_) = stream.next().await {
cnt += 1;
eprintln!("task1 received {} messages", cnt);
}
}
});
let task2: JoinHandle<AnyResult<()>> = tokio::spawn(async move {
let mut stream = zbus::MessageStream::from(dbus2);
let mut cnt = 0;
loop {
if let Some(_) = stream.next().await {
cnt += 1;
eprintln!("task2 received {} messages", cnt);
}
}
});
let (a, b) = tokio::try_join!(task1, task2)?;
a?;
b?;
Ok(())
}
This code opens a connection and gives each task it's own clone. The first task adds a match rule and then converts its connection into a message stream. The second task just converts its connection into a stream.
So what I would like to happen is: the first task receives it's matches, while second task doesn't. Is this possible without manual filtering of messages? I suppose I can just open two separate connections, but I wonder if I can get away with only once connection.
Edited by Zeeshan Ali Khan