Skip to content

Commit a4dc5eb

Browse files
committed
new syncops module
1 parent 7ffd82e commit a4dc5eb

File tree

7 files changed

+303
-202
lines changed

7 files changed

+303
-202
lines changed

src/luxos/asyncops.py

Lines changed: 22 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import contextlib
45
import functools
56
import json
67
import logging
@@ -126,9 +127,9 @@ def validate_message(
126127
port: int,
127128
res: dict[str, Any],
128129
extrakey: str | None = None,
129-
minfields: None | int = None,
130-
maxfields: None | int = None,
131-
) -> dict[str, Any]:
130+
minfields: None | int = 1,
131+
maxfields: None | int = 1,
132+
) -> Any:
132133
# all miner message comes with a STATUS
133134
for key in ["STATUS", "id", *([extrakey] if extrakey else [])]:
134135
if key in res:
@@ -142,17 +143,17 @@ def validate_message(
142143

143144
n = len(res[extrakey])
144145
msg = None
145-
if minfields and (n < minfields):
146+
if (minfields is not None) and (n < minfields):
146147
msg = f"found {n} fields for {extrakey} invalid: " f"{n} <= {minfields}"
147-
elif maxfields and (n > maxfields):
148+
elif (maxfields is not None) and (n > maxfields):
148149
msg = f"found {n} fields for {extrakey} invalid: " f"{n} >= {maxfields}"
149150
if msg is None:
150151
return res[extrakey]
151152
raise exceptions.MinerCommandMalformedMessageError(host, port, msg, res)
152153

153154

154155
@wrapped
155-
async def logon(host: str, port: int, timeout: float | None = 3) -> str:
156+
async def logon(host: str, port: int, timeout: float | None = None) -> str:
156157
timeout = TIMEOUT if timeout is None else timeout
157158
res = await roundtrip(host, port, {"command": "logon"}, timeout=timeout)
158159

@@ -166,7 +167,7 @@ async def logon(host: str, port: int, timeout: float | None = 3) -> str:
166167
)
167168
sessions = validate_message(host, port, res, "SESSION", 1, 1)
168169

169-
session = sessions[0] # type: ignore
170+
session = sessions[0]
170171

