diff --git a/go.mod b/go.mod index b1db211..b5519a7 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,10 @@ module github.com/akto-api-security/mirroring-api-logging go 1.17 require ( - github.com/akto-api-security/gomiddleware v0.1.0 + github.com/akto-api-security/gomiddleware v0.1.3 github.com/google/gopacket v1.1.19 github.com/segmentio/kafka-go v0.4.25 + golang.org/x/net v0.0.0-20190620200207-3b0461eec859 ) require ( @@ -13,4 +14,5 @@ require ( github.com/klauspost/compress v1.9.8 // indirect github.com/pierrec/lz4 v2.6.0+incompatible // indirect golang.org/x/sys v0.0.0-20190412213103-97732733099d // indirect + golang.org/x/text v0.3.0 // indirect ) diff --git a/go.sum b/go.sum index ce45bce..0a97600 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/akto-api-security/gomiddleware v0.1.0 h1:7yf8j2yKVX1Ar5kBeIMjzBAuOBZj9BvTZJ8uEALmR8s= -github.com/akto-api-security/gomiddleware v0.1.0/go.mod h1:pCxZc7oWn6Wlv4S8ISJDr7F7mhir0M4IQS/Z5mC0vu8= +github.com/akto-api-security/gomiddleware v0.1.3 h1:rcsQK7uwu5Z56hTM4lQQZbqGSXXtb+1MznbljCfSpUA= +github.com/akto-api-security/gomiddleware v0.1.3/go.mod h1:pCxZc7oWn6Wlv4S8ISJDr7F7mhir0M4IQS/Z5mC0vu8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= @@ -23,7 +23,6 @@ github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDm github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/segmentio/kafka-go v0.4.23/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg= github.com/segmentio/kafka-go v0.4.25 h1:QVx9yz12syKBFkxR+dVDDwTO0ItHgnjjhIdBfqizj+8= github.com/segmentio/kafka-go v0.4.25/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/main.go b/main.go index 12b737d..3de88b9 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,7 @@ import ( "bytes" "compress/gzip" "context" + "encoding/base64" "encoding/json" "fmt" "io" @@ -23,6 +24,7 @@ import ( "net/http" "os" "strconv" + "strings" "time" "github.com/google/gopacket" @@ -32,6 +34,9 @@ import ( "github.com/akto-api-security/gomiddleware" "github.com/segmentio/kafka-go" + + "golang.org/x/net/http2" + "golang.org/x/net/http2/hpack" ) var printCounter = 500 @@ -82,6 +87,16 @@ type myFactory struct { vxlanID int } +type http2ReqResp struct { + headersMap map[string]string + payload string + isInvalid bool +} + +func (k http2ReqResp) String() string { + return fmt.Sprintf("%v:%v", k.headersMap, k.payload) +} + // New handles creating a new tcpassembly.Stream. func (f *myFactory) New(netFlow, tcpFlow gopacket.Flow) tcpassembly.Stream { // Create a new stream. @@ -149,7 +164,212 @@ func (s *myStream) ReassemblyComplete() { s.bidi.maybeFinish() } +func tryParseAsHttp2Request(bd *bidi, isPending bool) { + + streamRequestMap := make(map[string][]http2ReqResp) + framer := http2.NewFramer(nil, bytes.NewReader(bd.a.bytes)) + + headersMap := make(map[string]string) + payload := "" + + gotHeaders := make(map[string]bool) + gotPayload := make(map[string]bool) + decoder := hpack.NewDecoder(4096, func(hf hpack.HeaderField) { + + if len(hf.Name) > 0 { + headersMap[hf.Name] = hf.Value + } + }) + + for { + + frame, err := framer.ReadFrame() + + if err == io.EOF { + break + } + if err != nil { + continue + } + + streamId := fmt.Sprint(frame.Header().StreamID) + if len(streamId) == 0 { + continue + } + + if !gotHeaders[streamId] { + headersMap = make(map[string]string) + } + + switch f := frame.(type) { + case *http2.HeadersFrame: + _, err := decoder.Write(f.HeaderBlockFragment()) + gotHeaders[streamId] = true + if err != nil { + } + + case *http2.DataFrame: + if len(string(f.Data())) > 0 { + payload = base64.StdEncoding.EncodeToString(f.Data()) + gotPayload[streamId] = true + } + } + + if gotHeaders[streamId] && gotPayload[streamId] { + if _, exists := streamRequestMap[streamId]; !exists { + streamRequestMap[streamId] = []http2ReqResp{} + } + streamRequestMap[streamId] = append(streamRequestMap[streamId], http2ReqResp{ + headersMap: headersMap, + payload: payload, + }) + gotHeaders[streamId] = false + gotPayload[streamId] = false + } + } + + gotHeaders = make(map[string]bool) + gotPayload = make(map[string]bool) + gotGrpcHeaders := make(map[string]bool) + headersCount := make(map[string]int) + headersMap = make(map[string]string) + payload = "" + + streamResponseMap := make(map[string][]http2ReqResp) + framerResp := http2.NewFramer(nil, bytes.NewReader(bd.b.bytes)) + headersMap = make(map[string]string) + decoder = hpack.NewDecoder(4096, func(hf hpack.HeaderField) { + if len(hf.Name) > 0 { + headersMap[hf.Name] = hf.Value + } + }) + + for { + frame, err := framerResp.ReadFrame() + if err == io.EOF { + break + } + if err != nil { + continue + } + + streamId := fmt.Sprint(frame.Header().StreamID) + + if len(streamId) == 0 { + continue + } + if !(gotHeaders[streamId]) { + headersMap = make(map[string]string) + } + + switch f := frame.(type) { + case *http2.HeadersFrame: + _, err := decoder.Write(f.HeaderBlockFragment()) + if err != nil { + log.Printf("Error response decoding headers: %v", err) + } + if headersCount[streamId] == 0 { + if strings.Contains(headersMap["content-type"], "application/grpc") { + gotGrpcHeaders[streamId] = true + } + gotHeaders[streamId] = true + } + headersCount[streamId]++ + case *http2.DataFrame: + if len(string(f.Data())) > 0 { + payload = base64.StdEncoding.EncodeToString(f.Data()) + gotPayload[streamId] = true + } + } + if gotHeaders[streamId] && gotPayload[streamId] { + + if gotGrpcHeaders[streamId] && headersCount[streamId] == 1 { + continue + } + + if _, exists := streamResponseMap[streamId]; !exists { + streamResponseMap[streamId] = []http2ReqResp{} + } + streamResponseMap[streamId] = append(streamResponseMap[streamId], http2ReqResp{ + headersMap: headersMap, + payload: payload, + }) + gotPayload[streamId] = false + gotHeaders[streamId] = false + gotGrpcHeaders[streamId] = false + headersCount[streamId] = 0 + } + } + + for streamId, http2Req := range streamRequestMap { + http2Resp := streamResponseMap[streamId] + if len(http2Resp) != len(http2Req) { + continue + } + for req := range http2Req { + + http2Request := http2Req[req] + http2Response := http2Resp[req] + + value := make(map[string]string) + + if path, exists := http2Request.headersMap[":path"]; exists { + value["path"] = path + delete(http2Request.headersMap, ":path") + } else { + continue + } + if method, exists := http2Request.headersMap[":method"]; exists { + value["method"] = method + delete(http2Request.headersMap, ":method") + } + if scheme, exists := http2Request.headersMap[":scheme"]; exists { + value["scheme"] = scheme + delete(http2Request.headersMap, ":scheme") + } + if status, exists := http2Response.headersMap[":status"]; exists { + value["statusCode"] = status + value["status"] = status + delete(http2Response.headersMap, ":status") + } + value["requestPayload"] = http2Request.payload + value["responsePayload"] = http2Response.payload + + if len(http2Request.headersMap) > 0 { + requestHeaders, _ := json.Marshal(http2Request.headersMap) + value["requestHeaders"] = string(requestHeaders) + } + if len(http2Response.headersMap) > 0 { + responseHeader, _ := json.Marshal(http2Response.headersMap) + value["responseHeaders"] = string(responseHeader) + } + + value["type"] = "HTTP/2" + value["ip"] = bd.key.net.Src().String() + value["akto_account_id"] = fmt.Sprint(1000000) + value["akto_vxlan_id"] = fmt.Sprint(bd.vxlanID) + value["time"] = fmt.Sprint(time.Now().Unix()) + value["is_pending"] = fmt.Sprint(isPending) + out, _ := json.Marshal(value) + ctx := context.Background() + + if printCounter > 0 { + printCounter-- + log.Println("req-resp.String()", string(out)) + } + go gomiddleware.Produce(kafkaWriter, ctx, string(out)) + } + + } +} + func tryReadFromBD(bd *bidi, isPending bool) { + if len(bd.a.bytes) > 24 && string(bd.a.bytes[0:24]) == "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" { + bd.a.bytes = bd.a.bytes[24:] + tryParseAsHttp2Request(bd, isPending) + return + } + reader := bufio.NewReader(bytes.NewReader(bd.a.bytes)) i := 0 requests := []http.Request{} @@ -172,7 +392,6 @@ func tryReadFromBD(bd *bidi, isPending bool) { requests = append(requests, *req) requestsContent = append(requestsContent, string(body)) - // log.Println("req.URL.String()", i, req.URL.String(), string(body), len(bd.a.bytes)) i++ }