connection.rs 61.2 KB
Newer Older
1
use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender as Broadcaster};
2
use enumflags2::BitFlags;
3
use event_listener::{Event, EventListener};
4
use once_cell::sync::OnceCell;
danieldg's avatar
danieldg committed
5
use ordered_stream::{OrderedFuture, OrderedStream, PollResult};
6
use static_assertions::assert_impl_all;
7
use std::{
8
    collections::HashMap,
9
    convert::TryInto,
10
    io::{self, ErrorKind},
11
    ops::Deref,
12
    pin::Pin,
13
    sync::{
14
        self,
15
        atomic::{AtomicU32, Ordering::SeqCst},
16
        Arc, Weak,
17
    },
18
19
    task::{Context, Poll},
};
20
use tracing::{debug, info_span, instrument, trace, trace_span, warn, Instrument};
21
use zbus_names::{BusName, ErrorName, InterfaceName, MemberName, OwnedUniqueName, WellKnownName};
22
use zvariant::ObjectPath;
23

24
use futures_core::{ready, Future};
25
use futures_sink::Sink;
26
use futures_util::{sink::SinkExt, StreamExt, TryFutureExt};
27
28

use crate::{
29
    async_lock::Mutex,
30
31
    blocking,
    fdo::{self, RequestNameFlags, RequestNameReply},
danieldg's avatar
danieldg committed
32
    raw::{Connection as RawConnection, Socket},
33
    socket_reader::SocketReader,
34
35
36
    Authenticated, CacheProperties, ConnectionBuilder, DBusError, Error, Executor, Guid, MatchRule,
    Message, MessageBuilder, MessageFlags, MessageStream, MessageType, ObjectServer,
    OwnedMatchRule, Result, Task,
37
38
};

39
const DEFAULT_MAX_QUEUED: usize = 64;
40
const DEFAULT_MAX_METHOD_RETURN_QUEUED: usize = 8;
Zeeshan Ali Khan's avatar
Zeeshan Ali Khan committed
41

42
/// Inner state shared by Connection and WeakConnection
43
#[derive(Debug)]
44
pub(crate) struct ConnectionInner {
45
    server_guid: Guid,
46
    #[cfg(unix)]
47
    cap_unix_fd: bool,
48
    bus_conn: bool,
49
    unique_name: OnceCell<OwnedUniqueName>,
50
    registered_names: Mutex<HashMap<WellKnownName<'static>, NameStatus>>,
51

52
    raw_conn: Arc<sync::Mutex<RawConnection<Box<dyn Socket>>>>,
53

54
    // Serial number for next outgoing message
55
    serial: AtomicU32,
56

57
    // Our executor
58
    executor: Executor<'static>,
59

60
    // Socket reader task
61
    #[allow(unused)]
62
    socket_reader_task: OnceCell<Task<()>>,
63

64
    pub(crate) msg_receiver: InactiveReceiver<Result<Arc<Message>>>,
65
    pub(crate) method_return_receiver: InactiveReceiver<Result<Arc<Message>>>,
66
    msg_senders: Arc<Mutex<HashMap<Option<OwnedMatchRule>, MsgBroadcaster>>>,
67

68
    subscriptions: Mutex<Subscriptions>,
69

70
    object_server: OnceCell<blocking::ObjectServer>,
71
    object_server_dispatch_task: OnceCell<Task<()>>,
72
73
}

74
75
type Subscriptions = HashMap<OwnedMatchRule, (u64, InactiveReceiver<Result<Arc<Message>>>)>;

76
pub(crate) type MsgBroadcaster = Broadcaster<Result<Arc<Message>>>;
77

