Skip to content

Add external memory accounting to commonly used stream API objects #4233

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/workerd/api/node/zlib-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ class ZlibUtil final: public jsg::Object {
class CompressionStream: public jsg::Object {
public:
explicit CompressionStream(
ZlibMode _mode, kj::Own<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
ZlibMode _mode, kj::Arc<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
: allocator(kj::mv(externalMemoryTarget)),
context_(_mode) {}
// TODO(soon): Find a way to add noexcept(false) to this destructor.
Expand Down Expand Up @@ -361,7 +361,7 @@ class ZlibUtil final: public jsg::Object {
class ZlibStream final: public CompressionStream<ZlibContext> {
public:
explicit ZlibStream(
ZlibMode mode, kj::Own<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
ZlibMode mode, kj::Arc<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
: CompressionStream(mode, kj::mv(externalMemoryTarget)) {}
KJ_DISALLOW_COPY_AND_MOVE(ZlibStream);
static jsg::Ref<ZlibStream> constructor(jsg::Lock& js, ZlibModeValue mode);
Expand All @@ -388,7 +388,7 @@ class ZlibUtil final: public jsg::Object {
class BrotliCompressionStream: public CompressionStream<CompressionContext> {
public:
explicit BrotliCompressionStream(
ZlibMode _mode, kj::Own<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
ZlibMode _mode, kj::Arc<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
: CompressionStream<CompressionContext>(_mode, kj::mv(externalMemoryTarget)) {}
KJ_DISALLOW_COPY_AND_MOVE(BrotliCompressionStream);
static jsg::Ref<BrotliCompressionStream> constructor(jsg::Lock& js, ZlibModeValue mode);
Expand Down
6 changes: 3 additions & 3 deletions src/workerd/api/streams/compression.c++
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

namespace workerd::api {
CompressionAllocator::CompressionAllocator(
kj::Own<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
kj::Arc<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
: externalMemoryTarget(kj::mv(externalMemoryTarget)) {}

void CompressionAllocator::configure(z_stream* stream) {
Expand Down Expand Up @@ -71,7 +71,7 @@ class Context {
explicit Context(Mode mode,
kj::StringPtr format,
ContextFlags flags,
kj::Own<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
kj::Arc<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
: allocator(kj::mv(externalMemoryTarget)),
mode(mode),
strictCompression(flags)
Expand Down Expand Up @@ -244,7 +244,7 @@ class CompressionStreamImpl: public kj::Refcounted,
public:
explicit CompressionStreamImpl(kj::String format,
Context::ContextFlags flags,
kj::Own<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
kj::Arc<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
: context(mode, format, flags, kj::mv(externalMemoryTarget)) {}

// WritableStreamSink implementation ---------------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/streams/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace workerd::api {
// isolate pointer and use that to get the external memory adjustment.
class CompressionAllocator final {
public:
CompressionAllocator(kj::Own<const jsg::ExternalMemoryTarget>&& externalMemoryTarget);
CompressionAllocator(kj::Arc<const jsg::ExternalMemoryTarget>&& externalMemoryTarget);
void configure(z_stream* stream);

static void* AllocForZlib(void* data, uInt items, uInt size);
Expand All @@ -31,7 +31,7 @@ class CompressionAllocator final {
kj::Maybe<jsg::ExternalMemoryAdjustment> memoryAdjustment = kj::none;
};

kj::Own<const jsg::ExternalMemoryTarget> externalMemoryTarget;
kj::Arc<const jsg::ExternalMemoryTarget> externalMemoryTarget;
kj::HashMap<void*, Allocation> allocations;
};

Expand Down
6 changes: 5 additions & 1 deletion src/workerd/api/streams/readable.c++
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,11 @@ jsg::Ref<ReadableStream> ReadableStream::constructor(jsg::Lock& js,
"To use the new ReadableStream() constructor, enable the "
"streams_enable_constructors compatibility flag. "
"Refer to the docs for more information: https://developers.cloudflare.com/workers/platform/compatibility-dates/#compatibility-flags");
auto stream = js.alloc<ReadableStream>(newReadableStreamJsController());
// We account for the memory usage of the ReadableStream and its controller together because their
// lifetimes are identical and memory accounting itself has a memory overhead.
auto controller = newReadableStreamJsController();
auto stream = js.allocAccounted<ReadableStream>(
sizeof(ReadableStream) + controller->jsgGetMemorySelfSize(), kj::mv(controller));
stream->getController().setup(js, kj::mv(underlyingSource), kj::mv(queuingStrategy));
return kj::mv(stream);
}
Expand Down
26 changes: 19 additions & 7 deletions src/workerd/api/streams/standard.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1136,9 +1136,10 @@ kj::Own<typename ReadableImpl<Self>::Consumer> ReadableImpl<Self>::getConsumer(
// ======================================================================================

template <typename Self>
WritableImpl<Self>::WritableImpl(jsg::Lock& js, WritableStream& owner)
WritableImpl<Self>::WritableImpl(
jsg::Lock& js, WritableStream& owner, jsg::Ref<AbortSignal> abortSignal)
: owner(owner.addWeakRef()),
signal(js.alloc<AbortSignal>()) {}
signal(kj::mv(abortSignal)) {}

template <typename Self>
jsg::Promise<void> WritableImpl<Self>::abort(
Expand Down Expand Up @@ -2507,14 +2508,21 @@ void ReadableStreamJsController::setup(jsg::Lock& js,
JSG_REQUIRE(
autoAllocateChunkSize > 0, TypeError, "The autoAllocateChunkSize option cannot be zero.");

state = kj::heap<ByteReadable>(controller.addRef(), *this, autoAllocateChunkSize);
// We account for the memory usage of the ByteReadable and its controller together because
// their lifetimes are identical (in practice) and memory accounting itself has a memory
// overhead. The same applies to ValueReadable below.
state = kj::heap<ByteReadable>(controller.addRef(), *this, autoAllocateChunkSize)
.attach(js.getExternalMemoryAdjustment(
sizeof(ByteReadable) + sizeof(ReadableByteStreamController)));
controller->start(js);
} else {
JSG_REQUIRE(
type == "", TypeError, kj::str("\"", type, "\" is not a valid type of ReadableStream."));
auto controller = js.alloc<ReadableStreamDefaultController>(
kj::mv(underlyingSource), kj::mv(queuingStrategy));
state = kj::heap<ValueReadable>(controller.addRef(), *this);
state = kj::heap<ValueReadable>(controller.addRef(), *this)
.attach(js.getExternalMemoryAdjustment(
sizeof(ValueReadable) + sizeof(ReadableStreamDefaultController)));
controller->start(js);
}
}
Expand Down Expand Up @@ -3172,9 +3180,9 @@ kj::Promise<DeferredProxy<void>> ReadableStreamJsController::pumpTo(
// ======================================================================================

WritableStreamDefaultController::WritableStreamDefaultController(
jsg::Lock& js, WritableStream& owner)
jsg::Lock& js, WritableStream& owner, jsg::Ref<AbortSignal> abortSignal)
: ioContext(tryGetIoContext()),
impl(js, owner) {}
impl(js, owner, kj::mv(abortSignal)) {}

jsg::Promise<void> WritableStreamDefaultController::abort(
jsg::Lock& js, v8::Local<v8::Value> reason) {
Expand Down Expand Up @@ -3404,7 +3412,11 @@ void WritableStreamJsController::setup(jsg::Lock& js,
jsg::Optional<StreamQueuingStrategy> maybeQueuingStrategy) {
auto underlyingSink = kj::mv(maybeUnderlyingSink).orDefault({});
auto queuingStrategy = kj::mv(maybeQueuingStrategy).orDefault({});
state = js.alloc<WritableStreamDefaultController>(js, KJ_ASSERT_NONNULL(owner));
// We account for the memory usage of the WritableStreamDefaultController and AbortSignal together
// because their lifetimes are identical and memory accounting itself has a memory overhead.
state = js.allocAccounted<WritableStreamDefaultController>(
sizeof(WritableStreamDefaultController) + sizeof(AbortSignal), js, KJ_ASSERT_NONNULL(owner),
jsg::alloc<AbortSignal>());
state.get<Controller>()->setup(js, kj::mv(underlyingSink), kj::mv(queuingStrategy));
}

Expand Down
5 changes: 3 additions & 2 deletions src/workerd/api/streams/standard.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class WritableImpl {
}
};

WritableImpl(jsg::Lock& js, WritableStream& owner);
WritableImpl(jsg::Lock& js, WritableStream& owner, jsg::Ref<AbortSignal> abortSignal);

jsg::Promise<void> abort(jsg::Lock& js, jsg::Ref<Self> self, v8::Local<v8::Value> reason);

Expand Down Expand Up @@ -573,7 +573,8 @@ class WritableStreamDefaultController: public jsg::Object {
public:
using WritableImpl = WritableImpl<WritableStreamDefaultController>;

explicit WritableStreamDefaultController(jsg::Lock& js, WritableStream& owner);
explicit WritableStreamDefaultController(
jsg::Lock& js, WritableStream& owner, jsg::Ref<AbortSignal> abortSignal);

jsg::Promise<void> abort(jsg::Lock& js, v8::Local<v8::Value> reason);

Expand Down
6 changes: 5 additions & 1 deletion src/workerd/api/streams/writable.c++
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,11 @@ jsg::Ref<WritableStream> WritableStream::constructor(jsg::Lock& js,
"To use the new WritableStream() constructor, enable the "
"streams_enable_constructors compatibility flag. "
"Refer to the docs for more information: https://developers.cloudflare.com/workers/platform/compatibility-dates/#compatibility-flags");
auto stream = js.alloc<WritableStream>(newWritableStreamJsController());
auto controller = newWritableStreamJsController();
// We account for the memory usage of the WritableStream and its controller together because their
// lifetimes are identical and memory accounting itself has a memory overhead.
auto stream = js.allocAccounted<WritableStream>(
sizeof(WritableStream) + controller->jsgGetMemorySelfSize(), kj::mv(controller));
stream->getController().setup(js, kj::mv(underlyingSink), kj::mv(queuingStrategy));
return kj::mv(stream);
}
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/jsg/jsg-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ KJ_TEST("External memory adjustment") {
}

KJ_TEST("External memory adjustment - defered") {
kj::Own<const ExternalMemoryTarget> target;
kj::Arc<const ExternalMemoryTarget> target;

// A memory allocation that will outlive the isolate
kj::Array<kj::byte> mem;
Expand Down
6 changes: 3 additions & 3 deletions src/workerd/jsg/jsg.c++
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ void ExternalMemoryTarget::detach() const {
}

ExternalMemoryAdjustment ExternalMemoryTarget::getAdjustment(size_t amount) const {
return ExternalMemoryAdjustment(kj::atomicAddRef(*this), amount);
return ExternalMemoryAdjustment(this->addRefToThis(), amount);
}

void ExternalMemoryTarget::applyDeferredMemoryUpdate() const {
Expand All @@ -375,7 +375,7 @@ ExternalMemoryAdjustment Lock::getExternalMemoryAdjustment(int64_t amount) {
return IsolateBase::from(v8Isolate).getExternalMemoryAdjustment(amount);
}

kj::Own<const ExternalMemoryTarget> Lock::getExternalMemoryTarget() {
kj::Arc<const ExternalMemoryTarget> Lock::getExternalMemoryTarget() {
return IsolateBase::from(v8Isolate).getExternalMemoryTarget();
}

Expand Down Expand Up @@ -408,7 +408,7 @@ void ExternalMemoryAdjustment::maybeDeferAdjustment(ssize_t amount) {
}

ExternalMemoryAdjustment::ExternalMemoryAdjustment(
kj::Own<const ExternalMemoryTarget> externalMemory, size_t amount)
kj::Arc<const ExternalMemoryTarget> externalMemory, size_t amount)
: externalMemory(kj::mv(externalMemory)) {
maybeDeferAdjustment(amount);
}
Expand Down
16 changes: 12 additions & 4 deletions src/workerd/jsg/jsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -2269,7 +2269,8 @@ class ExternalMemoryAdjustment;
// Each isolate has a singleton `ExternalMemoryTarget`, which all `ExternalMemoryAdjustment`s
// point to. The only purpose of this object is to hold a weak reference back to the isolate; the
// reference is nulled out when the isolate is destroyed.
class ExternalMemoryTarget: public kj::AtomicRefcounted {
class ExternalMemoryTarget: public kj::AtomicRefcounted,
public kj::EnableAddRefToThis<ExternalMemoryTarget> {
public:
ExternalMemoryTarget(v8::Isolate* isolate): isolate(isolate) {}

Expand Down Expand Up @@ -2306,7 +2307,7 @@ class ExternalMemoryTarget: public kj::AtomicRefcounted {
// The allocation amount can be adjusted up or down during the lifetime of an object.
class ExternalMemoryAdjustment final {
public:
ExternalMemoryAdjustment(kj::Own<const ExternalMemoryTarget> externalMemory, size_t amount);
ExternalMemoryAdjustment(kj::Arc<const ExternalMemoryTarget> externalMemory, size_t amount);
ExternalMemoryAdjustment(ExternalMemoryAdjustment&& other);
ExternalMemoryAdjustment& operator=(ExternalMemoryAdjustment&& other);
KJ_DISALLOW_COPY(ExternalMemoryAdjustment);
Expand All @@ -2324,7 +2325,7 @@ class ExternalMemoryAdjustment final {
}

private:
kj::Own<const ExternalMemoryTarget> externalMemory;
kj::Arc<const ExternalMemoryTarget> externalMemory;
size_t amount = 0;

// If the isolate is locked, adjust the external memory immediately.
Expand Down Expand Up @@ -2371,6 +2372,13 @@ class Lock {
return Ref<T>(kj::refcounted<T>(kj::fwd<Params>(params)...));
}

// Like alloc() but attaches an external memory adjustment of size indicated by `accountedSize`.
template <typename T, typename... Params>
Ref<T> allocAccounted(size_t accountedSize, Params&&... params) {
return Ref<T>(kj::refcounted<T>(kj::fwd<Params>(params)...)
.attach(getExternalMemoryAdjustment(accountedSize)));
}

// Returns a kj::String with an external memory adjustment attached.
kj::String accountedKjString(kj::Array<char>&& str);
kj::String accountedKjString(kj::String&& str) {
Expand Down Expand Up @@ -2439,7 +2447,7 @@ class Lock {
// Used to save a reference to an isolate that is responsible for external memory usage.
// getAdjustment() can be invoked at any time to create a new RAII adjustment object
// pointing to this isolate
kj::Own<const ExternalMemoryTarget> getExternalMemoryTarget();
kj::Arc<const ExternalMemoryTarget> getExternalMemoryTarget();

Value parseJson(kj::ArrayPtr<const char> data);
Value parseJson(v8::Local<v8::String> text);
Expand Down
6 changes: 3 additions & 3 deletions src/workerd/jsg/setup.c++
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@ void IsolateBase::deferDestruction(Item item) {
queue.lockExclusive()->push(kj::mv(item));
}

kj::Own<const ExternalMemoryTarget> IsolateBase::getExternalMemoryTarget() {
return kj::atomicAddRef(*externalMemoryTarget);
kj::Arc<const ExternalMemoryTarget> IsolateBase::getExternalMemoryTarget() {
return externalMemoryTarget.addRef();
}

void IsolateBase::terminateExecution() const {
Expand Down Expand Up @@ -368,7 +368,7 @@ IsolateBase::IsolateBase(V8System& system,
: v8System(system),
cppHeap(newCppHeap(const_cast<V8PlatformWrapper*>(system.platformWrapper.get()))),
ptr(newIsolate(kj::mv(createParams), cppHeap.release(), group)),
externalMemoryTarget(kj::atomicRefcounted<ExternalMemoryTarget>(ptr)),
externalMemoryTarget(kj::arc<ExternalMemoryTarget>(ptr)),
envAsyncContextKey(kj::refcounted<AsyncContextFrame::StorageKey>()),
heapTracer(ptr),
observer(kj::mv(observer)) {
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/jsg/setup.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ class IsolateBase {
}

// Get an object referencing this isolate that can be used to adjust external memory usage later
kj::Own<const ExternalMemoryTarget> getExternalMemoryTarget();
kj::Arc<const ExternalMemoryTarget> getExternalMemoryTarget();

// Equivalent to getExternalMemoryTarget()->getAdjustment(amount), but saves an atomic refcount
// increment and decrement.
Expand Down Expand Up @@ -298,7 +298,7 @@ class IsolateBase {
// ExternalMemoryTarget holds a weak reference back to the isolate. ExternalMemoryAjustments
// hold references to the ExternalMemoryTarget. This allows the ExternalMemoryAjustments to
// outlive the isolate.
kj::Own<const ExternalMemoryTarget> externalMemoryTarget;
kj::Arc<const ExternalMemoryTarget> externalMemoryTarget;

// A shared async context key for accessing env
kj::Own<AsyncContextFrame::StorageKey> envAsyncContextKey;
Expand Down