Skip to content

Commit

Permalink
Add ArchiveSize and FileSize to parser records (#1118)
Browse files Browse the repository at this point in the history
* Add Metadata.ArchiveSize and TestSource.GetSize()
* Add ArchiveSize and FileSize to ParseInfo
* Populate ArchiveSize and FileSize for all datatypes
  • Loading branch information
stephen-soltesz authored Jun 2, 2023
1 parent f4993ea commit df4bdda
Show file tree
Hide file tree
Showing 18 changed files with 109 additions and 71 deletions.
12 changes: 7 additions & 5 deletions etl/etl.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,12 @@ type InserterParams struct {

// Metadata provides metadata about the parser and archive files.
type Metadata struct {
Version string
ArchiveURL string
GitCommit string
Date civil.Date
Start time.Time
Version string
ArchiveURL string
GitCommit string
Date civil.Date
Start time.Time
ArchiveSize int64
}

// ErrHighInsertionFailureRate should be returned by TaskError when there are more than 10% BQ insertion errors.
Expand Down Expand Up @@ -153,6 +154,7 @@ type TestSource interface {
// Returns io.EOF when there are no more tests.
NextTest(maxSize int64) (string, []byte, error)
Close() error
GetSize() int64

Detail() string // Detail for logs.
Type() string // Data type for logs and metrics
Expand Down
12 changes: 7 additions & 5 deletions parser/annotation2.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ func (ap *Annotation2Parser) ParseAndInsert(meta etl.Metadata, testName string,

row := schema.Annotation2Row{
Parser: schema.ParseInfo{
Version: meta.Version,
Time: time.Now(),
ArchiveURL: meta.ArchiveURL,
Filename: testName,
GitCommit: meta.GitCommit,
Version: meta.Version,
Time: time.Now(),
ArchiveURL: meta.ArchiveURL,
Filename: testName,
GitCommit: meta.GitCommit,
ArchiveSize: meta.ArchiveSize,
FileSize: int64(len(test)),
},
}

Expand Down
1 change: 1 addition & 0 deletions parser/annotation2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func TestAnnotation2Parser_ParseAndInsert(t *testing.T) {
Filename: tt.file,
Priority: 0,
GitCommit: "12345678",
FileSize: int64(len(data)),
}

if diff := deep.Equal(row.Parser, expPI); diff != nil {
Expand Down
12 changes: 7 additions & 5 deletions parser/hopannotation2.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ func (p *HopAnnotation2Parser) ParseAndInsert(meta etl.Metadata, testName string

row := schema.HopAnnotation2Row{
Parser: schema.ParseInfo{
Version: meta.Version,
Time: time.Now(),
ArchiveURL: meta.ArchiveURL,
Filename: testName,
GitCommit: meta.GitCommit,
Version: meta.Version,
Time: time.Now(),
ArchiveURL: meta.ArchiveURL,
Filename: testName,
GitCommit: meta.GitCommit,
ArchiveSize: meta.ArchiveSize,
FileSize: int64(len(rawContent)),
},
}

Expand Down
23 changes: 13 additions & 10 deletions parser/hopannotation2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ func TestHopAnnotation2Parser_ParseAndInsert(t *testing.T) {
date := civil.Date{Year: 2021, Month: 07, Day: 30}

meta := etl.Metadata{
ArchiveURL: path.Join(hopAnnotation2GCSPath, hopAnnotation2Filename),
Date: date,
Version: parser.Version(),
GitCommit: parser.GitCommit(),
ArchiveURL: path.Join(hopAnnotation2GCSPath, hopAnnotation2Filename),
Date: date,
Version: parser.Version(),
GitCommit: parser.GitCommit(),
ArchiveSize: int64(len(data)),
}

if err := n.ParseAndInsert(meta, hopAnnotation2Filename, data); err != nil {
Expand All @@ -50,12 +51,14 @@ func TestHopAnnotation2Parser_ParseAndInsert(t *testing.T) {
row := ins.data[0].(*schema.HopAnnotation2Row)

expectedParseInfo := schema.ParseInfo{
Version: "https://github.com/m-lab/etl/tree/foobar",
Time: row.Parser.Time,
ArchiveURL: path.Join(hopAnnotation2GCSPath, hopAnnotation2Filename),
Filename: hopAnnotation2Filename,
Priority: 0,
GitCommit: "12345678",
Version: "https://github.com/m-lab/etl/tree/foobar",
Time: row.Parser.Time,
ArchiveURL: path.Join(hopAnnotation2GCSPath, hopAnnotation2Filename),
Filename: hopAnnotation2Filename,
Priority: 0,
GitCommit: "12345678",
ArchiveSize: int64(len(data)),
FileSize: int64(len(data)),
}

expectedGeolocation := annotator.Geolocation{
Expand Down
12 changes: 7 additions & 5 deletions parser/ndt5_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,13 @@ func (dp *NDT5ResultParser) ParseAndInsert(meta etl.Metadata, testName string, t
}

parser := schema.ParseInfo{
Version: meta.Version,
Time: time.Now(),
ArchiveURL: meta.ArchiveURL,
Filename: testName,
GitCommit: meta.GitCommit,
Version: meta.Version,
Time: time.Now(),
ArchiveURL: meta.ArchiveURL,
Filename: testName,
GitCommit: meta.GitCommit,
ArchiveSize: meta.ArchiveSize,
FileSize: int64(len(test)),
}
date := meta.Date

Expand Down
12 changes: 7 additions & 5 deletions parser/ndt7_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ func (dp *NDT7ResultParser) ParseAndInsert(meta etl.Metadata, testName string, t

row := schema.NDT7ResultRow{
Parser: schema.ParseInfo{
Version: meta.Version,
Time: time.Now(),
ArchiveURL: meta.ArchiveURL,
Filename: testName,
GitCommit: meta.GitCommit,
Version: meta.Version,
Time: time.Now(),
ArchiveURL: meta.ArchiveURL,
Filename: testName,
GitCommit: meta.GitCommit,
ArchiveSize: meta.ArchiveSize,
FileSize: int64(len(test)),
},
}

Expand Down
12 changes: 7 additions & 5 deletions parser/ndt7_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/m-lab/go/pretty"
)

func setupNDT7InMemoryParser(t *testing.T, testName string) (*schema.NDT7ResultRow, error) {
func setupNDT7InMemoryParser(t *testing.T, testName string) (*schema.NDT7ResultRow, int64, error) {
ins := newInMemorySink()
n := parser.NewNDT7ResultParser(ins, "test", "_suffix")

Expand All @@ -31,14 +31,14 @@ func setupNDT7InMemoryParser(t *testing.T, testName string) (*schema.NDT7ResultR
}
err = n.ParseAndInsert(meta, testName, resultData)
if err != nil {
return nil, err
return nil, 0, err
}
if n.Accepted() != 1 {
t.Fatal("Failed to insert snaplog data.", ins)
}
n.Flush()
row := ins.data[0].(*schema.NDT7ResultRow)
return row, err
return row, int64(len(resultData)), err
}

func TestNDT7ResultParser_ParseAndInsert(t *testing.T) {
Expand All @@ -58,7 +58,7 @@ func TestNDT7ResultParser_ParseAndInsert(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
row, err := setupNDT7InMemoryParser(t, tt.testName)
row, size, err := setupNDT7InMemoryParser(t, tt.testName)
if (err != nil) != tt.wantErr {
t.Errorf("NDT7ResultParser.ParseAndInsert() error = %v, wantErr %v", err, tt.wantErr)
}
Expand All @@ -82,6 +82,7 @@ func TestNDT7ResultParser_ParseAndInsert(t *testing.T) {
Filename: "ndt7-download-20200318T000657.568382877Z.ndt-knwp4_1583603744_000000000000590E.json",
Priority: 0,
GitCommit: "12345678",
FileSize: size,
}
if diff := deep.Equal(row.Parser, expPI); diff != nil {
pretty.Print(row.Parser)
Expand Down Expand Up @@ -121,6 +122,7 @@ func TestNDT7ResultParser_ParseAndInsert(t *testing.T) {
Filename: "ndt7-upload-20200318T001352.496224022Z.ndt-knwp4_1583603744_0000000000005CF2.json",
Priority: 0,
GitCommit: "12345678",
FileSize: size,
}
if diff := deep.Equal(row.Parser, expPI); diff != nil {
t.Errorf("NDT7ResultParser.ParseAndInsert() different summary: %s", strings.Join(diff, "\n"))
Expand Down Expand Up @@ -160,7 +162,7 @@ func TestNDT7ResultParser_ParseAndInsertUnsafe(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
row, err := setupNDT7InMemoryParser(t, tt.testName)
row, _, err := setupNDT7InMemoryParser(t, tt.testName)
if (err != nil) != tt.wantErr {
t.Errorf("NDT7ResultParser.ParseAndInsert() error = %v, wantErr %v", err, tt.wantErr)
}
Expand Down
12 changes: 7 additions & 5 deletions parser/pcap.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,13 @@ func (p *PCAPParser) ParseAndInsert(meta etl.Metadata, testName string, rawConte

row := schema.PCAPRow{
Parser: schema.ParseInfo{
Version: meta.Version,
Time: time.Now(),
ArchiveURL: meta.ArchiveURL,
Filename: testName,
GitCommit: meta.GitCommit,
Version: meta.Version,
Time: time.Now(),
ArchiveURL: meta.ArchiveURL,
Filename: testName,
GitCommit: meta.GitCommit,
ArchiveSize: meta.ArchiveSize,
FileSize: int64(len(rawContent)),
},
}

Expand Down
1 change: 1 addition & 0 deletions parser/pcap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestPCAPParser_ParseAndInsert(t *testing.T) {
Filename: pcapFilename,
Priority: 0,
GitCommit: "12345678",
FileSize: int64(len(data)),
}

expectedPCAPRow := schema.PCAPRow{
Expand Down
12 changes: 7 additions & 5 deletions parser/scamper1.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,13 @@ func (p *Scamper1Parser) ParseAndInsert(meta etl.Metadata, testName string, rawC
parseTracelb(&bqScamperOutput, scamperOutput.Tracelb)

parseInfo := schema.ParseInfo{
Version: meta.Version,
Time: time.Now(),
ArchiveURL: meta.ArchiveURL,
Filename: testName,
GitCommit: meta.GitCommit,
Version: meta.Version,
Time: time.Now(),
ArchiveURL: meta.ArchiveURL,
Filename: testName,
GitCommit: meta.GitCommit,
ArchiveSize: meta.ArchiveSize,
FileSize: int64(len(rawContent)),
}

row := schema.Scamper1Row{
Expand Down
1 change: 1 addition & 0 deletions parser/scamper1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func TestScamper1Parser_ParseAndInsert(t *testing.T) {

expectedRow := expectedScamper1Row()
expectedRow.Parser.Time = row.Parser.Time
expectedRow.Parser.FileSize = int64(len(data))
if diff := deep.Equal(row, &expectedRow); diff != nil {
t.Errorf("failed to extract correct row from file: different rows - %s", strings.Join(diff, "\n"))
}
Expand Down
12 changes: 7 additions & 5 deletions parser/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,13 @@ func (p *SwitchParser) ParseAndInsert(meta etl.Metadata, testName string, rawCon
ID: fmt.Sprintf("%s-%s-%d", machine, site, sample.Timestamp),
Date: archiveDate,
Parser: schema.ParseInfo{
Version: meta.Version,
Time: time.Now(),
ArchiveURL: meta.ArchiveURL,
Filename: testName,
GitCommit: meta.GitCommit,
Version: meta.Version,
Time: time.Now(),
ArchiveURL: meta.ArchiveURL,
Filename: testName,
GitCommit: meta.GitCommit,
ArchiveSize: meta.ArchiveSize,
FileSize: int64(len(rawContent)),
},
A: &schema.SwitchSummary{
Machine: machine,
Expand Down
12 changes: 7 additions & 5 deletions parser/tcpinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,13 @@ func (p *TCPInfoParser) ParseAndInsert(meta etl.Metadata, testName string, rawCo
FinalSnapshot: snaps[len(snaps)-1],
},
Parser: schema.ParseInfo{
Version: meta.Version,
Time: time.Now(),
ArchiveURL: meta.ArchiveURL,
Filename: testName,
GitCommit: meta.GitCommit,
Version: meta.Version,
Time: time.Now(),
ArchiveURL: meta.ArchiveURL,
Filename: testName,
GitCommit: meta.GitCommit,
ArchiveSize: meta.ArchiveSize,
FileSize: int64(len(rawContent)),
},
Date: meta.Date,
Raw: &snapshot.ConnectionLog{
Expand Down
4 changes: 4 additions & 0 deletions schema/descriptions/toplevel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ parser.Filename:
Results in the raw record are derived from measurements in this file.
parser.GitCommit:
Description: The git commit of this build of the parser.
parser.ArchiveSize:
Description: The original archive size as found in GCS.
parser.FileSize:
Description: The size of the file data provided to the parser for this row.

server:
Description: Location information about the M-Lab server that collected the
Expand Down
14 changes: 8 additions & 6 deletions schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (

// ParseInfo provides details about the parsed row. Uses 'Standard Column' names.
type ParseInfo struct {
Version string
Time time.Time
ArchiveURL string
Filename string
Priority int64
GitCommit string
Version string
Time time.Time
ArchiveURL string
Filename string
Priority int64
GitCommit string
ArchiveSize int64
FileSize int64
}

// ServerInfo details various kinds of information about the server.
Expand Down
5 changes: 5 additions & 0 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ func (src *GCSSource) Date() civil.Date {
return src.PathDate
}

// GetSize returns a size of the archive at the GCSSource archive path.
func (src *GCSSource) GetSize() int64 {
return src.Size
}

// NextTest reads the next test object from the tar file.
// Skips reading contents of any file larger than maxSize, returning empty data
// and storage.ErrOversizeFile.
Expand Down
11 changes: 6 additions & 5 deletions task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ func NewTask(archive string, src etl.TestSource, prsr etl.Parser, closer io.Clos
TestSource: src,
Parser: prsr,
meta: etl.Metadata{
Version: parser.Version(),
ArchiveURL: archive,
GitCommit: parser.GitCommit(),
Date: src.Date(),
Start: time.Now(),
Version: parser.Version(),
ArchiveURL: archive,
GitCommit: parser.GitCommit(),
Date: src.Date(),
Start: time.Now(),
ArchiveSize: src.GetSize(),
},
maxFileSize: DefaultMaxFileSize,
closer: closer,
Expand Down

0 comments on commit df4bdda

Please sign in to comment.