forked from quickfixgo/quickfix
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessage.go
620 lines (534 loc) · 17.4 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
// Copyright (c) quickfixengine.org All rights reserved.
//
// This file may be distributed under the terms of the quickfixengine.org
// license as defined by quickfixengine.org and appearing in the file
// LICENSE included in the packaging of this file.
//
// This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING
// THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A
// PARTICULAR PURPOSE.
//
// See http://www.quickfixengine.org/LICENSE for licensing information.
//
// Contact ask@quickfixengine.org if any conditions of this licensing
// are not clear to you.
package quickfix
import (
"bytes"
"fmt"
"math"
"time"
"github.com/quickfixgo/quickfix/datadictionary"
)
// Header is first section of a FIX Message.
type Header struct{ FieldMap }
// msgparser contains message parsing vars needed to parse a string into a message.
type msgParser struct {
msg *Message
transportDataDictionary *datadictionary.DataDictionary
appDataDictionary *datadictionary.DataDictionary
rawBytes []byte
fieldIndex int
parsedFieldBytes *TagValue
trailerBytes []byte
foundBody bool
foundTrailer bool
}
// in the message header, the first 3 tags in the message header must be 8,9,35.
func headerFieldOrdering(i, j Tag) bool {
var ordering = func(t Tag) uint32 {
switch t {
case tagBeginString:
return 1
case tagBodyLength:
return 2
case tagMsgType:
return 3
}
return math.MaxUint32
}
orderi := ordering(i)
orderj := ordering(j)
switch {
case orderi < orderj:
return true
case orderi > orderj:
return false
}
return i < j
}
// Init initializes the Header instance.
func (h *Header) Init() {
h.initWithOrdering(headerFieldOrdering)
}
// Body is the primary application section of a FIX message.
type Body struct{ FieldMap }
// Init initializes the FIX message.
func (b *Body) Init() {
b.init()
}
// Trailer is the last section of a FIX message.
type Trailer struct{ FieldMap }
// In the trailer, CheckSum (tag 10) must be last.
func trailerFieldOrdering(i, j Tag) bool {
switch {
case i == tagCheckSum:
return false
case j == tagCheckSum:
return true
}
return i < j
}
// Init initializes the FIX message.
func (t *Trailer) Init() {
t.initWithOrdering(trailerFieldOrdering)
}
// Message is a FIX Message abstraction.
type Message struct {
Header Header
Trailer Trailer
Body Body
// ReceiveTime is the time that this message was read from the socket connection.
ReceiveTime time.Time
rawMessage *bytes.Buffer
// Slice of Bytes corresponding to the message body.
bodyBytes []byte
// Field bytes as they appear in the raw message.
fields []TagValue
}
// ToMessage returns the message itself.
func (m *Message) ToMessage() *Message { return m }
// parseError is returned when bytes cannot be parsed as a FIX message.
type parseError struct {
OrigError string
}
func (e parseError) Error() string { return fmt.Sprintf("error parsing message: %s", e.OrigError) }
// NewMessage returns a newly initialized Message instance.
func NewMessage() *Message {
m := new(Message)
m.Header.Init()
m.Body.Init()
m.Trailer.Init()
return m
}
// CopyInto erases the dest messages and copies the currency message content
// into it.
func (m *Message) CopyInto(to *Message) {
m.Header.CopyInto(&to.Header.FieldMap)
m.Body.CopyInto(&to.Body.FieldMap)
m.Trailer.CopyInto(&to.Trailer.FieldMap)
to.ReceiveTime = m.ReceiveTime
to.bodyBytes = make([]byte, len(m.bodyBytes))
copy(to.bodyBytes, m.bodyBytes)
to.fields = make([]TagValue, len(m.fields))
for i := range to.fields {
to.fields[i].init(m.fields[i].tag, m.fields[i].value)
}
}
// ParseMessage constructs a Message from a byte slice wrapping a FIX message.
func ParseMessage(msg *Message, rawMessage *bytes.Buffer) (err error) {
return ParseMessageWithDataDictionary(msg, rawMessage, nil, nil)
}
// ParseMessageWithDataDictionary constructs a Message from a byte slice wrapping a FIX message using an optional session and application DataDictionary for reference.
func ParseMessageWithDataDictionary(
msg *Message,
rawMessage *bytes.Buffer,
transportDataDictionary *datadictionary.DataDictionary,
appDataDictionary *datadictionary.DataDictionary,
) (err error) {
// Create msgparser before we go any further.
mp := &msgParser{
msg: msg,
transportDataDictionary: transportDataDictionary,
appDataDictionary: appDataDictionary,
}
mp.msg.rawMessage = rawMessage
mp.rawBytes = rawMessage.Bytes()
return doParsing(mp)
}
// doParsing executes the message parsing process.
func doParsing(mp *msgParser) (err error) {
mp.msg.Header.rwLock.Lock()
defer mp.msg.Header.rwLock.Unlock()
mp.msg.Body.rwLock.Lock()
defer mp.msg.Body.rwLock.Unlock()
mp.msg.Trailer.rwLock.Lock()
defer mp.msg.Trailer.rwLock.Unlock()
// Initialize for parsing.
mp.msg.Header.clearNoLock()
mp.msg.Body.clearNoLock()
mp.msg.Trailer.clearNoLock()
// Allocate expected message fields in one chunk.
fieldCount := bytes.Count(mp.rawBytes, []byte{'\001'})
if fieldCount == 0 {
return parseError{OrigError: fmt.Sprintf("No Fields detected in %s", string(mp.rawBytes))}
}
if cap(mp.msg.fields) < fieldCount {
mp.msg.fields = make([]TagValue, fieldCount)
} else {
mp.msg.fields = mp.msg.fields[0:fieldCount]
}
// Message must start with begin string, body length, msg type.
// Get begin string.
if mp.rawBytes, err = extractSpecificField(&mp.msg.fields[mp.fieldIndex], tagBeginString, mp.rawBytes); err != nil {
return
}
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
// Get body length.
mp.fieldIndex++
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
if mp.rawBytes, err = extractSpecificField(mp.parsedFieldBytes, tagBodyLength, mp.rawBytes); err != nil {
return
}
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
// Get msg type.
mp.fieldIndex++
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
if mp.rawBytes, err = extractSpecificField(mp.parsedFieldBytes, tagMsgType, mp.rawBytes); err != nil {
return
}
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
// Start parsing.
mp.fieldIndex++
xmlDataLen := 0
xmlDataMsg := false
mp.trailerBytes = []byte{}
mp.foundBody = false
mp.foundTrailer = false
for {
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
if xmlDataLen > 0 {
mp.rawBytes, err = extractXMLDataField(mp.parsedFieldBytes, mp.rawBytes, xmlDataLen)
xmlDataLen = 0
xmlDataMsg = true
} else {
mp.rawBytes, err = extractField(mp.parsedFieldBytes, mp.rawBytes)
}
if err != nil {
return
}
switch {
case isHeaderField(mp.parsedFieldBytes.tag, mp.transportDataDictionary):
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
case isTrailerField(mp.parsedFieldBytes.tag, mp.transportDataDictionary):
mp.msg.Trailer.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
mp.foundTrailer = true
case isNumInGroupField(mp.msg, []Tag{mp.parsedFieldBytes.tag}, mp.appDataDictionary):
parseGroup(mp, []Tag{mp.parsedFieldBytes.tag})
default:
mp.foundBody = true
mp.trailerBytes = mp.rawBytes
mp.msg.Body.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
}
if mp.parsedFieldBytes.tag == tagCheckSum {
break
}
if !mp.foundBody {
mp.msg.bodyBytes = mp.rawBytes
}
if mp.parsedFieldBytes.tag == tagXMLDataLen {
xmlDataLen, _ = mp.msg.Header.getIntNoLock(tagXMLDataLen)
}
mp.fieldIndex++
}
// This will happen if there are no fields in the body
if mp.foundTrailer && !mp.foundBody {
mp.trailerBytes = mp.rawBytes
mp.msg.bodyBytes = nil
}
// Body length would only be larger than trailer if fields out of order.
if len(mp.msg.bodyBytes) > len(mp.trailerBytes) {
mp.msg.bodyBytes = mp.msg.bodyBytes[:len(mp.msg.bodyBytes)-len(mp.trailerBytes)]
}
length := 0
for _, field := range mp.msg.fields {
switch field.tag {
case tagBeginString, tagBodyLength, tagCheckSum: // Tags do not contribute to length.
default:
length += field.length()
}
}
bodyLength, err := mp.msg.Header.getIntNoLock(tagBodyLength)
if err != nil {
err = parseError{OrigError: err.Error()}
} else if length != bodyLength && !xmlDataMsg {
err = parseError{OrigError: fmt.Sprintf("Incorrect Message Length, expected %d, got %d", bodyLength, length)}
}
return
}
// parseGroup iterates through a repeating group to maintain correct order of those fields.
func parseGroup(mp *msgParser, tags []Tag) {
mp.foundBody = true
dm := mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1]
fields := getGroupFields(mp.msg, tags, mp.appDataDictionary)
for {
mp.fieldIndex++
mp.parsedFieldBytes = &mp.msg.fields[mp.fieldIndex]
mp.rawBytes, _ = extractField(mp.parsedFieldBytes, mp.rawBytes)
mp.trailerBytes = mp.rawBytes
// Is this field a member for the group.
if isGroupMember(mp.parsedFieldBytes.tag, fields) {
// Is this field a nested repeating group.
if isNumInGroupField(mp.msg, append(tags, mp.parsedFieldBytes.tag), mp.appDataDictionary) {
dm = append(dm, *mp.parsedFieldBytes)
tags = append(tags, mp.parsedFieldBytes.tag)
fields = getGroupFields(mp.msg, tags, mp.appDataDictionary)
continue
}
// Add the field member to the group.
dm = append(dm, *mp.parsedFieldBytes)
} else if isHeaderField(mp.parsedFieldBytes.tag, mp.transportDataDictionary) {
// Found a header tag for some reason..
mp.msg.Body.add(dm)
mp.msg.Header.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
break
} else if isTrailerField(mp.parsedFieldBytes.tag, mp.transportDataDictionary) {
// Found the trailer at the end of the message.
mp.msg.Body.add(dm)
mp.msg.Trailer.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
mp.foundTrailer = true
break
} else {
// Found a body field outside the group.
searchTags := []Tag{mp.parsedFieldBytes.tag}
// Is this a new group not inside the existing group.
if isNumInGroupField(mp.msg, searchTags, mp.appDataDictionary) {
// Add the current repeating group.
mp.msg.Body.add(dm)
// Cycle again with the new group.
dm = mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1]
fields = getGroupFields(mp.msg, searchTags, mp.appDataDictionary)
continue
}
if len(tags) > 1 {
searchTags = tags[:len(tags)-1]
}
// Did this tag occur after a nested group and belongs to the parent group.
if isNumInGroupField(mp.msg, searchTags, mp.appDataDictionary) {
// Add the field member to the group.
dm = append(dm, *mp.parsedFieldBytes)
// Continue parsing the parent group.
fields = getGroupFields(mp.msg, searchTags, mp.appDataDictionary)
continue
}
// Add the repeating group.
mp.msg.Body.add(dm)
// Add the next body field.
mp.msg.Body.add(mp.msg.fields[mp.fieldIndex : mp.fieldIndex+1])
break
}
}
}
// isNumInGroupField evaluates if this tag is the start of a repeating group.
// tags slice will contain multiple tags if the tag in question is found while processing a group already.
func isNumInGroupField(msg *Message, tags []Tag, appDataDictionary *datadictionary.DataDictionary) bool {
if appDataDictionary != nil {
msgt, err := msg.msgTypeNoLock()
if err != nil {
return false
}
mm, ok := appDataDictionary.Messages[msgt]
if ok {
fields := mm.Fields
for idx, tag := range tags {
fd, ok := fields[int(tag)]
if ok {
if idx == len(tags)-1 {
if len(fd.Fields) > 0 {
return true
}
} else {
// Map nested fields.
newFields := make(map[int]*datadictionary.FieldDef)
for _, ff := range fd.Fields {
newFields[ff.Tag()] = ff
}
fields = newFields
}
}
}
}
}
return false
}
// getGroupFields gets the relevant fields for parsing a repeating group if this tag is the start of a repeating group.
// tags slice will contain multiple tags if the tag in question is found while processing a group already.
func getGroupFields(msg *Message, tags []Tag, appDataDictionary *datadictionary.DataDictionary) (fields []*datadictionary.FieldDef) {
if appDataDictionary != nil {
msgt, err := msg.msgTypeNoLock()
if err != nil {
return
}
mm, ok := appDataDictionary.Messages[msgt]
if ok {
fields := mm.Fields
for idx, tag := range tags {
fd, ok := fields[int(tag)]
if ok {
if idx == len(tags)-1 {
if len(fd.Fields) > 0 {
return fd.Fields
}
} else {
// Map nested fields.
newFields := make(map[int]*datadictionary.FieldDef)
for _, ff := range fd.Fields {
newFields[ff.Tag()] = ff
}
fields = newFields
}
}
}
}
}
return
}
// isGroupMember evaluates if this tag belongs to a repeating group.
func isGroupMember(tag Tag, fields []*datadictionary.FieldDef) bool {
for _, f := range fields {
if f.Tag() == int(tag) {
return true
}
}
return false
}
func isHeaderField(tag Tag, dataDict *datadictionary.DataDictionary) bool {
if tag.IsHeader() {
return true
}
if dataDict == nil {
return false
}
_, ok := dataDict.Header.Fields[int(tag)]
return ok
}
func isTrailerField(tag Tag, dataDict *datadictionary.DataDictionary) bool {
if tag.IsTrailer() {
return true
}
if dataDict == nil {
return false
}
_, ok := dataDict.Trailer.Fields[int(tag)]
return ok
}
// MsgType returns MsgType (tag 35) field's value.
func (m *Message) MsgType() (string, MessageRejectError) {
return m.Header.GetString(tagMsgType)
}
func (m *Message) msgTypeNoLock() (string, MessageRejectError) {
return m.Header.getStringNoLock(tagMsgType)
}
// IsMsgTypeOf returns true if the Header contains MsgType (tag 35) field and its value is the specified one.
func (m *Message) IsMsgTypeOf(msgType string) bool {
if v, err := m.MsgType(); err == nil {
return v == msgType
}
return false
}
// reverseRoute returns a message builder with routing header fields initialized as the reverse of this message.
func (m *Message) reverseRoute() *Message {
reverseMsg := NewMessage()
copyFunc := func(src Tag, dest Tag) {
var field FIXString
if m.Header.GetField(src, &field) == nil {
if len(field) != 0 {
reverseMsg.Header.SetField(dest, field)
}
}
}
copyFunc(tagSenderCompID, tagTargetCompID)
copyFunc(tagSenderSubID, tagTargetSubID)
copyFunc(tagSenderLocationID, tagTargetLocationID)
copyFunc(tagTargetCompID, tagSenderCompID)
copyFunc(tagTargetSubID, tagSenderSubID)
copyFunc(tagTargetLocationID, tagSenderLocationID)
copyFunc(tagOnBehalfOfCompID, tagDeliverToCompID)
copyFunc(tagOnBehalfOfSubID, tagDeliverToSubID)
copyFunc(tagDeliverToCompID, tagOnBehalfOfCompID)
copyFunc(tagDeliverToSubID, tagOnBehalfOfSubID)
// Tags added in 4.1.
var beginString FIXString
if m.Header.GetField(tagBeginString, &beginString) == nil {
if string(beginString) != BeginStringFIX40 {
copyFunc(tagOnBehalfOfLocationID, tagDeliverToLocationID)
copyFunc(tagDeliverToLocationID, tagOnBehalfOfLocationID)
}
}
return reverseMsg
}
func extractSpecificField(field *TagValue, expectedTag Tag, buffer []byte) (remBuffer []byte, err error) {
remBuffer, err = extractField(field, buffer)
switch {
case err != nil:
return
case field.tag != expectedTag:
err = parseError{OrigError: fmt.Sprintf("extractSpecificField: Fields out of order, expected %d, got %d", expectedTag, field.tag)}
return
}
return
}
func extractXMLDataField(parsedFieldBytes *TagValue, buffer []byte, dataLen int) (remBytes []byte, err error) {
endIndex := bytes.IndexByte(buffer, '=')
if endIndex == -1 {
err = parseError{OrigError: "extractField: No Trailing Delim in " + string(buffer)}
remBytes = buffer
return
}
endIndex += dataLen + 1
err = parsedFieldBytes.parse(buffer[:endIndex+1])
return buffer[(endIndex + 1):], err
}
func extractField(parsedFieldBytes *TagValue, buffer []byte) (remBytes []byte, err error) {
endIndex := bytes.IndexByte(buffer, '\001')
if endIndex == -1 {
err = parseError{OrigError: "extractField: No Trailing Delim in " + string(buffer)}
remBytes = buffer
return
}
err = parsedFieldBytes.parse(buffer[:endIndex+1])
return buffer[(endIndex + 1):], err
}
func (m *Message) Bytes() []byte {
if m.rawMessage != nil {
return m.rawMessage.Bytes()
}
return m.build()
}
func (m *Message) String() string {
if m.rawMessage != nil {
return m.rawMessage.String()
}
return string(m.build())
}
func formatCheckSum(value int) string {
return fmt.Sprintf("%03d", value)
}
// Build constructs a []byte from a Message instance.
func (m *Message) build() []byte {
m.cook()
var b bytes.Buffer
m.Header.write(&b)
m.Body.write(&b)
m.Trailer.write(&b)
return b.Bytes()
}
// Constructs a []byte from a Message instance, using the given bodyBytes.
// This is a workaround for the fact that we currently rely on the generated Message types to properly serialize/deserialize RepeatingGroups.
// In other words, we cannot go from bytes to a Message then back to bytes, which is exactly what we need to do in the case of a Resend.
// This func lets us pull the Message from the Store, parse it, update the Header, and then build it back into bytes using the original Body.
// Note: The only standard non-Body group is NoHops. If that is used in the Header, this workaround may fail.
func (m *Message) buildWithBodyBytes(bodyBytes []byte) []byte {
m.cook()
var b bytes.Buffer
m.Header.write(&b)
b.Write(bodyBytes)
m.Trailer.write(&b)
return b.Bytes()
}
func (m *Message) cook() {
bodyLength := m.Header.length() + m.Body.length() + m.Trailer.length()
m.Header.SetInt(tagBodyLength, bodyLength)
checkSum := (m.Header.total() + m.Body.total() + m.Trailer.total()) % 256
m.Trailer.SetString(tagCheckSum, formatCheckSum(checkSum))
}