forked from IBM/sarama
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlength_field.go
99 lines (79 loc) · 2.05 KB
/
length_field.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package sarama
import (
"encoding/binary"
"sync"
)
// LengthField implements the PushEncoder and PushDecoder interfaces for calculating 4-byte lengths.
type lengthField struct {
startOffset int
length int32
}
var lengthFieldPool = sync.Pool{}
func acquireLengthField() *lengthField {
val := lengthFieldPool.Get()
if val != nil {
return val.(*lengthField)
}
return &lengthField{}
}
func releaseLengthField(m *lengthField) {
lengthFieldPool.Put(m)
}
func (l *lengthField) decode(pd packetDecoder) error {
var err error
l.length, err = pd.getInt32()
if err != nil {
return err
}
if l.length > int32(pd.remaining()) {
return ErrInsufficientData
}
return nil
}
func (l *lengthField) saveOffset(in int) {
l.startOffset = in
}
func (l *lengthField) reserveLength() int {
return 4
}
func (l *lengthField) run(curOffset int, buf []byte) error {
binary.BigEndian.PutUint32(buf[l.startOffset:], uint32(curOffset-l.startOffset-4))
return nil
}
func (l *lengthField) check(curOffset int, buf []byte) error {
if int32(curOffset-l.startOffset-4) != l.length {
return PacketDecodingError{"length field invalid"}
}
return nil
}
type varintLengthField struct {
startOffset int
length int64
}
func (l *varintLengthField) decode(pd packetDecoder) error {
var err error
l.length, err = pd.getVarint()
return err
}
func (l *varintLengthField) saveOffset(in int) {
l.startOffset = in
}
func (l *varintLengthField) adjustLength(currOffset int) int {
oldFieldSize := l.reserveLength()
l.length = int64(currOffset - l.startOffset - oldFieldSize)
return l.reserveLength() - oldFieldSize
}
func (l *varintLengthField) reserveLength() int {
var tmp [binary.MaxVarintLen64]byte
return binary.PutVarint(tmp[:], l.length)
}
func (l *varintLengthField) run(curOffset int, buf []byte) error {
binary.PutVarint(buf[l.startOffset:], l.length)
return nil
}
func (l *varintLengthField) check(curOffset int, buf []byte) error {
if int64(curOffset-l.startOffset-l.reserveLength()) != l.length {
return PacketDecodingError{"length field invalid"}
}
return nil
}