Skip to content

Commit

Permalink
Drop objects registry mutex (#965)
Browse files Browse the repository at this point in the history
* Drop the ObjectsRegistry mutex and use message passing.
* Use `async_trait` in the network Adapter trait, although it should not
be needed in version 1.75 (another PR would follow).
* Set the minimal version to 1.74.
* Improve ServiceError messages.
  • Loading branch information
imobachgs authored Dec 29, 2023
2 parents 51abf8d + f652948 commit e4ae7ed
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 109 deletions.
6 changes: 5 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ members = [
"agama-derive",
"agama-lib",
"agama-locale-data",
"agama-settings"
"agama-settings",
]
resolver = "2"

[workspace.package]
rust-version = "1.74"
edition = "2021"
5 changes: 3 additions & 2 deletions rust/agama-dbus-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
name = "agama-dbus-server"
version = "0.1.0"
edition = "2021"
rust-version.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0"
agama-locale-data = { path="../agama-locale-data" }
agama-lib = { path="../agama-lib" }
agama-locale-data = { path = "../agama-locale-data" }
agama-lib = { path = "../agama-lib" }
log = "0.4"
simplelog = "0.12.1"
systemd-journal-logger = "1.0"
Expand Down
6 changes: 6 additions & 0 deletions rust/agama-dbus-server/src/network/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@ pub enum Action {
),
/// Gets a connection
GetConnection(Uuid, Responder<Option<Connection>>),
/// Gets a connection
GetConnectionPath(String, Responder<Option<OwnedObjectPath>>),
/// Get connections paths
GetConnectionsPaths(Responder<Vec<OwnedObjectPath>>),
/// Gets a controller connection
GetController(
Uuid,
Responder<Result<ControllerConnection, NetworkStateError>>,
),
/// Get devices paths
GetDevicesPaths(Responder<Vec<OwnedObjectPath>>),
/// Sets a controller's ports. It uses the Uuid of the controller and the IDs or interface names
/// of the ports.
SetPorts(
Expand Down
6 changes: 4 additions & 2 deletions rust/agama-dbus-server/src/network/adapter.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::network::NetworkState;
use async_trait::async_trait;
use std::error::Error;

/// A trait for the ability to read/write from/to a network service
#[async_trait]
pub trait Adapter {
fn read(&self) -> Result<NetworkState, Box<dyn Error>>;
fn write(&self, network: &NetworkState) -> Result<(), Box<dyn Error>>;
async fn read(&self) -> Result<NetworkState, Box<dyn Error>>;
async fn write(&self, network: &NetworkState) -> Result<(), Box<dyn Error>>;
}
2 changes: 1 addition & 1 deletion rust/agama-dbus-server/src/network/dbus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ pub mod service;
mod tree;

pub use service::NetworkService;
pub(crate) use tree::{ObjectsRegistry, Tree};
pub(crate) use tree::Tree;
34 changes: 18 additions & 16 deletions rust/agama-dbus-server/src/network/dbus/interfaces/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,21 @@ use zbus::{
};

use super::common::ConnectionInterface;
use crate::network::{dbus::ObjectsRegistry, error::NetworkStateError, model::MacAddress, Action};
use crate::network::{error::NetworkStateError, model::MacAddress, Action};

/// D-Bus interface for the set of connections.
///
/// It offers an API to query the connections collection.
pub struct Connections {
actions: Arc<Mutex<UnboundedSender<Action>>>,
objects: Arc<Mutex<ObjectsRegistry>>,
}

impl Connections {
/// Creates a Connections interface object.
///
/// * `objects`: Objects paths registry.
pub fn new(objects: Arc<Mutex<ObjectsRegistry>>, actions: UnboundedSender<Action>) -> Self {
pub fn new(actions: UnboundedSender<Action>) -> Self {
Self {
objects,
actions: Arc::new(Mutex::new(actions)),
}
}
Expand All @@ -34,13 +32,12 @@ impl Connections {
#[dbus_interface(name = "org.opensuse.Agama1.Network.Connections")]
impl Connections {
/// Returns the D-Bus paths of the network connections.
pub async fn get_connections(&self) -> Vec<ObjectPath> {
let objects = self.objects.lock().await;
objects
.connections_paths()
.iter()
.filter_map(|c| ObjectPath::try_from(c.clone()).ok())
.collect()
pub async fn get_connections(&self) -> zbus::fdo::Result<Vec<OwnedObjectPath>> {
let actions = self.actions.lock().await;
let (tx, rx) = oneshot::channel();
actions.send(Action::GetConnectionsPaths(tx)).unwrap();
let result = rx.await.unwrap();
Ok(result)
}

/// Adds a new network connection.
Expand All @@ -67,11 +64,16 @@ impl Connections {
///
/// * `id`: connection ID.
pub async fn get_connection(&self, id: &str) -> zbus::fdo::Result<OwnedObjectPath> {
let objects = self.objects.lock().await;
match objects.connection_path(id) {
Some(path) => Ok(path.into()),
None => Err(NetworkStateError::UnknownConnection(id.to_string()).into()),
}
let actions = self.actions.lock().await;
let (tx, rx) = oneshot::channel();
actions
.send(Action::GetConnectionPath(id.to_string(), tx))
.unwrap();
let path = rx
.await
.unwrap()
.ok_or(NetworkStateError::UnknownConnection(id.to_string()))?;
Ok(path)
}

/// Removes a network connection.
Expand Down
27 changes: 14 additions & 13 deletions rust/agama-dbus-server/src/network/dbus/interfaces/devices.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,35 @@
use crate::network::{dbus::ObjectsRegistry, model::Device as NetworkDevice};
use crate::network::{model::Device as NetworkDevice, Action};
use std::sync::Arc;
use tokio::sync::Mutex;
use zbus::{dbus_interface, zvariant::ObjectPath};
use tokio::sync::{mpsc::UnboundedSender, oneshot, Mutex};
use zbus::{dbus_interface, zvariant::OwnedObjectPath};

/// D-Bus interface for the network devices collection
///
/// It offers an API to query the devices collection.
pub struct Devices {
objects: Arc<Mutex<ObjectsRegistry>>,
actions: Arc<Mutex<UnboundedSender<Action>>>,
}

impl Devices {
/// Creates a Devices interface object.
///
/// * `objects`: Objects paths registry.
pub fn new(objects: Arc<Mutex<ObjectsRegistry>>) -> Self {
Self { objects }
pub fn new(actions: UnboundedSender<Action>) -> Self {
Self {
actions: Arc::new(Mutex::new(actions)),
}
}
}

#[dbus_interface(name = "org.opensuse.Agama1.Network.Devices")]
impl Devices {
/// Returns the D-Bus paths of the network devices.
pub async fn get_devices(&self) -> Vec<ObjectPath> {
let objects = self.objects.lock().await;
objects
.devices_paths()
.iter()
.filter_map(|c| ObjectPath::try_from(c.clone()).ok())
.collect()
pub async fn get_devices(&self) -> zbus::fdo::Result<Vec<OwnedObjectPath>> {
let actions = self.actions.lock().await;
let (tx, rx) = oneshot::channel();
actions.send(Action::GetDevicesPaths(tx)).unwrap();
let result = rx.await.unwrap();
Ok(result)
}
}

Expand Down
78 changes: 43 additions & 35 deletions rust/agama-dbus-server/src/network/dbus/tree.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use agama_lib::error::ServiceError;
use tokio::sync::Mutex;
use zbus::zvariant::{ObjectPath, OwnedObjectPath};

use crate::network::{action::Action, dbus::interfaces, model::*};
use log;
use std::{collections::HashMap, sync::Arc};
use std::collections::HashMap;
use tokio::sync::mpsc::UnboundedSender;

const CONNECTIONS_PATH: &str = "/org/opensuse/Agama1/Network/connections";
Expand All @@ -14,7 +13,7 @@ const DEVICES_PATH: &str = "/org/opensuse/Agama1/Network/devices";
pub struct Tree {
connection: zbus::Connection,
actions: UnboundedSender<Action>,
objects: Arc<Mutex<ObjectsRegistry>>,
objects: ObjectsRegistry,
}

impl Tree {
Expand All @@ -37,7 +36,7 @@ impl Tree {
///
/// * `connections`: list of connections.
pub async fn set_connections(
&self,
&mut self,
connections: &mut [Connection],
) -> Result<(), ServiceError> {
self.remove_connections().await?;
Expand All @@ -63,15 +62,11 @@ impl Tree {
let path = ObjectPath::try_from(path.as_str()).unwrap();
self.add_interface(&path, interfaces::Device::new(dev.clone()))
.await?;
let mut objects = self.objects.lock().await;
objects.register_device(&dev.name, path);
self.objects.register_device(&dev.name, path);
}

self.add_interface(
DEVICES_PATH,
interfaces::Devices::new(Arc::clone(&self.objects)),
)
.await?;
self.add_interface(DEVICES_PATH, interfaces::Devices::new(self.actions.clone()))
.await?;

Ok(())
}
Expand All @@ -81,17 +76,16 @@ impl Tree {
/// * `conn`: connection to add.
/// * `notify`: whether to notify the added connection
pub async fn add_connection(
&self,
&mut self,
conn: &mut Connection,
) -> Result<OwnedObjectPath, ServiceError> {
let mut objects = self.objects.lock().await;

let uuid = conn.uuid;
let (id, path) = objects.register_connection(conn);
let (id, path) = self.objects.register_connection(conn);
if id != conn.id {
conn.id = id.clone();
}
log::info!("Publishing network connection '{}'", id);
let path: OwnedObjectPath = path.into();
log::info!("Publishing network connection '{}' on '{}'", id, &path);

self.add_interface(
&path,
Expand All @@ -115,59 +109,73 @@ impl Tree {
.await?;
}

Ok(path.into())
Ok(path)
}

/// Removes a connection from the tree
///
/// * `id`: connection ID.
pub async fn remove_connection(&mut self, id: &str) -> Result<(), ServiceError> {
let mut objects = self.objects.lock().await;
let Some(path) = objects.connection_path(id) else {
let Some(path) = self.objects.connection_path(id) else {
return Ok(());
};
self.remove_connection_on(path.as_str()).await?;
objects.deregister_connection(id).unwrap();
self.objects.deregister_connection(id).unwrap();
Ok(())
}

/// Returns all devices paths.
pub fn devices_paths(&self) -> Vec<OwnedObjectPath> {
self.objects.devices_paths()
}

/// Returns all connection paths.
pub fn connections_paths(&self) -> Vec<OwnedObjectPath> {
self.objects.connections_paths()
}

pub fn connection_path(&self, id: &str) -> Option<OwnedObjectPath> {
self.objects.connection_path(id).map(|o| o.into())
}

/// Adds connections to the D-Bus tree.
///
/// * `connections`: list of connections.
async fn add_connections(&self, connections: &mut [Connection]) -> Result<(), ServiceError> {
async fn add_connections(
&mut self,
connections: &mut [Connection],
) -> Result<(), ServiceError> {
for conn in connections.iter_mut() {
self.add_connection(conn).await?;
}

self.add_interface(
CONNECTIONS_PATH,
interfaces::Connections::new(Arc::clone(&self.objects), self.actions.clone()),
interfaces::Connections::new(self.actions.clone()),
)
.await?;

Ok(())
}

/// Clears all the connections from the tree.
async fn remove_connections(&self) -> Result<(), ServiceError> {
let mut objects = self.objects.lock().await;
for path in objects.connections.values() {
async fn remove_connections(&mut self) -> Result<(), ServiceError> {
for path in self.objects.connections.values() {
self.remove_connection_on(path.as_str()).await?;
}
objects.connections.clear();
self.objects.connections.clear();
Ok(())
}

/// Clears all the devices from the tree.
async fn remove_devices(&mut self) -> Result<(), ServiceError> {
let object_server = self.connection.object_server();
let mut objects = self.objects.lock().await;
for path in objects.devices.values() {
for path in self.objects.devices.values() {
object_server
.remove::<interfaces::Device, _>(path.as_str())
.await?;
}
objects.devices.clear();
self.objects.devices.clear();
Ok(())
}

Expand All @@ -185,7 +193,7 @@ impl Tree {
Ok(())
}

async fn add_interface<T>(&self, path: &str, iface: T) -> Result<bool, ServiceError>
async fn add_interface<T>(&mut self, path: &str, iface: T) -> Result<bool, ServiceError>
where
T: zbus::Interface,
{
Expand All @@ -198,7 +206,7 @@ impl Tree {
///
/// Connections are indexed by its Id, which is expected to be unique.
#[derive(Debug, Default)]
pub struct ObjectsRegistry {
struct ObjectsRegistry {
/// device_name (eth0) -> object_path
devices: HashMap<String, OwnedObjectPath>,
/// id -> object_path
Expand Down Expand Up @@ -246,13 +254,13 @@ impl ObjectsRegistry {
}

/// Returns all devices paths.
pub fn devices_paths(&self) -> Vec<String> {
self.devices.values().map(|p| p.to_string()).collect()
pub fn devices_paths(&self) -> Vec<OwnedObjectPath> {
self.devices.values().cloned().collect()
}

/// Returns all connection paths.
pub fn connections_paths(&self) -> Vec<String> {
self.connections.values().map(|p| p.to_string()).collect()
pub fn connections_paths(&self) -> Vec<OwnedObjectPath> {
self.connections.values().cloned().collect()
}

/// Proposes a connection ID.
Expand Down
Loading

0 comments on commit e4ae7ed

Please sign in to comment.