@@ -419,15 +419,21 @@ def evaluateMapState(self, function_input, key, metadata, sapi):
419419        self ._logger .debug ("[StateUtils] evaluateMapState, maxConcurrency: "  +  str (maxConcurrency ))
420420        self ._logger .debug ("[StateUtils] evaluateMapState metadata: "  +  str (metadata ))
421421
422+         self ._logger .info ("[StateUtils] evaluateMapState, maxConcurrency: "  +  str (maxConcurrency ))
423+         self ._logger .info ("[StateUtils] evaluateMapState metadata: "  +  str (metadata ))
424+ 
422425        counter_name_topic  =  self .functionstatename  +  "-"  +  self .sandboxid 
423426
424427        total_branch_count  =  len (function_input ) # all branches executed concurrently 
425- 
428+         #sapi.put(name_prefix + "_" + "mapInputCount", str(len(function_input))) 
429+   
426430        klist  =  [total_branch_count ]
427431
428432        self .parsedfunctionstateinfo ["BranchCount" ] =  int (total_branch_count ) # overwrite parsed BranchCount with new value 
429433        self ._logger .debug ("[StateUtils] evaluateMapState, total_branch_count: "  +  str (total_branch_count ))
430434
435+         self ._logger .info ("[StateUtils] evaluateMapState, total_branch_count: "  +  str (total_branch_count ))
436+ 
431437        # prepare counter metadata 
432438        counter_metadata  =  {}
433439        counter_metadata ["__state_action" ] =  "post_map_processing" 
@@ -459,6 +465,7 @@ def evaluateMapState(self, function_input, key, metadata, sapi):
459465        counter_name_value_metadata ["__state_action" ] =  "post_map_processing" 
460466        counter_name_value_metadata ["state_counter" ] =  metadata ["state_counter" ]
461467        self ._logger .debug ("[StateUtils] evaluateMapState, metadata[state_counter]: "  +  str (metadata ["state_counter" ]))
468+         self ._logger .info ("[StateUtils] evaluateMapState, metadata[state_counter]: "  +  str (metadata ["state_counter" ]))
462469        self .mapStateCounter  =  int (metadata ["state_counter" ])
463470
464471        counter_name_value  =  {"__mfnmetadata" : counter_name_value_metadata , "__mfnuserdata" : '{}' }
@@ -506,6 +513,8 @@ def evaluateMapState(self, function_input, key, metadata, sapi):
506513        assert  py3utils .is_string (workflow_instance_metadata_storage_key )
507514        self ._logger .debug ("[StateUtils] full_metadata_encoded put key: "  +  str (workflow_instance_metadata_storage_key ))
508515
516+         self ._logger .info ("[StateUtils] full_metadata_encoded put key: "  +  str (workflow_instance_metadata_storage_key ))
517+ 
509518        sapi .put (workflow_instance_metadata_storage_key , json .dumps (metadata ))
510519
511520        # Now provide each branch with its own input 
@@ -523,9 +532,14 @@ def evaluateMapState(self, function_input, key, metadata, sapi):
523532            self ._logger .debug ("\t  Map State StartAt:"  +  startat )
524533            self ._logger .debug ("\t  Map State input:"  +  str (function_input [i ]))
525534
535+             self ._logger .info ("\t  Map State StartAt:"  +  startat )
536+             self ._logger .info ("\t  Map State input:"  +  str (function_input [i ]))
537+ 
526538        return  function_input , metadata 
527539
528540    def  evaluatePostMap (self , function_input , key , metadata , sapi ):
541+         self ._logger .info ("\t  inside evaluatePostMap: "  +  str (function_input )+  " "  +  str (metadata ) +  " "  +  str (sapi ))
542+ 
529543
530544        name_prefix  =  self .functiontopic  +  "_"  +  key 
531545
@@ -542,11 +556,15 @@ def evaluatePostMap(self, function_input, key, metadata, sapi):
542556
543557        self ._logger .debug ("\t  metadata:"  +  json .dumps (metadata ))
544558
559+         self ._logger .info ("\t  metadata:"  +  json .dumps (metadata ))
560+ 
545561        workflow_instance_metadata_storage_key  =  str (function_input ["WorkflowInstanceMetadataStorageKey" ])
546562        assert  py3utils .is_string (workflow_instance_metadata_storage_key )
547563        full_metadata_encoded  =  sapi .get (workflow_instance_metadata_storage_key )
548564        self ._logger .debug ("[StateUtils] full_metadata_encoded get: "  +  str (full_metadata_encoded ))
549565
566+         self ._logger .info ("[StateUtils] full_metadata_encoded get: "  +  str (full_metadata_encoded ))
567+ 
550568        full_metadata  =  json .loads (full_metadata_encoded )
551569        full_metadata ["state_counter" ] =  state_counter 
552570
@@ -557,6 +575,8 @@ def evaluatePostMap(self, function_input, key, metadata, sapi):
557575        branchOutputKeysSet  =  sapi .retrieveSet (branchOutputKeysSetKey )
558576        self ._logger .debug ("\t  branchOutputKeysSet: "  +  str (branchOutputKeysSet ))
559577
578+         self ._logger .info ("\t  branchOutputKeysSet: "  +  str (branchOutputKeysSet ))
579+ 
560580        if  not  branchOutputKeysSet :
561581            self ._logger .error ("[StateUtils] branchOutputKeysSet is empty" )
562582            raise  Exception ("[StateUtils] branchOutputKeysSet is empty" )
@@ -576,13 +596,17 @@ def evaluatePostMap(self, function_input, key, metadata, sapi):
576596        NumBranchesFinished  =  abs (counterValue )
577597        self ._logger .debug ("\t  NumBranchesFinished:"  +  str (NumBranchesFinished ))
578598
599+         self ._logger .info ("\t  NumBranchesFinished:"  +  str (NumBranchesFinished ))
600+ 
579601        do_cleanup  =  False 
580602
581603        if  klist [- 1 ] ==  NumBranchesFinished :
582604            do_cleanup  =  True 
583605
584606        self ._logger .debug ("\t  do_cleanup:"  +  str (do_cleanup ))
585607
608+         self ._logger .info ("\t  do_cleanup:"  +  str (do_cleanup ))
609+ 
586610        counterName  =  str (mapInfo ["CounterName" ])
587611        counter_metadata_key_name  =  counterName  +  "_metadata" 
588612        assert  py3utils .is_string (counterName )
@@ -610,6 +634,10 @@ def evaluatePostMap(self, function_input, key, metadata, sapi):
610634
611635        self ._logger .debug ("\t  mapInfo_BranchOutputKeys length: "  +  str (len (mapInfo ["BranchOutputKeys" ])))
612636
637+         self ._logger .info ("\t  mapInfo_BranchOutputKeys:"  +  str (mapInfo ["BranchOutputKeys" ]))
638+ 
639+         self ._logger .info ("\t  mapInfo_BranchOutputKeys length: "  +  str (len (mapInfo ["BranchOutputKeys" ])))
640+ 
613641        for  outputkey  in  mapInfo ["BranchOutputKeys" ]:
614642            outputkey  =  str (outputkey )
615643            if  outputkey  in  branchOutputKeysSet : # mapInfo["BranchOutputKeys"]: 
@@ -623,15 +651,23 @@ def evaluatePostMap(self, function_input, key, metadata, sapi):
623651                self ._logger .debug ("\t  branchOutput:"  +  branchOutput )
624652                self ._logger .debug ("\t  branchOutput_decoded(type):"  +  str (type (branchOutput_decoded )))
625653                self ._logger .debug ("\t  branchOutput_decoded:"  +  str (branchOutput_decoded ))
654+                 self ._logger .info ("\t  branchOutput(type):"  +  str (type (branchOutput )))
655+                 self ._logger .info ("\t  branchOutput:"  +  branchOutput )
656+                 self ._logger .info ("\t  branchOutput_decoded(type):"  +  str (type (branchOutput_decoded )))
657+                 self ._logger .info ("\t  branchOutput_decoded:"  +  str (branchOutput_decoded ))
626658                post_map_output_values  =  post_map_output_values  +  [branchOutput_decoded ]
627659                if  do_cleanup :
628660                    sapi .delete (outputkey ) # cleanup the key from data layer 
629661                    self ._logger .debug ("\t  cleaned output key:"  +  outputkey )
662+                     self ._logger .info ("\t  cleaned output key:"  +  outputkey )
630663            else :
631664                post_map_output_values  =  post_map_output_values  +  [None ]
632665                self ._logger .debug ("\t  this_BranchOutputKeys is not contained: "  +  str (outputkey ))
633666
667+                 self ._logger .info ("\t  this_BranchOutputKeys is not contained: "  +  str (outputkey ))
668+ 
634669        self ._logger .debug ("\t  post_map_output_values:"  +  str (post_map_output_values ))
670+         self ._logger .info ("\t  post_map_output_values:"  +  str (post_map_output_values ))
635671        while  (sapi .get (name_prefix  +  "_"  +  "mapStatePartialResult" )) ==  "" :
636672            time .sleep (0.1 ) # wait until value is available 
637673
@@ -640,15 +676,25 @@ def evaluatePostMap(self, function_input, key, metadata, sapi):
640676        mapStatePartialResult  +=  post_map_output_values 
641677        sapi .put (name_prefix  +  "_"  +  "mapStatePartialResult" , str (mapStatePartialResult ))
642678
679+         time .sleep (5.0 )
680+  
643681        # now apply ResultPath and OutputPath 
644682        if  do_cleanup :
645683
646684            sapi .deleteSet (branchOutputKeysSetKey )
647685
686+         while  (sapi .get (name_prefix  +  "_"  +  "mapInputCount" ) ==  "" ):
687+             time .sleep (0.1 ) # wait until value is available 
688+ 
648689        if  ast .literal_eval (sapi .get (name_prefix  +  "_"  +  "mapInputCount" )) ==  len (mapStatePartialResult ):
690+             #time.sleep(0.5) 
649691
650692            # we are ready to publish  but need to honour ResultPath and OutputPath 
693+             while  (sapi .get (name_prefix  +  "_"  + "mapStatePartialResult" ) ==  "" ):
694+                 time .sleep (0.1 )
651695            res_raw  =  ast .literal_eval (sapi .get (name_prefix  +  "_"  + "mapStatePartialResult" ))
696+             self ._logger .info ("[StateUtils] evaluatePostMap: res_raw"  +  str (res_raw ) +  " vs. "  +  sapi .get (name_prefix  +  "_"  +  "mapInputCount" ))
697+  
652698
653699            # remove unwanted keys from input before publishing 
654700            function_input  =  {}
@@ -668,6 +714,9 @@ def evaluatePostMap(self, function_input, key, metadata, sapi):
668714            sapi .delete (name_prefix  +  "_"  +  "mapStatePartialResult" )
669715            sapi .delete (name_prefix  +  "_"  +  "tobeProcessedlater" )
670716            post_map_output_values  =  function_input_post_output 
717+         else :
718+             #raise Exception("mapInputCount" + str(sapi.get(name_prefix + "_" + "mapInputCount")) + " does not match mapStatePartialResult: " + str(mapStatePartialResult)) 
719+             print ("mapInputCount"  +  str (sapi .get (name_prefix  +  "_"  +  "mapInputCount" )) +  " does not match mapStatePartialResult: "  +  str (mapStatePartialResult ))
671720        return  post_map_output_values , full_metadata 
672721
673722    def  evaluateParallelState (self , function_input , key , metadata , sapi ):
@@ -964,7 +1013,7 @@ def evaluatePostParallel(self, function_input, key, metadata, sapi):
9641013
9651014    def  evaluateNonTaskState (self , function_input , key , metadata , sapi ):
9661015        # 3. Evaluate Non Task states 
967-         # self._logger.debug ("[StateUtils] NonTask state type: " + str(self.functionstatetype))
1016+         self ._logger .info ("[StateUtils] NonTask state type: "  +  str (self .functionstatetype ))
9681017        #self._logger.debug("[StateUtils] Welcome to evaluateNonTaskState! Current key:" + str(key)) 
9691018        function_output  =  None 
9701019        if  self .functionstatetype  ==  StateUtils .choiceStateType :
@@ -1090,6 +1139,9 @@ def evaluateNonTaskState(self, function_input, key, metadata, sapi):
10901139            self ._logger .debug ("[StateUtils] Map state maxConcurrency: "  +  str (maxConcurrency ))
10911140            self ._logger .debug ("[StateUtils] Map state handling" )
10921141
1142+             self ._logger .info ("[StateUtils] Map state maxConcurrency: "  +  str (maxConcurrency ))
1143+             self ._logger .info ("[StateUtils] Map state handling metadata: "  +  str (metadata ) )
1144+ 
10931145            if  "__state_action"  not  in metadata  or  metadata ["__state_action" ] !=  "post_map_processing" :
10941146                # here we start the iteration process on a first batch 
10951147                if  maxConcurrency  !=  0 :
@@ -1099,26 +1151,41 @@ def evaluateNonTaskState(self, function_input, key, metadata, sapi):
10991151                    tobeProcessednow  =  function_input 
11001152                    tobeProcessedlater  =  []
11011153                self ._logger .debug ("[StateUtils] Map state function_input split:"  +  str (tobeProcessednow ) +  " "  +  str (tobeProcessedlater ))
1154+                 self ._logger .info ("[StateUtils] Map state function_input split:"  +  str (tobeProcessednow ) +  " "  +  str (tobeProcessedlater ))
11021155                sapi .put (name_prefix  +  "_"  +  "tobeProcessedlater" , str (tobeProcessedlater )) # store elements to be processed on DL 
11031156                sapi .put (name_prefix  +  "_"  +  "mapStatePartialResult" , "[]" ) # initialise the collector variable 
11041157                sapi .put (name_prefix  +  "_"  +  "mapInputCount" , str (len (function_input )))
1158+                 #metadata["__state_action"] = "" 
11051159
11061160                function_output , metadata  =  self .evaluateMapState (tobeProcessednow , key , metadata , sapi )
1161+                 #metadata["__state_action"] = "" 
1162+ 
11071163
11081164            elif  metadata ["__state_action" ] ==  "post_map_processing" :
11091165                tobeProcessedlater  =  ast .literal_eval (sapi .get (name_prefix  +  "_"  +  "tobeProcessedlater" )) # get all elements that have not yet been processed 
11101166                self ._logger .debug ("[StateUtils] Map state post_map processing input:"  +  str (tobeProcessedlater ))
1167+                 self ._logger .info ("[StateUtils] Map state post_map processing input:"  +  str (tobeProcessedlater ))
11111168                # we need to decide at this point if there is a need for more batches. if so: 
11121169
11131170                if  len (tobeProcessedlater ) >  0 : # we need to start another batch 
1171+                     self ._logger .info ("[StateUtils] tobeProcessedlater: "  +  str (tobeProcessedlater )+  ", function_input: "  + str (function_input ))
11141172                    function_output , metadata2  =  self .evaluatePostMap (function_input , key , metadata , sapi ) # take care not to overwrite metadata 
1173+                     self ._logger .info ("[StateUtils] after evaluatPostMap: "  +  str (function_output ))
11151174                    function_output , metadata  =  self .evaluateMapState (tobeProcessedlater [:maxConcurrency ], key , metadata , sapi ) # start a new batch 
1175+                     self ._logger .info ("[StateUtils] after evaluateMapState:"  +  str (function_output ))
1176+                     self ._logger .info ("[StateUtils] after evaluateMapState, metadata: "  +  str (metadata ))
11161177                    sapi .put (name_prefix  +  "_"  +  "tobeProcessedlater" , str (tobeProcessedlater [maxConcurrency :])) # store remaining elements to be processed on DL 
1178+                     self ._logger .info ("[StateUtils] after sapi.put: "  +  str (tobeProcessedlater [maxConcurrency :]))
1179+  
11171180
11181181                else :# no more batches required. we are at the iteration end, publish the final result 
11191182                    self ._logger .debug ("[StateUtils] Map state input final stage: "  +  str (function_input ))
1183+                     self ._logger .info ("[StateUtils] Map state input final stage: "  +  str (function_input ))
11201184                    function_output , metadata  =  self .evaluatePostMap (function_input , key , metadata , sapi )
11211185
1186+             elif  metadata ["__state_action" ] ==  '' :
1187+                 raise  Exception ("Unkown state action in map state" )
1188+ 
11221189            else :
11231190                raise  Exception ("Unknow action type in map state" )
11241191
0 commit comments