
Commit

Merge pull request ByteStorage#174 from ByteStorage/sjc-dev
feat importance: improve the configuration of cluster mode
sjcsjc123 authored Jul 9, 2023
2 parents 4e8579c + f9307ed commit ba4fec7
Showing 9 changed files with 375 additions and 1 deletion.
3 changes: 3 additions & 0 deletions cluster/meta/meta.go
@@ -26,6 +26,9 @@ type MetadataManager interface {
}

// meta stores the metadata of the cluster.
// meta manages all stores and regions in the cluster.
// There are at least three meta nodes in the cluster,
// and they form a raft group that manages the cluster's metadata.
type meta struct {
clusterConfig *config.Config // cluster config, including cluster id, cluster name, etc.
heartbeat map[string]time.Time // stores heartbeat, to check whether a store is alive.
5 changes: 5 additions & 0 deletions cluster/region/region.go
@@ -7,6 +7,11 @@ import (
"sync"
)

// A region is responsible for maintaining its key range and the raftGroups it belongs to;
// it interacts directly with the underlying DB APIs and is the actual unit
// in which data lands on disk.
// raftGroups holds the region's other raft nodes; a region has at least three replicas.
// A region and its replicas form one raft group, and this region is the group's leader.
// region stores the data of a region.
type region struct {
id uint64 // region id
38 changes: 37 additions & 1 deletion cluster/store/store.go
@@ -2,16 +2,21 @@ package store

import (
"github.com/ByteStorage/FlyDB/cluster/region"
"github.com/hashicorp/raft"
"sync"
)

// The store component is responsible for managing the splitting and merging of region partitions.
// All regions under a store share one port number.
// Each region under the store belongs to a raft group, and the regions in a group communicate over gRPC.
// store stores the data of a store.
type store struct {
- id uint64 // store id
+ id string // store id
addr string // store address
regionList map[uint64]*region.Region // region list, to store the regions in the store.
size int64 // size
mu sync.RWMutex // mutex, to protect the store.
raft *raft.Raft // raft, to store the raft group.
}

// Store is the interface of store.
@@ -31,3 +36,34 @@ type Store interface {
// GetSize gets the total size of the store.
GetSize() int64
}
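
Once the raft field above is wired up, writes to a store would presumably be proposed through the raft log rather than applied to a region directly. A hedged usage sketch; the Put method, the JSON command encoding, and the 10s timeout are illustrative assumptions, not part of this diff:

package store

import (
	"encoding/json"
	"time"
)

// Hypothetical write path: serialize a command and let raft replicate it to
// the other replicas; the FSM applies it once the entry is committed.
func (s *store) Put(key, value string) error {
	cmd, err := json.Marshal(map[string]string{
		"op":    "set",
		"key":   key,
		"value": value,
	})
	if err != nil {
		return err
	}
	// Apply blocks until the entry commits or the timeout fires.
	return s.raft.Apply(cmd, 10*time.Second).Error()
}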

// newRaftNode creates a new raft node for the store.
func (s *store) newRaftNode() error {
// Each of the new* helpers below may add further return values (such as an error) as needed.

// set up the Raft configuration
config := s.newDefaultConfig()

// set up the Raft transport for communication between nodes
t := newTransport()

// create the snapshot store. This allows Raft to truncate the log.
snapshots := newSnapshot()

// create the log store and stable store
logStore := newRaftLog()
stableStore := newStableLog()

// create a new finite state machine
f := newFSM()

// instantiate the Raft system
r, err := raft.NewRaft(config, f, logStore, stableStore, snapshots, t)
if err != nil {
return err
}

s.raft = r

return nil
}
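
newTransport() is called above, but its implementation is not among the hunks shown here. A minimal sketch of what it might look like, assuming a TCP transport bound to the store's address; per the comment at the top of newRaftNode, the extra error return value is allowed:

package store

import (
	"net"
	"os"
	"time"

	"github.com/hashicorp/raft"
)

// newTransport binds a TCP transport on the store's address so raft peers
// can exchange AppendEntries and vote RPCs. The connection pool size (3)
// and the 10s I/O timeout are placeholder values, not tuned settings.
func (s *store) newTransport() (raft.Transport, error) {
	addr, err := net.ResolveTCPAddr("tcp", s.addr)
	if err != nil {
		return nil, err
	}
	return raft.NewTCPTransport(s.addr, addr, 3, 10*time.Second, os.Stderr)
}

Note that the first node of a fresh cluster would also need to be bootstrapped exactly once, e.g. via r.BootstrapCluster(raft.Configuration{Servers: []raft.Server{{ID: config.LocalID, Address: t.LocalAddr()}}}).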
117 changes: 117 additions & 0 deletions cluster/store/store_config.go
@@ -0,0 +1,117 @@
package store

import (
"github.com/hashicorp/raft"
)

// newDefaultConfig returns a new default raft config
func (s *store) newDefaultConfig() *raft.Config {
return &raft.Config{
// implement me
}
}

// Read the reference below to pick a suitable config for FlyDB's raft, then fill in newDefaultConfig.
// The method `raft.DefaultConfig()` in the raft package is a good starting point.

// // Config provides any necessary configuration for the Raft server.
//type Config struct {
// // ProtocolVersion allows a Raft server to inter-operate with older
// // Raft servers running an older version of the code. This is used to
// // version the wire protocol as well as Raft-specific log entries that
// // the server uses when _speaking_ to other servers. There is currently
// // no auto-negotiation of versions so all servers must be manually
// // configured with compatible versions. See ProtocolVersionMin and
// // ProtocolVersionMax for the versions of the protocol that this server
// // can _understand_.
// ProtocolVersion ProtocolVersion
//
// // HeartbeatTimeout specifies the time in follower state without contact
// // from a leader before we attempt an election.
// HeartbeatTimeout time.Duration
//
// // ElectionTimeout specifies the time in candidate state without contact
// // from a leader before we attempt an election.
// ElectionTimeout time.Duration
//
// // CommitTimeout specifies the time without an Apply operation before the
// // leader sends an AppendEntry RPC to followers, to ensure a timely commit of
// // log entries.
// // Due to random staggering, may be delayed as much as 2x this value.
// CommitTimeout time.Duration
//
// // MaxAppendEntries controls the maximum number of append entries
// // to send at once. We want to strike a balance between efficiency
// // and avoiding waste if the follower is going to reject because of
// // an inconsistent log.
// MaxAppendEntries int
//
// // BatchApplyCh indicates whether we should buffer applyCh
// // to size MaxAppendEntries. This enables batch log commitment,
// // but breaks the timeout guarantee on Apply. Specifically,
// // a log can be added to the applyCh buffer but not actually be
// // processed until after the specified timeout.
// BatchApplyCh bool
//
// // If we are a member of a cluster, and RemovePeer is invoked for the
// // local node, then we forget all peers and transition into the follower state.
// // If ShutdownOnRemove is set, we additional shutdown Raft. Otherwise,
// // we can become a leader of a cluster containing only this node.
// ShutdownOnRemove bool
//
// // TrailingLogs controls how many logs we leave after a snapshot. This is used
// // so that we can quickly replay logs on a follower instead of being forced to
// // send an entire snapshot. The value passed here is the initial setting used.
// // This can be tuned during operation using ReloadConfig.
// TrailingLogs uint64
//
// // SnapshotInterval controls how often we check if we should perform a
// // snapshot. We randomly stagger between this value and 2x this value to avoid
// // the entire cluster from performing a snapshot at once. The value passed
// // here is the initial setting used. This can be tuned during operation using
// // ReloadConfig.
// SnapshotInterval time.Duration
//
// // SnapshotThreshold controls how many outstanding logs there must be before
// // we perform a snapshot. This is to prevent excessive snapshotting by
// // replaying a small set of logs instead. The value passed here is the initial
// // setting used. This can be tuned during operation using ReloadConfig.
// SnapshotThreshold uint64
//
// // LeaderLeaseTimeout is used to control how long the "lease" lasts
// // for being the leader without being able to contact a quorum
// // of nodes. If we reach this interval without contact, we will
// // step down as leader.
// LeaderLeaseTimeout time.Duration
//
// // LocalID is a unique ID for this server across all time. When running with
// // ProtocolVersion < 3, you must set this to be the same as the network
// // address of your transport.
// LocalID ServerID
//
// // NotifyCh is used to provide a channel that will be notified of leadership
// // changes. Raft will block writing to this channel, so it should either be
// // buffered or aggressively consumed.
// NotifyCh chan<- bool
//
// // LogOutput is used as a sink for logs, unless Logger is specified.
// // Defaults to os.Stderr.
// LogOutput io.Writer
//
// // LogLevel represents a log level. If the value does not match a known
// // logging level hclog.NoLevel is used.
// LogLevel string
//
// // Logger is a user-provided logger. If nil, a logger writing to
// // LogOutput with LogLevel is used.
// Logger hclog.Logger
//
// // NoSnapshotRestoreOnStart controls if raft will restore a snapshot to the
// // FSM on start. This is useful if your FSM recovers from other mechanisms
// // than raft snapshotting. Snapshot metadata will still be used to initialize
// // raft's configuration and index values.
// NoSnapshotRestoreOnStart bool
//
// // skipStartup allows NewRaft() to bypass all background work goroutines
// skipStartup bool
//}
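
Putting that reference to work, a hedged sketch of how newDefaultConfig could be filled in: start from raft.DefaultConfig() and override only what FlyDB cares about. Every value below is a placeholder assumption, not a tuned FlyDB setting:

package store

import (
	"time"

	"github.com/hashicorp/raft"
)

func (s *store) newDefaultConfig() *raft.Config {
	c := raft.DefaultConfig()
	c.LocalID = raft.ServerID(s.id)         // reuse the store id as the raft server id
	c.HeartbeatTimeout = 1 * time.Second    // follower silence before starting an election
	c.ElectionTimeout = 1 * time.Second     // candidate silence before re-electing
	c.CommitTimeout = 50 * time.Millisecond // max quiet time before a heartbeat commit
	c.SnapshotInterval = 2 * time.Minute    // how often to check SnapshotThreshold
	c.SnapshotThreshold = 8192              // outstanding logs before snapshotting
	return c
}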
30 changes: 30 additions & 0 deletions cluster/store/store_fsm.go
@@ -0,0 +1,30 @@
package store

import (
"github.com/hashicorp/raft"
"io"
)

// fsm implements raft.FSM interface
type fsm struct {
//implement me
}

func newFSM() raft.FSM {
return &fsm{}
}

func (f fsm) Apply(log *raft.Log) interface{} {
//TODO implement me
panic("implement me")
}

func (f fsm) Snapshot() (raft.FSMSnapshot, error) {
//TODO implement me
panic("implement me")
}

func (f fsm) Restore(snapshot io.ReadCloser) error {
//TODO implement me
panic("implement me")
}
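
For orientation, a self-contained sketch of what a working raft.FSM could look like over a plain key/value map. The kvFSM name, the JSON command format, and the map state are illustrative assumptions; FlyDB's real FSM would apply committed commands to its storage engine instead:

package store

import (
	"encoding/json"
	"io"
	"sync"

	"github.com/hashicorp/raft"
)

type kvFSM struct {
	mu   sync.Mutex
	data map[string]string
}

// Apply decodes a committed log entry and mutates the state machine.
func (f *kvFSM) Apply(l *raft.Log) interface{} {
	var cmd struct{ Op, Key, Value string }
	if err := json.Unmarshal(l.Data, &cmd); err != nil {
		return err
	}
	f.mu.Lock()
	defer f.mu.Unlock()
	switch cmd.Op {
	case "set":
		f.data[cmd.Key] = cmd.Value
	case "delete":
		delete(f.data, cmd.Key)
	}
	return nil
}

// Snapshot returns a point-in-time copy that raft persists asynchronously.
func (f *kvFSM) Snapshot() (raft.FSMSnapshot, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	clone := make(map[string]string, len(f.data))
	for k, v := range f.data {
		clone[k] = v
	}
	return &kvSnapshot{data: clone}, nil
}

// Restore replaces the state machine from a snapshot stream.
func (f *kvFSM) Restore(rc io.ReadCloser) error {
	defer rc.Close()
	data := make(map[string]string)
	if err := json.NewDecoder(rc).Decode(&data); err != nil {
		return err
	}
	f.mu.Lock()
	f.data = data
	f.mu.Unlock()
	return nil
}

type kvSnapshot struct{ data map[string]string }

// Persist writes the snapshot to the sink raft provides.
func (s *kvSnapshot) Persist(sink raft.SnapshotSink) error {
	if err := json.NewEncoder(sink).Encode(s.data); err != nil {
		sink.Cancel()
		return err
	}
	return sink.Close()
}

func (s *kvSnapshot) Release() {}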
45 changes: 45 additions & 0 deletions cluster/store/store_log.go
@@ -0,0 +1,45 @@
package store

import "github.com/hashicorp/raft"

// raftLog implements the raft.LogStore interface; we use it to persist raft log entries.
// The backing engine is still an open question: FlyDB, RocksDB, LevelDB, or BoltDB could all work,
// with FlyDB itself the likely choice.
type raftLog struct {
// implement me
}

// newRaftLog returns a new raftLog
func newRaftLog() raft.LogStore {
return &raftLog{}
}

func (r *raftLog) FirstIndex() (uint64, error) {
//TODO implement me
panic("implement me")
}

func (r *raftLog) LastIndex() (uint64, error) {
//TODO implement me
panic("implement me")
}

func (r *raftLog) GetLog(index uint64, log *raft.Log) error {
//TODO implement me
panic("implement me")
}

func (r *raftLog) StoreLog(log *raft.Log) error {
//TODO implement me
panic("implement me")
}

func (r *raftLog) StoreLogs(logs []*raft.Log) error {
//TODO implement me
panic("implement me")
}

func (r *raftLog) DeleteRange(min, max uint64) error {
//TODO implement me
panic("implement me")
}
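
The comment above leaves the backing engine open. One off-the-shelf option is hashicorp's raft-boltdb, whose *BoltStore satisfies both raft.LogStore and raft.StableStore; the raftDir path below is a hypothetical data directory, not something this diff defines:

package store

import (
	"path/filepath"

	raftboltdb "github.com/hashicorp/raft-boltdb"
)

// newBoltBackedLog shows the BoltDB route; a FlyDB-backed raftLog would
// replace it once the engine decision is made.
func newBoltBackedLog(raftDir string) (*raftboltdb.BoltStore, error) {
	return raftboltdb.NewBoltStore(filepath.Join(raftDir, "raft.db"))
}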
30 changes: 30 additions & 0 deletions cluster/store/store_snapshot.go
@@ -0,0 +1,30 @@
package store

import (
"github.com/hashicorp/raft"
"io"
)

// snapshot implements raft.SnapshotStore interface
type snapshot struct {
//implement me
}

func newSnapshot() raft.SnapshotStore {
return &snapshot{}
}

func (s snapshot) Create(version raft.SnapshotVersion, index, term uint64, configuration raft.Configuration, configurationIndex uint64, trans raft.Transport) (raft.SnapshotSink, error) {
//TODO implement me
panic("implement me")
}

func (s snapshot) List() ([]*raft.SnapshotMeta, error) {
//TODO implement me
panic("implement me")
}

func (s snapshot) Open(id string) (*raft.SnapshotMeta, io.ReadCloser, error) {
//TODO implement me
panic("implement me")
}
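
Similarly, hashicorp/raft ships a file-based snapshot store that newSnapshot could wrap instead of reimplementing the interface; the raftDir path and the retain count of 2 are placeholder assumptions:

package store

import (
	"os"

	"github.com/hashicorp/raft"
)

// newFileSnapshot keeps the two most recent snapshots under raftDir.
func newFileSnapshot(raftDir string) (raft.SnapshotStore, error) {
	return raft.NewFileSnapshotStore(raftDir, 2, os.Stderr)
}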
35 changes: 35 additions & 0 deletions cluster/store/store_stable.go
@@ -0,0 +1,35 @@
package store

import "github.com/hashicorp/raft"

// stableLog implements the raft.StableStore interface; we use it to persist raft's stable state.
// The backing engine is still an open question: FlyDB, RocksDB, LevelDB, or BoltDB could all work,
// with FlyDB itself the likely choice.
type stableLog struct {
// implement me
}

// newStableLog returns a new stableLog for persisting raft's stable state.
func newStableLog() raft.StableStore {
return &stableLog{}
}

func (s stableLog) Set(key []byte, val []byte) error {
//TODO implement me
panic("implement me")
}

func (s stableLog) Get(key []byte) ([]byte, error) {
//TODO implement me
panic("implement me")
}

func (s stableLog) SetUint64(key []byte, val uint64) error {
//TODO implement me
panic("implement me")
}

func (s stableLog) GetUint64(key []byte) (uint64, error) {
//TODO implement me
panic("implement me")
}
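
Until the durable backend is decided, tests can lean on the in-memory store bundled with hashicorp/raft: one InmemStore value satisfies raft.LogStore and raft.StableStore at once. A hypothetical test helper (not durable, test use only):

package store

import "github.com/hashicorp/raft"

// newInMemStores returns a single InmemStore as both interfaces.
func newInMemStores() (raft.LogStore, raft.StableStore) {
	s := raft.NewInmemStore()
	return s, s
}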
