diff --git a/pkg/cnservice/types.go b/pkg/cnservice/types.go
index b87e881939e77..bf17e1501db26 100644
--- a/pkg/cnservice/types.go
+++ b/pkg/cnservice/types.go
@@ -409,7 +409,7 @@ func (c *Config) Validate() error {
 	}
 
 	if c.LogtailUpdateWorkerFactor == 0 {
-		c.LogtailUpdateWorkerFactor = 8
+		c.LogtailUpdateWorkerFactor = 4
 	}
 
 	if !metadata.ValidStateString(c.InitWorkState) {
diff --git a/pkg/vm/engine/disttae/logtailreplay/partition_state.go b/pkg/vm/engine/disttae/logtailreplay/partition_state.go
index a32b1d4e7b4f0..794843b278d3b 100644
--- a/pkg/vm/engine/disttae/logtailreplay/partition_state.go
+++ b/pkg/vm/engine/disttae/logtailreplay/partition_state.go
@@ -34,7 +34,6 @@ import (
 	"github.com/matrixorigin/matrixone/pkg/pb/api"
 	"github.com/matrixorigin/matrixone/pkg/perfcounter"
 	txnTrace "github.com/matrixorigin/matrixone/pkg/txn/trace"
-	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
 )
 
 type PartitionState struct {
@@ -477,9 +476,9 @@ func (p *PartitionState) HandleObjectInsert(ctx context.Context, bat *api.Batch,
 		p.objectIndexByTS.Set(e)
 	}
 	//prefetch the object meta
-	if err := blockio.PrefetchMeta(fs, objEntry.Location()); err != nil {
-		logutil.Errorf("prefetch object meta failed. %v", err)
-	}
+	// if err := blockio.PrefetchMeta(fs, objEntry.Location()); err != nil {
+	// 	logutil.Errorf("prefetch object meta failed. %v", err)
+	// }
 
 	p.dataObjects.Set(objEntry)
 	{
diff --git a/pkg/vm/engine/disttae/stats.go b/pkg/vm/engine/disttae/stats.go
index ee63a4ea19b8e..ed6910bd9f584 100644
--- a/pkg/vm/engine/disttae/stats.go
+++ b/pkg/vm/engine/disttae/stats.go
@@ -152,6 +152,8 @@ type GlobalStats struct {
 
 	// KeyRouter is the router to decides which node should send to.
 	KeyRouter client.KeyRouter[pb.StatsInfoKey]
+
+	concurrentExecutor ConcurrentExecutor
 }
 
 func NewGlobalStats(
@@ -172,6 +174,8 @@ func NewGlobalStats(
 	for _, opt := range opts {
 		opt(s)
 	}
+	s.concurrentExecutor = newConcurrentExecutor(runtime.GOMAXPROCS(0) * s.updateWorkerFactor * 4)
+	s.concurrentExecutor.Run(ctx)
 	go s.consumeWorker(ctx)
 	go s.updateWorker(ctx)
 	return s
@@ -511,7 +515,7 @@ func (gs *GlobalStats) updateTableStats(key pb.StatsInfoKey) {
 		approxObjectNum,
 		stats,
 	)
-	if err := UpdateStats(gs.ctx, req); err != nil {
+	if err := UpdateStats(gs.ctx, req, gs.concurrentExecutor); err != nil {
 		logutil.Errorf("failed to init stats info for table %v, err: %v", key, err)
 		return
 	}
@@ -610,7 +614,9 @@ func getMinMaxValueByFloat64(typ types.Type, buf []byte) float64 {
 	}
 }
 // get ndv, minval , maxval, datatype from zonemap. Retrieve all columns except for rowid, return accurate number of objects
-func updateInfoFromZoneMap(ctx context.Context, req *updateStatsRequest, info *plan2.InfoFromZoneMap) error {
+func updateInfoFromZoneMap(
+	ctx context.Context, req *updateStatsRequest, info *plan2.InfoFromZoneMap, executor ConcurrentExecutor,
+) error {
 	start := time.Now()
 	defer func() {
 		v2.TxnStatementUpdateInfoFromZonemapHistogram.Observe(time.Since(start).Seconds())
@@ -627,14 +633,34 @@ func updateInfoFromZoneMap(ctx context.Context, req *updateStatsRequest, info *p
 		return err
 	}
 
-	onObjFn := func(obj logtailreplay.ObjectEntry) error {
+	type metaWrapper struct {
+		meta     objectio.ObjectMeta
+		blkCount uint32
+		index    int
+	}
+
+	var metas []metaWrapper
+	var updateMu sync.Mutex
+	onObjFn := func(objIndex int, obj logtailreplay.ObjectEntry) error {
 		location := obj.Location()
 		if objMeta, err = objectio.FastLoadObjectMeta(ctx, &location, false, fs); err != nil {
 			return err
 		}
+		updateMu.Lock()
+		defer updateMu.Unlock()
+		metas = append(metas, metaWrapper{
+			meta:     objMeta,
+			blkCount: obj.BlkCnt(),
+			index:    objIndex,
+		})
+		return nil
+	}
+
+	mergeFn := func(objMetaW metaWrapper) {
+		objMeta = objMetaW.meta
 		meta = objMeta.MustDataMeta()
 		info.AccurateObjectNumber++
-		info.BlockNumber += int64(obj.BlkCnt())
+		info.BlockNumber += int64(objMetaW.blkCount)
 		info.TableCnt += float64(meta.BlockHeader().Rows())
 		if !init {
 			init = true
@@ -687,11 +713,18 @@ func updateInfoFromZoneMap(ctx context.Context, req *updateStatsRequest, info *p
 				}
 			}
 		}
-		return nil
 	}
 
-	if err = ForeachVisibleDataObject(req.partitionState, req.ts, onObjFn); err != nil {
+	if err = ForeachVisibleDataObject(
+		req.partitionState,
+		req.ts,
+		onObjFn,
+		executor,
+	); err != nil {
 		return err
 	}
+	for _, m := range metas {
+		mergeFn(m)
+	}
 	return nil
 }
@@ -721,7 +754,9 @@ func adjustNDV(info *plan2.InfoFromZoneMap, tableDef *plan2.TableDef) {
 }
 
 // UpdateStats is the main function to calculate and update the stats for scan node.
-func UpdateStats(ctx context.Context, req *updateStatsRequest) error {
+func UpdateStats(
+	ctx context.Context, req *updateStatsRequest, executor ConcurrentExecutor,
+) error {
 	start := time.Now()
 	defer func() {
 		v2.TxnStatementUpdateStatsDurationHistogram.Observe(time.Since(start).Seconds())
@@ -737,11 +772,11 @@ func UpdateStats(ctx context.Context, req *updateStatsRequest) error {
 	if len(req.partitionsTableDef) > 0 {
 		for _, def := range req.partitionsTableDef {
 			req.tableDef = def
-			if err := updateInfoFromZoneMap(ctx, req, info); err != nil {
+			if err := updateInfoFromZoneMap(ctx, req, info, executor); err != nil {
 				return err
 			}
 		}
-	} else if err := updateInfoFromZoneMap(ctx, req, info); err != nil {
+	} else if err := updateInfoFromZoneMap(ctx, req, info, executor); err != nil {
 		return err
 	}
 	adjustNDV(info, baseTableDef)
diff --git a/pkg/vm/engine/disttae/txn_table.go b/pkg/vm/engine/disttae/txn_table.go
index 0e6ec10288099..3e3ec57389aae 100644
--- a/pkg/vm/engine/disttae/txn_table.go
+++ b/pkg/vm/engine/disttae/txn_table.go
@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"strconv"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"
 	"unsafe"
@@ -138,7 +139,7 @@ func (tbl *txnTable) stats(ctx context.Context) (*pb.StatsInfo, error) {
 		approxObjectNum,
 		stats,
 	)
-	if err := UpdateStats(ctx, req); err != nil {
+	if err := UpdateStats(ctx, req, nil); err != nil {
 		logutil.Errorf("failed to init stats info for table %d", tbl.tableId)
 		return nil, err
 	}
@@ -296,19 +297,35 @@ func (tbl *txnTable) Size(ctx context.Context, columnName string) (uint64, error
 func ForeachVisibleDataObject(
 	state *logtailreplay.PartitionState,
 	ts types.TS,
-	fn func(obj logtailreplay.ObjectEntry) error,
+	fn func(index int, obj logtailreplay.ObjectEntry) error,
+	executor ConcurrentExecutor,
 ) (err error) {
 	iter, err := state.NewObjectsIter(ts)
 	if err != nil {
 		return err
 	}
+	defer iter.Close()
+	var i int
+	var wg sync.WaitGroup
 	for iter.Next() {
+		var j = i
 		entry := iter.Entry()
-		if err = fn(entry); err != nil {
-			break
+		if executor != nil {
+			wg.Add(1)
+			executor.AppendTask(func() error {
+				defer wg.Done()
+				return fn(j, entry)
+			})
+		} else {
+			if err = fn(j, entry); err != nil {
+				break
+			}
 		}
+		i++
+	}
+	if executor != nil {
+		wg.Wait()
 	}
-	iter.Close()
 	return
 }
 
@@ -346,12 +363,15 @@ func (tbl *txnTable) MaxAndMinValues(ctx context.Context) ([][2]any, []uint8, er
 	if err != nil {
 		return nil, nil, err
 	}
-	onObjFn := func(obj logtailreplay.ObjectEntry) error {
+	var updateMu sync.Mutex
+	onObjFn := func(_ int, obj logtailreplay.ObjectEntry) error {
 		var err error
 		location := obj.Location()
 		if objMeta, err = objectio.FastLoadObjectMeta(ctx, &location, false, fs); err != nil {
 			return err
 		}
+		updateMu.Lock()
+		defer updateMu.Unlock()
 		meta = objMeta.MustDataMeta()
 		if inited {
 			for idx := range zms {
@@ -376,7 +396,9 @@ func (tbl *txnTable) MaxAndMinValues(ctx context.Context) ([][2]any, []uint8, er
 	if err = ForeachVisibleDataObject(
 		part,
 		types.TimestampToTS(tbl.db.op.SnapshotTS()),
-		onObjFn); err != nil {
+		onObjFn,
+		nil,
+	); err != nil {
 		return nil, nil, err
 	}
 
@@ -426,7 +448,8 @@ func (tbl *txnTable) GetColumMetadataScanInfo(ctx context.Context, name string)
 		return nil, err
 	}
 	infoList := make([]*plan.MetadataScanInfo, 0, state.ApproxObjectsNum())
-	onObjFn := func(obj logtailreplay.ObjectEntry) error {
+	var updateMu sync.Mutex
+	onObjFn := func(_ int, obj logtailreplay.ObjectEntry) error {
 		createTs, err := obj.CreateTime.Marshal()
 		if err != nil {
 			return err
 		}
@@ -460,6 +483,8 @@ func (tbl *txnTable) GetColumMetadataScanInfo(ctx context.Context, name string)
 		if err != nil {
 			return err
 		}
+		updateMu.Lock()
+		defer updateMu.Unlock()
 
 		meta := objMeta.MustDataMeta()
 		rowCnt := int64(meta.BlockHeader().Rows())
@@ -482,7 +507,12 @@ func (tbl *txnTable) GetColumMetadataScanInfo(ctx context.Context, name string)
 		return nil
 	}
 
-	if err = ForeachVisibleDataObject(state, types.TimestampToTS(tbl.db.op.SnapshotTS()), onObjFn); err != nil {
+	if err = ForeachVisibleDataObject(
+		state,
+		types.TimestampToTS(tbl.db.op.SnapshotTS()),
+		onObjFn,
+		nil,
+	); err != nil {
 		return nil, err
 	}
 
diff --git a/pkg/vm/engine/disttae/util.go b/pkg/vm/engine/disttae/util.go
index 4285f956e8649..3aa7434f80861 100644
--- a/pkg/vm/engine/disttae/util.go
+++ b/pkg/vm/engine/disttae/util.go
@@ -18,9 +18,6 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"regexp"
-	"strings"
-
 	"github.com/matrixorigin/matrixone/pkg/catalog"
 	"github.com/matrixorigin/matrixone/pkg/clusterservice"
 	"github.com/matrixorigin/matrixone/pkg/common/moerr"
@@ -45,6 +42,8 @@ import (
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
 	"github.com/matrixorigin/matrixone/pkg/vm/process"
 	"go.uber.org/zap"
+	"regexp"
+	"strings"
 )
 
 func compPkCol(colName string, pkName string) bool {
@@ -2596,3 +2595,59 @@ func CheckTxnIsValid(txnOp client.TxnOperator) (err error) {
 	_, err = txnIsValid(txnOp)
 	return err
 }
+
+// concurrentTask is the task that runs in the concurrent executor.
+type concurrentTask func() error
+
+// ConcurrentExecutor is an interface that runs tasks concurrently.
+type ConcurrentExecutor interface {
+	// AppendTask appends the concurrent task to the executor.
+	AppendTask(concurrentTask)
+	// Run starts the workers that receive and execute the appended tasks.
+	Run(context.Context)
+	// GetConcurrency returns the concurrency of this executor.
+	GetConcurrency() int
+}
+
+type concurrentExecutor struct {
+	// concurrency is the number of tasks that can run at the same time.
+	concurrency int
+	// tasks contains all the tasks waiting to be run.
+	tasks chan concurrentTask
+}
+
+func newConcurrentExecutor(concurrency int) ConcurrentExecutor {
+	return &concurrentExecutor{
+		concurrency: concurrency,
+		tasks:       make(chan concurrentTask, 2048),
+	}
+}
+
+// AppendTask implements the ConcurrentExecutor interface.
+func (e *concurrentExecutor) AppendTask(t concurrentTask) {
+	e.tasks <- t
+}
+
+// Run implements the ConcurrentExecutor interface.
+func (e *concurrentExecutor) Run(ctx context.Context) {
+	for i := 0; i < e.concurrency; i++ {
+		go func() {
+			for {
+				select {
+				case <-ctx.Done():
+					return
+
+				case t := <-e.tasks:
+					if err := t(); err != nil {
+						logutil.Errorf("failed to execute task: %v", err)
+					}
+				}
+			}
+		}()
+	}
+}
+
+// GetConcurrency implements the ConcurrentExecutor interface.
+func (e *concurrentExecutor) GetConcurrency() int {
+	return e.concurrency
+}
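
For reference, the sketch below distills the pattern this diff introduces: a fixed pool of workers draining a buffered task channel (the new ConcurrentExecutor in util.go), callers waiting on their own batch with a sync.WaitGroup (as ForeachVisibleDataObject now does when an executor is passed), and results collected into a mutex-guarded slice for a serial merge afterwards (the metaWrapper/mergeFn split in updateInfoFromZoneMap). It is a minimal, self-contained illustration rather than the matrixone code: simpleExecutor, newSimpleExecutor, and the squaring task are invented stand-ins, and worker errors are only logged, mirroring concurrentExecutor.Run.

package main

import (
	"context"
	"fmt"
	"log"
	"runtime"
	"sync"
)

// task mirrors concurrentTask: a unit of work that may fail.
type task func() error

// simpleExecutor is a stripped-down stand-in for the concurrentExecutor
// added in util.go: a fixed number of workers drain a buffered channel.
type simpleExecutor struct {
	concurrency int
	tasks       chan task
}

func newSimpleExecutor(concurrency int) *simpleExecutor {
	return &simpleExecutor{concurrency: concurrency, tasks: make(chan task, 128)}
}

// Run starts the workers; they exit when the context is canceled.
func (e *simpleExecutor) Run(ctx context.Context) {
	for i := 0; i < e.concurrency; i++ {
		go func() {
			for {
				select {
				case <-ctx.Done():
					return
				case t := <-e.tasks:
					if err := t(); err != nil {
						// Errors are only logged by the worker; callers that
						// need them must capture them inside the task.
						log.Printf("failed to execute task: %v", err)
					}
				}
			}
		}()
	}
}

// AppendTask queues a task for the worker pool.
func (e *simpleExecutor) AppendTask(t task) { e.tasks <- t }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	exec := newSimpleExecutor(runtime.GOMAXPROCS(0))
	exec.Run(ctx)

	// Collect-then-merge, as updateInfoFromZoneMap now does: the concurrent
	// part only produces values into a mutex-guarded slice; the merge is serial.
	var (
		mu      sync.Mutex
		results []int
		wg      sync.WaitGroup
	)
	for i := 0; i < 10; i++ {
		i := i
		wg.Add(1)
		exec.AppendTask(func() error {
			defer wg.Done()
			v := i * i // stands in for loading object metadata
			mu.Lock()
			defer mu.Unlock()
			results = append(results, v)
			return nil
		})
	}
	wg.Wait() // same role as the wg.Wait in ForeachVisibleDataObject

	sum := 0
	for _, v := range results {
		sum += v
	}
	fmt.Println("merged result:", sum)
}

In the change itself, the pool is sized as runtime.GOMAXPROCS(0) * s.updateWorkerFactor * 4 with a 2048-task buffer, and callers that pass a nil executor (as txn_table.go does) fall back to the original serial loop.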