Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

组件并发模式会出现 go协程阻塞积压 #82

Open
LittleBeeMark opened this issue Sep 30, 2021 · 2 comments
Open

组件并发模式会出现 go协程阻塞积压 #82

LittleBeeMark opened this issue Sep 30, 2021 · 2 comments

Comments

@LittleBeeMark
Copy link

// Do 执行子组件
// ctx 业务上下文
// currentConponent 当前组件
// wg 父组件的waitgroup对象
func (bc *BaseConcurrencyComponent) Do(ctx *Context, currentConponent Component, wg *sync.WaitGroup) (err error) {
defer wg.Done()
// 初始化并发子组件channel
if bc.logicResChan == nil {
bc.logicResChan = make(chan interface{}, 1)
}

go currentConponent.BusinessLogicDo(bc.logicResChan)

select {
// 等待业务执行结果
case <-bc.logicResChan:
	// 业务执行结果
	fmt.Println(runFuncName(), "bc.BusinessLogicDo wait.done...")
	break
// 超时等待
case <-ctx.TimeoutCtx.Done():
	// 超时退出
	fmt.Println(runFuncName(), "bc.BusinessLogicDo timeout...")
	bc.Err = ErrConcurrencyComponentTimeout
	break
}
// 执行子组件
err = currentConponent.ChildsDo(ctx)
return

}

这个组件的并发设计中,子任务的执行是同时起生产者和消费者的,在消费者超时退出后,生产者(urrentConponent.BusinessLogicDo(bc.logicResChan))如果此时才获取到结果发送到消费者将会阻塞,这样会造成多go程阻塞,go程积压造成内存爆表。

我模仿写了一个伪代码:
func run(wg *sync.WaitGroup) {
defer wg.Done()
ch := make(chan string)

wg.Add(1)
go func() {
	time.Sleep(9 * time.Second)
            // 阻塞后死锁
	ch <- "work1"
}()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

select {
case res := <-ch:
	fmt.Println("接收到任务", res)

case <-ctx.Done():
	fmt.Println("ctx err : ", ctx.Err())
	return
}

}

func main() {
wg := sync.WaitGroup{}
wg.Add(1)
go run(&wg)
wg.Wait()

fmt.Println("success")

}

运行结果是go程阻塞死锁 :
ctx err : context deadline exceeded
fatal error: all goroutines are asleep - deadlock!

@LittleBeeMark LittleBeeMark changed the title 组件并发模式可能出现 Panic 组件并发模式会出现 go协程阻塞积压 Sep 30, 2021
@TIGERB
Copy link
Owner

TIGERB commented Sep 30, 2021

是的 可能存在这个问题 我仔细看看在回复你哈

@LittleBeeMark
Copy link
Author

可以在消费者超时结束后通知生产者无需发送处理结果:
func run(wg *sync.WaitGroup, stopChan chan struct{}) {
defer wg.Done()
ch := make(chan string)

wg.Add(1)
go func() {
	defer wg.Done()
	time.Sleep(9 * time.Second)

	select {
	case ch <- "work1":
		fmt.Println("i'm work")
		return
	
	case <-stopChan:
		fmt.Println("i has been stopped")
		return
	}
}()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

select {
case res := <-ch:
	fmt.Println("get a work: ", res)
	return

case <-ctx.Done():
	select {
	case stopChan <- struct{}{}:
	default:
	}
	
	fmt.Println("ctx err : ", ctx.Err())
	return
}

}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants