diff --git a/.changeset/silver-clocks-sleep.md b/.changeset/silver-clocks-sleep.md new file mode 100644 index 000000000..ca1a0aba5 --- /dev/null +++ b/.changeset/silver-clocks-sleep.md @@ -0,0 +1,20 @@ +--- +"@solid-primitives/sse": minor +--- + +Initial release of `@solid-primitives/sse`. + +### Primitives + +- `makeSSE(url, options?)` — base non-reactive primitive. Creates an `EventSource`, attaches handlers, and returns `[source, cleanup]`. No Solid lifecycle dependency. +- `createSSE(url, options?)` — reactive primitive. Accepts a static or signal URL, closes on owner disposal, and exposes `data`, `error`, `readyState`, `close`, and `reconnect`. +- `makeSSEWorker(target, options?)` — runs the `EventSource` connection inside a Web Worker or SharedWorker, keeping network I/O off the main thread. The reactive API is identical to `createSSE`. + +### Built-in transformers + +- `json` — parse message data as a single JSON value +- `ndjson` — parse newline-delimited JSON into an array +- `lines` — split message data into a `string[]` by newline +- `number` — parse message data as a number via `Number()` +- `safe(transform, fallback?)` — fault-tolerant wrapper; returns `fallback` instead of throwing on bad input +- `pipe(a, b)` — compose two transforms into one diff --git a/.changeset/update-author-email.md b/.changeset/update-author-email.md new file mode 100644 index 000000000..75057a6cf --- /dev/null +++ b/.changeset/update-author-email.md @@ -0,0 +1,20 @@ +--- +"@solid-primitives/analytics": patch +"@solid-primitives/audio": patch +"@solid-primitives/clipboard": patch +"@solid-primitives/event-listener": patch +"@solid-primitives/geolocation": patch +"@solid-primitives/graphql": patch +"@solid-primitives/intersection-observer": patch +"@solid-primitives/media": patch +"@solid-primitives/page-visibility": patch +"@solid-primitives/raf": patch +"@solid-primitives/scheduled": patch +"@solid-primitives/scroll": patch +"@solid-primitives/share": patch +"@solid-primitives/timer": patch +"@solid-primitives/websocket": patch +"@solid-primitives/workers": patch +--- + +Update author email for David D. diff --git a/package.json b/package.json index b9cbc8c19..a2614f8c7 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,7 @@ "type": "git", "url": "git+https://github.com/solidjs-community/solid-primitives.git" }, - "author": "David Di Biase ", + "author": "David Di Biase ", "license": "MIT", "type": "module", "scripts": { diff --git a/packages/analytics/package.json b/packages/analytics/package.json index f76ca97da..5f6555b47 100644 --- a/packages/analytics/package.json +++ b/packages/analytics/package.json @@ -2,7 +2,7 @@ "name": "@solid-primitives/analytics", "version": "0.2.0", "description": "Primitive that makes managing analytics a lot easier.", - "author": "David Di Biase ", + "author": "David Di Biase ", "license": "MIT", "homepage": "https://github.com/solidjs-community/solid-primitives/tree/main/packages/analytics#readme", "repository": { diff --git a/packages/audio/package.json b/packages/audio/package.json index a543b4e8b..234c0cf09 100644 --- a/packages/audio/package.json +++ b/packages/audio/package.json @@ -2,7 +2,7 @@ "name": "@solid-primitives/audio", "version": "1.4.2", "description": "Primitives to manage audio and single sounds.", - "author": "David Di Biase ", + "author": "David Di Biase ", "license": "MIT", "homepage": "https://primitives.solidjs.community/package/audio", "repository": { diff --git a/packages/clipboard/package.json b/packages/clipboard/package.json index cfa14ffe6..92fbd4fa7 100644 --- a/packages/clipboard/package.json +++ b/packages/clipboard/package.json @@ -2,7 +2,7 @@ "name": "@solid-primitives/clipboard", "version": "1.6.2", "description": "Primitives for reading and writing to clipboard.", - "author": "David Di Biase ", + "author": "David Di Biase ", "contributors": [ "Damian Tarnawski " ], diff --git a/packages/event-listener/package.json b/packages/event-listener/package.json index 1a3ac7393..7b0ec2bfd 100644 --- a/packages/event-listener/package.json +++ b/packages/event-listener/package.json @@ -2,7 +2,7 @@ "name": "@solid-primitives/event-listener", "version": "2.4.3", "description": "SolidJS Primitives to manage creating event listeners.", - "author": "David Di Biase ", + "author": "David Di Biase ", "contributors": [ "Damian Tarnawski " ], diff --git a/packages/geolocation/package.json b/packages/geolocation/package.json index ccbf04bd1..5ad1d00d1 100644 --- a/packages/geolocation/package.json +++ b/packages/geolocation/package.json @@ -2,7 +2,7 @@ "name": "@solid-primitives/geolocation", "version": "1.5.2", "description": "Primitives to query geolocation and observe changes.", - "author": "David Di Biase ", + "author": "David Di Biase ", "license": "MIT", "homepage": "https://primitives.solidjs.community/package/geolocation", "repository": { diff --git a/packages/graphql/package.json b/packages/graphql/package.json index ddb3f0bb8..4100d8378 100644 --- a/packages/graphql/package.json +++ b/packages/graphql/package.json @@ -2,7 +2,7 @@ "name": "@solid-primitives/graphql", "version": "2.2.3", "description": "Primitive that generates a client and reactive GraphQL queries", - "author": "David Di Biase ", + "author": "David Di Biase ", "contributors": [ "Damian Tarnawski ", "Alex Ryapolov" diff --git a/packages/intersection-observer/package.json b/packages/intersection-observer/package.json index 8654d0b91..d8f666301 100644 --- a/packages/intersection-observer/package.json +++ b/packages/intersection-observer/package.json @@ -2,7 +2,7 @@ "name": "@solid-primitives/intersection-observer", "version": "2.2.2", "description": "Primitives to support using the intersection observer API.", - "author": "David Di Biase ", + "author": "David Di Biase ", "contributors": [ "Damian Tarnawski " ], diff --git a/packages/media/package.json b/packages/media/package.json index 8fc73990a..246b0416f 100644 --- a/packages/media/package.json +++ b/packages/media/package.json @@ -2,7 +2,7 @@ "name": "@solid-primitives/media", "version": "2.3.3", "description": "Primitives for media query and device features", - "author": "David Di Biase ", + "author": "David Di Biase ", "contributors": [ { "name": "Aditya Agarwal", diff --git a/packages/page-visibility/package.json b/packages/page-visibility/package.json index cedff8362..8b2112704 100644 --- a/packages/page-visibility/package.json +++ b/packages/page-visibility/package.json @@ -2,7 +2,7 @@ "name": "@solid-primitives/page-visibility", "version": "2.1.3", "description": "Primitive to track page visibility", - "author": "David Di Biase", + "author": "David Di Biase ", "contributors": [ "Damian Tarnawski ", "Tom Pichaud " diff --git a/packages/raf/package.json b/packages/raf/package.json index 9138d543e..daf542b62 100644 --- a/packages/raf/package.json +++ b/packages/raf/package.json @@ -2,7 +2,7 @@ "name": "@solid-primitives/raf", "version": "2.3.2", "description": "Primitive that facilitates RAF functionality", - "author": "David Di Biase ", + "author": "David Di Biase ", "contributors": [ "Tito ", "Damian Tarnawski " diff --git a/packages/scheduled/package.json b/packages/scheduled/package.json index 1c1b3fdd5..8f651f6a3 100644 --- a/packages/scheduled/package.json +++ b/packages/scheduled/package.json @@ -3,7 +3,7 @@ "version": "1.5.2", "description": "Primitives for creating scheduled — throttled or debounced — callbacks.", "contributors": [ - "David Di Biase ", + "David Di Biase ", "Damian Tarnawski ", "Jonathan Frere " ], diff --git a/packages/scroll/package.json b/packages/scroll/package.json index c8947fe23..9bbf8d4ee 100644 --- a/packages/scroll/package.json +++ b/packages/scroll/package.json @@ -2,7 +2,7 @@ "name": "@solid-primitives/scroll", "version": "2.1.3", "description": "Reactive primitives to react to element/window scrolling.", - "author": "David Di Biase ", + "author": "David Di Biase ", "contributors": [ "Damian Tarnawski " ], diff --git a/packages/share/package.json b/packages/share/package.json index e0929b7ed..199ecd00d 100644 --- a/packages/share/package.json +++ b/packages/share/package.json @@ -2,7 +2,7 @@ "name": "@solid-primitives/share", "version": "2.2.3", "description": "Primitives to help with sharing content on social media and beyond.", - "author": "David Di Biase ", + "author": "David Di Biase ", "contributors": [ "Omer Ma", "Tom Pichaud " diff --git a/packages/sse/CHANGELOG.md b/packages/sse/CHANGELOG.md new file mode 100644 index 000000000..7741913de --- /dev/null +++ b/packages/sse/CHANGELOG.md @@ -0,0 +1,8 @@ +# @solid-primitives/sse + +## 0.0.100 + +### Initial release + +- `makeSSE` — base non-reactive primitive wrapping the browser `EventSource` API +- `createSSE` — reactive primitive with signals for `data`, `error`, and `readyState`, reactive URL support, SSR safety, and configurable app-level reconnection diff --git a/packages/sse/LICENSE b/packages/sse/LICENSE new file mode 100644 index 000000000..d0f4f2652 --- /dev/null +++ b/packages/sse/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 Solid Primitives Working Group + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/packages/sse/README.md b/packages/sse/README.md new file mode 100644 index 000000000..2cf1572ed --- /dev/null +++ b/packages/sse/README.md @@ -0,0 +1,237 @@ +

+ Solid Primitives SSE +