78
/// A D-Bus connection.
79
///
80
/// A connection to a D-Bus bus, or a direct peer.
81
///
82
83
84
85
86
87
88
89
90
91
/// Once created, the connection is authenticated and negotiated and messages can be sent or
/// received, such as [method calls] or [signals].
///
/// For higher-level message handling (typed functions, introspection, documentation reasons etc),
/// it is recommended to wrap the low-level D-Bus messages into Rust functions with the
/// [`dbus_proxy`] and [`dbus_interface`] macros instead of doing it directly on a `Connection`.
///
/// Typically, a connection is made to the session bus with [`Connection::session`], or to the
/// system bus with [`Connection::system`]. Then the connection is used with [`crate::Proxy`]
/// instances or the on-demand [`ObjectServer`] instance that can be accessed through
92
/// [`Connection::object_server`].
93
94
95
///
/// `Connection` implements [`Clone`] and cloning it is a very cheap operation, as the underlying
/// data is not cloned. This makes it very convenient to share the connection between different
Zeeshan Ali Khan's avatar
Zeeshan Ali Khan committed
96
/// parts of your code. `Connection` also implements [`std::marker::Sync`] and [`std::marker::Send`]
97
98
/// so you can send and share a connection instance across threads as well.
///
99
100
101
102
103
104
/// `Connection` keeps internal queues of incoming message. The default capacity of each of these is
/// 64. The capacity of the main (unfiltered) queue is configurable through the [`set_max_queued`]
/// method. When the queue is full, no more messages can be received until room is created for more.
/// This is why it's important to ensure that all [`crate::MessageStream`] and
/// [`crate::blocking::MessageIterator`] instances are continuously polled and iterated on,
/// respectively.
105
106
///
/// For sending messages you can either use [`Connection::send_message`] method or make use of the
107
108
109
110
111
112
/// [`Sink`] implementation. For latter, you might find [`SinkExt`] API very useful. Keep in mind
/// that [`Connection`] will not manage the serial numbers (cookies) on the messages for you when
/// they are sent through the [`Sink`] implementation. You can manually assign unique serial numbers
/// to them using the [`Connection::assign_serial_num`] method before sending them off, if needed.
/// Having said that, the [`Sink`] is mainly useful for sending out signals, as they do not expect
/// a reply, and serial numbers are not very useful for signals either for the same reason.
113
///
danieldg's avatar
danieldg committed
114
115
116
/// Since you do not need exclusive access to a `zbus::Connection` to send messages on the bus,
/// [`Sink`] is also implemented on `&Connection`.
///
117
118
119
120
121
122
123
124
/// # Caveats
///
/// At the moment, a simultaneous [flush request] from multiple tasks/threads could
/// potentially create a busy loop, thus wasting CPU time. This limitation may be removed in the
/// future.
///
/// [flush request]: https://docs.rs/futures/0.3.15/futures/sink/trait.SinkExt.html#method.flush
///
125
126
127
128
129
130
131
/// [method calls]: struct.Connection.html#method.call_method
/// [signals]: struct.Connection.html#method.emit_signal
/// [`dbus_proxy`]: attr.dbus_proxy.html
/// [`dbus_interface`]: attr.dbus_interface.html
/// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
/// [`set_max_queued`]: struct.Connection.html#method.set_max_queued
///
132
133
134
135
136
/// ### Examples
///
/// #### Get the session bus ID
///
/// ```
137
///# zbus::block_on(async {
138
/// use zbus::Connection;
139
///
140
/// let connection = Connection::session().await?;
141
142
143
144
145
146
147
148
149
150
151
152
153
154
///
/// let reply = connection
///     .call_method(
///         Some("org.freedesktop.DBus"),
///         "/org/freedesktop/DBus",
///         Some("org.freedesktop.DBus"),
///         "GetId",
///         &(),
///     )
///     .await?;
///
/// let id: &str = reply.body()?;
/// println!("Unique ID of the bus: {}", id);
///# Ok::<(), zbus::Error>(())
155
///# }).unwrap();
156
157
158
159
160
161
162
/// ```
///
/// #### Monitoring all messages
///
/// Let's eavesdrop on the session bus 😈 using the [Monitor] interface:
///
/// ```rust,no_run
163
///# zbus::block_on(async {
164
/// use futures_util::stream::TryStreamExt;
165
/// use zbus::{Connection, MessageStream};
166
///
167
/// let connection = Connection::session().await?;
168
169
170
171
172
173
174
175
176
177
178
///
/// connection
///     .call_method(
///         Some("org.freedesktop.DBus"),
///         "/org/freedesktop/DBus",
///         Some("org.freedesktop.DBus.Monitoring"),
///         "BecomeMonitor",
///         &(&[] as &[&str], 0u32),
///     )
///     .await?;
///
179
180
/// let mut stream = MessageStream::from(connection);
/// while let Some(msg) = stream.try_next().await? {
181
182
183
184
///     println!("Got message: {}", msg);
/// }
///
///# Ok::<(), zbus::Error>(())
185
///# }).unwrap();
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
/// ```
///
/// This should print something like:
///
/// ```console
/// Got message: Signal NameAcquired from org.freedesktop.DBus
/// Got message: Signal NameLost from org.freedesktop.DBus
/// Got message: Method call GetConnectionUnixProcessID from :1.1324
/// Got message: Error org.freedesktop.DBus.Error.NameHasNoOwner:
///              Could not get PID of name ':1.1332': no such name from org.freedesktop.DBus
/// Got message: Method call AddMatch from :1.918
/// Got message: Method return from org.freedesktop.DBus
/// ```
///
/// [Monitor]: https://dbus.freedesktop.org/doc/dbus-specification.html#bus-messages-become-monitor
201
#[derive(Clone, Debug)]
202
pub struct Connection {
203
    pub(crate) inner: Arc<ConnectionInner>,
204
}
205

206
207
assert_impl_all!(Connection: Send, Sync, Unpin);

danieldg's avatar
danieldg committed
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
/// A method call whose completion can be awaited or joined with other streams.
///
/// This is useful for cache population method calls, where joining the [`JoinableStream`] with
/// an update signal stream can be used to ensure that cache updates are not overwritten by a cache
/// population whose task is scheduled later.
#[derive(Debug)]
pub(crate) struct PendingMethodCall {
    // Stream the reply is read from; reset to `None` once a matching reply has been delivered.
    stream: Option<MessageStream>,
    // Serial number of the sent call; replies are matched against it via their reply serial.
    serial: u32,
}

impl Future for PendingMethodCall {
    type Output = Result<Arc<Message>>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.poll_before(cx, None).map(|ret| {
            ret.map(|(_, r)| r).unwrap_or_else(|| {
225
226
227
                Err(crate::Error::InputOutput(
                    io::Error::new(ErrorKind::BrokenPipe, "socket closed").into(),
                ))
danieldg's avatar
danieldg committed
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
            })
        })
    }
}

