diff --git a/.changeset/y-partyserver-with-yjs-mixin.md b/.changeset/y-partyserver-with-yjs-mixin.md new file mode 100644 index 00000000..ea5199ff --- /dev/null +++ b/.changeset/y-partyserver-with-yjs-mixin.md @@ -0,0 +1,5 @@ +--- +"y-partyserver": patch +--- + +Extract Yjs functionality into a `withYjs` mixin that can be applied to any Server subclass. `YServer` is now `withYjs(Server)`. diff --git a/package-lock.json b/package-lock.json index 6fd8ad55..807748cf 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12513,7 +12513,7 @@ "license": "ISC", "dependencies": { "nanoid": "^5.1.6", - "partysocket": "^1.1.15" + "partysocket": "^1.1.16" } }, "packages/partyfn/node_modules/nanoid": { @@ -12622,7 +12622,7 @@ "license": "ISC", "devDependencies": { "@cloudflare/workers-types": "^4.20260303.0", - "partyserver": ">=0.3.1", + "partyserver": "^0.3.1", "partysocket": "^1.1.16" }, "peerDependencies": { @@ -12641,7 +12641,7 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20260303.0", "partyfn": "^0.1.0", - "partyserver": ">=0.3.1", + "partyserver": "^0.3.1", "partysocket": "^1.1.16" }, "peerDependencies": { @@ -12684,7 +12684,7 @@ "license": "ISC", "dependencies": { "cron-parser": "^5.5.0", - "partyserver": ">=0.3.1" + "partyserver": "^0.3.1" } }, "packages/y-partyserver": { @@ -12699,7 +12699,7 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20260303.0", "@types/lodash.debounce": "^4.0.9", - "partyserver": ">=0.3.1", + "partyserver": "^0.3.1", "ws": "^8.19.0", "yjs": "^13.6.29" }, diff --git a/packages/partyfn/package.json b/packages/partyfn/package.json index 2a972458..e7726dda 100644 --- a/packages/partyfn/package.json +++ b/packages/partyfn/package.json @@ -19,7 +19,7 @@ ], "dependencies": { "nanoid": "^5.1.6", - "partysocket": "^1.1.15" + "partysocket": "^1.1.16" }, "scripts": { "build": "tsx scripts/build.ts" diff --git a/packages/partyserver/CHANGELOG.md b/packages/partyserver/CHANGELOG.md index 44668159..e1f4c4a2 100644 --- a/packages/partyserver/CHANGELOG.md +++ b/packages/partyserver/CHANGELOG.md @@ -280,14 +280,12 @@ ### Patch Changes - [`528adea`](https://github.com/threepointone/partyserver/commit/528adeaced6dce6e888d2f54cc75c3569bf2c277) Thanks [@threepointone](https://github.com/threepointone)! - some fixes and tweaks - - getServerByName was throwing on all requests - `Env` is now an optional arg when defining `Server` - `y-partyserver/provider` can now take an optional `prefix` arg to use a custom url to connect - `routePartyKitRequest`/`getServerByName` now accepts `jurisdiction` bonus: - - added a bunch of fixtures - added stubs for docs diff --git a/packages/partysocket/CHANGELOG.md b/packages/partysocket/CHANGELOG.md index b57d2a57..a3472dd7 100644 --- a/packages/partysocket/CHANGELOG.md +++ b/packages/partysocket/CHANGELOG.md @@ -319,7 +319,6 @@ ### Patch Changes - [#251](https://github.com/partykit/partykit/pull/251) [`049bcac`](https://github.com/partykit/partykit/commit/049bcac42aa49e4bddec975c63b7d7984112e450) Thanks [@threepointone](https://github.com/threepointone)! - small tweaks to `init` - - replace `process.env.PARTYKIT_HOST` with just `PARTYKIT_HOST` - add a `tsconfig.json` - add partykit to devDependencies in `init` @@ -340,7 +339,6 @@ - [#211](https://github.com/partykit/partykit/pull/211) [`fffe721`](https://github.com/partykit/partykit/commit/fffe72148e5cc425e80c90b6bf180192df410080) Thanks [@threepointone](https://github.com/threepointone)! - update dependencies - [#191](https://github.com/partykit/partykit/pull/191) [`39cf5ce`](https://github.com/partykit/partykit/commit/39cf5cebf5e699bc50ace8b6d25cd82c807e863a) Thanks [@jevakallio](https://github.com/jevakallio)! - Improve PartySocket types and React hooks API: - - Add websocket lifecycle event handlers to usePartyKit options to reduce need for effects in userland - Allow usePartySocket to provide startClosed option to initialize without opening connection - Fix types for PartySocket#removeEventListener diff --git a/packages/partysub/package.json b/packages/partysub/package.json index fab27f3d..2b8a5e98 100644 --- a/packages/partysub/package.json +++ b/packages/partysub/package.json @@ -46,7 +46,7 @@ }, "devDependencies": { "@cloudflare/workers-types": "^4.20260303.0", - "partyserver": ">=0.3.1", + "partyserver": "^0.3.1", "partysocket": "^1.1.16" } } diff --git a/packages/partysync/package.json b/packages/partysync/package.json index ba911fdf..743d8490 100644 --- a/packages/partysync/package.json +++ b/packages/partysync/package.json @@ -51,7 +51,7 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20260303.0", "partyfn": "^0.1.0", - "partyserver": ">=0.3.1", + "partyserver": "^0.3.1", "partysocket": "^1.1.16" }, "peerDependencies": { diff --git a/packages/partywhen/package.json b/packages/partywhen/package.json index 415ebbb4..d215cb15 100644 --- a/packages/partywhen/package.json +++ b/packages/partywhen/package.json @@ -29,6 +29,6 @@ "description": "A library for scheduling and running tasks in Cloudflare Workers", "dependencies": { "cron-parser": "^5.5.0", - "partyserver": ">=0.3.1" + "partyserver": "^0.3.1" } } diff --git a/packages/y-partyserver/CHANGELOG.md b/packages/y-partyserver/CHANGELOG.md index 41bbd19b..50979dd7 100644 --- a/packages/y-partyserver/CHANGELOG.md +++ b/packages/y-partyserver/CHANGELOG.md @@ -7,7 +7,6 @@ - [#341](https://github.com/cloudflare/partykit/pull/341) [`e7f4b51`](https://github.com/cloudflare/partykit/commit/e7f4b51198904273befb1d39478840c628f6e2b1) Thanks [@threepointone](https://github.com/threepointone)! - Fix Yjs hibernation support and awareness propagation **Server:** - - Replace in-memory `WSSharedDoc.conns` Map with `connection.setState()` and `getConnections()` so connection tracking survives Durable Object hibernation - Move event handler registration from `WSSharedDoc` constructor into `onStart()` to use `getConnections()` for broadcasting - Disable awareness protocol's built-in `_checkInterval` in `WSSharedDoc` constructor to prevent timers from defeating hibernation @@ -17,7 +16,6 @@ - Widen `onLoad()` return type to `Promise` to allow seeding the document from a returned YDoc **Provider:** - - Switch awareness event listener from `"update"` to `"change"` so clock-only heartbeat renewals do not produce network traffic (allows DO hibernation during idle sessions) - Disable awareness protocol's built-in `_checkInterval` on the client to stop 15-second clock renewals and 30-second peer timeout removal - Remove provider's own `_checkInterval` liveness timer (was coupled to the awareness heartbeat) @@ -352,14 +350,12 @@ ### Patch Changes - [`528adea`](https://github.com/threepointone/partyserver/commit/528adeaced6dce6e888d2f54cc75c3569bf2c277) Thanks [@threepointone](https://github.com/threepointone)! - some fixes and tweaks - - getServerByName was throwing on all requests - `Env` is now an optional arg when defining `Server` - `y-partyserver/provider` can now take an optional `prefix` arg to use a custom url to connect - `routePartyKitRequest`/`getServerByName` now accepts `jurisdiction` bonus: - - added a bunch of fixtures - added stubs for docs diff --git a/packages/y-partyserver/package.json b/packages/y-partyserver/package.json index 8a18f491..00b92ec5 100644 --- a/packages/y-partyserver/package.json +++ b/packages/y-partyserver/package.json @@ -52,7 +52,7 @@ "devDependencies": { "@cloudflare/workers-types": "^4.20260303.0", "@types/lodash.debounce": "^4.0.9", - "partyserver": ">=0.3.1", + "partyserver": "^0.3.1", "ws": "^8.19.0", "yjs": "^13.6.29" }, diff --git a/packages/y-partyserver/src/server/index.ts b/packages/y-partyserver/src/server/index.ts index 0e3a39af..50136986 100644 --- a/packages/y-partyserver/src/server/index.ts +++ b/packages/y-partyserver/src/server/index.ts @@ -143,380 +143,408 @@ export interface CallbackOptions { timeout?: number; } -export class YServer< - Env extends Cloudflare.Env = Cloudflare.Env -> extends Server { - static callbackOptions: CallbackOptions = {}; +type ServerClass = new (...args: any[]) => Server; - #ParentClass: typeof YServer = Object.getPrototypeOf(this).constructor; - readonly document: WSSharedDoc = new WSSharedDoc(); - - async onLoad(): Promise { - // to be implemented by the user - return; - } - - async onSave(): Promise { - // to be implemented by the user - } - - /** - * Replaces the document with a different state using Yjs UndoManager key remapping. - * - * @param snapshotUpdate - The snapshot update to replace the document with. - * @param getMetadata (optional) - A function that returns the type of the root for a given key. - */ +export interface YjsInstance { + readonly document: WSSharedDoc; + onLoad(): Promise; + onSave(): Promise; unstable_replaceDocument( snapshotUpdate: Uint8Array, - getMetadata: (key: string) => YjsRootType = () => "Map" - ): void { - try { - const doc = this.document; - const snapshotDoc = new YDoc(); - applyUpdate(snapshotDoc, snapshotUpdate, snapshotOrigin); - - const currentStateVector = encodeStateVector(doc); - const snapshotStateVector = encodeStateVector(snapshotDoc); - - const changesSinceSnapshotUpdate = encodeStateAsUpdate( - doc, - snapshotStateVector - ); + getMetadata?: (key: string) => YjsRootType + ): void; + isReadOnly(connection: Connection): boolean; + onCustomMessage(connection: Connection, message: string): void; + sendCustomMessage(connection: Connection, message: string): void; + broadcastCustomMessage(message: string, excludeConnection?: Connection): void; + handleMessage(connection: Connection, message: WSMessage): void; +} - const undoManager = new UndoManager( - [...snapshotDoc.share.keys()].map((key) => { - const type = getMetadata(key); - if (type === "Text") { - return snapshotDoc.getText(key); - } else if (type === "Map") { - return snapshotDoc.getMap(key); - } else if (type === "Array") { - return snapshotDoc.getArray(key); - } else if (type === "XmlText") { - return snapshotDoc.get(key, XmlText); - } else if (type === "XmlElement") { - return snapshotDoc.get(key, XmlElement); - } else if (type === "XmlFragment") { - return snapshotDoc.get(key, XmlFragment); - } - throw new Error(`Unknown root type: ${type} for key: ${key}`); - }), - { - trackedOrigins: new Set([snapshotOrigin]) - } - ); +export interface YjsStatic { + callbackOptions: CallbackOptions; +} - applyUpdate(snapshotDoc, changesSinceSnapshotUpdate, snapshotOrigin); - undoManager.undo(); +export function withYjs( + Base: TBase +): TBase & YjsStatic & (new (...args: any[]) => YjsInstance) { + class YjsMixin extends Base { + static callbackOptions: CallbackOptions = {}; - const documentChangesSinceSnapshotUpdate = encodeStateAsUpdate( - snapshotDoc, - currentStateVector - ); + readonly document: WSSharedDoc = new WSSharedDoc(); - applyUpdate(this.document, documentChangesSinceSnapshotUpdate); - } catch (error) { - throw new Error( - `Failed to replace document: ${error instanceof Error ? error.message : "Unknown error"}` - ); + async onLoad(): Promise { + // to be implemented by the user + return; } - } - async onStart(): Promise { - const src = await this.onLoad(); - if (src != null) { - const state = encodeStateAsUpdate(src); - applyUpdate(this.document, state); + async onSave(): Promise { + // to be implemented by the user } - // Broadcast doc updates to all connections. - // Uses this.getConnections() which works for both hibernate and non-hibernate - // modes and survives DO hibernation (unlike an in-memory Map). - this.document.on("update", (update: Uint8Array) => { - const encoder = encoding.createEncoder(); - encoding.writeVarUint(encoder, messageSync); - syncProtocol.writeUpdate(encoder, update); - const message = encoding.toUint8Array(encoder); - for (const conn of this.getConnections()) { - send(conn, message); - } - }); - - // Track which awareness clientIDs each connection controls. - // Stored in connection.setState() so it survives hibernation. - // When conn is null (internal changes like removeAwarenessStates on close), - // broadcast the update to remaining connections. - // When conn is non-null (client message), handleMessage broadcasts directly. - this.document.awareness.on( - "update", - ( - { - added, - updated, - removed - }: { - added: Array; - updated: Array; - removed: Array; - }, - conn: Connection | null - ) => { - if (conn !== null) { - // Track which clientIDs this connection controls - try { - const currentIds = new Set(getAwarenessIds(conn)); - for (const clientID of added) currentIds.add(clientID); - for (const clientID of removed) currentIds.delete(clientID); - setAwarenessIds(conn, [...currentIds]); - } catch (_e) { - // ignore — best-effort tracking + /** + * Replaces the document with a different state using Yjs UndoManager key remapping. + * + * @param snapshotUpdate - The snapshot update to replace the document with. + * @param getMetadata (optional) - A function that returns the type of the root for a given key. + */ + unstable_replaceDocument( + snapshotUpdate: Uint8Array, + getMetadata: (key: string) => YjsRootType = () => "Map" + ): void { + try { + const doc = this.document; + const snapshotDoc = new YDoc(); + applyUpdate(snapshotDoc, snapshotUpdate, snapshotOrigin); + + const currentStateVector = encodeStateVector(doc); + const snapshotStateVector = encodeStateVector(snapshotDoc); + + const changesSinceSnapshotUpdate = encodeStateAsUpdate( + doc, + snapshotStateVector + ); + + const undoManager = new UndoManager( + [...snapshotDoc.share.keys()].map((key) => { + const type = getMetadata(key); + if (type === "Text") { + return snapshotDoc.getText(key); + } else if (type === "Map") { + return snapshotDoc.getMap(key); + } else if (type === "Array") { + return snapshotDoc.getArray(key); + } else if (type === "XmlText") { + return snapshotDoc.get(key, XmlText); + } else if (type === "XmlElement") { + return snapshotDoc.get(key, XmlElement); + } else if (type === "XmlFragment") { + return snapshotDoc.get(key, XmlFragment); + } + throw new Error(`Unknown root type: ${type} for key: ${key}`); + }), + { + trackedOrigins: new Set([snapshotOrigin]) } - } else { - // Internal awareness change (e.g. removeAwarenessStates on close) - // — broadcast to all remaining connections - const changedClients = added.concat(updated, removed); - const encoder = encoding.createEncoder(); - encoding.writeVarUint(encoder, messageAwareness); - encoding.writeVarUint8Array( - encoder, - awarenessProtocol.encodeAwarenessUpdate( - this.document.awareness, - changedClients - ) - ); - const buff = encoding.toUint8Array(encoder); - for (const c of this.getConnections()) { - send(c, buff); + ); + + applyUpdate(snapshotDoc, changesSinceSnapshotUpdate, snapshotOrigin); + undoManager.undo(); + + const documentChangesSinceSnapshotUpdate = encodeStateAsUpdate( + snapshotDoc, + currentStateVector + ); + + applyUpdate(this.document, documentChangesSinceSnapshotUpdate); + } catch (error) { + throw new Error( + `Failed to replace document: ${error instanceof Error ? error.message : "Unknown error"}` + ); + } + } + + async onStart(): Promise { + const src = await this.onLoad(); + if (src != null) { + const state = encodeStateAsUpdate(src); + applyUpdate(this.document, state); + } + + // Broadcast doc updates to all connections. + // Uses this.getConnections() which works for both hibernate and non-hibernate + // modes and survives DO hibernation (unlike an in-memory Map). + this.document.on("update", (update: Uint8Array) => { + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageSync); + syncProtocol.writeUpdate(encoder, update); + const message = encoding.toUint8Array(encoder); + for (const conn of this.getConnections()) { + send(conn, message); + } + }); + + // Track which awareness clientIDs each connection controls. + // Stored in connection.setState() so it survives hibernation. + // When conn is null (internal changes like removeAwarenessStates on close), + // broadcast the update to remaining connections. + // When conn is non-null (client message), handleMessage broadcasts directly. + this.document.awareness.on( + "update", + ( + { + added, + updated, + removed + }: { + added: Array; + updated: Array; + removed: Array; + }, + conn: Connection | null + ) => { + if (conn !== null) { + // Track which clientIDs this connection controls + try { + const currentIds = new Set(getAwarenessIds(conn)); + for (const clientID of added) currentIds.add(clientID); + for (const clientID of removed) currentIds.delete(clientID); + setAwarenessIds(conn, [...currentIds]); + } catch (_e) { + // ignore — best-effort tracking + } + } else { + // Internal awareness change (e.g. removeAwarenessStates on close) + // — broadcast to all remaining connections + const changedClients = added.concat(updated, removed); + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageAwareness); + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate( + this.document.awareness, + changedClients + ) + ); + const buff = encoding.toUint8Array(encoder); + for (const c of this.getConnections()) { + send(c, buff); + } } } - } - ); + ); - // Debounced persistence handler - this.document.on( - "update", - debounce( - (_update: Uint8Array, _origin: Connection, _doc: YDoc) => { - try { - this.onSave().catch((err) => { + // Debounced persistence handler + const ctor = this.constructor as typeof YjsMixin; + this.document.on( + "update", + debounce( + (_update: Uint8Array, _origin: Connection, _doc: YDoc) => { + try { + this.onSave().catch((err) => { + console.error("failed to persist:", err); + }); + } catch (err) { console.error("failed to persist:", err); - }); - } catch (err) { - console.error("failed to persist:", err); + } + }, + ctor.callbackOptions.debounceWait || CALLBACK_DEFAULTS.debounceWait, + { + maxWait: + ctor.callbackOptions.debounceMaxWait || + CALLBACK_DEFAULTS.debounceMaxWait } - }, - this.#ParentClass.callbackOptions.debounceWait || - CALLBACK_DEFAULTS.debounceWait, - { - maxWait: - this.#ParentClass.callbackOptions.debounceMaxWait || - CALLBACK_DEFAULTS.debounceMaxWait - } - ) - ); + ) + ); - // After hibernation wake-up, the doc is empty but existing connections - // survive. Re-sync by sending sync step 1 to all connections — they'll - // respond with sync step 2 containing their full state. - // On first start there are no connections, so this is a no-op. - const syncEncoder = encoding.createEncoder(); - encoding.writeVarUint(syncEncoder, messageSync); - syncProtocol.writeSyncStep1(syncEncoder, this.document); - const syncMessage = encoding.toUint8Array(syncEncoder); - for (const conn of this.getConnections()) { - send(conn, syncMessage); + // After hibernation wake-up, the doc is empty but existing connections + // survive. Re-sync by sending sync step 1 to all connections — they'll + // respond with sync step 2 containing their full state. + // On first start there are no connections, so this is a no-op. + const syncEncoder = encoding.createEncoder(); + encoding.writeVarUint(syncEncoder, messageSync); + syncProtocol.writeSyncStep1(syncEncoder, this.document); + const syncMessage = encoding.toUint8Array(syncEncoder); + for (const conn of this.getConnections()) { + send(conn, syncMessage); + } } - } - // oxlint-disable-next-line no-unused-vars - isReadOnly(connection: Connection): boolean { - // to be implemented by the user - return false; - } - - /** - * Handle custom string messages from the client. - * Override this method to implement custom message handling. - * @param connection - The connection that sent the message - * @param message - The custom message string (without the __YPS: prefix) - */ - // oxlint-disable-next-line no-unused-vars - onCustomMessage(connection: Connection, message: string): void { - // to be implemented by the user - console.warn( - `Received custom message but onCustomMessage is not implemented in ${this.#ParentClass.name}:`, - message - ); - } - - /** - * Send a custom string message to a specific connection. - * @param connection - The connection to send the message to - * @param message - The custom message string to send - */ - sendCustomMessage(connection: Connection, message: string): void { - if ( - connection.readyState !== undefined && - connection.readyState !== wsReadyStateConnecting && - connection.readyState !== wsReadyStateOpen - ) { - return; + // oxlint-disable-next-line no-unused-vars + isReadOnly(connection: Connection): boolean { + // to be implemented by the user + return false; } - try { - connection.send(`__YPS:${message}`); - } catch (e) { - console.warn("Failed to send custom message", e); + + /** + * Handle custom string messages from the client. + * Override this method to implement custom message handling. + * @param connection - The connection that sent the message + * @param message - The custom message string (without the __YPS: prefix) + */ + // oxlint-disable-next-line no-unused-vars + onCustomMessage(connection: Connection, message: string): void { + // to be implemented by the user + console.warn( + `Received custom message but onCustomMessage is not implemented in ${this.constructor.name}:`, + message + ); } - } - /** - * Broadcast a custom string message to all connected clients. - * @param message - The custom message string to broadcast - * @param excludeConnection - Optional connection to exclude from the broadcast - */ - broadcastCustomMessage( - message: string, - excludeConnection?: Connection - ): void { - const formattedMessage = `__YPS:${message}`; - for (const conn of this.getConnections()) { - if (excludeConnection && conn === excludeConnection) { - continue; - } + /** + * Send a custom string message to a specific connection. + * @param connection - The connection to send the message to + * @param message - The custom message string to send + */ + sendCustomMessage(connection: Connection, message: string): void { if ( - conn.readyState !== undefined && - conn.readyState !== wsReadyStateConnecting && - conn.readyState !== wsReadyStateOpen + connection.readyState !== undefined && + connection.readyState !== wsReadyStateConnecting && + connection.readyState !== wsReadyStateOpen ) { - continue; + return; } try { - conn.send(formattedMessage); + connection.send(`__YPS:${message}`); } catch (e) { - console.warn("Failed to broadcast custom message", e); + console.warn("Failed to send custom message", e); } } - } - handleMessage(connection: Connection, message: WSMessage) { - if (typeof message === "string") { - // Handle custom messages with __YPS: prefix - if (message.startsWith("__YPS:")) { - const customMessage = message.slice(6); // Remove __YPS: prefix - this.onCustomMessage(connection, customMessage); - return; + /** + * Broadcast a custom string message to all connected clients. + * @param message - The custom message string to broadcast + * @param excludeConnection - Optional connection to exclude from the broadcast + */ + broadcastCustomMessage( + message: string, + excludeConnection?: Connection + ): void { + const formattedMessage = `__YPS:${message}`; + for (const conn of this.getConnections()) { + if (excludeConnection && conn === excludeConnection) { + continue; + } + if ( + conn.readyState !== undefined && + conn.readyState !== wsReadyStateConnecting && + conn.readyState !== wsReadyStateOpen + ) { + continue; + } + try { + conn.send(formattedMessage); + } catch (e) { + console.warn("Failed to broadcast custom message", e); + } } - console.warn( - `Received non-prefixed string message. Custom messages should be sent using sendMessage() on the provider.` - ); - return; } - try { - const encoder = encoding.createEncoder(); - // Convert ArrayBuffer to Uint8Array if needed (ArrayBufferView like Uint8Array can be used directly) - const uint8Array = - message instanceof Uint8Array - ? message - : message instanceof ArrayBuffer - ? new Uint8Array(message) - : new Uint8Array( - message.buffer, - message.byteOffset, - message.byteLength - ); - const decoder = decoding.createDecoder(uint8Array); - const messageType = decoding.readVarUint(decoder); - switch (messageType) { - case messageSync: - encoding.writeVarUint(encoder, messageSync); - readSyncMessage( - decoder, - encoder, - this.document, - connection, - this.isReadOnly(connection) - ); - - // If the `encoder` only contains the type of reply message and no - // message, there is no need to send the message. When `encoder` only - // contains the type of reply, its length is 1. - if (encoding.length(encoder) > 1) { - send(connection, encoding.toUint8Array(encoder)); - } - break; - case messageAwareness: { - const awarenessData = decoding.readVarUint8Array(decoder); - awarenessProtocol.applyAwarenessUpdate( - this.document.awareness, - awarenessData, - connection - ); - // Forward raw awareness bytes to all connections - const awarenessEncoder = encoding.createEncoder(); - encoding.writeVarUint(awarenessEncoder, messageAwareness); - encoding.writeVarUint8Array(awarenessEncoder, awarenessData); - const awarenessBuff = encoding.toUint8Array(awarenessEncoder); - for (const c of this.getConnections()) { - send(c, awarenessBuff); + + handleMessage(connection: Connection, message: WSMessage) { + if (typeof message === "string") { + // Handle custom messages with __YPS: prefix + if (message.startsWith("__YPS:")) { + const customMessage = message.slice(6); // Remove __YPS: prefix + this.onCustomMessage(connection, customMessage); + return; + } + console.warn( + `Received non-prefixed string message. Custom messages should be sent using sendMessage() on the provider.` + ); + return; + } + try { + const encoder = encoding.createEncoder(); + // Convert ArrayBuffer to Uint8Array if needed (ArrayBufferView like Uint8Array can be used directly) + const uint8Array = + message instanceof Uint8Array + ? message + : message instanceof ArrayBuffer + ? new Uint8Array(message) + : new Uint8Array( + message.buffer, + message.byteOffset, + message.byteLength + ); + const decoder = decoding.createDecoder(uint8Array); + const messageType = decoding.readVarUint(decoder); + switch (messageType) { + case messageSync: + encoding.writeVarUint(encoder, messageSync); + readSyncMessage( + decoder, + encoder, + this.document, + connection, + this.isReadOnly(connection) + ); + + // If the `encoder` only contains the type of reply message and no + // message, there is no need to send the message. When `encoder` only + // contains the type of reply, its length is 1. + if (encoding.length(encoder) > 1) { + send(connection, encoding.toUint8Array(encoder)); + } + break; + case messageAwareness: { + const awarenessData = decoding.readVarUint8Array(decoder); + awarenessProtocol.applyAwarenessUpdate( + this.document.awareness, + awarenessData, + connection + ); + // Forward raw awareness bytes to all connections + const awarenessEncoder = encoding.createEncoder(); + encoding.writeVarUint(awarenessEncoder, messageAwareness); + encoding.writeVarUint8Array(awarenessEncoder, awarenessData); + const awarenessBuff = encoding.toUint8Array(awarenessEncoder); + for (const c of this.getConnections()) { + send(c, awarenessBuff); + } + break; } - break; } + } catch (err) { + console.error(err); + // @ts-expect-error - TODO: fix this + this.document.emit("error", [err]); } - } catch (err) { - console.error(err); - // @ts-expect-error - TODO: fix this - this.document.emit("error", [err]); } - } - onMessage(conn: Connection, message: WSMessage) { - this.handleMessage(conn, message); - } + onMessage(conn: Connection, message: WSMessage) { + this.handleMessage(conn, message); + } - onClose( - connection: Connection, - _code: number, - _reason: string, - _wasClean: boolean - ): void | Promise { - // Read controlled awareness clientIDs from connection state - // (survives hibernation unlike an in-memory Map) - const controlledIds = getAwarenessIds(connection); - if (controlledIds.length > 0) { - awarenessProtocol.removeAwarenessStates( - this.document.awareness, - controlledIds, - null - ); + onClose( + connection: Connection, + _code: number, + _reason: string, + _wasClean: boolean + ): void | Promise { + // Read controlled awareness clientIDs from connection state + // (survives hibernation unlike an in-memory Map) + const controlledIds = getAwarenessIds(connection); + if (controlledIds.length > 0) { + awarenessProtocol.removeAwarenessStates( + this.document.awareness, + controlledIds, + null + ); + } } - } - // TODO: explore why onError gets triggered when a connection closes - - onConnect( - conn: Connection, - _ctx: ConnectionContext - ): void | Promise { - // Note: awareness IDs are lazily initialized when the first awareness - // message is received — no need to call setAwarenessIds(conn, []) here - - // send sync step 1 - const encoder = encoding.createEncoder(); - encoding.writeVarUint(encoder, messageSync); - syncProtocol.writeSyncStep1(encoder, this.document); - send(conn, encoding.toUint8Array(encoder)); - const awarenessStates = this.document.awareness.getStates(); - if (awarenessStates.size > 0) { + // TODO: explore why onError gets triggered when a connection closes + + onConnect( + conn: Connection, + _ctx: ConnectionContext + ): void | Promise { + // Note: awareness IDs are lazily initialized when the first awareness + // message is received — no need to call setAwarenessIds(conn, []) here + + // send sync step 1 const encoder = encoding.createEncoder(); - encoding.writeVarUint(encoder, messageAwareness); - encoding.writeVarUint8Array( - encoder, - awarenessProtocol.encodeAwarenessUpdate( - this.document.awareness, - Array.from(awarenessStates.keys()) - ) - ); + encoding.writeVarUint(encoder, messageSync); + syncProtocol.writeSyncStep1(encoder, this.document); send(conn, encoding.toUint8Array(encoder)); + const awarenessStates = this.document.awareness.getStates(); + if (awarenessStates.size > 0) { + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageAwareness); + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate( + this.document.awareness, + Array.from(awarenessStates.keys()) + ) + ); + send(conn, encoding.toUint8Array(encoder)); + } } } + + return YjsMixin as unknown as TBase & + YjsStatic & + (new (...args: any[]) => YjsInstance); } + +export const YServer = withYjs(Server);