Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #685, the Processor print "incoming chan is full",and exit. #686

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cli/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ func runModule(modName string, modConfig config.IConfig) {

reload:
// 初始化
logger.Warn().Msg("========== module starting. ==========")
mod := modFunc()
ctx, cancelFun := context.WithCancel(context.TODO())
err = mod.Init(ctx, &logger, modConfig, ecw)
Expand Down
13 changes: 9 additions & 4 deletions pkg/event_processor/iworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,16 @@ type IWorker interface {
}

const (
MaxTickerCount = 10 // 1 Sencond/(eventWorker.ticker.C) = 10
MaxChanLen = 16 // 包队列长度
MaxTickerCount = 10 // 1 Sencond/(eventWorker.ticker.C) = 10
MaxChanLen = 1024 // 包队列长度
//MAX_EVENT_LEN = 16 // 事件数组长度
)

var (
ErrEventWorkerIncomingFull = errors.New("eventWorker Write failed, incoming chan is full")
ErrEventWorkerOutcomingFull = errors.New("eventWorker Write failed, outComing chan is full")
)

type eventWorker struct {
incoming chan event.IEventStruct
//events []user.IEventStruct
Expand Down Expand Up @@ -88,7 +93,7 @@ func (ew *eventWorker) Write(e event.IEventStruct) error {
select {
case ew.incoming <- e:
default:
err = errors.New("eventWorker Write failed, incoming chan is full")
err = ErrEventWorkerIncomingFull
}
return err
}
Expand All @@ -98,7 +103,7 @@ func (ew *eventWorker) writeToChan(s string) error {
select {
case ew.outComing <- s:
default:
err = errors.New("eventWorker Write failed, outComing chan is full")
err = ErrEventWorkerOutcomingFull
}
return err
}
Expand Down
14 changes: 11 additions & 3 deletions pkg/event_processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type EventProcessor struct {
logger io.Writer

closeChan chan bool
errChan chan error

// output model
isHex bool
Expand All @@ -52,6 +53,7 @@ func (ep *EventProcessor) init() {
ep.incoming = make(chan event.IEventStruct, MaxIncomingChanLen)
ep.outComing = make(chan string, MaxIncomingChanLen)
ep.closeChan = make(chan bool)
ep.errChan = make(chan error, 16)
ep.workerQueue = make(map[string]IWorker, MaxParserQueueLen)
}

Expand All @@ -63,9 +65,12 @@ func (ep *EventProcessor) Serve() error {
case eventStruct := <-ep.incoming:
err = ep.dispatch(eventStruct)
if err != nil {
//err1 := ep.Close()
//return errors.Join(err, err1)
return err
// 不返回error是合理的做法,因为个别事件处理失败不应该影响整个处理器的关闭。
// 但是,需要将这个错误抛给的调用着,让调用者决定是否关闭处理器
select {
case ep.errChan <- err:
default:
}
}
case s := <-ep.outComing:
_, _ = ep.GetLogger().Write([]byte(s))
Expand Down Expand Up @@ -153,6 +158,9 @@ func (ep *EventProcessor) Close() error {
return nil
}

func (ep *EventProcessor) ErrorChan() chan error {
return ep.errChan
}
func NewEventProcessor(logger io.Writer, isHex bool) *EventProcessor {
var ep *EventProcessor
ep = &EventProcessor{}
Expand Down
14 changes: 13 additions & 1 deletion user/module/imodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,23 @@ func (m *Module) Init(ctx context.Context, logger *zerolog.Logger, conf config.I
m.isClosed.Store(false)
m.ctx = ctx
m.logger = logger
m.errChan = make(chan error)
m.errChan = make(chan error, 16)
m.isKernelLess5_2 = false //set false default
m.eventCollector = eventCollector
//var epl = epLogger{logger: logger}
m.processor = event_processor.NewEventProcessor(eventCollector, conf.GetHex())

go func() {
// 读取错误信息
for {
select {
case err := <-m.processor.ErrorChan():
m.logger.Warn().AnErr("Processor error", err).Send()
case <-m.ctx.Done():
return
}
}
}()
kv, err := kernel.HostVersion()
if err != nil {
m.logger.Warn().Err(err).Msg("Unable to detect kernel version due to an error:%v.used non-Less5_2 bytecode.")
Expand Down
3 changes: 1 addition & 2 deletions user/module/probe_openssl.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ func (m *MOpenSSLProbe) getSslBpfFile(soPath, sslVersion string) error {
// 未找到版本号, try libcrypto.so.x
if strings.Contains(soPath, "libssl.so.3") {
m.logger.Warn().Err(err).Str("soPath", soPath).Msg("OpenSSL/BoringSSL version not found.")
m.logger.Warn().Msg("Try to detect libcrypto.so.3. If you have doubts")
m.logger.Warn().Msg("See https://github.com/gojue/ecapture/discussions/675 for more information.")
m.logger.Warn().Msg("Try to detect libcrypto.so.3. If you have doubts, See https://github.com/gojue/ecapture/discussions/675 for more information.")

// 从 libssl.so.3 中获取 libcrypto.so.3 的路径
var libcryptoName = "libcrypto.so.3"
Expand Down
Loading