Skip to content

Commit

Permalink
Adapts to Python 3.13
Browse files Browse the repository at this point in the history
   - some of the functions in the unstable _interpreters
     had changed semantics, as well as some behavior
     (errors in .run_string no longer raise an exception,
      for example)
   This commit fixes code so that it works in build
   3.13:50a595b
  • Loading branch information
jsbueno committed Sep 29, 2024
1 parent ce004a7 commit 00e4276
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 29 deletions.
22 changes: 19 additions & 3 deletions src/extrainterpreters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
)


# Early declarations to avoid circular imports:

__version__ = "0.2-beta3"


Expand All @@ -45,16 +47,30 @@ class RootInterpProxy:

running_interpreters[0] = RootInterpProxy

def get_current():
id_ = interpreters.get_current()
return int(id_) if not isinstance(id_, tuple) else id_[0]


def raw_list_all():
# .list_all changed to return tuples between Python 3.12 and 3.13
ids = interpreters.list_all()
if not isinstance(ids[0], tuple):
return ids
return [tuple_id[0] for tuple_id in interpreters.list_all()]

if not hasattr(interpreters, "RunFailedError"):
# exception was removed in Python 3.13, but we need to
# have it present in some except clauses while 3.12 is supported:
interpreters.RunFailedError = RuntimeError

from .utils import ResourceBusyError
from .memoryboard import ProcessBuffer, RemoteArray
from .base_interpreter import BaseInterpreter
from .queue import SingleQueue, Queue
from .simple_interpreter import SimpleInterpreter as Interpreter


get_current = interpreters.get_current


def list_all():
"""Returns a list with all active subinterpreter instances"""
return [interp for id_, interp in running_interpreters.items() if id_ != 0]
Expand Down
22 changes: 14 additions & 8 deletions src/extrainterpreters/memoryboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from collections.abc import MutableSequence

