diff --git a/go/test/endtoend/vtgate/misc_test.go b/go/test/endtoend/vtgate/misc_test.go index f0c063b1caf..d3eaf8479e0 100644 --- a/go/test/endtoend/vtgate/misc_test.go +++ b/go/test/endtoend/vtgate/misc_test.go @@ -31,6 +31,16 @@ import ( "github.com/stretchr/testify/require" ) +func TestInsertOnDuplicateKey(t *testing.T) { + conn, closer := start(t) + defer closer() + + utils.Exec(t, conn, "insert into t11(id, sharding_key, col1, col2, col3) values(1, 2, 'a', 1, 2)") + utils.Exec(t, conn, "insert into t11(id, sharding_key, col1, col2, col3) values(1, 2, 'a', 1, 2) on duplicate key update id=10;") + utils.AssertMatches(t, conn, "select id, sharding_key from t11 where id=10", "[[INT64(10) INT64(2)]]") + +} + func TestInsertNeg(t *testing.T) { conn, closer := start(t) defer closer() diff --git a/go/test/endtoend/vtgate/schema.sql b/go/test/endtoend/vtgate/schema.sql index a883a26519f..4c9ed46fe9a 100644 --- a/go/test/endtoend/vtgate/schema.sql +++ b/go/test/endtoend/vtgate/schema.sql @@ -155,3 +155,13 @@ create table t10_id_to_keyspace_id_idx keyspace_id varbinary(10), primary key (id) ) Engine = InnoDB; + +create table t11 +( + id bigint, + sharding_key bigint, + col1 varchar(50), + col2 int, + col3 int, + primary key (id) +) Engine = InnoDB; \ No newline at end of file diff --git a/go/test/endtoend/vtgate/vschema.json b/go/test/endtoend/vtgate/vschema.json index 8d16beec2a6..07b6e76550f 100644 --- a/go/test/endtoend/vtgate/vschema.json +++ b/go/test/endtoend/vtgate/vschema.json @@ -1,11 +1,10 @@ - { "sharded": true, "vindexes": { - "unicode_loose_xxhash" : { + "unicode_loose_xxhash": { "type": "unicode_loose_xxhash" }, - "unicode_loose_md5" : { + "unicode_loose_md5": { "type": "unicode_loose_md5" }, "hash": { @@ -159,7 +158,10 @@ "name": "hash" }, { - "columns": ["id2", "id1"], + "columns": [ + "id2", + "id1" + ], "name": "t4_id2_vdx" } ] @@ -179,7 +181,10 @@ "name": "hash" }, { - "columns": ["id2", "id1"], + "columns": [ + "id2", + "id1" + ], "name": "t6_id2_vdx" } ] @@ -301,6 +306,14 @@ "name": "hash" } ] + }, + "t11": { + "column_vindexes": [ + { + "column": "sharding_key", + "name": "hash" + } + ] } } } diff --git a/go/vt/vtgate/engine/cached_size.go b/go/vt/vtgate/engine/cached_size.go index 807c7604412..ee84317bc00 100644 --- a/go/vt/vtgate/engine/cached_size.go +++ b/go/vt/vtgate/engine/cached_size.go @@ -386,7 +386,7 @@ func (cached *Insert) CachedSize(alloc bool) int64 { } size := int64(0) if alloc { - size += int64(192) + size += int64(208) } // field InsertCommon vitess.io/vitess/go/vt/vtgate/engine.InsertCommon size += cached.InsertCommon.CachedSize(false) @@ -433,7 +433,7 @@ func (cached *InsertCommon) CachedSize(alloc bool) int64 { } size := int64(0) if alloc { - size += int64(128) + size += int64(144) } // field Keyspace *vitess.io/vitess/go/vt/vtgate/vindexes.Keyspace size += cached.Keyspace.CachedSize(true) @@ -450,8 +450,13 @@ func (cached *InsertCommon) CachedSize(alloc bool) int64 { } // field Prefix string size += hack.RuntimeAllocSize(int64(len(cached.Prefix))) - // field Suffix string - size += hack.RuntimeAllocSize(int64(len(cached.Suffix))) + // field Suffix vitess.io/vitess/go/vt/sqlparser.OnDup + { + size += hack.RuntimeAllocSize(int64(cap(cached.Suffix)) * int64(8)) + for _, elem := range cached.Suffix { + size += elem.CachedSize(true) + } + } return size } func (cached *InsertSelect) CachedSize(alloc bool) int64 { diff --git a/go/vt/vtgate/engine/insert.go b/go/vt/vtgate/engine/insert.go index 46043413732..a1d3a462c1a 100644 --- a/go/vt/vtgate/engine/insert.go +++ b/go/vt/vtgate/engine/insert.go @@ -78,7 +78,7 @@ func newInsert( table *vindexes.Table, prefix string, mid sqlparser.Values, - suffix string, + suffix sqlparser.OnDup, ) *Insert { ins := &Insert{ InsertCommon: InsertCommon{ @@ -265,25 +265,30 @@ func (ins *Insert) getInsertShardedQueries( for _, indexValue := range indexesPerRss[i] { index, _ := strconv.ParseInt(string(indexValue.Value), 0, 64) if keyspaceIDs[index] != nil { + walkFunc := func(node sqlparser.SQLNode) (kontinue bool, err error) { + if arg, ok := node.(*sqlparser.Argument); ok { + bv, exists := bindVars[arg.Name] + if !exists { + return false, vterrors.VT03026(arg.Name) + } + shardBindVars[arg.Name] = bv + } + return true, nil + } mids = append(mids, sqlparser.String(ins.Mid[index])) for _, expr := range ins.Mid[index] { - err = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) { - if arg, ok := node.(*sqlparser.Argument); ok { - bv, exists := bindVars[arg.Name] - if !exists { - return false, vterrors.VT03026(arg.Name) - } - shardBindVars[arg.Name] = bv - } - return true, nil - }, expr, nil) + err = sqlparser.Walk(walkFunc, expr, nil) if err != nil { return nil, nil, err } } + err = sqlparser.Walk(walkFunc, ins.Suffix, nil) + if err != nil { + return nil, nil, err + } } } - rewritten := ins.Prefix + strings.Join(mids, ",") + ins.Suffix + rewritten := ins.Prefix + strings.Join(mids, ",") + sqlparser.String(ins.Suffix) queries[i] = &querypb.BoundQuery{ Sql: rewritten, BindVariables: shardBindVars, diff --git a/go/vt/vtgate/engine/insert_common.go b/go/vt/vtgate/engine/insert_common.go index d0a14feec26..64b56e35d4e 100644 --- a/go/vt/vtgate/engine/insert_common.go +++ b/go/vt/vtgate/engine/insert_common.go @@ -23,6 +23,8 @@ import ( "strconv" "strings" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/key" querypb "vitess.io/vitess/go/vt/proto/query" @@ -73,7 +75,7 @@ type ( // Prefix, Suffix are for sharded insert plans. Prefix string - Suffix string + Suffix sqlparser.OnDup // Insert needs tx handling txNeeded diff --git a/go/vt/vtgate/engine/insert_select.go b/go/vt/vtgate/engine/insert_select.go index d36176922dc..88767420508 100644 --- a/go/vt/vtgate/engine/insert_select.go +++ b/go/vt/vtgate/engine/insert_select.go @@ -56,7 +56,7 @@ func newInsertSelect( keyspace *vindexes.Keyspace, table *vindexes.Table, prefix string, - suffix string, + suffix sqlparser.OnDup, vv [][]int, input Primitive, ) *InsertSelect { @@ -172,7 +172,7 @@ func (ins *InsertSelect) getInsertUnshardedQuery(rows []sqltypes.Row, bindVars m } mids = append(mids, row) } - return ins.Prefix + sqlparser.String(mids) + ins.Suffix + return ins.Prefix + sqlparser.String(mids) + sqlparser.String(ins.Suffix) } func (ins *InsertSelect) insertIntoShardedTable( @@ -270,7 +270,7 @@ func (ins *InsertSelect) getInsertShardedQueries( mids = append(mids, row) } } - rewritten := ins.Prefix + sqlparser.String(mids) + ins.Suffix + rewritten := ins.Prefix + sqlparser.String(mids) + sqlparser.String(ins.Suffix) queries[i] = &querypb.BoundQuery{ Sql: rewritten, BindVariables: bvs, diff --git a/go/vt/vtgate/engine/insert_test.go b/go/vt/vtgate/engine/insert_test.go index 4ee8431f083..217492a529f 100644 --- a/go/vt/vtgate/engine/insert_test.go +++ b/go/vt/vtgate/engine/insert_test.go @@ -213,7 +213,7 @@ func TestInsertShardedSimple(t *testing.T) { sqlparser.Values{ {&sqlparser.Argument{Name: "_id_0", Type: sqltypes.Int64}}, }, - " suffix", + nil, ) vc := newDMLTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} @@ -227,7 +227,7 @@ func TestInsertShardedSimple(t *testing.T) { `ResolveDestinations sharded [value:"0"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`, // Row 2 will go to -20, rows 1 & 3 will go to 20- `ExecuteMultiShard ` + - `sharded.20-: prefix(:_id_0 /* INT64 */) suffix {_id_0: type:INT64 value:"1"} ` + + `sharded.20-: prefix(:_id_0 /* INT64 */) {_id_0: type:INT64 value:"1"} ` + `true true`, }) @@ -252,7 +252,7 @@ func TestInsertShardedSimple(t *testing.T) { {&sqlparser.Argument{Name: "_id_1", Type: sqltypes.Int64}}, {&sqlparser.Argument{Name: "_id_2", Type: sqltypes.Int64}}, }, - " suffix", + nil, ) vc = newDMLTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} @@ -266,8 +266,8 @@ func TestInsertShardedSimple(t *testing.T) { `ResolveDestinations sharded [value:"0" value:"1" value:"2"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`, // Row 2 will go to -20, rows 1 & 3 will go to 20- `ExecuteMultiShard ` + - `sharded.20-: prefix(:_id_0 /* INT64 */),(:_id_2 /* INT64 */) suffix {_id_0: type:INT64 value:"1" _id_2: type:INT64 value:"3"} ` + - `sharded.-20: prefix(:_id_1 /* INT64 */) suffix {_id_1: type:INT64 value:"2"} ` + + `sharded.20-: prefix(:_id_0 /* INT64 */),(:_id_2 /* INT64 */) {_id_0: type:INT64 value:"1" _id_2: type:INT64 value:"3"} ` + + `sharded.-20: prefix(:_id_1 /* INT64 */) {_id_1: type:INT64 value:"2"} ` + `true false`, }) @@ -293,7 +293,7 @@ func TestInsertShardedSimple(t *testing.T) { {&sqlparser.Argument{Name: "_id_1", Type: sqltypes.Int64}}, {&sqlparser.Argument{Name: "_id_2", Type: sqltypes.Int64}}, }, - " suffix", + nil, ) ins.MultiShardAutocommit = true @@ -309,8 +309,156 @@ func TestInsertShardedSimple(t *testing.T) { `ResolveDestinations sharded [value:"0" value:"1" value:"2"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`, // Row 2 will go to -20, rows 1 & 3 will go to 20- `ExecuteMultiShard ` + - `sharded.20-: prefix(:_id_0 /* INT64 */),(:_id_2 /* INT64 */) suffix {_id_0: type:INT64 value:"1" _id_2: type:INT64 value:"3"} ` + - `sharded.-20: prefix(:_id_1 /* INT64 */) suffix {_id_1: type:INT64 value:"2"} ` + + `sharded.20-: prefix(:_id_0 /* INT64 */),(:_id_2 /* INT64 */) {_id_0: type:INT64 value:"1" _id_2: type:INT64 value:"3"} ` + + `sharded.-20: prefix(:_id_1 /* INT64 */) {_id_1: type:INT64 value:"2"} ` + + `true true`, + }) +} + +func TestInsertShardWithONDuplicateKey(t *testing.T) { + invschema := &vschemapb.SrvVSchema{ + Keyspaces: map[string]*vschemapb.Keyspace{ + "sharded": { + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Columns: []string{"id"}, + }}, + }, + }, + }, + }, + } + vs := vindexes.BuildVSchema(invschema) + ks := vs.Keyspaces["sharded"] + + // A single row insert should be autocommitted + ins := newInsert( + InsertSharded, + false, + ks.Keyspace, + [][][]evalengine.Expr{{ + // colVindex columns: id + { + evalengine.NewLiteralInt(1), + }, + }}, + ks.Tables["t1"], + "prefix", + sqlparser.Values{ + {&sqlparser.Argument{Name: "_id_0", Type: sqltypes.Int64}}, + }, + sqlparser.OnDup{ + &sqlparser.UpdateExpr{Name: sqlparser.NewColName("suffix"), Expr: &sqlparser.Argument{Name: "_id_0", Type: sqltypes.Int64}}, + }, + ) + vc := newDMLTestVCursor("-20", "20-") + vc.shardForKsid = []string{"20-", "-20", "20-"} + + _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) + if err != nil { + t.Fatal(err) + } + vc.ExpectLog(t, []string{ + // Based on shardForKsid, values returned will be 20-. + `ResolveDestinations sharded [value:"0"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`, + // Row 2 will go to -20, rows 1 & 3 will go to 20- + `ExecuteMultiShard ` + + `sharded.20-: prefix(:_id_0 /* INT64 */) on duplicate key update suffix = :_id_0 /* INT64 */ {_id_0: type:INT64 value:"1"} ` + + `true true`, + }) + + // Multiple rows are not autocommitted by default + ins = newInsert( + InsertSharded, + false, + ks.Keyspace, + [][][]evalengine.Expr{{ + // colVindex columns: id + // 3 rows. + { + evalengine.NewLiteralInt(1), + evalengine.NewLiteralInt(2), + evalengine.NewLiteralInt(3), + }, + }}, + ks.Tables["t1"], + "prefix", + sqlparser.Values{ + {&sqlparser.Argument{Name: "_id_0", Type: sqltypes.Int64}}, + {&sqlparser.Argument{Name: "_id_1", Type: sqltypes.Int64}}, + {&sqlparser.Argument{Name: "_id_2", Type: sqltypes.Int64}}, + }, + sqlparser.OnDup{ + &sqlparser.UpdateExpr{Name: sqlparser.NewColName("suffix"), Expr: &sqlparser.Argument{Name: "_id_0", Type: sqltypes.Int64}}, + }, + ) + vc = newDMLTestVCursor("-20", "20-") + vc.shardForKsid = []string{"20-", "-20", "20-"} + + _, err = ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) + if err != nil { + t.Fatal(err) + } + vc.ExpectLog(t, []string{ + // Based on shardForKsid, values returned will be 20-, -20, 20-. + `ResolveDestinations sharded [value:"0" value:"1" value:"2"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`, + // Row 2 will go to -20, rows 1 & 3 will go to 20- + `ExecuteMultiShard ` + + `sharded.20-: prefix(:_id_0 /* INT64 */),(:_id_2 /* INT64 */) on duplicate key update suffix = :_id_0 /* INT64 */ {_id_0: type:INT64 value:"1" _id_2: type:INT64 value:"3"} ` + + `sharded.-20: prefix(:_id_1 /* INT64 */) on duplicate key update suffix = :_id_0 /* INT64 */ {_id_0: type:INT64 value:"1" _id_1: type:INT64 value:"2"} ` + + `true false`, + }) + + // Optional flag overrides autocommit + ins = newInsert( + InsertSharded, + false, + ks.Keyspace, + [][][]evalengine.Expr{{ + // colVindex columns: id + // 3 rows. + { + evalengine.NewLiteralInt(1), + evalengine.NewLiteralInt(2), + evalengine.NewLiteralInt(3), + }, + }}, + + ks.Tables["t1"], + "prefix", + sqlparser.Values{ + {&sqlparser.Argument{Name: "_id_0", Type: sqltypes.Int64}}, + {&sqlparser.Argument{Name: "_id_1", Type: sqltypes.Int64}}, + {&sqlparser.Argument{Name: "_id_2", Type: sqltypes.Int64}}, + }, + sqlparser.OnDup{ + &sqlparser.UpdateExpr{Name: sqlparser.NewColName("suffix"), Expr: &sqlparser.Argument{Name: "_id_0", Type: sqltypes.Int64}}, + }, + ) + ins.MultiShardAutocommit = true + + vc = newDMLTestVCursor("-20", "20-") + vc.shardForKsid = []string{"20-", "-20", "20-"} + + _, err = ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) + if err != nil { + t.Fatal(err) + } + vc.ExpectLog(t, []string{ + // Based on shardForKsid, values returned will be 20-, -20, 20-. + `ResolveDestinations sharded [value:"0" value:"1" value:"2"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`, + // Row 2 will go to -20, rows 1 & 3 will go to 20- + `ExecuteMultiShard ` + + `sharded.20-: prefix(:_id_0 /* INT64 */),(:_id_2 /* INT64 */) on duplicate key update suffix = :_id_0 /* INT64 */ {_id_0: type:INT64 value:"1" _id_2: type:INT64 value:"3"} ` + + `sharded.-20: prefix(:_id_1 /* INT64 */) on duplicate key update suffix = :_id_0 /* INT64 */ {_id_0: type:INT64 value:"1" _id_1: type:INT64 value:"2"} ` + `true true`, }) } @@ -360,7 +508,7 @@ func TestInsertShardedFail(t *testing.T) { sqlparser.Values{ {&sqlparser.Argument{Name: "_id_0", Type: sqltypes.Int64}}, }, - " suffix", + nil, ) vc := &loggingVCursor{} @@ -414,7 +562,7 @@ func TestInsertShardedGenerate(t *testing.T) { {&sqlparser.Argument{Name: "__seq1", Type: sqltypes.Int64}}, {&sqlparser.Argument{Name: "__seq2", Type: sqltypes.Int64}}, }, - " suffix", + nil, ) ins.Generate = &Generate{ @@ -454,9 +602,9 @@ func TestInsertShardedGenerate(t *testing.T) { `ResolveDestinations sharded [value:"0" value:"1" value:"2"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`, // Row 2 will go to -20, rows 1 & 3 will go to 20- `ExecuteMultiShard ` + - `sharded.20-: prefix(:__seq0 /* INT64 */),(:__seq2 /* INT64 */) suffix ` + + `sharded.20-: prefix(:__seq0 /* INT64 */),(:__seq2 /* INT64 */) ` + `{__seq0: type:INT64 value:"1" __seq2: type:INT64 value:"3"} ` + - `sharded.-20: prefix(:__seq1 /* INT64 */) suffix ` + + `sharded.-20: prefix(:__seq1 /* INT64 */) ` + `{__seq1: type:INT64 value:"2"} ` + `true false`, }) @@ -552,7 +700,7 @@ func TestInsertShardedOwned(t *testing.T) { {&sqlparser.Argument{Name: "_id_1", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c1_1", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c2_1", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c3_1", Type: sqltypes.Int64}}, {&sqlparser.Argument{Name: "_id_2", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c1_2", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c2_2", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c3_2", Type: sqltypes.Int64}}, }, - " suffix", + nil, ) vc := newDMLTestVCursor("-20", "20-") @@ -574,12 +722,12 @@ func TestInsertShardedOwned(t *testing.T) { `ResolveDestinations sharded [value:"0" value:"1" value:"2"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`, `ExecuteMultiShard ` + `sharded.20-: prefix(:_id_0 /* INT64 */, :_c1_0 /* INT64 */, :_c2_0 /* INT64 */, :_c3_0 /* INT64 */)` + - `,(:_id_2 /* INT64 */, :_c1_2 /* INT64 */, :_c2_2 /* INT64 */, :_c3_2 /* INT64 */) suffix ` + + `,(:_id_2 /* INT64 */, :_c1_2 /* INT64 */, :_c2_2 /* INT64 */, :_c3_2 /* INT64 */) ` + `{_c1_0: type:INT64 value:"4" _c1_2: type:INT64 value:"6" ` + `_c2_0: type:INT64 value:"7" _c2_2: type:INT64 value:"9" ` + `_c3_0: type:INT64 value:"10" _c3_2: type:INT64 value:"12" ` + `_id_0: type:INT64 value:"1" _id_2: type:INT64 value:"3"} ` + - `sharded.-20: prefix(:_id_1 /* INT64 */, :_c1_1 /* INT64 */, :_c2_1 /* INT64 */, :_c3_1 /* INT64 */) suffix ` + + `sharded.-20: prefix(:_id_1 /* INT64 */, :_c1_1 /* INT64 */, :_c2_1 /* INT64 */, :_c3_1 /* INT64 */) ` + `{_c1_1: type:INT64 value:"5" _c2_1: type:INT64 value:"8" _c3_1: type:INT64 value:"11" ` + `_id_1: type:INT64 value:"2"} ` + `true false`, @@ -644,7 +792,7 @@ func TestInsertShardedOwnedWithNull(t *testing.T) { sqlparser.Values{ {&sqlparser.Argument{Name: "_id_0", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c3_0", Type: sqltypes.Null}}, }, - " suffix", + nil, ) vc := newDMLTestVCursor("-20", "20-") @@ -656,7 +804,7 @@ func TestInsertShardedOwnedWithNull(t *testing.T) { } vc.ExpectLog(t, []string{ `ResolveDestinations sharded [value:"0"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`, - `ExecuteMultiShard sharded.20-: prefix(:_id_0 /* INT64 */, :_c3_0 /* NULL_TYPE */) suffix ` + + `ExecuteMultiShard sharded.20-: prefix(:_id_0 /* INT64 */, :_c3_0 /* NULL_TYPE */) ` + `{_c3_0: _id_0: type:INT64 value:"1"} true true`, }) } @@ -730,7 +878,7 @@ func TestInsertShardedGeo(t *testing.T) { {&sqlparser.Argument{Name: "_region_0", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_id_0", Type: sqltypes.Int64}}, {&sqlparser.Argument{Name: "_region_1", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_id_1", Type: sqltypes.Int64}}, }, - " suffix", + nil, ) vc := newDMLTestVCursor("-20", "20-") @@ -745,9 +893,9 @@ func TestInsertShardedGeo(t *testing.T) { `id_0: type:INT64 value:"1" id_1: type:INT64 value:"1" ` + `keyspace_id_0: type:VARBINARY value:"\x01\x16k@\xb4J\xbaK\xd6" keyspace_id_1: type:VARBINARY value:"\xff\x16k@\xb4J\xbaK\xd6" true`, `ResolveDestinations sharded [value:"0" value:"1"] Destinations:DestinationKeyspaceID(01166b40b44aba4bd6),DestinationKeyspaceID(ff166b40b44aba4bd6)`, - `ExecuteMultiShard sharded.20-: prefix(:_region_0 /* INT64 */, :_id_0 /* INT64 */) suffix ` + + `ExecuteMultiShard sharded.20-: prefix(:_region_0 /* INT64 */, :_id_0 /* INT64 */) ` + `{_id_0: type:INT64 value:"1" _region_0: type:INT64 value:"1"} ` + - `sharded.-20: prefix(:_region_1 /* INT64 */, :_id_1 /* INT64 */) suffix ` + + `sharded.-20: prefix(:_region_1 /* INT64 */, :_id_1 /* INT64 */) ` + `{_id_1: type:INT64 value:"1" _region_1: type:INT64 value:"255"} ` + `true false`, }) @@ -854,7 +1002,7 @@ func TestInsertShardedIgnoreOwned(t *testing.T) { {&sqlparser.Argument{Name: "_id_2", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c1_2", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c2_2", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c3_2", Type: sqltypes.Int64}}, {&sqlparser.Argument{Name: "_id_3", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c1_3", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c2_3", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c3_3", Type: sqltypes.Int64}}, }, - " suffix", + nil, ) ksid0Lookup := sqltypes.MakeTestResult( @@ -917,9 +1065,9 @@ func TestInsertShardedIgnoreOwned(t *testing.T) { `ResolveDestinations sharded [value:"0" value:"3"] Destinations:DestinationKeyspaceID(00),DestinationKeyspaceID(00)`, // Bind vars for rows 2 & 3 may be missing because they were not sent. `ExecuteMultiShard ` + - `sharded.20-: prefix(:_id_0 /* INT64 */, :_c1_0 /* INT64 */, :_c2_0 /* INT64 */, :_c3_0 /* INT64 */) suffix ` + + `sharded.20-: prefix(:_id_0 /* INT64 */, :_c1_0 /* INT64 */, :_c2_0 /* INT64 */, :_c3_0 /* INT64 */) ` + `{_c1_0: type:INT64 value:"5" _c2_0: type:INT64 value:"9" _c3_0: type:INT64 value:"13" _id_0: type:INT64 value:"1"} ` + - `sharded.-20: prefix(:_id_3 /* INT64 */, :_c1_3 /* INT64 */, :_c2_3 /* INT64 */, :_c3_3 /* INT64 */) suffix ` + + `sharded.-20: prefix(:_id_3 /* INT64 */, :_c1_3 /* INT64 */, :_c2_3 /* INT64 */, :_c3_3 /* INT64 */) ` + `{_c1_3: type:INT64 value:"8" _c2_3: type:INT64 value:"12" _c3_3: type:INT64 value:"16" _id_3: type:INT64 value:"4"} ` + `true false`, }) @@ -984,7 +1132,7 @@ func TestInsertShardedIgnoreOwnedWithNull(t *testing.T) { sqlparser.Values{ {&sqlparser.Argument{Name: "_id_0", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c3_0", Type: sqltypes.Int64}}, }, - " suffix", + nil, ) ksid0 := sqltypes.MakeTestResult( @@ -1009,7 +1157,7 @@ func TestInsertShardedIgnoreOwnedWithNull(t *testing.T) { vc.ExpectLog(t, []string{ `Execute select from from lkp1 where from = :from and toc = :toc from: toc: type:VARBINARY value:"\x16k@\xb4J\xbaK\xd6" false`, `ResolveDestinations sharded [value:"0"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`, - `ExecuteMultiShard sharded.-20: prefix(:_id_0 /* INT64 */, :_c3_0 /* INT64 */) suffix ` + + `ExecuteMultiShard sharded.-20: prefix(:_id_0 /* INT64 */, :_c3_0 /* INT64 */) ` + `{_c3_0: _id_0: type:INT64 value:"1"} true true`, }) } @@ -1102,7 +1250,7 @@ func TestInsertShardedUnownedVerify(t *testing.T) { {&sqlparser.Argument{Name: "_id_1", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c1_1", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c2_1", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c3_1", Type: sqltypes.Int64}}, {&sqlparser.Argument{Name: "_id_2", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c1_2", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c2_2", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c3_2", Type: sqltypes.Int64}}, }, - " suffix", + nil, ) // nonemptyResult will cause the lookup verify queries to succeed. @@ -1141,12 +1289,12 @@ func TestInsertShardedUnownedVerify(t *testing.T) { `ResolveDestinations sharded [value:"0" value:"1" value:"2"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`, `ExecuteMultiShard ` + `sharded.20-: prefix(:_id_0 /* INT64 */, :_c1_0 /* INT64 */, :_c2_0 /* INT64 */, :_c3_0 /* INT64 */),` + - `(:_id_2 /* INT64 */, :_c1_2 /* INT64 */, :_c2_2 /* INT64 */, :_c3_2 /* INT64 */) suffix ` + + `(:_id_2 /* INT64 */, :_c1_2 /* INT64 */, :_c2_2 /* INT64 */, :_c3_2 /* INT64 */) ` + `{_c1_0: type:INT64 value:"4" _c1_2: type:INT64 value:"6" ` + `_c2_0: type:INT64 value:"7" _c2_2: type:INT64 value:"9" ` + `_c3_0: type:INT64 value:"10" _c3_2: type:INT64 value:"12" ` + `_id_0: type:INT64 value:"1" _id_2: type:INT64 value:"3"} ` + - `sharded.-20: prefix(:_id_1 /* INT64 */, :_c1_1 /* INT64 */, :_c2_1 /* INT64 */, :_c3_1 /* INT64 */) suffix ` + + `sharded.-20: prefix(:_id_1 /* INT64 */, :_c1_1 /* INT64 */, :_c2_1 /* INT64 */, :_c3_1 /* INT64 */) ` + `{_c1_1: type:INT64 value:"5" _c2_1: type:INT64 value:"8" ` + `_c3_1: type:INT64 value:"11" _id_1: type:INT64 value:"2"} ` + `true false`, @@ -1216,7 +1364,7 @@ func TestInsertShardedIgnoreUnownedVerify(t *testing.T) { {&sqlparser.Argument{Name: "_id_1", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c3_1", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "v2", Type: sqltypes.VarChar}}, {&sqlparser.Argument{Name: "_id_2", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c3_2", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "v3", Type: sqltypes.VarChar}}, }, - " suffix", + nil, ) // nonemptyResult will cause the lookup verify queries to succeed. @@ -1251,9 +1399,9 @@ func TestInsertShardedIgnoreUnownedVerify(t *testing.T) { // Based on shardForKsid, values returned will be 20-, -20. `ResolveDestinations sharded [value:"0" value:"2"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(4eb190c9a2fa169c)`, `ExecuteMultiShard ` + - `sharded.20-: prefix(:_id_0 /* INT64 */, :_c3_0 /* INT64 */, :v1 /* VARCHAR */) suffix ` + + `sharded.20-: prefix(:_id_0 /* INT64 */, :_c3_0 /* INT64 */, :v1 /* VARCHAR */) ` + `{_c3_0: type:INT64 value:"10" _id_0: type:INT64 value:"1" v1: type:VARCHAR value:"a"} ` + - `sharded.-20: prefix(:_id_2 /* INT64 */, :_c3_2 /* INT64 */, :v3 /* VARCHAR */) suffix ` + + `sharded.-20: prefix(:_id_2 /* INT64 */, :_c3_2 /* INT64 */, :v3 /* VARCHAR */) ` + `{_c3_2: type:INT64 value:"12" _id_2: type:INT64 value:"3" v3: type:VARCHAR value:"c"} ` + `true false`, }) @@ -1316,7 +1464,7 @@ func TestInsertShardedIgnoreUnownedVerifyFail(t *testing.T) { sqlparser.Values{ {&sqlparser.Argument{Name: "_id_0", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c3_0", Type: sqltypes.Int64}}, }, - " suffix", + nil, ) vc := newDMLTestVCursor("-20", "20-") @@ -1413,7 +1561,7 @@ func TestInsertShardedUnownedReverseMap(t *testing.T) { {&sqlparser.Argument{Name: "_id_1", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c1_1", Type: sqltypes.Null}, &sqlparser.Argument{Name: "_c2_1", Type: sqltypes.Null}, &sqlparser.Argument{Name: "_c3_1", Type: sqltypes.Null}}, {&sqlparser.Argument{Name: "_id_2", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c1_2", Type: sqltypes.Null}, &sqlparser.Argument{Name: "_c2_2", Type: sqltypes.Null}, &sqlparser.Argument{Name: "_c3_2", Type: sqltypes.Null}}, }, - " suffix", + nil, ) // nonemptyResult will cause the lookup verify queries to succeed. @@ -1439,13 +1587,13 @@ func TestInsertShardedUnownedReverseMap(t *testing.T) { `ResolveDestinations sharded [value:"0" value:"1" value:"2"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`, `ExecuteMultiShard sharded.20-: ` + `prefix(:_id_0 /* INT64 */, :_c1_0 /* NULL_TYPE */, :_c2_0 /* NULL_TYPE */, :_c3_0 /* NULL_TYPE */),` + - `(:_id_2 /* INT64 */, :_c1_2 /* NULL_TYPE */, :_c2_2 /* NULL_TYPE */, :_c3_2 /* NULL_TYPE */) suffix ` + + `(:_id_2 /* INT64 */, :_c1_2 /* NULL_TYPE */, :_c2_2 /* NULL_TYPE */, :_c3_2 /* NULL_TYPE */) ` + `{_c1_0: type:UINT64 value:"1" _c1_2: type:UINT64 value:"3" ` + `_c2_0: _c2_2: ` + `_c3_0: type:UINT64 value:"1" _c3_2: type:UINT64 value:"3" ` + `_id_0: type:INT64 value:"1" _id_2: type:INT64 value:"3"} ` + `sharded.-20: ` + - `prefix(:_id_1 /* INT64 */, :_c1_1 /* NULL_TYPE */, :_c2_1 /* NULL_TYPE */, :_c3_1 /* NULL_TYPE */) suffix ` + + `prefix(:_id_1 /* INT64 */, :_c1_1 /* NULL_TYPE */, :_c2_1 /* NULL_TYPE */, :_c3_1 /* NULL_TYPE */) ` + `{_c1_1: type:UINT64 value:"2" _c2_1: _c3_1: type:UINT64 value:"2" _id_1: type:INT64 value:"2"} true false`, }) } @@ -1507,7 +1655,7 @@ func TestInsertShardedUnownedReverseMapSuccess(t *testing.T) { sqlparser.Values{ {&sqlparser.Argument{Name: "_id_0", Type: sqltypes.Int64}, &sqlparser.Argument{Name: "_c3_0", Type: sqltypes.Null}}, }, - " suffix", + nil, ) vc := newDMLTestVCursor("-20", "20-") @@ -1539,7 +1687,7 @@ func TestInsertSelectSimple(t *testing.T) { RoutingParameters: &RoutingParameters{ Opcode: Scatter, Keyspace: ks.Keyspace}} - ins := newInsertSelect(false, ks.Keyspace, ks.Tables["t1"], "prefix ", " suffix", [][]int{{1}}, rb) + ins := newInsertSelect(false, ks.Keyspace, ks.Tables["t1"], "prefix ", nil, [][]int{{1}}, rb) vc := newDMLTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} @@ -1563,10 +1711,10 @@ func TestInsertSelectSimple(t *testing.T) { // two rows go to the 20- shard, and one row go to the -20 shard `ExecuteMultiShard ` + - `sharded.20-: prefix values (:_c0_0, :_c0_1), (:_c2_0, :_c2_1) suffix ` + + `sharded.20-: prefix values (:_c0_0, :_c0_1), (:_c2_0, :_c2_1) ` + `{_c0_0: type:VARCHAR value:"a" _c0_1: type:INT64 value:"1"` + ` _c2_0: type:VARCHAR value:"b" _c2_1: type:INT64 value:"2"} ` + - `sharded.-20: prefix values (:_c1_0, :_c1_1) suffix` + + `sharded.-20: prefix values (:_c1_0, :_c1_1)` + ` {_c1_0: type:VARCHAR value:"a" _c1_1: type:INT64 value:"3"} true false`}) vc.Rewind() @@ -1583,10 +1731,10 @@ func TestInsertSelectSimple(t *testing.T) { // two rows go to the 20- shard, and one row go to the -20 shard `ExecuteMultiShard ` + - `sharded.20-: prefix values (:_c0_0, :_c0_1), (:_c2_0, :_c2_1) suffix ` + + `sharded.20-: prefix values (:_c0_0, :_c0_1), (:_c2_0, :_c2_1) ` + `{_c0_0: type:VARCHAR value:"a" _c0_1: type:INT64 value:"1"` + ` _c2_0: type:VARCHAR value:"b" _c2_1: type:INT64 value:"2"} ` + - `sharded.-20: prefix values (:_c1_0, :_c1_1) suffix` + + `sharded.-20: prefix values (:_c1_0, :_c1_1)` + ` {_c1_0: type:VARCHAR value:"a" _c1_1: type:INT64 value:"3"} true false`}) } @@ -1627,7 +1775,7 @@ func TestInsertSelectOwned(t *testing.T) { ks.Keyspace, ks.Tables["t1"], "prefix ", - " suffix", + nil, [][]int{ {1}, // The primary vindex has a single column as sharding key {0}}, // the onecol vindex uses the 'name' column @@ -1662,11 +1810,11 @@ func TestInsertSelectOwned(t *testing.T) { // insert values into the main table `ExecuteMultiShard ` + // first we insert two rows on the 20- shard - `sharded.20-: prefix values (:_c0_0, :_c0_1), (:_c2_0, :_c2_1) suffix ` + + `sharded.20-: prefix values (:_c0_0, :_c0_1), (:_c2_0, :_c2_1) ` + `{_c0_0: type:VARCHAR value:"a" _c0_1: type:INT64 value:"1" _c2_0: type:VARCHAR value:"b" _c2_1: type:INT64 value:"2"} ` + // next we insert one row on the -20 shard - `sharded.-20: prefix values (:_c1_0, :_c1_1) suffix ` + + `sharded.-20: prefix values (:_c1_0, :_c1_1) ` + `{_c1_0: type:VARCHAR value:"a" _c1_1: type:INT64 value:"3"} ` + `true false`}) @@ -1690,11 +1838,11 @@ func TestInsertSelectOwned(t *testing.T) { // insert values into the main table `ExecuteMultiShard ` + // first we insert two rows on the 20- shard - `sharded.20-: prefix values (:_c0_0, :_c0_1), (:_c2_0, :_c2_1) suffix ` + + `sharded.20-: prefix values (:_c0_0, :_c0_1), (:_c2_0, :_c2_1) ` + `{_c0_0: type:VARCHAR value:"a" _c0_1: type:INT64 value:"1" _c2_0: type:VARCHAR value:"b" _c2_1: type:INT64 value:"2"} ` + // next we insert one row on the -20 shard - `sharded.-20: prefix values (:_c1_0, :_c1_1) suffix ` + + `sharded.-20: prefix values (:_c1_0, :_c1_1) ` + `{_c1_0: type:VARCHAR value:"a" _c1_1: type:INT64 value:"3"} ` + `true false`}) } @@ -1728,7 +1876,7 @@ func TestInsertSelectGenerate(t *testing.T) { ks.Keyspace, ks.Tables["t1"], "prefix ", - " suffix", + nil, [][]int{{1}}, // The primary vindex has a single column as sharding key rb, ) @@ -1776,13 +1924,13 @@ func TestInsertSelectGenerate(t *testing.T) { `ResolveDestinations sharded [value:"0" value:"1" value:"2"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`, `ExecuteMultiShard ` + // first we send the insert to the 20- shard - `sharded.20-: prefix values (:_c0_0, :_c0_1), (:_c2_0, :_c2_1) suffix ` + + `sharded.20-: prefix values (:_c0_0, :_c0_1), (:_c2_0, :_c2_1) ` + `{_c0_0: type:VARCHAR value:"a" ` + `_c0_1: type:INT64 value:"1" ` + `_c2_0: type:VARCHAR value:"b" ` + `_c2_1: type:INT64 value:"3"} ` + // next we send the insert to the -20 shard - `sharded.-20: prefix values (:_c1_0, :_c1_1) suffix ` + + `sharded.-20: prefix values (:_c1_0, :_c1_1) ` + `{_c1_0: type:VARCHAR value:"a" _c1_1: type:INT64 value:"2"} ` + `true false`, }) @@ -1820,7 +1968,7 @@ func TestStreamingInsertSelectGenerate(t *testing.T) { ks.Keyspace, ks.Tables["t1"], "prefix ", - " suffix", + nil, [][]int{ {1}}, // The primary vindex has a single column as sharding key rb, @@ -1873,13 +2021,13 @@ func TestStreamingInsertSelectGenerate(t *testing.T) { `ResolveDestinations sharded [value:"0" value:"1" value:"2"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`, `ExecuteMultiShard ` + // first we send the insert to the 20- shard - `sharded.20-: prefix values (:_c0_0, :_c0_1), (:_c2_0, :_c2_1) suffix ` + + `sharded.20-: prefix values (:_c0_0, :_c0_1), (:_c2_0, :_c2_1) ` + `{_c0_0: type:VARCHAR value:"a" ` + `_c0_1: type:INT64 value:"1" ` + `_c2_0: type:VARCHAR value:"b" ` + `_c2_1: type:INT64 value:"3"} ` + // next we send the insert to the -20 shard - `sharded.-20: prefix values (:_c1_0, :_c1_1) suffix ` + + `sharded.-20: prefix values (:_c1_0, :_c1_1) ` + `{_c1_0: type:VARCHAR value:"a" _c1_1: type:INT64 value:"2"} ` + `true false`, }) @@ -1916,7 +2064,7 @@ func TestInsertSelectGenerateNotProvided(t *testing.T) { ks.Keyspace, ks.Tables["t1"], "prefix ", - " suffix", + nil, [][]int{{1}}, // The primary vindex has a single column as sharding key, rb, ) @@ -1963,10 +2111,10 @@ func TestInsertSelectGenerateNotProvided(t *testing.T) { `ExecuteStandalone dummy_generate n: type:INT64 value:"3" ks2 -20`, `ResolveDestinations sharded [value:"0" value:"1" value:"2"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`, `ExecuteMultiShard ` + - `sharded.20-: prefix values (:_c0_0, :_c0_1, :_c0_2), (:_c2_0, :_c2_1, :_c2_2) suffix ` + + `sharded.20-: prefix values (:_c0_0, :_c0_1, :_c0_2), (:_c2_0, :_c2_1, :_c2_2) ` + `{_c0_0: type:VARCHAR value:"a" _c0_1: type:INT64 value:"1" _c0_2: type:INT64 value:"10" ` + `_c2_0: type:VARCHAR value:"b" _c2_1: type:INT64 value:"3" _c2_2: type:INT64 value:"12"} ` + - `sharded.-20: prefix values (:_c1_0, :_c1_1, :_c1_2) suffix ` + + `sharded.-20: prefix values (:_c1_0, :_c1_1, :_c1_2) ` + `{_c1_0: type:VARCHAR value:"a" _c1_1: type:INT64 value:"2" _c1_2: type:INT64 value:"11"} ` + `true false`, }) @@ -2003,7 +2151,7 @@ func TestStreamingInsertSelectGenerateNotProvided(t *testing.T) { ks.Keyspace, ks.Tables["t1"], "prefix ", - " suffix", + nil, [][]int{{1}}, // The primary vindex has a single column as sharding key, rb, ) @@ -2054,10 +2202,10 @@ func TestStreamingInsertSelectGenerateNotProvided(t *testing.T) { `ExecuteStandalone dummy_generate n: type:INT64 value:"3" ks2 -20`, `ResolveDestinations sharded [value:"0" value:"1" value:"2"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6),DestinationKeyspaceID(06e7ea22ce92708f),DestinationKeyspaceID(4eb190c9a2fa169c)`, `ExecuteMultiShard ` + - `sharded.20-: prefix values (:_c0_0, :_c0_1, :_c0_2), (:_c2_0, :_c2_1, :_c2_2) suffix ` + + `sharded.20-: prefix values (:_c0_0, :_c0_1, :_c0_2), (:_c2_0, :_c2_1, :_c2_2) ` + `{_c0_0: type:VARCHAR value:"a" _c0_1: type:INT64 value:"1" _c0_2: type:INT64 value:"10" ` + `_c2_0: type:VARCHAR value:"b" _c2_1: type:INT64 value:"3" _c2_2: type:INT64 value:"12"} ` + - `sharded.-20: prefix values (:_c1_0, :_c1_1, :_c1_2) suffix ` + + `sharded.-20: prefix values (:_c1_0, :_c1_1, :_c1_2) ` + `{_c1_0: type:VARCHAR value:"a" _c1_1: type:INT64 value:"2" _c1_2: type:INT64 value:"11"} ` + `true false`, }) @@ -2100,7 +2248,7 @@ func TestInsertSelectUnowned(t *testing.T) { ks.Keyspace, ks.Tables["t2"], "prefix ", - " suffix", + nil, [][]int{{0}}, // // the onecol vindex as unowned lookup sharding column rb, ) @@ -2130,11 +2278,11 @@ func TestInsertSelectUnowned(t *testing.T) { // insert values into the main table `ExecuteMultiShard ` + // first we insert two rows on the 20- shard - `sharded.20-: prefix values (:_c0_0), (:_c2_0) suffix ` + + `sharded.20-: prefix values (:_c0_0), (:_c2_0) ` + `{_c0_0: type:INT64 value:"1" _c2_0: type:INT64 value:"2"} ` + // next we insert one row on the -20 shard - `sharded.-20: prefix values (:_c1_0) suffix ` + + `sharded.-20: prefix values (:_c1_0) ` + `{_c1_0: type:INT64 value:"3"} ` + `true false`}) @@ -2158,11 +2306,11 @@ func TestInsertSelectUnowned(t *testing.T) { // insert values into the main table `ExecuteMultiShard ` + // first we insert two rows on the 20- shard - `sharded.20-: prefix values (:_c0_0), (:_c2_0) suffix ` + + `sharded.20-: prefix values (:_c0_0), (:_c2_0) ` + `{_c0_0: type:INT64 value:"1" _c2_0: type:INT64 value:"2"} ` + // next we insert one row on the -20 shard - `sharded.-20: prefix values (:_c1_0) suffix ` + + `sharded.-20: prefix values (:_c1_0) ` + `{_c1_0: type:INT64 value:"3"} ` + `true false`}) } @@ -2214,7 +2362,7 @@ func TestInsertSelectShardingCases(t *testing.T) { sks1.Keyspace, sks1.Tables["s1"], "prefix ", - " suffix", + nil, [][]int{{0}}, sRoute, ) @@ -2240,7 +2388,7 @@ func TestInsertSelectShardingCases(t *testing.T) { // the query exec `ResolveDestinations sks1 [value:"0"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`, - `ExecuteMultiShard sks1.-20: prefix values (:_c0_0) suffix {_c0_0: type:INT64 value:"1"} true true`}) + `ExecuteMultiShard sks1.-20: prefix values (:_c0_0) {_c0_0: type:INT64 value:"1"} true true`}) vc.Rewind() err = ins.TryStreamExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false, func(result *sqltypes.Result) error { @@ -2254,7 +2402,7 @@ func TestInsertSelectShardingCases(t *testing.T) { // the query exec `ResolveDestinations sks1 [value:"0"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`, - `ExecuteMultiShard sks1.-20: prefix values (:_c0_0) suffix {_c0_0: type:INT64 value:"1"} true true`}) + `ExecuteMultiShard sks1.-20: prefix values (:_c0_0) {_c0_0: type:INT64 value:"1"} true true`}) // sks1 and uks2 ins.Input = uRoute @@ -2269,7 +2417,7 @@ func TestInsertSelectShardingCases(t *testing.T) { // the query exec `ResolveDestinations sks1 [value:"0"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`, - `ExecuteMultiShard sks1.-20: prefix values (:_c0_0) suffix {_c0_0: type:INT64 value:"1"} true true`}) + `ExecuteMultiShard sks1.-20: prefix values (:_c0_0) {_c0_0: type:INT64 value:"1"} true true`}) vc.Rewind() err = ins.TryStreamExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false, func(result *sqltypes.Result) error { @@ -2283,7 +2431,7 @@ func TestInsertSelectShardingCases(t *testing.T) { // the query exec `ResolveDestinations sks1 [value:"0"] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`, - `ExecuteMultiShard sks1.-20: prefix values (:_c0_0) suffix {_c0_0: type:INT64 value:"1"} true true`}) + `ExecuteMultiShard sks1.-20: prefix values (:_c0_0) {_c0_0: type:INT64 value:"1"} true true`}) // uks1 and sks2 ins = newInsertSelect( @@ -2291,7 +2439,7 @@ func TestInsertSelectShardingCases(t *testing.T) { uks1.Keyspace, nil, "prefix ", - " suffix", + nil, nil, sRoute, ) @@ -2306,7 +2454,7 @@ func TestInsertSelectShardingCases(t *testing.T) { // the query exec `ResolveDestinations uks1 [] Destinations:DestinationAllShards()`, - `ExecuteMultiShard uks1.0: prefix values (:_c0_0) suffix {_c0_0: type:INT64 value:"1"} true true`}) + `ExecuteMultiShard uks1.0: prefix values (:_c0_0) {_c0_0: type:INT64 value:"1"} true true`}) vc.Rewind() err = ins.TryStreamExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false, func(result *sqltypes.Result) error { @@ -2320,7 +2468,7 @@ func TestInsertSelectShardingCases(t *testing.T) { // the query exec `ResolveDestinations uks1 [] Destinations:DestinationAllShards()`, - `ExecuteMultiShard uks1.0: prefix values (:_c0_0) suffix {_c0_0: type:INT64 value:"1"} true true`}) + `ExecuteMultiShard uks1.0: prefix values (:_c0_0) {_c0_0: type:INT64 value:"1"} true true`}) // uks1 and uks2 ins.Input = uRoute @@ -2335,7 +2483,7 @@ func TestInsertSelectShardingCases(t *testing.T) { // the query exec `ResolveDestinations uks1 [] Destinations:DestinationAllShards()`, - `ExecuteMultiShard uks1.0: prefix values (:_c0_0) suffix {_c0_0: type:INT64 value:"1"} true true`}) + `ExecuteMultiShard uks1.0: prefix values (:_c0_0) {_c0_0: type:INT64 value:"1"} true true`}) vc.Rewind() err = ins.TryStreamExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false, func(result *sqltypes.Result) error { @@ -2349,5 +2497,5 @@ func TestInsertSelectShardingCases(t *testing.T) { // the query exec `ResolveDestinations uks1 [] Destinations:DestinationAllShards()`, - `ExecuteMultiShard uks1.0: prefix values (:_c0_0) suffix {_c0_0: type:INT64 value:"1"} true true`}) + `ExecuteMultiShard uks1.0: prefix values (:_c0_0) {_c0_0: type:INT64 value:"1"} true true`}) } diff --git a/go/vt/vtgate/planbuilder/operator_transformers.go b/go/vt/vtgate/planbuilder/operator_transformers.go index 7d3eb05549a..3b117411921 100644 --- a/go/vt/vtgate/planbuilder/operator_transformers.go +++ b/go/vt/vtgate/planbuilder/operator_transformers.go @@ -590,7 +590,7 @@ func autoIncGenerate(gen *operators.Generate) *engine.Generate { } } -func generateInsertShardedQuery(ins *sqlparser.Insert) (prefix string, mids sqlparser.Values, suffix string) { +func generateInsertShardedQuery(ins *sqlparser.Insert) (prefix string, mids sqlparser.Values, suffix sqlparser.OnDup) { mids, isValues := ins.Rows.(sqlparser.Values) prefixFormat := "insert %v%sinto %v%v " if isValues { @@ -605,9 +605,13 @@ func generateInsertShardedQuery(ins *sqlparser.Insert) (prefix string, mids sqlp ins.Table, ins.Columns) prefix = prefixBuf.String() - suffixBuf := sqlparser.NewTrackedBuffer(dmlFormatter) - suffixBuf.Myprintf("%v", ins.OnDup) - suffix = suffixBuf.String() + suffix = sqlparser.CopyOnRewrite(ins.OnDup, nil, func(cursor *sqlparser.CopyOnWriteCursor) { + if tblName, ok := cursor.Node().(sqlparser.TableName); ok { + if tblName.Qualifier != sqlparser.NewIdentifierCS("") { + cursor.Replace(sqlparser.NewTableName(tblName.Name.String())) + } + } + }, nil).(sqlparser.OnDup) return } diff --git a/go/vt/vtgate/planbuilder/testdata/dml_cases.json b/go/vt/vtgate/planbuilder/testdata/dml_cases.json index ec39db0b158..511ce9fb954 100644 --- a/go/vt/vtgate/planbuilder/testdata/dml_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/dml_cases.json @@ -4899,5 +4899,33 @@ "comment": "insert row values greater than number of columns", "query": "insert into user(one, two, three) values (1, 2, 3, 4)", "plan": "VT03006: column count does not match value count at row 1" + }, + { + "comment": "insert on duplicate key update with database qualifier", + "query": "insert into user.music(id, user_id, col) values (1, 2, 3) on duplicate key update user.music.col = 5", + "plan": { + "QueryType": "INSERT", + "Original": "insert into user.music(id, user_id, col) values (1, 2, 3) on duplicate key update user.music.col = 5", + "Instructions": { + "OperatorType": "Insert", + "Variant": "Sharded", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "TargetTabletType": "PRIMARY", + "InsertIgnore": true, + "Query": "insert into music(id, user_id, col) values (:_id_0, :_user_id_0, 3) on duplicate key update music.col = 5", + "TableName": "music", + "VindexValues": { + "music_user_map": "1", + "user_index": "2" + } + }, + "TablesUsed": [ + "user.music" + ] + } } + ]