# Patch summary (commentary only; everything before the first "diff --git"
# header is leading text, not part of any hunk).
#
# - changes/ff623ded36292878c995dfcf1874daf4.yaml: new changelog entry,
#   type "bug".
# - synapse/cortex.py: initServiceActive() no longer calls
#   agenda.clearRunningStatus().
# - synapse/lib/agenda.py:
#     * heap removal is moved out of delete() into _delete_appt_from_heap().
#     * clearRunningStatus() now also sets lastfinishtime, prepends 'aborted'
#       to the last four lasterrs entries, feeds a 'cron:stop' beholder event,
#       and removes appointments whose nexttime is None from the heap.
#     * runloop() calls clearRunningStatus() before entering its loop.
# - synapse/tests/test_lib_agenda.py: awaits core.getAuthGate() in an existing
#   assertion and adds three tests (graceful promotion, forced promotion,
#   clearRunningStatus with nexttime set to None).
#
# NOTE(review): the text this patch was recovered from had its newlines and
# indentation collapsed. Line breaks below follow the "+"/"-" markers and the
# hunk line counts; leading whitespace (especially on context lines) is
# reconstructed from Python structure and should be confirmed against the
# target files before applying.
diff --git a/changes/ff623ded36292878c995dfcf1874daf4.yaml b/changes/ff623ded36292878c995dfcf1874daf4.yaml
new file mode 100644
index 00000000000..8f3ecc2210a
--- /dev/null
+++ b/changes/ff623ded36292878c995dfcf1874daf4.yaml
@@ -0,0 +1,5 @@
+---
+desc: Fixed a Cortex cron scheduler loop error during a mirror promotion.
+prs: []
+type: bug
+...
diff --git a/synapse/cortex.py b/synapse/cortex.py
index 603da3dee6d..ac7999bd836 100644
--- a/synapse/cortex.py
+++ b/synapse/cortex.py
@@ -1556,7 +1556,6 @@ async def initServiceRuntime(self):
 
     async def initServiceActive(self):
         await self.stormdmons.start()
-        await self.agenda.clearRunningStatus()
 
         async def _runMigrations():
             # Run migrations when this cortex becomes active. This is to prevent
diff --git a/synapse/lib/agenda.py b/synapse/lib/agenda.py
index f2a10f1e7c2..443b2cb5f3e 100644
--- a/synapse/lib/agenda.py
+++ b/synapse/lib/agenda.py
@@ -679,6 +679,11 @@ async def delete(self, iden):
             mesg = f'No cron job with iden: {iden}'
             raise s_exc.NoSuchIden(iden=iden, mesg=mesg)
 
+        self._delete_appt_from_heap(appt)
+        del self.appts[iden]
+        self.apptdefs.delete(iden)
+
+    def _delete_appt_from_heap(self, appt):
         try:
             heappos = self.apptheap.index(appt)
         except ValueError:
@@ -692,9 +697,6 @@ async def delete(self, iden):
                 self.apptheap[heappos] = self.apptheap.pop()
                 heapq.heapify(self.apptheap)
 
-        del self.appts[iden]
-        self.apptdefs.delete(iden)
-
     def _getNowTick(self):
         return time.time() + self.tickoff
 
@@ -707,12 +709,23 @@ async def clearRunningStatus(self):
         for appt in list(self.appts.values()):
             if appt.isrunning:
                 logger.debug(f'Clearing the isrunning flag for {appt.iden}')
-                await self.core.addCronEdits(appt.iden, {'isrunning': False})
+
+                edits = {
+                    'isrunning': False,
+                    'lastfinishtime': self._getNowTick(),
+                    'lasterrs': ['aborted'] + appt.lasterrs[-4:]
+                }
+                await self.core.addCronEdits(appt.iden, edits)
+                await self.core.feedBeholder('cron:stop', {'iden': appt.iden})
+
+                if appt.nexttime is None:
+                    self._delete_appt_from_heap(appt)
 
     async def runloop(self):
         '''
         Task loop to issue query tasks at the right times.
         '''
+        await self.clearRunningStatus()
         while not self.isfini:
 
             timeout = None
diff --git a/synapse/tests/test_lib_agenda.py b/synapse/tests/test_lib_agenda.py
index 285b793610e..fe8422498d2 100644
--- a/synapse/tests/test_lib_agenda.py
+++ b/synapse/tests/test_lib_agenda.py
@@ -441,7 +441,7 @@ def looptime():
                 await self.asyncraises(s_exc.DupIden, core.addCronJob(cdef))
 
                 await core.delCronJob(viewiden)
-                self.nn(core.getAuthGate(viewiden))
+                self.nn(await core.getAuthGate(viewiden))
 
     async def test_agenda_persistence(self):
         ''' Test we can make/change/delete appointments and they are persisted to storage '''
