diff --git a/cmd/ktranslate/main.go b/cmd/ktranslate/main.go index 680b586b..3277d6d0 100644 --- a/cmd/ktranslate/main.go +++ b/cmd/ktranslate/main.go @@ -351,7 +351,7 @@ func applyFlags(cfg *ktranslate.Config) error { } cfg.EnableSNMPDiscovery = v case "kentik_email": - cfg.KentikCreds = []ktranslate.KentikCred{ktranslate.KentikCred{ApiEmail: val, ApiToken: os.Getenv(ktranslate.KentikAPITokenEnvVar)}} + cfg.KentikCreds = []ktranslate.KentikCred{ktranslate.KentikCred{APIEmail: val, APIToken: os.Getenv(ktranslate.KentikAPITokenEnvVar)}} case "api_root": cfg.APIBaseURL = val case "kentik_plan": diff --git a/config.go b/config.go index 8475b623..c540a7e4 100644 --- a/config.go +++ b/config.go @@ -195,9 +195,10 @@ type FlowInputConfig struct { MappingFile string } +// KentikCred is information needed to auth the Kentik API. type KentikCred struct { - ApiEmail string - ApiToken string + APIEmail string + APIToken string } // Config is the ktranslate configuration diff --git a/pkg/api/api.go b/pkg/api/api.go index d86fdbd1..c758543e 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -239,7 +239,7 @@ func (api *KentikApi) getDevices(ctx context.Context) error { resDev := map[kt.Cid]kt.Devices{} num := 0 for _, info := range api.config.KentikCreds { - res, err := api.getDeviceInfo(ctx, api.config.APIBaseURL+"/api/internal/devices", info.ApiEmail, info.ApiToken) + res, err := api.getDeviceInfo(ctx, api.config.APIBaseURL+"/api/internal/devices", info.APIEmail, info.APIToken) if err != nil { return err } @@ -262,7 +262,7 @@ func (api *KentikApi) getDevices(ctx context.Context) error { num++ } - api.Infof("Loaded %d Kentik devices via API for %s", len(devices.Devices), info.ApiEmail) + api.Infof("Loaded %d Kentik devices via API for %s", len(devices.Devices), info.APIEmail) } api.setTime = time.Now() @@ -362,8 +362,8 @@ func (api *KentikApi) getSynthInfo(ctx context.Context) error { synAgentsByIP := map[string]*synthetics.Agent{} for _, info := range api.config.KentikCreds { md := metadata.New(map[string]string{ - "X-CH-Auth-Email": info.ApiEmail, - "X-CH-Auth-API-Token": info.ApiToken, + "X-CH-Auth-Email": info.APIEmail, + "X-CH-Auth-API-Token": info.APIToken, }) ctxo := metadata.NewOutgoingContext(ctx, md) @@ -394,7 +394,7 @@ func (api *KentikApi) getSynthInfo(ctx context.Context) error { synAgentsByIP[locala.GetIp()] = locala } - api.Infof("Loaded %d Kentik Tests and %d Agents via API for %s", len(r.GetTests()), len(ra.GetAgents()), info.ApiEmail) + api.Infof("Loaded %d Kentik Tests and %d Agents via API for %s", len(r.GetTests()), len(ra.GetAgents()), info.APIEmail) } api.synAgents = synAgents @@ -470,8 +470,8 @@ func (api *KentikApi) createDevice(ctx context.Context, create *deviceCreate, ur } userAgentString := USER_AGENT_BASE - req.Header.Add(API_EMAIL_HEADER, api.config.KentikCreds[0].ApiEmail) - req.Header.Add(API_PASSWORD_HEADER, api.config.KentikCreds[0].ApiToken) + req.Header.Add(API_EMAIL_HEADER, api.config.KentikCreds[0].APIEmail) + req.Header.Add(API_PASSWORD_HEADER, api.config.KentikCreds[0].APIToken) req.Header.Add(HTTP_USER_AGENT, userAgentString+" AGENT") req.Header.Add("Content-Type", "application/json") diff --git a/pkg/cat/auth/auth.go b/pkg/cat/auth/auth.go index 03b73759..2763d8b2 100644 --- a/pkg/cat/auth/auth.go +++ b/pkg/cat/auth/auth.go @@ -109,6 +109,16 @@ func (s *Server) GetDeviceMap() map[string]*kt.Device { return s.devicesByIP } +func (s *Server) AddDevices(devices map[string]*kt.Device) { + for _, device := range devices { + s.devicesByName[device.ID.Itoa()] = device + for _, ip := range device.SendingIps { + s.devicesByIP[ip.String()] = device + } + } + s.log.Infof("API server running %d devices after remote fetch", len(s.devicesByName)) +} + func (s *Server) getDevice(query string) *kt.Device { // Try finding this device directly by its ID device, ok := s.devicesByName[query] diff --git a/pkg/cat/jchf.go b/pkg/cat/jchf.go index 1d295156..46fcf46b 100644 --- a/pkg/cat/jchf.go +++ b/pkg/cat/jchf.go @@ -20,62 +20,6 @@ var ( DEFAULT_GEO_PACKED = patricia.PackGeo(DEFAULT_GEO) ) -func (kc *KTranslate) lookupGeo(ipv4 uint32, ipv6 []byte) (string, error) { - if ipv4 != 0 { - return kc.geo.SearchBestFromHostGeo(net.IPv4(byte(ipv4>>24), byte(ipv4>>16), byte(ipv4>>8), byte(ipv4))) - } - return kc.geo.SearchBestFromHostGeo(net.IP(ipv6)) -} - -func (kc *KTranslate) lookupAsn(ipv4 uint32, ipv6 []byte) (uint32, string, error) { - if ipv4 != 0 { - return kc.asn.SearchBestFromHostAsn(net.IPv4(byte(ipv4>>24), byte(ipv4>>16), byte(ipv4>>8), byte(ipv4))) - } - return kc.geo.SearchBestFromHostAsn(net.IP(ipv6)) -} - -func (kc *KTranslate) setGeoAsn(src *Flow) (string, string) { - var srcName, dstName string - - // Fetch our own geo if not already set. - if kc.geo != nil { - if src.CHF.SrcGeo() == 0 || src.CHF.SrcGeo() == DEFAULT_GEO_PACKED { - ipv6, _ := src.CHF.Ipv6SrcAddr() - if srcGeo, err := kc.lookupGeo(src.CHF.Ipv4SrcAddr(), ipv6); err == nil { - src.CHF.SetSrcGeo(patricia.PackGeo([]byte(srcGeo))) - } - } - - if src.CHF.DstGeo() == 0 || src.CHF.DstGeo() == DEFAULT_GEO_PACKED { - ipv6, _ := src.CHF.Ipv6DstAddr() - if dstGeo, err := kc.lookupGeo(src.CHF.Ipv4DstAddr(), ipv6); err == nil { - src.CHF.SetDstGeo(patricia.PackGeo([]byte(dstGeo))) - } - } - } - - // And set our own asn also if not set. - if kc.asn != nil { - ipv6, _ := src.CHF.Ipv6SrcAddr() - if asn, name, err := kc.lookupAsn(src.CHF.Ipv4SrcAddr(), ipv6); err == nil { - if src.CHF.SrcAs() == 0 { - src.CHF.SetSrcAs(asn) - } - srcName = name - } - - ipv6, _ = src.CHF.Ipv6DstAddr() - if asn, name, err := kc.lookupAsn(src.CHF.Ipv4DstAddr(), ipv6); err == nil { - if src.CHF.DstAs() == 0 { - src.CHF.SetDstAs(asn) - } - dstName = name - } - } - - return srcName, dstName -} - func (kc *KTranslate) getEventType(dst *kt.JCHF) string { // if app_proto is 12, this is snmp and return as such. @@ -121,28 +65,12 @@ func (kc *KTranslate) getProviderType(dst *kt.JCHF) kt.Provider { return kt.ProviderFlowDevice } -func (kc *KTranslate) flowToJCHF(ctx context.Context, citycache map[uint32]string, regioncache map[uint32]string, dst *kt.JCHF, src *Flow, currentTS int64, tagcache map[uint64]string) error { +func (kc *KTranslate) flowToJCHF(ctx context.Context, dst *kt.JCHF, src *Flow, currentTS int64, tagcache map[uint64]string) error { dst.CustomStr = make(map[string]string) dst.CustomInt = make(map[string]int32) dst.CustomBigInt = make(map[string]int64) - // In the direct case, users can map their own asn/geo values into here. - if kc.geo != nil || kc.asn != nil { - srcAsnName, dstAsnName := kc.setGeoAsn(src) - if srcAsnName != "" { - dst.CustomStr["src_as_name"] = srcAsnName - } else { - dst.CustomStr["src_as_name"] = strconv.Itoa(int(src.CHF.SrcAs())) - } - - if dstAsnName != "" { - dst.CustomStr["dst_as_name"] = dstAsnName - } else { - dst.CustomStr["dst_as_name"] = strconv.Itoa(int(src.CHF.DstAs())) - } - } - // dst.Timestamp = src.CHF.Timestamp() This is being strage, use current timestamp for now. dst.Timestamp = currentTS dst.DstAs = src.CHF.DstAs() @@ -179,6 +107,7 @@ func (kc *KTranslate) flowToJCHF(ctx context.Context, citycache map[uint32]strin dst.TcpRetransmit = src.CHF.TcpRetransmit() dst.SampleRate = src.CHF.SampleRate() / 100 // Reduce by 100 to get actual rate. dst.DeviceId = kt.DeviceID(src.CHF.DeviceId()) + dst.DeviceName = src.DeviceName dst.CompanyId = kt.Cid(src.CompanyId) dst.SrcNextHopAs = src.CHF.SrcNextHopAs() dst.DstNextHopAs = src.CHF.DstNextHopAs() @@ -529,7 +458,7 @@ var ( ) // Updates asn and geo if set for any of these inputs. -func (kc *KTranslate) doEnrichments(ctx context.Context, citycache map[uint32]string, regioncache map[uint32]string, msgs []*kt.JCHF) { +func (kc *KTranslate) doEnrichments(ctx context.Context, msgs []*kt.JCHF) { for _, msg := range msgs { sip := net.ParseIP(msg.SrcAddr) dip := net.ParseIP(msg.DstAddr) diff --git a/pkg/cat/kentik.go b/pkg/cat/kentik.go index def8db8a..6482a80a 100644 --- a/pkg/cat/kentik.go +++ b/pkg/cat/kentik.go @@ -9,6 +9,7 @@ import ( "math/rand" "net/http" "strconv" + "strings" "time" "github.com/kentik/ktranslate/pkg/kt" @@ -196,14 +197,18 @@ func (kc *KTranslate) handleFlow(w http.ResponseWriter, r *http.Request) { // If we are sending from before kentik, add offset in here. offset := 0 + did := 0 + deviceName := "" if senderId != "" && len(evt) > MSG_KEY_PREFIX && // Direct flow without enrichment. (evt[0] == 0x00 && evt[1] == 0x00 && evt[2] == 0x00 && evt[3] == 0x00 && evt[4] == 0x00) { // Double check with this offset = MSG_KEY_PREFIX - } + pts := strings.Split(senderId, ":") + if len(pts) == 3 { + cid, _ = strconv.Atoi(strings.TrimSpace(pts[0])) + deviceName = pts[1] + did, _ = strconv.Atoi(strings.TrimSpace(pts[2])) + } - // If we have a kentik sink, send on here. - if kc.kentik != nil { - go kc.kentik.SendKentik(evt, cid, senderId, offset) } // decompress and read (capnproto "packed" representation) @@ -233,9 +238,13 @@ func (kc *KTranslate) handleFlow(w http.ResponseWriter, r *http.Request) { if !msg.SampleAdj() { msg.SetSampleRate(msg.SampleRate() * 100) // Apply re-sample trick here. } + if msg.DeviceId() == 0 && senderId != "" { + // Fill in from the parsed senderId. + msg.SetDeviceId(uint32(did)) + } // send without blocking, dropping the message if the channel buffer is full - alpha := &Flow{CompanyId: cid, CHF: msg} + alpha := &Flow{CompanyId: cid, CHF: msg, DeviceName: deviceName} select { case kc.alphaChans[next] <- alpha: sent++ @@ -260,8 +269,6 @@ func (kc *KTranslate) monitorAlphaChan(ctx context.Context, i int, seri func([]* defer sendTicker.Stop() // Set up some data structures. - citycache := map[uint32]string{} - regioncache := map[uint32]string{} tagcache := map[uint64]string{} serBuf := make([]byte, 0) msgs := make([]*kt.JCHF, 0) @@ -270,6 +277,11 @@ func (kc *KTranslate) monitorAlphaChan(ctx context.Context, i int, seri func([]* return } + // Add in any extra things here. + if kc.geo != nil || kc.asn != nil { + kc.doEnrichments(ctx, msgs) + } + // If we have any rollups defined, send here instead of directly to the output format. if kc.doRollups { rv := make([]map[string]interface{}, len(msgs)) @@ -320,7 +332,7 @@ func (kc *KTranslate) monitorAlphaChan(ctx context.Context, i int, seri func([]* case f := <-kc.alphaChans[i]: select { case jflow := <-kc.jchfChans[i]: // non blocking select on this chan. - err := kc.flowToJCHF(ctx, citycache, regioncache, jflow, f, currentTime, tagcache) + err := kc.flowToJCHF(ctx, jflow, f, currentTime, tagcache) if err != nil { kc.log.Errorf("There was an error when converting to json: %v.", err) jflow.Reset() diff --git a/pkg/cat/kkc.go b/pkg/cat/kkc.go index 81f957ef..3eb9fb3f 100644 --- a/pkg/cat/kkc.go +++ b/pkg/cat/kkc.go @@ -25,7 +25,6 @@ import ( "github.com/kentik/ktranslate/pkg/maps" "github.com/kentik/ktranslate/pkg/rollup" ss "github.com/kentik/ktranslate/pkg/sinks" - "github.com/kentik/ktranslate/pkg/sinks/kentik" "github.com/kentik/ktranslate/pkg/util/enrich" "github.com/kentik/ktranslate/pkg/util/gopatricia/patricia" "github.com/kentik/ktranslate/pkg/util/resolv" @@ -169,11 +168,6 @@ func NewKTranslate(config *ktranslate.Config, log logger.ContextL, registry go_m } kc.sinks[sink] = snk kc.log.Infof("Using sink %s", sink) - - // Kentik gets special cased - if sink == ss.KentikSink { - kc.kentik = snk.(*kentik.KentikSink) - } } // IP based rules @@ -385,9 +379,9 @@ func (kc *KTranslate) sendToSinks(ctx context.Context) error { } // This processes data from the non-kentik input sets. -func (kc *KTranslate) handleInput(ctx context.Context, msgs []*kt.JCHF, serBuf []byte, citycache map[uint32]string, regioncache map[uint32]string, cb func(error), seri func([]*kt.JCHF, []byte) (*kt.Output, error)) { +func (kc *KTranslate) handleInput(ctx context.Context, msgs []*kt.JCHF, serBuf []byte, cb func(error), seri func([]*kt.JCHF, []byte) (*kt.Output, error)) { if kc.geo != nil || kc.asn != nil { - kc.doEnrichments(ctx, citycache, regioncache, msgs) + kc.doEnrichments(ctx, msgs) } // If we are filtering, cut any out here. @@ -473,13 +467,11 @@ func (kc *KTranslate) watchInput(ctx context.Context, seri func([]*kt.JCHF, []by func (kc *KTranslate) monitorInput(ctx context.Context, num int, seri func([]*kt.JCHF, []byte) (*kt.Output, error)) { kc.log.Infof("monitorInput %d Starting", num) serBuf := make([]byte, 0) - citycache := map[uint32]string{} - regioncache := map[uint32]string{} for { select { case msgs := <-kc.inputChan: - kc.handleInput(ctx, msgs, serBuf, citycache, regioncache, nil, seri) + kc.handleInput(ctx, msgs, serBuf, nil, seri) case <-ctx.Done(): kc.log.Infof("monitorInput %d Done", num) return @@ -490,13 +482,11 @@ func (kc *KTranslate) monitorInput(ctx context.Context, num int, seri func([]*kt func (kc *KTranslate) monitorMetricsInput(ctx context.Context, seri func([]*kt.JCHF, []byte) (*kt.Output, error)) { kc.log.Infof("monitorMetricsInput Starting") serBuf := make([]byte, 0) - citycache := map[uint32]string{} - regioncache := map[uint32]string{} for { select { case msgs := <-kc.metricsChan: - kc.handleInput(ctx, msgs, serBuf, citycache, regioncache, nil, seri) + kc.handleInput(ctx, msgs, serBuf, nil, seri) case <-ctx.Done(): kc.log.Infof("monitorMetricsInput Done") return @@ -623,6 +613,9 @@ func (kc *KTranslate) Run(ctx context.Context) error { return err } kc.apic = apic + if kc.auth != nil { + kc.auth.AddDevices(apic.GetDevicesAsMap(0)) // Load all these up to be authed also. + } } else { kc.apic = api.NewKentikApiFromLocalDevices(kc.auth.GetDeviceMap(), kc.log) } @@ -657,10 +650,8 @@ func (kc *KTranslate) Run(ctx context.Context) error { if kc.config.GCPVPCInput.Enable || kc.config.AWSVPCInput.Enable { assureInput() serBufInput := make([]byte, 0) - citycacheInput := map[uint32]string{} - regioncacheInput := map[uint32]string{} handler := func(msgs []*kt.JCHF, cb func(error)) { // Capture this in a closure. - kc.handleInput(ctx, msgs, serBufInput, citycacheInput, regioncacheInput, cb, kc.format.To) + kc.handleInput(ctx, msgs, serBufInput, cb, kc.format.To) } var vpcSource vpc.CloudSource if kc.config.GCPVPCInput.Enable && kc.config.AWSVPCInput.Enable { diff --git a/pkg/cat/types.go b/pkg/cat/types.go index 641e5559..6eac234d 100644 --- a/pkg/cat/types.go +++ b/pkg/cat/types.go @@ -6,7 +6,6 @@ import ( go_metrics "github.com/kentik/go-metrics" "github.com/kentik/ktranslate" "github.com/kentik/ktranslate/pkg/eggs/logger" - "github.com/kentik/ktranslate/pkg/sinks/kentik" "github.com/kentik/ktranslate/pkg/api" "github.com/kentik/ktranslate/pkg/cat/auth" @@ -60,7 +59,6 @@ type KTranslate struct { sinks map[sinks.Sink]sinks.SinkImpl format formats.Formatter formatRollup formats.Formatter - kentik *kentik.KentikSink // This one gets special handling rollups []rollup.Roller doRollups bool doFilter bool @@ -114,8 +112,9 @@ type hc struct { } type Flow struct { - CompanyId int - CHF model.CHF + CompanyId int + CHF model.CHF + DeviceName string } type KKCMetric struct { diff --git a/pkg/formats/kflow/kflow.go b/pkg/formats/kflow/kflow.go index c4bbc365..53bf0265 100644 --- a/pkg/formats/kflow/kflow.go +++ b/pkg/formats/kflow/kflow.go @@ -3,12 +3,15 @@ package kflow import ( "bytes" "compress/gzip" + "encoding/binary" "fmt" "hash/crc32" + "net" "github.com/kentik/ktranslate/pkg/eggs/logger" "github.com/kentik/ktranslate/pkg/kt" "github.com/kentik/ktranslate/pkg/rollup" + patricia "github.com/kentik/ktranslate/pkg/util/gopatricia/patricia" "github.com/kentik/ktranslate/pkg/util/ic" model "github.com/kentik/ktranslate/pkg/util/kflow2" @@ -17,7 +20,7 @@ import ( const ( MSG_KEY_PREFIX = 80 // This many bytes in every rcv message are for the key. - KTRANSLATE_PROTO = 100 + KTRANSLATE_PROTO = 0 KTRANSLATE_MAP_PROTO = 101 kentikDefaultCapnprotoDecodeLimit = 128 << 20 // 128 MiB ) @@ -77,11 +80,16 @@ func (f *KflowFormat) To(flows []*kt.JCHF, serBuf []byte) (*kt.Output, error) { root.SetMsgs(msgs) - cid := [MSG_KEY_PREFIX]byte{} + key := fmt.Sprintf("%d:%s:%d^", flows[0].CompanyId, flows[0].DeviceName, flows[0].DeviceId) + cid := make([]byte, MSG_KEY_PREFIX) + if len(key) < MSG_KEY_PREFIX { + copy(cid, key) + } + buf := bytes.NewBuffer(serBuf) z := gzip.NewWriter(buf) z.Reset(buf) - z.Write(cid[:]) + z.Write(cid) err = capn.NewPackedEncoder(z).Encode(msg) if err != nil { @@ -89,7 +97,7 @@ func (f *KflowFormat) To(flows []*kt.JCHF, serBuf []byte) (*kt.Output, error) { } z.Close() - return kt.NewOutputWithProvider(buf.Bytes(), flows[0].Provider, kt.EventOutput), nil + return kt.NewOutputWithProviderAndCompanySender(buf.Bytes(), flows[0].Provider, flows[0].CompanyId, kt.EventOutput, key[0:len(key)-1]), nil } func (f *KflowFormat) From(raw *kt.Output) ([]map[string]interface{}, error) { @@ -107,6 +115,11 @@ func (f *KflowFormat) From(raw *kt.Output) ([]map[string]interface{}, error) { } evt := bodyBuffer.Bytes() + keyP := bytes.Split(evt[0:MSG_KEY_PREFIX], []byte("^")) + if len(keyP) < 2 { + return nil, fmt.Errorf("Invalid prefix found for kflow: %s", string(evt[0:MSG_KEY_PREFIX])) + } + decoder := capn.NewPackedDecoder(bytes.NewBuffer(evt[MSG_KEY_PREFIX:])) decoder.MaxMessageSize = kentikDefaultCapnprotoDecodeLimit capnprotoMessage, err := decoder.Decode() @@ -133,7 +146,29 @@ func (f *KflowFormat) From(raw *kt.Output) ([]map[string]interface{}, error) { case KTRANSLATE_PROTO: flow := map[string]interface{}{ "timestamp": msg.Timestamp(), + "protocol": ic.PROTO_NAMES[uint16(msg.Protocol())], + "src_geo": fmt.Sprintf("%c%c", msg.SrcGeo()>>8, msg.SrcGeo()&0xFF), + "server_id": keyP[0], } + + // Now the addresses. + var addr net.IP + if msg.Ipv4DstAddr() > 0 { + addr = int2ip(msg.Ipv4DstAddr()) + } else { + ipr, _ := msg.Ipv6DstAddr() + addr = net.IP(ipr) + } + flow["dst_addr"] = addr.String() + + if msg.Ipv4SrcAddr() > 0 { + addr = int2ip(msg.Ipv4SrcAddr()) + } else { + ipr, _ := msg.Ipv6SrcAddr() + addr = net.IP(ipr) + } + flow["src_addr"] = addr.String() + customs, _ := msg.Custom() for i, customsLen := 0, customs.Len(); i < customsLen; i++ { cust := customs.At(i) @@ -176,6 +211,7 @@ func (ff *KflowFormat) pack(f *kt.JCHF, kflow model.CHF, list model.Custom_List, kflow.SetAppProtocol(KTRANSLATE_PROTO) kflow.SetTimestamp(f.Timestamp) kflow.SetDstAs(f.DstAs) + kflow.SetDstGeo(patricia.PackGeo([]byte(f.DstGeo))) kflow.SetHeaderLen(f.HeaderLen) kflow.SetInBytes(f.InBytes) kflow.SetInPkts(f.InPkts) @@ -187,6 +223,7 @@ func (ff *KflowFormat) pack(f *kt.JCHF, kflow model.CHF, list model.Custom_List, kflow.SetProtocol(uint32(ic.PROTO_NUMS[f.Protocol])) kflow.SetSampledPacketSize(f.SampledPacketSize) kflow.SetSrcAs(f.SrcAs) + kflow.SetSrcGeo(patricia.PackGeo([]byte(f.SrcGeo))) kflow.SetTcpFlags(f.TcpFlags) kflow.SetTos(f.Tos) kflow.SetVlanIn(f.VlanIn) @@ -204,6 +241,24 @@ func (ff *KflowFormat) pack(f *kt.JCHF, kflow model.CHF, list model.Custom_List, kflow.SetSrcThirdAsn(f.SrcThirdAsn) kflow.SetDstThirdAsn(f.DstThirdAsn) + sip := net.ParseIP(f.SrcAddr) + dip := net.ParseIP(f.DstAddr) + if dip != nil { + if dip.To4() != nil { + kflow.SetIpv4DstAddr(binary.BigEndian.Uint32(dip.To4())) + } else { + kflow.SetIpv6DstAddr(dip) + } + } + + if sip != nil { + if sip.To4() != nil { + kflow.SetIpv4SrcAddr(binary.BigEndian.Uint32(sip.To4())) + } else { + kflow.SetIpv6SrcAddr(sip) + } + } + next := 0 for key, val := range f.CustomStr { kc := list.At(next) @@ -274,3 +329,9 @@ func (ff *KflowFormat) getIds(flows []*kt.JCHF, kflow model.CHF, seg *capn.Segme // And return the map we used. return ids, nil } + +func int2ip(nn uint32) net.IP { + ip := make(net.IP, 4) + binary.BigEndian.PutUint32(ip, nn) + return ip +} diff --git a/pkg/formats/kflow/kflow_test.go b/pkg/formats/kflow/kflow_test.go index 011fbbcf..46d2a66f 100644 --- a/pkg/formats/kflow/kflow_test.go +++ b/pkg/formats/kflow/kflow_test.go @@ -31,6 +31,9 @@ func TestSeriToJflow(t *testing.T) { assert.Equal(len(kt.InputTesting), len(out)) for i, _ := range out { assert.Equal(kt.InputTesting[i].Timestamp, out[i]["timestamp"]) + assert.Equal(kt.InputTesting[i].SrcAddr, out[i]["src_addr"]) + assert.Equal(kt.InputTesting[i].SrcGeo, out[i]["src_geo"]) + assert.Equal(kt.InputTesting[i].Protocol, out[i]["protocol"]) for k, v := range kt.InputTesting[i].CustomStr { assert.Equal(v, out[i][k]) } diff --git a/pkg/formats/nrm/nrm.go b/pkg/formats/nrm/nrm.go index 0b92892d..cad23c91 100644 --- a/pkg/formats/nrm/nrm.go +++ b/pkg/formats/nrm/nrm.go @@ -123,7 +123,7 @@ func (f *NRMFormat) To(msgs []*kt.JCHF, serBuf []byte) (*kt.Output, error) { } if !f.doGz { - return kt.NewOutputWithProviderAndCompany(target, msgs[0].Provider, msgs[0].CompanyId, kt.MetricOutput), nil + return kt.NewOutputWithProviderAndCompanySender(target, msgs[0].Provider, msgs[0].CompanyId, kt.MetricOutput, ""), nil } buf := bytes.NewBuffer(serBuf) diff --git a/pkg/kt/testing.go b/pkg/kt/testing.go index e468a321..266db7db 100644 --- a/pkg/kt/testing.go +++ b/pkg/kt/testing.go @@ -4,8 +4,8 @@ import () var ( InputTesting = []*JCHF{ - &JCHF{CompanyId: 10, SrcAddr: "10.2.2.1", Protocol: "TCP", DstAddr: "2001:db8::68", Timestamp: 1, L4DstPort: 80, OutputPort: IfaceID(20), EventType: KENTIK_EVENT_TYPE, CustomStr: map[string]string{"foo": "bar"}, CustomInt: map[string]int32{"fooI": 1}, CustomBigInt: map[string]int64{"fooII": 12}, InBytes: 12121, InPkts: 12, OutBytes: 13, OutPkts: 1, SrcEthMac: "90:61:ae:fb:c2:19", avroSet: map[string]interface{}{}}, - &JCHF{CompanyId: 10, SrcAddr: "3.2.2.2", InBytes: 1, OutBytes: 12, InPkts: 12, OutPkts: 1, Protocol: "UDP", DstAddr: "2001:db8::69", SrcEthMac: "90:61:ae:fb:c2:20", Timestamp: 2, CustomStr: map[string]string{"tar": "far"}, EventType: KENTIK_EVENT_TYPE, avroSet: map[string]interface{}{}}, + &JCHF{CompanyId: 10, SrcAddr: "10.2.2.1", Protocol: "TCP", DstAddr: "2001:db8::68", Timestamp: 1, L4DstPort: 80, SrcAs: 1111, SrcGeo: "US", OutputPort: IfaceID(20), EventType: KENTIK_EVENT_TYPE, CustomStr: map[string]string{"foo": "bar"}, CustomInt: map[string]int32{"fooI": 1}, CustomBigInt: map[string]int64{"fooII": 12}, InBytes: 12121, InPkts: 12, OutBytes: 13, OutPkts: 1, SrcEthMac: "90:61:ae:fb:c2:19", avroSet: map[string]interface{}{}}, + &JCHF{CompanyId: 10, SrcAddr: "3.2.2.2", InBytes: 1, OutBytes: 12, InPkts: 12, OutPkts: 1, Protocol: "UDP", SrcAs: 222223, SrcGeo: "CA", DstAddr: "2001:db8::69", SrcEthMac: "90:61:ae:fb:c2:20", Timestamp: 2, CustomStr: map[string]string{"tar": "far"}, EventType: KENTIK_EVENT_TYPE, avroSet: map[string]interface{}{}}, } InputTestingSynth = []*JCHF{ diff --git a/pkg/kt/types.go b/pkg/kt/types.go index cdc16def..78e8dca7 100644 --- a/pkg/kt/types.go +++ b/pkg/kt/types.go @@ -346,6 +346,7 @@ type OutputContext struct { Provider Provider Type OutputType CompanyId Cid + SenderId string } type Output struct { @@ -362,8 +363,8 @@ func NewOutputWithProvider(body []byte, prov Provider, stype OutputType) *Output return &Output{Body: body, Ctx: OutputContext{Provider: prov, Type: stype}} } -func NewOutputWithProviderAndCompany(body []byte, prov Provider, cid Cid, stype OutputType) *Output { - return &Output{Body: body, Ctx: OutputContext{Provider: prov, Type: stype, CompanyId: cid}} +func NewOutputWithProviderAndCompanySender(body []byte, prov Provider, cid Cid, stype OutputType, senderid string) *Output { + return &Output{Body: body, Ctx: OutputContext{Provider: prov, Type: stype, CompanyId: cid, SenderId: senderid}} } func (o *Output) IsEvent() bool { diff --git a/pkg/sinks/kentik/kentik.go b/pkg/sinks/kentik/kentik.go index f8c2bff8..1cda25d4 100644 --- a/pkg/sinks/kentik/kentik.go +++ b/pkg/sinks/kentik/kentik.go @@ -2,7 +2,6 @@ package kentik import ( "bytes" - "compress/gzip" "context" "crypto/tls" "flag" @@ -12,16 +11,20 @@ import ( "net/url" "strconv" "strings" + "time" go_metrics "github.com/kentik/go-metrics" "github.com/kentik/ktranslate" "github.com/kentik/ktranslate/pkg/eggs/logger" "github.com/kentik/ktranslate/pkg/formats" + "github.com/kentik/ktranslate/pkg/formats/kflow" "github.com/kentik/ktranslate/pkg/kt" ) const ( CHF_TYPE = "application/chf" + + DefaultSendTimeout = 30 * time.Second ) var ( @@ -34,13 +37,14 @@ func init() { type KentikSink struct { logger.ContextL - registry go_metrics.Registry - metrics *KentikMetric - KentikUrl string - client *http.Client - tr *http.Transport - isKentik bool - config *ktranslate.Config + registry go_metrics.Registry + metrics *KentikMetric + KentikUrl string + client *http.Client + tr *http.Transport + isKentik bool + config *ktranslate.Config + sendMaxDuration time.Duration } type KentikMetric struct { @@ -56,7 +60,8 @@ func NewSink(log logger.Underlying, registry go_metrics.Registry, cfg *ktranslat DeliveryErr: go_metrics.GetOrRegisterMeter("delivery_errors_kentik", registry), DeliveryWin: go_metrics.GetOrRegisterMeter("delivery_wins_kentik", registry), }, - config: cfg, + sendMaxDuration: DefaultSendTimeout, + config: cfg, }, nil } @@ -82,7 +87,11 @@ func (s *KentikSink) Init(ctx context.Context, format formats.Format, compressio } func (s *KentikSink) Send(ctx context.Context, payload *kt.Output) { - // Noop, can't send this way. + go func() { + ctxC, cancel := context.WithTimeout(ctx, s.sendMaxDuration) + defer cancel() + s.sendKentik(ctxC, payload.Body, int(payload.Ctx.CompanyId), payload.Ctx.SenderId, kflow.MSG_KEY_PREFIX) + }() } func (s *KentikSink) Close() {} @@ -94,7 +103,7 @@ func (s *KentikSink) HttpInfo() map[string]float64 { } } -func (s *KentikSink) SendKentik(payload []byte, cid int, senderId string, offset int) { +func (s *KentikSink) sendKentik(ctx context.Context, payload []byte, cid int, senderId string, offset int) { if s.isKentik && offset == 0 { // Cut short any flow which is coming from kentik going back to kentik. return } @@ -105,19 +114,14 @@ func (s *KentikSink) SendKentik(payload []byte, cid int, senderId string, offset valString := vals.Encode() fullUrl := s.KentikUrl + "?" + valString - gziped, err := s.gzBuf(nil, payload) - if err != nil { - s.Errorf("Cannot compress Kentik forward: %v", err) - return - } - req, err := http.NewRequestWithContext(context.Background(), "POST", fullUrl, bytes.NewBuffer(gziped)) + req, err := http.NewRequestWithContext(ctx, "POST", fullUrl, bytes.NewBuffer(payload)) if err != nil { s.Errorf("Cannot create Kentik request: %v", err) return } - req.Header.Set("X-CH-Auth-Email", s.config.KentikCreds[0].ApiEmail) - req.Header.Set("X-CH-Auth-API-Token", s.config.KentikCreds[0].ApiToken) + req.Header.Set("X-CH-Auth-Email", s.config.KentikCreds[0].APIEmail) + req.Header.Set("X-CH-Auth-API-Token", s.config.KentikCreds[0].APIToken) req.Header.Set("Content-Type", CHF_TYPE) req.Header.Set("Content-Encoding", "gzip") @@ -141,27 +145,3 @@ func (s *KentikSink) SendKentik(payload []byte, cid int, senderId string, offset } } } - -func (s *KentikSink) gzBuf(serBuf []byte, raw []byte) ([]byte, error) { - if serBuf == nil { - serBuf = make([]byte, len(raw)) - } - buf := bytes.NewBuffer(serBuf) - buf.Reset() - zw, err := gzip.NewWriterLevel(buf, gzip.DefaultCompression) - if err != nil { - return nil, err - } - - _, err = zw.Write(raw) - if err != nil { - return nil, err - } - - err = zw.Close() - if err != nil { - return nil, err - } - - return buf.Bytes(), nil -} diff --git a/pkg/util/enrich/enrich.go b/pkg/util/enrich/enrich.go index fe6147fd..c857e071 100644 --- a/pkg/util/enrich/enrich.go +++ b/pkg/util/enrich/enrich.go @@ -3,18 +3,31 @@ package enrich import ( "bytes" "context" + "crypto/sha256" "fmt" "io/ioutil" + "net" "net/http" + "strconv" + "strings" "github.com/kentik/ktranslate/pkg/eggs/logger" "github.com/kentik/ktranslate/pkg/kt" ) +const ( + EnrichUrlHashSrcIP = "hash_src_ip" + EnrichUrlHashDstIP = "hash_dst_ip" + EnrichUrlHashAllIP = "hash_ip" +) + type Enricher struct { logger.ContextL url string client *http.Client + doSrc bool + doDst bool + salt []byte } func NewEnricher(url string, log logger.Underlying) (*Enricher, error) { @@ -22,13 +35,28 @@ func NewEnricher(url string, log logger.Underlying) (*Enricher, error) { ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "Enricher"}, log), url: url, client: &http.Client{}, + doSrc: strings.HasPrefix(url, EnrichUrlHashSrcIP) || strings.HasPrefix(url, EnrichUrlHashAllIP), + doDst: strings.HasPrefix(url, EnrichUrlHashDstIP) || strings.HasPrefix(url, EnrichUrlHashAllIP), } - e.Infof("Enriching at %s", url) + if e.doSrc || e.doDst { + var salt string + if strings.HasPrefix(url, EnrichUrlHashAllIP) { + salt = url[len(EnrichUrlHashAllIP):] + } else { + salt = url[len(EnrichUrlHashSrcIP):] // same # chars src and dst. + } + e.salt = []byte(salt) + } + + e.Infof("Enriching at %s. Source: %v, Dest: %v, Salt %s", url, e.doSrc, e.doDst, string(e.salt)) return &e, nil } func (e *Enricher) Enrich(ctx context.Context, msgs []*kt.JCHF) ([]*kt.JCHF, error) { + if e.doSrc || e.doDst { + return e.hashIP(ctx, msgs) + } target, err := json.Marshal(msgs) // Has to be an array here, no idea why. if err != nil { @@ -59,3 +87,25 @@ func (e *Enricher) Enrich(ctx context.Context, msgs []*kt.JCHF) ([]*kt.JCHF, err err = json.Unmarshal(body, &msgs) return msgs, err } + +func (e *Enricher) hashIP(ctx context.Context, msgs []*kt.JCHF) ([]*kt.JCHF, error) { + h := sha256.New() + for _, msg := range msgs { + if e.doSrc { + h.Write(e.salt) + h.Write([]byte(msg.SrcAddr)) + msg.SrcAddr = net.IP(h.Sum(nil)[0:16]).String() + msg.CustomStr["src_endpoint"] = msg.SrcAddr + ":" + strconv.Itoa(int(msg.L4SrcPort)) + h.Reset() + } + if e.doDst { + h.Write(e.salt) + h.Write([]byte(msg.DstAddr)) + msg.DstAddr = net.IP(h.Sum(nil)[0:16]).String() + msg.CustomStr["dst_endpoint"] = msg.DstAddr + ":" + strconv.Itoa(int(msg.L4DstPort)) + h.Reset() + } + } + + return msgs, nil +}