Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmake/Modules/SourceFiles.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ set(VALKEY_SERVER_SRCS
${CMAKE_SOURCE_DIR}/src/geo.c
${CMAKE_SOURCE_DIR}/src/lazyfree.c
${CMAKE_SOURCE_DIR}/src/module.c
${CMAKE_SOURCE_DIR}/src/lrulfu.c
${CMAKE_SOURCE_DIR}/src/evict.c
${CMAKE_SOURCE_DIR}/src/expire.c
${CMAKE_SOURCE_DIR}/src/geohash.c
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_TRACE_OBJ=trace/trace.o trace/trace_commands.o trace/trace_db.o trace/trace_cluster.o trace/trace_server.o trace/trace_rdb.o trace/trace_aof.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o vector.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o cluster_migrateslots.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut9.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script.o functions.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o entry.o vset.o lua/script_lua.o lua/function_lua.o lua/engine_lua.o lua/debug_lua.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o vector.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o cluster_migrateslots.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o lrulfu.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o lolwut9.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script.o functions.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o entry.o vset.o lua/script_lua.o lua/function_lua.o lua/engine_lua.o lua/debug_lua.o
ENGINE_SERVER_OBJ+=$(ENGINE_TRACE_OBJ)
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o sds.o util.o sha256.o
Expand Down
5 changes: 2 additions & 3 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ void dumpCommand(client *c) {

/* RESTORE key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency] */
void restoreCommand(client *c) {
long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
long long ttl, lfu_freq = -1, lru_idle = -1;
uint16_t rdbver = 0;
rio payload;
int j, type, replace = 0, absttl = 0;
Expand All @@ -217,7 +217,6 @@ void restoreCommand(client *c) {
addReplyError(c, "Invalid IDLETIME value, must be >= 0");
return;
}
lru_clock = LRU_CLOCK();
j++; /* Consume additional arg. */
} else if (!strcasecmp(c->argv[j]->ptr, "freq") && additional >= 1 && lru_idle == -1) {
if (getLongLongFromObjectOrReply(c, c->argv[j + 1], &lfu_freq, NULL) != C_OK) return;
Expand Down Expand Up @@ -305,7 +304,7 @@ void restoreCommand(client *c) {
rewriteClientCommandArgument(c, c->argc, shared.absttl);
}
}
objectSetLRUOrLFU(obj, lfu_freq, lru_idle, lru_clock, 1000);
objectSetLRUOrLFU(obj, lfu_freq, lru_idle);
signalModifiedKey(c, c->db, key);
notifyKeyspaceEvent(NOTIFY_GENERIC, "restore", key, c->db->id);
addReply(c, shared.ok);
Expand Down
14 changes: 1 addition & 13 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,6 @@ static int objectIsExpired(robj *val);
static void dbSetValue(serverDb *db, robj *key, robj **valref, int overwrite, void **oldref);
static robj *dbFindWithDictIndex(serverDb *db, sds key, int dict_index);

/* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached.
* Then logarithmically increment the counter, and update the access time. */
void updateLFU(robj *val) {
unsigned long counter = LFUDecrAndReturn(val);
counter = LFULogIncr(counter);
val->lru = (LFUGetTimeInMinutes() << 8) | counter;
}

/* Lookup a key for read or write operations, or return NULL if the key is not
* found in the specified DB. This function implements the functionality of
Expand Down Expand Up @@ -118,11 +110,7 @@ robj *lookupKey(serverDb *db, robj *key, int flags) {
if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)) {
/* Shared objects can't be stored in the database. */
serverAssert(val->refcount != OBJ_SHARED_REFCOUNT);
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
} else {
val->lru = LRU_CLOCK();
}
val->lru = lrulfu_touch(val->lru);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not wrap the touch as an Object API as well?

}

