Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log and ignore the error in reading udfs in schema tracker #16630

Merged
merged 1 commit into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,13 @@ func (db *DB) AddQueryPattern(queryPattern string, expectedResult *sqltypes.Resu
db.patternData[queryPattern] = exprResult{queryPattern: queryPattern, expr: expr, result: &result}
}

// RemoveQueryPattern removes a query pattern that was previously added.
func (db *DB) RemoveQueryPattern(queryPattern string) {
db.mu.Lock()
defer db.mu.Unlock()
delete(db.patternData, queryPattern)
}

// RejectQueryPattern allows a query pattern to be rejected with an error
func (db *DB) RejectQueryPattern(queryPattern, error string) {
expr := regexp.MustCompile("(?is)^" + queryPattern + "$")
Expand Down
5 changes: 5 additions & 0 deletions go/vt/logutil/throttled.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ var (
errorDepth = log.ErrorDepth
)

// GetLastLogTime gets the last log time for the throttled logger.
func (tl *ThrottledLogger) GetLastLogTime() time.Time {
return tl.lastlogTime
}

func (tl *ThrottledLogger) log(logF logFunc, format string, v ...any) {
now := time.Now()

Expand Down
13 changes: 8 additions & 5 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/servenv"
Expand Down Expand Up @@ -85,9 +86,10 @@ type Engine struct {

historian *historian

conns *connpool.Pool
ticks *timer.Timer
reloadTimeout time.Duration
conns *connpool.Pool
ticks *timer.Timer
reloadTimeout time.Duration
throttledLogger *logutil.ThrottledLogger

// dbCreationFailed is for preventing log spam.
dbCreationFailed bool
Expand All @@ -109,7 +111,8 @@ func NewEngine(env tabletenv.Env) *Engine {
Size: 3,
IdleTimeout: env.Config().OltpReadPool.IdleTimeout,
}),
ticks: timer.NewTimer(reloadTime),
ticks: timer.NewTimer(reloadTime),
throttledLogger: logutil.NewThrottledLogger("schema-tracker", 1*time.Minute),
}
se.schemaCopy = env.Config().SignalWhenSchemaChange
_ = env.Exporter().NewGaugeDurationFunc("SchemaReloadTime", "vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time.", se.ticks.Interval)
Expand Down Expand Up @@ -448,7 +451,7 @@ func (se *Engine) reload(ctx context.Context, includeStats bool) error {

udfsChanged, err := getChangedUserDefinedFunctions(ctx, conn.Conn, shouldUseDatabase)
if err != nil {
return err
se.throttledLogger.Errorf("error in getting changed UDFs: %v", err)
}

rec := concurrency.AllErrorRecorder{}
Expand Down
13 changes: 12 additions & 1 deletion go/vt/vttablet/tabletserver/schema/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1331,7 +1331,8 @@ func TestEngineReload(t *testing.T) {
}

// adding query pattern for udfs
db.AddQueryPattern("SELECT name.*", &sqltypes.Result{})
udfQueryPattern := "SELECT name.*"
db.AddQueryPattern(udfQueryPattern, &sqltypes.Result{})

// Verify the list of created, altered and dropped tables seen.
se.RegisterNotifier("test", func(full map[string]*Table, created, altered, dropped []*Table, _ bool) {
Expand All @@ -1344,6 +1345,16 @@ func TestEngineReload(t *testing.T) {
err = se.reload(context.Background(), false)
require.NoError(t, err)
require.NoError(t, db.LastError())
require.Zero(t, se.throttledLogger.GetLastLogTime())

// Now if we remove the query pattern for udfs, schema engine shouldn't fail.
// Instead we should see a log message with the error.
db.RemoveQueryPattern(udfQueryPattern)
se.UnregisterNotifier("test")
err = se.reload(context.Background(), false)
require.NoError(t, err)
// Check for the udf error being logged. The last log time should be less than a second.
require.Less(t, time.Since(se.throttledLogger.GetLastLogTime()), 1*time.Second)
}

// TestEngineReload tests the vreplication specific GetTableForPos function to ensure
Expand Down
Loading