Commit 946bbdbb authored by Zeeshan Ali Khan's avatar Zeeshan Ali Khan
Browse files

zb: Give names to tokio tasks

Conditionally use the experimental API to give names to tasks so
tokio-console can show our tasks by name and users/us will have a much
better luck reading the console output.

`tokio_unstable` cargo flag will need to be specified though but we
leave that to the users.
parent 29ba36d5
......@@ -38,6 +38,7 @@ impl<'a> Executor<'a> {
pub fn spawn<T: Send + 'static>(
&self,
future: impl Future<Output = T> + Send + 'static,
#[allow(unused)] name: &str,
) -> Task<T> {
#[cfg(not(feature = "tokio"))]
{
......@@ -46,7 +47,20 @@ impl<'a> Executor<'a> {
#[cfg(feature = "tokio")]
{
Task(Some(tokio::task::spawn(future)))
#[cfg(tokio_unstable)]
{
Task(Some(
tokio::task::Builder::new()
.name(name)
.spawn(future)
// SAFETY: Looking at the code, this call always returns an `Ok`.
.unwrap(),
))
}
#[cfg(not(tokio_unstable))]
{
Task(Some(tokio::task::spawn(future)))
}
}
}
......
......@@ -97,7 +97,7 @@ impl MessageReceiverTask {
}
fn spawn(self: Arc<Self>, executor: &Executor<'_>) -> Task<()> {
executor.spawn(self.receive_msg())
executor.spawn(self.receive_msg(), "socket reader")
}
// Keep receiving messages and put them on the queue.
......@@ -643,10 +643,10 @@ impl Connection {
let reply = dbus_proxy
.request_name(well_known_name.clone(), flags)
.await?;
let lost_task_name = format!("monitor name {well_known_name} lost");
let name_lost_fut = if flags.contains(RequestNameFlags::AllowReplacement) {
let weak_conn = WeakConnection::from(self);
let well_known_name = well_known_name.to_owned();
let span = info_span!("monitor {name} lost", name = %well_known_name);
Some(
async move {
loop {
......@@ -686,7 +686,7 @@ impl Connection {
}
}
}
.instrument(span),
.instrument(info_span!("{}", lost_task_name)),
)
} else {
None
......@@ -695,7 +695,7 @@ impl Connection {
RequestNameReply::InQueue => {
let weak_conn = WeakConnection::from(self);
let well_known_name = well_known_name.to_owned();
let span = info_span!("monitor name {name} acquired", name = %well_known_name);
let task_name = format!("monitor name {well_known_name} acquired");
let task = self.executor().spawn(
async move {
loop {
......@@ -709,8 +709,9 @@ impl Connection {
Ok(args) if args.name == well_known_name => {
let mut names = inner.registered_names.lock().await;
if let Some(status) = names.get_mut(&well_known_name) {
let task =
name_lost_fut.map(|fut| inner.executor.spawn(fut));
let task = name_lost_fut.map(|fut| {
inner.executor.spawn(fut, &lost_task_name)
});
*status = NameStatus::Owner(task);
break;
......@@ -728,13 +729,14 @@ impl Connection {
}
}
}
.instrument(span),
.instrument(info_span!("{}", task_name)),
&task_name,
);
NameStatus::Queued(task)
}
RequestNameReply::PrimaryOwner | RequestNameReply::AlreadyOwner => {
let task = name_lost_fut.map(|fut| self.executor().spawn(fut));
let task = name_lost_fut.map(|fut| self.executor().spawn(fut, &lost_task_name));
NameStatus::Owner(task)
}
......@@ -938,6 +940,7 @@ impl Connection {
ready(msg.as_ref().map(|m| m.message_type() == MessageType::MethodCall).unwrap_or_default())
});
let obj_server_task_name = "ObjectServer task";
self.inner.executor.spawn(
async move {
trace!("waiting for incoming method call messages..");
......@@ -1011,7 +1014,8 @@ impl Connection {
);
}
}
.instrument(trace_span!("{}", task_name))
.instrument(trace_span!("{}", task_name)),
&task_name,
)
.detach();
} else {
......@@ -1021,7 +1025,8 @@ impl Connection {
}
}
}
.instrument(info_span!("ObjectServer task")),
.instrument(info_span!("{}", obj_server_task_name)),
obj_server_task_name,
)
});
}
......@@ -1077,9 +1082,10 @@ impl Connection {
pub(crate) fn queue_remove_match(&self, rule: OwnedMatchRule) {
let conn = self.clone();
let remove_match = async move { conn.remove_match(rule).await }
.instrument(trace_span!("Remove match `{rule}`"));
self.inner.executor.spawn(remove_match).detach()
let task_name = format!("Remove match `{}`", rule.to_string());
let remove_match =
async move { conn.remove_match(rule).await }.instrument(trace_span!("{}", task_name));
self.inner.executor.spawn(remove_match, &task_name).detach()
}
async fn hello_bus(&self) -> Result<()> {
......@@ -1351,12 +1357,15 @@ mod tests {
server2: Connection,
client2: Connection,
) -> Result<()> {
let _forward_task = client1.executor().spawn(async move {
futures_util::try_join!(
MessageStream::from(&server1).forward(&client2),
MessageStream::from(&client2).forward(&server1),
)
});
let _forward_task = client1.executor().spawn(
async move {
futures_util::try_join!(
MessageStream::from(&server1).forward(&client2),
MessageStream::from(&client2).forward(&server1),
)
},
"forward_task",
);
let server_future = async {
let mut stream = MessageStream::from(&server2);
......
......@@ -269,8 +269,7 @@ impl PropertiesCache {
});
let cache_clone = cache.clone();
let interface_str = interface.as_str();
let span = info_span!("{} proxy caching", interface_str);
let task_name = format!("{} proxy caching", interface);
let proxy_caching = async move {
let (proxy, interface, uncached_properties) = match cache_clone
.init(proxy, interface, uncached_properties)
......@@ -297,8 +296,8 @@ impl PropertiesCache {
debug!("Error keeping properties cache updated: {e}");
}
}
.instrument(span);
let task = executor.spawn(proxy_caching);
.instrument(info_span!("{}", task_name));
let task = executor.spawn(proxy_caching, &task_name);
(cache, task)
}
......@@ -1362,7 +1361,7 @@ mod tests {
let handle = {
let tx = tx.clone();
let conn = server_conn.clone();
server_conn.executor().spawn(async move {
let server_fut = async move {
use std::time::Duration;
#[cfg(not(feature = "tokio"))]
......@@ -1391,7 +1390,8 @@ mod tests {
#[cfg(feature = "tokio")]
sleep(Duration::from_millis(5)).await;
}
})
};
server_conn.executor().spawn(server_fut, "server_task")
};
let signal_fut = async {
......
......@@ -675,7 +675,7 @@ async fn iface_and_proxy_(p2p: bool) {
let listen = event.listen();
let child = client_conn
.executor()
.spawn(my_iface_test(client_conn.clone(), event));
.spawn(my_iface_test(client_conn.clone(), event), "client_task");
debug!("Child task spawned.");
// Wait for the listener to be ready
listen.await;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment