Skip to content

Commit 0dd2e28

Browse files
djshow832sre-bot
authored andcommitted
drainer: support reading from relay log (pingcap#849)
1 parent 094482d commit 0dd2e28

File tree

6 files changed

+342
-6
lines changed

6 files changed

+342
-6
lines changed

cmd/drainer/drainer.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql"
6868
relay-log-dir = ""
6969
# max file size of each relay log
7070
relay-log-size = 10485760
71+
# read buffer size of relay log
72+
relay-read-buf-size = 8
7173

7274
#[[syncer.replicate-do-table]]
7375
#db-name ="test"

drainer/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ type SyncerConfig struct {
7272
DestDBType string `toml:"db-type" json:"db-type"`
7373
RelayLogDir string `toml:"relay-log-dir" json:"relay-log-dir"`
7474
RelayLogSize int64 `toml:"relay-log-size" json:"relay-log-size"`
75+
RelayReadBufSize int `toml:"relay-read-buf-size" json:"relay-read-buf-size"`
7576
EnableDispatch bool `toml:"enable-dispatch" json:"enable-dispatch"`
7677
SafeMode bool `toml:"safe-mode" json:"safe-mode"`
7778
EnableCausality bool `toml:"enable-detect" json:"enable-detect"`
@@ -133,6 +134,7 @@ func NewConfig() *Config {
133134
fs.StringVar(&cfg.SyncerCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql or tidb or file or kafka; see syncer section in conf/drainer.toml")
134135
fs.StringVar(&cfg.SyncerCfg.RelayLogDir, "relay-log-dir", "", "path to relay log of syncer")
135136
fs.Int64Var(&cfg.SyncerCfg.RelayLogSize, "relay-log-size", 10*1024*1024, "max file size of each relay log")
137+
fs.IntVar(&cfg.SyncerCfg.RelayReadBufSize, "relay-read-buf-size", 8, "read buffer size of relay log")
136138
fs.BoolVar(&cfg.SyncerCfg.EnableDispatch, "enable-dispatch", true, "enable dispatching sqls that in one same binlog; if set true, work-count and txn-batch would be useless")
137139
fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
138140
fs.BoolVar(&cfg.SyncerCfg.EnableCausality, "enable-detect", false, "enable detect causality")

drainer/relay/reader.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
// Copyright 2019 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package relay
15+
16+
import (
17+
"context"
18+
19+
"github.com/pingcap/errors"
20+
"github.com/pingcap/log"
21+
"github.com/pingcap/tidb-binlog/pkg/binlogfile"
22+
"github.com/pingcap/tidb-binlog/pkg/loader"
23+
obinlog "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog"
24+
"github.com/pingcap/tipb/go-binlog"
25+
)
26+
27+
var _ Reader = &reader{}
28+
29+
// Reader is the interface for reading relay log.
30+
type Reader interface {
31+
// Run reads relay log.
32+
Run() context.CancelFunc
33+
34+
// Txns returns parsed transactions.
35+
Txns() <-chan *loader.Txn
36+
37+
// Close releases resources.
38+
Close() error
39+
40+
// Error returns error occurs in reading.
41+
Error() <-chan error
42+
}
43+
44+
type reader struct {
45+
binlogger binlogfile.Binlogger
46+
txns chan *loader.Txn
47+
err chan error
48+
}
49+
50+
// NewReader creates a relay reader.
51+
func NewReader(dir string, readBufferSize int) (Reader, error) {
52+
binlogger, err := binlogfile.OpenBinlogger(dir, binlogfile.SegmentSizeBytes)
53+
if err != nil {
54+
return nil, errors.Trace(err)
55+
}
56+
57+
return &reader{
58+
binlogger: binlogger,
59+
txns: make(chan *loader.Txn, readBufferSize),
60+
}, nil
61+
}
62+
63+
// Run implements Reader interface.
64+
func (r *reader) Run() context.CancelFunc {
65+
r.err = make(chan error, 1)
66+
ctx, cancel := context.WithCancel(context.Background())
67+
binlogChan, binlogErr := r.binlogger.ReadAll(ctx)
68+
69+
go func(ctx context.Context) {
70+
var err error
71+
for err == nil {
72+
var blg *binlog.Entity
73+
select {
74+
case <-ctx.Done():
75+
err = ctx.Err()
76+
log.Warn("Reading relay log is interrupted")
77+
case blg = <-binlogChan:
78+
}
79+
if blg == nil {
80+
break
81+
}
82+
83+
slaveBinlog := new(obinlog.Binlog)
84+
if err = slaveBinlog.Unmarshal(blg.Payload); err != nil {
85+
break
86+
}
87+
88+
var txn *loader.Txn
89+
txn, err = loader.SlaveBinlogToTxn(slaveBinlog)
90+
if err != nil {
91+
break
92+
}
93+
select {
94+
case <-ctx.Done():
95+
err = ctx.Err()
96+
log.Warn("Producing transaction is interrupted")
97+
case r.txns <- txn:
98+
}
99+
}
100+
// If binlogger is not done, notify it to stop.
101+
cancel()
102+
close(r.txns)
103+
104+
if err == nil {
105+
err = <-binlogErr
106+
}
107+
if err != nil {
108+
r.err <- err
109+
}
110+
close(r.err)
111+
}(ctx)
112+
113+
return cancel
114+
}
115+
116+
// Txns implements Reader interface.
117+
func (r *reader) Txns() <-chan *loader.Txn {
118+
return r.txns
119+
}
120+
121+
// Error implements Reader interface.
122+
func (r *reader) Error() <-chan error {
123+
return r.err
124+
}
125+
126+
// Close implements Reader interface.
127+
func (r *reader) Close() error {
128+
var err error
129+
// If it's reading, wait until it's finished.
130+
if r.err != nil {
131+
err = <-r.err
132+
}
133+
if closeBinloggerErr := r.binlogger.Close(); err == nil {
134+
err = closeBinloggerErr
135+
}
136+
return errors.Trace(err)
137+
}

drainer/relay/reader_test.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
// Copyright 2019 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package relay
15+
16+
import (
17+
"os"
18+
"path"
19+
20+
. "github.com/pingcap/check"
21+
"github.com/pingcap/tidb-binlog/drainer/translator"
22+
"github.com/pingcap/tidb-binlog/pkg/binlogfile"
23+
"github.com/pingcap/tidb-binlog/pkg/file"
24+
"github.com/pingcap/tidb-binlog/pkg/loader"
25+
)
26+
27+
var _ = Suite(&testReaderSuite{})
28+
29+
type testReaderSuite struct {
30+
translator.BinlogGenerator
31+
}
32+
33+
func (r *testReaderSuite) TestCreate(c *C) {
34+
_, err := NewReader("", 8)
35+
c.Assert(err, NotNil)
36+
37+
_, err = NewReader("/", 8)
38+
c.Assert(err, NotNil)
39+
40+
dir := c.MkDir()
41+
relayReader, err := NewReader(dir, 8)
42+
c.Assert(err, IsNil)
43+
err = relayReader.Close()
44+
c.Assert(err, IsNil)
45+
}
46+
47+
func (r *testReaderSuite) TestReadBinlog(c *C) {
48+
dir := c.MkDir()
49+
r.SetDDL()
50+
r.writeBinlog(c, dir)
51+
txn := r.readBinlogAndCheck(c, dir, 1)
52+
c.Assert(r.Table, Equals, txn.DDL.Table)
53+
c.Assert(r.Schema, Equals, txn.DDL.Database)
54+
55+
r.SetInsert(c)
56+
r.writeBinlog(c, dir)
57+
txn = r.readBinlogAndCheck(c, dir, 2)
58+
c.Assert(txn.DMLs[0].Tp, Equals, loader.InsertDMLType)
59+
60+
r.SetUpdate(c)
61+
r.writeBinlog(c, dir)
62+
txn = r.readBinlogAndCheck(c, dir, 3)
63+
c.Assert(txn.DMLs[0].Tp, Equals, loader.UpdateDMLType)
64+
65+
r.SetDelete(c)
66+
r.writeBinlog(c, dir)
67+
txn = r.readBinlogAndCheck(c, dir, 4)
68+
c.Assert(txn.DMLs[0].Tp, Equals, loader.DeleteDMLType)
69+
}
70+
71+
func (r *testReaderSuite) writeBinlog(c *C, dir string) {
72+
relayer, err := NewRelayer(dir, binlogfile.SegmentSizeBytes, r)
73+
c.Assert(relayer, NotNil)
74+
c.Assert(err, IsNil)
75+
defer func() { c.Assert(relayer.Close(), IsNil) }()
76+
77+
_, err = relayer.WriteBinlog(r.Schema, r.Table, r.TiBinlog, r.PV)
78+
c.Assert(err, IsNil)
79+
}
80+
81+
func (r *testReaderSuite) readBinlogAndCheck(c *C, dir string, expectedNumber int) *loader.Txn {
82+
relayReader, err := NewReader(dir, 8)
83+
c.Assert(relayReader, NotNil)
84+
c.Assert(err, IsNil)
85+
defer func() { c.Assert(relayReader.Close(), IsNil) }()
86+
87+
relayReader.Run()
88+
89+
var lastTxn *loader.Txn
90+
number := 0
91+
for txn := range relayReader.Txns() {
92+
number++
93+
lastTxn = txn
94+
}
95+
c.Assert(<-relayReader.Error(), IsNil)
96+
c.Assert(number, Equals, expectedNumber)
97+
return lastTxn
98+
}
99+
100+
func (r *testReaderSuite) TestReadBinlogError(c *C) {
101+
dir := c.MkDir()
102+
r.SetDDL()
103+
r.writeBinlog(c, dir)
104+
105+
// Set the file mode to 0100
106+
names, err := binlogfile.ReadBinlogNames(dir)
107+
c.Assert(err, IsNil)
108+
c.Assert(names, HasLen, 1)
109+
fpath := path.Join(dir, names[0])
110+
f, err := os.OpenFile(fpath, os.O_WRONLY, file.PrivateFileMode)
111+
c.Assert(err, IsNil)
112+
c.Assert(f.Chmod(0222), IsNil)
113+
c.Assert(f.Close(), IsNil)
114+
115+
relayReader, err := NewReader(dir, 8)
116+
c.Assert(err, IsNil)
117+
relayReader.Run()
118+
c.Assert(<-relayReader.Error(), ErrorMatches, "*permission denied*")
119+
c.Assert(relayReader.Close(), IsNil)
120+
121+
// Append some invalid data to the file.
122+
f, err = os.OpenFile(fpath, os.O_WRONLY, 0222)
123+
c.Assert(err, IsNil)
124+
c.Assert(f.Chmod(file.PrivateFileMode), IsNil)
125+
_, err = f.WriteString("test")
126+
c.Assert(err, IsNil)
127+
c.Assert(f.Close(), IsNil)
128+
129+
relayReader, err = NewReader(dir, 8)
130+
c.Assert(err, IsNil)
131+
relayReader.Run()
132+
// It's recovered by binlogger.
133+
c.Assert(<-relayReader.Error(), IsNil)
134+
c.Assert(relayReader.Close(), IsNil)
135+
}
136+
137+
func (r *testReaderSuite) TestCancelRead(c *C) {
138+
dir := c.MkDir()
139+
140+
r.SetInsert(c)
141+
for i := 0; i < 1000; i++ {
142+
r.writeBinlog(c, dir)
143+
}
144+
145+
relayReader, err := NewReader(dir, 8)
146+
defer func() { c.Assert(relayReader.Close(), IsNil) }()
147+
c.Assert(err, IsNil)
148+
cancelFunc := relayReader.Run()
149+
cancelFunc()
150+
c.Assert(<-relayReader.Error(), ErrorMatches, "context canceled")
151+
}

drainer/relay/relayer_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,33 +47,33 @@ func (r *testRelayerSuite) TestCreate(c *C) {
4747

4848
func (r *testRelayerSuite) TestWriteBinlog(c *C) {
4949
dir := c.MkDir()
50-
relayer, err := NewRelayer(dir, binlogfile.SegmentSizeBytes, nil)
50+
relayer, err := NewRelayer(dir, binlogfile.SegmentSizeBytes, r)
5151
c.Assert(relayer, NotNil)
5252
c.Assert(err, IsNil)
5353
defer relayer.Close()
5454

5555
r.SetDDL()
56-
pos1, err := relayer.WriteBinlog(r.Schema, r.Table, r.TiBinlog, nil)
56+
pos1, err := relayer.WriteBinlog(r.Schema, r.Table, r.TiBinlog, r.PV)
5757
c.Assert(err, IsNil)
5858
c.Assert(pos1.Suffix, Equals, uint64(0))
5959
c.Assert(pos1.Offset, Greater, int64(0))
6060

6161
r.SetInsert(c)
62-
pos2, err := relayer.WriteBinlog(r.Schema, r.Table, r.TiBinlog, nil)
62+
pos2, err := relayer.WriteBinlog(r.Schema, r.Table, r.TiBinlog, r.PV)
6363
c.Assert(err, IsNil)
6464
c.Assert(pos2.Suffix, Equals, uint64(0))
6565
c.Assert(pos2.Offset, Greater, pos1.Offset)
6666
}
6767

6868
func (r *testRelayerSuite) TestGCBinlog(c *C) {
6969
dir := c.MkDir()
70-
relayer, err := NewRelayer(dir, 10, nil)
70+
relayer, err := NewRelayer(dir, 10, r)
7171
c.Assert(relayer, NotNil)
7272
c.Assert(err, IsNil)
7373
defer relayer.Close()
7474

7575
r.SetDDL()
76-
pos1, err := relayer.WriteBinlog(r.Schema, r.Table, r.TiBinlog, nil)
76+
pos1, err := relayer.WriteBinlog(r.Schema, r.Table, r.TiBinlog, r.PV)
7777
c.Assert(err, IsNil)
7878
// There should be 2 files: the written file, the new created file.
7979
// GC won't remove the first file now.
@@ -82,7 +82,7 @@ func (r *testRelayerSuite) TestGCBinlog(c *C) {
8282
checkRelayLogNumber(c, dir, 2)
8383

8484
r.SetDDL()
85-
pos2, err := relayer.WriteBinlog(r.Schema, r.Table, r.TiBinlog, nil)
85+
pos2, err := relayer.WriteBinlog(r.Schema, r.Table, r.TiBinlog, r.PV)
8686
c.Assert(err, IsNil)
8787
// Rotate twice, so there would be 3 files.
8888
checkRelayLogNumber(c, dir, 3)

0 commit comments

Comments
 (0)