Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 82 additions & 11 deletions backend/api/entities/v1alpha/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math"
"regexp"
Expand Down Expand Up @@ -213,6 +214,19 @@ func (task *discoveryTask) start(api *Server) {
}
}

var qGetLatestBlockChange = dqb.Str(`
SELECT
blob_id,
version,
block_id,
ts
from fts_index
WHERE type IN ('title', 'document', 'meta')
AND ts >= :Ts
AND genesis_blob = :genesisBlobID
AND rowid != :rowID
ORDER BY ts ASC
`)
var qGetFTS = dqb.Str(`
WITH fts_top100 AS (
SELECT
Expand All @@ -223,7 +237,8 @@ WITH fts_top100 AS (
fts.blob_id,
structural_blobs.genesis_blob,
structural_blobs.extra_attrs->>'tsid' AS tsid,
fts.rank
fts.rank,
fts.rowid
FROM fts
JOIN structural_blobs
ON structural_blobs.id = fts.blob_id
Expand Down Expand Up @@ -265,7 +280,9 @@ SELECT
JOIN blobs AS b2
ON b2.id = a.value
) AS heads,
structural_blobs.ts
structural_blobs.ts,
structural_blobs.genesis_blob,
f.rowid
FROM fts_top100 AS f
JOIN structural_blobs
ON structural_blobs.id = f.blob_id
Expand Down Expand Up @@ -330,6 +347,8 @@ type searchResult struct {
docID string
blobCID string
blobID int64
genesisBlobID int64
rowID int64
contentType string
version string
versionTime *timestamppb.Timestamp
Expand All @@ -338,6 +357,10 @@ type searchResult struct {

// SearchEntities implements the Fuzzy search of entities.
func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntitiesRequest) (*entities.SearchEntitiesResponse, error) {
//start := time.Now()
//defer func() {
// fmt.Println("SearchEntities duration:", time.Since(start))
//s}()
searchResults := []searchResult{}
type value struct {
Value string `json:"v"`
Expand Down Expand Up @@ -460,6 +483,11 @@ func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntiti

ts := hlc.Timestamp(stmt.ColumnInt64(14) * 1000).Time()
res.versionTime = timestamppb.New(ts)
res.genesisBlobID = stmt.ColumnInt64(15)
if res.genesisBlobID == 0 {
res.genesisBlobID = res.blobID
}
res.rowID = stmt.ColumnInt64(16)
if res.contentType == "comment" {
res.iri = "hm://" + res.owner + "/" + res.tsid
} else if res.contentType == "contact" {
Expand Down Expand Up @@ -496,7 +524,7 @@ func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntiti
key := fmt.Sprintf("%s|%s|%s|%s", res.iri, res.blockID, res.rawContent, res.contentType)
if idx, ok := seen[key]; ok {
// duplicate – compare blobID
if res.blobID > uniqueResults[idx].blobID {
if res.versionTime.AsTime().After(uniqueResults[idx].versionTime.AsTime()) {
uniqueResults[idx] = res
bm := bodyMatches[i]
bm.Index = idx
Expand All @@ -517,7 +545,7 @@ func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntiti

//after := time.Now()
//elapsed := after.Sub(before)
//fmt.Printf("qGetFTS took %.9f s and returned %d results\n", elapsed.Seconds(), len(bodyMatches))
//fmt.Printf("qGetFTS took %.3f s and returned %d results\n", elapsed.Seconds(), len(bodyMatches))
matchingEntities := []*entities.Entity{}
getParentsFcn := func(match fuzzy.Match) ([]string, error) {
parents := make(map[string]interface{})
Expand Down Expand Up @@ -551,11 +579,11 @@ func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntiti
}
return parentTitles, nil
}
//before = time.Now()
//totalGetParentsTime := time.Duration(0)
//totalLatestBlockTime := time.Duration(0)
//var timesCalled int

totalLatestBlockTime := time.Duration(0)
timesCalled := 0
iter := 0
//prevIter := 0
for _, match := range bodyMatches {
//startParents := time.Now()
var parentTitles []string
Expand All @@ -574,6 +602,50 @@ func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntiti
id := searchResults[match.Index].iri

if searchResults[match.Index].version != "" && searchResults[match.Index].contentType != "comment" {

startLatestBlockTime := time.Now()
type Change struct {
blobID int64
version string
ts *timestamppb.Timestamp
}
latestUnrelated := Change{
blobID: searchResults[match.Index].blobID,
version: searchResults[match.Index].version,
ts: searchResults[match.Index].versionTime,
}

var errSameBlockChangeDetected = errors.New("same block change detected")
if latestUnrelated.version != searchResults[match.Index].latestVersion {
timesCalled++
//prevIter = iter
if err := srv.db.WithSave(ctx, func(conn *sqlite.Conn) error {
return sqlitex.Exec(conn, qGetLatestBlockChange(), func(stmt *sqlite.Stmt) error {
iter++
ts := hlc.Timestamp(stmt.ColumnInt64(3) * 1000).Time()
blockID := stmt.ColumnText(2)
currentChange := Change{
blobID: stmt.ColumnInt64(0),
version: stmt.ColumnText(1),
ts: timestamppb.New(ts),
}
if blockID == searchResults[match.Index].blockID {
return errSameBlockChangeDetected
}
latestUnrelated = currentChange
return nil
}, searchResults[match.Index].versionTime.Seconds*1_000+int64(searchResults[match.Index].versionTime.Nanos)/1_000_000, searchResults[match.Index].genesisBlobID, searchResults[match.Index].rowID)
}); err != nil && !errors.Is(err, errSameBlockChangeDetected) {
return nil, err
}
//if iter == prevIter {
// fmt.Println("No iteration", searchResults[match.Index].contentType, searchResults[match.Index].versionTime.Seconds*1_000+int64(searchResults[match.Index].versionTime.Nanos)/1_000_000, searchResults[match.Index].genesisBlobID, searchResults[match.Index].blockID, searchResults[match.Index].blobID)
//}
}
searchResults[match.Index].version = latestUnrelated.version
searchResults[match.Index].blobID = latestUnrelated.blobID
searchResults[match.Index].versionTime = latestUnrelated.ts
totalLatestBlockTime += time.Since(startLatestBlockTime)
if searchResults[match.Index].latestVersion == searchResults[match.Index].version {
searchResults[match.Index].version += "&l"
}
Expand Down Expand Up @@ -603,9 +675,8 @@ func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntiti
}
//after = time.Now()

//fmt.Printf("getParentsFcn took %.9f s\n", after.Sub(before).Seconds())
//fmt.Printf("getParentsFcn took %.9f s\n", totalGetParentsTime.Seconds())
//fmt.Printf("qGetLatestBlockChange took %.9f s and was called %d times\n", totalLatestBlockTime.Seconds(), timesCalled)
//fmt.Printf("getParentsFcn took %.3f s\n", totalGetParentsTime.Seconds())
//fmt.Printf("qGetLatestBlockChange took %.3f s and was called %d times and iterated over %d records\n", totalLatestBlockTime.Seconds(), timesCalled, iter)

sort.Slice(matchingEntities, func(i, j int) bool {
a, b := matchingEntities[i], matchingEntities[j]
Expand Down
25 changes: 13 additions & 12 deletions backend/blob/blob_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,15 +418,15 @@ func indexChange(ictx *indexingCtx, id int64, eb Encoded[*Change]) error {
if !ok {
continue
}

ftsType := "meta"
// TODO(hm24): index other relevant metadata for list response and so on.
if extra.Title == "" && (k == "title" || k == "name" || k == "alias") {
if k == "title" || k == "name" || k == "alias" {
extra.Title = vs
if err := dbFTSInsertOrReplace(ictx.conn, vs, "title", id, "", sb.CID.String()); err != nil {
return fmt.Errorf("failed to insert record in fts table: %w", err)
}
ftsType = "title"
}
if err := dbFTSInsertOrReplace(ictx.conn, vs, ftsType, id, "", sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil {
return fmt.Errorf("failed to insert record in fts table: %w", err)
}

u, err := url.Parse(vs)
if err != nil {
continue
Expand Down Expand Up @@ -458,13 +458,14 @@ func indexChange(ictx *indexingCtx, id int64, eb Encoded[*Change]) error {
vs, isStr := kv.Value.(string)
if len(kv.Key) == 1 && isStr {
k := kv.Key[0]

ftsKey := "meta"
// TODO(hm24): index other relevant metadata for list response and so on.
if extra.Title == "" && (k == "title" || k == "name" || k == "alias") {
if k == "title" || k == "name" || k == "alias" {
extra.Title = vs
if err := dbFTSInsertOrReplace(ictx.conn, vs, "title", id, "", sb.CID.String()); err != nil {
return fmt.Errorf("failed to insert record in fts table: %w", err)
}
ftsKey = "title"
}
if err := dbFTSInsertOrReplace(ictx.conn, vs, ftsKey, id, "", sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil {
return fmt.Errorf("failed to insert record in fts table: %w", err)
}
}

Expand Down Expand Up @@ -495,7 +496,7 @@ func indexChange(ictx *indexingCtx, id int64, eb Encoded[*Change]) error {
return err
}
}
if err := dbFTSInsertOrReplace(ictx.conn, blk.Text, "document", id, blk.ID(), sb.CID.String()); err != nil {
if err := dbFTSInsertOrReplace(ictx.conn, blk.Text, "document", id, blk.ID(), sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil {
return fmt.Errorf("failed to insert record in fts table: %w", err)
}
}
Expand Down
8 changes: 4 additions & 4 deletions backend/blob/blob_comment.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,11 @@ func indexComment(ictx *indexingCtx, id int64, eb Encoded[*Comment]) error {
}
ftsBlkID = blk.ID()
ftsContent = blk.Text
if ftsContent != "" {
if err := dbFTSInsertOrReplace(ictx.conn, ftsContent, ftsType, id, ftsBlkID, sb.CID.String()); err != nil {
return fmt.Errorf("failed to insert record in fts table: %w", err)
}
//if ftsContent != "" {
if err := dbFTSInsertOrReplace(ictx.conn, ftsContent, ftsType, id, ftsBlkID, sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil {
return fmt.Errorf("failed to insert record in fts table: %w", err)
}
//}
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion backend/blob/blob_contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func indexContact(ictx *indexingCtx, id int64, eb Encoded[*Contact]) error {
}
extraAttrs["subject"] = subjectID
extraAttrs["name"] = v.Name
if err := dbFTSInsertOrReplace(ictx.conn, v.Name, "contact", id, "", sb.CID.String()); err != nil {
if err := dbFTSInsertOrReplace(ictx.conn, v.Name, "contact", id, "", sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil {
return fmt.Errorf("failed to insert record in fts table: %w", err)
}
} else {
Expand Down
8 changes: 4 additions & 4 deletions backend/blob/blob_profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,10 @@ func indexProfile(ictx *indexingCtx, id int64, eb Encoded[*Profile]) error {
return fmt.Errorf("failed to save structural blob: %w", err)
}

if ftsContent != "" {
if err := dbFTSInsertOrReplace(ictx.conn, ftsContent, ftsType, id, "", sb.CID.String()); err != nil {
return fmt.Errorf("failed to insert record in fts table: %w", err)
}
//if ftsContent != "" {
if err := dbFTSInsertOrReplace(ictx.conn, ftsContent, ftsType, id, "", sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil {
return fmt.Errorf("failed to insert record in fts table: %w", err)
}
//}
return nil
}
Loading
Loading