diff --git a/docs/micro-service.md b/docs/micro-service.md
new file mode 100644
index 0000000..c14b946
--- /dev/null
+++ b/docs/micro-service.md
@@ -0,0 +1,16 @@
+# TiDB2DW Micro-Service
+
+## Goals
+
+- TiDB2DW supports multiple replication streams for multiple upstream TiDB clusters.
+- TiDB2DW is highly available and can tolerate a single node failure.
+
+## Design
+
+### TiDB2DW Owner
+
+The Owner maintains the meta information of replication tasks and monitors the health of workers. There is only one active owner at any point in time; if the active owner fails, a standby owner becomes the new active owner.
+
+### TiDB2DW Worker
+
+The workers handle replication tasks and periodically send heartbeats to the active owner. If a worker fails or is isolated from the active owner, the tasks on that worker are rescheduled to other workers.
\ No newline at end of file
diff --git a/etc/owner.toml b/etc/owner.toml
new file mode 100644
index 0000000..804670d
--- /dev/null
+++ b/etc/owner.toml
@@ -0,0 +1,20 @@
+
+## Server log level; "debug", "info", "warn" and "error" are supported.
+# log-level = "info"
+
+# host = "127.0.0.1"
+# port = 20891
+## Owner lease duration, in seconds.
+# lease-duration = 10
+## Owner lease renew interval, in seconds.
+# lease-renew-interval = 5
+
+[metadb]
+## metadb information
+# host = "127.0.0.1"
+# port = 3306
+# user = "tidb2dw"
+# password = "tidb2dw"
+# db = "tidb2dw"
+# max-open-conns = 2
+# max-idle-conns = 1
diff --git a/go.mod b/go.mod
index 4a267c9..0e306fc 100644
--- a/go.mod
+++ b/go.mod
@@ -334,7 +334,7 @@ require (
 	golang.org/x/crypto v0.17.0 // indirect
 	golang.org/x/net v0.19.0 // indirect
 	golang.org/x/oauth2 v0.15.0 // indirect
-	golang.org/x/sync v0.5.0 // indirect
+	golang.org/x/sync v0.5.0
 	golang.org/x/sys v0.15.0 // indirect
 	golang.org/x/term v0.15.0 // indirect
 	golang.org/x/text v0.14.0 // indirect
@@ -343,7 +343,7 @@ require (
 	golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
 	google.golang.org/appengine v1.6.8 // indirect
 	google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 // indirect
-	google.golang.org/grpc v1.60.1 // indirect
+	google.golang.org/grpc v1.60.1
 	google.golang.org/protobuf v1.32.0 // indirect
 	gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
diff --git a/pkg/owner/compaign.go b/pkg/owner/compaign.go
new file mode 100644
index 0000000..5c7b4d3
--- /dev/null
+++ b/pkg/owner/compaign.go
@@ -0,0 +1,101 @@
+package owner
+
+import (
+	"context"
+	"fmt"
+	"sync/atomic"
+	"time"
+
+	"github.com/pingcap-inc/tidb2dw/pkg/owner/config"
+	"github.com/pingcap-inc/tidb2dw/pkg/owner/meta"
+	"golang.org/x/sync/errgroup"
+)
+
+// Campaign periodically campaigns for the owner lease in the meta backend and
+// renews it while held. IsOwner reports whether this server holds the lease.
+type Campaign struct {
+	cfg *config.OwnerConfig
+
+	backend meta.Backend
+
+	// isOwner is written by the campaign goroutine and read through IsOwner.
+	isOwner atomic.Bool
+}
+
+// NewCampaign creates a Campaign that is not the owner until Start acquires the lease.
+func NewCampaign(cfg *config.OwnerConfig, backend meta.Backend) *Campaign {
+	return &Campaign{
+		cfg:     cfg,
+		backend: backend,
+	}
+}
+
+// Start runs the campaign loop on eg until ctx is cancelled. Every
+// LeaseRenewInterval seconds it renews the lease if this server is the owner,
+// or tries to acquire it otherwise.
+//
+// Ownership is also dropped when renewals keep failing until the lease could
+// have expired: past that point another server may win the campaign, and
+// still reporting IsOwner here would leave two active owners.
+func (c *Campaign) Start(eg *errgroup.Group, ctx context.Context) {
+	eg.Go(func() error {
+		if c.cfg.LeaseRenewInterval <= 0 {
+			// time.NewTicker panics on a non-positive interval.
+			return fmt.Errorf("invalid lease renew interval: %d", c.cfg.LeaseRenewInterval)
+		}
+		serverID := fmt.Sprintf("%s:%d", c.cfg.Host, c.cfg.Port)
+		leaseDur := time.Duration(c.cfg.LeaseDuration) * time.Second
+
+		// leaseDeadline is the local estimate of when the held lease expires. It
+		// is measured from just before each successful request, so it errs on
+		// the early side of the expiry recorded by the backend.
+		var leaseDeadline time.Time
+
+		// Never report ownership once the loop has exited.
+		defer c.isOwner.Store(false)
+
+		ticker := time.NewTicker(time.Duration(c.cfg.LeaseRenewInterval) * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				attemptAt := time.Now()
+				if c.IsOwner() {
+					success, err := c.backend.RenewOwnerLease(serverID, c.cfg.LeaseDuration)
+					if err != nil {
+						// TODO: log error
+						if !time.Now().Before(leaseDeadline) {
+							// The lease may have expired without a renewal.
+							c.isOwner.Store(false)
+						}
+						continue
+					}
+					if !success {
+						// Someone else has taken the owner
+						c.isOwner.Store(false)
+						continue
+					}
+					leaseDeadline = attemptAt.Add(leaseDur)
+				} else {
+					// Try to campaign owner
+					success, err := c.backend.TryCampaignOwner(serverID, c.cfg.LeaseDuration)
+					if err != nil {
+						// TODO: log error
+						continue
+					}
+					if success {
+						leaseDeadline = attemptAt.Add(leaseDur)
+						c.isOwner.Store(true)
+					}
+				}
+			case <-ctx.Done():
+				return nil
+			}
+		}
+	})
+}
+
+// IsOwner reports whether this server currently believes it holds the owner lease.
+func (c *Campaign) IsOwner() bool {
+	return c.isOwner.Load()
+}
diff --git a/pkg/owner/config/config.go b/pkg/owner/config/config.go
new file mode 100644
index 0000000..f70aa55
--- /dev/null
+++ b/pkg/owner/config/config.go
@@ -0,0 +1,34 @@
+package config
+
+// OwnerConfig holds the settings of the owner server.
+type OwnerConfig struct {
+	// Log Level
+	LogLevel string
+	// Server Address
+	Host string
+	// Server Port
+	Port int
+
+	// Meta database connection settings
+	MetaDB MetaDBConfig
+
+	// Owner Lease Duration, in seconds
+	LeaseDuration int
+	// Owner Lease Renew Interval, in seconds; keep it shorter than LeaseDuration
+	LeaseRenewInterval int
+}
+
+// MetaDBConfig holds the connection settings of the meta database.
+type MetaDBConfig struct {
+	// Connect to the metaDB
+	Host     string
+	Port     int
+	Db       string
+	User     string
+	Password string
+
+	// Max Open Connections
+	MaxOpenConns int
+	// Max Idle Connections
+	MaxIdleConns int
+}
diff --git a/pkg/owner/grpc_server.go b/pkg/owner/grpc_server.go
new file mode 100644
index 0000000..d3484b8
--- /dev/null
+++ b/pkg/owner/grpc_server.go
@@ -0,0 +1,61 @@
+package owner
+
+import (
+	"context"
+	"fmt"
+	"net"
+
+	"golang.org/x/sync/errgroup"
+	"google.golang.org/grpc"
+)
+
+// GrpcServer is the generic tcp server which provides protobuf rpc service.
+type GrpcServer struct {
+	// The server address and port.
+	addr string
+	port int
+
+	// The listener the server accepts connections on; nil until Start has bound it.
+	listener *net.Listener
+
+	// The internal grpc server.
+	InternalServer *grpc.Server
+}
+
+// NewGrpcServer creates a new GrpcServer.
+func NewGrpcServer(addr string, port int) *GrpcServer {
+	return &GrpcServer{
+		addr:           addr,
+		port:           port,
+		InternalServer: grpc.NewServer(),
+	}
+}
+
+// Start binds the TCP listener before returning, so a bind failure is reported
+// to the caller, and then serves gRPC requests on a goroutine owned by eg.
+func (s *GrpcServer) Start(eg *errgroup.Group, _ctx context.Context) error {
+	addr := fmt.Sprintf("%s:%d", s.addr, s.port)
+	lis, err := net.Listen("tcp", addr)
+	if err != nil {
+		return fmt.Errorf("listen on %s: %w", addr, err)
+	}
+	s.listener = &lis
+
+	// Serve blocks until the server stops; whatever it returns is handed to eg.
+	eg.Go(func() error {
+		return s.InternalServer.Serve(lis)
+	})
+
+	return nil
+}
+
+// Stop stops the gRPC server and closes the listener. It does not panic when
+// Start was never called or failed, in which case there is no listener to close.
+func (s *GrpcServer) Stop() error {
+	s.InternalServer.Stop()
+	if s.listener != nil {
+		// Best effort: the gRPC server may have closed the listener already.
+		_ = (*s.listener).Close()
+	}
+	return nil
+}
diff --git a/pkg/owner/meta/backend.go b/pkg/owner/meta/backend.go
new file mode 100644
index 0000000..722e87f
--- /dev/null
+++ b/pkg/owner/meta/backend.go
@@ -0,0 +1,126 @@
+package meta
+
+import (
+	"database/sql"
+	"fmt"
+
+	_ "github.com/go-sql-driver/mysql"
+	"github.com/pingcap-inc/tidb2dw/pkg/owner/config"
+)
+
+// Backend is the meta store behind the owner election.
+type Backend interface {
+	// Bootstrap initializes the schema of the table.
+	Bootstrap() error
+
+	// GetOwner returns the active owner of the tidb2dw service.
+	GetOwner() (string, error)
+
+	// TryCampaignOwner tries to make who the owner for leaseDurSec seconds; it reports whether who won.
+	TryCampaignOwner(who string, leaseDurSec int) (bool, error)
+
+	// RenewOwnerLease renews who's owner lease for leaseDurSec seconds; it reports whether it was renewed.
+	RenewOwnerLease(who string, leaseDurSec int) (bool, error)
+}
+
+// RdsBackend is the backend implementation for MySQL RDS.
+type RdsBackend struct {
+	cfg *config.MetaDBConfig
+	db  *sql.DB
+}
+
+// NewRdsBackend creates a new RdsBackend instance.
+func NewRdsBackend(cfg *config.MetaDBConfig) (*RdsBackend, error) {
+	dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Db)
+	db, err := sql.Open("mysql", dsn)
+	if err != nil {
+		return nil, err
+	}
+	// sql.Open may not connect yet, so an unreachable database can surface later.
+	db.SetMaxOpenConns(cfg.MaxOpenConns)
+	db.SetMaxIdleConns(cfg.MaxIdleConns)
+
+	return &RdsBackend{
+		cfg: cfg,
+		db:  db,
+	}, nil
+}
+
+// Bootstrap creates the owner and task tables if they are missing and seeds the
+// single owner row. It can be re-run safely, e.g. on every server start.
+func (b *RdsBackend) Bootstrap() error {
+	// Create the owner table if not exists.
+	// owner {
+	//   id INT PRIMARY KEY,
+	//   who VARCHAR(255),
+	//   lease_expire TIMESTAMP,
+	// }
+	// There is just one row in the owner table, whose id is 1.
+	if _, err := b.db.Exec("CREATE TABLE IF NOT EXISTS owner (id INT PRIMARY KEY, who VARCHAR(255), lease_expire TIMESTAMP)"); err != nil {
+		return err
+	}
+
+	// Create the task table if not exists.
+	// task {
+	//   id INT PRIMARY KEY,
+	//   src VARCHAR(255), # source storage address
+	//   src_bucket VARCHAR(255), # source storage bucket
+	//   src_key VARCHAR(255), # source storage key
+	//   dst VARCHAR(255),
+	//   created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+	//   updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+	//   unique key (src, dst),
+	// }
+	_, err := b.db.Exec("CREATE TABLE IF NOT EXISTS task (id INT PRIMARY KEY, src VARCHAR(255)," +
+		"src_bucket VARCHAR(255), src_key VARCHAR(255), dst VARCHAR(255)," +
+		"created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP," +
+		"updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP," +
+		"UNIQUE KEY (src, dst))")
+	if err != nil {
+		return err
+	}
+
+	// Seed the owner row with an already expired lease so the first campaign can
+	// win. The row survives restarts, so a duplicate id must be a no-op rather
+	// than an error. The seed time is a past value inside the TIMESTAMP range;
+	// '1970-01-01 00:00:00' is below that range and is rejected in strict SQL mode.
+	_, err = b.db.Exec("INSERT INTO owner (id, who, lease_expire) VALUES (1, '', '2000-01-01 00:00:00') " +
+		"ON DUPLICATE KEY UPDATE id = id")
+	return err
+}
+
+// GetOwner returns the active owner of the tidb2dw service.
+func (b *RdsBackend) GetOwner() (string, error) {
+	var who string
+	// An expired lease has no active owner; report that as "" rather than a stale name.
+	err := b.db.QueryRow("SELECT who FROM owner WHERE id = 1 AND lease_expire >= NOW()").Scan(&who)
+	if err != nil && err != sql.ErrNoRows {
+		return "", err
+	}
+	return who, nil
+}
+
+// TryCampaignOwner makes who the owner for leaseDurSec seconds if the current
+// lease has expired. It reports whether the lease was acquired.
+func (b *RdsBackend) TryCampaignOwner(who string, leaseDurSec int) (bool, error) {
+	return b.updateOwner("UPDATE owner SET who = ?, lease_expire = DATE_ADD(NOW(), INTERVAL ? SECOND) WHERE id = 1 AND lease_expire < NOW()", who, leaseDurSec)
+}
+
+// RenewOwnerLease extends the lease to leaseDurSec seconds from now if who is
+// still the recorded owner. It reports whether the lease was renewed.
+func (b *RdsBackend) RenewOwnerLease(who string, leaseDurSec int) (bool, error) {
+	return b.updateOwner("UPDATE owner SET lease_expire = DATE_ADD(NOW(), INTERVAL ? SECOND) WHERE id = 1 AND who = ?", leaseDurSec, who)
+}
+
+// updateOwner runs an UPDATE on the owner row and reports whether any row was affected.
+func (b *RdsBackend) updateOwner(query string, args ...any) (bool, error) {
+	res, err := b.db.Exec(query, args...)
+	if err != nil {
+		return false, err
+	}
+	affected, err := res.RowsAffected()
+	if err != nil {
+		return false, err
+	}
+	return affected > 0, nil
+}
diff --git a/pkg/owner/server.go b/pkg/owner/server.go
new file mode 100644
index 0000000..061bf48
--- /dev/null
+++ b/pkg/owner/server.go
@@ -0,0 +1,79 @@
+package owner
+
+import (
+	"context"
+
+	"github.com/pingcap-inc/tidb2dw/pkg/owner/config"
+	"github.com/pingcap-inc/tidb2dw/pkg/owner/meta"
+	"golang.org/x/sync/errgroup"
+)
+
+// Server is the owner server.
+type Server struct {
+	cfg *config.OwnerConfig
+
+	// grpcServer is the gRPC server started by Start.
+	grpcServer *GrpcServer
+
+	// backend is the meta store; campaign is the owner election built on it.
+	backend  meta.Backend
+	campaign *Campaign
+
+	// eg runs the background goroutines; ctx is its context and cancel cancels it.
+	eg     *errgroup.Group
+	ctx    context.Context
+	cancel context.CancelFunc
+}
+
+// NewServer wires the gRPC server, the meta backend and the campaign together.
+// It does not bootstrap the schema or start serving; see Prepare and Start.
+func NewServer(cfg *config.OwnerConfig) (*Server, error) {
+	rpc := NewGrpcServer(cfg.Host, cfg.Port)
+	store, err := meta.NewRdsBackend(&cfg.MetaDB)
+	if err != nil {
+		return nil, err
+	}
+
+	rootCtx, cancel := context.WithCancel(context.Background())
+	group, groupCtx := errgroup.WithContext(rootCtx)
+	srv := &Server{
+		cfg:        cfg,
+		grpcServer: rpc,
+		backend:    store,
+		campaign:   NewCampaign(cfg, store),
+		eg:         group,
+		ctx:        groupCtx,
+		cancel:     cancel,
+	}
+	return srv, nil
+}
+
+// Prepare initializes the backend meta schema and then starts the owner campaign.
+func (s *Server) Prepare() error {
+	err := s.backend.Bootstrap()
+	if err != nil {
+		return err
+	}
+	s.campaign.Start(s.eg, s.ctx)
+
+	// TODO: register the owner service to the gRPC server.
+	return nil
+}
+
+// Start starts the gRPC server on the server's errgroup.
+func (s *Server) Start() error {
+	err := s.grpcServer.Start(s.eg, s.ctx)
+	return err
+}
+
+// Stop cancels the server context and stops the gRPC server.
+func (s *Server) Stop() error {
+	s.cancel()
+	err := s.grpcServer.Stop()
+	return err
+}
+
+// Wait blocks until the errgroup's goroutines have returned and reports their first error.
+func (s *Server) Wait() error {
+	return s.eg.Wait()
+}