+ +# @solid-primitives/sse + +[![turborepo](https://img.shields.io/badge/built%20with-turborepo-cc00ff.svg?style=for-the-badge&logo=turborepo)](https://turborepo.org/) +[![size](https://img.shields.io/bundlephobia/minzip/@solid-primitives/sse?style=for-the-badge&label=size)](https://bundlephobia.com/package/@solid-primitives/sse) +[![version](https://img.shields.io/npm/v/@solid-primitives/sse?style=for-the-badge)](https://www.npmjs.com/package/@solid-primitives/sse) +[![stage](https://img.shields.io/endpoint?style=for-the-badge&url=https%3A%2F%2Fraw.githubusercontent.com%2Fsolidjs-community%2Fsolid-primitives%2Fmain%2Fassets%2Fbadges%2Fstage-0.json)](https://github.com/solidjs-community/solid-primitives#contribution-process) + +Primitives for [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) using the browser's built-in `EventSource` API. + +- [`makeSSE`](#makesse) — Base non-reactive primitive. Creates an `EventSource` and returns a cleanup function. No Solid lifecycle. +- [`createSSE`](#createsse) — Reactive primitive. Accepts a reactive URL, integrates with Solid's owner lifecycle, and returns signals for `data`, `error`, and `readyState`. +- [`makeSSEWorker`](./WORKERS.md) — Runs the SSE connection inside a Web Worker or SharedWorker. See [WORKERS.md](./WORKERS.md). +- [Built-in transformers](./TRANSFORMS.md) — `json`, `ndjson`, `lines`, `number`, `safe`, `pipe`. See [TRANSFORMS.md](./TRANSFORMS.md). + +## Installation + +```bash +npm install @solid-primitives/sse +# or +pnpm add @solid-primitives/sse +``` + +## `makeSSE` + +Creates a raw `EventSource` connection without any Solid lifecycle management. Event handlers are attached immediately. You are responsible for calling the returned cleanup function. + +This is the foundation primitive — `createSSE` uses it internally. + +```ts +import { makeSSE } from "@solid-primitives/sse"; + +const [source, cleanup] = makeSSE("https://api.example.com/events", { + onOpen: () => console.log("Connected"), + onMessage: e => console.log("Message:", e.data), + onError: e => console.error("Error:", e), + events: { + // Named SSE event types (server sends `event: update`) + update: e => console.log("Update:", e.data), + }, +}); + +// When done: +cleanup(); +``` + +### Definition + +```ts +function makeSSE( + url: string | URL, + options?: SSEOptions, +): [source: EventSource, cleanup: VoidFunction]; + +type SSEOptions = { + withCredentials?: boolean; + onOpen?: (event: Event) => void; + onMessage?: (event: MessageEvent) => void; + onError?: (event: Event) => void; + events?: Record void>; +}; +``` + +## `createSSE` + +Reactive SSE primitive. Connects on creation, closes when the owner is disposed, and reacts to URL changes. + +```ts +import { createSSE, SSEReadyState } from "@solid-primitives/sse"; + +const { data, readyState, error, close, reconnect } = createSSE<{ message: string }>( + "https://api.example.com/events", + { + transform: JSON.parse, + reconnect: { retries: 3, delay: 2000 }, + }, +); + +return ( +
+ Connecting…

}> +

Latest: {data()?.message ?? "—"}

+
+ +

Connection error

+
+ + +
+); +``` + +### Reactive URL + +When the URL is a signal accessor, the connection is replaced whenever the URL changes: + +```ts +const [userId, setUserId] = createSignal("user-1"); + +const { data } = createSSE( + () => `https://api.example.com/notifications/${userId()}`, + { transform: JSON.parse }, +); +``` + +Changing `userId()` will close the existing connection and open a new one to the updated URL. + +### Options + +| Option | Type | Default | Description | +|---|---|---|---| +| `withCredentials` | `boolean` | `false` | Send credentials with the request | +| `onOpen` | `(e: Event) => void` | — | Called when the connection opens | +| `onMessage` | `(e: MessageEvent) => void` | — | Called on each unnamed `message` event | +| `onError` | `(e: Event) => void` | — | Called on error | +| `events` | `Record void>` | — | Handlers for named SSE event types | +| `initialValue` | `T` | `undefined` | Initial value of the `data` signal | +| `transform` | `(raw: string) => T` | identity | Parse raw string data, e.g. `JSON.parse` | +| `reconnect` | `boolean \| SSEReconnectOptions` | `false` | App-level reconnect on terminal errors | + +**`SSEReconnectOptions`:** + +| Option | Type | Default | Description | +|---|---|---|---| +| `retries` | `number` | `Infinity` | Max reconnect attempts | +| `delay` | `number` | `3000` | Milliseconds between attempts | + +### Return value + +| Property | Type | Description | +|---|---|---| +| `source` | `Accessor` | Underlying source instance; `undefined` on SSR | +| `data` | `Accessor` | Latest message data | +| `error` | `Accessor` | Latest error event | +| `readyState` | `Accessor` | `SSEReadyState.CONNECTING` / `.OPEN` / `.CLOSED` | +| `close` | `VoidFunction` | Close the connection | +| `reconnect` | `VoidFunction` | Force-close and reopen | + +### `SSEReadyState` + +Named constants for the connection state, exported as a plain object so they are tree-shakeable and work with every bundler: + +```ts +import { SSEReadyState } from "@solid-primitives/sse"; + +SSEReadyState.CONNECTING // 0 +SSEReadyState.OPEN // 1 +SSEReadyState.CLOSED // 2 +``` + +### A note on reconnection + +`EventSource` has native browser-level reconnection built in. For transient network drops the browser automatically retries. The `reconnect` option in `createSSE` is for _application-level_ reconnection — it fires only when `readyState` becomes `SSEReadyState.CLOSED`, meaning the browser has given up entirely. You generally do not need `reconnect: true` for normal usage. + +## Built-in transformers + +Ready-made `transform` functions for the most common SSE data formats. See [TRANSFORMS.md](./TRANSFORMS.md) for full documentation and examples. + +| Transformer | Description | +|---|---| +| [`json`](./TRANSFORMS.md#json) | Parse data as a single JSON value | +| [`ndjson`](./TRANSFORMS.md#ndjson) | Parse newline-delimited JSON into an array | +| [`lines`](./TRANSFORMS.md#lines) | Split data into a `string[]` by newline | +| [`number`](./TRANSFORMS.md#number) | Parse data as a number via `Number()` | +| [`safe(transform, fallback?)`](./TRANSFORMS.md#safetransform-fallback) | Fault-tolerant wrapper — returns `fallback` instead of throwing | +| [`pipe(a, b)`](./TRANSFORMS.md#pipea-b) | Compose two transforms into one | + +## Integration with `@solid-primitives/event-bus` + +Because `bus.emit` matches the `(event: MessageEvent) => void` shape of `onMessage`, you can wire them directly: + +```ts +import { createSSE } from "@solid-primitives/sse"; +import { createEventBus } from "@solid-primitives/event-bus"; + +const bus = createEventBus(); + +createSSE("https://api.example.com/events", { + onMessage: e => bus.emit(e.data), +}); + +bus.listen(msg => console.log("received:", msg)); +``` + +### Multi-channel SSE with `createEventHub` + +For streams that use multiple named event types: + +```ts +import { createSSE } from "@solid-primitives/sse"; +import { createEventBus, createEventHub } from "@solid-primitives/event-bus"; + +type OrderEvent = { id: string; total: number }; +type InventoryEvent = { sku: string; qty: number }; + +const hub = createEventHub({ + order: createEventBus(), + inventory: createEventBus(), +}); + +createSSE("https://api.example.com/stream", { + events: { + order: e => hub.emit("order", JSON.parse(e.data)), + inventory: e => hub.emit("inventory", JSON.parse(e.data)), + }, +}); + +hub.on("order", event => console.log("New order:", event)); +``` + +### Building a reactive message list + +```ts +import { createSSE } from "@solid-primitives/sse"; +import { createStore } from "solid-js/store"; + +const [messages, setMessages] = createStore([]); + +createSSE("https://api.example.com/events", { + onMessage: e => setMessages(msgs => [...msgs, e.data]), +}); + +return {msg =>

