Skip to content

Commit

Permalink
refector: 抽象Pool接口对象
Browse files Browse the repository at this point in the history
  • Loading branch information
googs1025 committed Mar 6, 2024
1 parent b7020a0 commit 74ab4aa
Show file tree
Hide file tree
Showing 11 changed files with 106 additions and 57 deletions.
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,38 @@
- 错误处理回调方法
```go
// Option 选项模式
type Option func(pool *Pool)
type Option func(pool *pool)

// WithTimeout 设置超时时间
func WithTimeout(timeout time.Duration) Option {
return func(p *Pool) {
return func(p *pool) {
p.timeout = timeout
}
}

// WithMaxWorkerNum 设置最大worker数量
func WithMaxWorkerNum(maxWorkerNum int) Option {
return func(p *Pool) {
return func(p *pool) {
p.maxWorkerNum = maxWorkerNum
}
}

// WithResultCallback 设置结果回调方法
func WithResultCallback(callback func(interface{})) Option {
return func(p *Pool) {
return func(p *pool) {
p.resultCallback = callback
}
}

// WithErrorCallback 设置错误回调方法
func WithErrorCallback(callback func(error)) Option {
return func(p *Pool) {
return func(p *pool) {
p.errorCallback = callback
}
}
```
#### 基本使用
1. 实例化Pool
1. 实例化 Pool
```go
pool := workerpool.NewPool(5, workerpool.WithTimeout(1), workerpool.WithErrorCallback(func(err error) {
fmt.Println("WithErrorCallback")
Expand Down Expand Up @@ -103,7 +103,7 @@ taskID := data.(int)

`pool.AddGlobalQueue(task) // 所有的任务放入全局队列中`

动态放入:Pool启动时放入
动态放入:Pool启动后放入

`pool.AddTask(task)`

