Skip to content

Commit

Permalink
Do not sync ineligible tables when force replicate is not enabled (#929)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan authored Jan 21, 2025
1 parent c367411 commit db04bed
Show file tree
Hide file tree
Showing 14 changed files with 535 additions and 158 deletions.
2 changes: 1 addition & 1 deletion api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
}

// verify changefeed filter
_, err = filter.NewFilter(oldCfInfo.Config.Filter, "", oldCfInfo.Config.CaseSensitive)
_, err = filter.NewFilter(oldCfInfo.Config.Filter, "", oldCfInfo.Config.CaseSensitive, oldCfInfo.Config.ForceReplicate)
if err != nil {
_ = c.Error(errors.ErrChangefeedUpdateRefused.
GenWithStackByArgs(errors.Cause(err).Error()))
Expand Down
22 changes: 10 additions & 12 deletions downstreamadapter/dispatchermanager/event_dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type EventDispatcherManager struct {
closing atomic.Bool
closed atomic.Bool
cancel context.CancelFunc
wg *sync.WaitGroup
wg sync.WaitGroup

metricTableTriggerEventDispatcherCount prometheus.Gauge
metricEventDispatcherCount prometheus.Gauge
Expand All @@ -129,7 +129,6 @@ func NewEventDispatcherManager(
failpoint.Inject("NewEventDispatcherManagerDelay", nil)

ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
pdClock := appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock)
manager := &EventDispatcherManager{
dispatcherMap: newDispatcherMap(),
Expand All @@ -139,10 +138,9 @@ func NewEventDispatcherManager(
statusesChan: make(chan TableSpanStatusWithSeq, 8192),
blockStatusesChan: make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024),
errCh: make(chan error, 1),
wg: wg,
cancel: cancel,
config: cfConfig,
filterConfig: toFilterConfigPB(cfConfig.Filter),
filterConfig: &eventpb.FilterConfig{CaseSensitive: cfConfig.CaseSensitive, ForceReplicate: cfConfig.ForceReplicate, FilterConfig: toFilterConfigPB(cfConfig.Filter)},
schemaIDToDispatchers: dispatcher.NewSchemaIDToDispatchers(),
latestWatermark: NewWatermark(startTs),
metricTableTriggerEventDispatcherCount: metrics.TableTriggerEventDispatcherGauge.WithLabelValues(changefeedID.Namespace(), changefeedID.Name()),
Expand Down Expand Up @@ -185,9 +183,9 @@ func NewEventDispatcherManager(
}
}

wg.Add(1)
manager.wg.Add(1)
go func() {
defer wg.Done()
defer manager.wg.Done()
err = manager.sink.Run(ctx)
if err != nil && !errors.Is(errors.Cause(err), context.Canceled) {
select {
Expand All @@ -201,23 +199,23 @@ func NewEventDispatcherManager(
}()

// collect errors from error channel
wg.Add(1)
manager.wg.Add(1)
go func() {
defer wg.Done()
defer manager.wg.Done()
manager.collectErrors(ctx)
}()

// collect heart beat info from all dispatchers
wg.Add(1)
manager.wg.Add(1)
go func() {
defer wg.Done()
defer manager.wg.Done()
manager.collectComponentStatusWhenChanged(ctx)
}()

// collect block status from all dispatchers
wg.Add(1)
manager.wg.Add(1)
go func() {
defer wg.Done()
defer manager.wg.Done()
manager.collectBlockStatusRequest(ctx)
}()

Expand Down
4 changes: 2 additions & 2 deletions downstreamadapter/dispatchermanager/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ func (d *DispatcherMap) ForEach(fn func(id common.DispatcherID, dispatcher *disp
return seq
}

func toFilterConfigPB(filter *config.FilterConfig) *eventpb.FilterConfig {
filterConfig := &eventpb.FilterConfig{
func toFilterConfigPB(filter *config.FilterConfig) *eventpb.InnerFilterConfig {
filterConfig := &eventpb.InnerFilterConfig{
Rules: filter.Rules,
IgnoreTxnStartTs: filter.IgnoreTxnStartTs,
EventFilters: make([]*eventpb.EventFilterRule, len(filter.EventFilters)),
Expand Down
3 changes: 1 addition & 2 deletions downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ func (c *EventCollector) AddDispatcher(target dispatcher.EventDispatcher, memory
c.dispatcherMap.Store(target.GetId(), stat)
metrics.EventCollectorRegisteredDispatcherCount.Inc()

areaSetting := dynstream.NewAreaSettings()
areaSetting.MaxPendingSize = memoryQuota
areaSetting := dynstream.NewAreaSettingsWithMaxPendingSize(memoryQuota)
err := c.ds.AddPath(target.GetId(), stat, areaSetting)
if err != nil {
log.Info("add dispatcher to dynamic stream failed", zap.Error(err))
Expand Down
434 changes: 349 additions & 85 deletions eventpb/event.pb.go

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion eventpb/event.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ message EventFilterRule {
string ignore_delete_value_expr = 7;
}

message FilterConfig {
// InnerFilterConfig carries the table/event filter rules themselves,
// mirroring config.FilterConfig. It is embedded in FilterConfig so the
// rules can travel together with changefeed-level flags.
message InnerFilterConfig {
// Table filter rules, e.g. "db.*" (matcher syntax).
repeated string rules = 1;
// Start-ts values of transactions that should be ignored entirely.
repeated uint64 ignore_txn_start_ts = 2;
// Fine-grained per-event filter rules.
repeated EventFilterRule EventFilters = 3;
}

// FilterConfig bundles the filter rules with the changefeed-level flags
// needed to build an identical filter on the receiving side.
// NOTE: field names follow the protobuf style guide (lower_snake_case);
// the generated Go identifiers and proto3 JSON names are unchanged
// relative to the camelCase spellings, and the wire format only depends
// on the field numbers.
message FilterConfig {
// Whether table/database name matching is case sensitive.
bool case_sensitive = 1;
// Whether tables without a valid unique key are force-replicated.
bool force_replicate = 2;
// The actual filter rules.
InnerFilterConfig filter_config = 3;
}


message ResolvedTs {

Expand Down
68 changes: 66 additions & 2 deletions logservice/schemastore/disk_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,66 @@ func loadTablesInKVSnap(
return tablesInKVSnap, partitionsInKVSnap, nil
}

// loadFullTablesInKVSnap scans every table-info entry persisted at gcTs in
// the pebble snapshot and rebuilds three views keyed by table ID:
//   - the full model.TableInfo,
//   - the basic info (schema ID + table name),
//   - the physical partition IDs for partitioned tables.
//
// It also marks each table as present in the corresponding entry of
// databaseMap. Decoding or iterator failures are treated as unrecoverable
// (log.Fatal/log.Panic), matching the other snapshot loaders in this file,
// so the returned error is always nil in practice.
func loadFullTablesInKVSnap(
	snap *pebble.Snapshot,
	gcTs uint64,
	databaseMap map[int64]*BasicDatabaseInfo,
) (map[int64]*model.TableInfo, map[int64]*BasicTableInfo, map[int64]BasicPartitionInfo, error) {
	tableInfosInKVSnap := make(map[int64]*model.TableInfo)
	tablesInKVSnap := make(map[int64]*BasicTableInfo)
	partitionsInKVSnap := make(map[int64]BasicPartitionInfo)

	// All table-info keys written at gcTs lie in
	// [tableInfoKey(gcTs, 0), tableInfoKey(gcTs, math.MaxInt64)).
	startKey, err := tableInfoKey(gcTs, 0)
	if err != nil {
		log.Fatal("generate lower bound failed", zap.Error(err))
	}
	endKey, err := tableInfoKey(gcTs, math.MaxInt64)
	if err != nil {
		log.Fatal("generate upper bound failed", zap.Error(err))
	}
	snapIter, err := snap.NewIter(&pebble.IterOptions{
		LowerBound: startKey,
		UpperBound: endKey,
	})
	if err != nil {
		log.Fatal("new iterator failed", zap.Error(err))
	}
	defer snapIter.Close()
	for snapIter.First(); snapIter.Valid(); snapIter.Next() {
		var entry PersistedTableInfoEntry
		if _, err := entry.UnmarshalMsg(snapIter.Value()); err != nil {
			log.Fatal("unmarshal table info entry failed", zap.Error(err))
		}

		tableInfo := model.TableInfo{}
		if err := json.Unmarshal(entry.TableInfoValue, &tableInfo); err != nil {
			log.Fatal("unmarshal table info failed", zap.Error(err))
		}
		// A table must belong to a database already loaded into databaseMap;
		// anything else indicates a corrupted snapshot.
		databaseInfo, ok := databaseMap[entry.SchemaID]
		if !ok {
			log.Panic("database not found",
				zap.Int64("schemaID", entry.SchemaID),
				zap.String("schemaName", entry.SchemaName),
				zap.String("tableName", tableInfo.Name.O))
		}
		// TODO: add a unit test for this case
		tableInfosInKVSnap[tableInfo.ID] = &tableInfo
		databaseInfo.Tables[tableInfo.ID] = true
		tablesInKVSnap[tableInfo.ID] = &BasicTableInfo{
			SchemaID: entry.SchemaID,
			Name:     tableInfo.Name.O,
		}
		if tableInfo.Partition != nil {
			// Record every physical partition ID so later lookups can map a
			// partition back to its logical table.
			partitionInfo := make(BasicPartitionInfo)
			for _, partition := range tableInfo.Partition.Definitions {
				partitionInfo[partition.ID] = nil
			}
			partitionsInKVSnap[tableInfo.ID] = partitionInfo
		}
	}
	return tableInfosInKVSnap, tablesInKVSnap, partitionsInKVSnap, nil
}

// load the ddl jobs in the range (gcTs, upperBound] and apply the ddl job to update database and table info
func loadAndApplyDDLHistory(
snap *pebble.Snapshot,
Expand Down Expand Up @@ -581,7 +641,7 @@ func loadAllPhysicalTablesAtTs(
return nil, err
}

tableMap, partitionMap, err := loadTablesInKVSnap(storageSnap, gcTs, databaseMap)
tableInfoMap, tableMap, partitionMap, err := loadFullTablesInKVSnap(storageSnap, gcTs, databaseMap)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -632,7 +692,11 @@ func loadAllPhysicalTablesAtTs(
zap.Any("databaseMapLen", len(databaseMap)))
}
schemaName := databaseMap[tableInfo.SchemaID].Name
if tableFilter != nil && tableFilter.ShouldIgnoreTable(schemaName, tableInfo.Name) {
fullTableInfo, ok := tableInfoMap[tableID]
if !ok {
log.Error("table info not found", zap.Any("tableId", tableID))
}
if tableFilter != nil && tableFilter.ShouldIgnoreTable(schemaName, tableInfo.Name, fullTableInfo) {
continue
}
if partitionInfo, ok := partitionMap[tableID]; ok {
Expand Down
20 changes: 10 additions & 10 deletions logservice/schemastore/persist_storage_ddl_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1385,10 +1385,10 @@ func buildDDLEventCommon(rawEvent *PersistedDDLEvent, tableFilter filter.Filter,
filtered := false
// TODO: ShouldDiscardDDL is used for old architecture, should be removed later
if tableFilter != nil && rawEvent.CurrentSchemaName != "" && rawEvent.CurrentTableName != "" {
filtered = tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.CurrentSchemaName, rawEvent.CurrentTableName)
filtered = tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.CurrentSchemaName, rawEvent.CurrentTableName, rawEvent.TableInfo)
// if the ddl involves another table name, only set filtered to true when all of them should be filtered
if rawEvent.PrevSchemaName != "" && rawEvent.PrevTableName != "" {
filtered = filtered && tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.PrevSchemaName, rawEvent.PrevTableName)
filtered = filtered && tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.PrevSchemaName, rawEvent.PrevTableName, rawEvent.TableInfo)
}
}
if rawEvent.TableInfo != nil {
Expand Down Expand Up @@ -1609,8 +1609,8 @@ func buildDDLEventForRenameTable(rawEvent *PersistedDDLEvent, tableFilter filter
}
ddlEvent.PrevSchemaName = rawEvent.PrevSchemaName
ddlEvent.PrevTableName = rawEvent.PrevTableName
ignorePrevTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaName, rawEvent.PrevTableName)
ignoreCurrentTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, rawEvent.CurrentTableName)
ignorePrevTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaName, rawEvent.PrevTableName, rawEvent.TableInfo)
ignoreCurrentTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, rawEvent.CurrentTableName, rawEvent.TableInfo)
if isPartitionTable(rawEvent.TableInfo) {
allPhysicalIDs := getAllPartitionIDs(rawEvent.TableInfo)
if !ignorePrevTable {
Expand Down Expand Up @@ -1876,8 +1876,8 @@ func buildDDLEventForExchangeTablePartition(rawEvent *PersistedDDLEvent, tableFi
if !ok {
return ddlEvent, false
}
ignoreNormalTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaName, rawEvent.PrevTableName)
ignorePartitionTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, rawEvent.CurrentTableName)
ignoreNormalTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaName, rawEvent.PrevTableName, rawEvent.TableInfo)
ignorePartitionTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, rawEvent.CurrentTableName, rawEvent.TableInfo)
physicalIDs := getAllPartitionIDs(rawEvent.TableInfo)
droppedIDs := getDroppedIDs(rawEvent.PrevPartitions, physicalIDs)
if len(droppedIDs) != 1 {
Expand Down Expand Up @@ -1954,8 +1954,8 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte
allFiltered := true
resultQuerys := make([]string, 0)
for i, tableInfo := range rawEvent.MultipleTableInfos {
ignorePrevTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaNames[i], rawEvent.PrevTableNames[i])
ignoreCurrentTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaNames[i], tableInfo.Name.O)
ignorePrevTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaNames[i], rawEvent.PrevTableNames[i], tableInfo)
ignoreCurrentTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaNames[i], tableInfo.Name.O, tableInfo)
if ignorePrevTable && ignoreCurrentTable {
continue
}
Expand Down Expand Up @@ -2064,7 +2064,7 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte
logicalTableCount := 0
allFiltered := true
for _, info := range rawEvent.MultipleTableInfos {
if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, info.Name.O) {
if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, info.Name.O, info) {
continue
}
allFiltered = false
Expand Down Expand Up @@ -2092,7 +2092,7 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte
addName := make([]commonEvent.SchemaTableName, 0, logicalTableCount)
resultQuerys := make([]string, 0, logicalTableCount)
for i, info := range rawEvent.MultipleTableInfos {
if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, info.Name.O) {
if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, info.Name.O, info) {
log.Info("build ddl event for create tables filter table",
zap.String("schemaName", rawEvent.CurrentSchemaName),
zap.String("tableName", info.Name.O))
Expand Down
2 changes: 1 addition & 1 deletion logservice/schemastore/persist_storage_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func buildTableFilterByNameForTest(schemaName, tableName string) filter.Filter {
filterConfig := &config.FilterConfig{
Rules: []string{filterRule},
}
tableFilter, err := filter.NewFilter(filterConfig, "", false)
tableFilter, err := filter.NewFilter(filterConfig, "", false, false)
if err != nil {
log.Panic("build filter failed", zap.Error(err))
}
Expand Down
2 changes: 1 addition & 1 deletion maintainer/maintainer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func (c *Controller) addNewSpans(schemaID int64, tableSpans []*heartbeatpb.Table

func (c *Controller) loadTables(startTs uint64) ([]commonEvent.Table, error) {
// todo: do we need to set timezone here?
f, err := filter.NewFilter(c.cfConfig.Filter, "", c.cfConfig.ForceReplicate)
f, err := filter.NewFilter(c.cfConfig.Filter, "", c.cfConfig.CaseSensitive, c.cfConfig.ForceReplicate)
if err != nil {
return nil, errors.Cause(err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/config/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ type ChangefeedConfig struct {
TargetTS uint64 `json:"target_ts"`
SinkURI string `json:"sink_uri"`
// timezone used when checking sink uri
TimeZone string `json:"timezone" default:"system"`
TimeZone string `json:"timezone" default:"system"`
CaseSensitive bool `json:"case_sensitive" default:"false"`
// if true, force to replicate some ineligible tables
ForceReplicate bool `json:"force_replicate" default:"false"`
Filter *FilterConfig `toml:"filter" json:"filter"`
Expand Down Expand Up @@ -85,6 +86,7 @@ func (info *ChangeFeedInfo) ToChangefeedConfig() *ChangefeedConfig {
StartTS: info.StartTs,
TargetTS: info.TargetTs,
SinkURI: info.SinkURI,
CaseSensitive: info.Config.CaseSensitive,
ForceReplicate: info.Config.ForceReplicate,
SinkConfig: info.Config.Sink,
Filter: info.Config.Filter,
Expand Down
2 changes: 1 addition & 1 deletion pkg/eventservice/event_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ type mockDispatcherInfo struct {

func newMockDispatcherInfo(t *testing.T, dispatcherID common.DispatcherID, tableID int64, actionType eventpb.ActionType) *mockDispatcherInfo {
cfg := config.NewDefaultFilterConfig()
filter, err := filter.NewFilter(cfg, "", false)
filter, err := filter.NewFilter(cfg, "", false, false)
require.NoError(t, err)
return &mockDispatcherInfo{
clusterID: 1,
Expand Down
Loading

0 comments on commit db04bed

Please sign in to comment.