From aa9bf273d18a2b21b03b56f1d2be8f3b1de24ecb Mon Sep 17 00:00:00 2001 From: iamxy Date: Mon, 26 Sep 2016 12:14:26 +0800 Subject: [PATCH] pump: implement gRPC server and make code more solid. (#9) * drainer: init new cmd * pump: register node to etcd and refresh state by heartbeat. * pump: implement grpc methods. * *: some clean up. * *: improve Makefile and make vet happy * *: use `glide` to manage vendor packages. * *: remove blank lines. * *: address comment * vendor: remove vendor temporarily in this PR * node: fix a type bug. --- Makefile | 62 +++++++++------ cmd/drainer/README.md | 0 cmd/drainer/main.go | 1 + cmd/pump/main.go | 2 +- gitcookie.sh | 10 +++ glide.lock | 57 ++++++++++++++ glide.yaml | 23 ++++++ pkg/etcd/etcd.go | 73 +++++++++++++----- pkg/etcd/etcd_test.go | 3 +- pkg/file/lock_test.go | 1 - pkg/flags/flag.go | 2 +- pkg/flags/urls.go | 6 +- pkg/types/urls.go | 14 +++- pump/binlogger.go | 10 +-- pump/config.go | 68 +++++++++-------- pump/config_test.go | 24 +++--- pump/node.go | 171 +++++++++++++++++++++++++++++++++-------- pump/registry.go | 78 ++++++++++--------- pump/registry_test.go | 42 ++++------- pump/server.go | 172 +++++++++++++++++++++++++++++++++++++++--- pump/util.go | 51 +------------ pump/version.go | 7 +- 22 files changed, 625 insertions(+), 252 deletions(-) create mode 100644 cmd/drainer/README.md create mode 100644 cmd/drainer/main.go create mode 100644 gitcookie.sh create mode 100644 glide.lock create mode 100644 glide.yaml diff --git a/Makefile b/Makefile index 6f1a49dcd..09d5774ce 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,16 @@ +### Makefile for tidb-binlog + + +# Ensure GOPATH is set before running build process. +ifeq "$(GOPATH)" "" + $(error Please set the environment variable GOPATH before running `make`) +endif + +CURDIR := $(shell pwd) +export GOPATH := $(CURDIR)/_vendor:$(GOPATH) +path_to_add := $(addsuffix /bin,$(subst :,/bin:,$(GOPATH))) +export PATH := $(path_to_add):$(PATH) + ARCH := "`uname -s`" LINUX := "Linux" MAC := "Darwin" @@ -7,71 +20,72 @@ FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor') LDFLAGS += -X "github.com/pingcap/tidb-binlog/pump.BuildTS=$(shell date -u '+%Y-%m-%d %I:%M:%S')" LDFLAGS += -X "github.com/pingcap/tidb-binlog/pump.GitSHA=$(shell git rev-parse HEAD)" -default: build +default: build buildsucc + +buildsucc: + @echo Build TiDB Binlog Utils successfully! all: dev install dev: build check test -build: pump server +build: pump server drainer proto/pump.pb.go: proto/pump.proto sh proto/generate.sh pump: proto/pump.pb.go - rm -rf vendor && ln -s _vendor/vendor vendor GO15VENDOREXPERIMENT=1 go build -ldflags '$(LDFLAGS)' -o bin/pump cmd/pump/main.go - rm -rf vendor server: proto/pump.pb.go - rm -rf vendor && ln -s _vendor/vendor vendor GO15VENDOREXPERIMENT=1 go build -ldflags '$(LDFLAGS)' -o bin/binlog-server cmd/binlog-server/main.go - rm -rf vendor + +drainer: + GO15VENDOREXPERIMENT=1 go build -ldflags '$(LDFLAGS)' -o bin/drainer cmd/drainer/main.go proto: proto/pump.pb.go install: - rm -rf vendor && ln -s _vendor/vendor vendor go install ./... - rm -rf vendor test: - rm -rf vendor && ln -s _vendor/vendor vendor @export log_level=error;\ - go test -cover $(PACKAGES) - rm -rf vendor + GO15VENDOREXPERIMENT=1 go test -cover $(PACKAGES) fmt: go fmt ./... @goimports -w $(FILES) check: + bash gitcookie.sh go get github.com/golang/lint/golint - go tool vet . 2>&1 | grep -vE 'vendor|render.Delims' | awk '{print} END{if(NR>0) {exit 1}}' - go tool vet --shadow . 2>&1 | grep -vE 'vendor' | awk '{print} END{if(NR>0) {exit 1}}' - golint ./... 2>&1 | grep -vE 'vendor' | awk '{print} END{if(NR>0) {exit 1}}' - gofmt -s -l . 2>&1 | grep -vE 'vendor' | awk '{print} END{if(NR>0) {exit 1}}' + @echo "vet" + @ go tool vet $(FILES) 2>&1 | awk '{print} END{if(NR>0) {exit 1}}' + @echo "vet --shadow" + @ go tool vet --shadow $(FILES) 2>&1 | awk '{print} END{if(NR>0) {exit 1}}' + @echo "golint" + @ golint ./... 2>&1 | grep -vE '\.pb\.go' | awk '{print} END{if(NR>0) {exit 1}}' + @echo "gofmt (simplify)" + @ gofmt -s -l -w $(FILES) 2>&1 | awk '{print} END{if(NR>0) {exit 1}}' update: which glide >/dev/null || curl https://glide.sh/get | sh which glide-vc || go get -v -u github.com/sgotti/glide-vc - rm -r vendor && mv _vendor/vendor vendor || true + rm -r vendor && mv _vendor/src vendor || true rm -rf _vendor ifdef PKG - glide --verbose get --strip-vendor --skip-test ${PKG} + glide get -s -v --skip-test ${PKG} else - glide --verbose update --strip-vendor --skip-test + glide update -s -v --skip-test endif @echo "removing test files" glide vc --only-code --no-tests mkdir -p _vendor - mv vendor _vendor/vendor + mv vendor _vendor/src clean: - find . -type s -exec rm {} \; - rm -rf vendor && ln -s _vendor/vendor vendor - go clean ./... - rm -rf vendor + go clean -i ./... + rm -rf *.out -.PHONY: build test check update clean pump server fmt proto +.PHONY: build test check update clean pump server drainer fmt proto diff --git a/cmd/drainer/README.md b/cmd/drainer/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/cmd/drainer/main.go b/cmd/drainer/main.go new file mode 100644 index 000000000..1e3993661 --- /dev/null +++ b/cmd/drainer/main.go @@ -0,0 +1 @@ +package drainer diff --git a/cmd/pump/main.go b/cmd/pump/main.go index ac2c69cb7..ddf052901 100644 --- a/cmd/pump/main.go +++ b/cmd/pump/main.go @@ -23,7 +23,7 @@ func main() { os.Exit(2) } - pump.InitLogger(cfg) + pump.InitLogger(cfg.Debug) pump.PrintVersionInfo() sc := make(chan os.Signal, 1) diff --git a/gitcookie.sh b/gitcookie.sh new file mode 100644 index 000000000..ea4bae174 --- /dev/null +++ b/gitcookie.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash +touch ~/.gitcookies +chmod 0600 ~/.gitcookies + +git config --global http.cookiefile ~/.gitcookies + +tr , \\t <<\__END__ >>~/.gitcookies +go.googlesource.com,FALSE,/,TRUE,2147483647,o,git-shenli.pingcap.com=1/rGvVlvFq_x9rxOmXqQe_rfcrjbOk6NSOHIQKhhsfidM +go-review.googlesource.com,FALSE,/,TRUE,2147483647,o,git-shenli.pingcap.com=1/rGvVlvFq_x9rxOmXqQe_rfcrjbOk6NSOHIQKhhsfidM +__END__ \ No newline at end of file diff --git a/glide.lock b/glide.lock new file mode 100644 index 000000000..98456f525 --- /dev/null +++ b/glide.lock @@ -0,0 +1,57 @@ +hash: 8290402a2e8dbd173ba0fcd7dcf5b662b72d7623227808e7737c37ac24cbfa62 +updated: 2016-09-24T13:33:25.211040403+08:00 +imports: +- name: github.com/coreos/etcd + version: 2469a9568548b3f8abc22f5a2462c3443823334a + subpackages: + - auth/authpb + - clientv3 + - etcdserver/api/v3rpc/rpctypes + - etcdserver/etcdserverpb + - mvcc/mvccpb + - pkg/tlsutil +- name: github.com/ghodss/yaml + version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee +- name: github.com/golang/protobuf + version: 8616e8ee5e20a1704615e6c8d7afcdac06087a67 + subpackages: + - jsonpb + - proto +- name: github.com/grpc-ecosystem/grpc-gateway + version: f52d055dc48aec25854ed7d31862f78913cf17d1 + subpackages: + - runtime + - runtime/internal + - utilities +- name: github.com/jonboulle/clockwork + version: 2eee05ed794112d45db504eb05aa693efd2b8b09 +- name: github.com/juju/errors + version: 6f54ff6318409d31ff16261533ce2c8381a4fd5d +- name: github.com/ngaut/log + version: cec23d3e10b016363780d894a0eb732a12c06e02 +- name: github.com/twinj/uuid + version: 89173bcdda19db0eb88aef1e1cb1cb2505561d31 +- name: golang.org/x/net + version: 6acef71eb69611914f7a30939ea9f6e194c78172 + subpackages: + - context + - http2 + - http2/hpack + - internal/timeseries + - trace +- name: google.golang.org/grpc + version: 0032a855ba5c8a3c8e0d71c2deef354b70af1584 + subpackages: + - codes + - credentials + - grpclog + - internal + - metadata + - naming + - peer + - transport +- name: gopkg.in/yaml.v2 + version: 53feefa2559fb8dfa8d81baad31be332c97d6c77 +testImports: +- name: github.com/pingcap/check + version: "" diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 000000000..ef65447df --- /dev/null +++ b/glide.yaml @@ -0,0 +1,23 @@ +package: github.com/pingcap/tidb-binlog +import: +- package: github.com/coreos/etcd + version: ^3.1.0-alpha.0 + subpackages: + - clientv3 +- package: github.com/ghodss/yaml +- package: github.com/golang/protobuf + subpackages: + - proto +- package: github.com/jonboulle/clockwork + version: ^0.1.0 +- package: github.com/juju/errors +- package: github.com/ngaut/log +- package: github.com/twinj/uuid + version: ^0.10.0 +- package: golang.org/x/net + subpackages: + - context +- package: google.golang.org/grpc + version: ^1.0.1-GA +testImport: +- package: github.com/pingcap/check diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index c093b82d7..aee8c0fab 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -3,12 +3,18 @@ package etcd import ( "path" "strings" + "time" "github.com/coreos/etcd/clientv3" "github.com/juju/errors" "golang.org/x/net/context" ) +const ( + // DefaultRootPath is the root path of the keys stored in etcd + DefaultRootPath = "tidb-binlog" +) + // Node organize the ectd query result as a Trie tree type Node struct { Value []byte @@ -17,23 +23,39 @@ type Node struct { // Client is a wrapped etcd client that support some simple method type Client struct { - client *clientv3.Client - pathPrefix string + client *clientv3.Client + rootPath string } -// NewClient return an EtcdClient obj -func NewClient(client *clientv3.Client, pathPrefix string) *Client { +// NewClient return a wrapped etcd client +func NewClient(cli *clientv3.Client, root string) *Client { return &Client{ - client: client, - pathPrefix: pathPrefix, + client: cli, + rootPath: root, + } +} + +// NewClientFromCfg return a wrapped etcd client +func NewClientFromCfg(endpoints []string, dialTimeout time.Duration, root string) (*Client, error) { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + DialTimeout: dialTimeout, + }) + if err != nil { + return nil, errors.Trace(err) } + + return &Client{ + client: cli, + rootPath: root, + }, nil } // Create guarantees to set a key = value with some options(like ttl) func (e *Client) Create(ctx context.Context, key string, val string, opts []clientv3.OpOption) error { - key = keyWithPrefix(e.pathPrefix, key) + key = keyWithPrefix(e.rootPath, key) txnResp, err := e.client.KV.Txn(ctx).If( - notFound(key), + clientv3.Compare(clientv3.ModRevision(key), "=", 0), ).Then( clientv3.OpPut(key, val, opts...), ).Commit() @@ -42,7 +64,7 @@ func (e *Client) Create(ctx context.Context, key string, val string, opts []clie } if !txnResp.Succeeded { - return errors.AlreadyExistsf("key %s is not found in etcd", key) + return errors.AlreadyExistsf("key %s in etcd", key) } return nil @@ -50,7 +72,7 @@ func (e *Client) Create(ctx context.Context, key string, val string, opts []clie // Get return a key/value matchs the given key func (e *Client) Get(ctx context.Context, key string) ([]byte, error) { - key = keyWithPrefix(e.pathPrefix, key) + key = keyWithPrefix(e.rootPath, key) resp, err := e.client.KV.Get(ctx, key) if err != nil { return nil, errors.Trace(err) @@ -66,7 +88,7 @@ func (e *Client) Get(ctx context.Context, key string) ([]byte, error) { // Update updates a key/value. // set ttl 0 to disable the Lease ttl feature func (e *Client) Update(ctx context.Context, key string, val string, ttl int64) error { - key = keyWithPrefix(e.pathPrefix, key) + key = keyWithPrefix(e.rootPath, key) var opts []clientv3.OpOption if ttl > 0 { @@ -88,15 +110,36 @@ func (e *Client) Update(ctx context.Context, key string, val string, ttl int64) } if !txnResp.Succeeded { - return errors.NotFoundf("key %s is not found in etcd", key) + return errors.NotFoundf("key %s in etcd", key) + } + + return nil +} + +// UpdateOrCreate updates a key/value, if the key does not exist then create, or update +func (e *Client) UpdateOrCreate(ctx context.Context, key string, val string, ttl int64) error { + key = keyWithPrefix(e.rootPath, key) + + var opts []clientv3.OpOption + if ttl > 0 { + lcr, err := e.client.Lease.Grant(ctx, ttl) + if err != nil { + return errors.Trace(err) + } + + opts = []clientv3.OpOption{clientv3.WithLease(lcr.ID)} } + _, err := e.client.KV.Do(ctx, clientv3.OpPut(key, val, opts...)) + if err != nil { + return errors.Trace(err) + } return nil } // List return the trie struct that constructed by the key/value with same prefix func (e *Client) List(ctx context.Context, key string) (*Node, error) { - key = keyWithPrefix(e.pathPrefix, key) + key = keyWithPrefix(e.rootPath, key) if !strings.HasSuffix(key, "/") { key += "/" } @@ -145,10 +188,6 @@ func parseToDirTree(root *Node, path string) *Node { return current } -func notFound(key string) clientv3.Cmp { - return clientv3.Compare(clientv3.ModRevision(key), "=", 0) -} - func keyWithPrefix(prefix, key string) string { if strings.HasPrefix(key, prefix) { return key diff --git a/pkg/etcd/etcd_test.go b/pkg/etcd/etcd_test.go index d5de059ea..a1809d806 100644 --- a/pkg/etcd/etcd_test.go +++ b/pkg/etcd/etcd_test.go @@ -169,6 +169,5 @@ func TestList(t *testing.T) { func testSetup(t *testing.T) (context.Context, *Client, *integration.ClusterV3) { cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) etcd := NewClient(cluster.RandClient(), "binlog") - ctx := context.Background() - return ctx, etcd, cluster + return context.Background(), etcd, cluster } diff --git a/pkg/file/lock_test.go b/pkg/file/lock_test.go index 3cfb04723..16e622046 100644 --- a/pkg/file/lock_test.go +++ b/pkg/file/lock_test.go @@ -8,7 +8,6 @@ import ( . "github.com/pingcap/check" ) - var _ = Suite(&testLockSuite{}) type testLockSuite struct{} diff --git a/pkg/flags/flag.go b/pkg/flags/flag.go index b2273c598..fa9d96450 100644 --- a/pkg/flags/flag.go +++ b/pkg/flags/flag.go @@ -79,7 +79,7 @@ func URLsFromFlag(fs *flag.FlagSet, urlsFlagName string) []url.URL { return fs.Lookup(urlsFlagName).Value.(*URLsValue).URLSlice() } -// URLsFromFlag returns a string slices from url got from the flag. +// URLStrsFromFlag returns a string slices from url got from the flag. func URLStrsFromFlag(fs *flag.FlagSet, urlsFlagName string) []string { return fs.Lookup(urlsFlagName).Value.(*URLsValue).StringSlice() } diff --git a/pkg/flags/urls.go b/pkg/flags/urls.go index cc2966989..bfbb03f28 100644 --- a/pkg/flags/urls.go +++ b/pkg/flags/urls.go @@ -1,13 +1,14 @@ package flags import ( - "strings" "net/url" + "strings" "github.com/juju/errors" "github.com/pingcap/tidb-binlog/pkg/types" ) +// URLsValue define a slice of URLs as a type type URLsValue types.URLs // Set parses a command line set of URLs formatted like: @@ -31,6 +32,7 @@ func (us *URLsValue) String() string { return strings.Join(all, ",") } +// StringSlice return a slice of string with formatted URL func (us *URLsValue) StringSlice() []string { all := make([]string, len(*us)) for i, u := range *us { @@ -39,11 +41,13 @@ func (us *URLsValue) StringSlice() []string { return all } +// URLSlice return a slice of URLs func (us *URLsValue) URLSlice() []url.URL { urls := []url.URL(*us) return urls } +// NewURLsValue return a URLsValue from a string of URLs list func NewURLsValue(init string) (*URLsValue, error) { v := &URLsValue{} err := v.Set(init) diff --git a/pkg/types/urls.go b/pkg/types/urls.go index 936b47fb8..e2685f2e2 100644 --- a/pkg/types/urls.go +++ b/pkg/types/urls.go @@ -9,8 +9,10 @@ import ( "github.com/juju/errors" ) +// URLs defines a slice of URLs as a type type URLs []url.URL +// NewURLs return a URLs from a slice of formatted URL strings func NewURLs(strs []string) (URLs, error) { all := make([]url.URL, len(strs)) if len(all) == 0 { @@ -39,18 +41,26 @@ func NewURLs(strs []string) (URLs, error) { return us, nil } +// String return a string of list of URLs witch separated by comma func (us URLs) String() string { return strings.Join(us.StringSlice(), ",") } +// Sort sorts the URLs func (us *URLs) Sort() { sort.Sort(us) } -func (us URLs) Len() int { return len(us) } +// Len return the lenght of URL slice +func (us URLs) Len() int { return len(us) } + +// Less compares two URL and return the less one func (us URLs) Less(i, j int) bool { return us[i].String() < us[j].String() } -func (us URLs) Swap(i, j int) { us[i], us[j] = us[j], us[i] } +// Swap swaps two URLs in the slice +func (us URLs) Swap(i, j int) { us[i], us[j] = us[j], us[i] } + +// StringSlice return a slice of formatted string of URL func (us URLs) StringSlice() []string { out := make([]string, len(us)) for i := range us { diff --git a/pump/binlogger.go b/pump/binlogger.go index ab057485d..4ac4eb290 100644 --- a/pump/binlogger.go +++ b/pump/binlogger.go @@ -32,7 +32,7 @@ var ( // Binlogger is the interface that for append and read binlog type Binlogger interface { // read nums binlog events from the "from" position - ReadFrom(from pb.Pos, nums int) ([]pb.Binlog, error) + ReadFrom(from pb.Pos, nums int32) ([]pb.Binlog, error) // batch write binlog event WriteTail(payload []byte) error @@ -79,7 +79,7 @@ func CreateBinlogger(dirpath string) (Binlogger, error) { return binlog, nil } -//OpenBinlogger returns a binlogger for write, then it can be appendd +//OpenBinlogger returns a binlogger for write, then it can be appended func OpenBinlogger(dirpath string) (Binlogger, error) { names, err := readBinlogNames(dirpath) if err != nil { @@ -98,7 +98,7 @@ func OpenBinlogger(dirpath string) (Binlogger, error) { } p := path.Join(dirpath, lastFileName) - f, err := file.TryLockFile(p, os.O_RDWR, file.PrivateFileMode) + f, err := file.TryLockFile(p, os.O_WRONLY, file.PrivateFileMode) if err != nil { return nil, errors.Trace(err) } @@ -124,10 +124,10 @@ func CloseBinlogger(binlogger Binlogger) error { // Read reads nums logs from the given log position. // it reads binlogs from files and append to result set util the count = num // after reads all binlog from one file then close it and open the following file -func (b *binlogger) ReadFrom(from pb.Pos, nums int) ([]pb.Binlog, error) { +func (b *binlogger) ReadFrom(from pb.Pos, nums int32) ([]pb.Binlog, error) { var ent = &pb.Binlog{} var ents = []pb.Binlog{} - var index int + var index int32 var decoder *decoder var first = true diff --git a/pump/config.go b/pump/config.go index 390bdf4de..496bb8a90 100644 --- a/pump/config.go +++ b/pump/config.go @@ -19,24 +19,26 @@ const ( defaultEtcdDialTimeout = 5 * time.Second defaultEtcdURLs = "http://127.0.0.1:2379" defaultListenAddr = "127.0.0.1:8250" - defaultHeartbeatInterval = 1000 + defaultHeartbeatInterval = 2000 defaultDataDir = "data.pump" ) +// Config holds the configuration of pump type Config struct { *flag.FlagSet - ListenAddr string `json:"addr"` - AdvertiseAddr string `json:"advertise-addr"` - EtcdURLs string `json:"pd-urls"` - EtcdDialTimeout time.Duration - DataDir string `json:"data-dir"` - HeartbeatMS uint `json:"heartbeat-interval"` - Debug bool - configFile string - printVersion bool + ListenAddr string `json:"addr"` + AdvertiseAddr string `json:"advertise-addr"` + EtcdURLs string `json:"pd-urls"` + EtcdDialTimeout time.Duration + DataDir string `json:"data-dir"` + HeartbeatInterval uint `json:"heartbeat-interval"` + Debug bool + configFile string + printVersion bool } +// NewConfig return an instance of configuration func NewConfig() *Config { cfg := &Config{ EtcdDialTimeout: defaultEtcdDialTimeout, @@ -52,7 +54,7 @@ func NewConfig() *Config { fs.StringVar(&cfg.AdvertiseAddr, "advertise-addr", "", "addr(i.e. 'host:port') to advertise to the public") fs.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of the PD endpoints") fs.StringVar(&cfg.DataDir, "data-dir", "", "the path to store binlog data") - fs.UintVar(&cfg.HeartbeatMS, "heartbeat-interval", defaultHeartbeatInterval, "number of milliseconds between heartbeat ticks") + fs.UintVar(&cfg.HeartbeatInterval, "heartbeat-interval", defaultHeartbeatInterval, "number of milliseconds between heartbeat ticks") fs.BoolVar(&cfg.Debug, "debug", false, "whether to enable debug-level logging") fs.StringVar(&cfg.configFile, "config-file", "", "path to the pump configuration file") fs.BoolVar(&cfg.printVersion, "version", false, "print pump version info") @@ -60,6 +62,7 @@ func NewConfig() *Config { return cfg } +// Parse parses all config from command-line flags, environment vars or configuration file func (cfg *Config) Parse(arguments []string) error { // Parse first to get config file perr := cfg.FlagSet.Parse(arguments) @@ -105,7 +108,7 @@ func (cfg *Config) Parse(arguments []string) error { cfg.AdvertiseAddr = "http://" + cfg.AdvertiseAddr // add 'http:' scheme to facilitate parsing adjustDuration(&cfg.EtcdDialTimeout, defaultEtcdDialTimeout) adjustString(&cfg.DataDir, defaultDataDir) - adjustUint(&cfg.HeartbeatMS, defaultHeartbeatInterval) + adjustUint(&cfg.HeartbeatInterval, defaultHeartbeatInterval) return cfg.validate() } @@ -143,35 +146,36 @@ func adjustDuration(v *time.Duration, defValue time.Duration) { // validate checks whether the configuration is valid func (cfg *Config) validate() error { // check ListenAddr - if urllis, err := url.Parse(cfg.ListenAddr); err != nil { + urllis, err := url.Parse(cfg.ListenAddr) + if err != nil { return errors.Errorf("parse ListenAddr error: %s, %v", cfg.ListenAddr, err) - } else { - if _, _, err := net.SplitHostPort(urllis.Host); err != nil { - return errors.Errorf("bad ListenAddr host format: %s, %v", urllis.Host, err) - } + } + + if _, _, err := net.SplitHostPort(urllis.Host); err != nil { + return errors.Errorf("bad ListenAddr host format: %s, %v", urllis.Host, err) } // check AdvertiseAddr - if urladv, err := url.Parse(cfg.AdvertiseAddr); err != nil { + urladv, err := url.Parse(cfg.AdvertiseAddr) + if err != nil { return errors.Errorf("parse AdvertiseAddr error: %s, %v", cfg.AdvertiseAddr, err) - } else { - if host, _, err := net.SplitHostPort(urladv.Host); err != nil { - return errors.Errorf("bad AdvertiseAddr host format: %s, %v", urladv.Host, err) - } else { - if host == "0.0.0.0" { - return errors.New("advertiseAddr host is not allowed to be set to 0.0.0.0") - } - } + } + host, _, err := net.SplitHostPort(urladv.Host) + if err != nil { + return errors.Errorf("bad AdvertiseAddr host format: %s, %v", urladv.Host, err) + } + if host == "0.0.0.0" { + return errors.New("advertiseAddr host is not allowed to be set to 0.0.0.0") } // check EtcdEndpoints - if urlv, err := flags.NewURLsValue(cfg.EtcdURLs); err != nil { + urlv, err := flags.NewURLsValue(cfg.EtcdURLs) + if err != nil { return errors.Errorf("parse EtcdURLs error: %s, %v", cfg.EtcdURLs, err) - } else { - for _, u := range urlv.URLSlice() { - if _, _, err := net.SplitHostPort(u.Host); err != nil { - return errors.Errorf("bad EtcdURL host format: %s, %v", u.Host, err) - } + } + for _, u := range urlv.URLSlice() { + if _, _, err := net.SplitHostPort(u.Host); err != nil { + return errors.Errorf("bad EtcdURL host format: %s, %v", u.Host, err) } } diff --git a/pump/config_test.go b/pump/config_test.go index 67a5c8ac4..c77e89ab9 100644 --- a/pump/config_test.go +++ b/pump/config_test.go @@ -51,11 +51,11 @@ func (s *testConfigSuite) TestConfigParsingEnvFlags(c *C) { func (s *testConfigSuite) TestConfigParsingFileFlags(c *C) { yc := struct { - ListenAddr string `json:"addr"` - AdvertiseAddr string `json:"advertise-addr"` - EtcdURLs string `json:"pd-urls"` - BinlogDir string `json:"data-dir"` - HeartbeatMS uint `json:"heartbeat-interval"` + ListenAddr string `json:"addr"` + AdvertiseAddr string `json:"advertise-addr"` + EtcdURLs string `json:"pd-urls"` + BinlogDir string `json:"data-dir"` + HeartbeatInterval uint `json:"heartbeat-interval"` }{ "192.168.199.100:8260", "192.168.199.100:8260", @@ -101,18 +101,18 @@ func mustCreateCfgFile(c *C, b []byte, prefix string) *os.File { func validateConfig(c *C, cfg *Config) { vcfg := &Config{ - ListenAddr: "http://192.168.199.100:8260", - AdvertiseAddr: "http://192.168.199.100:8260", - EtcdURLs: "http://192.168.199.110:2379,http://hostname:2379", - DataDir: "/tmp/pump", - HeartbeatMS: 1500, - Debug: true, + ListenAddr: "http://192.168.199.100:8260", + AdvertiseAddr: "http://192.168.199.100:8260", + EtcdURLs: "http://192.168.199.110:2379,http://hostname:2379", + DataDir: "/tmp/pump", + HeartbeatInterval: 1500, + Debug: true, } c.Assert(cfg.ListenAddr, Equals, vcfg.ListenAddr) c.Assert(cfg.AdvertiseAddr, Equals, vcfg.AdvertiseAddr) c.Assert(cfg.EtcdURLs, Equals, vcfg.EtcdURLs) c.Assert(cfg.DataDir, Equals, vcfg.DataDir) - c.Assert(cfg.HeartbeatMS, Equals, vcfg.HeartbeatMS) + c.Assert(cfg.HeartbeatInterval, Equals, vcfg.HeartbeatInterval) c.Assert(cfg.Debug, Equals, vcfg.Debug) } diff --git a/pump/node.go b/pump/node.go index 8cbbb08fd..580abcf29 100644 --- a/pump/node.go +++ b/pump/node.go @@ -1,53 +1,96 @@ package pump import ( + "context" + "io/ioutil" + "net/url" + "os" + "path/filepath" + "time" + + "github.com/jonboulle/clockwork" "github.com/juju/errors" - "github.com/ngaut/log" + "github.com/pingcap/tidb-binlog/pkg/etcd" + "github.com/pingcap/tidb-binlog/pkg/file" + "github.com/pingcap/tidb-binlog/pkg/flags" pb "github.com/pingcap/tidb-binlog/proto" + "github.com/twinj/uuid" ) const ( shortIDLen = 8 - nodeDir = ".node" - nodeIDFile = "nodeID" + nodeIDFile = ".node" + lockFile = ".lock" ) -// Node is a node interface that has the node basic infomation quey method +// Node holds the states of this pump node type Node interface { + // ID return the uuid representing of this pump node ID() string + // a short ID as 8 bytes length ShortID() string - NodeID(ID string) bool - Status() *NodeStatus + // Register register this pump node to Etcd + // create new one if nodeID not exist, or update it + Register(ctx context.Context) error + // Heartbeat refreshes the state of this pump node in etcd periodically + // if the pump is dead, the key 'root/nodes//alive' will dissolve after a TTL time passed + Heartbeat(ctx context.Context, done <-chan struct{}) <-chan error } -// NodeStatus has some basic node status infomations -type NodeStatus struct { - NodeID string - Host string - IsAlive bool - Offsets map[string]*pb.Pos +type pumpNode struct { + *EtcdRegistry + id string + host string + heartbeatTTL int64 + heartbeatInterval time.Duration } -type pumpNode struct { - id string - status *NodeStatus +// NodeStatus describes the status information of a node in etcd +type NodeStatus struct { + NodeID string + Host string + IsAlive bool + LastReadPos map[string]*pb.Pos } // NewPumpNode return a pumpNode obj that inited by server config func NewPumpNode(cfg *Config) (Node, error) { - nodeID, err := readLocalNodeID(cfg.DataDir) + if err := checkExclusive(cfg.DataDir); err != nil { + return nil, errors.Trace(err) + } + + urlv, err := flags.NewURLsValue(cfg.EtcdURLs) + if err != nil { + return nil, errors.Trace(err) + } + cli, err := etcd.NewClientFromCfg(urlv.StringSlice(), cfg.EtcdDialTimeout, etcd.DefaultRootPath) if err != nil { - log.Errorf("Read local node ID error, %v", err) return nil, errors.Trace(err) } + nodeID, err := readLocalNodeID(cfg.DataDir) + if err != nil { + if errors.IsNotFound(err) { + nodeID, err = generateLocalNodeID(cfg.DataDir) + if err != nil { + return nil, errors.Trace(err) + } + } else { + return nil, errors.Trace(err) + } + } + + advURL, err := url.Parse(cfg.AdvertiseAddr) + if err != nil { + return nil, errors.Annotatef(err, "invalid configuration of advertise addr(%s)", cfg.AdvertiseAddr) + } + node := &pumpNode{ - id: nodeID, - status: &NodeStatus{ - NodeID: nodeID, - Host: cfg.AdvertiseAddr, - IsAlive: true, - }, + EtcdRegistry: NewEtcdRegistry(cli, cfg.EtcdDialTimeout), + id: nodeID, + host: advURL.Host, + heartbeatInterval: time.Duration(cfg.HeartbeatInterval), + heartbeatTTL: int64(cfg.HeartbeatInterval) * 3 / 2, } return node, nil } @@ -56,10 +99,6 @@ func (p *pumpNode) ID() string { return p.id } -func (p *pumpNode) Host() string { - return p.status.Host -} - func (p *pumpNode) ShortID() string { if len(p.id) <= shortIDLen { return p.id @@ -67,10 +106,80 @@ func (p *pumpNode) ShortID() string { return p.id[0:shortIDLen] } -func (p *pumpNode) NodeID(ID string) bool { - return p.id == ID || p.ShortID() == ID +func (p *pumpNode) Register(ctx context.Context) error { + err := p.RegisterNode(ctx, p.id, p.host) + if err != nil { + return errors.Trace(err) + } + return nil +} + +func (p *pumpNode) Heartbeat(ctx context.Context, done <-chan struct{}) <-chan error { + errc := make(chan error, 1) + go func() { + var clock = clockwork.NewRealClock() + for { + select { + case <-done: + // stop heartbeat and prepare to exit + close(errc) + return + case <-clock.After(p.heartbeatInterval): + if err := p.RefreshNode(ctx, p.id, p.heartbeatTTL); err != nil { + errc <- errors.Trace(err) + } + } + } + }() + return errc +} + +// readLocalNodeID read nodeID from a local file +// returns a NotFound error if the nodeID file not exist +// in this case, the caller should invoke generateLocalNodeID() +func readLocalNodeID(dataDir string) (string, error) { + nodeIDPath := filepath.Join(dataDir, nodeIDFile) + if _, err := CheckFileExist(nodeIDPath); err != nil { + return "", errors.NewNotFound(err, "local nodeID file not exist") + } + data, err := ioutil.ReadFile(nodeIDPath) + if err != nil { + return "", errors.Annotate(err, "local nodeID file is collapsed") + } + if len(data) < 16 { + return "", errors.Errorf("local nodeID file(%s) is collapsed", nodeIDPath) + } + id := uuid.New(data) + return id.String(), nil +} + +// generate a new nodeID, and store it to local filesystem +func generateLocalNodeID(dataDir string) (string, error) { + if err := os.MkdirAll(dataDir, file.PrivateDirMode); err != nil { + return "", errors.Trace(err) + } + + id := uuid.NewV1() + nodeIDPath := filepath.Join(dataDir, nodeIDFile) + if err := ioutil.WriteFile(nodeIDPath, id.Bytes(), file.PrivateFileMode); err != nil { + return "", errors.Trace(err) + } + return id.String(), nil } -func (p *pumpNode) Status() *NodeStatus { - return p.status +// checkExclusive try to get filelock of dataDir in exclusive mode +// if get lock fails, maybe some other pump is running +func checkExclusive(dataDir string) error { + err := os.MkdirAll(dataDir, file.PrivateDirMode) + if err != nil { + return errors.Trace(err) + } + lockPath := filepath.Join(dataDir, lockFile) + // when the process exits, the lockfile will be closed by system + // and automatically release the lock + _, err = file.TryLockFile(lockPath, os.O_WRONLY|os.O_CREATE, file.PrivateFileMode) + if err != nil { + return errors.Trace(err) + } + return nil } diff --git a/pump/registry.go b/pump/registry.go index d3539aea8..84bb05cff 100644 --- a/pump/registry.go +++ b/pump/registry.go @@ -6,12 +6,11 @@ import ( "time" "github.com/juju/errors" - "github.com/ngaut/log" "github.com/pingcap/tidb-binlog/pkg/etcd" "golang.org/x/net/context" ) -const nodePrefix = "node" +const nodePrefix = "nodes" // EtcdRegistry wraps the reaction with etcd type EtcdRegistry struct { @@ -32,35 +31,36 @@ func (r *EtcdRegistry) prefixed(p ...string) string { } // Node returns the nodeStatus that matchs nodeID in the etcd -func (r *EtcdRegistry) Node(ctx context.Context, nodeID string) (*NodeStatus, error) { +func (r *EtcdRegistry) Node(pctx context.Context, nodeID string) (*NodeStatus, error) { + ctx, cancel := context.WithTimeout(pctx, r.reqTimeout) + defer cancel() + resp, err := r.client.List(ctx, r.prefixed(nodePrefix, nodeID)) if err != nil { return nil, errors.Trace(err) } - status, err := nodeStatusFromEtcdNode(nodeID, resp) if err != nil { return nil, errors.Errorf("Invalid node, nodeID[%s], error[%v]", nodeID, err) } - return status, nil } // RegisterNode register the node in the etcd -func (r *EtcdRegistry) RegisterNode(ctx context.Context, nodeID, host string) error { +func (r *EtcdRegistry) RegisterNode(pctx context.Context, nodeID, host string) error { + ctx, cancel := context.WithTimeout(pctx, r.reqTimeout) + defer cancel() + if exists, err := r.checkNodeExists(ctx, nodeID); err != nil { return errors.Trace(err) } else if !exists { // not found then create a new node return r.createNode(ctx, nodeID, host) + } else { + // found it, update host infomation of the node + return r.updateNode(ctx, nodeID, host) } - // found it, update host infomation of the node - nodeStatus := &NodeStatus{ - NodeID: nodeID, - Host: host, - } - return r.UpdateNodeStatus(ctx, nodeID, nodeStatus) } func (r *EtcdRegistry) checkNodeExists(ctx context.Context, nodeID string) (bool, error) { @@ -74,44 +74,54 @@ func (r *EtcdRegistry) checkNodeExists(ctx context.Context, nodeID string) (bool return true, nil } -// UpdateNodeStatus updates the node -func (r *EtcdRegistry) UpdateNodeStatus(ctx context.Context, nodeID string, nodeStatus *NodeStatus) error { - object, err := json.Marshal(nodeStatus) +// UpdateNode updates the node +func (r *EtcdRegistry) UpdateNode(pctx context.Context, nodeID, host string) error { + ctx, cancel := context.WithTimeout(pctx, r.reqTimeout) + defer cancel() + + return r.updateNode(ctx, nodeID, host) +} + +func (r *EtcdRegistry) updateNode(ctx context.Context, nodeID, host string) error { + obj := &NodeStatus{ + NodeID: nodeID, + Host: host, + } + objstr, err := json.Marshal(obj) if err != nil { - return errors.Errorf("Error marshaling NodeSattus, %v, %v", object, err) + return errors.Annotatef(err, "error marshal NodeStatus(%v)", obj) } - key := r.prefixed(nodePrefix, nodeID, "object") - if err := r.client.Update(ctx, key, string(object), 0); err != nil { - return errors.Errorf("Failed to update NodeStatus in etcd, %s, %v, %v", nodeID, object, err) + if err := r.client.Update(ctx, key, string(objstr), 0); err != nil { + return errors.Annotatef(err, "fail to update node with NodeStatus(%v)", obj) } return nil } -func (r *EtcdRegistry) createNode(ctx context.Context, nodeID string, host string) error { - object := &NodeStatus{ +func (r *EtcdRegistry) createNode(ctx context.Context, nodeID, host string) error { + obj := &NodeStatus{ NodeID: nodeID, Host: host, } - - objstr, err := json.Marshal(object) + objstr, err := json.Marshal(obj) if err != nil { - return errors.Errorf("Error marshaling NodeStatus, %v, %v", object, err) + return errors.Annotatef(err, "error marshal NodeStatus(%v)", obj) } - - if err = r.client.Create(ctx, r.prefixed(nodePrefix, nodeID, "object"), string(objstr), nil); err != nil { - return errors.Errorf("Failed to create NodeStatus of node, %s, %v, %v", nodeID, object, err) + key := r.prefixed(nodePrefix, nodeID, "object") + if err := r.client.Create(ctx, key, string(objstr), nil); err != nil { + return errors.Annotatef(err, "fail to create node with NodeStatus(%v)", obj) } - return nil } // RefreshNode keeps the heartbeats with etcd -func (r *EtcdRegistry) RefreshNode(ctx context.Context, nodeID string, ttl int64) error { - aliveKey := r.prefixed(nodePrefix, nodeID, "alive") +func (r *EtcdRegistry) RefreshNode(pctx context.Context, nodeID string, ttl int64) error { + ctx, cancel := context.WithTimeout(pctx, r.reqTimeout) + defer cancel() + aliveKey := r.prefixed(nodePrefix, nodeID, "alive") // try to touch alive state of node, update ttl - if err := r.client.Update(ctx, aliveKey, "", ttl); err != nil { + if err := r.client.UpdateOrCreate(ctx, aliveKey, "", ttl); err != nil { return errors.Trace(err) } return nil @@ -119,14 +129,12 @@ func (r *EtcdRegistry) RefreshNode(ctx context.Context, nodeID string, ttl int64 func nodeStatusFromEtcdNode(nodeID string, node *etcd.Node) (*NodeStatus, error) { status := &NodeStatus{} - var isAlive bool for key, n := range node.Childs { switch key { case "object": - if err := json.Unmarshal(n.Value, &status); err != nil { - log.Errorf("Error unmarshaling NodeStatus, nodeID: %s, %v", nodeID, err) - return nil, errors.Trace(err) + if err := json.Unmarshal(n.Value, status); err != nil { + return nil, errors.Annotatef(err, "error unmarshal NodeStatus with nodeID(%s)", nodeID) } case "alive": isAlive = true diff --git a/pump/registry_test.go b/pump/registry_test.go index 95e8257fc..5b6ccb479 100644 --- a/pump/registry_test.go +++ b/pump/registry_test.go @@ -6,7 +6,6 @@ import ( "github.com/coreos/etcd/integration" "github.com/pingcap/tidb-binlog/pkg/etcd" - pb "github.com/pingcap/tidb-binlog/proto" "golang.org/x/net/context" ) @@ -19,45 +18,32 @@ func TestUpdateNodeInfo(t *testing.T) { nodeID := "test1" host := "mytest" - ctx, cancel := context.WithTimeout(context.Background(), r.reqTimeout) - defer cancel() - err := r.RegisterNode(ctx, nodeID, host) + err := r.RegisterNode(context.Background(), nodeID, host) if err != nil { t.Fatal(err) } - status, err := r.Node(ctx, nodeID) + status, err := r.Node(context.Background(), nodeID) if err != nil { t.Fatal(err) } - if status.NodeID != nodeID || len(status.Offsets) != 0 { + if status.NodeID != nodeID || status.Host != host { t.Fatalf("node info have error : %v", status) } - newNodeStatus := &NodeStatus{ - NodeID: nodeID, - Host: host, - Offsets: make(map[string]*pb.Pos), - } - - newNodeStatus.Offsets["cluster1"] = &pb.Pos{ - Suffix: 1, - Offset: 1, - } - - err = r.UpdateNodeStatus(ctx, nodeID, newNodeStatus) + host = "localhost:1234" + err = r.UpdateNode(context.Background(), nodeID, host) if err != nil { t.Fatal(err) } - status, err = r.Node(ctx, nodeID) + status, err = r.Node(context.Background(), nodeID) if err != nil { t.Fatal(err) } - if status.NodeID != nodeID || status.Offsets["cluster1"].Suffix != 1 || - status.Offsets["cluster1"].Offset != 1 { + if status.NodeID != nodeID || status.Host != host { t.Fatalf("node info have error : %v", status) } } @@ -71,34 +57,32 @@ func TestRefreshNode(t *testing.T) { nodeID := "test1" host := "mytest" - ctx, cancel := context.WithTimeout(context.Background(), r.reqTimeout) - defer cancel() - err := r.RegisterNode(ctx, nodeID, host) + err := r.RegisterNode(context.Background(), nodeID, host) if err != nil { t.Fatal(err) } - err = r.RefreshNode(ctx, nodeID, 2) + err = r.RefreshNode(context.Background(), nodeID, 2) if err != nil { t.Fatal(err) } - status, err := r.Node(ctx, nodeID) + status, err := r.Node(context.Background(), nodeID) if err != nil { t.Fatal(err) } - if status.NodeID != nodeID || len(status.Offsets) != 0 || !status.IsAlive { + if status.NodeID != nodeID || !status.IsAlive { t.Fatalf("node info have error : %v", status) } time.Sleep(3 * time.Second) - status, err = r.Node(ctx, nodeID) + status, err = r.Node(context.Background(), nodeID) if err != nil { t.Fatal(err) } - if status.NodeID != nodeID || len(status.Offsets) != 0 || status.IsAlive { + if status.NodeID != nodeID || status.IsAlive { t.Fatalf("node info have error : %v", status) } } diff --git a/pump/server.go b/pump/server.go index 30e3fff70..342c894cb 100644 --- a/pump/server.go +++ b/pump/server.go @@ -1,36 +1,190 @@ package pump import ( + "fmt" "net" "net/url" + "os" + "path" + "sync" + "github.com/juju/errors" "github.com/ngaut/log" + "github.com/pingcap/tidb-binlog/pkg/file" pb "github.com/pingcap/tidb-binlog/proto" "golang.org/x/net/context" "google.golang.org/grpc" ) -type server struct{} +// Pump server implements the gRPC interface, +// and maintains pump's status at run time. +type pumpServer struct { + // RWMutex protects dispatcher + sync.RWMutex -func (s *server) WriteBinlog(ctx context.Context, in *pb.WriteBinlogReq) (*pb.WriteBinlogResp, error) { - return nil, nil + // dispatcher keeps all opened binloggers which is indexed by clusterID. + dispatcher map[string]Binlogger + + // dataDir is the root directory of all pump data + // | + // +-- .node + // | | + // | +-- nodeID + // | + // +-- clusters + // | + // +-- 100 + // | | + // | +-- binlog.000001 + // | | + // | +-- binlog.000002 + // | | + // | +-- ... + // | + // +-- 200 + // | + // +-- binlog.000001 + // | + // +-- binlog.000002 + // | + // +-- ... + // + dataDir string + + // node maintain the status of this pump and interact with etcd registry + node Node +} + +func newPumpServer(cfg *Config, n Node) *pumpServer { + return &pumpServer{ + dispatcher: make(map[string]Binlogger), + dataDir: cfg.DataDir, + node: n, + } } -func (s *server) PullBinlogs(ctx context.Context, in *pb.PullBinlogReq) (*pb.PullBinlogResp, error) { - return nil, nil +// init scan the dataDir to find all clusterIDs, and for each to create binlogger, +// then add them to dispathcer map +func (s *pumpServer) init() error { + clusterDir := path.Join(s.dataDir, "clusters") + if !file.Exist(clusterDir) { + if err := os.MkdirAll(clusterDir, file.PrivateDirMode); err != nil { + return errors.Trace(err) + } + } + + names, err := file.ReadDir(clusterDir) + if err != nil { + return errors.Trace(err) + } + + for _, n := range names { + binlogDir := path.Join(clusterDir, n) + binlogger, err := OpenBinlogger(binlogDir) + if err != nil { + return errors.Trace(err) + } + s.dispatcher[n] = binlogger + } + return nil +} + +func (s *pumpServer) getBinloggerToWrite(cid string) (Binlogger, error) { + s.Lock() + defer s.Unlock() + blr, ok := s.dispatcher[cid] + if ok { + return blr, nil + } + newblr, err := CreateBinlogger(path.Join(s.dataDir, "clusters", cid)) + if err != nil { + return nil, errors.Trace(err) + } + s.dispatcher[cid] = newblr + return newblr, nil +} + +func (s *pumpServer) getBinloggerToRead(cid string) (Binlogger, error) { + s.RLock() + defer s.RUnlock() + blr, ok := s.dispatcher[cid] + if ok { + return blr, nil + } + return nil, errors.NotFoundf("no binlogger of clusterID: %s", cid) } +func (s *pumpServer) WriteBinlog(ctx context.Context, in *pb.WriteBinlogReq) (*pb.WriteBinlogResp, error) { + cid := fmt.Sprintf("%d", in.ClusterID) + ret := &pb.WriteBinlogResp{} + binlogger, err := s.getBinloggerToWrite(cid) + if err != nil { + ret.Errmsg = err.Error() + return ret, err + } + if err := binlogger.WriteTail(in.Payload); err != nil { + ret.Errmsg = err.Error() + return ret, err + } + return ret, nil +} + +func (s *pumpServer) PullBinlogs(ctx context.Context, in *pb.PullBinlogReq) (*pb.PullBinlogResp, error) { + cid := fmt.Sprintf("%d", in.ClusterID) + ret := &pb.PullBinlogResp{} + binlogger, err := s.getBinloggerToRead(cid) + if err != nil { + if errors.IsNotFound(err) { + // return an empty slice and a nil error + ret.Binlogs = []pb.Binlog{} + return ret, nil + } + ret.Errmsg = err.Error() + return ret, err + } + binlogs, err := binlogger.ReadFrom(in.StartFrom, in.Batch) + if err != nil { + ret.Errmsg = err.Error() + return ret, err + } + ret.Binlogs = binlogs + return ret, nil +} + +// Start runs PumpServer to serve the listening port, and maintains heartbeat to Etcd func Start(cfg *Config) { + node, err := NewPumpNode(cfg) + if err != nil { + log.Fatalf("fail to create node, %v", err) + } + if err := node.Register(context.Background()); err != nil { + log.Fatalf("fail to register node to etcd, %v", err) + } + done := make(chan struct{}) + defer close(done) + errc := node.Heartbeat(context.Background(), done) + go func() { + for err := range errc { + log.Error(err) + } + }() + + server := newPumpServer(cfg, node) + if err := server.init(); err != nil { + log.Fatalf("fail to initialize pump server, %v", err) + } + + // start to listen u, err := url.Parse(cfg.ListenAddr) if err != nil { - log.Fatalf("bad configuration of listening addr: %s, error: %v", cfg.ListenAddr, err) + log.Fatalf("invalid configuration of listening addr: %s, error: %v", cfg.ListenAddr, err) } lis, err := net.Listen("tcp", u.Host) if err != nil { - log.Fatalf("failed to listen: %v", err) + log.Fatalf("fail to listen on: %s, %v", u.Host, err) } - + // start a gRPC server and register the pump server with it s := grpc.NewServer() - pb.RegisterPumpServer(s, &server{}) + pb.RegisterPumpServer(s, server) s.Serve(lis) } diff --git a/pump/util.go b/pump/util.go index 50f134509..536919437 100644 --- a/pump/util.go +++ b/pump/util.go @@ -1,13 +1,9 @@ package pump import ( - "crypto/sha1" "fmt" - "io" - "io/ioutil" "math/rand" "os" - "path/filepath" "strings" "github.com/juju/errors" @@ -20,8 +16,8 @@ var ( ) // InitLogger initalizes Pump's logger. -func InitLogger(cfg *Config) { - if cfg.Debug { +func InitLogger(isDebug bool) { + if isDebug { log.SetLevelByString("debug") } else { log.SetLevelByString("info") @@ -29,50 +25,9 @@ func InitLogger(cfg *Config) { log.SetHighlighting(false) } -func readLocalNodeID(dir string) (string, error) { - fullPath := filepath.Join(dir, nodeDir, nodeIDFile) - if _, err := CheckFileExist(fullPath); err != nil { - return generateLocalNodeID(dir) - } - - // read the node ID from file - hash, err := ioutil.ReadFile(fullPath) - if err != nil { - return "", errors.Trace(err) - } - nodeID := fmt.Sprintf("%X", hash) - if len(nodeID) == 0 { - return generateLocalNodeID(dir) - } - - return nodeID, nil -} - -// generate a new node ID, and save it to file -func generateLocalNodeID(dirpath string) (string, error) { - rand64 := string(KRand(64, 3)) - log.Debugf("Generated a randomized string with 64 runes, %s", rand64) - t := sha1.New() - io.WriteString(t, rand64) - hash := t.Sum(nil) - - // dir not exists, make it - dir := filepath.Join(dirpath, nodeDir) - if err := os.MkdirAll(dir, os.ModePerm); err != nil { - return "", errors.Trace(err) - } - - file := filepath.Join(dir, nodeIDFile) - if err := ioutil.WriteFile(file, hash, os.ModePerm); err != nil { - return "", errors.Trace(err) - } - nodeID := fmt.Sprintf("%X", hash) - return nodeID, nil -} - // KRand is an algorithm that compute rand nums func KRand(size int, kind int) []byte { - ikind, kinds, result := kind, [][]int{[]int{10, 48}, []int{26, 97}, []int{26, 65}}, make([]byte, size) + ikind, kinds, result := kind, [][]int{{10, 48}, {26, 97}, {26, 65}}, make([]byte, size) isAll := kind > 2 || kind < 0 for i := 0; i < size; i++ { if isAll { // random ikind diff --git a/pump/version.go b/pump/version.go index 2e8e548be..3e4235383 100644 --- a/pump/version.go +++ b/pump/version.go @@ -7,13 +7,16 @@ import ( ) var ( + // Version defines the version of pump Version = "1.0.0+git" - // GitSHA and BuildTS will be set during make - GitSHA = "Not provided (use make build instead of go build)" + // GitSHA will be set during make + GitSHA = "Not provided (use make build instead of go build)" + // BuildTS and BuildTS will be set during make BuildTS = "Not provided (use make build instead of go build)" ) +// PrintVersionInfo show version info to stdout func PrintVersionInfo() { log.Infof("pump Version: %s\n", Version) log.Infof("Git SHA: %s\n", GitSHA)