Skip to content

Commit

Permalink
Make Kentik / KFlow a normal sink (#445)
Browse files Browse the repository at this point in the history
Can now send to kentik with `-format kflow -sinks kentik`
  • Loading branch information
i3149 authored Oct 5, 2022
1 parent 33594a7 commit ec609bf
Show file tree
Hide file tree
Showing 15 changed files with 203 additions and 166 deletions.
2 changes: 1 addition & 1 deletion cmd/ktranslate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func applyFlags(cfg *ktranslate.Config) error {
}
cfg.EnableSNMPDiscovery = v
case "kentik_email":
cfg.KentikCreds = []ktranslate.KentikCred{ktranslate.KentikCred{ApiEmail: val, ApiToken: os.Getenv(ktranslate.KentikAPITokenEnvVar)}}
cfg.KentikCreds = []ktranslate.KentikCred{ktranslate.KentikCred{APIEmail: val, APIToken: os.Getenv(ktranslate.KentikAPITokenEnvVar)}}
case "api_root":
cfg.APIBaseURL = val
case "kentik_plan":
Expand Down
5 changes: 3 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,10 @@ type FlowInputConfig struct {
MappingFile string
}

// KentikCred is information needed to auth the Kentik API.
type KentikCred struct {
ApiEmail string
ApiToken string
APIEmail string
APIToken string
}

// Config is the ktranslate configuration
Expand Down
14 changes: 7 additions & 7 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (api *KentikApi) getDevices(ctx context.Context) error {
resDev := map[kt.Cid]kt.Devices{}
num := 0
for _, info := range api.config.KentikCreds {
res, err := api.getDeviceInfo(ctx, api.config.APIBaseURL+"/api/internal/devices", info.ApiEmail, info.ApiToken)
res, err := api.getDeviceInfo(ctx, api.config.APIBaseURL+"/api/internal/devices", info.APIEmail, info.APIToken)
if err != nil {
return err
}
Expand All @@ -262,7 +262,7 @@ func (api *KentikApi) getDevices(ctx context.Context) error {
num++
}

api.Infof("Loaded %d Kentik devices via API for %s", len(devices.Devices), info.ApiEmail)
api.Infof("Loaded %d Kentik devices via API for %s", len(devices.Devices), info.APIEmail)
}

api.setTime = time.Now()
Expand Down Expand Up @@ -362,8 +362,8 @@ func (api *KentikApi) getSynthInfo(ctx context.Context) error {
synAgentsByIP := map[string]*synthetics.Agent{}
for _, info := range api.config.KentikCreds {
md := metadata.New(map[string]string{
"X-CH-Auth-Email": info.ApiEmail,
"X-CH-Auth-API-Token": info.ApiToken,
"X-CH-Auth-Email": info.APIEmail,
"X-CH-Auth-API-Token": info.APIToken,
})
ctxo := metadata.NewOutgoingContext(ctx, md)

Expand Down Expand Up @@ -394,7 +394,7 @@ func (api *KentikApi) getSynthInfo(ctx context.Context) error {
synAgentsByIP[locala.GetIp()] = locala
}

api.Infof("Loaded %d Kentik Tests and %d Agents via API for %s", len(r.GetTests()), len(ra.GetAgents()), info.ApiEmail)
api.Infof("Loaded %d Kentik Tests and %d Agents via API for %s", len(r.GetTests()), len(ra.GetAgents()), info.APIEmail)
}

api.synAgents = synAgents
Expand Down Expand Up @@ -470,8 +470,8 @@ func (api *KentikApi) createDevice(ctx context.Context, create *deviceCreate, ur
}

userAgentString := USER_AGENT_BASE
req.Header.Add(API_EMAIL_HEADER, api.config.KentikCreds[0].ApiEmail)
req.Header.Add(API_PASSWORD_HEADER, api.config.KentikCreds[0].ApiToken)
req.Header.Add(API_EMAIL_HEADER, api.config.KentikCreds[0].APIEmail)
req.Header.Add(API_PASSWORD_HEADER, api.config.KentikCreds[0].APIToken)
req.Header.Add(HTTP_USER_AGENT, userAgentString+" AGENT")
req.Header.Add("Content-Type", "application/json")

Expand Down
10 changes: 10 additions & 0 deletions pkg/cat/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ func (s *Server) GetDeviceMap() map[string]*kt.Device {
return s.devicesByIP
}

func (s *Server) AddDevices(devices map[string]*kt.Device) {
for _, device := range devices {
s.devicesByName[device.ID.Itoa()] = device
for _, ip := range device.SendingIps {
s.devicesByIP[ip.String()] = device
}
}
s.log.Infof("API server running %d devices after remote fetch", len(s.devicesByName))
}

func (s *Server) getDevice(query string) *kt.Device {
// Try finding this device directly by its ID
device, ok := s.devicesByName[query]
Expand Down
77 changes: 3 additions & 74 deletions pkg/cat/jchf.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,62 +20,6 @@ var (
DEFAULT_GEO_PACKED = patricia.PackGeo(DEFAULT_GEO)
)

func (kc *KTranslate) lookupGeo(ipv4 uint32, ipv6 []byte) (string, error) {
if ipv4 != 0 {
return kc.geo.SearchBestFromHostGeo(net.IPv4(byte(ipv4>>24), byte(ipv4>>16), byte(ipv4>>8), byte(ipv4)))
}
return kc.geo.SearchBestFromHostGeo(net.IP(ipv6))
}

func (kc *KTranslate) lookupAsn(ipv4 uint32, ipv6 []byte) (uint32, string, error) {
if ipv4 != 0 {
return kc.asn.SearchBestFromHostAsn(net.IPv4(byte(ipv4>>24), byte(ipv4>>16), byte(ipv4>>8), byte(ipv4)))
}
return kc.geo.SearchBestFromHostAsn(net.IP(ipv6))
}

func (kc *KTranslate) setGeoAsn(src *Flow) (string, string) {
var srcName, dstName string

// Fetch our own geo if not already set.
if kc.geo != nil {
if src.CHF.SrcGeo() == 0 || src.CHF.SrcGeo() == DEFAULT_GEO_PACKED {
ipv6, _ := src.CHF.Ipv6SrcAddr()
if srcGeo, err := kc.lookupGeo(src.CHF.Ipv4SrcAddr(), ipv6); err == nil {
src.CHF.SetSrcGeo(patricia.PackGeo([]byte(srcGeo)))
}
}

if src.CHF.DstGeo() == 0 || src.CHF.DstGeo() == DEFAULT_GEO_PACKED {
ipv6, _ := src.CHF.Ipv6DstAddr()
if dstGeo, err := kc.lookupGeo(src.CHF.Ipv4DstAddr(), ipv6); err == nil {
src.CHF.SetDstGeo(patricia.PackGeo([]byte(dstGeo)))
}
}
}

// And set our own asn also if not set.
if kc.asn != nil {
ipv6, _ := src.CHF.Ipv6SrcAddr()
if asn, name, err := kc.lookupAsn(src.CHF.Ipv4SrcAddr(), ipv6); err == nil {
if src.CHF.SrcAs() == 0 {
src.CHF.SetSrcAs(asn)
}
srcName = name
}

ipv6, _ = src.CHF.Ipv6DstAddr()
if asn, name, err := kc.lookupAsn(src.CHF.Ipv4DstAddr(), ipv6); err == nil {
if src.CHF.DstAs() == 0 {
src.CHF.SetDstAs(asn)
}
dstName = name
}
}

return srcName, dstName
}

func (kc *KTranslate) getEventType(dst *kt.JCHF) string {

// if app_proto is 12, this is snmp and return as such.
Expand Down Expand Up @@ -121,28 +65,12 @@ func (kc *KTranslate) getProviderType(dst *kt.JCHF) kt.Provider {
return kt.ProviderFlowDevice
}

func (kc *KTranslate) flowToJCHF(ctx context.Context, citycache map[uint32]string, regioncache map[uint32]string, dst *kt.JCHF, src *Flow, currentTS int64, tagcache map[uint64]string) error {
func (kc *KTranslate) flowToJCHF(ctx context.Context, dst *kt.JCHF, src *Flow, currentTS int64, tagcache map[uint64]string) error {

dst.CustomStr = make(map[string]string)
dst.CustomInt = make(map[string]int32)
dst.CustomBigInt = make(map[string]int64)

// In the direct case, users can map their own asn/geo values into here.
if kc.geo != nil || kc.asn != nil {
srcAsnName, dstAsnName := kc.setGeoAsn(src)
if srcAsnName != "" {
dst.CustomStr["src_as_name"] = srcAsnName
} else {
dst.CustomStr["src_as_name"] = strconv.Itoa(int(src.CHF.SrcAs()))
}

if dstAsnName != "" {
dst.CustomStr["dst_as_name"] = dstAsnName
} else {
dst.CustomStr["dst_as_name"] = strconv.Itoa(int(src.CHF.DstAs()))
}
}

// dst.Timestamp = src.CHF.Timestamp() This is being strage, use current timestamp for now.
dst.Timestamp = currentTS
dst.DstAs = src.CHF.DstAs()
Expand Down Expand Up @@ -179,6 +107,7 @@ func (kc *KTranslate) flowToJCHF(ctx context.Context, citycache map[uint32]strin
dst.TcpRetransmit = src.CHF.TcpRetransmit()
dst.SampleRate = src.CHF.SampleRate() / 100 // Reduce by 100 to get actual rate.
dst.DeviceId = kt.DeviceID(src.CHF.DeviceId())
dst.DeviceName = src.DeviceName
dst.CompanyId = kt.Cid(src.CompanyId)
dst.SrcNextHopAs = src.CHF.SrcNextHopAs()
dst.DstNextHopAs = src.CHF.DstNextHopAs()
Expand Down Expand Up @@ -529,7 +458,7 @@ var (
)

// Updates asn and geo if set for any of these inputs.
func (kc *KTranslate) doEnrichments(ctx context.Context, citycache map[uint32]string, regioncache map[uint32]string, msgs []*kt.JCHF) {
func (kc *KTranslate) doEnrichments(ctx context.Context, msgs []*kt.JCHF) {
for _, msg := range msgs {
sip := net.ParseIP(msg.SrcAddr)
dip := net.ParseIP(msg.DstAddr)
Expand Down
28 changes: 20 additions & 8 deletions pkg/cat/kentik.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/rand"
"net/http"
"strconv"
"strings"
"time"

"github.com/kentik/ktranslate/pkg/kt"
Expand Down Expand Up @@ -196,14 +197,18 @@ func (kc *KTranslate) handleFlow(w http.ResponseWriter, r *http.Request) {

// If we are sending from before kentik, add offset in here.
offset := 0
did := 0
deviceName := ""
if senderId != "" && len(evt) > MSG_KEY_PREFIX && // Direct flow without enrichment.
(evt[0] == 0x00 && evt[1] == 0x00 && evt[2] == 0x00 && evt[3] == 0x00 && evt[4] == 0x00) { // Double check with this
offset = MSG_KEY_PREFIX
}
pts := strings.Split(senderId, ":")
if len(pts) == 3 {
cid, _ = strconv.Atoi(strings.TrimSpace(pts[0]))
deviceName = pts[1]
did, _ = strconv.Atoi(strings.TrimSpace(pts[2]))
}

// If we have a kentik sink, send on here.
if kc.kentik != nil {
go kc.kentik.SendKentik(evt, cid, senderId, offset)
}

// decompress and read (capnproto "packed" representation)
Expand Down Expand Up @@ -233,9 +238,13 @@ func (kc *KTranslate) handleFlow(w http.ResponseWriter, r *http.Request) {
if !msg.SampleAdj() {
msg.SetSampleRate(msg.SampleRate() * 100) // Apply re-sample trick here.
}
if msg.DeviceId() == 0 && senderId != "" {
// Fill in from the parsed senderId.
msg.SetDeviceId(uint32(did))
}

// send without blocking, dropping the message if the channel buffer is full
alpha := &Flow{CompanyId: cid, CHF: msg}
alpha := &Flow{CompanyId: cid, CHF: msg, DeviceName: deviceName}
select {
case kc.alphaChans[next] <- alpha:
sent++
Expand All @@ -260,8 +269,6 @@ func (kc *KTranslate) monitorAlphaChan(ctx context.Context, i int, seri func([]*
defer sendTicker.Stop()

// Set up some data structures.
citycache := map[uint32]string{}
regioncache := map[uint32]string{}
tagcache := map[uint64]string{}
serBuf := make([]byte, 0)
msgs := make([]*kt.JCHF, 0)
Expand All @@ -270,6 +277,11 @@ func (kc *KTranslate) monitorAlphaChan(ctx context.Context, i int, seri func([]*
return
}

// Add in any extra things here.
if kc.geo != nil || kc.asn != nil {
kc.doEnrichments(ctx, msgs)
}

// If we have any rollups defined, send here instead of directly to the output format.
if kc.doRollups {
rv := make([]map[string]interface{}, len(msgs))
Expand Down Expand Up @@ -320,7 +332,7 @@ func (kc *KTranslate) monitorAlphaChan(ctx context.Context, i int, seri func([]*
case f := <-kc.alphaChans[i]:
select {
case jflow := <-kc.jchfChans[i]: // non blocking select on this chan.
err := kc.flowToJCHF(ctx, citycache, regioncache, jflow, f, currentTime, tagcache)
err := kc.flowToJCHF(ctx, jflow, f, currentTime, tagcache)
if err != nil {
kc.log.Errorf("There was an error when converting to json: %v.", err)
jflow.Reset()
Expand Down
25 changes: 8 additions & 17 deletions pkg/cat/kkc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/kentik/ktranslate/pkg/maps"
"github.com/kentik/ktranslate/pkg/rollup"
ss "github.com/kentik/ktranslate/pkg/sinks"
"github.com/kentik/ktranslate/pkg/sinks/kentik"
"github.com/kentik/ktranslate/pkg/util/enrich"
"github.com/kentik/ktranslate/pkg/util/gopatricia/patricia"
"github.com/kentik/ktranslate/pkg/util/resolv"
Expand Down Expand Up @@ -169,11 +168,6 @@ func NewKTranslate(config *ktranslate.Config, log logger.ContextL, registry go_m
}
kc.sinks[sink] = snk
kc.log.Infof("Using sink %s", sink)

// Kentik gets special cased
if sink == ss.KentikSink {
kc.kentik = snk.(*kentik.KentikSink)
}
}

// IP based rules
Expand Down Expand Up @@ -385,9 +379,9 @@ func (kc *KTranslate) sendToSinks(ctx context.Context) error {
}

// This processes data from the non-kentik input sets.
func (kc *KTranslate) handleInput(ctx context.Context, msgs []*kt.JCHF, serBuf []byte, citycache map[uint32]string, regioncache map[uint32]string, cb func(error), seri func([]*kt.JCHF, []byte) (*kt.Output, error)) {
func (kc *KTranslate) handleInput(ctx context.Context, msgs []*kt.JCHF, serBuf []byte, cb func(error), seri func([]*kt.JCHF, []byte) (*kt.Output, error)) {
if kc.geo != nil || kc.asn != nil {
kc.doEnrichments(ctx, citycache, regioncache, msgs)
kc.doEnrichments(ctx, msgs)
}

// If we are filtering, cut any out here.
Expand Down Expand Up @@ -473,13 +467,11 @@ func (kc *KTranslate) watchInput(ctx context.Context, seri func([]*kt.JCHF, []by
func (kc *KTranslate) monitorInput(ctx context.Context, num int, seri func([]*kt.JCHF, []byte) (*kt.Output, error)) {
kc.log.Infof("monitorInput %d Starting", num)
serBuf := make([]byte, 0)
citycache := map[uint32]string{}
regioncache := map[uint32]string{}

for {
select {
case msgs := <-kc.inputChan:
kc.handleInput(ctx, msgs, serBuf, citycache, regioncache, nil, seri)
kc.handleInput(ctx, msgs, serBuf, nil, seri)
case <-ctx.Done():
kc.log.Infof("monitorInput %d Done", num)
return
Expand All @@ -490,13 +482,11 @@ func (kc *KTranslate) monitorInput(ctx context.Context, num int, seri func([]*kt
func (kc *KTranslate) monitorMetricsInput(ctx context.Context, seri func([]*kt.JCHF, []byte) (*kt.Output, error)) {
kc.log.Infof("monitorMetricsInput Starting")
serBuf := make([]byte, 0)
citycache := map[uint32]string{}
regioncache := map[uint32]string{}

for {
select {
case msgs := <-kc.metricsChan:
kc.handleInput(ctx, msgs, serBuf, citycache, regioncache, nil, seri)
kc.handleInput(ctx, msgs, serBuf, nil, seri)
case <-ctx.Done():
kc.log.Infof("monitorMetricsInput Done")
return
Expand Down Expand Up @@ -623,6 +613,9 @@ func (kc *KTranslate) Run(ctx context.Context) error {
return err
}
kc.apic = apic
if kc.auth != nil {
kc.auth.AddDevices(apic.GetDevicesAsMap(0)) // Load all these up to be authed also.
}
} else {
kc.apic = api.NewKentikApiFromLocalDevices(kc.auth.GetDeviceMap(), kc.log)
}
Expand Down Expand Up @@ -657,10 +650,8 @@ func (kc *KTranslate) Run(ctx context.Context) error {
if kc.config.GCPVPCInput.Enable || kc.config.AWSVPCInput.Enable {
assureInput()
serBufInput := make([]byte, 0)
citycacheInput := map[uint32]string{}
regioncacheInput := map[uint32]string{}
handler := func(msgs []*kt.JCHF, cb func(error)) { // Capture this in a closure.
kc.handleInput(ctx, msgs, serBufInput, citycacheInput, regioncacheInput, cb, kc.format.To)
kc.handleInput(ctx, msgs, serBufInput, cb, kc.format.To)
}
var vpcSource vpc.CloudSource
if kc.config.GCPVPCInput.Enable && kc.config.AWSVPCInput.Enable {
Expand Down
7 changes: 3 additions & 4 deletions pkg/cat/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
go_metrics "github.com/kentik/go-metrics"
"github.com/kentik/ktranslate"
"github.com/kentik/ktranslate/pkg/eggs/logger"
"github.com/kentik/ktranslate/pkg/sinks/kentik"

"github.com/kentik/ktranslate/pkg/api"
"github.com/kentik/ktranslate/pkg/cat/auth"
Expand Down Expand Up @@ -60,7 +59,6 @@ type KTranslate struct {
sinks map[sinks.Sink]sinks.SinkImpl
format formats.Formatter
formatRollup formats.Formatter
kentik *kentik.KentikSink // This one gets special handling
rollups []rollup.Roller
doRollups bool
doFilter bool
Expand Down Expand Up @@ -114,8 +112,9 @@ type hc struct {
}

type Flow struct {
CompanyId int
CHF model.CHF
CompanyId int
CHF model.CHF
DeviceName string
}

type KKCMetric struct {
Expand Down
Loading

0 comments on commit ec609bf

Please sign in to comment.