@@ -1097,3 +1097,127 @@ async def task():
                     self.eq(cdef01.get('lastresult'), 'cancelled')
                     self.gt(cdef00['laststarttime'], 0)
                     self.eq(cdef00['laststarttime'], cdef01['laststarttime'])
+
+    async def test_agenda_graceful_promotion_with_running_cron(self):
+
+        async with self.getTestAha() as aha:
+
+            conf00 = {
+                'aha:provision': await aha.addAhaSvcProv('00.cortex')
+            }
+
+            async with self.getTestCore(conf=conf00) as core00:
+                self.false(core00.conf.get('mirror'))
+
+                q = '''
+                while((true)) {
+                    $lib.log.error('I AM A ERROR LOG MESSAGE')
+                    $lib.time.sleep(6)
+                }
+                '''
+                msgs = await core00.stormlist('cron.at --now $q', opts={'vars': {'q': q}})
+                self.stormHasNoWarnErr(msgs)
+
+                crons00 = await core00.callStorm('return($lib.cron.list())')
+                self.len(1, crons00)
+
+                prov01 = {'mirror': '00.cortex'}
+                conf01 = {
+                    'aha:provision': await aha.addAhaSvcProv('01.cortex', provinfo=prov01),
+                }
+
+                async with self.getTestCore(conf=conf01) as core01:
+
+                    with self.getAsyncLoggerStream('synapse.storm.log', 'I AM A ERROR LOG MESSAGE') as stream:
+                        self.true(await stream.wait(timeout=6))
+
+                    cron = await core00.callStorm('return($lib.cron.list())')
+                    self.len(1, cron)
+                    self.true(cron[0].get('isrunning'))
+
+                    await core01.promote(graceful=True)
+
+                    self.false(core00.isactive)
+                    self.true(core01.isactive)
+
+                    await core00.sync()
+
+                    cron00 = await core00.callStorm('return($lib.cron.list())')
+                    self.len(1, cron00)
+                    self.false(cron00[0].get('isrunning'))
+                    self.eq(cron00[0].get('lasterrs')[0], 'aborted')
+
+                    cron01 = await core01.callStorm('return($lib.cron.list())')
+                    self.len(1, cron01)
+                    self.false(cron01[0].get('isrunning'))
+                    self.eq(cron01[0].get('lasterrs')[0], 'aborted')
+
+    async def test_agenda_force_promotion_with_running_cron(self):
+
+        async with self.getTestAha() as aha:
+
+            conf00 = {
+                'aha:provision': await aha.addAhaSvcProv('00.cortex')
+            }
+
+            async with self.getTestCore(conf=conf00) as core00:
+                self.false(core00.conf.get('mirror'))
+
+                q = '''
+                while((true)) {
+                    $lib.log.error('I AM A ERROR LOG MESSAGE')
+                    $lib.time.sleep(6)
+                }
+                '''
+                msgs = await core00.stormlist('cron.at --now $q', opts={'vars': {'q': q}})
+                self.stormHasNoWarnErr(msgs)
+
+                crons00 = await core00.callStorm('return($lib.cron.list())')
+                self.len(1, crons00)
+
+                prov01 = {'mirror': '00.cortex'}
+                conf01 = {
+                    'aha:provision': await aha.addAhaSvcProv('01.cortex', provinfo=prov01),
+                }
+
+                async with self.getTestCore(conf=conf01) as core01:
+
+                    cron = await core00.callStorm('return($lib.cron.list())')
+                    self.len(1, cron)
+                    self.true(cron[0].get('isrunning'))
+
+                    await core01.promote(graceful=False)
+
+                    self.true(core00.isactive)
+                    self.true(core01.isactive)
+
+                    cron01 = await core01.callStorm('return($lib.cron.list())')
+                    self.len(1, cron01)
+                    self.false(cron01[0].get('isrunning'))
+                    self.eq(cron01[0].get('lasterrs')[0], 'aborted')
+
+    async def test_agenda_clear_running_none_nexttime(self):
+
+        async with self.getTestCore() as core:
+
+            cdef = {
+                'creator': core.auth.rootuser.iden,
+                'iden': s_common.guid(),
+                'storm': '$lib.log.info("test")',
+                'reqs': {},
+                'incunit': 'minute',
+                'incvals': 1
+            }
+            await core.addCronJob(cdef)
+
+            appt = core.agenda.appts[cdef['iden']]
+            self.true(appt in core.agenda.apptheap)
+
+            appt.isrunning = True
+            appt.nexttime = None
+
+            await core.agenda.clearRunningStatus()
+            self.false(appt in core.agenda.apptheap)
+
+            crons = await core.callStorm('return($lib.cron.list())')
+            self.len(1, crons)