Skip to content

Commit

Permalink
Merge pull request #1063 from skalenetwork/statsd
Browse files Browse the repository at this point in the history
Introduce statsd
  • Loading branch information
DmytroNazarenko authored May 17, 2024
2 parents 11048bb + 5bacac8 commit d430d8e
Show file tree
Hide file tree
Showing 25 changed files with 372 additions and 160 deletions.
4 changes: 2 additions & 2 deletions admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from core.schains.process_manager import run_process_manager
from core.schains.cleaner import run_cleaner
from core.updates import soft_updates
from core.filebeat import update_filebeat_service
from core.monitoring import update_monitoring_services

from tools.configs import BACKUP_RUN, INIT_LOCK_PATH, PULL_CONFIG_FOR_SCHAIN
from tools.configs.web3 import (
Expand Down Expand Up @@ -82,7 +82,7 @@ def worker():
skale_ima = SkaleIma(ENDPOINT, MAINNET_IMA_ABI_FILEPATH, wallet)
if BACKUP_RUN:
logger.info('Running sChains in snapshot download mode')
update_filebeat_service(node_config.ip, node_config.id, skale)
update_monitoring_services(node_config.ip, node_config.id, skale)
monitor(skale, skale_ima, node_config)


Expand Down
2 changes: 1 addition & 1 deletion app.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
FLASK_DEBUG_MODE
)
from tools.configs.web3 import ENDPOINT
from tools.db import get_database, REDIS_URI
from tools.docker_utils import DockerUtils
from tools.helper import wait_until_admin_inited
from tools.logger import init_api_logger
from tools.resources import get_database, REDIS_URI
from tools.str_formatters import arguments_list_string

from web.routes.node import node_bp
Expand Down
53 changes: 0 additions & 53 deletions core/filebeat.py

This file was deleted.

122 changes: 122 additions & 0 deletions core/monitoring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# -*- coding: utf-8 -*-
#
# This file is part of SKALE Admin
#
# Copyright (C) 2020 SKALE Labs
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import logging
from typing import Optional

from tools.helper import process_template
from tools.docker_utils import DockerUtils

from tools.configs import SKALE_DIR_HOST
from tools.configs.monitoring import (
FILEBEAT_TEMPLATE_PATH, FILEBEAT_CONTAINER_NAME,
FILEBEAT_CONFIG_PATH,
INFLUX_TOKEN, INFLUX_URL,
TELEGRAF,
TELEGRAF_CONTAINER_NAME, TELEGRAF_IMAGE,
TELEGRAF_TEMPLATE_PATH,
TELEGRAF_CONFIG_PATH,
TELEGRAF_MEM_LIMIT
)

logger = logging.getLogger(__name__)


class TelegrafNotConfiguredError(Exception):
pass


def update_filebeat_service(node_ip, node_id, skale, dutils: Optional[DockerUtils] = None):
dutils = dutils or DockerUtils()
contract_address = skale.manager.address
template_data = {
'ip': node_ip,
'id': node_id,
'contract_address': contract_address
}

logger.info('Configuring filebeat %s', template_data)
process_template(FILEBEAT_TEMPLATE_PATH, FILEBEAT_CONFIG_PATH, template_data)
filebeat_container = dutils.client.containers.get(FILEBEAT_CONTAINER_NAME)
filebeat_container.restart()
logger.info('Filebeat config updated, telegraf restarted')


def filebeat_config_processed() -> bool:
with open(FILEBEAT_CONFIG_PATH) as f:
return 'id: ' in f.read()


def ensure_telegraf_running(dutils: Optional[DockerUtils] = None) -> None:
if dutils.is_container_exists(TELEGRAF_CONTAINER_NAME):
dutils.restart(TELEGRAF_CONTAINER_NAME)
else:
dutils.run_container(
image_name=TELEGRAF_IMAGE,
name=TELEGRAF_CONTAINER_NAME,
network_mode='host',
user='telegraf:998',
restart_policy={'name': 'on-failure'},
environment={'HOST_PROC': '/host/proc'},
volumes={
'/proc': {'bind': '/host/proc', 'mode': 'ro'},
f'{SKALE_DIR_HOST}/config/telegraf.conf': {'bind': '/etc/telegraf/telegraf.conf', 'mode': 'ro'}, # noqa
f'{SKALE_DIR_HOST}/node_data/telegraf': {'bind': '/var/lib/telegraf', 'mode': 'rw'},
'/var/run/skale/': {'bind': '/var/run/skale', 'mode': 'rw'}
},
mem_limit=TELEGRAF_MEM_LIMIT
)


def update_telegraf_service(
node_ip: str,
node_id: int,
token: str = INFLUX_TOKEN,
url: str = INFLUX_URL,
dutils: Optional[DockerUtils] = None
) -> None:
dutils = dutils or DockerUtils()
template_data = {
'ip': node_ip,
'node_id': str(node_id),
'token': token,
'url': url
}
missing = list(filter(lambda k: not template_data[k], template_data))

if missing:
emsg = f'TELEGRAF=True is set, but missing options {template_data}'
raise TelegrafNotConfiguredError(emsg)

