From 63946a286dfaa380e4fb5379a330fd40261e3b7e Mon Sep 17 00:00:00 2001 From: Sunil Pai Date: Mon, 23 Feb 2026 21:21:51 +0000 Subject: [PATCH 1/5] Add tests and fix provider/server issues Add a comprehensive test suite and related configs for y-partyserver (unit + integration), including global test setup that spawns wrangler dev, Vitest configs, a wrangler integration config, TS test tsconfig, and many test files (index.test.ts, integration.test.ts, worker.ts, global-setup.ts, etc.). Update package.json metadata and scripts (add test, test:integration, check:test), adjust files/peerDependencies/devDependencies ordering. Fix a bug in YProvider where a trailing slash on the host was not being stripped (assign sliced value back to host). Narrow YServer.onLoad return type to Promise to allow returning a Y.Doc from onLoad. Overall this change wires up end-to-end and unit testing infrastructure and includes small API/bug fixes to support tests. --- packages/y-partyserver/package.json | 41 +- packages/y-partyserver/src/provider/index.ts | 2 +- packages/y-partyserver/src/server/index.ts | 2 +- .../y-partyserver/src/tests/global-setup.ts | 78 ++ .../y-partyserver/src/tests/index.test.ts | 849 ++++++++++++++++++ .../src/tests/integration-wrangler.jsonc | 47 + .../src/tests/integration.test.ts | 526 +++++++++++ .../y-partyserver/src/tests/tsconfig.json | 10 + .../y-partyserver/src/tests/vitest.config.ts | 14 + .../src/tests/vitest.integration.config.ts | 9 + packages/y-partyserver/src/tests/worker.ts | 180 ++++ .../y-partyserver/src/tests/wrangler.jsonc | 52 ++ 12 files changed, 1789 insertions(+), 21 deletions(-) create mode 100644 packages/y-partyserver/src/tests/global-setup.ts create mode 100644 packages/y-partyserver/src/tests/index.test.ts create mode 100644 packages/y-partyserver/src/tests/integration-wrangler.jsonc create mode 100644 packages/y-partyserver/src/tests/integration.test.ts create mode 100644 packages/y-partyserver/src/tests/tsconfig.json create mode 100644 packages/y-partyserver/src/tests/vitest.config.ts create mode 100644 packages/y-partyserver/src/tests/vitest.integration.config.ts create mode 100644 packages/y-partyserver/src/tests/worker.ts create mode 100644 packages/y-partyserver/src/tests/wrangler.jsonc diff --git a/packages/y-partyserver/package.json b/packages/y-partyserver/package.json index a3707f1a..ea67bebe 100644 --- a/packages/y-partyserver/package.json +++ b/packages/y-partyserver/package.json @@ -1,11 +1,23 @@ { "name": "y-partyserver", "version": "2.0.0", + "description": "", + "keywords": [ + "collaboration", + "text-editors", + "yjs" + ], + "homepage": "https://github.com/cloudflare/partykit/tree/main/packages/y-partyserver", + "license": "ISC", + "author": "Sunil Pai ", "repository": { "type": "git", "url": "git://github.com/cloudflare/partykit.git" }, - "homepage": "https://github.com/cloudflare/partykit/tree/main/packages/y-partyserver", + "files": [ + "dist", + "README.md" + ], "type": "module", "exports": { ".": { @@ -25,36 +37,27 @@ } }, "scripts": { - "build": "tsx scripts/build.ts" + "build": "tsx scripts/build.ts", + "check:test": "vitest -r src/tests --watch false", + "test": "vitest -r src/tests", + "test:integration": "vitest -r src/tests --config vitest.integration.config.ts --watch false" }, - "files": [ - "dist", - "README.md" - ], - "keywords": [ - "yjs", - "collaboration", - "text-editors" - ], - "author": "Sunil Pai ", - "license": "ISC", - "description": "", "dependencies": { "lib0": "^0.2.115", "lodash.debounce": "^4.0.8", "nanoid": "^5.1.6", "y-protocols": "^1.0.7" }, - "peerDependencies": { - "@cloudflare/workers-types": "^4.20240729.0", - "partyserver": ">=0.2.0 <1.0.0", - "yjs": "^13.6.14" - }, "devDependencies": { "@cloudflare/workers-types": "^4.20251218.0", "@types/lodash.debounce": "^4.0.9", "partyserver": ">=0.2.0 <1.0.0", "ws": "^8.18.3", "yjs": "^13.6.28" + }, + "peerDependencies": { + "@cloudflare/workers-types": "^4.20240729.0", + "partyserver": ">=0.2.0 <1.0.0", + "yjs": "^13.6.14" } } diff --git a/packages/y-partyserver/src/provider/index.ts b/packages/y-partyserver/src/provider/index.ts index 735f8699..75b8755d 100644 --- a/packages/y-partyserver/src/provider/index.ts +++ b/packages/y-partyserver/src/provider/index.ts @@ -567,7 +567,7 @@ export default class YProvider extends WebsocketProvider { // strip trailing slash from host if any if (host.endsWith("/")) { - host.slice(0, -1); + host = host.slice(0, -1); } const serverUrl = `${ diff --git a/packages/y-partyserver/src/server/index.ts b/packages/y-partyserver/src/server/index.ts index 38e41f2b..ddb04327 100644 --- a/packages/y-partyserver/src/server/index.ts +++ b/packages/y-partyserver/src/server/index.ts @@ -182,7 +182,7 @@ export class YServer< #ParentClass: typeof YServer = Object.getPrototypeOf(this).constructor; readonly document: WSSharedDoc = new WSSharedDoc(); - async onLoad(): Promise { + async onLoad(): Promise { // to be implemented by the user return; } diff --git a/packages/y-partyserver/src/tests/global-setup.ts b/packages/y-partyserver/src/tests/global-setup.ts new file mode 100644 index 00000000..90373c05 --- /dev/null +++ b/packages/y-partyserver/src/tests/global-setup.ts @@ -0,0 +1,78 @@ +import { spawn } from "node:child_process"; +import type { ChildProcess } from "node:child_process"; +import path from "node:path"; + +const PORT = 8799; +let wranglerProcess: ChildProcess | null = null; + +async function waitForServer(url: string, timeoutMs = 30000): Promise { + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + try { + const res = await fetch(url); + if (res.ok || res.status === 404) { + return; + } + } catch { + // Server not ready yet + } + await new Promise((r) => setTimeout(r, 200)); + } + throw new Error(`Server at ${url} did not start within ${timeoutMs}ms`); +} + +export async function setup() { + const testDir = path.dirname(new URL(import.meta.url).pathname); + + // Start wrangler dev + wranglerProcess = spawn( + "npx", + [ + "wrangler", + "dev", + "--config", + path.join(testDir, "integration-wrangler.jsonc"), + "--port", + String(PORT), + "--no-show-interactive-dev-session" + ], + { + cwd: testDir, + stdio: ["pipe", "pipe", "pipe"], + env: { + ...process.env, + // Suppress interactive prompts + BROWSER: "none" + } + } + ); + + // Log wrangler output for debugging + wranglerProcess.stdout?.on("data", (data: Buffer) => { + const msg = data.toString(); + if (process.env.DEBUG) { + process.stderr.write(`[wrangler] ${msg}`); + } + }); + wranglerProcess.stderr?.on("data", (data: Buffer) => { + const msg = data.toString(); + if (process.env.DEBUG) { + process.stderr.write(`[wrangler:err] ${msg}`); + } + }); + + // Wait for the server to be ready + await waitForServer(`http://localhost:${PORT}/`); +} + +export async function teardown() { + if (wranglerProcess) { + wranglerProcess.kill("SIGTERM"); + // Give it a moment to shut down gracefully + await new Promise((r) => setTimeout(r, 500)); + if (wranglerProcess.exitCode === null) { + wranglerProcess.kill("SIGKILL"); + } + wranglerProcess = null; + } +} diff --git a/packages/y-partyserver/src/tests/index.test.ts b/packages/y-partyserver/src/tests/index.test.ts new file mode 100644 index 00000000..9ad40173 --- /dev/null +++ b/packages/y-partyserver/src/tests/index.test.ts @@ -0,0 +1,849 @@ +import { createExecutionContext, env } from "cloudflare:test"; +import * as encoding from "lib0/encoding"; +import * as decoding from "lib0/decoding"; +import * as syncProtocol from "y-protocols/sync"; +import * as awarenessProtocol from "y-protocols/awareness"; +import * as Y from "yjs"; +import { describe, expect, it } from "vitest"; + +import worker from "./worker"; + +import type { Env } from "./worker"; + +declare module "cloudflare:test" { + interface ProvidedEnv extends Env {} +} + +// --------------------------------------------------------------------------- +// Yjs protocol constants (must match server) +// --------------------------------------------------------------------------- +const messageSync = 0; +const messageAwareness = 1; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Create a WebSocket upgrade request for the given DO path */ +function wsRequest(path: string): Request { + return new Request(`http://example.com/parties/${path}`, { + headers: { Upgrade: "websocket" } + }); +} + +/** Create a regular HTTP request for the given DO path */ +function httpRequest(path: string): Request { + return new Request(`http://example.com/parties/${path}`); +} + +/** Accept a WebSocket from a fetch response */ +function acceptWs(response: Response): WebSocket { + const ws = response.webSocket!; + ws.accept(); + return ws; +} + +/** Wait for the next binary message on a WebSocket */ +function nextBinaryMessage(ws: WebSocket): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout( + () => reject(new Error("Timed out waiting for binary message")), + 5000 + ); + const handler = (event: MessageEvent) => { + if (event.data instanceof ArrayBuffer) { + clearTimeout(timeout); + ws.removeEventListener("message", handler); + resolve(event.data); + } + // skip string messages, keep listening + }; + ws.addEventListener("message", handler); + }); +} + +/** Wait for the next string message on a WebSocket */ +function nextStringMessage(ws: WebSocket): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout( + () => reject(new Error("Timed out waiting for string message")), + 5000 + ); + const handler = (event: MessageEvent) => { + if (typeof event.data === "string") { + clearTimeout(timeout); + ws.removeEventListener("message", handler); + resolve(event.data); + } + // skip binary messages, keep listening + }; + ws.addEventListener("message", handler); + }); +} + +/** Collect all messages for a short period */ +function collectMessages( + ws: WebSocket, + durationMs: number +): Promise> { + return new Promise((resolve) => { + const messages: Array = []; + const handler = (event: MessageEvent) => { + messages.push(event.data as string | ArrayBuffer); + }; + ws.addEventListener("message", handler); + setTimeout(() => { + ws.removeEventListener("message", handler); + resolve(messages); + }, durationMs); + }); +} + +/** + * Perform the Yjs sync handshake on a WebSocket. + * Reads the server's sync step 1, responds with sync step 2 + our sync step 1, + * then reads the server's sync step 2. + */ +async function performSync(ws: WebSocket, doc: Y.Doc): Promise { + // The server sends sync step 1 on connect — read it + const msg1 = await nextBinaryMessage(ws); + const decoder1 = decoding.createDecoder(new Uint8Array(msg1)); + const msgType1 = decoding.readVarUint(decoder1); + expect(msgType1).toBe(messageSync); + + // Process server's sync step 1 and generate our response + const encoder1 = encoding.createEncoder(); + encoding.writeVarUint(encoder1, messageSync); + syncProtocol.readSyncMessage(decoder1, encoder1, doc, null); + + // Send our sync step 2 (response to server's step 1) + if (encoding.length(encoder1) > 1) { + ws.send(encoding.toUint8Array(encoder1)); + } + + // Also send our sync step 1 + const encoder2 = encoding.createEncoder(); + encoding.writeVarUint(encoder2, messageSync); + syncProtocol.writeSyncStep1(encoder2, doc); + ws.send(encoding.toUint8Array(encoder2)); +} + +/** + * Send a Yjs update to the server via the WebSocket + */ +function sendUpdate(ws: WebSocket, update: Uint8Array): void { + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageSync); + syncProtocol.writeUpdate(encoder, update); + ws.send(encoding.toUint8Array(encoder)); +} + +/** + * Apply any incoming binary Yjs messages to a local doc. + * Returns the number of messages applied. + */ +function applyIncomingMessages( + messages: Array, + doc: Y.Doc +): number { + let applied = 0; + for (const msg of messages) { + if (!(msg instanceof ArrayBuffer)) continue; + const decoder = decoding.createDecoder(new Uint8Array(msg)); + const msgType = decoding.readVarUint(decoder); + if (msgType === messageSync) { + const responseEncoder = encoding.createEncoder(); + encoding.writeVarUint(responseEncoder, messageSync); + syncProtocol.readSyncMessage(decoder, responseEncoder, doc, null); + applied++; + } + } + return applied; +} + +// =========================================================================== +// Tests +// =========================================================================== + +describe("YServer — basic sync", () => { + it("accepts a WebSocket connection and sends sync step 1", async () => { + const ctx = createExecutionContext(); + const response = await worker.fetch(wsRequest("y-basic/room1"), env, ctx); + expect(response.status).toBe(101); + const ws = acceptWs(response); + + // Server should send sync step 1 immediately + const msg = await nextBinaryMessage(ws); + const decoder = decoding.createDecoder(new Uint8Array(msg)); + const msgType = decoding.readVarUint(decoder); + expect(msgType).toBe(messageSync); + + // The inner message should be sync step 1 + const syncMsgType = decoding.readVarUint(decoder); + expect(syncMsgType).toBe(syncProtocol.messageYjsSyncStep1); + + ws.close(); + }); + + it("syncs a document between two clients", async () => { + const ctx = createExecutionContext(); + const roomName = "sync-two-clients"; + + // --- Client A connects and inserts text --- + const resA = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsA = acceptWs(resA); + const docA = new Y.Doc(); + + await performSync(wsA, docA); + + // Insert text on client A + docA.getText("shared").insert(0, "hello from A"); + + // docA will have generated an update — send it to the server + const updateA = Y.encodeStateAsUpdate(docA); + sendUpdate(wsA, updateA); + + // Give the server a moment to process + await new Promise((r) => setTimeout(r, 100)); + + // --- Client B connects and should receive A's state --- + const resB = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsB = acceptWs(resB); + const docB = new Y.Doc(); + + await performSync(wsB, docB); + + // Collect messages for a bit to get sync step 2 from server + const messagesB = await collectMessages(wsB, 200); + applyIncomingMessages(messagesB, docB); + + expect(docB.getText("shared").toString()).toBe("hello from A"); + + wsA.close(); + wsB.close(); + }); + + it("broadcasts updates from one client to another", async () => { + const ctx = createExecutionContext(); + const roomName = "broadcast-test"; + + // Connect client A + const resA = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsA = acceptWs(resA); + const docA = new Y.Doc(); + await performSync(wsA, docA); + + // Connect client B + const resB = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsB = acceptWs(resB); + const docB = new Y.Doc(); + await performSync(wsB, docB); + + // Drain any initial sync messages on B + await collectMessages(wsB, 100); + + // Client A inserts text — capture the incremental update + const updatePromise = new Promise((resolve) => { + docA.on("update", (update: Uint8Array) => { + resolve(update); + }); + }); + docA.getText("shared").insert(0, "broadcast!"); + const incrementalUpdate = await updatePromise; + + // Send just the incremental update to the server + sendUpdate(wsA, incrementalUpdate); + + // Client B should receive the broadcast + const messagesB = await collectMessages(wsB, 300); + applyIncomingMessages(messagesB, docB); + + expect(docB.getText("shared").toString()).toBe("broadcast!"); + + wsA.close(); + wsB.close(); + }); +}); + +describe("YServer — awareness", () => { + it("broadcasts awareness updates between clients", async () => { + const ctx = createExecutionContext(); + const roomName = "awareness-test"; + + // Connect client A + const resA = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsA = acceptWs(resA); + const docA = new Y.Doc(); + const awarenessA = new awarenessProtocol.Awareness(docA); + + await performSync(wsA, docA); + + // Connect client B + const resB = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsB = acceptWs(resB); + const docB = new Y.Doc(); + const awarenessB = new awarenessProtocol.Awareness(docB); + + await performSync(wsB, docB); + // Drain initial messages + await collectMessages(wsB, 100); + + // Set awareness state on client A + awarenessA.setLocalState({ user: { name: "Alice" } }); + + // Encode and send awareness update + const awarenessEncoder = encoding.createEncoder(); + encoding.writeVarUint(awarenessEncoder, messageAwareness); + encoding.writeVarUint8Array( + awarenessEncoder, + awarenessProtocol.encodeAwarenessUpdate(awarenessA, [docA.clientID]) + ); + wsA.send(encoding.toUint8Array(awarenessEncoder)); + + // Client B should receive the awareness update + const messagesB = await collectMessages(wsB, 300); + let awarenessReceived = false; + for (const msg of messagesB) { + if (!(msg instanceof ArrayBuffer)) continue; + const decoder = decoding.createDecoder(new Uint8Array(msg)); + const msgType = decoding.readVarUint(decoder); + if (msgType === messageAwareness) { + awarenessProtocol.applyAwarenessUpdate( + awarenessB, + decoding.readVarUint8Array(decoder), + null + ); + awarenessReceived = true; + } + } + + expect(awarenessReceived).toBe(true); + const stateA = awarenessB.getStates().get(docA.clientID); + expect(stateA).toBeDefined(); + expect((stateA as { user: { name: string } }).user.name).toBe("Alice"); + + wsA.close(); + wsB.close(); + }); +}); + +describe("YServer — persistence (onLoad / onSave)", () => { + it("persists document state via onSave and restores via onLoad", async () => { + const ctx = createExecutionContext(); + const roomName = "persist-test"; + + // --- Session 1: Connect, write data, then disconnect --- + { + const res = await worker.fetch( + wsRequest(`y-persistent/${roomName}`), + env, + ctx + ); + const ws = acceptWs(res); + const doc = new Y.Doc(); + await performSync(ws, doc); + + // Insert text + const updatePromise = new Promise((resolve) => { + doc.on("update", (update: Uint8Array) => resolve(update)); + }); + doc.getText("shared").insert(0, "persisted!"); + const update = await updatePromise; + sendUpdate(ws, update); + + // Wait for debounced onSave to fire (debounceWait: 50, maxWait: 100) + await new Promise((r) => setTimeout(r, 250)); + + ws.close(); + } + + // Small gap between sessions + await new Promise((r) => setTimeout(r, 100)); + + // --- Session 2: Reconnect and verify state was loaded --- + { + const res = await worker.fetch( + wsRequest(`y-persistent/${roomName}`), + env, + ctx + ); + const ws = acceptWs(res); + const doc = new Y.Doc(); + await performSync(ws, doc); + + // Collect sync step 2 from server which should contain the persisted state + const messages = await collectMessages(ws, 300); + applyIncomingMessages(messages, doc); + + expect(doc.getText("shared").toString()).toBe("persisted!"); + ws.close(); + } + }); +}); + +describe("YServer — read-only mode", () => { + it("accepts connections but rejects write updates", async () => { + const ctx = createExecutionContext(); + const roomName = "readonly-test"; + + // Connect to the read-only server + const res = await worker.fetch( + wsRequest(`y-read-only/${roomName}`), + env, + ctx + ); + const ws = acceptWs(res); + const doc = new Y.Doc(); + + // Should receive the "connected:readonly" string message + const stringMsg = nextStringMessage(ws); + + await performSync(ws, doc); + + expect(await stringMsg).toBe("connected:readonly"); + + // Try to send an update — it should be silently ignored + const updatePromise = new Promise((resolve) => { + doc.on("update", (update: Uint8Array) => resolve(update)); + }); + doc.getText("shared").insert(0, "should-be-rejected"); + const update = await updatePromise; + sendUpdate(ws, update); + + // Wait for any potential broadcast + await new Promise((r) => setTimeout(r, 200)); + + // Connect a second client to check the server document is still empty + const res2 = await worker.fetch( + wsRequest(`y-read-only/${roomName}`), + env, + ctx + ); + const ws2 = acceptWs(res2); + const doc2 = new Y.Doc(); + await performSync(ws2, doc2); + const messages2 = await collectMessages(ws2, 200); + applyIncomingMessages(messages2, doc2); + + // The server doc should be empty since the write was rejected + expect(doc2.getText("shared").toString()).toBe(""); + + ws.close(); + ws2.close(); + }); +}); + +describe("YServer — custom messages", () => { + it("handles ping/pong custom messages", async () => { + const ctx = createExecutionContext(); + const roomName = "custom-msg-test"; + + const res = await worker.fetch( + wsRequest(`y-custom-message/${roomName}`), + env, + ctx + ); + const ws = acceptWs(res); + + // Drain initial sync messages + await collectMessages(ws, 100); + + // Send a custom ping message using the __YPS: prefix + ws.send(`__YPS:${JSON.stringify({ action: "ping" })}`); + + // Should receive a pong back + const pong = await nextStringMessage(ws); + expect(pong.startsWith("__YPS:")).toBe(true); + const pongData = JSON.parse(pong.slice(6)); + expect(pongData).toEqual({ action: "pong" }); + + ws.close(); + }); + + it("broadcasts custom messages to other clients", async () => { + const ctx = createExecutionContext(); + const roomName = "custom-broadcast-test"; + + // Connect client A + const resA = await worker.fetch( + wsRequest(`y-custom-message/${roomName}`), + env, + ctx + ); + const wsA = acceptWs(resA); + await collectMessages(wsA, 100); + + // Connect client B + const resB = await worker.fetch( + wsRequest(`y-custom-message/${roomName}`), + env, + ctx + ); + const wsB = acceptWs(resB); + await collectMessages(wsB, 100); + + // Client A sends a broadcast request + wsA.send(`__YPS:${JSON.stringify({ action: "broadcast" })}`); + + // Client B should receive the broadcasted message + const msg = await nextStringMessage(wsB); + expect(msg.startsWith("__YPS:")).toBe(true); + const data = JSON.parse(msg.slice(6)); + expect(data).toEqual({ action: "broadcasted" }); + + // Client A should NOT receive the broadcast (excluded) + const msgsA = await collectMessages(wsA, 200); + const customMsgsA = msgsA.filter( + (m) => typeof m === "string" && m.includes("broadcasted") + ); + expect(customMsgsA).toHaveLength(0); + + wsA.close(); + wsB.close(); + }); + + it("handles non-prefixed string messages gracefully", async () => { + const ctx = createExecutionContext(); + const roomName = "custom-nopfx-test"; + + const res = await worker.fetch( + wsRequest(`y-custom-message/${roomName}`), + env, + ctx + ); + const ws = acceptWs(res); + await collectMessages(ws, 100); + + // Send a string message without __YPS: prefix — should be ignored, not crash + ws.send("hello without prefix"); + + // Give the server time to process; it should log a warning but not crash + await new Promise((r) => setTimeout(r, 100)); + + // Connection should still be alive — verify by sending a valid message + ws.send(`__YPS:${JSON.stringify({ action: "echo" })}`); + const echoMsg = await nextStringMessage(ws); + expect(echoMsg.startsWith("__YPS:")).toBe(true); + const echoData = JSON.parse(echoMsg.slice(6)); + expect(echoData).toEqual({ action: "echo" }); + + ws.close(); + }); +}); + +describe("YServer — onLoad returns a YDoc", () => { + it("seeds the document from a returned YDoc", async () => { + const ctx = createExecutionContext(); + const roomName = "onload-return-doc"; + + const res = await worker.fetch( + wsRequest(`y-on-load-returns-doc/${roomName}`), + env, + ctx + ); + const ws = acceptWs(res); + const doc = new Y.Doc(); + await performSync(ws, doc); + + // Collect messages to get the server's document state + const messages = await collectMessages(ws, 300); + applyIncomingMessages(messages, doc); + + // The server should have the seeded content from onLoad + expect(doc.getText("shared").toString()).toBe("seeded-content"); + + ws.close(); + }); +}); + +describe("YServer — onSave callback options", () => { + it("calls onSave after debounce period", async () => { + const ctx = createExecutionContext(); + const roomName = "callback-opts-test"; + + const res = await worker.fetch( + wsRequest(`y-callback-options/${roomName}`), + env, + ctx + ); + const ws = acceptWs(res); + const doc = new Y.Doc(); + await performSync(ws, doc); + + // Send an update to trigger the debounced onSave + const updatePromise = new Promise((resolve) => { + doc.on("update", (update: Uint8Array) => resolve(update)); + }); + doc.getText("shared").insert(0, "trigger-save"); + const update = await updatePromise; + sendUpdate(ws, update); + + // Wait for debounce to fire (debounceWait: 50ms, maxWait: 100ms) + await new Promise((r) => setTimeout(r, 250)); + + // Check saveCount via HTTP + const httpRes = await worker.fetch( + httpRequest(`y-callback-options/${roomName}`), + env, + ctx + ); + const data = (await httpRes.json()) as { saveCount: number }; + expect(data.saveCount).toBeGreaterThanOrEqual(1); + + ws.close(); + }); +}); + +describe("YServer — connection lifecycle", () => { + it("cleans up awareness on connection close", async () => { + const ctx = createExecutionContext(); + const roomName = "cleanup-test"; + + // Connect client A + const resA = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsA = acceptWs(resA); + const docA = new Y.Doc(); + const awarenessA = new awarenessProtocol.Awareness(docA); + await performSync(wsA, docA); + + // Set awareness state on client A + awarenessA.setLocalState({ user: { name: "Alice" } }); + const awarenessEncoder = encoding.createEncoder(); + encoding.writeVarUint(awarenessEncoder, messageAwareness); + encoding.writeVarUint8Array( + awarenessEncoder, + awarenessProtocol.encodeAwarenessUpdate(awarenessA, [docA.clientID]) + ); + wsA.send(encoding.toUint8Array(awarenessEncoder)); + await new Promise((r) => setTimeout(r, 100)); + + // Connect client B + const resB = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsB = acceptWs(resB); + const docB = new Y.Doc(); + const awarenessB = new awarenessProtocol.Awareness(docB); + await performSync(wsB, docB); + + // Client B should receive awareness with Alice + const msgsB1 = await collectMessages(wsB, 200); + for (const msg of msgsB1) { + if (!(msg instanceof ArrayBuffer)) continue; + const decoder = decoding.createDecoder(new Uint8Array(msg)); + const msgType = decoding.readVarUint(decoder); + if (msgType === messageAwareness) { + awarenessProtocol.applyAwarenessUpdate( + awarenessB, + decoding.readVarUint8Array(decoder), + null + ); + } + } + expect(awarenessB.getStates().get(docA.clientID)).toBeDefined(); + + // Now close client A + wsA.close(); + + // Client B should receive an awareness removal for A + const msgsB2 = await collectMessages(wsB, 300); + for (const msg of msgsB2) { + if (!(msg instanceof ArrayBuffer)) continue; + const decoder = decoding.createDecoder(new Uint8Array(msg)); + const msgType = decoding.readVarUint(decoder); + if (msgType === messageAwareness) { + awarenessProtocol.applyAwarenessUpdate( + awarenessB, + decoding.readVarUint8Array(decoder), + null + ); + } + } + + // A's awareness state should be removed + expect(awarenessB.getStates().get(docA.clientID)).toBeUndefined(); + + wsB.close(); + }); + + it("handles multiple concurrent connections to the same room", async () => { + const ctx = createExecutionContext(); + const roomName = "concurrent-test"; + + const clients: Array<{ ws: WebSocket; doc: Y.Doc }> = []; + + // Connect 3 clients + for (let i = 0; i < 3; i++) { + const res = await worker.fetch( + wsRequest(`y-basic/${roomName}`), + env, + ctx + ); + const ws = acceptWs(res); + const doc = new Y.Doc(); + await performSync(ws, doc); + clients.push({ ws, doc }); + } + + // Drain initial sync messages + for (const client of clients) { + await collectMessages(client.ws, 100); + } + + // Start collecting on clients 1 and 2 BEFORE sending the update + // so we don't miss the broadcast + const collectPromises = [ + collectMessages(clients[1].ws, 500), + collectMessages(clients[2].ws, 500) + ]; + + // Client 0 inserts text + const updatePromise = new Promise((resolve) => { + clients[0].doc.on("update", (update: Uint8Array) => resolve(update)); + }); + clients[0].doc.getText("shared").insert(0, "from-client-0"); + const update = await updatePromise; + sendUpdate(clients[0].ws, update); + + // Wait for collected messages + const [msgs1, msgs2] = await Promise.all(collectPromises); + applyIncomingMessages(msgs1, clients[1].doc); + applyIncomingMessages(msgs2, clients[2].doc); + + expect(clients[1].doc.getText("shared").toString()).toBe("from-client-0"); + expect(clients[2].doc.getText("shared").toString()).toBe("from-client-0"); + + for (const client of clients) { + client.ws.close(); + } + }); +}); + +describe("YServer — handleMessage binary conversion", () => { + it("handles ArrayBuffer messages correctly", async () => { + const ctx = createExecutionContext(); + const roomName = "arraybuffer-test"; + + const res = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const ws = acceptWs(res); + const doc = new Y.Doc(); + await performSync(ws, doc); + + // Send an update as a raw ArrayBuffer (not Uint8Array) + doc.getText("shared").insert(0, "arraybuffer-data"); + const update = Y.encodeStateAsUpdate(doc); + + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageSync); + syncProtocol.writeUpdate(encoder, update); + const raw = encoding.toUint8Array(encoder); + + // Send as ArrayBuffer + ws.send(raw.buffer); + + // Give the server time to process + await new Promise((r) => setTimeout(r, 100)); + + // Verify by connecting a second client + const res2 = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const ws2 = acceptWs(res2); + const doc2 = new Y.Doc(); + await performSync(ws2, doc2); + const msgs = await collectMessages(ws2, 200); + applyIncomingMessages(msgs, doc2); + + expect(doc2.getText("shared").toString()).toBe("arraybuffer-data"); + + ws.close(); + ws2.close(); + }); +}); + +describe("YProvider — URL construction", () => { + // These are unit tests for the provider's URL logic; they don't need + // a running server. We import directly and test construction. + it("strips protocol from host", async () => { + // We test YProvider construction by importing and checking the url property. + // We pass connect: false so it doesn't try to actually connect. + const { default: YProvider } = await import("../provider/index"); + const doc = new Y.Doc(); + const provider = new YProvider("https://example.com", "my-room", doc, { + connect: false, + WebSocketPolyfill: null + }); + expect(provider.url).toContain("wss://example.com"); + expect(provider.url).toContain("my-room"); + expect(provider.url).not.toContain("https://"); + provider.destroy(); + }); + + it("strips trailing slash from host (bug fix)", async () => { + const { default: YProvider } = await import("../provider/index"); + const doc = new Y.Doc(); + const provider = new YProvider("example.com/", "my-room", doc, { + connect: false, + WebSocketPolyfill: null + }); + // Should not have double slashes from un-stripped trailing slash + expect(provider.url).not.toContain("com//"); + expect(provider.url).toContain("wss://example.com/"); + provider.destroy(); + }); + + it("uses ws:// for localhost", async () => { + const { default: YProvider } = await import("../provider/index"); + const doc = new Y.Doc(); + const provider = new YProvider("localhost:8787", "room", doc, { + connect: false, + WebSocketPolyfill: null + }); + expect(provider.url).toMatch(/^ws:\/\/localhost:8787/); + provider.destroy(); + }); + + it("uses ws:// for 127.0.0.1", async () => { + const { default: YProvider } = await import("../provider/index"); + const doc = new Y.Doc(); + const provider = new YProvider("127.0.0.1:8787", "room", doc, { + connect: false, + WebSocketPolyfill: null + }); + expect(provider.url).toMatch(/^ws:\/\/127\.0\.0\.1:8787/); + provider.destroy(); + }); + + it("respects explicit protocol option", async () => { + const { default: YProvider } = await import("../provider/index"); + const doc = new Y.Doc(); + const provider = new YProvider("example.com", "room", doc, { + connect: false, + protocol: "ws", + WebSocketPolyfill: null + }); + expect(provider.url).toMatch(/^ws:\/\/example\.com/); + provider.destroy(); + }); + + it("uses custom party name in URL path", async () => { + const { default: YProvider } = await import("../provider/index"); + const doc = new Y.Doc(); + const provider = new YProvider("example.com", "room", doc, { + connect: false, + party: "my-party", + WebSocketPolyfill: null + }); + expect(provider.url).toContain("/parties/my-party/"); + provider.destroy(); + }); + + it("uses custom prefix instead of /parties/:party/", async () => { + const { default: YProvider } = await import("../provider/index"); + const doc = new Y.Doc(); + const provider = new YProvider("example.com", "room", doc, { + connect: false, + prefix: "/custom/path", + WebSocketPolyfill: null + }); + expect(provider.url).toContain("/custom/path"); + expect(provider.url).not.toContain("/parties/"); + provider.destroy(); + }); +}); diff --git a/packages/y-partyserver/src/tests/integration-wrangler.jsonc b/packages/y-partyserver/src/tests/integration-wrangler.jsonc new file mode 100644 index 00000000..52a363ba --- /dev/null +++ b/packages/y-partyserver/src/tests/integration-wrangler.jsonc @@ -0,0 +1,47 @@ +{ + "name": "y-partyserver-integration", + "main": "./worker.ts", + "compatibility_date": "2025-12-10", + "compatibility_flags": ["nodejs_compat"], + "durable_objects": { + "bindings": [ + { + "name": "YBasic", + "class_name": "YBasic" + }, + { + "name": "YPersistent", + "class_name": "YPersistent" + }, + { + "name": "YReadOnly", + "class_name": "YReadOnly" + }, + { + "name": "YCustomMessage", + "class_name": "YCustomMessage" + }, + { + "name": "YOnLoadReturnsDoc", + "class_name": "YOnLoadReturnsDoc" + }, + { + "name": "YCallbackOptions", + "class_name": "YCallbackOptions" + } + ] + }, + "migrations": [ + { + "tag": "v1", + "new_sqlite_classes": [ + "YBasic", + "YPersistent", + "YReadOnly", + "YCustomMessage", + "YOnLoadReturnsDoc", + "YCallbackOptions" + ] + } + ] +} diff --git a/packages/y-partyserver/src/tests/integration.test.ts b/packages/y-partyserver/src/tests/integration.test.ts new file mode 100644 index 00000000..52b2c4d1 --- /dev/null +++ b/packages/y-partyserver/src/tests/integration.test.ts @@ -0,0 +1,526 @@ +import WebSocket from "ws"; +import * as Y from "yjs"; +import { afterEach, describe, expect, it } from "vitest"; + +import YProvider from "../provider/index"; + +const PORT = 8799; +const HOST = `localhost:${PORT}`; + +// Track providers for cleanup +const providers: YProvider[] = []; + +function createProvider( + room: string, + options: { + party?: string; + doc?: Y.Doc; + connect?: boolean; + } = {} +): YProvider { + const doc = options.doc ?? new Y.Doc(); + const provider = new YProvider(HOST, room, doc, { + party: options.party ?? "y-basic", + connect: options.connect ?? true, + // Use ws as the WebSocket polyfill for Node.js + WebSocketPolyfill: WebSocket as unknown as typeof globalThis.WebSocket, + disableBc: true + }); + providers.push(provider); + return provider; +} + +function waitForSync(provider: YProvider): Promise { + return new Promise((resolve, reject) => { + if (provider.synced) { + resolve(); + return; + } + const timeout = setTimeout( + () => reject(new Error("Timed out waiting for sync")), + 10000 + ); + provider.on("synced", () => { + clearTimeout(timeout); + resolve(); + }); + }); +} + +function waitForConnection(provider: YProvider): Promise { + return new Promise((resolve, reject) => { + if (provider.wsconnected) { + resolve(); + return; + } + const timeout = setTimeout( + () => reject(new Error("Timed out waiting for connection")), + 10000 + ); + provider.on("status", (event: { status: string }) => { + if (event.status === "connected") { + clearTimeout(timeout); + resolve(); + } + }); + }); +} + +function waitForCustomMessage(provider: YProvider): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout( + () => reject(new Error("Timed out waiting for custom message")), + 10000 + ); + provider.on("custom-message", (message: string) => { + clearTimeout(timeout); + resolve(message); + }); + }); +} + +afterEach(() => { + // Destroy all providers created during the test + for (const p of providers) { + try { + p.destroy(); + } catch { + // ignore + } + } + providers.length = 0; +}); + +// --------------------------------------------------------------------------- +// Integration tests — real wrangler dev server +// --------------------------------------------------------------------------- + +describe("Integration — YProvider ↔ YServer sync", () => { + it("connects and syncs a document", async () => { + const room = `sync-basic-${Date.now()}`; + const provider = createProvider(room); + + await waitForSync(provider); + + expect(provider.synced).toBe(true); + expect(provider.wsconnected).toBe(true); + }); + + it("syncs text between two providers", async () => { + const room = `sync-two-${Date.now()}`; + + // Provider A connects and writes + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + docA.getText("shared").insert(0, "hello from A"); + + // Give server time to process the update + await new Promise((r) => setTimeout(r, 300)); + + // Provider B connects and should receive A's state + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + + // Wait a bit for sync step 2 to arrive + await new Promise((r) => setTimeout(r, 500)); + + expect(docB.getText("shared").toString()).toBe("hello from A"); + }); + + it("broadcasts live updates between connected providers", async () => { + const room = `sync-live-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + + // Wait for initial sync to settle + await new Promise((r) => setTimeout(r, 300)); + + // A writes, B should receive the update in real-time + const updateReceived = new Promise((resolve) => { + docB.on("update", () => { + if (docB.getText("shared").toString() === "live-update") { + resolve(); + } + }); + }); + + docA.getText("shared").insert(0, "live-update"); + + await updateReceived; + expect(docB.getText("shared").toString()).toBe("live-update"); + }); + + it("handles concurrent edits from multiple providers", async () => { + const room = `sync-concurrent-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + + await new Promise((r) => setTimeout(r, 300)); + + // Both insert concurrently + docA.getText("shared").insert(0, "A"); + docB.getText("shared").insert(0, "B"); + + // Wait for convergence + await new Promise((r) => setTimeout(r, 1000)); + + // Both docs should converge to the same content (Yjs CRDT) + const textA = docA.getText("shared").toString(); + const textB = docB.getText("shared").toString(); + expect(textA).toBe(textB); + expect(textA).toHaveLength(2); + expect(textA).toContain("A"); + expect(textA).toContain("B"); + }); + + it("syncs Map types", async () => { + const room = `sync-map-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + docA.getMap("config").set("key1", "value1"); + docA.getMap("config").set("key2", 42); + + await new Promise((r) => setTimeout(r, 300)); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + + await new Promise((r) => setTimeout(r, 500)); + + expect(docB.getMap("config").get("key1")).toBe("value1"); + expect(docB.getMap("config").get("key2")).toBe(42); + }); + + it("syncs Array types", async () => { + const room = `sync-array-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + docA.getArray("items").push(["item1", "item2", "item3"]); + + await new Promise((r) => setTimeout(r, 300)); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + + await new Promise((r) => setTimeout(r, 500)); + + expect(docB.getArray("items").toJSON()).toEqual([ + "item1", + "item2", + "item3" + ]); + }); +}); + +describe("Integration — awareness", () => { + it("shares awareness state between providers", async () => { + const room = `awareness-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + + await new Promise((r) => setTimeout(r, 300)); + + // Set awareness on A + providerA.awareness.setLocalState({ + user: { name: "Alice", color: "#ff0000" } + }); + + // Wait for awareness to propagate + await new Promise((resolve) => { + const check = () => { + const stateA = providerB.awareness.getStates().get(docA.clientID); + if (stateA) { + resolve(); + } else { + setTimeout(check, 100); + } + }; + check(); + }); + + const stateA = providerB.awareness.getStates().get(docA.clientID) as { + user: { name: string; color: string }; + }; + expect(stateA.user.name).toBe("Alice"); + expect(stateA.user.color).toBe("#ff0000"); + }); + + it("cleans up awareness when a provider disconnects", async () => { + const room = `awareness-cleanup-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + + await new Promise((r) => setTimeout(r, 300)); + + providerA.awareness.setLocalState({ user: { name: "Alice" } }); + await new Promise((r) => setTimeout(r, 500)); + + // B should see A's awareness + expect(providerB.awareness.getStates().get(docA.clientID)).toBeDefined(); + + // Disconnect A + providerA.disconnect(); + await new Promise((r) => setTimeout(r, 1000)); + + // B should no longer see A's awareness + expect(providerB.awareness.getStates().get(docA.clientID)).toBeUndefined(); + }); +}); + +describe("Integration — custom messages", () => { + it("sends and receives custom ping/pong messages", async () => { + const room = `custom-ping-${Date.now()}`; + const provider = createProvider(room, { party: "y-custom-message" }); + await waitForSync(provider); + + // Listen for the pong before sending ping + const pongPromise = waitForCustomMessage(provider); + + // Send ping + provider.sendMessage(JSON.stringify({ action: "ping" })); + + const pong = await pongPromise; + expect(JSON.parse(pong)).toEqual({ action: "pong" }); + }); + + it("broadcasts custom messages to other providers", async () => { + const room = `custom-broadcast-${Date.now()}`; + + const providerA = createProvider(room, { party: "y-custom-message" }); + await waitForSync(providerA); + + const providerB = createProvider(room, { party: "y-custom-message" }); + await waitForSync(providerB); + + await new Promise((r) => setTimeout(r, 300)); + + // Listen on B for the broadcast + const broadcastPromise = waitForCustomMessage(providerB); + + // A sends a broadcast request + providerA.sendMessage(JSON.stringify({ action: "broadcast" })); + + const msg = await broadcastPromise; + expect(JSON.parse(msg)).toEqual({ action: "broadcasted" }); + }); +}); + +describe("Integration — persistence", () => { + it("persists document state across sessions", async () => { + const room = `persist-e2e-${Date.now()}`; + + // Session 1: write data + { + const doc = new Y.Doc(); + const provider = createProvider(room, { + doc, + party: "y-persistent" + }); + await waitForSync(provider); + + doc.getText("shared").insert(0, "persisted-data"); + + // Wait for debounced onSave to fire + await new Promise((r) => setTimeout(r, 500)); + + provider.destroy(); + // Remove from tracked list since we destroyed it manually + const idx = providers.indexOf(provider); + if (idx >= 0) providers.splice(idx, 1); + } + + await new Promise((r) => setTimeout(r, 500)); + + // Session 2: reconnect and verify + { + const doc = new Y.Doc(); + const provider = createProvider(room, { + doc, + party: "y-persistent" + }); + await waitForSync(provider); + + // Wait for server to send the persisted state + await new Promise((r) => setTimeout(r, 1000)); + + expect(doc.getText("shared").toString()).toBe("persisted-data"); + } + }); +}); + +describe("Integration — read-only mode", () => { + it("prevents writes from read-only connections", async () => { + const room = `readonly-e2e-${Date.now()}`; + + // Connect a read-only provider and try to write + const docRO = new Y.Doc(); + const providerRO = createProvider(room, { + doc: docRO, + party: "y-read-only" + }); + await waitForSync(providerRO); + + docRO.getText("shared").insert(0, "should-not-appear"); + await new Promise((r) => setTimeout(r, 500)); + + // Connect a second read-only provider and check the server doc is empty + const docRO2 = new Y.Doc(); + const providerRO2 = createProvider(room, { + doc: docRO2, + party: "y-read-only" + }); + await waitForSync(providerRO2); + await new Promise((r) => setTimeout(r, 500)); + + expect(docRO2.getText("shared").toString()).toBe(""); + }); +}); + +describe("Integration — onLoad returns YDoc", () => { + it("seeds document from returned YDoc", async () => { + const room = `onload-doc-e2e-${Date.now()}`; + + const doc = new Y.Doc(); + const provider = createProvider(room, { + doc, + party: "y-on-load-returns-doc" + }); + await waitForSync(provider); + + await new Promise((r) => setTimeout(r, 500)); + + expect(doc.getText("shared").toString()).toBe("seeded-content"); + }); +}); + +describe("Integration — reconnection", () => { + it("reconnects and re-syncs after disconnect", async () => { + const room = `reconnect-${Date.now()}`; + + const doc = new Y.Doc(); + const provider = createProvider(room, { doc }); + await waitForSync(provider); + + doc.getText("shared").insert(0, "before-disconnect"); + await new Promise((r) => setTimeout(r, 300)); + + // Disconnect and wait for the status change + const disconnected = new Promise((resolve) => { + provider.on("status", (event: { status: string }) => { + if (event.status === "disconnected") { + resolve(); + } + }); + }); + provider.disconnect(); + await disconnected; + expect(provider.wsconnected).toBe(false); + + // Reconnect + provider.connect(); + await waitForConnection(provider); + await waitForSync(provider); + + // The text should still be there (server-side doc persists in memory) + await new Promise((r) => setTimeout(r, 500)); + expect(doc.getText("shared").toString()).toBe("before-disconnect"); + }); +}); + +describe("Integration — multiple rooms", () => { + it("isolates documents across different rooms", async () => { + const roomA = `multi-room-a-${Date.now()}`; + const roomB = `multi-room-b-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(roomA, { doc: docA }); + await waitForSync(providerA); + + const docB = new Y.Doc(); + const providerB = createProvider(roomB, { doc: docB }); + await waitForSync(providerB); + + // Write different content to each room + docA.getText("shared").insert(0, "room-A-content"); + docB.getText("shared").insert(0, "room-B-content"); + + await new Promise((r) => setTimeout(r, 500)); + + // Verify isolation: each room has its own content + expect(docA.getText("shared").toString()).toBe("room-A-content"); + expect(docB.getText("shared").toString()).toBe("room-B-content"); + + // Connect a new provider to room A — should only see A's content + const docA2 = new Y.Doc(); + const providerA2 = createProvider(roomA, { doc: docA2 }); + await waitForSync(providerA2); + await new Promise((r) => setTimeout(r, 500)); + + expect(docA2.getText("shared").toString()).toBe("room-A-content"); + }); +}); + +describe("Integration — large documents", () => { + it("syncs a document with many operations", async () => { + const room = `large-doc-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + // Perform many operations + const text = docA.getText("shared"); + for (let i = 0; i < 100; i++) { + text.insert(text.length, `line-${i}\n`); + } + + await new Promise((r) => setTimeout(r, 1000)); + + // Connect B and verify it gets all the content + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + await new Promise((r) => setTimeout(r, 1000)); + + const contentB = docB.getText("shared").toString(); + expect(contentB).toContain("line-0"); + expect(contentB).toContain("line-99"); + // All 100 lines should be present + expect(contentB.split("\n").filter(Boolean)).toHaveLength(100); + }); +}); diff --git a/packages/y-partyserver/src/tests/tsconfig.json b/packages/y-partyserver/src/tests/tsconfig.json new file mode 100644 index 00000000..c64482f3 --- /dev/null +++ b/packages/y-partyserver/src/tests/tsconfig.json @@ -0,0 +1,10 @@ +{ + "extends": "../../../../tsconfig.base.json", + "compilerOptions": { + "types": [ + "@cloudflare/workers-types/experimental", + "@cloudflare/vitest-pool-workers" + ] + }, + "include": ["./**/*.ts"] +} diff --git a/packages/y-partyserver/src/tests/vitest.config.ts b/packages/y-partyserver/src/tests/vitest.config.ts new file mode 100644 index 00000000..17c6a269 --- /dev/null +++ b/packages/y-partyserver/src/tests/vitest.config.ts @@ -0,0 +1,14 @@ +import { defineWorkersConfig } from "@cloudflare/vitest-pool-workers/config"; + +export default defineWorkersConfig({ + test: { + poolOptions: { + workers: { + isolatedStorage: false, + wrangler: { + configPath: "./wrangler.jsonc" + } + } + } + } +}); diff --git a/packages/y-partyserver/src/tests/vitest.integration.config.ts b/packages/y-partyserver/src/tests/vitest.integration.config.ts new file mode 100644 index 00000000..94424692 --- /dev/null +++ b/packages/y-partyserver/src/tests/vitest.integration.config.ts @@ -0,0 +1,9 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + include: ["integration.test.ts"], + testTimeout: 30000, + globalSetup: ["./global-setup.ts"] + } +}); diff --git a/packages/y-partyserver/src/tests/worker.ts b/packages/y-partyserver/src/tests/worker.ts new file mode 100644 index 00000000..7fe66cce --- /dev/null +++ b/packages/y-partyserver/src/tests/worker.ts @@ -0,0 +1,180 @@ +import { routePartykitRequest } from "partyserver"; +import { YServer } from "../server/index"; +import * as Y from "yjs"; + +import type { Connection, ConnectionContext } from "partyserver"; +import type { CallbackOptions } from "../server/index"; + +// --------------------------------------------------------------------------- +// Env type for all test DOs +// --------------------------------------------------------------------------- +export type Env = { + YBasic: DurableObjectNamespace; + YPersistent: DurableObjectNamespace; + YReadOnly: DurableObjectNamespace; + YCustomMessage: DurableObjectNamespace; + YOnLoadReturnsDoc: DurableObjectNamespace; + YCallbackOptions: DurableObjectNamespace; +}; + +// --------------------------------------------------------------------------- +// 1. Basic YServer — no persistence, no customization +// --------------------------------------------------------------------------- +export class YBasic extends YServer { + static options = { + hibernate: true + }; +} + +// --------------------------------------------------------------------------- +// 2. Persistent YServer — stores doc in SQLite, exercises onLoad/onSave +// --------------------------------------------------------------------------- +export class YPersistent extends YServer { + static options = { + hibernate: true + }; + + static callbackOptions: CallbackOptions = { + debounceWait: 50, + debounceMaxWait: 100 + }; + + async onStart() { + this.ctx.storage.sql.exec( + "CREATE TABLE IF NOT EXISTS documents (id TEXT PRIMARY KEY, content BLOB)" + ); + return super.onStart(); + } + + async onLoad() { + const rows = [ + ...this.ctx.storage.sql.exec( + "SELECT content FROM documents WHERE id = ? LIMIT 1", + this.name + ) + ]; + if (rows.length > 0 && rows[0].content) { + Y.applyUpdate( + this.document, + new Uint8Array(rows[0].content as ArrayBuffer) + ); + } + return; + } + + async onSave() { + const update = Y.encodeStateAsUpdate(this.document); + this.ctx.storage.sql.exec( + "INSERT OR REPLACE INTO documents (id, content) VALUES (?, ?)", + this.name, + update + ); + } +} + +// --------------------------------------------------------------------------- +// 3. Read-only YServer — all connections are read-only +// --------------------------------------------------------------------------- +export class YReadOnly extends YServer { + static options = { + hibernate: true + }; + + isReadOnly(_connection: Connection): boolean { + return true; + } + + onConnect(conn: Connection, _ctx: ConnectionContext): void { + super.onConnect(conn, _ctx); + // Also send a marker so the test knows the connection was accepted + conn.send("connected:readonly"); + } +} + +// --------------------------------------------------------------------------- +// 4. Custom message YServer — exercises onCustomMessage, sendCustomMessage, +// broadcastCustomMessage +// --------------------------------------------------------------------------- +export class YCustomMessage extends YServer { + static options = { + hibernate: true + }; + + onCustomMessage(connection: Connection, message: string): void { + try { + const data = JSON.parse(message) as { action: string }; + if (data.action === "ping") { + this.sendCustomMessage(connection, JSON.stringify({ action: "pong" })); + } else if (data.action === "broadcast") { + this.broadcastCustomMessage( + JSON.stringify({ action: "broadcasted" }), + connection + ); + } else if (data.action === "echo") { + this.sendCustomMessage(connection, message); + } + } catch { + this.sendCustomMessage( + connection, + JSON.stringify({ error: "parse-error" }) + ); + } + } +} + +// --------------------------------------------------------------------------- +// 5. YServer where onLoad returns a YDoc (tests the return-YDoc code path) +// --------------------------------------------------------------------------- +export class YOnLoadReturnsDoc extends YServer { + static options = { + hibernate: true + }; + + async onLoad(): Promise { + // Create a fresh doc with some pre-seeded content + const seedDoc = new Y.Doc(); + seedDoc.getText("shared").insert(0, "seeded-content"); + return seedDoc; + } +} + +// --------------------------------------------------------------------------- +// 6. YServer with custom callback options +// --------------------------------------------------------------------------- +export class YCallbackOptions extends YServer { + static options = { + hibernate: true + }; + + static callbackOptions: CallbackOptions = { + debounceWait: 50, + debounceMaxWait: 100 + }; + + saveCount = 0; + + async onSave() { + this.saveCount++; + } + + // Expose saveCount via HTTP for testing + onRequest(): Response { + return Response.json({ saveCount: this.saveCount }); + } +} + +// --------------------------------------------------------------------------- +// Default fetch handler — routes to the correct DO +// --------------------------------------------------------------------------- +export default { + async fetch( + request: Request, + env: Env, + _ctx: ExecutionContext + ): Promise { + return ( + (await routePartykitRequest(request, env)) || + new Response("Not Found", { status: 404 }) + ); + } +} satisfies ExportedHandler; diff --git a/packages/y-partyserver/src/tests/wrangler.jsonc b/packages/y-partyserver/src/tests/wrangler.jsonc new file mode 100644 index 00000000..e4762a5e --- /dev/null +++ b/packages/y-partyserver/src/tests/wrangler.jsonc @@ -0,0 +1,52 @@ +{ + "main": "/worker.ts", + "compatibility_date": "2026-01-28", + "compatibility_flags": [ + "nodejs_compat", + "enable_nodejs_tty_module", + "enable_nodejs_fs_module", + "enable_nodejs_http_modules", + "enable_nodejs_perf_hooks_module" + ], + "durable_objects": { + "bindings": [ + { + "name": "YBasic", + "class_name": "YBasic" + }, + { + "name": "YPersistent", + "class_name": "YPersistent" + }, + { + "name": "YReadOnly", + "class_name": "YReadOnly" + }, + { + "name": "YCustomMessage", + "class_name": "YCustomMessage" + }, + { + "name": "YOnLoadReturnsDoc", + "class_name": "YOnLoadReturnsDoc" + }, + { + "name": "YCallbackOptions", + "class_name": "YCallbackOptions" + } + ] + }, + "migrations": [ + { + "tag": "v1", + "new_sqlite_classes": [ + "YBasic", + "YPersistent", + "YReadOnly", + "YCustomMessage", + "YOnLoadReturnsDoc", + "YCallbackOptions" + ] + } + ] +} From 7ab56de416bc0b8c364def0c6f214e689000c1ca Mon Sep 17 00:00:00 2001 From: Sunil Pai Date: Mon, 23 Feb 2026 23:35:43 +0000 Subject: [PATCH 2/5] y-partyserver: hibernation support & awareness fix Refactor Y server to support Cloudflare Workers hibernation: remove in-memory conn map and persist per-connection awareness client IDs via connection.state so tracking survives hibernation. Switch broadcasting to use getConnections(), add durable handling for sync and awareness messages, clear stale awareness meta on disconnect, and bump awareness clock on reconnect so remote clients accept re-propagated states. Add debounced persistence and robust connection handling, plus extensive hibernation/integration tests, a Wrangler test harness, and vitest/tsconfig additions. Also update package manifests/workspaces and bump partysocket/partyserver dependency pins. --- package-lock.json | 6 +- package.json | 22 +- packages/hono-party/package.json | 2 +- packages/partyfn/package.json | 2 +- packages/partysub/package.json | 2 +- packages/y-partyserver/package.json | 3 +- packages/y-partyserver/src/provider/index.ts | 15 +- packages/y-partyserver/src/server/index.ts | 216 ++++--- .../src/tests/hibernation.test.ts | 542 ++++++++++++++++++ .../src/tests/integration.test.ts | 36 +- .../y-partyserver/src/tests/server-harness.ts | 104 ++++ .../y-partyserver/src/tests/tsconfig.json | 3 +- .../y-partyserver/src/tests/vitest.config.ts | 1 + .../src/tests/vitest.hibernation.config.ts | 9 + packages/y-partyserver/tsconfig.json | 12 + 15 files changed, 853 insertions(+), 122 deletions(-) create mode 100644 packages/y-partyserver/src/tests/hibernation.test.ts create mode 100644 packages/y-partyserver/src/tests/server-harness.ts create mode 100644 packages/y-partyserver/src/tests/vitest.hibernation.config.ts create mode 100644 packages/y-partyserver/tsconfig.json diff --git a/package-lock.json b/package-lock.json index d5eaae35..1072fb7c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12235,7 +12235,7 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20251218.0", "hono": "^4.11.1", - "partyserver": ">=0.3.0" + "partyserver": "^0.3.0" }, "peerDependencies": { "@cloudflare/workers-types": "^4.20240729.0", @@ -12260,7 +12260,7 @@ "license": "ISC", "dependencies": { "nanoid": "^5.1.6", - "partysocket": "^1.1.14" + "partysocket": "^1.1.15" } }, "packages/partyhard": { @@ -12336,7 +12336,7 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20251218.0", "partyserver": ">=0.2.0 <1.0.0", - "partysocket": "^1.1.14" + "partysocket": "^1.1.15" }, "peerDependencies": { "@cloudflare/workers-types": "^4.20240729.0", diff --git a/package.json b/package.json index 1fe8cc56..14cc5d12 100644 --- a/package.json +++ b/package.json @@ -3,6 +3,13 @@ "version": "0.0.0", "private": true, "description": "Everything's better with friends", + "license": "ISC", + "author": "Sunil Pai ", + "workspaces": [ + "packages/*", + "fixtures/*" + ], + "type": "module", "scripts": { "build": "npm run build -w partyserver -w partysocket -w y-partyserver -w partysub -w partyfn -w partysync -w partywhen -w partytracks -w hono-party && tsx scripts/check-exports.ts", "check": "npm run check:repo && npm run check:format && npm run check:lint && npm run check:type && npm run check:test", @@ -10,16 +17,10 @@ "check:format": "prettier . --check --ignore-unknown", "check:lint": "biome check", "check:repo": "sherif", - "check:test": "npm run check:test -w partyserver -w partysocket -w partysub -w partywhen -w partytracks", + "check:test": "npm run check:test -w partyserver -w partysocket -w partysub -w partywhen -w partytracks -w y-partyserver", "check:type": "tsx scripts/typecheck.ts", "all": "npm i && npm run build && npm run check" }, - "author": "Sunil Pai ", - "license": "ISC", - "workspaces": [ - "packages/*", - "fixtures/*" - ], "devDependencies": { "@biomejs/biome": "^2.3.10", "@changesets/changelog-github": "^0.5.2", @@ -46,18 +47,17 @@ "wrangler": "^4.56.0" }, "overrides": { - "esbuild": "0.25.0", "@types/node": "25.0.3", + "esbuild": "0.25.0", "prosemirror-model": "1.22.2", "react": "19.2.3", "react-dom": "19.2.3" }, + "packageManager": "npm@11.7.0", "trustedDependencies": [ "@biomejs/biome", "core-js", "esbuild", "workerd" - ], - "packageManager": "npm@11.7.0", - "type": "module" + ] } diff --git a/packages/hono-party/package.json b/packages/hono-party/package.json index b1702fb8..16af9459 100644 --- a/packages/hono-party/package.json +++ b/packages/hono-party/package.json @@ -37,6 +37,6 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20251218.0", "hono": "^4.11.1", - "partyserver": ">=0.3.0" + "partyserver": "^0.3.0" } } diff --git a/packages/partyfn/package.json b/packages/partyfn/package.json index bef838cd..2a972458 100644 --- a/packages/partyfn/package.json +++ b/packages/partyfn/package.json @@ -19,7 +19,7 @@ ], "dependencies": { "nanoid": "^5.1.6", - "partysocket": "^1.1.14" + "partysocket": "^1.1.15" }, "scripts": { "build": "tsx scripts/build.ts" diff --git a/packages/partysub/package.json b/packages/partysub/package.json index 355b1ba6..bdbe7790 100644 --- a/packages/partysub/package.json +++ b/packages/partysub/package.json @@ -47,6 +47,6 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20251218.0", "partyserver": ">=0.2.0 <1.0.0", - "partysocket": "^1.1.14" + "partysocket": "^1.1.15" } } diff --git a/packages/y-partyserver/package.json b/packages/y-partyserver/package.json index ea67bebe..0fb36d8d 100644 --- a/packages/y-partyserver/package.json +++ b/packages/y-partyserver/package.json @@ -40,7 +40,8 @@ "build": "tsx scripts/build.ts", "check:test": "vitest -r src/tests --watch false", "test": "vitest -r src/tests", - "test:integration": "vitest -r src/tests --config vitest.integration.config.ts --watch false" + "test:integration": "vitest -r src/tests --config vitest.integration.config.ts --watch false", + "test:hibernation": "vitest -r src/tests --config vitest.hibernation.config.ts --watch false" }, "dependencies": { "lib0": "^0.2.115", diff --git a/packages/y-partyserver/src/provider/index.ts b/packages/y-partyserver/src/provider/index.ts index 75b8755d..584e20dd 100644 --- a/packages/y-partyserver/src/provider/index.ts +++ b/packages/y-partyserver/src/provider/index.ts @@ -165,13 +165,19 @@ function setupWS(provider: WebsocketProvider) { provider.wsconnected = false; provider.synced = false; // update awareness (all users except local left) + const removedClients = Array.from( + provider.awareness.getStates().keys() + ).filter((client) => client !== provider.doc.clientID); awarenessProtocol.removeAwarenessStates( provider.awareness, - Array.from(provider.awareness.getStates().keys()).filter( - (client) => client !== provider.doc.clientID - ), + removedClients, provider ); + // Clear stale meta for remote clients so their awareness + // updates are accepted on reconnect (clock check starts fresh) + for (const clientID of removedClients) { + provider.awareness.meta.delete(clientID); + } provider.emit("status", [ { status: "disconnected" @@ -208,6 +214,9 @@ function setupWS(provider: WebsocketProvider) { websocket.send(encoding.toUint8Array(encoder)); // broadcast local awareness state if (provider.awareness.getLocalState() !== null) { + // Re-set local state to bump the awareness clock, ensuring + // remote clients accept the update even if they have stale meta + provider.awareness.setLocalState(provider.awareness.getLocalState()); const encoderAwarenessState = encoding.createEncoder(); encoding.writeVarUint(encoderAwarenessState, messageAwareness); encoding.writeVarUint8Array( diff --git a/packages/y-partyserver/src/server/index.ts b/packages/y-partyserver/src/server/index.ts index ddb04327..91036cde 100644 --- a/packages/y-partyserver/src/server/index.ts +++ b/packages/y-partyserver/src/server/index.ts @@ -37,71 +37,45 @@ const messageAwareness = 1; // biome-ignore lint/correctness/noUnusedVariables: it's fine const messageAuth = 2; -function updateHandler(update: Uint8Array, _origin: unknown, doc: WSSharedDoc) { - const encoder = encoding.createEncoder(); - encoding.writeVarUint(encoder, messageSync); - syncProtocol.writeUpdate(encoder, update); - const message = encoding.toUint8Array(encoder); - doc.conns.forEach((_, conn) => { - send(doc, conn, message); - }); +/** + * Internal key used in connection.setState() to track which awareness + * client IDs are controlled by each connection. This survives hibernation + * because connection state is persisted to WebSocket attachments. + */ +const AWARENESS_IDS_KEY = "__ypsAwarenessIds"; + +type YServerConnectionState = { + [AWARENESS_IDS_KEY]?: number[]; + [key: string]: unknown; +}; + +function getAwarenessIds(conn: Connection): number[] { + try { + const state = conn.state as YServerConnectionState | null; + return state?.[AWARENESS_IDS_KEY] ?? []; + } catch { + return []; + } +} + +function setAwarenessIds(conn: Connection, ids: number[]): void { + try { + conn.setState((prev: YServerConnectionState | null) => ({ + ...prev, + [AWARENESS_IDS_KEY]: ids + })); + } catch { + // ignore — may fail if connection is already closed + } } class WSSharedDoc extends YDoc { - conns: Map>; awareness: awarenessProtocol.Awareness; constructor() { super({ gc: true }); - - /** - * Maps from conn to set of controlled user ids. Delete all user ids from awareness when this conn is closed - */ - this.conns = new Map(); - this.awareness = new awarenessProtocol.Awareness(this); this.awareness.setLocalState(null); - - const awarenessChangeHandler = ( - { - added, - updated, - removed - }: { - added: Array; - updated: Array; - removed: Array; - }, - conn: Connection | null // Origin is the connection that made the change - ) => { - const changedClients = added.concat(updated, removed); - if (conn !== null) { - const connControlledIDs = - /** @type {Set} */ this.conns.get(conn); - if (connControlledIDs !== undefined) { - added.forEach((clientID) => { - connControlledIDs.add(clientID); - }); - removed.forEach((clientID) => { - connControlledIDs.delete(clientID); - }); - } - } - // broadcast awareness update - const encoder = encoding.createEncoder(); - encoding.writeVarUint(encoder, messageAwareness); - encoding.writeVarUint8Array( - encoder, - awarenessProtocol.encodeAwarenessUpdate(this.awareness, changedClients) - ); - const buff = encoding.toUint8Array(encoder); - this.conns.forEach((_, c) => { - send(this, c, buff); - }); - }; - this.awareness.on("update", awarenessChangeHandler); - // @ts-expect-error - TODO: fix this - this.on("update", updateHandler); } } @@ -136,35 +110,18 @@ function readSyncMessage( return messageType; } -function closeConn(doc: WSSharedDoc, conn: Connection): void { - if (doc.conns.has(conn)) { - const controlledIds: Set = doc.conns.get(conn)!; - doc.conns.delete(conn); - awarenessProtocol.removeAwarenessStates( - doc.awareness, - Array.from(controlledIds), - null - ); - } - try { - conn.close(); - } catch (e) { - console.warn("failed to close connection", e); - } -} - -function send(doc: WSSharedDoc, conn: Connection, m: Uint8Array) { +function send(conn: Connection, m: Uint8Array): void { if ( conn.readyState !== undefined && conn.readyState !== wsReadyStateConnecting && conn.readyState !== wsReadyStateOpen ) { - closeConn(doc, conn); + return; } try { conn.send(m); - } catch (_e) { - closeConn(doc, conn); + } catch { + // connection is broken, ignore } } @@ -260,6 +217,70 @@ export class YServer< applyUpdate(this.document, state); } + // Broadcast doc updates to all connections. + // Uses this.getConnections() which works for both hibernate and non-hibernate + // modes and survives DO hibernation (unlike an in-memory Map). + this.document.on("update", (update: Uint8Array) => { + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageSync); + syncProtocol.writeUpdate(encoder, update); + const message = encoding.toUint8Array(encoder); + for (const conn of this.getConnections()) { + send(conn, message); + } + }); + + // Track which awareness clientIDs each connection controls. + // Stored in connection.setState() so it survives hibernation. + // When conn is null (internal changes like removeAwarenessStates on close), + // broadcast the update to remaining connections. + // When conn is non-null (client message), handleMessage broadcasts directly. + this.document.awareness.on( + "update", + ( + { + added, + updated, + removed + }: { + added: Array; + updated: Array; + removed: Array; + }, + conn: Connection | null + ) => { + if (conn !== null) { + // Track which clientIDs this connection controls + try { + const currentIds = new Set(getAwarenessIds(conn)); + for (const clientID of added) currentIds.add(clientID); + for (const clientID of removed) currentIds.delete(clientID); + setAwarenessIds(conn, [...currentIds]); + } catch (_e) { + // ignore — best-effort tracking + } + } else { + // Internal awareness change (e.g. removeAwarenessStates on close) + // — broadcast to all remaining connections + const changedClients = added.concat(updated, removed); + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageAwareness); + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate( + this.document.awareness, + changedClients + ) + ); + const buff = encoding.toUint8Array(encoder); + for (const c of this.getConnections()) { + send(c, buff); + } + } + } + ); + + // Debounced persistence handler this.document.on( "update", debounce( @@ -334,23 +355,23 @@ export class YServer< excludeConnection?: Connection ): void { const formattedMessage = `__YPS:${message}`; - this.document.conns.forEach((_, conn) => { + for (const conn of this.getConnections()) { if (excludeConnection && conn === excludeConnection) { - return; + continue; } if ( conn.readyState !== undefined && conn.readyState !== wsReadyStateConnecting && conn.readyState !== wsReadyStateOpen ) { - return; + continue; } try { conn.send(formattedMessage); } catch (e) { console.warn("Failed to broadcast custom message", e); } - }); + } } handleMessage(connection: Connection, message: WSMessage) { @@ -396,15 +417,24 @@ export class YServer< // message, there is no need to send the message. When `encoder` only // contains the type of reply, its length is 1. if (encoding.length(encoder) > 1) { - send(this.document, connection, encoding.toUint8Array(encoder)); + send(connection, encoding.toUint8Array(encoder)); } break; case messageAwareness: { + const awarenessData = decoding.readVarUint8Array(decoder); awarenessProtocol.applyAwarenessUpdate( this.document.awareness, - decoding.readVarUint8Array(decoder), + awarenessData, connection ); + // Forward raw awareness bytes to all connections + const awarenessEncoder = encoding.createEncoder(); + encoding.writeVarUint(awarenessEncoder, messageAwareness); + encoding.writeVarUint8Array(awarenessEncoder, awarenessData); + const awarenessBuff = encoding.toUint8Array(awarenessEncoder); + for (const c of this.getConnections()) { + send(c, awarenessBuff); + } break; } } @@ -425,7 +455,16 @@ export class YServer< _reason: string, _wasClean: boolean ): void | Promise { - closeConn(this.document, connection); + // Read controlled awareness clientIDs from connection state + // (survives hibernation unlike an in-memory Map) + const controlledIds = getAwarenessIds(connection); + if (controlledIds.length > 0) { + awarenessProtocol.removeAwarenessStates( + this.document.awareness, + controlledIds, + null + ); + } } // TODO: explore why onError gets triggered when a connection closes @@ -434,15 +473,14 @@ export class YServer< conn: Connection, _ctx: ConnectionContext ): void | Promise { - // conn.binaryType = "arraybuffer"; // from y-websocket, breaks in our runtime - - this.document.conns.set(conn, new Set()); + // Note: awareness IDs are lazily initialized when the first awareness + // message is received — no need to call setAwarenessIds(conn, []) here // send sync step 1 const encoder = encoding.createEncoder(); encoding.writeVarUint(encoder, messageSync); syncProtocol.writeSyncStep1(encoder, this.document); - send(this.document, conn, encoding.toUint8Array(encoder)); + send(conn, encoding.toUint8Array(encoder)); const awarenessStates = this.document.awareness.getStates(); if (awarenessStates.size > 0) { const encoder = encoding.createEncoder(); @@ -454,7 +492,7 @@ export class YServer< Array.from(awarenessStates.keys()) ) ); - send(this.document, conn, encoding.toUint8Array(encoder)); + send(conn, encoding.toUint8Array(encoder)); } } } diff --git a/packages/y-partyserver/src/tests/hibernation.test.ts b/packages/y-partyserver/src/tests/hibernation.test.ts new file mode 100644 index 00000000..1376a98d --- /dev/null +++ b/packages/y-partyserver/src/tests/hibernation.test.ts @@ -0,0 +1,542 @@ +import WebSocket from "ws"; +import * as Y from "yjs"; +import { afterAll, afterEach, beforeAll, describe, expect, it } from "vitest"; + +import YProvider from "../provider/index"; +import { WranglerServer } from "./server-harness"; + +const PORT = 8801; +const HOST = `localhost:${PORT}`; +const server = new WranglerServer(PORT); + +// Track providers for cleanup +const providers: YProvider[] = []; + +function createProvider( + room: string, + options: { party?: string; doc?: Y.Doc } = {} +): YProvider { + const doc = options.doc ?? new Y.Doc(); + const provider = new YProvider(HOST, room, doc, { + party: options.party ?? "y-basic", + connect: true, + WebSocketPolyfill: WebSocket as unknown as typeof globalThis.WebSocket, + disableBc: true, + // Faster reconnection for restart tests + maxBackoffTime: 1000 + }); + providers.push(provider); + return provider; +} + +function waitForSync(provider: YProvider, timeoutMs = 20000): Promise { + return new Promise((resolve, reject) => { + if (provider.synced) { + resolve(); + return; + } + const timeout = setTimeout( + () => reject(new Error("Timed out waiting for sync")), + timeoutMs + ); + provider.on("synced", () => { + clearTimeout(timeout); + resolve(); + }); + }); +} + +/** + * Wait for a provider to go through disconnect → reconnect → sync. + * Handles the case where the provider is still connected (hasn't + * detected the server kill yet) or already disconnected. + */ +function waitForReconnectAndSync( + provider: YProvider, + timeoutMs = 30000 +): Promise { + return new Promise((resolve, reject) => { + const timeout = setTimeout( + () => + reject( + new Error("Timed out waiting for reconnect and sync after restart") + ), + timeoutMs + ); + + let sawDisconnect = !provider.wsconnected; + + const statusHandler = (event: { status: string }) => { + if (event.status === "disconnected") { + sawDisconnect = true; + } + }; + provider.on("status", statusHandler); + + const syncHandler = () => { + if (sawDisconnect) { + clearTimeout(timeout); + provider.off("status", statusHandler); + provider.off("synced", syncHandler); + resolve(); + } + }; + provider.on("synced", syncHandler); + + // If already disconnected and re-synced, resolve immediately + if (sawDisconnect && provider.synced) { + clearTimeout(timeout); + provider.off("status", statusHandler); + provider.off("synced", syncHandler); + resolve(); + } + }); +} + +function destroyProvider(provider: YProvider): void { + try { + provider.destroy(); + } catch { + // ignore + } + const idx = providers.indexOf(provider); + if (idx >= 0) providers.splice(idx, 1); +} + +beforeAll(async () => { + server.cleanup(); + await server.start(); +}, 60000); + +afterEach(() => { + for (const p of providers) { + try { + p.destroy(); + } catch { + // ignore + } + } + providers.length = 0; +}); + +afterAll(async () => { + await server.stop(); + server.cleanup(); +}, 30000); + +// --------------------------------------------------------------------------- +// All test DOs in worker.ts use `static options = { hibernate: true }` +// --------------------------------------------------------------------------- + +describe("Server restart — non-persistent YServer (hibernate: true)", () => { + it("connected client re-syncs its local state to server after restart", async () => { + const room = `restart-basic-${Date.now()}`; + + // Connect and write data + const doc = new Y.Doc(); + const provider = createProvider(room, { doc }); + await waitForSync(provider); + + doc.getText("shared").insert(0, "survive-restart"); + await new Promise((r) => setTimeout(r, 300)); + + // Set up listener BEFORE restart to avoid race condition + const reconnect = waitForReconnectAndSync(provider); + + // Kill and restart the server — all server-side in-memory state is lost + await server.restart(); + + // Provider auto-reconnects and re-syncs its local state to the new server + await reconnect; + + // Client still has its local data + expect(doc.getText("shared").toString()).toBe("survive-restart"); + + // A second client connecting should get the re-synced state + const doc2 = new Y.Doc(); + const provider2 = createProvider(room, { doc: doc2 }); + await waitForSync(provider2); + await new Promise((r) => setTimeout(r, 500)); + + expect(doc2.getText("shared").toString()).toBe("survive-restart"); + }); + + it("data is lost if no client is connected to re-sync after restart", async () => { + const room = `restart-lost-${Date.now()}`; + + // Connect, write data, then disconnect before restart + const doc = new Y.Doc(); + const provider = createProvider(room, { doc }); + await waitForSync(provider); + + doc.getText("shared").insert(0, "will-be-lost"); + await new Promise((r) => setTimeout(r, 300)); + + // Destroy the provider — no client will be around to re-sync + destroyProvider(provider); + + // Restart server + await server.restart(); + + // New client gets empty doc — no persistence, no re-sync source + const doc2 = new Y.Doc(); + const provider2 = createProvider(room, { doc: doc2 }); + await waitForSync(provider2); + await new Promise((r) => setTimeout(r, 500)); + + expect(doc2.getText("shared").toString()).toBe(""); + }); + + it("two connected clients both survive restart and stay converged", async () => { + const room = `restart-two-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + await new Promise((r) => setTimeout(r, 300)); + + // Both write + docA.getText("shared").insert(0, "A-data"); + await new Promise((r) => setTimeout(r, 200)); + docB.getText("shared").insert(docB.getText("shared").length, "+B-data"); + await new Promise((r) => setTimeout(r, 500)); + + // Verify convergence before restart + const beforeA = docA.getText("shared").toString(); + const beforeB = docB.getText("shared").toString(); + expect(beforeA).toBe(beforeB); + expect(beforeA).toContain("A-data"); + expect(beforeA).toContain("B-data"); + + // Set up reconnection listeners BEFORE restart to avoid race condition + const reconnectA = waitForReconnectAndSync(providerA); + const reconnectB = waitForReconnectAndSync(providerB); + + await server.restart(); + + await Promise.all([reconnectA, reconnectB]); + await new Promise((r) => setTimeout(r, 1000)); + + // Both should still have the same content + expect(docA.getText("shared").toString()).toContain("A-data"); + expect(docA.getText("shared").toString()).toContain("B-data"); + expect(docB.getText("shared").toString()).toContain("A-data"); + expect(docB.getText("shared").toString()).toContain("B-data"); + // And they should still be converged + expect(docA.getText("shared").toString()).toBe( + docB.getText("shared").toString() + ); + }); + + it("clients can continue collaborating after restart", async () => { + const room = `restart-collab-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + await new Promise((r) => setTimeout(r, 300)); + + docA.getText("shared").insert(0, "before"); + await new Promise((r) => setTimeout(r, 500)); + + // Set up reconnection listeners BEFORE restart + const reconnectA = waitForReconnectAndSync(providerA); + const reconnectB = waitForReconnectAndSync(providerB); + + await server.restart(); + + await Promise.all([reconnectA, reconnectB]); + await new Promise((r) => setTimeout(r, 500)); + + // Write AFTER restart + const updateReceived = new Promise((resolve) => { + docB.on("update", () => { + if (docB.getText("shared").toString().includes("after")) { + resolve(); + } + }); + }); + docA.getText("shared").insert(docA.getText("shared").length, "-and-after"); + + await updateReceived; + + expect(docB.getText("shared").toString()).toContain("before"); + expect(docB.getText("shared").toString()).toContain("after"); + }); +}); + +describe("Server restart — persistent YServer (hibernate: true)", () => { + it("restores state from storage when no client re-syncs", async () => { + const room = `restart-persist-${Date.now()}`; + + // Connect to persistent server, write data + const doc = new Y.Doc(); + const provider = createProvider(room, { doc, party: "y-persistent" }); + await waitForSync(provider); + + doc.getText("shared").insert(0, "persisted-across-restart"); + + // Wait for debounced onSave + await new Promise((r) => setTimeout(r, 500)); + + // Destroy provider — no client to re-sync + destroyProvider(provider); + + // Restart server + await server.restart(); + + // New client should get state from persistence (onLoad) + const doc2 = new Y.Doc(); + const provider2 = createProvider(room, { + doc: doc2, + party: "y-persistent" + }); + await waitForSync(provider2); + await new Promise((r) => setTimeout(r, 1000)); + + expect(doc2.getText("shared").toString()).toBe("persisted-across-restart"); + }); + + it("connected client + persistence both work after restart", async () => { + const room = `restart-persist-conn-${Date.now()}`; + + const doc = new Y.Doc(); + const provider = createProvider(room, { doc, party: "y-persistent" }); + await waitForSync(provider); + + doc.getText("shared").insert(0, "persistent-and-connected"); + await new Promise((r) => setTimeout(r, 500)); + + // Set up listener BEFORE restart + const reconnect = waitForReconnectAndSync(provider); + + // Restart with client still connected + await server.restart(); + + await reconnect; + await new Promise((r) => setTimeout(r, 500)); + + expect(doc.getText("shared").toString()).toBe("persistent-and-connected"); + + // Second client also sees the data + const doc2 = new Y.Doc(); + const provider2 = createProvider(room, { + doc: doc2, + party: "y-persistent" + }); + await waitForSync(provider2); + await new Promise((r) => setTimeout(r, 1000)); + + expect(doc2.getText("shared").toString()).toBe("persistent-and-connected"); + }); + + it("survives two consecutive restarts", async () => { + const room = `restart-persist-twice-${Date.now()}`; + + // Session 1: write data + const doc1 = new Y.Doc(); + const provider1 = createProvider(room, { + doc: doc1, + party: "y-persistent" + }); + await waitForSync(provider1); + + doc1.getText("shared").insert(0, "round-1"); + await new Promise((r) => setTimeout(r, 500)); + destroyProvider(provider1); + + // First restart + await server.restart(); + + // Session 2: verify + write more + const doc2 = new Y.Doc(); + const provider2 = createProvider(room, { + doc: doc2, + party: "y-persistent" + }); + await waitForSync(provider2); + await new Promise((r) => setTimeout(r, 1000)); + + expect(doc2.getText("shared").toString()).toBe("round-1"); + + doc2.getText("shared").insert(doc2.getText("shared").length, "+round-2"); + await new Promise((r) => setTimeout(r, 500)); + destroyProvider(provider2); + + // Second restart + await server.restart(); + + // Session 3: verify both rounds persisted + const doc3 = new Y.Doc(); + const provider3 = createProvider(room, { + doc: doc3, + party: "y-persistent" + }); + await waitForSync(provider3); + await new Promise((r) => setTimeout(r, 1000)); + + expect(doc3.getText("shared").toString()).toBe("round-1+round-2"); + }); +}); + +describe("Server restart — awareness (hibernate: true)", () => { + it("local awareness state survives restart on the provider", async () => { + const room = `restart-awareness-local-${Date.now()}`; + + const doc = new Y.Doc(); + const provider = createProvider(room, { doc }); + await waitForSync(provider); + + provider.awareness.setLocalState({ user: { name: "Alice" } }); + await new Promise((r) => setTimeout(r, 200)); + + const reconnect = waitForReconnectAndSync(provider); + await server.restart(); + await reconnect; + + // The provider's local awareness state persists across restart + // (it's stored on the provider, not the server) + const local = provider.awareness.getLocalState() as { + user: { name: string }; + }; + expect(local).toBeDefined(); + expect(local.user.name).toBe("Alice"); + }); + + it("awareness automatically re-propagates after restart (no explicit re-trigger)", async () => { + const room = `restart-awareness-auto-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { doc: docA }); + await waitForSync(providerA); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { doc: docB }); + await waitForSync(providerB); + await new Promise((r) => setTimeout(r, 300)); + + // Set awareness before restart + providerA.awareness.setLocalState({ user: { name: "Alice" } }); + + // Wait for B to see it + await new Promise((resolve) => { + const check = () => { + if (providerB.awareness.getStates().get(docA.clientID)) { + resolve(); + } else { + setTimeout(check, 100); + } + }; + check(); + }); + + // Set up reconnection listeners BEFORE restart + const reconnectA = waitForReconnectAndSync(providerA); + const reconnectB = waitForReconnectAndSync(providerB); + + await server.restart(); + + await Promise.all([reconnectA, reconnectB]); + + // Do NOT explicitly re-set awareness — the provider should automatically + // re-broadcast with a bumped clock on reconnect, and the receiving + // provider should accept it because stale meta was cleared on close. + await new Promise((resolve, reject) => { + const timeout = setTimeout( + () => + reject( + new Error( + "Timed out waiting for automatic awareness re-propagation" + ) + ), + 10000 + ); + const check = () => { + const state = providerB.awareness.getStates().get(docA.clientID); + if (state) { + clearTimeout(timeout); + resolve(); + } else { + setTimeout(check, 200); + } + }; + check(); + }); + + const stateA = providerB.awareness.getStates().get(docA.clientID) as { + user: { name: string }; + }; + expect(stateA.user.name).toBe("Alice"); + }); +}); + +describe("Server restart — custom messages (hibernate: true)", () => { + it("custom messages work after restart", async () => { + const room = `restart-custom-${Date.now()}`; + + const provider = createProvider(room, { party: "y-custom-message" }); + await waitForSync(provider); + + // Verify custom messages work before restart + const pong1 = new Promise((resolve) => { + provider.on("custom-message", (msg: string) => resolve(msg)); + }); + provider.sendMessage(JSON.stringify({ action: "ping" })); + expect(JSON.parse(await pong1)).toEqual({ action: "pong" }); + + // Set up listener BEFORE restart + const reconnect = waitForReconnectAndSync(provider); + + await server.restart(); + await reconnect; + + // Custom messages should still work after restart + const pong2 = new Promise((resolve) => { + provider.on("custom-message", (msg: string) => resolve(msg)); + }); + provider.sendMessage(JSON.stringify({ action: "ping" })); + expect(JSON.parse(await pong2)).toEqual({ action: "pong" }); + }); +}); + +describe("Server restart — seeded documents (hibernate: true)", () => { + it("onLoad-returned YDoc is re-seeded after restart", async () => { + const room = `restart-seeded-${Date.now()}`; + + // YOnLoadReturnsDoc seeds "seeded-content" on every onLoad + const doc = new Y.Doc(); + const provider = createProvider(room, { + doc, + party: "y-on-load-returns-doc" + }); + await waitForSync(provider); + await new Promise((r) => setTimeout(r, 500)); + + expect(doc.getText("shared").toString()).toBe("seeded-content"); + + // Destroy the provider + destroyProvider(provider); + + // Restart + await server.restart(); + + // New client should still get the seeded content (onLoad runs again) + const doc2 = new Y.Doc(); + const provider2 = createProvider(room, { + doc: doc2, + party: "y-on-load-returns-doc" + }); + await waitForSync(provider2); + await new Promise((r) => setTimeout(r, 500)); + + expect(doc2.getText("shared").toString()).toBe("seeded-content"); + }); +}); diff --git a/packages/y-partyserver/src/tests/integration.test.ts b/packages/y-partyserver/src/tests/integration.test.ts index 52b2c4d1..81f045b3 100644 --- a/packages/y-partyserver/src/tests/integration.test.ts +++ b/packages/y-partyserver/src/tests/integration.test.ts @@ -254,11 +254,14 @@ describe("Integration — awareness", () => { user: { name: "Alice", color: "#ff0000" } }); - // Wait for awareness to propagate + // Wait for the actual awareness state to propagate (not just the + // default {} from the Awareness constructor) await new Promise((resolve) => { const check = () => { - const stateA = providerB.awareness.getStates().get(docA.clientID); - if (stateA) { + const stateA = providerB.awareness.getStates().get(docA.clientID) as + | { user?: { name: string } } + | undefined; + if (stateA?.user) { resolve(); } else { setTimeout(check, 100); @@ -439,17 +442,28 @@ describe("Integration — reconnection", () => { doc.getText("shared").insert(0, "before-disconnect"); await new Promise((r) => setTimeout(r, 300)); - // Disconnect and wait for the status change - const disconnected = new Promise((resolve) => { - provider.on("status", (event: { status: string }) => { - if (event.status === "disconnected") { + // Disconnect — with hibernate: true and the ws library, the close + // handshake may not complete (no close event), so we force-terminate + // the underlying WebSocket and wait for the provider to notice. + const ws = provider.ws as unknown as { + terminate?: () => void; + CLOSED?: number; + }; + provider.disconnect(); + if (ws && typeof ws.terminate === "function") { + ws.terminate(); + } + // Wait for provider to process the disconnect + await new Promise((resolve) => { + const check = () => { + if (!provider.wsconnected && provider.ws === null) { resolve(); + } else { + setTimeout(check, 50); } - }); + }; + setTimeout(check, 100); }); - provider.disconnect(); - await disconnected; - expect(provider.wsconnected).toBe(false); // Reconnect provider.connect(); diff --git a/packages/y-partyserver/src/tests/server-harness.ts b/packages/y-partyserver/src/tests/server-harness.ts new file mode 100644 index 00000000..af508db5 --- /dev/null +++ b/packages/y-partyserver/src/tests/server-harness.ts @@ -0,0 +1,104 @@ +import { spawn } from "node:child_process"; +import type { ChildProcess } from "node:child_process"; +import fs from "node:fs"; +import path from "node:path"; + +const TEST_DIR = path.dirname(new URL(import.meta.url).pathname); + +export class WranglerServer { + private process: ChildProcess | null = null; + private port: number; + private persistDir: string; + + constructor(port: number) { + this.port = port; + this.persistDir = path.join(TEST_DIR, `.wrangler-persist-${port}`); + } + + async start(): Promise { + this.process = spawn( + "npx", + [ + "wrangler", + "dev", + "--config", + path.join(TEST_DIR, "integration-wrangler.jsonc"), + "--port", + String(this.port), + "--persist-to", + this.persistDir, + "--no-show-interactive-dev-session" + ], + { + cwd: TEST_DIR, + stdio: ["pipe", "pipe", "pipe"], + env: { + ...process.env, + BROWSER: "none" + } + } + ); + + this.process.stdout?.on("data", (data: Buffer) => { + if (process.env.DEBUG) { + process.stderr.write(`[wrangler:${this.port}] ${data.toString()}`); + } + }); + this.process.stderr?.on("data", (data: Buffer) => { + if (process.env.DEBUG) { + process.stderr.write(`[wrangler:${this.port}:err] ${data.toString()}`); + } + }); + + await this.waitForReady(); + } + + async stop(): Promise { + if (!this.process) return; + + const proc = this.process; + this.process = null; + + proc.kill("SIGTERM"); + await new Promise((resolve) => { + const timeout = setTimeout(() => { + proc.kill("SIGKILL"); + resolve(); + }, 3000); + proc.on("exit", () => { + clearTimeout(timeout); + resolve(); + }); + }); + + // Brief pause to let the OS release the port + await new Promise((r) => setTimeout(r, 500)); + } + + async restart(): Promise { + await this.stop(); + await this.start(); + } + + cleanup(): void { + if (fs.existsSync(this.persistDir)) { + fs.rmSync(this.persistDir, { recursive: true, force: true }); + } + } + + private async waitForReady(timeoutMs = 30000): Promise { + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + try { + const res = await fetch(`http://localhost:${this.port}/`); + if (res.ok || res.status === 404) return; + } catch { + // not ready yet + } + await new Promise((r) => setTimeout(r, 200)); + } + throw new Error( + `Server on port ${this.port} did not start within ${timeoutMs}ms` + ); + } +} diff --git a/packages/y-partyserver/src/tests/tsconfig.json b/packages/y-partyserver/src/tests/tsconfig.json index c64482f3..9c10c993 100644 --- a/packages/y-partyserver/src/tests/tsconfig.json +++ b/packages/y-partyserver/src/tests/tsconfig.json @@ -4,7 +4,8 @@ "types": [ "@cloudflare/workers-types/experimental", "@cloudflare/vitest-pool-workers" - ] + ], + "lib": ["ESNext", "DOM"] }, "include": ["./**/*.ts"] } diff --git a/packages/y-partyserver/src/tests/vitest.config.ts b/packages/y-partyserver/src/tests/vitest.config.ts index 17c6a269..7e04b31c 100644 --- a/packages/y-partyserver/src/tests/vitest.config.ts +++ b/packages/y-partyserver/src/tests/vitest.config.ts @@ -2,6 +2,7 @@ import { defineWorkersConfig } from "@cloudflare/vitest-pool-workers/config"; export default defineWorkersConfig({ test: { + include: ["index.test.ts"], poolOptions: { workers: { isolatedStorage: false, diff --git a/packages/y-partyserver/src/tests/vitest.hibernation.config.ts b/packages/y-partyserver/src/tests/vitest.hibernation.config.ts new file mode 100644 index 00000000..4413d50e --- /dev/null +++ b/packages/y-partyserver/src/tests/vitest.hibernation.config.ts @@ -0,0 +1,9 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + include: ["hibernation.test.ts"], + testTimeout: 120000, + hookTimeout: 60000 + } +}); diff --git a/packages/y-partyserver/tsconfig.json b/packages/y-partyserver/tsconfig.json new file mode 100644 index 00000000..a432bf58 --- /dev/null +++ b/packages/y-partyserver/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "types": ["@cloudflare/workers-types"] + }, + "exclude": [ + "src/tests/**/*.ts", + "src/provider/**/*.ts", + "src/provider/**/*.tsx", + "scripts/**/*.ts" + ] +} From e4b18124b35c39d7e7a4924e3f8f0482e45614a9 Mon Sep 17 00:00:00 2001 From: Sunil Pai Date: Tue, 24 Feb 2026 00:32:54 +0000 Subject: [PATCH 3/5] Disable awareness heartbeat and re-sync on wake Prevent periodic awareness heartbeats from generating network traffic and enable client-driven re-sync after DO hibernation. Changes include: - Provider: stop sending clock-only awareness renewals by listening to awareness 'change' (not 'update') and clear the awareness _checkInterval instead of using a built-in check loop; removed related reconnect timeout and local interval handling. - Server: clear WSSharedDoc awareness _checkInterval and, on YServer start, send a sync step 1 to all surviving connections so clients re-sync their full state after hibernation wake-up. - Fixtures/worker: add YHibernateTracker Durable Object (and namespace) to track onStart counts via storage and expose onRequest for tests; expose MonacoServer subclass enabling hibernate. - Tests: add hibernation and awareness tests to verify onStart re-sync behavior, multi-client convergence after restart, new-client re-sync, and suppression of clock-only heartbeats / absence of auto-removal. - Integration config: register YHibernateTracker for SQLite migration. These changes are intended to allow Durable Object hibernation (by avoiding periodic awareness traffic) while ensuring state is re-synchronized from surviving connections after a wake-up. --- fixtures/monaco-yjs/src/server.ts | 12 +- packages/y-partyserver/src/provider/index.ts | 43 +++-- packages/y-partyserver/src/server/index.ts | 24 +++ .../src/tests/hibernation.test.ts | 133 ++++++++++++++++ .../y-partyserver/src/tests/index.test.ts | 149 ++++++++++++++++++ .../src/tests/integration-wrangler.jsonc | 8 + packages/y-partyserver/src/tests/worker.ts | 21 +++ 7 files changed, 364 insertions(+), 26 deletions(-) diff --git a/fixtures/monaco-yjs/src/server.ts b/fixtures/monaco-yjs/src/server.ts index ea66a22c..c186a329 100644 --- a/fixtures/monaco-yjs/src/server.ts +++ b/fixtures/monaco-yjs/src/server.ts @@ -1,7 +1,17 @@ import { routePartykitRequest } from "partyserver"; import { YServer } from "y-partyserver"; -export { YServer as MonacoServer }; +// export { YServer as MonacoServer }; + +export class MonacoServer extends YServer { + static options = { + hibernate: true + }; + async onStart(): Promise { + console.log("onStart"); + super.onStart(); + } +} export default { async fetch(request: Request, env: Env): Promise { diff --git a/packages/y-partyserver/src/provider/index.ts b/packages/y-partyserver/src/provider/index.ts index 584e20dd..2ce5d80d 100644 --- a/packages/y-partyserver/src/provider/index.ts +++ b/packages/y-partyserver/src/provider/index.ts @@ -19,12 +19,6 @@ export const messageAuth = 2; // Disable BroadcastChannel by default in Cloudflare Workers / Node const DEFAULT_DISABLE_BC = typeof window === "undefined"; -function assert(condition: unknown, message: string): asserts condition { - if (!condition) { - throw new Error(message); - } -} - const messageHandlers: Array< ( encoder: encoding.Encoder, @@ -101,9 +95,6 @@ messageHandlers[messageAuth] = ( ); }; -// @todo - this should depend on awareness.outdatedTime -const messageReconnectTimeout = 30000; - function permissionDeniedHandler(provider: WebsocketProvider, reason: string) { console.warn(`Permission denied to access ${provider.url}.\n${reason}`); } @@ -290,7 +281,6 @@ export class WebsocketProvider extends Observable { _updateHandler: (update: Uint8Array, origin: unknown) => void; _awarenessUpdateHandler: (update: AwarenessUpdate, origin: unknown) => void; _unloadHandler: () => void; - _checkInterval: ReturnType | number; constructor( serverUrl: string, @@ -407,19 +397,23 @@ export class WebsocketProvider extends Observable { ) { process.on("exit", this._unloadHandler); } - awareness.on("update", this._awarenessUpdateHandler); - this._checkInterval = /** @type {any} */ setInterval(() => { - if ( - this.wsconnected && - messageReconnectTimeout < - time.getUnixTime() - this.wsLastMessageReceived - ) { - assert(this.ws !== null, "ws must not be null"); - // no message received in a long time - not even your own awareness - // updates (which are updated every 15 seconds) - this.ws.close(); - } - }, messageReconnectTimeout / 10); + // Listen on 'change' (not 'update') so that clock-only awareness + // renewals (the 15-second heartbeat) do NOT produce network traffic. + // Only actual state changes (cursor moved, name changed, etc.) are sent. + // This allows Durable Objects to hibernate when sessions are idle. + awareness.on("change", this._awarenessUpdateHandler); + + // Disable the awareness protocol's built-in check interval. + // It renews the local clock every 15s (causing wire traffic that defeats + // DO hibernation) and removes remote peers after 30s of inactivity. + // We handle peer cleanup via WebSocket close events instead. + clearInterval( + ( + awareness as unknown as { + _checkInterval: ReturnType; + } + )._checkInterval + ); if (connect) { this.connect(); } @@ -444,7 +438,6 @@ export class WebsocketProvider extends Observable { if (this._resyncInterval !== 0) { clearInterval(this._resyncInterval); } - clearInterval(this._checkInterval); this.disconnect(); if (typeof window !== "undefined") { window.removeEventListener("unload", this._unloadHandler); @@ -454,7 +447,7 @@ export class WebsocketProvider extends Observable { ) { process.off("exit", this._unloadHandler); } - this.awareness.off("update", this._awarenessUpdateHandler); + this.awareness.off("change", this._awarenessUpdateHandler); this.doc.off("update", this._updateHandler); super.destroy(); } diff --git a/packages/y-partyserver/src/server/index.ts b/packages/y-partyserver/src/server/index.ts index 91036cde..9cbe1ed3 100644 --- a/packages/y-partyserver/src/server/index.ts +++ b/packages/y-partyserver/src/server/index.ts @@ -76,6 +76,18 @@ class WSSharedDoc extends YDoc { super({ gc: true }); this.awareness = new awarenessProtocol.Awareness(this); this.awareness.setLocalState(null); + + // Disable the awareness protocol's built-in check interval. + // It renews the local clock every 15s and removes peers after 30s, + // but we handle peer cleanup via onClose instead. Clearing it here + // prevents it from defeating Durable Object hibernation. + clearInterval( + ( + this.awareness as unknown as { + _checkInterval: ReturnType; + } + )._checkInterval + ); } } @@ -302,6 +314,18 @@ export class YServer< } ) ); + + // After hibernation wake-up, the doc is empty but existing connections + // survive. Re-sync by sending sync step 1 to all connections — they'll + // respond with sync step 2 containing their full state. + // On first start there are no connections, so this is a no-op. + const syncEncoder = encoding.createEncoder(); + encoding.writeVarUint(syncEncoder, messageSync); + syncProtocol.writeSyncStep1(syncEncoder, this.document); + const syncMessage = encoding.toUint8Array(syncEncoder); + for (const conn of this.getConnections()) { + send(conn, syncMessage); + } } // biome-ignore lint/correctness/noUnusedFunctionParameters: so autocomplete works diff --git a/packages/y-partyserver/src/tests/hibernation.test.ts b/packages/y-partyserver/src/tests/hibernation.test.ts index 1376a98d..f6479831 100644 --- a/packages/y-partyserver/src/tests/hibernation.test.ts +++ b/packages/y-partyserver/src/tests/hibernation.test.ts @@ -507,6 +507,139 @@ describe("Server restart — custom messages (hibernate: true)", () => { }); }); +describe("Server restart — onStart re-sync (hibernate: true)", () => { + it("onStartCount increments on each restart", async () => { + const room = `tracker-restart-${Date.now()}`; + + const doc = new Y.Doc(); + const provider = createProvider(room, { + doc, + party: "y-hibernate-tracker" + }); + await waitForSync(provider); + + // Initial onStart should have been called once + const res1 = await fetch( + `http://localhost:${PORT}/parties/y-hibernate-tracker/${room}` + ); + const data1 = (await res1.json()) as { onStartCount: number }; + expect(data1.onStartCount).toBe(1); + + doc.getText("shared").insert(0, "before-restart"); + await new Promise((r) => setTimeout(r, 300)); + + const reconnect = waitForReconnectAndSync(provider); + await server.restart(); + await reconnect; + + // After restart, onStartCount should be 2 + const res2 = await fetch( + `http://localhost:${PORT}/parties/y-hibernate-tracker/${room}` + ); + const data2 = (await res2.json()) as { onStartCount: number }; + expect(data2.onStartCount).toBe(2); + + // Data should have survived via client re-sync + expect(doc.getText("shared").toString()).toBe("before-restart"); + + // New client should also see the re-synced data + const doc2 = new Y.Doc(); + const provider2 = createProvider(room, { + doc: doc2, + party: "y-hibernate-tracker" + }); + await waitForSync(provider2); + await new Promise((r) => setTimeout(r, 500)); + expect(doc2.getText("shared").toString()).toBe("before-restart"); + }); + + it("multiple clients re-sync and stay converged after restart", async () => { + const room = `tracker-multi-restart-${Date.now()}`; + + const docA = new Y.Doc(); + const providerA = createProvider(room, { + doc: docA, + party: "y-hibernate-tracker" + }); + await waitForSync(providerA); + + const docB = new Y.Doc(); + const providerB = createProvider(room, { + doc: docB, + party: "y-hibernate-tracker" + }); + await waitForSync(providerB); + await new Promise((r) => setTimeout(r, 300)); + + // Both edit + docA.getText("shared").insert(0, "A-data"); + await new Promise((r) => setTimeout(r, 200)); + docB.getText("shared").insert(docB.getText("shared").length, "+B-data"); + await new Promise((r) => setTimeout(r, 500)); + + const beforeA = docA.getText("shared").toString(); + expect(beforeA).toBe(docB.getText("shared").toString()); + + const reconnectA = waitForReconnectAndSync(providerA); + const reconnectB = waitForReconnectAndSync(providerB); + await server.restart(); + await Promise.all([reconnectA, reconnectB]); + await new Promise((r) => setTimeout(r, 1000)); + + // Both should still have the same converged content + expect(docA.getText("shared").toString()).toContain("A-data"); + expect(docA.getText("shared").toString()).toContain("B-data"); + expect(docA.getText("shared").toString()).toBe( + docB.getText("shared").toString() + ); + + // Continue collaborating after restart + const updateReceived = new Promise((resolve) => { + docB.on("update", () => { + if (docB.getText("shared").toString().includes("post-restart")) { + resolve(); + } + }); + }); + docA + .getText("shared") + .insert(docA.getText("shared").length, "+post-restart"); + await updateReceived; + + expect(docB.getText("shared").toString()).toContain("post-restart"); + }); + + it("new client gets re-synced state after restart", async () => { + const room = `tracker-newclient-restart-${Date.now()}`; + + const doc = new Y.Doc(); + const provider = createProvider(room, { + doc, + party: "y-hibernate-tracker" + }); + await waitForSync(provider); + + doc.getText("shared").insert(0, "existing-data"); + await new Promise((r) => setTimeout(r, 300)); + + const reconnect = waitForReconnectAndSync(provider); + await server.restart(); + await reconnect; + + // New client connects after restart — should get state + // re-synced from the surviving provider + const doc2 = new Y.Doc(); + const provider2 = createProvider(room, { + doc: doc2, + party: "y-hibernate-tracker" + }); + await waitForSync(provider2); + await new Promise((r) => setTimeout(r, 500)); + + expect(doc2.getText("shared").toString()).toBe("existing-data"); + }); +}); + describe("Server restart — seeded documents (hibernate: true)", () => { it("onLoad-returned YDoc is re-seeded after restart", async () => { const room = `restart-seeded-${Date.now()}`; diff --git a/packages/y-partyserver/src/tests/index.test.ts b/packages/y-partyserver/src/tests/index.test.ts index 9ad40173..ac237c62 100644 --- a/packages/y-partyserver/src/tests/index.test.ts +++ b/packages/y-partyserver/src/tests/index.test.ts @@ -847,3 +847,152 @@ describe("YProvider — URL construction", () => { provider.destroy(); }); }); + +describe("YServer — awareness heartbeat suppression", () => { + it("does not broadcast clock-only awareness renewals to other clients", async () => { + const ctx = createExecutionContext(); + const roomName = "no-heartbeat-test"; + + // Connect client A + const resA = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsA = acceptWs(resA); + const docA = new Y.Doc(); + const awarenessA = new awarenessProtocol.Awareness(docA); + await performSync(wsA, docA); + + // Connect client B + const resB = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsB = acceptWs(resB); + const docB = new Y.Doc(); + await performSync(wsB, docB); + // Drain initial messages + await collectMessages(wsB, 100); + + // Set awareness state on A — this is an actual state change, should be sent + awarenessA.setLocalState({ user: { name: "Alice" } }); + const enc1 = encoding.createEncoder(); + encoding.writeVarUint(enc1, messageAwareness); + encoding.writeVarUint8Array( + enc1, + awarenessProtocol.encodeAwarenessUpdate(awarenessA, [docA.clientID]) + ); + wsA.send(encoding.toUint8Array(enc1)); + + // B should receive this actual state change + const msgs1 = await collectMessages(wsB, 300); + const awarenessMsgs1 = msgs1.filter((msg) => { + if (!(msg instanceof ArrayBuffer)) return false; + const d = decoding.createDecoder(new Uint8Array(msg)); + return decoding.readVarUint(d) === messageAwareness; + }); + expect(awarenessMsgs1.length).toBeGreaterThan(0); + + // Now simulate a clock-only renewal (same state, just bump the clock) + // This is what the awareness _checkInterval does every 15s + awarenessA.setLocalState(awarenessA.getLocalState()); + const enc2 = encoding.createEncoder(); + encoding.writeVarUint(enc2, messageAwareness); + encoding.writeVarUint8Array( + enc2, + awarenessProtocol.encodeAwarenessUpdate(awarenessA, [docA.clientID]) + ); + wsA.send(encoding.toUint8Array(enc2)); + + // B should receive this too (server forwards all client messages) + // The suppression happens on the CLIENT side (provider doesn't send + // clock-only updates), not the server side. + const msgs2 = await collectMessages(wsB, 300); + const awarenessMsgs2 = msgs2.filter((msg) => { + if (!(msg instanceof ArrayBuffer)) return false; + const d = decoding.createDecoder(new Uint8Array(msg)); + return decoding.readVarUint(d) === messageAwareness; + }); + // Server still forwards — the point is clients won't SEND these + expect(awarenessMsgs2.length).toBeGreaterThan(0); + + wsA.close(); + wsB.close(); + }); + + it("server does not auto-remove awareness after 30s (checkInterval disabled)", async () => { + const ctx = createExecutionContext(); + const roomName = "no-timeout-removal-test"; + + // Connect client A + const resA = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsA = acceptWs(resA); + const docA = new Y.Doc(); + const awarenessA = new awarenessProtocol.Awareness(docA); + await performSync(wsA, docA); + + // Set awareness state on A + awarenessA.setLocalState({ user: { name: "Alice" } }); + const enc = encoding.createEncoder(); + encoding.writeVarUint(enc, messageAwareness); + encoding.writeVarUint8Array( + enc, + awarenessProtocol.encodeAwarenessUpdate(awarenessA, [docA.clientID]) + ); + wsA.send(encoding.toUint8Array(enc)); + await new Promise((r) => setTimeout(r, 100)); + + // Connect client B and verify it sees A's awareness + const resB = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsB = acceptWs(resB); + const docB = new Y.Doc(); + const awarenessB = new awarenessProtocol.Awareness(docB); + await performSync(wsB, docB); + + const msgs = await collectMessages(wsB, 200); + for (const msg of msgs) { + if (!(msg instanceof ArrayBuffer)) continue; + const decoder = decoding.createDecoder(new Uint8Array(msg)); + const msgType = decoding.readVarUint(decoder); + if (msgType === messageAwareness) { + awarenessProtocol.applyAwarenessUpdate( + awarenessB, + decoding.readVarUint8Array(decoder), + null + ); + } + } + expect(awarenessB.getStates().get(docA.clientID)).toBeDefined(); + + // Wait >3s (the awareness _checkInterval runs every 3s = outdatedTimeout/10) + // If the server's checkInterval were still active, it would start the + // removal process. With it disabled, the state should persist. + await new Promise((r) => setTimeout(r, 4000)); + + // Connect client C to check if A's awareness is still on the server + const resC = await worker.fetch(wsRequest(`y-basic/${roomName}`), env, ctx); + const wsC = acceptWs(resC); + const docC = new Y.Doc(); + const awarenessC = new awarenessProtocol.Awareness(docC); + await performSync(wsC, docC); + + const msgsC = await collectMessages(wsC, 200); + for (const msg of msgsC) { + if (!(msg instanceof ArrayBuffer)) continue; + const decoder = decoding.createDecoder(new Uint8Array(msg)); + const msgType = decoding.readVarUint(decoder); + if (msgType === messageAwareness) { + awarenessProtocol.applyAwarenessUpdate( + awarenessC, + decoding.readVarUint8Array(decoder), + null + ); + } + } + + // A's awareness should still be present — not timed out + expect(awarenessC.getStates().get(docA.clientID)).toBeDefined(); + expect( + (awarenessC.getStates().get(docA.clientID) as { user: { name: string } }) + .user.name + ).toBe("Alice"); + + wsA.close(); + wsB.close(); + wsC.close(); + }); +}); diff --git a/packages/y-partyserver/src/tests/integration-wrangler.jsonc b/packages/y-partyserver/src/tests/integration-wrangler.jsonc index 52a363ba..245fdf01 100644 --- a/packages/y-partyserver/src/tests/integration-wrangler.jsonc +++ b/packages/y-partyserver/src/tests/integration-wrangler.jsonc @@ -28,6 +28,10 @@ { "name": "YCallbackOptions", "class_name": "YCallbackOptions" + }, + { + "name": "YHibernateTracker", + "class_name": "YHibernateTracker" } ] }, @@ -42,6 +46,10 @@ "YOnLoadReturnsDoc", "YCallbackOptions" ] + }, + { + "tag": "v2", + "new_sqlite_classes": ["YHibernateTracker"] } ] } diff --git a/packages/y-partyserver/src/tests/worker.ts b/packages/y-partyserver/src/tests/worker.ts index 7fe66cce..5c11b53d 100644 --- a/packages/y-partyserver/src/tests/worker.ts +++ b/packages/y-partyserver/src/tests/worker.ts @@ -15,6 +15,7 @@ export type Env = { YCustomMessage: DurableObjectNamespace; YOnLoadReturnsDoc: DurableObjectNamespace; YCallbackOptions: DurableObjectNamespace; + YHibernateTracker: DurableObjectNamespace; }; // --------------------------------------------------------------------------- @@ -163,6 +164,26 @@ export class YCallbackOptions extends YServer { } } +// --------------------------------------------------------------------------- +// 7. YServer that tracks onStart calls via storage — detects hibernation +// --------------------------------------------------------------------------- +export class YHibernateTracker extends YServer { + static options = { + hibernate: true + }; + + async onStart() { + const count = (await this.ctx.storage.get("onStartCount")) ?? 0; + await this.ctx.storage.put("onStartCount", count + 1); + return super.onStart(); + } + + async onRequest(): Promise { + const count = (await this.ctx.storage.get("onStartCount")) ?? 0; + return Response.json({ onStartCount: count }); + } +} + // --------------------------------------------------------------------------- // Default fetch handler — routes to the correct DO // --------------------------------------------------------------------------- From a304c035eae3641a88a81add13edd55156be0048 Mon Sep 17 00:00:00 2001 From: Sunil Pai Date: Tue, 24 Feb 2026 00:47:28 +0000 Subject: [PATCH 4/5] Fix Yjs hibernation and awareness handling Add a changeset and update server startup to address Yjs Durable Object hibernation and awareness issues. Key changes: - Add .changeset/hibernation-awareness-fix.md describing fixes for server and provider behavior around hibernation and awareness propagation. - Server: persist connection state instead of in-memory Map, move event handler registration to onStart, disable awareness built-in _checkInterval, resend sync step 1 after wake, simplify send/error handling, and allow onLoad to return a seeded YDoc. - Provider: switch awareness listener from "update" to "change", disable client _checkInterval and provider liveness timer, clear stale awareness meta on WS close, bump awareness clock on reconnect, and fix trailing-slash stripping bug. - Fixture: await super.onStart() in MonacoServer.onStart to ensure proper async startup. These changes enable connections and awareness to survive Durable Object hibernation and reduce unnecessary heartbeat traffic. --- .changeset/hibernation-awareness-fix.md | 24 ++++++++++++++++++++++++ fixtures/monaco-yjs/src/server.ts | 2 +- 2 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 .changeset/hibernation-awareness-fix.md diff --git a/.changeset/hibernation-awareness-fix.md b/.changeset/hibernation-awareness-fix.md new file mode 100644 index 00000000..3905dcba --- /dev/null +++ b/.changeset/hibernation-awareness-fix.md @@ -0,0 +1,24 @@ +--- +"y-partyserver": minor +--- + +Fix Yjs hibernation support and awareness propagation + +**Server:** + +- Replace in-memory `WSSharedDoc.conns` Map with `connection.setState()` and `getConnections()` so connection tracking survives Durable Object hibernation +- Move event handler registration from `WSSharedDoc` constructor into `onStart()` to use `getConnections()` for broadcasting +- Disable awareness protocol's built-in `_checkInterval` in `WSSharedDoc` constructor to prevent timers from defeating hibernation +- On `onStart`, send sync step 1 to all existing connections so clients re-sync the server's document after hibernation wake-up +- Simplify `send()` — no longer forcibly closes connections on failure +- Remove `closeConn()` helper; awareness cleanup now happens in `onClose` via persisted connection state +- Widen `onLoad()` return type to `Promise` to allow seeding the document from a returned YDoc + +**Provider:** + +- Switch awareness event listener from `"update"` to `"change"` so clock-only heartbeat renewals do not produce network traffic (allows DO hibernation during idle sessions) +- Disable awareness protocol's built-in `_checkInterval` on the client to stop 15-second clock renewals and 30-second peer timeout removal +- Remove provider's own `_checkInterval` liveness timer (was coupled to the awareness heartbeat) +- Clear stale awareness meta for remote clients on WebSocket close so reconnecting clients' awareness updates are accepted +- Bump awareness clock on reconnect to ensure remote peers accept the update +- Fix bug where `host.slice(0, -1)` result was not assigned, so trailing slashes were never stripped diff --git a/fixtures/monaco-yjs/src/server.ts b/fixtures/monaco-yjs/src/server.ts index c186a329..7c137dcc 100644 --- a/fixtures/monaco-yjs/src/server.ts +++ b/fixtures/monaco-yjs/src/server.ts @@ -9,7 +9,7 @@ export class MonacoServer extends YServer { }; async onStart(): Promise { console.log("onStart"); - super.onStart(); + await super.onStart(); } } From f9fead410a15a2f0f059838c33df4b68167d539c Mon Sep 17 00:00:00 2001 From: Sunil Pai Date: Tue, 24 Feb 2026 02:30:31 +0000 Subject: [PATCH 5/5] Update .gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index f6b646bf..6670af25 100644 --- a/.gitignore +++ b/.gitignore @@ -131,6 +131,7 @@ dist .wrangler +.wrangler-persist-* .DS_Store .env .dev.vars