diff --git a/.gitignore b/.gitignore index 309862d..8c3ccce 100644 --- a/.gitignore +++ b/.gitignore @@ -143,3 +143,7 @@ dmypy.json cnt/* tar/ cer-feeds.json +*.whl +pairs.py +*cer-feeds.json +NOTES diff --git a/src/collector_node/collector_node.py b/src/collector_node/collector_node.py index fd06cb1..15e8dba 100644 --- a/src/collector_node/collector_node.py +++ b/src/collector_node/collector_node.py @@ -13,6 +13,7 @@ ``` """ + # pylint: disable=C0412 import argparse @@ -43,26 +44,24 @@ import config import feed_helper import flock - from cnt_collector_node.helper_functions import check_tokens_pair - from cnt_collector_node.pairs import DEX_PAIRS from version import get_version except ModuleNotFoundError: try: - from cnt_collector_node.helper_functions import check_tokens_pair - from cnt_collector_node.pairs import DEX_PAIRS - from collector_node import config, feed_helper, flock from collector_node.version import get_version except ModuleNotFoundError: from src.collector_node import config, feed_helper, flock from src.collector_node.version import get_version - try: - from src.cnt_collector_node.helper_functions import check_tokens_pair - from src.cnt_collector_node.pairs import DEX_PAIRS - except ModuleNotFoundError: - CNT_ENABLED = False +try: + # Import CNT related config. + import cnt_collector_node.database_abstraction as dba + import cnt_collector_node.global_helpers as helpers + from cnt_collector_node import load_pairs + from cnt_collector_node.helper_functions import check_tokens_pair +except ModuleNotFoundError: + CNT_ENABLED = False sys.dont_write_bytecode = True @@ -106,52 +105,57 @@ async def retrieve_cnt(requested: list, identity: dict) -> list: logger.info("connecting to the cnt database") conn = sqlite3.connect(config.CNT_DB_NAME) cur = conn.cursor() - database_context = { - "conn": conn, - "cur": cur, - } res = [] logger.info("connecting to ogmios") ogmios_ws: websocket.WebSocket = websocket.create_connection(config.OGMIOS_URL) use_kupo = False if config.KUPO_URL: + logger.info("using kupo: %s", use_kupo) use_kupo = True kupo_url = config.KUPO_URL - context = { - "ogmios_ws": ogmios_ws, - "logger": logger, - "use_kupo": use_kupo, - "kupo_url": kupo_url, - } + database = dba.DBObject(connection=conn, cursor=cur) + app_context = helpers.AppContext( + db_name=config.CNT_DB_NAME, + database=database, + ogmios_url=config.OGMIOS_URL, + ogmios_ws=ogmios_ws, + kupo_url=kupo_url, + use_kupo=use_kupo, + main_event=None, + thread_event=None, + reconnect_event=None, + ) for tokens_pair in requested: message, timestamp = await check_tokens_pair( - database_context, - context, - identity, - tokens_pair, + app_context=app_context, + identity=identity, + tokens_pair=tokens_pair, ) + if not message: + logger.error("no message returned for: '%s'", tokens_pair["name"]) + continue message = { "message": message, "node_id": identity["node_id"], "validation_timestamp": timestamp, } - if not message: - logger.error("no message returned for: '%s'", tokens_pair["name"]) - continue res.append(message) return res -async def fetch_dex_feeds(feeds: list, identity: dict) -> list: +async def fetch_dex_feeds(feeds: list, pairs_file: str, identity: dict) -> list: """Retrieve dex data from the CNT indexer.""" logger.debug("retrieving dex pairs") pairs = [] - for dex_pair in DEX_PAIRS: + dex_pairs = load_pairs.load(path=pairs_file) + for dex_pair in dex_pairs.DEX_PAIRS: if dex_pair["name"] not in feeds: continue pairs.append(dex_pair) logger.info( - "retrieving: '%s' pairs, from '%s' original pairs", len(pairs), len(DEX_PAIRS) + "retrieving: '%s' pairs, from '%s' original pairs", + len(pairs), + len(dex_pairs.DEX_PAIRS), ) return await retrieve_cnt(pairs, identity) @@ -237,13 +241,13 @@ async def send_to_ws(validator_websocket, data_to_send: dict): return -async def collect_dex(dex_feeds: list, identity: dict) -> list: +async def collect_dex(dex_feeds: list, pairs_file: str, identity: dict) -> list: """Collect dex data and provide a way to exit gracefully if the configuration is incorrect. """ data_dex = [] if CNT_ENABLED: - data_dex = await fetch_dex_feeds(dex_feeds, identity) + data_dex = await fetch_dex_feeds(dex_feeds, pairs_file, identity) return data_dex @@ -297,7 +301,7 @@ async def send_data_to_validator( logger.error("problem connecting to the validator: %s", err) -async def fetch_and_send(feeds: list, identity: dict) -> None: +async def fetch_and_send(feeds: list, pairs_file: str, identity: dict) -> None: """Fetch feed data and send it to a validator websocket.""" logger.debug("in fetch and send for all feeds") @@ -316,7 +320,7 @@ async def fetch_and_send(feeds: list, identity: dict) -> None: logger.debug("len dex feeds: '%s'", len(dex_feeds)) data_cex = fetch_cex_feeds(cex_feeds) - data_dex = await collect_dex(dex_feeds, identity) + data_dex = await collect_dex(dex_feeds, pairs_file, identity) id_ = identity["node_id"] validator_uris = [] @@ -333,7 +337,7 @@ async def fetch_and_send(feeds: list, identity: dict) -> None: await asyncio.gather(*tasks) -async def collector_main(feeds_file: str): +async def collector_main(feeds_file: str, pairs_file: str): """Collector node main. The script is designed so that it is staggered between 1 and 20 seconds @@ -352,7 +356,7 @@ async def collector_main(feeds_file: str): await asyncio.sleep(run_interval) identity = await read_identity() feeds = await feed_helper.read_feeds_file(feeds_file=feeds_file) - await fetch_and_send(feeds=feeds, identity=identity) + await fetch_and_send(feeds=feeds, pairs_file=pairs_file, identity=identity) def main(): @@ -367,6 +371,12 @@ def main(): help="feed data describing feeds being monitored (CER-feeds (JSON))", required=True, ) + parser.add_argument( + "--pairs", + "-p", + help="supply a pairs file to the script", + required=False, + ) parser.add_argument( "--debug", help="enable debug logging (verbose)", @@ -383,7 +393,12 @@ def main(): try: with flock.FlockContext(flock_name_base="cnode_runner"): try: - asyncio.run(collector_main(feeds_file=args.feeds)) + asyncio.run( + collector_main( + feeds_file=args.feeds, + pairs_file=args.pairs, + ) + ) # pylint: disable=W0718 # global catch, if this doesn't run, nothing does. except Exception as err: logger.debug("error: %s", repr(err))