refactor streaming backup (SYN-8340) (#4042)
- added a helper method to handle the backup streaming logic
- cleaned up success/failure tracking; exceptions now propagate failures (see the sketch below)
- added tests and coverage for the refactored code
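
For orientation, a minimal, self-contained sketch of the error-handling pattern described above. The names here (stream_backup, _stream_chunks, iter_backup_archive) are hypothetical stand-ins for the cell.py methods in the diff below; the point is that a failure surfaces by raising, and logging happens where the exception is caught, rather than via success/loglevel flags resolved in a finally block.

    import asyncio
    import logging

    logger = logging.getLogger(__name__)

    async def _stream_chunks(path):
        # Hypothetical stand-in for the real child-process streaming work.
        await asyncio.sleep(0)

    async def stream_backup(path):
        # Failures propagate as exceptions; no success flag or loglevel juggling.
        try:
            await _stream_chunks(path)
            logger.debug('Backup streaming completed successfully for %s', path)
        except asyncio.CancelledError:
            logger.warning('Backup streaming was cancelled.')
            raise
        except Exception:
            logger.exception('Error during backup streaming.')
            raise

    async def iter_backup_archive(path):
        # Callers simply delegate to the helper; a raised exception is the failure signal.
        await stream_backup(path)

    if __name__ == '__main__':
        asyncio.run(iter_backup_archive('/tmp/backup'))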

---------

Co-authored-by: bender <bender@vertex.link>
Co-authored-by: Cisphyx <cisphyx@vertex.link>
Co-authored-by: vEpiphyte <epiphyte@vertex.link>
4 people authored Dec 26, 2024
1 parent d37450a commit 2f44645
Showing 3 changed files with 107 additions and 86 deletions.
5 changes: 5 additions & 0 deletions changes/34ba15e2aef8b5c6be4d0274db884f84.yaml
@@ -0,0 +1,5 @@
---
desc: Refactored backup streaming logic and error handling.
prs: []
type: feat
...
116 changes: 31 additions & 85 deletions synapse/lib/cell.py
@@ -2623,61 +2623,55 @@ def walkpath(path):
walkpath(self.backdirn)
return backups

async def iterBackupArchive(self, name, user):

success = False
loglevel = logging.WARNING

path = self._reqBackDirn(name)
cellguid = os.path.join(path, 'cell.guid')
if not os.path.isfile(cellguid):
mesg = 'Specified backup path has no cell.guid file.'
raise s_exc.BadArg(mesg=mesg, arg='path', valu=path)

async def _streamBackupArchive(self, path, user, name):
link = s_scope.get('link')
if link is None:
mesg = 'Link not found in scope. This API must be called via a CellApi.'
raise s_exc.SynErr(mesg=mesg)

linkinfo = await link.getSpawnInfo()
linkinfo['logconf'] = await self._getSpawnLogConf()

await self.boss.promote('backup:stream', user=user, info={'name': name})

ctx = multiprocessing.get_context('spawn')

proc = None
mesg = 'Streaming complete'

def getproc():
proc = ctx.Process(target=_iterBackupProc, args=(path, linkinfo))
proc.start()
return proc

mesg = 'Streaming complete'
proc = await s_coro.executor(getproc)
cancelled = False
try:
proc = await s_coro.executor(getproc)

await s_coro.executor(proc.join)
self.backlastuploaddt = datetime.datetime.now()
logger.debug(f'Backup streaming completed successfully for {name}')

except (asyncio.CancelledError, Exception) as e:
except asyncio.CancelledError:
logger.warning('Backup streaming was cancelled.')
cancelled = True
raise

# We want to log all exceptions here, an asyncio.CancelledError
# could be the result of a remote link terminating due to the
# backup stream being completed, prior to this function
# finishing.
except Exception as e:
logger.exception('Error during backup streaming.')

if proc:
proc.terminate()

mesg = repr(e)
raise

else:
success = True
loglevel = logging.DEBUG
self.backlastuploaddt = datetime.datetime.now()

finally:
phrase = 'successfully' if success else 'with failure'
logger.log(loglevel, f'iterBackupArchive completed {phrase} for {name}')
raise s_exc.DmonSpawn(mesg=mesg)
proc.terminate()

if not cancelled:
raise s_exc.DmonSpawn(mesg=mesg)

async def iterBackupArchive(self, name, user):
path = self._reqBackDirn(name)
cellguid = os.path.join(path, 'cell.guid')
if not os.path.isfile(cellguid):
mesg = 'Specified backup path has no cell.guid file.'
raise s_exc.BadArg(mesg=mesg, arg='path', valu=path)
await self._streamBackupArchive(path, user, name)

async def iterNewBackupArchive(self, user, name=None, remove=False):

@@ -2688,9 +2682,6 @@ async def iterNewBackupArchive(self, user, name=None, remove=False):
if remove:
self.backupstreaming = True

success = False
loglevel = logging.WARNING

if name is None:
name = time.strftime('%Y%m%d%H%M%S', datetime.datetime.now().timetuple())

@@ -2699,10 +2690,6 @@ async def iterNewBackupArchive(self, user, name=None, remove=False):
mesg = 'Backup with name already exists'
raise s_exc.BadArg(mesg=mesg)

link = s_scope.get('link')
linkinfo = await link.getSpawnInfo()
linkinfo['logconf'] = await self._getSpawnLogConf()

try:
await self.runBackup(name)
except Exception:
@@ -2712,54 +2699,13 @@ async def iterNewBackupArchive(self, user, name=None, remove=False):
logger.debug(f'Removed {path}')
raise

await self.boss.promote('backup:stream', user=user, info={'name': name})

ctx = multiprocessing.get_context('spawn')

proc = None
mesg = 'Streaming complete'

def getproc():
proc = ctx.Process(target=_iterBackupProc, args=(path, linkinfo))
proc.start()
return proc

try:
proc = await s_coro.executor(getproc)

await s_coro.executor(proc.join)

except (asyncio.CancelledError, Exception) as e:

# We want to log all exceptions here, an asyncio.CancelledError
# could be the result of a remote link terminating due to the
# backup stream being completed, prior to this function
# finishing.
logger.exception('Error during backup streaming.')

if proc:
proc.terminate()

mesg = repr(e)
raise

else:
success = True
loglevel = logging.DEBUG
self.backlastuploaddt = datetime.datetime.now()

finally:
if remove:
logger.debug(f'Removing {path}')
await s_coro.executor(shutil.rmtree, path, ignore_errors=True)
logger.debug(f'Removed {path}')

phrase = 'successfully' if success else 'with failure'
logger.log(loglevel, f'iterNewBackupArchive completed {phrase} for {name}')
raise s_exc.DmonSpawn(mesg=mesg)
await self._streamBackupArchive(path, user, name)

finally:
if remove:
logger.debug(f'Removing {path}')
await s_coro.executor(shutil.rmtree, path, ignore_errors=True)
logger.debug(f'Removed {path}')
self.backupstreaming = False

async def isUserAllowed(self, iden, perm, gateiden=None, default=False):
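For reference, a caller-side view of the streaming API, mirroring the tests in the next file (axon00, core01, and upfd are assumed from the test harness: an Axon, a mirror Cortex, and an open upload handle): the archive arrives as an async iterator of byte chunks that the caller writes out and saves.

    # Assumed context from the tests below; not a standalone script.
    async with await axon00.upload() as upfd:
        async with core01.getLocalProxy() as prox:
            async for chunk in prox.iterNewBackupArchive():
                await upfd.write(chunk)
            size, sha256 = await upfd.save()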
72 changes: 71 additions & 1 deletion synapse/tests/test_lib_cell.py
@@ -1433,6 +1433,11 @@ def diffdev(dirn):
with mock.patch('os.stat', diffdev):
await self.asyncraises(s_exc.LowSpace, proxy.runBackup())

user = await core.auth.getUserByName('root')
with self.raises(s_exc.SynErr) as cm:
await core.iterNewBackupArchive(user)
self.isin('This API must be called via a CellApi', cm.exception.get('mesg'))

async def err(*args, **kwargs):
raise RuntimeError('boom')

@@ -2298,11 +2303,13 @@ async def test_backup_restore_double_promote_aha(self):
# Backup the mirror (core01) which points to the core00
async with await axon00.upload() as upfd:
async with core01.getLocalProxy() as prox:
tot_chunks = 0
async for chunk in prox.iterNewBackupArchive():
await upfd.write(chunk)
tot_chunks += len(chunk)

size, sha256 = await upfd.save()
await asyncio.sleep(0)
self.eq(size, tot_chunks)

furl = f'{url}{s_common.ehex(sha256)}'
purl = await aha.addAhaSvcProv('00.mynewcortex')
@@ -3276,3 +3283,66 @@ async def test_lib_cell_promote_schism_prevent(self):
with self.raises(s_exc.BadState) as cm:
await cell00.promote(graceful=True)
self.isin('02.cell is not the current leader', cm.exception.get('mesg'))

async def test_stream_backup_exception(self):

with self.getTestDir() as dirn:
backdirn = os.path.join(dirn, 'backups')
coredirn = os.path.join(dirn, 'cortex')

conf = {'backup:dir': backdirn}
s_common.yamlsave(conf, coredirn, 'cell.yaml')

async with self.getTestCore(dirn=coredirn) as core:
async with core.getLocalProxy() as proxy:

await proxy.runBackup(name='bkup')

mock_proc = mock.Mock()
mock_proc.join = mock.Mock()

async def mock_executor(func, *args, **kwargs):
if isinstance(func, mock.Mock) and func is mock_proc.join:
raise Exception('boom')
return mock_proc

with mock.patch('synapse.lib.cell.s_coro.executor', mock_executor):
with self.getAsyncLoggerStream('synapse.lib.cell', 'Error during backup streaming') as stream:
with self.raises(Exception) as cm:
async for _ in proxy.iterBackupArchive('bkup'):
pass
self.true(await stream.wait(timeout=6))

async def test_iter_new_backup_archive(self):

with self.getTestDir() as dirn:
backdirn = os.path.join(dirn, 'backups')
coredirn = os.path.join(dirn, 'cortex')

conf = {'backup:dir': backdirn}
s_common.yamlsave(conf, coredirn, 'cell.yaml')

async with self.getTestCore(dirn=coredirn) as core:
async with core.getLocalProxy() as proxy:

async def mock_runBackup(*args, **kwargs):
raise Exception('backup failed')

with mock.patch.object(s_cell.Cell, 'runBackup', mock_runBackup):
with self.getAsyncLoggerStream('synapse.lib.cell', 'Removing') as stream:
with self.raises(s_exc.SynErr) as cm:
async for _ in proxy.iterNewBackupArchive('failedbackup', remove=True):
pass

self.isin('backup failed', str(cm.exception))
self.true(await stream.wait(timeout=6))

path = os.path.join(backdirn, 'failedbackup')
self.false(os.path.exists(path))

self.false(core.backupstreaming)

core.backupstreaming = True
with self.raises(s_exc.BackupAlreadyRunning):
async for _ in proxy.iterNewBackupArchive('newbackup', remove=True):
pass
