Skip to content

Commit

Permalink
refact(events): remove timeout for listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
knight42 committed May 11, 2018
1 parent 4b47e58 commit 8953697
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 45 deletions.
14 changes: 7 additions & 7 deletions core/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (c *Core) Sync(opts SyncOptions) (*Container, error) {
envs.Set("OWNER", r.User)
envs.Set("BIND_ADDRESS", r.BindIP)
envs.SetInt("RETRY", r.Retry)
envs.SetInt("LOG_ROTATE_CYCLE", int(r.LogRotCycle))
envs.SetInt("LOG_ROTATE_CYCLE", r.LogRotCycle)
if opts.Debug {
envs.Set("DEBUG", "true")
} else {
Expand All @@ -215,7 +215,7 @@ func (c *Core) Sync(opts SyncOptions) (*Container, error) {

if opts.MountDir {
logdir := path.Join(opts.LogDir, opts.Name)
if err := os.MkdirAll(logdir, os.ModePerm); err != nil {
if err = os.MkdirAll(logdir, os.ModePerm); err != nil {
return nil, fmt.Errorf("not a directory: %s", logdir)
}
if !common.DirExists(r.StorageDir) {
Expand Down Expand Up @@ -264,14 +264,12 @@ func (c *Core) Sync(opts SyncOptions) (*Container, error) {

// WaitForSync emits `SyncStart` event at first, then blocks until the container stops and emits the `SyncEnd` event.
func (c *Core) WaitForSync(ct Container) error {
if err := events.Emit(events.Payload{
events.Emit(events.Payload{
Evt: events.SyncStart,
Attrs: events.M{
"Name": ct.Labels["org.ustcmirror.name"],
},
}); err != nil {
return err
}
})

code, err := c.Docker.WaitContainer(ct.ID)
if err != nil {
Expand All @@ -287,7 +285,7 @@ func (c *Core) WaitForSync(ct Container) error {
return fmt.Errorf("missing label: org.ustcmirror.storage-dir")
}

return events.Emit(events.Payload{
events.Emit(events.Payload{
Evt: events.SyncEnd,
Attrs: events.M{
"ID": ct.ID,
Expand All @@ -296,4 +294,6 @@ func (c *Core) WaitForSync(ct Container) error {
"ExitCode": code,
},
})

return nil
}
43 changes: 5 additions & 38 deletions events/main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
// Package events implements functions for listening or emitting events.
package events

import (
"fmt"
"sync"
"time"
)

// Event is an alias of int.
type Event = int

Expand Down Expand Up @@ -35,8 +29,6 @@ type Listener func(data Payload)

var (
globalEmitter *Emitter
// ErrTimeout is returned for an expired deadline.
ErrTimeout = fmt.Errorf("timeout")
)

// Emitter is the event emitter.
Expand All @@ -54,8 +46,8 @@ func On(evt Event, listener Listener) *Emitter {
}

// Emit triggers global events.
func Emit(payload Payload) error {
return globalEmitter.Emit(payload)
func Emit(payload Payload) {
globalEmitter.Emit(payload)
}

// On registers a listener for the given event.
Expand All @@ -64,42 +56,17 @@ func (e *Emitter) On(evt Event, listener Listener) *Emitter {
return e
}

func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) error {
c := make(chan struct{})
go func() {
wg.Wait()
close(c)
}()
select {
case <-c:
return nil
case <-time.After(timeout):
return ErrTimeout
}
}

// Emit emits the given event with the padload.
func (e *Emitter) Emit(payload Payload) error {
var (
wg sync.WaitGroup
)

func (e *Emitter) Emit(payload Payload) {
evt := payload.Evt
listeners, ok := e.listeners[evt]
if !ok {
return nil
return
}

wg.Add(len(listeners))

for _, fn := range listeners {
go func(fn Listener) {
defer wg.Done()
fn(payload)
}(fn)
go fn(payload)
}

return waitTimeout(&wg, 5*time.Second)
}

// NewEmitter returns an instance of Emitter.
Expand Down

0 comments on commit 8953697

Please sign in to comment.