Skip to content

Commit

Permalink
rm compose dir
Browse files Browse the repository at this point in the history
  • Loading branch information
obgnail committed Feb 9, 2023
1 parent a192d52 commit e723184
Show file tree
Hide file tree
Showing 64 changed files with 86 additions and 95 deletions.
43 changes: 43 additions & 0 deletions audit_log/audit_log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package audit_log

import (
"github.com/obgnail/audit-log/clickhouse"
"github.com/obgnail/audit-log/config"
"github.com/obgnail/audit-log/logger"
"github.com/obgnail/audit-log/mysql"
"github.com/obgnail/audit-log/syncer"
)

type AuditLogger struct {
binlogSyncer *syncer.BinlogSynchronizer
txInfoSyncer *syncer.TxInfoSynchronizer
}

func New(binlogSyncer *syncer.BinlogSynchronizer, txInfoSyncer *syncer.TxInfoSynchronizer) *AuditLogger {
return &AuditLogger{binlogSyncer: binlogSyncer, txInfoSyncer: txInfoSyncer}
}

func (a *AuditLogger) Sync(handler Handler) {
go a.txInfoSyncer.HandleAuditLog(handler.OnAuditLog)
a.binlogSyncer.Sync()
a.txInfoSyncer.Sync()
}

func Init(path string) {
checkErr(config.InitConfig(path))
checkErr(logger.InitLogger())
checkErr(syncer.InitBinlogSyncer())
checkErr(syncer.InitTxInfoSyncer())
checkErr(mysql.InitDBM())
checkErr(clickhouse.InitClickHouse())
}

func checkErr(err error) {
if err != nil {
panic(err)
}
}

func Run(handler Handler) {
New(syncer.BinlogSyncer, syncer.TxInfoSyncer).Sync(handler)
}
2 changes: 1 addition & 1 deletion compose/audit_log/handler.go → audit_log/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package audit_log

import (
"fmt"
"github.com/obgnail/audit-log/compose/types"
"github.com/obgnail/audit-log/types"
)

