Skip to content
This repository has been archived by the owner on Feb 14, 2023. It is now read-only.

Commit

Permalink
Return entities as endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
kasisnu committed Feb 17, 2016
1 parent 6b72490 commit 669267a
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
8 changes: 4 additions & 4 deletions serverset.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ func (ss *ServerSet) createFullPath(connection *zk.Conn) error {

// structure of the data in each member znode
// Mimics finagle serverset structure.
type entity struct {
type Entity struct {
ServiceEndpoint endpoint `json:"serviceEndpoint"`
AdditionalEndpoints map[string]endpoint `json:"additionalEndpoints"` // unused
AdditionalEndpoints map[string]endpoint `json:"additionalEndpoints"`
Status string `json:"status"`
}

Expand All @@ -132,8 +132,8 @@ type endpoint struct {
Port int `json:"port"`
}

func newEntity(host string, port int) *entity {
return &entity{
func newEntity(host string, port int) *Entity {
return &Entity{
ServiceEndpoint: endpoint{host, port},
AdditionalEndpoints: make(map[string]endpoint),
Status: statusAlive,
Expand Down
26 changes: 19 additions & 7 deletions watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Watch struct {

// lock for read/writing the endpoints slice
lock sync.RWMutex
endpoints []string
endpoints []Entity
}

// Watch creates a new watch on this server set. Changes to the set will
Expand Down Expand Up @@ -121,6 +121,19 @@ func (ss *ServerSet) Watch() (*Watch, error) {
func (w *Watch) Endpoints() []string {
w.lock.RLock()
defer w.lock.RUnlock()
endpoints := make([]string, 0, len(w.endpoints))
for _, e := range w.endpoints {
endpoints = append(endpoints, net.JoinHostPort(e.ServiceEndpoint.Host, strconv.Itoa(e.ServiceEndpoint.Port)))
}

sort.Strings(endpoints)
return endpoints
}

// EndpointEntities returns a slice of the current list of Entites associated with this watch, collected at the last event.
func (w *Watch) EndpointEntities() []Entity {
w.lock.RLock()
defer w.lock.RUnlock()

return w.endpoints
}
Expand Down Expand Up @@ -172,8 +185,8 @@ func (w *Watch) watch(connection *zk.Conn) ([]string, <-chan zk.Event, error) {
return children, events, err
}

func (w *Watch) updateEndpoints(connection *zk.Conn, keys []string) ([]string, error) {
endpoints := make([]string, 0, len(keys))
func (w *Watch) updateEndpoints(connection *zk.Conn, keys []string) ([]Entity, error) {
endpoints := make([]Entity, 0, len(keys))

for _, k := range keys {
if !strings.HasPrefix(k, MemberPrefix) {
Expand All @@ -191,16 +204,15 @@ func (w *Watch) updateEndpoints(connection *zk.Conn, keys []string) ([]string, e
}

if e.Status == statusAlive {
endpoints = append(endpoints, net.JoinHostPort(e.ServiceEndpoint.Host, strconv.Itoa(e.ServiceEndpoint.Port)))
endpoints = append(endpoints, *e)
}
}

sort.Strings(endpoints)
return endpoints, nil

}

func (w *Watch) getEndpoint(connection *zk.Conn, key string) (*entity, error) {
func (w *Watch) getEndpoint(connection *zk.Conn, key string) (*Entity, error) {

data, _, err := connection.Get(w.serverSet.directoryPath() + "/" + key)
if err == zk.ErrNoNode {
Expand All @@ -221,7 +233,7 @@ func (w *Watch) getEndpoint(connection *zk.Conn, key string) (*entity, error) {
return w.getEndpoint(connection, key)
}

e := &entity{}
e := &Entity{}
err = json.Unmarshal(data, &e)
if err != nil {
return nil, err
Expand Down

0 comments on commit 669267a

Please sign in to comment.