diff --git a/changelog/60272.fixed b/changelog/60272.fixed new file mode 100644 index 000000000000..3d3dabec3ff9 --- /dev/null +++ b/changelog/60272.fixed @@ -0,0 +1 @@ +Corrected import statement for redis_cache in cluster mode. diff --git a/changelog/61081.changed b/changelog/61081.changed new file mode 100644 index 000000000000..493870220fb4 --- /dev/null +++ b/changelog/61081.changed @@ -0,0 +1 @@ +Updated mysql cache module to also store updated timestamp, making it consistent with default cache module. Users of mysql cache should ensure database size before updating, as ALTER TABLE will add the timestamp column. diff --git a/salt/cache/__init__.py b/salt/cache/__init__.py index 464493eaa39f..2b13a3642353 100644 --- a/salt/cache/__init__.py +++ b/salt/cache/__init__.py @@ -39,7 +39,7 @@ class Cache: The name of the cache driver to use. This is the name of the python module of the `salt.cache` package. Default is `localfs`. - Terminology. + Terminology: Salt cache subsystem is organized as a tree with nodes and leafs like a filesystem. Cache consists of banks. Each bank can contain a number of @@ -345,5 +345,10 @@ def store(self, bank, key, data): self.storage[(bank, key)] = [time.time(), data] def flush(self, bank, key=None): - self.storage.pop((bank, key), None) + if key is None: + for bank_, key_ in tuple(self.storage): + if bank == bank_: + self.storage.pop((bank_, key_)) + else: + self.storage.pop((bank, key), None) super().flush(bank, key) diff --git a/salt/cache/consul.py b/salt/cache/consul.py index fe5a2662138f..14c08cc4d25a 100644 --- a/salt/cache/consul.py +++ b/salt/cache/consul.py @@ -3,6 +3,10 @@ .. versionadded:: 2016.11.2 +.. versionchanged:: 3005.0 + + Timestamp/cache updated support added. + :depends: python-consul >= 0.2.0 It is up to the system administrator to set up and configure the Consul @@ -30,6 +34,12 @@ consul.consistency: default consul.dc: dc1 consul.verify: True + consul.timestamp_suffix: .tstamp # Added in 3005.0 + +In order to bring the cache APIs into conformity, in 3005.0 timestamp +information gets stored as a separate ``{key}.tstamp`` key/value. If your +existing functionality depends on being able to store normal keys with the +``.tstamp`` suffix, override the ``consul.timestamp_suffix`` default config. Related docs could be found in the `python-consul documentation`_. @@ -47,6 +57,7 @@ """ import logging +import time import salt.payload from salt.exceptions import SaltCacheError @@ -61,6 +72,7 @@ log = logging.getLogger(__name__) api = None +_tstamp_suffix = ".tstamp" # Define the module's virtual name @@ -90,7 +102,8 @@ def __virtual__(): } try: - global api + global api, _tstamp_suffix + _tstamp_suffix = __opts__.get("consul.timestamp_suffix", _tstamp_suffix) api = consul.Consul(**consul_kwargs) except AttributeError: return ( @@ -107,9 +120,12 @@ def store(bank, key, data): Store a key value. """ c_key = "{}/{}".format(bank, key) + tstamp_key = "{}/{}{}".format(bank, key, _tstamp_suffix) + try: c_data = salt.payload.dumps(data) api.kv.put(c_key, c_data) + api.kv.put(tstamp_key, salt.payload.dumps(int(time.time()))) except Exception as exc: # pylint: disable=broad-except raise SaltCacheError( "There was an error writing the key, {}: {}".format(c_key, exc) @@ -138,9 +154,13 @@ def flush(bank, key=None): """ if key is None: c_key = bank + tstamp_key = None else: c_key = "{}/{}".format(bank, key) + tstamp_key = "{}/{}{}".format(bank, key, _tstamp_suffix) try: + if tstamp_key: + api.kv.delete(tstamp_key) return api.kv.delete(c_key, recurse=key is None) except Exception as exc: # pylint: disable=broad-except raise SaltCacheError( @@ -166,7 +186,7 @@ def list_(bank): out = set() for key in keys: out.add(key[len(bank) + 1 :].rstrip("/")) - keys = list(out) + keys = [o for o in out if not o.endswith(_tstamp_suffix)] return keys @@ -174,14 +194,28 @@ def contains(bank, key): """ Checks if the specified bank contains the specified key. """ - if key is None: - return True # any key could be a branch and a leaf at the same time in Consul - else: - try: - c_key = "{}/{}".format(bank, key) - _, value = api.kv.get(c_key) - except Exception as exc: # pylint: disable=broad-except - raise SaltCacheError( - "There was an error getting the key, {}: {}".format(c_key, exc) - ) - return value is not None + try: + c_key = "{}/{}".format(bank, key or "") + _, value = api.kv.get(c_key, keys=True) + except Exception as exc: # pylint: disable=broad-except + raise SaltCacheError( + "There was an error getting the key, {}: {}".format(c_key, exc) + ) + return value is not None + + +def updated(bank, key): + """ + Return the Unix Epoch timestamp of when the key was last updated. Return + None if key is not found. + """ + c_key = "{}/{}{}".format(bank, key, _tstamp_suffix) + try: + _, value = api.kv.get(c_key) + if value is None: + return None + return salt.payload.loads(value["Value"]) + except Exception as exc: # pylint: disable=broad-except + raise SaltCacheError( + "There was an error reading the key, {}: {}".format(c_key, exc) + ) diff --git a/salt/cache/etcd_cache.py b/salt/cache/etcd_cache.py index aa584bfc9527..0ebbd88aa52a 100644 --- a/salt/cache/etcd_cache.py +++ b/salt/cache/etcd_cache.py @@ -2,6 +2,7 @@ Minion data cache plugin for Etcd key/value data store. .. versionadded:: 2018.3.0 +.. versionchanged:: 3005 It is up to the system administrator to set up and configure the Etcd infrastructure. All is needed for this plugin is a working Etcd agent @@ -42,6 +43,9 @@ cache: etcd +In Phosphorus, ls/list was changed to always return the final name in the path. +This should only make a difference if you were directly using ``ls`` on paths +that were more or less nested than, for example: ``1/2/3/4``. .. _`Etcd documentation`: https://github.com/coreos/etcd .. _`python-etcd documentation`: http://python-etcd.readthedocs.io/en/latest/ @@ -50,6 +54,7 @@ import base64 import logging +import time import salt.payload from salt.exceptions import SaltCacheError @@ -72,6 +77,7 @@ log = logging.getLogger(__name__) client = None path_prefix = None +_tstamp_suffix = ".tstamp" # Module properties @@ -94,7 +100,7 @@ def __virtual__(): def _init_client(): """Setup client and init datastore.""" - global client, path_prefix + global client, path_prefix, _tstamp_suffix if client is not None: return @@ -111,6 +117,7 @@ def _init_client(): "cert": __opts__.get("etcd.cert", None), "ca_cert": __opts__.get("etcd.ca_cert", None), } + _tstamp_suffix = __opts__.get("etcd.timestamp_suffix", _tstamp_suffix) path_prefix = __opts__.get("etcd.path_prefix", _DEFAULT_PATH_PREFIX) if path_prefix != "": path_prefix = "/{}".format(path_prefix.strip("/")) @@ -129,9 +136,11 @@ def store(bank, key, data): """ _init_client() etcd_key = "{}/{}/{}".format(path_prefix, bank, key) + etcd_tstamp_key = "{}/{}/{}".format(path_prefix, bank, key + _tstamp_suffix) try: value = salt.payload.dumps(data) client.write(etcd_key, base64.b64encode(value)) + client.write(etcd_tstamp_key, int(time.time())) except Exception as exc: # pylint: disable=broad-except raise SaltCacheError( "There was an error writing the key, {}: {}".format(etcd_key, exc) @@ -162,13 +171,17 @@ def flush(bank, key=None): _init_client() if key is None: etcd_key = "{}/{}".format(path_prefix, bank) + tstamp_key = None else: etcd_key = "{}/{}/{}".format(path_prefix, bank, key) + tstamp_key = "{}/{}/{}".format(path_prefix, bank, key + _tstamp_suffix) try: client.read(etcd_key) except etcd.EtcdKeyNotFound: return # nothing to flush try: + if tstamp_key: + client.delete(tstamp_key) client.delete(etcd_key, recursive=True) except Exception as exc: # pylint: disable=broad-except raise SaltCacheError( @@ -182,7 +195,10 @@ def _walk(r): r: etcd.EtcdResult """ if not r.dir: - return [r.key.split("/", 3)[3]] + if r.key.endswith(_tstamp_suffix): + return [] + else: + return [r.key.rsplit("/", 1)[-1]] keys = [] for c in client.read(r.key).children: @@ -199,10 +215,12 @@ def ls(bank): path = "{}/{}".format(path_prefix, bank) try: return _walk(client.read(path)) + except etcd.EtcdKeyNotFound: + return [] except Exception as exc: # pylint: disable=broad-except raise SaltCacheError( 'There was an error getting the key "{}": {}'.format(bank, exc) - ) + ) from exc def contains(bank, key): @@ -210,14 +228,31 @@ def contains(bank, key): Checks if the specified bank contains the specified key. """ _init_client() - etcd_key = "{}/{}/{}".format(path_prefix, bank, key) + etcd_key = "{}/{}/{}".format(path_prefix, bank, key or "") try: r = client.read(etcd_key) - # return True for keys, not dirs - return r.dir is False + # return True for keys, not dirs, unless key is None + return r.dir if key is None else r.dir is False except etcd.EtcdKeyNotFound: return False except Exception as exc: # pylint: disable=broad-except raise SaltCacheError( "There was an error getting the key, {}: {}".format(etcd_key, exc) ) + + +def updated(bank, key): + """ + Return Unix Epoch based timestamp of when the bank/key was updated. + """ + _init_client() + tstamp_key = "{}/{}/{}".format(path_prefix, bank, key + _tstamp_suffix) + try: + value = client.read(tstamp_key).value + return int(value) + except etcd.EtcdKeyNotFound: + return None + except Exception as exc: # pylint: disable=broad-except + raise SaltCacheError( + "There was an error reading the key, {}: {}".format(tstamp_key, exc) + ) diff --git a/salt/cache/mysql_cache.py b/salt/cache/mysql_cache.py index aad94514ea2d..6b723fb72ec7 100644 --- a/salt/cache/mysql_cache.py +++ b/salt/cache/mysql_cache.py @@ -6,15 +6,21 @@ It is up to the system administrator to set up and configure the MySQL infrastructure. All is needed for this plugin is a working MySQL server. -The module requires the `salt_cache` database to exists but creates its own -table if needed. The keys are indexed using the `bank` and `etcd_key` columns. +.. warning:: + + The mysql.database and mysql.table_name will be directly added into certain + queries. Salt treats these as trusted input. + +The module requires the database (default ``salt_cache``) to exist but creates +its own table if needed. The keys are indexed using the ``bank`` and +``etcd_key`` columns. To enable this cache plugin, the master will need the python client for MySQL installed. This can be easily installed with pip: .. code-block:: bash - pip install python-mysql + pip install pymysql Optionally, depending on the MySQL agent configuration, the following values could be set in the master config. These are the defaults: @@ -28,7 +34,7 @@ mysql.database: salt_cache mysql.table_name: cache -Related docs could be found in the `python-mysql documentation`_. +Related docs can be found in the `python-mysql documentation`_. To use the mysql as a minion data cache backend, set the master ``cache`` config value to ``mysql``: @@ -90,17 +96,27 @@ def __virtual__(): return bool(MySQLdb), "No python mysql client installed." if MySQLdb is None else "" +def force_reconnect(): + """ + Force a reconnection to the MySQL database, by removing the client from + Salt's __context__. + """ + __context__.pop("mysql_client", None) + + def run_query(conn, query, args=None, retries=3): """ - Get a cursor and run a query. Reconnect up to `retries` times if + Get a cursor and run a query. Reconnect up to ``retries`` times if needed. Returns: cursor, affected rows counter Raises: SaltCacheError, AttributeError, OperationalError """ + if conn is None: + conn = __context__.get("mysql_client") try: cur = conn.cursor() - if args is None or args == {}: + if not args: log.debug("Doing query: %s", query) out = cur.execute(query) else: @@ -119,12 +135,19 @@ def run_query(conn, query, args=None, retries=3): log.info("mysql_cache: recreating db connection due to: %r", e) __context__["mysql_client"] = MySQLdb.connect(**__context__["mysql_kwargs"]) return run_query( - __context__.get("mysql_client"), query, args=args, retries=(retries - 1) + conn=__context__.get("mysql_client"), + query=query, + args=args, + retries=(retries - 1), ) except Exception as e: # pylint: disable=broad-except if len(query) > 150: query = query[:150] + "<...>" - raise SaltCacheError("Error running {}: {}".format(query, e)) + raise SaltCacheError( + "Error running {}{}: {}".format( + query, "- args: {}".format(args) if args else "", e + ) + ) def _create_table(): @@ -134,20 +157,53 @@ def _create_table(): # Explicitly check if the table already exists as the library logs a # warning on CREATE TABLE query = """SELECT COUNT(TABLE_NAME) FROM information_schema.tables - WHERE table_schema = '{}' AND table_name = '{}'""".format( - __context__["mysql_kwargs"]["db"], - __context__["mysql_table_name"], + WHERE table_schema = %s AND table_name = %s""" + cur, _ = run_query( + __context__.get("mysql_client"), + query, + args=(__context__["mysql_kwargs"]["db"], __context__["mysql_table_name"]), ) - cur, _ = run_query(__context__.get("mysql_client"), query) r = cur.fetchone() cur.close() if r[0] == 1: - return + query = """ + SELECT COUNT(TABLE_NAME) + FROM + information_schema.columns + WHERE + table_schema = %s + AND table_name = %s + AND column_name = 'last_update' + """ + cur, _ = run_query( + __context__["mysql_client"], + query, + args=(__context__["mysql_kwargs"]["db"], __context__["mysql_table_name"]), + ) + r = cur.fetchone() + cur.close() + if r[0] == 1: + return + else: + query = """ + ALTER TABLE {}.{} + ADD COLUMN last_update TIMESTAMP NOT NULL + DEFAULT CURRENT_TIMESTAMP + ON UPDATE CURRENT_TIMESTAMP + """.format( + __context__["mysql_kwargs"]["db"], __context__["mysql_table_name"] + ) + cur, _ = run_query(__context__["mysql_client"], query) + cur.close() + return query = """CREATE TABLE IF NOT EXISTS {} ( bank CHAR(255), etcd_key CHAR(255), data MEDIUMBLOB, + last_update TIMESTAMP NOT NULL + DEFAULT CURRENT_TIMESTAMP + ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY(bank, etcd_key) );""".format( __context__["mysql_table_name"] @@ -218,10 +274,10 @@ def fetch(bank, key): Fetch a key value. """ _init_client() - query = "SELECT data FROM {} WHERE bank='{}' AND etcd_key='{}'".format( - __context__["mysql_table_name"], bank, key + query = "SELECT data FROM {} WHERE bank=%s AND etcd_key=%s".format( + __context__["mysql_table_name"] ) - cur, _ = run_query(__context__.get("mysql_client"), query) + cur, _ = run_query(__context__.get("mysql_client"), query, args=(bank, key)) r = cur.fetchone() cur.close() if r is None: @@ -234,13 +290,14 @@ def flush(bank, key=None): Remove the key from the cache bank with all the key content. """ _init_client() - query = "DELETE FROM {} WHERE bank='{}'".format( - __context__["mysql_table_name"], bank - ) - if key is not None: - query += " AND etcd_key='{}'".format(key) - - cur, _ = run_query(__context__.get("mysql_client"), query) + query = "DELETE FROM {} WHERE bank=%s".format(__context__["mysql_table_name"]) + if key is None: + data = (bank,) + else: + data = (bank, key) + query += " AND etcd_key=%s" + + cur, _ = run_query(__context__["mysql_client"], query, args=data) cur.close() @@ -250,10 +307,10 @@ def ls(bank): bank. """ _init_client() - query = "SELECT etcd_key FROM {} WHERE bank='{}'".format( - __context__["mysql_table_name"], bank + query = "SELECT etcd_key FROM {} WHERE bank=%s".format( + __context__["mysql_table_name"] ) - cur, _ = run_query(__context__.get("mysql_client"), query) + cur, _ = run_query(__context__.get("mysql_client"), query, args=(bank,)) out = [row[0] for row in cur.fetchall()] cur.close() return out @@ -264,10 +321,34 @@ def contains(bank, key): Checks if the specified bank contains the specified key. """ _init_client() - query = "SELECT COUNT(data) FROM {} WHERE bank='{}' AND etcd_key='{}'".format( - __context__["mysql_table_name"], bank, key - ) - cur, _ = run_query(__context__.get("mysql_client"), query) + if key is None: + data = (bank,) + query = "SELECT COUNT(data) FROM {} WHERE bank=%s".format( + __context__["mysql_table_name"] + ) + else: + data = (bank, key) + query = "SELECT COUNT(data) FROM {} WHERE bank=%s AND etcd_key=%s".format( + __context__["mysql_table_name"] + ) + cur, _ = run_query(__context__.get("mysql_client"), query, args=data) r = cur.fetchone() cur.close() return r[0] == 1 + + +def updated(bank, key): + """ + Return the integer Unix epoch update timestamp of the specified bank and + key. + """ + _init_client() + query = ( + "SELECT UNIX_TIMESTAMP(last_update) FROM {} WHERE bank=%s " + "AND etcd_key=%s".format(__context__["mysql_table_name"]) + ) + data = (bank, key) + cur, _ = run_query(__context__["mysql_client"], query=query, args=data) + r = cur.fetchone() + cur.close() + return int(r[0]) if r else r diff --git a/salt/cache/redis_cache.py b/salt/cache/redis_cache.py index 166ab91c131d..668ce84e3aba 100644 --- a/salt/cache/redis_cache.py +++ b/salt/cache/redis_cache.py @@ -5,6 +5,7 @@ Redis plugin for the Salt caching subsystem. .. versionadded:: 2017.7.0 +.. versionchanged:: 3005 As Redis provides a simple mechanism for very fast key-value store, in order to provide the necessary features for the Salt caching subsystem, the following @@ -36,11 +37,13 @@ 127.0.0.1:6379> GET $KEY_root-bank/sub-bank/leaf-bank/my-key "my-value" -There are three types of keys stored: -- ``$BANK_*`` is a Redis SET containing the list of banks under the current bank -- ``$BANKEYS_*`` is a Redis SET containing the list of keys under the current bank -- ``$KEY_*`` keeps the value of the key +There are four types of keys stored: + +- ``$BANK_*`` is a Redis SET containing the list of banks under the current bank. +- ``$BANKEYS_*`` is a Redis SET containing the list of keys under the current bank. +- ``$KEY_*`` keeps the value of the key. +- ``$TSTAMP_*`` stores the last updated timestamp of the key. These prefixes and the separator can be adjusted using the configuration options: @@ -48,12 +51,17 @@ The prefix used for the name of the Redis key storing the list of sub-banks. bank_keys_prefix: ``$BANKEYS`` - The prefix used for the name of the Redis keyt storing the list of keys under a certain bank. + The prefix used for the name of the Redis key storing the list of keys under a certain bank. key_prefix: ``$KEY`` The prefix of the Redis keys having the value of the keys to be cached under a certain bank. +timestamp_prefix: ``$TSTAMP`` + The prefix for the last modified timestamp for keys. + + .. versionadded:: 3005 + separator: ``_`` The separator between the prefix and the key body. @@ -114,6 +122,7 @@ cache.redis.bank_prefix: #BANK cache.redis.bank_keys_prefix: #BANKEYS cache.redis.key_prefix: #KEY + cache.redis.timestamp_prefix: #TICKS cache.redis.separator: '@' Cluster Configuration Example: @@ -136,9 +145,12 @@ """ +import itertools import logging +import time import salt.payload +import salt.utils.stringutils from salt.exceptions import SaltCacheError # Import salt @@ -153,10 +165,7 @@ HAS_REDIS = False try: - # pylint: disable=no-name-in-module - from rediscluster import StrictRedisCluster - - # pylint: enable=no-name-in-module + from rediscluster import RedisCluster # pylint: disable=no-name-in-module HAS_REDIS_CLUSTER = True except ImportError: @@ -174,6 +183,7 @@ _BANK_PREFIX = "$BANK" _KEY_PREFIX = "$KEY" +_TIMESTAMP_PREFIX = "$TSTAMP" _BANK_KEYS_PREFIX = "$BANKEYS" _SEPARATOR = "_" @@ -203,6 +213,9 @@ def __virtual__(): def init_kwargs(kwargs): + """ + Effectively a noop. Return an empty dictionary. + """ return {} @@ -236,7 +249,7 @@ def _get_redis_server(opts=None): opts = _get_redis_cache_opts() if opts["cluster_mode"]: - REDIS_SERVER = StrictRedisCluster( + REDIS_SERVER = RedisCluster( startup_nodes=opts["startup_nodes"], skip_full_coverage_check=opts["skip_full_coverage_check"], ) @@ -262,6 +275,9 @@ def _get_redis_keys_opts(): ), "key_prefix": __opts__.get("cache.redis.key_prefix", _KEY_PREFIX), "separator": __opts__.get("cache.redis.separator", _SEPARATOR), + "timestamp_prefix": __opts__.get( + "cache.redis.timestamp_prefix", _TIMESTAMP_PREFIX + ), } @@ -275,13 +291,25 @@ def _get_bank_redis_key(bank): ) +def _get_timestamp_key(bank, key): + opts = _get_redis_keys_opts() + return "{}{}{}/{}".format( + opts["timestamp_prefix"], opts["separator"], {bank}, {key} + ) + # Use this line when we can use modern python + # return f"{opts['timestamp_prefix']}{opts['separator']}{bank}/{key}" + + def _get_key_redis_key(bank, key): """ Return the Redis key given the bank name and the key name. """ opts = _get_redis_keys_opts() return "{prefix}{separator}{bank}/{key}".format( - prefix=opts["key_prefix"], separator=opts["separator"], bank=bank, key=key + prefix=opts["key_prefix"], + separator=opts["separator"], + bank=bank, + key=salt.utils.stringutils.to_str(key), ) @@ -302,16 +330,14 @@ def _build_bank_hier(bank, redis_pipe): It's using the Redis pipeline, so there will be only one interaction with the remote server. """ - bank_list = bank.split("/") - parent_bank_path = bank_list[0] - for bank_name in bank_list[1:]: - prev_bank_redis_key = _get_bank_redis_key(parent_bank_path) - redis_pipe.sadd(prev_bank_redis_key, bank_name) - log.debug("Adding %s to %s", bank_name, prev_bank_redis_key) - parent_bank_path = "{curr_path}/{bank_name}".format( - curr_path=parent_bank_path, bank_name=bank_name - ) # this becomes the parent of the next - return True + + def joinbanks(*banks): + return "/".join(banks) + + for bank_path in itertools.accumulate(bank.split("/"), joinbanks): + bank_set = _get_bank_redis_key(bank_path) + log.debug("Adding %s to %s", bank, bank_set) + redis_pipe.sadd(bank_set, ".") def _get_banks_to_remove(redis_server, bank, path=""): @@ -355,6 +381,11 @@ def store(bank, key, data): redis_pipe.set(redis_key, value) log.debug("Setting the value for %s under %s (%s)", key, bank, redis_key) redis_pipe.sadd(redis_bank_keys, key) + # localfs cache truncates the timestamp to int only. We'll do the same. + redis_pipe.set( + _get_timestamp_key(bank=bank, key=key), + salt.payload.dumps(int(time.time())), + ) log.debug("Adding %s to %s", key, redis_bank_keys) redis_pipe.execute() except (RedisConnectionError, RedisResponseError) as rerr: @@ -442,6 +473,8 @@ def flush(bank, key=None): for key in bank_keys: redis_key = _get_key_redis_key(bank_path, key) redis_pipe.delete(redis_key) # kill 'em all! + timestamp_key = _get_timestamp_key(bank=bank_path, key=key.decode()) + redis_pipe.delete(timestamp_key) log.debug( "Removing the key %s under the %s bank (%s)", key, @@ -464,6 +497,8 @@ def flush(bank, key=None): else: redis_key = _get_key_redis_key(bank, key) redis_pipe.delete(redis_key) # delete the key cached + timestamp_key = _get_timestamp_key(bank=bank, key=key) + redis_pipe.delete(timestamp_key) log.debug("Removing the key %s under the %s bank (%s)", key, bank, redis_key) bank_keys_redis_key = _get_bank_keys_redis_key(bank) redis_pipe.srem(bank_keys_redis_key, key) @@ -490,7 +525,7 @@ def list_(bank): Lists entries stored in the specified bank. """ redis_server = _get_redis_server() - bank_redis_key = _get_bank_redis_key(bank) + bank_redis_key = _get_bank_keys_redis_key(bank) try: banks = redis_server.smembers(bank_redis_key) except (RedisConnectionError, RedisResponseError) as rerr: @@ -501,7 +536,7 @@ def list_(bank): raise SaltCacheError(mesg) if not banks: return [] - return list(banks) + return [bank.decode() for bank in banks if bank != b"."] def contains(bank, key): @@ -509,12 +544,31 @@ def contains(bank, key): Checks if the specified bank contains the specified key. """ redis_server = _get_redis_server() - bank_redis_key = _get_bank_redis_key(bank) + bank_redis_key = _get_bank_keys_redis_key(bank) try: - return redis_server.sismember(bank_redis_key, key) + if key is None: + return ( + salt.utils.stringutils.to_str(redis_server.type(bank_redis_key)) + != "none" + ) + else: + return redis_server.sismember(bank_redis_key, key) except (RedisConnectionError, RedisResponseError) as rerr: mesg = "Cannot retrieve the Redis cache key {rkey}: {rerr}".format( rkey=bank_redis_key, rerr=rerr ) log.error(mesg) raise SaltCacheError(mesg) + + +def updated(bank, key): + """ + Return the Unix Epoch timestamp of when the key was last updated. Return + None if key is not found. + """ + redis_server = _get_redis_server() + timestamp_key = _get_timestamp_key(bank=bank, key=key) + value = redis_server.get(timestamp_key) + if value is not None: + value = salt.payload.loads(value) + return value diff --git a/tasks/docstrings.py b/tasks/docstrings.py index 23c549cafca3..6e9cdd247277 100644 --- a/tasks/docstrings.py +++ b/tasks/docstrings.py @@ -28,7 +28,6 @@ "salt/beacons/salt_monitor.py": ["validate", "beacon"], "salt/beacons/watchdog.py": ["close", "to_salt_event"], "salt/cache/localfs.py": ["get_storage_id", "init_kwargs"], - "salt/cache/redis_cache.py": ["init_kwargs"], "salt/cloud/clouds/clc.py": [ "get_creds", "get_configured_provider", diff --git a/tests/pytests/functional/cache/__init__.py b/tests/pytests/functional/cache/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/pytests/functional/cache/conftest.py b/tests/pytests/functional/cache/conftest.py new file mode 100644 index 000000000000..0e4d385ce17c --- /dev/null +++ b/tests/pytests/functional/cache/conftest.py @@ -0,0 +1,11 @@ +import pytest + + +@pytest.fixture(scope="module") +def states(loaders): + return loaders.states + + +@pytest.fixture(scope="module") +def modules(loaders): + return loaders.modules diff --git a/tests/pytests/functional/cache/test_cache.py b/tests/pytests/functional/cache/test_cache.py new file mode 100644 index 000000000000..7e3cda275d73 --- /dev/null +++ b/tests/pytests/functional/cache/test_cache.py @@ -0,0 +1,785 @@ +import logging +import os +import shutil +import socket +import time + +import pytest +import salt.cache +import salt.loader +from salt.exceptions import SaltCacheError +from saltfactories.utils import random_string +from saltfactories.utils.ports import get_unused_localhost_port +from tests.support.mock import MagicMock, patch + +docker = pytest.importorskip("docker") + +log = logging.getLogger(__name__) + +pytestmark = [ + pytest.mark.slow_test, + pytest.mark.skip_if_binaries_missing("dockerd"), +] + +# TODO: add out-of-band (i.e. not via the API) additions to the cache -W. Werner, 2021-09-28 + +# TODO: in PR request opinion: is it better to double serialize the data, e.g. +# store -> __context__['serial'].dumps({"timestamp": tstamp, "value": __context__['serial'].dumps(value)}) +# or is the existing approach of storing timestamp as a secondary key a good one??? +# ??? Is one slower than the other? + + +# TODO: Is there a better approach for waiting until the container is fully running? -W. Werner, 2021-07-27 +class Timer: + def __init__(self, timeout=20): + self.start = time.time() + self.timeout = timeout + + @property + def expired(self): + return time.time() - self.start > self.timeout + + +@pytest.fixture(scope="module") +def etcd_port(): + return get_unused_localhost_port() + + +@pytest.fixture(scope="module") +def redis_port(): + return get_unused_localhost_port() + + +@pytest.fixture(scope="module") +def consul_port(): + return get_unused_localhost_port() + + +# GIVE ME FIXTURES ON FIXTURES NOW + + +@pytest.fixture(scope="module") +def mysql_5_6_port(): + return get_unused_localhost_port() + + +@pytest.fixture(scope="module") +def mysql_5_7_port(): + return get_unused_localhost_port() + + +@pytest.fixture(scope="module") +def mysql_8_0_port(): + return get_unused_localhost_port() + + +@pytest.fixture(scope="module") +def mariadb_10_1_port(): + return get_unused_localhost_port() + + +@pytest.fixture(scope="module") +def mariadb_10_2_port(): + return get_unused_localhost_port() + + +@pytest.fixture(scope="module") +def mariadb_10_3_port(): + return get_unused_localhost_port() + + +@pytest.fixture(scope="module") +def mariadb_10_4_port(): + return get_unused_localhost_port() + + +@pytest.fixture(scope="module") +def mariadb_10_5_port(): + return get_unused_localhost_port() + + +@pytest.fixture(scope="module") +def percona_5_5_port(): + return get_unused_localhost_port() + + +@pytest.fixture(scope="module") +def percona_5_6_port(): + return get_unused_localhost_port() + + +@pytest.fixture(scope="module") +def percona_5_7_port(): + return get_unused_localhost_port() + + +@pytest.fixture(scope="module") +def percona_8_0_port(): + return get_unused_localhost_port() + + +# TODO: We should probably be building our own etcd docker image - fine to base it off of this one (or... others) -W. Werner, 2021-07-27 +@pytest.fixture(scope="module") +def etcd_apiv2_container(salt_factories, docker_client, etcd_port): + container = salt_factories.get_container( + random_string("etcd-server-"), + image_name="elcolio/etcd", + docker_client=docker_client, + check_ports=[etcd_port], + container_run_kwargs={ + "environment": {"ALLOW_NONE_AUTHENTICATION": "yes"}, + "ports": {"2379/tcp": etcd_port}, + }, + ) + with container.started() as factory: + yield factory + + +@pytest.fixture(scope="module") +def redis_container(salt_factories, docker_client, redis_port, docker_redis_image): + container = salt_factories.get_container( + random_string("redis-server-"), + image_name=docker_redis_image, + docker_client=docker_client, + check_ports=[redis_port], + container_run_kwargs={"ports": {"6379/tcp": redis_port}}, + ) + with container.started() as factory: + yield factory + + +# Pytest does not have the ability to parametrize fixtures with parametriezed +# fixtures, which is super annoying. In other words, in order to have a `cache` +# test fixture that takes different versions of the cache that depend on +# different docker images, I've gotta make up fixtures for each +# image+container. When https://github.com/pytest-dev/pytest/issues/349 is +# actually fixed then we can go ahead and refactor all of these mysql +# containers, caches, and their images into a single parametrized fixture. + + +def start_mysql_container( + salt_factories, docker_client, mysql_port, docker_mysql_image +): + container = salt_factories.get_container( + random_string("mysql-server-"), + image_name=docker_mysql_image, + docker_client=docker_client, + check_ports=[mysql_port], + container_run_kwargs={ + "environment": { + "MYSQL_ROOT_PASSWORD": "fnord", + "MYSQL_ROOT_HOST": "%", + }, + "ports": {"3306/tcp": mysql_port}, + }, + ) + return container.started() + + +@pytest.fixture(scope="module") +def mysql_5_6_container( + salt_factories, docker_client, mysql_5_6_port, docker_mysql_5_6_image +): + with start_mysql_container( + salt_factories, docker_client, mysql_5_6_port, docker_mysql_5_6_image + ) as factory: + yield factory + + +@pytest.fixture(scope="module") +def mysql_5_7_container( + salt_factories, docker_client, mysql_5_7_port, docker_mysql_5_7_image +): + with start_mysql_container( + salt_factories, docker_client, mysql_5_7_port, docker_mysql_5_7_image + ) as factory: + yield factory + + +@pytest.fixture(scope="module") +def mysql_8_0_container( + salt_factories, docker_client, mysql_8_0_port, docker_mysql_8_0_image +): + with start_mysql_container( + salt_factories, docker_client, mysql_8_0_port, docker_mysql_8_0_image + ) as factory: + yield factory + + +@pytest.fixture(scope="module") +def mariadb_10_1_container( + salt_factories, docker_client, mariadb_10_1_port, docker_mariadb_10_1_image +): + with start_mysql_container( + salt_factories, docker_client, mariadb_10_1_port, docker_mariadb_10_1_image + ) as factory: + yield factory + + +@pytest.fixture(scope="module") +def mariadb_10_2_container( + salt_factories, docker_client, mariadb_10_2_port, docker_mariadb_10_2_image +): + with start_mysql_container( + salt_factories, docker_client, mariadb_10_2_port, docker_mariadb_10_2_image + ) as factory: + yield factory + + +@pytest.fixture(scope="module") +def mariadb_10_3_container( + salt_factories, docker_client, mariadb_10_3_port, docker_mariadb_10_3_image +): + with start_mysql_container( + salt_factories, docker_client, mariadb_10_3_port, docker_mariadb_10_3_image + ) as factory: + yield factory + + +@pytest.fixture(scope="module") +def mariadb_10_4_container( + salt_factories, docker_client, mariadb_10_4_port, docker_mariadb_10_4_image +): + with start_mysql_container( + salt_factories, docker_client, mariadb_10_4_port, docker_mariadb_10_4_image + ) as factory: + yield factory + + +@pytest.fixture(scope="module") +def mariadb_10_5_container( + salt_factories, docker_client, mariadb_10_5_port, docker_mariadb_10_5_image +): + with start_mysql_container( + salt_factories, docker_client, mariadb_10_5_port, docker_mariadb_10_5_image + ) as factory: + yield factory + + +@pytest.fixture(scope="module") +def percona_5_5_container( + salt_factories, docker_client, percona_5_5_port, docker_percona_5_5_image +): + with start_mysql_container( + salt_factories, docker_client, percona_5_5_port, docker_percona_5_5_image + ) as factory: + yield factory + + +@pytest.fixture(scope="module") +def percona_5_6_container( + salt_factories, docker_client, percona_5_6_port, docker_percona_5_6_image +): + with start_mysql_container( + salt_factories, docker_client, percona_5_6_port, docker_percona_5_6_image + ) as factory: + yield factory + + +@pytest.fixture(scope="module") +def percona_5_7_container( + salt_factories, docker_client, percona_5_7_port, docker_percona_5_7_image +): + with start_mysql_container( + salt_factories, docker_client, percona_5_7_port, docker_percona_5_7_image + ) as factory: + yield factory + + +@pytest.fixture(scope="module") +def percona_8_0_container( + salt_factories, docker_client, percona_8_0_port, docker_percona_8_0_image +): + with start_mysql_container( + salt_factories, docker_client, percona_8_0_port, docker_percona_8_0_image + ) as factory: + yield factory + + +@pytest.fixture(scope="module") +def consul_container(salt_factories, docker_client, consul_port, docker_consul_image): + container = salt_factories.get_container( + random_string("consul-server-"), + image_name=docker_consul_image, + docker_client=docker_client, + check_ports=[consul_port], + container_run_kwargs={"ports": {"8500/tcp": consul_port}}, + ) + with container.started() as factory: + # TODO: May want to do the same thing for redis to ensure that service is up & running + # TODO: THIS IS HORRIBLE. THERE ARE BETTER WAYS TO DETECT SERVICE IS UP -W. Werner, 2021-10-12 + + timer = Timer(timeout=10) + sleeptime = 0.1 + while not timer.expired: + try: + with socket.create_connection( + ("localhost", consul_port), timeout=1 + ) as cli: + cli.send(b"GET /v1/kv/fnord HTTP/1.1\n\n") + cli.recv(2048) + break + except ConnectionResetError as e: + if e.errno == 104: + time.sleep(sleeptime) + sleeptime += sleeptime + else: + assert False, "Timer expired before connecting to consul" + yield factory + + +@pytest.fixture +def redis_cache(minion_opts, redis_port, redis_container): + opts = minion_opts.copy() + opts["cache"] = "redis" + opts["cache.redis.host"] = "127.0.0.1" + opts["cache.redis.port"] = redis_port + # NOTE: If you would like to ensure that alternate prefixes are properly + # tested, simply change these values and re-run the tests. + opts["cache.redis.bank_prefix"] = "#BANKY_BANK" + opts["cache.redis.bank_keys_prefix"] = "#WHO_HAS_MY_KEYS" + opts["cache.redis.key_prefix"] = "#LPL" + opts["cache.redis.timestamp_prefix"] = "%TICK_TOCK" + opts["cache.redis.separator"] = "\N{SNAKE}" + cache = salt.cache.factory(opts) + yield cache + + +@pytest.fixture(scope="module", autouse="true") +def ensure_deps(states): + installation_result = states.pip.installed( + name="fnord", + pkgs=["python-etcd", "redis", "redis-py-cluster", "python-consul", "pymysql"], + ) + assert ( + installation_result.result is True + ), "unable to pip install requirements {}".format(installation_result.comment) + + +@pytest.fixture +def etcd_cache(minion_opts, etcd_port, etcd_apiv2_container): + opts = minion_opts.copy() + opts["cache"] = "etcd" + opts["etcd.host"] = "127.0.0.1" + opts["etcd.port"] = etcd_port + opts["etcd.protocol"] = "http" + # NOTE: If you would like to ensure that alternate suffixes are properly + # tested, simply change this value and re-run the tests. + opts["etcd.timestamp_suffix"] = ".frobnosticate" + cache = salt.cache.factory(opts) + yield cache + + +@pytest.fixture +def localfs_cache(minion_opts): + opts = minion_opts.copy() + opts["cache"] = "localfs" + cache = salt.cache.factory(opts) + yield cache + shutil.rmtree(opts["cachedir"], ignore_errors=True) + + +@pytest.fixture +def consul_cache(minion_opts, consul_port, consul_container): + opts = minion_opts.copy() + opts["cache"] = "consul" + opts["consul.host"] = "127.0.0.1" + opts["consul.port"] = consul_port + # NOTE: If you would like to ensure that alternate suffixes are properly + # tested, simply change this value and re-run the tests. + opts["consul.timestamp_suffix"] = ".frobnosticate" + cache = salt.cache.factory(opts) + yield cache + + +def fixy(minion_opts, mysql_port, mysql_container): + # We're doing a late import because we need access to the exception + import salt.cache.mysql_cache + + # The container can be available before mysql actually is + mysql_container.container.exec_run( + [ + "/bin/sh", + "-c", + 'while ! mysql -u root -pfnord -e "SELECT 1;" >/dev/null; do sleep 1; done', + ], + ) + + # Gotta make the db we're going to use + res = mysql_container.container.exec_run( + [ + "/bin/sh", + "-c", + 'echo "create database salt_cache;" | mysql -u root -pfnord ', + ], + ) + + opts = minion_opts.copy() + opts["cache"] = "mysql" + opts["mysql.host"] = "127.0.0.1" + opts["mysql.port"] = mysql_port + opts["mysql.user"] = "root" + opts["mysql.password"] = "fnord" + opts["mysql.database"] = "salt_cache" + opts["mysql.table_name"] = "cache" + cache = salt.cache.factory(opts) + + # For some reason even though mysql is available in the container, we + # can't reliably connect outside the container. Wait for access - but we + # may need a new cache... + timer = Timer(timeout=15) + while not timer.expired: + try: + # Doesn't matter what. We just have to execute so that we spin + # here until we can actually connect to the db instance. + cache.modules["mysql.list"]("salt_cache") + except salt.cache.mysql_cache.MySQLdb.DatabaseError: + # We don't really care what MySQL error is happening - + pass + else: + break + else: + if os.environ.get("CI_RUN"): + pytest.skip('Timer expired before "select 1;" worked') + else: + assert False, 'Timer expired before "select 1;" worked' + + # This ensures that we will correctly alter any existing mysql tables for + # current mysql cache users. Without completely altering the mysql_cache + # implementation there's no real other reasonable way to reset the client + # and force the alter_table to be called. Resetting the client to `None` is + # what triggers the implementation to allow the ALTER TABLE to add the + # last_update column + run_query = cache.modules["mysql.run_query"] + run_query( + conn=None, + query="ALTER TABLE salt_cache.cache DROP COLUMN last_update", + )[0].fetchone() + + cache.modules["mysql.force_reconnect"]() + return cache + + +# See container comment above >:( + + +@pytest.fixture(scope="module") +def mysql_5_6_cache(minion_opts, mysql_5_6_port, mysql_5_6_container): + yield fixy(minion_opts, mysql_5_6_port, mysql_5_6_container) + + +@pytest.fixture(scope="module") +def mysql_5_7_cache(minion_opts, mysql_5_7_port, mysql_5_7_container): + yield fixy(minion_opts, mysql_5_7_port, mysql_5_7_container) + + +@pytest.fixture(scope="module") +def mysql_8_0_cache(minion_opts, mysql_8_0_port, mysql_8_0_container): + yield fixy(minion_opts, mysql_8_0_port, mysql_8_0_container) + + +@pytest.fixture(scope="module") +def mariadb_10_1_cache(minion_opts, mariadb_10_1_port, mariadb_10_1_container): + yield fixy(minion_opts, mariadb_10_1_port, mariadb_10_1_container) + + +@pytest.fixture(scope="module") +def mariadb_10_2_cache(minion_opts, mariadb_10_2_port, mariadb_10_2_container): + yield fixy(minion_opts, mariadb_10_2_port, mariadb_10_2_container) + + +@pytest.fixture(scope="module") +def mariadb_10_3_cache(minion_opts, mariadb_10_3_port, mariadb_10_3_container): + yield fixy(minion_opts, mariadb_10_3_port, mariadb_10_3_container) + + +@pytest.fixture(scope="module") +def mariadb_10_4_cache(minion_opts, mariadb_10_4_port, mariadb_10_4_container): + yield fixy(minion_opts, mariadb_10_4_port, mariadb_10_4_container) + + +@pytest.fixture(scope="module") +def mariadb_10_5_cache(minion_opts, mariadb_10_5_port, mariadb_10_5_container): + yield fixy(minion_opts, mariadb_10_5_port, mariadb_10_5_container) + + +@pytest.fixture(scope="module") +def percona_5_5_cache(minion_opts, percona_5_5_port, percona_5_5_container): + yield fixy(minion_opts, percona_5_5_port, percona_5_5_container) + + +@pytest.fixture(scope="module") +def percona_5_6_cache(minion_opts, percona_5_6_port, percona_5_6_container): + yield fixy(minion_opts, percona_5_6_port, percona_5_6_container) + + +@pytest.fixture(scope="module") +def percona_5_7_cache(minion_opts, percona_5_7_port, percona_5_7_container): + yield fixy(minion_opts, percona_5_7_port, percona_5_7_container) + + +@pytest.fixture(scope="module") +def percona_8_0_cache(minion_opts, percona_8_0_port, percona_8_0_container): + yield fixy(minion_opts, percona_8_0_port, percona_8_0_container) + + +# TODO: Figure out how to parametrize this in combo with the getfixturevalue process -W. Werner, 2021-10-28 +@pytest.fixture +def memcache_cache(minion_opts): + opts = minion_opts.copy() + opts["memcache_expire_seconds"] = 42 + cache = salt.cache.factory(opts) + yield cache + + +@pytest.fixture( + params=[ + "localfs_cache", + "redis_cache", + "etcd_cache", + "consul_cache", + "mysql_5_6_cache", + "mysql_5_7_cache", + "mysql_8_0_cache", + "mariadb_10_1_cache", + "mariadb_10_2_cache", + "mariadb_10_3_cache", + "mariadb_10_4_cache", + "mariadb_10_5_cache", + "percona_5_5_cache", + "percona_5_6_cache", + "percona_5_7_cache", + "percona_8_0_cache", + "memcache_cache", # Memcache actually delegates some behavior to the backing cache which alters the API somewhat. + ] +) +def cache(request): + # This is not an ideal way to get the particular cache type but + # it's currently what we have available. It behaves *very* badly when + # attempting to parametrize these fixtures. Don't ask me how I known. + yield request.getfixturevalue(request.param) + + +def test_caching(subtests, cache): + bank = "fnord/kevin/stuart" + # ^^^^ This bank can be just fnord, or fnord/foo, or any mildly reasonable + # or possibly unreasonably nested names. + # + # No. Seriously. Try import string; bank = '/'.join(string.ascii_letters) + # - it works! + # import string; bank = "/".join(string.ascii_letters) + good_key = "roscivs" + bad_key = "monkey" + + with subtests.test("non-existent bank should be empty on cache start"): + assert not cache.contains(bank=bank) + assert cache.list(bank=bank) == [] + + with subtests.test("after storing key in bank it should be in cache list"): + cache.store(bank=bank, key=good_key, data=b"\x01\x04\x05fnordy data") + assert cache.list(bank) == [good_key] + + with subtests.test("after storing value, it should be fetchable"): + expected_data = "trombone pleasantry" + cache.store(bank=bank, key=good_key, data=expected_data) + assert cache.fetch(bank=bank, key=good_key) == expected_data + + with subtests.test("bad key should still be absent from cache"): + assert cache.fetch(bank=bank, key=bad_key) == {} + + with subtests.test("storing new value should update it"): + # Double check that the data was still the old stuff + old_data = expected_data + assert cache.fetch(bank=bank, key=good_key) == old_data + new_data = "stromboli" + cache.store(bank=bank, key=good_key, data=new_data) + assert cache.fetch(bank=bank, key=good_key) == new_data + + with subtests.test("storing complex object works"): + new_thing = { + "some": "data", + 42: "wheee", + "some other": {"sub": {"objects": "here"}}, + } + + cache.store(bank=bank, key=good_key, data=new_thing) + actual_thing = cache.fetch(bank=bank, key=good_key) + if isinstance(cache, salt.cache.MemCache): + # MemCache should actually store the object - everything else + # should create a copy of it. + assert actual_thing is new_thing + else: + assert actual_thing is not new_thing + assert actual_thing == new_thing + + with subtests.test("contains returns true if key in bank"): + assert cache.contains(bank=bank, key=good_key) + + with subtests.test("contains returns true if bank exists and key is None"): + assert cache.contains(bank=bank, key=None) + + with subtests.test( + "contains returns False when bank not in cache and/or key not in bank" + ): + assert not cache.contains(bank=bank, key=bad_key) + assert not cache.contains(bank="nonexistent", key=good_key) + assert not cache.contains(bank="nonexistent", key=bad_key) + assert not cache.contains(bank="nonexistent", key=None) + + with subtests.test("flushing nonexistent key should not remove other keys"): + cache.flush(bank=bank, key=bad_key) + assert cache.contains(bank=bank, key=good_key) + + with subtests.test( + "flushing existing key should not remove bank if no more keys exist" + ): + pytest.skip( + "This is impossible with redis. Should we make localfs behave the same way?" + ) + cache.flush(bank=bank, key=good_key) + assert cache.contains(bank=bank) + assert cache.list(bank=bank) == [] + + with subtests.test( + "after existing key is flushed updated should not return a timestamp for that key" + ): + cache.store(bank=bank, key=good_key, data="fnord") + cache.flush(bank=bank, key=good_key) + timestamp = cache.updated(bank=bank, key=good_key) + assert timestamp is None + + with subtests.test( + "after flushing bank containing a good key, updated should not return a timestamp for that key" + ): + cache.store(bank=bank, key=good_key, data="fnord") + cache.flush(bank=bank, key=None) + timestamp = cache.updated(bank=bank, key=good_key) + assert timestamp is None + + with subtests.test("flushing bank with None as key should remove bank"): + cache.flush(bank=bank, key=None) + assert not cache.contains(bank=bank) + + with subtests.test("Exception should happen when flushing None bank"): + # This bit is maybe an accidental API, but currently there is no + # protection at least with the localfs cache when bank is None. If + # bank is None we try to `os.path.normpath` the bank, which explodes + # and is at least the current behavior. If we want to change that + # this test should change. Or be removed altogether. + # TODO: this should actually not raise. Not sure if there's a test that we can do here... or just call the code which will fail if there's actually an exception. -W. Werner, 2021-09-28 + pytest.skip( + "Skipping for now - etcd, redis, and mysql do not raise. Should ensure all backends behave consistently" + ) + with pytest.raises(Exception): + cache.flush(bank=None, key=None) + + with subtests.test("Updated for non-existent key should return None"): + timestamp = cache.updated(bank="nonexistent", key="whatever") + assert timestamp is None + + with subtests.test("Updated for key should return a reasonable time"): + before_storage = int(time.time()) + cache.store(bank="fnord", key="updated test part 2", data="fnord") + after_storage = int(time.time()) + + timestamp = cache.updated(bank="fnord", key="updated test part 2") + + assert before_storage <= timestamp <= after_storage + + with subtests.test( + "If the module raises SaltCacheError then it should make it out of updated" + ): + with patch.dict( + cache.modules._dict, + {"{}.updated".format(cache.driver): MagicMock(side_effect=SaltCacheError)}, + ), pytest.raises(SaltCacheError): + cache.updated(bank="kaboom", key="oops") + + with subtests.test( + "cache.cache right after a value is cached should not update the cache" + ): + expected_value = "some cool value yo" + cache.store(bank=bank, key=good_key, data=expected_value) + result = cache.cache( + bank=bank, + key=good_key, + fun=lambda **kwargs: "bad bad value no good", + value="some other value?", + loop_fun=lambda x: "super very no good bad", + ) + fetch_result = cache.fetch(bank=bank, key=good_key) + + assert result == fetch_result == expected_value + + with subtests.test( + "cache.cache should update the value with the result of fun when value was updated longer than expiration", + ), patch( + "salt.cache.Cache.updated", + return_value=42, # Dec 31, 1969... time to update the cache! + autospec=True, + ): + expected_value = "this is the return value woo woo woo" + cache.store(bank=bank, key=good_key, data="not this value") + cache_result = cache.cache( + bank=bank, key=good_key, fun=lambda *args, **kwargs: expected_value + ) + fetch_result = cache.fetch(bank=bank, key=good_key) + + assert cache_result == fetch_result == expected_value + + with subtests.test( + "cache.cache should update the value with all of the outputs from loop_fun if loop_fun was provided", + ), patch( + "salt.cache.Cache.updated", + return_value=42, + autospec=True, + ): + expected_value = "SOME HUGE STRING OKAY?" + + cache.store(bank=bank, key=good_key, data="nope, not me") + cache_result = cache.cache( + bank=bank, + key=good_key, + fun=lambda **kwargs: "some huge string okay?", + loop_fun=str.upper, + ) + fetch_result = cache.fetch(bank=bank, key=good_key) + + assert cache_result == fetch_result + assert "".join(fetch_result) == expected_value + + with subtests.test( + "cache.cache should update the value if the stored value is empty but present and expiry is way in the future" + ), patch( + "salt.cache.Cache.updated", + return_value=time.time() * 2, + autospec=True, + ): + # Unclear if this was intended behavior: currently any falsey data will + # be updated by cache.cache. If this is incorrect, this test should + # be updated or removed. + expected_data = "some random string whatever" + for empty in ("", (), [], {}, 0, 0.0, False, None): + with subtests.test(empty=empty): + cache.store( + bank=bank, key=good_key, data=empty + ) # empty chairs and empty data + cache_result = cache.cache( + bank=bank, key=good_key, fun=lambda **kwargs: expected_data + ) + fetch_result = cache.fetch(bank=bank, key=good_key) + + assert cache_result == fetch_result == expected_data + + with subtests.test("cache.cache should store a value if it does not exist"): + expected_result = "some result plz" + cache.flush(bank=bank, key=None) + assert cache.fetch(bank=bank, key=good_key) == {} + cache_result = cache.cache( + bank=bank, key=good_key, fun=lambda **kwargs: expected_result + ) + fetch_result = cache.fetch(bank=bank, key=good_key) + + assert cache_result == fetch_result + assert fetch_result == expected_result + assert cache_result == fetch_result == expected_result diff --git a/tests/pytests/functional/cache/test_redis_cache.py b/tests/pytests/functional/cache/test_redis_cache.py new file mode 100644 index 000000000000..05fa7c981a05 --- /dev/null +++ b/tests/pytests/functional/cache/test_redis_cache.py @@ -0,0 +1,32 @@ +import pytest +import salt.cache + + +@pytest.fixture +def redis_cluster_cache(minion_opts): + opts = minion_opts.copy() + opts["cache"] = "redis" + opts["cache.redis.cluster_mode"] = True + cache = salt.cache.factory(opts) + yield cache + + +@pytest.fixture(scope="module", autouse="true") +def ensure_deps(states): + installation_result = states.pip.installed( + name="fnord", pkgs=["redis", "redis-py-cluster"] + ) + assert ( + installation_result.result is True + ), "unable to pip install requirements {}".format(installation_result.comment) + + +def test_redis_cluster_cache_should_import_correctly(redis_cluster_cache): + import rediscluster.exceptions + + with pytest.raises(rediscluster.exceptions.RedisClusterException): + # Currently the opts aren't actually correct for a redis cluster + # so this will fail. If, in the future, the redis_cluster_cache fixture + # needs to point to an actual redis cluster, then this test will + # probably become obsolete + redis_cluster_cache.store(bank="foo", key="whatever", data="lol") diff --git a/tests/pytests/functional/conftest.py b/tests/pytests/functional/conftest.py index cdcb4af334fa..6e840d1817bb 100644 --- a/tests/pytests/functional/conftest.py +++ b/tests/pytests/functional/conftest.py @@ -3,8 +3,17 @@ import shutil import pytest +from saltfactories.daemons.container import Container from saltfactories.utils.functional import Loaders +try: + import docker +except ImportError: + # Test suites depending on docker should be using + # docker = pytest.importorskip("docker") + # so any fixtures using docker shouldn't ever be called or used. + pass + log = logging.getLogger(__name__) @@ -90,3 +99,104 @@ def reset_loaders_state(loaders): finally: # Reset the loaders state loaders.reset_state() + + +@pytest.fixture(scope="module") +def docker_client(): + try: + client = docker.from_env() + except docker.errors.DockerException: + pytest.skip("Failed to get a connection to docker running on the system") + connectable = Container.client_connectable(client) + if connectable is not True: # pragma: nocover + pytest.skip(connectable) + return client + + +def pull_or_skip(image_name, docker_client): + try: + docker_client.images.pull(image_name) + except docker.errors.APIError as exc: + pytest.skip("Failed to pull docker image {!r}: {}".format(image_name, exc)) + except ImportError: + pytest.skip("docker module was not installed") + return image_name + + +@pytest.fixture(scope="module") +def docker_redis_image(docker_client): + return pull_or_skip("redis:alpine", docker_client) + + +@pytest.fixture(scope="module") +def docker_consul_image(docker_client): + return pull_or_skip("consul:latest", docker_client) + + +# Pytest does not have the ability to parametrize fixtures with parametriezed +# fixtures, which is super annoying. In other words, in order to have a `cache` +# test fixture that takes different versions of the cache that depend on +# different docker images, I've gotta make up fixtures for each +# image+container. When https://github.com/pytest-dev/pytest/issues/349 is +# actually fixed then we can go ahead and refactor all of these mysql images +# (and container fixtures depending on it) into a single fixture. + + +@pytest.fixture(scope="module") +def docker_mysql_5_6_image(docker_client): + return pull_or_skip("mysql/mysql-server:5.6", docker_client) + + +@pytest.fixture(scope="module") +def docker_mysql_5_7_image(docker_client): + return pull_or_skip("mysql/mysql-server:5.7", docker_client) + + +@pytest.fixture(scope="module") +def docker_mysql_8_0_image(docker_client): + return pull_or_skip("mysql/mysql-server:8.0", docker_client) + + +@pytest.fixture(scope="module") +def docker_mariadb_10_1_image(docker_client): + return pull_or_skip("mariadb:10.1", docker_client) + + +@pytest.fixture(scope="module") +def docker_mariadb_10_2_image(docker_client): + return pull_or_skip("mariadb:10.2", docker_client) + + +@pytest.fixture(scope="module") +def docker_mariadb_10_3_image(docker_client): + return pull_or_skip("mariadb:10.3", docker_client) + + +@pytest.fixture(scope="module") +def docker_mariadb_10_4_image(docker_client): + return pull_or_skip("mariadb:10.4", docker_client) + + +@pytest.fixture(scope="module") +def docker_mariadb_10_5_image(docker_client): + return pull_or_skip("mariadb:10.5", docker_client) + + +@pytest.fixture(scope="module") +def docker_percona_5_5_image(docker_client): + return pull_or_skip("percona:5.5", docker_client) + + +@pytest.fixture(scope="module") +def docker_percona_5_6_image(docker_client): + return pull_or_skip("percona:5.6", docker_client) + + +@pytest.fixture(scope="module") +def docker_percona_5_7_image(docker_client): + return pull_or_skip("percona:5.7", docker_client) + + +@pytest.fixture(scope="module") +def docker_percona_8_0_image(docker_client): + return pull_or_skip("percona:8.0", docker_client) diff --git a/tests/pytests/unit/cache/test_mysql_cache.py b/tests/pytests/unit/cache/test_mysql_cache.py index 03e2250931e5..9532436c8861 100644 --- a/tests/pytests/unit/cache/test_mysql_cache.py +++ b/tests/pytests/unit/cache/test_mysql_cache.py @@ -135,7 +135,11 @@ def test_flush(): with patch.object(mysql_cache, "run_query") as mock_run_query: expected_calls = [ - call(mock_connect_client, "DELETE FROM salt WHERE bank='bank'") + call( + mock_connect_client, + "DELETE FROM salt WHERE bank=%s", + args=("bank",), + ), ] mock_run_query.return_value = (MagicMock(), "") mysql_cache.flush(bank="bank") @@ -144,7 +148,8 @@ def test_flush(): expected_calls = [ call( mock_connect_client, - "DELETE FROM salt WHERE bank='bank' AND etcd_key='key'", + "DELETE FROM salt WHERE bank=%s AND etcd_key=%s", + args=("bank", "key"), ) ] mysql_cache.flush(bank="bank", key="key") @@ -219,6 +224,9 @@ def test_create_table(master_config): bank CHAR(255), etcd_key CHAR(255), data MEDIUMBLOB, + last_update TIMESTAMP NOT NULL + DEFAULT CURRENT_TIMESTAMP + ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY(bank, etcd_key) );""" expected_calls = [call(mock_connect_client, sql_call)]