Skip to content

Commit

Permalink
Set pre-write pragmas to avoid busy errors, improve perf.
Browse files Browse the repository at this point in the history
This follows some recommendations at https://kerkour.com/sqlite-for-servers
and avoids SQLITE_BUSY errors for attempted concurrent reads/writes.
  • Loading branch information
LTLA committed Mar 22, 2024
1 parent 0ced5f4 commit 0d073c8
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
*.swp
SewerRat
*.sqlite3
*.sqlite3*
155 changes: 130 additions & 25 deletions database.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,72 @@ import (
"io/fs"
"database/sql"
"strconv"
"context"
_ "modernc.org/sqlite"
)

type ActiveTransaction struct {
Conn *sql.Conn
Tx *sql.Tx
}

func (t *ActiveTransaction) Finish() {
t.Tx.Rollback() // This is a no-op once committed.
t.Conn.Close()
}

func createTransaction(db *sql.DB) (*ActiveTransaction, error) {
ctx := context.Background()
success := false

// We acquire a connection to run all of the pragmas. We don't know whether
// this is an existing connection or if it's generated anew, as
// database/sql manages the pool itself; so we have to run the pragmas
// every time, just in case. I wish we had some connection hooks.
conn, err := db.Conn(ctx)
if err != nil {
return nil, fmt.Errorf("failed to acquire connection; %w", err)
}
defer func() {
if !success {
conn.Close()
}
}()

// Foreign key set-up must be done outside of the transaction,
// see https://sqlite.org/pragma.html#pragma_foreign_keys.
_, err = conn.ExecContext(ctx, "PRAGMA foreign_keys = ON")
if err != nil {
return nil, fmt.Errorf("failed to enable foreign key support; %w", err)
}

// Improve performance in WAL journalling mode. However, this also needs to
// be changed outsite of a transaction, apparently.
_, err = conn.ExecContext(ctx, "PRAGMA synchronous = NORMAL")
if err != nil {
return nil, fmt.Errorf("failed to enable normal synchronous mode; %w", err)
}

// Setting a busy timeout for write operations to avoid the SQLITE_BUSY error.
_, err = conn.ExecContext(ctx, "PRAGMA busy_timeout = 10000")
if err != nil {
return nil, fmt.Errorf("failed to enable busy timeout; %w", err)
}

tx, err := conn.BeginTx(ctx, nil)
if err != nil {
return nil, fmt.Errorf("failed to create transaction; %w", err)
}
defer func() {
if !success {
tx.Rollback()
}
}()

success = true;
return &ActiveTransaction{ Conn: conn, Tx: tx }, nil
}

