Skip to content

Commit

Permalink
Merge pull request #3069 from cloudflare/jphillips/byte-stream-observer
Browse files Browse the repository at this point in the history
Add ByteStreamObserver to enable monitoring of byte stream queue size and memory usage.
  • Loading branch information
jp4a50 authored Nov 7, 2024
2 parents 10a1edb + 3178392 commit df6d46c
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 13 deletions.
1 change: 1 addition & 0 deletions src/workerd/api/sockets.c++
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ jsg::Ref<Socket> setupSocket(jsg::Lock& js,
auto openedPrPair = js.newPromiseAndResolver<SocketInfo>();
openedPrPair.promise.markAsHandled(js);
auto writable = jsg::alloc<WritableStream>(ioContext, kj::mv(sysStreams.writable),
ioContext.getMetrics().tryCreateWritableByteStreamObserver(),
getWritableHighWaterMark(options), openedPrPair.promise.whenResolved(js));

auto result = jsg::alloc<Socket>(js, ioContext, kj::mv(refcountedConnection),
Expand Down
3 changes: 2 additions & 1 deletion src/workerd/api/streams/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,8 @@ class WritableStreamController {

kj::Own<WritableStreamController> newWritableStreamJsController();
kj::Own<WritableStreamController> newWritableStreamInternalController(IoContext& ioContext,
kj::Own<WritableStreamSink> source,
kj::Own<WritableStreamSink> sink,
kj::Maybe<kj::Own<ByteStreamObserver>> observer,
kj::Maybe<uint64_t> maybeHighWaterMark = kj::none,
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable = kj::none);

Expand Down
6 changes: 4 additions & 2 deletions src/workerd/api/streams/compression.c++
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,8 @@ jsg::Ref<CompressionStream> CompressionStream::constructor(kj::String format) {
auto& ioContext = IoContext::current();

return jsg::alloc<CompressionStream>(jsg::alloc<ReadableStream>(ioContext, kj::mv(readableSide)),
jsg::alloc<WritableStream>(ioContext, kj::mv(writableSide)));
jsg::alloc<WritableStream>(ioContext, kj::mv(writableSide),
ioContext.getMetrics().tryCreateWritableByteStreamObserver()));
}

jsg::Ref<DecompressionStream> DecompressionStream::constructor(jsg::Lock& js, kj::String format) {
Expand All @@ -521,7 +522,8 @@ jsg::Ref<DecompressionStream> DecompressionStream::constructor(jsg::Lock& js, kj

return jsg::alloc<DecompressionStream>(
jsg::alloc<ReadableStream>(ioContext, kj::mv(readableSide)),
jsg::alloc<WritableStream>(ioContext, kj::mv(writableSide)));
jsg::alloc<WritableStream>(ioContext, kj::mv(writableSide),
ioContext.getMetrics().tryCreateWritableByteStreamObserver()));
}

} // namespace workerd::api
82 changes: 81 additions & 1 deletion src/workerd/api/streams/internal-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ KJ_TEST("WritableStreamInternalController queue size assertion") {
// allowed to be queued.

jsg::Ref<ReadableStream> source = ReadableStream::constructor(env.js, kj::none, kj::none);
jsg::Ref<WritableStream> sink = jsg::alloc<WritableStream>(env.context, kj::heap<MySink>());
jsg::Ref<WritableStream> sink =
jsg::alloc<WritableStream>(env.context, kj::heap<MySink>(), kj::none);

auto pipeTo = source->pipeTo(env.js, sink.addRef(), PipeToOptions{.preventClose = true});

Expand Down Expand Up @@ -272,5 +273,84 @@ KJ_TEST("WritableStreamInternalController queue size assertion") {
});
}

