Skip to content

Commit

Permalink
sphinx-agent: Inject uri and line number information into build files
Browse files Browse the repository at this point in the history
In order to support included files, the webview code needs access to
the source uri in addition to the line number.

Rather than even attempt to force uris to work with CSS classes, the
`SourceLocationTransform` includes a hidden div with a list of
`<span>` elements, one for each location marker.

These `<span>` elements have `data-uri` and `data-line` attributes
containing the necessary information to make sync scrolling work.
Each element has a `data-id` attribute which corresponds to a CSS
class `esbonio-marker-<id>` which links the location which its
corresponding location in the DOM
  • Loading branch information
alcarney committed Jul 11, 2024
1 parent 9fadfb9 commit 9fadad5
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 123 deletions.
25 changes: 3 additions & 22 deletions lib/esbonio/esbonio/sphinx_agent/handlers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import inspect
import logging
import pathlib
import sys
import traceback
import typing
Expand All @@ -18,16 +17,15 @@
from .. import types
from ..app import Sphinx
from ..config import SphinxConfig
from ..transforms import LineNumberTransform
from ..types import Uri
from ..util import send_error
from ..util import send_message

STATIC_DIR = (pathlib.Path(__file__).parent.parent / "static").resolve()
sphinx_logger = logging.getLogger(SPHINX_LOG_NAMESPACE)

