From ec05c8d94d1085ac61758664444c5986b8a20d8e Mon Sep 17 00:00:00 2001 From: Emil Valeev Date: Fri, 4 Oct 2024 01:38:45 +0500 Subject: [PATCH] refactor(examples:streams:zip/product), fix(backend:golang:tpl): newlines, fix(runtime:funcs:stream_product): concurrently receive form both streams, fix(runtime:msg:string): json marshal method (cases with one letter in stream product example) --- examples/stream_product/e2e_test.go | 39 +++++++------ examples/stream_product/main.neva | 25 +++------ examples/stream_zip/main.neva | 20 +++---- internal/compiler/backend/golang/tpl.go | 3 +- internal/runtime/funcs/stream_product.go | 70 ++++++++++++++++-------- internal/runtime/message.go | 8 ++- 6 files changed, 89 insertions(+), 76 deletions(-) diff --git a/examples/stream_product/e2e_test.go b/examples/stream_product/e2e_test.go index fa65d77f..56d1eeaf 100644 --- a/examples/stream_product/e2e_test.go +++ b/examples/stream_product/e2e_test.go @@ -1,6 +1,7 @@ package test import ( + "fmt" "os" "os/exec" "testing" @@ -8,6 +9,17 @@ import ( "github.com/stretchr/testify/require" ) +var expectedOutput = `{"first": 0, "second": 0} +{"first": 0, "second": 1} +{"first": 0, "second": 2} +{"first": 1, "second": 0} +{"first": 1, "second": 1} +{"first": 1, "second": 2} +{"first": 2, "second": 0} +{"first": 2, "second": 1} +{"first": 2, "second": 2} +` + func Test(t *testing.T) { err := os.Chdir("..") require.NoError(t, err) @@ -16,24 +28,15 @@ func Test(t *testing.T) { require.NoError(t, err) defer os.Chdir(wd) - cmd := exec.Command("neva", "run", "stream_product") + for i := 0; i < 1; i++ { + t.Run(fmt.Sprintf("Run %d", i+1), func(t *testing.T) { + cmd := exec.Command("neva", "run", "stream_product") - out, err := cmd.CombinedOutput() - require.NoError(t, err) - require.Equal( - t, - `{"first": 0, "second": 0} -{"first": 0, "second": 1} -{"first": 0, "second": 2} -{"first": 1, "second": 0} -{"first": 1, "second": 1} -{"first": 1, "second": 2} -{"first": 2, "second": 0} -{"first": 2, "second": 1} -{"first": 2, "second": 2} -`, - string(out), - ) + out, err := cmd.CombinedOutput() + require.NoError(t, err) + require.Equal(t, expectedOutput, string(out)) - require.Equal(t, 0, cmd.ProcessState.ExitCode()) + require.Equal(t, 0, cmd.ProcessState.ExitCode()) + }) + } } diff --git a/examples/stream_product/main.neva b/examples/stream_product/main.neva index fbf07d79..fd01629d 100644 --- a/examples/stream_product/main.neva +++ b/examples/stream_product/main.neva @@ -1,21 +1,12 @@ import { streams } flow Main(start) (stop) { - r1 Range - r2 Range - streams.Product - ForEach> { - Println> - } - Wait - --- - :start -> [ - (0 -> [r1:from, r2:from]), - (3 -> [r1:to, r2:to]) - ] - - r1 -> product:first - r2 -> product:second - - product -> forEach -> wait -> :stop + r1 Range, r2 Range + streams.Product + ForEach{Println}, Wait + --- + :start -> [(0 -> [r1:from, r2:from]), (3 -> [r1:to, r2:to])] + r1 -> product:first + r2 -> product:second + product -> forEach -> wait -> :stop } \ No newline at end of file diff --git a/examples/stream_zip/main.neva b/examples/stream_zip/main.neva index 291f7805..76e8581f 100644 --- a/examples/stream_zip/main.neva +++ b/examples/stream_zip/main.neva @@ -3,17 +3,11 @@ import { streams } const strings list = ['a', 'b', 'c'] flow Main(start) (stop) { - r1 Range - ListToStream - streams.Zip - ForEach{Println} - Wait - --- - :start -> [ - (0 -> r1:from), - (10 -> r1:to) - ] - r1 -> zip:first - $strings -> listToStream -> zip:second - zip -> forEach -> wait -> :stop + Range, ListToStream, streams.Zip + ForEach{Println}, Wait + --- + :start -> [(0 -> range:from), (10 -> range:to)] + range -> zip:first + $strings -> listToStream -> zip:second + zip -> forEach -> wait -> :stop } \ No newline at end of file diff --git a/internal/compiler/backend/golang/tpl.go b/internal/compiler/backend/golang/tpl.go index 08ff03a8..6b67b943 100644 --- a/internal/compiler/backend/golang/tpl.go +++ b/internal/compiler/backend/golang/tpl.go @@ -34,8 +34,8 @@ func main() { {{.}} = make(chan runtime.OrderedMsg) {{- end}} ) - {{- if .Trace }} + interceptor := runtime.NewDebugInterceptor() close, err := interceptor.Open("trace.log") @@ -48,6 +48,7 @@ func main() { } }() {{- else }} + interceptor := runtime.ProdInterceptor{} {{- end }} diff --git a/internal/runtime/funcs/stream_product.go b/internal/runtime/funcs/stream_product.go index 35df04d3..5eac8325 100644 --- a/internal/runtime/funcs/stream_product.go +++ b/internal/runtime/funcs/stream_product.go @@ -2,6 +2,7 @@ package funcs import ( "context" + "sync" "github.com/nevalang/neva/internal/runtime" ) @@ -30,44 +31,65 @@ func (streamProduct) Create( // TODO: make sure it's not possible to do processing on the fly so we don't have to wait for both streams to complete return func(ctx context.Context) { for { - firstData := []runtime.Msg{} - for { - seqMsg, ok := firstIn.Receive(ctx) - if !ok { - return - } + var ( + firstOk, secondOk bool + firstData = []runtime.Msg{} + secondData = []runtime.Msg{} + ) - item := seqMsg.Struct() - firstData = append(firstData, item.Get("data")) + var wg sync.WaitGroup + wg.Add(2) - if item.Get("last").Bool() { - break - } - } + go func() { + defer wg.Done() + for { + var firstMsg runtime.Msg + firstMsg, firstOk = firstIn.Receive(ctx) + if !firstOk { + return + } + + streamItem := firstMsg.Struct() + firstData = append(firstData, streamItem.Get("data")) - secondData := []runtime.Msg{} - for { - seqMsg, ok := secondIn.Receive(ctx) - if !ok { - return + if streamItem.Get("last").Bool() { + break + } } + }() - item := seqMsg.Struct() - secondData = append(secondData, item.Get("data")) + go func() { + defer wg.Done() + for { + var secondMsg runtime.Msg + secondMsg, secondOk = secondIn.Receive(ctx) + if !secondOk { + return + } - if item.Get("last").Bool() { - break + streamItem := secondMsg.Struct() + secondData = append(secondData, streamItem.Get("data")) + + if streamItem.Get("last").Bool() { + break + } } + }() + + wg.Wait() + + if !firstOk || !secondOk { + return } - for i, msg1 := range firstData { - for j, msg2 := range secondData { + for i, firstMsg := range firstData { + for j, secondMsg := range secondData { seqOut.Send( ctx, streamItem( runtime.NewStructMsg( []string{"first", "second"}, - []runtime.Msg{msg1, msg2}, + []runtime.Msg{firstMsg, secondMsg}, ), int64(i), i == len(firstData)-1 && j == len(secondData)-1, diff --git a/internal/runtime/message.go b/internal/runtime/message.go index 150b29f4..585cf2a9 100644 --- a/internal/runtime/message.go +++ b/internal/runtime/message.go @@ -102,9 +102,11 @@ type StringMsg struct { v string } -func (msg StringMsg) Str() string { return msg.v } -func (msg StringMsg) String() string { return msg.v } -func (msg StringMsg) MarshalJSON() ([]byte, error) { return []byte(msg.String()), nil } +func (msg StringMsg) Str() string { return msg.v } +func (msg StringMsg) String() string { return msg.v } +func (msg StringMsg) MarshalJSON() ([]byte, error) { + return json.Marshal(msg.String()) +} func NewStringMsg(s string) StringMsg { return StringMsg{