Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

micro-service: init owner service #104

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/micro-service.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# TiDB2DW Micro-Service

## Goals

- TiDB2DW supports multiple replication streams for multiple upstream TiDB clusters.
- TiDB2DW is highly available and can tolerate a single-node failure.

## Design

### TiDB2DW Owner

The Owner maintains the meta information of replication tasks and monitors the health of workers. There is only one active owner at any point in time; if the active owner fails, a standby owner becomes the new active owner.

### TiDB2DW Worker

The workers handle replication tasks and periodically send heartbeats to the active owner. If a worker fails or is isolated from the active owner, the tasks on that worker are rescheduled to other workers.
20 changes: 20 additions & 0 deletions etc/owner.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@

## Server log level; "debug", "info", "warn" and "error" are supported.
# log-level = "info"

# host = "127.0.0.1"
# port = 20891
## Owner lease duration, in seconds.
# lease-duration = 10
## Owner lease renew interval, in seconds.
# lease-renew-interval = 5

[metadb]
## metadb information
# host = "127.0.0.1"
# port = 3306
# user = "tidb2dw"
# password = "tidb2dw"
# db = "tidb2dw"
# max-open-conns = 2
# max-idle-conns = 1
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ require (
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/oauth2 v0.15.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sync v0.5.0
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand All @@ -343,7 +343,7 @@ require (
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 // indirect
google.golang.org/grpc v1.60.1 // indirect
google.golang.org/grpc v1.60.1
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
69 changes: 69 additions & 0 deletions pkg/owner/compaign.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package owner

import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/pingcap-inc/tidb2dw/pkg/owner/config"
"github.com/pingcap-inc/tidb2dw/pkg/owner/meta"
"golang.org/x/sync/errgroup"
)

// Campaign runs the owner election for one server against the meta backend
// and records whether this server currently believes it is the owner.
type Campaign struct {
	// cfg supplies the server identity (Host, Port) and the lease timings.
	cfg *config.OwnerConfig

	// backend is where the owner lease is campaigned for and renewed.
	backend meta.Backend

	// isOwner is written by the election loop in Start and read via IsOwner.
	isOwner atomic.Bool
}

// NewCampaign builds a Campaign from the owner configuration and the meta
// backend. The ownership flag is left at its zero value, so the returned
// Campaign reports IsOwner() == false.
func NewCampaign(cfg *config.OwnerConfig, backend meta.Backend) *Campaign {
	campaign := new(Campaign)
	campaign.cfg = cfg
	campaign.backend = backend
	return campaign
}

// Start launches the owner election loop as a goroutine of eg.
//
// Every LeaseRenewInterval seconds the loop either renews the lease (when this
// server is the owner) or tries to campaign for it (when it is not). The
// server identifies itself to the backend as "Host:Port". The loop ends, and
// the goroutine returns nil, once ctx is done.
//
// The goroutine returns an error if LeaseRenewInterval is not positive.
//
// Ownership is given up locally in three situations:
//   - the backend reports that the renewal did not apply (someone else owns it);
//   - renewals keep failing with errors until the last confirmed lease has
//     run out, because another server may legitimately take over from then on;
//   - the loop exits, because nothing renews the lease any more.
func (c *Campaign) Start(eg *errgroup.Group, ctx context.Context) {
	eg.Go(func() error {
		// time.NewTicker panics on a non-positive interval; surface the bad
		// configuration through the errgroup instead of crashing the process.
		if c.cfg.LeaseRenewInterval <= 0 {
			return fmt.Errorf("owner campaign: lease renew interval must be positive, got %d", c.cfg.LeaseRenewInterval)
		}

		serverID := fmt.Sprintf("%s:%d", c.cfg.Host, c.cfg.Port)
		leaseDuration := time.Duration(c.cfg.LeaseDuration) * time.Second

		ticker := time.NewTicker(time.Duration(c.cfg.LeaseRenewInterval) * time.Second)
		defer ticker.Stop()
		// Once this loop is gone the lease is no longer renewed, so stop
		// claiming ownership.
		defer c.isOwner.Store(false)

		// leaseDeadline is the local time until which the last confirmed
		// lease is known to be valid. It is measured from just before the
		// backend call, so it never outlives the lease stored in the backend.
		var leaseDeadline time.Time

		for {
			select {
			case <-ctx.Done():
				return nil
			case <-ticker.C:
			}

			attemptedAt := time.Now()

			if !c.IsOwner() {
				// Try to campaign owner.
				won, err := c.backend.TryCampaignOwner(serverID, c.cfg.LeaseDuration)
				if err != nil {
					// TODO: log the error; the campaign is retried on the next tick.
					continue
				}
				if won {
					leaseDeadline = attemptedAt.Add(leaseDuration)
					c.isOwner.Store(true)
				}
				continue
			}

			renewed, err := c.backend.RenewOwnerLease(serverID, c.cfg.LeaseDuration)
			switch {
			case err != nil:
				// TODO: log the error. A transient failure is retried on the
				// next tick, but once the confirmed lease has run out we can
				// no longer assume we are still the owner.
				if !time.Now().Before(leaseDeadline) {
					c.isOwner.Store(false)
				}
			case !renewed:
				// Someone else has taken the owner.
				c.isOwner.Store(false)
			default:
				leaseDeadline = attemptedAt.Add(leaseDuration)
			}
		}
	})
}

// IsOwner reports whether this server currently considers itself the owner,
// as last recorded by the election loop. The flag is read atomically.
func (c *Campaign) IsOwner() bool {
	holdsOwnership := c.isOwner.Load()
	return holdsOwnership
}
31 changes: 31 additions & 0 deletions pkg/owner/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package config

// OwnerConfig holds the settings of the owner service.
type OwnerConfig struct {
	// LogLevel is the server log level.
	LogLevel string

	// Host is the address of this server.
	Host string

	// Port is the port of this server.
	Port int

	// MetaDB describes how to reach the meta database.
	MetaDB MetaDBConfig

	// LeaseDuration is the owner lease duration.
	// NOTE(review): presumably in seconds, as etc/owner.toml states - confirm.
	LeaseDuration int

	// LeaseRenewInterval is the interval between owner lease renewals.
	// NOTE(review): presumably in seconds, as etc/owner.toml states - confirm.
	LeaseRenewInterval int
}

// MetaDBConfig holds the connection settings of the meta database.
type MetaDBConfig struct {
	// Host and Port locate the meta database server.
	Host string
	Port int

	// Db is the name of the database to use.
	Db string

	// User and Password are the credentials used to connect.
	User     string
	Password string

	// MaxOpenConns is the maximum number of open connections.
	MaxOpenConns int

	// MaxIdleConns is the maximum number of idle connections.
	MaxIdleConns int
}
61 changes: 61 additions & 0 deletions pkg/owner/grpc_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package owner

import (
"context"
"fmt"
"net"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)

// GrpcServer is a generic TCP server that exposes protobuf RPC services
// through an embedded gRPC server.
type GrpcServer struct {
	// addr and port together form the "addr:port" endpoint to listen on.
	addr string
	port int

	// listener is the TCP listener opened by Start; it is nil until Start
	// has successfully bound the endpoint.
	listener *net.Listener

	// InternalServer is the underlying gRPC server. It is exported so that
	// callers can reach it directly.
	InternalServer *grpc.Server
}

// NewGrpcServer creates a new GrpcServer for the given address and port,
// backed by a fresh grpc.Server. Nothing is listened on until Start is called.
func NewGrpcServer(addr string, port int) *GrpcServer {
	server := new(GrpcServer)
	server.addr = addr
	server.port = port
	server.InternalServer = grpc.NewServer()
	return server
}

// Start binds a TCP listener on "addr:port" and serves InternalServer on it
// from a goroutine of eg.
//
// A failure to bind is returned directly and no goroutine is started.
// Whatever Serve returns later becomes the result of the errgroup goroutine.
// The context argument is currently not used.
func (s *GrpcServer) Start(eg *errgroup.Group, _ctx context.Context) error {
	endpoint := fmt.Sprintf("%s:%d", s.addr, s.port)

	tcpListener, listenErr := net.Listen("tcp", endpoint)
	if listenErr != nil {
		return listenErr
	}
	s.listener = &tcpListener

	// Serve blocks, so it runs in the group's goroutine.
	eg.Go(func() error {
		return s.InternalServer.Serve(*s.listener)
	})

	return nil
}

// Stop stops the gRPC server and closes the listener opened by Start.
//
// It is safe to call Stop when Start was never called or failed to bind: in
// that case there is no listener and only the gRPC server is stopped.
// Stop always returns nil.
func (s *GrpcServer) Stop() error {
	s.InternalServer.Stop()

	// listener is only set once Start has bound the endpoint.
	if s.listener != nil {
		// grpc.Server.Stop already closes the listeners it serves, so an
		// error here typically just means "already closed"; ignore it.
		_ = (*s.listener).Close()
	}
	return nil
}
126 changes: 126 additions & 0 deletions pkg/owner/meta/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package meta

import (
"database/sql"
"fmt"

_ "github.com/go-sql-driver/mysql"
"github.com/pingcap-inc/tidb2dw/pkg/owner/config"
)

// Backend is the meta storage used by the tidb2dw owner service: it holds the
// schema and the owner lease.
type Backend interface {
	// Bootstrap initializes the schema of the tables.
	Bootstrap() error

	// GetOwner returns the active owner of the tidb2dw service.
	GetOwner() (string, error)

	// TryCampaignOwner tries to make who the owner of the tidb2dw service
	// with a lease of leaseDurSec seconds. The boolean reports whether the
	// campaign succeeded.
	TryCampaignOwner(who string, leaseDurSec int) (bool, error)

	// RenewOwnerLease renews the owner lease held by who for another
	// leaseDurSec seconds. The boolean reports whether the renewal applied.
	RenewOwnerLease(who string, leaseDurSec int) (bool, error)
}

// RdsBackend is the Backend implementation on top of a MySQL RDS database.
type RdsBackend struct {
	// cfg is the meta database configuration the backend was created with.
	cfg *config.MetaDBConfig

	// db is the database handle used for every statement.
	db *sql.DB
}

// NewRdsBackend creates a new RdsBackend instance.
//
// It opens a "mysql" database handle for the user, password, host, port and
// database taken from cfg (charset utf8mb4) and applies the configured limits
// on open and idle connections. No statement is executed here. An error from
// sql.Open is returned as is.
func NewRdsBackend(cfg *config.MetaDBConfig) (*RdsBackend, error) {
	dsn := fmt.Sprintf(
		"%s:%s@tcp(%s:%d)/%s?charset=utf8mb4",
		cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Db,
	)

	pool, openErr := sql.Open("mysql", dsn)
	if openErr != nil {
		return nil, openErr
	}
	pool.SetMaxOpenConns(cfg.MaxOpenConns)
	pool.SetMaxIdleConns(cfg.MaxIdleConns)

	backend := &RdsBackend{cfg: cfg, db: pool}
	return backend, nil
}

// Bootstrap initializes the schema of the tables and seeds the owner row.
//
// It is idempotent: the tables are created only if they do not exist, and the
// single owner row (id = 1) is inserted only if it is not already there, so
// an existing owner and lease are left untouched. The first failing statement
// aborts the bootstrap and its error is returned wrapped with context.
func (b *RdsBackend) Bootstrap() error {
	// Create the owner table if not exists.
	// owner {
	//   id INT PRIMARY KEY,
	//   who VARCHAR(255),
	//   lease_expire TIMESTAMP,
	// }
	// There is just one row in the owner table, whose id is 1.
	_, err := b.db.Exec("CREATE TABLE IF NOT EXISTS owner (id INT PRIMARY KEY, who VARCHAR(255), lease_expire TIMESTAMP)")
	if err != nil {
		return fmt.Errorf("creating owner table: %w", err)
	}

	// Create the task table if not exists.
	// task {
	//   id INT PRIMARY KEY,
	//   src VARCHAR(255),        # source storage address
	//   src_bucket VARCHAR(255), # source storage bucket
	//   src_key VARCHAR(255),    # source storage key
	//   dst VARCHAR(255),
	//   created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
	//   updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
	//   unique key (src, dst),
	// }
	_, err = b.db.Exec("CREATE TABLE IF NOT EXISTS task (id INT PRIMARY KEY, src VARCHAR(255)," +
		"src_bucket VARCHAR(255), src_key VARCHAR(255), dst VARCHAR(255)," +
		"created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP," +
		"updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP," +
		"UNIQUE KEY (src, dst))")
	if err != nil {
		return fmt.Errorf("creating task table: %w", err)
	}

	// Insert the initial owner row with an already-expired lease.
	//
	// FROM_UNIXTIME(1) is the smallest value a TIMESTAMP column can hold in
	// any session time zone; the literal '1970-01-01 00:00:00' is below that
	// minimum for sessions at or east of UTC and is rejected in strict mode.
	//
	// The no-op ON DUPLICATE KEY UPDATE keeps a repeated Bootstrap from
	// failing on the primary key and from resetting the current owner.
	_, err = b.db.Exec("INSERT INTO owner (id, who, lease_expire) VALUES (1, '', FROM_UNIXTIME(1)) " +
		"ON DUPLICATE KEY UPDATE id = id")
	if err != nil {
		return fmt.Errorf("seeding owner row: %w", err)
	}

	return nil
}

// GetOwner returns the active owner of the tidb2dw service, i.e. the who
// column of the owner row with id 1. If the query or the scan fails, the
// error is returned together with an empty string.
func (b *RdsBackend) GetOwner() (string, error) {
	ownerRow := b.db.QueryRow("SELECT who FROM owner WHERE id = 1")

	var owner string
	if scanErr := ownerRow.Scan(&owner); scanErr != nil {
		return "", scanErr
	}
	return owner, nil
}

// TryCampaignOwner tries to campaign the owner of the tidb2dw service.
//
// The owner row is taken over by who, with a new lease of leaseDurSec seconds
// from the database's NOW(), only if the stored lease has already expired.
// It returns true when the update affected at least one row.
func (b *RdsBackend) TryCampaignOwner(who string, leaseDurSec int) (bool, error) {
	const takeExpiredLease = "UPDATE owner SET who = ?, lease_expire = DATE_ADD(NOW(), INTERVAL ? SECOND) WHERE id = 1 AND lease_expire < NOW()"

	result, execErr := b.db.Exec(takeExpiredLease, who, leaseDurSec)
	if execErr != nil {
		return false, execErr
	}

	rows, countErr := result.RowsAffected()
	if countErr != nil {
		return false, countErr
	}

	won := rows > 0
	return won, nil
}

// RenewOwnerLease renews the lease of the owner of the tidb2dw service.
//
// The lease is pushed to leaseDurSec seconds after the database's NOW(), but
// only if the owner row still names who. It returns true when the update
// affected at least one row.
//
// NOTE(review): MySQL by default counts only rows whose values actually
// changed, so a renewal that computes the same lease_expire (e.g. twice within
// one second) would presumably be reported as not renewed - confirm.
func (b *RdsBackend) RenewOwnerLease(who string, leaseDurSec int) (bool, error) {
	const extendOwnLease = "UPDATE owner SET lease_expire = DATE_ADD(NOW(), INTERVAL ? SECOND) WHERE id = 1 AND who = ?"

	result, execErr := b.db.Exec(extendOwnLease, leaseDurSec, who)
	if execErr != nil {
		return false, execErr
	}

	rows, countErr := result.RowsAffected()
	if countErr != nil {
		return false, countErr
	}

	renewed := rows > 0
	return renewed, nil
}
Loading
Loading