-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpartition.go
149 lines (135 loc) · 4.66 KB
/
partition.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Copyright (C) 2019 rameshvk. All rights reserved.
// Use of this source code is governed by a MIT-style license
// that can be found in the LICENSE file.
// Package partition provides utilities to partition requests to a cluster.
//
// Every server in the cluster receives requests meant for any
// partition but these requests are then routed to the right partition
// where it is handled.
//
// Each server in the cluster also serves traffic from other servers
// destined for its own partition.
//
// Usage:
//
// Every server in the cluster creates a router at initialization
// time:
//
// endpointRegistry = partition.WithEndpointRegistry(...)
// router, err := partition.New(ctx, "ownIP:2222", handler, endpointRegistry)
//
// The endpoint registry keeps track of all the servers in the cluster
// and their local addresses for inter-server communication.
//
// The handler is the server's own implementation of requests routed to
// it by other servers.
//
// When an external request comes in, the server would hash the
// request and then use the router to execute it on the right server
// in its cluster:
//
// resp, err := router.Run(ctx, hash, request)
//
//
// Note that this call would end up being executed on another server
// in the cluster (or on the local server itself if this hash is
// mapped to the local server).
//
// The effectiveness of this strategy depends on how uniform the hash
// is and the mapping strategy. The default mechanism to map hashes
// to specific servers in the cluster is to use a
// highest-random-weight algorithm which can be overridden using the
// WithPicker option.
//
// The inter-server communication is via RPC and this also can be
// configured to alternate mechanisms using the WithNetwork option.
//
//
// The requests and responses are expected to be byte slices. For
// stronger types, protobufs can be used to serialize structures or
// the runner package (see
// https://godoc.org/github.com/tvastar/cluster/pkg/partition/runner)
// for a solution using gob-encoding and reflection.
package partition
import (
"context"
"io"
)
// EndpointRegistry manages the live list of endpoints in a cluster.
//
// The partition package does not cache the results of ListEndpoints,
// so any caching requirement should be handled by the registry
// implementation itself.
type EndpointRegistry interface {
	// RegisterEndpoint adds addr to the cluster's endpoint list;
	// closing the returned io.Closer presumably unregisters it —
	// TODO(review): confirm against the registry implementations.
	RegisterEndpoint(ctx context.Context, addr string) (io.Closer, error)
	// ListEndpoints returns the current set of endpoint addresses.
	// The refresh flag presumably forces a non-cached lookup — verify
	// with the concrete implementations.
	ListEndpoints(ctx context.Context, refresh bool) ([]string, error)
}
// Network implements the communication network between servers in the cluster.
type Network interface {
	// DialClient connects to the server at addr, returning a client
	// whose Run forwards requests to that server and whose Close
	// releases the connection.
	DialClient(ctx context.Context, addr string) (RunCloser, error)
	// RegisterServer starts serving requests sent to addr, delegating
	// each to handler; closing the returned io.Closer stops serving.
	RegisterServer(ctx context.Context, addr string, handler Runner) (io.Closer, error)
}
// Runner executes a single request with the specified hash.
type Runner interface {
	// Run executes the opaque request payload in input for the given
	// hash and returns the response bytes.
	Run(ctx context.Context, hash uint64, input []byte) ([]byte, error)
}
// RunCloser combines Runner and io.Closer.
type RunCloser interface {
	Runner
	io.Closer
}
// defaultConfig holds the defaults applied by New before options run:
// an RPC-based Network. Note that no default EndpointRegistry is set
// here — it must be supplied via WithEndpointRegistry.
var defaultConfig = config{
	Network: NewRPCNetwork(nil),
}
// New returns a RunCloser that routes each request to the endpoint
// selected for its hash, using handler to serve requests meant for
// the local server.
//
// The provided address is automatically added to the cluster.
//
// Picker and Network fall back to defaults (NewPicker and an RPC
// network, respectively), but an EndpointRegistry has no default and
// must be supplied via WithEndpointRegistry.
func New(ctx context.Context, addr string, handler Runner, opts ...Option) (RunCloser, error) {
	// Start from the package defaults, install the default picker,
	// then let each option override as needed.
	cfg := defaultConfig
	cfg.pickEndpoint = NewPicker()
	for _, apply := range opts {
		apply(&cfg)
	}
	s := &state{config: cfg, addr: addr, handler: handler}
	return s.init(ctx)
}
// config collects the pluggable pieces of the partition router; the
// With* options mutate it before initialization.
type config struct {
	// EndpointRegistry tracks cluster membership. Required — there is
	// no default.
	EndpointRegistry
	// Network carries inter-server traffic; defaults to RPC (see
	// defaultConfig).
	Network
	// pickEndpoint maps a request hash to one of the listed endpoint
	// addresses; defaults to the picker returned by NewPicker.
	pickEndpoint func(ctx context.Context, list []string, hash uint64) string
}
// Option configures the partitioning algorithm.
//
// Options are applied in order by New, so a later option overrides an
// earlier one that sets the same field.
type Option func(c *config)
// WithEndpointRegistry specifies how endpoint discovery and
// registration happen.
//
// There is no default endpoint registry; one must always be provided.
//
// RedisRegistry implements a Redis-based endpoint registry.
func WithEndpointRegistry(reg EndpointRegistry) Option {
	return func(cfg *config) { cfg.EndpointRegistry = reg }
}
// WithPicker specifies how to pick an endpoint for a given hash.
//
// The default is the highest-random-weight algorithm provided by
// NewPicker().
func WithPicker(pick func(ctx context.Context, list []string, hash uint64) string) Option {
	return func(cfg *config) { cfg.pickEndpoint = pick }
}
// WithNetwork provides the network implementation servers in the
// cluster use to route requests to each other.
//
// The default mechanism is RPC (via NewRPCNetwork).
func WithNetwork(n Network) Option {
	return func(cfg *config) { cfg.Network = n }
}