diff --git a/.gitignore b/.gitignore index b82df91..741c165 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ *.swp SewerRat -*.sqlite3 +*.sqlite3* diff --git a/database.go b/database.go index 3a096cb..3d9fcad 100644 --- a/database.go +++ b/database.go @@ -11,9 +11,72 @@ import ( "io/fs" "database/sql" "strconv" + "context" _ "modernc.org/sqlite" ) +type ActiveTransaction struct { + Conn *sql.Conn + Tx *sql.Tx +} + +func (t *ActiveTransaction) Finish() { + t.Tx.Rollback() // This is a no-op once committed. + t.Conn.Close() +} + +func createTransaction(db *sql.DB) (*ActiveTransaction, error) { + ctx := context.Background() + success := false + + // We acquire a connection to run all of the pragmas. We don't know whether + // this is an existing connection or if it's generated anew, as + // database/sql manages the pool itself; so we have to run the pragmas + // every time, just in case. I wish we had some connection hooks. + conn, err := db.Conn(ctx) + if err != nil { + return nil, fmt.Errorf("failed to acquire connection; %w", err) + } + defer func() { + if !success { + conn.Close() + } + }() + + // Foreign key set-up must be done outside of the transaction, + // see https://sqlite.org/pragma.html#pragma_foreign_keys. + _, err = conn.ExecContext(ctx, "PRAGMA foreign_keys = ON") + if err != nil { + return nil, fmt.Errorf("failed to enable foreign key support; %w", err) + } + + // Improve performance in WAL journalling mode. However, this also needs to + // be changed outside of a transaction, apparently. + _, err = conn.ExecContext(ctx, "PRAGMA synchronous = NORMAL") + if err != nil { + return nil, fmt.Errorf("failed to enable normal synchronous mode; %w", err) + } + + // Setting a busy timeout for write operations to avoid the SQLITE_BUSY error. 
+ _, err = conn.ExecContext(ctx, "PRAGMA busy_timeout = 10000") + if err != nil { + return nil, fmt.Errorf("failed to enable busy timeout; %w", err) + } + + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, fmt.Errorf("failed to create transaction; %w", err) + } + defer func() { + if !success { + tx.Rollback() + } + }() + + success = true; + return &ActiveTransaction{ Conn: conn, Tx: tx }, nil +} + func initializeDatabase(path string) (*sql.DB, error) { accessible := false if _, err := os.Stat(path); err == nil { @@ -25,13 +88,15 @@ func initializeDatabase(path string) (*sql.DB, error) { return nil, fmt.Errorf("failed to create SQLite file at %q; %w", path, err) } - _, err = db.Exec("PRAGMA foreign_keys = ON") - if err != nil { - return nil, fmt.Errorf("failed to enable foreign key support; %w", err) - } - if (!accessible) { - _, err = db.Exec(` + err := func () error { + atx, err := createTransaction(db) + if err != nil { + return fmt.Errorf("failed to prepare transaction for table setup; %w", err) + } + defer atx.Finish() + + _, err = atx.Tx.Exec(` CREATE TABLE paths(pid INTEGER PRIMARY KEY, path TEXT NOT NULL UNIQUE, user TEXT NOT NULL, time INTEGER NOT NULL, metadata BLOB); CREATE INDEX index_paths_path ON paths(path); CREATE INDEX index_paths_time ON paths(time, user); @@ -46,6 +111,25 @@ CREATE INDEX index_fields ON fields(field); CREATE TABLE links(pid INTEGER NOT NULL, fid INTEGER NOT NULL, tid INTEGER NOT NULL, FOREIGN KEY(pid) REFERENCES paths(pid) ON DELETE CASCADE, UNIQUE(pid, fid, tid)); CREATE INDEX index_links ON links(tid, fid); `) + if err != nil { + return fmt.Errorf("failed to create table in %q; %w", path, err) + } + + err = atx.Tx.Commit() + if err != nil { + return fmt.Errorf("failed to commit table creation commands for %s; %w", path, err) + } + + // Write-ahead logging is persistent and doesn't need to be set on every connection, + // see https://www.sqlite.org/wal.html#persistence_of_wal_mode. 
+ _, err = atx.Conn.ExecContext(context.Background(), "PRAGMA journal_mode = WAL") + if err != nil { + return fmt.Errorf("failed to enable write-ahead logging; %w", err) + } + + return nil + }() + if err != nil { os.Remove(path) return nil, fmt.Errorf("failed to create table in %q; %w", path, err) @@ -173,27 +257,27 @@ func addDirectory(db *sql.DB, directory string, of_interest map[string]bool, tok } { - tx, err := db.Begin() + atx, err := createTransaction(db) if err != nil { - return nil, fmt.Errorf("failed to prepare a database transaction; %w", err) + return nil, fmt.Errorf("failed to prepare transaction for directory addition; %w", err) } - defer tx.Rollback() + defer atx.Finish() // Delete all previously registered paths with the directory's prefix for a fresh start; // otherwise there's no way to easily get rid of old paths that are no longer here. - err = deleteDirectory(db, directory) + err = deleteDirectoryTx(atx.Tx, directory) if err != nil { return nil, fmt.Errorf("failed to delete existing records for %q; %w", directory, err) } - prepped, err := newInsertStatements(tx) + prepped, err := newInsertStatements(atx.Tx) if err != nil { return nil, fmt.Errorf("failed to create prepared insertion statements for %q; %w", directory, err) } defer prepped.Close() // Adding each document to the pile. We do this in serial because I don't think transactions are thread-safe. - pstmt, err := tx.Prepare("INSERT INTO paths(path, user, time, metadata) VALUES(?, ?, ?, ?) RETURNING pid") + pstmt, err := atx.Tx.Prepare("INSERT INTO paths(path, user, time, metadata) VALUES(?, ?, ?, ?) 
RETURNING pid") if err != nil { return nil, fmt.Errorf("failed to prepare path insertion statement; %w", err) } @@ -212,11 +296,11 @@ func addDirectory(db *sql.DB, directory string, of_interest map[string]bool, tok continue } - tokfails := tokenizeMetadata(tx, parsed[i], f, pid, "", prepped, tokenizer) + tokfails := tokenizeMetadata(atx.Tx, parsed[i], f, pid, "", prepped, tokenizer) all_failures = append(all_failures, tokfails...) } - err = tx.Commit() + err = atx.Tx.Commit() if err != nil { return nil, fmt.Errorf("failed to commit the transaction for %q; %w", directory, err) } @@ -279,6 +363,26 @@ func tokenizeMetadata(tx *sql.Tx, parsed interface{}, path string, pid int64, fi /**********************************************************************/ func deleteDirectory(db *sql.DB, directory string) error { + atx, err := createTransaction(db) + if err != nil { + return fmt.Errorf("failed to prepare transaction for deletion; %w", err) + } + defer atx.Finish() + + err = deleteDirectoryTx(atx.Tx, directory) + if err != nil { + return fmt.Errorf("failed to set up directory deletion; %w", err) + } + + err = atx.Tx.Commit() + if err != nil { + return fmt.Errorf("failed to commit deletion transaction for %q; %w", directory, err) + } + + return nil +} + +func deleteDirectoryTx(tx *sql.Tx, directory string) error { // Trimming the suffix. 
if len(directory) > 0 { counter := len(directory) - 1 @@ -298,10 +402,11 @@ func deleteDirectory(db *sql.DB, directory string) error { query += " ESCAPE '" + escape + "'" } - _, err = db.Exec(query, pattern + "/%") + _, err = tx.Exec(query, pattern + "/%") if err != nil { return fmt.Errorf("failed to delete existing entries for %q; %w", directory, err) } + return nil } @@ -387,33 +492,33 @@ func updatePaths(db *sql.DB, tokenizer* unicodeTokenizer) ([]string, error) { } all_failures := []string{} - tx, err := db.Begin() + atx, err := createTransaction(db) if err != nil { - return nil, fmt.Errorf("failed to prepare a database transaction; %w", err) + return nil, fmt.Errorf("failed to prepare a database transaction for path updates; %w", err) } - defer tx.Rollback() + defer atx.Finish() // Updating the existing files. { - pustmt, err := tx.Prepare("UPDATE paths SET user = ?, time = ?, metadata = ? WHERE path = ?") + pustmt, err := atx.Tx.Prepare("UPDATE paths SET user = ?, time = ?, metadata = ? 
WHERE path = ?") if err != nil { return nil, fmt.Errorf("failed to prepare path update statement; %w", err) } defer pustmt.Close() - pistmt, err := tx.Prepare("SELECT pid FROM paths WHERE path = ?") + pistmt, err := atx.Tx.Prepare("SELECT pid FROM paths WHERE path = ?") if err != nil { return nil, fmt.Errorf("failed to prepare path ID statement; %w", err) } defer pistmt.Close() - delstmt, err := tx.Prepare("DELETE FROM links WHERE pid = ?") + delstmt, err := atx.Tx.Prepare("DELETE FROM links WHERE pid = ?") if err != nil { return nil, fmt.Errorf("failed to prepare link deletion statement; %w", err) } defer delstmt.Close() - prepped, err := newInsertStatements(tx) + prepped, err := newInsertStatements(atx.Tx) if err != nil { return nil, fmt.Errorf("failed to prepare token insertion statements for the update; %w", err) } @@ -445,14 +550,14 @@ func updatePaths(db *sql.DB, tokenizer* unicodeTokenizer) ([]string, error) { continue } - tokfails := tokenizeMetadata(tx, update_parsed[i], f, pid, "", prepped, tokenizer) + tokfails := tokenizeMetadata(atx.Tx, update_parsed[i], f, pid, "", prepped, tokenizer) all_failures = append(all_failures, tokfails...) } } // Purging the paths. { - delstmt, err := tx.Prepare("DELETE FROM paths WHERE path = ?") + delstmt, err := atx.Tx.Prepare("DELETE FROM paths WHERE path = ?") if err != nil { return nil, fmt.Errorf("failed to prepare the delete transaction; %w", err) } @@ -466,7 +571,7 @@ func updatePaths(db *sql.DB, tokenizer* unicodeTokenizer) ([]string, error) { } } - err = tx.Commit() + err = atx.Tx.Commit() if err != nil { return nil, fmt.Errorf("failed to commit the update transaction; %w", err) }