Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,7 @@ dmypy.json
cnt/*
tar/
cer-feeds.json
*.whl
pairs.py
*cer-feeds.json
NOTES
89 changes: 52 additions & 37 deletions src/collector_node/collector_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
```

"""

# pylint: disable=C0412

import argparse
Expand Down Expand Up @@ -43,26 +44,24 @@
import config
import feed_helper
import flock
from cnt_collector_node.helper_functions import check_tokens_pair
from cnt_collector_node.pairs import DEX_PAIRS
from version import get_version
except ModuleNotFoundError:
try:
from cnt_collector_node.helper_functions import check_tokens_pair
from cnt_collector_node.pairs import DEX_PAIRS

from collector_node import config, feed_helper, flock
from collector_node.version import get_version
except ModuleNotFoundError:
from src.collector_node import config, feed_helper, flock
from src.collector_node.version import get_version

try:
from src.cnt_collector_node.helper_functions import check_tokens_pair
from src.cnt_collector_node.pairs import DEX_PAIRS
except ModuleNotFoundError:
CNT_ENABLED = False
try:
# Import CNT related config.
import cnt_collector_node.database_abstraction as dba
import cnt_collector_node.global_helpers as helpers
from cnt_collector_node import load_pairs
from cnt_collector_node.helper_functions import check_tokens_pair

except ModuleNotFoundError:
CNT_ENABLED = False

sys.dont_write_bytecode = True

Expand Down Expand Up @@ -106,52 +105,57 @@ async def retrieve_cnt(requested: list, identity: dict) -> list:
logger.info("connecting to the cnt database")
conn = sqlite3.connect(config.CNT_DB_NAME)
cur = conn.cursor()
database_context = {
"conn": conn,
"cur": cur,
}
res = []
logger.info("connecting to ogmios")
ogmios_ws: websocket.WebSocket = websocket.create_connection(config.OGMIOS_URL)
use_kupo = False
if config.KUPO_URL:
logger.info("using kupo: %s", use_kupo)
use_kupo = True
kupo_url = config.KUPO_URL
context = {
"ogmios_ws": ogmios_ws,
"logger": logger,
"use_kupo": use_kupo,
"kupo_url": kupo_url,
}
database = dba.DBObject(connection=conn, cursor=cur)
app_context = helpers.AppContext(
db_name=config.CNT_DB_NAME,
database=database,
ogmios_url=config.OGMIOS_URL,
ogmios_ws=ogmios_ws,
kupo_url=kupo_url,
use_kupo=use_kupo,
main_event=None,
thread_event=None,
reconnect_event=None,
)
for tokens_pair in requested:
message, timestamp = await check_tokens_pair(
database_context,
context,
identity,
tokens_pair,
app_context=app_context,
identity=identity,
tokens_pair=tokens_pair,
)
if not message:
logger.error("no message returned for: '%s'", tokens_pair["name"])
continue
message = {
"message": message,
"node_id": identity["node_id"],
"validation_timestamp": timestamp,
}
if not message:
logger.error("no message returned for: '%s'", tokens_pair["name"])
continue
res.append(message)
return res


async def fetch_dex_feeds(feeds: list, identity: dict) -> list:
async def fetch_dex_feeds(feeds: list, pairs_file: str, identity: dict) -> list:
    """Retrieve dex data from the CNT indexer.

    Loads the configured DEX pairs from ``pairs_file``, narrows them to
    those whose name appears in ``feeds``, and hands the selection to
    ``retrieve_cnt`` for collection.
    """
    logger.debug("retrieving dex pairs")
    dex_pairs = load_pairs.load(path=pairs_file)
    # Keep only the configured pairs that were actually requested.
    pairs = [pair for pair in dex_pairs.DEX_PAIRS if pair["name"] in feeds]
    logger.info(
        "retrieving: '%s' pairs, from '%s' original pairs",
        len(pairs),
        len(dex_pairs.DEX_PAIRS),
    )
    return await retrieve_cnt(pairs, identity)

Expand Down Expand Up @@ -237,13 +241,13 @@ async def send_to_ws(validator_websocket, data_to_send: dict):
return


async def collect_dex(dex_feeds: list, identity: dict) -> list:
async def collect_dex(dex_feeds: list, pairs_file: str, identity: dict) -> list:
"""Collect dex data and provide a way to exit gracefully if the
configuration is incorrect.
"""
data_dex = []
if CNT_ENABLED:
data_dex = await fetch_dex_feeds(dex_feeds, identity)
data_dex = await fetch_dex_feeds(dex_feeds, pairs_file, identity)
return data_dex


Expand Down Expand Up @@ -297,7 +301,7 @@ async def send_data_to_validator(
logger.error("problem connecting to the validator: %s", err)


async def fetch_and_send(feeds: list, identity: dict) -> None:
async def fetch_and_send(feeds: list, pairs_file: str, identity: dict) -> None:
"""Fetch feed data and send it to a validator websocket."""

logger.debug("in fetch and send for all feeds")
Expand All @@ -316,7 +320,7 @@ async def fetch_and_send(feeds: list, identity: dict) -> None:
logger.debug("len dex feeds: '%s'", len(dex_feeds))

data_cex = fetch_cex_feeds(cex_feeds)
data_dex = await collect_dex(dex_feeds, identity)
data_dex = await collect_dex(dex_feeds, pairs_file, identity)

id_ = identity["node_id"]
validator_uris = []
Expand All @@ -333,7 +337,7 @@ async def fetch_and_send(feeds: list, identity: dict) -> None:
await asyncio.gather(*tasks)


async def collector_main(feeds_file: str):
async def collector_main(feeds_file: str, pairs_file: str):
"""Collector node main.

The script is designed so that it is staggered between 1 and 20 seconds
Expand All @@ -352,7 +356,7 @@ async def collector_main(feeds_file: str):
await asyncio.sleep(run_interval)
identity = await read_identity()
feeds = await feed_helper.read_feeds_file(feeds_file=feeds_file)
await fetch_and_send(feeds=feeds, identity=identity)
await fetch_and_send(feeds=feeds, pairs_file=pairs_file, identity=identity)


def main():
Expand All @@ -367,6 +371,12 @@ def main():
help="feed data describing feeds being monitored (CER-feeds (JSON))",
required=True,
)
parser.add_argument(
"--pairs",
"-p",
help="supply a pairs file to the script",
required=False,
)
parser.add_argument(
"--debug",
help="enable debug logging (verbose)",
Expand All @@ -383,7 +393,12 @@ def main():
try:
with flock.FlockContext(flock_name_base="cnode_runner"):
try:
asyncio.run(collector_main(feeds_file=args.feeds))
asyncio.run(
collector_main(
feeds_file=args.feeds,
pairs_file=args.pairs,
)
)
# pylint: disable=W0718 # global catch, if this doesn't run, nothing does.
except Exception as err:
logger.debug("error: %s", repr(err))
Expand Down