From 2f446457047d9a1b1cda3f0a8927529f53e1d129 Mon Sep 17 00:00:00 2001
From: OCBender <181250370+OCBender@users.noreply.github.com>
Date: Thu, 26 Dec 2024 11:16:24 -0500
Subject: [PATCH] refactor streaming backup (SYN-8340) (#4042)

- new helper method to handle backup streaming logic
- cleaned up success/failure tracking; exceptions will propagate failures
- new tests and coverage for code

---------

Co-authored-by: bender
Co-authored-by: Cisphyx
Co-authored-by: vEpiphyte
---
 changes/34ba15e2aef8b5c6be4d0274db884f84.yaml |   5 +
 synapse/lib/cell.py                           | 116 +++++-------------
 synapse/tests/test_lib_cell.py                |  72 ++++++++++-
 3 files changed, 107 insertions(+), 86 deletions(-)
 create mode 100644 changes/34ba15e2aef8b5c6be4d0274db884f84.yaml

diff --git a/changes/34ba15e2aef8b5c6be4d0274db884f84.yaml b/changes/34ba15e2aef8b5c6be4d0274db884f84.yaml
new file mode 100644
index 00000000000..87e11e76496
--- /dev/null
+++ b/changes/34ba15e2aef8b5c6be4d0274db884f84.yaml
@@ -0,0 +1,5 @@
+---
+desc: Refactored backup streaming logic and error handling.
+prs: []
+type: feat
+...
diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py
index 5aaa9d31958..adcd1dc5b76 100644
--- a/synapse/lib/cell.py
+++ b/synapse/lib/cell.py
@@ -2623,18 +2623,12 @@ def walkpath(path):
         walkpath(self.backdirn)
         return backups
 
-    async def iterBackupArchive(self, name, user):
-
-        success = False
-        loglevel = logging.WARNING
-
-        path = self._reqBackDirn(name)
-        cellguid = os.path.join(path, 'cell.guid')
-        if not os.path.isfile(cellguid):
-            mesg = 'Specified backup path has no cell.guid file.'
-            raise s_exc.BadArg(mesg=mesg, arg='path', valu=path)
-
+    async def _streamBackupArchive(self, path, user, name):
         link = s_scope.get('link')
+        if link is None:
+            mesg = 'Link not found in scope. This API must be called via a CellApi.'
+            raise s_exc.SynErr(mesg=mesg)
+
         linkinfo = await link.getSpawnInfo()
         linkinfo['logconf'] = await self._getSpawnLogConf()
 
@@ -2642,42 +2636,42 @@ async def iterBackupArchive(self, name, user):
 
         ctx = multiprocessing.get_context('spawn')
 
-        proc = None
-        mesg = 'Streaming complete'
-
         def getproc():
             proc = ctx.Process(target=_iterBackupProc, args=(path, linkinfo))
             proc.start()
             return proc
 
+        mesg = 'Streaming complete'
+        proc = await s_coro.executor(getproc)
+        cancelled = False
         try:
-            proc = await s_coro.executor(getproc)
-
             await s_coro.executor(proc.join)
+            self.backlastuploaddt = datetime.datetime.now()
+            logger.debug(f'Backup streaming completed successfully for {name}')
 
-        except (asyncio.CancelledError, Exception) as e:
+        except asyncio.CancelledError:
+            logger.warning('Backup streaming was cancelled.')
+            cancelled = True
+            raise
 
-            # We want to log all exceptions here, an asyncio.CancelledError
-            # could be the result of a remote link terminating due to the
-            # backup stream being completed, prior to this function
-            # finishing.
+        except Exception as e:
             logger.exception('Error during backup streaming.')
-
-            if proc:
-                proc.terminate()
-
             mesg = repr(e)
             raise
 
-        else:
-            success = True
-            loglevel = logging.DEBUG
-            self.backlastuploaddt = datetime.datetime.now()
-
         finally:
-            phrase = 'successfully' if success else 'with failure'
-            logger.log(loglevel, f'iterBackupArchive completed {phrase} for {name}')
-            raise s_exc.DmonSpawn(mesg=mesg)
+            proc.terminate()
+
+        if not cancelled:
+            raise s_exc.DmonSpawn(mesg=mesg)
+
+    async def iterBackupArchive(self, name, user):
+        path = self._reqBackDirn(name)
+        cellguid = os.path.join(path, 'cell.guid')
+        if not os.path.isfile(cellguid):
+            mesg = 'Specified backup path has no cell.guid file.'
+            raise s_exc.BadArg(mesg=mesg, arg='path', valu=path)
+        await self._streamBackupArchive(path, user, name)
 
     async def iterNewBackupArchive(self, user, name=None, remove=False):
 
@@ -2688,9 +2682,6 @@ async def iterNewBackupArchive(self, user, name=None, remove=False):
             if remove:
                 self.backupstreaming = True
 
-            success = False
-            loglevel = logging.WARNING
-
             if name is None:
                 name = time.strftime('%Y%m%d%H%M%S', datetime.datetime.now().timetuple())
 
@@ -2699,10 +2690,6 @@ async def iterNewBackupArchive(self, user, name=None, remove=False):
                 mesg = 'Backup with name already exists'
                 raise s_exc.BadArg(mesg=mesg)
 
-            link = s_scope.get('link')
-            linkinfo = await link.getSpawnInfo()
-            linkinfo['logconf'] = await self._getSpawnLogConf()
-
             try:
                 await self.runBackup(name)
             except Exception:
@@ -2712,54 +2699,13 @@ async def iterNewBackupArchive(self, user, name=None, remove=False):
                     logger.debug(f'Removed {path}')
                 raise
 
-            await self.boss.promote('backup:stream', user=user, info={'name': name})
-
-            ctx = multiprocessing.get_context('spawn')
-
-            proc = None
-            mesg = 'Streaming complete'
-
-            def getproc():
-                proc = ctx.Process(target=_iterBackupProc, args=(path, linkinfo))
-                proc.start()
-                return proc
-
-            try:
-                proc = await s_coro.executor(getproc)
-
-                await s_coro.executor(proc.join)
-
-            except (asyncio.CancelledError, Exception) as e:
-
-                # We want to log all exceptions here, an asyncio.CancelledError
-                # could be the result of a remote link terminating due to the
-                # backup stream being completed, prior to this function
-                # finishing.
-                logger.exception('Error during backup streaming.')
-
-                if proc:
-                    proc.terminate()
-
-                mesg = repr(e)
-                raise
-
-            else:
-                success = True
-                loglevel = logging.DEBUG
-                self.backlastuploaddt = datetime.datetime.now()
-
-            finally:
-                if remove:
-                    logger.debug(f'Removing {path}')
-                    await s_coro.executor(shutil.rmtree, path, ignore_errors=True)
-                    logger.debug(f'Removed {path}')
-
-                phrase = 'successfully' if success else 'with failure'
-                logger.log(loglevel, f'iterNewBackupArchive completed {phrase} for {name}')
-                raise s_exc.DmonSpawn(mesg=mesg)
+            await self._streamBackupArchive(path, user, name)
 
         finally:
             if remove:
+                logger.debug(f'Removing {path}')
+                await s_coro.executor(shutil.rmtree, path, ignore_errors=True)
+                logger.debug(f'Removed {path}')
                 self.backupstreaming = False
 
     async def isUserAllowed(self, iden, perm, gateiden=None, default=False):
diff --git a/synapse/tests/test_lib_cell.py b/synapse/tests/test_lib_cell.py
index 84f679a8c28..bf136419978 100644
--- a/synapse/tests/test_lib_cell.py
+++ b/synapse/tests/test_lib_cell.py
@@ -1433,6 +1433,11 @@ def diffdev(dirn):
                 with mock.patch('os.stat', diffdev):
                     await self.asyncraises(s_exc.LowSpace, proxy.runBackup())
 
+                user = await core.auth.getUserByName('root')
+                with self.raises(s_exc.SynErr) as cm:
+                    await core.iterNewBackupArchive(user)
+                self.isin('This API must be called via a CellApi', cm.exception.get('mesg'))
+
                 async def err(*args, **kwargs):
                     raise RuntimeError('boom')
 
@@ -2298,11 +2303,13 @@ async def test_backup_restore_double_promote_aha(self):
         # Backup the mirror (core01) which points to the core00
         async with await axon00.upload() as upfd:
             async with core01.getLocalProxy() as prox:
+                tot_chunks = 0
                 async for chunk in prox.iterNewBackupArchive():
                     await upfd.write(chunk)
+                    tot_chunks += len(chunk)
 
             size, sha256 = await upfd.save()
-            await asyncio.sleep(0)
+            self.eq(size, tot_chunks)
 
         furl = f'{url}{s_common.ehex(sha256)}'
         purl = await aha.addAhaSvcProv('00.mynewcortex')
@@ -3276,3 +3283,66 @@ async def test_lib_cell_promote_schism_prevent(self):
         with self.raises(s_exc.BadState) as cm:
             await cell00.promote(graceful=True)
         self.isin('02.cell is not the current leader', cm.exception.get('mesg'))
+
+    async def test_stream_backup_exception(self):
+
+        with self.getTestDir() as dirn:
+            backdirn = os.path.join(dirn, 'backups')
+            coredirn = os.path.join(dirn, 'cortex')
+
+            conf = {'backup:dir': backdirn}
+            s_common.yamlsave(conf, coredirn, 'cell.yaml')
+
+            async with self.getTestCore(dirn=coredirn) as core:
+                async with core.getLocalProxy() as proxy:
+
+                    await proxy.runBackup(name='bkup')
+
+                    mock_proc = mock.Mock()
+                    mock_proc.join = mock.Mock()
+
+                    async def mock_executor(func, *args, **kwargs):
+                        if isinstance(func, mock.Mock) and func is mock_proc.join:
+                            raise Exception('boom')
+                        return mock_proc
+
+                    with mock.patch('synapse.lib.cell.s_coro.executor', mock_executor):
+                        with self.getAsyncLoggerStream('synapse.lib.cell', 'Error during backup streaming') as stream:
+                            with self.raises(Exception) as cm:
+                                async for _ in proxy.iterBackupArchive('bkup'):
+                                    pass
+                            self.true(await stream.wait(timeout=6))
+
+    async def test_iter_new_backup_archive(self):
+
+        with self.getTestDir() as dirn:
+            backdirn = os.path.join(dirn, 'backups')
+            coredirn = os.path.join(dirn, 'cortex')
+
+            conf = {'backup:dir': backdirn}
+            s_common.yamlsave(conf, coredirn, 'cell.yaml')
+
+            async with self.getTestCore(dirn=coredirn) as core:
+                async with core.getLocalProxy() as proxy:
+
+                    async def mock_runBackup(*args, **kwargs):
+                        raise Exception('backup failed')
+
+                    with mock.patch.object(s_cell.Cell, 'runBackup', mock_runBackup):
+                        with self.getAsyncLoggerStream('synapse.lib.cell', 'Removing') as stream:
+                            with self.raises(s_exc.SynErr) as cm:
+                                async for _ in proxy.iterNewBackupArchive('failedbackup', remove=True):
+                                    pass
+
+                            self.isin('backup failed', str(cm.exception))
+                            self.true(await stream.wait(timeout=6))
+
+                    path = os.path.join(backdirn, 'failedbackup')
+                    self.false(os.path.exists(path))
+
+                    self.false(core.backupstreaming)
+
+                    core.backupstreaming = True
+                    with self.raises(s_exc.BackupAlreadyRunning):
+                        async for _ in proxy.iterNewBackupArchive('newbackup', remove=True):
+                            pass
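The new tests consume the streaming APIs the same way a remote operator would: iterate the proxy method and write each chunk to disk. The sketch below illustrates that client-side pattern only; the telepath URL, backup name, and output filename are placeholders that are not part of this change, and it assumes the stream yields raw archive bytes that can be written as-is.

import asyncio

import synapse.telepath as s_telepath

async def saveBackup():
    # Placeholder values; point these at a real cell and an existing backup name.
    url = 'cell:///path/to/cortex'
    name = 'bkup'
    outpath = 'bkup.tar.gz'

    async with await s_telepath.openurl(url) as proxy:
        # iterBackupArchive() streams an existing backup directory;
        # iterNewBackupArchive(name, remove=True) would create the backup
        # first and remove it from the server after streaming.
        with open(outpath, 'wb') as fd:
            async for chunk in proxy.iterBackupArchive(name):
                fd.write(chunk)

if __name__ == '__main__':
    asyncio.run(saveBackup())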