Skip to content

Commit

Permalink
Merge pull request #252 from ByteStorage/develop
Browse files Browse the repository at this point in the history
refactor component for store and region
  • Loading branch information
sjcsjc123 authored Jul 29, 2023
2 parents 4eeb2d8 + e481aed commit e46dcef
Show file tree
Hide file tree
Showing 16 changed files with 321 additions and 87 deletions.
82 changes: 82 additions & 0 deletions cluster/meta/meta.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package meta

import (
"fmt"
"github.com/ByteStorage/FlyDB/cluster/region"
"github.com/ByteStorage/FlyDB/cluster/store"
"github.com/ByteStorage/FlyDB/config"
Expand All @@ -24,6 +25,8 @@ type MetadataManager interface {
GetRegionByID(id uint64) (*region.Region, error)
// GetStoreByID gets a store by id.
GetStoreByID(id uint64) (*store.Store, error)
// ApplyConfig applies a new config to the cluster.
ApplyConfig(config *config.Config) error
}

// meta stores the metadata of the cluster.
Expand All @@ -40,3 +43,82 @@ type meta struct {
scheduler *Scheduler // scheduler, to schedule the cluster.
raft *raft.Raft // raft, to store the raft group.
}

// GetStore gets a store by address.
func (m *meta) GetStore(addr string) (*store.Store, error) {
//TODO implement me
panic("implement me")
}

// AddStore adds a new store to the cluster.
func (m *meta) AddStore(addr string) error {
//TODO implement me
panic("implement me")
}

// RemoveStore removes a store from the cluster.
func (m *meta) RemoveStore(addr string) error {
//TODO implement me
panic("implement me")
}

// GetAllStores gets all stores in the cluster.
func (m *meta) GetAllStores() []*store.Store {
//TODO implement me
panic("implement me")
}

// GetRegionByID gets a region by id.
func (m *meta) GetRegionByID(id uint64) (*region.Region, error) {
//TODO implement me
panic("implement me")
}

// GetStoreByID gets a store by id.
func (m *meta) GetStoreByID(id uint64) (*store.Store, error) {
//TODO implement me
panic("implement me")
}

// ApplyConfig applies a new config to the cluster.
func (m *meta) ApplyConfig(config *config.Config) error {
m.mu.Lock()
defer m.mu.Unlock()
if m != nil {
err := m.stop()
if err != nil {
// if err = ErrNotStarted, it means the meta node has not started yet.
}
err = m.start()
if err != nil {
return err
}
}
return nil
}

func (m *meta) start() error {
for _, metaNode := range m.clusterConfig.MetaNodes {
// ssh to the meta node
// start the meta node
fmt.Println(metaNode)
panic("implement me")
}
return nil
}

func (m *meta) stop() error {
//TODO implement me
panic("implement me")
}

// NewMeta creates a new meta.
func NewMeta(conf config.Config) MetadataManager {
return &meta{
clusterConfig: &conf,
heartbeat: make(map[string]time.Time),
dirTree: dirtree.NewDirTree(),
stores: make(map[string]*store.Store),
regions: make(map[uint64]*region.Region),
}
}
70 changes: 69 additions & 1 deletion cluster/region/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package region

import (
"errors"
"github.com/ByteStorage/FlyDB/config"
"github.com/ByteStorage/FlyDB/engine"
"github.com/hashicorp/raft"
"sync"
Expand All @@ -14,7 +15,7 @@ import (
// region and replicas are a raft group, and the one region is the leader of the raft group.
// region stores the data of a region.
type region struct {
id uint64 // region id
id int64 // region id
startKey []byte // start key
endKey []byte // end key
db *engine.DB // db, to store the data.
Expand All @@ -24,6 +25,7 @@ type region struct {
peers []string // peers
size int64 // size
mu sync.RWMutex // mutex, to protect the region.
conf config.Config // config
}

// Region is the interface of region.
Expand All @@ -50,6 +52,66 @@ type Region interface {
RemovePeer(peer string) error
// GetSize gets the total size of the region.
GetSize() int64
// GetID gets the id of the region.
GetID() int64
}

func NewRegion(conf config.RegionConfig) (Region, error) {
db, err := engine.NewDB(conf.Options)
if err != nil {
return nil, errors.New("new db failed")
}
raftNode, err := newRaftNode(conf.Config)
if err != nil {
return nil, errors.New("new raft node failed")
}
return &region{
startKey: conf.Start,
endKey: conf.End,
raftGroups: make(map[uint64]*raft.Raft),
db: db,
mu: sync.RWMutex{},
conf: conf.Config,
raft: raftNode,
}, nil
}

// newRaftNode creates a new raft node for the store.
func newRaftNode(conf config.Config) (*raft.Raft, error) {
// All new methods below can add other return values as needed, such as err

// create default config for raft
raftConfig := newDefaultConfig()

// setup Raft communication
t := newTransport()

// create the snapshot store. This allows the Raft to truncate the log.
snapshots, err := newSnapshotStore(conf)
if err != nil {
return nil, err
}

// create the log store and stable store
logStore, err := newRaftLog(conf)
if err != nil {
return nil, err
}
stableStore, err := newStableLog(conf)
if err != nil {
return nil, err
}

// create a new finite state machine
f := newFSM()

// instantiate the Raft system
r, err := raft.NewRaft(raftConfig, f, logStore, stableStore, snapshots, t)
if err != nil {
return nil, err
}

return r, nil
}

func (r *region) Put(key []byte, value []byte) error {
Expand Down Expand Up @@ -145,3 +207,9 @@ func (r *region) GetSize() int64 {
defer r.mu.RUnlock()
return r.size
}

func (r *region) GetID() int64 {
r.mu.RLock()
defer r.mu.RUnlock()
return r.id
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package store
package region

import (
"github.com/hashicorp/raft"
Expand All @@ -7,7 +7,7 @@ import (
)

// newDefaultConfig returns a new default raft config
func (s *store) newDefaultConfig() *raft.Config {
func newDefaultConfig() *raft.Config {
return &raft.Config{
ProtocolVersion: raft.ProtocolVersionMax, // using latest protocol version
HeartbeatTimeout: 1000 * time.Millisecond,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package store
package region

import (
"github.com/hashicorp/raft"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package store
package region

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package store
package region

import (
"github.com/ByteStorage/FlyDB/config"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package store
package region

import (
"errors"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package store
package region

import (
"github.com/ByteStorage/FlyDB/config"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package store
package region

import (
"github.com/ByteStorage/FlyDB/config"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package store
package region

import (
"github.com/hashicorp/raft"
Expand Down
Loading

0 comments on commit e46dcef

Please sign in to comment.