From eea5e55c16390ed2cd65eb852ce27fe6be5b08f4 Mon Sep 17 00:00:00 2001 From: bender Date: Wed, 8 Jan 2025 14:38:45 -0500 Subject: [PATCH 01/10] wip --- synapse/cortex.py | 17 ++++++++++ synapse/lib/agenda.py | 46 ++++++++++++++++++++++++-- synapse/lib/cell.py | 5 +++ synapse/tests/test_lib_agenda.py | 56 +++++++++++++++++++++++++++++++- 4 files changed, 121 insertions(+), 3 deletions(-) diff --git a/synapse/cortex.py b/synapse/cortex.py index 603da3dee6d..ef6d17aa5ab 100644 --- a/synapse/cortex.py +++ b/synapse/cortex.py @@ -741,6 +741,19 @@ async def watchAllUserNotifs(self, offs=None): async def getHttpExtApiByPath(self, path): return await self.cell.getHttpExtApiByPath(path) + async def cancelRunningSchedulerJobs(self): + return await self.cell.agenda.cancelAll() + + async def pauseScheduler(self): + await self._reqUserAllowed(('agenda', 'control')) + if self.cell.agenda is not None: + await self.cell.agenda.pause() + + async def resumeScheduler(self): + await self._reqUserAllowed(('agenda', 'control')) + if self.cell.agenda is not None: + await self.cell.agenda.resume() + class Cortex(s_oauth.OAuthMixin, s_cell.Cell): # type: ignore ''' A Cortex implements the synapse hypergraph. @@ -1493,6 +1506,10 @@ def _initCorePerms(self): 'desc': 'Controls the ability to check if the Axon contains a file.'}, {'perm': ('axon', 'del'), 'gate': 'cortex', 'desc': 'Controls the ability to remove a file from the Axon.'}, + + {'perm': ('agenda', 'control'), 'gate': 'cortex', + 'desc': 'Controls the scheduler state'}, + )) for pdef in self._cortex_permdefs: s_storm.reqValidPermDef(pdef) diff --git a/synapse/lib/agenda.py b/synapse/lib/agenda.py index f2a10f1e7c2..0a52eae8a87 100644 --- a/synapse/lib/agenda.py +++ b/synapse/lib/agenda.py @@ -445,6 +445,7 @@ async def __anit__(self, core): self.appts = {} # Dict[bytes: Appt] self._next_indx = 0 # index a new appt gets assigned self.tickoff = 0 # Used for test overrides + self._paused = False # Used to track paused state self._wake_event = s_coro.Event() # Causes the scheduler loop to wake up self.onfini(self._wake_event.set) @@ -707,7 +708,20 @@ async def clearRunningStatus(self): for appt in list(self.appts.values()): if appt.isrunning: logger.debug(f'Clearing the isrunning flag for {appt.iden}') - await self.core.addCronEdits(appt.iden, {'isrunning': False}) + edits = { + 'isrunning': False, + 'lastfinishtime': self._getNowTick(), + 'lasterrs': ['aborted'] + appt.lasterrs[-4:] + } + await self.core.addCronEdits(appt.iden, edits) + + async def pause(self): + self._paused = True + self._wake_event.set() + + async def resume(self): + self._paused = False + self._wake_event.set() async def runloop(self): ''' @@ -716,7 +730,7 @@ async def runloop(self): while not self.isfini: timeout = None - if self.apptheap: + if self.apptheap and not self._paused: timeout = self.apptheap[0].nexttime - self._getNowTick() if timeout is None or timeout > 0: @@ -726,6 +740,9 @@ async def runloop(self): if self.isfini: return + if self._paused: + continue + now = self._getNowTick() while self.apptheap and self.apptheap[0].nexttime <= now: @@ -899,3 +916,28 @@ async def _runJob(self, user, appt): if not self.isfini: # fire beholder event before invoking nexus change (in case readonly) await self.core.feedBeholder('cron:stop', {'iden': appt.iden}) + + async def cancelAll(self): + now = self._getNowTick() + + for appt in self.appts.values(): + logger.warning(f'Cancelling agenda task {appt.iden}') + logger.warning(f'Task state: isrunning={appt.isrunning}, task={appt.task}') + + if appt.isrunning: + logger.warning(f'Found running task for {appt.iden}') + + await appt.task.kill() + + if self.core.isactive: + edits = { + 'isrunning': False, + 'errcount': appt.errcount, + 'lasterrs': appt.lasterrs, + 'lastfinishtime': now, + 'lastresult': 'cancelled', + } + await self.core.addCronEdits(appt.iden, edits) + + self.apptheap = [] + self._wake_event.set() diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py index adcd1dc5b76..56b6ddbcc23 100644 --- a/synapse/lib/cell.py +++ b/synapse/lib/cell.py @@ -2150,6 +2150,11 @@ async def promote(self, graceful=False): logger.debug(f'PROMOTION: Connecting to {mirurl} to request leadership handoff{_dispname}.') async with await s_telepath.openurl(mirurl) as lead: + + if hasattr(lead, 'agenda'): + await lead.cancelRunningSchedulerJobs() + await lead.pauseScheduler() + await lead.handoff(myurl) logger.warning(f'PROMOTION: Completed leadership handoff to {myurl}{_dispname}') return diff --git a/synapse/tests/test_lib_agenda.py b/synapse/tests/test_lib_agenda.py index 285b793610e..eeadb6b7c4b 100644 --- a/synapse/tests/test_lib_agenda.py +++ b/synapse/tests/test_lib_agenda.py @@ -441,7 +441,7 @@ def looptime(): await self.asyncraises(s_exc.DupIden, core.addCronJob(cdef)) await core.delCronJob(viewiden) - self.nn(core.getAuthGate(viewiden)) + self.nn(await core.getAuthGate(viewiden)) async def test_agenda_persistence(self): ''' Test we can make/change/delete appointments and they are persisted to storage ''' @@ -1097,3 +1097,57 @@ async def task(): self.eq(cdef01.get('lastresult'), 'cancelled') self.gt(cdef00['laststarttime'], 0) self.eq(cdef00['laststarttime'], cdef01['laststarttime']) + + async def test_agenda_graceful_promotion_with_running_cron(self): + + async with self.getTestAha() as aha: + + conf00 = { + 'aha:provision': await aha.addAhaSvcProv('00.cortex') + } + + async with self.getTestCore(conf=conf00) as core00: + self.false(core00.conf.get('mirror')) + + q = ''' + while((true)) { + $lib.log.error('I AM A ERROR LOG MESSAGE') + $lib.time.sleep(6) + } + ''' + msgs = await core00.stormlist('cron.at --now $q', opts={'vars': {'q': q}}) + self.stormHasNoWarnErr(msgs) + + crons00 = await core00.callStorm('return($lib.cron.list())') + self.len(1, crons00) + + prov01 = {'mirror': '00.cortex'} + conf01 = { + 'aha:provision': await aha.addAhaSvcProv('01.cortex', provinfo=prov01), + } + + async with self.getTestCore(conf=conf01) as core01: + + with self.getAsyncLoggerStream('synapse.storm.log', 'I AM A ERROR LOG MESSAGE') as stream: + self.true(await stream.wait(timeout=6)) + + cron = await core00.callStorm('return($lib.cron.list())') + self.len(1, cron) + self.true(cron[0].get('isrunning')) + + await core01.promote(graceful=True) + + self.false(core00.isactive) + self.true(core01.isactive) + + await core00.sync() + + cron00 = await core00.callStorm('return($lib.cron.list())') + self.len(1, cron00) + self.false(cron00[0].get('isrunning')) + self.eq(cron00[0].get('lasterrs')[0], 'cancelled') + + cron01 = await core01.callStorm('return($lib.cron.list())') + self.len(1, cron01) + self.false(cron01[0].get('isrunning')) + self.eq(cron01[0].get('lasterrs')[0], 'cancelled') From aa65d9230bbb490a0f201260d5c456cefccaa787 Mon Sep 17 00:00:00 2001 From: bender Date: Wed, 8 Jan 2025 15:28:38 -0500 Subject: [PATCH 02/10] updates --- synapse/lib/agenda.py | 1 + synapse/lib/cell.py | 2 +- synapse/tests/test_lib_agenda.py | 47 ++++++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 1 deletion(-) diff --git a/synapse/lib/agenda.py b/synapse/lib/agenda.py index 0a52eae8a87..2ba2e51d490 100644 --- a/synapse/lib/agenda.py +++ b/synapse/lib/agenda.py @@ -928,6 +928,7 @@ async def cancelAll(self): logger.warning(f'Found running task for {appt.iden}') await appt.task.kill() + await self.core.feedBeholder('cron:stop', {'iden': appt.iden}) if self.core.isactive: edits = { diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py index 56b6ddbcc23..3b444e76ac4 100644 --- a/synapse/lib/cell.py +++ b/synapse/lib/cell.py @@ -2151,7 +2151,7 @@ async def promote(self, graceful=False): logger.debug(f'PROMOTION: Connecting to {mirurl} to request leadership handoff{_dispname}.') async with await s_telepath.openurl(mirurl) as lead: - if hasattr(lead, 'agenda'): + if 'storm' in lead.methinfo: await lead.cancelRunningSchedulerJobs() await lead.pauseScheduler() diff --git a/synapse/tests/test_lib_agenda.py b/synapse/tests/test_lib_agenda.py index eeadb6b7c4b..afb1941a056 100644 --- a/synapse/tests/test_lib_agenda.py +++ b/synapse/tests/test_lib_agenda.py @@ -1151,3 +1151,50 @@ async def test_agenda_graceful_promotion_with_running_cron(self): self.len(1, cron01) self.false(cron01[0].get('isrunning')) self.eq(cron01[0].get('lasterrs')[0], 'cancelled') + + async def test_agenda_force_promotion_with_running_cron(self): + + async with self.getTestAha() as aha: + + conf00 = { + 'aha:provision': await aha.addAhaSvcProv('00.cortex') + } + + async with self.getTestCore(conf=conf00) as core00: + self.false(core00.conf.get('mirror')) + + q = ''' + while((true)) { + $lib.log.error('I AM A ERROR LOG MESSAGE') + $lib.time.sleep(6) + } + ''' + msgs = await core00.stormlist('cron.at --now $q', opts={'vars': {'q': q}}) + self.stormHasNoWarnErr(msgs) + + crons00 = await core00.callStorm('return($lib.cron.list())') + self.len(1, crons00) + + prov01 = {'mirror': '00.cortex'} + conf01 = { + 'aha:provision': await aha.addAhaSvcProv('01.cortex', provinfo=prov01), + } + + async with self.getTestCore(conf=conf01) as core01: + + with self.getAsyncLoggerStream('synapse.storm.log', 'I AM A ERROR LOG MESSAGE') as stream: + self.true(await stream.wait(timeout=6)) + + cron = await core00.callStorm('return($lib.cron.list())') + self.len(1, cron) + self.true(cron[0].get('isrunning')) + + await core01.promote(graceful=False) + + self.true(core00.isactive) + self.true(core01.isactive) + + cron01 = await core01.callStorm('return($lib.cron.list())') + self.len(1, cron01) + self.false(cron01[0].get('isrunning')) + self.eq(cron01[0].get('lasterrs')[0], 'aborted') From a138c209ed86fd1466ba16d3f19742c1a8c38a38 Mon Sep 17 00:00:00 2001 From: bender Date: Wed, 8 Jan 2025 16:00:40 -0500 Subject: [PATCH 03/10] updates --- synapse/tests/test_lib_agenda.py | 33 ++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/synapse/tests/test_lib_agenda.py b/synapse/tests/test_lib_agenda.py index afb1941a056..bfdd7c97234 100644 --- a/synapse/tests/test_lib_agenda.py +++ b/synapse/tests/test_lib_agenda.py @@ -1198,3 +1198,36 @@ async def test_agenda_force_promotion_with_running_cron(self): self.len(1, cron01) self.false(cron01[0].get('isrunning')) self.eq(cron01[0].get('lasterrs')[0], 'aborted') + + async def test_agenda_pause_resume(self): + + async with self.getTestCore() as core: + + q = '$lib.log.info("cron job executed")' + cdef = { + 'creator': core.auth.rootuser.iden, + 'iden': s_common.guid(), + 'storm': q, + 'reqs': {}, + 'incunit': 'minute', + 'incvals': 1 + } + await core.addCronJob(cdef) + + with self.getAsyncLoggerStream('synapse.storm.log', 'cron job executed') as stream: + core.agenda._addTickOff(60) + self.true(await stream.wait(timeout=6)) + + await core.agenda.pause() + self.true(core.agenda._paused) + + with self.getAsyncLoggerStream('synapse.storm.log', 'cron job executed') as stream: + core.agenda._addTickOff(60) + self.false(await stream.wait(timeout=1)) + + await core.agenda.resume() + self.false(core.agenda._paused) + + with self.getAsyncLoggerStream('synapse.storm.log', 'cron job executed') as stream: + core.agenda._addTickOff(60) + self.true(await stream.wait(timeout=6)) From 20428f84314a698d5ead1052f0101880c6a147e6 Mon Sep 17 00:00:00 2001 From: bender Date: Thu, 9 Jan 2025 16:10:26 -0500 Subject: [PATCH 04/10] updates --- synapse/cortex.py | 17 -------------- synapse/lib/agenda.py | 40 +------------------------------- synapse/lib/cell.py | 5 ---- synapse/tests/test_lib_agenda.py | 37 ++--------------------------- 4 files changed, 3 insertions(+), 96 deletions(-) diff --git a/synapse/cortex.py b/synapse/cortex.py index ef6d17aa5ab..603da3dee6d 100644 --- a/synapse/cortex.py +++ b/synapse/cortex.py @@ -741,19 +741,6 @@ async def watchAllUserNotifs(self, offs=None): async def getHttpExtApiByPath(self, path): return await self.cell.getHttpExtApiByPath(path) - async def cancelRunningSchedulerJobs(self): - return await self.cell.agenda.cancelAll() - - async def pauseScheduler(self): - await self._reqUserAllowed(('agenda', 'control')) - if self.cell.agenda is not None: - await self.cell.agenda.pause() - - async def resumeScheduler(self): - await self._reqUserAllowed(('agenda', 'control')) - if self.cell.agenda is not None: - await self.cell.agenda.resume() - class Cortex(s_oauth.OAuthMixin, s_cell.Cell): # type: ignore ''' A Cortex implements the synapse hypergraph. @@ -1506,10 +1493,6 @@ def _initCorePerms(self): 'desc': 'Controls the ability to check if the Axon contains a file.'}, {'perm': ('axon', 'del'), 'gate': 'cortex', 'desc': 'Controls the ability to remove a file from the Axon.'}, - - {'perm': ('agenda', 'control'), 'gate': 'cortex', - 'desc': 'Controls the scheduler state'}, - )) for pdef in self._cortex_permdefs: s_storm.reqValidPermDef(pdef) diff --git a/synapse/lib/agenda.py b/synapse/lib/agenda.py index 2ba2e51d490..4ee7df5831f 100644 --- a/synapse/lib/agenda.py +++ b/synapse/lib/agenda.py @@ -445,7 +445,6 @@ async def __anit__(self, core): self.appts = {} # Dict[bytes: Appt] self._next_indx = 0 # index a new appt gets assigned self.tickoff = 0 # Used for test overrides - self._paused = False # Used to track paused state self._wake_event = s_coro.Event() # Causes the scheduler loop to wake up self.onfini(self._wake_event.set) @@ -715,14 +714,6 @@ async def clearRunningStatus(self): } await self.core.addCronEdits(appt.iden, edits) - async def pause(self): - self._paused = True - self._wake_event.set() - - async def resume(self): - self._paused = False - self._wake_event.set() - async def runloop(self): ''' Task loop to issue query tasks at the right times. @@ -730,7 +721,7 @@ async def runloop(self): while not self.isfini: timeout = None - if self.apptheap and not self._paused: + if self.apptheap: timeout = self.apptheap[0].nexttime - self._getNowTick() if timeout is None or timeout > 0: @@ -740,9 +731,6 @@ async def runloop(self): if self.isfini: return - if self._paused: - continue - now = self._getNowTick() while self.apptheap and self.apptheap[0].nexttime <= now: @@ -916,29 +904,3 @@ async def _runJob(self, user, appt): if not self.isfini: # fire beholder event before invoking nexus change (in case readonly) await self.core.feedBeholder('cron:stop', {'iden': appt.iden}) - - async def cancelAll(self): - now = self._getNowTick() - - for appt in self.appts.values(): - logger.warning(f'Cancelling agenda task {appt.iden}') - logger.warning(f'Task state: isrunning={appt.isrunning}, task={appt.task}') - - if appt.isrunning: - logger.warning(f'Found running task for {appt.iden}') - - await appt.task.kill() - await self.core.feedBeholder('cron:stop', {'iden': appt.iden}) - - if self.core.isactive: - edits = { - 'isrunning': False, - 'errcount': appt.errcount, - 'lasterrs': appt.lasterrs, - 'lastfinishtime': now, - 'lastresult': 'cancelled', - } - await self.core.addCronEdits(appt.iden, edits) - - self.apptheap = [] - self._wake_event.set() diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py index 3b444e76ac4..adcd1dc5b76 100644 --- a/synapse/lib/cell.py +++ b/synapse/lib/cell.py @@ -2150,11 +2150,6 @@ async def promote(self, graceful=False): logger.debug(f'PROMOTION: Connecting to {mirurl} to request leadership handoff{_dispname}.') async with await s_telepath.openurl(mirurl) as lead: - - if 'storm' in lead.methinfo: - await lead.cancelRunningSchedulerJobs() - await lead.pauseScheduler() - await lead.handoff(myurl) logger.warning(f'PROMOTION: Completed leadership handoff to {myurl}{_dispname}') return diff --git a/synapse/tests/test_lib_agenda.py b/synapse/tests/test_lib_agenda.py index bfdd7c97234..1cc809ba5ac 100644 --- a/synapse/tests/test_lib_agenda.py +++ b/synapse/tests/test_lib_agenda.py @@ -1145,12 +1145,12 @@ async def test_agenda_graceful_promotion_with_running_cron(self): cron00 = await core00.callStorm('return($lib.cron.list())') self.len(1, cron00) self.false(cron00[0].get('isrunning')) - self.eq(cron00[0].get('lasterrs')[0], 'cancelled') + self.eq(cron00[0].get('lasterrs')[0], 'aborted') cron01 = await core01.callStorm('return($lib.cron.list())') self.len(1, cron01) self.false(cron01[0].get('isrunning')) - self.eq(cron01[0].get('lasterrs')[0], 'cancelled') + self.eq(cron01[0].get('lasterrs')[0], 'aborted') async def test_agenda_force_promotion_with_running_cron(self): @@ -1198,36 +1198,3 @@ async def test_agenda_force_promotion_with_running_cron(self): self.len(1, cron01) self.false(cron01[0].get('isrunning')) self.eq(cron01[0].get('lasterrs')[0], 'aborted') - - async def test_agenda_pause_resume(self): - - async with self.getTestCore() as core: - - q = '$lib.log.info("cron job executed")' - cdef = { - 'creator': core.auth.rootuser.iden, - 'iden': s_common.guid(), - 'storm': q, - 'reqs': {}, - 'incunit': 'minute', - 'incvals': 1 - } - await core.addCronJob(cdef) - - with self.getAsyncLoggerStream('synapse.storm.log', 'cron job executed') as stream: - core.agenda._addTickOff(60) - self.true(await stream.wait(timeout=6)) - - await core.agenda.pause() - self.true(core.agenda._paused) - - with self.getAsyncLoggerStream('synapse.storm.log', 'cron job executed') as stream: - core.agenda._addTickOff(60) - self.false(await stream.wait(timeout=1)) - - await core.agenda.resume() - self.false(core.agenda._paused) - - with self.getAsyncLoggerStream('synapse.storm.log', 'cron job executed') as stream: - core.agenda._addTickOff(60) - self.true(await stream.wait(timeout=6)) From c7a0b136a22837ff15f2b9b1fc8edcb762ae83be Mon Sep 17 00:00:00 2001 From: bender Date: Thu, 9 Jan 2025 16:53:03 -0500 Subject: [PATCH 05/10] add changelog --- changes/ff623ded36292878c995dfcf1874daf4.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changes/ff623ded36292878c995dfcf1874daf4.yaml diff --git a/changes/ff623ded36292878c995dfcf1874daf4.yaml b/changes/ff623ded36292878c995dfcf1874daf4.yaml new file mode 100644 index 00000000000..936c01a0150 --- /dev/null +++ b/changes/ff623ded36292878c995dfcf1874daf4.yaml @@ -0,0 +1,5 @@ +--- +desc: Fixed a scheduler loop error during a mirror promotion. +prs: [] +type: bug +... From 5058ca78cab2c04fe4f160423a7705870efe70bd Mon Sep 17 00:00:00 2001 From: invisig0th Date: Fri, 10 Jan 2025 10:25:07 -0500 Subject: [PATCH 06/10] Update changes/ff623ded36292878c995dfcf1874daf4.yaml --- changes/ff623ded36292878c995dfcf1874daf4.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/ff623ded36292878c995dfcf1874daf4.yaml b/changes/ff623ded36292878c995dfcf1874daf4.yaml index 936c01a0150..8f3ecc2210a 100644 --- a/changes/ff623ded36292878c995dfcf1874daf4.yaml +++ b/changes/ff623ded36292878c995dfcf1874daf4.yaml @@ -1,5 +1,5 @@ --- -desc: Fixed a scheduler loop error during a mirror promotion. +desc: Fixed a Cortex cron scheduler loop error during a mirror promotion. prs: [] type: bug ... From 69275ad86c36f3df797a11b5fc35c6bbbbf44f65 Mon Sep 17 00:00:00 2001 From: bender Date: Mon, 13 Jan 2025 17:00:54 -0500 Subject: [PATCH 07/10] updates --- synapse/cortex.py | 1 - synapse/lib/agenda.py | 5 +++++ synapse/tests/test_lib_agenda.py | 3 --- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/synapse/cortex.py b/synapse/cortex.py index 603da3dee6d..ac7999bd836 100644 --- a/synapse/cortex.py +++ b/synapse/cortex.py @@ -1556,7 +1556,6 @@ async def initServiceRuntime(self): async def initServiceActive(self): await self.stormdmons.start() - await self.agenda.clearRunningStatus() async def _runMigrations(): # Run migrations when this cortex becomes active. This is to prevent diff --git a/synapse/lib/agenda.py b/synapse/lib/agenda.py index 4ee7df5831f..4c5cf3840c6 100644 --- a/synapse/lib/agenda.py +++ b/synapse/lib/agenda.py @@ -707,6 +707,10 @@ async def clearRunningStatus(self): for appt in list(self.appts.values()): if appt.isrunning: logger.debug(f'Clearing the isrunning flag for {appt.iden}') + + if appt.nexttime is None and appt in self.apptheap: + self.delete(appt.iden) + edits = { 'isrunning': False, 'lastfinishtime': self._getNowTick(), @@ -718,6 +722,7 @@ async def runloop(self): ''' Task loop to issue query tasks at the right times. ''' + await self.clearRunningStatus() while not self.isfini: timeout = None diff --git a/synapse/tests/test_lib_agenda.py b/synapse/tests/test_lib_agenda.py index 1cc809ba5ac..1cec89b1b88 100644 --- a/synapse/tests/test_lib_agenda.py +++ b/synapse/tests/test_lib_agenda.py @@ -1182,9 +1182,6 @@ async def test_agenda_force_promotion_with_running_cron(self): async with self.getTestCore(conf=conf01) as core01: - with self.getAsyncLoggerStream('synapse.storm.log', 'I AM A ERROR LOG MESSAGE') as stream: - self.true(await stream.wait(timeout=6)) - cron = await core00.callStorm('return($lib.cron.list())') self.len(1, cron) self.true(cron[0].get('isrunning')) From 8fa8b9b1b362ae469d5ad213aefabc5ff9481e50 Mon Sep 17 00:00:00 2001 From: vEpiphyte Date: Tue, 14 Jan 2025 08:50:55 -0500 Subject: [PATCH 08/10] agenda runloop - remove appts from the heap which don't have a nexttime (#4067) --- synapse/lib/agenda.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/synapse/lib/agenda.py b/synapse/lib/agenda.py index 4c5cf3840c6..412df7b2f48 100644 --- a/synapse/lib/agenda.py +++ b/synapse/lib/agenda.py @@ -679,6 +679,11 @@ async def delete(self, iden): mesg = f'No cron job with iden: {iden}' raise s_exc.NoSuchIden(iden=iden, mesg=mesg) + self._delete_appt_from_heap(appt) + del self.appts[iden] + self.apptdefs.delete(iden) + + def _delete_appt_from_heap(self, appt): try: heappos = self.apptheap.index(appt) except ValueError: @@ -692,9 +697,6 @@ async def delete(self, iden): self.apptheap[heappos] = self.apptheap.pop() heapq.heapify(self.apptheap) - del self.appts[iden] - self.apptdefs.delete(iden) - def _getNowTick(self): return time.time() + self.tickoff @@ -708,9 +710,6 @@ async def clearRunningStatus(self): if appt.isrunning: logger.debug(f'Clearing the isrunning flag for {appt.iden}') - if appt.nexttime is None and appt in self.apptheap: - self.delete(appt.iden) - edits = { 'isrunning': False, 'lastfinishtime': self._getNowTick(), @@ -718,6 +717,9 @@ async def clearRunningStatus(self): } await self.core.addCronEdits(appt.iden, edits) + if appt.nexttime is None: + self._delete_appt_from_heap(appt) + async def runloop(self): ''' Task loop to issue query tasks at the right times. From e5e0dd2819776816259322691d2626bdd0b938fa Mon Sep 17 00:00:00 2001 From: bender Date: Tue, 14 Jan 2025 08:58:30 -0500 Subject: [PATCH 09/10] updates --- synapse/tests/test_lib_agenda.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/synapse/tests/test_lib_agenda.py b/synapse/tests/test_lib_agenda.py index 1cec89b1b88..fe8422498d2 100644 --- a/synapse/tests/test_lib_agenda.py +++ b/synapse/tests/test_lib_agenda.py @@ -1195,3 +1195,29 @@ async def test_agenda_force_promotion_with_running_cron(self): self.len(1, cron01) self.false(cron01[0].get('isrunning')) self.eq(cron01[0].get('lasterrs')[0], 'aborted') + + async def test_agenda_clear_running_none_nexttime(self): + + async with self.getTestCore() as core: + + cdef = { + 'creator': core.auth.rootuser.iden, + 'iden': s_common.guid(), + 'storm': '$lib.log.info("test")', + 'reqs': {}, + 'incunit': 'minute', + 'incvals': 1 + } + await core.addCronJob(cdef) + + appt = core.agenda.appts[cdef['iden']] + self.true(appt in core.agenda.apptheap) + + appt.isrunning = True + appt.nexttime = None + + await core.agenda.clearRunningStatus() + self.false(appt in core.agenda.apptheap) + + crons = await core.callStorm('return($lib.cron.list())') + self.len(1, crons) From b7a64265f0322925de24a5d721e4691c7a7c8a1c Mon Sep 17 00:00:00 2001 From: bender Date: Tue, 14 Jan 2025 13:41:42 -0500 Subject: [PATCH 10/10] updates --- synapse/lib/agenda.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/lib/agenda.py b/synapse/lib/agenda.py index 412df7b2f48..443b2cb5f3e 100644 --- a/synapse/lib/agenda.py +++ b/synapse/lib/agenda.py @@ -716,6 +716,7 @@ async def clearRunningStatus(self): 'lasterrs': ['aborted'] + appt.lasterrs[-4:] } await self.core.addCronEdits(appt.iden, edits) + await self.core.feedBeholder('cron:stop', {'iden': appt.iden}) if appt.nexttime is None: self._delete_appt_from_heap(appt)