From f7a85dcdffbd57a6abc2429289140ca7a77a803b Mon Sep 17 00:00:00 2001 From: Mike Ross Date: Thu, 20 Jun 2024 07:56:06 -0500 Subject: [PATCH 1/2] feat: add StringWithConcurrency function --- pkg/file/file.go | 1 + pkg/file/file_instance.go | 40 +++++++++++++++++++++++++++++++++++---- pkg/file/file_test.go | 3 +++ 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/pkg/file/file.go b/pkg/file/file.go index 617fd02..1b339df 100644 --- a/pkg/file/file.go +++ b/pkg/file/file.go @@ -33,6 +33,7 @@ type File interface { Parse(record []byte) error Bytes() []byte String(newline bool) string + StringWithConcurrency(newline bool, concurrency int) string Validate() error } diff --git a/pkg/file/file_instance.go b/pkg/file/file_instance.go index 80c69fe..c4b6409 100644 --- a/pkg/file/file_instance.go +++ b/pkg/file/file_instance.go @@ -8,8 +8,10 @@ import ( "encoding/json" "errors" "fmt" + "math" "reflect" "strings" + "sync" "unicode" "github.com/moov-io/base/log" @@ -254,6 +256,15 @@ func (f *fileInstance) Parse(record []byte) error { // String writes the File struct to raw string. func (f *fileInstance) String(isNewLine bool) string { + return f.StringWithConcurrency(isNewLine, 1) +} + +// StringWithConcurrency augments String with a given number of concurrent goroutines. +func (f *fileInstance) StringWithConcurrency(isNewLine bool, concurrency int) string { + if concurrency < 1 { + concurrency = 1 + } + var buf strings.Builder newLine := "" @@ -266,18 +277,39 @@ func (f *fileInstance) String(isNewLine bool) string { // Data Block data := "" - for _, base := range f.Bases { - data += base.String() + newLine + pageSize := int(math.Ceil(float64(len(f.Bases)) / float64(concurrency))) + basePages := [][]lib.Record{} + dataPages := make([]string, concurrency) + for i := 0; i < len(f.Bases); i += pageSize { + end := i + pageSize + if end > len(f.Bases) { + end = len(f.Bases) + } + basePages = append(basePages, f.Bases[i:end]) + } + var wg sync.WaitGroup + for i, page := range basePages { + wg.Add(1) + go func(idx int, page []lib.Record) { + defer wg.Done() + data := "" + for _, base := range page { + data += base.String() + newLine + } + dataPages[idx] = data + }(i, page) + } + wg.Wait() + for _, page := range dataPages { + data += page } // Trailer Block trailer := f.Trailer.String() - buf.Grow(len(header) + len(data) + len(trailer)) buf.WriteString(header) buf.WriteString(data) buf.WriteString(trailer) - return buf.String() } diff --git a/pkg/file/file_test.go b/pkg/file/file_test.go index 415cae1..3bdd5b0 100644 --- a/pkg/file/file_test.go +++ b/pkg/file/file_test.go @@ -77,6 +77,7 @@ func (t *FileTest) TestJsonWithUnpackedVariableBlocked(c *check.C) { rawStr := strings.ReplaceAll(string(raw), "\r\n", "\n") c.Assert(strings.Compare(f.String(true), rawStr), check.Equals, 0) + c.Assert(strings.Compare(f.StringWithConcurrency(true, 2), rawStr), check.Equals, 0) buf, err := json.Marshal(f) c.Assert(err, check.IsNil) @@ -320,6 +321,7 @@ func (t *FileTest) TestCreateFile(c *check.C) { c.Assert(err, check.IsNil) c.Assert(strings.Compare(f.String(false), string(raw)), check.Equals, 0) + c.Assert(strings.Compare(f.StringWithConcurrency(false, 2), string(raw)), check.Equals, 0) } func (t *FileTest) TestNewFileFromReader(c *check.C) { @@ -333,6 +335,7 @@ func (t *FileTest) TestNewFileFromReader(c *check.C) { c.Assert(err, check.IsNil) c.Assert(strings.Compare(f.String(false), string(raw)), check.Equals, 0) + c.Assert(strings.Compare(f.StringWithConcurrency(false, 2), string(raw)), check.Equals, 0) } func (t *FileTest) TestCreateFileFailed(c *check.C) { From 7f13ac70d6cad83fb109febdabb65aa13208bca4 Mon Sep 17 00:00:00 2001 From: Mike Ross Date: Thu, 20 Jun 2024 08:11:12 -0500 Subject: [PATCH 2/2] re-add newlines --- pkg/file/file_instance.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/file/file_instance.go b/pkg/file/file_instance.go index c4b6409..5efdd14 100644 --- a/pkg/file/file_instance.go +++ b/pkg/file/file_instance.go @@ -306,10 +306,12 @@ func (f *fileInstance) StringWithConcurrency(isNewLine bool, concurrency int) st // Trailer Block trailer := f.Trailer.String() + buf.Grow(len(header) + len(data) + len(trailer)) buf.WriteString(header) buf.WriteString(data) buf.WriteString(trailer) + return buf.String() }