From 7da1aadc973fa4efda9e831d590c8c22d8c0a0ce Mon Sep 17 00:00:00 2001 From: hanjinming Date: Mon, 30 Oct 2017 18:37:32 +0800 Subject: [PATCH] fix block at send nil chan, optimize TaskManager.PushTasksUpdate() --- conns.go | 5 ++++- tasks.go | 37 +++++++++++++++++++++++++++++-------- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/conns.go b/conns.go index 365bce4..89863f9 100644 --- a/conns.go +++ b/conns.go @@ -93,7 +93,10 @@ func (cm *ConnectionsManger) Broadcast(msgObj interface{}) { cm.mutex.Lock() defer cm.mutex.Unlock() for _, ch := range cm.conns { - ch <- msgObj + select { + case ch <- msgObj: + default: + } } } diff --git a/tasks.go b/tasks.go index 29ff6db..1be1432 100644 --- a/tasks.go +++ b/tasks.go @@ -12,6 +12,7 @@ import ( "time" //"syscall" "net/url" + "runtime" ) type TasksManager struct { @@ -27,7 +28,7 @@ func NewTasksManager(downloadDir string, limitByteSize int64, limitTimeout time. return &TasksManager{ tasks: make([]Task, 0, 64), ConnectionsManger: NewConnectionsManger(), - PushTasksUpdateChan: make(chan struct{}, 1), + PushTasksUpdateChan: make(chan struct{}, 2), downloadDir: downloadDir, limitByteSize: limitByteSize, limitTimeout: limitTimeout, @@ -50,7 +51,7 @@ func (m *TasksManager) WebSocketHandler(w http.ResponseWriter, r *http.Request) log.Infof("new connection from %s, number of active connections %d", conn.RemoteAddr(), len(m.ConnectionsManger.conns)+1) m.ConnectionsManger.Add(conn) // 首次连接,推送文件信息 - m.PushTasksUpdateChan <- struct{}{} + m.PushTasksUpdate() } func (m *TasksManager) TaskHandler(w http.ResponseWriter, r *http.Request) { @@ -106,18 +107,18 @@ func (m *TasksManager) TaskHandler(w http.ResponseWriter, r *http.Request) { if err != nil { log.Errorf("task download error:%s, task name:%s", err, task.FileName()) } - m.PushTasksUpdateChan <- struct{}{} + m.PushTasksUpdate() }(m) // 添加任务后,推送文件信息 w.WriteHeader(http.StatusCreated) w.Write([]byte("CREATE OK")) - m.PushTasksUpdateChan <- struct{}{} + m.PushTasksUpdate() return case http.MethodDelete: log.Infof("[TaskHandler]delete %s", filename) // 删除文件 defer func() { - m.PushTasksUpdateChan <- struct{}{} + m.PushTasksUpdate() }() if filename == "" { w.WriteHeader(http.StatusBadRequest) @@ -141,12 +142,32 @@ func (m *TasksManager) TaskHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("DELETE OK")) log.Infof("[TaskHandler]delete ok, %s", filename) - m.PushTasksUpdateChan <- struct{}{} + m.PushTasksUpdate() default: w.WriteHeader(http.StatusBadRequest) } } +func (m *TasksManager) PushTasksUpdate() { + select { + case m.PushTasksUpdateChan <- struct{}{}: + //log.Debugf("PushTasksUpdate") + default: + pcs := make([]uintptr, 5) + runtime.Callers(1, pcs) + frames := runtime.CallersFrames(pcs) + var callers []string + for { + frame, more := frames.Next() + if !more { + break + } + callers = append(callers, fmt.Sprintf("[%s:%d:%s]", frame.File, frame.Line, frame.Function)) + } + log.Warnf("%vPushTasksUpdate timeOut", callers) + } +} + // HasDownloadingTask 检查是否有正在下载的任务 func (m *TasksManager) HasDownloadingTask() bool { for _, v := range m.tasks { @@ -262,7 +283,7 @@ func (m *TasksManager) ReDownloadUncompleted() { if err != nil { log.Errorf("task download error:%s, task name:%s", err, task.FileName()) } - m.PushTasksUpdateChan <- struct{}{} + m.PushTasksUpdate() }(m) } } @@ -328,7 +349,7 @@ func (m *TasksManager) PushTasksUpdateWorker() { case <-ticker.C: if m.HasDownloadingTask() && m.ConnectionsManger.Count() > 0 { // 当有文件在下载且有连接时, 推送下载任务更新信息 - m.PushTasksUpdateChan <- struct{}{} + m.PushTasksUpdate() } } }