type Handler interface {
Expand Down
Empty file removed auditlog.log
Empty file.
21 changes: 11 additions & 10 deletions compose/broker/binlog_broker.go → broker/binlog_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"encoding/json"
"github.com/Shopify/sarama"
"github.com/juju/errors"
"github.com/obgnail/audit-log/compose/logger"
"github.com/obgnail/audit-log/compose/types"
"github.com/obgnail/audit-log/logger"
"github.com/obgnail/audit-log/types"
"github.com/obgnail/mysql-river/handler/kafka"
"github.com/obgnail/mysql-river/river"
"strings"
Expand All @@ -17,8 +17,8 @@ type BinlogBrokerConfig struct {
}

type BinlogKafkaBroker struct {
include map[string]map[string]struct{} // map[db]map[table]struct{}
*kafka.Broker
include map[string]map[string]struct{} // map[db]map[table]struct{}
defaultBroker *kafka.Broker
}

func New(cfg *BinlogBrokerConfig) (*BinlogKafkaBroker, error) {
Expand All @@ -33,17 +33,18 @@ func New(cfg *BinlogBrokerConfig) (*BinlogKafkaBroker, error) {
mapDb2Table[db][table] = struct{}{}
}
h.include = mapDb2Table

var err error
h.Broker, err = kafka.New(cfg.KafkaConfig)
h.defaultBroker, err = kafka.New(cfg.KafkaConfig)
if err != nil {
return nil, errors.Trace(err)
}
h.Broker.SetEventMarshaller(h.marshaller)
h.defaultBroker.SetHandler(h)
return h, nil
}

func (b *BinlogKafkaBroker) String() string {
return "binlog kafka broker"
return "kafka broker"
}

func (b *BinlogKafkaBroker) check(db, table string) bool {
Expand All @@ -56,7 +57,7 @@ func (b *BinlogKafkaBroker) check(db, table string) bool {
return true
}

func (b *BinlogKafkaBroker) marshaller(event *river.EventData) ([]byte, error) {
func (b *BinlogKafkaBroker) Marshal(event *river.EventData) ([]byte, error) {
switch event.EventType {
case river.EventTypeInsert, river.EventTypeUpdate, river.EventTypeDelete:
if b.check(event.Db, event.Table) {
Expand Down Expand Up @@ -88,7 +89,7 @@ func (b *BinlogKafkaBroker) OnClose(r *river.River) {

// Pipe 将river中的数据流向kafka
func (b *BinlogKafkaBroker) Pipe(river *river.River, from river.From) error {
if err := b.Broker.Pipe(river, from); err != nil {
if err := b.defaultBroker.Pipe(river, from); err != nil {
return errors.Trace(err)
}
return nil
Expand All @@ -107,7 +108,7 @@ func (b *BinlogKafkaBroker) Consume(fn func(*types.BinlogEvent) error) error {
return nil
}

if err := b.Broker.Consume(consumer); err != nil {
if err := b.defaultBroker.Consume(consumer); err != nil {
return errors.Trace(err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion compose/broker/tx_broker.go → broker/tx_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"encoding/json"
"github.com/Shopify/sarama"
"github.com/juju/errors"
"github.com/obgnail/audit-log/compose/types"
"github.com/obgnail/audit-log/types"
"github.com/obgnail/mysql-river/handler/kafka"
)

Expand Down
File renamed without changes.
24 changes: 0 additions & 24 deletions compose/audit_log/audit_log.go

This file was deleted.

24 changes: 0 additions & 24 deletions compose/init.go

This file was deleted.

13 changes: 6 additions & 7 deletions example/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,20 @@ package main
import (
"fmt"
"github.com/juju/errors"
"github.com/obgnail/audit-log/compose"
"github.com/obgnail/audit-log/compose/audit_log"
"github.com/obgnail/audit-log/compose/mysql"
"github.com/obgnail/audit-log/compose/types"
"github.com/obgnail/audit-log/audit_log"
"github.com/obgnail/audit-log/context"
"github.com/obgnail/audit-log/utils/uuid"
"github.com/obgnail/audit-log/mysql"
"github.com/obgnail/audit-log/mysql/utils/uuid"
"github.com/obgnail/audit-log/types"
"gopkg.in/gorp.v1"
"time"
)

func main() {
compose.Init("../config/config.toml")
audit_log.Init("../config/config.toml")

audit_log.Run(audit_log.FunctionHandler(func(auditLog types.AuditLog) error {
fmt.Printf("get audit log: %+v", auditLog)
fmt.Printf("get audit log: %+v\n", auditLog)
return nil
}))

Expand Down
2 changes: 1 addition & 1 deletion example/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package main
import (
"fmt"
"github.com/juju/errors"
"github.com/obgnail/audit-log/compose/mysql"
"github.com/obgnail/audit-log/mysql"
"gopkg.in/gorp.v1"
)

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ module github.com/obgnail/audit-log

go 1.18

replace github.com/obgnail/audit-log/go-mysql-driver => ./compose/mysql/go-mysql-driver
replace github.com/obgnail/audit-log/go-mysql-driver => ./mysql/go-mysql-driver

require (
github.com/BurntSushi/toml v1.2.1
github.com/ClickHouse/clickhouse-go/v2 v2.0.12
github.com/Shopify/sarama v1.37.0
github.com/juju/errors v0.0.0-20220203013757-bd733f3c86b9
github.com/obgnail/audit-log/go-mysql-driver v0.0.0-00010101000000-000000000000
github.com/obgnail/mysql-river v0.0.0-20230208182038-dc88b534228c
github.com/obgnail/mysql-river v0.0.0-20230209124253-5cfe7a909806
github.com/satori/go.uuid v1.2.0
gopkg.in/gorp.v1 v1.7.2
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/obgnail/mysql-river v0.0.0-20230208182038-dc88b534228c h1:A4ZTopwdq3MuNb7TcQYmxaQ2LmjuaM6ayyY6Dc5ut08=
github.com/obgnail/mysql-river v0.0.0-20230208182038-dc88b534228c/go.mod h1:Q0XBSA7lV4AtTr9bdixpenFXFhqSi2ZRSt5dpJ9wCQA=
github.com/obgnail/mysql-river v0.0.0-20230209124253-5cfe7a909806 h1:sbrSaoH2Ek85KvIW0TKlaMHaLYIhe/nO12Q3PaC3ZLk=
github.com/obgnail/mysql-river v0.0.0-20230209124253-5cfe7a909806/go.mod h1:Q0XBSA7lV4AtTr9bdixpenFXFhqSi2ZRSt5dpJ9wCQA=
github.com/paulmach/orb v0.4.0 h1:ilp1MQjRapLJ1+qcays1nZpe0mvkCY+b8JU/qBKRZ1A=
github.com/paulmach/orb v0.4.0/go.mod h1:FkcWtplUAIVqAuhAOV2d3rpbnQyliDOjOcLW9dUrfdU=
github.com/paulmach/protoscan v0.2.1-0.20210522164731-4e53c6875432/go.mod h1:2sV+uZ/oQh66m4XJVZm5iqUZ62BN88Ex1E+TTS0nLzI=
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
6 changes: 3 additions & 3 deletions compose/mysql/db.go → mysql/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"bytes"
"fmt"
"github.com/juju/errors"
"github.com/obgnail/audit-log/compose/logger"
"github.com/obgnail/audit-log/compose/syncer"
"github.com/obgnail/audit-log/compose/types"
"github.com/obgnail/audit-log/logger"
"github.com/obgnail/audit-log/syncer"
"github.com/obgnail/audit-log/types"
uuid "github.com/satori/go.uuid"
"gopkg.in/gorp.v1"
"reflect"
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
20 changes: 10 additions & 10 deletions compose/syncer/binlog_syncer.go → syncer/binlog_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package syncer

import (
"github.com/juju/errors"
"github.com/obgnail/audit-log/compose/broker"
"github.com/obgnail/audit-log/compose/logger"
"github.com/obgnail/audit-log/compose/types"
"github.com/obgnail/audit-log/broker"
"github.com/obgnail/audit-log/config"
"github.com/obgnail/audit-log/logger"
"github.com/obgnail/audit-log/types"
"github.com/obgnail/mysql-river/handler/kafka"
"github.com/obgnail/mysql-river/river"
"time"
Expand All @@ -19,17 +19,17 @@ const (

// BinlogSynchronizer 将river中的数据通过broker流向clickhouse
type BinlogSynchronizer struct {
*river.River
*broker.BinlogKafkaBroker
river *river.River
broker *broker.BinlogKafkaBroker

syncChan chan *types.BinlogEvent
}

func NewBinlogSyncer(river *river.River, broker *broker.BinlogKafkaBroker) *BinlogSynchronizer {
s := &BinlogSynchronizer{
River: river,
BinlogKafkaBroker: broker,
syncChan: make(chan *types.BinlogEvent, defaultSyncChanSize),
river: river,
broker: broker,
syncChan: make(chan *types.BinlogEvent, defaultSyncChanSize),
}
return s
}
Expand Down Expand Up @@ -69,7 +69,7 @@ func (s *BinlogSynchronizer) Sync() {
go s.batchSend2Clickhouse()

go func() {
err := s.BinlogKafkaBroker.Consume(func(event *types.BinlogEvent) error {
err := s.broker.Consume(func(event *types.BinlogEvent) error {
s.syncChan <- event
return nil
})
Expand All @@ -78,7 +78,7 @@ func (s *BinlogSynchronizer) Sync() {
}
}()
go func() {
err := s.BinlogKafkaBroker.Pipe(s.River, river.FromFile)
err := s.broker.Pipe(s.river, river.FromFile)
if err != nil {
logger.ErrorDetails(errors.Trace(err))
}
Expand Down
12 changes: 4 additions & 8 deletions compose/syncer/tx_syncer.go → syncer/tx_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package syncer

import (
"github.com/juju/errors"
"github.com/obgnail/audit-log/compose/broker"
"github.com/obgnail/audit-log/compose/logger"
"github.com/obgnail/audit-log/compose/types"
"github.com/obgnail/audit-log/broker"
"github.com/obgnail/audit-log/config"
"github.com/obgnail/audit-log/logger"
"github.com/obgnail/audit-log/types"
"time"
)

Expand All @@ -20,15 +20,11 @@ const (

type TxInfoSynchronizer struct {
*broker.TxKafkaBroker

auditChan chan types.AuditLog
}

func NewTxInfoSyncer(broker *broker.TxKafkaBroker) *TxInfoSynchronizer {
return &TxInfoSynchronizer{
TxKafkaBroker: broker,
auditChan: make(chan types.AuditLog, defaultAuditChanSize),
}
return &TxInfoSynchronizer{TxKafkaBroker: broker, auditChan: make(chan types.AuditLog, defaultAuditChanSize)}
}

func (s *TxInfoSynchronizer) HandleAuditLog(fn func(txEvent types.AuditLog) error) {
Expand Down
2 changes: 1 addition & 1 deletion compose/types/binlog_event.go → types/binlog_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package types
import (
"context"
"github.com/juju/errors"
"github.com/obgnail/audit-log/compose/clickhouse"
"github.com/obgnail/audit-log/clickhouse"
"time"
)

Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion compose/types/tx_info.go → types/tx_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package types
import (
"context"
"github.com/juju/errors"
"github.com/obgnail/audit-log/compose/clickhouse"
"github.com/obgnail/audit-log/clickhouse"
"time"
)

Expand Down
File renamed without changes.

0 comments on commit e723184

Please sign in to comment.