diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 0000000..14cba97 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,12 @@ +## Description + +Add your description here. + +## Merge request checklist + +- [ ] Added the appropriate `bug`, `enhancement` and/or `breaking-change` tags +- [ ] My code passes all tests, linter and formatting checks (`make check`). +- [ ] My commits follow the [How to Write a Git Commit Message Guide](https://chris.beams.io/posts/git-commit/). +- [ ] I have updated the `CHANGELOG` (yeah, last chance to do it). +- [ ] Run the `python make.py onepack` and add the new luxos.pyz/health-checker.pyz + generated files diff --git a/.github/workflows/pull-python-luxos.yml b/.github/workflows/pull-python-luxos.yml new file mode 100644 index 0000000..a5d9107 --- /dev/null +++ b/.github/workflows/pull-python-luxos.yml @@ -0,0 +1,86 @@ +name: LuxOS Pull-Request + +on: + pull_request: + branches: + - main + +concurrency: ${{ github.workflow }}-${{ github.ref }} + +env: + PACKAGE: luxos + +jobs: + build: + strategy: + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + python-version: ["3.9", "3.10", "3.11", "3.12"] + runs-on: ${{ matrix.os }} + + steps: + - name: Set up runner + run: echo noop + + - name: Checkout + uses: actions/checkout@v4.1.0 + with: + ref: ${{ github.event.pull_request.head.sha }} + + - name: Setup Python toolchain + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Python dependencies + shell: bash + run: | + python -m pip install --upgrade pip + pip install -r tests/requirements.txt + + - name: Run Python checks (ruff) + shell: bash + env: + PYTHONPATH: src + run: | + ruff check src tests + + - name: Run Python checks (mypy) + shell: bash + env: + PYTHONPATH: src + OUTDIR: build/qa-${{ matrix.python-version }}-${{ matrix.os}} + run: | + mypy src \ + --no-incremental --xslt-html-report $OUTDIR/mypy + + - name: Run Python checks + shell: bash + env: + PYTHONPATH: src + OUTDIR: build/qa-${{ matrix.python-version }}-${{ matrix.os}} + run: | + py.test \ + --cov=${{ env.PACKAGE }} \ + --cov-report=html:$OUTDIR/coverage --cov-report=xml:$OUTDIR/coverage.xml \ + --junitxml=$OUTDIR/junit/junit.xml --html=$OUTDIR/junit/junit.html --self-contained-html \ + tests + + - name: Build wheel packages + if: ${{ ! contains(matrix.os, 'windows') }} + env: + GITHUB_DUMP: ${{ toJson(github) }} + run: | + python -m build + touch .keepme + + - name: Archive artifacts + uses: actions/upload-artifact@v4 + with: + name: qa-results-${{ matrix.python-version }}-${{ matrix.os }} + path: | + build/qa-${{ matrix.python-version }}-${{ matrix.os}} + dist + .keepme + # Use always() to always run this step to publish test results when there are test failures + if: always() diff --git a/.github/workflows/push-main.yml b/.github/workflows/push-main.yml new file mode 100644 index 0000000..bcb5ebb --- /dev/null +++ b/.github/workflows/push-main.yml @@ -0,0 +1,88 @@ +name: Master build + +on: + push: + branches: + - main + tags: + - '*' + +concurrency: ${{ github.workflow }}-${{ github.ref }} + +env: + PACKAGE: luxos + +jobs: + build: + strategy: + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + python-version: ["3.9", "3.10", "3.11", "3.12"] + runs-on: ${{ matrix.os }} + + steps: + - name: Set up runner + run: echo noop + + - name: Checkout + uses: actions/checkout@v4.1.0 + with: + ref: ${{ github.event.push.ref }} + + - name: Setup Python toolchain + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Python dependencies + shell: bash + run: | + python -m pip install --upgrade pip + pip install -r tests/requirements.txt + + - name: Run Python checks (ruff) + shell: bash + env: + PYTHONPATH: src + run: | + ruff check src tests + + - name: Run Python checks (mypy) + shell: bash + env: + PYTHONPATH: src + OUTDIR: build/qa-${{ matrix.python-version }}-${{ matrix.os}} + run: | + mypy src \ + --no-incremental --xslt-html-report $OUTDIR/mypy + + - name: Run Python checks + shell: bash + env: + PYTHONPATH: src + OUTDIR: build/qa-${{ matrix.python-version }}-${{ matrix.os}} + run: | + py.test \ + --cov=${{ env.PACKAGE }} \ + --cov-report=html:$OUTDIR/coverage --cov-report=xml:$OUTDIR/coverage.xml \ + --junitxml=$OUTDIR/junit/junit.xml --html=$OUTDIR/junit/junit.html --self-contained-html \ + tests + + - name: Build wheel packages + if: ${{ ! contains(matrix.os, 'windows') }} + env: + GITHUB_DUMP: ${{ toJson(github) }} + run: | + python -m build + touch .keepme + + - name: Archive artifacts + uses: actions/upload-artifact@v4 + with: + name: qa-results-${{ matrix.python-version }}-${{ matrix.os }} + path: | + build/qa-${{ matrix.python-version }}-${{ matrix.os}} + dist + .keepme + # Use always() to always run this step to publish test results when there are test failures + if: always() diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f93c0ce --- /dev/null +++ b/.gitignore @@ -0,0 +1,37 @@ +/myenv +/build +/dist + + +# Compiled Python files +__pycache__/ +*.pyc +*.pyo +*.pyd + +# Python virtual environment +venv/ +env/ +*.env + +# System, editor and IDE files +.vscode +.work +.ci-cargo +.idea/ +*.swp +*.swo +*~ +.project +*.pyproj +.DS_Store +Thumbs.db + +# Ignore common temporary files +*.log +*.csv +tmp/ +*.tmp + +# Sonar artifacts +clippy.log diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..6d6ae06 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,49 @@ +# - repo: https://github.com/astral-sh/ruff-pre-commit +# rev: v0.3.4 +# hooks: +# - id: ruff +# args: [--fix, --exit-non-zero-on-fix] +# +# - repo: https://github.com/pre-commit/mirrors-mypy +# rev: v1.9.0 +# hooks: +# - id: mypy +# exclude: ^(tests) + +repos: + - repo: local + hooks: + # repo: https://github.com/astral-sh/ruff-pre-commit + - id: ruff + name: ruff + description: "Run 'ruff' for extremely fast Python linting" + entry: ruff check --force-exclude + language: system + types_or: [python, pyi] + args: [] + require_serial: true + additional_dependencies: [] + minimum_pre_commit_version: "2.9.2" + + - id: ruff-format + name: ruff-format + description: "Run 'ruff format' for extremely fast Python formatting" + entry: ruff format --force-exclude + language: system + types_or: [python, pyi] + args: [] + require_serial: true + additional_dependencies: [] + minimum_pre_commit_version: "2.9.2" + + # https://github.com/pre-commit/mirrors-mypy + - id: mypy + name: mypy + description: '' + entry: mypy + language: system + types_or: [python, pyi] + args: ["--ignore-missing-imports", "--scripts-are-modules"] + require_serial: true + additional_dependencies: [] + minimum_pre_commit_version: '2.9.2' \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..cddaebc --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,11 @@ +# LUXOS CHANGELOG + +All notable changes to this project will be documented in this file. + diff --git a/DEVELOP.md b/DEVELOP.md new file mode 100644 index 0000000..46041d0 --- /dev/null +++ b/DEVELOP.md @@ -0,0 +1,82 @@ +# Develop + +These are the developer's note, to develop luxos. + +> **NOTE** We assume we're working inside the checked out +> `firmware-biz-tools` directory, and you have python (>=3.10) +> in your path. + +## Setup + +These instructions are useful for a quick start, and +you should be able to: + +- **SETUP** the environment (this is done once) +
Windows + + ```shell + python -m venv myenv + .\myenv\Scripts\activate + + pip install -r tests\requirements.txt + pip install -e . + ``` +
+ +
*NIX + + ```shell + python -m venv myenv + source ./myenv/bin/activate + + pip install -r tests\requirements.txt + pip install -e . + ``` + +- **ACTIVATE** the environment (each time you start a new shell) +
Windows + + ```shell + .\myenv\Scripts\activate + ``` +
+
*NIX + + ```shell + source ./myenv/bin/activate + ``` +
+ +- **RUN** the tests + + (Windows & *NIX) + ```shell + pytest -vvs tests + ``` + +## Coding + +### Precommit +When it comes to coding, you can use [pre-commit](https://pre-commit.com/) hooks +to help you validate code at every git commit. + +- **ENABLE** precommit: + ```shell + pre-commit install + ``` + +- **DISABLE** precommit: + ```shell + pre-commit uninstall + ``` + +- **SKIP CHECKS** during git commit: + Use the `-n` flag: + ```shell + git commit -n .... + ``` + + + +At every `git commit` code scanner [ruff](https://github.com/astral-sh/ruff) and +[mypy](https://mypy-lang.org) will run. diff --git a/README.md b/README.md index ba7c634..36fb271 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,42 @@ # LuxOS Tools Repository +> **WARNING** There are references into the PR luxos-code-refactoring + This repository contains scripts we built to operate and troubleshoot miners running LuxOS. -## LuxOS API Wrapper - luxos.py +## Install + +There are few ways to install the luxos package: + +1. Using pip (suggested for end-users): + ```shell + pip install luxos + pip install git+https://github.com/LuxorLabs/luxos-tooling.git@pr/luxos-code-refactoring + ``` + Using pip gives you access to the cli commands `luxos` and `health-checker` as well + the ability to import in python the `import luxos.api` api for luxos. + +2. A single drop in file (for support): + ```shell + curl -LO https://github.com/LuxorLabs/luxos-tooling/raw/pr/luxos-code-refactoring/luxos.pyz + ``` + These are two standalone [zipapp](https://docs.python.org/3/library/zipapp.html) files, you can use + from the command line as `python luxos.pyz`, no dependencies beside a recent-*ish* python + version (eg. >= 3.10) + +3. From the [github](https://github.com/LuxorLabs/luxos-tooling) source checkout (for devs): + ```shell + python -m venv venv + source venv/bin/activate # for Windows: .\myenv\Scripts\activate) + + pip install -r tests/requirements.txt + + export PYTHONPATH=$(pwd)/src # for Windows: SET PYTHONPATH=%CD%\src + (or) + pip install -e . + ``` + +## LuxOS API Wrapper - luxos This tool offers a convenient way to interact with LuxOS through a command-line interface (CLI) or as Python packages for more advanced integrations. @@ -11,28 +45,33 @@ This tool offers a convenient way to interact with LuxOS through a command-line The luxos.py script serves as a versatile LuxOS API wrapper, allowing users to interact with LuxOS features directly from the command line. Below are some basic examples: ```bash -python3 luxos.py --ipfile miners.csv --cmd rebootdevice --timeout 2 -python3 luxos.py --range_start 192.168.1.0 --range_end 192.168.1.255 --cmd rebootdevice --verbose True +python3 -m luxos --ipfile miners.csv --cmd rebootdevice --timeout 2 +python3 -m luxos --range_start 192.168.1.0 --range_end 192.168.1.255 --cmd rebootdevice --verbose True ``` +> **NOTE** Please don't forget to set the PYTHONPATH. + **Library Usage** If you prefer to integrate LuxOS functionality into your Python applications or scripts, luxos.py can also be used as a Python package. Here's a quick example: ```python -from luxos import (execute_command) +from luxos.api import (execute_command) -execute_command('192.168.1.1', 4028, 2, 'rebootdevice', '', False) +execute_command("192.168.1.1", 4028, 2, "rebootdevice", "", False) ``` ## LuxOS HealthChecker - health_checker.py The HealthChecker script is designed to continuously pull miner data from LuxOS, providing valuable insights into the health of your mining machines. -The HealthChecker uses poetry as a package manager, to install project dependencies run: `poetry install`. You can customize the HealthChecker params using the `config.yaml` file provided. Finally, to run the HealthChecker you can run: `poetry run python health_checker.py`. +You can customize the HealthChecker params using the `config.yaml` file provided. +To run the HealthChecker you can use `health-checker` if you installed using pip, or +the cli `python3 -m luxos.scripts.health_checker`. --- -Feel free to explore and customize these tools to suit your specific needs. If you encounter any issues or have suggestions for improvement, please open an issue or submit a pull request. +Feel free to explore and customize these tools to suit your specific needs. +If you encounter any issues or have suggestions for improvement, please open an issue or submit a pull request. You can find LuxOS API documentation [here](https://docs.luxor.tech/firmware/api/intro). diff --git a/health-checker.pyz b/health-checker.pyz new file mode 100644 index 0000000..ca1fe4e Binary files /dev/null and b/health-checker.pyz differ diff --git a/luxos.py b/luxos.py deleted file mode 100644 index a513c76..0000000 --- a/luxos.py +++ /dev/null @@ -1,547 +0,0 @@ -import os -import csv -import json -import time -import socket -import logging -import argparse -import ipaddress -import threading -from typing import Any - -COMMANDS = { - "addgroup": { - "logon_required": False - }, - "addpool": { - "logon_required": False - }, - "asc": { - "logon_required": False - }, - "asccount": { - "logon_required": False - }, - "atm": { - "logon_required": False - }, - "atmset": { - "logon_required": True - }, - "check": { - "logon_required": False - }, - "coin": { - "logon_required": False - }, - "config": { - "logon_required": False - }, - "curtail": { - "logon_required": True - }, - "devdetails": { - "logon_required": False - }, - "devs": { - "logon_required": False - }, - "disablepool": { - "logon_required": False - }, - "edevs": { - "logon_required": False - }, - "enablepool": { - "logon_required": False - }, - "estats": { - "logon_required": False - }, - "fans": { - "logon_required": False - }, - "fanset": { - "logon_required": True - }, - "frequencyget": { - "logon_required": False - }, - "frequencyset": { - "logon_required": True - }, - "frequencystop": { - "logon_required": True - }, - "groupquota": { - "logon_required": False - }, - "healthchipget": { - "logon_required": False - }, - "healthchipset": { - "logon_required": True - }, - "healthctrl": { - "logon_required": False - }, - "healthctrlset": { - "logon_required": True - }, - "immersionswitch": { - "logon_required": True - }, - "kill": { - "logon_required": False - }, - "lcd": { - "logon_required": False - }, - "limits": { - "logon_required": False - }, - "logoff": { - "logon_required": True - }, - "logon": { - "logon_required": False - }, - "netset": { - "logon_required": True - }, - "pools": { - "logon_required": False - }, - "power": { - "logon_required": False - }, - "profilenew": { - "logon_required": True - }, - "profilerem": { - "logon_required": True - }, - "profiles": { - "logon_required": False - }, - "profileset": { - "logon_required": True - }, - "reboot": { - "logon_required": True - }, - "rebootdevice": { - "logon_required": True - }, - "removegroup": { - "logon_required": False - }, - "removepool": { - "logon_required": True - }, - "resetminer": { - "logon_required": True - }, - "session": { - "logon_required": False - }, - "stats": { - "logon_required": False - }, - "summary": { - "logon_required": False - }, - "switchpool": { - "logon_required": False - }, - "tempctrl": { - "logon_required": False - }, - "tempctrlset": { - "logon_required": True - }, - "temps": { - "logon_required": False - }, - "updaterun": { - "logon_required": True - }, - "updateset": { - "logon_required": True - }, - "version": { - "logon_required": False - }, - "voltageget": { - "logon_required": False - }, - "voltageset": { - "logon_required": True - }, - "profileget": { - "logon_required": False - }, - "logset": { - "logon_required": True - }, - "tempsensor": { - "logon_required": False - }, - "tempsensorset": { - "logon_required": True - }, - "uninstallluxos": { - "logon_required": True - } -} - -# Set up logging configuration -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s [%(levelname)s] %(message)s", - handlers=[logging.StreamHandler(), - logging.FileHandler("LuxOS-CLI.log")], -) - - -# internal_send_cgminer_command sends a command to the cgminer API server and returns the response. -def internal_send_cgminer_command(host: str, port: int, command: str, - timeout_sec: int, verbose: bool) -> str: - - # Create a socket connection to the server - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - try: - # set timeout - sock.settimeout(timeout_sec) - - # Connect to the server - sock.connect((host, port)) - - # Send the command to the server - sock.sendall(command.encode()) - - # Receive the response from the server - response = bytearray() - while True: - # Read one byte at a time so we can wait for the null terminator. - # this is to avoid waiting for the timeout as we don't know how long - # the response will be and socket.recv() will block until reading - # the specified number of bytes. - try: - data = sock.recv(1) - except socket.timeout: - # Timeout occurred, check if we have any data so far - if len(response) == 0: - raise ValueError("timeout waiting for data") - else: - break - if not data: - break - null_index = data.find(b'\x00') - if null_index >= 0: - response += data[:null_index] - break - response += data - - # Parse the response JSON - r = json.loads(response.decode()) - if verbose: - logging.info(r) - return r - - except socket.error as e: - raise e - - -# send_cgminer_command sends a command to the cgminer API server and returns the response. -def send_cgminer_command(host: str, port: int, cmd: str, param: str, - timeout: int, verbose: bool) -> str: - req = str(f"{{\"command\": \"{cmd}\", \"parameter\": \"{param}\"}}\n") - if verbose: - logging.info( - f"Executing command: {cmd} with params: {param} to host: {host}") - - return internal_send_cgminer_command(host, port, req, timeout, verbose) - - -# send_cgminer_simple_command sends a command with no params to the miner and returns the response. -def send_cgminer_simple_command(host: str, port: int, cmd: str, timeout: int, - verbose: bool) -> str: - req = str(f"{{\"command\": \"{cmd}\"}}\n") - if verbose: - logging.info(f"Executing command: {cmd} to host: {host}") - return internal_send_cgminer_command(host, port, req, timeout, verbose) - - -# check_res_structure checks that the response has the expected structure. -def check_res_structure(res: str, structure: str, min: int, max: int) -> str: - # Check the structure of the response. - if structure not in res or "STATUS" not in res or "id" not in res: - raise ValueError("error: invalid response structure") - - # Should we check min and max? - if min >= 0 and max >= 0: - # Check the number of structure elements. - if not (min <= len(res[structure]) <= max): - raise ValueError( - f"error: unexpected number of {structure} in response; min: {min}, max: {max}, actual: {len(res[structure])}" - ) - - # Should we check only min? - if min >= 0: - # Check the minimum number of structure elements. - if len(res[structure]) < min: - raise ValueError( - f"error: unexpected number of {structure} in response; min: {min}, max: {max}, actual: {len(res[structure])}" - ) - - # Should we check only max? - if max >= 0: - # Check the maximum number of structure elements. - if len(res[structure]) < min: - raise ValueError( - f"error: unexpected number of {structure} in response; min: {min}, max: {max}, actual: {len(res[structure])}" - ) - - return res - - -# get_str_field tries to get the field as a string and returns it. -def get_str_field(struct: str, name: str) -> str: - try: - s = str(struct[name]) - except Exception as e: - raise ValueError(f"error: invalid {name} str field: {e}") - - return s - - -def logon(host: str, port: int, timeout: int, verbose: bool) -> str: - # Send 'logon' command to cgminer and get the response - res = send_cgminer_simple_command(host, port, "logon", timeout, verbose) - - # Check if the response has the expected structure - check_res_structure(res, "SESSION", 1, 1) - - # Extract the session data from the response - session = res["SESSION"][0] - - # Get the 'SessionID' field from the session data - s = get_str_field(session, "SessionID") - - # If 'SessionID' is empty, raise an error indicating invalid session id - if s == "": - raise ValueError("error: invalid session id") - - # Return the extracted 'SessionID' - return s - - -def logon_required(cmd: str, commands_list=COMMANDS) -> bool: - # Check if the command requires logon to LuxOS API - user_cmd = None - - keys = commands_list.keys() - for key in keys: - if key == cmd: - user_cmd = cmd - break - - if user_cmd == None: - logging.info( - f"{cmd} command is not supported. Try again with a different command." - ) - return - return commands_list[cmd]['logon_required'] - - -def add_session_id_parameter(session_id, parameters): - # Add the session id to the parameters - return [session_id, *parameters] - - -def parameters_to_string(parameters): - # Convert the parameters to a string that LuxOS API accepts - return ",".join(parameters) - - -def generate_ip_range(start_ip, end_ip): - # Generate a list of IP addresses from the start and end IP - start = ipaddress.IPv4Address(start_ip) - end = ipaddress.IPv4Address(end_ip) - - ip_list = [] - - while start <= end: - ip_list.append(str(start)) - start += 1 - - return ip_list - - -def load_ip_list_from_csv(csv_file): - # Check if file exists - if not os.path.exists(csv_file): - logging.info(f"Error: {csv_file} file not found.") - exit(1) - - # Load the IP addresses from the CSV file - ip_list = [] - with open(csv_file, 'r') as f: - reader = csv.reader(f) - for i, row in enumerate(reader): - if i == 0 and row and row[0] == "hostname": - continue - if row: # Ignore empty rows - ip_list.extend(row) - return ip_list - - -def execute_command(host: str, port: int, timeout_sec: int, cmd: str, - parameters: list, verbose: bool): - - # Check if logon is required for the command - logon_req = logon_required(cmd) - - try: - if logon_req: - # Get a SessionID - sid = logon(host, port, timeout_sec, verbose) - # Add the SessionID to the parameters list at the left. - parameters = add_session_id_parameter(sid, parameters) - - if verbose: - logging.info( - f'Command requires a SessionID, logging in for host: {host}' - ) - logging.info(f'SessionID obtained for {host}: {sid}') - - elif not logon_required and verbose: - logging.info(f"Logon not required for executing {cmd}") - - # convert the params to a string that LuxOS API accepts - param_string = parameters_to_string(parameters) - - if verbose: - logging.info(f"{cmd} on {host} with parameters: {param_string}") - - # Execute the API command - res = send_cgminer_command(host, port, cmd, param_string, timeout_sec, - verbose) - - if verbose: - logging.info(res) - - # Log off to terminate the session - if logon_req: - send_cgminer_command(host, port, "logoff", sid, timeout_sec, - verbose) - - return res - - except Exception as e: - logging.info(f"Error executing {cmd} on {host}: {e}") - - -if __name__ == "__main__": - - # define arguments - parser = argparse.ArgumentParser(description="LuxOS CLI Tool") - parser.add_argument('--range_start', required=False, help="IP start range") - parser.add_argument('--range_end', required=False, help="IP end range") - parser.add_argument('--ipfile', - required=False, - default='ips.csv', - help="File name to store IP addresses") - parser.add_argument('--cmd', - required=True, - help="Command to execute on LuxOS API") - parser.add_argument('--params', - required=False, - default=[], - nargs='+', - help="Parameters for LuxOS API") - parser.add_argument( - '--max_threads', - required=False, - default=10, - type=int, - help="Maximum number of threads to use. Default is 10.") - parser.add_argument('--timeout', - required=False, - default=3, - type=int, - help="Timeout for network scan in seconds") - parser.add_argument('--port', - required=False, - default=4028, - type=int, - help="Port for LuxOS API") - parser.add_argument('--verbose', - required=False, - default=False, - type=bool, - help="Verbose output") - parser.add_argument('--batch_delay', - required=False, - default=0, - type=int, - help="Delay between batches in seconds") - - # parse arguments - args = parser.parse_args() - - # set timeout to milliseconds - timeout_sec = args.timeout - - # check if IP address range or CSV with list of IP is provided - if args.range_start and args.range_end: - ip_list = generate_ip_range(args.range_start, args.range_end) - elif args.ipfile: - ip_list = load_ip_list_from_csv(args.ipfile) - else: - logging.info("No IP address or IP list found.") - exit(1) - - # Set max threads to use, minimum of max threads and number of IP addresses - max_threads = min(args.max_threads, len(ip_list)) - - # Create a list of threads - threads = [] - - # Set start time - start_time = time.time() - - # Iterate over the IP addresses - for ip in ip_list: - # create new thread for each IP address - thread = threading.Thread(target=execute_command, - args=(ip, args.port, timeout_sec, args.cmd, - args.params, args.verbose)) - - # start the thread - threads.append(thread) - thread.start() - - # Limit the number of concurrent threads - if len(threads) >= max_threads: - # Wait for the threads to finish - for thread in threads: - thread.join() - - # Introduce the batch delay if specified - if args.batch_delay > 0: - print(f"Waiting {args.batch_delay} seconds") - time.sleep(args.batch_delay) - - # Clear the thread list for the next batch - threads = [] - - # Wait for the remaining threads to finish - for thread in threads: - thread.join() - - # Execution completed - end_time = time.time() - execution_time = end_time - start_time - logging.info(f"Execution completed in {execution_time:.2f} seconds.") diff --git a/luxos.pyz b/luxos.pyz new file mode 100644 index 0000000..996e562 Binary files /dev/null and b/luxos.pyz differ diff --git a/make.py b/make.py new file mode 100644 index 0000000..007b24b --- /dev/null +++ b/make.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python +"""A make-like script""" +import argparse +import contextlib +import os +import sys +import logging +import functools +import subprocess +from pathlib import Path + + +def task(name=None): + """task decorator + + This decorator will run the decorated function so: + 1. change the current directory to this script + directory + 2. call the decorated task + 3. return to the (eventual) starting dir + + The wrapped function can have few `magic` arguments: + - args : this is the sys.argv[2:] list + - parser : this will be a argparse.ArgumentParser ready + - workdir : in case you dont want to auto cd into workdir + """ + def _task1(fn): + @functools.wraps(fn) + def _task(workdir, args): + from inspect import signature + cwd = Path.cwd() + try: + kwargs = {} + if "args" in signature(fn).parameters: + kwargs["args"] = args + if "parser" in signature(fn).parameters: + class F(argparse.ArgumentDefaultsHelpFormatter): + pass + kwargs["parser"] = argparse.ArgumentParser( + formatter_class=F, description=_task.description) + if "workdir" in signature(fn).parameters: + kwargs["workdir"] = workdir + else: + os.chdir(workdir) + return fn(**kwargs) + finally: + os.chdir(cwd) + + _task.task = name or fn.__name__ + _task.description = ( + fn.__doc__.strip().partition("\n")[0] if fn.__doc__ else "no help available" + ) + return _task + + return _task1 + + +@task(name=None) +def onepack(parser, args, workdir): + """create a one .pyz single file package""" + from zipapp import create_archive + from configparser import ConfigParser, ParsingError + + config = ConfigParser(strict=False) + with contextlib.suppress(ParsingError): + config.read(workdir / "pyproject.toml") + + targets = [] + section = "project.scripts" + for target in config.options(section): + entrypoint = config.get(section, target).strip("'").strip('"') + targets.append((f"{target}.pyz", entrypoint)) + + parser.add_argument("-o", "--output-dir", + default=workdir, type=Path) + o = parser.parse_args(args) + + for target, entrypoint in targets: + dst = o.output_dir / target + create_archive( + workdir / "src", + dst, + main=entrypoint, + compressed=True + ) + relpath = ( + dst.relative_to(Path.cwd()) + if dst.is_relative_to(Path.cwd()) + else dst + ) + print(f"Written: {relpath}", file=sys.stderr) + + +@task() +def checks(): + """runs all checks on code base""" + subprocess.check_call(["ruff", "check", "src", "tests"], cwd=workdir) + + +@task() +def tests(): + workdir = Path.cwd() + subprocess.check_call( + ["pytest", "-vvs", str(workdir / "tests") ] + ) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + tasks = [ item for item in list(locals().values()) if getattr(item, "task", None)] + + if len(sys.argv) < 2 or sys.argv[1] not in [t.task for t in tasks]: + txt = "\n".join(f" {task.task} - {task.description}" for task in tasks) + print( # noqa: T201 + f"""\ +make.py {{arguments}} + +Commands: +{txt} +""", + file=sys.stderr, + ) + sys.exit() + + workdir = Path(__file__).parent + function = [t for t in tasks if sys.argv[1] == t.task][0] + function(workdir, args=sys.argv[2:]) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..83cd7a3 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,80 @@ +[build-system] +requires = ["hatchling>=1.1.0",] +build-backend = "hatchling.build" + +[project] +name = "luxos" +version = "0.0.0" +description = "The all encompassing LuxOS python library." +readme = "README.md" +license = { text = "MIT" } # TODO I don't think this is a MIT?? +requires-python = ">= 3.9" + +authors = [ + { name = "Antonio Cavallo", email = "antonio.cavallo@luxor.tech" }, +] + +# TODO more classifiers +classifiers = [ + "Development Status :: 4 - Beta", + "Programming Language :: Python", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", +] + +dependencies = [ + "tqdm", + "asyncpg", + "pandas", + "pyyaml", + "httpx" +] + +[project.urls] +Source = "https://github.com/LuxorLabs/firmware-biz-tools" +Issues = "https://github.com/LuxorLabs/firmware-biz-tools/issues" + +[project.scripts] +luxos = "luxos.scripts.luxos:main" +health-checker = "luxos.scripts.health_checker:main" + +[tool.ruff] +target-version = "py39" +line-length = 88 + + +[tool.ruff.format] +quote-style = "double" + +[tool.mypy] +disallow_untyped_defs = false +follow_imports = "normal" +ignore_missing_imports = true +pretty = true +show_column_numbers = true +show_error_codes = true +warn_no_return = false +warn_unused_ignores = true + +[tool.coverage.run] +branch = true + +[tool.coverage.paths] +source = [ + "src/", +] + +[tool.coverage.report] +exclude_lines = [ + "no cov", + "if __name__ == .__main__.:", + "if TYPE_CHECKING:", +] + +[tool.pytest.ini_options] +markers = [ + "manual: marks tests unsafe for auto-run (eg. better run them manually)", +] \ No newline at end of file diff --git a/src/luxos/__init__.py b/src/luxos/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/luxos/__main__.py b/src/luxos/__main__.py new file mode 100644 index 0000000..0a30d71 --- /dev/null +++ b/src/luxos/__main__.py @@ -0,0 +1,4 @@ +from luxos.scripts.luxos import main + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/luxos/api.json b/src/luxos/api.json new file mode 100644 index 0000000..d2173a3 --- /dev/null +++ b/src/luxos/api.json @@ -0,0 +1,173 @@ +{ + "addgroup": { + "logon_required": false + }, + "addpool": { + "logon_required": false + }, + "asc": { + "logon_required": false + }, + "asccount": { + "logon_required": false + }, + "atm": { + "logon_required": false + }, + "atmset": { + "logon_required": true + }, + "check": { + "logon_required": false + }, + "coin": { + "logon_required": false + }, + "config": { + "logon_required": false + }, + "curtail": { + "logon_required": true + }, + "devdetails": { + "logon_required": false + }, + "devs": { + "logon_required": false + }, + "disablepool": { + "logon_required": false + }, + "edevs": { + "logon_required": false + }, + "enablepool": { + "logon_required": false + }, + "estats": { + "logon_required": false + }, + "fans": { + "logon_required": false + }, + "fanset": { + "logon_required": true + }, + "frequencyget": { + "logon_required": false + }, + "frequencyset": { + "logon_required": true + }, + "frequencystop": { + "logon_required": true + }, + "groupquota": { + "logon_required": false + }, + "healthchipget": { + "logon_required": false + }, + "healthchipset": { + "logon_required": true + }, + "healthctrl": { + "logon_required": false + }, + "healthctrlset": { + "logon_required": true + }, + "immersionswitch": { + "logon_required": true + }, + "kill": { + "logon_required": false + }, + "lcd": { + "logon_required": false + }, + "limits": { + "logon_required": false + }, + "logoff": { + "logon_required": true + }, + "logon": { + "logon_required": false + }, + "netset": { + "logon_required": true + }, + "pools": { + "logon_required": false + }, + "power": { + "logon_required": false + }, + "profileget": { + "logon_required": false + }, + "profilenew": { + "logon_required": true + }, + "profilerem": { + "logon_required": true + }, + "profiles": { + "logon_required": false + }, + "profileset": { + "logon_required": true + }, + "reboot": { + "logon_required": true + }, + "rebootdevice": { + "logon_required": true + }, + "removegroup": { + "logon_required": false + }, + "removepool": { + "logon_required": false + }, + "resetminer": { + "logon_required": true + }, + "session": { + "logon_required": false + }, + "stats": { + "logon_required": false + }, + "summary": { + "logon_required": false + }, + "switchpool": { + "logon_required": false + }, + "tempctrl": { + "logon_required": false + }, + "tempctrlset": { + "logon_required": true + }, + "temps": { + "logon_required": false + }, + "updaterun": { + "logon_required": true + }, + "updateset": { + "logon_required": true + }, + "version": { + "logon_required": false + }, + "voltageget": { + "logon_required": false + }, + "voltageset": { + "logon_required": true + } +} \ No newline at end of file diff --git a/src/luxos/api.py b/src/luxos/api.py new file mode 100644 index 0000000..cc9ee81 --- /dev/null +++ b/src/luxos/api.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +import logging +import json +import importlib.resources + +log = logging.getLogger(__name__) + + +COMMANDS = json.loads( + (importlib.resources.files("luxos") / "api.json") + .read_text() +) + + +def logon_required(cmd: str, commands_list=COMMANDS) -> bool | None: + # Check if the command requires logon to LuxOS API + + if cmd not in COMMANDS: + log.info("%s command is not supported. " + "Try again with a different command.", cmd) + return None + + return COMMANDS[cmd]["logon_required"] + + +# TODO timeouts should be float | None +def execute_command(host: str, port: int, timeout: int, cmd: str, params: list[str], verbose: bool): + from .scripts import luxos + return luxos.execute_command(host, port, timeout, cmd, params, verbose) + + diff --git a/src/luxos/asyncops.py b/src/luxos/asyncops.py new file mode 100644 index 0000000..39f33f9 --- /dev/null +++ b/src/luxos/asyncops.py @@ -0,0 +1,208 @@ +from __future__ import annotations + +import functools +import logging +import asyncio +import json + +from typing import Any + +from . import exceptions +from . import api + + +log = logging.getLogger(__name__) + +TIMEOUT = 3.0 # default timeout for operations + + +def wrapped(function): + """wraps a function acting on a host and re-raise with internal exceptions + + This re-raise exceptions so they all derive from MinerConnectionError, eg: + @wrapped + def somcode(host: str, port: int, ...): + ... + + try: + await somecode() + except MinerConnectionError as e: <- this will catch all exceptions! + e.address + raise MyNewExecption() from e <- this will re-raise + """ + @functools.wraps(function) + async def _function(host: str, port: int, *args, **kwargs): + try: + return await function(host, port, *args, **kwargs) + except asyncio.TimeoutError as e: + # we augment underlying TimeOuts + raise exceptions.MinerCommandTimeoutError(host, port) from e + except exceptions.MinerConnectionError: + raise + except Exception as e: + # we augment any other exception with (host, port) info + log.exception("internal error") + raise exceptions.MinerConnectionError(host, port, "internal error") from e + return _function + + +async def _roundtrip( + host: str, port: int, cmd: bytes | str, timeout: float | None +) -> str: + """simple asyncio socket based send/receive function + + Example: + print(await _roundtrip(host, port, version)) + -> (str) "{'STATUS': [{'Code': 22, 'Description'...." + """ + reader, writer = await asyncio.wait_for( + asyncio.open_connection(host, port), timeout + ) + + writer.write(cmd.encode() if isinstance(cmd, str) else cmd) + await writer.drain() + + response = bytearray() + while True: + data = await asyncio.wait_for(reader.read(1), timeout=timeout) + if not data: + break + null_index = data.find(b"\x00") + if null_index >= 0: + response += data[:null_index] + break + response += data + + return response.decode() + +# TODO add annotations +async def roundtrip( + host: str, + port: int, + cmd: bytes | str | dict[str, Any], + timeout: float | None = None, + asjson: bool | None = True, + retry: int = 0, + retry_delay: float | None = None, +): + """utility wrapper around _roundrip + + Example: + print(await roundtrip(host, port, {"version"})) + -> (json) {'STATUS': [{'Code': 22, 'Description': 'LUXminer 20 ... + print(await roundtrip(host, port, "version")) + -> (str) "{'STATUS': [{'Code': 22, 'Description': 'LUXminer 20 .. + """ + timeout = TIMEOUT if timeout is None else timeout + count = 0 + + if not isinstance(cmd, (bytes, str)): + cmd = json.dumps(cmd, indent=2, sort_keys=True) + if asjson is None: + asjson = True + + last_exception = None + while count <= retry: + try: + timeout = TIMEOUT if timeout is None else timeout + res = await _roundtrip(host, port, cmd, timeout) + if asjson: + return json.loads(res) + else: + return res + except (Exception, asyncio.TimeoutError) as e: + last_exception = e + if retry_delay: + await asyncio.sleep(retry_delay) + count += 1 + if last_exception is not None: + raise last_exception + + +def validate_message( + host: str, + port: int, + res: dict[str, Any], + extrakey: str | None = None, + minfields: None | int = None, + maxfields: None | int = None, +) -> dict[str, Any]: + # all miner message comes with a STATUS + for key in ["STATUS", "id", *([extrakey] if extrakey else [])]: + if key in res: + continue + raise exceptions.MinerCommandMalformedMessageError(host, port, f"missing {key} from logon message", res) + + if not extrakey or not (minfields or maxfields): + return res + + n = len(res[extrakey]) + msg = None + if minfields and (n < minfields): + msg = f"found {n} fields for {extrakey} invalid: " f"{n} <= {minfields}" + elif maxfields and (n > maxfields): + msg = f"found {n} fields for {extrakey} invalid: " f"{n} >= {maxfields}" + if msg is None: + return res[extrakey] + raise exceptions.MinerCommandMalformedMessageError(host, port, msg, res) + + +@wrapped +async def logon(host: str, port: int, timeout: float | None = 3) -> str: + timeout = TIMEOUT if timeout is None else timeout + res = await roundtrip(host, port, {"command": "logon"}, timeout) + + # when we first logon, we'll receive a token (session_id) + # [STATUS][SessionID] + # on subsequent logon, we receive a + # [STATUS][Msg] == "Another session is active" ([STATUS][Code] 402) + if "SESSION" not in res and res.get("STATUS", [{}])[0].get("Code") == 402: + raise exceptions.MinerCommandSessionAlreadyActive(host, port, "connection active", res) + sessions = validate_message(host, port, res, "SESSION", 1, 1) + + session = sessions[0] # type: ignore + + if "SessionID" not in session: + raise exceptions.MinerCommandSessionAlreadyActive(host, port, "no SessionID in data", res) + return str(session["SessionID"]) + + +@wrapped +async def logoff( + host: str, port: int, sid: str, timeout: float | None = 3 +) -> dict[str, Any]: + timeout = TIMEOUT if timeout is None else timeout + return await roundtrip(host, port, {"command": "logoff", "parameter": sid}, timeout) + + +@wrapped +async def execute_command( + host: str, + port: int, + timeout_sec: float | None, + cmd: str, + parameters: list[str] | None = None, + verbose: bool = False, + asjson: bool | None = True, + add_address: bool = False +) -> tuple[tuple[str, int], dict[str, Any]] | dict[str, Any]: + timeout = TIMEOUT if timeout_sec is None else timeout_sec + parameters = parameters or [] + + sid = None + if api.logon_required(cmd): + sid = await logon(host, port) + parameters = [sid, *parameters] + log.info("session id requested & obtained for %s:%i (%s)", host, port, sid) + else: + log.debug("no logon required for command %s on %s:%i", cmd, host, port) + + try: + packet = {"command": cmd} + if parameters: + packet["parameter"] = ",".join(parameters) + ret = await roundtrip(host, port, packet, timeout, asjson=asjson) + return ((host, port), ret) if add_address else ret + finally: + if sid: + await logoff(host, port, sid) diff --git a/src/luxos/exceptions.py b/src/luxos/exceptions.py new file mode 100644 index 0000000..39afe2a --- /dev/null +++ b/src/luxos/exceptions.py @@ -0,0 +1,24 @@ +class LuxosBaseException(Exception): + pass + + +class MinerConnectionError(LuxosBaseException): + def __init__(self, host: str, port: int, *args, **kwargs): + super().__init__(host, port, *args, **kwargs) + self.address = (host, port) + + def __str__(self): + return (f"<{self.address[0]}:{self.address[1]}>: {self.__class__.__name__}, " + f"{self.args[2] if self.args[2:] else 'unknown reason'}") + + +class MinerCommandSessionAlreadyActive(MinerConnectionError): + pass + + +class MinerCommandTimeoutError(MinerConnectionError): + pass + + +class MinerCommandMalformedMessageError(MinerConnectionError): + pass diff --git a/src/luxos/misc.py b/src/luxos/misc.py new file mode 100644 index 0000000..b56c7f3 --- /dev/null +++ b/src/luxos/misc.py @@ -0,0 +1,31 @@ +# catch-all module (find later a better place) +from __future__ import annotations +import sys +import itertools + + +if sys.version_info >= (3, 12): + batched = itertools.batched +else: + def batched(iterable, n): + if n < 1: + raise ValueError("n must be at least one") + it = iter(iterable) + while batch := tuple(itertools.islice(it, n)): + yield batch + + +def indent(txt: str, pre: str = " " * 2) -> str: + """simple text indentation""" + + from textwrap import dedent + + txt = dedent(txt) + if txt.endswith("\n"): + last_eol = "\n" + txt = txt[:-1] + else: + last_eol = "" + + result = pre + txt.replace("\n", "\n" + pre) + last_eol + return result if result.strip() else result.strip() \ No newline at end of file diff --git a/src/luxos/scripts/__init__.py b/src/luxos/scripts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/luxos/scripts/async_luxos.py b/src/luxos/scripts/async_luxos.py new file mode 100644 index 0000000..c316c59 --- /dev/null +++ b/src/luxos/scripts/async_luxos.py @@ -0,0 +1,54 @@ +from __future__ import annotations +import asyncio +import json + +from .. import misc +from .. import asyncops + + +async def run( + ip_list: list[str], + port: int, + cmd: str, + params: list[str], + timeout: float, + delay: float | None, + details: bool, + batchsize: int = 0, +) -> None: + result = {} + batchsize = max(batchsize, 2) + + for grupid, addresses in enumerate( + misc.batched([(ip, port) for ip in ip_list], n=batchsize) + ): + tasks = [] + for host, port in addresses: + tasks.append( + asyncops.execute_command( + host, port, timeout, cmd, params, add_address=True + ) + ) + result[grupid] = await asyncio.gather(*tasks, return_exceptions=True) + + # runs only on batchsize, wait delay then proceed onto the next batch + if delay: + await asyncio.sleep(delay) + + alltasks = [task for group in result.values() for task in group] + successes = [task for task in alltasks if not isinstance(task, Exception)] + failures = [task for task in alltasks if isinstance(task, Exception)] + + # print a nice report + print(f"task executed sucessfully: {len(successes)}") + if details: + for (host, port), task in successes: # type: ignore + print(f" > {host}:{port}") + print(misc.indent(json.dumps(task, indent=2, sort_keys=True), pre=" | ")) + print(f"task executed failures: {len(failures)}") + for failure in failures: + print(f" {failure}") + + +def main(*args, **kwargs) -> None: + asyncio.run(run(*args, **kwargs)) diff --git a/health_checker.py b/src/luxos/scripts/health_checker.py similarity index 97% rename from health_checker.py rename to src/luxos/scripts/health_checker.py index 199d74f..90824ac 100644 --- a/health_checker.py +++ b/src/luxos/scripts/health_checker.py @@ -11,6 +11,7 @@ import threading import asyncio from datetime import datetime +from typing import Any from tqdm.asyncio import tqdm as async_tqdm import asyncpg @@ -18,10 +19,11 @@ import yaml import pandas as pd -from luxos import (generate_ip_range, logon_required, logon, +from luxos.api import logon_required + +from luxos.scripts.luxos import (generate_ip_range, add_session_id_parameter, parameters_to_string, - send_cgminer_simple_command, check_res_structure, - get_str_field) + check_res_structure, get_str_field) LOGGING_CONFIG = { 'level': logging.INFO, @@ -48,14 +50,18 @@ LUXOS_MINERS = [] +log = logging.getLogger(__name__) + def parse_args(): """Parse arguments from config file.""" - with open("config.yaml", 'r') as stream: + path = "config.yaml" + with open(path, 'r') as stream: try: config = yaml.safe_load(stream) - except yaml.YAMLError as exc: - logging.error(f"Unable to load configs! {exc}") + except yaml.YAMLError: + log.exception("Unable to load configs from %s!", path) + raise args = argparse.Namespace() @@ -137,7 +143,7 @@ def get_value_with_default(dictionary, key, default=0): async def internal_send_cgminer_command(host: str, port: int, command: str, timeout_sec: int, - verbose: bool) -> dict: + verbose: bool) -> dict[str, Any]: writer = None try: reader, writer = await asyncio.wait_for(asyncio.open_connection( @@ -175,7 +181,7 @@ async def internal_send_cgminer_command(host: str, port: int, command: str, async def send_cgminer_command(host: str, port: int, cmd: str, param: str, - timeout_sec: int, verbose: bool) -> dict: + timeout_sec: int, verbose: bool) -> dict[str, Any]: req = str(f"{{\"command\": \"{cmd}\", \"parameter\": \"{param}\"}}\n") if verbose: logging.info( @@ -186,7 +192,7 @@ async def send_cgminer_command(host: str, port: int, cmd: str, param: str, async def send_cgminer_simple_command(host: str, port: int, cmd: str, - timeout: int, verbose: bool) -> str: + timeout: int, verbose: bool) -> dict[str, Any]: req = str(f"{{\"command\": \"{cmd}\"}}\n") if verbose: logging.info(f"Executing command: {cmd} to host: {host}") @@ -217,7 +223,8 @@ async def execute_command(host: str, port: int, timeout_sec: int, cmd: str, f'Command requires a SessionID, logging in for host: {host}' ) logging.info(f'SessionID obtained for {host}: {sid}') - elif not logon_required and verbose: + # TODO fix this typing issue + elif not logon_required and verbose: # type: ignore logging.info(f"Logon not required for executing {cmd}") param_string = parameters_to_string(parameters) if verbose: @@ -768,7 +775,7 @@ async def main_loop(ip_list, args, lock, sem, buffer, db_config): end_time_healthcheck = time.time() execution_time_healthcheck = end_time_healthcheck - start_time_healthcheck - logging.info(f"Finished performing HealthCheck on all hosts.") + logging.info("Finished performing HealthCheck on all hosts.") logging.info( f"Execution time for Health Check: {execution_time_healthcheck:.2f} seconds." ) @@ -812,7 +819,7 @@ def stop_on_input(loop): message_thread.join() -async def main(): +async def run(): try: args = parse_args() logging.basicConfig(**LOGGING_CONFIG) @@ -847,5 +854,9 @@ async def main(): return +def main(): + asyncio.run(run()) + + if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + main() \ No newline at end of file diff --git a/src/luxos/scripts/luxos.py b/src/luxos/scripts/luxos.py new file mode 100644 index 0000000..d78ef02 --- /dev/null +++ b/src/luxos/scripts/luxos.py @@ -0,0 +1,386 @@ +from __future__ import annotations +import asyncio +import os +import csv +import json +import time +import socket +import logging +import argparse +import ipaddress +import threading +from typing import Any + +from luxos.api import logon_required + +log = logging.getLogger(__name__) + + +# internal_send_cgminer_command sends a command to the cgminer API server and returns the response. +def internal_send_cgminer_command( + host: str, port: int, command: str, timeout_sec: int, verbose: bool +) -> dict[str, Any]: + # Create a socket connection to the server + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + try: + # set timeout + sock.settimeout(timeout_sec) + + # Connect to the server + sock.connect((host, port)) + + # Send the command to the server + sock.sendall(command.encode()) + + # Receive the response from the server + response = bytearray() + while True: + # Read one byte at a time so we can wait for the null terminator. + # this is to avoid waiting for the timeout as we don't know how long + # the response will be and socket.recv() will block until reading + # the specified number of bytes. + try: + data = sock.recv(1) + except socket.timeout: + # Timeout occurred, check if we have any data so far + if len(response) == 0: + raise ValueError("timeout waiting for data") + else: + break + if not data: + break + null_index = data.find(b"\x00") + if null_index >= 0: + response += data[:null_index] + break + response += data + + # Parse the response JSON + r = json.loads(response.decode()) + log.debug(r) + return r + + except socket.error as e: + raise e + + +# send_cgminer_command sends a command to the cgminer API server and returns the response. +def send_cgminer_command( + host: str, port: int, cmd: str, param: str, timeout: int, verbose: bool +) -> dict[str, Any]: + req = str(f'{{"command": "{cmd}", "parameter": "{param}"}}\n') + log.debug(f"Executing command: {cmd} with params: {param} to host: {host}") + + return internal_send_cgminer_command(host, port, req, timeout, verbose) + + +# send_cgminer_simple_command sends a command with no params to the miner and returns the response. +def send_cgminer_simple_command( + host: str, port: int, cmd: str, timeout: int, verbose: bool +) -> dict[str, Any]: + req = str(f'{{"command": "{cmd}"}}\n') + log.debug(f"Executing command: {cmd} to host: {host}") + return internal_send_cgminer_command(host, port, req, timeout, verbose) + + +# check_res_structure checks that the response has the expected structure. +def check_res_structure( + res: dict[str, Any], structure: str, min: int, max: int +) -> dict[str, Any]: + # Check the structure of the response. + if structure not in res or "STATUS" not in res or "id" not in res: + raise ValueError("error: invalid response structure") + + # Should we check min and max? + if min >= 0 and max >= 0: + # Check the number of structure elements. + if not (min <= len(res[structure]) <= max): + raise ValueError( + f"error: unexpected number of {structure} in response; min: {min}, max: {max}, actual: {len(res[structure])}" + ) + + # Should we check only min? + if min >= 0: + # Check the minimum number of structure elements. + if len(res[structure]) < min: + raise ValueError( + f"error: unexpected number of {structure} in response; min: {min}, max: {max}, actual: {len(res[structure])}" + ) + + # Should we check only max? + if max >= 0: + # Check the maximum number of structure elements. + if len(res[structure]) < min: + raise ValueError( + f"error: unexpected number of {structure} in response; min: {min}, max: {max}, actual: {len(res[structure])}" + ) + + return res + + +# get_str_field tries to get the field as a string and returns it. +def get_str_field(struct: dict[str, Any], name: str) -> str: + try: + s = str(struct[name]) + except Exception as e: + raise ValueError(f"error: invalid {name} str field: {e}") + + return s + + +def logon(host: str, port: int, timeout: int, verbose: bool) -> str: + # Send 'logon' command to cgminer and get the response + res = send_cgminer_simple_command(host, port, "logon", timeout, verbose) + + # Check if the response has the expected structure + check_res_structure(res, "SESSION", 1, 1) + + # Extract the session data from the response + session = res["SESSION"][0] + + # Get the 'SessionID' field from the session data + s = get_str_field(session, "SessionID") + + # If 'SessionID' is empty, raise an error indicating invalid session id + if s == "": + raise ValueError("error: invalid session id") + + # Return the extracted 'SessionID' + return s + + +def add_session_id_parameter(session_id, parameters): + # Add the session id to the parameters + return [session_id, *parameters] + + +def parameters_to_string(parameters): + # Convert the parameters to a string that LuxOS API accepts + return ",".join(parameters) + + +def generate_ip_range(start_ip: str, end_ip: str) -> list[str]: + # Generate a list of IP addresses from the start and end IP + start = ipaddress.IPv4Address(start_ip) + end = ipaddress.IPv4Address(end_ip) + + ip_list = [] + + while start <= end: + ip_list.append(str(start)) + start += 1 + + return ip_list + + +def load_ip_list_from_csv(csv_file: str) -> list[str]: + # Check if file exists + if not os.path.exists(csv_file): + logging.info(f"Error: {csv_file} file not found.") + exit(1) + + # Load the IP addresses from the CSV file + ip_list = [] + with open(csv_file, "r") as f: + reader = csv.reader(f) + for i, row in enumerate(reader): + # skip commented lines + if row and row[0].strip().startswith("#"): + continue + if i == 0 and row and row[0] == "hostname": + continue + if row: # Ignore empty rows + ip_list.extend(row) + return ip_list + + +def execute_command( + host: str, port: int, timeout_sec: int, cmd: str, parameters: list, verbose: bool +): + # Check if logon is required for the command + logon_req = logon_required(cmd) + + try: + if logon_req: + # Get a SessionID + sid = logon(host, port, timeout_sec, verbose) + # Add the SessionID to the parameters list at the left. + parameters = add_session_id_parameter(sid, parameters) + + log.debug("Command requires a SessionID, logging in for host: %s", host) + log.info("SessionID obtained for %s: %s", host, sid) + + # TODO verify this + elif not logon_required: # type: ignore + log.debug("Logon not required for executing %s", cmd) + + # convert the params to a string that LuxOS API accepts + param_string = parameters_to_string(parameters) + + log.debug("%s on %s with parameters: %s", cmd, host, param_string) + + # Execute the API command + res = send_cgminer_command(host, port, cmd, param_string, timeout_sec, verbose) + + log.debug(res) + + # Log off to terminate the session + if logon_req: + send_cgminer_command(host, port, "logoff", sid, timeout_sec, verbose) + + return res + + except Exception: + log.exception("Error executing %s on %s", cmd, host) + + +def main(): + # define arguments + parser = argparse.ArgumentParser(description="LuxOS CLI Tool") + parser.add_argument("--range_start", required=False, help="IP start range") + parser.add_argument("--range_end", required=False, help="IP end range") + parser.add_argument( + "--ipfile", + required=False, + default="ips.csv", + help="File name to store IP addresses", + ) + parser.add_argument("--cmd", required=True, help="Command to execute on LuxOS API") + parser.add_argument( + "--params", + required=False, + default=[], + nargs="+", + help="Parameters for LuxOS API", + ) + parser.add_argument( + "--max_threads", + required=False, + default=10, + type=int, + help="Maximum number of threads to use. Default is 10.", + ) + parser.add_argument( + "--timeout", + required=False, + default=3, + type=int, + help="Timeout for network scan in seconds", + ) + parser.add_argument( + "--port", required=False, default=4028, type=int, help="Port for LuxOS API" + ) + parser.add_argument( + "--verbose", required=False, default=False, type=bool, help="Verbose output" + ) + parser.add_argument( + "--batch_delay", + required=False, + default=0, + type=int, + help="Delay between batches in seconds", + ) + parser.add_argument( + "--async", + dest="async_engine", + action="store_true", + help="enable the new async engine", + ) + parser.add_argument( + "-a", + "--all", + dest="details", + action="store_true", + help="show full result output", + ) + + # parse arguments + args = parser.parse_args() + args.error = parser.error + + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[logging.StreamHandler(), logging.FileHandler("LuxOS-CLI.log")], + ) + + # set timeout to milliseconds + timeout_sec = args.timeout + + # check if IP address range or CSV with list of IP is provided + if args.range_start and args.range_end: + ip_list = generate_ip_range(args.range_start, args.range_end) + elif args.ipfile: + ip_list = load_ip_list_from_csv(args.ipfile) + else: + args.error("No IP address or IP list found.") + + # Set max threads to use, minimum of max threads and number of IP addresses + max_threads = min(args.max_threads, len(ip_list)) + + # Set start time + start_time = time.monotonic() + + if args.async_engine: + from . import async_luxos + + asyncio.run( + async_luxos.run( + ipaddress=ip_list, + port=args.port, + cmd=args.cmd, + params=args.params, + timeout=timeout_sec, + delay=args.batch_delay, + details=args.details, + batchsize=args.max_threads, + ) + ) + + # TODO remove this duplicate code + end_time = time.monotonic() + execution_time = end_time - start_time + log.info(f"Execution completed in {execution_time:.2f} seconds.") + return + + # Create a list of threads + threads = [] + + # Iterate over the IP addresses + for ip in ip_list: + # create new thread for each IP address + thread = threading.Thread( + target=execute_command, + args=(ip, args.port, timeout_sec, args.cmd, args.params, args.verbose), + ) + + # start the thread + threads.append(thread) + thread.start() + + # Limit the number of concurrent threads + if len(threads) >= max_threads: + # Wait for the threads to finish + for thread in threads: + thread.join() + + # Introduce the batch delay if specified + if args.batch_delay > 0: + print(f"Waiting {args.batch_delay} seconds") + time.sleep(args.batch_delay) + + # Clear the thread list for the next batch + threads = [] + + # Wait for the remaining threads to finish + for thread in threads: + thread.join() + + # Execution completed + end_time = time.time() + execution_time = end_time - start_time + log.info(f"Execution completed in {execution_time:.2f} seconds.") + + +if __name__ == "__main__": + main() diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..90b0e97 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,136 @@ +# nothing to see here, yet +import contextlib +import subprocess +import sys +import dataclasses as dc +import time +import types +from pathlib import Path + +import pytest + + +DATADIR = Path(__file__).parent / "data" + + +@pytest.fixture(scope="function") +def resolver(request): + """return a resolver object to lookup for test data + + Example: + def test_me(resolver): + print(resolver.lookup("a/b/c")) -> tests/data/a/b/c + """ + + @dc.dataclass + class Resolver: + root: Path + name: str + + def lookup(self, path: Path | str) -> Path: + candidates = [ + self.root / self.name / path, + self.root / path, + ] + for candidate in candidates: + if candidate.exists(): + return candidate + raise FileNotFoundError(f"cannot find {path}", candidates) + + yield Resolver(DATADIR, request.module.__name__) + + +def loadmod(path: Path) -> types.ModuleType: + from importlib import util + + spec = util.spec_from_file_location(Path(path).name, Path(path)) + module = util.module_from_spec(spec) # type: ignore + spec.loader.exec_module(module) # type: ignore + return module + + +@pytest.fixture(scope="function") +def echopool(resolver, tmp_path, request): + """yield a pool of echo servers + + Example: + def test_me(echopool): + echopool.start(30) + host, port = echopool.addresses[0] + ret = luxos.api.send_cgminer_command(host, port, "helo", "world") + + Note: + set the environemnt variable to a miner to test against it, + eg. export LUXOS_TEST_MINER=1.2.3.4:99999 + + minerd has only two methods, .server_address and .load! + """ + script = resolver.lookup("echopool.py") + mod = loadmod(script) + + @dc.dataclass + class Pool: + def __init__(self): + self.process: subprocess.Popen | None = None + self.addresses: list[tuple[str, int]] | None = None + + def start( + self, + number: int, + mode: str = "echo+", + verbose: bool = False, + timeout: float | None = None, + ): + cmd = [sys.executable, script, "--mode", mode] + + if verbose: + cmd.append("-v") + + path = tmp_path / f"{request.function.__name__}.txt" + cmd.extend(["--server-file", path]) + + cmd.append(number) + + self.process = subprocess.Popen([str(c) for c in cmd]) + + # wait at most 5 seconds for the underlying server to start up + timeout = 5.0 if timeout is None else timeout + ttl = (time.monotonic() + timeout) if timeout else None + + while self.addresses is None: + if ttl and (time.monotonic() > ttl): + raise RuntimeError("failed to start underlying server", cmd) + addresses = [] + with contextlib.suppress(FileNotFoundError): + for line in path.read_text().split("\n"): + if found := mod.is_server_line(line): + addresses.append(found) + if mod.is_end_server_lines(line): + self.addresses = addresses + break + time.sleep(0.01) + + def shutdown(self): + if self.process: + self.process.kill() + + pool = Pool() + try: + yield pool + finally: + pool.shutdown() + + +def pytest_addoption(parser): + parser.addoption( + "--manual", + action="store_true", + dest="manual", + default=False, + help="run manual tests", + ) + + +def pytest_configure(config): + if not config.option.manual: + setattr(config.option, "markexpr", "not manual") diff --git a/tests/data/echopool.py b/tests/data/echopool.py new file mode 100644 index 0000000..402a245 --- /dev/null +++ b/tests/data/echopool.py @@ -0,0 +1,131 @@ +from __future__ import annotations +import asyncio +import json +import logging +import time +import re +import argparse +import functools +from pathlib import Path + + +ENDCONFIG = "= DONE =" +SERVERLINE = re.compile(r"server:[(]['](?P[^']+)['][,](?P[^)]+)") + +log = logging.getLogger(__name__) + + +def is_server_line(line) -> None | tuple[str, int]: + if match := SERVERLINE.search(line.replace(" ", "")): + return match.group("host"), int(match.group("port")) + return None + + +def is_end_server_lines(line): + return ENDCONFIG in line + + +async def handle_client( + reader, + writer, + mode: str = "echo", + delay_s: float | None = None, + async_delay_s: float | None = None, +): + data = await reader.read(1024) + message = data.decode() + addr = writer.get_extra_info("peername") + this_addr = reader._transport.get_extra_info("sockname") + + log.debug("received at %s: %s", this_addr, message) + if mode == "echo+": + data = f"received by {this_addr}: {message}".encode() + elif mode in {"json", "json+"}: + result = { + "status": "ok", + "message": f"received from {addr}", + "reason": "", + "this": this_addr, + "peer": addr, + "result": {"message": message, "input": "", "output": ""}, + } + try: + message = json.loads(message) + result["result"]["input"] = message + if mode == "json+": + if message.get("command", None) == "sleep": + await asyncio.sleep(float(message["value"])) + result["result"]["output"] = f"slept for {message['value']}" + + except Exception as e: + result["status"] = "failed" + result["reason"] = str(e) + + data = json.dumps(result, indent=2, sort_keys=True).encode() + + # this instroduces a delay between request <-> reply + if async_delay_s: + await asyncio.sleep(async_delay_s) + + writer.write(data) + await writer.drain() + + writer.close() + + # this block the whole server! + if delay_s: + time.sleep(delay_s) + + +async def main(): + """ + The main function that sets up the server to listen on multiple ports. + """ + p = argparse.ArgumentParser() + p.add_argument("-v", "--verbose", action="store_true") + p.add_argument( + "--delay", type=float, help="non async delay in s (block the server)" + ) + p.add_argument( + "--async-delay", type=float, help="async delay in s (block a single request)" + ) + p.add_argument("--mode", choices=["echo", "echo+", "json", "json+"], default="echo") + p.add_argument("--server-file", type=Path) + p.add_argument("number", type=int) + + args = p.parse_args() + logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO) + + log.debug("starting %i servers in [%s] mode", args.number, args.mode) + + # Create a server for each port + client = functools.partial( + handle_client, + mode=args.mode, + delay_s=args.delay, + async_delay_s=args.async_delay, + ) + servers = await asyncio.gather( + *[asyncio.start_server(client, "127.0.0.1", 0) for _ in range(args.number)] + ) + + message = [f"server: {server.sockets[0].getsockname()}" for server in servers] + message.append(ENDCONFIG) + txt = "\n".join(message) + + if args.server_file: + args.server_file.parent.mkdir(parents=True, exist_ok=True) + args.server_file.write_text(txt) + log.debug("written servers to %s", args.server_file) + else: + print(txt) + + # Keep the main coroutine running + while True: + if args.server_file and not args.server_file.exists(): + break + await asyncio.sleep(1) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/tests/requirements.txt b/tests/requirements.txt new file mode 100644 index 0000000..4b938b2 --- /dev/null +++ b/tests/requirements.txt @@ -0,0 +1,20 @@ +asyncpg +build +httpx +lxml +mypy +pandas +pandas-stubs +pre-commit +pytest +pytest-asyncio +pytest-cov +pytest-html +pyyaml +tqdm +types-pyyaml +types-tqdm + +# For you sanity please sync with .pre-commit-config.yaml +ruff == 0.3.4 +mypy == 1.9.0 diff --git a/tests/test_luxos.py b/tests/test_luxos.py new file mode 100644 index 0000000..4625903 --- /dev/null +++ b/tests/test_luxos.py @@ -0,0 +1,18 @@ +from pathlib import Path + +def test_import(): + """makes sure we're importing the correct luxos package + """ + + from luxos import api + assert (Path(api.__file__).parent / "api.json").exists() + + assert len(api.COMMANDS) == 57 + + +def test_logon_required(): + from luxos import api + + assert api.logon_required("blah") is None + assert api.logon_required("logoff") is True + assert api.logon_required("logon") is False diff --git a/tests/test_luxos_asyncops.py b/tests/test_luxos_asyncops.py new file mode 100644 index 0000000..8c5605d --- /dev/null +++ b/tests/test_luxos_asyncops.py @@ -0,0 +1,134 @@ +from __future__ import annotations + +import os +import asyncio +import pytest +import luxos.asyncops as aapi +from luxos import exceptions +from luxos import misc + +## NOTE ## +# This tests spawn an underlying server, it might be better not run +# unattended. Some also require a miner, we might not have it handy. +pytestmark = pytest.mark.manual + + + +def getminer() -> None | tuple[str, int]: + if not (minerd := os.getenv("LUXOS_TEST_MINER")): + return None + host, port = minerd.split(":") + return host.strip(), int(port) + + +def test_validate_message(): + pytest.raises(exceptions.MinerCommandMalformedMessageError, aapi.validate_message, "a", 0, {}) + host, port = "a", 0 + + assert aapi.validate_message(host, port, {"STATUS": 1, "id": 2}) + + pytest.raises( + exceptions.MinerCommandMalformedMessageError, + aapi.validate_message, + host, port, + { + "STATUS": 1, + "id": 2, + }, + "wooow", + ) + + assert aapi.validate_message(host, port, {"STATUS": 1, "id": 2, "wooow": [1, 2]}, "wooow") + + with pytest.raises(exceptions.MinerCommandMalformedMessageError) as excinfo: + aapi.validate_message(host, port, + {"STATUS": 1, "id": 2, "wooow": [1, 2]}, "wooow", minfields=9 + ) + assert excinfo.value.args[2] == "found 2 fields for wooow invalid: 2 <= 9" + + with pytest.raises(exceptions.MinerCommandMalformedMessageError) as excinfo: + aapi.validate_message(host, port, + {"STATUS": 1, "id": 2, "wooow": [1, 2]}, "wooow", maxfields=1 + ) + assert excinfo.value.args[2] == "found 2 fields for wooow invalid: 2 >= 1" + + with pytest.raises(exceptions.MinerCommandMalformedMessageError) as excinfo: + aapi.validate_message(host, port, + {"STATUS": 1, "id": 2, "wooow": [1, 2]}, "wooow", minfields=9, maxfields=10 + ) + assert excinfo.value.args[2] == "found 2 fields for wooow invalid: 2 <= 9" + + assert aapi.validate_message(host, port, + {"STATUS": 1, "id": 2, "wooow": [1, 2]}, "wooow", minfields=2, maxfields=10 + ) + + +@pytest.mark.asyncio +async def test_private_roundtrip_one_listener(echopool): + """checks roundrtip sends and receive a message (1-listener)""" + echopool.start(1, mode="echo+") + host, port = echopool.addresses[0] + ret = await aapi._roundtrip(host, port, "hello", None) + assert ret == f"received by ('{host}', {port}): hello" + + +@pytest.mark.asyncio +async def test_private_roundtrip_many_listeners(echopool): + """checks the roundrip can connect en-masse to many lsiteners""" + echopool.start(100, mode="echo+") + + blocks = {} + for index, group in enumerate(misc.batched(echopool.addresses, 10)): + tasks = [] + for host, port in group: + tasks.append(aapi._roundtrip(host, port, "hello olleh", None)) + blocks[index] = await asyncio.gather(*tasks, return_exceptions=True) + + assert len(blocks) == 10 + assert sum(len(x) for x in blocks.values()) == 100 + allitems = [item for b in blocks.values() for item in b] + assert f"received by {echopool.addresses[3]}: hello olleh" in allitems + + +@pytest.mark.skipif(not getminer(), reason="need to set LUXOS_TEST_MINER") +@pytest.mark.asyncio +async def test_miner_logon_logoff_cycle(): + host, port = getminer() + + sid = None + try: + sid = await aapi.logon(host, port) + except exceptions.MinerCommandSessionAlreadyActive as e: + raise RuntimeError("a session is already active on {host}:{port}") from e + finally: + if sid: + await aapi.logoff(host, port, sid) + + + +@pytest.mark.skipif(not getminer(), reason="need to set LUXOS_TEST_MINER") +@pytest.mark.asyncio +async def test_miner_double_logon_cycle(): + host, port = getminer() + + sid = await aapi.logon(host, port) + try: + with pytest.raises(exceptions.MinerCommandSessionAlreadyActive) as excinfo: + await aapi.logon(host, port) + assert excinfo.value.args[0] == f"session active for {host}:{port}" + except Exception: + pass + finally: + if sid: + await aapi.logoff(host, port, sid) + + +@pytest.mark.skipif(not getminer(), reason="need to set LUXOS_TEST_MINER") +@pytest.mark.asyncio +async def test_miner_version(): + host, port = getminer() + + res = await aapi.execute_command(host, port, None, "version", asjson=True) + assert "VERSION" in res + assert len(res["VERSION"]) == 1 + assert "API" in res["VERSION"][0] diff --git a/tests/test_luxos_misc.py b/tests/test_luxos_misc.py new file mode 100644 index 0000000..de261b0 --- /dev/null +++ b/tests/test_luxos_misc.py @@ -0,0 +1,28 @@ +from luxos import misc + +def test_misc_batched(): + data = {} + for index, group in enumerate(misc.batched(range(10), 3)): + data[index] = group + + assert data == { + 0: (0, 1, 2), + 1: (3, 4, 5), + 2: (6, 7, 8), + 3: (9,), + } + +def test_indent(): + txt = """\ + An unusually complicated text + with un-even indented lines + that make life harder +""" + assert ( + misc.indent(txt, pre="..") + == """\ +.. An unusually complicated text +.. with un-even indented lines +..that make life harder +""" + ) \ No newline at end of file