diff --git a/synapse/__init__.py b/synapse/__init__.py
index 1c75faa9..88f0c008 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -77,5 +77,4 @@ s_datamodel.rebuildTlib()
 
 # load any modules which register dyndeps aliases...
-# ( order matters...)
-import synapse.axon # synapse.axon brings in synapse.cortex's dyndep registration.
+import synapse.cortex
 
diff --git a/synapse/axon.py b/synapse/axon.py
index 4db5cf6a..6b2ad251 100644
--- a/synapse/axon.py
+++ b/synapse/axon.py
@@ -16,7 +16,6 @@ import synapse.lib.const as s_const
 import synapse.lib.config as s_config
 import synapse.lib.msgpack as s_msgpack
-import synapse.lib.blobfile as s_blobfile
 
 # for backward compat (HashSet moved from this module to synapse.lib.hashset )
 from synapse.lib.hashset import *
 
diff --git a/synapse/exc.py b/synapse/exc.py
index f1dd4df1..a8befe10 100644
--- a/synapse/exc.py
+++ b/synapse/exc.py
@@ -207,17 +207,6 @@ class BadAtomFile(SynErr):
     '''
     pass
 
-class BadBlobFile(SynErr):
-    '''
-    Raised when there is an internal issue with a blobfile
-    '''
-    pass
-class BlobFileIsClone(SynErr):
-    '''
-    Raised when there is a write operation on a blobfile which is a clone.
-    '''
-    pass
-
 class IsFini(Exception): pass
 
 class RetnTimeout(SynErr):
diff --git a/synapse/lib/blobfile.py b/synapse/lib/blobfile.py
deleted file mode 100644
index 33a27e40..00000000
--- a/synapse/lib/blobfile.py
+++ /dev/null
@@ -1,276 +0,0 @@
-import os
-import struct
-import logging
-import threading
-
-import synapse.common as s_common
-import synapse.reactor as s_reactor
-import synapse.eventbus as s_eventbus
-import synapse.lib.atomfile as s_atomfile
-
-log = logging.getLogger(__name__)
-
-
-headfmt = '<Q'
-headsize = struct.calcsize(headfmt)
-
-class BlobFile(s_eventbus.EventBus):
-
-    def writeoff(self, off, byts):
-        '''
-        Write bytes at an offset within the blob.
-        '''
-        if self.isclone:
-            raise s_common.BlobFileIsClone(mesg='BlobFile is a clone and cannot write via writeoff()')
-
-        max_size = self.atom.size
-        if off + len(byts) > max_size:
-            raise s_common.BadBlobFile(mesg='BlobFile cannot write past allocated size',
-                                       max_size=max_size,
-                                       off=off,
-                                       size=off + len(byts)
-                                       )
-        self._writeoff(off, byts)
-
-    def alloc(self, size):
-        '''
-        Allocate a block within the Blob and return the offset.
-
-        Args:
-            size (int): Number of bytes to allocate within the blob.
-
-        Example:
-            Store a string by allocating space for it in a blob file:
-
-                s = 'foobar'
-                byts = s.encode()
-                off = blob.alloc(len(byts))
-                blob.writeoff(off, byts)
-
-        Returns:
-            int: Offset within the blob to use to store size bytes.
-        '''
-        if self.isclone:
-            raise s_common.BlobFileIsClone(mesg='BlobFile is a clone and cannot alloc space via alloc()')
-        return self._alloc(size)
-
-    def _alloc(self, size):
-        '''
-        Internal method which implements alloc()
-        '''
-        # Account for the blob header
-        fullsize = headsize + size
-
-        with self.alloclock:
-
-            # Fire our alloc event
-            # self.fire('blob:alloc', size=size)
-            baseoff = self.atom.size
-            nsize = baseoff + fullsize
-
-            # Grow the file
-            self.atom.resize(nsize)
-
-            # Write the size dword
-            self._writeoff(baseoff, struct.pack(headfmt, size))
-
-            # Compute the return value
-            dataoff = baseoff + headsize
-
-            return dataoff
-
-    def size(self):
-        '''
-        Get the amount of space currently used by the blob.
-
-        Returns:
-            int: Size of the bytes currently allocated by the blob.
-        '''
-        return self.atom.size
-
-    def walk(self):
-        '''
-        Walk the BlobFile from the first record until the end of the file.
-
-        Yields:
-            ((int, int)): A Tuple of integers representing the offset and size
-            for a record in the BlobFile.
-
-        Examples:
-            Iterate over the records in a BlobFile and retrieve the contents of
-            the records and dostuff() with them:
-
-                for offset, size in blob.walk():
-                    byts = blob.readoff(offset, size)
-                    dostuff(byts)
-
-        Raises:
-            BadBlobFile: If the BlobFile is truncated or unable to unpack a Blob header.
-        '''
-        baseoff = 0
-        # Iterate up to self._size
-        max_size = self.atom.size
-        while True:
-            try:
-                header = self.readoff(baseoff, headsize)
-                size, = struct.unpack(headfmt, header)
-            except Exception as e:
-                raise s_common.BadBlobFile(mesg='failed to read/unpack header',
-                                           baseoff=baseoff, size=headsize,
-                                           excinfo=s_common.excinfo(e))
-            # compute the offset and yield it
-            off = baseoff + headsize
-            yield off, size
-            # Calculate the next expected header
-            baseoff = off + size
-
-            if baseoff > max_size:
-                raise s_common.BadBlobFile(mesg='BlobFile truncated',
-                                           max_size=max_size,
-                                           next_header=baseoff)
-            if baseoff == max_size:
-                break
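Migration note: data already stored in a legacy .blob file can still be recovered without synapse, since the on-disk format parsed by the deleted walk() is just a size header followed by payload bytes. The sketch below is illustrative rather than part of this change; it assumes the '<Q' header format reconstructed above (an 8-byte little-endian payload length), and read_blob_records is a hypothetical helper name, not a synapse API.

```python
import struct

HEADFMT = '<Q'  # assumed: little-endian uint64 payload size, as reconstructed above
HEADSIZE = struct.calcsize(HEADFMT)

def read_blob_records(path):
    '''
    Yield (offset, payload) tuples from a legacy .blob file.

    Mirrors the deleted BlobFile.walk()/readoff() pair: each record is a
    HEADSIZE byte size header followed by that many payload bytes.
    '''
    with open(path, 'rb') as fd:
        fd.seek(0, 2)       # find the file size, like atom.size
        max_size = fd.tell()
        baseoff = 0
        while baseoff < max_size:
            fd.seek(baseoff)
            header = fd.read(HEADSIZE)
            if len(header) != HEADSIZE:
                raise ValueError('failed to read/unpack header at offset %d' % baseoff)
            size, = struct.unpack(HEADFMT, header)
            off = baseoff + HEADSIZE
            if off + size > max_size:
                raise ValueError('blob file truncated: record at %d runs past %d' % (off, max_size))
            yield off, fd.read(size)
            baseoff = off + size
```

Each record's payload offset is its header offset plus HEADSIZE, matching what alloc() returned; the two error cases mirror the 'BlobFile truncated' and 'failed to read/unpack header' paths exercised by the deleted tests below.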
diff --git a/synapse/tests/test_lib_blobfile.py b/synapse/tests/test_lib_blobfile.py
deleted file mode 100644
index fbb46fe1..00000000
--- a/synapse/tests/test_lib_blobfile.py
+++ /dev/null
@@ -1,244 +0,0 @@
-import synapse.lib.atomfile as s_atomfile
-import synapse.lib.blobfile as s_blobfile
-
-from synapse.tests.common import *
-
-class BlobFileTest(SynTest):
-
-    def blobfile_basic_assumptions(self, blob):
-        '''
-        Basic assumptions for a Blobfile
-
-        Args:
-            blob (s_blobfile.BlobFile): BlobFile under test.
-        '''
-        self.false(blob.isclone)
-        self.eq(blob.size(), 0)
-
-        off0 = blob.alloc(8)
-
-        self.eq(blob.size(), 8 + s_blobfile.headsize)
-
-        off1 = blob.alloc(8)
-
-        # do interlaced writes
-        blob.writeoff(off0, b'asdf')
-        blob.writeoff(off1, b'hehe')
-
-        blob.writeoff(off0 + 4, b'qwer')
-        blob.writeoff(off1 + 4, b'haha')
-
-        self.eq(blob.readoff(off0, 8), b'asdfqwer')
-        self.eq(blob.readoff(off1, 8), b'hehehaha')
-
-        # Do a third allocation and write
-        off2 = blob.alloc(8)
-        # Ensure our alloc did not smash old data
-        self.eq(blob.readoff(off1, 8), b'hehehaha')
-        byts = 4 * b':)'
-        blob.writeoff(off2, byts)
-
-        self.eq(blob.readoff(off0, 8), b'asdfqwer')
-        self.eq(blob.readoff(off1, 8), b'hehehaha')
-        self.eq(blob.readoff(off2, 8), byts)
-
-        esize = (3 * (s_blobfile.headsize + 8))
-        self.eq(blob.size(), esize)
-
-        # Ensure bad reads are caught
-        self.raises(BadBlobFile, blob.readoff, blob.size() - 4, 8)
-        # Ensure that bad writes are caught
-        self.raises(BadBlobFile, blob.writeoff, blob.size(), b':(')
-
-    def test_blob_base(self):
-
-        # FIXME test these on windows...
-        #self.thisHostMust(platform='linux')
-        with self.getTestDir() as dir:
-            fp = os.path.join(dir, 'test.blob')
-            blob = s_blobfile.BlobFile(fp)
-            self.blobfile_basic_assumptions(blob)
-            blob.fini()
-
-            # fini the blob closes the underlying fd
-            self.true(blob.atom.isfini)
-
-    def test_blob_simple_atom(self):
-
-        with self.getTestDir() as dirn:
-            fp = os.path.join(dirn, 'test.blob')
-            blob = s_blobfile.BlobFile(fp)
-            self.isinstance(blob.atom, s_atomfile.AtomFile)
-            self.blobfile_basic_assumptions(blob)
-            blob.fini()
-            # fini the blob closes the underlying fd
-            self.true(blob.atom.isfini)
-
-    def test_blob_resize(self):
-
-        with self.getTestDir() as dir:
-            fp = os.path.join(dir, 'test.blob')
-            with s_blobfile.BlobFile(fp) as blob:  # type: s_blobfile.BlobFile
-
-                blocks = []
-                w = blob.waiter(5, 'blob:alloc')
-                esize = (s_blobfile.headsize + 8) * 5
-                while blob.size() < esize:
-                    blocks.append(blob.alloc(8))
-
-                self.eq(w.count, 5)
-                w.fini()
-                self.eq(blob.size(), esize)
-
-    def test_blob_save(self):
-
-        #self.thisHostMust(platform='linux')
-
-        msgs = []
-
-        with self.getTestDir() as dir:
-            fp0 = os.path.join(dir, 'test0.blob')
-            blob0 = s_blobfile.BlobFile(fp0)
-
-            blob0.on('blob:sync', msgs.append)
-
-            off0 = blob0.alloc(8)
-            off1 = blob0.alloc(8)
-
-            # do interlaced writes
-            blob0.writeoff(off0, b'asdf')
-            blob0.writeoff(off1, b'hehe')
-
-            blob0.writeoff(off0 + 4, b'qwer')
-            blob0.writeoff(off1 + 4, b'haha')
-
-            walks = [t for t in blob0.walk()]
-
-            fp1 = os.path.join(dir, 'test1.blob')
-
-            blob1 = s_blobfile.BlobFile(fp1, isclone=True)
-            self.true(blob1.isclone)
-            blob1.syncs(msgs)
-
-            self.eq(blob0.readoff(off0, 8), blob1.readoff(off0, 8))
-            self.eq(blob0.readoff(off1, 8), blob1.readoff(off1, 8))
-
-            self.eq(blob0.size(), blob1.size())
-            self.eq(walks, [t for t in blob1.walk()])
-
-            # Replaying messages to blob0 doesn't do anything to it since
-            # the reactor events don't do anything for isclone=False
-            blob0.syncs(msgs)
-            self.eq(blob0.size(), blob1.size())
-
-            # Calling alloc / writeoff apis to the clone fails
-            self.raises(BlobFileIsClone, blob1.alloc, 1)
-            self.raises(BlobFileIsClone, blob1.writeoff, 1, b'1')
-
-            blob1.fini()
-
-            fp2 = os.path.join(dir, 'test2.blob')
-            blob2 = s_blobfile.BlobFile(fp2, isclone=True)
-            self.true(blob2.isclone)
-            # Using sync() here for test coverage
-            for msg in msgs:
-                blob2.sync(msg)
-
-            self.eq(blob0.readoff(off0, 8), blob2.readoff(off0, 8))
-            self.eq(blob0.readoff(off1, 8), blob2.readoff(off1, 8))
-
-            self.eq(blob0.size(), blob2.size())
-            self.eq(walks, [t for t in blob2.walk()])
-
-            blob2.fini()
-
-            blob0.fini()
-
-    def test_blob_readiter(self):
-        #self.thisHostMust(platform='linux')
-
-        with self.getTestDir() as dir:
-            fp = os.path.join(dir, 'test.blob')
-            with s_blobfile.BlobFile(fp) as blob:
-
-                rand = os.urandom(2048)
-                off = blob.alloc(2048)
-                blob.writeoff(off, rand)
-
-                blocks = [b for b in blob.readiter(off, 2048, itersize=9)]
-                byts = b''.join(blocks)
-
-                self.eq(rand, byts)
-
-    def test_blob_walk(self):
-        edata = {}
-        with self.getTestDir() as fdir:
-            fp = os.path.join(fdir, 'test.blob')
-            bkup_fp = fp + '.bak'
-
-            with s_blobfile.BlobFile(fp) as blob:  # type: s_blobfile.BlobFile
-
-                off0 = blob.alloc(8)
-                off1 = blob.alloc(8)
-
-                edata[off0] = b'asdfqwer'
-                edata[off1] = b'hehehaha'
-
-                # do interlaced writes
-                blob.writeoff(off0, b'asdf')
-                blob.writeoff(off1, b'hehe')
-
-                blob.writeoff(off0 + 4, b'qwer')
-                blob.writeoff(off1 + 4, b'haha')
-                # self.eq(blob.readoff(off1, 8), edata[off1])
-                # Do a large write
-                off2 = blob.alloc(1024)
-                byts = 512 * b':)'
-                blob.writeoff(off2, byts)
-                edata[off2] = byts
-
-            # Backup the file
-            shutil.copy(fp, bkup_fp)
-
-            with s_blobfile.BlobFile(fp) as blob:  # type: s_blobfile.BlobFile
-                dcheck = {}
-
-                def data_check(fd, baseoff, off, size):
-                    fd.seek(off)
-                    byts = fd.read(size)
-                    dcheck[off] = byts == edata.get(off)
-
-                for offset, size in blob.walk():
-                    dcheck[offset] = blob.readoff(offset, size) == edata.get(offset)
-
-                self.eq({True}, set(dcheck.values()))
-
-            # Restore backup file
-            shutil.copy(bkup_fp, fp)
-            fd = genfile(fp)
-
-            # Truncate the file short
-            fd.seek(-2, os.SEEK_END)
-            fd.truncate()
-            fd.close()
-
-            with s_blobfile.BlobFile(fp) as blob:  # type: s_blobfile.BlobFile
-                with self.assertRaises(BadBlobFile) as cm:
-                    for offset, size in blob.walk():
-                        pass
-                self.isin('BlobFile truncated', str(cm.exception))
-
-            # Restore backup file
-            shutil.copy(bkup_fp, fp)
-            fd = genfile(fp)
-
-            # Add bytes
-            fd.seek(0, os.SEEK_END)
-            fd.write(b':(')
-            fd.close()
-
-            with s_blobfile.BlobFile(fp) as blob:  # type: s_blobfile.BlobFile
-                with self.assertRaises(BadBlobFile) as cm:
-                    for offset, size in blob.walk():
-                        pass
-                self.isin('failed to read/unpack header', str(cm.exception))
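The write side can be mirrored just as compactly. Below is a minimal sketch, under the same assumed '<Q' header layout, of the grow-then-write scheme that _alloc()/writeoff() implemented and that blobfile_basic_assumptions() checks (three 8-byte allocations yield a file of exactly 3 * (headsize + 8) bytes); alloc_record is a hypothetical name, not a synapse API.

```python
import os
import struct

HEADFMT = '<Q'  # same assumed header format as the reader sketch above
HEADSIZE = struct.calcsize(HEADFMT)

def alloc_record(fd, size):
    '''
    Append a size header, reserve size payload bytes, and return the
    payload offset. Mirrors the deleted BlobFile._alloc(): the file grows
    by HEADSIZE + size, and data is written into place afterward.
    '''
    fd.seek(0, os.SEEK_END)
    baseoff = fd.tell()
    fd.write(struct.pack(HEADFMT, size))
    fd.write(b'\x00' * size)    # reserve the payload region
    return baseoff + HEADSIZE

# Mirror of blobfile_basic_assumptions(): three 8 byte records produce
# a file of exactly 3 * (HEADSIZE + 8) bytes.
with open('test.blob', 'wb+') as fd:
    offs = [alloc_record(fd, 8) for _ in range(3)]
    fd.seek(offs[0])
    fd.write(b'asdfqwer')       # in-place payload write, like writeoff()

assert os.path.getsize('test.blob') == 3 * (HEADSIZE + 8)
```

Allocation only ever appends, which is why the deleted walk() could assume records are densely packed from offset 0 to the end of the file.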