Commit a12d71b9 authored by Zeeshan Ali's avatar Zeeshan Ali Committed by Marc-André Lureau
Browse files

zb: wrap Message with an Arc


Signed-off-by: default avatarMarc-André Lureau <marcandre.lureau@redhat.com>
parent bc3bde99
......@@ -410,7 +410,7 @@ impl Connection {
/// not. All messages received during this call that are not returned by it, are pushed to the
/// queue to be picked by the susubsequent or awaiting call to this method or by the
/// `MessageStream`.
pub async fn receive_specific<P>(&self, predicate: P) -> Result<Message>
pub async fn receive_specific<P>(&self, predicate: P) -> Result<Arc<Message>>
where
for<'msg> P: Fn(&'msg Message) -> BoxFuture<'msg, Result<bool>>,
{
......@@ -427,7 +427,7 @@ impl Connection {
} {
// SAFETY: we got the index from the queue enumerator so this shouldn't ever
// fail.
return queue.remove(i).expect("removing queue item");
return queue.remove(i).expect("removing queue item").map(Arc::new);
}
}
}
......@@ -470,7 +470,7 @@ impl Connection {
}
}
msg
msg.map(Arc::new)
}
/// Send `msg` to the peer.
......@@ -500,7 +500,7 @@ impl Connection {
interface: Option<&str>,
method_name: &str,
body: &B,
) -> Result<Message>
) -> Result<Arc<Message>>
where
B: serde::ser::Serialize + zvariant::Type,
MessageError: From<E>,
......@@ -884,11 +884,11 @@ impl futures_sink::Sink<Message> for MessageSink<'_> {
/// the `receive_specific` is waiting for and end up in a deadlock situation. It is therefore highly
/// recommended not to use such a combination.
pub struct MessageStream {
stream: stream::BoxStream<'static, Result<Message>>,
stream: stream::BoxStream<'static, Result<Arc<Message>>>,
}
impl stream::Stream for MessageStream {
type Item = Result<Message>;
type Item = Result<Arc<Message>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
stream::Stream::poll_next(self.get_mut().stream.as_mut(), cx)
......@@ -953,7 +953,7 @@ mod tests {
let (client_conn, server_conn) = futures_util::try_join!(client, server)?;
let server_future = async {
let mut method: Option<Message> = None;
let mut method: Option<Arc<Message>> = None;
while let Some(m) = server_conn.stream().await.try_next().await? {
if m.to_string() == "Method call Test" {
method.replace(m);
......
......@@ -236,7 +236,7 @@ impl<'a> Proxy<'a> {
/// allocation/copying, by deserializing the reply to an unowned type).
///
/// [`call`]: struct.Proxy.html#method.call
pub async fn call_method<B>(&self, method_name: &str, body: &B) -> Result<Message>
pub async fn call_method<B>(&self, method_name: &str, body: &B) -> Result<Arc<Message>>
where
B: serde::ser::Serialize + zvariant::Type,
{
......@@ -422,7 +422,7 @@ impl<'a> Proxy<'a> {
/// # Errors
///
/// This method returns the same errors as [`Self::receive_signal`].
pub async fn next_signal(&self) -> Result<Option<Message>> {
pub async fn next_signal(&self) -> Result<Option<Arc<Message>>> {
let msg = {
// We want to keep a lock on the handlers during `receive_specific` call but we also
// want to avoid using `handlers` directly as that somehow makes this call (or rather
......@@ -550,7 +550,7 @@ impl<'a> Proxy<'a> {
#[derivative(Debug)]
pub struct SignalStream<'s> {
#[derivative(Debug = "ignore")]
stream: stream::BoxStream<'s, Message>,
stream: stream::BoxStream<'s, Arc<Message>>,
conn: Connection,
subscription_id: Option<u64>,
}
......@@ -574,7 +574,7 @@ impl SignalStream<'_> {
}
impl stream::Stream for SignalStream<'_> {
type Item = Message;
type Item = Arc<Message>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
stream::Stream::poll_next(self.get_mut().stream.as_mut(), cx)
......
......@@ -6,6 +6,7 @@ use std::{
io::{AsRawFd, RawFd},
net::UnixStream,
},
sync::Arc,
};
use zvariant::ObjectPath;
......@@ -152,7 +153,7 @@ impl Connection {
/// with situation where this method takes away the message the other API is awaiting for and
/// end up in a deadlock situation. It is therefore highly recommended not to use such a
/// combination.
pub fn receive_message(&self) -> Result<Message> {
pub fn receive_message(&self) -> Result<Arc<Message>> {
self.receive_specific(|_| Ok(true))
}
......@@ -162,7 +163,7 @@ impl Connection {
/// decides if the message received should be returned by this method or not. Message received
/// during this call that are not returned by it, are pushed to the queue to be picked by the
/// susubsequent call to `receive_message`] or this method.
pub fn receive_specific<P>(&self, predicate: P) -> Result<Message>
pub fn receive_specific<P>(&self, predicate: P) -> Result<Arc<Message>>
where
P: Fn(&Message) -> Result<bool>,
{
......@@ -209,7 +210,7 @@ impl Connection {
iface: Option<&str>,
method_name: &str,
body: &B,
) -> Result<Message>
) -> Result<Arc<Message>>
where
B: serde::ser::Serialize + zvariant::Type,
MessageError: From<E>,
......
use std::{convert::Infallible, error, fmt, io};
use std::{convert::Infallible, error, fmt, io, sync::Arc};
use zvariant::Error as VariantError;
use crate::{fdo, Message, MessageError, MessageType};
......@@ -26,7 +26,7 @@ pub enum Error {
/// A D-Bus method error reply.
// According to the spec, there can be all kinds of details in D-Bus errors but nobody adds anything more than a
// string description.
MethodError(String, Option<String>, Message),
MethodError(String, Option<String>, Arc<Message>),
/// Invalid D-Bus GUID.
InvalidGUID,
/// Unsupported function, or support currently lacking.
......@@ -148,6 +148,12 @@ impl From<Infallible> for Error {
// For messages that are D-Bus error returns
impl From<Message> for Error {
fn from(message: Message) -> Error {
Self::from(Arc::new(message))
}
}
impl From<Arc<Message>> for Error {
fn from(message: Arc<Message>) -> Error {
// FIXME: Instead of checking this, we should have Method as trait and specific types for
// each message type.
let header = match message.header() {
......
......@@ -5,6 +5,7 @@ use std::{
convert::TryInto,
fmt::Write,
rc::Rc,
sync::Arc,
};
use scoped_tls::scoped_thread_local;
......@@ -524,7 +525,7 @@ impl ObjectServer {
/// interface, it returns `Ok(None)`. If not, it returns the received message.
///
/// Returns an error if the message is malformed or an error occured.
pub fn try_handle_next(&mut self) -> Result<Option<Message>> {
pub fn try_handle_next(&mut self) -> Result<Option<Arc<Message>>> {
let msg = self.conn.receive_message()?;
if !self.dispatch_message(&msg)? {
......
......@@ -2,6 +2,7 @@ use async_io::block_on;
use std::{
convert::{TryFrom, TryInto},
future::ready,
sync::Arc,
};
use zvariant::{ObjectPath, OwnedValue, Value};
......@@ -149,7 +150,7 @@ impl<'a> Proxy<'a> {
/// allocation/copying, by deserializing the reply to an unowned type).
///
/// [`call`]: struct.Proxy.html#method.call
pub fn call_method<B>(&self, method_name: &str, body: &B) -> Result<Message>
pub fn call_method<B>(&self, method_name: &str, body: &B) -> Result<Arc<Message>>
where
B: serde::ser::Serialize + zvariant::Type,
{
......@@ -223,7 +224,7 @@ impl<'a> Proxy<'a> {
/// Apart from general I/O errors that can result from socket communications, calling this
/// method will also result in an error if the destination service has not yet registered its
/// well-known name with the bus (assuming you're using the well-known name as destination).
pub fn next_signal(&self) -> Result<Option<Message>> {
pub fn next_signal(&self) -> Result<Option<Arc<Message>>> {
block_on(self.azync.next_signal())
}
......
......@@ -3,6 +3,7 @@ use std::{
borrow::Cow,
collections::HashMap,
convert::{AsRef, TryFrom},
sync::Arc,
};
use zvariant::ObjectPath;
......@@ -104,7 +105,7 @@ impl<'r, 'p> SignalReceiver<'r, 'p> {
///
/// If the signal message was handled by a handler, `Ok(None)` is returned. Otherwise, the
/// received message is returned.
pub fn next_signal(&self) -> Result<Option<crate::Message>> {
pub fn next_signal(&self) -> Result<Option<Arc<crate::Message>>> {
let msg = self.conn.receive_specific(|msg| {
let hdr = msg.header()?;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment