Skip to content

Commit

Permalink
Switch to Max Random Weight hashing instead of the complicated consis…
Browse files Browse the repository at this point in the history
…tent hashing
  • Loading branch information
Ramesh VK committed Oct 28, 2019
1 parent 44f0f67 commit 4919d5a
Show file tree
Hide file tree
Showing 4 changed files with 281 additions and 9 deletions.
26 changes: 17 additions & 9 deletions pkg/partition/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
// Use of this source code is governed by a MIT-style license
// that can be found in the LICENSE file.

// Package partition provides utilities to partition request to a cluster.
// Package partition provides utilities to partition requests to a cluster.
//
// Every server in the cluster receives requests meant for any specific
// Every server in the cluster receives requests meant for any
// partition but these requests are then routed to the right partition
// where it is handled.
//
// Each server in the cluster also serves traffic from other servers
// for the partitions it is responsible for.
// destined for its own partition.
//
// Usage;
//
Expand Down Expand Up @@ -38,11 +38,19 @@
//
// The effectiveness of this strategy depends on how uniform the hash
// is and the mapping strategy. The default mechanism to map hashes
// to specific servers in the cluster is to use a hashring but this
// can be overridden using the WithPicker option.
// to specific servers in the cluster is to use a
// highest-random-weight algorithm which can be overridden using the
// WithPicker option.
//
// The inter-server communication is via RPC and this also can be
// configured to alternate mechanisms using the WithNetwork option.
//
//
// The requests and responses are expected to be byte slices. For
// stronger types, protobufs can be used to serialize structures or
// the runner package (see
// https://godoc.org/github.com/tvastar/cluster/pkg/partition/runner)
// for solution using gob-encoding and reflection.
package partition

import (
Expand Down Expand Up @@ -81,7 +89,7 @@ var defaultConfig = config{
}

// New returns a RunCloser which targets requests to
// specific endpoints based on the hash provided to thee request.
// specific endpoints based on the hash provided to the request.
//
// This automatically adds the provided address to the cluster. The
// provided handler is used to serve requests meant for the local
Expand All @@ -91,7 +99,7 @@ var defaultConfig = config{
// be specified -- no defaults are used for it.
func New(ctx context.Context, addr string, handler Runner, opts ...Option) (RunCloser, error) {
s := &state{config: defaultConfig, addr: addr, handler: handler}
s.config.pickEndpoint = NewHashRing()
s.config.pickEndpoint = NewPicker()
for _, opt := range opts {
opt(&s.config)
}
Expand Down Expand Up @@ -122,8 +130,8 @@ func WithEndpointRegistry(r EndpointRegistry) Option {

// WithPicker specifies how the pick endpoints based on the hash.
//
// The default algorithm is to use a consistent hash using a hash ring
// (via NewHashRing()).
// The default algorithm is to use the highest random weight
// algorithm (via NewPicker())
func WithPicker(picker func(ctx context.Context, list []string, hash uint64) string) Option {
return func(c *config) {
c.pickEndpoint = picker
Expand Down
40 changes: 40 additions & 0 deletions pkg/partition/picker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (C) 2019 rameshvk. All rights reserved.
// Use of this source code is governed by a MIT-style license
// that can be found in the LICENSE file.

package partition

import (
"context"
"hash/crc32"
"strconv"
)

// NewPicker returns a picker which uses a highest random weight algorithm
//
// See https://en.wikipedia.org/wiki/Rendezvous_hashing
func NewPicker() func(ctx context.Context, list []string, hash uint64) string {
return hrwPicker
}

func hrwPicker(ctx context.Context, list []string, hash uint64) string {
var pick string
var score uint32

h := crc32.NewIEEE()
for _, candidate := range list {
checkWrite(h.Write([]byte(strconv.FormatUint(hash, 16))))
checkWrite(h.Write([]byte(candidate)))
if x := h.Sum32(); x > score {
pick, score = candidate, x
}
h.Reset()
}
return pick
}

func checkWrite(n int, err error) {
if err != nil {
panic(err)
}
}
67 changes: 67 additions & 0 deletions pkg/partition/runner/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (C) 2019 rameshvk. All rights reserved.
// Use of this source code is governed by a MIT-style license
// that can be found in the LICENSE file.

package runner_test

import (
"context"
"encoding/gob"
"fmt"

"github.com/alicebob/miniredis"
"github.com/tvastar/cluster/pkg/partition"
"github.com/tvastar/cluster/pkg/partition/runner"
)

func Example() {
opt, cleanup := exampleSetup()
defer cleanup()

h := runner.Handlers{}
intf := h.Register(func(ctx context.Context, r IntRequest) (string, error) {
return "hello", nil
}).(func(ctx context.Context, r IntRequest) (string, error))

strf := h.Register(func(ctx context.Context, r StringRequest) (string, error) {
return string(r), nil
}).(func(ctx context.Context, r StringRequest) (string, error))

h.Start(context.Background(), ":2000", opt)

s, err := intf(context.Background(), IntRequest(0))
fmt.Println("Got", s, err)

s, err = strf(context.Background(), StringRequest("boo"))
fmt.Println("Got", s, err)

// Output:
// Got hello <nil>
// Got boo <nil>

}

type IntRequest int

func (i IntRequest) Hash() uint64 {
return 0
}

type StringRequest string

func (s StringRequest) Hash() uint64 {
return 5
}

func exampleSetup() (partition.Option, func()) {
gob.Register(IntRequest(0))
gob.Register(StringRequest("hello"))

minir, err := miniredis.Run()
if err != nil {
panic(err)
}
opt := partition.WithEndpointRegistry(partition.NewRedisRegistry(minir.Addr(), "prefix_"))

return opt, func() { minir.Close() }
}
157 changes: 157 additions & 0 deletions pkg/partition/runner/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright (C) 2019 rameshvk. All rights reserved.
// Use of this source code is governed by a MIT-style license
// that can be found in the LICENSE file.

// Package runner implements a multiplexer on top of the partition package.
//
// This allows the requests, responses to be more strongly typed than
// raw byte slices. The serialization protocol used is gob-encoding.
//
// This package makes heavy use of reflection.
//
// See the example for sample usage.
package runner

import (
"bytes"
"context"
"encoding/gob"
"reflect"
"strings"

"github.com/tvastar/cluster/pkg/partition"
)

// Handlers allows multiplexing multiple handlers with the same
// partitioner.
//
// Handlers must have the type func(Context, Req) (Res, error)
//
// Further, the Req type must implement a method `Hash() uint64`.
//
// This uses gob-encoding and so the Req/Res types must be registered
// via `gob.Register(sampleReqValue)`
//
// See example for usage details
type Handlers struct {
part partition.RunCloser
fns map[string]reflect.Value
}

// Register registers function as a handler. It returns a function
// (of the same signature as the input function).
//
// The fn must have the type func(Context, Req) (Res, error)
//
// Further, the Req type must implement a method `Hash() uint64`.
//
// The function returned can be used to make partitioned calls
// (i.e. the calls are automatically routed to the right endpoint).
func (h *Handlers) Register(fn interface{}) interface{} {
if h.fns == nil {
h.fns = map[string]reflect.Value{}
}

v := reflect.ValueOf(fn)
t := v.Type()
if _, ok := h.fns[t.String()]; ok {
panic("duplicate function registration")
}
// TODO: t.String() is not guaranteed to be unique yo
h.fns[t.String()] = v
return h.wrap(t)
}

// Start creates a new partitioner under the covers and registers all
// the handlers with it. All the functions returned by the Register
// calls can now be used.
func (h *Handlers) Start(ctx context.Context, address string, opts ...partition.Option) error {
var err error
h.part, err = partition.New(ctx, address, runner(h.run), opts...)
return err
}

// Stop closes the partitioner
func (h *Handlers) Stop() error {
return h.part.Close()
}

func (h *Handlers) run(ctx context.Context, hash uint64, input []byte) ([]byte, error) {
s := string(input)
idx := strings.Index(s, "|")
return h.dispatch(ctx, h.fns[s[:idx]], []byte(s[idx+1:]))
}

func (h *Handlers) dispatch(ctx context.Context, fn reflect.Value, input []byte) ([]byte, error) {
t := fn.Type()
arg := reflect.New(t.In(1))
err := gob.NewDecoder(bytes.NewReader(input)).Decode(arg.Interface())
if err != nil {
return nil, err
}

results := fn.Call([]reflect.Value{reflect.ValueOf(ctx), arg.Elem()})

if err, _ = results[1].Interface().(error); err != nil {
return nil, err
}

var buf bytes.Buffer
err = gob.NewEncoder(&buf).Encode(results[0].Interface())
return buf.Bytes(), err
}

func (h *Handlers) wrap(t reflect.Type) interface{} {
if t.Kind() != reflect.Func {
panic("not a function")
}
if t.NumIn() != 2 || t.NumOut() != 2 {
panic("fn not of form func(context.Context, Request) (Response, error)")
}
if t.In(0) != reflect.TypeOf((*context.Context)(nil)).Elem() {
panic("fn 1st arg is not context.Context type")
}
if !t.In(1).Implements(reflect.TypeOf((*hasher)(nil)).Elem()) {
panic("fn 2nd arg does not implement Hash() uint64")
}
if t.Out(1) != reflect.TypeOf((*error)(nil)).Elem() {
panic("fn 2nd result must be error")
}

return reflect.MakeFunc(t, func(args []reflect.Value) (results []reflect.Value) {
var buf bytes.Buffer
var resp []byte

result := reflect.New(t.Out(0))
results = []reflect.Value{reflect.Zero(t.Out(0)), reflect.Zero(t.Out(1))}

buf.WriteString(t.String() + "|")
enc := gob.NewEncoder(&buf)
err := enc.Encode(args[1].Interface())
if err == nil {
hash := args[1].Interface().(hasher).Hash()
ctx := args[0].Interface().(context.Context)
resp, err = h.part.Run(ctx, hash, buf.Bytes())
}
if err == nil {
err = gob.NewDecoder(bytes.NewReader(resp)).Decode(result.Interface())
}

if err != nil {
results[1] = reflect.ValueOf(err)
} else {
results[0] = result.Elem()
}
return results
}).Interface()
}

type runner func(ctx context.Context, hash uint64, input []byte) ([]byte, error)

func (r runner) Run(ctx context.Context, hash uint64, input []byte) ([]byte, error) {
return r(ctx, hash, input)
}

type hasher interface {
Hash() uint64
}

0 comments on commit 4919d5a

Please sign in to comment.