Skip to content

Commit

Permalink
Initial version
Browse files Browse the repository at this point in the history
  • Loading branch information
Ramesh VK committed Oct 27, 2019
1 parent e6729c3 commit 8dfa2e8
Show file tree
Hide file tree
Showing 13 changed files with 1,194 additions and 1 deletion.
24 changes: 23 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,24 @@
# cluster
libraries for cluster/distributed computing

[![Status](https://travis-ci.com/tvastar/cluster.svg?branch=master)](https://travis-ci.com/tvastar/cluster?branch=master)
[![GoDoc](https://godoc.org/github.com/tvastar/cluster?status.svg)](https://godoc.org/github.com/tvastar/cluster)
[![codecov](https://codecov.io/gh/tvastar/cluster/branch/master/graph/badge.svg)](https://codecov.io/gh/tvastar/cluster)
[![Go Report Card](https://goreportcard.com/badge/github.com/tvastar/cluster)](https://goreportcard.com/report/github.com/tvastar/cluster)

Librarie for building distributed services

## Partition

The [partition](https://godoc.org/github.com/tvastar/cluster/pkg/partition) package supports self-sharding: where a cluster of
servers accept any request but then hash the request and route it to
one of the servers in the request based on the hash.

This combines the "gateway which shards the request to servers" with
the "servers which serve a shard" into one single cluster.

Note that this sharding is not perfect but it is quite useful when
sharding improves performance (by caching requests) or allows serial
execution (to avoid redoing some work). This is not useful when
sharding is needed for correctness (that requires some form of
distributed locking).

40 changes: 40 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
module github.com/tvastar/cluster

go 1.12

require (
cloud.google.com/go v0.47.0 // indirect
cloud.google.com/go/bigquery v1.1.0 // indirect
cloud.google.com/go/storage v1.1.2 // indirect
github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 // indirect
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/creack/pty v1.1.9 // indirect
github.com/go-redis/redis/v7 v7.0.0-beta.4
github.com/golang/groupcache v0.0.0-20191025150517-4a4ac3fbac33 // indirect
github.com/golang/protobuf v1.3.2
github.com/gomodule/redigo v2.0.0+incompatible // indirect
github.com/google/go-cmp v0.3.1 // indirect
github.com/google/pprof v0.0.0-20191025152101-a8b9f9d2d3ce // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/kr/pty v1.1.8 // indirect
github.com/onsi/ginkgo v1.10.2 // indirect
github.com/onsi/gomega v1.7.0 // indirect
github.com/rogpeppe/go-internal v1.5.0 // indirect
github.com/yuin/gopher-lua v0.0.0-20190514113301-1cd887cd7036 // indirect
go.opencensus.io v0.22.1 // indirect
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 // indirect
golang.org/x/exp v0.0.0-20191024150812-c286b889502e // indirect
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 // indirect
golang.org/x/mobile v0.0.0-20191025110607-73ccc5ba0426 // indirect
golang.org/x/net v0.0.0-20191027093000-83d349e8ac1a // indirect
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e // indirect
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
golang.org/x/tools v0.0.0-20191026034945-b2104f82a97d // indirect
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 // indirect
google.golang.org/appengine v1.6.5 // indirect
google.golang.org/grpc v1.24.0
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.2.4 // indirect
)
210 changes: 210 additions & 0 deletions go.sum

Large diffs are not rendered by default.

26 changes: 26 additions & 0 deletions pkg/partition/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (C) 2019 rameshvk. All rights reserved.
// Use of this source code is governed by a MIT-style license
// that can be found in the LICENSE file.

package partition

type errors []error

func (e *errors) check(err error) {
if err != nil {
*e = append(*e, err)
}
}

func (e *errors) toError() error {
if len(*e) == 0 {
return nil
}
return (*e)[0]
}

type IncorrectPartitionError struct{}

func (e IncorrectPartitionError) Error() string {
return "incorrect partition, retry later"
}
77 changes: 77 additions & 0 deletions pkg/partition/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (C) 2019 rameshvk. All rights reserved.
// Use of this source code is governed by a MIT-style license
// that can be found in the LICENSE file.

package partition_test

import (
"context"
"fmt"
"time"

"github.com/alicebob/miniredis"
"github.com/tvastar/cluster/pkg/partition"
)

func Example() {
minir, err := miniredis.Run()
if err != nil {
panic(err)
}
defer minir.Close()

opt1 := partition.WithEndpointRegistry(partition.NewRedisRegistry(minir.Addr(), "prefix_"))
opt2 := partition.WithEndpointRegistry(partition.NewRedisRegistry(minir.Addr(), "prefix_"))

ctx := context.Background()
h1 := handler(1)
h2 := handler(2)

part1, err := partition.New(ctx, ":2222", h1, opt1)
if err != nil {
panic(err)
}
defer part1.Close()

part2, err := partition.New(ctx, ":2223", h2, opt2)
if err != nil {
panic(err)
}
defer part2.Close()

// wait for both endpoints to be registered
time.Sleep(100 * time.Millisecond)

if _, err := part1.Run(ctx, 555, []byte("hello")); err != nil {
panic(err)
}

if _, err := part2.Run(ctx, 555, []byte("hello")); err != nil {
panic(err)
}

if _, err := part1.Run(ctx, 22222, []byte("hello")); err != nil {
panic(err)
}

if _, err := part2.Run(ctx, 22222, []byte("hello")); err != nil {
panic(err)
}

// Output:
// [1] Run(555, hello)
// [1] Run(555, hello)
// [2] Run(22222, hello)
// [2] Run(22222, hello)
}

type handler int

func (h handler) Run(ctx context.Context, hash uint64, input []byte) ([]byte, error) {
fmt.Printf("[%d] Run(%d, %s)\n", int(h), hash, string(input))
return input, nil
}

func (h handler) Close() error {
return nil
}
83 changes: 83 additions & 0 deletions pkg/partition/hashring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (C) 2019 rameshvk. All rights reserved.
// Use of this source code is governed by a MIT-style license
// that can be found in the LICENSE file.

package partition

import (
"context"
"fmt"
"hash/crc32"
"sort"
"strconv"
"sync"
)

func NewHashRing() func(ctx context.Context, list []string, hash uint64) string {
ring := &hashring{factor: 1000}
return ring.Pick
}

type hashring struct {
factor int // how many time to duplicate a single key for getting uniform spread

sync.Mutex
list []string
hashes []uint32 // sorted
lookup map[uint32]string // hash => list entry
}

func (h *hashring) Pick(ctx context.Context, list []string, hash uint64) string {
if len(list) == 0 {
return ""
}

hash32 := crc32.ChecksumIEEE([]byte(fmt.Sprint(hash)))
hashes, lookup := h.getSortedHashes(list)

idx := sort.Search(len(hashes), func(i int) bool { return hashes[i] >= hash32 })
if idx == len(hashes) {
idx = 0
}

return lookup[hashes[idx]]
}

func (h *hashring) getSortedHashes(list []string) ([]uint32, map[uint32]string) {
h.Lock()
defer h.Unlock()

if h.listsDifferent(h.list, list) {
h.list = list
h.hashes, h.lookup = h.calculateSortedHashes(list)
}
return h.hashes, h.lookup
}

func (h *hashring) calculateSortedHashes(list []string) ([]uint32, map[uint32]string) {
hashes := make([]uint32, len(list)*h.factor)
dict := map[uint32]string{}
for kk := range list {
for ff := 0; ff < h.factor; ff++ {
data := []byte(strconv.Itoa(ff*12394+1) + "-" + list[kk])
idx := kk*h.factor + ff
hashes[idx] = crc32.ChecksumIEEE(data)
dict[hashes[idx]] = list[kk]
}
}
sort.Slice(hashes, func(i, j int) bool { return hashes[i] < hashes[j] })
return hashes, dict
}

func (h *hashring) listsDifferent(l1, l2 []string) bool {
if len(l1) != len(l2) {
return true
}
for kk := range l1 {
if l1[kk] != l2[kk] {
return true
}
}

return false
}
Loading

0 comments on commit 8dfa2e8

Please sign in to comment.