Skip to content

Commit

Permalink
GO-3861 cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
requilence committed Oct 4, 2024
1 parent 4851588 commit 7869711
Show file tree
Hide file tree
Showing 6 changed files with 447 additions and 445 deletions.
69 changes: 3 additions & 66 deletions core/indexer/fulltext.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,13 @@ import (
"strings"
"time"

"github.com/gogo/protobuf/types"
"golang.org/x/exp/slices"

"github.com/anyproto/anytype-heart/core/block/cache"
smartblock2 "github.com/anyproto/anytype-heart/core/block/editor/smartblock"
"github.com/anyproto/anytype-heart/core/block/simple"
"github.com/anyproto/anytype-heart/core/block/simple/text"
"github.com/anyproto/anytype-heart/core/domain"
"github.com/anyproto/anytype-heart/core/subscription"
"github.com/anyproto/anytype-heart/core/syncstatus/syncsubscriptions"
"github.com/anyproto/anytype-heart/metrics"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
"github.com/anyproto/anytype-heart/pkg/lib/localstore/ftsearch"
Expand All @@ -39,83 +36,23 @@ func (i *indexer) ForceFTIndex() {
}
}

// getSpaceIdsByPriority collects the target space id of every record held by
// lastSpacesSubscription, in the order the subscription iterates them.
// Records whose target space id is empty are skipped.
func (i *indexer) getSpaceIdsByPriority() []string {
	targetSpaceKey := bundle.RelationKeyTargetSpaceId.String()
	spaceIds := make([]string, 0, i.lastSpacesSubscription.Len())

	collect := func(_ string, details *types.Struct) bool {
		if spaceId := pbtypes.GetString(details, targetSpaceKey); spaceId != "" {
			spaceIds = append(spaceIds, spaceId)
		}
		// always continue with the next record
		return true
	}
	i.lastSpacesSubscription.Iterate(collect)

	log.Warnf("ft space ids priority: %v", spaceIds)
	return spaceIds
}

// updateSpacesPriority stores a new space priority order and then calls
// RefreshPriority on the reindex queue. The tech space id is always placed
// first; any occurrence of it inside priority is dropped.
// Note that slices.DeleteFunc filters priority in place.
func (i *indexer) updateSpacesPriority(priority []string) {
	techSpaceId := i.techSpaceId.TechSpaceId()
	isTechSpace := func(spaceId string) bool {
		return spaceId == techSpaceId
	}

	i.lock.Lock()
	rest := slices.DeleteFunc(priority, isTechSpace)
	i.spacesPriority = append([]string{techSpaceId}, rest...)
	i.lock.Unlock()

	i.spaceReindexQueue.RefreshPriority()
}

// subscribeToSpaces creates the internal "lastOpenedSpaces" subscription and
// runs it, passing i.lastSpacesSubscriptionUpdateChan to Run.
// The request selects objects whose layout equals spaceView and sorts them by
// last opened date, descending, with empty dates placed at the end.
func (i *indexer) subscribeToSpaces() error {
	// only space view objects are of interest
	spaceViewFilter := &model.BlockContentDataviewFilter{
		RelationKey: bundle.RelationKeyLayout.String(),
		Condition:   model.BlockContentDataviewFilter_Equal,
		Value:       pbtypes.Int64(int64(model.ObjectType_spaceView)),
	}
	// most recently opened first
	lastOpenedSort := &model.BlockContentDataviewSort{
		RelationKey:    bundle.RelationKeyLastOpenedDate.String(),
		Type:           model.BlockContentDataviewSort_Desc,
		IncludeTime:    true,
		Format:         model.RelationFormat_date,
		EmptyPlacement: model.BlockContentDataviewSort_End,
	}
	keys := []string{
		bundle.RelationKeyTargetSpaceId.String(),
		bundle.RelationKeyLastOpenedDate.String(),
		bundle.RelationKeyLastModifiedDate.String(),
	}

	req := subscription.SubscribeRequest{
		SubId:             "lastOpenedSpaces",
		Internal:          true,
		NoDepSubscription: true,
		Keys:              keys,
		Filters:           []*model.BlockContentDataviewFilter{spaceViewFilter},
		Sorts:             []*model.BlockContentDataviewSort{lastOpenedSort},
	}

	i.lastSpacesSubscription = syncsubscriptions.NewSubscription(i.subscriptionService, req)
	return i.lastSpacesSubscription.Run(i.lastSpacesSubscriptionUpdateChan)
}

