11
11
Iterable ,
12
12
Mapping ,
13
13
NamedTuple ,
14
+ Optional ,
14
15
)
15
16
16
17
from .providers import AbstractProvider
30
31
31
32
if TYPE_CHECKING :
32
33
from .providers import Preference
34
+ from typing_extensions import Literal
35
+
36
+ BacktrackStrategy = Literal ["backjump_fallback" , "backjump" , "backtrack" ]
33
37
34
38
class Result (NamedTuple , Generic [RT , CT , KT ]):
35
39
mapping : Mapping [KT , CT ]
@@ -100,9 +104,12 @@ def __init__(
100
104
self ,
101
105
provider : AbstractProvider [RT , CT , KT ],
102
106
reporter : BaseReporter [RT , CT , KT ],
107
+ backtrack_strategy : BacktrackStrategy = "backjump_fallback" ,
103
108
) -> None :
104
109
self ._p = provider
105
110
self ._r = reporter
111
+ self ._backtrack_strategy : BacktrackStrategy = backtrack_strategy
112
+ self ._fallback_states : Optional [list [State [RT , CT , KT ]]] = None
106
113
self ._states : list [State [RT , CT , KT ]] = []
107
114
108
115
@property
@@ -269,6 +276,77 @@ def _attempt_to_pin_criterion(self, name: KT) -> list[Criterion[RT, CT]]:
269
276
# end, signal for backtracking.
270
277
return causes
271
278
279
+ def _backtrack_iteration (self ) -> tuple [KT , CT , list [tuple [KT , list [CT ]]]]:
280
+ broken_state = self ._states .pop ()
281
+ name , candidate = broken_state .mapping .popitem ()
282
+ incompatibilities_from_broken = [
283
+ (k , list (v .incompatibilities ))
284
+ for k , v in broken_state .criteria .items ()
285
+ ]
286
+
287
+ return name , candidate , incompatibilities_from_broken
288
+
289
+ def _backjump_iteration (
290
+ self ,
291
+ causes : list [RequirementInformation [RT , CT ]],
292
+ incompatible_deps : set [KT ],
293
+ ) -> tuple [KT , CT , list [tuple [KT , list [CT ]]]]:
294
+ # Ensure to backtrack to a state that caused the incompatibility
295
+ incompatible_state = False
296
+ name , candidate , broken_state = None , None , None
297
+
298
+ if (
299
+ self ._backtrack_strategy == "backjump_fallback"
300
+ and self ._fallback_states is None
301
+ ):
302
+ fallback_states = [
303
+ State (
304
+ s .mapping .copy (),
305
+ s .criteria .copy (),
306
+ s .backtrack_causes [:],
307
+ )
308
+ for s in self ._states
309
+ ]
310
+ else :
311
+ fallback_states = None
312
+
313
+ backjump_count = 0
314
+ while not incompatible_state :
315
+ backjump_count += 1
316
+
317
+ # Retrieve the last candidate pin and known incompatibilities
318
+ try :
319
+ broken_state = self ._states .pop ()
320
+ name , candidate = broken_state .mapping .popitem ()
321
+ except (IndexError , KeyError ):
322
+ raise ResolutionImpossible (causes )
323
+ current_dependencies = {
324
+ self ._p .identify (d )
325
+ for d in self ._p .get_dependencies (candidate )
326
+ }
327
+ incompatible_state = not current_dependencies .isdisjoint (
328
+ incompatible_deps
329
+ )
330
+
331
+ # Backup states first time a backjump goes
332
+ # further than a backtrack would have
333
+ if (
334
+ self ._backtrack_strategy == "backjump_fallback"
335
+ and self ._fallback_states is None
336
+ and backjump_count == 2
337
+ ):
338
+ self ._fallback_states = fallback_states
339
+
340
+ if name is None or candidate is None or broken_state is None :
341
+ raise ResolutionImpossible (causes )
342
+
343
+ incompatibilities_from_broken = [
344
+ (k , list (v .incompatibilities ))
345
+ for k , v in broken_state .criteria .items ()
346
+ ]
347
+
348
+ return name , candidate , incompatibilities_from_broken
349
+
272
350
def _backjump (self , causes : list [RequirementInformation [RT , CT ]]) -> bool :
273
351
"""Perform backjumping.
274
352
@@ -299,6 +377,17 @@ def _backjump(self, causes: list[RequirementInformation[RT, CT]]) -> bool:
299
377
5a. If this causes Y' to conflict, we need to backtrack again. Make Y'
300
378
the new Z and go back to step 2.
301
379
5b. If the incompatibilities apply cleanly, end backtracking.
380
+
381
+ If backtracking each iteraction the the loop will:
382
+
383
+ 1. Discard Z.
384
+ 2. Discard Y but remember its incompatibility information gathered
385
+ previously, and the failure we're dealing with right now.
386
+ 3. Push a new state Y' based on X, and apply the incompatibility
387
+ information from Y to Y'.
388
+ 4a. If this causes Y' to conflict, we need to backtrack again. Make Y'
389
+ the new Z and go back to step 2.
390
+ 4b. If the incompatibilities apply cleanly, end backtracking.
302
391
"""
303
392
incompatible_reqs : Iterable [CT | RT ] = itertools .chain (
304
393
(c .parent for c in causes if c .parent is not None ),
@@ -309,28 +398,42 @@ def _backjump(self, causes: list[RequirementInformation[RT, CT]]) -> bool:
309
398
# Remove the state that triggered backtracking.
310
399
del self ._states [- 1 ]
311
400
312
- # Ensure to backtrack to a state that caused the incompatibility
313
- incompatible_state = False
314
- broken_state = self .state
315
- while not incompatible_state :
316
- # Retrieve the last candidate pin and known incompatibilities.
317
- try :
318
- broken_state = self ._states .pop ()
319
- name , candidate = broken_state .mapping .popitem ()
320
- except (IndexError , KeyError ):
321
- raise ResolutionImpossible (causes ) from None
322
- current_dependencies = {
323
- self ._p .identify (d )
324
- for d in self ._p .get_dependencies (candidate )
325
- }
326
- incompatible_state = not current_dependencies .isdisjoint (
327
- incompatible_deps
328
- )
401
+ name , candidate , incompatibilities_from_broken = None , None , None
329
402
330
- incompatibilities_from_broken = [
331
- (k , list (v .incompatibilities ))
332
- for k , v in broken_state .criteria .items ()
333
- ]
403
+ if self ._backtrack_strategy in ("backjump" , "backjump_fallback" ):
404
+ try :
405
+ (
406
+ name ,
407
+ candidate ,
408
+ incompatibilities_from_broken ,
409
+ ) = self ._backjump_iteration (
410
+ causes = causes , incompatible_deps = incompatible_deps
411
+ )
412
+ except ResolutionImpossible :
413
+ if (
414
+ self ._backtrack_strategy == "backjump"
415
+ or self ._fallback_states is None
416
+ ):
417
+ raise
418
+
419
+ # Backjumping failed but fallback to backtracking was requested
420
+ self ._states = self ._fallback_states
421
+ self ._backtrack_strategy = "backtrack"
422
+ self ._r .fallback ("backjump_fallback" , "backtrack" )
423
+
424
+ if self ._backtrack_strategy == "backtrack" :
425
+ (
426
+ name ,
427
+ candidate ,
428
+ incompatibilities_from_broken ,
429
+ ) = self ._backtrack_iteration ()
430
+
431
+ if (
432
+ name is None
433
+ or candidate is None
434
+ or incompatibilities_from_broken is None
435
+ ):
436
+ raise ResolutionImpossible (causes )
334
437
335
438
# Also mark the newly known incompatibility.
336
439
incompatibilities_from_broken .append ((name , [candidate ]))
0 commit comments