diff --git a/go/cmd/vtcombo/cli/main.go b/go/cmd/vtcombo/cli/main.go index 7b9143f1384..9d67780e3c1 100644 --- a/go/cmd/vtcombo/cli/main.go +++ b/go/cmd/vtcombo/cli/main.go @@ -255,7 +255,7 @@ func run(cmd *cobra.Command, args []string) (err error) { } } - wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, nil) + wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, nil, nil) newUID, err := vtcombo.CreateKs(ctx, env, ts, &tpb, mysqld, &dbconfigs.GlobalDBConfigs, schemaDir, ks, true, uid, wr) if err != nil { return err @@ -309,7 +309,7 @@ func run(cmd *cobra.Command, args []string) (err error) { vtg := vtgate.Init(context.Background(), env, nil, resilientServer, tpb.Cells[0], tabletTypesToWait, plannerVersion) // vtctld configuration and init - err = vtctld.InitVtctld(env, ts) + err = vtctld.InitVtctld(env, ts, nil) if err != nil { return err } diff --git a/go/cmd/vtctl/vtctl.go b/go/cmd/vtctl/vtctl.go index ba84369620a..a21a4cbe3a2 100644 --- a/go/cmd/vtctl/vtctl.go +++ b/go/cmd/vtctl/vtctl.go @@ -163,7 +163,7 @@ func main() { // New behavior. Strip off the prefix, and set things up to run through // the vtctldclient command tree, using the localvtctldclient (in-process) // client. - vtctld := grpcvtctldserver.NewVtctldServer(env, ts) + vtctld := grpcvtctldserver.NewVtctldServer(env, ts, nil) localvtctldclient.SetServer(vtctld) command.VtctldClientProtocol = "local" @@ -179,7 +179,7 @@ func main() { fallthrough default: log.Warningf("WARNING: vtctl should only be used for VDiff v1 workflows. Please use VDiff v2 and consider using vtctldclient for all other commands.") - wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) + wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), nil) if args[0] == "--" { vtctl.PrintDoubleDashDeprecationNotice(wr) diff --git a/go/cmd/vtctld/cli/cli.go b/go/cmd/vtctld/cli/cli.go index 4f8c57b6b2f..e0ad8f0ba8b 100644 --- a/go/cmd/vtctld/cli/cli.go +++ b/go/cmd/vtctld/cli/cli.go @@ -20,6 +20,7 @@ import ( "github.com/spf13/cobra" "vitess.io/vitess/go/acl" + "vitess.io/vitess/go/vt/events/eventer" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtctld" @@ -70,8 +71,15 @@ func run(cmd *cobra.Command, args []string) error { if err != nil { return err } + + // Init the eventer + ev, err := eventer.Get() + if err != nil { + return err + } + // Init the vtctld core - if err := vtctld.InitVtctld(env, ts); err != nil { + if err := vtctld.InitVtctld(env, ts, ev); err != nil { return err } diff --git a/go/cmd/vtctld/cli/schema.go b/go/cmd/vtctld/cli/schema.go index 9f1f9d06072..1eeb2ba9ae3 100644 --- a/go/cmd/vtctld/cli/schema.go +++ b/go/cmd/vtctld/cli/schema.go @@ -71,7 +71,7 @@ func initSchema() { return } ctx := context.Background() - wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) + wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), nil) _, err = schemamanager.Run( ctx, controller, diff --git a/go/cmd/vtctldclient/command/root.go b/go/cmd/vtctldclient/command/root.go index 0fc39423bc7..b0c0d4b9989 100644 --- a/go/cmd/vtctldclient/command/root.go +++ b/go/cmd/vtctldclient/command/root.go @@ -212,7 +212,7 @@ func getClientForCommand(cmd *cobra.Command) (vtctldclient.VtctldClient, error) return nil }) }) - vtctld := grpcvtctldserver.NewVtctldServer(env, ts) + vtctld := grpcvtctldserver.NewVtctldServer(env, ts, nil) localvtctldclient.SetServer(vtctld) VtctldClientProtocol = "local" server = "" diff --git a/go/vt/throttler/demo/throttler_demo.go b/go/vt/throttler/demo/throttler_demo.go index 15228475bfb..e9fbd8c8bd2 100644 --- a/go/vt/throttler/demo/throttler_demo.go +++ b/go/vt/throttler/demo/throttler_demo.go @@ -118,7 +118,7 @@ type replica struct { func newReplica(env *vtenv.Environment, lagUpdateInterval, degrationInterval, degrationDuration time.Duration, ts *topo.Server) *replica { t := &testing.T{} - wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient()) + wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, tmclient.NewTabletManagerClient(), nil) fakeTablet := testlib.NewFakeTablet(t, wr, "cell1", 0, topodatapb.TabletType_REPLICA, nil, testlib.TabletKeyspaceShard(t, "ks", "-80")) fakeTablet.StartActionLoop(t, wr) diff --git a/go/vt/vtcombo/tablet_map.go b/go/vt/vtcombo/tablet_map.go index 369a8138b5a..6c7181c4333 100644 --- a/go/vt/vtcombo/tablet_map.go +++ b/go/vt/vtcombo/tablet_map.go @@ -188,7 +188,7 @@ func InitTabletMap( }) // iterate through the keyspaces - wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, nil) + wr := wrangler.New(env, logutil.NewConsoleLogger(), ts, nil, nil) var uid uint32 = 1 for _, kpb := range tpb.Keyspaces { var err error diff --git a/go/vt/vtctl/grpcvtctlserver/server.go b/go/vt/vtctl/grpcvtctlserver/server.go index d89f91b2d29..c46665f91f3 100644 --- a/go/vt/vtctl/grpcvtctlserver/server.go +++ b/go/vt/vtctl/grpcvtctlserver/server.go @@ -75,7 +75,7 @@ func (s *VtctlServer) ExecuteVtctlCommand(args *vtctldatapb.ExecuteVtctlCommandR // create the wrangler tmc := tmclient.NewTabletManagerClient() defer tmc.Close() - wr := wrangler.New(s.env, logger, s.ts, tmc) + wr := wrangler.New(s.env, logger, s.ts, tmc, nil) // execute the command return vtctl.RunCommand(stream.Context(), wr, args.Args) diff --git a/go/vt/vtctld/action_repository.go b/go/vt/vtctld/action_repository.go index e0f6c45535a..3ddb200f496 100644 --- a/go/vt/vtctld/action_repository.go +++ b/go/vt/vtctld/action_repository.go @@ -23,8 +23,8 @@ import ( "github.com/spf13/pflag" + "vitess.io/vitess/go/vt/events/eventer" "vitess.io/vitess/go/vt/vtenv" - "vitess.io/vitess/go/acl" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/servenv" @@ -86,17 +86,19 @@ type ActionRepository struct { shardActions map[string]actionShardMethod tabletActions map[string]actionTabletRecord ts *topo.Server + ev eventer.Eventer } // NewActionRepository creates and returns a new ActionRepository, // with no actions. -func NewActionRepository(env *vtenv.Environment, ts *topo.Server) *ActionRepository { +func NewActionRepository(env *vtenv.Environment, ts *topo.Server, ev eventer.Eventer) *ActionRepository { return &ActionRepository{ env: env, keyspaceActions: make(map[string]actionKeyspaceMethod), shardActions: make(map[string]actionShardMethod), tabletActions: make(map[string]actionTabletRecord), ts: ts, + ev: ev, } } @@ -129,7 +131,7 @@ func (ar *ActionRepository) ApplyKeyspaceAction(ctx context.Context, actionName, } ctx, cancel := context.WithTimeout(ctx, actionTimeout) - wr := wrangler.New(ar.env, logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient()) + wr := wrangler.New(ar.env, logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient(), ar.ev) output, err := action(ctx, wr, keyspace) cancel() if err != nil { @@ -156,7 +158,7 @@ func (ar *ActionRepository) ApplyShardAction(ctx context.Context, actionName, ke } ctx, cancel := context.WithTimeout(ctx, actionTimeout) - wr := wrangler.New(ar.env, logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient()) + wr := wrangler.New(ar.env, logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient(), ar.ev) output, err := action(ctx, wr, keyspace, shard) cancel() if err != nil { @@ -190,7 +192,7 @@ func (ar *ActionRepository) ApplyTabletAction(ctx context.Context, actionName st // run the action ctx, cancel := context.WithTimeout(ctx, actionTimeout) - wr := wrangler.New(ar.env, logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient()) + wr := wrangler.New(ar.env, logutil.NewConsoleLogger(), ar.ts, tmclient.NewTabletManagerClient(), ar.ev) output, err := action.method(ctx, wr, tabletAlias) cancel() if err != nil { diff --git a/go/vt/vtctld/api.go b/go/vt/vtctld/api.go index 0452fce3c3d..7ddf83cca65 100644 --- a/go/vt/vtctld/api.go +++ b/go/vt/vtctld/api.go @@ -30,6 +30,7 @@ import ( "vitess.io/vitess/go/acl" "vitess.io/vitess/go/netutil" + "vitess.io/vitess/go/vt/events/eventer" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl" @@ -178,7 +179,7 @@ func unmarshalRequest(r *http.Request, v any) error { return json.Unmarshal(data, v) } -func initAPI(ctx context.Context, ts *topo.Server, actions *ActionRepository) { +func initAPI(ctx context.Context, ts *topo.Server, ev eventer.Eventer, actions *ActionRepository) { tabletHealthCache := newTabletHealthCache(ts) tmClient := tmclient.NewTabletManagerClient() @@ -487,7 +488,7 @@ func initAPI(ctx context.Context, ts *topo.Server, actions *ActionRepository) { logstream := logutil.NewMemoryLogger() - wr := wrangler.New(actions.env, logstream, ts, tmClient) + wr := wrangler.New(actions.env, logstream, ts, tmClient, ev) err := vtctl.RunCommand(r.Context(), wr, args) if err != nil { resp.Error = err.Error() @@ -523,7 +524,7 @@ func initAPI(ctx context.Context, ts *topo.Server, actions *ActionRepository) { logger := logutil.NewCallbackLogger(func(ev *logutilpb.Event) { w.Write([]byte(logutil.EventString(ev))) }) - wr := wrangler.New(actions.env, logger, ts, tmClient) + wr := wrangler.New(actions.env, logger, ts, tmClient, ev) apiCallUUID, err := schema.CreateUUID() if err != nil { diff --git a/go/vt/vtctld/vtctld.go b/go/vt/vtctld/vtctld.go index 5ca3908c053..f6aa7c7eb2a 100644 --- a/go/vt/vtctld/vtctld.go +++ b/go/vt/vtctld/vtctld.go @@ -23,12 +23,11 @@ import ( "github.com/spf13/pflag" - "vitess.io/vitess/go/vt/vtenv" - - "vitess.io/vitess/go/vt/servenv" - "vitess.io/vitess/go/acl" + "vitess.io/vitess/go/vt/events/eventer" + "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/wrangler" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -50,8 +49,8 @@ func registerVtctldFlags(fs *pflag.FlagSet) { } // InitVtctld initializes all the vtctld functionality. -func InitVtctld(env *vtenv.Environment, ts *topo.Server) error { - actionRepo := NewActionRepository(env, ts) +func InitVtctld(env *vtenv.Environment, ts *topo.Server, ev eventer.Eventer) error { + actionRepo := NewActionRepository(env, ts, ev) // keyspace actions actionRepo.RegisterKeyspaceAction("ValidateKeyspace", @@ -128,7 +127,7 @@ func InitVtctld(env *vtenv.Environment, ts *topo.Server) error { }) // Serve the REST API - initAPI(context.Background(), ts, actionRepo) + initAPI(context.Background(), ts, ev, actionRepo) // Serve the topology endpoint in the REST API at /topodata initExplorer(ts) diff --git a/go/vt/wrangler/reparent.go b/go/vt/wrangler/reparent.go index 1a3a45cf99b..ff7b6254768 100644 --- a/go/vt/wrangler/reparent.go +++ b/go/vt/wrangler/reparent.go @@ -59,7 +59,7 @@ func (wr *Wrangler) InitShardPrimary(ctx context.Context, keyspace, shard string ev := &events.Reparent{} // do the work - err = grpcvtctldserver.NewVtctldServer(wr.env, wr.ts).InitShardPrimaryLocked(ctx, ev, &vtctldatapb.InitShardPrimaryRequest{ + err = grpcvtctldserver.NewVtctldServer(wr.env, wr.ts, wr.ev).InitShardPrimaryLocked(ctx, ev, &vtctldatapb.InitShardPrimaryRequest{ Keyspace: keyspace, Shard: shard, PrimaryElectTabletAlias: primaryElectTabletAlias, diff --git a/go/vt/wrangler/wrangler.go b/go/vt/wrangler/wrangler.go index ee18643cc78..e10eed33b33 100644 --- a/go/vt/wrangler/wrangler.go +++ b/go/vt/wrangler/wrangler.go @@ -24,6 +24,7 @@ import ( "golang.org/x/sync/semaphore" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/events/eventer" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" @@ -54,6 +55,7 @@ type Wrangler struct { ts *topo.Server tmc tmclient.TabletManagerClient vtctld vtctlservicepb.VtctldServer + ev eventer.Eventer sourceTs *topo.Server // VExecFunc is a test-only fixture that allows us to short circuit vexec commands. // DO NOT USE in production code. @@ -64,26 +66,28 @@ type Wrangler struct { } // New creates a new Wrangler object. -func New(env *vtenv.Environment, logger logutil.Logger, ts *topo.Server, tmc tmclient.TabletManagerClient) *Wrangler { +func New(env *vtenv.Environment, logger logutil.Logger, ts *topo.Server, tmc tmclient.TabletManagerClient, ev eventer.Eventer) *Wrangler { return &Wrangler{ env: env, logger: logger, ts: ts, tmc: tmc, - vtctld: grpcvtctldserver.NewVtctldServer(env, ts), + vtctld: grpcvtctldserver.NewVtctldServer(env, ts, ev), + ev: ev, sourceTs: ts, } } // NewTestWrangler creates a new Wrangler object for use in tests. This should NOT be used // in production. -func NewTestWrangler(logger logutil.Logger, ts *topo.Server, tmc tmclient.TabletManagerClient) *Wrangler { +func NewTestWrangler(logger logutil.Logger, ts *topo.Server, tmc tmclient.TabletManagerClient, ev eventer.Eventer) *Wrangler { return &Wrangler{ env: vtenv.NewTestEnv(), logger: logger, ts: ts, tmc: tmc, - vtctld: grpcvtctldserver.NewTestVtctldServer(ts, tmc), + vtctld: grpcvtctldserver.NewTestVtctldServer(ts, tmc, ev), + ev: ev, sourceTs: ts, } }