Skip to content

Commit

Permalink
feat: support wait sink node close (#3373)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <disxiaofei@163.com>
  • Loading branch information
Yisaer authored Nov 13, 2024
1 parent bfbb96d commit 3d23ce4
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 2 deletions.
1 change: 1 addition & 0 deletions internal/server/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ func stopRule(name string) (result string, err error) {
if err != nil {
conf.Log.Warn(err)
}
rs.Topology.WaitOperatorClose()
_, err = ruleProcessor.ExecReplaceRuleState(name, false)
if err != nil {
conf.Log.Warnf("stop rule found error: %s", err.Error())
Expand Down
1 change: 1 addition & 0 deletions internal/topo/context/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
const (
LoggerKey = "$$logger"
RuleStartKey = "$$ruleStart"
RuleOpsWg = "$$ops_wg"
)

type DefaultContext struct {
Expand Down
36 changes: 34 additions & 2 deletions internal/topo/node/sink_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package node

import (
"fmt"
"sync"

"github.com/lf-edge/ekuiper/internal/binder/io"
"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/topo/context"
kctx "github.com/lf-edge/ekuiper/internal/topo/context"
"github.com/lf-edge/ekuiper/internal/topo/node/cache"
nodeConf "github.com/lf-edge/ekuiper/internal/topo/node/conf"
"github.com/lf-edge/ekuiper/internal/topo/node/metric"
Expand Down Expand Up @@ -55,6 +57,7 @@ type SinkNode struct {
// configs (also static for sinks)
options map[string]interface{}
isMock bool
wg *sync.WaitGroup
// states varies after restart
sink api.Sink
}
Expand All @@ -77,8 +80,21 @@ func NewSinkNodeWithSink(name string, sink api.Sink, props map[string]interface{
}
}

func (m *SinkNode) Close(ctx api.StreamContext) {
if m.wg != nil {
m.wg.Done()
}
}

func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
m.ctx = ctx
v := ctx.Value(kctx.RuleOpsWg)
if v != nil {
wg, ok := v.(*sync.WaitGroup)
if ok {
m.wg = wg
}
}
logger := ctx.GetLogger()
logger.Debugf("open sink node %s", m.name)
go func() {
Expand Down Expand Up @@ -109,6 +125,20 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {

go func(instance int) {
panicOrError := infra.SafeRun(func() error {
if m.wg != nil {
m.wg.Add(1)
}
var exitCh1 chan struct{}
var exitCh2 chan struct{}
defer func() {
if exitCh1 != nil {
<-exitCh1
}
if exitCh2 != nil {
<-exitCh2
}
m.Close(ctx)
}()
var (
sink api.Sink
err error
Expand Down Expand Up @@ -148,10 +178,12 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
if !sconf.EnableCache {
dataOutCh = dataCh
} else {
c = cache.NewSyncCache(ctx, dataCh, result, &sconf.SinkConf, sconf.BufferLength)
exitCh1 = make(chan struct{}, 2)
c = cache.NewSyncCacheWithExitChanel(ctx, dataCh, result, &sconf.SinkConf, sconf.BufferLength, exitCh1)
if sconf.ResendAlterQueue {
resendCh = make(chan []map[string]interface{}, sconf.BufferLength)
rq = cache.NewSyncCache(ctx, resendCh, result, &sconf.SinkConf, sconf.BufferLength)
exitCh2 = make(chan struct{}, 2)
rq = cache.NewSyncCacheWithExitChanel(ctx, resendCh, result, &sconf.SinkConf, sconf.BufferLength, exitCh2)
}
dataOutCh = c.Out
}
Expand Down
9 changes: 9 additions & 0 deletions internal/topo/topo.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Topo struct {
topo *api.PrintableTopo
mu sync.Mutex
hasOpened atomic.Bool
wg *sync.WaitGroup
}

func NewWithNameAndOptions(name string, options *api.RuleOption) (*Topo, error) {
Expand Down Expand Up @@ -152,6 +153,12 @@ func (s *Topo) addEdge(from api.TopNode, to api.TopNode, toType string) {
s.topo.Edges[f] = append(e, t)
}

func (s *Topo) WaitOperatorClose() {
if s.wg != nil {
s.wg.Wait()
}
}

// prepareContext setups internal context before
// stream starts execution.
func (s *Topo) prepareContext() {
Expand Down Expand Up @@ -191,6 +198,8 @@ func (s *Topo) prepareContext() {
}
ctx := kctx.WithValue(kctx.Background(), kctx.LoggerKey, contextLogger)
ctx = kctx.WithValue(ctx, kctx.RuleStartKey, conf.GetNowInMilli())
s.wg = &sync.WaitGroup{}
ctx = kctx.WithValue(ctx, kctx.RuleOpsWg, s.wg)
s.ctx, s.cancel = ctx.WithCancel()
}
}
Expand Down

0 comments on commit 3d23ce4

Please sign in to comment.