9
9
Collection ,
10
10
Generic ,
11
11
Iterable ,
12
+ List ,
12
13
Mapping ,
13
14
NamedTuple ,
15
+ Optional ,
16
+ Set ,
17
+ Tuple ,
14
18
)
15
19
16
20
from .providers import AbstractProvider
30
34
31
35
if TYPE_CHECKING :
32
36
from .providers import Preference
37
+ from typing_extensions import Literal
38
+
39
+ BacktrackStrategy = Literal ["backjump_fallback" , "backjump" , "backtrack" ]
33
40
34
41
class Result (NamedTuple , Generic [RT , CT , KT ]):
35
42
mapping : Mapping [KT , CT ]
@@ -100,9 +107,12 @@ def __init__(
100
107
self ,
101
108
provider : AbstractProvider [RT , CT , KT ],
102
109
reporter : BaseReporter [RT , CT , KT ],
110
+ backtrack_strategy : BacktrackStrategy = "backjump_fallback" ,
103
111
) -> None :
104
112
self ._p = provider
105
113
self ._r = reporter
114
+ self ._backtrack_strategy : BacktrackStrategy = backtrack_strategy
115
+ self ._fallback_states : Optional [list [State [RT , CT , KT ]]] = None
106
116
self ._states : list [State [RT , CT , KT ]] = []
107
117
108
118
@property
@@ -269,6 +279,77 @@ def _attempt_to_pin_criterion(self, name: KT) -> list[Criterion[RT, CT]]:
269
279
# end, signal for backtracking.
270
280
return causes
271
281
282
+ def _backtrack_iteration (self ) -> Tuple [KT , CT , List [Tuple [KT , list [CT ]]]]:
283
+ broken_state = self ._states .pop ()
284
+ name , candidate = broken_state .mapping .popitem ()
285
+ incompatibilities_from_broken = [
286
+ (k , list (v .incompatibilities ))
287
+ for k , v in broken_state .criteria .items ()
288
+ ]
289
+
290
+ return name , candidate , incompatibilities_from_broken
291
+
292
+ def _backjump_iteration (
293
+ self ,
294
+ causes : list [RequirementInformation [RT , CT ]],
295
+ incompatible_deps : Set [KT ],
296
+ ) -> Tuple [KT , CT , List [Tuple [KT , list [CT ]]]]:
297
+ # Ensure to backtrack to a state that caused the incompatibility
298
+ incompatible_state = False
299
+ name , candidate , broken_state = None , None , None
300
+
301
+ if (
302
+ self ._backtrack_strategy == "backjump_fallback"
303
+ and self ._fallback_states is None
304
+ ):
305
+ fallback_states = [
306
+ State (
307
+ s .mapping .copy (),
308
+ s .criteria .copy (),
309
+ s .backtrack_causes [:],
310
+ )
311
+ for s in self ._states
312
+ ]
313
+ else :
314
+ fallback_states = None
315
+
316
+ backjump_count = 0
317
+ while not incompatible_state :
318
+ backjump_count += 1
319
+
320
+ # Retrieve the last candidate pin and known incompatibilities
321
+ try :
322
+ broken_state = self ._states .pop ()
323
+ name , candidate = broken_state .mapping .popitem ()
324
+ except (IndexError , KeyError ):
325
+ raise ResolutionImpossible (causes )
326
+ current_dependencies = {
327
+ self ._p .identify (d )
328
+ for d in self ._p .get_dependencies (candidate )
329
+ }
330
+ incompatible_state = not current_dependencies .isdisjoint (
331
+ incompatible_deps
332
+ )
333
+
334
+ # Backup states first time a backjump goes
335
+ # further than a backtrack would have
336
+ if (
337
+ self ._backtrack_strategy == "backjump_fallback"
338
+ and self ._fallback_states is None
339
+ and backjump_count == 2
340
+ ):
341
+ self ._fallback_states = fallback_states
342
+
343
+ if name is None or candidate is None or broken_state is None :
344
+ raise ResolutionImpossible (causes )
345
+
346
+ incompatibilities_from_broken = [
347
+ (k , list (v .incompatibilities ))
348
+ for k , v in broken_state .criteria .items ()
349
+ ]
350
+
351
+ return name , candidate , incompatibilities_from_broken
352
+
272
353
def _backjump (self , causes : list [RequirementInformation [RT , CT ]]) -> bool :
273
354
"""Perform backjumping.
274
355
@@ -299,6 +380,17 @@ def _backjump(self, causes: list[RequirementInformation[RT, CT]]) -> bool:
299
380
5a. If this causes Y' to conflict, we need to backtrack again. Make Y'
300
381
the new Z and go back to step 2.
301
382
5b. If the incompatibilities apply cleanly, end backtracking.
383
+
384
+ If backtracking each iteraction the the loop will:
385
+
386
+ 1. Discard Z.
387
+ 2. Discard Y but remember its incompatibility information gathered
388
+ previously, and the failure we're dealing with right now.
389
+ 3. Push a new state Y' based on X, and apply the incompatibility
390
+ information from Y to Y'.
391
+ 4a. If this causes Y' to conflict, we need to backtrack again. Make Y'
392
+ the new Z and go back to step 2.
393
+ 4b. If the incompatibilities apply cleanly, end backtracking.
302
394
"""
303
395
incompatible_reqs : Iterable [CT | RT ] = itertools .chain (
304
396
(c .parent for c in causes if c .parent is not None ),
@@ -309,28 +401,42 @@ def _backjump(self, causes: list[RequirementInformation[RT, CT]]) -> bool:
309
401
# Remove the state that triggered backtracking.
310
402
del self ._states [- 1 ]
311
403
312
- # Ensure to backtrack to a state that caused the incompatibility
313
- incompatible_state = False
314
- broken_state = self .state
315
- while not incompatible_state :
316
- # Retrieve the last candidate pin and known incompatibilities.
317
- try :
318
- broken_state = self ._states .pop ()
319
- name , candidate = broken_state .mapping .popitem ()
320
- except (IndexError , KeyError ):
321
- raise ResolutionImpossible (causes ) from None
322
- current_dependencies = {
323
- self ._p .identify (d )
324
- for d in self ._p .get_dependencies (candidate )
325
- }
326
- incompatible_state = not current_dependencies .isdisjoint (
327
- incompatible_deps
328
- )
404
+ name , candidate , incompatibilities_from_broken = None , None , None
329
405
330
- incompatibilities_from_broken = [
331
- (k , list (v .incompatibilities ))
332
- for k , v in broken_state .criteria .items ()
333
- ]
406
+ if self ._backtrack_strategy in ("backjump" , "backjump_fallback" ):
407
+ try :
408
+ (
409
+ name ,
410
+ candidate ,
411
+ incompatibilities_from_broken ,
412
+ ) = self ._backjump_iteration (
413
+ causes = causes , incompatible_deps = incompatible_deps
414
+ )
415
+ except ResolutionImpossible :
416
+ if (
417
+ self ._backtrack_strategy == "backjump"
418
+ or self ._fallback_states is None
419
+ ):
420
+ raise
421
+
422
+ # Backjumping failed but fallback to backtracking was requested
423
+ self ._states = self ._fallback_states
424
+ self ._backtrack_strategy = "backtrack"
425
+ self ._r .fallback ("backjump_fallback" , "backtrack" )
426
+
427
+ if self ._backtrack_strategy == "backtrack" :
428
+ (
429
+ name ,
430
+ candidate ,
431
+ incompatibilities_from_broken ,
432
+ ) = self ._backtrack_iteration ()
433
+
434
+ if (
435
+ name is None
436
+ or candidate is None
437
+ or incompatibilities_from_broken is None
438
+ ):
439
+ raise ResolutionImpossible (causes )
334
440
335
441
# Also mark the newly known incompatibility.
336
442
incompatibilities_from_broken .append ((name , [candidate ]))
0 commit comments