Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Oct 4, 2024
1 parent 145197b commit 9d24ede
Show file tree
Hide file tree
Showing 13 changed files with 44 additions and 30 deletions.
4 changes: 2 additions & 2 deletions go/cmd/vtcombo/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func run(cmd *cobra.Command, args []string) (err error) {
}
}

wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, nil)
wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, nil, nil)
newUID, err := vtcombo.CreateKs(ctx, env, ts, &tpb, mysqld, &dbconfigs.GlobalDBConfigs, schemaDir, ks, true, uid, wr)
if err != nil {
return err
Expand Down Expand Up @@ -309,7 +309,7 @@ func run(cmd *cobra.Command, args []string) (err error) {
vtg := vtgate.Init(context.Background(), env, nil, resilientServer, tpb.Cells[0], tabletTypesToWait, plannerVersion)

// vtctld configuration and init
err = vtctld.InitVtctld(env, ts)
err = vtctld.InitVtctld(env, ts, nil)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions go/cmd/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func main() {
// New behavior. Strip off the prefix, and set things up to run through
// the vtctldclient command tree, using the localvtctldclient (in-process)
// client.
vtctld := grpcvtctldserver.NewVtctldServer(env, ts)
vtctld := grpcvtctldserver.NewVtctldServer(env, ts, nil)
localvtctldclient.SetServer(vtctld)
command.VtctldClientProtocol = "local"

Expand All @@ -179,7 +179,7 @@ func main() {
fallthrough
default:
log.Warningf("WARNING: vtctl should only be used for VDiff v1 workflows. Please use VDiff v2 and consider using vtctldclient for all other commands.")
wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), nil)

if args[0] == "--" {
vtctl.PrintDoubleDashDeprecationNotice(wr)
Expand Down
10 changes: 9 additions & 1 deletion go/cmd/vtctld/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/spf13/cobra"

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/vt/events/eventer"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctld"
Expand Down Expand Up @@ -70,8 +71,15 @@ func run(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}

// Init the eventer
ev, err := eventer.Get()
if err != nil {
return err
}

// Init the vtctld core
if err := vtctld.InitVtctld(env, ts); err != nil {
if err := vtctld.InitVtctld(env, ts, ev); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion go/cmd/vtctld/cli/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func initSchema() {
return
}
ctx := context.Background()
wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), nil)
_, err = schemamanager.Run(
ctx,
controller,
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/vtctldclient/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func getClientForCommand(cmd *cobra.Command) (vtctldclient.VtctldClient, error)
return nil
})
})
vtctld := grpcvtctldserver.NewVtctldServer(env, ts)
vtctld := grpcvtctldserver.NewVtctldServer(env, ts, nil)
localvtctldclient.SetServer(vtctld)
VtctldClientProtocol = "local"
server = ""
Expand Down
2 changes: 1 addition & 1 deletion go/vt/throttler/demo/throttler_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ type replica struct {

func newReplica(env *vtenv.Environment, lagUpdateInterval, degrationInterval, degrationDuration time.Duration, ts *topo.Server) *replica {
t := &testing.T{}
wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient())
wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), nil)
fakeTablet := testlib.NewFakeTablet(t, wr, "cell1", 0,
topodatapb.TabletType_REPLICA, nil, testlib.TabletKeyspaceShard(t, "ks", "-80"))
fakeTablet.StartActionLoop(t, wr)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func InitTabletMap(
})