impl OrderedFuture for PendingMethodCall {
    type Output = Result<Arc<Message>>;
    type Ordering = zbus::MessageSequence;

    /// Polls the reply stream until the reply to our call shows up.
    ///
    /// Messages whose reply serial does not match the call's serial, or which are neither an
    /// error nor a method return, are skipped. Returns `Poll::Ready(None)` when the stream
    /// reports no item before `before`, when it has terminated, or when the reply was already
    /// delivered by an earlier poll.
    fn poll_before(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        before: Option<&Self::Ordering>,
    ) -> Poll<Option<(Self::Ordering, Self::Output)>> {
        let this = self.get_mut();
        if let Some(stream) = &mut this.stream {
            loop {
                match Pin::new(&mut *stream).poll_next_before(cx, before) {
                    Poll::Ready(PollResult::Item {
                        data: Ok(msg),
                        ordering,
                    }) => {
                        // Not a reply to our call; keep looking.
                        if msg.reply_serial() != Some(this.serial) {
                            continue;
                        }
                        let res = match msg.message_type() {
                            MessageType::Error => Err(msg.into()),
                            MessageType::MethodReturn => Ok(msg),
                            _ => continue,
                        };
                        // Reply delivered; release the stream so we stop receiving messages.
                        this.stream = None;
                        return Poll::Ready(Some((ordering, res)));
                    }
                    Poll::Ready(PollResult::Item {
                        data: Err(e),
                        ordering,
                    }) => {
                        return Poll::Ready(Some((ordering, Err(e))));
                    }

                    Poll::Ready(PollResult::NoneBefore) => {
                        return Poll::Ready(None);
                    }
                    Poll::Ready(PollResult::Terminated) => {
                        return Poll::Ready(None);
                    }
                    Poll::Pending => return Poll::Pending,
                }
            }
        }
        Poll::Ready(None)
    }
}

282
impl Connection {
283
284
    /// Send `msg` to the peer.
    ///
285
286
    /// Unlike our [`Sink`] implementation, this method sets a unique (to this connection) serial
    /// number on the message before sending it off, for you.
287
288
    ///
    /// On successfully sending off `msg`, the assigned serial number is returned.
289
    pub async fn send_message(&self, mut msg: Message) -> Result<u32> {
290
        let serial = self.assign_serial_num(&mut msg)?;
291

292
        trace!("Sending message: {:?}", msg);
293
        (&mut &*self).send(msg).await?;
294
        trace!("Sent message with serial: {}", serial);
295
296
297
298
299
300
301
302

        Ok(serial)
    }

    /// Send a method call.
    ///
    /// Create a method-call message, send it over the connection, then wait for the reply.
    ///
Thomas Mühlbacher's avatar
Thomas Mühlbacher committed
303
    /// On successful reply, an `Ok(Message)` is returned. On error, an `Err` is returned. D-Bus
304
    /// error replies are returned as [`Error::MethodError`].
305
    pub async fn call_method<'d, 'p, 'i, 'm, D, P, I, M, B>(
306
        &self,
307
308
309
310
        destination: Option<D>,
        path: P,
        interface: Option<I>,
        method_name: M,
311
        body: &B,
312
    ) -> Result<Arc<Message>>
313
    where
314
315
316
317
318
319
320
321
        D: TryInto<BusName<'d>>,
        P: TryInto<ObjectPath<'p>>,
        I: TryInto<InterfaceName<'i>>,
        M: TryInto<MemberName<'m>>,
        D::Error: Into<Error>,
        P::Error: Into<Error>,
        I::Error: Into<Error>,
        M::Error: Into<Error>,
322
        B: serde::ser::Serialize + zvariant::DynamicType,
323
    {
324
        self.call_method_raw(
325
326
            destination,
            path,
327
            interface,
328
            method_name,
329
            BitFlags::empty(),
330
            body,
331
332
333
334
        )
        .await?
        .expect("no reply")
        .await
danieldg's avatar
danieldg committed
335
336
337
338
339
340
341
    }

