From ee8624b70d328853cb6d21a3f348e6ac78810878 Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Fri, 12 Dec 2025 17:35:20 +0100 Subject: [PATCH 1/4] WIP: dynamic pairs --- src/collector_node/collector_node.py | 52 +++++++++++++++++----------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/src/collector_node/collector_node.py b/src/collector_node/collector_node.py index fd06cb1..708f312 100644 --- a/src/collector_node/collector_node.py +++ b/src/collector_node/collector_node.py @@ -13,6 +13,7 @@ ``` """ + # pylint: disable=C0412 import argparse @@ -43,26 +44,21 @@ import config import feed_helper import flock - from cnt_collector_node.helper_functions import check_tokens_pair - from cnt_collector_node.pairs import DEX_PAIRS from version import get_version except ModuleNotFoundError: try: - from cnt_collector_node.helper_functions import check_tokens_pair - from cnt_collector_node.pairs import DEX_PAIRS - from collector_node import config, feed_helper, flock from collector_node.version import get_version except ModuleNotFoundError: from src.collector_node import config, feed_helper, flock from src.collector_node.version import get_version - try: - from src.cnt_collector_node.helper_functions import check_tokens_pair - from src.cnt_collector_node.pairs import DEX_PAIRS - except ModuleNotFoundError: - CNT_ENABLED = False - +try: + # Import CNT related config. + from cnt_collector_node.helper_functions import check_tokens_pair + from cnt_collector_node import load_pairs +except ModuleNotFoundError: + CNT_ENABLED = False sys.dont_write_bytecode = True @@ -142,16 +138,19 @@ async def retrieve_cnt(requested: list, identity: dict) -> list: return res -async def fetch_dex_feeds(feeds: list, identity: dict) -> list: +async def fetch_dex_feeds(feeds: list, pairs_file: str, identity: dict) -> list: """Retrieve dex data from the CNT indexer.""" logger.debug("retrieving dex pairs") pairs = [] - for dex_pair in DEX_PAIRS: + dex_pairs = load_pairs.load(path=pairs_file) + for dex_pair in dex_pairs.DEX_PAIRS: if dex_pair["name"] not in feeds: continue pairs.append(dex_pair) logger.info( - "retrieving: '%s' pairs, from '%s' original pairs", len(pairs), len(DEX_PAIRS) + "retrieving: '%s' pairs, from '%s' original pairs", + len(pairs), + len(dex_pairs.DEX_PAIRS), ) return await retrieve_cnt(pairs, identity) @@ -237,13 +236,13 @@ async def send_to_ws(validator_websocket, data_to_send: dict): return -async def collect_dex(dex_feeds: list, identity: dict) -> list: +async def collect_dex(dex_feeds: list, pairs_file: str, identity: dict) -> list: """Collect dex data and provide a way to exit gracefully if the configuration is incorrect. """ data_dex = [] if CNT_ENABLED: - data_dex = await fetch_dex_feeds(dex_feeds, identity) + data_dex = await fetch_dex_feeds(dex_feeds, pairs_file, identity) return data_dex @@ -297,7 +296,7 @@ async def send_data_to_validator( logger.error("problem connecting to the validator: %s", err) -async def fetch_and_send(feeds: list, identity: dict) -> None: +async def fetch_and_send(feeds: list, pairs_file: str, identity: dict) -> None: """Fetch feed data and send it to a validator websocket.""" logger.debug("in fetch and send for all feeds") @@ -316,7 +315,7 @@ async def fetch_and_send(feeds: list, identity: dict) -> None: logger.debug("len dex feeds: '%s'", len(dex_feeds)) data_cex = fetch_cex_feeds(cex_feeds) - data_dex = await collect_dex(dex_feeds, identity) + data_dex = await collect_dex(dex_feeds, pairs_file, identity) id_ = identity["node_id"] validator_uris = [] @@ -333,7 +332,7 @@ async def fetch_and_send(feeds: list, identity: dict) -> None: await asyncio.gather(*tasks) -async def collector_main(feeds_file: str): +async def collector_main(feeds_file: str, pairs_file: str): """Collector node main. The script is designed so that it is staggered between 1 and 20 seconds @@ -352,7 +351,7 @@ async def collector_main(feeds_file: str): await asyncio.sleep(run_interval) identity = await read_identity() feeds = await feed_helper.read_feeds_file(feeds_file=feeds_file) - await fetch_and_send(feeds=feeds, identity=identity) + await fetch_and_send(feeds=feeds, pairs_file=pairs_file, identity=identity) def main(): @@ -367,6 +366,12 @@ def main(): help="feed data describing feeds being monitored (CER-feeds (JSON))", required=True, ) + parser.add_argument( + "--pairs", + "-p", + help="supply a pairs file to the script", + required=False, + ) parser.add_argument( "--debug", help="enable debug logging (verbose)", @@ -383,7 +388,12 @@ def main(): try: with flock.FlockContext(flock_name_base="cnode_runner"): try: - asyncio.run(collector_main(feeds_file=args.feeds)) + asyncio.run( + collector_main( + feeds_file=args.feeds, + pairs_file=args.pairs, + ) + ) # pylint: disable=W0718 # global catch, if this doesn't run, nothing does. except Exception as err: logger.debug("error: %s", repr(err)) From f3c5c9cb204f7b8d6c3f20c4b00092cb7f847b91 Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Mon, 15 Dec 2025 13:52:34 +0100 Subject: [PATCH 2/4] WIP: updated collector --- src/collector_node/collector_node.py | 35 ++++++++++++++++------------ 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/src/collector_node/collector_node.py b/src/collector_node/collector_node.py index 708f312..d1cf45b 100644 --- a/src/collector_node/collector_node.py +++ b/src/collector_node/collector_node.py @@ -55,8 +55,11 @@ try: # Import CNT related config. - from cnt_collector_node.helper_functions import check_tokens_pair + import cnt_collector_node.database_abstraction as dba + import cnt_collector_node.global_helpers as helpers from cnt_collector_node import load_pairs + from cnt_collector_node.helper_functions import check_tokens_pair + except ModuleNotFoundError: CNT_ENABLED = False @@ -102,29 +105,31 @@ async def retrieve_cnt(requested: list, identity: dict) -> list: logger.info("connecting to the cnt database") conn = sqlite3.connect(config.CNT_DB_NAME) cur = conn.cursor() - database_context = { - "conn": conn, - "cur": cur, - } res = [] logger.info("connecting to ogmios") ogmios_ws: websocket.WebSocket = websocket.create_connection(config.OGMIOS_URL) use_kupo = False if config.KUPO_URL: + logger.info("using kupo: %s", use_kupo) use_kupo = True kupo_url = config.KUPO_URL - context = { - "ogmios_ws": ogmios_ws, - "logger": logger, - "use_kupo": use_kupo, - "kupo_url": kupo_url, - } + database = dba.DBObject(connection=conn, cursor=cur) + app_context = helpers.AppContext( + db_name=config.CNT_DB_NAME, + database=database, + ogmios_url=config.OGMIOS_URL, + ogmios_ws=ogmios_ws, + kupo_url=kupo_url, + use_kupo=use_kupo, + main_event=None, + thread_event=None, + reconnect_event=None, + ) for tokens_pair in requested: message, timestamp = await check_tokens_pair( - database_context, - context, - identity, - tokens_pair, + app_context=app_context, + identity=identity, + tokens_pair=tokens_pair, ) message = { "message": message, From 0d05681d7cae79f47871f796fd8b487c8a23c4b9 Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Mon, 15 Dec 2025 13:53:41 +0100 Subject: [PATCH 3/4] WIP: update .gitignore --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.gitignore b/.gitignore index 309862d..8c3ccce 100644 --- a/.gitignore +++ b/.gitignore @@ -143,3 +143,7 @@ dmypy.json cnt/* tar/ cer-feeds.json +*.whl +pairs.py +*cer-feeds.json +NOTES From e1a140fcf71ec2f959781aa2c7d810fac62bbe9f Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Mon, 12 Jan 2026 00:27:44 +0100 Subject: [PATCH 4/4] WIP: log and don't send non-messages --- src/collector_node/collector_node.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/collector_node/collector_node.py b/src/collector_node/collector_node.py index d1cf45b..15e8dba 100644 --- a/src/collector_node/collector_node.py +++ b/src/collector_node/collector_node.py @@ -131,14 +131,14 @@ async def retrieve_cnt(requested: list, identity: dict) -> list: identity=identity, tokens_pair=tokens_pair, ) + if not message: + logger.error("no message returned for: '%s'", tokens_pair["name"]) + continue message = { "message": message, "node_id": identity["node_id"], "validation_timestamp": timestamp, } - if not message: - logger.error("no message returned for: '%s'", tokens_pair["name"]) - continue res.append(message) return res