Skip to content

Commit

Permalink
Push reader
Browse files Browse the repository at this point in the history
  • Loading branch information
josh-newman committed Nov 6, 2020
1 parent e841916 commit 5c99597
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 1 deletion.
80 changes: 80 additions & 0 deletions push_read.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package bigslice

import (
"fmt"
"reflect"

"github.com/grailbio/base/must"
"github.com/grailbio/bigslice/slicefunc"
"github.com/grailbio/bigslice/sliceio"
"github.com/grailbio/bigslice/typecheck"
)

// PushReader returns a Slice with nshard shards whose rows are produced by a
// "push"-style read function. sinkRead must have the form
//
//	func(shard int, sink func(col0 T0, col1 T1, ...)) error
//
// The first time a reader state is used, sinkRead is started in its own
// goroutine with the shard number and a sink function. Every call to sink
// emits one row whose columns are sink's arguments; sink blocks once
// defaultChunksize rows are waiting to be consumed.
//
// When sinkRead returns nil, the reader reports sliceio.EOF after all sunk
// rows have been delivered. When sinkRead returns a non-nil error, or panics,
// that error is reported instead, again only after the rows sunk before the
// failure have been delivered.
//
// PushReader panics (through typecheck.Panicf) if sinkRead does not take
// exactly an int and a func as its arguments.
func PushReader(nshard int, sinkRead interface{}, prags ...Pragma) Slice {
	fn, ok := slicefunc.Of(sinkRead)
	if !ok || fn.In.NumOut() != 2 ||
		fn.In.Out(0).Kind() != reflect.Int ||
		fn.In.Out(1).Kind() != reflect.Func {
		typecheck.Panicf(1, "pushreader: invalid reader function type %T", sinkRead)
	}

	var (
		sinkType      = fn.In.Out(1)
		errorType     = reflect.TypeOf((*error)(nil)).Elem()
		errorNilValue = reflect.Zero(errorType)
	)

	type state struct {
		// sunkC carries rows from the sinkRead goroutine to the reader.
		sunkC chan []reflect.Value
		// err is written by the sinkRead goroutine before it closes sunkC.
		// The reader must not touch it until it has observed the close;
		// the close is what orders the goroutine's write before the read.
		err error
		// done is owned by the reader and records that the close of sunkC
		// has been observed, so err is final and safe to read.
		done bool
	}
	readerFuncImpl := func(args []reflect.Value) []reflect.Value {
		st := args[1].Interface().(*state)
		if st.sunkC == nil {
			st.sunkC = make(chan []reflect.Value, defaultChunksize)
			shard := args[0]
			sinkImpl := func(row []reflect.Value) []reflect.Value {
				st.sunkC <- row
				return nil
			}
			sinkFunc := reflect.MakeFunc(sinkType, sinkImpl)
			go func() {
				// Deferred calls run last-in-first-out: the recover below
				// stores its error before the channel is closed.
				defer close(st.sunkC)
				defer func() {
					if p := recover(); p != nil {
						st.err = fmt.Errorf("pushreader: panic from read: %v", p)
					}
				}()
				outs := reflect.ValueOf(sinkRead).Call([]reflect.Value{shard, sinkFunc})
				if errI := outs[0].Interface(); errI != nil {
					st.err = errI.(error)
				}
			}()
		}

		var rows int
		for !st.done && rows < args[2].Len() {
			row, ok := <-st.sunkC
			if !ok {
				st.done = true
				// Report EOF only when sinkRead finished cleanly; never
				// overwrite an error or panic recorded by the goroutine.
				if st.err == nil {
					st.err = sliceio.EOF
				}
				break
			}
			must.True(len(row) == len(args[2:]), "%v, %v", len(row), len(args[2:]))
			for c := range row {
				args[2+c].Index(rows).Set(row[c])
			}
			rows++
		}
		errValue := errorNilValue
		if st.done {
			errValue = reflect.ValueOf(st.err)
		}
		return []reflect.Value{reflect.ValueOf(rows), errValue}
	}
	// The synthesized reader has the signature
	// func(shard int, st *state, col0 []T0, col1 []T1, ...) (int, error),
	// with one output slice per sink argument.
	readerFuncArgTypes := []reflect.Type{reflect.TypeOf(int(0)), reflect.TypeOf(&state{})}
	for i := 0; i < sinkType.NumIn(); i++ {
		readerFuncArgTypes = append(readerFuncArgTypes, reflect.SliceOf(sinkType.In(i)))
	}
	readerFuncType := reflect.FuncOf(readerFuncArgTypes,
		[]reflect.Type{reflect.TypeOf(int(0)), errorType}, false)
	readerFunc := reflect.MakeFunc(readerFuncType, readerFuncImpl)

	return ReaderFunc(nshard, readerFunc.Interface(), prags...)
}
31 changes: 31 additions & 0 deletions push_read_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package bigslice_test

import (
"testing"

fuzz "github.com/google/gofuzz"
"github.com/grailbio/bigslice"
)

func TestPushReader(t *testing.T) {
const (
N = 3
Nshard = 1
)
slice := bigslice.PushReader(Nshard, func(shard int, sink func(string, int)) error {
fuzzer := fuzz.NewWithSeed(1)
var row struct {
string
int
}
for i := 0; i < N; i++ {
fuzzer.Fuzz(&row)
sink(row.string, row.int)
}
return nil
})
// Map everything to the same key so we can count them.
slice = bigslice.Map(slice, func(s string, i int) (key string, count int) { return "", 1 })
slice = bigslice.Fold(slice, func(a, e int) int { return a + e })
assertEqual(t, slice, false, []string{""}, []int{N * Nshard})
}
2 changes: 1 addition & 1 deletion slice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func TestReaderFunc(t *testing.T) {
t.Errorf("%d (of %d) nonzero rows", nnonzero, len(strings))
}
if state.Fuzzer == nil {
state.Fuzzer = fuzz.New()
state.Fuzzer = fuzz.NewWithSeed(1)
}
state.NumElements(1, len(strings))
var (
Expand Down

0 comments on commit 5c99597

Please sign in to comment.