diff --git a/jac/jaclang/__init__.py b/jac/jaclang/__init__.py index 5688f245db..a27927956d 100644 --- a/jac/jaclang/__init__.py +++ b/jac/jaclang/__init__.py @@ -1,22 +1,18 @@ """The Jac Programming Language.""" from jaclang.plugin.default import ( - JacAccessValidation, JacBuiltin, JacCmdDefaults, - JacFeatureDefaults, - JacNode, + JacFeatureImpl, ) from jaclang.plugin.feature import JacFeature, hookmanager jac_import = JacFeature.jac_import -hookmanager.register(JacFeatureDefaults) +hookmanager.register(JacFeatureImpl) hookmanager.register(JacBuiltin) hookmanager.register(JacCmdDefaults) -hookmanager.register(JacAccessValidation) -hookmanager.register(JacNode) hookmanager.load_setuptools_entrypoints("jac") __all__ = ["jac_import"] diff --git a/jac/jaclang/plugin/default.py b/jac/jaclang/plugin/default.py index 5fc9b2e58f..5e454b3030 100644 --- a/jac/jaclang/plugin/default.py +++ b/jac/jaclang/plugin/default.py @@ -22,12 +22,11 @@ Anchor, Architype, DSFunc, + EdgeAnchor, EdgeArchitype, EdgeDir, ExecutionContext, - JacAccessValidation as JacAV, JacFeature as Jac, - JacNode as JacN, NodeAnchor, NodeArchitype, P, @@ -52,7 +51,312 @@ logger = getLogger(__name__) -class JacFeatureDefaults: +class JacAccessValidationImpl: + """Jac Access Validation Implementations.""" + + @staticmethod + @hookimpl + def check_read_access(to: Anchor) -> bool: + """Read Access Validation.""" + if not (access_level := Jac.check_access_level(to) > AccessLevel.NO_ACCESS): + logger.info( + f"Current root doesn't have read access to {to.__class__.__name__}[{to.id}]" + ) + return access_level + + @staticmethod + @hookimpl + def check_connect_access(to: Anchor) -> bool: + """Write Access Validation.""" + if not (access_level := Jac.check_access_level(to) > AccessLevel.READ): + logger.info( + f"Current root doesn't have connect access to {to.__class__.__name__}[{to.id}]" + ) + return access_level + + @staticmethod + @hookimpl + def check_write_access(to: Anchor) -> bool: + """Write Access Validation.""" + if not (access_level := Jac.check_access_level(to) > AccessLevel.CONNECT): + logger.info( + f"Current root doesn't have write access to {to.__class__.__name__}[{to.id}]" + ) + return access_level + + @staticmethod + @hookimpl + def check_access_level(to: Anchor) -> AccessLevel: + """Access validation.""" + if not to.persistent: + return AccessLevel.WRITE + + jctx = Jac.get_context() + + jroot = jctx.root + + # if current root is system_root + # if current root id is equal to target anchor's root id + # if current root is the target anchor + if jroot == jctx.system_root or jroot.id == to.root or jroot == to: + return AccessLevel.WRITE + + access_level = AccessLevel.NO_ACCESS + + # if target anchor have set access.all + if (to_access := to.access).all > AccessLevel.NO_ACCESS: + access_level = to_access.all + + # if target anchor's root have set allowed roots + # if current root is allowed to the whole graph of target anchor's root + if to.root and isinstance(to_root := jctx.mem.find_one(to.root), Anchor): + if to_root.access.all > access_level: + access_level = to_root.access.all + + level = to_root.access.roots.check(str(jroot.id)) + if level > AccessLevel.NO_ACCESS and access_level == AccessLevel.NO_ACCESS: + access_level = level + + # if target anchor have set allowed roots + # if current root is allowed to target anchor + level = to_access.roots.check(str(jroot.id)) + if level > AccessLevel.NO_ACCESS and access_level == AccessLevel.NO_ACCESS: + access_level = level + + return access_level + + +class JacNodeImpl: + """Jac Node Operations.""" + + @staticmethod + @hookimpl + def get_edges( + node: NodeAnchor, + dir: EdgeDir, + filter_func: Optional[Callable[[list[EdgeArchitype]], list[EdgeArchitype]]], + target_obj: Optional[list[NodeArchitype]], + ) -> list[EdgeArchitype]: + """Get edges connected to this node.""" + ret_edges: list[EdgeArchitype] = [] + for anchor in node.edges: + if ( + (source := anchor.source) + and (target := anchor.target) + and (not filter_func or filter_func([anchor.architype])) + and source.architype + and target.architype + ): + if ( + dir in [EdgeDir.OUT, EdgeDir.ANY] + and node == source + and (not target_obj or target.architype in target_obj) + and Jac.check_read_access(target) + ): + ret_edges.append(anchor.architype) + if ( + dir in [EdgeDir.IN, EdgeDir.ANY] + and node == target + and (not target_obj or source.architype in target_obj) + and Jac.check_read_access(source) + ): + ret_edges.append(anchor.architype) + return ret_edges + + @staticmethod + @hookimpl + def edges_to_nodes( + node: NodeAnchor, + dir: EdgeDir, + filter_func: Optional[Callable[[list[EdgeArchitype]], list[EdgeArchitype]]], + target_obj: Optional[list[NodeArchitype]], + ) -> list[NodeArchitype]: + """Get set of nodes connected to this node.""" + ret_edges: list[NodeArchitype] = [] + for anchor in node.edges: + if ( + (source := anchor.source) + and (target := anchor.target) + and (not filter_func or filter_func([anchor.architype])) + and source.architype + and target.architype + ): + if ( + dir in [EdgeDir.OUT, EdgeDir.ANY] + and node == source + and (not target_obj or target.architype in target_obj) + and Jac.check_read_access(target) + ): + ret_edges.append(target.architype) + if ( + dir in [EdgeDir.IN, EdgeDir.ANY] + and node == target + and (not target_obj or source.architype in target_obj) + and Jac.check_read_access(source) + ): + ret_edges.append(source.architype) + return ret_edges + + @staticmethod + @hookimpl + def remove_edge(node: NodeAnchor, edge: EdgeAnchor) -> None: + """Remove reference without checking sync status.""" + for idx, ed in enumerate(node.edges): + if ed.id == edge.id: + node.edges.pop(idx) + break + + +class JacEdgeImpl: + """Jac Edge Operations.""" + + @staticmethod + @hookimpl + def detach(edge: EdgeAnchor) -> None: + """Detach edge from nodes.""" + Jac.remove_edge(node=edge.source, edge=edge) + Jac.remove_edge(node=edge.target, edge=edge) + + +class JacWalkerImpl: + """Jac Edge Operations.""" + + @staticmethod + @hookimpl + def visit_node( + walker: WalkerArchitype, + expr: ( + list[NodeArchitype | EdgeArchitype] + | list[NodeArchitype] + | list[EdgeArchitype] + | NodeArchitype + | EdgeArchitype + ), + ) -> bool: + """Jac's visit stmt feature.""" + if isinstance(walker, WalkerArchitype): + """Walker visits node.""" + wanch = walker.__jac__ + before_len = len(wanch.next) + for anchor in ( + (i.__jac__ for i in expr) if isinstance(expr, list) else [expr.__jac__] + ): + if anchor not in wanch.ignores: + if isinstance(anchor, NodeAnchor): + wanch.next.append(anchor) + elif isinstance(anchor, EdgeAnchor): + if target := anchor.target: + wanch.next.append(target) + else: + raise ValueError("Edge has no target.") + return len(wanch.next) > before_len + else: + raise TypeError("Invalid walker object") + + @staticmethod + @hookimpl + def ignore( + walker: WalkerArchitype, + expr: ( + list[NodeArchitype | EdgeArchitype] + | list[NodeArchitype] + | list[EdgeArchitype] + | NodeArchitype + | EdgeArchitype + ), + ) -> bool: + """Jac's ignore stmt feature.""" + if isinstance(walker, WalkerArchitype): + wanch = walker.__jac__ + before_len = len(wanch.ignores) + for anchor in ( + (i.__jac__ for i in expr) if isinstance(expr, list) else [expr.__jac__] + ): + if anchor not in wanch.ignores: + if isinstance(anchor, NodeAnchor): + wanch.ignores.append(anchor) + elif isinstance(anchor, EdgeAnchor): + if target := anchor.target: + wanch.ignores.append(target) + else: + raise ValueError("Edge has no target.") + return len(wanch.ignores) > before_len + else: + raise TypeError("Invalid walker object") + + @staticmethod + @hookimpl + def spawn_call(op1: Architype, op2: Architype) -> WalkerArchitype: + """Invoke data spatial call.""" + if isinstance(op1, WalkerArchitype): + warch = op1 + walker = op1.__jac__ + if isinstance(op2, NodeArchitype): + node = op2.__jac__ + elif isinstance(op2, EdgeArchitype): + node = op2.__jac__.source + else: + raise TypeError("Invalid target object") + elif isinstance(op2, WalkerArchitype): + warch = op2 + walker = op2.__jac__ + if isinstance(op1, NodeArchitype): + node = op1.__jac__ + elif isinstance(op1, EdgeArchitype): + node = op1.__jac__.source + else: + raise TypeError("Invalid target object") + else: + raise TypeError("Invalid walker object") + + walker.path = [] + walker.next = [node] + while len(walker.next): + if current_node := walker.next.pop(0).architype: + for i in current_node._jac_entry_funcs_: + if not i.trigger or isinstance(warch, i.trigger): + if i.func: + i.func(current_node, warch) + else: + raise ValueError(f"No function {i.name} to call.") + if walker.disengaged: + return warch + for i in warch._jac_entry_funcs_: + if not i.trigger or isinstance(current_node, i.trigger): + if i.func: + i.func(warch, current_node) + else: + raise ValueError(f"No function {i.name} to call.") + if walker.disengaged: + return warch + for i in warch._jac_exit_funcs_: + if not i.trigger or isinstance(current_node, i.trigger): + if i.func: + i.func(warch, current_node) + else: + raise ValueError(f"No function {i.name} to call.") + if walker.disengaged: + return warch + for i in current_node._jac_exit_funcs_: + if not i.trigger or isinstance(warch, i.trigger): + if i.func: + i.func(current_node, warch) + else: + raise ValueError(f"No function {i.name} to call.") + if walker.disengaged: + return warch + walker.ignores = [] + return warch + + @staticmethod + @hookimpl + def disengage(walker: WalkerArchitype) -> bool: # noqa: ANN401 + """Jac's disengage stmt feature.""" + walker.__jac__.disengaged = True + return True + + +class JacFeatureImpl(JacAccessValidationImpl, JacNodeImpl, JacEdgeImpl, JacWalkerImpl): """Jac Feature.""" @staticmethod @@ -366,69 +670,11 @@ def has_instance_default(gen_func: Callable[[], T]) -> T: """Jac's has container default feature.""" return field(default_factory=lambda: gen_func()) - @staticmethod - @hookimpl - def spawn_call(op1: Architype, op2: Architype) -> WalkerArchitype: - """Jac's spawn operator feature.""" - if isinstance(op1, WalkerArchitype): - return op1.__jac__.spawn_call(op2.__jac__) - elif isinstance(op2, WalkerArchitype): - return op2.__jac__.spawn_call(op1.__jac__) - else: - raise TypeError("Invalid walker object") - @staticmethod @hookimpl def report(expr: Any) -> Any: # noqa: ANN401 """Jac's report stmt feature.""" - @staticmethod - @hookimpl - def ignore( - walker: WalkerArchitype, - expr: ( - list[NodeArchitype | EdgeArchitype] - | list[NodeArchitype] - | list[EdgeArchitype] - | NodeArchitype - | EdgeArchitype - ), - ) -> bool: - """Jac's ignore stmt feature.""" - if isinstance(walker, WalkerArchitype): - return walker.__jac__.ignore_node( - (i.__jac__ for i in expr) if isinstance(expr, list) else [expr.__jac__] - ) - else: - raise TypeError("Invalid walker object") - - @staticmethod - @hookimpl - def visit_node( - walker: WalkerArchitype, - expr: ( - list[NodeArchitype | EdgeArchitype] - | list[NodeArchitype] - | list[EdgeArchitype] - | NodeArchitype - | EdgeArchitype - ), - ) -> bool: - """Jac's visit stmt feature.""" - if isinstance(walker, WalkerArchitype): - return walker.__jac__.visit_node( - (i.__jac__ for i in expr) if isinstance(expr, list) else [expr.__jac__] - ) - else: - raise TypeError("Invalid walker object") - - @staticmethod - @hookimpl - def disengage(walker: WalkerArchitype) -> bool: # noqa: ANN401 - """Jac's disengage stmt feature.""" - walker.__jac__.disengage_now() - return True - @staticmethod @hookimpl def edge_ref( @@ -449,7 +695,7 @@ def edge_ref( if edges_only: connected_edges: list[EdgeArchitype] = [] for node in node_obj: - connected_edges += JacN.get_edges( + connected_edges += Jac.get_edges( node.__jac__, dir, filter_func, target_obj=targ_obj_set ) return list(set(connected_edges)) @@ -457,7 +703,7 @@ def edge_ref( connected_nodes: list[NodeArchitype] = [] for node in node_obj: connected_nodes.extend( - JacN.edges_to_nodes( + Jac.edges_to_nodes( node.__jac__, dir, filter_func, target_obj=targ_obj_set ) ) @@ -481,10 +727,10 @@ def connect( for i in left: _left = i.__jac__ - if JacAV.check_connect_access(_left): + if Jac.check_connect_access(_left): for j in right: _right = j.__jac__ - if JacAV.check_connect_access(_right): + if Jac.check_connect_access(_right): edges.append(edge_spec(_left, _right)) return right if not edges_only else edges @@ -515,17 +761,17 @@ def disconnect( dir in [EdgeDir.OUT, EdgeDir.ANY] and node == source and target.architype in right - and JacAV.check_write_access(target) + and Jac.check_write_access(target) ): - anchor.destroy() if anchor.persistent else anchor.detach() + anchor.destroy() if anchor.persistent else Jac.detach(anchor) disconnect_occurred = True if ( dir in [EdgeDir.IN, EdgeDir.ANY] and node == target and source.architype in right - and JacAV.check_write_access(source) + and Jac.check_write_access(source) ): - anchor.destroy() if anchor.persistent else anchor.detach() + anchor.destroy() if anchor.persistent else Jac.detach(anchor) disconnect_occurred = True return disconnect_occurred @@ -567,6 +813,11 @@ def build_edge( def builder(source: NodeAnchor, target: NodeAnchor) -> EdgeArchitype: edge = conn_type() if isinstance(conn_type, type) else conn_type edge.__attach__(source, target, is_undirected) + + eanch = edge.__jac__ + source.edges.append(eanch) + target.edges.append(eanch) + if conn_assign: for fld, val in zip(conn_assign[0], conn_assign[1]): if hasattr(edge, fld): @@ -574,7 +825,7 @@ def builder(source: NodeAnchor, target: NodeAnchor) -> EdgeArchitype: else: raise ValueError(f"Invalid attribute: {fld}") if source.persistent or target.persistent: - edge.__jac__.save() + eanch.save() target.save() source.save() return edge @@ -943,150 +1194,3 @@ class JacCmdDefaults: def create_cmd() -> None: """Create Jac CLI cmds.""" pass - - -class JacAccessValidation: - """Jac Access Validation Implementations.""" - - @staticmethod - @hookimpl - def check_read_access(to: Anchor) -> bool: - """Read Access Validation.""" - if not (access_level := JacAV.check_access_level(to) > AccessLevel.NO_ACCESS): - logger.info( - f"Current root doesn't have read access to {to.__class__.__name__}[{to.id}]" - ) - return access_level - - @staticmethod - @hookimpl - def check_connect_access(to: Anchor) -> bool: - """Write Access Validation.""" - if not (access_level := JacAV.check_access_level(to) > AccessLevel.READ): - logger.info( - f"Current root doesn't have connect access to {to.__class__.__name__}[{to.id}]" - ) - return access_level - - @staticmethod - @hookimpl - def check_write_access(to: Anchor) -> bool: - """Write Access Validation.""" - if not (access_level := JacAV.check_access_level(to) > AccessLevel.CONNECT): - logger.info( - f"Current root doesn't have write access to {to.__class__.__name__}[{to.id}]" - ) - return access_level - - @staticmethod - @hookimpl - def check_access_level(to: Anchor) -> AccessLevel: - """Access validation.""" - if not to.persistent: - return AccessLevel.WRITE - - jctx = Jac.get_context() - - jroot = jctx.root - - # if current root is system_root - # if current root id is equal to target anchor's root id - # if current root is the target anchor - if jroot == jctx.system_root or jroot.id == to.root or jroot == to: - return AccessLevel.WRITE - - access_level = AccessLevel.NO_ACCESS - - # if target anchor have set access.all - if (to_access := to.access).all > AccessLevel.NO_ACCESS: - access_level = to_access.all - - # if target anchor's root have set allowed roots - # if current root is allowed to the whole graph of target anchor's root - if to.root and isinstance(to_root := jctx.mem.find_one(to.root), Anchor): - if to_root.access.all > access_level: - access_level = to_root.access.all - - level = to_root.access.roots.check(str(jroot.id)) - if level > AccessLevel.NO_ACCESS and access_level == AccessLevel.NO_ACCESS: - access_level = level - - # if target anchor have set allowed roots - # if current root is allowed to target anchor - level = to_access.roots.check(str(jroot.id)) - if level > AccessLevel.NO_ACCESS and access_level == AccessLevel.NO_ACCESS: - access_level = level - - return access_level - - -class JacNode: - """Jac Node Operations.""" - - @staticmethod - @hookimpl - def get_edges( - node: NodeAnchor, - dir: EdgeDir, - filter_func: Optional[Callable[[list[EdgeArchitype]], list[EdgeArchitype]]], - target_obj: Optional[list[NodeArchitype]], - ) -> list[EdgeArchitype]: - """Get edges connected to this node.""" - ret_edges: list[EdgeArchitype] = [] - for anchor in node.edges: - if ( - (source := anchor.source) - and (target := anchor.target) - and (not filter_func or filter_func([anchor.architype])) - and source.architype - and target.architype - ): - if ( - dir in [EdgeDir.OUT, EdgeDir.ANY] - and node == source - and (not target_obj or target.architype in target_obj) - and JacAV.check_read_access(target) - ): - ret_edges.append(anchor.architype) - if ( - dir in [EdgeDir.IN, EdgeDir.ANY] - and node == target - and (not target_obj or source.architype in target_obj) - and JacAV.check_read_access(source) - ): - ret_edges.append(anchor.architype) - return ret_edges - - @staticmethod - @hookimpl - def edges_to_nodes( - node: NodeAnchor, - dir: EdgeDir, - filter_func: Optional[Callable[[list[EdgeArchitype]], list[EdgeArchitype]]], - target_obj: Optional[list[NodeArchitype]], - ) -> list[NodeArchitype]: - """Get set of nodes connected to this node.""" - ret_edges: list[NodeArchitype] = [] - for anchor in node.edges: - if ( - (source := anchor.source) - and (target := anchor.target) - and (not filter_func or filter_func([anchor.architype])) - and source.architype - and target.architype - ): - if ( - dir in [EdgeDir.OUT, EdgeDir.ANY] - and node == source - and (not target_obj or target.architype in target_obj) - and JacAV.check_read_access(target) - ): - ret_edges.append(target.architype) - if ( - dir in [EdgeDir.IN, EdgeDir.ANY] - and node == target - and (not target_obj or source.architype in target_obj) - and JacAV.check_read_access(source) - ): - ret_edges.append(source.architype) - return ret_edges diff --git a/jac/jaclang/plugin/feature.py b/jac/jaclang/plugin/feature.py index c057c71eee..aff4a5fae3 100644 --- a/jac/jaclang/plugin/feature.py +++ b/jac/jaclang/plugin/feature.py @@ -11,6 +11,7 @@ Anchor, Architype, DSFunc, + EdgeAnchor, EdgeArchitype, EdgeDir, ExecutionContext, @@ -26,7 +27,115 @@ ) -class JacFeature: +class JacAccessValidation: + """Jac Access Validation Specs.""" + + @staticmethod + def check_read_access(to: Anchor) -> bool: + """Read Access Validation.""" + return hookmanager.hook.check_read_access(to=to) + + @staticmethod + def check_connect_access(to: Anchor) -> bool: + """Write Access Validation.""" + return hookmanager.hook.check_connect_access(to=to) + + @staticmethod + def check_write_access(to: Anchor) -> bool: + """Write Access Validation.""" + return hookmanager.hook.check_write_access(to=to) + + @staticmethod + def check_access_level(to: Anchor) -> AccessLevel: + """Access validation.""" + return hookmanager.hook.check_access_level(to=to) + + +class JacNode: + """Jac Node Operations.""" + + @staticmethod + def get_edges( + node: NodeAnchor, + dir: EdgeDir, + filter_func: Optional[Callable[[list[EdgeArchitype]], list[EdgeArchitype]]], + target_obj: Optional[list[NodeArchitype]], + ) -> list[EdgeArchitype]: + """Get edges connected to this node.""" + return hookmanager.hook.get_edges( + node=node, dir=dir, filter_func=filter_func, target_obj=target_obj + ) + + @staticmethod + def edges_to_nodes( + node: NodeAnchor, + dir: EdgeDir, + filter_func: Optional[Callable[[list[EdgeArchitype]], list[EdgeArchitype]]], + target_obj: Optional[list[NodeArchitype]], + ) -> list[NodeArchitype]: + """Get set of nodes connected to this node.""" + return hookmanager.hook.edges_to_nodes( + node=node, dir=dir, filter_func=filter_func, target_obj=target_obj + ) + + @staticmethod + def remove_edge(node: NodeAnchor, edge: EdgeAnchor) -> None: + """Remove reference without checking sync status.""" + return hookmanager.hook.remove_edge(node=node, edge=edge) + + +class JacEdge: + """Jac Edge Operations.""" + + @staticmethod + def detach(edge: EdgeAnchor) -> None: + """Detach edge from nodes.""" + return hookmanager.hook.detach(edge=edge) + + +class JacWalker: + """Jac Edge Operations.""" + + @staticmethod + def visit_node( + walker: WalkerArchitype, + expr: ( + list[NodeArchitype | EdgeArchitype] + | list[NodeArchitype] + | list[EdgeArchitype] + | NodeArchitype + | EdgeArchitype + ), + ) -> bool: # noqa: ANN401 + """Jac's visit stmt feature.""" + return hookmanager.hook.visit_node(walker=walker, expr=expr) + + @staticmethod + def ignore( + walker: WalkerArchitype, + expr: ( + list[NodeArchitype | EdgeArchitype] + | list[NodeArchitype] + | list[EdgeArchitype] + | NodeArchitype + | EdgeArchitype + ), + ) -> bool: # noqa: ANN401 + """Jac's ignore stmt feature.""" + return hookmanager.hook.ignore(walker=walker, expr=expr) + + @staticmethod + def spawn_call(op1: Architype, op2: Architype) -> WalkerArchitype: + """Jac's spawn operator feature.""" + return hookmanager.hook.spawn_call(op1=op1, op2=op2) + + @staticmethod + def disengage(walker: WalkerArchitype) -> bool: + """Jac's disengage stmt feature.""" + return hookmanager.hook.disengage(walker=walker) + + +class JacFeature(JacAccessValidation, JacNode, JacEdge, JacWalker): """Jac Feature.""" @staticmethod @@ -155,49 +264,11 @@ def has_instance_default(gen_func: Callable[[], T]) -> T: """Jac's has container default feature.""" return hookmanager.hook.has_instance_default(gen_func=gen_func) - @staticmethod - def spawn_call(op1: Architype, op2: Architype) -> WalkerArchitype: - """Jac's spawn operator feature.""" - return hookmanager.hook.spawn_call(op1=op1, op2=op2) - @staticmethod def report(expr: Any) -> Any: # noqa: ANN401 """Jac's report stmt feature.""" return hookmanager.hook.report(expr=expr) - @staticmethod - def ignore( - walker: WalkerArchitype, - expr: ( - list[NodeArchitype | EdgeArchitype] - | list[NodeArchitype] - | list[EdgeArchitype] - | NodeArchitype - | EdgeArchitype - ), - ) -> bool: # noqa: ANN401 - """Jac's ignore stmt feature.""" - return hookmanager.hook.ignore(walker=walker, expr=expr) - - @staticmethod - def visit_node( - walker: WalkerArchitype, - expr: ( - list[NodeArchitype | EdgeArchitype] - | list[NodeArchitype] - | list[EdgeArchitype] - | NodeArchitype - | EdgeArchitype - ), - ) -> bool: # noqa: ANN401 - """Jac's visit stmt feature.""" - return hookmanager.hook.visit_node(walker=walker, expr=expr) - - @staticmethod - def disengage(walker: WalkerArchitype) -> bool: # noqa: ANN401 - """Jac's disengage stmt feature.""" - return hookmanager.hook.disengage(walker=walker) - @staticmethod def edge_ref( node_obj: NodeArchitype | list[NodeArchitype], @@ -391,55 +462,3 @@ class JacCmd: def create_cmd() -> None: """Create Jac CLI cmds.""" return hookmanager.hook.create_cmd() - - -class JacAccessValidation: - """Jac Access Validation Specs.""" - - @staticmethod - def check_read_access(to: Anchor) -> bool: - """Read Access Validation.""" - return hookmanager.hook.check_read_access(to=to) - - @staticmethod - def check_connect_access(to: Anchor) -> bool: - """Write Access Validation.""" - return hookmanager.hook.check_connect_access(to=to) - - @staticmethod - def check_write_access(to: Anchor) -> bool: - """Write Access Validation.""" - return hookmanager.hook.check_write_access(to=to) - - @staticmethod - def check_access_level(to: Anchor) -> AccessLevel: - """Access validation.""" - return hookmanager.hook.check_access_level(to=to) - - -class JacNode: - """Jac Node Operations.""" - - @staticmethod - def get_edges( - node: NodeAnchor, - dir: EdgeDir, - filter_func: Optional[Callable[[list[EdgeArchitype]], list[EdgeArchitype]]], - target_obj: Optional[list[NodeArchitype]], - ) -> list[EdgeArchitype]: - """Get edges connected to this node.""" - return hookmanager.hook.get_edges( - node=node, dir=dir, filter_func=filter_func, target_obj=target_obj - ) - - @staticmethod - def edges_to_nodes( - node: NodeAnchor, - dir: EdgeDir, - filter_func: Optional[Callable[[list[EdgeArchitype]], list[EdgeArchitype]]], - target_obj: Optional[list[NodeArchitype]], - ) -> list[NodeArchitype]: - """Get set of nodes connected to this node.""" - return hookmanager.hook.edges_to_nodes( - node=node, dir=dir, filter_func=filter_func, target_obj=target_obj - ) diff --git a/jac/jaclang/plugin/spec.py b/jac/jaclang/plugin/spec.py index df7dd336d9..0149afe09b 100644 --- a/jac/jaclang/plugin/spec.py +++ b/jac/jaclang/plugin/spec.py @@ -24,6 +24,7 @@ Anchor, Architype, DSFunc, + EdgeAnchor, EdgeArchitype, NodeAnchor, NodeArchitype, @@ -41,7 +42,123 @@ P = ParamSpec("P") -class JacFeatureSpec: +class JacAccessValidationSpec: + """Jac Access Validation Specs.""" + + @staticmethod + @hookspec(firstresult=True) + def check_read_access(to: Anchor) -> bool: + """Read Access Validation.""" + raise NotImplementedError + + @staticmethod + @hookspec(firstresult=True) + def check_connect_access(to: Anchor) -> bool: + """Write Access Validation.""" + raise NotImplementedError + + @staticmethod + @hookspec(firstresult=True) + def check_write_access(to: Anchor) -> bool: + """Write Access Validation.""" + raise NotImplementedError + + @staticmethod + @hookspec(firstresult=True) + def check_access_level(to: Anchor) -> AccessLevel: + """Access validation.""" + raise NotImplementedError + + +class JacNodeSpec: + """Jac Node Operations.""" + + @staticmethod + @hookspec(firstresult=True) + def get_edges( + node: NodeAnchor, + dir: EdgeDir, + filter_func: Optional[Callable[[list[EdgeArchitype]], list[EdgeArchitype]]], + target_obj: Optional[list[NodeArchitype]], + ) -> list[EdgeArchitype]: + """Get edges connected to this node.""" + raise NotImplementedError + + @staticmethod + @hookspec(firstresult=True) + def edges_to_nodes( + node: NodeAnchor, + dir: EdgeDir, + filter_func: Optional[Callable[[list[EdgeArchitype]], list[EdgeArchitype]]], + target_obj: Optional[list[NodeArchitype]], + ) -> list[NodeArchitype]: + """Get set of nodes connected to this node.""" + raise NotImplementedError + + @staticmethod + @hookspec(firstresult=True) + def remove_edge(node: NodeAnchor, edge: EdgeAnchor) -> None: + """Remove reference without checking sync status.""" + raise NotImplementedError + + +class JacEdgeSpec: + """Jac Edge Operations.""" + + @staticmethod + @hookspec(firstresult=True) + def detach(edge: EdgeAnchor) -> None: + """Detach edge from nodes.""" + raise NotImplementedError + + +class JacWalkerSpec: + """Jac Edge Operations.""" + + @staticmethod + @hookspec(firstresult=True) + def visit_node( + walker: WalkerArchitype, + expr: ( + list[NodeArchitype | EdgeArchitype] + | list[NodeArchitype] + | list[EdgeArchitype] + | NodeArchitype + | EdgeArchitype + ), + ) -> bool: # noqa: ANN401 + """Jac's visit stmt feature.""" + raise NotImplementedError + + @staticmethod + @hookspec(firstresult=True) + def ignore( + walker: WalkerArchitype, + expr: ( + list[NodeArchitype | EdgeArchitype] + | list[NodeArchitype] + | list[EdgeArchitype] + | NodeArchitype + | EdgeArchitype + ), + ) -> bool: + """Jac's ignore stmt feature.""" + raise NotImplementedError + + @staticmethod + @hookspec(firstresult=True) + def spawn_call(op1: Architype, op2: Architype) -> WalkerArchitype: + """Invoke data spatial call.""" + raise NotImplementedError + + @staticmethod + @hookspec(firstresult=True) + def disengage(walker: WalkerArchitype) -> bool: + """Jac's disengage stmt feature.""" + raise NotImplementedError + + +class JacFeatureSpec(JacAccessValidationSpec, JacNodeSpec, JacEdgeSpec, JacWalkerSpec): """Jac Feature.""" @staticmethod @@ -178,42 +295,6 @@ def report(expr: Any) -> Any: # noqa: ANN401 """Jac's report stmt feature.""" raise NotImplementedError - @staticmethod - @hookspec(firstresult=True) - def ignore( - walker: WalkerArchitype, - expr: ( - list[NodeArchitype | EdgeArchitype] - | list[NodeArchitype] - | list[EdgeArchitype] - | NodeArchitype - | EdgeArchitype - ), - ) -> bool: - """Jac's ignore stmt feature.""" - raise NotImplementedError - - @staticmethod - @hookspec(firstresult=True) - def visit_node( - walker: WalkerArchitype, - expr: ( - list[NodeArchitype | EdgeArchitype] - | list[NodeArchitype] - | list[EdgeArchitype] - | NodeArchitype - | EdgeArchitype - ), - ) -> bool: # noqa: ANN401 - """Jac's visit stmt feature.""" - raise NotImplementedError - - @staticmethod - @hookspec(firstresult=True) - def disengage(walker: WalkerArchitype) -> bool: # noqa: ANN401 - """Jac's disengage stmt feature.""" - raise NotImplementedError - @staticmethod @hookspec(firstresult=True) def edge_ref( @@ -376,62 +457,6 @@ def create_cmd() -> None: raise NotImplementedError -class JacAccessValidation: - """Jac Access Validation Specs.""" - - @staticmethod - @hookspec(firstresult=True) - def check_read_access(to: Anchor) -> bool: - """Read Access Validation.""" - raise NotImplementedError - - @staticmethod - @hookspec(firstresult=True) - def check_connect_access(to: Anchor) -> bool: - """Write Access Validation.""" - raise NotImplementedError - - @staticmethod - @hookspec(firstresult=True) - def check_write_access(to: Anchor) -> bool: - """Write Access Validation.""" - raise NotImplementedError - - @staticmethod - @hookspec(firstresult=True) - def check_access_level(to: Anchor) -> AccessLevel: - """Access validation.""" - raise NotImplementedError - - -class JacNode: - """Jac Node Operations.""" - - @staticmethod - @hookspec(firstresult=True) - def get_edges( - node: NodeAnchor, - dir: EdgeDir, - filter_func: Optional[Callable[[list[EdgeArchitype]], list[EdgeArchitype]]], - target_obj: Optional[list[NodeArchitype]], - ) -> list[EdgeArchitype]: - """Get edges connected to this node.""" - raise NotImplementedError - - @staticmethod - @hookspec(firstresult=True) - def edges_to_nodes( - node: NodeAnchor, - dir: EdgeDir, - filter_func: Optional[Callable[[list[EdgeArchitype]], list[EdgeArchitype]]], - target_obj: Optional[list[NodeArchitype]], - ) -> list[NodeArchitype]: - """Get set of nodes connected to this node.""" - raise NotImplementedError - - hookmanager.add_hookspecs(JacFeatureSpec) hookmanager.add_hookspecs(JacCmdSpec) hookmanager.add_hookspecs(JacBuiltin) -hookmanager.add_hookspecs(JacAccessValidation) -hookmanager.add_hookspecs(JacNode) diff --git a/jac/jaclang/plugin/tests/test_features.py b/jac/jaclang/plugin/tests/test_features.py index 22a3f99faa..6c3e8e0afa 100644 --- a/jac/jaclang/plugin/tests/test_features.py +++ b/jac/jaclang/plugin/tests/test_features.py @@ -3,7 +3,7 @@ import inspect from typing import List, Type -from jaclang.plugin.default import JacFeatureDefaults +from jaclang.plugin.default import JacFeatureImpl from jaclang.plugin.feature import JacFeature from jaclang.plugin.spec import JacFeatureSpec from jaclang.utils.test import TestCase @@ -46,7 +46,7 @@ def test_feature_funcs_synced(self) -> None: """Test if JacFeature, JacFeatureDefaults, and JacFeatureSpec have synced methods.""" # Get methods of each class jac_feature_methods = self.get_methods(JacFeature) - jac_feature_defaults_methods = self.get_methods(JacFeatureDefaults) + jac_feature_defaults_methods = self.get_methods(JacFeatureImpl) jac_feature_spec_methods = self.get_methods(JacFeatureSpec) # Check if all methods are the same in all classes diff --git a/jac/jaclang/runtimelib/architype.py b/jac/jaclang/runtimelib/architype.py index f020e9611d..2aee28c343 100644 --- a/jac/jaclang/runtimelib/architype.py +++ b/jac/jaclang/runtimelib/architype.py @@ -7,7 +7,7 @@ from logging import getLogger from pickle import dumps from types import UnionType -from typing import Any, Callable, ClassVar, Iterable, Optional, TypeVar +from typing import Any, Callable, ClassVar, Optional, TypeVar from uuid import UUID, uuid4 from jaclang.runtimelib.utils import collect_node_connections @@ -303,13 +303,6 @@ class NodeAnchor(Anchor): architype: NodeArchitype edges: list[EdgeAnchor] - def remove_edge(self, edge: EdgeAnchor) -> None: - """Remove reference without checking sync status.""" - for idx, ed in enumerate(self.edges): - if ed.id == edge.id: - self.edges.pop(idx) - break - def gen_dot(self, dot_file: Optional[str] = None) -> str: """Generate Dot file for visualizing nodes and edges.""" visited_nodes: set[NodeAnchor] = set() @@ -333,10 +326,6 @@ def gen_dot(self, dot_file: Optional[str] = None) -> str: f.write(dot_content + "}") return dot_content + "}" - def spawn_call(self, walk: WalkerAnchor) -> WalkerArchitype: - """Invoke data spatial call.""" - return walk.spawn_call(self) - def destroy(self) -> None: """Destroy Anchor.""" from jaclang.plugin.feature import JacFeature as Jac @@ -368,20 +357,6 @@ class EdgeAnchor(Anchor): target: NodeAnchor is_undirected: bool - def __post_init__(self) -> None: - """Populate edge to source and target.""" - self.source.edges.append(self) - self.target.edges.append(self) - - def detach(self) -> None: - """Detach edge from nodes.""" - self.source.remove_edge(self) - self.target.remove_edge(self) - - def spawn_call(self, walk: WalkerAnchor) -> WalkerArchitype: - """Invoke data spatial call.""" - return walk.spawn_call(self.target) - def destroy(self) -> None: """Destroy Anchor.""" from jaclang.plugin.feature import JacFeature as Jac @@ -389,7 +364,7 @@ def destroy(self) -> None: jctx = Jac.get_context() if jctx.root.has_write_access(self): - self.detach() + Jac.detach(self) jctx.mem.remove(self.id) def __getstate__(self) -> dict[str, object]: @@ -418,81 +393,6 @@ class WalkerAnchor(Anchor): ignores: list[Anchor] = field(default_factory=list) disengaged: bool = False - def visit_node(self, anchors: Iterable[NodeAnchor | EdgeAnchor]) -> bool: - """Walker visits node.""" - before_len = len(self.next) - for anchor in anchors: - if anchor not in self.ignores: - if isinstance(anchor, NodeAnchor): - self.next.append(anchor) - elif isinstance(anchor, EdgeAnchor): - if target := anchor.target: - self.next.append(target) - else: - raise ValueError("Edge has no target.") - return len(self.next) > before_len - - def ignore_node(self, anchors: Iterable[NodeAnchor | EdgeAnchor]) -> bool: - """Walker ignores node.""" - before_len = len(self.ignores) - for anchor in anchors: - if anchor not in self.ignores: - if isinstance(anchor, NodeAnchor): - self.ignores.append(anchor) - elif isinstance(anchor, EdgeAnchor): - if target := anchor.target: - self.ignores.append(target) - else: - raise ValueError("Edge has no target.") - return len(self.ignores) > before_len - - def disengage_now(self) -> None: - """Disengage walker from traversal.""" - self.disengaged = True - - def spawn_call(self, node: Anchor) -> WalkerArchitype: - """Invoke data spatial call.""" - if walker := self.architype: - self.path = [] - self.next = [node] - while len(self.next): - if current_node := self.next.pop(0).architype: - for i in current_node._jac_entry_funcs_: - if not i.trigger or isinstance(walker, i.trigger): - if i.func: - i.func(current_node, walker) - else: - raise ValueError(f"No function {i.name} to call.") - if self.disengaged: - return walker - for i in walker._jac_entry_funcs_: - if not i.trigger or isinstance(current_node, i.trigger): - if i.func: - i.func(walker, current_node) - else: - raise ValueError(f"No function {i.name} to call.") - if self.disengaged: - return walker - for i in walker._jac_exit_funcs_: - if not i.trigger or isinstance(current_node, i.trigger): - if i.func: - i.func(walker, current_node) - else: - raise ValueError(f"No function {i.name} to call.") - if self.disengaged: - return walker - for i in current_node._jac_exit_funcs_: - if not i.trigger or isinstance(walker, i.trigger): - if i.func: - i.func(current_node, walker) - else: - raise ValueError(f"No function {i.name} to call.") - if self.disengaged: - return walker - self.ignores = [] - return walker - raise Exception(f"Invalid Reference {self.id}") - class Architype: """Architype Protocol."""