Skip to content

Commit

Permalink
Merge pull request #10 from ReshiAdavan/Sentinel-10
Browse files Browse the repository at this point in the history
Sentinel-10: kvraft comment thru
  • Loading branch information
ReshiAdavan authored Dec 21, 2023
2 parents 729fa45 + 268785f commit 6c00b1e
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 102 deletions.
46 changes: 27 additions & 19 deletions kvraft/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,25 @@ import (

"github.com/ReshiAdavan/Sentinel/rpc"
)

// Clerk is a client for a Raft-based key-value store.
type Clerk struct {
servers []*rpc.ClientEnd
mu sync.Mutex
clientId int64
requestId int64
leader int
servers []*rpc.ClientEnd // List of RPC client endpoints for the Raft servers.
mu sync.Mutex // Mutex to protect concurrent access to the next fields.
clientId int64 // Unique client identifier.
requestId int64 // Incrementing request ID to distinguish different requests from the same client.
leader int // Index of the server believed to be the leader.
}

// nrand generates a random 62-bit integer, used for generating unique client IDs.
func nrand() int64 {
max := big.NewInt(int64(1) << 62)
bigx, _ := rand.Int(rand.Reader, max)
x := bigx.Int64()
return x
}

// MakeClerk initializes a new Clerk instance with a list of server RPC endpoints.
func MakeClerk(servers []*rpc.ClientEnd) *Clerk {
ck := new(Clerk)
ck.servers = servers
Expand All @@ -32,64 +36,68 @@ func MakeClerk(servers []*rpc.ClientEnd) *Clerk {
}

/*
* Fetch the current value for a key.
* Returns "" if the key does not exist.
* Keeps trying forever in the face of all other errors
* The types of args and reply (including whether they are pointers)
must match the declared types of the RPC handler function's
arguments and reply must be passed as a pointer.
* Get fetches the current value for a key from the key-value store.
* It returns an empty string if the key does not exist.
* The function retries indefinitely in case of errors, trying to find the correct leader.
*/

func (ck *Clerk) Get(key string) string {
args := GetArgs{}
args.Key = key
args.ClientId = ck.clientId

// Locking to ensure that requestId is incremented atomically.
ck.mu.Lock()
args.RequestId = ck.requestId
ck.requestId++
ck.mu.Unlock()

for ; ; ck.leader = (ck.leader + 1) % len(ck.servers) {
// Keep trying different servers until a valid response is received.
for {
server := ck.servers[ck.leader]
reply := GetReply{}
ok := server.Call("KVServer.Get", &args, &reply)
if ok && !reply.WrongLeader {
return reply.Value
}
ck.leader = (ck.leader + 1) % len(ck.servers)
}
}

/*
* Shared by Put and Append.
* The types of args and reply (including whether they are pointers)
must match the declared types of the RPC handler function's
arguments and reply must be passed as a pointer.
* PutAppend either puts a new value for a key or appends to an existing value, based on the operation type.
* This is a helper function used by both Put and Append.
*/

func (ck *Clerk) PutAppend(key string, value string, op string) {
args := PutAppendArgs{}
args.Key = key
args.Value = value
args.Command = op
args.ClientId = ck.clientId

// Locking to ensure that requestId is incremented atomically.
ck.mu.Lock()
args.RequestId = ck.requestId
ck.requestId++
ck.mu.Unlock()

for ; ; ck.leader = (ck.leader + 1) % len(ck.servers) {
// Keep trying different servers until a valid response is received.
for {
server := ck.servers[ck.leader]
reply := PutAppendReply{}
ok := server.Call("KVServer.PutAppend", &args, &reply)
if ok && !reply.WrongLeader {
return
}
ck.leader = (ck.leader + 1) % len(ck.servers)
}
}

// Put inserts or updates the value for a given key in the key-value store.
func (ck *Clerk) Put(key string, value string) {
ck.PutAppend(key, value, "put")
}

// Append appends the given value to the existing value for a given key in the key-value store.
func (ck *Clerk) Append(key string, value string) {
ck.PutAppend(key, value, "append")
}
37 changes: 21 additions & 16 deletions kvraft/common.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,39 @@
package raftkv

// Constants defining possible error states.
const (
OK = "OK"
ErrNoKey = "ErrNoKey"
OK = "OK" // Indicates successful operation.
ErrNoKey = "ErrNoKey" // Indicates that the requested key does not exist in the key-value store.
)

// Err is a custom type representing an error string.
type Err string

// Put or Append
// PutAppendArgs defines the arguments structure for Put and Append operations.
type PutAppendArgs struct {
Key string
Value string
Command string // "Put" or "Append"
ClientId int64
RequestId int64
Key string // Key in the key-value store.
Value string // Value to be associated with the key.
Command string // Operation type: "Put" or "Append".
ClientId int64 // Unique client identifier to differentiate requests.
RequestId int64 // Unique request identifier for idempotency.
}

// PutAppendReply defines the reply structure for Put and Append operations.
type PutAppendReply struct {
WrongLeader bool
Err Err
WrongLeader bool // Flag to indicate if the operation reached a non-leader server.
Err Err // Error status of the operation.
}

// GetArgs defines the arguments structure for Get operation.
type GetArgs struct {
Key string
ClientId int64
RequestId int64
Key string // Key to retrieve from the key-value store.
ClientId int64 // Unique client identifier.
RequestId int64 // Unique request identifier.
}

// GetReply defines the reply structure for Get operation.
type GetReply struct {
WrongLeader bool
Err Err
Value string
WrongLeader bool // Flag to indicate if the operation reached a non-leader server.
Err Err // Error status of the operation.
Value string // The value retrieved for the key, if any.
}
52 changes: 22 additions & 30 deletions kvraft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ import (
"github.com/ReshiAdavan/Sentinel/raft"
)

// randstring generates a random string of a specified length.
func randstring(n int) string {
b := make([]byte, 2*n)
crand.Read(b)
s := base64.URLEncoding.EncodeToString(b)
return s[0:n]
}

// Randomize server handles
// random_handles shuffles the server handles for load balancing or test randomness.
func random_handles(kvh []*rpc.ClientEnd) []*rpc.ClientEnd {
sa := make([]*rpc.ClientEnd, len(kvh))
copy(sa, kvh)
Expand All @@ -37,6 +38,7 @@ func random_handles(kvh []*rpc.ClientEnd) []*rpc.ClientEnd {
return sa
}

// config holds the configuration for a set of raft servers and clients for testing.
type config struct {
mu sync.Mutex
t *testing.T
Expand All @@ -55,6 +57,7 @@ type config struct {
ops int32 // number of clerk get/put/append method calls
}

// cleanup terminates all the servers in the configuration.
func (cfg *config) cleanup() {
cfg.mu.Lock()
defer cfg.mu.Unlock()
Expand All @@ -65,7 +68,7 @@ func (cfg *config) cleanup() {
}
}

// Maximum log size across all servers
// LogSize calculates the maximum log size across all servers.
func (cfg *config) LogSize() int {
logsize := 0
for i := 0; i < cfg.n; i++ {
Expand All @@ -77,7 +80,7 @@ func (cfg *config) LogSize() int {
return logsize
}

// Maximum snapshot size across all servers
// SnapshotSize calculates the maximum snapshot size across all servers.
func (cfg *config) SnapshotSize() int {
snapshotsize := 0
for i := 0; i < cfg.n; i++ {
Expand All @@ -89,10 +92,7 @@ func (cfg *config) SnapshotSize() int {
return snapshotsize
}

/*
* Attach server i to servers listed in to caller must hold cfg.mu
*/

// connectUnlocked connects server i to the servers listed in `to`. Caller must hold cfg.mu.
func (cfg *config) connectUnlocked(i int, to []int) {
// log.Printf("connect peer %d to %v\n", i, to)

Expand All @@ -115,10 +115,7 @@ func (cfg *config) connect(i int, to []int) {
cfg.connectUnlocked(i, to)
}

/*
* Detach server i from the servers listed in from caller must hold cfg.mu
*/

// disconnectUnlocked disconnects server i from the servers listed in `from`. Caller must hold cfg.mu.
func (cfg *config) disconnectUnlocked(i int, from []int) {
// log.Printf("disconnect peer %d from %v\n", i, from)

Expand All @@ -145,6 +142,7 @@ func (cfg *config) disconnect(i int, from []int) {
cfg.disconnectUnlocked(i, from)
}

// All returns a slice of all server indices.
func (cfg *config) All() []int {
all := make([]int, cfg.n)
for i := 0; i < cfg.n; i++ {
Expand All @@ -153,6 +151,7 @@ func (cfg *config) All() []int {
return all
}

// ConnectAll connects all servers in the configuration.
func (cfg *config) ConnectAll() {
cfg.mu.Lock()
defer cfg.mu.Unlock()
Expand All @@ -161,7 +160,7 @@ func (cfg *config) ConnectAll() {
}
}

// Sets up 2 partitions with connectivity between servers in each partition.
// partition sets up 2 partitions with connectivity between servers in each partition.
func (cfg *config) partition(p1 []int, p2 []int) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
Expand All @@ -176,10 +175,7 @@ func (cfg *config) partition(p1 []int, p2 []int) {
}
}

/*
* Create a clerk with clerk specific server names.
* Give it connections to all of the servers, but for now enable only connections to servers in to[].
*/
// makeClient creates a clerk with specific server names and connections.
func (cfg *config) makeClient(to []int) *Clerk {
cfg.mu.Lock()
defer cfg.mu.Unlock()
Expand Down Expand Up @@ -211,7 +207,7 @@ func (cfg *config) deleteClient(ck *Clerk) {
delete(cfg.clerks, ck)
}

// caller should hold cfg.mu
// ConnectClientUnlocked connects a client to specified servers. Caller should hold cfg.mu.
func (cfg *config) ConnectClientUnlocked(ck *Clerk, to []int) {
// log.Printf("ConnectClient %v to %v\n", ck, to)
endnames := cfg.clerks[ck]
Expand All @@ -221,13 +217,14 @@ func (cfg *config) ConnectClientUnlocked(ck *Clerk, to []int) {
}
}

// ConnectClient wraps ConnectClientUnlocked with mutex locking.
func (cfg *config) ConnectClient(ck *Clerk, to []int) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
cfg.ConnectClientUnlocked(ck, to)
}

// caller should hold cfg.mu
// DisconnectClientUnlocked disconnects a client from specified servers. Caller should hold cfg.mu.
func (cfg *config) DisconnectClientUnlocked(ck *Clerk, from []int) {
// log.Printf("DisconnectClient %v from %v\n", ck, from)
endnames := cfg.clerks[ck]
Expand All @@ -237,13 +234,14 @@ func (cfg *config) DisconnectClientUnlocked(ck *Clerk, from []int) {
}
}

// DisconnectClient wraps DisconnectClientUnlocked with mutex locking.
func (cfg *config) DisconnectClient(ck *Clerk, from []int) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
cfg.DisconnectClientUnlocked(ck, from)
}

// Shutdown a server by isolating it
// ShutdownServer isolates and shuts down a specified server.
func (cfg *config) ShutdownServer(i int) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
Expand Down Expand Up @@ -327,7 +325,7 @@ func (cfg *config) Leader() (bool, int) {
return false, 0
}

// Partition servers into 2 groups and put current leader in minority
// make_partition partitions the servers, ensuring the current leader is in the minority.
func (cfg *config) make_partition() ([]int, []int) {
_, l := cfg.Leader()
p1 := make([]int, cfg.n/2+1)
Expand Down Expand Up @@ -379,15 +377,12 @@ func make_config(t *testing.T, n int, unreliable bool, maxraftstate int) *config
return cfg
}

// rpcTotal returns the total number of RPC calls made.
func (cfg *config) rpcTotal() int {
return cfg.net.GetTotalCount()
}

/*
* Start a Test.
* Print the Test message.
*/

// begin starts a test and prints the test message.
func (cfg *config) begin(description string) {
fmt.Printf("%s ...\n", description)
cfg.t0 = time.Now()
Expand All @@ -404,15 +399,12 @@ func (cfg *config) begin(description string) {
}()
}

// op records an operation performed by a clerk.
func (cfg *config) op() {
atomic.AddInt32(&cfg.ops, 1)
}

/*
* End a Test -- the fact that we got here means there was no failure.
* Print the Passed message, and some performance numbers.
*/

// end concludes a test, prints performance numbers, and checks for timeouts.
func (cfg *config) end() {
atomic.AddInt32(&cfg.testNum, 1) // suppress two-minute timeout

Expand Down
Loading

0 comments on commit 6c00b1e

Please sign in to comment.