42
42
43
43
__author__ = "J.L. Lanfranchi, P. Eller"
44
44
45
- __license__ = """Copyright (c) 2014-2018 , The IceCube Collaboration
45
+ __license__ = """Copyright (c) 2014-2025 , The IceCube Collaboration
46
46
47
47
Licensed under the Apache License, Version 2.0 (the "License");
48
48
you may not use this file except in compliance with the License.
69
69
# a warning message. Or we just wait to see if it fails when the user runs the
70
70
# code.
71
71
72
- # TODO: return an OrderedDict instead of a list if the user requests
73
- # intermediate results? Or simply use the `outputs` attribute of each stage to
74
- # dynamically access this?
75
-
76
72
77
73
class Pipeline ():
78
74
"""Instantiate stages according to a parsed config object; excecute
@@ -120,7 +116,8 @@ def __init__(self, config, profile=False):
120
116
self ._source_code_hash = None
121
117
122
118
if isinstance (self ._output_binning , VarBinning ):
123
- self .check_VarBinning ()
119
+ self .assert_varbinning_compat ()
120
+ self .assert_exclusive_varbinning ()
124
121
125
122
# check in case someone decided to add a non-daemonflux parameter with daemon_
126
123
# in it, which would potentially make penalty calculation incorrect
@@ -372,16 +369,18 @@ def get_outputs(self, **get_outputs_kwargs):
372
369
else :
373
370
outputs = self ._get_outputs (** get_outputs_kwargs )
374
371
return outputs
375
-
372
+
376
373
def _get_outputs (self , output_binning = None , output_key = None ):
377
374
"""Get MapSet output"""
378
375
379
376
self .run ()
380
377
381
378
if output_binning is None :
382
379
output_binning = self .output_binning
383
- elif isinstance (output_binning , VarBinning ):
384
- self .check_VarBinning (output_binning )
380
+ if isinstance (output_binning , VarBinning ):
381
+ # checks also have to be done when no new output_binning is passed
382
+ self .assert_varbinning_compat ()
383
+ self .assert_exclusive_varbinning (output_binning = output_binning )
385
384
if output_key is None :
386
385
output_key = self .output_key
387
386
@@ -395,10 +394,11 @@ def _get_outputs(self, output_binning=None, output_key=None):
395
394
outputs = self .data .get_mapset (output_key [0 ], error = output_key [1 ])
396
395
else :
397
396
outputs = self .data .get_mapset (output_key )
398
-
399
- else : #VarBinning
400
- outputs = []
397
+
398
+ else :
399
+ assert isinstance ( output_binning , VarBinning )
401
400
assert self .data .representation == "events"
401
+ outputs = []
402
402
403
403
selections = output_binning .selections
404
404
for i in range (len (output_binning .binnings )):
@@ -424,7 +424,7 @@ def _get_outputs(self, output_binning=None, output_key=None):
424
424
cc .tranlation_modes [output_key ] = 'sum'
425
425
426
426
containers .append (cc )
427
-
427
+
428
428
dat = ContainerSet (name = self .data .name ,
429
429
containers = containers ,
430
430
representation = output_binning .binnings [i ],
@@ -630,26 +630,58 @@ def hash(self):
630
630
def __hash__ (self ):
631
631
return self .hash
632
632
633
- def check_VarBinning (self , output_binning = None ):
634
- """Checks if pipeline works with VarBinning and if VarBinning selections
635
- are exclusive."""
636
- # VarBinning will only work if all stages have apply_mode=events
633
+ def assert_varbinning_compat (self ):
634
+ """Asserts that pipeline setup is compatible with `VarBinning`:
635
+ all stages need to apply to events.
636
+
637
+ Raises
638
+ ------
639
+ ValueError : if at least one stage has apply_mode!='events'
640
+
641
+ """
642
+ incompat = []
637
643
for s in self .stages :
638
- assert s .apply_mode == 'events'
644
+ if not s .apply_mode == 'events' :
645
+ incompat .append (s )
646
+ if len (incompat ) >= 1 :
647
+ str_incompat = ", " .join (
648
+ [f"{ stage .stage_name } .{ stage .service_name } " for stage in incompat ]
649
+ )
650
+ raise ValueError (
651
+ "When a variable binning is used, all stages need to set "
652
+ f"apply_mode='events', but { str_incompat } do(es) not!"
653
+ )
654
+
655
+ def assert_exclusive_varbinning (self , output_binning = None ):
656
+ """Assert that `VarBinning` selections are mutually exclusive.
657
+ This is done individually for each `Container` in `self.data`.
639
658
640
- # now check if VarBinning selection is exclusive (only necessary if list)
641
- if output_binning == None :
659
+ Parameters
660
+ -----------
661
+ output_binning : None, MultiDimBinning, VarBinning
662
+
663
+ Raises
664
+ ------
665
+ ValueError : if a `VarBinning` is tested and at least two selections
666
+ (if applicable) are not mutually exclusive
667
+
668
+ """
669
+ if output_binning is None :
642
670
selections = self .output_binning .selections
643
671
nselections = self .output_binning .nselections
644
672
else :
645
673
selections = output_binning .selections
646
674
nselections = output_binning .nselections
647
675
if isinstance (selections , list ):
676
+ # list of selection-criteria strings
648
677
for c in self .data :
649
678
keep = np .zeros (c .size )
650
679
for i in range (nselections ):
651
680
keep += c .get_keep_mask (selections [i ])
652
- assert np .all (keep <= 1 ), 'Selection is not exclusive'
681
+ if not np .all (keep <= 1 ):
682
+ raise ValueError (
683
+ f"Selections { selections } are not mutually exclusive!"
684
+ )
653
685
654
686
@property
655
687
def output_binning (self ):
@@ -658,14 +690,13 @@ def output_binning(self):
658
690
@output_binning .setter
659
691
def output_binning (self , binning ):
660
692
if isinstance (binning , VarBinning ):
661
- self .check_VarBinning (binning )
693
+ self .assert_varbinning_compat ()
694
+ self .assert_exclusive_varbinning (output_binning = binning )
662
695
self ._output_binning = binning
663
696
664
697
665
698
def test_Pipeline ():
666
699
"""Unit tests for Pipeline class"""
667
- # pylint: disable=line-too-long
668
-
669
700
# TODO: make a test config file with hierarchy AND material selector,
670
701
# uncomment / add in tests commented / removed below
671
702
@@ -741,6 +772,24 @@ def test_Pipeline():
741
772
#current_hier = new_hier
742
773
#current_mat = new_mat
743
774
775
+ #
776
+ # Test: a pipeline using a VarBinning
777
+ #
778
+ p = Pipeline ("settings/pipeline/varbin_example.cfg" )
779
+ out = p .get_outputs ()
780
+ # a split into two event selections has to result in two MapSets
781
+ assert len (out ) == 2
782
+ # a binned apply_mode has to result in a ValueError
783
+ # first get a pre-existing binning
784
+ binned_calc_mode = p .stages [2 ].calc_mode
785
+ assert isinstance (binned_calc_mode , MultiDimBinning )
786
+ p .stages [2 ].apply_mode = binned_calc_mode
787
+ try :
788
+ out = p .get_outputs ()
789
+ except ValueError :
790
+ pass
791
+ else :
792
+ assert False
744
793
745
794
746
795
# ----- Most of this below cang go (?) ---
0 commit comments