func initializeDatabase(path string) (*sql.DB, error) {
accessible := false
if _, err := os.Stat(path); err == nil {
Expand All @@ -25,13 +88,15 @@ func initializeDatabase(path string) (*sql.DB, error) {
return nil, fmt.Errorf("failed to create SQLite file at %q; %w", path, err)
}

_, err = db.Exec("PRAGMA foreign_keys = ON")
if err != nil {
return nil, fmt.Errorf("failed to enable foreign key support; %w", err)
}

if (!accessible) {
_, err = db.Exec(`
err := func () error {
atx, err := createTransaction(db)
if err != nil {
return fmt.Errorf("failed to prepare transaction for table setup; %w", err)
}
defer atx.Finish()

_, err = atx.Tx.Exec(`
CREATE TABLE paths(pid INTEGER PRIMARY KEY, path TEXT NOT NULL UNIQUE, user TEXT NOT NULL, time INTEGER NOT NULL, metadata BLOB);
CREATE INDEX index_paths_path ON paths(path);
CREATE INDEX index_paths_time ON paths(time, user);
Expand All @@ -46,6 +111,25 @@ CREATE INDEX index_fields ON fields(field);
CREATE TABLE links(pid INTEGER NOT NULL, fid INTEGER NOT NULL, tid INTEGER NOT NULL, FOREIGN KEY(pid) REFERENCES paths(pid) ON DELETE CASCADE, UNIQUE(pid, fid, tid));
CREATE INDEX index_links ON links(tid, fid);
`)
if err != nil {
return fmt.Errorf("failed to create table in %q; %w", path, err)
}

err = atx.Tx.Commit()
if err != nil {
return fmt.Errorf("failed to commit table creation commands for %s; %w", path, err)
}

// Write-ahead logging is persistent and doesn't need to be set on every connection,
// see https://www.sqlite.org/wal.html#persistence_of_wal_mode.
_, err = atx.Conn.ExecContext(context.Background(), "PRAGMA journal_mode = WAL")
if err != nil {
return fmt.Errorf("failed to enable write-ahead logging; %w", err)
}

return nil
}()

if err != nil {
os.Remove(path)
return nil, fmt.Errorf("failed to create table in %q; %w", path, err)
Expand Down Expand Up @@ -173,27 +257,27 @@ func addDirectory(db *sql.DB, directory string, of_interest map[string]bool, tok
}

{
tx, err := db.Begin()
atx, err := createTransaction(db)
if err != nil {
return nil, fmt.Errorf("failed to prepare a database transaction; %w", err)
return nil, fmt.Errorf("failed to prepare transaction for directory addition; %w", err)
}
defer tx.Rollback()
defer atx.Finish()

// Delete all previously registered paths with the directory's prefix for a fresh start;
// otherwise there's no way to easily get rid of old paths that are no longer here.
err = deleteDirectory(db, directory)
err = deleteDirectoryTx(atx.Tx, directory)
if err != nil {
return nil, fmt.Errorf("failed to delete existing records for %q; %w", directory, err)
}

prepped, err := newInsertStatements(tx)
prepped, err := newInsertStatements(atx.Tx)
if err != nil {
return nil, fmt.Errorf("failed to create prepared insertion statements for %q; %w", directory, err)
}
defer prepped.Close()

// Adding each document to the pile. We do this in serial because I don't think transactions are thread-safe.
pstmt, err := tx.Prepare("INSERT INTO paths(path, user, time, metadata) VALUES(?, ?, ?, ?) RETURNING pid")
pstmt, err := atx.Tx.Prepare("INSERT INTO paths(path, user, time, metadata) VALUES(?, ?, ?, ?) RETURNING pid")
if err != nil {
return nil, fmt.Errorf("failed to prepare path insertion statement; %w", err)
}
Expand All @@ -212,11 +296,11 @@ func addDirectory(db *sql.DB, directory string, of_interest map[string]bool, tok
continue
}

tokfails := tokenizeMetadata(tx, parsed[i], f, pid, "", prepped, tokenizer)
tokfails := tokenizeMetadata(atx.Tx, parsed[i], f, pid, "", prepped, tokenizer)
all_failures = append(all_failures, tokfails...)
}

err = tx.Commit()
err = atx.Tx.Commit()
if err != nil {
return nil, fmt.Errorf("failed to commit the transaction for %q; %w", directory, err)
}
Expand Down Expand Up @@ -279,6 +363,26 @@ func tokenizeMetadata(tx *sql.Tx, parsed interface{}, path string, pid int64, fi
/**********************************************************************/

func deleteDirectory(db *sql.DB, directory string) error {
atx, err := createTransaction(db)
if err != nil {
return fmt.Errorf("failed to prepare transaction for deletion; %w", err)
}
defer atx.Finish()

err = deleteDirectoryTx(atx.Tx, directory)
if err != nil {
return fmt.Errorf("failed to set up directory deletion; %w", err)
}

err = atx.Tx.Commit()
if err != nil {
return fmt.Errorf("failed to commit deletion transaction for %q; %w", directory, err)
}

return nil
}

func deleteDirectoryTx(tx *sql.Tx, directory string) error {
// Trimming the suffix.
if len(directory) > 0 {
counter := len(directory) - 1
Expand All @@ -298,10 +402,11 @@ func deleteDirectory(db *sql.DB, directory string) error {
query += " ESCAPE '" + escape + "'"
}

_, err = db.Exec(query, pattern + "/%")
_, err = tx.Exec(query, pattern + "/%")
if err != nil {
return fmt.Errorf("failed to delete existing entries for %q; %w", directory, err)
}

return nil
}

Expand Down Expand Up @@ -387,33 +492,33 @@ func updatePaths(db *sql.DB, tokenizer* unicodeTokenizer) ([]string, error) {
}

all_failures := []string{}
tx, err := db.Begin()
atx, err := createTransaction(db)
if err != nil {
return nil, fmt.Errorf("failed to prepare a database transaction; %w", err)
return nil, fmt.Errorf("failed to prepare a database transaction for path updates; %w", err)
}
defer tx.Rollback()
defer atx.Finish()

// Updating the existing files.
{
pustmt, err := tx.Prepare("UPDATE paths SET user = ?, time = ?, metadata = ? WHERE path = ?")
pustmt, err := atx.Tx.Prepare("UPDATE paths SET user = ?, time = ?, metadata = ? WHERE path = ?")
if err != nil {
return nil, fmt.Errorf("failed to prepare path update statement; %w", err)
}
defer pustmt.Close()

pistmt, err := tx.Prepare("SELECT pid FROM paths WHERE path = ?")
pistmt, err := atx.Tx.Prepare("SELECT pid FROM paths WHERE path = ?")
if err != nil {
return nil, fmt.Errorf("failed to prepare path ID statement; %w", err)
}
defer pistmt.Close()

delstmt, err := tx.Prepare("DELETE FROM links WHERE pid = ?")
delstmt, err := atx.Tx.Prepare("DELETE FROM links WHERE pid = ?")
if err != nil {
return nil, fmt.Errorf("failed to prepare link deletion statement; %w", err)
}
defer delstmt.Close()

prepped, err := newInsertStatements(tx)
prepped, err := newInsertStatements(atx.Tx)
if err != nil {
return nil, fmt.Errorf("failed to prepare token insertion statements for the update; %w", err)
}
Expand Down Expand Up @@ -445,14 +550,14 @@ func updatePaths(db *sql.DB, tokenizer* unicodeTokenizer) ([]string, error) {
continue
}

tokfails := tokenizeMetadata(tx, update_parsed[i], f, pid, "", prepped, tokenizer)
tokfails := tokenizeMetadata(atx.Tx, update_parsed[i], f, pid, "", prepped, tokenizer)
all_failures = append(all_failures, tokfails...)
}
}

// Purging the paths.
{
delstmt, err := tx.Prepare("DELETE FROM paths WHERE path = ?")
delstmt, err := atx.Tx.Prepare("DELETE FROM paths WHERE path = ?")
if err != nil {
return nil, fmt.Errorf("failed to prepare the delete transaction; %w", err)
}
Expand All @@ -466,7 +571,7 @@ func updatePaths(db *sql.DB, tokenizer* unicodeTokenizer) ([]string, error) {
}
}

err = tx.Commit()
err = atx.Tx.Commit()
if err != nil {
return nil, fmt.Errorf("failed to commit the update transaction; %w", err)
}
Expand Down

0 comments on commit 0d073c8

Please sign in to comment.