Add alpha functionality to pcap parser #1028

Closed. Wants to merge 65 commits.

Commits (65)

1449fbc  add top level packet parsing (Nov 5, 2021)
2213103  improve seq/ack handling (Nov 8, 2021)
03ae66e  refinements (Nov 8, 2021)
134ab62  log refinement (Nov 8, 2021)
cb4eead  comment out debug and fix unit test (Nov 8, 2021)
9b5c2ee  ECE and Retransmits (Nov 8, 2021)
22be0e8  questionable delta code (Nov 8, 2021)
c80ac8d  hacking ip header (Nov 9, 2021)
8ad1320  add tcp packet header specific package (Nov 9, 2021)
b4a356b  hook to new tcp package (Nov 9, 2021)
48d908d  fix unit test (Nov 9, 2021)
b6dd49e  disable alpha output for testing (Nov 9, 2021)
7633dba  remove log.Fatal (Nov 9, 2021)
a41ad87  improve bad delta message (Nov 9, 2021)
ef4ef4e  fix optionCount index bug (Nov 9, 2021)
bf5cc36  check for illegal option type (Nov 9, 2021)
29e9295  cleanup and tests for Tracker (Nov 10, 2021)
11fe723  add ack gap tracking (Nov 10, 2021)
02b4733  fix unit test (Nov 10, 2021)
0e44fd6  improve window and gap handling (Nov 10, 2021)
2d57855  check for window limit (Nov 10, 2021)
97d34f1  fix window limit logging (Nov 10, 2021)
5332844  reduce log spam for now (Nov 10, 2021)
52ba201  increase max_active (Nov 10, 2021)
5d74cd3  use info logger, and sparse logger (Nov 10, 2021)
6d0d9b1  implement protocol violation check (Nov 10, 2021)
1e6a49d  tweak sparse logger (Nov 10, 2021)
f68e9ea  back to 100 max_active (Nov 10, 2021)
366f3e6  tweak some names (Nov 10, 2021)
cdaf7c1  add a test to exercise more code paths (Nov 11, 2021)
3aecc91  even less logging (Nov 11, 2021)
cd1e7fe  improve logging on poller errors (Nov 12, 2021)
3e5e505  improve poller logging and intercept iterator.Done (Nov 12, 2021)
c2a7537  fix unit test (Nov 12, 2021)
692909d  nocopy and better log message (Nov 12, 2021)
d037313  allocate more memory to maybe reduce gc workload (Nov 13, 2021)
257b6d0  add lazy decoding (Nov 13, 2021)
5944eaa  add counter and logging to throttle for debugging (Nov 13, 2021)
39aa56b  add log on throttle error (Nov 13, 2021)
fcc8af6  skip tcp for now (Nov 14, 2021)
3933abd  fix continue bug (Nov 14, 2021)
8151553  cleanup and fix off by one alpha.Packet bug (Nov 14, 2021)
ec4466d  partial refactor (Nov 14, 2021)
ecff217  add second parse test with protocol violations (Nov 14, 2021)
06b7b67  improved trivial test coverage (Nov 14, 2021)
912196d  more refactoring (Nov 14, 2021)
1c5f16c  more refactoring (Nov 15, 2021)
0d00fe8  more refactoring (Nov 15, 2021)
e7bd370  comment about token bucket validation (Nov 15, 2021)
974a314  refactor ip layer code (Nov 15, 2021)
01b277d  extensive refactoring to separate tcp state from stats (Nov 15, 2021)
e838ee4  preprocess all the packets (Nov 18, 2021)
6e468a1  add seq number tracking and initial packet time offset (Nov 18, 2021)
b2d832c  Add one way jitter code. Incomplete - needs tickrate calc (Nov 18, 2021)
73b6a6b  defer IP parsing to later (Nov 18, 2021)
a7904c1  remove throttle diagnostics (Nov 18, 2021)
c69307c  add bounds check for timestamp option (Nov 18, 2021)
fe4b616  include alpha in schema, write out jitter logs (Nov 19, 2021)
a06b4d0  fix divide by zero (Nov 19, 2021)
32d2b5e  change to Exp (alpha cant be reused for 7 days) (Nov 19, 2021)
c7c7d27  use len(OptionData) instead of OptionLength (Nov 19, 2021)
f795e98  add panic handling (Nov 19, 2021)
9038e03  Add hopannotation1 schema to legacy ndt/traceroute table schema (cristinaleonr, Nov 19, 2021)
4d39052  fix json label for exp_a (Nov 19, 2021)
fe76fed  fix duplicate Delay assignment (Nov 20, 2021)

Files changed

10 changes: 5 additions & 5 deletions active/active.go
@@ -27,7 +27,7 @@ import (
"github.com/m-lab/go/logx"
)

var debug = logx.Debug
var dbg = logx.Debug

/* Discussion:
What should happen if:
@@ -78,7 +78,7 @@ func (c *Context) Err() error {

// Fail cancels the context, and sets the result of context.Err()
func (c *Context) Fail(err error) {
debug.Output(2, "stopping")
dbg.Output(2, "stopping")
c.lock.Lock()
defer c.lock.Unlock()
c.otherErr = err
@@ -154,7 +154,7 @@ func (src *GCSSource) Next(ctx context.Context) (Runnable, error) {
if ok {
return next, nil
}
debug.Println("iterator.Done")
dbg.Println("iterator.Done")
return nil, iterator.Done
}
}
@@ -181,7 +181,7 @@ func (src *GCSSource) streamToPending(ctx context.Context, job tracker.Job) {
skipCount := dataType.SkipCount()

for _, f := range files {
debug.Println(f)
dbg.Println(f)
if f == nil {
log.Println("Nil file!!")
metrics.ActiveErrors.WithLabelValues(src.Label(), "nil file").Inc()
@@ -194,7 +194,7 @@ }
}

if index%(skipCount+1) == 0 {
debug.Printf("Adding gs://%s/%s", f.Bucket, f.Name)
dbg.Printf("Adding gs://%s/%s", f.Bucket, f.Name)
// Blocks until consumer reads channel.
src.pendingChan <- src.toRunnable(f)
}
1 change: 1 addition & 0 deletions active/active_test.go
@@ -33,6 +33,7 @@ func init() {
// Always prepend the filename and line number.
log.SetFlags(log.LstdFlags | log.Lshortfile)
logx.LogxDebug.Set("true")
logx.Debug.SetFlags(log.LstdFlags | log.Lshortfile)
}

type counter struct {
39 changes: 30 additions & 9 deletions active/poller.go
@@ -16,6 +16,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/sync/errgroup"
"google.golang.org/api/iterator"
"google.golang.org/api/option"

job "github.com/m-lab/etl-gardener/client"
@@ -95,36 +96,48 @@ func postAndIgnoreResponse(ctx context.Context, url url.URL) error {
// or the context is canceled.
func (g *GardenerAPI) RunAll(ctx context.Context, rSrc RunnableSource, job tracker.Job) (*errgroup.Group, error) {
eg := &errgroup.Group{}
count := 0
for {
run, err := rSrc.Next(ctx)
if err != nil {
metrics.BackendFailureCount.WithLabelValues(
job.Datatype, "rSrc.Next").Inc()
log.Println(err)
return eg, err
if err == iterator.Done {
log.Printf("Dispatched total of %d archives for %s\n", count, job.String())
return eg, nil
} else {
metrics.BackendFailureCount.WithLabelValues(
job.Datatype, "rSrc.Next").Inc()
log.Println(err, "processing", job.String())
return eg, err
}
}

heartbeat := tracker.HeartbeatURL(g.trackerBase, job)
if postErr := postAndIgnoreResponse(ctx, *heartbeat); postErr != nil {
log.Println(postErr, "on heartbeat for", job.Path())
}

debug.Println("Starting func")
dbg.Println("Starting func")

f := func() error {
f := func() (err error) {
metrics.ActiveTasks.WithLabelValues(rSrc.Label()).Inc()
defer metrics.ActiveTasks.WithLabelValues(rSrc.Label()).Dec()

err := run.Run(ctx)
// Capture any panic and convert it to an error.
defer func(tag string) {
err = metrics.PanicToErr(err, recover(), "Runall.f")
}(run.Info())

err = run.Run(ctx)
if err == nil {
update := tracker.UpdateURL(g.trackerBase, job, tracker.Parsing, run.Info())
if postErr := postAndIgnoreResponse(ctx, *update); postErr != nil {
log.Println(postErr, "on update for", job.Path())
}
}
return err
return
}

count++
eg.Go(f)
}
}
@@ -171,12 +184,14 @@ func (g *GardenerAPI) pollAndRun(ctx context.Context,
toRunnable func(o *storage.ObjectAttrs) Runnable, tokens TokenSource) error {
job, err := g.NextJob(ctx)
if err != nil {
log.Println(err, "on Gardener client.NextJob()")
return err
}

log.Println(job, "filter:", job.Filter)
gcsSource, err := g.JobFileSource(ctx, job.Job, toRunnable)
if err != nil {
log.Println(err, "on JobFileSource")
return err
}
src := Throttle(gcsSource, tokens)
@@ -189,12 +204,18 @@ }
}

eg, err := g.RunAll(ctx, src, job.Job)
if err != nil {
log.Println(err)
}

// Once all are dispatched, we want to wait until all have completed
// before posting the state change.
go func() {
log.Println("all tasks dispatched for", job.Path())
eg.Wait()
err := eg.Wait()
if err != nil {
log.Println(err, "on wait for", job.Path())
}
log.Println("finished", job.Path())
update := tracker.UpdateURL(g.trackerBase, job.Job, tracker.ParseComplete, "")
// TODO - should this have a retry?
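
Note on the RunAll change above: the worker closure now uses a named return value plus a deferred recover, so a panic inside run.Run is converted into an ordinary error instead of killing the poller. A minimal, standalone sketch of that pattern (panicToErr below is a hypothetical stand-in for metrics.PanicToErr, whose body is not shown in this diff):

package main

import (
	"fmt"
	"log"
)

// panicToErr keeps an existing error, or converts a recovered panic value
// into an error tagged with the caller's label.
func panicToErr(err error, r interface{}, tag string) error {
	if r != nil {
		log.Printf("%s: recovered panic: %v", tag, r)
		return fmt.Errorf("%s: panic: %v", tag, r)
	}
	return err
}

// runTask demonstrates the named-return + deferred-recover pattern used in RunAll.
func runTask(run func() error) (err error) {
	defer func() {
		// Assigning to the named return lets the caller see the converted panic.
		err = panicToErr(err, recover(), "runTask")
	}()
	err = run()
	return
}

func main() {
	err := runTask(func() error { panic("boom") })
	fmt.Println("got:", err) // the panic surfaces as an error, not a crash
}
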
10 changes: 9 additions & 1 deletion active/poller_test.go
@@ -12,10 +12,18 @@ import (

"github.com/m-lab/etl-gardener/tracker"
"github.com/m-lab/etl/active"
"github.com/m-lab/go/logx"
"github.com/m-lab/go/rtx"
"google.golang.org/api/iterator"
)

func init() {
// Always prepend the filename and line number.
log.SetFlags(log.LstdFlags | log.Lshortfile)
logx.LogxDebug.Set("true")
logx.Debug.SetFlags(log.LstdFlags | log.Lshortfile)
}

type fakeGardener struct {
t *testing.T // for logging

@@ -129,7 +137,7 @@ func TestGardenerAPI_RunAll(t *testing.T) {
src, err := g.JobFileSource(ctx, job.Job, p.toRunnable)
rtx.Must(err, "file source")
eg, err := g.RunAll(ctx, src, job.Job)
if err != iterator.Done {
if err != nil {
t.Fatal(err)
}
err = eg.Wait()
7 changes: 5 additions & 2 deletions active/throttle.go
@@ -2,6 +2,7 @@ package active

import (
"context"
"log"

"golang.org/x/sync/semaphore"
)
@@ -19,7 +20,8 @@ type wsTokenSource struct {

// Acquire acquires an admission token.
func (ts *wsTokenSource) Acquire(ctx context.Context) error {
return ts.sem.Acquire(ctx, 1)
err := ts.sem.Acquire(ctx, 1)
return err
}

// Release releases an admission token.
@@ -29,7 +31,7 @@ func (ts *wsTokenSource) Release() {

// NewWSTokenSource returns a TokenSource based on semaphore.Weighted.
func NewWSTokenSource(n int) TokenSource {
return &wsTokenSource{semaphore.NewWeighted(int64(n))}
return &wsTokenSource{sem: semaphore.NewWeighted(int64(n))}
}

// throttedSource encapsulates a Source and a throttling mechanism.
@@ -56,6 +58,7 @@ func (ts *throttledSource) Next(ctx context.Context) (Runnable, error) {
// We want Next to block here until a throttle token is available.
err := ts.throttle.Acquire(ctx)
if err != nil {
log.Println("Throttle.Acquire error", err)
return nil, err
}
next, err := ts.RunnableSource.Next(ctx)
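
The throttle above is a thin wrapper over golang.org/x/sync/semaphore: Acquire blocks inside Next until one of n admission tokens is free, and Release returns it when a task finishes. A rough illustration of the same mechanism in isolation (the task loop and counts below are made up for the example, not this package's API):

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

func main() {
	ctx := context.Background()
	sem := semaphore.NewWeighted(3) // at most 3 tasks in flight, analogous to max_active

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		// Blocks until a token is available, or returns an error if ctx is canceled.
		if err := sem.Acquire(ctx, 1); err != nil {
			fmt.Println("acquire failed:", err)
			break
		}
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			defer sem.Release(1) // hand the admission token back when done
			fmt.Println("processing task", n)
		}(i)
	}
	wg.Wait()
}
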
1 change: 1 addition & 0 deletions go.mod
@@ -14,6 +14,7 @@ require (
github.com/go-test/deep v1.0.7
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/go-jsonnet v0.17.0
github.com/google/gopacket v1.1.19 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/google-cloud-go-testing v0.0.0-20210719221736-1c9a4c676720
github.com/kr/pretty v0.2.1
2 changes: 2 additions & 0 deletions go.sum
@@ -184,6 +184,8 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-jsonnet v0.17.0 h1:/9NIEfhK1NQRKl3sP2536b2+x5HnZMdql7x3yK/l8JY=
github.com/google/go-jsonnet v0.17.0/go.mod h1:sOcuej3UW1vpPTZOr8L7RQimqai1a57bt5j22LzGZCw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
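
The go.mod/go.sum change above adds github.com/google/gopacket to the module graph alongside the new pcap/tcp work. The PR's own tcp parsing package is not part of this diff, so purely as an assumed illustration of gopacket-based decoding (not this repository's code; the file name is hypothetical):

package main

import (
	"bytes"
	"fmt"
	"log"
	"os"

	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/google/gopacket/pcapgo"
)

func main() {
	// Hypothetical capture file; in the parser the raw bytes come from the archive instead.
	raw, err := os.ReadFile("testdata/sample.pcap")
	if err != nil {
		log.Fatal(err)
	}
	r, err := pcapgo.NewReader(bytes.NewReader(raw))
	if err != nil {
		log.Fatal(err)
	}
	for {
		data, ci, err := r.ReadPacketData()
		if err != nil {
			break // io.EOF once the capture is exhausted
		}
		// Lazy decoding defers per-layer parsing until a layer is actually requested.
		pkt := gopacket.NewPacket(data, layers.LayerTypeEthernet, gopacket.Lazy)
		if l := pkt.Layer(layers.LayerTypeTCP); l != nil {
			tcp := l.(*layers.TCP)
			fmt.Println(ci.Timestamp, "seq:", tcp.Seq, "ack:", tcp.Ack)
		}
	}
}
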
6 changes: 3 additions & 3 deletions k8s/data-processing/deployments/parser.yml
@@ -39,7 +39,7 @@ spec:
args: ["--prometheusx.listen-address=:9090",
"--output=gcs",
"--service_port=:8080", # If we move to jsonnet, this could be bound to service-port defined below
"--max_active=100",
"--max_active=100", # pcap parsing reduces utilization. Hopefully this will restore it.
]
env:
- name: GCLOUD_PROJECT
@@ -79,10 +79,10 @@ spec:

resources:
requests:
memory: "15Gi"
memory: "35Gi"
cpu: "15"
limits:
memory: "20Gi"
memory: "50Gi"
cpu: "15"

nodeSelector:
Expand Down
8 changes: 8 additions & 0 deletions parser/parser.go
@@ -78,6 +78,14 @@ func NormalizeIP(ip string) string {
return n.String()
}

// GetHopID creates a unique identifier to join Hop Annotations
// with traceroute datasets.
func GetHopID(cycleStartTime float64, hostname string, address string) string {
traceStartTime := time.Unix(int64(cycleStartTime), 0).UTC()
date := traceStartTime.Format("20060102")
return fmt.Sprintf("%s_%s_%s", date, hostname, address)
}

// NewSinkParser creates an appropriate parser for a given data type.
// Eventually all datatypes will use this instead of NewParser.
func NewSinkParser(dt etl.DataType, sink row.Sink, table string, ann api.Annotator) etl.Parser {
32 changes: 32 additions & 0 deletions parser/parser_test.go
@@ -112,6 +112,38 @@ func TestNormalizeIP(t *testing.T) {
}
}

func TestGetHopID(t *testing.T) {
tests := []struct {
name string
cycleStartTime float64
hostname string
address string
want string
}{
{
name: "success",
cycleStartTime: float64(1566691268),
hostname: "ndt-plh7v",
address: "2001:550:3::1ca",
want: "20190825_ndt-plh7v_2001:550:3::1ca",
},
{
name: "unix-start",
cycleStartTime: float64(0),
hostname: "ndt-plh7v",
address: "2001:550:3::1ca",
want: "19700101_ndt-plh7v_2001:550:3::1ca",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := parser.GetHopID(tt.cycleStartTime, tt.hostname, tt.address); got != tt.want {
t.Errorf("GetHopID() = %v, want %v", got, tt.want)
}
})
}
}

//------------------------------------------------------------------------------------
// TestParser ignores the content, returns a MapSaver containing meta data and
// "testname":"..."
20 changes: 15 additions & 5 deletions parser/pcap.go
@@ -12,6 +12,7 @@ import (
"github.com/m-lab/etl/metrics"
"github.com/m-lab/etl/row"
"github.com/m-lab/etl/schema"
"github.com/m-lab/etl/tcp"
)

//=====================================================================================
@@ -56,6 +57,13 @@ func (p *PCAPParser) ParseAndInsert(fileMetadata map[string]bigquery.Value, test
metrics.WorkerState.WithLabelValues(p.TableName(), "pcap").Inc()
defer metrics.WorkerState.WithLabelValues(p.TableName(), "pcap").Dec()

//log.Println(testName)
tcp := tcp.NewParser()
alpha, err := tcp.Parse(rawContent)
if err != nil {
return err
}

row := schema.PCAPRow{
Parser: schema.ParseInfo{
Version: Version(),
@@ -64,6 +72,12 @@ func (p *PCAPParser) ParseAndInsert(fileMetadata map[string]bigquery.Value, test
Filename: testName,
GitCommit: GitCommit(),
},

Exp: alpha,
}

if err := p.Put(&row); err != nil {
return err
}

// NOTE: Civil is not TZ adjusted. It takes the year, month, and date from
@@ -73,11 +87,7 @@ func (p *PCAPParser) ParseAndInsert(fileMetadata map[string]bigquery.Value, test
row.Date = fileMetadata["date"].(civil.Date)
row.ID = p.GetUUID(testName)

// Insert the row.
if err := p.Put(&row); err != nil {
return err
}

// log.Println(count, "packets", sacks, "sacks", optionCounts, optionNames)
// Count successful inserts.
metrics.TestCount.WithLabelValues(p.TableName(), "pcap", "ok").Inc()

27 changes: 27 additions & 0 deletions parser/pcap_test.go
@@ -5,6 +5,7 @@ import (
"path"
"strings"
"testing"
"time"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
@@ -57,6 +58,32 @@ func TestPCAPParser_ParseAndInsert(t *testing.T) {
ID: "ndt-4c6fb_1625899199_000000000121C1A0",
Parser: expectedParseInfo,
Date: date,

Exp: &schema.AlphaFields{
SynPacket: 0,
SynTime: time.Date(2021, 07, 21, 00, 00, 01, 181050000, time.UTC),
SynAckPacket: 1,
SynAckTime: time.Date(2021, 07, 21, 00, 00, 01, 181063000, time.UTC),
Packets: 18240,

LeftStats: schema.TcpStats{
OptionCounts: []int64{0, 55673, 3, 3, 3, 755, 0, 0, 27083, 0, 0, 0, 0, 0, 0, 0},
WindowChanges: 2,
BadSacks: 2077,
Delay: 0.0004516243614685919,
Jitter: 0.004294499982849156,
TickInterval: 0.009988213,
},
RightStats: schema.TcpStats{
OptionCounts: []int64{0, 58291, 3, 3, 3, 1510, 0, 0, 27637, 0, 0, 0, 0, 0, 0, 0},
WindowChanges: 1157,
RetransmitPackets: 39,
RetransmitBytes: 46332,
Delay: -0.11833836178341439, // TODO - these are goofy values.
Jitter: 2.7687400015472363,
TickInterval: 1.8e-08,
},
},
}

if diff := deep.Equal(row, &expectedPCAPRow); diff != nil {