diff --git a/core/schains/checks.py b/core/schains/checks.py index 73f12313b..8f4b0d9ee 100644 --- a/core/schains/checks.py +++ b/core/schains/checks.py @@ -32,11 +32,11 @@ get_base_port_from_config, get_node_ips_from_config, get_own_ip_from_config, - get_local_schain_http_endpoint_from_config + get_local_schain_http_endpoint_from_config, ) from core.schains.config.main import ( get_skaled_config_rotations_ids, - get_upstream_config_rotation_ids + get_upstream_config_rotation_ids, ) from core.schains.dkg.utils import get_secret_key_share_filepath from core.schains.firewall.types import IRuleController @@ -45,21 +45,21 @@ from core.schains.rpc import ( check_endpoint_alive, check_endpoint_blocks, - get_endpoint_alive_check_timeout + get_endpoint_alive_check_timeout, ) from core.schains.external_config import ExternalConfig, ExternalState from core.schains.runner import ( get_container_name, get_ima_container_time_frame, get_image_name, - is_new_image_pulled + is_new_image_pulled, ) from core.schains.skaled_exit_codes import SkaledExitCodes from core.schains.volume import is_volume_exists from tools.configs.containers import IMA_CONTAINER, SCHAIN_CONTAINER from tools.docker_utils import DockerUtils -from tools.helper import write_json +from tools.helper import no_hyphens, write_json from tools.resources import get_statsd_client from tools.str_formatters import arguments_list_string @@ -79,7 +79,7 @@ 'rpc', 'blocks', 'process', - 'ima_container' + 'ima_container', ] TG_ALLOWED_CHECKS = [ @@ -90,7 +90,7 @@ 'rpc', 'blocks', 'process', - 'ima_container' + 'ima_container', ] @@ -111,11 +111,13 @@ class IChecks(ABC): def get_name(self) -> str: pass - def get_all(self, - log: bool = True, - save: bool = False, - expose: bool = False, - needed: Optional[List[str]] = None) -> Dict: + def get_all( + self, + log: bool = True, + save: bool = False, + expose: bool = False, + needed: Optional[List[str]] = None, + ) -> Dict: if needed: names = needed else: @@ -140,25 +142,27 @@ def 
is_healthy(self) -> bool: @classmethod def get_check_names(cls): - return list(filter( - lambda c: not c.startswith('_') and isinstance( - getattr(cls, c), property), - dir(cls) - )) + return list( + filter( + lambda c: not c.startswith('_') and isinstance(getattr(cls, c), property), dir(cls) + ) + ) class ConfigChecks(IChecks): - def __init__(self, - schain_name: str, - node_id: int, - schain_record: SChainRecord, - rotation_id: int, - stream_version: str, - current_nodes: list[ExtendedManagerNodeInfo], - estate: ExternalState, - sync_node: bool = False, - econfig: Optional[ExternalConfig] = None - ) -> None: + def __init__( + self, + schain_name: str, + node_id: int, + schain_record: SChainRecord, + rotation_id: int, + stream_version: str, + current_nodes: list[ExtendedManagerNodeInfo], + estate: ExternalState, + last_dkg_successful: bool, + sync_node: bool = False, + econfig: Optional[ExternalConfig] = None, + ) -> None: self.name = schain_name self.node_id = node_id self.schain_record = schain_record @@ -166,11 +170,10 @@ def __init__(self, self.stream_version = stream_version self.current_nodes = current_nodes self.estate = estate + self._last_dkg_successful = last_dkg_successful self.sync_node = sync_node self.econfig = econfig or ExternalConfig(schain_name) - self.cfm: ConfigFileManager = ConfigFileManager( - schain_name=schain_name - ) + self.cfm: ConfigFileManager = ConfigFileManager(schain_name=schain_name) self.statsd_client = get_statsd_client() def get_name(self) -> str: @@ -182,13 +185,15 @@ def config_dir(self) -> CheckRes: dir_path = self.cfm.dirname return CheckRes(os.path.isdir(dir_path)) + @property + def last_dkg_successful(self) -> CheckRes: + """Checks that last dkg was successfully completed""" + return CheckRes(self._last_dkg_successful) + @property def dkg(self) -> CheckRes: """Checks that DKG procedure is completed""" - secret_key_share_filepath = get_secret_key_share_filepath( - self.name, - self.rotation_id - ) + secret_key_share_filepath
= get_secret_key_share_filepath(self.name, self.rotation_id) return CheckRes(os.path.isfile(secret_key_share_filepath)) @property @@ -227,17 +232,14 @@ def upstream_config(self) -> CheckRes: exists, node_ips_updated, stream_updated, - triggered + triggered, ) return CheckRes(exists and node_ips_updated and stream_updated and not triggered) @property def external_state(self) -> CheckRes: actual_state = self.econfig.get() - logger.debug( - 'Checking external config. Current %s. Saved %s', - self.estate, actual_state - ) + logger.debug('Checking external config. Current %s. Saved %s', self.estate, actual_state) return CheckRes(self.econfig.synced(self.estate)) @@ -250,7 +252,7 @@ def __init__( *, econfig: Optional[ExternalConfig] = None, dutils: Optional[DockerUtils] = None, - sync_node: bool = False + sync_node: bool = False, ): self.name = schain_name self.schain_record = schain_record @@ -259,9 +261,7 @@ def __init__( self.econfig = econfig or ExternalConfig(name=schain_name) self.sync_node = sync_node self.rc = rule_controller - self.cfm: ConfigFileManager = ConfigFileManager( - schain_name=schain_name - ) + self.cfm: ConfigFileManager = ConfigFileManager(schain_name=schain_name) self.statsd_client = get_statsd_client() def get_name(self) -> str: @@ -278,9 +278,7 @@ def rotation_id_updated(self) -> CheckRes: upstream_rotations = get_upstream_config_rotation_ids(self.cfm) config_rotations = get_skaled_config_rotations_ids(self.cfm) logger.debug( - 'Comparing rotation_ids. Upstream: %s. Config: %s', - upstream_rotations, - config_rotations + 'Comparing rotation_ids. Upstream: %s. 
Config: %s', upstream_rotations, config_rotations ) return CheckRes(upstream_rotations == config_rotations) @@ -292,19 +290,14 @@ def config_updated(self) -> CheckRes: @property def config(self) -> CheckRes: - """ Checks that sChain config file exists """ + """Checks that sChain config file exists""" return CheckRes(self.cfm.skaled_config_exists()) @property def volume(self) -> CheckRes: """Checks that sChain volume exists""" - return CheckRes( - is_volume_exists( - self.name, - sync_node=self.sync_node, - dutils=self.dutils) - ) + return CheckRes(is_volume_exists(self.name, sync_node=self.sync_node, dutils=self.dutils)) @property def firewall_rules(self) -> CheckRes: @@ -316,10 +309,7 @@ def firewall_rules(self) -> CheckRes: own_ip = get_own_ip_from_config(conf) ranges = self.econfig.ranges self.rc.configure( - base_port=base_port, - own_ip=own_ip, - node_ips=node_ips, - sync_ip_ranges=ranges + base_port=base_port, own_ip=own_ip, node_ips=node_ips, sync_ip_ranges=ranges ) logger.debug(f'Rule controller {self.rc.expected_rules()}') return CheckRes(self.rc.is_rules_synced()) @@ -364,19 +354,19 @@ def ima_container(self) -> CheckRes: updated_time_frame = time_frame == container_time_frame logger.debug( 'IMA image %s, container image %s, time frame %d, container_time_frame %d', - expected_image, image, time_frame, container_time_frame + expected_image, + image, + time_frame, + container_time_frame, ) data = { 'container_running': container_running, 'updated_image': updated_image, 'new_image_pulled': new_image_pulled, - 'updated_time_frame': updated_time_frame + 'updated_time_frame': updated_time_frame, } - logger.debug( - '%s, IMA check - %s', - self.name, data - ) + logger.debug('%s, IMA check - %s', self.name, data) result: bool = all(data.values()) return CheckRes(result, data=data) @@ -387,9 +377,7 @@ def rpc(self) -> CheckRes: if self.config: config = self.cfm.skaled_config http_endpoint = get_local_schain_http_endpoint_from_config(config) - timeout = 
get_endpoint_alive_check_timeout( - self.schain_record.failed_rpc_count - ) + timeout = get_endpoint_alive_check_timeout(self.schain_record.failed_rpc_count) res = check_endpoint_alive(http_endpoint, timeout=timeout) return CheckRes(res) @@ -426,11 +414,12 @@ def __init__( stream_version: str, estate: ExternalState, current_nodes: list[ExtendedManagerNodeInfo], + last_dkg_successful: bool, rotation_id: int = 0, *, econfig: Optional[ExternalConfig] = None, dutils: DockerUtils = None, - sync_node: bool = False + sync_node: bool = False, ): self._subjects = [ ConfigChecks( @@ -440,9 +429,10 @@ def __init__( rotation_id=rotation_id, stream_version=stream_version, current_nodes=current_nodes, + last_dkg_successful=last_dkg_successful, estate=estate, econfig=econfig, - sync_node=sync_node + sync_node=sync_node, ), SkaledChecks( schain_name=schain_name, @@ -450,8 +440,8 @@ def __init__( rule_controller=rule_controller, econfig=econfig, dutils=dutils, - sync_node=sync_node - ) + sync_node=sync_node, + ), ] def __getattr__(self, attr: str) -> Any: @@ -469,11 +459,7 @@ def get_all(self, log: bool = True, save: bool = False, needed: Optional[List[st plain_checks = {} for subj in self._subjects: logger.debug('Running checks for %s', subj) - subj_checks = subj.get_all( - log=False, - save=False, - needed=needed - ) + subj_checks = subj.get_all(log=False, save=False, needed=needed) plain_checks.update(subj_checks) if not self.estate or not self.estate.ima_linked: if 'ima_container' in plain_checks: @@ -492,13 +478,9 @@ def get_api_checks_status(status: Dict, allowed: List = API_ALLOWED_CHECKS) -> D def save_checks_dict(schain_name, checks_dict): schain_check_path = get_schain_check_filepath(schain_name) - logger.info( - f'Saving checks for the chain {schain_name}: {schain_check_path}') + logger.info(f'Saving checks for the chain {schain_name}: {schain_check_path}') try: - write_json(schain_check_path, { - 'time': time.time(), - 'checks': checks_dict - }) + 
write_json(schain_check_path, {'time': time.time(), 'checks': checks_dict}) except Exception: logger.exception(f'Failed to save checks: {schain_check_path}') @@ -510,19 +492,17 @@ def log_checks_dict(schain_name, checks_dict): if not checks_dict[check]: failed_checks.append(check) if len(failed_checks) != 0: - failed_checks_str = ", ".join(failed_checks) + failed_checks_str = ', '.join(failed_checks) logger.info( arguments_list_string( - { - 'sChain name': schain_name, - 'Failed checks': failed_checks_str - }, - 'Failed sChain checks', 'error' + {'sChain name': schain_name, 'Failed checks': failed_checks_str}, + 'Failed sChain checks', + 'error', ) ) def send_to_statsd(statsd_client: statsd.StatsClient, schain_name: str, checks_dict: dict) -> None: for check, result in checks_dict.items(): - mname = f'admin.checks.{schain_name}.{check}' + mname = f'admin.schain_checks.{check}.{no_hyphens(schain_name)}' statsd_client.gauge(mname, int(result)) diff --git a/core/schains/cleaner.py b/core/schains/cleaner.py index 387f1861d..985089db6 100644 --- a/core/schains/cleaner.py +++ b/core/schains/cleaner.py @@ -224,6 +224,7 @@ def cleanup_schain( schain_name, sync_agent_ranges, rotation_id, + last_dkg_successful, current_nodes, estate, dutils=None @@ -245,6 +246,7 @@ def cleanup_schain( current_nodes=current_nodes, rotation_id=rotation_id, estate=estate, + last_dkg_successful=last_dkg_successful, dutils=dutils, sync_node=SYNC_NODE ) diff --git a/core/schains/dkg/client.py b/core/schains/dkg/client.py index 28c001592..bf8c0db3a 100644 --- a/core/schains/dkg/client.py +++ b/core/schains/dkg/client.py @@ -29,6 +29,7 @@ from core.schains.dkg.broadcast_filter import Filter from core.schains.dkg.structures import ComplaintReason, DKGStep +from tools.helper import no_hyphens from tools.configs import NODE_DATA_PATH, SGX_CERTIFICATES_FOLDER from tools.resources import get_statsd_client from tools.sgx_utils import sgx_unreachable_retry @@ -78,7 +79,7 @@ def 
convert_g2_point_to_hex(data): data_hexed = '' for coord in data: temp = hex(int(coord))[2:] - while (len(temp) < 64): + while len(temp) < 64: temp = '0' + temp data_hexed += temp return data_hexed @@ -88,7 +89,7 @@ def convert_hex_to_g2_array(data): g2_array = [] while len(data) > 0: cur = data[:256] - g2_array.append([str(x) for x in [int(cur[64 * i:64 * i + 64], 16) for i in range(4)]]) + g2_array.append([str(x) for x in [int(cur[64 * i: 64 * i + 64], 16) for i in range(4)]]) data = data[256:] return g2_array @@ -103,7 +104,7 @@ def convert_str_to_key_share(sent_secret_key_contribution, n): def convert_key_share_to_str(data, n): - return "".join(to_verify(s) for s in [data[i * 192:(i + 1) * 192] for i in range(n)]) + return ''.join(to_verify(s) for s in [data[i * 192: (i + 1) * 192] for i in range(n)]) def to_verify(share): @@ -112,24 +113,24 @@ def to_verify(share): def generate_poly_name(group_index_str, node_id, dkg_id): return ( - "POLY:SCHAIN_ID:" - f"{group_index_str}" - ":NODE_ID:" - f"{str(node_id)}" - ":DKG_ID:" - f"{str(dkg_id)}" - ) + 'POLY:SCHAIN_ID:' + f'{group_index_str}' + ':NODE_ID:' + f'{str(node_id)}' + ':DKG_ID:' + f'{str(dkg_id)}' + ) def generate_bls_key_name(group_index_str, node_id, dkg_id): return ( - "BLS_KEY:SCHAIN_ID:" - f"{group_index_str}" - ":NODE_ID:" - f"{str(node_id)}" - ":DKG_ID:" - f"{str(dkg_id)}" - ) + 'BLS_KEY:SCHAIN_ID:' + f'{group_index_str}' + ':NODE_ID:' + f'{str(node_id)}' + ':DKG_ID:' + f'{str(dkg_id)}' + ) class DKGClient: @@ -146,10 +147,11 @@ def __init__( node_ids_contract, eth_key_name, rotation_id, - step: DKGStep = DKGStep.NONE + step: DKGStep = DKGStep.NONE, ): - self.sgx = SgxClient(os.environ['SGX_SERVER_URL'], n=n, t=t, - path_to_cert=SGX_CERTIFICATES_FOLDER) + self.sgx = SgxClient( + os.environ['SGX_SERVER_URL'], n=n, t=t, path_to_cert=SGX_CERTIFICATES_FOLDER + ) self.schain_name = schain_name self.group_index = skale.schains.name_to_group_id(schain_name) self.node_id_contract = node_id_contract @@ -169,9 
+171,9 @@ def __init__( self.node_ids_contract = node_ids_contract self.dkg_contract_functions = self.skale.dkg.contract.functions self.dkg_timeout = self.skale.constants_holder.get_dkg_timeout() - self.complaint_error_event_hash = self.skale.web3.to_hex(self.skale.web3.keccak( - text="ComplaintError(string)" - )) + self.complaint_error_event_hash = self.skale.web3.to_hex( + self.skale.web3.keccak(text='ComplaintError(string)') + ) self.statsd_client = get_statsd_client() self._last_completed_step = step # last step logger.info(f'sChain: {self.schain_name}. DKG timeout is {self.dkg_timeout}') @@ -181,9 +183,11 @@ def last_completed_step(self) -> DKGStep: return self._last_completed_step @last_completed_step.setter - def last_completed_step(self, value: DKGStep): - self.statsd_client.gauge(f'admin.dkg.last_completed_step.{self.schain_name}', value) - self._last_completed_step = value + def last_completed_step(self, step: DKGStep): + self.statsd_client.gauge( + f'admin.schains.dkg.last_completed_step.{no_hyphens(self.schain_name)}', step.value + ) + self._last_completed_step = step def is_channel_opened(self): return self.skale.dkg.is_channel_opened(self.group_index) @@ -214,9 +218,9 @@ def verification_vector(self): @sgx_unreachable_retry def secret_key_contribution(self): - self.sent_secret_key_contribution = self.sgx.get_secret_key_contribution_v2(self.poly_name, - self.public_keys - ) + self.sent_secret_key_contribution = self.sgx.get_secret_key_contribution_v2( + self.poly_name, self.public_keys + ) self.incoming_secret_key_contribution[self.node_id_dkg] = self.sent_secret_key_contribution[ self.node_id_dkg * 192: (self.node_id_dkg + 1) * 192 ] @@ -233,12 +237,14 @@ def broadcast(self): ) is_broadcast_possible = self.skale.dkg.contract.functions.isBroadcastPossible( - self.group_index, self.node_id_contract).call({'from': self.skale.wallet.address}) + self.group_index, self.node_id_contract + ).call({'from': self.skale.wallet.address}) channel_opened = 
self.is_channel_opened() if not is_broadcast_possible or not channel_opened: - logger.info(f'sChain: {self.schain_name}. ' - f'{self.node_id_dkg} node could not sent broadcast') + logger.info( + f'sChain: {self.schain_name}. ' f'{self.node_id_dkg} node could not sent broadcast' + ) return verification_vector = self.verification_vector() @@ -249,7 +255,7 @@ def broadcast(self): self.node_id_contract, verification_vector, secret_key_contribution, - self.rotation_id + self.rotation_id, ) self.last_completed_step = DKGStep.BROADCAST logger.info('Everything is sent from %d node', self.node_id_dkg) @@ -262,28 +268,31 @@ def receive_from_node(self, from_node, broadcasted_data): try: if not self.verification(from_node): raise DkgVerificationError( - f"sChain: {self.schain_name}. " - f"Fatal error : user {str(from_node + 1)} " + f'sChain: {self.schain_name}. ' + f'Fatal error : user {str(from_node + 1)} ' f"hasn't passed verification by user {str(self.node_id_dkg + 1)}" ) - logger.info(f'sChain: {self.schain_name}. ' - f'All data from {from_node} was received and verified') + logger.info( + f'sChain: {self.schain_name}. ' + f'All data from {from_node} was received and verified' + ) except SgxUnreachableError as e: raise SgxUnreachableError( - f"sChain: {self.schain_name}. " - f"Fatal error : user {str(from_node + 1)} " - f"hasn't passed verification by user {str(self.node_id_dkg + 1)}" - f"with SgxUnreachableError: ", e - ) + f'sChain: {self.schain_name}. 
' + f'Fatal error : user {str(from_node + 1)} ' + f"hasn't passed verification by user {str(self.node_id_dkg + 1)}" + f'with SgxUnreachableError: ', + e, + ) @sgx_unreachable_retry def verification(self, from_node): - return self.sgx.verify_secret_share_v2(self.incoming_verification_vector[from_node], - self.eth_key_name, - to_verify( - self.incoming_secret_key_contribution[from_node] - ), - self.node_id_dkg) + return self.sgx.verify_secret_share_v2( + self.incoming_verification_vector[from_node], + self.eth_key_name, + to_verify(self.incoming_secret_key_contribution[from_node]), + self.node_id_dkg, + ) @sgx_unreachable_retry def is_bls_key_generated(self): @@ -298,17 +307,20 @@ def is_bls_key_generated(self): @sgx_unreachable_retry def generate_bls_key(self): - received_secret_key_contribution = "".join(to_verify( - self.incoming_secret_key_contribution[j] - ) - for j in range(self.sgx.n)) - logger.info(f'sChain: {self.schain_name}. ' - f'DKGClient is going to create BLS private key with name {self.bls_name}') - bls_private_key = self.sgx.create_bls_private_key_v2(self.poly_name, self.bls_name, - self.eth_key_name, - received_secret_key_contribution) - logger.info(f'sChain: {self.schain_name}. ' - 'DKGClient is going to fetch BLS public key with name {self.bls_name}') + received_secret_key_contribution = ''.join( + to_verify(self.incoming_secret_key_contribution[j]) for j in range(self.sgx.n) + ) + logger.info( + f'sChain: {self.schain_name}. ' + f'DKGClient is going to create BLS private key with name {self.bls_name}' + ) + bls_private_key = self.sgx.create_bls_private_key_v2( + self.poly_name, self.bls_name, self.eth_key_name, received_secret_key_contribution + ) + logger.info( + f'sChain: {self.schain_name}. 
' + 'DKGClient is going to fetch BLS public key with name {self.bls_name}' + ) self.public_key = self.sgx.get_bls_public_key(self.bls_name) return bls_private_key @@ -326,72 +338,70 @@ def get_bls_public_keys(self): def alright(self): logger.info(f'sChain {self.schain_name} sending alright transaction') is_alright_possible = self.skale.dkg.is_alright_possible( - self.group_index, self.node_id_contract, self.skale.wallet.address) + self.group_index, self.node_id_contract, self.skale.wallet.address + ) if not is_alright_possible or not self.is_channel_opened(): - logger.info(f'sChain: {self.schain_name}. ' - f'{self.node_id_dkg} node could not sent an alright note') + logger.info( + f'sChain: {self.schain_name}. ' + f'{self.node_id_dkg} node could not sent an alright note' + ) return self.skale.dkg.alright( - self.group_index, - self.node_id_contract, - gas_limit=ALRIGHT_GAS_LIMIT, - multiplier=2 + self.group_index, self.node_id_contract, gas_limit=ALRIGHT_GAS_LIMIT, multiplier=2 ) self.last_completed_step = DKGStep.ALRIGHT logger.info(f'sChain: {self.schain_name}. {self.node_id_dkg} node sent an alright note') def send_complaint(self, to_node: int, reason: ComplaintReason): - logger.info(f'sChain: {self.schain_name}. ' - f'{self.node_id_dkg} node is trying to sent a {reason} on {to_node} node') + logger.info( + f'sChain: {self.schain_name}. 
' + f'{self.node_id_dkg} node is trying to sent a {reason} on {to_node} node' + ) is_complaint_possible = self.skale.dkg.is_complaint_possible( - self.group_index, self.node_id_contract, self.node_ids_dkg[to_node], - self.skale.wallet.address + self.group_index, + self.node_id_contract, + self.node_ids_dkg[to_node], + self.skale.wallet.address, ) is_channel_opened = self.is_channel_opened() logger.info( - 'Complaint possible %s, channel opened %s', - is_complaint_possible, - is_channel_opened + 'Complaint possible %s, channel opened %s', is_complaint_possible, is_channel_opened ) if not is_complaint_possible or not is_channel_opened: - logger.info( - '%d node could not sent a complaint on %d node', - self.node_id_dkg, - to_node - ) + logger.info('%d node could not sent a complaint on %d node', self.node_id_dkg, to_node) return False reason_to_step = { ComplaintReason.NO_BROADCAST: DKGStep.COMPLAINT_NO_BROADCAST, ComplaintReason.BAD_DATA: DKGStep.COMPLAINT_BAD_DATA, ComplaintReason.NO_ALRIGHT: DKGStep.COMPLAINT_NO_ALRIGHT, - ComplaintReason.NO_RESPONSE: DKGStep.COMPLAINT_NO_RESPONSE + ComplaintReason.NO_RESPONSE: DKGStep.COMPLAINT_NO_RESPONSE, } try: if reason == ComplaintReason.BAD_DATA: tx_res = self.skale.dkg.complaint_bad_data( - self.group_index, - self.node_id_contract, - self.node_ids_dkg[to_node] + self.group_index, self.node_id_contract, self.node_ids_dkg[to_node] ) else: tx_res = self.skale.dkg.complaint( - self.group_index, - self.node_id_contract, - self.node_ids_dkg[to_node] + self.group_index, self.node_id_contract, self.node_ids_dkg[to_node] ) if self.check_complaint_logs(tx_res.receipt['logs'][0]): - logger.info(f'sChain: {self.schain_name}. ' - f'{self.node_id_dkg} node sent a complaint on {to_node} node') + logger.info( + f'sChain: {self.schain_name}. ' + f'{self.node_id_dkg} node sent a complaint on {to_node} node' + ) self.last_completed_step = reason_to_step[reason] return True else: - logger.info(f'sChain: {self.schain_name}. 
Complaint from {self.node_id_dkg} on ' - f'{to_node} node was rejected') + logger.info( + f'sChain: {self.schain_name}. Complaint from {self.node_id_dkg} on ' + f'{to_node} node was rejected' + ) return False except TransactionFailedError as e: logger.error(f'DKG complaint failed: sChain {self.schain_name}') @@ -400,8 +410,7 @@ def send_complaint(self, to_node: int, reason: ComplaintReason): @sgx_unreachable_retry def get_complaint_response(self, to_node_index): response = self.sgx.complaint_response( - self.poly_name, - self.node_ids_contract[to_node_index] + self.poly_name, self.node_ids_contract[to_node_index] ) share, dh_key = response.share, response.dh_key verification_vector_mult = response.verification_vector_mult @@ -413,11 +422,13 @@ def get_complaint_response(self, to_node_index): def response(self, to_node_index): is_pre_response_possible = self.skale.dkg.is_pre_response_possible( - self.group_index, self.node_id_contract, self.skale.wallet.address) + self.group_index, self.node_id_contract, self.skale.wallet.address + ) if not is_pre_response_possible or not self.is_channel_opened(): - logger.info(f'sChain: {self.schain_name}. ' - f'{self.node_id_dkg} node could not sent a response') + logger.info( + f'sChain: {self.schain_name}. 
' f'{self.node_id_dkg} node could not sent a response' + ) return share, dh_key, verification_vector_mult = self.get_complaint_response(to_node_index) @@ -428,24 +439,22 @@ def response(self, to_node_index): self.node_id_contract, convert_g2_points_to_array(self.incoming_verification_vector[self.node_id_dkg]), convert_g2_points_to_array(verification_vector_mult), - convert_str_to_key_share(self.sent_secret_key_contribution, self.n) + convert_str_to_key_share(self.sent_secret_key_contribution, self.n), ) self.last_completed_step = DKGStep.PRE_RESPONSE is_response_possible = self.skale.dkg.is_response_possible( - self.group_index, self.node_id_contract, self.skale.wallet.address) + self.group_index, self.node_id_contract, self.skale.wallet.address + ) if not is_response_possible or not self.is_channel_opened(): - logger.info(f'sChain: {self.schain_name}. ' - f'{self.node_id_dkg} node could not sent a response') + logger.info( + f'sChain: {self.schain_name}. ' + f'{self.node_id_dkg} node could not sent a response' + ) return - self.skale.dkg.response( - self.group_index, - self.node_id_contract, - int(dh_key, 16), - share - ) + self.skale.dkg.response(self.group_index, self.node_id_contract, int(dh_key, 16), share) self.last_completed_step = DKGStep.RESPONSE logger.info(f'sChain: {self.schain_name}. {self.node_id_dkg} node sent a response') except TransactionFailedError as e: @@ -461,8 +470,7 @@ def fetch_all_broadcasted_data(self): broadcasted_data = [event.verificationVector, event.secretKeyContribution] self.store_broadcasted_data(broadcasted_data, from_node) logger.info( - f'sChain: {self.schain_name}. Received by {self.node_id_dkg} from ' - f'{from_node}' + f'sChain: {self.schain_name}. 
Received by {self.node_id_dkg} from ' f'{from_node}' ) def is_all_data_received(self, from_node): diff --git a/core/schains/firewall/rule_controller.py b/core/schains/firewall/rule_controller.py index 2fda28168..51e8920a8 100644 --- a/core/schains/firewall/rule_controller.py +++ b/core/schains/firewall/rule_controller.py @@ -87,7 +87,7 @@ def get_missing(self) -> Dict['str', Any]: return missing def is_configured(self) -> bool: - return all((self.base_port, self.own_ip, self.node_ips)) + return all((self.base_port, self.node_ips)) def configure( self, diff --git a/core/schains/monitor/action.py b/core/schains/monitor/action.py index 88871e467..fb09d98c5 100644 --- a/core/schains/monitor/action.py +++ b/core/schains/monitor/action.py @@ -78,6 +78,7 @@ from tools.configs import SYNC_NODE from tools.configs.containers import IMA_CONTAINER, SCHAIN_CONTAINER from tools.docker_utils import DockerUtils +from tools.helper import no_hyphens from tools.node_options import NodeOptions from tools.notifications.messages import notify_repair_mode from tools.resources import get_statsd_client @@ -178,7 +179,7 @@ def config_dir(self) -> bool: @BaseActionManager.monitor_block def dkg(self) -> bool: initial_status = self.checks.dkg.status - with self.statsd_client.timer(f'admin.dkg.{self.name}'): + with self.statsd_client.timer(f'admin.action.dkg.{no_hyphens(self.name)}'): if not initial_status: logger.info('Initing dkg client') dkg_client = get_dkg_client( @@ -212,7 +213,7 @@ def dkg(self) -> bool: @BaseActionManager.monitor_block def upstream_config(self) -> bool: - with self.statsd_client.timer(f'admin.upstream_config.{self.name}'): + with self.statsd_client.timer(f'admin.action.upstream_config.{no_hyphens(self.name)}'): logger.info( 'Creating new upstream_config rotation_id: %s, stream: %s', self.rotation_data.get('rotation_id'), self.stream_version @@ -348,7 +349,7 @@ def firewall_rules(self, upstream: bool = False) -> bool: ranges = self.econfig.ranges logger.info('Adding 
ranges %s', ranges) - with self.statsd_client.timer(f'admin.firewall.{self.name}'): + with self.statsd_client.timer(f'admin.action.firewall.{no_hyphens(self.name)}'): self.rc.configure( base_port=base_port, own_ip=own_ip, @@ -356,7 +357,7 @@ def firewall_rules(self, upstream: bool = False) -> bool: sync_ip_ranges=ranges ) self.statsd_client.gauge( - f'admin.expected_rules.{self.name}', + f'admin.action.expected_rules.{no_hyphens(self.name)}', len(self.rc.expected_rules()) ) self.rc.sync() diff --git a/core/schains/monitor/config_monitor.py b/core/schains/monitor/config_monitor.py index 1bd54841d..47587a1bc 100644 --- a/core/schains/monitor/config_monitor.py +++ b/core/schains/monitor/config_monitor.py @@ -29,11 +29,7 @@ class BaseConfigMonitor(IMonitor): - def __init__( - self, - action_manager: ConfigActionManager, - checks: ConfigChecks - ) -> None: + def __init__(self, action_manager: ConfigActionManager, checks: ConfigChecks) -> None: self.am = action_manager self.checks = checks @@ -73,7 +69,7 @@ def execute(self) -> None: self.am.config_dir() if not self.checks.external_state: self.am.external_state() - if not self.checks.upstream_config: + if self.checks.last_dkg_successful and not self.checks.upstream_config: self.am.upstream_config() - self.am.update_reload_ts(self.checks.skaled_node_ips, sync_node=True) + self.am.update_reload_ts(self.checks.skaled_node_ips, sync_node=True) self.am.reset_config_record() diff --git a/core/schains/monitor/main.py b/core/schains/monitor/main.py index 27405872c..58010d347 100644 --- a/core/schains/monitor/main.py +++ b/core/schains/monitor/main.py @@ -31,20 +31,11 @@ from core.node import get_skale_node_version from core.node_config import NodeConfig -from core.schains.checks import ( - ConfigChecks, - get_api_checks_status, - TG_ALLOWED_CHECKS, - SkaledChecks -) +from core.schains.checks import ConfigChecks, get_api_checks_status, TG_ALLOWED_CHECKS, SkaledChecks from core.schains.config.file_manager import ConfigFileManager 
from core.schains.firewall import get_default_rule_controller from core.schains.firewall.utils import get_sync_agent_ranges -from core.schains.monitor import ( - get_skaled_monitor, - RegularConfigMonitor, - SyncConfigMonitor -) +from core.schains.monitor import get_skaled_monitor, RegularConfigMonitor, SyncConfigMonitor from core.schains.monitor.action import ConfigActionManager, SkaledActionManager from core.schains.external_config import ExternalConfig, ExternalState from core.schains.task import keep_tasks_running, Task @@ -55,7 +46,7 @@ from tools.docker_utils import DockerUtils from tools.configs import SYNC_NODE from tools.notifications.messages import notify_checks -from tools.helper import is_node_part_of_chain +from tools.helper import is_node_part_of_chain, no_hyphens from tools.resources import get_statsd_client from web.models.schain import SChainRecord @@ -70,23 +61,19 @@ def run_config_pipeline( - skale: Skale, - skale_ima: SkaleIma, - schain: Dict, - node_config: NodeConfig, - stream_version: str + skale: Skale, skale_ima: SkaleIma, schain: Dict, node_config: NodeConfig, stream_version: str ) -> None: name = schain['name'] schain_record = SChainRecord.get_by_name(name) rotation_data = skale.node_rotation.get_rotation(name) allowed_ranges = get_sync_agent_ranges(skale) ima_linked = not SYNC_NODE and skale_ima.linker.has_schain(name) + group_index = skale.schains.name_to_group_id(name) + last_dkg_successful = skale.dkg.is_last_dkg_successful(group_index) current_nodes = get_current_nodes(skale, name) estate = ExternalState( - ima_linked=ima_linked, - chain_id=skale_ima.web3.eth.chain_id, - ranges=allowed_ranges + ima_linked=ima_linked, chain_id=skale_ima.web3.eth.chain_id, ranges=allowed_ranges ) econfig = ExternalConfig(name) config_checks = ConfigChecks( @@ -96,8 +83,9 @@ def run_config_pipeline( stream_version=stream_version, rotation_id=rotation_data['rotation_id'], current_nodes=current_nodes, + last_dkg_successful=last_dkg_successful, 
econfig=econfig, - estate=estate + estate=estate, ) config_am = ConfigActionManager( @@ -109,34 +97,38 @@ def run_config_pipeline( checks=config_checks, current_nodes=current_nodes, estate=estate, - econfig=econfig + econfig=econfig, ) status = config_checks.get_all(log=False, expose=True) logger.info('Config checks: %s', status) if SYNC_NODE: - logger.info('Sync node mode, running config monitor') + logger.info( + 'Sync node last_dkg_successful %s, rotation_data %s', + last_dkg_successful, + rotation_data + ) mon = SyncConfigMonitor(config_am, config_checks) else: logger.info('Regular node mode, running config monitor') mon = RegularConfigMonitor(config_am, config_checks) statsd_client = get_statsd_client() - statsd_client.incr(f'admin.config.pipeline.{name}.{mon.__class__.__name__}') - statsd_client.gauge(f'admin.schain.rotation_id.{name}', rotation_data['rotation_id']) - with statsd_client.timer(f'admin.config.pipeline.{name}.duration'): + statsd_client.incr(f'admin.config_pipeline.{mon.__class__.__name__}.{no_hyphens(name)}') + statsd_client.gauge( + f'admin.config_pipeline.rotation_id.{no_hyphens(name)}', rotation_data['rotation_id'] + ) + with statsd_client.timer(f'admin.config_pipeline.duration.{no_hyphens(name)}'): mon.run() def run_skaled_pipeline( - skale: Skale, - schain: Dict, - node_config: NodeConfig, - dutils: DockerUtils + skale: Skale, schain: Dict, node_config: NodeConfig, dutils: DockerUtils ) -> None: name = schain['name'] schain_record = SChainRecord.get_by_name(name) + logger.info('Record: %s', SChainRecord.to_dict(schain_record)) dutils = dutils or DockerUtils() @@ -146,7 +138,7 @@ def run_skaled_pipeline( schain_record=schain_record, rule_controller=rc, dutils=dutils, - sync_node=SYNC_NODE + sync_node=SYNC_NODE, ) skaled_status = get_skaled_status(name) @@ -157,12 +149,11 @@ def run_skaled_pipeline( checks=skaled_checks, node_config=node_config, econfig=ExternalConfig(name), - dutils=dutils + dutils=dutils, ) status = 
skaled_checks.get_all(log=False, expose=True) automatic_repair = get_automatic_repair_option() - api_status = get_api_checks_status( - status=status, allowed=TG_ALLOWED_CHECKS) + api_status = get_api_checks_status(status=status, allowed=TG_ALLOWED_CHECKS) notify_checks(name, node_config.all(), api_status) logger.info('Skaled status: %s', status) @@ -174,22 +165,20 @@ def run_skaled_pipeline( status=status, schain_record=schain_record, skaled_status=skaled_status, - automatic_repair=automatic_repair + automatic_repair=automatic_repair, ) statsd_client = get_statsd_client() - statsd_client.incr(f'schain.skaled.pipeline.{name}.{mon.__name__}') - with statsd_client.timer(f'admin.skaled.pipeline.{name}.duration'): + statsd_client.incr(f'admin.skaled_pipeline.{mon.__name__}.{no_hyphens(name)}') + with statsd_client.timer(f'admin.skaled_pipeline.duration.{no_hyphens(name)}'): mon(skaled_am, skaled_checks).run() def post_monitor_sleep(): schain_monitor_sleep = random.randint( - MIN_SCHAIN_MONITOR_SLEEP_INTERVAL, - MAX_SCHAIN_MONITOR_SLEEP_INTERVAL + MIN_SCHAIN_MONITOR_SLEEP_INTERVAL, MAX_SCHAIN_MONITOR_SLEEP_INTERVAL ) - logger.info('Monitor iteration completed, sleeping for %d', - schain_monitor_sleep) + logger.info('Monitor iteration completed, sleeping for %d', schain_monitor_sleep) time.sleep(schain_monitor_sleep) @@ -202,7 +191,7 @@ def create_and_execute_tasks( schain_record, executor, futures, - dutils + dutils, ): reload(web3_request) name = schain['name'] @@ -216,13 +205,15 @@ def create_and_execute_tasks( logger.info( 'sync_config_run %s, config_version %s, stream_version %s', - schain_record.sync_config_run, schain_record.config_version, stream_version + schain_record.sync_config_run, + schain_record.config_version, + stream_version, ) statsd_client = get_statsd_client() monitor_last_seen_ts = schain_record.monitor_last_seen.timestamp() - statsd_client.incr(f'admin.schain.monitor.{name}') - statsd_client.gauge(f'admin.schain.monitor_last_seen.{name}', 
monitor_last_seen_ts) + statsd_client.incr(f'admin.schain.monitor.{no_hyphens(name)}') + statsd_client.gauge(f'admin.schain.monitor_last_seen.{no_hyphens(name)}', monitor_last_seen_ts) tasks = [] if not leaving_chain: @@ -236,12 +227,14 @@ def create_and_execute_tasks( skale_ima=skale_ima, schain=schain, node_config=node_config, - stream_version=stream_version + stream_version=stream_version, ), - sleep=CONFIG_PIPELINE_SLEEP - )) - if schain_record.config_version != stream_version or \ - (schain_record.sync_config_run and schain_record.first_run): + sleep=CONFIG_PIPELINE_SLEEP, + ) + ) + if schain_record.config_version != stream_version or ( + schain_record.sync_config_run and schain_record.first_run + ): ConfigFileManager(name).remove_skaled_config() else: logger.info('Adding skaled task to the pool') @@ -253,10 +246,11 @@ def create_and_execute_tasks( skale=skale, schain=schain, node_config=node_config, - dutils=dutils + dutils=dutils, ), - sleep=SKALED_PIPELINE_SLEEP - )) + sleep=SKALED_PIPELINE_SLEEP, + ) + ) if len(tasks) == 0: logger.warning('No tasks to run') @@ -264,12 +258,7 @@ def create_and_execute_tasks( def run_monitor_for_schain( - skale, - skale_ima, - node_config: NodeConfig, - schain, - dutils=None, - once=False + skale, skale_ima, node_config: NodeConfig, schain, dutils=None, once=False ): stream_version = get_skale_node_version() tasks_number = 2 @@ -287,7 +276,7 @@ def run_monitor_for_schain( schain_record, executor, futures, - dutils + dutils, ) if once: return True diff --git a/core/schains/monitor/post_rotation_monitor.py b/core/schains/monitor/post_rotation_monitor.py deleted file mode 100644 index 8200ab5a9..000000000 --- a/core/schains/monitor/post_rotation_monitor.py +++ /dev/null @@ -1,38 +0,0 @@ -# -*- coding: utf-8 -*- -# -# This file is part of SKALE Admin -# -# Copyright (C) 2021 SKALE Labs -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as 
published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -import logging - -from core.schains.monitor.base_monitor import BaseMonitor - - -logger = logging.getLogger(__name__) - - -class PostRotationMonitor(BaseMonitor): - """ - PostRotationMonitor be executed for the sChain on the staying node when rotation is complete. - This type of monitor reloads skaled container. - """ - @BaseMonitor.monitor_runner - def run(self): - logger.info(f'{self.p} was stopped after rotation. Going to restart') - self.config(overwrite=True) - self.firewall_rules() - self.recreated_schain_containers() diff --git a/core/schains/rpc.py b/core/schains/rpc.py index 10519e14b..a9279e265 100644 --- a/core/schains/rpc.py +++ b/core/schains/rpc.py @@ -17,6 +17,8 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . 
+import json +import logging import time from tools.configs import ALLOWED_TIMESTAMP_DIFF @@ -24,6 +26,9 @@ from tools.helper import post_request +logger = logging.getLogger(__name__) + + def make_rpc_call(http_endpoint, method, params=None, timeout=None) -> bool: params = params or [] return post_request( @@ -47,10 +52,16 @@ def check_endpoint_alive(http_endpoint, timeout=None): def check_endpoint_blocks(http_endpoint): res = make_rpc_call(http_endpoint, 'eth_getBlockByNumber', ['latest', False]) - if res and res.json(): - res_data = res.json() - latest_schain_timestamp_hex = res_data['result']['timestamp'] - latest_schain_timestamp = int(latest_schain_timestamp_hex, 16) - admin_timestamp = int(time.time()) - return abs(latest_schain_timestamp - admin_timestamp) < ALLOWED_TIMESTAMP_DIFF - return False + healthy = False + if res: + try: + res_data = res.json() + latest_schain_timestamp_hex = res_data['result']['timestamp'] + latest_schain_timestamp = int(latest_schain_timestamp_hex, 16) + admin_timestamp = int(time.time()) + healthy = abs(latest_schain_timestamp - admin_timestamp) < ALLOWED_TIMESTAMP_DIFF + except (json.JSONDecodeError, KeyError, ValueError) as e: + logger.warning('Failed to parse response, error: %s', e) + else: + logger.warning('Empty response from skaled') + return healthy diff --git a/requirements.txt b/requirements.txt index 664cd2b48..27e560567 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ peewee==3.9.5 Flask==2.3.3 -Werkzeug==2.3.7 -gunicorn==20.1.0 +Werkzeug==3.0.3 +gunicorn==22.0.0 Jinja2==3.1.2 @@ -14,7 +14,7 @@ requests==2.31 ima-predeployed==2.1.0b0 etherbase-predeployed==1.1.0b3 marionette-predeployed==2.0.0b2 -config-controller-predeployed==1.1.0b0 +config-controller-predeployed==1.1.0 filestorage-predeployed==1.1.0.dev8 multisigwallet-predeployed==1.1.0a8 diff --git a/tests/conftest.py b/tests/conftest.py index 807884c44..8b34c172f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -513,6 +513,7 @@ def 
schain_checks(schain_config, schain_db, current_nodes, rule_controller, esta rule_controller=rule_controller, stream_version=CONFIG_STREAM, current_nodes=current_nodes, + last_dkg_successful=True, estate=estate, dutils=dutils ) diff --git a/tests/firewall/rule_controller_test.py b/tests/firewall/rule_controller_test.py index e0aa26a15..d4f04d598 100644 --- a/tests/firewall/rule_controller_test.py +++ b/tests/firewall/rule_controller_test.py @@ -150,6 +150,50 @@ def test_schain_rule_controller_configure(): own_ip = '1.1.1.1' node_ips = ['1.1.1.1', '2.2.2.2', '3.3.3.3', '4.4.4.4'] base_port = 10000 + + src.configure(base_port=base_port) + with pytest.raises(NotInitializedError): + src.public_ports() + + src.configure(base_port=base_port, node_ips=node_ips) + assert list(src.public_ports) == [10003, 10008, 10002, 10007, 10009] + + expected_rules = { + SChainRule(port=10000, first_ip='1.1.1.1', last_ip=None), + SChainRule(port=10000, first_ip='2.2.2.2', last_ip=None), + SChainRule(port=10000, first_ip='3.3.3.3', last_ip=None), + SChainRule(port=10000, first_ip='4.4.4.4', last_ip=None), + SChainRule(port=10001, first_ip='1.1.1.1', last_ip=None), + SChainRule(port=10001, first_ip='2.2.2.2', last_ip=None), + SChainRule(port=10001, first_ip='3.3.3.3', last_ip=None), + SChainRule(port=10001, first_ip='4.4.4.4', last_ip=None), + SChainRule(port=10002, first_ip=None, last_ip=None), + SChainRule(port=10003, first_ip=None, last_ip=None), + SChainRule(port=10004, first_ip='1.1.1.1', last_ip=None), + SChainRule(port=10004, first_ip='2.2.2.2', last_ip=None), + SChainRule(port=10004, first_ip='3.3.3.3', last_ip=None), + SChainRule(port=10004, first_ip='4.4.4.4', last_ip=None), + SChainRule(port=10005, first_ip='1.1.1.1', last_ip=None), + SChainRule(port=10005, first_ip='2.2.2.2', last_ip=None), + SChainRule(port=10005, first_ip='3.3.3.3', last_ip=None), + SChainRule(port=10005, first_ip='4.4.4.4', last_ip=None), + SChainRule(port=10007, first_ip=None, last_ip=None), + 
SChainRule(port=10008, first_ip=None, last_ip=None), + SChainRule(port=10009, first_ip=None, last_ip=None), + SChainRule(port=10010, first_ip='1.1.1.1', last_ip=None), + SChainRule(port=10010, first_ip='2.2.2.2', last_ip=None), + SChainRule(port=10010, first_ip='3.3.3.3', last_ip=None), + SChainRule(port=10010, first_ip='4.4.4.4', last_ip=None) + } + src.configure(base_port=base_port, node_ips=node_ips) + + assert not src.is_rules_synced() + assert list(src.expected_rules()) == list(sorted(expected_rules)) + src.sync() + assert src.is_rules_synced() + assert list(src.expected_rules()) == list(sorted(expected_rules)) + assert list(src.actual_rules()) == list(sorted(expected_rules)) + expected_rules = { SChainRule(port=10000, first_ip='2.2.2.2', last_ip=None), SChainRule(port=10000, first_ip='3.3.3.3', last_ip=None), @@ -173,6 +217,7 @@ def test_schain_rule_controller_configure(): SChainRule(port=10010, first_ip='4.4.4.4', last_ip=None) } src.configure(base_port=base_port, own_ip=own_ip, node_ips=node_ips) + assert not src.is_rules_synced() assert list(src.expected_rules()) == list(sorted(expected_rules)) src.sync() diff --git a/tests/helper_test.py b/tests/helper_test.py index 762225232..538690185 100644 --- a/tests/helper_test.py +++ b/tests/helper_test.py @@ -1,4 +1,4 @@ -from tools.helper import is_address_contract +from tools.helper import is_address_contract, no_hyphens from tools.configs.web3 import ZERO_ADDRESS @@ -6,3 +6,10 @@ def test_is_address_contract(skale): assert not is_address_contract(skale.web3, ZERO_ADDRESS) assert is_address_contract(skale.web3, skale.manager.address) assert is_address_contract(skale.web3, skale.nodes.address) + + +def test_no_hyphen(): + assert no_hyphens('too') == 'too' + assert no_hyphens('too-boo') == 'too_boo' + assert no_hyphens('too-boo_goo') == 'too_boo_goo' + assert no_hyphens('too_goo') == 'too_goo' diff --git a/tests/schains/checks_test.py b/tests/schains/checks_test.py index 95bb85950..2e86f4dca 100644 --- 
a/tests/schains/checks_test.py +++ b/tests/schains/checks_test.py @@ -96,6 +96,7 @@ def sample_false_checks(schain_config, schain_db, rule_controller, current_nodes schain_record=schain_record, rule_controller=rule_controller, stream_version=CONFIG_STREAM, + last_dkg_successful=True, current_nodes=current_nodes, estate=estate, dutils=dutils @@ -120,6 +121,7 @@ def rules_unsynced_checks( rule_controller=uninited_rule_controller, stream_version=CONFIG_STREAM, current_nodes=current_nodes, + last_dkg_successful=True, estate=estate, dutils=dutils ) @@ -288,6 +290,7 @@ def test_init_checks(skale, schain_db, current_nodes, uninited_rule_controller, rule_controller=uninited_rule_controller, stream_version=CONFIG_STREAM, current_nodes=current_nodes, + last_dkg_successful=True, estate=estate, dutils=dutils ) @@ -316,6 +319,7 @@ def test_exit_code(skale, rule_controller, schain_db, current_nodes, estate, dut rule_controller=rule_controller, stream_version=CONFIG_STREAM, current_nodes=current_nodes, + last_dkg_successful=True, estate=estate, dutils=dutils ) @@ -335,6 +339,7 @@ def test_process(skale, rule_controller, schain_db, current_nodes, estate, dutil rule_controller=rule_controller, stream_version=CONFIG_STREAM, current_nodes=current_nodes, + last_dkg_successful=True, estate=estate, dutils=dutils ) @@ -359,6 +364,7 @@ def test_get_all(schain_config, rule_controller, dutils, current_nodes, schain_d rule_controller=rule_controller, stream_version=CONFIG_STREAM, current_nodes=current_nodes, + last_dkg_successful=True, estate=estate, dutils=dutils ) @@ -381,6 +387,7 @@ def test_get_all(schain_config, rule_controller, dutils, current_nodes, schain_d rule_controller=rule_controller, stream_version=CONFIG_STREAM, current_nodes=current_nodes, + last_dkg_successful=True, estate=estate, dutils=dutils ) @@ -408,6 +415,7 @@ def test_get_all_with_save(node_config, rule_controller, current_nodes, dutils, rule_controller=rule_controller, stream_version=CONFIG_STREAM, 
current_nodes=current_nodes, + last_dkg_successful=True, estate=estate, dutils=dutils ) @@ -432,9 +440,11 @@ def test_config_updated(skale, rule_controller, schain_db, current_nodes, estate rule_controller=rule_controller, stream_version=CONFIG_STREAM, current_nodes=current_nodes, + last_dkg_successful=False, estate=estate, dutils=dutils ) + assert checks.last_dkg_successful.status is False assert checks.config_updated upstream_path = UpstreamConfigFilename( @@ -453,6 +463,7 @@ def test_config_updated(skale, rule_controller, schain_db, current_nodes, estate rule_controller=rule_controller, stream_version=CONFIG_STREAM, current_nodes=current_nodes, + last_dkg_successful=True, estate=estate, dutils=dutils ) @@ -466,7 +477,9 @@ def test_config_updated(skale, rule_controller, schain_db, current_nodes, estate rule_controller=rule_controller, stream_version=CONFIG_STREAM, current_nodes=current_nodes, + last_dkg_successful=True, estate=estate, dutils=dutils ) + assert checks.last_dkg_successful.status is True assert not checks.config_updated diff --git a/tests/schains/cleaner_test.py b/tests/schains/cleaner_test.py index e8319474c..4db2e9f45 100644 --- a/tests/schains/cleaner_test.py +++ b/tests/schains/cleaner_test.py @@ -255,6 +255,7 @@ def test_cleanup_schain( schain_name, current_nodes=current_nodes, sync_agent_ranges=[], + last_dkg_successful=True, rotation_id=0, estate=estate, dutils=dutils diff --git a/tests/schains/monitor/action/config_action_test.py b/tests/schains/monitor/action/config_action_test.py index 771769727..57c904ade 100644 --- a/tests/schains/monitor/action/config_action_test.py +++ b/tests/schains/monitor/action/config_action_test.py @@ -37,6 +37,7 @@ def config_checks( schain_record=schain_record, rotation_id=rotation_data['rotation_id'], stream_version=CONFIG_STREAM, + last_dkg_successful=True, current_nodes=current_nodes, estate=estate ) diff --git a/tests/schains/monitor/config_monitor_test.py b/tests/schains/monitor/config_monitor_test.py index 
d7c211f65..71fbc2285 100644 --- a/tests/schains/monitor/config_monitor_test.py +++ b/tests/schains/monitor/config_monitor_test.py @@ -10,7 +10,7 @@ from core.schains.config.directory import schain_config_dir from core.schains.monitor.action import ConfigActionManager -from core.schains.monitor.config_monitor import RegularConfigMonitor +from core.schains.monitor.config_monitor import RegularConfigMonitor, SyncConfigMonitor from core.schains.external_config import ExternalConfig from web.models.schain import SChainRecord @@ -42,6 +42,7 @@ def config_checks( rotation_id=rotation_data['rotation_id'], stream_version=CONFIG_STREAM, current_nodes=current_nodes, + last_dkg_successful=True, estate=estate ) @@ -84,9 +85,18 @@ def regular_config_monitor(config_am, config_checks): ) +@pytest.fixture +def sync_config_monitor(config_am, config_checks): + return SyncConfigMonitor( + action_manager=config_am, + checks=config_checks + ) + + def test_regular_config_monitor(schain_db, regular_config_monitor, rotation_data): name = schain_db rotation_id = rotation_data['rotation_id'] + regular_config_monitor.run() config_dir = schain_config_dir(name) @@ -130,3 +140,76 @@ def test_regular_config_monitor_change_ip( regular_config_monitor.am.cfm.sync_skaled_config_with_upstream() regular_config_monitor.run() assert econfig.reload_ts is None + + +def test_sync_config_monitor( + skale, + schain_db, + config_am, + config_checks, + econfig, + estate, + rotation_data +): + name = schain_db + config_dir = schain_config_dir(name) + + rotation_id = rotation_data['rotation_id'] + config_pattern = os.path.join( + config_dir, + f'schain_{name}_{rotation_id}_*.json' + ) + assert len(glob.glob(config_pattern)) == 0 + + assert econfig.synced(estate) + + estate.chain_id = 1 + config_checks.estate = estate + config_am.estate = estate + assert not econfig.synced(estate) + + sync_config_monitor = SyncConfigMonitor( + action_manager=config_am, + checks=config_checks + ) + sync_config_monitor.run() + 
assert econfig.synced(estate) + config_filename = glob.glob(config_pattern) + assert os.path.isfile(config_filename[0]) + + +def test_sync_config_monitor_dkg_not_completed( + skale, + schain_db, + config_am, + config_checks, + econfig, + estate, + rotation_data +): + name = schain_db + config_dir = schain_config_dir(name) + + rotation_id = rotation_data['rotation_id'] + config_pattern = os.path.join( + config_dir, + f'schain_{name}_{rotation_id}_*.json' + ) + assert len(glob.glob(config_pattern)) == 0 + + assert econfig.synced(estate) + + estate.chain_id = 1 + config_checks.estate = estate + config_am.estate = estate + config_checks._last_dkg_successful = False + assert not econfig.synced(estate) + + sync_config_monitor = SyncConfigMonitor( + action_manager=config_am, + checks=config_checks + ) + sync_config_monitor.run() + assert econfig.synced(estate) + # config generation was not triggered because dkg has not been completed + assert len(glob.glob(config_pattern)) == 0 diff --git a/tests/schains/monitor/rpc_test.py b/tests/schains/monitor/rpc_test.py index 702707b20..5445ef887 100644 --- a/tests/schains/monitor/rpc_test.py +++ b/tests/schains/monitor/rpc_test.py @@ -1,11 +1,20 @@ +import datetime +import json +import mock from time import sleep +import freezegun +import requests + from core.schains.monitor.rpc import handle_failed_schain_rpc from core.schains.runner import get_container_info +from core.schains.rpc import check_endpoint_blocks from tools.configs.containers import SCHAIN_CONTAINER - from web.models.schain import SChainRecord +CURRENT_TIMESTAMP = 1594903080 +CURRENT_DATETIME = datetime.datetime.utcfromtimestamp(CURRENT_TIMESTAMP) + def test_handle_failed_schain_rpc_no_container(schain_db, dutils, skaled_status): schain_record = SChainRecord.get_by_name(schain_db) @@ -15,21 +24,17 @@ def test_handle_failed_schain_rpc_no_container(schain_db, dutils, skaled_status) schain={'name': schain_db}, schain_record=schain_record, skaled_status=skaled_status, - 
dutils=dutils + dutils=dutils, ) assert not dutils.is_container_exists(container_name) def test_handle_failed_schain_rpc_exit_time_reached( - schain_db, - dutils, - cleanup_schain_containers, - skaled_status_exit_time_reached + schain_db, dutils, cleanup_schain_containers, skaled_status_exit_time_reached ): schain_record = SChainRecord.get_by_name(schain_db) - image_name, container_name, _, _ = get_container_info( - SCHAIN_CONTAINER, schain_db) + image_name, container_name, _, _ = get_container_info(SCHAIN_CONTAINER, schain_db) dutils.run_container(image_name=image_name, name=container_name, entrypoint='bash -c "exit 0"') sleep(7) @@ -42,7 +47,7 @@ def test_handle_failed_schain_rpc_exit_time_reached( schain={'name': schain_db}, schain_record=schain_record, skaled_status=skaled_status_exit_time_reached, - dutils=dutils + dutils=dutils, ) assert dutils.is_container_exists(container_name) @@ -51,20 +56,14 @@ def test_handle_failed_schain_rpc_exit_time_reached( def test_monitor_schain_downloading_snapshot( - schain_db, - dutils, - cleanup_schain_containers, - skaled_status_downloading_snapshot + schain_db, dutils, cleanup_schain_containers, skaled_status_downloading_snapshot ): schain_record = SChainRecord.get_by_name(schain_db) - image_name, container_name, _, _ = get_container_info( - SCHAIN_CONTAINER, schain_db) + image_name, container_name, _, _ = get_container_info(SCHAIN_CONTAINER, schain_db) dutils.run_container( - image_name=image_name, - name=container_name, - entrypoint='bash -c "sleep 100"' + image_name=image_name, name=container_name, entrypoint='bash -c "sleep 100"' ) sleep(7) schain_record.set_failed_rpc_count(100) @@ -76,25 +75,19 @@ def test_monitor_schain_downloading_snapshot( schain={'name': schain_db}, schain_record=schain_record, skaled_status=skaled_status_downloading_snapshot, - dutils=dutils + dutils=dutils, ) container_info = dutils.get_info(container_name) assert container_info['stats']['State']['FinishedAt'] == finished_at def 
test_handle_failed_schain_rpc_stuck_max_retries( - schain_db, - dutils, - skaled_status, - cleanup_schain_containers + schain_db, dutils, skaled_status, cleanup_schain_containers ): schain_record = SChainRecord.get_by_name(schain_db) - image_name, container_name, _, _ = get_container_info( - SCHAIN_CONTAINER, schain_db) + image_name, container_name, _, _ = get_container_info(SCHAIN_CONTAINER, schain_db) dutils.run_container( - image_name=image_name, - name=container_name, - entrypoint='bash -c "sleep 100"' + image_name=image_name, name=container_name, entrypoint='bash -c "sleep 100"' ) schain_record.set_failed_rpc_count(100) @@ -107,7 +100,7 @@ def test_handle_failed_schain_rpc_stuck_max_retries( schain={'name': schain_db}, schain_record=schain_record, skaled_status=skaled_status, - dutils=dutils + dutils=dutils, ) container_info = dutils.get_info(container_name) assert container_info['stats']['State']['FinishedAt'] == finished_at @@ -115,12 +108,9 @@ def test_handle_failed_schain_rpc_stuck_max_retries( def test_monitor_container_exited(schain_db, dutils, cleanup_schain_containers, skaled_status): schain_record = SChainRecord.get_by_name(schain_db) - image_name, container_name, _, _ = get_container_info( - SCHAIN_CONTAINER, schain_db) + image_name, container_name, _, _ = get_container_info(SCHAIN_CONTAINER, schain_db) dutils.run_container( - image_name=image_name, - name=container_name, - entrypoint='bash -c "exit 100;"' + image_name=image_name, name=container_name, entrypoint='bash -c "exit 100;"' ) schain_record.set_failed_rpc_count(100) @@ -134,7 +124,7 @@ def test_monitor_container_exited(schain_db, dutils, cleanup_schain_containers, schain={'name': schain_db}, schain_record=schain_record, skaled_status=skaled_status, - dutils=dutils + dutils=dutils, ) assert schain_record.restart_count == 0 container_info = dutils.get_info(container_name) @@ -142,18 +132,12 @@ def test_monitor_container_exited(schain_db, dutils, cleanup_schain_containers, def 
test_handle_failed_schain_rpc_stuck( - schain_db, - dutils, - cleanup_schain_containers, - skaled_status + schain_db, dutils, cleanup_schain_containers, skaled_status ): schain_record = SChainRecord.get_by_name(schain_db) - image_name, container_name, _, _ = get_container_info( - SCHAIN_CONTAINER, schain_db) + image_name, container_name, _, _ = get_container_info(SCHAIN_CONTAINER, schain_db) dutils.run_container( - image_name=image_name, - name=container_name, - entrypoint='bash -c "sleep 100"' + image_name=image_name, name=container_name, entrypoint='bash -c "sleep 100"' ) schain_record.set_failed_rpc_count(100) @@ -167,8 +151,39 @@ def test_handle_failed_schain_rpc_stuck( schain={'name': schain_db}, schain_record=schain_record, skaled_status=skaled_status, - dutils=dutils + dutils=dutils, ) assert schain_record.restart_count == 1 container_info = dutils.get_info(container_name) assert container_info['stats']['State']['FinishedAt'] != finished_at + + +@mock.patch('tools.helper.requests.post') +@freezegun.freeze_time(CURRENT_DATETIME) +def test_check_endpoint_blocks(post_request_mock): + endpoint = 'http://127.0.0.1:10003' + + post_request_mock.side_effect = requests.exceptions.RequestException('Test error') + assert check_endpoint_blocks(endpoint) is False + post_request_mock.side_effect = None + + response_dummy = mock.Mock() + post_request_mock.return_value = response_dummy + + response_dummy.json = mock.Mock(return_value={}) + assert check_endpoint_blocks(endpoint) is False + + response_dummy.json = mock.Mock( + side_effect=json.JSONDecodeError('Test error', doc='doc', pos=1) + ) + assert check_endpoint_blocks(endpoint) is False + + response_dummy.json = mock.Mock(return_value={'result': {'timestamp': '0xhhhhh'}}) + assert check_endpoint_blocks(endpoint) is False + + response_dummy.json = mock.Mock(return_value={'result': {'timestamp': '0x1'}}) + assert check_endpoint_blocks(endpoint) is False + + hex_offset_ts = hex(CURRENT_TIMESTAMP + 1) + response_dummy.json 
= mock.Mock(return_value={'result': {'timestamp': hex_offset_ts}}) + assert check_endpoint_blocks(endpoint) is True diff --git a/tools/helper.py b/tools/helper.py index 8cfc28c3e..71b788f4f 100644 --- a/tools/helper.py +++ b/tools/helper.py @@ -184,3 +184,7 @@ def is_zero_address(address: str) -> bool: def is_address_contract(web3, address) -> bool: """Returns true if contract is deployed at the requested address""" return web3.eth.get_code(address) != b'' + + +def no_hyphens(name: str) -> str: + return name.replace('-', '_') diff --git a/web/models/schain.py b/web/models/schain.py index c685d864a..a7f67eb79 100644 --- a/web/models/schain.py +++ b/web/models/schain.py @@ -98,7 +98,13 @@ def to_dict(cls, record): 'monitor_last_seen': record.monitor_last_seen.timestamp(), 'monitor_id': record.monitor_id, 'config_version': record.config_version, - 'ssl_change_date': record.ssl_change_date.timestamp() + 'ssl_change_date': record.ssl_change_date.timestamp(), + 'repair_mode': record.repair_mode, + 'backup_run': record.backup_run, + 'sync_config_run': record.sync_config_run, + 'snapshot_from': record.snapshot_from, + 'restart_count': record.restart_count, + 'failed_rpc_count': record.failed_rpc_count } def upload(self, *args, **kwargs) -> None: diff --git a/web/routes/health.py b/web/routes/health.py index 56c5258e5..2503d674b 100644 --- a/web/routes/health.py +++ b/web/routes/health.py @@ -112,6 +112,7 @@ def schains_checks(): rotation_id=rotation_id, stream_version=stream_version, current_nodes=current_nodes, + last_dkg_successful=True, estate=estate, sync_node=False ).get_all(needed=checks_filter)