-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathframing.go
435 lines (401 loc) · 12.9 KB
/
framing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
package flowd
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"net/textproto"
"strconv"
"strings"
)
// IP is one of the central concepts in FBP; realized here as a type alias
type IP *Frame
// Frame is the serializable structure for carrying information through the processing network
type Frame struct {
Type string
BodyType string
Port string
Extensions map[string]string
Body []byte
}
/*
v1 format based on MIME:
Type: data.MessageTypeRN
Port: OUTRN
Content-Length: 10RN
Conn-Id: 123RN
RN
= 71 bytes
v2 format based on STOMP v1.2:
2dataN
type:MessageTypeN
port:OUTN
length:10N
conn-id:123N
N
= 54 bytes
*/
const maxBodySize = 1 * 1000 * 1000 * 1000 // 1 GByte
var (
portBytes = []byte{'p', 'o', 'r', 't'}
lengthBytes = []byte{'l', 'e', 'n', 'g', 't', 'h'}
typeBytes = []byte{'t', 'y', 'p', 'e'}
)
// StringByteWriter is what Serialize() uses - satisfied by *bufio.Writer and *bytes.Buffer
// NOTE: io.WriteString() uses type assertion whereas bufio.Writer.WriteString() does not
type StringByteWriter interface {
io.Writer
io.ByteWriter
WriteString(s string) (int, error)
}
// Deserialize reads an IP from a buffered data stream, like STDOUT from a network process or a network connection
/*
The frame format is that of STOMP v1.2, with the following modifications:
* content-length header field is renamed length
* length header field must be present if body is present
* if length header is absent, no scanning until null byte is done, but absent body is assumed
* content-type is optional and is no special header field, but may be used by components and application networks
* TODO no escape substitution in header values - for now at least
* TODO no support for multiple occurrences of same header field; gets overwritten -> last value remains
* all header field names are to be in lowercase
* COMMAND line contains not a command, but the frame type, also in lowercase, e.g. data, control
* the port and type header fields have special meaning; type = body type, port = input / output port name
* before COMMAND, a single byte is read (version marker) to designate the frame format version
* this format has designation '2' = 0x32
*/
//TODO want re-use err -> but then have to var-define all other return values -> ugly -> benchmark
func Deserialize(stream *bufio.Reader) (f *Frame, err error) {
// read version marker
version, err := stream.ReadByte()
if err != nil {
// first read is usual place to block for new frame, so EOF is no error here
if err == io.EOF {
return nil, err
}
return nil, errors.New("reading version marker: " + err.Error())
}
switch version {
case '2':
// OK, will do it here
case '1':
// old format
return DeserializeV1(stream)
default:
// unknown version
return nil, errors.New("unknown version marker: " + string(version))
}
/* TODO
if version != '2' {
//TODO
}
*/
// initialize frame
f = &Frame{} // NOTE: same as new(Frame)
// read STOMP command line = frame type
frameType, _, err := stream.ReadLine()
if err != nil {
return nil, errors.New("reading frame type: " + err.Error())
}
f.Type = string(frameType)
// read header
var bodyLength uint64
line, _, err := stream.ReadLine()
if err != nil {
return nil, errors.New("reading header line: " + err.Error())
}
// read until empty line = end of header
var sepIndex int
var key []byte
var value string
for len(line) > 0 {
// split line on :
sepIndex = bytes.IndexByte(line, ':')
if sepIndex == -1 {
return nil, errors.New("splitting header line: separator ':' missing")
} else if sepIndex == 0 {
return nil, errors.New("malformed header line: key / field name missing")
} else if sepIndex == len(line) {
return nil, errors.New("malformed header line: value missing")
}
key = line[:sepIndex]
value = string(line[sepIndex+1:])
// store line appropriately
// NOTE: Go has no switch on []byte
if bytes.Equal(key, portBytes) {
f.Port = value
} else if bytes.Equal(key, lengthBytes) {
bodyLength, err = strconv.ParseUint(value, 10, 0)
if err != nil {
return nil, errors.New("parsing body length value: " + err.Error())
}
if bodyLength < 0 || bodyLength > maxBodySize {
return nil, fmt.Errorf("given body length out of bounds: %d", bodyLength)
}
} else if bytes.Equal(key, typeBytes) {
f.BodyType = value
} else {
// all other fields ("extensions")
if f.Extensions == nil {
f.Extensions = map[string]string{}
}
f.Extensions[string(key)] = value
}
// try to read next header line
line, _, err = stream.ReadLine()
if err != nil {
return nil, errors.New("reading next header line: " + err.Error())
}
}
// read body
if bodyLength > 0 {
buf := make([]byte, bodyLength)
if _, err = io.ReadFull(stream, buf); err != nil {
if err == io.EOF {
return nil, errors.New("reading full frame body hit EOF: " + err.Error())
}
return nil, errors.New("reading full frame body short read: " + err.Error())
}
f.Body = buf
}
// read frame terminator octet = \0
nul, err := stream.ReadByte()
if err != nil {
return nil, errors.New("reading frame terminator: " + err.Error())
}
if nul != 0x00 {
return nil, errors.New("frame terminator is not 0x00: " + string(nul))
}
return f, nil
}
// DeserializeV1 deserializes a frame in v1 format = strict MIME header
// NOTE: require bufio.Reader not io.Reader, because textproto.Reader requires one. Making a local one would swallow any following frames into it.
func DeserializeV1(stream *bufio.Reader) (f *Frame, err error) {
// read headers
textReader := textproto.NewReader(stream) //TODO To avoid denial of service attacks, the provided bufio.Reader should be reading from an io.LimitReader or similar Reader to bound the size of responses.
header, err := textReader.ReadMIMEHeader()
var port []string
var ipType []string
var found bool
if err != nil {
if err == io.EOF {
// just an EOF received, return it as such
return nil, err
}
return nil, errors.New("cannot parse into frame header: " + err.Error())
}
if ipType, found = header["Type"]; !found {
return nil, errors.New("missing Type header field")
}
if port, found = header["Port"]; !found {
return nil, errors.New("missing Port header field")
}
types := strings.SplitN(ipType[0], ".", 2)
if len(types) != 2 {
return nil, errors.New("missing separator in Type header field")
}
// initialize frame structure
f = &Frame{Type: types[0], BodyType: types[1], Port: port[0], Body: nil}
// read content length
var lenStr []string
if lenStr, found = header["Content-Length"]; !found {
return nil, errors.New("missing Content-Length header field")
}
lenInt, err := strconv.Atoi(lenStr[0])
if err != nil {
return nil, errors.New("converting content length to integer: " + err.Error())
}
// read any remaining header fields into frame.Extensions
//FIXME optimize: do without the deletions
//TODO decide if map[string]string suffices (are duplicate headers useful? maybe for layered information.)
if len(header) > 3 {
delete(header, "Type")
delete(header, "Port")
delete(header, "Content-Length")
f.Extensions = make(map[string]string)
for key, values := range header {
//FIXME implement this correctly
f.Extensions[key] = values[0]
//fmt.Fprintf(os.Stderr, "framing got extension header %s = %s\n", key, values[0])
}
}
// read body
buf := make([]byte, lenInt)
if n, err := io.ReadFull(stream, buf); err != nil {
if err == io.EOF {
return nil, errors.New("reading full frame body encountered EOF: " + err.Error())
}
return nil, fmt.Errorf("reading full frame body short read %d bytes of %d expected: %s", n, lenInt, err.Error())
}
f.Body = buf
return f, nil
}
var (
typeSepBytes = []byte{'t', 'y', 'p', 'e', ':'}
portSepBytes = []byte{'p', 'o', 'r', 't', ':'}
lengthSepBytes = []byte{'l', 'e', 'n', 'g', 't', 'h', ':'}
)
// Serialize serializes an IP into a data stream, like STDIN into a network process or a network connection
//TODO optimize: does Go pre-calculate all values like []byte{'2'} ?
//TODO optimize: is +"\n" efficient?
func (f *Frame) Serialize(stream StringByteWriter) (err error) {
// write version marker
err = stream.WriteByte('2')
if err != nil {
return errors.New("writing version marker: " + err.Error())
}
// write frame type
if f.Type == "" {
return errors.New("type is empty")
}
// NOTE: strings.ToUpper() increases runtime by 7 %
_, err = stream.WriteString(f.Type)
if err != nil {
return errors.New("writing frame type: " + err.Error())
}
err = stream.WriteByte('\n')
if err != nil {
return errors.New("writing frame type newline: " + err.Error())
}
// write body type, if present
// NOTE: concatenating strings is more expensive than multiple Write() calls
// NOTE: fmt.Sprintf is expensive
if f.BodyType != "" {
_, err = stream.Write(typeSepBytes)
if err != nil {
return errors.New("writing body type key: " + err.Error())
}
_, err = stream.WriteString(f.BodyType)
if err != nil {
return errors.New("writing body type value: " + err.Error())
}
err = stream.WriteByte('\n')
if err != nil {
return errors.New("writing body type newline: " + err.Error())
}
}
// write port, if present
if f.Port != "" {
_, err = stream.Write(portSepBytes)
if err != nil {
return errors.New("writing port key: " + err.Error())
}
_, err = stream.WriteString(f.Port)
if err != nil {
return errors.New("writing port value: " + err.Error())
}
err = stream.WriteByte('\n')
if err != nil {
return errors.New("writing port newline: " + err.Error())
}
}
// write any other header fields, if present
if f.Extensions != nil {
for key, value := range f.Extensions {
// write line
// NOTE: strings.ToLower() increases runtime by 10 %
_, err = stream.WriteString(key)
if err != nil {
return errors.New("writing extension header lines key: " + err.Error())
}
err = stream.WriteByte(':')
if err != nil {
return errors.New("writing extension header lines separator: " + err.Error())
}
_, err = stream.WriteString(value)
if err != nil {
return errors.New("writing extension header lines value: " + err.Error())
}
err = stream.WriteByte('\n')
if err != nil {
return errors.New("writing extension header lines newline: " + err.Error())
}
}
}
// if body present, write length
if f.Body != nil {
_, err = stream.Write(lengthSepBytes)
if err != nil {
return errors.New("writing body length key: " + err.Error())
}
_, err = stream.WriteString(strconv.Itoa(len(f.Body)))
if err != nil {
return errors.New("writing body length value: " + err.Error())
}
err = stream.WriteByte('\n')
if err != nil {
return errors.New("writing body length newline: " + err.Error())
}
// write end-of-header marker = empty line
err = stream.WriteByte('\n')
if err != nil {
return errors.New("writing end-of-header marker: " + err.Error())
}
// write body
_, err = stream.Write(f.Body)
if err != nil {
return errors.New("writing body: " + err.Error())
}
} else {
// write only end-of-header marker = empty line
err = stream.WriteByte('\n')
if err != nil {
return errors.New("writing end-of-header marker: " + err.Error())
}
}
// write frame terminator null byte
err = stream.WriteByte(0x00)
if err != nil {
return errors.New("writing frame terminator: " + err.Error())
}
// success
return nil
}
// SerializeV1 serializes an IP into a data stream in previous format (strict MIME + content-length)
//TODO avoid allocating buffered writer on every call
func (f *Frame) SerializeV1(stream StringByteWriter) error {
if f == nil {
return errors.New("refusing to marshal nil frame")
}
bufw := bufio.NewWriter(stream)
tpw := textproto.NewWriter(bufw)
if err := printHeaderLine(tpw, "type", f.Type+"."+f.BodyType); err != nil {
return errors.New("marshal: " + err.Error())
}
if err := printHeaderLine(tpw, "port", f.Port); err != nil {
return errors.New("marshal: " + err.Error())
}
if err := printHeaderLine(tpw, "content-length", strconv.Itoa(len(f.Body))); err != nil {
return errors.New("marshal: " + err.Error())
}
if f.Extensions != nil {
for key, value := range f.Extensions {
if err := printHeaderLine(tpw, key, value); err != nil {
return errors.New("marshal extension header: " + err.Error())
}
//fmt.Fprintf(os.Stderr, "marshal extension header: %s = %s\n", key, value)
}
}
if err := finalizeHeader(tpw); err != nil {
return errors.New("marshal: " + err.Error())
}
if _, err := bufw.Write(f.Body); err != nil {
return errors.New("marshal: writing body: " + err.Error())
}
if err := bufw.Flush(); err != nil {
return errors.New("marshal: flushing writer: " + err.Error())
}
return nil
}
func printHeaderLine(w *textproto.Writer, key string, value string) error {
if err := w.PrintfLine("%s: %s", textproto.CanonicalMIMEHeaderKey(key), value); err != nil {
return errors.New("writing header line: " + err.Error())
}
return nil
}
func finalizeHeader(w *textproto.Writer) error {
if err := w.PrintfLine(""); err != nil {
return errors.New("finalizing header: " + err.Error())
}
return nil
}