diff --git a/audit_log/audit_log.go b/audit_log/audit_log.go index 1da92d0..9f6aeb9 100644 --- a/audit_log/audit_log.go +++ b/audit_log/audit_log.go @@ -17,10 +17,10 @@ func New(binlogSyncer *syncer.BinlogSynchronizer, txInfoSyncer *syncer.TxInfoSyn return &AuditLogger{binlogSyncer: binlogSyncer, txInfoSyncer: txInfoSyncer} } -func (a *AuditLogger) Sync(handler Handler) { - go a.txInfoSyncer.HandleAuditLog(handler.OnAuditLog) - a.binlogSyncer.Sync() - a.txInfoSyncer.Sync() +func (log *AuditLogger) Sync(handler Handler) { + go log.txInfoSyncer.HandleAuditLog(handler.OnAuditLog) + log.binlogSyncer.Sync() + log.txInfoSyncer.Sync() } func Init(path string) { diff --git a/audit_log/handler.go b/audit_log/handler.go index a8a40a5..8dd65f3 100644 --- a/audit_log/handler.go +++ b/audit_log/handler.go @@ -6,18 +6,18 @@ import ( ) type Handler interface { - OnAuditLog(auditLog types.AuditLog) error + OnAuditLog(auditLog *types.AuditLog) error } -type FunctionHandler func(auditLog types.AuditLog) error +type FunctionHandler func(auditLog *types.AuditLog) error -func (f FunctionHandler) OnAuditLog(auditLog types.AuditLog) error { +func (f FunctionHandler) OnAuditLog(auditLog *types.AuditLog) error { return f(auditLog) } -type DummyAuditLogHandler func(auditLog types.AuditLog) error +type DummyAuditLogHandler func(auditLog *types.AuditLog) error -func (f DummyAuditLogHandler) OnAuditLog(auditLog types.AuditLog) error { +func (f DummyAuditLogHandler) OnAuditLog(auditLog *types.AuditLog) error { fmt.Printf("get audit log: %+v", auditLog) return nil } diff --git a/example/example.go b/example/example.go index a11d6ca..c3e5f5b 100644 --- a/example/example.go +++ b/example/example.go @@ -15,8 +15,8 @@ import ( func main() { audit_log.Init("../config/config.toml") - audit_log.Run(audit_log.FunctionHandler(func(auditLog types.AuditLog) error { - fmt.Printf("get audit log: %+v\n", auditLog) + audit_log.Run(audit_log.FunctionHandler(func(auditLog *types.AuditLog) error { + fmt.Printf("get audit log: %+v\n", *auditLog) return nil })) diff --git a/syncer/tx_syncer.go b/syncer/tx_syncer.go index c972302..c0361ae 100644 --- a/syncer/tx_syncer.go +++ b/syncer/tx_syncer.go @@ -20,14 +20,14 @@ const ( type TxInfoSynchronizer struct { *broker.TxKafkaBroker - auditChan chan types.AuditLog + auditChan chan *types.AuditLog } func NewTxInfoSyncer(broker *broker.TxKafkaBroker) *TxInfoSynchronizer { - return &TxInfoSynchronizer{TxKafkaBroker: broker, auditChan: make(chan types.AuditLog, defaultAuditChanSize)} + return &TxInfoSynchronizer{TxKafkaBroker: broker, auditChan: make(chan *types.AuditLog, defaultAuditChanSize)} } -func (s *TxInfoSynchronizer) HandleAuditLog(fn func(txEvent types.AuditLog) error) { +func (s *TxInfoSynchronizer) HandleAuditLog(fn func(txEvent *types.AuditLog) error) { for audit := range s.auditChan { if err := fn(audit); err != nil { logger.ErrorDetails(errors.Trace(err)) @@ -187,7 +187,7 @@ func (i *unprocessedInfos) Add(info types.ChTxInfo) { } func (i *unprocessedInfos) getToProcess() ( - toProcessInfoEvents []types.AuditLog, + toProcessInfoEvents []*types.AuditLog, toProcessInfo []types.ChTxInfo, err error, ) { diff --git a/types/binlog_event.go b/types/binlog_event.go index a1634bb..846326d 100644 --- a/types/binlog_event.go +++ b/types/binlog_event.go @@ -14,8 +14,8 @@ type AuditLog struct { BinlogEvents []ChBinlogEvent } -func NewAuditLog(txInfo ChTxInfo, events []ChBinlogEvent) AuditLog { - txBinlogEvent := AuditLog{ +func NewAuditLog(txInfo ChTxInfo, events []ChBinlogEvent) *AuditLog { + txBinlogEvent := &AuditLog{ Time: txInfo.Time, Context: txInfo.Context, GTID: txInfo.GTID,