Crash when receive_signal is called concurrently
When azync::Proxy::receive_signal is called concurrently on different threads, threads can panic as follows:
thread 'tokio-runtime-worker' panicked at 'Attempted to set dest_unique_name twice: OwnedUniqueName(UniqueName(Str(":1.0")))', ~/.cargo/registry/src/github.com-1ecc6299db9ec823/zbus-2.0.0-beta.6/src/azync/proxy.rs:797:14
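The wording of the panic looks like a write-once cell being set a second time. The following is a minimal sketch of how a check-then-set race could produce that, assuming (not confirmed from the zbus source) that the proxy stores the name in such a cell. It uses `std::sync::OnceLock` as a stand-in, and `racy_set_failures` is a hypothetical name used only for illustration. The barrier forces the interleaving that would otherwise occur only occasionally.

```rust
use std::sync::{Arc, Barrier, OnceLock};
use std::thread;

/// Runs two threads that both observe the cell as empty and then both try
/// to set it. Returns how many `set` calls were rejected.
fn racy_set_failures() -> usize {
    // Stand-in for a write-once field such as a cached unique name (assumption).
    let cell: Arc<OnceLock<String>> = Arc::new(OnceLock::new());
    let barrier = Arc::new(Barrier::new(2));

    let handles: Vec<_> = (0..2)
        .map(|_| {
            let cell = Arc::clone(&cell);
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                // Step 1: check whether the value has been set yet.
                let was_empty = cell.get().is_none();
                // Make sure both threads have done the check before either sets.
                barrier.wait();
                // Step 2: set the value; the slower thread gets `Err`.
                if was_empty {
                    cell.set(":1.0".to_string()).is_err()
                } else {
                    false
                }
            })
        })
        .collect();

    handles
        .into_iter()
        .map(|handle| handle.join().unwrap())
        .filter(|&failed| failed)
        .count()
}
```

If the rejected `set` were unwrapped with `.expect("Attempted to set ... twice")` instead of being inspected with `.is_err()`, the slower thread would panic in the same way as in the message above. In this sketch, initialising through `OnceLock::get_or_init` would avoid the separate check and set steps.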
We encountered the problem in the following code:
/// Calls a systemd method and waits until the dependent job is
/// finished.
///
/// The given method enqueues a job in systemd and returns the job
/// object. Systemd sends out a `JobRemoved` signal when the job is
/// dequeued. The signal contains the reason for the dequeuing like
/// `"done"`, `"failed"`, or `"canceled"`.
///
/// This function subscribes to `JobRemoved` signals, calls the
/// given method, awaits the signal for the corresponding job, and
/// returns `Ok(())` if the result is [`JobRemovedResult::Done`].
/// If the signal contains another result or no signal is returned
/// (which should never happen) then an error with a corresponding
/// message is returned.
async fn call_method<'a, F, Fut>(&'a self, method: F) -> anyhow::Result<()>
where
    F: Fn(&'a AsyncManagerProxy) -> Fut,
    Fut: Future<Output = zbus::Result<AsyncJobProxy<'a>>>,
{
    let signals = self
        .proxy
        .receive_signal(ManagerSignals::JobRemoved)
        .await?
        .map(|message| message.body::<JobRemovedSignal>().unwrap());
    let job = method(&self.proxy).await?;
    let mut signals = signals
        .filter(|signal| future::ready(&signal.job.to_owned().into_inner() == job.path()));
    let signal = signals.next().await;
    match signal {
        Some(message) if message.result == JobRemovedResult::Done => Ok(()),
        Some(message) => Err(anyhow!("The systemd job failed: {:?}", message)),
        None => Err(anyhow!(
            "No signal was returned for the systemd job: {:?}",
            job
        )),
    }
}
This code worked with zbus-2.0.0-beta.5 but fails with zbus-2.0.0-beta.6.