From a0f8d667ed57f83f900beb38ee39b95fc00b1712 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Thu, 9 Nov 2023 09:59:51 +0000 Subject: [PATCH] Remove `setDeadline`. Signed-off-by: Arthur Schreiber --- .../vttablet/tabletserver/connpool/dbconn.go | 107 +++++++++--------- 1 file changed, 53 insertions(+), 54 deletions(-) diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn.go b/go/vt/vttablet/tabletserver/connpool/dbconn.go index 3d3f866b180..ab5b69535d4 100644 --- a/go/vt/vttablet/tabletserver/connpool/dbconn.go +++ b/go/vt/vttablet/tabletserver/connpool/dbconn.go @@ -163,17 +163,38 @@ func (dbc *Conn) execOnce(ctx context.Context, query string, maxrows int, wantfi defer dbc.stats.MySQLTimings.Record("Exec", time.Now()) - done, wg := dbc.setDeadline(ctx) - qr, err := dbc.conn.ExecuteFetch(query, maxrows, wantfields) + resultChan := make(chan *sqltypes.Result, 1) + errChan := make(chan error, 1) + + startTime := time.Now() + go func() { + result, err := dbc.conn.ExecuteFetch(query, maxrows, wantfields) + if err != nil { + errChan <- err + } else { + resultChan <- result + } + }() - if done != nil { - close(done) - wg.Wait() + var err error + var result *sqltypes.Result + + select { + case <-ctx.Done(): + killCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(startTime)) + return nil, dbc.Err() + case err = <-errChan: + case result = <-resultChan: } - if dbcerr := dbc.Err(); dbcerr != nil { - return nil, dbcerr + + if dbcErr := dbc.Err(); dbcErr != nil { + return nil, dbcErr } - return qr, err + + return result, err } // ExecOnce executes the specified query, but does not retry on connection errors. @@ -254,16 +275,29 @@ func (dbc *Conn) streamOnce(ctx context.Context, query string, callback func(*sq dbc.current.Store(query) defer dbc.current.Store("") - done, wg := dbc.setDeadline(ctx) - err := dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize) + errChan := make(chan error, 1) + startTime := time.Now() + + go func() { + errChan <- dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize) + }() - if done != nil { - close(done) - wg.Wait() + var err error + + select { + case <-ctx.Done(): + killCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(startTime)) + return dbc.Err() + case err = <-errChan: } - if dbcerr := dbc.Err(); dbcerr != nil { - return dbcerr + + if dbcErr := dbc.Err(); dbcErr != nil { + return dbcErr } + return err } @@ -409,6 +443,10 @@ func (dbc *Conn) KillWithContext(ctx context.Context, reason string, elapsed tim select { case <-ctx.Done(): killConn.Close() + + dbc.stats.InternalErrors.Add("HungQuery", 1) + log.Warningf("Query may be hung: %s", dbc.CurrentForLogging()) + return context.Cause(ctx) case err := <-errChan: log.Errorf("Could not kill query ID %v %s: %v", dbc.conn.ID(), dbc.CurrentForLogging(), err) @@ -462,45 +500,6 @@ func (dbc *Conn) Reconnect(ctx context.Context) error { return nil } -// setDeadline starts a goroutine that will kill the currently executing query -// if the deadline is exceeded. It returns a channel and a waitgroup. After the -// query is done executing, the caller is required to close the done channel -// and wait for the waitgroup to make sure that the necessary cleanup is done. -func (dbc *Conn) setDeadline(ctx context.Context) (chan bool, *sync.WaitGroup) { - if ctx.Done() == nil { - return nil, nil - } - done := make(chan bool) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - startTime := time.Now() - select { - case <-ctx.Done(): - dbc.Kill(ctx.Err().Error(), time.Since(startTime)) - case <-done: - return - } - elapsed := time.Since(startTime) - - // Give 2x the elapsed time and some buffer as grace period - // for the query to get killed. - tmr2 := time.NewTimer(2*elapsed + 5*time.Second) - defer tmr2.Stop() - select { - case <-tmr2.C: - dbc.stats.InternalErrors.Add("HungQuery", 1) - log.Warningf("Query may be hung: %s", dbc.CurrentForLogging()) - case <-done: - return - } - <-done - log.Warningf("Hung query returned") - }() - return done, &wg -} - // CurrentForLogging applies transformations to the query making it suitable to log. // It applies sanitization rules based on tablet settings and limits the max length of // queries.