171172
if "SessionID" not in session:
172173
raise exceptions.MinerCommandSessionAlreadyActive(
@@ -177,56 +178,15 @@ async def logon(host: str, port: int, timeout: float | None = 3) -> str:
177178

178179
@wrapped
179180
async def logoff(
180-
host: str, port: int, sid: str, timeout: float | None = 3
181+
host: str, port: int, sid: str, timeout: float | None = None
181182
) -> dict[str, Any]:
182183
timeout = TIMEOUT if timeout is None else timeout
183184
return await roundtrip(
184185
host, port, {"command": "logoff", "parameter": sid}, timeout=timeout
185186
)
186187

187188

188-
@wrapped
189-
async def execute_command(
190-
host: str,
191-
port: int,
192-
timeout_sec: float | None,
193-
cmd: str,
194-
parameters: list[str] | None = None,
195-
verbose: bool = False,
196-
asjson: bool | None = True,
197-
add_address: bool = False,
198-
) -> tuple[tuple[str, int], dict[str, Any]] | dict[str, Any]:
199-
timeout = TIMEOUT if timeout_sec is None else timeout_sec
200-
parameters = parameters or []
201-
202-
sid = None
203-
if api.logon_required(cmd):
204-
sid = await logon(host, port)
205-
parameters = [sid, *parameters]
206-
log.debug("session id requested & obtained for %s:%i (%s)", host, port, sid)
207-
else:
208-
log.debug("no logon required for command '%s' on %s:%i", cmd, host, port)
209-
210-
try:
211-
packet = {"command": cmd}
212-
if parameters:
213-
packet["parameter"] = ",".join(parameters)
214-
log.debug(
215-
"executing command '%s' on '%s:%i' with parameters: %s",
216-
cmd,
217-
host,
218-
port,
219-
packet.get("parameter", ""),
220-
)
221-
ret = await roundtrip(host, port, packet, timeout=timeout, asjson=asjson)
222-
log.debug("received from %s:%s: %s", host, port, str(ret))
223-
return ((host, port), ret) if add_address else ret
224-
finally:
225-
if sid:
226-
await logoff(host, port, sid)
227-
228-
229-
def _rexec_parameters(
189+
def parameters_to_list(
230190
parameters: str | list[Any] | dict[str, Any] | None = None,
231191
) -> list[str]:
232192
if isinstance(parameters, dict):
@@ -256,9 +216,7 @@ async def rexec(
256216
retry: int | None = None,
257217
retry_delay: float | None = None,
258218
) -> dict[str, Any] | None:
259-
from . import api
260-
261-
parameters = _rexec_parameters(parameters)
219+
parameters = parameters_to_list(parameters)
262220

263221
timeout = TIMEOUT if timeout is None else timeout
264222
retry = RETRIES if retry is None else retry
@@ -328,3 +286,14 @@ async def rexec(
328286
await logoff(host, port, sid)
329287
if isinstance(failure, Exception):
330288
raise failure
289+
290+
291+
@contextlib.asynccontextmanager
292+
async def with_atm(host, port, enabled: bool, timeout: float | None = None):
293+
res = await rexec(host, port, "atm", timeout=timeout)
294+
if not res:
295+
raise exceptions.MinerConnectionError(host, port, "cannot check atm")
296+
current = validate_message(host, port, res, "ATM")[0]["Enabled"]
297+
await rexec(host, port, "atmset", {"enabled": enabled}, timeout=timeout)
298+
yield current
299+
await rexec(host, port, "atmset", {"enabled": current}, timeout=timeout)

src/luxos/exceptions.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,19 @@ class MinerCommandSessionAlreadyActive(MinerConnectionError):
3232

3333
class MinerCommandMalformedMessageError(MinerConnectionError):
3434
pass
35+
36+
37+
class LuxosLaunchError(MinerConnectionError):
38+
def __init__(self, tback: str, host: str, port: int, *args, **kwargs):
39+
self.tback = tback
40+
super().__init__(host, port, *args, **kwargs)
41+
42+
def __str__(self):
43+
from .text import indent
44+
45+
msg = indent(str(self.tback), "| ")
46+
return f"{self.address}: \n{msg}"
47+
48+
49+
class LuxosLaunchTimeoutError(LuxosLaunchError, asyncio.TimeoutError):
50+
pass

src/luxos/ips.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,27 @@ def iter_ip_ranges(
129129
cur += 1
130130

131131

132+
def ip_ranges(
133+
txt: str, rsep: str = "-", gsep: str = ":"
134+
) -> list[tuple[str, int | None]]:
135+
"""return a list of ips given a text expression.
136+
137+
Eg.
138+
>>> for ip in ip_ranges("127.0.0.1"):
139+
... print(ip)
140+
127.0.0.1
141+
142+
>>> for ip in ip_ranges("127.0.0.1-127.0.0.3"):
143+
... print(ip)
144+
127.0.0.1
145+
127.0.0.2
146+
127.0.0.3
147+
148+
NOTE: use the `:` (gsep) to separate ips groups, and `-` (rsep) to define a range.
149+
"""
150+
return list(iter_ip_ranges(txt, rsep=rsep, gsep=gsep))
151+
152+
132153
def load_ips_from_csv(path: Path | str, port: int = 4028) -> list[tuple[str, int]]:
133154
"""loads ip addresses from a csv file
134155

src/luxos/syncops.py

Lines changed: 74 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,31 @@
11
from __future__ import annotations
22

3+
import contextlib
34
import json
45
import logging
56
import socket
67
from typing import Any
78

89
from luxos.api import logon_required
910

11+
from .asyncops import TIMEOUT, parameters_to_list, validate_message
12+
from .exceptions import MinerCommandSessionAlreadyActive
13+
1014
log = logging.getLogger(__name__)
1115

1216

1317
# internal_send_cgminer_command sends a command to the
1418
# cgminer API server and returns the response.
1519
def internal_send_cgminer_command(
16-
host: str, port: int, command: str, timeout_sec: int, verbose: bool
20+
host: str, port: int, command: str, timeout: float | None
1721
) -> dict[str, Any]:
22+
timeout_sec = TIMEOUT if timeout is None else timeout
1823
# Create a socket connection to the server
1924
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
2025
try:
2126
# set timeout
22-
sock.settimeout(timeout_sec)
27+
if timeout_sec is not None:
28+
sock.settimeout(timeout)
2329

2430
# Connect to the server
2531
sock.connect((host, port))
@@ -62,22 +68,23 @@ def internal_send_cgminer_command(
6268
# send_cgminer_command sends a command to the cgminer API server and
6369
# returns the response.
6470
def send_cgminer_command(
65-
host: str, port: int, cmd: str, param: str, timeout: int, verbose: bool
71+
host: str, port: int, cmd: str, param: str, timeout: float | None = None
6672
) -> dict[str, Any]:
73+
timeout = TIMEOUT if timeout is None else timeout
6774
req = str(f'{{"command": "{cmd}", "parameter": "{param}"}}\n')
6875
log.debug(f"Executing command: {cmd} with params: {param} to host: {host}")
69-
70-
return internal_send_cgminer_command(host, port, req, timeout, verbose)
76+
return internal_send_cgminer_command(host, port, req, timeout)
7177

7278

7379
# send_cgminer_simple_command sends a command with no params
7480
# to the miner and returns the response.
7581
def send_cgminer_simple_command(
76-
host: str, port: int, cmd: str, timeout: int, verbose: bool
82+
host: str, port: int, cmd: str, timeout: float | None = None
7783
) -> dict[str, Any]:
84+
timeout = TIMEOUT if timeout is None else timeout
7885
req = str(f'{{"command": "{cmd}"}}\n')
7986
log.debug(f"Executing command: {cmd} to host: {host}")
80-
return internal_send_cgminer_command(host, port, req, timeout, verbose)
87+
return internal_send_cgminer_command(host, port, req, timeout)
8188

8289

8390
# check_res_structure checks that the response has the expected structure.
@@ -128,28 +135,26 @@ def get_str_field(struct: dict[str, Any], name: str) -> str:
128135
return s
129136

130137

131-
def logon(host: str, port: int, timeout: int, verbose: bool) -> str:
138+
def logon(host: str, port: int, timeout: float | None = None) -> str:
132139
# Send 'logon' command to cgminer and get the response
133-
res = send_cgminer_simple_command(host, port, "logon", timeout, verbose)
134-
140+
timeout = TIMEOUT if timeout is None else timeout
141+
res = send_cgminer_simple_command(host, port, "logon", timeout)
142+
if res["STATUS"][0]["STATUS"] == "E":
143+
raise MinerCommandSessionAlreadyActive(host, port, res["STATUS"][0]["Msg"])
135144
# Check if the response has the expected structure
136-
check_res_structure(res, "SESSION", 1, 1)
137-
138-
# Extract the session data from the response
139-
session = res["SESSION"][0]
145+
session = validate_message(host, port, res, "SESSION")[0]
146+
return str(session["SessionID"])
140147

141-
# Get the 'SessionID' field from the session data
142-
s = get_str_field(session, "SessionID")
143148

144-
# If 'SessionID' is empty, raise an error indicating invalid session id
145-
if s == "":
146-
raise ValueError("error: invalid session id")
147-
148-
# Return the extracted 'SessionID'
149-
return s
149+
def logoff(
150+
host: str, port: int, sid: str, timeout: float | None = None
151+
) -> dict[str, Any]:
152+
timeout = TIMEOUT if timeout is None else timeout
153+
res = send_cgminer_command(host, port, "logoff", sid, timeout)
154+
return validate_message(host, port, res)
150155

151156

152-
def add_session_id_parameter(session_id, parameters):
157+
def add_session_id_parameter(session_id, parameters) -> list[Any]:
153158
# Add the session id to the parameters
154159
return [session_id, *parameters]
155160

@@ -160,40 +165,62 @@ def parameters_to_string(parameters):
160165

161166

162167
def execute_command(
163-
host: str, port: int, timeout_sec: int, cmd: str, parameters: list, verbose: bool
168+
host: str,
169+
port: int,
170+
timeout_sec: int | float | None,
171+
cmd: str,
172+
parameters: str | list[Any] | dict[str, Any] | None = None,
173+
verbose: bool = False,
164174
):
175+
timeout_sec = TIMEOUT if timeout_sec is None else timeout_sec
165176
# Check if logon is required for the command
166177
logon_req = logon_required(cmd)
167178

168-
try:
169-
if logon_req:
170-
# Get a SessionID
171-
sid = logon(host, port, timeout_sec, verbose)
172-
# Add the SessionID to the parameters list at the left.
173-
parameters = add_session_id_parameter(sid, parameters)
179+
parameters = parameters_to_list(parameters)
180+
181+
if logon_req:
182+
# Get a SessionID
183+
sid = logon(host, port, timeout_sec)
184+
# Add the SessionID to the parameters list at the left.
185+
parameters = add_session_id_parameter(sid, parameters)
174186

175-
log.debug("Command requires a SessionID, logging in for host: %s", host)
176-
log.info("SessionID obtained for %s: %s", host, sid)
187+
log.debug("SessionID obtained for %s: %s", host, sid)
177188

178-
# TODO verify this
179-
elif not logon_required: # type: ignore
180-
log.debug("Logon not required for executing %s", cmd)
189+
# TODO verify this
190+
elif not logon_req:
191+
log.debug("Logon not required for executing %s", cmd)
181192

182-
# convert the params to a string that LuxOS API accepts
183-
param_string = parameters_to_string(parameters)
193+
# convert the params to a string that LuxOS API accepts
194+
param_string = ",".join(parameters)
184195

185-
log.debug("%s on %s with parameters: %s", cmd, host, param_string)
196+
log.debug("%s on %s with parameters: %s", cmd, host, param_string)
186197

187-
# Execute the API command
188-
res = send_cgminer_command(host, port, cmd, param_string, timeout_sec, verbose)
198+
# Execute the API command
199+
res = send_cgminer_command(host, port, cmd, param_string, timeout_sec)
189200

190-
log.debug(res)
201+
log.debug(res)
191202

192-
# Log off to terminate the session
193-
if logon_req:
194-
send_cgminer_command(host, port, "logoff", sid, timeout_sec, verbose)
203+
# Log off to terminate the session
204+
if logon_req:
205+
logoff(host, port, sid, timeout_sec)
206+
207+
return res
208+
209+
210+
def execute(
211+
host: str,
212+
port: int,
213+
cmd: str,
214+
parameters: str | list[Any] | dict[str, Any] | None = None,
215+
timeout: float | None = None,
216+
):
217+
return execute_command(host, port, timeout, cmd, parameters)
195218

196-
return res
197219

198-
except Exception:
199-
log.exception("Error executing %s on %s", cmd, host)
220+
@contextlib.contextmanager
221+
def with_atm(host, port, enabled: bool, timeout: float | None = None):
222+
res = execute(host, port, "atm", timeout=timeout)
223+
current = validate_message(host, port, res, "ATM")[0]["Enabled"]
224+
execute(host, port, "atmset", {"enabled": enabled}, timeout=timeout)
225+
yield current
226+
execute(host, port, "atmset", {"enabled": current}, timeout=timeout)

0 commit comments

Comments
 (0)