zbus, issue #233
Issue created Jan 01, 2022 by Max Verevkin (@MaxVerevkin), Contributor

Convenient API to create stream based on server-side filtering (match rules)

use futures::StreamExt;
use tokio::task::JoinHandle;

type AnyResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[tokio::main]
async fn main() -> AnyResult<()> {
    let dbus1 = zbus::Connection::system().await?;
    let dbus2 = dbus1.clone();

    let task1: JoinHandle<AnyResult<()>> = tokio::spawn(async move {
        let proxy = zbus::fdo::DBusProxy::new(&dbus1).await?;
        proxy
            .add_match(
                "type='signal',\
                    path='/org/freedesktop/NetworkManager',\
                    interface='org.freedesktop.DBus.Properties',\
                    member='PropertiesChanged'",
            )
            .await?;
        let mut stream = zbus::MessageStream::from(dbus1);
        let mut cnt = 0;
        // Count messages until the stream ends, instead of spinning on `None`.
        while stream.next().await.is_some() {
            cnt += 1;
            eprintln!("task1 received {} messages", cnt);
        }
        Ok(())
    });

    let task2: JoinHandle<AnyResult<()>> = tokio::spawn(async move {
        let mut stream = zbus::MessageStream::from(dbus2);
        let mut cnt = 0;
        // Count messages until the stream ends, instead of spinning on `None`.
        while stream.next().await.is_some() {
            cnt += 1;
            eprintln!("task2 received {} messages", cnt);
        }
        Ok(())
    });

    let (a, b) = tokio::try_join!(task1, task2)?;
    a?;
    b?;
    Ok(())
}

This code opens a connection and gives each task its own clone. The first task adds a match rule and then converts its connection into a message stream. The second task just converts its connection into a stream.

What I would like to happen is this: the first task receives the messages that match its rule, while the second task receives none of them. Is this possible without filtering messages manually? I suppose I could just open two separate connections, but I wonder if I can get away with only one connection.
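For reference, the "manual filtering" mentioned above could look roughly like the sketch below. It does not use zbus at all: `SignalHeader`, `SignalFilter`, `network_manager_properties_changed` and `count_matching` are hypothetical names made up for illustration, standing in for whatever header fields a real message exposes. The idea is only that each task would compare every received message against the same path, interface and member that the match rule names, and drop everything else.

```rust
/// Hypothetical stand-in for the header fields of a received signal.
/// This is NOT a zbus type; it only illustrates the filtering logic.
#[derive(Debug, Clone, PartialEq)]
struct SignalHeader {
    path: String,
    interface: String,
    member: String,
}

impl SignalHeader {
    fn new(path: &str, interface: &str, member: &str) -> Self {
        Self {
            path: path.to_string(),
            interface: interface.to_string(),
            member: member.to_string(),
        }
    }
}

/// Hypothetical client-side mirror of the rule passed to `add_match`.
struct SignalFilter {
    path: &'static str,
    interface: &'static str,
    member: &'static str,
}

impl SignalFilter {
    /// True only when path, interface and member all equal the rule's values.
    fn matches(&self, header: &SignalHeader) -> bool {
        header.path == self.path
            && header.interface == self.interface
            && header.member == self.member
    }
}

/// The match rule from the issue, expressed as a client-side filter.
fn network_manager_properties_changed() -> SignalFilter {
    SignalFilter {
        path: "/org/freedesktop/NetworkManager",
        interface: "org.freedesktop.DBus.Properties",
        member: "PropertiesChanged",
    }
}

/// Counts the messages a task would keep after filtering by hand.
fn count_matching(filter: &SignalFilter, received: &[SignalHeader]) -> usize {
    received.iter().filter(|h| filter.matches(h)).count()
}

fn main() {
    let filter = network_manager_properties_changed();
    // Made-up sample traffic: one matching signal and two unrelated ones.
    let received = vec![
        SignalHeader::new(
            "/org/freedesktop/NetworkManager",
            "org.freedesktop.DBus.Properties",
            "PropertiesChanged",
        ),
        SignalHeader::new(
            "/org/freedesktop/DBus",
            "org.freedesktop.DBus",
            "NameOwnerChanged",
        ),
        SignalHeader::new(
            "/org/freedesktop/NetworkManager",
            "org.freedesktop.NetworkManager",
            "StateChanged",
        ),
    ];
    println!(
        "kept {} of {} messages",
        count_matching(&filter, &received),
        received.len()
    );
}
```

The drawback is that every task sharing the connection has to repeat such a check, which is exactly the boilerplate this issue asks to avoid.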

Edited Jan 02, 2022 by Zeeshan Ali Khan