Skip to content

feat(netsim): added support for playbooks #39

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .github/workflows/netsim_integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,36 @@ jobs:
git clone https://github.com/n0-computer/iroh.git
cd iroh
cargo build --release

- name: Fetch and build iroh-ffi
run: |
git clone https://github.com/n0-computer/iroh-ffi.git
cd iroh-ffi
echo "iroh = { path = \"../iroh\" }" >> Cargo.toml
pip3 install maturin uniffi-bindgen
maturin build --release

- name: Copy binaries to right location
run: |
cp target/release/chuck netsim/bins/chuck
cp iroh/target/release/iroh netsim/bins/iroh
cp iroh-ffi/target/wheels/iroh-*-py3-none-manylinux_2_34_x86_64.whl ./netsim/bins/

- name: Setup python venv
run: |
cd netsim
python3 -m venv venv
source venv/bin/activate
pip3 install bins/iroh-*.whl
pip3 install -r playbooks/requirements.txt

- name: Run tests
run: |
cd netsim
sudo kill -9 $(pgrep ovs)
sudo mn --clean
sudo python3 main.py --integration sims/standard/iroh.json
sudo python3 main.py --integration sims/example/playbook.json

- name: Setup Environment (PR)
if: ${{ github.event_name == 'pull_request' }}
Expand Down
78 changes: 52 additions & 26 deletions netsim/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,32 @@ def logs_on_error(nodes, prefix):
else:
print('[WARN] log file missing: %s' % log_name)

def build_cmd(node, i, node_ips, node_params, node_counts):
cmd = node['cmd']
if 'param' in node:
if node['param'] == 'id':
cmd = cmd % i
if node['connect']['strategy'] == 'plain':
cnt = node_counts[node['connect']['node']]
id = i % cnt
connect_to = '%s_%d' % (node['connect']['node'], id)
ip = node_ips[connect_to]
cmd = cmd % ip
if node['connect']['strategy'] == 'plain_with_id':
cnt = node_counts[node['connect']['node']]
id = i % cnt
connect_to = '%s_%d' % (node['connect']['node'], id)
ip = node_ips[connect_to]
cmd = cmd % (ip, id)
if node['connect']['strategy'] == 'params':
cnt = node_counts[node['connect']['node']]
id = i % cnt
connect_to = '%s_%d' % (node['connect']['node'], id)
param = node_params[connect_to]
cmd = cmd % (param)
return cmd


def run(nodes, prefix, args, debug=False, full_debug=False, visualize=False):
integration = args.integration
topo = StarTopo(nodes=nodes)
Expand Down Expand Up @@ -72,42 +98,39 @@ def run(nodes, prefix, args, debug=False, full_debug=False, visualize=False):

temp_dirs = []

ftc = []

for node in nodes:
node_counts[node['name']] = int(node['count'])
for i in range(int(node['count'])):
node_name = '%s_%d' % (node['name'], i)
f = open('logs/%s__%s.txt' % (prefix, node_name), 'w+')
ftc.append(f)
n = net.get(node_name)
node_ips[node_name] = n.IP()
cmd = node['cmd']
if 'param' in node:
if node['param'] == 'id':
cmd = cmd % i
if node['connect']['strategy'] == 'plain':
cnt = node_counts[node['connect']['node']]
id = i % cnt
connect_to = '%s_%d' % (node['connect']['node'], id)
ip = node_ips[connect_to]
cmd = cmd % ip
if node['connect']['strategy'] == 'plain_with_id':
cnt = node_counts[node['connect']['node']]
id = i % cnt
connect_to = '%s_%d' % (node['connect']['node'], id)
ip = node_ips[connect_to]
cmd = cmd % (ip, id)
if node['connect']['strategy'] == 'params':
cnt = node_counts[node['connect']['node']]
id = i % cnt
connect_to = '%s_%d' % (node['connect']['node'], id)
param = node_params[connect_to]
cmd = cmd % (param)
cmd = ""
if 'cmd' in node:
cmd = build_cmd(node, i, node_ips, node_params, node_counts)
elif 'playbook' in node:
requirements_path = node['playbook']['requirements']
playbook_path = node['playbook']['path']
cmd = f"source venv/bin/activate && python3 playbooks/{playbook_path}"
if node['connect']['strategy'] == 'plain':
cnt = node_counts[node['connect']['node']]
id = i % cnt
connect_to = '%s_%d' % (node['connect']['node'], id)
ip = node_ips[connect_to]
cmd = cmd + ' ' + ip
else:
print("error: no command or playbook specified")
exit(1)
# cleanup_run = subprocess.run("sudo rm -rf /root/.local/share/iroh", shell=True, capture_output=True)
time.sleep(0.1)
env_vars['SSLKEYLOGFILE']= './logs/keylog_%s_%s.txt' % (prefix, node_name)

