From c4f9aa267fbd5f5643b1d53097354556a2b25f98 Mon Sep 17 00:00:00 2001 From: CFC4N Date: Mon, 9 Dec 2024 23:59:22 +0800 Subject: [PATCH] fix #685, the Processor print "incoming chan is full",and exit. Signed-off-by: CFC4N --- cli/cmd/root.go | 1 - pkg/event_processor/iworker.go | 13 +++++++++---- pkg/event_processor/processor.go | 14 +++++++++++--- user/module/imodule.go | 14 +++++++++++++- user/module/probe_openssl.go | 3 +-- 5 files changed, 34 insertions(+), 11 deletions(-) diff --git a/cli/cmd/root.go b/cli/cmd/root.go index b252163b3..acd586c18 100644 --- a/cli/cmd/root.go +++ b/cli/cmd/root.go @@ -244,7 +244,6 @@ func runModule(modName string, modConfig config.IConfig) { reload: // 初始化 - logger.Warn().Msg("========== module starting. ==========") mod := modFunc() ctx, cancelFun := context.WithCancel(context.TODO()) err = mod.Init(ctx, &logger, modConfig, ecw) diff --git a/pkg/event_processor/iworker.go b/pkg/event_processor/iworker.go index 0eed31c31..a2d163e1d 100644 --- a/pkg/event_processor/iworker.go +++ b/pkg/event_processor/iworker.go @@ -39,11 +39,16 @@ type IWorker interface { } const ( - MaxTickerCount = 10 // 1 Sencond/(eventWorker.ticker.C) = 10 - MaxChanLen = 16 // 包队列长度 + MaxTickerCount = 10 // 1 Sencond/(eventWorker.ticker.C) = 10 + MaxChanLen = 1024 // 包队列长度 //MAX_EVENT_LEN = 16 // 事件数组长度 ) +var ( + ErrEventWorkerIncomingFull = errors.New("eventWorker Write failed, incoming chan is full") + ErrEventWorkerOutcomingFull = errors.New("eventWorker Write failed, outComing chan is full") +) + type eventWorker struct { incoming chan event.IEventStruct //events []user.IEventStruct @@ -88,7 +93,7 @@ func (ew *eventWorker) Write(e event.IEventStruct) error { select { case ew.incoming <- e: default: - err = errors.New("eventWorker Write failed, incoming chan is full") + err = ErrEventWorkerIncomingFull } return err } @@ -98,7 +103,7 @@ func (ew *eventWorker) writeToChan(s string) error { select { case ew.outComing <- s: default: - err = errors.New("eventWorker Write failed, outComing chan is full") + err = ErrEventWorkerOutcomingFull } return err } diff --git a/pkg/event_processor/processor.go b/pkg/event_processor/processor.go index d4018df48..194706232 100644 --- a/pkg/event_processor/processor.go +++ b/pkg/event_processor/processor.go @@ -39,6 +39,7 @@ type EventProcessor struct { logger io.Writer closeChan chan bool + errChan chan error // output model isHex bool @@ -52,6 +53,7 @@ func (ep *EventProcessor) init() { ep.incoming = make(chan event.IEventStruct, MaxIncomingChanLen) ep.outComing = make(chan string, MaxIncomingChanLen) ep.closeChan = make(chan bool) + ep.errChan = make(chan error, 16) ep.workerQueue = make(map[string]IWorker, MaxParserQueueLen) } @@ -63,9 +65,12 @@ func (ep *EventProcessor) Serve() error { case eventStruct := <-ep.incoming: err = ep.dispatch(eventStruct) if err != nil { - //err1 := ep.Close() - //return errors.Join(err, err1) - return err + // 不返回error是合理的做法,因为个别事件处理失败不应该影响整个处理器的关闭。 + // 但是,需要将这个错误抛给的调用着,让调用者决定是否关闭处理器 + select { + case ep.errChan <- err: + default: + } } case s := <-ep.outComing: _, _ = ep.GetLogger().Write([]byte(s)) @@ -153,6 +158,9 @@ func (ep *EventProcessor) Close() error { return nil } +func (ep *EventProcessor) ErrorChan() chan error { + return ep.errChan +} func NewEventProcessor(logger io.Writer, isHex bool) *EventProcessor { var ep *EventProcessor ep = &EventProcessor{} diff --git a/user/module/imodule.go b/user/module/imodule.go index 874d7345c..1d872e8dc 100644 --- a/user/module/imodule.go +++ b/user/module/imodule.go @@ -97,11 +97,23 @@ func (m *Module) Init(ctx context.Context, logger *zerolog.Logger, conf config.I m.isClosed.Store(false) m.ctx = ctx m.logger = logger - m.errChan = make(chan error) + m.errChan = make(chan error, 16) m.isKernelLess5_2 = false //set false default m.eventCollector = eventCollector //var epl = epLogger{logger: logger} m.processor = event_processor.NewEventProcessor(eventCollector, conf.GetHex()) + + go func() { + // 读取错误信息 + for { + select { + case err := <-m.processor.ErrorChan(): + m.logger.Warn().AnErr("Processor error", err).Send() + case <-m.ctx.Done(): + return + } + } + }() kv, err := kernel.HostVersion() if err != nil { m.logger.Warn().Err(err).Msg("Unable to detect kernel version due to an error:%v.used non-Less5_2 bytecode.") diff --git a/user/module/probe_openssl.go b/user/module/probe_openssl.go index 2bc07dad9..2e0cc2f14 100644 --- a/user/module/probe_openssl.go +++ b/user/module/probe_openssl.go @@ -209,8 +209,7 @@ func (m *MOpenSSLProbe) getSslBpfFile(soPath, sslVersion string) error { // 未找到版本号, try libcrypto.so.x if strings.Contains(soPath, "libssl.so.3") { m.logger.Warn().Err(err).Str("soPath", soPath).Msg("OpenSSL/BoringSSL version not found.") - m.logger.Warn().Msg("Try to detect libcrypto.so.3. If you have doubts") - m.logger.Warn().Msg("See https://github.com/gojue/ecapture/discussions/675 for more information.") + m.logger.Warn().Msg("Try to detect libcrypto.so.3. If you have doubts, See https://github.com/gojue/ecapture/discussions/675 for more information.") // 从 libssl.so.3 中获取 libcrypto.so.3 的路径 var libcryptoName = "libcrypto.so.3"