From 48b2129ae4a6e52976d38ca9a7313936fa267d2c Mon Sep 17 00:00:00 2001 From: shivamrawat101192 Date: Fri, 7 Jun 2024 13:31:20 +0530 Subject: [PATCH 1/4] http2 as well as grpc handling --- go.mod | 4 +- go.sum | 5 +- main.go | 281 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 284 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index b1db211..b5519a7 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,10 @@ module github.com/akto-api-security/mirroring-api-logging go 1.17 require ( - github.com/akto-api-security/gomiddleware v0.1.0 + github.com/akto-api-security/gomiddleware v0.1.3 github.com/google/gopacket v1.1.19 github.com/segmentio/kafka-go v0.4.25 + golang.org/x/net v0.0.0-20190620200207-3b0461eec859 ) require ( @@ -13,4 +14,5 @@ require ( github.com/klauspost/compress v1.9.8 // indirect github.com/pierrec/lz4 v2.6.0+incompatible // indirect golang.org/x/sys v0.0.0-20190412213103-97732733099d // indirect + golang.org/x/text v0.3.0 // indirect ) diff --git a/go.sum b/go.sum index ce45bce..0a97600 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/akto-api-security/gomiddleware v0.1.0 h1:7yf8j2yKVX1Ar5kBeIMjzBAuOBZj9BvTZJ8uEALmR8s= -github.com/akto-api-security/gomiddleware v0.1.0/go.mod h1:pCxZc7oWn6Wlv4S8ISJDr7F7mhir0M4IQS/Z5mC0vu8= +github.com/akto-api-security/gomiddleware v0.1.3 h1:rcsQK7uwu5Z56hTM4lQQZbqGSXXtb+1MznbljCfSpUA= +github.com/akto-api-security/gomiddleware v0.1.3/go.mod h1:pCxZc7oWn6Wlv4S8ISJDr7F7mhir0M4IQS/Z5mC0vu8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= @@ -23,7 +23,6 @@ github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDm github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/segmentio/kafka-go v0.4.23/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg= github.com/segmentio/kafka-go v0.4.25 h1:QVx9yz12syKBFkxR+dVDDwTO0ItHgnjjhIdBfqizj+8= github.com/segmentio/kafka-go v0.4.25/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/main.go b/main.go index 12b737d..aa0fff8 100644 --- a/main.go +++ b/main.go @@ -15,7 +15,9 @@ import ( "bytes" "compress/gzip" "context" + "encoding/base64" "encoding/json" + "errors" "fmt" "io" "io/ioutil" @@ -23,6 +25,7 @@ import ( "net/http" "os" "strconv" + "strings" "time" "github.com/google/gopacket" @@ -32,6 +35,9 @@ import ( "github.com/akto-api-security/gomiddleware" "github.com/segmentio/kafka-go" + + "golang.org/x/net/http2" + "golang.org/x/net/http2/hpack" ) var printCounter = 500 @@ -82,6 +88,16 @@ type myFactory struct { vxlanID int } +type http2ReqResp struct { + headersMap map[string]string + payload string + isInvalid bool +} + +func (k http2ReqResp) String() string { + return fmt.Sprintf("%v:%v", k.headersMap, k.payload) +} + // New handles creating a new tcpassembly.Stream. func (f *myFactory) New(netFlow, tcpFlow gopacket.Flow) tcpassembly.Stream { // Create a new stream. @@ -149,8 +165,232 @@ func (s *myStream) ReassemblyComplete() { s.bidi.maybeFinish() } -func tryReadFromBD(bd *bidi, isPending bool) { - reader := bufio.NewReader(bytes.NewReader(bd.a.bytes)) +func tryParseAsHttp2Request(bd *bidi, isPending bool) (bool, error) { + + isHttp2Req := false + if len(bd.a.bytes) > 24 && string(bd.a.bytes[0:24]) == "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" { + bd.a.bytes = bd.a.bytes[24:] + } + streamRequestMap := make(map[string][]http2ReqResp) + framer := http2.NewFramer(nil, bytes.NewReader(bd.a.bytes)) + + headersMap := make(map[string]string) + payload := "" + + gotHeaders := make(map[string]bool) + gotPayload := make(map[string]bool) + decoder := hpack.NewDecoder(4096, func(hf hpack.HeaderField) { + + // fmt.Printf("REQ Header: %s: %s\n", hf.Name, hf.Value) + if len(hf.Name) > 0 { + headersMap[hf.Name] = hf.Value + } + }) + + for { + + frame, err := framer.ReadFrame() + // fmt.Printf("Frame: %v\n", frame) + + if err == io.EOF { + break + } + if err != nil { + continue + } + + // Print frame details + // fmt.Printf("Frame reached here: %v\n", frame) + // fmt.Printf("Stream Id: %v\n", frame.Header().StreamID) + streamId := fmt.Sprint(frame.Header().StreamID) + if len(streamId) == 0 { + continue + } + + if !gotHeaders[streamId] { + headersMap = make(map[string]string) + } + + // fmt.Printf("Frame reached here: %v\n", frame.Header().StreamID) + // fmt.Printf("streamId working on %v\n", streamId) + switch f := frame.(type) { + case *http2.HeadersFrame: + _, err := decoder.Write(f.HeaderBlockFragment()) + gotHeaders[streamId] = true + if err != nil { + // log.Printf("Error request decoding headers: %v", err) + } + + case *http2.DataFrame: + // log.Println("Data: ", len(f.Data()), string(f.Data())) + if len(string(f.Data())) > 0 { + payload = base64.StdEncoding.EncodeToString(f.Data()) + gotPayload[streamId] = true + // fmt.Println("payload", payload) + } + } + + if gotHeaders[streamId] && gotPayload[streamId] { + if _, exists := streamRequestMap[streamId]; !exists { + streamRequestMap[streamId] = []http2ReqResp{} + } + streamRequestMap[streamId] = append(streamRequestMap[streamId], http2ReqResp{ + headersMap: headersMap, + payload: payload, + }) + gotHeaders[streamId] = false + gotPayload[streamId] = false + } + } + + // log.Println("Reached here for resp") + // log.Println("bd.b.bytes: ", len(bd.b.bytes), string(bd.b.bytes)) + + gotHeaders = make(map[string]bool) + gotPayload = make(map[string]bool) + gotGrpcHeaders := make(map[string]bool) + headersCount := make(map[string]int) + headersMap = make(map[string]string) + payload = "" + + streamResponseMap := make(map[string][]http2ReqResp) + framerResp := http2.NewFramer(nil, bytes.NewReader(bd.b.bytes)) + headersMap = make(map[string]string) + decoder = hpack.NewDecoder(4096, func(hf hpack.HeaderField) { + // fmt.Printf("RES Header: %s: %s\n", hf.Name, hf.Value) + if len(hf.Name) > 0 { + headersMap[hf.Name] = hf.Value + } + }) + + for { + frame, err := framerResp.ReadFrame() + // fmt.Printf("Frame: %v\n", frame) + if err == io.EOF { + break + } + if err != nil { + continue + } + + // Print frame details + streamId := fmt.Sprint(frame.Header().StreamID) + + if len(streamId) == 0 { + continue + } + if !(gotHeaders[streamId]) { + headersMap = make(map[string]string) + } + + switch f := frame.(type) { + case *http2.HeadersFrame: + // fmt.Println("headers map", headersMap) + _, err := decoder.Write(f.HeaderBlockFragment()) + if err != nil { + log.Printf("Error response decoding headers: %v", err) + } + if headersCount[streamId] == 0 { + if strings.Contains(headersMap["content-type"], "application/grpc") { + gotGrpcHeaders[streamId] = true + } + gotHeaders[streamId] = true + } + headersCount[streamId]++ + case *http2.DataFrame: + // log.Println("Data: ", len(f.Data()), string(f.Data())) + if len(string(f.Data())) > 0 { + payload = base64.StdEncoding.EncodeToString(f.Data()) + gotPayload[streamId] = true + // fmt.Println("payload", payload) + } + } + if gotHeaders[streamId] && gotPayload[streamId] { + + if gotGrpcHeaders[streamId] && headersCount[streamId] == 1 { + continue + } + + if _, exists := streamResponseMap[streamId]; !exists { + streamResponseMap[streamId] = []http2ReqResp{} + } + streamResponseMap[streamId] = append(streamResponseMap[streamId], http2ReqResp{ + headersMap: headersMap, + payload: payload, + }) + gotPayload[streamId] = false + gotHeaders[streamId] = false + gotGrpcHeaders[streamId] = false + headersCount[streamId] = 0 + } + } + + for streamId, http2Req := range streamRequestMap { + http2Resp := streamResponseMap[streamId] + if len(http2Resp) != len(http2Req) { + continue + } + for req := range http2Req { + + http2Request := http2Req[req] + http2Response := http2Resp[req] + + value := make(map[string]string) + + if path, exists := http2Request.headersMap[":path"]; exists { + value["path"] = path + delete(http2Request.headersMap, ":path") + } + if method, exists := http2Request.headersMap[":method"]; exists { + value["method"] = method + delete(http2Request.headersMap, ":method") + } + if scheme, exists := http2Request.headersMap[":scheme"]; exists { + value["scheme"] = scheme + delete(http2Request.headersMap, ":scheme") + } + if status, exists := http2Response.headersMap[":status"]; exists { + value["statusCode"] = status + delete(http2Response.headersMap, ":status") + } + value["requestPayload"] = http2Request.payload + value["responsePayload"] = http2Request.payload + + if len(http2Request.headersMap) > 0 { + requestHeaders, _ := json.Marshal(http2Request.headersMap) + value["requestHeaders"] = string(requestHeaders) + } + if len(http2Response.headersMap) > 0 { + responseHeader, _ := json.Marshal(http2Response.headersMap) + value["responseHeader"] = string(responseHeader) + } + + value["ip"] = bd.key.net.Src().String() + value["akto_account_id"] = fmt.Sprint(1000000) + value["akto_vxlan_id"] = fmt.Sprint(bd.vxlanID) + value["time"] = fmt.Sprint(time.Now().Unix()) + value["is_pending"] = fmt.Sprint(isPending) + out, _ := json.Marshal(value) + isHttp2Req = true + + if printCounter > 0 { + printCounter-- + log.Println("req-resp.String()", string(out)) + } + // go gomiddleware.Produce(kafkaWriter, ctx, string(out)) + } + + } + + if isHttp2Req { + return true, nil + } + return false, errors.New("not an http2 request") +} + +func tryParseAsNormalHttpRequest(bd *bidi, isPending bool) { + + reader := bufio.NewReader(bytes.NewReader(bd.b.bytes)) i := 0 requests := []http.Request{} requestsContent := []string{} @@ -262,6 +502,13 @@ func tryReadFromBD(bd *bidi, isPending bool) { } } +func tryReadFromBD(bd *bidi, isPending bool) { + _, err := tryParseAsHttp2Request(bd, isPending) + if err != nil { + tryParseAsNormalHttpRequest(bd, isPending) + } +} + // maybeFinish will wait until both directions are complete, then print out // stats. func (bd *bidi) maybeFinish() { @@ -281,6 +528,17 @@ func (bd *bidi) maybeFinish() { } } +// func flushAll() { +// for _, v := range assemblerMap { +// log.Println("TIME.SECOND:", time.Second) +// v.FlushOlderThan(time.Now().Add(time.Second * -500)) +// //log.Println("num flushed/closed:", r, k) +// //log.Println("streams before closing: ", len(factoryMap[k].bidiMap)) +// //factoryMap[k].collectOldStreams() +// //log.Println("streams after closing: ", len(factoryMap[k].bidiMap)) +// } +// } + func createAndGetAssembler(vxlanID int) *tcpassembly.Assembler { _assembler := assemblerMap[vxlanID] @@ -332,10 +590,29 @@ func run(handle *pcap.Handle, apiCollectionId int) { // Read in packets, pass to assembler. packetSource := gopacket.NewPacketSource(handle, handle.LinkType()) for packet := range packetSource.Packets() { + // pb, ok := packet.(gopacket.PacketBuilder) + // if !ok { + // panic("Not a PacketBuilder") + // } + // ipv4 := &layers.IPv4{} + // ipv4.DecodeFromBytes(packet.Data()[20:], pb) + // pb.AddLayer(ipv4) + // pb.SetNetworkLayer(ipv4) + // pb.NextDecoder(ipv4.NextLayerType()) + + // arr := packet1.Data() + // if len(arr) <= 20 { + // continue + // } + // + // packet := gopacket.NewPacket(arr[20:], layers.LayerTypeIPv4, gopacket.Default) + // innerPacket := packet vxlanID := apiCollectionId if apiCollectionId <= 0 { + // log.Println("packet.NetworkLayer().NetworkFlow().Des()", packet.NetworkLayer().NetworkFlow().Dst()) + // log.Println("packet.TransportLayer().LayerType()", packet.TransportLayer().LayerType()) if packet.NetworkLayer() == nil || packet.TransportLayer() == nil || packet.TransportLayer().LayerType() != layers.LayerTypeUDP { continue } From e3119f3e2d5a8b7d6f2efa424c733a3028da142f Mon Sep 17 00:00:00 2001 From: shivamrawat101192 Date: Fri, 7 Jun 2024 15:04:53 +0530 Subject: [PATCH 2/4] handling grpc only when "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" string found --- main.go | 79 ++++++--------------------------------------------------- 1 file changed, 8 insertions(+), 71 deletions(-) diff --git a/main.go b/main.go index aa0fff8..dd1487a 100644 --- a/main.go +++ b/main.go @@ -17,7 +17,6 @@ import ( "context" "encoding/base64" "encoding/json" - "errors" "fmt" "io" "io/ioutil" @@ -165,12 +164,8 @@ func (s *myStream) ReassemblyComplete() { s.bidi.maybeFinish() } -func tryParseAsHttp2Request(bd *bidi, isPending bool) (bool, error) { +func tryParseAsHttp2Request(bd *bidi, isPending bool) { - isHttp2Req := false - if len(bd.a.bytes) > 24 && string(bd.a.bytes[0:24]) == "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" { - bd.a.bytes = bd.a.bytes[24:] - } streamRequestMap := make(map[string][]http2ReqResp) framer := http2.NewFramer(nil, bytes.NewReader(bd.a.bytes)) @@ -181,7 +176,6 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) (bool, error) { gotPayload := make(map[string]bool) decoder := hpack.NewDecoder(4096, func(hf hpack.HeaderField) { - // fmt.Printf("REQ Header: %s: %s\n", hf.Name, hf.Value) if len(hf.Name) > 0 { headersMap[hf.Name] = hf.Value } @@ -190,7 +184,6 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) (bool, error) { for { frame, err := framer.ReadFrame() - // fmt.Printf("Frame: %v\n", frame) if err == io.EOF { break @@ -199,9 +192,6 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) (bool, error) { continue } - // Print frame details - // fmt.Printf("Frame reached here: %v\n", frame) - // fmt.Printf("Stream Id: %v\n", frame.Header().StreamID) streamId := fmt.Sprint(frame.Header().StreamID) if len(streamId) == 0 { continue @@ -211,22 +201,17 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) (bool, error) { headersMap = make(map[string]string) } - // fmt.Printf("Frame reached here: %v\n", frame.Header().StreamID) - // fmt.Printf("streamId working on %v\n", streamId) switch f := frame.(type) { case *http2.HeadersFrame: _, err := decoder.Write(f.HeaderBlockFragment()) gotHeaders[streamId] = true if err != nil { - // log.Printf("Error request decoding headers: %v", err) } case *http2.DataFrame: - // log.Println("Data: ", len(f.Data()), string(f.Data())) if len(string(f.Data())) > 0 { payload = base64.StdEncoding.EncodeToString(f.Data()) gotPayload[streamId] = true - // fmt.Println("payload", payload) } } @@ -243,9 +228,6 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) (bool, error) { } } - // log.Println("Reached here for resp") - // log.Println("bd.b.bytes: ", len(bd.b.bytes), string(bd.b.bytes)) - gotHeaders = make(map[string]bool) gotPayload = make(map[string]bool) gotGrpcHeaders := make(map[string]bool) @@ -257,7 +239,6 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) (bool, error) { framerResp := http2.NewFramer(nil, bytes.NewReader(bd.b.bytes)) headersMap = make(map[string]string) decoder = hpack.NewDecoder(4096, func(hf hpack.HeaderField) { - // fmt.Printf("RES Header: %s: %s\n", hf.Name, hf.Value) if len(hf.Name) > 0 { headersMap[hf.Name] = hf.Value } @@ -265,7 +246,6 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) (bool, error) { for { frame, err := framerResp.ReadFrame() - // fmt.Printf("Frame: %v\n", frame) if err == io.EOF { break } @@ -273,7 +253,6 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) (bool, error) { continue } - // Print frame details streamId := fmt.Sprint(frame.Header().StreamID) if len(streamId) == 0 { @@ -285,7 +264,6 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) (bool, error) { switch f := frame.(type) { case *http2.HeadersFrame: - // fmt.Println("headers map", headersMap) _, err := decoder.Write(f.HeaderBlockFragment()) if err != nil { log.Printf("Error response decoding headers: %v", err) @@ -298,11 +276,9 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) (bool, error) { } headersCount[streamId]++ case *http2.DataFrame: - // log.Println("Data: ", len(f.Data()), string(f.Data())) if len(string(f.Data())) > 0 { payload = base64.StdEncoding.EncodeToString(f.Data()) gotPayload[streamId] = true - // fmt.Println("payload", payload) } } if gotHeaders[streamId] && gotPayload[streamId] { @@ -371,7 +347,6 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) (bool, error) { value["time"] = fmt.Sprint(time.Now().Unix()) value["is_pending"] = fmt.Sprint(isPending) out, _ := json.Marshal(value) - isHttp2Req = true if printCounter > 0 { printCounter-- @@ -381,16 +356,16 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) (bool, error) { } } - - if isHttp2Req { - return true, nil - } - return false, errors.New("not an http2 request") } -func tryParseAsNormalHttpRequest(bd *bidi, isPending bool) { +func tryReadFromBD(bd *bidi, isPending bool) { + if len(bd.a.bytes) > 24 && string(bd.a.bytes[0:24]) == "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" { + bd.a.bytes = bd.a.bytes[24:] + tryParseAsHttp2Request(bd, isPending) + return + } - reader := bufio.NewReader(bytes.NewReader(bd.b.bytes)) + reader := bufio.NewReader(bytes.NewReader(bd.a.bytes)) i := 0 requests := []http.Request{} requestsContent := []string{} @@ -412,7 +387,6 @@ func tryParseAsNormalHttpRequest(bd *bidi, isPending bool) { requests = append(requests, *req) requestsContent = append(requestsContent, string(body)) - // log.Println("req.URL.String()", i, req.URL.String(), string(body), len(bd.a.bytes)) i++ } @@ -502,13 +476,6 @@ func tryParseAsNormalHttpRequest(bd *bidi, isPending bool) { } } -func tryReadFromBD(bd *bidi, isPending bool) { - _, err := tryParseAsHttp2Request(bd, isPending) - if err != nil { - tryParseAsNormalHttpRequest(bd, isPending) - } -} - // maybeFinish will wait until both directions are complete, then print out // stats. func (bd *bidi) maybeFinish() { @@ -528,17 +495,6 @@ func (bd *bidi) maybeFinish() { } } -// func flushAll() { -// for _, v := range assemblerMap { -// log.Println("TIME.SECOND:", time.Second) -// v.FlushOlderThan(time.Now().Add(time.Second * -500)) -// //log.Println("num flushed/closed:", r, k) -// //log.Println("streams before closing: ", len(factoryMap[k].bidiMap)) -// //factoryMap[k].collectOldStreams() -// //log.Println("streams after closing: ", len(factoryMap[k].bidiMap)) -// } -// } - func createAndGetAssembler(vxlanID int) *tcpassembly.Assembler { _assembler := assemblerMap[vxlanID] @@ -590,29 +546,10 @@ func run(handle *pcap.Handle, apiCollectionId int) { // Read in packets, pass to assembler. packetSource := gopacket.NewPacketSource(handle, handle.LinkType()) for packet := range packetSource.Packets() { - // pb, ok := packet.(gopacket.PacketBuilder) - // if !ok { - // panic("Not a PacketBuilder") - // } - // ipv4 := &layers.IPv4{} - // ipv4.DecodeFromBytes(packet.Data()[20:], pb) - // pb.AddLayer(ipv4) - // pb.SetNetworkLayer(ipv4) - // pb.NextDecoder(ipv4.NextLayerType()) - - // arr := packet1.Data() - // if len(arr) <= 20 { - // continue - // } - // - // packet := gopacket.NewPacket(arr[20:], layers.LayerTypeIPv4, gopacket.Default) - // innerPacket := packet vxlanID := apiCollectionId if apiCollectionId <= 0 { - // log.Println("packet.NetworkLayer().NetworkFlow().Des()", packet.NetworkLayer().NetworkFlow().Dst()) - // log.Println("packet.TransportLayer().LayerType()", packet.TransportLayer().LayerType()) if packet.NetworkLayer() == nil || packet.TransportLayer() == nil || packet.TransportLayer().LayerType() != layers.LayerTypeUDP { continue } From cfb34b30c81cd19f5a1ee36246bbf1e67eb7ed73 Mon Sep 17 00:00:00 2001 From: shivamrawat101192 Date: Fri, 7 Jun 2024 15:19:00 +0530 Subject: [PATCH 3/4] enabling push to kafka for http2 --- main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index dd1487a..79830ee 100644 --- a/main.go +++ b/main.go @@ -347,12 +347,13 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) { value["time"] = fmt.Sprint(time.Now().Unix()) value["is_pending"] = fmt.Sprint(isPending) out, _ := json.Marshal(value) + ctx := context.Background() if printCounter > 0 { printCounter-- log.Println("req-resp.String()", string(out)) } - // go gomiddleware.Produce(kafkaWriter, ctx, string(out)) + go gomiddleware.Produce(kafkaWriter, ctx, string(out)) } } From 36b45c32a1bc299a31578a94f90e4e62c7b65b00 Mon Sep 17 00:00:00 2001 From: shivamrawat101192 Date: Sat, 8 Jun 2024 00:16:41 +0530 Subject: [PATCH 4/4] bug fixes and handling the case when "path" is empty --- main.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 79830ee..3de88b9 100644 --- a/main.go +++ b/main.go @@ -316,6 +316,8 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) { if path, exists := http2Request.headersMap[":path"]; exists { value["path"] = path delete(http2Request.headersMap, ":path") + } else { + continue } if method, exists := http2Request.headersMap[":method"]; exists { value["method"] = method @@ -327,10 +329,11 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) { } if status, exists := http2Response.headersMap[":status"]; exists { value["statusCode"] = status + value["status"] = status delete(http2Response.headersMap, ":status") } value["requestPayload"] = http2Request.payload - value["responsePayload"] = http2Request.payload + value["responsePayload"] = http2Response.payload if len(http2Request.headersMap) > 0 { requestHeaders, _ := json.Marshal(http2Request.headersMap) @@ -338,9 +341,10 @@ func tryParseAsHttp2Request(bd *bidi, isPending bool) { } if len(http2Response.headersMap) > 0 { responseHeader, _ := json.Marshal(http2Response.headersMap) - value["responseHeader"] = string(responseHeader) + value["responseHeaders"] = string(responseHeader) } + value["type"] = "HTTP/2" value["ip"] = bd.key.net.Src().String() value["akto_account_id"] = fmt.Sprint(1000000) value["akto_vxlan_id"] = fmt.Sprint(bd.vxlanID)