-
Notifications
You must be signed in to change notification settings - Fork 131
Drainer support plugin framework #911
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
tsthght
wants to merge
65
commits into
pingcap:plugin
Choose a base branch
from
tsthght:plugin
base: plugin
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 61 commits
Commits
Show all changes
65 commits
Select commit
Hold shift + click to select a range
19b6895
add two args
tsthght 4e0a4de
support plugin
tsthght 2cc57b8
refine
tsthght b583108
optimize the plugin management structure
tsthght 76c6be6
modify plugin framework
tsthght 27989e2
modify plugin framework
tsthght 4811764
modify plugin demo
tsthght a5c5572
gofmt
tsthght 73c7f0b
add testcase
tsthght c2ac0ec
modify testcase
tsthght f9a74e3
modify go.mod
tsthght ccfd79b
modify
tsthght 861e5df
fix a problem
tsthght 2af5039
modify plugin
tsthght f4b62a2
modify plugin demo
tsthght 3ca4924
modify plugin pos
tsthght 6ea9197
modify FilterTxn ret
tsthght c085535
merge master
tsthght e4917f3
modify value: index
tsthght 8f24b8a
add RecordId
tsthght 24eaa9e
modify plugin
tsthght c97f3f2
support configure the mark database name and mark table name
tsthght 536943a
add some testcase
tsthght af05581
add testcase
tsthght 1d4e857
add testcase
tsthght fc3644f
modify some error
tsthght 8525698
add some tests
tsthght 3aa0f2e
modify test
tsthght 9f0ae6a
add some log
tsthght b04f256
modify log
tsthght f0cbd85
refine code
tsthght 7782196
add some log
tsthght ce64ee4
trim string
tsthght 2df8841
should not conflict with LoopbackControl
tsthght 0c764b1
refine comment
tsthght 76061b5
add comment
tsthght 78e9153
replace errors.New(fmt.Sprintf(...)) with fmt.Errorf(...)
tsthght 59ad9ee
add comment
tsthght 8859fd7
change RecordId to RecordID
tsthght f06e03f
change SetPlugin to setPlugin
tsthght 355b679
add comment
tsthght 9d8f6e3
refine
tsthght 7a12571
modify comment
tsthght 5e2e3ac
refine
tsthght d385b89
refine code
tsthght f102f7c
handle ci error
tsthght 90c5126
add init interface
tsthght fbd2748
modify SyncerPlugin to
tsthght 7df02f6
modify SyncerPlugin to SyncerFilter
tsthght 2893d2f
modify Loopback to SyncerFilter
tsthght ed5852f
modify
tsthght 518415d
add
tsthght 6ac7d53
refine
tsthght 0929e8d
add LoaderInit
tsthght 124a885
add loaderdestroy
tsthght ce067e3
refine
tsthght 8b5fbd9
modify
tsthght 8a2d0a7
refine
tsthght 8c71d4a
reine
tsthght 1d33874
refine
tsthght c99c28c
modify
tsthght cd06a34
refine
tsthght 613567d
add log
tsthght edecd47
add logs
tsthght 016b67f
Get rid of useless parameters
tsthght File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
package drainer | ||
|
||
import ( | ||
"github.com/pingcap/tidb-binlog/drainer/loopbacksync" | ||
"github.com/pingcap/tidb-binlog/pkg/loader" | ||
) | ||
|
||
// SyncerFilter is the interface that for syncer-plugin | ||
type SyncerFilter interface { | ||
FilterTxn(txn *loader.Txn, info *loopbacksync.LoopBackSync) (bool, error) | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ import ( | |
|
||
"github.com/pingcap/errors" | ||
"github.com/pingcap/log" | ||
"github.com/pingcap/tidb-binlog/pkg/plugin" | ||
"go.uber.org/zap" | ||
) | ||
|
||
|
@@ -36,36 +37,58 @@ const ( | |
ChannelInfo = "channel_info" | ||
) | ||
|
||
// CreateMarkTableDDL is the DDL to create the mark table. | ||
var CreateMarkTableDDL string = fmt.Sprintf("CREATE TABLE If Not Exists %s (%s bigint not null,%s bigint not null DEFAULT 0, %s bigint DEFAULT 0, %s varchar(64) ,PRIMARY KEY (%s,%s));", MarkTableName, ID, ChannelID, Val, ChannelInfo, ID, ChannelID) | ||
|
||
// CreateMarkDBDDL is DDL to create the database of mark table. | ||
var CreateMarkDBDDL = "create database IF NOT EXISTS retl;" | ||
|
||
//LoopBackSync loopback sync info | ||
type LoopBackSync struct { | ||
ChannelID int64 | ||
LoopbackControl bool | ||
MarkDBName string | ||
MarkTableName string | ||
SyncDDL bool | ||
Index int64 | ||
PluginPath string | ||
PluginNames []string | ||
Hooks []*plugin.EventHooks | ||
SupportPlugin bool | ||
RecordID int | ||
} | ||
|
||
//NewLoopBackSyncInfo return LoopBackSyncInfo objec | ||
func NewLoopBackSyncInfo(ChannelID int64, LoopbackControl, SyncDDL bool) *LoopBackSync { | ||
func NewLoopBackSyncInfo(ChannelID int64, LoopbackControl, SyncDDL bool, path string, names []string, SupportPlug bool, mdbname, mtablename string) *LoopBackSync { | ||
l := &LoopBackSync{ | ||
ChannelID: ChannelID, | ||
LoopbackControl: LoopbackControl, | ||
SyncDDL: SyncDDL, | ||
Index: 0, | ||
PluginPath: path, | ||
PluginNames: names, | ||
SupportPlugin: SupportPlug, | ||
MarkDBName: strings.TrimSpace(mdbname), | ||
MarkTableName: strings.TrimSpace(mtablename), | ||
} | ||
if l.SupportPlugin { | ||
l.Hooks = make([]*plugin.EventHooks, 4) | ||
l.Hooks[plugin.SyncerFilter] = &plugin.EventHooks{} | ||
|
||
l.Hooks[plugin.ExecutorExtend] = &plugin.EventHooks{} | ||
l.Hooks[plugin.LoaderInit] = &plugin.EventHooks{} | ||
l.Hooks[plugin.LoaderDestroy] = &plugin.EventHooks{} | ||
|
||
} | ||
return l | ||
} | ||
|
||
// CreateMarkTable create the db and table if need. | ||
func CreateMarkTable(db *sql.DB) error { | ||
_, err := db.Exec(CreateMarkDBDDL) | ||
func CreateMarkTable(db *sql.DB, mdbname, mtablename string) error { | ||
// CreateMarkDBDDL is DDL to create the database of mark table. | ||
var err error | ||
var CreateMarkDBDDL = fmt.Sprintf("create database IF NOT EXISTS %s;", mdbname) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how about use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had fixed it. |
||
_, err = db.Exec(CreateMarkDBDDL) | ||
if err != nil { | ||
return errors.Annotate(err, "failed to create mark db") | ||
} | ||
|
||
// CreateMarkTableDDL is the DDL to create the mark table. | ||
var CreateMarkTableDDL string = fmt.Sprintf("CREATE TABLE If Not Exists %s.%s (%s bigint not null,%s bigint not null DEFAULT 0, %s bigint DEFAULT 0, %s varchar(64) ,PRIMARY KEY (%s,%s));", mdbname, mtablename, ID, ChannelID, Val, ChannelInfo, ID, ChannelID) | ||
_, err = db.Exec(CreateMarkTableDDL) | ||
if err != nil { | ||
return errors.Annotate(err, "failed to create mark table") | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.