diff --git a/.changeset/hibernation-awareness-fix.md b/.changeset/hibernation-awareness-fix.md new file mode 100644 index 00000000..3905dcba --- /dev/null +++ b/.changeset/hibernation-awareness-fix.md @@ -0,0 +1,24 @@ +--- +"y-partyserver": minor +--- + +Fix Yjs hibernation support and awareness propagation + +**Server:** + +- Replace in-memory `WSSharedDoc.conns` Map with `connection.setState()` and `getConnections()` so connection tracking survives Durable Object hibernation +- Move event handler registration from `WSSharedDoc` constructor into `onStart()` to use `getConnections()` for broadcasting +- Disable awareness protocol's built-in `_checkInterval` in `WSSharedDoc` constructor to prevent timers from defeating hibernation +- On `onStart`, send sync step 1 to all existing connections so clients re-sync the server's document after hibernation wake-up +- Simplify `send()` — no longer forcibly closes connections on failure +- Remove `closeConn()` helper; awareness cleanup now happens in `onClose` via persisted connection state +- Widen `onLoad()` return type to `Promise` to allow seeding the document from a returned YDoc + +**Provider:** + +- Switch awareness event listener from `"update"` to `"change"` so clock-only heartbeat renewals do not produce network traffic (allows DO hibernation during idle sessions) +- Disable awareness protocol's built-in `_checkInterval` on the client to stop 15-second clock renewals and 30-second peer timeout removal +- Remove provider's own `_checkInterval` liveness timer (was coupled to the awareness heartbeat) +- Clear stale awareness meta for remote clients on WebSocket close so reconnecting clients' awareness updates are accepted +- Bump awareness clock on reconnect to ensure remote peers accept the update +- Fix bug where `host.slice(0, -1)` result was not assigned, so trailing slashes were never stripped diff --git a/.gitignore b/.gitignore index f6b646bf..6670af25 100644 --- a/.gitignore +++ b/.gitignore @@ -131,6 +131,7 @@ dist .wrangler +.wrangler-persist-* .DS_Store .env .dev.vars diff --git a/fixtures/monaco-yjs/src/server.ts b/fixtures/monaco-yjs/src/server.ts index ea66a22c..7c137dcc 100644 --- a/fixtures/monaco-yjs/src/server.ts +++ b/fixtures/monaco-yjs/src/server.ts @@ -1,7 +1,17 @@ import { routePartykitRequest } from "partyserver"; import { YServer } from "y-partyserver"; -export { YServer as MonacoServer }; +// export { YServer as MonacoServer }; + +export class MonacoServer extends YServer { + static options = { + hibernate: true + }; + async onStart(): Promise { + console.log("onStart"); + await super.onStart(); + } +} export default { async fetch(request: Request, env: Env): Promise { diff --git a/package-lock.json b/package-lock.json index d5eaae35..1072fb7c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12235,7 +12235,7 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20251218.0", "hono": "^4.11.1", - "partyserver": ">=0.3.0" + "partyserver": "^0.3.0" }, "peerDependencies": { "@cloudflare/workers-types": "^4.20240729.0", @@ -12260,7 +12260,7 @@ "license": "ISC", "dependencies": { "nanoid": "^5.1.6", - "partysocket": "^1.1.14" + "partysocket": "^1.1.15" } }, "packages/partyhard": { @@ -12336,7 +12336,7 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20251218.0", "partyserver": ">=0.2.0 <1.0.0", - "partysocket": "^1.1.14" + "partysocket": "^1.1.15" }, "peerDependencies": { "@cloudflare/workers-types": "^4.20240729.0", diff --git a/package.json b/package.json index 1fe8cc56..14cc5d12 100644 --- a/package.json +++ b/package.json @@ -3,6 +3,13 @@ "version": "0.0.0", "private": true, "description": "Everything's better with friends", + "license": "ISC", + "author": "Sunil Pai ", + "workspaces": [ + "packages/*", + "fixtures/*" + ], + "type": "module", "scripts": { "build": "npm run build -w partyserver -w partysocket -w y-partyserver -w partysub -w partyfn -w partysync -w partywhen -w partytracks -w hono-party && tsx scripts/check-exports.ts", "check": "npm run check:repo && npm run check:format && npm run check:lint && npm run check:type && npm run check:test", @@ -10,16 +17,10 @@ "check:format": "prettier . --check --ignore-unknown", "check:lint": "biome check", "check:repo": "sherif", - "check:test": "npm run check:test -w partyserver -w partysocket -w partysub -w partywhen -w partytracks", + "check:test": "npm run check:test -w partyserver -w partysocket -w partysub -w partywhen -w partytracks -w y-partyserver", "check:type": "tsx scripts/typecheck.ts", "all": "npm i && npm run build && npm run check" }, - "author": "Sunil Pai ", - "license": "ISC", - "workspaces": [ - "packages/*", - "fixtures/*" - ], "devDependencies": { "@biomejs/biome": "^2.3.10", "@changesets/changelog-github": "^0.5.2", @@ -46,18 +47,17 @@ "wrangler": "^4.56.0" }, "overrides": { - "esbuild": "0.25.0", "@types/node": "25.0.3", + "esbuild": "0.25.0", "prosemirror-model": "1.22.2", "react": "19.2.3", "react-dom": "19.2.3" }, + "packageManager": "npm@11.7.0", "trustedDependencies": [ "@biomejs/biome", "core-js", "esbuild", "workerd" - ], - "packageManager": "npm@11.7.0", - "type": "module" + ] } diff --git a/packages/hono-party/package.json b/packages/hono-party/package.json index b1702fb8..16af9459 100644 --- a/packages/hono-party/package.json +++ b/packages/hono-party/package.json @@ -37,6 +37,6 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20251218.0", "hono": "^4.11.1", - "partyserver": ">=0.3.0" + "partyserver": "^0.3.0" } } diff --git a/packages/partyfn/package.json b/packages/partyfn/package.json index bef838cd..2a972458 100644 --- a/packages/partyfn/package.json +++ b/packages/partyfn/package.json @@ -19,7 +19,7 @@ ], "dependencies": { "nanoid": "^5.1.6", - "partysocket": "^1.1.14" + "partysocket": "^1.1.15" }, "scripts": { "build": "tsx scripts/build.ts" diff --git a/packages/partysub/package.json b/packages/partysub/package.json index 355b1ba6..bdbe7790 100644 --- a/packages/partysub/package.json +++ b/packages/partysub/package.json @@ -47,6 +47,6 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20251218.0", "partyserver": ">=0.2.0 <1.0.0", - "partysocket": "^1.1.14" + "partysocket": "^1.1.15" } } diff --git a/packages/y-partyserver/package.json b/packages/y-partyserver/package.json index a3707f1a..0fb36d8d 100644 --- a/packages/y-partyserver/package.json +++ b/packages/y-partyserver/package.json @@ -1,11 +1,23 @@ { "name": "y-partyserver", "version": "2.0.0", + "description": "", + "keywords": [ + "collaboration", + "text-editors", + "yjs" + ], + "homepage": "https://github.com/cloudflare/partykit/tree/main/packages/y-partyserver", + "license": "ISC", + "author": "Sunil Pai ", "repository": { "type": "git", "url": "git://github.com/cloudflare/partykit.git" }, - "homepage": "https://github.com/cloudflare/partykit/tree/main/packages/y-partyserver", + "files": [ + "dist", + "README.md" + ], "type": "module", "exports": { ".": { @@ -25,36 +37,28 @@ } }, "scripts": { - "build": "tsx scripts/build.ts" + "build": "tsx scripts/build.ts", + "check:test": "vitest -r src/tests --watch false", + "test": "vitest -r src/tests", + "test:integration": "vitest -r src/tests --config vitest.integration.config.ts --watch false", + "test:hibernation": "vitest -r src/tests --config vitest.hibernation.config.ts --watch false" }, - "files": [ - "dist", - "README.md" - ], - "keywords": [ - "yjs", - "collaboration", - "text-editors" - ], - "author": "Sunil Pai ", - "license": "ISC", - "description": "", "dependencies": { "lib0": "^0.2.115", "lodash.debounce": "^4.0.8", "nanoid": "^5.1.6", "y-protocols": "^1.0.7" }, - "peerDependencies": { - "@cloudflare/workers-types": "^4.20240729.0", - "partyserver": ">=0.2.0 <1.0.0", - "yjs": "^13.6.14" - }, "devDependencies": { "@cloudflare/workers-types": "^4.20251218.0", "@types/lodash.debounce": "^4.0.9", "partyserver": ">=0.2.0 <1.0.0", "ws": "^8.18.3", "yjs": "^13.6.28" + }, + "peerDependencies": { + "@cloudflare/workers-types": "^4.20240729.0", + "partyserver": ">=0.2.0 <1.0.0", + "yjs": "^13.6.14" } } diff --git a/packages/y-partyserver/src/provider/index.ts b/packages/y-partyserver/src/provider/index.ts index 735f8699..2ce5d80d 100644 --- a/packages/y-partyserver/src/provider/index.ts +++ b/packages/y-partyserver/src/provider/index.ts @@ -19,12 +19,6 @@ export const messageAuth = 2; // Disable BroadcastChannel by default in Cloudflare Workers / Node const DEFAULT_DISABLE_BC = typeof window === "undefined"; -function assert(condition: unknown, message: string): asserts condition { - if (!condition) { - throw new Error(message); - } -} - const messageHandlers: Array< ( encoder: encoding.Encoder, @@ -101,9 +95,6 @@ messageHandlers[messageAuth] = ( ); }; -// @todo - this should depend on awareness.outdatedTime -const messageReconnectTimeout = 30000; - function permissionDeniedHandler(provider: WebsocketProvider, reason: string) { console.warn(`Permission denied to access ${provider.url}.\n${reason}`); } @@ -165,13 +156,19 @@ function setupWS(provider: WebsocketProvider) { provider.wsconnected = false; provider.synced = false; // update awareness (all users except local left) + const removedClients = Array.from( + provider.awareness.getStates().keys() + ).filter((client) => client !== provider.doc.clientID); awarenessProtocol.removeAwarenessStates( provider.awareness, - Array.from(provider.awareness.getStates().keys()).filter( - (client) => client !== provider.doc.clientID - ), + removedClients, provider ); + // Clear stale meta for remote clients so their awareness + // updates are accepted on reconnect (clock check starts fresh) + for (const clientID of removedClients) { + provider.awareness.meta.delete(clientID); + } provider.emit("status", [ { status: "disconnected" @@ -208,6 +205,9 @@ function setupWS(provider: WebsocketProvider) { websocket.send(encoding.toUint8Array(encoder)); // broadcast local awareness state if (provider.awareness.getLocalState() !== null) { + // Re-set local state to bump the awareness clock, ensuring + // remote clients accept the update even if they have stale meta + provider.awareness.setLocalState(provider.awareness.getLocalState()); const encoderAwarenessState = encoding.createEncoder(); encoding.writeVarUint(encoderAwarenessState, messageAwareness); encoding.writeVarUint8Array( @@ -281,7 +281,6 @@ export class WebsocketProvider extends Observable { _updateHandler: (update: Uint8Array, origin: unknown) => void; _awarenessUpdateHandler: (update: AwarenessUpdate, origin: unknown) => void; _unloadHandler: () => void; - _checkInterval: ReturnType | number; constructor( serverUrl: string, @@ -398,19 +397,23 @@ export class WebsocketProvider extends Observable { ) { process.on("exit", this._unloadHandler); } - awareness.on("update", this._awarenessUpdateHandler); - this._checkInterval = /** @type {any} */ setInterval(() => { - if ( - this.wsconnected && - messageReconnectTimeout < - time.getUnixTime() - this.wsLastMessageReceived - ) { - assert(this.ws !== null, "ws must not be null"); - // no message received in a long time - not even your own awareness - // updates (which are updated every 15 seconds) - this.ws.close(); - } - }, messageReconnectTimeout / 10); + // Listen on 'change' (not 'update') so that clock-only awareness + // renewals (the 15-second heartbeat) do NOT produce network traffic. + // Only actual state changes (cursor moved, name changed, etc.) are sent. + // This allows Durable Objects to hibernate when sessions are idle. + awareness.on("change", this._awarenessUpdateHandler); + + // Disable the awareness protocol's built-in check interval. + // It renews the local clock every 15s (causing wire traffic that defeats + // DO hibernation) and removes remote peers after 30s of inactivity. + // We handle peer cleanup via WebSocket close events instead. + clearInterval( + ( + awareness as unknown as { + _checkInterval: ReturnType; + } + )._checkInterval + ); if (connect) { this.connect(); } @@ -435,7 +438,6 @@ export class WebsocketProvider extends Observable { if (this._resyncInterval !== 0) { clearInterval(this._resyncInterval); } - clearInterval(this._checkInterval); this.disconnect(); if (typeof window !== "undefined") { window.removeEventListener("unload", this._unloadHandler); @@ -445,7 +447,7 @@ export class WebsocketProvider extends Observable { ) { process.off("exit", this._unloadHandler); } - this.awareness.off("update", this._awarenessUpdateHandler); + this.awareness.off("change", this._awarenessUpdateHandler); this.doc.off("update", this._updateHandler); super.destroy(); } @@ -567,7 +569,7 @@ export default class YProvider extends WebsocketProvider { // strip trailing slash from host if any if (host.endsWith("/")) { - host.slice(0, -1); + host = host.slice(0, -1); } const serverUrl = `${ diff --git a/packages/y-partyserver/src/server/index.ts b/packages/y-partyserver/src/server/index.ts index 38e41f2b..9cbe1ed3 100644 --- a/packages/y-partyserver/src/server/index.ts +++ b/packages/y-partyserver/src/server/index.ts @@ -37,71 +37,57 @@ const messageAwareness = 1; // biome-ignore lint/correctness/noUnusedVariables: it's fine const messageAuth = 2; -function updateHandler(update: Uint8Array, _origin: unknown, doc: WSSharedDoc) { - const encoder = encoding.createEncoder(); - encoding.writeVarUint(encoder, messageSync); - syncProtocol.writeUpdate(encoder, update); - const message = encoding.toUint8Array(encoder); - doc.conns.forEach((_, conn) => { - send(doc, conn, message); - }); +/** + * Internal key used in connection.setState() to track which awareness + * client IDs are controlled by each connection. This survives hibernation + * because connection state is persisted to WebSocket attachments. + */ +const AWARENESS_IDS_KEY = "__ypsAwarenessIds"; + +type YServerConnectionState = { + [AWARENESS_IDS_KEY]?: number[]; + [key: string]: unknown; +}; + +function getAwarenessIds(conn: Connection): number[] { + try { + const state = conn.state as YServerConnectionState | null; + return state?.[AWARENESS_IDS_KEY] ?? []; + } catch { + return []; + } +} + +function setAwarenessIds(conn: Connection, ids: number[]): void { + try { + conn.setState((prev: YServerConnectionState | null) => ({ + ...prev, + [AWARENESS_IDS_KEY]: ids + })); + } catch { + // ignore — may fail if connection is already closed + } } class WSSharedDoc extends YDoc { - conns: Map>; awareness: awarenessProtocol.Awareness; constructor() { super({ gc: true }); - - /** - * Maps from conn to set of controlled user ids. Delete all user ids from awareness when this conn is closed - */ - this.conns = new Map(); - this.awareness = new awarenessProtocol.Awareness(this); this.awareness.setLocalState(null); - const awarenessChangeHandler = ( - { - added, - updated, - removed - }: { - added: Array; - updated: Array; - removed: Array; - }, - conn: Connection | null // Origin is the connection that made the change - ) => { - const changedClients = added.concat(updated, removed); - if (conn !== null) { - const connControlledIDs = - /** @type {Set} */ this.conns.get(conn); - if (connControlledIDs !== undefined) { - added.forEach((clientID) => { - connControlledIDs.add(clientID); - }); - removed.forEach((clientID) => { - connControlledIDs.delete(clientID); - }); + // Disable the awareness protocol's built-in check interval. + // It renews the local clock every 15s and removes peers after 30s, + // but we handle peer cleanup via onClose instead. Clearing it here + // prevents it from defeating Durable Object hibernation. + clearInterval( + ( + this.awareness as unknown as { + _checkInterval: ReturnType; } - } - // broadcast awareness update - const encoder = encoding.createEncoder(); - encoding.writeVarUint(encoder, messageAwareness); - encoding.writeVarUint8Array( - encoder, - awarenessProtocol.encodeAwarenessUpdate(this.awareness, changedClients) - ); - const buff = encoding.toUint8Array(encoder); - this.conns.forEach((_, c) => { - send(this, c, buff); - }); - }; - this.awareness.on("update", awarenessChangeHandler); - // @ts-expect-error - TODO: fix this - this.on("update", updateHandler); + )._checkInterval + ); } } @@ -136,35 +122,18 @@ function readSyncMessage( return messageType; } -function closeConn(doc: WSSharedDoc, conn: Connection): void { - if (doc.conns.has(conn)) { - const controlledIds: Set = doc.conns.get(conn)!; - doc.conns.delete(conn); - awarenessProtocol.removeAwarenessStates( - doc.awareness, - Array.from(controlledIds), - null - ); - } - try { - conn.close(); - } catch (e) { - console.warn("failed to close connection", e); - } -} - -function send(doc: WSSharedDoc, conn: Connection, m: Uint8Array) { +function send(conn: Connection, m: Uint8Array): void { if ( conn.readyState !== undefined && conn.readyState !== wsReadyStateConnecting && conn.readyState !== wsReadyStateOpen ) { - closeConn(doc, conn); + return; } try { conn.send(m); - } catch (_e) { - closeConn(doc, conn); + } catch { + // connection is broken, ignore } } @@ -182,7 +151,7 @@ export class YServer< #ParentClass: typeof YServer = Object.getPrototypeOf(this).constructor; readonly document: WSSharedDoc = new WSSharedDoc(); - async onLoad(): Promise { + async onLoad(): Promise { // to be implemented by the user return; } @@ -260,6 +229,70 @@ export class YServer< applyUpdate(this.document, state); } + // Broadcast doc updates to all connections. + // Uses this.getConnections() which works for both hibernate and non-hibernate + // modes and survives DO hibernation (unlike an in-memory Map). + this.document.on("update", (update: Uint8Array) => { + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageSync); + syncProtocol.writeUpdate(encoder, update); + const message = encoding.toUint8Array(encoder); + for (const conn of this.getConnections()) { + send(conn, message); + } + }); + + // Track which awareness clientIDs each connection controls. + // Stored in connection.setState() so it survives hibernation. + // When conn is null (internal changes like removeAwarenessStates on close), + // broadcast the update to remaining connections. + // When conn is non-null (client message), handleMessage broadcasts directly. + this.document.awareness.on( + "update", + ( + { + added, + updated, + removed + }: { + added: Array; + updated: Array; + removed: Array; + }, + conn: Connection | null + ) => { + if (conn !== null) { + // Track which clientIDs this connection controls + try { + const currentIds = new Set(getAwarenessIds(conn)); + for (const clientID of added) currentIds.add(clientID); + for (const clientID of removed) currentIds.delete(clientID); + setAwarenessIds(conn, [...currentIds]); + } catch (_e) { + // ignore — best-effort tracking + } + } else { + // Internal awareness change (e.g. removeAwarenessStates on close) + // — broadcast to all remaining connections + const changedClients = added.concat(updated, removed); + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageAwareness); + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate( + this.document.awareness, + changedClients + ) + ); + const buff = encoding.toUint8Array(encoder); + for (const c of this.getConnections()) { + send(c, buff); + } + } + } + ); + + // Debounced persistence handler this.document.on( "update", debounce( @@ -281,6 +314,18 @@ export class YServer< } ) ); + + // After hibernation wake-up, the doc is empty but existing connections + // survive. Re-sync by sending sync step 1 to all connections — they'll + // respond with sync step 2 containing their full state. + // On first start there are no connections, so this is a no-op. + const syncEncoder = encoding.createEncoder(); + encoding.writeVarUint(syncEncoder, messageSync); + syncProtocol.writeSyncStep1(syncEncoder, this.document); + const syncMessage = encoding.toUint8Array(syncEncoder); + for (const conn of this.getConnections()) { + send(conn, syncMessage); + } } // biome-ignore lint/correctness/noUnusedFunctionParameters: so autocomplete works @@ -334,23 +379,23 @@ export class YServer< excludeConnection?: Connection ): void { const formattedMessage = `__YPS:${message}`; - this.document.conns.forEach((_, conn) => { + for (const conn of this.getConnections()) { if (excludeConnection && conn === excludeConnection) { - return; + continue; } if ( conn.readyState !== undefined && conn.readyState !== wsReadyStateConnecting && conn.readyState !== wsReadyStateOpen ) { - return; + continue; } try { conn.send(formattedMessage); } catch (e) { console.warn("Failed to broadcast custom message", e); } - }); + } } handleMessage(connection: Connection, message: WSMessage) { @@ -396,15 +441,24 @@ export class YServer< // message, there is no need to send the message. When `encoder` only // contains the type of reply, its length is 1. if (encoding.length(encoder) > 1) { - send(this.document, connection, encoding.toUint8Array(encoder)); + send(connection, encoding.toUint8Array(encoder)); } break; case messageAwareness: { + const awarenessData = decoding.readVarUint8Array(decoder); awarenessProtocol.applyAwarenessUpdate( this.document.awareness, - decoding.readVarUint8Array(decoder), + awarenessData, connection ); + // Forward raw awareness bytes to all connections + const awarenessEncoder = encoding.createEncoder(); + encoding.writeVarUint(awarenessEncoder, messageAwareness); + encoding.writeVarUint8Array(awarenessEncoder, awarenessData); + const awarenessBuff = encoding.toUint8Array(awarenessEncoder); + for (const c of this.getConnections()) { + send(c, awarenessBuff); + } break; } } @@ -425,7 +479,16 @@ export class YServer< _reason: string, _wasClean: boolean ): void | Promise { - closeConn(this.document, connection); + // Read controlled awareness clientIDs from connection state + // (survives hibernation unlike an in-memory Map) + const controlledIds = getAwarenessIds(connection); + if (controlledIds.length > 0) { + awarenessProtocol.removeAwarenessStates( + this.document.awareness, + controlledIds, + null + ); + } } // TODO: explore why onError gets triggered when a connection closes @@ -434,15 +497,14 @@ export class YServer< conn: Connection, _ctx: ConnectionContext ): void | Promise { - // conn.binaryType = "arraybuffer"; // from y-websocket, breaks in our runtime - - this.document.conns.set(conn, new Set()); + // Note: awareness IDs are lazily initialized when the first awareness + // message is received — no need to call setAwarenessIds(conn, []) here // send sync step 1 const encoder = encoding.createEncoder(); encoding.writeVarUint(encoder, messageSync); syncProtocol.writeSyncStep1(encoder, this.document); - send(this.document, conn, encoding.toUint8Array(encoder)); + send(conn, encoding.toUint8Array(encoder)); const awarenessStates = this.document.awareness.getStates(); if (awarenessStates.size > 0) { const encoder = encoding.createEncoder(); @@ -454,7 +516,7 @@ export class YServer< Array.from(awarenessStates.keys()) ) ); - send(this.document, conn, encoding.toUint8Array(encoder)); + send(conn, encoding.toUint8Array(encoder)); } } } diff --git a/packages/y-partyserver/src/tests/global-setup.ts b/packages/y-partyserver/src/tests/global-setup.ts new file mode 100644 index 00000000..90373c05 --- /dev/null +++ b/packages/y-partyserver/src/tests/global-setup.ts @@ -0,0 +1,78 @@ +import { spawn } from "node:child_process"; +import type { ChildProcess } from "node:child_process"; +import path from "node:path"; + +const PORT = 8799; +let wranglerProcess: ChildProcess | null = null; + +async function waitForServer(url: string, timeoutMs = 30000): Promise { + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + try { + const res = await fetch(url); + if (res.ok || res.status === 404) { + return; + } + } catch { + // Server not ready yet + } + await new Promise((r) => setTimeout(r, 200)); + } + throw new Error(`Server at ${url} did not start within ${timeoutMs}ms`); +} + +export async function setup() { + const testDir = path.dirname(new URL(import.meta.url).pathname); + + // Start wrangler dev + wranglerProcess = spawn( + "npx", + [ + "wrangler", + "dev", + "--config", + path.join(testDir, "integration-wrangler.jsonc"), + "--port", + String(PORT), + "--no-show-interactive-dev-session" + ], + { + cwd: testDir, + stdio: ["pipe", "pipe", "pipe"], + env: { + ...process.env, + // Suppress interactive prompts + BROWSER: "none" + } + } + ); + + // Log wrangler output for debugging + wranglerProcess.stdout?.on("data", (data: Buffer) => { + const msg = data.toString(); + if (process.env.DEBUG) { + process.stderr.write(`[wrangler] ${msg}`); + } + }); + wranglerProcess.stderr?.on("data", (data: Buffer) => { + const msg = data.toString(); + if (process.env.DEBUG) { + process.stderr.write(`[wrangler:err] ${msg}`); + } + }); + + // Wait for the server to be ready + await waitForServer(`http://localhost:${PORT}/`); +} + +export async function teardown() { + if (wranglerProcess) { + wranglerProcess.kill("SIGTERM"); + // Give it a moment to shut down gracefully + await new Promise((r) => setTimeout(r, 500)); + if (wranglerProcess.exitCode === null) { + wranglerProcess.kill("SIGKILL"); + } + wranglerProcess = null; + } +} diff --git a/packages/y-partyserver/src/tests/hibernation.test.ts b/packages/y-partyserver/src/tests/hibernation.test.ts new file mode 100644 index 00000000..f6479831 --- /dev/null +++ b/packages/y-partyserver/src/tests/hibernation.test.ts @@ -0,0 +1,675 @@ +import WebSocket from "ws"; +import * as Y from "yjs"; +import { afterAll, afterEach, beforeAll, describe, expect, it } from "vitest"; + +import YProvider from "../provider/index"; +import { WranglerServer } from "./server-harness"; + +const PORT = 8801; +const HOST = `localhost:${PORT}`; +const server = new WranglerServer(PORT); + +// Track providers for cleanup +const providers: YProvider[] = []; + +function createProvider( + room: string, + options: { party?: string; doc?: Y.Doc } = {} +): YProvider { + const doc = options.doc ?? new Y.Doc(); + const provider = new YProvider(HOST, room, doc, { + party: options.party ?? "y-basic", + connect: true, + WebSocketPolyfill: WebSocket as unknown as typeof globalThis.WebSocket, + disableBc: true, + // Faster reconnection for restart tests + maxBackoffTime: 1000 + }); + providers.push(provider); + return provider; +} + +function waitForSync(provider: YProvider, timeoutMs = 20000): Promise { + return new Promise((resolve, reject) => { + if (provider.synced) { + resolve(); + return; + } + const timeout = setTimeout( + () => reject(new Error("Timed out waiting for sync")), + timeoutMs + ); + provider.on("synced", () => { + clearTimeout(timeout); + resolve(); + }); + }); +} + +/** + * Wait for a provider to go through disconnect → reconnect → sync. + * Handles the case where the provider is still connected (hasn't + * detected the server kill yet) or already disconnected. + */ +function waitForReconnectAndSync( + provider: YProvider, + timeoutMs = 30000 +): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout( + () => + reject( + new Error("Timed out waiting for reconnect and sync after restart") + ), + timeoutMs + ); + + let sawDisconnect = !provider.wsconnected; + + const statusHandler = (event: { status: string }) => { + if (event.status === "disconnected") { + sawDisconnect = true; + } + }; + provider.on("status", statusHandler); + + const syncHandler = () => { + if (sawDisconnect) { + clearTimeout(timeout); + provider.off("status", statusHandler); + provider.off("synced", syncHandler); + resolve(); + } + }; + provider.on("synced", syncHandler); + + // If already disconnected and re-synced, resolve immediately + if (sawDisconnect && provider.synced) { + clearTimeout(timeout); + provider.off("status", statusHandler); + provider.off("synced", syncHandler); + resolve(); + } + }); +} + +function destroyProvider(provider: YProvider): void { + try { + provider.destroy(); + } catch { + // ignore + } + const idx = providers.indexOf(provider); + if (idx >= 0) providers.splice(idx, 1); +} + +beforeAll(async () => { + server.cleanup(); + await server.start(); +}, 60000); + +afterEach(() => { + for (const p of providers) { + try { + p.destroy(); + } catch { + // ignore + } + } + providers.length = 0; +}); + +afterAll(async () => { + await server.stop(); + server.cleanup(); +}, 30000); + +// --------------------------------------------------------------------------- +// All test DOs in worker.ts use `static options = { hibernate: true }` +// --------------------------------------------------------------------------- + +describe("Server restart — non-persistent YServer (hibernate: true)", () => { + it("connected client re-syncs its local state to server after restart", async () => { + const room = `restart-basic-${Date.now()}`; + + // Connect and write data + const doc = new Y.Doc(); + const provider = createProvider(room, { doc }); + await waitForSync(provider); + + doc.getText("shared").insert(0, "survive-restart"); + await new Promise((r) => setTimeout(r, 300)); + + // Set up listener BEFORE restart to avoid race condition + const reconnect = waitForReconnectAndSync(provider); + + // Kill and restart the server — all server-side in-memory state is lost + await server.restart(); + + // Provider auto-reconnects and re-syncs its local state to the new server + await reconnect; + + // Client still has its local data + expect(doc.getText("shared").toString()).toBe("survive-restart"); + + // A second client connecting should get the re-synced state + const doc2 = new Y.Doc(); + const provider2 = createProvider(room, { doc: doc2 }); + await waitForSync(provider2); + await new Promise((r) => setTimeout(r, 500)); + + expect(doc2.getText("shared").toString()).toBe("survive-restart"); + }); + + it("data is lost if no client is connected to re-sync after restart", async () => { + const room = `restart-lost-${Date.now()}`; + + // Connect, write data, then disconnect before restart + const doc = new Y.Doc(); + const provider = createProvider(room, { doc }); + await waitForSync(provider); + + doc.getText("shared").insert(0, "will-be-lost"); + await new Promise((r) => setTimeout(r, 300)); + + // Destroy the provider — no client will be around to re-sync + destroyProvider(provider); + + // Restart server + await server.restart(); + + // New client gets empty doc — no persistence, no re-sync source + const doc2 = new Y.Doc(); + const provider2 = createProvider(room, { doc: doc2 }); + await waitForSync(provider2); + await new Promise((r) => setTimeout(r, 500)); + + expect(doc2.getText("shared").toString()).toBe(""); + }); + + it("two connected clients both survive restart and stay converged", async () => { + const room = `restart-two-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + await new Promise((r) => setTimeout(r, 300)); + + // Both write + docA.getText("shared").insert(0, "A-data"); + await new Promise((r) => setTimeout(r, 200)); + docB.getText("shared").insert(docB.getText("shared").length, "+B-data"); + await new Promise((r) => setTimeout(r, 500)); + + // Verify convergence before restart + const beforeA = docA.getText("shared").toString(); + const beforeB = docB.getText("shared").toString(); + expect(beforeA).toBe(beforeB); + expect(beforeA).toContain("A-data"); + expect(beforeA).toContain("B-data"); + + // Set up reconnection listeners BEFORE restart to avoid race condition + const reconnectA = waitForReconnectAndSync(providerA); + const reconnectB = waitForReconnectAndSync(providerB); + + await server.restart(); + + await Promise.all([reconnectA, reconnectB]); + await new Promise((r) => setTimeout(r, 1000)); + + // Both should still have the same content + expect(docA.getText("shared").toString()).toContain("A-data"); + expect(docA.getText("shared").toString()).toContain("B-data"); + expect(docB.getText("shared").toString()).toContain("A-data"); + expect(docB.getText("shared").toString()).toContain("B-data"); + // And they should still be converged + expect(docA.getText("shared").toString()).toBe( + docB.getText("shared").toString() + ); + }); + + it("clients can continue collaborating after restart", async () => { + const room = `restart-collab-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + await new Promise((r) => setTimeout(r, 300)); + + docA.getText("shared").insert(0, "before"); + await new Promise((r) => setTimeout(r, 500)); + + // Set up reconnection listeners BEFORE restart + const reconnectA = waitForReconnectAndSync(providerA); + const reconnectB = waitForReconnectAndSync(providerB); + + await server.restart(); + + await Promise.all([reconnectA, reconnectB]); + await new Promise((r) => setTimeout(r, 500)); + + // Write AFTER restart + const updateReceived = new Promise((resolve) => { + docB.on("update", () => { + if (docB.getText("shared").toString().includes("after")) { + resolve(); + } + }); + }); + docA.getText("shared").insert(docA.getText("shared").length, "-and-after"); + + await updateReceived; + + expect(docB.getText("shared").toString()).toContain("before"); + expect(docB.getText("shared").toString()).toContain("after"); + }); +}); + +describe("Server restart — persistent YServer (hibernate: true)", () => { + it("restores state from storage when no client re-syncs", async () => { + const room = `restart-persist-${Date.now()}`; + + // Connect to persistent server, write data + const doc = new Y.Doc(); + const provider = createProvider(room, { doc, party: "y-persistent" }); + await waitForSync(provider); + + doc.getText("shared").insert(0, "persisted-across-restart"); + + // Wait for debounced onSave + await new Promise((r) => setTimeout(r, 500)); + + // Destroy provider — no client to re-sync + destroyProvider(provider); + + // Restart server + await server.restart(); + + // New client should get state from persistence (onLoad) + const doc2 = new Y.Doc(); + const provider2 = createProvider(room, { + doc: doc2, + party: "y-persistent" + }); + await waitForSync(provider2); + await new Promise((r) => setTimeout(r, 1000)); + + expect(doc2.getText("shared").toString()).toBe("persisted-across-restart"); + }); + + it("connected client + persistence both work after restart", async () => { + const room = `restart-persist-conn-${Date.now()}`; + + const doc = new Y.Doc(); + const provider = createProvider(room, { doc, party: "y-persistent" }); + await waitForSync(provider); + + doc.getText("shared").insert(0, "persistent-and-connected"); + await new Promise((r) => setTimeout(r, 500)); + + // Set up listener BEFORE restart + const reconnect = waitForReconnectAndSync(provider); + + // Restart with client still connected + await server.restart(); + + await reconnect; + await new Promise((r) => setTimeout(r, 500)); + + expect(doc.getText("shared").toString()).toBe("persistent-and-connected"); + + // Second client also sees the data + const doc2 = new Y.Doc(); + const provider2 = createProvider(room, { + doc: doc2, + party: "y-persistent" + }); + await waitForSync(provider2); + await new Promise((r) => setTimeout(r, 1000)); + + expect(doc2.getText("shared").toString()).toBe("persistent-and-connected"); + }); + + it("survives two consecutive restarts", async () => { + const room = `restart-persist-twice-${Date.now()}`; + + // Session 1: write data + const doc1 = new Y.Doc(); + const provider1 = createProvider(room, { + doc: doc1, + party: "y-persistent" + }); + await waitForSync(provider1); + + doc1.getText("shared").insert(0, "round-1"); + await new Promise((r) => setTimeout(r, 500)); + destroyProvider(provider1); + + // First restart + await server.restart(); + + // Session 2: verify + write more + const doc2 = new Y.Doc(); + const provider2 = createProvider(room, { + doc: doc2, + party: "y-persistent" + }); + await waitForSync(provider2); + await new Promise((r) => setTimeout(r, 1000)); + + expect(doc2.getText("shared").toString()).toBe("round-1"); + + doc2.getText("shared").insert(doc2.getText("shared").length, "+round-2"); + await new Promise((r) => setTimeout(r, 500)); + destroyProvider(provider2); + + // Second restart + await server.restart(); + + // Session 3: verify both rounds persisted + const doc3 = new Y.Doc(); + const provider3 = createProvider(room, { + doc: doc3, + party: "y-persistent" + }); + await waitForSync(provider3); + await new Promise((r) => setTimeout(r, 1000)); + + expect(doc3.getText("shared").toString()).toBe("round-1+round-2"); + }); +}); + +describe("Server restart — awareness (hibernate: true)", () => { + it("local awareness state survives restart on the provider", async () => { + const room = `restart-awareness-local-${Date.now()}`; + + const doc = new Y.Doc(); + const provider = createProvider(room, { doc }); + await waitForSync(provider); + + provider.awareness.setLocalState({ user: { name: "Alice" } }); + await new Promise((r) => setTimeout(r, 200)); + + const reconnect = waitForReconnectAndSync(provider); + await server.restart(); + await reconnect; + + // The provider's local awareness state persists across restart + // (it's stored on the provider, not the server) + const local = provider.awareness.getLocalState() as { + user: { name: string }; + }; + expect(local).toBeDefined(); + expect(local.user.name).toBe("Alice"); + }); + + it("awareness automatically re-propagates after restart (no explicit re-trigger)", async () => { + const room = `restart-awareness-auto-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + await new Promise((r) => setTimeout(r, 300)); + + // Set awareness before restart + providerA.awareness.setLocalState({ user: { name: "Alice" } }); + + // Wait for B to see it + await new Promise((resolve) => { + const check = () => { + if (providerB.awareness.getStates().get(docA.clientID)) { + resolve(); + } else { + setTimeout(check, 100); + } + }; + check(); + }); + + // Set up reconnection listeners BEFORE restart + const reconnectA = waitForReconnectAndSync(providerA); + const reconnectB = waitForReconnectAndSync(providerB); + + await server.restart(); + + await Promise.all([reconnectA, reconnectB]); + + // Do NOT explicitly re-set awareness — the provider should automatically + // re-broadcast with a bumped clock on reconnect, and the receiving + // provider should accept it because stale meta was cleared on close. + await new Promise((resolve, reject) => { + const timeout = setTimeout( + () => + reject( + new Error( + "Timed out waiting for automatic awareness re-propagation" + ) + ), + 10000 + ); + const check = () => { + const state = providerB.awareness.getStates().get(docA.clientID); + if (state) { + clearTimeout(timeout); + resolve(); + } else { + setTimeout(check, 200); + } + }; + check(); + }); + + const stateA = providerB.awareness.getStates().get(docA.clientID) as { + user: { name: string }; + }; + expect(stateA.user.name).toBe("Alice"); + }); +}); + +describe("Server restart — custom messages (hibernate: true)", () => { + it("custom messages work after restart", async () => { + const room = `restart-custom-${Date.now()}`; + + const provider = createProvider(room, { party: "y-custom-message" }); + await waitForSync(provider); + + // Verify custom messages work before restart + const pong1 = new Promise((resolve) => { + provider.on("custom-message", (msg: string) => resolve(msg)); + }); + provider.sendMessage(JSON.stringify({ action: "ping" })); + expect(JSON.parse(await pong1)).toEqual({ action: "pong" }); + + // Set up listener BEFORE restart + const reconnect = waitForReconnectAndSync(provider); + + await server.restart(); + await reconnect; + + // Custom messages should still work after restart + const pong2 = new Promise((resolve) => { + provider.on("custom-message", (msg: string) => resolve(msg)); + }); + provider.sendMessage(JSON.stringify({ action: "ping" })); + expect(JSON.parse(await pong2)).toEqual({ action: "pong" }); + }); +}); + +describe("Server restart — onStart re-sync (hibernate: true)", () => { + it("onStartCount increments on each restart", async () => { + const room = `tracker-restart-${Date.now()}`; + + const doc = new Y.Doc(); + const provider = createProvider(room, { + doc, + party: "y-hibernate-tracker" + }); + await waitForSync(provider); + + // Initial onStart should have been called once + const res1 = await fetch( + `http://localhost:${PORT}/parties/y-hibernate-tracker/${room}` + ); + const data1 = (await res1.json()) as { onStartCount: number }; + expect(data1.onStartCount).toBe(1); + + doc.getText("shared").insert(0, "before-restart"); + await new Promise((r) => setTimeout(r, 300)); + + const reconnect = waitForReconnectAndSync(provider); + await server.restart(); + await reconnect; + + // After restart, onStartCount should be 2 + const res2 = await fetch( + `http://localhost:${PORT}/parties/y-hibernate-tracker/${room}` + ); + const data2 = (await res2.json()) as { onStartCount: number }; + expect(data2.onStartCount).toBe(2); + + // Data should have survived via client re-sync + expect(doc.getText("shared").toString()).toBe("before-restart"); + + // New client should also see the re-synced data + const doc2 = new Y.Doc(); + const provider2 = createProvider(room, { + doc: doc2, + party: "y-hibernate-tracker" + }); + await waitForSync(provider2); + await new Promise((r) => setTimeout(r, 500)); + expect(doc2.getText("shared").toString()).toBe("before-restart"); + }); + + it("multiple clients re-sync and stay converged after restart", async () => { + const room = `tracker-multi-restart-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { + doc: docA, + party: "y-hibernate-tracker" + }); + await waitForSync(providerA); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { + doc: docB, + party: "y-hibernate-tracker" + }); + await waitForSync(providerB); + await new Promise((r) => setTimeout(r, 300)); + + // Both edit + docA.getText("shared").insert(0, "A-data"); + await new Promise((r) => setTimeout(r, 200)); + docB.getText("shared").insert(docB.getText("shared").length, "+B-data"); + await new Promise((r) => setTimeout(r, 500)); + + const beforeA = docA.getText("shared").toString(); + expect(beforeA).toBe(docB.getText("shared").toString()); + + const reconnectA = waitForReconnectAndSync(providerA); + const reconnectB = waitForReconnectAndSync(providerB); + await server.restart(); + await Promise.all([reconnectA, reconnectB]); + await new Promise((r) => setTimeout(r, 1000)); + + // Both should still have the same converged content + expect(docA.getText("shared").toString()).toContain("A-data"); + expect(docA.getText("shared").toString()).toContain("B-data"); + expect(docA.getText("shared").toString()).toBe( + docB.getText("shared").toString() + ); + + // Continue collaborating after restart + const updateReceived = new Promise((resolve) => { + docB.on("update", () => { + if (docB.getText("shared").toString().includes("post-restart")) { + resolve(); + } + }); + }); + docA + .getText("shared") + .insert(docA.getText("shared").length, "+post-restart"); + await updateReceived; + + expect(docB.getText("shared").toString()).toContain("post-restart"); + }); + + it("new client gets re-synced state after restart", async () => { + const room = `tracker-newclient-restart-${Date.now()}`; + + const doc = new Y.Doc(); + const provider = createProvider(room, { + doc, + party: "y-hibernate-tracker" + }); + await waitForSync(provider); + + doc.getText("shared").insert(0, "existing-data"); + await new Promise((r) => setTimeout(r, 300)); + + const reconnect = waitForReconnectAndSync(provider); + await server.restart(); + await reconnect; + + // New client connects after restart — should get state + // re-synced from the surviving provider + const doc2 = new Y.Doc(); + const provider2 = createProvider(room, { + doc: doc2, + party: "y-hibernate-tracker" + }); + await waitForSync(provider2); + await new Promise((r) => setTimeout(r, 500)); + + expect(doc2.getText("shared").toString()).toBe("existing-data"); + }); +}); + +describe("Server restart — seeded documents (hibernate: true)", () => { + it("onLoad-returned YDoc is re-seeded after restart", async () => { + const room = `restart-seeded-${Date.now()}`; + + // YOnLoadReturnsDoc seeds "seeded-content" on every onLoad + const doc = new Y.Doc(); + const provider = createProvider(room, { + doc, + party: "y-on-load-returns-doc" + }); + await waitForSync(provider); + await new Promise((r) => setTimeout(r, 500)); + + expect(doc.getText("shared").toString()).toBe("seeded-content"); + + // Destroy the provider + destroyProvider(provider); + + // Restart + await server.restart(); + + // New client should still get the seeded content (onLoad runs again) + const doc2 = new Y.Doc(); + const provider2 = createProvider(room, { + doc: doc2, + party: "y-on-load-returns-doc" + }); + await waitForSync(provider2); + await new Promise((r) => setTimeout(r, 500)); + + expect(doc2.getText("shared").toString()).toBe("seeded-content"); + }); +}); diff --git a/packages/y-partyserver/src/tests/index.test.ts b/packages/y-partyserver/src/tests/index.test.ts new file mode 100644 index 00000000..ac237c62 --- /dev/null +++ b/packages/y-partyserver/src/tests/index.test.ts @@ -0,0 +1,998 @@ +import { createExecutionContext, env } from "cloudflare:test"; +import * as encoding from "lib0/encoding"; +import * as decoding from "lib0/decoding"; +import * as syncProtocol from "y-protocols/sync"; +import * as awarenessProtocol from "y-protocols/awareness"; +import * as Y from "yjs"; +import { describe, expect, it } from "vitest"; + +import worker from "./worker"; + +import type { Env } from "./worker"; + +declare module "cloudflare:test" { + interface ProvidedEnv extends Env {} +} + +// --------------------------------------------------------------------------- +// Yjs protocol constants (must match server) +// --------------------------------------------------------------------------- +const messageSync = 0; +const messageAwareness = 1; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Create a WebSocket upgrade request for the given DO path */ +function wsRequest(path: string): Request { + return new Request(`http://example.com/parties/${path}`, { + headers: { Upgrade: "websocket" } + }); +} + +/** Create a regular HTTP request for the given DO path */ +function httpRequest(path: string): Request { + return new Request(`http://example.com/parties/${path}`); +} + +/** Accept a WebSocket from a fetch response */ +function acceptWs(response: Response): WebSocket { + const ws = response.webSocket!; + ws.accept(); + return ws; +} + +/** Wait for the next binary message on a WebSocket */ +function nextBinaryMessage(ws: WebSocket): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout( + () => reject(new Error("Timed out waiting for binary message")), + 5000 + ); + const handler = (event: MessageEvent) => { + if (event.data instanceof ArrayBuffer) { + clearTimeout(timeout); + ws.removeEventListener("message", handler); + resolve(event.data); + } + // skip string messages, keep listening + }; + ws.addEventListener("message", handler); + }); +} + +/** Wait for the next string message on a WebSocket */ +function nextStringMessage(ws: WebSocket): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout( + () => reject(new Error("Timed out waiting for string message")), + 5000 + ); + const handler = (event: MessageEvent) => { + if (typeof event.data === "string") { + clearTimeout(timeout); + ws.removeEventListener("message", handler); + resolve(event.data); + } + // skip binary messages, keep listening + }; + ws.addEventListener("message", handler); + }); +} + +/** Collect all messages for a short period */ +function collectMessages( + ws: WebSocket, + durationMs: number +): Promise> { + return new Promise((resolve) => { + const messages: Array = []; + const handler = (event: MessageEvent) => { + messages.push(event.data as string | ArrayBuffer); + }; + ws.addEventListener("message", handler); + setTimeout(() => { + ws.removeEventListener("message", handler); + resolve(messages); + }, durationMs); + }); +} + +/** + * Perform the Yjs sync handshake on a WebSocket. + * Reads the server's sync step 1, responds with sync step 2 + our sync step 1, + * then reads the server's sync step 2. + */ +async function performSync(ws: WebSocket, doc: Y.Doc): Promise { + // The server sends sync step 1 on connect — read it + const msg1 = await nextBinaryMessage(ws); + const decoder1 = decoding.createDecoder(new Uint8Array(msg1)); + const msgType1 = decoding.readVarUint(decoder1); + expect(msgType1).toBe(messageSync); + + // Process server's sync step 1 and generate our response + const encoder1 = encoding.createEncoder(); + encoding.writeVarUint(encoder1, messageSync); + syncProtocol.readSyncMessage(decoder1, encoder1, doc, null); + + // Send our sync step 2 (response to server's step 1) + if (encoding.length(encoder1) > 1) { + ws.send(encoding.toUint8Array(encoder1)); + } + + // Also send our sync step 1 + const encoder2 = encoding.createEncoder(); + encoding.writeVarUint(encoder2, messageSync); + syncProtocol.writeSyncStep1(encoder2, doc); + ws.send(encoding.toUint8Array(encoder2)); +} + +/** + * Send a Yjs update to the server via the WebSocket + */ +function sendUpdate(ws: WebSocket, update: Uint8Array): void { + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageSync); + syncProtocol.writeUpdate(encoder, update); + ws.send(encoding.toUint8Array(encoder)); +} + +/** + * Apply any incoming binary Yjs messages to a local doc. + * Returns the number of messages applied. + */ +function applyIncomingMessages( + messages: Array, + doc: Y.Doc +): number { + let applied = 0; + for (const msg of messages) { + if (!(msg instanceof ArrayBuffer)) continue; + const decoder = decoding.createDecoder(new Uint8Array(msg)); + const msgType = decoding.readVarUint(decoder); + if (msgType === messageSync) { + const responseEncoder = encoding.createEncoder(); + encoding.writeVarUint(responseEncoder, messageSync); + syncProtocol.readSyncMessage(decoder, responseEncoder, doc, null); + applied++; + } + } + return applied; +} + +// =========================================================================== +// Tests +// =========================================================================== + +describe("YServer — basic sync", () => { + it("accepts a WebSocket connection and sends sync step 1", async () => { + const ctx = createExecutionContext(); + const response = await worker.fetch(wsRequest("y-basic/room1"), env, ctx); + expect(response.status).toBe(101); + const ws = acceptWs(response); + + // Server should send sync step 1 immediately + const msg = await nextBinaryMessage(ws); + const decoder = decoding.createDecoder(new Uint8Array(msg)); + const msgType = decoding.readVarUint(decoder); + expect(msgType).toBe(messageSync); + + // The inner message should be sync step 1 + const syncMsgType = decoding.readVarUint(decoder); + expect(syncMsgType).toBe(syncProtocol.messageYjsSyncStep1); + + ws.close(); + }); + + it("syncs a document between two clients", async () => { + const ctx = createExecutionContext(); + const roomName = "sync-two-clients"; + + // --- Client A connects and inserts text --- + const resA = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsA = acceptWs(resA); + const docA = new Y.Doc(); + + await performSync(wsA, docA); + + // Insert text on client A + docA.getText("shared").insert(0, "hello from A"); + + // docA will have generated an update — send it to the server + const updateA = Y.encodeStateAsUpdate(docA); + sendUpdate(wsA, updateA); + + // Give the server a moment to process + await new Promise((r) => setTimeout(r, 100)); + + // --- Client B connects and should receive A's state --- + const resB = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsB = acceptWs(resB); + const docB = new Y.Doc(); + + await performSync(wsB, docB); + + // Collect messages for a bit to get sync step 2 from server + const messagesB = await collectMessages(wsB, 200); + applyIncomingMessages(messagesB, docB); + + expect(docB.getText("shared").toString()).toBe("hello from A"); + + wsA.close(); + wsB.close(); + }); + + it("broadcasts updates from one client to another", async () => { + const ctx = createExecutionContext(); + const roomName = "broadcast-test"; + + // Connect client A + const resA = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsA = acceptWs(resA); + const docA = new Y.Doc(); + await performSync(wsA, docA); + + // Connect client B + const resB = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsB = acceptWs(resB); + const docB = new Y.Doc(); + await performSync(wsB, docB); + + // Drain any initial sync messages on B + await collectMessages(wsB, 100); + + // Client A inserts text — capture the incremental update + const updatePromise = new Promise((resolve) => { + docA.on("update", (update: Uint8Array) => { + resolve(update); + }); + }); + docA.getText("shared").insert(0, "broadcast!"); + const incrementalUpdate = await updatePromise; + + // Send just the incremental update to the server + sendUpdate(wsA, incrementalUpdate); + + // Client B should receive the broadcast + const messagesB = await collectMessages(wsB, 300); + applyIncomingMessages(messagesB, docB); + + expect(docB.getText("shared").toString()).toBe("broadcast!"); + + wsA.close(); + wsB.close(); + }); +}); + +describe("YServer — awareness", () => { + it("broadcasts awareness updates between clients", async () => { + const ctx = createExecutionContext(); + const roomName = "awareness-test"; + + // Connect client A + const resA = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsA = acceptWs(resA); + const docA = new Y.Doc(); + const awarenessA = new awarenessProtocol.Awareness(docA); + + await performSync(wsA, docA); + + // Connect client B + const resB = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsB = acceptWs(resB); + const docB = new Y.Doc(); + const awarenessB = new awarenessProtocol.Awareness(docB); + + await performSync(wsB, docB); + // Drain initial messages + await collectMessages(wsB, 100); + + // Set awareness state on client A + awarenessA.setLocalState({ user: { name: "Alice" } }); + + // Encode and send awareness update + const awarenessEncoder = encoding.createEncoder(); + encoding.writeVarUint(awarenessEncoder, messageAwareness); + encoding.writeVarUint8Array( + awarenessEncoder, + awarenessProtocol.encodeAwarenessUpdate(awarenessA, [docA.clientID]) + ); + wsA.send(encoding.toUint8Array(awarenessEncoder)); + + // Client B should receive the awareness update + const messagesB = await collectMessages(wsB, 300); + let awarenessReceived = false; + for (const msg of messagesB) { + if (!(msg instanceof ArrayBuffer)) continue; + const decoder = decoding.createDecoder(new Uint8Array(msg)); + const msgType = decoding.readVarUint(decoder); + if (msgType === messageAwareness) { + awarenessProtocol.applyAwarenessUpdate( + awarenessB, + decoding.readVarUint8Array(decoder), + null + ); + awarenessReceived = true; + } + } + + expect(awarenessReceived).toBe(true); + const stateA = awarenessB.getStates().get(docA.clientID); + expect(stateA).toBeDefined(); + expect((stateA as { user: { name: string } }).user.name).toBe("Alice"); + + wsA.close(); + wsB.close(); + }); +}); + +describe("YServer — persistence (onLoad / onSave)", () => { + it("persists document state via onSave and restores via onLoad", async () => { + const ctx = createExecutionContext(); + const roomName = "persist-test"; + + // --- Session 1: Connect, write data, then disconnect --- + { + const res = await worker.fetch( + wsRequest(`y-persistent/${roomName}`), + env, + ctx + ); + const ws = acceptWs(res); + const doc = new Y.Doc(); + await performSync(ws, doc); + + // Insert text + const updatePromise = new Promise((resolve) => { + doc.on("update", (update: Uint8Array) => resolve(update)); + }); + doc.getText("shared").insert(0, "persisted!"); + const update = await updatePromise; + sendUpdate(ws, update); + + // Wait for debounced onSave to fire (debounceWait: 50, maxWait: 100) + await new Promise((r) => setTimeout(r, 250)); + + ws.close(); + } + + // Small gap between sessions + await new Promise((r) => setTimeout(r, 100)); + + // --- Session 2: Reconnect and verify state was loaded --- + { + const res = await worker.fetch( + wsRequest(`y-persistent/${roomName}`), + env, + ctx + ); + const ws = acceptWs(res); + const doc = new Y.Doc(); + await performSync(ws, doc); + + // Collect sync step 2 from server which should contain the persisted state + const messages = await collectMessages(ws, 300); + applyIncomingMessages(messages, doc); + + expect(doc.getText("shared").toString()).toBe("persisted!"); + ws.close(); + } + }); +}); + +describe("YServer — read-only mode", () => { + it("accepts connections but rejects write updates", async () => { + const ctx = createExecutionContext(); + const roomName = "readonly-test"; + + // Connect to the read-only server + const res = await worker.fetch( + wsRequest(`y-read-only/${roomName}`), + env, + ctx + ); + const ws = acceptWs(res); + const doc = new Y.Doc(); + + // Should receive the "connected:readonly" string message + const stringMsg = nextStringMessage(ws); + + await performSync(ws, doc); + + expect(await stringMsg).toBe("connected:readonly"); + + // Try to send an update — it should be silently ignored + const updatePromise = new Promise((resolve) => { + doc.on("update", (update: Uint8Array) => resolve(update)); + }); + doc.getText("shared").insert(0, "should-be-rejected"); + const update = await updatePromise; + sendUpdate(ws, update); + + // Wait for any potential broadcast + await new Promise((r) => setTimeout(r, 200)); + + // Connect a second client to check the server document is still empty + const res2 = await worker.fetch( + wsRequest(`y-read-only/${roomName}`), + env, + ctx + ); + const ws2 = acceptWs(res2); + const doc2 = new Y.Doc(); + await performSync(ws2, doc2); + const messages2 = await collectMessages(ws2, 200); + applyIncomingMessages(messages2, doc2); + + // The server doc should be empty since the write was rejected + expect(doc2.getText("shared").toString()).toBe(""); + + ws.close(); + ws2.close(); + }); +}); + +describe("YServer — custom messages", () => { + it("handles ping/pong custom messages", async () => { + const ctx = createExecutionContext(); + const roomName = "custom-msg-test"; + + const res = await worker.fetch( + wsRequest(`y-custom-message/${roomName}`), + env, + ctx + ); + const ws = acceptWs(res); + + // Drain initial sync messages + await collectMessages(ws, 100); + + // Send a custom ping message using the __YPS: prefix + ws.send(`__YPS:${JSON.stringify({ action: "ping" })}`); + + // Should receive a pong back + const pong = await nextStringMessage(ws); + expect(pong.startsWith("__YPS:")).toBe(true); + const pongData = JSON.parse(pong.slice(6)); + expect(pongData).toEqual({ action: "pong" }); + + ws.close(); + }); + + it("broadcasts custom messages to other clients", async () => { + const ctx = createExecutionContext(); + const roomName = "custom-broadcast-test"; + + // Connect client A + const resA = await worker.fetch( + wsRequest(`y-custom-message/${roomName}`), + env, + ctx + ); + const wsA = acceptWs(resA); + await collectMessages(wsA, 100); + + // Connect client B + const resB = await worker.fetch( + wsRequest(`y-custom-message/${roomName}`), + env, + ctx + ); + const wsB = acceptWs(resB); + await collectMessages(wsB, 100); + + // Client A sends a broadcast request + wsA.send(`__YPS:${JSON.stringify({ action: "broadcast" })}`); + + // Client B should receive the broadcasted message + const msg = await nextStringMessage(wsB); + expect(msg.startsWith("__YPS:")).toBe(true); + const data = JSON.parse(msg.slice(6)); + expect(data).toEqual({ action: "broadcasted" }); + + // Client A should NOT receive the broadcast (excluded) + const msgsA = await collectMessages(wsA, 200); + const customMsgsA = msgsA.filter( + (m) => typeof m === "string" && m.includes("broadcasted") + ); + expect(customMsgsA).toHaveLength(0); + + wsA.close(); + wsB.close(); + }); + + it("handles non-prefixed string messages gracefully", async () => { + const ctx = createExecutionContext(); + const roomName = "custom-nopfx-test"; + + const res = await worker.fetch( + wsRequest(`y-custom-message/${roomName}`), + env, + ctx + ); + const ws = acceptWs(res); + await collectMessages(ws, 100); + + // Send a string message without __YPS: prefix — should be ignored, not crash + ws.send("hello without prefix"); + + // Give the server time to process; it should log a warning but not crash + await new Promise((r) => setTimeout(r, 100)); + + // Connection should still be alive — verify by sending a valid message + ws.send(`__YPS:${JSON.stringify({ action: "echo" })}`); + const echoMsg = await nextStringMessage(ws); + expect(echoMsg.startsWith("__YPS:")).toBe(true); + const echoData = JSON.parse(echoMsg.slice(6)); + expect(echoData).toEqual({ action: "echo" }); + + ws.close(); + }); +}); + +describe("YServer — onLoad returns a YDoc", () => { + it("seeds the document from a returned YDoc", async () => { + const ctx = createExecutionContext(); + const roomName = "onload-return-doc"; + + const res = await worker.fetch( + wsRequest(`y-on-load-returns-doc/${roomName}`), + env, + ctx + ); + const ws = acceptWs(res); + const doc = new Y.Doc(); + await performSync(ws, doc); + + // Collect messages to get the server's document state + const messages = await collectMessages(ws, 300); + applyIncomingMessages(messages, doc); + + // The server should have the seeded content from onLoad + expect(doc.getText("shared").toString()).toBe("seeded-content"); + + ws.close(); + }); +}); + +describe("YServer — onSave callback options", () => { + it("calls onSave after debounce period", async () => { + const ctx = createExecutionContext(); + const roomName = "callback-opts-test"; + + const res = await worker.fetch( + wsRequest(`y-callback-options/${roomName}`), + env, + ctx + ); + const ws = acceptWs(res); + const doc = new Y.Doc(); + await performSync(ws, doc); + + // Send an update to trigger the debounced onSave + const updatePromise = new Promise((resolve) => { + doc.on("update", (update: Uint8Array) => resolve(update)); + }); + doc.getText("shared").insert(0, "trigger-save"); + const update = await updatePromise; + sendUpdate(ws, update); + + // Wait for debounce to fire (debounceWait: 50ms, maxWait: 100ms) + await new Promise((r) => setTimeout(r, 250)); + + // Check saveCount via HTTP + const httpRes = await worker.fetch( + httpRequest(`y-callback-options/${roomName}`), + env, + ctx + ); + const data = (await httpRes.json()) as { saveCount: number }; + expect(data.saveCount).toBeGreaterThanOrEqual(1); + + ws.close(); + }); +}); + +describe("YServer — connection lifecycle", () => { + it("cleans up awareness on connection close", async () => { + const ctx = createExecutionContext(); + const roomName = "cleanup-test"; + + // Connect client A + const resA = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsA = acceptWs(resA); + const docA = new Y.Doc(); + const awarenessA = new awarenessProtocol.Awareness(docA); + await performSync(wsA, docA); + + // Set awareness state on client A + awarenessA.setLocalState({ user: { name: "Alice" } }); + const awarenessEncoder = encoding.createEncoder(); + encoding.writeVarUint(awarenessEncoder, messageAwareness); + encoding.writeVarUint8Array( + awarenessEncoder, + awarenessProtocol.encodeAwarenessUpdate(awarenessA, [docA.clientID]) + ); + wsA.send(encoding.toUint8Array(awarenessEncoder)); + await new Promise((r) => setTimeout(r, 100)); + + // Connect client B + const resB = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsB = acceptWs(resB); + const docB = new Y.Doc(); + const awarenessB = new awarenessProtocol.Awareness(docB); + await performSync(wsB, docB); + + // Client B should receive awareness with Alice + const msgsB1 = await collectMessages(wsB, 200); + for (const msg of msgsB1) { + if (!(msg instanceof ArrayBuffer)) continue; + const decoder = decoding.createDecoder(new Uint8Array(msg)); + const msgType = decoding.readVarUint(decoder); + if (msgType === messageAwareness) { + awarenessProtocol.applyAwarenessUpdate( + awarenessB, + decoding.readVarUint8Array(decoder), + null + ); + } + } + expect(awarenessB.getStates().get(docA.clientID)).toBeDefined(); + + // Now close client A + wsA.close(); + + // Client B should receive an awareness removal for A + const msgsB2 = await collectMessages(wsB, 300); + for (const msg of msgsB2) { + if (!(msg instanceof ArrayBuffer)) continue; + const decoder = decoding.createDecoder(new Uint8Array(msg)); + const msgType = decoding.readVarUint(decoder); + if (msgType === messageAwareness) { + awarenessProtocol.applyAwarenessUpdate( + awarenessB, + decoding.readVarUint8Array(decoder), + null + ); + } + } + + // A's awareness state should be removed + expect(awarenessB.getStates().get(docA.clientID)).toBeUndefined(); + + wsB.close(); + }); + + it("handles multiple concurrent connections to the same room", async () => { + const ctx = createExecutionContext(); + const roomName = "concurrent-test"; + + const clients: Array<{ ws: WebSocket; doc: Y.Doc }> = []; + + // Connect 3 clients + for (let i = 0; i < 3; i++) { + const res = await worker.fetch( + wsRequest(`y-basic/${roomName}`), + env, + ctx + ); + const ws = acceptWs(res); + const doc = new Y.Doc(); + await performSync(ws, doc); + clients.push({ ws, doc }); + } + + // Drain initial sync messages + for (const client of clients) { + await collectMessages(client.ws, 100); + } + + // Start collecting on clients 1 and 2 BEFORE sending the update + // so we don't miss the broadcast + const collectPromises = [ + collectMessages(clients[1].ws, 500), + collectMessages(clients[2].ws, 500) + ]; + + // Client 0 inserts text + const updatePromise = new Promise((resolve) => { + clients[0].doc.on("update", (update: Uint8Array) => resolve(update)); + }); + clients[0].doc.getText("shared").insert(0, "from-client-0"); + const update = await updatePromise; + sendUpdate(clients[0].ws, update); + + // Wait for collected messages + const [msgs1, msgs2] = await Promise.all(collectPromises); + applyIncomingMessages(msgs1, clients[1].doc); + applyIncomingMessages(msgs2, clients[2].doc); + + expect(clients[1].doc.getText("shared").toString()).toBe("from-client-0"); + expect(clients[2].doc.getText("shared").toString()).toBe("from-client-0"); + + for (const client of clients) { + client.ws.close(); + } + }); +}); + +describe("YServer — handleMessage binary conversion", () => { + it("handles ArrayBuffer messages correctly", async () => { + const ctx = createExecutionContext(); + const roomName = "arraybuffer-test"; + + const res = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const ws = acceptWs(res); + const doc = new Y.Doc(); + await performSync(ws, doc); + + // Send an update as a raw ArrayBuffer (not Uint8Array) + doc.getText("shared").insert(0, "arraybuffer-data"); + const update = Y.encodeStateAsUpdate(doc); + + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageSync); + syncProtocol.writeUpdate(encoder, update); + const raw = encoding.toUint8Array(encoder); + + // Send as ArrayBuffer + ws.send(raw.buffer); + + // Give the server time to process + await new Promise((r) => setTimeout(r, 100)); + + // Verify by connecting a second client + const res2 = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const ws2 = acceptWs(res2); + const doc2 = new Y.Doc(); + await performSync(ws2, doc2); + const msgs = await collectMessages(ws2, 200); + applyIncomingMessages(msgs, doc2); + + expect(doc2.getText("shared").toString()).toBe("arraybuffer-data"); + + ws.close(); + ws2.close(); + }); +}); + +describe("YProvider — URL construction", () => { + // These are unit tests for the provider's URL logic; they don't need + // a running server. We import directly and test construction. + it("strips protocol from host", async () => { + // We test YProvider construction by importing and checking the url property. + // We pass connect: false so it doesn't try to actually connect. + const { default: YProvider } = await import("../provider/index"); + const doc = new Y.Doc(); + const provider = new YProvider("https://example.com", "my-room", doc, { + connect: false, + WebSocketPolyfill: null + }); + expect(provider.url).toContain("wss://example.com"); + expect(provider.url).toContain("my-room"); + expect(provider.url).not.toContain("https://"); + provider.destroy(); + }); + + it("strips trailing slash from host (bug fix)", async () => { + const { default: YProvider } = await import("../provider/index"); + const doc = new Y.Doc(); + const provider = new YProvider("example.com/", "my-room", doc, { + connect: false, + WebSocketPolyfill: null + }); + // Should not have double slashes from un-stripped trailing slash + expect(provider.url).not.toContain("com//"); + expect(provider.url).toContain("wss://example.com/"); + provider.destroy(); + }); + + it("uses ws:// for localhost", async () => { + const { default: YProvider } = await import("../provider/index"); + const doc = new Y.Doc(); + const provider = new YProvider("localhost:8787", "room", doc, { + connect: false, + WebSocketPolyfill: null + }); + expect(provider.url).toMatch(/^ws:\/\/localhost:8787/); + provider.destroy(); + }); + + it("uses ws:// for 127.0.0.1", async () => { + const { default: YProvider } = await import("../provider/index"); + const doc = new Y.Doc(); + const provider = new YProvider("127.0.0.1:8787", "room", doc, { + connect: false, + WebSocketPolyfill: null + }); + expect(provider.url).toMatch(/^ws:\/\/127\.0\.0\.1:8787/); + provider.destroy(); + }); + + it("respects explicit protocol option", async () => { + const { default: YProvider } = await import("../provider/index"); + const doc = new Y.Doc(); + const provider = new YProvider("example.com", "room", doc, { + connect: false, + protocol: "ws", + WebSocketPolyfill: null + }); + expect(provider.url).toMatch(/^ws:\/\/example\.com/); + provider.destroy(); + }); + + it("uses custom party name in URL path", async () => { + const { default: YProvider } = await import("../provider/index"); + const doc = new Y.Doc(); + const provider = new YProvider("example.com", "room", doc, { + connect: false, + party: "my-party", + WebSocketPolyfill: null + }); + expect(provider.url).toContain("/parties/my-party/"); + provider.destroy(); + }); + + it("uses custom prefix instead of /parties/:party/", async () => { + const { default: YProvider } = await import("../provider/index"); + const doc = new Y.Doc(); + const provider = new YProvider("example.com", "room", doc, { + connect: false, + prefix: "/custom/path", + WebSocketPolyfill: null + }); + expect(provider.url).toContain("/custom/path"); + expect(provider.url).not.toContain("/parties/"); + provider.destroy(); + }); +}); + +describe("YServer — awareness heartbeat suppression", () => { + it("does not broadcast clock-only awareness renewals to other clients", async () => { + const ctx = createExecutionContext(); + const roomName = "no-heartbeat-test"; + + // Connect client A + const resA = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsA = acceptWs(resA); + const docA = new Y.Doc(); + const awarenessA = new awarenessProtocol.Awareness(docA); + await performSync(wsA, docA); + + // Connect client B + const resB = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsB = acceptWs(resB); + const docB = new Y.Doc(); + await performSync(wsB, docB); + // Drain initial messages + await collectMessages(wsB, 100); + + // Set awareness state on A — this is an actual state change, should be sent + awarenessA.setLocalState({ user: { name: "Alice" } }); + const enc1 = encoding.createEncoder(); + encoding.writeVarUint(enc1, messageAwareness); + encoding.writeVarUint8Array( + enc1, + awarenessProtocol.encodeAwarenessUpdate(awarenessA, [docA.clientID]) + ); + wsA.send(encoding.toUint8Array(enc1)); + + // B should receive this actual state change + const msgs1 = await collectMessages(wsB, 300); + const awarenessMsgs1 = msgs1.filter((msg) => { + if (!(msg instanceof ArrayBuffer)) return false; + const d = decoding.createDecoder(new Uint8Array(msg)); + return decoding.readVarUint(d) === messageAwareness; + }); + expect(awarenessMsgs1.length).toBeGreaterThan(0); + + // Now simulate a clock-only renewal (same state, just bump the clock) + // This is what the awareness _checkInterval does every 15s + awarenessA.setLocalState(awarenessA.getLocalState()); + const enc2 = encoding.createEncoder(); + encoding.writeVarUint(enc2, messageAwareness); + encoding.writeVarUint8Array( + enc2, + awarenessProtocol.encodeAwarenessUpdate(awarenessA, [docA.clientID]) + ); + wsA.send(encoding.toUint8Array(enc2)); + + // B should receive this too (server forwards all client messages) + // The suppression happens on the CLIENT side (provider doesn't send + // clock-only updates), not the server side. + const msgs2 = await collectMessages(wsB, 300); + const awarenessMsgs2 = msgs2.filter((msg) => { + if (!(msg instanceof ArrayBuffer)) return false; + const d = decoding.createDecoder(new Uint8Array(msg)); + return decoding.readVarUint(d) === messageAwareness; + }); + // Server still forwards — the point is clients won't SEND these + expect(awarenessMsgs2.length).toBeGreaterThan(0); + + wsA.close(); + wsB.close(); + }); + + it("server does not auto-remove awareness after 30s (checkInterval disabled)", async () => { + const ctx = createExecutionContext(); + const roomName = "no-timeout-removal-test"; + + // Connect client A + const resA = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsA = acceptWs(resA); + const docA = new Y.Doc(); + const awarenessA = new awarenessProtocol.Awareness(docA); + await performSync(wsA, docA); + + // Set awareness state on A + awarenessA.setLocalState({ user: { name: "Alice" } }); + const enc = encoding.createEncoder(); + encoding.writeVarUint(enc, messageAwareness); + encoding.writeVarUint8Array( + enc, + awarenessProtocol.encodeAwarenessUpdate(awarenessA, [docA.clientID]) + ); + wsA.send(encoding.toUint8Array(enc)); + await new Promise((r) => setTimeout(r, 100)); + + // Connect client B and verify it sees A's awareness + const resB = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsB = acceptWs(resB); + const docB = new Y.Doc(); + const awarenessB = new awarenessProtocol.Awareness(docB); + await performSync(wsB, docB); + + const msgs = await collectMessages(wsB, 200); + for (const msg of msgs) { + if (!(msg instanceof ArrayBuffer)) continue; + const decoder = decoding.createDecoder(new Uint8Array(msg)); + const msgType = decoding.readVarUint(decoder); + if (msgType === messageAwareness) { + awarenessProtocol.applyAwarenessUpdate( + awarenessB, + decoding.readVarUint8Array(decoder), + null + ); + } + } + expect(awarenessB.getStates().get(docA.clientID)).toBeDefined(); + + // Wait >3s (the awareness _checkInterval runs every 3s = outdatedTimeout/10) + // If the server's checkInterval were still active, it would start the + // removal process. With it disabled, the state should persist. + await new Promise((r) => setTimeout(r, 4000)); + + // Connect client C to check if A's awareness is still on the server + const resC = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsC = acceptWs(resC); + const docC = new Y.Doc(); + const awarenessC = new awarenessProtocol.Awareness(docC); + await performSync(wsC, docC); + + const msgsC = await collectMessages(wsC, 200); + for (const msg of msgsC) { + if (!(msg instanceof ArrayBuffer)) continue; + const decoder = decoding.createDecoder(new Uint8Array(msg)); + const msgType = decoding.readVarUint(decoder); + if (msgType === messageAwareness) { + awarenessProtocol.applyAwarenessUpdate( + awarenessC, + decoding.readVarUint8Array(decoder), + null + ); + } + } + + // A's awareness should still be present — not timed out + expect(awarenessC.getStates().get(docA.clientID)).toBeDefined(); + expect( + (awarenessC.getStates().get(docA.clientID) as { user: { name: string } }) + .user.name + ).toBe("Alice"); + + wsA.close(); + wsB.close(); + wsC.close(); + }); +}); diff --git a/packages/y-partyserver/src/tests/integration-wrangler.jsonc b/packages/y-partyserver/src/tests/integration-wrangler.jsonc new file mode 100644 index 00000000..245fdf01 --- /dev/null +++ b/packages/y-partyserver/src/tests/integration-wrangler.jsonc @@ -0,0 +1,55 @@ +{ + "name": "y-partyserver-integration", + "main": "./worker.ts", + "compatibility_date": "2025-12-10", + "compatibility_flags": ["nodejs_compat"], + "durable_objects": { + "bindings": [ + { + "name": "YBasic", + "class_name": "YBasic" + }, + { + "name": "YPersistent", + "class_name": "YPersistent" + }, + { + "name": "YReadOnly", + "class_name": "YReadOnly" + }, + { + "name": "YCustomMessage", + "class_name": "YCustomMessage" + }, + { + "name": "YOnLoadReturnsDoc", + "class_name": "YOnLoadReturnsDoc" + }, + { + "name": "YCallbackOptions", + "class_name": "YCallbackOptions" + }, + { + "name": "YHibernateTracker", + "class_name": "YHibernateTracker" + } + ] + }, + "migrations": [ + { + "tag": "v1", + "new_sqlite_classes": [ + "YBasic", + "YPersistent", + "YReadOnly", + "YCustomMessage", + "YOnLoadReturnsDoc", + "YCallbackOptions" + ] + }, + { + "tag": "v2", + "new_sqlite_classes": ["YHibernateTracker"] + } + ] +} diff --git a/packages/y-partyserver/src/tests/integration.test.ts b/packages/y-partyserver/src/tests/integration.test.ts new file mode 100644 index 00000000..81f045b3 --- /dev/null +++ b/packages/y-partyserver/src/tests/integration.test.ts @@ -0,0 +1,540 @@ +import WebSocket from "ws"; +import * as Y from "yjs"; +import { afterEach, describe, expect, it } from "vitest"; + +import YProvider from "../provider/index"; + +const PORT = 8799; +const HOST = `localhost:${PORT}`; + +// Track providers for cleanup +const providers: YProvider[] = []; + +function createProvider( + room: string, + options: { + party?: string; + doc?: Y.Doc; + connect?: boolean; + } = {} +): YProvider { + const doc = options.doc ?? new Y.Doc(); + const provider = new YProvider(HOST, room, doc, { + party: options.party ?? "y-basic", + connect: options.connect ?? true, + // Use ws as the WebSocket polyfill for Node.js + WebSocketPolyfill: WebSocket as unknown as typeof globalThis.WebSocket, + disableBc: true + }); + providers.push(provider); + return provider; +} + +function waitForSync(provider: YProvider): Promise { + return new Promise((resolve, reject) => { + if (provider.synced) { + resolve(); + return; + } + const timeout = setTimeout( + () => reject(new Error("Timed out waiting for sync")), + 10000 + ); + provider.on("synced", () => { + clearTimeout(timeout); + resolve(); + }); + }); +} + +function waitForConnection(provider: YProvider): Promise { + return new Promise((resolve, reject) => { + if (provider.wsconnected) { + resolve(); + return; + } + const timeout = setTimeout( + () => reject(new Error("Timed out waiting for connection")), + 10000 + ); + provider.on("status", (event: { status: string }) => { + if (event.status === "connected") { + clearTimeout(timeout); + resolve(); + } + }); + }); +} + +function waitForCustomMessage(provider: YProvider): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout( + () => reject(new Error("Timed out waiting for custom message")), + 10000 + ); + provider.on("custom-message", (message: string) => { + clearTimeout(timeout); + resolve(message); + }); + }); +} + +afterEach(() => { + // Destroy all providers created during the test + for (const p of providers) { + try { + p.destroy(); + } catch { + // ignore + } + } + providers.length = 0; +}); + +// --------------------------------------------------------------------------- +// Integration tests — real wrangler dev server +// --------------------------------------------------------------------------- + +describe("Integration — YProvider ↔ YServer sync", () => { + it("connects and syncs a document", async () => { + const room = `sync-basic-${Date.now()}`; + const provider = createProvider(room); + + await waitForSync(provider); + + expect(provider.synced).toBe(true); + expect(provider.wsconnected).toBe(true); + }); + + it("syncs text between two providers", async () => { + const room = `sync-two-${Date.now()}`; + + // Provider A connects and writes + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + docA.getText("shared").insert(0, "hello from A"); + + // Give server time to process the update + await new Promise((r) => setTimeout(r, 300)); + + // Provider B connects and should receive A's state + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + + // Wait a bit for sync step 2 to arrive + await new Promise((r) => setTimeout(r, 500)); + + expect(docB.getText("shared").toString()).toBe("hello from A"); + }); + + it("broadcasts live updates between connected providers", async () => { + const room = `sync-live-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + + // Wait for initial sync to settle + await new Promise((r) => setTimeout(r, 300)); + + // A writes, B should receive the update in real-time + const updateReceived = new Promise((resolve) => { + docB.on("update", () => { + if (docB.getText("shared").toString() === "live-update") { + resolve(); + } + }); + }); + + docA.getText("shared").insert(0, "live-update"); + + await updateReceived; + expect(docB.getText("shared").toString()).toBe("live-update"); + }); + + it("handles concurrent edits from multiple providers", async () => { + const room = `sync-concurrent-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + + await new Promise((r) => setTimeout(r, 300)); + + // Both insert concurrently + docA.getText("shared").insert(0, "A"); + docB.getText("shared").insert(0, "B"); + + // Wait for convergence + await new Promise((r) => setTimeout(r, 1000)); + + // Both docs should converge to the same content (Yjs CRDT) + const textA = docA.getText("shared").toString(); + const textB = docB.getText("shared").toString(); + expect(textA).toBe(textB); + expect(textA).toHaveLength(2); + expect(textA).toContain("A"); + expect(textA).toContain("B"); + }); + + it("syncs Map types", async () => { + const room = `sync-map-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + docA.getMap("config").set("key1", "value1"); + docA.getMap("config").set("key2", 42); + + await new Promise((r) => setTimeout(r, 300)); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + + await new Promise((r) => setTimeout(r, 500)); + + expect(docB.getMap("config").get("key1")).toBe("value1"); + expect(docB.getMap("config").get("key2")).toBe(42); + }); + + it("syncs Array types", async () => { + const room = `sync-array-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + docA.getArray("items").push(["item1", "item2", "item3"]); + + await new Promise((r) => setTimeout(r, 300)); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + + await new Promise((r) => setTimeout(r, 500)); + + expect(docB.getArray("items").toJSON()).toEqual([ + "item1", + "item2", + "item3" + ]); + }); +}); + +describe("Integration — awareness", () => { + it("shares awareness state between providers", async () => { + const room = `awareness-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + + await new Promise((r) => setTimeout(r, 300)); + + // Set awareness on A + providerA.awareness.setLocalState({ + user: { name: "Alice", color: "#ff0000" } + }); + + // Wait for the actual awareness state to propagate (not just the + // default {} from the Awareness constructor) + await new Promise((resolve) => { + const check = () => { + const stateA = providerB.awareness.getStates().get(docA.clientID) as + | { user?: { name: string } } + | undefined; + if (stateA?.user) { + resolve(); + } else { + setTimeout(check, 100); + } + }; + check(); + }); + + const stateA = providerB.awareness.getStates().get(docA.clientID) as { + user: { name: string; color: string }; + }; + expect(stateA.user.name).toBe("Alice"); + expect(stateA.user.color).toBe("#ff0000"); + }); + + it("cleans up awareness when a provider disconnects", async () => { + const room = `awareness-cleanup-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + + await new Promise((r) => setTimeout(r, 300)); + + providerA.awareness.setLocalState({ user: { name: "Alice" } }); + await new Promise((r) => setTimeout(r, 500)); + + // B should see A's awareness + expect(providerB.awareness.getStates().get(docA.clientID)).toBeDefined(); + + // Disconnect A + providerA.disconnect(); + await new Promise((r) => setTimeout(r, 1000)); + + // B should no longer see A's awareness + expect(providerB.awareness.getStates().get(docA.clientID)).toBeUndefined(); + }); +}); + +describe("Integration — custom messages", () => { + it("sends and receives custom ping/pong messages", async () => { + const room = `custom-ping-${Date.now()}`; + const provider = createProvider(room, { party: "y-custom-message" }); + await waitForSync(provider); + + // Listen for the pong before sending ping + const pongPromise = waitForCustomMessage(provider); + + // Send ping + provider.sendMessage(JSON.stringify({ action: "ping" })); + + const pong = await pongPromise; + expect(JSON.parse(pong)).toEqual({ action: "pong" }); + }); + + it("broadcasts custom messages to other providers", async () => { + const room = `custom-broadcast-${Date.now()}`; + + const providerA = createProvider(room, { party: "y-custom-message" }); + await waitForSync(providerA); + + const providerB = createProvider(room, { party: "y-custom-message" }); + await waitForSync(providerB); + + await new Promise((r) => setTimeout(r, 300)); + + // Listen on B for the broadcast + const broadcastPromise = waitForCustomMessage(providerB); + + // A sends a broadcast request + providerA.sendMessage(JSON.stringify({ action: "broadcast" })); + + const msg = await broadcastPromise; + expect(JSON.parse(msg)).toEqual({ action: "broadcasted" }); + }); +}); + +describe("Integration — persistence", () => { + it("persists document state across sessions", async () => { + const room = `persist-e2e-${Date.now()}`; + + // Session 1: write data + { + const doc = new Y.Doc(); + const provider = createProvider(room, { + doc, + party: "y-persistent" + }); + await waitForSync(provider); + + doc.getText("shared").insert(0, "persisted-data"); + + // Wait for debounced onSave to fire + await new Promise((r) => setTimeout(r, 500)); + + provider.destroy(); + // Remove from tracked list since we destroyed it manually + const idx = providers.indexOf(provider); + if (idx >= 0) providers.splice(idx, 1); + } + + await new Promise((r) => setTimeout(r, 500)); + + // Session 2: reconnect and verify + { + const doc = new Y.Doc(); + const provider = createProvider(room, { + doc, + party: "y-persistent" + }); + await waitForSync(provider); + + // Wait for server to send the persisted state + await new Promise((r) => setTimeout(r, 1000)); + + expect(doc.getText("shared").toString()).toBe("persisted-data"); + } + }); +}); + +describe("Integration — read-only mode", () => { + it("prevents writes from read-only connections", async () => { + const room = `readonly-e2e-${Date.now()}`; + + // Connect a read-only provider and try to write + const docRO = new Y.Doc(); + const providerRO = createProvider(room, { + doc: docRO, + party: "y-read-only" + }); + await waitForSync(providerRO); + + docRO.getText("shared").insert(0, "should-not-appear"); + await new Promise((r) => setTimeout(r, 500)); + + // Connect a second read-only provider and check the server doc is empty + const docRO2 = new Y.Doc(); + const providerRO2 = createProvider(room, { + doc: docRO2, + party: "y-read-only" + }); + await waitForSync(providerRO2); + await new Promise((r) => setTimeout(r, 500)); + + expect(docRO2.getText("shared").toString()).toBe(""); + }); +}); + +describe("Integration — onLoad returns YDoc", () => { + it("seeds document from returned YDoc", async () => { + const room = `onload-doc-e2e-${Date.now()}`; + + const doc = new Y.Doc(); + const provider = createProvider(room, { + doc, + party: "y-on-load-returns-doc" + }); + await waitForSync(provider); + + await new Promise((r) => setTimeout(r, 500)); + + expect(doc.getText("shared").toString()).toBe("seeded-content"); + }); +}); + +describe("Integration — reconnection", () => { + it("reconnects and re-syncs after disconnect", async () => { + const room = `reconnect-${Date.now()}`; + + const doc = new Y.Doc(); + const provider = createProvider(room, { doc }); + await waitForSync(provider); + + doc.getText("shared").insert(0, "before-disconnect"); + await new Promise((r) => setTimeout(r, 300)); + + // Disconnect — with hibernate: true and the ws library, the close + // handshake may not complete (no close event), so we force-terminate + // the underlying WebSocket and wait for the provider to notice. + const ws = provider.ws as unknown as { + terminate?: () => void; + CLOSED?: number; + }; + provider.disconnect(); + if (ws && typeof ws.terminate === "function") { + ws.terminate(); + } + // Wait for provider to process the disconnect + await new Promise((resolve) => { + const check = () => { + if (!provider.wsconnected && provider.ws === null) { + resolve(); + } else { + setTimeout(check, 50); + } + }; + setTimeout(check, 100); + }); + + // Reconnect + provider.connect(); + await waitForConnection(provider); + await waitForSync(provider); + + // The text should still be there (server-side doc persists in memory) + await new Promise((r) => setTimeout(r, 500)); + expect(doc.getText("shared").toString()).toBe("before-disconnect"); + }); +}); + +describe("Integration — multiple rooms", () => { + it("isolates documents across different rooms", async () => { + const roomA = `multi-room-a-${Date.now()}`; + const roomB = `multi-room-b-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(roomA, { doc: docA }); + await waitForSync(providerA); + + const docB = new Y.Doc(); + const providerB = createProvider(roomB, { doc: docB }); + await waitForSync(providerB); + + // Write different content to each room + docA.getText("shared").insert(0, "room-A-content"); + docB.getText("shared").insert(0, "room-B-content"); + + await new Promise((r) => setTimeout(r, 500)); + + // Verify isolation: each room has its own content + expect(docA.getText("shared").toString()).toBe("room-A-content"); + expect(docB.getText("shared").toString()).toBe("room-B-content"); + + // Connect a new provider to room A — should only see A's content + const docA2 = new Y.Doc(); + const providerA2 = createProvider(roomA, { doc: docA2 }); + await waitForSync(providerA2); + await new Promise((r) => setTimeout(r, 500)); + + expect(docA2.getText("shared").toString()).toBe("room-A-content"); + }); +}); + +describe("Integration — large documents", () => { + it("syncs a document with many operations", async () => { + const room = `large-doc-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + // Perform many operations + const text = docA.getText("shared"); + for (let i = 0; i < 100; i++) { + text.insert(text.length, `line-${i}\n`); + } + + await new Promise((r) => setTimeout(r, 1000)); + + // Connect B and verify it gets all the content + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + await new Promise((r) => setTimeout(r, 1000)); + + const contentB = docB.getText("shared").toString(); + expect(contentB).toContain("line-0"); + expect(contentB).toContain("line-99"); + // All 100 lines should be present + expect(contentB.split("\n").filter(Boolean)).toHaveLength(100); + }); +}); diff --git a/packages/y-partyserver/src/tests/server-harness.ts b/packages/y-partyserver/src/tests/server-harness.ts new file mode 100644 index 00000000..af508db5 --- /dev/null +++ b/packages/y-partyserver/src/tests/server-harness.ts @@ -0,0 +1,104 @@ +import { spawn } from "node:child_process"; +import type { ChildProcess } from "node:child_process"; +import fs from "node:fs"; +import path from "node:path"; + +const TEST_DIR = path.dirname(new URL(import.meta.url).pathname); + +export class WranglerServer { + private process: ChildProcess | null = null; + private port: number; + private persistDir: string; + + constructor(port: number) { + this.port = port; + this.persistDir = path.join(TEST_DIR, `.wrangler-persist-${port}`); + } + + async start(): Promise { + this.process = spawn( + "npx", + [ + "wrangler", + "dev", + "--config", + path.join(TEST_DIR, "integration-wrangler.jsonc"), + "--port", + String(this.port), + "--persist-to", + this.persistDir, + "--no-show-interactive-dev-session" + ], + { + cwd: TEST_DIR, + stdio: ["pipe", "pipe", "pipe"], + env: { + ...process.env, + BROWSER: "none" + } + } + ); + + this.process.stdout?.on("data", (data: Buffer) => { + if (process.env.DEBUG) { + process.stderr.write(`[wrangler:${this.port}] ${data.toString()}`); + } + }); + this.process.stderr?.on("data", (data: Buffer) => { + if (process.env.DEBUG) { + process.stderr.write(`[wrangler:${this.port}:err] ${data.toString()}`); + } + }); + + await this.waitForReady(); + } + + async stop(): Promise { + if (!this.process) return; + + const proc = this.process; + this.process = null; + + proc.kill("SIGTERM"); + await new Promise((resolve) => { + const timeout = setTimeout(() => { + proc.kill("SIGKILL"); + resolve(); + }, 3000); + proc.on("exit", () => { + clearTimeout(timeout); + resolve(); + }); + }); + + // Brief pause to let the OS release the port + await new Promise((r) => setTimeout(r, 500)); + } + + async restart(): Promise { + await this.stop(); + await this.start(); + } + + cleanup(): void { + if (fs.existsSync(this.persistDir)) { + fs.rmSync(this.persistDir, { recursive: true, force: true }); + } + } + + private async waitForReady(timeoutMs = 30000): Promise { + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + try { + const res = await fetch(`http://localhost:${this.port}/`); + if (res.ok || res.status === 404) return; + } catch { + // not ready yet + } + await new Promise((r) => setTimeout(r, 200)); + } + throw new Error( + `Server on port ${this.port} did not start within ${timeoutMs}ms` + ); + } +} diff --git a/packages/y-partyserver/src/tests/tsconfig.json b/packages/y-partyserver/src/tests/tsconfig.json new file mode 100644 index 00000000..9c10c993 --- /dev/null +++ b/packages/y-partyserver/src/tests/tsconfig.json @@ -0,0 +1,11 @@ +{ + "extends": "../../../../tsconfig.base.json", + "compilerOptions": { + "types": [ + "@cloudflare/workers-types/experimental", + "@cloudflare/vitest-pool-workers" + ], + "lib": ["ESNext", "DOM"] + }, + "include": ["./**/*.ts"] +} diff --git a/packages/y-partyserver/src/tests/vitest.config.ts b/packages/y-partyserver/src/tests/vitest.config.ts new file mode 100644 index 00000000..7e04b31c --- /dev/null +++ b/packages/y-partyserver/src/tests/vitest.config.ts @@ -0,0 +1,15 @@ +import { defineWorkersConfig } from "@cloudflare/vitest-pool-workers/config"; + +export default defineWorkersConfig({ + test: { + include: ["index.test.ts"], + poolOptions: { + workers: { + isolatedStorage: false, + wrangler: { + configPath: "./wrangler.jsonc" + } + } + } + } +}); diff --git a/packages/y-partyserver/src/tests/vitest.hibernation.config.ts b/packages/y-partyserver/src/tests/vitest.hibernation.config.ts new file mode 100644 index 00000000..4413d50e --- /dev/null +++ b/packages/y-partyserver/src/tests/vitest.hibernation.config.ts @@ -0,0 +1,9 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + include: ["hibernation.test.ts"], + testTimeout: 120000, + hookTimeout: 60000 + } +}); diff --git a/packages/y-partyserver/src/tests/vitest.integration.config.ts b/packages/y-partyserver/src/tests/vitest.integration.config.ts new file mode 100644 index 00000000..94424692 --- /dev/null +++ b/packages/y-partyserver/src/tests/vitest.integration.config.ts @@ -0,0 +1,9 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + include: ["integration.test.ts"], + testTimeout: 30000, + globalSetup: ["./global-setup.ts"] + } +}); diff --git a/packages/y-partyserver/src/tests/worker.ts b/packages/y-partyserver/src/tests/worker.ts new file mode 100644 index 00000000..5c11b53d --- /dev/null +++ b/packages/y-partyserver/src/tests/worker.ts @@ -0,0 +1,201 @@ +import { routePartykitRequest } from "partyserver"; +import { YServer } from "../server/index"; +import * as Y from "yjs"; + +import type { Connection, ConnectionContext } from "partyserver"; +import type { CallbackOptions } from "../server/index"; + +// --------------------------------------------------------------------------- +// Env type for all test DOs +// --------------------------------------------------------------------------- +export type Env = { + YBasic: DurableObjectNamespace; + YPersistent: DurableObjectNamespace; + YReadOnly: DurableObjectNamespace; + YCustomMessage: DurableObjectNamespace; + YOnLoadReturnsDoc: DurableObjectNamespace; + YCallbackOptions: DurableObjectNamespace; + YHibernateTracker: DurableObjectNamespace; +}; + +// --------------------------------------------------------------------------- +// 1. Basic YServer — no persistence, no customization +// --------------------------------------------------------------------------- +export class YBasic extends YServer { + static options = { + hibernate: true + }; +} + +// --------------------------------------------------------------------------- +// 2. Persistent YServer — stores doc in SQLite, exercises onLoad/onSave +// --------------------------------------------------------------------------- +export class YPersistent extends YServer { + static options = { + hibernate: true + }; + + static callbackOptions: CallbackOptions = { + debounceWait: 50, + debounceMaxWait: 100 + }; + + async onStart() { + this.ctx.storage.sql.exec( + "CREATE TABLE IF NOT EXISTS documents (id TEXT PRIMARY KEY, content BLOB)" + ); + return super.onStart(); + } + + async onLoad() { + const rows = [ + ...this.ctx.storage.sql.exec( + "SELECT content FROM documents WHERE id = ? LIMIT 1", + this.name + ) + ]; + if (rows.length > 0 && rows[0].content) { + Y.applyUpdate( + this.document, + new Uint8Array(rows[0].content as ArrayBuffer) + ); + } + return; + } + + async onSave() { + const update = Y.encodeStateAsUpdate(this.document); + this.ctx.storage.sql.exec( + "INSERT OR REPLACE INTO documents (id, content) VALUES (?, ?)", + this.name, + update + ); + } +} + +// --------------------------------------------------------------------------- +// 3. Read-only YServer — all connections are read-only +// --------------------------------------------------------------------------- +export class YReadOnly extends YServer { + static options = { + hibernate: true + }; + + isReadOnly(_connection: Connection): boolean { + return true; + } + + onConnect(conn: Connection, _ctx: ConnectionContext): void { + super.onConnect(conn, _ctx); + // Also send a marker so the test knows the connection was accepted + conn.send("connected:readonly"); + } +} + +// --------------------------------------------------------------------------- +// 4. Custom message YServer — exercises onCustomMessage, sendCustomMessage, +// broadcastCustomMessage +// --------------------------------------------------------------------------- +export class YCustomMessage extends YServer { + static options = { + hibernate: true + }; + + onCustomMessage(connection: Connection, message: string): void { + try { + const data = JSON.parse(message) as { action: string }; + if (data.action === "ping") { + this.sendCustomMessage(connection, JSON.stringify({ action: "pong" })); + } else if (data.action === "broadcast") { + this.broadcastCustomMessage( + JSON.stringify({ action: "broadcasted" }), + connection + ); + } else if (data.action === "echo") { + this.sendCustomMessage(connection, message); + } + } catch { + this.sendCustomMessage( + connection, + JSON.stringify({ error: "parse-error" }) + ); + } + } +} + +// --------------------------------------------------------------------------- +// 5. YServer where onLoad returns a YDoc (tests the return-YDoc code path) +// --------------------------------------------------------------------------- +export class YOnLoadReturnsDoc extends YServer { + static options = { + hibernate: true + }; + + async onLoad(): Promise { + // Create a fresh doc with some pre-seeded content + const seedDoc = new Y.Doc(); + seedDoc.getText("shared").insert(0, "seeded-content"); + return seedDoc; + } +} + +// --------------------------------------------------------------------------- +// 6. YServer with custom callback options +// --------------------------------------------------------------------------- +export class YCallbackOptions extends YServer { + static options = { + hibernate: true + }; + + static callbackOptions: CallbackOptions = { + debounceWait: 50, + debounceMaxWait: 100 + }; + + saveCount = 0; + + async onSave() { + this.saveCount++; + } + + // Expose saveCount via HTTP for testing + onRequest(): Response { + return Response.json({ saveCount: this.saveCount }); + } +} + +// --------------------------------------------------------------------------- +// 7. YServer that tracks onStart calls via storage — detects hibernation +// --------------------------------------------------------------------------- +export class YHibernateTracker extends YServer { + static options = { + hibernate: true + }; + + async onStart() { + const count = (await this.ctx.storage.get("onStartCount")) ?? 0; + await this.ctx.storage.put("onStartCount", count + 1); + return super.onStart(); + } + + async onRequest(): Promise { + const count = (await this.ctx.storage.get("onStartCount")) ?? 0; + return Response.json({ onStartCount: count }); + } +} + +// --------------------------------------------------------------------------- +// Default fetch handler — routes to the correct DO +// --------------------------------------------------------------------------- +export default { + async fetch( + request: Request, + env: Env, + _ctx: ExecutionContext + ): Promise { + return ( + (await routePartykitRequest(request, env)) || + new Response("Not Found", { status: 404 }) + ); + } +} satisfies ExportedHandler; diff --git a/packages/y-partyserver/src/tests/wrangler.jsonc b/packages/y-partyserver/src/tests/wrangler.jsonc new file mode 100644 index 00000000..e4762a5e --- /dev/null +++ b/packages/y-partyserver/src/tests/wrangler.jsonc @@ -0,0 +1,52 @@ +{ + "main": "/worker.ts", + "compatibility_date": "2026-01-28", + "compatibility_flags": [ + "nodejs_compat", + "enable_nodejs_tty_module", + "enable_nodejs_fs_module", + "enable_nodejs_http_modules", + "enable_nodejs_perf_hooks_module" + ], + "durable_objects": { + "bindings": [ + { + "name": "YBasic", + "class_name": "YBasic" + }, + { + "name": "YPersistent", + "class_name": "YPersistent" + }, + { + "name": "YReadOnly", + "class_name": "YReadOnly" + }, + { + "name": "YCustomMessage", + "class_name": "YCustomMessage" + }, + { + "name": "YOnLoadReturnsDoc", + "class_name": "YOnLoadReturnsDoc" + }, + { + "name": "YCallbackOptions", + "class_name": "YCallbackOptions" + } + ] + }, + "migrations": [ + { + "tag": "v1", + "new_sqlite_classes": [ + "YBasic", + "YPersistent", + "YReadOnly", + "YCustomMessage", + "YOnLoadReturnsDoc", + "YCallbackOptions" + ] + } + ] +} diff --git a/packages/y-partyserver/tsconfig.json b/packages/y-partyserver/tsconfig.json new file mode 100644 index 00000000..a432bf58 --- /dev/null +++ b/packages/y-partyserver/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "types": ["@cloudflare/workers-types"] + }, + "exclude": [ + "src/tests/**/*.ts", + "src/provider/**/*.ts", + "src/provider/**/*.tsx", + "scripts/**/*.ts" + ] +}