Expand Down Expand Up @@ -229,6 +229,6 @@ func TestTaskPool2(t *testing.T) {
```

#### 更多示例:
可在/example目录下查看
可在/example目录[参考](./example)查看
1. 封装简易调度器
2. 简易http服务实现执行任务
2 changes: 1 addition & 1 deletion example/example1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestTaskPool1(t *testing.T) {

time.Sleep(100 * time.Millisecond)
klog.Info("Task ", taskID, " processed")
return nil, nil
return taskID, nil
}

// 准备多个个任务
Expand Down
11 changes: 6 additions & 5 deletions example/example2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestTaskPool2(t *testing.T) {
*/
time.Sleep(100 * time.Millisecond)
klog.Info("Task ", taskID, " processed")
return nil, nil
return taskID, nil
})

// 所有的任务放入list中
Expand All @@ -52,23 +52,24 @@ func TestTaskPool2(t *testing.T) {
// 启动在后台等待执行
go pool.RunBackground()

// 模拟启动 pool 后又动态加入 Task 任务
for {
taskID := rand.Intn(100) + 20

//// 模拟一个退出条件
// 模拟一个退出条件
if taskID%7 == 0 {
klog.Info("taskID: ", taskID, "pool stop!")
klog.Info("taskID: ", taskID, ", pool stop!")
pool.StopBackground()
break
}

time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
// 模拟后续加入pool
// 模拟后续加入 pool
task := workerpool.NewTaskInstance(fmt.Sprintf("task-%v", taskID), taskID, func(data interface{}) (interface{}, error) {
taskID := data.(int)
time.Sleep(3 * time.Second)
klog.Info("Task ", taskID, " processed")
return nil, nil
return taskID, nil
})

pool.AddTask(task)
Expand Down
11 changes: 11 additions & 0 deletions example/http_example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@ package main

import "github.com/myconcurrencytools/workpoolframework/example/http_example/cmd"

// 启动:
// ➜ http_example git:(main) ✗ go run main.go httpServer -p=8081
// 2024/03/06 12:18:14 start scheduler...
// I0306 12:18:14.325710 8727 worker.go:102] Starting worker background: 6
// I0306 12:18:14.325697 8727 worker.go:102] Starting worker background: 4
// I0306 12:18:14.325724 8727 worker.go:102] Starting worker background: 5
// I0306 12:18:14.325752 8727 pool.go:142] no task in global queue...
// I0306 12:18:14.325765 8727 worker.go:102] Starting worker background: 1
// I0306 12:18:14.325752 8727 worker.go:102] Starting worker background: 3
// I0306 12:18:14.325830 8727 worker.go:102] Starting worker background: 2

func main() {
cmd.Execute()
}
2 changes: 1 addition & 1 deletion example/http_example/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

type Scheduler struct {
pool *workerpool.Pool
pool workerpool.Pool
}

func NewScheduler(workerNum int) *Scheduler {
Expand Down
1 change: 0 additions & 1 deletion example/http_example/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
)

func HttpServer(c *common.ServerConfig) {

if !c.Debug {
gin.SetMode(gin.ReleaseMode)
}
Expand Down
2 changes: 1 addition & 1 deletion example/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

type Scheduler struct {
pool *workerpool.Pool
pool workerpool.Pool
}

func NewScheduler(workerNum int) *Scheduler {
Expand Down
10 changes: 5 additions & 5 deletions pkg/workerpool/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,32 @@ package workerpool
import "time"

// Option 选项模式
type Option func(pool *Pool)
type Option func(pool *pool)

// WithTimeout 设置超时时间
func WithTimeout(timeout time.Duration) Option {
return func(p *Pool) {
return func(p *pool) {
p.timeout = timeout
}
}

// WithMaxWorkerNum 设置最大worker数量
func WithMaxWorkerNum(maxWorkerNum int) Option {
return func(p *Pool) {
return func(p *pool) {
p.maxWorkerNum = maxWorkerNum
}
}

// WithResultCallback 设置结果回调方法
func WithResultCallback(callback func(interface{})) Option {
return func(p *Pool) {
return func(p *pool) {
p.resultCallback = callback
}
}

// WithErrorCallback 设置错误回调方法
func WithErrorCallback(callback func(error)) Option {
return func(p *Pool) {
return func(p *pool) {
p.errorCallback = callback
}
}
73 changes: 49 additions & 24 deletions pkg/workerpool/pool.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,30 @@
package workerpool

import (
"fmt"
"k8s.io/klog/v2"
"math/rand"
"sync"
"time"
)

// Pool 工作池
type Pool struct {
// list 装task
type Pool interface {
// AddGlobalQueue 加入工作池的全局队列,静态加入,用于启动工作池前的任务加入时使用
AddGlobalQueue(task Task)
// AddTask 把任务放入chan,当工作池启动后,动态加入使用
AddTask(task Task)
// Run 执行
Run()
// RunBackground 异步执行
RunBackground()
// StopBackground 停止后台执行
StopBackground()
}

// pool 工作池对象,实现 Pool 接口,
// 其中有自动扩缩容功能,支持动态传入任务对象
type pool struct {
// Tasks 存储Task接口对象
Tasks []Task
// Workers 列表
Workers []*worker
Expand All @@ -27,17 +42,24 @@ type Pool struct {
errorCallback func(err error)
// resultCallback 当任务有结果时的回调方法
resultCallback func(result interface{})
wg sync.WaitGroup
lock sync.Mutex
// status 状态
status string
wg sync.WaitGroup
lock sync.Mutex
}

const (
Running = "running"
Stopped = "stopped"
)

const (
defaultMaxWorkerNum = 20
)

// NewPool 建立一个pool
func NewPool(concurrency int, opts ...Option) *Pool {
p := &Pool{
func NewPool(concurrency int, opts ...Option) Pool {
p := &pool{
Tasks: make([]Task, 0),
Workers: make([]*worker, 0),
concurrency: concurrency,
Expand All @@ -63,14 +85,14 @@ func NewPool(concurrency int, opts ...Option) *Pool {

// AddGlobalQueue 加入工作池的全局队列,静态加入,用于启动工作池前的任务加入时使用,
// 在工作池启动后,推荐使用AddTask() 方法动态加入工作池
func (p *Pool) AddGlobalQueue(task Task) {
func (p *pool) AddGlobalQueue(task Task) {
p.Tasks = append(p.Tasks, task)
}

// Run 启动pool,使用Run()方法调用时,只能使用AddGlobalQueue加入全局队列,
// 一旦Run启动后,就不允许调用AddTask加入Task,如果需动态加入pool,可以使用
// RunBackground方法
func (p *Pool) Run() {
func (p *pool) Run() {
// 总共会开启p.concurrency个goroutine
// 启动pool中的每个worker都传入collector chan
for i := 1; i <= p.concurrency; i++ {
Expand All @@ -92,29 +114,31 @@ func (p *Pool) Run() {
for i := range p.Tasks {
p.collector <- p.Tasks[i]
}
// 改变 pool 状态
p.status = Running

// 注意,这里需要close chan。
close(p.collector)

// 阻塞,等待所有的goroutine执行完毕
p.wg.Wait()

p.status = Stopped
}

// AddTask 把任务放入chan,当工作池启动后,动态加入使用
func (p *Pool) AddTask(task Task) {
func (p *pool) AddTask(task Task) {
// 判断执行状态
if p.status != Running {
fmt.Println("please use AddGlobalQueue func to add task when pool is not running")
return
}
// 放入chan
p.collector <- task
}

// RunBackground 后台运行,需要启动一个goroutine来执行
func (p *Pool) RunBackground() {
// 启动goroutine,打印。
go func() {
for {
klog.Info("Waiting for tasks to come in... \n")
time.Sleep(10 * time.Second)
}
}()
func (p *pool) RunBackground() {

// 启动workers 数量: p.concurrency
for i := 1; i <= p.concurrency; i++ {
Expand Down Expand Up @@ -143,18 +167,19 @@ func (p *Pool) RunBackground() {
}

// StopBackground 停止后台运行,需要chan通知
func (p *Pool) StopBackground() {
func (p *pool) StopBackground() {
klog.Info("pool close!")
close(p.collector)
for _, k := range p.Workers {
k.stop()
}
//p.runBackground <- true
p.runBackground <- true
p.status = Stopped
}

// dispatch 由pool chan中不断分发给worker chan
// 使用随机分配的方式
func (p *Pool) dispatch() {
func (p *pool) dispatch() {
for task := range p.collector {
p.lock.Lock()
index := rand.Intn(len(p.Workers))
Expand All @@ -170,7 +195,7 @@ func (p *Pool) dispatch() {
}

// autoScale 监测自动扩缩容
func (p *Pool) autoScale() {
func (p *pool) autoScale() {
for {

time.Sleep(5 * time.Second)
Expand Down Expand Up @@ -199,8 +224,8 @@ func (p *Pool) autoScale() {
}
}

// scaleWorkers 调整worker数方法方法
func (p *Pool) scaleWorkers(numWorkers int) {
// scaleWorkers 调整worker数
func (p *pool) scaleWorkers(numWorkers int) {
// 获取目前的worker数量
currentNumWorkers := len(p.Workers)

Expand Down
Loading

0 comments on commit 74ab4aa

Please sign in to comment.