Skip to content

Commit

Permalink
[enhancement] add concurrency when fetch objects of table (#17449)
Browse files Browse the repository at this point in the history
add concurrency when fetch objects of table

Approved by: @daviszhen, @XuPeng-SH, @reusee, @sukki37
  • Loading branch information
volgariver6 authored Jul 11, 2024
1 parent 424f10d commit 84ef665
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pkg/cnservice/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (c *Config) Validate() error {
}

if c.LogtailUpdateWorkerFactor == 0 {
c.LogtailUpdateWorkerFactor = 8
c.LogtailUpdateWorkerFactor = 4
}

if !metadata.ValidStateString(c.InitWorkState) {
Expand Down
7 changes: 3 additions & 4 deletions pkg/vm/engine/disttae/logtailreplay/partition_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/api"
"github.com/matrixorigin/matrixone/pkg/perfcounter"
txnTrace "github.com/matrixorigin/matrixone/pkg/txn/trace"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
)

type PartitionState struct {
Expand Down Expand Up @@ -477,9 +476,9 @@ func (p *PartitionState) HandleObjectInsert(ctx context.Context, bat *api.Batch,
p.objectIndexByTS.Set(e)
}
//prefetch the object meta
if err := blockio.PrefetchMeta(fs, objEntry.Location()); err != nil {
logutil.Errorf("prefetch object meta failed. %v", err)
}
// if err := blockio.PrefetchMeta(fs, objEntry.Location()); err != nil {
// logutil.Errorf("prefetch object meta failed. %v", err)
// }

p.dataObjects.Set(objEntry)
{
Expand Down
53 changes: 44 additions & 9 deletions pkg/vm/engine/disttae/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ type GlobalStats struct {

// KeyRouter is the router to decides which node should send to.
KeyRouter client.KeyRouter[pb.StatsInfoKey]

concurrentExecutor ConcurrentExecutor
}

func NewGlobalStats(
Expand All @@ -172,6 +174,8 @@ func NewGlobalStats(
for _, opt := range opts {
opt(s)
}
s.concurrentExecutor = newConcurrentExecutor(runtime.GOMAXPROCS(0) * s.updateWorkerFactor * 4)
s.concurrentExecutor.Run(ctx)
go s.consumeWorker(ctx)
go s.updateWorker(ctx)
return s
Expand Down Expand Up @@ -511,7 +515,7 @@ func (gs *GlobalStats) updateTableStats(key pb.StatsInfoKey) {
approxObjectNum,
stats,
)
if err := UpdateStats(gs.ctx, req); err != nil {
if err := UpdateStats(gs.ctx, req, gs.concurrentExecutor); err != nil {
logutil.Errorf("failed to init stats info for table %v, err: %v", key, err)
return
}
Expand Down Expand Up @@ -610,7 +614,9 @@ func getMinMaxValueByFloat64(typ types.Type, buf []byte) float64 {
}

// get ndv, minval , maxval, datatype from zonemap. Retrieve all columns except for rowid, return accurate number of objects
func updateInfoFromZoneMap(ctx context.Context, req *updateStatsRequest, info *plan2.InfoFromZoneMap) error {
func updateInfoFromZoneMap(
ctx context.Context, req *updateStatsRequest, info *plan2.InfoFromZoneMap, executor ConcurrentExecutor,
) error {
start := time.Now()
defer func() {
v2.TxnStatementUpdateInfoFromZonemapHistogram.Observe(time.Since(start).Seconds())
Expand All @@ -627,14 +633,34 @@ func updateInfoFromZoneMap(ctx context.Context, req *updateStatsRequest, info *p
return err
}

onObjFn := func(obj logtailreplay.ObjectEntry) error {
type metaWrapper struct {
meta objectio.ObjectMeta
blkCount uint32
index int
}

var metas []metaWrapper
var updateMu sync.Mutex
onObjFn := func(objIndex int, obj logtailreplay.ObjectEntry) error {
location := obj.Location()
if objMeta, err = objectio.FastLoadObjectMeta(ctx, &location, false, fs); err != nil {
return err
}
updateMu.Lock()
defer updateMu.Unlock()
metas = append(metas, metaWrapper{
meta: objMeta,
blkCount: obj.BlkCnt(),
index: objIndex,
})
return nil
}

mergeFn := func(objMetaW metaWrapper) {
objMeta = objMetaW.meta
meta = objMeta.MustDataMeta()
info.AccurateObjectNumber++
info.BlockNumber += int64(obj.BlkCnt())
info.BlockNumber += int64(objMetaW.blkCount)
info.TableCnt += float64(meta.BlockHeader().Rows())
if !init {
init = true
Expand Down Expand Up @@ -687,11 +713,18 @@ func updateInfoFromZoneMap(ctx context.Context, req *updateStatsRequest, info *p
}
}
}
return nil
}
if err = ForeachVisibleDataObject(req.partitionState, req.ts, onObjFn); err != nil {
if err = ForeachVisibleDataObject(
req.partitionState,
req.ts,
onObjFn,
executor,
); err != nil {
return err
}
for _, m := range metas {
mergeFn(m)
}

return nil
}
Expand Down Expand Up @@ -721,7 +754,9 @@ func adjustNDV(info *plan2.InfoFromZoneMap, tableDef *plan2.TableDef) {
}

// UpdateStats is the main function to calculate and update the stats for scan node.
func UpdateStats(ctx context.Context, req *updateStatsRequest) error {
func UpdateStats(
ctx context.Context, req *updateStatsRequest, executor ConcurrentExecutor,
) error {
start := time.Now()
defer func() {
v2.TxnStatementUpdateStatsDurationHistogram.Observe(time.Since(start).Seconds())
Expand All @@ -737,11 +772,11 @@ func UpdateStats(ctx context.Context, req *updateStatsRequest) error {
if len(req.partitionsTableDef) > 0 {
for _, def := range req.partitionsTableDef {
req.tableDef = def
if err := updateInfoFromZoneMap(ctx, req, info); err != nil {
if err := updateInfoFromZoneMap(ctx, req, info, executor); err != nil {
return err
}
}
} else if err := updateInfoFromZoneMap(ctx, req, info); err != nil {
} else if err := updateInfoFromZoneMap(ctx, req, info, executor); err != nil {
return err
}
adjustNDV(info, baseTableDef)
Expand Down
48 changes: 39 additions & 9 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
Expand Down Expand Up @@ -138,7 +139,7 @@ func (tbl *txnTable) stats(ctx context.Context) (*pb.StatsInfo, error) {
approxObjectNum,
stats,
)
if err := UpdateStats(ctx, req); err != nil {
if err := UpdateStats(ctx, req, nil); err != nil {
logutil.Errorf("failed to init stats info for table %d", tbl.tableId)
return nil, err
}
Expand Down Expand Up @@ -296,19 +297,35 @@ func (tbl *txnTable) Size(ctx context.Context, columnName string) (uint64, error
func ForeachVisibleDataObject(
state *logtailreplay.PartitionState,
ts types.TS,
fn func(obj logtailreplay.ObjectEntry) error,
fn func(index int, obj logtailreplay.ObjectEntry) error,
executor ConcurrentExecutor,
) (err error) {
iter, err := state.NewObjectsIter(ts)
if err != nil {
return err
}
defer iter.Close()
var i int
var wg sync.WaitGroup
for iter.Next() {
var j = i
entry := iter.Entry()
if err = fn(entry); err != nil {
break
if executor != nil {
wg.Add(1)
executor.AppendTask(func() error {
defer wg.Done()
return fn(j, entry)
})
} else {
if err = fn(j, entry); err != nil {
break
}
}
i++
}
if executor != nil {
wg.Wait()
}
iter.Close()
return
}

Expand Down Expand Up @@ -346,12 +363,15 @@ func (tbl *txnTable) MaxAndMinValues(ctx context.Context) ([][2]any, []uint8, er
if err != nil {
return nil, nil, err
}
onObjFn := func(obj logtailreplay.ObjectEntry) error {
var updateMu sync.Mutex
onObjFn := func(_ int, obj logtailreplay.ObjectEntry) error {
var err error
location := obj.Location()
if objMeta, err = objectio.FastLoadObjectMeta(ctx, &location, false, fs); err != nil {
return err
}
updateMu.Lock()
defer updateMu.Unlock()
meta = objMeta.MustDataMeta()
if inited {
for idx := range zms {
Expand All @@ -376,7 +396,9 @@ func (tbl *txnTable) MaxAndMinValues(ctx context.Context) ([][2]any, []uint8, er
if err = ForeachVisibleDataObject(
part,
types.TimestampToTS(tbl.db.op.SnapshotTS()),
onObjFn); err != nil {
onObjFn,
nil,
); err != nil {
return nil, nil, err
}

Expand Down Expand Up @@ -426,7 +448,8 @@ func (tbl *txnTable) GetColumMetadataScanInfo(ctx context.Context, name string)
return nil, err
}
infoList := make([]*plan.MetadataScanInfo, 0, state.ApproxObjectsNum())
onObjFn := func(obj logtailreplay.ObjectEntry) error {
var updateMu sync.Mutex
onObjFn := func(_ int, obj logtailreplay.ObjectEntry) error {
createTs, err := obj.CreateTime.Marshal()
if err != nil {
return err
Expand Down Expand Up @@ -460,6 +483,8 @@ func (tbl *txnTable) GetColumMetadataScanInfo(ctx context.Context, name string)
if err != nil {
return err
}
updateMu.Lock()
defer updateMu.Unlock()
meta := objMeta.MustDataMeta()
rowCnt := int64(meta.BlockHeader().Rows())

Expand All @@ -482,7 +507,12 @@ func (tbl *txnTable) GetColumMetadataScanInfo(ctx context.Context, name string)
return nil
}

if err = ForeachVisibleDataObject(state, types.TimestampToTS(tbl.db.op.SnapshotTS()), onObjFn); err != nil {
if err = ForeachVisibleDataObject(
state,
types.TimestampToTS(tbl.db.op.SnapshotTS()),
onObjFn,
nil,
); err != nil {
return nil, err
}

Expand Down
61 changes: 58 additions & 3 deletions pkg/vm/engine/disttae/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ import (
"bytes"
"context"
"fmt"
"regexp"
"strings"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/clusterservice"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand All @@ -45,6 +42,8 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/options"
"github.com/matrixorigin/matrixone/pkg/vm/process"
"go.uber.org/zap"
"regexp"
"strings"
)

func compPkCol(colName string, pkName string) bool {
Expand Down Expand Up @@ -2596,3 +2595,59 @@ func CheckTxnIsValid(txnOp client.TxnOperator) (err error) {
_, err = txnIsValid(txnOp)
return err
}

// concurrentTask is the task that runs in the concurrent executor.
type concurrentTask func() error

// ConcurrentExecutor is an interface that runs tasks concurrently.
type ConcurrentExecutor interface {
// AppendTask append the concurrent task to the exuecutor.
AppendTask(concurrentTask)
// Run starts receive task to execute.
Run(context.Context)
// GetConcurrency returns the concurrency of this executor.
GetConcurrency() int
}

type concurrentExecutor struct {
// concurrency is the concurrency to run the tasks at the same time.
concurrency int
// task contains all the tasks needed to run.
tasks chan concurrentTask
}

func newConcurrentExecutor(concurrency int) ConcurrentExecutor {
return &concurrentExecutor{
concurrency: concurrency,
tasks: make(chan concurrentTask, 2048),
}
}

// AppendTask implements the ConcurrentExecutor interface.
func (e *concurrentExecutor) AppendTask(t concurrentTask) {
e.tasks <- t
}

// Run implements the ConcurrentExecutor interface.
func (e *concurrentExecutor) Run(ctx context.Context) {
for i := 0; i < e.concurrency; i++ {
go func() {
for {
select {
case <-ctx.Done():
return

case t := <-e.tasks:
if err := t(); err != nil {
logutil.Errorf("failed to execute task: %v", err)
}
}
}
}()
}
}

// GetConcurrency implements the ConcurrentExecutor interface.
func (e *concurrentExecutor) GetConcurrency() int {
return e.concurrency
}

0 comments on commit 84ef665

Please sign in to comment.