diff --git a/src/workerd/io/BUILD.bazel b/src/workerd/io/BUILD.bazel index e4581ad0228..d0db4c65f4a 100644 --- a/src/workerd/io/BUILD.bazel +++ b/src/workerd/io/BUILD.bazel @@ -154,6 +154,7 @@ wd_cc_library( ":trace", ":worker-interface", "//src/workerd/jsg:observer", + "//src/workerd/util:sqlite", ], ) diff --git a/src/workerd/io/observer.h b/src/workerd/io/observer.h index a4052d88cdf..3bcd8ff9aa5 100644 --- a/src/workerd/io/observer.h +++ b/src/workerd/io/observer.h @@ -13,6 +13,7 @@ #include #include #include +#include namespace workerd { @@ -237,7 +238,7 @@ class WorkerObserver: public kj::AtomicRefcounted { virtual void teardownFinished() {} }; -class ActorObserver: public kj::Refcounted { +class ActorObserver: public kj::Refcounted, public SqliteObserver { public: // Allows the observer to run in the background, periodically making observations. Owner must // call this and store the promise. `limitEnforcer` is used to collect CPU usage metrics, it diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index 75a2a2c8285..225d3bf5f50 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -3224,7 +3224,8 @@ struct Worker::Actor::Impl { transient.emplace(js, js.obj()); } - actorCache = makeActorCache(self.worker->getIsolate().impl->actorCacheLru, outputGate, hooks); + actorCache = makeActorCache( + self.worker->getIsolate().impl->actorCacheLru, outputGate, hooks, *metrics); }); } }; diff --git a/src/workerd/io/worker.h b/src/workerd/io/worker.h index 1da2f81ba2a..3b3c7f7d7f6 100644 --- a/src/workerd/io/worker.h +++ b/src/workerd/io/worker.h @@ -676,8 +676,11 @@ class Worker::Actor final: public kj::Refcounted { // Callback which constructs the `ActorCacheInterface` instance (if any) for the Actor. This // can be used to customize the storage implementation. This will be called synchronously in // the constructor. - using MakeActorCacheFunc = kj::Function>( - const ActorCache::SharedLru& sharedLru, OutputGate& outputGate, ActorCache::Hooks& hooks)>; + using MakeActorCacheFunc = + kj::Function>(const ActorCache::SharedLru& sharedLru, + OutputGate& outputGate, + ActorCache::Hooks& hooks, + SqliteObserver& sqliteObserver)>; // Callback which constructs the `DurableObjectStorage` instance for an actor. This can be used // to customize the JavaScript API. diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index d98427a751b..e292cc13e0e 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -1924,7 +1924,7 @@ public: auto& channels = KJ_ASSERT_NONNULL(service.ioChannels.tryGet()); auto makeActorCache = [&](const ActorCache::SharedLru& sharedLru, OutputGate& outputGate, - ActorCache::Hooks& hooks) { + ActorCache::Hooks& hooks, SqliteObserver& sqliteObserver) { return config.tryGet().map( [&](const Durable& d) -> kj::Own { KJ_IF_SOME(as, channels.actorStorage) { diff --git a/src/workerd/tests/test-fixture.c++ b/src/workerd/tests/test-fixture.c++ index 2fb142a6db3..745d9be42c3 100644 --- a/src/workerd/tests/test-fixture.c++ +++ b/src/workerd/tests/test-fixture.c++ @@ -357,7 +357,7 @@ TestFixture::TestFixture(SetupParams&& params) KJ_IF_SOME(id, params.actorId) { worker->runInLockScope(Worker::Lock::TakeSynchronously(kj::none), [&](Worker::Lock& lock) { auto makeActorCache = [](const ActorCache::SharedLru& sharedLru, OutputGate& outputGate, - ActorCache::Hooks& hooks) { + ActorCache::Hooks& hooks, SqliteObserver& sqliteObserver) { return kj::heap( kj::heap(), sharedLru, outputGate, hooks); }; diff --git a/src/workerd/util/sqlite-test.c++ b/src/workerd/util/sqlite-test.c++ index 843ffe2886e..df11020e9f7 100644 --- a/src/workerd/util/sqlite-test.c++ +++ b/src/workerd/util/sqlite-test.c++ @@ -782,5 +782,75 @@ KJ_TEST("reset database") { } } +KJ_TEST("SQLite observer addQueryStats") { + class TestSqliteObserver: public SqliteObserver { + public: + void addQueryStats(uint64_t read, uint64_t written) override { + rowsRead += read; + rowsWritten += written; + } + + uint64_t rowsRead = 0; + uint64_t rowsWritten = 0; + }; + + TempDirOnDisk dir; + SqliteDatabase::Vfs vfs(*dir); + TestSqliteObserver sqliteObserver = TestSqliteObserver(); + SqliteDatabase db( + vfs, kj::Path({"foo"}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY, sqliteObserver); + + db.run(R"( + CREATE TABLE things ( + id INTEGER PRIMARY KEY + ); + )"); + + // There are some rows read and written when we create the db, we offset this in the test + int rowsReadBefore = sqliteObserver.rowsRead; + int rowsWrittenBefore = sqliteObserver.rowsWritten; + constexpr int dbRowCount = 3; + { + db.run("INSERT INTO things (id) VALUES (10)"); + db.run("INSERT INTO things (id) VALUES (11)"); + db.run("INSERT INTO things (id) VALUES (12)"); + } + KJ_EXPECT(sqliteObserver.rowsRead - rowsReadBefore == dbRowCount); + KJ_EXPECT(sqliteObserver.rowsWritten - rowsWrittenBefore == dbRowCount); + + rowsReadBefore = sqliteObserver.rowsRead; + rowsWrittenBefore = sqliteObserver.rowsWritten; + { + auto getCount = db.prepare("SELECT COUNT(*) FROM things"); + KJ_EXPECT(getCount.run().getInt(0) == dbRowCount); + } + KJ_EXPECT(sqliteObserver.rowsRead - rowsReadBefore == dbRowCount); + KJ_EXPECT(sqliteObserver.rowsWritten - rowsWrittenBefore == 0); + + // Verify if addQueryStats works correctly when we call query.nextRow() + rowsReadBefore = sqliteObserver.rowsRead; + rowsWrittenBefore = sqliteObserver.rowsWritten; + { + auto stmt = db.prepare("SELECT * FROM things"); + auto query = stmt.run(); + KJ_ASSERT(!query.isDone()); + while (!query.isDone()) { + query.nextRow(); + } + } + KJ_EXPECT(sqliteObserver.rowsRead - rowsReadBefore == dbRowCount); + KJ_EXPECT(sqliteObserver.rowsWritten - rowsWrittenBefore == 0); + + // Verify addQueryStats works correctly when db is reset + rowsReadBefore = sqliteObserver.rowsRead; + rowsWrittenBefore = sqliteObserver.rowsWritten; + { + auto query = db.run("INSERT INTO things (id) VALUES (100)"); + db.reset(); + } + KJ_EXPECT(sqliteObserver.rowsRead - rowsReadBefore == 1); + KJ_EXPECT(sqliteObserver.rowsWritten - rowsWrittenBefore == 1); +} + } // namespace } // namespace workerd diff --git a/src/workerd/util/sqlite.c++ b/src/workerd/util/sqlite.c++ index dd7b7745a4f..983798e161f 100644 --- a/src/workerd/util/sqlite.c++ +++ b/src/workerd/util/sqlite.c++ @@ -367,12 +367,18 @@ static constexpr PragmaInfo ALLOWED_PRAGMAS[] = {{"data_version"_kj, PragmaSigna // ======================================================================================= +SqliteObserver SqliteObserver::DEFAULT = SqliteObserver{}; + constexpr SqliteDatabase::Regulator SqliteDatabase::TRUSTED; -SqliteDatabase::SqliteDatabase(const Vfs& vfs, kj::Path path, kj::Maybe maybeMode) +SqliteDatabase::SqliteDatabase(const Vfs& vfs, + kj::Path path, + kj::Maybe maybeMode, + SqliteObserver& sqliteObserver) : vfs(vfs), path(kj::mv(path)), - readOnly(maybeMode == kj::none) { + readOnly(maybeMode == kj::none), + sqliteObserver(sqliteObserver) { init(maybeMode); } @@ -979,6 +985,9 @@ SqliteDatabase::Query::Query(SqliteDatabase& db, } SqliteDatabase::Query::~Query() noexcept(false) { + //Update the db stats that we have collected for the query + db.sqliteObserver.addQueryStats(rowsRead, rowsWritten); + // We only need to reset the statement if we don't own it. If we own it, it's about to be // destroyed anyway. if (ownStatement.get() == nullptr) { @@ -1096,6 +1105,11 @@ void SqliteDatabase::Query::nextRow() { db.currentRegulator = regulator; int err = sqlite3_step(statement); + // TODO(perf): This is slightly inefficient to call for every row read, but not bad enough to + // fix it immediately. The alternate way would be to getRowsRead/Written once when we emit it + // in the Dtor, and handle the case where the statement could be null when the Query gets destructed + rowsRead = getRowsRead(); + rowsWritten = getRowsWritten(); if (err == SQLITE_DONE) { done = true; } else if (err != SQLITE_ROW) { diff --git a/src/workerd/util/sqlite.h b/src/workerd/util/sqlite.h index e82dc65c821..17e8b87e7f0 100644 --- a/src/workerd/util/sqlite.h +++ b/src/workerd/util/sqlite.h @@ -20,6 +20,16 @@ namespace workerd { using kj::byte; using kj::uint; +// Used to collect periodic metrics about queries and size of sqlite db +class SqliteObserver { +public: + virtual void addQueryStats(uint64_t rowsRead, uint64_t rowsWritten) {} + // The method is not used by the SqliteDatabase, it is added here for convenience + virtual void setSqliteStoredBytes(uint64_t sqliteStoredBytes) {} + + static SqliteObserver DEFAULT; +}; + // C++/KJ API for SQLite. // // In addition to providing a more modern C++ interface vs. the classic C API, this API layers @@ -43,7 +53,10 @@ class SqliteDatabase { uint64_t statementCount; }; - SqliteDatabase(const Vfs& vfs, kj::Path path, kj::Maybe maybeMode = kj::none); + SqliteDatabase(const Vfs& vfs, + kj::Path path, + kj::Maybe maybeMode = kj::none, + SqliteObserver& sqliteObserver = SqliteObserver::DEFAULT); ~SqliteDatabase() noexcept(false); KJ_DISALLOW_COPY_AND_MOVE(SqliteDatabase); @@ -209,6 +222,7 @@ class SqliteDatabase { const Vfs& vfs; kj::Path path; bool readOnly; + SqliteObserver& sqliteObserver; // This pointer can be left null if a call to reset() failed to re-open the database. kj::Maybe maybeDb; @@ -390,6 +404,12 @@ class SqliteDatabase::Query final: private ResetListener { kj::Maybe maybeStatement; // null if database was reset bool done = false; + // Storing the rowsRead and rowsWritten here to use in cases where a DB is reset. + // When the DB is reset, getRowdRead and getRowsWritten will fail as the statement they + // refer to gets destroyed as part of the reset process. + uint64_t rowsRead = 0; + uint64_t rowsWritten = 0; + friend class SqliteDatabase; Query(SqliteDatabase& db,