Skip to content

Commit

Permalink
fin roughly
Browse files Browse the repository at this point in the history
  • Loading branch information
obgnail committed Feb 6, 2023
1 parent 7d1e741 commit 2a8427b
Show file tree
Hide file tree
Showing 20 changed files with 306 additions and 505 deletions.
2 changes: 1 addition & 1 deletion compose/broker/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (k *KafkaBroker) ConsumeTx(fn func(info *common.TxInfo) error) error {
}
return nil
}
if err := kafka.Consume(k.addrs, k.binlogTopic, f); err != nil {
if err := kafka.Consume(k.addrs, k.txInfoTopic, f); err != nil {
return errors.Trace(err)
}
return nil
Expand Down
59 changes: 36 additions & 23 deletions compose/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,34 @@ import (
"time"
)

type Action int

const (
EventActionInsert = iota
EventActionInsert Action = iota
EventActionUpdate
EventActionDelete
)

func (a Action) String() string {
switch a {
case EventActionInsert:
return "insert"
case EventActionUpdate:
return "update"
case EventActionDelete:
return "delete"
default:
return "unknown"
}
}

type BinlogEvent struct {
Db string `json:"db"`
EventTable string `json:"event_table"`
EventAction int `json:"event_action"`
GTID string `json:"gtid"`
EventTime uint32 `json:"event_time"`
Event sql.RawBytes `json:"event"`
Db string `json:"db"`
Table string `json:"table"`
Action Action `json:"action"`
GTID string `json:"gtid"`
Time int64 `json:"time"`
Data sql.RawBytes `json:"data"`
}

func (e *BinlogEvent) Marshal() ([]byte, error) {
Expand All @@ -37,7 +52,7 @@ func NewBinlogEvent(event *river.EventData) (*BinlogEvent, error) {
return nil, errors.Trace(err)
}

var action int
var action Action
switch event.EventType {
case river.EventTypeInsert:
action = EventActionInsert
Expand All @@ -48,12 +63,12 @@ func NewBinlogEvent(event *river.EventData) (*BinlogEvent, error) {
}

b := &BinlogEvent{
Db: event.Db,
EventTable: event.Table,
EventAction: action,
GTID: event.GTIDSet,
EventTime: event.Timestamp,
Event: data,
Db: event.Db,
Table: event.Table,
Action: action,
GTID: event.GTIDSet,
Time: int64(event.Timestamp),
Data: data,
}
return b, nil
}
Expand All @@ -77,10 +92,9 @@ var (
)

type TxInfo struct {
Time int64 `db:"time" json:"time"`
Context string `db:"context" json:"context"`
UserUUID string `db:"user_uuid" json:"user_uuid"`
GTID string `db:"gtid" json:"gtid"`
Time int64 `db:"time" json:"time"`
Context string `db:"context" json:"context"`
GTID string `db:"gtid" json:"gtid"`
}

func (t *TxInfo) Marshal() ([]byte, error) {
Expand All @@ -91,12 +105,11 @@ func (t *TxInfo) Marshal() ([]byte, error) {
return b, nil
}

func NewTxInfo(ctx, userUUID, Gtid string) *TxInfo {
func NewTxInfo(ctx, Gtid string) *TxInfo {
return &TxInfo{
Time: time.Now().In(defaultLoc).Unix(),
Context: ctx,
UserUUID: userUUID,
GTID: Gtid,
Time: time.Now().In(defaultLoc).Unix(),
Context: ctx,
GTID: Gtid,
}
}

Expand Down
4 changes: 2 additions & 2 deletions compose/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ func checkErr(err error) {
}
}

func init() {
checkErr(config.InitConfig())
func Init(path string) {
checkErr(config.InitConfig(path))
checkErr(broker.InitBroker())
checkErr(river.InitRiver())
checkErr(mysql.InitDBM())
Expand Down
4 changes: 2 additions & 2 deletions compose/mysql/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
sqlDefaultSep = " ,"
)

func DBMTransact(ctx, userUUID string, txFunc func(*gorp.Transaction) error) (err error) {
func DBMTransact(ctx string, txFunc func(tx *gorp.Transaction) error) (err error) {
tx, err := DBM.Begin()
if err != nil {
return
Expand Down Expand Up @@ -48,7 +48,7 @@ func DBMTransact(ctx, userUUID string, txFunc func(*gorp.Transaction) error) (er
GTIDField := txiField.FieldByName("GTID")
GTIDValue := GTIDField.String()
if checkGTID(GTIDValue) {
t := common.NewTxInfo(ctx, userUUID, GTIDValue)
t := common.NewTxInfo(ctx, GTIDValue)
if err := river.AuditLogRiver.PushTx(t); err != nil {
log.Println("[Error] push tx err:\n", errors.ErrorStack(err))
}
Expand Down
2 changes: 1 addition & 1 deletion compose/mysql/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"database/sql"
"fmt"
"github.com/juju/errors"
_ "github.com/obgnail/audit-log/compose/mysql/go-mysql-driver"
"github.com/obgnail/audit-log/config"
_ "github.com/obgnail/audit-log/go-mysql-driver"
"gopkg.in/gorp.v1"
"time"
)
Expand Down
6 changes: 3 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type MainConfig struct {
Mysql *MySqlConfig `toml:"mysql"`
PositionSaver *PosAutoSaverConfig `toml:"position_saver"`
HealthChecker *HealthCheckerConfig `toml:"health_checker"`
AuditLog *AuditLogHandlerConfig `toml:"river"`
AuditLog *AuditLogHandlerConfig `toml:"audit_log"`
Kafka *KafkaConfig `toml:"kafka"`
ClickHouse *ClickHouseConfig `toml:"clickhouse"`
}
Expand Down Expand Up @@ -62,9 +62,9 @@ var (
ClickHouse *ClickHouseConfig
)

func InitConfig() error {
func InitConfig(path string) error {
var cfg MainConfig
f, err := os.Open("./config/config.toml")
f, err := os.Open(path)
if err != nil {
return errors.Trace(err)
}
Expand Down
8 changes: 8 additions & 0 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ type Context struct {
Param2 string `db:"context_param_2"`
}

func New(Type int, param1, param2 string) Context {
return Context{
Type: Type,
Param1: param1,
Param2: param2,
}
}

// 将 Context 序列化成可读字符串
func (c Context) String() string {
var b strings.Builder
Expand Down
12 changes: 0 additions & 12 deletions context/type.go

This file was deleted.

24 changes: 0 additions & 24 deletions example.go

This file was deleted.

89 changes: 89 additions & 0 deletions example/example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package main

import (
"fmt"
"github.com/juju/errors"
"github.com/obgnail/audit-log/compose"
"github.com/obgnail/audit-log/compose/common"
"github.com/obgnail/audit-log/compose/mysql"
"github.com/obgnail/audit-log/compose/river"
"github.com/obgnail/audit-log/context"
"github.com/obgnail/audit-log/utils/uuid"
r "github.com/obgnail/mysql-river/river"
"gopkg.in/gorp.v1"
"time"
)

func main() {
go runRiver()
time.Sleep(time.Second * 3)

createTable()
insertUser()
dropTable()
forever := make(chan struct{})
<-forever
}

func runRiver() {
compose.Init("../config/config.toml")

go river.AuditLogRiver.ConsumeLog(func(event *common.BinlogEvent) error {
fmt.Printf("在%d时刻,对%s.%s表进行了%s操作,其gtid为%s.操作的值为%s\n",
event.Time, event.Db, event.Table, event.Action, event.GTID, event.Data)
return nil
})
go river.AuditLogRiver.ConsumeTx(func(info *common.TxInfo) error {
fmt.Printf("在%d时刻,提交的GTID为:%s,携带的上下文为:%s\n", info.Time, info.GTID, info.Context)
return nil
})

err := river.Run(r.FromDB)
checkErr(err)
}

func createTable() {
err := mysql.DBMTransact("", func(tx *gorp.Transaction) error {
sql := `
CREATE TABLE IF NOT EXISTS user (
uuid varchar(8) CHARACTER SET latin1 COLLATE latin1_bin NOT NULL DEFAULT '',
name varchar(64) NOT NULL DEFAULT '' COMMENT '姓名',
email varchar(128) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT '' COMMENT '邮箱',
status tinyint(4) NOT NULL DEFAULT '1' COMMENT '1.正常 2.删除',
PRIMARY KEY (uuid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;`
if _, err := tx.Exec(sql); err != nil {
return errors.Trace(err)
}
return nil
})
checkErr(err)
}

func dropTable() {
err := mysql.DBMTransact("", func(tx *gorp.Transaction) error {
sql := `DROP TABLE IF EXISTS user;`
if _, err := tx.Exec(sql); err != nil {
return errors.Trace(err)
}
return nil
})
checkErr(err)
}

func insertUser() {
myType := 1
myContext := context.New(myType, "typeParam1", "typeParam2")
err := mysql.DBMTransact(myContext.String(), func(tx *gorp.Transaction) error {
_uuid := uuid.UUID()
user := &User{_uuid, _uuid + "Name", _uuid + "@gmail.com", 0}
return errors.Trace(AddUser(tx, user))
})
checkErr(err)
}

func checkErr(err error) {
if err != nil {
panic(err)
}
}
Loading

0 comments on commit 2a8427b

Please sign in to comment.