Skip to content

Commit acedfde

Browse files
committed
refac: use lookup instead of workflowFetcher for lookup vindex actions
Signed-off-by: Noble Mittal <noblemittal@outlook.com>
1 parent 4d4e6f1 commit acedfde

File tree

3 files changed

+43
-31
lines changed

3 files changed

+43
-31
lines changed

go/vt/vtctl/workflow/lookup.go

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,33 @@ import (
2626
"google.golang.org/protobuf/proto"
2727

2828
"vitess.io/vitess/go/sqlescape"
29+
"vitess.io/vitess/go/vt/logutil"
2930
"vitess.io/vitess/go/vt/schema"
3031
"vitess.io/vitess/go/vt/sqlparser"
32+
"vitess.io/vitess/go/vt/topo"
3133
"vitess.io/vitess/go/vt/vtctl/schematools"
3234
"vitess.io/vitess/go/vt/vterrors"
3335
"vitess.io/vitess/go/vt/vtgate/vindexes"
36+
"vitess.io/vitess/go/vt/vttablet/tmclient"
3437

3538
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
3639
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
3740
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
3841
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
3942
)
4043

44+
// lookup is responsible for performing actions related to lookup vindexes.
45+
type lookup struct {
46+
ts *topo.Server
47+
tmc tmclient.TabletManagerClient
48+
49+
logger logutil.Logger
50+
parser *sqlparser.Parser
51+
}
52+
4153
// prepareCreateLookup performs the preparatory steps for creating a
4254
// Lookup Vindex.
43-
func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) (
55+
func (l *lookup) prepareCreateLookup(ctx context.Context, workflow, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) (
4456
ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, cancelFunc func() error, err error) {
4557
var (
4658
// sourceVSchemaTable is the table info present in the vschema.
@@ -54,7 +66,7 @@ func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, ke
5466
)
5567

5668
// Validate input vindex.
57-
vindex, vInfo, err := wf.validateAndGetVindex(specs)
69+
vindex, vInfo, err := l.validateAndGetVindex(specs)
5870
if err != nil {
5971
return nil, nil, nil, nil, err
6072
}
@@ -69,7 +81,7 @@ func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, ke
6981
return nil, nil, nil, nil, err
7082
}
7183

72-
sourceVSchema, targetVSchema, err = wf.getTargetAndSourceVSchema(ctx, keyspace, vInfo.targetKeyspace)
84+
sourceVSchema, targetVSchema, err = l.getTargetAndSourceVSchema(ctx, keyspace, vInfo.targetKeyspace)
7385
if err != nil {
7486
return nil, nil, nil, nil, err
7587
}
@@ -91,7 +103,7 @@ func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, ke
91103
}
92104

93105
// Validate against source schema.
94-
sourceShards, err := wf.ts.GetServingShards(ctx, keyspace)
106+
sourceShards, err := l.ts.GetServingShards(ctx, keyspace)
95107
if err != nil {
96108
return nil, nil, nil, nil, err
97109
}
@@ -102,7 +114,7 @@ func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, ke
102114
}
103115

104116
req := &tabletmanagerdatapb.GetSchemaRequest{Tables: []string{vInfo.sourceTableName}}
105-
tableSchema, err := schematools.GetSchema(ctx, wf.ts, wf.tmc, onesource.PrimaryAlias, req)
117+
tableSchema, err := schematools.GetSchema(ctx, l.ts, l.tmc, onesource.PrimaryAlias, req)
106118
if err != nil {
107119
return nil, nil, nil, nil, err
108120
}
@@ -113,7 +125,7 @@ func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, ke
113125
}
114126

115127
// Generate "create table" statement.
116-
createDDL, err = wf.generateCreateDDLStatement(tableSchema, sourceVindexColumns, vInfo, vindex)
128+
createDDL, err = l.generateCreateDDLStatement(tableSchema, sourceVindexColumns, vInfo, vindex)
117129
if err != nil {
118130
return nil, nil, nil, nil, err
119131
}
@@ -191,7 +203,7 @@ func (wf *workflowFetcher) prepareCreateLookup(ctx context.Context, workflow, ke
191203
if targetChanged {
192204
cancelFunc = func() error {
193205
// Restore the original target vschema.
194-
return wf.ts.SaveVSchema(ctx, vInfo.targetKeyspace, ogTargetVSchema)
206+
return l.ts.SaveVSchema(ctx, vInfo.targetKeyspace, ogTargetVSchema)
195207
}
196208
}
197209

@@ -230,7 +242,7 @@ type vindexInfo struct {
230242
}
231243

232244
// validateAndGetVindex validates and extracts vindex configuration
233-
func (wf *workflowFetcher) validateAndGetVindex(specs *vschemapb.Keyspace) (*vschemapb.Vindex, *vindexInfo, error) {
245+
func (l *lookup) validateAndGetVindex(specs *vschemapb.Keyspace) (*vschemapb.Vindex, *vindexInfo, error) {
234246
if specs == nil {
235247
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "no vindex provided")
236248
}
@@ -245,7 +257,7 @@ func (wf *workflowFetcher) validateAndGetVindex(specs *vschemapb.Keyspace) (*vsc
245257
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "vindex %s is not a lookup type", vindex.Type)
246258
}
247259

248-
targetKeyspace, targetTableName, err := wf.parser.ParseTable(vindex.Params["table"])
260+
targetKeyspace, targetTableName, err := l.parser.ParseTable(vindex.Params["table"])
249261
if err != nil || targetKeyspace == "" {
250262
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
251263
"vindex table name (%s) must be in the form <keyspace>.<table>", vindex.Params["table"])
@@ -306,8 +318,8 @@ func (wf *workflowFetcher) validateAndGetVindex(specs *vschemapb.Keyspace) (*vsc
306318
}, nil
307319
}
308320

