diff --git a/synapse/cores/lmdb.py b/synapse/cores/lmdb.py
deleted file mode 100644
index 078d8121..00000000
--- a/synapse/cores/lmdb.py
+++ /dev/null
@@ -1,819 +0,0 @@
-import sys
-import struct
-import logging
-import functools
-from binascii import unhexlify
-
-from contextlib import contextmanager
-from threading import Lock
-
-import synapse.common as s_common
-import synapse.datamodel as s_datamodel
-
-import synapse.cores.xact as s_xact
-import synapse.cores.common as s_cores_common
-import synapse.cores.storage as s_cores_storage
-
-import synapse.lib.threads as s_threads
-import synapse.lib.msgpack as s_msgpack
-
-import lmdb
-import xxhash
-
-logger = logging.getLogger(__name__)
-
-# File conventions:
-# i, p, v, t: iden, prop, value, timestamp
-# i_enc, p_enc, v_key_enc, t_enc: above encoded to be space efficient and fully-ordered when
-# compared lexicographically (i.e. 'aaa' < 'ba')
-# pk: primary key, unique identifier of the row in the main table
-# pk_enc: space efficient encoding of pk for use as value in an index table
-
-# N.B. LMDB calls the separate namespaces in a file 'databases' (e.g. named parameter db=), just
-# like Berkeley DB.
-
-# Largest primary key value. No more rows than this
-MAX_PK = sys.maxsize
-
-# Prefix to indicate that a v is a nonnegative value
-NONNEGATIVE_VAL_MARKER = 0
-
-# Prefix to indicate that a v is a negative value
-NEGATIVE_VAL_MARKER = -1
-
-# Prefix to indicate that a v is a string
-STRING_VAL_MARKER = -2
-
-# Prefix to indicate that a v is a hash of a string
-HASH_VAL_MARKER = -3
-
-# The negative marker encoded
-NEGATIVE_VAL_MARKER_ENC = s_msgpack.en(NEGATIVE_VAL_MARKER)
-
-# The string marker encoded
-STRING_VAL_MARKER_ENC = s_msgpack.en(STRING_VAL_MARKER)
-
-# The hash marker encoded
-HASH_VAL_MARKER_ENC = s_msgpack.en(HASH_VAL_MARKER)
-
-# Number of bytes in a UUID
-UUID_SIZE = 16
-
-# An index key can't ever be larger (lexicographically) than this
-MAX_INDEX_KEY = b'\xff' * 20
-
-# String vals of this size or larger will be truncated and hashed in index. What this means is
-# that comparison on large string vals requires retrieving the row from the main table
-LARGE_STRING_SIZE = 128
-
-# Largest length allowed for a prop
-MAX_PROP_LEN = 350
-
-# Smallest and largest values for an integer value. Matches sqlite3
-MAX_INT_VAL = 2 ** 63 - 1
-MIN_INT_VAL = -1 * (2 ** 63)
-
-# The maximum possible timestamp. Probably a bit overkill
-MAX_TIME_ENC = s_msgpack.en(MAX_INT_VAL)
-
-# Index Names
-BLOB_STORE = b'blob_store'
-ROWS = b'rows'
-IDEN_PROP_INDEX = b'ip'
-PROP_VAL_TIME_INDEX = b'pvt'
-PROP_TIME_INDEX = b'pt'
-
-def _round_up(val, modulus):
-    return val - val % -modulus
-
-def _encValKey(v):
-    '''
-    Encode a value as used in a key.
-
-    Non-negative numbers are msgpack encoded. Negative numbers are encoded as a marker, then the
-    encoded negative of that value, so that the ordering of the encodings is easily mapped to the
-    ordering of the negative numbers. Strings that are too long are hashed. Note that this scheme
-    prevents interleaving of value types: all string encodings compare larger than all negative
-    number encodings, which in turn compare larger than all nonnegative encodings.
- ''' - if isinstance(v, int): - if v >= 0: - return s_msgpack.en(v) - else: - return NEGATIVE_VAL_MARKER_ENC + s_msgpack.en(-v) - else: - if len(v) >= LARGE_STRING_SIZE: - return (HASH_VAL_MARKER_ENC + s_msgpack.en(xxhash.xxh64(v).intdigest())) - else: - return STRING_VAL_MARKER_ENC + s_msgpack.en(v) - -# Really just want to memoize the last iden encoded, but there might be some multithreading, so keep -# a few more (8) -@functools.lru_cache(maxsize=8) -def _encIden(iden): - ''' Encode an iden ''' - return unhexlify(iden) - -# Try to memoize most of the prop names we get -@functools.lru_cache(maxsize=1024) -def _encProp(prop): - return s_msgpack.en(prop) - -# The precompiled struct parser for native size_t -_SIZET_ST = struct.Struct('@Q' if sys.maxsize > 2**32 else '@L') - -def _encPk(pk): - ''' Encode for integerkey row DB option: as a native size_t ''' - return _SIZET_ST.pack(pk) - -def _decPk(pk_enc): - ''' Inverse of above ''' - return _SIZET_ST.unpack(pk_enc)[0] - -def _calcFirstLastKeys(prop, valu, mintime, maxtime): - ''' - Returns the encoded bytes for the start and end keys to the pt or pvt - index. Helper function for _{get,del}RowsByProp - ''' - p_enc = _encProp(prop) - v_key_enc = b'' if valu is None else _encValKey(valu) - v_is_hashed = valu is not None and (v_key_enc[0] == HASH_VAL_MARKER_ENC) - if mintime is None and maxtime is None: - return (p_enc + v_key_enc, None, v_is_hashed, True) - mintime_enc = b'' if mintime is None else s_msgpack.en(mintime) - maxtime_enc = MAX_TIME_ENC if maxtime is None else s_msgpack.en(maxtime) - - first_key = p_enc + v_key_enc + mintime_enc - last_key = p_enc + v_key_enc + maxtime_enc - return (first_key, last_key, v_is_hashed, False) - -def initLmdbCortex(link, conf=None, storconf=None): - ''' - Initialize a LMDB based Cortex from a link tufo. - - Args: - link ((str, dict)): Link tufo. - conf (dict): Configable opts for the Cortex object. - storconf (dict): Configable opts for the storage object. - - Returns: - s_cores_common.Cortex: Cortex created from the link tufo. 
- ''' - if not conf: - conf = {} - if not storconf: - storconf = {} - - store = LmdbStorage(link, **storconf) - return s_cores_common.Cortex(link, store, **conf) - -class LmdbXact(s_xact.StoreXact): - - def _coreXactInit(self, size=None): - self.txn = None - - def _coreXactCommit(self): - self.txn.commit() - - def _coreXactBegin(self): - self.store._ensure_map_slack() - self.txn = self.store.dbenv.begin(buffers=True, write=True) - - def _coreXactAcquire(self): - self.store._write_lock.acquire() - - def _coreXactRelease(self): - self.store._write_lock.release() - -class LmdbStorage(s_cores_storage.Storage): - - def _initCoreStor(self): - self._initDbConn() - self._initCorTables() - - def _initDbInfo(self): - name = self._link[1].get('path')[1:] - if not name: - raise s_common.NoSuchFile('No Path Specified!') - - if name.find(':') == -1: - name = s_common.genpath(name) - - return {'name': name} - - def getStoreXact(self, size=None, core=None): - return LmdbXact(self, size=size, core=core) - - def _getLargestPk(self): - with self._getTxn() as txn, txn.cursor(self.rows) as cursor: - if not cursor.last(): - return 0 # db is empty - return _decPk(cursor.key()) - - def _checkForTable(self, table): - with self._getTxn() as txn: - ret = txn.get(table) - if ret: - return True - return False - - def _initCorTables(self): - - revs = [ - (0, self._rev0) - ] - - max_rev = max([rev for rev, func in revs]) - vsn_str = 'syn:core:{}:version'.format(self.getStoreType()) - - if not self._checkForTable(ROWS): - # We are a new cortex, stamp in tables and set - # blob values and move along. - self._initCorTable() - self.setBlobValu(vsn_str, max_rev) - return - # We're an existing cortex, strap in all currently required tables, - # then start applying revision functions. The revision functions will - # then be responsible for creating handles to any indexes/tables that - # they will need in order to do their jobs. - self._initCorTable() - # Apply storage layer revisions - self._revCorVers(revs) - - def _initCorTable(self): - ''' - Makes the core LMDB tables. This should always create the tables needed for - operation of the LMDB cortex for the current storage version. - ''' - # Make the main storage table, keyed by an incrementing counter, pk - # LMDB has an optimization (integerkey) if all the keys in a table are unsigned size_t. - self.rows = self.dbenv.open_db(key=ROWS, integerkey=True) # pk -> i,p,v,t - - # Note there's another LMDB optimization ('dupfixed') we're not using that we could - # in the index tables. It would pay off if a large proportion of keys are duplicates. - - # Make the iden-prop index table, keyed by iden-prop, with value being a pk - self.index_ip = self.dbenv.open_db(key=IDEN_PROP_INDEX, dupsort=True) # i,p -> pk - - # Make the prop-val-timestamp index table, with value being a pk - self.index_pvt = self.dbenv.open_db(key=PROP_VAL_TIME_INDEX, dupsort=True) # p,v,t -> pk - - # Make the prop-timestamp index table, with value being a pk - self.index_pt = self.dbenv.open_db(key=PROP_TIME_INDEX, dupsort=True) # p, t -> pk - - # Make the blob key/val index table, with the - self.blob_store = self.dbenv.open_db(key=BLOB_STORE) # k -> v - - # Set max values for dbs - self._setMaxKey() - - def _setMaxKey(self): - ''' - Put 1 max key sentinel at the end of each index table. This avoids unfortunate behavior - where the cursor moves backwards after deleting the final record. 
-        '''
-        with self._getTxn(write=True) as txn:
-            for db in (self.index_ip, self.index_pvt, self.index_pt):
-                txn.put(MAX_INDEX_KEY, b'', db=db)
-            # One more sentinel for going backwards through the pvt table.
-            txn.put(b'\x00', b'', db=self.index_pvt)
-
-        # Find the largest stored pk. We just track this in memory from now on.
-        largest_pk = self._getLargestPk()
-        if largest_pk == MAX_PK:
-            raise s_common.HitCoreLimit(name='MAX_PK', size=MAX_PK, mesg='Out of primary key values')
-
-        self.next_pk = largest_pk + 1
-
-    def _rev0(self):
-        # Simple rev0 function stub.
-        # If we're here, we're clearly an existing
-        # cortex and we need to have this valu set.
-        self.setBlobValu('syn:core:created', s_common.now())
-
-    @contextmanager
-    def _getTxn(self, write=False):
-        '''
-        Acquires a transaction.
-
-        LMDB doesn't have the concept of store access without a transaction, so figure out
-        whether there's already one open and use that, else make one. If we found an existing
-        transaction, this doesn't close it after leaving the context. If we made one and the
-        context is exited without exception, the transaction is committed.
-        '''
-        existing_xact = self._store_xacts.get(s_threads.iden())
-        if existing_xact is not None:
-            yield existing_xact.txn
-        else:
-            if write:
-                with self._write_lock:
-                    self._ensure_map_slack()
-                    with self.dbenv.begin(buffers=True, write=True) as txn:
-                        yield txn
-            else:
-                with self.dbenv.begin(buffers=True, write=False) as txn:
-                    yield txn
-
-    def _ensure_map_slack(self):
-        '''
-        Checks if there's enough extra space in the map to accommodate a commit of at least
-        self._map_slack size and increases it if not.
-        '''
-        # Don't change map size on a 32-bit interpreter. set_mapsize failure will lead to seg fault,
-        # so avoid it altogether
-        if sys.maxsize <= 2**32:
-            return
-
-        # Figure out how much space the DB is using
-        used = 4096 * self.dbenv.info()['last_pgno']
-
-        # Round up to the next multiple of _map_slack
-        target_size = min(self._max_map_size, _round_up(used + self._map_slack, self._map_slack))
-
-        # Increase map size if necessary
-        if target_size > self._map_size:
-            self.dbenv.set_mapsize(target_size)
-            self._map_size = target_size
-
-    def _initDbConn(self):
-        dbinfo = self._initDbInfo()
-        dbname = dbinfo.get('name')
-
-        # Initial DB Size. Must be < 2 GiB for 32-bit. Can be big for 64-bit systems. Will create
-        # a file of that size. On Windows, will actually immediately take up that much
-        # disk space.
-        DEFAULT_MAP_SIZE = 512 * 1024 * 1024
-
-        # _write_lock exists solely to hold off other threads' write transactions long enough to
-        # potentially increase the map size.
-        self._write_lock = Lock()
-
-        map_size = self._link[1].get('lmdb:mapsize', DEFAULT_MAP_SIZE)
-        self._map_size, _ = s_datamodel.getTypeNorm('int', map_size)
-        self._max_map_size = 2**46 if sys.maxsize > 2**32 else 2**30
-
-        map_slack = self._link[1].get('lmdb:mapslack', 2 ** 30)
-        self._map_slack, _ = s_datamodel.getTypeNorm('int', map_slack)
-
-        # Maximum number of 'databases', really tables. We use 5 different tables (1 main plus
-        # 3 indices and a blob store), + 10 tables for possible migration use cases.
-        MAX_DBS = 5 + 10
-
-        # flush system buffers to disk only once per transaction.
Set to False can lead to last - # transaction loss, but not corruption - - metasync_val = self._link[1].get('lmdb:metasync', False) - metasync, _ = s_datamodel.getTypeNorm('bool', metasync_val) - metasync = (metasync == 1) - - # If sync is False, could lead to database corruption on power loss - sync_val = self._link[1].get('lmdb:sync', True) - sync, _ = s_datamodel.getTypeNorm('bool', sync_val) - sync = (sync == 1) - - # Write data directly to mapped memory - WRITEMAP = True - - # Doesn't create a subdirectory for storage files - SUBDIR = False - - # We can disable locking, but bad things might happen if we have multiple threads - DEFAULT_LOCK = True - lock_val = self._link[1].get('lmdb:lock', DEFAULT_LOCK) - lock, _ = s_datamodel.getTypeNorm('bool', lock_val) - lock = (lock == 1) - - # Maximum simultaneous readers. - MAX_READERS = 4 - max_readers = self._link[1].get('lmdb:maxreaders', MAX_READERS) - max_readers, _ = s_datamodel.getTypeNorm('int', max_readers) - if max_readers == 1: - lock = False - - self.dbenv = lmdb.Environment(dbname, - map_size=self._map_size, - subdir=SUBDIR, - metasync=metasync, - writemap=WRITEMAP, - max_readers=max_readers, - max_dbs=MAX_DBS, - sync=sync, - lock=lock) - - # Check we're not running a weird version of LMDB - if self.dbenv.stat()['psize'] != 4096: - raise s_common.BadCoreStore(store='lmdb', mesg='Unknown version of lmdb configured') - - # Ensure we have enough room in the map for expansion - self._ensure_map_slack() - - def onfini(): - self.dbenv.close() - self.onfini(onfini) - - def _addRows(self, rows): - ''' - Adds a bunch of rows to the database - - Take care: this was written this way for performance, in particular when len(rows) is - large. - ''' - encs = [] - - with self._getTxn(write=True) as txn: - next_pk = self.next_pk - - # First, we encode all the i, p, v, t for all rows - for i, p, v, t in rows: - if next_pk > MAX_PK: - raise s_common.HitCoreLimit(name='MAX_PK', size=MAX_PK, mesg='Out of primary key values') - if len(p) > MAX_PROP_LEN: - raise s_common.HitCoreLimit(name='MAX_PROP_LEN', size=MAX_PROP_LEN, mesg='Property length too large') - i_enc = _encIden(i) - p_enc = _encProp(p) - v_key_enc = _encValKey(v) - t_enc = s_msgpack.en(t) - pk_enc = _encPk(next_pk) - row_enc = s_msgpack.en((i, p, v, t)) - - # idx 0 1 2 3 4 5 - encs.append((i_enc, p_enc, row_enc, t_enc, v_key_enc, pk_enc)) - next_pk += 1 - - # An iterator of what goes into the main table: key=pk_enc, val=encoded(i, p, v, t) - kvs = ((x[5], x[2]) for x in encs) - - # Shove it all in at once - consumed, added = txn.cursor(self.rows).putmulti(kvs, overwrite=False, append=True) - if consumed != added or consumed != len(encs): - # Will only fail if record already exists, which should never happen - raise s_common.BadCoreStore(store='lmdb', mesg='unexpected pk in DB') - - # Update the indices for all rows - kvs = ((x[0] + x[1], x[5]) for x in encs) - txn.cursor(self.index_ip).putmulti(kvs, dupdata=True) - kvs = ((x[1] + x[4] + x[3], x[5]) for x in encs) - txn.cursor(self.index_pvt).putmulti(kvs, dupdata=True) - kvs = ((x[1] + x[3], x[5]) for x in encs) - txn.cursor(self.index_pt).putmulti(kvs, dupdata=True) - - # self.next_pk should be protected from multiple writers. Luckily lmdb write lock does - # that for us. 
- self.next_pk = next_pk - - def _getRowByPkValEnc(self, txn, pk_enc): - row = txn.get(pk_enc, db=self.rows) - if row is None: - raise s_common.BadCoreStore(store='lmdb', mesg='Index val has no corresponding row') - return s_msgpack.un(row) - - def getRowsById(self, iden): - iden_enc = _encIden(iden) - rows = [] - with self._getTxn() as txn, txn.cursor(self.index_ip) as cursor: - if not cursor.set_range(iden_enc): - raise s_common.BadCoreStore(store='lmdb', mesg='Missing sentinel') - for key, pk_enc in cursor: - if key[:len(iden_enc)] != iden_enc: - break - rows.append(self._getRowByPkValEnc(txn, pk_enc)) - - return rows - - def _delRowsById(self, iden): - i_enc = _encIden(iden) - - with self._getTxn(write=True) as txn, txn.cursor(self.index_ip) as cursor: - # Get the first record >= i_enc - if not cursor.set_range(i_enc): - raise s_common.BadCoreStore(store='lmdb', mesg='Missing sentinel') - while True: - # We don't use iterator here because the delete already advances to the next - # record - key, value = cursor.item() - if key[:len(i_enc)] != i_enc: - return - p_enc = key[len(i_enc):].tobytes() - # Need to copy out with tobytes because we're deleting - pk_enc = value.tobytes() - - if not cursor.delete(): - raise s_common.BadCoreStore(store='lmdb', mesg='Delete failure') - self._delRowAndIndices(txn, pk_enc, i_enc=i_enc, p_enc=p_enc, - delete_ip=False) - - def _delRowsByIdProp(self, iden, prop, valu=None): - i_enc = _encIden(iden) - p_enc = _encProp(prop) - first_key = i_enc + p_enc - - with self._getTxn(write=True) as txn, txn.cursor(self.index_ip) as cursor: - # Retrieve and delete I-P index - if not cursor.set_range(first_key): - raise s_common.BadCoreStore(store='lmdb', mesg='Missing sentinel') - while True: - # We don't use iterator here because the delete already advances to the next - # record - key, value = cursor.item() - if key[:len(first_key)] != first_key: - return - # Need to copy out with tobytes because we're deleting - pk_enc = value.tobytes() - - # Delete the row and the other indices - if not self._delRowAndIndices(txn, pk_enc, i_enc=i_enc, p_enc=p_enc, - delete_ip=False, only_if_val=valu): - if not cursor.next(): - raise s_common.BadCoreStore(store='lmdb', mesg='Missing sentinel') - else: - if not cursor.delete(): - raise s_common.BadCoreStore(store='lmdb', mesg='Delete failure') - - def _delRowAndIndices(self, txn, pk_enc, i_enc=None, p_enc=None, v_key_enc=None, t_enc=None, - delete_ip=True, delete_pvt=True, delete_pt=True, only_if_val=None): - ''' Deletes the row corresponding to pk_enc and the indices pointing to it ''' - with txn.cursor(self.rows) as cursor: - if not cursor.set_key(pk_enc): - raise s_common.BadCoreStore(store='lmdb', mesg='Missing PK') - i, p, v, t = s_msgpack.un(cursor.value()) - - if only_if_val is not None and only_if_val != v: - return False - cursor.delete() - - if delete_ip and i_enc is None: - i_enc = _encIden(i) - - if p_enc is None: - p_enc = _encProp(p) - - if delete_pvt and v_key_enc is None: - v_key_enc = _encValKey(v) - - if (delete_pvt or delete_pt) and t_enc is None: - t_enc = s_msgpack.en(t) - - if delete_ip: - # Delete I-P index entry - if not txn.delete(i_enc + p_enc, value=pk_enc, db=self.index_ip): - raise s_common.BadCoreStore(store='lmdb', mesg='Missing I-P index') - - if delete_pvt: - # Delete P-V-T index entry - if not txn.delete(p_enc + v_key_enc + t_enc, value=pk_enc, db=self.index_pvt): - raise s_common.BadCoreStore(store='lmdb', mesg='Missing P-V-T index') - - if delete_pt: - # Delete P-T index entry - if not 
txn.delete(p_enc + t_enc, value=pk_enc, db=self.index_pt): - raise s_common.BadCoreStore(store='lmdb', mesg='Missing P-T index') - - return True - - def getRowsByIdProp(self, iden, prop, valu=None): - # For now not making a ipv index because multiple v for a given i,p are probably rare - iden_enc = _encIden(iden) - prop_enc = _encProp(prop) - - first_key = iden_enc + prop_enc - - ret = [] - with self._getTxn() as txn, txn.cursor(self.index_ip) as cursor: - if not cursor.set_range(first_key): - raise s_common.BadCoreStore(store='lmdb', mesg='Missing sentinel') - for key, value in cursor: - if key.tobytes() != first_key: - return ret - row = self._getRowByPkValEnc(txn, value) - if valu is not None and row[2] != valu: - continue - ret.append(row) - raise s_common.BadCoreStore(store='lmdb', mesg='Missing sentinel') - - def getSizeByProp(self, prop, valu=None, limit=None, mintime=None, maxtime=None): - return self.getRowsByProp(prop, valu, limit, mintime, maxtime, do_count_only=True) - - def getRowsByProp(self, prop, valu=None, limit=None, mintime=None, maxtime=None, - do_count_only=False): - indx = self.index_pt if valu is None else self.index_pvt - first_key, last_key, v_is_hashed, do_fast_compare = _calcFirstLastKeys(prop, valu, - mintime, maxtime) - - count = 0 - rows = [] - - with self._getTxn() as txn, txn.cursor(indx) as cursor: - if not cursor.set_range(first_key): - raise s_common.BadCoreStore(store='lmdb', mesg='Missing sentinel') - while True: - if limit is not None and count >= limit: - break - key, pk_enc = cursor.item() - if do_fast_compare: - if key[:len(first_key)] != first_key: - break - else: - if key.tobytes() >= last_key: - break - if v_is_hashed or not do_count_only: - row = self._getRowByPkValEnc(txn, pk_enc) - if v_is_hashed: - if row[2] != valu: - continue - if not do_count_only: - rows.append(row) - count += 1 - if not cursor.next(): - raise s_common.BadCoreStore(store='lmdb', mesg='Missing sentinel') - - return count if do_count_only else rows - - def _delRowsByProp(self, prop, valu=None, mintime=None, maxtime=None): - indx = self.index_pt if valu is None else self.index_pvt - first_key, last_key, v_is_hashed, do_fast_compare = _calcFirstLastKeys(prop, valu, - mintime, maxtime) - with self._getTxn(write=True) as txn, txn.cursor(indx) as cursor: - if not cursor.set_range(first_key): - raise s_common.BadCoreStore(store='lmdb', mesg='Missing sentinel') - while True: - key, pk_enc = cursor.item() - if do_fast_compare: - if key[:len(first_key)] != first_key: - break - else: - if key.tobytes() >= last_key: - break - - if self._delRowAndIndices(txn, pk_enc, - delete_pt=(valu is not None), - delete_pvt=(valu is None), - only_if_val=(valu if v_is_hashed else None)): - # Delete did go through: delete entry at cursor - if not cursor.delete(): - raise s_common.BadCoreStore(store='lmdb', mesg='Delete failure') - else: - # Delete didn't go through: advance to next - if not cursor.next(): - raise s_common.BadCoreStore(store='lmdb', mesg='Missing sentinel') - - def sizeByGe(self, prop, valu, limit=None): - return self._rowsByMinmax(prop, valu, MAX_INT_VAL, limit, right_closed=True, - do_count_only=True) - - def rowsByGe(self, prop, valu, limit=None): - return self._rowsByMinmax(prop, valu, MAX_INT_VAL, limit, right_closed=True) - - def sizeByLe(self, prop, valu, limit=None): - return self._rowsByMinmax(prop, MIN_INT_VAL, valu, limit, right_closed=True, - do_count_only=True) - - def sizeByLt(self, prop, valu, limit=None): - return self._rowsByMinmax(prop, MIN_INT_VAL, valu, limit, 
right_closed=False, - do_count_only=True) - - def rowsByLe(self, prop, valu, limit=None): - return self._rowsByMinmax(prop, MIN_INT_VAL, valu, limit, right_closed=True) - - def sizeByRange(self, prop, valu, limit=None): - return self._rowsByMinmax(prop, valu[0], valu[1], limit, do_count_only=True) - - def rowsByRange(self, prop, valu, limit=None): - return self._rowsByMinmax(prop, valu[0], valu[1], limit) - - def _joinsByLe(self, prop, valu, limit=None): - rows = self._rowsByMinmax(prop, MIN_INT_VAL, valu, limit, right_closed=True) - return rows - - def _joinsByGe(self, prop, valu, limit=None): - rows = self._rowsByMinmax(prop, valu, MAX_INT_VAL, limit, right_closed=True) - return rows - - def _rowsByMinmax(self, prop, minval, maxval, limit, right_closed=False, do_count_only=False): - ''' Returns either count or actual rows for a range of prop vals where both min and max - may be closed (included) or open (not included) ''' - if minval > maxval: - return 0 - do_neg_search = (minval < 0) - do_pos_search = (maxval >= 0) - ret = 0 if do_count_only else [] - - p_enc = _encProp(prop) - - # The encodings of negative integers and positive integers are not continuous, so we split - # into two queries. Also, the ordering of the encoding of negative integers is backwards. - if do_neg_search: - # We include the right boundary (-1) if we're searching through to the positives - this_right_closed = do_pos_search or right_closed - first_val = minval - last_val = min(-1, maxval) - ret += self._subrangeRows(p_enc, first_val, last_val, limit, this_right_closed, - do_count_only) - if limit is not None: - limit -= ret if do_count_only else len(ret) - if limit == 0: - return ret - - if do_pos_search: - first_val = max(0, minval) - last_val = maxval - ret += self._subrangeRows(p_enc, first_val, last_val, limit, right_closed, - do_count_only) - return ret - - def _subrangeRows(self, p_enc, first_val, last_val, limit, right_closed, do_count_only): - ''' Performs part of a range query, either completely negative or non-negative ''' - first_key = p_enc + _encValKey(first_val) - - am_going_backwards = (first_val < 0) - - last_key = p_enc + _encValKey(last_val) - - ret = [] - count = 0 - - # Figure out the terminating condition of the loop - if am_going_backwards: - term_cmp = bytes.__lt__ if right_closed else bytes.__le__ - else: - term_cmp = bytes.__gt__ if right_closed else bytes.__ge__ - - with self._getTxn() as txn, txn.cursor(self.index_pvt) as cursor: - if not cursor.set_range(first_key): - raise s_common.BadCoreStore(store='lmdb', mesg='Missing sentinel') - if am_going_backwards: - # set_range sets the cursor at the first key >= first_key, if we're going backwards - # we actually want the first key <= first_key - if cursor.key()[:len(first_key)].tobytes() > first_key: - if not cursor.prev(): - raise s_common.BadCoreStore(store='lmdb', mesg='Missing sentinel') - it = cursor.iterprev(keys=True, values=True) - else: - it = cursor.iternext(keys=True, values=True) - - for key, value in it: - if term_cmp(key[:len(last_key)].tobytes(), last_key): - break - if limit is not None and count >= limit: - break - if not do_count_only: - ret.append(self._getRowByPkValEnc(txn, value)) - count += 1 - - return count if do_count_only else ret - - def _genStoreRows(self, **kwargs): - gsize = kwargs.get('gsize', 1000) - lifted = 0 - with self._getTxn() as txn: # type: lmdb.Transaction - with txn.cursor(db=self.rows) as cur: # type: lmdb.Cursor - cur.first() - gen = cur.iternext(keys=False, values=True) - while True: - rows = [] - 
                    for row in gen:
-                        row = s_msgpack.un(row)
-                        rows.append(row)
-                        if len(rows) == gsize:
-                            break
-                    if rows:
-                        lifted += len(rows)
-                        yield rows
-                    else:
-                        break
-
-    def getStoreType(self):
-        return 'lmdb'
-
-    def _getBlobValu(self, key):
-        key_byts = s_msgpack.en(key.encode('utf-8'))
-        with self._getTxn() as txn:  # type: lmdb.Transaction
-            ret = txn.get(key_byts, default=None, db=self.blob_store)
-        return ret
-
-    def _setBlobValu(self, key, valu):
-        key_byts = s_msgpack.en(key.encode('utf-8'))
-        with self._getTxn(write=True) as txn:  # type: lmdb.Transaction
-            txn.put(key_byts, valu, overwrite=True, db=self.blob_store)
-
-    def _hasBlobValu(self, key):
-        ret = self._getBlobValu(key)
-        if ret is None:
-            return False
-        return True
-
-    def _delBlobValu(self, key):
-        key_byts = s_msgpack.en(key.encode('utf-8'))
-        with self._getTxn(write=True) as txn:  # type: lmdb.Transaction
-            ret = txn.pop(key_byts, db=self.blob_store)
-
-        if ret is None:  # pragma: no cover
-            # We should never get here, but if we do, throw an exception.
-            raise s_common.NoSuchName(name=key, mesg='Cannot delete key which is not present in the blobstore.')
-        return ret
-
-    def _getBlobKeys(self):
-        with self._getTxn(write=True) as txn:  # type: lmdb.Transaction
-            cur = txn.cursor(self.blob_store)  # type: lmdb.Cursor
-            cur.first()
-            ret = [s_msgpack.un(key).decode('utf-8') for key in cur.iternext(values=False)]
-        return ret
diff --git a/synapse/cores/postgres.py b/synapse/cores/postgres.py
deleted file mode 100644
index 21e1d5f7..00000000
--- a/synapse/cores/postgres.py
+++ /dev/null
@@ -1,189 +0,0 @@
-import time
-import hashlib
-
-import synapse.common as s_common
-import synapse.datamodel as s_datamodel
-
-import synapse.cores.common as s_cores_common
-import synapse.cores.sqlite as s_cores_sqlite
-
-def md5(x):
-    return hashlib.md5(x.encode('utf8')).hexdigest()
-
-def initPsqlCortex(link, conf=None, storconf=None):
-    '''
-    Initialize a PostgreSQL based Cortex from a link tufo.
-
-    Args:
-        link ((str, dict)): Link tufo.
-        conf (dict): Configable opts for the Cortex object.
-        storconf (dict): Configable opts for the storage object.
-
-    Returns:
-        s_cores_common.Cortex: Cortex created from the link tufo.
-    '''
-    if not conf:
-        conf = {}
-    if not storconf:
-        storconf = {}
-
-    store = PsqlStorage(link, **storconf)
-    return s_cores_common.Cortex(link, store, **conf)
-
-class PsqlStorage(s_cores_sqlite.SqliteStorage):
-
-    dblim = None
-
-    # postgres uses BYTEA instead of BLOB
-    _t_init_blobtable = '''
-    CREATE TABLE {{BLOB_TABLE}} (
-        k VARCHAR,
-        v BYTEA
-    );
-    '''
-    # postgres upsert!
- _t_blob_set = 'INSERT INTO {{BLOB_TABLE}} (k, v) VALUES ({{KEY}}, {{VALU}}) ON CONFLICT (k) DO UPDATE SET ' \ - 'v={{VALU}}' - - # postgres over-rides for md5() based indexing - _t_init_strval_idx = 'CREATE INDEX {{TABLE}}_strval_idx ON {{TABLE}} (prop,MD5(strval),tstamp)' - - _t_getrows_by_prop_str = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}) LIMIT {{LIMIT}}' - _t_getrows_by_prop_str_wmin = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}) AND tstamp >= {{MINTIME}} LIMIT {{LIMIT}}' - _t_getrows_by_prop_str_wmax = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}) AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}}' - _t_getrows_by_prop_str_wminmax = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}) AND tstamp >= {{MINTIME}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}}' - _t_getsize_by_prop_str = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}) LIMIT {{LIMIT}}' - _t_getsize_by_prop_str_wmin = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}) AND tstamp>={{MINTIME}} LIMIT {{LIMIT}}' - _t_getsize_by_prop_str_wmax = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}) AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}}' - _t_getsize_by_prop_str_wminmax = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}) AND tstamp>={{MINTIME}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}}' - _t_delrows_by_prop_str = 'DELETE FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}})' - _t_delrows_by_prop_str_wmin = 'DELETE FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}) AND tstamp>={{MINTIME}}' - _t_delrows_by_prop_str_wmax = 'DELETE FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}) AND tstamp<{{MAXTIME}}' - _t_delrows_by_prop_str_wminmax = 'DELETE FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}) AND tstamp>={{MINTIME}} AND tstamp<{{MAXTIME}}' - _t_getjoin_by_prop_str = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}) LIMIT {{LIMIT}})' - _t_getjoin_by_prop_str_wmin = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}) AND tstamp>={{MINTIME}} LIMIT {{LIMIT}})' - _t_getjoin_by_prop_str_wmax = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}) AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}})' - _t_getjoin_by_prop_str_wminmax = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}) AND tstamp>={{MINTIME}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}})' - - _t_deljoin_by_prop_str = 'DELETE FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}))' - _t_deljoin_by_prop_str_wmin = 'DELETE FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}) AND tstamp>={{MINTIME}} )' - _t_deljoin_by_prop_str_wmax = 'DELETE FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}) AND tstamp<{{MAXTIME}} )' - _t_deljoin_by_prop_str_wminmax = 'DELETE FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND MD5(strval)=MD5({{VALU}}) AND tstamp>={{MINTIME}} AND tstamp<{{MAXTIME}} )' - - _t_istable = ''' - SELECT 1 - FROM information_schema.tables - WHERE table_name = {{NAME}} - 
''' - - _t_getjoin_by_in_int = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} and intval IN {{VALU}} LIMIT {{LIMIT}})' - _t_getjoin_by_in_str = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} and MD5(strval) IN {{VALU}} LIMIT {{LIMIT}})' - - _t_getrows_by_idens = 'SELECT * FROM {{TABLE}} WHERE iden IN {{VALU}}' - - _t_uprows_by_prop_val_str = 'UPDATE {{TABLE}} SET strval={{NEWVALU}} WHERE prop={{PROP}} and MD5(strval)=MD5({{OLDVALU}})' - - def _initDbConn(self): - import psycopg2 - self._psycopg2 = psycopg2 - - retry = self._link[1].get('retry', 0) - - dbinfo = self._initDbInfo() - - db = None - tries = 0 - while db is None: - try: - db = psycopg2.connect(**dbinfo) - except Exception as e: # pragma: no cover - tries += 1 - if tries > retry: - raise - - time.sleep(1) - - seqscan = self._link[1].get('pg:seqscan', 0) - seqscan, _ = s_datamodel.getTypeNorm('bool', seqscan) - - c = db.cursor() - c.execute('SET enable_seqscan=%s', (seqscan,)) - c.close() - - return db - - def _getTableName(self): - path = self._link[1].get('path') - if not path: - return 'syncortex' - - parts = [p for p in path.split('/') if p] - if len(parts) <= 1: - return 'syncortex' - - return parts[1] - - def _initDbInfo(self): - - dbinfo = {} - - path = self._link[1].get('path') - if path: - parts = [p for p in path.split('/') if p] - if parts: - dbinfo['database'] = parts[0] - - host = self._link[1].get('host') - if host is not None: - dbinfo['host'] = host - - port = self._link[1].get('port') - if port is not None: - dbinfo['port'] = port - - user = self._link[1].get('user') - if user is not None: - dbinfo['user'] = user - - passwd = self._link[1].get('passwd') - if passwd is not None: - dbinfo['password'] = passwd - - return dbinfo - - def _joinsByIn(self, prop, valus, limit=None): - if len(valus) == 0: - return [] - - limit = self._getDbLimit(limit) - - if isinstance(valus[0], int): - q = self._q_getjoin_by_in_int - else: - q = self._q_getjoin_by_in_str - valus = [md5(v) for v in valus] - - rows = self.select(q, prop=prop, valu=tuple(valus), limit=limit) - return self._foldTypeCols(rows) - - def getRowsByIdens(self, idens): - if len(idens) == 0: - return [] - rows = self.select(self._q_getrows_by_idens, valu=tuple(idens)) - return self._foldTypeCols(rows) - - def _initCorQueries(self): - s_cores_sqlite.SqliteStorage._initCorQueries(self) - - self._q_getrows_by_idens = self._prepQuery(self._t_getrows_by_idens) - self._q_getjoin_by_in_int = self._prepQuery(self._t_getjoin_by_in_int) - self._q_getjoin_by_in_str = self._prepQuery(self._t_getjoin_by_in_str) - - def _addVarDecor(self, name): - return '%%(%s)s' % (name,) - - def getStoreType(self): - return 'postgres' - - def _prepBlobValu(self, valu): - return memoryview(valu) diff --git a/synapse/cores/ram.py b/synapse/cores/ram.py deleted file mode 100644 index e23f126b..00000000 --- a/synapse/cores/ram.py +++ /dev/null @@ -1,230 +0,0 @@ -import sys -import collections - -import synapse.cores.xact as s_xact -import synapse.cores.common as s_cores_common -import synapse.cores.storage as s_cores_storage - -def initRamCortex(link, conf=None, storconf=None): - ''' - Initialize a RAM based Cortex from a link tufo. - - The path element of the link tufo, if present, is used to cache the Cortex - instance. Subsequent calls with the same path will return the existing - Cortex instance. - - Args: - link ((str, dict)): Link tufo. - conf (dict): Configable opts for the Cortex object. 
- storconf (dict): Configable opts for the storage object. - - Returns: - s_cores_common.Cortex: Cortex created from the link tufo. - ''' - if not conf: - conf = {} - if not storconf: - storconf = {} - - path = link[1].get('path').strip('/') - if not path: - store = RamStorage(link, **storconf) - return s_cores_common.Cortex(link, store, **conf) - - core = ramcores.get(path) - if core is None: - store = RamStorage(link, **storconf) - core = s_cores_common.Cortex(link, store, **conf) - - ramcores[path] = core - def onfini(): - ramcores.pop(path, None) - - core.onfini(onfini) - - return core - -class RamXact(s_xact.StoreXact): - - # Ram Cortex fakes out the idea of xact... - def _coreXactBegin(self): - pass - - def _coreXactCommit(self): - pass - -class RamStorage(s_cores_storage.Storage): - - def _initCoreStor(self): - self.rowsbyid = collections.defaultdict(set) - self.rowsbyprop = collections.defaultdict(set) - self.rowsbyvalu = collections.defaultdict(set) - self._blob_store = {} - - def getStoreXact(self, size=None, core=None): - return RamXact(self, size=size, core=core) - - def _joinsByGe(self, prop, valu, limit=None): - # FIXME sortedcontainers optimizations go here - rows = self.rowsByGe(prop, valu, limit=limit) - return self.getRowsByIdens([r[0] for r in rows]) - - def _joinsByLe(self, prop, valu, limit=None): - # FIXME sortedcontainers optimizations go here - rows = self.rowsByLe(prop, valu, limit=limit) - return self.getRowsByIdens([r[0] for r in rows]) - - def sizeByRange(self, prop, valu, limit=None): - minval, maxval = valu[0], valu[1] - return sum(1 for r in self.rowsbyprop.get(prop, ()) if isinstance(r[2], int) and r[2] >= minval and r[2] < - maxval) - - def rowsByRange(self, prop, valu, limit=None): - minval, maxval = valu[0], valu[1] - # HACK: for speed - ret = [r for r in self.rowsbyprop.get(prop, ()) if isinstance(r[2], int) and r[2] >= minval and r[2] < maxval] - - if limit is not None: - ret = ret[:limit] - - return ret - - def sizeByGe(self, prop, valu, limit=None): - return sum(1 for r in self.rowsbyprop.get(prop, ()) if isinstance(r[2], int) and r[2] >= valu) - - def rowsByGe(self, prop, valu, limit=None): - return [r for r in self.rowsbyprop.get(prop, ()) if isinstance(r[2], int) and r[2] >= valu][:limit] - - def sizeByLe(self, prop, valu, limit=None): - return sum(1 for r in self.rowsbyprop.get(prop, ()) if isinstance(r[2], int) and r[2] <= valu) - - def rowsByLe(self, prop, valu, limit=None): - return [r for r in self.rowsbyprop.get(prop, ()) if isinstance(r[2], int) and r[2] <= valu][:limit] - - def _addRows(self, rows): - for row in rows: - row = (sys.intern(row[0]), sys.intern(row[1]), row[2], row[3]) - self.rowsbyid[row[0]].add(row) - self.rowsbyprop[row[1]].add(row) - self.rowsbyvalu[(row[1], row[2])].add(row) - - def _delRowsById(self, iden): - for row in self.rowsbyid.pop(iden, ()): - self._delRawRow(row) - - def _delRowsByIdProp(self, iden, prop, valu=None): - if valu is None: - rows = [row for row in self.rowsbyid.get(iden, ()) if row[1] == prop] - [self._delRawRow(row) for row in rows] - return - - rows = [row for row in self.rowsbyid.get(iden, ()) if row[1] == prop and row[2] == valu] - [self._delRawRow(row) for row in rows] - return - - def getRowsByIdProp(self, iden, prop, valu=None): - if valu is None: - return [row for row in self.rowsbyid.get(iden, ()) if row[1] == prop] - - return [row for row in self.rowsbyid.get(iden, ()) if row[1] == prop and row[2] == valu] - - def _delRowsByProp(self, prop, valu=None, mintime=None, maxtime=None): - for row 
in self.getRowsByProp(prop, valu=valu, mintime=mintime, maxtime=maxtime): - self._delRawRow(row) - - def _delRawRow(self, row): - - byid = self.rowsbyid.get(row[0]) - if byid is not None: - byid.discard(row) - - byprop = self.rowsbyprop[row[1]] - byprop.discard(row) - if not byprop: - self.rowsbyprop.pop(row[1], None) - - propvalu = (row[1], row[2]) - - byvalu = self.rowsbyvalu[propvalu] - byvalu.discard(row) - if not byvalu: - self.rowsbyvalu.pop(propvalu, None) - - def getRowsById(self, iden): - return list(self.rowsbyid.get(iden, ())) - - def getRowsByIdens(self, idens): - ret = [] - [ret.extend(self.rowsbyid.get(iden, ())) for iden in idens] - return ret - - def getRowsByProp(self, prop, valu=None, mintime=None, maxtime=None, limit=None): - - if valu is None: - rows = self.rowsbyprop.get(prop) - else: - rows = self.rowsbyvalu.get((prop, valu)) - - if rows is None: - return - - c = 0 - # This was originally a set, but sets are mutable and throw - # runtimeerrors if their size changes during iteration - for row in tuple(rows): - if limit is not None and c >= limit: - break - - if mintime is not None and row[3] < mintime: - continue - - if maxtime is not None and row[3] >= maxtime: - continue - - yield row - - c += 1 - - def getSizeByProp(self, prop, valu=None, mintime=None, maxtime=None): - if valu is None: - rows = self.rowsbyprop.get(prop) - else: - rows = self.rowsbyvalu.get((prop, valu)) - - if rows is None: - return 0 - - if mintime is not None: - rows = [row for row in rows if row[3] >= mintime] - - if maxtime is not None: - rows = [row for row in rows if row[3] < maxtime] - - return len(rows) - - def getStoreType(self): - return 'ram' - - def _getBlobValu(self, key): - ret = self._blob_store.get(key) - return ret - - def _setBlobValu(self, key, valu): - self._blob_store[key] = valu - - def _hasBlobValu(self, key): - return key in self._blob_store - - def _delBlobValu(self, key): - ret = self._blob_store.pop(key) - return ret - - def _getBlobKeys(self): - ret = list(self._blob_store.keys()) - return ret - - def _genStoreRows(self, **kwargs): - for iden, rows in self.rowsbyid.items(): - yield list(rows) - -ramcores = {} diff --git a/synapse/cores/sqlite.py b/synapse/cores/sqlite.py deleted file mode 100644 index 21fa7374..00000000 --- a/synapse/cores/sqlite.py +++ /dev/null @@ -1,863 +0,0 @@ -import queue -import logging -import sqlite3 - -import regex - -import synapse.common as s_common - -import synapse.cores.xact as s_xact -import synapse.cores.common as s_cores_common -import synapse.cores.storage as s_cores_storage - -logger = logging.getLogger(__name__) - -stashre = regex.compile('{{([A-Z]+)}}') - -int_t = type(0) -str_t = type('synapse') -none_t = type(None) - -def initSqliteCortex(link, conf=None, storconf=None): - ''' - Initialize a Sqlite based Cortex from a link tufo. - - Args: - link ((str, dict)): Link tufo. - conf (dict): Configable opts for the Cortex object. - storconf (dict): Configable opts for the storage object. - - Returns: - s_cores_common.Cortex: Cortex created from the link tufo. 
- ''' - if not conf: - conf = {} - if not storconf: - storconf = {} - - store = SqliteStorage(link, **storconf) - return s_cores_common.Cortex(link, store, **conf) - -class SqlXact(s_xact.StoreXact): - - def _coreXactInit(self): - self.db = None - self.cursor = None - - def _coreXactCommit(self): - self.cursor.execute('COMMIT') - - def _coreXactBegin(self): - self.cursor.execute('BEGIN TRANSACTION') - - def _coreXactAcquire(self): - self.db = self.store.dbpool.get() - self.cursor = self.db.cursor() - - def _coreXactRelease(self): - self.cursor.close() - self.store.dbpool.put(self.db) - - self.db = None - self.cursor = None - -class DbPool: - ''' - The DbPool allows generic db connection pooling using - a factory/ctor method and a python queue. - - Example: - - def connectdb(): - # do stuff - return db - - pool = DbPool(3, connectdb) - - ''' - - def __init__(self, size, ctor): - # TODO: high/low water marks - self.size = size - self.ctor = ctor - self.dbque = queue.Queue() - - for i in range(size): - db = ctor() - self.put(db) - - def put(self, db): - ''' - Add/Return a db connection to the pool. - ''' - self.dbque.put(db) - - def get(self): - return self.dbque.get() - -class SqliteStorage(s_cores_storage.Storage): - - dblim = -1 - - _t_istable = ''' - SELECT - name - FROM - sqlite_master - WHERE - type='table' - AND - name={{NAME}} - ''' - - _t_inittable = ''' - CREATE TABLE {{TABLE}} ( - iden VARCHAR, - prop VARCHAR, - strval TEXT, - intval BIGINT, - tstamp BIGINT - ); - ''' - - _t_init_blobtable = ''' - CREATE TABLE {{BLOB_TABLE}} ( - k VARCHAR, - v BLOB - ); - ''' - - _t_init_iden_idx = 'CREATE INDEX {{TABLE}}_iden_idx ON {{TABLE}} (iden,prop)' - _t_init_prop_idx = 'CREATE INDEX {{TABLE}}_prop_time_idx ON {{TABLE}} (prop,tstamp)' - _t_init_strval_idx = 'CREATE INDEX {{TABLE}}_strval_idx ON {{TABLE}} (prop,strval,tstamp)' - _t_init_intval_idx = 'CREATE INDEX {{TABLE}}_intval_idx ON {{TABLE}} (prop,intval,tstamp)' - _t_init_blobtable_idx = 'CREATE UNIQUE INDEX {{BLOB_TABLE}}_indx ON {{BLOB_TABLE}} (k)' - - _t_addrows = 'INSERT INTO {{TABLE}} (iden,prop,strval,intval,tstamp) VALUES ({{IDEN}},{{PROP}},{{STRVAL}},{{INTVAL}},{{TSTAMP}})' - _t_getrows_by_iden = 'SELECT * FROM {{TABLE}} WHERE iden={{IDEN}}' - _t_getrows_by_range = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} and intval >= {{MINVALU}} AND intval < {{MAXVALU}} LIMIT {{LIMIT}}' - _t_getrows_by_le = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} and intval <= {{VALU}} LIMIT {{LIMIT}}' - _t_getrows_by_ge = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} and intval >= {{VALU}} LIMIT {{LIMIT}}' - _t_getrows_by_iden_prop = 'SELECT * FROM {{TABLE}} WHERE iden={{IDEN}} AND prop={{PROP}}' - _t_getrows_by_iden_prop_intval = 'SELECT * FROM {{TABLE}} WHERE iden={{IDEN}} AND prop={{PROP}} AND intval={{VALU}}' - _t_getrows_by_iden_prop_strval = 'SELECT * FROM {{TABLE}} WHERE iden={{IDEN}} AND prop={{PROP}} AND strval={{VALU}}' - - _t_getrows_by_iden_range = 'SELECT * FROM {{TABLE}} WHERE iden >= {{LOWERBOUND}} and iden < {{UPPERBOUND}}' - _t_getiden_max = 'SELECT MAX(iden) FROM {{TABLE}}' - _t_getiden_min = 'SELECT MIN(iden) FROM {{TABLE}}' - - ################################################################################ - _t_blob_set = 'INSERT OR REPLACE INTO {{BLOB_TABLE}} (k, v) VALUES ({{KEY}}, {{VALU}})' - _t_blob_get = 'SELECT v FROM {{BLOB_TABLE}} WHERE k={{KEY}}' - _t_blob_del = 'DELETE FROM {{BLOB_TABLE}} WHERE k={{KEY}}' - _t_blob_get_keys = 'SELECT k FROM {{BLOB_TABLE}}' - - 
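Note: the {{TOKEN}} strings above are templates, not finished SQL. _prepQuery()/_prepBlobQuery() (defined further down in this file) substitute the concrete table name for {{TABLE}}/{{BLOB_TABLE}} and rewrite each remaining {{NAME}} token into a driver-specific bind parameter via _addVarDecor(). A minimal, self-contained sketch of that substitution against an in-memory sqlite table; the prep() helper and table name here are illustrative stand-ins, not the real API:

    import re
    import sqlite3

    # Same token pattern the module compiles (it uses the third-party regex lib).
    stashre = re.compile('{{([A-Z]+)}}')

    def prep(query, table='syncortex_blob'):
        # Stand-in for _prepBlobQuery(): fix the table name, then turn each
        # {{NAME}} token into a named sqlite bind parameter, e.g. {{KEY}} -> :key.
        query = query.replace('{{BLOB_TABLE}}', table)
        for name in stashre.findall(query):
            query = query.replace('{{%s}}' % name, ':%s' % name.lower())
        return query

    blob_set = prep('INSERT OR REPLACE INTO {{BLOB_TABLE}} (k, v) VALUES ({{KEY}}, {{VALU}})')
    blob_get = prep('SELECT v FROM {{BLOB_TABLE}} WHERE k={{KEY}}')

    db = sqlite3.connect(':memory:')
    db.execute('CREATE TABLE syncortex_blob (k VARCHAR, v BLOB)')
    db.execute(blob_set, {'key': 'syn:core:opts', 'valu': b'\x92\x01\x02'})
    print(db.execute(blob_get, {'key': 'syn:core:opts'}).fetchone())  # (b'\x92\x01\x02',)

The postgres subclass reuses these same templates and only overrides _addVarDecor() to emit %(name)s style parameters for psycopg2.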
################################################################################ - _t_getrows_by_prop = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} LIMIT {{LIMIT}}' - _t_getrows_by_prop_int = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}} LIMIT {{LIMIT}}' - _t_getrows_by_prop_str = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}} LIMIT {{LIMIT}}' - - _t_getrows_by_prop_wmin = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} AND tstamp >= {{MINTIME}} LIMIT {{LIMIT}}' - _t_getrows_by_prop_int_wmin = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}} AND tstamp >= {{MINTIME}} LIMIT {{LIMIT}}' - _t_getrows_by_prop_str_wmin = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}} AND tstamp >= {{MINTIME}} LIMIT {{LIMIT}}' - - _t_getrows_by_prop_wmax = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}}' - _t_getrows_by_prop_int_wmax = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}}' - _t_getrows_by_prop_str_wmax = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}}' - - _t_getrows_by_prop_wminmax = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} AND tstamp>={{MINTIME}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}}' - _t_getrows_by_prop_int_wminmax = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}} AND tstamp >= {{MINTIME}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}}' - _t_getrows_by_prop_str_wminmax = 'SELECT * FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}} AND tstamp >= {{MINTIME}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}}' - ################################################################################ - _t_getsize_by_prop = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} LIMIT {{LIMIT}}' - _t_getsize_by_prop_int = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}} LIMIT {{LIMIT}}' - _t_getsize_by_prop_str = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}} LIMIT {{LIMIT}}' - - _t_getsize_by_prop_wmin = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} AND tstamp>={{MINTIME}} LIMIT {{LIMIT}}' - _t_getsize_by_prop_int_wmin = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}} AND tstamp>={{MINTIME}} LIMIT {{LIMIT}}' - _t_getsize_by_prop_str_wmin = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}} AND tstamp>={{MINTIME}} LIMIT {{LIMIT}}' - - _t_getsize_by_prop_wmax = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}}' - _t_getsize_by_prop_int_wmax = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}}' - _t_getsize_by_prop_str_wmax = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}}' - - _t_getsize_by_prop_wminmax = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} AND tstamp>={{MINTIME}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}}' - _t_getsize_by_prop_int_wminmax = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}} AND tstamp>={{MINTIME}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}}' - _t_getsize_by_prop_str_wminmax = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}} AND tstamp>={{MINTIME}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}}' - ################################################################################ - - _t_getsize_by_range = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} and intval >= {{MINVALU}} 
AND intval < {{MAXVALU}} LIMIT {{LIMIT}}' - _t_getsize_by_le = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} and intval <= {{VALU}} LIMIT {{LIMIT}}' - _t_getsize_by_ge = 'SELECT COUNT(*) FROM {{TABLE}} WHERE prop={{PROP}} and intval >= {{VALU}} LIMIT {{LIMIT}}' - - _t_delrows_by_iden = 'DELETE FROM {{TABLE}} WHERE iden={{IDEN}}' - _t_delrows_by_iden_prop = 'DELETE FROM {{TABLE}} WHERE iden={{IDEN}} AND prop={{PROP}}' - _t_delrows_by_iden_prop_strval = 'DELETE FROM {{TABLE}} WHERE iden={{IDEN}} AND prop={{PROP}} AND strval={{VALU}}' - _t_delrows_by_iden_prop_intval = 'DELETE FROM {{TABLE}} WHERE iden={{IDEN}} AND prop={{PROP}} AND intval={{VALU}}' - - ################################################################################ - _t_delrows_by_prop = 'DELETE FROM {{TABLE}} WHERE prop={{PROP}}' - _t_delrows_by_prop_int = 'DELETE FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}}' - _t_delrows_by_prop_str = 'DELETE FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}}' - - _t_delrows_by_prop_wmin = 'DELETE FROM {{TABLE}} WHERE prop={{PROP}} AND tstamp>={{MINTIME}}' - _t_delrows_by_prop_int_wmin = 'DELETE FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}} AND tstamp>={{MINTIME}}' - _t_delrows_by_prop_str_wmin = 'DELETE FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}} AND tstamp>={{MINTIME}}' - - _t_delrows_by_prop_wmax = 'DELETE FROM {{TABLE}} WHERE prop={{PROP}} AND tstamp<{{MAXTIME}}' - _t_delrows_by_prop_int_wmax = 'DELETE FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}} AND tstamp<{{MAXTIME}}' - _t_delrows_by_prop_str_wmax = 'DELETE FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}} AND tstamp<{{MAXTIME}}' - - _t_delrows_by_prop_wminmax = 'DELETE FROM {{TABLE}} WHERE prop={{PROP}} AND tstamp>={{MINTIME}} AND tstamp<{{MAXTIME}}' - _t_delrows_by_prop_int_wminmax = 'DELETE FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}} AND tstamp>={{MINTIME}} AND tstamp<{{MAXTIME}}' - _t_delrows_by_prop_str_wminmax = 'DELETE FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}} AND tstamp>={{MINTIME}} AND tstamp<{{MAXTIME}}' - - ################################################################################ - _t_getjoin_by_prop = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} LIMIT {{LIMIT}})' - _t_getjoin_by_prop_int = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}} LIMIT {{LIMIT}})' - _t_getjoin_by_prop_str = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}} LIMIT {{LIMIT}})' - - _t_getjoin_by_prop_wmin = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND tstamp>={{MINTIME}} LIMIT {{LIMIT}})' - _t_getjoin_by_prop_int_wmin = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}} AND tstamp>={{MINTIME}} LIMIT {{LIMIT}})' - _t_getjoin_by_prop_str_wmin = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}} AND tstamp>={{MINTIME}} LIMIT {{LIMIT}})' - - _t_getjoin_by_prop_wmax = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}})' - _t_getjoin_by_prop_int_wmax = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}})' - _t_getjoin_by_prop_str_wmax = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM 
{{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}})' - - _t_getjoin_by_prop_wminmax = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND tstamp>={{MINTIME}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}})' - _t_getjoin_by_prop_int_wminmax = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}} AND tstamp>={{MINTIME}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}})' - _t_getjoin_by_prop_str_wminmax = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}} AND tstamp>={{MINTIME}} AND tstamp<{{MAXTIME}} LIMIT {{LIMIT}})' - - _t_getjoin_by_range_int = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} and {{MINVALU}} <= intval AND intval < {{MAXVALU}} LIMIT {{LIMIT}})' - _t_getjoin_by_range_str = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} and {{MINVALU}} <= strval AND strval < {{MAXVALU}} LIMIT {{LIMIT}})' - - _t_getjoin_by_le_int = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} and intval <= {{VALU}} LIMIT {{LIMIT}})' - _t_getjoin_by_ge_int = 'SELECT * FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} and intval >= {{VALU}} LIMIT {{LIMIT}})' - - ################################################################################ - _t_deljoin_by_prop = 'DELETE FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}})' - _t_deljoin_by_prop_int = 'DELETE FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}})' - _t_deljoin_by_prop_str = 'DELETE FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}})' - - _t_deljoin_by_prop_wmin = 'DELETE FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND tstamp>={{MINTIME}} )' - _t_deljoin_by_prop_int_wmin = 'DELETE FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}} AND tstamp>={{MINTIME}} )' - _t_deljoin_by_prop_str_wmin = 'DELETE FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}} AND tstamp>={{MINTIME}} )' - - _t_deljoin_by_prop_wmax = 'DELETE FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND tstamp<{{MAXTIME}} )' - _t_deljoin_by_prop_int_wmax = 'DELETE FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}} AND tstamp<{{MAXTIME}} )' - _t_deljoin_by_prop_str_wmax = 'DELETE FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}} AND tstamp<{{MAXTIME}} )' - - _t_deljoin_by_prop_wminmax = 'DELETE FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND tstamp>={{MINTIME}} AND tstamp < {{MAXTIME}})' - _t_deljoin_by_prop_int_wminmax = 'DELETE FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND intval={{VALU}} AND tstamp>={{MINTIME}} AND tstamp<{{MAXTIME}})' - _t_deljoin_by_prop_str_wminmax = 'DELETE FROM {{TABLE}} WHERE iden IN (SELECT iden FROM {{TABLE}} WHERE prop={{PROP}} AND strval={{VALU}} AND tstamp>={{MINTIME}} AND tstamp<{{MAXTIME}})' - - ################################################################################ - _t_uprows_by_iden_prop_str = 'UPDATE {{TABLE}} SET strval={{VALU}} WHERE iden={{IDEN}} and prop={{PROP}}' - _t_uprows_by_iden_prop_int = 'UPDATE 
{{TABLE}} SET intval={{VALU}} WHERE iden={{IDEN}} and prop={{PROP}}' - _t_uprows_by_prop_prop = 'UPDATE {{TABLE}} SET prop={{NEWVALU}} WHERE prop={{OLDVALU}}' - _t_uprows_by_prop_val_int = 'UPDATE {{TABLE}} SET intval={{NEWVALU}} WHERE prop={{PROP}} and intval={{OLDVALU}}' - _t_uprows_by_prop_val_str = 'UPDATE {{TABLE}} SET strval={{NEWVALU}} WHERE prop={{PROP}} and strval={{OLDVALU}}' - - def _initDbInfo(self): - name = self._link[1].get('path')[1:] - if not name: - raise Exception('No Path Specified!') - - if name.find(':') == -1: - name = s_common.genpath(name) - - return {'name': name} - - def getStoreXact(self, size=None, core=None): - return SqlXact(self, size=size, core=core) - - def _getDbLimit(self, limit): - if limit is not None: - return limit - return self.dblim - - def rowsByRange(self, prop, valu, limit=None): - limit = self._getDbLimit(limit) - - q = self._q_getrows_by_range - - minvalu, maxvalu = valu[0], valu[1] - - rows = self.select(q, prop=prop, minvalu=minvalu, maxvalu=maxvalu, limit=limit) - return self._foldTypeCols(rows) - - def rowsByGe(self, prop, valu, limit=None): - limit = self._getDbLimit(limit) - q = self._q_getrows_by_ge - - rows = self.select(q, prop=prop, valu=valu, limit=limit) - return self._foldTypeCols(rows) - - def rowsByLe(self, prop, valu, limit=None): - limit = self._getDbLimit(limit) - q = self._q_getrows_by_le - rows = self.select(q, prop=prop, valu=valu, limit=limit) - return self._foldTypeCols(rows) - - def sizeByRange(self, prop, valu, limit=None): - limit = self._getDbLimit(limit) - q = self._q_getsize_by_range - minvalu, maxvalu = valu[0], valu[1] - return self.select(q, prop=prop, minvalu=minvalu, maxvalu=maxvalu, limit=limit)[0][0] - - def sizeByGe(self, prop, valu, limit=None): - limit = self._getDbLimit(limit) - q = self._q_getsize_by_ge - return self.select(q, prop=prop, valu=valu, limit=limit)[0][0] - - def sizeByLe(self, prop, valu, limit=None): - limit = self._getDbLimit(limit) - q = self._q_getsize_by_le - args = [prop, valu, limit] - return self.select(q, prop=prop, valu=valu, limit=limit)[0][0] - - def _initDbConn(self): - dbinfo = self._initDbInfo() - dbname = dbinfo.get('name') - db = sqlite3.connect(dbname, check_same_thread=False) - db.isolation_level = None - def onfini(): - db.close() - self.onfini(onfini) - return db - - def _getTableName(self): - return 'syncortex' - - def _addVarDecor(self, name): - return ':%s' % (name,) - - def _initCoreStor(self): - self.dbpool = self._link[1].get('dbpool') - if self.dbpool is None: - pool = int(self._link[1].get('pool', 1)) - self.dbpool = DbPool(pool, self._initDbConn) - - table = self._getTableName() - - self._initCorQueries() - self._initCorTables(table) - - def _prepQuery(self, query): - # prep query strings by replacing all %s with table name - # and all ? with db specific variable token - table = self._getTableName() - query = query.replace('{{TABLE}}', table) - - for name in stashre.findall(query): - query = query.replace('{{%s}}' % name, self._addVarDecor(name.lower())) - - return query - - def _prepBlobQuery(self, query): - # prep query strings by replacing all %s with table name - # and all ? 
def _prepBlobQuery(self, query): - # prep query strings by replacing {{BLOB_TABLE}} with the blob table name - # and each {{NAME}} marker with the db specific variable token - table = self._getTableName() - table += '_blob' - query = query.replace('{{BLOB_TABLE}}', table) - - for name in stashre.findall(query): - query = query.replace('{{%s}}' % name, self._addVarDecor(name.lower())) - - return query - - def _initCorQueries(self): - self._q_istable = self._prepQuery(self._t_istable) - self._q_inittable = self._prepQuery(self._t_inittable) - self._q_init_blobtable = self._prepBlobQuery(self._t_init_blobtable) - - self._q_init_iden_idx = self._prepQuery(self._t_init_iden_idx) - self._q_init_prop_idx = self._prepQuery(self._t_init_prop_idx) - self._q_init_strval_idx = self._prepQuery(self._t_init_strval_idx) - self._q_init_intval_idx = self._prepQuery(self._t_init_intval_idx) - self._q_init_blobtable_idx = self._prepBlobQuery(self._t_init_blobtable_idx) - - self._q_addrows = self._prepQuery(self._t_addrows) - self._q_getrows_by_iden = self._prepQuery(self._t_getrows_by_iden) - self._q_getrows_by_range = self._prepQuery(self._t_getrows_by_range) - self._q_getrows_by_ge = self._prepQuery(self._t_getrows_by_ge) - self._q_getrows_by_le = self._prepQuery(self._t_getrows_by_le) - self._q_getrows_by_iden_prop = self._prepQuery(self._t_getrows_by_iden_prop) - self._q_getrows_by_iden_prop_intval = self._prepQuery(self._t_getrows_by_iden_prop_intval) - self._q_getrows_by_iden_prop_strval = self._prepQuery(self._t_getrows_by_iden_prop_strval) - - self._q_getrows_by_iden_range = self._prepQuery(self._t_getrows_by_iden_range) - self._q_getiden_max = self._prepQuery(self._t_getiden_max) - self._q_getiden_min = self._prepQuery(self._t_getiden_min) - - self._q_blob_get = self._prepBlobQuery(self._t_blob_get) - self._q_blob_set = self._prepBlobQuery(self._t_blob_set) - self._q_blob_del = self._prepBlobQuery(self._t_blob_del) - self._q_blob_get_keys = self._prepBlobQuery(self._t_blob_get_keys) - - ################################################################################### - self._q_getrows_by_prop = self._prepQuery(self._t_getrows_by_prop) - self._q_getrows_by_prop_wmin = self._prepQuery(self._t_getrows_by_prop_wmin) - self._q_getrows_by_prop_wmax = self._prepQuery(self._t_getrows_by_prop_wmax) - self._q_getrows_by_prop_wminmax = self._prepQuery(self._t_getrows_by_prop_wminmax) - ################################################################################### - self._q_getrows_by_prop_int = self._prepQuery(self._t_getrows_by_prop_int) - self._q_getrows_by_prop_int_wmin = self._prepQuery(self._t_getrows_by_prop_int_wmin) - self._q_getrows_by_prop_int_wmax = self._prepQuery(self._t_getrows_by_prop_int_wmax) - self._q_getrows_by_prop_int_wminmax = self._prepQuery(self._t_getrows_by_prop_int_wminmax) - ################################################################################### - self._q_getrows_by_prop_str = self._prepQuery(self._t_getrows_by_prop_str) - self._q_getrows_by_prop_str_wmin = self._prepQuery(self._t_getrows_by_prop_str_wmin) - self._q_getrows_by_prop_str_wmax = self._prepQuery(self._t_getrows_by_prop_str_wmax) - self._q_getrows_by_prop_str_wminmax = self._prepQuery(self._t_getrows_by_prop_str_wminmax) - ################################################################################### - self._q_getjoin_by_prop = self._prepQuery(self._t_getjoin_by_prop) - self._q_getjoin_by_prop_wmin = self._prepQuery(self._t_getjoin_by_prop_wmin) - self._q_getjoin_by_prop_wmax = self._prepQuery(self._t_getjoin_by_prop_wmax) - self._q_getjoin_by_prop_wminmax = self._prepQuery(self._t_getjoin_by_prop_wminmax) - 
################################################################################### - self._q_getjoin_by_prop_int = self._prepQuery(self._t_getjoin_by_prop_int) - self._q_getjoin_by_prop_int_wmin = self._prepQuery(self._t_getjoin_by_prop_int_wmin) - self._q_getjoin_by_prop_int_wmax = self._prepQuery(self._t_getjoin_by_prop_int_wmax) - self._q_getjoin_by_prop_int_wminmax = self._prepQuery(self._t_getjoin_by_prop_int_wminmax) - ################################################################################### - self._q_getjoin_by_prop_str = self._prepQuery(self._t_getjoin_by_prop_str) - self._q_getjoin_by_prop_str_wmin = self._prepQuery(self._t_getjoin_by_prop_str_wmin) - self._q_getjoin_by_prop_str_wmax = self._prepQuery(self._t_getjoin_by_prop_str_wmax) - self._q_getjoin_by_prop_str_wminmax = self._prepQuery(self._t_getjoin_by_prop_str_wminmax) - ################################################################################### - self._q_getsize_by_prop = self._prepQuery(self._t_getsize_by_prop) - self._q_getsize_by_prop_wmin = self._prepQuery(self._t_getsize_by_prop_wmin) - self._q_getsize_by_prop_wmax = self._prepQuery(self._t_getsize_by_prop_wmax) - self._q_getsize_by_prop_wminmax = self._prepQuery(self._t_getsize_by_prop_wminmax) - ################################################################################### - self._q_getsize_by_prop_int = self._prepQuery(self._t_getsize_by_prop_int) - self._q_getsize_by_prop_int_wmin = self._prepQuery(self._t_getsize_by_prop_int_wmin) - self._q_getsize_by_prop_int_wmax = self._prepQuery(self._t_getsize_by_prop_int_wmax) - self._q_getsize_by_prop_int_wminmax = self._prepQuery(self._t_getsize_by_prop_int_wminmax) - ################################################################################### - self._q_getsize_by_prop_str = self._prepQuery(self._t_getsize_by_prop_str) - self._q_getsize_by_prop_str_wmin = self._prepQuery(self._t_getsize_by_prop_str_wmin) - self._q_getsize_by_prop_str_wmax = self._prepQuery(self._t_getsize_by_prop_str_wmax) - self._q_getsize_by_prop_str_wminmax = self._prepQuery(self._t_getsize_by_prop_str_wminmax) - ################################################################################### - - self.qbuild = { - 'rowsbyprop': { - (none_t, none_t, none_t): self._q_getrows_by_prop, - (none_t, int_t, none_t): self._q_getrows_by_prop_wmin, - (none_t, none_t, int_t): self._q_getrows_by_prop_wmax, - (none_t, int_t, int_t): self._q_getrows_by_prop_wminmax, - - (int_t, none_t, none_t): self._q_getrows_by_prop_int, - (int_t, int_t, none_t): self._q_getrows_by_prop_int_wmin, - (int_t, none_t, int_t): self._q_getrows_by_prop_int_wmax, - (int_t, int_t, int_t): self._q_getrows_by_prop_int_wminmax, - - (str_t, none_t, none_t): self._q_getrows_by_prop_str, - (str_t, int_t, none_t): self._q_getrows_by_prop_str_wmin, - (str_t, none_t, int_t): self._q_getrows_by_prop_str_wmax, - (str_t, int_t, int_t): self._q_getrows_by_prop_str_wminmax, - }, - 'joinbyprop': { - (none_t, none_t, none_t): self._q_getjoin_by_prop, - (none_t, int_t, none_t): self._q_getjoin_by_prop_wmin, - (none_t, none_t, int_t): self._q_getjoin_by_prop_wmax, - (none_t, int_t, int_t): self._q_getjoin_by_prop_wminmax, - - (int_t, none_t, none_t): self._q_getjoin_by_prop_int, - (int_t, int_t, none_t): self._q_getjoin_by_prop_int_wmin, - (int_t, none_t, int_t): self._q_getjoin_by_prop_int_wmax, - (int_t, int_t, int_t): self._q_getjoin_by_prop_int_wminmax, - - (str_t, none_t, none_t): self._q_getjoin_by_prop_str, - (str_t, int_t, none_t): self._q_getjoin_by_prop_str_wmin, - 
(str_t, none_t, int_t): self._q_getjoin_by_prop_str_wmax, - (str_t, int_t, int_t): self._q_getjoin_by_prop_str_wminmax, - }, - 'sizebyprop': { - (none_t, none_t, none_t): self._q_getsize_by_prop, - (none_t, int_t, none_t): self._q_getsize_by_prop_wmin, - (none_t, none_t, int_t): self._q_getsize_by_prop_wmax, - (none_t, int_t, int_t): self._q_getsize_by_prop_wminmax, - - (int_t, none_t, none_t): self._q_getsize_by_prop_int, - (int_t, int_t, none_t): self._q_getsize_by_prop_int_wmin, - (int_t, none_t, int_t): self._q_getsize_by_prop_int_wmax, - (int_t, int_t, int_t): self._q_getsize_by_prop_int_wminmax, - - (str_t, none_t, none_t): self._q_getsize_by_prop_str, - (str_t, int_t, none_t): self._q_getsize_by_prop_str_wmin, - (str_t, none_t, int_t): self._q_getsize_by_prop_str_wmax, - (str_t, int_t, int_t): self._q_getsize_by_prop_str_wminmax, - }, - 'delrowsbyprop': { - (none_t, none_t, none_t): self._prepQuery(self._t_delrows_by_prop), - (none_t, int_t, none_t): self._prepQuery(self._t_delrows_by_prop_wmin), - (none_t, none_t, int_t): self._prepQuery(self._t_delrows_by_prop_wmax), - (none_t, int_t, int_t): self._prepQuery(self._t_delrows_by_prop_wminmax), - - (int_t, none_t, none_t): self._prepQuery(self._t_delrows_by_prop_int), - (int_t, int_t, none_t): self._prepQuery(self._t_delrows_by_prop_int_wmin), - (int_t, none_t, int_t): self._prepQuery(self._t_delrows_by_prop_int_wmax), - (int_t, int_t, int_t): self._prepQuery(self._t_delrows_by_prop_int_wminmax), - - (str_t, none_t, none_t): self._prepQuery(self._t_delrows_by_prop_str), - (str_t, int_t, none_t): self._prepQuery(self._t_delrows_by_prop_str_wmin), - (str_t, none_t, int_t): self._prepQuery(self._t_delrows_by_prop_str_wmax), - (str_t, int_t, int_t): self._prepQuery(self._t_delrows_by_prop_str_wminmax), - }, - 'deljoinbyprop': { - (none_t, none_t, none_t): self._prepQuery(self._t_deljoin_by_prop), - (none_t, int_t, none_t): self._prepQuery(self._t_deljoin_by_prop_wmin), - (none_t, none_t, int_t): self._prepQuery(self._t_deljoin_by_prop_wmax), - (none_t, int_t, int_t): self._prepQuery(self._t_deljoin_by_prop_wminmax), - - (int_t, none_t, none_t): self._prepQuery(self._t_deljoin_by_prop_int), - (int_t, int_t, none_t): self._prepQuery(self._t_deljoin_by_prop_int_wmin), - (int_t, none_t, int_t): self._prepQuery(self._t_deljoin_by_prop_int_wmax), - (int_t, int_t, int_t): self._prepQuery(self._t_deljoin_by_prop_int_wminmax), - - (str_t, none_t, none_t): self._prepQuery(self._t_deljoin_by_prop_str), - (str_t, int_t, none_t): self._prepQuery(self._t_deljoin_by_prop_str_wmin), - (str_t, none_t, int_t): self._prepQuery(self._t_deljoin_by_prop_str_wmax), - (str_t, int_t, int_t): self._prepQuery(self._t_deljoin_by_prop_str_wminmax), - } - } - - self._q_getsize_by_prop = self._prepQuery(self._t_getsize_by_prop) - - self._q_getsize_by_ge = self._prepQuery(self._t_getsize_by_ge) - self._q_getsize_by_le = self._prepQuery(self._t_getsize_by_le) - self._q_getsize_by_range = self._prepQuery(self._t_getsize_by_range) - - self._q_delrows_by_iden = self._prepQuery(self._t_delrows_by_iden) - self._q_delrows_by_iden_prop = self._prepQuery(self._t_delrows_by_iden_prop) - self._q_delrows_by_iden_prop_intval = self._prepQuery(self._t_delrows_by_iden_prop_intval) - self._q_delrows_by_iden_prop_strval = self._prepQuery(self._t_delrows_by_iden_prop_strval) - - self._q_uprows_by_iden_prop_str = self._prepQuery(self._t_uprows_by_iden_prop_str) - self._q_uprows_by_iden_prop_int = self._prepQuery(self._t_uprows_by_iden_prop_int) - self._q_uprows_by_prop_prop = 
self._prepQuery(self._t_uprows_by_prop_prop) - self._q_uprows_by_prop_val_str = self._prepQuery(self._t_uprows_by_prop_val_str) - self._q_uprows_by_prop_val_int = self._prepQuery(self._t_uprows_by_prop_val_int) - - self._q_getjoin_by_range_str = self._prepQuery(self._t_getjoin_by_range_str) - self._q_getjoin_by_range_int = self._prepQuery(self._t_getjoin_by_range_int) - - self._q_getjoin_by_ge_int = self._prepQuery(self._t_getjoin_by_ge_int) - self._q_getjoin_by_le_int = self._prepQuery(self._t_getjoin_by_le_int) - - def _checkForTable(self, name): - return len(self.select(self._q_istable, name=name)) - - def _initCorTables(self, table): - - revs = [ - (0, self._rev0) - ] - - max_rev = max([rev for rev, func in revs]) - vsn_str = 'syn:core:{}:version'.format(self.getStoreType()) - - if not self._checkForTable(table): - # We are a new cortex, stamp in tables and set - # blob values and move along. - self._initCorTable(table) - self.setBlobValu(vsn_str, max_rev) - return - - # Strap in the blobstore if it doesn't exist - this allows us to have - # a helper which doesn't have to care about queries against a table - # which may not exist. - blob_table = table + '_blob' - if not self._checkForTable(blob_table): - with self.getCoreXact() as xact: - xact.cursor.execute(self._q_init_blobtable) - xact.cursor.execute(self._q_init_blobtable_idx) - - # Apply storage layer revisions - self._revCorVers(revs) - - def _initCorTable(self, name): - with self.getCoreXact() as xact: - xact.cursor.execute(self._q_inittable) - xact.cursor.execute(self._q_init_iden_idx) - xact.cursor.execute(self._q_init_prop_idx) - xact.cursor.execute(self._q_init_strval_idx) - xact.cursor.execute(self._q_init_intval_idx) - xact.cursor.execute(self._q_init_blobtable) - xact.cursor.execute(self._q_init_blobtable_idx) - - def _rev0(self): - # Simple rev0 function stub. - # If we're here, we're clearly an existing cortex and - # we need to have this valu set. 
- self.setBlobValu('syn:core:created', s_common.now()) - - def _addRows(self, rows): - args = [] - for i, p, v, t in rows: - if isinstance(v, int): - args.append({'iden': i, 'prop': p, 'intval': v, 'strval': None, 'tstamp': t}) - else: - args.append({'iden': i, 'prop': p, 'intval': None, 'strval': v, 'tstamp': t}) - - with self.getCoreXact() as xact: - xact.cursor.executemany(self._q_addrows, args) - - def update(self, q, **args): - with self.getCoreXact() as xact: - xact.cursor.execute(q, args) - return xact.cursor.rowcount - - def select(self, q, **args): - with self.getCoreXact() as xact: - xact.cursor.execute(q, args) - return xact.cursor.fetchall() - - def delete(self, q, **args): - with self.getCoreXact() as xact: - xact.cursor.execute(q, args) - - def _foldTypeCols(self, rows): - ret = [] - for iden, prop, intval, strval, tstamp in rows: - - if intval is not None: - ret.append((iden, prop, intval, tstamp)) - else: - ret.append((iden, prop, strval, tstamp)) - - return ret - - def getRowsById(self, iden): - rows = self.select(self._q_getrows_by_iden, iden=iden) - return self._foldTypeCols(rows) - - def getSizeByProp(self, prop, valu=None, limit=None, mintime=None, maxtime=None): - rows = self._runPropQuery('sizebyprop', prop, valu=valu, limit=limit, mintime=mintime, maxtime=maxtime) - return rows[0][0] - - def getRowsByProp(self, prop, valu=None, limit=None, mintime=None, maxtime=None): - rows = self._runPropQuery('rowsbyprop', prop, valu=valu, limit=limit, mintime=mintime, maxtime=maxtime) - return self._foldTypeCols(rows) - - def _joinsByRange(self, prop, valu, limit=None): - minvalu, maxvalu = valu[0], valu[1] - - limit = self._getDbLimit(limit) - - rows = self.select(self._q_getjoin_by_range_int, prop=prop, minvalu=minvalu, maxvalu=maxvalu, limit=limit) - return self._foldTypeCols(rows) - - def _joinsByLe(self, prop, valu, limit=None): - limit = self._getDbLimit(limit) - - rows = self.select(self._q_getjoin_by_le_int, prop=prop, valu=valu, limit=limit) - return self._foldTypeCols(rows) - - def _joinsByGe(self, prop, valu, limit=None): - limit = self._getDbLimit(limit) - - rows = self.select(self._q_getjoin_by_ge_int, prop=prop, valu=valu, limit=limit) - return self._foldTypeCols(rows) - - def _runPropQuery(self, name, prop, valu=None, limit=None, mintime=None, maxtime=None, meth=None, nolim=False): - limit = self._getDbLimit(limit) - - qkey = (type(valu), type(mintime), type(maxtime)) - - qstr = self.qbuild[name][qkey] - if meth is None: - meth = self.select - - rows = meth(qstr, prop=prop, valu=valu, limit=limit, mintime=mintime, maxtime=maxtime) - - return rows - - def _delRowsByIdProp(self, iden, prop, valu=None): - if valu is None: - return self.delete(self._q_delrows_by_iden_prop, iden=iden, prop=prop) - - if isinstance(valu, int): - return self.delete(self._q_delrows_by_iden_prop_intval, iden=iden, prop=prop, valu=valu) - else: - return self.delete(self._q_delrows_by_iden_prop_strval, iden=iden, prop=prop, valu=valu) - - def getRowsByIdProp(self, iden, prop, valu=None): - if valu is None: - rows = self.select(self._q_getrows_by_iden_prop, iden=iden, prop=prop) - return self._foldTypeCols(rows) - - if isinstance(valu, int): - rows = self.select(self._q_getrows_by_iden_prop_intval, iden=iden, prop=prop, valu=valu) - else: - rows = self.select(self._q_getrows_by_iden_prop_strval, iden=iden, prop=prop, valu=valu) - return self._foldTypeCols(rows) - - def _setRowsByIdProp(self, iden, prop, valu): - if isinstance(valu, int): - count = self.update(self._q_uprows_by_iden_prop_int, 
iden=iden, prop=prop, valu=valu) - else: - count = self.update(self._q_uprows_by_iden_prop_str, iden=iden, prop=prop, valu=valu) - - if count == 0: - rows = [(iden, prop, valu, s_common.now()), ] - self._addRows(rows) - - def _delRowsById(self, iden): - self.delete(self._q_delrows_by_iden, iden=iden) - - def _delJoinByProp(self, prop, valu=None, mintime=None, maxtime=None): - self._runPropQuery('deljoinbyprop', prop, valu=valu, mintime=mintime, maxtime=maxtime, meth=self.delete, nolim=True) - - def getJoinByProp(self, prop, valu=None, mintime=None, maxtime=None, limit=None): - rows = self._runPropQuery('joinbyprop', prop, valu=valu, limit=limit, mintime=mintime, maxtime=maxtime) - return self._foldTypeCols(rows) - - def _delRowsByProp(self, prop, valu=None, mintime=None, maxtime=None): - self._runPropQuery('delrowsbyprop', prop, valu=valu, mintime=mintime, maxtime=maxtime, meth=self.delete, nolim=True) - - def _updateProperty(self, oldprop, newprop): - return self.update(self._q_uprows_by_prop_prop, oldvalu=oldprop, newvalu=newprop) - - def _updatePropertyValu(self, prop, oldval, newval): - if isinstance(oldval, int): - return self.update(self._q_uprows_by_prop_val_int, oldvalu=oldval, newvalu=newval, prop=prop) - return self.update(self._q_uprows_by_prop_val_str, oldvalu=oldval, newvalu=newval, prop=prop) - - def _genStoreRows(self, **kwargs): - ''' - Generator which yields lists of rows from the DB in tuple form. - - This works by performing range lookups on iden prefixes over the range - of 00000000000000000000000000000000 to ffffffffffffffffffffffffffffffff. - The runtime of this is dependent on the number of rows in the DB, - but is generally the fastest approach to getting rows out of the DB - in a linear time fashion. - - Args: - **kwargs: Optional args. - - Notes: - The following values may be passed in as kwargs in order to - impact the performance of _genStoreRows: - - * slicebytes: The number of bytes to use when generating the iden - prefix values. This defaults to 4. - * incvalu (int): The amount in which to increase the internal - counter used to generate the prefix by on each pass. This value - determines the width of the iden range looked up at a single - time. This defaults to 4. - - The number of queries which are executed by this generator is - equal to (16 ** slicebytes) / incvalu. This defaults to 16384 - queries. - - Returns: - list: List of tuples, each containing an iden, property, value and timestamp. 
- ''' - slicebytes = kwargs.get('slicebytes', 4) - incvalu = kwargs.get('incvalu', 4) - - # Compute upper and lower bounds up front - lowest_iden = '00000000000000000000000000000000' - highest_iden = 'ffffffffffffffffffffffffffffffff' - - highest_core_iden = self.select(self._q_getiden_max)[0][0] - if not highest_core_iden: - # No rows present at all - return early - return - - fmt = '{{:0={}x}}'.format(slicebytes) - maxv = 16 ** slicebytes - num_queries = int(maxv / incvalu) - q_count = 0 - percentaged = {} - if num_queries > 128: - percentaged = {int((num_queries * i) / 100): i for i in range(100)} - - # Setup lower bound and first upper bound - lowerbound = lowest_iden[:slicebytes] - c = int(lowerbound, 16) + incvalu - upperbound = fmt.format(c) - - logger.info('Dumping rows - slicebytes %s, incvalu %s', slicebytes, incvalu) - logger.info('Will perform %s SQL queries given the slicebytes/incvalu calculations.', num_queries) - while True: - # Check to see if maxv is reached - if c >= maxv: - upperbound = highest_iden - rows = self.select(self._q_getrows_by_iden_range, lowerbound=lowerbound, upperbound=upperbound) - q_count += 1 - completed_rate = percentaged.get(q_count) - if completed_rate: - logger.info('Completed %s%% queries', completed_rate) - if rows: - rows = self._foldTypeCols(rows) - yield rows - if c >= maxv: - break - # Increment and continue - c += incvalu - lowerbound = upperbound - upperbound = fmt.format(c) - continue - - # Edge case because _q_getrows_by_iden_range is exclusive on the upper bound. - if highest_core_iden == highest_iden: - rows = self.select(self._q_getrows_by_iden, iden=highest_core_iden) - rows = self._foldTypeCols(rows) - yield rows - - def getStoreType(self): - return 'sqlite' - - def _getBlobValu(self, key): - rows = self._getBlobValuRows(key) - - if not rows: - return None - - if len(rows) > 1: # pragma: no cover - raise s_common.BadCoreStore(store=self.getStoreType(), mesg='Too many blob rows received.') - - return rows[0][0] - - def _getBlobValuRows(self, key): - rows = self.select(self._q_blob_get, key=key) - return rows - - def _prepBlobValu(self, valu): - return sqlite3.Binary(valu) - - def _setBlobValu(self, key, valu): - v = self._prepBlobValu(valu) - self.update(self._q_blob_set, key=key, valu=v) - return valu - - def _hasBlobValu(self, key): - rows = self._getBlobValuRows(key) - - if len(rows) > 1: # pragma: no cover - raise s_common.BadCoreStore(store=self.getStoreType(), mesg='Too many blob rows received.') - - if not rows: - return False - return True - - def _delBlobValu(self, key): - ret = self._getBlobValu(key) - if ret is None: # pragma: no cover - # We should never get here, but if we do, throw an exception. - raise s_common.NoSuchName(name=key, mesg='Cannot delete key which is not present in the blobstore.') - self.delete(self._q_blob_del, key=key) - return ret - - def _getBlobKeys(self): - rows = self.select(self._q_blob_get_keys) - ret = [row[0] for row in rows] - return ret
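# Editor's sketch (not part of either deleted file): the iden-prefix windows that
# _genStoreRows above walks, using its defaults slicebytes=4 and incvalu=4, which
# yields (16 ** 4) / 4 == 16384 half-open range queries. Runnable as-is:

slicebytes, incvalu = 4, 4
fmt = '{{:0={}x}}'.format(slicebytes)   # '{:0=4x}' -> zero-padded hex prefixes
maxv = 16 ** slicebytes

bounds = [fmt.format(c) for c in range(0, maxv, incvalu)]
windows = list(zip(bounds, bounds[1:] + ['f' * 32]))

print(len(windows))   # 16384 queries
print(windows[0])     # ('0000', '0004')
print(windows[-1])    # ('fffc', 'ffffffffffffffffffffffffffffffff')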
-""" -# Stdlib -import logging -import threading -# Custom Code -import synapse.common as s_common -import synapse.eventbus as s_eventbus - -import synapse.cores.xact as s_xact - -import synapse.lib.config as s_config -import synapse.lib.msgpack as s_msgpack -import synapse.lib.threads as s_threads - -from synapse.common import reqstor - -logger = logging.getLogger(__name__) - -class Storage(s_config.Config): - ''' - Base class for Cortex storage layer backends. - - This implements functionality which is needed for a storage layer to - operate, as well as providing stubs for the storage layer implementations - to override. See the Synapse Documentation for more details. - - Args: - link ((str, dict)): Link tufo containing information for creating the Storage object. This may include path - information, authentication information, etc. - **conf (dict): Additional configible options for the storage layer. - ''' - def __init__(self, - link, - **conf): - s_config.Config.__init__(self) - if conf: - self.setConfOpts(conf) - - ############################################################# - # buses to save/load *raw* save events - ############################################################# - self.savebus = s_eventbus.EventBus() - self.loadbus = s_eventbus.EventBus() - - self._link = link - - self.onfini(self._defaultFiniCoreStor) - - # Various locks - self.xlock = threading.Lock() - # Transactions are a storage-layer concept - self._store_xacts = {} - - # Dicts for storing retrieval methods - self.sizebymeths = {} - self.rowsbymeths = {} - self.joinsbymeths = {} - - # Register handlers for lifting rows/sizes/joins - self.initRowsBy('gt', self.rowsByGt) - self.initRowsBy('lt', self.rowsByLt) - self.initRowsBy('ge', self.rowsByGe) - self.initRowsBy('le', self.rowsByLe) - self.initRowsBy('range', self.rowsByRange) - - self.initSizeBy('ge', self.sizeByGe) - self.initSizeBy('le', self.sizeByLe) - self.initSizeBy('range', self.sizeByRange) - - self.initJoinsBy('ge', self._joinsByGe) - self.initJoinsBy('gt', self._joinsByGt) - self.initJoinsBy('le', self._joinsByLe) - self.initJoinsBy('lt', self._joinsByLt) - self.initJoinsBy('in', self._joinsByIn) - self.initJoinsBy('range', self._joinsByRange) - - # Events for handling savefile loads/saves - self.loadbus.on('core:save:add:rows', self._loadAddRows) - self.loadbus.on('core:save:del:rows:by:iden', self._loadDelRowsById) - self.loadbus.on('core:save:del:rows:by:prop', self._loadDelRowsByProp) - self.loadbus.on('core:save:set:rows:by:idprop', self._loadSetRowsByIdProp) - self.loadbus.on('core:save:del:rows:by:idprop', self._loadDelRowsByIdProp) - self.loadbus.on('core:save:set:up:prop', self._loadUpdateProperty) - self.loadbus.on('core:save:set:up:propvalu', self._loadUpdatePropertyValu) - self.loadbus.on('syn:core:blob:set', self._onSetBlobValu) - self.loadbus.on('syn:core:blob:del', self._onDelBlobValu) - - # Cache blob save mesgs which may be fired during storage layer init - _blobMesgCache = [] - self.savebus.on('syn:core:blob:set', _blobMesgCache.append) - self.savebus.on('syn:core:blob:del', _blobMesgCache.append) - - # Perform storage layer initializations - self._initCoreStor() - - # Disable the blob message caching - self.savebus.off('syn:core:blob:set', _blobMesgCache.append) - self.savebus.off('syn:core:blob:del', _blobMesgCache.append) - - # process a savefile/savefd if we have one (but only one of the two) - savefd = self._link[1].get('savefd') - if savefd is not None: - self.setSaveFd(savefd) - else: - savefile = 
self._link[1].get('savefile') - if savefile is not None: - savefd = s_common.genfile(savefile) - self.setSaveFd(savefd, fini=True) - - # The storage layer initialization blob events then trump anything - # which may have been set during the savefile load, and make sure they - # get saved as well - if 'savefd' in link[1] or 'savefile' in link[1]: - for evtname, info in _blobMesgCache: - self.savebus.fire(evtname, **info) - self.loadbus.fire(evtname, **info) - - if not self.hasBlobValu('syn:core:created'): - self.setBlobValu('syn:core:created', s_common.now()) - - self.onfini(self._finiCoreStore) - - @staticmethod - @s_config.confdef(name='storage') - def _storage_confdefs(): - confdefs = ( - ('rev:storage', {'type': 'bool', 'defval': 1, 'doc': 'Set to 0 to disallow storage version updates'}), - ) - return confdefs - - def setModlVers(self, name, vers): - ''' - Set the version number for a specific model. - - Args: - name (str): The name of the model - vers (int): The new (linear) version number - - Returns: - (None) - - ''' - prop = '.:modl:vers:' + name - with self.getCoreXact() as xact: - - rows = tuple(self.getRowsByProp(prop)) - - if rows: - iden = rows[0][0] - else: - iden = s_common.guid() - - self.fire('modl:vers:set', name=name, vers=vers) - self.setRowsByIdProp(iden, prop, vers) - return vers - - def initRowsBy(self, name, meth): - ''' - Initialize a "rows by" handler for the Storage layer. - - These helpers are used by the Cortex to do special types of lifts. - This allows different Storage layers to implement certain lifts in an optimized fashion. - - Args: - name (str): Named handler to register. - meth (func): Function to register. - - Examples: - Registering a 'woot' handler:: - - def getbywoot(prop,valu,limit=None): - return stuff() # list of rows - - core.initRowsBy('woot',getbywoot) - - Returns: - None - ''' - self.rowsbymeths[name] = meth - - def initSizeBy(self, name, meth): - ''' - Initialize a "size by" handler for the Storage layer. - - These helpers are used by the Cortex to do size by lifts. - This allows different Storage layers to implement certain lifts in an optimized fashion. - - Args: - name (str): Named handler to register. - meth (func): Function to register. - - Examples: - Registering a 'woot' handler:: - - def sizebywoot(prop,valu,limit=None): - return stuff() # size of rows - - core.initSizeBy('woot',meth) - - Returns: - None - ''' - self.sizebymeths[name] = meth - - def initJoinsBy(self, name, meth): - ''' - Initialize a "joins by" handler for the Storage layer. - - These helpers are used by the Cortex to do special types of lifts. - This allows different Storage layers to implement certain lifts in an optimized fashion. - - Args: - name (str): Named handler to register. - meth (func): Function to register. - - Examples: - Registering a 'woot' handler:: - - def getbywoot(prop,valu,limit=None): - return stuff() # list of rows - - core.initJoinsBy('woot',getbywoot) - - Returns: - None - ''' - self.joinsbymeths[name] = meth - - def reqSizeByMeth(self, name): - ''' - Get a handler for a SizeBy lift. - - Args: - name (str): Name of the registered handler to retrieve. - - Returns: - Function used to lift by size. - - Raises: - NoSuchGetBy: If the named handler does not exist. - ''' - meth = self.sizebymeths.get(name) - if meth is None: - raise s_common.NoSuchGetBy(name=name) - return meth - - def reqRowsByMeth(self, name): - ''' - Get a handler for a RowsBy lift. - - Args: - name (str): Name of the registered handler to retrieve. 
- - Returns: - Function used to lift by rows. - - Raises: - NoSuchGetBy: If the named handler does not exist. - ''' - meth = self.rowsbymeths.get(name) - if meth is None: - raise s_common.NoSuchGetBy(name=name) - return meth - - def reqJoinByMeth(self, name): - ''' - Get a handler for a JoinBy lift. - - Args: - name (str): Name of the registered handler to retrieve. - - Returns: - Function used to lift joined rows. - - Raises: - NoSuchGetBy: If the named handler does not exist. - ''' - meth = self.joinsbymeths.get(name) - if meth is None: - raise s_common.NoSuchGetBy(name=name) - return meth - - def getJoinsBy(self, name, prop, valu, limit=None): - ''' - Retrieve joined rows by either a specified method or by falling back - to the rowsBy handlers. Specialized methods will be dependent on the - storage backend and the data indexed. - - Args: - name (str): Name of the method to look up. - prop (str): Prop to filter by. - valu: Value (or values) to pass to the helper method. - limit (int): Limit on the join. Limit meaning may vary by - implementation or named helper. - - Returns: - list: List of (i, p, v, t) rows for the joined idens. - ''' - meth = self.joinsbymeths.get(name) - if not meth: - meth = self.reqRowsByMeth(name) - rows = meth(prop, valu, limit=limit) - return self.getRowsByIdens({i for (i, p, v, t) in rows}) - return meth(prop, valu, limit=limit) - - def _defaultFiniCoreStor(self): - ''' - Default fini handler. Do not override. - ''' - # Close out savefile buses - self.savebus.fini() - self.loadbus.fini() - - def getCoreXact(self, size=1000, core=None): - ''' - Get a Storage transaction context for use in a with block. - - This object allows bulk storage layer optimization and proper ordering - of events. - - Args: - size (int): Number of transactions to cache before starting to - execute storage layer events. - core: Cortex to attach to the StoreXact. Required for splice - event support. - - Examples: - Get a StoreXact object and use it:: - - with store.getCoreXact() as xact: - store.dostuff() - - Returns: - s_xact.StoreXact: A StoreXact object for the current thread. - ''' - iden = s_threads.iden() - - xact = self._store_xacts.get(iden) - if xact is not None: - return xact - - xact = self.getStoreXact(size, core=core) - self._store_xacts[iden] = xact - return xact - - def _popCoreXact(self): - # Used by the CoreXact fini routine - self._store_xacts.pop(s_threads.iden(), None) - - def getBlobValu(self, key, default=None): - ''' - Get a value from the blob key/value (KV) store. - - This resides below the tufo storage layer and is Cortex implementation - dependent. In purely memory backed cortexes, this KV store may not be - persistent, even if the tufo-layer is persistent, through something - such as the savefile mechanism. - - Notes: - Data which is retrieved from the KV store is msgpacked, so caveats - with that apply. - - Args: - key (str): Key to retrieve. - default: Value returned if the key is not present in the blob store. - - Returns: - The value from the KV store or the default valu (None). - - ''' - buf = self._getBlobValu(key) - if buf is None: - self.log(logging.WARNING, mesg='Requested key not present in blob store, returning default', name=key) - return default - return s_msgpack.un(buf) - - def getBlobKeys(self): - ''' - Get a list of keys in the blob key/value store. - - Returns: - list: List of keys in the store. - ''' - return self._getBlobKeys() - - def setBlobValu(self, key, valu): - ''' - Set a value in the blob key/value (KV) store.
- - This resides below the tufo storage layer and is Cortex implementation - dependent. In purely memory backed cortexes, this KV store may not be - persistent, even if the tufo-layer is persistent, through something - such as the savefile mechanism. - - Notes: - Data which is stored in the KV store is msgpacked, so caveats with - that apply. - - Args: - key (str): Name of the value to store. - valu: Value to store in the KV store. - - Returns: - The input value, unchanged. - ''' - buf = s_msgpack.en(valu) - self._setBlobValu(key, buf) - self.savebus.fire('syn:core:blob:set', key=key, valu=buf) - return valu - - def hasBlobValu(self, key): - ''' - Check the blob store to see if a key is present. - - Args: - key (str): Key to check - - Returns: - bool: If the key is present, returns True, otherwise False. - - ''' - return self._hasBlobValu(key) - - def delBlobValu(self, key): - ''' - Remove and return a value from the blob store. - - Args: - key (str): Key to remove. - - Returns: - Content in the blob store for a given key. - - Raises: - NoSuchName: If the key is not present in the store. - ''' - if not self.hasBlobValu(key): - raise s_common.NoSuchName(name=key, mesg='Cannot delete key which is not present in the blobstore.') - buf = self._delBlobValu(key) - self.savebus.fire('syn:core:blob:del', key=key) - return s_msgpack.un(buf) - - def _onSetBlobValu(self, mesg): - key = mesg[1].get('key') - valu = mesg[1].get('valu') - self._setBlobValu(key, valu) - - def _onDelBlobValu(self, mesg): - key = mesg[1].get('key') - self._delBlobValu(key) - - def addSaveLink(self, func): - ''' - Add an event callback to receive save events for this Storage object. - - Args: - func: Function to receive events from the Storage savebus. - - Examples: - Register a function to receive events:: - - def savemesg(mesg): - dostuff() - - core.addSaveLink(savemesg) - - Returns: - None - ''' - self.savebus.link(func) - - def setSaveFd(self, fd, load=True, fini=False): - ''' - Set a save fd for the cortex and optionally load. - - Args: - fd (file): A file like object to save splice events to using msgpack - load (bool): If True, load splice events from fd before starting to record - fini (bool): If True, close() the fd automatically on cortex fini() - - Returns: - (None) - - Example: - - store.setSaveFd(fd) - - NOTE: This save file is allowed to be storage layer specific. - If you want to store cortex splice events, use addSpliceFd() from the Cortex class. - ''' - self._setSaveFd(fd, load, fini)
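# Editor's sketch (hypothetical calls, not from the original file): a round trip
# through the blob key/value helpers above; values are msgpack encoded on the way
# in and decoded on the way out, so only msgpack-safe types survive unchanged.
#
#     store.setBlobValu('syn:meta:owner', 'visi')    # returns 'visi'
#     store.hasBlobValu('syn:meta:owner')            # True
#     store.getBlobValu('syn:meta:owner')            # 'visi'
#     store.getBlobValu('no:such:key', 'dflt')       # 'dflt' (default, with a warning logged)
#     store.delBlobValu('syn:meta:owner')            # 'visi', and the key is removed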
- - def addRows(self, rows): - ''' - Add (iden, prop, valu, time) rows to the Storage object. - - Args: - rows (list): List of rows containing (i, p, v, t) tuples. - - Examples: - Adding a pair of rows to the storage object:: - - tick = s_common.now() - - rows = [ - (id1,'baz',30,tick), - (id1,'foo','bar',tick), - ] - - store.addRows(rows) - - Notes: - The general convention for the iden value is a 16 byte hex string, - such as "e190d108bdd30a035a15764313f4c397". These can be made with - the synapse.common.guid() function. While the storage layer is - free to STORE these idens however it sees fit, some tools may - expect that, at the public row level APIs, idens may conform to - that shape. If other types of idens are put into the system, that - could cause unintentional issues. - - This does fire a "core:save:add:rows" event on the savebus to save - the raw rows which are being sent to the storage layer. - - Returns: - None - ''' - [reqstor(p, v) for (i, p, v, t) in rows] - self.savebus.fire('core:save:add:rows', rows=rows) - self._addRows(rows) - - def _loadAddRows(self, mesg): - self._addRows(mesg[1].get('rows')) - - def delRowsById(self, iden): - ''' - Delete all the rows for a given iden. - - Args: - iden (str): Iden to delete rows for. - - Examples: - Delete the rows for a given iden:: - - store.delRowsById(iden) - - Notes: - This does fire a "core:save:del:rows:by:iden" event on the savebus - to record which rows were deleted. - - Returns: - None - ''' - self.savebus.fire('core:save:del:rows:by:iden', iden=iden) - self._delRowsById(iden) - - def _loadDelRowsById(self, mesg): - self._delRowsById(mesg[1].get('iden')) - - def delRowsByIdProp(self, iden, prop, valu=None): - ''' - Delete rows with the given combination of iden and prop[=valu]. - - Args: - iden (str): Iden to delete rows for. - prop (str): Prop to delete rows for. - valu: Optional value to check. If present, only delete iden/prop - rows with this value. - - Examples: - Delete all 'foo' rows for a given iden:: - - store.delRowsByIdProp(iden, 'foo') - - Notes: - This does fire a "core:save:del:rows:by:idprop" event on the - savebus to record which rows were deleted. - - Returns: - None - ''' - self.savebus.fire('core:save:del:rows:by:idprop', iden=iden, prop=prop, valu=valu) - return self._delRowsByIdProp(iden, prop, valu=valu) - - def _loadDelRowsByIdProp(self, mesg): - iden = mesg[1].get('iden') - prop = mesg[1].get('prop') - valu = mesg[1].get('valu') - self._delRowsByIdProp(iden, prop, valu=valu) - - def delRowsByProp(self, prop, valu=None, mintime=None, maxtime=None): - ''' - Delete rows with a given property (and optional valu) combination. - - Args: - prop (str): Property to delete. - valu: Optional value to constrain the property deletion by. - mintime (int): Optional, minimum time in which to constrain the - deletion by. - maxtime (int): Optional, maximum time in which to constrain the - deletion by. - - Examples: - Delete all 'foo' rows with the valu=10:: - - store.delRowsByProp('foo',valu=10) - - Notes: - This does fire a "core:save:del:rows:by:prop" event on the - savebus to record which rows were deleted. - - Returns: - None - ''' - self.savebus.fire('core:save:del:rows:by:prop', prop=prop, valu=valu, mintime=mintime, maxtime=maxtime) - return self._delRowsByProp(prop, valu=valu, mintime=mintime, maxtime=maxtime) - - def _loadDelRowsByProp(self, mesg): - prop = mesg[1].get('prop') - valu = mesg[1].get('valu') - mint = mesg[1].get('mintime') - maxt = mesg[1].get('maxtime') - self._delRowsByProp(prop, valu=valu, mintime=mint, maxtime=maxt) - - def setRowsByIdProp(self, iden, prop, valu): - ''' - Update or insert the value of the row(s) with iden and prop to valu. - - Args: - iden (str): Iden to update. - prop (str): Property to update. - valu: Value to set. - - Examples: - Set the foo=10 value on a given iden:: - - store.setRowsByIdProp(iden, 'foo', 10) - - Notes: - This does fire a "core:save:set:rows:by:idprop" event on the - savebus to save the changes which are being sent to the storage - layer. - - Returns: - None - ''' - reqstor(prop, valu) - self.savebus.fire('core:save:set:rows:by:idprop', iden=iden, prop=prop, valu=valu) - self._setRowsByIdProp(iden, prop, valu) - - def _loadSetRowsByIdProp(self, mesg): - iden = mesg[1].get('iden') - prop = mesg[1].get('prop') - valu = mesg[1].get('valu') - self._setRowsByIdProp(iden, prop, valu)
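# Editor's sketch (hypothetical helper, not from the original file): the save/load
# symmetry wired up above means a savefile can rebuild a second store. Every
# mutator fires a core:save:* event on savebus; _setSaveFd() appends those events
# to the fd as msgpack, and loading replays them through loadbus.
#
#     import synapse.common as s_common
#
#     def replicate(src, dst, path):
#         fd = s_common.genfile(path)
#         src.setSaveFd(fd, load=False)   # record future mutations
#         # ... mutate src: src.addRows(...), src.delRowsById(...), etc ...
#         fd.seek(0)
#         dst.setSaveFd(fd, load=True)    # replay the recorded events into dst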
- - # Blobstore interface isn't clean to separate - def _revCorVers(self, revs): - ''' - Update the storage layer with a list of (vers,func) tuples. - - Args: - revs ([(int,function)]): List of (vers,func) revision tuples. - - Returns: - (None) - - Each specified function is expected to update the storage layer including data migration. - ''' - if not revs: - return - vsn_str = 'syn:core:{}:version'.format(self.getStoreType()) - curv = self.getBlobValu(vsn_str, -1) - - maxver = revs[-1][0] - if maxver == curv: - return - - name = 'rev:storage' - if not self.getConfOpt(name): - mesg = 'add rev:storage=1 to storage confs to allow storage updates' - self.log(level=logging.WARNING, mesg=mesg, name=name) - logger.warning(mesg) - return - - for vers, func in sorted(revs): - - if vers <= curv: - continue - - # allow the revision function to optionally return the - # revision it jumped to (to allow initial override) - mesg = 'Warning - storage layer update occurring. Do not interrupt. [{}] => [{}]'.format(curv, vers) - logger.warning(mesg) - retn = func() - logger.warning('Storage layer update completed.') - if retn is not None: - vers = retn - - curv = self.setBlobValu(vsn_str, vers) - - def genStoreRows(self, **kwargs): - ''' - A generator which yields raw rows from the storage layer. - - Args: - **kwargs: Arguments which are passed to the storage layer - implementation of _genStoreRows(). - - Notes: - Since this is intended for use as a backup mechanism for a Storage - object, it is not to be considered a performant API. - - Yields: - list: List of rows. The number of rows and contents - will vary by implementation. - ''' - for rows in self._genStoreRows(**kwargs): - yield rows - - def _loadUpdateProperty(self, mesg): - oldprop = mesg[1].get('oldprop') - newprop = mesg[1].get('newprop') - self._updateProperty(oldprop, newprop) - - def updateProperty(self, oldprop, newprop): - ''' - Do a wholesale replacement of one property with another property. - - Args: - oldprop (str): The original property which is removed. - newprop (str): The property that is updated in place. - - Examples: - Rename "inet:tcp4:port" to "inet:tcp4:foobar":: - - nrows = store.updateProperty('inet:tcp4:port', 'inet:tcp4:foobar') - - Notes: - This API does fire syn:core:store:up:prop:pre and syn:core:store:up:prop:post events with the old - and new property names in it, before and after the property update is done. This API is primarily designed - for assisting with Cortex data migrations. - - Returns: - int: Number of rows updated in place. - ''' - if oldprop == newprop: - raise s_common.BadPropName(mesg='oldprop and newprop cannot be the same.', - oldprop=oldprop, newprop=newprop) - - if not isinstance(newprop, str): - raise s_common.BadPropName(mesg='newprop must be a str', newprop=newprop) - - self.savebus.fire('core:save:set:up:prop', oldprop=oldprop, newprop=newprop) - self.fire('syn:core:store:up:prop:pre', oldprop=oldprop, newprop=newprop) - nrows = self._updateProperty(oldprop, newprop) - self.fire('syn:core:store:up:prop:post', oldprop=oldprop, newprop=newprop, nrows=nrows) - return nrows - - def _loadUpdatePropertyValu(self, mesg): - prop = mesg[1].get('prop') - oldval = mesg[1].get('oldval') - newval = mesg[1].get('newval') - self._updatePropertyValu(prop, oldval, newval) - - def updatePropertyValu(self, prop, oldval, newval): - ''' - Do a wholesale update of one property=valu combination with a new valu. - - Args: - prop (str): Property to select by for updating. - oldval: Old valu to select rows by. - newval: Valu to set the prop=oldval rows to be.
- - Examples: - Rename "tufo:form"="inet:tcp4" to instead be "tufo:form"="inet:tcp4000":: - - nrows = store.updatePropertyValu('tufo:form', 'inet:tcp4', 'inet:tcp4000') - - Notes: - This API does fire syn:core:store:up:propval:pre and syn:core:store:up:propval:post - events with the property, old value and new values in it; before and after update is done. This API is - primarily designed for assisting with Cortex data migrations. The oldval and newval must be of the same - type. - - Returns: - int: Number of rows updated in place. - ''' - if oldval == newval: - raise s_common.SynErr(mesg='oldval and newval cannot be the same.', - oldval=oldval, newval=newval) - - if not isinstance(newval, (int, str)): - raise s_common.BadPropValu(mesg='newval must be a str or int', newval=newval) - if not isinstance(oldval, type(newval)): - raise s_common.BadPropValu(mesg='oldval and newval must be of the same type', - newval=newval, oldval=oldval) - - self.savebus.fire('core:save:set:up:propvalu', prop=prop, oldval=oldval, newval=newval) - self.fire('syn:core:store:up:propval:pre', prop=prop, oldval=oldval, newval=newval) - nrows = self._updatePropertyValu(prop, oldval, newval) - self.fire('syn:core:store:up:propval:post', prop=prop, oldval=oldval, newval=newval, nrows=nrows) - return nrows - - # The following MUST be implemented by the storage layer in order to - # support the basic idea of a cortex - - def _initCoreStor(self): # pragma: no cover - ''' - This is called to initialize any implementation specific resources. - - This is where things like filesystem allocations, DB connections, - et cetera should be stood up. - - If the Storage layer has additional joinBy* handlers which it needs - to register (for the purpose of the Cortex.getTufosBy() function), - it should add them in this function. - - Returns: - None - ''' - raise s_common.NoSuchImpl(name='_initCoreStor', mesg='Store does not implement _initCoreStor') - - def getStoreType(self): # pragma: no cover - ''' - Get the Store type. - - This may be used by the Cortex to determine what its backing - store is. - - Returns: - str: String indicating what the backing storage layer is. - ''' - raise s_common.NoSuchImpl(name='getStoreType', mesg='Store does not implement getStoreType') - - def getStoreXact(self, size=None, core=None): # pragma: no cover - ''' - Get a StoreXact object. - - This is normally called by the getCoreXact function. - - Args: - size (int): Number of events to cache in the transaction before - executing them. - core: Cortex to attach to the transaction. Required for splice - event support. - - Returns: - s_xact.StoreXact: A storage layer specific StoreXact object. - ''' - raise s_common.NoSuchImpl(name='getStoreXact', mesg='Store does not implement getStoreXact') - - def _addRows(self, rows): # pragma: no cover - ''' - This should perform the actual addition of rows to the storage layer. - - Args: - rows (list): Rows to add to the Storage layer. - - Returns: - None - ''' - raise s_common.NoSuchImpl(name='_addRows', mesg='Store does not implement _addRows') - - def getRowsById(self, iden): # pragma: no cover - ''' - Return all the rows for a given iden. - - Args: - iden (str): Iden to get rows from the storage object for. - - Examples: - Getting rows by iden and doing stuff:: - - for row in store.getRowsById(iden): - stuff() - - Getting rows by iden and making a tufo out of them:: - - rows = store.getRowsById(iden) - tufo = (iden, {p: v for (i, p, v, t) in rows}) - - Returns: - list: List of rows for a given iden. 
- ''' - raise s_common.NoSuchImpl(name='getRowsById', mesg='Store does not implement getRowsById') - - def getRowsByProp(self, prop, valu=None, mintime=None, maxtime=None, limit=None): # pragma: no cover - ''' - Get rows from the Storage layer based on their property value - and other, optional, constraints. - - Args: - prop (str): Property to retrieve rows based on. - valu: Optional, str or integer value to constrain the retrieval by. - mintime (int): Optional, minimum (inclusive) time to constrain the retrieval by. - maxtime (int): Optional, maximum (exclusive) time to constrain the retrieval by. - limit (int): Maximum number of rows to return. - - Returns: - list: List of (i, p, v, t) rows. - ''' - raise s_common.NoSuchImpl(name='getRowsByProp', mesg='Store does not implement getRowsByProp') - - def getRowsByIdProp(self, iden, prop, valu=None): # pragma: no cover - ''' - Return rows with the given iden, prop and optional valu combination. - - Args: - iden (str): Iden to get rows from the storage object for. - prop (str): Prop to constrain the lift by. - valu: Optional, value to constrain the lift by. - - Examples: - Getting rows by iden, prop and doing stuff:: - - for row in core.getRowsByIdProp(iden,'foo:bar'): - dostuff(row) - - Returns: - list: List of rows for a given iden, prop, value combination. - ''' - raise s_common.NoSuchImpl(name='getRowsByIdProp', mesg='Store does not implement getRowsByIdProp') - - def _delRowsById(self, iden): # pragma: no cover - ''' - Delete rows from the storage layer with a given iden. - ''' - raise s_common.NoSuchImpl(name='_delRowsById', mesg='Store does not implement _delRowsById') - - def _delRowsByProp(self, prop, valu=None, mintime=None, maxtime=None): # pragma: no cover - ''' - Delete rows from the storage layer with a given prop and other, - optional, constraints. - ''' - raise s_common.NoSuchImpl(name='_delRowsByProp', mesg='Store does not implement _delRowsByProp') - - def _delRowsByIdProp(self, iden, prop, valu=None): # pragma: no cover - ''' - Delete rows from the storage layer with a given iden & prop, with an - optional valu constraint. 
- ''' - raise s_common.NoSuchImpl(name='_delRowsByIdProp', mesg='Store does not implement _delRowsByIdProp') - - def getSizeByProp(self, prop, valu=None, mintime=None, maxtime=None): # pragma: no cover - raise s_common.NoSuchImpl(name='getSizeByProp', mesg='Store does not implement getSizeByProp') - - def rowsByRange(self, prop, valu, limit=None): # pragma: no cover - raise s_common.NoSuchImpl(name='rowsByRange', mesg='Store does not implement rowsByRange') - - def sizeByGe(self, prop, valu, limit=None): # pragma: no cover - raise s_common.NoSuchImpl(name='sizeByGe', mesg='Store does not implement sizeByGe') - - def rowsByGe(self, prop, valu, limit=None): # pragma: no cover - raise s_common.NoSuchImpl(name='rowsByGe', mesg='Store does not implement rowsByGe') - - def sizeByLe(self, prop, valu, limit=None): # pragma: no cover - raise s_common.NoSuchImpl(name='sizeByLe', mesg='Store does not implement sizeByLe') - - def rowsByLe(self, prop, valu, limit=None): # pragma: no cover - raise s_common.NoSuchImpl(name='rowsByLe', mesg='Store does not implement rowsByLe') - - def sizeByRange(self, prop, valu, limit=None): # pragma: no cover - raise s_common.NoSuchImpl(name='sizeByRange', mesg='Store does not implement sizeByRange') - - def _joinsByGe(self, prop, valu, limit=None): # pragma: no cover - raise s_common.NoSuchImpl(name='joinsByGe', mesg='Store does not implement joinsByGe') - - def _joinsByLe(self, prop, valu, limit=None): # pragma: no cover - raise s_common.NoSuchImpl(name='joinsByLe', mesg='Store does not implement joinsByLe') - - def _getBlobValu(self, key): # pragma: no cover - raise s_common.NoSuchImpl(name='_getBlobValu', mesg='Store does not implement _getBlobValu') - - def _setBlobValu(self, key, valu): # pragma: no cover - raise s_common.NoSuchImpl(name='_setBlobValu', mesg='Store does not implement _setBlobValu') - - def _hasBlobValu(self, key): # pragma: no cover - raise s_common.NoSuchImpl(name='_hasBlobValu', mesg='Store does not implement _hasBlobValu') - - def _delBlobValu(self, key): # pragma: no cover - raise s_common.NoSuchImpl(name='_delBlobValu', mesg='Store does not implement _delBlobValu') - - def _getBlobKeys(self): # pragma: no cover - raise s_common.NoSuchImpl(name='_getBlobKeys', mesg='Store does not implement _getBlobKeys') - - def _genStoreRows(self, **kwargs): # pragma: no cover - raise s_common.NoSuchImpl(name='_genStoreRows', mesg='Store does not implement _genStoreRows') - - # The following are default implementations that may be overridden by - # a storage layer for various reasons. - - def _finiCoreStore(self): # pragma: no cover - ''' - This should be overridden to close out any storage layer specific resources. - ''' - pass - - def getJoinByProp(self, prop, valu=None, mintime=None, maxtime=None, limit=None): - return [row for row in self.genJoinByProp(prop, valu, mintime, maxtime, limit)] - - def genJoinByProp(self, prop, valu=None, mintime=None, maxtime=None, limit=None): - for irow in self.getRowsByProp(prop, valu=valu, mintime=mintime, maxtime=maxtime, limit=limit): - for jrow in self.getRowsById(irow[0]): - yield jrow
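# Editor's sketch (hypothetical, not from the original file): the smallest shape a
# concrete backend takes, overriding a few of the NoSuchImpl stubs above; the real
# backends (sqlite, lmdb, ram) fill in the whole interface. Assumes 'import
# collections' at module scope.
#
#     class TinyStorage(Storage):
#
#         def _initCoreStor(self):
#             self.rowsbyiden = collections.defaultdict(list)
#
#         def getStoreType(self):
#             return 'tiny'
#
#         def _addRows(self, rows):
#             for i, p, v, t in rows:
#                 self.rowsbyiden[i].append((i, p, v, t))
#
#         def getRowsById(self, iden):
#             return list(self.rowsbyiden.get(iden, ()))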
- - def _joinsByRange(self, prop, valu, limit=None): - ''' - Default implementation of a 'range' handler for joining rows. - - Args: - prop (str): Prop to select joins by. - valu (list): A list (or tuple) of two items. These should be a - minvalu, maxvalu pair. These serve as the bound for doing the - range lift by. - limit (int): Limit on the number of rows to lift by range. - - Returns: - list: List of (i, p, v, t) rows. - ''' - rows = self.rowsByRange(prop, valu, limit=limit) - return self.getRowsByIdens({i for i, p, v, t in rows}) - - def _joinsByIn(self, prop, valus, limit=None): - ''' - Default implementation of an 'in' handler for joining rows. - - Args: - prop (str): Prop to select joins by. - valus (list): A list (or tuple) of values to query a Storage object - for. If an empty list is provided, an empty list is returned. - limit (int): Limit on the number of joined idens to return. - - Returns: - list: List of (i, p, v, t) rows. - - ''' - if len(valus) == 0: - return [] - - if limit is not None and limit < 1: - return [] - - rows = [] - for valu in valus: - _rows = list(self.getJoinByProp(prop, valu, limit=limit)) - rows.extend(_rows) - if limit is not None: - rowidens = {i for (i, p, v, t) in _rows} - limit -= len(rowidens) - if limit <= 0: - break - return rows - - def _setRowsByIdProp(self, iden, prop, valu): - # base case is delete and add - self._delRowsByIdProp(iden, prop) - rows = [(iden, prop, valu, s_common.now())] - self._addRows(rows) - - def _delJoinByProp(self, prop, valu=None, mintime=None, maxtime=None): - rows = self.getRowsByProp(prop, valu=valu, mintime=mintime, maxtime=maxtime) - done = set() - for row in rows: - iden = row[0] - if iden in done: - continue - - self._delRowsById(iden) - done.add(iden) - - def _setSaveFd(self, fd, load=True, fini=False): - ''' - The default implementation of savefile for a Cortex. - This may be overridden by a storage layer. - ''' - if load: - for mesg in s_msgpack.iterfd(fd): - self.loadbus.dist(mesg) - - self.onfini(fd.flush) - if fini: - self.onfini(fd.close) - - def savemesg(mesg): - fd.write(s_msgpack.en(mesg)) - - self.savebus.link(savemesg) - - def getRowsByIdens(self, idens): - ''' - Return all the rows for a given list of idens. - - Args: - idens (list): Idens to get rows from the storage object for. - - Examples: - Getting rows by idens and doing stuff:: - - for row in store.getRowsByIdens(idens): - stuff(row) - - Getting rows by idens and making tufos out of them:: - - rows = store.getRowsByIdens(idens) - tufos = s_common.rowstotufos(rows) - - Returns: - list: List of rows for the given idens. - ''' - ret = [] - for iden in idens: - rows = self.getRowsById(iden) - ret.extend(rows) - return ret - - def _updateProperty(self, oldprop, newprop): - ''' - Entrypoint for doing in-place property update type operations. - This is called by self.updateProperty() to do the actual property update. - ''' - adds = [] - for i, p, v, t in self.getRowsByProp(oldprop): - adds.append((i, newprop, v, t)) - - if adds: - self._addRows(adds) - self._delRowsByProp(oldprop) - - return len(adds) - - def _updatePropertyValu(self, prop, oldval, newval): - ''' - Entrypoint for doing in-place property value update type operations. - This is called by self.updatePropertyValu() to do the actual property update. 
- ''' - adds = [] - rows = self.getRowsByProp(prop, oldval) - for i, p, v, t in rows: - adds.append((i, p, newval, t)) - - if adds: - self._delRowsByProp(prop, oldval) - self._addRows(adds) - return len(adds) - - # these helpers allow a storage layer to implement only rowsByGe/rowsByLe - # and _joinsByGe/_joinsByLe and get the gt/lt variants for free - - def rowsByLt(self, prop, valu, limit=None): - return self.rowsByLe(prop, valu - 1, limit=limit) - - def rowsByGt(self, prop, valu, limit=None): - return self.rowsByGe(prop, valu + 1, limit=limit) - - def _joinsByLt(self, prop, valu, limit=None): - return self._joinsByLe(prop, valu - 1, limit=limit) - - def _joinsByGt(self, prop, valu, limit=None): - return self._joinsByGe(prop, valu + 1, limit=limit) diff --git a/synapse/cores/xact.py b/synapse/cores/xact.py deleted file mode 100644 index 6b46c449..00000000 --- a/synapse/cores/xact.py +++ /dev/null @@ -1,181 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -synapse - xact.py -Created on 8/1/17. - -StoreXact implementation. This is designed to be subclassed by Storage layer implementors. -""" -import time -import logging - -import synapse.common as s_common -import synapse.lib.auth as s_auth - -logger = logging.getLogger(__name__) - -class StoreXact: - ''' - A context manager for a storage "transaction". - ''' - def __init__(self, store, size=None, core=None): - self.store = store - self.size = size - self.core = core - - self.tick = s_common.now() - - self.refs = 0 - self.ready = False - self.exiting = False - - self.events = [] - self.triggers = [] - - def trigger(self, node, name, **info): - ''' - Fire a trigger from the transaction. - - Args: - node ((str,dict)): The node for the trigger - name (str): The trigger permission string - info (dict): The trigger permission metadata - ''' - perm = (name, info) - self.triggers.append((node, perm)) - - def spliced(self, act, **info): - ''' - Fire a splice event from the transaction. - - Args: - act (str): Splice action. - **info: Event values. 
- - Returns: - None - ''' - # Splice events only matter for StoreXacts which have a Cortex - if not self.core: - return - - form = info.get('form') - - pdef = self.core.getPropDef(form) - if pdef is not None and pdef[1].get('local'): - return - - info['time'] = self.tick - info['user'] = s_auth.whoami() - - self.fire('splice', mesg=(act, info)) - - def _coreXactAcquire(self): - # allow implementors to acquire any synchronized resources - pass - - def _coreXactRelease(self): - # allow implementors to release any synchronized resources - pass - - def _coreXactInit(self): - # called once during the first __enter__ - pass - - def _coreXactFini(self): - # called once during the last __exit__ - pass - - def _coreXactBegin(self): # pragma: no cover - raise s_common.NoSuchImpl(name='_coreXactBegin') - - def _coreXactCommit(self): # pragma: no cover - raise s_common.NoSuchImpl(name='_coreXactCommit') - - def acquire(self): - self._coreXactAcquire() - self.store.xlock.acquire() - - def release(self): - self.store.xlock.release() - self._coreXactRelease() - - def begin(self): - self._coreXactBegin() - - def commit(self): - ''' - Commit the results thus far ( without closing / releasing ) - ''' - self._coreXactCommit() - - def fireall(self): - - events = self.events - triggers = self.triggers - - self.events = [] - self.triggers = [] - - [self.store.fire(name, **props) for (name, props) in events] - - if self.core is not None: - for node, perm in triggers: - self.core._fireNodeTrig(node, perm) - - def cedetime(self): - # release and re-acquire the form lock to allow others a shot - # give up our scheduler quanta to allow acquire() priority to go - # to any existing waiters.. ( or come back almost immediately if none ) - self.release() - time.sleep(0) - self.acquire() - - def fire(self, name, **props): - ''' - Pend an event to fire when the transaction next commits. - ''' - self.events.append((name, props)) - - if self.size is not None and len(self.events) >= self.size: - self.sync() - self.cedetime() - self.begin() - - def sync(self): - ''' - Loop committing and syncing events until there are no more - events that need to fire. - ''' - self.commit() - - # odd thing during exit... we need to fire events - # ( possibly causing more xact uses ) until there are - # no more events left to fire. - while self.events: - self.begin() - self.fireall() - self.commit() - - def __enter__(self): - self.refs += 1 - if self.refs == 1 and not self.ready: - self._coreXactInit() - self.acquire() - self.begin() - self.ready = True - - return self - - def __exit__(self, exc, cls, tb): - # FIXME handle rollback on exc not None - self.refs -= 1 - if self.refs > 0 or self.exiting: - return - - self.exiting = True - - self.sync() - self.release() - self._coreXactFini() - self.store._popCoreXact()
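# Editor's sketch (hypothetical caller, not from the deleted files): how the pieces
# fit together. Storage.getCoreXact() returns the current thread's StoreXact, so
# the nested transaction opened inside addRows() reuses the outer one, and the
# pended events batch until the outermost __exit__ runs sync().
#
#     import synapse.common as s_common
#
#     def ingest(store, valus):
#         tick = s_common.now()
#         with store.getCoreXact():
#             rows = [(s_common.guid(), 'foo:bar', valu, tick) for valu in valus]
#             store.addRows(rows)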