From db04bed46a3a186c5b458f1f4b9b62e7713593a8 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Tue, 21 Jan 2025 16:33:08 +0800 Subject: [PATCH] Not sync ineligable table when not enable force replicate (#929) --- api/v2/changefeed.go | 2 +- .../event_dispatcher_manager.go | 22 +- downstreamadapter/dispatchermanager/helper.go | 4 +- .../eventcollector/event_collector.go | 3 +- eventpb/event.pb.go | 434 ++++++++++++++---- eventpb/event.proto | 8 +- logservice/schemastore/disk_format.go | 68 ++- .../persist_storage_ddl_handlers.go | 20 +- .../schemastore/persist_storage_test_utils.go | 2 +- maintainer/maintainer_controller.go | 2 +- pkg/config/changefeed.go | 4 +- pkg/eventservice/event_service_test.go | 2 +- pkg/filter/filter.go | 115 +++-- utils/dynstream/interfaces.go | 7 + 14 files changed, 535 insertions(+), 158 deletions(-) diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index 90997a82b..ffb42ed04 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -603,7 +603,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) { } // verify changefeed filter - _, err = filter.NewFilter(oldCfInfo.Config.Filter, "", oldCfInfo.Config.CaseSensitive) + _, err = filter.NewFilter(oldCfInfo.Config.Filter, "", oldCfInfo.Config.CaseSensitive, oldCfInfo.Config.ForceReplicate) if err != nil { _ = c.Error(errors.ErrChangefeedUpdateRefused. GenWithStackByArgs(errors.Cause(err).Error())) diff --git a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go index f8175d830..757f1fbb4 100644 --- a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go @@ -105,7 +105,7 @@ type EventDispatcherManager struct { closing atomic.Bool closed atomic.Bool cancel context.CancelFunc - wg *sync.WaitGroup + wg sync.WaitGroup metricTableTriggerEventDispatcherCount prometheus.Gauge metricEventDispatcherCount prometheus.Gauge @@ -129,7 +129,6 @@ func NewEventDispatcherManager( failpoint.Inject("NewEventDispatcherManagerDelay", nil) ctx, cancel := context.WithCancel(context.Background()) - wg := &sync.WaitGroup{} pdClock := appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock) manager := &EventDispatcherManager{ dispatcherMap: newDispatcherMap(), @@ -139,10 +138,9 @@ func NewEventDispatcherManager( statusesChan: make(chan TableSpanStatusWithSeq, 8192), blockStatusesChan: make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024), errCh: make(chan error, 1), - wg: wg, cancel: cancel, config: cfConfig, - filterConfig: toFilterConfigPB(cfConfig.Filter), + filterConfig: &eventpb.FilterConfig{CaseSensitive: cfConfig.CaseSensitive, ForceReplicate: cfConfig.ForceReplicate, FilterConfig: toFilterConfigPB(cfConfig.Filter)}, schemaIDToDispatchers: dispatcher.NewSchemaIDToDispatchers(), latestWatermark: NewWatermark(startTs), metricTableTriggerEventDispatcherCount: metrics.TableTriggerEventDispatcherGauge.WithLabelValues(changefeedID.Namespace(), changefeedID.Name()), @@ -185,9 +183,9 @@ func NewEventDispatcherManager( } } - wg.Add(1) + manager.wg.Add(1) go func() { - defer wg.Done() + defer manager.wg.Done() err = manager.sink.Run(ctx) if err != nil && !errors.Is(errors.Cause(err), context.Canceled) { select { @@ -201,23 +199,23 @@ func NewEventDispatcherManager( }() // collect errors from error channel - wg.Add(1) + manager.wg.Add(1) go func() { - defer wg.Done() + defer manager.wg.Done() manager.collectErrors(ctx) }() // collect heart beat info from all dispatchers - wg.Add(1) + manager.wg.Add(1) go func() { - defer wg.Done() + defer manager.wg.Done() manager.collectComponentStatusWhenChanged(ctx) }() // collect block status from all dispatchers - wg.Add(1) + manager.wg.Add(1) go func() { - defer wg.Done() + defer manager.wg.Done() manager.collectBlockStatusRequest(ctx) }() diff --git a/downstreamadapter/dispatchermanager/helper.go b/downstreamadapter/dispatchermanager/helper.go index f16690e85..30d964acf 100644 --- a/downstreamadapter/dispatchermanager/helper.go +++ b/downstreamadapter/dispatchermanager/helper.go @@ -104,8 +104,8 @@ func (d *DispatcherMap) ForEach(fn func(id common.DispatcherID, dispatcher *disp return seq } -func toFilterConfigPB(filter *config.FilterConfig) *eventpb.FilterConfig { - filterConfig := &eventpb.FilterConfig{ +func toFilterConfigPB(filter *config.FilterConfig) *eventpb.InnerFilterConfig { + filterConfig := &eventpb.InnerFilterConfig{ Rules: filter.Rules, IgnoreTxnStartTs: filter.IgnoreTxnStartTs, EventFilters: make([]*eventpb.EventFilterRule, len(filter.EventFilters)), diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index adafa75e0..1b45b87d8 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -182,8 +182,7 @@ func (c *EventCollector) AddDispatcher(target dispatcher.EventDispatcher, memory c.dispatcherMap.Store(target.GetId(), stat) metrics.EventCollectorRegisteredDispatcherCount.Inc() - areaSetting := dynstream.NewAreaSettings() - areaSetting.MaxPendingSize = memoryQuota + areaSetting := dynstream.NewAreaSettingsWithMaxPendingSize(memoryQuota) err := c.ds.AddPath(target.GetId(), stat, areaSetting) if err != nil { log.Info("add dispatcher to dynamic stream failed", zap.Error(err)) diff --git a/eventpb/event.pb.go b/eventpb/event.pb.go index 07e042c38..507c4f6f2 100644 --- a/eventpb/event.pb.go +++ b/eventpb/event.pb.go @@ -184,17 +184,77 @@ func (m *EventFilterRule) GetIgnoreDeleteValueExpr() string { return "" } -type FilterConfig struct { +type InnerFilterConfig struct { Rules []string `protobuf:"bytes,1,rep,name=rules,proto3" json:"rules,omitempty"` IgnoreTxnStartTs []uint64 `protobuf:"varint,2,rep,packed,name=ignore_txn_start_ts,json=ignoreTxnStartTs,proto3" json:"ignore_txn_start_ts,omitempty"` EventFilters []*EventFilterRule `protobuf:"bytes,3,rep,name=EventFilters,proto3" json:"EventFilters,omitempty"` } +func (m *InnerFilterConfig) Reset() { *m = InnerFilterConfig{} } +func (m *InnerFilterConfig) String() string { return proto.CompactTextString(m) } +func (*InnerFilterConfig) ProtoMessage() {} +func (*InnerFilterConfig) Descriptor() ([]byte, []int) { + return fileDescriptor_d7fb2554dfcf7f7d, []int{1} +} +func (m *InnerFilterConfig) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *InnerFilterConfig) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_InnerFilterConfig.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *InnerFilterConfig) XXX_Merge(src proto.Message) { + xxx_messageInfo_InnerFilterConfig.Merge(m, src) +} +func (m *InnerFilterConfig) XXX_Size() int { + return m.Size() +} +func (m *InnerFilterConfig) XXX_DiscardUnknown() { + xxx_messageInfo_InnerFilterConfig.DiscardUnknown(m) +} + +var xxx_messageInfo_InnerFilterConfig proto.InternalMessageInfo + +func (m *InnerFilterConfig) GetRules() []string { + if m != nil { + return m.Rules + } + return nil +} + +func (m *InnerFilterConfig) GetIgnoreTxnStartTs() []uint64 { + if m != nil { + return m.IgnoreTxnStartTs + } + return nil +} + +func (m *InnerFilterConfig) GetEventFilters() []*EventFilterRule { + if m != nil { + return m.EventFilters + } + return nil +} + +type FilterConfig struct { + CaseSensitive bool `protobuf:"varint,1,opt,name=caseSensitive,proto3" json:"caseSensitive,omitempty"` + ForceReplicate bool `protobuf:"varint,2,opt,name=forceReplicate,proto3" json:"forceReplicate,omitempty"` + FilterConfig *InnerFilterConfig `protobuf:"bytes,3,opt,name=filterConfig,proto3" json:"filterConfig,omitempty"` +} + func (m *FilterConfig) Reset() { *m = FilterConfig{} } func (m *FilterConfig) String() string { return proto.CompactTextString(m) } func (*FilterConfig) ProtoMessage() {} func (*FilterConfig) Descriptor() ([]byte, []int) { - return fileDescriptor_d7fb2554dfcf7f7d, []int{1} + return fileDescriptor_d7fb2554dfcf7f7d, []int{2} } func (m *FilterConfig) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -223,23 +283,23 @@ func (m *FilterConfig) XXX_DiscardUnknown() { var xxx_messageInfo_FilterConfig proto.InternalMessageInfo -func (m *FilterConfig) GetRules() []string { +func (m *FilterConfig) GetCaseSensitive() bool { if m != nil { - return m.Rules + return m.CaseSensitive } - return nil + return false } -func (m *FilterConfig) GetIgnoreTxnStartTs() []uint64 { +func (m *FilterConfig) GetForceReplicate() bool { if m != nil { - return m.IgnoreTxnStartTs + return m.ForceReplicate } - return nil + return false } -func (m *FilterConfig) GetEventFilters() []*EventFilterRule { +func (m *FilterConfig) GetFilterConfig() *InnerFilterConfig { if m != nil { - return m.EventFilters + return m.FilterConfig } return nil } @@ -251,7 +311,7 @@ func (m *ResolvedTs) Reset() { *m = ResolvedTs{} } func (m *ResolvedTs) String() string { return proto.CompactTextString(m) } func (*ResolvedTs) ProtoMessage() {} func (*ResolvedTs) Descriptor() ([]byte, []int) { - return fileDescriptor_d7fb2554dfcf7f7d, []int{2} + return fileDescriptor_d7fb2554dfcf7f7d, []int{3} } func (m *ResolvedTs) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -293,7 +353,7 @@ func (m *Event) Reset() { *m = Event{} } func (m *Event) String() string { return proto.CompactTextString(m) } func (*Event) ProtoMessage() {} func (*Event) Descriptor() ([]byte, []int) { - return fileDescriptor_d7fb2554dfcf7f7d, []int{3} + return fileDescriptor_d7fb2554dfcf7f7d, []int{4} } func (m *Event) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -360,7 +420,7 @@ func (m *TxnEvent) Reset() { *m = TxnEvent{} } func (m *TxnEvent) String() string { return proto.CompactTextString(m) } func (*TxnEvent) ProtoMessage() {} func (*TxnEvent) Descriptor() ([]byte, []int) { - return fileDescriptor_d7fb2554dfcf7f7d, []int{4} + return fileDescriptor_d7fb2554dfcf7f7d, []int{5} } func (m *TxnEvent) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -417,7 +477,7 @@ func (m *TableInfo) Reset() { *m = TableInfo{} } func (m *TableInfo) String() string { return proto.CompactTextString(m) } func (*TableInfo) ProtoMessage() {} func (*TableInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_d7fb2554dfcf7f7d, []int{5} + return fileDescriptor_d7fb2554dfcf7f7d, []int{6} } func (m *TableInfo) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -458,7 +518,7 @@ func (m *EventFeed) Reset() { *m = EventFeed{} } func (m *EventFeed) String() string { return proto.CompactTextString(m) } func (*EventFeed) ProtoMessage() {} func (*EventFeed) Descriptor() ([]byte, []int) { - return fileDescriptor_d7fb2554dfcf7f7d, []int{6} + return fileDescriptor_d7fb2554dfcf7f7d, []int{7} } func (m *EventFeed) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -540,7 +600,7 @@ func (m *RegisterDispatcherRequest) Reset() { *m = RegisterDispatcherReq func (m *RegisterDispatcherRequest) String() string { return proto.CompactTextString(m) } func (*RegisterDispatcherRequest) ProtoMessage() {} func (*RegisterDispatcherRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_d7fb2554dfcf7f7d, []int{7} + return fileDescriptor_d7fb2554dfcf7f7d, []int{8} } func (m *RegisterDispatcherRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -650,6 +710,7 @@ func init() { proto.RegisterEnum("eventpb.OpType", OpType_name, OpType_value) proto.RegisterEnum("eventpb.ActionType", ActionType_name, ActionType_value) proto.RegisterType((*EventFilterRule)(nil), "eventpb.EventFilterRule") + proto.RegisterType((*InnerFilterConfig)(nil), "eventpb.InnerFilterConfig") proto.RegisterType((*FilterConfig)(nil), "eventpb.FilterConfig") proto.RegisterType((*ResolvedTs)(nil), "eventpb.ResolvedTs") proto.RegisterType((*Event)(nil), "eventpb.Event") @@ -662,68 +723,71 @@ func init() { func init() { proto.RegisterFile("eventpb/event.proto", fileDescriptor_d7fb2554dfcf7f7d) } var fileDescriptor_d7fb2554dfcf7f7d = []byte{ - // 962 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x55, 0x51, 0x6f, 0xe2, 0x46, - 0x17, 0x8d, 0x81, 0x00, 0xbe, 0x26, 0x1b, 0x67, 0xb2, 0xd9, 0xcf, 0xd9, 0xec, 0xf2, 0xb1, 0x3c, - 0x54, 0x34, 0x52, 0x49, 0x4b, 0x5b, 0x55, 0x5a, 0x55, 0x2b, 0xa5, 0xe0, 0x6c, 0xfd, 0xb0, 0x09, - 0x1a, 0x9c, 0x95, 0xda, 0x17, 0xcb, 0xd8, 0x03, 0x71, 0xeb, 0x8c, 0x1d, 0xcf, 0xc0, 0xc2, 0xbf, - 0x68, 0x9f, 0xfb, 0x87, 0xfa, 0xb8, 0x8f, 0x7d, 0x6b, 0x95, 0x48, 0xed, 0xdf, 0xa8, 0x3c, 0x63, - 0x8c, 0x59, 0xaa, 0x4a, 0x7d, 0x62, 0xe6, 0x9e, 0x73, 0x67, 0xee, 0x39, 0x73, 0xaf, 0x81, 0x43, - 0x32, 0x27, 0x94, 0xc7, 0xe3, 0x33, 0xf1, 0xdb, 0x8d, 0x93, 0x88, 0x47, 0xa8, 0x96, 0x05, 0x9f, - 0x9e, 0xdc, 0x10, 0x37, 0xe1, 0x63, 0xe2, 0xa6, 0x8c, 0x7c, 0x2d, 0x59, 0xed, 0xdf, 0x4b, 0xb0, - 0x6f, 0xa6, 0xc4, 0x8b, 0x20, 0xe4, 0x24, 0xc1, 0xb3, 0x90, 0x20, 0x03, 0x6a, 0xb7, 0x2e, 0xf7, - 0x6e, 0x48, 0x62, 0x28, 0xad, 0x72, 0x47, 0xc5, 0xab, 0x2d, 0x7a, 0x01, 0x8d, 0x60, 0x4a, 0xa3, - 0x84, 0x38, 0xe2, 0x70, 0xa3, 0x24, 0x60, 0x4d, 0xc6, 0xc4, 0x31, 0xe8, 0x39, 0x40, 0x46, 0x61, - 0x77, 0xa1, 0x51, 0x16, 0x04, 0x55, 0x46, 0x46, 0x77, 0x21, 0xfa, 0x0a, 0x8c, 0x0c, 0x0e, 0x28, - 0x23, 0x09, 0x77, 0xe6, 0x6e, 0x38, 0x23, 0x0e, 0x59, 0xc4, 0x89, 0x51, 0x69, 0x29, 0x1d, 0x15, - 0x1f, 0x49, 0xdc, 0x12, 0xf0, 0xdb, 0x14, 0x35, 0x17, 0x71, 0x82, 0x5e, 0xc1, 0xb3, 0x2c, 0x71, - 0x16, 0xfb, 0x2e, 0x27, 0x0e, 0x25, 0xef, 0x8a, 0xc9, 0xbb, 0x22, 0x39, 0x3b, 0xfc, 0x5a, 0x50, - 0x2e, 0xc9, 0xbb, 0x7f, 0xc9, 0x8f, 0x42, 0xbf, 0x98, 0x5f, 0xdd, 0xce, 0xbf, 0x0a, 0xfd, 0x75, - 0xfe, 0xba, 0x70, 0x9f, 0x84, 0x84, 0x93, 0x62, 0x6e, 0xad, 0x58, 0xf8, 0x40, 0xc0, 0x79, 0x62, - 0xfb, 0x67, 0x05, 0x1a, 0xd2, 0xdc, 0x7e, 0x44, 0x27, 0xc1, 0x14, 0x3d, 0x86, 0xdd, 0x64, 0x16, - 0x12, 0x96, 0x99, 0x2b, 0x37, 0xe8, 0x13, 0x38, 0xcc, 0xce, 0xe7, 0x0b, 0xea, 0x30, 0xee, 0x26, - 0xdc, 0xe1, 0x4c, 0x38, 0x5c, 0xc1, 0xba, 0x84, 0xec, 0x05, 0x1d, 0xa5, 0x80, 0xcd, 0xd0, 0xd7, - 0xd0, 0x28, 0x3c, 0x1b, 0x13, 0x46, 0x6b, 0x3d, 0xa3, 0x9b, 0x3d, 0x7a, 0xf7, 0x83, 0x37, 0xc5, - 0x1b, 0xec, 0x76, 0x03, 0x00, 0x13, 0x16, 0x85, 0x73, 0xe2, 0xdb, 0xac, 0x3d, 0x83, 0x5d, 0xf9, - 0x76, 0x3a, 0x94, 0x7f, 0x24, 0x4b, 0x43, 0x69, 0x29, 0x9d, 0x06, 0x4e, 0x97, 0x69, 0xad, 0x42, - 0xa7, 0x51, 0x12, 0x31, 0xb9, 0x41, 0x4f, 0xa1, 0xbe, 0xf2, 0xc6, 0x28, 0x0b, 0x20, 0xdf, 0xa3, - 0x0e, 0xd4, 0xa2, 0xd8, 0xe1, 0xcb, 0x98, 0x88, 0xf7, 0x7c, 0xd4, 0xdb, 0xcf, 0x6b, 0xba, 0x8a, - 0xed, 0x65, 0x4c, 0x70, 0x35, 0x12, 0xbf, 0xed, 0x1f, 0xa0, 0x6e, 0x2f, 0xa8, 0xbc, 0xf9, 0x23, - 0xa8, 0x0a, 0x96, 0x34, 0x45, 0xeb, 0x3d, 0xda, 0x14, 0x82, 0x33, 0x14, 0x9d, 0x80, 0xea, 0x45, - 0xb7, 0xb7, 0x41, 0xe6, 0x8d, 0xd2, 0xa9, 0xe0, 0xba, 0x0c, 0xd8, 0x0c, 0x1d, 0x43, 0x3d, 0xf7, - 0xad, 0x2c, 0xb0, 0x1a, 0x93, 0x76, 0xb5, 0x35, 0x50, 0x6d, 0x77, 0x1c, 0x12, 0x8b, 0x4e, 0xa2, - 0xf6, 0x5f, 0x0a, 0xa8, 0xd2, 0x0e, 0x42, 0x7c, 0xf4, 0x29, 0x40, 0xea, 0xf8, 0xc6, 0xf5, 0x07, - 0xf9, 0xf5, 0xab, 0x0a, 0xb1, 0xca, 0xb3, 0x15, 0x43, 0xff, 0x07, 0x2d, 0xc9, 0xdc, 0x5b, 0x97, - 0x01, 0x49, 0x6e, 0x28, 0x7a, 0x05, 0x7b, 0x7e, 0xc0, 0x62, 0x39, 0x34, 0x4e, 0xe0, 0x8b, 0x6a, - 0xb4, 0xde, 0x71, 0xb7, 0x30, 0x89, 0xdd, 0x41, 0xce, 0xb0, 0x06, 0xb8, 0xb1, 0xe6, 0x5b, 0xbe, - 0xe8, 0x10, 0x97, 0x07, 0x91, 0x70, 0xb0, 0x84, 0xe5, 0x06, 0x7d, 0x06, 0xc0, 0x53, 0x0d, 0x4e, - 0x40, 0x27, 0x91, 0xe8, 0x77, 0xad, 0x87, 0xd6, 0x85, 0xae, 0xe4, 0x61, 0x95, 0xe7, 0x4a, 0x7f, - 0xa9, 0xc0, 0x31, 0x26, 0xd3, 0x80, 0x71, 0x92, 0xac, 0xef, 0xc3, 0xe4, 0x6e, 0x46, 0x18, 0x4f, - 0xcb, 0xf4, 0x6e, 0x5c, 0x3a, 0x25, 0x13, 0x42, 0xfc, 0xb4, 0x4c, 0xe5, 0x1f, 0xca, 0xec, 0xe7, - 0x8c, 0xb4, 0xcc, 0x35, 0xdf, 0xf2, 0xb7, 0x65, 0x96, 0xfe, 0x9b, 0xcc, 0x2f, 0x57, 0x82, 0x58, - 0xec, 0xd2, 0xcc, 0xa3, 0x27, 0x1b, 0xc9, 0x42, 0xd4, 0x28, 0x76, 0x69, 0x26, 0x2a, 0x5d, 0x6e, - 0x3c, 0x73, 0x65, 0xe3, 0x99, 0xd3, 0xf6, 0x60, 0x24, 0x99, 0xcb, 0x6a, 0xe4, 0x17, 0xa1, 0x2e, - 0x03, 0x96, 0x8f, 0xbe, 0x00, 0xcd, 0xf5, 0x78, 0x10, 0x51, 0xd9, 0x9d, 0x55, 0xd1, 0x9d, 0x87, - 0xb9, 0x81, 0xe7, 0x02, 0x13, 0x1d, 0x0a, 0x6e, 0xbe, 0x46, 0x2f, 0x61, 0x6f, 0x22, 0xa6, 0xc6, - 0xf1, 0xc4, 0xf8, 0x8a, 0x61, 0xd7, 0x7a, 0x47, 0x79, 0x5e, 0x71, 0xb6, 0x71, 0x63, 0x52, 0x9c, - 0xf4, 0x53, 0x38, 0x20, 0x54, 0x2a, 0x5c, 0x52, 0xcf, 0x89, 0xa3, 0x80, 0x72, 0xa3, 0xde, 0x52, - 0x3a, 0x75, 0xbc, 0x2f, 0x81, 0xd1, 0x92, 0x7a, 0xc3, 0x34, 0x8c, 0xda, 0xb0, 0xb7, 0x26, 0xa5, - 0xd2, 0x54, 0x21, 0x4d, 0x63, 0x2b, 0x86, 0xcd, 0x50, 0x17, 0x0e, 0x0b, 0x9c, 0x80, 0x72, 0x92, - 0xcc, 0xdd, 0xd0, 0x00, 0xc1, 0x3c, 0xc8, 0x99, 0x56, 0x06, 0xa4, 0xdf, 0xe2, 0x88, 0x86, 0x4b, - 0x27, 0x21, 0x33, 0x46, 0x0c, 0x4d, 0x5c, 0xac, 0xa6, 0x11, 0x9c, 0x06, 0x4e, 0x3f, 0x86, 0xaa, - 0x1c, 0x49, 0xb4, 0x07, 0xaa, 0x5c, 0x0d, 0x67, 0x5c, 0xdf, 0x41, 0x3a, 0x34, 0xe4, 0x56, 0x7e, - 0xcb, 0x74, 0xe5, 0xf4, 0x4f, 0x05, 0x60, 0x6d, 0x10, 0x3a, 0x81, 0xff, 0x9d, 0xf7, 0x6d, 0xeb, - 0xea, 0xd2, 0xb1, 0xbf, 0x1b, 0x9a, 0xce, 0xf5, 0xe5, 0x68, 0x68, 0xf6, 0xad, 0x0b, 0xcb, 0x1c, - 0xe8, 0x3b, 0xc8, 0x80, 0xc7, 0x45, 0x10, 0x9b, 0xaf, 0xad, 0x91, 0x6d, 0x62, 0x5d, 0x41, 0x4f, - 0x00, 0x6d, 0x22, 0x6f, 0xae, 0xde, 0x9a, 0x7a, 0x09, 0x1d, 0xc1, 0x41, 0x31, 0x3e, 0x3c, 0xbf, - 0x1e, 0x99, 0x7a, 0x79, 0x9b, 0x3e, 0xba, 0x7e, 0x63, 0xea, 0x95, 0x0f, 0xe9, 0xd8, 0x1c, 0x99, - 0xb6, 0xbe, 0x8b, 0x5a, 0xf0, 0x6c, 0xeb, 0x14, 0xa7, 0xff, 0xed, 0xf9, 0xe5, 0x6b, 0xf3, 0xc2, - 0x34, 0x07, 0x7a, 0x15, 0xbd, 0x80, 0xe7, 0xdb, 0x07, 0x16, 0x29, 0xb5, 0x6f, 0x5e, 0xfe, 0x7a, - 0xdf, 0x54, 0xde, 0xdf, 0x37, 0x95, 0x3f, 0xee, 0x9b, 0xca, 0x4f, 0x0f, 0xcd, 0x9d, 0xf7, 0x0f, - 0xcd, 0x9d, 0xdf, 0x1e, 0x9a, 0x3b, 0xdf, 0xb7, 0xa6, 0x01, 0xbf, 0x99, 0x8d, 0xbb, 0x5e, 0x74, - 0x7b, 0x16, 0x07, 0x74, 0xea, 0xb9, 0xf1, 0x19, 0x0f, 0x3c, 0xdf, 0x3b, 0xcb, 0x3a, 0x61, 0x5c, - 0x15, 0x7f, 0xa9, 0x9f, 0xff, 0x1d, 0x00, 0x00, 0xff, 0xff, 0x04, 0x89, 0x5d, 0x24, 0x8f, 0x07, - 0x00, 0x00, + // 1021 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x55, 0x5d, 0x6f, 0xe3, 0x44, + 0x17, 0xae, 0x93, 0x34, 0x1f, 0x27, 0xe9, 0xae, 0x3b, 0xdd, 0xee, 0xeb, 0xb6, 0xbb, 0x79, 0xb3, + 0x11, 0x5a, 0x85, 0x4a, 0xa4, 0x50, 0x40, 0x48, 0x2b, 0x54, 0xa9, 0xb4, 0xee, 0xe2, 0x8b, 0x6d, + 0xab, 0x89, 0xbb, 0x12, 0xdc, 0x58, 0xae, 0x7d, 0x92, 0x1a, 0xdc, 0xb1, 0xeb, 0x99, 0x64, 0x9b, + 0x7f, 0xc1, 0x15, 0x37, 0xf0, 0x83, 0xb8, 0xdc, 0x4b, 0xee, 0x40, 0xad, 0x04, 0x7f, 0x03, 0x79, + 0xc6, 0x71, 0x9c, 0x06, 0x90, 0xb8, 0xca, 0xcc, 0x79, 0x9e, 0x33, 0xe7, 0x39, 0x5f, 0x0e, 0x6c, + 0xe0, 0x04, 0x99, 0x88, 0x2f, 0xf7, 0xe4, 0x6f, 0x3f, 0x4e, 0x22, 0x11, 0x91, 0x5a, 0x66, 0xdc, + 0xde, 0xb9, 0x42, 0x37, 0x11, 0x97, 0xe8, 0xa6, 0x8c, 0xfc, 0xac, 0x58, 0xdd, 0xdf, 0x4a, 0xf0, + 0xd8, 0x4c, 0x89, 0x27, 0x41, 0x28, 0x30, 0xa1, 0xe3, 0x10, 0x89, 0x01, 0xb5, 0x6b, 0x57, 0x78, + 0x57, 0x98, 0x18, 0x5a, 0xa7, 0xdc, 0x6b, 0xd0, 0xd9, 0x95, 0xbc, 0x80, 0x56, 0x30, 0x62, 0x51, + 0x82, 0x8e, 0x7c, 0xdc, 0x28, 0x49, 0xb8, 0xa9, 0x6c, 0xf2, 0x19, 0xf2, 0x1c, 0x20, 0xa3, 0xf0, + 0x9b, 0xd0, 0x28, 0x4b, 0x42, 0x43, 0x59, 0x06, 0x37, 0x21, 0xf9, 0x02, 0x8c, 0x0c, 0x0e, 0x18, + 0xc7, 0x44, 0x38, 0x13, 0x37, 0x1c, 0xa3, 0x83, 0xb7, 0x71, 0x62, 0x54, 0x3a, 0x5a, 0xaf, 0x41, + 0x37, 0x15, 0x6e, 0x49, 0xf8, 0x6d, 0x8a, 0x9a, 0xb7, 0x71, 0x42, 0x0e, 0xe0, 0x59, 0xe6, 0x38, + 0x8e, 0x7d, 0x57, 0xa0, 0xc3, 0xf0, 0x5d, 0xd1, 0x79, 0x55, 0x3a, 0x67, 0x8f, 0x5f, 0x48, 0xca, + 0x29, 0xbe, 0xfb, 0x17, 0xff, 0x28, 0xf4, 0x8b, 0xfe, 0xd5, 0x65, 0xff, 0xb3, 0xd0, 0x9f, 0xfb, + 0xcf, 0x85, 0xfb, 0x18, 0xa2, 0xc0, 0xa2, 0x6f, 0xad, 0x28, 0xfc, 0x58, 0xc2, 0xb9, 0x63, 0xf7, + 0x47, 0x0d, 0xd6, 0x2d, 0xc6, 0x30, 0x51, 0x15, 0x3e, 0x8a, 0xd8, 0x30, 0x18, 0x91, 0x27, 0xb0, + 0x9a, 0x8c, 0x43, 0xe4, 0x59, 0x85, 0xd5, 0x85, 0x7c, 0x04, 0x1b, 0x59, 0x10, 0x71, 0xcb, 0x1c, + 0x2e, 0xdc, 0x44, 0x38, 0x82, 0xcb, 0x32, 0x57, 0xa8, 0xae, 0x20, 0xfb, 0x96, 0x0d, 0x52, 0xc0, + 0xe6, 0xe4, 0x4b, 0x68, 0x15, 0x7a, 0xc7, 0x65, 0xb5, 0x9b, 0xfb, 0x46, 0x3f, 0xeb, 0x7c, 0xff, + 0x41, 0x63, 0xe9, 0x02, 0xbb, 0xfb, 0xb3, 0x06, 0xad, 0x05, 0x4d, 0x1f, 0xc0, 0x9a, 0xe7, 0x72, + 0x1c, 0x20, 0xe3, 0x81, 0x08, 0x26, 0x68, 0x68, 0x1d, 0xad, 0x57, 0xa7, 0x8b, 0x46, 0xf2, 0x12, + 0x1e, 0x0d, 0xa3, 0xc4, 0x43, 0x8a, 0x71, 0x18, 0x78, 0xae, 0x40, 0xa3, 0x24, 0x69, 0x0f, 0xac, + 0xe4, 0x00, 0x5a, 0xc3, 0xc2, 0xeb, 0x46, 0xb9, 0xa3, 0xf5, 0x9a, 0xfb, 0xdb, 0xb9, 0xb8, 0xa5, + 0x9a, 0xd0, 0x05, 0x7e, 0xb7, 0x05, 0x40, 0x91, 0x47, 0xe1, 0x04, 0x7d, 0x9b, 0x77, 0xc7, 0xb0, + 0xaa, 0xe6, 0x4b, 0x87, 0xf2, 0xf7, 0x38, 0x95, 0xd2, 0x5a, 0x34, 0x3d, 0xa6, 0xa5, 0x94, 0xbd, + 0x90, 0x3a, 0x5a, 0x54, 0x5d, 0xc8, 0x36, 0xd4, 0x67, 0xfd, 0x93, 0xa1, 0x5b, 0x34, 0xbf, 0x93, + 0x1e, 0xd4, 0xa2, 0xd8, 0x11, 0xd3, 0x18, 0xe5, 0xcc, 0x3d, 0xda, 0x7f, 0x9c, 0xab, 0x3a, 0x8b, + 0xed, 0x69, 0x8c, 0xb4, 0x1a, 0xc9, 0xdf, 0xee, 0x77, 0x50, 0xb7, 0x6f, 0x99, 0x8a, 0xfc, 0x12, + 0xaa, 0x92, 0xa5, 0x7a, 0xd6, 0xdc, 0x7f, 0xb4, 0x58, 0x67, 0x9a, 0xa1, 0x64, 0x07, 0x1a, 0x5e, + 0x74, 0x7d, 0x1d, 0x64, 0xad, 0xd3, 0x7a, 0x15, 0x5a, 0x57, 0x06, 0x9b, 0x93, 0x2d, 0xa8, 0xe7, + 0x6d, 0x2d, 0x4b, 0xac, 0xc6, 0x55, 0x37, 0xbb, 0x4d, 0x68, 0xd8, 0xee, 0x65, 0x88, 0x16, 0x1b, + 0x46, 0xdd, 0x3f, 0x35, 0x68, 0xa8, 0x6e, 0x21, 0xfa, 0xe4, 0x63, 0x80, 0x74, 0x20, 0x16, 0xc2, + 0xaf, 0xe7, 0xe1, 0x67, 0x0a, 0x69, 0x43, 0x64, 0x27, 0x4e, 0xfe, 0x0f, 0xcd, 0x24, 0xab, 0xde, + 0x5c, 0x06, 0x24, 0x79, 0x41, 0xc9, 0x01, 0xac, 0xf9, 0x01, 0x8f, 0xd5, 0x62, 0x3b, 0x81, 0x9f, + 0xf5, 0x67, 0xab, 0x5f, 0xf8, 0x5a, 0xf4, 0x8f, 0x73, 0x86, 0x75, 0x4c, 0x5b, 0x73, 0xbe, 0xe5, + 0xcb, 0x01, 0x76, 0x45, 0x10, 0xc9, 0x0a, 0x96, 0xa8, 0xba, 0x90, 0x4f, 0x00, 0x44, 0x9a, 0x83, + 0x13, 0xb0, 0x61, 0x24, 0x77, 0xb2, 0xb9, 0x4f, 0xe6, 0x42, 0x67, 0xe9, 0xd1, 0x86, 0xc8, 0x33, + 0xfd, 0xa9, 0x02, 0x5b, 0x14, 0x47, 0x01, 0x17, 0x98, 0xcc, 0xe3, 0x51, 0xbc, 0x19, 0x23, 0x17, + 0xa9, 0x4c, 0xef, 0xca, 0x65, 0x23, 0x1c, 0x22, 0xfa, 0xa9, 0x4c, 0xed, 0x6f, 0x64, 0x1e, 0xe5, + 0x8c, 0x54, 0xe6, 0x9c, 0x6f, 0xf9, 0xcb, 0x69, 0x96, 0xfe, 0x5b, 0x9a, 0x9f, 0xcf, 0x12, 0xe2, + 0xb1, 0xcb, 0xb2, 0x1a, 0x3d, 0x5d, 0x70, 0x96, 0x49, 0x0d, 0x62, 0x97, 0x65, 0x49, 0xa5, 0xc7, + 0x85, 0x36, 0x57, 0x16, 0xda, 0x9c, 0x8e, 0x07, 0xc7, 0x64, 0xa2, 0xd4, 0xa8, 0xaf, 0x56, 0x5d, + 0x19, 0x2c, 0x9f, 0x7c, 0x06, 0x4d, 0xd7, 0x13, 0x41, 0xc4, 0xd4, 0x74, 0x56, 0xe5, 0x74, 0x6e, + 0xe4, 0x05, 0x3c, 0x94, 0x98, 0x9c, 0x50, 0x70, 0xf3, 0x33, 0x79, 0x05, 0x6b, 0x6a, 0x75, 0x1c, + 0x4f, 0xed, 0x5a, 0x4d, 0xea, 0xdc, 0xcc, 0xfd, 0xfe, 0x79, 0xcd, 0xc8, 0x2e, 0xac, 0x23, 0x53, + 0x19, 0x4e, 0x99, 0xe7, 0xc4, 0x51, 0xc0, 0x84, 0x51, 0x97, 0x1b, 0xfd, 0x58, 0x01, 0x83, 0x29, + 0xf3, 0xce, 0x53, 0x33, 0xe9, 0xc2, 0xda, 0x9c, 0x94, 0xa6, 0xd6, 0x90, 0xa9, 0x35, 0xf9, 0x8c, + 0x61, 0x73, 0xd2, 0x87, 0x8d, 0x02, 0x27, 0x60, 0x02, 0x93, 0x89, 0x1b, 0x1a, 0x20, 0x99, 0xeb, + 0x39, 0xd3, 0xca, 0x80, 0xf4, 0xff, 0x22, 0x62, 0xe1, 0xd4, 0x49, 0x70, 0xcc, 0xd1, 0x68, 0xca, + 0xc0, 0x8d, 0xd4, 0x42, 0x53, 0xc3, 0xee, 0x87, 0x50, 0x55, 0x2b, 0x49, 0xd6, 0xa0, 0xa1, 0x4e, + 0xe7, 0x63, 0xa1, 0xaf, 0x10, 0x1d, 0x5a, 0xea, 0xaa, 0xbe, 0xb7, 0xba, 0xb6, 0xfb, 0x87, 0x06, + 0x30, 0x2f, 0x10, 0xd9, 0x81, 0xff, 0x1d, 0x1e, 0xd9, 0xd6, 0xd9, 0xa9, 0x63, 0x7f, 0x73, 0x6e, + 0x3a, 0x17, 0xa7, 0x83, 0x73, 0xf3, 0xc8, 0x3a, 0xb1, 0xcc, 0x63, 0x7d, 0x85, 0x18, 0xf0, 0xa4, + 0x08, 0x52, 0xf3, 0xb5, 0x35, 0xb0, 0x4d, 0xaa, 0x6b, 0xe4, 0x29, 0x90, 0x45, 0xe4, 0xcd, 0xd9, + 0x5b, 0x53, 0x2f, 0x91, 0x4d, 0x58, 0x2f, 0xda, 0xcf, 0x0f, 0x2f, 0x06, 0xa6, 0x5e, 0x5e, 0xa6, + 0x0f, 0x2e, 0xde, 0x98, 0x7a, 0xe5, 0x21, 0x9d, 0x9a, 0x03, 0xd3, 0xd6, 0x57, 0x49, 0x07, 0x9e, + 0x2d, 0xbd, 0xe2, 0x1c, 0x7d, 0x7d, 0x78, 0xfa, 0xda, 0x3c, 0x31, 0xcd, 0x63, 0xbd, 0x4a, 0x5e, + 0xc0, 0xf3, 0xe5, 0x07, 0x8b, 0x94, 0xda, 0x57, 0xaf, 0x7e, 0xb9, 0x6b, 0x6b, 0xef, 0xef, 0xda, + 0xda, 0xef, 0x77, 0x6d, 0xed, 0x87, 0xfb, 0xf6, 0xca, 0xfb, 0xfb, 0xf6, 0xca, 0xaf, 0xf7, 0xed, + 0x95, 0x6f, 0x3b, 0xa3, 0x40, 0x5c, 0x8d, 0x2f, 0xfb, 0x5e, 0x74, 0xbd, 0x17, 0x07, 0x6c, 0xe4, + 0xb9, 0xf1, 0x9e, 0x08, 0x3c, 0xdf, 0xdb, 0xcb, 0x26, 0xe1, 0xb2, 0x2a, 0xff, 0xf6, 0x3f, 0xfd, + 0x2b, 0x00, 0x00, 0xff, 0xff, 0x40, 0x60, 0xc1, 0x74, 0x33, 0x08, 0x00, 0x00, } func (m *EventFilterRule) Marshal() (dAtA []byte, err error) { @@ -804,7 +868,7 @@ func (m *EventFilterRule) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } -func (m *FilterConfig) Marshal() (dAtA []byte, err error) { +func (m *InnerFilterConfig) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalToSizedBuffer(dAtA[:size]) @@ -814,12 +878,12 @@ func (m *FilterConfig) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *FilterConfig) MarshalTo(dAtA []byte) (int, error) { +func (m *InnerFilterConfig) MarshalTo(dAtA []byte) (int, error) { size := m.Size() return m.MarshalToSizedBuffer(dAtA[:size]) } -func (m *FilterConfig) MarshalToSizedBuffer(dAtA []byte) (int, error) { +func (m *InnerFilterConfig) MarshalToSizedBuffer(dAtA []byte) (int, error) { i := len(dAtA) _ = i var l int @@ -868,6 +932,61 @@ func (m *FilterConfig) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *FilterConfig) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *FilterConfig) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *FilterConfig) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.FilterConfig != nil { + { + size, err := m.FilterConfig.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintEvent(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + if m.ForceReplicate { + i-- + if m.ForceReplicate { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x10 + } + if m.CaseSensitive { + i-- + if m.CaseSensitive { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func (m *ResolvedTs) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1254,7 +1373,7 @@ func (m *EventFilterRule) Size() (n int) { return n } -func (m *FilterConfig) Size() (n int) { +func (m *InnerFilterConfig) Size() (n int) { if m == nil { return 0 } @@ -1282,6 +1401,25 @@ func (m *FilterConfig) Size() (n int) { return n } +func (m *FilterConfig) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.CaseSensitive { + n += 2 + } + if m.ForceReplicate { + n += 2 + } + if m.FilterConfig != nil { + l = m.FilterConfig.Size() + n += 1 + l + sovEvent(uint64(l)) + } + return n +} + func (m *ResolvedTs) Size() (n int) { if m == nil { return 0 @@ -1701,7 +1839,7 @@ func (m *EventFilterRule) Unmarshal(dAtA []byte) error { } return nil } -func (m *FilterConfig) Unmarshal(dAtA []byte) error { +func (m *InnerFilterConfig) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -1724,10 +1862,10 @@ func (m *FilterConfig) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: FilterConfig: wiretype end group for non-group") + return fmt.Errorf("proto: InnerFilterConfig: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: FilterConfig: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: InnerFilterConfig: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: @@ -1893,6 +2031,132 @@ func (m *FilterConfig) Unmarshal(dAtA []byte) error { } return nil } +func (m *FilterConfig) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: FilterConfig: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: FilterConfig: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field CaseSensitive", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.CaseSensitive = bool(v != 0) + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ForceReplicate", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.ForceReplicate = bool(v != 0) + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field FilterConfig", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvent + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthEvent + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthEvent + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.FilterConfig == nil { + m.FilterConfig = &InnerFilterConfig{} + } + if err := m.FilterConfig.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipEvent(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthEvent + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *ResolvedTs) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/eventpb/event.proto b/eventpb/event.proto index 623bfa30b..0325420b6 100644 --- a/eventpb/event.proto +++ b/eventpb/event.proto @@ -14,12 +14,18 @@ message EventFilterRule { string ignore_delete_value_expr = 7; } -message FilterConfig { +message InnerFilterConfig { repeated string rules = 1; repeated uint64 ignore_txn_start_ts = 2; repeated EventFilterRule EventFilters = 3; } +message FilterConfig { + bool caseSensitive = 1; + bool forceReplicate = 2; + InnerFilterConfig filterConfig = 3; +} + message ResolvedTs { diff --git a/logservice/schemastore/disk_format.go b/logservice/schemastore/disk_format.go index f9515d87c..1cc5e4a68 100644 --- a/logservice/schemastore/disk_format.go +++ b/logservice/schemastore/disk_format.go @@ -244,6 +244,66 @@ func loadTablesInKVSnap( return tablesInKVSnap, partitionsInKVSnap, nil } +func loadFullTablesInKVSnap( + snap *pebble.Snapshot, + gcTs uint64, + databaseMap map[int64]*BasicDatabaseInfo, +) (map[int64]*model.TableInfo, map[int64]*BasicTableInfo, map[int64]BasicPartitionInfo, error) { + tableInfosInKVSnap := make(map[int64]*model.TableInfo) + tablesInKVSnap := make(map[int64]*BasicTableInfo) + partitionsInKVSnap := make(map[int64]BasicPartitionInfo) + + startKey, err := tableInfoKey(gcTs, 0) + if err != nil { + log.Fatal("generate lower bound failed", zap.Error(err)) + } + endKey, err := tableInfoKey(gcTs, math.MaxInt64) + if err != nil { + log.Fatal("generate upper bound failed", zap.Error(err)) + } + snapIter, err := snap.NewIter(&pebble.IterOptions{ + LowerBound: startKey, + UpperBound: endKey, + }) + if err != nil { + log.Fatal("new iterator failed", zap.Error(err)) + } + defer snapIter.Close() + for snapIter.First(); snapIter.Valid(); snapIter.Next() { + var table_info_entry PersistedTableInfoEntry + if _, err := table_info_entry.UnmarshalMsg(snapIter.Value()); err != nil { + log.Fatal("unmarshal table info entry failed", zap.Error(err)) + } + + tableInfo := model.TableInfo{} + if err := json.Unmarshal(table_info_entry.TableInfoValue, &tableInfo); err != nil { + log.Fatal("unmarshal table info failed", zap.Error(err)) + } + databaseInfo, ok := databaseMap[table_info_entry.SchemaID] + if !ok { + log.Panic("database not found", + zap.Int64("schemaID", table_info_entry.SchemaID), + zap.String("schemaName", table_info_entry.SchemaName), + zap.String("tableName", tableInfo.Name.O)) + } + // TODO: add a unit test for this case + tableInfosInKVSnap[tableInfo.ID] = &tableInfo + databaseInfo.Tables[tableInfo.ID] = true + tablesInKVSnap[tableInfo.ID] = &BasicTableInfo{ + SchemaID: table_info_entry.SchemaID, + Name: tableInfo.Name.O, + } + if tableInfo.Partition != nil { + partitionInfo := make(BasicPartitionInfo) + for _, partition := range tableInfo.Partition.Definitions { + partitionInfo[partition.ID] = nil + } + partitionsInKVSnap[tableInfo.ID] = partitionInfo + } + } + return tableInfosInKVSnap, tablesInKVSnap, partitionsInKVSnap, nil +} + // load the ddl jobs in the range (gcTs, upperBound] and apply the ddl job to update database and table info func loadAndApplyDDLHistory( snap *pebble.Snapshot, @@ -581,7 +641,7 @@ func loadAllPhysicalTablesAtTs( return nil, err } - tableMap, partitionMap, err := loadTablesInKVSnap(storageSnap, gcTs, databaseMap) + tableInfoMap, tableMap, partitionMap, err := loadFullTablesInKVSnap(storageSnap, gcTs, databaseMap) if err != nil { return nil, err } @@ -632,7 +692,11 @@ func loadAllPhysicalTablesAtTs( zap.Any("databaseMapLen", len(databaseMap))) } schemaName := databaseMap[tableInfo.SchemaID].Name - if tableFilter != nil && tableFilter.ShouldIgnoreTable(schemaName, tableInfo.Name) { + fullTableInfo, ok := tableInfoMap[tableID] + if !ok { + log.Error("table info not found", zap.Any("tableId", tableID)) + } + if tableFilter != nil && tableFilter.ShouldIgnoreTable(schemaName, tableInfo.Name, fullTableInfo) { continue } if partitionInfo, ok := partitionMap[tableID]; ok { diff --git a/logservice/schemastore/persist_storage_ddl_handlers.go b/logservice/schemastore/persist_storage_ddl_handlers.go index dab735676..32e08af96 100644 --- a/logservice/schemastore/persist_storage_ddl_handlers.go +++ b/logservice/schemastore/persist_storage_ddl_handlers.go @@ -1385,10 +1385,10 @@ func buildDDLEventCommon(rawEvent *PersistedDDLEvent, tableFilter filter.Filter, filtered := false // TODO: ShouldDiscardDDL is used for old architecture, should be removed later if tableFilter != nil && rawEvent.CurrentSchemaName != "" && rawEvent.CurrentTableName != "" { - filtered = tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.CurrentSchemaName, rawEvent.CurrentTableName) + filtered = tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.CurrentSchemaName, rawEvent.CurrentTableName, rawEvent.TableInfo) // if the ddl invovles another table name, only set filtered to true when all of them should be filtered if rawEvent.PrevSchemaName != "" && rawEvent.PrevTableName != "" { - filtered = filtered && tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.PrevSchemaName, rawEvent.PrevTableName) + filtered = filtered && tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.PrevSchemaName, rawEvent.PrevTableName, rawEvent.TableInfo) } } if rawEvent.TableInfo != nil { @@ -1609,8 +1609,8 @@ func buildDDLEventForRenameTable(rawEvent *PersistedDDLEvent, tableFilter filter } ddlEvent.PrevSchemaName = rawEvent.PrevSchemaName ddlEvent.PrevTableName = rawEvent.PrevTableName - ignorePrevTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaName, rawEvent.PrevTableName) - ignoreCurrentTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, rawEvent.CurrentTableName) + ignorePrevTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaName, rawEvent.PrevTableName, rawEvent.TableInfo) + ignoreCurrentTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, rawEvent.CurrentTableName, rawEvent.TableInfo) if isPartitionTable(rawEvent.TableInfo) { allPhysicalIDs := getAllPartitionIDs(rawEvent.TableInfo) if !ignorePrevTable { @@ -1876,8 +1876,8 @@ func buildDDLEventForExchangeTablePartition(rawEvent *PersistedDDLEvent, tableFi if !ok { return ddlEvent, false } - ignoreNormalTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaName, rawEvent.PrevTableName) - ignorePartitionTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, rawEvent.CurrentTableName) + ignoreNormalTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaName, rawEvent.PrevTableName, rawEvent.TableInfo) + ignorePartitionTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, rawEvent.CurrentTableName, rawEvent.TableInfo) physicalIDs := getAllPartitionIDs(rawEvent.TableInfo) droppedIDs := getDroppedIDs(rawEvent.PrevPartitions, physicalIDs) if len(droppedIDs) != 1 { @@ -1954,8 +1954,8 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte allFiltered := true resultQuerys := make([]string, 0) for i, tableInfo := range rawEvent.MultipleTableInfos { - ignorePrevTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaNames[i], rawEvent.PrevTableNames[i]) - ignoreCurrentTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaNames[i], tableInfo.Name.O) + ignorePrevTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaNames[i], rawEvent.PrevTableNames[i], tableInfo) + ignoreCurrentTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaNames[i], tableInfo.Name.O, tableInfo) if ignorePrevTable && ignoreCurrentTable { continue } @@ -2064,7 +2064,7 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte logicalTableCount := 0 allFiltered := true for _, info := range rawEvent.MultipleTableInfos { - if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, info.Name.O) { + if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, info.Name.O, info) { continue } allFiltered = false @@ -2092,7 +2092,7 @@ func buildDDLEventForCreateTables(rawEvent *PersistedDDLEvent, tableFilter filte addName := make([]commonEvent.SchemaTableName, 0, logicalTableCount) resultQuerys := make([]string, 0, logicalTableCount) for i, info := range rawEvent.MultipleTableInfos { - if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, info.Name.O) { + if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, info.Name.O, info) { log.Info("build ddl event for create tables filter table", zap.String("schemaName", rawEvent.CurrentSchemaName), zap.String("tableName", info.Name.O)) diff --git a/logservice/schemastore/persist_storage_test_utils.go b/logservice/schemastore/persist_storage_test_utils.go index 8a9405b8c..703a236b3 100644 --- a/logservice/schemastore/persist_storage_test_utils.go +++ b/logservice/schemastore/persist_storage_test_utils.go @@ -145,7 +145,7 @@ func buildTableFilterByNameForTest(schemaName, tableName string) filter.Filter { filterConfig := &config.FilterConfig{ Rules: []string{filterRule}, } - tableFilter, err := filter.NewFilter(filterConfig, "", false) + tableFilter, err := filter.NewFilter(filterConfig, "", false, false) if err != nil { log.Panic("build filter failed", zap.Error(err)) } diff --git a/maintainer/maintainer_controller.go b/maintainer/maintainer_controller.go index 92b0a5232..a4359082a 100644 --- a/maintainer/maintainer_controller.go +++ b/maintainer/maintainer_controller.go @@ -403,7 +403,7 @@ func (c *Controller) addNewSpans(schemaID int64, tableSpans []*heartbeatpb.Table func (c *Controller) loadTables(startTs uint64) ([]commonEvent.Table, error) { // todo: do we need to set timezone here? - f, err := filter.NewFilter(c.cfConfig.Filter, "", c.cfConfig.ForceReplicate) + f, err := filter.NewFilter(c.cfConfig.Filter, "", c.cfConfig.CaseSensitive, c.cfConfig.ForceReplicate) if err != nil { return nil, errors.Cause(err) } diff --git a/pkg/config/changefeed.go b/pkg/config/changefeed.go index e65a713ff..097481ced 100644 --- a/pkg/config/changefeed.go +++ b/pkg/config/changefeed.go @@ -37,7 +37,8 @@ type ChangefeedConfig struct { TargetTS uint64 `json:"target_ts"` SinkURI string `json:"sink_uri"` // timezone used when checking sink uri - TimeZone string `json:"timezone" default:"system"` + TimeZone string `json:"timezone" default:"system"` + CaseSensitive bool `json:"case_sensitive" default:"false"` // if true, force to replicate some ineligible tables ForceReplicate bool `json:"force_replicate" default:"false"` Filter *FilterConfig `toml:"filter" json:"filter"` @@ -85,6 +86,7 @@ func (info *ChangeFeedInfo) ToChangefeedConfig() *ChangefeedConfig { StartTS: info.StartTs, TargetTS: info.TargetTs, SinkURI: info.SinkURI, + CaseSensitive: info.Config.CaseSensitive, ForceReplicate: info.Config.ForceReplicate, SinkConfig: info.Config.Sink, Filter: info.Config.Filter, diff --git a/pkg/eventservice/event_service_test.go b/pkg/eventservice/event_service_test.go index 78df2159e..25c7abfb7 100644 --- a/pkg/eventservice/event_service_test.go +++ b/pkg/eventservice/event_service_test.go @@ -473,7 +473,7 @@ type mockDispatcherInfo struct { func newMockDispatcherInfo(t *testing.T, dispatcherID common.DispatcherID, tableID int64, actionType eventpb.ActionType) *mockDispatcherInfo { cfg := config.NewDefaultFilterConfig() - filter, err := filter.NewFilter(cfg, "", false) + filter, err := filter.NewFilter(cfg, "", false, false) require.NoError(t, err) return &mockDispatcherInfo{ clusterID: 1, diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index 08cf0fc7e..8b4feea0d 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -14,16 +14,14 @@ package filter import ( - "strings" "sync" "github.com/pingcap/log" "github.com/pingcap/ticdc/eventpb" - "github.com/pingcap/ticdc/pkg/apperror" "github.com/pingcap/ticdc/pkg/common" - commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/config" timodel "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser/mysql" tfilter "github.com/pingcap/tidb/pkg/util/table-filter" "github.com/pingcap/tiflow/cdc/model" bf "github.com/pingcap/tiflow/pkg/binlog-filter" @@ -52,17 +50,14 @@ type Filter interface { // ShouldDiscardDDL returns true if this DDL should be discarded. // If a ddl is discarded, it will neither be applied to cdc's schema storage // nor sent to downstream. - ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string) bool + ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string, tableInfo *timodel.TableInfo) bool // ShouldIgnoreTable returns true if the table should be ignored. - ShouldIgnoreTable(schema, table string) bool + ShouldIgnoreTable(schema, table string, tableInfo *timodel.TableInfo) bool // ShouldIgnoreSchema returns true if the schema should be ignored. ShouldIgnoreSchema(schema string) bool // Verify should only be called by create changefeed OpenAPI. // Its purpose is to verify the expression filter config. Verify(tableInfos []*model.TableInfo) error - - // filter ddl event to update query and influenced table spans - FilterDDLEvent(ddl *commonEvent.DDLEvent) error } // filter implements Filter. @@ -75,10 +70,11 @@ type filter struct { sqlEventFilter *sqlEventFilter // ignoreTxnStartTs is used to filter out dml/ddl event by its starsTs. ignoreTxnStartTs []uint64 + forceReplicate bool } // NewFilter creates a filter. -func NewFilter(cfg *config.FilterConfig, tz string, caseSensitive bool) (Filter, error) { +func NewFilter(cfg *config.FilterConfig, tz string, caseSensitive bool, forceReplicate bool) (Filter, error) { f, err := VerifyTableRules(cfg) if err != nil { return nil, err @@ -101,36 +97,71 @@ func NewFilter(cfg *config.FilterConfig, tz string, caseSensitive bool) (Filter, dmlExprFilter: dmlExprFilter, sqlEventFilter: sqlEventFilter, ignoreTxnStartTs: cfg.IgnoreTxnStartTs, + forceReplicate: forceReplicate, }, nil } -func (f *filter) FilterDDLEvent(ddl *commonEvent.DDLEvent) error { - query := ddl.Query - queryList := strings.Split(query, ";") - if len(queryList) == 1 { - return nil +// IsEligible returns whether the table is a eligible table. +// A table is eligible if it has a primary key or unique key on not null columns. +// Or when enable forReplicate or the table is a view. +// TODO: Add some tests for this function. +func (f *filter) IsEligible(tableInfo *timodel.TableInfo) bool { + // Sequence is not supported yet, TiCDC needs to filter all sequence tables. + // See https://github.com/pingcap/tiflow/issues/4559 + if tableInfo.IsSequence() { + return false + } + if f.forceReplicate { + return true } - multiTableInfos := ddl.MultipleTableInfos - schemaName := ddl.SchemaName - if len(multiTableInfos) != len(queryList) { - log.Error("DDL Event is not valid, query count not equal to table count", zap.Any("ddl", ddl)) - return apperror.NewAppError(apperror.ErrorInvalidDDLEvent, "DDL Event is not valid, query count not equal to table count") + if tableInfo.IsView() { + return true } - finalQuery := make([]string, 0, len(queryList)) - for i, query := range queryList { - tableInfo := multiTableInfos[i] - // Only need to check if table name needs to be filtered, - // if schema name needs to be filtered, the entire query will not be given to dispatcher - tableName := tableInfo.TableName.Table - if !f.ShouldIgnoreTable(schemaName, tableName) { - finalQuery = append(finalQuery, query) + + // If the table has primary key, it is eligible. + for _, col := range tableInfo.Columns { + if !(col.IsGenerated() && !col.GeneratedStored) { // visible and not stored generated column + if (tableInfo.PKIsHandle && mysql.HasPriKeyFlag(col.GetFlag())) || col.ID == timodel.ExtraHandleID { + return true + } } } - if len(finalQuery) != len(queryList) { - ddl.Query = strings.Join(finalQuery, ";") + + // If the table has unique key on not null columns, it is eligible. + for _, idx := range tableInfo.Indices { + if idx.Primary { + return true + } + if len(idx.Columns) == 0 { + continue + } + if idx.Unique { + // ensure all columns in unique key have NOT NULL flag + allColNotNull := true + skip := false + for _, idxCol := range idx.Columns { + col := timodel.FindColumnInfo(tableInfo.Cols(), idxCol.Name.L) + // This index has a column in DeleteOnly state, + // or it is expression index (it defined on a hidden column), + // it can not be implicit PK, go to next index iterator + if col == nil || col.Hidden { + skip = true + break + } + if !mysql.HasNotNullFlag(col.GetFlag()) { + allColNotNull = false + break + } + } + if skip { + continue + } + if allColNotNull { + return true + } + } } - // TODO: Should also update the table information that ddl depends on - return nil + return false } // ShouldIgnoreDMLEvent checks if a DML event should be ignore by conditions below: @@ -147,7 +178,7 @@ func (f *filter) ShouldIgnoreDMLEvent( return true, nil } - if f.ShouldIgnoreTable(dml.TableInfo.GetSchemaName(), dml.TableInfo.GetTableName()) { + if f.ShouldIgnoreTable(dml.TableInfo.GetSchemaName(), dml.TableInfo.GetTableName(), nil) { return true, nil } @@ -165,7 +196,7 @@ func (f *filter) ShouldIgnoreDMLEvent( // 0. By allow list. // 1. By schema name. // 2. By table name. -func (f *filter) ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string) bool { +func (f *filter) ShouldDiscardDDL(ddlType timodel.ActionType, schema, table string, tableInfo *timodel.TableInfo) bool { if !isAllowedDDL(ddlType) { return true } @@ -173,7 +204,7 @@ func (f *filter) ShouldDiscardDDL(ddlType timodel.ActionType, schema, table stri if IsSchemaDDL(ddlType) { return f.ShouldIgnoreSchema(schema) } - return f.ShouldIgnoreTable(schema, table) + return f.ShouldIgnoreTable(schema, table, tableInfo) } // ShouldIgnoreDDLEvent checks if a DDL event should be ignore by conditions below: @@ -199,10 +230,16 @@ func (f *filter) ShouldIgnoreDDLEvent(ddl *model.DDLEvent) (bool, error) { // ShouldIgnoreTable returns true if the specified table should be ignored by this changefeed. // NOTICE: Set `tbl` to an empty string to test against the whole database. -func (f *filter) ShouldIgnoreTable(db, tbl string) bool { +func (f *filter) ShouldIgnoreTable(db, tbl string, tableInfo *timodel.TableInfo) bool { if IsSysSchema(db) { return true } + + if tableInfo != nil && !f.IsEligible(tableInfo) { + log.Info("table is not eligible, should ignore this table", zap.String("db", db), zap.String("table", tbl)) + return true + } + return !f.tableFilter.MatchTable(db, tbl) } @@ -273,10 +310,10 @@ func (s *SharedFilterStorage) GetOrSetFilter( } // convert eventpb.FilterConfig to config.FilterConfig filterCfg := &config.FilterConfig{ - Rules: cfg.Rules, - IgnoreTxnStartTs: cfg.IgnoreTxnStartTs, + Rules: cfg.FilterConfig.Rules, + IgnoreTxnStartTs: cfg.FilterConfig.IgnoreTxnStartTs, } - for _, rule := range cfg.EventFilters { + for _, rule := range cfg.FilterConfig.EventFilters { f := &config.EventFilterRule{ Matcher: rule.Matcher, IgnoreSQL: rule.IgnoreSql, @@ -292,7 +329,7 @@ func (s *SharedFilterStorage) GetOrSetFilter( filterCfg.EventFilters = append(filterCfg.EventFilters, f) } // generate table filter - f, err := NewFilter(filterCfg, tz, caseSensitive) + f, err := NewFilter(filterCfg, tz, cfg.CaseSensitive, cfg.ForceReplicate) if err != nil { return nil, err } diff --git a/utils/dynstream/interfaces.go b/utils/dynstream/interfaces.go index 0107bb9b1..00ce39242 100644 --- a/utils/dynstream/interfaces.go +++ b/utils/dynstream/interfaces.go @@ -253,6 +253,13 @@ func NewAreaSettings() AreaSettings { } } +func NewAreaSettingsWithMaxPendingSize(size int) AreaSettings { + return AreaSettings{ + MaxPendingSize: size, + FeedbackInterval: DefaultFeedbackInterval, + } +} + type Feedback[A Area, P Path, D Dest] struct { Area A Path P