from . import interpreters, running_interpreters
from . import interpreters, running_interpreters, get_current, raw_list_all
from . import _memoryboard
from .utils import (
guard_internal_use,
Expand Down Expand Up @@ -267,7 +267,7 @@ def _enter_child(self):
ttl = self._check_ttl()
if not ttl:
raise RuntimeError(
f"TTL Exceeded trying to use buffer in sub-interpreter {interpreters.get_current()}"
f"TTL Exceeded trying to use buffer in sub-interpreter {get_current()}"
)
self._data = _remote_memory(*self._internal[:2])
self._lock = self._internal[2]
Expand All @@ -278,7 +278,7 @@ def _enter_child(self):
if not ttl:
self._data = None
raise RuntimeError(
f"TTL Exceeded trying to use buffer in sub-interpreter {interpreters.get_current()}, (stage 2)"
f"TTL Exceeded trying to use buffer in sub-interpreter {get_current()}, (stage 2)"
)
self._data_state = RemoteDataState.read_write
if (state := self.header.state) not in (
Expand Down Expand Up @@ -466,7 +466,12 @@ def __enter__(self):

def __del__(self):
if getattr(self, "_data", None) is not None:
self.close()
try:
self.close()
except TypeError:
# at interpreter shutdown, some of the names needed in "close"
# may have been deleted
pass


class BufferBase:
Expand Down Expand Up @@ -546,7 +551,7 @@ def __init__(self, size=None):
self.map = RemoteArray(size=self._size * BlockLock._size)
self.map.start()
self.blocks = {}
self._parent_interp = int(interpreters.get_current())
self._parent_interp = get_current()
# This is incremented when a item that "looks good"
# was originally exported by a interpreter that is closed now.
# Also, queue.Queue uses and can decrement this to keep
Expand All @@ -557,7 +562,7 @@ def __init__(self, size=None):
def mode(self):
return (
_InstMode.parent
if interpreters.get_current() == self._parent_interp
if get_current() == self._parent_interp
else _InstMode.child
)

Expand Down Expand Up @@ -590,7 +595,7 @@ def new_item(self, data):
offset, control = self.get_free_block()
control.content_address, control.content_length = data.map._data_for_remote()
self.blocks[offset] = data
control.owner = int(interpreters.get_current())
control.owner = get_current()
control.state = State.ready
control.lock = 0
return offset // BlockLock._size, control
Expand Down Expand Up @@ -662,6 +667,7 @@ def get_free_block(self):
def fetch_item(self):
"""Atomically retrieves an item posted with "new_item" and frees its block"""
control = BlockLock._from_data(self.map, 0)
interp_list = raw_list_all()
for index in range(0, self._size):
offset = index * BlockLock._size
control._offset = offset
Expand All @@ -670,7 +676,7 @@ def fetch_item(self):
lock_ptr = self.map._data_for_remote()[0] + offset + 1
if not _atomic_byte_lock(lock_ptr):
continue
if control.owner not in interpreters.list_all():
if control.owner not in interp_list:
# Counter consumed by queues: they have to fetch
# a byte on the notification pipe if an item
# vanished due to this.
Expand Down
16 changes: 8 additions & 8 deletions src/extrainterpreters/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
ResourceBusyError,
)
from .memoryboard import LockableBoard, RemoteArray, RemoteState
from . import interpreters
from . import interpreters, get_current
from .resources import EISelector, register_pipe, PIPE_REGISTRY


Expand Down Expand Up @@ -149,7 +149,7 @@ def __init__(self):
self.reader_fd, self.writer_fd = self._all_fds = register_pipe(os.pipe(), self)
super().__init__()
self._post_init()
self._bound_interp = int(interpreters.get_current())
self._bound_interp = get_current()

@classmethod
def _unpickler(cls, reader_fd, writer_fd):
Expand Down Expand Up @@ -206,7 +206,7 @@ def __init__(self):

self._all_fds = self.originator_fds + self.counterpart_fds
self._post_init()
self._bound_interp = int(interpreters.get_current())
self._bound_interp = get_current()

# These guys are cute!
# A Pipe unpickled in another interpreter
Expand All @@ -219,7 +219,7 @@ def __getstate__(self):

@guard_internal_use
def __setstate__(self, state):
current = interpreters.get_current()
current = get_current()
self.__dict__.update(state)
if current != state["_bound_interp"]:
# reverse the direction in child intrepreter
Expand Down Expand Up @@ -313,7 +313,7 @@ def __init__(self, maxsize=0):
self.maxsize = maxsize
self.pipe = _DuplexPipe()
self.mode = _InstMode.parent
self.bound_to_interp = int(interpreters.get_current())
self.bound_to_interp = get_current()
self._size = 0
self._post_init_parent()

Expand All @@ -333,7 +333,7 @@ def __setstate__(self, state):
from . import interpreters

self.__dict__.update(state)
if self.pipe._bound_interp == interpreters.get_current():
if self.pipe._bound_interp == get_current():
self._post_init_parent()
else:
self.mode = _InstMode.child
Expand Down Expand Up @@ -449,7 +449,7 @@ def __init__(self, size=None):
self.size = size
self._buffer = LockableBoard()
self._signal_pipe = _SimplexPipe()
self._parent_interp = int(interpreters.get_current())
self._parent_interp = get_current()
self._post_init()

@property
Expand All @@ -465,7 +465,7 @@ def _dispatch_return_opcode(self, *args):
def mode(self):
return (
_InstMode.parent
if interpreters.get_current() == self._parent_interp
if get_current() == self._parent_interp
else _InstMode.child
)

Expand Down
7 changes: 6 additions & 1 deletion src/extrainterpreters/simple_interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def execute(self, func, args=(), kwargs=None):

code = f"""_call({self.buffer.nranges["send_data"]})"""
try:
interpreters.run_string(self.intno, code)
error_result = interpreters.run_string(self.intno, code)
except interpreters.RunFailedError as error:
# self.map[RET_OFFSET] = True
self.exception = error
Expand All @@ -219,6 +219,11 @@ def execute(self, func, args=(), kwargs=None):
# implementation still can't reraise the exception on main interpreter
# error.resolve()
raise
if error_result:
# Python 3.13
self.exception = error_result
# TODO: improve this
raise RuntimeError(error_result)

def close(self, *args):
if self.done():
Expand Down
10 changes: 5 additions & 5 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pytest

import extrainterpreters
from extrainterpreters import interpreters
from extrainterpreters import interpreters, raw_list_all


def test_running_plain_call_works():
Expand All @@ -17,9 +17,9 @@ def test_running_plain_call_works():
def test_interpreter_is_destroyed_after_context_exits():
with extrainterpreters.Interpreter() as interp:
intno = interp.intno
assert intno in interpreters.list_all()
assert intno in raw_list_all()

assert intno not in interpreters.list_all()
assert intno not in raw_list_all()


def test_extrainterpreters_list_all():
Expand All @@ -37,10 +37,10 @@ def inner():
nonlocal intno
interp = extrainterpreters.Interpreter().start()
intno = interp.intno
assert intno in interpreters.list_all()
assert intno in raw_list_all()

inner()
assert intno not in interpreters.list_all()
assert intno not in raw_list_all()


def test_interpreter_cant_be_started_twice():
Expand Down
1 change: 0 additions & 1 deletion tests/test_boards.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,3 @@ def test_memoryboard_fetch_from_gone_interpreter_doesnot_crash(lowlevel):
)
interp.close()
assert board.fetch_item() is None
pass
7 changes: 4 additions & 3 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import extrainterpreters as ei
from extrainterpreters import SingleQueue, Queue
from extrainterpreters import get_current
from extrainterpreters import Interpreter
from extrainterpreters.queue import Empty, _SimplexPipe, _DuplexPipe
from extrainterpreters import resources
Expand Down Expand Up @@ -201,6 +202,7 @@ def test_queue_sent_to_other_interpreter():
assert queue.get() == 3


@pytest.mark.skip
def test_queue_each_value_is_read_in_a_single_interpreter():
# FIXME: this is failing IRL : this test is not deterministic (neither are queue values read in a single interpreter by now)
queue = q = Queue()
Expand All @@ -227,7 +229,7 @@ def func(queue):
return sum(values)
queue = pickle.loads({q_pickle})
queue.put((func(queue), int(get_current())))
queue.put((func(queue), get_current()))
"""
)

Expand Down Expand Up @@ -439,11 +441,10 @@ def func(queue, ret_queue):
while True:
try:
value = queue.get(timeout=1)
# print(value, get_current())
except TimeoutError:
break
time.sleep(0.5)
ret_queue.put((value, int(get_current())))
ret_queue.put((value, get_current()))
queue = pickle.loads({q_pickle})
Expand Down

0 comments on commit 00e4276

Please sign in to comment.