Skip to content

Commit

Permalink
Ver2.0.0 alpha.18 (#26)
Browse files Browse the repository at this point in the history
* fix: config UnmarshalJSON

* fix nats input

* update: InputServer err msg
  • Loading branch information
mimuret authored Sep 8, 2023
1 parent 433fae2 commit 9d96e09
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 22 deletions.
30 changes: 14 additions & 16 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,8 @@ type Config struct {
OutputGroups []OutputGroupConfig
}

func NewConfig() *Config {
return &Config{
MetricsListen: ":9520",
LogLevel: "info",
InputBufferConfig: &BufferConfig{
Name: "input",
Size: DefaultInputBufferSize,
},
InputFilterWorkerNum: DefaultInputFilterWorkerNum,
}
}

func LoadConfig(fs afero.Fs, cfgFile string) (*Config, error) {
c := NewConfig()
c := &Config{}
bs, err := afero.ReadFile(fs, cfgFile)
if err != nil {
return nil, errors.Wrap(err, "failed to open config file")
Expand Down Expand Up @@ -127,7 +115,15 @@ func (c *Config) UnmarshalJSON(bs []byte) error {
Inputs json.RawMessage
Filters json.RawMessage
OutputGroups []json.RawMessage
}{}
}{
MetricsListen: ":9520",
LogLevel: "info",
InputBufferConfig: &BufferConfig{
Name: "input",
Size: DefaultInputBufferSize,
},
InputFilterWorkerNum: DefaultInputFilterWorkerNum,
}

if err := json.Unmarshal(bs, &cfg); err != nil {
return errors.Wrap(err, "invalid json Input")
Expand All @@ -142,8 +138,10 @@ func (c *Config) UnmarshalJSON(bs []byte) error {
if err := json.Unmarshal(cfg.Inputs, &c.Inputs); err != nil {
results = gerrors.Join(results, errors.Wrap(err, "failed to create input plugins"))
}
if err := json.Unmarshal(cfg.Filters, &c.Filters); err != nil {
results = gerrors.Join(results, errors.Wrap(err, "failed to create global filter plugins"))
if cfg.Filters != nil {
if err := json.Unmarshal(cfg.Filters, &c.Filters); err != nil {
results = gerrors.Join(results, errors.Wrap(err, "failed to create global filter plugins"))
}
}
for i, v := range cfg.OutputGroups {
var og OutputGroupConfig
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugin/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (i *InputServer) Serve(ln net.Listener, buf types.Writer) error {
func (i *InputServer) Read(r io.Reader, buf types.Writer) error {
decoder, err := framestream.NewDecoder(r, i.DecoderOptions)
if err != nil {
return err
return errors.Wrap(err, "failed to create fstrm decoder")
}
LOOP:
for {
Expand Down
16 changes: 11 additions & 5 deletions pkg/plugin/input/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,24 @@ func (f *Nats) Subscribe(ctx context.Context, w types.Writer) error {
defer func() {
_ = sub.Unsubscribe()
}()

wg := sync.WaitGroup{}
defer wg.Wait()
f.ic.Logger.Info("start subscribe", zap.String("subject", f.Subject), zap.String("queue name", f.QueueName), zap.Int("queue len", f.QueueLen))
LOOP:
for {
select {
case <-ctx.Done():
break LOOP
case msg := <-ch:
buf := bytes.NewBuffer(msg.Data)
if err := is.Read(buf, w); err != nil {
return err
}
wg.Add(1)
go func(bs []byte) {
buf := bytes.NewBuffer(bs)
if err := is.Read(buf, w); err != nil {
input.TotalDecordError.Inc()
f.ic.Logger.Debug("input error", zap.Error(err))
}
wg.Done()
}(msg.Data)
}
}
return nil
Expand Down

0 comments on commit 9d96e09

Please sign in to comment.