{msg}

}
; +``` + +## Running SSE in a Worker + +For high-frequency streams or performance-sensitive apps you can offload the `EventSource` connection to a Web Worker, keeping network I/O off the main thread. The reactive API (`data`, `readyState`, `reconnect`, …) is identical — only the transport moves. + +See [WORKERS.md](./WORKERS.md) for setup instructions, SharedWorker usage, and the full type reference. + +## Changelog + +See [CHANGELOG.md](./CHANGELOG.md). diff --git a/packages/sse/TRANSFORMS.md b/packages/sse/TRANSFORMS.md new file mode 100644 index 000000000..37e77a76c --- /dev/null +++ b/packages/sse/TRANSFORMS.md @@ -0,0 +1,108 @@ +# Built-in transformers + +Ready-made `transform` functions for the most common SSE data formats. Pass one as the `transform` option to `createSSE`: + +```ts +import { createSSE, json } from "@solid-primitives/sse"; + +const { data } = createSSE<{ status: string }>(url, { transform: json }); +``` + +--- + +## `json` + +Parse the message data as a single JSON value. Equivalent to `JSON.parse` but named for consistency with the other transformers. + +```ts +import { createSSE, json } from "@solid-primitives/sse"; + +const { data } = createSSE<{ status: string; ts: number }>(url, { transform: json }); +// data() === { status: "ok", ts: 1718000000 } +``` + +--- + +## `ndjson` + +Parse the message data as [newline-delimited JSON](https://ndjson.org/) (NDJSON / JSON Lines). Each non-empty line is parsed as a separate JSON value and the transformer returns an array. + +Use this when the server batches multiple objects into one SSE event: + +``` +data: {"id":1,"type":"tick"} +data: {"id":2,"type":"tick"} + +``` + +```ts +import { createSSE, ndjson } from "@solid-primitives/sse"; + +const { data } = createSSE(url, { transform: ndjson }); +// data() === [{ id: 1, type: "tick" }, { id: 2, type: "tick" }] +``` + +--- + +## `lines` + +Split the message data into individual lines, returning a `string[]`. Empty lines are filtered out. Useful for multi-line text events that are not JSON. + +```ts +import { createSSE, lines } from "@solid-primitives/sse"; + +const { data } = createSSE(url, { transform: lines }); +// data() === ["line one", "line two"] +``` + +--- + +## `number` + +Parse the message data as a number using `Number()` semantics. Handy for streams that emit counters, progress percentages, sensor readings, or prices. + +```ts +import { createSSE, number } from "@solid-primitives/sse"; + +const { data } = createSSE(url, { transform: number }); +// data() === 42 +``` + +Note: follows `Number()` coercion — an empty string becomes `0` and non-numeric strings become `NaN`. + +--- + +## `safe(transform, fallback?)` + +Wraps any transform in a `try/catch`. When the inner transform throws, `safe` returns `fallback` instead of propagating the error. This keeps the stream alive across malformed events. + +```ts +import { createSSE, json, number, safe } from "@solid-primitives/sse"; + +// Returns undefined on a bad event instead of throwing +const { data } = createSSE(url, { transform: safe(json) }); + +// With an explicit fallback value +const { data } = createSSE(url, { transform: safe(number, 0) }); +``` + +--- + +## `pipe(a, b)` + +Composes two transforms into one: the output of `a` is passed as the input of `b`. Useful for building custom transforms from existing primitives without writing anonymous functions. + +```ts +import { createSSE, ndjson, json, safe, pipe } from "@solid-primitives/sse"; + +// Parse NDJSON then keep only "tick" rows +type RawEvent = { type: string }; +const { data } = createSSE(url, { + transform: pipe(ndjson, rows => rows.filter(r => r.type === "tick")), +}); + +// Safe JSON with a post-processing step +const { data } = createSSE(url, { + transform: pipe(safe(json<{ label: string }>), ev => ev?.label ?? ""), +}); +``` diff --git a/packages/sse/WORKERS.md b/packages/sse/WORKERS.md new file mode 100644 index 000000000..1c17c61c4 --- /dev/null +++ b/packages/sse/WORKERS.md @@ -0,0 +1,101 @@ +# Running SSE in a Worker + +`@solid-primitives/sse` ships a `makeSSEWorker` adapter that moves the `EventSource` connection into a [Web Worker](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API) or a [SharedWorker](https://developer.mozilla.org/en-US/docs/Web/API/SharedWorker). The reactive API you get back from `createSSE` is identical — `data`, `readyState`, `reconnect`, etc. work exactly as documented in the [README](./README.md). + +## When to use this + +- **High-frequency streams** — parsing and dispatching many events per second on the main thread can cause jank. Moving the connection to a Worker keeps that work off the UI thread. +- **SharedWorker** — if multiple tabs in the same origin connect to the same SSE endpoint, a SharedWorker lets them share a single Worker process (though each tab still gets its own `EventSource` connection inside the worker). + +For typical usage — a handful of events per second — the standard `createSSE` is simpler and sufficient. + +## Setup + +The adapter is in a separate subpath so it adds zero bytes to the main bundle when not used. + +```ts +import { makeSSEWorker } from "@solid-primitives/sse/worker"; +``` + +You also need the companion handler script that runs inside the Worker: + +```ts +import "@solid-primitives/sse/worker-handler"; +``` + +Load it via your bundler's `new URL(…, import.meta.url)` syntax to get a correctly resolved URL at build time. + +## Dedicated Worker + +```ts +import { createSSE } from "@solid-primitives/sse"; +import { makeSSEWorker } from "@solid-primitives/sse/worker"; + +const worker = new Worker( + new URL("@solid-primitives/sse/worker-handler", import.meta.url), + { type: "module" }, +); + +const { data, readyState, error, close, reconnect } = createSSE<{ msg: string }>( + "https://api.example.com/events", + { + source: makeSSEWorker(worker), + transform: JSON.parse, + reconnect: { retries: 3, delay: 2000 }, + }, +); +``` + +That's the only change compared to a standard `createSSE` call — pass `source: makeSSEWorker(worker)` and everything else stays the same. + +## SharedWorker + +A SharedWorker is shared across all tabs on the same origin. Pass `sw.port` (a `MessagePort`) in place of the `Worker` instance: + +```ts +import { createSSE } from "@solid-primitives/sse"; +import { makeSSEWorker } from "@solid-primitives/sse/worker"; + +const sw = new SharedWorker( + new URL("@solid-primitives/sse/worker-handler", import.meta.url), + { type: "module" }, +); +sw.port.start(); // required to activate a MessagePort + +const { data } = createSSE("https://api.example.com/events", { + source: makeSSEWorker(sw.port), +}); +``` + +`makeSSEWorker` accepts anything that satisfies `SSEWorkerTarget` — both `Worker` and `MessagePort` do. + +## How it works + +`makeSSEWorker(target)` returns an `SSESourceFn`, the same factory interface that `createSSE` uses internally. When `createSSE` opens a connection it calls this factory instead of the default `makeSSE`, which: + +1. Creates a `WorkerEventSource` — an `EventTarget` that posts a `connect` message to the Worker and re-dispatches `open` / `message` / `error` events received back from it. +2. The Worker script (`worker-handler`) receives the `connect` message, creates a real `EventSource` there, and posts events back via `postMessage`. +3. `createSSE`'s reactive machinery — signals, reconnect timer, URL tracking, `onCleanup` — runs on the main thread as normal; it just talks to a `WorkerEventSource` instead of a real `EventSource`. + +## Type reference + +```ts +// @solid-primitives/sse/worker + +function makeSSEWorker(target: SSEWorkerTarget): SSESourceFn; + +/** Accepted by makeSSEWorker — satisfied by both Worker and SharedWorker.port */ +type SSEWorkerTarget = { + postMessage(data: SSEWorkerMessage): void; + addEventListener(type: "message", listener: (e: MessageEvent) => void): void; + removeEventListener(type: "message", listener: (e: MessageEvent) => void): void; +}; + +/** Messages exchanged between the main thread and the Worker */ +type SSEWorkerMessage = + | { type: "connect"; id: string; url: string; withCredentials?: boolean; events?: string[] } + | { type: "disconnect"; id: string } + | { type: "open"; id: string } + | { type: "message"; id: string; data: string; eventType: string } + | { type: "error"; id: string; readyState: SSEReadyState }; +``` diff --git a/packages/sse/dev/index.tsx b/packages/sse/dev/index.tsx new file mode 100644 index 000000000..251b71be9 --- /dev/null +++ b/packages/sse/dev/index.tsx @@ -0,0 +1,67 @@ +import { type Component, createSignal, For, Show } from "solid-js"; +import { render } from "solid-js/web"; +import { createSSE } from "../src/index.js"; + +const readyStateLabel = ["Connecting", "Open", "Closed"] as const; + +const App: Component = () => { + const [url, setUrl] = createSignal("https://localhost:3000/events"); + const [customUrl, setCustomUrl] = createSignal(""); + const [messages, setMessages] = createSignal([]); + + const { data, readyState, error, close, reconnect } = createSSE(url, { + onMessage: e => setMessages(prev => [e.data, ...prev].slice(0, 20)), + }); + + return ( +
+

@solid-primitives/sse dev

+ +
+ + +
+ +

+ Status:{" "} + + {readyStateLabel[readyState()]} + +

+ + +

Error: connection lost

+
+ +

+ Latest data: {data() ?? "(none)"} +

+ +
+ + +
+ +

Messages

+
    + {msg =>
  1. {msg}
  2. }
    +
