Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion packages/cloudflare/src/flush.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type FlushLock = {

type FlushLockRegistry = {
readonly locks: Set<FlushLockInternal>;
readonly originalWaitUntil: ExecutionContext['waitUntil'];
Comment thread
cursor[bot] marked this conversation as resolved.
};

type FlushLockInternal = FlushLock & {
Expand All @@ -18,6 +19,27 @@ type FlushLockInternal = FlushLock & {

const flushLockRegistries = new WeakMap<ExecutionContext['waitUntil'], FlushLockRegistry>();

/**
* Returns the original (un-instrumented) waitUntil function for a context.
* This should be used when calling waitUntil with flushAndDispose to avoid deadlock.
*
* The flush lock mechanism wraps context.waitUntil to track pending tasks.
* If we call waitUntil(flushAndDispose(client)) through the instrumented version,
* it creates a deadlock because:
* 1. The instrumented waitUntil acquires the flush lock
* 2. flushAndDispose calls client.flush() which waits for the lock to be released
* 3. The lock won't be released until the waitUntil promise completes
* 4. The waitUntil promise won't complete until flush() returns
*
* By using the original waitUntil for flush operations, we bypass this issue.
*/
export function getOriginalWaitUntil(context: ExecutionContext): ExecutionContext['waitUntil'] | undefined {
// eslint-disable-next-line @typescript-eslint/unbound-method
const currentWaitUntil = context.waitUntil;
const original = flushLockRegistries.get(currentWaitUntil)?.originalWaitUntil;
return original ?? currentWaitUntil;
}

/**
* Enhances the given execution context by wrapping its `waitUntil` method with a proxy
* to monitor pending tasks, and provides a flusher function to ensure all tasks
Expand Down Expand Up @@ -67,8 +89,8 @@ function getOrCreateFlushLockRegistry(context: ExecutionContext): FlushLockRegis
return existingRegistry;
}

const registry: FlushLockRegistry = { locks: new Set() };
const originalWaitUntil = context.waitUntil.bind(context) as typeof context.waitUntil;
const registry: FlushLockRegistry = { locks: new Set(), originalWaitUntil };
const instrumentedWaitUntil: typeof context.waitUntil = promise => {
// Snapshot active locks so locks created after this call do not wait for earlier waitUntil work.
const locks = [...registry.locks];
Expand Down
9 changes: 7 additions & 2 deletions packages/cloudflare/src/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
} from '@sentry/core';
import { captureIncomingRequestBody } from './integrations/httpServer';
import type { CloudflareOptions } from './client';
import { flushAndDispose } from './flush';
import { flushAndDispose, getOriginalWaitUntil } from './flush';
import { addCloudResourceContext, addCultureContext, addRequest } from './scope-utils';
import { init } from './sdk';
import { classifyResponseStreaming } from './utils/streaming';
Expand Down Expand Up @@ -47,7 +47,12 @@ export function wrapRequestHandler(
const { options, request, captureErrors = true } = wrapperOptions;
const context = wrapperOptions.context;

const waitUntil = context?.waitUntil?.bind?.(context);
// Use getOriginalWaitUntil to get the un-instrumented waitUntil function.
// This is crucial to avoid deadlock: the flush lock mechanism wraps waitUntil
// to track pending tasks. If we use the instrumented version for flushAndDispose,
// it acquires the lock, then flushAndDispose tries to wait for the same lock,
// creating a deadlock.
const waitUntil = context ? getOriginalWaitUntil(context)?.bind(context) : undefined;

const client = init({ ...options, ctx: context });
isolationScope.setClient(client);
Expand Down
77 changes: 76 additions & 1 deletion packages/cloudflare/test/flush.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { type ExecutionContext } from '@cloudflare/workers-types';
import * as sentryCore from '@sentry/core';
import { type Client } from '@sentry/core';
import { describe, expect, it, onTestFinished, vi } from 'vitest';
import { flushAndDispose, makeFlushLock } from '../src/flush';
import { flushAndDispose, getOriginalWaitUntil, makeFlushLock } from '../src/flush';

describe('Flush buffer test', () => {
const waitUntilPromises: Promise<void>[] = [];
Expand Down Expand Up @@ -109,3 +109,78 @@ describe('flushAndDispose', () => {
flushSpy.mockRestore();
});
});

describe('getOriginalWaitUntil', () => {
it('returns the original waitUntil before instrumentation', () => {
const originalWaitUntil = vi.fn();
const context: ExecutionContext = {
waitUntil: originalWaitUntil,
passThroughOnException: vi.fn(),
};

const result = getOriginalWaitUntil(context);
expect(result).toBe(originalWaitUntil);
});

it('returns the original waitUntil after instrumentation', () => {
const originalWaitUntil = vi.fn();
const context: ExecutionContext = {
waitUntil: originalWaitUntil,
passThroughOnException: vi.fn(),
};

makeFlushLock(context);

const result = getOriginalWaitUntil(context);

expect(result).not.toBe(context.waitUntil);
expect(result).toBeDefined();
result!(Promise.resolve());
expect(originalWaitUntil).toHaveBeenCalled();
});

it('returns the original waitUntil after multiple instrumentations', () => {
const originalWaitUntil = vi.fn();
const context: ExecutionContext = {
waitUntil: originalWaitUntil,
passThroughOnException: vi.fn(),
};

makeFlushLock(context);
makeFlushLock(context);
makeFlushLock(context);

const result = getOriginalWaitUntil(context);

expect(result).not.toBe(context.waitUntil);
result!(Promise.resolve());
expect(originalWaitUntil).toHaveBeenCalled();
});

it('allows flushAndDispose to complete when called via original waitUntil', async () => {
const waitUntilPromises: Promise<void>[] = [];
const context: ExecutionContext = {
waitUntil: vi.fn(promise => {
waitUntilPromises.push(promise);
}),
passThroughOnException: vi.fn(),
};

const lock = makeFlushLock(context);

const mockClient = {
flush: vi.fn(async () => {
await lock.finalize();
return true;
}),
dispose: vi.fn(),
} as unknown as Client;

const originalWaitUntil = getOriginalWaitUntil(context);
originalWaitUntil!.call(context, flushAndDispose(mockClient));

await vi.waitFor(() => Promise.all(waitUntilPromises));
expect(mockClient.flush).toHaveBeenCalled();
expect(mockClient.dispose).toHaveBeenCalled();
});
});
Loading