Skip to content

Commit

Permalink
feat(watcher): telegram alerting and automatically persist val node (#…
Browse files Browse the repository at this point in the history
…1127)

Description
---

* Add Telegram support for alerting on various events
* Add automatic restart of `tari_validator_node` if it crashes and
configuration to disable

How Has This Been Tested?
---
Create TG channel and populate `credentials` (bot token) and
`channel_id` in the config file. Go through previous steps of starting
`tari_swarm_daemon` and have the `tari_watcher` register the node.

Look at the watcher logs for the spawned process pid, or use `ps aux |
grep 'tari_validator_node'` and look for the process with the default VN
folder, and call `kill -9 $PID`. This induces a crash of the node and if
`auto_restart` is enabled, the `tari_watcher` will start a new validator
node. It will keep the state of registry since before, as long as the
`tari_watcher` does not exit, and send registration transaction as
usual.
  • Loading branch information
therealdannzor authored Aug 30, 2024
1 parent 23bbb44 commit d85bf65
Show file tree
Hide file tree
Showing 8 changed files with 367 additions and 169 deletions.
4 changes: 2 additions & 2 deletions applications/tari_watcher/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

### Quickstart

Initialize the project with `tari_watcher init` and start it with `tari_watcher run`. Edit the newly generated `config.toml` to enable notifications on channels such as Mattermost and Telegram.Make sure to have started up `tari_validator_node` once previously to have a node directory set up, default is `tari_validator_node -- -b data/vn1`.
Initialize the project with `tari_watcher init` and start it with `tari_watcher run`. Edit the newly generated `config.toml` to enable notifications on Mattermost and Telegram. Make sure to have started up `tari_validator_node` once previously to have a node directory set up, default is `tari_validator_node -- -b data/vn1`.

### Setup

