diff --git a/README.md b/README.md index 491d0c1..f8da7df 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ # Trap2JSON -Listens to SNMP Trap, converts it to json, and forwards it to zabbix, kafka, mqtt broker, or proxy it to other trap receiver. +Listens to SNMP Trap, converts it to json, and forwards it to zabbix, kafka, +mqtt broker, http server or proxy it to other trap receiver. Internally it uses snmptrapd to listen to incoming trap message, output it to stdout and parse the messages. If you're familiar with how snmptrapd works, you can add any configuration supported by snmptrapd.conf. Prior knowledge @@ -16,6 +17,7 @@ can use this as a solution for your distributed zabbix setup. - Kafka - MQTT Broker - SNMP Trap (like a proxy) + - HTTP - Zabbix - Message filter for each forwarder - Decide which messages to drop diff --git a/config.yml b/config.yml index 389806c..cb277fa 100644 --- a/config.yml +++ b/config.yml @@ -118,7 +118,7 @@ forwarders: # send messages to kafka when this many values are ready to be sent # default: 100 batch_size: 100 - # message batch will be sent at least for this many duration regardless + # message batch will be sent at least after this many duration regardless # of batch size # default: 1s batch_timeout: 1s @@ -182,6 +182,42 @@ forwarders: privacy_passphrase: testpriv # for snmp v3. context name to use context: name + - id: http + # forward to a http server. payload is in the http body + # request will be considered successful when receiving 2xx status code and retried otherwise + http: + # mandatory, URL to http server + url: https://localhost:8080/trap?token=xxxx + # default: POST + method: POST + # http headers to add to your requests in key: value format + # default: empty + headers: + Authorization: + - Bearer xxxx + # add http basic auth + # default: empty + basic_auth: + username: user + password: passwd + # ssl/tls configuration to connect to http server + # default: empty + tls: + # trust any server certificate + # default: false + insecure_skip_verify: false + # path to ca cert + ca_cert: "" + # path to client cert, used for TLS authentication (mTLS) + client_cert: "" + # path to client key, used for TLS authentication (mTLS) + client_key: "" + # proxy request to a http proxy + # default: empty + proxy: http://user:pass@localhost:3128 + # timeout when making http request + # default: 5s + timeout: 5s - id: zabbix trapper # forward to zabbix server/zabbix proxy using zabbix trapper item zabbix_trapper: diff --git a/forwarder/forwarder.go b/forwarder/forwarder.go index d928d4b..7504a05 100644 --- a/forwarder/forwarder.go +++ b/forwarder/forwarder.go @@ -62,6 +62,7 @@ type Config struct { Kafka *KafkaConfig MQTT *MQTTConfig Trap *SNMPTrapConfig + HTTP *HTTPConfig ZabbixTrapper *ZabbixTrapperConfig `mapstructure:"zabbix_trapper"` } @@ -70,6 +71,8 @@ func (c *Config) Type() string { return "file" } else if c.Kafka != nil { return "kafka" + } else if c.HTTP != nil { + return "http" } else if c.MQTT != nil { return "mqtt" } else if c.Trap != nil { @@ -83,6 +86,13 @@ func (c *Config) Type() string { } } +type Tls struct { + InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"` + CaCert string `mapstructure:"ca_cert"` + ClientCert string `mapstructure:"client_cert"` + ClientKey string `mapstructure:"client_key"` +} + type Forwarder interface { // Send will send the trap message to its corresponding forwarder. // Does nothing if the queue buffer is full or forwarder is already closed @@ -347,7 +357,18 @@ func StartForwarders(wg *sync.WaitGroup, c []Config, messageChan <-chan snmp.Mes case "file": forwarders = append(forwarders, NewFile(fwd, i)) case "kafka": + if fwd.Kafka.BatchSize == 0 { + fwd.Kafka.BatchSize = 100 + } + if fwd.Kafka.BatchTimeout.Duration == 0 { + fwd.Kafka.BatchTimeout.Duration = time.Second + } forwarders = append(forwarders, NewKafka(fwd, i)) + case "http": + if fwd.HTTP.Timeout.Duration == 0 { + fwd.HTTP.Timeout.Duration = 5 * time.Second + } + forwarders = append(forwarders, NewHTTP(fwd, i)) case "mqtt": if fwd.MQTT.Ordered == nil { b := true diff --git a/forwarder/http.go b/forwarder/http.go new file mode 100644 index 0000000..7b9c4af --- /dev/null +++ b/forwarder/http.go @@ -0,0 +1,143 @@ +package forwarder + +import ( + "context" + "crypto/tls" + "crypto/x509" + "github.com/carlmjohnson/requests" + "github.com/pkg/errors" + "net/http" + "net/url" + "os" + "strings" +) + +type HTTPMethod int + +const ( + HTTPMethodPost HTTPMethod = iota + HTTPMethodGet + HTTPMethodPut +) + +func (h *HTTPMethod) String() string { + switch *h { + case HTTPMethodPost: + return "POST" + case HTTPMethodGet: + return "GET" + case HTTPMethodPut: + return "PUT" + default: + return "" + } +} + +func (h *HTTPMethod) UnmarshalText(text []byte) error { + switch strings.ToLower(string(text)) { + case "post": + *h = HTTPMethodPost + case "get": + *h = HTTPMethodGet + case "put": + *h = HTTPMethodPut + default: + return errors.Errorf("unsupported HTTPMethod: %s", string(text)) + } + return nil +} + +type HTTPBasicAuth struct { + Username string + Password string +} + +type HTTPConfig struct { + URL string `mapstructure:"url"` + Method HTTPMethod + Headers map[string][]string + BasicAuth *HTTPBasicAuth `mapstructure:"basic_auth"` + Tls *Tls + Proxy string + Timeout Duration +} + +type HTTP struct { + Base + + builder *requests.Builder +} + +func (h *HTTP) Run() { + defer h.cancel() + defer h.logger.Info().Msg("forwarder exited") + h.logger.Info().Msg("starting forwarder") + + builder := requests. + URL(h.config.HTTP.URL). + Method(h.config.HTTP.Method.String()). + Headers(h.config.HTTP.Headers) + transport := &http.Transport{} + if h.config.HTTP.BasicAuth != nil { + builder = builder.BasicAuth(h.config.HTTP.BasicAuth.Username, h.config.HTTP.BasicAuth.Password) + } + if h.config.HTTP.Tls != nil { + tlsConf := &tls.Config{ + InsecureSkipVerify: h.config.HTTP.Tls.InsecureSkipVerify, + } + if h.config.HTTP.Tls.CaCert != "" { + ca, err := os.ReadFile(h.config.HTTP.Tls.CaCert) + if err != nil { + h.logger.Fatal().Err(err).Msg("failed reading ca certificate") + } + caCerts := x509.NewCertPool() + caCerts.AppendCertsFromPEM(ca) + tlsConf.RootCAs = caCerts + } + if h.config.HTTP.Tls.ClientCert != "" && + h.config.HTTP.Tls.ClientKey != "" { + cert, err := tls.LoadX509KeyPair(h.config.HTTP.Tls.ClientCert, h.config.HTTP.Tls.ClientKey) + if err != nil { + h.logger.Fatal().Err(err).Msg("failed reading client certificate") + } + tlsConf.Certificates = []tls.Certificate{cert} + } + transport.TLSClientConfig = tlsConf + } + if h.config.HTTP.Proxy != "" { + proxyUrl, err := url.Parse(h.config.HTTP.Proxy) + if err != nil { + h.logger.Fatal().Err(err).Msg("proxy url is not in the correct format") + } + transport.Proxy = http.ProxyURL(proxyUrl) + } + builder = builder.Transport(transport) + + for { + m, err := h.Get() + if err != nil { + break + } + m.Compile(h.CompilerConf) + if m.Skip { + h.ctrFiltered.Inc() + continue + } + ctx, cancel := context.WithTimeout(context.Background(), h.config.HTTP.Timeout.Duration) + if err := builder.BodyBytes(m.MessageJSON).Fetch(ctx); err != nil { + cancel() + h.Retry(m, err) + } else { + cancel() + h.ctrSucceeded.Inc() + } + } +} + +func NewHTTP(c Config, idx int) Forwarder { + fwd := &HTTP{ + Base: NewBase(c, idx), + } + go fwd.Run() + return fwd +} diff --git a/forwarder/kafka.go b/forwarder/kafka.go index 49543ef..4e8e713 100644 --- a/forwarder/kafka.go +++ b/forwarder/kafka.go @@ -16,7 +16,6 @@ import ( "strings" "sync" "sync/atomic" - "time" ) type KafkaSaslMechanism int @@ -60,19 +59,12 @@ type KafkaSasl struct { Mechanism KafkaSaslMechanism } -type KafkaTls struct { - InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"` - CaCert string `mapstructure:"ca_cert"` - ClientCert string `mapstructure:"client_cert"` - ClientKey string `mapstructure:"client_key"` -} - type KafkaConfig struct { RequiredAcks kafka.RequiredAcks `mapstructure:"required_acks"` KeyField string `mapstructure:"key_field"` Hosts []string Topic string - Tls *KafkaTls + Tls *Tls Sasl *KafkaSasl BatchSize int `mapstructure:"batch_size"` BatchTimeout Duration `mapstructure:"batch_timeout"` @@ -219,12 +211,6 @@ func NewKafka(c Config, idx int) Forwarder { fwd.logger.Fatal().Err(err).Msg("failed compiling kafka.key_field expression") } } - if fwd.config.Kafka.BatchSize == 0 { - fwd.config.Kafka.BatchSize = 100 - } - if fwd.config.Kafka.BatchTimeout.Duration == 0 { - fwd.config.Kafka.BatchTimeout.Duration = time.Second - } go fwd.Run() return fwd } diff --git a/go.mod b/go.mod index 50362ed..b7aeea8 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.21 require ( github.com/Workiva/go-datastructures v1.1.1 + github.com/carlmjohnson/requests v0.23.5 github.com/eclipse/paho.mqtt.golang v1.4.3 github.com/essentialkaos/go-zabbix v1.1.3 github.com/expr-lang/expr v1.15.7 @@ -16,7 +17,7 @@ require ( github.com/rs/zerolog v1.31.0 github.com/segmentio/kafka-go v0.4.47 github.com/sleepinggenius2/gosmi v0.4.4 - github.com/spf13/viper v1.18.1 + github.com/spf13/viper v1.18.2 github.com/stretchr/testify v1.8.4 gopkg.in/guregu/null.v4 v4.0.0 ) diff --git a/go.sum b/go.sum index 946b60c..add2c9d 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/carlmjohnson/requests v0.23.5 h1:NPANcAofwwSuC6SIMwlgmHry2V3pLrSqRiSBKYbNHHA= +github.com/carlmjohnson/requests v0.23.5/go.mod h1:zG9P28thdRnN61aD7iECFhH5iGGKX2jIjKQD9kqYH+o= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= @@ -117,8 +119,8 @@ github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0= github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/spf13/viper v1.18.1 h1:rmuU42rScKWlhhJDyXZRKJQHXFX02chSVW1IvkPGiVM= -github.com/spf13/viper v1.18.1/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMVB+yk= +github.com/spf13/viper v1.18.2 h1:LUXCnvUvSM6FXAsj6nnfc8Q2tp1dIgUfY9Kc8GsSOiQ= +github.com/spf13/viper v1.18.2/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMVB+yk= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/tests/forwarder_http_local_test.yaml b/tests/forwarder_http_local_test.yaml new file mode 100644 index 0000000..5a91308 --- /dev/null +++ b/tests/forwarder_http_local_test.yaml @@ -0,0 +1,11 @@ +logger: + level: debug +forwarders: + - id: http + http: + url: http://host.docker.internal:9789 +snmptrapd: + auth: + enable: true + community: + - name: public diff --git a/tests/forwarder_http_test.go b/tests/forwarder_http_test.go new file mode 100644 index 0000000..bc206ac --- /dev/null +++ b/tests/forwarder_http_test.go @@ -0,0 +1,76 @@ +package main + +import ( + "context" + "errors" + "fmt" + "github.com/stretchr/testify/assert" + tc "github.com/testcontainers/testcontainers-go" + "io" + "net/http" + "os/exec" + "path" + "testing" + "time" +) + +func TestHTTPForwarder(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + confPath := "tests/forwarder_http_test.yaml" + if localOS[operatingSystem] { + confPath = "tests/forwarder_http_local_test.yaml" + } + tfContainer.Container.Mounts = tc.ContainerMounts{ + tc.ContainerMount{ + Source: tc.GenericBindMountSource{ + HostPath: path.Join(wd, confPath), + }, + Target: "/etc/trap2json/config.yml", + ReadOnly: true, + }, + } + setup(ctx, tfContainer) + defer teardown(ctx, tfContainer) + defer func() { + if t.Failed() { + if r, err := tfContainer.Resource.Logs(ctx); err == nil { + if logs, err := io.ReadAll(r); err == nil { + fmt.Println(string(logs)) + } + } + } + }() + + ctxMsg, cancelMsg := context.WithCancel(ctx) + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + defer cancelMsg() + data, err := io.ReadAll(r.Body) + if !assert.NoError(t, err) { + return + } + defaultTestAssert(t, data, 7) + }) + srv := &http.Server{ + Addr: ":9789", + } + go func() { + err := srv.ListenAndServe() + if !errors.Is(err, http.ErrServerClosed) { + panic(err) + } + }() + // wait for http server to start + time.Sleep(10 * time.Millisecond) + + udpPort, err := tfContainer.Resource.MappedPort(ctx, trapPort) + assert.NoError(t, err) + cmdStr := defaultTestCommand(fmt.Sprintf("localhost:%d", udpPort.Int())) + cmd := exec.Command(cmdStr[0], cmdStr[1:]...) + err = cmd.Run() + if !assert.NoError(t, err) { + return + } + <-ctxMsg.Done() + srv.Shutdown(ctx) +} diff --git a/tests/forwarder_http_test.yaml b/tests/forwarder_http_test.yaml new file mode 100644 index 0000000..1085f84 --- /dev/null +++ b/tests/forwarder_http_test.yaml @@ -0,0 +1,11 @@ +logger: + level: info +forwarders: + - id: http + http: + url: http://172.17.0.1:9789 +snmptrapd: + auth: + enable: true + community: + - name: public