if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE))) server.stat_keyspace_hits++;
Expand Down
7 changes: 5 additions & 2 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -688,8 +688,11 @@ void debugCommand(client *c) {
s = sdscatprintf(s, "Value at:%p refcount:%d encoding:%s", (void *)val, val->refcount, strenc);
if (!fast) s = sdscatprintf(s, " serializedlength:%zu", rdbSavedObjectLen(val, c->argv[2], c->db->id));
/* Either lru or lfu field could work correctly which depends on server.maxmemory_policy. */
s = sdscatprintf(s, " lru:%d lru_seconds_idle:%llu", val->lru, estimateObjectIdleTime(val) / 1000);
s = sdscatprintf(s, " lfu_freq:%lu lfu_access_time_minutes:%u", LFUDecrAndReturn(val), val->lru >> 8);
if (lrulfu_isUsingLFU()) {
s = sdscatprintf(s, " lfu_freq:%u lfu_access_time_minutes:%u", objectGetLFUFrequency(val), val->lru >> 8);
} else {
s = sdscatprintf(s, " lru:%d lru_seconds_idle:%u", val->lru, lru_getIdleSecs(val->lru));
}
s = sdscatprintf(s, "%s", extra);
addReplyStatusLength(c, s, sdslen(s));
sdsfree(s);
Expand Down
127 changes: 2 additions & 125 deletions src/evict.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,38 +67,6 @@ static struct evictionPoolEntry *EvictionPoolLRU;
* Implementation of eviction, aging and LRU
* --------------------------------------------------------------------------*/

/* Return the LRU clock, based on the clock resolution. This is a time
* in a reduced-bits format that can be used to set and check the
* object->lru field of serverObject structures. */
unsigned int getLRUClock(void) {
return (mstime() / LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX;
}

/* This function is used to obtain the current LRU clock.
* If the current resolution is lower than the frequency we refresh the
* LRU clock (as it should be in production servers) we return the
* precomputed value, otherwise we need to resort to a system call. */
unsigned int LRU_CLOCK(void) {
unsigned int lruclock;
if (1000 / server.hz <= LRU_CLOCK_RESOLUTION) {
lruclock = server.lruclock;
} else {
lruclock = getLRUClock();
}
return lruclock;
}

/* Given an object returns the min number of milliseconds the object was never
* requested, using an approximated LRU algorithm. */
unsigned long long estimateObjectIdleTime(robj *o) {
unsigned long long lruclock = LRU_CLOCK();
if (lruclock >= o->lru) {
return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION;
} else {
return (lruclock + (LRU_CLOCK_MAX - o->lru)) * LRU_CLOCK_RESOLUTION;
}
}

/* LRU approximation algorithm
*
* The server uses an approximation of the LRU algorithm that runs in constant
Expand Down Expand Up @@ -158,17 +126,8 @@ int evictionPoolPopulate(serverDb *db, kvstore *samplekvs, struct evictionPoolEn
/* Calculate the idle time according to the policy. This is called
* idle just because the code initially handled LRU, but is in fact
* just a score where a higher score means better candidate. */
if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
idle = estimateObjectIdleTime(o);
} else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
/* When we use an LRU policy, we sort the keys by idle time
* so that we expire keys starting from greater idle time.
* However when the policy is an LFU one, we have a frequency
* estimation, and we want to evict keys with lower frequency
* first. So inside the pool we put objects using the inverted
* frequency subtracting the actual frequency to the maximum
* frequency of 255. */
idle = 255 - LFUDecrAndReturn(o);
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU | MAXMEMORY_FLAG_LFU)) {
idle = objectGetIdleness(o);
} else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
/* In this case the sooner the expire the better. */
idle = ULLONG_MAX - objectGetExpire(o);
Expand Down Expand Up @@ -230,88 +189,6 @@ int evictionPoolPopulate(serverDb *db, kvstore *samplekvs, struct evictionPoolEn
return count;
}

/* ----------------------------------------------------------------------------
* LFU (Least Frequently Used) implementation.

* We have 24 total bits of space in each object in order to implement
* an LFU (Least Frequently Used) eviction policy, since we re-use the
* LRU field for this purpose.
*
* We split the 24 bits into two fields:
*
* 16 bits 8 bits
* +------------------+--------+
* + Last access time | LOG_C |
* +------------------+--------+
*
* LOG_C is a logarithmic counter that provides an indication of the access
* frequency. However this field must also be decremented otherwise what used
* to be a frequently accessed key in the past, will remain ranked like that
* forever, while we want the algorithm to adapt to access pattern changes.
*
* So the remaining 16 bits are used in order to store the "access time",
* a reduced-precision Unix time (we take 16 bits of the time converted
* in minutes since we don't care about wrapping around) where the LOG_C
* counter decays every minute by default (depends on lfu-decay-time).
*
* New keys don't start at zero, in order to have the ability to collect
* some accesses before being trashed away, so they start at LFU_INIT_VAL.
* The logarithmic increment performed on LOG_C takes care of LFU_INIT_VAL
* when incrementing the key, so that keys starting at LFU_INIT_VAL
* (or having a smaller value) have a very high chance of being incremented
* on access. (The chance depends on counter and lfu-log-factor.)
*
* During decrement, the value of the logarithmic counter is decremented by
* one when lfu-decay-time minutes elapsed.
* --------------------------------------------------------------------------*/

/* Return the current time in minutes, just taking the least significant
* 16 bits. The returned time is suitable to be stored as LDT (last access
* time) for the LFU implementation. */
unsigned long LFUGetTimeInMinutes(void) {
return (server.unixtime / 60) & 65535;
}

/* Given an object ldt (last access time), compute the minimum number of minutes
* that elapsed since the last access. Handle overflow (ldt greater than
* the current 16 bits minutes time) considering the time as wrapping
* exactly once. */
unsigned long LFUTimeElapsed(unsigned long ldt) {
unsigned long now = LFUGetTimeInMinutes();
if (now >= ldt) return now - ldt;
return 65535 - ldt + now;
}

/* Logarithmically increment a counter. The greater is the current counter value
* the less likely is that it gets really incremented. Saturate it at 255. */
uint8_t LFULogIncr(uint8_t counter) {
if (counter == 255) return 255;
double r = (double)rand() / RAND_MAX;
double baseval = counter - LFU_INIT_VAL;
if (baseval < 0) baseval = 0;
double p = 1.0 / (baseval * server.lfu_log_factor + 1);
if (r < p) counter++;
return counter;
}

/* If the object's ldt (last access time) is reached, decrement the LFU counter but
* do not update LFU fields of the object, we update the access time
* and counter in an explicit way when the object is really accessed.
* And we will decrement the counter according to the times of
* elapsed time than server.lfu_decay_time.
* Return the object frequency counter.
*
* This function is used in order to scan the dataset for the best object
* to fit: as we check for the candidate, we incrementally decrement the
* counter of the scanned objects if needed. */
unsigned long LFUDecrAndReturn(robj *o) {
unsigned long ldt = o->lru >> 8;
unsigned long counter = o->lru & 255;
unsigned long num_periods = server.lfu_decay_time ? LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
if (num_periods) counter = (num_periods > counter) ? 0 : counter - num_periods;
return counter;
}

/* We don't want to count AOF buffers and replicas output buffers as
* used memory: the eviction should use mostly data size, because
* it can cause feedback-loop when we push DELs into them, putting
Expand Down
Loading
Loading