diff --git a/cloudbuild.yaml b/cloudbuild.yaml index f25d9384a..aedbf73bd 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -31,6 +31,7 @@ options: env: - PROJECT_ID=$PROJECT_ID - GIT_COMMIT=$COMMIT_SHA + machineType: 'N1_HIGHCPU_8' steps: # Make all git tags available. @@ -80,6 +81,7 @@ steps: "-t", "gcr.io/$PROJECT_ID/etl:$_DOCKER_TAG", "-f", "cmd/etl_worker/Dockerfile.k8s", "." ] + waitFor: ['Unshallow git clone'] - name: gcr.io/cloud-builders/docker id: "Push the docker container to gcr.io" diff --git a/headers/ether.go b/headers/ether.go new file mode 100644 index 000000000..6ee10688a --- /dev/null +++ b/headers/ether.go @@ -0,0 +1,31 @@ +package headers + +import ( + "fmt" + "unsafe" + + "github.com/google/gopacket/layers" + be "github.com/m-lab/etl/internal/bigendian" +) + +/******************************************************************************* + Ethernet Header handling +*******************************************************************************/ + +var ( + ErrUnknownEtherType = fmt.Errorf("unknown Ethernet type") + ErrTruncatedEthernetHeader = fmt.Errorf("truncated Ethernet header") +) + +// EthernetHeader struct for the Ethernet Header, in wire format. +type EthernetHeader struct { + SrcMAC, DstMAC [6]byte + etherType be.BE16 // BigEndian +} + +// EtherType returns the EtherType field of the packet. +func (e *EthernetHeader) EtherType() layers.EthernetType { + return layers.EthernetType(e.etherType.Uint16()) +} + +var EthernetHeaderSize = int(unsafe.Sizeof(EthernetHeader{})) diff --git a/headers/ip.go b/headers/ip.go new file mode 100644 index 000000000..a032dc726 --- /dev/null +++ b/headers/ip.go @@ -0,0 +1,327 @@ +// Package headers contains code to efficiently decode packet headers +// from a PCAP data stream. +package headers + +import ( + "fmt" + "log" + "net" + "os" + "time" + "unsafe" + + "github.com/google/gopacket/layers" + + "github.com/m-lab/go/logx" + + be "github.com/m-lab/etl/internal/bigendian" + nano "github.com/m-lab/etl/internal/nano" + "github.com/m-lab/etl/metrics" +) + +var ( + sparseLogger = log.New(os.Stdout, "sparse: ", log.LstdFlags|log.Lshortfile) + sparse1 = logx.NewLogEvery(sparseLogger, 1000*time.Millisecond) + + ErrTruncatedPcap = fmt.Errorf("truncated PCAP file") + + ErrNoIPLayer = fmt.Errorf("no IP layer") + ErrTruncatedIPHeader = fmt.Errorf("truncated IP header") +) + +/****************************************************************************** + * IP Header handling +******************************************************************************/ + +// IP provides the common interface for IPv4 and IPv6 packet headers. +type IP interface { + Version() uint8 + PayloadLength() int + SrcIP(net.IP) net.IP + DstIP(net.IP) net.IP + NextProtocol() layers.IPProtocol + HopLimit() uint8 + HeaderLength() int +} + +//============================================================================= + +// IPv4Header struct for IPv4 header, in wire format +type IPv4Header struct { + versionIHL uint8 // Version (4 bits) + Internet header length (4 bits) + typeOfService uint8 // Type of service + length be.BE16 // Total length + id be.BE16 // Identification + flagsFragOff be.BE16 // Flags (3 bits) + Fragment offset (13 bits) + hopLimit uint8 // Time to live + protocol layers.IPProtocol // Protocol of next following bytes, after the options + checksum be.BE16 // Header checksum + srcIP be.BE32 // Source address + dstIP be.BE32 // Destination address +} + +var IPv4HeaderSize = int(unsafe.Sizeof(IPv4Header{})) + +func (h *IPv4Header) Version() uint8 { + return (h.versionIHL >> 4) +} + +func (h *IPv4Header) PayloadLength() int { + ihl := h.versionIHL & 0x0f + return int(h.length.Uint16()) - int(ihl*4) +} + +// Overwrite the destination IP with the source IP, allocating if needed. +func replace(dst net.IP, src ...byte) net.IP { + if dst != nil { + dst = dst[:0] + } + return append(dst, src...) +} + +// SrcIP returns the source IP address of the packet. +// It uses the provided backing parameter to avoid allocations. +func (h *IPv4Header) SrcIP(backing net.IP) net.IP { + return replace(backing, h.srcIP[:]...) +} + +// DstIP returns the destination IP address of the packet. +// It uses the provided backing parameter to avoid allocations. +func (h *IPv4Header) DstIP(backing net.IP) net.IP { + return replace(backing, h.dstIP[:]...) +} + +// NextProtocol returns the next protocol in the stack. +func (h *IPv4Header) NextProtocol() layers.IPProtocol { + return h.protocol +} + +// HopLimit returns the (remaining?) TTL of the packet. +func (h *IPv4Header) HopLimit() uint8 { + return h.hopLimit +} + +// HeaderLength returns the length of the header in bytes, +// (including extensions for ipv6). +func (h *IPv4Header) HeaderLength() int { + return int(h.versionIHL&0x0f) << 2 +} + +// ExtensionHeader is used to parse IPv6 extension headers. +type ExtensionHeader struct { + NextHeader layers.IPProtocol + HeaderLength uint8 + OptionsAndPadding [6]byte +} + +type EHWrapper struct { + HeaderType layers.IPProtocol // Type of THIS header, not the next header. + eh *ExtensionHeader + data []byte // All the options and padding, including the first 6 bytes. +} + +// IPv6Header struct for IPv6 header +type IPv6Header struct { + versionTrafficClassFlowLabel be.BE32 // Version (4 bits) + Traffic class (8 bits) + Flow label (20 bits) + payloadLength be.BE16 // Original payload length, NOT the payload size of the captured packet. + nextHeader layers.IPProtocol // Protocol of next layer/header + hopLimit uint8 // Hop limit + srcIP [16]byte + dstIP [16]byte +} + +var IPv6HeaderSize = int(unsafe.Sizeof(IPv6Header{})) + +func OverlayIPv6Header(data []byte) (*IPv6Header, []byte, error) { + if len(data) < int(unsafe.Sizeof(IPv6Header{})) { + return nil, nil, ErrTruncatedIPHeader + } + h := (*IPv6Header)(unsafe.Pointer(&data[0])) + if h.Version() != 6 { + return nil, nil, fmt.Errorf("IPv6 packet with version %d", h.Version()) + } + return h, data[IPv6HeaderSize:], nil +} + +// Overlay reuses this object, using the provided wire data. +// The wire data is NOT copied, but is used to back the object fields. +func (w *IPv6Wrapper) Overlay(wire []byte) (payload []byte, err error) { + w.IPv6Header, _, err = OverlayIPv6Header(wire) + if err != nil { + return nil, err + } + err = w.handleExtensionHeaders(wire) + if err != nil { + return nil, err + } + if len(wire) < w.headerLength { + return nil, ErrTruncatedIPHeader + } + return wire[w.headerLength:], err +} + +func (h *IPv6Header) Version() uint8 { + return (h.versionTrafficClassFlowLabel[0] >> 4) +} + +func (h *IPv6Header) PayloadLength() int { + return int(h.payloadLength.Uint16()) +} + +func (h *IPv6Header) SrcIP(backing net.IP) net.IP { + return replace(backing, h.srcIP[:]...) +} + +// DstIP returns the destination IP address of the packet. +func (h *IPv6Header) DstIP(backing net.IP) net.IP { + return replace(backing, h.dstIP[:]...) +} + +func (h *IPv6Header) HopLimit() uint8 { + return h.hopLimit +} + +// TODO - This may not be what we want. +func (h *IPv6Header) NextProtocol() layers.IPProtocol { + return h.nextHeader +} + +func (h *IPv6Header) HeaderLength() int { + // BUG - this is WRONG + return IPv4HeaderSize +} + +type IPv6Wrapper struct { + *IPv6Header + ext []EHWrapper + headerLength int +} + +func (w *IPv6Wrapper) HeaderLength() int { + return w.headerLength +} + +// handleExtensionHeaders reuses the IPv6 header, overlaying it on provided wire data. +// It does not copy or disturb the underlying data. +func (w *IPv6Wrapper) handleExtensionHeaders(rawWire []byte) error { + if w == nil { + return fmt.Errorf("nil IPv6Wrapper") + } + if w.ext != nil { + w.ext = make([]EHWrapper, 0) + } + w.ext = w.ext[:0] + + if w.nextHeader == layers.IPProtocolNoNextHeader { + return nil + } + + np := w.NextProtocol() + for { + switch np { + case layers.IPProtocolNoNextHeader: + return nil + case layers.IPProtocolIPv6HopByHop: + case layers.IPProtocolTCP: + return nil + default: + metrics.WarningCount.WithLabelValues("pcap", "ipv6", "unsupported_extension_type").Inc() + sparse1.Println("Other IPv6 extension type", np) + } + + if len(rawWire) < 8 { + metrics.ErrorCount.WithLabelValues("pcap", "ipv6", "truncated_extension").Inc() + return ErrTruncatedIPHeader + } + + eh := (*ExtensionHeader)(unsafe.Pointer(&rawWire[0])) + if len(rawWire) < int(8+eh.HeaderLength) { + metrics.ErrorCount.WithLabelValues("pcap", "ipv6", "truncated_extension").Inc() + return ErrTruncatedIPHeader + } + w.ext = append(w.ext, EHWrapper{ + HeaderType: np, + eh: eh, + data: rawWire[2 : 8+eh.HeaderLength], + }) + w.headerLength += int(eh.HeaderLength) + 8 + rawWire = rawWire[8+eh.HeaderLength:] + np = eh.NextHeader + } +} + +// Packet struct contains the packet data and metadata. +// Since it is intended primary to access IP and TCP, those interfaces +// are exposes as embedded fields. +type Packet struct { + PktTime nano.UnixNano + eth *EthernetHeader // Pointer to the Ethernet header, if available. + IP // Access to the IP header, if available. + v4 *IPv4Header // DO NOT USE. Use ip field instead. + v6 *IPv6Wrapper // DO NOT USE. Use ip field instead. + + sharedBacking []byte // The raw packet data, including header. NOT a copy! +} + +// RawForTest provides access to the raw packet data for testing. +func (p *Packet) RawForTest() []byte { + return p.sharedBacking +} + +// Overlay updates THIS Packet object to overlay the underlying packet data, +// passed in wire format. It avoids copying and allocation as much as possible. +func (p *Packet) Overlay(pTime nano.UnixNano, wire []byte) (err error) { + + if len(wire) < EthernetHeaderSize { + metrics.ErrorCount.WithLabelValues("pcap", "ethernet", "truncated_header").Inc() + err = ErrTruncatedEthernetHeader + return + } + p.sharedBacking = wire + p.PktTime = pTime + p.eth = (*EthernetHeader)(unsafe.Pointer(&wire[0])) + + switch p.eth.EtherType() { + case layers.EthernetTypeIPv4: + if len(wire) < EthernetHeaderSize+IPv4HeaderSize { + metrics.ErrorCount.WithLabelValues("pcap", "ipv4", "truncated_header").Inc() + err = ErrTruncatedIPHeader + return + } + p.v4 = (*IPv4Header)(unsafe.Pointer(&wire[EthernetHeaderSize])) + p.IP = p.v4 + case layers.EthernetTypeIPv6: + if len(wire) < EthernetHeaderSize+IPv6HeaderSize { + metrics.ErrorCount.WithLabelValues("pcap", "ipv6", "truncated_header").Inc() + err = ErrTruncatedIPHeader + return + } + if p.v6 == nil { + // This allocation should only happen once. + p.v6 = &IPv6Wrapper{} + } + _, err = p.v6.Overlay(wire[EthernetHeaderSize:]) + if err != nil { + return + } + p.IP = p.v6 + default: + err = ErrUnknownEtherType + return + } + if p.IP != nil { + switch p.IP.NextProtocol() { + case layers.IPProtocolTCP: + // TODO - add TCP layer decoding + } + } + + return nil +} + +func (p *Packet) PayloadLength() int { + if p.IP == nil { + return 0 + } + return p.IP.PayloadLength() +} diff --git a/headers/ip_test.go b/headers/ip_test.go new file mode 100644 index 000000000..2d69348b4 --- /dev/null +++ b/headers/ip_test.go @@ -0,0 +1,176 @@ +package headers_test + +import ( + "bytes" + "io" + "io/ioutil" + "log" + "os" + "path" + "strings" + "testing" + "time" + + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcapgo" + + nano "github.com/m-lab/etl/internal/nano" + + "github.com/m-lab/etl/headers" +) + +func init() { + log.SetFlags(log.LstdFlags | log.Lshortfile) +} + +//lint:ignore U1000 unused +func assertV6isIP(ip *headers.IPv6Header) { + func(headers.IP) {}(ip) +} + +//lint:ignore U1000 unused +func assertV4isIP(ip *headers.IPv4Header) { + func(headers.IP) {}(ip) +} + +func getTestfileForBenchmark(b *testing.B, name string) []byte { + f, err := os.Open(path.Join(`testdata/`, name)) + if err != nil { + b.Fatal(err) + } + data, err := ioutil.ReadAll(f) + if err != nil { + b.Fatal(err) + } + return data +} + +func getTestfile(t *testing.T, name string) []byte { + f, err := os.Open(path.Join(`testdata/`, name)) + if err != nil { + t.Fatal(err) + } + data, err := ioutil.ReadAll(f) + if err != nil { + t.Fatal(err) + } + return data +} + +func ProcessPackets(data []byte) (int, error) { + pcap, err := pcapgo.NewReader(bytes.NewReader(data)) + if err != nil { + return 0, err + } + + p := headers.Packet{} + count := 0 + for data, ci, err := pcap.ZeroCopyReadPacketData(); err == nil; data, ci, err = pcap.ZeroCopyReadPacketData() { + err := p.Overlay(nano.UnixNano(ci.Timestamp.UnixNano()), data) + if err != nil { + return count, err + } + count++ + } + + return count, nil +} + +func ProcessShortPackets(t *testing.T, data []byte) { + pcap, err := pcapgo.NewReader(bytes.NewReader(data)) + if err != nil { + t.Fatal(err) + } + + p := headers.Packet{} + for data, ci, err := pcap.ReadPacketData(); err == nil; data, ci, err = pcap.ZeroCopyReadPacketData() { + for i := 0; i < len(data); i++ { + p.Overlay(nano.UnixNano(ci.Timestamp.UnixNano()), data[:i]) + p.Overlay(nano.UnixNano(ci.Timestamp.UnixNano()), data[i:]) + } + } +} + +func TestShortData(t *testing.T) { + type test struct { + name string + fn string + packets int64 + duration time.Duration + srcIP string + srcPort, dstPort layers.TCPPort + TTL uint8 + } + tests := []test{ + {name: "retransmits", fn: "ndt-nnwk2_1611335823_00000000000C2DFE.pcap.gz", + packets: 336, duration: 15409174000, srcIP: "173.49.19.128", srcPort: 40337, dstPort: 443}, + {name: "ipv6", fn: "ndt-nnwk2_1611335823_00000000000C2DA8.pcap.gz", + packets: 15, duration: 134434000, srcIP: "2a0d:5600:24:a71::1d", srcPort: 1894, dstPort: 443}, + } + for _, tt := range tests { + data := getTestfile(t, tt.fn) + ProcessShortPackets(t, data) + } +} + +func TestPCAPGarbage(t *testing.T) { + data := []byte{0xd4, 0xc3, 0xb2, 0xa1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12} + _, err := ProcessPackets(data) + if err != io.ErrUnexpectedEOF { + t.Fatal(err) + } + + data = append(data, data...) + _, err = ProcessPackets(data) + if err == nil || !strings.Contains(err.Error(), "Unknown major") { + t.Fatal(err) + } +} + +// goos: darwin goarch: amd64 cpu: Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz +// BenchmarkProcessPackets2-8 316 3763748 ns/op 469.07 MB/s 36776 packets/op 60491 B/op 207 allocs/op +func BenchmarkProcessPackets2(b *testing.B) { + type tt struct { + data []byte + numPkts int + ipPayloadBytes int + leftSacks, rightSacks int + } + tests := []tt{ + // Approximately 220K packets, so this is about 140nsec/packet, and about 100 bytes/packet allocated, + // which is roughly the footprint of the packets themselves. + {getTestfileForBenchmark(b, "ndt-nnwk2_1611335823_00000000000C2DA8.pcap.gz"), 15, 4574, 0, 0}, + {getTestfileForBenchmark(b, "ndt-nnwk2_1611335823_00000000000C2DFE.pcap.gz"), 336, 167003, 31, 24}, // retransmits and SACKs + {getTestfileForBenchmark(b, "ndt-nnwk2_1611335823_00000000000C2DA9.pcap.gz"), 5180, 81408294, 0, 0}, + {getTestfileForBenchmark(b, "ndt-m6znc_1632401351_000000000005BA77.pcap.gz"), 40797, 239251626, 70557, 207}, + {getTestfileForBenchmark(b, "ndt-m6znc_1632401351_000000000005B9EA.pcap.gz"), 146172, 158096007, 7, 195}, + {getTestfileForBenchmark(b, "ndt-m6znc_1632401351_000000000005B90B.pcap.gz"), 30097, 126523401, 0, 0}, + } + b.ReportAllocs() + b.ResetTimer() + + b.ReportMetric(220000, "packets/op") + + i := 0 + + numPkts := 0 + ops := 0 + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + test := tests[i%len(tests)] + ops++ + numPkts += test.numPkts + i++ + count, err := ProcessPackets(test.data) + if err != nil { + b.Fatal(err) + } + if count != test.numPkts { + b.Errorf("expected %d packets, got %d", test.numPkts, count) + } + b.SetBytes(int64(len(test.data))) + } + }) + b.Log("total packets", numPkts, "total ops", ops) + b.ReportMetric(float64(numPkts/ops), "packets/op") +} diff --git a/headers/testdata/ndt-4dh2l_1591894023_0000000000363A84.pcap.gz b/headers/testdata/ndt-4dh2l_1591894023_0000000000363A84.pcap.gz new file mode 100644 index 000000000..f59380329 Binary files /dev/null and b/headers/testdata/ndt-4dh2l_1591894023_0000000000363A84.pcap.gz differ diff --git a/headers/testdata/ndt-m6znc_1632401351_000000000005B90B.pcap.gz b/headers/testdata/ndt-m6znc_1632401351_000000000005B90B.pcap.gz new file mode 100644 index 000000000..0fc4f0fb1 Binary files /dev/null and b/headers/testdata/ndt-m6znc_1632401351_000000000005B90B.pcap.gz differ diff --git a/headers/testdata/ndt-m6znc_1632401351_000000000005B9EA.pcap.gz b/headers/testdata/ndt-m6znc_1632401351_000000000005B9EA.pcap.gz new file mode 100644 index 000000000..891665bb8 Binary files /dev/null and b/headers/testdata/ndt-m6znc_1632401351_000000000005B9EA.pcap.gz differ diff --git a/headers/testdata/ndt-m6znc_1632401351_000000000005BA77.pcap.gz b/headers/testdata/ndt-m6znc_1632401351_000000000005BA77.pcap.gz new file mode 100644 index 000000000..4bc429327 Binary files /dev/null and b/headers/testdata/ndt-m6znc_1632401351_000000000005BA77.pcap.gz differ diff --git a/headers/testdata/ndt-nnwk2_1611335823_00000000000C2DA2.pcap.gz b/headers/testdata/ndt-nnwk2_1611335823_00000000000C2DA2.pcap.gz new file mode 100644 index 000000000..718f4d2e9 Binary files /dev/null and b/headers/testdata/ndt-nnwk2_1611335823_00000000000C2DA2.pcap.gz differ diff --git a/headers/testdata/ndt-nnwk2_1611335823_00000000000C2DA8.pcap.gz b/headers/testdata/ndt-nnwk2_1611335823_00000000000C2DA8.pcap.gz new file mode 100644 index 000000000..92acef543 Binary files /dev/null and b/headers/testdata/ndt-nnwk2_1611335823_00000000000C2DA8.pcap.gz differ diff --git a/headers/testdata/ndt-nnwk2_1611335823_00000000000C2DA9.pcap.gz b/headers/testdata/ndt-nnwk2_1611335823_00000000000C2DA9.pcap.gz new file mode 100644 index 000000000..19edad182 Binary files /dev/null and b/headers/testdata/ndt-nnwk2_1611335823_00000000000C2DA9.pcap.gz differ diff --git a/headers/testdata/ndt-nnwk2_1611335823_00000000000C2DFE.pcap.gz b/headers/testdata/ndt-nnwk2_1611335823_00000000000C2DFE.pcap.gz new file mode 100644 index 000000000..cdd5ed173 Binary files /dev/null and b/headers/testdata/ndt-nnwk2_1611335823_00000000000C2DFE.pcap.gz differ diff --git a/internal/bigendian/bigendian.go b/internal/bigendian/bigendian.go new file mode 100644 index 000000000..db50335d4 --- /dev/null +++ b/internal/bigendian/bigendian.go @@ -0,0 +1,27 @@ +package bigendian + +import "unsafe" + +//============================================================================= + +// These provide byte swapping from BigEndian to LittleEndian. +// Much much faster than binary.BigEndian.UintNN. +// NOTE: If this code is used on a BigEndian machine, it should cause unit tests to fail. + +// BE16 is a 16-bit big-endian value. +type BE16 [2]byte + +// Uint16 returns the 16-bit value in LitteEndian. +func (b BE16) Uint16() uint16 { + swap := [2]byte{b[1], b[0]} + return *(*uint16)(unsafe.Pointer(&swap)) +} + +// BE32 is a 32-bit big-endian value. +type BE32 [4]byte + +// Uint32 returns the 32-bit value in LitteEndian. +func (b BE32) Uint32() uint32 { + swap := [4]byte{b[3], b[2], b[1], b[0]} + return *(*uint32)(unsafe.Pointer(&swap)) +} diff --git a/internal/nano/unixnano.go b/internal/nano/unixnano.go new file mode 100644 index 000000000..808df1862 --- /dev/null +++ b/internal/nano/unixnano.go @@ -0,0 +1,14 @@ +package nano + +import "time" + +//============================================================================= + +// UnixNano is a Unix timestamp in nanoseconds. +// It provided more efficient basic time operations. +type UnixNano int64 + +// Sub returns the difference between two unix times. +func (t UnixNano) Sub(other UnixNano) time.Duration { + return time.Duration(t - other) +} diff --git a/parser/pcap.go b/parser/pcap.go index 760e4c1bb..46b0cb59d 100644 --- a/parser/pcap.go +++ b/parser/pcap.go @@ -3,7 +3,6 @@ package parser import ( "fmt" "log" - "net" "os" "path/filepath" "strings" @@ -11,14 +10,12 @@ import ( "cloud.google.com/go/bigquery" "cloud.google.com/go/civil" - "github.com/google/gopacket" - "github.com/google/gopacket/layers" - "github.com/google/gopacket/pcapgo" v2as "github.com/m-lab/annotation-service/api/v2" "github.com/m-lab/etl/etl" "github.com/m-lab/etl/metrics" "github.com/m-lab/etl/row" "github.com/m-lab/etl/schema" + "github.com/m-lab/etl/tcpip" "github.com/m-lab/go/logx" ) @@ -30,87 +27,6 @@ var ( ErrNoIPLayer = fmt.Errorf("no IP layer") ) -// Packet struct contains the packet data and metadata. -type Packet struct { - // If we use a pointer here, for some reason we get zero value timestamps. - Ci gopacket.CaptureInfo - Data []byte - Err error -} - -// GetIP decodes the IP layers and returns some basic information. -// It is a bit slow and does memory allocation. -func (p *Packet) GetIP() (net.IP, net.IP, uint8, uint16, error) { - // Decode a packet. - pkt := gopacket.NewPacket(p.Data, layers.LayerTypeEthernet, gopacket.DecodeOptions{ - Lazy: true, - NoCopy: true, - SkipDecodeRecovery: true, - DecodeStreamsAsDatagrams: false, - }) - - if ipLayer := pkt.Layer(layers.LayerTypeIPv4); ipLayer != nil { - ip, _ := ipLayer.(*layers.IPv4) - // For IPv4, the TTL length is the ip.Length adjusted for the header length. - return ip.SrcIP, ip.DstIP, ip.TTL, ip.Length - uint16(4*ip.IHL), nil - } else if ipLayer := pkt.Layer(layers.LayerTypeIPv6); ipLayer != nil { - ip, _ := ipLayer.(*layers.IPv6) - // In IPv6, the Length field is the payload length. - return ip.SrcIP, ip.DstIP, ip.HopLimit, ip.Length, nil - } else { - return nil, nil, 0, 0, ErrNoIPLayer - } -} - -func GetPackets(data []byte) ([]Packet, error) { - pcap, err := pcapgo.NewReader(strings.NewReader(string(data))) - if err != nil { - log.Print(err) - return nil, err - } - - // TODO: len(data)/18 provides much better estimate of number of packets. - // len(data)/18 was determined by looking at bytes/packet in a few pcaps files. - // The number seems too small, but perhaps the data is still compressed at this point. - // However, it seems to cause mysterious crashes in sandbox, so - // reverting to /1500 for now. - packets := make([]Packet, 0, len(data)/1500) - - for data, ci, err := pcap.ZeroCopyReadPacketData(); err == nil; data, ci, err = pcap.ReadPacketData() { - packets = append(packets, Packet{Ci: ci, Data: data, Err: err}) - } - - if err != nil { - metrics.WarningCount.WithLabelValues("pcap", "ip_layer_failure").Inc() - metrics.PcapPacketCount.WithLabelValues("IP error").Observe(float64(len(packets))) - return packets, err - } else if len(packets) > 0 { - srcIP, _, _, _, err := packets[0].GetIP() - // TODO - eventually we should identify key local ports, like 443 and 3001. - if err != nil { - metrics.WarningCount.WithLabelValues("pcap", "ip_layer_failure").Inc() - metrics.PcapPacketCount.WithLabelValues("IP error").Observe(float64(len(packets))) - } else { - start := packets[0].Ci.Timestamp - end := packets[len(packets)-1].Ci.Timestamp - duration := end.Sub(start) - // TODO add TCP layer, so we can label the stats based on local port value. - if len(srcIP) == 4 { - metrics.PcapPacketCount.WithLabelValues("ipv4").Observe(float64(len(packets))) - metrics.PcapConnectionDuration.WithLabelValues("ipv4").Observe(duration.Seconds()) - } else { - metrics.PcapPacketCount.WithLabelValues("ipv6").Observe(float64(len(packets))) - metrics.PcapConnectionDuration.WithLabelValues("ipv6").Observe(duration.Seconds()) - } - } - } else { - // No packets. - metrics.PcapPacketCount.WithLabelValues("unknown").Observe(float64(len(packets))) - } - - return packets, nil -} - //===================================================================================== // PCAP Parser //===================================================================================== @@ -171,8 +87,27 @@ func (p *PCAPParser) ParseAndInsert(fileMetadata map[string]bigquery.Value, test row.ID = p.GetUUID(testName) // Parse top level PCAP data and update metrics. - // TODO - add schema fields here. - _, _ = GetPackets(rawContent) + summary, err := tcpip.ProcessPackets(row.Parser.ArchiveURL, testName, rawContent) + server := summary.Server() + client := summary.Client() + + if err != nil { + metrics.ErrorCount.WithLabelValues(p.TableName(), "parsing", "error").Inc() + } else if server.SrcIP == nil || client.SrcIP == nil { + // Seeing about 1 of these in 10K traces. + metrics.WarningCount.WithLabelValues(p.TableName(), "parsing", "server unidentified").Inc() + row.A = schema.PCAPSummary{ + StartTime: time.Unix(0, int64(summary.StartTime)), + EndTime: time.Unix(0, int64(summary.LastTime)), + } + } else { + row.A = schema.PCAPSummary{ + PacketsSent: server.Packets, + PacketsReceived: client.Packets, + StartTime: time.Unix(0, int64(summary.StartTime)), + EndTime: time.Unix(0, int64(summary.LastTime)), + } + } // Insert the row. if err := p.Put(&row); err != nil { diff --git a/parser/pcap_test.go b/parser/pcap_test.go index 5f303f4a2..7be7822c3 100644 --- a/parser/pcap_test.go +++ b/parser/pcap_test.go @@ -1,9 +1,7 @@ package parser_test import ( - "io" "io/ioutil" - "os" "path" "strings" "testing" @@ -56,16 +54,24 @@ func TestPCAPParser_ParseAndInsert(t *testing.T) { GitCommit: "12345678", } + expectedA := schema.PCAPSummary{} + + expectedA.StartTime, err = time.Parse("2006-01-02 15:04:05.00000 -0700", "2021-07-20 20:00:01.18105 -0400") + if err != nil { + t.Fatal(err) + } + expectedA.EndTime, err = time.Parse("2006-01-02 15:04:05.000000 -0700", "2021-07-20 20:00:11.798528 -0400") + expectedPCAPRow := schema.PCAPRow{ ID: "ndt-4c6fb_1625899199_000000000121C1A0", Parser: expectedParseInfo, Date: date, + A: expectedA, } if diff := deep.Equal(row, &expectedPCAPRow); diff != nil { t.Errorf("PCAPParser.ParseAndInsert() different row: %s", strings.Join(diff, "\n")) } - } func TestPCAPParser_IsParsable(t *testing.T) { @@ -134,111 +140,3 @@ func TestPCAPParser_GetUUID(t *testing.T) { }) } } - -func TestIPLayer(t *testing.T) { - type test struct { - name string - fn string - packets int64 - duration time.Duration - srcIP, dstIP string - TTL uint8 - } - tests := []test{ - {name: "retransmits", fn: "testdata/PCAP/ndt-nnwk2_1611335823_00000000000C2DFE.pcap.gz", - packets: 336, duration: 15409174000, srcIP: "173.49.19.128"}, - {name: "ipv6", fn: "testdata/PCAP/ndt-nnwk2_1611335823_00000000000C2DA8.pcap.gz", - packets: 15, duration: 134434000, srcIP: "2a0d:5600:24:a71::1d"}, - {name: "protocolErrors2", fn: "testdata/PCAP/ndt-nnwk2_1611335823_00000000000C2DA9.pcap.gz", - packets: 5180, duration: 13444117000, srcIP: "2a0d:5600:24:a71::1d"}, - } - for _, tt := range tests { - f, err := os.Open(tt.fn) - if err != nil { - t.Fatal(err) - } - data, err := ioutil.ReadAll(f) - if err != nil { - t.Fatal(err) - } - packets, err := parser.GetPackets(data) - if err != nil { - t.Fatal(err) - } - start := packets[0].Ci.Timestamp - end := packets[len(packets)-1].Ci.Timestamp - duration := end.Sub(start) - if duration != tt.duration { - t.Errorf("%s: duration = %v, want %v", tt.name, duration, tt.duration) - } - if len(packets) != int(tt.packets) { - t.Errorf("%s: expected %d packets, got %d", tt.name, tt.packets, len(packets)) - } - srcIP, _, _, _, err := packets[0].GetIP() - if err != nil { - t.Fatal(err) - } - if srcIP.String() != tt.srcIP { - t.Errorf("%s: expected srcIP %s, got %s", tt.name, tt.srcIP, srcIP.String()) - } - } -} - -func TestPCAPGarbage(t *testing.T) { - data := []byte{0xd4, 0xc3, 0xb2, 0xa1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12} - _, err := parser.GetPackets(data) - if err != io.ErrUnexpectedEOF { - t.Fatal(err) - } - - data = append(data, data...) - _, err = parser.GetPackets(data) - if err == nil || !strings.Contains(err.Error(), "Unknown major") { - t.Fatal(err) - } -} - -func getTestFile(b *testing.B, name string) []byte { - f, err := os.Open(path.Join(`testdata/PCAP/`, name)) - if err != nil { - b.Fatal(err) - } - data, err := ioutil.ReadAll(f) - if err != nil { - b.Fatal(err) - } - return data -} - -// Original single file RunParallel: -// Just packet decoding: BenchmarkGetPackets-8 8678 128426 ns/op 165146 B/op 381 allocs/op -// With IP decoding: BenchmarkGetPackets-8 4279 285547 ns/op 376125 B/op 1729 allocs/op - -// Enhanced RunParallel: BenchmarkGetPackets-8 2311 514898 ns/op 1181138 B/op 1886 allocs/op -func BenchmarkGetPackets(b *testing.B) { - type tt struct { - data []byte - numPkts int - } - tests := []tt{ - {getTestFile(b, "ndt-nnwk2_1611335823_00000000000C2DFE.pcap.gz"), 336}, - {getTestFile(b, "ndt-nnwk2_1611335823_00000000000C2DA8.pcap.gz"), 15}, - {getTestFile(b, "ndt-nnwk2_1611335823_00000000000C2DA9.pcap.gz"), 5180}, - } - b.ResetTimer() - - i := 0 - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - test := tests[i%len(tests)] - i++ - pkts, err := parser.GetPackets(test.data) - if err != nil { - b.Fatal(err) - } - if len(pkts) != test.numPkts { - b.Errorf("expected %d packets, got %d", test.numPkts, len(pkts)) - } - } - }) -} diff --git a/profile.sandbox-1218d0.cpu b/profile.sandbox-1218d0.cpu new file mode 100644 index 000000000..5ed10e54a Binary files /dev/null and b/profile.sandbox-1218d0.cpu differ diff --git a/schema/pcap.go b/schema/pcap.go index 23a8d4e1b..95b09e6dd 100644 --- a/schema/pcap.go +++ b/schema/pcap.go @@ -1,17 +1,30 @@ package schema import ( + "time" + "cloud.google.com/go/bigquery" "cloud.google.com/go/civil" "github.com/m-lab/etl/row" "github.com/m-lab/go/cloud/bqx" ) +// PCAPSummary is used for the 'a' field, and contains summary data +// describing high level characteristics of the connection. +type PCAPSummary struct { + StartTime time.Time + EndTime time.Time + PacketsSent int // requires IP header parsing + PacketsReceived int // requires IP header parsing +} + // PCAPRow describes a single BQ row of pcap (packet capture) data. type PCAPRow struct { - ID string `bigquery:"id"` - Parser ParseInfo `bigquery:"parser"` - Date civil.Date `bigquery:"date"` + ID string `bigquery:"id" json:"id"` + Parser ParseInfo `bigquery:"parser" json:"parser"` + Date civil.Date `bigquery:"date" json:"date"` + + A PCAPSummary `bigquery:"a" json:"a"` // NOT part of struct schema. Included only to provide a fake annotator interface. row.NullAnnotator `bigquery:"-"` diff --git a/tcpip/tcpip.go b/tcpip/tcpip.go new file mode 100644 index 000000000..dbfc56ab0 --- /dev/null +++ b/tcpip/tcpip.go @@ -0,0 +1,162 @@ +// Package tcpip contains code to extract IP and TCP packets from a PCAP file, +// and model the TCP connection state machine. +package tcpip + +// The key to safety with unsafe pointers is to gaurantee that the +// pointer is nil before the underlying object goes out of scope. +// The opposite is more likely, if there is a wrapper object containing +// to both the underlying object and the unsafe pointer. The wrapper +// points to the underlying object, and the unsafe pointer, and when +// it is garbage collected, it will make both the underlying object +// and the unsafe pointer eligible for collection. + +import ( + "bytes" + "log" + "net" + "os" + "time" + + "github.com/google/gopacket/pcapgo" + + "github.com/m-lab/annotation-service/site" + "github.com/m-lab/go/logx" + "github.com/m-lab/uuid-annotator/annotator" + + "github.com/m-lab/etl/headers" + nano "github.com/m-lab/etl/internal/nano" + "github.com/m-lab/etl/metrics" +) + +var ( + info = log.New(os.Stdout, "info: ", log.LstdFlags|log.Lshortfile) + sparseLogger = log.New(os.Stdout, "sparse: ", log.LstdFlags|log.Lshortfile) + sparse1 = logx.NewLogEvery(sparseLogger, 1000*time.Millisecond) +) + +type Stats struct { + SrcIP net.IP + Packets int + Bytes int +} + +type Summary struct { + init bool + + HopLimit uint8 + Packets int + StartTime nano.UnixNano + LastTime nano.UnixNano + + Left, Right Stats + + // These eventually point to the server and client stats. + server, client *Stats + + srcIP, dstIP net.IP +} + +func (s *Summary) Client() Stats { + if s.client == nil { + return Stats{} + } + return *s.client +} + +func (s *Summary) Server() Stats { + if s.client == nil { + return Stats{} + } + return *s.server +} + +func (s *Summary) Add(p *headers.Packet) { + ip := p.IP + + s.srcIP = ip.SrcIP(s.srcIP) // ESCAPE - these reduce escapes to the heap + s.dstIP = ip.DstIP(s.dstIP) + if !s.init { + s.StartTime = p.PktTime + s.HopLimit = ip.HopLimit() + + s.Left.SrcIP = append([]byte{}, s.srcIP[:]...) + s.Right.SrcIP = append([]byte{}, s.dstIP[:]...) + + s.init = true + } + + s.LastTime = p.PktTime + + if s.srcIP.Equal(s.Left.SrcIP) { + s.Left.Packets++ + s.Left.Bytes += p.PayloadLength() + } else if s.srcIP.Equal(s.Right.SrcIP) { + s.Right.Packets++ + s.Right.Bytes += p.PayloadLength() + } else { + // TODO + } + s.Packets++ +} + +func (s *Summary) Finish() bool { + if !s.init { + return false + } + leftAnn := annotator.ServerAnnotations{} + site.Annotate(s.Left.SrcIP.String(), &leftAnn) + rightAnn := annotator.ServerAnnotations{} + site.Annotate(s.Right.SrcIP.String(), &rightAnn) + if leftAnn.Site != "" { + s.server = &s.Left + s.client = &s.Right + return true + } else if rightAnn.Site != "" { + s.server = &s.Right + s.client = &s.Left + return true + } + sparse1.Printf("no site identified for %v / %v", s.Left.SrcIP, s.Right.SrcIP) + return false +} + +func ProcessPackets(archive, fn string, data []byte) (Summary, error) { + // ESCAPE maps are escaping to the heap + summary := Summary{} + + pcap, rdrErr := pcapgo.NewReader(bytes.NewReader(data)) + if rdrErr != nil { + log.Print(rdrErr) + return summary, rdrErr + } + + p := headers.Packet{} + for pData, ci, pktErr := pcap.ReadPacketData(); pktErr == nil; pData, ci, pktErr = pcap.ZeroCopyReadPacketData() { + // Pass ci by pointer, but Wrap will make a copy, since gopacket NoCopy doesn't preserve the values. + overlayErr := p.Overlay(nano.UnixNano(ci.Timestamp.UnixNano()), pData) + if overlayErr != nil { + sparse1.Println(archive, fn, overlayErr, pData) + continue + } + summary.Add(&p) + } + + if summary.Finish() { + serverIP := summary.Server().SrcIP + // TODO - eventually we should identify key local ports, like 443 and 3001. + duration := summary.LastTime.Sub(summary.StartTime) + // TODO add TCP layer, so we can label the stats based on local port value. + if len(serverIP) == 4 { + metrics.PcapPacketCount.WithLabelValues("ipv4").Observe(float64(summary.Packets)) + metrics.PcapConnectionDuration.WithLabelValues("ipv4").Observe(duration.Seconds()) + } else { + metrics.PcapPacketCount.WithLabelValues("ipv6").Observe(float64(summary.Packets)) + metrics.PcapConnectionDuration.WithLabelValues("ipv6").Observe(duration.Seconds()) + } + } else { + // Server IP not found in the summary. + metrics.PcapPacketCount.WithLabelValues("unknown").Observe(float64(summary.Packets)) + } + + return summary, nil +} diff --git a/tcpip/tcpip_test.go b/tcpip/tcpip_test.go new file mode 100644 index 000000000..fd6066882 --- /dev/null +++ b/tcpip/tcpip_test.go @@ -0,0 +1,146 @@ +package tcpip_test + +import ( + "io/ioutil" + "log" + "net" + "os" + "path" + "testing" + "time" + + "github.com/google/gopacket/layers" + "github.com/m-lab/annotation-service/site" + "github.com/m-lab/etl/tcpip" +) + +func init() { + log.SetFlags(log.LstdFlags | log.Lshortfile) +} + +func getTestfileForBenchmark(b *testing.B, name string) []byte { + f, err := os.Open(path.Join(`testdata/`, name)) + if err != nil { + b.Fatal(err) + } + data, err := ioutil.ReadAll(f) + if err != nil { + b.Fatal(err) + } + return data +} + +func getTestfile(t *testing.T, name string) []byte { + f, err := os.Open(path.Join(`testdata/`, name)) + if err != nil { + t.Fatal(err) + } + data, err := ioutil.ReadAll(f) + if err != nil { + t.Fatal(err) + } + return data +} + +func TestIPLayer(t *testing.T) { + site.MustLoad(time.Minute) + + type test struct { + name string + fn string + packets int64 + duration time.Duration + srcIP, dstIP string + srcPort, dstPort layers.TCPPort + TTL uint8 + totalPayload int + } + tests := []test{ + {name: "retransmits", fn: "ndt-nnwk2_1611335823_00000000000C2DFE.pcap.gz", + packets: 336, duration: 15409174000, srcIP: "173.49.19.128", srcPort: 40337, dstPort: 443}, + {name: "ipv6", fn: "ndt-nnwk2_1611335823_00000000000C2DA8.pcap.gz", + packets: 15, duration: 134434000, srcIP: "2a0d:5600:24:a71::1d", srcPort: 1894, dstPort: 443}, + {name: "protocolErrors2", fn: "ndt-nnwk2_1611335823_00000000000C2DA9.pcap.gz", + packets: 5180, duration: 13444117000, srcIP: "2a0d:5600:24:a71::1d", srcPort: 1896, dstPort: 443}, + + {name: "large_ipv4_1", fn: "ndt-m6znc_1632401351_000000000005BA77.pcap.gz", + packets: 40797, duration: 10719662000, srcIP: "70.187.37.14", srcPort: 60232, dstPort: 443, totalPayload: 239251626}, + {name: "large_ipv6", fn: "ndt-m6znc_1632401351_000000000005B9EA.pcap.gz", + packets: 146172, duration: 15081049000, srcIP: "2600:1700:42d0:67b0:71e7:d89:1d89:9484", srcPort: 49319, dstPort: 443, totalPayload: 158096007}, + {name: "large_ipv4_2", fn: "ndt-m6znc_1632401351_000000000005B90B.pcap.gz", + packets: 30097, duration: 11415041000, srcIP: "104.129.205.7", srcPort: 15227, dstPort: 443, totalPayload: 126523401}, + + {name: "Nops", fn: "ndt-nnwk2_1611335823_00000000000C2DA2.pcap.gz", srcIP: "69.124.153.192", srcPort: 3855, dstPort: 3010, + packets: 18, duration: 173433000}, + } + for _, tt := range tests { + data := getTestfile(t, tt.fn) + summary, err := tcpip.ProcessPackets("none", tt.fn, data) + if err != nil { + t.Fatal(err) + } + duration := summary.LastTime.Sub(summary.StartTime) + if duration != tt.duration { + t.Errorf("%s: duration = %v, want %v", tt.name, duration, tt.duration) + } + if summary.Packets != int(tt.packets) { + t.Errorf("%s: expected %d packets, got %d", tt.name, tt.packets, summary.Packets) + } + + if !summary.Client().SrcIP.Equal(net.ParseIP(tt.srcIP)) { + t.Errorf("%s: srcIP = %s, want %s", tt.name, summary.Server().SrcIP, tt.srcIP) + } + + t.Logf("%+v\n", summary) + } +} + +// goos: darwin goarch: amd64 pkg: github.com/m-lab/etl/tcpip cpu: Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz +// BenchmarkProcessPackets2-8 219 5546192 ns/op 318.32 MB/s 36616 packets/op 2146663 B/op 98347 allocs/op +// BenchmarkProcessPackets2-8 261 4763381 ns/op 370.63 MB/s 36694 packets/op 1259030 B/op 25171 allocs/op +// BenchmarkProcessPackets2-8 312 3910415 ns/op 451.48 MB/s 37099 packets/op 61150 B/op 216 allocs/op // reuse ipv6 wrapper +func BenchmarkProcessPackets2(b *testing.B) { + type tt struct { + data []byte + numPkts int + ipPayloadBytes int + leftSacks, rightSacks int + } + tests := []tt{ + // Approximately 220K packets, so this is about 140nsec/packet, and about 100 bytes/packet allocated, + // which is roughly the footprint of the packets themselves. + {getTestfileForBenchmark(b, "ndt-nnwk2_1611335823_00000000000C2DA8.pcap.gz"), 15, 4574, 0, 0}, + {getTestfileForBenchmark(b, "ndt-nnwk2_1611335823_00000000000C2DFE.pcap.gz"), 336, 167003, 31, 24}, // retransmits and SACKs + {getTestfileForBenchmark(b, "ndt-nnwk2_1611335823_00000000000C2DA9.pcap.gz"), 5180, 81408294, 0, 0}, + {getTestfileForBenchmark(b, "ndt-m6znc_1632401351_000000000005BA77.pcap.gz"), 40797, 239251626, 70557, 207}, + {getTestfileForBenchmark(b, "ndt-m6znc_1632401351_000000000005B9EA.pcap.gz"), 146172, 158096007, 7, 195}, + {getTestfileForBenchmark(b, "ndt-m6znc_1632401351_000000000005B90B.pcap.gz"), 30097, 126523401, 0, 0}, + } + b.ReportAllocs() + b.ResetTimer() + + b.ReportMetric(220000, "packets/op") + + i := 0 + + numPkts := 0 + ops := 0 + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + test := tests[i%len(tests)] + ops++ + numPkts += test.numPkts + i++ + summary, err := tcpip.ProcessPackets("foo", "bar", test.data) + if err != nil { + b.Fatal(err) + } + if summary.Packets != test.numPkts { + b.Errorf("expected %d packets, got %d", test.numPkts, summary.Packets) + } + b.SetBytes(int64(len(test.data))) + } + }) + b.Log("total packets", numPkts, "total ops", ops) + b.ReportMetric(float64(numPkts/ops), "packets/op") +} diff --git a/tcpip/testdata/ndt-4dh2l_1591894023_0000000000363A84.pcap.gz b/tcpip/testdata/ndt-4dh2l_1591894023_0000000000363A84.pcap.gz new file mode 100644 index 000000000..f59380329 Binary files /dev/null and b/tcpip/testdata/ndt-4dh2l_1591894023_0000000000363A84.pcap.gz differ diff --git a/tcpip/testdata/ndt-m6znc_1632401351_000000000005B90B.pcap.gz b/tcpip/testdata/ndt-m6znc_1632401351_000000000005B90B.pcap.gz new file mode 100644 index 000000000..0fc4f0fb1 Binary files /dev/null and b/tcpip/testdata/ndt-m6znc_1632401351_000000000005B90B.pcap.gz differ diff --git a/tcpip/testdata/ndt-m6znc_1632401351_000000000005B9EA.pcap.gz b/tcpip/testdata/ndt-m6znc_1632401351_000000000005B9EA.pcap.gz new file mode 100644 index 000000000..891665bb8 Binary files /dev/null and b/tcpip/testdata/ndt-m6znc_1632401351_000000000005B9EA.pcap.gz differ diff --git a/tcpip/testdata/ndt-m6znc_1632401351_000000000005BA77.pcap.gz b/tcpip/testdata/ndt-m6znc_1632401351_000000000005BA77.pcap.gz new file mode 100644 index 000000000..4bc429327 Binary files /dev/null and b/tcpip/testdata/ndt-m6znc_1632401351_000000000005BA77.pcap.gz differ diff --git a/tcpip/testdata/ndt-nnwk2_1611335823_00000000000C2DA2.pcap.gz b/tcpip/testdata/ndt-nnwk2_1611335823_00000000000C2DA2.pcap.gz new file mode 100644 index 000000000..718f4d2e9 Binary files /dev/null and b/tcpip/testdata/ndt-nnwk2_1611335823_00000000000C2DA2.pcap.gz differ diff --git a/tcpip/testdata/ndt-nnwk2_1611335823_00000000000C2DA8.pcap.gz b/tcpip/testdata/ndt-nnwk2_1611335823_00000000000C2DA8.pcap.gz new file mode 100644 index 000000000..92acef543 Binary files /dev/null and b/tcpip/testdata/ndt-nnwk2_1611335823_00000000000C2DA8.pcap.gz differ diff --git a/tcpip/testdata/ndt-nnwk2_1611335823_00000000000C2DA9.pcap.gz b/tcpip/testdata/ndt-nnwk2_1611335823_00000000000C2DA9.pcap.gz new file mode 100644 index 000000000..19edad182 Binary files /dev/null and b/tcpip/testdata/ndt-nnwk2_1611335823_00000000000C2DA9.pcap.gz differ diff --git a/tcpip/testdata/ndt-nnwk2_1611335823_00000000000C2DFE.pcap.gz b/tcpip/testdata/ndt-nnwk2_1611335823_00000000000C2DFE.pcap.gz new file mode 100644 index 000000000..cdd5ed173 Binary files /dev/null and b/tcpip/testdata/ndt-nnwk2_1611335823_00000000000C2DFE.pcap.gz differ