Skip to content

Commit

Permalink
Add hopannotation1 to paris traceroute (#1047)
Browse files Browse the repository at this point in the history
* Add hopannotation1 to paris traceroute

* Update comment

* Fix idiom

* Fix comment

* Use filepath hostname

* Undo test change

* Use etl.ValidateTestPath()

* Reuse existing Datapath

* Formatting

* Refactor code
  • Loading branch information
cristinaleonr authored Feb 3, 2022
1 parent 84a2778 commit 84187df
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 9 deletions.
27 changes: 21 additions & 6 deletions parser/pt.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,8 @@ func NewPTParser(ins etl.Inserter, ann ...v2as.Annotator) *PTParser {
}

// ProcessAllNodes take the array of the Nodes, and generate one ScamperHop entry from each node.
func ProcessAllNodes(allNodes []Node, server_IP, protocol string, tableName string) []schema.ScamperHop {
func ProcessAllNodes(allNodes []Node, server_IP, protocol string, tableName string,
logTime time.Time, machine string) []schema.ScamperHop {
var results []schema.ScamperHop
if len(allNodes) == 0 {
return nil
Expand All @@ -399,6 +400,7 @@ func ProcessAllNodes(allNodes []Node, server_IP, protocol string, tableName stri
source := schema.HopIP{
IP: server_IP,
}
source.HopAnnotation1 = getParisHopAnnotation(logTime, machine, source.IP)
oneHop := schema.ScamperHop{
Source: source,
Links: links,
Expand All @@ -410,6 +412,7 @@ func ProcessAllNodes(allNodes []Node, server_IP, protocol string, tableName stri
IP: allNodes[i].parent_ip,
Hostname: allNodes[i].parent_hostname,
}
source.HopAnnotation1 = getParisHopAnnotation(logTime, machine, source.IP)
oneHop := schema.ScamperHop{
Source: source,
Links: links,
Expand All @@ -420,6 +423,16 @@ func ProcessAllNodes(allNodes []Node, server_IP, protocol string, tableName stri
return results
}

// getParisHopAnnotation() returns returns a new `*hopannotation.HopAnnotation1` to use
// as a synthetic annotation for paris-traceroute hops.
func getParisHopAnnotation(logTime time.Time, machine string, IP string) *hopannotation.HopAnnotation1 {
hopID := GetHopID(float64(logTime.UTC().Unix()), machine, IP)
return &hopannotation.HopAnnotation1{
ID: hopID,
Timestamp: logTime,
}
}

// This function was designed for hops with multiple flows. When the source IP are duplicate flows, but the destination IP is
// single flow IP, those hops will result in just one node in the list.
func Unique(oneNode Node, list []Node) bool {
Expand Down Expand Up @@ -587,12 +600,12 @@ func (pt *PTParser) ParseAndInsert(meta map[string]bigquery.Value, testName stri
return nil
}

// ArchiveURL must already be valid, so error is safe to ignore.
dp, _ := etl.ValidateTestPath(pt.taskFileName)
// Process the jsonl output of Scamper binary.
if strings.HasSuffix(testName, ".jsonl") {
ptTest, err := ParseJSONL(testName, rawContent, pt.TableName(), pt.taskFileName)
if err == nil {
// ArchiveURL must already be valid, so error is safe to ignore.
dp, _ := etl.ValidateTestPath(pt.taskFileName)
ptTest.ServerX.Site = dp.Site
ptTest.ServerX.Machine = dp.Host

Expand All @@ -611,7 +624,7 @@ func (pt *PTParser) ParseAndInsert(meta map[string]bigquery.Value, testName stri
}

// Process the legacy Paris Traceroute txt output
cachedTest, err := Parse(meta, testName, testId, rawContent, pt.TableName())
cachedTest, err := Parse(meta, testName, testId, rawContent, pt.TableName(), dp)
if err != nil {
// These are happening at a high rate, so demote them to warnings until we can fix them.
metrics.WarningCount.WithLabelValues(
Expand Down Expand Up @@ -786,7 +799,8 @@ func ProcessOneTuple(parts []string, protocol string, currentLeaves []Node, allN

// Parse the raw test file into hops ParisTracerouteHop.
// TODO(dev): dedup the hops that are identical.
func Parse(meta map[string]bigquery.Value, testName string, testId string, rawContent []byte, tableName string) (cachedPTData, error) {
func Parse(meta map[string]bigquery.Value, testName string, testId string, rawContent []byte,
tableName string, dp etl.DataPath) (cachedPTData, error) {
//log.Printf("%s", testName)
metrics.WorkerState.WithLabelValues(tableName, "pt-parse").Inc()
defer metrics.WorkerState.WithLabelValues(tableName, "pt-parse").Dec()
Expand Down Expand Up @@ -911,8 +925,9 @@ func Parse(meta map[string]bigquery.Value, testName string, testId string, rawCo
metrics.PTBitsAwayFromDestV6.WithLabelValues(iataCode).Observe(float64(bitsDiff))
}

machine := fmt.Sprintf("%s-%s", dp.Host, dp.Site)
// Generate Hops from allNodes
PTHops := ProcessAllNodes(allNodes, serverIP, protocol, tableName)
PTHops := ProcessAllNodes(allNodes, serverIP, protocol, tableName, logTime, machine)

source := schema.ServerInfo{
IP: serverIP,
Expand Down
14 changes: 11 additions & 3 deletions parser/pt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/go-test/deep"
"github.com/m-lab/annotation-service/api"
v2 "github.com/m-lab/annotation-service/api/v2"
"github.com/m-lab/etl/etl"
"github.com/m-lab/etl/parser"
"github.com/m-lab/etl/schema"
"github.com/m-lab/traceroute-caller/hopannotation"
Expand Down Expand Up @@ -242,7 +243,7 @@ func TestParseLegacyFormatData(t *testing.T) {
fmt.Println("cannot load test data")
return
}
cachedTest, err := parser.Parse(nil, "testdata/PT/20160112T00:45:44Z_ALL27409.paris", "", rawData, "pt-daily")
cachedTest, err := parser.Parse(nil, "testdata/PT/20160112T00:45:44Z_ALL27409.paris", "", rawData, "pt-daily", etl.DataPath{})
if err != nil {
t.Fatalf(err.Error())
}
Expand Down Expand Up @@ -340,7 +341,9 @@ func TestParseJSONL(t *testing.T) {

func TestParse(t *testing.T) {
rawData, err := ioutil.ReadFile("testdata/PT/20170320T23:53:10Z-172.17.94.34-33456-74.125.224.100-33457.paris")
cachedTest, err := parser.Parse(nil, "testdata/PT/20170320T23:53:10Z-172.17.94.34-33456-74.125.224.100-33457.paris", "", rawData, "pt-daily")
dp, _ := etl.ValidateTestPath("gs://archive-measurement-lab/paris-traceroute/2017/03/20/20170320T000000Z-mlab1-lax05-paris-traceroute-0000.tgz")
cachedTest, err := parser.Parse(nil, "testdata/PT/20170320T23:53:10Z-172.17.94.34-33456-74.125.224.100-33457.paris", "", rawData,
"pt-daily", dp)
if err != nil {
t.Fatalf(err.Error())
}
Expand All @@ -363,6 +366,10 @@ func TestParse(t *testing.T) {
City: "",
CountryCode: "",
Hostname: "sr05-te1-8.nuq04.net.google.com",
HopAnnotation1: &hopannotation.HopAnnotation1{
ID: "20170320_mlab1-lax05_64.233.174.109",
Timestamp: cachedTest.LogTime,
},
},
Linkc: 0,
Links: []schema.HopLink{
Expand Down Expand Up @@ -623,7 +630,8 @@ func TestParseEmpty(t *testing.T) {
fmt.Println("cannot load test data")
return
}
_, parseErr := parser.Parse(nil, "testdata/20180201T07:57:37Z-125.212.217.215-56622-208.177.76.115-9100.paris", "", rawData, "pt-daily")
_, parseErr := parser.Parse(nil, "testdata/20180201T07:57:37Z-125.212.217.215-56622-208.177.76.115-9100.paris", "", rawData, "pt-daily",
etl.DataPath{})
if parseErr == nil {
t.Fatal(parseErr)
}
Expand Down

0 comments on commit 84187df

Please sign in to comment.