+
+ ); +}; + +render(() => , document.getElementById("root")!); diff --git a/packages/sse/package.json b/packages/sse/package.json new file mode 100644 index 000000000..8c593d05a --- /dev/null +++ b/packages/sse/package.json @@ -0,0 +1,89 @@ +{ + "name": "@solid-primitives/sse", + "version": "0.0.100", + "description": "Primitives for Server-Sent Events (SSE) using the browser's EventSource API.", + "author": "David Di Biase ", + "contributors": [], + "license": "MIT", + "homepage": "https://primitives.solidjs.community/package/sse", + "repository": { + "type": "git", + "url": "git+https://github.com/solidjs-community/solid-primitives.git" + }, + "bugs": { + "url": "https://github.com/solidjs-community/solid-primitives/issues" + }, + "primitive": { + "name": "sse", + "stage": 0, + "list": [ + "makeSSE", + "createSSE", + "makeSSEWorker", + "json", + "ndjson", + "lines", + "number", + "safe", + "pipe" + ], + "category": "Network" + }, + "keywords": [ + "solid", + "primitives", + "sse", + "eventsource", + "server-sent-events", + "worker", + "shared-worker" + ], + "private": false, + "sideEffects": false, + "files": [ + "dist" + ], + "type": "module", + "module": "./dist/index.js", + "types": "./dist/index.d.ts", + "browser": {}, + "exports": { + ".": { + "import": { + "@solid-primitives/source": "./src/index.ts", + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + } + }, + "./worker": { + "import": { + "@solid-primitives/source": "./src/worker.ts", + "types": "./dist/worker.d.ts", + "default": "./dist/worker.js" + } + }, + "./worker-handler": { + "import": { + "@solid-primitives/source": "./src/worker-handler.ts", + "default": "./dist/worker-handler.js" + } + } + }, + "typesVersions": {}, + "scripts": { + "dev": "node --import=@nothing-but/node-resolve-ts --experimental-transform-types ../../scripts/dev.ts", + "build": "node --import=@nothing-but/node-resolve-ts --experimental-transform-types ../../scripts/build.ts", + "vitest": "vitest -c ../../configs/vitest.config.ts", + "test": "pnpm run vitest", + "test:ssr": "pnpm run vitest --mode ssr" + }, + "dependencies": { + "@solid-primitives/utils": "workspace:^" + }, + "peerDependencies": { + "solid-js": "^1.6.12" + }, + "devDependencies": { + "solid-js": "^1.9.7" + } +} diff --git a/packages/sse/src/index.ts b/packages/sse/src/index.ts new file mode 100644 index 000000000..4cb2863fc --- /dev/null +++ b/packages/sse/src/index.ts @@ -0,0 +1,14 @@ +export { + makeSSE, + createSSE, + SSEReadyState, + type SSEOptions, + type SSEReconnectOptions, + type SSESourceHandle, + type SSESourceFn, + type SSEReadyState, + type CreateSSEOptions, + type SSEReturn, +} from "./sse.js"; + +export { json, ndjson, lines, number, safe, pipe } from "./transform.js"; diff --git a/packages/sse/src/sse.ts b/packages/sse/src/sse.ts new file mode 100644 index 000000000..86b6e241c --- /dev/null +++ b/packages/sse/src/sse.ts @@ -0,0 +1,335 @@ +import { + type Accessor, + createComputed, + createSignal, + onCleanup, + untrack, +} from "solid-js"; +import { isServer } from "solid-js/web"; +import { access, type MaybeAccessor } from "@solid-primitives/utils"; + +// ─── ReadyState ─────────────────────────────────────────────────────────────── + +/** + * Named constants for the SSE connection state, mirroring the `EventSource` + * static properties. Use these instead of bare numbers for readability: + * + * ```ts + * if (readyState() === SSEReadyState.OPEN) { ... } + * ``` + */ +export const SSEReadyState = { + /** Connection is being established. */ + CONNECTING: 0, + /** Connection is open and receiving events. */ + OPEN: 1, + /** Connection is closed. */ + CLOSED: 2, +} as const; + +/** The numeric type of a valid SSE ready-state value (`0 | 1 | 2`). */ +export type SSEReadyState = (typeof SSEReadyState)[keyof typeof SSEReadyState]; + +// ─── Types ──────────────────────────────────────────────────────────────────── + +/** + * Options shared between `makeSSE` and `createSSE`. + */ +export type SSEOptions = { + /** Pass credentials with request (same as EventSourceInit.withCredentials) */ + withCredentials?: boolean; + /** Called when the connection opens */ + onOpen?: (event: Event) => void; + /** Called on every unnamed `"message"` event */ + onMessage?: (event: MessageEvent) => void; + /** Called on error */ + onError?: (event: Event) => void; + /** Handlers for custom named SSE event types, e.g. `{ update: handler }` */ + events?: Record void>; +}; + +export type SSEReconnectOptions = { + /** Maximum number of reconnect attempts. Default: `Infinity` */ + retries?: number; + /** Delay in milliseconds between reconnect attempts. Default: `3000` */ + delay?: number; +}; + +/** + * Minimal interface that both `EventSource` and `WorkerEventSource` satisfy. + * Used as the type of the `source` signal returned by `createSSE`. + */ +export type SSESourceHandle = EventTarget & { + readonly readyState: number; + close(): void; +}; + +/** + * Factory function signature for creating an SSE source. + * The default factory is `makeSSE`; swap it out with `makeSSEWorker(worker)` to + * run the connection inside a Web Worker. + */ +export type SSESourceFn = ( + url: string, + options: SSEOptions, +) => [source: SSESourceHandle, cleanup: VoidFunction]; + +export type CreateSSEOptions = SSEOptions & { + /** Initial value of the `data` signal before any message arrives */ + initialValue?: T; + /** + * Transform raw string data from each message event. + * Use this to parse JSON: `{ transform: JSON.parse }` + */ + transform?: (raw: string) => T; + /** + * App-level reconnect behavior on terminal errors (readyState → CLOSED). + * + * - `false` (default): no app-level reconnect + * - `true`: reconnect with defaults (Infinity retries, 3000ms delay) + * - object: custom `{ retries?, delay? }` + * + * Note: `EventSource` already reconnects natively for transient network + * drops. This option handles cases where the browser gives up entirely. + */ + reconnect?: boolean | SSEReconnectOptions; + /** + * Custom source factory. Defaults to `makeSSE` (creates a real EventSource). + * Swap this out to run SSE in a Worker: + * `source: makeSSEWorker(worker)` + */ + source?: SSESourceFn; +}; + +export type SSEReturn = { + /** The underlying source instance. `undefined` on SSR or before first connect. */ + source: Accessor; + /** The latest message data, parsed through `transform` if provided. */ + data: Accessor; + /** The latest error event, `undefined` when no error has occurred. */ + error: Accessor; + /** + * The current connection state. Use `SSEReadyState` for named comparisons: + * - `SSEReadyState.CONNECTING` (0) + * - `SSEReadyState.OPEN` (1) + * - `SSEReadyState.CLOSED` (2) + */ + readyState: Accessor; + /** Close the connection. */ + close: VoidFunction; + /** Force-close the current connection and open a new one. */ + reconnect: VoidFunction; +}; + +// ─── makeSSE ───────────────────────────────────────────────────────────────── + +/** + * Creates a raw `EventSource` connection without Solid lifecycle management. + * Event handlers are attached immediately; cleanup must be called manually. + * + * ```ts + * const [source, cleanup] = makeSSE("https://api.example.com/events", { + * onMessage: (e) => console.log(e.data), + * onError: (e) => console.error("error", e), + * }); + * // Later: + * cleanup(); + * ``` + * + * @param url The SSE endpoint URL + * @param options Event handlers and `EventSource` options + * @returns Tuple of `[EventSource, cleanup]` + */ +export const makeSSE = ( + url: string | URL, + options: SSEOptions = {}, +): [source: EventSource, cleanup: VoidFunction] => { + const source = new EventSource(url, { withCredentials: options.withCredentials }); + + if (options.onOpen) source.addEventListener("open", options.onOpen); + if (options.onMessage) source.addEventListener("message", options.onMessage); + if (options.onError) source.addEventListener("error", options.onError); + if (options.events) { + for (const [name, handler] of Object.entries(options.events)) + source.addEventListener(name, handler as EventListener); + } + + const cleanup = () => { + source.close(); + if (options.onOpen) source.removeEventListener("open", options.onOpen); + if (options.onMessage) source.removeEventListener("message", options.onMessage); + if (options.onError) source.removeEventListener("error", options.onError); + if (options.events) { + for (const [name, handler] of Object.entries(options.events)) + source.removeEventListener(name, handler as EventListener); + } + }; + + return [source, cleanup]; +}; + +// ─── createSSE ─────────────────────────────────────────────────────────────── + +/** + * Creates a reactive SSE (Server-Sent Events) connection that integrates with + * the Solid reactive system and owner lifecycle. + * + * - Accepts a reactive URL — reconnects automatically when the URL signal changes + * - Closes the connection on owner disposal via `onCleanup` + * - SSR-safe: returns static stubs on the server + * + * ```ts + * const { data, readyState, error, close, reconnect } = createSSE<{ msg: string }>( + * "https://api.example.com/events", + * { transform: JSON.parse, reconnect: { retries: 3, delay: 2000 } }, + * ); + * + * return

{data()?.msg}

