From eb07c81173e0bb281d219625bcdb1e05683ea820 Mon Sep 17 00:00:00 2001 From: AlexCXC <1223408988@qq.com> Date: Mon, 18 Dec 2023 18:18:12 +0800 Subject: [PATCH 1/3] check workflow schedule to_users before do schedule --- dtable_events/workflow/workflow_schedules_scanner.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/dtable_events/workflow/workflow_schedules_scanner.py b/dtable_events/workflow/workflow_schedules_scanner.py index bfd96714..ac2952b0 100644 --- a/dtable_events/workflow/workflow_schedules_scanner.py +++ b/dtable_events/workflow/workflow_schedules_scanner.py @@ -41,13 +41,21 @@ def start(self): WorkflowSchedulesScannerTimer(self._db_session_class).start() -def do_notify_schedule(schedule_id, task_id, action): +def do_notify_schedule(schedule_id, task_id, action, db_session): try: offset = action['offset'] token = action['token'] to_users = action['to_users'] if not to_users or not isinstance(to_users, list): return + sql = "SELECT user FROM profile_profile WHERE user IN :users" + try: + valid_users = [result.user for result in db_session.execute(sql, {'users': to_users})] + except Exception as e: + logging.error('query valid users from db users: %s ', to_users) + return + if not valid_users: + return detail = { 'task_id': task_id, 'token': token, @@ -77,7 +85,7 @@ def scan_workflow_schedules(db_session): logging.error('schedule: %s action: %s invalid', schedule_id, action) continue if action.get('type') == 'notify': - do_notify_schedule(schedule_id, task_id, action) + do_notify_schedule(schedule_id, task_id, action, db_session) try: db_session.execute('UPDATE dtable_workflow_task_schedules SET is_executed=1 WHERE id=:schedule_id', { 'schedule_id': schedule_id From 0ebd7b51cf8c0e0cd44577cfa13259be4d3b9b77 Mon Sep 17 00:00:00 2001 From: AlexCXC <1223408988@qq.com> Date: Wed, 20 Dec 2023 11:06:33 +0800 Subject: [PATCH 2/3] opt worklfow notify --- dtable_events/workflow/workflow_schedules_scanner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dtable_events/workflow/workflow_schedules_scanner.py b/dtable_events/workflow/workflow_schedules_scanner.py index ac2952b0..81f46da3 100644 --- a/dtable_events/workflow/workflow_schedules_scanner.py +++ b/dtable_events/workflow/workflow_schedules_scanner.py @@ -62,7 +62,7 @@ def do_notify_schedule(schedule_id, task_id, action, db_session): 'offset': offset } dtable_web_api = DTableWebAPI(DTABLE_WEB_SERVICE_URL) - dtable_web_api.internal_add_notification(to_users, 'workflow_processing_expired', detail) + dtable_web_api.internal_add_notification(valid_users, 'workflow_processing_expired', detail) except Exception as e: logging.exception(e) logging.error('schedule_id: %s task_id: %s action: %s send notifications error: %s', schedule_id, task_id, action, e) From c7d469be0ae4694ec0e9d18dbd861b0a7aef465f Mon Sep 17 00:00:00 2001 From: AlexCXC <1223408988@qq.com> Date: Wed, 20 Dec 2023 14:13:23 +0800 Subject: [PATCH 3/3] opt internal submit workflow --- dtable_events/automations/actions.py | 14 +------------- dtable_events/utils/dtable_web_api.py | 20 +++++++++++++++++++- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/dtable_events/automations/actions.py b/dtable_events/automations/actions.py index e42c777b..ca6125c9 100644 --- a/dtable_events/automations/actions.py +++ b/dtable_events/automations/actions.py @@ -2296,20 +2296,8 @@ def do_action(self): logger.error('rule: %s submit workflow: %s append row dtable: %s, error: %s', self.auto_rule.rule_id, self.token, self.auto_rule.dtable_uuid, e) return - internal_submit_workflow_url = DTABLE_WEB_SERVICE_URL.strip('/') + '/api/v2.1/workflows/%s/internal-task-submit/' % self.token - data = { - 'row_id': row_id, - 'replace': 'true', - 'submit_from': 'Automation Rule', - 'automation_rule_id': self.auto_rule.rule_id - } - logger.debug('trigger workflow data: %s', data) try: - header_token = 'Token ' + jwt.encode({'token': self.token}, DTABLE_PRIVATE_KEY, 'HS256') - resp = requests.post(internal_submit_workflow_url, data=data, headers={'Authorization': header_token}) - if resp.status_code != 200: - logger.error('rule: %s row_id: %s new workflow: %s task error status code: %s content: %s', self.auto_rule.rule_id, row_id, self.token, resp.status_code, resp.content) - self.auto_rule.set_done_actions() + self.auto_rule.dtable_web_api.internal_submit_row_workflow(self.token, row_id, 'Automation Rule', automation_rule_id=self.auto_rule.rule_id) except Exception as e: logger.error('submit workflow: %s row_id: %s error: %s', self.token, row_id, e) diff --git a/dtable_events/utils/dtable_web_api.py b/dtable_events/utils/dtable_web_api.py index e908cf6b..66138ea0 100644 --- a/dtable_events/utils/dtable_web_api.py +++ b/dtable_events/utils/dtable_web_api.py @@ -122,7 +122,7 @@ def internal_add_notification(self, to_users, msg_type, detail): url = '%(server_url)s/api/v2.1/internal-notifications/?from=dtable_events' % { 'server_url': self.dtable_web_service_url } - token = jwt.encode({}, DTABLE_PRIVATE_KEY, algorithm='HS256') + token = jwt.encode({'is_internal': True}, DTABLE_PRIVATE_KEY, algorithm='HS256') headers = {'Authorization': 'Token ' + token} resp = requests.post(url, json={ 'detail': detail, @@ -130,3 +130,21 @@ def internal_add_notification(self, to_users, msg_type, detail): 'type': msg_type }, headers=headers) return parse_response(resp) + + def internal_submit_row_workflow(self, workflow_token, row_id, submit_from, **kwargs): + url = '%(server_url)s/api/v2.1/workflows/%(workflow_token)s/internal-task-submit/?from=dtable_events' % { + 'server_url': self.dtable_web_service_url, + 'workflow_token': workflow_token + } + data = { + 'row_id': row_id, + 'replace': 'true', + 'submit_from': submit_from + } + if kwargs.get('automation_rule_id'): + data['automation_rule_id'] = kwargs['automation_rule_id'] + logger.debug('trigger workflow data: %s', data) + token = jwt.encode({'token': workflow_token}, DTABLE_PRIVATE_KEY, algorithm='HS256') + headers = {'Authorization': 'Token ' + token} + resp = requests.post(url, data=data, headers=headers) + return parse_response(resp)