KJ_TEST("WritableStreamInternalController observability") {

capnp::MallocMessageBuilder message;
auto flags = message.initRoot<CompatibilityFlags>();
flags.setNodeJsCompat(true);
flags.setWorkerdExperimental(true);
flags.setStreamsJavaScriptControllers(true);

TestFixture::SetupParams params;
TestFixture fixture({.featureFlags = flags.asReader()});

class MySink final: public WritableStreamSink {
public:
kj::Promise<void> write(kj::ArrayPtr<const byte> buffer) override {
++writeCount;
return kj::READY_NOW;
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
return kj::READY_NOW;
}
kj::Promise<void> end() override {
return kj::READY_NOW;
}
void abort(kj::Exception reason) override {}
uint getWriteCount() {
return writeCount;
}

private:
uint writeCount = 0;
};

class MyObserver final: public ByteStreamObserver {
public:
virtual void onChunkEnqueued(size_t bytes) {
++queueSize;
queueSizeBytes += bytes;
};
virtual void onChunkDequeued(size_t bytes) {
queueSizeBytes -= bytes;
--queueSize;
};
uint64_t queueSize = 0;
uint64_t queueSizeBytes = 0;
};

auto myObserver = kj::heap<MyObserver>();
auto& observer = *myObserver;
kj::Maybe<jsg::Ref<WritableStream>> stream;
fixture.runInIoContext([&](const TestFixture::Environment& env) -> kj::Promise<void> {
stream = jsg::alloc<WritableStream>(env.context, kj::heap<MySink>(), kj::mv(myObserver));

auto write = [&](size_t size) {
auto buffersource = env.js.bytes(kj::heapArray<kj::byte>(size));
return env.context.awaitJs(env.js,
KJ_ASSERT_NONNULL(stream)->getController().write(env.js, buffersource.getHandle(env.js)));
};

KJ_ASSERT(observer.queueSize == 0);
KJ_ASSERT(observer.queueSizeBytes == 0);

auto builder = kj::heapArrayBuilder<kj::Promise<void>>(2);
builder.add(write(1));

KJ_ASSERT(observer.queueSize == 1);
KJ_ASSERT(observer.queueSizeBytes == 1);

builder.add(write(10));

KJ_ASSERT(observer.queueSize == 2);
KJ_ASSERT(observer.queueSizeBytes == 11);

return kj::joinPromises(builder.finish());
});

KJ_ASSERT(observer.queueSize == 0);
KJ_ASSERT(observer.queueSizeBytes == 0);
}

} // namespace
} // namespace workerd::api
12 changes: 11 additions & 1 deletion src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,9 @@ jsg::Promise<void> WritableStreamInternalController::write(

auto prp = js.newPromiseAndResolver<void>();
increaseCurrentWriteBufferSize(js, byteLength);
KJ_IF_SOME(o, observer) {
o->onChunkEnqueued(byteLength);
}
auto ptr =
kj::ArrayPtr<kj::byte>(static_cast<kj::byte*>(store->Data()) + byteOffset, byteLength);
queue.push_back(
Expand Down Expand Up @@ -1598,6 +1601,9 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
auto& request = check();
maybeResolvePromise(js, request.promise);
decreaseCurrentWriteBufferSize(js, amountToWrite);
KJ_IF_SOME(o, observer) {
o->onChunkDequeued(amountToWrite);
}
queue.pop_front();
maybeAbort(js, request);
return writeLoop(js, IoContext::current());
Expand All @@ -1610,6 +1616,9 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
auto& request = check();
auto& writable = state.get<IoOwn<Writable>>();
decreaseCurrentWriteBufferSize(js, amountToWrite);
KJ_IF_SOME(o, observer) {
o->onChunkDequeued(amountToWrite);
}
maybeRejectPromise<void>(js, request.promise, handle);
queue.pop_front();
if (!maybeAbort(js, request)) {
Expand Down Expand Up @@ -2468,10 +2477,11 @@ kj::Own<ReadableStreamController> newReadableStreamInternalController(

kj::Own<WritableStreamController> newWritableStreamInternalController(IoContext& ioContext,
kj::Own<WritableStreamSink> sink,
kj::Maybe<kj::Own<ByteStreamObserver>> observer,
kj::Maybe<uint64_t> maybeHighWaterMark,
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable) {
return kj::heap<WritableStreamInternalController>(
kj::mv(sink), maybeHighWaterMark, kj::mv(maybeClosureWaitable));
kj::mv(sink), kj::mv(observer), maybeHighWaterMark, kj::mv(maybeClosureWaitable));
}

kj::StringPtr WritableStreamInternalController::jsgGetMemoryName() const {
Expand Down
5 changes: 5 additions & 0 deletions src/workerd/api/streams/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "writable.h"

#include <workerd/io/io-context.h>
#include <workerd/io/observer.h>

#include <deque>

Expand Down Expand Up @@ -183,9 +184,11 @@ class WritableStreamInternalController: public WritableStreamController {
explicit WritableStreamInternalController(StreamStates::Errored errored)
: state(kj::mv(errored)) {}
explicit WritableStreamInternalController(kj::Own<WritableStreamSink> writable,
kj::Maybe<kj::Own<ByteStreamObserver>> observer,
kj::Maybe<uint64_t> maybeHighWaterMark = kj::none,
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable = kj::none)
: state(IoContext::current().addObject(kj::heap<Writable>(kj::mv(writable)))),
observer(kj::mv(observer)),
maybeHighWaterMark(maybeHighWaterMark),
maybeClosureWaitable(kj::mv(maybeClosureWaitable)) {}

Expand Down Expand Up @@ -279,6 +282,8 @@ class WritableStreamInternalController: public WritableStreamController {
kj::OneOf<StreamStates::Closed, StreamStates::Errored, IoOwn<Writable>> state;
kj::OneOf<Unlocked, Locked, PipeLocked, WriterLocked> writeState = Unlocked();

kj::Maybe<kj::Own<ByteStreamObserver>> observer;

kj::Maybe<PendingAbort> maybePendingAbort;

uint64_t currentWriteBufferSize = 0;
Expand Down
9 changes: 5 additions & 4 deletions src/workerd/api/streams/transform.c++
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jsg::Ref<TransformStream> TransformStream::constructor(jsg::Lock& js,
.pull = maybeAddFunctor<UnderlyingSource::PullAlgorithm>(
JSG_VISITABLE_LAMBDA((controller = controller.addRef()), (controller),
(jsg::Lock & js, auto c) mutable { return controller->pull(js); })),
.cancel = maybeAddFunctor<UnderlyingSource::CancelAlgorithm>(JSG_VISITABLE_LAMBDA(
.cancel = maybeAddFunctor<UnderlyingSource::CancelAlgorithm>( JSG_VISITABLE_LAMBDA(
(controller = controller.addRef()), (controller),
(jsg::Lock & js, auto reason) mutable { return controller->cancel(js, reason); })),
.expectedLength = transformer.expectedLength.map(
Expand Down Expand Up @@ -123,9 +123,9 @@ jsg::Ref<IdentityTransformStream> IdentityTransformStream::constructor(
KJ_IF_SOME(queuingStrategy, maybeQueuingStrategy) {
maybeHighWaterMark = queuingStrategy.highWaterMark;
}

return jsg::alloc<IdentityTransformStream>(jsg::alloc<ReadableStream>(ioContext, kj::mv(pipe.in)),
jsg::alloc<WritableStream>(ioContext, kj::mv(pipe.out), maybeHighWaterMark));
jsg::alloc<WritableStream>(ioContext, kj::mv(pipe.out),
ioContext.getMetrics().tryCreateWritableByteStreamObserver(), maybeHighWaterMark));
}

jsg::Ref<FixedLengthStream> FixedLengthStream::constructor(jsg::Lock& js,
Expand All @@ -147,7 +147,8 @@ jsg::Ref<FixedLengthStream> FixedLengthStream::constructor(jsg::Lock& js,
}

return jsg::alloc<FixedLengthStream>(jsg::alloc<ReadableStream>(ioContext, kj::mv(pipe.in)),
jsg::alloc<WritableStream>(ioContext, kj::mv(pipe.out), maybeHighWaterMark));
jsg::alloc<WritableStream>(ioContext, kj::mv(pipe.out),
ioContext.getMetrics().tryCreateWritableByteStreamObserver(), maybeHighWaterMark));
}

OneWayPipe newIdentityPipe(kj::Maybe<uint64_t> expectedLength) {
Expand Down
11 changes: 8 additions & 3 deletions src/workerd/api/streams/writable.c++
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,14 @@ void WritableStreamDefaultWriter::visitForGc(jsg::GcVisitor& visitor) {

WritableStream::WritableStream(IoContext& ioContext,
kj::Own<WritableStreamSink> sink,
kj::Maybe<kj::Own<ByteStreamObserver>> maybeObserver,
kj::Maybe<uint64_t> maybeHighWaterMark,
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable)
: WritableStream(newWritableStreamInternalController(
ioContext, kj::mv(sink), maybeHighWaterMark, kj::mv(maybeClosureWaitable))) {}
: WritableStream(newWritableStreamInternalController(ioContext,
kj::mv(sink),
kj::mv(maybeObserver),
maybeHighWaterMark,
kj::mv(maybeClosureWaitable))) {}

WritableStream::WritableStream(kj::Own<WritableStreamController> controller)
: ioContext(tryGetIoContext()),
Expand Down Expand Up @@ -611,7 +615,8 @@ jsg::Ref<WritableStream> WritableStream::deserialize(
auto stream = ioctx.getByteStreamFactory().capnpToKjExplicitEnd(ws.getByteStream());
auto sink = newSystemStream(kj::mv(stream), encoding, ioctx);

return jsg::alloc<WritableStream>(ioctx, kj::mv(sink));
return jsg::alloc<WritableStream>(
ioctx, kj::mv(sink), ioctx.getMetrics().tryCreateWritableByteStreamObserver());
}

void WritableStreamDefaultWriter::visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
Expand Down
1 change: 1 addition & 0 deletions src/workerd/api/streams/writable.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class WritableStream: public jsg::Object {
public:
explicit WritableStream(IoContext& ioContext,
kj::Own<WritableStreamSink> sink,
kj::Maybe<kj::Own<ByteStreamObserver>> observer,
kj::Maybe<uint64_t> maybeHighWaterMark = kj::none,
kj::Maybe<jsg::Promise<void>> maybeClosureWaitable = kj::none);

Expand Down
2 changes: 1 addition & 1 deletion src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
// immediately if any of these happen:
// - The JS promise is GC'd without resolving.
// - The JS promise is resolved from the wrong context.
// - The system detects that no further process will be made in this context (because there is no
// - The system detects that no further progress will be made in this context (because there is no
// more JavaScript to run, and there is no outstanding I/O scheduled with awaitIo()).
//
// If `T` is `IoOwn<U>`, it will be unwrapped to just `U` in the result. If `U` is in turn
Expand Down
24 changes: 24 additions & 0 deletions src/workerd/io/observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,30 @@ class TimerChannel;

class WebSocketObserver: public kj::Refcounted {
public:
virtual ~WebSocketObserver() noexcept(false) = default;
// Called when a worker sends a message on this WebSocket (includes close messages).
virtual void sentMessage(size_t bytes) {};
// Called when a worker receives a message on this WebSocket (includes close messages).
virtual void receivedMessage(size_t bytes) {};
};

// Observes a byte stream. Byte streams which use instances of this observer should call enqueue()
// and dequeue() once for each chunk that passes through the stream. The order of enqueues should
// match the order of dequeues.
//
// Byte observer implementations can then calculate the current number of chunks and the sum of the
// size of the chunks in the internal queue by incrementing and decrementing each metric in
// enqueue() and dequeue() respectively.
class ByteStreamObserver {
public:
virtual ~ByteStreamObserver() noexcept(false) = default;
// Called when a chunk of size `bytes` is enqueued on the stream.
virtual void onChunkEnqueued(size_t bytes) {};
// Called when a chunk of size `bytes` is dequeued from the stream (e.g. when a writable byte
// stream writes the chunk to its corresponding sink).
virtual void onChunkDequeued(size_t bytes) {};
};

// Observes a specific request to a specific worker. Also observes outgoing subrequests.
//
// Observing anything is optional. Default implementations of all methods observe nothing.
Expand All @@ -45,6 +63,12 @@ class RequestObserver: public kj::Refcounted {
return kj::none;
};

// This is called when a writable byte stream is created whilst processing this request. It will
// be destroyed when the corresponding byte stream is destroyed.
virtual kj::Maybe<kj::Own<ByteStreamObserver>> tryCreateWritableByteStreamObserver() {
return kj::none;
}

// Invoked when the request is actually delivered.
//
// If, for some reason, this is not invoked before the object is destroyed, this indicate that
Expand Down

0 comments on commit df6d46c

Please sign in to comment.