Skip to content

Commit

Permalink
Testing out trying to multiplex flow across multiple nr accounts (#438)
Browse files Browse the repository at this point in the history
Lets you do multiple sinks in the config file like

sinks:
  - new_relic_multi
  - stdout
  • Loading branch information
i3149 authored Sep 22, 2022
1 parent 378a91c commit 8223045
Show file tree
Hide file tree
Showing 11 changed files with 278 additions and 133 deletions.
10 changes: 5 additions & 5 deletions cmd/ktranslate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func applyMode(cfg *ktranslate.Config, mode string) error {
cfg.Format = "new_relic"
cfg.SampleMin = 100
cfg.Compression = "gzip"
cfg.Sinks = "new_relic"
cfg.Sinks = []string{"new_relic"}

if cfg.SampleRate == 0 {
cfg.SampleRate = 1000
Expand Down Expand Up @@ -213,7 +213,7 @@ func applyMode(cfg *ktranslate.Config, mode string) error {
setNr()
case "nr1.syslog": // Tune for syslog. Don't want any sampling so can't use setNR directly.
cfg.Compression = "gzip"
cfg.Sinks = "new_relic"
cfg.Sinks = []string{"new_relic"}
cfg.Format = "new_relic_metric"
cfg.SNMPInput.FlowOnly = true // Don't do snmp polling.
if cfg.SyslogInput.ListenAddr == "" {
Expand All @@ -222,7 +222,7 @@ func applyMode(cfg *ktranslate.Config, mode string) error {
cfg.SyslogInput.Enable = true
case "nr1.snmp": // Tune for snmp sending.
cfg.Compression = "gzip"
cfg.Sinks = "new_relic"
cfg.Sinks = []string{"new_relic"}
cfg.Format = "new_relic_metric"
cfg.MaxFlowsPerMessage = 100
default:
Expand Down Expand Up @@ -298,7 +298,7 @@ func applyFlags(cfg *ktranslate.Config) error {
case "compression":
cfg.Compression = val
case "sinks":
cfg.Sinks = val
cfg.Sinks = strings.Split(val, ",")
case "max_flows_per_message":
v, err := strconv.Atoi(val)
if err != nil {
Expand Down Expand Up @@ -351,7 +351,7 @@ func applyFlags(cfg *ktranslate.Config) error {
}
cfg.EnableSNMPDiscovery = v
case "kentik_email":
cfg.KentikEmail = val
cfg.KentikCreds = []ktranslate.KentikCred{ktranslate.KentikCred{ApiEmail: val, ApiToken: os.Getenv(ktranslate.KentikAPITokenEnvVar)}}
case "api_root":
cfg.APIBaseURL = val
case "kentik_plan":
Expand Down
31 changes: 25 additions & 6 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ type NewRelicSinkConfig struct {
ValidateJSON bool
}

// NewRelicMultiSinkConfig is the config for Multi New Relic
type NewRelicMultiSinkConfig struct {
CredMap map[int]NRCred
}

// NRCred exposes a list of NR creds.
type NRCred struct {
NRAccount string
NRApiToken string
}

// FileSinkConfig is the config for the file sink
type FileSinkConfig struct {
Path string
Expand Down Expand Up @@ -184,6 +195,11 @@ type FlowInputConfig struct {
MappingFile string
}

type KentikCred struct {
ApiEmail string
ApiToken string
}

// Config is the ktranslate configuration
type Config struct {
// ktranslate
Expand All @@ -201,15 +217,14 @@ type Config struct {
FormatRollup string
FormatMetric string
Compression string
Sinks string
Sinks []string
MaxFlowsPerMessage int
RollupInterval int
RollupAndAlpha bool
SampleRate int
SampleMin int
EnableSNMPDiscovery bool
KentikEmail string
KentikAPIToken string
KentikCreds []KentikCred
KentikPlan int
APIBaseURL string
SSLCertFile string
Expand Down Expand Up @@ -238,6 +253,8 @@ type Config struct {
NetSink *NetSinkConfig
// pkg/sinks/nr
NewRelicSink *NewRelicSinkConfig
// pkg/sinks/nrmulti
NewRelicMultiSink *NewRelicMultiSinkConfig
// pkg/sinks/file
FileSink *FileSinkConfig
// pkg/sinks/gcppubsub
Expand Down Expand Up @@ -288,15 +305,14 @@ func DefaultConfig() *Config {
Format: "flat_json",
FormatRollup: "",
Compression: "none",
Sinks: "stdout",
Sinks: []string{"stdout"},
MaxFlowsPerMessage: 10000,
RollupInterval: 0,
RollupAndAlpha: false,
SampleRate: 1,
SampleMin: 1,
EnableSNMPDiscovery: false,
KentikEmail: "",
KentikAPIToken: os.Getenv(KentikAPITokenEnvVar),
KentikCreds: nil,
KentikPlan: 0,
APIBaseURL: "https://api.kentik.com",
SSLCertFile: "",
Expand Down Expand Up @@ -340,6 +356,9 @@ func DefaultConfig() *Config {
Region: "",
ValidateJSON: false,
},
NewRelicMultiSink: &NewRelicMultiSinkConfig{
CredMap: nil,
},
FileSink: &FileSinkConfig{
Path: "./",
EnableImmediateWrite: false,
Expand Down
144 changes: 77 additions & 67 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,12 @@ type KentikApi struct {
setTime time.Time
apiTimeout time.Duration
synClient synthetics.SyntheticsAdminServiceClient
conf *kt.KentikConfig
mux sync.RWMutex
lastSynth time.Time
config *ktranslate.APIConfig
config *ktranslate.Config
}

func NewKentikApi(ctx context.Context, conf *kt.KentikConfig, log logger.ContextL, cfg *ktranslate.APIConfig) (*KentikApi, error) {
func NewKentikApi(ctx context.Context, log logger.ContextL, cfg *ktranslate.Config) (*KentikApi, error) {
apiTimeoutStr := os.Getenv(kt.KentikAPITimeout)
apiTimeout := API_TIMEOUT
if apiTimeoutStr != "" {
Expand All @@ -81,7 +80,6 @@ func NewKentikApi(ctx context.Context, conf *kt.KentikConfig, log logger.Context

kapi := &KentikApi{
ContextL: log,
conf: conf,
tr: tr,
client: client,
apiTimeout: apiTimeout,
Expand All @@ -103,9 +101,9 @@ func NewKentikApi(ctx context.Context, conf *kt.KentikConfig, log logger.Context
return kapi, err
}

func (api *KentikApi) getDeviceInfo(ctx context.Context, apiUrl string) ([]byte, error) {
if api.conf.ApiEmail == "" { // If no creds, use fake file.
if v := api.config.DeviceFile; v != "" {
func (api *KentikApi) getDeviceInfo(ctx context.Context, apiUrl string, apiEmail string, apiToken string) ([]byte, error) {
if apiEmail == "" {
if v := api.config.API.DeviceFile; v != "" {
api.Infof("Reading devices from local file: %s", v)
return os.ReadFile(v)
}
Expand All @@ -120,8 +118,8 @@ func (api *KentikApi) getDeviceInfo(ctx context.Context, apiUrl string) ([]byte,

userAgentString := USER_AGENT_BASE

req.Header.Add(API_EMAIL_HEADER, api.conf.ApiEmail)
req.Header.Add(API_PASSWORD_HEADER, api.conf.ApiToken)
req.Header.Add(API_EMAIL_HEADER, apiEmail)
req.Header.Add(API_PASSWORD_HEADER, apiToken)
req.Header.Add(HTTP_USER_AGENT, userAgentString+" AGENT")

resp, err := api.client.Do(req)
Expand Down Expand Up @@ -165,7 +163,7 @@ func (api *KentikApi) getDeviceInfo(ctx context.Context, apiUrl string) ([]byte,
}

func (api *KentikApi) UpdateTests(ctx context.Context) {
if api == nil || api.conf == nil {
if api == nil || len(api.config.KentikCreds) == 0 {
return
}

Expand Down Expand Up @@ -238,29 +236,33 @@ func (api *KentikApi) GetDevice(cid kt.Cid, did kt.DeviceID) *kt.Device {

func (api *KentikApi) getDevices(ctx context.Context) error {
stime := time.Now()
res, err := api.getDeviceInfo(ctx, api.conf.ApiRoot+"/api/internal/devices")
if err != nil {
return err
}
var devices kt.DeviceList
err = json.Unmarshal(res, &devices)
if err != nil {
return err
}

resDev := map[kt.Cid]kt.Devices{}
num := 0
for _, device := range devices.Devices {
myd := device
if _, ok := resDev[device.CompanyID]; !ok {
resDev[device.CompanyID] = map[kt.DeviceID]*kt.Device{}
for _, info := range api.config.KentikCreds {
res, err := api.getDeviceInfo(ctx, api.config.APIBaseURL+"/api/internal/devices", info.ApiEmail, info.ApiToken)
if err != nil {
return err
}
device.Interfaces = map[kt.IfaceID]kt.Interface{}
for _, intf := range device.AllInterfaces {
device.Interfaces[intf.SnmpID] = intf
var devices kt.DeviceList
err = json.Unmarshal(res, &devices)
if err != nil {
return err
}
resDev[device.CompanyID][device.ID] = &myd
num++

for _, device := range devices.Devices {
myd := device
if _, ok := resDev[device.CompanyID]; !ok {
resDev[device.CompanyID] = map[kt.DeviceID]*kt.Device{}
}
device.Interfaces = map[kt.IfaceID]kt.Interface{}
for _, intf := range device.AllInterfaces {
device.Interfaces[intf.SnmpID] = intf
}
resDev[device.CompanyID][device.ID] = &myd
num++
}

api.Infof("Loaded %d Kentik devices via API for %s", len(devices.Devices), info.ApiEmail)
}

api.setTime = time.Now()
Expand Down Expand Up @@ -332,7 +334,7 @@ func (api *KentikApi) connectSynth(ctxIn context.Context) error {
return err
}

address, err := getAddressFromApiRoot(api.conf.ApiRoot)
address, err := getAddressFromApiRoot(api.config.APIBaseURL)
if err != nil {
return err
}
Expand All @@ -355,57 +357,61 @@ func (api *KentikApi) getSynthInfo(ctx context.Context) error {
defer api.mux.Unlock()
api.lastSynth = time.Now()

md := metadata.New(map[string]string{
"X-CH-Auth-Email": api.conf.ApiEmail,
"X-CH-Auth-API-Token": api.conf.ApiToken,
})
ctxo := metadata.NewOutgoingContext(ctx, md)

lt := &synthetics.ListTestsRequest{}
r, err := api.synClient.ListTests(ctxo, lt)
if err != nil {
return err
}

synTests := map[kt.TestId]*synthetics.Test{}
for _, test := range r.GetTests() {
localt := test
synTests[kt.NewTestId(test.GetId())] = localt
}

la := &synthetics.ListAgentsRequest{}
ra, err := api.synClient.ListAgents(ctxo, la)
if err != nil {
return err
}

synAgents := map[kt.AgentId]*synthetics.Agent{}
synAgentsByIP := map[string]*synthetics.Agent{}
for _, agent := range ra.GetAgents() {
locala := agent
synAgents[kt.NewAgentId(agent.GetId())] = locala
lip := locala.GetLocalIp() // Store local ip seperately from public one, if a local is set.
if lip != "" {
synAgentsByIP[lip] = locala
for _, info := range api.config.KentikCreds {
md := metadata.New(map[string]string{
"X-CH-Auth-Email": info.ApiEmail,
"X-CH-Auth-API-Token": info.ApiToken,
})
ctxo := metadata.NewOutgoingContext(ctx, md)

lt := &synthetics.ListTestsRequest{}
r, err := api.synClient.ListTests(ctxo, lt)
if err != nil {
return err
}
synAgentsByIP[locala.GetIp()] = locala

for _, test := range r.GetTests() {
localt := test
synTests[kt.NewTestId(test.GetId())] = localt
}

la := &synthetics.ListAgentsRequest{}
ra, err := api.synClient.ListAgents(ctxo, la)
if err != nil {
return err
}

for _, agent := range ra.GetAgents() {
locala := agent
synAgents[kt.NewAgentId(agent.GetId())] = locala
lip := locala.GetLocalIp() // Store local ip seperately from public one, if a local is set.
if lip != "" {
synAgentsByIP[lip] = locala
}
synAgentsByIP[locala.GetIp()] = locala
}

api.Infof("Loaded %d Kentik Tests and %d Agents via API for %s", len(r.GetTests()), len(ra.GetAgents()), info.ApiEmail)
}

api.synAgents = synAgents
api.synAgentsByIP = synAgentsByIP
api.synTests = synTests
api.Infof("Loaded %d Kentik Tests and %d Agents via API", len(api.synTests), len(api.synAgents))
api.Infof("Loaded %d Kentik Tests and %d Agents Total via API", len(api.synTests), len(api.synAgents))

return nil
}

func (api *KentikApi) EnsureDevice(ctx context.Context, conf *kt.SnmpDeviceConfig) error {
if api == nil || api.conf == nil {
if api == nil || len(api.config.KentikCreds) == 0 {
return nil
}

// If there's no plan id to create devices on, just silently return here.
if api.conf.ApiPlan == 0 {
if api.config.KentikPlan == 0 {
return nil
}

Expand All @@ -431,13 +437,13 @@ func (api *KentikApi) EnsureDevice(ctx context.Context, conf *kt.SnmpDeviceConfi
Description: desc,
SampleRate: 1,
BgpType: "none",
PlanID: api.conf.ApiPlan,
PlanID: api.config.KentikPlan,
IPs: []net.IP{net.ParseIP(conf.DeviceIP)},
Subtype: "router",
MinSnmp: false,
}

err := api.createDevice(ctx, dev, api.conf.ApiRoot+"/api/v5/device")
err := api.createDevice(ctx, dev, api.config.APIBaseURL+"/api/v5/device")
if err != nil {
return err
}
Expand All @@ -446,6 +452,10 @@ func (api *KentikApi) EnsureDevice(ctx context.Context, conf *kt.SnmpDeviceConfi
}

func (api *KentikApi) createDevice(ctx context.Context, create *deviceCreate, url string) error {
if len(api.config.KentikCreds) == 0 {
return fmt.Errorf("No API Credencials Specified.")
}

payload, err := json.Marshal(map[string]*deviceCreate{
"device": create,
})
Expand All @@ -460,8 +470,8 @@ func (api *KentikApi) createDevice(ctx context.Context, create *deviceCreate, ur
}

userAgentString := USER_AGENT_BASE
req.Header.Add(API_EMAIL_HEADER, api.conf.ApiEmail)
req.Header.Add(API_PASSWORD_HEADER, api.conf.ApiToken)
req.Header.Add(API_EMAIL_HEADER, api.config.KentikCreds[0].ApiEmail)
req.Header.Add(API_PASSWORD_HEADER, api.config.KentikCreds[0].ApiToken)
req.Header.Add(HTTP_USER_AGENT, userAgentString+" AGENT")
req.Header.Add("Content-Type", "application/json")

Expand Down
Loading

0 comments on commit 8223045

Please sign in to comment.