Expand All @@ -29,7 +29,7 @@ The default values used (see `constants.rs`) when running the project without an
```
├── alerting.rs # channel notifier implementations
├── cli.rs # cli and flags passed during bootup
├── config.rs # main config file creation
├── config.rs # main config file creation
├── constants.rs # various constants used as default values
├── helpers.rs # common helper functions
├── logger.rs
Expand Down
94 changes: 75 additions & 19 deletions applications/tari_watcher/src/alerting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use reqwest::StatusCode;
use serde_json::json;

pub trait Alerting {
fn new(url: String, channel_id: String, credentials: String) -> Self;

// Sends an alert message to the service
async fn alert(&mut self, message: &str) -> Result<()>;

Expand All @@ -21,31 +19,31 @@ pub trait Alerting {

pub struct MatterMostNotifier {
// Mattermost server URL
server_url: String,
pub server_url: String,
// Mattermost channel ID used for alerts
channel_id: String,
pub channel_id: String,
// User token (retrieved after login)
credentials: String,
pub credentials: String,
// Alerts sent since last reset
alerts_sent: u64,
pub alerts_sent: u64,
// HTTP client
client: reqwest::Client,
pub client: reqwest::Client,
}

impl Alerting for MatterMostNotifier {
fn new(server_url: String, channel_id: String, credentials: String) -> Self {
Self {
server_url,
channel_id,
credentials,
alerts_sent: 0,
client: reqwest::Client::new(),
async fn alert(&mut self, message: &str) -> Result<()> {
if self.server_url.is_empty() {
bail!("Server URL field is empty");
}
if self.credentials.is_empty() {
bail!("Credentials field is empty");
}
if self.channel_id.is_empty() {
bail!("Channel ID is empty");
}
}

async fn alert(&mut self, message: &str) -> Result<()> {
const LOGIN_ENDPOINT: &str = "/api/v4/posts";
let url = format!("{}{}", self.server_url, LOGIN_ENDPOINT);
const POST_ENDPOINT: &str = "/api/v4/posts";
let url = format!("{}{}", self.server_url, POST_ENDPOINT);
let req = json!({
"channel_id": self.channel_id,
"message": message,
Expand Down Expand Up @@ -73,7 +71,7 @@ impl Alerting for MatterMostNotifier {
bail!("Server URL is empty");
}
if self.credentials.is_empty() {
bail!("Credentials are empty");
bail!("Credentials field is empty");
}

let url = format!("{}{}", self.server_url, PING_ENDPOINT);
Expand All @@ -95,3 +93,61 @@ impl Alerting for MatterMostNotifier {
Ok(self.alerts_sent)
}
}

pub struct TelegramNotifier {
// Telegram bot token
pub bot_token: String,
// Telegram chat ID (either in @channel or number id format)
pub chat_id: String,
// Alerts sent since last reset
pub alerts_sent: u64,
// HTTP client
pub client: reqwest::Client,
}

impl Alerting for TelegramNotifier {
async fn alert(&mut self, message: &str) -> Result<()> {
if self.bot_token.is_empty() {
bail!("Bot token is empty");
}
if self.chat_id.is_empty() {
bail!("Chat ID is empty");
}

let post_endpoint: &str = &format!("/bot{}/sendMessage", self.bot_token);
let url = format!("https://api.telegram.org{}", post_endpoint);
let req = json!({
"chat_id": self.chat_id,
"text": message,
});
let resp = self.client.post(&url).json(&req).send().await?;

if resp.status() != StatusCode::OK {
bail!("Failed to send alert, got response: {}", resp.status());
}

self.alerts_sent += 1;

Ok(())
}

async fn ping(&self) -> Result<()> {
if self.bot_token.is_empty() {
bail!("Bot token is empty");
}

let ping_endpoint: &str = &format!("/bot{}/getMe", self.bot_token);
let url = format!("https://api.telegram.org{}", ping_endpoint);
let resp = self.client.get(url.clone()).send().await?;

if resp.status() != StatusCode::OK {
bail!("Failed to ping, got response: {}", resp.status());
}

Ok(())
}

fn stats(&self) -> Result<u64> {
Ok(self.alerts_sent)
}
}
5 changes: 5 additions & 0 deletions applications/tari_watcher/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,16 @@ pub struct InitArgs {
#[clap(long)]
/// Disable initial and auto registration of the validator node
pub no_auto_register: bool,

#[clap(long)]
/// Disable auto restart of the validator node
pub no_auto_restart: bool,
}

impl InitArgs {
pub fn apply(&self, config: &mut Config) {
config.auto_register = !self.no_auto_register;
config.auto_restart = !self.no_auto_restart;
}
}

Expand Down
6 changes: 5 additions & 1 deletion applications/tari_watcher/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ pub struct Config {
/// the current registration expires
pub auto_register: bool,

/// Allow watcher to restart the validator node if it crashes and stops running
pub auto_restart: bool,

/// The Minotari node gRPC address
pub base_node_grpc_address: String,

Expand Down Expand Up @@ -158,6 +161,7 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result<Config> {

Ok(Config {
auto_register: true,
auto_restart: true,
base_node_grpc_address: DEFAULT_BASE_NODE_GRPC_ADDRESS.to_string(),
base_wallet_grpc_address: DEFAULT_BASE_WALLET_GRPC_ADDRESS.to_string(),
base_dir: base_dir.to_path_buf(),
Expand All @@ -177,7 +181,7 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result<Config> {
telegram: ChannelConfig {
name: "telegram".to_string(),
enabled: false,
server_url: "".to_string(),
server_url: "https://api.telegram.org".to_string(),
channel_id: "".to_string(),
credentials: "".to_string(),
},
Expand Down
10 changes: 7 additions & 3 deletions applications/tari_watcher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async fn main() -> anyhow::Result<()> {
async fn start(config: Config) -> anyhow::Result<()> {
let shutdown = Shutdown::new();
let signal = shutdown.to_signal().select(exit_signal()?);
let (task_handle, manager_handle) = spawn(config.clone(), shutdown.to_signal()).await;
let (task_handle, manager_handle) = spawn(config.clone(), shutdown.to_signal(), shutdown).await;

tokio::select! {
_ = signal => {
Expand All @@ -92,8 +92,12 @@ async fn start(config: Config) -> anyhow::Result<()> {
Ok(())
}

async fn spawn(config: Config, shutdown: ShutdownSignal) -> (task::JoinHandle<anyhow::Result<()>>, ManagerHandle) {
let (manager, manager_handle) = ProcessManager::new(config, shutdown);
async fn spawn(
config: Config,
shutdown: ShutdownSignal,
trigger: Shutdown,
) -> (task::JoinHandle<anyhow::Result<()>>, ManagerHandle) {
let (manager, manager_handle) = ProcessManager::new(config, shutdown, trigger);
let task_handle = tokio::spawn(manager.start());
(task_handle, manager_handle)
}
29 changes: 19 additions & 10 deletions applications/tari_watcher/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,46 +10,48 @@ use minotari_app_grpc::tari_rpc::{
GetActiveValidatorNodesResponse,
RegisterValidatorNodeResponse,
};
use tari_shutdown::ShutdownSignal;
use tari_shutdown::{Shutdown, ShutdownSignal};
use tokio::sync::{mpsc, oneshot};

use crate::{
config::{Channels, Config, ExecutableConfig},
constants::DEFAULT_VALIDATOR_NODE_BINARY_PATH,
minotari::{Minotari, TipStatus},
monitoring::{process_status_alert, process_status_log, ProcessStatus, Transaction},
process::Process,
process::start_validator,
};

pub struct ProcessManager {
pub base_dir: PathBuf,
pub validator_base_dir: PathBuf,
pub validator_config: ExecutableConfig,
pub wallet_config: ExecutableConfig,
pub process: Process,
pub shutdown_signal: ShutdownSignal,
pub shutdown_signal: ShutdownSignal, // listen for keyboard exit signal
pub trigger_signal: Shutdown, // triggered when validator auto-restart is disabled
pub rx_request: mpsc::Receiver<ManagerRequest>,
pub chain: Minotari,
pub alerting_config: Channels,
pub auto_restart: bool,
}

impl ProcessManager {
pub fn new(config: Config, shutdown_signal: ShutdownSignal) -> (Self, ManagerHandle) {
pub fn new(config: Config, shutdown_signal: ShutdownSignal, trigger_signal: Shutdown) -> (Self, ManagerHandle) {
let (tx_request, rx_request) = mpsc::channel(1);
let this = Self {
base_dir: config.base_dir.clone(),
validator_base_dir: config.vn_base_dir,
validator_config: config.executable_config[0].clone(),
wallet_config: config.executable_config[1].clone(),
process: Process::new(),
shutdown_signal,
trigger_signal,
rx_request,
chain: Minotari::new(
config.base_node_grpc_address,
config.base_wallet_grpc_address,
config.vn_registration_file,
),
alerting_config: config.channel_config,
auto_restart: config.auto_restart,
};
(this, ManagerHandle::new(tx_request))
}
Expand All @@ -68,10 +70,15 @@ impl ProcessManager {
let vn_base_dir = self.base_dir.join(self.validator_base_dir);

// get child channel to communicate with the validator node process
let cc = self
.process
.start_validator(vn_binary_path, vn_base_dir, self.base_dir, self.alerting_config)
.await;
let cc = start_validator(
vn_binary_path,
vn_base_dir,
self.base_dir,
self.alerting_config,
self.auto_restart,
self.trigger_signal.clone(),
)
.await;
if cc.is_none() {
todo!("Create new validator node process event listener for fetched existing PID from OS");
}
Expand All @@ -80,9 +87,11 @@ impl ProcessManager {
// spawn logging and alerting tasks to process status updates
tokio::spawn(async move {
process_status_log(cc.rx_log).await;
warn!("Logging task has exited");
});
tokio::spawn(async move {
process_status_alert(cc.rx_alert, cc.cfg_alert).await;
warn!("Alerting task has exited");
});

self.chain.bootstrap().await?;
Expand Down
Loading

0 comments on commit d85bf65

Please sign in to comment.