    /// Send a method call.
    ///
    /// Send the given message, which must be a method call, over the connection and return an
    /// object that allows the reply to be retrieved.  Typically you'd want to use
    /// [`Connection::call_method`] instead.
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
    ///
    /// If the `flags` do not contain `MethodFlags::NoReplyExpected`, the return value is
    /// guaranteed to be `Ok(Some(_))`, if there was no error encountered.
    ///
    /// INTERNAL NOTE: If this method is ever made pub, flags should become `BitFlags<MethodFlags>`.
    pub(crate) async fn call_method_raw<'d, 'p, 'i, 'm, D, P, I, M, B>(
        &self,
        destination: Option<D>,
        path: P,
        interface: Option<I>,
        method_name: M,
        flags: BitFlags<MessageFlags>,
        body: &B,
    ) -> Result<Option<PendingMethodCall>>
    where
        D: TryInto<BusName<'d>>,
        P: TryInto<ObjectPath<'p>>,
        I: TryInto<InterfaceName<'i>>,
        M: TryInto<MemberName<'m>>,
        D::Error: Into<Error>,
        P::Error: Into<Error>,
        I::Error: Into<Error>,
        M::Error: Into<Error>,
        B: serde::ser::Serialize + zvariant::DynamicType,
    {
        let mut builder = MessageBuilder::method_call(path, method_name)?;
368
369
370
        if let Some(sender) = self.unique_name() {
            builder = builder.sender(sender)?
        }
371
372
373
374
375
376
377
378
379
380
        if let Some(destination) = destination {
            builder = builder.destination(destination)?
        }
        if let Some(interface) = interface {
            builder = builder.interface(interface)?
        }
        for flag in flags {
            builder = builder.with_flags(flag)?;
        }
        let msg = builder.build(body)?;
danieldg's avatar
danieldg committed
381

382
383
384
385
386
387
388
        let msg_receiver = self.inner.method_return_receiver.activate_cloned();
        let stream = Some(MessageStream::for_subscription_channel(
            msg_receiver,
            // This is a lie but we only use the stream internally so it's fine.
            None,
            self,
        ));
danieldg's avatar
danieldg committed
389
        let serial = self.send_message(msg).await?;
390
391
392
393
394
        if flags.contains(MessageFlags::NoReplyExpected) {
            Ok(None)
        } else {
            Ok(Some(PendingMethodCall { stream, serial }))
        }
395
396
397
398
399
    }

    /// Emit a signal.
    ///
    /// Create a signal message, and send it over the connection.
400
    pub async fn emit_signal<'d, 'p, 'i, 'm, D, P, I, M, B>(
401
        &self,
402
403
404
405
        destination: Option<D>,
        path: P,
        interface: I,
        signal_name: M,
406
407
408
        body: &B,
    ) -> Result<()>
    where
409
410
411
412
413
414
415
416
        D: TryInto<BusName<'d>>,
        P: TryInto<ObjectPath<'p>>,
        I: TryInto<InterfaceName<'i>>,
        M: TryInto<MemberName<'m>>,
        D::Error: Into<Error>,
        P::Error: Into<Error>,
        I::Error: Into<Error>,
        M::Error: Into<Error>,
417
        B: serde::ser::Serialize + zvariant::DynamicType,
418
419
420
421
422
    {
        let m = Message::signal(
            self.unique_name(),
            destination,
            path,
423
            interface,
424
425
426
427
428
429
430
431
432
433
434
435
436
            signal_name,
            body,
        )?;

        self.send_message(m).await.map(|_| ())
    }

    /// Reply to a message.
    ///
    /// Given an existing message (likely a method call), send a reply back to the caller with the
    /// given `body`.
    ///
    /// Returns the message serial number.
437
    pub async fn reply<B>(&self, call: &Message, body: &B) -> Result<u32>
438
    where
439
        B: serde::ser::Serialize + zvariant::DynamicType,
440
441
442
443
444
445
446
447
448
449
450
    {
        let m = Message::method_reply(self.unique_name(), call, body)?;
        self.send_message(m).await
    }

    /// Reply an error to a message.
    ///
    /// Given an existing message (likely a method call), send an error reply back to the caller
    /// with the given `error_name` and `body`.
    ///
    /// Returns the message serial number.
451
    pub async fn reply_error<'e, E, B>(
452
453
        &self,
        call: &Message,
454
        error_name: E,
455
456
        body: &B,
    ) -> Result<u32>
457
    where
458
        B: serde::ser::Serialize + zvariant::DynamicType,
459
460
        E: TryInto<ErrorName<'e>>,
        E::Error: Into<Error>,
461
462
463
464
465
    {
        let m = Message::method_error(self.unique_name(), call, error_name, body)?;
        self.send_message(m).await
    }

466
467
468
469
470
471
472
473
474
475
476
    /// Reply an error to a message.
    ///
    /// Given the header of an existing message (likely a method call), send an error reply back
    /// to the caller using one of the standard interface reply types.
    ///
    /// Returns the message serial number.
    ///
    /// # Errors
    ///
    /// Fails if the reply message cannot be created from `err`, or if sending it fails.
    pub async fn reply_dbus_error(
        &self,
        call: &zbus::MessageHeader<'_>,
        err: impl DBusError,
    ) -> Result<u32> {
        let m = err.create_reply(call)?;
        self.send_message(m).await
    }

481
482
483
    /// Register a well-known name for this connection.
    ///
    /// When connecting to a bus, the name is requested from the bus. In case of p2p connection, the
484
    /// name (if requested) is used of self-identification.
485
    ///
Zeeshan Ali Khan's avatar
Zeeshan Ali Khan committed
486
487
    /// You can request multiple names for the same connection. Use [`Connection::release_name`] for
    /// deregistering names registered through this method.
488
489
    ///
    /// Note that exclusive ownership without queueing is requested (using
490
491
492
    /// [`RequestNameFlags::ReplaceExisting`] and [`RequestNameFlags::DoNotQueue`] flags) since that
    /// is the most typical case. If that is not what you want, you should use
    /// [`Connection::request_name_with_flags`] instead (but make sure then that name is requested
