Skip to content

Commit 18f3b5d

Browse files
authored
Merge pull request #4233 from cloudflare/jphillips/account-stream-memory
Add external memory accounting to commonly used stream API objects
2 parents f490852 + 34a8fc6 commit 18f3b5d

File tree

12 files changed

+61
-32
lines changed

12 files changed

+61
-32
lines changed

src/workerd/api/node/zlib-util.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ class ZlibUtil final: public jsg::Object {
293293
class CompressionStream: public jsg::Object {
294294
public:
295295
explicit CompressionStream(
296-
ZlibMode _mode, kj::Own<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
296+
ZlibMode _mode, kj::Arc<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
297297
: allocator(kj::mv(externalMemoryTarget)),
298298
context_(_mode) {}
299299
// TODO(soon): Find a way to add noexcept(false) to this destructor.
@@ -361,7 +361,7 @@ class ZlibUtil final: public jsg::Object {
361361
class ZlibStream final: public CompressionStream<ZlibContext> {
362362
public:
363363
explicit ZlibStream(
364-
ZlibMode mode, kj::Own<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
364+
ZlibMode mode, kj::Arc<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
365365
: CompressionStream(mode, kj::mv(externalMemoryTarget)) {}
366366
KJ_DISALLOW_COPY_AND_MOVE(ZlibStream);
367367
static jsg::Ref<ZlibStream> constructor(jsg::Lock& js, ZlibModeValue mode);
@@ -388,7 +388,7 @@ class ZlibUtil final: public jsg::Object {
388388
class BrotliCompressionStream: public CompressionStream<CompressionContext> {
389389
public:
390390
explicit BrotliCompressionStream(
391-
ZlibMode _mode, kj::Own<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
391+
ZlibMode _mode, kj::Arc<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
392392
: CompressionStream<CompressionContext>(_mode, kj::mv(externalMemoryTarget)) {}
393393
KJ_DISALLOW_COPY_AND_MOVE(BrotliCompressionStream);
394394
static jsg::Ref<BrotliCompressionStream> constructor(jsg::Lock& js, ZlibModeValue mode);

src/workerd/api/streams/compression.c++

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
namespace workerd::api {
1616
CompressionAllocator::CompressionAllocator(
17-
kj::Own<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
17+
kj::Arc<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
1818
: externalMemoryTarget(kj::mv(externalMemoryTarget)) {}
1919

2020
void CompressionAllocator::configure(z_stream* stream) {
@@ -71,7 +71,7 @@ class Context {
7171
explicit Context(Mode mode,
7272
kj::StringPtr format,
7373
ContextFlags flags,
74-
kj::Own<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
74+
kj::Arc<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
7575
: allocator(kj::mv(externalMemoryTarget)),
7676
mode(mode),
7777
strictCompression(flags)
@@ -244,7 +244,7 @@ class CompressionStreamImpl: public kj::Refcounted,
244244
public:
245245
explicit CompressionStreamImpl(kj::String format,
246246
Context::ContextFlags flags,
247-
kj::Own<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
247+
kj::Arc<const jsg::ExternalMemoryTarget>&& externalMemoryTarget)
248248
: context(mode, format, flags, kj::mv(externalMemoryTarget)) {}
249249

250250
// WritableStreamSink implementation ---------------------------------------------------

src/workerd/api/streams/compression.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ namespace workerd::api {
1818
// isolate pointer and use that to get the external memory adjustment.
1919
class CompressionAllocator final {
2020
public:
21-
CompressionAllocator(kj::Own<const jsg::ExternalMemoryTarget>&& externalMemoryTarget);
21+
CompressionAllocator(kj::Arc<const jsg::ExternalMemoryTarget>&& externalMemoryTarget);
2222
void configure(z_stream* stream);
2323

2424
static void* AllocForZlib(void* data, uInt items, uInt size);
@@ -31,7 +31,7 @@ class CompressionAllocator final {
3131
kj::Maybe<jsg::ExternalMemoryAdjustment> memoryAdjustment = kj::none;
3232
};
3333

34-
kj::Own<const jsg::ExternalMemoryTarget> externalMemoryTarget;
34+
kj::Arc<const jsg::ExternalMemoryTarget> externalMemoryTarget;
3535
kj::HashMap<void*, Allocation> allocations;
3636
};
3737

src/workerd/api/streams/readable.c++

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,11 @@ jsg::Ref<ReadableStream> ReadableStream::constructor(jsg::Lock& js,
494494
"To use the new ReadableStream() constructor, enable the "
495495
"streams_enable_constructors compatibility flag. "
496496
"Refer to the docs for more information: https://developers.cloudflare.com/workers/platform/compatibility-dates/#compatibility-flags");
497-
auto stream = js.alloc<ReadableStream>(newReadableStreamJsController());
497+
// We account for the memory usage of the ReadableStream and its controller together because their
498+
// lifetimes are identical and memory accounting itself has a memory overhead.
499+
auto controller = newReadableStreamJsController();
500+
auto stream = js.allocAccounted<ReadableStream>(
501+
sizeof(ReadableStream) + controller->jsgGetMemorySelfSize(), kj::mv(controller));
498502
stream->getController().setup(js, kj::mv(underlyingSource), kj::mv(queuingStrategy));
499503
return kj::mv(stream);
500504
}

src/workerd/api/streams/standard.c++

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1136,9 +1136,10 @@ kj::Own<typename ReadableImpl<Self>::Consumer> ReadableImpl<Self>::getConsumer(
11361136
// ======================================================================================
11371137

11381138
template <typename Self>
1139-
WritableImpl<Self>::WritableImpl(jsg::Lock& js, WritableStream& owner)
1139+
WritableImpl<Self>::WritableImpl(
1140+
jsg::Lock& js, WritableStream& owner, jsg::Ref<AbortSignal> abortSignal)
11401141
: owner(owner.addWeakRef()),
1141-
signal(js.alloc<AbortSignal>()) {}
1142+
signal(kj::mv(abortSignal)) {}
11421143

11431144
template <typename Self>
11441145
jsg::Promise<void> WritableImpl<Self>::abort(
@@ -2507,14 +2508,21 @@ void ReadableStreamJsController::setup(jsg::Lock& js,
25072508
JSG_REQUIRE(
25082509
autoAllocateChunkSize > 0, TypeError, "The autoAllocateChunkSize option cannot be zero.");
25092510

2510-
state = kj::heap<ByteReadable>(controller.addRef(), *this, autoAllocateChunkSize);
2511+
// We account for the memory usage of the ByteReadable and its controller together because
2512+
// their lifetimes are identical (in practice) and memory accounting itself has a memory
2513+
// overhead. The same applies to ValueReadable below.
2514+
state = kj::heap<ByteReadable>(controller.addRef(), *this, autoAllocateChunkSize)
2515+
.attach(js.getExternalMemoryAdjustment(
2516+
sizeof(ByteReadable) + sizeof(ReadableByteStreamController)));
25112517
controller->start(js);
25122518
} else {
25132519
JSG_REQUIRE(
25142520
type == "", TypeError, kj::str("\"", type, "\" is not a valid type of ReadableStream."));
25152521
auto controller = js.alloc<ReadableStreamDefaultController>(
25162522
kj::mv(underlyingSource), kj::mv(queuingStrategy));
2517-
state = kj::heap<ValueReadable>(controller.addRef(), *this);
2523+
state = kj::heap<ValueReadable>(controller.addRef(), *this)
2524+
.attach(js.getExternalMemoryAdjustment(
2525+
sizeof(ValueReadable) + sizeof(ReadableStreamDefaultController)));
25182526
controller->start(js);
25192527
}
25202528
}
@@ -3172,9 +3180,9 @@ kj::Promise<DeferredProxy<void>> ReadableStreamJsController::pumpTo(
31723180
// ======================================================================================
31733181

31743182
WritableStreamDefaultController::WritableStreamDefaultController(
3175-
jsg::Lock& js, WritableStream& owner)
3183+
jsg::Lock& js, WritableStream& owner, jsg::Ref<AbortSignal> abortSignal)
31763184
: ioContext(tryGetIoContext()),
3177-
impl(js, owner) {}
3185+
impl(js, owner, kj::mv(abortSignal)) {}
31783186

31793187
jsg::Promise<void> WritableStreamDefaultController::abort(
31803188
jsg::Lock& js, v8::Local<v8::Value> reason) {
@@ -3404,7 +3412,11 @@ void WritableStreamJsController::setup(jsg::Lock& js,
34043412
jsg::Optional<StreamQueuingStrategy> maybeQueuingStrategy) {
34053413
auto underlyingSink = kj::mv(maybeUnderlyingSink).orDefault({});
34063414
auto queuingStrategy = kj::mv(maybeQueuingStrategy).orDefault({});
3407-
state = js.alloc<WritableStreamDefaultController>(js, KJ_ASSERT_NONNULL(owner));
3415+
// We account for the memory usage of the WritableStreamDefaultController and AbortSignal together
3416+
// because their lifetimes are identical and memory accounting itself has a memory overhead.
3417+
state = js.allocAccounted<WritableStreamDefaultController>(
3418+
sizeof(WritableStreamDefaultController) + sizeof(AbortSignal), js, KJ_ASSERT_NONNULL(owner),
3419+
jsg::alloc<AbortSignal>());
34083420
state.get<Controller>()->setup(js, kj::mv(underlyingSink), kj::mv(queuingStrategy));
34093421
}
34103422

src/workerd/api/streams/standard.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ class WritableImpl {
266266
}
267267
};
268268

269-
WritableImpl(jsg::Lock& js, WritableStream& owner);
269+
WritableImpl(jsg::Lock& js, WritableStream& owner, jsg::Ref<AbortSignal> abortSignal);
270270

271271
jsg::Promise<void> abort(jsg::Lock& js, jsg::Ref<Self> self, v8::Local<v8::Value> reason);
272272

@@ -573,7 +573,8 @@ class WritableStreamDefaultController: public jsg::Object {
573573
public:
574574
using WritableImpl = WritableImpl<WritableStreamDefaultController>;
575575

576-
explicit WritableStreamDefaultController(jsg::Lock& js, WritableStream& owner);
576+
explicit WritableStreamDefaultController(
577+
jsg::Lock& js, WritableStream& owner, jsg::Ref<AbortSignal> abortSignal);
577578

578579
jsg::Promise<void> abort(jsg::Lock& js, v8::Local<v8::Value> reason);
579580

src/workerd/api/streams/writable.c++

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,11 @@ jsg::Ref<WritableStream> WritableStream::constructor(jsg::Lock& js,
292292
"To use the new WritableStream() constructor, enable the "
293293
"streams_enable_constructors compatibility flag. "
294294
"Refer to the docs for more information: https://developers.cloudflare.com/workers/platform/compatibility-dates/#compatibility-flags");
295-
auto stream = js.alloc<WritableStream>(newWritableStreamJsController());
295+
auto controller = newWritableStreamJsController();
296+
// We account for the memory usage of the WritableStream and its controller together because their
297+
// lifetimes are identical and memory accounting itself has a memory overhead.
298+
auto stream = js.allocAccounted<WritableStream>(
299+
sizeof(WritableStream) + controller->jsgGetMemorySelfSize(), kj::mv(controller));
296300
stream->getController().setup(js, kj::mv(underlyingSink), kj::mv(queuingStrategy));
297301
return kj::mv(stream);
298302
}

src/workerd/jsg/jsg-test.c++

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ KJ_TEST("External memory adjustment") {
453453
}
454454

455455
KJ_TEST("External memory adjustment - defered") {
456-
kj::Own<const ExternalMemoryTarget> target;
456+
kj::Arc<const ExternalMemoryTarget> target;
457457

458458
// A memory allocation that will outlive the isolate
459459
kj::Array<kj::byte> mem;

src/workerd/jsg/jsg.c++

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ void ExternalMemoryTarget::detach() const {
361361
}
362362

363363
ExternalMemoryAdjustment ExternalMemoryTarget::getAdjustment(size_t amount) const {
364-
return ExternalMemoryAdjustment(kj::atomicAddRef(*this), amount);
364+
return ExternalMemoryAdjustment(this->addRefToThis(), amount);
365365
}
366366

367367
void ExternalMemoryTarget::applyDeferredMemoryUpdate() const {
@@ -383,7 +383,7 @@ ExternalMemoryAdjustment Lock::getExternalMemoryAdjustment(int64_t amount) {
383383
return IsolateBase::from(v8Isolate).getExternalMemoryAdjustment(amount);
384384
}
385385

386-
kj::Own<const ExternalMemoryTarget> Lock::getExternalMemoryTarget() {
386+
kj::Arc<const ExternalMemoryTarget> Lock::getExternalMemoryTarget() {
387387
return IsolateBase::from(v8Isolate).getExternalMemoryTarget();
388388
}
389389

@@ -416,7 +416,7 @@ void ExternalMemoryAdjustment::maybeDeferAdjustment(ssize_t amount) {
416416
}
417417

418418
ExternalMemoryAdjustment::ExternalMemoryAdjustment(
419-
kj::Own<const ExternalMemoryTarget> externalMemory, size_t amount)
419+
kj::Arc<const ExternalMemoryTarget> externalMemory, size_t amount)
420420
: externalMemory(kj::mv(externalMemory)) {
421421
maybeDeferAdjustment(amount);
422422
}

src/workerd/jsg/jsg.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2292,7 +2292,8 @@ class ExternalMemoryAdjustment;
22922292
// Each isolate has a singleton `ExternalMemoryTarget`, which all `ExternalMemoryAdjustment`s
22932293
// point to. The only purpose of this object is to hold a weak reference back to the isolate; the
22942294
// reference is nulled out when the isolate is destroyed.
2295-
class ExternalMemoryTarget: public kj::AtomicRefcounted {
2295+
class ExternalMemoryTarget: public kj::AtomicRefcounted,
2296+
public kj::EnableAddRefToThis<ExternalMemoryTarget> {
22962297
public:
22972298
ExternalMemoryTarget(v8::Isolate* isolate): isolate(isolate) {}
22982299

@@ -2329,7 +2330,7 @@ class ExternalMemoryTarget: public kj::AtomicRefcounted {
23292330
// The allocation amount can be adjusted up or down during the lifetime of an object.
23302331
class ExternalMemoryAdjustment final {
23312332
public:
2332-
ExternalMemoryAdjustment(kj::Own<const ExternalMemoryTarget> externalMemory, size_t amount);
2333+
ExternalMemoryAdjustment(kj::Arc<const ExternalMemoryTarget> externalMemory, size_t amount);
23332334
ExternalMemoryAdjustment(ExternalMemoryAdjustment&& other);
23342335
ExternalMemoryAdjustment& operator=(ExternalMemoryAdjustment&& other);
23352336
KJ_DISALLOW_COPY(ExternalMemoryAdjustment);
@@ -2347,7 +2348,7 @@ class ExternalMemoryAdjustment final {
23472348
}
23482349

23492350
private:
2350-
kj::Own<const ExternalMemoryTarget> externalMemory;
2351+
kj::Arc<const ExternalMemoryTarget> externalMemory;
23512352
size_t amount = 0;
23522353

23532354
// If the isolate is locked, adjust the external memory immediately.
@@ -2394,6 +2395,13 @@ class Lock {
23942395
return Ref<T>(kj::refcounted<T>(kj::fwd<Params>(params)...));
23952396
}
23962397

2398+
// Like alloc() but attaches an external memory adjustment of size indicated by `accountedSize`.
2399+
template <typename T, typename... Params>
2400+
Ref<T> allocAccounted(size_t accountedSize, Params&&... params) {
2401+
return Ref<T>(kj::refcounted<T>(kj::fwd<Params>(params)...)
2402+
.attach(getExternalMemoryAdjustment(accountedSize)));
2403+
}
2404+
23972405
// Returns a kj::String with an external memory adjustment attached.
23982406
kj::String accountedKjString(kj::Array<char>&& str);
23992407
kj::String accountedKjString(kj::String&& str) {
@@ -2462,7 +2470,7 @@ class Lock {
24622470
// Used to save a reference to an isolate that is responsible for external memory usage.
24632471
// getAdjustment() can be invoked at any time to create a new RAII adjustment object
24642472
// pointing to this isolate
2465-
kj::Own<const ExternalMemoryTarget> getExternalMemoryTarget();
2473+
kj::Arc<const ExternalMemoryTarget> getExternalMemoryTarget();
24662474

24672475
Value parseJson(kj::ArrayPtr<const char> data);
24682476
Value parseJson(v8::Local<v8::String> text);

src/workerd/jsg/setup.c++

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,8 @@ void IsolateBase::deferDestruction(Item item) {
229229
queue.lockExclusive()->push(kj::mv(item));
230230
}
231231

232-
kj::Own<const ExternalMemoryTarget> IsolateBase::getExternalMemoryTarget() {
233-
return kj::atomicAddRef(*externalMemoryTarget);
232+
kj::Arc<const ExternalMemoryTarget> IsolateBase::getExternalMemoryTarget() {
233+
return externalMemoryTarget.addRef();
234234
}
235235

236236
void IsolateBase::terminateExecution() const {
@@ -368,7 +368,7 @@ IsolateBase::IsolateBase(V8System& system,
368368
: v8System(system),
369369
cppHeap(newCppHeap(const_cast<V8PlatformWrapper*>(system.platformWrapper.get()))),
370370
ptr(newIsolate(kj::mv(createParams), cppHeap.release(), group)),
371-
externalMemoryTarget(kj::atomicRefcounted<ExternalMemoryTarget>(ptr)),
371+
externalMemoryTarget(kj::arc<ExternalMemoryTarget>(ptr)),
372372
envAsyncContextKey(kj::refcounted<AsyncContextFrame::StorageKey>()),
373373
heapTracer(ptr),
374374
observer(kj::mv(observer)) {

src/workerd/jsg/setup.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ class IsolateBase {
207207
}
208208

209209
// Get an object referencing this isolate that can be used to adjust external memory usage later
210-
kj::Own<const ExternalMemoryTarget> getExternalMemoryTarget();
210+
kj::Arc<const ExternalMemoryTarget> getExternalMemoryTarget();
211211

212212
// Equivalent to getExternalMemoryTarget()->getAdjustment(amount), but saves an atomic refcount
213213
// increment and decrement.
@@ -309,7 +309,7 @@ class IsolateBase {
309309
// ExternalMemoryTarget holds a weak reference back to the isolate. ExternalMemoryAjustments
310310
// hold references to the ExternalMemoryTarget. This allows the ExternalMemoryAjustments to
311311
// outlive the isolate.
312-
kj::Own<const ExternalMemoryTarget> externalMemoryTarget;
312+
kj::Arc<const ExternalMemoryTarget> externalMemoryTarget;
313313

314314
// A shared async context key for accessing env
315315
kj::Own<AsyncContextFrame::StorageKey> envAsyncContextKey;

0 commit comments

Comments
 (0)