logger.info('Configuring telegraf %s', template_data)
process_template(TELEGRAF_TEMPLATE_PATH, TELEGRAF_CONFIG_PATH, template_data)

ensure_telegraf_running(dutils)
logger.info('Telegraf config updated, telegraf restarted')


def telegraf_config_processed() -> bool:
with open(TELEGRAF_CONFIG_PATH) as f:
return 'id: ' in f.read()


def update_monitoring_services(node_ip, node_id, skale, dutils: Optional[DockerUtils] = None):
update_filebeat_service(node_ip, node_id, skale)
if TELEGRAF:
update_telegraf_service(node_ip, node_id)
4 changes: 2 additions & 2 deletions core/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from skale.utils.helper import ip_from_bytes
from skale.utils.web3_utils import public_key_to_address, to_checksum_address

from core.filebeat import update_filebeat_service
from core.monitoring import update_monitoring_services
from tools.configs import WATCHDOG_PORT, CHANGE_IP_DELAY, CHECK_REPORT_PATH, META_FILEPATH
from tools.helper import read_json
from tools.str_formatters import arguments_list_string
Expand Down Expand Up @@ -135,7 +135,7 @@ def register(self, ip, public_ip, port, name, domain_name,
self.config.name = name
self.config.ip = ip

update_filebeat_service(public_ip, self.config.id, self.skale)
update_monitoring_services(public_ip, self.config.id, self.skale)
return self._ok(data=self.config.all())

def create_node_on_contracts(self, ip, public_ip, port, name, domain_name,
Expand Down
16 changes: 15 additions & 1 deletion core/schains/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from core.node import ExtendedManagerNodeInfo, get_current_ips

import statsd

from core.node import ExtendedManagerNodeInfo, get_current_ips
from core.schains.config.directory import get_schain_check_filepath
from core.schains.config.file_manager import ConfigFileManager
from core.schains.config.helper import (
Expand Down Expand Up @@ -58,6 +60,7 @@
from tools.configs.containers import IMA_CONTAINER, SCHAIN_CONTAINER
from tools.docker_utils import DockerUtils
from tools.helper import write_json
from tools.resources import get_statsd_client
from tools.str_formatters import arguments_list_string

from web.models.schain import SChainRecord
Expand Down Expand Up @@ -111,6 +114,7 @@ def get_name(self) -> str:
def get_all(self,
log: bool = True,
save: bool = False,
expose: bool = False,
needed: Optional[List[str]] = None) -> Dict:
if needed:
names = needed
Expand All @@ -122,6 +126,8 @@ def get_all(self,
if hasattr(self, name):
logger.debug('Running check %s', name)
checks_status[name] = getattr(self, name).status
if expose:
send_to_statsd(self.statsd_client, self.get_name(), checks_status)
if log:
log_checks_dict(self.get_name(), checks_status)
if save:
Expand Down Expand Up @@ -165,6 +171,7 @@ def __init__(self,
self.cfm: ConfigFileManager = ConfigFileManager(
schain_name=schain_name
)
self.statsd_client = get_statsd_client()

def get_name(self) -> str:
return self.name
Expand Down Expand Up @@ -255,6 +262,7 @@ def __init__(
self.cfm: ConfigFileManager = ConfigFileManager(
schain_name=schain_name
)
self.statsd_client = get_statsd_client()

def get_name(self) -> str:
return self.name
Expand Down Expand Up @@ -512,3 +520,9 @@ def log_checks_dict(schain_name, checks_dict):
'Failed sChain checks', 'error'
)
)


def send_to_statsd(statsd_client: statsd.StatsClient, schain_name: str, checks_dict: dict) -> None:
for check, result in checks_dict.items():
mname = f'admin.checks.{schain_name}.{check}'
statsd_client.gauge(mname, int(result))
13 changes: 12 additions & 1 deletion core/schains/dkg/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from core.schains.dkg.broadcast_filter import Filter
from core.schains.dkg.structures import ComplaintReason, DKGStep
from tools.configs import NODE_DATA_PATH, SGX_CERTIFICATES_FOLDER
from tools.resources import get_statsd_client
from tools.sgx_utils import sgx_unreachable_retry

sys.path.insert(0, NODE_DATA_PATH)
Expand Down Expand Up @@ -171,9 +172,19 @@ def __init__(
self.complaint_error_event_hash = self.skale.web3.to_hex(self.skale.web3.keccak(
text="ComplaintError(string)"
))
self.last_completed_step = step # last step
self.statsd_client = get_statsd_client()
self._last_completed_step = step # last step
logger.info(f'sChain: {self.schain_name}. DKG timeout is {self.dkg_timeout}')

@property
def last_completed_step(self) -> DKGStep:
return self._last_completed_step

@last_completed_step.setter
def last_completed_step(self, value: DKGStep):
self.statsd_client.gauge(f'admin.dkg.last_completed_step.{self.schain_name}', value)
self._last_completed_step = value

def is_channel_opened(self):
return self.skale.dkg.is_channel_opened(self.group_index)

Expand Down
Loading

0 comments on commit d430d8e

Please sign in to comment.