493
    /// **after** you've setup your service implementation with the `ObjectServer`).
Flausch's avatar
Flausch committed
494
    ///
495
496
497
498
499
500
    /// # Caveats
    ///
    /// The associated `ObjectServer` will only handle method calls destined for the unique name of
    /// this connection or any of the registered well-known names. If no well-known name is
    /// registered, the method calls destined to all well-known names will be handled.
    ///
501
502
503
504
505
    /// Since names registered through any other means than `Connection` or [`ConnectionBuilder`]
    /// API are not known to the connection, method calls destined to those names will only be
    /// handled by the associated `ObjectServer` if none of the names are registered through
    /// `Connection*` API. Simply put, either register all the names through `Connection*` API or
    /// none of them.
506
    ///
Flausch's avatar
Flausch committed
507
508
509
    /// # Errors
    ///
    /// Fails with `zbus::Error::NameTaken` if the name is already owned by another peer.
510
    pub async fn request_name<'w, W>(&self, well_known_name: W) -> Result<()>
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
    where
        W: TryInto<WellKnownName<'w>>,
        W::Error: Into<Error>,
    {
        self.request_name_with_flags(
            well_known_name,
            RequestNameFlags::ReplaceExisting | RequestNameFlags::DoNotQueue,
        )
        .await
        .map(|_| ())
    }

    /// Register a well-known name for this connection.
    ///
    /// This is the same as [`Connection::request_name`] but allows to specify the flags to use when
    /// requesting the name.
    ///
    /// If the [`RequestNameFlags::DoNotQueue`] flag is not specified and request ends up in the
    /// queue, you can use [`fdo::NameAcquiredStream`] to be notified when the name is acquired. A
    /// queued name request can be cancelled using [`Connection::release_name`].
    ///
    /// If the [`RequestNameFlags::AllowReplacement`] flag is specified, the requested name can be
    /// lost if another peer requests the same name. You can use [`fdo::NameLostStream`] to be
    /// notified when the name is lost
    ///
    /// # Example
    ///
    /// ```
    ///#
    ///# zbus::block_on(async {
    /// use zbus::{Connection, fdo::{DBusProxy, RequestNameFlags, RequestNameReply}};
    /// use enumflags2::BitFlags;
    /// use futures_util::stream::StreamExt;
    ///
    /// let name = "org.freedesktop.zbus.QueuedNameTest";
    /// let conn1 = Connection::session().await?;
    /// // This should just work right away.
    /// conn1.request_name(name).await?;
    ///
    /// let conn2 = Connection::session().await?;
    /// // A second request from the another connection will fail with `DoNotQueue` flag, which is
    /// // implicit with `request_name` method.
    /// assert!(conn2.request_name(name).await.is_err());
    ///
    /// // Now let's try w/o `DoNotQueue` and we should be queued.
    /// let reply = conn2
    ///     .request_name_with_flags(name, RequestNameFlags::AllowReplacement.into())
    ///     .await?;
    /// assert_eq!(reply, RequestNameReply::InQueue);
    /// // Another request should just give us the same response.
    /// let reply = conn2
    ///     // The flags on subsequent requests will however be ignored.
    ///     .request_name_with_flags(name, BitFlags::empty())
    ///     .await?;
    /// assert_eq!(reply, RequestNameReply::InQueue);
    /// let mut acquired_stream = DBusProxy::new(&conn2)
    ///     .await?
    ///     .receive_name_acquired()
    ///     .await?;
    /// assert!(conn1.release_name(name).await?);
    /// // This would have waited forever if `conn1` hadn't just release the name.
    /// let acquired = acquired_stream.next().await.unwrap();
    /// assert_eq!(acquired.args().unwrap().name, name);
    ///
    /// // conn2 made the mistake of being too nice and allowed name replacemnt, so conn1 should be
    /// // able to take it back.
    /// let mut lost_stream = DBusProxy::new(&conn2)
    ///     .await?
    ///     .receive_name_lost()
    ///     .await?;
    /// conn1.request_name(name).await?;
    /// let lost = lost_stream.next().await.unwrap();
    /// assert_eq!(lost.args().unwrap().name, name);
    ///
    ///# Ok::<(), zbus::Error>(())
586
    ///# }).unwrap();
587
588
589
590
591
592
593
594
595
596
597
598
599
600
    /// ```
    ///
    /// # Caveats
    ///
    /// * Same as that of [`Connection::request_name`].
    /// * If you wish to track changes to name ownership after this call, make sure that the
    /// [`fdo::NameAcquired`] and/or [`fdo::NameLostStream`] instance(s) are created **before**
    /// calling this method. Otherwise, you may loose the signal if it's emitted after this call but
    /// just before the stream instance get created.
    pub async fn request_name_with_flags<'w, W>(
        &self,
        well_known_name: W,
        flags: BitFlags<RequestNameFlags>,
    ) -> Result<RequestNameReply>