temp_dir = tempfile.TemporaryDirectory(prefix='netsim', suffix='{}_{}'.format(prefix, node_name))
temp_dirs.append(temp_dir)
env_vars['IROH_DATA_DIR'] = '{}'.format(temp_dir)
env_vars['IROH_DATA_DIR'] = '{}'.format(temp_dir.name)

p = n.popen(cmd, stdout=f, stderr=f, shell=True, env=env_vars)
if 'process' in node and node['process'] == 'short':
Expand All @@ -126,15 +149,15 @@ def run(nodes, prefix, args, debug=False, full_debug=False, visualize=False):
found = 0
node_name = '%s_%d' % (node['name'], zz)
n = net.get(node_name)
f = open('logs/%s__%s.txt' % (prefix, node_name), 'r')
lines = f.readlines()
fx = open('logs/%s__%s.txt' % (prefix, node_name), 'r')
lines = fx.readlines()
for line in lines:
if node['param_parser'] == 'iroh_ticket':
if line.startswith('All-in-one ticket'):
node_params[node_name] = line[len('All-in-one ticket: '):].strip()
found+=1
break
f.close()
fx.close()
if found == int(node['count']):
done_wait = True
break
Expand Down Expand Up @@ -163,6 +186,9 @@ def run(nodes, prefix, args, debug=False, full_debug=False, visualize=False):
p.terminate()
for p in p_box:
p.terminate()
for f in ftc:
f.flush()
f.close()
net.stop()
sniffer.close()

Expand Down
61 changes: 61 additions & 0 deletions netsim/playbooks/get.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import os
import playbook_client as pbc
import iroh
import time
import argparse
import logging
import sys

iroh.set_log_level(iroh.LogLevel.DEBUG)

# Configure the logging system
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
stream=sys.stderr
)

parser = argparse.ArgumentParser(description="Test param handler")
parser.add_argument("--syncer", help="Syncer address")
args = parser.parse_args()

# syncer = pbc.Syncer(addr="http://127.0.0.1:8000")
syncer = pbc.Syncer(addr=f"http://{args.syncer}:8000")

IROH_DATA_DIR = os.environ.get("IROH_DATA_DIR")

if IROH_DATA_DIR is None:
logging.info("IROH_DATA_DIR is not set")
exit(1)

node = iroh.IrohNode(IROH_DATA_DIR)
logging.info("Started Iroh node: {}".format(node.node_id()))

syncer.set("srv_node_id", node.node_id())

author = node.author_new()
logging.info("Created author: {}".format(author.to_string()))

ticket = syncer.wait_for("ticket")

doc_ticket = iroh.DocTicket.from_string(ticket)
doc = node.doc_join(doc_ticket)
logging.info("Joined doc: {}".format(doc.id()))

syncer.set("get_ready", 1)

time.sleep(15)

keys = doc.keys()
logging.info("Keys:")
for key in keys:
content = doc.get_content_bytes(key)
logging.info("{} : {} (hash: {})".format(key.key(), content.decode("utf8"), key.hash().to_string()))

