Skip to content

Commit

Permalink
refactor(examples:streams:zip/product), fix(backend:golang:tpl): newl…
Browse files Browse the repository at this point in the history
…ines, fix(runtime:funcs:stream_product): concurrently receive form both streams, fix(runtime:msg:string): json marshal method (cases with one letter in stream product example)
  • Loading branch information
emil14 committed Oct 3, 2024
1 parent 58e8a2e commit ec05c8d
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 76 deletions.
39 changes: 21 additions & 18 deletions examples/stream_product/e2e_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
package test

import (
"fmt"
"os"
"os/exec"
"testing"

"github.com/stretchr/testify/require"
)

var expectedOutput = `{"first": 0, "second": 0}
{"first": 0, "second": 1}
{"first": 0, "second": 2}
{"first": 1, "second": 0}
{"first": 1, "second": 1}
{"first": 1, "second": 2}
{"first": 2, "second": 0}
{"first": 2, "second": 1}
{"first": 2, "second": 2}
`

func Test(t *testing.T) {
err := os.Chdir("..")
require.NoError(t, err)
Expand All @@ -16,24 +28,15 @@ func Test(t *testing.T) {
require.NoError(t, err)
defer os.Chdir(wd)

cmd := exec.Command("neva", "run", "stream_product")
for i := 0; i < 1; i++ {
t.Run(fmt.Sprintf("Run %d", i+1), func(t *testing.T) {
cmd := exec.Command("neva", "run", "stream_product")

out, err := cmd.CombinedOutput()
require.NoError(t, err)
require.Equal(
t,
`{"first": 0, "second": 0}
{"first": 0, "second": 1}
{"first": 0, "second": 2}
{"first": 1, "second": 0}
{"first": 1, "second": 1}
{"first": 1, "second": 2}
{"first": 2, "second": 0}
{"first": 2, "second": 1}
{"first": 2, "second": 2}
`,
string(out),
)
out, err := cmd.CombinedOutput()
require.NoError(t, err)
require.Equal(t, expectedOutput, string(out))

require.Equal(t, 0, cmd.ProcessState.ExitCode())
require.Equal(t, 0, cmd.ProcessState.ExitCode())
})
}
}
25 changes: 8 additions & 17 deletions examples/stream_product/main.neva
Original file line number Diff line number Diff line change
@@ -1,21 +1,12 @@
import { streams }

flow Main(start) (stop) {
r1 Range
r2 Range
streams.Product<int, int>
ForEach<streams.ProductResult<int, int>> {
Println<streams.ProductResult<int, int>>
}
Wait
---
:start -> [
(0 -> [r1:from, r2:from]),
(3 -> [r1:to, r2:to])
]

r1 -> product:first
r2 -> product:second

product -> forEach -> wait -> :stop
r1 Range, r2 Range
streams.Product<int, int>
ForEach{Println}, Wait
---
:start -> [(0 -> [r1:from, r2:from]), (3 -> [r1:to, r2:to])]
r1 -> product:first
r2 -> product:second
product -> forEach -> wait -> :stop
}
20 changes: 7 additions & 13 deletions examples/stream_zip/main.neva
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,11 @@ import { streams }
const strings list<string> = ['a', 'b', 'c']

flow Main(start) (stop) {
r1 Range
ListToStream<string>
streams.Zip<int, string>
ForEach{Println}
Wait
---
:start -> [
(0 -> r1:from),
(10 -> r1:to)
]
r1 -> zip:first
$strings -> listToStream -> zip:second
zip -> forEach -> wait -> :stop
Range, ListToStream<string>, streams.Zip<int, string>
ForEach{Println}, Wait
---
:start -> [(0 -> range:from), (10 -> range:to)]
range -> zip:first
$strings -> listToStream -> zip:second
zip -> forEach -> wait -> :stop
}
3 changes: 2 additions & 1 deletion internal/compiler/backend/golang/tpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func main() {
{{.}} = make(chan runtime.OrderedMsg)
{{- end}}
)
{{- if .Trace }}
interceptor := runtime.NewDebugInterceptor()
close, err := interceptor.Open("trace.log")
Expand All @@ -48,6 +48,7 @@ func main() {
}
}()
{{- else }}
interceptor := runtime.ProdInterceptor{}
{{- end }}
Expand Down
70 changes: 46 additions & 24 deletions internal/runtime/funcs/stream_product.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package funcs

import (
"context"
"sync"

"github.com/nevalang/neva/internal/runtime"
)
Expand Down Expand Up @@ -30,44 +31,65 @@ func (streamProduct) Create(
// TODO: make sure it's not possible to do processing on the fly so we don't have to wait for both streams to complete
return func(ctx context.Context) {
for {
firstData := []runtime.Msg{}
for {
seqMsg, ok := firstIn.Receive(ctx)
if !ok {
return
}
var (
firstOk, secondOk bool
firstData = []runtime.Msg{}
secondData = []runtime.Msg{}
)

item := seqMsg.Struct()
firstData = append(firstData, item.Get("data"))
var wg sync.WaitGroup
wg.Add(2)

if item.Get("last").Bool() {
break
}
}
go func() {
defer wg.Done()
for {
var firstMsg runtime.Msg
firstMsg, firstOk = firstIn.Receive(ctx)
if !firstOk {
return
}

streamItem := firstMsg.Struct()
firstData = append(firstData, streamItem.Get("data"))

secondData := []runtime.Msg{}
for {
seqMsg, ok := secondIn.Receive(ctx)
if !ok {
return
if streamItem.Get("last").Bool() {
break
}
}
}()

item := seqMsg.Struct()
secondData = append(secondData, item.Get("data"))
go func() {
defer wg.Done()
for {
var secondMsg runtime.Msg
secondMsg, secondOk = secondIn.Receive(ctx)
if !secondOk {
return
}

if item.Get("last").Bool() {
break
streamItem := secondMsg.Struct()
secondData = append(secondData, streamItem.Get("data"))

if streamItem.Get("last").Bool() {
break
}
}
}()

wg.Wait()

if !firstOk || !secondOk {
return
}

for i, msg1 := range firstData {
for j, msg2 := range secondData {
for i, firstMsg := range firstData {
for j, secondMsg := range secondData {
seqOut.Send(
ctx,
streamItem(
runtime.NewStructMsg(
[]string{"first", "second"},
[]runtime.Msg{msg1, msg2},
[]runtime.Msg{firstMsg, secondMsg},
),
int64(i),
i == len(firstData)-1 && j == len(secondData)-1,
Expand Down
8 changes: 5 additions & 3 deletions internal/runtime/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,11 @@ type StringMsg struct {
v string
}

func (msg StringMsg) Str() string { return msg.v }
func (msg StringMsg) String() string { return msg.v }
func (msg StringMsg) MarshalJSON() ([]byte, error) { return []byte(msg.String()), nil }
func (msg StringMsg) Str() string { return msg.v }
func (msg StringMsg) String() string { return msg.v }
func (msg StringMsg) MarshalJSON() ([]byte, error) {
return json.Marshal(msg.String())
}

func NewStringMsg(s string) StringMsg {
return StringMsg{
Expand Down

0 comments on commit ec05c8d

Please sign in to comment.