forked from uber/tchannel-go
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathfragmenting_writer.go
299 lines (255 loc) · 9.05 KB
/
fragmenting_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
// Copyright (c) 2015 Uber Technologies, Inc.
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package tchannel
import (
"errors"
"fmt"
"github.com/temporalio/tchannel-go/typed"
)
// Sentinel errors recorded and returned by fragmentingWriter when one of its
// methods is called in a state that does not permit it.
var (
// errAlreadyWritingArgument is returned by BeginArgument when an argument
// is still open (it has been begun but not yet closed).
errAlreadyWritingArgument = errors.New("already writing argument")
// errNotWritingArgument is returned by Write and Close when no argument is
// currently open.
errNotWritingArgument = errors.New("not writing argument")
// errComplete is returned by BeginArgument once the last argument has been
// closed and the writer is in its complete state.
errComplete = errors.New("last argument already sent")
)
const (
// chunkHeaderSize is the number of bytes taken by a chunk's header: every
// chunk starts with a uint16 holding the chunk's size.
chunkHeaderSize = 2 // each chunk is a uint16
// hasMoreFragmentsFlag is the value written into a fragment's flags when
// the fragment is finished and further fragments will follow it.
hasMoreFragmentsFlag = 0x01 // flags indicating there are more fragments coming
)
// A writableFragment is a fragment that can be written to, containing a buffer
// for contents, a running checksum, and placeholders for the fragment flags
// and final checksum value
type writableFragment struct {
// flagsRef is the placeholder for the fragment flags; finish writes
// hasMoreFragmentsFlag through it when more fragments follow.
flagsRef typed.ByteRef
// checksumRef is the placeholder that finish fills in with checksum.Sum().
checksumRef typed.BytesRef
// checksum is the running checksum; finish releases it when no more
// fragments follow.
checksum Checksum
// contents is the buffer that chunks are written into.
contents *typed.WriteBuffer
// frame is not referenced in this file.
// NOTE(review): presumably the frame backing contents - confirm against the fragmentSender implementations.
frame *Frame
}
// finish seals the fragment: the current checksum value is stored into the
// slot reserved for it, and then either the "more fragments" flag is set or,
// for the final fragment, the checksum is released.
func (f *writableFragment) finish(hasMoreFragments bool) {
	sum := f.checksum.Sum()
	f.checksumRef.Update(sum)

	if !hasMoreFragments {
		// Nothing follows this fragment, so the checksum is no longer needed.
		f.checksum.Release()
		return
	}

	// Important: hasMoreFragmentsFlag is set if there are more fragments, but
	// NOT CLEARED if there aren't. This allows for callReqContinue frames to
	// follow a fragmented callReq frame e.g. when arg2 is modified by the
	// relayer.
	f.flagsRef.Update(hasMoreFragmentsFlag)
}
// A writableChunk is a chunk of data within a fragment, representing the
// contents of an argument within that fragment
type writableChunk struct {
// size is the number of bytes written into this chunk so far.
size uint16
// sizeRef is the deferred uint16 slot in contents that finish fills in
// with size.
sizeRef typed.Uint16Ref
// checksum is fed every byte slice written through writeAsFits.
checksum Checksum
// contents is the buffer the chunk's bytes are written into.
contents *typed.WriteBuffer
}
// newWritableChunk starts a chunk inside contents. A uint16 slot for the
// chunk size is reserved in the buffer up front and filled in later by
// finish; data written to the chunk is also added to checksum.
func newWritableChunk(checksum Checksum, contents *typed.WriteBuffer) *writableChunk {
	sizeSlot := contents.DeferUint16()

	chunk := new(writableChunk)
	chunk.size = 0
	chunk.sizeRef = sizeSlot
	chunk.checksum = checksum
	chunk.contents = contents
	return chunk
}
// writeAsFits copies as much of b as the underlying buffer still has room
// for, adding the copied bytes to the checksum and to the chunk size. It
// returns the number of bytes actually consumed from b.
func (c *writableChunk) writeAsFits(b []byte) int {
	// Truncate the input to whatever space is left in the buffer.
	if room := c.contents.BytesRemaining(); room < len(b) {
		b = b[:room]
	}

	n := len(b)
	c.checksum.Add(b)
	c.contents.WriteBytes(b)
	c.size += uint16(n)
	return n
}
// finish completes the chunk by storing the accumulated byte count into the
// size slot that was reserved when the chunk was created.
func (c *writableChunk) finish() {
	total := c.size
	c.sizeRef.Update(total)
}
// A fragmentSender allocates and sends outbound fragments to a target
type fragmentSender interface {
// newFragment allocates a new fragment. The writer passes initial=true
// only for the fragment requested while it is still in its start state,
// and always passes its own checksum.
newFragment(initial bool, checksum Checksum) (*writableFragment, error)
// flushFragment flushes the given fragment. The writer calls it only after
// the fragment has been finished.
flushFragment(f *writableFragment) error
// doneSending is called by the writer once the final fragment has been
// handed to flushFragment, whether or not that flush succeeded.
doneSending()
}
// fragmentingWriterState tracks where a fragmentingWriter is in the sequence
// of arguments it writes.
type fragmentingWriterState int
const (
// fragmentingWriteStart is the state of a newly created writer, before any
// argument has been begun.
fragmentingWriteStart fragmentingWriterState = iota
// fragmentingWriteInArgument means an argument that is not the last one is
// open for writing.
fragmentingWriteInArgument
// fragmentingWriteInLastArgument means the last argument is open for
// writing.
fragmentingWriteInLastArgument
// fragmentingWriteWaitingForArgument means a non-last argument has been
// closed and the next one has not been begun yet.
fragmentingWriteWaitingForArgument
// fragmentingWriteComplete means the last argument has been closed; no
// further arguments may be begun.
fragmentingWriteComplete
)
// isWritingArgument reports whether the state is one in which an argument
// (last or not) is currently open for writing.
func (s fragmentingWriterState) isWritingArgument() bool {
	switch s {
	case fragmentingWriteInArgument, fragmentingWriteInLastArgument:
		return true
	default:
		return false
	}
}
// A fragmentingWriter writes one or more arguments to an underlying stream,
// breaking them into fragments as needed, and applying an overarching
// checksum. It relies on an underlying fragmentSender, which creates and
// flushes the fragments as needed
type fragmentingWriter struct {
// logger is stored by the constructor; it is not used by the code in this
// file.
logger Logger
// sender allocates and flushes fragments on behalf of the writer.
sender fragmentSender
// checksum is passed to every new fragment and every new chunk.
checksum Checksum
// curFragment is the fragment currently being filled; nil until the first
// argument is begun.
curFragment *writableFragment
// curChunk is the chunk of the current argument within curFragment.
curChunk *writableChunk
// state is the writer's position in the argument sequence.
state fragmentingWriterState
// err is the sticky error: once set, BeginArgument, Write and Close return
// it without doing any further work.
err error
}
// newFragmentingWriter builds a fragmenting writer in its start state. No
// fragment is requested from sender until the first argument is begun.
func newFragmentingWriter(logger Logger, sender fragmentSender, checksum Checksum) *fragmentingWriter {
	w := new(fragmentingWriter)
	w.logger = logger
	w.sender = sender
	w.checksum = checksum
	w.state = fragmentingWriteStart
	return w
}
// ArgWriter begins a new argument and returns the writer itself as the
// ArgWriter for it; fragmentation is handled transparently. The returned
// ArgWriter must be closed once the argument has been written. If the
// argument cannot be begun, the error from BeginArgument is returned.
func (w *fragmentingWriter) ArgWriter(last bool) (ArgWriter, error) {
	err := w.BeginArgument(last)
	if err == nil {
		return w, nil
	}
	return nil, err
}
// BeginArgument opens a new argument; last marks it as the final one. It must
// not be called while another argument is still open.
//
// It returns the sticky error if one is already recorded, errComplete if the
// last argument has already been closed, errAlreadyWritingArgument if an
// argument is open, or the error from the sender if a fragment had to be
// allocated and that failed. It panics if the current fragment cannot hold
// even a chunk header plus one byte.
func (w *fragmentingWriter) BeginArgument(last bool) error {
	if w.err != nil {
		return w.err
	}

	if w.state == fragmentingWriteComplete {
		w.err = errComplete
		return w.err
	}
	if w.state.isWritingArgument() {
		w.err = errAlreadyWritingArgument
		return w.err
	}

	// Lazily request a fragment; only the very first one is "initial".
	if w.curFragment == nil {
		isInitial := w.state == fragmentingWriteStart
		w.curFragment, w.err = w.sender.newFragment(isInitial, w.checksum)
		if w.err != nil {
			return w.err
		}
	}

	// A fragment without room for a chunk header and at least one byte should
	// never be handed to us here. This will only happen due to an
	// implementation error in the TChannel stack itself, hence the panic.
	if w.curFragment.contents.BytesRemaining() <= chunkHeaderSize {
		panic(fmt.Errorf("attempting to begin an argument in a fragment with only %d bytes available",
			w.curFragment.contents.BytesRemaining()))
	}

	w.curChunk = newWritableChunk(w.checksum, w.curFragment.contents)
	if last {
		w.state = fragmentingWriteInLastArgument
	} else {
		w.state = fragmentingWriteInArgument
	}
	return nil
}
// Write appends b to the currently open argument, spilling into new fragments
// whenever the current one fills up. It returns the number of bytes consumed
// from b, together with the sticky error, errNotWritingArgument when no
// argument is open, or the error from a failed flush.
func (w *fragmentingWriter) Write(b []byte) (int, error) {
	if w.err != nil {
		return 0, w.err
	}
	if !w.state.isWritingArgument() {
		w.err = errNotWritingArgument
		return 0, w.err
	}

	written := 0
	for {
		n := w.curChunk.writeAsFits(b)
		written += n
		b = b[n:]
		if len(b) == 0 {
			// Everything has been placed into a chunk.
			return written, nil
		}

		// The fragment is full but data remains: ship the current fragment
		// and carry on in a fresh fragment and chunk.
		w.err = w.Flush()
		if w.err != nil {
			return written, w.err
		}
	}
}
// Flush flushes the current fragment, and starts a new fragment and chunk for
// the argument that is currently being written.
//
// Like the other writer methods, Flush returns the sticky error if one has
// already been recorded, and records and returns errNotWritingArgument if no
// argument is open. Without these guards a Flush after a failed fragment
// allocation (which leaves curFragment nil), or before any argument has been
// begun (curChunk nil), would dereference a nil pointer, and a Flush after an
// error or after completion would re-send a fragment that was already flushed.
//
// Errors from the sender while flushing or allocating are recorded as the
// sticky error and returned.
func (w *fragmentingWriter) Flush() error {
	if w.err != nil {
		return w.err
	}
	if !w.state.isWritingArgument() {
		w.err = errNotWritingArgument
		return w.err
	}

	// Seal the chunk and the fragment; more fragments always follow a Flush.
	w.curChunk.finish()
	w.curFragment.finish(true)
	if w.err = w.sender.flushFragment(w.curFragment); w.err != nil {
		return w.err
	}

	if w.curFragment, w.err = w.sender.newFragment(false, w.checksum); w.err != nil {
		return w.err
	}

	// Continue the same argument in a new chunk of the new fragment.
	w.curChunk = newWritableChunk(w.checksum, w.curFragment.contents)
	return nil
}
// Close ends the argument that is currently open. It returns the sticky
// error if one is recorded, errNotWritingArgument if no argument is open, or
// an error from the sender while flushing or allocating a fragment.
//
// After the chunk is finished there are three outcomes:
//  1. This was the last argument: the fragment is flushed with
//     more_fragments=false and the stream is marked complete.
//  2. More arguments follow and the fragment still has room for another
//     chunk: the fragment is left open.
//  3. More arguments follow but the fragment is full: it is flushed with
//     more_fragments=true and a new fragment is started with an empty chunk,
//     which tells the reader that the current argument is complete.
func (w *fragmentingWriter) Close() error {
	if w.err != nil {
		return w.err
	}
	if !w.state.isWritingArgument() {
		w.err = errNotWritingArgument
		return w.err
	}

	isLast := w.state == fragmentingWriteInLastArgument
	w.curChunk.finish()

	if isLast {
		// Case 1: final fragment. doneSending is invoked even if the flush
		// failed; the flush error is what gets reported.
		w.state = fragmentingWriteComplete
		w.curFragment.finish(false)
		w.err = w.sender.flushFragment(w.curFragment)
		w.sender.doneSending()
		return w.err
	}

	w.state = fragmentingWriteWaitingForArgument
	if w.curFragment.contents.BytesRemaining() <= chunkHeaderSize {
		// Case 3: no room for the next argument's first chunk, so roll over
		// to a new fragment.
		w.curFragment.finish(true)
		w.err = w.sender.flushFragment(w.curFragment)
		if w.err != nil {
			return w.err
		}

		w.curFragment, w.err = w.sender.newFragment(false, w.checksum)
		if w.err != nil {
			return w.err
		}

		// A zero-length chunk marks the end of the argument just closed.
		w.curFragment.contents.WriteUint16(0)
	}

	// Case 2 (or case 3 once rolled over): ready for the next argument.
	return nil
}