Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add new hash expire cmd to pika #2883

Open
wants to merge 2 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/acl.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ enum class AclCategory {
CONNECTION = (1ULL << 18),
TRANSACTION = (1ULL << 19),
SCRIPTING = (1ULL << 20),
PKHASH = (1ULL << 21),
};

enum class AclUserFlag {
Expand Down
35 changes: 30 additions & 5 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,31 @@ const std::string kCmdNameHScanx = "hscanx";
const std::string kCmdNamePKHScanRange = "pkhscanrange";
const std::string kCmdNamePKHRScanRange = "pkhrscanrange";

// PKHash
const std::string kCmdNamePKHSet = "pkhset";
const std::string kCmdNamePKHSetex = "pkhsetex";
const std::string kCmdNamePKHExpire = "pkhexpire";
const std::string kCmdNamePKHExpireat = "pkhexpireat";
const std::string kCmdNamePKHExpiretime = "pkhexpiretime";
const std::string kCmdNamePKHTTL = "pkhttl";
const std::string kCmdNamePKHPersist = "pkhpersist";
const std::string kCmdNamePKHGet = "pkhget";
const std::string kCmdNamePKHExists = "pkhexists";
const std::string kCmdNamePKHDel = "pkhdel";
const std::string kCmdNamePKHLen = "pkhlen";
const std::string kCmdNamePKHStrlen = "pkhstrlen";
const std::string kCmdNamePKHIncrby = "pkhincrby";
const std::string kCmdNamePKHIncrbyfloat = "pkhincrbyfloat";
const std::string kCmdNamePKHMSet = "pkhmset";
const std::string kCmdNamePKHMSetex = "pkhmsetex";
const std::string kCmdNamePKHMGet = "pkhmget";
const std::string kCmdNamePKHKeys = "pkhkeys";
const std::string kCmdNamePKHVals = "pkhvals";
const std::string kCmdNamePKHGetall = "pkhgetall";
const std::string kCmdNamePKHScan = "pkhscan";
const std::string kCmdNamePKEHScanRange = "pkehscanrange";
const std::string kCmdNamePKEHRScanRange = "pkehrscanrange";

// List
const std::string kCmdNameLIndex = "lindex";
const std::string kCmdNameLInsert = "linsert";
Expand Down Expand Up @@ -247,7 +272,6 @@ const std::string kCmdNameXInfo = "xinfo";

const std::string kClusterPrefix = "pkcluster";


/*
* If a type holds a key, a new data structure
* that uses the key will use this error
Expand Down Expand Up @@ -290,7 +314,8 @@ enum CmdFlags {
kCmdFlagsOperateKey = (1 << 19), // redis keySpace
kCmdFlagsStream = (1 << 20),
kCmdFlagsFast = (1 << 21),
kCmdFlagsSlow = (1 << 22)
kCmdFlagsSlow = (1 << 22),
kCmdFlagsPKHash = (1 << 23), // TODO(DDD)
};

void inline RedisAppendContent(std::string& str, const std::string& value);
Expand Down Expand Up @@ -480,7 +505,7 @@ class CmdRes {
struct UnblockTaskArgs {
std::string key;
std::shared_ptr<DB> db;
net::DispatchThread* dispatchThread{ nullptr };
net::DispatchThread* dispatchThread{nullptr};
UnblockTaskArgs(std::string key_, std::shared_ptr<DB> db_, net::DispatchThread* dispatchThread_)
: key(std::move(key_)), db(db_), dispatchThread(dispatchThread_) {}
};
Expand Down Expand Up @@ -568,7 +593,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
std::shared_ptr<std::string> GetResp();

void SetStage(CmdStage stage);
void SetCmdId(uint32_t cmdId){cmdId_ = cmdId;}
void SetCmdId(uint32_t cmdId) { cmdId_ = cmdId; }

virtual void DoBinlog();

Expand Down Expand Up @@ -610,7 +635,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {

private:
virtual void DoInitial() = 0;
virtual void Clear(){};
virtual void Clear() {};

Cmd& operator=(const Cmd&);
};
Expand Down
189 changes: 189 additions & 0 deletions include/pika_pkhash.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_PKHASH_H_
#define PIKA_PKHASH_H_

#include "include/acl.h"
#include "include/pika_command.h"
#include "include/pika_db.h"
#include "storage/storage.h"

class PKHExpireCmd : public Cmd {
public:
PKHExpireCmd(const std::string& name, int arity, uint32_t flag)
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::PKHASH)) {}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(key_);
return res;
}
void Do() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new PKHExpireCmd(*this); }

private:
std::string key_;
int64_t ttl_ = 0;
int64_t numfields_ = 0;
std::vector<std::string> fields_;

rocksdb::Status s_;

void DoInitial() override;
void Clear() override {}
};

class PKHExpireatCmd : public Cmd {
public:
PKHExpireatCmd(const std::string& name, int arity, uint32_t flag)
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::PKHASH)) {}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(key_);
return res;
}
void Do() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new PKHExpireatCmd(*this); }

private:
std::string key_;
int64_t timestamp_ = 0;
int64_t numfields_ = 0;
std::vector<std::string> fields_;

rocksdb::Status s_;

void DoInitial() override;
void Clear() override {}
};
class PKHExpiretimeCmd : public Cmd {
public:
PKHExpiretimeCmd(const std::string& name, int arity, uint32_t flag)
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::PKHASH)) {}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(key_);
return res;
}
void Do() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new PKHExpiretimeCmd(*this); }

private:
std::string key_;
int64_t ttl_ = 0;
int64_t numfields_ = 0;
std::vector<std::string> fields_;

rocksdb::Status s_;

void DoInitial() override;
void Clear() override {}
};

class PKHPersistCmd : public Cmd {
public:
PKHPersistCmd(const std::string& name, int arity, uint32_t flag)
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::PKHASH)) {}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(key_);
return res;
}
void Do() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new PKHPersistCmd(*this); }

private:
std::string key_;
int64_t ttl_ = 0;
int64_t numfields_ = 0;
std::vector<std::string> fields_;

rocksdb::Status s_;

void DoInitial() override;
void Clear() override {}
};

class PKHTTLCmd : public Cmd {
public:
PKHTTLCmd(const std::string& name, int arity, uint32_t flag)
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::PKHASH)) {}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(key_);
return res;
}
void Do() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new PKHTTLCmd(*this); }

private:
std::string key_;
int64_t ttl_ = 0;
int64_t numfields_ = 0;
std::vector<std::string> fields_;

rocksdb::Status s_;

void DoInitial() override;
void Clear() override {}
};

class PKHGetCmd : public Cmd {
public:
PKHGetCmd(const std::string& name, int arity, uint32_t flag)
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::PKHASH)) {}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(key_);
return res;
}
void Do() override;
void ReadCache() override;
void DoThroughDB() override;
void DoUpdateCache() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new PKHGetCmd(*this); }

private:
std::string key_, field_;
void DoInitial() override;
rocksdb::Status s_;
};

class PKHSetCmd : public Cmd {
public:
PKHSetCmd(const std::string& name, int arity, uint32_t flag)
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::PKHASH)) {}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(key_);
return res;
}
void Do() override;
void DoThroughDB() override;
void DoUpdateCache() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new PKHSetCmd(*this); }

private:
// 每个命令的参数组成不同。
std::string key_, field_, value_;
void DoInitial() override;
rocksdb::Status s_;
};

#endif
1 change: 1 addition & 0 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ extern PikaServer* g_pika_server;
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;
extern std::unique_ptr<PikaCmdTableManager> g_pika_cmd_table_manager;


PikaClientConn::PikaClientConn(int fd, const std::string& ip_port, net::Thread* thread, net::NetMultiplexer* mpx,
const net::HandleType& handle_type, int max_conn_rbuf_size)
: RedisConn(fd, ip_port, thread, mpx, handle_type, max_conn_rbuf_size),
Expand Down
Loading
Loading