Skip to content

Commit

Permalink
Merge pull request #4 from mailsac/ha
Browse files Browse the repository at this point in the history
High Availability Alpha

Notes and limitations:
1. servers will replicate packets with new command - must have 2 CLI flags
2. child logger for server/client
3. client has pool of servers and checks health
4. note the changes in client and server pre-built apps - slight changes to CLI flags
5. it's not perfect, but for short-lived metrics it should exceed 80/20 rule
  • Loading branch information
ruffrey authored Dec 18, 2021
2 parents 3ba268a + 044cef6 commit 949cb3c
Show file tree
Hide file tree
Showing 9 changed files with 496 additions and 92 deletions.
25 changes: 22 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ import (
)

const (
serverPort = 3509
// include just one server or comma-separated pool
serverIPPortPool = "127.0.0.1:3509,192.168.0.1:3509"
namespace = "default"
)

Expand All @@ -143,7 +144,7 @@ var (
)

func main() {
c := client.NewClient("127.0.0.1", serverPort, clientResponseTimeout, "very-secure3")
c := client.NewClient(serverIPPortPool, clientResponseTimeout, "very-secure3")
c.Listen(9001)

// seed some entries
Expand All @@ -169,6 +170,24 @@ Entries are grouped in a `namespace`.

See `server/server_test.go` for examples.

## High Availability / Failover

Rudimentary and experimental HA is possible via replication by using the `-p` peers list and `-i` self `IP:host` pair flags such as:
```
dracula-server -p "127.0.0.1:3509,127.0.0.1:3519,127.0.0.1:3529" -i 127.0.0.1:3529
```

where clients can connect to the pool and maintain a list of `-i` servers:
```
dracula-cli -i "127.0.0.1:3509,127.0.0.1:3519,127.0.0.1:3529" [...more flags]
```

All peers in the cluster are listed, as well as the self IP and host in the cluster. These flags tell the dracula server to replicate all PUT messages to peers.

In practice, replication only meets the use case of short-lived, imperfectly consistent metrics.

If you require exact replication across peers, this feature will not be tolerant to network partitioning and will not meet your needs.

## Limitations

Messages are sent over UDP and not reliable. The trade-off desired is speed. This project was initially implemented to
Expand All @@ -186,8 +205,8 @@ is running in a trusted environment.

## Roadmap

- High Availability
- Persistence
- Better support for high availability under network partitions
- Clients in other languages
- Retries
- Pipelining
Expand Down
144 changes: 104 additions & 40 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,15 @@ package client
import (
"errors"
"fmt"
"github.com/mailsac/dracula/client/serverpool"
"github.com/mailsac/dracula/client/waitingmessage"
"github.com/mailsac/dracula/protocol"
"io/ioutil"
"log"
"net"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -15,29 +21,66 @@ var (
ErrMessageTimedOut = errors.New("timed out waiting for message response")
ErrClientAlreadyInit = errors.New("client already initialized")
ErrCountReturnBytesTooShort = errors.New("too few bytes returned in count callback")
ErrNoHealthyServers = errors.New("no healthy dracula servers")
)

type Client struct {
conn *net.UDPConn
remoteServer *net.UDPAddr
// conn is this clients incoming listen connection
conn *net.UDPConn
// pool is the list of servers it will communciate with
pool *serverpool.Pool
//remoteServer *net.UDPAddr
messagesWaiting *waitingmessage.ResponseCache // byte is the expected response command type

messageIDCounter uint32
preSharedKey []byte

disposed bool
Debug bool
log *log.Logger
}

func NewClient(remoteServerIP string, remoteUDPPort int, timeout time.Duration, preSharedKey string) *Client {
return &Client{
remoteServer: &net.UDPAddr{
Port: remoteUDPPort,
IP: net.ParseIP(remoteServerIP),
},
func NewClient(remoteServerIPPortList string, timeout time.Duration, preSharedKey string) *Client {
var servers []*net.UDPAddr
parts := strings.Split(remoteServerIPPortList, ",")
if len(parts) < 1 {
panic("missing dracula server list on client init!")
}
for _, ipPort := range parts {
p := strings.Split(ipPort, ":")
if len(p) != 2 {
panic(fmt.Errorf("bad <ip:port> dracula client init %s", ipPort))
}
sport, err := strconv.Atoi(p[1])
if err != nil {
panic(fmt.Errorf("bad ip:<port> dracula client init %s", ipPort))
}
servers = append(servers, &net.UDPAddr{
IP: net.ParseIP(p[0]),
Port: sport,
})
}
c := &Client{
preSharedKey: []byte(preSharedKey),
messagesWaiting: waitingmessage.NewCache(timeout),
log: log.New(os.Stdout, "", 0),
}
c.pool = serverpool.NewPool(c, servers)

c.DebugDisable()
return c
}

func (c *Client) GetConn() *net.UDPConn {
return c.conn
}

func (c *Client) DebugEnable(prefix string) {
c.log.SetOutput(os.Stdout)
c.log.SetPrefix(prefix + " ")
}

func (c *Client) DebugDisable() {
c.log.SetOutput(ioutil.Discard)
}

func (c *Client) PendingRequests() int {
Expand All @@ -57,13 +100,14 @@ func (c *Client) Listen(localUDPPort int) error {
}
//defer conn.Close()
c.conn = conn
if c.Debug {
fmt.Printf("client listening %s\n", conn.LocalAddr().String())
}
c.log.Printf("client listening %s\n", conn.LocalAddr().String())

go c.handleResponsesForever()
go c.handleTimeouts()

c.pool.Listen()
c.log.Printf("client created server pool %v\n", c.pool.ListServers())

return nil
}

Expand All @@ -75,6 +119,9 @@ func (c *Client) Close() error {
c.disposed = true
c.messagesWaiting.Dispose()

if c.pool != nil {
c.pool.Dispose()
}
if c.conn != nil {
err = c.conn.Close()
if err != nil {
Expand Down Expand Up @@ -102,31 +149,29 @@ func (c *Client) handleResponsesForever() {
message := make([]byte, protocol.PacketSize)
_, remote, err := c.conn.ReadFromUDP(message[:])
if err != nil {
fmt.Println("client read error:", err)
c.log.Println("client read error:", err)
continue
}
packet, err := protocol.ParsePacket(message)
if err != nil {
if packet != nil && packet.MessageID > 0 {
fmt.Println("client parse packet error but has message id:", packet.MessageID, remote, err, message)
c.log.Println("client parse packet error but has message id:", packet.MessageID, remote, err, message)
} else {
fmt.Println("client received invalid packet from", remote, err, message)
c.log.Println("client received invalid packet from", remote, err, message)
continue
}
}

if c.Debug {
fmt.Println("client received packet:", remote, string(packet.Command), packet.MessageID, packet.NamespaceString(), packet.DataValueString())
}
c.log.Println("client received packet:", remote, string(packet.Command), packet.MessageID, packet.NamespaceString(), packet.DataValueString())

cb, err := c.messagesWaiting.Pull(packet.MessageID)
if err != nil {
fmt.Println("client message not expected:", packet.Command, packet.MessageID, packet.NamespaceString(), err)
c.log.Println("client message not expected:", packet.Command, packet.MessageID, packet.NamespaceString(), err)
continue
}

if !protocol.IsResponseCmd(packet.Command) {
fmt.Println("client message not response command:", packet.Command, packet.MessageID, packet.NamespaceString())
c.log.Println("client message not response command:", packet.Command, packet.MessageID, packet.NamespaceString())
continue
}

Expand All @@ -141,7 +186,7 @@ func (c *Client) handleResponsesForever() {
continue
}

fmt.Println("client unhandled valid response!", packet.Command, packet.MessageID, packet.NamespaceString())
c.log.Println("client unhandled valid response!", packet.Command, packet.MessageID, packet.NamespaceString())
}
}

Expand All @@ -161,9 +206,7 @@ func (c *Client) Count(namespace, entryKey string) (int, error) {
if e != nil {
err = e
} else if len(b) < 4 {
if c.Debug {
fmt.Println("client received too few bytes:", b)
}
c.log.Println("client received too few bytes:", b)
err = ErrCountReturnBytesTooShort
} else {
output = protocol.Uint32FromBytes(b[0:4])
Expand All @@ -179,6 +222,26 @@ func (c *Client) Count(namespace, entryKey string) (int, error) {
return int(output), err
}

// Healthcheck implements serverpool.Checker
func (c *Client) Healthcheck(specificServer *net.UDPAddr) error {
messageID := c.makeMessageID()
var wg sync.WaitGroup
var err error
cb := func(b []byte, e error) {
if e != nil {
err = e
}
wg.Done()
}
wg.Add(1)
// callback has been setup, now make the request
p := protocol.NewPacketFromParts(protocol.CmdCount, messageID, []byte("server_healthcheck_"+specificServer.String()), []byte("check"), c.preSharedKey)
c._send(p, specificServer, cb)

wg.Wait() // wait for callback to be called
return err
}

// CountNamespace (expensive) returns the number of key entries across all keys in a namespace.
func (c *Client) CountNamespace(namespace string) (int, error) {
messageID := c.makeMessageID()
Expand All @@ -189,9 +252,7 @@ func (c *Client) CountNamespace(namespace string) (int, error) {
if e != nil {
err = e
} else if len(b) < 4 {
if c.Debug {
fmt.Println("client received too few bytes:", b)
}
c.log.Println("client received too few bytes:", b)
err = ErrCountReturnBytesTooShort
} else {
output = protocol.Uint32FromBytes(b[0:4])
Expand All @@ -217,9 +278,7 @@ func (c *Client) CountServer() (int, error) {
if e != nil {
err = e
} else if len(b) < 4 {
if c.Debug {
fmt.Println("client received too few bytes:", b)
}
c.log.Println("client received too few bytes:", b)
err = ErrCountReturnBytesTooShort
} else {
output = protocol.Uint32FromBytes(b[0:4])
Expand All @@ -241,9 +300,7 @@ func (c *Client) Put(namespace, value string) error {
var err error
cb := func(b []byte, e error) {
err = e
if err != nil {
fmt.Println(e)
}
c.log.Println("client put error", e)
wg.Done()
}
wg.Add(1)
Expand All @@ -255,10 +312,8 @@ func (c *Client) Put(namespace, value string) error {
return err
}

func (c *Client) sendOrCallbackErr(packet *protocol.Packet, cb waitingmessage.Callback) {
if c.Debug {
fmt.Println("client sending packet:", c.remoteServer, string(packet.Command), packet.MessageID, packet.NamespaceString(), packet.DataValueString())
}
func (c *Client) _send(packet *protocol.Packet, remoteServer *net.UDPAddr, cb waitingmessage.Callback) {
c.log.Println("client sending packet:", remoteServer, string(packet.Command), packet.MessageID, packet.NamespaceString(), packet.DataValueString())

b, err := packet.Bytes()
if err != nil {
Expand All @@ -269,17 +324,17 @@ func (c *Client) sendOrCallbackErr(packet *protocol.Packet, cb waitingmessage.Ca

err = c.messagesWaiting.Add(packet.MessageID, cb)
if err != nil {
fmt.Println("client failed adding waiting message!", packet.MessageID)
c.log.Println("client failed adding waiting message!", packet.MessageID)
cb([]byte{}, err)
return
}

_, err = c.conn.WriteToUDP(b, c.remoteServer)
_, err = c.conn.WriteToUDP(b, remoteServer)
if err != nil {
// immediate failure, handle here
reCall, pullErr := c.messagesWaiting.Pull(packet.MessageID)
if pullErr != nil {
fmt.Println("client failed callback could not be called!", c.remoteServer, string(packet.Command), packet.MessageID, packet.NamespaceString(), packet.DataValueString())
c.log.Println("client failed callback could not be called!", remoteServer, string(packet.Command), packet.MessageID, packet.NamespaceString(), packet.DataValueString())
reCall = cb
}
reCall([]byte{}, err)
Expand All @@ -288,3 +343,12 @@ func (c *Client) sendOrCallbackErr(packet *protocol.Packet, cb waitingmessage.Ca

// ok
}

func (c *Client) sendOrCallbackErr(packet *protocol.Packet, cb waitingmessage.Callback) {
remoteServer := c.pool.Choose()
if remoteServer == nil {
cb([]byte{}, ErrNoHealthyServers)
return
}
c._send(packet, remoteServer, cb)
}
Loading

0 comments on commit 949cb3c

Please sign in to comment.