Skip to content

Commit

Permalink
Merge pull request #60 from multiversx/fix-random-port-and-github-action
Browse files Browse the repository at this point in the history
Fix random port and new Github action
  • Loading branch information
iulianpascalau authored Jul 15, 2024
2 parents 508dc34 + abb1632 commit ce4b9ba
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 14 deletions.
35 changes: 35 additions & 0 deletions .github/workflows/run-multiple-instances.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
name: Run multiple simulators

on:
push:
branches: [ main, feat/*, rc/* ]
pull_request:
branches: [ main, feat/*, rc/* ]

jobs:
build:
name: Run instances
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.x
uses: actions/setup-go@v2
with:
go-version: 1.20.7
id: go

- name: Check out code into the Go module directory
uses: actions/checkout@v2

- name: Get dependencies
run: |
go get -v -t -d ./...
if [ -f Gopkg.toml ]; then
curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh
dep ensure
fi
- name: Run multiple instances
run: |
cd cmd/chainsimulator
go build
./chainsimulator --fetch-configs-and-close
python3 ../../scripts/run-multiple-instances/script.py
26 changes: 12 additions & 14 deletions cmd/chainsimulator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,6 @@ func startChainSimulator(ctx *cli.Context) error {

localRestApiInterface := "localhost"
apiConfigurator := api.NewFreePortAPIConfigurator(localRestApiInterface)
proxyPort := cfg.Config.Simulator.ServerPort
proxyURL := fmt.Sprintf("%s:%d", localRestApiInterface, proxyPort)
if proxyPort == 0 {
proxyURL = apiConfigurator.RestApiInterface(0)
portString := proxyURL[len(localRestApiInterface)+1:]
port, errConvert := strconv.Atoi(portString)
if errConvert != nil {
return fmt.Errorf("internal error while searching a free port for the proxy component: %w", errConvert)
}
proxyPort = port
}

startTimeUnix := ctx.GlobalInt64(startTime.Name)

tempDir, err := os.MkdirTemp(os.TempDir(), "")
Expand Down Expand Up @@ -228,16 +216,26 @@ func startChainSimulator(ctx *cli.Context) error {
outputProxyConfigs, err := configs.CreateProxyConfigs(configs.ArgsProxyConfigs{
TemDir: tempDir,
PathToProxyConfig: proxyConfigs,
ServerPort: proxyPort,
RestApiInterfaces: restApiInterfaces,
InitialWallets: simulator.GetInitialWalletKeys().BalanceWallets,
})
if err != nil {
return err
}

time.Sleep(time.Second)
proxyPort := cfg.Config.Simulator.ServerPort
proxyURL := fmt.Sprintf("%s:%d", localRestApiInterface, proxyPort)
if proxyPort == 0 {
proxyURL = apiConfigurator.RestApiInterface(0)
portString := proxyURL[len(localRestApiInterface)+1:]
port, errConvert := strconv.Atoi(portString)
if errConvert != nil {
return fmt.Errorf("internal error while searching a free port for the proxy component: %w", errConvert)
}
proxyPort = port
}

outputProxyConfigs.Config.GeneralSettings.ServerPort = proxyPort
outputProxy, err := creator.CreateProxy(creator.ArgsProxy{
Config: outputProxyConfigs.Config,
NodeHandler: metaNode,
Expand Down
163 changes: 163 additions & 0 deletions scripts/run-multiple-instances/script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import queue
import subprocess
import sys
import threading
import requests
import re
from concurrent.futures import ThreadPoolExecutor, as_completed


def enqueue_output(pipe, q):
for line in iter(pipe.readline, b''):
q.put(line)
pipe.close()


def extract_port_from_process(proc, index):
port_pattern = re.compile(r'INFO.*chain simulator\'s is accessible through the URL localhost:(\d+)')
ansi_escape = re.compile(r'\x1B[@-_][0-?]*[ -/]*[@-~]')

q = queue.Queue()

# Start threads to read stdout and stderr
threading.Thread(target=enqueue_output, args=(proc.stdout, q)).start()
threading.Thread(target=enqueue_output, args=(proc.stderr, q)).start()

while True:
try:
line = q.get_nowait() # Get line from the queue non-blocking
except queue.Empty:
if proc.poll() is not None:
break # Process has finished and queue is empty
continue

# Decode the line and remove ANSI escape sequences
line = line.decode('utf-8').strip()
cleaned_line = ansi_escape.sub('', line)
# Search for the port number
match = port_pattern.search(cleaned_line)
if match:
return match.group(1)

return None


def find_and_print_duplicates(arr):
seen = set()
duplicates = set()

for num in arr:
if num in seen:
duplicates.add(num)
else:
seen.add(num)

if duplicates:
print(f"Duplicated values: {', '.join(map(str, duplicates))}")
return True
else:
return False


def start_instance(index, used_ports):
print(f"Start process index={index}")
proc = subprocess.Popen(['./chainsimulator', '--server-port', "0"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)

port = extract_port_from_process(proc, index)
used_ports.add(port)

print(f"Started instance of chain simulator - index={index}, port={port}")

return proc, port


def start_instances(num_instances):
processes = []
used_ports = set()

with ThreadPoolExecutor(max_workers=num_instances) as executor:
futures = {executor.submit(start_instance, i, used_ports): i for i in range(num_instances)}
for future in as_completed(futures):
proc, port = future.result()
processes.append((proc, port))

res= find_and_print_duplicates(used_ports)

print(f"Duplicated ports={res}")

return processes


def get_api_response(url):
try:
response = requests.get(url)
if response.status_code == 200:
return response.json()
else:
print(f"Failed to get response: {response.status_code}, url:{url}")
sys.exit(1)
except Exception as e:
print(f"Error getting API response: {e}")
sys.exit(1)


def post_generate_blocks(port):
try:
print(f"Generating blocks for chain simulator instance with port={port}")

api_url = f"http://localhost:{port}/simulator/generate-blocks/10"
response = requests.post(api_url)
if response.status_code == 200:
return True
else:
print(f"Failed to generate blocks: {response.status_code}")
sys.exit(1)
except Exception as e:
print(f"Error generating blocks: {e}")
sys.exit(1)


def check_for_duplicate_ports(processes):
ports = {}
duplicates = []
for proc, port in processes:
api_url = f"http://localhost:{port}/simulator/observers" # Adjust the URL pattern as needed
response = get_api_response(api_url)
if response:
for key, value in response.get("data", {}).items():
api_port = value.get("api-port")
if api_port in ports:
duplicates.append(api_port)
else:
ports[api_port] = 1
return duplicates


def terminate_instances(processes):
for proc, _ in processes:
proc.terminate()


def main():
num_instances = 100

print("Starting instances...")
processes = start_instances(num_instances)

print("Collecting API responses and checking for duplicate ports...")
duplicates = check_for_duplicate_ports(processes)

if duplicates:
print(f"Duplicate ports found: {duplicates}")
sys.exit(1)
else:
print("No duplicate ports found.")

print("Terminating instances...")
terminate_instances(processes)


if __name__ == "__main__":
main()

0 comments on commit ce4b9ba

Please sign in to comment.