Skip to content

Commit

Permalink
http forwarder (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
iqbalaydrus authored Dec 21, 2023
1 parent d1bb9c7 commit 8bec46d
Show file tree
Hide file tree
Showing 10 changed files with 309 additions and 20 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Trap2JSON
Listens to SNMP Trap, converts it to json, and forwards it to zabbix, kafka, mqtt broker, or proxy it to other trap receiver.
Listens to SNMP Trap, converts it to json, and forwards it to zabbix, kafka,
mqtt broker, http server or proxy it to other trap receiver.
Internally it uses snmptrapd to listen to incoming trap message, output it
to stdout and parse the messages. If you're familiar with how snmptrapd works,
you can add any configuration supported by snmptrapd.conf. Prior knowledge
Expand All @@ -16,6 +17,7 @@ can use this as a solution for your distributed zabbix setup.
- Kafka
- MQTT Broker
- SNMP Trap (like a proxy)
- HTTP
- Zabbix
- Message filter for each forwarder
- Decide which messages to drop
Expand Down
38 changes: 37 additions & 1 deletion config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ forwarders:
# send messages to kafka when this many values are ready to be sent
# default: 100
batch_size: 100
# message batch will be sent at least for this many duration regardless
# message batch will be sent at least after this many duration regardless
# of batch size
# default: 1s
batch_timeout: 1s
Expand Down Expand Up @@ -182,6 +182,42 @@ forwarders:
privacy_passphrase: testpriv
# for snmp v3. context name to use
context: name
- id: http
# forward to a http server. payload is in the http body
# request will be considered successful when receiving 2xx status code and retried otherwise
http:
# mandatory, URL to http server
url: https://localhost:8080/trap?token=xxxx
# default: POST
method: POST
# http headers to add to your requests in key: value format
# default: empty
headers:
Authorization:
- Bearer xxxx
# add http basic auth
# default: empty
basic_auth:
username: user
password: passwd
# ssl/tls configuration to connect to http server
# default: empty
tls:
# trust any server certificate
# default: false
insecure_skip_verify: false
# path to ca cert
ca_cert: ""
# path to client cert, used for TLS authentication (mTLS)
client_cert: ""
# path to client key, used for TLS authentication (mTLS)
client_key: ""
# proxy request to a http proxy
# default: empty
proxy: http://user:pass@localhost:3128
# timeout when making http request
# default: 5s
timeout: 5s
- id: zabbix trapper
# forward to zabbix server/zabbix proxy using zabbix trapper item
zabbix_trapper:
Expand Down
21 changes: 21 additions & 0 deletions forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Config struct {
Kafka *KafkaConfig
MQTT *MQTTConfig
Trap *SNMPTrapConfig
HTTP *HTTPConfig
ZabbixTrapper *ZabbixTrapperConfig `mapstructure:"zabbix_trapper"`
}

Expand All @@ -70,6 +71,8 @@ func (c *Config) Type() string {
return "file"
} else if c.Kafka != nil {
return "kafka"
} else if c.HTTP != nil {
return "http"
} else if c.MQTT != nil {
return "mqtt"
} else if c.Trap != nil {
Expand All @@ -83,6 +86,13 @@ func (c *Config) Type() string {
}
}

type Tls struct {
InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"`
CaCert string `mapstructure:"ca_cert"`
ClientCert string `mapstructure:"client_cert"`
ClientKey string `mapstructure:"client_key"`
}

type Forwarder interface {
// Send will send the trap message to its corresponding forwarder.
// Does nothing if the queue buffer is full or forwarder is already closed
Expand Down Expand Up @@ -347,7 +357,18 @@ func StartForwarders(wg *sync.WaitGroup, c []Config, messageChan <-chan snmp.Mes
case "file":
forwarders = append(forwarders, NewFile(fwd, i))
case "kafka":
if fwd.Kafka.BatchSize == 0 {
fwd.Kafka.BatchSize = 100
}
if fwd.Kafka.BatchTimeout.Duration == 0 {
fwd.Kafka.BatchTimeout.Duration = time.Second
}
forwarders = append(forwarders, NewKafka(fwd, i))
case "http":
if fwd.HTTP.Timeout.Duration == 0 {
fwd.HTTP.Timeout.Duration = 5 * time.Second
}
forwarders = append(forwarders, NewHTTP(fwd, i))
case "mqtt":
if fwd.MQTT.Ordered == nil {
b := true
Expand Down
143 changes: 143 additions & 0 deletions forwarder/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package forwarder

import (
"context"
"crypto/tls"
"crypto/x509"
"github.com/carlmjohnson/requests"
"github.com/pkg/errors"
"net/http"
"net/url"
"os"
"strings"
)

type HTTPMethod int

const (
HTTPMethodPost HTTPMethod = iota
HTTPMethodGet
HTTPMethodPut
)

func (h *HTTPMethod) String() string {
switch *h {
case HTTPMethodPost:
return "POST"
case HTTPMethodGet:
return "GET"
case HTTPMethodPut:
return "PUT"
default:
return ""
}
}

func (h *HTTPMethod) UnmarshalText(text []byte) error {
switch strings.ToLower(string(text)) {
case "post":
*h = HTTPMethodPost
case "get":
*h = HTTPMethodGet
case "put":
*h = HTTPMethodPut
default:
return errors.Errorf("unsupported HTTPMethod: %s", string(text))
}
return nil
}

type HTTPBasicAuth struct {
Username string
Password string
}

type HTTPConfig struct {
URL string `mapstructure:"url"`
Method HTTPMethod
Headers map[string][]string
BasicAuth *HTTPBasicAuth `mapstructure:"basic_auth"`
Tls *Tls
Proxy string
Timeout Duration
}

type HTTP struct {
Base

builder *requests.Builder
}

func (h *HTTP) Run() {
defer h.cancel()
defer h.logger.Info().Msg("forwarder exited")
h.logger.Info().Msg("starting forwarder")

builder := requests.
URL(h.config.HTTP.URL).
Method(h.config.HTTP.Method.String()).
Headers(h.config.HTTP.Headers)
transport := &http.Transport{}
if h.config.HTTP.BasicAuth != nil {
builder = builder.BasicAuth(h.config.HTTP.BasicAuth.Username, h.config.HTTP.BasicAuth.Password)
}
if h.config.HTTP.Tls != nil {
tlsConf := &tls.Config{
InsecureSkipVerify: h.config.HTTP.Tls.InsecureSkipVerify,
}
if h.config.HTTP.Tls.CaCert != "" {
ca, err := os.ReadFile(h.config.HTTP.Tls.CaCert)
if err != nil {
h.logger.Fatal().Err(err).Msg("failed reading ca certificate")
}
caCerts := x509.NewCertPool()
caCerts.AppendCertsFromPEM(ca)
tlsConf.RootCAs = caCerts
}
if h.config.HTTP.Tls.ClientCert != "" &&
h.config.HTTP.Tls.ClientKey != "" {
cert, err := tls.LoadX509KeyPair(h.config.HTTP.Tls.ClientCert, h.config.HTTP.Tls.ClientKey)
if err != nil {
h.logger.Fatal().Err(err).Msg("failed reading client certificate")
}
tlsConf.Certificates = []tls.Certificate{cert}
}
transport.TLSClientConfig = tlsConf
}
if h.config.HTTP.Proxy != "" {
proxyUrl, err := url.Parse(h.config.HTTP.Proxy)
if err != nil {
h.logger.Fatal().Err(err).Msg("proxy url is not in the correct format")
}
transport.Proxy = http.ProxyURL(proxyUrl)
}
builder = builder.Transport(transport)

for {
m, err := h.Get()
if err != nil {
break
}
m.Compile(h.CompilerConf)
if m.Skip {
h.ctrFiltered.Inc()
continue
}
ctx, cancel := context.WithTimeout(context.Background(), h.config.HTTP.Timeout.Duration)
if err := builder.BodyBytes(m.MessageJSON).Fetch(ctx); err != nil {
cancel()
h.Retry(m, err)
} else {
cancel()
h.ctrSucceeded.Inc()
}
}
}

func NewHTTP(c Config, idx int) Forwarder {
fwd := &HTTP{
Base: NewBase(c, idx),
}
go fwd.Run()
return fwd
}
16 changes: 1 addition & 15 deletions forwarder/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"strings"
"sync"
"sync/atomic"
"time"
)

type KafkaSaslMechanism int
Expand Down Expand Up @@ -60,19 +59,12 @@ type KafkaSasl struct {
Mechanism KafkaSaslMechanism
}

type KafkaTls struct {
InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"`
CaCert string `mapstructure:"ca_cert"`
ClientCert string `mapstructure:"client_cert"`
ClientKey string `mapstructure:"client_key"`
}

type KafkaConfig struct {
RequiredAcks kafka.RequiredAcks `mapstructure:"required_acks"`
KeyField string `mapstructure:"key_field"`
Hosts []string
Topic string
Tls *KafkaTls
Tls *Tls
Sasl *KafkaSasl
BatchSize int `mapstructure:"batch_size"`
BatchTimeout Duration `mapstructure:"batch_timeout"`
Expand Down Expand Up @@ -219,12 +211,6 @@ func NewKafka(c Config, idx int) Forwarder {
fwd.logger.Fatal().Err(err).Msg("failed compiling kafka.key_field expression")
}
}
if fwd.config.Kafka.BatchSize == 0 {
fwd.config.Kafka.BatchSize = 100
}
if fwd.config.Kafka.BatchTimeout.Duration == 0 {
fwd.config.Kafka.BatchTimeout.Duration = time.Second
}
go fwd.Run()
return fwd
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.21

require (
github.com/Workiva/go-datastructures v1.1.1
github.com/carlmjohnson/requests v0.23.5
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/essentialkaos/go-zabbix v1.1.3
github.com/expr-lang/expr v1.15.7
Expand All @@ -16,7 +17,7 @@ require (
github.com/rs/zerolog v1.31.0
github.com/segmentio/kafka-go v0.4.47
github.com/sleepinggenius2/gosmi v0.4.4
github.com/spf13/viper v1.18.1
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.8.4
gopkg.in/guregu/null.v4 v4.0.0
)
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/carlmjohnson/requests v0.23.5 h1:NPANcAofwwSuC6SIMwlgmHry2V3pLrSqRiSBKYbNHHA=
github.com/carlmjohnson/requests v0.23.5/go.mod h1:zG9P28thdRnN61aD7iECFhH5iGGKX2jIjKQD9kqYH+o=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
Expand Down Expand Up @@ -117,8 +119,8 @@ github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0=
github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.18.1 h1:rmuU42rScKWlhhJDyXZRKJQHXFX02chSVW1IvkPGiVM=
github.com/spf13/viper v1.18.1/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMVB+yk=
github.com/spf13/viper v1.18.2 h1:LUXCnvUvSM6FXAsj6nnfc8Q2tp1dIgUfY9Kc8GsSOiQ=
github.com/spf13/viper v1.18.2/go.mod h1:EKmWIqdnk5lOcmR72yw6hS+8OPYcwD0jteitLMVB+yk=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
Expand Down
11 changes: 11 additions & 0 deletions tests/forwarder_http_local_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
logger:
level: debug
forwarders:
- id: http
http:
url: http://host.docker.internal:9789
snmptrapd:
auth:
enable: true
community:
- name: public
Loading

0 comments on commit 8bec46d

Please sign in to comment.