Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added mrok/agent/devtools/__init__.py
Empty file.
203 changes: 203 additions & 0 deletions mrok/agent/devtools/inspector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
#!/usr/bin/env python3
import os
import time

from rich.panel import Panel
from rich.syntax import Syntax
from rich.text import Text
from textual import work
from textual.app import App, ComposeResult
from textual.containers import Container, Horizontal, Vertical, VerticalScroll
from textual.reactive import reactive
from textual.widget import Widget
from textual.widgets import DataTable, Footer, Header, Static, TabbedContent, TabPane

from mrok.agent.devtools.ipc import PubSubManager
from mrok.conf import get_settings
from mrok.http.datastructures import Response


def color_for_status(status: int) -> str:
if 200 <= status < 300:
return "green"
return "red"


class RequestDetailCard(Widget):
request = reactive(None, recompose=True)

def compose(self):
with Vertical():
if self.request:
yield from self._build_card()
else:
yield Static("Select a request to view details.", classes="summary")

def _build_card(self):
r = self.request or {} # type: ignore
method = r.get("method", "")
path = r.get("path", "")
status = int(r.get("status", 0))
duration = int(r.get("duration", 0) * 1000)
parameters = r.get("query_string")
req_headers = r.get("request_headers", [])
res_headers = r.get("response_headers", [])
req_body = r.get("request_body")
res_body = r.get("response_body")

summary_text = Text()
summary_text.append(f"{method} ", style="bold cyan")
summary_text.append(f"{path}\n", style="bold")
summary_text.append("Status: ", style="bold")
summary_text.append(f"{status} ", style=f"bold {color_for_status(status)}")
summary_text.append(f"• Duration: {duration} ms", style="dim")
req_headers_formatted = "\n".join(f"[yellow]{k}[/]: {v}" for k, v in req_headers)
res_headers_formatted = "\n".join(f"[yellow]{k}[/]: {v}" for k, v in res_headers)

yield Static(Panel(summary_text, title="Request Summary", expand=False))
with TabbedContent(initial="response"):
with TabPane("Response body", id="response"):
if res_body:
yield VerticalScroll(
Static(Syntax(res_body, "json", theme="monokai", word_wrap=True))
)
else:
yield Static("No body")
with TabPane("Request headers", id="req_headers"):
yield VerticalScroll(
Static(Panel(req_headers_formatted or "<none>", title="Headers"))
)
with TabPane("Response headers", id="res_headers"):
yield VerticalScroll(
Static(Panel(res_headers_formatted or "<none>", title="Headers"))
)
with TabPane("Request body", id="request"):
if req_body:
yield VerticalScroll(
Static(Syntax(req_body, "json", theme="monokai", word_wrap=True))
)
else:
yield Static("No body")
with TabPane("Request parameters", id="params"):
yield VerticalScroll(Static(parameters or "No parameters"))


class InspectorApp(App):
TITLE = "mrok Agent Web Inspection Interface"
BINDINGS = [("d", "toggle_dark", "Toggle dark mode")]
CSS = """
#app-grid {
layout: grid;
grid-size: 2;
grid-columns: 1fr 2fr;
grid-rows: 1fr;
height: 100%;
width: 100%;
}

#left-pane {
background: $panel;
padding: 1;
border: round $accent;
height: 100%;
}

#right-pane {
background: $boost;
padding: 1;
border: round $accent-darken-1;
height: 100%;
overflow: auto;
}

#right-pane > Static {
background: $surface;
height: 100%;
overflow: auto;
padding: 1;
}
"""

def __init__(self):
super().__init__()
self.detail_card = RequestDetailCard()
self.refresh_interval = 10.0
self.requests = {}
self.settings = get_settings()
self.manager = PubSubManager(address=("localhost", 50000), authkey=b"secret")
self.consumer_name = f"mrok_devtools_{os.getpid()}"
self.queue = None

def compose(self) -> ComposeResult:
yield Header()
with Container(id="app-grid"):
with VerticalScroll(id="left-pane"):
yield DataTable(id="requests", zebra_stripes=True, cursor_type="row")
with Horizontal(id="right-pane"):
yield self.detail_card
yield Footer()

async def on_mount(self):
table = self.query_one("#requests", DataTable)
table.add_columns(("ID", "id"), "Method", "Status", "Path", "Duration (ms)")
self.manager.connect()
self.queue = self.manager.register_subscriber(self.consumer_name)
self.heartbeat()
self.consumer()

@work(exclusive=True, thread=True)
def heartbeat(self):
while True:
try:
self.manager.send_heartbeat(self.consumer_name)
except Exception:
return
time.sleep(1)

@work(exclusive=True, thread=True)
def consumer(self):
table = self.query_one("#requests", DataTable)
while True:
try:
response: Response = self.queue.get()
req_id = time.monotonic_ns()
self.requests[req_id] = response
table.add_row(
req_id,
response.request.method,
response.status,
response.request.url,
response.duration,
key=req_id,
)
except Exception:
return
time.sleep(1)

# # self.set_interval(1, self.refresh_data)

# def refresh_data(self):
# log("getting item from store")
# try:
# r = self.store.get(timeout=0.5)
# except Empty:
# return
# log(f"add item {r} to table")
# self.table.add_row(
# r["id"],
# r.get("method", ""),
# r.get("status", ""),
# r.get("path", ""),
# int((r.get("duration", 0)) * 1000),
# key=str(r["id"]),
# )
# self.requests[str(r["id"])] = r

# def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
# req_id = event.row_key.value
# self.detail_card.request = self.requests.get(req_id)