logging.info("Done logging.infoing keys...")

syncer.inc("test_done")
time.sleep(5)
syncer.inc("test_done")

logging.info("Done")
54 changes: 54 additions & 0 deletions netsim/playbooks/playbook_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import requests
import json
import time
import logging

# The playbook_engine should always be the first node to spawn
default_url = "http://10.0.0.1:8000"

class Syncer:
def __init__(self, addr=default_url, run_id="", tick_interval = 0.5, timeout=60):
self.addr = addr
self.run_id = run_id
self.tick_interval = tick_interval
self.timeout = timeout

def set(self, key, value):
key = f"{self.run_id}_{key}"
data = {"key": key, "value": value}
response = requests.post(self.addr, data=json.dumps(data), headers={"Content-Type": "application/json"})
if response.status_code != 200:
logging.info("Error setting value:", response.text)

def inc(self, key):
key = f"{self.run_id}_{key}"
response = requests.put(f"{self.addr}/{key}")
if response.status_code != 200:
logging.info("Error incrementing value:", response.text)

def wait_for(self, key):
key = f"{self.run_id}_{key}"
start = time.time()
while True:
logging.info("wating 1...")
if time.time() - start > self.timeout:
raise Exception(f"Timeout waiting for {key}")
response = requests.get(f"{self.addr}/{key}")
if response.status_code == 200:
value = json.loads(response.text)
return value
time.sleep(self.tick_interval)

def wait_for_val(self, key, val):
key = f"{self.run_id}_{key}"
start = time.time()
while True:
logging.info("wating 2...")
if time.time() - start > self.timeout:
raise Exception(f"Timeout waiting for {key} to be {val}")
response = requests.get(f"{self.addr}/{key}")
if response.status_code == 200:
value = json.loads(response.text)
if value == val:
return value
time.sleep(self.tick_interval)
105 changes: 105 additions & 0 deletions netsim/playbooks/playbook_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
from threading import Lock
import logging
import sys

# Configure the logging system
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(message)s",
stream=sys.stderr
)

# Define the key-value store
store = {}
lock = Lock()

# Define the HTTP request handler
class KeyValueStoreHandler(BaseHTTPRequestHandler):
def do_GET(self):
with lock:
try:
# Get the key from the URL path
key = self.path[1:]

# Get the value from the store
value = store.get(key)

if value is None:
# Send a 404 response
self.send_response(404)
self.end_headers()
return

# Send the value as JSON
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(value).encode())
except Exception as e:
logging.error(f"unexpected error: {e}")

def do_POST(self):
with lock:
try:
# Get the key and value from the request body
content_length = int(self.headers["Content-Length"])
body = self.rfile.read(content_length)
data = json.loads(body.decode())
key = data["key"]
value = data["value"]

# Set the value in the store
store[key] = value

# Send a success response
self.send_response(200)
self.end_headers()
except:
try:
# Send a 500 response
self.send_response(500)
self.end_headers()
except Exception as e:
logging.error(f"unexpected error: {e}")

def do_PUT(self):
with lock:
try:
# Get the key from the URL path
key = self.path[1:]

# Get the value from the store
value = store.get(key)

if value is None:
value = 0

# Increment the value
value += 1

# Set the new value in the store
store[key] = value

# Send the new value as JSON
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(value).encode())
except:
try:
# Send a 500 response
self.send_response(500)
self.end_headers()
except Exception as e:
logging.error(f"unexpected error: {e}")

if __name__ == '__main__':
# Define the HTTP server
server_address = ("", 8000)
httpd = HTTPServer(server_address, KeyValueStoreHandler)

# Start the HTTP server
logging.info("Starting HTTP server...")
httpd.serve_forever()
2 changes: 2 additions & 0 deletions netsim/playbooks/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# iroh==0.2.0
requests==2.31.0
Loading