@@ -551,6 +551,7 @@ def forwardState(self, REQUEST=None):
551
551
""".."""
552
552
result = {}
553
553
engine = getattr (self , ENGINE_ID )
554
+ rmq = getattr (engine , 'env_fwd_rmq' , False )
554
555
if getattr (self , 'wf_status' , None ) == 'forward' :
555
556
wks = self .getListOfWorkitems ()
556
557
wk = wks [- 1 ]
@@ -569,6 +570,12 @@ def forwardState(self, REQUEST=None):
569
570
else :
570
571
if self .wf_status == 'forward' :
571
572
wk .triggerApplication (wk .id , REQUEST )
573
+ if rmq :
574
+ activity = self .getActivity (wk .id )
575
+ queue_msg (
576
+ "{}|{}" .format (self .absolute_url (),
577
+ self .get_freq (wk .id )),
578
+ queue = self .get_rmq_queue (activity .getId ()))
572
579
result ['triggered' ] = wk .activity_id
573
580
574
581
return engine .jsonify (result )
@@ -636,6 +643,23 @@ def getDestinations(self, workitem_id, path=None):
636
643
'process_to_id' : process .id })
637
644
return destinations
638
645
646
+ def get_rmq_queue (self , act_id ):
647
+ queue = 'fwd_envelopes'
648
+ # Uncomment to allow for separate queues based on Activity
649
+ # if act_id in ['AutomaticQA', 'FMEConversionApplication']:
650
+ # queue = 'poll_envelopes'
651
+ return queue
652
+
653
+ def get_freq (self , workitem_id ):
654
+ """Return the frequency at which the application should be exec
655
+ """
656
+ freq = 1
657
+ application_url = self .getApplicationUrl (workitem_id )
658
+ application = self .unrestrictedTraverse (application_url , None )
659
+ if application :
660
+ freq = int (getattr (application , 'retryFrequency' , freq ))
661
+ return freq
662
+
639
663
def handleWorkitem (self , workitem_id , REQUEST = None ):
640
664
# If it's a previously failed application, retry it, otherwise forward
641
665
# it
@@ -653,9 +677,12 @@ def handleWorkitem(self, workitem_id, REQUEST=None):
653
677
if activity .isAutoStart ():
654
678
self .wf_status = 'forward'
655
679
self .reindex_object ()
656
- if rmq :
657
- queue_msg (self .absolute_url (), queue = 'fwd_envelopes' )
658
680
self .startAutomaticApplication (workitem_id )
681
+ if rmq :
682
+ queue_msg (
683
+ "{}|{}" .format (self .absolute_url (),
684
+ self .get_freq (workitem_id )),
685
+ queue = self .get_rmq_queue (activity .getId ()))
659
686
else :
660
687
self .wf_status = 'manual'
661
688
self .reindex_object ()
@@ -920,9 +947,12 @@ def manageWorkitemCreation(self, workitem_id):
920
947
if activity .isAutoStart ():
921
948
self .wf_status = 'forward'
922
949
self .reindex_object ()
923
- if rmq :
924
- queue_msg (self .absolute_url (), queue = 'fwd_envelopes' )
925
950
self .startAutomaticApplication (workitem_id )
951
+ if rmq :
952
+ queue_msg (
953
+ "{}|{}" .format (self .absolute_url (),
954
+ self .get_freq (workitem_id )),
955
+ queue = self .get_rmq_queue (activity .getId ()))
926
956
else :
927
957
self .wf_status = 'manual'
928
958
self .reindex_object ()
0 commit comments