Commit a9ab0f7a authored by Zeeshan Ali's avatar Zeeshan Ali
Browse files

tmp

parent ea68fd36
Pipeline #395106 skipped
......@@ -12,12 +12,13 @@ use std::{
future::ready,
hash::{Hash, Hasher},
io::{self, ErrorKind},
ops::{Deref, DerefMut},
os::unix::io::RawFd,
pin::Pin,
sync::{
self,
atomic::{AtomicU32, Ordering::SeqCst},
Arc,
Arc, RwLock,
},
task::{Context, Poll},
};
......@@ -36,7 +37,7 @@ use crate::{
azync::{Authenticated, BroadcastReceiver, ConnectionBuilder, MessageStream},
fdo,
raw::{Connection as RawConnection, Socket},
Error, Guid, Message, MessageType, Result,
Error, Guid, Message, MessageType, ObjectServer, Result,
};
const DEFAULT_MAX_QUEUED: usize = 64;
......@@ -126,6 +127,9 @@ struct ConnectionInner {
msg_receiver_task: sync::Mutex<Option<Task<()>>>,
signal_subscriptions: Mutex<HashMap<u64, SignalSubscription>>,
object_server: OnceCell<RwLock<ObjectServer>>,
object_server_dispatch_task: OnceCell<Task<()>>,
}
// FIXME: Should really use [`AsyncDrop`] for `ConnectionInner` when we've something like that to
......@@ -608,6 +612,42 @@ impl Connection {
(self.inner.raw_conn.lock().expect("poisened lock").socket()).as_raw_fd()
}
// TODO:
pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
self.inner
.object_server
.get_or_init(|| self.setup_object_server())
.read()
.expect("lock poisoned")
}
// TODO:
pub fn object_server_mut(&self) -> impl DerefMut<Target = ObjectServer> + '_ {
self.inner
.object_server
.get_or_init(|| self.setup_object_server())
.write()
.expect("lock poisoned")
}
fn setup_object_server(&self) -> RwLock<ObjectServer> {
let conn = crate::Connection::from(self.clone());
let mut stream = MessageStream::from(self.clone());
let task = self.inner.executor.spawn(async move {
// TODO: Log errors when we've logging.
while let Some(msg) = stream.next().await.and_then(|m| m.ok()) {
let mut server = conn.object_server_mut();
let _ = server.dispatch_message(&conn, &*msg);
}
});
self.inner
.object_server_dispatch_task
.set(task)
.expect("object server task set twice");
RwLock::new(ObjectServer::new())
}
pub(crate) async fn subscribe_signal<'s, 'p, 'i, 'm, S, P, I, M>(
&self,
sender: S,
......@@ -786,6 +826,8 @@ impl Connection {
serial: AtomicU32::new(1),
unique_name: OnceCell::new(),
signal_subscriptions: Mutex::new(HashMap::new()),
object_server: OnceCell::new(),
object_server_dispatch_task: OnceCell::new(),
executor: executor.clone(),
msg_receiver_task: sync::Mutex::new(Some(msg_receiver_task)),
registered_names: Mutex::new(HashSet::new()),
......
use static_assertions::assert_impl_all;
use std::{
convert::TryInto,
ops::{Deref, DerefMut},
os::unix::io::{AsRawFd, RawFd},
sync::Arc,
};
......@@ -9,7 +10,7 @@ use zvariant::ObjectPath;
use async_io::block_on;
use crate::{azync, Error, Message, Result};
use crate::{azync, Error, Message, ObjectServer, Result};
/// A D-Bus connection.
///
......@@ -235,6 +236,16 @@ impl Connection {
self.inner.is_bus()
}
// TODO:
pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
self.inner.object_server()
}
// TODO:
pub fn object_server_mut(&self) -> impl DerefMut<Target = ObjectServer> + '_ {
self.inner.object_server_mut()
}
/// Get a reference to the underlying async Connection.
pub fn inner(&self) -> &azync::Connection {
&self.inner
......
......@@ -598,11 +598,9 @@ mod tests {
// signature we receive on the reply message.
use zvariant::{ObjectPath, Value};
let conn = Connection::session().unwrap();
let stream = MessageStream::from(&conn);
let service_name = conn.unique_name().unwrap().clone();
let mut object_server = super::ObjectServer::new(&conn);
struct Secret(Arc<Mutex<bool>>);
struct Secret;
#[super::dbus_interface(name = "org.freedesktop.Secret.Service")]
impl Secret {
fn open_session(
......@@ -610,7 +608,6 @@ mod tests {
_algorithm: &str,
input: Value<'_>,
) -> zbus::fdo::Result<(OwnedValue, OwnedObjectPath)> {
*self.0.lock().unwrap() = true;
Ok((
OwnedValue::from(input),
ObjectPath::try_from("/org/freedesktop/secrets/Blah")
......@@ -620,9 +617,8 @@ mod tests {
}
}
let quit = Arc::new(Mutex::new(false));
let secret = Secret(quit.clone());
object_server
let secret = Secret;
conn.object_server_mut()
.at("/org/freedesktop/secrets", secret)
.unwrap();
......@@ -650,17 +646,6 @@ mod tests {
2u32
});
for m in stream {
let m = m.unwrap();
if let Err(e) = object_server.dispatch_message(&m) {
eprintln!("{}", e);
}
if *quit.lock().unwrap() {
break;
}
}
let val = child.join().expect("failed to join");
assert_eq!(val, 2);
}
......@@ -813,16 +798,17 @@ mod tests {
.unwrap()
.request_name("org.freedesktop.zbus.ComeAndGo")
.unwrap();
let mut object_server = super::ObjectServer::new(&conn);
object_server
conn.object_server_mut()
.at("/org/freedesktop/zbus/ComeAndGo", ComeAndGo)
.unwrap();
object_server
.with("/org/freedesktop/zbus/ComeAndGo", |_: &ComeAndGo, ctxt| {
ComeAndGo::the_signal(ctxt)
})
conn.object_server_mut()
.with(
"/org/freedesktop/zbus/ComeAndGo",
&conn,
|_: &ComeAndGo, ctxt| ComeAndGo::the_signal(ctxt),
)
.unwrap();
rx.recv().unwrap();
......
......@@ -3,7 +3,6 @@ use std::{
collections::{hash_map::Entry, HashMap},
convert::TryInto,
fmt::Write,
io::{self, ErrorKind},
marker::PhantomData,
ops::{Deref, DerefMut},
sync::{Arc, RwLock, RwLockWriteGuard},
......@@ -16,7 +15,7 @@ use zvariant::{ObjectPath, OwnedObjectPath, OwnedValue, Value};
use crate::{
fdo,
fdo::{Introspectable, Peer, Properties},
Connection, Error, Message, MessageHeader, MessageStream, MessageType, Result, SignalContext,
Connection, Error, Message, MessageHeader, MessageType, Result, SignalContext,
};
/// The trait used to dispatch messages to an interface instance.
......@@ -323,23 +322,17 @@ impl Node {
/// }
///# Ok::<_, Box<dyn Error + Send + Sync>>(())
/// ```
#[derive(derivative::Derivative)]
#[derivative(Debug)]
#[derive(Debug)]
pub struct ObjectServer {
conn: Connection,
root: Node,
#[derivative(Debug = "ignore")]
msg_stream: MessageStream,
}
assert_impl_all!(ObjectServer: Send, Sync, Unpin);
impl ObjectServer {
/// Creates a new D-Bus `ObjectServer` for a given connection.
pub fn new(connection: &Connection) -> Self {
/// Creates a new D-Bus `ObjectServer`.
pub(crate) fn new() -> Self {
Self {
conn: connection.clone(),
msg_stream: MessageStream::from(connection),
root: Node::new("/".try_into().expect("zvariant bug")),
}
}
......@@ -462,14 +455,14 @@ impl ObjectServer {
///#
///# let path = "/org/zbus/path";
///# object_server.at(path, MyIface)?;
/// object_server.with(path, |_iface: &MyIface, signal_ctxt| {
/// object_server.with(path, &connection, |_iface: &MyIface, signal_ctxt| {
/// MyIface::emit_signal(signal_ctxt)
/// })?;
///#
///#
///# Ok::<_, Box<dyn Error + Send + Sync>>(())
/// ```
pub fn with<'p, P, F, I>(&self, path: P, func: F) -> Result<()>
pub fn with<'p, P, F, I>(&self, path: P, connection: &Connection, func: F) -> Result<()>
where
F: Fn(&I, &SignalContext<'_>) -> Result<()>,
I: Interface,
......@@ -479,7 +472,7 @@ impl ObjectServer {
let path = path.try_into().map_err(Into::into)?;
let node = self.get_node(&path).ok_or(Error::InterfaceNotFound)?;
// SAFETY: We know that there is a valid path on the node as we already converted w/o error.
let ctxt = SignalContext::new(&self.conn, path).unwrap();
let ctxt = SignalContext::new(connection, path).unwrap();
node.with_iface_func(func, &ctxt)
}
......@@ -519,7 +512,7 @@ impl ObjectServer {
///#
///# Ok::<_, Box<dyn Error + Send + Sync>>(())
/// ```
pub fn with_mut<'p, P, F, I>(&self, path: P, func: F) -> Result<()>
pub fn with_mut<'p, P, F, I>(&self, path: P, connection: &Connection, func: F) -> Result<()>
where
F: Fn(&mut I, &SignalContext<'_>) -> Result<()>,
I: Interface,
......@@ -529,7 +522,7 @@ impl ObjectServer {
let path = path.try_into().map_err(Into::into)?;
let node = self.get_node(&path).ok_or(Error::InterfaceNotFound)?;
// SAFETY: We know that there is a valid path on the node as we already converted w/o error.
let ctxt = SignalContext::new(&self.conn, path).unwrap();
let ctxt = SignalContext::new(connection, path).unwrap();
node.with_iface_func_mut(func, &ctxt)
}
......@@ -590,6 +583,7 @@ impl ObjectServer {
fn dispatch_method_call_try(
&mut self,
connection: &Connection,
msg_header: &MessageHeader<'_>,
msg: &Message,
) -> fdo::Result<Result<u32>> {
......@@ -623,23 +617,24 @@ impl ObjectServer {
let res = iface
.read()
.expect("lock poisoned")
.call(self, &self.conn, msg, member.clone());
.call(self, connection, msg, member.clone());
res.or_else(|| {
iface
.write()
.expect("lock poisoned")
.call_mut(self, &self.conn, msg, member.clone())
.call_mut(self, connection, msg, member.clone())
})
.ok_or_else(|| fdo::Error::UnknownMethod(format!("Unknown method '{}'", member)))
}
fn dispatch_method_call(
&mut self,
connection: &Connection,
msg_header: &MessageHeader<'_>,
msg: &Message,
) -> Result<u32> {
match self.dispatch_method_call_try(msg_header, msg) {
Err(e) => e.reply(&self.conn, msg),
match self.dispatch_method_call_try(connection, msg_header, msg) {
Err(e) => e.reply(connection, msg),
Ok(r) => r,
}
}
......@@ -653,49 +648,20 @@ impl ObjectServer {
/// - calling the associated method if one exists,
///
/// - returning a message (responding to the caller with either a return or error message) to
/// the caller through the associated server connection.
/// the caller through given connection.
///
/// Returns an error if the message is malformed, true if it's handled, false otherwise.
pub fn dispatch_message(&mut self, msg: &Message) -> Result<bool> {
pub fn dispatch_message(&mut self, connection: &Connection, msg: &Message) -> Result<bool> {
let msg_header = msg.header()?;
match msg_header.message_type()? {
MessageType::MethodCall => {
self.dispatch_method_call(&msg_header, msg)?;
self.dispatch_method_call(connection, &msg_header, msg)?;
Ok(true)
}
_ => Ok(false),
}
}
/// Receive and handle the next message from the associated connection.
///
/// This function will read the incoming message from
/// [`receive_message()`](Connection::receive_message) of the associated connection and pass it
/// to [`dispatch_message()`](Self::dispatch_message). If the message was handled by an an
/// interface, it returns `Ok(None)`. If not, it returns the received message.
///
/// Returns an error if the message is malformed or an error occurred.
pub fn try_handle_next(&mut self) -> Result<Option<Arc<Message>>> {
match self.msg_stream.next() {
Some(msg) => {
let msg = msg?;
if !self.dispatch_message(&msg)? {
Ok(Some(msg))
} else {
Ok(None)
}
}
None => {
// If SocketStream gives us None, that means the socket was closed
Err(Error::Io(io::Error::new(
ErrorKind::BrokenPipe,
"socket closed",
)))
}
}
}
}
#[cfg(test)]
......@@ -706,8 +672,8 @@ mod tests {
convert::TryInto,
error::Error,
sync::{
mpsc::{channel, Sender},
Arc, Mutex,
mpsc::{channel, sync_channel, Sender, SyncSender},
Arc,
},
thread,
};
......@@ -720,8 +686,7 @@ mod tests {
use zvariant::derive::Type;
use crate::{
dbus_interface, dbus_proxy, Connection, MessageHeader, MessageStream, MessageType,
ObjectServer, SignalContext,
dbus_interface, dbus_proxy, Connection, MessageHeader, MessageType, SignalContext,
};
#[derive(Deserialize, Serialize, Type)]
......@@ -764,20 +729,19 @@ mod tests {
#[derive(Debug, Clone)]
enum NextAction {
Nothing,
Quit,
CreateObj(String),
DestroyObj(String),
}
struct MyIfaceImpl {
action: Arc<Mutex<NextAction>>,
next_tx: SyncSender<NextAction>,
count: u32,
}
impl MyIfaceImpl {
fn new(action: Arc<Mutex<NextAction>>) -> Self {
Self { action, count: 0 }
fn new(next_tx: SyncSender<NextAction>) -> Self {
Self { next_tx, count: 0 }
}
}
......@@ -799,8 +763,8 @@ mod tests {
self.count
}
fn quit(&mut self) {
*self.action.lock().unwrap() = NextAction::Quit;
fn quit(&self) {
self.next_tx.send(NextAction::Quit).unwrap();
}
fn test_header(&self, #[zbus(header)] header: MessageHeader<'_>) {
......@@ -851,11 +815,11 @@ mod tests {
}
fn create_obj(&self, key: String) {
*self.action.lock().unwrap() = NextAction::CreateObj(key);
self.next_tx.send(NextAction::CreateObj(key)).unwrap();
}
fn destroy_obj(&self, key: String) {
*self.action.lock().unwrap() = NextAction::DestroyObj(key);
self.next_tx.send(NextAction::DestroyObj(key)).unwrap();
}
#[dbus_interface(property)]
......@@ -985,53 +949,48 @@ mod tests {
.unwrap()
.request_name("org.freedesktop.MyService.bar")
.unwrap();
let stream = MessageStream::from(&conn);
let mut object_server = ObjectServer::new(&conn);
let action = Arc::new(Mutex::new(NextAction::Nothing));
let child = thread::spawn(move || my_iface_test(tx).expect("child failed"));
// Wait for the listener to be ready
rx.recv().unwrap();
let iface = MyIfaceImpl::new(action.clone());
object_server
.at("/org/freedesktop/MyService", iface)
.unwrap();
object_server
.with("/org/freedesktop/MyService", |iface: &MyIfaceImpl, ctxt| {
iface.count_changed(&ctxt)
})
.unwrap();
let (next_tx, next_rx) = sync_channel(64);
let iface = MyIfaceImpl::new(next_tx.clone());
{
let mut server = conn.object_server_mut();
server.at("/org/freedesktop/MyService", iface).unwrap();
for m in stream {
let m = m.unwrap();
if let Err(e) = object_server.dispatch_message(&m) {
eprintln!("{}", e);
}
server
.with(
"/org/freedesktop/MyService",
&conn,
|iface: &MyIfaceImpl, ctxt| iface.count_changed(&ctxt),
)
.unwrap();
}
object_server
loop {
conn.object_server_mut()
.with(
"/org/freedesktop/MyService",
&conn,
|_iface: &MyIfaceImpl, ctxt| MyIfaceImpl::alert_count(&ctxt, 51),
)
.unwrap();
let mut next = action.lock().unwrap();
let current_next = next.clone();
*next = NextAction::Nothing;
match current_next {
NextAction::Nothing => (),
match next_rx.recv().unwrap() {
NextAction::Quit => break,
NextAction::CreateObj(key) => {
let path = format!("/zbus/test/{}", key);
object_server
.at(path, MyIfaceImpl::new(action.clone()))
conn.object_server_mut()
.at(path, MyIfaceImpl::new(next_tx.clone()))
.unwrap();
}
NextAction::DestroyObj(key) => {
let path = format!("/zbus/test/{}", key);
object_server.remove::<MyIfaceImpl, _>(path).unwrap();
conn.object_server_mut()
.remove::<MyIfaceImpl, _>(path)
.unwrap();
}
}
}
......
......@@ -214,7 +214,7 @@ fn test_interface() {
if false {
// check compilation
let c = zbus::Connection::session().unwrap();
let s = zbus::ObjectServer::new(&c);
let s = c.object_server();
let m = zbus::Message::method(None::<()>, None::<()>, "/", None::<()>, "StrU32", &(42,))
.unwrap();
let _ = t.call(&s, &c, &m, "StrU32".try_into().unwrap()).unwrap();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment