Skip to content

Commit

Permalink
feat: 新增超时功能与回调方法功能
Browse files Browse the repository at this point in the history
  • Loading branch information
googs1025 committed Aug 2, 2023
1 parent 55f3845 commit 4b87d5c
Show file tree
Hide file tree
Showing 11 changed files with 235 additions and 83 deletions.
69 changes: 39 additions & 30 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,30 @@

func TestTaskPool1(t *testing.T) {

// 建立一个工作池
// input:池数量
pool := workerpool.NewPool(5)

pool := workerpool.NewPool(5, workerpool.WithTimeout(1), workerpool.WithErrorCallback(func(err error) {
fmt.Println("WithErrorCallback")
if err != nil {
panic(err)
}
}), workerpool.WithResultCallback(func(i interface{}) {
fmt.Println("result: ", i)
}))

// 需要处理的任务
tt := func(data interface{}) error {
tt := func(data interface{}) (interface{}, error) {
taskID := data.(int)
// 业务逻辑

time.Sleep(100 * time.Millisecond)
klog.Info("Task ", taskID, " processed")
return nil
return nil, nil
}

// 准备多个个任务
for i := 1; i <= 1000; i++ {

// 需要做的任务
task := workerpool.NewTask(tt, i)
task := workerpool.NewTaskInstance(fmt.Sprintf("task-%v", i), i, tt)

// 所有的任务放入全局队列中
pool.AddGlobalQueue(task)
Expand All @@ -59,51 +63,56 @@ func TestTaskPool2(t *testing.T) {
// 建立一个池,
// input:池数量

pool := workerpool.NewPool(5)

//pool := workerpool.NewPool(5)
pool := workerpool.NewPool(5, workerpool.WithTimeout(1), workerpool.WithErrorCallback(func(err error) {
if err != nil {
panic(err)
}
}), workerpool.WithResultCallback(func(i interface{}) {
fmt.Println("result: ", i)
}))

// 准备100个任务
for i := 1; i <= 100; i++ {

// 需要做的任务
task := workerpool.NewTask(func(data interface{}) error {
taskID := data.(int)

/*
业务逻辑
*/
time.Sleep(100 * time.Millisecond)
klog.Info("Task ", taskID, " processed")
return nil
}, i)
// 需要做的任务
task := workerpool.NewTaskInstance(fmt.Sprintf("task-%v", i), i, func(data interface{}) (interface{}, error) {
taskID := data.(int)

/*
业务逻辑
*/
time.Sleep(100 * time.Millisecond)
klog.Info("Task ", taskID, " processed")
return nil, nil
})

// 所有的任务放入list中
pool.AddGlobalQueue(task)
}


// 启动在后台等待执行
go pool.RunBackground()


for {
taskID := rand.Intn(100) + 20

// 模拟一个退出条件
//// 模拟一个退出条件
if taskID%7 == 0 {
klog.Info("taskID: ", taskID, "pool stop!")
pool.StopBackground()
break
}

time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
// 模拟后续加入pool
task := workerpool.NewTask(func(data interface{}) error {
task := workerpool.NewTaskInstance(fmt.Sprintf("task-%v", taskID), taskID, func(data interface{}) (interface{}, error) {
taskID := data.(int)
time.Sleep(100 * time.Millisecond)
time.Sleep(3 * time.Second)
klog.Info("Task ", taskID, " processed")
return nil
}, taskID)
return nil, nil
})

pool.AddTask(task)
}

Expand Down
6 changes: 6 additions & 0 deletions example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
### 示例:

- example1_test.go:同步等待运行的协程池示例
- example2_test.go:异步等待运行的协程池示例
- scheduler包:简易型调度器
- http_example包:模拟httpServer暴露调度接口示例
10 changes: 6 additions & 4 deletions example/example1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@ func TestTaskPool1(t *testing.T) {

// 建立一个工作池
// input:池数量
pool := workerpool.NewPool(5)
pool := workerpool.NewPool(5, workerpool.WithTimeout(1), workerpool.WithResultCallback(func(i interface{}) {
fmt.Println("result: ", i)
}))

// 需要处理的任务
tt := func(data interface{}) error {
tt := func(data interface{}) (interface{}, error) {
taskID := data.(int)
// 业务逻辑

time.Sleep(100 * time.Millisecond)
klog.Info("Task ", taskID, " processed")
return nil
return nil, nil
}

// 准备多个个任务
Expand All @@ -43,4 +45,4 @@ func TestTaskPool1(t *testing.T) {
}
pool.Run() // 启动

}
}
23 changes: 15 additions & 8 deletions example/example2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,28 @@ func TestTaskPool2(t *testing.T) {
// 建立一个池,
// input:池数量

pool := workerpool.NewPool(5)
//pool := workerpool.NewPool(5)
pool := workerpool.NewPool(5, workerpool.WithTimeout(1), workerpool.WithErrorCallback(func(err error) {
if err != nil {
panic(err)
}
}), workerpool.WithResultCallback(func(i interface{}) {
fmt.Println("result: ", i)
}))

// 准备100个任务
for i := 1; i <= 100; i++ {

// 需要做的任务
task := workerpool.NewTaskInstance(fmt.Sprintf("task-%v", i), i, func(data interface{}) error {
task := workerpool.NewTaskInstance(fmt.Sprintf("task-%v", i), i, func(data interface{}) (interface{}, error) {
taskID := data.(int)

/*
业务逻辑
*/
time.Sleep(100 * time.Millisecond)
klog.Info("Task ", taskID, " processed")
return nil
return nil, nil
})

// 所有的任务放入list中
Expand All @@ -49,7 +56,7 @@ func TestTaskPool2(t *testing.T) {
for {
taskID := rand.Intn(100) + 20

// 模拟一个退出条件
//// 模拟一个退出条件
if taskID%7 == 0 {
klog.Info("taskID: ", taskID, "pool stop!")
pool.StopBackground()
Expand All @@ -58,15 +65,15 @@ func TestTaskPool2(t *testing.T) {

time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
// 模拟后续加入pool
task := workerpool.NewTaskInstance(fmt.Sprintf("task-%v", taskID), taskID, func(data interface{}) error {
task := workerpool.NewTaskInstance(fmt.Sprintf("task-%v", taskID), taskID, func(data interface{}) (interface{}, error) {
taskID := data.(int)
time.Sleep(100 * time.Millisecond)
time.Sleep(3 * time.Second)
klog.Info("Task ", taskID, " processed")
return nil
return nil, nil
})

pool.AddTask(task)
}

fmt.Println("finished...")
}
}
2 changes: 1 addition & 1 deletion example/http_example/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ func Execute() {
fmt.Printf("cmd err: %s\n", err)
os.Exit(1)
}
}
}
6 changes: 3 additions & 3 deletions example/http_example/pkg/server/model/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,17 @@ func (my *MyTask) ChooseTaskType() {
}
}

func (my *MyTask) Execute() error {
func (my *MyTask) Execute() (interface{}, error) {

my.Status = TaskRunning

if err := my.f(my.Input); err != nil {
my.Err = err
my.Status = TaskFail
return err
return nil, err
}
my.Status = TaskSuccess
return nil
return nil, nil
}

func (my *MyTask) GetTaskName() string {
Expand Down
4 changes: 2 additions & 2 deletions example/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ func TestScheduler(t *testing.T) {

s.Start()

tsk := workerpool.NewTaskInstance("task1", "aaa", func(i interface{}) error {
tsk := workerpool.NewTaskInstance("task1", "aaa", func(i interface{}) (interface{}, error) {
fmt.Println(i)
return nil
return nil, nil
})

s.AddTask(tsk)
Expand Down
27 changes: 27 additions & 0 deletions pkg/workerpool/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package workerpool

import "time"

// Option 选项模式
type Option func(pool *Pool)

// WithTimeout 设置超时时间
func WithTimeout(timeout time.Duration) Option {
return func(p *Pool) {
p.timeout = timeout
}
}

// WithResultCallback 设置结果回调方法
func WithResultCallback(callback func(interface{})) Option {
return func(p *Pool) {
p.resultCallback = callback
}
}

// WithErrorCallback 设置错误回调方法
func WithErrorCallback(callback func(error)) Option {
return func(p *Pool) {
p.errorCallback = callback
}
}
Loading

0 comments on commit 4b87d5c

Please sign in to comment.