From 1c64f03cbc3faf6f8a3551925bdc64cb67d8722a Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sun, 9 Jul 2023 15:54:14 +0800 Subject: [PATCH 1/9] feat: update region description --- cluster/region/region.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cluster/region/region.go b/cluster/region/region.go index 5076c238..490c2626 100644 --- a/cluster/region/region.go +++ b/cluster/region/region.go @@ -7,6 +7,11 @@ import ( "sync" ) +// The region is responsible for maintaining the key range and the raftGroups it belongs to, +// directly interacting with the underlying DB APIs, +// and the actual groups where the data falls into the disk. +// raftGroups contains the other raft node of the region, a region has at least three replicas. +// region and replicas are a raft group, and the one region is the leader of the raft group. // region stores the data of a region. type region struct { id uint64 // region id From 6451f69fefbe320f24beba8aea27a1c83c5612d0 Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sun, 9 Jul 2023 15:55:33 +0800 Subject: [PATCH 2/9] feat: add store_config.go to create default config for raft server --- cluster/store/store_config.go | 117 ++++++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 cluster/store/store_config.go diff --git a/cluster/store/store_config.go b/cluster/store/store_config.go new file mode 100644 index 00000000..e69491de --- /dev/null +++ b/cluster/store/store_config.go @@ -0,0 +1,117 @@ +package store + +import ( + "github.com/hashicorp/raft" +) + +// newDefaultConfig returns a new default raft config +func (s *store) newDefaultConfig() *raft.Config { + return &raft.Config{ + // implement me + } +} + +// you should read this to ensure suitable config for FlyDB raft, and fill in the newDefaultConfig +// You can check the method `raft.DefaultConfig()` of raft + +// // Config provides any necessary configuration for the Raft server. +//type Config struct { +// // ProtocolVersion allows a Raft server to inter-operate with older +// // Raft servers running an older version of the code. This is used to +// // version the wire protocol as well as Raft-specific log entries that +// // the server uses when _speaking_ to other servers. There is currently +// // no auto-negotiation of versions so all servers must be manually +// // configured with compatible versions. See ProtocolVersionMin and +// // ProtocolVersionMax for the versions of the protocol that this server +// // can _understand_. +// ProtocolVersion ProtocolVersion +// +// // HeartbeatTimeout specifies the time in follower state without contact +// // from a leader before we attempt an election. +// HeartbeatTimeout time.Duration +// +// // ElectionTimeout specifies the time in candidate state without contact +// // from a leader before we attempt an election. +// ElectionTimeout time.Duration +// +// // CommitTimeout specifies the time without an Apply operation before the +// // leader sends an AppendEntry RPC to followers, to ensure a timely commit of +// // log entries. +// // Due to random staggering, may be delayed as much as 2x this value. +// CommitTimeout time.Duration +// +// // MaxAppendEntries controls the maximum number of append entries +// // to send at once. We want to strike a balance between efficiency +// // and avoiding waste if the follower is going to reject because of +// // an inconsistent log. +// MaxAppendEntries int +// +// // BatchApplyCh indicates whether we should buffer applyCh +// // to size MaxAppendEntries. This enables batch log commitment, +// // but breaks the timeout guarantee on Apply. Specifically, +// // a log can be added to the applyCh buffer but not actually be +// // processed until after the specified timeout. +// BatchApplyCh bool +// +// // If we are a member of a cluster, and RemovePeer is invoked for the +// // local node, then we forget all peers and transition into the follower state. +// // If ShutdownOnRemove is set, we additional shutdown Raft. Otherwise, +// // we can become a leader of a cluster containing only this node. +// ShutdownOnRemove bool +// +// // TrailingLogs controls how many logs we leave after a snapshot. This is used +// // so that we can quickly replay logs on a follower instead of being forced to +// // send an entire snapshot. The value passed here is the initial setting used. +// // This can be tuned during operation using ReloadConfig. +// TrailingLogs uint64 +// +// // SnapshotInterval controls how often we check if we should perform a +// // snapshot. We randomly stagger between this value and 2x this value to avoid +// // the entire cluster from performing a snapshot at once. The value passed +// // here is the initial setting used. This can be tuned during operation using +// // ReloadConfig. +// SnapshotInterval time.Duration +// +// // SnapshotThreshold controls how many outstanding logs there must be before +// // we perform a snapshot. This is to prevent excessive snapshotting by +// // replaying a small set of logs instead. The value passed here is the initial +// // setting used. This can be tuned during operation using ReloadConfig. +// SnapshotThreshold uint64 +// +// // LeaderLeaseTimeout is used to control how long the "lease" lasts +// // for being the leader without being able to contact a quorum +// // of nodes. If we reach this interval without contact, we will +// // step down as leader. +// LeaderLeaseTimeout time.Duration +// +// // LocalID is a unique ID for this server across all time. When running with +// // ProtocolVersion < 3, you must set this to be the same as the network +// // address of your transport. +// LocalID ServerID +// +// // NotifyCh is used to provide a channel that will be notified of leadership +// // changes. Raft will block writing to this channel, so it should either be +// // buffered or aggressively consumed. +// NotifyCh chan<- bool +// +// // LogOutput is used as a sink for logs, unless Logger is specified. +// // Defaults to os.Stderr. +// LogOutput io.Writer +// +// // LogLevel represents a log level. If the value does not match a known +// // logging level hclog.NoLevel is used. +// LogLevel string +// +// // Logger is a user-provided logger. If nil, a logger writing to +// // LogOutput with LogLevel is used. +// Logger hclog.Logger +// +// // NoSnapshotRestoreOnStart controls if raft will restore a snapshot to the +// // FSM on start. This is useful if your FSM recovers from other mechanisms +// // than raft snapshotting. Snapshot metadata will still be used to initialize +// // raft's configuration and index values. +// NoSnapshotRestoreOnStart bool +// +// // skipStartup allows NewRaft() to bypass all background work goroutines +// skipStartup bool +//} From bee8af1647276c52e5da08acf62f7d31dfe2c75e Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sun, 9 Jul 2023 15:57:57 +0800 Subject: [PATCH 3/9] feat: add store_fsm.go to rewrite fsm for raft server --- cluster/store/store_fsm.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 cluster/store/store_fsm.go diff --git a/cluster/store/store_fsm.go b/cluster/store/store_fsm.go new file mode 100644 index 00000000..f6b3c34b --- /dev/null +++ b/cluster/store/store_fsm.go @@ -0,0 +1,30 @@ +package store + +import ( + "github.com/hashicorp/raft" + "io" +) + +// fsm implements raft.FSM interface +type fsm struct { + //implement me +} + +func newFSM() raft.FSM { + return &fsm{} +} + +func (f fsm) Apply(log *raft.Log) interface{} { + //TODO implement me + panic("implement me") +} + +func (f fsm) Snapshot() (raft.FSMSnapshot, error) { + //TODO implement me + panic("implement me") +} + +func (f fsm) Restore(snapshot io.ReadCloser) error { + //TODO implement me + panic("implement me") +} From 119e7817af3bcb6d22cede4438edce594dbcb715 Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sun, 9 Jul 2023 15:58:23 +0800 Subject: [PATCH 4/9] feat: add store_snapshot.go.go to rewrite snapshot for raft server --- cluster/store/store_snapshot.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 cluster/store/store_snapshot.go diff --git a/cluster/store/store_snapshot.go b/cluster/store/store_snapshot.go new file mode 100644 index 00000000..71126772 --- /dev/null +++ b/cluster/store/store_snapshot.go @@ -0,0 +1,30 @@ +package store + +import ( + "github.com/hashicorp/raft" + "io" +) + +// snapshot implements raft.SnapshotStore interface +type snapshot struct { + //implement me +} + +func newSnapshot() raft.SnapshotStore { + return &snapshot{} +} + +func (s snapshot) Create(version raft.SnapshotVersion, index, term uint64, configuration raft.Configuration, configurationIndex uint64, trans raft.Transport) (raft.SnapshotSink, error) { + //TODO implement me + panic("implement me") +} + +func (s snapshot) List() ([]*raft.SnapshotMeta, error) { + //TODO implement me + panic("implement me") +} + +func (s snapshot) Open(id string) (*raft.SnapshotMeta, io.ReadCloser, error) { + //TODO implement me + panic("implement me") +} From 62bd063757e8de5bbb9e580ab22baf339ebee51e Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sun, 9 Jul 2023 15:58:49 +0800 Subject: [PATCH 5/9] feat: add store_stable.go.go.go to rewrite StableStore for raft server --- cluster/store/store_stable.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 cluster/store/store_stable.go diff --git a/cluster/store/store_stable.go b/cluster/store/store_stable.go new file mode 100644 index 00000000..c645fafd --- /dev/null +++ b/cluster/store/store_stable.go @@ -0,0 +1,35 @@ +package store + +import "github.com/hashicorp/raft" + +// stableLog implements raft.StableStore interface, we can use it to store raft stable logs +// how to store raft stable logs? we can use FlyDB/RocksDB/LevelDB/BoltDB to store raft stable logs +// maybe we can use FlyDB +type stableLog struct { + // implement me +} + +// newStableLog returns a new stableLog, we can use it to store raft stable logs +func newStableLog() raft.StableStore { + return &stableLog{} +} + +func (s stableLog) Set(key []byte, val []byte) error { + //TODO implement me + panic("implement me") +} + +func (s stableLog) Get(key []byte) ([]byte, error) { + //TODO implement me + panic("implement me") +} + +func (s stableLog) SetUint64(key []byte, val uint64) error { + //TODO implement me + panic("implement me") +} + +func (s stableLog) GetUint64(key []byte) (uint64, error) { + //TODO implement me + panic("implement me") +} From 7881cb8a18478b0a13e4933fdad2c732af301a20 Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sun, 9 Jul 2023 15:59:17 +0800 Subject: [PATCH 6/9] feat: add store_transport.go to rewrite Transport for raft server --- cluster/store/store_transport.go | 73 ++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 cluster/store/store_transport.go diff --git a/cluster/store/store_transport.go b/cluster/store/store_transport.go new file mode 100644 index 00000000..7f61015b --- /dev/null +++ b/cluster/store/store_transport.go @@ -0,0 +1,73 @@ +package store + +import ( + "github.com/hashicorp/raft" + "google.golang.org/grpc" + "io" +) + +// transport implements raft.Transport interface +// we can use it to send rpc to other raft nodes +// and receive rpc from other raft nodes +type transport struct { + //implement me + localAddr raft.ServerAddress + consumer chan raft.RPC + clients map[raft.ServerAddress]*grpc.ClientConn + server *grpc.Server +} + +// NewTransport returns a new transport, it needs start a grpc server +func newTransport() raft.Transport { + return &transport{} +} + +func (t *transport) AppendEntriesPipeline(id raft.ServerID, target raft.ServerAddress) (raft.AppendPipeline, error) { + //TODO implement me + panic("implement me") +} + +func (t *transport) AppendEntries(id raft.ServerID, target raft.ServerAddress, args *raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) error { + //TODO implement me + panic("implement me") +} + +func (t *transport) RequestVote(id raft.ServerID, target raft.ServerAddress, args *raft.RequestVoteRequest, resp *raft.RequestVoteResponse) error { + //TODO implement me + panic("implement me") +} + +func (t *transport) InstallSnapshot(id raft.ServerID, target raft.ServerAddress, args *raft.InstallSnapshotRequest, resp *raft.InstallSnapshotResponse, data io.Reader) error { + //TODO implement me + panic("implement me") +} + +func (t *transport) TimeoutNow(id raft.ServerID, target raft.ServerAddress, args *raft.TimeoutNowRequest, resp *raft.TimeoutNowResponse) error { + //TODO implement me + panic("implement me") +} + +func (t *transport) Consumer() <-chan raft.RPC { + //implement me + panic("implement me") +} + +func (t *transport) LocalAddr() raft.ServerAddress { + //implement me + panic("implement me") +} + +func (t *transport) EncodePeer(id raft.ServerID, addr raft.ServerAddress) []byte { + //implement me + panic("implement me") +} + +func (t *transport) DecodePeer([]byte) raft.ServerAddress { + //implement me + panic("implement me") +} + +func (t *transport) SetHeartbeatHandler(handler func(rpc raft.RPC)) { + //implement me + panic("implement me") +} From e7cb9705a8847d12900266716d76175fd4dcc10e Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sun, 9 Jul 2023 15:59:52 +0800 Subject: [PATCH 7/9] feat: add store_log.go to rewrite LogStore for raft server --- cluster/store/store_log.go | 45 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 cluster/store/store_log.go diff --git a/cluster/store/store_log.go b/cluster/store/store_log.go new file mode 100644 index 00000000..0d050fe6 --- /dev/null +++ b/cluster/store/store_log.go @@ -0,0 +1,45 @@ +package store + +import "github.com/hashicorp/raft" + +// raftLog implements raft.LogStore interface, we can use it to store raft logs +// how to store raft logs? we can use FlyDB/RocksDB/LevelDB/BoltDB to store raft logs +// maybe we can use FlyDB +type raftLog struct { + // implement me +} + +// newRaftLog returns a new raftLog +func newRaftLog() raft.LogStore { + return &raftLog{} +} + +func (r *raftLog) FirstIndex() (uint64, error) { + //TODO implement me + panic("implement me") +} + +func (r *raftLog) LastIndex() (uint64, error) { + //TODO implement me + panic("implement me") +} + +func (r *raftLog) GetLog(index uint64, log *raft.Log) error { + //TODO implement me + panic("implement me") +} + +func (r *raftLog) StoreLog(log *raft.Log) error { + //TODO implement me + panic("implement me") +} + +func (r *raftLog) StoreLogs(logs []*raft.Log) error { + //TODO implement me + panic("implement me") +} + +func (r *raftLog) DeleteRange(min, max uint64) error { + //TODO implement me + panic("implement me") +} From e882bb941235b4bf2fc884655bef7b133e85e9e3 Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sun, 9 Jul 2023 16:00:12 +0800 Subject: [PATCH 8/9] feat: update meta comment --- cluster/meta/meta.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cluster/meta/meta.go b/cluster/meta/meta.go index 34a8e702..0502caea 100644 --- a/cluster/meta/meta.go +++ b/cluster/meta/meta.go @@ -26,6 +26,9 @@ type MetadataManager interface { } // meta stores the metadata of the cluster. +// meta will manage all stores and regions in the cluster. +// meta has at least three nodes in the cluster. +// meta nodes will create a raft group to manage the metadata of the cluster. type meta struct { clusterConfig *config.Config // cluster config, including cluster id, cluster name, etc. heartbeat map[string]time.Time // stores heartbeat, to check whether a store is alive. From f9307ede21cbe186ba9ffc81c9fa548e44fd4b61 Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sun, 9 Jul 2023 16:00:41 +0800 Subject: [PATCH 9/9] feat: add some detail for store.go --- cluster/store/store.go | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/cluster/store/store.go b/cluster/store/store.go index 35fac32f..a0ae60a3 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -2,16 +2,21 @@ package store import ( "github.com/ByteStorage/FlyDB/cluster/region" + "github.com/hashicorp/raft" "sync" ) +// The store component is responsible for managing the division and merging of region partitions. +// All regions under the store share a port number. +// Each region under the store is in a raftGroups, and the region clusters in the raftGroups communicate through grpc // store stores the data of a store. type store struct { - id uint64 // store id + id string // store id addr string // store address regionList map[uint64]*region.Region // region list, to store the regions in the store. size int64 // size mu sync.RWMutex // mutex, to protect the store. + raft *raft.Raft // raft, to store the raft group. } // Store is the interface of store. @@ -31,3 +36,34 @@ type Store interface { // GetSize gets the total size of the store. GetSize() int64 } + +// newRaftNode creates a new raft node for the store. +func (s *store) newRaftNode() error { + // All new methods below can add other return values as needed, such as err + + // setup Raft configuration + config := s.newDefaultConfig() + + // setup Raft communication + t := newTransport() + + // create the snapshot store. This allows the Raft to truncate the log. + snapshots := newSnapshot() + + // create the log store and stable store + logStore := newRaftLog() + stableStore := newStableLog() + + // create a new finite state machine + f := newFSM() + + // instantiate the Raft system + r, err := raft.NewRaft(config, f, logStore, stableStore, snapshots, t) + if err != nil { + return err + } + + s.raft = r + + return nil +}