601
602
603
604
605
    where
        W: TryInto<WellKnownName<'w>>,
        W::Error: Into<Error>,
    {
        let well_known_name = well_known_name.try_into().map_err(Into::into)?;
606
607
        // We keep the lock until the end of this function so that the (possibly) spawned task
        // doesn't end up accessing the name entry before it's inserted.
608
609
        let mut names = self.inner.registered_names.lock().await;

610
611
612
613
        match names.get(&well_known_name) {
            Some(NameStatus::Owner(_)) => return Ok(RequestNameReply::AlreadyOwner),
            Some(NameStatus::Queued(_)) => return Ok(RequestNameReply::InQueue),
            None => (),
614
615
        }

616
        if !self.is_bus() {
617
            names.insert(well_known_name.to_owned(), NameStatus::Owner(None));
618

619
            return Ok(RequestNameReply::PrimaryOwner);
620
621
        }

622
        let dbus_proxy = fdo::DBusProxy::builder(self)
623
624
625
            .cache_properties(CacheProperties::No)
            .build()
            .await?;
626
627
628
629
630
        let mut acquired_stream = dbus_proxy.receive_name_acquired().await?;
        let mut lost_stream = dbus_proxy.receive_name_lost().await?;
        let reply = dbus_proxy
            .request_name(well_known_name.clone(), flags)
            .await?;
631
        let lost_task_name = format!("monitor name {well_known_name} lost");
632
633
634
        let name_lost_fut = if flags.contains(RequestNameFlags::AllowReplacement) {
            let weak_conn = WeakConnection::from(self);
            let well_known_name = well_known_name.to_owned();
635
636
637
638
639
640
641
642
            Some(
                async move {
                    loop {
                        let signal = lost_stream.next().await;
                        let inner = match weak_conn.upgrade() {
                            Some(conn) => conn.inner.clone(),
                            None => break,
                        };
643

644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
                        match signal {
                            Some(signal) => match signal.args() {
                                Ok(args) if args.name == well_known_name => {
                                    tracing::info!(
                                        "Connection `{}` lost name `{}`",
                                        // SAFETY: This is bus connection so unique name can't be None.
                                        inner.unique_name.get().unwrap(),
                                        well_known_name
                                    );
                                    inner.registered_names.lock().await.remove(&well_known_name);

                                    break;
                                }
                                Ok(_) => (),
                                Err(e) => warn!("Failed to parse `NameLost` signal: {}", e),
                            },
                            None => {
                                trace!("`NameLost` signal stream closed");
                                // This is a very strange state we end up in. Now the name is question
                                // remains in the queue forever. Maybe we can do better here but I
                                // think it's a very unlikely scenario anyway.
                                //
                                // Can happen if the connection is lost/dropped but then the whole
                                // `Connection` instance will go away soon anyway and hence this
                                // strange state along with it.
669
670
671
672
673
                                break;
                            }
                        }
                    }
                }
674
                .instrument(info_span!("{}", lost_task_name)),
675
            )
Flausch's avatar
Flausch committed
676
        } else {
677
678
679
680
681
682
            None
        };
        let status = match reply {
            RequestNameReply::InQueue => {
                let weak_conn = WeakConnection::from(self);
                let well_known_name = well_known_name.to_owned();
683
                let task_name = format!("monitor name {well_known_name} acquired");
684
685
686
687
688
689
690
691
692
693
694
695
696
                let task = self.executor().spawn(
                    async move {
                        loop {
                            let signal = acquired_stream.next().await;
                            let inner = match weak_conn.upgrade() {
                                Some(conn) => conn.inner.clone(),
                                None => break,
                            };
                            match signal {
                                Some(signal) => match signal.args() {
                                    Ok(args) if args.name == well_known_name => {
                                        let mut names = inner.registered_names.lock().await;
                                        if let Some(status) = names.get_mut(&well_known_name) {
697
698
699
                                            let task = name_lost_fut.map(|fut| {
                                                inner.executor.spawn(fut, &lost_task_name)
                                            });
700
701
702
703
704
                                            *status = NameStatus::Owner(task);

                                            break;
                                        }
                                        // else the name was released in the meantime. :shrug:
705
                                    }
706
707
708
709
710
711
712
                                    Ok(_) => (),
                                    Err(e) => warn!("Failed to parse `NameAcquired` signal: {}", e),
                                },
                                None => {
                                    trace!("`NameAcquired` signal stream closed");
                                    // See comment above for similar state in case of `NameLost` stream.
                                    break;
713
714
715
716
                                }
                            }
                        }
                    }
717
718
                    .instrument(info_span!("{}", task_name)),
                    &task_name,
719
                );
720
721
722
723

                NameStatus::Queued(task)
            }
            RequestNameReply::PrimaryOwner | RequestNameReply::AlreadyOwner => {
724
                let task = name_lost_fut.map(|fut| self.executor().spawn(fut, &lost_task_name));
725
726
727
728
729
730
731
732
733

                NameStatus::Owner(task)
            }
            RequestNameReply::Exists => return Err(Error::NameTaken),
        };

        names.insert(well_known_name.to_owned(), status);

        Ok(reply)
734
735
736
737
738
739
740
741
    }

    /// Deregister a previously registered well-known name for this service on the bus.
    ///
    /// Use this method to deregister a well-known name, registered through
    /// [`Connection::request_name`].
    ///
    /// Unless an error is encountered, returns `Ok(true)` if name was previously registered with
