Skip to content

Commit

Permalink
Merge pull request #2707 from cloudflare/shrima/STOR-3710-add-SqliteO…
Browse files Browse the repository at this point in the history
…bserver

Introduce SqliteObserver and use it to collect stats related to queries and db size
  • Loading branch information
shrima-cf authored Sep 16, 2024
2 parents 488b490 + 222d244 commit 4d9d2f1
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 9 deletions.
1 change: 1 addition & 0 deletions src/workerd/io/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ wd_cc_library(
":trace",
":worker-interface",
"//src/workerd/jsg:observer",
"//src/workerd/util:sqlite",
],
)

Expand Down
3 changes: 2 additions & 1 deletion src/workerd/io/observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <workerd/io/trace.h>
#include <workerd/io/features.capnp.h>
#include <workerd/jsg/observer.h>
#include <workerd/util/sqlite.h>

namespace workerd {

Expand Down Expand Up @@ -237,7 +238,7 @@ class WorkerObserver: public kj::AtomicRefcounted {
virtual void teardownFinished() {}
};

class ActorObserver: public kj::Refcounted {
class ActorObserver: public kj::Refcounted, public SqliteObserver {
public:
// Allows the observer to run in the background, periodically making observations. Owner must
// call this and store the promise. `limitEnforcer` is used to collect CPU usage metrics, it
Expand Down
3 changes: 2 additions & 1 deletion src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -3224,7 +3224,8 @@ struct Worker::Actor::Impl {
transient.emplace(js, js.obj());
}

actorCache = makeActorCache(self.worker->getIsolate().impl->actorCacheLru, outputGate, hooks);
actorCache = makeActorCache(
self.worker->getIsolate().impl->actorCacheLru, outputGate, hooks, *metrics);
});
}
};
Expand Down
7 changes: 5 additions & 2 deletions src/workerd/io/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -676,8 +676,11 @@ class Worker::Actor final: public kj::Refcounted {
// Callback which constructs the `ActorCacheInterface` instance (if any) for the Actor. This
// can be used to customize the storage implementation. This will be called synchronously in
// the constructor.
using MakeActorCacheFunc = kj::Function<kj::Maybe<kj::Own<ActorCacheInterface>>(
const ActorCache::SharedLru& sharedLru, OutputGate& outputGate, ActorCache::Hooks& hooks)>;
using MakeActorCacheFunc =
kj::Function<kj::Maybe<kj::Own<ActorCacheInterface>>(const ActorCache::SharedLru& sharedLru,
OutputGate& outputGate,
ActorCache::Hooks& hooks,
SqliteObserver& sqliteObserver)>;

// Callback which constructs the `DurableObjectStorage` instance for an actor. This can be used
// to customize the JavaScript API.
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1924,7 +1924,7 @@ public:
auto& channels = KJ_ASSERT_NONNULL(service.ioChannels.tryGet<LinkedIoChannels>());

auto makeActorCache = [&](const ActorCache::SharedLru& sharedLru, OutputGate& outputGate,
ActorCache::Hooks& hooks) {
ActorCache::Hooks& hooks, SqliteObserver& sqliteObserver) {
return config.tryGet<Durable>().map(
[&](const Durable& d) -> kj::Own<ActorCacheInterface> {
KJ_IF_SOME(as, channels.actorStorage) {
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/tests/test-fixture.c++
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ TestFixture::TestFixture(SetupParams&& params)
KJ_IF_SOME(id, params.actorId) {
worker->runInLockScope(Worker::Lock::TakeSynchronously(kj::none), [&](Worker::Lock& lock) {
auto makeActorCache = [](const ActorCache::SharedLru& sharedLru, OutputGate& outputGate,
ActorCache::Hooks& hooks) {
ActorCache::Hooks& hooks, SqliteObserver& sqliteObserver) {
return kj::heap<ActorCache>(
kj::heap<server::EmptyReadOnlyActorStorageImpl>(), sharedLru, outputGate, hooks);
};
Expand Down
70 changes: 70 additions & 0 deletions src/workerd/util/sqlite-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -782,5 +782,75 @@ KJ_TEST("reset database") {
}
}

KJ_TEST("SQLite observer addQueryStats") {
class TestSqliteObserver: public SqliteObserver {
public:
void addQueryStats(uint64_t read, uint64_t written) override {
rowsRead += read;
rowsWritten += written;
}

uint64_t rowsRead = 0;
uint64_t rowsWritten = 0;
};

TempDirOnDisk dir;
SqliteDatabase::Vfs vfs(*dir);
TestSqliteObserver sqliteObserver = TestSqliteObserver();
SqliteDatabase db(
vfs, kj::Path({"foo"}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY, sqliteObserver);

db.run(R"(
CREATE TABLE things (
id INTEGER PRIMARY KEY
);
)");

// There are some rows read and written when we create the db, we offset this in the test
int rowsReadBefore = sqliteObserver.rowsRead;
int rowsWrittenBefore = sqliteObserver.rowsWritten;
constexpr int dbRowCount = 3;
{
db.run("INSERT INTO things (id) VALUES (10)");
db.run("INSERT INTO things (id) VALUES (11)");
db.run("INSERT INTO things (id) VALUES (12)");
}
KJ_EXPECT(sqliteObserver.rowsRead - rowsReadBefore == dbRowCount);
KJ_EXPECT(sqliteObserver.rowsWritten - rowsWrittenBefore == dbRowCount);

rowsReadBefore = sqliteObserver.rowsRead;
rowsWrittenBefore = sqliteObserver.rowsWritten;
{
auto getCount = db.prepare("SELECT COUNT(*) FROM things");
KJ_EXPECT(getCount.run().getInt(0) == dbRowCount);
}
KJ_EXPECT(sqliteObserver.rowsRead - rowsReadBefore == dbRowCount);
KJ_EXPECT(sqliteObserver.rowsWritten - rowsWrittenBefore == 0);

// Verify if addQueryStats works correctly when we call query.nextRow()
rowsReadBefore = sqliteObserver.rowsRead;
rowsWrittenBefore = sqliteObserver.rowsWritten;
{
auto stmt = db.prepare("SELECT * FROM things");
auto query = stmt.run();
KJ_ASSERT(!query.isDone());
while (!query.isDone()) {
query.nextRow();
}
}
KJ_EXPECT(sqliteObserver.rowsRead - rowsReadBefore == dbRowCount);
KJ_EXPECT(sqliteObserver.rowsWritten - rowsWrittenBefore == 0);

// Verify addQueryStats works correctly when db is reset
rowsReadBefore = sqliteObserver.rowsRead;
rowsWrittenBefore = sqliteObserver.rowsWritten;
{
auto query = db.run("INSERT INTO things (id) VALUES (100)");
db.reset();
}
KJ_EXPECT(sqliteObserver.rowsRead - rowsReadBefore == 1);
KJ_EXPECT(sqliteObserver.rowsWritten - rowsWrittenBefore == 1);
}

} // namespace
} // namespace workerd
18 changes: 16 additions & 2 deletions src/workerd/util/sqlite.c++
Original file line number Diff line number Diff line change
Expand Up @@ -367,12 +367,18 @@ static constexpr PragmaInfo ALLOWED_PRAGMAS[] = {{"data_version"_kj, PragmaSigna

// =======================================================================================

SqliteObserver SqliteObserver::DEFAULT = SqliteObserver{};

constexpr SqliteDatabase::Regulator SqliteDatabase::TRUSTED;

SqliteDatabase::SqliteDatabase(const Vfs& vfs, kj::Path path, kj::Maybe<kj::WriteMode> maybeMode)
SqliteDatabase::SqliteDatabase(const Vfs& vfs,
kj::Path path,
kj::Maybe<kj::WriteMode> maybeMode,
SqliteObserver& sqliteObserver)
: vfs(vfs),
path(kj::mv(path)),
readOnly(maybeMode == kj::none) {
readOnly(maybeMode == kj::none),
sqliteObserver(sqliteObserver) {
init(maybeMode);
}

Expand Down Expand Up @@ -979,6 +985,9 @@ SqliteDatabase::Query::Query(SqliteDatabase& db,
}

SqliteDatabase::Query::~Query() noexcept(false) {
//Update the db stats that we have collected for the query
db.sqliteObserver.addQueryStats(rowsRead, rowsWritten);

// We only need to reset the statement if we don't own it. If we own it, it's about to be
// destroyed anyway.
if (ownStatement.get() == nullptr) {
Expand Down Expand Up @@ -1096,6 +1105,11 @@ void SqliteDatabase::Query::nextRow() {
db.currentRegulator = regulator;

int err = sqlite3_step(statement);
// TODO(perf): This is slightly inefficient to call for every row read, but not bad enough to
// fix it immediately. The alternate way would be to getRowsRead/Written once when we emit it
// in the Dtor, and handle the case where the statement could be null when the Query gets destructed
rowsRead = getRowsRead();
rowsWritten = getRowsWritten();
if (err == SQLITE_DONE) {
done = true;
} else if (err != SQLITE_ROW) {
Expand Down
22 changes: 21 additions & 1 deletion src/workerd/util/sqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ namespace workerd {
using kj::byte;
using kj::uint;

// Used to collect periodic metrics about queries and size of sqlite db
class SqliteObserver {
public:
virtual void addQueryStats(uint64_t rowsRead, uint64_t rowsWritten) {}
// The method is not used by the SqliteDatabase, it is added here for convenience
virtual void setSqliteStoredBytes(uint64_t sqliteStoredBytes) {}

static SqliteObserver DEFAULT;
};

// C++/KJ API for SQLite.
//
// In addition to providing a more modern C++ interface vs. the classic C API, this API layers
Expand All @@ -43,7 +53,10 @@ class SqliteDatabase {
uint64_t statementCount;
};

SqliteDatabase(const Vfs& vfs, kj::Path path, kj::Maybe<kj::WriteMode> maybeMode = kj::none);
SqliteDatabase(const Vfs& vfs,
kj::Path path,
kj::Maybe<kj::WriteMode> maybeMode = kj::none,
SqliteObserver& sqliteObserver = SqliteObserver::DEFAULT);
~SqliteDatabase() noexcept(false);
KJ_DISALLOW_COPY_AND_MOVE(SqliteDatabase);

Expand Down Expand Up @@ -209,6 +222,7 @@ class SqliteDatabase {
const Vfs& vfs;
kj::Path path;
bool readOnly;
SqliteObserver& sqliteObserver;

// This pointer can be left null if a call to reset() failed to re-open the database.
kj::Maybe<sqlite3&> maybeDb;
Expand Down Expand Up @@ -390,6 +404,12 @@ class SqliteDatabase::Query final: private ResetListener {
kj::Maybe<sqlite3_stmt&> maybeStatement; // null if database was reset
bool done = false;

// Storing the rowsRead and rowsWritten here to use in cases where a DB is reset.
// When the DB is reset, getRowdRead and getRowsWritten will fail as the statement they
// refer to gets destroyed as part of the reset process.
uint64_t rowsRead = 0;
uint64_t rowsWritten = 0;

friend class SqliteDatabase;

Query(SqliteDatabase& db,
Expand Down

0 comments on commit 4d9d2f1

Please sign in to comment.