diff --git a/src/melobot/plugin/imp.py b/src/melobot/plugin/imp.py index 6491504e..7abec7b0 100644 --- a/src/melobot/plugin/imp.py +++ b/src/melobot/plugin/imp.py @@ -1,16 +1,27 @@ import sys -from importlib._bootstrap_external import _get_supported_file_loaders +from importlib._bootstrap_external import PathFinder as _PathFinder +from importlib._bootstrap_external import ( + _get_supported_file_loaders, + _NamespaceLoader, + _NamespacePath, +) from importlib.abc import Loader, MetaPathFinder -from importlib.machinery import ModuleSpec +from importlib.machinery import ModuleSpec, all_suffixes from importlib.util import module_from_spec, spec_from_file_location from os import PathLike from pathlib import Path from types import ModuleType -from typing import Any, Sequence +from typing import Any, Sequence, cast from ..exceptions import DynamicImpError from ..utils import singleton +ALL_EXTS = tuple(all_suffixes()) +NAMESPACE_PKG_TAG = "__melobot_namespace__" + + +class _NestedQuickExit(BaseException): ... + @singleton class SpecFinder(MetaPathFinder): @@ -32,30 +43,66 @@ def find_spec( else: name = fullname - for entry in paths: - mod_path = Path(entry).joinpath(name) - if mod_path.is_dir(): - file = mod_path.joinpath("__init__.py") - submodule_locations = [str(mod_path)] - if not mod_path.exists(): - continue - else: - file = Path(entry).joinpath(f"{name}.py") - submodule_locations = None - if not file.exists(): - continue - - file = file.resolve() - return spec_from_file_location( - fullname, - file, - loader=ModuleLoader( - fullname, file, sys_cache, load_cache, pre_sys_len, pre_cache_len - ), - submodule_search_locations=submodule_locations, - ) + mod_path: Path | None = None + submod_locs: list[str] | None = None + # The spec finding according PEP420: https://peps.python.org/pep-0420/#specification + try: + for entry in paths: + dir_path = Path(entry).joinpath(name) + pkg_init_path = dir_path.joinpath("__init__.py") + if pkg_init_path.exists(): + mod_path = pkg_init_path + submod_locs = [str(dir_path)] + raise _NestedQuickExit + + for ext in ALL_EXTS: + _mod_path = Path(entry).joinpath(f"{name}{ext}") + if _mod_path.exists(): + mod_path = _mod_path + submod_locs = None + raise _NestedQuickExit + + if dir_path.exists() and dir_path.is_dir(): + submod_locs = _NamespacePath( + fullname, + [str(dir_path)], + _PathFinder()._get_spec, # pylint: disable=protected-access + ) + spec = spec_from_file_location( + fullname, + dir_path, + loader=ModuleLoader( + fullname, + dir_path, + sys_cache, + load_cache, + pre_sys_len, + pre_cache_len, + _NamespaceLoader(fullname, dir_path, submod_locs), + ), + submodule_search_locations=submod_locs, + ) + assert spec is not None + spec.has_location = False + spec.origin = None + setattr(spec, NAMESPACE_PKG_TAG, True) + return spec + + except _NestedQuickExit: + pass - return None + if mod_path is None and submod_locs is None: + return None + + mod_path = cast(Path, mod_path).resolve() + return spec_from_file_location( + fullname, + mod_path, + loader=ModuleLoader( + fullname, mod_path, sys_cache, load_cache, pre_sys_len, pre_cache_len + ), + submodule_search_locations=submod_locs, + ) sys.meta_path.insert(0, SpecFinder()) @@ -74,6 +121,7 @@ def has_cache(self, mod: ModuleType) -> bool: return mod in self._caches.values() def get_cache(self, path: Path) -> ModuleType | None: + # 对应有 __init__.py 的包模块 if path.parts[-1] == "__init__.py": path = path.parent return self._caches.get(path) @@ -86,7 +134,7 @@ def set_cache(self, name: str, mod: ModuleType) -> None: ): return - # __file__ 存在且不为空,可能包或文件,包对应 __init__.py,应该转换为不包含 __init__.py 后缀的形式 + # __file__ 存在且不为空,可能包或任意可被加载的文件,包对应 __init__.py,应该转换为不包含 __init__.py 后缀的形式 if mod.__file__ is not None: fp = Path(mod.__file__) if fp.parts[-1] == "__init__.py": @@ -120,6 +168,7 @@ def __init__( load_cache: bool, pre_sys_len: int = -1, pre_cache_len: int = -1, + inner_loader: Loader | None = None, ) -> None: super().__init__() self.cacher = ModuleCacher() @@ -130,7 +179,10 @@ def __init__( self.pre_sys_len = pre_sys_len self.pre_cache_len = pre_cache_len - self.inner_loader: Loader | None = None + self.inner_loader: Loader | None = inner_loader + if inner_loader is not None: + return + for loader_class, suffixes in _get_supported_file_loaders(): if str(fp).endswith(tuple(suffixes)): loader = loader_class(fullname, str(fp)) # pylint: disable=not-callable @@ -146,11 +198,13 @@ def create_module(self, spec: ModuleSpec) -> ModuleType | None: return mod def exec_module(self, mod: ModuleType) -> None: - if self.cacher.has_cache(mod) or not self.fp.exists(): + if self.cacher.has_cache(mod): pass else: if self.inner_loader is not None: self.inner_loader.exec_module(mod) + if hasattr(mod.__spec__, NAMESPACE_PKG_TAG): + mod.__file__ = None if self.use_load_cache: self.cacher.set_cache(self.fullname, mod)