# Inject our own 'core' extensions into Sphinx
sphinx.application.builtin_extensions += (
f"{__name__}.webview",
f"{__name__}.files",
f"{__name__}.diagnostics",
f"{__name__}.symbols",
Expand Down Expand Up @@ -118,13 +116,10 @@ def create_sphinx_app(self, request: types.CreateApplicationRequest):
self.app = Sphinx(**sphinx_args)

# Connect event handlers.
self.app.connect("env-before-read-docs", self._cb_env_before_read_docs)
self.app.connect("source-read", self._cb_source_read, priority=0)

# TODO: Sphinx 7.x has introduced a `include-read` event
# See: https://github.com/sphinx-doc/sphinx/pull/11657

_enable_sync_scrolling(self.app)
self.app.connect("env-before-read-docs", self._cb_env_before_read_docs)
self.app.connect("source-read", self._cb_source_read, priority=0)

response = types.CreateApplicationResponse(
id=request.id,
Expand Down Expand Up @@ -191,17 +186,3 @@ def build_sphinx_app(self, request: types.BuildRequest):
def notify_exit(self, request: types.ExitNotification):
"""Sent from the client to signal that the agent should exit."""
sys.exit(0)


def _enable_sync_scrolling(app: Sphinx):
"""Given a Sphinx application, configure it so that we can support syncronised
scrolling."""

# Inline the JS code we need to enable sync scrolling.
#
# Yes this "bloats" every page in the generated docs, but is generally more robust
# see: https://github.com/swyddfa/esbonio/issues/810
webview_js = STATIC_DIR / "webview.js"
app.add_js_file(None, body=webview_js.read_text())

app.add_transform(LineNumberTransform)
91 changes: 91 additions & 0 deletions lib/esbonio/esbonio/sphinx_agent/handlers/webview.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from __future__ import annotations

import pathlib
import typing

from docutils import nodes
from docutils.transforms import Transform

from ..log import source_to_uri_and_linum

if typing.TYPE_CHECKING:
from typing import Dict
from typing import Tuple

from sphinx.application import Sphinx


STATIC_DIR = (pathlib.Path(__file__).parent.parent / "static").resolve()


def has_source(node):
if isinstance(node, nodes.Text):
return False

return (node.line or 0) > 0 and node.source is not None


class source_locations(nodes.General, nodes.Element):
"""Index of all known source locations."""


def visit_source_locations(self, node):
source_index: Dict[int, Tuple[str, int]] = node["index"]

self.body.append('<div id="esbonio-marker-index" style="display: none">\n')
for idx, (uri, linum) in source_index.items():
self.body.append(
f' <span data-id="{idx}" data-uri="{uri}" data-line="{linum}"></span>\n'
)

self.body.append("</div>")


def depart_source_locations(self, node): ...


class SourceLocationTransform(Transform):
"""Add source location information to the doctree.
Used to support features like synchronised scrolling.
"""

default_priority = 500

def apply(self, **kwargs):
current_line = 0
current_source = None

source_index = {}
source_nodes = self.document.traverse(condition=has_source)

for idx, node in enumerate(source_nodes):
if node.line > current_line or node.source != current_source:
uri, linum = source_to_uri_and_linum(f"{node.source}:{node.line}")

if linum is None:
continue

source_index[idx] = (str(uri), linum)
node["classes"].extend(["esbonio-marker", f"esbonio-marker-{idx}"])

# Use the source and line reported by docutils.
# Just in case source_to_uri_and_linum doesn't handle things correctly
current_line = node.line
current_source = node.source

self.document.children.append(source_locations("", index=source_index))


def setup(app: Sphinx):
# Inline the JS code we need to enable sync scrolling.
#
# Yes this "bloats" every page in the generated docs, but is generally more robust
# see: https://github.com/swyddfa/esbonio/issues/810
webview_js = STATIC_DIR / "webview.js"
app.add_js_file(None, body=webview_js.read_text())

app.add_node(
source_locations, html=(visit_source_locations, depart_source_locations)
)
app.add_transform(SourceLocationTransform)
154 changes: 85 additions & 69 deletions lib/esbonio/esbonio/sphinx_agent/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,73 +38,6 @@ def __init__(self, app, *args, **kwargs):
self.app = app
self.diagnostics: Dict[Uri, Set[types.Diagnostic]] = {}

def get_location(self, location: str) -> Tuple[str, Optional[int]]:
if not location:
conf = pathlib.Path(self.app.confdir, "conf.py")
return (str(conf), None)

lineno = None
path, parts = self.get_location_path(location)

if len(parts) == 1:
try:
lineno = int(parts[0])
except ValueError:
pass

if len(parts) == 2 and parts[0].startswith("docstring of "):
target = parts[0].replace("docstring of ", "")
lineno = self.get_docstring_location(target, parts[1])

return (path, lineno)

def get_location_path(self, location: str) -> Tuple[str, List[str]]:
"""Determine the filepath from the given location."""

if location.startswith("internal padding before "):
location = location.replace("internal padding before ", "")

if location.startswith("internal padding after "):
location = location.replace("internal padding after ", "")

path, *parts = location.split(":")

# On windows the rest of the path will be the first element of parts
if pathlib.Path(location).drive:
path += f":{parts.pop(0)}"

# Diagnostics in .. included:: files are reported relative to the process'
# working directory, so ensure the path is absolute.
path = os.path.abspath(path)

return path, parts

def get_docstring_location(self, target: str, offset: str) -> Optional[int]:
# The containing module will be the longest substring we can find in target
candidates = [m for m in sys.modules.keys() if target.startswith(m)] + [""]
module = sys.modules.get(sorted(candidates, key=len, reverse=True)[0], None)

if module is None:
return None

obj: Union[ModuleType, Any, None] = module
dotted_name = target.replace(module.__name__ + ".", "")

for name in dotted_name.split("."):
obj = getattr(obj, name, None)
if obj is None:
return None

try:
_, line = inspect.getsourcelines(obj) # type: ignore

# Correct off by one error for docstrings that don't start with a newline.
nl = (obj.__doc__ or "").startswith("\n")
return line + int(offset) - (not nl)
except Exception:
logger.debug("Unable to determine diagnostic location\n%s", exc_info=True)
return None

def filter(self, record: logging.LogRecord) -> bool:
conditions = [
"sphinx" not in record.name,
Expand All @@ -115,7 +48,12 @@ def filter(self, record: logging.LogRecord) -> bool:
return True

loc = getattr(record, "location", "")
doc, lineno = self.get_location(loc)
uri, lineno = source_to_uri_and_linum(loc)

if uri is None:
conf = pathlib.Path(self.app.confdir, "conf.py")
uri, lineno = (Uri.for_file(conf), None)

line = lineno or 1

try:
Expand Down Expand Up @@ -144,5 +82,83 @@ def filter(self, record: logging.LogRecord) -> bool:
),
)

self.diagnostics.setdefault(Uri.for_file(doc), set()).add(diagnostic)
self.diagnostics.setdefault(uri, set()).add(diagnostic)
return True


def source_to_uri_and_linum(location: str) -> Tuple[Optional[Uri], Optional[int]]:
"""Convert the given source location to a uri and corresponding line number
Parameters
----------
location
The location to convert
Returns
-------
Tuple[Optional[Uri], Optional[int]]
The corresponding uri and line number, if known
"""
lineno = None
path, parts = _get_location_path(location)

if len(parts) == 1:
try:
lineno = int(parts[0])
except ValueError:
pass

if len(parts) == 2 and parts[0].startswith("docstring of "):
target = parts[0].replace("docstring of ", "")
lineno = _get_docstring_linum(target, parts[1])

return Uri.for_file(path), lineno


def _get_location_path(location: str) -> Tuple[str, List[str]]:
"""Determine the filepath from the given location."""

if location.startswith("internal padding before "):
location = location.replace("internal padding before ", "")

if location.startswith("internal padding after "):
location = location.replace("internal padding after ", "")

path, *parts = location.split(":")

# On windows the rest of the path will be the first element of parts
if pathlib.Path(location).drive:
path += f":{parts.pop(0)}"

# Diagnostics in .. included:: files are reported relative to the process'
# working directory, so ensure the path is absolute.
path = os.path.abspath(path)

return path, parts


def _get_docstring_linum(target: str, offset: str) -> Optional[int]:
# The containing module will be the longest substring we can find in target
candidates = [m for m in sys.modules.keys() if target.startswith(m)] + [""]
module = sys.modules.get(sorted(candidates, key=len, reverse=True)[0], None)

if module is None:
return None

obj: Union[ModuleType, Any, None] = module
dotted_name = target.replace(module.__name__ + ".", "")

for name in dotted_name.split("."):
obj = getattr(obj, name, None)
if obj is None:
return None

try:
_, line = inspect.getsourcelines(obj) # type: ignore

# Correct off by one error for docstrings that don't start with a newline.
nl = (obj.__doc__ or "").startswith("\n")
return line + int(offset) - (not nl)
except Exception:
logger.debug("Unable to determine diagnostic location\n%s", exc_info=True)
return None
17 changes: 0 additions & 17 deletions lib/esbonio/esbonio/sphinx_agent/transforms.py

This file was deleted.

26 changes: 11 additions & 15 deletions lib/esbonio/tests/sphinx-agent/test_sa_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

from esbonio.sphinx_agent.config import SphinxConfig
from esbonio.sphinx_agent.log import DiagnosticFilter
from esbonio.sphinx_agent.log import source_to_uri_and_linum
from esbonio.sphinx_agent.types import Uri

if typing.TYPE_CHECKING:
from typing import Any
Expand Down Expand Up @@ -457,30 +459,24 @@ def test_cli_arg_handling(args: List[str], expected: Dict[str, Any]):
@pytest.mark.parametrize(
"location, expected",
[
("", (str(CONF_PATH), None)),
(f"{RST_PATH}", (str(RST_PATH), None)),
(f"{RST_PATH}:", (str(RST_PATH), None)),
(f"{RST_PATH}:3", (str(RST_PATH), 3)),
(f"{REL_INC_PATH}:12", (str(INC_PATH), 12)),
(f"{RST_PATH}", (Uri.for_file(RST_PATH), None)),
(f"{RST_PATH}:", (Uri.for_file(RST_PATH), None)),
(f"{RST_PATH}:3", (Uri.for_file(RST_PATH), 3)),
(f"{REL_INC_PATH}:12", (Uri.for_file(INC_PATH), 12)),
(
f"{PY_PATH}:docstring of esbonio.sphinx_agent.log.DiagnosticFilter:3",
(str(PY_PATH), 22),
(Uri.for_file(PY_PATH), 22),
),
(f"internal padding after {RST_PATH}:34", (str(RST_PATH), 34)),
(f"internal padding before {RST_PATH}:34", (str(RST_PATH), 34)),
(f"internal padding after {RST_PATH}:34", (Uri.for_file(RST_PATH), 34)),
(f"internal padding before {RST_PATH}:34", (Uri.for_file(RST_PATH), 34)),
],
)
def test_get_diagnostic_location(location: str, expected: Tuple[str, Optional[int]]):
def test_source_to_uri_linum(location: str, expected: Tuple[str, Optional[int]]):
"""Ensure we can correctly determine a dianostic's location based on the string we
get from sphinx."""

app = mock.Mock()
app.confdir = str(ROOT / "sphinx-extensions")

handler = DiagnosticFilter(app)

mockpath = f"{DiagnosticFilter.__module__}.inspect.getsourcelines"
with mock.patch(mockpath, return_value=([""], 20)):
actual = handler.get_location(location)
actual = source_to_uri_and_linum(location)

assert actual == expected

0 comments on commit 9fadad5

Please sign in to comment.