-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathio.go
143 lines (124 loc) · 2.92 KB
/
io.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package gossamr
import (
"bufio"
"fmt"
"github.com/markchadwick/typedbytes"
"io"
"reflect"
)
// Copy moves every pair produced by r into w, in order. Hitting io.EOF on r
// is the normal end of input and yields a nil result; any other error from r,
// or any error from w, aborts the copy and is returned unchanged. w is left
// open.
func Copy(r Reader, w Writer) (err error) {
	for {
		key, value, readErr := r.Next()
		switch {
		case readErr == io.EOF:
			return nil
		case readErr != nil:
			return readErr
		}
		if writeErr := w.Write(key, value); writeErr != nil {
			return writeErr
		}
	}
}
// Reader is a pull-based source of key/value pairs: each call to Next hands
// back the next pair. Copy treats an io.EOF error from Next as the normal end
// of the stream.
type Reader interface {
	Next() (key, value interface{}, err error)
}
// Writer is a sink for key/value pairs.
type Writer interface {
	// Write emits a single key/value pair, reporting any failure.
	Write(key, value interface{}) error
	// Close ends the stream of pairs; implementations such as StringWriter
	// close the io.WriteCloser they wrap.
	Close() error
}
// GroupedReader wraps a Reader whose pairs arrive with equal keys adjacent to
// one another and yields one pair per run of equal keys. The value of each
// yielded pair is a channel, built with reflect, whose element type is the
// dynamic type of the run's first value. Every value of the run is sent on
// that channel, and the channel is closed when the run ends. Keys are compared
// with reflect.DeepEqual, so non-comparable keys such as []byte are allowed.
//
// Values of a run that the caller has not received by the time it calls Next
// again are discarded. A returned channel must not be received from
// concurrently with a call to Next. If the caller abandons the reader in the
// middle of a run, the goroutine feeding that run stays blocked.
type GroupedReader struct {
	nextKey   interface{} // lookahead key; meaningful only while hasNext is true
	nextValue interface{} // lookahead value; meaningful only while hasNext is true
	hasNext   bool
	nextError error // first error from reader; returned by every later Next
	reader    Reader
	current   reflect.Value // channel of the run most recently returned, if any
}

// NewGroupedReader returns a GroupedReader, as a Reader, that groups the
// pairs produced by reader.
func NewGroupedReader(reader Reader) Reader {
	return &GroupedReader{reader: reader}
}

// Next returns the key of the next run together with the channel carrying
// that run's values. Once the underlying reader has failed (io.EOF at the end
// of input), Next returns that error with nil k and v.
func (gr *GroupedReader) Next() (k, v interface{}, err error) {
	// Finish the previous run first. Once its channel reports closed, the
	// feeding goroutine has stopped using gr.reader and has already stored the
	// lookahead pair (or error), so the fields below are safe to read.
	gr.drainCurrent()

	if gr.nextError != nil {
		return nil, nil, gr.nextError
	}
	if !gr.hasNext {
		gr.nextKey, gr.nextValue, err = gr.reader.Next()
		if err != nil {
			gr.nextError = err
			return nil, nil, err
		}
		gr.hasNext = true
	}

	key, first := gr.nextKey, gr.nextValue
	gr.hasNext = false
	// NOTE(review): a nil first value is unsupported; reflect.TypeOf(nil) has
	// no type from which to build the channel.
	ch := reflect.MakeChan(reflect.ChanOf(reflect.BothDir, reflect.TypeOf(first)), 0)
	gr.current = ch
	go gr.feed(ch, key, first)
	return key, ch.Interface(), nil
}

// feed sends first, then every following value whose key equals key, on ch.
// It stops at the first pair with a different key, which it keeps as the
// lookahead, or at a read error, which it records in nextError. ch is closed
// on return, after those fields have been written.
func (gr *GroupedReader) feed(ch reflect.Value, key, first interface{}) {
	defer ch.Close()
	ch.Send(reflect.ValueOf(first))
	for {
		k, v, err := gr.reader.Next()
		if err != nil {
			gr.nextError = err
			return
		}
		if !reflect.DeepEqual(k, key) {
			gr.nextKey, gr.nextValue, gr.hasNext = k, v, true
			return
		}
		ch.Send(reflect.ValueOf(v))
	}
}

// drainCurrent discards whatever values of the current run the caller has
// not received and waits until that run's channel is closed.
func (gr *GroupedReader) drainCurrent() {
	if !gr.current.IsValid() {
		return
	}
	for {
		if _, ok := gr.current.Recv(); !ok {
			break
		}
	}
	gr.current = reflect.Value{}
}
// NewPairReader returns a Reader that decodes key/value pairs serialized in
// Hadoop's typedbytes format from r. Outside local mode this is assumed to be
// the wire format for all reading and writing.
func NewPairReader(r io.Reader) Reader {
	return typedbytes.NewPairReader(typedbytes.NewReader(r))
}
// NewPairWriter returns a Writer that encodes key/value pairs to w in
// Hadoop's typedbytes format. Outside local mode, all IO is assumed to use
// this format.
func NewPairWriter(w io.WriteCloser) Writer {
	return typedbytes.NewPairWriter(typedbytes.NewWriter(w))
}
// LineReader is used by basic streaming jobs. For each line of its input it
// yields the zero-based line number (an int64) as the key and the raw line,
// including its trailing '\n' when present, as a string value. The consumer
// must therefore accept the arguments (int64, string).
type LineReader struct {
	n      int64 // lines returned so far, i.e. the number of the next line
	reader *bufio.Reader
}

// NewLineReader returns a LineReader that reads lines from r through a
// bufio.Reader.
func NewLineReader(r io.Reader) *LineReader {
	return &LineReader{reader: bufio.NewReader(r)}
}

// Next returns the next line and its line number. A last line without a
// trailing '\n' is still returned with a nil error, and the following call
// then reports io.EOF. On error, k and v are nil.
func (lr *LineReader) Next() (k, v interface{}, err error) {
	line, err := lr.reader.ReadString('\n')
	if err != nil {
		// Callers such as Copy stop at io.EOF without using the pair that
		// comes with it, so a final unterminated line is delivered on its own
		// and the EOF is left for the next call to report.
		if err != io.EOF || line == "" {
			return nil, nil, err
		}
	}
	k = lr.n
	lr.n++
	return k, line, nil
}
// StringWriter renders each key/value pair as one line of plain streaming
// output, "key\tvalue\n", formatting both sides with fmt's %v verb. Tabs or
// newlines inside a key or value are written as-is, not escaped.
type StringWriter struct {
	w io.WriteCloser
}

// NewStringWriter returns a StringWriter that writes to w and closes w when
// it is itself closed.
func NewStringWriter(w io.WriteCloser) *StringWriter {
	return &StringWriter{w: w}
}

// Write emits a single "key\tvalue\n" line and returns any error from the
// underlying writer.
func (sw *StringWriter) Write(k, v interface{}) error {
	_, err := fmt.Fprintf(sw.w, "%v\t%v\n", k, v)
	return err
}

// Close closes the underlying io.WriteCloser.
func (sw *StringWriter) Close() error {
	return sw.w.Close()
}