// getIterator returns an iteration callback that accepts an id and an empty
// payload and always returns true.
//
// The previous implementation appended every id to a slice captured by the
// closure, but nothing could ever read that slice, so it only grew memory.
// The accumulation is removed; the callback's observable result is unchanged.
func (i *indexer) getIterator() func(id string, data struct{}) bool {
	return func(string, struct{}) bool {
		return true
	}
}

// ftLoopRoutine runs the full-text indexer loop
// MUST NOT be called more than once
func (i *indexer) ftLoopRoutine() {
ticker := time.NewTicker(ftIndexInterval)

i.runFullTextIndexer(i.componentCtx, i.getSpaceIdsByPriority())
i.runFullTextIndexer(i.componentCtx, i.getSpacesPriority())
defer close(i.ftQueueFinished)
var lastForceIndex time.Time
for {
select {
case <-i.componentCtx.Done():
return
case <-ticker.C:
i.runFullTextIndexer(i.componentCtx, i.getSpaceIdsByPriority())
i.runFullTextIndexer(i.componentCtx, i.getSpacesPriority())
case <-i.forceFt:
if time.Since(lastForceIndex) > ftIndexForceMinInterval {
i.runFullTextIndexer(i.componentCtx, i.getSpaceIdsByPriority())
i.runFullTextIndexer(i.componentCtx, i.getSpacesPriority())
lastForceIndex = time.Now()
}
}
Expand Down
75 changes: 63 additions & 12 deletions core/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,13 @@ type indexer struct {
btHash Hasher
forceFt chan struct{}

mb *mb.MB[clientspace.Space]
lastSpacesSubscription *syncsubscriptions.ObjectSubscription[*types.Struct]
lastSpacesSubscriptionUpdateChan chan []*types.Struct
lock sync.Mutex
reindexLogFields []zap.Field
techSpaceId techSpaceIdGetter
spacesPriority []string
spaceReindexQueue *taskmanager.TasksManager
mb *mb.MB[clientspace.Space]
lastSpacesSubscription *syncsubscriptions.ObjectSubscription[*types.Struct]
lock sync.Mutex
reindexLogFields []zap.Field
techSpaceId techSpaceIdGetter
spacesPriority []string
spaceReindexQueue *taskmanager.TasksManager
}

func (i *indexer) Init(a *app.App) (err error) {
Expand All @@ -104,9 +103,8 @@ func (i *indexer) Init(a *app.App) (err error) {
i.config = app.MustComponent[*config.Config](a)
i.subscriptionService = app.MustComponent[subscription.Service](a)
i.mb = mb.New[clientspace.Space](0)
i.spaceReindexQueue = taskmanager.NewTasksManager(1, i.taskPrioritySorter)
i.spaceReindexQueue = taskmanager.NewTasksManager(1, i.reindexTasksSorter)
i.techSpaceId = app.MustComponent[techSpaceIdGetter](a)
i.lastSpacesSubscriptionUpdateChan = make(chan []*types.Struct)
i.componentCtx, i.componentCancel = context.WithCancel(context.Background())
return
}
Expand All @@ -116,12 +114,11 @@ func (i *indexer) Name() (name string) {
}

func (i *indexer) Run(context.Context) (err error) {
i.runReindexerQueue()

err = i.subscribeToSpaces()
if err != nil {
return
}
go i.spaceReindexQueue.Run(i.componentCtx)
return i.StartFullTextIndex()
}

Expand Down Expand Up @@ -276,3 +273,57 @@ func headsHash(heads []string) string {
sum := sha256.Sum256([]byte(strings.Join(heads, ",")))
return fmt.Sprintf("%x", sum)
}

// subscribeToSpaces starts the internal "lastOpenedSpaces" subscription.
// It is used by the full-text indexer and by reindexing to prioritize the most
// recently opened spaces.
//
// The request selects objects whose layout equals spaceView and sorts them by
// last opened date, descending, with empty dates placed at the end. Updates
// are read by spacesSubscriptionChanWatcher, which is started in its own
// goroutine before the subscription is run.
func (i *indexer) subscribeToSpaces() error {
	// only space view objects are of interest
	spaceViewFilter := &model.BlockContentDataviewFilter{
		RelationKey: bundle.RelationKeyLayout.String(),
		Condition:   model.BlockContentDataviewFilter_Equal,
		Value:       pbtypes.Int64(int64(model.ObjectType_spaceView)),
	}
	// most recently opened first
	lastOpenedSort := &model.BlockContentDataviewSort{
		RelationKey:    bundle.RelationKeyLastOpenedDate.String(),
		Type:           model.BlockContentDataviewSort_Desc,
		IncludeTime:    true,
		Format:         model.RelationFormat_date,
		EmptyPlacement: model.BlockContentDataviewSort_End,
	}
	keys := []string{
		bundle.RelationKeyTargetSpaceId.String(),
		bundle.RelationKeyLastOpenedDate.String(),
		bundle.RelationKeyLastModifiedDate.String(),
	}

	req := subscription.SubscribeRequest{
		SubId:             "lastOpenedSpaces",
		Internal:          true,
		NoDepSubscription: true,
		Keys:              keys,
		Filters:           []*model.BlockContentDataviewFilter{spaceViewFilter},
		Sorts:             []*model.BlockContentDataviewSort{lastOpenedSort},
	}

	// the watcher must already be receiving when Run is called
	updates := make(chan []*types.Struct)
	go i.spacesSubscriptionChanWatcher(updates)

	i.lastSpacesSubscription = syncsubscriptions.NewSubscription(i.subscriptionService, req)
	return i.lastSpacesSubscription.Run(updates)
}

// spacesSubscriptionSetPriority stores a new space priority order and notifies
// the reindex queue about it. The tech space id is always placed first; any
// occurrence of it inside priority is dropped.
//
// priority itself is left untouched: slices.DeleteFunc rewrites its argument
// in place, so the filtering is done on a clone instead of the caller's slice.
func (i *indexer) spacesSubscriptionSetPriority(priority []string) {
	techSpaceId := i.techSpaceId.TechSpaceId()

	// build the new order before taking the lock; only the assignment needs it
	rest := slices.DeleteFunc(slices.Clone(priority), func(s string) bool {
		return s == techSpaceId
	})
	ordered := append([]string{techSpaceId}, rest...)

	i.lock.Lock()
	i.spacesPriority = ordered
	i.lock.Unlock()

	// notify the queue about the priority change
	i.spaceReindexQueue.RefreshPriority()
}

// spacesSubscriptionChanWatcher receives record batches from ch and turns each
// one into a new spaces priority via spacesSubscriptionSetPriority.
// It returns once componentCtx is done.
//
// NOTE(review): a receive from a closed channel yields a nil batch
// immediately, so this loop would spin if ch were ever closed — confirm the
// subscription never closes it.
func (i *indexer) spacesSubscriptionChanWatcher(ch chan []*types.Struct) {
	targetSpaceKey := bundle.RelationKeyTargetSpaceId.String()
	for {
		select {
		case <-i.componentCtx.Done():
			return
		case batch := <-ch:
			spaceIds := pbtypes.ExtractString(batch, targetSpaceKey, true)
			i.spacesSubscriptionSetPriority(spaceIds)
		}
	}
}
Loading

0 comments on commit 7869711

Please sign in to comment.