Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sentinel-10: kvraft comment thru #10

Merged
merged 1 commit into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 27 additions & 19 deletions kvraft/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,25 @@ import (

"github.com/ReshiAdavan/Sentinel/rpc"
)

// Clerk is a client for a Raft-based key-value store.
type Clerk struct {
servers []*rpc.ClientEnd
mu sync.Mutex
clientId int64
requestId int64
leader int
servers []*rpc.ClientEnd // List of RPC client endpoints for the Raft servers.
mu sync.Mutex // Mutex to protect concurrent access to the next fields.
clientId int64 // Unique client identifier.
requestId int64 // Incrementing request ID to distinguish different requests from the same client.
leader int // Index of the server believed to be the leader.
}

// nrand generates a random 62-bit integer, used for generating unique client IDs.
func nrand() int64 {
max := big.NewInt(int64(1) << 62)
bigx, _ := rand.Int(rand.Reader, max)
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
x := bigx.Int64()
return x
}

// MakeClerk initializes a new Clerk instance with a list of server RPC endpoints.
func MakeClerk(servers []*rpc.ClientEnd) *Clerk {
ck := new(Clerk)
ck.servers = servers
Expand All @@ -32,64 +36,68 @@ func MakeClerk(servers []*rpc.ClientEnd) *Clerk {
}

/*
* Fetch the current value for a key.
* Returns "" if the key does not exist.
* Keeps trying forever in the face of all other errors
* The types of args and reply (including whether they are pointers)
must match the declared types of the RPC handler function's
arguments and reply must be passed as a pointer.
* Get fetches the current value for a key from the key-value store.
* It returns an empty string if the key does not exist.
* The function retries indefinitely in case of errors, trying to find the correct leader.
*/

func (ck *Clerk) Get(key string) string {
args := GetArgs{}
args.Key = key
args.ClientId = ck.clientId

// Locking to ensure that requestId is incremented atomically.
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
ck.mu.Lock()
args.RequestId = ck.requestId
ck.requestId++
ck.mu.Unlock()

for ; ; ck.leader = (ck.leader + 1) % len(ck.servers) {
// Keep trying different servers until a valid response is received.
for {
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
server := ck.servers[ck.leader]
reply := GetReply{}
ok := server.Call("KVServer.Get", &args, &reply)
if ok && !reply.WrongLeader {
return reply.Value
}
ck.leader = (ck.leader + 1) % len(ck.servers)
}
}

/*
* Shared by Put and Append.
* The types of args and reply (including whether they are pointers)
must match the declared types of the RPC handler function's
arguments and reply must be passed as a pointer.
* PutAppend either puts a new value for a key or appends to an existing value, based on the operation type.
* This is a helper function used by both Put and Append.
*/

func (ck *Clerk) PutAppend(key string, value string, op string) {
args := PutAppendArgs{}
args.Key = key
args.Value = value
args.Command = op
args.ClientId = ck.clientId

// Locking to ensure that requestId is incremented atomically.
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
ck.mu.Lock()
args.RequestId = ck.requestId
ck.requestId++
ck.mu.Unlock()

for ; ; ck.leader = (ck.leader + 1) % len(ck.servers) {
// Keep trying different servers until a valid response is received.
for {
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
server := ck.servers[ck.leader]
reply := PutAppendReply{}
ok := server.Call("KVServer.PutAppend", &args, &reply)
if ok && !reply.WrongLeader {
return
}
ck.leader = (ck.leader + 1) % len(ck.servers)
}
}

// Put inserts or updates the value for a given key in the key-value store.
func (ck *Clerk) Put(key string, value string) {
ck.PutAppend(key, value, "put")
}

// Append appends the given value to the existing value for a given key in the key-value store.
func (ck *Clerk) Append(key string, value string) {
ck.PutAppend(key, value, "append")
}
37 changes: 21 additions & 16 deletions kvraft/common.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,39 @@
package raftkv

// Constants defining possible error states.
const (
OK = "OK"
ErrNoKey = "ErrNoKey"
OK = "OK" // Indicates successful operation.
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
ErrNoKey = "ErrNoKey" // Indicates that the requested key does not exist in the key-value store.
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
)

// Err is a custom type representing an error string.
type Err string

// Put or Append
// PutAppendArgs defines the arguments structure for Put and Append operations.
type PutAppendArgs struct {
Key string
Value string
Command string // "Put" or "Append"
ClientId int64
RequestId int64
Key string // Key in the key-value store.
Value string // Value to be associated with the key.
Command string // Operation type: "Put" or "Append".
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
ClientId int64 // Unique client identifier to differentiate requests.
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
RequestId int64 // Unique request identifier for idempotency.
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
}

// PutAppendReply defines the reply structure for Put and Append operations.
type PutAppendReply struct {
WrongLeader bool
Err Err
WrongLeader bool // Flag to indicate if the operation reached a non-leader server.
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
Err Err // Error status of the operation.
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
}

// GetArgs defines the arguments structure for Get operation.
type GetArgs struct {
Key string
ClientId int64
RequestId int64
Key string // Key to retrieve from the key-value store.
ClientId int64 // Unique client identifier.
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
RequestId int64 // Unique request identifier.
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
}

// GetReply defines the reply structure for Get operation.
type GetReply struct {
WrongLeader bool
Err Err
Value string
WrongLeader bool // Flag to indicate if the operation reached a non-leader server.
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
Err Err // Error status of the operation.
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
Value string // The value retrieved for the key, if any.
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
}
52 changes: 22 additions & 30 deletions kvraft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ import (
"github.com/ReshiAdavan/Sentinel/raft"
)

// randstring generates a random string of a specified length.
func randstring(n int) string {
b := make([]byte, 2*n)
crand.Read(b)
s := base64.URLEncoding.EncodeToString(b)
return s[0:n]
}

// Randomize server handles
// random_handles shuffles the server handles for load balancing or test randomness.
func random_handles(kvh []*rpc.ClientEnd) []*rpc.ClientEnd {
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
sa := make([]*rpc.ClientEnd, len(kvh))
copy(sa, kvh)
Expand All @@ -37,6 +38,7 @@ func random_handles(kvh []*rpc.ClientEnd) []*rpc.ClientEnd {
return sa
}

// config holds the configuration for a set of raft servers and clients for testing.
type config struct {
mu sync.Mutex
t *testing.T
Expand All @@ -55,6 +57,7 @@ type config struct {
ops int32 // number of clerk get/put/append method calls
}

// cleanup terminates all the servers in the configuration.
func (cfg *config) cleanup() {
cfg.mu.Lock()
defer cfg.mu.Unlock()
Expand All @@ -65,7 +68,7 @@ func (cfg *config) cleanup() {
}
}

// Maximum log size across all servers
// LogSize calculates the maximum log size across all servers.
func (cfg *config) LogSize() int {
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
logsize := 0
for i := 0; i < cfg.n; i++ {
Expand All @@ -77,7 +80,7 @@ func (cfg *config) LogSize() int {
return logsize
}

// Maximum snapshot size across all servers
// SnapshotSize calculates the maximum snapshot size across all servers.
func (cfg *config) SnapshotSize() int {
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
snapshotsize := 0
for i := 0; i < cfg.n; i++ {
Expand All @@ -89,10 +92,7 @@ func (cfg *config) SnapshotSize() int {
return snapshotsize
}

/*
* Attach server i to servers listed in to caller must hold cfg.mu
*/

// connectUnlocked connects server i to the servers listed in `to`. Caller must hold cfg.mu.
func (cfg *config) connectUnlocked(i int, to []int) {
// log.Printf("connect peer %d to %v\n", i, to)

Expand All @@ -115,10 +115,7 @@ func (cfg *config) connect(i int, to []int) {
cfg.connectUnlocked(i, to)
}

/*
* Detach server i from the servers listed in from caller must hold cfg.mu
*/

// disconnectUnlocked disconnects server i from the servers listed in `from`. Caller must hold cfg.mu.
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
func (cfg *config) disconnectUnlocked(i int, from []int) {
// log.Printf("disconnect peer %d from %v\n", i, from)

Expand All @@ -145,6 +142,7 @@ func (cfg *config) disconnect(i int, from []int) {
cfg.disconnectUnlocked(i, from)
}

// All returns a slice of all server indices.
func (cfg *config) All() []int {
all := make([]int, cfg.n)
for i := 0; i < cfg.n; i++ {
Expand All @@ -153,6 +151,7 @@ func (cfg *config) All() []int {
return all
}

// ConnectAll connects all servers in the configuration.
func (cfg *config) ConnectAll() {
cfg.mu.Lock()
defer cfg.mu.Unlock()
Expand All @@ -161,7 +160,7 @@ func (cfg *config) ConnectAll() {
}
}

// Sets up 2 partitions with connectivity between servers in each partition.
// partition sets up 2 partitions with connectivity between servers in each partition.
func (cfg *config) partition(p1 []int, p2 []int) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
Expand All @@ -176,10 +175,7 @@ func (cfg *config) partition(p1 []int, p2 []int) {
}
}

/*
* Create a clerk with clerk specific server names.
* Give it connections to all of the servers, but for now enable only connections to servers in to[].
*/
// makeClient creates a clerk with specific server names and connections.
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
func (cfg *config) makeClient(to []int) *Clerk {
cfg.mu.Lock()
defer cfg.mu.Unlock()
Expand Down Expand Up @@ -211,7 +207,7 @@ func (cfg *config) deleteClient(ck *Clerk) {
delete(cfg.clerks, ck)
}

// caller should hold cfg.mu
// ConnectClientUnlocked connects a client to specified servers. Caller should hold cfg.mu.
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
func (cfg *config) ConnectClientUnlocked(ck *Clerk, to []int) {
// log.Printf("ConnectClient %v to %v\n", ck, to)
endnames := cfg.clerks[ck]
Expand All @@ -221,13 +217,14 @@ func (cfg *config) ConnectClientUnlocked(ck *Clerk, to []int) {
}
}

// ConnectClient wraps ConnectClientUnlocked with mutex locking.
func (cfg *config) ConnectClient(ck *Clerk, to []int) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
cfg.ConnectClientUnlocked(ck, to)
}

// caller should hold cfg.mu
// DisconnectClientUnlocked disconnects a client from specified servers. Caller should hold cfg.mu.
func (cfg *config) DisconnectClientUnlocked(ck *Clerk, from []int) {
// log.Printf("DisconnectClient %v from %v\n", ck, from)
endnames := cfg.clerks[ck]
Expand All @@ -237,13 +234,14 @@ func (cfg *config) DisconnectClientUnlocked(ck *Clerk, from []int) {
}
}

// DisconnectClient wraps DisconnectClientUnlocked with mutex locking.
func (cfg *config) DisconnectClient(ck *Clerk, from []int) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
cfg.DisconnectClientUnlocked(ck, from)
}

// Shutdown a server by isolating it
// ShutdownServer isolates and shuts down a specified server.
func (cfg *config) ShutdownServer(i int) {
cfg.mu.Lock()
defer cfg.mu.Unlock()
Expand Down Expand Up @@ -327,7 +325,7 @@ func (cfg *config) Leader() (bool, int) {
return false, 0
}

// Partition servers into 2 groups and put current leader in minority
// make_partition partitions the servers, ensuring the current leader is in the minority.
func (cfg *config) make_partition() ([]int, []int) {
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
_, l := cfg.Leader()
p1 := make([]int, cfg.n/2+1)
Expand Down Expand Up @@ -379,15 +377,12 @@ func make_config(t *testing.T, n int, unreliable bool, maxraftstate int) *config
return cfg
}

// rpcTotal returns the total number of RPC calls made.
func (cfg *config) rpcTotal() int {
return cfg.net.GetTotalCount()
}

/*
* Start a Test.
* Print the Test message.
*/

// begin starts a test and prints the test message.
func (cfg *config) begin(description string) {
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
fmt.Printf("%s ...\n", description)
cfg.t0 = time.Now()
Expand All @@ -404,15 +399,12 @@ func (cfg *config) begin(description string) {
}()
}

// op records an operation performed by a clerk.
func (cfg *config) op() {
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
atomic.AddInt32(&cfg.ops, 1)
}

/*
* End a Test -- the fact that we got here means there was no failure.
* Print the Passed message, and some performance numbers.
*/

// end concludes a test, prints performance numbers, and checks for timeouts.
func (cfg *config) end() {
ReshiAdavan marked this conversation as resolved.
Show resolved Hide resolved
atomic.AddInt32(&cfg.testNum, 1) // suppress two-minute timeout

Expand Down
Loading