Skip to content

Commit

Permalink
fix block at send nil chan, optimize TaskManager.PushTasksUpdate()
Browse files Browse the repository at this point in the history
  • Loading branch information
hanjinming committed Oct 30, 2017
1 parent 7178de6 commit 7da1aad
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 9 deletions.
5 changes: 4 additions & 1 deletion conns.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ func (cm *ConnectionsManger) Broadcast(msgObj interface{}) {
cm.mutex.Lock()
defer cm.mutex.Unlock()
for _, ch := range cm.conns {
ch <- msgObj
select {
case ch <- msgObj:
default:
}
}
}

Expand Down
37 changes: 29 additions & 8 deletions tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"
//"syscall"
"net/url"
"runtime"
)

type TasksManager struct {
Expand All @@ -27,7 +28,7 @@ func NewTasksManager(downloadDir string, limitByteSize int64, limitTimeout time.
return &TasksManager{
tasks: make([]Task, 0, 64),
ConnectionsManger: NewConnectionsManger(),
PushTasksUpdateChan: make(chan struct{}, 1),
PushTasksUpdateChan: make(chan struct{}, 2),
downloadDir: downloadDir,
limitByteSize: limitByteSize,
limitTimeout: limitTimeout,
Expand All @@ -50,7 +51,7 @@ func (m *TasksManager) WebSocketHandler(w http.ResponseWriter, r *http.Request)
log.Infof("new connection from %s, number of active connections %d", conn.RemoteAddr(), len(m.ConnectionsManger.conns)+1)
m.ConnectionsManger.Add(conn)
// 首次连接,推送文件信息
m.PushTasksUpdateChan <- struct{}{}
m.PushTasksUpdate()
}

func (m *TasksManager) TaskHandler(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -106,18 +107,18 @@ func (m *TasksManager) TaskHandler(w http.ResponseWriter, r *http.Request) {
if err != nil {
log.Errorf("task download error:%s, task name:%s", err, task.FileName())
}
m.PushTasksUpdateChan <- struct{}{}
m.PushTasksUpdate()
}(m)
// 添加任务后,推送文件信息
w.WriteHeader(http.StatusCreated)
w.Write([]byte("CREATE OK"))
m.PushTasksUpdateChan <- struct{}{}
m.PushTasksUpdate()
return
case http.MethodDelete:
log.Infof("[TaskHandler]delete %s", filename)
// 删除文件
defer func() {
m.PushTasksUpdateChan <- struct{}{}
m.PushTasksUpdate()
}()
if filename == "" {
w.WriteHeader(http.StatusBadRequest)
Expand All @@ -141,12 +142,32 @@ func (m *TasksManager) TaskHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("DELETE OK"))
log.Infof("[TaskHandler]delete ok, %s", filename)
m.PushTasksUpdateChan <- struct{}{}
m.PushTasksUpdate()
default:
w.WriteHeader(http.StatusBadRequest)
}
}

func (m *TasksManager) PushTasksUpdate() {
select {
case m.PushTasksUpdateChan <- struct{}{}:
//log.Debugf("PushTasksUpdate")
default:
pcs := make([]uintptr, 5)
runtime.Callers(1, pcs)
frames := runtime.CallersFrames(pcs)
var callers []string
for {
frame, more := frames.Next()
if !more {
break
}
callers = append(callers, fmt.Sprintf("[%s:%d:%s]", frame.File, frame.Line, frame.Function))
}
log.Warnf("%vPushTasksUpdate timeOut", callers)
}
}

// HasDownloadingTask 检查是否有正在下载的任务
func (m *TasksManager) HasDownloadingTask() bool {
for _, v := range m.tasks {
Expand Down Expand Up @@ -262,7 +283,7 @@ func (m *TasksManager) ReDownloadUncompleted() {
if err != nil {
log.Errorf("task download error:%s, task name:%s", err, task.FileName())
}
m.PushTasksUpdateChan <- struct{}{}
m.PushTasksUpdate()
}(m)
}
}
Expand Down Expand Up @@ -328,7 +349,7 @@ func (m *TasksManager) PushTasksUpdateWorker() {
case <-ticker.C:
if m.HasDownloadingTask() && m.ConnectionsManger.Count() > 0 {
// 当有文件在下载且有连接时, 推送下载任务更新信息
m.PushTasksUpdateChan <- struct{}{}
m.PushTasksUpdate()
}
}
}
Expand Down

0 comments on commit 7da1aad

Please sign in to comment.