diff --git a/src/resolvelib/reporters.py b/src/resolvelib/reporters.py index 05c7226..9a6bfad 100644 --- a/src/resolvelib/reporters.py +++ b/src/resolvelib/reporters.py @@ -60,3 +60,6 @@ def rejecting_candidate( def pinning(self, candidate: CT) -> None: """Called when adding a candidate to the potential solution.""" + + def fallback(self, from_: str, to: str) -> None: + """Called when falling back from one backtrack strategy to another.""" diff --git a/src/resolvelib/resolvers.py b/src/resolvelib/resolvers.py index 58a48dd..e5e1b15 100644 --- a/src/resolvelib/resolvers.py +++ b/src/resolvelib/resolvers.py @@ -9,8 +9,13 @@ Collection, Generic, Iterable, + List, + Literal, Mapping, NamedTuple, + Optional, + Set, + Tuple, ) from .providers import AbstractProvider @@ -31,6 +36,8 @@ if TYPE_CHECKING: from .providers import Preference + BacktrackStrategy = Literal["backjump_fallback", "backjump", "backtrack"] + class Result(NamedTuple, Generic[RT, CT, KT]): mapping: Mapping[KT, CT] graph: DirectedGraph[KT | None] @@ -100,9 +107,12 @@ def __init__( self, provider: AbstractProvider[RT, CT, KT], reporter: BaseReporter[RT, CT, KT], + backtrack_strategy: BacktrackStrategy = "backjump_fallback", ) -> None: self._p = provider self._r = reporter + self._backtrack_strategy: BacktrackStrategy = backtrack_strategy + self._fallback_states: Optional[list[State[RT, CT, KT]]] = None self._states: list[State[RT, CT, KT]] = [] @property @@ -269,6 +279,78 @@ def _attempt_to_pin_criterion(self, name: KT) -> list[Criterion[RT, CT]]: # end, signal for backtracking. return causes + def _backtrack_iteration(self) -> Tuple[KT, CT, List[Tuple[KT, list[CT]]]]: + broken_state = self._states.pop() + name, candidate = broken_state.mapping.popitem() + incompatibilities_from_broken = [ + (k, list(v.incompatibilities)) + for k, v in broken_state.criteria.items() + ] + + return name, candidate, incompatibilities_from_broken + + # -> Tuple[KT, CT, List[Tuple[KT, list[CT]]] + def _backjump_iteration( + self, + causes: list[RequirementInformation[RT, CT]], + incompatible_deps: Set[KT], + ) -> Tuple[KT, CT, List[Tuple[KT, list[CT]]]]: + # Ensure to backtrack to a state that caused the incompatibility + incompatible_state = False + name, candidate, broken_state = None, None, None + + if ( + self._backtrack_strategy == "backjump_fallback" + and self._fallback_states is None + ): + fallback_states = [ + State( + s.mapping.copy(), + s.criteria.copy(), + s.backtrack_causes[:], + ) + for s in self._states + ] + else: + fallback_states = None + + backjump_count = 0 + while not incompatible_state: + backjump_count += 1 + + # Retrieve the last candidate pin and known incompatibilities + try: + broken_state = self._states.pop() + name, candidate = broken_state.mapping.popitem() + except (IndexError, KeyError): + raise ResolutionImpossible(causes) + current_dependencies = { + self._p.identify(d) + for d in self._p.get_dependencies(candidate) + } + incompatible_state = not current_dependencies.isdisjoint( + incompatible_deps + ) + + # Backup states first time a backjump goes + # further than a backtrack would have + if ( + self._backtrack_strategy == "backjump_fallback" + and self._fallback_states is None + and backjump_count == 2 + ): + self._fallback_states = fallback_states + + if name is None or candidate is None or broken_state is None: + raise ResolutionImpossible(causes) + + incompatibilities_from_broken = [ + (k, list(v.incompatibilities)) + for k, v in broken_state.criteria.items() + ] + + return name, candidate, incompatibilities_from_broken + def _backjump(self, causes: list[RequirementInformation[RT, CT]]) -> bool: """Perform backjumping. @@ -299,6 +381,17 @@ def _backjump(self, causes: list[RequirementInformation[RT, CT]]) -> bool: 5a. If this causes Y' to conflict, we need to backtrack again. Make Y' the new Z and go back to step 2. 5b. If the incompatibilities apply cleanly, end backtracking. + + If backtracking each iteraction the the loop will: + + 1. Discard Z. + 2. Discard Y but remember its incompatibility information gathered + previously, and the failure we're dealing with right now. + 3. Push a new state Y' based on X, and apply the incompatibility + information from Y to Y'. + 4a. If this causes Y' to conflict, we need to backtrack again. Make Y' + the new Z and go back to step 2. + 4b. If the incompatibilities apply cleanly, end backtracking. """ incompatible_reqs: Iterable[CT | RT] = itertools.chain( (c.parent for c in causes if c.parent is not None), @@ -309,28 +402,42 @@ def _backjump(self, causes: list[RequirementInformation[RT, CT]]) -> bool: # Remove the state that triggered backtracking. del self._states[-1] - # Ensure to backtrack to a state that caused the incompatibility - incompatible_state = False - broken_state = self.state - while not incompatible_state: - # Retrieve the last candidate pin and known incompatibilities. - try: - broken_state = self._states.pop() - name, candidate = broken_state.mapping.popitem() - except (IndexError, KeyError): - raise ResolutionImpossible(causes) from None - current_dependencies = { - self._p.identify(d) - for d in self._p.get_dependencies(candidate) - } - incompatible_state = not current_dependencies.isdisjoint( - incompatible_deps - ) + name, candidate, incompatibilities_from_broken = None, None, None - incompatibilities_from_broken = [ - (k, list(v.incompatibilities)) - for k, v in broken_state.criteria.items() - ] + if self._backtrack_strategy in ("backjump", "backjump_fallback"): + try: + ( + name, + candidate, + incompatibilities_from_broken, + ) = self._backjump_iteration( + causes=causes, incompatible_deps=incompatible_deps + ) + except ResolutionImpossible: + if ( + self._backtrack_strategy == "backjump" + or self._fallback_states is None + ): + raise + + # Backjumping failed but fallback to backtracking was requested + self._states = self._fallback_states + self._backtrack_strategy = "backtrack" + self._r.fallback("backjump_fallback", "backtrack") + + if self._backtrack_strategy == "backtrack": + ( + name, + candidate, + incompatibilities_from_broken, + ) = self._backtrack_iteration() + + if ( + name is None + or candidate is None + or incompatibilities_from_broken is None + ): + raise ResolutionImpossible(causes) # Also mark the newly known incompatibility. incompatibilities_from_broken.append((name, [candidate]))