Skip to content

Commit

Permalink
Made tokio runtime dynamic based on configuration of daemon.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidv1992 committed Dec 13, 2024
1 parent ef33a8c commit 419832f
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 197 deletions.
5 changes: 2 additions & 3 deletions ntpd/bin/ntp-ctl.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![forbid(unsafe_code)]

#[tokio::main]
async fn main() -> std::io::Result<std::process::ExitCode> {
ntpd::ctl_main().await
fn main() -> std::io::Result<std::process::ExitCode> {
ntpd::ctl_main()
}
5 changes: 2 additions & 3 deletions ntpd/bin/ntp-daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

use std::process;

#[tokio::main]
async fn main() {
let result = ntpd::daemon_main().await;
fn main() {
let result = ntpd::daemon_main();
process::exit(if result.is_ok() { 0 } else { 1 });
}
5 changes: 2 additions & 3 deletions ntpd/bin/ntp-metrics-exporter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![forbid(unsafe_code)]

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
ntpd::metrics_exporter_main().await
fn main() -> Result<(), Box<dyn std::error::Error>> {
ntpd::metrics_exporter_main()
}
26 changes: 16 additions & 10 deletions ntpd/src/ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
daemon::{config::CliArg, tracing::LogLevel, Config, ObservableState},
force_sync,
};
use tokio::runtime::Builder;
use tracing_subscriber::util::SubscriberInitExt;

const USAGE_MSG: &str = "\
Expand Down Expand Up @@ -146,10 +147,10 @@ impl NtpCtlOptions {
}
}

