Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,76 @@ import { expect, test } from '@playwright/test';
import { INSPECTOR_PORT } from '../playwright.config';

test.describe('Worker V8 isolate memory tests', () => {
test('worker memory is reclaimed after GC', async ({ baseURL }) => {
test('worker memory is stable across request batches', async ({ baseURL }) => {
const profiler = new MemoryProfiler({ port: INSPECTOR_PORT });

// Warm up: make initial requests and let the runtime settle
for (let i = 0; i < 5; i++) {
for (let i = 0; i < 20; i++) {
await fetch(baseURL!);
}

await profiler.connect();

const baselineSnapshot = await profiler.takeHeapSnapshot();
// First batch
for (let i = 0; i < 50; i++) {
const res = await fetch(baseURL!);
expect(res.status).toBe(200);
await res.text();
}

const afterFirstBatch = await profiler.takeHeapSnapshot();

// Second batch
for (let i = 0; i < 50; i++) {
const res = await fetch(baseURL!);
expect(res.status).toBe(200);
await res.text();
}

const finalSnapshot = await profiler.takeHeapSnapshot();
const result = profiler.compareSnapshots(baselineSnapshot, finalSnapshot);
const afterSecondBatch = await profiler.takeHeapSnapshot();

// Compare batches to detect per-request leaks (excludes warm-up effects)
const result = profiler.compareSnapshots(afterFirstBatch, afterSecondBatch);

expect(result.nodeGrowthPercent).toBeLessThan(0.15);

await profiler.close();
});

test('durable object memory is stable across request batches', async ({ baseURL }) => {
const profiler = new MemoryProfiler({ port: INSPECTOR_PORT });

// Warm up: let JIT compile, caches fill, and DO instance stabilize
for (let i = 0; i < 30; i++) {
await fetch(`${baseURL}/pass-to-object/storage/put`);
}

await profiler.connect();

// First batch of requests to the same DO
for (let i = 0; i < 50; i++) {
const res = await fetch(`${baseURL}/pass-to-object/storage/put`);
expect(res.status).toBe(200);
await res.text();
}

const afterFirstBatch = await profiler.takeHeapSnapshot();

// Second batch of requests to the same DO
for (let i = 0; i < 50; i++) {
const res = await fetch(`${baseURL}/pass-to-object/storage/put`);
expect(res.status).toBe(200);
await res.text();
}

const afterSecondBatch = await profiler.takeHeapSnapshot();

// Compare batches to detect per-request leaks (excludes warm-up effects)
// Before fix: makeFlushLock re-wrapped waitUntil on each request = leak
// After fix: growth should be minimal
const result = profiler.compareSnapshots(afterFirstBatch, afterSecondBatch);

expect(result.nodeGrowthPercent).toBeLessThan(1);
expect(result.nodeGrowthPercent).toBeLessThan(0.15);

await profiler.close();
});
Expand Down
73 changes: 63 additions & 10 deletions packages/cloudflare/src/flush.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ type FlushLock = {
readonly finalize: () => Promise<void>;
};

type FlushLockRegistry = {
readonly locks: Set<FlushLockInternal>;
};

type FlushLockInternal = FlushLock & {
readonly acquire: () => void;
readonly release: () => void;
};

const flushLockRegistries = new WeakMap<ExecutionContext['waitUntil'], FlushLockRegistry>();

/**
* Enhances the given execution context by wrapping its `waitUntil` method with a proxy
* to monitor pending tasks, and provides a flusher function to ensure all tasks
Expand All @@ -16,27 +27,69 @@ type FlushLock = {
* @return {FlushLock} Returns a flusher function if a valid context is provided, otherwise undefined.
*/
export function makeFlushLock(context: ExecutionContext): FlushLock {
const registry = getOrCreateFlushLockRegistry(context);
let resolveAllDone: () => void = () => undefined;
const allDone = new Promise<void>(res => {
resolveAllDone = res;
});
let pending = 0;

const lock: FlushLockInternal = {
ready: allDone,
acquire: () => {
pending++;
},
release: () => {
if (--pending === 0) {
registry.locks.delete(lock);
resolveAllDone();
}
},
finalize: () => {
if (pending === 0) {
registry.locks.delete(lock);
resolveAllDone();
}
return allDone;
},
};

registry.locks.add(lock);
return Object.freeze(lock);
}

function getOrCreateFlushLockRegistry(context: ExecutionContext): FlushLockRegistry {
// eslint-disable-next-line @typescript-eslint/unbound-method
const waitUntil = context.waitUntil;
const existingRegistry = flushLockRegistries.get(waitUntil);

if (existingRegistry) {
return existingRegistry;
}

const registry: FlushLockRegistry = { locks: new Set() };
const originalWaitUntil = context.waitUntil.bind(context) as typeof context.waitUntil;
context.waitUntil = promise => {
pending++;
const instrumentedWaitUntil: typeof context.waitUntil = promise => {
// Snapshot active locks so locks created after this call do not wait for earlier waitUntil work.
const locks = [...registry.locks];

for (const lock of locks) {
lock.acquire();
}

return originalWaitUntil(
promise.finally(() => {
if (--pending === 0) resolveAllDone();
for (const lock of locks) {
lock.release();
}
}),
);
};
return Object.freeze({
ready: allDone,
finalize: () => {
if (pending === 0) resolveAllDone();
return allDone;
},
});

flushLockRegistries.set(instrumentedWaitUntil, registry);
context.waitUntil = instrumentedWaitUntil;

return registry;
}

/**
Expand Down
47 changes: 47 additions & 0 deletions packages/cloudflare/test/flush.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,53 @@ describe('Flush buffer test', () => {
await Promise.all(waitUntilPromises);
await expect(lock.ready).resolves.toBeUndefined();
});

it('does not grow the waitUntil wrapper stack on repeated flush lock creation', async () => {
const waitUntilPromises: Promise<void>[] = [];
const context: ExecutionContext = {
waitUntil: vi.fn(promise => {
waitUntilPromises.push(promise);
}),
passThroughOnException: vi.fn(),
};

for (let i = 0; i < 20_000; i++) {
makeFlushLock(context);
}

expect(() => context.waitUntil(Promise.resolve())).not.toThrow();
await Promise.all(waitUntilPromises);
});

it('creates a fresh flush lock when waitUntil was already instrumented', async () => {
const waitUntilPromises: Promise<void>[] = [];
const context: ExecutionContext = {
waitUntil: vi.fn(promise => {
waitUntilPromises.push(promise);
}),
passThroughOnException: vi.fn(),
};

const firstLock = makeFlushLock(context);
await firstLock.finalize();

let resolveWaitUntil!: () => void;
const secondTask = new Promise<void>(resolve => {
resolveWaitUntil = resolve;
});
const secondLock = makeFlushLock(context);

context.waitUntil(secondTask);
void secondLock.finalize();

await Promise.resolve();
expect(waitUntilPromises).toHaveLength(1);
await expect(Promise.race([secondLock.ready, Promise.resolve('pending')])).resolves.toBe('pending');

resolveWaitUntil();
await Promise.all(waitUntilPromises);
await expect(secondLock.ready).resolves.toBeUndefined();
});
});

describe('flushAndDispose', () => {
Expand Down
Loading