Test/data availability and integrity #2

Merged (30 commits, Jan 27, 2025)
Commits
231a391
test: add api endpoints
romanzac Jan 16, 2025
a5a3f17
test: setup main nodes fixture
romanzac Jan 16, 2025
2fc0cd1
test: use setup_main_nodes fixture
romanzac Jan 17, 2025
0d32bcd
test: add main_nodes as cluster setup
romanzac Jan 17, 2025
b424456
fix: test data set to list
romanzac Jan 17, 2025
0eac80f
fix: convert string into list of bytes
romanzac Jan 17, 2025
b83019c
fix: app_id should be list of 32 integers
romanzac Jan 17, 2025
1025e36
fix: index should be list of 8 integers
romanzac Jan 17, 2025
53bb790
test: get_data_range
romanzac Jan 17, 2025
a56dbfc
fix: remove hardcoded values from disperse_dat
romanzac Jan 17, 2025
b028574
fix: add data extraction and decoding
romanzac Jan 20, 2025
578efed
fix: decode data bytes
romanzac Jan 20, 2025
536864e
fix: access response.content
romanzac Jan 20, 2025
c5f82e6
test: debug received data
romanzac Jan 20, 2025
01b6025
test: use json response
romanzac Jan 20, 2025
be897bf
test: show response
romanzac Jan 20, 2025
77ce25c
test: let the test decode data
romanzac Jan 20, 2025
7cdcc0d
test: type as bytes before decoding
romanzac Jan 20, 2025
d021e52
fix: main nodes setup to 2 node cl setup
romanzac Jan 21, 2025
c6a8b44
fix: add 5 node cluster config
romanzac Jan 21, 2025
696a6b2
test: default config for 5 nodes temporarily
romanzac Jan 21, 2025
ca1686a
test: more robust volume initialization
romanzac Jan 21, 2025
56a4f6b
fix: find executor node
romanzac Jan 21, 2025
4bc7c74
test: retrieve data from half of nodes
romanzac Jan 22, 2025
82abae7
fix: retrieve data from second node
romanzac Jan 22, 2025
1b0731f
fix: copy appropriate config for desired cluster
romanzac Jan 22, 2025
5f68514
fix: optimize setup fixtures
romanzac Jan 22, 2025
0c91fe7
fix: add padding for dispersal data
romanzac Jan 23, 2025
2738c9e
fix: remove unused logger
romanzac Jan 27, 2025
b4d32df
fix: skip tests pending on Nomos node changes
romanzac Jan 27, 2025
31 changes: 31 additions & 0 deletions cluster_config/cfgsync-2node.yaml
@@ -0,0 +1,31 @@
port: 4400
n_hosts: 2
timeout: 30

# ConsensusConfig related parameters
security_param: 10
active_slot_coeff: 0.9

# DaConfig related parameters
subnetwork_size: 2
dispersal_factor: 2
num_samples: 1
num_subnets: 2
old_blobs_check_interval_secs: 5
blobs_validity_duration_secs: 60
global_params_path: "/kzgrs_test_params"

# Tracing
tracing_settings:
  logger: Stdout
  tracing: !Otlp
    endpoint: http://tempo:4317/
    sample_ratio: 0.5
    service_name: node
  filter: !EnvFilter
    filters:
      nomos: debug
  metrics: !Otlp
    endpoint: http://prometheus:9090/api/v1/otlp/v1/metrics
    host_identifier: node
  level: INFO
31 changes: 31 additions & 0 deletions cluster_config/cfgsync-5node.yaml
@@ -0,0 +1,31 @@
port: 4400
n_hosts: 5
timeout: 30

# ConsensusConfig related parameters
security_param: 10
active_slot_coeff: 0.9

# DaConfig related parameters
subnetwork_size: 2
dispersal_factor: 2
num_samples: 1
num_subnets: 2
old_blobs_check_interval_secs: 5
blobs_validity_duration_secs: 60
global_params_path: "/kzgrs_test_params"

# Tracing
tracing_settings:
  logger: Stdout
  tracing: !Otlp
    endpoint: http://tempo:4317/
    sample_ratio: 0.5
    service_name: node
  filter: !EnvFilter
    filters:
      nomos: debug
  metrics: !Otlp
    endpoint: http://prometheus:9090/api/v1/otlp/v1/metrics
    host_identifier: node
  level: INFO
2 changes: 1 addition & 1 deletion cluster_config/cfgsync.yaml
@@ -1,5 +1,5 @@
port: 4400
-n_hosts: 2
+n_hosts: 5
timeout: 30

# ConsensusConfig related parameters
7 changes: 7 additions & 0 deletions src/node/api_clients/rest.py
@@ -23,3 +23,10 @@ def rest_call_text(self, method, endpoint, payload=None):
    def info(self):
        status_response = self.rest_call("get", "cryptarchia/info")
        return status_response.json()

    def send_dispersal_request(self, data):
        return self.rest_call("post", "disperse-data", json.dumps(data))

    def send_get_range(self, query):
        response = self.rest_call("post", "da/get-range", json.dumps(query))
        return response.json()
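For orientation, below is a sketch of the payload shapes these two client methods send, as constructed in src/steps/da.py later in this PR; the concrete values are illustrative only and not captured traffic.

```python
# Illustrative payloads for the two new REST calls. app_id is a list of 32
# integers and index a list of 8 integers, matching commits b83019c and 1025e36.
app_id = [0] * 31 + [1]
index = [0] * 8

# POSTed to "disperse-data"; the data bytes are padded to a multiple of 31
# by add_padding() in src/steps/da.py before being sent.
dispersal_payload = {
    "data": list("Hello World!".encode("utf-8")),  # shown unpadded for brevity
    "metadata": {"app_id": app_id, "index": index},
}

# POSTed to "da/get-range"; asks for blobs between two 8-integer indices.
get_range_payload = {
    "app_id": app_id,
    "range": {"start": [0] * 8, "end": [0] * 7 + [5]},
}
```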
13 changes: 11 additions & 2 deletions src/node/nomos_node.py
@@ -29,15 +29,15 @@ def __init__(self, node_type, container_name=""):
        self._internal_ports = nomos_nodes[node_type]["ports"]
        self._volumes = nomos_nodes[node_type]["volumes"]
        self._entrypoint = nomos_nodes[node_type]["entrypoint"]
+        self._node_type = node_type

        self._log_path = os.path.join(DOCKER_LOG_DIR, f"{container_name}__{self._image_name.replace('/', '_')}.log")
        self._docker_manager = DockerManager(self._image_name)
        self._container_name = container_name
        self._container = None

        cwd = os.getcwd()
-        for i, volume in enumerate(self._volumes):
-            self._volumes[i] = cwd + "/" + volume
+        self._volumes = [cwd + "/" + volume for volume in self._volumes]

        logger.debug(f"NomosNode instance initialized with log path {self._log_path}")

@@ -136,6 +136,9 @@ def is_nomos(self):
    def info(self):
        return self._api.info()

    def node_type(self):
        return self._node_type

    def check_nomos_log_errors(self, whitelist=None):
        keywords = LOG_ERROR_KEYWORDS

@@ -145,3 +148,9 @@ def check_nomos_log_errors(self, whitelist=None):

        matches = self._docker_manager.search_log_for_keywords(self._log_path, keywords, False)
        assert not matches, f"Found errors {matches}"

    def send_dispersal_request(self, data):
        return self._api.send_dispersal_request(data)

    def send_get_data_range_request(self, data):
        return self._api.send_get_range(data)
65 changes: 58 additions & 7 deletions src/steps/common.py
@@ -1,20 +1,71 @@
import inspect
import os
import shutil

import pytest

-from src.env_vars import NODE_1, NODE_2
+from src.env_vars import CFGSYNC, NOMOS, NOMOS_EXECUTOR
from src.libs.custom_logger import get_custom_logger
from src.node.nomos_node import NomosNode

logger = get_custom_logger(__name__)


def prepare_cluster_config(node_count):
    cwd = os.getcwd()
    config_dir = "cluster_config"
    src = f"{cwd}/{config_dir}/cfgsync-{node_count}node.yaml"
    dst = f"{cwd}/{config_dir}/cfgsync.yaml"
    shutil.copyfile(src, dst)


def start_nodes(nodes):
    for node in nodes:
        node.start()


def ensure_nodes_ready(nodes):
    for node in nodes:
        node.ensure_ready()


class StepsCommon:
    @pytest.fixture(scope="function", autouse=True)
    def cluster_setup(self):
        logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
        self.main_nodes = []

    @pytest.fixture(scope="function")
    def setup_2_node_cluster(self, request):
        logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
        prepare_cluster_config(2)
        self.node1 = NomosNode(CFGSYNC, "cfgsync")
        self.node2 = NomosNode(NOMOS, "nomos_node_0")
        self.node3 = NomosNode(NOMOS_EXECUTOR, "nomos_node_1")
        self.main_nodes.extend([self.node1, self.node2, self.node3])
        start_nodes(self.main_nodes)

        try:
            ensure_nodes_ready(self.main_nodes[2:])
        except Exception as ex:
            logger.error(f"REST service did not become ready in time: {ex}")
            raise

    @pytest.fixture(scope="function")
-    def setup_main_nodes(self, request):
+    def setup_5_node_cluster(self, request):
        logger.debug(f"Running fixture setup: {inspect.currentframe().f_code.co_name}")
-        self.node1 = NomosNode(NODE_1, f"node1_{request.cls.test_id}")
-        self.node1.start()
-        self.node2 = NomosNode(NODE_2, f"node2_{request.cls.test_id}")
-        self.node2.start()
-        self.main_nodes.extend([self.node1, self.node2])
+        prepare_cluster_config(5)
+        self.node1 = NomosNode(CFGSYNC, "cfgsync")
+        self.node2 = NomosNode(NOMOS, "nomos_node_0")
+        self.node3 = NomosNode(NOMOS, "nomos_node_1")
+        self.node4 = NomosNode(NOMOS, "nomos_node_2")
+        self.node5 = NomosNode(NOMOS, "nomos_node_3")
+        self.node6 = NomosNode(NOMOS_EXECUTOR, "nomos_node_4")
+        self.main_nodes.extend([self.node1, self.node2, self.node3, self.node4, self.node5, self.node6])
+        start_nodes(self.main_nodes)

        try:
            ensure_nodes_ready(self.main_nodes[2:])
        except Exception as ex:
            logger.error(f"REST service did not become ready in time: {ex}")
            raise
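prepare_cluster_config simply copies the per-size template over cluster_config/cfgsync.yaml. A possible sanity check before starting containers is sketched below; check_cluster_config is a hypothetical helper under that assumption, not part of this PR.

```python
import os


def check_cluster_config(node_count):
    """Assert that cluster_config/cfgsync.yaml was prepared for node_count hosts.

    The file uses custom YAML tags (!Otlp, !EnvFilter), so a plain text check
    on the n_hosts line avoids needing a custom YAML loader.
    """
    path = os.path.join(os.getcwd(), "cluster_config", "cfgsync.yaml")
    with open(path) as config_file:
        lines = [line.strip() for line in config_file]
    assert f"n_hosts: {node_count}" in lines, f"cfgsync.yaml does not declare n_hosts: {node_count}"
```

Calling check_cluster_config(5) right after prepare_cluster_config(5) in the fixture would fail fast if the wrong template had been copied.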
65 changes: 65 additions & 0 deletions src/steps/da.py
@@ -0,0 +1,65 @@
import allure

from src.env_vars import NOMOS_EXECUTOR
from src.steps.common import StepsCommon


def add_padding(orig_bytes):
    """
    Pads a list of bytes (integers in [0..255]) using a PKCS#7-like scheme:
    - The value of each padded byte is the number of bytes padded.
    - If the original data is already a multiple of the block size,
      an additional full block of bytes (each the block size) is added.
    """
    block_size = 31
    original_len = len(orig_bytes)
    padding_needed = block_size - (original_len % block_size)
    # If the data is already a multiple of block_size, add a full block of padding
    if padding_needed == 0:
        padding_needed = block_size

    # Each padded byte will be equal to padding_needed
    padded_bytes = orig_bytes + [padding_needed] * padding_needed
    return padded_bytes


def prepare_dispersal_request(data, app_id, index):
    data_bytes = data.encode("utf-8")
    padded_bytes = add_padding(list(data_bytes))
    dispersal_data = {"data": padded_bytes, "metadata": {"app_id": app_id, "index": index}}
    return dispersal_data


def prepare_get_range_request(app_id, start_index, end_index):
    query_data = {"app_id": app_id, "range": {"start": start_index, "end": end_index}}
    return query_data


class StepsDataAvailability(StepsCommon):

    def find_executor_node(self):
        executor = {}
        for node in self.main_nodes:
            if node.node_type() == NOMOS_EXECUTOR:
                executor = node
        return executor

    @allure.step
    def disperse_data(self, data, app_id, index):
        request = prepare_dispersal_request(data, app_id, index)
        executor = self.find_executor_node()
        try:
            executor.send_dispersal_request(request)
        except Exception as ex:
            assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)

    @allure.step
    def get_data_range(self, node, app_id, start, end):
        response = []
        query = prepare_get_range_request(app_id, start, end)
        try:
            response = node.send_get_data_range_request(query)
        except Exception as ex:
            assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)

        return response
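The padding scheme above is invertible, which the retrieval tests will need once blob reconstruction is available. A minimal sketch of the inverse follows; remove_padding is a hypothetical helper and is not part of this PR.

```python
from src.steps.da import add_padding


def remove_padding(padded_bytes):
    """Strip the PKCS#7-like padding added by add_padding.

    The last byte gives the number of padding bytes; all padding bytes must
    carry that value, otherwise the input is treated as corrupted.
    """
    if not padded_bytes:
        raise ValueError("cannot remove padding from empty input")
    pad_len = padded_bytes[-1]
    if pad_len < 1 or pad_len > len(padded_bytes):
        raise ValueError(f"invalid padding length: {pad_len}")
    if any(byte != pad_len for byte in padded_bytes[-pad_len:]):
        raise ValueError("inconsistent padding bytes")
    return padded_bytes[:-pad_len]


# Round trip against the first dispersal sample.
original = list("Hello World!".encode("utf-8"))
assert remove_padding(add_padding(original)) == original
```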
10 changes: 10 additions & 0 deletions src/test_data.py
@@ -24,3 +24,13 @@
"race condition",
"double free",
]

DATA_TO_DISPERSE = [
"Hello World!",
"1234567890",
'{"key": "value"}',
"这是一些中文",
"🚀🌟✨",
"Lorem ipsum dolor sit amet",
"<html><body>Hello</body></html>",
]
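DATA_TO_DISPERSE mixes plain ASCII, digits, JSON, Chinese text, emoji, and HTML so dispersal can be exercised across different byte lengths and encodings. One possible way to run a dispersal check over every sample is pytest parametrization; the test class below is a sketch under that assumption and is not part of this PR.

```python
import pytest

from src.steps.da import StepsDataAvailability
from src.test_data import DATA_TO_DISPERSE


class TestDispersalSamples(StepsDataAvailability):
    main_nodes = []

    @pytest.mark.usefixtures("setup_2_node_cluster")
    @pytest.mark.parametrize("payload", DATA_TO_DISPERSE)
    def test_disperse_each_sample(self, payload):
        # Same app_id/index values as the existing data integrity tests;
        # the function-scoped fixture restarts the cluster for each sample.
        self.disperse_data(payload, [0] * 31 + [1], [0] * 8)
```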
32 changes: 28 additions & 4 deletions tests/data_integrity/test_data_integrity.py
@@ -1,6 +1,30 @@
-class TestDataIntegrity:
import pytest

from src.libs.custom_logger import get_custom_logger
from src.steps.da import StepsDataAvailability
from src.test_data import DATA_TO_DISPERSE

logger = get_custom_logger(__name__)

[Review comment] Collaborator: doesn't seem to be used
[Review comment] Author: Fixed at 2738c9e

class TestDataIntegrity(StepsDataAvailability):
    main_nodes = []

-    def test_cluster_start(self):
-        for node in self.main_nodes:
-            print(node)
    @pytest.mark.usefixtures("setup_5_node_cluster")
    def test_da_identify_retrieve_missing_columns(self):
        self.disperse_data(DATA_TO_DISPERSE[0], [0] * 31 + [1], [0] * 8)
        received_data = []
        # Get data only from half of nodes
        for node in self.main_nodes[2:4]:
            received_data.append(self.get_data_range(node, [0] * 31 + [1], [0] * 8, [0] * 7 + [3]))

        # Use received blob data to reconstruct the original data
        # nomos-cli reconstruct command required
        reconstructed_data = []
        assert DATA_TO_DISPERSE[0] == bytes(reconstructed_data).decode("utf-8")

[Review comment] Collaborator: but bytes(reconstructed_data).decode("utf-8") is an empty string, while DATA_TO_DISPERSE[0] is "Hello World!"; what are we asserting here?
[Review comment] Author: Yes, nomos-cli should be called to reconstruct the data from the received blobs (PR logos-co/nomos-node#994). I would treat nomos-cli as our go-to verification tool for now. We could build our own tooling to stay independent, but that would come at a cost; we can discuss it further in the meeting.
[Review comment] Collaborator: OK, but for now the assert will not pass, right?
[Review comment] Author: Tests skipped at b4d32df.

    @pytest.mark.usefixtures("setup_2_node_cluster")
    def test_da_sampling_determines_data_presence(self):
        self.disperse_data(DATA_TO_DISPERSE[0], [0] * 31 + [1], [0] * 8)
        received_data = self.get_data_range(self.node2, [0] * 31 + [1], [0] * 8, [0] * 7 + [5])
        assert DATA_TO_DISPERSE[0] == bytes(received_data[0][1]).decode("utf-8")
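The assertion above implies a particular response shape from da/get-range: a list with one entry per returned blob, each pairing an index with the blob's byte values, so received_data[0][1] is the first blob's bytes. A small illustration of that assumed shape follows; the values are illustrative, and in practice the padding added at dispersal would also need to be stripped before decoding.

```python
# Assumed get-range response shape, implied by received_data[0][1] above.
received_data = [
    [[0] * 8, list("Hello World!".encode("utf-8"))],  # [index, blob bytes]
]

assert bytes(received_data[0][1]).decode("utf-8") == "Hello World!"
```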
22 changes: 6 additions & 16 deletions tests/e2e/test_2node_alive.py
@@ -1,24 +1,14 @@
import pytest

-from src.env_vars import CFGSYNC, NOMOS, NOMOS_EXECUTOR
from src.libs.custom_logger import get_custom_logger
-from src.node.nomos_node import NomosNode
+from src.steps.common import StepsCommon

logger = get_custom_logger(__name__)


-class Test2NodeClAlive:
+class Test2NodeClAlive(StepsCommon):
+    @pytest.mark.usefixtures("setup_2_node_cluster")
    def test_cluster_start(self):
-
-        self.node1 = NomosNode(CFGSYNC, "cfgsync")
-        self.node2 = NomosNode(NOMOS, "nomos_node_0")
-        self.node3 = NomosNode(NOMOS_EXECUTOR, "nomos_node_1")
-
-        self.node1.start()
-        self.node2.start()
-        self.node3.start()
-
-        try:
-            self.node2.ensure_ready()
-            self.node3.ensure_ready()
-        except Exception as ex:
-            logger.error(f"REST service did not become ready in time: {ex}")
-            raise
+        logger.debug("Two node cluster started successfully!")