Skip to content

Commit

Permalink
New unified internal table names format, part 1: identifying and acce…
Browse files Browse the repository at this point in the history
…pting new format tables (#14613)

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach authored Jan 11, 2024
1 parent c69e120 commit edce68b
Show file tree
Hide file tree
Showing 10 changed files with 667 additions and 148 deletions.
213 changes: 125 additions & 88 deletions go/test/endtoend/tabletmanager/tablegc/tablegc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"flag"
"fmt"
"os"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -165,10 +166,17 @@ func populateTable(t *testing.T) {
}

// tableExists sees that a given table exists in MySQL
func tableExists(tableExpr string) (exists bool, tableName string, err error) {
query := `select table_name as table_name from information_schema.tables where table_schema=database() and table_name like '%a'`
parsed := sqlparser.BuildParsedQuery(query, tableExpr)
rs, err := primaryTablet.VttabletProcess.QueryTablet(parsed.Query, keyspaceName, true)
func tableExists(exprs ...string) (exists bool, tableName string, err error) {
if len(exprs) == 0 {
return false, "", fmt.Errorf("empty table list")
}
var clauses []string
for _, expr := range exprs {
clauses = append(clauses, fmt.Sprintf("table_name like '%s'", expr))
}
clause := strings.Join(clauses, " or ")
query := fmt.Sprintf(`select table_name as table_name from information_schema.tables where table_schema=database() and (%s)`, clause)
rs, err := primaryTablet.VttabletProcess.QueryTablet(query, keyspaceName, true)
if err != nil {
return false, "", err
}
Expand Down Expand Up @@ -236,22 +244,28 @@ func validateAnyState(t *testing.T, expectNumRows int64, states ...schema.TableG
for _, state := range states {
expectTableToExist := true
searchExpr := ""
searchExpr2 := ""
switch state {
case schema.HoldTableGCState:
searchExpr = `\_vt\_HOLD\_%`
searchExpr2 = `\_vt\_hld\_%`
case schema.PurgeTableGCState:
searchExpr = `\_vt\_PURGE\_%`
searchExpr2 = `\_vt\_prg\_%`
case schema.EvacTableGCState:
searchExpr = `\_vt\_EVAC\_%`
searchExpr2 = `\_vt\_evc\_%`
case schema.DropTableGCState:
searchExpr = `\_vt\_DROP\_%`
searchExpr2 = `\_vt\_drp\_%`
case schema.TableDroppedGCState:
searchExpr = `\_vt\_%`
searchExpr2 = `\_vt\_%`
expectTableToExist = false
default:
require.Failf(t, "unknown state", "%v", state)
}
exists, tableName, err := tableExists(searchExpr)
exists, tableName, err := tableExists(searchExpr, searchExpr2)
require.NoError(t, err)

var foundRows int64
Expand Down Expand Up @@ -304,108 +318,131 @@ func TestPopulateTable(t *testing.T) {
validateTableDoesNotExist(t, "no_such_table")
}

func generateRenameStatement(newFormat bool, fromTableName string, state schema.TableGCState, tm time.Time) (statement string, toTableName string, err error) {
if newFormat {
return schema.GenerateRenameStatementNewFormat(fromTableName, state, tm)
}
return schema.GenerateRenameStatement(fromTableName, state, tm)
}

func TestHold(t *testing.T) {
populateTable(t)
query, tableName, err := schema.GenerateRenameStatement("t1", schema.HoldTableGCState, time.Now().UTC().Add(tableTransitionExpiration))
assert.NoError(t, err)
for _, newNameFormat := range []bool{false, true} {
t.Run(fmt.Sprintf("new format=%t", newNameFormat), func(t *testing.T) {
populateTable(t)
query, tableName, err := generateRenameStatement(newNameFormat, "t1", schema.HoldTableGCState, time.Now().UTC().Add(tableTransitionExpiration))
assert.NoError(t, err)

_, err = primaryTablet.VttabletProcess.QueryTablet(query, keyspaceName, true)
assert.NoError(t, err)
_, err = primaryTablet.VttabletProcess.QueryTablet(query, keyspaceName, true)
assert.NoError(t, err)

validateTableDoesNotExist(t, "t1")
validateTableExists(t, tableName)
validateTableDoesNotExist(t, "t1")
validateTableExists(t, tableName)

time.Sleep(tableTransitionExpiration / 2)
{
// Table was created with +10s timestamp, so it should still exist
validateTableExists(t, tableName)
time.Sleep(tableTransitionExpiration / 2)
{
// Table was created with +10s timestamp, so it should still exist
validateTableExists(t, tableName)

checkTableRows(t, tableName, 1024)
}
checkTableRows(t, tableName, 1024)
}

time.Sleep(tableTransitionExpiration)
// We're now both beyond table's timestamp as well as a tableGC interval
validateTableDoesNotExist(t, tableName)
if fastDropTable {
validateAnyState(t, -1, schema.DropTableGCState, schema.TableDroppedGCState)
} else {
validateAnyState(t, -1, schema.PurgeTableGCState, schema.EvacTableGCState, schema.DropTableGCState, schema.TableDroppedGCState)
time.Sleep(tableTransitionExpiration)
// We're now both beyond table's timestamp as well as a tableGC interval
validateTableDoesNotExist(t, tableName)
if fastDropTable {
validateAnyState(t, -1, schema.DropTableGCState, schema.TableDroppedGCState)
} else {
validateAnyState(t, -1, schema.PurgeTableGCState, schema.EvacTableGCState, schema.DropTableGCState, schema.TableDroppedGCState)
}
})
}
}

func TestEvac(t *testing.T) {
var tableName string
t.Run("setting up EVAC table", func(t *testing.T) {
populateTable(t)
var query string
var err error
query, tableName, err = schema.GenerateRenameStatement("t1", schema.EvacTableGCState, time.Now().UTC().Add(tableTransitionExpiration))
assert.NoError(t, err)

_, err = primaryTablet.VttabletProcess.QueryTablet(query, keyspaceName, true)
assert.NoError(t, err)

validateTableDoesNotExist(t, "t1")
})

t.Run("validating before expiration", func(t *testing.T) {
time.Sleep(tableTransitionExpiration / 2)
// Table was created with +10s timestamp, so it should still exist
if fastDropTable {
// EVAC state is skipped in mysql 8.0.23 and beyond
validateTableDoesNotExist(t, tableName)
} else {
validateTableExists(t, tableName)
checkTableRows(t, tableName, 1024)
}
})

t.Run("validating rows evacuated", func(t *testing.T) {
// We're now both beyond table's timestamp as well as a tableGC interval
validateTableDoesNotExist(t, tableName)
// Table should be renamed as _vt_DROP_... and then dropped!
validateAnyState(t, 0, schema.DropTableGCState, schema.TableDroppedGCState)
})
for _, newNameFormat := range []bool{false, true} {
t.Run(fmt.Sprintf("new format=%t", newNameFormat), func(t *testing.T) {
var tableName string
t.Run("setting up EVAC table", func(t *testing.T) {
populateTable(t)
var query string
var err error
query, tableName, err = generateRenameStatement(newNameFormat, "t1", schema.EvacTableGCState, time.Now().UTC().Add(tableTransitionExpiration))
assert.NoError(t, err)

_, err = primaryTablet.VttabletProcess.QueryTablet(query, keyspaceName, true)
assert.NoError(t, err)

validateTableDoesNotExist(t, "t1")
})

t.Run("validating before expiration", func(t *testing.T) {
time.Sleep(tableTransitionExpiration / 2)
// Table was created with +10s timestamp, so it should still exist
if fastDropTable {
// EVAC state is skipped in mysql 8.0.23 and beyond
validateTableDoesNotExist(t, tableName)
} else {
validateTableExists(t, tableName)
checkTableRows(t, tableName, 1024)
}
})

t.Run("validating rows evacuated", func(t *testing.T) {
// We're now both beyond table's timestamp as well as a tableGC interval
validateTableDoesNotExist(t, tableName)
// Table should be renamed as _vt_DROP_... and then dropped!
validateAnyState(t, 0, schema.DropTableGCState, schema.TableDroppedGCState)
})
})
}
}

func TestDrop(t *testing.T) {
populateTable(t)
query, tableName, err := schema.GenerateRenameStatement("t1", schema.DropTableGCState, time.Now().UTC().Add(tableTransitionExpiration))
assert.NoError(t, err)
for _, newNameFormat := range []bool{false, true} {
t.Run(fmt.Sprintf("new format=%t", newNameFormat), func(t *testing.T) {
populateTable(t)
query, tableName, err := generateRenameStatement(newNameFormat, "t1", schema.DropTableGCState, time.Now().UTC().Add(tableTransitionExpiration))
assert.NoError(t, err)

_, err = primaryTablet.VttabletProcess.QueryTablet(query, keyspaceName, true)
assert.NoError(t, err)
_, err = primaryTablet.VttabletProcess.QueryTablet(query, keyspaceName, true)
assert.NoError(t, err)

validateTableDoesNotExist(t, "t1")
validateTableDoesNotExist(t, "t1")

time.Sleep(tableTransitionExpiration)
time.Sleep(2 * gcCheckInterval)
// We're now both beyond table's timestamp as well as a tableGC interval
validateTableDoesNotExist(t, tableName)
time.Sleep(tableTransitionExpiration)
time.Sleep(2 * gcCheckInterval)
// We're now both beyond table's timestamp as well as a tableGC interval
validateTableDoesNotExist(t, tableName)
})
}
}

func TestPurge(t *testing.T) {
populateTable(t)
query, tableName, err := schema.GenerateRenameStatement("t1", schema.PurgeTableGCState, time.Now().UTC().Add(tableTransitionExpiration))
require.NoError(t, err)

_, err = primaryTablet.VttabletProcess.QueryTablet(query, keyspaceName, true)
require.NoError(t, err)

validateTableDoesNotExist(t, "t1")
if !fastDropTable {
validateTableExists(t, tableName)
checkTableRows(t, tableName, 1024)
}
if !fastDropTable {
time.Sleep(5 * gcPurgeCheckInterval) // wait for table to be purged
}
validateTableDoesNotExist(t, tableName) // whether purged or not, table should at some point transition to next state
if fastDropTable {
// if MySQL supports fast DROP TABLE, TableGC completely skips the PURGE state. Rows are not purged.
validateAnyState(t, 1024, schema.DropTableGCState, schema.TableDroppedGCState)
} else {
validateAnyState(t, 0, schema.EvacTableGCState, schema.DropTableGCState, schema.TableDroppedGCState)
for _, newNameFormat := range []bool{false, true} {
t.Run(fmt.Sprintf("new format=%t", newNameFormat), func(t *testing.T) {
populateTable(t)
query, tableName, err := generateRenameStatement(newNameFormat, "t1", schema.PurgeTableGCState, time.Now().UTC().Add(tableTransitionExpiration))
require.NoError(t, err)

_, err = primaryTablet.VttabletProcess.QueryTablet(query, keyspaceName, true)
require.NoError(t, err)

validateTableDoesNotExist(t, "t1")
if !fastDropTable {
validateTableExists(t, tableName)
checkTableRows(t, tableName, 1024)
}
if !fastDropTable {
time.Sleep(5 * gcPurgeCheckInterval) // wait for table to be purged
}
validateTableDoesNotExist(t, tableName) // whether purged or not, table should at some point transition to next state
if fastDropTable {
// if MySQL supports fast DROP TABLE, TableGC completely skips the PURGE state. Rows are not purged.
validateAnyState(t, 1024, schema.DropTableGCState, schema.TableDroppedGCState)
} else {
validateAnyState(t, 0, schema.EvacTableGCState, schema.DropTableGCState, schema.TableDroppedGCState)
}
})
}
}

Expand Down
31 changes: 31 additions & 0 deletions go/vt/schema/name.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package schema

import (
"regexp"
"strings"
"time"

Expand All @@ -27,6 +28,15 @@ const (
readableTimeFormat = "20060102150405"
)

const (
InternalTableNameExpression string = `^_vt_([a-zA-Z0-9]{3})_([0-f]{32})_([0-9]{14})_$`
)

var (
// internalTableNameRegexp parses new intrnal table name format, e.g. _vt_hld_6ace8bcef73211ea87e9f875a4d24e90_20200915120410_
internalTableNameRegexp = regexp.MustCompile(InternalTableNameExpression)
)

// CreateUUIDWithDelimiter creates a globally unique ID, with a given delimiter
// example results:
// - 1876a01a-354d-11eb-9a79-f8e4e33000bb (delimiter = "-")
Expand Down Expand Up @@ -61,6 +71,9 @@ func ToReadableTimestamp(t time.Time) string {
// - Table GC (renamed before drop)
// Apps such as VStreamer may choose to ignore such tables.
func IsInternalOperationTableName(tableName string) bool {
if internalTableNameRegexp.MatchString(tableName) {
return true
}
if IsGCTableName(tableName) {
return true
}
Expand All @@ -69,3 +82,21 @@ func IsInternalOperationTableName(tableName string) bool {
}
return false
}

// AnalyzeInternalTableName analyzes a table name, and assumign it's a vitess internal table name, extracts
// the hint, uuid and time out of the name.
// An internal table name can be e.g. `_vt_hld_6ace8bcef73211ea87e9f875a4d24e90_20200915120410_`, analyzed like so:
// - hint is `hld`
// - UUID is `6ace8bcef73211ea87e9f875a4d24e90`
// - Time is 2020-09-15 12:04:10
func AnalyzeInternalTableName(tableName string) (isInternalTable bool, hint string, uuid string, t time.Time, err error) {
submatch := internalTableNameRegexp.FindStringSubmatch(tableName)
if len(submatch) == 0 {
return false, hint, uuid, t, nil
}
t, err = time.Parse(readableTimeFormat, submatch[3])
if err != nil {
return false, hint, uuid, t, err
}
return true, submatch[1], submatch[2], t, nil
}
Loading

0 comments on commit edce68b

Please sign in to comment.