Skip to content

Commit

Permalink
Add parser for pcap index (#1012)
Browse files Browse the repository at this point in the history
* Add parser for pcap index

* Replace regex with filepath.Base to extract the UUID
  • Loading branch information
cristinaleonr authored Jul 27, 2021
1 parent a8f5d84 commit c49fd6b
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 1 deletion.
9 changes: 8 additions & 1 deletion etl/globals.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ const (
NDT7 = DataType("ndt7")
NDT_OMIT_DELTAS = DataType("ndt_nodelta") // to support larger buffer size.
SS = DataType("sidestream")
PCAP = DataType("pcap")
PT = DataType("traceroute")
SW = DataType("switch")
TCPINFO = DataType("tcpinfo")
Expand All @@ -276,6 +277,7 @@ var (
"ndt7": NDT7,
"sidestream": SS,
"paris-traceroute": PT,
"pcap": PCAP,
"switch": SW,
"tcpinfo": TCPINFO,
"traceroute": PT,
Expand All @@ -287,6 +289,7 @@ var (
ANNOTATION: "annotation",
NDT: "ndt",
SS: "sidestream",
PCAP: "pcap",
PT: "traceroute",
SW: "switch",
TCPINFO: "tcpinfo",
Expand All @@ -296,13 +299,17 @@ var (
}

// Map from data type to number of buffer size for BQ insertion.
// TODO - this should be loaded from a config.
// This matters more for the legacy parsing that used BQ streaming inserts.
// For the JSONL output in Gardener 2.0 operation, the buffer size doesn't matter much,
// as everything is written to gcs files, and the gcs library does its own buffering.
// TODO - this should be loaded from a config
dataTypeToBQBufferSize = map[DataType]int{
ANNOTATION: 400, // around 1k each.
NDT: 10,
NDT_OMIT_DELTAS: 50,
TCPINFO: 5,
SS: 500, // Average json size is 2.5K
PCAP: 200,
PT: 20,
SW: 100,
NDT5: 200,
Expand Down
10 changes: 10 additions & 0 deletions etl/globals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,16 @@ func TestValidateTestPath(t *testing.T) {
"archive-mlab-oti", "ndt", "traceroute", "2019/06/20", "20190620", "224809.435046", "traceroute", "mlab1", "den06", "ndt", "0001", "", ".tgz",
},
},
{
name: "pcap-tgz",
path: `gs://archive-measurement-lab/ndt/pcap/2021/07/22/20210722T000107.470279Z-pcap-mlab1-dfw05-ndt.tgz`,
wantType: etl.PCAP,
want: etl.DataPath{
`gs://archive-measurement-lab/ndt/pcap/2021/07/22/20210722T000107.470279Z-pcap-mlab1-dfw05-ndt.tgz`,
`ndt/pcap/2021/07/22/20210722T000107.470279Z-pcap-mlab1-dfw05-ndt.tgz`,
"archive-measurement-lab", "ndt", "pcap", "2021/07/22", "20210722", "000107.470279", "pcap", "mlab1", "dfw05", "ndt", "", "", ".tgz",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func NewSinkParser(dt etl.DataType, sink row.Sink, table string, ann api.Annotat
return NewNDT7ResultParser(sink, table, "", ann)
case etl.TCPINFO:
return NewTCPInfoParser(sink, table, "", ann)
case etl.PCAP:
return NewPCAPParser(sink, table, "", ann)
default:
return nil
}
Expand Down
128 changes: 128 additions & 0 deletions parser/pcap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package parser

import (
"path/filepath"
"strings"
"time"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
v2as "github.com/m-lab/annotation-service/api/v2"
"github.com/m-lab/etl/etl"
"github.com/m-lab/etl/metrics"
"github.com/m-lab/etl/row"
"github.com/m-lab/etl/schema"
)

//=====================================================================================
// PCAP Parser
//=====================================================================================

// pcapSuffix is the file extension required by IsParsable and stripped by GetUUID.
const pcapSuffix = ".pcap.gz"

// PCAPParser parses the PCAP datatype from the packet-headers process.
type PCAPParser struct {
	*row.Base        // Embedded buffered sink; provides Put, Flush, and GetStats.
	table  string    // Base BigQuery table name, returned by TableName.
	suffix string    // Table suffix, appended by FullTableName.
}

// NewPCAPParser returns a new parser for PCAP archives.
// When ann is nil, the default batch annotator is used.
func NewPCAPParser(sink row.Sink, table, suffix string, ann v2as.Annotator) etl.Parser {
	if ann == nil {
		ann = v2as.GetAnnotator(etl.BatchAnnotatorURL)
	}
	return &PCAPParser{
		Base:   row.NewBase(table, sink, etl.PCAP.BQBufferSize(), ann),
		table:  table,
		suffix: suffix,
	}
}

// IsParsable returns the canonical test type and whether to parse data.
// Only file names of the form (.*).pcap.gz are parsable.
func (p *PCAPParser) IsParsable(testName string, data []byte) (string, bool) {
	if !strings.HasSuffix(testName, pcapSuffix) {
		return "", false
	}
	return "pcap", true
}

// ParseAndInsert decodes the PCAP data and inserts it into BQ.
// fileMetadata must contain a string "filename" and a civil.Date "date";
// the type assertions below will panic otherwise (consistent with the other
// parsers in this package, which treat missing metadata as a programming error).
func (p *PCAPParser) ParseAndInsert(fileMetadata map[string]bigquery.Value, testName string, rawContent []byte) error {
	metrics.WorkerState.WithLabelValues(p.TableName(), "pcap").Inc()
	defer metrics.WorkerState.WithLabelValues(p.TableName(), "pcap").Dec()

	// Named pcapRow (not "row") to avoid shadowing the imported row package,
	// which this file also uses (row.NewBase, row.Sink).
	pcapRow := schema.PCAPRow{
		Parser: schema.ParseInfo{
			Version:    Version(),
			Time:       time.Now(),
			ArchiveURL: fileMetadata["filename"].(string),
			Filename:   testName,
			GitCommit:  GitCommit(),
		},
	}

	// NOTE: Civil is not TZ adjusted. It takes the year, month, and date from
	// the given timestamp, regardless of the timestamp's timezone. Since we
	// run our systems in UTC, all timestamps will be relative to UTC and as
	// will these dates.
	pcapRow.Date = fileMetadata["date"].(civil.Date)
	pcapRow.ID = p.GetUUID(testName)

	// Insert the row.
	if err := p.Put(&pcapRow); err != nil {
		return err
	}

	// Count successful inserts.
	metrics.TestCount.WithLabelValues(p.TableName(), "pcap", "ok").Inc()

	return nil
}

// GetUUID extracts the UUID from the filename.
// For example, for filename 2021/07/22/ndt-4c6fb_1625899199_00000000013A4623.pcap.gz,
// it returns ndt-4c6fb_1625899199_00000000013A4623.
func (p *PCAPParser) GetUUID(filename string) string {
	// Drop any directory prefix, then the .pcap.gz suffix.
	return strings.TrimSuffix(filepath.Base(filename), pcapSuffix)
}

// NB: These functions are also required to complete the etl.Parser interface
// For PCAP, we just forward the calls to the Inserter.

// Flush flushes any pending rows through the embedded row.Base.
func (p *PCAPParser) Flush() error {
	return p.Base.Flush()
}

// TableName returns the base table name, without the suffix.
func (p *PCAPParser) TableName() string {
	return p.table
}

// FullTableName returns the table name with the configured suffix appended.
func (p *PCAPParser) FullTableName() string {
	return p.table + p.suffix
}

// RowsInBuffer returns the count of rows currently in the buffer.
func (p *PCAPParser) RowsInBuffer() int {
	return p.GetStats().Pending
}

// Committed returns the count of rows successfully committed to BQ.
func (p *PCAPParser) Committed() int {
	return p.GetStats().Committed
}

// Accepted returns the count of all rows received through InsertRow(s).
func (p *PCAPParser) Accepted() int {
	return p.GetStats().Total()
}

// Failed returns the count of all rows that could not be committed.
func (p *PCAPParser) Failed() int {
	return p.GetStats().Failed
}
133 changes: 133 additions & 0 deletions parser/pcap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package parser_test

import (
"io/ioutil"
"path"
"strings"
"testing"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
"github.com/go-test/deep"
"github.com/m-lab/etl/parser"
"github.com/m-lab/etl/schema"
"github.com/m-lab/go/rtx"
)

const (
	// pcapFilename is a real archive name produced by the packet-headers process;
	// a matching fixture exists under testdata/PCAP/.
	pcapFilename = "ndt-4c6fb_1625899199_000000000121C1A0.pcap.gz"
	// pcapGCSPath is the GCS prefix such a file would be archived under.
	pcapGCSPath  = "gs://archive-measurement-lab/ndt/pcap/2021/07/22/"
)

// TestPCAPParser_ParseAndInsert verifies that a single pcap archive produces
// exactly one PCAPRow with the expected ID, ParseInfo, and Date.
func TestPCAPParser_ParseAndInsert(t *testing.T) {
	ins := newInMemorySink()
	n := parser.NewPCAPParser(ins, "test", "_suffix", &fakeAnnotator{})

	data, err := ioutil.ReadFile(path.Join("testdata/PCAP/", pcapFilename))
	rtx.Must(err, "failed to load test file")

	date := civil.Date{Year: 2021, Month: 07, Day: 22}

	meta := map[string]bigquery.Value{
		"filename": path.Join(pcapGCSPath, pcapFilename),
		"date":     date,
	}

	// ParseAndInsert should succeed, so wantErr is nil (message previously
	// claimed "wantErr true", which contradicted the check).
	if err := n.ParseAndInsert(meta, pcapFilename, data); err != nil {
		t.Errorf("PCAPParser.ParseAndInsert() error = %v, wantErr nil", err)
	}

	// Message previously said "snaplog" — a copy-paste from another parser's test.
	if n.Accepted() != 1 {
		t.Fatal("Failed to insert pcap data", ins)
	}
	n.Flush()

	row := ins.data[0].(*schema.PCAPRow)

	expectedParseInfo := schema.ParseInfo{
		Version:    "https://github.com/m-lab/etl/tree/foobar",
		Time:       row.Parser.Time,
		ArchiveURL: path.Join(pcapGCSPath, pcapFilename),
		Filename:   pcapFilename,
		Priority:   0,
		GitCommit:  "12345678",
	}

	expectedPCAPRow := schema.PCAPRow{
		ID:     "ndt-4c6fb_1625899199_000000000121C1A0",
		Parser: expectedParseInfo,
		Date:   date,
	}

	if diff := deep.Equal(row, &expectedPCAPRow); diff != nil {
		t.Errorf("PCAPParser.ParseAndInsert() different row: %s", strings.Join(diff, "\n"))
	}
}

// TestPCAPParser_IsParsable checks that only *.pcap.gz names are accepted.
func TestPCAPParser_IsParsable(t *testing.T) {
	tests := []struct {
		name     string
		testName string
		want     bool
	}{
		{
			name:     "success-pcap",
			testName: pcapFilename,
			want:     true,
		},
		{
			name:     "error-bad-extension",
			testName: "badfile.badextension",
			want:     false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			data, err := ioutil.ReadFile(path.Join(`testdata/PCAP/`, tt.testName))
			if err != nil {
				// t.Fatal(err), not t.Fatalf(err.Error()): the latter passes a
				// non-constant format string, which go vet's printf check flags.
				t.Fatal(err)
			}
			p := &parser.PCAPParser{}
			_, got := p.IsParsable(tt.testName, data)
			if got != tt.want {
				t.Errorf("PCAPParser.IsParsable() got = %v, want %v", got, tt.want)
			}
		})
	}
}

// TestPCAPParser_GetUUID covers the expected dated path, a bare filename,
// and the degenerate empty-string input (filepath.Base("") yields ".").
func TestPCAPParser_GetUUID(t *testing.T) {
	cases := []struct {
		name string
		in   string
		want string
	}{
		{
			name: "filename-expected-format",
			in:   "2021/07/22/ndt-4c6fb_1625899199_00000000013A4623.pcap.gz",
			want: "ndt-4c6fb_1625899199_00000000013A4623",
		},
		{
			name: "filename-without-date-prefix",
			in:   "ndt-4c6fb_1625899199_00000000013A4623.pcap.gz",
			want: "ndt-4c6fb_1625899199_00000000013A4623",
		},
		{
			name: "empty-string",
			in:   "",
			want: ".",
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			p := &parser.PCAPParser{}
			if got := p.GetUUID(c.in); got != c.want {
				t.Errorf("PCAPParser.GetUUID() got = %v, want %v", got, c.want)
			}
		})
	}
}
1 change: 1 addition & 0 deletions parser/testdata/PCAP/badfile.badextension
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
baddata
Binary file not shown.
4 changes: 4 additions & 0 deletions schema/pcap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package schema
import (
"cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
"github.com/m-lab/etl/row"
"github.com/m-lab/go/cloud/bqx"
)

Expand All @@ -11,6 +12,9 @@ type PCAPRow struct {
ID string `bigquery:"id"`
Parser ParseInfo `bigquery:"parser"`
Date civil.Date `bigquery:"date"`

// NOT part of struct schema. Included only to provide a fake annotator interface.
row.NullAnnotator `bigquery:"-"`
}

// Schema returns the Bigquery schema for Pcap.
Expand Down

0 comments on commit c49fd6b

Please sign in to comment.