// iterate through the keyspaces
wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, nil)
wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, nil, nil)
var uid uint32 = 1
for _, kpb := range tpb.Keyspaces {
var err error
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/grpcvtctlserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *VtctlServer) ExecuteVtctlCommand(args *vtctldatapb.ExecuteVtctlCommandR
// create the wrangler
tmc := tmclient.NewTabletManagerClient()
defer tmc.Close()
wr := wrangler.New(s.env, logger, s.ts, tmc)
wr := wrangler.New(s.env, logger, s.ts, tmc, nil)

// execute the command
return vtctl.RunCommand(stream.Context(), wr, args.Args)
Expand Down
12 changes: 7 additions & 5 deletions go/vt/vtctld/action_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (

"github.com/spf13/pflag"

"vitess.io/vitess/go/vt/events/eventer"
"vitess.io/vitess/go/vt/vtenv"

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/servenv"
Expand Down Expand Up @@ -86,17 +86,19 @@ type ActionRepository struct {
shardActions map[string]actionShardMethod
tabletActions map[string]actionTabletRecord
ts *topo.Server
ev eventer.Eventer
}

// NewActionRepository creates and returns a new ActionRepository,
// with no actions.
func NewActionRepository(env *vtenv.Environment, ts *topo.Server) *ActionRepository {
func NewActionRepository(env *vtenv.Environment, ts *topo.Server, ev eventer.Eventer) *ActionRepository {
return &ActionRepository{
env: env,
keyspaceActions: make(map[string]actionKeyspaceMethod),
shardActions: make(map[string]actionShardMethod),
tabletActions: make(map[string]actionTabletRecord),
ts: ts,
ev: ev,
}
}

Expand Down Expand Up @@ -129,7 +131,7 @@ func (ar *ActionRepository) ApplyKeyspaceAction(ctx context.Context, actionName,
}

ctx, cancel := context.WithTimeout(ctx, actionTimeout)
wr := wrangler.New(ar.env, logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient())
wr := wrangler.New(ar.env, logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient(), ar.ev)
output, err := action(ctx, wr, keyspace)
cancel()
if err != nil {
Expand All @@ -156,7 +158,7 @@ func (ar *ActionRepository) ApplyShardAction(ctx context.Context, actionName, ke
}

ctx, cancel := context.WithTimeout(ctx, actionTimeout)
wr := wrangler.New(ar.env, logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient())
wr := wrangler.New(ar.env, logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient(), ar.ev)
output, err := action(ctx, wr, keyspace, shard)
cancel()
if err != nil {
Expand Down Expand Up @@ -190,7 +192,7 @@ func (ar *ActionRepository) ApplyTabletAction(ctx context.Context, actionName st

// run the action
ctx, cancel := context.WithTimeout(ctx, actionTimeout)
wr := wrangler.New(ar.env, logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient())
wr := wrangler.New(ar.env, logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient(), ar.ev)
output, err := action.method(ctx, wr, tabletAlias)
cancel()
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions go/vt/vtctld/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/vt/events/eventer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl"
Expand Down Expand Up @@ -178,7 +179,7 @@ func unmarshalRequest(r *http.Request, v any) error {
return json.Unmarshal(data, v)
}

func initAPI(ctx context.Context, ts *topo.Server, actions *ActionRepository) {
func initAPI(ctx context.Context, ts *topo.Server, ev eventer.Eventer, actions *ActionRepository) {
tabletHealthCache := newTabletHealthCache(ts)
tmClient := tmclient.NewTabletManagerClient()

Expand Down Expand Up @@ -487,7 +488,7 @@ func initAPI(ctx context.Context, ts *topo.Server, actions *ActionRepository) {

logstream := logutil.NewMemoryLogger()

wr := wrangler.New(actions.env, logstream, ts, tmClient)
wr := wrangler.New(actions.env, logstream, ts, tmClient, ev)
err := vtctl.RunCommand(r.Context(), wr, args)
if err != nil {
resp.Error = err.Error()
Expand Down Expand Up @@ -523,7 +524,7 @@ func initAPI(ctx context.Context, ts *topo.Server, actions *ActionRepository) {
logger := logutil.NewCallbackLogger(func(ev *logutilpb.Event) {
w.Write([]byte(logutil.EventString(ev)))
})
wr := wrangler.New(actions.env, logger, ts, tmClient)
wr := wrangler.New(actions.env, logger, ts, tmClient, ev)

apiCallUUID, err := schema.CreateUUID()
if err != nil {
Expand Down
13 changes: 6 additions & 7 deletions go/vt/vtctld/vtctld.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ import (

"github.com/spf13/pflag"

"vitess.io/vitess/go/vt/vtenv"

"vitess.io/vitess/go/vt/servenv"

"vitess.io/vitess/go/acl"
"vitess.io/vitess/go/vt/events/eventer"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/wrangler"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand All @@ -50,8 +49,8 @@ func registerVtctldFlags(fs *pflag.FlagSet) {
}

// InitVtctld initializes all the vtctld functionality.
func InitVtctld(env *vtenv.Environment, ts *topo.Server) error {
actionRepo := NewActionRepository(env, ts)
func InitVtctld(env *vtenv.Environment, ts *topo.Server, ev eventer.Eventer) error {
actionRepo := NewActionRepository(env, ts, ev)

// keyspace actions
actionRepo.RegisterKeyspaceAction("ValidateKeyspace",
Expand Down Expand Up @@ -128,7 +127,7 @@ func InitVtctld(env *vtenv.Environment, ts *topo.Server) error {
})

// Serve the REST API
initAPI(context.Background(), ts, actionRepo)
initAPI(context.Background(), ts, ev, actionRepo)

// Serve the topology endpoint in the REST API at /topodata
initExplorer(ts)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/reparent.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (wr *Wrangler) InitShardPrimary(ctx context.Context, keyspace, shard string
ev := &events.Reparent{}

// do the work
err = grpcvtctldserver.NewVtctldServer(wr.env, wr.ts).InitShardPrimaryLocked(ctx, ev, &vtctldatapb.InitShardPrimaryRequest{
err = grpcvtctldserver.NewVtctldServer(wr.env, wr.ts, wr.ev).InitShardPrimaryLocked(ctx, ev, &vtctldatapb.InitShardPrimaryRequest{
Keyspace: keyspace,
Shard: shard,
PrimaryElectTabletAlias: primaryElectTabletAlias,
Expand Down
12 changes: 8 additions & 4 deletions go/vt/wrangler/wrangler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/events/eventer"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
Expand Down Expand Up @@ -54,6 +55,7 @@ type Wrangler struct {
ts *topo.Server
tmc tmclient.TabletManagerClient
vtctld vtctlservicepb.VtctldServer
ev eventer.Eventer
sourceTs *topo.Server
// VExecFunc is a test-only fixture that allows us to short circuit vexec commands.
// DO NOT USE in production code.
Expand All @@ -64,26 +66,28 @@ type Wrangler struct {
}

// New creates a new Wrangler object.
func New(env *vtenv.Environment, logger logutil.Logger, ts *topo.Server, tmc tmclient.TabletManagerClient) *Wrangler {
func New(env *vtenv.Environment, logger logutil.Logger, ts *topo.Server, tmc tmclient.TabletManagerClient, ev eventer.Eventer) *Wrangler {
return &Wrangler{
env: env,
logger: logger,
ts: ts,
tmc: tmc,
vtctld: grpcvtctldserver.NewVtctldServer(env, ts),
vtctld: grpcvtctldserver.NewVtctldServer(env, ts, ev),
ev: ev,
sourceTs: ts,
}
}

// NewTestWrangler creates a new Wrangler object for use in tests. This should NOT be used
// in production.
func NewTestWrangler(logger logutil.Logger, ts *topo.Server, tmc tmclient.TabletManagerClient) *Wrangler {
func NewTestWrangler(logger logutil.Logger, ts *topo.Server, tmc tmclient.TabletManagerClient, ev eventer.Eventer) *Wrangler {
return &Wrangler{
env: vtenv.NewTestEnv(),
logger: logger,
ts: ts,
tmc: tmc,
vtctld: grpcvtctldserver.NewTestVtctldServer(ts, tmc),
vtctld: grpcvtctldserver.NewTestVtctldServer(ts, tmc, ev),
ev: ev,
sourceTs: ts,
}
}
Expand Down

0 comments on commit 9d24ede

Please sign in to comment.