309-
func (wf *workflowFetcher) getTargetAndSourceVSchema(ctx context.Context, sourceKeyspace string, targetKeyspace string) (sourceVSchema *vschemapb.Keyspace, targetVSchema *vschemapb.Keyspace, err error) {
310-
sourceVSchema, err = wf.ts.GetVSchema(ctx, sourceKeyspace)
321+
func (l *lookup) getTargetAndSourceVSchema(ctx context.Context, sourceKeyspace string, targetKeyspace string) (sourceVSchema *vschemapb.Keyspace, targetVSchema *vschemapb.Keyspace, err error) {
322+
sourceVSchema, err = l.ts.GetVSchema(ctx, sourceKeyspace)
311323
if err != nil {
312324
return nil, nil, err
313325
}
@@ -319,7 +331,7 @@ func (wf *workflowFetcher) getTargetAndSourceVSchema(ctx context.Context, source
319331
if sourceKeyspace == targetKeyspace {
320332
targetVSchema = sourceVSchema
321333
} else {
322-
targetVSchema, err = wf.ts.GetVSchema(ctx, targetKeyspace)
334+
targetVSchema, err = l.ts.GetVSchema(ctx, targetKeyspace)
323335
if err != nil {
324336
return nil, nil, err
325337
}
@@ -367,7 +379,7 @@ func getSourceTable(specs *vschemapb.Keyspace, targetTableName string, fromCols
367379
return sourceTable, sourceTableName, nil
368380
}
369381

370-
func (wf *workflowFetcher) generateCreateDDLStatement(tableSchema *tabletmanagerdatapb.SchemaDefinition, sourceVindexColumns []string, vInfo *vindexInfo, vindex *vschemapb.Vindex) (string, error) {
382+
func (l *lookup) generateCreateDDLStatement(tableSchema *tabletmanagerdatapb.SchemaDefinition, sourceVindexColumns []string, vInfo *vindexInfo, vindex *vschemapb.Vindex) (string, error) {
371383
lines := strings.Split(tableSchema.TableDefinitions[0].Schema, "\n")
372384
if len(lines) < 3 {
373385
// Should never happen.
@@ -405,7 +417,7 @@ func (wf *workflowFetcher) generateCreateDDLStatement(tableSchema *tabletmanager
405417
createDDL := strings.Join(modified, "\n")
406418

407419
// Confirm that our DDL is valid before we create anything.
408-
if _, err := wf.parser.ParseStrictDDL(createDDL); err != nil {
420+
if _, err := l.parser.ParseStrictDDL(createDDL); err != nil {
409421
return "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error: %v; invalid lookup table definition generated: %s",
410422
err, createDDL)
411423
}

go/vt/vtctl/workflow/materializer_test.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1515,13 +1515,13 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) {
15151515
setStartingVschema()
15161516
}()
15171517
}
1518-
w := &workflowFetcher{
1518+
l := &lookup{
15191519
ts: env.ws.ts,
15201520
tmc: env.ws.tmc,
15211521
logger: env.ws.Logger(),
15221522
parser: env.ws.SQLParser(),
15231523
}
1524-
outms, _, _, cancelFunc, err := w.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false)
1524+
outms, _, _, cancelFunc, err := l.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, tcase.specs, false)
15251525
if tcase.err != "" {
15261526
require.Error(t, err)
15271527
require.Contains(t, err.Error(), tcase.err, "prepareCreateLookup(%s) err: %v, does not contain %v", tcase.description, err, tcase.err)
@@ -1769,13 +1769,13 @@ func TestCreateLookupVindexSourceVSchema(t *testing.T) {
17691769
t.Fatal(err)
17701770
}
17711771

1772-
w := &workflowFetcher{
1772+
l := &lookup{
17731773
ts: env.ws.ts,
17741774
tmc: env.ws.tmc,
17751775
logger: env.ws.Logger(),
17761776
parser: env.ws.SQLParser(),
17771777
}
1778-
_, got, _, _, err := w.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false)
1778+
_, got, _, _, err := l.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false)
17791779
require.NoError(t, err)
17801780
if !proto.Equal(got, tcase.out) {
17811781
t.Errorf("%s: got:\n%v, want\n%v", tcase.description, got, tcase.out)
@@ -2011,13 +2011,13 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) {
20112011
t.Fatal(err)
20122012
}
20132013

2014-
w := &workflowFetcher{
2014+
l := &lookup{
20152015
ts: env.ws.ts,
20162016
tmc: env.ws.tmc,
20172017
logger: env.ws.Logger(),
20182018
parser: env.ws.SQLParser(),
20192019
}
2020-
_, _, got, cancelFunc, err := w.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false)
2020+
_, _, got, cancelFunc, err := l.prepareCreateLookup(ctx, "workflow", ms.SourceKeyspace, specs, false)
20212021
if tcase.err != "" {
20222022
if err == nil || !strings.Contains(err.Error(), tcase.err) {
20232023
t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err)
@@ -2139,13 +2139,13 @@ func TestCreateLookupVindexSameKeyspace(t *testing.T) {
21392139
t.Fatal(err)
21402140
}
21412141

2142-
w := &workflowFetcher{
2142+
l := &lookup{
21432143
ts: env.ws.ts,
21442144
tmc: env.ws.tmc,
21452145
logger: env.ws.Logger(),
21462146
parser: env.ws.SQLParser(),
21472147
}
2148-
_, got, _, _, err := w.prepareCreateLookup(ctx, "keyspace", ms.TargetKeyspace, specs, false)
2148+
_, got, _, _, err := l.prepareCreateLookup(ctx, "keyspace", ms.TargetKeyspace, specs, false)
21492149
require.NoError(t, err)
21502150
if !proto.Equal(got, want) {
21512151
t.Errorf("same keyspace: got:\n%v, want\n%v", got, want)
@@ -2271,13 +2271,13 @@ func TestCreateCustomizedVindex(t *testing.T) {
22712271
t.Fatal(err)
22722272
}
22732273

2274-
w := &workflowFetcher{
2274+
l := &lookup{
22752275
ts: env.ws.ts,
22762276
tmc: env.ws.tmc,
22772277
logger: env.ws.Logger(),
22782278
parser: env.ws.SQLParser(),
22792279
}
2280-
_, got, _, _, err := w.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false)
2280+
_, got, _, _, err := l.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false)
22812281
require.NoError(t, err)
22822282
if !proto.Equal(got, want) {
22832283
t.Errorf("customize create lookup error same: got:\n%v, want\n%v", got, want)
@@ -2395,13 +2395,13 @@ func TestCreateLookupVindexIgnoreNulls(t *testing.T) {
23952395
t.Fatal(err)
23962396
}
23972397

2398-
w := &workflowFetcher{
2398+
l := &lookup{
23992399
ts: env.ws.ts,
24002400
tmc: env.ws.tmc,
24012401
logger: env.ws.Logger(),
24022402
parser: env.ws.SQLParser(),
24032403
}
2404-
ms, ks, _, _, err := w.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false)
2404+
ms, ks, _, _, err := l.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false)
24052405
require.NoError(t, err)
24062406
if !proto.Equal(wantKs, ks) {
24072407
t.Errorf("unexpected keyspace value: got:\n%v, want\n%v", ks, wantKs)
@@ -2481,17 +2481,17 @@ func TestStopAfterCopyFlag(t *testing.T) {
24812481
t.Fatal(err)
24822482
}
24832483

2484-
w := &workflowFetcher{
2484+
l := &lookup{
24852485
ts: env.ws.ts,
24862486
tmc: env.ws.tmc,
24872487
logger: env.ws.Logger(),
24882488
parser: env.ws.SQLParser(),
24892489
}
2490-
ms1, _, _, _, err := w.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false)
2490+
ms1, _, _, _, err := l.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, false)
24912491
require.NoError(t, err)
24922492
require.Equal(t, ms1.StopAfterCopy, true)
24932493

2494-
ms2, _, _, _, err := w.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, true)
2494+
ms2, _, _, _, err := l.prepareCreateLookup(ctx, "workflow", ms.TargetKeyspace, specs, true)
24952495
require.NoError(t, err)
24962496
require.Equal(t, ms2.StopAfterCopy, false)
24972497
}

go/vt/vtctl/workflow/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -567,13 +567,13 @@ func (s *Server) LookupVindexCreate(ctx context.Context, req *vtctldatapb.Lookup
567567
span.Annotate("cells", req.Cells)
568568
span.Annotate("tablet_types", req.TabletTypes)
569569

570-
w := &workflowFetcher{
570+
l := &lookup{
571571
ts: s.ts,
572572
tmc: s.tmc,
573573
logger: s.Logger(),
574574
parser: s.SQLParser(),
575575
}
576-
ms, sourceVSchema, targetVSchema, cancelFunc, err := w.prepareCreateLookup(ctx, req.Workflow, req.Keyspace, req.Vindex, req.ContinueAfterCopyWithOwner)
576+
ms, sourceVSchema, targetVSchema, cancelFunc, err := l.prepareCreateLookup(ctx, req.Workflow, req.Keyspace, req.Vindex, req.ContinueAfterCopyWithOwner)
577577
if err != nil {
578578
return nil, err
579579
}

0 commit comments

Comments
 (0)