diff --git a/.asf.yaml b/.asf.yaml
index 4d0f708aec3..3baaba004c6 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -42,6 +42,16 @@ github:
         strict: true
         contexts:
           - Required
+    '1.3': {}
+    '2.0': {}
+    '2.1': {}
+    '2.2': {}
+    '2.3': {}
+    '2.4': {}
+    '2.5': {}
+    '2.6': {}
+    '2.7': {}
+    '2.8': {}
 
 notifications:
   commits: commits@kvrocks.apache.org
diff --git a/.github/workflows/kvrocks.yaml b/.github/workflows/kvrocks.yaml
index 270d4e70138..06762033d8e 100644
--- a/.github/workflows/kvrocks.yaml
+++ b/.github/workflows/kvrocks.yaml
@@ -57,10 +57,10 @@ jobs:
       FORCE_COLOR: 1
     steps:
       - uses: actions/checkout@v4
-      - name: Install typos
-        run: curl -LsSf https://github.com/crate-ci/typos/releases/download/v1.18.2/typos-v1.18.2-x86_64-unknown-linux-musl.tar.gz | tar zxf - -C ${CARGO_HOME:-~/.cargo}/bin
-      - name: Run typos check
-        run: typos --config .github/config/typos.toml
+      - name: Check typos
+        uses: crate-ci/typos@v1.22.9
+        with:
+          config: .github/config/typos.toml
 
   check-and-lint:
     name: Lint and check code
@@ -110,24 +110,25 @@ jobs:
       fail-fast: false
       matrix:
         include:
-          - name: Darwin Clang
-            os: macos-11
-            compiler: auto
+          # FIXME: update macos-11 to macos-12/13
+          # - name: Darwin Clang
+          #   os: macos-11
+          #   compiler: auto
           - name: Darwin Clang arm64
             os: macos-14
             compiler: auto
-          - name: Darwin Clang without Jemalloc
-            os: macos-11
-            compiler: auto
-            without_jemalloc: -DDISABLE_JEMALLOC=ON
-          - name: Darwin Clang with OpenSSL
-            os: macos-11
-            compiler: auto
-            with_openssl: -DENABLE_OPENSSL=ON
-          - name: Darwin Clang without luaJIT
-            os: macos-11
-            compiler: auto
-            without_luajit: -DENABLE_LUAJIT=OFF
+          # - name: Darwin Clang without Jemalloc
+          #   os: macos-11
+          #   compiler: auto
+          #   without_jemalloc: -DDISABLE_JEMALLOC=ON
+          # - name: Darwin Clang with OpenSSL
+          #   os: macos-11
+          #   compiler: auto
+          #   with_openssl: -DENABLE_OPENSSL=ON
+          # - name: Darwin Clang without luaJIT
+          #   os: macos-11
+          #   compiler: auto
+          #   without_luajit: -DENABLE_LUAJIT=OFF
           - name: Ubuntu GCC
             os: ubuntu-20.04
             compiler: gcc
@@ -389,7 +390,7 @@ jobs:
       - uses: actions/checkout@v4
       - name: Get core numbers
         run: echo "NPROC=$(nproc)" >> $GITHUB_ENV
-      - uses: docker/build-push-action@v5
+      - uses: docker/build-push-action@v6
         with:
          context: .
          build-args: MORE_BUILD_ARGS=-j${{ env.NPROC }}
@@ -419,9 +420,6 @@ jobs:
       fail-fast: false
       matrix:
         include:
-          - name: CentOS 7
-            image: centos:7
-            compiler: gcc
          - name: openSUSE Leap 15
            image: opensuse/leap:15
            compiler: gcc
@@ -433,17 +431,6 @@ jobs:
     container:
       image: ${{ matrix.image }}
     steps:
-      - name: Setup CentOS
-        if: ${{ startsWith(matrix.image, 'centos') }}
-        run: |
-          yum install -y centos-release-scl-rh
-          yum install -y devtoolset-11 python3 python3-pip autoconf automake wget git gcc gcc-c++
-          echo "NPROC=$(nproc)" >> $GITHUB_ENV
-          mv /usr/bin/gcc /usr/bin/gcc-4.8.5
-          ln -s /opt/rh/devtoolset-11/root/bin/gcc /usr/bin/gcc
-          mv /usr/bin/g++ /usr/bin/g++-4.8.5
-          ln -s /opt/rh/devtoolset-11/root/bin/g++ /usr/bin/g++
-
       - name: Setup ArchLinux
         if: ${{ startsWith(matrix.image, 'archlinux') }}
         run: |
@@ -486,15 +473,8 @@ jobs:
           pushd redis-6.2.14 && USE_JEMALLOC=no make -j$NPROC redis-cli && mv src/redis-cli $HOME/local/bin/ && popd
           pushd redis-6.2.14 && USE_JEMALLOC=no make -j$NPROC redis-server && mv src/redis-server $HOME/local/bin/ && popd
-      - name: Install cmake
-        if: ${{ startsWith(matrix.image, 'centos') }}
-        run: |
-          VERSION=3.26.4
-          wget https://github.com/Kitware/CMake/releases/download/v$VERSION/cmake-$VERSION-linux-x86_64.sh
-          bash cmake-$VERSION-linux-x86_64.sh --skip-license --prefix=/usr
-
-      - uses: actions/checkout@v3 #v4 use Node 20 and not working at CentOS 7
-      - uses: actions/setup-go@v4 #v5 use Node 20 too
+      - uses: actions/checkout@v4
+      - uses: actions/setup-go@v5
         if: ${{ !startsWith(matrix.image, 'opensuse') }}
         with:
           go-version-file: 'tests/gocase/go.mod'
diff --git a/.github/workflows/nightly.yaml b/.github/workflows/nightly.yaml
index 736b603e017..2cbb4719653 100644
--- a/.github/workflows/nightly.yaml
+++ b/.github/workflows/nightly.yaml
@@ -61,7 +61,7 @@ jobs:
             type=sha,prefix=nightly-{{date 'YYYYMMDD'}}-,format=short
             type=raw,value=nightly
 
-      - uses: docker/build-push-action@v5
+      - uses: docker/build-push-action@v6
        with:
          context: .
          platforms: linux/amd64, linux/arm64
diff --git a/cmake/fmt.cmake b/cmake/fmt.cmake
index 981272e47b3..b5cc7c28a05 100644
--- a/cmake/fmt.cmake
+++ b/cmake/fmt.cmake
@@ -20,8 +20,8 @@ include_guard()
 include(cmake/utils.cmake)
 
 FetchContent_DeclareGitHubWithMirror(fmt
-  fmtlib/fmt 10.2.1
-  MD5=1bba4e8bdd7b0fa98f207559ffa380a3
+  fmtlib/fmt 11.0.1
+  MD5=6ca210c0a9e751705bacff468dd55ee1
 )
 
 FetchContent_MakeAvailableWithArgs(fmt)
diff --git a/cmake/rocksdb.cmake b/cmake/rocksdb.cmake
index 8b2ff67c843..c5799763be8 100644
--- a/cmake/rocksdb.cmake
+++ b/cmake/rocksdb.cmake
@@ -26,8 +26,8 @@ endif()
 include(cmake/utils.cmake)
 
 FetchContent_DeclareGitHubWithMirror(rocksdb
-  facebook/rocksdb v9.2.1
-  MD5=b03db4c602754a2c50aa2967fd07a2a7
+  facebook/rocksdb v9.3.1
+  MD5=129235c789a963c004290d27d09ca48a
 )
 
 FetchContent_GetProperties(jemalloc)
diff --git a/cmake/tbb.cmake b/cmake/tbb.cmake
index a408ac8b9dd..7fde5f30c03 100644
--- a/cmake/tbb.cmake
+++ b/cmake/tbb.cmake
@@ -20,8 +20,8 @@ include_guard()
 include(cmake/utils.cmake)
 
 FetchContent_DeclareGitHubWithMirror(tbb
-  oneapi-src/oneTBB v2021.12.0
-  MD5=0919a8eda74333e1aafa8d602bb9cc90
+  oneapi-src/oneTBB v2021.13.0
+  MD5=2dd9b7cfa5de5bb3add2f7392e0c9bab
 )
 
 FetchContent_MakeAvailableWithArgs(tbb
diff --git a/kvrocks.conf b/kvrocks.conf
index a5f20f09908..66715028493 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -492,15 +492,23 @@ profiling-sample-record-threshold-ms 100
 ################################## CRON ###################################
 
 # Compact Scheduler, auto compact at schedule time
-# Time expression format is the same as crontab (currently only support *, int and */int)
-# e.g. compact-cron 0 3 * * * 0 4 * * *
+# Time expression format is the same as crontab (supported cron syntax: *, n, */n, `1,3-6,9,11`)
+# e.g. compact-cron 0 3,4 * * *
 # would compact the db at 3am and 4am everyday
 # compact-cron 0 3 * * *
 
 # The hour range that compaction checker would be active
 # e.g. compaction-checker-range 0-7 means compaction checker would be worker between
 # 0-7am every day.
-compaction-checker-range 0-7
+# WARNING: this config option is deprecated and will be removed,
+# please use compaction-checker-cron instead
+# compaction-checker-range 0-7
+
+# The time pattern that compaction checker would be active
+# Time expression format is the same as crontab (supported cron syntax: *, n, */n, `1,3-6,9,11`)
+# e.g. compaction-checker-cron * 0-7 * * * means the compaction checker would be working between
+# 0-7am every day.
+compaction-checker-cron * 0-7 * * *
 
 # When the compaction checker is triggered, the db will periodically pick the SST file
 # with the highest "deleted percentage" (i.e. the percentage of deleted keys in the SST
@@ -515,14 +523,14 @@ compaction-checker-range 0-7
 # force-compact-file-min-deleted-percentage 10
 
 # Bgsave scheduler, auto bgsave at scheduled time
-# Time expression format is the same as crontab (currently only support *, int and */int)
-# e.g. bgsave-cron 0 3 * * * 0 4 * * *
+# Time expression format is the same as crontab (supported cron syntax: *, n, */n, `1,3-6,9,11`)
+# e.g. bgsave-cron 0 3,4 * * *
 # would bgsave the db at 3am and 4am every day
 
 # Kvrocks doesn't store the key number directly. It needs to scan the DB and
 # then retrieve the key number by using the dbsize scan command.
 # The Dbsize scan scheduler auto-recalculates the estimated keys at scheduled time.
-# Time expression format is the same as crontab (currently only support *, int and */int)
+# Time expression format is the same as crontab (supported cron syntax: *, n, */n, `1,3-6,9,11`)
 # e.g. dbsize-scan-cron 0 * * * *
 # would recalculate the keyspace infos of the db every hour.
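
For illustration, here is a minimal sketch of validating the extended syntax with the `CronScheduler::Parse` helper this patch introduces in `src/common/cron.h`; the `main` wrapper and the sample pattern are hypothetical, and the `StatusOr` accessors are assumed to behave as they do elsewhere in this diff:

```cpp
#include <iostream>

#include "common/cron.h"

int main() {
  // "1,3-6,9,11" mixes single values with ranges; every field is bounds-checked.
  auto scheduler = CronScheduler::Parse("*", "1,3-6,9,11", "*", "*", "*");
  if (!scheduler.IsOK()) {
    std::cout << "invalid cron expression: " << scheduler.Msg() << std::endl;
    return 1;
  }
  std::cout << scheduler->ToString() << std::endl;  // prints: * 1,3-6,9,11 * * *
  return 0;
}
```
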
diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index a074d536b74..03625dc6275 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -382,6 +382,12 @@ Status SlotMigrator::sendSnapshotByCmd() {
     }
   }
 
+  if (auto s = iter->status(); !s.ok()) {
+    auto err_str = s.ToString();
+    LOG(ERROR) << "[migrate] Failed to iterate keys of slot " << slot << ": " << err_str;
+    return {Status::NotOK, fmt::format("failed to iterate keys of slot {}: {}", slot, err_str)};
+  }
+
   // It's necessary to send commands that are still in the pipeline since the final pipeline may not be sent
   // while iterating keys because its size could be less than max_pipeline_size_
   auto s = sendCmdsPipelineIfNeed(&restore_cmds, true);
@@ -820,6 +826,11 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
     }
   }
 
+  if (auto s = iter->status(); !s.ok()) {
+    return {Status::NotOK,
+            fmt::format("failed to iterate values of the complex key {}: {}", key.ToString(), s.ToString())};
+  }
+
   // Have to check the item count of the last command list
   if (item_count % kMaxItemsInCommand != 0) {
     *restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
@@ -880,6 +891,11 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad
     }
   }
 
+  if (auto s = iter->status(); !s.ok()) {
+    return {Status::NotOK,
+            fmt::format("failed to iterate values of the stream key {}: {}", key.ToString(), s.ToString())};
+  }
+
   // commands like XTRIM and XDEL affect stream's metadata, but we use only XADD for a slot migration
   // XSETID is used to adjust stream's info on the destination node according to the current values on the source
   *restore_cmds += redis::ArrayOfBulkStrings({"XSETID", key.ToString(), metadata.last_generated_id.ToString(),
diff --git a/src/commands/cmd_json.cc b/src/commands/cmd_json.cc
index 8f788953814..7708d2f8785 100644
--- a/src/commands/cmd_json.cc
+++ b/src/commands/cmd_json.cc
@@ -423,8 +423,7 @@ class CommandJsonArrTrim : public Commander {
 
     auto s = json.ArrTrim(args_[1], path_, start_, stop_, &results);
     if (s.IsNotFound()) {
-      *output = conn->NilString();
-      return Status::OK();
+      return {Status::RedisExecErr, "could not perform this operation on a key that doesn't exist"};
     }
 
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index ba4f7d9870c..7faeb9c49cc 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -18,6 +18,8 @@
  *
  */
 
+#include <algorithm>
+#include <limits>
 #include
 #include
 
@@ -358,6 +360,97 @@ class CommandXClaim : public Commander {
   }
 };
 
+class CommandAutoClaim : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 1);
+    key_name_ = GET_OR_RET(parser.TakeStr());
+    group_name_ = GET_OR_RET(parser.TakeStr());
+    consumer_name_ = GET_OR_RET(parser.TakeStr());
+    if (auto parse_status = parser.TakeInt<uint64_t>(); !parse_status.IsOK()) {
+      return {Status::RedisParseErr, "Invalid min-idle-time argument for XAUTOCLAIM"};
+    } else {
+      options_.min_idle_time_ms = parse_status.GetValue();
+    }
+
+    auto start_str = GET_OR_RET(parser.TakeStr());
+    if (!start_str.empty() && start_str.front() == '(') {
+      options_.exclude_start = true;
+      start_str = start_str.substr(1);
+    }
+    if (!options_.exclude_start && start_str == "-") {
+      options_.start_id = StreamEntryID::Minimum();
+    } else {
+      auto parse_status = ParseRangeStart(start_str, &options_.start_id);
+      if (!parse_status.IsOK()) {
+        return parse_status;
+      }
+    }
+
+    if (parser.EatEqICase("count")) {
+      uint64_t count = GET_OR_RET(parser.TakeInt<uint64_t>());
+      constexpr uint64_t min_count = 1;
+      uint64_t max_count = std::numeric_limits<uint64_t>::max() /
+                           (std::max(static_cast<uint64_t>(sizeof(StreamEntryID)), options_.attempts_factors));
+      if (count < min_count || count > max_count) {
+        return {Status::RedisParseErr, "COUNT must be > 0"};
+      }
+      options_.count = count;
+    }
+
+    if (parser.Good() && parser.EatEqICase("justid")) {
+      options_.just_id = true;
+    }
+
+    return Status::OK();
+  }
+
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    redis::Stream stream_db(srv->storage, conn->GetNamespace());
+    StreamAutoClaimResult result;
+    auto s = stream_db.AutoClaim(key_name_, group_name_, consumer_name_, options_, &result);
+    if (!s.ok()) {
+      if (s.IsNotFound()) {
+        return {Status::RedisExecErr,
+                "NOGROUP No such key '" + key_name_ + "' or consumer group '" + group_name_ + "'"};
+      }
+      return {Status::RedisExecErr, s.ToString()};
+    }
+    return sendResults(conn, result, output);
+  }
+
+ private:
+  Status sendResults(Connection *conn, const StreamAutoClaimResult &result, std::string *output) const {
+    output->append(redis::MultiLen(3));
+    output->append(redis::BulkString(result.next_claim_id));
+    output->append(redis::MultiLen(result.entries.size()));
+    for (const auto &item : result.entries) {
+      if (options_.just_id) {
+        output->append(redis::BulkString(item.key));
+      } else {
+        output->append(redis::MultiLen(2));
+        output->append(redis::BulkString(item.key));
+        output->append(redis::MultiLen(item.values.size()));
+        for (const auto &value_item : item.values) {
+          output->append(redis::BulkString(value_item));
+        }
+      }
+    }
+
+    output->append(redis::MultiLen(result.deleted_ids.size()));
+    for (const auto &item : result.deleted_ids) {
+      output->append(redis::BulkString(item));
+    }
+
+    return Status::OK();
+  }
+
+  std::string key_name_;
+  std::string group_name_;
+  std::string consumer_name_;
+  StreamAutoClaimOptions options_;
+};
+
 class CommandXGroup : public Commander {
  public:
   Status Parse(const std::vector<std::string> &args) override {
@@ -1647,6 +1740,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAck>("xack", -4, "write no-dbsize-ch
                         MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
                         MakeCmdAttr<CommandXDel>("xdel", -3, "write no-dbsize-check", 1, 1, 1),
                         MakeCmdAttr<CommandXClaim>("xclaim", -6, "write", 1, 1, 1),
+                        MakeCmdAttr<CommandAutoClaim>("xautoclaim", -6, "write", 1, 1, 1),
                         MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 2, 1),
                         MakeCmdAttr<CommandXLen>("xlen", -2, "read-only", 1, 1, 1),
                         MakeCmdAttr<CommandXInfo>("xinfo", -2, "read-only", 0, 0, 0),
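
As a usage sketch, this is roughly how the new command maps onto the storage layer, mirroring `CommandAutoClaim::Execute` above; the `storage` and `ns` variables are assumed to be available from the surrounding context:

```cpp
// Claim PEL entries of "mygroup" idle for at least 10s on behalf of "consumer2",
// starting from the smallest ID, up to the default count of 100 entries.
StreamAutoClaimOptions options;
options.min_idle_time_ms = 10000;
options.start_id = StreamEntryID::Minimum();

redis::Stream stream_db(storage, ns);  // assumption: storage handle and namespace in scope
StreamAutoClaimResult result;
auto s = stream_db.AutoClaim("mystream", "mygroup", "consumer2", options, &result);
if (s.ok()) {
  // result.next_claim_id: cursor for the next call ("0-0" once the PEL is exhausted)
  // result.entries:       the claimed entries (IDs only when just_id is set)
  // result.deleted_ids:   PEL entries whose stream entry was XDELeted in the meantime
}
```
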
"*/" + std::to_string(n) : std::to_string(n); - }; - return param2string(minute, minute_interval) + " " + param2string(hour, hour_interval) + " " + - param2string(mday, mday_interval) + " " + param2string(month, month_interval) + " " + - param2string(wday, wday_interval); +std::string CronScheduler::ToString() const { + return fmt::format("{} {} {} {} {}", minute.ToString(), hour.ToString(), mday.ToString(), month.ToString(), + wday.ToString()); } +bool CronScheduler::IsMatch(const tm *tm) const { + bool minute_match = minute.IsMatch(tm->tm_min); + bool hour_match = hour.IsMatch(tm->tm_hour); + bool mday_match = mday.IsMatch(tm->tm_mday, 1); + bool month_match = month.IsMatch(tm->tm_mon + 1, 1); + bool wday_match = wday.IsMatch(tm->tm_wday); + + return minute_match && hour_match && mday_match && month_match && wday_match; +} + +StatusOr CronScheduler::Parse(std::string_view minute, std::string_view hour, std::string_view mday, + std::string_view month, std::string_view wday) { + CronScheduler st; + + st.minute = GET_OR_RET(CronPattern::Parse(minute, {0, 59})); + st.hour = GET_OR_RET(CronPattern::Parse(hour, {0, 23})); + st.mday = GET_OR_RET(CronPattern::Parse(mday, {1, 31})); + st.month = GET_OR_RET(CronPattern::Parse(month, {1, 12})); + st.wday = GET_OR_RET(CronPattern::Parse(wday, {0, 6})); + + return st; +} + +void Cron::Clear() { schedulers_.clear(); } + Status Cron::SetScheduleTime(const std::vector &args) { if (args.empty()) { schedulers_.clear(); return Status::OK(); } if (args.size() % 5 != 0) { - return {Status::NotOK, "time expression format error,should only contain 5x fields"}; + return {Status::NotOK, "cron expression format error, should only contain 5x fields"}; } - std::vector new_schedulers; + std::vector new_schedulers; for (size_t i = 0; i < args.size(); i += 5) { - auto s = convertToScheduleTime(args[i], args[i + 1], args[i + 2], args[i + 3], args[i + 4]); + auto s = CronScheduler::Parse(args[i], args[i + 1], args[i + 2], args[i + 3], args[i + 4]); if (!s.IsOK()) { - return std::move(s).Prefixed("time expression format error"); + return std::move(s).Prefixed("cron expression format error"); } new_schedulers.push_back(*s); } @@ -63,20 +84,8 @@ bool Cron::IsTimeMatch(const tm *tm) { return false; } - auto match = [](int current, int val, bool interval, int interval_offset) { - if (val == -1) return true; - if (interval) return (current - interval_offset) % val == 0; - return val == current; - }; - for (const auto &st : schedulers_) { - bool minute_match = match(tm->tm_min, st.minute, st.minute_interval, 0); - bool hour_match = match(tm->tm_hour, st.hour, st.hour_interval, 0); - bool mday_match = match(tm->tm_mday, st.mday, st.mday_interval, 1); - bool month_match = match(tm->tm_mon + 1, st.month, st.month_interval, 1); - bool wday_match = match(tm->tm_wday, st.wday, st.wday_interval, 0); - - if (minute_match && hour_match && mday_match && month_match && wday_match) { + if (st.IsMatch(tm)) { last_tm_ = *tm; return true; } @@ -94,40 +103,3 @@ std::string Cron::ToString() const { } return ret; } - -StatusOr Cron::convertToScheduleTime(const std::string &minute, const std::string &hour, - const std::string &mday, const std::string &month, - const std::string &wday) { - Scheduler st; - - st.minute = GET_OR_RET(convertParam(minute, 0, 59, st.minute_interval)); - st.hour = GET_OR_RET(convertParam(hour, 0, 23, st.hour_interval)); - st.mday = GET_OR_RET(convertParam(mday, 1, 31, st.mday_interval)); - st.month = GET_OR_RET(convertParam(month, 1, 12, st.month_interval)); - st.wday = 
diff --git a/src/common/cron.h b/src/common/cron.h
index 325745e9fcd..ede6231beab 100644
--- a/src/common/cron.h
+++ b/src/common/cron.h
@@ -23,25 +23,135 @@
 #include
 #include
 #include
+#include <variant>
 #include
 
+#include "parse_util.h"
 #include "status.h"
+#include "string_util.h"
 
-struct Scheduler {
-  int minute;
-  int hour;
-  int mday;
-  int month;
-  int wday;
+struct CronPattern {
+  using Number = int;
+  using Range = std::pair<Number, Number>;
 
-  // Whether we use */n interval syntax
-  bool minute_interval = false;
-  bool hour_interval = false;
-  bool mday_interval = false;
-  bool month_interval = false;
-  bool wday_interval = false;
+  struct Interval {
+    int interval;
+  };  // */n
+  struct Any {};  // *
+  using Numbers = std::vector<std::variant<Number, Range>>;  // 1,2,3-6,7
+
+  std::variant<Numbers, Interval, Any> val;
+
+  static StatusOr<CronPattern> Parse(std::string_view str, std::tuple<int, int> minmax) {
+    if (str == "*") {
+      return CronPattern{Any{}};
+    } else if (str.rfind("*/", 0) == 0) {
+      auto num_str = str.substr(2);
+      auto interval = GET_OR_RET(ParseInt<int>(std::string(num_str.begin(), num_str.end()), minmax)
+                                     .Prefixed("an integer is expected after `*/` in a cron expression"));
+
+      if (interval == 0) {
+        return {Status::NotOK, "interval value after `*/` cannot be zero"};
+      }
+
+      return CronPattern{Interval{interval}};
+    } else {
+      auto num_strs = util::Split(str, ",");
+
+      Numbers results;
+      for (const auto &num_str : num_strs) {
+        if (auto pos = num_str.find('-'); pos != num_str.npos) {
+          auto l_str = num_str.substr(0, pos);
+          auto r_str = num_str.substr(pos + 1);
+          auto l = GET_OR_RET(
+              ParseInt<int>(l_str, minmax).Prefixed("an integer is expected before `-` in a cron expression"));
+          auto r = GET_OR_RET(
+              ParseInt<int>(r_str, minmax).Prefixed("an integer is expected after `-` in a cron expression"));
+
+          if (l >= r) {
+            return {Status::NotOK, "for pattern `l-r` in a cron expression, r should be larger than l"};
+          }
+          results.push_back(Range(l, r));
+        } else {
+          auto n = GET_OR_RET(ParseInt<int>(std::string(num_str.begin(), num_str.end()), minmax)
+                                  .Prefixed("an integer is expected in a cron expression"));
+          results.push_back(n);
+        }
+      }
+
+      if (results.empty()) {
+        return {Status::NotOK, "invalid cron expression"};
+      }
+
+      return CronPattern{results};
+    }
+  }
+
+  std::string ToString() const {
+    if (std::holds_alternative<Numbers>(val)) {
+      std::string result;
+      bool first = true;
+
+      for (const auto &v : std::get<Numbers>(val)) {
+        if (first)
+          first = false;
+        else
+          result += ",";
+
+        if (std::holds_alternative<Number>(v)) {
+          result += std::to_string(std::get<Number>(v));
+        } else {
+          auto range = std::get<Range>(v);
+          result += std::to_string(range.first) + "-" + std::to_string(range.second);
+        }
+      }
+
+      return result;
+    } else if (std::holds_alternative<Interval>(val)) {
+      return "*/" + std::to_string(std::get<Interval>(val).interval);
+    } else if (std::holds_alternative<Any>(val)) {
+      return "*";
+    }
+
+    __builtin_unreachable();
+  }
+
+  bool IsMatch(int input, int interval_offset = 0) const {
+    if (std::holds_alternative<Numbers>(val)) {
+      bool result = false;
+      for (const auto &v : std::get<Numbers>(val)) {
+        if (std::holds_alternative<Number>(v)) {
+          result = result || input == std::get<Number>(v);
+        } else {
+          auto range = std::get<Range>(v);
+          result = result || (range.first <= input && input <= range.second);
+        }
+      }
+
+      return result;
+    } else if (std::holds_alternative<Interval>(val)) {
+      return (input - interval_offset) % std::get<Interval>(val).interval == 0;
+    } else if (std::holds_alternative<Any>(val)) {
+      return true;
+    }
+
+    __builtin_unreachable();
+  }
+};
+
+struct CronScheduler {
+  CronPattern minute;
+  CronPattern hour;
+  CronPattern mday;
+  CronPattern month;
+  CronPattern wday;
 
   std::string ToString() const;
+
+  static StatusOr<CronScheduler> Parse(std::string_view minute, std::string_view hour, std::string_view mday,
+                                       std::string_view month, std::string_view wday);
+
+  bool IsMatch(const tm *tm) const;
 };
 
 class Cron {
@@ -50,16 +160,13 @@ class Cron {
   ~Cron() = default;
 
   Status SetScheduleTime(const std::vector<std::string> &args);
+  void Clear();
+
   bool IsTimeMatch(const tm *tm);
   std::string ToString() const;
   bool IsEnabled() const;
 
 private:
-  std::vector<Scheduler> schedulers_;
+  std::vector<CronScheduler> schedulers_;
   tm last_tm_ = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, nullptr};
-
-  static StatusOr<Scheduler> convertToScheduleTime(const std::string &minute, const std::string &hour,
-                                                   const std::string &mday, const std::string &month,
-                                                   const std::string &wday);
-  static StatusOr<int> convertParam(const std::string &param, int lower_bound, int upper_bound, bool &is_interval);
 };
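
To make the matching semantics concrete, a small sketch against the header above (the values are illustrative):

```cpp
// "1,3-6,9" parses to Numbers{1, Range{3, 6}, 9}; {0, 23} are the hour bounds.
auto hours = CronPattern::Parse("1,3-6,9", {0, 23});
// hours->IsMatch(4) == true (falls inside 3-6); hours->IsMatch(2) == false

// "*/4" parses to Interval{4}; with the default offset 0 it matches 0, 4, 8, ...
auto every_four = CronPattern::Parse("*/4", {0, 23});
// every_four->IsMatch(8) == true; every_four->IsMatch(9) == false
// mday and month pass interval_offset = 1, so "*/4" matches 1, 5, 9, ... there.
```
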
diff --git a/src/config/config.cc b/src/config/config.cc
index 3163dd97ba6..4ff8901f9c8 100644
--- a/src/config/config.cc
+++ b/src/config/config.cc
@@ -141,6 +141,7 @@ Config::Config() {
       {"replica-announce-ip", false, new StringField(&replica_announce_ip, "")},
       {"replica-announce-port", false, new UInt32Field(&replica_announce_port, 0, 0, PORT_LIMIT)},
       {"compaction-checker-range", false, new StringField(&compaction_checker_range_str_, "")},
+      {"compaction-checker-cron", false, new StringField(&compaction_checker_cron_str_, "")},
       {"force-compact-file-age", false, new Int64Field(&force_compact_file_age, 2 * 24 * 3600, 60, INT64_MAX)},
       {"force-compact-file-min-deleted-percentage", false,
        new IntField(&force_compact_file_min_deleted_percentage, 10, 1, 100)},
@@ -300,21 +301,19 @@ void Config::initFieldValidator() {
        }},
       {"compaction-checker-range",
        [this](const std::string &k, const std::string &v) -> Status {
+         if (!compaction_checker_cron_str_.empty()) {
+           return {Status::NotOK, "compaction-checker-range cannot be set while compaction-checker-cron is set"};
+         }
          if (v.empty()) {
-           compaction_checker_range.start = -1;
-           compaction_checker_range.stop = -1;
+           compaction_checker_cron.Clear();
            return Status::OK();
          }
-         std::vector<std::string> args = util::Split(v, "-");
-         if (args.size() != 2) {
-           return {Status::NotOK, "invalid range format, the range should be between 0 and 24"};
-         }
-         auto start = GET_OR_RET(ParseInt<int>(args[0], {0, 24}, 10)),
-              stop = GET_OR_RET(ParseInt<int>(args[1], {0, 24}, 10));
-         if (start > stop) return {Status::NotOK, "invalid range format, start should be smaller than stop"};
-         compaction_checker_range.start = start;
-         compaction_checker_range.stop = stop;
-         return Status::OK();
+         return compaction_checker_cron.SetScheduleTime({"*", v, "*", "*", "*"});
+       }},
+      {"compaction-checker-cron",
+       [this](const std::string &k, const std::string &v) -> Status {
+         std::vector<std::string> args = util::Split(v, " \t");
+         return compaction_checker_cron.SetScheduleTime(args);
        }},
       {"rename-command",
        [](const std::string &k, const std::string &v) -> Status {
diff --git a/src/config/config.h b/src/config/config.h
index c4bc705b006..73936de3eb8 100644
--- a/src/config/config.h
+++ b/src/config/config.h
@@ -57,14 +57,6 @@ constexpr const char *kDefaultNamespace = "__namespace";
 
 enum class BlockCacheType { kCacheTypeLRU = 0, kCacheTypeHCC };
 
-struct CompactionCheckerRange {
- public:
-  int start;
-  int stop;
-
-  bool Enabled() const { return start != -1 || stop != -1; }
-};
-
 struct CLIOptions {
   std::string conf_file;
   std::vector<std::pair<std::string, std::string>> cli_options;
@@ -138,7 +130,7 @@ struct Config {
   Cron compact_cron;
   Cron bgsave_cron;
   Cron dbsize_scan_cron;
-  CompactionCheckerRange compaction_checker_range{-1, -1};
+  Cron compaction_checker_cron;
   int64_t force_compact_file_age;
   int force_compact_file_min_deleted_percentage;
   bool repl_namespace_enabled = false;
@@ -249,6 +241,7 @@ struct Config {
   std::string bgsave_cron_str_;
   std::string dbsize_scan_cron_str_;
   std::string compaction_checker_range_str_;
+  std::string compaction_checker_cron_str_;
   std::string profiling_sample_commands_str_;
   std::map<std::string, std::unique_ptr<ConfigField>> fields_;
   std::vector<std::string> rename_command_;
diff --git a/src/search/executors/filter_executor.h b/src/search/executors/filter_executor.h
index f668af7d620..df14b29b80a 100644
--- a/src/search/executors/filter_executor.h
+++ b/src/search/executors/filter_executor.h
@@ -76,8 +76,15 @@ struct QueryExprEvaluator {
     auto val = GET_OR_RET(ctx->Retrieve(row, v->field->info));
 
     CHECK(val.Is<kqir::StringArray>());
-    auto split = val.Get<kqir::StringArray>();
-    return std::find(split.begin(), split.end(), v->tag->val) != split.end();
+    auto tags = val.Get<kqir::StringArray>();
+
+    auto meta = v->field->info->MetadataAs<redis::TagFieldMetadata>();
+    if (meta->case_sensitive) {
+      return std::find(tags.begin(), tags.end(), v->tag->val) != tags.end();
+    } else {
+      return std::find_if(tags.begin(), tags.end(),
+                          [v](const auto &tag) { return util::EqualICase(tag, v->tag->val); }) != tags.end();
+    }
   }
 
   StatusOr<bool> Visit(NumericCompareExpr *v) const {
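
`util::EqualICase` itself is not part of this diff; a hypothetical stand-in for the comparison it presumably performs (byte-wise ASCII case folding) could look like this:

```cpp
#include <algorithm>
#include <cctype>
#include <string_view>

// Sketch only: not the actual util::EqualICase implementation.
bool EqualICaseSketch(std::string_view a, std::string_view b) {
  return a.size() == b.size() &&
         std::equal(a.begin(), a.end(), b.begin(),
                    [](unsigned char l, unsigned char r) { return std::tolower(l) == std::tolower(r); });
}
```
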
diff --git a/src/search/executors/tag_field_scan_executor.h b/src/search/executors/tag_field_scan_executor.h
index 9eff2e66aa5..946db767432 100644
--- a/src/search/executors/tag_field_scan_executor.h
+++ b/src/search/executors/tag_field_scan_executor.h
@@ -29,6 +29,7 @@
 #include "storage/redis_db.h"
 #include "storage/redis_metadata.h"
 #include "storage/storage.h"
+#include "string_util.h"
 
 namespace kqir {
 
@@ -39,13 +40,15 @@ struct TagFieldScanExecutor : ExecutorNode {
 
   IndexInfo *index;
   std::string index_key;
+  bool case_sensitive;
 
   TagFieldScanExecutor(ExecutorContext *ctx, TagFieldScan *scan)
       : ExecutorNode(ctx),
        scan(scan),
        ss(ctx->storage),
        index(scan->field->info->index),
-        index_key(redis::SearchKey(index->ns, index->name, scan->field->name).ConstructTagFieldData(scan->tag, {})) {}
+        index_key(redis::SearchKey(index->ns, index->name, scan->field->name).ConstructTagFieldData(scan->tag, {})),
+        case_sensitive(scan->field->info->MetadataAs<redis::TagFieldMetadata>()->case_sensitive) {}
 
   bool InRangeDecode(Slice key, Slice *user_key) const {
     uint8_t ns_size = 0;
@@ -66,7 +69,7 @@ struct TagFieldScanExecutor : ExecutorNode {
     if (value != scan->field->name) return false;
 
     if (!GetSizedString(&key, &value)) return false;
-    if (value != scan->tag) return false;
+    if (case_sensitive ? value != scan->tag : !util::EqualICase(value.ToStringView(), scan->tag)) return false;
 
     if (!GetSizedString(&key, user_key)) return false;
diff --git a/src/search/index_manager.h b/src/search/index_manager.h
index 1d7447047ee..0f90961f52c 100644
--- a/src/search/index_manager.h
+++ b/src/search/index_manager.h
@@ -129,6 +129,10 @@ struct IndexManager {
       index_map.Insert(std::move(info));
     }
 
+    if (auto s = iter->status(); !s.ok()) {
+      return {Status::NotOK, fmt::format("failed to load index metadata: {}", s.ToString())};
+    }
+
     return Status::OK();
   }
diff --git a/src/search/indexer.cc b/src/search/indexer.cc
index 9db89584004..576de07343a 100644
--- a/src/search/indexer.cc
+++ b/src/search/indexer.cc
@@ -359,6 +359,10 @@ Status IndexUpdater::Build() const {
       if (s.Is()) continue;
       if (!s.OK()) return s;
     }
+
+    if (auto s = iter->status(); !s.ok()) {
+      return {Status::NotOK, s.ToString()};
+    }
   }
 
   return Status::OK();
diff --git a/src/server/server.cc b/src/server/server.cc
index 2da7dfa00c1..5e3b7f1363c 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -204,16 +204,18 @@ Status Server::Start() {
       if (storage->IsClosing()) continue;
 
       if (!is_loading_ && ++counter % 600 == 0  // check every minute
-          && config_->compaction_checker_range.Enabled()) {
-        auto now_hours = util::GetTimeStamp();
-        if (now_hours >= config_->compaction_checker_range.start &&
-            now_hours <= config_->compaction_checker_range.stop) {
+          && config_->compaction_checker_cron.IsEnabled()) {
+        auto t_now = static_cast<time_t>(util::GetTimeStamp());
+        std::tm now{};
+        localtime_r(&t_now, &now);
+        if (config_->compaction_checker_cron.IsTimeMatch(&now)) {
           const auto &column_family_list = engine::ColumnFamilyConfigs::ListAllColumnFamilies();
           for (auto &column_family : column_family_list) {
             compaction_checker.PickCompactionFilesForCf(column_family);
           }
         }
         // compact once per day
+        auto now_hours = t_now / 3600;
         if (now_hours != 0 && last_compact_date != now_hours / 24) {
           last_compact_date = now_hours / 24;
           compaction_checker.CompactPropagateAndPubSubFiles();
@@ -756,7 +758,7 @@ void Server::cron() {
     std::tm now{};
     localtime_r(&t, &now);
     // disable compaction cron when the compaction checker was enabled
-    if (!config_->compaction_checker_range.Enabled() && config_->compact_cron.IsEnabled() &&
+    if (!config_->compaction_checker_cron.IsEnabled() && config_->compact_cron.IsEnabled() &&
        config_->compact_cron.IsTimeMatch(&now)) {
       Status s = AsyncCompactDB();
       LOG(INFO) << "[server] Schedule to compact the db, result: " << s.Msg();
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index 4f08490bd66..0fd3113778b 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -299,6 +299,10 @@ rocksdb::Status Database::Keys(const std::string &prefix, std::vector
 
+    if (auto s = iter->status(); !s.ok()) {
+      return s;
+    }
+
     if (!storage_->IsSlotIdEncoded()) break;
     if (prefix.empty()) break;
     if (++slot_id >= HASH_SLOTS_SIZE) break;
@@ -368,6 +372,11 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const
       keys->emplace_back(user_key);
       cnt++;
     }
+
+    if (auto s = iter->status(); !s.ok()) {
+      return s;
+    }
+
     if (!storage_->IsSlotIdEncoded() || prefix.empty()) {
       if (!keys->empty() && cnt >= limit) {
        end_cursor->append(user_key);
@@ -587,7 +596,7 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key,
       break;
     }
   }
-  return rocksdb::Status::OK();
+  return iter->status();
 }
 
 RedisType WriteBatchLogData::GetRedisType() const { return type_; }
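
All of the iterator changes in this patch apply the same RocksDB idiom: `Valid()` returning false can mean either that the range is exhausted or that the iterator hit an I/O error, and only `status()` distinguishes the two. A minimal sketch of the pattern (the prefix scan itself is illustrative):

```cpp
// A scan loop ends when Valid() turns false, which is ambiguous on its own.
std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(rocksdb::ReadOptions()));
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
  // ... consume iter->key() / iter->value() ...
}
if (auto s = iter->status(); !s.ok()) {
  return s;  // surface the error instead of silently returning partial results
}
return rocksdb::Status::OK();
```
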
diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc
index 5197eb5c1b1..987f9f0dfc0 100644
--- a/src/storage/scripting.cc
+++ b/src/storage/scripting.cc
@@ -452,6 +452,10 @@ Status FunctionList(Server *srv, const redis::Connection *conn, const std::strin
     result.emplace_back(lib.ToString(), iter->value().ToString());
   }
 
+  if (auto s = iter->status(); !s.ok()) {
+    return {Status::NotOK, s.ToString()};
+  }
+
   output->append(redis::MultiLen(result.size()));
   for (const auto &[lib, code] : result) {
     output->append(conn->HeaderOfMap(with_code ? 2 : 1));
@@ -488,6 +492,10 @@ Status FunctionListFunc(Server *srv, const redis::Connection *conn, const std::s
     result.emplace_back(func.ToString(), iter->value().ToString());
   }
 
+  if (auto s = iter->status(); !s.ok()) {
+    return {Status::NotOK, s.ToString()};
+  }
+
   output->append(redis::MultiLen(result.size()));
   for (const auto &[func, lib] : result) {
     output->append(conn->HeaderOfMap(2));
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 8b624a2ea8c..97fcf948082 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -511,6 +511,170 @@ rocksdb::Status Stream::ClaimPelEntries(const Slice &stream_name, const std::str
   return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
 }
 
+rocksdb::Status Stream::AutoClaim(const Slice &stream_name, const std::string &group_name,
+                                  const std::string &consumer_name, const StreamAutoClaimOptions &options,
+                                  StreamAutoClaimResult *result) {
+  if (options.exclude_start && options.start_id.IsMaximum()) {
+    return rocksdb::Status::InvalidArgument("invalid start ID for the interval");
+  }
+
+  std::string ns_key = AppendNamespacePrefix(stream_name);
+  StreamMetadata metadata(false);
+
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  auto s = GetMetadata(GetOptions{}, ns_key, &metadata);
+  if (!s.ok()) {  // not found will be caught by outside with no such key or consumer group
+    return s;
+  }
+
+  std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
+  std::string get_consumer_value;
+  s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+  if (s.IsNotFound()) {
+    int created_number = 0;
+    s = createConsumerWithoutLock(stream_name, group_name, consumer_name, &created_number);
+    if (!s.ok()) {
+      return s;
+    }
+    s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  StreamConsumerMetadata current_consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
+  std::map<std::string, uint64_t> claimed_consumer_entity_count;
+  std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start_id);
+  std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, StreamEntryID::Maximum());
+
+  LatestSnapShot ss{storage_};
+  rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
+  read_options.snapshot = ss.GetSnapShot();
+  rocksdb::Slice lower_bound(prefix_key);
+  rocksdb::Slice upper_bound(end_key);
+  read_options.iterate_lower_bound = &lower_bound;
+  read_options.iterate_upper_bound = &upper_bound;
+
+  auto count = options.count;
+  uint64_t attempts = options.attempts_factors * count;
+  auto now_ms = util::GetTimeStampMS();
+  std::vector<StreamEntryID> deleted_entries;
+  std::vector<StreamEntry> pending_entries;
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisStream);
+  batch->PutLogData(log_data.Encode());
+
+  auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
+  uint64_t total_claimed_count = 0;
+  for (iter->SeekToFirst(); iter->Valid() && count > 0 && attempts > 0; iter->Next()) {
+    if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
+      std::string tmp_group_name;
+      StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
+      if (tmp_group_name != group_name) {
+        continue;
+      }
+
+      if (options.exclude_start && entry_id == options.start_id) {
+        continue;
+      }
+
+      attempts--;
+
+      StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString());
+      if ((now_ms - pel_entry.last_delivery_time_ms) < options.min_idle_time_ms) {
+        continue;
+      }
+
+      auto entry_key = internalKeyFromEntryID(ns_key, metadata, entry_id);
+      std::string entry_value;
+      s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, &entry_value);
+      if (!s.ok()) {
+        if (s.IsNotFound()) {
+          deleted_entries.push_back(entry_id);
+          batch->Delete(stream_cf_handle_, iter->key());
+          --count;
+          continue;
+        }
+        return s;
+      }
+
+      StreamEntry entry(entry_id.ToString(), {});
+      if (!options.just_id) {
+        auto rv_status = DecodeRawStreamEntryValue(entry_value, &entry.values);
+        if (!rv_status.OK()) {
+          return rocksdb::Status::InvalidArgument(rv_status.Msg());
+        }
+      }
+
+      pending_entries.emplace_back(std::move(entry));
+      --count;
+
+      if (pel_entry.consumer_name != consumer_name) {
+        ++total_claimed_count;
+        claimed_consumer_entity_count[pel_entry.consumer_name] += 1;
+        pel_entry.consumer_name = consumer_name;
+        pel_entry.last_delivery_time_ms = now_ms;
+        // Increment the delivery attempts counter unless JUSTID option provided
+        if (!options.just_id) {
+          pel_entry.last_delivery_count += 1;
+        }
+        batch->Put(stream_cf_handle_, iter->key(), encodeStreamPelEntryValue(pel_entry));
+      }
+    }
+  }
+
+  if (total_claimed_count > 0 && !pending_entries.empty()) {
+    current_consumer_metadata.pending_number += total_claimed_count;
+    current_consumer_metadata.last_attempted_interaction_ms = now_ms;
+
+    batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(current_consumer_metadata));
+
+    for (const auto &[consumer, count] : claimed_consumer_entity_count) {
+      std::string tmp_consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer);
+      std::string tmp_consumer_value;
+      s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, tmp_consumer_key, &tmp_consumer_value);
+      if (!s.ok()) {
+        return s;
+      }
+      StreamConsumerMetadata tmp_consumer_metadata = decodeStreamConsumerMetadataValue(tmp_consumer_value);
+      tmp_consumer_metadata.pending_number -= count;
+      batch->Put(stream_cf_handle_, tmp_consumer_key, encodeStreamConsumerMetadataValue(tmp_consumer_metadata));
+    }
+  }
+
+  bool has_next_entry = false;
+  for (; iter->Valid(); iter->Next()) {
+    if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
+      has_next_entry = true;
+      break;
+    }
+  }
+
+  if (auto s = iter->status(); !s.ok()) {
+    return s;
+  }
+
+  if (has_next_entry) {
+    std::string tmp_group_name;
+    StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
+    result->next_claim_id = entry_id.ToString();
+  } else {
+    result->next_claim_id = StreamEntryID::Minimum().ToString();
+  }
+
+  result->entries = std::move(pending_entries);
+  result->deleted_ids.clear();
+  result->deleted_ids.reserve(deleted_entries.size());
+  std::transform(deleted_entries.cbegin(), deleted_entries.cend(), std::back_inserter(result->deleted_ids),
+                 [](const StreamEntryID &id) { return id.ToString(); });
+
+  return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+}
+
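The `next_claim_id` written at the end makes the command usable as a cursor, much like SCAN. A hedged sketch of a draining loop on top of the method above (error handling elided; `stream_db` as in the earlier sketch):

```cpp
StreamAutoClaimOptions options;
options.min_idle_time_ms = 10000;
options.start_id = StreamEntryID::Minimum();
options.count = 100;

StreamAutoClaimResult result;
do {
  auto s = stream_db.AutoClaim("mystream", "mygroup", "consumer2", options, &result);
  if (!s.ok()) break;
  // ... process result.entries and result.deleted_ids ...
  // Continue from the returned cursor; "0-0" signals that the PEL is drained.
  ParseStreamEntryID(result.next_claim_id, &options.start_id);
} while (result.next_claim_id != StreamEntryID::Minimum().ToString());
```
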
 rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const StreamXGroupCreateOptions &options,
                                     const std::string &group_name) {
   if (std::isdigit(group_name[0])) {
@@ -603,6 +767,10 @@ rocksdb::Status Stream::DestroyGroup(const Slice &stream_name, const std::string
     *delete_cnt += 1;
   }
 
+  if (auto s = iter->status(); !s.ok()) {
+    return s;
+  }
+
   if (*delete_cnt != 0) {
     metadata.group_number -= 1;
     std::string metadata_bytes;
@@ -735,6 +903,11 @@ rocksdb::Status Stream::DestroyConsumer(const Slice &stream_name, const std::str
       }
     }
   }
+
+  if (auto s = iter->status(); !s.ok()) {
+    return s;
+  }
+
   batch->Delete(stream_cf_handle_, consumer_key);
   StreamConsumerGroupMetadata group_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value);
   group_metadata.consumer_number -= 1;
@@ -941,6 +1114,10 @@ rocksdb::Status Stream::Len(const Slice &stream_name, const StreamLenOptions &op
     }
   }
 
+  if (auto s = iter->status(); !s.ok()) {
+    return s;
+  }
+
   return rocksdb::Status::OK();
 }
@@ -1018,6 +1195,10 @@ rocksdb::Status Stream::range(const std::string &ns_key, const StreamMetadata &m
     }
   }
 
+  if (auto s = iter->status(); !s.ok()) {
+    return s;
+  }
+
   return rocksdb::Status::OK();
 }
@@ -1181,7 +1362,7 @@ rocksdb::Status Stream::GetGroupInfo(const Slice &stream_name,
       group_metadata.push_back(tmp_item);
     }
   }
-  return rocksdb::Status::OK();
+  return iter->status();
 }
 
 rocksdb::Status Stream::GetConsumerInfo(
@@ -1215,7 +1396,7 @@ rocksdb::Status Stream::GetConsumerInfo(
       consumer_metadata.push_back(tmp_item);
     }
   }
-  return rocksdb::Status::OK();
+  return iter->status();
 }
 
 rocksdb::Status Stream::Range(const Slice &stream_name, const StreamRangeOptions &options,
@@ -1379,6 +1560,10 @@ rocksdb::Status Stream::RangeWithPending(const Slice &stream_name, StreamRangeOp
       if (count >= options.count) break;
     }
   }
+
+  if (auto s = iter->status(); !s.ok()) {
+    return s;
+  }
 }
 batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(consumergroup_metadata));
 batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_metadata));
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index e0546ded418..510cbb66058 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -55,6 +55,8 @@ class Stream : public SubKeyScanner {
                                const std::string &consumer_name, uint64_t min_idle_time_ms,
                                const std::vector<StreamEntryID> &entry_ids, const StreamClaimOptions &options,
                                StreamClaimResult *result);
+  rocksdb::Status AutoClaim(const Slice &stream_name, const std::string &group_name, const std::string &consumer_name,
+                            const StreamAutoClaimOptions &options, StreamAutoClaimResult *result);
   rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions &options, uint64_t *size);
   rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t count, StreamInfo *info);
   rocksdb::Status GetGroupInfo(const Slice &stream_name,
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index 889e4046fb8..82c8f945b39 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -173,6 +173,15 @@ struct StreamClaimOptions {
   StreamEntryID last_delivered_id;
 };
 
+struct StreamAutoClaimOptions {
+  uint64_t min_idle_time_ms;
+  uint64_t count = 100;
+  uint64_t attempts_factors = 10;
+  StreamEntryID start_id;
+  bool just_id = false;
+  bool exclude_start = false;
+};
+
 struct StreamConsumerGroupMetadata {
   uint64_t consumer_number = 0;
   uint64_t pending_number = 0;
@@ -224,6 +233,12 @@ struct StreamClaimResult {
   std::vector<StreamEntry> entries;
 };
 
+struct StreamAutoClaimResult {
+  std::string next_claim_id;
+  std::vector<StreamEntry> entries;
+  std::vector<std::string> deleted_ids;
+};
+
 Status IncrementStreamEntryID(StreamEntryID *id);
 Status ParseStreamEntryID(const std::string &input, StreamEntryID *id);
 StatusOr<std::unique_ptr<NextStreamEntryIDGenerationStrategy>> ParseNextStreamEntryIDStrategy(const std::string &input);
diff --git a/src/types/redis_string.cc b/src/types/redis_string.cc
index b934f3b8d5d..43cf2a30924 100644
--- a/src/types/redis_string.cc
+++ b/src/types/redis_string.cc
@@ -597,10 +597,10 @@ rocksdb::Status String::LCS(const std::string &user_key1, const std::string &use
 
   uint32_t i = alen;
   uint32_t j = blen;
-  uint32_t arange_start = alen;  // alen signals that values are not set.
-  uint32_t arange_end = 0;
-  uint32_t brange_start = 0;
-  uint32_t brange_end = 0;
+  uint32_t a_range_start = alen;  // alen signals that values are not set.
+  uint32_t a_range_end = 0;
+  uint32_t b_range_start = 0;
+  uint32_t b_range_end = 0;
   while (i > 0 && j > 0) {
     bool emit_range = false;
     if (a[i - 1] == b[j - 1]) {
@@ -611,24 +611,24 @@ rocksdb::Status String::LCS(const std::string &user_key1, const std::string &use
       }
 
       // Track the current range.
-      if (arange_start == alen) {
-        arange_start = i - 1;
-        arange_end = i - 1;
-        brange_start = j - 1;
-        brange_end = j - 1;
+      if (a_range_start == alen) {
+        a_range_start = i - 1;
+        a_range_end = i - 1;
+        b_range_start = j - 1;
+        b_range_end = j - 1;
       }
       // Let's see if we can extend the range backward since
       // it is contiguous.
-      else if (arange_start == i && brange_start == j) {
-        arange_start--;
-        brange_start--;
+      else if (a_range_start == i && b_range_start == j) {
+        a_range_start--;
+        b_range_start--;
       } else {
         emit_range = true;
       }
 
       // Emit the range if we matched with the first byte of
       // one of the two strings. We'll exit the loop ASAP.
-      if (arange_start == 0 || brange_start == 0) {
+      if (a_range_start == 0 || b_range_start == 0) {
         emit_range = true;
       }
       idx--;
@@ -643,23 +643,23 @@ rocksdb::Status String::LCS(const std::string &user_key1, const std::string &use
         i--;
       else
         j--;
-      if (arange_start != alen) emit_range = true;
     }
 
     // Emit the current range if needed.
    if (emit_range) {
      if (auto result = std::get_if(rst)) {
-        uint32_t match_len = arange_end - arange_start + 1;
+        uint32_t match_len = a_range_end - a_range_start + 1;
 
        // Always emit the range when the `min_match_len` is not set.
        if (args.min_match_len == 0 || match_len >= args.min_match_len) {
-          result->matches.emplace_back(StringLCSRange{arange_start, arange_end},
-                                       StringLCSRange{brange_start, brange_end}, match_len);
+          result->matches.emplace_back(StringLCSRange{a_range_start, a_range_end},
+                                       StringLCSRange{b_range_start, b_range_end}, match_len);
        }
      }
      // Restart at the next match.
-      arange_start = alen;
+      a_range_start = alen;
    }
  }
diff --git a/tests/cppunit/cron_test.cc b/tests/cppunit/cron_test.cc
index 9ce38a5a998..bccb5ee24e1 100644
--- a/tests/cppunit/cron_test.cc
+++ b/tests/cppunit/cron_test.cc
@@ -372,3 +372,43 @@ TEST_F(CronTestWeekDayInterval, ToString) {
   std::string got = cron_->ToString();
   ASSERT_EQ("0 * * * */4", got);
 }
+
+class CronTestNumberAndRange : public testing::Test {
+ protected:
+  explicit CronTestNumberAndRange() {
+    cron_ = std::make_unique<Cron>();
+    std::vector<std::string> schedule{"*", "1,3,6-10,20", "*", "*", "*"};
+    auto s = cron_->SetScheduleTime(schedule);
+    EXPECT_TRUE(s.IsOK());
+  }
+  ~CronTestNumberAndRange() override = default;
+
+  std::unique_ptr<Cron> cron_;
+};
+
+TEST_F(CronTestNumberAndRange, IsTimeMatch) {
+  std::time_t t = std::time(nullptr);
+  std::tm *now = std::localtime(&t);
+  now->tm_hour = 1;
+  ASSERT_TRUE(cron_->IsTimeMatch(now));
+  now->tm_hour = 3;
+  ASSERT_TRUE(cron_->IsTimeMatch(now));
+  now->tm_hour = 6;
+  ASSERT_TRUE(cron_->IsTimeMatch(now));
+  now->tm_hour = 8;
+  ASSERT_TRUE(cron_->IsTimeMatch(now));
+  now->tm_hour = 10;
+  ASSERT_TRUE(cron_->IsTimeMatch(now));
+  now->tm_hour = 20;
+  ASSERT_TRUE(cron_->IsTimeMatch(now));
+  now->tm_hour = 0;
+  ASSERT_FALSE(cron_->IsTimeMatch(now));
+  now->tm_hour = 2;
+  ASSERT_FALSE(cron_->IsTimeMatch(now));
+  now->tm_hour = 5;
+  ASSERT_FALSE(cron_->IsTimeMatch(now));
+  now->tm_hour = 14;
+  ASSERT_FALSE(cron_->IsTimeMatch(now));
+  now->tm_hour = 22;
+  ASSERT_FALSE(cron_->IsTimeMatch(now));
+}
diff --git a/tests/gocase/unit/type/json/json_test.go b/tests/gocase/unit/type/json/json_test.go
index 8182db02df5..e720c89be4d 100644
--- a/tests/gocase/unit/type/json/json_test.go
+++ b/tests/gocase/unit/type/json/json_test.go
@@ -356,7 +356,7 @@ func TestJson(t *testing.T) {
 	t.Run("JSON.ARRTRIM basics", func(t *testing.T) {
 		require.NoError(t, rdb.Del(ctx, "a").Err())
 		// key no exists
-		require.EqualError(t, rdb.Do(ctx, "JSON.ARRTRIM", "not_exists", "$", 0, 0).Err(), redis.Nil.Error())
+		require.ErrorContains(t, rdb.Do(ctx, "JSON.ARRTRIM", "not_exists", "$", 0, 0).Err(), "could not perform this operation on a key that doesn't exist")
 		// key not json
 		require.NoError(t, rdb.Do(ctx, "SET", "no_json", "1").Err())
 		require.Error(t, rdb.Do(ctx, "JSON.ARRTRIM", "no_json", "$", 0, 0).Err())
diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go
index 7297bb66036..b8958e70f31 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -1500,6 +1500,410 @@ func TestStreamOffset(t *testing.T) {
 		require.Len(t, claimedIDs, 1, "Expected to claim exactly one message ID")
 		require.Equal(t, "1-0", claimedIDs[0], "Expected claimed message ID to match")
 	})
+
+	t.Run("XAUTOCLAIM can claim PEL items from another consumer", func(t *testing.T) {
+
+		streamName := "mystream"
+		groupName := "mygroup"
+		var id1 string
+		require.NoError(t, rdb.Del(ctx, streamName).Err())
+		{
+			rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+				Stream: streamName,
+				ID:     "*",
+				Values: []string{"a", "1"},
+			})
+			require.NoError(t, rsp.Err())
+			id1 = rsp.Val()
+		}
+		var id2 string
+		{
+			rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+				Stream: streamName,
+				ID:     "*",
+				Values: []string{"b", "2"},
+			})
+			require.NoError(t, rsp.Err())
+			id2 = rsp.Val()
+		}
+		var id3 string
+		{
+			rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+				Stream: streamName,
+				ID:     "*",
+				Values: []string{"c", "3"},
+			})
+			require.NoError(t, rsp.Err())
+			id3 = rsp.Val()
+		}
+		var id4 string
+		{
+			rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+				Stream: streamName,
+				ID:     "*",
+				Values: []string{"d", "4"},
+			})
+			require.NoError(t, rsp.Err())
+			id4 = rsp.Val()
+		}
+
+		require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
+
+		consumer1 := "consumer1"
+		consumer2 := "consumer2"
+		{
+			rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+				Group:    groupName,
+				Consumer: consumer1,
+				Streams:  []string{streamName, ">"},
+				Count:    1,
+			})
+			require.NoError(t, rsp.Err())
+			require.Len(t, rsp.Val(), 1)
+			require.Len(t, rsp.Val()[0].Messages, 1)
+			require.Equal(t, id1, rsp.Val()[0].Messages[0].ID)
+			require.Len(t, rsp.Val()[0].Messages[0].Values, 1)
+			require.Equal(t, "1", rsp.Val()[0].Messages[0].Values["a"])
+		}
+
+		{
+			time.Sleep(200 * time.Millisecond)
+			rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
+				Stream:   streamName,
+				Group:    groupName,
+				Consumer: consumer2,
+				MinIdle:  10 * time.Millisecond,
+				Count:    1,
+				Start:    "-",
+			})
+			require.NoError(t, rsp.Err())
+			msgs, start := rsp.Val()
+			require.Equal(t, "0-0", start)
+			require.Len(t, msgs, 1)
+			require.Len(t, msgs[0].Values, 1)
+			require.Equal(t, "1", msgs[0].Values["a"])
+		}
+
+		{
+			rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+				Group:    groupName,
+				Consumer: consumer1,
+				Streams:  []string{streamName, ">"},
+				Count:    3,
+			})
+			require.NoError(t, rsp.Err())
+
+			time.Sleep(time.Millisecond * 200)
+			require.NoError(t, rdb.XDel(ctx, streamName, id2).Err())
+		}
+
+		{
+			cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, consumer2, 10, "-", "COUNT", 3)
+			require.NoError(t, cmd.Err())
+			require.Equal(t, []interface{}{
+				id4,
+				[]interface{}{
+					[]interface{}{
+						id1,
+						[]interface{}{"a", "1"},
+					},
+					[]interface{}{
+						id3,
+						[]interface{}{"c", "3"},
+					},
+				},
+				[]interface{}{
+					id2,
+				},
+			}, cmd.Val())
+		}
+
+		{
+			time.Sleep(time.Millisecond * 200)
+			require.NoError(t, rdb.XDel(ctx, streamName, id4).Err())
+			rsp := rdb.XAutoClaimJustID(ctx, &redis.XAutoClaimArgs{
+				Stream:   streamName,
+				Group:    groupName,
+				Consumer: consumer2,
+				MinIdle:  10 * time.Millisecond,
+				Start:    "-",
+			})
+			require.NoError(t, rsp.Err())
+			msgs, start := rsp.Val()
+			require.Equal(t, "0-0", start)
+			require.Len(t, msgs, 2)
+			require.Equal(t, id1, msgs[0])
+			require.Equal(t, id3, msgs[1])
+		}
+	})
+
+	t.Run("XAUTOCLAIM as an iterator", func(t *testing.T) {
+		streamName := "mystream"
+		groupName := "mygroup"
+		var id3, id5 string
+		require.NoError(t, rdb.Del(ctx, streamName).Err())
+		{
+			rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+				Stream: streamName,
+				ID:     "*",
+				Values: []string{"a", "1"},
+			})
+			require.NoError(t, rsp.Err())
+		}
+		{
+			rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+				Stream: streamName,
+				ID:     "*",
+				Values: []string{"b", "2"},
+			})
+			require.NoError(t, rsp.Err())
+		}
+		{
+			rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+				Stream: streamName,
+				ID:     "*",
+				Values: []string{"c", "3"},
+			})
+			require.NoError(t, rsp.Err())
+			id3 = rsp.Val()
+		}
+		{
+			rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+				Stream: streamName,
+				ID:     "*",
+				Values: []string{"d", "4"},
+			})
+			require.NoError(t, rsp.Err())
+		}
+		{
+			rsp := rdb.XAdd(ctx, &redis.XAddArgs{
+				Stream: streamName,
+				ID:     "*",
+				Values: []string{"e", "5"},
+			})
+			require.NoError(t, rsp.Err())
+			id5 = rsp.Val()
+		}
+		require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
+
+		consumer1, consumer2 := "consumer1", "consumer2"
+		{
+			rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+				Group:    groupName,
+				Consumer: consumer1,
+				Streams:  []string{streamName, ">"},
+				Count:    90,
+			})
+			require.NoError(t, rsp.Err())
+			time.Sleep(200 * time.Millisecond)
+		}
+		{
+			rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
+				Stream:   streamName,
+				Group:    groupName,
+				Consumer: consumer2,
+				MinIdle:  10 * time.Millisecond,
+				Count:    2,
+				Start:    "-",
+			})
+			require.NoError(t, rsp.Err())
+			msgs, start := rsp.Val()
+			require.Equal(t, id3, start)
+			require.Len(t, msgs, 2)
+			require.Len(t, msgs[0].Values, 1)
+			require.Equal(t, "1", msgs[0].Values["a"])
+		}
+
+		{
+			rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
+				Stream:   streamName,
+				Group:    groupName,
+				Consumer: consumer2,
+				MinIdle:  10 * time.Millisecond,
+				Start:    id3,
+				Count:    2,
+			})
+			require.NoError(t, rsp.Err())
+			msgs, start := rsp.Val()
+			require.Equal(t, id5, start)
+			require.Len(t, msgs, 2)
+			require.Len(t, msgs[0].Values, 1)
+			require.Equal(t, "3", msgs[0].Values["c"])
+		}
+
+		{
+			rsp := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
+				Stream:   streamName,
+				Group:    groupName,
+				Consumer: consumer2,
+				MinIdle:  10 * time.Millisecond,
+				Start:    id5,
+				Count:    1,
+			})
+			require.NoError(t, rsp.Err())
+			msgs, start := rsp.Val()
+			require.Equal(t, "0-0", start)
+			require.Len(t, msgs, 1)
+			require.Len(t, msgs[0].Values, 1)
+			require.Equal(t, "5", msgs[0].Values["e"])
+		}
+	})
+
+	t.Run("XAUTOCLAIM with XDEL", func(t *testing.T) {
+		streamName := "x"
+		groupName := "grp"
+		require.NoError(t, rdb.Del(ctx, streamName).Err())
+		require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+			Stream: streamName,
+			ID:     "1-0",
+			Values: map[string]interface{}{"f": "v"},
+		}).Err())
+		require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+			Stream: streamName,
+			ID:     "2-0",
+			Values: map[string]interface{}{"f": "v"},
+		}).Err())
+		require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+			Stream: streamName,
+			ID:     "3-0",
+			Values: map[string]interface{}{"f": "v"},
+		}).Err())
+		require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
+		{
+			rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+				Group:    groupName,
+				Consumer: "Alice",
+				Streams:  []string{streamName, ">"},
+			})
+			require.NoError(t, rsp.Err())
+			require.Len(t, rsp.Val(), 1)
+			require.Len(t, rsp.Val()[0].Messages, 3)
+			require.Equal(t, "1-0", rsp.Val()[0].Messages[0].ID)
+			require.Equal(t, "v", rsp.Val()[0].Messages[0].Values["f"])
+			require.Equal(t, "2-0", rsp.Val()[0].Messages[1].ID)
+			require.Equal(t, "v", rsp.Val()[0].Messages[1].Values["f"])
+			require.Equal(t, "3-0", rsp.Val()[0].Messages[2].ID)
+			require.Equal(t, "v", rsp.Val()[0].Messages[2].Values["f"])
+		}
+		{
+			require.NoError(t, rdb.XDel(ctx, streamName, "2-0").Err())
+			cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, "Bob", 0, "0-0")
+			require.NoError(t, cmd.Err())
+			require.Equal(t, []interface{}{
+				"0-0",
+				[]interface{}{
+					[]interface{}{
+						"1-0",
+						[]interface{}{"f", "v"},
+					},
+					[]interface{}{
+						"3-0",
+						[]interface{}{"f", "v"},
+					},
+				},
+				[]interface{}{
+					"2-0",
+				},
+			}, cmd.Val())
+		}
+	})
+
+	t.Run("XAUTOCLAIM with XDEL and count", func(t *testing.T) {
+		streamName := "x"
+		groupName := "grp"
+		require.NoError(t, rdb.Del(ctx, streamName).Err())
+		require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+			Stream: streamName,
+			ID:     "1-0",
+			Values: map[string]interface{}{"f": "v"},
+		}).Err())
+		require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+			Stream: streamName,
+			ID:     "2-0",
+			Values: map[string]interface{}{"f": "v"},
+		}).Err())
+		require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
+			Stream: streamName,
+			ID:     "3-0",
+			Values: map[string]interface{}{"f": "v"},
+		}).Err())
+		require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
+		{
+			rsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
+				Group:    groupName,
+				Consumer: "Alice",
+				Streams:  []string{streamName, ">"},
+			})
+			require.NoError(t, rsp.Err())
+			require.Len(t, rsp.Val(), 1)
+			require.Len(t, rsp.Val()[0].Messages, 3)
+			require.Equal(t, "1-0", rsp.Val()[0].Messages[0].ID)
+			require.Equal(t, "v", rsp.Val()[0].Messages[0].Values["f"])
+			require.Equal(t, "2-0", rsp.Val()[0].Messages[1].ID)
+			require.Equal(t, "v", rsp.Val()[0].Messages[1].Values["f"])
+			require.Equal(t, "3-0", rsp.Val()[0].Messages[2].ID)
+			require.Equal(t, "v", rsp.Val()[0].Messages[2].Values["f"])
+		}
+		{
+			require.NoError(t, rdb.XDel(ctx, streamName, "1-0").Err())
+			require.NoError(t, rdb.XDel(ctx, streamName, "2-0").Err())
+			cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, "Bob", 0, "0-0", "COUNT", 1)
+			require.NoError(t, cmd.Err())
+			require.Equal(t, []interface{}{
+				"2-0",
+				[]interface{}{},
+				[]interface{}{
+					"1-0",
+				},
+			}, cmd.Val())
+		}
+		{
+			cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, "Bob", 0, "2-0", "COUNT", 1)
+			require.NoError(t, cmd.Err())
+			require.Equal(t, []interface{}{
+				"3-0",
+				[]interface{}{},
+				[]interface{}{
+					"2-0",
+				},
+			}, cmd.Val())
+		}
+		{
+			cmd := rdb.Do(ctx, "XAUTOCLAIM", streamName, groupName, "Bob", 0, "3-0", "COUNT", 1)
+			require.NoError(t, cmd.Err())
+			require.Equal(t, []interface{}{
+				"0-0",
+				[]interface{}{
+					[]interface{}{
+						"3-0",
+						[]interface{}{"f", "v"},
+					},
+				},
+				[]interface{}{},
+			}, cmd.Val())
+		}
+		// assert_equal [XPENDING x grp - + 10 Alice] {}
+		// add xpending to this test case when it is supported
+	})
+
+	t.Run("XAUTOCLAIM with out of range count", func(t *testing.T) {
+		err := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
+			Stream:   "x",
+			Group:    "grp",
+			Consumer: "Bob",
+			MinIdle:  0,
+			Start:    "3-0",
+			Count:    8070450532247928833,
+		}).Err()
+		require.Error(t, err)
+		require.True(t, strings.HasPrefix(err.Error(), "ERR COUNT"))
+	})
+
+	t.Run("XAUTOCLAIM COUNT must be > 0", func(t *testing.T) {
+		cmd := rdb.Do(ctx, "XAUTOCLAIM", "key", "group", "consumer", 1, 1, "COUNT", 0)
+		require.Error(t, cmd.Err())
+		require.Equal(t, "ERR COUNT must be > 0", cmd.Err().Error())
+	})
 }
 
 func parseStreamEntryID(id string) (ts int64, seqNum int64) {
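
The magic number in the out-of-range test traces back to the bound computed in `CommandAutoClaim::Parse`: COUNT is capped at `UINT64_MAX / max(sizeof(StreamEntryID), attempts_factors)` so that neither the claimed-entry buffer nor `attempts = attempts_factors * count` can overflow. A worked check, assuming `StreamEntryID` is two `uint64_t` fields (16 bytes):

```cpp
#include <cstdint>
#include <limits>

constexpr uint64_t kEntrySize = 16;       // assumption: sizeof(StreamEntryID)
constexpr uint64_t kAttemptsFactor = 10;  // default attempts_factors
constexpr uint64_t kMaxCount =
    std::numeric_limits<uint64_t>::max() / (kEntrySize > kAttemptsFactor ? kEntrySize : kAttemptsFactor);
// kMaxCount == 1152921504606846975; the test's COUNT of 8070450532247928833 exceeds it,
// so the server answers with the shared "ERR COUNT must be > 0" message.
static_assert(8070450532247928833ULL > kMaxCount);
```
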