Arnold Loubriat's avatar
Arnold Loubriat committed
742
    /// the bus through `self` and it has now been successfully deregistered, `Ok(false)` if name
743
744
745
746
747
748
749
750
751
    /// was not previously registered or already deregistered.
    pub async fn release_name<'w, W>(&self, well_known_name: W) -> Result<bool>
    where
        W: TryInto<WellKnownName<'w>>,
        W::Error: Into<Error>,
    {
        let well_known_name: WellKnownName<'w> = well_known_name.try_into().map_err(Into::into)?;
        let mut names = self.inner.registered_names.lock().await;
        // FIXME: Should be possible to avoid cloning/allocation here
752
        if names.remove(&well_known_name.to_owned()).is_none() {
753
            return Ok(false);
754
        };
755

756
757
758
759
        if !self.is_bus() {
            return Ok(true);
        }

760
761
762
        fdo::DBusProxy::builder(self)
            .cache_properties(CacheProperties::No)
            .build()
763
764
765
766
767
768
769
            .await?
            .release_name(well_known_name)
            .await
            .map(|_| true)
            .map_err(Into::into)
    }

770
771
772
773
    /// Checks if `self` is a connection to a message bus.
    ///
    /// This will return `false` for p2p connections.
    pub fn is_bus(&self) -> bool {
774
        self.inner.bus_conn
775
776
    }

777
778
    /// Assigns a serial number to `msg` that is unique to this connection.
    ///
Arnold Loubriat's avatar
Arnold Loubriat committed
779
    /// This method can fail if `msg` is corrupted.
780
    pub fn assign_serial_num(&self, msg: &mut Message) -> Result<u32> {
781
        let mut serial = 0;
782
        msg.modify_primary_header(|primary| {
783
            serial = *primary.serial_num_or_init(|| self.next_serial());
784
785
786
787
788
789
            Ok(())
        })?;

        Ok(serial)
    }

790
791
    /// The unique name of the connection, if set/applicable.
    ///
792
793
    /// The unique name is assigned by the message bus or set manually using
    /// [`Connection::set_unique_name`].
794
795
    pub fn unique_name(&self) -> Option<&OwnedUniqueName> {
        self.inner.unique_name.get()
796
797
    }

798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
    /// Assigns the unique name of this connection.
    ///
    /// This is mainly provided for bus implementations. All other users should not need to use
    /// this method.
    ///
    /// # Errors
    ///
    /// Returns an error if `unique_name` can not be converted into an [`OwnedUniqueName`].
    ///
    /// # Panics
    ///
    /// Panics if the unique name is already set. It will always panic if the connection is to a
    /// message bus, as it is the bus that assigns peers their unique names.
    pub fn set_unique_name<U>(&self, unique_name: U) -> Result<()>
    where
        U: TryInto<OwnedUniqueName>,
        U::Error: Into<Error>,
    {
        // Convert first, so an invalid name is reported as an error and never reaches the panic.
        let converted: OwnedUniqueName = unique_name.try_into().map_err(Into::into)?;

        let cell = &self.inner.unique_name;
        cell.set(converted).expect("unique name already set");

        Ok(())
    }

819
    /// The capacity of the main (unfiltered) queue.
820
    pub fn max_queued(&self) -> usize {
821
        self.inner.msg_receiver.capacity()
822
823
    }

824
    /// Set the capacity of the main (unfiltered) queue.
825
    pub fn set_max_queued(&mut self, max: usize) {
826
        self.inner.msg_receiver.clone().set_capacity(max);
827
828
829
830
    }

    /// The server's GUID.
    pub fn server_guid(&self) -> &str {
831
        self.inner.server_guid.as_str()
832
833
    }

834
835
    /// The underlying executor.
    ///
836
837
838
    /// When a connection is built with internal_executor set to false, zbus will not spawn a
    /// thread to run the executor. You're responsible to continuously [tick the executor][tte].
    /// Failure to do so will result in hangs.
839
    ///
840
841
    /// # Examples
    ///
842
    /// Here is how one would typically run the zbus executor through async-std's single-threaded
843
844
845
    /// scheduler:
    ///
    /// ```
846
847
848
    ///# // Disable on windows because somehow it triggers a stack overflow there:
    ///# // https://gitlab.freedesktop.org/zeenix/zbus/-/jobs/34023494
    ///# #[cfg(all(not(feature = "tokio"), not(target_os = "windows")))]
849
    ///# {
850
    /// use zbus::ConnectionBuilder;
851
    /// use async_std::task::{block_on, spawn};
852
    ///
853
    /// block_on(async {
854
855
856
857
858
859
    ///     let conn = ConnectionBuilder::session()
    ///         .unwrap()
    ///         .internal_executor(false)
    ///         .build()
    ///         .await
    ///         .unwrap();
860
861
    ///     {
    ///        let conn = conn.clone();
862
    ///        spawn(async move {
863
864
865
866
867
868
869
870
    ///            loop {
    ///                conn.executor().tick().await;
    ///            }
    ///        });
    ///     }
    ///
    ///     // All your other async code goes here.
    /// });
871
    ///# }
872
873
    /// ```
    ///