if __name__ == "__main__": # pragma: no cover
app = InspectorApp()
app.run()
52 changes: 52 additions & 0 deletions mrok/agent/devtools/ipc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import time
from multiprocessing.managers import BaseManager, ListProxy
from queue import Queue

subscribers = {}
heartbeats = {}
HEARTBEAT_TIMEOUT = 5 * 60 # five mins


def register_subscriber(name):
q = Queue()
subscribers[name] = q
heartbeats[name] = time.time()
return q


def send_heartbeat(name):
heartbeats[name] = time.time()


def unsubscribe(name):
subscribers.pop(name, None)
heartbeats.pop(name, None)


def get_subscriber(name):
return subscribers[name]


def get_subscribers():
return list(subscribers.keys())


class PubSubManager(BaseManager):
pass


PubSubManager.register("register_subscriber", callable=register_subscriber)
PubSubManager.register("send_heartbeat", callable=send_heartbeat)
PubSubManager.register("unsubscribe", callable=unsubscribe)
PubSubManager.register("get_subscribers", callable=get_subscribers, proxytype=ListProxy)
PubSubManager.register("get_subscriber", callable=get_subscriber)


def cleanup_thread():
while True:
now = time.time()
dead = [n for n, ts in heartbeats.items() if now - ts > HEARTBEAT_TIMEOUT]
for name in dead:
subscribers.pop(name, None)
heartbeats.pop(name, None)
time.sleep(2)
14 changes: 11 additions & 3 deletions mrok/agent/sidecar/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pathlib import Path
from typing import Any

from mrok.http.forwarder import ForwardAppBase
from mrok.http.forwarder import ForwardAppBase, RequestCompleteCallback, ResponseCompleteCallback

logger = logging.getLogger("mrok.proxy")

Expand All @@ -15,9 +15,17 @@

class ForwardApp(ForwardAppBase):
def __init__(
self, target_address: str | Path | tuple[str, int], read_chunk_size: int = 65536
self,
target_address: str | Path | tuple[str, int],
read_chunk_size: int = 65536,
on_request_complete: RequestCompleteCallback | None = None,
on_response_complete: ResponseCompleteCallback | None = None,
) -> None:
super().__init__(read_chunk_size=read_chunk_size)
super().__init__(
read_chunk_size=read_chunk_size,
on_request_complete=on_request_complete,
on_response_complete=on_response_complete,
)
self._target_address = target_address

async def select_backend(
Expand Down
34 changes: 27 additions & 7 deletions mrok/agent/sidecar/main.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,35 @@
import asyncio
import contextlib
import logging
from functools import partial
from pathlib import Path
from threading import Thread

from mrok.agent.devtools.ipc import PubSubManager, cleanup_thread
from mrok.agent.sidecar.app import ForwardApp
from mrok.http.config import MrokBackendConfig
from mrok.http.datastructures import Response
from mrok.http.master import Master
from mrok.http.server import MrokServer

logger = logging.getLogger("mrok.proxy")

def run_sidecar(identity_file: str, target_addr: str | Path | tuple[str, int]):
config = MrokBackendConfig(ForwardApp(target_addr), identity_file)

def run_sidecar(
identity_file: str,
target_addr: str | Path | tuple[str, int],
):
mgr = PubSubManager(address=("localhost", 50000), authkey=b"secret")
mgr.connect()

def on_response_complete(response: Response):
for subscriber in mgr.get_subscribers(): # type: ignore
queue = mgr.get_subscriber(subscriber) # type: ignore
queue.put(response)

config = MrokBackendConfig(
ForwardApp(target_addr, on_response_complete=on_response_complete), identity_file
)
server = MrokServer(config)
with contextlib.suppress(KeyboardInterrupt, asyncio.CancelledError):
server.run()
Expand All @@ -19,9 +38,10 @@ def run_sidecar(identity_file: str, target_addr: str | Path | tuple[str, int]):
def run(
identity_file: str,
target_addr: str | Path | tuple[str, int],
workers=4,
reload=False,
workers: int = 4,
):
start_fn = partial(run_sidecar, identity_file, target_addr)
master = Master(start_fn, workers=workers, reload=reload)
master.run()
Thread(target=cleanup_thread, daemon=True).start()
with PubSubManager(address=("localhost", 50000), authkey=b"secret"):
start_fn = partial(run_sidecar, identity_file, target_addr)
master = Master(start_fn, workers=workers, reload=False)
master.run()
2 changes: 2 additions & 0 deletions mrok/cli/commands/agent/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import typer

from mrok.cli.commands.agent.dev import app as dev_app
from mrok.cli.commands.agent.run import app as run_app

app = typer.Typer(help="mrok agent commands.")
app.add_typer(run_app)
app.add_typer(dev_app)
7 changes: 7 additions & 0 deletions mrok/cli/commands/agent/dev/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import typer

from mrok.cli.commands.agent.dev import console, web

app = typer.Typer(name="dev", help="Dev tools for mrok agent.")
console.register(app)
web.register(app)
10 changes: 10 additions & 0 deletions mrok/cli/commands/agent/dev/console.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import typer

from mrok.agent.devtools.inspector import InspectorApp


def register(app: typer.Typer) -> None:
@app.command("console")
def run_dev_console():
app = InspectorApp()
app.run()
18 changes: 18 additions & 0 deletions mrok/cli/commands/agent/dev/web.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import pathlib

import typer
from textual_serve.server import Server


def register(app: typer.Typer) -> None:
@app.command("web")
def run_web_console():
script = (
pathlib.Path(__file__).parent.parent.parent.parent.parent
/ "agent/devtools/inspector.py"
)
server = Server(
str(script),
port=8787,
)
server.serve()
Loading
Loading