From 20993f43498bcd3dbd864c2358a42fe01fe2fa72 Mon Sep 17 00:00:00 2001 From: juligasa <11684004+juligasa@users.noreply.github.com> Date: Fri, 25 Jul 2025 01:58:31 +0200 Subject: [PATCH 1/3] wip --- backend/api/entities/v1alpha/entities.go | 88 ++++++++++-- backend/blob/blob_change.go | 6 +- backend/blob/blob_comment.go | 2 +- backend/blob/blob_contact.go | 2 +- backend/blob/blob_profile.go | 2 +- backend/blob/index_sql.go | 168 +++++++++++++++++------ backend/storage/schema.gen.go | 30 ++-- backend/storage/schema.gensum | 4 +- backend/storage/schema.sql | 10 +- backend/storage/storage_migrations.go | 27 ++++ 10 files changed, 264 insertions(+), 75 deletions(-) diff --git a/backend/api/entities/v1alpha/entities.go b/backend/api/entities/v1alpha/entities.go index 56877db95..ced0ee9aa 100644 --- a/backend/api/entities/v1alpha/entities.go +++ b/backend/api/entities/v1alpha/entities.go @@ -6,6 +6,7 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" + "errors" "fmt" "math" "regexp" @@ -213,6 +214,20 @@ func (task *discoveryTask) start(api *Server) { } } +var qGetLatestBlockChange = dqb.Str(` +SELECT + blob_id, + version, + block_id, + ts + from fts_index + -- WHERE type = :entityType + WHERE type IN ('title', 'document') + AND ts >= :Ts + AND genesis_blob = :genesisBlobID + AND blob_id != :blobID + ORDER BY ts ASC +`) var qGetFTS = dqb.Str(` WITH fts_top100 AS ( SELECT @@ -265,7 +280,8 @@ SELECT JOIN blobs AS b2 ON b2.id = a.value ) AS heads, - structural_blobs.ts + structural_blobs.ts, + structural_blobs.genesis_blob FROM fts_top100 AS f JOIN structural_blobs ON structural_blobs.id = f.blob_id @@ -330,6 +346,7 @@ type searchResult struct { docID string blobCID string blobID int64 + genesisBlobID int64 contentType string version string versionTime *timestamppb.Timestamp @@ -338,6 +355,10 @@ type searchResult struct { // SearchEntities implements the Fuzzy search of entities. func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntitiesRequest) (*entities.SearchEntitiesResponse, error) { + //start := time.Now() + //defer func() { + // fmt.Println("SearchEntities duration:", time.Since(start)) + //s}() searchResults := []searchResult{} type value struct { Value string `json:"v"` @@ -460,6 +481,10 @@ func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntiti ts := hlc.Timestamp(stmt.ColumnInt64(14) * 1000).Time() res.versionTime = timestamppb.New(ts) + res.genesisBlobID = stmt.ColumnInt64(15) + if res.genesisBlobID == 0 { + res.genesisBlobID = res.blobID + } if res.contentType == "comment" { res.iri = "hm://" + res.owner + "/" + res.tsid } else if res.contentType == "contact" { @@ -496,7 +521,7 @@ func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntiti key := fmt.Sprintf("%s|%s|%s|%s", res.iri, res.blockID, res.rawContent, res.contentType) if idx, ok := seen[key]; ok { // duplicate – compare blobID - if res.blobID > uniqueResults[idx].blobID { + if res.versionTime.AsTime().After(uniqueResults[idx].versionTime.AsTime()) { uniqueResults[idx] = res bm := bodyMatches[i] bm.Index = idx @@ -517,7 +542,7 @@ func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntiti //after := time.Now() //elapsed := after.Sub(before) - //fmt.Printf("qGetFTS took %.9f s and returned %d results\n", elapsed.Seconds(), len(bodyMatches)) + //fmt.Printf("qGetFTS took %.3f s and returned %d results\n", elapsed.Seconds(), len(bodyMatches)) matchingEntities := []*entities.Entity{} getParentsFcn := func(match fuzzy.Match) ([]string, error) { parents := make(map[string]interface{}) @@ -551,11 +576,11 @@ func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntiti } return parentTitles, nil } - //before = time.Now() //totalGetParentsTime := time.Duration(0) - //totalLatestBlockTime := time.Duration(0) - //var timesCalled int - + totalLatestBlockTime := time.Duration(0) + timesCalled := 0 + iter := 0 + prevIter := 0 for _, match := range bodyMatches { //startParents := time.Now() var parentTitles []string @@ -574,6 +599,50 @@ func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntiti id := searchResults[match.Index].iri if searchResults[match.Index].version != "" && searchResults[match.Index].contentType != "comment" { + + startLatestBlockTime := time.Now() + type Change struct { + blobID int64 + version string + ts *timestamppb.Timestamp + } + latestUnrelated := Change{ + blobID: searchResults[match.Index].blobID, + version: searchResults[match.Index].version, + ts: searchResults[match.Index].versionTime, + } + + var errSameBlockChangeDetected = errors.New("same block change detected") + if latestUnrelated.version != searchResults[match.Index].latestVersion { + timesCalled++ + prevIter = iter + if err := srv.db.WithSave(ctx, func(conn *sqlite.Conn) error { + return sqlitex.Exec(conn, qGetLatestBlockChange(), func(stmt *sqlite.Stmt) error { + iter++ + ts := hlc.Timestamp(stmt.ColumnInt64(3) * 1000).Time() + blockID := stmt.ColumnText(2) + currentChange := Change{ + blobID: stmt.ColumnInt64(0), + version: stmt.ColumnText(1), + ts: timestamppb.New(ts), + } + if blockID == searchResults[match.Index].blockID { + return errSameBlockChangeDetected + } + latestUnrelated = currentChange + return nil + }, searchResults[match.Index].versionTime.Seconds*1_000+int64(searchResults[match.Index].versionTime.Nanos)/1_000_000, searchResults[match.Index].genesisBlobID, searchResults[match.Index].blobID) + }); err != nil && !errors.Is(err, errSameBlockChangeDetected) { + return nil, err + } + if iter == prevIter { + fmt.Println("No iteration", searchResults[match.Index].contentType, searchResults[match.Index].versionTime.Seconds*1_000+int64(searchResults[match.Index].versionTime.Nanos)/1_000_000, searchResults[match.Index].genesisBlobID) + } + } + searchResults[match.Index].version = latestUnrelated.version + searchResults[match.Index].blobID = latestUnrelated.blobID + searchResults[match.Index].versionTime = latestUnrelated.ts + totalLatestBlockTime += time.Since(startLatestBlockTime) if searchResults[match.Index].latestVersion == searchResults[match.Index].version { searchResults[match.Index].version += "&l" } @@ -603,9 +672,8 @@ func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntiti } //after = time.Now() - //fmt.Printf("getParentsFcn took %.9f s\n", after.Sub(before).Seconds()) - //fmt.Printf("getParentsFcn took %.9f s\n", totalGetParentsTime.Seconds()) - //fmt.Printf("qGetLatestBlockChange took %.9f s and was called %d times\n", totalLatestBlockTime.Seconds(), timesCalled) + //fmt.Printf("getParentsFcn took %.3f s\n", totalGetParentsTime.Seconds()) + fmt.Printf("qGetLatestBlockChange took %.3f s and was called %d times and iterated over %d records\n", totalLatestBlockTime.Seconds(), timesCalled, iter) sort.Slice(matchingEntities, func(i, j int) bool { a, b := matchingEntities[i], matchingEntities[j] diff --git a/backend/blob/blob_change.go b/backend/blob/blob_change.go index 888c8460d..82ce3a1b2 100644 --- a/backend/blob/blob_change.go +++ b/backend/blob/blob_change.go @@ -422,7 +422,7 @@ func indexChange(ictx *indexingCtx, id int64, eb Encoded[*Change]) error { // TODO(hm24): index other relevant metadata for list response and so on. if extra.Title == "" && (k == "title" || k == "name" || k == "alias") { extra.Title = vs - if err := dbFTSInsertOrReplace(ictx.conn, vs, "title", id, "", sb.CID.String()); err != nil { + if err := dbFTSInsertOrReplace(ictx.conn, vs, "title", id, "", sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil { return fmt.Errorf("failed to insert record in fts table: %w", err) } } @@ -462,7 +462,7 @@ func indexChange(ictx *indexingCtx, id int64, eb Encoded[*Change]) error { // TODO(hm24): index other relevant metadata for list response and so on. if extra.Title == "" && (k == "title" || k == "name" || k == "alias") { extra.Title = vs - if err := dbFTSInsertOrReplace(ictx.conn, vs, "title", id, "", sb.CID.String()); err != nil { + if err := dbFTSInsertOrReplace(ictx.conn, vs, "title", id, "", sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil { return fmt.Errorf("failed to insert record in fts table: %w", err) } } @@ -495,7 +495,7 @@ func indexChange(ictx *indexingCtx, id int64, eb Encoded[*Change]) error { return err } } - if err := dbFTSInsertOrReplace(ictx.conn, blk.Text, "document", id, blk.ID(), sb.CID.String()); err != nil { + if err := dbFTSInsertOrReplace(ictx.conn, blk.Text, "document", id, blk.ID(), sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil { return fmt.Errorf("failed to insert record in fts table: %w", err) } } diff --git a/backend/blob/blob_comment.go b/backend/blob/blob_comment.go index 86b5239fc..719ec9ac8 100644 --- a/backend/blob/blob_comment.go +++ b/backend/blob/blob_comment.go @@ -289,7 +289,7 @@ func indexComment(ictx *indexingCtx, id int64, eb Encoded[*Comment]) error { ftsBlkID = blk.ID() ftsContent = blk.Text if ftsContent != "" { - if err := dbFTSInsertOrReplace(ictx.conn, ftsContent, ftsType, id, ftsBlkID, sb.CID.String()); err != nil { + if err := dbFTSInsertOrReplace(ictx.conn, ftsContent, ftsType, id, ftsBlkID, sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil { return fmt.Errorf("failed to insert record in fts table: %w", err) } } diff --git a/backend/blob/blob_contact.go b/backend/blob/blob_contact.go index fc423fe92..17aeb9cae 100644 --- a/backend/blob/blob_contact.go +++ b/backend/blob/blob_contact.go @@ -124,7 +124,7 @@ func indexContact(ictx *indexingCtx, id int64, eb Encoded[*Contact]) error { } extraAttrs["subject"] = subjectID extraAttrs["name"] = v.Name - if err := dbFTSInsertOrReplace(ictx.conn, v.Name, "contact", id, "", sb.CID.String()); err != nil { + if err := dbFTSInsertOrReplace(ictx.conn, v.Name, "contact", id, "", sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil { return fmt.Errorf("failed to insert record in fts table: %w", err) } } else { diff --git a/backend/blob/blob_profile.go b/backend/blob/blob_profile.go index 8cdffb28e..eb80bda76 100644 --- a/backend/blob/blob_profile.go +++ b/backend/blob/blob_profile.go @@ -273,7 +273,7 @@ func indexProfile(ictx *indexingCtx, id int64, eb Encoded[*Profile]) error { } if ftsContent != "" { - if err := dbFTSInsertOrReplace(ictx.conn, ftsContent, ftsType, id, "", sb.CID.String()); err != nil { + if err := dbFTSInsertOrReplace(ictx.conn, ftsContent, ftsType, id, "", sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil { return fmt.Errorf("failed to insert record in fts table: %w", err) } } diff --git a/backend/blob/index_sql.go b/backend/blob/index_sql.go index f0fba3bb7..4154779ba 100644 --- a/backend/blob/index_sql.go +++ b/backend/blob/index_sql.go @@ -6,6 +6,8 @@ import ( "seed/backend/util/dqb" "seed/backend/util/maybe" "seed/backend/util/sqlitegen" + "strings" + "time" "seed/backend/util/sqlite" "seed/backend/util/sqlite/sqlitex" @@ -49,7 +51,7 @@ var qBlobLinksInsertOrIgnore = dqb.Str(` VALUES (:blobLinksSource, :blobLinksType, :blobLinksTarget) `) -func dbFTSInsertOrReplace(conn *sqlite.Conn, FTSContent, FTSType string, FTSBlobID int64, FTSBlockID, FTSVersion string) error { +func dbFTSInsertOrReplace(conn *sqlite.Conn, FTSContent, FTSType string, FTSBlobID int64, FTSBlockID, FTSVersion string, FTSTs time.Time, FTSGenesisMultihash string) error { before := func(stmt *sqlite.Stmt) { stmt.SetText(":FTSContent", FTSContent) stmt.SetText(":FTSType", FTSType) @@ -67,13 +69,32 @@ func dbFTSInsertOrReplace(conn *sqlite.Conn, FTSContent, FTSType string, FTSBlob err = fmt.Errorf("failed query: FTSInsert: %w", err) return err } + lastRowID := conn.LastInsertRowID() + var genesisID int64 + if FTSGenesisMultihash != "" { + before = func(stmt *sqlite.Stmt) { + stmt.SetText(":FTSMultihash", strings.ToUpper(FTSGenesisMultihash)) + } + err = sqlitegen.ExecStmt(conn, qGetGenesisId(), before, func(_ int, stmt *sqlite.Stmt) error { + genesisID = stmt.ColumnInt64(0) + return nil + }) + if err != nil { + err = fmt.Errorf("failed query: qGetGenesisId: %w", err) + return err + } + } else { + genesisID = FTSBlobID + } before = func(stmt *sqlite.Stmt) { stmt.SetText(":FTSType", FTSType) stmt.SetInt64(":FTSBlobID", FTSBlobID) stmt.SetText(":FTSBlockID", FTSBlockID) stmt.SetText(":FTSVersion", FTSVersion) - stmt.SetText(":FTSRowID", "last_insert_rowid()") + stmt.SetInt64(":FTSRowID", lastRowID) + stmt.SetInt64(":FTSTs", FTSTs.UnixMilli()) + stmt.SetInt64(":FTSGenesisBlob", genesisID) } err = sqlitegen.ExecStmt(conn, qFTSIndexInsert(), before, onStep) @@ -81,64 +102,125 @@ func dbFTSInsertOrReplace(conn *sqlite.Conn, FTSContent, FTSType string, FTSBlob err = fmt.Errorf("failed query: FTSIndexInsert: %w", err) return err } + /* + rowsToUpdate := []int64{1, 45, 1034, 56, 467, 832, 11023} - before = func(stmt *sqlite.Stmt) { - stmt.SetText(":FTSType", FTSType) - stmt.SetInt64(":FTSBlobID", FTSBlobID) - stmt.SetText(":FTSBlockID", FTSBlockID) - } - rowsToUpdate := []int64{} - onStep = func(_ int, stmt *sqlite.Stmt) error { - rowsToUpdate = append(rowsToUpdate, stmt.ColumnInt64(0)) - return nil - } + before = func(stmt *sqlite.Stmt) { + stmt.SetText(":FTSMultihash", FTSGenesisHash) + } + var genesisID int64 + onStep = func(_ int, stmt *sqlite.Stmt) error { + genesisID = stmt.ColumnInt64(0) + return nil + } + genesisID++ - err = sqlitegen.ExecStmt(conn, qFTSCheck(), before, onStep) - if err != nil { - err = fmt.Errorf("failed query: FTSCheck: %w", err) - return err - } + err = sqlitegen.ExecStmt(conn, qGetDocumentBlobs(), before, onStep) + if err != nil { + err = fmt.Errorf("failed query: FTSCheck: %w", err) + return err + } - var idx int - if len(rowsToUpdate) > 0 { - before := func(stmt *sqlite.Stmt) { + before = func(stmt *sqlite.Stmt) { + stmt.SetText(":FTSType", FTSType) stmt.SetInt64(":FTSBlobID", FTSBlobID) - stmt.SetInt64(":FTSRowID", rowsToUpdate[idx]) - stmt.SetText(":FTSVersion", FTSVersion) - idx++ + stmt.SetText(":DocBlobIDs", strconv.FormatInt(genesisID, 10)) } - onStep := func(_ int, _ *sqlite.Stmt) error { + onStep = func(_ int, stmt *sqlite.Stmt) error { + rowsToUpdate = append(rowsToUpdate, stmt.ColumnInt64(0)) return nil } - err = sqlitegen.ExecStmt(conn, qFTSUpdate(), before, onStep) + + err = sqlitegen.ExecStmt(conn, qFTSCheck(), before, onStep) if err != nil { - err = fmt.Errorf("failed query: FTSUpdate: %w", err) + err = fmt.Errorf("failed query: FTSCheck: %w", err) return err } - } + var idx int + if len(rowsToUpdate) > 0 { + //fmt.Println("FTSUpdate: updating", len(rowsToUpdate), "rows") + before := func(stmt *sqlite.Stmt) { + stmt.SetInt64(":FTSBlobID", FTSBlobID) + stmt.SetInt64(":FTSRowID", rowsToUpdate[idx]) + stmt.SetText(":FTSVersion", FTSVersion) + idx++ + } + + onStep := func(_ int, _ *sqlite.Stmt) error { + return nil + } + err = sqlitegen.ExecStmt(conn, qFTSUpdate(), before, onStep) + if err != nil { + err = fmt.Errorf("failed query: FTSUpdate: %w", err) + return err + } + } + */ return nil } +var qGetGenesisId = dqb.Str(` +SELECT id FROM blobs +WHERE multihash = unhex(:FTSMultihash) +LIMIT 1; +`) + +var qFTSRecursiveCheck = dqb.Str(` +WITH RECURSIVE +genesis_id AS ( + SELECT + id + FROM blobs + WHERE lower(hex(multihash)) = :FTSMultihash + LIMIT 1 +) +relevant_cols AS ( + SELECT + fts_index.version, + fts_index.blob_id, + fts_type + FROM fts_index + JOIN structural_blobs ON fts_index.blob_id = structural_blobs.id + WHERE type IN ('document', 'title') + AND genesis_blob = (SELECT id FROM genesis_id) +), +nodes(rowid, ) AS ( + VALUES(:FTSBlobID) + UNION ALL + SELECT blob_id, block_id FROM fts_index JOIN nodes ON blob_id=bid + WHERE blob_id > +) +SELECT x FROM nodes; +`) + var qFTSCheck = dqb.Str(` SELECT rowid FROM fts_index - WHERE - block_id != :FTSBlockID - AND type = :FTSType - AND blob_id < :FTSBlobID - AND blob_id IN ( - SELECT id - FROM structural_blobs - WHERE genesis_blob = IFNULL( - (SELECT genesis_blob - FROM structural_blobs - WHERE id = :FTSBlobID), - :FTSBlobID - ) - ) + WHERE + ( + :FTSType = 'document' + AND type != :FTSType + ) OR ( + block_id != :FTSBlockID + AND (type = 'document' AND type = :FTSType) + AND blob_id < :FTSBlobID + AND blob_id IN (:DocBlobIDs) + ) OR ( + (type = 'title' AND type = :FTSType) + AND blob_id NOT IN (:DocBlobIDs) + ) +`) + +var qFTSCheckFast = dqb.Str(` + SELECT + rowid, + type, + blob_id + FROM fts_index + WHERE type = 'document' OR type = 'title' `) var qFTSUpdate = dqb.Str(` @@ -156,8 +238,8 @@ var qFTSInsert = dqb.Str(` `) var qFTSIndexInsert = dqb.Str(` - INSERT OR REPLACE INTO fts_index(rowid, type, blob_id, block_id, version) - VALUES (:FTSRowID, :FTSType, :FTSBlobID, :FTSBlockID, :FTSVersion) + INSERT OR REPLACE INTO fts_index(rowid, type, blob_id, block_id, version, ts, genesis_blob) + VALUES (:FTSRowID, :FTSType, :FTSBlobID, :FTSBlockID, :FTSVersion, :FTSTs, :FTSGenesisBlob) `) func dbResourceLinksInsert(conn *sqlite.Conn, sourceBlob, targetResource int64, ltype string, isPinned bool, meta []byte) error { diff --git a/backend/storage/schema.gen.go b/backend/storage/schema.gen.go index a4413f9d0..9978e55f9 100644 --- a/backend/storage/schema.gen.go +++ b/backend/storage/schema.gen.go @@ -194,22 +194,26 @@ const ( // Table fts_index. const ( - FtsIndex sqlitegen.Table = "fts_index" - FtsIndexBlobID sqlitegen.Column = "fts_index.blob_id" - FtsIndexBlockID sqlitegen.Column = "fts_index.block_id" - FtsIndexRowid sqlitegen.Column = "fts_index.rowid" - FtsIndexType sqlitegen.Column = "fts_index.type" - FtsIndexVersion sqlitegen.Column = "fts_index.version" + FtsIndex sqlitegen.Table = "fts_index" + FtsIndexBlobID sqlitegen.Column = "fts_index.blob_id" + FtsIndexBlockID sqlitegen.Column = "fts_index.block_id" + FtsIndexGenesisBlob sqlitegen.Column = "fts_index.genesis_blob" + FtsIndexRowid sqlitegen.Column = "fts_index.rowid" + FtsIndexTs sqlitegen.Column = "fts_index.ts" + FtsIndexType sqlitegen.Column = "fts_index.type" + FtsIndexVersion sqlitegen.Column = "fts_index.version" ) // Table fts_index. Plain strings. const ( - T_FtsIndex = "fts_index" - C_FtsIndexBlobID = "fts_index.blob_id" - C_FtsIndexBlockID = "fts_index.block_id" - C_FtsIndexRowid = "fts_index.rowid" - C_FtsIndexType = "fts_index.type" - C_FtsIndexVersion = "fts_index.version" + T_FtsIndex = "fts_index" + C_FtsIndexBlobID = "fts_index.blob_id" + C_FtsIndexBlockID = "fts_index.block_id" + C_FtsIndexGenesisBlob = "fts_index.genesis_blob" + C_FtsIndexRowid = "fts_index.rowid" + C_FtsIndexTs = "fts_index.ts" + C_FtsIndexType = "fts_index.type" + C_FtsIndexVersion = "fts_index.version" ) // Table kv. @@ -487,7 +491,9 @@ var Schema = sqlitegen.Schema{ FtsIdxTerm: {Table: FtsIdx, SQLType: ""}, FtsIndexBlobID: {Table: FtsIndex, SQLType: "INTEGER"}, FtsIndexBlockID: {Table: FtsIndex, SQLType: "TEXT"}, + FtsIndexGenesisBlob: {Table: FtsIndex, SQLType: "INTEGER"}, FtsIndexRowid: {Table: FtsIndex, SQLType: "INTEGER"}, + FtsIndexTs: {Table: FtsIndex, SQLType: "INTEGER"}, FtsIndexType: {Table: FtsIndex, SQLType: "TEXT"}, FtsIndexVersion: {Table: FtsIndex, SQLType: "TEXT"}, KVKey: {Table: KV, SQLType: "TEXT"}, diff --git a/backend/storage/schema.gensum b/backend/storage/schema.gensum index a427a7b5d..5fa67475a 100644 --- a/backend/storage/schema.gensum +++ b/backend/storage/schema.gensum @@ -1,2 +1,2 @@ -srcs: cd41c2d9a52971b49eb01d2c0bd67578 -outs: e0dbe662c3fc273ebe634f179b7f756b +srcs: 2935ac520500ac73c0925623c9c4f3fc +outs: ce0313f0b6d49569f615189dba0a46ad diff --git a/backend/storage/schema.sql b/backend/storage/schema.sql index e89610331..24cdc09d3 100644 --- a/backend/storage/schema.sql +++ b/backend/storage/schema.sql @@ -271,9 +271,15 @@ CREATE TABLE fts_index ( -- The block ID of the block that contains the content. block_id TEXT NOT NULL, -- The type of the content being indexed. - type TEXT NOT NULL + type TEXT NOT NULL, + -- The timestamp of the content being indexed. + ts INTEGER, + -- The genesis blob ID of the content being indexed. + genesis_blob INTEGER ) WITHOUT ROWID; CREATE INDEX fts_index_by_blob ON fts_index (blob_id); CREATE INDEX fts_index_by_version ON fts_index (version); CREATE INDEX fts_index_by_block ON fts_index (block_id); -CREATE INDEX fts_index_by_type ON fts_index (type); \ No newline at end of file +CREATE INDEX fts_index_by_type ON fts_index (type); +CREATE INDEX fts_index_by_ts ON fts_index (ts); +CREATE INDEX fts_index_by_genesis_blob ON fts_index (genesis_blob); \ No newline at end of file diff --git a/backend/storage/storage_migrations.go b/backend/storage/storage_migrations.go index bded33aaa..35b230546 100644 --- a/backend/storage/storage_migrations.go +++ b/backend/storage/storage_migrations.go @@ -57,6 +57,33 @@ type migration struct { // // In case of even the most minor doubts, consult with the team before adding a new migration, and submit the code to review if needed. var migrations = []migration{ + + {Version: "2025-07-24.01", Run: func(_ *Store, conn *sqlite.Conn) error { + if err := sqlitex.ExecScript(conn, sqlfmt(` + ALTER TABLE fts_index + ADD COLUMN ts INTEGER; + `)); err != nil { + return err + } + if err := sqlitex.ExecScript(conn, sqlfmt(` + ALTER TABLE fts_index + ADD COLUMN genesis_blob INTEGER; + `)); err != nil { + return err + } + if err := sqlitex.ExecScript(conn, sqlfmt(` + CREATE INDEX fts_index_by_ts ON fts_index (ts); + `)); err != nil { + return err + } + if err := sqlitex.ExecScript(conn, sqlfmt(` + CREATE INDEX fts_index_by_genesis_blob ON fts_index (genesis_blob); + `)); err != nil { + return err + } + // Reindexing to fix comment causality issues again. + return scheduleReindex(conn) + }}, {Version: "2025-07-22.01", Run: func(_ *Store, conn *sqlite.Conn) error { // Reindexing to fix comment causality issues again. return scheduleReindex(conn) From 7dc3db1aa536d30d4081799805c6026601a1ae08 Mon Sep 17 00:00:00 2001 From: juligasa <11684004+juligasa@users.noreply.github.com> Date: Fri, 25 Jul 2025 11:56:31 +0200 Subject: [PATCH 2/3] fix(daemon): account for proper document changes in fts --- backend/api/entities/v1alpha/entities.go | 27 ++-- backend/blob/blob_change.go | 23 ++-- backend/blob/blob_comment.go | 8 +- backend/blob/blob_profile.go | 8 +- backend/blob/index_sql.go | 121 ------------------ .../genproto/entities/v1alpha/entities.pb.go | 2 +- backend/storage/storage_migrations.go | 2 +- .../entities/v1alpha/entities_pb.ts | 2 +- proto/entities/v1alpha/entities.proto | 2 +- proto/entities/v1alpha/go.gensum | 4 +- proto/entities/v1alpha/js.gensum | 4 +- 11 files changed, 43 insertions(+), 160 deletions(-) diff --git a/backend/api/entities/v1alpha/entities.go b/backend/api/entities/v1alpha/entities.go index ced0ee9aa..6b1505ef7 100644 --- a/backend/api/entities/v1alpha/entities.go +++ b/backend/api/entities/v1alpha/entities.go @@ -221,11 +221,10 @@ SELECT block_id, ts from fts_index - -- WHERE type = :entityType - WHERE type IN ('title', 'document') + WHERE type IN ('title', 'document', 'meta') AND ts >= :Ts AND genesis_blob = :genesisBlobID - AND blob_id != :blobID + AND rowid != :rowID ORDER BY ts ASC `) var qGetFTS = dqb.Str(` @@ -238,7 +237,8 @@ WITH fts_top100 AS ( fts.blob_id, structural_blobs.genesis_blob, structural_blobs.extra_attrs->>'tsid' AS tsid, - fts.rank + fts.rank, + fts.rowid FROM fts JOIN structural_blobs ON structural_blobs.id = fts.blob_id @@ -281,7 +281,8 @@ SELECT ON b2.id = a.value ) AS heads, structural_blobs.ts, - structural_blobs.genesis_blob + structural_blobs.genesis_blob, + f.rowid FROM fts_top100 AS f JOIN structural_blobs ON structural_blobs.id = f.blob_id @@ -347,6 +348,7 @@ type searchResult struct { blobCID string blobID int64 genesisBlobID int64 + rowID int64 contentType string version string versionTime *timestamppb.Timestamp @@ -485,6 +487,7 @@ func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntiti if res.genesisBlobID == 0 { res.genesisBlobID = res.blobID } + res.rowID = stmt.ColumnInt64(16) if res.contentType == "comment" { res.iri = "hm://" + res.owner + "/" + res.tsid } else if res.contentType == "contact" { @@ -580,7 +583,7 @@ func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntiti totalLatestBlockTime := time.Duration(0) timesCalled := 0 iter := 0 - prevIter := 0 + //prevIter := 0 for _, match := range bodyMatches { //startParents := time.Now() var parentTitles []string @@ -615,7 +618,7 @@ func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntiti var errSameBlockChangeDetected = errors.New("same block change detected") if latestUnrelated.version != searchResults[match.Index].latestVersion { timesCalled++ - prevIter = iter + //prevIter = iter if err := srv.db.WithSave(ctx, func(conn *sqlite.Conn) error { return sqlitex.Exec(conn, qGetLatestBlockChange(), func(stmt *sqlite.Stmt) error { iter++ @@ -631,13 +634,13 @@ func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntiti } latestUnrelated = currentChange return nil - }, searchResults[match.Index].versionTime.Seconds*1_000+int64(searchResults[match.Index].versionTime.Nanos)/1_000_000, searchResults[match.Index].genesisBlobID, searchResults[match.Index].blobID) + }, searchResults[match.Index].versionTime.Seconds*1_000+int64(searchResults[match.Index].versionTime.Nanos)/1_000_000, searchResults[match.Index].genesisBlobID, searchResults[match.Index].rowID) }); err != nil && !errors.Is(err, errSameBlockChangeDetected) { return nil, err } - if iter == prevIter { - fmt.Println("No iteration", searchResults[match.Index].contentType, searchResults[match.Index].versionTime.Seconds*1_000+int64(searchResults[match.Index].versionTime.Nanos)/1_000_000, searchResults[match.Index].genesisBlobID) - } + //if iter == prevIter { + // fmt.Println("No iteration", searchResults[match.Index].contentType, searchResults[match.Index].versionTime.Seconds*1_000+int64(searchResults[match.Index].versionTime.Nanos)/1_000_000, searchResults[match.Index].genesisBlobID, searchResults[match.Index].blockID, searchResults[match.Index].blobID) + //} } searchResults[match.Index].version = latestUnrelated.version searchResults[match.Index].blobID = latestUnrelated.blobID @@ -673,7 +676,7 @@ func (srv *Server) SearchEntities(ctx context.Context, in *entities.SearchEntiti //after = time.Now() //fmt.Printf("getParentsFcn took %.3f s\n", totalGetParentsTime.Seconds()) - fmt.Printf("qGetLatestBlockChange took %.3f s and was called %d times and iterated over %d records\n", totalLatestBlockTime.Seconds(), timesCalled, iter) + //fmt.Printf("qGetLatestBlockChange took %.3f s and was called %d times and iterated over %d records\n", totalLatestBlockTime.Seconds(), timesCalled, iter) sort.Slice(matchingEntities, func(i, j int) bool { a, b := matchingEntities[i], matchingEntities[j] diff --git a/backend/blob/blob_change.go b/backend/blob/blob_change.go index 82ce3a1b2..1c2aa1d3b 100644 --- a/backend/blob/blob_change.go +++ b/backend/blob/blob_change.go @@ -418,15 +418,15 @@ func indexChange(ictx *indexingCtx, id int64, eb Encoded[*Change]) error { if !ok { continue } - + ftsType := "meta" // TODO(hm24): index other relevant metadata for list response and so on. - if extra.Title == "" && (k == "title" || k == "name" || k == "alias") { + if k == "title" || k == "name" || k == "alias" { extra.Title = vs - if err := dbFTSInsertOrReplace(ictx.conn, vs, "title", id, "", sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil { - return fmt.Errorf("failed to insert record in fts table: %w", err) - } + ftsType = "title" + } + if err := dbFTSInsertOrReplace(ictx.conn, vs, ftsType, id, "", sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil { + return fmt.Errorf("failed to insert record in fts table: %w", err) } - u, err := url.Parse(vs) if err != nil { continue @@ -458,13 +458,14 @@ func indexChange(ictx *indexingCtx, id int64, eb Encoded[*Change]) error { vs, isStr := kv.Value.(string) if len(kv.Key) == 1 && isStr { k := kv.Key[0] - + ftsKey := "meta" // TODO(hm24): index other relevant metadata for list response and so on. - if extra.Title == "" && (k == "title" || k == "name" || k == "alias") { + if k == "title" || k == "name" || k == "alias" { extra.Title = vs - if err := dbFTSInsertOrReplace(ictx.conn, vs, "title", id, "", sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil { - return fmt.Errorf("failed to insert record in fts table: %w", err) - } + ftsKey = "title" + } + if err := dbFTSInsertOrReplace(ictx.conn, vs, ftsKey, id, "", sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil { + return fmt.Errorf("failed to insert record in fts table: %w", err) } } diff --git a/backend/blob/blob_comment.go b/backend/blob/blob_comment.go index 719ec9ac8..c4dbe588f 100644 --- a/backend/blob/blob_comment.go +++ b/backend/blob/blob_comment.go @@ -288,11 +288,11 @@ func indexComment(ictx *indexingCtx, id int64, eb Encoded[*Comment]) error { } ftsBlkID = blk.ID() ftsContent = blk.Text - if ftsContent != "" { - if err := dbFTSInsertOrReplace(ictx.conn, ftsContent, ftsType, id, ftsBlkID, sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil { - return fmt.Errorf("failed to insert record in fts table: %w", err) - } + //if ftsContent != "" { + if err := dbFTSInsertOrReplace(ictx.conn, ftsContent, ftsType, id, ftsBlkID, sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil { + return fmt.Errorf("failed to insert record in fts table: %w", err) } + //} } return nil diff --git a/backend/blob/blob_profile.go b/backend/blob/blob_profile.go index eb80bda76..d3d6f9c76 100644 --- a/backend/blob/blob_profile.go +++ b/backend/blob/blob_profile.go @@ -272,10 +272,10 @@ func indexProfile(ictx *indexingCtx, id int64, eb Encoded[*Profile]) error { return fmt.Errorf("failed to save structural blob: %w", err) } - if ftsContent != "" { - if err := dbFTSInsertOrReplace(ictx.conn, ftsContent, ftsType, id, "", sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil { - return fmt.Errorf("failed to insert record in fts table: %w", err) - } + //if ftsContent != "" { + if err := dbFTSInsertOrReplace(ictx.conn, ftsContent, ftsType, id, "", sb.CID.String(), sb.Ts, sb.GenesisBlob.Hash().String()); err != nil { + return fmt.Errorf("failed to insert record in fts table: %w", err) } + //} return nil } diff --git a/backend/blob/index_sql.go b/backend/blob/index_sql.go index 4154779ba..7bf72e8fb 100644 --- a/backend/blob/index_sql.go +++ b/backend/blob/index_sql.go @@ -102,62 +102,6 @@ func dbFTSInsertOrReplace(conn *sqlite.Conn, FTSContent, FTSType string, FTSBlob err = fmt.Errorf("failed query: FTSIndexInsert: %w", err) return err } - /* - rowsToUpdate := []int64{1, 45, 1034, 56, 467, 832, 11023} - - before = func(stmt *sqlite.Stmt) { - stmt.SetText(":FTSMultihash", FTSGenesisHash) - } - var genesisID int64 - onStep = func(_ int, stmt *sqlite.Stmt) error { - genesisID = stmt.ColumnInt64(0) - return nil - } - genesisID++ - - err = sqlitegen.ExecStmt(conn, qGetDocumentBlobs(), before, onStep) - if err != nil { - err = fmt.Errorf("failed query: FTSCheck: %w", err) - return err - } - - before = func(stmt *sqlite.Stmt) { - stmt.SetText(":FTSType", FTSType) - stmt.SetInt64(":FTSBlobID", FTSBlobID) - stmt.SetText(":DocBlobIDs", strconv.FormatInt(genesisID, 10)) - } - - onStep = func(_ int, stmt *sqlite.Stmt) error { - rowsToUpdate = append(rowsToUpdate, stmt.ColumnInt64(0)) - return nil - } - - err = sqlitegen.ExecStmt(conn, qFTSCheck(), before, onStep) - if err != nil { - err = fmt.Errorf("failed query: FTSCheck: %w", err) - return err - } - - var idx int - if len(rowsToUpdate) > 0 { - //fmt.Println("FTSUpdate: updating", len(rowsToUpdate), "rows") - before := func(stmt *sqlite.Stmt) { - stmt.SetInt64(":FTSBlobID", FTSBlobID) - stmt.SetInt64(":FTSRowID", rowsToUpdate[idx]) - stmt.SetText(":FTSVersion", FTSVersion) - idx++ - } - - onStep := func(_ int, _ *sqlite.Stmt) error { - return nil - } - err = sqlitegen.ExecStmt(conn, qFTSUpdate(), before, onStep) - if err != nil { - err = fmt.Errorf("failed query: FTSUpdate: %w", err) - return err - } - } - */ return nil } @@ -167,71 +111,6 @@ WHERE multihash = unhex(:FTSMultihash) LIMIT 1; `) -var qFTSRecursiveCheck = dqb.Str(` -WITH RECURSIVE -genesis_id AS ( - SELECT - id - FROM blobs - WHERE lower(hex(multihash)) = :FTSMultihash - LIMIT 1 -) -relevant_cols AS ( - SELECT - fts_index.version, - fts_index.blob_id, - fts_type - FROM fts_index - JOIN structural_blobs ON fts_index.blob_id = structural_blobs.id - WHERE type IN ('document', 'title') - AND genesis_blob = (SELECT id FROM genesis_id) -), -nodes(rowid, ) AS ( - VALUES(:FTSBlobID) - UNION ALL - SELECT blob_id, block_id FROM fts_index JOIN nodes ON blob_id=bid - WHERE blob_id > -) -SELECT x FROM nodes; -`) - -var qFTSCheck = dqb.Str(` - SELECT - rowid - FROM fts_index - WHERE - ( - :FTSType = 'document' - AND type != :FTSType - ) OR ( - block_id != :FTSBlockID - AND (type = 'document' AND type = :FTSType) - AND blob_id < :FTSBlobID - AND blob_id IN (:DocBlobIDs) - ) OR ( - (type = 'title' AND type = :FTSType) - AND blob_id NOT IN (:DocBlobIDs) - ) -`) - -var qFTSCheckFast = dqb.Str(` - SELECT - rowid, - type, - blob_id - FROM fts_index - WHERE type = 'document' OR type = 'title' -`) - -var qFTSUpdate = dqb.Str(` - UPDATE fts - SET - blob_id = :FTSBlobID, - version = :FTSVersion - WHERE - rowid = :FTSRowID -`) - var qFTSInsert = dqb.Str(` INSERT OR REPLACE INTO fts(raw_content, type, blob_id, block_id, version) VALUES (:FTSContent, :FTSType, :FTSBlobID, :FTSBlockID, :FTSVersion) diff --git a/backend/genproto/entities/v1alpha/entities.pb.go b/backend/genproto/entities/v1alpha/entities.pb.go index 73799d883..56bde23f9 100644 --- a/backend/genproto/entities/v1alpha/entities.pb.go +++ b/backend/genproto/entities/v1alpha/entities.pb.go @@ -646,7 +646,7 @@ type Entity struct { Content string `protobuf:"bytes,5,opt,name=content,proto3" json:"content,omitempty"` // The owner of the entity Owner string `protobuf:"bytes,6,opt,name=owner,proto3" json:"owner,omitempty"` - // The type of the entity it coud be Title, Document or Comment + // The type of the entity it coud be Title, Document, Comment, ... Type string `protobuf:"bytes,7,opt,name=type,proto3" json:"type,omitempty"` // Icon of the document containing that entity Icon string `protobuf:"bytes,8,opt,name=icon,proto3" json:"icon,omitempty"` diff --git a/backend/storage/storage_migrations.go b/backend/storage/storage_migrations.go index 35b230546..939d02735 100644 --- a/backend/storage/storage_migrations.go +++ b/backend/storage/storage_migrations.go @@ -58,7 +58,7 @@ type migration struct { // In case of even the most minor doubts, consult with the team before adding a new migration, and submit the code to review if needed. var migrations = []migration{ - {Version: "2025-07-24.01", Run: func(_ *Store, conn *sqlite.Conn) error { + {Version: "2025-07-25.01", Run: func(_ *Store, conn *sqlite.Conn) error { if err := sqlitex.ExecScript(conn, sqlfmt(` ALTER TABLE fts_index ADD COLUMN ts INTEGER; diff --git a/frontend/packages/shared/src/client/.generated/entities/v1alpha/entities_pb.ts b/frontend/packages/shared/src/client/.generated/entities/v1alpha/entities_pb.ts index 369e4f878..7d41fb8d1 100644 --- a/frontend/packages/shared/src/client/.generated/entities/v1alpha/entities_pb.ts +++ b/frontend/packages/shared/src/client/.generated/entities/v1alpha/entities_pb.ts @@ -585,7 +585,7 @@ export class Entity extends Message { owner = ""; /** - * The type of the entity it coud be Title, Document or Comment + * The type of the entity it coud be Title, Document, Comment, ... * * @generated from field: string type = 7; */ diff --git a/proto/entities/v1alpha/entities.proto b/proto/entities/v1alpha/entities.proto index 400288d76..5fcdc6e85 100644 --- a/proto/entities/v1alpha/entities.proto +++ b/proto/entities/v1alpha/entities.proto @@ -201,7 +201,7 @@ message Entity { // The owner of the entity string owner = 6; - // The type of the entity it coud be Title, Document or Comment + // The type of the entity it coud be Title, Document, Comment, ... string type = 7; // Icon of the document containing that entity diff --git a/proto/entities/v1alpha/go.gensum b/proto/entities/v1alpha/go.gensum index a7acaf843..236c7ff80 100644 --- a/proto/entities/v1alpha/go.gensum +++ b/proto/entities/v1alpha/go.gensum @@ -1,2 +1,2 @@ -srcs: 7350c8f46051e2875c77c1c0472449b1 -outs: e8dd21b1e801683e573147d33a66fbdb +srcs: 826b25cb3ad2ed1f0267f8bed7c791c5 +outs: 70868dced7347445be66bc259cae6882 diff --git a/proto/entities/v1alpha/js.gensum b/proto/entities/v1alpha/js.gensum index 19131cae2..8adc5b0fd 100644 --- a/proto/entities/v1alpha/js.gensum +++ b/proto/entities/v1alpha/js.gensum @@ -1,2 +1,2 @@ -srcs: 7350c8f46051e2875c77c1c0472449b1 -outs: 42b7ad32776e260fe6b15b69f6992664 +srcs: 826b25cb3ad2ed1f0267f8bed7c791c5 +outs: 030750789cf38cc03c61c7d393c54c9c From a46707ce2f0f61cd0dc3df978522d2e487d50d15 Mon Sep 17 00:00:00 2001 From: juligasa <11684004+juligasa@users.noreply.github.com> Date: Fri, 25 Jul 2025 12:05:01 +0200 Subject: [PATCH 3/3] fix(daemon): lint --- backend/blob/index_sql.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/blob/index_sql.go b/backend/blob/index_sql.go index 7bf72e8fb..372aaf998 100644 --- a/backend/blob/index_sql.go +++ b/backend/blob/index_sql.go @@ -76,12 +76,12 @@ func dbFTSInsertOrReplace(conn *sqlite.Conn, FTSContent, FTSType string, FTSBlob stmt.SetText(":FTSMultihash", strings.ToUpper(FTSGenesisMultihash)) } - err = sqlitegen.ExecStmt(conn, qGetGenesisId(), before, func(_ int, stmt *sqlite.Stmt) error { + err = sqlitegen.ExecStmt(conn, qGetGenesisID(), before, func(_ int, stmt *sqlite.Stmt) error { genesisID = stmt.ColumnInt64(0) return nil }) if err != nil { - err = fmt.Errorf("failed query: qGetGenesisId: %w", err) + err = fmt.Errorf("failed query: qGetGenesisID: %w", err) return err } } else { @@ -105,7 +105,7 @@ func dbFTSInsertOrReplace(conn *sqlite.Conn, FTSContent, FTSType string, FTSBlob return nil } -var qGetGenesisId = dqb.Str(` +var qGetGenesisID = dqb.Str(` SELECT id FROM blobs WHERE multihash = unhex(:FTSMultihash) LIMIT 1;