874
875
    /// **Note**: zbus 2.1 added support for tight integration with tokio. This means, if you use
    /// zbus with tokio, you do not need to worry about this at all. All you need to do is enable
876
    /// `tokio` feature. You should also disable the (default) `async-io` feature in your
877
878
    /// `Cargo.toml` to avoid unused dependencies. Also note that **prior** to zbus 3.0, disabling
    /// `async-io` was required to enable tight `tokio` integration.
879
    ///
880
881
    /// [tte]: https://docs.rs/async-executor/1.4.1/async_executor/struct.Executor.html#method.tick
    pub fn executor(&self) -> &Executor<'static> {
882
        &self.inner.executor
883
884
    }

885
886
887
    /// Get a reference to the associated [`ObjectServer`].
    ///
    /// The `ObjectServer` is created on-demand.
888
889
890
891
    ///
    /// **Note**: Once the `ObjectServer` is created, it will be replying to all method calls
    /// received on `self`. If you want to manually reply to method calls, do not use this
    /// method (or any of the `ObjectServer` related API).
892
    pub fn object_server(&self) -> impl Deref<Target = ObjectServer> + '_ {
893
        // FIXME: Maybe it makes sense after all to implement Deref<Target= ObjectServer> for
894
        // crate::ObjectServer instead of this wrapper?
895
896
        struct Wrapper<'a>(&'a blocking::ObjectServer);
        impl<'a> Deref for Wrapper<'a> {
897
            type Target = ObjectServer;
898
899
900
901
902
903

            fn deref(&self) -> &Self::Target {
                self.0.inner()
            }
        }

904
        Wrapper(self.sync_object_server(true, None))
905
906
    }

907
908
909
910
911
    /// Returns the blocking `ObjectServer` of this connection, creating it on first call.
    ///
    /// `start` and `started_event` are only used when the server has to be created; they are
    /// forwarded to `setup_object_server` in that case.
    pub(crate) fn sync_object_server(
        &self,
        start: bool,
        started_event: Option<Event>,
    ) -> &blocking::ObjectServer {
        let cell = &self.inner.object_server;

        // The initializer only runs if the cell is still empty.
        cell.get_or_init(move || self.setup_object_server(start, started_event))
    }

917
918
919
920
921
    fn setup_object_server(
        &self,
        start: bool,
        started_event: Option<Event>,
    ) -> blocking::ObjectServer {
922
        if start {
923
            self.start_object_server(started_event);
924
        }
925

926
        blocking::ObjectServer::new(self)
927
928
    }

929
    #[instrument(skip(self))]
930
    pub(crate) fn start_object_server(&self, started_event: Option<Event>) {
931
        self.inner.object_server_dispatch_task.get_or_init(|| {
932
            trace!("starting ObjectServer task");
933
934
            let weak_conn = WeakConnection::from(self);

935
            let obj_server_task_name = "ObjectServer task";
936
937
            self.inner.executor.spawn(
                async move {
938
939
940
941
942
943
944
                    let mut stream = match weak_conn.upgrade() {
                        Some(conn) => {
                            let mut builder = MatchRule::builder().msg_type(MessageType::MethodCall);
                            if let Some(unique_name) = conn.unique_name() {
                                builder = builder.destination(&**unique_name).expect("unique name");
                            }
                            let rule = builder.build();
945
                            match conn.add_match(rule.into(), None).await {
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
                                Ok(stream) => stream,
                                Err(e) => {
                                    // Very unlikely but can happen I guess if connection is closed.
                                    debug!("Failed to create message stream: {}", e);

                                    return;
                                }
                            }
                        }
                        None => {
                            trace!("Connection is gone, stopping associated object server task");

                            return;
                        }
                    };
                    if let Some(started_event) = started_event {
                        started_event.notify(1);
                    }

965
                    trace!("waiting for incoming method call messages..");
966
967
968
969
970
971
972
                    while let Some(msg) = stream.next().await.and_then(|m| {
                        if let Err(e) = &m {
                            debug!("Error while reading from object server stream: {:?}", e);
                        }
                        m.ok()
                    }) {
                        if let Some(conn) = weak_conn.upgrade() {
973
974
975
976
977
978
979
980
981
                            let hdr = match msg.header() {
                                Ok(hdr) => hdr,
                                Err(e) => {
                                    warn!("Failed to parse header: {}", e);

                                    continue;
                                }
                            };
                            match hdr.destination() {
982
983
                                // Unique name is already checked by the match rule.
                                Ok(Some(BusName::Unique(_))) => (),
984
985
                                Ok(Some(BusName::WellKnown(dest))) => {
                                    let names = conn.inner.registered_names.lock().await;
986
987
                                    // destination doesn't matter if no name has been registered
                                    // (probably means name it's registered through external means).
988
                                    if !names.is_empty() && !names.contains_key(dest) {
989
990
991
992
993
994
995
996
997
998
999
1000
                                        trace!("Got a method call for a different destination: {}", dest);

                                        continue;
                                    }
                                }
                                Ok(None) => {
                                    warn!("Got a method call with no destination: {}", msg);

                                    continue;
                                }
                                Err(e) => {
                                    warn!("Failed to parse destination: {}", e);
For faster browsing, not all history is shown. View entire blame