@@ -31,10 +31,13 @@ import (
31
31
"time"
32
32
33
33
"vitess.io/vitess/go/vt/sqlparser"
34
+ "vitess.io/vitess/go/vt/vtenv"
34
35
"vitess.io/vitess/go/vt/vterrors"
35
36
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
36
37
37
38
"vitess.io/vitess/go/mysql"
39
+ "vitess.io/vitess/go/mysql/replication"
40
+ "vitess.io/vitess/go/mysql/sqlerror"
38
41
"vitess.io/vitess/go/sqltypes"
39
42
"vitess.io/vitess/go/trace"
40
43
"vitess.io/vitess/go/vt/callerid"
56
59
mysqlAllowClearTextWithoutTLS = flag .Bool ("mysql_allow_clear_text_without_tls" , false , "If set, the server will allow the use of a clear text password over non-SSL connections." )
57
60
mysqlProxyProtocol = flag .Bool ("proxy_protocol" , false , "Enable HAProxy PROXY protocol on MySQL listener socket" )
58
61
mysqlConnBufferPooling = flag .Bool ("mysql_conn_buffer_pooling" , false , "Enable mysql conn buffer pooling." )
62
+ mysqlKeepAlivePeriod = flag .Duration ("mysql-server-keepalive-period" , 0 * time .Second , "TCP period between keep-alives" )
63
+ mysqlServerFlushDelay = flag .Duration ("mysql-server-keepalive-period" , 100 * time .Millisecond , "TCP period between keep-alives" )
59
64
60
65
mysqlServerRequireSecureTransport = flag .Bool ("mysql_server_require_secure_transport" , false , "Reject insecure connections but only if mysql_server_ssl_cert and mysql_server_ssl_key are provided" )
61
66
@@ -80,29 +85,43 @@ var (
80
85
busyConnections int32
81
86
)
82
87
83
- // proxyHandler implements the Listener interface.
88
+ // proxyHandler implements the mysql.Handler interface.
84
89
// It stores the Session in the ClientData of a Connection.
85
90
type proxyHandler struct {
86
- mysql.UnimplementedHandler
87
- mu sync.Mutex
88
-
91
+ env * vtenv.Environment
92
+ mu sync.Mutex
89
93
proxy * VTGateProxy
90
94
}
91
95
92
- func newProxyHandler (proxy * VTGateProxy ) * proxyHandler {
96
+ func newProxyHandler (proxy * VTGateProxy ) (* proxyHandler , error ) {
97
+ env , err := vtenv .New (vtenv.Options {
98
+ MySQLServerVersion : servenv .MySQLServerVersion (),
99
+ TruncateUILen : servenv .TruncateUILen ,
100
+ TruncateErrLen : servenv .TruncateErrLen ,
101
+ })
102
+ if err != nil {
103
+ return nil , fmt .Errorf ("unable to initialize env: %v" , err )
104
+ }
105
+
93
106
return & proxyHandler {
107
+ env : env ,
94
108
proxy : proxy ,
95
- }
109
+ }, nil
96
110
}
97
111
112
+ // NewConnection is called when a connection is created.
113
+ // It is not established yet. The handler can decide to
114
+ // set StatusFlags that will be returned by the handshake methods.
115
+ // In particular, ServerStatusAutocommit might be set.
98
116
func (ph * proxyHandler ) NewConnection (c * mysql.Conn ) {
99
117
}
100
118
101
- func ( ph * proxyHandler ) ComResetConnection ( c * mysql. Conn ) {
102
- ctx := context . Background ()
103
- ph . closeSession ( ctx , c )
119
+ // ConnectionReady is called after the connection handshake, but
120
+ // before we begin to process commands.
121
+ func ( ph * proxyHandler ) ConnectionReady ( c * mysql. Conn ) {
104
122
}
105
123
124
+ // ConnectionClosed is called when a connection is closed.
106
125
func (ph * proxyHandler ) ConnectionClosed (c * mysql.Conn ) {
107
126
// Rollback if there is an ongoing transaction. Ignore error.
108
127
defer func () {
@@ -155,6 +174,10 @@ func startSpan(ctx context.Context, query, label string) (trace.Span, context.Co
155
174
return startSpanTestable (ctx , query , label , trace .NewSpan , trace .NewFromString )
156
175
}
157
176
177
+ // ComQuery is called when a connection receives a query.
178
+ // Note the contents of the query slice may change after
179
+ // the first call to callback. So the Handler should not
180
+ // hang on to the byte slice.
158
181
func (ph * proxyHandler ) ComQuery (c * mysql.Conn , query string , callback func (* sqltypes.Result ) error ) error {
159
182
ctx := context .Background ()
160
183
var cancel context.CancelFunc
@@ -198,12 +221,12 @@ func (ph *proxyHandler) ComQuery(c *mysql.Conn, query string, callback func(*sql
198
221
199
222
if session .SessionPb ().Options .Workload == querypb .ExecuteOptions_OLAP {
200
223
err := ph .proxy .StreamExecute (ctx , session , query , make (map [string ]* querypb.BindVariable ), callback )
201
- return mysql .NewSQLErrorFromError (err )
224
+ return sqlerror .NewSQLErrorFromError (err )
202
225
}
203
226
204
227
result , err := ph .proxy .Execute (ctx , session , query , make (map [string ]* querypb.BindVariable ))
205
228
206
- if err := mysql .NewSQLErrorFromError (err ); err != nil {
229
+ if err := sqlerror .NewSQLErrorFromError (err ); err != nil {
207
230
return err
208
231
}
209
232
fillInTxStatusFlags (c , session )
@@ -223,7 +246,8 @@ func fillInTxStatusFlags(c *mysql.Conn, session *vtgateconn.VTGateSession) {
223
246
}
224
247
}
225
248
226
- // ComPrepare is the handler for command prepare.
249
+ // ComPrepare is called when a connection receives a prepared
250
+ // statement query.
227
251
func (ph * proxyHandler ) ComPrepare (c * mysql.Conn , query string , bindVars map [string ]* querypb.BindVariable ) ([]* querypb.Field , error ) {
228
252
var ctx context.Context
229
253
var cancel context.CancelFunc
@@ -262,13 +286,15 @@ func (ph *proxyHandler) ComPrepare(c *mysql.Conn, query string, bindVars map[str
262
286
}(session )
263
287
264
288
_ , fld , err := ph .proxy .Prepare (ctx , session , query , bindVars )
265
- err = mysql .NewSQLErrorFromError (err )
289
+ err = sqlerror .NewSQLErrorFromError (err )
266
290
if err != nil {
267
291
return nil , err
268
292
}
269
293
return fld , nil
270
294
}
271
295
296
+ // ComStmtExecute is called when a connection receives a statement
297
+ // execute query.
272
298
func (ph * proxyHandler ) ComStmtExecute (c * mysql.Conn , prepare * mysql.PrepareData , callback func (* sqltypes.Result ) error ) error {
273
299
var ctx context.Context
274
300
var cancel context.CancelFunc
@@ -308,18 +334,38 @@ func (ph *proxyHandler) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareData
308
334
309
335
if session .SessionPb ().Options .Workload == querypb .ExecuteOptions_OLAP {
310
336
err := ph .proxy .StreamExecute (ctx , session , prepare .PrepareStmt , prepare .BindVars , callback )
311
- return mysql .NewSQLErrorFromError (err )
337
+ return sqlerror .NewSQLErrorFromError (err )
312
338
}
313
339
314
340
qr , err := ph .proxy .Execute (ctx , session , prepare .PrepareStmt , prepare .BindVars )
315
341
if err != nil {
316
- return mysql .NewSQLErrorFromError (err )
342
+ return sqlerror .NewSQLErrorFromError (err )
317
343
}
318
344
fillInTxStatusFlags (c , session )
319
345
320
346
return callback (qr )
321
347
}
322
348
349
+ // ComRegisterReplica is called when a connection receives a ComRegisterReplica request
350
+ func (ph * proxyHandler ) ComRegisterReplica (c * mysql.Conn , replicaHost string , replicaPort uint16 , replicaUser string , replicaPassword string ) error {
351
+ return vterrors .New (vtrpcpb .Code_UNIMPLEMENTED , "ComRegisterReplica" )
352
+ }
353
+
354
+ // ComBinlogDump is called when a connection receives a ComBinlogDump request
355
+ func (ph * proxyHandler ) ComBinlogDump (c * mysql.Conn , logFile string , binlogPos uint32 ) error {
356
+ return vterrors .New (vtrpcpb .Code_UNIMPLEMENTED , "ComBinlogDump" )
357
+ }
358
+
359
+ // ComBinlogDumpGTID is part of the mysql.Handler interface.
360
+ func (ph * proxyHandler ) ComBinlogDumpGTID (c * mysql.Conn , logFile string , logPos uint64 , gtidSet replication.GTIDSet ) error {
361
+ return vterrors .New (vtrpcpb .Code_UNIMPLEMENTED , "ComBinlogDumpGTID" )
362
+ }
363
+
364
+ // WarningCount is called at the end of each query to obtain
365
+ // the value to be returned to the client in the EOF packet.
366
+ // Note that this will be called either in the context of the
367
+ // ComQuery callback if the result does not contain any fields,
368
+ // or after the last ComQuery call completes.
323
369
func (ph * proxyHandler ) WarningCount (c * mysql.Conn ) uint16 {
324
370
session , _ := c .ClientData .(* vtgateconn.VTGateSession )
325
371
if session == nil {
@@ -329,9 +375,13 @@ func (ph *proxyHandler) WarningCount(c *mysql.Conn) uint16 {
329
375
return uint16 (len (session .SessionPb ().GetWarnings ()))
330
376
}
331
377
332
- // ComBinlogDumpGTID is part of the mysql.Handler interface.
333
- func (ph * proxyHandler ) ComBinlogDumpGTID (c * mysql.Conn , gtidSet mysql.GTIDSet ) error {
334
- return vterrors .New (vtrpcpb .Code_UNIMPLEMENTED , "ComBinlogDumpGTID" )
378
+ func (ph * proxyHandler ) ComResetConnection (c * mysql.Conn ) {
379
+ ctx := context .Background ()
380
+ ph .closeSession (ctx , c )
381
+ }
382
+
383
+ func (ph * proxyHandler ) Env () * vtenv.Environment {
384
+ return ph .env
335
385
}
336
386
337
387
func (ph * proxyHandler ) getSession (ctx context.Context , c * mysql.Conn ) (* vtgateconn.VTGateSession , error ) {
@@ -437,10 +487,13 @@ func initMySQLProtocol() {
437
487
438
488
// Create a Listener.
439
489
var err error
440
- proxyHandle = newProxyHandler (vtGateProxy )
490
+ proxyHandle , err = newProxyHandler (vtGateProxy )
491
+ if err != nil {
492
+ log .Exitf ("newProxyHandler failed: %v" , err )
493
+ }
441
494
if * mysqlServerPort >= 0 {
442
495
log .Infof ("Mysql Server listening on Port %d" , * mysqlServerPort )
443
- mysqlListener , err = mysql .NewListener (* mysqlTCPVersion , net .JoinHostPort (* mysqlServerBindAddress , fmt .Sprintf ("%v" , * mysqlServerPort )), authServer , proxyHandle , * mysqlConnReadTimeout , * mysqlConnWriteTimeout , * mysqlProxyProtocol , * mysqlConnBufferPooling )
496
+ mysqlListener , err = mysql .NewListener (* mysqlTCPVersion , net .JoinHostPort (* mysqlServerBindAddress , fmt .Sprintf ("%v" , * mysqlServerPort )), authServer , proxyHandle , * mysqlConnReadTimeout , * mysqlConnWriteTimeout , * mysqlProxyProtocol , * mysqlConnBufferPooling , * mysqlKeepAlivePeriod , * mysqlServerFlushDelay )
444
497
if err != nil {
445
498
log .Exitf ("mysql.NewListener failed: %v" , err )
446
499
}
0 commit comments