From 80b2e948907b982b5df8008af98afb171509bdaf Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sat, 29 Jul 2023 00:19:49 +0800 Subject: [PATCH 01/10] modify store.go detail --- cluster/region/region.go | 17 ++++++++++++ cluster/store/store.go | 56 ++++++++++++++++++++++++++++------------ 2 files changed, 57 insertions(+), 16 deletions(-) diff --git a/cluster/region/region.go b/cluster/region/region.go index 490c2626..ed0e2428 100644 --- a/cluster/region/region.go +++ b/cluster/region/region.go @@ -2,6 +2,7 @@ package region import ( "errors" + "github.com/ByteStorage/FlyDB/config" "github.com/ByteStorage/FlyDB/engine" "github.com/hashicorp/raft" "sync" @@ -24,6 +25,7 @@ type region struct { peers []string // peers size int64 // size mu sync.RWMutex // mutex, to protect the region. + conf config.Config // config } // Region is the interface of region. @@ -52,6 +54,21 @@ type Region interface { GetSize() int64 } +func NewRegion(start []byte, end []byte, options config.Options, conf config.Config) (Region, error) { + db, err := engine.NewDB(options) + if err != nil { + return nil, errors.New("new db failed") + } + return ®ion{ + startKey: start, + endKey: end, + raftGroups: make(map[uint64]*raft.Raft), + db: db, + mu: sync.RWMutex{}, + conf: conf, + }, nil +} + func (r *region) Put(key []byte, value []byte) error { r.mu.RLock() defer r.mu.RUnlock() diff --git a/cluster/store/store.go b/cluster/store/store.go index 3d124c98..b6eb760d 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -1,6 +1,7 @@ package store import ( + "bytes" "errors" "github.com/ByteStorage/FlyDB/cluster/region" "github.com/ByteStorage/FlyDB/config" @@ -16,36 +17,41 @@ type store struct { id string // store id conf config.Config opts config.Options - addr string // store address - regionList map[uint64]*region.Region // region list, to store the regions in the store. - size int64 // size - mu sync.RWMutex // mutex, to protect the store. - raft *raft.Raft // raft, to store the raft group. + addr string // store address + regionList map[uint64]region.Region // region list, to store the regions in the store. + size int64 // size + mu sync.RWMutex // mutex, to protect the store. + raft *raft.Raft // raft, to store the raft group. } // Store is the interface of store. type Store interface { // GetRegionByKey gets region and leader peer by region key from cluster. - GetRegionByKey(key []byte) (*region.Region, error) + GetRegionByKey(key []byte) (region.Region, error) // GetRegionByID gets region and leader peer by region id from cluster. - GetRegionByID(id uint64) (*region.Region, error) + GetRegionByID(id uint64) (region.Region, error) // AddRegion adds a new region to cluster. - AddRegion(region *region.Region) error + AddRegion(region region.Region) error // RemoveRegion removes a region from cluster. RemoveRegion(id uint64) error // Split splits the region into two regions. - Split(region *region.Region, splitKey []byte) error + Split(region region.Region, splitKey []byte) error // Merge merges two adjacent regions into one region. - Merge(regionA *region.Region, regionB *region.Region) error + Merge(regionA region.Region, regionB region.Region) error // GetSize gets the total size of the store. GetSize() int64 } -func (s *store) GetRegionByKey(key []byte) (*region.Region, error) { - panic("implement me") +func (s *store) GetRegionByKey(key []byte) (region.Region, error) { + for _, r := range s.regionList { + if isKeyInRange(key, r.GetStartKey(), r.GetEndKey()) { + return r, nil + } + } + return nil, errors.New("the specified region does not exist") } -func (s *store) GetRegionByID(id uint64) (*region.Region, error) { +func (s *store) GetRegionByID(id uint64) (region.Region, error) { s.mu.RLock() defer s.mu.RUnlock() if _, ok := s.regionList[id]; !ok { @@ -54,7 +60,7 @@ func (s *store) GetRegionByID(id uint64) (*region.Region, error) { return s.regionList[id], nil } -func (s *store) AddRegion(region *region.Region) error { +func (s *store) AddRegion(region region.Region) error { panic("implement me") } @@ -62,11 +68,11 @@ func (s *store) RemoveRegion(id uint64) error { panic("implement me") } -func (s *store) Split(region *region.Region, splitKey []byte) error { +func (s *store) Split(region region.Region, splitKey []byte) error { panic("implement me") } -func (s *store) Merge(regionA *region.Region, regionB *region.Region) error { +func (s *store) Merge(regionA region.Region, regionB region.Region) error { panic("implement me") } @@ -113,3 +119,21 @@ func (s *store) newRaftNode() error { return nil } + +// isKeyInRange checks if the key is in the range of the region. +func isKeyInRange(key, startRange, endRange []byte) bool { + // Compare the key to the start of the range + // If key < startRange, it's not in range + if bytes.Compare(key, startRange) < 0 { + return false + } + + // Compare the key to the end of the range + // If key >= endRange, it's not in range + if bytes.Compare(key, endRange) >= 0 { + return false + } + + // If neither of the above, the key is in range + return true +} From 36177f4b091683b456451e809841c528e6bb1f16 Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sat, 29 Jul 2023 00:32:45 +0800 Subject: [PATCH 02/10] refactor component for store and region --- cluster/region/region.go | 38 ++++++++++++++++++ .../region_config.go} | 4 +- .../store_fsm.go => region/region_fsm.go} | 2 +- .../store_log.go => region/region_log.go} | 2 +- .../region_log_test.go} | 2 +- .../region_snapshot.go} | 2 +- .../region_snapshot_test.go} | 2 +- .../region_stable.go} | 2 +- .../region_transport.go} | 2 +- cluster/store/store.go | 40 ------------------- 10 files changed, 47 insertions(+), 49 deletions(-) rename cluster/{store/store_config.go => region/region_config.go} (93%) rename cluster/{store/store_fsm.go => region/region_fsm.go} (96%) rename cluster/{store/store_log.go => region/region_log.go} (99%) rename cluster/{store/store_log_test.go => region/region_log_test.go} (99%) rename cluster/{store/store_snapshot.go => region/region_snapshot.go} (98%) rename cluster/{store/store_snapshot_test.go => region/region_snapshot_test.go} (98%) rename cluster/{store/store_stable.go => region/region_stable.go} (94%) rename cluster/{store/store_transport.go => region/region_transport.go} (99%) diff --git a/cluster/region/region.go b/cluster/region/region.go index ed0e2428..b153b647 100644 --- a/cluster/region/region.go +++ b/cluster/region/region.go @@ -69,6 +69,44 @@ func NewRegion(start []byte, end []byte, options config.Options, conf config.Con }, nil } +// newRaftNode creates a new raft node for the store. +func newRaftNode(conf config.Config) (*raft.Raft, error) { + // All new methods below can add other return values as needed, such as err + + // create default config for raft + raftConfig := newDefaultConfig() + + // setup Raft communication + t := newTransport() + + // create the snapshot store. This allows the Raft to truncate the log. + snapshots, err := newSnapshotStore(conf) + if err != nil { + return nil, err + } + + // create the log store and stable store + logStore, err := newRaftLog(conf) + if err != nil { + return nil, err + } + stableStore, err := newStableLog(conf) + if err != nil { + return nil, err + } + + // create a new finite state machine + f := newFSM() + + // instantiate the Raft system + r, err := raft.NewRaft(raftConfig, f, logStore, stableStore, snapshots, t) + if err != nil { + return nil, err + } + + return r, nil +} + func (r *region) Put(key []byte, value []byte) error { r.mu.RLock() defer r.mu.RUnlock() diff --git a/cluster/store/store_config.go b/cluster/region/region_config.go similarity index 93% rename from cluster/store/store_config.go rename to cluster/region/region_config.go index 6a0acfb9..1f0dbbbd 100644 --- a/cluster/store/store_config.go +++ b/cluster/region/region_config.go @@ -1,4 +1,4 @@ -package store +package region import ( "github.com/hashicorp/raft" @@ -7,7 +7,7 @@ import ( ) // newDefaultConfig returns a new default raft config -func (s *store) newDefaultConfig() *raft.Config { +func newDefaultConfig() *raft.Config { return &raft.Config{ ProtocolVersion: raft.ProtocolVersionMax, // using latest protocol version HeartbeatTimeout: 1000 * time.Millisecond, diff --git a/cluster/store/store_fsm.go b/cluster/region/region_fsm.go similarity index 96% rename from cluster/store/store_fsm.go rename to cluster/region/region_fsm.go index f6b3c34b..fbfbd3de 100644 --- a/cluster/store/store_fsm.go +++ b/cluster/region/region_fsm.go @@ -1,4 +1,4 @@ -package store +package region import ( "github.com/hashicorp/raft" diff --git a/cluster/store/store_log.go b/cluster/region/region_log.go similarity index 99% rename from cluster/store/store_log.go rename to cluster/region/region_log.go index 16bc5739..3d012622 100644 --- a/cluster/store/store_log.go +++ b/cluster/region/region_log.go @@ -1,4 +1,4 @@ -package store +package region import ( "fmt" diff --git a/cluster/store/store_log_test.go b/cluster/region/region_log_test.go similarity index 99% rename from cluster/store/store_log_test.go rename to cluster/region/region_log_test.go index 85df0a6d..a46b8818 100644 --- a/cluster/store/store_log_test.go +++ b/cluster/region/region_log_test.go @@ -1,4 +1,4 @@ -package store +package region import ( "github.com/ByteStorage/FlyDB/config" diff --git a/cluster/store/store_snapshot.go b/cluster/region/region_snapshot.go similarity index 98% rename from cluster/store/store_snapshot.go rename to cluster/region/region_snapshot.go index cdcc6726..f4a96551 100644 --- a/cluster/store/store_snapshot.go +++ b/cluster/region/region_snapshot.go @@ -1,4 +1,4 @@ -package store +package region import ( "errors" diff --git a/cluster/store/store_snapshot_test.go b/cluster/region/region_snapshot_test.go similarity index 98% rename from cluster/store/store_snapshot_test.go rename to cluster/region/region_snapshot_test.go index 8d963a02..1e1d3749 100644 --- a/cluster/store/store_snapshot_test.go +++ b/cluster/region/region_snapshot_test.go @@ -1,4 +1,4 @@ -package store +package region import ( "github.com/ByteStorage/FlyDB/config" diff --git a/cluster/store/store_stable.go b/cluster/region/region_stable.go similarity index 94% rename from cluster/store/store_stable.go rename to cluster/region/region_stable.go index bef728e0..7f7bad52 100644 --- a/cluster/store/store_stable.go +++ b/cluster/region/region_stable.go @@ -1,4 +1,4 @@ -package store +package region import ( "github.com/ByteStorage/FlyDB/config" diff --git a/cluster/store/store_transport.go b/cluster/region/region_transport.go similarity index 99% rename from cluster/store/store_transport.go rename to cluster/region/region_transport.go index 7f61015b..ff45be77 100644 --- a/cluster/store/store_transport.go +++ b/cluster/region/region_transport.go @@ -1,4 +1,4 @@ -package store +package region import ( "github.com/hashicorp/raft" diff --git a/cluster/store/store.go b/cluster/store/store.go index b6eb760d..d0f8fb68 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -80,46 +80,6 @@ func (s *store) GetSize() int64 { panic("implement me") } -// newRaftNode creates a new raft node for the store. -func (s *store) newRaftNode() error { - // All new methods below can add other return values as needed, such as err - - // setup Raft configuration - conf := s.newDefaultConfig() - - // setup Raft communication - t := newTransport() - - // create the snapshot store. This allows the Raft to truncate the log. - snapshots, err := newSnapshotStore(s.conf) - if err != nil { - return err - } - - // create the log store and stable store - logStore, err := newRaftLog(s.conf) - if err != nil { - return err - } - stableStore, err := newStableLog(s.conf) - if err != nil { - return err - } - - // create a new finite state machine - f := newFSM() - - // instantiate the Raft system - r, err := raft.NewRaft(conf, f, logStore, stableStore, snapshots, t) - if err != nil { - return err - } - - s.raft = r - - return nil -} - // isKeyInRange checks if the key is in the range of the region. func isKeyInRange(key, startRange, endRange []byte) bool { // Compare the key to the start of the range From 5e3aaf11851497002bccf97d9e5f45b39d5db25d Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sat, 29 Jul 2023 00:34:12 +0800 Subject: [PATCH 03/10] refactor component for store and region --- cluster/region/region.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cluster/region/region.go b/cluster/region/region.go index b153b647..9e5bf7f1 100644 --- a/cluster/region/region.go +++ b/cluster/region/region.go @@ -59,6 +59,10 @@ func NewRegion(start []byte, end []byte, options config.Options, conf config.Con if err != nil { return nil, errors.New("new db failed") } + raftNode, err := newRaftNode(conf) + if err != nil { + return nil, errors.New("new raft node failed") + } return ®ion{ startKey: start, endKey: end, @@ -66,6 +70,7 @@ func NewRegion(start []byte, end []byte, options config.Options, conf config.Con db: db, mu: sync.RWMutex{}, conf: conf, + raft: raftNode, }, nil } From 86db284e98fc44db4e8616b474c49e7077206004 Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sat, 29 Jul 2023 01:09:50 +0800 Subject: [PATCH 04/10] refactor component for store and region --- cluster/region/region.go | 10 ++++++- cluster/store/store.go | 60 ++++++++++++++++++++++++++-------------- config/store_config.go | 8 ++++++ go.mod | 7 +++-- go.sum | 2 ++ 5 files changed, 62 insertions(+), 25 deletions(-) create mode 100644 config/store_config.go diff --git a/cluster/region/region.go b/cluster/region/region.go index 9e5bf7f1..d3c41985 100644 --- a/cluster/region/region.go +++ b/cluster/region/region.go @@ -15,7 +15,7 @@ import ( // region and replicas are a raft group, and the one region is the leader of the raft group. // region stores the data of a region. type region struct { - id uint64 // region id + id int64 // region id startKey []byte // start key endKey []byte // end key db *engine.DB // db, to store the data. @@ -52,6 +52,8 @@ type Region interface { RemovePeer(peer string) error // GetSize gets the total size of the region. GetSize() int64 + // GetID gets the id of the region. + GetID() int64 } func NewRegion(start []byte, end []byte, options config.Options, conf config.Config) (Region, error) { @@ -205,3 +207,9 @@ func (r *region) GetSize() int64 { defer r.mu.RUnlock() return r.size } + +func (r *region) GetID() int64 { + r.mu.RLock() + defer r.mu.RUnlock() + return r.id +} diff --git a/cluster/store/store.go b/cluster/store/store.go index d0f8fb68..e4e1900d 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -5,23 +5,27 @@ import ( "errors" "github.com/ByteStorage/FlyDB/cluster/region" "github.com/ByteStorage/FlyDB/config" - "github.com/hashicorp/raft" + "github.com/bwmarrin/snowflake" "sync" ) +var ( + MinKey []byte // Min Key of the all regions + MaxKey = []byte{255, 255, 255, 255, 255, 255, 255, 255} // Max Key of the all regions +) + // The store component is responsible for managing the division and merging of region partitions. // All regions under the store share a port number. // Each region under the store is in a raftGroups, and the region clusters in the raftGroups communicate through grpc // store stores the data of a store. type store struct { - id string // store id + id int64 // store id conf config.Config opts config.Options - addr string // store address - regionList map[uint64]region.Region // region list, to store the regions in the store. - size int64 // size - mu sync.RWMutex // mutex, to protect the store. - raft *raft.Raft // raft, to store the raft group. + addr string // store address + regionList map[int64]region.Region // region list, to store the regions in the store. + mu sync.RWMutex // mutex, to protect the store. + node *snowflake.Node // snowflake node, to generate the id. } // Store is the interface of store. @@ -29,11 +33,7 @@ type Store interface { // GetRegionByKey gets region and leader peer by region key from cluster. GetRegionByKey(key []byte) (region.Region, error) // GetRegionByID gets region and leader peer by region id from cluster. - GetRegionByID(id uint64) (region.Region, error) - // AddRegion adds a new region to cluster. - AddRegion(region region.Region) error - // RemoveRegion removes a region from cluster. - RemoveRegion(id uint64) error + GetRegionByID(id int64) (region.Region, error) // Split splits the region into two regions. Split(region region.Region, splitKey []byte) error // Merge merges two adjacent regions into one region. @@ -42,6 +42,32 @@ type Store interface { GetSize() int64 } +// NewStore creates a new store. +func NewStore(conf config.StoreConfig) (Store, error) { + // create a new region, when initialize, a store just has one region. + // when the region size exceeds the threshold, the region will be split into two regions. + newRegion, err := region.NewRegion(MinKey, MaxKey, conf.Options, conf.Config) + if err != nil { + return nil, err + } + // create a new snowflake node. + node, err := snowflake.NewNode(conf.Id) + if err != nil { + return nil, err + } + return &store{ + id: conf.Id, + node: node, + regionList: map[int64]region.Region{ + newRegion.GetID(): newRegion, + }, + addr: conf.Addr, + conf: conf.Config, + opts: conf.Options, + mu: sync.RWMutex{}, + }, nil +} + func (s *store) GetRegionByKey(key []byte) (region.Region, error) { for _, r := range s.regionList { if isKeyInRange(key, r.GetStartKey(), r.GetEndKey()) { @@ -51,7 +77,7 @@ func (s *store) GetRegionByKey(key []byte) (region.Region, error) { return nil, errors.New("the specified region does not exist") } -func (s *store) GetRegionByID(id uint64) (region.Region, error) { +func (s *store) GetRegionByID(id int64) (region.Region, error) { s.mu.RLock() defer s.mu.RUnlock() if _, ok := s.regionList[id]; !ok { @@ -60,14 +86,6 @@ func (s *store) GetRegionByID(id uint64) (region.Region, error) { return s.regionList[id], nil } -func (s *store) AddRegion(region region.Region) error { - panic("implement me") -} - -func (s *store) RemoveRegion(id uint64) error { - panic("implement me") -} - func (s *store) Split(region region.Region, splitKey []byte) error { panic("implement me") } diff --git a/config/store_config.go b/config/store_config.go new file mode 100644 index 00000000..d75b714c --- /dev/null +++ b/config/store_config.go @@ -0,0 +1,8 @@ +package config + +type StoreConfig struct { + Options Options + Config Config + Id int64 + Addr string +} diff --git a/go.mod b/go.mod index b589d54a..7458ff5f 100644 --- a/go.mod +++ b/go.mod @@ -5,14 +5,18 @@ go 1.18 require ( github.com/bits-and-blooms/bitset v1.8.0 github.com/boltdb/bolt v1.3.1 + github.com/bwmarrin/snowflake v0.3.0 github.com/chen3feng/stl4go v0.1.1 github.com/desertbit/grumble v1.1.3 + github.com/edsrzf/mmap-go v1.1.0 github.com/fatih/color v1.13.0 + github.com/golang/protobuf v1.5.3 github.com/google/btree v1.1.2 github.com/hashicorp/go-msgpack v0.5.5 github.com/hashicorp/raft v1.5.0 github.com/hashicorp/raft-boltdb v0.0.0-20230125174641-2a8082862702 github.com/klauspost/reedsolomon v1.11.7 + github.com/pkg/errors v0.9.1 github.com/plar/go-adaptive-radix-tree v1.0.5 github.com/stretchr/testify v1.8.2 github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c @@ -30,11 +34,9 @@ require ( github.com/desertbit/columnize v2.1.0+incompatible // indirect github.com/desertbit/go-shlex v0.1.1 // indirect github.com/desertbit/readline v1.5.1 // indirect - github.com/edsrzf/mmap-go v1.1.0 // indirect github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect - github.com/golang/protobuf v1.5.3 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-hclog v1.5.0 // indirect github.com/hashicorp/go-immutable-radix v1.0.0 // indirect @@ -44,7 +46,6 @@ require ( github.com/kr/pretty v0.3.0 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.16 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.9.0 // indirect github.com/tidwall/gjson v1.14.4 // indirect diff --git a/go.sum b/go.sum index d6b87b1b..fc1015ee 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ github.com/bits-and-blooms/bitset v1.8.0 h1:FD+XqgOZDUxxZ8hzoBFuV9+cGWY9CslN6d5M github.com/bits-and-blooms/bitset v1.8.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= +github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0= +github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chen3feng/stl4go v0.1.1 h1:0L1+mDw7pomftKDruM23f1mA7miavOj6C6MZeadzN2Q= github.com/chen3feng/stl4go v0.1.1/go.mod h1:5ml3psLgETJjRJnMbPE+JiHLrCpt+Ajc2weeTECXzWU= From 54c7c0edd6efd3ee87887b4c68b6c70df3d565d7 Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sat, 29 Jul 2023 01:21:06 +0800 Subject: [PATCH 05/10] refactor component for store and region --- cluster/region/region.go | 12 ++++---- cluster/store/store.go | 60 ++++++++++++++++++++++++++++++++++++---- config/region_config.go | 9 ++++++ 3 files changed, 69 insertions(+), 12 deletions(-) create mode 100644 config/region_config.go diff --git a/cluster/region/region.go b/cluster/region/region.go index d3c41985..8eb2e859 100644 --- a/cluster/region/region.go +++ b/cluster/region/region.go @@ -56,22 +56,22 @@ type Region interface { GetID() int64 } -func NewRegion(start []byte, end []byte, options config.Options, conf config.Config) (Region, error) { - db, err := engine.NewDB(options) +func NewRegion(conf config.RegionConfig) (Region, error) { + db, err := engine.NewDB(conf.Options) if err != nil { return nil, errors.New("new db failed") } - raftNode, err := newRaftNode(conf) + raftNode, err := newRaftNode(conf.Config) if err != nil { return nil, errors.New("new raft node failed") } return ®ion{ - startKey: start, - endKey: end, + startKey: conf.Start, + endKey: conf.End, raftGroups: make(map[uint64]*raft.Raft), db: db, mu: sync.RWMutex{}, - conf: conf, + conf: conf.Config, raft: raftNode, }, nil } diff --git a/cluster/store/store.go b/cluster/store/store.go index e4e1900d..80c5d042 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -10,8 +10,9 @@ import ( ) var ( - MinKey []byte // Min Key of the all regions - MaxKey = []byte{255, 255, 255, 255, 255, 255, 255, 255} // Max Key of the all regions + MinKey []byte // Min Key of the all regions + MaxKey = []byte{255, 255, 255, 255, 255, 255, 255, 255} // Max Key of the all regions + Threshold int64 = 256 * 1024 * 1024 // Threshold of the region size ) // The store component is responsible for managing the division and merging of region partitions. @@ -35,7 +36,7 @@ type Store interface { // GetRegionByID gets region and leader peer by region id from cluster. GetRegionByID(id int64) (region.Region, error) // Split splits the region into two regions. - Split(region region.Region, splitKey []byte) error + Split() error // Merge merges two adjacent regions into one region. Merge(regionA region.Region, regionB region.Region) error // GetSize gets the total size of the store. @@ -44,9 +45,16 @@ type Store interface { // NewStore creates a new store. func NewStore(conf config.StoreConfig) (Store, error) { + // create a new region config. + regionConfig := config.RegionConfig{ + Options: conf.Options, + Config: conf.Config, + Start: MinKey, + End: MaxKey, + } // create a new region, when initialize, a store just has one region. // when the region size exceeds the threshold, the region will be split into two regions. - newRegion, err := region.NewRegion(MinKey, MaxKey, conf.Options, conf.Config) + newRegion, err := region.NewRegion(regionConfig) if err != nil { return nil, err } @@ -68,6 +76,7 @@ func NewStore(conf config.StoreConfig) (Store, error) { }, nil } +// GetRegionByKey gets region by region key from store. func (s *store) GetRegionByKey(key []byte) (region.Region, error) { for _, r := range s.regionList { if isKeyInRange(key, r.GetStartKey(), r.GetEndKey()) { @@ -77,6 +86,7 @@ func (s *store) GetRegionByKey(key []byte) (region.Region, error) { return nil, errors.New("the specified region does not exist") } +// GetRegionByID gets region by region id from store. func (s *store) GetRegionByID(id int64) (region.Region, error) { s.mu.RLock() defer s.mu.RUnlock() @@ -86,8 +96,36 @@ func (s *store) GetRegionByID(id int64) (region.Region, error) { return s.regionList[id], nil } -func (s *store) Split(region region.Region, splitKey []byte) error { - panic("implement me") +func (s *store) Split() error { + for _, r := range s.regionList { + if r.GetSize() >= Threshold { + // define the middle key + end := r.GetEndKey() + start := r.GetStartKey() + middle := make([]byte, len(end)) + for i := 0; i < len(end); i++ { + middle[i] = (end[i] + start[i]) / 2 + } + // create a new region config + regionConfig := config.RegionConfig{ + Options: s.opts, + Config: s.conf, + Start: middle, + End: end, + } + // create a new region + newRegion, err := region.NewRegion(regionConfig) + if err != nil { + return err + } + // move the data to the new region + err = moveDataToNewRegion(newRegion, start, middle, r, end) + if err != nil { + return err + } + } + } + return nil } func (s *store) Merge(regionA region.Region, regionB region.Region) error { @@ -115,3 +153,13 @@ func isKeyInRange(key, startRange, endRange []byte) bool { // If neither of the above, the key is in range return true } + +// moveDataToNewRegion moves the data from the old region to the new region. +// new: the new region +// start: the start key of the new region +// end: the end key of the new region +// old: the old region +func moveDataToNewRegion(new region.Region, start []byte, end []byte, old region.Region, oldEnd []byte) error { + // modify the start key and the end key of the old region + panic("implement me") +} diff --git a/config/region_config.go b/config/region_config.go new file mode 100644 index 00000000..35e5822c --- /dev/null +++ b/config/region_config.go @@ -0,0 +1,9 @@ +package config + +type RegionConfig struct { + Options Options + Config Config + Id int64 + Start []byte + End []byte +} From 89f2f389d9a05a524d55ae73050533edfcfed46e Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sat, 29 Jul 2023 01:39:50 +0800 Subject: [PATCH 06/10] refactor component for store and region --- cluster/store/store.go | 69 +++++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 31 deletions(-) diff --git a/cluster/store/store.go b/cluster/store/store.go index 80c5d042..5c0698dd 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -23,24 +23,20 @@ type store struct { id int64 // store id conf config.Config opts config.Options - addr string // store address - regionList map[int64]region.Region // region list, to store the regions in the store. - mu sync.RWMutex // mutex, to protect the store. - node *snowflake.Node // snowflake node, to generate the id. + addr string // store address + regionList []region.Region // region list, to store the regions in the store. + mu sync.RWMutex // mutex, to protect the store. + node *snowflake.Node // snowflake node, to generate the id. } // Store is the interface of store. type Store interface { // GetRegionByKey gets region and leader peer by region key from cluster. GetRegionByKey(key []byte) (region.Region, error) - // GetRegionByID gets region and leader peer by region id from cluster. - GetRegionByID(id int64) (region.Region, error) // Split splits the region into two regions. Split() error // Merge merges two adjacent regions into one region. - Merge(regionA region.Region, regionB region.Region) error - // GetSize gets the total size of the store. - GetSize() int64 + Merge() error } // NewStore creates a new store. @@ -66,8 +62,8 @@ func NewStore(conf config.StoreConfig) (Store, error) { return &store{ id: conf.Id, node: node, - regionList: map[int64]region.Region{ - newRegion.GetID(): newRegion, + regionList: []region.Region{ + newRegion, }, addr: conf.Addr, conf: conf.Config, @@ -86,16 +82,6 @@ func (s *store) GetRegionByKey(key []byte) (region.Region, error) { return nil, errors.New("the specified region does not exist") } -// GetRegionByID gets region by region id from store. -func (s *store) GetRegionByID(id int64) (region.Region, error) { - s.mu.RLock() - defer s.mu.RUnlock() - if _, ok := s.regionList[id]; !ok { - return nil, errors.New("the specified region does not exist") - } - return s.regionList[id], nil -} - func (s *store) Split() error { for _, r := range s.regionList { if r.GetSize() >= Threshold { @@ -119,21 +105,36 @@ func (s *store) Split() error { return err } // move the data to the new region - err = moveDataToNewRegion(newRegion, start, middle, r, end) + err = moveDataToNewRegion(newRegion, middle, end, r, start, end) if err != nil { return err } + // add the new region to the region list + s.regionList = append(s.regionList, newRegion) } } return nil } -func (s *store) Merge(regionA region.Region, regionB region.Region) error { - panic("implement me") -} - -func (s *store) GetSize() int64 { - panic("implement me") +// Merge merges two adjacent regions into one region. +func (s *store) Merge() error { + // find the two adjacent regions which have the smallest size + for i, r := range s.regionList { + if i == len(s.regionList)-1 { + break + } + // regionList[i] and regionList[i+1] are adjacent + // and if the size of them is smaller than the threshold / 2, merge them + if r.GetSize()+s.regionList[i+1].GetSize() < Threshold/2 { + err := mergeTwoRegions(r, s.regionList[i+1]) + if err != nil { + return err + } + // delete the second region + s.regionList = append(s.regionList[:i+1], s.regionList[i+2:]...) + } + } + return nil } // isKeyInRange checks if the key is in the range of the region. @@ -156,10 +157,16 @@ func isKeyInRange(key, startRange, endRange []byte) bool { // moveDataToNewRegion moves the data from the old region to the new region. // new: the new region -// start: the start key of the new region -// end: the end key of the new region +// newStartKey: the start key of the new region +// newEndKey: the end key of the new region // old: the old region -func moveDataToNewRegion(new region.Region, start []byte, end []byte, old region.Region, oldEnd []byte) error { +// oldStartKey: the start key of the old region +// oldEndKey: the end key of the old region +func moveDataToNewRegion(new region.Region, newStartKey []byte, newEndKey []byte, old region.Region, oldStartKey []byte, oldEndKey []byte) error { // modify the start key and the end key of the old region panic("implement me") } + +func mergeTwoRegions(r1 region.Region, r2 region.Region) error { + panic("implement me") +} From 7c98862fd70710a0b8900c8694ba9f8c36dc9772 Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sat, 29 Jul 2023 01:40:31 +0800 Subject: [PATCH 07/10] refactor component for store and region --- cluster/store/store.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cluster/store/store.go b/cluster/store/store.go index 5c0698dd..6d135910 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -167,6 +167,10 @@ func moveDataToNewRegion(new region.Region, newStartKey []byte, newEndKey []byte panic("implement me") } +// mergeTwoRegions merges two adjacent regions into one region. +// r1: the first region +// r2: the second region +// move the data from the second region to the first region func mergeTwoRegions(r1 region.Region, r2 region.Region) error { panic("implement me") } From c3e6c3aee42bcc5e402c88f5f8729a9b09bb0f80 Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sat, 29 Jul 2023 01:46:19 +0800 Subject: [PATCH 08/10] refactor component for store and region --- cluster/meta/meta.go | 40 ++++++++++++++++++++++++++++++++++++++++ cluster/store/store.go | 1 + 2 files changed, 41 insertions(+) diff --git a/cluster/meta/meta.go b/cluster/meta/meta.go index 0b9d0809..8d8b7400 100644 --- a/cluster/meta/meta.go +++ b/cluster/meta/meta.go @@ -40,3 +40,43 @@ type meta struct { scheduler *Scheduler // scheduler, to schedule the cluster. raft *raft.Raft // raft, to store the raft group. } + +func (m *meta) GetStore(addr string) (*store.Store, error) { + //TODO implement me + panic("implement me") +} + +func (m *meta) AddStore(addr string) error { + //TODO implement me + panic("implement me") +} + +func (m *meta) RemoveStore(addr string) error { + //TODO implement me + panic("implement me") +} + +func (m *meta) GetAllStores() []*store.Store { + //TODO implement me + panic("implement me") +} + +func (m *meta) GetRegionByID(id uint64) (*region.Region, error) { + //TODO implement me + panic("implement me") +} + +func (m *meta) GetStoreByID(id uint64) (*store.Store, error) { + //TODO implement me + panic("implement me") +} + +func NewMeta(conf config.Config) MetadataManager { + return &meta{ + clusterConfig: &conf, + heartbeat: make(map[string]time.Time), + dirTree: dirtree.NewDirTree(), + stores: make(map[string]*store.Store), + regions: make(map[uint64]*region.Region), + } +} diff --git a/cluster/store/store.go b/cluster/store/store.go index 6d135910..26790102 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -82,6 +82,7 @@ func (s *store) GetRegionByKey(key []byte) (region.Region, error) { return nil, errors.New("the specified region does not exist") } +// Split splits the region into two regions. func (s *store) Split() error { for _, r := range s.regionList { if r.GetSize() >= Threshold { From 88a35ef033b8ab7c5e58ebcd4401989f1d3a7a73 Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sat, 29 Jul 2023 01:57:48 +0800 Subject: [PATCH 09/10] refactor component for store and region --- cluster/meta/meta.go | 43 +++++++++++++++++++++++++++++++++++++++ config/cluster_options.go | 2 ++ 2 files changed, 45 insertions(+) diff --git a/cluster/meta/meta.go b/cluster/meta/meta.go index 8d8b7400..6ba6a392 100644 --- a/cluster/meta/meta.go +++ b/cluster/meta/meta.go @@ -1,6 +1,7 @@ package meta import ( + "fmt" "github.com/ByteStorage/FlyDB/cluster/region" "github.com/ByteStorage/FlyDB/cluster/store" "github.com/ByteStorage/FlyDB/config" @@ -24,6 +25,8 @@ type MetadataManager interface { GetRegionByID(id uint64) (*region.Region, error) // GetStoreByID gets a store by id. GetStoreByID(id uint64) (*store.Store, error) + // ApplyConfig applies a new config to the cluster. + ApplyConfig(config *config.Config) error } // meta stores the metadata of the cluster. @@ -39,38 +42,78 @@ type meta struct { mu sync.RWMutex // mutex, to protect the metadata. scheduler *Scheduler // scheduler, to schedule the cluster. raft *raft.Raft // raft, to store the raft group. + addr string // address, to store the address of the meta node. } +// GetStore gets a store by address. func (m *meta) GetStore(addr string) (*store.Store, error) { //TODO implement me panic("implement me") } +// AddStore adds a new store to the cluster. func (m *meta) AddStore(addr string) error { //TODO implement me panic("implement me") } +// RemoveStore removes a store from the cluster. func (m *meta) RemoveStore(addr string) error { //TODO implement me panic("implement me") } +// GetAllStores gets all stores in the cluster. func (m *meta) GetAllStores() []*store.Store { //TODO implement me panic("implement me") } +// GetRegionByID gets a region by id. func (m *meta) GetRegionByID(id uint64) (*region.Region, error) { //TODO implement me panic("implement me") } +// GetStoreByID gets a store by id. func (m *meta) GetStoreByID(id uint64) (*store.Store, error) { //TODO implement me panic("implement me") } +// ApplyConfig applies a new config to the cluster. +func (m *meta) ApplyConfig(config *config.Config) error { + m.mu.Lock() + defer m.mu.Unlock() + if m != nil { + err := m.stop() + if err != nil { + // if err = ErrNotStarted, it means the meta node has not started yet. + } + err = m.start() + if err != nil { + return err + } + } + return nil +} + +func (m *meta) start() error { + for _, metaNode := range m.clusterConfig.MetaNodes { + // ssh to the meta node + // start the meta node + fmt.Println(metaNode) + panic("implement me") + } + return nil +} + +func (m *meta) stop() error { + //TODO implement me + panic("implement me") +} + +// NewMeta creates a new meta. func NewMeta(conf config.Config) MetadataManager { return &meta{ clusterConfig: &conf, diff --git a/config/cluster_options.go b/config/cluster_options.go index ec10dddf..d9ac2cb1 100644 --- a/config/cluster_options.go +++ b/config/cluster_options.go @@ -12,4 +12,6 @@ type Config struct { SnapshotStoragePath string LogDataStorageSize int64 HeartbeatInterval time.Duration + MetaNodes []string + StoreNodes []string } From e481aedbd3b79cb18d8a843ebb90282266dac1c4 Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Sat, 29 Jul 2023 14:31:23 +0800 Subject: [PATCH 10/10] refactor component for store and region --- cluster/meta/meta.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cluster/meta/meta.go b/cluster/meta/meta.go index 6ba6a392..1110a249 100644 --- a/cluster/meta/meta.go +++ b/cluster/meta/meta.go @@ -42,7 +42,6 @@ type meta struct { mu sync.RWMutex // mutex, to protect the metadata. scheduler *Scheduler // scheduler, to schedule the cluster. raft *raft.Raft // raft, to store the raft group. - addr string // address, to store the address of the meta node. } // GetStore gets a store by address.