Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http2 and grpc handling #46

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ module github.com/akto-api-security/mirroring-api-logging
go 1.17

require (
github.com/akto-api-security/gomiddleware v0.1.0
github.com/akto-api-security/gomiddleware v0.1.3
github.com/google/gopacket v1.1.19
github.com/segmentio/kafka-go v0.4.25
golang.org/x/net v0.0.0-20190620200207-3b0461eec859
)

require (
github.com/golang/snappy v0.0.1 // indirect
github.com/klauspost/compress v1.9.8 // indirect
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
golang.org/x/sys v0.0.0-20190412213103-97732733099d // indirect
golang.org/x/text v0.3.0 // indirect
)
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/akto-api-security/gomiddleware v0.1.0 h1:7yf8j2yKVX1Ar5kBeIMjzBAuOBZj9BvTZJ8uEALmR8s=
github.com/akto-api-security/gomiddleware v0.1.0/go.mod h1:pCxZc7oWn6Wlv4S8ISJDr7F7mhir0M4IQS/Z5mC0vu8=
github.com/akto-api-security/gomiddleware v0.1.3 h1:rcsQK7uwu5Z56hTM4lQQZbqGSXXtb+1MznbljCfSpUA=
github.com/akto-api-security/gomiddleware v0.1.3/go.mod h1:pCxZc7oWn6Wlv4S8ISJDr7F7mhir0M4IQS/Z5mC0vu8=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
Expand All @@ -23,7 +23,6 @@ github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDm
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/segmentio/kafka-go v0.4.23/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
github.com/segmentio/kafka-go v0.4.25 h1:QVx9yz12syKBFkxR+dVDDwTO0ItHgnjjhIdBfqizj+8=
github.com/segmentio/kafka-go v0.4.25/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
221 changes: 220 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"bytes"
"compress/gzip"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
Expand All @@ -23,6 +24,7 @@ import (
"net/http"
"os"
"strconv"
"strings"
"time"

"github.com/google/gopacket"
Expand All @@ -32,6 +34,9 @@ import (

"github.com/akto-api-security/gomiddleware"
"github.com/segmentio/kafka-go"

"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
)

var printCounter = 500
Expand Down Expand Up @@ -82,6 +87,16 @@ type myFactory struct {
vxlanID int
}

type http2ReqResp struct {
headersMap map[string]string
payload string
isInvalid bool
}

func (k http2ReqResp) String() string {
return fmt.Sprintf("%v:%v", k.headersMap, k.payload)
}

// New handles creating a new tcpassembly.Stream.
func (f *myFactory) New(netFlow, tcpFlow gopacket.Flow) tcpassembly.Stream {
// Create a new stream.
Expand Down Expand Up @@ -149,7 +164,212 @@ func (s *myStream) ReassemblyComplete() {
s.bidi.maybeFinish()
}

func tryParseAsHttp2Request(bd *bidi, isPending bool) {

streamRequestMap := make(map[string][]http2ReqResp)
framer := http2.NewFramer(nil, bytes.NewReader(bd.a.bytes))

headersMap := make(map[string]string)
payload := ""

gotHeaders := make(map[string]bool)
gotPayload := make(map[string]bool)
decoder := hpack.NewDecoder(4096, func(hf hpack.HeaderField) {

if len(hf.Name) > 0 {
headersMap[hf.Name] = hf.Value
}
})

for {

frame, err := framer.ReadFrame()

if err == io.EOF {
break
}
if err != nil {
continue
}

streamId := fmt.Sprint(frame.Header().StreamID)
if len(streamId) == 0 {
continue
}

if !gotHeaders[streamId] {
headersMap = make(map[string]string)
}

switch f := frame.(type) {
case *http2.HeadersFrame:
_, err := decoder.Write(f.HeaderBlockFragment())
gotHeaders[streamId] = true
if err != nil {
}

case *http2.DataFrame:
if len(string(f.Data())) > 0 {
payload = base64.StdEncoding.EncodeToString(f.Data())
gotPayload[streamId] = true
}
}

if gotHeaders[streamId] && gotPayload[streamId] {
if _, exists := streamRequestMap[streamId]; !exists {
streamRequestMap[streamId] = []http2ReqResp{}
}
streamRequestMap[streamId] = append(streamRequestMap[streamId], http2ReqResp{
headersMap: headersMap,
payload: payload,
})
gotHeaders[streamId] = false
gotPayload[streamId] = false
}
}

gotHeaders = make(map[string]bool)
gotPayload = make(map[string]bool)
gotGrpcHeaders := make(map[string]bool)
headersCount := make(map[string]int)
headersMap = make(map[string]string)
payload = ""

streamResponseMap := make(map[string][]http2ReqResp)
framerResp := http2.NewFramer(nil, bytes.NewReader(bd.b.bytes))
headersMap = make(map[string]string)
decoder = hpack.NewDecoder(4096, func(hf hpack.HeaderField) {
if len(hf.Name) > 0 {
headersMap[hf.Name] = hf.Value
}
})

for {
frame, err := framerResp.ReadFrame()
if err == io.EOF {
break
}
if err != nil {
continue
}

streamId := fmt.Sprint(frame.Header().StreamID)

if len(streamId) == 0 {
continue
}
if !(gotHeaders[streamId]) {
headersMap = make(map[string]string)
}

switch f := frame.(type) {
case *http2.HeadersFrame:
_, err := decoder.Write(f.HeaderBlockFragment())
if err != nil {
log.Printf("Error response decoding headers: %v", err)
}
if headersCount[streamId] == 0 {
if strings.Contains(headersMap["content-type"], "application/grpc") {
gotGrpcHeaders[streamId] = true
}
gotHeaders[streamId] = true
}
headersCount[streamId]++
case *http2.DataFrame:
if len(string(f.Data())) > 0 {
payload = base64.StdEncoding.EncodeToString(f.Data())
gotPayload[streamId] = true
}
}
if gotHeaders[streamId] && gotPayload[streamId] {

if gotGrpcHeaders[streamId] && headersCount[streamId] == 1 {
continue
}

if _, exists := streamResponseMap[streamId]; !exists {
streamResponseMap[streamId] = []http2ReqResp{}
}
streamResponseMap[streamId] = append(streamResponseMap[streamId], http2ReqResp{
headersMap: headersMap,
payload: payload,
})
gotPayload[streamId] = false
gotHeaders[streamId] = false
gotGrpcHeaders[streamId] = false
headersCount[streamId] = 0
}
}

for streamId, http2Req := range streamRequestMap {
http2Resp := streamResponseMap[streamId]
if len(http2Resp) != len(http2Req) {
continue
}
for req := range http2Req {

http2Request := http2Req[req]
http2Response := http2Resp[req]

value := make(map[string]string)

if path, exists := http2Request.headersMap[":path"]; exists {
value["path"] = path
delete(http2Request.headersMap, ":path")
} else {
continue
}
if method, exists := http2Request.headersMap[":method"]; exists {
value["method"] = method
delete(http2Request.headersMap, ":method")
}
if scheme, exists := http2Request.headersMap[":scheme"]; exists {
value["scheme"] = scheme
delete(http2Request.headersMap, ":scheme")
}
if status, exists := http2Response.headersMap[":status"]; exists {
value["statusCode"] = status
value["status"] = status
delete(http2Response.headersMap, ":status")
}
value["requestPayload"] = http2Request.payload
value["responsePayload"] = http2Response.payload

if len(http2Request.headersMap) > 0 {
requestHeaders, _ := json.Marshal(http2Request.headersMap)
value["requestHeaders"] = string(requestHeaders)
}
if len(http2Response.headersMap) > 0 {
responseHeader, _ := json.Marshal(http2Response.headersMap)
value["responseHeaders"] = string(responseHeader)
}

value["type"] = "HTTP/2"
value["ip"] = bd.key.net.Src().String()
value["akto_account_id"] = fmt.Sprint(1000000)
value["akto_vxlan_id"] = fmt.Sprint(bd.vxlanID)
value["time"] = fmt.Sprint(time.Now().Unix())
value["is_pending"] = fmt.Sprint(isPending)
out, _ := json.Marshal(value)
ctx := context.Background()

if printCounter > 0 {
printCounter--
log.Println("req-resp.String()", string(out))
}
go gomiddleware.Produce(kafkaWriter, ctx, string(out))
}

}
}

func tryReadFromBD(bd *bidi, isPending bool) {
if len(bd.a.bytes) > 24 && string(bd.a.bytes[0:24]) == "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" {
bd.a.bytes = bd.a.bytes[24:]
tryParseAsHttp2Request(bd, isPending)
return
}

reader := bufio.NewReader(bytes.NewReader(bd.a.bytes))
i := 0
requests := []http.Request{}
Expand All @@ -172,7 +392,6 @@ func tryReadFromBD(bd *bidi, isPending bool) {

requests = append(requests, *req)
requestsContent = append(requestsContent, string(body))
// log.Println("req.URL.String()", i, req.URL.String(), string(body), len(bd.a.bytes))
i++
}

Expand Down
Loading