; + * ``` + * + * @param url Static URL string or reactive `Accessor` + * @param options Configuration including handlers, transform, and reconnect policy + */ +export const createSSE = ( + url: MaybeAccessor, + options: CreateSSEOptions = {}, +): SSEReturn => { + // ── SSR stub ────────────────────────────────────────────────────────────── + if (isServer) { + return { + source: () => undefined, + data: () => options.initialValue, + error: () => undefined, + readyState: () => SSEReadyState.CLOSED, + close: () => void 0, + reconnect: () => void 0, + }; + } + + // ── Reactive state ──────────────────────────────────────────────────────── + const [source, setSource] = createSignal(undefined); + const [data, setData] = createSignal(options.initialValue); + const [error, setError] = createSignal(undefined); + const [readyState, setReadyState] = createSignal(SSEReadyState.CONNECTING); + + // ── Reconnect config ────────────────────────────────────────────────────── + const reconnectConfig: SSEReconnectOptions = + options.reconnect === true + ? { retries: Infinity, delay: 3000 } + : !options.reconnect + ? { retries: 0, delay: 0 } + : options.reconnect; + + let retriesLeft = reconnectConfig.retries ?? 0; + let reconnectTimer: ReturnType | undefined; + + const clearReconnectTimer = () => { + if (reconnectTimer !== undefined) { + clearTimeout(reconnectTimer); + reconnectTimer = undefined; + } + }; + + // ── Connection management ───────────────────────────────────────────────── + let currentCleanup: VoidFunction | undefined; + + /** Open a fresh connection, resetting the retry counter. */ + const connect = (resolvedUrl: string) => { + retriesLeft = reconnectConfig.retries ?? 0; + _open(resolvedUrl); + }; + + /** Internal: open connection, decrement retries on terminal errors. */ + const _open = (resolvedUrl: string) => { + clearReconnectTimer(); + + const handleOpen = (e: Event) => { + setReadyState(SSEReadyState.OPEN); + setError(undefined); + options.onOpen?.(e); + }; + + const handleMessage = (e: MessageEvent) => { + const value = options.transform ? options.transform(e.data as string) : (e.data as T); + setData(() => value); + options.onMessage?.(e); + }; + + const handleError = (e: Event) => { + const es = e.target as SSESourceHandle; + setReadyState(es.readyState as SSEReadyState); + setError(() => e); + options.onError?.(e); + + // Only app-level reconnect when the browser has given up (CLOSED). + // When readyState is still CONNECTING the browser is handling retries. + if (es.readyState === SSEReadyState.CLOSED && retriesLeft > 0) { + retriesLeft--; + reconnectTimer = setTimeout(() => _open(resolvedUrl), reconnectConfig.delay ?? 3000); + } + }; + + const sourceFn: SSESourceFn = options.source ?? makeSSE; + const [es, cleanup] = sourceFn(resolvedUrl, { + withCredentials: options.withCredentials, + onOpen: handleOpen, + onMessage: handleMessage, + onError: handleError, + events: options.events, + }); + + setSource(() => es); + setReadyState(es.readyState as SSEReadyState); + currentCleanup = cleanup; + }; + + const disconnect = () => { + clearReconnectTimer(); + retriesLeft = 0; + currentCleanup?.(); + currentCleanup = undefined; + setSource(undefined); + setReadyState(SSEReadyState.CLOSED); + }; + + const manualReconnect = () => { + const currentUrl = untrack(() => access(url)); + disconnect(); + connect(currentUrl); + }; + + // ── Initial connection (synchronous) ───────────────────────────────────── + // createEffect is deferred until after the current synchronous code block, + // so we connect immediately here to ensure signals are populated as soon as + // createSSE returns. + connect(untrack(() => access(url))); + + // ── Reactive URL handling ───────────────────────────────────────────────── + // Only needed when url is an accessor. `createComputed` runs synchronously + // on creation (unlike `createEffect`, which is deferred), so the reactive + // subscription to `url` is established immediately. The `prevUrl` guard + // prevents a redundant reconnect on the first pass (we already connected). + if (typeof url === "function") { + let prevUrl = untrack(url as Accessor); + createComputed(() => { + const resolvedUrl = (url as Accessor)(); + if (resolvedUrl !== prevUrl) { + prevUrl = resolvedUrl; + untrack(() => { + currentCleanup?.(); + currentCleanup = undefined; + }); + connect(resolvedUrl); + } + }); + } + + // ── Lifecycle cleanup ───────────────────────────────────────────────────── + onCleanup(() => { + clearReconnectTimer(); + currentCleanup?.(); + currentCleanup = undefined; + }); + + return { source, data, error, readyState, close: disconnect, reconnect: manualReconnect }; +}; diff --git a/packages/sse/src/transform.ts b/packages/sse/src/transform.ts new file mode 100644 index 000000000..3c0c78147 --- /dev/null +++ b/packages/sse/src/transform.ts @@ -0,0 +1,121 @@ +/** + * Built-in transform functions for common SSE data formats. + * Pass one of these as the `transform` option to `createSSE`: + * + * ```ts + * const { data } = createSSE(url, { transform: ndjson }); + * ``` + */ + +/** + * Parse SSE message data as a single JSON value. + * + * Equivalent to `JSON.parse` but named for use alongside the other + * transformers in this module. + * + * ```ts + * const { data } = createSSE<{ status: string }>(url, { transform: json }); + * ``` + */ +export const json = (raw: string): T => JSON.parse(raw) as T; + +/** + * Parse SSE message data as newline-delimited JSON (NDJSON / JSON Lines). + * + * Each non-empty line in the event's `data` field is parsed as a separate + * JSON value. Returns an array of the parsed values. + * + * Use this when the server batches multiple JSON objects into a single SSE + * event, one object per line: + * + * ``` + * data: {"id":1,"type":"tick"} + * data: {"id":2,"type":"tick"} + * + * ``` + * + * ```ts + * const { data } = createSSE(url, { transform: ndjson }); + * // data() === [{ id: 1, type: "tick" }, { id: 2, type: "tick" }] + * ``` + */ +export const ndjson = (raw: string): T[] => + raw + .split("\n") + .filter(line => line !== "") + .map(line => JSON.parse(line) as T); + +/** + * Split SSE message data into individual lines, returning a `string[]`. + * Empty lines are filtered out. + * + * Use this for multi-line text events that are not JSON. + * + * ```ts + * const { data } = createSSE(url, { transform: lines }); + * // data() === ["line one", "line two"] + * ``` + */ +export const lines = (raw: string): string[] => raw.split("\n").filter(line => line !== ""); + +/** + * Parse SSE message data as a number using `Number()` semantics. + * + * Use this for streams that emit plain numeric values: counters, progress + * percentages, sensor readings, prices, etc. + * + * ```ts + * const { data } = createSSE(url, { transform: number }); + * // data() === 42 + * ``` + * + * Note: follows `Number()` coercion — `""` → `0`, non-numeric strings → `NaN`. + */ +export const number = (raw: string): number => Number(raw); + +/** + * Wrap any transform in a `try/catch` so that a malformed event does not + * throw; instead it returns `fallback` (default `undefined`). + * + * ```ts + * // Returns undefined on bad input instead of throwing + * const { data } = createSSE(url, { transform: safe(json) }); + * + * // With an explicit fallback value + * const { data } = createSSE(url, { transform: safe(number, 0) }); + * ``` + */ +export function safe(transform: (raw: string) => T): (raw: string) => T | undefined; +export function safe(transform: (raw: string) => T, fallback: T): (raw: string) => T; +export function safe( + transform: (raw: string) => T, + fallback?: T, +): (raw: string) => T | undefined { + return (raw: string): T | undefined => { + try { + return transform(raw); + } catch { + return fallback; + } + }; +} + +/** + * Compose two transforms into one: the output of `a` is passed as the input + * of `b`. + * + * ```ts + * // Parse NDJSON then keep only "tick" events + * const { data } = createSSE(url, { + * transform: pipe(ndjson, rows => rows.filter(r => r.type === "tick")), + * }); + * + * // Safe JSON followed by a post-processing step + * const { data } = createSSE(url, { + * transform: pipe(safe(json<{ label: string }>), ev => ev?.label ?? ""), + * }); + * ``` + */ +export function pipe(a: (raw: string) => A, b: (a: A) => B): (raw: string) => B { + return (raw: string): B => b(a(raw)); +} diff --git a/packages/sse/src/worker-handler.ts b/packages/sse/src/worker-handler.ts new file mode 100644 index 000000000..0fc2243f4 --- /dev/null +++ b/packages/sse/src/worker-handler.ts @@ -0,0 +1,80 @@ +/** + * Worker script that manages EventSource connections on behalf of the main thread. + * Bundle and load this file as a Worker: + * + * ```ts + * // Dedicated Worker: + * const worker = new Worker(new URL("@solid-primitives/sse/worker-handler", import.meta.url)); + * + * // SharedWorker (one process shared across tabs): + * const sw = new SharedWorker(new URL("@solid-primitives/sse/worker-handler", import.meta.url)); + * sw.port.start(); + * ``` + * + * This file has no Solid reactive code — it is safe to run in any Worker context. + */ +import { makeSSE, SSEReadyState } from "./sse.js"; +import type { SSEWorkerMessage } from "./worker.js"; + +const connections = new Map(); + +/** + * Handle a single incoming message and send responses back via `postBack`. + * Keeping responses tied to the originating channel makes SharedWorker work + * correctly (each tab has its own MessagePort). + */ +function handleMessage( + data: SSEWorkerMessage, + postBack: (msg: SSEWorkerMessage) => void, +): void { + if (data.type === "connect") { + const { id, url, withCredentials, events } = data; + + const [, cleanup] = makeSSE(url, { + withCredentials, + onOpen: () => postBack({ type: "open", id }), + onMessage: ev => + postBack({ type: "message", id, data: ev.data as string, eventType: "message" }), + onError: ev => + postBack({ + type: "error", + id, + readyState: (ev.target as EventSource).readyState as SSEReadyState, + }), + events: Object.fromEntries( + (events ?? []).map(name => [ + name, + (ev: MessageEvent) => + postBack({ type: "message", id, data: ev.data as string, eventType: name }), + ]), + ), + }); + + connections.set(id, cleanup); + } + + if (data.type === "disconnect") { + connections.get(data.id)?.(); + connections.delete(data.id); + } +} + +// ── Dedicated Worker ────────────────────────────────────────────────────────── +// `DedicatedWorkerGlobalScope.postMessage` takes one argument, but the DOM lib +// types `self` as `Window` (which requires `targetOrigin`). Cast to the minimal +// structural interface we actually need — the project's tsconfig does not include +// the WebWorker lib, so we cannot reference DedicatedWorkerGlobalScope by name. +type DedicatedPost = { postMessage(data: SSEWorkerMessage): void }; +self.addEventListener("message", (e: MessageEvent) => { + handleMessage(e.data, msg => (self as unknown as DedicatedPost).postMessage(msg)); +}); + +// ── SharedWorker — each connecting tab gets its own MessagePort ─────────────── +self.addEventListener("connect", (e: Event) => { + const port = (e as MessageEvent).ports?.[0]; + if (!port) return; + port.addEventListener("message", (ev: MessageEvent) => { + handleMessage(ev.data, msg => port.postMessage(msg)); + }); + port.start(); +}); diff --git a/packages/sse/src/worker.ts b/packages/sse/src/worker.ts new file mode 100644 index 000000000..8310c4a1c --- /dev/null +++ b/packages/sse/src/worker.ts @@ -0,0 +1,130 @@ +import { SSEReadyState, type SSEOptions, type SSESourceFn } from "./sse.js"; + +// ─── Protocol types ─────────────────────────────────────────────────────────── + +/** + * Discriminated union of all messages exchanged between the main thread + * and the Worker. Main → Worker: `connect` | `disconnect`. + * Worker → Main: `open` | `message` | `error`. + */ +export type SSEWorkerMessage = + | { type: "connect"; id: string; url: string; withCredentials?: boolean; events?: string[] } + | { type: "disconnect"; id: string } + | { type: "open"; id: string } + | { type: "message"; id: string; data: string; eventType: string } + | { type: "error"; id: string; readyState: SSEReadyState }; + +/** A `Worker` or a `SharedWorker.port` — anything with `postMessage` and `addEventListener`. */ +export type SSEWorkerTarget = { + postMessage(data: SSEWorkerMessage): void; + addEventListener(type: "message", listener: (e: MessageEvent) => void): void; + removeEventListener( + type: "message", + listener: (e: MessageEvent) => void, + ): void; +}; + +// ─── WorkerEventSource ──────────────────────────────────────────────────────── + +/** + * An `EventTarget` facade that tunnels SSE events through a Worker. + * Not exported — consumers use `makeSSEWorker` to obtain instances. + */ +class WorkerEventSource extends EventTarget { + private _readyState: SSEReadyState = SSEReadyState.CONNECTING; + + get readyState(): SSEReadyState { + return this._readyState; + } + + private readonly _id: string; + private readonly _target: SSEWorkerTarget; + private readonly _listener: (e: MessageEvent) => void; + + constructor(target: SSEWorkerTarget, url: string, options: SSEOptions) { + super(); + + this._id = Math.random().toString(36).slice(2, 11); + this._target = target; + + this._listener = (e: MessageEvent) => { + const msg = e.data; + if (msg.id !== this._id) return; + + if (msg.type === "open") { + this._readyState = SSEReadyState.OPEN; + this.dispatchEvent(new Event("open")); + } else if (msg.type === "message") { + this.dispatchEvent(new MessageEvent(msg.eventType, { data: msg.data })); + } else if (msg.type === "error") { + this._readyState = msg.readyState; + this.dispatchEvent(new Event("error")); + } + }; + + target.addEventListener("message", this._listener); + + target.postMessage({ + type: "connect", + id: this._id, + url, + withCredentials: options.withCredentials, + events: options.events ? Object.keys(options.events) : undefined, + }); + } + + close() { + this._readyState = SSEReadyState.CLOSED; + this._target.postMessage({ type: "disconnect", id: this._id }); + this._target.removeEventListener("message", this._listener); + } +} + +// ─── makeSSEWorker ──────────────────────────────────────────────────────────── + +/** + * Returns a `SSESourceFn` that tunnels EventSource connections through a Worker. + * Pass the returned factory as the `source` option to `createSSE`: + * + * ```ts + * const worker = new Worker(new URL("@solid-primitives/sse/worker-handler", import.meta.url)); + * const { data } = createSSE(url, { source: makeSSEWorker(worker) }); + * ``` + * + * Works with `SharedWorker.port` for a single connection shared across tabs: + * + * ```ts + * const sw = new SharedWorker(new URL("@solid-primitives/sse/worker-handler", import.meta.url)); + * sw.port.start(); + * const { data } = createSSE(url, { source: makeSSEWorker(sw.port) }); + * ``` + * + * @param target A `Worker` or `SharedWorker.port` + */ +export function makeSSEWorker(target: SSEWorkerTarget): SSESourceFn { + return (url: string, options: SSEOptions) => { + const source = new WorkerEventSource(target, url, options); + + if (options.onOpen) source.addEventListener("open", options.onOpen); + if (options.onMessage) source.addEventListener("message", options.onMessage as EventListener); + if (options.onError) source.addEventListener("error", options.onError); + if (options.events) { + for (const [name, handler] of Object.entries(options.events)) + source.addEventListener(name, handler as EventListener); + } + + const cleanup = () => { + source.close(); + if (options.onOpen) source.removeEventListener("open", options.onOpen!); + if (options.onMessage) + source.removeEventListener("message", options.onMessage as EventListener); + if (options.onError) source.removeEventListener("error", options.onError!); + if (options.events) { + for (const [name, handler] of Object.entries(options.events)) + source.removeEventListener(name, handler as EventListener); + } + }; + + return [source, cleanup]; + }; +} diff --git a/packages/sse/test/index.test.ts b/packages/sse/test/index.test.ts new file mode 100644 index 000000000..a6ba9b904 --- /dev/null +++ b/packages/sse/test/index.test.ts @@ -0,0 +1,236 @@ +import "./setup"; +import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { createRoot, createSignal } from "solid-js"; +import { makeSSE, createSSE, SSEReadyState } from "../src/index.js"; +import { MockEventSource } from "./setup.js"; + +beforeAll(() => vi.useFakeTimers()); +beforeEach(() => { + vi.clearAllTimers(); + SSEInstances.length = 0; +}); +afterAll(() => vi.useRealTimers()); + +// ── makeSSE ─────────────────────────────────────────────────────────────────── + +describe("makeSSE", () => { + it("creates an EventSource in CONNECTING state", () => { + const [source, cleanup] = makeSSE("https://example.com/events"); + expect(source).toBeInstanceOf(EventSource); + expect(source.readyState).toBe(SSEReadyState.CONNECTING); + cleanup(); + }); + + it("returns a cleanup that closes the connection", () => { + const [source, cleanup] = makeSSE("https://example.com/events"); + cleanup(); + expect(source.readyState).toBe(SSEReadyState.CLOSED); + }); + + it("fires onOpen when connection opens", () => { + const onOpen = vi.fn(); + const [, cleanup] = makeSSE("https://example.com/events", { onOpen }); + vi.advanceTimersByTime(20); + expect(onOpen).toHaveBeenCalledOnce(); + cleanup(); + }); + + it("fires onMessage for unnamed message events", () => { + const onMessage = vi.fn(); + const [source, cleanup] = makeSSE("https://example.com/events", { onMessage }); + vi.advanceTimersByTime(20); + (source as unknown as MockEventSource).simulateMessage("hello"); + expect(onMessage).toHaveBeenCalledWith(expect.objectContaining({ data: "hello" })); + cleanup(); + }); + + it("fires onError on error events", () => { + const onError = vi.fn(); + const [source, cleanup] = makeSSE("https://example.com/events", { onError }); + (source as unknown as MockEventSource).simulateError(); + expect(onError).toHaveBeenCalledOnce(); + cleanup(); + }); + + it("fires named custom event handlers", () => { + const onUpdate = vi.fn(); + const [source, cleanup] = makeSSE("https://example.com/events", { + events: { update: onUpdate }, + }); + vi.advanceTimersByTime(20); + (source as unknown as MockEventSource).simulateMessage("payload", "update"); + expect(onUpdate).toHaveBeenCalledWith(expect.objectContaining({ data: "payload" })); + cleanup(); + }); + + it("cleanup does not throw before connection opens", () => { + const [, cleanup] = makeSSE("https://example.com/events"); + expect(() => cleanup()).not.toThrow(); + }); +}); + +// ── createSSE ───────────────────────────────────────────────────────────────── + +describe("createSSE", () => { + it("starts in CONNECTING state", () => + createRoot(dispose => { + const { readyState } = createSSE("https://example.com/events"); + expect(readyState()).toBe(SSEReadyState.CONNECTING); + dispose(); + })); + + it("transitions to OPEN when connection opens", () => + createRoot(dispose => { + const { readyState } = createSSE("https://example.com/events"); + vi.advanceTimersByTime(20); + expect(readyState()).toBe(SSEReadyState.OPEN); + dispose(); + })); + + it("provides latest message via data signal", () => + createRoot(dispose => { + const { data, source } = createSSE("https://example.com/events"); + expect(data()).toBeUndefined(); + vi.advanceTimersByTime(20); + (source() as unknown as MockEventSource).simulateMessage("hello"); + expect(data()).toBe("hello"); + dispose(); + })); + + it("applies transform to incoming data", () => + createRoot(dispose => { + const { data, source } = createSSE<{ value: number }>("https://example.com/events", { + transform: JSON.parse, + }); + vi.advanceTimersByTime(20); + (source() as unknown as MockEventSource).simulateMessage(JSON.stringify({ value: 42 })); + expect(data()).toEqual({ value: 42 }); + dispose(); + })); + + it("returns initialValue before any message arrives", () => + createRoot(dispose => { + const { data } = createSSE("https://example.com/events", { + initialValue: "loading", + }); + expect(data()).toBe("loading"); + dispose(); + })); + + it("clears error signal on successful open", () => + createRoot(dispose => { + const { error, source } = createSSE("https://example.com/events", { + reconnect: { retries: 1, delay: 50 }, + }); + vi.advanceTimersByTime(20); + (source() as unknown as MockEventSource).simulateError(); + expect(error()).toBeTruthy(); + // reconnect fires after delay; new source opens + vi.advanceTimersByTime(100); + vi.advanceTimersByTime(20); // new source opens + expect(error()).toBeUndefined(); + dispose(); + })); + + it("transitions to CLOSED and sets error on terminal error", () => + createRoot(dispose => { + const { error, readyState, source } = createSSE("https://example.com/events", { + reconnect: false, + }); + vi.advanceTimersByTime(20); + (source() as unknown as MockEventSource).simulateError(); + expect(readyState()).toBe(SSEReadyState.CLOSED); + expect(error()).toBeTruthy(); + dispose(); + })); + + it("does not app-reconnect on transient errors (browser handles those)", () => + createRoot(dispose => { + const initialCount = SSEInstances.length; + const { source } = createSSE("https://example.com/events", { + reconnect: { retries: 5, delay: 50 }, + }); + vi.advanceTimersByTime(20); + (source() as unknown as MockEventSource).simulateTransientError(); + vi.advanceTimersByTime(300); + // readyState stayed CONNECTING → no new EventSource was created + expect(SSEInstances.length).toBe(initialCount + 1); + dispose(); + })); + + it("auto-reconnects on terminal error when reconnect option is set", () => + createRoot(dispose => { + const { source } = createSSE("https://example.com/events", { + reconnect: { retries: 1, delay: 100 }, + }); + vi.advanceTimersByTime(20); + const first = source(); + (first as unknown as MockEventSource).simulateError(); + expect(source()).toBe(first); // no change yet + vi.advanceTimersByTime(150); + expect(source()).not.toBe(first); // new connection opened + dispose(); + })); + + it("respects retry limit", () => + createRoot(dispose => { + const { source } = createSSE("https://example.com/events", { + reconnect: { retries: 1, delay: 50 }, + }); + vi.advanceTimersByTime(20); + const first = source(); + (first as unknown as MockEventSource).simulateError(); + vi.advanceTimersByTime(100); // first retry + const second = source(); + expect(second).not.toBe(first); + vi.advanceTimersByTime(20); // second opens + (second as unknown as MockEventSource).simulateError(); + vi.advanceTimersByTime(200); // no more retries + expect(source()).toBe(second); // still the same source + dispose(); + })); + + it("close() transitions to CLOSED and stops reconnects", () => + createRoot(dispose => { + const { readyState, close } = createSSE("https://example.com/events"); + vi.advanceTimersByTime(20); + expect(readyState()).toBe(SSEReadyState.OPEN); + close(); + expect(readyState()).toBe(SSEReadyState.CLOSED); + dispose(); + })); + + it("reconnect() opens a fresh connection", () => + createRoot(dispose => { + const { source, reconnect } = createSSE("https://example.com/events"); + vi.advanceTimersByTime(20); + const first = source(); + reconnect(); + expect(source()).not.toBe(first); + expect(first?.readyState).toBe(SSEReadyState.CLOSED); // old one closed + dispose(); + })); + + it("reconnects when the URL signal changes", () => + createRoot(dispose => { + const [url, setUrl] = createSignal("https://example.com/v1/events"); + const { source } = createSSE(url); + vi.advanceTimersByTime(20); + const first = source(); + setUrl("https://example.com/v2/events"); + expect(source()).not.toBe(first); + expect(first?.readyState).toBe(SSEReadyState.CLOSED); + dispose(); + })); + + it("closes connection on owner disposal", () => + new Promise(resolve => + createRoot(dispose => { + const { source } = createSSE("https://example.com/events"); + vi.advanceTimersByTime(20); + const es = source(); + vi.spyOn(es as unknown as MockEventSource, "close").mockImplementation(() => resolve()); + dispose(); + }), + )); +}); diff --git a/packages/sse/test/server.test.ts b/packages/sse/test/server.test.ts new file mode 100644 index 000000000..86137be91 --- /dev/null +++ b/packages/sse/test/server.test.ts @@ -0,0 +1,26 @@ +import { describe, expect, it } from "vitest"; +import { createRoot } from "solid-js"; +import { createSSE } from "../src/index.js"; + +describe("SSR", () => { + it("returns safe stubs without touching EventSource", () => + createRoot(dispose => { + const sse = createSSE("https://example.com/events"); + expect(sse.source()).toBeUndefined(); + expect(sse.data()).toBeUndefined(); + expect(sse.error()).toBeUndefined(); + expect(sse.readyState()).toBe(2); + expect(() => sse.close()).not.toThrow(); + expect(() => sse.reconnect()).not.toThrow(); + dispose(); + })); + + it("exposes initialValue in SSR data stub", () => + createRoot(dispose => { + const { data } = createSSE("https://example.com/events", { + initialValue: "loading", + }); + expect(data()).toBe("loading"); + dispose(); + })); +}); diff --git a/packages/sse/test/setup.ts b/packages/sse/test/setup.ts new file mode 100644 index 000000000..7039642af --- /dev/null +++ b/packages/sse/test/setup.ts @@ -0,0 +1,60 @@ +import { SSEReadyState } from "../src/sse.js"; + +declare global { + // eslint-disable-next-line no-var + var SSEInstances: MockEventSource[]; +} + +(global as any).SSEInstances = [] as MockEventSource[]; + +export class MockEventSource extends EventTarget { + static readonly CONNECTING = SSEReadyState.CONNECTING; + static readonly OPEN = SSEReadyState.OPEN; + static readonly CLOSED = SSEReadyState.CLOSED; + readonly CONNECTING = SSEReadyState.CONNECTING; + readonly OPEN = SSEReadyState.OPEN; + readonly CLOSED = SSEReadyState.CLOSED; + + readyState: SSEReadyState = SSEReadyState.CONNECTING; + withCredentials: boolean; + url: string; + + constructor(url: string, options?: EventSourceInit) { + super(); + this.url = url; + this.withCredentials = options?.withCredentials ?? false; + SSEInstances.push(this); + + setTimeout(() => { + if (this.readyState === SSEReadyState.CONNECTING) { + this.readyState = SSEReadyState.OPEN; + this.dispatchEvent(new Event("open")); + } + }, 10); + } + + /** Simulate a named (or unnamed "message") event arriving from the server. */ + simulateMessage(data: string, eventType = "message") { + this.dispatchEvent(new MessageEvent(eventType, { data })); + } + + /** Simulate a terminal error — `readyState` goes to `CLOSED`. */ + simulateError() { + this.readyState = SSEReadyState.CLOSED; + this.dispatchEvent(new Event("error")); + } + + /** Simulate a transient error — browser is retrying, `readyState` stays `CONNECTING`. */ + simulateTransientError() { + this.readyState = SSEReadyState.CONNECTING; + this.dispatchEvent(new Event("error")); + } + + close() { + this.readyState = SSEReadyState.CLOSED; + const idx = SSEInstances.indexOf(this); + if (idx !== -1) SSEInstances.splice(idx, 1); + } +} + +(global as any).EventSource = MockEventSource; diff --git a/packages/sse/test/transform.test.ts b/packages/sse/test/transform.test.ts new file mode 100644 index 000000000..fe6349bec --- /dev/null +++ b/packages/sse/test/transform.test.ts @@ -0,0 +1,186 @@ +import { describe, expect, it } from "vitest"; +import { json, ndjson, lines, number, safe, pipe } from "../src/transform.js"; + +// ── json ────────────────────────────────────────────────────────────────────── + +describe("json", () => { + it("parses a JSON object", () => { + expect(json('{"a":1}')).toEqual({ a: 1 }); + }); + + it("parses a JSON array", () => { + expect(json("[1,2,3]")).toEqual([1, 2, 3]); + }); + + it("parses a JSON string primitive", () => { + expect(json('"hello"')).toBe("hello"); + }); + + it("parses a JSON number primitive", () => { + expect(json("42")).toBe(42); + }); + + it("throws on invalid JSON", () => { + expect(() => json("not json")).toThrow(); + }); +}); + +// ── ndjson ──────────────────────────────────────────────────────────────────── + +describe("ndjson", () => { + it("parses each line as a JSON value", () => { + expect(ndjson('{"a":1}\n{"b":2}')).toEqual([{ a: 1 }, { b: 2 }]); + }); + + it("handles a single line", () => { + expect(ndjson('{"x":42}')).toEqual([{ x: 42 }]); + }); + + it("ignores empty lines", () => { + expect(ndjson('{"a":1}\n\n{"b":2}')).toEqual([{ a: 1 }, { b: 2 }]); + }); + + it("returns an empty array for an empty string", () => { + expect(ndjson("")).toEqual([]); + }); + + it("handles a trailing newline", () => { + expect(ndjson('{"a":1}\n{"b":2}\n')).toEqual([{ a: 1 }, { b: 2 }]); + }); + + it("throws on an invalid JSON line", () => { + expect(() => ndjson('{"a":1}\nbad')).toThrow(); + }); + + it("parses mixed JSON types per line", () => { + expect(ndjson("1\n2\n3")).toEqual([1, 2, 3]); + }); +}); + +// ── lines ───────────────────────────────────────────────────────────────────── + +describe("lines", () => { + it("splits data into lines", () => { + expect(lines("one\ntwo\nthree")).toEqual(["one", "two", "three"]); + }); + + it("handles a single line with no newline", () => { + expect(lines("only")).toEqual(["only"]); + }); + + it("ignores empty lines", () => { + expect(lines("one\n\ntwo")).toEqual(["one", "two"]); + }); + + it("handles a trailing newline", () => { + expect(lines("one\ntwo\n")).toEqual(["one", "two"]); + }); + + it("returns an empty array for an empty string", () => { + expect(lines("")).toEqual([]); + }); +}); + +// ── number ──────────────────────────────────────────────────────────────────── + +describe("number", () => { + it("parses an integer string", () => { + expect(number("42")).toBe(42); + }); + + it("parses a float string", () => { + expect(number("3.14")).toBe(3.14); + }); + + it("parses a negative number", () => { + expect(number("-7")).toBe(-7); + }); + + it("converts an empty string to 0", () => { + expect(number("")).toBe(0); + }); + + it("converts a non-numeric string to NaN", () => { + expect(number("not a number")).toBeNaN(); + }); +}); + +// ── safe ────────────────────────────────────────────────────────────────────── + +describe("safe", () => { + it("returns the transform result when successful", () => { + expect(safe(json)('{"a":1}')).toEqual({ a: 1 }); + }); + + it("returns undefined when the inner transform throws (no fallback)", () => { + expect(safe(json)("bad json")).toBeUndefined(); + }); + + it("returns the fallback when the inner transform throws", () => { + expect(safe(json, null)("bad json")).toBeNull(); + }); + + it("returns a numeric fallback on error", () => { + expect(safe(number, 0)("NaN")).toBe(NaN); // Number("NaN") === NaN, not throwing + // Demonstrate fallback with a throwing transform: + const throwing = (_: string): number => { + throw new Error("fail"); + }; + expect(safe(throwing, -1)("any")).toBe(-1); + }); + + it("is composable: safe(ndjson) returns undefined on invalid line", () => { + expect(safe(ndjson)('{"a":1}\nbad')).toBeUndefined(); + }); + + it("overload without fallback infers T | undefined return type", () => { + const result: { a: number } | undefined = safe(json<{ a: number }>)('{"a":1}'); + expect(result).toEqual({ a: 1 }); + }); + + it("overload with fallback infers T return type", () => { + const result: { a: number } = safe(json<{ a: number }>, { a: 0 })("bad"); + expect(result).toEqual({ a: 0 }); + }); +}); + +// ── pipe ────────────────────────────────────────────────────────────────────── + +describe("pipe", () => { + it("passes the string through both transforms in order", () => { + const upper = (s: string) => s.toUpperCase(); + const exclaim = (s: string) => `${s}!`; + expect(pipe(upper, exclaim)("hello")).toBe("HELLO!"); + }); + + it("composes json with a post-processing step", () => { + const getLabel = pipe(json<{ label: string }>, ev => ev.label); + expect(getLabel('{"label":"tick"}')).toBe("tick"); + }); + + it("composes ndjson with a filter step", () => { + type Row = { type: string }; + const ticks = pipe( + ndjson, + rows => rows.filter(r => r.type === "tick"), + ); + expect(ticks('{"type":"tick"}\n{"type":"other"}\n{"type":"tick"}')).toEqual([ + { type: "tick" }, + { type: "tick" }, + ]); + }); + + it("composes safe(json) with a fallback mapping", () => { + const getLabel = pipe(safe(json<{ label: string }>), ev => ev?.label ?? ""); + expect(getLabel('{"label":"ok"}')).toBe("ok"); + expect(getLabel("bad")).toBe(""); + }); + + it("infers the correct return type", () => { + const toLength: (raw: string) => number = pipe( + (s: string) => s.split(","), + arr => arr.length, + ); + expect(toLength("a,b,c")).toBe(3); + }); +}); diff --git a/packages/sse/test/worker.test.ts b/packages/sse/test/worker.test.ts new file mode 100644 index 000000000..7593f486b --- /dev/null +++ b/packages/sse/test/worker.test.ts @@ -0,0 +1,317 @@ +import "./setup"; +import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { createRoot } from "solid-js"; +import { createSSE, SSEReadyState } from "../src/sse.js"; +import { makeSSEWorker, type SSEWorkerMessage, type SSEWorkerTarget } from "../src/worker.js"; + +// ─── MockWorkerTarget ───────────────────────────────────────────────────────── + +/** + * Minimal stub for a Worker / SharedWorker.port. + * Records outgoing `postMessage` calls so tests can assert on them, + * and exposes `respond()` to push messages back from the "worker". + */ +class MockWorkerTarget extends EventTarget implements SSEWorkerTarget { + readonly sent: SSEWorkerMessage[] = []; + + postMessage(data: SSEWorkerMessage): void { + this.sent.push(data); + } + + /** Simulate the Worker sending a message to the main thread. */ + respond(data: SSEWorkerMessage): void { + this.dispatchEvent(new MessageEvent("message", { data })); + } +} + +// ─── makeSSEWorker unit tests ───────────────────────────────────────────────── + +describe("makeSSEWorker", () => { + it("returns a SSESourceFn (callable function)", () => { + const target = new MockWorkerTarget(); + const factory = makeSSEWorker(target); + expect(typeof factory).toBe("function"); + }); + + it("sends a connect message when the factory is called", () => { + const target = new MockWorkerTarget(); + makeSSEWorker(target)("https://example.com/events", {}); + expect(target.sent).toHaveLength(1); + expect(target.sent[0]).toMatchObject({ type: "connect", url: "https://example.com/events" }); + }); + + it("includes withCredentials in the connect message", () => { + const target = new MockWorkerTarget(); + makeSSEWorker(target)("https://example.com/events", { withCredentials: true }); + expect(target.sent[0]).toMatchObject({ withCredentials: true }); + }); + + it("includes event names (not handlers) in the connect message", () => { + const target = new MockWorkerTarget(); + makeSSEWorker(target)("https://example.com/events", { + events: { update: vi.fn(), tick: vi.fn() }, + }); + const msg = target.sent[0] as Extract; + expect(msg.events).toContain("update"); + expect(msg.events).toContain("tick"); + }); + + it("WorkerEventSource starts in CONNECTING state", () => { + const target = new MockWorkerTarget(); + const [source] = makeSSEWorker(target)("https://example.com/events", {}); + expect(source.readyState).toBe(SSEReadyState.CONNECTING); + }); + + it("transitions to OPEN and fires open event on 'open' worker message", () => { + const target = new MockWorkerTarget(); + const [source] = makeSSEWorker(target)("https://example.com/events", {}); + const id = (target.sent[0] as Extract).id; + + const onOpen = vi.fn(); + source.addEventListener("open", onOpen); + target.respond({ type: "open", id }); + + expect(source.readyState).toBe(SSEReadyState.OPEN); + expect(onOpen).toHaveBeenCalledOnce(); + }); + + it("fires the onOpen option callback", () => { + const target = new MockWorkerTarget(); + const onOpen = vi.fn(); + const [source] = makeSSEWorker(target)("https://example.com/events", { onOpen }); + const id = (target.sent[0] as Extract).id; + target.respond({ type: "open", id }); + expect(onOpen).toHaveBeenCalledOnce(); + source.close(); + }); + + it("dispatches a 'message' event with the correct data", () => { + const target = new MockWorkerTarget(); + const [source] = makeSSEWorker(target)("https://example.com/events", {}); + const id = (target.sent[0] as Extract).id; + + target.respond({ type: "open", id }); + const onMessage = vi.fn(); + source.addEventListener("message", onMessage); + target.respond({ type: "message", id, data: "hello", eventType: "message" }); + + expect(onMessage).toHaveBeenCalledWith(expect.objectContaining({ data: "hello" })); + source.close(); + }); + + it("fires the onMessage option callback", () => { + const target = new MockWorkerTarget(); + const onMessage = vi.fn(); + const [source] = makeSSEWorker(target)("https://example.com/events", { onMessage }); + const id = (target.sent[0] as Extract).id; + target.respond({ type: "open", id }); + target.respond({ type: "message", id, data: "ping", eventType: "message" }); + expect(onMessage).toHaveBeenCalledWith(expect.objectContaining({ data: "ping" })); + source.close(); + }); + + it("dispatches custom named events", () => { + const target = new MockWorkerTarget(); + const onUpdate = vi.fn(); + const [source] = makeSSEWorker(target)("https://example.com/events", { + events: { update: onUpdate }, + }); + const id = (target.sent[0] as Extract).id; + + target.respond({ type: "open", id }); + target.respond({ type: "message", id, data: "payload", eventType: "update" }); + + expect(onUpdate).toHaveBeenCalledWith(expect.objectContaining({ data: "payload" })); + source.close(); + }); + + it("updates readyState and dispatches error event on 'error' worker message", () => { + const target = new MockWorkerTarget(); + const [source] = makeSSEWorker(target)("https://example.com/events", {}); + const id = (target.sent[0] as Extract).id; + + const onError = vi.fn(); + source.addEventListener("error", onError); + target.respond({ type: "error", id, readyState: SSEReadyState.CLOSED }); + + expect(source.readyState).toBe(SSEReadyState.CLOSED); + expect(onError).toHaveBeenCalledOnce(); + }); + + it("fires the onError option callback", () => { + const target = new MockWorkerTarget(); + const onError = vi.fn(); + const [source] = makeSSEWorker(target)("https://example.com/events", { onError }); + const id = (target.sent[0] as Extract).id; + target.respond({ type: "error", id, readyState: SSEReadyState.CLOSED }); + expect(onError).toHaveBeenCalledOnce(); + source.close(); + }); + + it("close() sends a disconnect message to the worker", () => { + const target = new MockWorkerTarget(); + const [source] = makeSSEWorker(target)("https://example.com/events", {}); + const id = (target.sent[0] as Extract).id; + + source.close(); + + expect(target.sent).toHaveLength(2); + expect(target.sent[1]).toMatchObject({ type: "disconnect", id }); + expect(source.readyState).toBe(SSEReadyState.CLOSED); + }); + + it("cleanup() closes the source and sends disconnect", () => { + const target = new MockWorkerTarget(); + const [source, cleanup] = makeSSEWorker(target)("https://example.com/events", {}); + const id = (target.sent[0] as Extract).id; + + cleanup(); + + expect(target.sent[1]).toMatchObject({ type: "disconnect", id }); + expect(source.readyState).toBe(SSEReadyState.CLOSED); + }); + + it("ignores messages intended for other connection IDs", () => { + const target = new MockWorkerTarget(); + const [source] = makeSSEWorker(target)("https://example.com/events", {}); + const onOpen = vi.fn(); + source.addEventListener("open", onOpen); + + target.respond({ type: "open", id: "some-other-id" }); + + expect(onOpen).not.toHaveBeenCalled(); + expect(source.readyState).toBe(SSEReadyState.CONNECTING); + source.close(); + }); + + it("two concurrent WorkerEventSources on the same target are independent", () => { + const target = new MockWorkerTarget(); + const [sourceA] = makeSSEWorker(target)("https://example.com/a", {}); + const [sourceB] = makeSSEWorker(target)("https://example.com/b", {}); + + const idA = (target.sent[0] as Extract).id; + const idB = (target.sent[1] as Extract).id; + expect(idA).not.toBe(idB); + + target.respond({ type: "open", id: idA }); + expect(sourceA.readyState).toBe(SSEReadyState.OPEN); + expect(sourceB.readyState).toBe(SSEReadyState.CONNECTING); // unaffected + + sourceA.close(); + sourceB.close(); + }); +}); + +// ─── createSSE + makeSSEWorker integration ──────────────────────────────────── + +describe("createSSE with worker source", () => { + beforeAll(() => vi.useFakeTimers()); + beforeEach(() => vi.clearAllTimers()); + afterAll(() => vi.useRealTimers()); + + it("starts in CONNECTING state", () => + createRoot(dispose => { + const target = new MockWorkerTarget(); + const { readyState } = createSSE("https://example.com/events", { + source: makeSSEWorker(target), + }); + expect(readyState()).toBe(SSEReadyState.CONNECTING); + dispose(); + })); + + it("transitions to OPEN when the worker sends 'open'", () => + createRoot(dispose => { + const target = new MockWorkerTarget(); + const { readyState } = createSSE("https://example.com/events", { + source: makeSSEWorker(target), + }); + const id = (target.sent[0] as Extract).id; + target.respond({ type: "open", id }); + expect(readyState()).toBe(SSEReadyState.OPEN); + dispose(); + })); + + it("updates data signal on message", () => + createRoot(dispose => { + const target = new MockWorkerTarget(); + const { data } = createSSE("https://example.com/events", { + source: makeSSEWorker(target), + }); + const id = (target.sent[0] as Extract).id; + target.respond({ type: "open", id }); + target.respond({ type: "message", id, data: "world", eventType: "message" }); + expect(data()).toBe("world"); + dispose(); + })); + + it("applies transform to data from worker messages", () => + createRoot(dispose => { + const target = new MockWorkerTarget(); + const { data } = createSSE<{ n: number }>("https://example.com/events", { + source: makeSSEWorker(target), + transform: JSON.parse, + }); + const id = (target.sent[0] as Extract).id; + target.respond({ type: "open", id }); + target.respond({ type: "message", id, data: JSON.stringify({ n: 7 }), eventType: "message" }); + expect(data()).toEqual({ n: 7 }); + dispose(); + })); + + it("sends disconnect when the owner is disposed", () => + createRoot(dispose => { + const target = new MockWorkerTarget(); + createSSE("https://example.com/events", { source: makeSSEWorker(target) }); + dispose(); + expect(target.sent.some(m => m.type === "disconnect")).toBe(true); + })); + + it("forwards custom event names to the connect message", () => + createRoot(dispose => { + const target = new MockWorkerTarget(); + createSSE("https://example.com/events", { + source: makeSSEWorker(target), + events: { update: vi.fn(), tick: vi.fn() }, + }); + const msg = target.sent[0] as Extract; + expect(msg.events).toContain("update"); + expect(msg.events).toContain("tick"); + dispose(); + })); + + it("app-level reconnect creates a new WorkerEventSource after terminal error", () => + createRoot(dispose => { + const target = new MockWorkerTarget(); + createSSE("https://example.com/events", { + source: makeSSEWorker(target), + reconnect: { retries: 1, delay: 100 }, + }); + const id1 = (target.sent[0] as Extract).id; + target.respond({ type: "open", id: id1 }); + target.respond({ type: "error", id: id1, readyState: SSEReadyState.CLOSED }); + + // Before the reconnect timer fires, only 1 connect + expect(target.sent.filter(m => m.type === "connect")).toHaveLength(1); + + vi.advanceTimersByTime(150); + + // After the delay, a new connect should have been sent + expect(target.sent.filter(m => m.type === "connect")).toHaveLength(2); + dispose(); + })); + + it("close() sends disconnect and transitions to CLOSED", () => + createRoot(dispose => { + const target = new MockWorkerTarget(); + const { readyState, close } = createSSE("https://example.com/events", { + source: makeSSEWorker(target), + }); + const id = (target.sent[0] as Extract).id; + target.respond({ type: "open", id }); + expect(readyState()).toBe(SSEReadyState.OPEN); + close(); + expect(readyState()).toBe(SSEReadyState.CLOSED); + expect(target.sent.some(m => m.type === "disconnect")).toBe(true); + dispose(); + })); +}); diff --git a/packages/sse/tsconfig.json b/packages/sse/tsconfig.json new file mode 100644 index 000000000..dc1970e16 --- /dev/null +++ b/packages/sse/tsconfig.json @@ -0,0 +1,16 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "composite": true, + "outDir": "dist", + "rootDir": "src" + }, + "references": [ + { + "path": "../utils" + } + ], + "include": [ + "src" + ] +} \ No newline at end of file diff --git a/packages/timer/package.json b/packages/timer/package.json index 109579715..eae76deba 100644 --- a/packages/timer/package.json +++ b/packages/timer/package.json @@ -2,7 +2,7 @@ "name": "@solid-primitives/timer", "version": "1.4.3", "description": "Primitives to manage timeout and interval", - "author": "David Di Biase ", + "author": "David Di Biase ", "contributors": [ "Xavier Loh ", "Damian Tarnawski " diff --git a/packages/websocket/package.json b/packages/websocket/package.json index 1a82e4bad..249df0e7e 100644 --- a/packages/websocket/package.json +++ b/packages/websocket/package.json @@ -2,7 +2,7 @@ "name": "@solid-primitives/websocket", "version": "1.3.1", "description": "Primitive to create a web socket connection", - "author": "David Di Biase ", + "author": "David Di Biase ", "contributors": [ "Alex Lohr " ], diff --git a/packages/workers/package.json b/packages/workers/package.json index 30f845346..6357867e3 100644 --- a/packages/workers/package.json +++ b/packages/workers/package.json @@ -2,7 +2,7 @@ "name": "@solid-primitives/workers", "version": "0.4.2", "description": "Primitives that support creating Web Workers.", - "author": "David Di Biase ", + "author": "David Di Biase ", "license": "MIT", "homepage": "https://primitives.solidjs.community/package/workers", "repository": { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8651734b2..ecadfdb95 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -862,6 +862,16 @@ importers: specifier: ^1.9.7 version: 1.9.7 + packages/sse: + dependencies: + '@solid-primitives/utils': + specifier: workspace:^ + version: link:../utils + devDependencies: + solid-js: + specifier: ^1.9.7 + version: 1.9.7 + packages/state-machine: devDependencies: solid-js: