Skip to content

Commit

Permalink
Bugfix: Failover for non-existent nodes (#1992)
Browse files Browse the repository at this point in the history
  • Loading branch information
k9ert authored Dec 2, 2022
1 parent da0731d commit 6f72a5c
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 15 deletions.
26 changes: 25 additions & 1 deletion src/cryptoadvance/specter/managers/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from ..node import Node
from ..internal_node import InternalNode
from ..services import callbacks
from ..managers.service_manager import ServiceManager
from ..util.bitcoind_setup_tasks import setup_bitcoind_thread

logger = logging.getLogger(__name__)
Expand All @@ -28,6 +29,7 @@ def __init__(
bitcoind_path="",
internal_bitcoind_version="",
data_folder="",
service_manager=None,
):
self.nodes = {}
# Dict is sth. like: {'nigiri_regtest': <Node name=Nigiri regtest fullpath=...>, 'default': <Node name=Bitcoin Core fullpath=...>}
Expand All @@ -37,6 +39,7 @@ def __init__(
self.only_tor = only_tor
self.bitcoind_path = bitcoind_path
self.internal_bitcoind_version = internal_bitcoind_version
self.service_manager: ServiceManager = service_manager
self.load_from_disk(data_folder)
internal_nodes = [
node for node in self.nodes.values() if not node.external_node
Expand All @@ -55,12 +58,33 @@ def load_from_disk(self, data_folder=None):
nodes_files = load_jsons(self.data_folder, key="alias")
for node_alias in nodes_files:
try:
self.nodes[node_alias] = PersistentObject.from_json(
valid_node = True
node = PersistentObject.from_json(
nodes_files[node_alias],
self,
default_alias=nodes_files[node_alias]["alias"],
default_fullpath=calc_fullpath(self.data_folder, node_alias),
)
print("---------------------------------------")
print(node.__class__.__module__.split("."))
if (
node.__class__.__module__.split(".")[1] == "specterext"
): # e.g. cryptoadvance.specterext.spectrum
if self.service_manager:
if not self.service_manager.is_class_from_loaded_extension(
node.__class__
):
logger.warning(
f"Cannot Load {node} due to corresponding plugin not loaded"
)
continue
else:
logger.warning(
f"Cannot validate Node {node} to be a valid node, skipping"
)
continue
self.nodes[node_alias] = node
logger.info(f"Loaded Node {self.nodes[node_alias]}")
except SpecterInternalException as e:
logger.error(f"Skipping node {node_alias} due to {e}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,17 @@ def services_sorted(self):
)
return [self._services[s] for s in service_names]

def is_class_from_loaded_extension(self, claz):
"""Returns Ture if that class is from a module which belongs to an extension
which is loaded, False otherwise
"""
print("")
ext_module = ".".join(claz.__module__.split(".")[0:3])
for ext in self.services_sorted:
if ext.__class__.__module__.startswith(ext_module):
return True
return False

def user_has_encrypted_storage(self, user: User) -> bool:
"""Looks for any data for any service in the User's ServiceEncryptedStorage.
This check works even if the user doesn't have their plaintext_user_secret
Expand Down
19 changes: 16 additions & 3 deletions src/cryptoadvance/specter/specter.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,23 @@ def initialize(self):
bitcoind_path=self.bitcoind_path,
internal_bitcoind_version=self._internal_bitcoind_version,
data_folder=os.path.join(self.data_folder, "nodes"),
service_manager=self.service_manager
if hasattr(self, "service_manager")
else None,
)
logger.debug(
f"This is the active node in the node manager: {self.node_manager.active_node}"
)
try:
logger.debug(
f"This is the active node in the node manager: {self.node_manager.active_node}"
)
except SpecterError as e:
if str(e).endswith("does not exist!"):
logger.warning(
f"Current Node doesn't exist. Switching over to node {self.node_manager.DEFAULT_ALIAS}."
)
self.update_active_node(self.node_manager.DEFAULT_ALIAS)
else:
raise e

self.torbrowser_path = os.path.join(
self.data_folder, f"tor-binaries/tor{get_tor_daemon_suffix()}"
)
Expand Down
20 changes: 12 additions & 8 deletions src/cryptoadvance/specter/util/checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,26 @@ def __init__(self, callback, period=600, desc="unknown"):
def start(self):
if not self.running:
self.running = True
self.error_counter = 0
self.thread = FlaskThread(target=self.loop)
self.thread.daemon = True
self.thread.start()
logger.info(f"Checker {self.desc} started with period {self.period}")
logger.info(f"Checker {self.desc} started with period {self.period}")
else:
logger.warning(f"Checker {self.desc} started but ran already")

def stop(self):
logger.info(f"Checker {self.desc} stopped.")
self.running = False

def loop(self):
self.error_counter = 0
self._execute(first_execution=True)
while self.running:
# check if it's time to update
if time.time() - self.last_check >= self.period:
self._execute()
# wait 1 second
self._sleep()
logger.info(f"Checker {self.desc} stopped.")

def _execute(self, first_execution=False):
try:
Expand All @@ -58,11 +60,13 @@ def _execute(self, first_execution=False):
% dt
)
except Exception as e:
if self.error_counter < 5:
logger.exception(e)
self.error_counter = self.error_counter + 1
if self.error_counter == 4:
logger.error("The above Error-Message is now suppressed!")
self.error_counter += 1
if self.error_counter <= 5:
logger.exception(
f"Checker thread {self.desc} threw {e} for the {self.error_counter}th time"
)
if self.error_counter == 5:
logger.error(f"The above Error-Message is from now on suppressed!")
finally:
self.last_check = time.time()

Expand Down
6 changes: 3 additions & 3 deletions tests/test_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def test_checker(caplog):
assert "Checker test started" in caplog.text
assert "This message won't show again until stopped and started." in caplog.text
checker.stop()
time.sleep(0.01)
assert "Checker test stopped" in caplog.text
callback_mock.side_effect = Exception("someException")
checker.start()
Expand All @@ -25,8 +26,7 @@ def test_checker(caplog):
checker.stop()
assert "someException" in caplog.text
# should output 5 times
assert caplog.text.count("[ ERROR] someException") == 5
assert caplog.text.count("[ ERROR] Checker thread") == 5
# But should also show stacktrace 5 times
assert caplog.text.count("Exception: someException") == 5
assert "The above Error-Message is now suppressed" in caplog.text
assert caplog.text.count("The above Error-Message is now suppressed") == 1
assert caplog.text.count("The above Error-Message is from now on suppressed!") == 1
13 changes: 13 additions & 0 deletions tests/test_managers_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
from cryptoadvance.specter.managers.service_manager import ServiceManager
from cryptoadvance.specter.services.callbacks import after_serverpy_init_app

from cryptoadvance.specterext.swan.service import SwanService
from cryptoadvance.specterext.swan.service import SwanClient
from cryptoadvance.specterext.devhelp.service import DevhelpService


def test_ServiceManager2(mock_specter, mock_flaskapp, caplog):
ctx = mock_flaskapp.app_context()
Expand All @@ -22,6 +26,15 @@ def test_ServiceManager2(mock_specter, mock_flaskapp, caplog):
sm.execute_ext_callbacks(after_serverpy_init_app, scheduler=None)


def test_is_class_from_loaded_extension(mock_specter, mock_flaskapp):
with mock_flaskapp.app_context():
sm = ServiceManager(mock_specter, "alpha")
assert type(sm.services_sorted[0]) == SwanService
assert sm.is_class_from_loaded_extension(SwanClient)
assert sm.is_class_from_loaded_extension(SwanService)
assert not sm.is_class_from_loaded_extension(DevhelpService)


@pytest.mark.skip(reason="The .buildenv directoy does not exist on the CI-Infra")
def test_ServiceManager_get_service_x_dirs(caplog):
caplog.set_level(logging.DEBUG)
Expand Down

0 comments on commit 6f72a5c

Please sign in to comment.