async fn validate(config: Option<PathBuf>) -> std::io::Result<ExitCode> {
fn validate(config: Option<PathBuf>) -> std::io::Result<ExitCode> {
// Late completion not needed, so ignore result.
crate::daemon::tracing::tracing_init(LogLevel::Info, true).init();
match Config::from_args(config, vec![], vec![]).await {
match Config::from_args(config, vec![], vec![]) {
Ok(config) => {
if config.check() {
eprintln!("Config looks good");
Expand All @@ -167,7 +168,7 @@ async fn validate(config: Option<PathBuf>) -> std::io::Result<ExitCode> {

const VERSION: &str = env!("CARGO_PKG_VERSION");

pub async fn main() -> std::io::Result<ExitCode> {
pub fn main() -> std::io::Result<ExitCode> {
let options = match NtpCtlOptions::try_parse_from(std::env::args()) {
Ok(options) => options,
Err(msg) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)),
Expand All @@ -182,10 +183,10 @@ pub async fn main() -> std::io::Result<ExitCode> {
eprintln!("ntp-ctl {VERSION}");
Ok(ExitCode::SUCCESS)
}
NtpCtlAction::Validate => validate(options.config).await,
NtpCtlAction::ForceSync => force_sync::force_sync(options.config).await,
NtpCtlAction::Validate => validate(options.config),
NtpCtlAction::ForceSync => force_sync::force_sync(options.config),
NtpCtlAction::Status => {
let config = Config::from_args(options.config, vec![], vec![]).await;
let config = Config::from_args(options.config, vec![], vec![]);

if let Err(ref e) = config {
println!("Warning: Unable to load configuration file: {e}");
Expand All @@ -198,10 +199,15 @@ pub async fn main() -> std::io::Result<ExitCode> {
.observation_path
.unwrap_or_else(|| PathBuf::from("/var/run/ntpd-rs/observe"));

match options.format {
Format::Plain => print_state(Format::Plain, observation).await,
Format::Prometheus => print_state(Format::Prometheus, observation).await,
}
Builder::new_current_thread()
.enable_all()
.build()?
.block_on(async {
match options.format {
Format::Plain => print_state(Format::Plain, observation).await,
Format::Prometheus => print_state(Format::Prometheus, observation).await,
}
})
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions ntpd/src/daemon/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use ntp_proto::{AlgorithmConfig, SourceDefaultsConfig, SynchronizationConfig};
pub use ntp_source::*;
use serde::{Deserialize, Deserializer};
pub use server::*;
use std::io;
use std::{
fmt::Display,
io::ErrorKind,
Expand All @@ -18,7 +19,6 @@ use std::{
str::FromStr,
};
use timestamped_socket::interface::InterfaceName;
use tokio::{fs::read_to_string, io};
use tracing::{info, warn};

use super::{clock::NtpClockWrapper, tracing::LogLevel};
Expand Down Expand Up @@ -373,31 +373,31 @@ pub struct Config {
}

impl Config {
async fn from_file(file: impl AsRef<Path>) -> Result<Config, ConfigError> {
fn from_file(file: impl AsRef<Path>) -> Result<Config, ConfigError> {
let meta = std::fs::metadata(&file)?;
let perm = meta.permissions();

if perm.mode() as libc::mode_t & libc::S_IWOTH != 0 {
warn!("Unrestricted config file permissions: Others can write.");
}

let contents = read_to_string(file).await?;
let contents = std::fs::read_to_string(file)?;
Ok(toml::de::from_str(&contents)?)
}

async fn from_first_file(file: Option<impl AsRef<Path>>) -> Result<Config, ConfigError> {
fn from_first_file(file: Option<impl AsRef<Path>>) -> Result<Config, ConfigError> {
// if an explicit file is given, always use that one
if let Some(f) = file {
let path: &Path = f.as_ref();
info!(?path, "using config file");
return Config::from_file(f).await;
return Config::from_file(f);
}

// for the global file we also ignore it when there are permission errors
let global_path = Path::new("/etc/ntpd-rs/ntp.toml");
if global_path.exists() {
info!("using config file at default location `{:?}`", global_path);
match Config::from_file(global_path).await {
match Config::from_file(global_path) {
Err(ConfigError::Io(e)) if e.kind() == ErrorKind::PermissionDenied => {
warn!("permission denied on global config file! using default config ...");
}
Expand All @@ -410,12 +410,12 @@ impl Config {
Ok(Config::default())
}

pub async fn from_args(
pub fn from_args(
file: Option<impl AsRef<Path>>,
sources: Vec<NtpSourceConfig>,
servers: Vec<ServerConfig>,
) -> Result<Config, ConfigError> {
let mut config = Config::from_first_file(file.as_ref()).await?;
let mut config = Config::from_first_file(file.as_ref())?;

if !sources.is_empty() {
if !config.sources.is_empty() {
Expand Down
114 changes: 60 additions & 54 deletions ntpd/src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub use config::Config;
use ntp_proto::KalmanClockController;
pub use observer::ObservableState;
pub use system::spawn;
use tokio::runtime::Builder;
use tracing_subscriber::util::SubscriberInitExt;

use config::NtpDaemonOptions;
Expand All @@ -28,7 +29,7 @@ use self::tracing::LogLevel;

const VERSION: &str = env!("CARGO_PKG_VERSION");

pub async fn main() -> Result<(), Box<dyn Error>> {
pub fn main() -> Result<(), Box<dyn Error>> {
let options = NtpDaemonOptions::try_parse_from(std::env::args())?;

match options.action {
Expand All @@ -38,34 +39,31 @@ pub async fn main() -> Result<(), Box<dyn Error>> {
config::NtpDaemonAction::Version => {
eprintln!("ntp-daemon {VERSION}");
}
config::NtpDaemonAction::Run => run(options).await?,
config::NtpDaemonAction::Run => run(options)?,
}

Ok(())
}

// initializes the logger so that logs during config parsing are reported. Then it overrides the
// log level based on the config if required.
pub(crate) async fn initialize_logging_parse_config(
pub(crate) fn initialize_logging_parse_config(
initial_log_level: Option<LogLevel>,
config_path: Option<PathBuf>,
) -> Config {
let mut log_level = initial_log_level.unwrap_or_default();

let config_tracing = crate::daemon::tracing::tracing_init(log_level, true);
let config = ::tracing::subscriber::with_default(config_tracing, || {
async {
match Config::from_args(config_path, vec![], vec![]).await {
Ok(c) => c,
Err(e) => {
// print to stderr because tracing is not yet setup
eprintln!("There was an error loading the config: {e}");
std::process::exit(exitcode::CONFIG);
}
match Config::from_args(config_path, vec![], vec![]) {
Ok(c) => c,
Err(e) => {
// print to stderr because tracing is not yet setup
eprintln!("There was an error loading the config: {e}");
std::process::exit(exitcode::CONFIG);
}
}
})
.await;
});

if let Some(config_log_level) = config.observability.log_level {
if initial_log_level.is_none() {
Expand All @@ -80,51 +78,59 @@ pub(crate) async fn initialize_logging_parse_config(
config
}

async fn run(options: NtpDaemonOptions) -> Result<(), Box<dyn Error>> {
let config = initialize_logging_parse_config(options.log_level, options.config).await;
fn run(options: NtpDaemonOptions) -> Result<(), Box<dyn Error>> {
let config = initialize_logging_parse_config(options.log_level, options.config);

// give the user a warning that we use the command line option
if config.observability.log_level.is_some() && options.log_level.is_some() {
info!("Log level override from command line arguments is active");
}
let runtime = if config.servers.is_empty() && config.nts_ke.is_empty() {
Builder::new_current_thread().enable_all().build()?
} else {
Builder::new_multi_thread().enable_all().build()?
};

// Warn/error if the config is unreasonable. We do this after finishing
// tracing setup to ensure logging is fully configured.
config.check();

// we always generate the keyset (even if NTS is not used)
let keyset = nts_key_provider::spawn(config.keyset).await;

#[cfg(feature = "hardware-timestamping")]
let clock_config = config.clock;

#[cfg(not(feature = "hardware-timestamping"))]
let clock_config = config::ClockConfig::default();

::tracing::debug!("Configuration loaded, spawning daemon jobs");
let (main_loop_handle, channels) = spawn::<KalmanClockController<_, _>>(
config.synchronization.synchronization_base,
config.synchronization.algorithm,
config.source_defaults,
clock_config,
&config.sources,
&config.servers,
keyset.clone(),
)
.await?;

for nts_ke_config in config.nts_ke {
let _join_handle = keyexchange::spawn(nts_ke_config, keyset.clone());
}
runtime.block_on(async {
// give the user a warning that we use the command line option
if config.observability.log_level.is_some() && options.log_level.is_some() {
info!("Log level override from command line arguments is active");
}

observer::spawn(
&config.observability,
channels.source_snapshots,
channels.server_data_receiver,
channels.system_snapshot_receiver,
);
// Warn/error if the config is unreasonable. We do this after finishing
// tracing setup to ensure logging is fully configured.
config.check();

// we always generate the keyset (even if NTS is not used)
let keyset = nts_key_provider::spawn(config.keyset).await;

#[cfg(feature = "hardware-timestamping")]
let clock_config = config.clock;

#[cfg(not(feature = "hardware-timestamping"))]
let clock_config = config::ClockConfig::default();

::tracing::debug!("Configuration loaded, spawning daemon jobs");
let (main_loop_handle, channels) = spawn::<KalmanClockController<_, _>>(
config.synchronization.synchronization_base,
config.synchronization.algorithm,
config.source_defaults,
clock_config,
&config.sources,
&config.servers,
keyset.clone(),
)
.await?;

for nts_ke_config in config.nts_ke {
let _join_handle = keyexchange::spawn(nts_ke_config, keyset.clone());
}

observer::spawn(
&config.observability,
channels.source_snapshots,
channels.server_data_receiver,
channels.system_snapshot_receiver,
);

Ok(main_loop_handle.await??)
Ok(main_loop_handle.await??)
})
}

pub(crate) mod exitcode {
Expand Down
Loading

0 comments on commit 419832f

Please sign in to comment.